StephanEwen commented on a change in pull request #10483: [FLINK-15099][runtime] (FLIP-27) Add Operator Coordinators and Events
URL: https://github.com/apache/flink/pull/10483#discussion_r377668232
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -845,4 +854,58 @@ private String retrieveTaskManagerLocation(ExecutionAttemptID executionAttemptID
             .orElse("Unknown location");
     }
+    @Override
+    public void deliverOperatorEventToCoordinator(
+            final ExecutionAttemptID taskExecutionId,
+            final OperatorID operatorId,
+            final OperatorEvent evt) throws FlinkException {
+
+        // Failure semantics (as per the javadocs of the method):
+        // If the task manager sends an event for a non-running task or an non-existing operator
+        // coordinator, then respond with an exception to the call. If task and coordinator exist,
+        // then we assume that the call from the TaskManager was valid, and any bubbling exception
+        // needs to cause a job failure.
+
+        final Execution exec = executionGraph.getRegisteredExecutions().get(taskExecutionId);
+        if (exec == null || exec.getState() != ExecutionState.RUNNING) {
+            throw new FlinkException("Task is not running in the scheduler");

Review comment:
The task is (or was) running on the TaskManager but is unknown on the JobManager. I think this can happen only if a failure / recovery was concurrently in progress while the event was still in flight (being delivered). I would say that this indicates an inconsistent situation and should lead to a task failure on the TaskManager.
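
The failure handling suggested in the review comment can be sketched roughly as follows. This is only an illustration under stated assumptions: `JobManagerSide`, `TaskManagerSide`, `TaskNotRunningException`, and the string IDs are hypothetical stand-ins, not Flink classes, and the real RPC path and exception types in the PR may differ. The point it shows is that when the JobManager side rejects an event because the task is unknown or not running there, the TaskManager side treats the rejection as an inconsistency and fails its local task.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Illustration only: all types here are hypothetical stand-ins, not Flink classes. */
class EventDeliverySketch {

    enum TaskState { RUNNING, FAILED }

    /** Hypothetical exception the JobManager side uses to reject the call. */
    static class TaskNotRunningException extends Exception {
        TaskNotRunningException(String message) {
            super(message);
        }
    }

    /** Stand-in for the scheduler's view of running executions. */
    static class JobManagerSide {
        final Set<String> runningExecutions = new HashSet<>();

        void deliverEvent(String executionId, String event) throws TaskNotRunningException {
            // Unknown or non-running task: respond with an exception to the call.
            if (!runningExecutions.contains(executionId)) {
                throw new TaskNotRunningException(
                        "Task is not running in the scheduler: " + executionId);
            }
            // Otherwise the event would be handed to the operator coordinator here.
        }
    }

    /** Stand-in for the TaskManager's view of its local tasks. */
    static class TaskManagerSide {
        final Map<String, TaskState> localTasks = new HashMap<>();

        void sendEvent(JobManagerSide jobManager, String executionId, String event) {
            try {
                jobManager.deliverEvent(executionId, event);
            } catch (TaskNotRunningException e) {
                // The two sides disagree about the task: treat this as an
                // inconsistent situation and fail the local task.
                localTasks.put(executionId, TaskState.FAILED);
            }
        }
    }
}
```

In this sketch, a task that is registered as running on both sides keeps its `RUNNING` state after `sendEvent`, while a task known only to the TaskManager side ends up `FAILED`.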