[BEAM-619] Parameterize UnboundedSourceWrapperTest - extend the test case with numTasks and numSplits parameters
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4afd25a7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4afd25a7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4afd25a7 Branch: refs/heads/master Commit: 4afd25a7a85a24ff0212a4791661d3c5e105662b Parents: 145ad47 Author: Maximilian Michels <m...@apache.org> Authored: Wed Sep 7 14:23:12 2016 +0200 Committer: Maximilian Michels <m...@apache.org> Committed: Fri Sep 9 16:09:44 2016 +0200 ---------------------------------------------------------------------- .../streaming/io/UnboundedSourceWrapper.java | 8 ++ .../streaming/UnboundedSourceWrapperTest.java | 113 +++++++------------ 2 files changed, 50 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4afd25a7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index 2cd06ed..a62a754 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -397,4 +397,12 @@ public class UnboundedSourceWrapper< public List<? extends UnboundedSource<OutputT, CheckpointMarkT>> getSplitSources() { return splitSources; } + + /** + * Visible so that we can check this in tests. Must not be used for anything else. 
+ */ + @VisibleForTesting + public List<? extends UnboundedSource<OutputT, CheckpointMarkT>> getLocalSplitSources() { + return localSplitSources; + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4afd25a7/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java index 73124a9..0cc584e 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java @@ -23,6 +23,8 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -44,78 +46,43 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; /** * Tests for {@link UnboundedSourceWrapper}. */ +@RunWith(Parameterized.class) public class UnboundedSourceWrapperTest { - /** - * Creates a {@link UnboundedSourceWrapper} that has exactly one reader per source, since we - * specify a parallelism of 1 and also at runtime tell the source that it has 1 parallel subtask. 
- */ - @Test - public void testWithOneReader() throws Exception { - final int numElements = 20; - final Object checkpointLock = new Object(); - PipelineOptions options = PipelineOptionsFactory.create(); - - // this source will emit exactly NUM_ELEMENTS across all parallel readers, - // afterwards it will stall. We check whether we also receive NUM_ELEMENTS - // elements later. - TestCountingSource source = new TestCountingSource(numElements); - UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper = - new UnboundedSourceWrapper<>(options, source, 1); - - assertEquals(1, flinkWrapper.getSplitSources().size()); - - StreamSource< - WindowedValue<KV<Integer, Integer>>, - UnboundedSourceWrapper< - KV<Integer, Integer>, - TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper); - - setupSourceOperator(sourceOperator); - - - try { - sourceOperator.run(checkpointLock, - new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() { - private int count = 0; + private final int numTasks; + private final int numSplits; - @Override - public void emitWatermark(Watermark watermark) { - } - - @Override - public void collect( - StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) { - - count++; - if (count >= numElements) { - throw new SuccessException(); - } - } - - @Override - public void close() { + public UnboundedSourceWrapperTest(int numTasks, int numSplits) { + this.numTasks = numTasks; + this.numSplits = numSplits; + } - } - }); - } catch (SuccessException e) { - // success - } catch (Exception e) { - fail("We caught " + e); - } + @Parameterized.Parameters + public static Collection<Object[]> data() { + /* + * Parameters for initializing the tests: + * {numTasks, numSplits} + * The test currently assumes powers of two for some assertions. 
+ */ + return Arrays.asList(new Object[][] { + {1, 1}, {1, 2}, {1, 4}, + {2, 1}, {2, 2}, {2, 4}, + {4, 1}, {4, 2}, {4, 4} + }); } /** - * Creates a {@link UnboundedSourceWrapper} that has multiple readers per source, since we - * specify a parallelism higher than 1 and at runtime tell the source that it has 1 parallel - * this means that one source will manage multiple readers. + * Creates a {@link UnboundedSourceWrapper} that has one or multiple readers per source. + * If numSplits > numTasks, one source will manage multiple readers. */ @Test - public void testWithMultipleReaders() throws Exception { + public void testReaders() throws Exception { final int numElements = 20; final Object checkpointLock = new Object(); PipelineOptions options = PipelineOptionsFactory.create(); @@ -125,9 +92,9 @@ public class UnboundedSourceWrapperTest { // elements later. TestCountingSource source = new TestCountingSource(numElements); UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper = - new UnboundedSourceWrapper<>(options, source, 4); + new UnboundedSourceWrapper<>(options, source, numSplits); - assertEquals(4, flinkWrapper.getSplitSources().size()); + assertEquals(numSplits, flinkWrapper.getSplitSources().size()); StreamSource<WindowedValue< KV<Integer, Integer>>, @@ -135,8 +102,7 @@ public class UnboundedSourceWrapperTest { KV<Integer, Integer>, TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper); - setupSourceOperator(sourceOperator); - + setupSourceOperator(sourceOperator, numTasks); try { sourceOperator.run(checkpointLock, @@ -163,6 +129,9 @@ public class UnboundedSourceWrapperTest { } }); } catch (SuccessException e) { + + assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size()); + // success return; } @@ -186,9 +155,9 @@ public class UnboundedSourceWrapperTest { // elements later.
TestCountingSource source = new TestCountingSource(numElements); UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper = - new UnboundedSourceWrapper<>(options, source, 1); + new UnboundedSourceWrapper<>(options, source, numSplits); - assertEquals(1, flinkWrapper.getSplitSources().size()); + assertEquals(numSplits, flinkWrapper.getSplitSources().size()); StreamSource< WindowedValue<KV<Integer, Integer>>, @@ -196,7 +165,7 @@ public class UnboundedSourceWrapperTest { KV<Integer, Integer>, TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper); - setupSourceOperator(sourceOperator); + setupSourceOperator(sourceOperator, numTasks); final Set<KV<Integer, Integer>> emittedElements = new HashSet<>(); @@ -241,9 +210,9 @@ public class UnboundedSourceWrapperTest { TestCountingSource restoredSource = new TestCountingSource(numElements); UnboundedSourceWrapper< KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper = - new UnboundedSourceWrapper<>(options, restoredSource, 1); + new UnboundedSourceWrapper<>(options, restoredSource, numSplits); - assertEquals(1, restoredFlinkWrapper.getSplitSources().size()); + assertEquals(numSplits, restoredFlinkWrapper.getSplitSources().size()); StreamSource< WindowedValue<KV<Integer, Integer>>, @@ -252,7 +221,7 @@ public class UnboundedSourceWrapperTest { TestCountingSource.CounterMark>> restoredSourceOperator = new StreamSource<>(restoredFlinkWrapper); - setupSourceOperator(restoredSourceOperator); + setupSourceOperator(restoredSourceOperator, numTasks); // restore snapshot restoredFlinkWrapper.restoreState(snapshot); @@ -289,6 +258,8 @@ public class UnboundedSourceWrapperTest { readSecondBatchOfElements = true; } + assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size()); + assertTrue("Did not successfully read second batch of elements.", readSecondBatchOfElements); // verify that we saw all NUM_ELEMENTS elements @@ 
-296,13 +267,13 @@ public class UnboundedSourceWrapperTest { } @SuppressWarnings("unchecked") - private static <T> void setupSourceOperator(StreamSource<T, ?> operator) { + private static <T> void setupSourceOperator(StreamSource<T, ?> operator, int numSubTasks) { ExecutionConfig executionConfig = new ExecutionConfig(); StreamConfig cfg = new StreamConfig(new Configuration()); cfg.setTimeCharacteristic(TimeCharacteristic.EventTime); - Environment env = new DummyEnvironment("MockTwoInputTask", 1, 0); + Environment env = new DummyEnvironment("MockTwoInputTask", numSubTasks, 0); StreamTask<?, ?> mockTask = mock(StreamTask.class); when(mockTask.getName()).thenReturn("Mock Task");