This is an automated email from the ASF dual-hosted git repository. jolshan pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 32dbbe6a1f3 KAFKA-18464: Empty Abort Transaction can fence producer incorrectly with Transactions V2 (#18467) 32dbbe6a1f3 is described below commit 32dbbe6a1f3ef39318c796bdc0a3b8da2c7060ad Author: Justine Olshan <jols...@confluent.io> AuthorDate: Fri Jan 10 16:51:58 2025 -0800 KAFKA-18464: Empty Abort Transaction can fence producer incorrectly with Transactions V2 (#18467) To avoid self-fencing in the commit/abort + empty abort scenario, return the concurrent transactions error when we have pending state and do the epoch check second. In this scenario, we will complete the previous transaction before proceeding to the empty abort. Added a test that failed before the change. Note -- only the pending state is checked earlier. This is because we don’t return from EndTxn (the first commit) until we have already written to the log, transitioned to PrepareX, and have the pending CompleteX state. We don't need to worry about the cases of an EndTxn request coming in with PrepareX without the pending state because that would be an older request and/or retry, which are already covered. 
Reviewers: Artem Livshits <alivsh...@confluent.io>, Jeff Kim <jeff....@confluent.io> --- .../coordinator/transaction/TransactionCoordinator.scala | 4 ++-- .../scala/integration/kafka/api/TransactionsTest.scala | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala index 35cd28de9c0..d6ca11add86 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala @@ -814,10 +814,10 @@ class TransactionCoordinator(txnConfig: TransactionConfig, if (txnMetadata.producerId != producerId && !retryOnOverflow) Left(Errors.INVALID_PRODUCER_ID_MAPPING) - else if (!isValidEpoch) - Left(Errors.PRODUCER_FENCED) else if (txnMetadata.pendingTransitionInProgress && txnMetadata.pendingState.get != PrepareEpochFence) Left(Errors.CONCURRENT_TRANSACTIONS) + else if (!isValidEpoch) + Left(Errors.PRODUCER_FENCED) else txnMetadata.state match { case Ongoing => val nextState = if (txnMarkerResult == TransactionResult.COMMIT) diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index 846f6413883..8ea2791c5b1 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -924,6 +924,22 @@ class TransactionsTest extends IntegrationTestHarness { } } + @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}.isTV2Enabled={2}") + @CsvSource(Array( + "kraft, consumer, true", + )) + def testEmptyAbortAfterCommit(quorum: String, groupProtocol: String, isTV2Enabled: Boolean): Unit = { + val producer = transactionalProducers.head + + producer.initTransactions() + producer.beginTransaction() + 
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 1, "4", "4", willBeCommitted = false)) + producer.commitTransaction() + + producer.beginTransaction() + producer.abortTransaction() + } + private def sendTransactionalMessagesWithValueRange(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String, start: Int, end: Int, willBeCommitted: Boolean): Unit = { for (i <- start until end) {