This is an automated email from the ASF dual-hosted git repository. jolshan pushed a commit to branch 4.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push: new 97fb8be251d KAFKA-18464: Empty Abort Transaction can fence producer incorrectly with Transactions V2 (#18467) 97fb8be251d is described below commit 97fb8be251dbda115ee5f89de6bb28d79107ffdf Author: Justine Olshan <jols...@confluent.io> AuthorDate: Fri Jan 10 16:51:58 2025 -0800 KAFKA-18464: Empty Abort Transaction can fence producer incorrectly with Transactions V2 (#18467) To avoid self-fencing in the commit/abort + empty abort scenario, return the concurrent transactions error when we have pending state and do the epoch check second. In this scenario, we will complete the previous transaction before proceeding to the empty abort. Added a test that failed before the change. Note -- only the pending state is checked earlier. This is because we don’t return from EndTxn (the first commit) until we already written to the log, transitioned to PrepareX, and have the pending CompleteX state. We don't need to worry about the cases of an EndTxn request coming in with PrepareX without the pending state because that would be an older request and/or retry which are already covered. Reviewers: Artem Livshits <alivsh...@confluent.io>, Jeff Kim <jeff....@confluent.io> --- .../coordinator/transaction/TransactionCoordinator.scala | 4 ++-- .../scala/integration/kafka/api/TransactionsTest.scala | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala index 67d3d3d3624..e1edd4e4ddd 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala @@ -814,10 +814,10 @@ class TransactionCoordinator(txnConfig: TransactionConfig, if (txnMetadata.producerId != producerId && !retryOnOverflow) Left(Errors.INVALID_PRODUCER_ID_MAPPING) - else if (!isValidEpoch) - Left(Errors.PRODUCER_FENCED) else if (txnMetadata.pendingTransitionInProgress && txnMetadata.pendingState.get != PrepareEpochFence) Left(Errors.CONCURRENT_TRANSACTIONS) + else if (!isValidEpoch) + Left(Errors.PRODUCER_FENCED) else txnMetadata.state match { case Ongoing => val nextState = if (txnMarkerResult == TransactionResult.COMMIT) diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index 9637eb7b943..b5b51c6f498 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -924,6 +924,22 @@ class TransactionsTest extends IntegrationTestHarness { } } + @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}.isTV2Enabled={2}") + @CsvSource(Array( + "kraft, consumer, true", + )) + def testEmptyAbortAfterCommit(quorum: String, groupProtocol: String, isTV2Enabled: Boolean): Unit = { + val producer = transactionalProducers.head + + producer.initTransactions() + producer.beginTransaction() + producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 1, "4", "4", willBeCommitted = false)) + producer.commitTransaction() + + producer.beginTransaction() + producer.abortTransaction() + } + private def sendTransactionalMessagesWithValueRange(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String, start: Int, end: Int, willBeCommitted: Boolean): Unit = { for (i <- start until end) {