This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 32dbbe6a1f3 KAFKA-18464: Empty Abort Transaction can fence producer 
incorrectly with Transactions V2 (#18467)
32dbbe6a1f3 is described below

commit 32dbbe6a1f3ef39318c796bdc0a3b8da2c7060ad
Author: Justine Olshan <jols...@confluent.io>
AuthorDate: Fri Jan 10 16:51:58 2025 -0800

    KAFKA-18464: Empty Abort Transaction can fence producer incorrectly with 
Transactions V2 (#18467)
    
    To avoid self-fencing in the commit/abort + empty abort scenario, return 
the concurrent transactions error when we have pending state and do the epoch 
check second. In this scenario, we will complete the previous transaction 
before proceeding to the empty abort.
    
    Added a test that failed before the change.
    
    Note -- only the pending state is checked earlier. This is because we don’t 
return from EndTxn (the first commit) until we already written to the log, 
transitioned to PrepareX, and have the pending CompleteX state. We don't need 
to worry about the cases of an EndTxn request coming in with PrepareX without 
the pending state because that would be an older request and/or retry which are 
already covered.
    
    Reviewers: Artem Livshits <alivsh...@confluent.io>, Jeff Kim 
<jeff....@confluent.io>
---
 .../coordinator/transaction/TransactionCoordinator.scala |  4 ++--
 .../scala/integration/kafka/api/TransactionsTest.scala   | 16 ++++++++++++++++
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 35cd28de9c0..d6ca11add86 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -814,10 +814,10 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
 
             if (txnMetadata.producerId != producerId && !retryOnOverflow)
               Left(Errors.INVALID_PRODUCER_ID_MAPPING)
-            else if (!isValidEpoch)
-              Left(Errors.PRODUCER_FENCED)
             else if (txnMetadata.pendingTransitionInProgress && 
txnMetadata.pendingState.get != PrepareEpochFence)
               Left(Errors.CONCURRENT_TRANSACTIONS)
+            else if (!isValidEpoch)
+              Left(Errors.PRODUCER_FENCED)
             else txnMetadata.state match {
               case Ongoing =>
                 val nextState = if (txnMarkerResult == 
TransactionResult.COMMIT)
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala 
b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 846f6413883..8ea2791c5b1 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -924,6 +924,22 @@ class TransactionsTest extends IntegrationTestHarness {
     }
   }
 
+  @ParameterizedTest(name = 
"{displayName}.quorum={0}.groupProtocol={1}.isTV2Enabled={2}")
+  @CsvSource(Array(
+    "kraft, consumer, true",
+  ))
+  def testEmptyAbortAfterCommit(quorum: String, groupProtocol: String, 
isTV2Enabled: Boolean): Unit = {
+    val producer = transactionalProducers.head
+
+    producer.initTransactions()
+    producer.beginTransaction()
+    
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 1, 
"4", "4", willBeCommitted = false))
+    producer.commitTransaction()
+
+    producer.beginTransaction()
+    producer.abortTransaction()
+  }
+
   private def sendTransactionalMessagesWithValueRange(producer: 
KafkaProducer[Array[Byte], Array[Byte]], topic: String,
                                                       start: Int, end: Int, 
willBeCommitted: Boolean): Unit = {
     for (i <- start until end) {

Reply via email to