[ 
https://issues.apache.org/jira/browse/KAFKA-14374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17635805#comment-17635805
 ] 

Youcef Sebiat commented on KAFKA-14374:
---------------------------------------

Thanks for your answer. I will try to elaborate.
{quote}What do you mean by this? Do you expect that an input record from 
partition 0 (table2 topic) will go to repartition-topic partition 0? But by 
default, this won't be guaranteed, because the default hash-partitioning may 
compute a different partition for an integer vs a string.
{quote}
Not exactly. What we expect is that the state store will be a mirror of 
the input table. When we read the changelog topic and the input table, we 
notice that some keys are missing from the changelog. However, when we read 
the repartition topic, we find that it mirrors the input table topic: the set 
of keys is identical between the two.
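
On the partitioning point in the quote above: the default partitioner hashes the serialized key bytes, and an integer key and a string key serialize differently. A minimal sketch of that difference, assuming the usual 4-byte big-endian integer encoding and UTF-8 string encoding (the function names are hypothetical and this is not Kafka code):

```python
import struct


def serialize_int_key(key):
    # Assumption: integer keys are written as 4-byte big-endian values.
    return struct.pack(">i", key)


def serialize_string_key(key):
    # Assumption: string keys are written as UTF-8 bytes.
    return key.encode("utf-8")


int_bytes = serialize_int_key(1)
str_bytes = serialize_string_key("1")

# The two byte sequences differ, so a hash over them may select
# different partitions for the "same" logical key.
print(int_bytes, str_bytes)
```

This only explains why a record may land in a different partition number after the key conversion; it does not by itself explain missing keys.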
{quote}Not sure what this means. Can you elaborate?
{quote}
Maybe an example will be more helpful: we have three key-value pairs [(1, A);(2, 
B);(3,C)]. In the input topic `table2` we have all three [(1, A);(2, B);(3,C)]; 
in the `repartition-topic` we can see that all three have been processed [(1, 
A);(2, B);(3,C)], but in the changelog we observe that at least one of the 
three is missing, if not all. It is generally the first ones in order that are 
missing. 

So what we actually observe is that some messages get lost between the 
repartition topic and the changelog topic.
{quote}How do you do the verification?
{quote}
We stop the Debezium connector that feeds the input topic, so the input topic 
doesn't change and the number of keys does not change (the cardinality in my 
original post). We then download the input topic content and the changelog 
topic content. Using a script, we compare the sets of keys in the input topic, 
repartition topic, and changelog topic. We find that the set of keys is exactly 
the same between the input topic and the repartition topic, but the changelog 
topic is missing some keys that are present in both the input topic and the 
repartition topic.
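
The comparison is roughly the following. This is a simplified sketch rather than our actual script: the function name and the toy key lists are illustrative, and it assumes the keys have already been extracted from each topic dump and can be normalized to `int`.

```python
def compare_key_sets(input_keys, repartition_keys, changelog_keys):
    """Report which keys are missing downstream of the input topic."""
    # Normalize to int, since the input topic has string keys and the
    # repartition/changelog topics have integer keys.
    input_set = {int(k) for k in input_keys}
    repartition_set = {int(k) for k in repartition_keys}
    changelog_set = {int(k) for k in changelog_keys}
    return {
        "missing_in_repartition": input_set - repartition_set,
        "missing_in_changelog": input_set - changelog_set,
        "lost_after_repartition": repartition_set - changelog_set,
    }


# Toy data matching the three-pair example above (hypothetical values).
report = compare_key_sets(
    input_keys=["1", "2", "3"],
    repartition_keys=[1, 2, 3],
    changelog_keys=[2, 3],
)
print(report)
```

In our runs, `missing_in_repartition` is empty while `lost_after_repartition` is not.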
{quote}What do you mean by "triggers the delete"?
{quote}
"triggers the delete" == moves the low watermark of the repartition topic to a 
higher value.
{quote}Also, why do you think that changelog writes are not flushed?{quote}
Because when we check the topics, we observe that some messages are missing. 
And the loss happens in the transition between the repartition topic and the 
changelog topic.
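
To make the hypothesis concrete, here is a toy model of what we think happens. It is only an illustration of our assumption, not Kafka Streams code; the function and parameter names are hypothetical.

```python
def simulate_first_launch(records, interrupted_at, flushed_before_purge):
    """Toy model: which records end up in the changelog after a rebalance.

    records: (key, value) pairs in one repartition partition; offset == index.
    interrupted_at: number of records consumed before the rebalance.
    flushed_before_purge: whether the consumed records reached the changelog
        before the repartition topic's low watermark moved past them.
    """
    changelog = {}
    consumed = records[:interrupted_at]
    if flushed_before_purge:
        changelog.update(consumed)
    # "Triggers the delete": the low watermark moves past the consumed records.
    low_watermark = interrupted_at
    # After the rebalance, processing resumes from the oldest available offset.
    for key, value in records[low_watermark:]:
        changelog[key] = value
    return changelog


records = [(1, "A"), (2, "B"), (3, "C")]
lossy = simulate_first_launch(records, interrupted_at=1, flushed_before_purge=False)
healthy = simulate_first_launch(records, interrupted_at=1, flushed_before_purge=True)
print(lossy, healthy)
```

The lossy case reproduces the pattern we see: the earliest records are the ones missing from the changelog.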
 
 

> Kafka streams losing messages in State Store during first launch of app
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-14374
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14374
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0, 3.2.0
>            Reporter: Youcef Sebiat
>            Priority: Major
>         Attachments: Screenshot 2022-11-09 at 14.56.00.png
>
>
> We have been using Kafka Streams to implement a CDC-based app. Attached is 
> the sub-topology of interest.
> The `table2` topic is created by Debezium, which is connected to a SQL DB. It 
> contains 26K lines. We take `table2` and create a new key, which is only a 
> conversion of the key from `string` to `int`. This means that we should 
> expect #table2=#repartition-topic=#state-store, which does not actually 
> hold. What we end up with is #table2=#repartition-topic, 
> but #repartition-topic>#state-store. We actually lose messages and thus 
> corrupt the state store, which leaves the app in an incorrect state. (Please 
> note that there are no insertions into `table2`, as we paused the connector 
> to verify the cardinality.)
> The above happens only during the first launch, i.e. the app has never been 
> launched before, so internal topics do not exist yet. Restarts of 
> pre-existing apps do not yield any problems.
> We have:
> 1. Broker on Kafka 3.2.
> 2. Client app on 2.8|3.2 (we tried both and faced the same issue).
> 3. All parameters at their defaults except `CACHE_MAX_BYTES_BUFFERING_CONFIG`, 
> set to `0`, and `NUM_STREAM_THREADS_CONFIG`, set to `>1`.
>  
> *What actually worked*
> 1. Using a single thread at first launch: using one thread solves the 
> problem, and #table2=#repartition-topic=#state-store holds.
> 2. Pre-creating the Kafka internal topics: we noticed that whenever there is 
> a rebalance during the first launch of the Kafka Streams app, the state 
> stores end up missing values. This also happens when you launch multiple pods 
> in K8s, for example. When we read through the logs, we noticed that a 
> rebalance is triggered when we first launch the app. This comes from the 
> fact that the internal topics get created and assigned, hence the rebalance. 
> So by creating the internal topics beforehand, we avoid the rebalance and we 
> end up with #table2=#repartition-topic=#state-store.
> *What we noticed from the logs*
> In multi-thread mode, we noticed that it is the partitions assigned 
> to the thread chosen by the Coordinator to be the Leader of the consumers 
> that suffer the data loss. What we think is happening is the following:
> 1. Consumer threads are launched and inform the Coordinator.
> 2. The Coordinator assigns topics and chooses the Leader among the threads.
> 3. The app creates the internal topics.
> 4. Consumers/producers process data. Specifically, the Leader consumer 
> consumes from the repartition topic, which triggers the delete of those 
> messages without flushing them to the changelog topic.
> 5. The Leader is notified of the new assignment with the internal topics. 
> This triggers a rebalance.
> 6. The Leader pauses partitions. 
> 7. The rebalance finishes. The Leader resumes partitions.
> 8. The Leader fetches the oldest offset of the repartition partitions it was 
> assigned. It will not start from zero, but instead from where it was 
> interrupted in step 4. The chunk of early messages is thus lost.
> Please note that in single-thread mode there is no data loss, which is odd 
> since the Leader is then the only thread. 
> So my questions are:
> 1. Are we misunderstanding what's happening?
> 2. What can be the origin of this problem?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
