sanghyeok An created KAFKA-20357:
------------------------------------
Summary: lastProducerEpoch fields never persisted to transaction log
Key: KAFKA-20357
URL: https://issues.apache.org/jira/browse/KAFKA-20357
Project: Kafka
Issue Type: Bug
Affects Versions: 4.0.0
Reporter: sanghyeok An
Assignee: sanghyeok An
This issue is a follow-up to KAFKA-20310.
During epoch rotation, if the coordinator fails after writing the updated
transaction metadata but before the client receives the InitProducerId
response, a retry from the client may be incorrectly rejected with
PRODUCER_FENCED.
This appears to be caused not only by previousProducerId not being persisted in
the transaction log, but also by lastProducerEpoch not being restored correctly
after failover. As a result, the retry is initially accepted by
TransactionCoordinator.isValidProducerId, but later fails in
TransactionMetadata.prepareIncrementProducerEpoch, because the recovered
metadata no longer holds the previous epoch (lastProducerEpoch) needed to
recognize the request as a retry.
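To make the persistence gap concrete, here is a toy Python model of the write/recover round trip. It is not Kafka code: TxnMetadata, write_record and load_record are hypothetical names, and the model assumes the log record carries producerId and producerEpoch, optionally previousProducerId (the KAFKA-20310 gap), and never lastProducerEpoch, which is therefore rebuilt as -1 on recovery.

```python
from dataclasses import dataclass

NO_PRODUCER_ID = -1
NO_PRODUCER_EPOCH = -1  # "unknown epoch" sentinel, as seen in the recovered state


@dataclass
class TxnMetadata:
    """Hypothetical, simplified stand-in for the coordinator's in-memory metadata."""
    producer_id: int
    prev_producer_id: int
    producer_epoch: int
    last_producer_epoch: int


def write_record(md: TxnMetadata, persist_prev_producer_id: bool) -> dict:
    """Model of the fields that reach the transaction log (assumption, not valueToBytes)."""
    record = {"producer_id": md.producer_id, "producer_epoch": md.producer_epoch}
    if persist_prev_producer_id:  # True models a log that already persists previousProducerId
        record["prev_producer_id"] = md.prev_producer_id
    # In this model lastProducerEpoch has no slot in the record and is dropped.
    return record


def load_record(record: dict) -> TxnMetadata:
    """Model of rebuilding in-memory state from the log after failover."""
    return TxnMetadata(
        producer_id=record["producer_id"],
        prev_producer_id=record.get("prev_producer_id", NO_PRODUCER_ID),
        producer_epoch=record["producer_epoch"],
        last_producer_epoch=NO_PRODUCER_EPOCH,  # nothing in the record to restore it from
    )


# In-memory state right after the epoch rotation (producerId 100 -> 101).
rotated = TxnMetadata(producer_id=101, prev_producer_id=100,
                      producer_epoch=0, last_producer_epoch=32766)

for persist_prev in (False, True):
    recovered = load_record(write_record(rotated, persist_prev))
    print(f"persist previousProducerId={persist_prev}: {recovered}")
```

In this model, persisting previousProducerId alone restores prev_producer_id = 100, but last_producer_epoch still comes back as -1 in both cases.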
Scenario 2: Client retry after coordinator failover during epoch rotation
# The client has producerId = 100 and producerEpoch = 32766, so its epoch is
exhausted and the next InitProducerId must rotate to a new producerId.
# The client sends InitProducerId, and the coordinator performs epoch rotation
in memory:
** prevProducerId = 100
** producerId = 101
** producerEpoch = 0
** lastProducerEpoch = 32766
# The updated transaction metadata is written to the log, but
previousProducerId is not persisted in TransactionLog.valueToBytes().
** Proposed related fix area:
[https://github.com/apache/kafka/blob/5729bb614fbd3397a7894d3831fb46ddc555a27b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala]
# The coordinator fails before the client receives the InitProducerId response.
# After failover and recovery, the coordinator reconstructs in-memory state as:
** producerId = 101
** producerEpoch = 0
** prevProducerId = 100
** lastProducerEpoch = -1
# The client retries InitProducerId with expectedProducerIdAndEpoch = (100, 32766).
# In TransactionCoordinator.isValidProducerId, the retry is accepted because:
** producerIdAndEpoch.producerId == txnMetadata.prevProducerId
** the client epoch is exhausted
Relevant code:
[https://github.com/apache/kafka/blob/5729bb614fbd3397a7894d3831fb46ddc555a27b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L240-L243]
# The coordinator then proceeds into the InitProducerId handling path and
calls txnMetadata.prepareIncrementProducerEpoch(...), expecting to continue the
retry flow.
** Relevant code:
[https://github.com/apache/kafka/blob/5729bb614fbd3397a7894d3831fb46ddc555a27b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L262-L268]
# However, this path does not treat the request as a valid retry of the
previous epoch rotation, because lastProducerEpoch was not restored correctly
after failover.
# Inside TransactionMetadata.prepareIncrementProducerEpoch(...), the retry
validation eventually fails.
** Relevant code:
[https://github.com/apache/kafka/blob/5729bb614fbd3397a7894d3831fb46ddc555a27b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java#L163]
# More specifically:
** the exhausted-epoch retry path is not taken because the current in-memory
producerEpoch is 0, while the original exhausted epoch was 32766
Relevant code:
[https://github.com/apache/kafka/blob/5729bb614fbd3397a7894d3831fb46ddc555a27b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java#L168]
** the retry is not recognized as a duplicate/retry of the previous rotation
because lastProducerEpoch is -1 instead of 32766
Relevant code:
[https://github.com/apache/kafka/blob/5729bb614fbd3397a7894d3831fb46ddc555a27b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java#L174]
** the method eventually throws PRODUCER_FENCED
Relevant code:
[https://github.com/apache/kafka/blob/5729bb614fbd3397a7894d3831fb46ddc555a27b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java#L182-L186]
# As a result, the coordinator returns PRODUCER_FENCED even though this is a
legitimate retry after failover during epoch rotation.
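The retry decision in this scenario can be sketched as a small Python model. It is a simplified reading of the checks described in the steps above, not the actual TransactionCoordinator or TransactionMetadata code: all function names are hypothetical, the NO_PRODUCER_EPOCH clause of isValidProducerId is omitted, and treating epochs >= 32766 (Short.MAX_VALUE - 1) as exhausted is an assumption that matches the scenario.

```python
from dataclasses import dataclass

NO_PRODUCER_EPOCH = -1
SHORT_MAX = 32767  # Java Short.MAX_VALUE


@dataclass
class TxnMetadata:
    """Hypothetical, simplified stand-in for the coordinator's in-memory metadata."""
    producer_id: int
    prev_producer_id: int
    producer_epoch: int
    last_producer_epoch: int


def is_epoch_exhausted(epoch: int) -> bool:
    # Assumption: epochs >= Short.MAX_VALUE - 1 count as exhausted (32766 in the scenario).
    return epoch >= SHORT_MAX - 1


def is_valid_producer_id(md: TxnMetadata, pid: int, epoch: int) -> bool:
    # Accept the current producerId, or the previous one if the client's epoch is exhausted.
    return pid == md.producer_id or (
        pid == md.prev_producer_id and is_epoch_exhausted(epoch))


def check_epoch(md: TxnMetadata, expected_epoch: int) -> str:
    # Simplified retry checks, in the order the steps above describe them.
    if expected_epoch == md.producer_epoch:
        return "BUMP_EPOCH"       # not taken in this scenario: current epoch 0, client sent 32766
    if expected_epoch == md.last_producer_epoch:
        return "RETRY_OK"         # recognized as a retry of the previous rotation
    return "PRODUCER_FENCED"


def handle_init_producer_id_retry(md: TxnMetadata, pid: int, epoch: int) -> str:
    if not is_valid_producer_id(md, pid, epoch):
        return "PRODUCER_FENCED"
    return check_epoch(md, epoch)


before_failover = TxnMetadata(101, 100, 0, 32766)
after_failover = TxnMetadata(101, 100, 0, NO_PRODUCER_EPOCH)

print(handle_init_producer_id_retry(before_failover, 100, 32766))  # RETRY_OK
print(handle_init_producer_id_retry(after_failover, 100, 32766))   # PRODUCER_FENCED
```

With the pre-failover state (lastProducerEpoch = 32766) the retry is recognized; with the recovered state (lastProducerEpoch = -1) the same request passes the producerId check but falls through to PRODUCER_FENCED.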
--
This message was sent by Atlassian Jira
(v8.20.10#820010)