[
https://issues.apache.org/jira/browse/KAFKA-13249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Guozhang Wang resolved KAFKA-13249.
-----------------------------------
Fix Version/s: 3.1.0
Resolution: Fixed
> Checkpoints do not contain latest offsets on shutdown when using EOS
> --------------------------------------------------------------------
>
> Key: KAFKA-13249
> URL: https://issues.apache.org/jira/browse/KAFKA-13249
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.0.0, 2.7.0, 2.8.0
> Reporter: Oliver Hutchison
> Assignee: Oliver Hutchison
> Priority: Major
> Fix For: 3.1.0
>
>
> When using EOS the {{.checkpoint}} file created when a stateful streams app
> is shut down does not always contain changelog offsets which represent the
> latest state of the state store. The offsets can often be behind the end of
> the changelog - sometimes quite significantly.
> This leads to a state restore being required when the streams app restarts
> after shutting down cleanly as streams thinks (based on the incorrect offsets
> in the checkpoint) that the state store is not up to date with the changelog.
> This increases the time a clean restart of a single-instance streams app
> takes from around 10 seconds to sometimes over 2 minutes in our case.
> I suspect the bug appears because of an assumption about the {{commitNeeded}}
> field in the following method in {{StreamTask}}:
> {code:java}
> protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
>     // commitNeeded indicates we may have processed some records since last commit
>     // and hence we need to refresh checkpointable offsets regardless whether
>     // we should checkpoint or not
>     if (commitNeeded) {
>         stateMgr.updateChangelogOffsets(checkpointableOffsets());
>     }
>     super.maybeWriteCheckpoint(enforceCheckpoint);
> }
> {code}
> In the steady-state case of a simple single-instance, single-thread streams
> app that simply starts, runs and then shuts down, the {{if (commitNeeded)}}
> test always fails when running with EOS, which results in the latest
> checkpoint offsets never getting updated into the {{stateMgr}}.
> Tracing back to the callers of {{maybeWriteCheckpoint}} it's easy to see this
> is the case, as there's only one place in the code which calls
> {{maybeWriteCheckpoint}} during this steady state: the {{postCommit(final
> boolean enforceCheckpoint)}} method, specifically the call in the {{RUNNING}}
> state.
> {code:java}
> case RUNNING:
>     if (enforceCheckpoint || !eosEnabled) {
>         maybeWriteCheckpoint(enforceCheckpoint);
>     }
>     log.debug("Finalized commit for {} task with eos {} enforce checkpoint {}",
>         state(), eosEnabled, enforceCheckpoint);
>     break;
> {code}
> We can see from this code that {{maybeWriteCheckpoint}} will only ever be
> called if {{enforceCheckpoint=true}} because we know {{eosEnabled=true}} as
> we're running with EOS.
> So then where does {{postCommit}} get called with {{enforceCheckpoint=true}}?
> Again looking only at the steady state case we find that it's only called
> from {{TaskManager.tryCloseCleanAllActiveTasks}} which is only called from
> {{TaskManager.shutdown}}.
> The thing about the call in {{tryCloseCleanAllActiveTasks}} is that it
> happens *after* all active tasks have committed. This means that
> {{StreamTask.commitNeeded=false}} for all tasks, so it follows that the test
> back in {{maybeWriteCheckpoint}} always fails and we don't end up getting the
> latest offsets stored into the state manager.
> I think the fix is to simply change the test in {{maybeWriteCheckpoint}} to
> be {{if (commitNeeded || enforceCheckpoint) { ...}} as we know we must always
> update the changelog offsets before we write the checkpoint.
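
The sequence described in the report can be sketched as a small standalone
model. This is not the Kafka Streams code: FakeTask, FakeStateManager,
runAndShutDown and the applyProposedFix switch are hypothetical names made up
for illustration, and the model assumes, as the report states, that every
commit has already cleared commitNeeded by the time postCommit runs.

```java
public class CheckpointSketch {

    // Hypothetical stand-in for the state manager: it only remembers the
    // offset that would end up in the .checkpoint file.
    static final class FakeStateManager {
        long checkpointedOffset = 0L;

        void updateChangelogOffsets(final long offset) {
            checkpointedOffset = offset;
        }
    }

    // Hypothetical, heavily simplified task; only the flags discussed in the
    // report are modelled.
    static final class FakeTask {
        final FakeStateManager stateMgr = new FakeStateManager();
        final boolean applyProposedFix;
        long changelogEndOffset = 0L;
        boolean commitNeeded = false;

        FakeTask(final boolean applyProposedFix) {
            this.applyProposedFix = applyProposedFix;
        }

        void process(final int records) {
            changelogEndOffset += records;
            commitNeeded = true;
        }

        // Assumption taken from the report: after a commit, commitNeeded is false.
        void commit() {
            commitNeeded = false;
        }

        // Mirrors the RUNNING branch quoted above.
        void postCommit(final boolean enforceCheckpoint, final boolean eosEnabled) {
            if (enforceCheckpoint || !eosEnabled) {
                maybeWriteCheckpoint(enforceCheckpoint);
            }
        }

        void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
            // Original test: commitNeeded only.
            // Proposed test: commitNeeded || enforceCheckpoint.
            if (commitNeeded || (applyProposedFix && enforceCheckpoint)) {
                stateMgr.updateChangelogOffsets(changelogEndOffset);
            }
        }
    }

    // Runs "start, process, shut down cleanly" with EOS enabled and returns
    // the offset that would be checkpointed.
    static long runAndShutDown(final boolean applyProposedFix) {
        final FakeTask task = new FakeTask(applyProposedFix);
        task.process(100);
        task.commit();
        task.postCommit(false, true); // regular EOS commit: checkpoint is skipped
        task.process(50);
        task.commit();                // shutdown commits all active tasks first
        task.postCommit(true, true);  // then enforces the checkpoint
        return task.stateMgr.checkpointedOffset;
    }

    public static void main(final String[] args) {
        System.out.println("original test: " + runAndShutDown(false)); // prints 0
        System.out.println("proposed test: " + runAndShutDown(true));  // prints 150
    }
}
```

In this model the original test leaves the checkpointed offset at 0 although
the changelog ends at 150, while the proposed test records 150. How the real
StreamTask clears commitNeeded may differ in detail from this sketch.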
--
This message was sent by Atlassian Jira
(v8.3.4#803005)