[ https://issues.apache.org/jira/browse/FLINK-19698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17219664#comment-17219664 ]

Stephan Ewen commented on FLINK-19698:
--------------------------------------

Fair question. 

So far, there was no notion of checkpoints, only split assignment, which 
generalized both work assignment and restoring from a checkpoint.
The explicit checkpoint hook breaks this. As I understand it, it would never 
be called in batch processing.

One might argue that it should be called at the end of batch processing, 
similar to how the new sink API unifies "committing semantics" between "commit 
on completed checkpoint" (streaming) and "commit on job end" (batch).

I think in practice, this "commit on checkpoint" hook is going to be used in a 
critical way (needed for correct behavior) by sources that cannot be unified 
(RabbitMQ). For sources that can be unified (Kafka), it will be used mainly 
for "visibility purposes", like committing offsets to a consumer group so 
that external tools can monitor the lag.
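For the "visibility" use case (committing Kafka offsets to a consumer group), here is a minimal sketch, assuming a reader that records its per-partition offsets at each checkpoint and publishes them only once that checkpoint is reported complete. `OffsetTrackingReader`, `advance`, `recordCheckpoint`, `pendingCount`, and the `commitToConsumerGroup` callback are hypothetical names for illustration, not Flink or Kafka APIs; only `checkpointComplete(long)` mirrors the method proposed in the issue.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical reader sketch: not the real Flink SourceReader interface.
final class OffsetTrackingReader {
    // Current read position per partition (partition name -> next offset).
    private final Map<String, Long> currentOffsets = new HashMap<>();
    // Offsets captured at each checkpoint, keyed by checkpoint id, awaiting confirmation.
    private final TreeMap<Long, Map<String, Long>> pendingCommits = new TreeMap<>();
    // Hypothetical hook that publishes offsets to an external consumer group.
    private final Consumer<Map<String, Long>> commitToConsumerGroup;

    OffsetTrackingReader(Consumer<Map<String, Long>> commitToConsumerGroup) {
        this.commitToConsumerGroup = commitToConsumerGroup;
    }

    // Advance the read position of one partition.
    void advance(String partition, long nextOffset) {
        currentOffsets.put(partition, nextOffset);
    }

    // Called when a checkpoint is taken: keep a copy of the offsets as of this checkpoint.
    void recordCheckpoint(long checkpointId) {
        pendingCommits.put(checkpointId, new HashMap<>(currentOffsets));
    }

    // Mirrors the proposed hook: invoked once the checkpoint is known to be complete.
    void checkpointComplete(long checkpointId) {
        Map<String, Long> offsets = pendingCommits.get(checkpointId);
        if (offsets != null) {
            commitToConsumerGroup.accept(offsets);
        }
        // Drop this entry and all older ones; a later checkpoint subsumes earlier ones.
        pendingCommits.headMap(checkpointId, true).clear();
    }

    int pendingCount() {
        return pendingCommits.size();
    }
}
```

Because a later checkpoint subsumes earlier ones, completing checkpoint 2 also discards a still-pending entry for checkpoint 1. In batch execution no checkpoints are taken, so with this design nothing would ever be committed, which is the gap discussed in this comment.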

Given that, I'd be fine with ignoring this in batch execution for now.

> Add close() method and onCheckpointComplete() to the Source.
> ------------------------------------------------------------
>
>                 Key: FLINK-19698
>                 URL: https://issues.apache.org/jira/browse/FLINK-19698
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Common
>    Affects Versions: 1.11.2
>            Reporter: Jiangjie Qin
>            Assignee: Jiangjie Qin
>            Priority: Major
>
> Right now there are some caveats to the new Source API, found while 
> implementing some connectors. We would like to make the following 
> improvements to the current Source API.
>  # Add the following method to the {{SplitReader}} API.
> {{public void close() throws Exception;}}
> This method allows the SplitReader implementations to be closed properly when 
> the split fetcher exits.
>  # Add the following method to the {{SourceReader}} API.
> {{public void checkpointComplete(long checkpointId);}}
> This method allows the {{SourceReader}} to take some cleanup / reporting 
> actions when a checkpoint has been successfully taken.
>  
>  
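A simplified sketch of the two proposed additions, assuming stripped-down local stand-ins for the interfaces: the real Flink `SplitReader` and `SourceReader` have more methods and type parameters, and the `List<E> fetch()` signature here is a simplification. `FetcherLoop.runToEnd` is a hypothetical fetcher that shows where `close()` would be invoked when the split fetcher exits.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the SplitReader contract, with the proposed close() hook.
interface SplitReader<E> extends AutoCloseable {
    // Returns the next batch of records, or an empty list when the splits are exhausted.
    List<E> fetch() throws Exception;

    // Proposed addition: release resources when the split fetcher exits.
    @Override
    void close() throws Exception;
}

// Simplified stand-in for the SourceReader contract, with the proposed notification.
interface SourceReader {
    // Proposed addition: invoked after a checkpoint has been successfully taken.
    void checkpointComplete(long checkpointId);
}

// Hypothetical fetcher used only to illustrate when close() runs.
final class FetcherLoop {
    // Drains the reader and always closes it on exit, whether the loop
    // finishes normally or fetch() throws.
    static <E> List<E> runToEnd(SplitReader<E> reader) throws Exception {
        List<E> out = new ArrayList<>();
        try (SplitReader<E> r = reader) {
            List<E> batch;
            while (!(batch = r.fetch()).isEmpty()) {
                out.addAll(batch);
            }
        }
        return out;
    }
}
```

Having `SplitReader` extend `AutoCloseable` is one design choice that lets try-with-resources guarantee the reader is closed on both normal exit and failure; the issue itself only specifies the `close()` method.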



--
This message was sent by Atlassian Jira
(v8.3.4#803005)