pnowojski commented on a change in pull request #17965:
URL: https://github.com/apache/flink/pull/17965#discussion_r762826983
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
##########
@@ -301,8 +302,17 @@ private void outputCompletedElement() {
queue.emitCompletedElement(timestampedCollector);
// if there are more completed elements, emit them with subsequent mails
if (queue.hasCompletedElements()) {
- mailboxExecutor.execute(
- this::outputCompletedElement, "AsyncWaitOperator#outputCompletedElement");
+ try {
+ mailboxExecutor.execute(
+ this::outputCompletedElement,
+ "AsyncWaitOperator#outputCompletedElement");
+ } catch (RejectedExecutionException mailboxClosedException) {
+ // This rejection is not a problem since this element will be processed one more
Review comment:
Add a comment that we are assuming `RejectedExecutionException` can only
happen if the task/operator is being cancelled/aborted, and thus it's safe to
drop/ignore all of the pending records?
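
For illustration only, here is a rough sketch of how such a comment could read. It is not the actual `AsyncWaitOperator` code: a plain `java.util.concurrent.ExecutorService` stands in for the mailbox executor, and the class and method names (`RejectedMailSketch`, `tryEnqueue`) are hypothetical. The assumption being documented is the one stated above, namely that a rejection only happens once the executor has been closed as part of cancellation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class RejectedMailSketch {

    /**
     * Tries to enqueue a follow-up action on the given executor.
     *
     * @return true if the action was accepted, false if the executor rejected it
     */
    static boolean tryEnqueue(ExecutorService executor, Runnable followUp) {
        try {
            executor.execute(followUp);
            return true;
        } catch (RejectedExecutionException mailboxClosedException) {
            // ASSUMPTION (not verified against Flink internals): the executor only
            // rejects work after it has been closed, i.e. while the task/operator is
            // being cancelled/aborted. Under that assumption it is safe to drop the
            // follow-up action and all pending records.
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // While the executor is open, the follow-up action is accepted.
        System.out.println("accepted while open: " + tryEnqueue(executor, () -> {}));

        // After shutdown (standing in for cancellation), new work is rejected
        // and the rejection is swallowed rather than propagated.
        executor.shutdown();
        System.out.println("accepted after shutdown: " + tryEnqueue(executor, () -> {}));
    }
}
```

If the assumption does not hold, i.e. the rejection can also happen during a normal finish, then silently ignoring it would risk losing records, which is why spelling it out in the comment seems worthwhile.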