Eswarar Siva created KAFKA-20731:
------------------------------------
Summary: Streams: transitToUpdateStandby is called without checking ACTIVE_RESTORING, crashing the state updater thread
Key: KAFKA-20731
URL: https://issues.apache.org/jira/browse/KAFKA-20731
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 4.1.2, 4.3.0
Reporter: Eswarar Siva

The state updater thread can die with the following exception (captured on 4.3.0):
java.lang.IllegalStateException: The changelog reader is not restoring active tasks (is STANDBY_UPDATING) while trying to transit to update standby tasks: {...}
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.transitToUpdateStandby(StoreChangelogReader.java:326)
    at org.apache.kafka.streams.processor.internals.DefaultStateUpdater$StateUpdaterThread.resumeTask(DefaultStateUpdater.java:661)
    at org.apache.kafka.streams.processor.internals.DefaultStateUpdater$StateUpdaterThread.resumeTasks(DefaultStateUpdater.java:241)
    at org.apache.kafka.streams.processor.internals.DefaultStateUpdater$StateUpdaterThread.runOnce(DefaultStateUpdater.java:197)
    at org.apache.kafka.streams.processor.internals.DefaultStateUpdater$StateUpdaterThread.run(DefaultStateUpdater.java:163)

transitToUpdateStandby() is documented as not idempotent: it throws when the
reader state is not ACTIVE_RESTORING. Two call sites on the state updater
thread call it without first checking the reader state:
1. resumeTask, in the standby branch: when a standby task is resumed and
   updatingTasks.size() == 1, it calls changelogReader.transitToUpdateStandby()
   with no guard on the reader state.
2. transitToUpdateStandbysIfOnlyStandbysLeft: when onlyStandbyTasksUpdating()
   is true, it calls transitToUpdateStandby() with no guard on the reader state.

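A minimal sketch of the documented contract and of call site 2, under a simplified model: UpdaterModel, ReaderModel, the set-based bookkeeping, and the task id "1_0" are hypothetical stand-ins, not Kafka Streams classes. It assumes, as described above, that transitToUpdateStandbysIfOnlyStandbysLeft() checks only onlyStandbyTasksUpdating(); if that path runs while the reader is already STANDBY_UPDATING, the non-idempotent transition throws.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the state updater thread's bookkeeping;
// not the real DefaultStateUpdater.
class UpdaterModel {
    final ReaderModel changelogReader = new ReaderModel();
    final Set<String> updatingActives = new HashSet<>();
    final Set<String> updatingStandbys = new HashSet<>();

    boolean onlyStandbyTasksUpdating() {
        return updatingActives.isEmpty() && !updatingStandbys.isEmpty();
    }

    // Call site 2 as described in the ticket: no check of the reader state.
    void transitToUpdateStandbysIfOnlyStandbysLeft() {
        if (onlyStandbyTasksUpdating()) {
            changelogReader.transitToUpdateStandby();
        }
    }

    static String run() {
        UpdaterModel updater = new UpdaterModel();
        updater.updatingStandbys.add("1_0");
        // First pass: ACTIVE_RESTORING -> STANDBY_UPDATING, succeeds.
        updater.transitToUpdateStandbysIfOnlyStandbysLeft();
        try {
            // Second pass: the reader is already STANDBY_UPDATING, so it throws.
            updater.transitToUpdateStandbysIfOnlyStandbysLeft();
            return "no exception";
        } catch (IllegalStateException e) {
            return "threw in " + updater.changelogReader.state;
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "threw in STANDBY_UPDATING"
    }
}

// Hypothetical model of the two reader states named in this ticket.
enum ReaderState { ACTIVE_RESTORING, STANDBY_UPDATING }

// Hypothetical stand-in for StoreChangelogReader: only the documented
// contract of transitToUpdateStandby() is modelled.
class ReaderModel {
    ReaderState state = ReaderState.ACTIVE_RESTORING;

    void transitToUpdateStandby() {
        // Not idempotent: any call outside ACTIVE_RESTORING throws.
        if (state != ReaderState.ACTIVE_RESTORING) {
            throw new IllegalStateException("The changelog reader is not restoring active tasks (is "
                + state + ") while trying to transit to update standby tasks");
        }
        state = ReaderState.STANDBY_UPDATING;
    }
}
```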
This is a single-threaded problem; no cross-thread race is needed. The order of
events on the state updater thread:
-> the reader is already STANDBY_UPDATING, which is its normal state once only
   standbys are left,
-> all updating tasks are paused (pauseTask does not move the reader back to
   ACTIVE_RESTORING),
-> one standby is resumed, so updatingTasks.size() becomes 1; resumeTask calls
   transitToUpdateStandby() on a reader that is already STANDBY_UPDATING, which
   throws and kills the state updater thread.

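A minimal, single-threaded sketch of that sequence, assuming a simplified model in which pausing a task only moves it from updatingTasks to pausedTasks and leaves the reader state alone. PauseResumeRepro, ReaderModel, and the task ids are hypothetical; the exception text only mirrors the message in the trace above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, single-threaded model of the failing sequence; the class,
// field, and task names are illustrative, not Kafka Streams internals.
class PauseResumeRepro {
    final ReaderModel changelogReader = new ReaderModel();
    // taskId -> isActive
    final Map<String, Boolean> updatingTasks = new HashMap<>();
    final Map<String, Boolean> pausedTasks = new HashMap<>();

    // Pausing removes the task from updatingTasks but, as noted in the
    // ticket, leaves the reader state unchanged.
    void pauseTask(String taskId) {
        pausedTasks.put(taskId, updatingTasks.remove(taskId));
    }

    // Standby branch of resumeTask, with no guard on the reader state.
    void resumeTask(String taskId) {
        boolean isActive = pausedTasks.remove(taskId);
        updatingTasks.put(taskId, isActive);
        if (!isActive && updatingTasks.size() == 1) {
            changelogReader.transitToUpdateStandby();
        }
    }

    static String run() {
        PauseResumeRepro updater = new PauseResumeRepro();
        // 1. Only standbys are left and the reader is already STANDBY_UPDATING.
        updater.updatingTasks.put("1_0", false);
        updater.updatingTasks.put("1_1", false);
        updater.changelogReader.state = ReaderState.STANDBY_UPDATING;
        // 2. All updating tasks are paused.
        updater.pauseTask("1_0");
        updater.pauseTask("1_1");
        // 3. One standby is resumed, so updatingTasks.size() becomes 1.
        try {
            updater.resumeTask("1_0");
            return "no exception";
        } catch (IllegalStateException e) {
            return "IllegalStateException: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}

enum ReaderState { ACTIVE_RESTORING, STANDBY_UPDATING }

class ReaderModel {
    ReaderState state = ReaderState.ACTIVE_RESTORING;

    // Non-idempotent transition, as documented in the ticket.
    void transitToUpdateStandby() {
        if (state != ReaderState.ACTIVE_RESTORING) {
            throw new IllegalStateException("The changelog reader is not restoring active tasks (is "
                + state + ") while trying to transit to update standby tasks");
        }
        state = ReaderState.STANDBY_UPDATING;
    }
}
```

No second thread is involved: the three steps run in order on one thread, and the third step alone triggers the exception.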
Reproduced on 4.1.2 and 4.3.0 with a standalone Streams application under
topology pause and resume churn (the same reproducer used for KAFKA-20724). The
code path is unchanged on trunk.

Relationship to KAFKA-17946: that ticket tracked the flaky test
shouldResumeStandbyTask. Its fix (PR #18253) stabilized the test but did not
change resumeTask or transitToUpdateStandby, so the production transition is
still not idempotent and the failing path is still reachable.

Suggested fix: either preserve the invariant that transitToUpdateStandby is
only called when the reader is ACTIVE_RESTORING by guarding both call sites
with isRestoringActive(), or make transitToUpdateStandby idempotent so that it
is a no-op when the reader is already STANDBY_UPDATING.

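A minimal sketch of both suggested options against a simplified reader model. GuardedReader, FixSketch, guardedCallSite(), and transitToUpdateStandbyIdempotent() are hypothetical names for illustration; in a real patch the idempotent behaviour would presumably go into transitToUpdateStandby() itself rather than a separate method.

```java
// Hypothetical, simplified model of the two fixes suggested in this
// ticket; GuardedReader and transitToUpdateStandbyIdempotent() are
// illustrative names, not Kafka Streams APIs.
class FixSketch {
    // Option 1: guard the call site so the ACTIVE_RESTORING invariant holds.
    static void guardedCallSite(GuardedReader reader) {
        if (reader.isRestoringActive()) {
            reader.transitToUpdateStandby();
        }
    }

    static String run() {
        // Reader already STANDBY_UPDATING, as in the failing sequence.
        GuardedReader guarded = new GuardedReader();
        guarded.state = ReaderState.STANDBY_UPDATING;
        guardedCallSite(guarded); // skipped, no exception

        GuardedReader idempotent = new GuardedReader();
        idempotent.state = ReaderState.STANDBY_UPDATING;
        idempotent.transitToUpdateStandbyIdempotent(); // no-op

        // The guard still performs the transition from ACTIVE_RESTORING.
        GuardedReader fresh = new GuardedReader();
        guardedCallSite(fresh);

        return guarded.state + "," + idempotent.state + "," + fresh.state;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "STANDBY_UPDATING,STANDBY_UPDATING,STANDBY_UPDATING"
    }
}

enum ReaderState { ACTIVE_RESTORING, STANDBY_UPDATING }

class GuardedReader {
    ReaderState state = ReaderState.ACTIVE_RESTORING;

    boolean isRestoringActive() {
        return state == ReaderState.ACTIVE_RESTORING;
    }

    // Unchanged, non-idempotent transition.
    void transitToUpdateStandby() {
        if (!isRestoringActive()) {
            throw new IllegalStateException("The changelog reader is not restoring active tasks (is "
                + state + ") while trying to transit to update standby tasks");
        }
        state = ReaderState.STANDBY_UPDATING;
    }

    // Option 2: idempotent variant that is a no-op when already STANDBY_UPDATING.
    void transitToUpdateStandbyIdempotent() {
        if (state == ReaderState.STANDBY_UPDATING) {
            return;
        }
        transitToUpdateStandby();
    }
}
```

Option 1 keeps the documented contract and moves the check to each caller; option 2 relaxes the contract itself, so any future caller is covered as well.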
--
This message was sent by Atlassian Jira
(v8.20.10#820010)