Repository: incubator-beam
Updated Branches:
  refs/heads/master cd3f61cf8 -> e43a38355


[BEAM-944] Spark runner causes an exception when creating pipeline options.
Create a SparkContextOptions for context-aware options.

Move UsesProvidedSparkContext property to SparkPipelineOptions so it's 
available from command-line
as well.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/121bff46
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/121bff46
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/121bff46

Branch: refs/heads/master
Commit: 121bff46d950e319eebf10e3a42bdd890edfb0c5
Parents: cd3f61c
Author: Sela <ans...@paypal.com>
Authored: Tue Nov 8 23:05:13 2016 +0200
Committer: Sela <ans...@paypal.com>
Committed: Thu Nov 10 23:27:17 2016 +0200

----------------------------------------------------------------------
 .../beam/runners/spark/SparkContextOptions.java | 64 ++++++++++++++++++++
 .../runners/spark/SparkPipelineOptions.java     | 36 +++--------
 .../spark/translation/SparkContextFactory.java  | 19 +++---
 .../SparkRunnerStreamingContextFactory.java     |  3 +-
 .../runners/spark/ProvidedSparkContextTest.java |  6 +-
 .../streaming/KafkaStreamingTest.java           |  4 +-
 6 files changed, 91 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/121bff46/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java
