This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ccab9eb8b47 KAFKA-18660: Transactions Version 2 doesn't handle epoch
overflow correctly (#18730)
ccab9eb8b47 is described below
commit ccab9eb8b4781eb4b8edbd03b505ae31e1abdff7
Author: Justine Olshan <[email protected]>
AuthorDate: Thu Jan 30 13:42:10 2025 -0800
KAFKA-18660: Transactions Version 2 doesn't handle epoch overflow correctly
(#18730)
Fixed a typo that caused the coordinator to return the old producer ID and
epoch instead of the newly generated ones, so that epoch overflow is handled correctly.
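We also had to reorder the concurrent-transaction check so that it runs before
the producer ID and epoch checks; otherwise the producer fences itself when it starts the new transaction with the new producer ID while the transition to that ID is still pending.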
I also tested this with a modified version of the code in which epoch overflow
happens on the first epoch bump (so every request gets a new producer ID).
Reviewers: Artem Livshits <[email protected]>, Jeff Kim
<[email protected]>
---
.../transaction/TransactionCoordinator.scala | 22 ++++---
.../integration/kafka/api/TransactionsTest.scala | 25 ++++----
.../transaction/TransactionCoordinatorTest.scala | 69 +++++++++++++++++++++-
3 files changed, 97 insertions(+), 19 deletions(-)
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index d6ca11add86..aec0f3a67ba 100644
---
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -408,13 +408,16 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
// generate the new transaction metadata with added partitions
txnMetadata.inLock {
- if (txnMetadata.producerId != producerId) {
+ if (txnMetadata.pendingTransitionInProgress) {
+ // return a retriable exception to let the client backoff and
retry
+ // This check is performed first so that the pending transition
can complete before subsequent checks.
+ // With TV2, we may be transitioning over a producer epoch
overflow, and the producer may be using the
+ // new producer ID that is still only in pending state.
+ Left(Errors.CONCURRENT_TRANSACTIONS)
+ } else if (txnMetadata.producerId != producerId) {
Left(Errors.INVALID_PRODUCER_ID_MAPPING)
} else if (txnMetadata.producerEpoch != producerEpoch) {
Left(Errors.PRODUCER_FENCED)
- } else if (txnMetadata.pendingTransitionInProgress) {
- // return a retriable exception to let the client backoff and
retry
- Left(Errors.CONCURRENT_TRANSACTIONS)
} else if (txnMetadata.state == PrepareCommit || txnMetadata.state
== PrepareAbort) {
Left(Errors.CONCURRENT_TRANSACTIONS)
} else if (txnMetadata.state == Ongoing &&
partitions.subsetOf(txnMetadata.topicPartitions)) {
@@ -812,10 +815,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
}
}
- if (txnMetadata.producerId != producerId && !retryOnOverflow)
- Left(Errors.INVALID_PRODUCER_ID_MAPPING)
- else if (txnMetadata.pendingTransitionInProgress &&
txnMetadata.pendingState.get != PrepareEpochFence)
+ if (txnMetadata.pendingTransitionInProgress &&
txnMetadata.pendingState.get != PrepareEpochFence) {
+ // This check is performed first so that the pending transition
can complete before the next checks.
+ // With TV2, we may be transitioning over a producer epoch
overflow, and the producer may be using the
+ // new producer ID that is still only in pending state.
Left(Errors.CONCURRENT_TRANSACTIONS)
+ } else if (txnMetadata.producerId != producerId &&
!retryOnOverflow)
+ Left(Errors.INVALID_PRODUCER_ID_MAPPING)
else if (!isValidEpoch)
Left(Errors.PRODUCER_FENCED)
else txnMetadata.state match {
@@ -940,7 +946,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
case Right((txnMetadata, newPreSendMetadata)) =>
// we can respond to the client immediately and continue to
write the txn markers if
// the log append was successful
- responseCallback(Errors.NONE, txnMetadata.producerId,
txnMetadata.producerEpoch)
+ responseCallback(Errors.NONE, newPreSendMetadata.producerId,
newPreSendMetadata.producerEpoch)
txnMarkerChannelManager.addTxnMarkersToSend(coordinatorEpoch, txnMarkerResult,
txnMetadata, newPreSendMetadata)
}
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 2733167b469..fed2b313d06 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -22,7 +22,7 @@ import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{InvalidProducerEpochException,
ProducerFencedException, TimeoutException}
+import org.apache.kafka.common.errors.{ConcurrentTransactionsException,
InvalidProducerEpochException, ProducerFencedException, TimeoutException}
import org.apache.kafka.common.test.api.Flaky
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.{TransactionLogConfig,
TransactionStateManagerConfig}
@@ -617,15 +617,20 @@ class TransactionsTest extends IntegrationTestHarness {
// Wait for the expiration cycle to kick in.
Thread.sleep(600)
- try {
- // Now that the transaction has expired, the second send should fail
with a InvalidProducerEpochException.
-
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1,
null, "2", "2", willBeCommitted = false)).get()
- fail("should have raised a InvalidProducerEpochException since the
transaction has expired")
- } catch {
- case _: InvalidProducerEpochException =>
- case e: ExecutionException =>
- assertTrue(e.getCause.isInstanceOf[InvalidProducerEpochException])
- }
+ TestUtils.waitUntilTrue(() => {
+ var foundException = false
+ try {
+ // Now that the transaction has expired, the second send should fail
with a InvalidProducerEpochException. We may see some
concurrentTransactionsExceptions.
+
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1,
null, "2", "2", willBeCommitted = false)).get()
+ fail("should have raised an error due to concurrent transactions or
invalid producer epoch")
+ } catch {
+ case _: ConcurrentTransactionsException =>
+ case _: InvalidProducerEpochException =>
+ case e: ExecutionException =>
+ foundException =
e.getCause.isInstanceOf[InvalidProducerEpochException]
+ }
+ foundException
+ }, "Never returned the expected InvalidProducerEpochException")
// Verify that the first message was aborted and the second one was never
written at all.
val nonTransactionalConsumer = nonTransactionalConsumers.head
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index c8e9a47df1a..7cfe5acb728 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -182,6 +182,40 @@ class TransactionCoordinatorTest {
assertEquals(Errors.NONE, result.error)
}
+ @Test
+ def shouldGenerateNewProducerIdIfEpochsExhaustedV2(): Unit = {
+ initPidGenericMocks(transactionalId)
+
+ val txnMetadata1 = new TransactionMetadata(transactionalId, producerId,
producerId, RecordBatch.NO_PRODUCER_ID, (Short.MaxValue - 1).toShort,
+ (Short.MaxValue - 2).toShort, txnTimeoutMs, Ongoing, mutable.Set.empty,
time.milliseconds(), time.milliseconds(), TV_2)
+ // We start with txnMetadata1 so we can transform the metadata to
PrepareCommit.
+ val txnMetadata2 = new TransactionMetadata(transactionalId, producerId,
producerId, RecordBatch.NO_PRODUCER_ID, (Short.MaxValue - 1).toShort,
+ (Short.MaxValue - 2).toShort, txnTimeoutMs, Ongoing, mutable.Set.empty,
time.milliseconds(), time.milliseconds(), TV_2)
+ val transitMetadata = txnMetadata2.prepareAbortOrCommit(PrepareCommit,
TV_2, producerId2, time.milliseconds(), false)
+ txnMetadata2.completeTransitionTo(transitMetadata)
+
+ assertEquals(producerId, txnMetadata2.producerId)
+ assertEquals(Short.MaxValue, txnMetadata2.producerEpoch)
+
+
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+ .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata1))))
+ .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata2))))
+
+ when(transactionManager.appendTransactionToLog(
+ ArgumentMatchers.eq(transactionalId),
+ ArgumentMatchers.eq(coordinatorEpoch),
+ any[TxnTransitMetadata],
+ capturedErrorsCallback.capture(),
+ any(),
+ any()
+ )).thenAnswer(_ => capturedErrorsCallback.getValue.apply(Errors.NONE))
+
+ coordinator.handleEndTransaction(transactionalId, producerId,
(Short.MaxValue - 1).toShort, TransactionResult.COMMIT, TV_2, endTxnCallback)
+ assertEquals(producerId2, newProducerId)
+ assertEquals(0, newEpoch)
+ assertEquals(Errors.NONE, error)
+ }
+
@Test
def shouldRespondWithNotCoordinatorOnInitPidWhenNotCoordinator(): Unit = {
when(transactionManager.validateTransactionTimeoutMs(anyInt()))
@@ -519,7 +553,7 @@ class TransactionCoordinatorTest {
.thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
val nextProducerEpoch = if (isRetry) producerEpoch - 1 else producerEpoch
- coordinator.handleEndTransaction(transactionalId, producerId,
nextProducerEpoch.toShort , TransactionResult.ABORT, clientTransactionVersion,
endTxnCallback)
+ coordinator.handleEndTransaction(transactionalId, producerId,
nextProducerEpoch.toShort, TransactionResult.ABORT, clientTransactionVersion,
endTxnCallback)
if (isRetry) {
assertEquals(Errors.PRODUCER_FENCED, error)
} else {
@@ -770,6 +804,39 @@ class TransactionCoordinatorTest {
verify(transactionManager,
times(2)).getTransactionState(ArgumentMatchers.eq(transactionalId))
}
+ @Test
+ def
shouldReturnConcurrentTxnOnAddPartitionsIfEndTxnV2EpochOverflowAndNotComplete():
Unit = {
+ val prepareWithPending = new TransactionMetadata(transactionalId,
producerId, producerId,
+ producerId2, Short.MaxValue, (Short.MaxValue - 1).toShort, 1,
PrepareCommit, collection.mutable.Set.empty[TopicPartition], 0,
time.milliseconds(), TV_2)
+ val txnTransitMetadata =
prepareWithPending.prepareComplete(time.milliseconds())
+
+
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+ .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
prepareWithPending))))
+
+ // Return CONCURRENT_TRANSACTIONS while transaction is still completing
+ coordinator.handleAddPartitionsToTransaction(transactionalId, producerId2,
0, partitions, errorsCallback, TV_2)
+ assertEquals(Errors.CONCURRENT_TRANSACTIONS, error)
+
verify(transactionManager).getTransactionState(ArgumentMatchers.eq(transactionalId))
+
+ prepareWithPending.completeTransitionTo(txnTransitMetadata)
+ assertEquals(CompleteCommit, prepareWithPending.state)
+
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+ .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
prepareWithPending))))
+ when(transactionManager.appendTransactionToLog(
+ ArgumentMatchers.eq(transactionalId),
+ ArgumentMatchers.eq(coordinatorEpoch),
+ any[TxnTransitMetadata],
+ capturedErrorsCallback.capture(),
+ any(),
+ any())
+ ).thenAnswer(_ => capturedErrorsCallback.getValue.apply(Errors.NONE))
+
+ coordinator.handleAddPartitionsToTransaction(transactionalId, producerId2,
0, partitions, errorsCallback, TV_2)
+
+ assertEquals(Errors.NONE, error)
+ verify(transactionManager,
times(2)).getTransactionState(ArgumentMatchers.eq(transactionalId))
+ }
+
@ParameterizedTest
@ValueSource(shorts = Array(0, 2))
def
shouldAppendPrepareCommitToLogOnEndTxnWhenStatusIsOngoingAndResultIsCommit(transactionVersion:
Short): Unit = {