This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 27102b3187e KAFKA-20090 Add recovery logic to handle MaxValue epoch; fix
max epoch (#21469)
27102b3187e is described below
commit 27102b3187e183f35d9e2c22bdf92f87d8457131
Author: Artem Livshits <[email protected]>
AuthorDate: Thu Mar 5 04:24:23 2026 -0800
KAFKA-20090 Add recovery logic to handle MaxValue epoch; fix max epoch
(#21469)
Changes:
- TransactionMetadata: Log errors instead of throwing when epoch hits
MAX_VALUE
- ProducerAppendInfo: Allow marker writes at MAX_VALUE epoch for
recovery
- TransactionsTest: Add comprehensive test case
- TransactionCoordinator: Add test accessor for transaction manager
Add testRecoveryFromEpochOverflow to verify that the system correctly
handles the scenario when producer epoch reaches Short.MaxValue (32767).
The test validates:
- Epoch can reach Short.MaxValue through transaction timeouts
- When epoch overflow is detected, errors are logged but processing
continues
- Transaction markers at MAX_VALUE epoch are accepted to allow recovery
- Producer ID rotation occurs after overflow is detected
- New transactions can proceed with rotated producer ID
Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
Reviewers: Justine Olshan <[email protected]>, chickenchickenlove
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../transaction/TransactionCoordinator.scala | 3 +
.../integration/kafka/api/TransactionsTest.scala | 117 +++++++++++++++++++++
.../transaction/TransactionMetadataTest.scala | 9 +-
.../storage/internals/log/ProducerAppendInfo.java | 4 +-
.../transaction/TransactionMetadata.java | 15 ++-
5 files changed, 142 insertions(+), 6 deletions(-)
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 83ae523b7d5..82a2bd7706b 100644
---
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -1006,6 +1006,9 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
def partitionFor(transactionalId: String): Int =
txnManager.partitionFor(transactionalId)
+ // Package-private for testing
+ private[kafka] def transactionManager: TransactionStateManager = txnManager
+
private def onEndTransactionComplete(txnIdAndPidEpoch:
TransactionalIdAndProducerIdEpoch)(error: Errors, newProducerId: Long,
newProducerEpoch: Short): Unit = {
error match {
case Errors.NONE =>
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 023e9f15b00..25b274cf3fe 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -19,6 +19,7 @@ package kafka.api
import kafka.utils.TestUtils.{consumeRecords, waitUntilTrue}
import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.admin.TransactionState
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.{KafkaException, TopicPartition}
@@ -695,6 +696,122 @@ class TransactionsTest extends IntegrationTestHarness {
assertThrows(classOf[IllegalStateException], () =>
producer.initTransactions())
}
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+
@MethodSource(Array("getTestGroupProtocolParametersConsumerGroupProtocolOnly"))
+ def testRecoveryFromEpochOverflow(groupProtocol: String): Unit = {
+ // We could encounter a bug (see
https://issues.apache.org/jira/browse/KAFKA-20090)
+ // that only reproduces when epoch gets to Short.MaxValue - 1 and
transaction is
+ // aborted on timeout.
+ val transactionalId = "test-overflow"
+ var producer = createTransactionalProducer(transactionalId,
transactionTimeoutMs = 500)
+ val abortedRecord = new ProducerRecord[Array[Byte], Array[Byte]](topic1,
0, "key".getBytes, "aborted".getBytes)
+
+ // Create a transaction, produce one record, and abort
+ producer.initTransactions()
+ producer.beginTransaction()
+ producer.send(abortedRecord)
+ producer.abortTransaction()
+ producer.close()
+
+ // Find the transaction coordinator partition for this transactional ID
+ val adminClient = createAdminClient()
+ try {
+ val txnDescription =
adminClient.describeTransactions(java.util.List.of(transactionalId))
+ .description(transactionalId).get()
+ val coordinatorId = txnDescription.coordinatorId()
+
+ // Access the transaction coordinator and update the epoch to
Short.MaxValue - 2
+ val coordinatorBroker = brokers.find(_.config.brokerId ==
coordinatorId).get
+ val txnCoordinator =
coordinatorBroker.asInstanceOf[kafka.server.BrokerServer].transactionCoordinator
+
+ // Get the transaction metadata and update the epoch close to
Short.MaxValue
+ // to trigger the overflow scenario. We'll set it high enough that
subsequent
+ // operations will cause it to reach Short.MaxValue - 1 before the
timeout.
+
txnCoordinator.transactionManager.getTransactionState(transactionalId).foreach
{ txnMetadataOpt =>
+ txnMetadataOpt.foreach { epochAndMetadata =>
+ epochAndMetadata.transactionMetadata.inLock(() => {
+
epochAndMetadata.transactionMetadata.setProducerEpoch((Short.MaxValue -
2).toShort)
+ null // inLock expects a Supplier that returns a value
+ })
+ }
+ }
+ } finally {
+ adminClient.close()
+ }
+
+ // Re-initialize the producer which will bump epoch
+ producer = createTransactionalProducer(transactionalId,
transactionTimeoutMs = 500)
+ producer.initTransactions()
+
+ // Start a transaction
+ producer.beginTransaction()
+ // Produce one record and wait for it to complete
+ producer.send(abortedRecord).get()
+ producer.flush()
+
+ // Check and assert that epoch of the transaction is Short.MaxValue - 1
(before timeout)
+ val adminClient2 = createAdminClient()
+ try {
+ val coordinatorId2 =
adminClient2.describeTransactions(java.util.List.of(transactionalId))
+ .description(transactionalId).get().coordinatorId()
+ val coordinatorBroker2 = brokers.find(_.config.brokerId ==
coordinatorId2).get
+ val txnCoordinator2 =
coordinatorBroker2.asInstanceOf[kafka.server.BrokerServer].transactionCoordinator
+
+
txnCoordinator2.transactionManager.getTransactionState(transactionalId).foreach
{ txnMetadataOpt =>
+ txnMetadataOpt.foreach { epochAndMetadata =>
+ val currentEpoch =
epochAndMetadata.transactionMetadata.producerEpoch()
+ assertEquals((Short.MaxValue - 1).toShort, currentEpoch,
+ s"Expected epoch to be ${Short.MaxValue - 1}, but got
$currentEpoch")
+ }
+ }
+
+ // Wait until state is complete abort
+ waitUntilTrue(() => {
+ val listResult = adminClient2.listTransactions()
+ val txns = listResult.all().get().asScala
+ txns.exists(txn =>
+ txn.transactionalId() == transactionalId &&
+ txn.state() == TransactionState.COMPLETE_ABORT
+ )
+ }, "Transaction was not aborted on timeout")
+ } finally {
+ adminClient2.close()
+ }
+
+ // Abort, this should be treated as retry of the abort caused by timeout
+ producer.abortTransaction()
+
+ // Start a transaction, it would use the state from abort
+ producer.beginTransaction()
+ // Produce one record and wait for it to complete
+ producer.send(abortedRecord).get()
+ producer.flush()
+
+ // Now init new producer and commit a transaction with a distinct value
+ val producer2 = createTransactionalProducer(transactionalId,
transactionTimeoutMs = 500)
+ producer2.initTransactions()
+ producer2.beginTransaction()
+ val committedRecord = new ProducerRecord[Array[Byte], Array[Byte]](topic1,
0, "key".getBytes, "committed".getBytes)
+ producer2.send(committedRecord).get()
+ producer2.commitTransaction()
+
+ // Verify that exactly one record is visible in read-committed mode
+ val consumer = createReadCommittedConsumer("test-consumer-group")
+ try {
+ val tp = new TopicPartition(topic1, 0)
+ consumer.assign(java.util.Set.of(tp))
+ val records = consumeRecords(consumer, 1)
+
+ val record = records.head
+ assertArrayEquals("key".getBytes, record.key, "Record key should match")
+ assertArrayEquals("committed".getBytes, record.value, "Record value
should be 'committed'")
+ assertEquals(0, record.partition, "Record should be in partition 0")
+ assertEquals(topic1, record.topic, "Record should be in topic1")
+ } finally {
+ consumer.close()
+ }
+ }
+
@ParameterizedTest
@CsvSource(Array(
"classic,false",
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala
index b28e91f75c9..cab8d3e90de 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMetadataTest.scala
@@ -497,7 +497,14 @@ class TransactionMetadataTest {
time.milliseconds(),
TV_0)
assertTrue(txnMetadata.isProducerEpochExhausted)
- assertThrows(classOf[IllegalStateException], () =>
txnMetadata.prepareFenceProducerEpoch())
+
+ // When epoch is at max, prepareFenceProducerEpoch logs an error but
doesn't throw
+ // This allows graceful recovery through producer ID rotation
+ val preparedMetadata = txnMetadata.prepareFenceProducerEpoch()
+
+ // Epoch should remain at Short.MaxValue (not overflow to negative)
+ assertEquals(Short.MaxValue, preparedMetadata.producerEpoch)
+ assertEquals(TransactionState.PREPARE_EPOCH_FENCE,
preparedMetadata.txnState)
}
@Test
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java
index 2a5d408e2e3..3edec7028d5 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java
@@ -127,9 +127,11 @@ public class ProducerAppendInfo {
// In both cases, the transaction has already ended
(currentTxnFirstOffset is empty).
// We suppress the InvalidProducerEpochException and allow the
duplicate marker to
// be written to the log.
+ // In some buggy scenarios we may start transaction with
MAX_VALUE. We allow
+ // code to gracefully recover from that.
if (transactionVersion >= 2 &&
producerEpoch == current &&
- updatedEntry.currentTxnFirstOffset().isEmpty()) {
+ (updatedEntry.currentTxnFirstOffset().isEmpty() ||
producerEpoch == Short.MAX_VALUE)) {
log.info("Idempotent transaction marker retry detected for
producer {} epoch {}. " +
"Transaction already completed, allowing
duplicate marker write.",
producerId, producerEpoch);
diff --git
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java
index 1940a8a90b8..00e4c636b72 100644
---
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java
+++
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java
@@ -139,11 +139,12 @@ public class TransactionMetadata {
public TxnTransitMetadata prepareFenceProducerEpoch() {
if (producerEpoch == Short.MAX_VALUE)
- throw new IllegalStateException("Cannot fence producer with epoch
equal to Short.MaxValue since this would overflow");
+ LOGGER.error("Fencing producer {} {} with epoch equal to
Short.MaxValue, this must not happen unless there is a bug", transactionalId,
producerId);
// If we've already failed to fence an epoch (because the write to the
log failed), we don't increase it again.
// This is safe because we never return the epoch to client if we fail
to fence the epoch
- short bumpedEpoch = hasFailedEpochFence ? producerEpoch : (short)
(producerEpoch + 1);
+ // Also don't increase if producerEpoch is already at max, to avoid
overflow.
+ short bumpedEpoch = hasFailedEpochFence || producerEpoch ==
Short.MAX_VALUE ? producerEpoch : (short) (producerEpoch + 1);
TransitionData data = new
TransitionData(TransactionState.PREPARE_EPOCH_FENCE);
data.producerEpoch = bumpedEpoch;
@@ -238,8 +239,14 @@ public class TransactionMetadata {
boolean noPartitionAdded) {
TransitionData data = new TransitionData(newState);
if (clientTransactionVersion.supportsEpochBump()) {
- // We already ensured that we do not overflow here. MAX_SHORT is
the highest possible value.
- data.producerEpoch = (short) (producerEpoch + 1);
+ if (producerEpoch == Short.MAX_VALUE && newState ==
TransactionState.PREPARE_ABORT) {
+ // If we're already in a broken state, we let the abort go
through without
+ // epoch overflow, so that we can recover and continue.
+ LOGGER.error("Aborting producer {} {} with epoch equal to
Short.MaxValue, this must not happen unless there is a bug", transactionalId,
producerId);
+ } else {
+ // We already ensured that we do not overflow here. MAX_SHORT
is the highest possible value.
+ data.producerEpoch = (short) (producerEpoch + 1);
+ }
data.lastProducerEpoch = producerEpoch;
} else {
data.producerEpoch = producerEpoch;