[ 
https://issues.apache.org/jira/browse/FLINK-18706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168343#comment-17168343
 ] 

Yumeng Zhang edited comment on FLINK-18706 at 7/31/20, 2:41 AM:
----------------------------------------------------------------

[~kkl0u] Yes, I see. But this exactly-once semantic only holds inside Flink. 
Data that has been read after the savepoint barrier may eventually be sent to 
the outside world even though it is excluded from the final savepoint. This 
means that when we restore from that savepoint, we will reprocess this data 
and send it to the outside world again. I'm not sure if that meets your 
expectations. What I want to fix is to make stop-with-savepoint exactly-once 
not only inside Flink but also for the outside world.
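To make the race concrete, here is a minimal sketch (a hypothetical class, not 
Flink's actual FlinkKafkaConsumer code; a plain counter stands in for the 
Kafka fetcher):

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    // Hypothetical source illustrating the race; not Flink's actual code.
    public class RacySource implements SourceFunction<Long> {
        private volatile boolean running = true;
        private long next = 0L; // stands in for the Kafka fetcher position

        @Override
        public void run(SourceContext<Long> ctx) {
            while (running) {
                synchronized (ctx.getCheckpointLock()) {
                    // A synchronous savepoint can complete between the
                    // `running` check above and this emit: the savepoint
                    // stores an offset that excludes `next`, yet `next` is
                    // still emitted downstream before cancel() flips the flag.
                    ctx.collect(next++);
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    // Restoring from that savepoint replays the already-emitted record:
    // a duplicate in the outside world.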


> Stop with savepoint cannot guarantee exactly-once for kafka source
> ------------------------------------------------------------------
>
>                 Key: FLINK-18706
>                 URL: https://issues.apache.org/jira/browse/FLINK-18706
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.10.1, 1.11.1
>            Reporter: Yumeng Zhang
>            Priority: Major
>              Labels: pull-request-available
>
> When I run the stop-with-savepoint command on my old job and then submit a 
> new job from the resulting sync-savepoint, the new job sometimes consumes a 
> few duplicate records. Here is my case. I have a data generation job with 
> parallelism 1, which generates long numbers incrementally and sends them to 
> Kafka topicA, which has only one partition. Then I have a consumer job with 
> parallelism 1, which reads data from topicA, does no processing, and simply 
> prints the numbers to standard out. For example, after stop-with-savepoint, 
> my consumer job has printed the sequence 0,1,2,3...40,41,42,43. When I start 
> the consumer job again from that sync-savepoint, it prints 41,42,43,44..., 
> which means it has consumed duplicate data.
> I think the reason is that we fail to guarantee mutual exclusion between 
> canceling the source task and sending data downstream via the checkpoint 
> lock. The source may send some data downstream before the sync-savepoint 
> completes and only then be canceled. Therefore, we need to keep the source 
> operator running in the synchronous savepoint mailbox loop (the 
> triggerCheckpoint path) until the synchronous savepoint completes, and keep 
> checking the running state before sending data downstream in the Kafka 
> connector.
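> A minimal sketch of the connector-side guard (hypothetical names, not the 
> actual patch; it assumes a source shaped like the sketch in the comment 
> above, with a volatile `running` flag):
>
>     // Hypothetical emit-path guard, not the actual FlinkKafkaConsumer fix:
>     // re-check `running` under the checkpoint lock so that no record can
>     // leave the source once the synchronous savepoint has completed and
>     // cancel() has flipped the flag.
>     private void emitRecord(SourceContext<Long> ctx, long record) {
>         synchronized (ctx.getCheckpointLock()) {
>             if (!running) {
>                 return; // savepoint already taken; do not emit
>             }
>             ctx.collect(record);
>         }
>     }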



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
