Fix potential NPE in checkpointing of UnboundedSourceWrapper. This moves all the initialization code to the open() method, which ensures that no snapshot can occur before the state has been initialized correctly.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f3f2a977 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f3f2a977 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f3f2a977 Branch: refs/heads/master Commit: f3f2a9779a5c355a5902a783f3e72609ff71717f Parents: cf14e80 Author: Maximilian Michels <[email protected]> Authored: Fri Sep 16 18:42:43 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Wed Sep 28 11:14:21 2016 +0200 ---------------------------------------------------------------------- .../streaming/io/UnboundedSourceWrapper.java | 39 ++++++++++++-------- .../streaming/UnboundedSourceWrapperTest.java | 3 ++ 2 files changed, 27 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f3f2a977/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index 64cf703..68a83e8 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -91,6 +91,7 @@ public class UnboundedSourceWrapper< private transient List<UnboundedSource.UnboundedReader<OutputT>> localReaders; /** + * Flag to indicate whether the source is running. 
* Initialize here and not in run() to prevent races where we cancel a job before run() is * ever called or run() is called after cancel(). */ @@ -154,19 +155,17 @@ public class UnboundedSourceWrapper< splitSources = source.generateInitialSplits(parallelism, pipelineOptions); } - @Override - public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception { - if (!(ctx instanceof StreamSource.ManualWatermarkContext)) { - throw new RuntimeException( - "Cannot emit watermarks, this hints at a misconfiguration/bug."); - } - context = (StreamSource.ManualWatermarkContext<WindowedValue<OutputT>>) ctx; + /** + * Initialize and restore state before starting execution of the source. + */ + @Override + public void open(Configuration parameters) throws Exception { runtimeContext = (StreamingRuntimeContext) getRuntimeContext(); // figure out which split sources we're responsible for - int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); - int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks(); + int subtaskIndex = runtimeContext.getIndexOfThisSubtask(); + int numSubtasks = runtimeContext.getNumberOfParallelSubtasks(); localSplitSources = new ArrayList<>(); localReaders = new ArrayList<>(); @@ -183,12 +182,12 @@ public class UnboundedSourceWrapper< new Function< KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>, UnboundedSource<OutputT, CheckpointMarkT>>() { - @Override - public UnboundedSource<OutputT, CheckpointMarkT> apply( - KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> input) { - return input.getKey(); - } - }); + @Override + public UnboundedSource<OutputT, CheckpointMarkT> apply( + KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> input) { + return input.getKey(); + } + }); for (KV<? 
extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> restored: restoredState) { @@ -215,6 +214,16 @@ public class UnboundedSourceWrapper< subtaskIndex, numSubtasks, localSplitSources); + } + + @Override + public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception { + if (!(ctx instanceof StreamSource.ManualWatermarkContext)) { + throw new RuntimeException( + "Cannot emit watermarks, this hints at a misconfiguration/bug."); + } + + context = (StreamSource.ManualWatermarkContext<WindowedValue<OutputT>>) ctx; if (localReaders.size() == 0) { // do nothing, but still look busy ... http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f3f2a977/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java index e728653..9e8261a 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java @@ -106,6 +106,7 @@ public class UnboundedSourceWrapperTest { setupSourceOperator(sourceOperator, numTasks); try { + sourceOperator.open(); sourceOperator.run(checkpointLock, new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() { private int count = 0; @@ -173,6 +174,7 @@ public class UnboundedSourceWrapperTest { boolean readFirstBatchOfElements = false; try { + sourceOperator.open(); sourceOperator.run(checkpointLock, new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() { private int count = 0; @@ -237,6 +239,7 @@ public class UnboundedSourceWrapperTest { // run again and verify that we see the other elements try { + 
restoredSourceOperator.open(); restoredSourceOperator.run(checkpointLock, new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() { private int count = 0;
