This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 685ce7b3543 KAFKA-18660: Transactions Version 2 doesn't handle epoch
overflow correctly (#18730) (#18758)
685ce7b3543 is described below
commit 685ce7b3543b6b9471e74b0963d39de1a7426ea8
Author: Justine Olshan <[email protected]>
AuthorDate: Fri Jan 31 09:23:31 2025 -0800
KAFKA-18660: Transactions Version 2 doesn't handle epoch overflow correctly
(#18730) (#18758)
Fixed the typo that made the EndTxn response return the old producer ID and
epoch instead of the new ones, so that we handle epoch overflow correctly.
We also had to rearrange the concurrent transaction handling, checking for a
pending transition before validating the producer ID and epoch, so that we
don't self-fence when we start the new transaction with the new producer ID.
I also tested this with a modified version of the code where epoch overflow
happens on the first epoch bump (every request has a new producer ID).
Reviewers: Artem Livshits <[email protected]>, Jeff Kim
<[email protected]>
---
.../transaction/TransactionCoordinator.scala | 22 ++++---
.../integration/kafka/api/TransactionsTest.scala | 26 +++-----
.../transaction/TransactionCoordinatorTest.scala | 69 +++++++++++++++++++++-
3 files changed, 91 insertions(+), 26 deletions(-)
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index e1edd4e4ddd..e0019f0d773 100644
---
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -408,13 +408,16 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
// generate the new transaction metadata with added partitions
txnMetadata.inLock {
- if (txnMetadata.producerId != producerId) {
+ if (txnMetadata.pendingTransitionInProgress) {
+ // return a retriable exception to let the client backoff and
retry
+ // This check is performed first so that the pending transition
can complete before subsequent checks.
+ // With TV2, we may be transitioning over a producer epoch
overflow, and the producer may be using the
+ // new producer ID that is still only in pending state.
+ Left(Errors.CONCURRENT_TRANSACTIONS)
+ } else if (txnMetadata.producerId != producerId) {
Left(Errors.INVALID_PRODUCER_ID_MAPPING)
} else if (txnMetadata.producerEpoch != producerEpoch) {
Left(Errors.PRODUCER_FENCED)
- } else if (txnMetadata.pendingTransitionInProgress) {
- // return a retriable exception to let the client backoff and
retry
- Left(Errors.CONCURRENT_TRANSACTIONS)
} else if (txnMetadata.state == PrepareCommit || txnMetadata.state
== PrepareAbort) {
Left(Errors.CONCURRENT_TRANSACTIONS)
} else if (txnMetadata.state == Ongoing &&
partitions.subsetOf(txnMetadata.topicPartitions)) {
@@ -812,10 +815,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
}
}
- if (txnMetadata.producerId != producerId && !retryOnOverflow)
- Left(Errors.INVALID_PRODUCER_ID_MAPPING)
- else if (txnMetadata.pendingTransitionInProgress &&
txnMetadata.pendingState.get != PrepareEpochFence)
+ if (txnMetadata.pendingTransitionInProgress &&
txnMetadata.pendingState.get != PrepareEpochFence) {
+ // This check is performed first so that the pending transition
can complete before the next checks.
+ // With TV2, we may be transitioning over a producer epoch
overflow, and the producer may be using the
+ // new producer ID that is still only in pending state.
Left(Errors.CONCURRENT_TRANSACTIONS)
+ } else if (txnMetadata.producerId != producerId &&
!retryOnOverflow)
+ Left(Errors.INVALID_PRODUCER_ID_MAPPING)
else if (!isValidEpoch)
Left(Errors.PRODUCER_FENCED)
else txnMetadata.state match {
@@ -940,7 +946,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
case Right((txnMetadata, newPreSendMetadata)) =>
// we can respond to the client immediately and continue to
write the txn markers if
// the log append was successful
- responseCallback(Errors.NONE, txnMetadata.producerId,
txnMetadata.producerEpoch)
+ responseCallback(Errors.NONE, newPreSendMetadata.producerId,
newPreSendMetadata.producerEpoch)
txnMarkerChannelManager.addTxnMarkersToSend(coordinatorEpoch, txnMarkerResult,
txnMetadata, newPreSendMetadata)
}
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index b32fea75ca6..78eccde0365 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -22,7 +22,7 @@ import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{InvalidProducerEpochException,
ProducerFencedException, TimeoutException}
+import org.apache.kafka.common.errors.{ConcurrentTransactionsException,
InvalidProducerEpochException, ProducerFencedException, TimeoutException}
import org.apache.kafka.common.test.api.Flaky
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.{TransactionLogConfig,
TransactionStateManagerConfig}
@@ -623,28 +623,20 @@ class TransactionsTest extends IntegrationTestHarness {
// Wait for the expiration cycle to kick in.
Thread.sleep(600)
- if (quorum == "zk") {
- // In zk mode, transaction v1 is used.
+ TestUtils.waitUntilTrue(() => {
+ var foundException = false
try {
- // Now that the transaction has expired, the second send should fail
with a ProducerFencedException.
+ // Now that the transaction has expired, the second send should fail
with a InvalidProducerEpochException. We may see some
concurrentTransactionsExceptions.
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1,
null, "2", "2", willBeCommitted = false)).get()
- fail("should have raised a ProducerFencedException since the
transaction has expired")
- } catch {
- case _: ProducerFencedException =>
- case e: ExecutionException =>
- assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
- }
- } else {
- try {
- // Now that the transaction has expired, the second send should fail
with a InvalidProducerEpochException.
-
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1,
null, "2", "2", willBeCommitted = false)).get()
- fail("should have raised a InvalidProducerEpochException since the
transaction has expired")
+ fail("should have raised an error due to concurrent transactions or
invalid producer epoch")
} catch {
+ case _: ConcurrentTransactionsException =>
case _: InvalidProducerEpochException =>
case e: ExecutionException =>
- assertTrue(e.getCause.isInstanceOf[InvalidProducerEpochException])
+ foundException =
e.getCause.isInstanceOf[InvalidProducerEpochException]
}
- }
+ foundException
+ }, "Never returned the expected InvalidProducerEpochException")
// Verify that the first message was aborted and the second one was never
written at all.
val nonTransactionalConsumer = nonTransactionalConsumers.head
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index ab5ff72cd98..e5b48d92466 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -182,6 +182,40 @@ class TransactionCoordinatorTest {
assertEquals(Errors.NONE, result.error)
}
+ @Test
+ def shouldGenerateNewProducerIdIfEpochsExhaustedV2(): Unit = {
+ initPidGenericMocks(transactionalId)
+
+ val txnMetadata1 = new TransactionMetadata(transactionalId, producerId,
producerId, RecordBatch.NO_PRODUCER_ID, (Short.MaxValue - 1).toShort,
+ (Short.MaxValue - 2).toShort, txnTimeoutMs, Ongoing, mutable.Set.empty,
time.milliseconds(), time.milliseconds(), TV_2)
+ // We start with txnMetadata1 so we can transform the metadata to
PrepareCommit.
+ val txnMetadata2 = new TransactionMetadata(transactionalId, producerId,
producerId, RecordBatch.NO_PRODUCER_ID, (Short.MaxValue - 1).toShort,
+ (Short.MaxValue - 2).toShort, txnTimeoutMs, Ongoing, mutable.Set.empty,
time.milliseconds(), time.milliseconds(), TV_2)
+ val transitMetadata = txnMetadata2.prepareAbortOrCommit(PrepareCommit,
TV_2, producerId2, time.milliseconds(), false)
+ txnMetadata2.completeTransitionTo(transitMetadata)
+
+ assertEquals(producerId, txnMetadata2.producerId)
+ assertEquals(Short.MaxValue, txnMetadata2.producerEpoch)
+
+
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+ .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata1))))
+ .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata2))))
+
+ when(transactionManager.appendTransactionToLog(
+ ArgumentMatchers.eq(transactionalId),
+ ArgumentMatchers.eq(coordinatorEpoch),
+ any[TxnTransitMetadata],
+ capturedErrorsCallback.capture(),
+ any(),
+ any()
+ )).thenAnswer(_ => capturedErrorsCallback.getValue.apply(Errors.NONE))
+
+ coordinator.handleEndTransaction(transactionalId, producerId,
(Short.MaxValue - 1).toShort, TransactionResult.COMMIT, TV_2, endTxnCallback)
+ assertEquals(producerId2, newProducerId)
+ assertEquals(0, newEpoch)
+ assertEquals(Errors.NONE, error)
+ }
+
@Test
def shouldRespondWithNotCoordinatorOnInitPidWhenNotCoordinator(): Unit = {
when(transactionManager.validateTransactionTimeoutMs(anyInt()))
@@ -519,7 +553,7 @@ class TransactionCoordinatorTest {
.thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
val nextProducerEpoch = if (isRetry) producerEpoch - 1 else producerEpoch
- coordinator.handleEndTransaction(transactionalId, producerId,
nextProducerEpoch.toShort , TransactionResult.ABORT, clientTransactionVersion,
endTxnCallback)
+ coordinator.handleEndTransaction(transactionalId, producerId,
nextProducerEpoch.toShort, TransactionResult.ABORT, clientTransactionVersion,
endTxnCallback)
if (isRetry) {
assertEquals(Errors.PRODUCER_FENCED, error)
} else {
@@ -770,6 +804,39 @@ class TransactionCoordinatorTest {
verify(transactionManager,
times(2)).getTransactionState(ArgumentMatchers.eq(transactionalId))
}
+ @Test
+ def
shouldReturnConcurrentTxnOnAddPartitionsIfEndTxnV2EpochOverflowAndNotComplete():
Unit = {
+ val prepareWithPending = new TransactionMetadata(transactionalId,
producerId, producerId,
+ producerId2, Short.MaxValue, (Short.MaxValue - 1).toShort, 1,
PrepareCommit, collection.mutable.Set.empty[TopicPartition], 0,
time.milliseconds(), TV_2)
+ val txnTransitMetadata =
prepareWithPending.prepareComplete(time.milliseconds())
+
+
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+ .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
prepareWithPending))))
+
+ // Return CONCURRENT_TRANSACTIONS while transaction is still completing
+ coordinator.handleAddPartitionsToTransaction(transactionalId, producerId2,
0, partitions, errorsCallback, TV_2)
+ assertEquals(Errors.CONCURRENT_TRANSACTIONS, error)
+
verify(transactionManager).getTransactionState(ArgumentMatchers.eq(transactionalId))
+
+ prepareWithPending.completeTransitionTo(txnTransitMetadata)
+ assertEquals(CompleteCommit, prepareWithPending.state)
+
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+ .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
prepareWithPending))))
+ when(transactionManager.appendTransactionToLog(
+ ArgumentMatchers.eq(transactionalId),
+ ArgumentMatchers.eq(coordinatorEpoch),
+ any[TxnTransitMetadata],
+ capturedErrorsCallback.capture(),
+ any(),
+ any())
+ ).thenAnswer(_ => capturedErrorsCallback.getValue.apply(Errors.NONE))
+
+ coordinator.handleAddPartitionsToTransaction(transactionalId, producerId2,
0, partitions, errorsCallback, TV_2)
+
+ assertEquals(Errors.NONE, error)
+ verify(transactionManager,
times(2)).getTransactionState(ArgumentMatchers.eq(transactionalId))
+ }
+
@ParameterizedTest
@ValueSource(shorts = Array(0, 2))
def
shouldAppendPrepareCommitToLogOnEndTxnWhenStatusIsOngoingAndResultIsCommit(transactionVersion:
Short): Unit = {