kezhuw commented on a change in pull request #15601:
URL: https://github.com/apache/flink/pull/15601#discussion_r614223827
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
##########
@@ -307,6 +321,42 @@ public void abortCurrentTriggering() {
mainThreadExecutor.execute(eventValve::openValveAndUnmarkCheckpoint);
}
+ // ------------------------------------------------------------------------
+ // miscellaneous helpers
+ // ------------------------------------------------------------------------
+
+ private void setupAllSubtaskGateways() {
+ for (int i = 0; i < operatorParallelism; i++) {
+ setupSubtaskGateway(i);
+ }
+ }
+
+ private void setupSubtaskGateway(int subtask) {
+ // this gets an access to the latest task execution attempt.
+ final SubtaskAccess sta = taskAccesses.getAccessForSubtask(subtask);
+
+ final OperatorCoordinator.SubtaskGateway gateway =
+ new SubtaskGatewayImpl(sta, eventValve, mainThreadExecutor);
+
+ // We need to do this synchronously here, otherwise we violate the
contract that
+ // 'subtaskFailed()' will never overtake 'subtaskReady()'.
+ // An alternative, if we ever figure out that this cannot work
synchronously here,
+ // is that we re-enqueue all actions (like 'subtaskFailed()' and
'subtaskRestored()')
+ // back into the main thread executor, rather than directly calling
the OperatorCoordinator
+ FutureUtils.assertNoException(
Review comment:
I used to think that `FutureUtils.assertNoException` should be used for
only flink code but not user side code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]