new file mode 100644
index 0000000..98f7492
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.api.java.JavaStreamingListener;
+
+
+
+/**
+ * A custom {@link PipelineOptions} to work with properties related to {@link 
JavaSparkContext}.
+ *
+ * <p>This can only be used programmatically (as opposed to passing command 
line arguments),
+ * since the properties here are context-aware and should not be propagated to 
workers.
+ *
+ * <p>Separating this from {@link SparkPipelineOptions} is needed so the 
context-aware properties,
+ * which link to Spark dependencies, won't be scanned by {@link 
PipelineOptions}
+ * reflective instantiation.
+ * Note that {@link SparkContextOptions} is not registered with {@link 
SparkRunnerRegistrar}.
+ */
+public interface SparkContextOptions extends SparkPipelineOptions {
+
+  @Description("Provided Java Spark Context")
+  @JsonIgnore
+  JavaSparkContext getProvidedSparkContext();
+  void setProvidedSparkContext(JavaSparkContext jsc);
+
+  @Description("Spark streaming listeners")
+  @Default.InstanceFactory(EmptyListenersList.class)
+  @JsonIgnore
+  List<JavaStreamingListener> getListeners();
+  void setListeners(List<JavaStreamingListener> listeners);
+
+  /** Returns an empty list, to avoid handling null. */
+  class EmptyListenersList implements 
DefaultValueFactory<List<JavaStreamingListener>> {
+    @Override
+    public List<JavaStreamingListener> create(PipelineOptions options) {
+      return new ArrayList<>();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/121bff46/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 4eada35..5168c6c 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -18,24 +18,22 @@
 
 package org.apache.beam.runners.spark;
 
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import java.util.ArrayList;
-import java.util.List;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.streaming.api.java.JavaStreamingListener;
+
 
 
 /**
- * Spark runner pipeline options.
+ * Spark runner {@link PipelineOptions} handles Spark execution-related 
configurations,
+ * such as the master address, batch-interval, and other user-related knobs.
  */
-public interface SparkPipelineOptions extends PipelineOptions, 
StreamingOptions,
-                                              ApplicationNameOptions {
+public interface SparkPipelineOptions
+    extends PipelineOptions, StreamingOptions, ApplicationNameOptions {
+
   @Description("The url of the spark master to connect to, (e.g. 
spark://host:port, local[4]).")
   @Default.String("local[4]")
   String getSparkMaster();
@@ -93,27 +91,9 @@ public interface SparkPipelineOptions extends 
PipelineOptions, StreamingOptions,
   Boolean getEnableSparkSinks();
   void setEnableSparkSinks(Boolean enableSparkSinks);
 
-  @Description("If the spark runner will be initialized with a provided Spark 
Context")
+  @Description("If the spark runner will be initialized with a provided Spark 
Context. "
+      + "The Spark Context should be provided with SparkContextOptions.")
   @Default.Boolean(false)
   boolean getUsesProvidedSparkContext();
   void setUsesProvidedSparkContext(boolean value);
-
-  @Description("Provided Java Spark Context")
-  @JsonIgnore
-  JavaSparkContext getProvidedSparkContext();
-  void setProvidedSparkContext(JavaSparkContext jsc);
-
-  @Description("Spark streaming listeners")
-  @Default.InstanceFactory(EmptyListenersList.class)
-  @JsonIgnore
-  List<JavaStreamingListener> getListeners();
-  void setListeners(List<JavaStreamingListener> listeners);
-
-  /** Returns an empty list, top avoid handling null. */
-  class EmptyListenersList implements 
DefaultValueFactory<List<JavaStreamingListener>> {
-    @Override
-    public List<JavaStreamingListener> create(PipelineOptions options) {
-      return new ArrayList<>();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/121bff46/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
index ee2104a..c7f90b4 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.runners.spark.translation;
 
+import org.apache.beam.runners.spark.SparkContextOptions;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.coders.BeamSparkRunnerRegistrator;
 import org.apache.spark.SparkConf;
@@ -46,11 +47,13 @@ public final class SparkContextFactory {
   }
 
   public static synchronized JavaSparkContext 
getSparkContext(SparkPipelineOptions options) {
+    SparkContextOptions contextOptions = options.as(SparkContextOptions.class);
     // reuse should be ignored if the context is provided.
-    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT) && 
!options.getUsesProvidedSparkContext()) {
+    if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)
+        && !contextOptions.getUsesProvidedSparkContext()) {
       // if the context is null or stopped for some reason, re-create it.
       if (sparkContext == null || sparkContext.sc().isStopped()) {
-        sparkContext = createSparkContext(options);
+        sparkContext = createSparkContext(contextOptions);
         sparkMaster = options.getSparkMaster();
       } else if (!options.getSparkMaster().equals(sparkMaster)) {
         throw new IllegalArgumentException(String.format("Cannot reuse spark 
context "
@@ -59,7 +62,7 @@ public final class SparkContextFactory {
       }
       return sparkContext;
     } else {
-      return createSparkContext(options);
+      return createSparkContext(contextOptions);
     }
   }
 
@@ -69,10 +72,10 @@ public final class SparkContextFactory {
     }
   }
 
-  private static JavaSparkContext createSparkContext(SparkPipelineOptions 
options) {
-    if (options.getUsesProvidedSparkContext()) {
+  private static JavaSparkContext createSparkContext(SparkContextOptions 
contextOptions) {
+    if (contextOptions.getUsesProvidedSparkContext()) {
       LOG.info("Using a provided Spark Context");
-      JavaSparkContext jsc = options.getProvidedSparkContext();
+      JavaSparkContext jsc = contextOptions.getProvidedSparkContext();
       if (jsc == null || jsc.sc().isStopped()){
         LOG.error("The provided Spark context " + jsc + " was not created or 
was stopped");
         throw new RuntimeException("The provided Spark context was not created 
or was stopped");
@@ -83,9 +86,9 @@ public final class SparkContextFactory {
       SparkConf conf = new SparkConf();
       if (!conf.contains("spark.master")) {
         // set master if not set.
-        conf.setMaster(options.getSparkMaster());
+        conf.setMaster(contextOptions.getSparkMaster());
       }
-      conf.setAppName(options.getAppName());
+      conf.setAppName(contextOptions.getAppName());
       // register immutable collections serializers because the SDK uses them.
       conf.set("spark.kryo.registrator", 
BeamSparkRunnerRegistrator.class.getName());
       conf.set("spark.serializer", KryoSerializer.class.getName());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/121bff46/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index a670f61..f8ee8ad 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.spark.translation.streaming;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import org.apache.beam.runners.spark.SparkContextOptions;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
@@ -86,7 +87,7 @@ public class SparkRunnerStreamingContextFactory implements 
JavaStreamingContextF
     jssc.checkpoint(checkpointDir);
 
     // register listeners.
-    for (JavaStreamingListener listener: options.getListeners()) {
+    for (JavaStreamingListener listener: 
options.as(SparkContextOptions.class).getListeners()) {
       LOG.info("Registered listener {}." + 
listener.getClass().getSimpleName());
       jssc.addStreamingListener(new JavaStreamingListenerWrapper(listener));
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/121bff46/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
index cbc5976..c225073 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
@@ -56,7 +56,7 @@ public class ProvidedSparkContextTest {
     public void testWithProvidedContext() throws Exception {
         JavaSparkContext jsc = new JavaSparkContext("local[*]", 
"Existing_Context");
 
-        SparkPipelineOptions options = 
PipelineOptionsFactory.as(SparkPipelineOptions.class);
+        SparkContextOptions options = 
PipelineOptionsFactory.as(SparkContextOptions.class);
         options.setRunner(SparkRunner.class);
         options.setUsesProvidedSparkContext(true);
         options.setProvidedSparkContext(jsc);
@@ -83,7 +83,7 @@ public class ProvidedSparkContextTest {
     public void testWithNullContext() throws Exception {
         JavaSparkContext jsc = null;
 
-        SparkPipelineOptions options = 
PipelineOptionsFactory.as(SparkPipelineOptions.class);
+        SparkContextOptions options = 
PipelineOptionsFactory.as(SparkContextOptions.class);
         options.setRunner(SparkRunner.class);
         options.setUsesProvidedSparkContext(true);
         options.setProvidedSparkContext(jsc);
@@ -114,7 +114,7 @@ public class ProvidedSparkContextTest {
         // Stop the provided Spark context directly
         jsc.stop();
 
-        SparkPipelineOptions options = 
PipelineOptionsFactory.as(SparkPipelineOptions.class);
+        SparkContextOptions options = 
PipelineOptionsFactory.as(SparkContextOptions.class);
         options.setRunner(SparkRunner.class);
         options.setUsesProvidedSparkContext(true);
         options.setProvidedSparkContext(jsc);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/121bff46/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index f01059f..29e4609 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
+import org.apache.beam.runners.spark.SparkContextOptions;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import 
org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
 import 
org.apache.beam.runners.spark.translation.streaming.utils.KafkaWriteOnBatchCompleted;
@@ -121,7 +122,8 @@ public class KafkaStreamingTest {
 
   @Test
   public void testLatest() throws Exception {
-    SparkPipelineOptions options = 
commonOptions.withTmpCheckpointDir(checkpointParentDir);
+    SparkContextOptions options =
+        
commonOptions.withTmpCheckpointDir(checkpointParentDir).as(SparkContextOptions.class);
     //--- setup
     final String topic = "topic";
     // messages.

Reply via email to