[ 
https://issues.apache.org/jira/browse/FLINK-29589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krzysztof Chmielewski updated FLINK-29589:
------------------------------------------
    Description: 
Flink's Sink architecture with a global committer seems to be vulnerable to data
loss during Task Manager recovery. An entire checkpoint can be lost by the
_GlobalCommitter_, resulting in data loss.

The issue was observed in the Delta Sink connector on a real 1.14.x cluster and
was reproduced using Flink's 1.14.6 test-utility classes.

Scenario:
 # A streaming source emits a constant number of events per checkpoint (20
events per commit, 5 commits in total, i.e. 100 records).
 # A sink with parallelism > 1 that has both a committer and a _GlobalCommitter_ element.
 # The _Committers_ process the committables for *checkpointId 2*.
 # The _GlobalCommitter_ throws an exception (injected on purpose) during
*checkpointId 2* (the third commit) while processing data from *checkpoint 1*
(by design, the global committer architecture lags one commit behind the rest
of the pipeline). A sketch of such a failing global committer follows this list.
 # The Task Manager recovers and the source resumes sending data.
 # The streaming source ends.
 # 20 records (one checkpoint) are missing.
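
Below is a minimal, hedged sketch of a global committer that mimics step 4, assuming the Sink V1 {{GlobalCommitter}} interface from FLIP-143 ({{org.apache.flink.api.connector.sink.GlobalCommitter}}). The class name, the failure counter and the static bookkeeping used for assertions are made up for illustration; the actual reproduction uses Flink's TestSink (see the linked test further below).

{code:java}
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.connector.sink.GlobalCommitter;

/** Illustrative only: fails exactly once, on its third commit call, as in step 4 above. */
public class FailOnceGlobalCommitter implements GlobalCommitter<String, String> {

    // Hypothetical bookkeeping for test assertions; the real ITCase uses TestSink's own queues.
    static final List<String> GLOBALLY_COMMITTED = Collections.synchronizedList(new ArrayList<>());

    // Survives the operator restart within one JVM, so the failure is injected only once.
    private static volatile boolean alreadyFailed = false;

    private int commitCalls = 0;

    @Override
    public List<String> filterRecoveredCommittables(List<String> globalCommittables) {
        // Keep everything; deduplication is not relevant to this scenario.
        return globalCommittables;
    }

    @Override
    public String combine(List<String> committables) {
        // Collapse the per-subtask committables of one checkpoint into a single global committable.
        return String.join(",", committables);
    }

    @Override
    public List<String> commit(List<String> globalCommittables) throws IOException {
        commitCalls++;
        if (commitCalls == 3 && !alreadyFailed) {
            alreadyFailed = true;
            // Mimics step 4: fail on the third commit, while handling data of checkpoint 1,
            // which triggers the Task Manager recovery described above.
            throw new IOException("Injected failure on the third global commit");
        }
        // Stand-in for the real external commit (e.g. writing a Delta transaction log entry).
        GLOBALLY_COMMITTED.addAll(globalCommittables);
        return Collections.emptyList(); // nothing left to retry
    }

    /** Hypothetical helper for assertions: total record count, assuming one comma-free committable per record. */
    static int committedRecordCount() {
        synchronized (GLOBALLY_COMMITTED) {
            return GLOBALLY_COMMITTED.stream().mapToInt(c -> c.split(",").length).sum();
        }
    }

    @Override
    public void endOfInput() {}

    @Override
    public void close() {}
}
{code}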

What happens is that during recovery the committers perform a "retry" of the
committables for *checkpointId 2*; however, the committables reprocessed by that
"retry" task are never emitted downstream to the global committer.

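As a hedged illustration of that description (this is NOT the actual Flink source, and all names below are made up), the control flow can be modelled like this: the committables of a completed checkpoint are forwarded downstream, while the committables recovered from state are retried asynchronously without ever being forwarded.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Toy model of the suspected control flow; not Flink code. */
public class CommitterRetryToyModel {

    private final List<String> pendingCommittables = new ArrayList<>();   // produced since the (re)start
    private final List<String> recoveredCommittables = new ArrayList<>(); // restored from state after recovery

    /** Loosely models what the sink committer does when a checkpoint completes. */
    public void onCheckpointComplete(List<String> downstreamToGlobalCommitter) {
        // Regular flow: committed committables are emitted downstream,
        // so the global committer sees them on the next checkpoint.
        downstreamToGlobalCommitter.addAll(pendingCommittables);
        pendingCommittables.clear();

        // Recovery flow: recovered committables are committed again via an async "retry"
        // task, but their result never reaches the downstream output -- the gap this
        // ticket reports.
        CompletableFuture.runAsync(() -> {
            List<String> retried = new ArrayList<>(recoveredCommittables);
            recoveredCommittables.clear();
            // Suspected bug: `retried` is dropped instead of being emitted downstream.
        });
    }
}
{code}
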
The issue can be reproduced with a JUnit test built on Flink's TestSink.
The test is [implemented
here|https://github.com/kristoffSC/flink/blob/Flink_1.14_DataLoss_SinkGlobalCommitter/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java#:~:text=testGlobalCommitterMissingRecordsDuringRecovery]
and is based on other tests from the {{SinkITCase.java}} class.
The test reproduces the issue in more than 90% of runs.
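
For orientation, here is a hedged outline of the shape such an ITCase can take. It is not a drop-in replacement for the linked test: the sink wiring ({{buildSinkWith}}) is a hypothetical placeholder for TestSink's builder, and {{FailOnceGlobalCommitter}} is the illustrative class sketched earlier.

{code:java}
import static org.junit.Assert.assertEquals;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Test;

public class GlobalCommitterRecoveryITCaseOutline {

    @Test
    public void testGlobalCommitterSeesAllCheckpointsAfterRecovery() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);                  // sink parallelism > 1, as in the scenario
        env.enableCheckpointing(100);           // frequent checkpoints, i.e. frequent commits
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); // recover once after the injected failure

        env.fromSequence(0, 99)                                        // 100 records, roughly 20 per checkpoint
                .sinkTo(buildSinkWith(new FailOnceGlobalCommitter())); // hypothetical wiring, see below

        env.execute();

        // Expectation: after recovery the global committer has still received all 100 records.
        // With the reported bug, one checkpoint's worth (20 records) is missing.
        assertEquals(100, FailOnceGlobalCommitter.committedRecordCount());
    }

    /**
     * Hypothetical placeholder: in the linked SinkITCase test this wiring is done with
     * Flink's TestSink builder (default writer + committer + the failing global committer).
     */
    private static Sink<Long, String, Void, String> buildSinkWith(FailOnceGlobalCommitter globalCommitter) {
        throw new UnsupportedOperationException("placeholder -- see the linked SinkITCase test");
    }
}
{code}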

I believe the problem lies somewhere around the
*SinkOperator::notifyCheckpointComplete* method. There we can see that the retry
async task is scheduled, but its result is never emitted downstream, as is done
for the regular flow one line above.
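
As a hedged sketch of the direction a fix could take, expressed on the toy model from earlier rather than on the real *SinkOperator* code (so purely illustrative, not a proposed patch): the retry task would hand its results to the same downstream path that the regular flow already uses.

{code:java}
    /** Added to CommitterRetryToyModel above: same flow, but retried committables are also forwarded. */
    public void onCheckpointCompleteWithRetryEmission(List<String> downstreamToGlobalCommitter) {
        downstreamToGlobalCommitter.addAll(pendingCommittables);
        pendingCommittables.clear();

        CompletableFuture.runAsync(() -> {
            List<String> retried = new ArrayList<>(recoveredCommittables);
            recoveredCommittables.clear();
            // Fix idea: emit the retried committables the same way the regular flow does
            // (in the real operator this would have to happen on the mailbox/operator thread).
            downstreamToGlobalCommitter.addAll(retried);
        });
    }
{code}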



> Data Loss in Sink GlobalCommitter during Task Manager recovery
> --------------------------------------------------------------
>
>                 Key: FLINK-29589
>                 URL: https://issues.apache.org/jira/browse/FLINK-29589
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.14.0, 1.14.2, 1.14.3, 1.14.4, 1.14.5, 1.14.6
>            Reporter: Krzysztof Chmielewski
>            Priority: Blocker
>



