Repository: flink
Updated Branches:
  refs/heads/release-1.4 38278ebef -> 09edf6a62


[FLINK-7949] Add unit test for AsyncWaitOperator recovery with full queue


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fb088bc1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fb088bc1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fb088bc1

Branch: refs/heads/release-1.4
Commit: fb088bc1343099a1ea71d0589ab825897e8dcdee
Parents: a2198b0
Author: Till Rohrmann <[email protected]>
Authored: Wed Jan 10 18:53:38 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Thu Jan 25 15:26:09 2018 +0100

----------------------------------------------------------------------
 .../operators/async/AsyncWaitOperatorTest.java  | 123 ++++++++++++++++++-
 1 file changed, 122 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fb088bc1/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 993bffb..34c9a0f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -56,6 +57,7 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import 
org.apache.flink.streaming.runtime.tasks.AcknowledgeStreamMockEnvironment;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -65,6 +67,7 @@ import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -74,10 +77,12 @@ import org.mockito.stubbing.Answer;
 import javax.annotation.Nonnull;
 
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -88,6 +93,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
@@ -970,6 +976,122 @@ public class AsyncWaitOperatorTest extends TestLogger {
                }
        }
 
+       /**
+        * Tests that the AsyncWaitOperator can restart if the checkpointed queue was full.
+        *
+        * <p>See FLINK-7949
+        */
+       @Test(timeout = 10000)
+       public void testRestartWithFullQueue() throws Exception {
+               int capacity = 10;
+
+               // 1. create the snapshot which contains capacity + 1 elements
+               final CompletableFuture<Void> trigger = new 
CompletableFuture<>();
+               final ControllableAsyncFunction<Integer> 
controllableAsyncFunction = new ControllableAsyncFunction<>(trigger);
+
+               final OneInputStreamOperatorTestHarness<Integer, Integer> 
snapshotHarness = new OneInputStreamOperatorTestHarness<>(
+                       new AsyncWaitOperator<>(
+                               controllableAsyncFunction, // results stay pending until the trigger future completes
+                               1000L,
+                               capacity,
+                               AsyncDataStream.OutputMode.ORDERED),
+                       IntSerializer.INSTANCE);
+
+               snapshotHarness.open();
+
+               final OperatorStateHandles snapshot;
+
+               final ArrayList<Integer> expectedOutput = new 
ArrayList<>(capacity + 1);
+
+               try {
+                       synchronized (snapshotHarness.getCheckpointLock()) {
+                               for (int i = 0; i < capacity; i++) {
+                                       snapshotHarness.processElement(i, 0L);
+                                       expectedOutput.add(i);
+                               }
+                       }
+
+                       expectedOutput.add(capacity);
+
+                       final OneShotLatch lastElement = new OneShotLatch();
+
+                       final CheckedThread lastElementWriter = new 
CheckedThread() {
+                               @Override
+                               public void go() throws Exception {
+                                       synchronized 
(snapshotHarness.getCheckpointLock()) {
+                                               lastElement.trigger();
+                                               
snapshotHarness.processElement(capacity, 0L);
+                                       }
+                               }
+                       };
+
+                       lastElementWriter.start();
+
+                       lastElement.await();
+
+                       synchronized (snapshotHarness.getCheckpointLock()) {
+                               // execute the snapshot within the checkpoint 
lock, because then it is guaranteed
+                               // that the lastElementWriter has written the 
exceeding element
+                               snapshot = snapshotHarness.snapshot(0L, 0L);
+                       }
+
+                       // trigger the computation to make the close call finish
+                       trigger.complete(null);
+               } finally {
+                       synchronized (snapshotHarness.getCheckpointLock()) {
+                               snapshotHarness.close();
+                       }
+               }
+
+               // 2. restore the snapshot and check that we complete
+               final OneInputStreamOperatorTestHarness<Integer, Integer> 
recoverHarness = new OneInputStreamOperatorTestHarness<>(
+                       new AsyncWaitOperator<>(
+                               new 
ControllableAsyncFunction<>(CompletableFuture.completedFuture(null)),
+                               1000L,
+                               capacity,
+                               AsyncDataStream.OutputMode.ORDERED),
+                       IntSerializer.INSTANCE);
+
+               recoverHarness.initializeState(snapshot);
+
+               synchronized (recoverHarness.getCheckpointLock()) {
+                       recoverHarness.open();
+               }
+
+               synchronized (recoverHarness.getCheckpointLock()) {
+                       recoverHarness.close();
+               }
+
+               final ConcurrentLinkedQueue<Object> output = 
recoverHarness.getOutput();
+
+               assertThat(output.size(), Matchers.equalTo(capacity + 1));
+
+               final ArrayList<Integer> outputElements = new 
ArrayList<>(capacity + 1);
+
+               for (int i = 0; i < capacity + 1; i++) {
+                       StreamRecord<Integer> streamRecord = 
((StreamRecord<Integer>) output.poll());
+                       outputElements.add(streamRecord.getValue());
+               }
+
+               assertThat(outputElements, Matchers.equalTo(expectedOutput));
+       }
+
+       private static class ControllableAsyncFunction<IN> implements 
AsyncFunction<IN, IN> {
+
+               private static final long serialVersionUID = 
-4214078239267288636L;
+
+               private transient CompletableFuture<Void> trigger;
+
+               private ControllableAsyncFunction(CompletableFuture<Void> 
trigger) {
+                       this.trigger = Preconditions.checkNotNull(trigger);
+               }
+
+               @Override
+               public void asyncInvoke(IN input, ResultFuture<IN> 
resultFuture) throws Exception {
+                       trigger.thenAccept(v -> 
resultFuture.complete(Collections.singleton(input)));
+               }
+       }
+
        private static class NoOpAsyncFunction<IN, OUT> implements 
AsyncFunction<IN, OUT> {
                private static final long serialVersionUID = 
-3060481953330480694L;
 
@@ -978,5 +1100,4 @@ public class AsyncWaitOperatorTest extends TestLogger {
                        // no op
                }
        }
-
 }

Reply via email to