Repository: incubator-beam
Updated Branches:
  refs/heads/master a96ea98a4 -> b6205ffa3
[BEAM-619] keep track of local split sources in UnboundedSourceWrapper Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/145ad47d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/145ad47d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/145ad47d Branch: refs/heads/master Commit: 145ad47d9f945f816be7a91001cdf7cb3b6a7fac Parents: be689df Author: Maximilian Michels <m...@apache.org> Authored: Wed Sep 7 13:07:15 2016 +0200 Committer: Maximilian Michels <m...@apache.org> Committed: Wed Sep 7 13:15:54 2016 +0200 ---------------------------------------------------------------------- .../streaming/io/UnboundedSourceWrapper.java | 79 +++++++++++--------- 1 file changed, 43 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/145ad47d/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index 8647322..2cd06ed 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -58,7 +58,7 @@ public class UnboundedSourceWrapper< private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapper.class); /** - * Keep the options so that we can initialize the readers. + * Keep the options so that we can initialize the localReaders. 
*/ private final SerializedPipelineOptions serializedOptions; @@ -72,13 +72,19 @@ public class UnboundedSourceWrapper< * The split sources. We split them in the constructor to ensure that all parallel * sources are consistent about the split sources. */ - private List<? extends UnboundedSource<OutputT, CheckpointMarkT>> splitSources; + private final List<? extends UnboundedSource<OutputT, CheckpointMarkT>> splitSources; /** + * The local split sources. Assigned at runtime when the wrapper is executed in parallel. + */ + private transient List<UnboundedSource<OutputT, CheckpointMarkT>> localSplitSources; + + /** + * The local split readers. Assigned at runtime when the wrapper is executed in parallel. * Make it a field so that we can access it in {@link #trigger(long)} for * emitting watermarks. */ - private transient List<UnboundedSource.UnboundedReader<OutputT>> readers; + private transient List<UnboundedSource.UnboundedReader<OutputT>> localReaders; /** * Initialize here and not in run() to prevent races where we cancel a job before run() is @@ -149,26 +155,15 @@ public class UnboundedSourceWrapper< int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks(); - List<UnboundedSource<OutputT, CheckpointMarkT>> localSources = new ArrayList<>(); - - for (int i = 0; i < splitSources.size(); i++) { - if (i % numSubtasks == subtaskIndex) { - localSources.add(splitSources.get(i)); - } - } + localSplitSources = new ArrayList<>(); + localReaders = new ArrayList<>(); - LOG.info("Unbounded Flink Source {}/{} is reading from sources: {}", - subtaskIndex, - numSubtasks, - localSources); - - readers = new ArrayList<>(); if (restoredState != null) { // restore the splitSources from the checkpoint to ensure consistent ordering // do it using a transform because otherwise we would have to do // unchecked casts - splitSources = Lists.transform( + localSplitSources = Lists.transform( restoredState, new Function< 
KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>, @@ -182,19 +177,31 @@ public class UnboundedSourceWrapper< for (KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> restored: restoredState) { - readers.add( + localReaders.add( restored.getKey().createReader( serializedOptions.getPipelineOptions(), restored.getValue())); } restoredState = null; } else { - // initialize readers from scratch - for (UnboundedSource<OutputT, CheckpointMarkT> source : localSources) { - readers.add(source.createReader(serializedOptions.getPipelineOptions(), null)); + // initialize localReaders and localSources from scratch + for (int i = 0; i < splitSources.size(); i++) { + if (i % numSubtasks == subtaskIndex) { + UnboundedSource<OutputT, CheckpointMarkT> source = + splitSources.get(i); + UnboundedSource.UnboundedReader<OutputT> reader = + source.createReader(serializedOptions.getPipelineOptions(), null); + localSplitSources.add(source); + localReaders.add(reader); + } } } - if (readers.size() == 0) { + LOG.info("Unbounded Flink Source {}/{} is reading from sources: {}", + subtaskIndex, + numSubtasks, + localSplitSources); + + if (localReaders.size() == 0) { // do nothing, but still look busy ... 
// also, output a Long.MAX_VALUE watermark since we know that we're not // going to emit anything @@ -218,9 +225,9 @@ public class UnboundedSourceWrapper< } } } - } else if (readers.size() == 1) { + } else if (localReaders.size() == 1) { // the easy case, we just read from one reader - UnboundedSource.UnboundedReader<OutputT> reader = readers.get(0); + UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(0); boolean dataAvailable = reader.start(); if (dataAvailable) { @@ -239,25 +246,25 @@ public class UnboundedSourceWrapper< } } } else { - // a bit more complicated, we are responsible for several readers + // a bit more complicated, we are responsible for several localReaders // loop through them and sleep if none of them had any data - int numReaders = readers.size(); + int numReaders = localReaders.size(); int currentReader = 0; // start each reader and emit data if immediately available - for (UnboundedSource.UnboundedReader<OutputT> reader : readers) { + for (UnboundedSource.UnboundedReader<OutputT> reader : localReaders) { boolean dataAvailable = reader.start(); if (dataAvailable) { emitElement(ctx, reader); } } - // a flag telling us whether any of the readers had data + // a flag telling us whether any of the localReaders had data // if no reader had data, sleep for bit boolean hadData = false; while (isRunning) { - UnboundedSource.UnboundedReader<OutputT> reader = readers.get(currentReader); + UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(currentReader); boolean dataAvailable = reader.advance(); if (dataAvailable) { @@ -298,8 +305,8 @@ public class UnboundedSourceWrapper< @Override public void close() throws Exception { super.close(); - if (readers != null) { - for (UnboundedSource.UnboundedReader<OutputT> reader: readers) { + if (localReaders != null) { + for (UnboundedSource.UnboundedReader<OutputT> reader: localReaders) { reader.close(); } } @@ -324,9 +331,9 @@ public class UnboundedSourceWrapper< List<KV<? 
extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>> checkpoints = new ArrayList<>(); - for (int i = 0; i < splitSources.size(); i++) { - UnboundedSource<OutputT, CheckpointMarkT> source = splitSources.get(i); - UnboundedSource.UnboundedReader<OutputT> reader = readers.get(i); + for (int i = 0; i < localSplitSources.size(); i++) { + UnboundedSource<OutputT, CheckpointMarkT> source = localSplitSources.get(i); + UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(i); @SuppressWarnings("unchecked") CheckpointMarkT mark = (CheckpointMarkT) reader.getCheckpointMark(); @@ -357,9 +364,9 @@ public class UnboundedSourceWrapper< public void trigger(long timestamp) throws Exception { if (this.isRunning) { synchronized (context.getCheckpointLock()) { - // find minimum watermark over all readers + // find minimum watermark over all localReaders long watermarkMillis = Long.MAX_VALUE; - for (UnboundedSource.UnboundedReader<OutputT> reader: readers) { + for (UnboundedSource.UnboundedReader<OutputT> reader: localReaders) { Instant watermark = reader.getWatermark(); if (watermark != null) { watermarkMillis = Math.min(watermark.getMillis(), watermarkMillis);