[ https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15974811#comment-15974811 ]

Seth Wiesman commented on FLINK-6315:
-------------------------------------

[~aljoscha] Apologies, I didn't see this comment. I might have jumped the gun a
little by sending in this PR.

In an eventually consistent file system, PUT operations of new files are the
only ones that can be relied upon. That means invalid data can never be
reliably deleted, so instead valid data is marked as valid once it has been
fully checkpointed.

Each sink buffers data locally. On checkpoint, that data is copied to the final
FS (say S3) into a bucket whose path combines the checkpoint ID and the
timestamp at which the checkpoint was initiated, so that it is always unique.
When the checkpoint completes, a single flag file is written to the bucket,
marking it valid.
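A minimal sketch of this write path, in Java to match the interface proposed in this issue. It assumes a local directory stands in for S3, and the class name `CheckpointBucketWriter`, the `chk-<id>-<timestamp>` bucket layout, the `part-<subtask>` file names and the `_VALID` flag name are all hypothetical, not taken from Flink or from the PR.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of the buffer / copy-on-checkpoint / flag-on-complete scheme.
 * A local directory stands in for S3; all names here are illustrative only.
 */
class CheckpointBucketWriter {
    // Hypothetical name of the empty flag file that marks a bucket as valid.
    static final String FLAG_FILE = "_VALID";

    private final Path root;                               // stand-in for the S3 prefix
    private final String subtaskId;                        // keeps parallel sinks' files apart
    private final List<String> buffer = new ArrayList<>(); // records buffered locally

    CheckpointBucketWriter(Path root, String subtaskId) {
        this.root = root;
        this.subtaskId = subtaskId;
    }

    /** The bucket path combines the checkpoint ID and the checkpoint's start timestamp. */
    static Path bucketPath(Path root, long checkpointId, long timestamp) {
        return root.resolve("chk-" + checkpointId + "-" + timestamp);
    }

    /** Buffer a record locally until the next checkpoint. */
    void write(String record) {
        buffer.add(record);
    }

    /** On checkpoint: copy the buffered records into a fresh, never-used bucket. */
    Path snapshot(long checkpointId, long timestamp) throws IOException {
        Path bucket = bucketPath(root, checkpointId, timestamp);
        Files.createDirectories(bucket);
        Path part = bucket.resolve("part-" + subtaskId); // hypothetical per-subtask file name
        Files.write(part, buffer, StandardCharsets.UTF_8);
        buffer.clear();
        return part;
    }

    /** On checkpoint complete: write the empty flag file, marking the bucket valid. */
    static void markValid(Path root, long checkpointId, long timestamp) throws IOException {
        Files.write(bucketPath(root, checkpointId, timestamp).resolve(FLAG_FILE), new byte[0]);
    }
}
```

Because every bucket name is new, each upload in this sketch is a PUT of a new object, which is the one operation the scheme relies on.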

In practice, every operator can attempt to write this file to the bucket. Even
though overwriting a file is considered an inconsistent operation, all
operators are writing identical empty files, so it is not actually an issue.
This means I only rely on a single operator receiving the checkpoint complete
message. I was not aware that it is possible for no operator to receive the
message, so I may need to rethink this part.
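The sketch below illustrates why several subtasks racing to write the flag is harmless, and how a consumer might use it. It again uses a local directory in place of S3 and the hypothetical `_VALID` flag name; the reader-side filtering in `validBuckets` is an assumption about how downstream consumers would treat unflagged buckets, not something stated in this comment. A local file system is strongly consistent, so this only demonstrates that the writes are idempotent, not S3's consistency behavior.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical helpers around the flag file; not Flink API. */
class FlagFiles {
    static final String FLAG_FILE = "_VALID"; // hypothetical flag file name

    /** Any subtask that hears "checkpoint complete" writes the same empty file. */
    static void markValid(Path bucket) throws IOException {
        // Overwriting is harmless here: every writer produces identical (empty) content.
        Files.write(bucket.resolve(FLAG_FILE), new byte[0]);
    }

    /** Assumed consumer behavior: trust only flagged buckets; unflagged ones are ignored, never deleted. */
    static List<Path> validBuckets(Path root) throws IOException {
        List<Path> valid = new ArrayList<>();
        try (DirectoryStream<Path> dirs = Files.newDirectoryStream(root)) {
            for (Path dir : dirs) {
                if (Files.isDirectory(dir) && Files.exists(dir.resolve(FLAG_FILE))) {
                    valid.add(dir);
                }
            }
        }
        return valid;
    }

    /** Simulates several subtasks concurrently writing the flag for one bucket. */
    static void markFromSubtasks(Path bucket, int subtasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(subtasks);
        try {
            List<Future<Void>> results = new ArrayList<>();
            for (int i = 0; i < subtasks; i++) {
                results.add(pool.submit(() -> { markValid(bucket); return null; }));
            }
            for (Future<Void> f : results) {
                f.get(); // rethrows if any write failed
            }
        } finally {
            pool.shutdown();
        }
    }
}
```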

As far as notifyCheckpointTimeout goes, each operator keeps track of which
files it has uploaded for each checkpoint. If checkpoint A times out, those
files must be re-uploaded on checkpoint B; there is a diagram on FLINK-6306
showing this process. If the maximum number of concurrent checkpoints is set
to 1, this is trivial: if a new checkpoint begins before the last one
completed, the last one must have timed out. This becomes more difficult once
concurrent checkpoints are thrown into the mix. I need a way to signal that a
previous checkpoint has timed out and that it is up to the next one to upload
those files.
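For the single-concurrent-checkpoint case, the inference above can be sketched as follows. `PendingUploads` is a hypothetical helper, not a Flink class; files are plain name strings and the actual upload is left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical bookkeeping of uploaded files per checkpoint, assuming the maximum
 * number of concurrent checkpoints is 1. Not Flink API.
 */
class PendingUploads {
    // checkpoint ID -> files uploaded for that checkpoint and not yet confirmed
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();

    /**
     * Called when a checkpoint starts. With one concurrent checkpoint, any earlier
     * checkpoint still pending must have timed out, so its files are folded into
     * this checkpoint and uploaded again.
     */
    List<String> startCheckpoint(long checkpointId, List<String> newFiles) {
        List<String> toUpload = new ArrayList<>();
        for (List<String> orphaned : pending.values()) {
            toUpload.addAll(orphaned);
        }
        pending.clear();
        toUpload.addAll(newFiles);
        pending.put(checkpointId, toUpload);
        return Collections.unmodifiableList(toUpload);
    }

    /** The checkpoint completed: drop it (and anything older) from the pending set. */
    void completeCheckpoint(long checkpointId) {
        pending.headMap(checkpointId, true).clear();
    }

    Map<Long, List<String>> pendingView() {
        return Collections.unmodifiableMap(pending);
    }
}
```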


> Notify on checkpoint timeout 
> -----------------------------
>
>                 Key: FLINK-6315
>                 URL: https://issues.apache.org/jira/browse/FLINK-6315
>             Project: Flink
>          Issue Type: New Feature
>          Components: Core
>            Reporter: Seth Wiesman
>            Assignee: Seth Wiesman
>
> A common use case when writing a custom operator that outputs data to some
> third-party location is to partially output on checkpoint and then commit on
> notifyCheckpointComplete. If that external system does not gracefully handle
> rollbacks (for example, Amazon S3 does not allow consistent delete
> operations), then that data needs to be handled by the next checkpoint.
> The idea is to add a new interface, similar to CheckpointListener, that
> provides a callback when the CheckpointCoordinator times out a checkpoint:
> {code:java}
> /**
>  * This interface must be implemented by functions/operations that want to receive
>  * a notification if a checkpoint has been timed out by the
>  * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator}.
>  */
> public interface CheckpointTimeoutListener {
>     /**
>      * This method is called as a notification if a distributed checkpoint has timed out.
>      *
>      * @param checkpointId The ID of the checkpoint that has timed out.
>      * @throws Exception If handling the notification fails.
>      */
>     void notifyCheckpointTimeout(long checkpointId) throws Exception;
> }
> {code}
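With concurrent checkpoints the "new checkpoint started, so the old one timed out" inference no longer holds, which is where the proposed CheckpointTimeoutListener would come in. A minimal sketch, assuming that files from a timed-out checkpoint are carried over to the next checkpoint that starts after the timeout notification. Since the interface only exists as a proposal in this issue, it is redeclared here so the example compiles on its own; `ConcurrentPendingUploads` is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** The interface proposed in FLINK-6315, redeclared so this sketch is self-contained. */
interface CheckpointTimeoutListener {
    void notifyCheckpointTimeout(long checkpointId) throws Exception;
}

/**
 * Hypothetical sink-side bookkeeping for concurrent checkpoints: the files of a
 * timed-out checkpoint are handed to the next checkpoint that starts.
 */
class ConcurrentPendingUploads implements CheckpointTimeoutListener {
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    private final List<String> carryOver = new ArrayList<>(); // files from timed-out checkpoints

    /** Snapshot for a new checkpoint: its own files plus anything carried over. */
    synchronized List<String> startCheckpoint(long checkpointId, List<String> newFiles) {
        List<String> toUpload = new ArrayList<>(carryOver);
        carryOver.clear();
        toUpload.addAll(newFiles);
        pending.put(checkpointId, toUpload);
        return Collections.unmodifiableList(toUpload);
    }

    @Override
    public synchronized void notifyCheckpointTimeout(long checkpointId) {
        List<String> files = pending.remove(checkpointId);
        if (files != null) {
            carryOver.addAll(files); // the next checkpoint must upload these again
        }
    }

    /** Plays the role of notifyCheckpointComplete; it does not implement Flink's CheckpointListener. */
    synchronized void notifyCheckpointComplete(long checkpointId) {
        pending.remove(checkpointId);
    }

    synchronized Map<Long, List<String>> pendingView() {
        return Collections.unmodifiableMap(new TreeMap<>(pending));
    }
}
```

In this sketch, a checkpoint that was already in flight when the timeout arrived does not pick up the orphaned files; only a checkpoint started afterwards does.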



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
