Repository: beam
Updated Branches:
  refs/heads/master f1ea8f951 -> eaf4450f2
[BEAM-1255] java.io.NotSerializableException in flink on UnboundedSource

fix javadoc for BoundedSourceWrapper

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c2344e94
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c2344e94
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c2344e94

Branch: refs/heads/master
Commit: c2344e944b25d884f25375ea7fce3a9c203cdb9a
Parents: a93e218
Author: Alexey Diomin <[email protected]>
Authored: Thu Jan 12 10:44:43 2017 +0400
Committer: Alexey Diomin <[email protected]>
Committed: Thu Jan 12 15:40:37 2017 +0400

----------------------------------------------------------------------
 .../streaming/io/BoundedSourceWrapper.java | 2 +-
 .../streaming/io/UnboundedSourceWrapper.java | 2 +-
 .../streaming/UnboundedSourceWrapperTest.java | 464 ++++++++++---------
 3 files changed, 250 insertions(+), 218 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/c2344e94/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
index df49a49..909cb0e 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
@@ -35,7 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Wrapper for executing {@link BoundedSource UnboundedSources} as a Flink Source.
+ * Wrapper for executing {@link BoundedSource BoundedSources} as a Flink Source. */ public class BoundedSourceWrapper<OutputT> extends RichParallelSourceFunction<WindowedValue<OutputT>> http://git-wip-us.apache.org/repos/asf/beam/blob/c2344e94/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index af955ba..68746b2 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -143,7 +143,7 @@ public class UnboundedSourceWrapper< } else { Coder<? 
extends UnboundedSource<OutputT, CheckpointMarkT>> sourceCoder = - SerializableCoder.of(new TypeDescriptor<UnboundedSource<OutputT, CheckpointMarkT>>() { + (Coder) SerializableCoder.of(new TypeDescriptor<UnboundedSource>() { }); checkpointCoder = (ListCoder) ListCoder.of(KvCoder.of(sourceCoder, checkpointMarkCoder)); http://git-wip-us.apache.org/repos/asf/beam/blob/c2344e94/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java index 9e8261a..b0be98b 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java @@ -46,259 +46,291 @@ import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.InstantiationUtil; import org.junit.Test; +import org.junit.experimental.runners.Enclosed; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; /** * Tests for {@link UnboundedSourceWrapper}. */ -@RunWith(Parameterized.class) +@RunWith(Enclosed.class) public class UnboundedSourceWrapperTest { - private final int numTasks; - private final int numSplits; + /** + * Parameterized tests. 
+ */ + @RunWith(Parameterized.class) + public static class UnboundedSourceWrapperTestWithParams { + private final int numTasks; + private final int numSplits; + + public UnboundedSourceWrapperTestWithParams(int numTasks, int numSplits) { + this.numTasks = numTasks; + this.numSplits = numSplits; + } - public UnboundedSourceWrapperTest(int numTasks, int numSplits) { - this.numTasks = numTasks; - this.numSplits = numSplits; - } + @Parameterized.Parameters + public static Collection<Object[]> data() { + /* + * Parameters for initializing the tests: + * {numTasks, numSplits} + * The test currently assumes powers of two for some assertions. + */ + return Arrays.asList(new Object[][]{ + {1, 1}, {1, 2}, {1, 4}, + {2, 1}, {2, 2}, {2, 4}, + {4, 1}, {4, 2}, {4, 4} + }); + } - @Parameterized.Parameters - public static Collection<Object[]> data() { - /* - * Parameters for initializing the tests: - * {numTasks, numSplits} - * The test currently assumes powers of two for some assertions. + /** + * Creates a {@link UnboundedSourceWrapper} that has one or multiple readers per source. + * If numSplits > numTasks the source has one source will manage multiple readers. */ - return Arrays.asList(new Object[][] { - {1, 1}, {1, 2}, {1, 4}, - {2, 1}, {2, 2}, {2, 4}, - {4, 1}, {4, 2}, {4, 4} - }); - } + @Test + public void testReaders() throws Exception { + final int numElements = 20; + final Object checkpointLock = new Object(); + PipelineOptions options = PipelineOptionsFactory.create(); + + // this source will emit exactly NUM_ELEMENTS across all parallel readers, + // afterwards it will stall. We check whether we also receive NUM_ELEMENTS + // elements later. 
+ TestCountingSource source = new TestCountingSource(numElements); + UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper = + new UnboundedSourceWrapper<>(options, source, numSplits); + + assertEquals(numSplits, flinkWrapper.getSplitSources().size()); + + StreamSource<WindowedValue< + KV<Integer, Integer>>, + UnboundedSourceWrapper< + KV<Integer, Integer>, + TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper); + + setupSourceOperator(sourceOperator, numTasks); + + try { + sourceOperator.open(); + sourceOperator.run(checkpointLock, + new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() { + private int count = 0; + + @Override + public void emitWatermark(Watermark watermark) { + } - /** - * Creates a {@link UnboundedSourceWrapper} that has one or multiple readers per source. - * If numSplits > numTasks the source has one source will manage multiple readers. - */ - @Test - public void testReaders() throws Exception { - final int numElements = 20; - final Object checkpointLock = new Object(); - PipelineOptions options = PipelineOptionsFactory.create(); - - // this source will emit exactly NUM_ELEMENTS across all parallel readers, - // afterwards it will stall. We check whether we also receive NUM_ELEMENTS - // elements later. 
- TestCountingSource source = new TestCountingSource(numElements); - UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper = - new UnboundedSourceWrapper<>(options, source, numSplits); - - assertEquals(numSplits, flinkWrapper.getSplitSources().size()); - - StreamSource<WindowedValue< - KV<Integer, Integer>>, - UnboundedSourceWrapper< - KV<Integer, Integer>, - TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper); - - setupSourceOperator(sourceOperator, numTasks); - - try { - sourceOperator.open(); - sourceOperator.run(checkpointLock, - new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() { - private int count = 0; - - @Override - public void emitWatermark(Watermark watermark) { - } - - @Override - public void collect( - StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) { - - count++; - if (count >= numElements) { - throw new SuccessException(); + @Override + public void collect( + StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) { + + count++; + if (count >= numElements) { + throw new SuccessException(); + } } - } - @Override - public void close() { + @Override + public void close() { - } - }); - } catch (SuccessException e) { + } + }); + } catch (SuccessException e) { - assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size()); + assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size()); - // success - return; + // success + return; + } + fail("Read terminated without producing expected number of outputs"); } - fail("Read terminated without producing expected number of outputs"); - } - /** - * Verify that snapshot/restore work as expected. We bring up a source and cancel - * after seeing a certain number of elements. 
Then we snapshot that source, - * bring up a completely new source that we restore from the snapshot and verify - * that we see all expected elements in the end. - */ - @Test - public void testRestore() throws Exception { - final int numElements = 20; - final Object checkpointLock = new Object(); - PipelineOptions options = PipelineOptionsFactory.create(); - - // this source will emit exactly NUM_ELEMENTS across all parallel readers, - // afterwards it will stall. We check whether we also receive NUM_ELEMENTS - // elements later. - TestCountingSource source = new TestCountingSource(numElements); - UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper = - new UnboundedSourceWrapper<>(options, source, numSplits); - - assertEquals(numSplits, flinkWrapper.getSplitSources().size()); - - StreamSource< - WindowedValue<KV<Integer, Integer>>, - UnboundedSourceWrapper< - KV<Integer, Integer>, - TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper); - - setupSourceOperator(sourceOperator, numTasks); - - final Set<KV<Integer, Integer>> emittedElements = new HashSet<>(); - - boolean readFirstBatchOfElements = false; - - try { - sourceOperator.open(); - sourceOperator.run(checkpointLock, - new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() { - private int count = 0; - - @Override - public void emitWatermark(Watermark watermark) { - } - - @Override - public void collect( - StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) { - - emittedElements.add(windowedValueStreamRecord.getValue().getValue()); - count++; - if (count >= numElements / 2) { - throw new SuccessException(); + /** + * Verify that snapshot/restore work as expected. We bring up a source and cancel + * after seeing a certain number of elements. Then we snapshot that source, + * bring up a completely new source that we restore from the snapshot and verify + * that we see all expected elements in the end. 
+ */ + @Test + public void testRestore() throws Exception { + final int numElements = 20; + final Object checkpointLock = new Object(); + PipelineOptions options = PipelineOptionsFactory.create(); + + // this source will emit exactly NUM_ELEMENTS across all parallel readers, + // afterwards it will stall. We check whether we also receive NUM_ELEMENTS + // elements later. + TestCountingSource source = new TestCountingSource(numElements); + UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper = + new UnboundedSourceWrapper<>(options, source, numSplits); + + assertEquals(numSplits, flinkWrapper.getSplitSources().size()); + + StreamSource< + WindowedValue<KV<Integer, Integer>>, + UnboundedSourceWrapper< + KV<Integer, Integer>, + TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper); + + setupSourceOperator(sourceOperator, numTasks); + + final Set<KV<Integer, Integer>> emittedElements = new HashSet<>(); + + boolean readFirstBatchOfElements = false; + + try { + sourceOperator.open(); + sourceOperator.run(checkpointLock, + new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() { + private int count = 0; + + @Override + public void emitWatermark(Watermark watermark) { } - } - @Override - public void close() { + @Override + public void collect( + StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) { - } - }); - } catch (SuccessException e) { - // success - readFirstBatchOfElements = true; - } + emittedElements.add(windowedValueStreamRecord.getValue().getValue()); + count++; + if (count >= numElements / 2) { + throw new SuccessException(); + } + } + + @Override + public void close() { + + } + }); + } catch (SuccessException e) { + // success + readFirstBatchOfElements = true; + } + + assertTrue("Did not successfully read first batch of elements.", readFirstBatchOfElements); + + // draw a snapshot + byte[] snapshot = flinkWrapper.snapshotState(0, 0); + + // test that 
finalizeCheckpoint on CheckpointMark is called + final ArrayList<Integer> finalizeList = new ArrayList<>(); + TestCountingSource.setFinalizeTracker(finalizeList); + flinkWrapper.notifyCheckpointComplete(0); + assertEquals(flinkWrapper.getLocalSplitSources().size(), finalizeList.size()); + + // create a completely new source but restore from the snapshot + TestCountingSource restoredSource = new TestCountingSource(numElements); + UnboundedSourceWrapper< + KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper = + new UnboundedSourceWrapper<>(options, restoredSource, numSplits); + + assertEquals(numSplits, restoredFlinkWrapper.getSplitSources().size()); + + StreamSource< + WindowedValue<KV<Integer, Integer>>, + UnboundedSourceWrapper< + KV<Integer, Integer>, + TestCountingSource.CounterMark>> restoredSourceOperator = + new StreamSource<>(restoredFlinkWrapper); + + setupSourceOperator(restoredSourceOperator, numTasks); + + // restore snapshot + restoredFlinkWrapper.restoreState(snapshot); + + boolean readSecondBatchOfElements = false; + + // run again and verify that we see the other elements + try { + restoredSourceOperator.open(); + restoredSourceOperator.run(checkpointLock, + new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() { + private int count = 0; + + @Override + public void emitWatermark(Watermark watermark) { + } - assertTrue("Did not successfully read first batch of elements.", readFirstBatchOfElements); - - // draw a snapshot - byte[] snapshot = flinkWrapper.snapshotState(0, 0); - - // test that finalizeCheckpoint on CheckpointMark is called - final ArrayList<Integer> finalizeList = new ArrayList<>(); - TestCountingSource.setFinalizeTracker(finalizeList); - flinkWrapper.notifyCheckpointComplete(0); - assertEquals(flinkWrapper.getLocalSplitSources().size(), finalizeList.size()); - - // create a completely new source but restore from the snapshot - TestCountingSource restoredSource = new TestCountingSource(numElements); - 
UnboundedSourceWrapper< - KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper = - new UnboundedSourceWrapper<>(options, restoredSource, numSplits); - - assertEquals(numSplits, restoredFlinkWrapper.getSplitSources().size()); - - StreamSource< - WindowedValue<KV<Integer, Integer>>, - UnboundedSourceWrapper< - KV<Integer, Integer>, - TestCountingSource.CounterMark>> restoredSourceOperator = - new StreamSource<>(restoredFlinkWrapper); - - setupSourceOperator(restoredSourceOperator, numTasks); - - // restore snapshot - restoredFlinkWrapper.restoreState(snapshot); - - boolean readSecondBatchOfElements = false; - - // run again and verify that we see the other elements - try { - restoredSourceOperator.open(); - restoredSourceOperator.run(checkpointLock, - new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() { - private int count = 0; - - @Override - public void emitWatermark(Watermark watermark) { - } - - @Override - public void collect( - StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) { - emittedElements.add(windowedValueStreamRecord.getValue().getValue()); - count++; - if (count >= numElements / 2) { - throw new SuccessException(); + @Override + public void collect( + StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) { + emittedElements.add(windowedValueStreamRecord.getValue().getValue()); + count++; + if (count >= numElements / 2) { + throw new SuccessException(); + } } - } - @Override - public void close() { + @Override + public void close() { - } - }); - } catch (SuccessException e) { - // success - readSecondBatchOfElements = true; - } + } + }); + } catch (SuccessException e) { + // success + readSecondBatchOfElements = true; + } - assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size()); + assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size()); - assertTrue("Did not successfully read second batch of 
elements.", readSecondBatchOfElements); + assertTrue("Did not successfully read second batch of elements.", readSecondBatchOfElements); - // verify that we saw all NUM_ELEMENTS elements - assertTrue(emittedElements.size() == numElements); - } + // verify that we saw all NUM_ELEMENTS elements + assertTrue(emittedElements.size() == numElements); + } - @SuppressWarnings("unchecked") - private static <T> void setupSourceOperator(StreamSource<T, ?> operator, int numSubTasks) { - ExecutionConfig executionConfig = new ExecutionConfig(); - StreamConfig cfg = new StreamConfig(new Configuration()); + @SuppressWarnings("unchecked") + private static <T> void setupSourceOperator(StreamSource<T, ?> operator, int numSubTasks) { + ExecutionConfig executionConfig = new ExecutionConfig(); + StreamConfig cfg = new StreamConfig(new Configuration()); - cfg.setTimeCharacteristic(TimeCharacteristic.EventTime); + cfg.setTimeCharacteristic(TimeCharacteristic.EventTime); - Environment env = new DummyEnvironment("MockTwoInputTask", numSubTasks, 0); + Environment env = new DummyEnvironment("MockTwoInputTask", numSubTasks, 0); - StreamTask<?, ?> mockTask = mock(StreamTask.class); - when(mockTask.getName()).thenReturn("Mock Task"); - when(mockTask.getCheckpointLock()).thenReturn(new Object()); - when(mockTask.getConfiguration()).thenReturn(cfg); - when(mockTask.getEnvironment()).thenReturn(env); - when(mockTask.getExecutionConfig()).thenReturn(executionConfig); - when(mockTask.getAccumulatorMap()) - .thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap()); + StreamTask<?, ?> mockTask = mock(StreamTask.class); + when(mockTask.getName()).thenReturn("Mock Task"); + when(mockTask.getCheckpointLock()).thenReturn(new Object()); + when(mockTask.getConfiguration()).thenReturn(cfg); + when(mockTask.getEnvironment()).thenReturn(env); + when(mockTask.getExecutionConfig()).thenReturn(executionConfig); + when(mockTask.getAccumulatorMap()) + .thenReturn(Collections.<String, Accumulator<?, 
?>>emptyMap()); - operator.setup(mockTask, cfg, (Output< StreamRecord<T>>) mock(Output.class)); + operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) mock(Output.class)); + } + + /** + * A special {@link RuntimeException} that we throw to signal that the test was successful. + */ + private static class SuccessException extends RuntimeException { + } } /** - * A special {@link RuntimeException} that we throw to signal that the test was successful. + * Not parameterized tests. */ - private static class SuccessException extends RuntimeException {} + public static class BasicTest { + + /** + * Check serialization a {@link UnboundedSourceWrapper}. + */ + @Test + public void testSerialization() throws Exception { + final int parallelism = 1; + final int numElements = 20; + PipelineOptions options = PipelineOptionsFactory.create(); + + TestCountingSource source = new TestCountingSource(numElements); + UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper = + new UnboundedSourceWrapper<>(options, source, parallelism); + + InstantiationUtil.serializeObject(flinkWrapper); + } + + } }
