[ 
https://issues.apache.org/jira/browse/KAFKA-14497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728723#comment-17728723
 ] 

Daniel Urban commented on KAFKA-14497:
--------------------------------------

AFAIU, whether a completed transaction has been replicated is not stored in 
the snapshot at all. I think the data stored in the snapshot file needs to be 
extended to record whether a completed transaction has been replicated.

By the time ProducerStateManager#completeTxn is called (which puts the 
transaction into ProducerStateManager.unreplicatedTxns), the producer entry has 
already been cleared: ProducerAppendInfo#appendEndTxnMarker leaves 
currentTxnFirstOffset empty, indicating that there is no pending transaction. 
If a snapshot is created at this point and later loaded, there is no way to 
differentiate between replicated and unreplicated transactions.

Instead, ProducerAppendInfo#appendEndTxnMarker should also set a flag showing 
that while the transaction is complete, it might still be unreplicated. Then, 
when ProducerStateManager#removeUnreplicatedTransactions is called, the flag in 
the producer entry can be cleared.

This way the snapshot would contain the full data, and we could also recover 
the state of unreplicatedTxns.
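
To make the idea concrete, here is a rough sketch. It is a simplified model, 
not Kafka's actual classes or snapshot format: TxnSnapshotSketch, 
ProducerEntry, and the unreplicatedTxn* fields are hypothetical names, and the 
"flag" is modeled as the first/last offsets of the completed transaction, on 
the assumption that those would be needed to rebuild unreplicatedTxns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

// Illustrative model only: these are NOT Kafka's real classes or snapshot format.
final class TxnSnapshotSketch {

    /** Hypothetical per-producer entry, i.e. what a snapshot would persist. */
    static final class ProducerEntry {
        final long producerId;
        // First offset of the ongoing transaction; empty when none is pending.
        OptionalLong currentTxnFirstOffset = OptionalLong.empty();
        // Hypothetical extra state: offsets of a completed transaction whose
        // end marker may not be replicated yet.
        OptionalLong unreplicatedTxnFirstOffset = OptionalLong.empty();
        long unreplicatedTxnLastOffset = -1L;

        ProducerEntry(long producerId) {
            this.producerId = producerId;
        }

        void beginTxn(long firstOffset) {
            currentTxnFirstOffset = OptionalLong.of(firstOffset);
        }

        // Completing the transaction clears the pending state, but remembers
        // that the transaction may still be unreplicated.
        void appendEndTxnMarker(long markerOffset) {
            unreplicatedTxnFirstOffset = currentTxnFirstOffset;
            unreplicatedTxnLastOffset = markerOffset;
            currentTxnFirstOffset = OptionalLong.empty();
        }

        // Clears the extra state once the end marker is below the given offset.
        void removeUnreplicatedTransactions(long offset) {
            if (unreplicatedTxnFirstOffset.isPresent() && unreplicatedTxnLastOffset < offset) {
                unreplicatedTxnFirstOffset = OptionalLong.empty();
                unreplicatedTxnLastOffset = -1L;
            }
        }

        ProducerEntry copy() {
            ProducerEntry c = new ProducerEntry(producerId);
            c.currentTxnFirstOffset = currentTxnFirstOffset;
            c.unreplicatedTxnFirstOffset = unreplicatedTxnFirstOffset;
            c.unreplicatedTxnLastOffset = unreplicatedTxnLastOffset;
            return c;
        }
    }

    /** Stand-in for writing a snapshot: a deep copy of all entries. */
    static List<ProducerEntry> takeSnapshot(List<ProducerEntry> entries) {
        List<ProducerEntry> snapshot = new ArrayList<>();
        for (ProducerEntry e : entries) {
            snapshot.add(e.copy());
        }
        return snapshot;
    }

    /** Rebuilds the first offsets of unreplicated transactions from a snapshot. */
    static List<Long> recoverUnreplicatedFirstOffsets(List<ProducerEntry> snapshot) {
        List<Long> firstOffsets = new ArrayList<>();
        for (ProducerEntry e : snapshot) {
            if (e.unreplicatedTxnFirstOffset.isPresent()) {
                firstOffsets.add(e.unreplicatedTxnFirstOffset.getAsLong());
            }
        }
        return firstOffsets;
    }
}
```

In this model, a snapshot taken right after the end marker is appended still 
carries enough state to recover the unreplicated transaction, even though 
currentTxnFirstOffset is already empty.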

[~hachikuji] wdyt about this approach? If it seems okay, I can look into this 
and submit a PR.

> LastStableOffset is advanced prematurely when a log is reopened.
> ----------------------------------------------------------------
>
>                 Key: KAFKA-14497
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14497
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Vincent Jiang
>            Priority: Major
>
> In the test case below, the last stable offset of the log is advanced 
> prematurely after reopen:
>  # producer #1 appends transactional records to leader. offsets = [0, 1, 2, 3]
>  # producer #2 appends transactional records to leader. offsets = [4, 5, 6, 7]
>  # all records are replicated to followers and high watermark advanced to 8.
>  # at this point, lastStableOffset = 0. (first offset of an open transaction)
>  # producer #1 aborts the transaction by writing an abort marker at offset 8. 
>  ProducerStateManager.unreplicatedTxns contains the aborted transaction 
> (firstOffset=0, lastOffset=8)
>  # then the log is closed and reopened.
>  # after reopen, log.lastStableOffset is initialized to 4.  This is because 
> ProducerStateManager.unreplicatedTxns is empty after reopening log.
>  
> We should rebuild ProducerStateManager.unreplicatedTxns when reloading a log, 
> so that lastStableOffset remains unchanged before and after reopen.
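
The arithmetic in the quoted scenario can be reproduced with a small model. 
This is a sketch under a stated assumption, not Kafka's implementation: it 
takes the last stable offset to be the smallest first offset among ongoing and 
unreplicated transactions, capped at the high watermark. 
LastStableOffsetSketch and its parameter names are hypothetical.

```java
import java.util.Collection;

// Illustrative model only: NOT Kafka's real lastStableOffset computation.
final class LastStableOffsetSketch {

    /**
     * Assumed rule: the LSO is the smallest first offset among ongoing and
     * unreplicated transactions, and never exceeds the high watermark.
     */
    static long lastStableOffset(Collection<Long> ongoingTxnFirstOffsets,
                                 Collection<Long> unreplicatedTxnFirstOffsets,
                                 long highWatermark) {
        long firstUnstable = Long.MAX_VALUE;
        for (long offset : ongoingTxnFirstOffsets) {
            firstUnstable = Math.min(firstUnstable, offset);
        }
        for (long offset : unreplicatedTxnFirstOffsets) {
            firstUnstable = Math.min(firstUnstable, offset);
        }
        return Math.min(firstUnstable, highWatermark);
    }
}
```

With producer #2's open transaction starting at offset 4, producer #1's 
aborted but unreplicated transaction starting at offset 0, and a high 
watermark of 8, this model yields 0. Dropping the unreplicated entry, as 
happens after the reopen, yields 4, which matches the premature advance 
described above.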



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
