StephanEwen commented on a change in pull request #12234:
URL: https://github.com/apache/flink/pull/12234#discussion_r432840780



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
##########
@@ -77,21 +77,66 @@
 
        // 
------------------------------------------------------------------------
 
-       CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) 
throws Exception;
+       /**
+        * Takes a checkpoint or the coordinator. The checkpoint is identified 
by the given ID.
+        *
+        * <p>To confirm the checkpoint and store state in it, the given {@code 
CompletableFuture}
+        * must be completed with the state. To abort or dis-confirm the 
checkpoint, the given
+        * {@code CompletableFuture} must be completed exceptionally.
+        * In any case, the given {@code CompletableFuture} must be completed 
in some way, otherwise the
+        * checkpoint will not progress.
+        *
+        * <h3>Exactly-once Semantics</h3>
+        *
+        * <p>The semantics are defined as follows:
+        * <ul>
+        *   <li>The point in time when the checkpoint future is completed is 
considered the point in time
+        *       when the coordinator's checkpoint takes place.
+        *   <li>The OperatorCoordinator implementation must have a way of 
strictly ordering the sending
+        *       of events and the completion of the checkpoint future (for 
example the same thread does
+        *       both actions, or both actions are guarded by a mutex).
+        *   <li>Every event sent before the checkpoint future is completed is 
considered before the checkpoint.
+        *   <li>Every event sent after the checkpoint future is completed is 
considered to be after the checkpoint.
+        * </ul>
+        */
+       void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> 
resultFuture) throws Exception;
 
        /**
         * Notifies the coordinator that the checkpoint with the given 
checkpointId completes and
         * was committed.
         *
-        * <p><b>Important:</b> This method is not supposed to throw an 
exception, because by the
-        * time we notify that the checkpoint is complete, the checkpoint is 
committed and cannot be
-        * aborted any more. If the coordinator gets into an inconsistent state 
internally, it should
-        * fail the job ({@link Context#failJob(Throwable)}) instead. Any 
exception propagating from
-        * this method may be treated as a fatal error for the JobManager, 
crashing the JobManager,
-        * and leading to an expensive "master failover" procedure.
+        * <h3>Checkpoint Subsuming</h3>
+        *
+        * <p>Checkpoint IDs are strictly increasing. A checkpoint with higher 
ID always subsumes
+        * a checkpoint with lower ID. For example, when checkpoint T is 
confirmed complete, the
+        * code should treat all checkpoints with lower ID (T-1, T-2, etc.) 
also as confirmed.
+        *
+        * <h3>Exceptions</h3>
+        *
+        * <p>This method is not supposed to throw an exception indicating the 
the checkpoint cannot
+        * be completed. By the time we notify that the checkpoint is complete, 
the checkpoint is
+        * committed and cannot be aborted any more.
+        *
+        * <p>If the coordinator gets into an inconsistent state internally, as 
a result of logic that
+        * runs after this notification, it should fail the job ({@link 
Context#failJob(Throwable)})
+        * instead. Any exception propagating from this method may be treated 
as a fatal error for the
+        * JobManager, crashing the JobManager, and leading to an expensive 
"master failover" procedure.
         */
        void checkpointComplete(long checkpointId);
 
+       /**
+        * Resets the coordinator to the given checkpoint.
+        * When this method is called, the coordinator can discard all other 
in-flight working state.
+        * All subtasks will also have been reset to the same checkpoint.
+        *
+        * <p>Restoring to a checkpoint is a way of confirming that the 
checkpoint is complete.
+        * It is safe to commit side-effects that are predicated on checkpoint 
completion after this
+        * call.
+        *
+        * <p>Even if no call to {@link #checkpointComplete(long)} happened, 
the checkpoint can still be
+        * complete (for example when a system failure happened directly after 
committing the checkpoint,
+        * before calling the {@link #checkpointComplete(long)} method).
+        */
        void resetToCheckpoint(byte[] checkpointData) throws Exception;

Review comment:
       Will do!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to