yunfengzhou-hub commented on code in PR #20454:
URL: https://github.com/apache/flink/pull/20454#discussion_r939858060


##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java:
##########
@@ -1488,12 +1488,15 @@ public void deliverOperatorEvent(OperatorID operator, SerializedValue<OperatorEv
             } catch (Throwable t) {
                 ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
 
-                if (getExecutionState() == ExecutionState.RUNNING
-                        || getExecutionState() == ExecutionState.INITIALIZING) {
+                if ((getExecutionState() == ExecutionState.RUNNING
+                                || getExecutionState() == ExecutionState.INITIALIZING)
+                        && !(t instanceof RejectedExecutionException)) {

Review Comment:
   In the previous implementation, the task swallowed the
`RejectedExecutionException` raised when it received an operator event after
its mailbox had been closed, which happens during the shutdown process. This
does not work for the close gateway event introduced in this PR: for the
checkpoint process to proceed, delivering this event to a subtask must always
result in either an ACK or an exception.
   
   As agreed in our offline discussion, I'll wrap the
`RejectedExecutionException` in a `TaskNotRunningException` to make the
exception-handling behavior on the coordinator side explicit.
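
   A rough sketch of the wrapping, under a simplified model: the nested class
below is a hypothetical stand-in for Flink's `TaskNotRunningException` (the
real class's package and superclass are not modeled), and `deliverEvent` and
`send` are illustrative names, not the actual `Task` or coordinator code. The
point it shows is that a closed mailbox surfaces to the caller as a typed
failure instead of being swallowed, so every delivery ends in either an ACK or
an exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;

public class WrapRejectedExecution {

    // Hypothetical stand-in for Flink's TaskNotRunningException.
    static class TaskNotRunningException extends Exception {
        TaskNotRunningException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical task side: the event is handed to a mailbox that rejects it
    // once closed. The rejection is wrapped and rethrown, not swallowed.
    static void deliverEvent(boolean mailboxClosed) throws TaskNotRunningException {
        try {
            if (mailboxClosed) {
                throw new RejectedExecutionException("mailbox closed");
            }
            // Otherwise the event would be handled normally here.
        } catch (RejectedExecutionException e) {
            throw new TaskNotRunningException("Task is shutting down", e);
        }
    }

    // Hypothetical coordinator side: the result is always an ACK or a failure.
    static CompletableFuture<String> send(boolean mailboxClosed) {
        CompletableFuture<String> result = new CompletableFuture<>();
        try {
            deliverEvent(mailboxClosed);
            result.complete("ACK");
        } catch (TaskNotRunningException e) {
            result.completeExceptionally(e);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(send(false).join()); // ACK
        send(true).whenComplete((ack, failure) -> {
            // A dedicated type lets the caller tell "subtask not running"
            // apart from a generic event-handling error.
            if (failure instanceof TaskNotRunningException) {
                System.out.println("not running: " + failure.getCause().getMessage());
            }
        });
    }
}
```

With a dedicated exception type, the coordinator side could distinguish a
subtask that is shutting down from a genuine event-handling failure using a
plain `instanceof` check.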



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
