StephanEwen commented on a change in pull request #15601:
URL: https://github.com/apache/flink/pull/15601#discussion_r613006620



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
##########
@@ -282,34 +307,55 @@ private void checkpointCoordinatorInternal(
 
     @Override
     public void afterSourceBarrierInjection(long checkpointId) {
-        // this method is commonly called by the CheckpointCoordinator's 
executor thread (timer
-        // thread).
-
-        // we ideally want the scheduler main-thread to be the one that sends 
the blocked events
-        // however, we need to react synchronously here, to maintain 
consistency and not allow
-        // another checkpoint injection in-between (unlikely, but possible).
-        // fortunately, the event-sending goes pretty much directly to the RPC 
gateways, which are
-        // thread safe.
-
-        // this will automatically be fixed once the checkpoint coordinator 
runs in the
+        // unfortunately, this method does not run in the scheduler executor, 
but in the
+        // checkpoint coordinator time thread.
+        // we can remove the delegation once the checkpoint coordinator runs 
fully in the
         // scheduler's main thread executor
-        eventValve.openValveAndUnmarkCheckpoint();
+        mainThreadExecutor.execute(() -> 
eventValve.openValveAndUnmarkCheckpoint(checkpointId));
     }
 
     @Override
     public void abortCurrentTriggering() {
-        // this method is commonly called by the CheckpointCoordinator's 
executor thread (timer
-        // thread).
+        // unfortunately, this method does not run in the scheduler executor, 
but in the
+        // checkpoint coordinator time thread.
+        // we can remove the delegation once the checkpoint coordinator runs 
fully in the
+        // scheduler's main thread executor
+        mainThreadExecutor.execute(eventValve::openValveAndUnmarkCheckpoint);
+    }
 
-        // we ideally want the scheduler main-thread to be the one that sends 
the blocked events
-        // however, we need to react synchronously here, to maintain 
consistency and not allow
-        // another checkpoint injection in-between (unlikely, but possible).
-        // fortunately, the event-sending goes pretty much directly to the RPC 
gateways, which are
-        // thread safe.
+    // ------------------------------------------------------------------------
+    //  miscellaneous helpers
+    // ------------------------------------------------------------------------
 
-        // this will automatically be fixed once the checkpoint coordinator 
runs in the
-        // scheduler's main thread executor
-        eventValve.openValveAndUnmarkCheckpoint();
+    private void setupAllSubtaskGateways() {
+        for (int i = 0; i < operatorParallelism; i++) {
+            setupSubtaskGateway(i);
+        }
+    }
+
+    private void setupSubtaskGateway(int subtask) {
+        // this gets an access to the latest task execution attempt.
+        final SubtaskAccess sta = taskAccesses.getAccessForSubtask(subtask);
+
+        final OperatorCoordinator.SubtaskGateway gateway =
+                new SubtaskGatewayImpl(sta, eventValve, mainThreadExecutor);
+
+        // We need to do this synchronously here, otherwise we violate the 
contract that
+        // 'subtaskFailed()' will never overtake 'subtaskReady()'.
+        // An alternative, if we ever figure out that this cannot work 
synchronously here,
+        // is that we re-enqueue all actions (like 'subtaskFailed()' and 
'subtaskRestored()')
+        // back into the main thread executor, rather than directly calling 
the OperatorCoordinator
+        sta.hasSwitchedToRunning()
+                .thenAccept(
+                        (ignored) -> {
+                            mainThreadExecutor.assertRunningInMainThread();
+
+                            // this is a guard in case someone accidentally 
makes the
+                            // notification asynchronous
+                            assert sta.isStillRunning();
+
+                            coordinator.subtaskReady(subtask, gateway);

Review comment:
       Yes, that is true. See my comment below.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to