StephanEwen commented on a change in pull request #15601:
URL: https://github.com/apache/flink/pull/15601#discussion_r614671342
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
##########
@@ -307,6 +321,42 @@ public void abortCurrentTriggering() {
mainThreadExecutor.execute(eventValve::openValveAndUnmarkCheckpoint);
}
+ // ------------------------------------------------------------------------
+ // miscellaneous helpers
+ // ------------------------------------------------------------------------
+
+ private void setupAllSubtaskGateways() {
+ for (int i = 0; i < operatorParallelism; i++) {
+ setupSubtaskGateway(i);
+ }
+ }
+
+ private void setupSubtaskGateway(int subtask) {
+ // this gets access to the latest task execution attempt.
+ final SubtaskAccess sta = taskAccesses.getAccessForSubtask(subtask);
+
+ final OperatorCoordinator.SubtaskGateway gateway =
+ new SubtaskGatewayImpl(sta, eventValve, mainThreadExecutor);
+
+ // We need to do this synchronously here, otherwise we violate the contract that
+ // 'subtaskFailed()' will never overtake 'subtaskReady()'.
+ // An alternative, if we ever figure out that this cannot work synchronously here,
+ // is that we re-enqueue all actions (like 'subtaskFailed()' and 'subtaskRestored()')
+ // back into the main thread executor, rather than directly calling the OperatorCoordinator
+ FutureUtils.assertNoException(
Review comment:
That is a fair point. Exceptions thrown by `subtaskReady()` should
not crash the system.
I'll wrap that call in a dedicated exception-handling block.
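A minimal sketch of the change proposed in the reply, written in plain Java rather than against Flink's actual classes. `Coordinator`, `Gateway`, `failureHandler`, and `SubtaskReadySketch` are hypothetical stand-ins, not the real `OperatorCoordinator` API. The readiness callback is still invoked synchronously, which keeps the ordering that 'subtaskFailed()' never overtakes 'subtaskReady()', but any exception it throws is handed to a handler instead of propagating to the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch: synchronous subtaskReady() notification with exceptions contained. */
public class SubtaskReadySketch {

    /** Hypothetical stand-in for a per-subtask gateway. */
    interface Gateway {
        int subtask();
    }

    /** Hypothetical stand-in for the coordinator callback discussed in the review. */
    interface Coordinator {
        void subtaskReady(int subtask, Gateway gateway) throws Exception;
    }

    private final Coordinator coordinator;
    // Hypothetical: what should happen with the exception (log, fail the job, ...) is an assumption.
    private final Consumer<Throwable> failureHandler;

    SubtaskReadySketch(Coordinator coordinator, Consumer<Throwable> failureHandler) {
        this.coordinator = coordinator;
        this.failureHandler = failureHandler;
    }

    void setupSubtaskGateway(int subtask) {
        final Gateway gateway = () -> subtask;
        // Invoked synchronously, so a later failure notification for this subtask
        // cannot overtake the readiness notification.
        try {
            coordinator.subtaskReady(subtask, gateway);
        } catch (Exception e) {
            // Contain the exception instead of letting it propagate and crash the caller.
            failureHandler.accept(e);
        }
    }

    void setupAllSubtaskGateways(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            setupSubtaskGateway(i);
        }
    }

    public static void main(String[] args) {
        List<Integer> ready = new ArrayList<>();
        List<Throwable> failures = new ArrayList<>();
        Coordinator coordinator = (subtask, gw) -> {
            if (subtask == 1) {
                throw new IllegalStateException("subtaskReady failed for subtask " + subtask);
            }
            ready.add(gw.subtask());
        };
        new SubtaskReadySketch(coordinator, failures::add).setupAllSubtaskGateways(3);
        System.out.println("ready=" + ready + " failures=" + failures.size()); // prints "ready=[0, 2] failures=1"
    }
}
```

Catching `Exception` rather than `Throwable` lets JVM `Error`s still propagate; a failing subtask 1 is reported to the handler while subtasks 0 and 2 are still set up.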
--
This is an automated message from the Apache Git Service.