sanghyeok An created KAFKA-20357:
------------------------------------

             Summary: lastProducerEpoch fields never persisted to transaction log
                 Key: KAFKA-20357
                 URL: https://issues.apache.org/jira/browse/KAFKA-20357
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 4.0.0
            Reporter: sanghyeok An
            Assignee: sanghyeok An


This issue is a follow-up to KAFKA-20310.
 
During epoch rotation, if the coordinator fails after writing the updated transaction metadata but before the client receives the InitProducerId response, the client's retry may be incorrectly rejected with PRODUCER_FENCED.
This appears to be caused not only by previousProducerId not being persisted to the transaction log, but also by lastProducerEpoch not being restored correctly after failover. As a result, the retry is first accepted by TransactionCoordinator.isValidProducerId, but then fails in TransactionMetadata.prepareIncrementProducerEpoch, because the recovered metadata no longer contains the previous epoch that is needed to recognize the request as a retry.
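To make the mismatch concrete, here is a minimal Java sketch of the two checks, assuming simplified logic: the retry passes the producer-ID check but is rejected by the epoch check once lastProducerEpoch is lost. `RetryCheckSketch`, `TxnState` and `Outcome` are hypothetical names. The branch order in `prepareIncrementProducerEpoch` (bump on the current epoch, treat a match on lastProducerEpoch as a retry, otherwise fence) and the `Short.MAX_VALUE - 1` exhaustion threshold are assumptions modelled on the code linked in the scenario, not a copy of it.

```java
/**
 * Hypothetical, simplified model of the two checks involved in the retry.
 * Names mirror TransactionCoordinator/TransactionMetadata, but this is an
 * illustrative sketch, not Kafka's implementation.
 */
final class RetryCheckSketch {
    static final short NO_PRODUCER_EPOCH = -1;

    /** Coordinator state relevant to the retry (simplified). */
    record TxnState(long producerId, long prevProducerId,
                    short producerEpoch, short lastProducerEpoch) {}

    enum Outcome { BUMP_EPOCH, RETURN_CURRENT_EPOCH, PRODUCER_FENCED }

    /** Assumption: an epoch counts as exhausted at Short.MAX_VALUE - 1 or above. */
    static boolean isEpochExhausted(short epoch) {
        return epoch >= Short.MAX_VALUE - 1;
    }

    /** Simplified stand-in for TransactionCoordinator.isValidProducerId. */
    static boolean isValidProducerId(TxnState s, long clientProducerId, short clientEpoch) {
        return clientProducerId == s.producerId()
            || (clientProducerId == s.prevProducerId() && isEpochExhausted(clientEpoch));
    }

    /** Simplified stand-in for the expected-epoch checks in prepareIncrementProducerEpoch. */
    static Outcome prepareIncrementProducerEpoch(TxnState s, short expectedEpoch) {
        if (s.producerEpoch() == NO_PRODUCER_EPOCH || expectedEpoch == s.producerEpoch()) {
            return Outcome.BUMP_EPOCH;            // caller is at the current epoch
        } else if (expectedEpoch == s.lastProducerEpoch()) {
            return Outcome.RETURN_CURRENT_EPOCH;  // retry of an already-applied bump/rotation
        } else {
            return Outcome.PRODUCER_FENCED;       // epoch not recognized: fence the caller
        }
    }

    public static void main(String[] args) {
        // State recovered after failover in the scenario: lastProducerEpoch was lost (-1).
        TxnState recovered = new TxnState(101, 100, (short) 0, NO_PRODUCER_EPOCH);
        short clientEpoch = 32766;
        System.out.println(isValidProducerId(recovered, 100, clientEpoch));
        System.out.println(prepareIncrementProducerEpoch(recovered, clientEpoch));
    }
}
```

Running `main` prints `true` followed by `PRODUCER_FENCED`, matching the accept-then-fence sequence described above. With lastProducerEpoch = 32766 in the recovered state, the same request would instead take the `RETURN_CURRENT_EPOCH` branch.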

 

Scenario 2: Client retry after coordinator failover during epoch rotation
 # The client has producerId = 100 and producerEpoch = 32766, so the epoch is exhausted and the producer ID must be rotated.
 # The client sends InitProducerId, and the coordinator performs epoch rotation in memory:
 ** prevProducerId = 100
 ** producerId = 101
 ** producerEpoch = 0
 ** lastProducerEpoch = 32766
 # The updated transaction metadata is written to the log, but previousProducerId is not persisted by TransactionLog.valueToBytes().
 ** Proposed related fix area:
[https://github.com/apache/kafka/blob/5729bb614fbd3397a7894d3831fb46ddc555a27b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala]
 # The coordinator fails before the client receives the InitProducerId response.
 # After failover and recovery, the coordinator reconstructs in-memory state as:
 ** producerId = 101
 ** producerEpoch = 0
 ** prevProducerId = 100
 ** lastProducerEpoch = -1
 # The client retries InitProducerId with expectedProducerIdAndEpoch = (100, 32766).
 # In TransactionCoordinator.isValidProducerId, the retry is accepted because:
 ** producerIdAndEpoch.producerId == txnMetadata.prevProducerId
 ** the client epoch is exhausted
 ** Relevant code:
[https://github.com/apache/kafka/blob/5729bb614fbd3397a7894d3831fb46ddc555a27b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L240-L243]
 # The coordinator then proceeds into the InitProducerId handling path and calls txnMetadata.prepareIncrementProducerEpoch(...), expecting to continue the retry flow.
 ** Relevant code:
[https://github.com/apache/kafka/blob/5729bb614fbd3397a7894d3831fb46ddc555a27b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L262-L268]
 # However, this path does not treat the request as a valid retry of the previous epoch rotation, because lastProducerEpoch was not restored correctly after failover.
 # Inside TransactionMetadata.prepareIncrementProducerEpoch(...), the retry validation eventually fails.
 ** Relevant code:
[https://github.com/apache/kafka/blob/5729bb614fbd3397a7894d3831fb46ddc555a27b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java#L163]
 # More specifically:
 ** the exhausted-epoch retry path is not taken because the current in-memory producerEpoch is 0, while the original exhausted epoch was 32766
 *** Relevant code:
[https://github.com/apache/kafka/blob/5729bb614fbd3397a7894d3831fb46ddc555a27b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java#L168]
 ** the request is not recognized as a retry of the previous rotation because lastProducerEpoch is -1 instead of 32766
 *** Relevant code:
[https://github.com/apache/kafka/blob/5729bb614fbd3397a7894d3831fb46ddc555a27b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java#L174]
 ** the method therefore throws PRODUCER_FENCED
 *** Relevant code:
[https://github.com/apache/kafka/blob/5729bb614fbd3397a7894d3831fb46ddc555a27b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java#L182-L186]
 # As a result, the coordinator returns PRODUCER_FENCED even though this is a legitimate retry after failover during epoch rotation.
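The failover step can be sketched as a serialization round trip. This is a hypothetical Java model: the byte layout and the names `FailoverRoundTripSketch`, `valueToBytes`, `readTxnState` and `wouldBeFenced` are invented for illustration and are not Kafka's TransactionLogValue schema or TransactionLog API. Following the recovered state in step 5, it assumes prevProducerId survives the failover, and it isolates what happens to lastProducerEpoch when the writer omits it versus when it is written.

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical round trip of coordinator state through a log value.
 * The byte layout is invented for illustration; it is NOT Kafka's schema.
 */
final class FailoverRoundTripSketch {
    static final short NO_PRODUCER_EPOCH = -1;

    record TxnState(long producerId, long prevProducerId,
                    short producerEpoch, short lastProducerEpoch) {}

    /** Hypothetical writer; includeLastEpoch = false models the behaviour reported here. */
    static ByteBuffer valueToBytes(TxnState s, boolean includeLastEpoch) {
        ByteBuffer buf = ByteBuffer.allocate(8 + 8 + 2 + 2);
        buf.putLong(s.producerId()).putLong(s.prevProducerId()).putShort(s.producerEpoch());
        if (includeLastEpoch) {
            buf.putShort(s.lastProducerEpoch());
        }
        buf.flip(); // limit = bytes actually written
        return buf;
    }

    /** Hypothetical reader: a missing lastProducerEpoch defaults to NO_PRODUCER_EPOCH (-1). */
    static TxnState readTxnState(ByteBuffer buf) {
        long producerId = buf.getLong();
        long prevProducerId = buf.getLong();
        short producerEpoch = buf.getShort();
        short lastEpoch = buf.remaining() >= 2 ? buf.getShort() : NO_PRODUCER_EPOCH;
        return new TxnState(producerId, prevProducerId, producerEpoch, lastEpoch);
    }

    /** Simplified epoch check: fenced unless the epoch is current or is the one just replaced. */
    static boolean wouldBeFenced(TxnState s, short expectedEpoch) {
        boolean currentEpoch = s.producerEpoch() == NO_PRODUCER_EPOCH || expectedEpoch == s.producerEpoch();
        boolean retryOfLastBump = expectedEpoch == s.lastProducerEpoch();
        return !currentEpoch && !retryOfLastBump;
    }

    public static void main(String[] args) {
        // In-memory state right after the epoch rotation in step 2 of the scenario.
        TxnState rotated = new TxnState(101, 100, (short) 0, (short) 32766);
        short clientEpoch = 32766;

        TxnState lossy = readTxnState(valueToBytes(rotated, false));
        TxnState full = readTxnState(valueToBytes(rotated, true));

        System.out.println(lossy.lastProducerEpoch() + " " + wouldBeFenced(lossy, clientEpoch));
        System.out.println(full.lastProducerEpoch() + " " + wouldBeFenced(full, clientEpoch));
    }
}
```

`main` prints `-1 true` for the lossy value and `32766 false` when lastProducerEpoch is written. Defaulting a missing field to -1 on read is what makes the loss silent: the record still parses, but the information needed to recognize the retry is gone.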



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
