AHeise commented on a change in pull request #10435: [FLINK-13955][runtime] 
migrate ContinuousFileReaderOperator to the mailbox execution model
URL: https://github.com/apache/flink/pull/10435#discussion_r370032317
 
 

 ##########
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
 ##########
 @@ -31,214 +32,125 @@
 import 
org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.Preconditions;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Queue;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.repackageState;
+import static 
org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.repartitionOperatorState;
 
 /**
  * Test processing files during rescaling.
  */
 public class ContinuousFileProcessingRescalingTest {
 
        private final int maxParallelism = 10;
+       private final int sizeOfSplit = 20;
 
+       /**
+       * Simulates the scenario of scaling down from 2 to 1 instances.
+       */
        @Test
        public void testReaderScalingDown() throws Exception {
-               // simulates the scenario of scaling down from 2 to 1 instances
-
-               final OneShotLatch waitingLatch = new OneShotLatch();
-
-               // create the first instance and let it process the first split 
till element 5
-               final OneShotLatch triggerLatch1 = new OneShotLatch();
-               BlockingFileInputFormat format1 = new BlockingFileInputFormat(
-                       triggerLatch1, waitingLatch, new Path("test"), 20, 5);
-               FileInputSplit[] splits = format1.createInputSplits(2);
-
-               OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, 
String> testHarness1 = getTestHarness(format1, 2, 0);
-               testHarness1.open();
-               testHarness1.processElement(new 
StreamRecord<>(getTimestampedSplit(0, splits[0])));
-
-               // wait until its arrives to element 5
-               if (!triggerLatch1.isTriggered()) {
-                       triggerLatch1.await();
-               }
+               HarnessWithFormat[] beforeRescale = buildAndStart(new int[]{5, 
15});
 
-               // create the second instance and let it process the second 
split till element 15
-               final OneShotLatch triggerLatch2 = new OneShotLatch();
-               BlockingFileInputFormat format2 = new BlockingFileInputFormat(
-                       triggerLatch2, waitingLatch, new Path("test"), 20, 15);
+               HarnessWithFormat afterRescale = buildAndStart(1, 0, 5, 
snapshotAndMergeState(beforeRescale));
+               afterRescale.awaitEverythingProcessed();
 
-               OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, 
String> testHarness2 = getTestHarness(format2, 2, 1);
-               testHarness2.open();
-               testHarness2.processElement(new 
StreamRecord<>(getTimestampedSplit(0, splits[1])));
-
-               // wait until its arrives to element 15
-               if (!triggerLatch2.isTriggered()) {
-                       triggerLatch2.await();
-               }
-
-               // 1) clear the outputs of the two previous instances so that
-               // we can compare their newly produced outputs with the merged 
one
-               testHarness1.getOutput().clear();
-               testHarness2.getOutput().clear();
-
-               // 2) take the snapshots from the previous instances and merge 
them
-               // into a new one which will be then used to initialize a third 
instance
-               OperatorSubtaskState mergedState = 
AbstractStreamOperatorTestHarness.
-                       repackageState(
-                               testHarness2.snapshot(0, 0),
-                               testHarness1.snapshot(0, 0)
-                       );
-
-               // 3) and repartition to get the initialized state when scaling 
down.
-               OperatorSubtaskState initState =
-                       
AbstractStreamOperatorTestHarness.repartitionOperatorState(mergedState, 
maxParallelism, 2, 1, 0);
-
-               // create the third instance
-               final OneShotLatch wLatch = new OneShotLatch();
-               final OneShotLatch tLatch = new OneShotLatch();
-
-               BlockingFileInputFormat format = new 
BlockingFileInputFormat(wLatch, tLatch, new Path("test"), 20, 5);
-               OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, 
String> testHarness = getTestHarness(format, 1, 0);
-
-               // initialize the state of the new operator with the 
constructed by
-               // combining the partial states of the instances above.
-               testHarness.initializeState(initState);
-               testHarness.open();
-
-               // now restart the waiting operators
-               wLatch.trigger();
-               tLatch.trigger();
-               waitingLatch.trigger();
-
-               // and wait for the processing to finish
-               synchronized (testHarness1.getCheckpointLock()) {
-                       testHarness1.close();
-               }
-               synchronized (testHarness2.getCheckpointLock()) {
-                       testHarness2.close();
+               for (HarnessWithFormat i : beforeRescale) {
+                       i.getHarness().getOutput().clear(); // we only want 
output from the 2nd chunk (after the "checkpoint")
+                       i.awaitEverythingProcessed();
                }
-               synchronized (testHarness.getCheckpointLock()) {
-                       testHarness.close();
-               }
-
-               Queue<Object> expectedResult = new ArrayDeque<>();
-               putElementsInQ(expectedResult, testHarness1.getOutput());
-               putElementsInQ(expectedResult, testHarness2.getOutput());
-
-               Queue<Object> actualResult = new ArrayDeque<>();
-               putElementsInQ(actualResult, testHarness.getOutput());
 
-               Assert.assertEquals(20, actualResult.size());
-               Assert.assertArrayEquals(expectedResult.toArray(), 
actualResult.toArray());
+               Assert.assertEquals(collectOutput(beforeRescale), 
collectOutput(afterRescale));
        }
 
+       /**
+        * Simulates the scenario of scaling up from 1 to 2 instances.
+        */
        @Test
        public void testReaderScalingUp() throws Exception {
-               // simulates the scenario of scaling up from 1 to 2 instances
+               HarnessWithFormat beforeRescale = buildAndStart(1, 0, 5, null, 
buildSplits(2));
 
-               final OneShotLatch waitingLatch1 = new OneShotLatch();
-               final OneShotLatch triggerLatch1 = new OneShotLatch();
+               OperatorSubtaskState snapshot = 
beforeRescale.getHarness().snapshot(0, 0);
 
-               BlockingFileInputFormat format1 = new BlockingFileInputFormat(
-                       triggerLatch1, waitingLatch1, new Path("test"), 20, 5);
-               FileInputSplit[] splits = format1.createInputSplits(2);
+               HarnessWithFormat afterRescale0 = buildAndStart(2, 0, 15, 
repartitionOperatorState(snapshot, maxParallelism, 1, 2, 0));
+               HarnessWithFormat afterRescale1 = buildAndStart(2, 1, 15, 
repartitionOperatorState(snapshot, maxParallelism, 1, 2, 1));
 
-               OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, 
String> testHarness1 = getTestHarness(format1, 1, 0);
-               testHarness1.open();
+               beforeRescale.getHarness().getOutput().clear();
 
-               testHarness1.processElement(new 
StreamRecord<>(getTimestampedSplit(0, splits[0])));
-               testHarness1.processElement(new 
StreamRecord<>(getTimestampedSplit(1, splits[1])));
-
-               // wait until its arrives to element 5
-               if (!triggerLatch1.isTriggered()) {
-                       triggerLatch1.await();
+               for (HarnessWithFormat i : Arrays.asList(beforeRescale, 
afterRescale0, afterRescale1)) {
+                       i.awaitEverythingProcessed();
                }
 
-               OperatorSubtaskState snapshot = testHarness1.snapshot(0, 0);
-
-               // this will be the init state for new instance-0
-               OperatorSubtaskState initState1 =
-                       
AbstractStreamOperatorTestHarness.repartitionOperatorState(snapshot, 
maxParallelism, 1, 2, 0);
-
-               // this will be the init state for new instance-1
-               OperatorSubtaskState initState2 =
-                       
AbstractStreamOperatorTestHarness.repartitionOperatorState(snapshot, 
maxParallelism, 1, 2, 1);
-
-               // 1) clear the output of instance so that we can compare it 
with one created by the new instances, and
-               // 2) let the operator process the rest of its state
-               testHarness1.getOutput().clear();
-               waitingLatch1.trigger();
-
-               // create the second instance and let it process the second 
split till element 15
-               final OneShotLatch triggerLatch2 = new OneShotLatch();
-               final OneShotLatch waitingLatch2 = new OneShotLatch();
-
-               BlockingFileInputFormat format2 = new BlockingFileInputFormat(
-                       triggerLatch2, waitingLatch2, new Path("test"), 20, 15);
-
-               OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, 
String> testHarness2 = getTestHarness(format2, 2, 0);
-               testHarness2.setup();
-               testHarness2.initializeState(initState1);
-               testHarness2.open();
+               Assert.assertEquals(collectOutput(beforeRescale), 
collectOutput(afterRescale0, afterRescale1));
+       }
 
-               BlockingFileInputFormat format3 = new BlockingFileInputFormat(
-                       triggerLatch2, waitingLatch2, new Path("test"), 20, 15);
+       private HarnessWithFormat[] buildAndStart(int[] 
elementsBeforeCheckpoint) throws Exception {
 
 Review comment:
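  nit: could this take varargs to make the test more readable? For example, with `buildAndStart(int... elementsBeforeCheckpoint)` the call site becomes `buildAndStart(5, 15)` instead of `buildAndStart(new int[]{5, 15})`.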

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to