yunfengzhou-hub commented on code in PR #20454:
URL: https://github.com/apache/flink/pull/20454#discussion_r939858060
##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java:
##########
@@ -1488,12 +1488,15 @@ public void deliverOperatorEvent(OperatorID operator,
SerializedValue<OperatorEv
} catch (Throwable t) {
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
- if (getExecutionState() == ExecutionState.RUNNING
- || getExecutionState() == ExecutionState.INITIALIZING)
{
+ if ((getExecutionState() == ExecutionState.RUNNING
+ || getExecutionState() ==
ExecutionState.INITIALIZING)
+ && !(t instanceof RejectedExecutionException)) {
Review Comment:
In the previous implementation, the task would swallow the
`RejectedExecutionException` if it receives any operator event after its
mailbox has been closed, which happens during the shutdown process. This does
not work for the close gateway event introduced in this PR, as we require that
subtasks always receive ACK or throw exceptions on this event for the
checkpoint process to proceed.
According to our offline discussion, I'll wrap the
`RejectedExecutionException` with `TaskNotRunningException` to clarify the
handling behavior of exceptions on the coordinator side.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]