wonook closed pull request #18: [NEMO-45] Distributed Nemo-Spark
URL: https://github.com/apache/incubator-nemo/pull/18
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/.travis.yml b/.travis.yml
index d568369d..7fbdee72 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -40,3 +40,6 @@ addons:
organization: "apache-nemo"
token:
secure:
"ZCt/oEtyomXzNtJIZUeTVuCN3tjJRqZ29OkLg/U3DJTHIqWcaLfaY4MRFJR4DLRwthB24le0UTcGhFZnzZcZU3ji+ADpF/21sIqMDZgPSaqnb45NBCcLRhDUxM6VmU+DevTU7ob6aGRatEGO+C49logQOQbWM6g3KTKeaCR/pds/6isEUJg8Yqj/Poorqy+DbcpaavHBRrg3Zyxi8xwR1teYo8b7lVVMyXvtEVg+YAPuRPMy7c01zGm0MDzngSL1Sv8Q3YmHsbO3SrIueo+Ik0umuTSKMU4pkRj9jIunpGV1UQ3h5LQHzU/9VnhlgTnK2Ut6fThDx9no7rJwUCfy3LTP0z0dN2hAgK43ZSxuM47lP/Bm4hDRCY7KFNNVxEVhA/5DboWhTQq+iPW0Cc0SztOTLR+j76Yh6qmHmN39OWF22UG34D2JFGGgqfWfXwOBxW4cXVxtFQVzsuBcbJ/5zw0gtuvcQhB9MOFewi2bH1DDu2H3QOjq/vp+V4FigQyMSXRbPYInpF/nUmtSZdroOmOJRt7Ov0Vku4eBVnKbol+npcMbX/pgLZzzaZhB5QVqKrkpY2AKE0jM1deCGb6ABdt/Slizp/ovtzWlF5xCnLvVgVDhsYR4Qo8ZeWbldsbFQBu4y+1SlBdVRHV6QNBsBDyqM9WL4mVhKZaar19HJTw="
+
+env:
+ - HADOOP_HOME="/"
diff --git a/README.md b/README.md
index b50b9177..fcbbe4ea 100644
--- a/README.md
+++ b/README.md
@@ -87,7 +87,7 @@ Please refer to the [Contribution
guideline](.github/CONTRIBUTING.md) to contrib
### Examples
```bash
## MapReduce example
-./bin/run.sh \
+./bin/run_beam.sh \
-job_id mr_default \
-executor_json `pwd`/examples/resources/sample_executor_resources.json \
-optimization_policy
edu.snu.nemo.compiler.optimizer.policy.DefaultPolicy \
@@ -95,7 +95,7 @@ Please refer to the [Contribution
guideline](.github/CONTRIBUTING.md) to contrib
-user_args "`pwd`/examples/resources/sample_input_mr
`pwd`/examples/resources/sample_output_mr"
## YARN cluster example
-./bin/run.sh \
+./bin/run_beam.sh \
-deploy_mode yarn \
-job_id mr_pado \
-executor_json `pwd`/examples/resources/sample_executor_resources.json \
@@ -143,7 +143,7 @@ Nemo Compiler and Engine can store JSON representation of
intermediate DAGs.
### Examples
```bash
-./bin/run.sh \
+./bin/run_beam.sh \
-job_id als \
-executor_json `pwd`/examples/resources/sample_executor_resources.json \
-user_main edu.snu.nemo.examples.beam.AlternatingLeastSquare \
diff --git a/bin/run.sh b/bin/run_beam.sh
similarity index 100%
rename from bin/run.sh
rename to bin/run_beam.sh
diff --git a/bin/run_external_app.sh b/bin/run_external_app.sh
index ec032f6e..f17c8121 100755
--- a/bin/run_external_app.sh
+++ b/bin/run_external_app.sh
@@ -15,5 +15,6 @@
# limitations under the License.
parent_path=$( cd "$(dirname "${BASH_SOURCE[0]}")" ; pwd -P )
-cd $parent_path
+pushd $parent_path
java -cp examples/target/nemo-examples-0.1-SNAPSHOT-shaded.jar:$1:`yarn
classpath` edu.snu.nemo.client.JobLauncher "${@:2}"
+popd
diff --git a/bin/run_spark.sh b/bin/run_spark.sh
new file mode 100644
index 00000000..bc7f5c23
--- /dev/null
+++ b/bin/run_spark.sh
@@ -0,0 +1,17 @@
+#!/usr/bin/env bash
+#
+# Copyright (C) 2017 Seoul National University
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp
examples/spark/target/nemo-examples-spark-0.1-SNAPSHOT-shaded.jar:`yarn
classpath` edu.snu.nemo.client.JobLauncher "$@"
diff --git a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
index 4440a869..37154cd9 100644
--- a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
+++ b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
@@ -15,6 +15,7 @@
*/
package edu.snu.nemo.client;
+import com.google.common.annotations.VisibleForTesting;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.conf.JobConf;
import edu.snu.nemo.driver.NemoDriver;
@@ -56,6 +57,7 @@
private static final int LOCAL_NUMBER_OF_EVALUATORS = 100; // hopefully
large enough for our use....
private static Configuration jobAndDriverConf = null;
private static Configuration deployModeConf = null;
+ private static Configuration builtJobConf = null;
/**
* private constructor.
@@ -65,37 +67,39 @@ private JobLauncher() {
/**
* Main JobLauncher method.
+ *
* @param args arguments.
* @throws Exception exception on the way.
*/
public static void main(final String[] args) throws Exception {
// Get Job and Driver Confs
- final Configuration jobConf = getJobConf(args);
- final Configuration driverConf = getDriverConf(jobConf);
+ builtJobConf = getJobConf(args);
+ final Configuration driverConf = getDriverConf(builtJobConf);
final Configuration driverNcsConf = getDriverNcsConf();
final Configuration driverMessageConfg = getDriverMessageConf();
- final Configuration executorResourceConfig =
getExecutorResourceConf(jobConf);
+ final Configuration executorResourceConfig =
getExecutorResourceConf(builtJobConf);
final Configuration clientConf = getClientConf();
// Merge Job and Driver Confs
- jobAndDriverConf = Configurations.merge(jobConf, driverConf,
driverNcsConf, driverMessageConfg,
+ jobAndDriverConf = Configurations.merge(builtJobConf, driverConf,
driverNcsConf, driverMessageConfg,
executorResourceConfig);
// Get DeployMode Conf
- deployModeConf = Configurations.merge(getDeployModeConf(jobConf),
clientConf);
+ deployModeConf = Configurations.merge(getDeployModeConf(builtJobConf),
clientConf);
// Launch client main
- runUserProgramMain(jobConf);
+ runUserProgramMain(builtJobConf);
}
/**
* Launch application using the application DAG.
+ *
* @param dag the application DAG.
*/
// When modifying the signature of this method, see
CompilerTestUtil#compileDAG and make corresponding changes
public static void launchDAG(final DAG dag) {
try {
- if (jobAndDriverConf == null || deployModeConf == null) {
+ if (jobAndDriverConf == null || deployModeConf == null || builtJobConf
== null) {
throw new RuntimeException("Configuration for launching driver is not
ready");
}
final String serializedDAG =
Base64.getEncoder().encodeToString(SerializationUtils.serialize(dag));
@@ -103,7 +107,7 @@ public static void launchDAG(final DAG dag) {
.bindNamedParameter(JobConf.SerializedDAG.class, serializedDAG)
.build();
// Launch and wait indefinitely for the job to finish
- final LauncherStatus launcherStatus =
DriverLauncher.getLauncher(deployModeConf)
+ final LauncherStatus launcherStatus =
DriverLauncher.getLauncher(deployModeConf)
.run(Configurations.merge(jobAndDriverConf, dagConf));
final Optional<Throwable> possibleError = launcherStatus.getError();
if (possibleError.isPresent()) {
@@ -118,6 +122,7 @@ public static void launchDAG(final DAG dag) {
/**
* Run user-provided main method.
+ *
* @param jobConf the job configuration
* @throws Exception on any exceptions on the way
*/
@@ -148,6 +153,7 @@ private static Configuration getClientConf() {
/**
* Get driver ncs configuration.
+ *
* @return driver ncs configuration.
* @throws InjectionException exception while injection.
*/
@@ -161,6 +167,7 @@ private static Configuration getDriverNcsConf() throws
InjectionException {
/**
* Get driver message configuration.
+ *
* @return driver message configuration.
* @throws InjectionException exception while injection.
*/
@@ -172,6 +179,7 @@ private static Configuration getDriverMessageConf() throws
InjectionException {
/**
* Get driver configuration.
+ *
* @param jobConf job Configuration to get job id and driver memory.
* @return driver configuration.
* @throws InjectionException exception while injection.
@@ -195,11 +203,13 @@ private static Configuration getDriverConf(final
Configuration jobConf) throws I
/**
* Get job configuration.
+ *
* @param args arguments to be processed as command line.
* @return job configuration.
- * @throws IOException exception while processing command line.
+ * @throws IOException exception while processing command line.
* @throws InjectionException exception while injection.
*/
+ @VisibleForTesting
public static Configuration getJobConf(final String[] args) throws
IOException, InjectionException {
final JavaConfigurationBuilder confBuilder =
TANG.newConfigurationBuilder();
final CommandLine cl = new CommandLine(confBuilder);
@@ -227,11 +237,12 @@ public static Configuration getJobConf(final String[]
args) throws IOException,
/**
* Get deploy mode configuration.
+ *
* @param jobConf job configuration to get deploy mode.
* @return deploy mode configuration.
* @throws InjectionException exception while injection.
*/
- public static Configuration getDeployModeConf(final Configuration jobConf)
throws InjectionException {
+ private static Configuration getDeployModeConf(final Configuration jobConf)
throws InjectionException {
final Injector injector = TANG.newInjector(jobConf);
final String deployMode =
injector.getNamedInstance(JobConf.DeployMode.class);
switch (deployMode) {
@@ -250,11 +261,12 @@ public static Configuration getDeployModeConf(final
Configuration jobConf) throw
/**
* Get executor resource configuration.
+ *
* @param jobConf job configuration to get executor json path.
* @return executor resource configuration.
* @throws InjectionException exception while injection.
*/
- public static Configuration getExecutorResourceConf(final Configuration
jobConf) throws InjectionException {
+ private static Configuration getExecutorResourceConf(final Configuration
jobConf) throws InjectionException {
final Injector injector = TANG.newInjector(jobConf);
try {
final String path =
injector.getNamedInstance(JobConf.ExecutorJsonPath.class);
@@ -266,4 +278,14 @@ public static Configuration getExecutorResourceConf(final
Configuration jobConf)
throw new RuntimeException(e);
}
}
+
+ /**
+ * Get the built job configuration.
+ * It can be {@code null} if the main function of this class has not been invoked in the current process.
+ *
+ * @return the built job configuration.
+ */
+ public static Configuration getBuiltJobConf() {
+ return builtJobConf;
+ }
}
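A rough sketch of how a frontend is expected to interact with the refactored JobLauncher: main() stores the parsed job configuration as builtJobConf before running the user main, the user program can read it back via getBuiltJobConf(), and launchDAG() submits the translated DAG. The class below is purely illustrative (not part of this PR), and the DAG construction is elided.

```java
import edu.snu.nemo.client.JobLauncher;
import org.apache.reef.tang.Configuration; // assuming REEF Tang's Configuration, as used by JobLauncher

/**
 * Illustrative frontend entry point, assumed to be passed to JobLauncher via -user_main.
 */
public final class FrontendSketch {
  private FrontendSketch() {
  }

  public static void main(final String[] args) throws Exception {
    // JobLauncher.main(args) parses and stores the job configuration before invoking
    // this method, so the frontend can read that same configuration back:
    final Configuration jobConf = JobLauncher.getBuiltJobConf();

    // After translating the user program into an IR DAG, the frontend submits it.
    // `dag` is a placeholder here; JavaRDD#saveAsTextFile below shows a real call site.
    // JobLauncher.launchDAG(dag);
  }
}
```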
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/Readable.java
b/common/src/main/java/edu/snu/nemo/common/ir/Readable.java
index 44b6b053..9bca623a 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/Readable.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/Readable.java
@@ -25,6 +25,7 @@
public interface Readable<O> extends Serializable {
/**
* Method to read data from the source.
+ *
* @return an {@link Iterable} of the data read by the readable.
* @throws Exception exception while reading data.
*/
@@ -33,10 +34,10 @@
/**
* Returns the list of locations where this readable resides.
* Each location has a complete copy of the readable.
+ *
* @return List of locations where this readable resides
* @throws UnsupportedOperationException when this operation is not supported
- * @throws Exception any other exceptions on the way
+ * @throws Exception any other exceptions on the way
*/
List<String> getLocations() throws Exception;
}
-
diff --git a/common/src/main/java/edu/snu/nemo/common/test/ArgBuilder.java
b/common/src/main/java/edu/snu/nemo/common/test/ArgBuilder.java
index 7fc6e335..22281529 100644
--- a/common/src/main/java/edu/snu/nemo/common/test/ArgBuilder.java
+++ b/common/src/main/java/edu/snu/nemo/common/test/ArgBuilder.java
@@ -31,7 +31,6 @@
*/
public ArgBuilder() {
this.args = new ArrayList<>();
- this.args.add(Arrays.asList("-executor_json",
"../resources/sample_executor_resources.json"));
}
/**
@@ -81,6 +80,15 @@ public ArgBuilder addDAGDirectory(final String directory) {
return this;
}
+ /**
+ * @param executorJsonFileName the name of the executor resource file to use.
+ * @return builder with the executor resource file.
+ */
+ public ArgBuilder addResourceJson(final String executorJsonFileName) {
+ args.add(Arrays.asList("-executor_json", executorJsonFileName));
+ return this;
+ }
+
/**
* @return the built arguments.
*/
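With the default -executor_json entry removed from the constructor, callers now pass the resource file explicitly. A minimal sketch, mirroring how the integration tests below use addResourceJson (the job id and paths are illustrative):

```java
import edu.snu.nemo.client.JobLauncher;
import edu.snu.nemo.common.test.ArgBuilder;

/**
 * Illustrative use of ArgBuilder with an explicit executor resource JSON.
 */
public final class ArgBuilderSketch {
  private ArgBuilderSketch() {
  }

  public static void main(final String[] args) throws Exception {
    JobLauncher.main(new ArgBuilder()
        .addResourceJson("examples/resources/beam_sample_executor_resources.json")
        .addJobId("arg_builder_sketch") // illustrative job id
        .addUserMain("edu.snu.nemo.examples.beam.MapReduce")
        .addUserArgs("sample_input_mr", "sample_output_mr") // illustrative paths
        .addOptimizationPolicy("edu.snu.nemo.compiler.optimizer.policy.DefaultPolicy")
        .build());
  }
}
```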
diff --git
a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaPairRDD.java
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaPairRDD.java
index 04721657..f9791204 100644
---
a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaPairRDD.java
+++
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaPairRDD.java
@@ -25,8 +25,10 @@
import edu.snu.nemo.compiler.frontend.spark.SparkKeyExtractor;
import edu.snu.nemo.compiler.frontend.spark.coder.SparkCoder;
import edu.snu.nemo.compiler.frontend.spark.core.RDD;
+import edu.snu.nemo.compiler.frontend.spark.transform.MapTransform;
import edu.snu.nemo.compiler.frontend.spark.transform.ReduceByKeyTransform;
import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.serializer.Serializer;
import scala.Tuple2;
@@ -91,6 +93,21 @@ public SparkContext getSparkContext() {
return new JavaPairRDD<>(this.sparkContext,
builder.buildWithoutSourceSinkCheck(), reduceByKeyVertex);
}
+ @Override
+ public <R> JavaRDD<R> map(final Function<Tuple2<K, V>, R> f) {
+ final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>(dag);
+
+ final IRVertex mapVertex = new OperatorVertex(new MapTransform<>(f));
+ builder.addVertex(mapVertex, loopVertexStack);
+
+ final IREdge newEdge = new IREdge(getEdgeCommunicationPattern(lastVertex,
mapVertex),
+ lastVertex, mapVertex, new SparkCoder(serializer));
+ newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor()));
+ builder.connectVertices(newEdge);
+
+ return new JavaRDD<>(this.sparkContext,
builder.buildWithoutSourceSinkCheck(), mapVertex);
+ }
+
/////////////// ACTIONS ///////////////
@Override
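A small sketch of the new JavaPairRDD#map in a word-count-style pipeline. The input data and output handling are illustrative, and it assumes the Nemo SparkSession exposes sparkContext() and that collect() materializes a List, as in Spark:

```java
import edu.snu.nemo.compiler.frontend.spark.core.java.JavaPairRDD;
import edu.snu.nemo.compiler.frontend.spark.core.java.JavaSparkContext;
import edu.snu.nemo.compiler.frontend.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

/**
 * Illustrative pipeline: count words, then map the (word, count) pairs back to strings.
 */
public final class PairMapSketch {
  private PairMapSketch() {
  }

  public static void main(final String[] args) {
    final SparkSession spark = SparkSession.builder().appName("PairMapSketch").getOrCreate();
    final JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

    final JavaPairRDD<String, Integer> counts = jsc
        .parallelize(Arrays.asList("a", "b", "a"))
        .mapToPair(word -> new Tuple2<>(word, 1))
        .reduceByKey((x, y) -> x + y);

    // The newly added map() keeps the pipeline on the pair RDD without collecting first.
    final List<String> lines = counts.map(pair -> pair._1() + ": " + pair._2()).collect();
    System.out.println(lines);
  }
}
```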
diff --git
a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaRDD.java
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaRDD.java
index bc6d891e..a8d024c2 100644
---
a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaRDD.java
+++
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaRDD.java
@@ -15,23 +15,23 @@
*/
package edu.snu.nemo.compiler.frontend.spark.core.java;
+import edu.snu.nemo.client.JobLauncher;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.dag.DAGBuilder;
import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.edge.executionproperty.KeyExtractorProperty;
import edu.snu.nemo.common.ir.vertex.*;
import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import edu.snu.nemo.common.ir.vertex.transform.Transform;
import edu.snu.nemo.compiler.frontend.spark.SparkKeyExtractor;
import edu.snu.nemo.compiler.frontend.spark.coder.SparkCoder;
import edu.snu.nemo.compiler.frontend.spark.core.RDD;
-import edu.snu.nemo.compiler.frontend.spark.source.SparkBoundedSourceVertex;
+import
edu.snu.nemo.compiler.frontend.spark.source.SparkDatasetBoundedSourceVertex;
+import
edu.snu.nemo.compiler.frontend.spark.source.SparkTextFileBoundedSourceVertex;
import edu.snu.nemo.compiler.frontend.spark.sql.Dataset;
import edu.snu.nemo.compiler.frontend.spark.sql.SparkSession;
import edu.snu.nemo.compiler.frontend.spark.transform.*;
-import org.apache.spark.Partition;
-import org.apache.spark.Partitioner;
-import org.apache.spark.SparkContext;
-import org.apache.spark.TaskContext;
+import org.apache.spark.*;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.*;
@@ -59,14 +59,16 @@
/**
* Static method to create a JavaRDD object from an iterable object.
+ *
* @param sparkContext spark context containing configurations.
- * @param initialData initial data.
- * @param parallelism parallelism information.
- * @param <T> type of the resulting object.
+ * @param initialData initial data.
+ * @param parallelism parallelism information.
+ * @param <T> type of the resulting object.
* @return the new JavaRDD object.
*/
public static <T> JavaRDD<T> of(final SparkContext sparkContext,
- final Iterable<T> initialData, final Integer
parallelism) {
+ final Iterable<T> initialData,
+ final int parallelism) {
final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
final IRVertex initializedSourceVertex = new
InitializedSourceVertex<>(initialData);
@@ -76,18 +78,41 @@
return new JavaRDD<>(sparkContext, builder.buildWithoutSourceSinkCheck(),
initializedSourceVertex);
}
+ /**
+ * Static method to create a JavaRDD object from a text file.
+ *
+ * @param sparkContext the spark context containing configurations.
+ * @param minPartitions the minimum number of partitions.
+ * @param inputPath the path of the input text file.
+ * @param <T> the type of the resulting object.
+ * @return the new JavaRDD object.
+ */
+ public static <T> JavaRDD<T> of(final SparkContext sparkContext,
+ final int minPartitions,
+ final String inputPath) {
+ final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
+
+ final int numPartitions = sparkContext.textFile(inputPath,
minPartitions).getNumPartitions();
+ final IRVertex textSourceVertex = new
SparkTextFileBoundedSourceVertex(sparkContext, inputPath, numPartitions);
+ textSourceVertex.setProperty(ParallelismProperty.of(numPartitions));
+ builder.addVertex(textSourceVertex);
+
+ return new JavaRDD<>(sparkContext, builder.buildWithoutSourceSinkCheck(),
textSourceVertex);
+ }
+
/**
* Static method to create a JavaRDD object from a Dataset.
+ *
* @param sparkSession spark session containing configurations.
- * @param dataset dataset to read initial data from.
- * @param <T> type of the resulting object.
+ * @param dataset dataset to read initial data from.
+ * @param <T> type of the resulting object.
* @return the new JavaRDD object.
*/
public static <T> JavaRDD<T> of(final SparkSession sparkSession,
final Dataset<T> dataset) {
final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
- final IRVertex sparkBoundedSourceVertex = new
SparkBoundedSourceVertex<>(sparkSession, dataset);
+ final IRVertex sparkBoundedSourceVertex = new
SparkDatasetBoundedSourceVertex<>(sparkSession, dataset);
sparkBoundedSourceVertex.setProperty(ParallelismProperty.of(dataset.rdd().getNumPartitions()));
builder.addVertex(sparkBoundedSourceVertex);
@@ -96,9 +121,10 @@
/**
* Constructor.
+ *
* @param sparkContext spark context containing configurations.
- * @param dag the current DAG.
- * @param lastVertex last vertex added to the builder.
+ * @param dag the current DAG.
+ * @param lastVertex last vertex added to the builder.
*/
JavaRDD(final SparkContext sparkContext, final DAG<IRVertex, IREdge> dag,
final IRVertex lastVertex) {
// TODO #366: resolve while implementing scala RDD.
@@ -122,8 +148,9 @@ public SparkContext getSparkContext() {
/**
* Map transform.
+ *
* @param func function to apply.
- * @param <O> output type.
+ * @param <O> output type.
* @return the JavaRDD with the DAG.
*/
@Override
@@ -159,7 +186,7 @@ public SparkContext getSparkContext() {
/////////////// TRANSFORMATION TO PAIR RDD ///////////////
@Override
- public <K2, V2> JavaPairRDD<K2, V2> mapToPair(final PairFunction<T, K2, V2>
f) {
+ public <K2, V2> JavaPairRDD<K2, V2> mapToPair(final PairFunction<T, K2, V2>
f) {
final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>(dag);
final IRVertex mapToPairVertex = new OperatorVertex(new
MapToPairTransform<>(f));
@@ -179,6 +206,7 @@ public SparkContext getSparkContext() {
/**
* This method is to be removed after a result handler is implemented.
+ *
* @return a unique integer.
*/
public static Integer getResultId() {
@@ -187,6 +215,7 @@ public static Integer getResultId() {
/**
* Reduce action.
+ *
* @param func function (binary operator) to apply.
* @return the result of the reduce action.
*/
@@ -210,6 +239,27 @@ public T reduce(final Function2<T, T, T> func) {
return SparkFrontendUtils.collect(dag, loopVertexStack, lastVertex,
serializer);
}
+ @Override
+ public void saveAsTextFile(final String path) {
+
+ // Check whether the path is a Hadoop FileSystem URI (hdfs://, s3a://, or file://).
+ final boolean isHDFSPath = path.startsWith("hdfs://") ||
path.startsWith("s3a://") || path.startsWith("file://");
+ final Transform textFileTransform = isHDFSPath
+ ? new HDFSTextFileTransform(path) : new LocalTextFileTransform(path);
+
+ final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>(dag);
+
+ final IRVertex flatMapVertex = new OperatorVertex(textFileTransform);
+ builder.addVertex(flatMapVertex, loopVertexStack);
+
+ final IREdge newEdge = new IREdge(getEdgeCommunicationPattern(lastVertex,
flatMapVertex),
+ lastVertex, flatMapVertex, new SparkCoder(serializer));
+ newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor()));
+ builder.connectVertices(newEdge);
+
+ JobLauncher.launchDAG(builder.build());
+ }
+
/////////////// UNSUPPORTED TRANSFORMATIONS ///////////////
//TODO#776: support unimplemented RDD transformation/actions.
@@ -260,7 +310,7 @@ public T reduce(final Function2<T, T, T> func) {
@Override
public <R> JavaRDD<R> mapPartitionsWithIndex(final Function2<Integer,
Iterator<T>, Iterator<R>> f,
- final boolean
preservesPartitioning) {
+ final boolean
preservesPartitioning) {
throw new UnsupportedOperationException("Operation not yet implemented.");
}
@@ -328,7 +378,7 @@ public T reduce(final Function2<T, T, T> func) {
}
@Override
- public <U> JavaPairRDD<U, Iterable<T>> groupBy(final Function<T, U> f, final
int numPartitions) {
+ public <U> JavaPairRDD<U, Iterable<T>> groupBy(final Function<T, U> f, final
int numPartitions) {
throw new UnsupportedOperationException("Operation not yet implemented.");
}
@@ -344,12 +394,12 @@ public T reduce(final Function2<T, T, T> func) {
@Override
public <K2, V2> JavaPairRDD<K2, V2> mapPartitionsToPair(final
PairFlatMapFunction<java.util.Iterator<T>, K2, V2> f,
- final boolean
preservesPartitioning) {
+ final boolean
preservesPartitioning) {
throw new UnsupportedOperationException("Operation not yet implemented.");
}
@Override
- public JavaPairRDD<T, Long> zipWithIndex() {
+ public JavaPairRDD<T, Long> zipWithIndex() {
throw new UnsupportedOperationException("Operation not yet implemented.");
}
@@ -372,9 +422,8 @@ public void checkpoint() {
}
-
@Override
- public JavaFutureAction<List<T>> collectAsync() {
+ public JavaFutureAction<List<T>> collectAsync() {
throw new UnsupportedOperationException("Operation not yet implemented.");
}
@@ -389,7 +438,7 @@ public long count() {
}
@Override
- public PartialResult<BoundedDouble> countApprox(final long timeout) {
+ public PartialResult<BoundedDouble> countApprox(final long timeout) {
throw new UnsupportedOperationException("Operation not yet implemented.");
}
@@ -399,12 +448,12 @@ public long count() {
}
@Override
- public long countApproxDistinct(final double relativeSD) {
+ public long countApproxDistinct(final double relativeSD) {
throw new UnsupportedOperationException("Operation not yet implemented.");
}
@Override
- public JavaFutureAction<Long> countAsync() {
+ public JavaFutureAction<Long> countAsync() {
throw new UnsupportedOperationException("Operation not yet implemented.");
}
@@ -459,7 +508,7 @@ public void foreachPartition(final
VoidFunction<Iterator<T>> f) {
}
@Override
- public int getNumPartitions() {
+ public int getNumPartitions() {
throw new UnsupportedOperationException("Operation not yet implemented.");
}
@@ -479,7 +528,7 @@ public boolean isCheckpointed() {
}
@Override
- public boolean isEmpty() {
+ public boolean isEmpty() {
throw new UnsupportedOperationException("Operation not yet implemented.");
}
@@ -518,14 +567,9 @@ public void saveAsObjectFile(final String path) {
throw new UnsupportedOperationException("Operation not yet implemented.");
}
- @Override
- public void saveAsTextFile(final String path) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
@Override
public void saveAsTextFile(final String path,
- final Class<? extends
org.apache.hadoop.io.compress.CompressionCodec> codec) {
+ final Class<? extends
org.apache.hadoop.io.compress.CompressionCodec> codec) {
throw new UnsupportedOperationException("Operation not yet implemented.");
}
@@ -586,7 +630,7 @@ public String toDebugString() {
@Override
public <U> U treeAggregate(final U zeroValue, final Function2<U, T, U> seqOp,
- final Function2<U, U, U> combOp, final int
depth) {
+ final Function2<U, U, U> combOp, final int depth)
{
throw new UnsupportedOperationException("Operation not yet implemented.");
}
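Putting the new source and sink together, a sketch of the text-file round trip JavaRDD now supports: textFile() builds a SparkTextFileBoundedSourceVertex, and saveAsTextFile() is the action that hands the accumulated DAG to JobLauncher.launchDAG. Paths are illustrative, and spark.sparkContext() is assumed to behave as in Spark:

```java
import edu.snu.nemo.compiler.frontend.spark.core.java.JavaRDD;
import edu.snu.nemo.compiler.frontend.spark.core.java.JavaSparkContext;
import edu.snu.nemo.compiler.frontend.spark.sql.SparkSession;

/**
 * Illustrative read-transform-write pipeline over the new text file source/sink.
 */
public final class TextFileSketch {
  private TextFileSketch() {
  }

  public static void main(final String[] args) {
    final SparkSession spark = SparkSession.builder().appName("TextFileSketch").getOrCreate();
    final JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

    // Reads through SparkTextFileBoundedSourceVertex with at least 4 partitions.
    final JavaRDD<String> lines = jsc.textFile("hdfs:///tmp/input", 4); // illustrative path

    // An hdfs:// (or s3a:// / file://) prefix routes the write through HDFSTextFileTransform;
    // a plain local path would use LocalTextFileTransform. This call launches the job.
    lines.map(line -> line.toUpperCase()).saveAsTextFile("hdfs:///tmp/output");
  }
}
```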
diff --git
a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaSparkContext.java
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaSparkContext.java
index 58036069..cca48cb9 100644
---
a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaSparkContext.java
+++
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaSparkContext.java
@@ -33,8 +33,41 @@ public JavaSparkContext(final SparkContext sparkContext) {
this.sparkContext = sparkContext;
}
+ /**
+ * Create a String {@link JavaRDD} from a text file path.
+ *
+ * @param path the path to read.
+ * @return the RDD.
+ */
+ public JavaRDD<String> textFile(final String path) {
+ return this.textFile(path, 1);
+ }
+
+ /**
+ * Create a String {@link JavaRDD} from a text file path with specific
minimum parallelism.
+ *
+ * @param path the path to read.
+ * @param minPartitions the minimum parallelism.
+ * @return the RDD.
+ */
+ public JavaRDD<String> textFile(final String path, final int minPartitions) {
+ return JavaRDD.of(sparkContext, minPartitions, path);
+ }
+
+ /**
+ * Initiate a JavaRDD from a list, using the default parallelism (1).
+ *
+ * @param list input data as list.
+ * @param <T> type of the initial element.
+ * @return the newly initiated JavaRDD.
+ */
+ public <T> JavaRDD<T> parallelize(final List<T> list) {
+ return this.parallelize(list, 1);
+ }
+
/**
* Initiate a JavaRDD with the number of parallelism.
+ *
* @param l input data as list.
* @param slices number of slices (parallelism).
* @param <T> type of the initial element.
diff --git
a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkBoundedSourceVertex.java
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
similarity index 58%
rename from
compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkBoundedSourceVertex.java
rename to
compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
index e1270d96..775faf41 100644
---
a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkBoundedSourceVertex.java
+++
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
@@ -19,51 +19,50 @@
import edu.snu.nemo.common.ir.vertex.SourceVertex;
import edu.snu.nemo.compiler.frontend.spark.sql.Dataset;
import edu.snu.nemo.compiler.frontend.spark.sql.SparkSession;
-import org.apache.spark.TaskContext$;
+import org.apache.spark.*;
import org.apache.spark.rdd.RDD;
import scala.collection.JavaConverters;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.IntStream;
+import java.util.*;
/**
- * Bounded source vertex for Spark.
+ * Bounded source vertex for Spark Dataset.
* @param <T> type of data to read.
*/
-public final class SparkBoundedSourceVertex<T> extends SourceVertex<T> {
+public final class SparkDatasetBoundedSourceVertex<T> extends SourceVertex<T> {
private final List<Readable<T>> readables;
/**
* Constructor.
- * Note that we have to first create our iterators here and supply them to
our readables.
*
* @param sparkSession sparkSession to recreate on each executor.
- * @param dataset Dataset to read data from.
+ * @param dataset Dataset to read data from.
*/
- public SparkBoundedSourceVertex(final SparkSession sparkSession, final
Dataset<T> dataset) {
+ public SparkDatasetBoundedSourceVertex(final SparkSession sparkSession,
final Dataset<T> dataset) {
this.readables = new ArrayList<>();
- IntStream.range(0,
dataset.rdd().getNumPartitions()).forEach(partitionIndex ->
- readables.add(new SparkBoundedSourceReadable(
- sparkSession.getDatasetCommandsList(),
- sparkSession.getInitialConf(),
- partitionIndex)));
+ final RDD rdd = dataset.rdd();
+ final Partition[] partitions = rdd.getPartitions();
+ for (int i = 0; i < partitions.length; i++) {
+ readables.add(new SparkDatasetBoundedSourceReadable(
+ partitions[i],
+ sparkSession.getDatasetCommandsList(),
+ sparkSession.getInitialConf(),
+ i));
+ }
}
/**
- * Constructor.
+ * Constructor for cloning.
*
* @param readables the list of Readables to set.
*/
- public SparkBoundedSourceVertex(final List<Readable<T>> readables) {
+ private SparkDatasetBoundedSourceVertex(final List<Readable<T>> readables) {
this.readables = readables;
}
@Override
- public SparkBoundedSourceVertex getClone() {
- final SparkBoundedSourceVertex<T> that = new
SparkBoundedSourceVertex<>((this.readables));
+ public SparkDatasetBoundedSourceVertex getClone() {
+ final SparkDatasetBoundedSourceVertex<T> that = new
SparkDatasetBoundedSourceVertex<>((this.readables));
this.copyExecutionPropertiesTo(that);
return that;
}
@@ -74,25 +73,30 @@ public SparkBoundedSourceVertex getClone() {
}
/**
- * A Readable for SparkBoundedSourceReadablesWrapper.
+ * A Readable wrapper for Spark Dataset.
*/
- private final class SparkBoundedSourceReadable implements Readable<T> {
+ private final class SparkDatasetBoundedSourceReadable implements Readable<T>
{
private final LinkedHashMap<String, Object[]> commands;
private final Map<String, String> sessionInitialConf;
private final int partitionIndex;
+ private final List<String> locations;
/**
* Constructor.
- * @param commands list of commands needed to build the dataset.
+ *
+ * @param partition the partition to wrap.
+ * @param commands list of commands needed to build the dataset.
* @param sessionInitialConf spark session's initial configuration.
- * @param partitionIndex partition for this readable.
+ * @param partitionIndex partition for this readable.
*/
- private SparkBoundedSourceReadable(final LinkedHashMap<String, Object[]>
commands,
- final Map<String, String>
sessionInitialConf,
- final int partitionIndex) {
+ private SparkDatasetBoundedSourceReadable(final Partition partition,
+ final LinkedHashMap<String,
Object[]> commands,
+ final Map<String, String>
sessionInitialConf,
+ final int partitionIndex) {
this.commands = commands;
this.sessionInitialConf = sessionInitialConf;
this.partitionIndex = partitionIndex;
+ this.locations = SparkSourceUtil.getPartitionLocation(partition);
}
@Override
@@ -111,7 +115,11 @@ private SparkBoundedSourceReadable(final
LinkedHashMap<String, Object[]> command
@Override
public List<String> getLocations() {
- throw new UnsupportedOperationException();
+ if (locations.isEmpty()) {
+ throw new UnsupportedOperationException();
+ } else {
+ return locations;
+ }
}
}
}
diff --git
a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkSourceUtil.java
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkSourceUtil.java
new file mode 100644
index 00000000..aaa1713e
--- /dev/null
+++
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkSourceUtil.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.frontend.spark.source;
+
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.spark.Partition;
+import org.apache.spark.SerializableWritable;
+import org.apache.spark.rdd.HadoopPartition;
+
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Utility methods for spark sources.
+ */
+final class SparkSourceUtil {
+ /**
+ * Empty constructor.
+ */
+ private SparkSourceUtil() {
+ // Private constructor.
+ }
+
+ /**
+ * Gets the source location of a Spark partition.
+ *
+ * @param partition the partition to get location.
+ * @return a list of locations.
+ * @throws RuntimeException if failed to get source location.
+ */
+ static List<String> getPartitionLocation(final Partition partition) {
+ try {
+ if (partition instanceof HadoopPartition) {
+ final Field inputSplitField =
partition.getClass().getDeclaredField("inputSplit");
+ inputSplitField.setAccessible(true);
+ final InputSplit inputSplit = (InputSplit) ((SerializableWritable)
inputSplitField.get(partition)).value();
+
+ final String[] splitLocations = inputSplit.getLocations();
+ final List<String> parsedLocations = new ArrayList<>();
+
+ for (final String loc : splitLocations) {
+ final String canonicalHostName =
InetAddress.getByName(loc).getCanonicalHostName();
+ parsedLocations.add(canonicalHostName);
+ }
+
+ if (parsedLocations.size() == 1 &&
parsedLocations.get(0).equals("localhost")) {
+ return Collections.emptyList();
+ } else {
+ return parsedLocations;
+ }
+ } else {
+ return Collections.emptyList();
+ }
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git
a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
new file mode 100644
index 00000000..a5e78333
--- /dev/null
+++
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.frontend.spark.source;
+
+import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.vertex.SourceVertex;
+import org.apache.spark.*;
+import org.apache.spark.rdd.RDD;
+import scala.collection.JavaConverters;
+
+import java.util.*;
+
+/**
+ * Bounded source vertex for Spark text file.
+ */
+public final class SparkTextFileBoundedSourceVertex extends
SourceVertex<String> {
+ private final List<Readable<String>> readables;
+
+ /**
+ * Constructor.
+ *
+ * @param sparkContext the spark context.
+ * @param inputPath the path of the target text file.
+ * @param numPartitions the number of partitions.
+ */
+ public SparkTextFileBoundedSourceVertex(final SparkContext sparkContext,
+ final String inputPath,
+ final int numPartitions) {
+ this.readables = new ArrayList<>();
+ final Partition[] partitions = sparkContext.textFile(inputPath,
numPartitions).getPartitions();
+ for (int i = 0; i < partitions.length; i++) {
+ readables.add(new SparkTextFileBoundedSourceReadable(
+ partitions[i],
+ sparkContext.getConf(),
+ i,
+ inputPath,
+ numPartitions));
+ }
+ }
+
+ /**
+ * Constructor for cloning.
+ *
+ * @param readables the list of Readables to set.
+ */
+ private SparkTextFileBoundedSourceVertex(final List<Readable<String>>
readables) {
+ this.readables = readables;
+ }
+
+ @Override
+ public SparkTextFileBoundedSourceVertex getClone() {
+ final SparkTextFileBoundedSourceVertex that = new
SparkTextFileBoundedSourceVertex(this.readables);
+ this.copyExecutionPropertiesTo(that);
+ return that;
+ }
+
+ @Override
+ public List<Readable<String>> getReadables(final int desiredNumOfSplits) {
+ return readables;
+ }
+
+ /**
+ * A Readable wrapper for Spark text file.
+ */
+ private final class SparkTextFileBoundedSourceReadable implements
Readable<String> {
+ private final SparkConf sparkConf;
+ private final int partitionIndex;
+ private final List<String> locations;
+ private final String inputPath;
+ private final int numPartitions;
+
+ /**
+ * Constructor.
+ *
+ * @param partition the partition to wrap.
+ * @param sparkConf configuration needed to build the SparkContext.
+ * @param partitionIndex partition for this readable.
+ * @param inputPath the path of the input text file.
+ * @param numPartitions the total number of partitions.
+ */
+ private SparkTextFileBoundedSourceReadable(final Partition partition,
+ final SparkConf sparkConf,
+ final int partitionIndex,
+ final String inputPath,
+ final int numPartitions) {
+ this.sparkConf = sparkConf;
+ this.partitionIndex = partitionIndex;
+ this.inputPath = inputPath;
+ this.numPartitions = numPartitions;
+ this.locations = SparkSourceUtil.getPartitionLocation(partition);
+ }
+
+ @Override
+ public Iterable<String> read() throws Exception {
+ // for setting up the same environment in the executors.
+ final SparkContext sparkContext = SparkContext.getOrCreate(sparkConf);
+
+ // Spark evaluates lazily: the RDD does not load the full data, only the partition it is asked for.
+ final RDD<String> rdd = sparkContext.textFile(inputPath, numPartitions);
+ return () -> JavaConverters.asJavaIteratorConverter(
+ rdd.iterator(rdd.getPartitions()[partitionIndex],
TaskContext$.MODULE$.empty())).asJava();
+ }
+
+ @Override
+ public List<String> getLocations() {
+ if (locations.isEmpty()) {
+ throw new UnsupportedOperationException();
+ } else {
+ return locations;
+ }
+ }
+ }
+}
diff --git
a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/sql/SparkSession.java
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/sql/SparkSession.java
index 325df6d8..5b501095 100644
---
a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/sql/SparkSession.java
+++
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/sql/SparkSession.java
@@ -262,7 +262,7 @@ public static SparkSession from(final
org.apache.spark.sql.SparkSession sparkSes
* @return the session builder.
*/
public static Builder builder() {
- return new Builder().master("local");
+ return new Builder();
}
/**
@@ -311,7 +311,12 @@ public Builder master(final String master) {
@Override
public SparkSession getOrCreate() {
-
UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser("nemo_user"));
+ if (!options.containsKey("spark.master")) { // default spark_master
option.
+ return this.master("local[*]").getOrCreate();
+ }
+
+
UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser("ubuntu"));
+
return SparkSession.from(super.getOrCreate(), this.options);
}
}
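The practical effect of the builder change, sketched below: builder() no longer hard-codes master("local"), and getOrCreate() only falls back to local[*] when spark.master is unset, so cluster runs set the master explicitly (the YARN settings in the comment are illustrative):

```java
import edu.snu.nemo.compiler.frontend.spark.sql.SparkSession;

/**
 * Illustrative session setup with the new default-master behavior.
 */
public final class SessionSketch {
  private SessionSketch() {
  }

  public static void main(final String[] args) {
    // With no master set, getOrCreate() defaults spark.master to local[*].
    final SparkSession spark = SparkSession.builder()
        .appName("SessionSketch")
        // For a cluster run, set the master explicitly instead, e.g.:
        // .master("yarn").config("spark.submit.deployMode", "cluster")
        .getOrCreate();
  }
}
```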
diff --git
a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/HDFSTextFileTransform.java
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/HDFSTextFileTransform.java
new file mode 100644
index 00000000..03f3215c
--- /dev/null
+++
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/HDFSTextFileTransform.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.frontend.spark.transform;
+
+import edu.snu.nemo.common.ir.OutputCollector;
+import edu.snu.nemo.common.ir.vertex.transform.Transform;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Transform which saves elements to a HDFS text file for Spark.
+ * @param <I> input type.
+ * @param <O> output type.
+ */
+public final class HDFSTextFileTransform<I, O> implements Transform<I, O> {
+ private final String path;
+ private Path fileName;
+ private List<I> elements;
+
+ /**
+ * Constructor.
+ *
+ * @param path the path to write elements.
+ */
+ public HDFSTextFileTransform(final String path) {
+ this.path = path;
+ }
+
+ @Override
+ public void prepare(final Context context, final OutputCollector<O>
outputCollector) {
+ fileName = new Path(path + UUID.randomUUID().toString());
+ this.elements = new ArrayList<>();
+ }
+
+ @Override
+ public void onData(final I element) {
+ elements.add(element);
+ }
+
+ @Override
+ public void close() {
+ try (
+ final FileSystem fileSystem = fileName.getFileSystem(new JobConf());
+ final FSDataOutputStream outputStream = fileSystem.create(fileName,
false);
+ ) {
+ for (final I element : elements) {
+ outputStream.writeBytes(element + "\n");
+ }
+ outputStream.close();
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git
a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/LocalTextFileTransform.java
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/LocalTextFileTransform.java
new file mode 100644
index 00000000..8fe7bcf4
--- /dev/null
+++
b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/LocalTextFileTransform.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.frontend.spark.transform;
+
+import edu.snu.nemo.common.ir.OutputCollector;
+import edu.snu.nemo.common.ir.vertex.transform.Transform;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Transform which saves elements to a local text file for Spark.
+ * @param <I> input type.
+ * @param <O> output type.
+ */
+public final class LocalTextFileTransform<I, O> implements Transform<I, O> {
+ private final String path;
+ private String fileName;
+ private List<I> elements;
+
+ /**
+ * Constructor.
+ *
+ * @param path the path to write elements.
+ */
+ public LocalTextFileTransform(final String path) {
+ this.path = path;
+ }
+
+ @Override
+ public void prepare(final Context context, final OutputCollector<O>
outputCollector) {
+ fileName = path + UUID.randomUUID().toString();
+ this.elements = new ArrayList<>();
+ }
+
+ @Override
+ public void onData(final I element) {
+ elements.add(element);
+ }
+
+ @Override
+ public void close() {
+ try (
+ final Writer writer =
+ new BufferedWriter(new OutputStreamWriter(new
FileOutputStream(fileName, false), "utf-8"))
+ ) {
+ for (final I element : elements) {
+ writer.write(element + "\n");
+ }
+ writer.close();
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
index 7f052e34..19a654d7 100644
--- a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
+++ b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
@@ -103,7 +103,7 @@
* Path to the JSON file that specifies resource layout.
*/
@NamedParameter(doc = "Path to the JSON file that specifies resources for
executors", short_name = "executor_json",
- default_value = "examples/resources/sample_executor_resources.json")
+ default_value = "examples/resources/sample_executor_resources.json")
public final class ExecutorJsonPath implements Name<String> {
}
diff --git
a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java
index 4825e784..f5426275 100644
---
a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java
+++
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java
@@ -34,20 +34,24 @@
@PrepareForTest(JobLauncher.class)
public final class AlternatingLeastSquareITCase {
private static final int TIMEOUT = 240000;
- private static ArgBuilder builder = new ArgBuilder();
+ private static ArgBuilder builder;
private static final String fileBasePath = System.getProperty("user.dir") +
"/../resources/";
private static final String input = fileBasePath + "sample_input_als";
private static final String outputFileName = "sample_output_als";
private static final String output = fileBasePath + outputFileName;
private static final String testResourceFileName = "test_output_als";
+ private static final String executorResourceFileName = fileBasePath +
"beam_sample_executor_resources.json";
private static final String numFeatures = "10";
private static final String numIteration = "3";
private static final String lambda = "0.05";
@Before
public void setUp() throws Exception {
- builder = new ArgBuilder();
+ builder = new ArgBuilder()
+ .addResourceJson(executorResourceFileName)
+ .addUserMain(AlternatingLeastSquare.class.getCanonicalName())
+ .addUserArgs(input, numFeatures, numIteration, lambda, output);
}
@After
@@ -63,8 +67,6 @@ public void tearDown() throws Exception {
public void test() throws Exception {
JobLauncher.main(builder
.addJobId(AlternatingLeastSquareITCase.class.getSimpleName())
- .addUserMain(AlternatingLeastSquare.class.getCanonicalName())
- .addUserArgs(input, numFeatures, numIteration, lambda, output)
.addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
.build());
}
@@ -73,8 +75,6 @@ public void test() throws Exception {
public void testPado() throws Exception {
JobLauncher.main(builder
.addJobId(AlternatingLeastSquareITCase.class.getSimpleName() + "_pado")
- .addUserMain(AlternatingLeastSquare.class.getCanonicalName())
- .addUserArgs(input, numFeatures, numIteration, lambda, output)
.addOptimizationPolicy(PadoPolicyParallelismFive.class.getCanonicalName())
.build());
}
diff --git
a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/BroadcastITCase.java
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/BroadcastITCase.java
index 6531310a..25ea7811 100644
---
a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/BroadcastITCase.java
+++
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/BroadcastITCase.java
@@ -34,18 +34,22 @@
@PrepareForTest(JobLauncher.class)
public final class BroadcastITCase {
private static final int TIMEOUT = 180000;
- private static ArgBuilder builder = new ArgBuilder();
+ private static ArgBuilder builder;
private static final String fileBasePath = System.getProperty("user.dir") +
"/../resources/";
private static final String inputFileName = "sample_input_mr";
private static final String outputFileName = "sample_output_broadcast";
private static final String testResourceFileName = "test_output_broadcast";
+ private static final String executorResourceFileName = fileBasePath +
"beam_sample_executor_resources.json";
private static final String inputFilePath = fileBasePath + inputFileName;
private static final String outputFilePath = fileBasePath + outputFileName;
@Before
public void setUp() throws Exception {
- builder = new ArgBuilder();
+ builder = new ArgBuilder()
+ .addUserMain(Broadcast.class.getCanonicalName())
+ .addUserArgs(inputFilePath, outputFilePath)
+ .addResourceJson(executorResourceFileName);
}
@After
@@ -61,8 +65,6 @@ public void tearDown() throws Exception {
public void test() throws Exception {
JobLauncher.main(builder
.addJobId(BroadcastITCase.class.getSimpleName())
- .addUserMain(Broadcast.class.getCanonicalName())
- .addUserArgs(inputFilePath, outputFilePath)
.addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
.build());
}
@@ -71,8 +73,6 @@ public void test() throws Exception {
public void testPado() throws Exception {
JobLauncher.main(builder
.addJobId(BroadcastITCase.class.getSimpleName() + "_pado")
- .addUserMain(Broadcast.class.getCanonicalName())
- .addUserArgs(inputFilePath, outputFilePath)
.addOptimizationPolicy(PadoPolicyParallelismFive.class.getCanonicalName())
.build());
}
diff --git
a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MapReduceITCase.java
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MapReduceITCase.java
index ed15fa51..44b0504e 100644
---
a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MapReduceITCase.java
+++
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MapReduceITCase.java
@@ -33,19 +33,22 @@
@PrepareForTest(JobLauncher.class)
public final class MapReduceITCase {
private static final int TIMEOUT = 120000;
- private static ArgBuilder builder = new ArgBuilder();
+ private static ArgBuilder builder;
private static final String fileBasePath = System.getProperty("user.dir") +
"/../resources/";
private static final String inputFileName = "sample_input_mr";
private static final String outputFileName = "sample_output_mr";
private static final String testResourceFileName = "test_output_mr";
+ private static final String executorResourceFileName = fileBasePath +
"beam_sample_executor_resources.json";
private static final String inputFilePath = fileBasePath + inputFileName;
private static final String outputFilePath = fileBasePath + outputFileName;
-
@Before
public void setUp() throws Exception {
- builder = new ArgBuilder();
+ builder = new ArgBuilder()
+ .addResourceJson(executorResourceFileName)
+ .addUserMain(MapReduce.class.getCanonicalName())
+ .addUserArgs(inputFilePath, outputFilePath);
}
@After
@@ -61,8 +64,6 @@ public void tearDown() throws Exception {
public void test() throws Exception {
JobLauncher.main(builder
.addJobId(MapReduceITCase.class.getSimpleName())
- .addUserMain(MapReduce.class.getCanonicalName())
- .addUserArgs(inputFilePath, outputFilePath)
.addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
.build());
}
@@ -71,8 +72,6 @@ public void test() throws Exception {
public void testSailfish() throws Exception {
JobLauncher.main(builder
.addJobId(MapReduceITCase.class.getSimpleName() + "_sailfish")
- .addUserMain(MapReduce.class.getCanonicalName())
- .addUserArgs(inputFilePath, outputFilePath)
.addOptimizationPolicy(SailfishPolicyParallelismFive.class.getCanonicalName())
.build());
}
@@ -81,8 +80,6 @@ public void testSailfish() throws Exception {
public void testDisagg() throws Exception {
JobLauncher.main(builder
.addJobId(MapReduceITCase.class.getSimpleName() + "_disagg")
- .addUserMain(MapReduce.class.getCanonicalName())
- .addUserArgs(inputFilePath, outputFilePath)
.addOptimizationPolicy(DisaggregationPolicyParallelismFive.class.getCanonicalName())
.build());
}
@@ -91,8 +88,6 @@ public void testDisagg() throws Exception {
public void testPado() throws Exception {
JobLauncher.main(builder
.addJobId(MapReduceITCase.class.getSimpleName() + "_pado")
- .addUserMain(MapReduce.class.getCanonicalName())
- .addUserArgs(inputFilePath, outputFilePath)
.addOptimizationPolicy(PadoPolicyParallelismFive.class.getCanonicalName())
.build());
}
@@ -105,8 +100,6 @@ public void testPado() throws Exception {
public void testDataSkew() throws Exception {
JobLauncher.main(builder
.addJobId(MapReduceITCase.class.getSimpleName() + "_dataskew")
- .addUserMain(MapReduce.class.getCanonicalName())
- .addUserArgs(inputFilePath, outputFilePath)
.addOptimizationPolicy(DataSkewPolicyParallelismFive.class.getCanonicalName())
.build());
}
diff --git
a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MultinomialLogisticRegressionITCase.java
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MultinomialLogisticRegressionITCase.java
index c37cbbcf..13d0d926 100644
---
a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MultinomialLogisticRegressionITCase.java
+++
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MultinomialLogisticRegressionITCase.java
@@ -33,6 +33,7 @@
private static final int TIMEOUT = 240000;
private static ArgBuilder builder = new ArgBuilder();
private static final String fileBasePath = System.getProperty("user.dir") +
"/../resources/";
+ private static final String executorResourceFileName = fileBasePath +
"beam_sample_executor_resources.json";
@Before
public void setUp() throws Exception {
@@ -51,6 +52,7 @@ public void test() throws Exception {
.addUserMain(MultinomialLogisticRegression.class.getCanonicalName())
.addUserArgs(input, numFeatures, numClasses, numIteration)
.addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
+ .addResourceJson(executorResourceFileName)
.build());
}
}
diff --git a/examples/resources/beam_sample_executor_resources.json
b/examples/resources/beam_sample_executor_resources.json
new file mode 100644
index 00000000..1f7f9737
--- /dev/null
+++ b/examples/resources/beam_sample_executor_resources.json
@@ -0,0 +1,17 @@
+[
+ {
+ "type": "Transient",
+ "memory_mb": 300,
+ "capacity": 3
+ },
+ {
+ "type": "Reserved",
+ "memory_mb": 300,
+ "capacity": 5
+ },
+ {
+ "type": "Compute",
+ "memory_mb": 300,
+ "capacity": 3
+ }
+]
diff --git a/examples/resources/sample_executor_resources.json
b/examples/resources/spark_sample_executor_resources.json
similarity index 75%
rename from examples/resources/sample_executor_resources.json
rename to examples/resources/spark_sample_executor_resources.json
index 5765bf39..187bd452 100644
--- a/examples/resources/sample_executor_resources.json
+++ b/examples/resources/spark_sample_executor_resources.json
@@ -2,16 +2,16 @@
{
"type": "Transient",
"memory_mb": 512,
- "capacity": 5
+ "capacity": 2
},
{
"type": "Reserved",
"memory_mb": 512,
- "capacity": 5
+ "capacity": 2
},
{
"type": "Compute",
"memory_mb": 512,
- "capacity": 5
+ "capacity": 2
}
]
diff --git a/examples/spark/pom.xml b/examples/spark/pom.xml
index 26384046..e78df98f 100644
--- a/examples/spark/pom.xml
+++ b/examples/spark/pom.xml
@@ -57,6 +57,96 @@ limitations under the License.
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.0.0</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <outputFile>
+
${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar
+ </outputFile>
+ <transformers>
+ <!-- Required for using beam-hadoop: See
https://stackoverflow.com/questions/44365545
+ -->
+ <transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
+ </transformers>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>3.3.1</version>
+ <configuration>
+ <recompileMode>incremental</recompileMode>
+ <javacArgs>
+ <javacArg>-Xlint:unchecked</javacArg>
+ <javacArg>-Xlint:deprecation</javacArg>
+ </javacArgs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>add-source</goal>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>scala-test-compile</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <executions>
+ <execution>
+ <phase>compile</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
\ No newline at end of file
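
The shade execution above writes a `*-shaded.jar`, keeps a usable manifest via the ManifestResourceTransformer, and filters out signed-jar metadata. A hypothetical sanity check for the resulting artifact; the jar path below is an assumed example following the `${artifactId}-${version}-shaded.jar` naming, not a path taken from this PR:

```java
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public final class ShadedJarCheck {
  private ShadedJarCheck() {
  }

  public static void main(final String[] args) throws Exception {
    // Open the shaded jar and confirm the manifest survived shading while
    // signed-jar signature files (META-INF/*.SF, *.DSA, *.RSA) were filtered out.
    try (JarFile jar = new JarFile("examples/spark/target/nemo-examples-spark-0.1-SNAPSHOT-shaded.jar")) {
      System.out.println("Has manifest: " + (jar.getManifest() != null));
      jar.stream()
          .map(JarEntry::getName)
          .filter(n -> n.startsWith("META-INF/")
              && (n.endsWith(".SF") || n.endsWith(".DSA") || n.endsWith(".RSA")))
          .forEach(n -> System.out.println("Unexpected signature file: " + n));
    }
  }
}
```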
diff --git a/examples/spark/src/main/java/edu/snu/nemo/examples/spark/JavaMapReduce.java b/examples/spark/src/main/java/edu/snu/nemo/examples/spark/JavaMapReduce.java
new file mode 100644
index 00000000..e07b80b8
--- /dev/null
+++ b/examples/spark/src/main/java/edu/snu/nemo/examples/spark/JavaMapReduce.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.examples.spark;
+
+import edu.snu.nemo.compiler.frontend.spark.core.java.JavaPairRDD;
+import edu.snu.nemo.compiler.frontend.spark.core.java.JavaRDD;
+import edu.snu.nemo.compiler.frontend.spark.core.java.JavaSparkContext;
+import edu.snu.nemo.compiler.frontend.spark.sql.SparkSession;
+import scala.Tuple2;
+
+/**
+ * Java MapReduce example.
+ */
+public final class JavaMapReduce {
+
+ /**
+ * Private constructor.
+ */
+ private JavaMapReduce() {
+ }
+
+ /**
+ * Main method.
+ * @param args arguments.
+ * @throws Exception exceptions.
+ */
+ public static void main(final String[] args) throws Exception {
+
+ // Parse Arguments
+ final String input = args[0];
+ final String output = args[1];
+ final int parallelism = args.length > 2 ? Integer.parseInt(args[2]) : 1;
+ final boolean yarn = args.length > 3 && Boolean.parseBoolean(args[3]);
+
+ final SparkSession.Builder sparkBuilder = SparkSession
+ .builder()
+ .appName("JavaWordCount");
+ if (yarn) {
+ sparkBuilder
+ .config("mapreduce.input.fileinputformat.input.dir.recursive",
"true")
+ .master("yarn")
+ .config("spark.submit.deployMode", "cluster");
+ }
+ final SparkSession spark = sparkBuilder.getOrCreate();
+
+ final long start = System.currentTimeMillis();
+
+ // Run MR
+ final JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
+ final JavaRDD<String> data = jsc.textFile(input, parallelism);
+ final JavaPairRDD<String, Long> documentToCount = data
+ .mapToPair(line -> {
+ final String[] words = line.split(" +");
+ final String documentId = words[0] + "#" + words[1];
+ final long count = Long.parseLong(words[2]);
+ return new Tuple2<>(documentId, count);
+ });
+ final JavaRDD<String> documentToSum = documentToCount
+ .reduceByKey((i1, i2) -> i1 + i2)
+ .map(t -> t._1() + ": " + t._2());
+ documentToSum.saveAsTextFile(output);
+
+ // DONE
+ System.out.println("*******END*******");
+ System.out.println("JCT(ms): " + (System.currentTimeMillis() - start));
+
+ spark.stop();
+ }
+}
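
JavaMapReduce expects whitespace-separated input lines of the form `word1 word2 count`, keys each line by `word1#word2`, sums the counts per key, and writes `key: sum` lines to the output path. A hypothetical local invocation with placeholder paths; the four positional arguments are input, output, parallelism, and whether to submit to YARN:

```java
import edu.snu.nemo.examples.spark.JavaMapReduce;

public final class JavaMapReduceLocalRun {
  private JavaMapReduceLocalRun() {
  }

  public static void main(final String[] args) throws Exception {
    JavaMapReduce.main(new String[] {
        "/tmp/sample_input_mr",   // input: lines of "word1 word2 <count>"
        "/tmp/sample_output_mr",  // output path
        "2",                      // parallelism
        "false"                   // do not submit to YARN
    });
  }
}
```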
diff --git a/examples/spark/src/main/java/edu/snu/nemo/examples/spark/JavaSparkPi.java b/examples/spark/src/main/java/edu/snu/nemo/examples/spark/JavaSparkPi.java
index 96667dd1..dfa29b3c 100644
--- a/examples/spark/src/main/java/edu/snu/nemo/examples/spark/JavaSparkPi.java
+++ b/examples/spark/src/main/java/edu/snu/nemo/examples/spark/JavaSparkPi.java
@@ -46,7 +46,7 @@ public static void main(final String[] args) throws Exception {
.appName("JavaSparkPi")
.getOrCreate();
- JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
+ final JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
int n = 100000 * slices;
diff --git a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkITCase.java b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkITCase.java
index 172ec995..ace78831 100644
--- a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkITCase.java
+++ b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkITCase.java
@@ -35,13 +35,15 @@
@PrepareForTest(JobLauncher.class)
@PowerMockIgnore("javax.management.*")
public final class SparkITCase {
- private static final int TIMEOUT = 120000;
- private static ArgBuilder builder = new ArgBuilder();
+ private static final int TIMEOUT = 180000;
+ private static ArgBuilder builder;
private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";
+ private static final String executorResourceFileName = fileBasePath + "spark_sample_executor_resources.json";
@Before
public void setUp() {
- builder = new ArgBuilder();
+ builder = new ArgBuilder()
+ .addResourceJson(executorResourceFileName);
}
@Test(timeout = TIMEOUT)
@@ -52,6 +54,7 @@ public void testSparkWordCount() throws Exception {
final String inputFilePath = fileBasePath + inputFileName;
final String outputFilePath = fileBasePath + outputFileName;
+
JobLauncher.main(builder
.addJobId(JavaWordCount.class.getSimpleName() + "_test")
.addUserMain(JavaWordCount.class.getCanonicalName())
@@ -66,6 +69,31 @@ public void testSparkWordCount() throws Exception {
}
}
+ /* Temporarily disabled because of a Travis issue
+ @Test(timeout = TIMEOUT)
+ public void testSparkMapReduce() throws Exception {
+ final String inputFileName = "sample_input_mr";
+ final String outputFileName = "sample_output_mr";
+ final String testResourceFileName = "test_output_mr";
+ final String inputFilePath = fileBasePath + inputFileName;
+ final String outputFilePath = fileBasePath + outputFileName;
+ final String parallelism = "2";
+ final String runOnYarn = "false";
+
+ JobLauncher.main(builder
+ .addJobId(JavaMapReduce.class.getSimpleName() + "_test")
+ .addUserMain(JavaMapReduce.class.getCanonicalName())
+ .addUserArgs(inputFilePath, outputFilePath, parallelism, runOnYarn)
+ .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
+ .build());
+
+ try {
+ ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, testResourceFileName);
+ } finally {
+ ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+ }
+ }*/
+
@Test(timeout = TIMEOUT)
public void testSparkPi() throws Exception {
final String numParallelism = "3";
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services