StephanEwen commented on a change in pull request #10483:
[FLINK-15099][runtime] (FLIP-27) Add Operator Coordinators and Events
URL: https://github.com/apache/flink/pull/10483#discussion_r355181966
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -845,4 +854,58 @@ private String retrieveTaskManagerLocation(ExecutionAttemptID executionAttemptID
.orElse("Unknown location");
}
+ @Override
+ public void deliverOperatorEventToCoordinator(
+ final ExecutionAttemptID taskExecutionId,
+ final OperatorID operatorId,
+ final OperatorEvent evt) throws FlinkException {
+
+ // Failure semantics (as per the javadocs of the method):
+ // If the task manager sends an event for a non-running task or a non-existing operator
+ // coordinator, then respond with an exception to the call. If task and coordinator exist,
+ // then we assume that the call from the TaskManager was valid, and any bubbling exception
+ // needs to cause a job failure.
+
+ final Execution exec = executionGraph.getRegisteredExecutions().get(taskExecutionId);
+ if (exec == null || exec.getState() != ExecutionState.RUNNING) {
+ throw new FlinkException("Task is not running in the scheduler");
+ }
+
+ final ExecutionJobVertex ejv = exec.getVertex().getJobVertex();
+ final OperatorCoordinator coordinator = ejv.getOperatorCoordinator(operatorId);
+ if (coordinator == null) {
+ throw new FlinkException("No coordinator registered for operator " + operatorId);
+ }
Review comment:
Could do that, but I personally find the "dummyX" pattern less readable than
such a simple check.
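
For reference, a rough sketch of the two styles being compared. This is not the Flink code: `Coordinator`, `LookupException`, and the `String` ids are hypothetical stand-ins, and the "dummy" variant is only my reading of what a placeholder coordinator for a missing entry might look like.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CoordinatorLookupSketch {

    /** Hypothetical stand-in for an operator coordinator; not Flink's interface. */
    public interface Coordinator {
        void handleEvent(String event) throws LookupException;
    }

    /** Hypothetical exception type, used here in place of FlinkException. */
    public static class LookupException extends Exception {
        public LookupException(String message) {
            super(message);
        }
    }

    /** Style A: explicit null check at the call site, as in the diff above. */
    public static void deliverWithNullCheck(
            Map<String, Coordinator> coordinators, String operatorId, String event)
            throws LookupException {
        final Coordinator coordinator = coordinators.get(operatorId);
        if (coordinator == null) {
            throw new LookupException("No coordinator registered for operator " + operatorId);
        }
        coordinator.handleEvent(event);
    }

    /** Style B helper: a placeholder ("dummy") coordinator that rejects every event. */
    public static Coordinator dummyCoordinator(String operatorId) {
        return event -> {
            throw new LookupException("No coordinator registered for operator " + operatorId);
        };
    }

    /** Style B: no null check; a missing entry is replaced by the dummy. */
    public static void deliverWithDummy(
            Map<String, Coordinator> coordinators, String operatorId, String event)
            throws LookupException {
        coordinators.getOrDefault(operatorId, dummyCoordinator(operatorId)).handleEvent(event);
    }

    public static void main(String[] args) {
        final List<String> received = new ArrayList<>();
        final Map<String, Coordinator> coordinators = new HashMap<>();
        coordinators.put("op-1", received::add);

        try {
            deliverWithNullCheck(coordinators, "op-1", "event-a");
            deliverWithDummy(coordinators, "op-1", "event-b");
            deliverWithDummy(coordinators, "op-2", "event-c"); // no such coordinator
        } catch (LookupException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
        System.out.println("Delivered: " + received);
    }
}
```

Both variants reject an event for an unknown operator with the same message. In the dummy variant the rejection is raised from inside `handleEvent`, so the reader has to follow the placeholder to see where the error comes from, whereas the null check keeps it visible at the call site.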