[ https://issues.apache.org/jira/browse/FLINK-29589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Krzysztof Chmielewski updated FLINK-29589:
------------------------------------------
Description:

Flink's Sink architecture with a global committer appears to be vulnerable to data loss during Task Manager recovery. An entire checkpoint can be lost by the _GlobalCommitter_, resulting in data loss.

The issue was observed in the Delta Sink connector on a real 1.14.x cluster and was replicated using Flink's 1.14.6 test utility classes.

Scenario:
# A streaming source emits a constant number of events per checkpoint (20 events per commit, 5 commits in total, i.e. 100 records).
# A sink with parallelism > 1, with _Committer_ and _GlobalCommitter_ elements.
# _Committers_ processed committables for *checkpointId 2*.
# The _GlobalCommitter_ throws an exception (intentionally injected) during *checkpointId 2* (the third commit) while processing data from *checkpoint 1* (in the global committer architecture it is expected to lag one commit behind the rest of the pipeline).
# Task Manager recovery; the source resumes sending data.
# The streaming source ends.
# 20 records (one checkpoint) are missing.

What happens is that during recovery the committers perform a "retry" on the committables for *checkpointId 2*, but the committables reprocessed by this "retry" task are never emitted downstream to the global committer.

The issue can be reproduced with a JUnit test built with Flink's TestSink. The test was [implemented here|https://github.com/kristoffSC/flink/blob/Flink_1.14_DataLoss_SinkGlobalCommitter/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java#:~:text=testGlobalCommitterMissingRecordsDuringRecovery] and is based on other tests from the `SinkITCase.java` class. The test reproduces the issue in more than 90% of runs.

I believe the problem is somewhere around the *SinkOperator::notifyCheckpointComplete* method. There we can see that the retry async task is scheduled, but its result is never emitted downstream the way it is for the regular flow one line above.
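To illustrate the suspected behaviour, here is a minimal, self-contained Java sketch. This is _not_ the actual Flink source; `RetryEmitSketch`, `emitDownstream` and `retryCommit` are hypothetical names used only for illustration. It models the asymmetry described above: the regular path forwards committed committables downstream to the global committer, while the failure path only schedules an async retry whose result is never emitted downstream.

{code:java}
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class RetryEmitSketch {

    // Committables that the downstream GlobalCommitter actually receives.
    static final List<String> emittedDownstream = new CopyOnWriteArrayList<>();

    // Simplified stand-in for the behaviour suspected around
    // SinkOperator::notifyCheckpointComplete (illustrative only).
    static void notifyCheckpointComplete(List<String> committables, boolean commitFails) {
        if (!commitFails) {
            // Regular flow: committed committables are forwarded downstream.
            emitDownstream(committables);
        } else {
            // Suspected bug: a retry is scheduled asynchronously, but the
            // retried committables are never forwarded downstream, so the
            // GlobalCommitter never sees them again after recovery.
            CompletableFuture.runAsync(() -> retryCommit(committables));
        }
    }

    static void emitDownstream(List<String> committables) {
        emittedDownstream.addAll(committables);
    }

    static void retryCommit(List<String> committables) {
        // The retry itself may succeed, but nothing is emitted downstream here,
        // which is where one checkpoint's records are lost for the GlobalCommitter.
    }

    public static void main(String[] args) throws InterruptedException {
        notifyCheckpointComplete(List.of("committables-of-checkpoint-1"), false);
        notifyCheckpointComplete(List.of("committables-of-checkpoint-2"), true);
        Thread.sleep(200); // give the async retry time to run
        // Prints only checkpoint 1's committables; checkpoint 2's never arrive.
        System.out.println("GlobalCommitter received: " + emittedDownstream);
    }
}
{code}

Running the sketch prints only checkpoint 1's committables, which mirrors the one missing checkpoint (20 records) observed in the test.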
> Data Loss in Sink GlobalCommitter during Task Manager recovery
> --------------------------------------------------------------
>
>              Key: FLINK-29589
>              URL: https://issues.apache.org/jira/browse/FLINK-29589
>          Project: Flink
>       Issue Type: Bug
> Affects Versions: 1.14.0, 1.14.2, 1.14.3, 1.14.4, 1.14.5, 1.14.6
>         Reporter: Krzysztof Chmielewski
>         Priority: Blocker
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)