urbandan commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r942423899


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -241,12 +246,40 @@ synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoc
                     .setProducerId(producerIdAndEpoch.producerId)
                     .setProducerEpoch(producerIdAndEpoch.epoch);
             InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
-                    isEpochBump);
+                    isEpochBump, false);
             enqueueRequest(handler);
             return handler.result;
         }, State.INITIALIZING, "initTransactions");
     }
 
+    synchronized void tryTransitioningIntoFatalBumpableError(RuntimeException cause) {
+        if (currentState == State.FATAL_ERROR || currentState == State.FATAL_BUMPABLE_ERROR) {
+            // Already in a fatal state, skip
+            return;
+        }
+        String errorMessage = "Encountered unrecoverable error due to batch client side timeout";
+        RuntimeException failure = cause == null
+                ? new KafkaException(errorMessage)
+                : new KafkaException(errorMessage, cause);
+        transitionToFatalBumpableError(failure);
+
+        // If an epoch bump is possible, try to fence the current transaction by bumping
+        if (canBumpEpoch()) {
+            log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch to fence the current transaction", producerIdAndEpoch);
+            InitProducerIdRequestData requestData = new InitProducerIdRequestData()
+                    .setTransactionalId(transactionalId)
+                    .setTransactionTimeoutMs(transactionTimeoutMs)
+                    .setProducerId(producerIdAndEpoch.producerId)
+                    .setProducerEpoch(producerIdAndEpoch.epoch);
+            InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
+                    false, true);
+            enqueueRequest(handler);

Review Comment:
   At this point we enter the FATAL_BUMPABLE_ERROR state, which still allows the Sender to send requests; see the changes in the Sender class and in TransactionManager#maybeTerminateRequestWithError.
   If the producer is closed gracefully, we will try to send this last InitProducerId request. Once that request succeeds, we transition into FATAL_ERROR and send nothing else.
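
   To make the intended flow easier to follow, here is a minimal, self-contained sketch of the two-step transition described above. It is not the actual TransactionManager code: the class `BumpableErrorSketch`, the `READY` state, the string-based request queue, and all method names are hypothetical simplifications, and the real Sender interaction is reduced to a single `nextRequest()` call.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, simplified model of the state flow; not the real TransactionManager.
class BumpableErrorSketch {
    enum State { READY, FATAL_BUMPABLE_ERROR, FATAL_ERROR }

    private State state = State.READY;
    // Requests are modelled as plain strings purely for illustration.
    private final Queue<String> pending = new ArrayDeque<>();

    // Loosely mirrors tryTransitioningIntoFatalBumpableError: enter the
    // intermediate state and enqueue one last epoch-bumping request.
    synchronized void onBatchTimeout() {
        if (state == State.FATAL_ERROR || state == State.FATAL_BUMPABLE_ERROR) {
            return; // already in a fatal state, skip
        }
        state = State.FATAL_BUMPABLE_ERROR;
        pending.add("InitProducerId");
    }

    // What a sender loop would call: only FATAL_ERROR blocks sending,
    // so the queued request can still go out from FATAL_BUMPABLE_ERROR.
    synchronized String nextRequest() {
        return state == State.FATAL_ERROR ? null : pending.poll();
    }

    // Called once the epoch bump succeeded; nothing is sent afterwards.
    synchronized void onInitProducerIdSuccess() {
        if (state == State.FATAL_BUMPABLE_ERROR) {
            state = State.FATAL_ERROR;
        }
    }

    synchronized State state() {
        return state;
    }
}
```

   In this sketch, a second call to `onBatchTimeout()` after the first is a no-op, matching the early return in the diff, and `nextRequest()` returns `null` once the state is `FATAL_ERROR`.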



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

