[
https://issues.apache.org/jira/browse/FLINK-29589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17616307#comment-17616307
]
Krzysztof Chmielewski edited comment on FLINK-29589 at 10/20/22 9:10 AM:
-------------------------------------------------------------------------
Hi [~chesnay]
Sink V2 on 1.15, 1.16 and 1.17 has its own issues that we have found; we are
working with Fabian Paul to fix them:
https://issues.apache.org/jira/browse/FLINK-29509
https://issues.apache.org/jira/browse/FLINK-29583
https://issues.apache.org/jira/browse/FLINK-29512
https://issues.apache.org/jira/browse/FLINK-29627
With those still on the plate we can't really tell whether there is data loss
on V2, since the Task Manager fails to start during recovery when running a
Sink with a global committer.
> Data Loss in Sink GlobalCommitter during Task Manager recovery
> --------------------------------------------------------------
>
> Key: FLINK-29589
> URL: https://issues.apache.org/jira/browse/FLINK-29589
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.14.0
> Reporter: Krzysztof Chmielewski
> Priority: Blocker
>
> Flink's Sink architecture with a global committer seems to be vulnerable to
> data loss during Task Manager recovery: the committables of an entire
> checkpoint can be dropped by the _GlobalCommitter_, resulting in data loss.
> The issue was observed in the Delta Sink connector on a real 1.14.x cluster
> and was replicated using Flink's 1.14.6 test utility classes.
> Scenario (an illustrative sketch of the failing global committer from step 4
> follows after the list):
> # Streaming source emitting a constant number of events per checkpoint (20
> events per commit, 5 commits in total, which gives 100 records).
> # Sink with parallelism > 1, with _Committer_ and _GlobalCommitter_ elements.
> # _Committers_ processed committables for *checkpointId 2*.
> # _GlobalCommitter_ throws an exception (a deliberately injected one) during
> *checkpointId 2* (third commit) while processing data from *checkpoint 1*
> (the global committer architecture is expected to lag one commit behind the
> rest of the pipeline).
> # Task Manager recovery; the source resumes sending data.
> # Streaming source ends.
> # We are missing 20 records (one checkpoint).
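> To make step 4 concrete, here is a minimal, hedged sketch of the kind of
> failing global committer used in the scenario. It targets the 1.14 Sink V1
> interface {{org.apache.flink.api.connector.sink.GlobalCommitter}} (checked
> exceptions omitted from the overrides); the class name, the counter and the
> failure condition are illustrative and not the actual Delta Sink or TestSink
> code.
> {code:java}
> import java.util.Collections;
> import java.util.List;
> import java.util.concurrent.atomic.AtomicInteger;
>
> import org.apache.flink.api.connector.sink.GlobalCommitter;
>
> // Illustrative only: a global committer that fails once, on its third commit
> // call, mimicking step 4 of the scenario above.
> public class FailingGlobalCommitter implements GlobalCommitter<String, String> {
>
>     // Static so the count survives the operator restart within one test JVM.
>     private static final AtomicInteger COMMIT_CALLS = new AtomicInteger();
>
>     @Override
>     public List<String> filterRecoveredCommittables(List<String> globalCommittables) {
>         // Keep everything that was recovered; a real sink would deduplicate here.
>         return globalCommittables;
>     }
>
>     @Override
>     public String combine(List<String> committables) {
>         // Combine the per-subtask committables into one global committable.
>         return String.join(",", committables);
>     }
>
>     @Override
>     public List<String> commit(List<String> globalCommittables) {
>         if (COMMIT_CALLS.incrementAndGet() == 3) {
>             throw new RuntimeException("Injected failure on the third global commit");
>         }
>         // A real sink would write globalCommittables to the external system here.
>         return Collections.emptyList(); // nothing needs a retry
>     }
>
>     @Override
>     public void endOfInput() {}
>
>     @Override
>     public void close() {}
> }
> {code}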
> What happens is that during recovery the committers perform a "retry" on the
> committables for *checkpointId 2*; however, the committables reprocessed by
> that "retry" task are never emitted downstream to the global committer.
> The issue can be reproduced with a JUnit test built with Flink's TestSink.
> The test was [implemented
> here|https://github.com/kristoffSC/flink/blob/Flink_1.14_DataLoss_SinkGlobalCommitter/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java#:~:text=testGlobalCommitterMissingRecordsDuringRecovery]
> and is based on the other tests in the `SinkITCase.java` class.
> The test reproduces the issue in more than 90% of runs.
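> For orientation, the rough shape of such a test is sketched below. It does not
> use Flink's internal TestSink builder (whose API is not reproduced here);
> {{sinkUnderTest()}} and {{GLOBAL_COMMITTED_RECORDS}} are hypothetical helpers a
> concrete test would have to provide, and the linked SinkITCase test is the
> authoritative reproduction.
> {code:java}
> import static org.junit.Assert.assertEquals;
>
> import java.util.Queue;
> import java.util.concurrent.ConcurrentLinkedQueue;
>
> import org.apache.flink.api.common.restartstrategy.RestartStrategies;
> import org.apache.flink.api.common.time.Time;
> import org.apache.flink.api.connector.sink.Sink;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.junit.Test;
>
> // Rough skeleton only; helper names are made up for the illustration.
> public class GlobalCommitterRecoveryTest {
>
>     // Hypothetical shared collection the global committer under test appends to.
>     static final Queue<String> GLOBAL_COMMITTED_RECORDS = new ConcurrentLinkedQueue<>();
>
>     @Test
>     public void globalCommitterReceivesAllCommittablesAfterRecovery() throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(2);            // sink parallelism > 1, as in the scenario
>         env.enableCheckpointing(100);
>         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(10)));
>
>         env.fromSequence(0, 99)           // 100 records, as in the scenario
>            .map(value -> "record-" + value)
>            .sinkTo(sinkUnderTest());
>
>         env.execute();
>
>         // Every record must reach the global committer despite the injected
>         // failure and the restart; with the bug, one checkpoint (20 records)
>         // is missing here.
>         assertEquals(100, GLOBAL_COMMITTED_RECORDS.size());
>     }
>
>     // Hypothetical factory: a Sink V1 implementation wired with a writer, a
>     // committer and the FailingGlobalCommitter sketched above.
>     private Sink<String, ?, ?, ?> sinkUnderTest() {
>         throw new UnsupportedOperationException("provide the Sink V1 under test");
>     }
> }
> {code}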
> I believe the problem is somewhere around the
> *SinkOperator::notifyCheckpointComplete* method. There we can see that the
> async retry task is scheduled, but its result is never emitted downstream,
> unlike the regular flow one line above.
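> The strongly simplified illustration below shows the suspected pattern. It is
> NOT the actual Flink source; all class, field and method names are made up for
> the illustration, and the plain {{Executor}} stands in for Flink's mailbox
> executor.
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
> import java.util.concurrent.Executor;
>
> // Simplified illustration of the suspected flow in SinkOperator; NOT the real code.
> class CommitFlowIllustration<CommT> {
>
>     interface Committer<C> {
>         /** Returns the committables that failed and need a retry. */
>         List<C> commit(List<C> committables) throws Exception;
>     }
>
>     private final Committer<CommT> committer;
>     private final Executor retryExecutor; // stands in for the mailbox executor
>
>     CommitFlowIllustration(Committer<CommT> committer, Executor retryExecutor) {
>         this.committer = committer;
>         this.retryExecutor = retryExecutor;
>     }
>
>     void notifyCheckpointComplete(List<CommT> toCommit, long checkpointId) throws Exception {
>         // Regular flow: commit and forward the successful committables
>         // downstream, so the GlobalCommitter eventually sees them.
>         List<CommT> needRetry = committer.commit(toCommit);
>         List<CommT> successful = new ArrayList<>(toCommit);
>         successful.removeAll(needRetry);
>         emitDownstream(successful, checkpointId);
>
>         if (!needRetry.isEmpty()) {
>             // Retry flow: the commit is re-attempted asynchronously, but the
>             // result of the retry is never passed to emitDownstream(), so the
>             // GlobalCommitter never receives these committables.
>             retryExecutor.execute(() -> {
>                 try {
>                     committer.commit(needRetry);
>                 } catch (Exception e) {
>                     throw new RuntimeException(e);
>                 }
>             });
>         }
>     }
>
>     private void emitDownstream(List<CommT> committables, long checkpointId) {
>         // In the real operator this forwards the committables to the
>         // GlobalCommitter operator; omitted here.
>     }
> }
> {code}
> Under that reading, the natural fix would be to also emit the retried
> committables downstream once the retry succeeds, the same way the regular flow
> does.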
--
This message was sent by Atlassian Jira
(v8.20.10#820010)