Repository: incubator-beam
Updated Branches:
  refs/heads/master 2b9906e8d -> 3001804e3
Rename SparkPipelineRunner to SparkRunner

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/96ffc429
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/96ffc429
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/96ffc429

Branch: refs/heads/master
Commit: 96ffc42972010c9b027e826d6d610555ab0c055a
Parents: 2b9906e
Author: Thomas Groh <[email protected]>
Authored: Fri Jun 17 10:45:09 2016 -0700
Committer: Dan Halperin <[email protected]>
Committed: Tue Jun 21 11:14:57 2016 -0700

----------------------------------------------------------------------
 README.md                                       |   2 +-
 runners/spark/README.md                         |   8 +-
 runners/spark/pom.xml                           |   2 +-
 .../beam/runners/spark/SparkPipelineRunner.java | 255 -------------------
 .../apache/beam/runners/spark/SparkRunner.java  | 255 +++++++++++++++++++
 .../runners/spark/SparkRunnerRegistrar.java     |   8 +-
 .../runners/spark/TestSparkPipelineRunner.java  |  77 ------
 .../beam/runners/spark/TestSparkRunner.java     |  77 ++++++
 .../translation/SparkPipelineEvaluator.java     |   6 +-
 .../StreamingWindowPipelineDetector.java        |   6 +-
 .../apache/beam/runners/spark/DeDupTest.java    |   4 +-
 .../beam/runners/spark/EmptyInputTest.java      |   4 +-
 .../beam/runners/spark/SimpleWordCountTest.java |   8 +-
 .../runners/spark/SparkRunnerRegistrarTest.java |   2 +-
 .../apache/beam/runners/spark/TfIdfTest.java    |   4 +-
 .../beam/runners/spark/io/AvroPipelineTest.java |   4 +-
 .../beam/runners/spark/io/NumShardsTest.java    |   6 +-
 .../io/hadoop/HadoopFileFormatPipelineTest.java |   4 +-
 .../spark/translation/CombineGloballyTest.java  |   6 +-
 .../spark/translation/CombinePerKeyTest.java    |   4 +-
 .../spark/translation/DoFnOutputTest.java       |   6 +-
 .../translation/MultiOutputWordCountTest.java   |   4 +-
 .../spark/translation/SerializationTest.java    |   6 +-
 .../spark/translation/SideEffectsTest.java      |   4 +-
 .../translation/TransformTranslatorTest.java    |   6 +-
 .../translation/WindowedWordCountTest.java      |   8 +-
 .../streaming/FlattenStreamingTest.java         |   6 +-
 .../streaming/KafkaStreamingTest.java           |   6 +-
 .../streaming/SimpleStreamingWordCountTest.java |   6 +-
 29 files changed, 397 insertions(+), 397 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index c4a9155..ec89c4d 100644
--- a/README.md
+++ b/README.md
@@ -69,7 +69,7 @@ Beam supports executing programs on multiple distributed processing backends thr
 
 - The `DirectRunner` runs the pipeline on your local machine.
 - The `DataflowPipelineRunner` submits the pipeline to the [Google Cloud Dataflow](http://cloud.google.com/dataflow/).
 - The `FlinkRunner` runs the pipeline on an Apache Flink cluster. The code has been donated from [dataArtisans/flink-dataflow](https://github.com/dataArtisans/flink-dataflow) and is now part of Beam.
-- The `SparkPipelineRunner` runs the pipeline on an Apache Spark cluster. The code has been donated from [cloudera/spark-dataflow](https://github.com/cloudera/spark-dataflow) and is now part of Beam.
+- The `SparkRunner` runs the pipeline on an Apache Spark cluster. The code has been donated from [cloudera/spark-dataflow](https://github.com/cloudera/spark-dataflow) and is now part of Beam.
 
 Have ideas for new Runners? See the [Jira](https://issues.apache.org/jira/browse/BEAM/component/12328916/).
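For context, usage is unchanged apart from the name. A minimal sketch of a pipeline configured for the renamed SparkRunner, modeled on the README usage and tests further down in this commit (the Create input is an illustrative placeholder):

    import org.apache.beam.runners.spark.EvaluationResult;
    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.apache.beam.runners.spark.SparkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;

    public class SparkRunnerExample {
      public static void main(String[] args) {
        // Default options run against a local, single-threaded Spark instance.
        SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
        options.setRunner(SparkRunner.class);

        Pipeline p = Pipeline.create(options);
        PCollection<String> lines =
            p.apply(Create.of("hi there", "hi sue bob").withCoder(StringUtf8Coder.of()));

        // Run with the renamed runner and release Spark resources when done.
        EvaluationResult result = SparkRunner.create(options).run(p);
        result.close();
      }
    }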
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/README.md
----------------------------------------------------------------------
diff --git a/runners/spark/README.md b/runners/spark/README.md
index 457f0a9..d2bfd3e 100644
--- a/runners/spark/README.md
+++ b/runners/spark/README.md
@@ -89,7 +89,7 @@ If we wanted to run a Beam pipeline with the default options of a single threade
 instance in local mode, we would do the following:
 
     Pipeline p = <logic for pipeline creation >
-    EvaluationResult result = SparkPipelineRunner.create().run(p);
+    EvaluationResult result = SparkRunner.create().run(p);
 
 To create a pipeline runner to run against a different Spark cluster, with a custom master url we
 would do the following:
@@ -97,7 +97,7 @@ would do the following:
 
     Pipeline p = <logic for pipeline creation >
     SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
     options.setSparkMaster("spark://host:port");
-    EvaluationResult result = SparkPipelineRunner.create(options).run(p);
+    EvaluationResult result = SparkRunner.create(options).run(p);
 
 ## Word Count Example
 
@@ -113,7 +113,7 @@ Then run the [word count example][wc] from the SDK using a single threaded Spark
 in local mode:
 
     mvn exec:exec -DmainClass=org.apache.beam.examples.WordCount \
-      -Dinput=/tmp/kinglear.txt -Doutput=/tmp/out -Drunner=SparkPipelineRunner \
+      -Dinput=/tmp/kinglear.txt -Doutput=/tmp/out -Drunner=SparkRunner \
       -DsparkMaster=local
 
 Check the output by running:
@@ -139,7 +139,7 @@ Then run the word count example using Spark submit with the `yarn-client` master
       --class org.apache.beam.examples.WordCount \
       --master yarn-client \
       target/spark-runner-*-spark-app.jar \
-      --inputFile=kinglear.txt --output=out --runner=SparkPipelineRunner --sparkMaster=yarn-client
+      --inputFile=kinglear.txt --output=out --runner=SparkRunner --sparkMaster=yarn-client
 
 Check the output by running:

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 747464e..741f2db 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -77,7 +77,7 @@
             <systemPropertyVariables>
               <beamTestPipelineOptions>
                 [
-                  "--runner=org.apache.beam.runners.spark.TestSparkPipelineRunner",
+                  "--runner=TestSparkRunner",
                   "--streaming=false"
                 ]
               </beamTestPipelineOptions>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
deleted file mode 100644
index 90404dd..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark; - -import org.apache.beam.runners.spark.translation.EvaluationContext; -import org.apache.beam.runners.spark.translation.SparkContextFactory; -import org.apache.beam.runners.spark.translation.SparkPipelineEvaluator; -import org.apache.beam.runners.spark.translation.SparkPipelineTranslator; -import org.apache.beam.runners.spark.translation.SparkProcessContext; -import org.apache.beam.runners.spark.translation.TransformTranslator; -import org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext; -import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator; -import org.apache.beam.runners.spark.translation.streaming.StreamingWindowPipelineDetector; -import org.apache.beam.runners.spark.util.SinglePrimitiveOutputPTransform; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.options.PipelineOptionsValidator; -import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.runners.TransformTreeNode; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; - -import org.apache.spark.SparkException; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.streaming.Duration; -import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The SparkPipelineRunner translate operations defined on a pipeline to a representation - * executable by Spark, and then submitting the job to Spark to be executed. If we wanted to run - * a dataflow pipeline with the default options of a single threaded spark instance in local mode, - * we would do the following: - * - * {@code - * Pipeline p = [logic for pipeline creation] - * EvaluationResult result = SparkPipelineRunner.create().run(p); - * } - * - * To create a pipeline runner to run against a different spark cluster, with a custom master url - * we would do the following: - * - * {@code - * Pipeline p = [logic for pipeline creation] - * SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); - * options.setSparkMaster("spark://host:port"); - * EvaluationResult result = SparkPipelineRunner.create(options).run(p); - * } - * - * To create a Spark streaming pipeline runner use {@link SparkStreamingPipelineOptions} - */ -public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult> { - - private static final Logger LOG = LoggerFactory.getLogger(SparkPipelineRunner.class); - /** - * Options used in this pipeline runner. - */ - private final SparkPipelineOptions mOptions; - - /** - * Creates and returns a new SparkPipelineRunner with default options. 
In particular, against a - * spark instance running in local mode. - * - * @return A pipeline runner with default options. - */ - public static SparkPipelineRunner create() { - SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); - options.setRunner(SparkPipelineRunner.class); - return new SparkPipelineRunner(options); - } - - /** - * Creates and returns a new SparkPipelineRunner with specified options. - * - * @param options The SparkPipelineOptions to use when executing the job. - * @return A pipeline runner that will execute with specified options. - */ - public static SparkPipelineRunner create(SparkPipelineOptions options) { - return new SparkPipelineRunner(options); - } - - /** - * Creates and returns a new SparkPipelineRunner with specified options. - * - * @param options The PipelineOptions to use when executing the job. - * @return A pipeline runner that will execute with specified options. - */ - public static SparkPipelineRunner fromOptions(PipelineOptions options) { - SparkPipelineOptions sparkOptions = - PipelineOptionsValidator.validate(SparkPipelineOptions.class, options); - return new SparkPipelineRunner(sparkOptions); - } - - /** - * Overrides for this runner. - */ - @SuppressWarnings("rawtypes") - @Override - public <OutputT extends POutput, InputT extends PInput> OutputT apply( - PTransform<InputT, OutputT> transform, InputT input) { - - if (transform instanceof GroupByKey) { - return (OutputT) ((PCollection) input).apply( - new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform)); - } else if (transform instanceof Create.Values) { - return (OutputT) super.apply( - new SinglePrimitiveOutputPTransform((Create.Values) transform), input); - } else { - return super.apply(transform, input); - } - } - - - /** - * No parameter constructor defaults to running this pipeline in Spark's local mode, in a single - * thread. - */ - private SparkPipelineRunner(SparkPipelineOptions options) { - mOptions = options; - } - - - @Override - public EvaluationResult run(Pipeline pipeline) { - try { - // validate streaming configuration - if (mOptions.isStreaming() && !(mOptions instanceof SparkStreamingPipelineOptions)) { - throw new RuntimeException("A streaming job must be configured with " - + SparkStreamingPipelineOptions.class.getSimpleName() + ", found " - + mOptions.getClass().getSimpleName()); - } - LOG.info("Executing pipeline using the SparkPipelineRunner."); - JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions - .getSparkMaster(), mOptions.getAppName()); - - if (mOptions.isStreaming()) { - SparkPipelineTranslator translator = - new StreamingTransformTranslator.Translator(new TransformTranslator.Translator()); - // if streaming - fixed window should be defined on all UNBOUNDED inputs - StreamingWindowPipelineDetector streamingWindowPipelineDetector = - new StreamingWindowPipelineDetector(translator); - pipeline.traverseTopologically(streamingWindowPipelineDetector); - if (!streamingWindowPipelineDetector.isWindowing()) { - throw new IllegalStateException("Spark streaming pipeline must be windowed!"); - } - - Duration batchInterval = streamingWindowPipelineDetector.getBatchDuration(); - LOG.info("Setting Spark streaming batchInterval to {} msec", batchInterval.milliseconds()); - EvaluationContext ctxt = createStreamingEvaluationContext(jsc, pipeline, batchInterval); - - pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, translator)); - ctxt.computeOutputs(); - - LOG.info("Streaming pipeline construction complete. 
Starting execution.."); - ((StreamingEvaluationContext) ctxt).getStreamingContext().start(); - - return ctxt; - } else { - EvaluationContext ctxt = new EvaluationContext(jsc, pipeline); - SparkPipelineTranslator translator = new TransformTranslator.Translator(); - pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, translator)); - ctxt.computeOutputs(); - - LOG.info("Pipeline execution complete."); - - return ctxt; - } - } catch (Exception e) { - // Scala doesn't declare checked exceptions in the bytecode, and the Java compiler - // won't let you catch something that is not declared, so we can't catch - // SparkException here. Instead we do an instanceof check. - // Then we find the cause by seeing if it's a user exception (wrapped by our - // SparkProcessException), or just use the SparkException cause. - if (e instanceof SparkException && e.getCause() != null) { - if (e.getCause() instanceof SparkProcessContext.SparkProcessException - && e.getCause().getCause() != null) { - throw new RuntimeException(e.getCause().getCause()); - } else { - throw new RuntimeException(e.getCause()); - } - } - // otherwise just wrap in a RuntimeException - throw new RuntimeException(e); - } - } - - private EvaluationContext - createStreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline, - Duration batchDuration) { - SparkStreamingPipelineOptions streamingOptions = (SparkStreamingPipelineOptions) mOptions; - JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration); - return new StreamingEvaluationContext(jsc, pipeline, jssc, streamingOptions.getTimeout()); - } - - /** - * Evaluator on the pipeline. - */ - public abstract static class Evaluator extends Pipeline.PipelineVisitor.Defaults { - protected static final Logger LOG = LoggerFactory.getLogger(Evaluator.class); - - protected final SparkPipelineTranslator translator; - - protected Evaluator(SparkPipelineTranslator translator) { - this.translator = translator; - } - - @Override - public CompositeBehavior enterCompositeTransform(TransformTreeNode node) { - if (node.getTransform() != null) { - @SuppressWarnings("unchecked") - Class<PTransform<?, ?>> transformClass = - (Class<PTransform<?, ?>>) node.getTransform().getClass(); - if (translator.hasTranslation(transformClass)) { - LOG.info("Entering directly-translatable composite transform: '{}'", node.getFullName()); - LOG.debug("Composite transform class: '{}'", transformClass); - doVisitTransform(node); - return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; - } - } - return CompositeBehavior.ENTER_TRANSFORM; - } - - @Override - public void visitPrimitiveTransform(TransformTreeNode node) { - doVisitTransform(node); - } - - protected abstract <TransformT extends PTransform<? super PInput, POutput>> void - doVisitTransform(TransformTreeNode node); - } -} - http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java new file mode 100644 index 0000000..dfda987 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark; + +import org.apache.beam.runners.spark.translation.EvaluationContext; +import org.apache.beam.runners.spark.translation.SparkContextFactory; +import org.apache.beam.runners.spark.translation.SparkPipelineEvaluator; +import org.apache.beam.runners.spark.translation.SparkPipelineTranslator; +import org.apache.beam.runners.spark.translation.SparkProcessContext; +import org.apache.beam.runners.spark.translation.TransformTranslator; +import org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext; +import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator; +import org.apache.beam.runners.spark.translation.streaming.StreamingWindowPipelineDetector; +import org.apache.beam.runners.spark.util.SinglePrimitiveOutputPTransform; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.PipelineOptionsValidator; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.runners.TransformTreeNode; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; + +import org.apache.spark.SparkException; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The SparkRunner translate operations defined on a pipeline to a representation + * executable by Spark, and then submitting the job to Spark to be executed. 
If we wanted to run + * a dataflow pipeline with the default options of a single threaded spark instance in local mode, + * we would do the following: + * + * {@code + * Pipeline p = [logic for pipeline creation] + * EvaluationResult result = SparkRunner.create().run(p); + * } + * + * To create a pipeline runner to run against a different spark cluster, with a custom master url + * we would do the following: + * + * {@code + * Pipeline p = [logic for pipeline creation] + * SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); + * options.setSparkMaster("spark://host:port"); + * EvaluationResult result = SparkRunner.create(options).run(p); + * } + * + * To create a Spark streaming pipeline runner use {@link SparkStreamingPipelineOptions} + */ +public final class SparkRunner extends PipelineRunner<EvaluationResult> { + + private static final Logger LOG = LoggerFactory.getLogger(SparkRunner.class); + /** + * Options used in this pipeline runner. + */ + private final SparkPipelineOptions mOptions; + + /** + * Creates and returns a new SparkRunner with default options. In particular, against a + * spark instance running in local mode. + * + * @return A pipeline runner with default options. + */ + public static SparkRunner create() { + SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); + options.setRunner(SparkRunner.class); + return new SparkRunner(options); + } + + /** + * Creates and returns a new SparkRunner with specified options. + * + * @param options The SparkPipelineOptions to use when executing the job. + * @return A pipeline runner that will execute with specified options. + */ + public static SparkRunner create(SparkPipelineOptions options) { + return new SparkRunner(options); + } + + /** + * Creates and returns a new SparkRunner with specified options. + * + * @param options The PipelineOptions to use when executing the job. + * @return A pipeline runner that will execute with specified options. + */ + public static SparkRunner fromOptions(PipelineOptions options) { + SparkPipelineOptions sparkOptions = + PipelineOptionsValidator.validate(SparkPipelineOptions.class, options); + return new SparkRunner(sparkOptions); + } + + /** + * Overrides for this runner. + */ + @SuppressWarnings("rawtypes") + @Override + public <OutputT extends POutput, InputT extends PInput> OutputT apply( + PTransform<InputT, OutputT> transform, InputT input) { + + if (transform instanceof GroupByKey) { + return (OutputT) ((PCollection) input).apply( + new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform)); + } else if (transform instanceof Create.Values) { + return (OutputT) super.apply( + new SinglePrimitiveOutputPTransform((Create.Values) transform), input); + } else { + return super.apply(transform, input); + } + } + + + /** + * No parameter constructor defaults to running this pipeline in Spark's local mode, in a single + * thread. 
+ */ + private SparkRunner(SparkPipelineOptions options) { + mOptions = options; + } + + + @Override + public EvaluationResult run(Pipeline pipeline) { + try { + // validate streaming configuration + if (mOptions.isStreaming() && !(mOptions instanceof SparkStreamingPipelineOptions)) { + throw new RuntimeException("A streaming job must be configured with " + + SparkStreamingPipelineOptions.class.getSimpleName() + ", found " + + mOptions.getClass().getSimpleName()); + } + LOG.info("Executing pipeline using the SparkRunner."); + JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions + .getSparkMaster(), mOptions.getAppName()); + + if (mOptions.isStreaming()) { + SparkPipelineTranslator translator = + new StreamingTransformTranslator.Translator(new TransformTranslator.Translator()); + // if streaming - fixed window should be defined on all UNBOUNDED inputs + StreamingWindowPipelineDetector streamingWindowPipelineDetector = + new StreamingWindowPipelineDetector(translator); + pipeline.traverseTopologically(streamingWindowPipelineDetector); + if (!streamingWindowPipelineDetector.isWindowing()) { + throw new IllegalStateException("Spark streaming pipeline must be windowed!"); + } + + Duration batchInterval = streamingWindowPipelineDetector.getBatchDuration(); + LOG.info("Setting Spark streaming batchInterval to {} msec", batchInterval.milliseconds()); + EvaluationContext ctxt = createStreamingEvaluationContext(jsc, pipeline, batchInterval); + + pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, translator)); + ctxt.computeOutputs(); + + LOG.info("Streaming pipeline construction complete. Starting execution.."); + ((StreamingEvaluationContext) ctxt).getStreamingContext().start(); + + return ctxt; + } else { + EvaluationContext ctxt = new EvaluationContext(jsc, pipeline); + SparkPipelineTranslator translator = new TransformTranslator.Translator(); + pipeline.traverseTopologically(new SparkPipelineEvaluator(ctxt, translator)); + ctxt.computeOutputs(); + + LOG.info("Pipeline execution complete."); + + return ctxt; + } + } catch (Exception e) { + // Scala doesn't declare checked exceptions in the bytecode, and the Java compiler + // won't let you catch something that is not declared, so we can't catch + // SparkException here. Instead we do an instanceof check. + // Then we find the cause by seeing if it's a user exception (wrapped by our + // SparkProcessException), or just use the SparkException cause. + if (e instanceof SparkException && e.getCause() != null) { + if (e.getCause() instanceof SparkProcessContext.SparkProcessException + && e.getCause().getCause() != null) { + throw new RuntimeException(e.getCause().getCause()); + } else { + throw new RuntimeException(e.getCause()); + } + } + // otherwise just wrap in a RuntimeException + throw new RuntimeException(e); + } + } + + private EvaluationContext + createStreamingEvaluationContext(JavaSparkContext jsc, Pipeline pipeline, + Duration batchDuration) { + SparkStreamingPipelineOptions streamingOptions = (SparkStreamingPipelineOptions) mOptions; + JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration); + return new StreamingEvaluationContext(jsc, pipeline, jssc, streamingOptions.getTimeout()); + } + + /** + * Evaluator on the pipeline. 
+ */ + public abstract static class Evaluator extends Pipeline.PipelineVisitor.Defaults { + protected static final Logger LOG = LoggerFactory.getLogger(Evaluator.class); + + protected final SparkPipelineTranslator translator; + + protected Evaluator(SparkPipelineTranslator translator) { + this.translator = translator; + } + + @Override + public CompositeBehavior enterCompositeTransform(TransformTreeNode node) { + if (node.getTransform() != null) { + @SuppressWarnings("unchecked") + Class<PTransform<?, ?>> transformClass = + (Class<PTransform<?, ?>>) node.getTransform().getClass(); + if (translator.hasTranslation(transformClass)) { + LOG.info("Entering directly-translatable composite transform: '{}'", node.getFullName()); + LOG.debug("Composite transform class: '{}'", transformClass); + doVisitTransform(node); + return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; + } + } + return CompositeBehavior.ENTER_TRANSFORM; + } + + @Override + public void visitPrimitiveTransform(TransformTreeNode node) { + doVisitTransform(node); + } + + protected abstract <TransformT extends PTransform<? super PInput, POutput>> void + doVisitTransform(TransformTreeNode node); + } +} + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java index baa2241..2bed6a5 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java @@ -28,7 +28,7 @@ import com.google.common.collect.ImmutableList; /** * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the - * {@link SparkPipelineRunner}. + * {@link SparkRunner}. * * {@link AutoService} will register Spark's implementations of the {@link PipelineRunner} * and {@link PipelineOptions} as available pipeline runner services. @@ -37,14 +37,14 @@ public final class SparkRunnerRegistrar { private SparkRunnerRegistrar() {} /** - * Registers the {@link SparkPipelineRunner}. + * Registers the {@link SparkRunner}. */ @AutoService(PipelineRunnerRegistrar.class) public static class Runner implements PipelineRunnerRegistrar { @Override public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() { - return ImmutableList.<Class<? extends PipelineRunner<?>>>of( - SparkPipelineRunner.class, TestSparkPipelineRunner.class); + return ImmutableList + .<Class<? extends PipelineRunner<?>>>of(SparkRunner.class, TestSparkRunner.class); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineRunner.java deleted file mode 100644 index d11d1c1..0000000 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineRunner.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.spark; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsValidator; -import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; - -/** - * The SparkPipelineRunner translate operations defined on a pipeline to a representation executable - * by Spark, and then submitting the job to Spark to be executed. If we wanted to run a dataflow - * pipeline with the default options of a single threaded spark instance in local mode, we would do - * the following: - * - * {@code - * Pipeline p = [logic for pipeline creation] - * EvaluationResult result = SparkPipelineRunner.create().run(p); - * } - * - * To create a pipeline runner to run against a different spark cluster, with a custom master url we - * would do the following: - * - * {@code - * Pipeline p = [logic for pipeline creation] - * SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); - * options.setSparkMaster("spark://host:port"); - * EvaluationResult result = SparkPipelineRunner.create(options).run(p); - * } - * - * To create a Spark streaming pipeline runner use {@link SparkStreamingPipelineOptions} - */ -public final class TestSparkPipelineRunner extends PipelineRunner<EvaluationResult> { - - private SparkPipelineRunner delegate; - - private TestSparkPipelineRunner(SparkPipelineOptions options) { - this.delegate = SparkPipelineRunner.fromOptions(options); - } - - public static TestSparkPipelineRunner fromOptions(PipelineOptions options) { - // Default options suffice to set it up as a test runner - SparkPipelineOptions sparkOptions = - PipelineOptionsValidator.validate(SparkPipelineOptions.class, options); - return new TestSparkPipelineRunner(sparkOptions); - } - - @Override - public <OutputT extends POutput, InputT extends PInput> - OutputT apply(PTransform<InputT, OutputT> transform, InputT input) { - return delegate.apply(transform, input); - }; - - @Override - public EvaluationResult run(Pipeline pipeline) { - return delegate.run(pipeline); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java new file mode 100644 index 0000000..e2b953d --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsValidator; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; + +/** + * The SparkRunner translate operations defined on a pipeline to a representation executable + * by Spark, and then submitting the job to Spark to be executed. If we wanted to run a dataflow + * pipeline with the default options of a single threaded spark instance in local mode, we would do + * the following: + * + * {@code + * Pipeline p = [logic for pipeline creation] + * EvaluationResult result = SparkRunner.create().run(p); + * } + * + * To create a pipeline runner to run against a different spark cluster, with a custom master url we + * would do the following: + * + * {@code + * Pipeline p = [logic for pipeline creation] + * SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); + * options.setSparkMaster("spark://host:port"); + * EvaluationResult result = SparkRunner.create(options).run(p); + * } + * + * To create a Spark streaming pipeline runner use {@link SparkStreamingPipelineOptions} + */ +public final class TestSparkRunner extends PipelineRunner<EvaluationResult> { + + private SparkRunner delegate; + + private TestSparkRunner(SparkPipelineOptions options) { + this.delegate = SparkRunner.fromOptions(options); + } + + public static TestSparkRunner fromOptions(PipelineOptions options) { + // Default options suffice to set it up as a test runner + SparkPipelineOptions sparkOptions = + PipelineOptionsValidator.validate(SparkPipelineOptions.class, options); + return new TestSparkRunner(sparkOptions); + } + + @Override + public <OutputT extends POutput, InputT extends PInput> + OutputT apply(PTransform<InputT, OutputT> transform, InputT input) { + return delegate.apply(transform, input); + }; + + @Override + public EvaluationResult run(Pipeline pipeline) { + return delegate.run(pipeline); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java index 609c413..02e8b3d 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java +++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkPipelineEvaluator.java @@ -18,7 +18,7 @@ package org.apache.beam.runners.spark.translation; -import org.apache.beam.runners.spark.SparkPipelineRunner; +import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.sdk.runners.TransformTreeNode; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; @@ -26,9 +26,9 @@ import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; /** - * Pipeline {@link SparkPipelineRunner.Evaluator} for Spark. + * Pipeline {@link SparkRunner.Evaluator} for Spark. */ -public final class SparkPipelineEvaluator extends SparkPipelineRunner.Evaluator { +public final class SparkPipelineEvaluator extends SparkRunner.Evaluator { private final EvaluationContext ctxt; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java index f6f3029..394b2c5 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingWindowPipelineDetector.java @@ -18,7 +18,7 @@ package org.apache.beam.runners.spark.translation.streaming; -import org.apache.beam.runners.spark.SparkPipelineRunner; +import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.runners.spark.translation.SparkPipelineTranslator; import org.apache.beam.runners.spark.translation.TransformTranslator; import org.apache.beam.sdk.runners.TransformTreeNode; @@ -35,9 +35,9 @@ import org.apache.spark.streaming.Durations; /** - * Pipeline {@link SparkPipelineRunner.Evaluator} to detect windowing. + * Pipeline {@link SparkRunner.Evaluator} to detect windowing. 
*/ -public final class StreamingWindowPipelineDetector extends SparkPipelineRunner.Evaluator { +public final class StreamingWindowPipelineDetector extends SparkRunner.Evaluator { // Currently, Spark streaming recommends batches no smaller then 500 msec private static final Duration SPARK_MIN_WINDOW = Durations.milliseconds(500); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java index 285a2d6..dcf04a7 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java @@ -49,14 +49,14 @@ public class DeDupTest { @Test public void testRun() throws Exception { SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); - options.setRunner(SparkPipelineRunner.class); + options.setRunner(SparkRunner.class); Pipeline p = Pipeline.create(options); PCollection<String> input = p.apply(Create.of(LINES).withCoder(StringUtf8Coder.of())); PCollection<String> output = input.apply(RemoveDuplicates.<String>create()); PAssert.that(output).containsInAnyOrder(EXPECTED_SET); - EvaluationResult res = SparkPipelineRunner.create().run(p); + EvaluationResult res = SparkRunner.create().run(p); res.close(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java index f227e94..7befec2 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/EmptyInputTest.java @@ -43,13 +43,13 @@ public class EmptyInputTest { @Test public void test() throws Exception { SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); - options.setRunner(SparkPipelineRunner.class); + options.setRunner(SparkRunner.class); Pipeline p = Pipeline.create(options); List<String> empty = Collections.emptyList(); PCollection<String> inputWords = p.apply(Create.of(empty).withCoder(StringUtf8Coder.of())); PCollection<String> output = inputWords.apply(Combine.globally(new ConcatWords())); - EvaluationResult res = SparkPipelineRunner.create().run(p); + EvaluationResult res = SparkRunner.create().run(p); assertEquals("", Iterables.getOnlyElement(res.get(output))); res.close(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java index 2b4464d..da3fa7a 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java @@ -64,7 +64,7 @@ public class SimpleWordCountTest { @Test public void testInMem() 
throws Exception { SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); - options.setRunner(SparkPipelineRunner.class); + options.setRunner(SparkRunner.class); Pipeline p = Pipeline.create(options); PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder .of())); @@ -72,7 +72,7 @@ public class SimpleWordCountTest { PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET); - EvaluationResult res = SparkPipelineRunner.create().run(p); + EvaluationResult res = SparkRunner.create().run(p); res.close(); } @@ -82,7 +82,7 @@ public class SimpleWordCountTest { @Test public void testOutputFile() throws Exception { SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); - options.setRunner(SparkPipelineRunner.class); + options.setRunner(SparkRunner.class); Pipeline p = Pipeline.create(options); PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder .of())); @@ -92,7 +92,7 @@ public class SimpleWordCountTest { output.apply( TextIO.Write.named("WriteCounts").to(outputFile.getAbsolutePath()).withoutSharding()); - EvaluationResult res = SparkPipelineRunner.create().run(p); + EvaluationResult res = SparkRunner.create().run(p); res.close(); assertThat(Sets.newHashSet(FileUtils.readLines(outputFile)), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java index d2e57aa..236251b 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java @@ -47,7 +47,7 @@ public class SparkRunnerRegistrarTest { @Test public void testRunners() { - assertEquals(ImmutableList.of(SparkPipelineRunner.class, TestSparkPipelineRunner.class), + assertEquals(ImmutableList.of(SparkRunner.class, TestSparkRunner.class), new SparkRunnerRegistrar.Runner().getPipelineRunners()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java index 00c4657..df78338 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java @@ -42,7 +42,7 @@ public class TfIdfTest { @Test public void testTfIdf() throws Exception { SparkPipelineOptions opts = PipelineOptionsFactory.as(SparkPipelineOptions.class); - opts.setRunner(SparkPipelineRunner.class); + opts.setRunner(SparkRunner.class); Pipeline pipeline = Pipeline.create(opts); pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class)); @@ -60,7 +60,7 @@ public class TfIdfTest { PAssert.that(words).containsInAnyOrder(Arrays.asList("a", "m", "n", "b", "c", "d")); - EvaluationResult res = SparkPipelineRunner.create().run(pipeline); + EvaluationResult res = SparkRunner.create().run(pipeline); res.close(); } 
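Because SparkRunnerRegistrar now registers SparkRunner and TestSparkRunner, a runner can also be chosen by its simple class name from command-line flags, which is what the pom.xml change to "--runner=TestSparkRunner" relies on. A rough sketch, assuming flags such as --runner=SparkRunner --sparkMaster=local and the standard PipelineOptionsFactory.fromArgs builder:

    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;

    public class FlagConfiguredPipeline {
      public static void main(String[] args) {
        // The registrar lets "--runner=SparkRunner" resolve to the renamed class.
        SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(SparkPipelineOptions.class);

        Pipeline p = Pipeline.create(options);
        // Placeholder transform; build the real pipeline here.
        p.apply(Create.of("hello", "world"));

        // Runs with whichever runner the flags selected.
        p.run();
      }
    }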
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java index f358878..4cce03d 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java @@ -21,7 +21,7 @@ package org.apache.beam.runners.spark.io; import static org.junit.Assert.assertEquals; import org.apache.beam.runners.spark.EvaluationResult; -import org.apache.beam.runners.spark.SparkPipelineRunner; +import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.AvroIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -78,7 +78,7 @@ public class AvroPipelineTest { PCollection<GenericRecord> input = p.apply( AvroIO.Read.from(inputFile.getAbsolutePath()).withSchema(schema)); input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema)); - EvaluationResult res = SparkPipelineRunner.create().run(p); + EvaluationResult res = SparkRunner.create().run(p); res.close(); List<GenericRecord> records = readGenericFile(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java index 23d4592..b4268d6 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java @@ -24,7 +24,7 @@ import static org.junit.Assert.assertTrue; import org.apache.beam.examples.WordCount; import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SparkPipelineOptions; -import org.apache.beam.runners.spark.SparkPipelineRunner; +import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.TextIO; @@ -73,13 +73,13 @@ public class NumShardsTest { @Test public void testText() throws Exception { SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); - options.setRunner(SparkPipelineRunner.class); + options.setRunner(SparkRunner.class); Pipeline p = Pipeline.create(options); PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of())); PCollection<String> output = inputWords.apply(new WordCount.CountWords()) .apply(MapElements.via(new WordCount.FormatAsTextFn())); output.apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt")); - EvaluationResult res = SparkPipelineRunner.create().run(p); + EvaluationResult res = SparkRunner.create().run(p); res.close(); int count = 0; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java ---------------------------------------------------------------------- diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java index eaa508c..4d1658f 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java @@ -21,7 +21,7 @@ package org.apache.beam.runners.spark.io.hadoop; import static org.junit.Assert.assertEquals; import org.apache.beam.runners.spark.EvaluationResult; -import org.apache.beam.runners.spark.SparkPipelineRunner; +import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.runners.spark.coders.WritableCoder; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.KvCoder; @@ -89,7 +89,7 @@ public class HadoopFileFormatPipelineTest { HadoopIO.Write.Bound<IntWritable, Text> write = HadoopIO.Write.to(outputFile.getAbsolutePath(), outputFormatClass, IntWritable.class, Text.class); input.apply(write.withoutSharding()); - EvaluationResult res = SparkPipelineRunner.create().run(p); + EvaluationResult res = SparkRunner.create().run(p); res.close(); IntWritable key = new IntWritable(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java index 9a3edd3..798f55a 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombineGloballyTest.java @@ -22,7 +22,7 @@ import static org.junit.Assert.assertEquals; import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SparkPipelineOptions; -import org.apache.beam.runners.spark.SparkPipelineRunner; +import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -50,12 +50,12 @@ public class CombineGloballyTest { @Test public void test() throws Exception { SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); - options.setRunner(SparkPipelineRunner.class); + options.setRunner(SparkRunner.class); Pipeline p = Pipeline.create(options); PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of())); PCollection<String> output = inputWords.apply(Combine.globally(new WordMerger())); - EvaluationResult res = SparkPipelineRunner.create().run(p); + EvaluationResult res = SparkRunner.create().run(p); assertEquals("hi there,hi,hi sue bob,hi sue,,bob hi", Iterables.getOnlyElement(res.get(output))); res.close(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java index 
face526..65c6870 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/CombinePerKeyTest.java @@ -19,7 +19,7 @@ package org.apache.beam.runners.spark.translation; import org.apache.beam.runners.spark.EvaluationResult; -import org.apache.beam.runners.spark.SparkPipelineRunner; +import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -54,7 +54,7 @@ public class CombinePerKeyTest { Pipeline p = Pipeline.create(PipelineOptionsFactory.create()); PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of())); PCollection<KV<String, Long>> cnts = inputWords.apply(new SumPerKey<String>()); - EvaluationResult res = SparkPipelineRunner.create().run(p); + EvaluationResult res = SparkRunner.create().run(p); Map<String, Long> actualCnts = new HashMap<>(); for (KV<String, Long> kv : res.get(cnts)) { actualCnts.put(kv.getKey(), kv.getValue()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java index 0334bfe..0f60271 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java @@ -20,7 +20,7 @@ package org.apache.beam.runners.spark.translation; import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SparkPipelineOptions; -import org.apache.beam.runners.spark.SparkPipelineRunner; +import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; @@ -40,7 +40,7 @@ public class DoFnOutputTest implements Serializable { @Test public void test() throws Exception { SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); - options.setRunner(SparkPipelineRunner.class); + options.setRunner(SparkRunner.class); Pipeline pipeline = Pipeline.create(options); PCollection<String> strings = pipeline.apply(Create.of("a")); @@ -63,7 +63,7 @@ public class DoFnOutputTest implements Serializable { PAssert.that(output).containsInAnyOrder("start", "a", "finish"); - EvaluationResult res = SparkPipelineRunner.create().run(pipeline); + EvaluationResult res = SparkRunner.create().run(pipeline); res.close(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java index 3402bb4..787691d 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java +++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java @@ -19,7 +19,7 @@ package org.apache.beam.runners.spark.translation; import org.apache.beam.runners.spark.EvaluationResult; -import org.apache.beam.runners.spark.SparkPipelineRunner; +import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -81,7 +81,7 @@ public class MultiOutputWordCountTest { PCollection<Long> unique = luc.get(lowerCnts).apply( ApproximateUnique.<KV<String, Long>>globally(16)); - EvaluationResult res = SparkPipelineRunner.create().run(p); + EvaluationResult res = SparkRunner.create().run(p); PAssert.that(luc.get(lowerCnts).apply(ParDo.of(new FormatCountsFn()))) .containsInAnyOrder(EXPECTED_LOWER_COUNTS); Iterable<KV<String, Long>> actualUpper = res.get(luc.get(upperCnts)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java index de3c152..5b9eeff 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java @@ -20,7 +20,7 @@ package org.apache.beam.runners.spark.translation; import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SparkPipelineOptions; -import org.apache.beam.runners.spark.SparkPipelineRunner; +import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; @@ -132,7 +132,7 @@ public class SerializationTest { @Test public void testRun() throws Exception { SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); - options.setRunner(SparkPipelineRunner.class); + options.setRunner(SparkRunner.class); Pipeline p = Pipeline.create(options); PCollection<StringHolder> inputWords = p.apply(Create.of(WORDS).withCoder(StringHolderUtf8Coder.of())); @@ -140,7 +140,7 @@ public class SerializationTest { PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET); - EvaluationResult res = SparkPipelineRunner.create().run(p); + EvaluationResult res = SparkRunner.create().run(p); res.close(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java index 5674900..60b7f71 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java @@ -23,7 +23,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import org.apache.beam.runners.spark.SparkPipelineOptions; -import org.apache.beam.runners.spark.SparkPipelineRunner; 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
index 5674900..60b7f71 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
@@ -23,7 +23,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringDelegateCoder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -49,7 +49,7 @@ public class SideEffectsTest implements Serializable {
   @Test
   public void test() throws Exception {
     SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
-    options.setRunner(SparkPipelineRunner.class);
+    options.setRunner(SparkRunner.class);
     Pipeline pipeline = Pipeline.create(options);
 
     pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
index b593316..aca36dc 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
@@ -22,7 +22,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 
 import org.apache.beam.runners.direct.DirectRunner;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -53,13 +53,13 @@ public class TransformTranslatorTest {
 
   /**
    * Builds a simple pipeline with TextIO.Read and TextIO.Write, runs the pipeline
-   * in DirectRunner and on SparkPipelineRunner, with the mapped dataflow-to-spark
+   * in DirectRunner and on SparkRunner, with the mapped dataflow-to-spark
   * transforms. Finally it makes sure that the results are the same for both runs.
   */
  @Test
  public void testTextIOReadAndWriteTransforms() throws IOException {
    String directOut = runPipeline(DirectRunner.class);
-    String sparkOut = runPipeline(SparkPipelineRunner.class);
+    String sparkOut = runPipeline(SparkRunner.class);
 
     List<String> directOutput =
         Files.readAllLines(Paths.get(directOut + "-00000-of-00001"), Charsets.UTF_8);
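The Javadoc in this hunk describes a cross-runner consistency check: the same TextIO pipeline runs under DirectRunner and under SparkRunner, and the outputs are compared. A self-contained sketch of that pattern under the new name; the test's own runPipeline helper is not shown in this excerpt, so the helper below, its /tmp paths, and the single-shard assumption are illustrative:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

import com.google.common.base.Charsets;

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.PipelineRunner;

public class CrossRunnerCheck {

  // Runs the same TextIO read/write pipeline under the given runner and
  // returns the output path prefix. Paths are placeholders.
  static String runPipeline(Class<? extends PipelineRunner<?>> runner) {
    PipelineOptions options = PipelineOptionsFactory.create();
    options.setRunner(runner);
    Pipeline p = Pipeline.create(options);
    String out = "/tmp/out-" + runner.getSimpleName();
    p.apply(TextIO.Read.from("/tmp/words.txt"))
     .apply(TextIO.Write.to(out));
    p.run();
    return out;
  }

  public static void main(String[] args) throws IOException {
    String directOut = runPipeline(DirectRunner.class);
    String sparkOut = runPipeline(SparkRunner.class);
    // One output shard is assumed, matching the test's "-00000-of-00001" suffix.
    List<String> direct = Files.readAllLines(Paths.get(directOut + "-00000-of-00001"), Charsets.UTF_8);
    List<String> spark = Files.readAllLines(Paths.get(sparkOut + "-00000-of-00001"), Charsets.UTF_8);
    System.out.println(direct.equals(spark) ? "outputs match" : "outputs differ");
  }
}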
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
index 54af5e3..043d506 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.spark.translation;
 
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SimpleWordCountTest;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -57,7 +57,7 @@ public class WindowedWordCountTest {
   @Test
   public void testFixed() throws Exception {
     PipelineOptions opts = PipelineOptionsFactory.create();
-    opts.setRunner(SparkPipelineRunner.class);
+    opts.setRunner(SparkRunner.class);
     Pipeline p = Pipeline.create(opts);
     PCollection<String> inputWords =
         p.apply(Create.timestamped(WORDS, TIMESTAMPS)).setCoder(StringUtf8Coder.of());
@@ -78,7 +78,7 @@ public class WindowedWordCountTest {
   @Test
   public void testFixed2() throws Exception {
     PipelineOptions opts = PipelineOptionsFactory.create();
-    opts.setRunner(SparkPipelineRunner.class);
+    opts.setRunner(SparkRunner.class);
     Pipeline p = Pipeline.create(opts);
     PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS)
         .withCoder(StringUtf8Coder.of()));
@@ -100,7 +100,7 @@ public class WindowedWordCountTest {
   @Test
   public void testSliding() throws Exception {
     PipelineOptions opts = PipelineOptionsFactory.create();
-    opts.setRunner(SparkPipelineRunner.class);
+    opts.setRunner(SparkRunner.class);
     Pipeline p = Pipeline.create(opts);
     PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS)
         .withCoder(StringUtf8Coder.of()));
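All three hunks above follow the same recipe: set SparkRunner on the options, build a pipeline from timestamped input, window it, and run. A sketch of the fixed-window variant; the Window/FixedWindows/Count transforms and the input data are assumptions about the SDK surface of this era, not shown in this commit:

import java.util.Arrays;
import java.util.List;

import org.apache.beam.runners.spark.EvaluationResult;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class WindowedCountSketch {
  public static void main(String[] args) {
    PipelineOptions opts = PipelineOptionsFactory.create();
    opts.setRunner(SparkRunner.class);
    Pipeline p = Pipeline.create(opts);

    // Illustrative timestamped input; the test's WORDS/TIMESTAMPS are not shown here.
    List<String> words = Arrays.asList("hi", "there", "hi");
    List<Long> stamps = Arrays.asList(0L, 0L, 60000L);

    PCollection<String> input =
        p.apply(Create.timestamped(words, stamps).withCoder(StringUtf8Coder.of()));
    // Count each word within one-minute fixed windows.
    PCollection<KV<String, Long>> counts = input
        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(Count.<String>perElement());

    EvaluationResult res = SparkRunner.create().run(p);
    res.close();
  }
}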
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
index 976c7c2..160f21d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
@@ -18,7 +18,7 @@ package org.apache.beam.runners.spark.translation.streaming;
 
 import org.apache.beam.runners.spark.EvaluationResult;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
@@ -60,7 +60,7 @@ public class FlattenStreamingTest {
     SparkStreamingPipelineOptions options =
         PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
     options.setAppName(this.getClass().getSimpleName());
-    options.setRunner(SparkPipelineRunner.class);
+    options.setRunner(SparkRunner.class);
     options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
     Pipeline p = Pipeline.create(options);
@@ -77,7 +77,7 @@ public class FlattenStreamingTest {
 
     PAssertStreaming.assertContents(union, EXPECTED_UNION);
 
-    EvaluationResult res = SparkPipelineRunner.create(options).run(p);
+    EvaluationResult res = SparkRunner.create(options).run(p);
     res.close();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index 53293fb..5578e35 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -18,7 +18,7 @@ package org.apache.beam.runners.spark.translation.streaming;
 
 import org.apache.beam.runners.spark.EvaluationResult;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
 import org.apache.beam.runners.spark.io.KafkaIO;
 import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
@@ -92,7 +92,7 @@ public class KafkaStreamingTest {
     SparkStreamingPipelineOptions options =
         PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
     options.setAppName(this.getClass().getSimpleName());
-    options.setRunner(SparkPipelineRunner.class);
+    options.setRunner(SparkRunner.class);
     options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
     Pipeline p = Pipeline.create(options);
@@ -112,7 +112,7 @@ public class KafkaStreamingTest {
 
     PAssertStreaming.assertContents(formattedKV, EXPECTED);
 
-    EvaluationResult res = SparkPipelineRunner.create(options).run(p);
+    EvaluationResult res = SparkRunner.create(options).run(p);
     res.close();
   }
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96ffc429/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index 6dc9a08..75a702b 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.spark.translation.streaming;
 
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SimpleWordCountTest;
-import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
@@ -56,7 +56,7 @@ public class SimpleStreamingWordCountTest implements Serializable {
     SparkStreamingPipelineOptions options =
         PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
     options.setAppName(this.getClass().getSimpleName());
-    options.setRunner(SparkPipelineRunner.class);
+    options.setRunner(SparkRunner.class);
     options.setTimeout(TEST_TIMEOUT_MSEC); // run for one interval
     Pipeline p = Pipeline.create(options);
@@ -68,7 +68,7 @@ public class SimpleStreamingWordCountTest implements Serializable {
     PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords());
     PAssertStreaming.assertContents(output, EXPECTED_COUNTS);
 
-    EvaluationResult res = SparkPipelineRunner.create(options).run(p);
+    EvaluationResult res = SparkRunner.create(options).run(p);
     res.close();
   }
 }
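The streaming tests share one pattern under the rename: configure SparkStreamingPipelineOptions with a timeout so the job runs for one interval, then hand the options to SparkRunner.create(options). A sketch built only from calls that appear in these hunks; the source construction is elided, and the app name and timeout value are illustrative:

import org.apache.beam.runners.spark.EvaluationResult;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class StreamingRunnerSketch {
  public static void main(String[] args) {
    SparkStreamingPipelineOptions options =
        PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
    options.setAppName("StreamingRunnerSketch");  // illustrative app name
    options.setRunner(SparkRunner.class);
    options.setTimeout(5000L);  // illustrative: run for one interval, then stop

    Pipeline p = Pipeline.create(options);
    // ... attach a streaming source (e.g. CreateStream or KafkaIO, as in the
    // tests above) and the desired transforms here ...

    // Streaming runs go through the options-carrying factory method.
    EvaluationResult res = SparkRunner.create(options).run(p);
    res.close();
  }
}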
