StephanEwen commented on a change in pull request #15601:
URL: https://github.com/apache/flink/pull/15601#discussion_r614208786
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
##########
@@ -307,6 +321,42 @@ public void abortCurrentTriggering() {
mainThreadExecutor.execute(eventValve::openValveAndUnmarkCheckpoint);
}
+ // ------------------------------------------------------------------------
+ // miscellaneous helpers
+ // ------------------------------------------------------------------------
+
+ private void setupAllSubtaskGateways() {
+ for (int i = 0; i < operatorParallelism; i++) {
+ setupSubtaskGateway(i);
+ }
+ }
+
+ private void setupSubtaskGateway(int subtask) {
+ // this gets an access to the latest task execution attempt.
+ final SubtaskAccess sta = taskAccesses.getAccessForSubtask(subtask);
+
+ final OperatorCoordinator.SubtaskGateway gateway =
+ new SubtaskGatewayImpl(sta, eventValve, mainThreadExecutor);
+
+ // We need to do this synchronously here, otherwise we violate the contract that
+ // 'subtaskFailed()' will never overtake 'subtaskReady()'.
+ // An alternative, if we ever figure out that this cannot work synchronously here,
+ // is that we re-enqueue all actions (like 'subtaskFailed()' and 'subtaskRestored()')
+ // back into the main thread executor, rather than directly calling the OperatorCoordinator
+ FutureUtils.assertNoException(
Review comment:
That's a pattern we use in many places in Flink. There should never be
an exception (that's why it is an `assert`), but when the assert fails, it needs
to fail loudly. When it fails, there is a bug, and we must notice that immediately.
If the bug happens rarely, attempting to handle it gracefully and attempting
recovery might even hide the bug across the CI runs, because the recovery would
then succeed.
I also think graceful failure handling here would add more code clutter,
which I try to avoid for situations that should never happen.
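
To make the "fail loudly" pattern concrete, here is a minimal sketch using only `java.util.concurrent`. It is not Flink's `FutureUtils.assertNoException` implementation: the class name `AssertNoExceptionSketch` and the `fatalHandler` parameter are hypothetical, introduced so the behavior can be observed without terminating the JVM. The idea it illustrates is that a failure of the future is never recovered from, only escalated.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class AssertNoExceptionSketch {

    /**
     * Hypothetical stand-in for an "assert no exception" helper.
     * If the future completes exceptionally, the error is handed to
     * fatalHandler instead of being handled or retried. In production
     * code the handler would typically log and abort the process.
     */
    public static void assertNoException(
            CompletableFuture<?> future, Consumer<Throwable> fatalHandler) {
        future.whenComplete(
                (ignored, error) -> {
                    if (error != null) {
                        // No recovery attempt: a failure here means there is a bug.
                        fatalHandler.accept(error);
                    }
                });
    }

    public static void main(String[] args) {
        // Records the escalated error so the sketch can be run safely.
        AtomicReference<Throwable> escalated = new AtomicReference<>();

        // Normal case: the future succeeds and nothing is escalated.
        assertNoException(CompletableFuture.completedFuture("ready"), escalated::set);
        System.out.println("escalated after success: " + escalated.get());

        // Bug case: the future fails and the error reaches the handler.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("should never happen"));
        assertNoException(failed, escalated::set);
        System.out.println("escalated after failure: " + escalated.get().getMessage());
    }
}
```

Because the handler only escalates, a rare bug cannot be masked by a successful retry, which is the concern raised above about CI runs.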