This is an automated email from the ASF dual-hosted git repository. yaniv pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-amaterasu.git
commit 3a872be239c435d27a778b7a814eb0728e3df8b7 Merge: b53d7c0 b0b6472 Author: Yaniv Rodenski <[email protected]> AuthorDate: Thu Apr 18 23:31:48 2019 +1000 virtual env is now available common/build.gradle | 12 ++- .../org/apache/amaterasu/common/utils/FileUtil.kt | 100 +++++++++++++++++++++ .../apache/amaterasu/common/utils/FileTestUtils.kt | 25 ++++++ .../dist/amaterasu_pandas-0.2.0-incubating-rc4.zip | Bin 8304 -> 8304 bytes .../dist/amaterasu_python-0.2.0-incubating-rc4.zip | Bin 6167 -> 6167 bytes .../runners/providers/PySparkRunnerProvider.scala | 10 +-- .../providers/SparkSubmitScalaRunnerProvider.scala | 15 ++-- .../amaterasu_pyspark-0.2.0-incubating-rc4.zip | Bin 14488 -> 14488 bytes .../amaterasu/leader/yarn/ApplicationMaster.kt | 12 ++- .../leader/mesos/schedulers/JobScheduler.scala | 40 +++++++-- .../sdk/frameworks/RunnerSetupProvider.kt | 31 +++++-- .../dist/amaterasu-sdk-0.2.0-incubating-rc4.zip | Bin 14898 -> 14898 bytes 12 files changed, 213 insertions(+), 32 deletions(-) diff --cc frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip index 69a9002,0000000..f4aa8c6 mode 100644,000000..100644 Binary files differ diff --cc frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip index 448adf5,0000000..f62dc5a mode 100644,000000..100644 Binary files differ diff --cc frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala index 465691c,d0a2442..fd868ac --- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala +++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala @@@ -1,31 -1,48 +1,31 @@@ package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers -import java.net.URLEncoder - import org.apache.amaterasu.common.configuration.ClusterConfig import org.apache.amaterasu.common.dataobjects.ActionData -import org.apache.amaterasu.leader.common.utilities.DataLoader -import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider -import org.apache.commons.lang.StringUtils -import org.apache.hadoop.yarn.api.ApplicationConstants - -class PySparkRunnerProvider extends RunnerSetupProvider { - - private var conf: ClusterConfig = _ - private val libPath = System.getProperty("java.library.path") - - override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = conf.mode match { - case "mesos" => - s"env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${conf.Webserver.Port}/dist/spark-${conf.Webserver.sparkVersion}.tgz " + - s"java -cp executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.Webserver.sparkVersion}/jars/* " + - s"-Dscala.usejavacp=true -Djava.library.path=$libPath org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId ${conf.master} ${actionData.getName}.stripMargin" - case "yarn" => "/bin/bash ./miniconda.sh -b -p $PWD/miniconda && " + - s"/bin/bash ${StringUtils.stripStart(conf.spark.home,"/")}/conf/load-spark.sh && " + - s"java -cp ${StringUtils.stripStart(conf.spark.home,"/")}/jars/*:executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:${StringUtils.stripStart(conf.spark.home,"/")}/conf/:${conf.yarn.hadoopHomeDir}/conf/ " + - "-Xmx2G " + - "-Dscala.usejavacp=true " + - "-Dhdp.version=2.6.5.0-292 " + - "org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " + - s"'$jobId' '${conf.master}' '${actionData.getName}' '${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}' '${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}' '$executorId' '$callbackAddress' " + - s"1> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout " + - s"2> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr " - case _ => "" - } +import org.apache.amaterasu.frameworks.python.dispatcher.runners.providers.PythonRunnerProviderBase - import org.apache.commons.lang.StringUtils - override def getRunnerResources: Array[String] = - Array[String]("miniconda.sh", "spark_intp.py", "runtime.py", "codegen.py") +class PySparkRunnerProvider(val env: String, val conf: ClusterConfig) extends PythonRunnerProviderBase(env, conf) { - override def getActionUserResources(jobId: String, actionData: ActionData): Array[String] = - Array[String]() + override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = { - //val command = super.getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String) ++ val command = super.getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String) + log.info(s"===> Cluster manager: ${conf.mode}") - //command + - // s"$$SPARK_HOME/conf/spark-env.sh && env PYSPARK_PYTHON=$getVirtualPythonPath " + - s"$$SPARK_HOME/bin/spark-submit ${actionData.getSrc}" ++ command + ++ //s" $$SPARK_HOME/conf/spark-env.sh &&" + ++ s" && env PYSPARK_PYTHON=$getVirtualPythonPath" + ++ s" $$SPARK_HOME/bin/spark-submit ${actionData.getSrc}" + } + + override def getRunnerResources: Array[String] = { + var resources = super.getRunnerResources + resources = resources :+ s"amaterasu_pyspark-${conf.version}.zip" + log.info(s"PYSPARK RESOURCES ==> ${resources.toSet}") + resources + } - override def getActionDependencies(jobId: String, actionData: ActionData): Array[String] = - Array[String]() - override def getHasExecutor: Boolean = true + override def getHasExecutor: Boolean = false + override def getActionUserResources(jobId: String, actionData: ActionData): Array[String] = Array[String]() } object PySparkRunnerProvider { diff --cc frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip index 9855261,0000000..c8e66c6 mode 100644,000000..100644 Binary files differ diff --cc leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt index cd4bf03,3e8a7b6..77c17d1 --- a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt +++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt @@@ -249,8 -243,6 +248,8 @@@ class ApplicationMaster : KLogging(), A jobManager.actionStarted(actionData.id) containersIdsToTask[container.id.containerId] = actionData + notifier.info("created container for ${actionData.name} created") - //ctx.localResources.forEach { t: String, u: LocalResource -> notifier.info("resource: $t = ${u.resource}") } ++ ctx.localResources.forEach { t: String, u: LocalResource -> notifier.info("resource: $t = ${u.resource}") } log.info("launching container succeeded: ${container.id.containerId}; task: ${actionData.id}") } } diff --cc leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala index a87ea5d,464e3bf..f9b1060 --- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala @@@ -17,8 -17,9 +17,9 @@@ package org.apache.amaterasu.leader.mesos.schedulers import java.io.{File, PrintWriter, StringWriter} + import java.nio.file.{Files, Path, Paths, StandardCopyOption} import java.util -import java.util.UUID +import java.util.{Collections, UUID} import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue} @@@ -247,9 -248,33 +252,28 @@@ class JobScheduler extends AmaterasuSch .setExtract(false) .build())) + // setting up action executable + val sourcePath = new File(runnerProvider.getActionExecutable(jobManager.getJobId, actionData)) + var executable: Path = null + if (actionData.getHasArtifact) { + val relativePath = amaDist.toPath.getRoot.relativize(sourcePath.toPath) + executable = relativePath.subpath(amaDist.toPath.getNameCount, relativePath.getNameCount) + } else { + val dest = new File(s"dist/${jobManager.getJobId}/${sourcePath.toString}") + FileUtils.moveFile(sourcePath, dest) + executable = Paths.get(jobManager.getJobId, sourcePath.toPath.toString) + } + + println(s"===> executable $executable") + command.addUris(URI.newBuilder - .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/$executable") ++ .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/$executable") + .setExecutable(false) + .setExtract(false) + .build()) + command .addUris(URI.newBuilder() - .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/miniconda.sh") //TODO: Nadav needs to clean this on the executor side - .setExecutable(true) - .setExtract(false) - .build()) - .addUris(URI.newBuilder() - .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/amaterasu.properties") + .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/amaterasu.properties") .setExecutable(false) .setExtract(false) .build()) diff --cc sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt index 57736cc,9af488e..eb75b2d --- a/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt +++ b/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt @@@ -17,10 -17,10 +17,12 @@@ package org.apache.amaterasu.sdk.frameworks import org.apache.amaterasu.common.dataobjects.ActionData + import org.apache.amaterasu.common.utils.ArtifactUtil + import org.apache.amaterasu.common.utils.FileUtil +import org.apache.amaterasu.common.logging.KLogging +import org.apache.amaterasu.common.logging.Logging -abstract class RunnerSetupProvider { +abstract class RunnerSetupProvider : Logging() { private val actionFiles = arrayOf("env.yaml", "runtime.yaml", "datastores.yaml") diff --cc sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip index 7ba06f4,0000000..9ce27b8 mode 100644,000000..100644 Binary files differ
