[ https://issues.apache.org/jira/browse/KAFKA-14497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728723#comment-17728723 ]
Daniel Urban commented on KAFKA-14497:
--------------------------------------
AFAIU, the information about whether a completed transaction has been
replicated is not stored in the snapshot at all. I think the data stored in the
snapshot file needs to be extended with this extra information, i.e. whether
each completed transaction has been replicated.
By the time ProducerStateManager#completeTxn is called (which puts the
transaction into ProducerStateManager.unreplicatedTxns), the producer entry has
already been cleared: after ProducerAppendInfo#appendEndTxnMarker,
currentTxnFirstOffset is empty, indicating that there is no pending transaction.
If a snapshot is created at this point and later loaded, there is no way to
differentiate between replicated and unreplicated transactions.
Instead, ProducerAppendInfo#appendEndTxnMarker should also set a flag showing
that, while the transaction is complete, it might still be unreplicated. Then,
when ProducerStateManager#removeUnreplicatedTransactions is called, the flag in
the producer entry can be cleared.
This way, the snapshot would contain the complete producer state, and we could
also rebuild unreplicatedTxns when the snapshot is loaded.
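To make the proposal concrete, here is a minimal Python sketch of it. It is a toy model, not Kafka's code: `ProducerEntry`, `ProducerStateModel`, `unreplicated_txn` and the method names are hypothetical stand-ins, and JSON replaces Kafka's real snapshot format. The comment only mentions a flag; the assumption here is that the flag also carries the completed transaction's first offset and end-marker offset, because rebuilding an unreplicatedTxns entry needs both.

```python
import json
from dataclasses import dataclass, asdict
from typing import Dict, Optional, Tuple


@dataclass
class ProducerEntry:
    # Hypothetical, simplified stand-in for a producer state entry.
    producer_id: int
    current_txn_first_offset: Optional[int] = None  # None means no pending transaction
    # Proposed flag (hypothetical name). ASSUMPTION: it carries the completed
    # transaction's (first_offset, end_marker_offset) so it can be rebuilt on load.
    unreplicated_txn: Optional[Tuple[int, int]] = None


class ProducerStateModel:
    """Toy model of the proposal; JSON replaces Kafka's real snapshot format."""

    def __init__(self) -> None:
        self.entries: Dict[int, ProducerEntry] = {}
        # first offset -> (producer id, end marker offset)
        self.unreplicated_txns: Dict[int, Tuple[int, int]] = {}

    def begin_txn(self, producer_id: int, first_offset: int) -> None:
        entry = self.entries.setdefault(producer_id, ProducerEntry(producer_id))
        entry.current_txn_first_offset = first_offset

    def append_end_txn_marker(self, producer_id: int, marker_offset: int) -> None:
        entry = self.entries[producer_id]
        first_offset = entry.current_txn_first_offset
        entry.current_txn_first_offset = None  # the transaction is no longer pending...
        entry.unreplicated_txn = (first_offset, marker_offset)  # ...but may be unreplicated
        self.unreplicated_txns[first_offset] = (producer_id, marker_offset)

    def remove_unreplicated_transactions(self, high_watermark: int) -> None:
        for first_offset, (producer_id, last_offset) in list(self.unreplicated_txns.items()):
            if last_offset < high_watermark:
                del self.unreplicated_txns[first_offset]
                self.entries[producer_id].unreplicated_txn = None  # clear the flag too

    def take_snapshot(self) -> str:
        # The flag is part of each entry, so it is persisted with the snapshot.
        return json.dumps([asdict(entry) for entry in self.entries.values()])

    @classmethod
    def load_snapshot(cls, data: str) -> "ProducerStateModel":
        model = cls()
        for raw in json.loads(data):
            txn = raw["unreplicated_txn"]
            entry = ProducerEntry(raw["producer_id"], raw["current_txn_first_offset"],
                                  tuple(txn) if txn is not None else None)
            model.entries[entry.producer_id] = entry
            if entry.unreplicated_txn is not None:  # rebuild unreplicatedTxns from the flag
                first_offset, last_offset = entry.unreplicated_txn
                model.unreplicated_txns[first_offset] = (entry.producer_id, last_offset)
        return model
```

With producers 1 and 2 starting transactions at offsets 0 and 4 and producer 1's abort marker at offset 8, `ProducerStateModel.load_snapshot(state.take_snapshot())` yields `unreplicated_txns == {0: (1, 8)}`. Once `remove_unreplicated_transactions(9)` runs, the flag is cleared and a fresh snapshot reloads with no unreplicated transactions. As a simplification, the model tracks at most one unreplicated completed transaction per producer.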
[~hachikuji] wdyt about this approach? If it seems okay, I can take a look into
this and submit a PR.
> LastStableOffset is advanced prematurely when a log is reopened.
> ----------------------------------------------------------------
>
> Key: KAFKA-14497
> URL: https://issues.apache.org/jira/browse/KAFKA-14497
> Project: Kafka
> Issue Type: Bug
> Reporter: Vincent Jiang
> Priority: Major
>
> In the test case below, the log's last stable offset is advanced prematurely
> after the log is reopened:
> # producer #1 appends transactional records to leader. offsets = [0, 1, 2, 3]
> # producer #2 appends transactional records to leader. offsets = [4, 5, 6, 7]
> # all records are replicated to followers and the high watermark advances to 8.
> # at this point, lastStableOffset = 0 (the first offset of an open transaction).
> # producer #1 aborts the transaction by writing an abort marker at offset 8.
> Since the high watermark is still 8, the marker is not yet replicated, so
> ProducerStateManager.unreplicatedTxns contains the aborted transaction
> (firstOffset=0, lastOffset=8).
> # then the log is closed and reopened.
> # after reopening, log.lastStableOffset is initialized to 4. This is because
> ProducerStateManager.unreplicatedTxns is empty after the log is reopened.
>
> We should rebuild ProducerStateManager.unreplicatedTxns when reloading a log,
> so that lastStableOffset is the same before and after the log is reopened.
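The scenario in the issue can be replayed with a small Python model of the last-stable-offset rule. This is a simplifying assumption, not Kafka's implementation: the LSO is taken as the smaller of the high watermark and the earliest first offset among ongoing transactions and completed-but-unreplicated transactions. The function name `last_stable_offset` and the dictionaries below are hypothetical.

```python
from typing import Iterable


def last_stable_offset(high_watermark: int,
                       ongoing_first_offsets: Iterable[int],
                       unreplicated_first_offsets: Iterable[int]) -> int:
    """Simplified LSO: the earliest offset that may still belong to an undecided
    or not-yet-replicated transaction, capped at the high watermark."""
    return min([high_watermark, *ongoing_first_offsets, *unreplicated_first_offsets])


# Steps 1-5: producer #1 wrote offsets 0-3 and aborted at offset 8; producer #2
# wrote offsets 4-7 and its transaction is still open. The high watermark is 8,
# so the abort marker at offset 8 is not replicated yet.
high_watermark = 8
ongoing = {2: 4}            # producer id -> first offset of its open transaction
unreplicated = {0: (1, 8)}  # first offset -> (producer id, end marker offset)

before_reopen = last_stable_offset(high_watermark, ongoing.values(), unreplicated.keys())

# Steps 6-7: after reopening, the open transaction is known again,
# but unreplicatedTxns starts out empty.
after_reopen = last_stable_offset(high_watermark, ongoing.values(), [])

# The fix requested in the issue: rebuild the unreplicated transactions too.
after_reopen_rebuilt = last_stable_offset(high_watermark, ongoing.values(), unreplicated.keys())

print(before_reopen, after_reopen, after_reopen_rebuilt)  # 0 4 0
```

Only once the high watermark moves past the abort marker (for example to 9) should the aborted transaction drop out, at which point `last_stable_offset(9, [4], [])` correctly returns 4.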
--
This message was sent by Atlassian Jira
(v8.20.10#820010)