This is an automated email from the ASF dual-hosted git repository.
goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4b67847 [BEAM-7110] Add Spark master option to SparkJobServerDriver
new 81faf35 Merge pull request #8379 from ibzib/spark-master2
4b67847 is described below
commit 4b67847193da92cc8e59d37b0539bfb2bc6ab37f
Author: Kyle Weaver <[email protected]>
AuthorDate: Thu Apr 18 12:54:03 2019 -0700
[BEAM-7110] Add Spark master option to SparkJobServerDriver
---
runners/spark/job-server/build.gradle | 2 ++
.../apache/beam/runners/spark/SparkJobInvoker.java | 14 ++++++++++---
.../beam/runners/spark/SparkJobServerDriver.java | 23 +++++++++++++++++-----
.../beam/runners/spark/SparkPipelineOptions.java | 4 +++-
4 files changed, 34 insertions(+), 9 deletions(-)
diff --git a/runners/spark/job-server/build.gradle
b/runners/spark/job-server/build.gradle
index 2ce34fe..7ebde87 100644
--- a/runners/spark/job-server/build.gradle
+++ b/runners/spark/job-server/build.gradle
@@ -70,6 +70,8 @@ runShadow {
args += ["--artifacts-dir=${project.property('artifactsDir')}"]
if (project.hasProperty('cleanArtifactsPerJob'))
args +=
["--clean-artifacts-per-job=${project.property('cleanArtifactsPerJob')}"]
+ if (project.hasProperty('sparkMasterUrl'))
+ args += ["--spark-master-url=${project.property('sparkMasterUrl')}"]
// Enable remote debugging.
jvmArgs = ["-Xdebug",
"-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"]
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobInvoker.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobInvoker.java
index e47c851..da35ae2 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobInvoker.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobInvoker.java
@@ -34,12 +34,16 @@ public class SparkJobInvoker extends JobInvoker {
private static final Logger LOG = LoggerFactory.getLogger(SparkJobInvoker.class);
- public static SparkJobInvoker create() {
- return new SparkJobInvoker();
+ private SparkJobServerDriver.SparkServerConfiguration configuration;
+
+ public static SparkJobInvoker create(
+ SparkJobServerDriver.SparkServerConfiguration configuration) {
+ return new SparkJobInvoker(configuration);
}
- private SparkJobInvoker() {
+ private SparkJobInvoker(SparkJobServerDriver.SparkServerConfiguration configuration) {
super("spark-runner-job-invoker");
+ this.configuration = configuration;
}
@Override
@@ -56,6 +60,10 @@ public class SparkJobInvoker extends JobInvoker {
String.format("%s_%s", sparkOptions.getJobName(), UUID.randomUUID().toString());
LOG.info("Invoking job {}", invocationId);
+ if (sparkOptions.getSparkMaster().equals(SparkPipelineOptions.DEFAULT_MASTER_URL)) {
+ sparkOptions.setSparkMaster(configuration.getSparkMasterUrl());
+ }
+
// Options can't be translated to proto if runner class is unresolvable, so set it to null.
sparkOptions.setRunner(null);
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobServerDriver.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobServerDriver.java
index 387907f..0589045 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobServerDriver.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobServerDriver.java
@@ -24,6 +24,7 @@ import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,11 +33,23 @@ public class SparkJobServerDriver extends JobServerDriver {
@Override
protected JobInvoker createJobInvoker() {
- return SparkJobInvoker.create();
+ return SparkJobInvoker.create((SparkServerConfiguration) configuration);
}
private static final Logger LOG = LoggerFactory.getLogger(SparkJobServerDriver.class);
+ /** Spark runner-specific Configuration for the jobServer. */
+ public static class SparkServerConfiguration extends ServerConfiguration {
+ @Option(
+ name = "--spark-master-url",
+ usage = "Spark master url to submit job (e.g. spark://host:port, local[4])")
+ private String sparkMasterUrl = SparkPipelineOptions.DEFAULT_MASTER_URL;
+
+ String getSparkMasterUrl() {
+ return this.sparkMasterUrl;
+ }
+ }
+
public static void main(String[] args) {
FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
fromParams(args).run();
@@ -50,7 +63,7 @@ public class SparkJobServerDriver extends JobServerDriver {
}
private static SparkJobServerDriver fromParams(String[] args) {
- ServerConfiguration configuration = new ServerConfiguration();
+ SparkServerConfiguration configuration = new SparkServerConfiguration();
CmdLineParser parser = new CmdLineParser(configuration);
try {
parser.parseArgument(args);
@@ -63,7 +76,7 @@ public class SparkJobServerDriver extends JobServerDriver {
return fromConfig(configuration);
}
- private static SparkJobServerDriver fromConfig(ServerConfiguration configuration) {
+ private static SparkJobServerDriver fromConfig(SparkServerConfiguration configuration) {
return create(
configuration,
createJobServerFactory(configuration),
@@ -71,14 +84,14 @@ public class SparkJobServerDriver extends JobServerDriver {
}
private static SparkJobServerDriver create(
- ServerConfiguration configuration,
+ SparkServerConfiguration configuration,
ServerFactory jobServerFactory,
ServerFactory artifactServerFactory) {
return new SparkJobServerDriver(configuration, jobServerFactory, artifactServerFactory);
}
private SparkJobServerDriver(
- ServerConfiguration configuration,
+ SparkServerConfiguration configuration,
ServerFactory jobServerFactory,
ServerFactory artifactServerFactory) {
super(configuration, jobServerFactory, artifactServerFactory);
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 6935b54..4bf51e8 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -33,8 +33,10 @@ import org.apache.beam.sdk.options.StreamingOptions;
public interface SparkPipelineOptions
extends PipelineOptions, StreamingOptions, ApplicationNameOptions {
+ String DEFAULT_MASTER_URL = "local[4]";
+
@Description("The url of the spark master to connect to, (e.g. spark://host:port, local[4]).")
- @Default.String("local[4]")
+ @Default.String(DEFAULT_MASTER_URL)
String getSparkMaster();
void setSparkMaster(String master);