This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 2bdc194 [FLINK-24846][streaming] Ignoring completing async operator record if mailbox is closed already
2bdc194 is described below
commit 2bdc19489fada127999305ab87342b2968bd5b62
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Tue Nov 30 15:21:26 2021 +0100
[FLINK-24846][streaming] Ignoring completing async operator record if mailbox is closed already
---
.../api/operators/async/AsyncWaitOperator.java | 15 ++++++-
.../api/operators/async/AsyncWaitOperatorTest.java | 50 ++++++++++++++++++++++
2 files changed, 63 insertions(+), 2 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index a65b71b..2f8fc7b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -52,6 +52,7 @@ import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -301,8 +302,18 @@ public class AsyncWaitOperator<IN, OUT>
queue.emitCompletedElement(timestampedCollector);
// if there are more completed elements, emit them with subsequent mails
if (queue.hasCompletedElements()) {
- mailboxExecutor.execute(
- this::outputCompletedElement, "AsyncWaitOperator#outputCompletedElement");
+ try {
+ mailboxExecutor.execute(
+ this::outputCompletedElement,
+ "AsyncWaitOperator#outputCompletedElement");
+ } catch (RejectedExecutionException mailboxClosedException) {
+ // This exception can only happen if the operator is cancelled which means all
+ // pending records can be safely ignored since they will be processed one more
+ // time after recovery.
+ LOG.debug(
+ "Attempt to complete element is ignored since the mailbox rejected the execution.",
+ mailboxClosedException);
+ }
}
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index ee04bc01..f616c5d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -64,6 +64,8 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
@@ -1024,6 +1026,54 @@ public class AsyncWaitOperatorTest extends TestLogger {
assertThat(outputElements, Matchers.equalTo(expectedOutput));
}
+ @Test
+ public void testIgnoreAsyncOperatorRecordsOnDrain() throws Exception {
+ // given: Async wait operator which are able to collect result futures.
+ StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+ new StreamTaskMailboxTestHarnessBuilder<>(
+ OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+ .addInput(BasicTypeInfo.INT_TYPE_INFO);
+ List<ResultFuture<?>> resultFutures = new ArrayList<>();
+ try (StreamTaskMailboxTestHarness<Integer> harness =
+ builder.setupOutputForSingletonOperatorChain(
+ new AsyncWaitOperatorFactory<>(
+ new CollectableFuturesAsyncFunction<>(resultFutures),
+ TIMEOUT,
+ 5,
+ AsyncDataStream.OutputMode.ORDERED))
+ .build()) {
+ // when: Processing at least two elements in reverse order to keep completed queue not
+ // empty.
+ harness.processElement(new StreamRecord<>(1));
+ harness.processElement(new StreamRecord<>(2));
+
+ for (ResultFuture<?> resultFuture : Lists.reverse(resultFutures)) {
+ resultFuture.complete(Collections.emptyList());
+ }
+
+ // then: All records from async operator should be ignored during drain since they will
+ // be processed on recovery.
+ harness.finishProcessing();
+ assertTrue(harness.getOutput().isEmpty());
+ }
+ }
+
+ private static class CollectableFuturesAsyncFunction<IN> implements AsyncFunction<IN, IN> {
+
+ private static final long serialVersionUID = -4214078239227288637L;
+
+ private static List<ResultFuture<?>> resultFutures;
+
+ private CollectableFuturesAsyncFunction(List<ResultFuture<?>> resultFutures) {
+ this.resultFutures = resultFutures;
+ }
+
+ @Override
+ public void asyncInvoke(IN input, ResultFuture<IN> resultFuture) throws Exception {
+ resultFutures.add(resultFuture);
+ }
+ }
+
private static class ControllableAsyncFunction<IN> implements AsyncFunction<IN, IN> {
private static final long serialVersionUID = -4214078239267288636L;