This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 7f242fee15d KAFKA-14880; TransactionMetadata with producer epoch -1
should be expirable (#13499)
7f242fee15d is described below
commit 7f242fee15d1121c581cb18bbbb29fb746d3f350
Author: David Jacot <[email protected]>
AuthorDate: Thu Apr 6 08:45:16 2023 +0200
KAFKA-14880; TransactionMetadata with producer epoch -1 should be expirable
(#13499)
We have seen the following error in logs:
```
"Mar 22, 2019 @
21:57:56.655",Error,"kafka-0-0","transaction-log-manager-0","Uncaught exception
in scheduled task
'transactionalId-expiration'","java.lang.IllegalArgumentException: Illegal new
producer epoch -1
```
Investigations showed that it is actually possible for a transaction
metadata object to still have -1 as producer epoch when it transitions to Dead.
When a TransactionMetadata object is created for the first time (in
handleInitProducerId), its producer epoch is -1. A producer epoch is then
assigned and the transaction coordinator tries to persist the change. If
the write fails, for instance because the partition is under min ISR, the
transaction metadata keeps -1 as its epoch until the InitProducerId request
is retried, or forever if it never is.
This means that a transaction metadata object can keep -1 as its producer
epoch until it is expired. Currently, this is not allowed, because
prepareTransitionTo requires the producer epoch to be greater than or equal
to 0.
Reviewers: Luke Chen <[email protected]>, Justine Olshan
<[email protected]>
---
.../transaction/TransactionMetadata.scala | 5 ++-
.../transaction/TransactionStateManagerTest.scala | 49 ++++++++++++++++++++++
2 files changed, 53 insertions(+), 1 deletion(-)
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index 0f6d4b78adf..dc208081d10 100644
---
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -382,7 +382,10 @@ private[transaction] class TransactionMetadata(val
transactionalId: String,
if (newProducerId < 0)
throw new IllegalArgumentException(s"Illegal new producer id
$newProducerId")
- if (newEpoch < 0)
+ // The epoch is initialized to NO_PRODUCER_EPOCH when the
TransactionMetadata
+ // is created for the first time and it could stay like this until
transitioning
+ // to Dead.
+ if (newState != Dead && newEpoch < 0)
throw new IllegalArgumentException(s"Illegal new producer epoch
$newEpoch")
// check that the new state transition is valid and update the pending
state if necessary
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 6a56b768c47..24ad8d6550a 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -750,6 +750,55 @@ class TransactionStateManagerTest {
assertEquals(allTransactionalIds, expiredTransactionalIds)
}
+ @Test
+ def
testTransactionExpirationShouldNotFailWithUninitializedTransactionMetadata():
Unit = {
+ val partitionIds = 0 until numPartitions
+ val maxBatchSize = 512
+ val transactionalId = "id"
+ val allTransactionalIds = Set(transactionalId)
+
+ loadTransactionsForPartitions(partitionIds)
+
+ // When TransactionMetadata is intialized for the first time, it has the
following
+ // shape. Then, the producer id and producer epoch are initialized and we
try to
+ // write the change. If the write fails (e.g. under min isr), the
TransactionMetadata
+ // is left at it is. If the transactional id is never reused, the
TransactionMetadata
+ // will be expired and it should succeed.
+ val txnMetadata = TransactionMetadata(
+ transactionalId = transactionalId,
+ producerId = 1,
+ producerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+ txnTimeoutMs = transactionTimeoutMs,
+ state = Empty,
+ timestamp = time.milliseconds()
+ )
+ transactionManager.putTransactionStateIfNotExists(txnMetadata)
+
+ time.sleep(txnConfig.transactionalIdExpirationMs + 1)
+
+ reset(replicaManager)
+ expectLogConfig(partitionIds, maxBatchSize)
+
+ val appendedRecords = mutable.Map.empty[TopicPartition,
mutable.Buffer[MemoryRecords]]
+ expectTransactionalIdExpiration(Errors.NONE, appendedRecords)
+
+ transactionManager.removeExpiredTransactionalIds()
+ verify(replicaManager, atLeastOnce()).appendRecords(
+ anyLong(),
+ ArgumentMatchers.eq((-1).toShort),
+ ArgumentMatchers.eq(true),
+ ArgumentMatchers.eq(AppendOrigin.Coordinator),
+ any(),
+ any(),
+ any[Option[ReentrantLock]],
+ any(),
+ any()
+ )
+
+ val expiredTransactionalIds =
collectTransactionalIdsFromTombstones(appendedRecords)
+ assertEquals(allTransactionalIds, expiredTransactionalIds)
+ }
+
private def collectTransactionalIdsFromTombstones(
appendedRecords: mutable.Map[TopicPartition, mutable.Buffer[MemoryRecords]]
): Set[String] = {