Boyang Chen created KAFKA-10010:
-----------------------------------

Summary: Should close standby task for safety during HandleLostAll
Key: KAFKA-10010
URL: https://issues.apache.org/jira/browse/KAFKA-10010
Project: Kafka
Issue Type: Bug
Reporter: Boyang Chen
Assignee: Boyang Chen
The current handleLostAll logic doesn't close standby tasks, which could lead to a tricky condition like the following:

1. The standby task was initializing in the `CREATED` state, and a task corrupted exception was thrown from registerStateStores.
2. The task corrupted exception was caught, and a commit of the non-affected tasks was performed.
3. The commit failed with a task migrated exception.
4. handleLostAll didn't close the standby task, leaving it in the `CREATED` state.
5. After the next rebalance completed, the same task was assigned back as a standby task.
6. An IllegalArgumentException was caught, and the stream thread shut down:

{code:java}
[2020-05-16T11:56:18-07:00] (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) [2020-05-16 18:56:18,050] ERROR [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] stream-thread [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] Encountered the following exception during processing and the thread is going to shut down: (org.apache.kafka.streams.processor.internals.StreamThread)
[2020-05-16T11:56:18-07:00] (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) java.lang.IllegalArgumentException: stream-thread [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] standby-task [1_2] Store KSTREAM-AGGREGATE-STATE-STORE-0000000007 has already been registered.
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStore(ProcessorStateManager.java:269)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:112)
    at org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore.init(AbstractRocksDBSegmentedBytesStore.java:191)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:48)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
    at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.init(ChangeLoggingWindowBytesStore.java:54)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
    at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:74)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
    at org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$init$0(MeteredWindowStore.java:85)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804)
    at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:85)
    at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:82)
    at org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:89)
    at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:358)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509)
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
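To make the six steps in the description concrete, here is a minimal, hypothetical Java sketch. It is not Kafka Streams code: `SketchStandbyTask`, `SketchTaskManager`, the `closeStandbysOnLostAll` flag and `SimulatedTaskCorruptedException` are illustrative inventions. It assumes that the task corrupted exception fires after the store was already registered, and that a standby task left open by handleLostAll is reused when the same task id is reassigned. Under those assumptions, leaving the standby open reproduces the "has already been registered" error, while closing it lets re-initialization start clean.

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, heavily simplified model of this issue's scenario.
// These are NOT the real Kafka Streams TaskManager / StandbyTask / ProcessorStateManager.
class LostAllSketch {
    enum State { CREATED, RUNNING, CLOSED }

    // Stand-in for the task corrupted exception thrown from registerStateStores.
    static final class SimulatedTaskCorruptedException extends RuntimeException {
        SimulatedTaskCorruptedException(String message) { super(message); }
    }

    static final class SketchStandbyTask {
        final String id;
        // Stand-in for the state manager's store registry; duplicates are rejected.
        final Set<String> registeredStores = new HashSet<>();
        State state = State.CREATED;

        SketchStandbyTask(String id) { this.id = id; }

        // Registers the store and, if asked, fails right afterwards, so the task stays
        // CREATED while its registration survives (assumed behavior for step 1).
        void initializeIfNeeded(String storeName, boolean simulateCorruption) {
            if (state != State.CREATED) {
                return;
            }
            if (!registeredStores.add(storeName)) {
                throw new IllegalArgumentException(
                    "Store " + storeName + " has already been registered.");
            }
            if (simulateCorruption) {
                throw new SimulatedTaskCorruptedException("standby-task [" + id + "] corrupted");
            }
            state = State.RUNNING;
        }

        // Closing releases the registrations, so a later re-initialization starts clean.
        void close() {
            registeredStores.clear();
            state = State.CLOSED;
        }
    }

    static final class SketchTaskManager {
        private final Map<String, SketchStandbyTask> standbyTasks = new HashMap<>();
        private final boolean closeStandbysOnLostAll;

        SketchTaskManager(boolean closeStandbysOnLostAll) {
            this.closeStandbysOnLostAll = closeStandbysOnLostAll;
        }

        // Assumption: a task object that is still tracked is reused when reassigned.
        SketchStandbyTask assignStandby(String id) {
            return standbyTasks.computeIfAbsent(id, SketchStandbyTask::new);
        }

        // Only standby tasks are modeled; the flag toggles the behavior this issue proposes.
        void handleLostAll() {
            if (closeStandbysOnLostAll) {
                standbyTasks.values().forEach(SketchStandbyTask::close);
                standbyTasks.clear();
            }
        }
    }

    // Replays steps 1-6; returns the re-initialization error message, or null if none.
    static String replay(boolean closeStandbysOnLostAll) {
        String store = "KSTREAM-AGGREGATE-STATE-STORE-0000000007";
        SketchTaskManager manager = new SketchTaskManager(closeStandbysOnLostAll);
        try {
            manager.assignStandby("1_2").initializeIfNeeded(store, true); // step 1
        } catch (SimulatedTaskCorruptedException e) {
            // Steps 2-3 (the failed commit of non-affected tasks) are not modeled;
            // they matter only because they lead the thread into handleLostAll.
        }
        manager.handleLostAll(); // step 4
        try {
            manager.assignStandby("1_2").initializeIfNeeded(store, false); // steps 5-6
            return null;
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Prints the "already registered" message first, then null.
        System.out.println("standby kept on lost-all:   " + replay(false));
        System.out.println("standby closed on lost-all: " + replay(true));
    }
}
{code}

In this sketch, `replay(false)` models the current behavior and `replay(true)` models the proposed fix. Only standby tasks are modeled, since they are what this issue is about.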