This is an automated email from the ASF dual-hosted git repository.

yaniv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-amaterasu.git

commit b53d7c0897f66d90212121cce5b670e9fd6402a1
Author: Yaniv Rodenski <[email protected]>
AuthorDate: Tue Apr 16 14:52:47 2019 +1000

    fixed the spark-env location
---
 .../runners/providers/PythonRunnerProviderBase.kt   |   2 +-
 .../dist/amaterasu_pandas-0.2.0-incubating-rc4.zip  | Bin 8304 -> 8304 bytes
 .../dist/amaterasu_python-0.2.0-incubating-rc4.zip  | Bin 6167 -> 6167 bytes
 .../runners/providers/PySparkRunnerProvider.scala   |  10 +++++-----
 .../dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip | Bin 14488 -> 14488 bytes
 .../amaterasu/leader/yarn/ApplicationMaster.kt      |  13 ++++++-------
 .../amaterasu/leader/yarn/YarnNMCallbackHandler.kt  |   5 +++--
 .../dist/amaterasu-sdk-0.2.0-incubating-rc4.zip     | Bin 14898 -> 14898 bytes
 8 files changed, 15 insertions(+), 15 deletions(-)

diff --git a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt
index 72242d0..d6e9f42 100644
--- a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt
+++ b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt
@@ -26,7 +26,7 @@ abstract class PythonRunnerProviderBase(val env: String, val conf: ClusterConfig
 
     private val requirementsFileName: String = "ama-requirements.txt"
     private val mandatoryPYPIPackages: Array<String> = arrayOf("requests")
-    protected val virtualPythonPath = "./amaterasu_env/bin/python"
+    protected val virtualPythonPath = "amaterasu_env/bin/python"
 
     override val runnerResources: Array<String>
         get() = arrayOf("amaterasu-sdk-${conf.version()}.zip")
diff --git a/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip b/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip
index b6ca524..69a9002 100644
Binary files a/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip and b/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip differ
diff --git a/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip b/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip
index ce67e5a..448adf5 100644
Binary files a/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip and b/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip differ
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
index 407eb30..465691c 100644
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
@@ -3,16 +3,16 @@ package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.frameworks.python.dispatcher.runners.providers.PythonRunnerProviderBase
-import sys.process._
+import org.apache.commons.lang.StringUtils
 
 class PySparkRunnerProvider(val env: String, val conf: ClusterConfig) extends PythonRunnerProviderBase(env, conf) {
 
   override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = {
-    var command = super.getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String)
+    //val command = super.getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String)
     log.info(s"===> Cluster manager: ${conf.mode}")
-    val pythonBinPath = Seq(getVirtualPythonPath, "-c", "import sys; print(sys.executable)").!!.trim()
-    command + s" && /bin/bash $$SPARK_HOME/bin/load-spark-env.sh && env PYSPARK_PYTHON=$pythonBinPath " +
-      s" && $$SPARK_HOME/bin/spark-submit ${actionData.getSrc}"
+    //command +
+     // s"$$SPARK_HOME/conf/spark-env.sh && env PYSPARK_PYTHON=$getVirtualPythonPath " +
+      s"$$SPARK_HOME/bin/spark-submit ${actionData.getSrc}"
   }
 
   override def getRunnerResources: Array[String] = {
diff --git a/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip b/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip
index 1e47c2e..9855261 100644
Binary files a/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip and b/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip differ
diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
index 8c45b41..cd4bf03 100644
--- a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
@@ -38,6 +38,7 @@ import org.apache.amaterasu.leader.common.utilities.DataLoader
 import org.apache.amaterasu.leader.common.utilities.MessagingClientUtil
 import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider
 import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
+import org.apache.commons.lang.exception.ExceptionUtils
 
 import org.apache.curator.framework.CuratorFramework
 import org.apache.curator.framework.CuratorFrameworkFactory
@@ -60,10 +61,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.hadoop.yarn.util.Records
 
 import org.apache.zookeeper.CreateMode
+import java.io.*
 
-import java.io.File
-import java.io.FileInputStream
-import java.io.IOException
 import java.nio.ByteBuffer
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.ConcurrentLinkedQueue
@@ -240,7 +239,7 @@ class ApplicationMaster : KLogging(), AMRMClientAsync.CallbackHandler {
                         val ctx = Records.newRecord(ContainerLaunchContext::class.java)
                         val commands: List<String> = listOf(runnerProvider.getCommand(jobManager.jobId, actionData, env, "${actionData.id}-${container.id.containerId}", address))
 
-                        log.info("container command ${commands.joinToString(prefix = " ", postfix = " ")}")
+                        notifier.info("container command ${commands.joinToString(prefix = " ", postfix = " ")}")
                         ctx.commands = commands
                         ctx.tokens = allTokens()
                         ctx.localResources = setupContainerResources(framework, runnerProvider, actionData)
@@ -293,7 +292,7 @@ class ApplicationMaster : KLogging(), AMRMClientAsync.CallbackHandler {
         // getting the action specific resources
         result.putAll(runnerProvider.getActionResources(jobManager.jobId, actionData).map { it.removePrefix("${jobManager.jobId}/${actionData.name}/") to createLocalResourceFromPath(Path.mergePaths(yarnJarPath, createDistPath(it))) })
 
-            // getting the action specific dependencies
+        // getting the action specific dependencies
         runnerProvider.getActionDependencies(jobManager.jobId, actionData).forEach { distributeFile(it, "${jobManager.jobId}/${actionData.name}/") }
         result.putAll(runnerProvider.getActionDependencies(jobManager.jobId, actionData).map { File(it).name to createLocalResourceFromPath(Path.mergePaths(yarnJarPath, createDistPath("${jobManager.jobId}/${actionData.name}/$it"))) })
 
@@ -400,7 +399,7 @@ class ApplicationMaster : KLogging(), AMRMClientAsync.CallbackHandler {
     }
 
     override fun onError(e: Throwable?) {
-        notifier.error("Error on AM", e!!.message!!)
+        notifier.error("Error running a container ${e!!.message!!}", ExceptionUtils.getStackTrace(e))
         stopApplication(FinalApplicationStatus.FAILED, "Error on AM")
     }
 
@@ -421,7 +420,7 @@ class ApplicationMaster : KLogging(), AMRMClientAsync.CallbackHandler {
                 } else {
                     // TODO: Check the getDiagnostics value and see if appropriate
                     jobManager.actionFailed(taskId, status.diagnostics)
-                    notifier.error("", "Container $containerId Complete with task $taskId with Failed status code (${status.exitStatus})")
+                    notifier.error( "Container $containerId Complete with task $taskId with Failed status code (${status.exitStatus})", status.diagnostics)
                 }
             }
         }
diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.kt
index 7fc89e8..697bb40 100644
--- a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.kt
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.kt
@@ -18,6 +18,7 @@ package org.apache.amaterasu.leader.yarn
 
 import org.apache.amaterasu.common.logging.KLogging
 import org.apache.amaterasu.common.utils.ActiveNotifier
+import org.apache.commons.lang.exception.ExceptionUtils
 
 import java.nio.ByteBuffer
 
@@ -30,7 +31,7 @@ import org.apache.hadoop.yarn.client.api.async.NMClientAsync
 class YarnNMCallbackHandler(val notifier: ActiveNotifier) : KLogging() , NMClientAsync.CallbackHandler {
 
     override fun onStartContainerError(containerId: ContainerId, t: Throwable) {
-        notifier.error("","Container ${containerId.containerId} couldn't start. message ${t.message}")
+        notifier.error("Error starting a container ${t.message!!}", ExceptionUtils.getStackTrace(t))
     }
 
     override fun onGetContainerStatusError(containerId: ContainerId, t: Throwable) {
@@ -46,7 +47,7 @@ class YarnNMCallbackHandler(val notifier: ActiveNotifier) : KLogging() , NMClien
     }
 
     override fun onStopContainerError(containerId: ContainerId, t: Throwable) {
-        notifier.error("","Container ${containerId.containerId} has thrown an error.  message ${t.message}")
+        notifier.error("Error running a container ${t.message!!}", ExceptionUtils.getStackTrace(t))
     }
 
     override fun onContainerStopped(containerId: ContainerId) {
diff --git a/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip b/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip
index 1d735fd..7ba06f4 100644
Binary files a/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip and b/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip differ

Reply via email to