pnowojski commented on a change in pull request #17965:
URL: https://github.com/apache/flink/pull/17965#discussion_r762826983



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
##########
@@ -301,8 +302,17 @@ private void outputCompletedElement() {
             queue.emitCompletedElement(timestampedCollector);
             // if there are more completed elements, emit them with subsequent 
mails
             if (queue.hasCompletedElements()) {
-                mailboxExecutor.execute(
-                        this::outputCompletedElement, 
"AsyncWaitOperator#outputCompletedElement");
+                try {
+                    mailboxExecutor.execute(
+                            this::outputCompletedElement,
+                            "AsyncWaitOperator#outputCompletedElement");
+                } catch (RejectedExecutionException mailboxClosedException) {
+                    // This rejection is not a problem since this element will 
be processed one more

Review comment:
       Add a comment that we are assuming `RejectedExecutionException` can only 
happen if the task/operator is being cancelled/aborted and we thus it's safe to 
drop/ignore all of the pending records?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to