[ 
https://issues.apache.org/jira/browse/KAFKA-14172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17698180#comment-17698180
 ] 

Guozhang Wang commented on KAFKA-14172:
---------------------------------------

[~gray.john][~Horslev] I took a deep look into this issue and I think I found 
the culprit. Here's a short summary:

1. When standbys are enabled, Kafka Streams can recycle a standby task (and 
its state stores) into an active one, and vice versa.
2. When caching is enabled, we bypass the caching layer when updating a 
standby task (i.e. via putInternal); see the simplified sketch after this list.
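
To make those two behaviors concrete, here is a minimal sketch in plain Java 
(not the actual Kafka Streams internals; {{ToyCachingStore}}, {{putFromActive}} 
and {{putFromStandby}} are made-up names) of a caching wrapper whose active 
write path goes through the cache, whose standby update path writes straight to 
the underlying store, and whose flush does not clear the cached entries:

{code:java}
import java.util.HashMap;
import java.util.Map;

// Toy model only, not Kafka Streams code. It mimics (1) a caching layer sitting
// on top of the underlying store and (2) standby updates that bypass that layer.
class ToyCachingStore {
    final Map<String, Long> underlying = new HashMap<>(); // the "real" store (e.g. RocksDB)
    final Map<String, Long> cache = new HashMap<>();       // the caching layer

    // Active-task write path: goes through the cache.
    void putFromActive(String key, long value) {
        cache.put(key, value);
    }

    // Standby-task update path: bypasses the cache (analogous to putInternal).
    void putFromStandby(String key, long value) {
        underlying.put(key, value);
    }

    // Reads hit the cache first, then fall back to the underlying store.
    Long get(String key) {
        Long cached = cache.get(key);
        return cached != null ? cached : underlying.get(key);
    }

    // Flush pushes cached entries down to the underlying store but, as in the
    // behavior described here, does not clear them from the cache.
    void flush() {
        underlying.putAll(cache);
    }
}
{code}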

These two behaviors combined cause the issue. Take a concrete example 
following the demo in https://github.com/apache/kafka/pull/12540: let's say we 
have a task A with a cached state store S.

* For a given host, the task was originally hosted as an active task.
* A rebalance happens, and the task is recycled into a standby. At that time 
the cache is flushed, so the underlying store and the cache layer are 
consistent; let's call their common contents S1 (version 1).
* The standby task is updated for a period of time, with updates written 
directly into the underlying store. Now the underlying store is at S2 while 
the caching layer is still at S1.
* A second rebalance happens, and the task is recycled back into an active. 
When that task then processes normally, a read of the store hits the cache 
layer first and very likely returns the older S1 instead of S2. As a result we 
get a duplicate: in the above PR's example, the {{count}} store returns an old 
counter, so the ID derived from that counter is used twice (the short driver 
after this list replays the sequence).
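
Replaying those bullets against the toy wrapper sketched above (again, purely 
illustrative; the real code paths live in the caching store and task-recycling 
logic inside Kafka Streams):

{code:java}
// Driver for the ToyCachingStore sketched above; prints the stale value.
public class StaleCacheScenario {
    public static void main(String[] args) {
        ToyCachingStore store = new ToyCachingStore();

        // Originally active: the counter is written through the cache.
        store.putFromActive("count", 1L);

        // Rebalance #1: active -> standby. The cache is flushed, so both layers
        // hold S1, but the cached entry is still there.
        store.flush();

        // Standby phase: updates go straight to the underlying store (S2).
        store.putFromStandby("count", 2L);

        // Rebalance #2: standby -> active. A read hits the cache first and
        // returns the stale S1 value, so the ID derived from it would repeat.
        System.out.println("observed counter = " + store.get("count")); // prints 1, not 2
    }
}
{code}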

That also explains why the test does not fail if caching is disabled or 
standby replicas are disabled (verified locally). I think the test could still 
fail even when the acceptable lag is set to 0, but it is then less likely for 
a task to go standby -> active -> standby again, so people may not observe it 
as easily.
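
For reference, these are the knobs involved; a test could toggle them roughly 
like this (illustrative values only, not copied from the linked integration 
test):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

class ReproConfig {
    // Illustrative values only; not taken from the linked PR's test.
    static Properties props() {
        Properties p = new Properties();
        p.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); // EOS, as in the report
        p.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);                  // 0 disables standbys -> no recycling, test passes
        p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L); // 0 disables caching -> test passes
        p.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 10_000L);         // 0 makes the standby -> active recycle rarer, but not impossible
        return p;
    }
}
{code}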

I have a hack fix (note: this is not for merging, it is just a hack) built on 
top of [~Horslev]'s integration test; it clears the cache when the cache is 
flushed (which happens when the task manager is flushed). With this fix the 
test no longer fails.
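
The shape of that hack, expressed against the toy wrapper above rather than 
the real caching store, is roughly the following (a sketch only, not the 
actual patch):

{code:java}
// Replacement for ToyCachingStore.flush() in the sketch above (not the real patch):
// flushing now also drops the cached entries, so a task recycled back to active
// cannot read a stale value out of the cache layer afterwards.
void flush() {
    underlying.putAll(cache); // push cached entries down, as before
    cache.clear();            // the "hack": clear the cache whenever it is flushed
}
{code}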

> bug: State stores lose state when tasks are reassigned under EOS wit…
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-14172
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14172
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.1.1
>            Reporter: Martin Hørslev
>            Priority: Critical
>
> h1. State stores lose state when tasks are reassigned under EOS with standby 
> replicas and default acceptable lag.
> I have observed that state stores used in a transform step under 
> Exactly-Once semantics end up losing state after a rebalancing event that 
> includes reassignment of tasks to a previous standby task within the 
> acceptable standby lag.
>  
> The problem is reproducible, and an integration test has been created to 
> showcase the [issue|https://github.com/apache/kafka/pull/12540].
> A detailed description of the observed issue is provided 
> [here|https://github.com/apache/kafka/pull/12540/files?short_path=3ca480e#diff-3ca480ef093a1faa18912e1ebc679be492b341147b96d7a85bda59911228ef45]
> Similar issues have been observed and reported on Stack Overflow, for example 
> [here|https://stackoverflow.com/questions/69038181/kafka-streams-aggregation-data-loss-between-instance-restarts-and-rebalances].
>  



