This is an automated email from the ASF dual-hosted git repository.
taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 6e2a4db [NEMO-294] Beam-Runner (#163)
6e2a4db is described below
commit 6e2a4db7b49f0789a90d2d4aa9fcc8246d781245
Author: Won Wook SONG <[email protected]>
AuthorDate: Mon Dec 3 15:07:25 2018 +0900
[NEMO-294] Beam-Runner (#163)
JIRA: [NEMO-294: Support Nemo Runner execution by providing PipelineOptions
to the Beam
program](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-294)
**Major changes:**
- Update launch scripts to launch shaded jars with the current project
version, instead of 0.1-SNAPSHOT
- Refactor `JobLauncher` so that jobs can be launched from the
JobLauncher's main class as well as from a particular Beam application. This
also allows the job ID to be received from the application itself.
- Use Google's AutoService from Beam to register NemoRunner as one of its
runners with the `NemoRunnerRegistrar`.
- Rename `NemoPipelineRunner` into `NemoRunner` to follow conventions.
- Replace the default `executor_json` file with a default JSON string,
so that a JSON file is no longer required by default.
- Add a MinimalWordCount example from the Beam homepage (from the
quickstart article)
- Now, we can create a QuickStart with the MinimalWordCount example on the
Beam site by adding a simple profile to the Maven `pom.xml` of the
MinimalWordCount example.
**Minor changes to note:**
- Fix minor typos (e.g., confg --> config)
- Fix minor indentation inconsistencies
**Tests for the changes:**
- Existing tests confirm that these changes do not break the code
**Other comments:**
- N/A
---
bin/run_beam.sh | 4 +-
bin/run_spark.sh | 4 +-
.../java/org/apache/nemo/client/JobLauncher.java | 163 +++++++++++++--------
compiler/frontend/beam/pom.xml | 13 +-
.../compiler/frontend/beam/NemoPipelineResult.java | 5 +-
.../{NemoPipelineRunner.java => NemoRunner.java} | 32 +++-
.../frontend/beam/NemoRunnerRegistrar.java | 62 ++++++++
.../frontend/spark/core/SparkFrontendUtils.java | 2 +-
.../compiler/frontend/spark/core/rdd/RDD.scala | 2 +-
.../org/apache/nemo/compiler/CompilerTestUtil.java | 3 +-
.../main/java/org/apache/nemo/conf/JobConf.java | 2 +-
.../nemo/examples/beam/AlternatingLeastSquare.java | 4 +-
.../beam/AlternatingLeastSquareInefficient.java | 4 +-
.../org/apache/nemo/examples/beam/Broadcast.java | 4 +-
.../nemo/examples/beam/MinimalWordCount.java | 87 +++++++++++
.../beam/MultinomialLogisticRegression.java | 4 +-
.../nemo/examples/beam/NetworkTraceAnalysis.java | 4 +-
.../nemo/examples/beam/PartitionWordsByLength.java | 4 +-
.../apache/nemo/examples/beam/PerKeyMedian.java | 4 +-
.../nemo/examples/beam/PerPercentileAverage.java | 4 +-
.../apache/nemo/examples/beam/SimpleSumSQL.java | 4 +-
.../nemo/examples/beam/WindowedBroadcast.java | 4 +-
.../nemo/examples/beam/WindowedWordCount.java | 4 +-
.../org/apache/nemo/examples/beam/WordCount.java | 4 +-
pom.xml | 1 +
25 files changed, 327 insertions(+), 101 deletions(-)
diff --git a/bin/run_beam.sh b/bin/run_beam.sh
index 41c1ef5..cbd082c 100755
--- a/bin/run_beam.sh
+++ b/bin/run_beam.sh
@@ -17,4 +17,6 @@
# specific language governing permissions and limitations
# under the License.
-java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp
examples/beam/target/nemo-examples-beam-0.1-SNAPSHOT-shaded.jar:`yarn
classpath` org.apache.nemo.client.JobLauncher "$@"
+java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp
examples/beam/target/nemo-examples-beam-$(mvn -q \
+ -Dexec.executable=echo -Dexec.args='${project.version}' \
+ --non-recursive exec:exec)-shaded.jar:`yarn classpath`
org.apache.nemo.client.JobLauncher "$@"
diff --git a/bin/run_spark.sh b/bin/run_spark.sh
index 057b017..314fd0d 100755
--- a/bin/run_spark.sh
+++ b/bin/run_spark.sh
@@ -17,4 +17,6 @@
# specific language governing permissions and limitations
# under the License.
-java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp
examples/spark/target/nemo-examples-spark-0.1-SNAPSHOT-shaded.jar:`yarn
classpath` org.apache.nemo.client.JobLauncher "$@"
+java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp
examples/spark/target/nemo-examples-spark-$(mvn -q \
+ -Dexec.executable=echo -Dexec.args='${project.version}' \
+ --non-recursive exec:exec)-shaded.jar:`yarn classpath`
org.apache.nemo.client.JobLauncher "$@"
diff --git a/client/src/main/java/org/apache/nemo/client/JobLauncher.java
b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
index 035d719..f95bc73 100644
--- a/client/src/main/java/org/apache/nemo/client/JobLauncher.java
+++ b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
@@ -101,6 +101,23 @@ public final class JobLauncher {
* @throws Exception exception on the way.
*/
public static void main(final String[] args) throws Exception {
+ try {
+ setup(args);
+ // Launch client main. The shutdown() method is called inside the
launchDAG() method.
+ runUserProgramMain(builtJobConf);
+ } catch (final InjectionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Set up the driver, etc. before the actual execution.
+ * @param args arguments.
+ * @throws InjectionException injection exception from REEF.
+ * @throws ClassNotFoundException class not found exception.
+ * @throws IOException IO exception.
+ */
+ public static void setup(final String[] args) throws InjectionException,
ClassNotFoundException, IOException {
// Get Job and Driver Confs
builtJobConf = getJobConf(args);
@@ -108,77 +125,76 @@ public final class JobLauncher {
LOG.info("Launching RPC Server");
driverRPCServer = new DriverRPCServer();
driverRPCServer
-
.registerHandler(ControlMessage.DriverToClientMessageType.DriverStarted, event
-> {
- })
- .registerHandler(ControlMessage.DriverToClientMessageType.DriverReady,
event -> driverReadyLatch.countDown())
-
.registerHandler(ControlMessage.DriverToClientMessageType.ExecutionDone, event
-> jobDoneLatch.countDown())
-
.registerHandler(ControlMessage.DriverToClientMessageType.DataCollected,
message -> COLLECTED_DATA.addAll(
-
SerializationUtils.deserialize(Base64.getDecoder().decode(message.getDataCollected().getData()))))
- .run();
+ .registerHandler(ControlMessage.DriverToClientMessageType.DriverStarted,
event -> {
+ })
+ .registerHandler(ControlMessage.DriverToClientMessageType.DriverReady,
event -> driverReadyLatch.countDown())
+ .registerHandler(ControlMessage.DriverToClientMessageType.ExecutionDone,
event -> jobDoneLatch.countDown())
+ .registerHandler(ControlMessage.DriverToClientMessageType.DataCollected,
message -> COLLECTED_DATA.addAll(
+
SerializationUtils.deserialize(Base64.getDecoder().decode(message.getDataCollected().getData()))))
+ .run();
final Configuration driverConf = getDriverConf(builtJobConf);
final Configuration driverNcsConf = getDriverNcsConf();
- final Configuration driverMessageConfg = getDriverMessageConf();
+ final Configuration driverMessageConfig = getDriverMessageConf();
+ final String defaultExecutorResourceConfig =
"[{\"type\":\"Transient\",\"memory_mb\":512,\"capacity\":5},"
+ + "{\"type\":\"Reserved\",\"memory_mb\":512,\"capacity\":5}]";
final Configuration executorResourceConfig = getJSONConf(builtJobConf,
JobConf.ExecutorJSONPath.class,
- JobConf.ExecutorJSONContents.class);
+ JobConf.ExecutorJSONContents.class, defaultExecutorResourceConfig);
final Configuration bandwidthConfig = getJSONConf(builtJobConf,
JobConf.BandwidthJSONPath.class,
- JobConf.BandwidthJSONContents.class);
+ JobConf.BandwidthJSONContents.class, "");
final Configuration clientConf = getClientConf();
final Configuration schedulerConf = getSchedulerConf(builtJobConf);
// Merge Job and Driver Confs
- jobAndDriverConf = Configurations.merge(builtJobConf, driverConf,
driverNcsConf, driverMessageConfg,
- executorResourceConfig, bandwidthConfig,
driverRPCServer.getListeningConfiguration(), schedulerConf);
+ jobAndDriverConf = Configurations.merge(builtJobConf, driverConf,
driverNcsConf, driverMessageConfig,
+ executorResourceConfig, bandwidthConfig,
driverRPCServer.getListeningConfiguration(), schedulerConf);
// Get DeployMode Conf
deployModeConf = Configurations.merge(getDeployModeConf(builtJobConf),
clientConf);
// Start Driver and launch user program.
- try {
- if (jobAndDriverConf == null || deployModeConf == null || builtJobConf
== null) {
- throw new RuntimeException("Configuration for launching driver is not
ready");
- }
-
-
- // Launch driver
- LOG.info("Launching driver");
- driverReadyLatch = new CountDownLatch(1);
- driverLauncher = DriverLauncher.getLauncher(deployModeConf);
- driverLauncher.submit(jobAndDriverConf, 500);
- // When the driver is up and the resource is ready, the DriverReady
message is delivered.
+ if (jobAndDriverConf == null || deployModeConf == null || builtJobConf ==
null) {
+ throw new RuntimeException("Configuration for launching driver is not
ready");
+ }
- // Launch client main
- runUserProgramMain(builtJobConf);
+ // Launch driver
+ LOG.info("Launching driver");
+ driverReadyLatch = new CountDownLatch(1);
+ driverLauncher = DriverLauncher.getLauncher(deployModeConf);
+ driverLauncher.submit(jobAndDriverConf, 500);
+ // When the driver is up and the resource is ready, the DriverReady
message is delivered.
+ }
- // Trigger driver shutdown afterwards
- driverRPCServer.send(ControlMessage.ClientToDriverMessage.newBuilder()
-
.setType(ControlMessage.ClientToDriverMessageType.DriverShutdown).build());
- // Wait for driver to naturally finish
- synchronized (driverLauncher) {
- while (!driverLauncher.getStatus().isDone()) {
- try {
- LOG.info("Wait for the driver to finish");
- driverLauncher.wait();
- } catch (final InterruptedException e) {
- LOG.warn("Interrupted: " + e);
- // clean up state...
- Thread.currentThread().interrupt();
- }
+ /**
+ * Clean up everything.
+ */
+ public static void shutdown() {
+ // Trigger driver shutdown afterwards
+ driverRPCServer.send(ControlMessage.ClientToDriverMessage.newBuilder()
+
.setType(ControlMessage.ClientToDriverMessageType.DriverShutdown).build());
+ // Wait for driver to naturally finish
+ synchronized (driverLauncher) {
+ while (!driverLauncher.getStatus().isDone()) {
+ try {
+ LOG.info("Wait for the driver to finish");
+ driverLauncher.wait();
+ } catch (final InterruptedException e) {
+ LOG.warn("Interrupted: " + e);
+ // clean up state...
+ Thread.currentThread().interrupt();
}
- LOG.info("Driver terminated");
- }
- } catch (final InjectionException e) {
- throw new RuntimeException(e);
- } finally {
- // Close everything that's left
- driverRPCServer.shutdown();
- driverLauncher.close();
- final Optional<Throwable> possibleError =
driverLauncher.getStatus().getError();
- if (possibleError.isPresent()) {
- throw new RuntimeException(possibleError.get());
- } else {
- LOG.info("Job successfully completed");
}
+ LOG.info("Driver terminated");
+ }
+
+ // Close everything that's left
+ driverRPCServer.shutdown();
+ driverLauncher.close();
+ final Optional<Throwable> possibleError =
driverLauncher.getStatus().getError();
+ if (possibleError.isPresent()) {
+ throw new RuntimeException(possibleError.get());
+ } else {
+ LOG.info("Job successfully completed");
}
}
@@ -191,14 +207,32 @@ public final class JobLauncher {
*/
// When modifying the signature of this method, see
CompilerTestUtil#compileDAG and make corresponding changes
public static void launchDAG(final DAG dag) {
- launchDAG(dag, Collections.emptyMap());
+ launchDAG(dag, Collections.emptyMap(), "");
+ }
+
+ /**
+ * @param dag the application DAG.
+ * @param jobId job ID.
+ */
+ public static void launchDAG(final DAG dag, final String jobId) {
+ launchDAG(dag, Collections.emptyMap(), jobId);
}
/**
* @param dag the application DAG.
* @param broadcastVariables broadcast variables (can be empty).
+ * @param jobId job ID.
*/
- public static void launchDAG(final DAG dag, final Map<Serializable, Object>
broadcastVariables) {
+ public static void launchDAG(final DAG dag, final Map<Serializable, Object>
broadcastVariables, final String jobId) {
+ // launch driver if it hasn't been already
+ if (driverReadyLatch == null) {
+ try {
+ setup(new String[]{"-job_id", jobId});
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
// Wait until the driver is ready.
try {
LOG.info("Waiting for the driver to be ready");
@@ -229,8 +263,11 @@ public final class JobLauncher {
// clean up state...
Thread.currentThread().interrupt();
throw new RuntimeException(e);
+ } finally {
+ LOG.info("DAG execution done");
+ // trigger shutdown.
+ shutdown();
}
- LOG.info("DAG execution done");
}
/**
@@ -267,6 +304,13 @@ public final class JobLauncher {
return jcb.build();
}
+ /**
+ * Fetch scheduler configuration.
+ * @param jobConf job configuration.
+ * @return the scheduler configuration.
+ * @throws ClassNotFoundException exception while finding the class.
+ * @throws InjectionException exception while injection (REEF Tang).
+ */
private static Configuration getSchedulerConf(final Configuration jobConf)
throws ClassNotFoundException, InjectionException {
final Injector injector = TANG.newInjector(jobConf);
@@ -399,13 +443,14 @@ public final class JobLauncher {
*/
private static Configuration getJSONConf(final Configuration jobConf,
final Class<? extends Name<String>>
pathParameter,
- final Class<? extends Name<String>>
contentsParameter)
+ final Class<? extends Name<String>>
contentsParameter,
+ final String defaultContent)
throws InjectionException {
final Injector injector = TANG.newInjector(jobConf);
try {
final String path = injector.getNamedInstance(pathParameter);
- final String contents = path.isEmpty() ? ""
- : new String(Files.readAllBytes(Paths.get(path)),
StandardCharsets.UTF_8);
+ final String contents = path.isEmpty() ? defaultContent
+ : new String(Files.readAllBytes(Paths.get(path)),
StandardCharsets.UTF_8);
return TANG.newConfigurationBuilder()
.bindNamedParameter(contentsParameter, contents)
.build();
diff --git a/compiler/frontend/beam/pom.xml b/compiler/frontend/beam/pom.xml
index 01000f7..02fab3a 100644
--- a/compiler/frontend/beam/pom.xml
+++ b/compiler/frontend/beam/pom.xml
@@ -29,9 +29,10 @@ under the License.
<artifactId>nemo-compiler-frontend-beam</artifactId>
<name>Nemo Compiler Frontend: Beam</name>
+ <packaging>jar</packaging>
<dependencies>
- <dependency>
+ <dependency>
<groupId>org.apache.nemo</groupId>
<artifactId>nemo-common</artifactId>
<version>${project.version}</version>
@@ -46,11 +47,11 @@ under the License.
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>
- <dependency>
+ <dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-core-java</artifactId>
<version>${beam.version}</version>
- </dependency>
+ </dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-hadoop-input-format</artifactId>
@@ -62,5 +63,11 @@ under the License.
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.google.auto.service</groupId>
+ <artifactId>auto-service</artifactId>
+ <version>${auto-service.version}</version>
+ <optional>true</optional>
+ </dependency>
</dependencies>
</project>
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java
index 57f1634..1cfbf7a 100644
---
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java
@@ -57,10 +57,7 @@ public final class NemoPipelineResult extends ClientEndpoint
implements Pipeline
@Override
public State waitUntilFinish() {
- throw new UnsupportedOperationException();
- // TODO #208: NemoPipelineResult#waitUntilFinish hangs
- // Previous code that hangs the job:
- // return (State) super.waitUntilJobFinish();
+ return waitUntilFinish(Duration.ZERO);
}
@Override
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineRunner.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunner.java
similarity index 67%
rename from
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineRunner.java
rename to
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunner.java
index d011d11..9128213 100644
---
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineRunner.java
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunner.java
@@ -18,6 +18,7 @@
*/
package org.apache.nemo.compiler.frontend.beam;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.nemo.client.JobLauncher;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
@@ -27,25 +28,46 @@ import org.apache.beam.sdk.options.PipelineOptionsValidator;
/**
* Runner class for BEAM programs.
*/
-public final class NemoPipelineRunner extends
PipelineRunner<NemoPipelineResult> {
+public final class NemoRunner extends PipelineRunner<NemoPipelineResult> {
private final NemoPipelineOptions nemoPipelineOptions;
/**
* BEAM Pipeline Runner.
* @param nemoPipelineOptions PipelineOptions.
*/
- private NemoPipelineRunner(final NemoPipelineOptions nemoPipelineOptions) {
+ private NemoRunner(final NemoPipelineOptions nemoPipelineOptions) {
this.nemoPipelineOptions = nemoPipelineOptions;
}
/**
+ * Creates and returns a new NemoRunner with default options.
+ *
+ * @return A pipeline runner with default options.
+ */
+ public static NemoRunner create() {
+ NemoPipelineOptions options =
PipelineOptionsFactory.as(NemoPipelineOptions.class);
+ options.setRunner(NemoRunner.class);
+ return new NemoRunner(options);
+ }
+
+ /**
+ * Creates and returns a new NemoRunner with specified options.
+ *
+ * @param options The NemoPipelineOptions to use when executing the job.
+ * @return A pipeline runner that will execute with specified options.
+ */
+ public static NemoRunner create(final NemoPipelineOptions options) {
+ return new NemoRunner(options);
+ }
+
+ /**
* Static initializer for creating PipelineRunner with the given options.
* @param options given PipelineOptions.
* @return The created PipelineRunner.
*/
- public static PipelineRunner<NemoPipelineResult> fromOptions(final
PipelineOptions options) {
+ public static NemoRunner fromOptions(final PipelineOptions options) {
final NemoPipelineOptions nemoOptions =
PipelineOptionsValidator.validate(NemoPipelineOptions.class, options);
- return new NemoPipelineRunner(nemoOptions);
+ return new NemoRunner(nemoOptions);
}
/**
@@ -57,7 +79,7 @@ public final class NemoPipelineRunner extends
PipelineRunner<NemoPipelineResult>
final PipelineVisitor pipelineVisitor = new PipelineVisitor(pipeline,
nemoPipelineOptions);
pipeline.traverseTopologically(pipelineVisitor);
final NemoPipelineResult nemoPipelineResult = new NemoPipelineResult();
- JobLauncher.launchDAG(pipelineVisitor.getConvertedPipeline());
+ JobLauncher.launchDAG(pipelineVisitor.getConvertedPipeline(),
nemoPipelineOptions.getJobName());
return nemoPipelineResult;
}
}
diff --git
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunnerRegistrar.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunnerRegistrar.java
new file mode 100644
index 0000000..aa05519
--- /dev/null
+++
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunnerRegistrar.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+/**
+ * Contains the {@link PipelineRunnerRegistrar} and {@link
PipelineOptionsRegistrar} for the {@link NemoRunner}.
+ *
+ * {@link AutoService} will register Nemo's implementations of the {@link
PipelineRunner} and {@link PipelineOptions}
+ * as available pipeline runner services.
+ */
+public final class NemoRunnerRegistrar {
+ /**
+ * Private constructor.
+ */
+ private NemoRunnerRegistrar() {
+ }
+
+ /**
+ * Registers the {@link NemoRunner}.
+ */
+ @AutoService(PipelineRunnerRegistrar.class)
+ public static final class Runner implements PipelineRunnerRegistrar {
+ @Override
+ public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+ return ImmutableList.of(NemoRunner.class);
+ }
+ }
+
+ /**
+ * Registers the {@link NemoPipelineOptions}.
+ */
+ @AutoService(PipelineOptionsRegistrar.class)
+ public static final class Options implements PipelineOptionsRegistrar {
+ @Override
+ public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+ return ImmutableList.of(NemoPipelineOptions.class);
+ }
+ }
+}
diff --git
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
index 3cb7555..0a5c0e5 100644
---
a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
+++
b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
@@ -105,7 +105,7 @@ public final class SparkFrontendUtils {
builder.connectVertices(newEdge);
// launch DAG
- JobLauncher.launchDAG(builder.build(), SparkBroadcastVariables.getAll());
+ JobLauncher.launchDAG(builder.build(), SparkBroadcastVariables.getAll(),
"");
return (List<T>) JobLauncher.getCollectedData();
}
diff --git
a/compiler/frontend/spark/src/main/scala/org/apache/nemo/compiler/frontend/spark/core/rdd/RDD.scala
b/compiler/frontend/spark/src/main/scala/org/apache/nemo/compiler/frontend/spark/core/rdd/RDD.scala
index 3cf516a..234e867 100644
---
a/compiler/frontend/spark/src/main/scala/org/apache/nemo/compiler/frontend/spark/core/rdd/RDD.scala
+++
b/compiler/frontend/spark/src/main/scala/org/apache/nemo/compiler/frontend/spark/core/rdd/RDD.scala
@@ -230,7 +230,7 @@ final class RDD[T: ClassTag] protected[rdd] (
newEdge.setProperty(keyExtractorProperty)
builder.connectVertices(newEdge)
- JobLauncher.launchDAG(builder.build, SparkBroadcastVariables.getAll)
+ JobLauncher.launchDAG(builder.build, SparkBroadcastVariables.getAll, "")
}
/////////////// CACHING ///////////////
diff --git
a/compiler/test/src/main/java/org/apache/nemo/compiler/CompilerTestUtil.java
b/compiler/test/src/main/java/org/apache/nemo/compiler/CompilerTestUtil.java
index 2a4d359..d948138 100644
--- a/compiler/test/src/main/java/org/apache/nemo/compiler/CompilerTestUtil.java
+++ b/compiler/test/src/main/java/org/apache/nemo/compiler/CompilerTestUtil.java
@@ -86,8 +86,9 @@ public final class CompilerTestUtil {
final Method userMainMethod = userMainClass.getMethod("main",
String[].class);
final ArgumentCaptor<DAG> captor = ArgumentCaptor.forClass(DAG.class);
+ final ArgumentCaptor<String> stringArg =
ArgumentCaptor.forClass(String.class);
PowerMockito.mockStatic(JobLauncher.class);
- PowerMockito.doNothing().when(JobLauncher.class, "launchDAG",
captor.capture());
+ PowerMockito.doNothing().when(JobLauncher.class, "launchDAG",
captor.capture(), stringArg.capture());
userMainMethod.invoke(null, (Object) userMainMethodArgs);
return captor.getValue();
}
diff --git a/conf/src/main/java/org/apache/nemo/conf/JobConf.java
b/conf/src/main/java/org/apache/nemo/conf/JobConf.java
index 7de0e18..1bdb7c9 100644
--- a/conf/src/main/java/org/apache/nemo/conf/JobConf.java
+++ b/conf/src/main/java/org/apache/nemo/conf/JobConf.java
@@ -165,7 +165,7 @@ public final class JobConf extends
ConfigurationModuleBuilder {
* Path to the JSON file that specifies resource layout.
*/
@NamedParameter(doc = "Path to the JSON file that specifies resources for
executors", short_name = "executor_json",
- default_value = "examples/resources/test_executor_resources.json")
+ default_value = "")
public final class ExecutorJSONPath implements Name<String> {
}
diff --git
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
index 76bdc03..f3ce915 100644
---
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
+++
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
@@ -20,7 +20,7 @@ package org.apache.nemo.examples.beam;
import com.github.fommil.netlib.BLAS;
import com.github.fommil.netlib.LAPACK;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderProviders;
@@ -371,7 +371,7 @@ public final class AlternatingLeastSquare {
}
final PipelineOptions options = PipelineOptionsFactory.create();
- options.setRunner(NemoPipelineRunner.class);
+ options.setRunner(NemoRunner.class);
options.setJobName("ALS");
options.setStableUniqueNames(PipelineOptions.CheckEnabled.OFF);
diff --git
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
index ab1760f..5af8dd1 100644
---
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
+++
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
@@ -19,7 +19,7 @@
package org.apache.nemo.examples.beam;
import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderProviders;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -116,7 +116,7 @@ public final class AlternatingLeastSquareInefficient {
}
final PipelineOptions options = PipelineOptionsFactory.create();
- options.setRunner(NemoPipelineRunner.class);
+ options.setRunner(NemoRunner.class);
options.setJobName("ALS");
options.setStableUniqueNames(PipelineOptions.CheckEnabled.OFF);
diff --git
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/Broadcast.java
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/Broadcast.java
index da609e4..5ded58b 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/Broadcast.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/Broadcast.java
@@ -18,7 +18,7 @@
*/
package org.apache.nemo.examples.beam;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -49,7 +49,7 @@ public final class Broadcast {
final String inputFilePath = args[0];
final String outputFilePath = args[1];
final PipelineOptions options = PipelineOptionsFactory.create();
- options.setRunner(NemoPipelineRunner.class);
+ options.setRunner(NemoRunner.class);
final Pipeline p = Pipeline.create(options);
final PCollection<String> elemCollection = GenericSourceSink.read(p,
inputFilePath);
diff --git
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MinimalWordCount.java
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MinimalWordCount.java
new file mode 100644
index 0000000..5549d10
--- /dev/null
+++
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MinimalWordCount.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.examples.beam;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.FlatMapElements;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
+
+import java.util.Arrays;
+/**
+ * MinimalWordCount program from BEAM.
+ */
+public final class MinimalWordCount {
+ /**
+ * Private Constructor.
+ */
+ private MinimalWordCount() {
+ }
+ /**
+ * Main function for the MinimalWordCount Beam program.
+ * @param args arguments.
+ */
+ public static void main(final String[] args) {
+ final String inputFilePath = args[0];
+ final String outputFilePath = args[1];
+ final PipelineOptions options =
PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
+ options.setRunner(NemoRunner.class);
+ options.setJobName("MinimalWordCount");
+ // Create the Pipeline object with the options we defined above
+ final Pipeline p = Pipeline.create(options);
+ // Concept #1: Apply a root transform to the pipeline; in this case,
TextIO.Read to read a set
+ // of input text files. TextIO.Read returns a PCollection where each
element is one line from
+ // the input text (a set of Shakespeare's texts).
+ // This example reads a public data set consisting of the complete works
of Shakespeare.
+ p.apply(TextIO.read().from(inputFilePath))
+ // Concept #2: Apply a FlatMapElements transform the PCollection of text
lines.
+ // This transform splits the lines in PCollection<String>, where each
element is an
+ // individual word in Shakespeare's collected texts.
+ .apply(
+ FlatMapElements.into(TypeDescriptors.strings())
+ .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
+ // We use a Filter transform to avoid empty word
+ .apply(Filter.by((String word) -> !word.isEmpty()))
+ // Concept #3: Apply the Count transform to our PCollection of
individual words. The Count
+ // transform returns a new PCollection of key/value pairs, where each
key represents a
+ // unique word in the text. The associated value is the occurrence count
for that word.
+ .apply(Count.perElement())
+ // Apply a MapElements transform that formats our PCollection of word
counts into a
+ // printable string, suitable for writing to an output file.
+ .apply(
+ MapElements.into(TypeDescriptors.strings())
+ .via(
+ (KV<String, Long> wordCount) ->
+ wordCount.getKey() + ": " + wordCount.getValue()))
+ // Concept #4: Apply a write transform, TextIO.Write, at the end of the
pipeline.
+ // TextIO.Write writes the contents of a PCollection (in this case, our
PCollection of
+ // formatted strings) to a series of text files.
+ //
+ // By default, it will write to a set of files with names like
wordcounts-00001-of-00005
+ .apply(TextIO.write().to(outputFilePath));
+ p.run();
+ }
+}
diff --git
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
index 921b862..2b37eba 100644
---
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
+++
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
@@ -19,7 +19,7 @@
package org.apache.nemo.examples.beam;
import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
import org.apache.nemo.common.Pair;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -420,7 +420,7 @@ public final class MultinomialLogisticRegression {
}
final PipelineOptions options = PipelineOptionsFactory.create();
- options.setRunner(NemoPipelineRunner.class);
+ options.setRunner(NemoRunner.class);
options.setJobName("MLR");
options.setStableUniqueNames(PipelineOptions.CheckEnabled.OFF);
diff --git
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/NetworkTraceAnalysis.java
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/NetworkTraceAnalysis.java
index e0567c6..a9bbc43 100644
---
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/NetworkTraceAnalysis.java
+++
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/NetworkTraceAnalysis.java
@@ -19,7 +19,7 @@
package org.apache.nemo.examples.beam;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -59,7 +59,7 @@ public final class NetworkTraceAnalysis {
final String input1FilePath = args[1];
final String outputFilePath = args[2];
final PipelineOptions options =
PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
- options.setRunner(NemoPipelineRunner.class);
+ options.setRunner(NemoRunner.class);
options.setJobName("NetworkTraceAnalysis");
// Given "4 0.0 192.168.3.1 -> 192.168.0.2 Len=29", this finds
"192.168.3.1", "192.168.0.2" and "29"
diff --git
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PartitionWordsByLength.java
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PartitionWordsByLength.java
index b446858..816be62 100644
---
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PartitionWordsByLength.java
+++
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PartitionWordsByLength.java
@@ -19,7 +19,7 @@
package org.apache.nemo.examples.beam;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -47,7 +47,7 @@ public final class PartitionWordsByLength {
final String inputFilePath = args[0];
final String outputFilePath = args[1];
final PipelineOptions options =
PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
- options.setRunner(NemoPipelineRunner.class);
+ options.setRunner(NemoRunner.class);
options.setJobName("PartitionWordsByLength");
// {} here is required for preserving type information.
diff --git
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerKeyMedian.java
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerKeyMedian.java
index ba2a94e..6255a93 100644
---
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerKeyMedian.java
+++
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerKeyMedian.java
@@ -19,7 +19,7 @@
package org.apache.nemo.examples.beam;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -53,7 +53,7 @@ public final class PerKeyMedian {
final String inputFilePath = args[0];
final String outputFilePath = args[1];
final PipelineOptions options =
PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
- options.setRunner(NemoPipelineRunner.class);
+ options.setRunner(NemoRunner.class);
options.setJobName("PerKeyMedian");
final Pipeline p = Pipeline.create(options);
diff --git
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerPercentileAverage.java
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerPercentileAverage.java
index 39b941a..486442b 100644
---
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerPercentileAverage.java
+++
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerPercentileAverage.java
@@ -20,7 +20,7 @@ package org.apache.nemo.examples.beam;
import com.google.common.collect.Lists;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -52,7 +52,7 @@ public final class PerPercentileAverage {
final String inputFilePath = args[0];
final String outputFilePath = args[1];
final PipelineOptions options =
PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
- options.setRunner(NemoPipelineRunner.class);
+ options.setRunner(NemoRunner.class);
options.setJobName("PerPercentileAverage");
final Pipeline p = Pipeline.create(options);
diff --git
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
index b476460..6fa8f5e 100644
---
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
+++
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.*;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
import java.util.List;
import java.util.stream.Collectors;
@@ -50,7 +50,7 @@ public final class SimpleSumSQL {
final String outputFilePath = args[0];
final PipelineOptions options =
PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
- options.setRunner(NemoPipelineRunner.class);
+ options.setRunner(NemoRunner.class);
options.setJobName("SimpleSumSQL");
final Pipeline p = Pipeline.create(options);
diff --git
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
index 30ee405..5bff5d8 100644
---
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
+++
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -62,7 +62,7 @@ public final class WindowedBroadcast {
.every(Duration.standardSeconds(1)));
final PipelineOptions options =
PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
- options.setRunner(NemoPipelineRunner.class);
+ options.setRunner(NemoRunner.class);
options.setJobName("WindowedBroadcast");
final Pipeline p = Pipeline.create(options);
diff --git
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
index 0f13dc4..a814165 100644
---
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
+++
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -106,7 +106,7 @@ public final class WindowedWordCount {
}
final PipelineOptions options =
PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
- options.setRunner(NemoPipelineRunner.class);
+ options.setRunner(NemoRunner.class);
options.setJobName("WindowedWordCount");
final Pipeline p = Pipeline.create(options);
diff --git
a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCount.java
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCount.java
index 9af9d7c..ba3cb80 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCount.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCount.java
@@ -19,7 +19,7 @@
package org.apache.nemo.examples.beam;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -45,7 +45,7 @@ public final class WordCount {
final String inputFilePath = args[0];
final String outputFilePath = args[1];
final PipelineOptions options =
PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
- options.setRunner(NemoPipelineRunner.class);
+ options.setRunner(NemoRunner.class);
options.setJobName("WordCount");
final Pipeline p = Pipeline.create(options);
diff --git a/pom.xml b/pom.xml
index 7ed51fe..132c5b0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,6 +43,7 @@ under the License.
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <auto-service.version>1.0-rc2</auto-service.version>
<beam.version>2.6.0</beam.version>
<spark.version>2.2.0</spark.version>
<scala.version>2.11.8</scala.version>