This is an automated email from the ASF dual-hosted git repository. yaniv pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-amaterasu.git
commit a28eea9b69dd3085b5a87aaafdc597806ec4dcb0 Author: Yaniv Rodenski <[email protected]> AuthorDate: Sun May 5 10:49:23 2019 +1000 mesos notifications added --- .../dist/amaterasu_pandas-0.2.0-incubating-rc4.zip | Bin 8304 -> 8304 bytes .../dist/amaterasu_python-0.2.0-incubating-rc4.zip | Bin 6167 -> 6167 bytes .../spark/dispatcher/SparkSetupProvider.kt | 16 +++ .../providers/SparkSubmitScalaRunnerProvider.kt | 16 +++ .../spark/dispatcher/SparkSetupProvider.scala | 117 --------------------- .../runners/providers/PySparkRunnerProvider.scala | 45 -------- .../providers/SparkScalaRunnerProvider.scala | 69 ------------ .../providers/SparkSubmitScalaRunnerProvider.scala | 50 --------- .../amaterasu_pyspark-0.2.0-incubating-rc4.zip | Bin 14488 -> 14488 bytes .../leader/mesos/schedulers/JobScheduler.scala | 24 ++++- .../dist/amaterasu-sdk-0.2.0-incubating-rc4.zip | Bin 15020 -> 15020 bytes 11 files changed, 52 insertions(+), 285 deletions(-) diff --git a/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip b/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip index c625dd1..3534e3d 100644 Binary files a/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip and b/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip differ diff --git a/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip b/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip index 8433083..379f066 100644 Binary files a/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip and b/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip differ diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt index 63a4234..007dcf0 100644 --- a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt +++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.amaterasu.frameworks.spark.dispatcher import org.apache.amaterasu.common.configuration.ClusterConfig diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt index a07747a..f13e7c9 100644 --- a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt +++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers import com.uchuhimo.konf.Config diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala deleted file mode 100644 index 76d0aa8..0000000 --- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala +++ /dev/null @@ -1,117 +0,0 @@ -///* -// * Licensed to the Apache Software Foundation (ASF) under one or more -// * contributor license agreements. See the NOTICE file distributed with -// * this work for additional information regarding copyright ownership. -// * The ASF licenses this file to You under the Apache License, Version 2.0 -// * (the "License"); you may not use this file except in compliance with -// * the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -//package org.apache.amaterasu.frameworks.spark.dispatcher -// -//import java.io.File -//import java.util -// -//import org.apache.amaterasu.common.configuration.ClusterConfig -//import org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers._ -//import org.apache.amaterasu.leader.common.utilities.{DataLoader, MemoryFormatParser} -//import org.apache.amaterasu.sdk.frameworks.configuration.DriverConfiguration -//import org.apache.amaterasu.sdk.frameworks.{FrameworkSetupProvider, RunnerSetupProvider} -//import org.apache.commons.lang.StringUtils -// -//import scala.collection.mutable -//import collection.JavaConversions._ -// -//class SparkSetupProvider extends FrameworkSetupProvider { -// -// private var env: String = _ -// private var conf: ClusterConfig = _ -// private lazy val sparkExecConfigurations: mutable.Map[String, Any] = loadSparkConfig -// -// private val runnerProviders: mutable.Map[String, RunnerSetupProvider] = mutable.Map[String, RunnerSetupProvider]() -// -// private def loadSparkConfig: mutable.Map[String, Any] = { -// -// val execData = DataLoader.getExecutorData(env, conf) -// val sparkExecConfiguration = execData.getConfigurations.get("spark") -// if (sparkExecConfiguration.isEmpty) { -// throw new Exception(s"Spark configuration files could not be loaded for the environment $env") -// } -// collection.mutable.Map(sparkExecConfiguration.toSeq: _*) -// -// } -// -// override def init(env: String, conf: ClusterConfig): Unit = { -// this.env = env -// this.conf = conf -// -//// this.sparkExecConfigurations = loadSparkConfig -// runnerProviders += ("scala" -> SparkScalaRunnerProvider(conf)) -// runnerProviders += ("jar" -> SparkSubmitScalaRunnerProvider(conf)) -// runnerProviders += ("pyspark" -> new PySparkRunnerProvider(env, conf)) -// -// } -// -// override def getGroupIdentifier: String = "spark" -// -// override def getGroupResources: Array[File] = conf.mode match { -// case "mesos" => Array[File](new File(s"spark-${conf.webserver.sparkVersion}.tgz"), new File(s"spark-runner-${conf.version}-all.jar"), new File(s"spark-runtime-${conf.version}.jar")) -// case "yarn" => Array[File](new File(s"spark-runner-${conf.version}-all.jar"), new File(s"spark-runtime-${conf.version}.jar"), new File(s"executor-${conf.version}-all.jar"), new File(conf.spark.home)) -// case _ => Array[File]() -// } -// -// -// override def getEnvironmentVariables: util.Map[String, String] = conf.mode match { -// case "mesos" => Map[String, String]("SPARK_HOME" -> s"spark-${conf.webserver.sparkVersion}", "SPARK_HOME_DOCKER" -> "/opt/spark/") -// case "yarn" => Map[String, String]("SPARK_HOME" -> StringUtils.stripStart(conf.spark.home, "/")) -// case _ => Map[String, String]() -// } -// -// override def getDriverConfiguration: DriverConfiguration = { -// var cpu: Int = 0 -// if (sparkExecConfigurations.get("spark.yarn.am.cores").isDefined) { -// cpu = sparkExecConfigurations("spark.yarn.am.cores").toString.toInt -// } else if (sparkExecConfigurations.get("spark.driver.cores").isDefined) { -// cpu = sparkExecConfigurations("spark.driver.cores").toString.toInt -// } else if (conf.spark.opts.contains("yarn.am.cores")) { -// cpu = conf.spark.opts("yarn.am.cores").toInt -// } else if (conf.spark.opts.contains("driver.cores")) { -// cpu = conf.spark.opts("driver.cores").toInt -// } else if (conf.yarn.Worker.cores > 0) { -// cpu = conf.yarn.Worker.cores -// } else { -// cpu = 1 -// } -// var mem: Int = 0 -// if (sparkExecConfigurations.get("spark.yarn.am.memory").isDefined) { -// mem = MemoryFormatParser.extractMegabytes(sparkExecConfigurations("spark.yarn.am.memory").toString) -// } else if (sparkExecConfigurations.get("spark.driver.memeory").isDefined) { -// mem = MemoryFormatParser.extractMegabytes(sparkExecConfigurations("spark.driver.memeory").toString) -// } else if (conf.spark.opts.contains("yarn.am.memory")) { -// mem = MemoryFormatParser.extractMegabytes(conf.spark.opts("yarn.am.memory")) -// } else if (conf.spark.opts.contains("driver.memory")) { -// mem = MemoryFormatParser.extractMegabytes(conf.spark.opts("driver.memory")) -// } else if (conf.yarn.Worker.memoryMB > 0) { -// mem = conf.yarn.Worker.memoryMB -// } else if (conf.taskMem > 0) { -// mem = conf.taskMem -// } else { -// mem = 1024 -// } -// -// new DriverConfiguration(mem, cpu) -// } -// -// override def getRunnerProvider(runnerId: String): RunnerSetupProvider = { -// runnerProviders(runnerId) -// } -// -// override def getConfigurationItems = Array("sparkConfiguration", "sparkExecutor") -//} \ No newline at end of file diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala deleted file mode 100644 index db1e3fc..0000000 --- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala +++ /dev/null @@ -1,45 +0,0 @@ -//package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers -// -//import com.uchuhimo.konf.Config -//import org.apache.amaterasu.common.configuration.ClusterConfig -//import org.apache.amaterasu.common.dataobjects.ActionData -//import org.apache.amaterasu.frameworks.python.dispatcher.runners.providers.PythonRunnerProviderBase -//import org.apache.amaterasu.leader.common.configuration.Job -// -//class PySparkRunnerProvider(val env: String, val conf: ClusterConfig) extends PythonRunnerProviderBase(env, conf) { -// -// -// -// override def getRunnerResources: Array[String] = { -// var resources = super.getRunnerResources -// resources = resources :+ s"amaterasu_pyspark-${conf.version}.zip" -// log.info(s"PYSPARK RESOURCES ==> ${resources.toSet}") -// resources -// } -// -// -// override def getHasExecutor: Boolean = false -// -// override def getActionUserResources(jobId: String, actionData: ActionData): Array[String] = Array[String]() -// -// override def getCommand(jobId: String, actionData: ActionData, env: Config, executorId: String, callbackAddress: String): String = { -// -// val command = super.getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String) -// log.info(s"===> Cluster manager: ${conf.mode}") -// command + -// //s" $$SPARK_HOME/conf/spark-env.sh" + -// // s" && env PYSPARK_PYTHON=$getVirtualPythonPath" + -// //s" env PYSPARK_DRIVER_PYTHON=$getVirtualPythonPath" + d -// s" && $$SPARK_HOME/bin/spark-submit --master ${env[Job.master]} " + -// s"--conf spark.pyspark.python=${conf.pythonPath} " + -// s"--conf spark.pyspark.driver.python=$getVirtualPythonPath " + -// s"--files $$SPARK_HOME/conf/hive-site.xml ${actionData.getSrc}" -// } -//} -// -//object PySparkRunnerProvider { -// def apply(env: String, conf: ClusterConfig): PySparkRunnerProvider = { -// val result = new PySparkRunnerProvider(env, conf) -// result -// } -//} \ No newline at end of file diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala deleted file mode 100644 index 40f9194..0000000 --- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala +++ /dev/null @@ -1,69 +0,0 @@ -///* -// * Licensed to the Apache Software Foundation (ASF) under one or more -// * contributor license agreements. See the NOTICE file distributed with -// * this work for additional information regarding copyright ownership. -// * The ASF licenses this file to You under the Apache License, Version 2.0 -// * (the "License"); you may not use this file except in compliance with -// * the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -//package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers -// -//import java.net.URLEncoder -// -//import org.apache.amaterasu.common.configuration.ClusterConfig -//import org.apache.amaterasu.common.dataobjects.ActionData -//import org.apache.amaterasu.leader.common.utilities.DataLoader -//import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider -//import org.apache.commons.lang.StringUtils -//import org.apache.hadoop.yarn.api.ApplicationConstants -// -//class SparkScalaRunnerProvider extends RunnerSetupProvider { -// -// private var conf: ClusterConfig = _ -// private val libPath = System.getProperty("java.library.path") -// -// override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = conf.mode match { -// case "mesos" => -// s"env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=${conf.mesos.libPath} env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${conf.webserver.Port}/dist/spark-${conf.webserver.sparkVersion}.tgz " + -// s"java -cp executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.webserver.sparkVersion}/jars/* " + -// s"-Dscala.usejavacp=true -Djava.library.path=$libPath " + -// s"org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId ${conf.master} ${actionData.getName}".stripMargin -// case "yarn" => -// s"/bin/bash ${StringUtils.stripStart(conf.spark.home,"/")}/conf/spark-env.sh && " + -// s"java -cp ${StringUtils.stripStart(conf.spark.home,"/")}/jars/*:executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:${StringUtils.stripStart(conf.spark.home,"/")}/conf/:${conf.yarn.hadoopHomeDir}/conf/ " + -// "-Xmx2G " + -// "-Dscala.usejavacp=true " + -// "-Dhdp.version=2.6.5.0-292 " + -// "org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " + -// s"'$jobId' '${conf.master}' '${actionData.getName}' '${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}' '${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}' '$executorId' '$callbackAddress' " + -// s"1> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout " + -// s"2> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr " -// case _ => "" -// } -// -// override def getRunnerResources: Array[String] = -// Array[String]() -// -// override def getActionDependencies(jobId: String, actionData: ActionData): Array[String] = -// Array[String]() -// -// override def getHasExecutor: Boolean = true -// -// override def getActionUserResources(jobId: String, actionData: ActionData): Array[String] = Array[String]() -//} -// -//object SparkScalaRunnerProvider { -// def apply(conf: ClusterConfig): SparkScalaRunnerProvider = { -// val result = new SparkScalaRunnerProvider -// result.conf = conf -// result -// } -//} \ No newline at end of file diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala deleted file mode 100644 index b8e9677..0000000 --- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala +++ /dev/null @@ -1,50 +0,0 @@ -//package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers -// -//import java.io.File -// -//import org.apache.amaterasu.common.configuration.ClusterConfig -//import org.apache.amaterasu.common.dataobjects.ActionData -//import org.apache.amaterasu.common.utils.ArtifactUtil -//import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider -// -//import scala.collection.JavaConverters._ -// -//class SparkSubmitScalaRunnerProvider extends RunnerSetupProvider { -// -// private var conf: ClusterConfig = _ -// val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath) -// val amaDist = new File(s"${new File(jarFile.getParent).getParent}/dist") -// val amalocation = new File(s"${new File(jarFile.getParent).getParent}") -// -// override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = { -// -// val util = new ArtifactUtil(List(actionData.repo).asJava, jobId) -// val classParam = if (actionData.getHasArtifact) s" --class ${actionData.entryClass}" else "" -// s"$$SPARK_HOME/bin/spark-submit $classParam ${util.getLocalArtifacts(actionData.getArtifact).get(0).getName} --deploy-mode client --jars spark-runtime-${conf.version}.jar >&1" -// -// } -// -// override def getRunnerResources: Array[String] = -// Array[String]() -// -// override def getActionUserResources(jobId: String, actionData: ActionData): Array[String] = -// Array[String]() -// -// -// override def getActionDependencies(jobId: String, actionData: ActionData): Array[String] = -// Array[String]() -// -// -// override def getHasExecutor: Boolean = false -// -// -//} -// -//object SparkSubmitScalaRunnerProvider { -// def apply(conf: ClusterConfig): SparkSubmitScalaRunnerProvider = { -// val result = new SparkSubmitScalaRunnerProvider -// -// result.conf = conf -// result -// } -//} \ No newline at end of file diff --git a/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip b/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip index 8d56b70..7c53be8 100644 Binary files a/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip and b/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip differ diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala index 293b878..f6d09f2 100755 --- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala @@ -85,6 +85,7 @@ class JobScheduler extends AmaterasuScheduler { private val executionMap: concurrent.Map[String, concurrent.Map[String, ActionStatus]] = new ConcurrentHashMap[String, concurrent.Map[String, ActionStatus]].asScala private val lock = new ReentrantLock() private val offersToTaskIds: concurrent.Map[String, String] = new ConcurrentHashMap[String, String].asScala + private val taskIdsToActions: concurrent.Map[Protos.TaskID, String] = new ConcurrentHashMap[Protos.TaskID, String].asScala private val mapper = new ObjectMapper() mapper.registerModule(DefaultScalaModule) @@ -118,14 +119,26 @@ class JobScheduler extends AmaterasuScheduler { def statusUpdate(driver: SchedulerDriver, status: TaskStatus): Unit = { + val actionName = taskIdsToActions(status.getTaskId) status.getState match { case TaskState.TASK_STARTING => log.info("Task starting ...") - case TaskState.TASK_RUNNING => jobManager.actionStarted(status.getTaskId.getValue) - case TaskState.TASK_FINISHED => jobManager.actionComplete(status.getTaskId.getValue) + case TaskState.TASK_RUNNING => { + jobManager.actionStarted(status.getTaskId.getValue) + printNotification(new Notification("", s"created container for $actionName created", NotificationType.Info, NotificationLevel.Execution)) + + } + case TaskState.TASK_FINISHED => { + jobManager.actionComplete(status.getTaskId.getValue) + printNotification(new Notification("",s"Container ${status.getExecutorId.getValue} Complete with task ${status.getTaskId.getValue} with success.", NotificationType.Info, NotificationLevel.Execution)) + } case TaskState.TASK_FAILED | TaskState.TASK_KILLED | TaskState.TASK_ERROR | - TaskState.TASK_LOST => jobManager.actionFailed(status.getTaskId.getValue, status.getMessage) //TODO: revisit this + TaskState.TASK_LOST => { + jobManager.actionFailed(status.getTaskId.getValue, status.getMessage) + printNotification(new Notification("", s"error launching container with ${status.getMessage} in ${status.getData}", NotificationType.Error, NotificationLevel.Execution)) + + } case _ => log.warn("WTF? just got unexpected task state: " + status.getState) } @@ -164,6 +177,7 @@ class JobScheduler extends AmaterasuScheduler { val actionData = jobManager.getNextActionData if (actionData != null) { val taskId = Protos.TaskID.newBuilder().setValue(actionData.getId).build() + taskIdsToActions.put(taskId, actionData.getName) // setting up the configuration files for the container val envYaml = configManager.getActionConfigContent(actionData.getName, actionData.getConfig) writeConfigFile(envYaml, jobManager.getJobId, actionData.getName, "env.yaml") @@ -210,7 +224,8 @@ class JobScheduler extends AmaterasuScheduler { // copy(get(s"repo/src/${actionData.getSrc}"), get(s"dist/${jobManager.getJobId}/${actionData.getName}/${actionData.getSrc}"), REPLACE_EXISTING) // } val commandStr = runnerProvider.getCommand(jobManager.getJobId, actionData, envConf, executorId, "") - log.info(s"===> Command: $commandStr") + printNotification(new Notification("", s"container command $commandStr", NotificationType.Info, NotificationLevel.Execution)) + val command = CommandInfo .newBuilder .setValue(commandStr) @@ -335,6 +350,7 @@ class JobScheduler extends AmaterasuScheduler { //driver.launchTasks(Collections.singleton(offer.getId), List(actionTask).asJava) } + printNotification(new Notification("", s"requesting container fo ${actionData.getName}", NotificationType.Info, NotificationLevel.Execution)) driver.launchTasks(Collections.singleton(offer.getId), List(actionTask).asJava) } diff --git a/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip b/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip index 2b0e997..8abd578 100644 Binary files a/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip and b/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip differ
