zhijiangW commented on a change in pull request #9926: [FLINK-14004][runtime,test] Add test coverage for stateful SourceReaderOperator implementation
URL: https://github.com/apache/flink/pull/9926#discussion_r336398580
 
 

 ##########
 File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceReaderStreamTaskTest.java
 ##########
 @@ -18,71 +18,191 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.execution.CancelTaskException;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.SourceReaderOperator;
 import org.apache.flink.streaming.runtime.io.InputStatus;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.ExceptionUtils;
 
 import org.junit.Test;
 
+import java.util.Iterator;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static 
org.apache.flink.streaming.util.TestHarnessUtil.assertOutputEquals;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertEquals;
 
 /**
  * Tests for verifying that the {@link SourceReaderOperator} as a task input 
can be integrated
  * well with {@link 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor}.
  */
 public class SourceReaderStreamTaskTest {
-
+       /**
+        * Tests that the stream operator can snapshot and restore the operator 
state of chained
+        * operators.
+        */
        @Test
-       public void testSourceOutputCorrectness() throws Exception {
+       public void testSnapshotAndRestore() throws Exception {
                final int numRecords = 10;
+
+               TaskStateSnapshot taskStateSnapshot = 
executeAndWaitForCheckpoint(
+                       numRecords,
+                       1,
+                       IntStream.range(0, numRecords),
+                       Optional.empty());
+
+               executeAndWaitForCheckpoint(
+                       numRecords,
+                       2,
+                       IntStream.range(numRecords, 2 * numRecords),
+                       Optional.of(taskStateSnapshot));
+       }
+
+       private TaskStateSnapshot executeAndWaitForCheckpoint(
+                       int numRecords,
+                       long checkpointId,
+                       IntStream expectedOutputStream,
+                       Optional<TaskStateSnapshot> initialSnapshot) throws 
Exception {
+
+               final LinkedBlockingQueue<Object> expectedOutput = new 
LinkedBlockingQueue<>();
+               expectedOutputStream.forEach(record -> expectedOutput.add(new 
StreamRecord<>(record)));
+               CheckpointOptions checkpointOptions = 
CheckpointOptions.forCheckpointWithDefaultLocation();
+               expectedOutput.add(new CheckpointBarrier(checkpointId, 
checkpointId, checkpointOptions));
+
+               final Deadline deadline = new FiniteDuration(30, 
TimeUnit.SECONDS).fromNow();
+               final StreamTaskTestHarness<Integer> testHarness = 
createTestHarness(numRecords);
+               if (initialSnapshot.isPresent()) {
+                       testHarness.setTaskStateSnapshot(checkpointId, 
initialSnapshot.get());
+               }
+
+               TestTaskStateManager taskStateManager = 
testHarness.taskStateManager;
+               OneShotLatch waitForAcknowledgeLatch = new OneShotLatch();
+
+               taskStateManager.setWaitForReportLatch(waitForAcknowledgeLatch);
+
+               testHarness.invoke();
+               testHarness.waitForTaskRunning(deadline.timeLeft().toMillis());
 
 Review comment:
   Thanks for the fix. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to