nadav-har-tzvi closed pull request #35: Adding getActionResources and getActionDependencies
URL: https://github.com/apache/incubator-amaterasu/pull/35
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is reproduced below for the sake of provenance:

diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
index 90c2001..fff2a81 100755
--- a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
@@ -94,6 +94,7 @@ class MesosActionsExecutor extends Executor with Logging {
       .setTaskId(taskInfo.getTaskId)
       .setState(TaskState.TASK_STARTING).build()
     driver.sendStatusUpdate(status)
+
     val task = Future {
       val taskData = mapper.readValue(new ByteArrayInputStream(taskInfo.getData.toByteArray), classOf[TaskData])

diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
index 79bd080..9f267a1 100644
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
@@ -17,14 +17,16 @@ package org.apache.amaterasu.frameworks.spark.dispatcher

 import java.io.File
+import java.util

 import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers.{PySparkRunnerProvider, SparkScalaRunnerProvider}
+import org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers._
 import org.apache.amaterasu.leader.common.utilities.{DataLoader, MemoryFormatParser}
 import org.apache.amaterasu.sdk.frameworks.configuration.DriverConfiguration
 import org.apache.amaterasu.sdk.frameworks.{FrameworkSetupProvider, RunnerSetupProvider}

 import scala.collection.mutable
+import collection.JavaConversions._

@@ -50,19 +52,24 @@ class SparkSetupProvider extends FrameworkSetupProvider {
     this.conf = conf

     runnerProviders += ("scala" -> SparkScalaRunnerProvider(conf))
+    runnerProviders += ("scala-shell" -> SparkShellScalaRunnerProvider(conf))
     runnerProviders += ("pyspark" -> PySparkRunnerProvider(conf))
   }

   override def getGroupIdentifier: String = "spark"

-  override def getGroupResources: Array[File] = {
-
-    conf.mode match {
+  override def getGroupResources: Array[File] = conf.mode match {
     case "mesos" => Array[File](new File(s"spark-${conf.Webserver.sparkVersion}.tgz"), new File(s"spark-runner-${conf.version}-all.jar"), new File(s"spark-runtime-${conf.version}.jar"))
     case "yarn" => new File(conf.spark.home).listFiles
     case _ => Array[File]()
   }
+
+
+  override def getEnvironmentVariables: util.Map[String, String] = conf.mode match {
+    case "mesos" => Map[String, String](s"SPARK_HOME" -> s"spark-${conf.Webserver.sparkVersion}")
+    case "yarn" => Map[String, String]("SPARK_HOME" -> "spark")
+    case _ => Map[String, String]()
   }
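A note on the getEnvironmentVariables addition above: its return type is java.util.Map, yet the cases return Scala Map literals. That compiles only because of the collection.JavaConversions._ import, whose implicits wrap a Scala Map as a java.util.Map. A minimal standalone sketch of the same pattern (the object and method names here are illustrative, not part of the patch):

    import java.util
    import scala.collection.JavaConversions._

    object EnvVarsSketch {
      // A Scala immutable Map is implicitly wrapped into a java.util.Map
      // by JavaConversions, mirroring what SparkSetupProvider relies on.
      def environmentFor(mode: String): util.Map[String, String] = mode match {
        case "mesos" => Map("SPARK_HOME" -> "spark-2.1.1") // version literal is illustrative
        case "yarn"  => Map("SPARK_HOME" -> "spark")
        case _       => Map[String, String]()
      }

      def main(args: Array[String]): Unit =
        println(environmentFor("yarn")) // prints {SPARK_HOME=spark}
    }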
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
index 9d5405e..7f91142 100644
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
@@ -31,10 +31,14 @@ class PySparkRunnerProvider extends RunnerSetupProvider {
     case _ => ""
   }

-  override def getRunnerResources: Array[String] = {
+  override def getRunnerResources: Array[String] =
     Array[String]("miniconda.sh", "spark_intp.py", "runtime.py", "codegen.py")
-  }
+
+  def getActionResources(jobId: String, actionData: ActionData): Array[String] =
+    Array[String]()
+
+  override def getActionDependencies(jobId: String, actionData: ActionData): Array[String] =
+    Array[String]()
 }

 object PySparkRunnerProvider {

diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
index c92a784..8dbc36d 100644
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
@@ -32,9 +32,9 @@ class SparkScalaRunnerProvider extends RunnerSetupProvider {
   override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = conf.mode match {
     case "mesos" => s"env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${conf.Webserver.Port}/dist/spark-${conf.Webserver.sparkVersion}.tgz " +
-      s"java -cp executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.Webserver.sparkVersion}/jars/* " +
-      s"-Dscala.usejavacp=true -Djava.library.path=$libPath " +
-      s"org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId ${conf.master} ${actionData.name}".stripMargin
+    s"java -cp executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.Webserver.sparkVersion}/jars/* " +
+    s"-Dscala.usejavacp=true -Djava.library.path=$libPath " +
+    s"org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId ${conf.master} ${actionData.name}".stripMargin
     case "yarn" => s"/bin/bash spark/bin/load-spark-env.sh && " +
       s"java -cp spark/jars/*:executor.jar:spark-runner.jar:spark-runtime.jar:spark/conf/:${conf.YARN.hadoopHomeDir}/conf/ " +
       "-Xmx2G " +
@@ -47,10 +47,15 @@ class SparkScalaRunnerProvider extends RunnerSetupProvider {
     case _ => ""
   }

-  override def getRunnerResources: Array[String] = {
+  override def getRunnerResources: Array[String] =
     Array[String]()
-  }
+
+  def getActionResources(jobId: String, actionData: ActionData): Array[String] =
+    Array[String]()
+
+  override def getActionDependencies(jobId: String, actionData: ActionData): Array[String] =
+    Array[String]()
 }

 object SparkScalaRunnerProvider {
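Both providers above now implement the two per-action hooks that this PR adds to the SDK's RunnerSetupProvider interface (see the interface change at the end of the diff). As a sketch only, assuming the members visible in this diff make up the whole contract, a minimal runner provider could look like this (the class name and empty bodies are illustrative, not part of the patch):

    import org.apache.amaterasu.common.dataobjects.ActionData
    import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider

    class NoOpRunnerProvider extends RunnerSetupProvider {

      // Command line the executor should run for an action of this type
      override def getCommand(jobId: String, actionData: ActionData, env: String,
                              executorId: String, callbackAddress: String): String = ""

      // Files shipped for every action of this runner type
      override def getRunnerResources: Array[String] = Array.empty[String]

      // New in this PR: per-action files, e.g. the action's source script
      override def getActionResources(jobId: String, actionData: ActionData): Array[String] =
        Array.empty[String]

      // New in this PR: per-action dependencies
      override def getActionDependencies(jobId: String, actionData: ActionData): Array[String] =
        Array.empty[String]
    }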
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkShellScalaRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkShellScalaRunnerProvider.scala
new file mode 100644
index 0000000..525c4f5
--- /dev/null
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkShellScalaRunnerProvider.scala
@@ -0,0 +1,30 @@
+package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
+
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
+
+class SparkShellScalaRunnerProvider extends RunnerSetupProvider {
+
+  private var conf: ClusterConfig = _
+
+  override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String =
+    s"$$SPARK_HOME/bin/spark-shell ${actionData.src} --jars spark-runtime-${conf.version}.jar"
+
+  override def getRunnerResources: Array[String] =
+    Array[String]()
+
+  def getActionResources(jobId: String, actionData: ActionData): Array[String] =
+    Array[String](s"$jobId/${actionData.name}/${actionData.src}")
+
+  override def getActionDependencies(jobId: String, actionData: ActionData): Array[String] = Array[String]()
+
+}
+
+object SparkShellScalaRunnerProvider {
+  def apply(conf: ClusterConfig): SparkShellScalaRunnerProvider = {
+    val result = new SparkShellScalaRunnerProvider
+    result.conf = conf
+    result
+  }
+}
\ No newline at end of file
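One detail of the new provider worth noting: in getCommand the leading $$SPARK_HOME is an escaped dollar sign, so SPARK_HOME resolution is deferred to the launching shell, which is why SparkSetupProvider.getEnvironmentVariables above now exports it. A standalone illustration of the escaping (the values are illustrative, not taken from the patch):

    object InterpolatorEscapeSketch extends App {
      val version = "0.2.1-incubating" // illustrative version string
      val src = "start.scala"          // illustrative action source file
      // In an s-interpolator, "$$" renders a literal '$'
      println(s"$$SPARK_HOME/bin/spark-shell $src --jars spark-runtime-$version.jar")
      // prints: $SPARK_HOME/bin/spark-shell start.scala --jars spark-runtime-0.2.1-incubating.jar
    }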
action.path("exports").fields().asScala.toSeq.map(e => (e.getKey, e.getValue.asText())).toMap, jobId, actionsQueue, client, - attempts - ) + attempts) } - def parseErrorAction( - action: JsonNode, - jobId: String, - parent: String, - actionsQueue: BlockingQueue[ActionData], - client: CuratorFramework - ): SequentialAction = { + def parseErrorAction(action: JsonNode, + jobId: String, + parent: String, + actionsQueue: BlockingQueue[ActionData], + client: CuratorFramework): SequentialAction = { ErrorAction( action.path("name").asText, diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala index d68ae77..a407a0d 100755 --- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala @@ -17,6 +17,9 @@ package org.apache.amaterasu.leader.mesos.schedulers import java.io.{File, PrintWriter, StringWriter} +import java.nio.file.Files.copy +import java.nio.file.Paths.get +import java.nio.file.StandardCopyOption.REPLACE_EXISTING import java.util import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue} @@ -40,6 +43,7 @@ import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} import org.apache.curator.retry.ExponentialBackoffRetry import org.apache.log4j.LogManager import org.apache.mesos.Protos.CommandInfo.URI +import org.apache.mesos.Protos.Environment.Variable import org.apache.mesos.Protos._ import org.apache.mesos.protobuf.ByteString import org.apache.mesos.{Protos, SchedulerDriver} @@ -184,83 +188,98 @@ class JobScheduler extends AmaterasuScheduler { var executor: ExecutorInfo = null val slaveId = offer.getSlaveId.getValue slavesExecutors.synchronized { - if (slavesExecutors.contains(slaveId) && - offer.getExecutorIdsList.contains(slavesExecutors(slaveId).getExecutorId)) { - executor = slavesExecutors(slaveId) - } - else { - val execData = DataLoader.getExecutorDataBytes(env, config) - val executorId = taskId.getValue + "-" + UUID.randomUUID() - //creating the command - - println(s"===> ${runnerProvider.getCommand(jobManager.jobId, actionData, env, executorId, "")}") - val command = CommandInfo - .newBuilder - .setValue(runnerProvider.getCommand(jobManager.jobId, actionData, env, executorId, "")) - .addUris(URI.newBuilder - .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/executor-${config.version}-all.jar") - .setExecutable(false) - .setExtract(false) - .build()) - - // Getting env.yaml - command.addUris(URI.newBuilder - .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/env.yaml") + // if (slavesExecutors.contains(slaveId) && + // offer.getExecutorIdsList.contains(slavesExecutors(slaveId).getExecutorId)) { + // executor = slavesExecutors(slaveId) + // } + // else { + val execData = DataLoader.getExecutorDataBytes(env, config) + val executorId = taskId.getValue + "-" + UUID.randomUUID() + //creating the command + + // TODO: move this into the runner provider somehow + copy(get(s"repo/src/${actionData.src}"), get(s"dist/${jobManager.jobId}/${actionData.name}/${actionData.src}"), REPLACE_EXISTING) + + println(s"===> ${runnerProvider.getCommand(jobManager.jobId, actionData, env, executorId, "")}") + val command = CommandInfo + .newBuilder + .setValue(runnerProvider.getCommand(jobManager.jobId, 
actionData, env, executorId, "")) + .addUris(URI.newBuilder + .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/executor-${config.version}-all.jar") .setExecutable(false) - .setExtract(true) + .setExtract(false) .build()) - // Getting datastores.yaml - command.addUris(URI.newBuilder - .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/datastores.yaml") - .setExecutable(false) - .setExtract(true) + // Getting env.yaml + command.addUris(URI.newBuilder + .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/env.yaml") + .setExecutable(false) + .setExtract(true) + .build()) + + // Getting datastores.yaml + command.addUris(URI.newBuilder + .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/datastores.yaml") + .setExecutable(false) + .setExtract(true) + .build()) + + // Getting runtime.yaml + command.addUris(URI.newBuilder + .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/runtime.yaml") + .setExecutable(false) + .setExtract(true) + .build()) + + // Getting framework resources + frameworkProvider.getGroupResources.foreach(f => command.addUris(URI.newBuilder + .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${f.getName}") + .setExecutable(false) + .setExtract(true) + .build())) + + // Getting runner resources + runnerProvider.getRunnerResources.foreach(r => command.addUris(URI.newBuilder + .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/$r") + .setExecutable(false) + .setExtract(false) + .build())) + + // Getting action specific resources + runnerProvider.getActionResources(jobManager.jobId, actionData).foreach(r => command.addUris(URI.newBuilder + .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/$r") + .setExecutable(false) + .setExtract(false) + .build())) + + command + .addUris(URI.newBuilder() + .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/miniconda.sh") //TODO: Nadav needs to clean this on the executor side + .setExecutable(true) + .setExtract(false) .build()) - - // Getting runtime.yaml - command.addUris(URI.newBuilder - .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/runtime.yaml") + .addUris(URI.newBuilder() + .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/amaterasu.properties") .setExecutable(false) - .setExtract(true) + .setExtract(false) .build()) - // Getting framework resources - frameworkProvider.getGroupResources.foreach(f => command.addUris(URI.newBuilder - .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${f.getName}") - .setExecutable(false) - .setExtract(true) - .build())) + // setting the processes environment variables + val envVarsList = frameworkProvider.getEnvironmentVariables.asScala.toList.map(x => Variable.newBuilder().setName(x._1).setValue(x._2).build()).asJava + command.setEnvironment(Environment.newBuilder().addAllVariables(envVarsList)) - // Getting running resources - runnerProvider.getRunnerResources.foreach(r => command.addUris(URI.newBuilder - .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/$r") - .setExecutable(false) - .setExtract(false) - .build())) - - command - .addUris(URI.newBuilder() - .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/miniconda.sh") //TODO: Nadav needs to clean this on the executor side - 
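The new environment-variable plumbing above turns the framework's plain String map into Mesos protobuf Environment variables before attaching it to the executor's CommandInfo. A self-contained sketch of the same conversion (the object and method names are illustrative; the protobuf builder calls are the ones used in the patch):

    import org.apache.mesos.Protos.Environment
    import org.apache.mesos.Protos.Environment.Variable
    import scala.collection.JavaConverters._

    object MesosEnvSketch {
      // Turns a String -> String map into the repeated Variable entries
      // of a Mesos Environment protobuf, as the scheduler does above.
      def toMesosEnvironment(vars: Map[String, String]): Environment = {
        val variables = vars.map { case (name, value) =>
          Variable.newBuilder().setName(name).setValue(value).build()
        }.toList.asJava
        Environment.newBuilder().addAllVariables(variables).build()
      }
    }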
diff --git a/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java
index b676c1c..df150a0 100644
--- a/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java
+++ b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java
@@ -20,6 +20,7 @@
 import org.apache.amaterasu.sdk.frameworks.configuration.DriverConfiguration;

 import java.io.File;
+import java.util.Map;

 public interface FrameworkSetupProvider {

@@ -33,6 +34,8 @@

     RunnerSetupProvider getRunnerProvider(String runnerId);

+    Map<String, String> getEnvironmentVariables();
+
     String[] getConfigurationItems();
 }
\ No newline at end of file

diff --git a/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.java b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.java
index fe4086d..fc2eb9a 100644
--- a/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.java
+++ b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.java
@@ -24,4 +24,8 @@

     String[] getRunnerResources();

+    String[] getActionResources(String jobId, ActionData actionData);
+
+    String[] getActionDependencies(String jobId, ActionData actionData);
+
 }
\ No newline at end of file

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services