urbandan commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r934188826


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -241,12 +246,40 @@ synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoc
                     .setProducerId(producerIdAndEpoch.producerId)
                     .setProducerEpoch(producerIdAndEpoch.epoch);
             InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
-                    isEpochBump);
+                    isEpochBump, false);
             enqueueRequest(handler);
             return handler.result;
         }, State.INITIALIZING, "initTransactions");
     }
 
+    synchronized void tryTransitioningIntoFatalBumpableError(RuntimeException cause) {
+        if (currentState == State.FATAL_ERROR || currentState == State.FATAL_BUMPABLE_ERROR) {
+            // Already in a fatal state, skip
+            return;
+        }
+        String errorMessage = "Encountered unrecoverable error due to batch client side timeout";
+        RuntimeException failure = cause == null
+                ? new KafkaException(errorMessage)
+                : new KafkaException(errorMessage, cause);
+        transitionToFatalBumpableError(failure);
+
+        // If an epoch bump is possible, try to fence the current transaction by bumping
+        if (canBumpEpoch()) {
+            log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch to fence the current transaction", producerIdAndEpoch);
+            InitProducerIdRequestData requestData = new InitProducerIdRequestData()
+                    .setTransactionalId(transactionalId)
+                    .setTransactionTimeoutMs(transactionTimeoutMs)
+                    .setProducerId(producerIdAndEpoch.producerId)
+                    .setProducerEpoch(producerIdAndEpoch.epoch);
+            InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
+                    false, true);
+            enqueueRequest(handler);
+        } else {
+            log.info("Cannot bump epoch, transitioning into fatal error");
+            transitionToFatalError(failure);

Review Comment:
   Out of order messages can occur even without the transactional.id being reused. 
The issue I encountered was caused by a valid producer aborting the transaction 
"too soon" - where "too soon" means that all of the last batches had already been 
timed out on the client side by delivery.timeout.ms, but their produce requests 
were still in flight. So the issue occurs with a single producer, without any 
fencing or transactional.id reuse.
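
   To make the scenario concrete, here is a rough sketch of the standard transactional 
loop in which this happens (broker address, topic, transactional.id and the timeout 
values are all made up): the batch is expired locally once delivery.timeout.ms elapses, 
the commit fails, the application aborts - but the produce request carrying that batch 
may still be in flight.

   ```java
   import java.util.Properties;

   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.ProducerConfig;
   import org.apache.kafka.clients.producer.ProducerRecord;
   import org.apache.kafka.common.KafkaException;
   import org.apache.kafka.common.errors.AuthorizationException;
   import org.apache.kafka.common.errors.OutOfOrderSequenceException;
   import org.apache.kafka.common.errors.ProducerFencedException;
   import org.apache.kafka.common.serialization.StringSerializer;

   public class TooSoonAbortSketch {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // made up
           props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-txn-id");  // made up
           props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
           props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
           // A short client-side expiry makes it easier for a batch to be timed out
           // locally while its produce request is still in flight to the broker.
           props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 4000);
           props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 5000);

           try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
               producer.initTransactions();
               producer.beginTransaction();
               try {
                   producer.send(new ProducerRecord<>("example-topic", "key", "value"));
                   producer.commitTransaction();
               } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                   // Fatal errors: the producer cannot be reused, just let it close.
                   throw e;
               } catch (KafkaException e) {
                   // If the batch was expired by delivery.timeout.ms, the commit fails and
                   // we abort here - the "too soon" abort, because the expired batch may
                   // still be sitting in an in-flight produce request.
                   producer.abortTransaction();
               }
           }
       }
   }
   ```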
   
   Yes, that summary is right. Bump to fence the in-flight requests, then 
discard the producer.
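
   Roughly what I mean by "discard the producer" on the application side (only a 
sketch - the helper is made up, and the exact exception surfaced by the new fatal 
state depends on this change): close the failed instance and create a new one with 
the same transactional.id, whose initTransactions() call goes through InitProducerId 
again.

   ```java
   import java.util.Properties;

   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.Producer;
   import org.apache.kafka.common.KafkaException;

   public class ProducerRecycler {
       // Sketch: drop the producer that hit the fatal (bumpable) error and replace it.
       // Re-running initTransactions() with the same transactional.id makes the
       // coordinator bump the epoch again, so any straggling in-flight requests from
       // the old instance stay fenced.
       public static Producer<String, String> recreate(Producer<String, String> failed,
                                                       Properties producerProps) {
           try {
               failed.close();
           } catch (KafkaException e) {
               // Best effort - the old instance is already in a fatal state.
           }
           Producer<String, String> fresh = new KafkaProducer<>(producerProps);
           fresh.initTransactions();
           return fresh;
       }
   }
   ```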
   
   If the initPid fails, there can be 2 scenarios:
   1. The transaction times out due to transaction.timeout.ms (config sketch below) - 
in this case, the coordinator bumps the epoch, practically achieving the same 
fencing I am trying to implement here.
   2. The transactional.id is reused by a new producer instance - in this case, the 
usual fencing happens.
   So I believe that the essential change here is that the producer must not abort 
when encountering a client-side timeout.
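
   (The config sketch mentioned in scenario 1 - just the relevant line with an example 
value, assuming a `Properties` object like the one in the first sketch:)

   ```java
   // transaction.timeout.ms: if the coordinator gets no status update from the
   // producer within this window, it proactively aborts the ongoing transaction
   // and bumps the epoch, as described in scenario 1.
   props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60_000); // example value
   ```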
   
   As for the producer going into a fatal state - I was thinking about a possible 
workaround for that, and I think the producer can be kept in a usable state, but 
it involves the epoch being increased on the client side. If this fatal-state 
solution is not acceptable, I can work on another version of the change which 
involves this client-side bump. I was hesitant to do so because I wasn't sure 
whether the protocol allows such a thing, but since the idempotent producer does 
the same, my guess is that it is safe.
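
   For reference, this is roughly the kind of client-side bump I mean (a standalone 
sketch, not the real TransactionManager code - names are made up), modelled on how 
the idempotent producer bumps its epoch locally and falls back to requesting a 
brand-new producer ID when the epoch would overflow:

   ```java
   // Sketch of a client-side epoch bump: increment the epoch locally, and reset to
   // force a fresh InitProducerId round trip if the epoch would overflow the wire
   // type (a 16-bit value), mirroring what the idempotent producer does today.
   public final class ProducerIdAndEpochSketch {
       public static final long NO_PRODUCER_ID = -1L;
       public static final short NO_PRODUCER_EPOCH = -1;

       private long producerId;
       private short epoch;

       public ProducerIdAndEpochSketch(long producerId, short epoch) {
           this.producerId = producerId;
           this.epoch = epoch;
       }

       /**
        * Returns true if the epoch could be bumped locally, false if a new producer
        * ID has to be obtained from the coordinator via InitProducerId instead.
        */
       public synchronized boolean tryBumpEpochLocally() {
           if (epoch == Short.MAX_VALUE) {
               // Epoch space exhausted: reset and require a new producer ID.
               producerId = NO_PRODUCER_ID;
               epoch = NO_PRODUCER_EPOCH;
               return false;
           }
           epoch += 1;
           return true;
       }

       public synchronized long producerId() {
           return producerId;
       }

       public synchronized short epoch() {
           return epoch;
       }
   }
   ```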


