Repository: incubator-beam Updated Branches: refs/heads/master cd3f61cf8 -> e43a38355
[BEAM-944] Spark runner causes an exception when creating pipeline options. Create a SparkContextOptions for context-aware options. Move UsesProvidedSparkContext property to SparkPipelineOptions so it's available from command-line as well. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/121bff46 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/121bff46 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/121bff46 Branch: refs/heads/master Commit: 121bff46d950e319eebf10e3a42bdd890edfb0c5 Parents: cd3f61c Author: Sela <ans...@paypal.com> Authored: Tue Nov 8 23:05:13 2016 +0200 Committer: Sela <ans...@paypal.com> Committed: Thu Nov 10 23:27:17 2016 +0200 ---------------------------------------------------------------------- .../beam/runners/spark/SparkContextOptions.java | 64 ++++++++++++++++++++ .../runners/spark/SparkPipelineOptions.java | 36 +++-------- .../spark/translation/SparkContextFactory.java | 19 +++--- .../SparkRunnerStreamingContextFactory.java | 3 +- .../runners/spark/ProvidedSparkContextTest.java | 6 +- .../streaming/KafkaStreamingTest.java | 4 +- 6 files changed, 91 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/121bff46/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java new file mode 100644 index 0000000..98f7492 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.spark; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.DefaultValueFactory; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.streaming.api.java.JavaStreamingListener; + + + +/** + * A custom {@link PipelineOptions} to work with properties related to {@link JavaSparkContext}. + * + * <p>This can only be used programmatically (as opposed to passing command line arguments), + * since the properties here are context-aware and should not be propagated to workers. + * + * <p>Separating this from {@link SparkPipelineOptions} is needed so the context-aware properties, + * which link to Spark dependencies, won't be scanned by {@link PipelineOptions} + * reflective instantiation. + * Note that {@link SparkContextOptions} is not registered with {@link SparkRunnerRegistrar}. 
+ */ +public interface SparkContextOptions extends SparkPipelineOptions { + + @Description("Provided Java Spark Context") + @JsonIgnore + JavaSparkContext getProvidedSparkContext(); + void setProvidedSparkContext(JavaSparkContext jsc); + + @Description("Spark streaming listeners") + @Default.InstanceFactory(EmptyListenersList.class) + @JsonIgnore + List<JavaStreamingListener> getListeners(); + void setListeners(List<JavaStreamingListener> listeners); + + /** Returns an empty list, to avoid handling null. */ + class EmptyListenersList implements DefaultValueFactory<List<JavaStreamingListener>> { + @Override + public List<JavaStreamingListener> create(PipelineOptions options) { + return new ArrayList<>(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/121bff46/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java index 4eada35..5168c6c 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java @@ -18,24 +18,22 @@ package org.apache.beam.runners.spark; -import com.fasterxml.jackson.annotation.JsonIgnore; -import java.util.ArrayList; -import java.util.List; import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.StreamingOptions; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.streaming.api.java.JavaStreamingListener; + /** - * Spark runner pipeline options. 
+ * Spark runner {@link PipelineOptions} handles Spark execution-related configurations, + * such as the master address, batch-interval, and other user-related knobs. */ -public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions, - ApplicationNameOptions { +public interface SparkPipelineOptions + extends PipelineOptions, StreamingOptions, ApplicationNameOptions { + @Description("The url of the spark master to connect to, (e.g. spark://host:port, local[4]).") @Default.String("local[4]") String getSparkMaster(); @@ -93,27 +91,9 @@ public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions, Boolean getEnableSparkSinks(); void setEnableSparkSinks(Boolean enableSparkSinks); - @Description("If the spark runner will be initialized with a provided Spark Context") + @Description("If the spark runner will be initialized with a provided Spark Context. " + + "The Spark Context should be provided with SparkContextOptions.") @Default.Boolean(false) boolean getUsesProvidedSparkContext(); void setUsesProvidedSparkContext(boolean value); - - @Description("Provided Java Spark Context") - @JsonIgnore - JavaSparkContext getProvidedSparkContext(); - void setProvidedSparkContext(JavaSparkContext jsc); - - @Description("Spark streaming listeners") - @Default.InstanceFactory(EmptyListenersList.class) - @JsonIgnore - List<JavaStreamingListener> getListeners(); - void setListeners(List<JavaStreamingListener> listeners); - - /** Returns an empty list, top avoid handling null. 
*/ - class EmptyListenersList implements DefaultValueFactory<List<JavaStreamingListener>> { - @Override - public List<JavaStreamingListener> create(PipelineOptions options) { - return new ArrayList<>(); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/121bff46/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java index ee2104a..c7f90b4 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.spark.translation; +import org.apache.beam.runners.spark.SparkContextOptions; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.coders.BeamSparkRunnerRegistrator; import org.apache.spark.SparkConf; @@ -46,11 +47,13 @@ public final class SparkContextFactory { } public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions options) { + SparkContextOptions contextOptions = options.as(SparkContextOptions.class); // reuse should be ignored if the context is provided. - if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && !options.getUsesProvidedSparkContext()) { + if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) + && !contextOptions.getUsesProvidedSparkContext()) { // if the context is null or stopped for some reason, re-create it. 
if (sparkContext == null || sparkContext.sc().isStopped()) { - sparkContext = createSparkContext(options); + sparkContext = createSparkContext(contextOptions); sparkMaster = options.getSparkMaster(); } else if (!options.getSparkMaster().equals(sparkMaster)) { throw new IllegalArgumentException(String.format("Cannot reuse spark context " @@ -59,7 +62,7 @@ public final class SparkContextFactory { } return sparkContext; } else { - return createSparkContext(options); + return createSparkContext(contextOptions); } } @@ -69,10 +72,10 @@ public final class SparkContextFactory { } } - private static JavaSparkContext createSparkContext(SparkPipelineOptions options) { - if (options.getUsesProvidedSparkContext()) { + private static JavaSparkContext createSparkContext(SparkContextOptions contextOptions) { + if (contextOptions.getUsesProvidedSparkContext()) { LOG.info("Using a provided Spark Context"); - JavaSparkContext jsc = options.getProvidedSparkContext(); + JavaSparkContext jsc = contextOptions.getProvidedSparkContext(); if (jsc == null || jsc.sc().isStopped()){ LOG.error("The provided Spark context " + jsc + " was not created or was stopped"); throw new RuntimeException("The provided Spark context was not created or was stopped"); @@ -83,9 +86,9 @@ public final class SparkContextFactory { SparkConf conf = new SparkConf(); if (!conf.contains("spark.master")) { // set master if not set. - conf.setMaster(options.getSparkMaster()); + conf.setMaster(contextOptions.getSparkMaster()); } - conf.setAppName(options.getAppName()); + conf.setAppName(contextOptions.getAppName()); // register immutable collections serializers because the SDK uses them. 
conf.set("spark.kryo.registrator", BeamSparkRunnerRegistrator.class.getName()); conf.set("spark.serializer", KryoSerializer.class.getName()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/121bff46/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java index a670f61..f8ee8ad 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java @@ -20,6 +20,7 @@ package org.apache.beam.runners.spark.translation.streaming; import static com.google.common.base.Preconditions.checkArgument; +import org.apache.beam.runners.spark.SparkContextOptions; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.runners.spark.translation.SparkContextFactory; @@ -86,7 +87,7 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF jssc.checkpoint(checkpointDir); // register listeners. - for (JavaStreamingListener listener: options.getListeners()) { + for (JavaStreamingListener listener: options.as(SparkContextOptions.class).getListeners()) { LOG.info("Registered listener {}." 
+ listener.getClass().getSimpleName()); jssc.addStreamingListener(new JavaStreamingListenerWrapper(listener)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/121bff46/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java index cbc5976..c225073 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java @@ -56,7 +56,7 @@ public class ProvidedSparkContextTest { public void testWithProvidedContext() throws Exception { JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context"); - SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); + SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class); options.setRunner(SparkRunner.class); options.setUsesProvidedSparkContext(true); options.setProvidedSparkContext(jsc); @@ -83,7 +83,7 @@ public class ProvidedSparkContextTest { public void testWithNullContext() throws Exception { JavaSparkContext jsc = null; - SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); + SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class); options.setRunner(SparkRunner.class); options.setUsesProvidedSparkContext(true); options.setProvidedSparkContext(jsc); @@ -114,7 +114,7 @@ public class ProvidedSparkContextTest { // Stop the provided Spark context directly jsc.stop(); - SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class); + SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class); options.setRunner(SparkRunner.class); 
options.setUsesProvidedSparkContext(true); options.setProvidedSparkContext(jsc); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/121bff46/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java index f01059f..29e4609 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Map; import java.util.Properties; +import org.apache.beam.runners.spark.SparkContextOptions; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster; import org.apache.beam.runners.spark.translation.streaming.utils.KafkaWriteOnBatchCompleted; @@ -121,7 +122,8 @@ public class KafkaStreamingTest { @Test public void testLatest() throws Exception { - SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir); + SparkContextOptions options = + commonOptions.withTmpCheckpointDir(checkpointParentDir).as(SparkContextOptions.class); //--- setup final String topic = "topic"; // messages.