This is an automated email from the ASF dual-hosted git repository. yaniv pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-amaterasu.git
commit 9d14164317343afc3b4c84693413adc85da36b46 Author: Yaniv Rodenski <[email protected]> AuthorDate: Sun May 5 10:14:03 2019 +1000 mesos executing the new python support --- .../common/configuration/ClusterConfig.scala | 6 +- .../runners/providers/BasicPythonRunnerProvider.kt | 3 +- .../runners/providers/PandasRunnerProvider.kt | 3 +- .../runners/providers/PythonRunnerProviderBase.kt | 5 +- .../dist/amaterasu_pandas-0.2.0-incubating-rc4.zip | Bin 8304 -> 8304 bytes .../dist/amaterasu_python-0.2.0-incubating-rc4.zip | Bin 6167 -> 6167 bytes frameworks/spark/dispatcher/build.gradle | 5 - .../spark/dispatcher/SparkSetupProvider.kt | 87 ++++++++ .../runners/providers/PySparkRunnerProvider.kt} | 31 ++- .../providers/SparkSubmitScalaRunnerProvider.kt | 28 +++ .../spark/dispatcher/SparkSetupProvider.scala | 234 ++++++++++----------- .../runners/providers/PySparkRunnerProvider.scala | 86 ++++---- .../providers/SparkScalaRunnerProvider.scala | 138 ++++++------ .../providers/SparkSubmitScalaRunnerProvider.scala | 100 ++++----- .../amaterasu_pyspark-0.2.0-incubating-rc4.zip | Bin 14488 -> 14488 bytes .../frameworls/FrameworkProvidersFactory.kt | 1 - .../leader/common/utilities/DataLoader.kt | 14 +- .../amaterasu/leader/yarn/ApplicationMaster.kt | 4 +- .../leader/mesos/schedulers/JobScheduler.scala | 4 +- sdk/build.gradle | 6 +- .../sdk/frameworks/RunnerSetupProvider.kt | 6 +- .../dist/amaterasu-sdk-0.2.0-incubating-rc4.zip | Bin 15020 -> 15020 bytes 22 files changed, 455 insertions(+), 306 deletions(-) diff --git a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala index ed429de..e5c05a7 100755 --- a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala +++ b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala @@ -66,7 +66,7 @@ class ClusterConfig extends Logging { if (props.containsKey("yarn.hadoop.home.dir")) hadoopHomeDir = props.getProperty("yarn.hadoop.home.dir") this.master.load(props) - this.Worker.load(props) + this.worker.load(props) } class Master { @@ -81,7 +81,7 @@ class ClusterConfig extends Logging { val Master = new Master() - object Worker { + class Worker { var cores: Int = 1 var memoryMB: Int = 1024 @@ -91,6 +91,8 @@ class ClusterConfig extends Logging { } } + val worker = new Worker() + } diff --git a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/BasicPythonRunnerProvider.kt b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/BasicPythonRunnerProvider.kt index a78e623..2cac127 100644 --- a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/BasicPythonRunnerProvider.kt +++ b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/BasicPythonRunnerProvider.kt @@ -16,6 +16,7 @@ */ package org.apache.amaterasu.frameworks.python.dispatcher.runners.providers +import com.uchuhimo.konf.Config import org.apache.amaterasu.common.configuration.ClusterConfig import org.apache.amaterasu.common.dataobjects.ActionData @@ -29,7 +30,7 @@ import org.apache.amaterasu.common.dataobjects.ActionData override fun getActionUserResources(jobId: String, actionData: ActionData): Array<String> = arrayOf() - override fun getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String { + override fun getCommand(jobId: String, actionData: ActionData, env: Config, executorId: String, callbackAddress: String): String { return super.getCommand(jobId, actionData, env, executorId, callbackAddress) + " && $virtualPythonPath ${actionData.src}" } diff --git a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PandasRunnerProvider.kt b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PandasRunnerProvider.kt index be32d9b..e616498 100644 --- a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PandasRunnerProvider.kt +++ b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PandasRunnerProvider.kt @@ -1,5 +1,6 @@ package org.apache.amaterasu.frameworks.python.dispatcher.runners.providers +import com.uchuhimo.konf.Config import org.apache.amaterasu.common.configuration.ClusterConfig import org.apache.amaterasu.common.dataobjects.ActionData @@ -13,7 +14,7 @@ import org.apache.amaterasu.common.dataobjects.ActionData return resources } - override fun getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String { + override fun getCommand(jobId: String, actionData: ActionData, env: Config, executorId: String, callbackAddress: String): String { return super.getCommand(jobId, actionData, env, executorId, callbackAddress) + " && $virtualPythonPath ${actionData.src}" } } \ No newline at end of file diff --git a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt index 8636f22..daea452 100644 --- a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt +++ b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt @@ -16,6 +16,7 @@ */ package org.apache.amaterasu.frameworks.python.dispatcher.runners.providers +import com.uchuhimo.konf.Config import org.apache.amaterasu.common.configuration.ClusterConfig import org.apache.amaterasu.common.dataobjects.ActionData import org.apache.amaterasu.leader.common.utilities.DataLoader @@ -32,12 +33,12 @@ abstract class PythonRunnerProviderBase(val env: String, val conf: ClusterConfig override val runnerResources: Array<String> get() = arrayOf("amaterasu-sdk-${conf.version()}.zip") - override fun getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String { + override fun getCommand(jobId: String, actionData: ActionData, env: Config, executorId: String, callbackAddress: String): String { val pythonPath = conf.pythonPath() val virtualEnvCmd = "$pythonPath -m venv amaterasu_env" val installBaseRequirementsCmd = "$virtualPythonPath -m pip install --upgrade --force-reinstall -r $requirementsFileName" var cmd = "$virtualEnvCmd && $installBaseRequirementsCmd" - val execData = DataLoader.getExecutorData(env, conf) + val execData = DataLoader.getExecutorData(this.env, conf) execData.pyDeps?.filePaths?.forEach { path -> cmd += " && $pythonPath -m pip install -r ${path.split('/').last()}" } diff --git a/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip b/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip index 6a7200c..c625dd1 100644 Binary files a/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip and b/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip differ diff --git a/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip b/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip index 7f94f5b..8433083 100644 Binary files a/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip and b/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip differ diff --git a/frameworks/spark/dispatcher/build.gradle b/frameworks/spark/dispatcher/build.gradle index e28b202..2f4d469 100644 --- a/frameworks/spark/dispatcher/build.gradle +++ b/frameworks/spark/dispatcher/build.gradle @@ -34,7 +34,6 @@ buildscript { plugins { id 'com.github.johnrengelman.shadow' version '1.2.4' id 'com.github.maiflai.scalatest' version '0.22' - id 'scala' } shadowJar { @@ -52,10 +51,6 @@ apply plugin: 'kotlin' dependencies { - compile 'org.scala-lang:scala-library:2.11.8' -// compile group: 'org.scala-lang', name: 'scala-reflect', version: '2.11.8' -// compile group: 'org.scala-lang', name: 'scala-compiler', version: '2.11.8' - compile project(':common') compile project(':leader-common') compile project(':amaterasu-sdk') diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt new file mode 100644 index 0000000..63a4234 --- /dev/null +++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt @@ -0,0 +1,87 @@ +package org.apache.amaterasu.frameworks.spark.dispatcher + +import org.apache.amaterasu.common.configuration.ClusterConfig +import org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers.PySparkRunnerProvider +import org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers.SparkSubmitScalaRunnerProvider +import org.apache.amaterasu.leader.common.utilities.DataLoader +import org.apache.amaterasu.leader.common.utilities.MemoryFormatParser +import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider +import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider +import org.apache.amaterasu.sdk.frameworks.configuration.DriverConfiguration +import org.apache.commons.lang.StringUtils +import java.io.File + +class SparkSetupProvider : FrameworkSetupProvider { + + private lateinit var env: String + private lateinit var conf: ClusterConfig + + private val sparkExecConfigurations: Map<String, Any> by lazy { + val execData = DataLoader.getExecutorData(env, conf) + execData.configurations["spark"].orEmpty() + } + + private lateinit var runnerProviders: Map<String, RunnerSetupProvider> + + override fun init(env: String, conf: ClusterConfig) { + + this.env = env + this.conf = conf + + runnerProviders = mapOf( + "jar" to SparkSubmitScalaRunnerProvider(conf), + "pyspark" to PySparkRunnerProvider(env, conf) + ) + + } + + override val environmentVariables: Map<String, String> by lazy { + when (conf.mode()) { + "mesos" -> mapOf("SPARK_HOME" to "spark-${conf.webserver().sparkVersion()}", "SPARK_HOME_DOCKER" to "/opt/spark/") + "yarn" -> mapOf("SPARK_HOME" to StringUtils.stripStart(conf.spark().home(), "/")) + else -> mapOf() + } + } + + override val groupResources: Array<File> by lazy { + when (conf.mode()) { + "mesos" -> arrayOf(File("spark-${conf.webserver().sparkVersion()}.tgz"), File("spark-runner-${conf.version()}-all.jar"), File("spark-runtime-${conf.version()}.jar")) + "yarn" -> arrayOf(File("spark-runner-${conf.version()}-all.jar"), File("spark-runtime-${conf.version()}.jar"), File("executor-${conf.version()}-all.jar"), File(conf.spark().home())) + else -> arrayOf() + } + } + + override fun getRunnerProvider(runnerId: String): RunnerSetupProvider { + return runnerProviders.getValue(runnerId) + } + + override val groupIdentifier: String = "spark" + override val configurationItems = arrayOf("sparkConfiguration", "sparkExecutor") + + override val driverConfiguration: DriverConfiguration + get() { + //TODO: Spark configuration sould come for the ENV only + val sparkOpts = conf.spark().opts() + val cpu: Int = when { + sparkExecConfigurations.containsKey("spark.yarn.am.cores") -> sparkExecConfigurations["spark.yarn.am.cores"].toString().toInt() + sparkExecConfigurations.containsKey("spark.driver.cores") -> sparkExecConfigurations["spark.driver.cores"].toString().toInt() + sparkOpts.contains("yarn.am.cores") -> sparkOpts["yarn.am.cores"].toString().toInt() + sparkOpts.contains("driver.cores") -> sparkOpts["driver.cores"].toString().toInt() + conf.yarn().worker().cores() > 0 -> conf.yarn().worker().cores() + else -> 1 + } + + val mem: Int = when { + sparkExecConfigurations.containsKey("spark.yarn.am.memory") -> MemoryFormatParser.extractMegabytes(sparkExecConfigurations["spark.yarn.am.memory"].toString()) + sparkExecConfigurations.containsKey("spark.driver.memeory") -> MemoryFormatParser.extractMegabytes(sparkExecConfigurations["spark.driver.memeory"].toString()) + sparkOpts.contains("yarn.am.memory") -> MemoryFormatParser.extractMegabytes(sparkOpts["yarn.am.memory"].get()) + sparkOpts.contains("driver.memory") -> MemoryFormatParser.extractMegabytes(sparkOpts["driver.memory"].get()) + conf.yarn().worker().memoryMB() > 0 -> conf.yarn().worker().memoryMB() + conf.taskMem() > 0 -> conf.taskMem() + else -> 1024 + } + return DriverConfiguration(mem, cpu) + } + + +} \ No newline at end of file diff --git a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/BasicPythonRunnerProvider.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.kt similarity index 54% copy from frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/BasicPythonRunnerProvider.kt copy to frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.kt index a78e623..1d8e3ce 100644 --- a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/BasicPythonRunnerProvider.kt +++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.kt @@ -14,23 +14,36 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.amaterasu.frameworks.python.dispatcher.runners.providers +package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers +import com.uchuhimo.konf.Config import org.apache.amaterasu.common.configuration.ClusterConfig import org.apache.amaterasu.common.dataobjects.ActionData +import org.apache.amaterasu.frameworks.python.dispatcher.runners.providers.PythonRunnerProviderBase +import org.apache.amaterasu.leader.common.configuration.Job + +class PySparkRunnerProvider(env: String, conf: ClusterConfig) : PythonRunnerProviderBase(env, conf) { + + override fun getCommand(jobId: String, actionData: ActionData, env: Config, executorId: String, callbackAddress: String): String { + val command = super.getCommand(jobId, actionData , env, executorId, callbackAddress) + command + " && \$SPARK_HOME/bin/spark-submit --master ${env[Job.master]} " + + "--conf spark.pyspark.python=${conf.pythonPath()} " + + "--conf spark.pyspark.driver.python=$virtualPythonPath " + + "--files \$SPARK_HOME/conf/hive-site.xml ${actionData.src}" + + return command + } + + + override fun getActionUserResources(jobId: String, actionData: ActionData): Array<String> = arrayOf() - class BasicPythonRunnerProvider(env: String, conf: ClusterConfig) : PythonRunnerProviderBase(env, conf) { override val runnerResources: Array<String> get() { var resources = super.runnerResources - resources += "amaterasu_python-${conf.version()}.zip" + resources += "amaterasu_pyspark-${conf.version()}.zip" + //log.info("PYSPARK RESOURCES ==> ${resources.toSet}") return resources } - override fun getActionUserResources(jobId: String, actionData: ActionData): Array<String> = arrayOf() - - override fun getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String { - return super.getCommand(jobId, actionData, env, executorId, callbackAddress) + " && $virtualPythonPath ${actionData.src}" - } - + override val hasExecutor: Boolean = false } \ No newline at end of file diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt new file mode 100644 index 0000000..a07747a --- /dev/null +++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt @@ -0,0 +1,28 @@ +package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers + +import com.uchuhimo.konf.Config +import org.apache.amaterasu.common.configuration.ClusterConfig +import org.apache.amaterasu.common.dataobjects.ActionData +import org.apache.amaterasu.common.utils.ArtifactUtil +import org.apache.amaterasu.leader.common.configuration.Job +import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider + +class SparkSubmitScalaRunnerProvider(val conf: ClusterConfig) : RunnerSetupProvider() { + + override fun getCommand(jobId: String, actionData: ActionData, env: Config, executorId: String, callbackAddress: String): String { + val util = ArtifactUtil(listOf(actionData.repo), jobId) + val classParam = if (actionData.hasArtifact) " --class ${actionData.entryClass}" else "" + + return "\$SPARK_HOME/bin/spark-submit $classParam ${util.getLocalArtifacts(actionData.artifact).first().name} " + + " --master ${env[Job.master]}" + + " --jars spark-runtime-${conf.version()}.jar >&1" + } + + override fun getActionUserResources(jobId: String, actionData: ActionData): Array<String> = arrayOf() + + override fun getActionDependencies(jobId: String, actionData: ActionData): Array<String> = arrayOf() + + override val hasExecutor: Boolean = false + override val runnerResources: Array<String> = arrayOf() + +} \ No newline at end of file diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala index cd16b33..76d0aa8 100644 --- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala +++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala @@ -1,117 +1,117 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.amaterasu.frameworks.spark.dispatcher - -import java.io.File -import java.util - -import org.apache.amaterasu.common.configuration.ClusterConfig -import org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers._ -import org.apache.amaterasu.leader.common.utilities.{DataLoader, MemoryFormatParser} -import org.apache.amaterasu.sdk.frameworks.configuration.DriverConfiguration -import org.apache.amaterasu.sdk.frameworks.{FrameworkSetupProvider, RunnerSetupProvider} -import org.apache.commons.lang.StringUtils - -import scala.collection.mutable -import collection.JavaConversions._ - -class SparkSetupProvider extends FrameworkSetupProvider { - - private var env: String = _ - private var conf: ClusterConfig = _ - private lazy val sparkExecConfigurations: mutable.Map[String, Any] = loadSparkConfig - - private val runnerProviders: mutable.Map[String, RunnerSetupProvider] = mutable.Map[String, RunnerSetupProvider]() - - private def loadSparkConfig: mutable.Map[String, Any] = { - - val execData = DataLoader.getExecutorData(env, conf) - val sparkExecConfiguration = execData.getConfigurations.get("spark") - if (sparkExecConfiguration.isEmpty) { - throw new Exception(s"Spark configuration files could not be loaded for the environment $env") - } - collection.mutable.Map(sparkExecConfiguration.toSeq: _*) - - } - - override def init(env: String, conf: ClusterConfig): Unit = { - this.env = env - this.conf = conf - -// this.sparkExecConfigurations = loadSparkConfig - runnerProviders += ("scala" -> SparkScalaRunnerProvider(conf)) - runnerProviders += ("jar" -> SparkSubmitScalaRunnerProvider(conf)) - runnerProviders += ("pyspark" -> PySparkRunnerProvider(env, conf)) - - } - - override def getGroupIdentifier: String = "spark" - - override def getGroupResources: Array[File] = conf.mode match { - case "mesos" => Array[File](new File(s"spark-${conf.webserver.sparkVersion}.tgz"), new File(s"spark-runner-${conf.version}-all.jar"), new File(s"spark-runtime-${conf.version}.jar")) - case "yarn" => Array[File](new File(s"spark-runner-${conf.version}-all.jar"), new File(s"spark-runtime-${conf.version}.jar"), new File(s"executor-${conf.version}-all.jar"), new File(conf.spark.home)) - case _ => Array[File]() - } - - - override def getEnvironmentVariables: util.Map[String, String] = conf.mode match { - case "mesos" => Map[String, String]("SPARK_HOME" -> s"spark-${conf.webserver.sparkVersion}", "SPARK_HOME_DOCKER" -> "/opt/spark/") - case "yarn" => Map[String, String]("SPARK_HOME" -> StringUtils.stripStart(conf.spark.home, "/")) - case _ => Map[String, String]() - } - - override def getDriverConfiguration: DriverConfiguration = { - var cpu: Int = 0 - if (sparkExecConfigurations.get("spark.yarn.am.cores").isDefined) { - cpu = sparkExecConfigurations("spark.yarn.am.cores").toString.toInt - } else if (sparkExecConfigurations.get("spark.driver.cores").isDefined) { - cpu = sparkExecConfigurations("spark.driver.cores").toString.toInt - } else if (conf.spark.opts.contains("yarn.am.cores")) { - cpu = conf.spark.opts("yarn.am.cores").toInt - } else if (conf.spark.opts.contains("driver.cores")) { - cpu = conf.spark.opts("driver.cores").toInt - } else if (conf.yarn.Worker.cores > 0) { - cpu = conf.yarn.Worker.cores - } else { - cpu = 1 - } - var mem: Int = 0 - if (sparkExecConfigurations.get("spark.yarn.am.memory").isDefined) { - mem = MemoryFormatParser.extractMegabytes(sparkExecConfigurations("spark.yarn.am.memory").toString) - } else if (sparkExecConfigurations.get("spark.driver.memeory").isDefined) { - mem = MemoryFormatParser.extractMegabytes(sparkExecConfigurations("spark.driver.memeory").toString) - } else if (conf.spark.opts.contains("yarn.am.memory")) { - mem = MemoryFormatParser.extractMegabytes(conf.spark.opts("yarn.am.memory")) - } else if (conf.spark.opts.contains("driver.memory")) { - mem = MemoryFormatParser.extractMegabytes(conf.spark.opts("driver.memory")) - } else if (conf.yarn.Worker.memoryMB > 0) { - mem = conf.yarn.Worker.memoryMB - } else if (conf.taskMem > 0) { - mem = conf.taskMem - } else { - mem = 1024 - } - - new DriverConfiguration(mem, cpu) - } - - override def getRunnerProvider(runnerId: String): RunnerSetupProvider = { - runnerProviders(runnerId) - } - - override def getConfigurationItems = Array("sparkConfiguration", "sparkExecutor") -} \ No newline at end of file +///* +// * Licensed to the Apache Software Foundation (ASF) under one or more +// * contributor license agreements. See the NOTICE file distributed with +// * this work for additional information regarding copyright ownership. +// * The ASF licenses this file to You under the Apache License, Version 2.0 +// * (the "License"); you may not use this file except in compliance with +// * the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +//package org.apache.amaterasu.frameworks.spark.dispatcher +// +//import java.io.File +//import java.util +// +//import org.apache.amaterasu.common.configuration.ClusterConfig +//import org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers._ +//import org.apache.amaterasu.leader.common.utilities.{DataLoader, MemoryFormatParser} +//import org.apache.amaterasu.sdk.frameworks.configuration.DriverConfiguration +//import org.apache.amaterasu.sdk.frameworks.{FrameworkSetupProvider, RunnerSetupProvider} +//import org.apache.commons.lang.StringUtils +// +//import scala.collection.mutable +//import collection.JavaConversions._ +// +//class SparkSetupProvider extends FrameworkSetupProvider { +// +// private var env: String = _ +// private var conf: ClusterConfig = _ +// private lazy val sparkExecConfigurations: mutable.Map[String, Any] = loadSparkConfig +// +// private val runnerProviders: mutable.Map[String, RunnerSetupProvider] = mutable.Map[String, RunnerSetupProvider]() +// +// private def loadSparkConfig: mutable.Map[String, Any] = { +// +// val execData = DataLoader.getExecutorData(env, conf) +// val sparkExecConfiguration = execData.getConfigurations.get("spark") +// if (sparkExecConfiguration.isEmpty) { +// throw new Exception(s"Spark configuration files could not be loaded for the environment $env") +// } +// collection.mutable.Map(sparkExecConfiguration.toSeq: _*) +// +// } +// +// override def init(env: String, conf: ClusterConfig): Unit = { +// this.env = env +// this.conf = conf +// +//// this.sparkExecConfigurations = loadSparkConfig +// runnerProviders += ("scala" -> SparkScalaRunnerProvider(conf)) +// runnerProviders += ("jar" -> SparkSubmitScalaRunnerProvider(conf)) +// runnerProviders += ("pyspark" -> new PySparkRunnerProvider(env, conf)) +// +// } +// +// override def getGroupIdentifier: String = "spark" +// +// override def getGroupResources: Array[File] = conf.mode match { +// case "mesos" => Array[File](new File(s"spark-${conf.webserver.sparkVersion}.tgz"), new File(s"spark-runner-${conf.version}-all.jar"), new File(s"spark-runtime-${conf.version}.jar")) +// case "yarn" => Array[File](new File(s"spark-runner-${conf.version}-all.jar"), new File(s"spark-runtime-${conf.version}.jar"), new File(s"executor-${conf.version}-all.jar"), new File(conf.spark.home)) +// case _ => Array[File]() +// } +// +// +// override def getEnvironmentVariables: util.Map[String, String] = conf.mode match { +// case "mesos" => Map[String, String]("SPARK_HOME" -> s"spark-${conf.webserver.sparkVersion}", "SPARK_HOME_DOCKER" -> "/opt/spark/") +// case "yarn" => Map[String, String]("SPARK_HOME" -> StringUtils.stripStart(conf.spark.home, "/")) +// case _ => Map[String, String]() +// } +// +// override def getDriverConfiguration: DriverConfiguration = { +// var cpu: Int = 0 +// if (sparkExecConfigurations.get("spark.yarn.am.cores").isDefined) { +// cpu = sparkExecConfigurations("spark.yarn.am.cores").toString.toInt +// } else if (sparkExecConfigurations.get("spark.driver.cores").isDefined) { +// cpu = sparkExecConfigurations("spark.driver.cores").toString.toInt +// } else if (conf.spark.opts.contains("yarn.am.cores")) { +// cpu = conf.spark.opts("yarn.am.cores").toInt +// } else if (conf.spark.opts.contains("driver.cores")) { +// cpu = conf.spark.opts("driver.cores").toInt +// } else if (conf.yarn.Worker.cores > 0) { +// cpu = conf.yarn.Worker.cores +// } else { +// cpu = 1 +// } +// var mem: Int = 0 +// if (sparkExecConfigurations.get("spark.yarn.am.memory").isDefined) { +// mem = MemoryFormatParser.extractMegabytes(sparkExecConfigurations("spark.yarn.am.memory").toString) +// } else if (sparkExecConfigurations.get("spark.driver.memeory").isDefined) { +// mem = MemoryFormatParser.extractMegabytes(sparkExecConfigurations("spark.driver.memeory").toString) +// } else if (conf.spark.opts.contains("yarn.am.memory")) { +// mem = MemoryFormatParser.extractMegabytes(conf.spark.opts("yarn.am.memory")) +// } else if (conf.spark.opts.contains("driver.memory")) { +// mem = MemoryFormatParser.extractMegabytes(conf.spark.opts("driver.memory")) +// } else if (conf.yarn.Worker.memoryMB > 0) { +// mem = conf.yarn.Worker.memoryMB +// } else if (conf.taskMem > 0) { +// mem = conf.taskMem +// } else { +// mem = 1024 +// } +// +// new DriverConfiguration(mem, cpu) +// } +// +// override def getRunnerProvider(runnerId: String): RunnerSetupProvider = { +// runnerProviders(runnerId) +// } +// +// override def getConfigurationItems = Array("sparkConfiguration", "sparkExecutor") +//} \ No newline at end of file diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala index ee20009..db1e3fc 100644 --- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala +++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala @@ -1,41 +1,45 @@ -package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers - -import org.apache.amaterasu.common.configuration.ClusterConfig -import org.apache.amaterasu.common.dataobjects.ActionData -import org.apache.amaterasu.frameworks.python.dispatcher.runners.providers.PythonRunnerProviderBase - -class PySparkRunnerProvider(val env: String, val conf: ClusterConfig) extends PythonRunnerProviderBase(env, conf) { - - override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = { - - val command = super.getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String) - log.info(s"===> Cluster manager: ${conf.mode}") - command + - //s" $$SPARK_HOME/conf/spark-env.sh" + - // s" && env PYSPARK_PYTHON=$getVirtualPythonPath" + - //s" env PYSPARK_DRIVER_PYTHON=$getVirtualPythonPath" + d - s" && $$SPARK_HOME/bin/spark-submit --master yarn-client " + - s"--conf spark.pyspark.python=${conf.pythonPath} " + - s"--conf spark.pyspark.driver.python=$getVirtualPythonPath " + - s"--files $$SPARK_HOME/conf/hive-site.xml ${actionData.getSrc}" - } - - override def getRunnerResources: Array[String] = { - var resources = super.getRunnerResources - resources = resources :+ s"amaterasu_pyspark-${conf.version}.zip" - log.info(s"PYSPARK RESOURCES ==> ${resources.toSet}") - resources - } - - - override def getHasExecutor: Boolean = false - - override def getActionUserResources(jobId: String, actionData: ActionData): Array[String] = Array[String]() -} - -object PySparkRunnerProvider { - def apply(env: String, conf: ClusterConfig): PySparkRunnerProvider = { - val result = new PySparkRunnerProvider(env, conf) - result - } -} \ No newline at end of file +//package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers +// +//import com.uchuhimo.konf.Config +//import org.apache.amaterasu.common.configuration.ClusterConfig +//import org.apache.amaterasu.common.dataobjects.ActionData +//import org.apache.amaterasu.frameworks.python.dispatcher.runners.providers.PythonRunnerProviderBase +//import org.apache.amaterasu.leader.common.configuration.Job +// +//class PySparkRunnerProvider(val env: String, val conf: ClusterConfig) extends PythonRunnerProviderBase(env, conf) { +// +// +// +// override def getRunnerResources: Array[String] = { +// var resources = super.getRunnerResources +// resources = resources :+ s"amaterasu_pyspark-${conf.version}.zip" +// log.info(s"PYSPARK RESOURCES ==> ${resources.toSet}") +// resources +// } +// +// +// override def getHasExecutor: Boolean = false +// +// override def getActionUserResources(jobId: String, actionData: ActionData): Array[String] = Array[String]() +// +// override def getCommand(jobId: String, actionData: ActionData, env: Config, executorId: String, callbackAddress: String): String = { +// +// val command = super.getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String) +// log.info(s"===> Cluster manager: ${conf.mode}") +// command + +// //s" $$SPARK_HOME/conf/spark-env.sh" + +// // s" && env PYSPARK_PYTHON=$getVirtualPythonPath" + +// //s" env PYSPARK_DRIVER_PYTHON=$getVirtualPythonPath" + d +// s" && $$SPARK_HOME/bin/spark-submit --master ${env[Job.master]} " + +// s"--conf spark.pyspark.python=${conf.pythonPath} " + +// s"--conf spark.pyspark.driver.python=$getVirtualPythonPath " + +// s"--files $$SPARK_HOME/conf/hive-site.xml ${actionData.getSrc}" +// } +//} +// +//object PySparkRunnerProvider { +// def apply(env: String, conf: ClusterConfig): PySparkRunnerProvider = { +// val result = new PySparkRunnerProvider(env, conf) +// result +// } +//} \ No newline at end of file diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala index 6cd63f5..40f9194 100644 --- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala +++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala @@ -1,69 +1,69 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers - -import java.net.URLEncoder - -import org.apache.amaterasu.common.configuration.ClusterConfig -import org.apache.amaterasu.common.dataobjects.ActionData -import org.apache.amaterasu.leader.common.utilities.DataLoader -import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider -import org.apache.commons.lang.StringUtils -import org.apache.hadoop.yarn.api.ApplicationConstants - -class SparkScalaRunnerProvider extends RunnerSetupProvider { - - private var conf: ClusterConfig = _ - private val libPath = System.getProperty("java.library.path") - - override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = conf.mode match { - case "mesos" => - s"env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=${conf.mesos.libPath} env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${conf.webserver.Port}/dist/spark-${conf.webserver.sparkVersion}.tgz " + - s"java -cp executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.webserver.sparkVersion}/jars/* " + - s"-Dscala.usejavacp=true -Djava.library.path=$libPath " + - s"org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId ${conf.master} ${actionData.getName}".stripMargin - case "yarn" => - s"/bin/bash ${StringUtils.stripStart(conf.spark.home,"/")}/conf/spark-env.sh && " + - s"java -cp ${StringUtils.stripStart(conf.spark.home,"/")}/jars/*:executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:${StringUtils.stripStart(conf.spark.home,"/")}/conf/:${conf.yarn.hadoopHomeDir}/conf/ " + - "-Xmx2G " + - "-Dscala.usejavacp=true " + - "-Dhdp.version=2.6.5.0-292 " + - "org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " + - s"'$jobId' '${conf.master}' '${actionData.getName}' '${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}' '${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}' '$executorId' '$callbackAddress' " + - s"1> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout " + - s"2> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr " - case _ => "" - } - - override def getRunnerResources: Array[String] = - Array[String]() - - override def getActionDependencies(jobId: String, actionData: ActionData): Array[String] = - Array[String]() - - override def getHasExecutor: Boolean = true - - override def getActionUserResources(jobId: String, actionData: ActionData): Array[String] = Array[String]() -} - -object SparkScalaRunnerProvider { - def apply(conf: ClusterConfig): SparkScalaRunnerProvider = { - val result = new SparkScalaRunnerProvider - result.conf = conf - result - } -} \ No newline at end of file +///* +// * Licensed to the Apache Software Foundation (ASF) under one or more +// * contributor license agreements. See the NOTICE file distributed with +// * this work for additional information regarding copyright ownership. +// * The ASF licenses this file to You under the Apache License, Version 2.0 +// * (the "License"); you may not use this file except in compliance with +// * the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +//package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers +// +//import java.net.URLEncoder +// +//import org.apache.amaterasu.common.configuration.ClusterConfig +//import org.apache.amaterasu.common.dataobjects.ActionData +//import org.apache.amaterasu.leader.common.utilities.DataLoader +//import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider +//import org.apache.commons.lang.StringUtils +//import org.apache.hadoop.yarn.api.ApplicationConstants +// +//class SparkScalaRunnerProvider extends RunnerSetupProvider { +// +// private var conf: ClusterConfig = _ +// private val libPath = System.getProperty("java.library.path") +// +// override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = conf.mode match { +// case "mesos" => +// s"env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=${conf.mesos.libPath} env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${conf.webserver.Port}/dist/spark-${conf.webserver.sparkVersion}.tgz " + +// s"java -cp executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.webserver.sparkVersion}/jars/* " + +// s"-Dscala.usejavacp=true -Djava.library.path=$libPath " + +// s"org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId ${conf.master} ${actionData.getName}".stripMargin +// case "yarn" => +// s"/bin/bash ${StringUtils.stripStart(conf.spark.home,"/")}/conf/spark-env.sh && " + +// s"java -cp ${StringUtils.stripStart(conf.spark.home,"/")}/jars/*:executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:${StringUtils.stripStart(conf.spark.home,"/")}/conf/:${conf.yarn.hadoopHomeDir}/conf/ " + +// "-Xmx2G " + +// "-Dscala.usejavacp=true " + +// "-Dhdp.version=2.6.5.0-292 " + +// "org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " + +// s"'$jobId' '${conf.master}' '${actionData.getName}' '${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}' '${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}' '$executorId' '$callbackAddress' " + +// s"1> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout " + +// s"2> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr " +// case _ => "" +// } +// +// override def getRunnerResources: Array[String] = +// Array[String]() +// +// override def getActionDependencies(jobId: String, actionData: ActionData): Array[String] = +// Array[String]() +// +// override def getHasExecutor: Boolean = true +// +// override def getActionUserResources(jobId: String, actionData: ActionData): Array[String] = Array[String]() +//} +// +//object SparkScalaRunnerProvider { +// def apply(conf: ClusterConfig): SparkScalaRunnerProvider = { +// val result = new SparkScalaRunnerProvider +// result.conf = conf +// result +// } +//} \ No newline at end of file diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala index 8622800..b8e9677 100644 --- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala +++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala @@ -1,50 +1,50 @@ -package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers - -import java.io.File - -import org.apache.amaterasu.common.configuration.ClusterConfig -import org.apache.amaterasu.common.dataobjects.ActionData -import org.apache.amaterasu.common.utils.ArtifactUtil -import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider - -import scala.collection.JavaConverters._ - -class SparkSubmitScalaRunnerProvider extends RunnerSetupProvider { - - private var conf: ClusterConfig = _ - val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath) - val amaDist = new File(s"${new File(jarFile.getParent).getParent}/dist") - val amalocation = new File(s"${new File(jarFile.getParent).getParent}") - - override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = { - - val util = new ArtifactUtil(List(actionData.repo).asJava, jobId) - val classParam = if (actionData.getHasArtifact) s" --class ${actionData.entryClass}" else "" - s"$$SPARK_HOME/bin/spark-submit $classParam ${util.getLocalArtifacts(actionData.getArtifact).get(0).getName} --deploy-mode client --jars spark-runtime-${conf.version}.jar >&1" - - } - - override def getRunnerResources: Array[String] = - Array[String]() - - override def getActionUserResources(jobId: String, actionData: ActionData): Array[String] = - Array[String]() - - - override def getActionDependencies(jobId: String, actionData: ActionData): Array[String] = - Array[String]() - - - override def getHasExecutor: Boolean = false - - -} - -object SparkSubmitScalaRunnerProvider { - def apply(conf: ClusterConfig): SparkSubmitScalaRunnerProvider = { - val result = new SparkSubmitScalaRunnerProvider - - result.conf = conf - result - } -} \ No newline at end of file +//package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers +// +//import java.io.File +// +//import org.apache.amaterasu.common.configuration.ClusterConfig +//import org.apache.amaterasu.common.dataobjects.ActionData +//import org.apache.amaterasu.common.utils.ArtifactUtil +//import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider +// +//import scala.collection.JavaConverters._ +// +//class SparkSubmitScalaRunnerProvider extends RunnerSetupProvider { +// +// private var conf: ClusterConfig = _ +// val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath) +// val amaDist = new File(s"${new File(jarFile.getParent).getParent}/dist") +// val amalocation = new File(s"${new File(jarFile.getParent).getParent}") +// +// override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = { +// +// val util = new ArtifactUtil(List(actionData.repo).asJava, jobId) +// val classParam = if (actionData.getHasArtifact) s" --class ${actionData.entryClass}" else "" +// s"$$SPARK_HOME/bin/spark-submit $classParam ${util.getLocalArtifacts(actionData.getArtifact).get(0).getName} --deploy-mode client --jars spark-runtime-${conf.version}.jar >&1" +// +// } +// +// override def getRunnerResources: Array[String] = +// Array[String]() +// +// override def getActionUserResources(jobId: String, actionData: ActionData): Array[String] = +// Array[String]() +// +// +// override def getActionDependencies(jobId: String, actionData: ActionData): Array[String] = +// Array[String]() +// +// +// override def getHasExecutor: Boolean = false +// +// +//} +// +//object SparkSubmitScalaRunnerProvider { +// def apply(conf: ClusterConfig): SparkSubmitScalaRunnerProvider = { +// val result = new SparkSubmitScalaRunnerProvider +// +// result.conf = conf +// result +// } +//} \ No newline at end of file diff --git a/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip b/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip index fa4e0eb..8d56b70 100644 Binary files a/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip and b/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip differ diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworls/FrameworkProvidersFactory.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworls/FrameworkProvidersFactory.kt index cf3e10d..78e06fa 100644 --- a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworls/FrameworkProvidersFactory.kt +++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworls/FrameworkProvidersFactory.kt @@ -32,7 +32,6 @@ class FrameworkProvidersFactory(val env: String, val config: ClusterConfig) : KL providers = runnerTypes.map { - val provider = it.newInstance() provider.init(env, config) diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/DataLoader.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/DataLoader.kt index 73c66fd..fa114b0 100644 --- a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/DataLoader.kt +++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/DataLoader.kt @@ -101,9 +101,19 @@ object DataLoader : KLogging() { fun getExecutorData(env: String, clusterConf: ClusterConfig): ExecData { // loading the job configuration - val envValue = File("repo/env/$env/job.yml").readText() //TODO: change this to YAML - val envData = ymlMapper.readValue<Environment>(envValue) + var envFile = File("repo/env/$env/job.yml") + val envValue = if (envFile.exists()) { + envFile.readText() + } else { + envFile = File("repo/env/$env/job.yaml") + if (envFile.exists()) { + envFile.readText() + } else { + "" + } + } + val envData = ymlMapper.readValue<Environment>(envValue) // loading all additional configurations val files = File("repo/env/$env/").listFiles().filter { it.isFile }.filter { it.name != "job.yml" } val config = files.map { yamlToMap(it) }.toMap() diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt index 861ca3f..075cfec 100644 --- a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt +++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt @@ -237,7 +237,9 @@ class ApplicationMaster : KLogging(), AMRMClientAsync.CallbackHandler { val framework = frameworkFactory.getFramework(actionData.groupId) val runnerProvider = framework.getRunnerProvider(actionData.typeId) val ctx = Records.newRecord(ContainerLaunchContext::class.java) - val commands: List<String> = listOf(runnerProvider.getCommand(jobManager.jobId, actionData, env, "${actionData.id}-${container.id.containerId}", address)) + + val envConf = configManager.getActionConfiguration(actionData.name, actionData.config) + val commands: List<String> = listOf(runnerProvider.getCommand(jobManager.jobId, actionData, envConf, "${actionData.id}-${container.id.containerId}", address)) notifier.info("container command ${commands.joinToString(prefix = " ", postfix = " ")}") ctx.commands = commands diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala index 19cb831..293b878 100755 --- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala @@ -168,6 +168,7 @@ class JobScheduler extends AmaterasuScheduler { val envYaml = configManager.getActionConfigContent(actionData.getName, actionData.getConfig) writeConfigFile(envYaml, jobManager.getJobId, actionData.getName, "env.yaml") + val envConf = configManager.getActionConfiguration(actionData.getName, actionData.getConfig) val dataStores = DataLoader.getTaskData(actionData, env).getExports val writer = new StringWriter() yamlMapper.writeValue(writer, dataStores) @@ -190,6 +191,7 @@ class JobScheduler extends AmaterasuScheduler { log.info(s">>>> Framework: ${actionData.getGroupId}") val frameworkProvider = frameworkFactory.providers(actionData.getGroupId) log.info(s">>>> Runner: ${actionData.getTypeId}") + val runnerProvider = frameworkProvider.getRunnerProvider(actionData.getTypeId) // searching for an executor that already exist on the slave, if non exist @@ -207,7 +209,7 @@ class JobScheduler extends AmaterasuScheduler { // if(!actionData.getSrc.isEmpty){ // copy(get(s"repo/src/${actionData.getSrc}"), get(s"dist/${jobManager.getJobId}/${actionData.getName}/${actionData.getSrc}"), REPLACE_EXISTING) // } - val commandStr = runnerProvider.getCommand(jobManager.getJobId, actionData, env, executorId, "") + val commandStr = runnerProvider.getCommand(jobManager.getJobId, actionData, envConf, executorId, "") log.info(s"===> Command: $commandStr") val command = CommandInfo .newBuilder diff --git a/sdk/build.gradle b/sdk/build.gradle index f299963..9335188 100644 --- a/sdk/build.gradle +++ b/sdk/build.gradle @@ -67,7 +67,10 @@ dependencies { testCompile group: 'junit', name: 'junit', version: '4.11' compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version" compile "org.jetbrains.kotlin:kotlin-reflect" - + compile('com.uchuhimo:konf:0.11') { + exclude group: 'org.eclipse.jgit' + } + testCompile 'org.jetbrains.spek:spek-api:1.1.5' testCompile "org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version" testRuntime 'org.jetbrains.spek:spek-junit-platform-engine:1.1.5' @@ -75,6 +78,7 @@ dependencies { // spek requires kotlin-reflect, can be omitted if already in the classpath testRuntimeOnly "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version" + testImplementation 'org.junit.platform:junit-platform-runner:1.0.0' testImplementation 'org.junit.platform:junit-platform-launcher:1.0.0' testImplementation 'org.junit.jupiter:junit-jupiter-engine:5.0.0' diff --git a/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt b/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt index 499ce10..0c2baf1 100644 --- a/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt +++ b/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt @@ -19,16 +19,16 @@ package org.apache.amaterasu.sdk.frameworks import org.apache.amaterasu.common.dataobjects.ActionData import org.apache.amaterasu.common.utils.ArtifactUtil import org.apache.amaterasu.common.utils.FileUtil -import org.apache.amaterasu.common.logging.KLogging import org.apache.amaterasu.common.logging.Logging +import com.uchuhimo.konf.Config abstract class RunnerSetupProvider : Logging() { - private val actionFiles = arrayOf("env.yaml", "runtime.yaml", "datastores.yaml", "datasets.yaml") + private val actionFiles = arrayOf("env.yaml", "runtime.yaml", "datasets.yaml") abstract val runnerResources: Array<String> - abstract fun getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String + abstract fun getCommand(jobId: String, actionData: ActionData, env: Config, executorId: String, callbackAddress: String): String protected fun getDownloadableActionSrcPath(jobId: String, actionData: ActionData): String { return "$jobId/${actionData.name}/${actionData.src}" diff --git a/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip b/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip index b2d76c0..2b0e997 100644 Binary files a/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip and b/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip differ
