This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 2bdc194  [FLINK-24846][streaming] Ignoring completing async operator record if mailbox is closed already
2bdc194 is described below

commit 2bdc19489fada127999305ab87342b2968bd5b62
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Tue Nov 30 15:21:26 2021 +0100

    [FLINK-24846][streaming] Ignoring completing async operator record if mailbox is closed already
---
 .../api/operators/async/AsyncWaitOperator.java     | 15 ++++++-
 .../api/operators/async/AsyncWaitOperatorTest.java | 50 ++++++++++++++++++++++
 2 files changed, 63 insertions(+), 2 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index a65b71b..2f8fc7b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -52,6 +52,7 @@ import javax.annotation.Nonnull;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Optional;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -301,8 +302,18 @@ public class AsyncWaitOperator<IN, OUT>
             queue.emitCompletedElement(timestampedCollector);
             // if there are more completed elements, emit them with subsequent mails
             if (queue.hasCompletedElements()) {
-                mailboxExecutor.execute(
-                        this::outputCompletedElement, 
"AsyncWaitOperator#outputCompletedElement");
+                try {
+                    mailboxExecutor.execute(
+                            this::outputCompletedElement,
+                            "AsyncWaitOperator#outputCompletedElement");
+                } catch (RejectedExecutionException mailboxClosedException) {
+                    // This exception can only happen if the operator is cancelled which means all
+                    // pending records can be safely ignored since they will be processed one more
+                    // time after recovery.
+                    LOG.debug(
+                            "Attempt to complete element is ignored since the 
mailbox rejected the execution.",
+                            mailboxClosedException);
+                }
             }
         }
     }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index ee04bc01..f616c5d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -64,6 +64,8 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
 import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -1024,6 +1026,54 @@ public class AsyncWaitOperatorTest extends TestLogger {
         assertThat(outputElements, Matchers.equalTo(expectedOutput));
     }
 
+    @Test
+    public void testIgnoreAsyncOperatorRecordsOnDrain() throws Exception {
+        // given: Async wait operator which are able to collect result futures.
+        StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, 
BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO);
+        List<ResultFuture<?>> resultFutures = new ArrayList<>();
+        try (StreamTaskMailboxTestHarness<Integer> harness =
+                builder.setupOutputForSingletonOperatorChain(
+                                new AsyncWaitOperatorFactory<>(
+                                        new 
CollectableFuturesAsyncFunction<>(resultFutures),
+                                        TIMEOUT,
+                                        5,
+                                        AsyncDataStream.OutputMode.ORDERED))
+                        .build()) {
+            // when: Processing at least two elements in reverse order to keep completed queue not
+            // empty.
+            harness.processElement(new StreamRecord<>(1));
+            harness.processElement(new StreamRecord<>(2));
+
+            for (ResultFuture<?> resultFuture : Lists.reverse(resultFutures)) {
+                resultFuture.complete(Collections.emptyList());
+            }
+
+            // then: All records from async operator should be ignored during drain since they will
+            // be processed on recovery.
+            harness.finishProcessing();
+            assertTrue(harness.getOutput().isEmpty());
+        }
+    }
+
+    private static class CollectableFuturesAsyncFunction<IN> implements 
AsyncFunction<IN, IN> {
+
+        private static final long serialVersionUID = -4214078239227288637L;
+
+        private static List<ResultFuture<?>> resultFutures;
+
+        private CollectableFuturesAsyncFunction(List<ResultFuture<?>> 
resultFutures) {
+            this.resultFutures = resultFutures;
+        }
+
+        @Override
+        public void asyncInvoke(IN input, ResultFuture<IN> resultFuture) 
throws Exception {
+            resultFutures.add(resultFuture);
+        }
+    }
+
     private static class ControllableAsyncFunction<IN> implements 
AsyncFunction<IN, IN> {
 
         private static final long serialVersionUID = -4214078239267288636L;

Reply via email to