[ 
https://issues.apache.org/jira/browse/KAFKA-20310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan resolved KAFKA-20310.
------------------------------------
    Resolution: Fixed

> nextProducerId and prevProducerId fields never persisted to transaction log
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-20310
>                 URL: https://issues.apache.org/jira/browse/KAFKA-20310
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 4.0.0
>            Reporter: Artem Livshits
>            Priority: Critical
>
> The valueToBytes() method in 
> [TransactionLog.java|https://github.com/apache/kafka/blob/7d53410d8febf4e39a65cba75951a941fe83c337/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java#L87]
>  never writes nextProducerId and prevProducerId.
> *Scenario 1: Coordinator failover during PREPARE_COMMIT/PREPARE_ABORT with 
> epoch exhaustion*
> 1. Transaction in PREPARE_COMMIT state with epoch exhaustion:
>    - producerId = 100, producerEpoch = 32767 (Short.MAX_VALUE)
>    - nextProducerId = 101, nextProducerEpoch = 0
> 2. Metadata written to log, but nextProducerId/nextProducerEpoch are not 
> persisted
> 3. Coordinator leadership changes
> 4. On recovery, transaction log read gives:
>    - producerId = 100, producerEpoch = 32767
>    - nextProducerId = -1 (NO_PRODUCER_ID)
>    - nextProducerEpoch = -1 (NO_PRODUCER_EPOCH)
> 5. Transaction markers complete, prepareComplete() is called:
>    Lines 331-333 in TransactionMetadata.java:
> {code:java}
>  if (clientTransactionVersion.supportsEpochBump() && nextProducerId != RecordBatch.NO_PRODUCER_ID) {
>      data.producerId = nextProducerId;         // Would rotate to 101
>      data.producerEpoch = hasNextProducerEpoch() ? nextProducerEpoch : 0;
>  } else {
>      data.producerId = producerId;             // STUCK at 100
>      data.producerEpoch = clientProducerEpoch();  // STUCK at 32767
>  }
> {code}
> 6. Because nextProducerId is -1, the else branch executes
> 7. Producer stays at (100, 32767) - epoch exhausted
> 8. Next InitProducerId fails: "Cannot allocate any more producer epochs"
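
A minimal sketch of Scenario 1, for illustration only. The class and method names (EpochExhaustionSketch, Metadata, reloadFromLog) are hypothetical and are not Kafka's real classes. The sketch assumes the client transaction version supports epoch bumps, uses producerEpoch in place of clientProducerEpoch(), and does not model whether nextProducerEpoch is persisted separately from nextProducerId.

```java
// Hypothetical, simplified model of Scenario 1; these are not Kafka's real classes.
final class EpochExhaustionSketch {
    static final long NO_PRODUCER_ID = -1L;     // same value as RecordBatch.NO_PRODUCER_ID
    static final short NO_PRODUCER_EPOCH = -1;  // same value as RecordBatch.NO_PRODUCER_EPOCH

    // Stand-in for the TransactionMetadata fields that matter in this scenario.
    static final class Metadata {
        final long producerId;
        final short producerEpoch;
        final long nextProducerId;
        final short nextProducerEpoch;

        Metadata(long producerId, short producerEpoch, long nextProducerId, short nextProducerEpoch) {
            this.producerId = producerId;
            this.producerEpoch = producerEpoch;
            this.nextProducerId = nextProducerId;
            this.nextProducerEpoch = nextProducerEpoch;
        }
    }

    // Simulates writing the metadata to the log and reading it back after failover.
    // With persistNext == false (the reported bug) the next* fields revert to -1.
    static Metadata reloadFromLog(Metadata m, boolean persistNext) {
        if (persistNext) {
            return m;
        }
        return new Metadata(m.producerId, m.producerEpoch, NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
    }

    // Mirrors the branch quoted from prepareComplete() in the description.
    static Metadata prepareComplete(Metadata m) {
        if (m.nextProducerId != NO_PRODUCER_ID) {
            short epoch = m.nextProducerEpoch != NO_PRODUCER_EPOCH ? m.nextProducerEpoch : 0;
            return new Metadata(m.nextProducerId, epoch, NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
        }
        // Else branch: the producer keeps its exhausted id and epoch.
        return new Metadata(m.producerId, m.producerEpoch, NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
    }

    public static void main(String[] args) {
        Metadata before = new Metadata(100L, Short.MAX_VALUE, 101L, (short) 0);
        Metadata lost = prepareComplete(reloadFromLog(before, false));
        Metadata kept = prepareComplete(reloadFromLog(before, true));
        System.out.println("fields lost: (" + lost.producerId + ", " + lost.producerEpoch + ")"); // (100, 32767)
        System.out.println("fields kept: (" + kept.producerId + ", " + kept.producerEpoch + ")"); // (101, 0)
    }
}
```

When the next* fields are lost on reload, the producer stays at (100, 32767); when they survive, it rotates to (101, 0).
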
> *Scenario 2: Client retry after coordinator failover during epoch rotation*
> 1. Client has producer (100, 32766), epoch exhaustion imminent
> 2. InitProducerId triggers epoch rotation:
>    - prevProducerId = 100
>    - producerId = 101, producerEpoch = 0
> 3. Metadata written to log, but prevProducerId is NOT persisted
> 4. Coordinator fails before client receives response
> 5. On recovery:
>    - prevProducerId = -1 (NO_PRODUCER_ID)
>    - producerId = 101, producerEpoch = 0
> 6. Client retries InitProducerId with expectedProducerIdAndEpoch = (100, 32766)
> 7. Validation in TransactionCoordinator.scala lines 243-245:
> {code:java}
>    (producerIdAndEpoch.producerId == txnMetadata.prevProducerId &&  // 100 == -1?
>     TransactionMetadata.isEpochExhausted(producerIdAndEpoch.epoch))
> {code}
> 8. Check fails, returns PRODUCER_FENCED
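
A minimal sketch of the retry check from Scenario 2, for illustration only. RetryValidationSketch and matchesPreviousProducer are hypothetical names, and only the clause quoted above is modeled; the real validation may contain further clauses. The exhaustion threshold (Short.MAX_VALUE - 1) is an assumption, chosen because it is consistent with epoch 32766 counting as exhausted in the scenario.

```java
// Hypothetical model of the quoted validation clause; not Kafka's actual code.
final class RetryValidationSketch {
    static final long NO_PRODUCER_ID = -1L; // same value as RecordBatch.NO_PRODUCER_ID

    // Assumption: an epoch counts as exhausted once it reaches Short.MAX_VALUE - 1.
    static boolean isEpochExhausted(short epoch) {
        return epoch >= Short.MAX_VALUE - 1;
    }

    // True when the client's (id, epoch) identifies the producer that was just rotated away.
    static boolean matchesPreviousProducer(long clientProducerId, short clientEpoch, long prevProducerId) {
        return clientProducerId == prevProducerId && isEpochExhausted(clientEpoch);
    }

    public static void main(String[] args) {
        long clientId = 100L;
        short clientEpoch = 32766;
        // prevProducerId survived the failover: the retry is recognized.
        System.out.println(matchesPreviousProducer(clientId, clientEpoch, 100L));           // true
        // prevProducerId came back as -1: the retry is rejected (PRODUCER_FENCED in the report).
        System.out.println(matchesPreviousProducer(clientId, clientEpoch, NO_PRODUCER_ID)); // false
    }
}
```
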
> *The fix* is to add 
> {code:java}
>   if (logVersion >= 1) {
>       value.setPreviousProducerId(txnMetadata.prevProducerId());
>       value.setNextProducerId(txnMetadata.nextProducerId());
>   }
> {code}
> to the valueToBytes() method in 
> [TransactionLog.java|https://github.com/apache/kafka/blob/7d53410d8febf4e39a65cba75951a941fe83c337/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java#L87]
>  
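
A minimal sketch of the version-gated write and read that the fix relies on, for illustration only. The byte layout and the names VersionedValueSketch, write, and read are hypothetical; Kafka's real transaction log value uses a generated schema, not this hand-rolled format. The point is only that fields written under a logVersion >= 1 guard survive a round trip, while older versions read back as -1.

```java
import java.nio.ByteBuffer;

// Hypothetical byte layout for illustration; not Kafka's real transaction log format.
final class VersionedValueSketch {
    static final long NO_PRODUCER_ID = -1L;

    // Writes the version and producerId; prev/next ids are written only for logVersion >= 1.
    static ByteBuffer write(short logVersion, long producerId, long prevProducerId, long nextProducerId) {
        int size = Short.BYTES + Long.BYTES + (logVersion >= 1 ? 2 * Long.BYTES : 0);
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putShort(logVersion);
        buf.putLong(producerId);
        if (logVersion >= 1) {
            buf.putLong(prevProducerId);
            buf.putLong(nextProducerId);
        }
        buf.flip();
        return buf;
    }

    // Returns {producerId, prevProducerId, nextProducerId}; the gated fields default to -1.
    static long[] read(ByteBuffer buf) {
        short logVersion = buf.getShort();
        long producerId = buf.getLong();
        long prev = logVersion >= 1 ? buf.getLong() : NO_PRODUCER_ID;
        long next = logVersion >= 1 ? buf.getLong() : NO_PRODUCER_ID;
        return new long[] {producerId, prev, next};
    }

    public static void main(String[] args) {
        long[] v1 = read(write((short) 1, 101L, 100L, NO_PRODUCER_ID));
        long[] v0 = read(write((short) 0, 101L, 100L, NO_PRODUCER_ID));
        System.out.println("v1 prevProducerId = " + v1[1]); // 100
        System.out.println("v0 prevProducerId = " + v0[1]); // -1
    }
}
```
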



