Handle QueuedStream (for testing) and track streaming sources upstream. Refactor accordingly.
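In brief: each UnboundedDataset now records the ids of the input streams it was derived
from, and queued streams created for tests draw synthetic negative ids so they can never
collide with the non-negative ids Spark assigns to real input streams. A minimal standalone
sketch of the scheme (illustrative names, not the actual runner classes):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    final class SourceTrackingSketch {

      // Queued (test) streams draw negative, decreasing ids: -1, -2, -3, ...
      private static final AtomicInteger queuedStreamIds = new AtomicInteger();

      static int nextQueuedStreamId() {
        return queuedStreamIds.decrementAndGet();
      }

      // A derived dataset (e.g. the result of Flatten) carries the union of its
      // parents' source ids, so watermark lookups can be limited to the streams
      // that actually feed it.
      static List<Integer> union(List<Integer> left, List<Integer> right) {
        List<Integer> merged = new ArrayList<>(left);
        merged.addAll(right);
        return merged;
      }
    }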
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/705695eb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/705695eb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/705695eb

Branch: refs/heads/master
Commit: 705695eb726acf086915e610cb2304bd968e3682
Parents: fa31f18
Author: Sela <[email protected]>
Authored: Sun Feb 12 18:32:06 2017 +0200
Committer: Sela <[email protected]>
Committed: Mon Feb 20 11:30:14 2017 +0200

----------------------------------------------------------------------
 .../beam/runners/spark/TestSparkRunner.java     | 11 ++-
 .../aggregators/AggregatorsAccumulator.java     |  2 +-
 .../spark/translation/SparkContextFactory.java  |  2 +-
 .../streaming/StreamingTransformTranslator.java | 92 +++++++++++++-------
 .../translation/streaming/UnboundedDataset.java | 29 ++++--
 5 files changed, 88 insertions(+), 48 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/705695eb/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index e770164..a634dd4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -22,6 +22,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 
 import org.apache.beam.runners.core.UnboundedReadFromBoundedSource;
+import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
 import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult.State;
@@ -107,13 +109,14 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
   @Override
   public SparkPipelineResult run(Pipeline pipeline) {
     TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class);
-
-    // clear metrics singleton
-    SparkMetricsContainer.clear();
-
     SparkPipelineResult result = delegate.run(pipeline);
     result.waitUntilFinish();
 
+    // clear state of Aggregators, Metrics and Watermarks.
+    AggregatorsAccumulator.clear();
+    SparkMetricsContainer.clear();
+    GlobalWatermarkHolder.clear();
+
     // make sure the test pipeline finished successfully.
     State resultState = result.getState();
     assertThat(

http://git-wip-us.apache.org/repos/asf/beam/blob/705695eb/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
index 1b49e91..a4dfda6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/AggregatorsAccumulator.java
@@ -89,7 +89,7 @@ public class AggregatorsAccumulator {
   }
 
   @VisibleForTesting
-  static void clear() {
+  public static void clear() {
     synchronized (AggregatorsAccumulator.class) {
       instance = null;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/705695eb/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
index 326838a..cdeddad 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
@@ -38,7 +38,7 @@ public final class SparkContextFactory {
    * {@code true} then the Spark context will be reused for beam pipelines.
    * This property should only be enabled for tests.
    */
-  static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
+  public static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
 
   // Spark allows only one context for JVM so this can be static.
   private static JavaSparkContext sparkContext;

http://git-wip-us.apache.org/repos/asf/beam/blob/705695eb/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index a49b959..9451df7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -105,9 +105,12 @@
     return new TransformEvaluator<Read.Unbounded<T>>() {
       @Override
       public void evaluate(Read.Unbounded<T> transform, EvaluationContext context) {
-        context.putDataset(transform,
-            new UnboundedDataset<>(SparkUnboundedSource.read(context.getStreamingContext(),
-                context.getRuntimeContext(), transform.getSource())));
+        context.putDataset(
+            transform,
+            SparkUnboundedSource.read(
+                context.getStreamingContext(),
+                context.getRuntimeContext(),
+                transform.getSource()));
       }
     };
   }
@@ -134,6 +137,7 @@
         // So we could end up with an unbounded unified DStream.
         final List<JavaRDD<WindowedValue<T>>> rdds = new ArrayList<>();
         final List<JavaDStream<WindowedValue<T>>> dStreams = new ArrayList<>();
+        final List<Integer> streamingSources = new ArrayList<>();
         for (TaggedPValue pv : pcs) {
           checkArgument(
               pv.getValue() instanceof PCollection,
@@ -143,7 +147,9 @@
           PCollection<T> pcol = (PCollection<T>) pv.getValue();
           Dataset dataset = context.borrowDataset(pcol);
           if (dataset instanceof UnboundedDataset) {
-            dStreams.add(((UnboundedDataset<T>) dataset).getDStream());
+            UnboundedDataset<T> unboundedDataset = (UnboundedDataset<T>) dataset;
+            streamingSources.addAll(unboundedDataset.getStreamingSources());
+            dStreams.add(unboundedDataset.getDStream());
           } else {
             rdds.add(((BoundedDataset<T>) dataset).getRDD());
           }
@@ -162,9 +168,9 @@
               return new JavaSparkContext(streamRdd.context()).union(streamRdd, rdds);
             }
           });
-          context.putDataset(transform, new UnboundedDataset<>(joined));
+          context.putDataset(transform, new UnboundedDataset<>(joined, streamingSources));
         } else {
-          context.putDataset(transform, new UnboundedDataset<>(unifiedStreams));
+          context.putDataset(transform, new UnboundedDataset<>(unifiedStreams, streamingSources));
         }
       }
     };
@@ -177,8 +183,9 @@
         @SuppressWarnings("unchecked")
         WindowFn<? super T, W> windowFn = (WindowFn<? super T, W>) transform.getWindowFn();
         @SuppressWarnings("unchecked")
-        JavaDStream<WindowedValue<T>> dStream =
-            ((UnboundedDataset<T>) context.borrowDataset(transform)).getDStream();
+        UnboundedDataset<T> unboundedDataset =
+            ((UnboundedDataset<T>) context.borrowDataset(transform));
+        JavaDStream<WindowedValue<T>> dStream = unboundedDataset.getDStream();
         // get the right window durations.
         Duration windowDuration;
         Duration slideDuration;
@@ -197,7 +204,8 @@
             dStream.window(windowDuration, slideDuration);
         //--- then we apply windowing to the elements
         if (TranslationUtils.skipAssignWindows(transform, context)) {
-          context.putDataset(transform, new UnboundedDataset<>(windowedDStream));
+          context.putDataset(transform,
+              new UnboundedDataset<>(windowedDStream, unboundedDataset.getStreamingSources()));
         } else {
           JavaDStream<WindowedValue<T>> outStream = windowedDStream.transform(
               new Function<JavaRDD<WindowedValue<T>>, JavaRDD<WindowedValue<T>>>() {
@@ -206,7 +214,8 @@
                   return rdd.map(new SparkAssignWindowFn<>(transform.getWindowFn()));
                 }
               });
-          context.putDataset(transform, new UnboundedDataset<>(outStream));
+          context.putDataset(transform,
+              new UnboundedDataset<>(outStream, unboundedDataset.getStreamingSources()));
         }
       }
     };
@@ -217,8 +226,9 @@
       @Override
       public void evaluate(GroupByKey<K, V> transform, EvaluationContext context) {
         @SuppressWarnings("unchecked")
-        JavaDStream<WindowedValue<KV<K, V>>> dStream =
-            ((UnboundedDataset<KV<K, V>>) context.borrowDataset(transform)).getDStream();
+        UnboundedDataset<KV<K, V>> unboundedDataset =
+            ((UnboundedDataset<KV<K, V>>) context.borrowDataset(transform));
+        JavaDStream<WindowedValue<KV<K, V>>> dStream = unboundedDataset.getDStream();
         @SuppressWarnings("unchecked")
         final KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder();
 
@@ -239,7 +249,8 @@
                     windowingStrategy);
               }
             });
-        context.putDataset(transform, new UnboundedDataset<>(outStream));
+        context.putDataset(transform,
+            new UnboundedDataset<>(outStream, unboundedDataset.getStreamingSources()));
       }
     };
   }
@@ -247,7 +258,6 @@
   private static <K, InputT, OutputT> TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>
   combineGrouped() {
     return new TransformEvaluator<Combine.GroupedValues<K, InputT, OutputT>>() {
-      @SuppressWarnings("unchecked")
       @Override
       public void evaluate(final Combine.GroupedValues<K, InputT, OutputT> transform,
                            EvaluationContext context) {
@@ -255,13 +265,15 @@
         PCollection<? extends KV<K, ? extends Iterable<InputT>>> input = context.getInput(transform);
         final WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
 
+        @SuppressWarnings("unchecked")
         final CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT> fn =
             (CombineWithContext.KeyedCombineFnWithContext<K, InputT, ?, OutputT>)
                 CombineFnUtil.toFnWithContext(transform.getFn());
 
-        JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> dStream =
-            ((UnboundedDataset<KV<K, Iterable<InputT>>>) context.borrowDataset(transform))
-                .getDStream();
+        @SuppressWarnings("unchecked")
+        UnboundedDataset<KV<K, Iterable<InputT>>> unboundedDataset =
+            ((UnboundedDataset<KV<K, Iterable<InputT>>>) context.borrowDataset(transform));
+        JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> dStream = unboundedDataset.getDStream();
 
         final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
         final SparkPCollectionView pviews = context.getPViews();
@@ -283,7 +295,8 @@
               }
             });
 
-        context.putDataset(transform, new UnboundedDataset<>(outStream));
+        context.putDataset(transform,
+            new UnboundedDataset<>(outStream, unboundedDataset.getStreamingSources()));
       }
     };
   }
@@ -292,7 +305,6 @@ combineGlobally() {
 
     return new TransformEvaluator<Combine.Globally<InputT, OutputT>>() {
-      @SuppressWarnings("unchecked")
       @Override
       public void evaluate(
           final Combine.Globally<InputT, OutputT> transform,
           EvaluationContext context) {
@@ -301,6 +313,7 @@ combineGlobally() {
         // serializable arguments to pass.
         final Coder<InputT> iCoder = context.getInput(transform).getCoder();
         final Coder<OutputT> oCoder = context.getOutput(transform).getCoder();
+        @SuppressWarnings("unchecked")
         final CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn =
             (CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT>)
                 CombineFnUtil.toFnWithContext(transform.getFn());
@@ -309,8 +322,10 @@ combineGlobally() {
         final boolean hasDefault = transform.isInsertDefault();
         final SparkPCollectionView pviews = context.getPViews();
 
-        JavaDStream<WindowedValue<InputT>> dStream =
-            ((UnboundedDataset<InputT>) context.borrowDataset(transform)).getDStream();
+        @SuppressWarnings("unchecked")
+        UnboundedDataset<InputT> unboundedDataset =
+            ((UnboundedDataset<InputT>) context.borrowDataset(transform));
+        JavaDStream<WindowedValue<InputT>> dStream = unboundedDataset.getDStream();
 
         JavaDStream<WindowedValue<OutputT>> outStream = dStream.transform(
             new Function<JavaRDD<WindowedValue<InputT>>, JavaRDD<WindowedValue<OutputT>>>() {
@@ -326,7 +341,8 @@ combineGlobally() {
               }
             });
 
-        context.putDataset(transform, new UnboundedDataset<>(outStream));
+        context.putDataset(transform,
+            new UnboundedDataset<>(outStream, unboundedDataset.getStreamingSources()));
       }
     };
   }
@@ -334,7 +350,6 @@
   private static <K, InputT, AccumT, OutputT>
       TransformEvaluator<Combine.PerKey<K, InputT, OutputT>> combinePerKey() {
     return new TransformEvaluator<Combine.PerKey<K, InputT, OutputT>>() {
-      @SuppressWarnings("unchecked")
       @Override
       public void evaluate(final Combine.PerKey<K, InputT, OutputT> transform,
                            final EvaluationContext context) {
@@ -342,6 +357,7 @@
         // serializable arguments to pass.
         final KvCoder<K, InputT> inputCoder =
             (KvCoder<K, InputT>) context.getInput(transform).getCoder();
+        @SuppressWarnings("unchecked")
         final CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn =
             (CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>)
                 CombineFnUtil.toFnWithContext(transform.getFn());
@@ -349,8 +365,10 @@
         final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
         final SparkPCollectionView pviews = context.getPViews();
 
-        JavaDStream<WindowedValue<KV<K, InputT>>> dStream =
-            ((UnboundedDataset<KV<K, InputT>>) context.borrowDataset(transform)).getDStream();
+        @SuppressWarnings("unchecked")
+        UnboundedDataset<KV<K, InputT>> unboundedDataset =
+            ((UnboundedDataset<KV<K, InputT>>) context.borrowDataset(transform));
+        JavaDStream<WindowedValue<KV<K, InputT>>> dStream = unboundedDataset.getDStream();
 
         JavaDStream<WindowedValue<KV<K, OutputT>>> outStream =
             dStream.transform(new Function<JavaRDD<WindowedValue<KV<K, InputT>>>,
@@ -366,14 +384,14 @@
                     windowingStrategy, sideInputs);
               }
             });
-        context.putDataset(transform, new UnboundedDataset<>(outStream));
+        context.putDataset(transform,
+            new UnboundedDataset<>(outStream, unboundedDataset.getStreamingSources()));
       }
     };
   }
 
   private static <InputT, OutputT> TransformEvaluator<ParDo.Bound<InputT, OutputT>> parDo() {
     return new TransformEvaluator<ParDo.Bound<InputT, OutputT>>() {
-      @SuppressWarnings("unchecked")
       @Override
       public void evaluate(final ParDo.Bound<InputT, OutputT> transform,
                            final EvaluationContext context) {
@@ -384,8 +402,11 @@
         final WindowingStrategy<?, ?> windowingStrategy =
             context.getInput(transform).getWindowingStrategy();
         final SparkPCollectionView pviews = context.getPViews();
-        JavaDStream<WindowedValue<InputT>> dStream =
-            ((UnboundedDataset<InputT>) context.borrowDataset(transform)).getDStream();
+
+        @SuppressWarnings("unchecked")
+        UnboundedDataset<InputT> unboundedDataset =
+            ((UnboundedDataset<InputT>) context.borrowDataset(transform));
+        JavaDStream<WindowedValue<InputT>> dStream = unboundedDataset.getDStream();
 
         final String stepName = context.getCurrentTransform().getFullName();
 
@@ -409,7 +430,8 @@
               }
             });
 
-        context.putDataset(transform, new UnboundedDataset<>(outStream));
+        context.putDataset(transform,
+            new UnboundedDataset<>(outStream, unboundedDataset.getStreamingSources()));
       }
     };
   }
@@ -427,9 +449,12 @@
         final SparkPCollectionView pviews = context.getPViews();
         final WindowingStrategy<?, ?> windowingStrategy =
             context.getInput(transform).getWindowingStrategy();
+
         @SuppressWarnings("unchecked")
-        JavaDStream<WindowedValue<InputT>> dStream =
-            ((UnboundedDataset<InputT>) context.borrowDataset(transform)).getDStream();
+        UnboundedDataset<InputT> unboundedDataset =
+            ((UnboundedDataset<InputT>) context.borrowDataset(transform));
+        JavaDStream<WindowedValue<InputT>> dStream = unboundedDataset.getDStream();
+
         JavaPairDStream<TupleTag<?>, WindowedValue<?>> all = dStream.transformToPair(
             new Function<JavaRDD<WindowedValue<InputT>>,
                 JavaPairRDD<TupleTag<?>, WindowedValue<?>>>() {
@@ -460,7 +485,8 @@
             JavaDStream<WindowedValue<Object>> values =
                 (JavaDStream<WindowedValue<Object>>) (JavaDStream<?>)
                     TranslationUtils.dStreamValues(filtered);
-            context.putDataset(e.getValue(), new UnboundedDataset<>(values));
+            context.putDataset(e.getValue(),
+                new UnboundedDataset<>(values, unboundedDataset.getStreamingSources()));
           }
         }
       };

http://git-wip-us.apache.org/repos/asf/beam/blob/705695eb/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
index d059c7e..80c0515 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
@@ -19,8 +19,12 @@ package org.apache.beam.runners.spark.translation.streaming;
 
 import com.google.common.collect.Iterables;
+
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.translation.Dataset;
@@ -41,6 +45,7 @@ import org.slf4j.LoggerFactory;
 public class UnboundedDataset<T> implements Dataset {
   private static final Logger LOG = LoggerFactory.getLogger(UnboundedDataset.class);
+  private static final AtomicInteger queuedStreamIds = new AtomicInteger();
 
   // only set if creating a DStream from a static collection
   @Nullable private transient JavaStreamingContext jssc;
@@ -48,15 +53,23 @@ public class UnboundedDataset<T> implements Dataset {
   private Iterable<Iterable<T>> values;
   private Coder<T> coder;
   private JavaDStream<WindowedValue<T>> dStream;
+  // points to the input streams that created this UnboundedDataset,
+  // should be greater > 1 in case of Flatten for example.
+  // when using GlobalWatermarkHolder this information helps to take only the relevant watermarks
+  // and reason about them accordingly.
+  private final List<Integer> streamingSources = new ArrayList<>();
 
-  UnboundedDataset(JavaDStream<WindowedValue<T>> dStream) {
+  public UnboundedDataset(JavaDStream<WindowedValue<T>> dStream, List<Integer> streamingSources) {
     this.dStream = dStream;
+    this.streamingSources.addAll(streamingSources);
   }
 
   public UnboundedDataset(Iterable<Iterable<T>> values, JavaStreamingContext jssc, Coder<T> coder) {
     this.values = values;
     this.jssc = jssc;
     this.coder = coder;
+    // QueuedStream will have a negative (decreasing) unique id.
+    this.streamingSources.add(queuedStreamIds.decrementAndGet());
   }
 
   @SuppressWarnings("ConstantConditions")
@@ -66,7 +79,6 @@ public class UnboundedDataset<T> implements Dataset {
           WindowedValue.getValueOnlyCoder(coder);
       // create the DStream from queue
       Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>();
-      JavaRDD<WindowedValue<T>> lastRDD = null;
       for (Iterable<T> v : values) {
         Iterable<WindowedValue<T>> windowedValues =
             Iterables.transform(v, WindowingHelpers.<T>windowValueFunction());
@@ -74,18 +86,17 @@ public class UnboundedDataset<T> implements Dataset {
             CoderHelpers.toByteArrays(windowedValues, windowCoder)).map(
                 CoderHelpers.fromByteFunction(windowCoder));
         rddQueue.offer(rdd);
-        lastRDD = rdd;
       }
-      // create DStream from queue, one at a time,
-      // with last as default in case batches repeat (graceful stops for example).
-      // if the stream is empty, avoid creating a default empty RDD.
-      // mainly for unit test so no reason to have this configurable.
-      dStream = lastRDD != null ? jssc.queueStream(rddQueue, true, lastRDD)
-          : jssc.queueStream(rddQueue, true);
+      // create DStream from queue, one at a time.
+      dStream = jssc.queueStream(rddQueue, true);
     }
     return dStream;
   }
 
+  public List<Integer> getStreamingSources() {
+    return streamingSources;
+  }
+
   public void cache() {
     dStream.cache();
   }

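For context on why the sources are tracked at all: the watermark holder keeps watermarks
per input stream, and a dataset's effective watermark should be computed only over the
streams that actually feed it. A sketch of such a lookup, assuming a plain map from source
id to watermark timestamp (a simplification of what GlobalWatermarkHolder maintains; the
class below is illustrative, not part of the runner):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class WatermarkLookupSketch {

      // Minimum watermark over this dataset's own sources only.
      static long effectiveWatermark(Map<Integer, Long> watermarkBySource,
                                     List<Integer> streamingSources) {
        long min = Long.MAX_VALUE;
        for (Integer sourceId : streamingSources) {
          Long wm = watermarkBySource.get(sourceId);
          if (wm != null) {
            min = Math.min(min, wm);
          }
        }
        return min;
      }

      public static void main(String[] args) {
        Map<Integer, Long> watermarks = new HashMap<>();
        watermarks.put(0, 1000L);  // a real input stream
        watermarks.put(-1, 500L);  // a queued test stream
        watermarks.put(1, 2000L);  // an unrelated stream elsewhere in the pipeline

        // A dataset flattened from sources 0 and -1 ignores source 1 entirely.
        System.out.println(effectiveWatermark(watermarks, Arrays.asList(0, -1))); // prints 500
      }
    }

Without the per-dataset source list, an unrelated slow stream (source 1 above) would hold
back the watermark of every dataset in the pipeline.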