[ 
https://issues.apache.org/jira/browse/FLINK-7883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16265345#comment-16265345
 ] 

Aljoscha Krettek commented on FLINK-7883:
-----------------------------------------

The problem is that this is harder than it looks. In a Flink pipeline, data does 
not only originate at sources: there can be other things in the pipeline that 
produce events "out of thin air". Off the top of my head I can think of: 
 - Sources: this is obvious, sources emit events
 - Reactions to event-time (watermark) timers: these can emit events. They are 
not as easy to control because watermarks can originate not only at sources but 
also at any operator in the pipeline
 - Reactions to processing-time timers: these can also emit events at any time
 - "Special" operators that emit data from a separate thread. Currently these 
are the Async I/O operator and the {{ContinuousFileReaderOperator}}, which 
forms the "file source" together with {{ContinuousFileMonitoringFunction}}.

If we want to make sure that we don't emit any unwanted data, we have to 
_quiesce_ the whole pipeline first. This could be done via special messages 
(like watermarks) that are injected at the sources, triggered by a message from 
the {{JobManager}}, and then traverse the topology. All operators would have to 
report that they are quiesced before we take the savepoint. In case the 
savepoint fails for some reason, we need to _un-quiesce_ the pipeline again.
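A minimal sketch of what such a quiesce protocol could look like (all class and 
method names below are invented for illustration; this is not Flink API, and 
the real thing would have to deal with parallel subtasks, network channels and 
the async-emitting operators mentioned above):

```java
import java.util.List;

// Hypothetical sketch: a special marker (similar to a watermark) is injected
// at the sources on request of the JobManager, traverses the topology, and
// every operator reports that it is quiesced before the savepoint is taken.
public class QuiesceProtocolSketch {

    /** Marker element that flows through the pipeline like a watermark. */
    static final class QuiesceMarker {}

    /** Minimal stand-in for an operator that can stop emitting. */
    static final class Operator {
        boolean quiesced = false;

        void processMarker(QuiesceMarker marker) {
            // Stop emitting: ignore timer firings, drop source output, etc.
            quiesced = true;
        }

        void unquiesce() {
            quiesced = false;
        }
    }

    /** "JobManager" side: inject the marker and check that everyone acked. */
    static boolean quiesce(List<Operator> topology) {
        QuiesceMarker marker = new QuiesceMarker();
        for (Operator op : topology) {
            op.processMarker(marker); // marker traverses the topology in order
        }
        return topology.stream().allMatch(op -> op.quiesced);
    }

    /** If the savepoint fails, the pipeline has to be un-quiesced again. */
    static void unquiesce(List<Operator> topology) {
        topology.forEach(Operator::unquiesce);
    }
}
```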

The above is the hard part. Making sure that we don't emit data from a "source" 
is as simple as ensuring that {{SourceContext.collect()}} doesn't forward the 
data that the source wants to emit.
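As a sketch of that easy part (the wrapper class and the minimal {{Context}} 
interface below are invented for illustration; Flink's real 
{{SourceFunction.SourceContext}} has more methods):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: wrap the context handed to the source so that
// collect() stops forwarding records once the task has been quiesced.
public class DroppingSourceContextSketch {

    /** Stripped-down stand-in for Flink's SourceContext. */
    interface Context<T> {
        void collect(T element);
    }

    static final class QuiescingContext<T> implements Context<T> {
        private final Context<T> delegate;
        private final AtomicBoolean quiesced = new AtomicBoolean(false);

        QuiescingContext(Context<T> delegate) {
            this.delegate = delegate;
        }

        /** Called when the quiesce signal reaches this task. */
        void quiesce() {
            quiesced.set(true);
        }

        @Override
        public void collect(T element) {
            // Once quiesced, whatever the source wants to emit is dropped.
            if (!quiesced.get()) {
                delegate.collect(element);
            }
        }
    }
}
```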

We definitely have to solve this problem, but I don't think we can do it for 
the 1.5 release: the community decided on a very short release cycle after 
1.4.0 because a bunch of important features are almost ready to be released.

> Stop fetching source before a cancel with savepoint
> ---------------------------------------------------
>
>                 Key: FLINK-7883
>                 URL: https://issues.apache.org/jira/browse/FLINK-7883
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API, Kafka Connector, State Backends, 
> Checkpointing
>    Affects Versions: 1.4.0, 1.3.2
>            Reporter: Antoine Philippot
>
> For a cancel-with-savepoint command, the JobManager triggers the cancel call 
> once the savepoint is finished, but during savepoint execution the Kafka 
> source continues to poll new messages, which will not be part of the 
> savepoint and will be replayed on the next application start.
> A solution could be to stop fetching in the source stream task before 
> triggering the savepoint.
> I suggest adding an interface {{StoppableFetchingSourceFunction}} with a 
> method {{stopFetching}} that existing SourceFunction implementations could 
> implement.
> We could add a {{stopFetchingSource}} property to the {{CheckpointOptions}} 
> class to pass the desired behaviour from 
> {{JobManager.handleMessage(CancelJobWithSavepoint)}} to 
> {{SourceStreamTask.triggerCheckpoint}}.
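For reference, the interface proposed in the description might look roughly 
like this ({{StoppableFetchingSourceFunction}} and {{stopFetching}} are the 
names from the proposal; the example implementation with its {{volatile}} flag 
and simplified fetch loop is an assumption):

```java
// Sketch of the proposed interface. The interface and method names come from
// the issue description; CountingSource is invented to show how a source's
// fetch loop could honour stopFetching().
public interface StoppableFetchingSourceFunction {

    /** Ask the source to stop polling for new records. */
    void stopFetching();

    /** Example implementation: counts how many records it has "emitted". */
    final class CountingSource implements StoppableFetchingSourceFunction {
        private volatile boolean fetching = true;
        private int emitted = 0;

        @Override
        public void stopFetching() {
            fetching = false; // checked by the fetch loop before each poll
        }

        /** Simplified fetch loop: poll up to maxPolls times unless stopped. */
        public void run(int maxPolls) {
            for (int i = 0; i < maxPolls && fetching; i++) {
                emitted++; // stand-in for emitting a polled record
            }
        }

        public int emitted() {
            return emitted;
        }
    }
}
```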



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
