This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2742a610b93aca65265ec281cf0f0905d39b009b
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri May 29 21:37:27 2020 +0200

    [hotfix][coordination] Improve JavaDocs for OperatorCoordinator and OperatorCoordinatorHolder
---
 .../coordination/OperatorCoordinator.java          | 91 ++++++++++++++++++++--
 .../coordination/OperatorCoordinatorHolder.java    | 68 +++++++++++++---
 2 files changed, 142 insertions(+), 17 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
index dc06ae0..cac03d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
@@ -77,21 +77,72 @@ public interface OperatorCoordinator extends AutoCloseable {

     // ------------------------------------------------------------------------

-    void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception;
+    /**
+     * Takes a checkpoint or the coordinator. The checkpoint is identified by the given ID.
+     *
+     * <p>To confirm the checkpoint and store state in it, the given {@code CompletableFuture}
+     * must be completed with the state. To abort or dis-confirm the checkpoint, the given
+     * {@code CompletableFuture} must be completed exceptionally.
+     * In any case, the given {@code CompletableFuture} must be completed in some way, otherwise the
+     * checkpoint will not progress.
+     *
+     * <h3>Exactly-once Semantics</h3>
+     *
+     * <p>The semantics are defined as follows:
+     * <ul>
+     * <li>The point in time when the checkpoint future is completed is considered the point in time
+     * when the coordinator's checkpoint takes place.
+     * <li>The OperatorCoordinator implementation must have a way of strictly ordering the sending
+     * of events and the completion of the checkpoint future (for example the same thread does
+     * both actions, or both actions are guarded by a mutex).
+     * <li>Every event sent before the checkpoint future is completed is considered before the checkpoint.
+     * <li>Every event sent after the checkpoint future is completed is considered to be after the checkpoint.
+     * </ul>
+     */
+    void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) throws Exception;

     /**
      * Notifies the coordinator that the checkpoint with the given checkpointId completes and
      * was committed.
      *
-     * <p><b>Important:</b> This method is not supposed to throw an exception, because by the
-     * time we notify that the checkpoint is complete, the checkpoint is committed and cannot be
-     * aborted any more. If the coordinator gets into an inconsistent state internally, it should
-     * fail the job ({@link Context#failJob(Throwable)}) instead. Any exception propagating from
-     * this method may be treated as a fatal error for the JobManager, crashing the JobManager,
-     * and leading to an expensive "master failover" procedure.
+     * <h3>Checkpoint Subsuming</h3>
+     *
+     * <p>Checkpoint IDs are strictly increasing. A checkpoint with higher ID always subsumes
+     * a checkpoint with lower ID. For example, when checkpoint T is confirmed complete, the
+     * code should treat all checkpoints with lower ID (T-1, T-2, etc.) also as confirmed.
+     *
+     * <h3>Exceptions</h3>
+     *
+     * <p>This method is not supposed to throw an exception indicating the the checkpoint cannot
+     * be completed. By the time we notify that the checkpoint is complete, the checkpoint is
+     * committed and cannot be aborted any more.
+     *
+     * <p>If the coordinator gets into an inconsistent state internally, as a result of logic that
+     * runs after this notification, it should fail the job ({@link Context#failJob(Throwable)})
+     * instead. Any exception propagating from this method may be treated as a fatal error for the
+     * JobManager, crashing the JobManager, and leading to an expensive "master failover" procedure.
      */
     void checkpointComplete(long checkpointId);

+    /**
+     * Resets the coordinator to the given checkpoint.
+     * When this method is called, the coordinator can discard all other in-flight working state.
+     * All subtasks will also have been reset to the same checkpoint.
+     *
+     * <p>This method is expected to behave synchronously with respect to other method calls and calls
+     * to {@code Context} methods. For example, Events being sent by the Coordinator after this method
+     * returns are assumed to take place after the checkpoint that was restored.
+     *
+     * <h2>Restoring implicitly notifies of Checkpoint Completion</h2>
+     *
+     * <p>Restoring to a checkpoint is a way of confirming that the checkpoint is complete.
+     * It is safe to commit side-effects that are predicated on checkpoint completion after this
+     * call.
+     *
+     * <p>Even if no call to {@link #checkpointComplete(long)} happened, the checkpoint can still be
+     * complete (for example when a system failure happened directly after committing the checkpoint,
+     * before calling the {@link #checkpointComplete(long)} method).
+     */
     void resetToCheckpoint(byte[] checkpointData) throws Exception;

     // ------------------------------------------------------------------------
@@ -103,14 +154,34 @@ public interface OperatorCoordinator extends AutoCloseable {
      */
     interface Context {

+        /**
+         * Gets the ID of the operator to which the coordinator belongs.
+         */
         OperatorID getOperatorId();

+        /**
+         * Sends an event to the parallel subtask with the given subtask index.
+         *
+         * <p>The returned future is completed successfully once the event has been received
+         * by the target TaskManager. The future is completed exceptionally if the event cannot be sent.
+         * That includes situations where the target task is not running.
+         */
         CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt, int targetSubtask) throws TaskNotRunningException;

         void failTask(int subtask, Throwable cause);

+        /**
+         * Fails the job and trigger a global failover operation.
+         *
+         * <p>This operation restores the entire job to the latest complete checkpoint. This
+         * is useful to recover from inconsistent situations (the view from the coordinator and its
+         * subtasks as diverged), but is expensive and should be used with care.
+         */
         void failJob(Throwable cause);

+        /**
+         * Gets the current parallelism with which this operator is executed.
+         */
         int currentParallelism();
     }

@@ -126,8 +197,14 @@ public interface OperatorCoordinator extends AutoCloseable {
      */
     interface Provider extends Serializable {

+        /**
+         * Gets the ID of the operator to which the coordinator belongs.
+         */
         OperatorID getOperatorId();

+        /**
+         * Creates the {@code OperatorCoordinator}, using the given context.
+         */
         OperatorCoordinator create(Context context);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index 321b401..e11bbb5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -44,18 +44,66 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;

 /**
- * A holder for an {@link OperatorCoordinator.Context} and all the necessary facility around it that
- * is needed to interaction between the Coordinator, the Scheduler, the Checkpoint Coordinator, etc.
+ * The {@code OperatorCoordinatorHolder} holds the {@link OperatorCoordinator} and manages all its
+ * interactions with the remaining components.
+ * It provides the context and is responsible for checkpointing and exactly once semantics.
  *
- * <p>The holder is itself a {@link OperatorCoordinator} and forwards all calls to the actual coordinator.
- * That way, we can make adjustments to assumptions about the threading model and message/call forwarding
- * without needing to adjust all the call sites that interact with the coordinator.
+ * <h3>Exactly-one Semantics</h3>
  *
- * <p>This is also needed, unfortunately, because we need a lazy two-step initialization:
- * When the execution graph is created, we need to create the coordinators (or the holders, to be specific)
- * because the CheckpointCoordinator is also created in the ExecutionGraph and needs access to them.
- * However, the real Coordinators can only be created after SchedulerNG was created, because they need
- * a reference to it for the failure calls.
+ * <p>The semantics are described under {@link OperatorCoordinator#checkpointCoordinator(long, CompletableFuture)}.
+ *
+ * <h3>Exactly-one Mechanism</h3>
+ *
+ * <p>This implementation can handle one checkpoint being triggered at a time. If another checkpoint
+ * is triggered while the triggering of the first one was not completed or aborted, this class will
+ * throw an exception. That is in line with the capabilities of the Checkpoint Coordinator, which can
+ * handle multiple concurrent checkpoints on the TaskManagers, but only one concurrent triggering phase.
+ *
+ * <p>The mechanism for exactly once semantics is as follows:
+ *
+ * <ul>
+ * <li>Events pass through a special channel, the {@link OperatorEventValve}. If we are not currently
+ * triggering a checkpoint, then events simply pass through.
+ * <li>Atomically, with the completion of the checkpoint future for the coordinator, this operator
+ * operator event valve is closed. Events coming after that are held back (buffered), because
+ * they belong to the epoch after the checkpoint.
+ * <li>Once all coordinators in the job have completed the checkpoint, the barriers to the sources
+ * are injected. After that (see {@link #afterSourceBarrierInjection(long)}) the valves are
+ * opened again and the events are sent.
+ * <li>If a task fails in the meantime, the events are dropped from the valve. From the coordinator's
+ * perspective, these events are lost, because they were sent to a failed subtask after it's latest
+ * complete checkpoint.
+ * </ul>
+ *
+ * <p><b>IMPORTANT:</b> A critical assumption is that all events from the scheduler to the Tasks are
+ * transported strictly in order. Events being sent from the coordinator after the checkpoint barrier
+ * was injected must not overtake the checkpoint barrier. This is currently guaranteed by Flink's
+ * RPC mechanism.
+ *
+ * <p>Consider this example:
+ * <pre>
+ * Coordinator one events: => a . . b . |trigger| . . |complete| . . c . . d . |barrier| . e . f
+ * Coordinator two events: => . . x . . |trigger| . . . . . . . . . .|complete||barrier| . . y . . z
+ * </pre>
+ *
+ * <p>Two coordinators trigger checkpoints at the same time. 'Coordinator Two' takes longer to complete,
+ * and in the meantime 'Coordinator One' sends more events.
+ *
+ * <p>'Coordinator One' emits events 'c' and 'd' after it finished its checkpoint, meaning the events must
+ * take place after the checkpoint. But they are before the barrier injection, meaning the runtime
+ * task would see them before the checkpoint, if they were immediately transported.
+ *
+ * <p>'Coordinator One' closes its valve as soon as the checkpoint future completes. Events 'c' and 'd'
+ * get held back in the valve. Once 'Coordinator Two' completes its checkpoint, the barriers are sent
+ * to the sources. Then the valves are opened, and events 'c' and 'd' can flow to the tasks where they
+ * are received after the barrier.
+ *
+ * <h3>Concurrency and Threading Model</h3>
+ *
+ * <p>This component runs mainly in a main-thread-executor, like RPC endpoints. However,
+ * some actions need to be triggered synchronously by other threads. Most notably, when the
+ * checkpoint future is completed by the {@code OperatorCoordinator} implementation, we need to
+ * synchronously suspend event-sending.
  */
 public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorCoordinatorCheckpointContext {
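
Note for readers of this commit: the valve mechanism described in the new OperatorCoordinatorHolder JavaDoc can be illustrated with a small standalone sketch. This is not the code from the commit and not Flink's OperatorEventValve. The class name SketchEventValve, its methods, and the use of plain strings as events are all hypothetical, chosen only to show the ordering guarantee under the assumption that a single lock guards both event sending and completion of the checkpoint future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Hypothetical illustration only; not Flink's OperatorEventValve. */
final class SketchEventValve {
    private final Consumer<String> sender;              // stands in for the RPC send to a task
    private final List<String> buffered = new ArrayList<>();
    private boolean shut;

    SketchEventValve(Consumer<String> sender) {
        this.sender = sender;
    }

    /** Sends the event immediately, or buffers it while the valve is shut. */
    synchronized void sendEvent(String event) {
        if (shut) {
            buffered.add(event);
        } else {
            sender.accept(event);
        }
    }

    /** Shuts the valve and completes the checkpoint future under the same lock. */
    synchronized void completeCheckpointAndShut(CompletableFuture<byte[]> future, byte[] state) {
        shut = true;
        future.complete(state);
    }

    /** To be called after barrier injection: releases buffered events in order. */
    synchronized void openAndFlush() {
        shut = false;
        buffered.forEach(sender);
        buffered.clear();
    }

    /** Drops buffered events, for example after a task failure. */
    synchronized void reset() {
        buffered.clear();
        shut = false;
    }

    public static void main(String[] args) {
        List<String> wire = new ArrayList<>();           // what the task side would observe
        SketchEventValve valve = new SketchEventValve(wire::add);
        valve.sendEvent("a");
        valve.sendEvent("b");
        valve.completeCheckpointAndShut(new CompletableFuture<>(), new byte[0]);
        valve.sendEvent("c");                            // held back: belongs after the checkpoint
        valve.sendEvent("d");
        wire.add("|barrier|");                           // barrier injected by the runtime
        valve.openAndFlush();
        System.out.println(wire);                        // prints [a, b, |barrier|, c, d]
    }
}
```

In this sketch, events 'c' and 'd' reach the task side only after the barrier, which matches the 'Coordinator One' timeline in the JavaDoc example. Taking the same lock in sendEvent and completeCheckpointAndShut is one way to get the strict ordering that the checkpointCoordinator JavaDoc requires; running both actions on a single thread would be another.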