This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 97fb8be251d KAFKA-18464: Empty Abort Transaction can fence producer 
incorrectly with Transactions V2 (#18467)
97fb8be251d is described below

commit 97fb8be251dbda115ee5f89de6bb28d79107ffdf
Author: Justine Olshan <jols...@confluent.io>
AuthorDate: Fri Jan 10 16:51:58 2025 -0800

    KAFKA-18464: Empty Abort Transaction can fence producer incorrectly with 
Transactions V2 (#18467)
    
    To avoid self-fencing in the commit/abort + empty abort scenario, return 
the CONCURRENT_TRANSACTIONS error when there is a pending state transition, and 
perform the epoch check only after that. In this scenario, the previous 
transaction is completed before proceeding to the empty abort.
    
    Added a test that failed before the change.
    
    Note -- only the pending state is checked earlier. This is because we don’t 
return from EndTxn (the first commit) until we have already written to the log, 
transitioned to PrepareX, and have the pending CompleteX state. We don't need 
to worry about the case of an EndTxn request coming in with PrepareX without 
the pending state, because that would be an older request and/or a retry, which 
is already covered.
    
    Reviewers: Artem Livshits <alivsh...@confluent.io>, Jeff Kim 
<jeff....@confluent.io>
---
 .../coordinator/transaction/TransactionCoordinator.scala |  4 ++--
 .../scala/integration/kafka/api/TransactionsTest.scala   | 16 ++++++++++++++++
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 67d3d3d3624..e1edd4e4ddd 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -814,10 +814,10 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
 
             if (txnMetadata.producerId != producerId && !retryOnOverflow)
               Left(Errors.INVALID_PRODUCER_ID_MAPPING)
-            else if (!isValidEpoch)
-              Left(Errors.PRODUCER_FENCED)
             else if (txnMetadata.pendingTransitionInProgress && 
txnMetadata.pendingState.get != PrepareEpochFence)
               Left(Errors.CONCURRENT_TRANSACTIONS)
+            else if (!isValidEpoch)
+              Left(Errors.PRODUCER_FENCED)
             else txnMetadata.state match {
               case Ongoing =>
                 val nextState = if (txnMarkerResult == 
TransactionResult.COMMIT)
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala 
b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 9637eb7b943..b5b51c6f498 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -924,6 +924,22 @@ class TransactionsTest extends IntegrationTestHarness {
     }
   }
 
+  @ParameterizedTest(name = 
"{displayName}.quorum={0}.groupProtocol={1}.isTV2Enabled={2}")
+  @CsvSource(Array(
+    "kraft, consumer, true",
+  ))
+  def testEmptyAbortAfterCommit(quorum: String, groupProtocol: String, 
isTV2Enabled: Boolean): Unit = {
+    val producer = transactionalProducers.head
+
+    producer.initTransactions()
+    producer.beginTransaction()
+    
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 1, 
"4", "4", willBeCommitted = false))
+    producer.commitTransaction()
+
+    producer.beginTransaction()
+    producer.abortTransaction()
+  }
+
   private def sendTransactionalMessagesWithValueRange(producer: 
KafkaProducer[Array[Byte], Array[Byte]], topic: String,
                                                       start: Int, end: Int, 
willBeCommitted: Boolean): Unit = {
     for (i <- start until end) {

Reply via email to