This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4aff693c25d1e489f83a53e2c5b9e1bfb364c3b3
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri May 29 21:37:27 2020 +0200

    [hotfix][coordination] Improve JavaDocs for OperatorCoordinator and OperatorCoordinatorHolder
---
 .../coordination/OperatorCoordinator.java          | 91 ++++++++++++++++++++--
 .../coordination/OperatorCoordinatorHolder.java    | 68 +++++++++++++---
 2 files changed, 142 insertions(+), 17 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
index dc06ae0..cac03d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
@@ -77,21 +77,72 @@ public interface OperatorCoordinator extends AutoCloseable {
 
        // ------------------------------------------------------------------------
 
-       void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception;
+       /**
+        * Takes a checkpoint of the coordinator. The checkpoint is identified by the given ID.
+        *
+        * <p>To confirm the checkpoint and store state in it, the given {@code CompletableFuture}
+        * must be completed with the state. To abort or dis-confirm the checkpoint, the given
+        * {@code CompletableFuture} must be completed exceptionally.
+        * In any case, the given {@code CompletableFuture} must be completed in some way, otherwise the
+        * checkpoint will not progress.
+        *
+        * <h3>Exactly-once Semantics</h3>
+        *
+        * <p>The semantics are defined as follows:
+        * <ul>
+        *   <li>The point in time when the checkpoint future is completed is considered the point in time
+        *       when the coordinator's checkpoint takes place.
+        *   <li>The OperatorCoordinator implementation must have a way of strictly ordering the sending
+        *       of events and the completion of the checkpoint future (for example the same thread does
+        *       both actions, or both actions are guarded by a mutex).
+        *   <li>Every event sent before the checkpoint future is completed is considered before the checkpoint.
+        *   <li>Every event sent after the checkpoint future is completed is considered to be after the checkpoint.
+        * </ul>
+        */
+       void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) throws Exception;
 
        /**
         * Notifies the coordinator that the checkpoint with the given checkpointId completes and
         * was committed.
         *
-        * <p><b>Important:</b> This method is not supposed to throw an exception, because by the
-        * time we notify that the checkpoint is complete, the checkpoint is committed and cannot be
-        * aborted any more. If the coordinator gets into an inconsistent state internally, it should
-        * fail the job ({@link Context#failJob(Throwable)}) instead. Any exception propagating from
-        * this method may be treated as a fatal error for the JobManager, crashing the JobManager,
-        * and leading to an expensive "master failover" procedure.
+        * <h3>Checkpoint Subsuming</h3>
+        *
+        * <p>Checkpoint IDs are strictly increasing. A checkpoint with higher ID always subsumes
+        * a checkpoint with lower ID. For example, when checkpoint T is confirmed complete, the
+        * code should treat all checkpoints with lower ID (T-1, T-2, etc.) also as confirmed.
+        *
+        * <h3>Exceptions</h3>
+        *
+        * <p>This method is not supposed to throw an exception indicating that the checkpoint cannot
+        * be completed. By the time we notify that the checkpoint is complete, the checkpoint is
+        * committed and cannot be aborted any more.
+        *
+        * <p>If the coordinator gets into an inconsistent state internally, as a result of logic that
+        * runs after this notification, it should fail the job ({@link Context#failJob(Throwable)})
+        * instead. Any exception propagating from this method may be treated as a fatal error for the
+        * JobManager, crashing the JobManager, and leading to an expensive "master failover" procedure.
         */
        void checkpointComplete(long checkpointId);
 
+       /**
+        * Resets the coordinator to the given checkpoint.
+        * When this method is called, the coordinator can discard all other in-flight working state.
+        * All subtasks will also have been reset to the same checkpoint.
+        *
+        * <p>This method is expected to behave synchronously with respect to other method calls and calls
+        * to {@code Context} methods. For example, Events being sent by the Coordinator after this method
+        * returns are assumed to take place after the checkpoint that was restored.
+        *
+        * <h2>Restoring implicitly notifies of Checkpoint Completion</h2>
+        *
+        * <p>Restoring to a checkpoint is a way of confirming that the checkpoint is complete.
+        * It is safe to commit side-effects that are predicated on checkpoint completion after this
+        * call.
+        *
+        * <p>Even if no call to {@link #checkpointComplete(long)} happened, the checkpoint can still be
+        * complete (for example when a system failure happened directly after committing the checkpoint,
+        * before calling the {@link #checkpointComplete(long)} method).
+        */
        void resetToCheckpoint(byte[] checkpointData) throws Exception;
 
        // ------------------------------------------------------------------------
@@ -103,14 +154,34 @@ public interface OperatorCoordinator extends AutoCloseable {
         */
        interface Context {
 
+               /**
+                * Gets the ID of the operator to which the coordinator belongs.
+                */
                OperatorID getOperatorId();
 
+               /**
+                * Sends an event to the parallel subtask with the given subtask index.
+                *
+                * <p>The returned future is completed successfully once the event has been received
+                * by the target TaskManager. The future is completed exceptionally if the event cannot be sent.
+                * That includes situations where the target task is not running.
+                */
                CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt, int targetSubtask) throws TaskNotRunningException;
 
                void failTask(int subtask, Throwable cause);
 
+               /**
+                * Fails the job and triggers a global failover operation.
+                *
+                * <p>This operation restores the entire job to the latest complete checkpoint. This
+                * is useful to recover from inconsistent situations (the view from the coordinator and its
+                * subtasks has diverged), but is expensive and should be used with care.
+                */
                void failJob(Throwable cause);
 
+               /**
+                * Gets the current parallelism with which this operator is executed.
+                */
                int currentParallelism();
        }
 
@@ -126,8 +197,14 @@ public interface OperatorCoordinator extends AutoCloseable {
         */
        interface Provider extends Serializable {
 
+               /**
+                * Gets the ID of the operator to which the coordinator belongs.
+                */
                OperatorID getOperatorId();
 
+               /**
+                * Creates the {@code OperatorCoordinator}, using the given context.
+                */
                OperatorCoordinator create(Context context);
        }
 }
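
The strict-ordering requirement in the checkpointCoordinator Javadoc above (one thread, or a mutex around both actions) can be sketched as follows. This is a minimal illustration, not Flink code: the class OrderedCheckpointSketch, its String events, and the timeline() helper are hypothetical stand-ins, and only the shape of checkpointCoordinator(long, CompletableFuture<byte[]>) is taken from the diff.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for an OperatorCoordinator implementation.
// It does not use Flink classes; events are plain strings.
class OrderedCheckpointSketch {

    private final Object lock = new Object();
    private final List<String> timeline = new ArrayList<>();
    private int eventsSent;

    /** Records an event send. Guarded by the same lock as checkpoint completion. */
    void sendEvent(String event) {
        synchronized (lock) {
            eventsSent++;
            timeline.add("event:" + event);
        }
    }

    /** Same shape as checkpointCoordinator(long, CompletableFuture<byte[]>) in the diff. */
    void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) {
        synchronized (lock) {
            try {
                // Snapshot the state and complete the future while holding the lock,
                // so that no event send can interleave with the completion.
                byte[] state = Integer.toString(eventsSent).getBytes(StandardCharsets.UTF_8);
                timeline.add("checkpoint:" + checkpointId);
                resultFuture.complete(state);
            } catch (Throwable t) {
                // The future must be completed in some way, otherwise the checkpoint stalls.
                resultFuture.completeExceptionally(t);
            }
        }
    }

    /** Returns a copy of the recorded order of events and checkpoints. */
    List<String> timeline() {
        synchronized (lock) {
            return new ArrayList<>(timeline);
        }
    }

    public static void main(String[] args) {
        OrderedCheckpointSketch coordinator = new OrderedCheckpointSketch();
        CompletableFuture<byte[]> future = new CompletableFuture<>();

        coordinator.sendEvent("a");                    // counted as before the checkpoint
        coordinator.checkpointCoordinator(1L, future);
        coordinator.sendEvent("b");                    // counted as after the checkpoint

        System.out.println(coordinator.timeline());
        System.out.println(new String(future.join(), StandardCharsets.UTF_8));
    }
}
```

Because both methods take the same lock, every event falls unambiguously before or after the completion of the future. One trade-off of this sketch: callbacks registered on the future run while the lock is held, so they should be short.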
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index 321b401..e11bbb5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -44,18 +44,66 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * A holder for an {@link OperatorCoordinator.Context} and all the necessary facility around it that
- * is needed to interaction between the Coordinator, the Scheduler, the Checkpoint Coordinator, etc.
+ * The {@code OperatorCoordinatorHolder} holds the {@link OperatorCoordinator} and manages all its
+ * interactions with the remaining components.
+ * It provides the context and is responsible for checkpointing and exactly-once semantics.
  *
- * <p>The holder is itself a {@link OperatorCoordinator} and forwards all calls to the actual coordinator.
- * That way, we can make adjustments to assumptions about the threading model and message/call forwarding
- * without needing to adjust all the call sites that interact with the coordinator.
+ * <h3>Exactly-once Semantics</h3>
  *
- * <p>This is also needed, unfortunately, because we need a lazy two-step initialization:
- * When the execution graph is created, we need to create the coordinators (or the holders, to be specific)
- * because the CheckpointCoordinator is also created in the ExecutionGraph and needs access to them.
- * However, the real Coordinators can only be created after SchedulerNG was created, because they need
- * a reference to it for the failure calls.
+ * <p>The semantics are described under {@link OperatorCoordinator#checkpointCoordinator(long, CompletableFuture)}.
+ *
+ * <h3>Exactly-once Mechanism</h3>
+ *
+ * <p>This implementation can handle one checkpoint being triggered at a time. If another checkpoint
+ * is triggered while the triggering of the first one was not completed or aborted, this class will
+ * throw an exception. That is in line with the capabilities of the Checkpoint Coordinator, which can
+ * handle multiple concurrent checkpoints on the TaskManagers, but only one concurrent triggering phase.
+ *
+ * <p>The mechanism for exactly once semantics is as follows:
+ *
+ * <ul>
+ *   <li>Events pass through a special channel, the {@link OperatorEventValve}. If we are not currently
+ *       triggering a checkpoint, then events simply pass through.
+ *   <li>Atomically, with the completion of the checkpoint future for the coordinator, this
+ *       operator event valve is closed. Events coming after that are held back (buffered), because
+ *       they belong to the epoch after the checkpoint.
+ *   <li>Once all coordinators in the job have completed the checkpoint, the barriers to the sources
+ *       are injected. After that (see {@link #afterSourceBarrierInjection(long)}) the valves are
+ *       opened again and the events are sent.
+ *   <li>If a task fails in the meantime, the events are dropped from the valve. From the coordinator's
+ *       perspective, these events are lost, because they were sent to a failed subtask after its latest
+ *       complete checkpoint.
+ * </ul>
+ *
+ * <p><b>IMPORTANT:</b> A critical assumption is that all events from the scheduler to the Tasks are
+ * transported strictly in order. Events being sent from the coordinator after the checkpoint barrier
+ * was injected must not overtake the checkpoint barrier. This is currently guaranteed by Flink's
+ * RPC mechanism.
+ *
+ * <p>Consider this example:
+ * <pre>
+ * Coordinator one events: => a . . b . |trigger| . . |complete| . . c . . d . |barrier| . e . f
+ * Coordinator two events: => . . x . . |trigger| . . . . . . . . . .|complete||barrier| . . y . . z
+ * </pre>
+ *
+ * <p>Two coordinators trigger checkpoints at the same time. 'Coordinator Two' takes longer to complete,
+ * and in the meantime 'Coordinator One' sends more events.
+ *
+ * <p>'Coordinator One' emits events 'c' and 'd' after it finished its checkpoint, meaning the events must
+ * take place after the checkpoint. But they are before the barrier injection, meaning the runtime
+ * task would see them before the checkpoint, if they were immediately transported.
+ *
+ * <p>'Coordinator One' closes its valve as soon as the checkpoint future completes. Events 'c' and 'd'
+ * get held back in the valve. Once 'Coordinator Two' completes its checkpoint, the barriers are sent
+ * to the sources. Then the valves are opened, and events 'c' and 'd' can flow to the tasks where they
+ * are received after the barrier.
+ *
+ * <h3>Concurrency and Threading Model</h3>
+ *
+ * <p>This component runs mainly in a main-thread-executor, like RPC endpoints. However,
+ * some actions need to be triggered synchronously by other threads. Most notably, when the
+ * checkpoint future is completed by the {@code OperatorCoordinator} implementation, we need to
+ * synchronously suspend event-sending.
  */
 public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorCoordinatorCheckpointContext {
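
The valve mechanism described in the class Javadoc above can be illustrated with a small stand-alone sketch. It replays the 'Coordinator One' timeline from the example. The class EventValveSketch and its methods send, shut, openAndFlush and dropBuffered are hypothetical names chosen for illustration; they are not the API of Flink's OperatorEventValve, and events are plain strings here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for an event valve; not Flink's OperatorEventValve.
class EventValveSketch {

    private final Consumer<String> downstream;
    private final List<String> buffered = new ArrayList<>();
    private boolean shut;

    EventValveSketch(Consumer<String> downstream) {
        this.downstream = downstream;
    }

    /** Passes the event through when the valve is open, buffers it when the valve is shut. */
    synchronized void send(String event) {
        if (shut) {
            buffered.add(event);
        } else {
            downstream.accept(event);
        }
    }

    /** Called together with the completion of the coordinator's checkpoint future. */
    synchronized void shut() {
        shut = true;
    }

    /** Called after the barriers were injected: releases buffered events in order. */
    synchronized void openAndFlush() {
        shut = false;
        for (String event : buffered) {
            downstream.accept(event);
        }
        buffered.clear();
    }

    /** Called when the target task failed: buffered events are lost. */
    synchronized void dropBuffered() {
        buffered.clear();
    }

    public static void main(String[] args) {
        List<String> seenByTask = new ArrayList<>();
        EventValveSketch valve = new EventValveSketch(seenByTask::add);

        valve.send("a");
        valve.send("b");
        valve.shut();                  // checkpoint future completed
        valve.send("c");               // held back
        valve.send("d");               // held back
        seenByTask.add("|barrier|");   // barrier injected to the sources
        valve.openAndFlush();          // 'c' and 'd' now follow the barrier
        valve.send("e");
        valve.send("f");

        System.out.println(seenByTask); // prints [a, b, |barrier|, c, d, e, f]
    }
}
```

The sketch relies on the same assumption the Javadoc states: whatever carries events and barriers to the task preserves their order, so releasing 'c' and 'd' only after the barrier is enough to place them after the checkpoint.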
 
