urbandan commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r942423899
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -241,12 +246,40 @@ synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoc
.setProducerId(producerIdAndEpoch.producerId)
.setProducerEpoch(producerIdAndEpoch.epoch);
InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
- isEpochBump);
+ isEpochBump, false);
enqueueRequest(handler);
return handler.result;
}, State.INITIALIZING, "initTransactions");
}
+ synchronized void tryTransitioningIntoFatalBumpableError(RuntimeException cause) {
+ if (currentState == State.FATAL_ERROR || currentState == State.FATAL_BUMPABLE_ERROR) {
+ // Already in a fatal state, skip
+ return;
+ }
+ String errorMessage = "Encountered unrecoverable error due to batch client side timeout";
+ RuntimeException failure = cause == null
+ ? new KafkaException(errorMessage)
+ : new KafkaException(errorMessage, cause);
+ transitionToFatalBumpableError(failure);
+
+ // If an epoch bump is possible, try to fence the current transaction by bumping
+ if (canBumpEpoch()) {
+ log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch to fence the current transaction", producerIdAndEpoch);
+ InitProducerIdRequestData requestData = new InitProducerIdRequestData()
+ .setTransactionalId(transactionalId)
+ .setTransactionTimeoutMs(transactionTimeoutMs)
+ .setProducerId(producerIdAndEpoch.producerId)
+ .setProducerEpoch(producerIdAndEpoch.epoch);
+ InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
+ false, true);
+ enqueueRequest(handler);
Review Comment:
At this point we enter the FATAL_BUMPABLE_ERROR state, which still allows the
Sender to send requests (see the changes in the Sender class and in
TransactionManager#maybeTerminateRequestWithError).
If the producer is closed gracefully, we will still try to send this last
InitProducerId request. Once that InitProducerId request succeeds, we transition
into FATAL_ERROR and won't send anything else.
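
To make the intended sequence easier to follow, here is a minimal standalone sketch of the state flow described above. It is not the actual TransactionManager or Sender code: the class `FatalBumpableSketch`, its `poll()` method, and the string-based request queue are hypothetical stand-ins, and the choice to go straight to FATAL_ERROR when no epoch bump is possible is an assumption of the sketch, since that branch is not shown in this hunk.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, simplified model of the state flow; not Kafka's real classes.
public class FatalBumpableSketch {
    public enum State { READY, FATAL_BUMPABLE_ERROR, FATAL_ERROR }

    private State state = State.READY;
    private final Queue<String> pending = new ArrayDeque<>();

    public State state() {
        return state;
    }

    // Loosely mirrors tryTransitioningIntoFatalBumpableError from the diff.
    public void onBatchTimeout(boolean canBumpEpoch) {
        if (state == State.FATAL_ERROR || state == State.FATAL_BUMPABLE_ERROR) {
            return; // already in a fatal state, skip
        }
        state = State.FATAL_BUMPABLE_ERROR;
        if (canBumpEpoch) {
            // One last request is queued to fence the current transaction.
            pending.add("InitProducerId");
        } else {
            // Assumption of this sketch only: nothing to send, so go fully fatal.
            state = State.FATAL_ERROR;
        }
    }

    // Stand-in for one Sender iteration; returns the request sent, or null.
    public String poll() {
        if (state == State.FATAL_ERROR) {
            pending.clear(); // nothing may be sent in FATAL_ERROR
            return null;
        }
        String request = pending.poll();
        if (request != null && state == State.FATAL_BUMPABLE_ERROR) {
            // Treat the send as a successful InitProducerId response.
            state = State.FATAL_ERROR;
        }
        return request;
    }

    public static void main(String[] args) {
        FatalBumpableSketch tm = new FatalBumpableSketch();
        tm.onBatchTimeout(true);
        System.out.println("after timeout: " + tm.state());
        System.out.println("sent: " + tm.poll());
        System.out.println("after response: " + tm.state());
        System.out.println("sent: " + tm.poll());
    }
}
```

The point of the intermediate state in this sketch is that `poll()` still drains the queue while in FATAL_BUMPABLE_ERROR, and only the completed InitProducerId moves the model into FATAL_ERROR, after which nothing is sent.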