Repository: reef Updated Branches: refs/heads/master 49c1414d6 -> 39adc451b
[REEF-1795] Implement REEF-on-Spark example JIRA: [REEF-1795](https://issues.apache.org/jira/browse/REEF-1795) Pull Request: This closes #1302 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/39adc451 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/39adc451 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/39adc451 Branch: refs/heads/master Commit: 39adc451b9799f3441000a7cb37764b1f66d5375 Parents: 49c1414 Author: Sergiy Matusevych <[email protected]> Authored: Wed May 3 16:47:06 2017 -0700 Committer: Markus Weimer <[email protected]> Committed: Thu May 11 17:52:31 2017 -0700 ---------------------------------------------------------------------- lang/scala/reef-examples-scala/pom.xml | 112 +++++++++++++++++++ .../reef/examples/hellospark/ReefOnSpark.scala | 83 ++++++++++++++ .../examples/hellospark/ReefOnSparkDriver.scala | 71 ++++++++++++ .../examples/hellospark/ReefOnSparkTask.scala | 39 +++++++ pom.xml | 32 +++++- 5 files changed, 336 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/39adc451/lang/scala/reef-examples-scala/pom.xml ---------------------------------------------------------------------- diff --git a/lang/scala/reef-examples-scala/pom.xml b/lang/scala/reef-examples-scala/pom.xml new file mode 100644 index 0000000..2e2d07d --- /dev/null +++ b/lang/scala/reef-examples-scala/pom.xml @@ -0,0 +1,112 @@ +<?xml version="1.0"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>reef-examples-spark</artifactId> + <name>REEF Spark Examples</name> + + <parent> + <groupId>org.apache.reef</groupId> + <artifactId>reef-project</artifactId> + <version>0.16.0-SNAPSHOT</version> + <relativePath>../../..</relativePath> + </parent> + + <properties> + <rootPath>${basedir}/../../..</rootPath> + <!-- Findbugs does not support Scala code --> + <findbugs.skip>true</findbugs.skip> + </properties> + + <dependencies> + <!-- REEF --> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>reef-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>reef-runtime-local</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>reef-runtime-yarn</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>reef-io</artifactId> + <version>${project.version}</version> + </dependency> + <!-- End of REEF --> + <!-- Spark --> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.11</artifactId> + <scope>provided</scope> + </dependency> + <!-- End of Spark --> + <dependency> + <groupId>com.jsuereth</groupId> + <artifactId>scala-arm_2.11</artifactId> + <version>2.0</version> + </dependency> + </dependencies> + + <build> + <sourceDirectory>src/main/scala</sourceDirectory> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + </execution> + </executions> + <configuration> + <outputFile> + ${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar + </outputFile> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>yarn-default.xml</exclude> + <exclude>yarn-version-info.properties</exclude> + <exclude>core-default.xml</exclude> + <exclude>LICENSE</exclude> + <exclude>META-INF/*</exclude> + </excludes> + </filter> + </filters> + </configuration> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/reef/blob/39adc451/lang/scala/reef-examples-scala/src/main/scala/org/apache/reef/examples/hellospark/ReefOnSpark.scala ---------------------------------------------------------------------- diff --git a/lang/scala/reef-examples-scala/src/main/scala/org/apache/reef/examples/hellospark/ReefOnSpark.scala b/lang/scala/reef-examples-scala/src/main/scala/org/apache/reef/examples/hellospark/ReefOnSpark.scala new file mode 100644 index 0000000..033607e --- /dev/null +++ b/lang/scala/reef-examples-scala/src/main/scala/org/apache/reef/examples/hellospark/ReefOnSpark.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.examples.hellospark + +import java.util.logging.{Level, Logger} + +import org.apache.reef.client.{DriverConfiguration, DriverLauncher} +import org.apache.reef.runtime.common.REEFEnvironment +import org.apache.reef.runtime.yarn.client.unmanaged.UnmanagedAmYarnClientConfiguration +import org.apache.reef.runtime.yarn.client.unmanaged.UnmanagedAmYarnDriverConfiguration +import org.apache.reef.util.EnvironmentUtils +import org.apache.spark.{SparkConf, SparkContext} +import resource._ + +// Run: +// ..\spark\bin\spark-submit.cmd +// --master yarn --deploy-mode cluster +// --class org.apache.reef.examples.hellospark.ReefOnSpark +// .\target\reef-examples-spark-0.16.0-SNAPSHOT-shaded.jar + +object ReefOnSpark { + + private val LOG: Logger = Logger.getLogger(this.getClass.getName) + + private val rootFolder = "." + + private val runtimeConfig = UnmanagedAmYarnClientConfiguration.CONF + .set(UnmanagedAmYarnClientConfiguration.ROOT_FOLDER, rootFolder) + .build + + def main(args: Array[String]) { + + LOG.setLevel(Level.FINEST) + + val conf = new SparkConf().setAppName("ReefOnSpark:host") + val sc = new SparkContext(conf) + + for (client <- managed(DriverLauncher.getLauncher(runtimeConfig))) { + + val jarPath = EnvironmentUtils.getClassLocation(classOf[ReefOnSparkDriver]) + + val driverConfig = DriverConfiguration.CONF + .set(DriverConfiguration.DRIVER_IDENTIFIER, "ReefOnSpark:hello") + .set(DriverConfiguration.GLOBAL_LIBRARIES, jarPath) + .set(DriverConfiguration.ON_DRIVER_STARTED, classOf[ReefOnSparkDriver#StartHandler]) + .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, classOf[ReefOnSparkDriver#EvaluatorAllocatedHandler]) + .build + + val appId = client.submit(driverConfig, 120000) + + LOG.log(Level.INFO, "Job submitted: {0} to {1}", Array[AnyRef](appId, jarPath)) + + val yarnAmConfig = UnmanagedAmYarnDriverConfiguration.CONF + .set(UnmanagedAmYarnDriverConfiguration.JOB_IDENTIFIER, appId) + .set(UnmanagedAmYarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, rootFolder) + .build + + for (reef <- managed(REEFEnvironment.fromConfiguration(client.getUser, yarnAmConfig, driverConfig))) { + reef.run() + val status = reef.getLastStatus + LOG.log(Level.INFO, "REEF job {0} completed: state {1}", Array[AnyRef](appId, status.getState)) + } + } + + sc.stop() + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/39adc451/lang/scala/reef-examples-scala/src/main/scala/org/apache/reef/examples/hellospark/ReefOnSparkDriver.scala ---------------------------------------------------------------------- diff --git a/lang/scala/reef-examples-scala/src/main/scala/org/apache/reef/examples/hellospark/ReefOnSparkDriver.scala b/lang/scala/reef-examples-scala/src/main/scala/org/apache/reef/examples/hellospark/ReefOnSparkDriver.scala new file mode 100644 index 0000000..a77ff07 --- /dev/null +++ b/lang/scala/reef-examples-scala/src/main/scala/org/apache/reef/examples/hellospark/ReefOnSparkDriver.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.examples.hellospark + +import org.apache.reef.driver.evaluator.AllocatedEvaluator +import org.apache.reef.driver.evaluator.EvaluatorRequestor +import org.apache.reef.driver.task.TaskConfiguration +import org.apache.reef.tang.Configuration +import org.apache.reef.tang.annotations.Unit +import org.apache.reef.wake.EventHandler +import org.apache.reef.wake.time.event.StartTime + +import javax.inject.Inject +import java.util.logging.Level +import java.util.logging.Logger + +object ReefOnSparkDriver { + private val LOG: Logger = Logger.getLogger(classOf[ReefOnSparkDriver].getName) +} + +@Unit +final class ReefOnSparkDriver @Inject private(val requestor: EvaluatorRequestor) { + + ReefOnSparkDriver.LOG.log(Level.FINE, "Instantiated ReefOnSparkDriver") + + final class StartHandler extends EventHandler[StartTime] { + def onNext(startTime: StartTime) { + + ReefOnSparkDriver.LOG.log(Level.INFO, "Start ReefOnSparkDriver: {0}", startTime) + + requestor.newRequest + .setNumber(1) + .setMemory(64) + .setNumberOfCores(1) + .submit() + + ReefOnSparkDriver.LOG.log(Level.INFO, "Requested Evaluator.") + } + } + + final class EvaluatorAllocatedHandler extends EventHandler[AllocatedEvaluator] { + def onNext(allocatedEvaluator: AllocatedEvaluator) { + + ReefOnSparkDriver.LOG.log(Level.INFO, + "Submitting ReefOnSparkTask task to AllocatedEvaluator: {0}", allocatedEvaluator) + + val taskConfiguration: Configuration = TaskConfiguration.CONF + .set(TaskConfiguration.IDENTIFIER, "ReefOnSparkTask") + .set(TaskConfiguration.TASK, classOf[ReefOnSparkTask]) + .build + + allocatedEvaluator.submitTask(taskConfiguration) + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/39adc451/lang/scala/reef-examples-scala/src/main/scala/org/apache/reef/examples/hellospark/ReefOnSparkTask.scala ---------------------------------------------------------------------- diff --git a/lang/scala/reef-examples-scala/src/main/scala/org/apache/reef/examples/hellospark/ReefOnSparkTask.scala b/lang/scala/reef-examples-scala/src/main/scala/org/apache/reef/examples/hellospark/ReefOnSparkTask.scala new file mode 100644 index 0000000..934790d --- /dev/null +++ b/lang/scala/reef-examples-scala/src/main/scala/org/apache/reef/examples/hellospark/ReefOnSparkTask.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.examples.hellospark + +import javax.inject.Inject +import java.util.logging.Level +import java.util.logging.Logger + +import org.apache.reef.task.Task + +object ReefOnSparkTask { + private val LOG: Logger = Logger.getLogger(classOf[ReefOnSparkTask].getName) +} + +final class ReefOnSparkTask @Inject() extends Task { + + ReefOnSparkTask.LOG.log(Level.FINE, "Instantiated ReefOnSparkTask") + + override def call(bytes: Array[Byte]): Array[Byte] = { + ReefOnSparkTask.LOG.log(Level.INFO, "Hello Spark!") + null + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/39adc451/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 6e41a3e..5940dad 100644 --- a/pom.xml +++ b/pom.xml @@ -51,6 +51,7 @@ under the License. <bundle.snappy>false</bundle.snappy> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <hadoop.version>2.6.0</hadoop.version> + <spark.version>2.1.0</spark.version> <avro.version>1.8.1</avro.version> <parquet.version>1.9.0</parquet.version> <jetty.version>6.1.26</jetty.version> @@ -275,7 +276,7 @@ under the License. <exclude>.gitignore</exclude> <exclude>.git/**</exclude> <!-- Intellij idea project files --> - <exclude>lang/java/.idea/**</exclude> + <exclude>**/.idea/**</exclude> <exclude>**/*.iml</exclude> <exclude>**/target/**</exclude> <!-- ReadMe files --> @@ -402,6 +403,25 @@ under the License. </excludes> </configuration> </plugin> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>3.2.1</version> + <executions> + <execution> + <goals> + <goal>compile</goal> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + <configuration> + <args> + <!-- work-around for https://issues.scala-lang.org/browse/SI-8358 --> + <arg>-nobootcp</arg> + </args> + </configuration> + </plugin> </plugins> </pluginManagement> <plugins> @@ -611,6 +631,15 @@ under the License. </dependency> <!-- END OF HADOOP --> + <!-- Spark --> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.11</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + <!-- End of Spark --> + <!-- Apache Commons --> <dependency> <groupId>commons-cli</groupId> @@ -751,6 +780,7 @@ under the License. <module>lang/java/reef-webserver</module> <module>lang/java/reef-utils-hadoop</module> <module>lang/java/reef-utils</module> + <module>lang/scala/reef-examples-scala</module> <module>website</module> </modules>
