Martin Hørslev created KAFKA-14172:
--------------------------------------
             Summary: bug: State stores lose state when tasks are reassigned under EOS wit…
                 Key: KAFKA-14172
                 URL: https://issues.apache.org/jira/browse/KAFKA-14172
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.1.1
            Reporter: Martin Hørslev

# State stores lose state when tasks are reassigned under EOS with standby replicas and default acceptable lag

## Goal

The attached stream is an extension of the well-known deduplication example often used with Kafka Streams. The goal is to assign a unique identifier to each key in the input topic and pass the key and identifier to the output topic. If a key has previously been observed, the ID assigned last time is reused.

### GIVEN:

The stream uses exactly-once semantics. There are 2 partitions, and partition assignment is done using: key MODULO 2 + 1

ID assignment is done using a partition-specific counter and the following logic (a minimal sketch of this logic is included at the end of this report):

IF the store contains the key THEN
    Reuse the associated ID.
ELSE
    Assign ID = counter * partitionCount + context.partition()
    Increment the counter.
    Store the key and ID assignment.

### WHEN:

**Input topic:**
The stream takes as input a topic containing KeyValue<Integer, Integer> where the key and value are equal.
[{0, 0}, {1, 1}, {2, 2}, {1, 1}, ...]

### THEN:

The streams will observe the following:

#### Stream 0:
**INPUT:** [{1, 1}, {1, 1}]
**OUTPUT:** [{1, 0}, {1, 0}]

#### Stream 1:
**INPUT:** [{0, 0}, {2, 2}]
**OUTPUT:** [{0, 1}, {2, 3}]

#### Output topic:
The output of the stream is a topic in which each entry from the input topic is assigned a stable ID.
**OUTPUT:** [{0, 1}, {1, 0}, {2, 3}, {1, 0}, ...]

#### Counter changelog topic (before compaction):
**partition 0:** [{0, 0}]
**partition 1:** [{0, 0}, {0, 1}]

#### Store changelog topic (before compaction):
**partition 0:** [{1, 0}]
**partition 1:** [{0, 1}, {2, 3}]

## The bug and test case

The input and the transform are both deterministic, and so should the output be, due to the exactly-once semantics. The test demonstrates that a rebalancing event can break the exactly-once semantics, which results in IDs being reused.

The test case with the default acceptable lag and caching enabled produces duplicate IDs. The result of an execution of the test is presented below, and the logs of the failing run are attached:

```
Test shouldHonorEOSWhenUsingCachingAndStandbyReplicas[DEFAULT_CONFIG] FAILED (1m 53s)
    java.lang.AssertionError: Each output should correspond to one distinct value
    Expected: is <63000L>
         but: was <62888L>
        at com.chainalysis.enumerator.StandbyTaskEOSIntegrationTest.shouldHonorEOSWhenUsingCachingAndStandbyReplicas(StandbyTaskEOSIntegrationTest.java:246)

Test shouldHonorEOSWhenUsingCachingAndStandbyReplicas[CACHING_DISABLED] PASSED (2m 2s)

Test shouldHonorEOSWhenUsingCachingAndStandbyReplicas[NO_LAG_ACCEPTABLE] PASSED (1m 36s)
```

The issue can be reliably reproduced on an i7-8750H CPU @ 2.20GHz × 12 with 32 GiB of memory when using caching and the default acceptable lag for standby replicas. When caching is disabled OR the acceptable lag is set to 0, the test no longer breaks. The underlying cause is unknown, and it is therefore also unknown whether the above settings fix the problem or merely hide it.

A similar issue has been reported here: https://stackoverflow.com/questions/69038181/kafka-streams-aggregation-data-loss-between-instance-restarts-and-rebalances
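For context, the settings toggled by the three test variants above can be sketched as plain StreamsConfig properties. This is a minimal, hypothetical sketch and not the wiring of the attached test: the application id, the bootstrap servers, and the choice of EXACTLY_ONCE_V2 are assumptions made for illustration.

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class EosStandbyTestConfigs {

    // Common settings: exactly-once processing with one standby replica.
    // Application id and bootstrap servers are placeholders.
    static Properties baseConfig() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-standby-repro");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        return props;
    }

    // DEFAULT_CONFIG: default acceptable.recovery.lag and caching enabled -> FAILED above.
    static Properties defaultConfig() {
        return baseConfig();
    }

    // CACHING_DISABLED: record caching turned off -> PASSED above.
    static Properties cachingDisabled() {
        final Properties props = baseConfig();
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        return props;
    }

    // NO_LAG_ACCEPTABLE: acceptable recovery lag of 0, i.e. only a fully caught-up
    // standby may take over a task -> PASSED above.
    static Properties noLagAcceptable() {
        final Properties props = baseConfig();
        props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 0L);
        return props;
    }
}
```

The failing variant corresponds to the defaults, where acceptable.recovery.lag is 10000 records and record caching is enabled.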
Flow of the test case:

1) Produce 3,000 messages.
2) Start a stream.
3) Wait for the 3,000 messages to be processed.
4) Start a new stream and wait for it to start syncing.
5) Produce 60,000 messages.
6) Wait for 5 seconds.
7) Start a new thread, which should introduce a rebalancing event.
8) Wait until the entire log is processed by the stream.
9) Check the uniqueness of the assigned IDs.

The problem has been observed in a Kafka cluster with 4 brokers and an application setup matching the integration test failing here: https://github.com/apache/kafka/pull/12540/files
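For completeness, the ID-assignment logic described in the GIVEN section can be sketched roughly as below. The store names ("id-store", "counter-store"), the counter key, the value types, and the use of the older Transformer API are assumptions made for illustration; the attached stream may implement this differently.

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class IdAssignmentTransformer implements Transformer<Integer, Integer, KeyValue<Integer, Long>> {

    private static final int COUNTER_KEY = 0;      // single-entry counter store, keyed by 0
    private static final int PARTITION_COUNT = 2;  // matches the two-partition setup above

    private ProcessorContext context;
    private KeyValueStore<Integer, Long> idStore;      // key -> previously assigned ID
    private KeyValueStore<Integer, Long> counterStore; // partition-local counter

    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
        this.idStore = context.getStateStore("id-store");
        this.counterStore = context.getStateStore("counter-store");
    }

    @Override
    public KeyValue<Integer, Long> transform(final Integer key, final Integer value) {
        Long id = idStore.get(key);
        if (id == null) {
            // Key not seen before: derive a new ID from the partition-local counter.
            final Long last = counterStore.get(COUNTER_KEY);
            final long counter = last == null ? 0L : last + 1;
            id = counter * PARTITION_COUNT + context.partition();
            counterStore.put(COUNTER_KEY, counter); // changelog keeps the counter value just used
            idStore.put(key, id);                   // remember the assignment for later reuse
        }
        return KeyValue.pair(key, id);
    }

    @Override
    public void close() { }
}
```

If the two stores keep their state across the rebalance, every key maps to a distinct ID; the failing run above (62,888 distinct values instead of 63,000) indicates that some IDs were handed out more than once, consistent with recent counter or store writes being lost when the task moved.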