[
https://issues.apache.org/jira/browse/KAFKA-20310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Justine Olshan resolved KAFKA-20310.
------------------------------------
Resolution: Fixed
> nextProducerId and prevProducerId fields never persisted to transaction log
> ---------------------------------------------------------------------------
>
> Key: KAFKA-20310
> URL: https://issues.apache.org/jira/browse/KAFKA-20310
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 4.0.0
> Reporter: Artem Livshits
> Priority: Critical
>
> The valueToBytes() method in
> [TransactionLog.java|https://github.com/apache/kafka/blob/7d53410d8febf4e39a65cba75951a941fe83c337/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java#L87]
> never writes nextProducerId and prevProducerId.
> *Scenario 1: Coordinator failover during PREPARE_COMMIT/PREPARE_ABORT with
> epoch exhaustion*
> 1. Transaction in PREPARE_COMMIT state with epoch exhaustion:
> - producerId = 100, producerEpoch = 32767 (Short.MAX_VALUE)
> - nextProducerId = 101, nextProducerEpoch = 0
> 2. Metadata written to log, but nextProducerId/nextProducerEpoch are not
> persisted
> 3. Coordinator leadership changes
> 4. On recovery, transaction log read gives:
> - producerId = 100, producerEpoch = 32767
> - nextProducerId = -1 (NO_PRODUCER_ID)
> - nextProducerEpoch = -1 (NO_PRODUCER_EPOCH)
> 5. Transaction markers complete, prepareComplete() is called:
> Lines 331-333 in TransactionMetadata.java:
> {code:java}
> if (clientTransactionVersion.supportsEpochBump() && nextProducerId != RecordBatch.NO_PRODUCER_ID) {
>     data.producerId = nextProducerId; // would rotate to 101
>     data.producerEpoch = hasNextProducerEpoch() ? nextProducerEpoch : 0;
> } else {
>     data.producerId = producerId; // stuck at 100
>     data.producerEpoch = clientProducerEpoch(); // stuck at 32767
> }
> {code}
> 6. Because nextProducerId is -1, the else branch executes
> 7. Producer stays at (100, 32767) with its epoch exhausted
> 8. Next InitProducerId fails: "Cannot allocate any more producer epochs"
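>
> The stuck state in steps 4-7 can be reproduced with a small standalone
> sketch (hypothetical names, not the real Kafka classes; a minimal model of
> the prepareComplete() branch above, assuming NO_PRODUCER_ID = -1 and
> NO_PRODUCER_EPOCH = -1):
> {code:java}
> public class PrepareCompleteSketch {
>     static final long NO_PRODUCER_ID = -1L;
>     static final short NO_PRODUCER_EPOCH = -1;
>
>     // Mirrors the branch shape quoted above: rotate to nextProducerId only
>     // when one was actually recovered from the log; otherwise keep the
>     // current (possibly exhausted) pair.
>     static long[] completedProducerIdAndEpoch(long producerId, short producerEpoch,
>                                               long nextProducerId, short nextProducerEpoch,
>                                               boolean supportsEpochBump) {
>         if (supportsEpochBump && nextProducerId != NO_PRODUCER_ID) {
>             short epoch = nextProducerEpoch != NO_PRODUCER_EPOCH ? nextProducerEpoch : 0;
>             return new long[] {nextProducerId, epoch};
>         }
>         return new long[] {producerId, producerEpoch};
>     }
>
>     public static void main(String[] args) {
>         // In-memory state before failover still knows the rotation target (101, 0).
>         long[] healthy = completedProducerIdAndEpoch(100L, Short.MAX_VALUE, 101L, (short) 0, true);
>         // After recovery the unpersisted fields read back as -1, so the else
>         // branch keeps the exhausted pair (100, 32767).
>         long[] recovered = completedProducerIdAndEpoch(100L, Short.MAX_VALUE,
>                 NO_PRODUCER_ID, NO_PRODUCER_EPOCH, true);
>         System.out.println(healthy[0] + "," + healthy[1]);     // 101,0
>         System.out.println(recovered[0] + "," + recovered[1]); // 100,32767
>     }
> }
> {code}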
> *Scenario 2: Client retry after coordinator failover during epoch rotation*
> 1. Client has producer (100, 32766), epoch exhaustion imminent
> 2. InitProducerId triggers epoch rotation:
> - prevProducerId = 100
> - producerId = 101, producerEpoch = 0
> 3. Metadata written to log, but prevProducerId is NOT persisted
> 4. Coordinator fails before client receives response
> 5. On recovery:
> - prevProducerId = -1 (NO_PRODUCER_ID)
> - producerId = 101, producerEpoch = 0
> 6. Client retries InitProducerId with expectedProducerIdAndEpoch = (100, 32766)
> 7. Validation in TransactionCoordinator.scala, lines 243-245:
> {code:java}
> (producerIdAndEpoch.producerId == txnMetadata.prevProducerId && // 100 == -1?
>  TransactionMetadata.isEpochExhausted(producerIdAndEpoch.epoch))
> {code}
> 8. Check fails, returns PRODUCER_FENCED
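>
> The fencing in steps 5-8 can be modeled the same way (hypothetical
> standalone sketch, not the real Kafka code; it assumes an epoch counts as
> "exhausted" from Short.MAX_VALUE - 1 upward, matching the scenario's use of
> 32766):
> {code:java}
> public class RetryValidationSketch {
>     static final long NO_PRODUCER_ID = -1L;
>
>     // Assumption: an epoch is exhausted once it reaches Short.MAX_VALUE - 1.
>     static boolean isEpochExhausted(short epoch) {
>         return epoch >= Short.MAX_VALUE - 1;
>     }
>
>     // Models the check quoted above: accept the retry only if the request's
>     // producer id matches the persisted prevProducerId at an exhausted epoch.
>     static boolean retryAccepted(long requestProducerId, short requestEpoch, long prevProducerId) {
>         return requestProducerId == prevProducerId && isEpochExhausted(requestEpoch);
>     }
>
>     public static void main(String[] args) {
>         short epoch = (short) 32766;
>         // prevProducerId persisted: the retry (100, 32766) is recognized.
>         System.out.println(retryAccepted(100L, epoch, 100L));           // true
>         // prevProducerId lost to -1 after failover: the same retry is
>         // rejected, and the client sees PRODUCER_FENCED.
>         System.out.println(retryAccepted(100L, epoch, NO_PRODUCER_ID)); // false
>     }
> }
> {code}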
> *The fix* is to add the following to the valueToBytes() method in
> [TransactionLog.java|https://github.com/apache/kafka/blob/7d53410d8febf4e39a65cba75951a941fe83c337/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java#L87]:
> {code:java}
> if (logVersion >= 1) {
>     value.setPreviousProducerId(txnMetadata.prevProducerId());
>     value.setNextProducerId(txnMetadata.nextProducerId());
> }
> {code}
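>
> A minimal round-trip sketch (hypothetical map-based stand-ins for the
> generated record value, not the real serialization code) shows the
> underlying mechanism: any field valueToBytes() never writes reads back as
> -1 on recovery:
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
>
> public class RoundTripSketch {
>     static final long NO_PRODUCER_ID = -1L;
>
>     // Stand-in for valueToBytes(): prev/next producer ids are never written.
>     static Map<String, Long> valueToBytes(long producerId, long prevProducerId, long nextProducerId) {
>         Map<String, Long> value = new HashMap<>();
>         value.put("producerId", producerId);
>         // BUG (modeled): missing
>         // value.put("prevProducerId", prevProducerId);
>         // value.put("nextProducerId", nextProducerId);
>         return value;
>     }
>
>     // Stand-in for reading the record back: absent fields default to -1.
>     static long[] readTxnRecordValue(Map<String, Long> value) {
>         return new long[] {
>             value.getOrDefault("producerId", NO_PRODUCER_ID),
>             value.getOrDefault("prevProducerId", NO_PRODUCER_ID),
>             value.getOrDefault("nextProducerId", NO_PRODUCER_ID),
>         };
>     }
>
>     public static void main(String[] args) {
>         // Scenario 2 state: producerId = 101 after rotation, prevProducerId = 100.
>         long[] recovered = readTxnRecordValue(valueToBytes(101L, 100L, NO_PRODUCER_ID));
>         // producerId survives the round trip, prevProducerId is lost:
>         System.out.println(recovered[0] + "," + recovered[1]); // 101,-1
>     }
> }
> {code}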
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)