[
https://issues.apache.org/jira/browse/FLINK-6315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15974811#comment-15974811
]
Seth Wiesman commented on FLINK-6315:
-------------------------------------
[~aljoscha] Apologies, I didn't see this comment. I might have jumped the gun a
little by sending in this PR.
In an eventually consistent file system, PUT operations of new objects are the
only ones whose results can be relied upon. That means invalid data can never be
reliably deleted, so instead valid data is marked as valid once it has been
fully checkpointed.
Each sink buffers data locally. On checkpoint, that data is copied to the final
FS (say S3) into a bucket whose path combines the checkpoint ID and the
timestamp at which the checkpoint was initiated, so that it is always unique.
When the checkpoint completes, a single flag file is written to the bucket,
marking it valid.
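A minimal sketch of this "unique bucket, then flag file" scheme, assuming an in-memory map as a stand-in for the object store (it is not an S3 client). The class name `ValidatedBucketStore`, the `chk-<id>-<timestamp>` path format and the `_VALID` flag name are all hypothetical. Readers only see data files in buckets that already contain the flag.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch; a TreeMap stands in for the remote object store. */
class ValidatedBucketStore {
    // Hypothetical name of the empty flag file that marks a bucket as valid.
    static final String VALID_FLAG = "_VALID";

    // Object key -> contents. Only PUTs are ever issued, never deletes.
    private final Map<String, byte[]> objects = new TreeMap<>();

    /** Unique bucket path from the checkpoint ID and the checkpoint's start timestamp. */
    static String bucketPath(long checkpointId, long checkpointTimestamp) {
        return "chk-" + checkpointId + "-" + checkpointTimestamp;
    }

    /** On checkpoint: copy one subtask's locally buffered records into the checkpoint's bucket. */
    void uploadOnCheckpoint(long checkpointId, long timestamp, int subtask, List<String> buffered) {
        String key = bucketPath(checkpointId, timestamp) + "/part-" + subtask;
        objects.put(key, String.join("\n", buffered).getBytes(StandardCharsets.UTF_8));
    }

    /** On checkpoint completion: write the empty flag file that marks the bucket valid. */
    void markValid(long checkpointId, long timestamp) {
        objects.put(bucketPath(checkpointId, timestamp) + "/" + VALID_FLAG, new byte[0]);
    }

    /** Data files visible to readers: only those in buckets that contain the flag file. */
    List<String> validDataFiles() {
        List<String> result = new ArrayList<>();
        for (String key : objects.keySet()) {
            String bucket = key.substring(0, key.indexOf('/'));
            boolean isFlag = key.endsWith("/" + VALID_FLAG);
            if (!isFlag && objects.containsKey(bucket + "/" + VALID_FLAG)) {
                result.add(key);
            }
        }
        return result;
    }
}
```

Data from a checkpoint that never completes simply stays in a bucket without a flag, so it never has to be deleted.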
In practice, every operator may attempt to write this file to the bucket.
Although overwriting a file is an inconsistent operation, all operators write
identical empty files, so it is not actually an issue. This means I only rely on
at least one operator receiving the checkpoint-complete notification. I was not
aware that it is possible for no operator to receive that notification, so I may
need to rethink this part.
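The idempotence argument can be sketched as follows, again assuming an in-memory map in place of the object store. `IdempotentFlagWriter` and the `_VALID` flag name are hypothetical. Whether one subtask or all of them write the flag, the resulting store is the same. If none does, the bucket is never marked valid, which is exactly the gap described above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch; a HashMap stands in for the object store. */
class IdempotentFlagWriter {
    // Hypothetical flag-file name shared by all subtasks.
    static final String FLAG = "_VALID";

    private final Map<String, byte[]> store;

    IdempotentFlagWriter(Map<String, byte[]> store) {
        this.store = store;
    }

    /** Invoked by a subtask from its checkpoint-complete callback. */
    void onCheckpointComplete(String bucketPath) {
        // Every writer PUTs identical, empty content, so a racing overwrite
        // cannot leave a different object behind.
        store.put(bucketPath + "/" + FLAG, new byte[0]);
    }

    /** A bucket is valid as soon as at least one subtask has written the flag. */
    boolean isValid(String bucketPath) {
        return store.containsKey(bucketPath + "/" + FLAG);
    }

    /** Simulates {@code receivers} parallel subtasks receiving the notification. */
    static Map<String, byte[]> simulate(String bucketPath, int receivers) {
        Map<String, byte[]> store = new HashMap<>();
        IdempotentFlagWriter writer = new IdempotentFlagWriter(store);
        for (int i = 0; i < receivers; i++) {
            writer.onCheckpointComplete(bucketPath);
        }
        return store;
    }
}
```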
As for notifyCheckpointTimeout, each operator keeps track of which files it has
uploaded for each checkpoint. If checkpoint A times out, those files must be
re-uploaded on checkpoint B; there is a diagram on FLINK-6306 showing this
process. If the maximum number of concurrent checkpoints is set to 1, this is
trivial: if a new checkpoint begins before the previous one has completed, the
previous one must have timed out. This becomes more difficult once concurrent
checkpoints are thrown into the mix, because I need a way to signal that a
previous checkpoint has timed out and that it is up to the next one to upload
its files.
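For the single-concurrent-checkpoint case, the per-checkpoint bookkeeping could look roughly like this. `UploadTracker` and its method names are hypothetical, and the sketch assumes at most one checkpoint in flight. Under that assumption, anything still pending when a new checkpoint starts belongs to a checkpoint that never completed, so it is re-uploaded with the new one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical per-subtask tracker; assumes max concurrent checkpoints = 1. */
class UploadTracker {
    // Checkpoint ID -> files uploaded for that checkpoint but not yet confirmed.
    private final Map<Long, List<String>> pending = new TreeMap<>();

    /**
     * Called when a checkpoint starts. Returns the files to upload into its bucket:
     * leftovers from an unconfirmed (timed-out) checkpoint plus newly buffered files.
     */
    List<String> onCheckpointStart(long checkpointId, List<String> newFiles) {
        List<String> toUpload = new ArrayList<>();
        // With one checkpoint in flight, anything still pending must have timed out.
        for (List<String> leftovers : pending.values()) {
            toUpload.addAll(leftovers);
        }
        pending.clear();
        toUpload.addAll(newFiles);
        pending.put(checkpointId, new ArrayList<>(toUpload));
        return toUpload;
    }

    /** Called on notifyCheckpointComplete: these files are now in a valid bucket. */
    void onCheckpointComplete(long checkpointId) {
        pending.remove(checkpointId);
    }
}
```

With concurrent checkpoints the "still pending" inference no longer holds, because a pending checkpoint may simply still be running. That is why an explicit timeout signal is needed.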
> Notify on checkpoint timeout
> -----------------------------
>
> Key: FLINK-6315
> URL: https://issues.apache.org/jira/browse/FLINK-6315
> Project: Flink
> Issue Type: New Feature
> Components: Core
> Reporter: Seth Wiesman
> Assignee: Seth Wiesman
>
> A common use case when writing a custom operator that outputs data to some
> third-party location is to partially output on checkpoint and then commit on
> notifyCheckpointComplete. If that external system does not gracefully handle
> rollbacks (such as Amazon S3 not allowing consistent delete operations), then
> that data needs to be handled by the next checkpoint.
> The idea is to add a new interface, similar to CheckpointListener, that
> provides a callback when the CheckpointCoordinator times out a checkpoint:
> {code:java}
> /**
>  * This interface must be implemented by functions/operations that want to
>  * receive a notification if a checkpoint has been timed out by the
>  * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator}.
>  */
> public interface CheckpointTimeoutListener {
>     /**
>      * This method is called as a notification if a distributed checkpoint
>      * has been timed out.
>      *
>      * @param checkpointId The ID of the checkpoint that has been timed out.
>      * @throws Exception If handling the notification fails.
>      */
>     void notifyCheckpointTimeout(long checkpointId) throws Exception;
> }
> {code}
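With several checkpoints in flight, an operator implementing the proposed callback could hand a timed-out checkpoint's files to whichever checkpoint starts next. This is a sketch only: the interface is re-declared locally because it is a proposal, not an existing Flink API. `ConcurrentUploadTracker`, `onCheckpointStart` and `onCheckpointComplete` are hypothetical names and are not wired into Flink.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Local copy of the interface proposed in this issue. */
interface CheckpointTimeoutListener {
    void notifyCheckpointTimeout(long checkpointId) throws Exception;
}

/** Hypothetical tracker for a sink subtask with concurrent checkpoints. */
class ConcurrentUploadTracker implements CheckpointTimeoutListener {
    // Checkpoint ID -> files uploaded for it that are not yet confirmed.
    private final Map<Long, List<String>> pending = new HashMap<>();
    // Files of timed-out checkpoints, to be re-uploaded by the next checkpoint to start.
    private final List<String> carryOver = new ArrayList<>();

    List<String> onCheckpointStart(long checkpointId, List<String> newFiles) {
        List<String> toUpload = new ArrayList<>(carryOver);
        carryOver.clear();
        toUpload.addAll(newFiles);
        pending.put(checkpointId, toUpload);
        return new ArrayList<>(toUpload);
    }

    void onCheckpointComplete(long checkpointId) {
        pending.remove(checkpointId);
    }

    @Override
    public void notifyCheckpointTimeout(long checkpointId) {
        // Unlike the max-concurrency-1 case, a pending checkpoint is not assumed dead
        // until the coordinator explicitly says it timed out.
        List<String> orphaned = pending.remove(checkpointId);
        if (orphaned != null) {
            carryOver.addAll(orphaned);
        }
    }
}
```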
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)