urbandan commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r934188826
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -241,12 +246,40 @@ synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoc
.setProducerId(producerIdAndEpoch.producerId)
.setProducerEpoch(producerIdAndEpoch.epoch);
InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
- isEpochBump);
+ isEpochBump, false);
enqueueRequest(handler);
return handler.result;
}, State.INITIALIZING, "initTransactions");
}
+ synchronized void tryTransitioningIntoFatalBumpableError(RuntimeException cause) {
+ if (currentState == State.FATAL_ERROR || currentState == State.FATAL_BUMPABLE_ERROR) {
+ // Already in a fatal state, skip
+ return;
+ }
+ String errorMessage = "Encountered unrecoverable error due to batch client side timeout";
+ RuntimeException failure = cause == null
+ ? new KafkaException(errorMessage)
+ : new KafkaException(errorMessage, cause);
+ transitionToFatalBumpableError(failure);
+
+ // If an epoch bump is possible, try to fence the current transaction by bumping
+ if (canBumpEpoch()) {
+ log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch to fence the current transaction", producerIdAndEpoch);
+ InitProducerIdRequestData requestData = new InitProducerIdRequestData()
+ .setTransactionalId(transactionalId)
+ .setTransactionTimeoutMs(transactionTimeoutMs)
+ .setProducerId(producerIdAndEpoch.producerId)
+ .setProducerEpoch(producerIdAndEpoch.epoch);
+ InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
+ false, true);
+ enqueueRequest(handler);
+ } else {
+ log.info("Cannot bump epoch, transitioning into fatal error");
+ transitionToFatalError(failure);
Review Comment:
Out-of-order messages can occur even when the transactional.id is not reused.
The issue I encountered was caused by a valid producer aborting the transaction
"too soon", meaning that all of the last batches had been timed out by
delivery.timeout.ms while they were still in flight. So the issue
occurs with a single producer, without any fencing or transactional.id reuse.
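A deliberately simplified, hypothetical model of the race described above may help (this is not Kafka code; the class, record and method names are made up for illustration). It only shows the ordering problem: the client expires a batch that is still in flight, the producer aborts right away, and because the epoch is unchanged nothing prevents the late batch from being appended after the abort marker, outside the aborted transaction.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the "abort too soon" race; not the Kafka broker or client.
final class AbortTooSoonRace {

    // A partition log entry: either a data batch or a transaction marker.
    record Entry(String kind, long producerId, short epoch) { }

    static List<Entry> simulate() {
        List<Entry> partitionLog = new ArrayList<>();
        long producerId = 1000L;
        short epoch = 0;

        // 1. The producer sends a batch; the request is now in flight.
        Entry inFlightBatch = new Entry("DATA", producerId, epoch);

        // 2. delivery.timeout.ms expires on the client, so the batch is failed
        //    locally even though the broker may still receive it.
        boolean expiredOnClient = true;

        // 3. The producer treats the expiry as abortable and ends the transaction;
        //    the ABORT marker lands on the partition first.
        if (expiredOnClient) {
            partitionLog.add(new Entry("ABORT_MARKER", producerId, epoch));
        }

        // 4. The in-flight request finally arrives. The epoch was never bumped,
        //    so in this model nothing rejects the append.
        partitionLog.add(inFlightBatch);
        return partitionLog;
    }

    public static void main(String[] args) {
        for (Entry e : simulate()) {
            System.out.println(e.kind() + " epoch=" + e.epoch());
        }
    }
}
```

The late DATA entry ends up after the ABORT_MARKER, which is the situation the epoch bump in this change is meant to prevent.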
Yes, that summary is right: bump the epoch to fence the in-flight requests, then
discard the producer.
If the InitProducerId request fails, there are two possible scenarios:
1. The transaction times out due to transaction.timeout.ms. In this case, the
coordinator bumps the epoch, effectively achieving the same fencing I am trying
to implement here.
2. The transactional.id is reused by a new producer instance. In this case, the
usual fencing happens.
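The point of both scenarios is that some party bumps the epoch, and a bumped epoch is what fences the stale in-flight batch. A hypothetical toy model of that fencing check follows (not Kafka code; names are illustrative). As a simplification, it assumes the bumped epoch reaches the partition leader, for example through a transaction marker, before the late batch does.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of epoch fencing on a partition leader; not Kafka code.
final class EpochFencing {

    static final class PartitionLeader {
        // Latest epoch seen per producer ID.
        private final Map<Long, Short> latestEpoch = new HashMap<>();

        // Records that an epoch is now current for this producer ID
        // (e.g. a marker written with the bumped epoch reached the partition).
        void observeEpoch(long producerId, short epoch) {
            latestEpoch.merge(producerId, epoch, (a, b) -> (short) Math.max(a, b));
        }

        // Returns true if the append is accepted, false if it is fenced.
        boolean tryAppend(long producerId, short epoch) {
            short current = latestEpoch.getOrDefault(producerId, epoch);
            if (epoch < current) {
                return false; // stale epoch: the batch is fenced
            }
            observeEpoch(producerId, epoch);
            return true;
        }
    }

    public static void main(String[] args) {
        PartitionLeader leader = new PartitionLeader();
        long pid = 1000L;

        // A batch written with epoch 0 is accepted.
        System.out.println(leader.tryAppend(pid, (short) 0));
        // The epoch is bumped to 1: by the producer's own InitProducerId, by the
        // coordinator after transaction.timeout.ms, or by a new instance.
        leader.observeEpoch(pid, (short) 1);
        // The late in-flight batch still carries epoch 0 and is rejected.
        System.out.println(leader.tryAppend(pid, (short) 0));
    }
}
```

Whichever path performs the bump, the leader's check is the same, which is why a failed InitProducerId still ends in fencing.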
So I believe the essential change here is that the producer must not abort
when it encounters a client-side timeout.
As for the producer going into a fatal state: I was thinking about a possible
workaround, and I think the producer can be kept in a usable state, but that
requires increasing the epoch on the client side. If this fatal-state solution
is not acceptable, I can work on another version of the change that includes
this client-side bump. I was hesitant to do so because I wasn't sure whether
the protocol allows it, but since the idempotent producer does the same, my
guess is that it is safe.
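The bookkeeping behind such a client-side bump could look roughly like the hypothetical sketch below (class, record and method names are illustrative stand-ins, not the real TransactionManager API). It assumes the bump works as described for the idempotent producer: keep the producer ID, increment the epoch, and restart every partition's sequence numbers at 0. It also assumes that an exhausted epoch (Short.MAX_VALUE) would require a new producer ID from the broker. Whether the transactional protocol accepts this is exactly the open question above; the sketch only shows the client-side state change.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a client-side epoch bump; not Kafka's implementation.
final class ClientSideBump {

    // Illustrative stand-in for the producer ID and epoch pair.
    record IdAndEpoch(long producerId, short epoch) { }

    static final class ProducerState {
        IdAndEpoch idAndEpoch;
        // Next sequence number to use, per partition name.
        final Map<String, Integer> sequences = new HashMap<>();

        ProducerState(long producerId, short epoch) {
            this.idAndEpoch = new IdAndEpoch(producerId, epoch);
        }

        // Returns the next sequence number for the partition and advances it.
        int nextSequence(String partition) {
            return sequences.merge(partition, 1, Integer::sum) - 1;
        }

        // Bumps the epoch locally. Returns false when the epoch is exhausted; in that
        // case (an assumption of this sketch) a new producer ID would be needed.
        boolean bumpEpoch() {
            if (idAndEpoch.epoch() == Short.MAX_VALUE) {
                return false;
            }
            idAndEpoch = new IdAndEpoch(idAndEpoch.producerId(), (short) (idAndEpoch.epoch() + 1));
            sequences.clear(); // sequences restart at 0 under the new epoch
            return true;
        }
    }

    public static void main(String[] args) {
        ProducerState state = new ProducerState(1000L, (short) 5);
        state.nextSequence("topic-0");
        state.nextSequence("topic-0");
        state.bumpEpoch();
        System.out.println(state.idAndEpoch + ", next sequence " + state.nextSequence("topic-0"));
    }
}
```

Any batch still in flight carries the old epoch, so with the earlier fencing logic in mind, it would be rejected once the new epoch is established on the partition, while the producer itself stays usable.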
--
This is an automated message from the Apache Git Service.