urbandan commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r970386896
##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -2594,27 +2596,20 @@ public void testDropCommitOnBatchExpiry() throws InterruptedException {
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof TimeoutException);
}
+
runUntil(commitResult::isCompleted); // the commit shouldn't be completed without being sent since the produce request failed.
assertFalse(commitResult.isSuccessful()); // the commit shouldn't succeed since the produce request failed.
- assertThrows(TimeoutException.class, commitResult::await);
+ assertThrows(KafkaException.class, commitResult::await);
- assertTrue(transactionManager.hasAbortableError());
- assertTrue(transactionManager.hasOngoingTransaction());
+ assertTrue(transactionManager.hasFatalBumpableError());
+ assertFalse(transactionManager.hasOngoingTransaction());
assertFalse(transactionManager.isCompleting());
- assertTrue(transactionManager.transactionContainsPartition(tp0));
- TransactionalRequestResult abortResult = transactionManager.beginAbort();
-
- prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
- prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch + 1));
- runUntil(abortResult::isCompleted);
- assertTrue(abortResult.isSuccessful());
- assertFalse(transactionManager.hasOngoingTransaction());
- assertFalse(transactionManager.transactionContainsPartition(tp0));
+ assertThrows(KafkaException.class, () -> transactionManager.beginAbort());
Review Comment:
@ijuma I think it would be possible to avoid the fatal state, but it would
require a client-side epoch bump.
When an InitPid is sent during an ongoing transaction, the coordinator bumps
the producer epoch to fence off the current producer. This bumped epoch is
never returned to any producer as a valid epoch, so the producer itself
could use it to stay in a usable state.
In short:
epoch=0 -> delivery timeout occurs -> send fencing InitPid -> epoch=1 (on
coordinator side) -> increase epoch on client side -> send another InitPid ->
safely acquire epoch=2
Since epoch=1 will never be used by another producer, this is a safe
operation, and an actual fencing operation (by another producer instance) can
be detected.
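The sequence above can be sketched as a toy model. This is only an illustration of the proposal, not the coordinator's or `TransactionManager`'s actual code: `EpochBumpSketch`, `ToyCoordinator`, `NO_EPOCH`, and `recoverAfterTimeout` are hypothetical names, and the coordinator's behavior is simplified to what this comment describes.

```java
import java.util.OptionalInt;

public class EpochBumpSketch {
    /** Hypothetical marker for an InitPid sent by a brand-new producer instance. */
    static final int NO_EPOCH = -1;

    /** Toy coordinator: tracks one producer id's epoch and whether a transaction is open. */
    static final class ToyCoordinator {
        private int epoch = 0;
        private boolean txnOngoing = false;

        void beginTransaction(int producerEpoch) {
            if (producerEpoch != epoch) {
                throw new IllegalStateException("fenced: coordinator epoch is " + epoch);
            }
            txnOngoing = true;
        }

        /**
         * Assumed behavior, following the comment: an InitPid during an ongoing
         * transaction bumps the epoch to fence and aborts, but does not hand the
         * bumped epoch out. Otherwise the bumped epoch is returned.
         */
        OptionalInt initPid(int producerEpoch) {
            if (producerEpoch != NO_EPOCH && producerEpoch != epoch) {
                throw new IllegalStateException("fenced: coordinator epoch is " + epoch);
            }
            epoch++;
            if (txnOngoing) {
                txnOngoing = false;          // transaction aborted by the fencing bump
                return OptionalInt.empty();  // bumped epoch is never exposed
            }
            return OptionalInt.of(epoch);
        }
    }

    /** Client-side recovery after a delivery timeout, as proposed in the comment. */
    static int recoverAfterTimeout(ToyCoordinator coordinator, int clientEpoch) {
        OptionalInt first = coordinator.initPid(clientEpoch); // fencing InitPid
        if (first.isPresent()) {
            return first.getAsInt();                          // no transaction was open
        }
        int bumped = clientEpoch + 1;                         // client-side epoch bump
        // Throws if another instance took over in the meantime (real fencing).
        return coordinator.initPid(bumped).getAsInt();
    }

    public static void main(String[] args) {
        ToyCoordinator coordinator = new ToyCoordinator();
        coordinator.beginTransaction(0);
        // epoch=0 -> fencing InitPid -> epoch=1 (coordinator) -> client bump -> epoch=2
        System.out.println("recovered epoch = " + recoverAfterTimeout(coordinator, 0));
    }
}
```

In this model, if another producer instance sends its own InitPid between the two steps, the coordinator's epoch moves past the client's bumped value and the second InitPid fails, which is how a real fencing would still be detected.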