showuon commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r933989629
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -241,12 +246,40 @@ synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoc
.setProducerId(producerIdAndEpoch.producerId)
.setProducerEpoch(producerIdAndEpoch.epoch);
InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
- isEpochBump);
+ isEpochBump, false);
enqueueRequest(handler);
return handler.result;
}, State.INITIALIZING, "initTransactions");
}
+ synchronized void tryTransitioningIntoFatalBumpableError(RuntimeException cause) {
+ if (currentState == State.FATAL_ERROR || currentState == State.FATAL_BUMPABLE_ERROR) {
+ // Already in a fatal state, skip
+ return;
+ }
+ String errorMessage = "Encountered unrecoverable error due to batch client side timeout";
+ RuntimeException failure = cause == null
+ ? new KafkaException(errorMessage)
+ : new KafkaException(errorMessage, cause);
+ transitionToFatalBumpableError(failure);
+
+ // If an epoch bump is possible, try to fence the current transaction by bumping
+ if (canBumpEpoch()) {
+ log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch to fence the current transaction", producerIdAndEpoch);
+ InitProducerIdRequestData requestData = new InitProducerIdRequestData()
+ .setTransactionalId(transactionalId)
+ .setTransactionTimeoutMs(transactionTimeoutMs)
+ .setProducerId(producerIdAndEpoch.producerId)
+ .setProducerEpoch(producerIdAndEpoch.epoch);
+ InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
+ false, true);
+ enqueueRequest(handler);
+ } else {
+ log.info("Cannot bump epoch, transitioning into fatal error");
+ transitionToFatalError(failure);
Review Comment:
Let me make sure I understand the problem and your solution. Are you saying that the issue only happens when the **"timed-out transactional ID" is not reused**, and the abort marker arrives before the transaction records? Is my understanding correct?
And what we are trying to do is force an epoch bump when a timeout exception is encountered, so that the fencing mechanism aborts the previous in-flight transactions. After that, we enter the `fatal error` state as before. Is that right?
So, my question is: what happens if the InitProducerId request fails (i.e. the epoch bump fails)? Will the issue with the pending transactions still occur?
Thank you.
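To make the questions above concrete, here is a hypothetical, heavily simplified Java model of the flow in `tryTransitioningIntoFatalBumpableError`. It is not Kafka's `TransactionManager`: the class name, the `onInitProducerIdResponse` callback, the `epochBumpSupported` flag (standing in for `canBumpEpoch()`) and the queue (standing in for `enqueueRequest`) are invented for illustration. It also assumes, which the diff does not show, that the producer ends in `FATAL_ERROR` whether or not the epoch bump succeeds.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch only; not the real Kafka TransactionManager.
class SketchTransactionManager {
    enum State { IN_TRANSACTION, FATAL_BUMPABLE_ERROR, FATAL_ERROR }

    State currentState = State.IN_TRANSACTION;
    short epoch;
    final boolean epochBumpSupported;      // stands in for canBumpEpoch()
    final Queue<Short> pendingInitProducerId = new ArrayDeque<>(); // stands in for enqueueRequest()
    RuntimeException lastError;            // stands in for the KafkaException in the diff

    SketchTransactionManager(short epoch, boolean epochBumpSupported) {
        this.epoch = epoch;
        this.epochBumpSupported = epochBumpSupported;
    }

    synchronized void tryTransitioningIntoFatalBumpableError(RuntimeException cause) {
        // Already in a fatal state: ignore further timeouts.
        if (currentState == State.FATAL_ERROR || currentState == State.FATAL_BUMPABLE_ERROR) {
            return;
        }
        String msg = "Encountered unrecoverable error due to batch client side timeout";
        lastError = cause == null ? new RuntimeException(msg) : new RuntimeException(msg, cause);
        currentState = State.FATAL_BUMPABLE_ERROR;
        if (epochBumpSupported) {
            // The request carries the current epoch; the coordinator would bump it and fence the old transaction.
            pendingInitProducerId.add(epoch);
        } else {
            currentState = State.FATAL_ERROR;
        }
    }

    // Assumption: success or failure, the producer ends in FATAL_ERROR.
    // Only on success does the epoch move forward (i.e. the old transaction is fenced).
    synchronized boolean onInitProducerIdResponse(boolean success) {
        Short requested = pendingInitProducerId.poll();
        if (requested == null) {
            throw new IllegalStateException("no InitProducerId request in flight");
        }
        if (success) {
            epoch = (short) (requested + 1);
        }
        currentState = State.FATAL_ERROR;
        return success;
    }
}
```

Under this assumption, a failed bump leaves the epoch unchanged, so nothing fences the old transaction from the client side. Whether it is then only cleaned up by the coordinator's `transaction.timeout.ms` handling is exactly what the last question is asking.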
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.