This is an automated email from the ASF dual-hosted git repository. yaniv pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-amaterasu.git
commit b53d7c0897f66d90212121cce5b670e9fd6402a1 Author: Yaniv Rodenski <[email protected]> AuthorDate: Tue Apr 16 14:52:47 2019 +1000 fixed the spark-env location --- .../runners/providers/PythonRunnerProviderBase.kt | 2 +- .../dist/amaterasu_pandas-0.2.0-incubating-rc4.zip | Bin 8304 -> 8304 bytes .../dist/amaterasu_python-0.2.0-incubating-rc4.zip | Bin 6167 -> 6167 bytes .../runners/providers/PySparkRunnerProvider.scala | 10 +++++----- .../dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip | Bin 14488 -> 14488 bytes .../amaterasu/leader/yarn/ApplicationMaster.kt | 13 ++++++------- .../amaterasu/leader/yarn/YarnNMCallbackHandler.kt | 5 +++-- .../dist/amaterasu-sdk-0.2.0-incubating-rc4.zip | Bin 14898 -> 14898 bytes 8 files changed, 15 insertions(+), 15 deletions(-) diff --git a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt index 72242d0..d6e9f42 100644 --- a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt +++ b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt @@ -26,7 +26,7 @@ abstract class PythonRunnerProviderBase(val env: String, val conf: ClusterConfig private val requirementsFileName: String = "ama-requirements.txt" private val mandatoryPYPIPackages: Array<String> = arrayOf("requests") - protected val virtualPythonPath = "./amaterasu_env/bin/python" + protected val virtualPythonPath = "amaterasu_env/bin/python" override val runnerResources: Array<String> get() = arrayOf("amaterasu-sdk-${conf.version()}.zip") diff --git a/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip b/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip index b6ca524..69a9002 100644 Binary files a/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip and b/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip differ diff --git a/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip b/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip index ce67e5a..448adf5 100644 Binary files a/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip and b/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip differ diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala index 407eb30..465691c 100644 --- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala +++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala @@ -3,16 +3,16 @@ package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers import org.apache.amaterasu.common.configuration.ClusterConfig import org.apache.amaterasu.common.dataobjects.ActionData import org.apache.amaterasu.frameworks.python.dispatcher.runners.providers.PythonRunnerProviderBase -import sys.process._ +import org.apache.commons.lang.StringUtils class PySparkRunnerProvider(val env: String, val conf: ClusterConfig) extends PythonRunnerProviderBase(env, conf) { override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = { - var command = super.getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String) + //val command = super.getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String) log.info(s"===> Cluster manager: ${conf.mode}") - val pythonBinPath = Seq(getVirtualPythonPath, "-c", "import sys; print(sys.executable)").!!.trim() - command + s" && /bin/bash $$SPARK_HOME/bin/load-spark-env.sh && env PYSPARK_PYTHON=$pythonBinPath " + - s" && $$SPARK_HOME/bin/spark-submit ${actionData.getSrc}" + //command + + // s"$$SPARK_HOME/conf/spark-env.sh && env PYSPARK_PYTHON=$getVirtualPythonPath " + + s"$$SPARK_HOME/bin/spark-submit ${actionData.getSrc}" } override def getRunnerResources: Array[String] = { diff --git a/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip b/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip index 1e47c2e..9855261 100644 Binary files a/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip and b/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip differ diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt index 8c45b41..cd4bf03 100644 --- a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt +++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt @@ -38,6 +38,7 @@ import org.apache.amaterasu.leader.common.utilities.DataLoader import org.apache.amaterasu.leader.common.utilities.MessagingClientUtil import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider +import org.apache.commons.lang.exception.ExceptionUtils import org.apache.curator.framework.CuratorFramework import org.apache.curator.framework.CuratorFrameworkFactory @@ -60,10 +61,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils import org.apache.hadoop.yarn.util.Records import org.apache.zookeeper.CreateMode +import java.io.* -import java.io.File -import java.io.FileInputStream -import java.io.IOException import java.nio.ByteBuffer import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentLinkedQueue @@ -240,7 +239,7 @@ class ApplicationMaster : KLogging(), AMRMClientAsync.CallbackHandler { val ctx = Records.newRecord(ContainerLaunchContext::class.java) val commands: List<String> = listOf(runnerProvider.getCommand(jobManager.jobId, actionData, env, "${actionData.id}-${container.id.containerId}", address)) - log.info("container command ${commands.joinToString(prefix = " ", postfix = " ")}") + notifier.info("container command ${commands.joinToString(prefix = " ", postfix = " ")}") ctx.commands = commands ctx.tokens = allTokens() ctx.localResources = setupContainerResources(framework, runnerProvider, actionData) @@ -293,7 +292,7 @@ class ApplicationMaster : KLogging(), AMRMClientAsync.CallbackHandler { // getting the action specific resources result.putAll(runnerProvider.getActionResources(jobManager.jobId, actionData).map { it.removePrefix("${jobManager.jobId}/${actionData.name}/") to createLocalResourceFromPath(Path.mergePaths(yarnJarPath, createDistPath(it))) }) - // getting the action specific dependencies + // getting the action specific dependencies runnerProvider.getActionDependencies(jobManager.jobId, actionData).forEach { distributeFile(it, "${jobManager.jobId}/${actionData.name}/") } result.putAll(runnerProvider.getActionDependencies(jobManager.jobId, actionData).map { File(it).name to createLocalResourceFromPath(Path.mergePaths(yarnJarPath, createDistPath("${jobManager.jobId}/${actionData.name}/$it"))) }) @@ -400,7 +399,7 @@ class ApplicationMaster : KLogging(), AMRMClientAsync.CallbackHandler { } override fun onError(e: Throwable?) { - notifier.error("Error on AM", e!!.message!!) + notifier.error("Error running a container ${e!!.message!!}", ExceptionUtils.getStackTrace(e)) stopApplication(FinalApplicationStatus.FAILED, "Error on AM") } @@ -421,7 +420,7 @@ class ApplicationMaster : KLogging(), AMRMClientAsync.CallbackHandler { } else { // TODO: Check the getDiagnostics value and see if appropriate jobManager.actionFailed(taskId, status.diagnostics) - notifier.error("", "Container $containerId Complete with task $taskId with Failed status code (${status.exitStatus})") + notifier.error( "Container $containerId Complete with task $taskId with Failed status code (${status.exitStatus})", status.diagnostics) } } } diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.kt index 7fc89e8..697bb40 100644 --- a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.kt +++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.kt @@ -18,6 +18,7 @@ package org.apache.amaterasu.leader.yarn import org.apache.amaterasu.common.logging.KLogging import org.apache.amaterasu.common.utils.ActiveNotifier +import org.apache.commons.lang.exception.ExceptionUtils import java.nio.ByteBuffer @@ -30,7 +31,7 @@ import org.apache.hadoop.yarn.client.api.async.NMClientAsync class YarnNMCallbackHandler(val notifier: ActiveNotifier) : KLogging() , NMClientAsync.CallbackHandler { override fun onStartContainerError(containerId: ContainerId, t: Throwable) { - notifier.error("","Container ${containerId.containerId} couldn't start. message ${t.message}") + notifier.error("Error starting a container ${t.message!!}", ExceptionUtils.getStackTrace(t)) } override fun onGetContainerStatusError(containerId: ContainerId, t: Throwable) { @@ -46,7 +47,7 @@ class YarnNMCallbackHandler(val notifier: ActiveNotifier) : KLogging() , NMClien } override fun onStopContainerError(containerId: ContainerId, t: Throwable) { - notifier.error("","Container ${containerId.containerId} has thrown an error. message ${t.message}") + notifier.error("Error running a container ${t.message!!}", ExceptionUtils.getStackTrace(t)) } override fun onContainerStopped(containerId: ContainerId) { diff --git a/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip b/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip index 1d735fd..7ba06f4 100644 Binary files a/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip and b/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip differ
