[ https://issues.apache.org/jira/browse/KAFKA-14172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17698180#comment-17698180 ]
Guozhang Wang commented on KAFKA-14172:
---------------------------------------

[~gray.john] [~Horslev] I took a deep look into this issue and I think I found the culprit. Here's a short summary:

1. When standbys are enabled, Kafka Streams can recycle a standby task (and its state stores) into an active task, and vice versa.
2. When caching is enabled, we bypass the caching layer when updating a standby task (i.e. via putInternal).

These two behaviors combined cause the issue. Take a concrete example following the demo in https://github.com/apache/kafka/pull/12540: say we have a task A with a cached state store S.

* For a given host, the task was originally hosted as an active.
* A rebalance happens and the task is recycled into a standby. At that point the cache is flushed, so the underlying store and the cache layer are consistent; call this state S1 (version 1).
* The standby task is updated for a period of time, during which updates are written directly into the underlying store. Now the underlying store is at S2 while the caching layer is still at S1.
* A second rebalance happens and the task is recycled back into an active. When that task then processes normally, a read into the store hits the cache layer first, and very likely reads out the older S1 instead of S2.

As a result, we get a duplicate: in the above PR's example, the {{count}} store returns an old counter, so the ID inferred from the counter is used twice. That also explains why the test does not fail if caching is disabled or standby replicas are disabled (tested locally). I think this test could still fail even when acceptable lag is set to 0, but an active -> standby -> active sequence is then less likely, so people may not easily observe it.
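The sequence above can be sketched with a toy model of a cached store. This is a minimal illustration, not the real Kafka Streams code: the class and method names (other than the nod to putInternal) are invented for the sketch, and the "rebalance" steps are just comments marking where task recycling would happen.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a cached state store; illustrative names, not the real API.
public class CachingStoreSketch {
    static class CachingStore {
        final Map<String, Long> underlying = new HashMap<>();
        final Map<String, Long> cache = new HashMap<>();
        final boolean clearCacheOnFlush;

        CachingStore(boolean clearCacheOnFlush) {
            this.clearCacheOnFlush = clearCacheOnFlush;
        }

        // Active-task write path: goes through the caching layer.
        void put(String key, long value) {
            cache.put(key, value);
        }

        // Standby-task update path: bypasses the cache (cf. putInternal).
        void putInternal(String key, long value) {
            underlying.put(key, value);
        }

        // Active-task read path: cache first, then the underlying store.
        Long get(String key) {
            Long cached = cache.get(key);
            return cached != null ? cached : underlying.get(key);
        }

        // Flush the cache down to the underlying store (happens on recycle).
        // The hack fix additionally clears the cache here, so no stale
        // entry can survive a standby -> active recycle.
        void flush() {
            underlying.putAll(cache);
            if (clearCacheOnFlush) {
                cache.clear();
            }
        }
    }

    static long recycleScenario(boolean clearCacheOnFlush) {
        CachingStore store = new CachingStore(clearCacheOnFlush);
        store.put("count", 1L);         // active task writes via the cache
        store.flush();                  // rebalance: active -> standby (state S1)
        store.putInternal("count", 2L); // standby updates bypass the cache (state S2)
        // rebalance: standby -> active; the next read hits the cache first
        return store.get("count");
    }

    public static void main(String[] args) {
        // Without clearing, the read returns the stale S1 value (1);
        // with clearing, it falls through to the underlying store (2).
        System.out.println("keep cache on flush:  " + recycleScenario(false));
        System.out.println("clear cache on flush: " + recycleScenario(true));
    }
}
```

With clearCacheOnFlush set to false the read after the second "rebalance" returns the stale value, which is exactly the duplicate-counter symptom described above; with it set to true the read falls through to the up-to-date underlying store.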
I have a hack fix (note: this is not for merging, it is just a hack) on top of [~Horslev]'s integration test, which clears the cache upon flushing it (which happens when the task manager is flushed). With this fix the test no longer fails.

> bug: State stores lose state when tasks are reassigned under EOS wit…
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-14172
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14172
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.1.1
>            Reporter: Martin Hørslev
>            Priority: Critical
>
> h1. State stores lose state when tasks are reassigned under EOS with standby replicas and default acceptable lag.
>
> I have observed that state stores used in a transform step under Exactly-Once semantics end up losing state after a rebalancing event that includes reassignment of tasks to a previous standby task within the acceptable standby lag.
>
> The problem is reproducible, and an integration test has been created to showcase the [issue|https://github.com/apache/kafka/pull/12540].
> A detailed description of the observed issue is provided [here|https://github.com/apache/kafka/pull/12540/files?short_path=3ca480e#diff-3ca480ef093a1faa18912e1ebc679be492b341147b96d7a85bda59911228ef45].
> Similar issues have been observed and reported on StackOverflow, for example [here|https://stackoverflow.com/questions/69038181/kafka-streams-aggregation-data-loss-between-instance-restarts-and-rebalances].

-- This message was sent by Atlassian Jira (v8.20.10#820010)