[https://issues.apache.org/jira/browse/FLINK-30238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17689638#comment-17689638]
Fabian Paul commented on FLINK-30238:
-------------------------------------
Sorry for joining late, I was on PTO.
[~gaoyunhaii]
# I think your analysis is not fully correct. endOfInput() is also called on
stop-with-savepoint even without drain; drain only additionally advances the
watermark to the max watermark. I took one of our KafkaSink tests on 1.15
[1][2] and set a breakpoint at [3], and it is triggered on savepoint. So the
special marker {checkpoint id = MAX_VALUE} is emitted from the SinkWriters and
stored in the Committer state (only committables with lower checkpoint ids are
committed and forwarded). The marker is restored if a new pipeline recovers
from that state, but it is never committed. The problem starts when you take
another savepoint, because the committer receives the marker again, and
currently we do not allow two summaries with the same checkpoint id.
# The problem here is more about a possible migration story. So far, when
users use the sink and want to close all pending transactions, we advise
stopping the job with a savepoint, which should finalize all open transactions
(minus some windows if drain is not used). For sinks with a post-commit
topology that doesn't entirely work, because it essentially needs two
notifyCheckpointComplete calls: the first to flush the committer and the
second to flush, for example, the global committer.
We should concentrate on the first issue first, because it currently blocks
all sinks running on Flink 1.15+, and figure out how to improve the situation
for the second one later.
[1]
[https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java#L177]
[2]
[https://github.com/apache/flink/blob/c6b649bf937976038dbfcd00e59c51b8d886ad96/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java#L174]
[3]
[https://github.com/apache/flink/blob/c6b649bf937976038dbfcd00e59c51b8d886ad96/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java#L183]
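The first issue can be sketched with a toy model of the committer state. All class and method names below are illustrative, not Flink's actual Committer API; the sketch only shows why re-emitting the {checkpoint id = MAX_VALUE} marker after recovery trips the duplicate-summary check:

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Minimal sketch (hypothetical names, not Flink's actual API) of the
 * committer-state behaviour described above.
 */
class CommitterStateSketch {

    // Stands in for the special end-of-input summary, {checkpoint id = MAX_VALUE}.
    static final long EOI_MARKER = Long.MAX_VALUE;

    // Checkpoint ids of the CommittableSummary entries currently held in state.
    private final Set<Long> summaryIds = new HashSet<>();

    /** A second summary with the same checkpoint id is rejected. */
    void addSummary(long checkpointId) {
        if (!summaryIds.add(checkpointId)) {
            throw new IllegalStateException(
                    "Duplicate CommittableSummary for checkpoint " + checkpointId);
        }
    }

    /** Commits summaries with lower ids; the marker itself survives in state. */
    void commitAndForward() {
        summaryIds.removeIf(id -> id != EOI_MARKER);
    }

    boolean containsMarker() {
        return summaryIds.contains(EOI_MARKER);
    }

    public static void main(String[] args) {
        CommitterStateSketch state = new CommitterStateSketch();

        // First stop-with-savepoint: endOfInput() emits the marker summary.
        state.addSummary(1L);
        state.addSummary(EOI_MARKER);
        state.commitAndForward();                   // regular committables committed…
        System.out.println(state.containsMarker()); // …but the marker stays in state

        // A new pipeline recovers from that savepoint: the marker is restored but
        // never committed. The next stop-with-savepoint emits the marker again:
        try {
            state.addSummary(EOI_MARKER);           // same checkpoint id twice
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Clearing (or committing) the marker on the final savepoint, instead of leaving it in state across recovery, would avoid the duplicate id on the next savepoint.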
> Unified Sink committer does not clean up state on final savepoint
> -----------------------------------------------------------------
>
> Key: FLINK-30238
> URL: https://issues.apache.org/jira/browse/FLINK-30238
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Common
> Affects Versions: 1.17.0, 1.15.3, 1.16.1
> Reporter: Fabian Paul
> Priority: Critical
>
> During stop-with-savepoint the committer only commits the pending
> committables on notifyCheckpointComplete.
> This has several downsides.
> * The last CommittableSummary has checkpoint id Long.MAX_VALUE and is never
> cleared from the state, so stop-with-savepoint does not work when the
> pipeline recovers from such a savepoint.
> * While the committables are committed during stop-with-savepoint, they are
> not forwarded to the post-commit topology, potentially losing data and
> preventing open transactions from being closed.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)