Fix streaming translation of Flatten and Window, make CreateStream eager.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9f143504
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9f143504
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9f143504

Branch: refs/heads/master
Commit: 9f143504f7ade5bb0a966dcef645a834277632ef
Parents: 96abe4f
Author: Sela <[email protected]>
Authored: Sat Feb 18 22:01:58 2017 +0200
Committer: Sela <[email protected]>
Committed: Wed Mar 1 00:17:59 2017 +0200

----------------------------------------------------------------------
 .../spark/translation/EvaluationContext.java    |   6 --
 .../streaming/StreamingTransformTranslator.java | 101 +++++++++----------
 .../translation/streaming/UnboundedDataset.java |  50 +--------
 3 files changed, 52 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9f143504/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index a35aff2..329e047 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -136,11 +135,6 @@ public class EvaluationContext {
     datasets.put(getOutput(transform), new BoundedDataset<>(values, jsc, coder));
   }
 
-  public <T> void putUnboundedDatasetFromQueue(
-      PTransform<?, ? extends PValue> transform, Iterable<Iterable<T>> values, Coder<T> coder) {
-    datasets.put(getOutput(transform), new UnboundedDataset<>(values, jssc, coder));
-  }
-
   public Dataset borrowDataset(PTransform<? extends PValue, ?> transform) {
     return borrowDataset(getInput(transform));
   }
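With putUnboundedDatasetFromQueue gone, EvaluationContext no longer special-cases queue-backed streams: translators construct their dataset eagerly and register it through the generic putDataset. A minimal sketch of that pattern (not part of this patch; dStream and streamSourceId are placeholders):

    // Hypothetical translator snippet: the translator, not EvaluationContext,
    // builds the UnboundedDataset and registers it like any other dataset.
    UnboundedDataset<String> dataset =
        new UnboundedDataset<>(dStream, Collections.singletonList(streamSourceId));
    context.putDataset(transform, dataset);
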

http://git-wip-us.apache.org/repos/asf/beam/blob/9f143504/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index e90b490..7abf5be 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -22,12 +22,18 @@ import static com.google.common.base.Preconditions.checkState;
 import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectSplittable;
 import static org.apache.beam.runners.spark.translation.TranslationUtils.rejectStateAndTimers;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.aggregators.SparkAggregators;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.io.ConsoleIO;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.io.SparkUnboundedSource;
@@ -48,6 +54,7 @@ import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.runners.spark.translation.TransformEvaluator;
 import org.apache.beam.runners.spark.translation.TranslationUtils;
 import org.apache.beam.runners.spark.translation.WindowingHelpers;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
 import org.apache.beam.runners.spark.util.SideInputBroadcast;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -60,8 +67,6 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.CombineFnUtil;
@@ -76,10 +81,10 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.Durations;
 import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
 
 
 /**
@@ -116,13 +121,35 @@ final class StreamingTransformTranslator {
     };
   }
 
-  private static <T> TransformEvaluator<CreateStream.QueuedValues<T>> createFromQueue() {
-    return new TransformEvaluator<CreateStream.QueuedValues<T>>() {
+  private static <T> TransformEvaluator<CreateStream<T>> createFromQueue() {
+    return new TransformEvaluator<CreateStream<T>>() {
       @Override
-      public void evaluate(CreateStream.QueuedValues<T> transform, EvaluationContext context) {
-        Iterable<Iterable<T>> values = transform.getQueuedValues();
+      public void evaluate(CreateStream<T> transform, EvaluationContext context) {
         Coder<T> coder = context.getOutput(transform).getCoder();
-        context.putUnboundedDatasetFromQueue(transform, values, coder);
+        JavaStreamingContext jssc = context.getStreamingContext();
+        Queue<Iterable<T>> values = transform.getBatches();
+        WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder =
+            WindowedValue.getValueOnlyCoder(coder);
+        // create the DStream from queue.
+        Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>();
+        for (Iterable<T> v : values) {
+          Iterable<WindowedValue<T>> windowedValues =
+              Iterables.transform(v, WindowingHelpers.<T>windowValueFunction());
+          JavaRDD<WindowedValue<T>> rdd =
+              jssc.sparkContext()
+                  .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
+                  .map(CoderHelpers.fromByteFunction(windowCoder));
+          rddQueue.offer(rdd);
+        }
+
+        JavaInputDStream<WindowedValue<T>> inputDStream = jssc.queueStream(rddQueue, true);
+        UnboundedDataset<T> unboundedDataset = new UnboundedDataset<T>(
+            inputDStream, Collections.singletonList(inputDStream.inputDStream().id()));
+        // add pre-baked Watermarks for the pre-baked batches.
+        Queue<GlobalWatermarkHolder.SparkWatermarks> times = transform.getTimes();
+        GlobalWatermarkHolder.addAll(
+            ImmutableMap.of(unboundedDataset.getStreamSources().get(0), times));
+        context.putDataset(transform, unboundedDataset);
       }
     };
   }
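The rewritten createFromQueue() leans on Spark's queueStream semantics. A self-contained sketch of just that mechanism (not part of this patch; assumes an existing JavaStreamingContext named jssc):

    import java.util.Arrays;
    import java.util.Queue;
    import java.util.concurrent.LinkedBlockingQueue;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.streaming.api.java.JavaInputDStream;

    // With oneAtATime = true, each micro-batch dequeues exactly one RDD, so
    // batches are emitted in the order they were offer()-ed.
    Queue<JavaRDD<String>> queue = new LinkedBlockingQueue<>();
    queue.offer(jssc.sparkContext().parallelize(Arrays.asList("a", "b")));
    queue.offer(jssc.sparkContext().parallelize(Arrays.asList("c")));
    JavaInputDStream<String> stream = jssc.queueStream(queue, true);

Building the stream eagerly also exposes the real input-stream id at translation time, which is what allows seeding GlobalWatermarkHolder with the pre-baked watermarks; the old lazy path had to fabricate negative ids instead (see the UnboundedDataset changes below).
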
@@ -136,7 +163,6 @@ final class StreamingTransformTranslator {
         // since this is a streaming pipeline, at least one of the PCollections to "flatten" are
         // unbounded, meaning it represents a DStream.
         // So we could end up with an unbounded unified DStream.
-        final List<JavaRDD<WindowedValue<T>>> rdds = new ArrayList<>();
         final List<JavaDStream<WindowedValue<T>>> dStreams = new ArrayList<>();
         final List<Integer> streamingSources = new ArrayList<>();
         for (TaggedPValue pv : pcs) {
@@ -152,27 +178,18 @@
             streamingSources.addAll(unboundedDataset.getStreamSources());
             dStreams.add(unboundedDataset.getDStream());
           } else {
-            rdds.add(((BoundedDataset<T>) dataset).getRDD());
+            // create a single RDD stream.
+            Queue<JavaRDD<WindowedValue<T>>> q = new LinkedBlockingQueue<>();
+            q.offer(((BoundedDataset) dataset).getRDD());
+            //TODO: this is not recoverable from checkpoint!
+            JavaDStream<WindowedValue<T>> dStream = context.getStreamingContext().queueStream(q);
+            dStreams.add(dStream);
           }
         }
         // start by unifying streams into a single stream.
         JavaDStream<WindowedValue<T>> unifiedStreams =
             context.getStreamingContext().union(dStreams.remove(0), dStreams);
-        // now unify in RDDs.
-        if (rdds.size() > 0) {
-          JavaDStream<WindowedValue<T>> joined =
-              unifiedStreams.transform(
-                  new Function<JavaRDD<WindowedValue<T>>, JavaRDD<WindowedValue<T>>>() {
-                    @Override
-                    public JavaRDD<WindowedValue<T>> call(JavaRDD<WindowedValue<T>> streamRdd)
-                        throws Exception {
-                      return new JavaSparkContext(streamRdd.context()).union(streamRdd, rdds);
-                    }
-                  });
-          context.putDataset(transform, new UnboundedDataset<>(joined, streamingSources));
-        } else {
-          context.putDataset(transform, new UnboundedDataset<>(unifiedStreams, streamingSources));
-        }
+        context.putDataset(transform, new UnboundedDataset<>(unifiedStreams, streamingSources));
       }
     };
   }
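In the Flatten translation above, a bounded input is now wrapped as a one-shot queue stream, so everything reduces to a plain union of DStreams instead of re-unioning the static RDDs into every micro-batch inside transform() as the removed code did. A condensed sketch of the new shape (not part of this patch; jssc, boundedRdd and dStream are placeholders):

    import java.util.Collections;
    import java.util.Queue;
    import java.util.concurrent.LinkedBlockingQueue;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.streaming.api.java.JavaDStream;

    // Wrap the bounded side as a single-batch stream...
    Queue<JavaRDD<String>> single = new LinkedBlockingQueue<>();
    single.offer(boundedRdd);
    JavaDStream<String> boundedAsStream = jssc.queueStream(single);
    // ...then union DStreams only; the bounded data is emitted once, not per batch.
    JavaDStream<String> unified =
        jssc.union(dStream, Collections.singletonList(boundedAsStream));
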
@@ -182,42 +199,24 @@ final class StreamingTransformTranslator {
       @Override
       public void evaluate(final Window.Bound<T> transform, EvaluationContext context) {
         @SuppressWarnings("unchecked")
-        WindowFn<? super T, W> windowFn = (WindowFn<? super T, W>) transform.getWindowFn();
-        @SuppressWarnings("unchecked")
         UnboundedDataset<T> unboundedDataset =
             ((UnboundedDataset<T>) context.borrowDataset(transform));
         JavaDStream<WindowedValue<T>> dStream = unboundedDataset.getDStream();
-        // get the right window durations.
-        Duration windowDuration;
-        Duration slideDuration;
-        if (windowFn instanceof FixedWindows) {
-          windowDuration = Durations.milliseconds(((FixedWindows) windowFn).getSize().getMillis());
-          slideDuration = windowDuration;
-        } else if (windowFn instanceof SlidingWindows) {
-          SlidingWindows slidingWindows = (SlidingWindows) windowFn;
-          windowDuration = Durations.milliseconds(slidingWindows.getSize().getMillis());
-          slideDuration = Durations.milliseconds(slidingWindows.getPeriod().getMillis());
-        } else {
-          throw new UnsupportedOperationException(String.format("WindowFn %s is not supported.",
-              windowFn.getClass().getCanonicalName()));
-        }
-        JavaDStream<WindowedValue<T>> windowedDStream =
-            dStream.window(windowDuration, slideDuration);
         //--- then we apply windowing to the elements
+        JavaDStream<WindowedValue<T>> outputStream;
         if (TranslationUtils.skipAssignWindows(transform, context)) {
-          context.putDataset(transform,
-              new UnboundedDataset<>(windowedDStream, unboundedDataset.getStreamSources()));
+          // do nothing.
+          outputStream = dStream;
         } else {
-          JavaDStream<WindowedValue<T>> outStream = windowedDStream.transform(
+          outputStream = dStream.transform(
               new Function<JavaRDD<WindowedValue<T>>, JavaRDD<WindowedValue<T>>>() {
                 @Override
                 public JavaRDD<WindowedValue<T>> call(JavaRDD<WindowedValue<T>> rdd)
                     throws Exception {
                   return rdd.map(new SparkAssignWindowFn<>(transform.getWindowFn()));
                 }
               });
-          context.putDataset(transform,
-              new UnboundedDataset<>(outStream, unboundedDataset.getStreamSources()));
         }
+        context.putDataset(transform,
+            new UnboundedDataset<>(outputStream, unboundedDataset.getStreamSources()));
       }
     };
   }
@@ -427,7 +426,7 @@ final class StreamingTransformTranslator {
     EVALUATORS.put(ParDo.Bound.class, parDo());
     EVALUATORS.put(ParDo.BoundMulti.class, multiDo());
     EVALUATORS.put(ConsoleIO.Write.Unbound.class, print());
-    EVALUATORS.put(CreateStream.QueuedValues.class, createFromQueue());
+    EVALUATORS.put(CreateStream.class, createFromQueue());
     EVALUATORS.put(Window.Bound.class, window());
     EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl());
   }
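The Window translation no longer maps Beam windows onto DStream.window(), so it is no longer limited to FixedWindows and SlidingWindows: window assignment becomes per-element metadata, applied via SparkAssignWindowFn, independent of the micro-batch interval. An illustrative Beam-side snippet this translation now handles (assumes an existing PCollection<String> named input):

    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    // Assigning fixed windows tags each element with its window; it does not
    // change how Spark micro-batches the stream.
    PCollection<String> windowed =
        input.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
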

http://git-wip-us.apache.org/repos/asf/beam/blob/9f143504/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
index 8624f41..d7f3f34 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
@@ -18,22 +18,12 @@
 
 package org.apache.beam.runners.spark.translation.streaming;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.annotation.Nullable;
-import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.translation.Dataset;
-import org.apache.beam.runners.spark.translation.WindowingHelpers;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,14 +34,8 @@ import org.slf4j.LoggerFactory;
 public class UnboundedDataset<T> implements Dataset {
   private static final Logger LOG = LoggerFactory.getLogger(UnboundedDataset.class);
 
-  private static final AtomicInteger queuedStreamIds = new AtomicInteger();
-
-  // only set if creating a DStream from a static collection
-  @Nullable private transient JavaStreamingContext jssc;
-
-  private Iterable<Iterable<T>> values;
-  private Coder<T> coder;
-  private JavaDStream<WindowedValue<T>> dStream;
+  private final JavaDStream<WindowedValue<T>> dStream;
 
   // points to the input streams that created this UnboundedDataset,
   // should be greater > 1 in case of Flatten for example.
   // when using GlobalWatermarkHolder this information helps to take only the relevant watermarks
@@ -63,41 +47,11 @@ public class UnboundedDataset<T> implements Dataset {
     this.streamSources.addAll(streamSources);
   }
 
-  public UnboundedDataset(Iterable<Iterable<T>> values, JavaStreamingContext jssc, Coder<T> coder) {
-    this.values = values;
-    this.jssc = jssc;
-    this.coder = coder;
-    // QueuedStream will have a negative (decreasing) unique id.
-    this.streamSources.add(queuedStreamIds.decrementAndGet());
-  }
-
-  @VisibleForTesting
-  public static void resetQueuedStreamIds() {
-    queuedStreamIds.set(0);
-  }
-
-  @SuppressWarnings("ConstantConditions")
   JavaDStream<WindowedValue<T>> getDStream() {
-    if (dStream == null) {
-      WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder =
-          WindowedValue.getValueOnlyCoder(coder);
-      // create the DStream from queue
-      Queue<JavaRDD<WindowedValue<T>>> rddQueue = new LinkedBlockingQueue<>();
-      for (Iterable<T> v : values) {
-        Iterable<WindowedValue<T>> windowedValues =
-            Iterables.transform(v, WindowingHelpers.<T>windowValueFunction());
-        JavaRDD<WindowedValue<T>> rdd = jssc.sc().parallelize(
-            CoderHelpers.toByteArrays(windowedValues, windowCoder)).map(
-            CoderHelpers.fromByteFunction(windowCoder));
-        rddQueue.offer(rdd);
-      }
-      // create DStream from queue, one at a time.
-      dStream = jssc.queueStream(rddQueue, true);
-    }
     return dStream;
   }
 
-  public List<Integer> getStreamSources() {
+  List<Integer> getStreamSources() {
     return streamSources;
   }
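UnboundedDataset is now a thin wrapper around a concrete DStream: all queue-based construction lives in the CreateStream translator, and stream-source ids always refer to real Spark input streams. A short sketch of how those ids are meant to flow (not part of this patch; same-package usage assumed since the accessors are now package-private, and all names are placeholders):

    import java.util.Arrays;
    import java.util.List;

    // A dataset produced by Flatten carries the ids of both inputs, so
    // GlobalWatermarkHolder lookups can consider only the relevant watermarks.
    UnboundedDataset<String> flattened =
        new UnboundedDataset<>(unifiedStream, Arrays.asList(leftSourceId, rightSourceId));
    List<Integer> sources = flattened.getStreamSources();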
