urbandan commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r970386896


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -2594,27 +2596,20 @@ public void testDropCommitOnBatchExpiry() throws InterruptedException {
         } catch (ExecutionException e) {
             assertTrue(e.getCause() instanceof  TimeoutException);
         }
+
         runUntil(commitResult::isCompleted);  // the commit shouldn't be completed without being sent since the produce request failed.
         assertFalse(commitResult.isSuccessful());  // the commit shouldn't succeed since the produce request failed.
-        assertThrows(TimeoutException.class, commitResult::await);
+        assertThrows(KafkaException.class, commitResult::await);
 
-        assertTrue(transactionManager.hasAbortableError());
-        assertTrue(transactionManager.hasOngoingTransaction());
+        assertTrue(transactionManager.hasFatalBumpableError());
+        assertFalse(transactionManager.hasOngoingTransaction());
         assertFalse(transactionManager.isCompleting());
-        assertTrue(transactionManager.transactionContainsPartition(tp0));
 
-        TransactionalRequestResult abortResult = transactionManager.beginAbort();
-
-        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
-        prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch + 1));
-        runUntil(abortResult::isCompleted);
-        assertTrue(abortResult.isSuccessful());
-        assertFalse(transactionManager.hasOngoingTransaction());
-        assertFalse(transactionManager.transactionContainsPartition(tp0));
+        assertThrows(KafkaException.class, () -> transactionManager.beginAbort());

Review Comment:
   @ijuma I think it would be possible to avoid a fatal state, but it would 
require a client-side epoch bump.
   When an InitPid is sent during an ongoing transaction, the coordinator bumps 
the producer epoch to fence off the current producer. This bumped epoch is 
never returned to any producer as a valid epoch, so the producer could adopt 
it locally and stay in a usable state.
   
   In short:
   epoch=0 -> delivery timeout occurs -> send fencing InitPid -> epoch=1 (on 
coordinator side) -> increase epoch on client side -> send another InitPid -> 
safely acquire epoch=2
   
   Since epoch=1 will never be handed to another producer, this is a safe 
operation, and an actual fencing operation (by another producer instance) can 
still be detected.
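
   To make the sequence concrete, here is a minimal toy model of it. Everything 
in it is hypothetical: `ToyCoordinator`, `initPid`, `initPidFromNewInstance` 
and `recover` are illustrative names, not Kafka classes or APIs, and the 
coordinator is assumed to reject any InitPid whose epoch does not match its 
own. It only sketches the epoch arithmetic, not the real protocol handling.

```java
import java.util.OptionalInt;

public class EpochBumpSketch {

    /** Toy stand-in for the transaction coordinator; not Kafka's real API. */
    static final class ToyCoordinator {
        private int epoch = 0;
        private boolean transactionOngoing = true;

        /**
         * Handles an InitPid carrying the epoch the caller believes is current.
         * Returns the new epoch, or empty when the call only fenced an ongoing
         * transaction (the bumped epoch is then not exposed to the caller).
         */
        OptionalInt initPid(int callerEpoch) {
            if (callerEpoch != epoch) {
                throw new IllegalStateException(
                    "fenced: caller epoch " + callerEpoch + " != coordinator epoch " + epoch);
            }
            epoch++;
            if (transactionOngoing) {
                transactionOngoing = false;
                return OptionalInt.empty();
            }
            return OptionalInt.of(epoch);
        }

        /** Assumption: a different producer instance taking over the same transactional id. */
        int initPidFromNewInstance() {
            transactionOngoing = false;
            epoch++;
            return epoch;
        }
    }

    /** Runs the proposed recovery sequence and returns the epoch the client ends up with. */
    static int recover(ToyCoordinator coordinator, int clientEpoch, boolean otherInstanceTakesOver) {
        // Fencing InitPid sent after the delivery timeout.
        OptionalInt first = coordinator.initPid(clientEpoch);
        if (first.isPresent()) {
            return first.getAsInt();
        }
        if (otherInstanceTakesOver) {
            coordinator.initPidFromNewInstance();
        }
        // Client-side bump to the epoch the coordinator never exposed.
        clientEpoch++;
        // Second InitPid; throws if another instance fenced us in the meantime.
        return coordinator.initPid(clientEpoch).getAsInt();
    }

    public static void main(String[] args) {
        System.out.println("acquired epoch " + recover(new ToyCoordinator(), 0, false));
        try {
            recover(new ToyCoordinator(), 0, true);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

   In this model the client reaches epoch=2 when nobody else intervenes, and 
the second InitPid is rejected when another instance has bumped the epoch in 
between, which is the detection property described above.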


