yunfengzhou-hub commented on code in PR #20454: URL: https://github.com/apache/flink/pull/20454#discussion_r939858060
##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java:
##########
@@ -1488,12 +1488,15 @@ public void deliverOperatorEvent(OperatorID operator, SerializedValue<OperatorEv
         } catch (Throwable t) {
             ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
 
-            if (getExecutionState() == ExecutionState.RUNNING
-                    || getExecutionState() == ExecutionState.INITIALIZING) {
+            if ((getExecutionState() == ExecutionState.RUNNING
+                            || getExecutionState() == ExecutionState.INITIALIZING)
+                    && !(t instanceof RejectedExecutionException)) {

Review Comment:
In the previous implementation, the task would swallow the `RejectedExecutionException` if it received an operator event after its mailbox had been closed, which happens during the shutdown process. This does not work for the close gateway event introduced in this PR, as we require that subtasks always receive an ACK or throw an exception on this event for the checkpoint process to proceed.

According to our offline discussion, I'll wrap the `RejectedExecutionException` in a `TaskNotRunningException` to clarify how exceptions are handled on the coordinator side.
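As a rough illustration of the wrapping described in the comment above, here is a minimal, self-contained sketch. It is not the code from this PR: `OperatorEventDeliverySketch`, `closeMailbox`, and the local `TaskNotRunningException` class are hypothetical stand-ins (the last one only mimics Flink's exception of the same name so the example compiles without Flink on the classpath), and a plain `ExecutorService` plays the role of the task mailbox.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

class OperatorEventDeliverySketch {

    /** Hypothetical stand-in for Flink's TaskNotRunningException (assumption, not the real class). */
    static class TaskNotRunningException extends Exception {
        TaskNotRunningException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Assumption: a single-threaded executor stands in for the task mailbox. */
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();

    /** Simulates the mailbox being closed during task shutdown. */
    void closeMailbox() {
        mailbox.shutdown();
    }

    /**
     * Hands the event handler to the mailbox. If the mailbox is already closed,
     * the rejection is wrapped instead of swallowed, so the caller always sees
     * either a successful hand-off or an exception.
     */
    void deliverOperatorEvent(Runnable eventHandler) throws TaskNotRunningException {
        try {
            mailbox.execute(eventHandler);
        } catch (RejectedExecutionException e) {
            throw new TaskNotRunningException("Task is not running; mailbox is closed.", e);
        }
    }

    public static void main(String[] args) {
        OperatorEventDeliverySketch task = new OperatorEventDeliverySketch();
        try {
            // Accepted: the mailbox is still open.
            task.deliverOperatorEvent(() -> System.out.println("event handled"));
            task.closeMailbox();
            // Rejected: the mailbox was shut down, so the wrapper exception is thrown.
            task.deliverOperatorEvent(() -> System.out.println("never runs"));
        } catch (TaskNotRunningException e) {
            System.out.println("delivery failed: " + e.getMessage()
                    + " (cause: " + e.getCause().getClass().getSimpleName() + ")");
        }
    }
}
```

The point of the wrapper is that the sender can tell "the task is shutting down" apart from a genuine failure inside the event handler. How the coordinator side reacts to that distinction is up to the PR and is not modeled here.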