StephanEwen commented on a change in pull request #15601:
URL: https://github.com/apache/flink/pull/15601#discussion_r613006620
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
##########
@@ -282,34 +307,55 @@ private void checkpointCoordinatorInternal(
@Override
public void afterSourceBarrierInjection(long checkpointId) {
- // this method is commonly called by the CheckpointCoordinator's executor thread (timer
- // thread).
-
- // we ideally want the scheduler main-thread to be the one that sends the blocked events
- // however, we need to react synchronously here, to maintain consistency and not allow
- // another checkpoint injection in-between (unlikely, but possible).
- // fortunately, the event-sending goes pretty much directly to the RPC gateways, which are
- // thread safe.
-
- // this will automatically be fixed once the checkpoint coordinator runs in the
+ // unfortunately, this method does not run in the scheduler executor, but in the
+ // checkpoint coordinator time thread.
+ // we can remove the delegation once the checkpoint coordinator runs fully in the
// scheduler's main thread executor
- eventValve.openValveAndUnmarkCheckpoint();
+ mainThreadExecutor.execute(() -> eventValve.openValveAndUnmarkCheckpoint(checkpointId));
}
@Override
public void abortCurrentTriggering() {
- // this method is commonly called by the CheckpointCoordinator's executor thread (timer
- // thread).
+ // unfortunately, this method does not run in the scheduler executor, but in the
+ // checkpoint coordinator time thread.
+ // we can remove the delegation once the checkpoint coordinator runs fully in the
+ // scheduler's main thread executor
+ mainThreadExecutor.execute(eventValve::openValveAndUnmarkCheckpoint);
+ }
- // we ideally want the scheduler main-thread to be the one that sends the blocked events
- // however, we need to react synchronously here, to maintain consistency and not allow
- // another checkpoint injection in-between (unlikely, but possible).
- // fortunately, the event-sending goes pretty much directly to the RPC gateways, which are
- // thread safe.
+ // ------------------------------------------------------------------------
+ // miscellaneous helpers
+ // ------------------------------------------------------------------------
- // this will automatically be fixed once the checkpoint coordinator runs in the
- // scheduler's main thread executor
- eventValve.openValveAndUnmarkCheckpoint();
+ private void setupAllSubtaskGateways() {
+ for (int i = 0; i < operatorParallelism; i++) {
+ setupSubtaskGateway(i);
+ }
+ }
+
+ private void setupSubtaskGateway(int subtask) {
+ // this gets an access to the latest task execution attempt.
+ final SubtaskAccess sta = taskAccesses.getAccessForSubtask(subtask);
+
+ final OperatorCoordinator.SubtaskGateway gateway =
+ new SubtaskGatewayImpl(sta, eventValve, mainThreadExecutor);
+
+ // We need to do this synchronously here, otherwise we violate the contract that
+ // 'subtaskFailed()' will never overtake 'subtaskReady()'.
+ // An alternative, if we ever figure out that this cannot work synchronously here,
+ // is that we re-enqueue all actions (like 'subtaskFailed()' and 'subtaskRestored()')
+ // back into the main thread executor, rather than directly calling the OperatorCoordinator
+ sta.hasSwitchedToRunning()
+ .thenAccept(
+ (ignored) -> {
+ mainThreadExecutor.assertRunningInMainThread();
+
+ // this is a guard in case someone accidentally makes the
+ // notification asynchronous
+ assert sta.isStillRunning();
+
+ coordinator.subtaskReady(subtask, gateway);
Review comment:
Yes, that is true. See my comment below.
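
To illustrate the ordering argument from the code comment in the hunk above, here is a minimal sketch using only `java.util.concurrent`. It is not Flink code: the class `MainThreadDelegationSketch`, its method names, and the thread name are hypothetical stand-ins, and a plain single-threaded `ExecutorService` is assumed in place of the scheduler's main thread executor. It shows two things: a non-async `thenAccept` callback runs in the thread that completes the future, so a later notification cannot overtake it, and an action handed to a single-threaded executor runs on that executor's thread rather than on the caller's.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for illustration only; not Flink's OperatorCoordinatorHolder.
public class MainThreadDelegationSketch {

    /** Records the order of two notifications when the "ready" callback is non-async. */
    static List<String> synchronousCallbackOrder() {
        List<String> events = new ArrayList<>();
        CompletableFuture<Void> switchedToRunning = new CompletableFuture<>();

        // Non-async dependent: it runs in whichever thread completes the future.
        switchedToRunning.thenAccept(ignored -> events.add("subtaskReady"));

        // The callback above runs here, before complete() returns.
        switchedToRunning.complete(null);

        // A notification issued afterwards therefore cannot overtake it.
        events.add("subtaskFailed");
        return events;
    }

    /** Hands an action to a single-threaded executor and reports which thread ran it. */
    static String threadNameOfDelegatedAction() {
        // Assumption: a plain single-threaded executor models the "main thread executor".
        ExecutorService mainThreadExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread-executor"));
        try {
            CompletableFuture<String> ranOn = new CompletableFuture<>();
            mainThreadExecutor.execute(() -> ranOn.complete(Thread.currentThread().getName()));
            return ranOn.join();
        } finally {
            mainThreadExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(synchronousCallbackOrder());
        System.out.println(threadNameOfDelegatedAction());
    }
}
```

If the callback were registered with `thenAcceptAsync` instead, the first guarantee would no longer hold, which is the situation the `assert sta.isStillRunning()` guard in the hunk is meant to catch.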