[ 
https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15975434#comment-15975434
 ] 

Seth Wiesman commented on FLINK-6315:
-------------------------------------

[~StephanEwen] 

So I guess this is the crux of the real issue: eventually consistent file 
systems cannot perform renames, only PUTs. If one checkpoint times out, the 
next one needs to re-PUT its files into the next valid bucket. 
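
To make the timeout case concrete, here is a rough sketch of how a sink might 
use the callback proposed in the issue description. Everything in it is an 
assumption for illustration: `BufferingSink`, its method names, and the 
in-memory `committed` list are hypothetical, the interface is a local stand-in 
rather than anything that exists in Flink, and the real sink would PUT files 
to a bucket instead of copying strings between lists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TimeoutAwareSinkSketch {

    /** Local stand-in for the interface proposed in this issue; not a Flink API. */
    interface CheckpointTimeoutListener {
        void notifyCheckpointTimeout(long checkpointId) throws Exception;
    }

    /** Hypothetical sink that stages records per checkpoint and commits on completion. */
    static class BufferingSink implements CheckpointTimeoutListener {
        // Records received since the last snapshot.
        private final List<String> buffer = new ArrayList<>();
        // Records staged (already PUT) for checkpoints that are still in flight.
        private final Map<Long, List<String>> pending = new TreeMap<>();
        // Stand-in for the committed bucket contents.
        final List<String> committed = new ArrayList<>();

        void invoke(String record) {
            buffer.add(record);
        }

        void snapshotState(long checkpointId) {
            // Stage everything buffered so far under this checkpoint.
            pending.put(checkpointId, new ArrayList<>(buffer));
            buffer.clear();
        }

        void notifyCheckpointComplete(long checkpointId) {
            List<String> records = pending.remove(checkpointId);
            if (records != null) {
                committed.addAll(records);
            }
        }

        @Override
        public void notifyCheckpointTimeout(long checkpointId) {
            // No rename or rollback is available, so put the records back at the
            // front of the buffer; the next snapshot re-PUTs them.
            List<String> records = pending.remove(checkpointId);
            if (records != null) {
                buffer.addAll(0, records);
            }
        }
    }

    public static void main(String[] args) {
        BufferingSink sink = new BufferingSink();
        sink.invoke("a");
        sink.snapshotState(1L);           // "a" is staged for checkpoint 1
        sink.invoke("b");
        sink.notifyCheckpointTimeout(1L); // checkpoint 1 timed out, "a" is re-buffered
        sink.snapshotState(2L);           // checkpoint 2 stages "a" and "b"
        sink.notifyCheckpointComplete(2L);
        System.out.println(sink.committed); // prints [a, b]
    }
}
```

Note that with more than one checkpoint in flight, records from a timed-out 
checkpoint land in whichever snapshot is taken next, which may be later than 
the checkpoint immediately following it. 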

I had considered something similar to what you are doing with incremental 
checkpointing: if checkpoint 3 arrives before 2 finishes, then build on 
checkpoint 1. The reason I backed away from it is that in the degenerate case 
where no checkpoint ever completes before the next begins, no buckets would be 
committed and no data would ever be deleted from the buffer. Is this a case you 
deal with for incremental checkpointing, or is checkpointing that frequently 
considered an anti-pattern and not dealt with? 

Of course, the easy solution would be to say that the `EventualyConsistentSink` 
only provides exactly-once when `maxConcurrentCheckpoints` is set to 1, but that 
doesn't feel like a satisfactory answer to me. 


> Notify on checkpoint timeout 
> -----------------------------
>
>                 Key: FLINK-6315
>                 URL: https://issues.apache.org/jira/browse/FLINK-6315
>             Project: Flink
>          Issue Type: New Feature
>          Components: Core
>            Reporter: Seth Wiesman
>            Assignee: Seth Wiesman
>
> A common use case when writing a custom operator that outputs data to some 
> third-party location is to partially output on checkpoint and then commit on 
> notifyCheckpointComplete. If that external system does not gracefully handle 
> rollbacks (such as Amazon S3 not allowing consistent delete operations), then 
> that data needs to be handled by the next checkpoint. 
> The idea is to add a new interface similar to CheckpointListener that 
> provides a callback when the CheckpointCoordinator times out a checkpoint:
> {code:java}
> /**
>  * This interface must be implemented by functions/operations that want to
>  * receive a notification if a checkpoint has been timed out by the
>  * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator}.
>  */
> public interface CheckpointTimeoutListener {
>       /**
>        * This method is called as a notification if a distributed checkpoint
>        * has been timed out.
>        *
>        * @param checkpointId The ID of the checkpoint that has been timed out.
>        * @throws Exception
>        */
>       void notifyCheckpointTimeout(long checkpointId) throws Exception;
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
