SparkRunner batch interval as a configuration instead of Beam Windows. Add the batch interval to the pipeline options, defaulting arbitrarily to 1000 msec.
Pick up the batch interval from pipeline options and remove StreamingWindowPipelineDetector. Use the SDK API to get the window function. Update the README. Update streaming tests. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/08dca30a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/08dca30a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/08dca30a Branch: refs/heads/gearpump-runner Commit: 08dca30a38e11c13e8a4b2db1529f1306cc489b0 Parents: 95e7f01 Author: Sela <[email protected]> Authored: Wed Aug 10 13:30:30 2016 +0300 Committer: Sela <[email protected]> Committed: Wed Aug 10 20:42:17 2016 +0300 ---------------------------------------------------------------------- runners/spark/README.md | 3 +- .../runners/spark/SparkPipelineOptions.java | 5 + .../apache/beam/runners/spark/SparkRunner.java | 19 +--- .../streaming/StreamingTransformTranslator.java | 10 +- .../StreamingWindowPipelineDetector.java | 102 ------------------- .../streaming/FlattenStreamingTest.java | 1 + .../streaming/KafkaStreamingTest.java | 2 + .../streaming/SimpleStreamingWordCountTest.java | 2 + 8 files changed, 18 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dca30a/runners/spark/README.md ---------------------------------------------------------------------- diff --git a/runners/spark/README.md b/runners/spark/README.md index d2bfd3e..ef42fa7 100644 --- a/runners/spark/README.md +++ b/runners/spark/README.md @@ -63,8 +63,7 @@ The Spark runner provides support for batch processing of Beam bounded PCollecti ### Streaming The Spark runner currently provides partial support for stream processing of Beam unbounded PCollections as Spark [DStream](http://spark.apache.org/docs/latest/streaming-programming-guide.html#discretized-streams-dstreams)s. 
-Current implementation of *Windowing* takes the first window size in the pipeline and treats it as the Spark "batch interval", while following windows will be treated as *Processing Time* windows. -Further work is required to provide full support for the Beam Model *event-time* and *out-of-order* stream processing. +Currently, both *FixedWindows* and *SlidingWindows* are supported, but only with processing-time triggers and discarding pane. ### issue tracking http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dca30a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java index 6ef3741..080ff19 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java @@ -39,4 +39,9 @@ public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions, @Default.Long(-1) Long getTimeout(); void setTimeout(Long batchInterval); + + @Description("Batch interval for Spark streaming in milliseconds.") + @Default.Long(1000) + Long getBatchIntervalMillis(); + void setBatchIntervalMillis(Long batchInterval); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dca30a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index d994ec4..be50f70 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -26,7 +26,6 @@ import org.apache.beam.runners.spark.translation.SparkProcessContext; import org.apache.beam.runners.spark.translation.TransformTranslator; import org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext; import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator; -import org.apache.beam.runners.spark.translation.streaming.StreamingWindowPipelineDetector; import org.apache.beam.runners.spark.util.SinglePrimitiveOutputPTransform; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; @@ -145,24 +144,16 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> { public EvaluationResult run(Pipeline pipeline) { try { LOG.info("Executing pipeline using the SparkRunner."); - JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions - .getSparkMaster(), mOptions.getAppName()); + JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions.getSparkMaster(), + mOptions.getAppName()); if (mOptions.isStreaming()) { SparkPipelineTranslator translator = - new StreamingTransformTranslator.Translator(new TransformTranslator.Translator()); - // if streaming - fixed window should be defined on all UNBOUNDED inputs - StreamingWindowPipelineDetector streamingWindowPipelineDetector = - new StreamingWindowPipelineDetector(translator); - pipeline.traverseTopologically(streamingWindowPipelineDetector); - if (!streamingWindowPipelineDetector.isWindowing()) { - throw new IllegalStateException("Spark streaming pipeline must be windowed!"); - } - - Duration batchInterval = streamingWindowPipelineDetector.getBatchDuration(); + new StreamingTransformTranslator.Translator(new TransformTranslator.Translator()); + Duration batchInterval = new Duration(mOptions.getBatchIntervalMillis()); LOG.info("Setting Spark streaming batchInterval to {} msec", batchInterval.milliseconds()); - 
EvaluationContext ctxt = createStreamingEvaluationContext(jsc, pipeline, batchInterval); + EvaluationContext ctxt = createStreamingEvaluationContext(jsc, pipeline, batchInterval); pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, translator)); ctxt.computeOutputs(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dca30a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index b0fb931..2ce2c29 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -25,7 +25,6 @@ import org.apache.beam.runners.spark.translation.DoFnFunction; import org.apache.beam.runners.spark.translation.EvaluationContext; import org.apache.beam.runners.spark.translation.SparkPipelineTranslator; import org.apache.beam.runners.spark.translation.TransformEvaluator; -import org.apache.beam.runners.spark.translation.TransformTranslator; import org.apache.beam.runners.spark.translation.WindowingHelpers; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.AvroIO; @@ -35,7 +34,6 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import 
org.apache.beam.sdk.transforms.windowing.Window; @@ -290,16 +288,12 @@ public final class StreamingTransformTranslator { } } - private static final TransformTranslator.FieldGetter WINDOW_FG = - new TransformTranslator.FieldGetter(Window.Bound.class); - - private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() { + private static <T> TransformEvaluator<Window.Bound<T>> window() { return new TransformEvaluator<Window.Bound<T>>() { @Override public void evaluate(Window.Bound<T> transform, EvaluationContext context) { StreamingEvaluationContext sec = (StreamingEvaluationContext) context; - //--- first we apply windowing to the stream - WindowFn<? super T, W> windowFn = WINDOW_FG.get("windowFn", transform); + WindowFn<? super T, ?> windowFn = transform.getWindowFn(); @SuppressWarnings("unchecked") JavaDStream<WindowedValue<T>> dStream = (JavaDStream<WindowedValue<T>>) sec.getStream(transform); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dca30a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java deleted file mode 100644 index 394b2c5..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark.translation.streaming; - -import org.apache.beam.runners.spark.SparkRunner; -import org.apache.beam.runners.spark.translation.SparkPipelineTranslator; -import org.apache.beam.runners.spark.translation.TransformTranslator; -import org.apache.beam.sdk.runners.TransformTreeNode; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.SlidingWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; -import org.apache.spark.streaming.Duration; -import org.apache.spark.streaming.Durations; - - -/** - * Pipeline {@link SparkRunner.Evaluator} to detect windowing. 
- */ -public final class StreamingWindowPipelineDetector extends SparkRunner.Evaluator { - - // Currently, Spark streaming recommends batches no smaller then 500 msec - private static final Duration SPARK_MIN_WINDOW = Durations.milliseconds(500); - - private boolean windowing; - private Duration batchDuration; - - public StreamingWindowPipelineDetector(SparkPipelineTranslator translator) { - super(translator); - } - - private static final TransformTranslator.FieldGetter WINDOW_FG = - new TransformTranslator.FieldGetter(Window.Bound.class); - - // Use the smallest window (fixed or sliding) as Spark streaming's batch duration - @Override - protected <TransformT extends PTransform<? super PInput, POutput>> void - doVisitTransform(TransformTreeNode node) { - @SuppressWarnings("unchecked") - TransformT transform = (TransformT) node.getTransform(); - @SuppressWarnings("unchecked") - Class<TransformT> transformClass = (Class<TransformT>) (Class<?>) transform.getClass(); - if (transformClass.isAssignableFrom(Window.Bound.class)) { - WindowFn<?, ?> windowFn = WINDOW_FG.get("windowFn", transform); - if (windowFn instanceof FixedWindows) { - setBatchDuration(((FixedWindows) windowFn).getSize()); - } else if (windowFn instanceof SlidingWindows) { - if (((SlidingWindows) windowFn).getOffset().getMillis() > 0) { - throw new UnsupportedOperationException("Spark does not support window offsets"); - } - // Sliding window size might as well set the batch duration. 
Applying the transformation - // will add the "slide" - setBatchDuration(((SlidingWindows) windowFn).getSize()); - } else if (!(windowFn instanceof GlobalWindows)) { - throw new IllegalStateException("Windowing function not supported: " + windowFn); - } - } - } - - private void setBatchDuration(org.joda.time.Duration duration) { - Long durationMillis = duration.getMillis(); - // validate window size - if (durationMillis < SPARK_MIN_WINDOW.milliseconds()) { - throw new IllegalArgumentException("Windowing of size " + durationMillis - + "msec is not supported!"); - } - // choose the smallest duration to be Spark's batch duration, larger ones will be handled - // as window functions over the batched-stream - if (!windowing || this.batchDuration.milliseconds() > durationMillis) { - this.batchDuration = Durations.milliseconds(durationMillis); - } - windowing = true; - } - - public boolean isWindowing() { - return windowing; - } - - public Duration getBatchDuration() { - return batchDuration; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dca30a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java index deb1b6a..6f4d8fb 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java @@ -61,6 +61,7 @@ public class FlattenStreamingTest { PipelineOptionsFactory.as(SparkPipelineOptions.class); options.setRunner(SparkRunner.class); options.setStreaming(true); + // using the default 1000 msec interval options.setTimeout(TEST_TIMEOUT_MSEC); // run 
for one interval Pipeline p = Pipeline.create(options); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dca30a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java index 17044aa..2527152 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java @@ -40,6 +40,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.spark.streaming.Durations; import org.joda.time.Duration; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -92,6 +93,7 @@ public class KafkaStreamingTest { PipelineOptionsFactory.as(SparkPipelineOptions.class); options.setRunner(SparkRunner.class); options.setStreaming(true); + options.setBatchIntervalMillis(Durations.seconds(1).milliseconds()); options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval Pipeline p = Pipeline.create(options); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08dca30a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java index 5627056..c761fae 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java @@ -32,6 +32,7 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.PCollection; +import org.apache.spark.streaming.Durations; import org.joda.time.Duration; import org.junit.Test; @@ -58,6 +59,7 @@ public class SimpleStreamingWordCountTest implements Serializable { PipelineOptionsFactory.as(SparkPipelineOptions.class); options.setRunner(SparkRunner.class); options.setStreaming(true); + options.setBatchIntervalMillis(Durations.seconds(1).milliseconds()); options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval Pipeline p = Pipeline.create(options);
