StephanEwen commented on a change in pull request #12234:
URL: https://github.com/apache/flink/pull/12234#discussion_r432840780
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
##########
@@ -77,21 +77,66 @@
// ------------------------------------------------------------------------
- CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception;
+ /**
+ * Takes a checkpoint of the coordinator. The checkpoint is identified by the given ID.
+ *
+ * <p>To confirm the checkpoint and store state in it, the given {@code CompletableFuture}
+ * must be completed with the state. To abort or dis-confirm the checkpoint, the given
+ * {@code CompletableFuture} must be completed exceptionally.
+ * In any case, the given {@code CompletableFuture} must be completed in some way, otherwise the
+ * checkpoint will not progress.
+ *
+ * <h3>Exactly-once Semantics</h3>
+ *
+ * <p>The semantics are defined as follows:
+ * <ul>
+ * <li>The point in time when the checkpoint future is completed is considered the point in time
+ * when the coordinator's checkpoint takes place.
+ * <li>The OperatorCoordinator implementation must have a way of strictly ordering the sending
+ * of events and the completion of the checkpoint future (for example the same thread does
+ * both actions, or both actions are guarded by a mutex).
+ * <li>Every event sent before the checkpoint future is completed is considered before the checkpoint.
+ * <li>Every event sent after the checkpoint future is completed is considered to be after the checkpoint.
+ * </ul>
+ */
+ void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) throws Exception;
/**
* Notifies the coordinator that the checkpoint with the given checkpointId completes and
* was committed.
*
- * <p><b>Important:</b> This method is not supposed to throw an exception, because by the
- * time we notify that the checkpoint is complete, the checkpoint is committed and cannot be
- * aborted any more. If the coordinator gets into an inconsistent state internally, it should
- * fail the job ({@link Context#failJob(Throwable)}) instead. Any exception propagating from
- * this method may be treated as a fatal error for the JobManager, crashing the JobManager,
- * and leading to an expensive "master failover" procedure.
+ * <h3>Checkpoint Subsuming</h3>
+ *
+ * <p>Checkpoint IDs are strictly increasing. A checkpoint with higher ID always subsumes
+ * a checkpoint with lower ID. For example, when checkpoint T is confirmed complete, the
+ * code should treat all checkpoints with lower ID (T-1, T-2, etc.) also as confirmed.
+ *
+ * <h3>Exceptions</h3>
+ *
+ * <p>This method is not supposed to throw an exception indicating that the checkpoint cannot
+ * be completed. By the time we notify that the checkpoint is complete, the checkpoint is
+ * committed and cannot be aborted any more.
+ *
+ * <p>If the coordinator gets into an inconsistent state internally, as a result of logic that
+ * runs after this notification, it should fail the job ({@link Context#failJob(Throwable)})
+ * instead. Any exception propagating from this method may be treated as a fatal error for the
+ * JobManager, crashing the JobManager, and leading to an expensive "master failover" procedure.
*/
void checkpointComplete(long checkpointId);
+ /**
+ * Resets the coordinator to the given checkpoint.
+ * When this method is called, the coordinator can discard all other in-flight working state.
+ * All subtasks will also have been reset to the same checkpoint.
+ *
+ * <p>Restoring to a checkpoint is a way of confirming that the checkpoint is complete.
+ * It is safe to commit side-effects that are predicated on checkpoint completion after this
+ * call.
+ *
+ * <p>Even if no call to {@link #checkpointComplete(long)} happened, the checkpoint can still be
+ * complete (for example when a system failure happened directly after committing the checkpoint,
+ * before calling the {@link #checkpointComplete(long)} method).
+ */
void resetToCheckpoint(byte[] checkpointData) throws Exception;
Review comment:
Will do!
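
For reference, the contract described in the Javadoc above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the Flink implementation: `SketchCoordinator`, `sendEvent`, `pendingCommits`, and `committed` are hypothetical names, and the class deliberately does not implement the real `OperatorCoordinator` interface. It shows one way to strictly order event sending against completion of the checkpoint future (a single lock), and one way to treat a confirmed checkpoint as subsuming all lower IDs.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a coordinator; NOT Flink's OperatorCoordinator interface.
class SketchCoordinator {
    // One lock orders "send event" and "complete checkpoint future" (assumed design).
    private final Object lock = new Object();
    private final List<String> sentEvents = new ArrayList<>();
    // Side effects waiting for confirmation, keyed by checkpoint ID.
    private final TreeMap<Long, String> pendingCommits = new TreeMap<>();
    private final List<String> committed = new ArrayList<>();

    // Stands in for sending an event to a subtask.
    void sendEvent(String event) {
        synchronized (lock) {
            sentEvents.add(event);
        }
    }

    // The future is always completed, either with state or exceptionally.
    void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) {
        synchronized (lock) {
            try {
                // Snapshot covers exactly the events sent before this point.
                byte[] state = String.join(",", sentEvents).getBytes(StandardCharsets.UTF_8);
                pendingCommits.put(checkpointId, "commit-" + checkpointId);
                resultFuture.complete(state);
            } catch (Throwable t) {
                resultFuture.completeExceptionally(t);
            }
        }
    }

    // Confirming checkpoint T also confirms every pending checkpoint with a lower ID.
    void checkpointComplete(long checkpointId) {
        synchronized (lock) {
            NavigableMap<Long, String> confirmed = pendingCommits.headMap(checkpointId, true);
            committed.addAll(confirmed.values());
            confirmed.clear(); // clearing the view removes the entries from pendingCommits
        }
    }

    List<String> committed() {
        synchronized (lock) {
            return new ArrayList<>(committed);
        }
    }
}
```

In this sketch an event sent before `checkpointCoordinator` returns is part of the snapshot and an event sent afterwards is not, because both paths take the same lock. A real coordinator would more likely confine both actions to a single thread, and would route internal inconsistencies through `Context#failJob(Throwable)` rather than throwing from `checkpointComplete`.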