AHeise commented on a change in pull request #10435: [FLINK-13955][runtime] migrate ContinuousFileReaderOperator to the mailbox execution model
URL: https://github.com/apache/flink/pull/10435#discussion_r370035731
##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
##########
@@ -31,214 +32,125 @@
import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Test;
+import javax.annotation.Nullable;
+
import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Queue;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.repackageState;
+import static org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.repartitionOperatorState;
/**
* Test processing files during rescaling.
*/
public class ContinuousFileProcessingRescalingTest {
private final int maxParallelism = 10;
+ private final int sizeOfSplit = 20;
+ /**
+ * Simulates the scenario of scaling down from 2 to 1 instances.
+ */
@Test
public void testReaderScalingDown() throws Exception {
- // simulates the scenario of scaling down from 2 to 1 instances
-
- final OneShotLatch waitingLatch = new OneShotLatch();
-
- // create the first instance and let it process the first split till element 5
- final OneShotLatch triggerLatch1 = new OneShotLatch();
- BlockingFileInputFormat format1 = new BlockingFileInputFormat(
- triggerLatch1, waitingLatch, new Path("test"), 20, 5);
- FileInputSplit[] splits = format1.createInputSplits(2);
-
- OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> testHarness1 = getTestHarness(format1, 2, 0);
- testHarness1.open();
- testHarness1.processElement(new StreamRecord<>(getTimestampedSplit(0, splits[0])));
-
- // wait until it arrives at element 5
- if (!triggerLatch1.isTriggered()) {
- triggerLatch1.await();
- }
+ HarnessWithFormat[] beforeRescale = buildAndStart(new int[]{5, 15});
- // create the second instance and let it process the second split till element 15
- final OneShotLatch triggerLatch2 = new OneShotLatch();
- BlockingFileInputFormat format2 = new BlockingFileInputFormat(
- triggerLatch2, waitingLatch, new Path("test"), 20, 15);
+ HarnessWithFormat afterRescale = buildAndStart(1, 0, 5, snapshotAndMergeState(beforeRescale));
+ afterRescale.awaitEverythingProcessed();
- OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> testHarness2 = getTestHarness(format2, 2, 1);
- testHarness2.open();
- testHarness2.processElement(new StreamRecord<>(getTimestampedSplit(0, splits[1])));
-
- // wait until it arrives at element 15
- if (!triggerLatch2.isTriggered()) {
- triggerLatch2.await();
- }
-
- // 1) clear the outputs of the two previous instances so that
- // we can compare their newly produced outputs with the merged one
- testHarness1.getOutput().clear();
- testHarness2.getOutput().clear();
-
- // 2) take the snapshots from the previous instances and merge them
- // into a new one which will then be used to initialize a third instance
- OperatorSubtaskState mergedState = AbstractStreamOperatorTestHarness.
- repackageState(
- testHarness2.snapshot(0, 0),
- testHarness1.snapshot(0, 0)
- );
-
- // 3) and repartition to get the initialized state when scaling down.
- OperatorSubtaskState initState =
- AbstractStreamOperatorTestHarness.repartitionOperatorState(mergedState, maxParallelism, 2, 1, 0);
-
- // create the third instance
- final OneShotLatch wLatch = new OneShotLatch();
- final OneShotLatch tLatch = new OneShotLatch();
-
- BlockingFileInputFormat format = new BlockingFileInputFormat(wLatch, tLatch, new Path("test"), 20, 5);
- OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> testHarness = getTestHarness(format, 1, 0);
-
- // initialize the state of the new operator with the state constructed by
- // combining the partial states of the instances above.
- testHarness.initializeState(initState);
- testHarness.open();
-
- // now restart the waiting operators
- wLatch.trigger();
- tLatch.trigger();
- waitingLatch.trigger();
-
- // and wait for the processing to finish
- synchronized (testHarness1.getCheckpointLock()) {
- testHarness1.close();
- }
- synchronized (testHarness2.getCheckpointLock()) {
- testHarness2.close();
+ for (HarnessWithFormat i : beforeRescale) {
+ i.getHarness().getOutput().clear(); // we only want output from the 2nd chunk (after the "checkpoint")
+ i.awaitEverythingProcessed();
}
- synchronized (testHarness.getCheckpointLock()) {
- testHarness.close();
- }
-
- Queue<Object> expectedResult = new ArrayDeque<>();
- putElementsInQ(expectedResult, testHarness1.getOutput());
- putElementsInQ(expectedResult, testHarness2.getOutput());
-
- Queue<Object> actualResult = new ArrayDeque<>();
- putElementsInQ(actualResult, testHarness.getOutput());
- Assert.assertEquals(20, actualResult.size());
- Assert.assertArrayEquals(expectedResult.toArray(), actualResult.toArray());
+ Assert.assertEquals(collectOutput(beforeRescale), collectOutput(afterRescale));
}
+ /**
+ * Simulates the scenario of scaling up from 1 to 2 instances.
+ */
@Test
public void testReaderScalingUp() throws Exception {
- // simulates the scenario of scaling up from 1 to 2 instances
+ HarnessWithFormat beforeRescale = buildAndStart(1, 0, 5, null, buildSplits(2));
- final OneShotLatch waitingLatch1 = new OneShotLatch();
- final OneShotLatch triggerLatch1 = new OneShotLatch();
+ OperatorSubtaskState snapshot = beforeRescale.getHarness().snapshot(0, 0);
- BlockingFileInputFormat format1 = new BlockingFileInputFormat(
- triggerLatch1, waitingLatch1, new Path("test"), 20, 5);
- FileInputSplit[] splits = format1.createInputSplits(2);
+ HarnessWithFormat afterRescale0 = buildAndStart(2, 0, 15, repartitionOperatorState(snapshot, maxParallelism, 1, 2, 0));
+ HarnessWithFormat afterRescale1 = buildAndStart(2, 1, 15, repartitionOperatorState(snapshot, maxParallelism, 1, 2, 1));
- OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> testHarness1 = getTestHarness(format1, 1, 0);
- testHarness1.open();
+ beforeRescale.getHarness().getOutput().clear();
- testHarness1.processElement(new StreamRecord<>(getTimestampedSplit(0, splits[0])));
- testHarness1.processElement(new StreamRecord<>(getTimestampedSplit(1, splits[1])));
-
- // wait until it arrives at element 5
- if (!triggerLatch1.isTriggered()) {
- triggerLatch1.await();
+ for (HarnessWithFormat i : Arrays.asList(beforeRescale, afterRescale0, afterRescale1)) {
 Review comment:
   nit: `i` is not a very descriptive variable name; it would be fine for an integer loop index, but for a `HarnessWithFormat` something like `harness` reads better.
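   For illustration, a minimal sketch of how the rename would look on the loop quoted above. The loop body is elided in this hunk, so the `awaitEverythingProcessed()` call is only a stand-in borrowed from the scaling-down test:

   ```java
   for (HarnessWithFormat harness : Arrays.asList(beforeRescale, afterRescale0, afterRescale1)) {
       // stand-in body borrowed from the scaling-down test; the actual body is not shown in this hunk
       harness.awaitEverythingProcessed();
   }
   ```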
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services