This is an automated email from the ASF dual-hosted git repository.

yaniv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-amaterasu.git

commit 9d14164317343afc3b4c84693413adc85da36b46
Author: Yaniv Rodenski <[email protected]>
AuthorDate: Sun May 5 10:14:03 2019 +1000

    mesos executing the new python support
---
 .../common/configuration/ClusterConfig.scala       |   6 +-
 .../runners/providers/BasicPythonRunnerProvider.kt |   3 +-
 .../runners/providers/PandasRunnerProvider.kt      |   3 +-
 .../runners/providers/PythonRunnerProviderBase.kt  |   5 +-
 .../dist/amaterasu_pandas-0.2.0-incubating-rc4.zip | Bin 8304 -> 8304 bytes
 .../dist/amaterasu_python-0.2.0-incubating-rc4.zip | Bin 6167 -> 6167 bytes
 frameworks/spark/dispatcher/build.gradle           |   5 -
 .../spark/dispatcher/SparkSetupProvider.kt         |  87 ++++++++
 .../runners/providers/PySparkRunnerProvider.kt}    |  31 ++-
 .../providers/SparkSubmitScalaRunnerProvider.kt    |  28 +++
 .../spark/dispatcher/SparkSetupProvider.scala      | 234 ++++++++++-----------
 .../runners/providers/PySparkRunnerProvider.scala  |  86 ++++----
 .../providers/SparkScalaRunnerProvider.scala       | 138 ++++++------
 .../providers/SparkSubmitScalaRunnerProvider.scala | 100 ++++-----
 .../amaterasu_pyspark-0.2.0-incubating-rc4.zip     | Bin 14488 -> 14488 bytes
 .../frameworls/FrameworkProvidersFactory.kt        |   1 -
 .../leader/common/utilities/DataLoader.kt          |  14 +-
 .../amaterasu/leader/yarn/ApplicationMaster.kt     |   4 +-
 .../leader/mesos/schedulers/JobScheduler.scala     |   4 +-
 sdk/build.gradle                                   |   6 +-
 .../sdk/frameworks/RunnerSetupProvider.kt          |   6 +-
 .../dist/amaterasu-sdk-0.2.0-incubating-rc4.zip    | Bin 15020 -> 15020 bytes
 22 files changed, 455 insertions(+), 306 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
 
b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
index ed429de..e5c05a7 100755
--- 
a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
+++ 
b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
@@ -66,7 +66,7 @@ class ClusterConfig extends Logging {
       if (props.containsKey("yarn.hadoop.home.dir")) hadoopHomeDir = 
props.getProperty("yarn.hadoop.home.dir")
 
       this.master.load(props)
-      this.Worker.load(props)
+      this.worker.load(props)
     }
 
     class Master {
@@ -81,7 +81,7 @@ class ClusterConfig extends Logging {
 
     val Master = new Master()
 
-    object Worker {
+    class Worker {
       var cores: Int = 1
       var memoryMB: Int = 1024
 
@@ -91,6 +91,8 @@ class ClusterConfig extends Logging {
       }
     }
 
+    val worker = new Worker()
+
 
   }
 
diff --git 
a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/BasicPythonRunnerProvider.kt
 
b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/BasicPythonRunnerProvider.kt
index a78e623..2cac127 100644
--- 
a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/BasicPythonRunnerProvider.kt
+++ 
b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/BasicPythonRunnerProvider.kt
@@ -16,6 +16,7 @@
  */
 package org.apache.amaterasu.frameworks.python.dispatcher.runners.providers
 
+import com.uchuhimo.konf.Config
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.dataobjects.ActionData
 
@@ -29,7 +30,7 @@ import org.apache.amaterasu.common.dataobjects.ActionData
 
     override fun getActionUserResources(jobId: String, actionData: 
ActionData): Array<String> = arrayOf()
 
-    override fun getCommand(jobId: String, actionData: ActionData, env: 
String, executorId: String, callbackAddress: String): String {
+    override fun getCommand(jobId: String, actionData: ActionData, env: 
Config, executorId: String, callbackAddress: String): String {
         return super.getCommand(jobId, actionData, env, executorId, 
callbackAddress) + " && $virtualPythonPath ${actionData.src}"
     }
 
diff --git 
a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PandasRunnerProvider.kt
 
b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PandasRunnerProvider.kt
index be32d9b..e616498 100644
--- 
a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PandasRunnerProvider.kt
+++ 
b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PandasRunnerProvider.kt
@@ -1,5 +1,6 @@
 package org.apache.amaterasu.frameworks.python.dispatcher.runners.providers
 
+import com.uchuhimo.konf.Config
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.dataobjects.ActionData
 
@@ -13,7 +14,7 @@ import org.apache.amaterasu.common.dataobjects.ActionData
             return resources
         }
 
-    override fun getCommand(jobId: String, actionData: ActionData, env: 
String, executorId: String, callbackAddress: String): String {
+    override fun getCommand(jobId: String, actionData: ActionData, env: 
Config, executorId: String, callbackAddress: String): String {
         return super.getCommand(jobId, actionData, env, executorId, 
callbackAddress) + " && $virtualPythonPath ${actionData.src}"
     }
 }
\ No newline at end of file
diff --git 
a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt
 
b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt
index 8636f22..daea452 100644
--- 
a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt
+++ 
b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt
@@ -16,6 +16,7 @@
  */
 package org.apache.amaterasu.frameworks.python.dispatcher.runners.providers
 
+import com.uchuhimo.konf.Config
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.leader.common.utilities.DataLoader
@@ -32,12 +33,12 @@ abstract class PythonRunnerProviderBase(val env: String, 
val conf: ClusterConfig
     override val runnerResources: Array<String>
         get() = arrayOf("amaterasu-sdk-${conf.version()}.zip")
 
-    override fun getCommand(jobId: String, actionData: ActionData, env: 
String, executorId: String, callbackAddress: String): String {
+    override fun getCommand(jobId: String, actionData: ActionData, env: 
Config, executorId: String, callbackAddress: String): String {
         val pythonPath = conf.pythonPath()
         val virtualEnvCmd = "$pythonPath -m venv amaterasu_env"
         val installBaseRequirementsCmd = "$virtualPythonPath -m pip install 
--upgrade --force-reinstall -r $requirementsFileName"
         var cmd = "$virtualEnvCmd && $installBaseRequirementsCmd"
-        val execData = DataLoader.getExecutorData(env, conf)
+        val execData = DataLoader.getExecutorData(this.env, conf)
         execData.pyDeps?.filePaths?.forEach {
             path -> cmd += " && $pythonPath -m pip install -r 
${path.split('/').last()}"
         }
diff --git 
a/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip
 
b/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip
index 6a7200c..c625dd1 100644
Binary files 
a/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip
 and 
b/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip
 differ
diff --git 
a/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip
 
b/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip
index 7f94f5b..8433083 100644
Binary files 
a/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip
 and 
b/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip
 differ
diff --git a/frameworks/spark/dispatcher/build.gradle 
b/frameworks/spark/dispatcher/build.gradle
index e28b202..2f4d469 100644
--- a/frameworks/spark/dispatcher/build.gradle
+++ b/frameworks/spark/dispatcher/build.gradle
@@ -34,7 +34,6 @@ buildscript {
 plugins {
     id 'com.github.johnrengelman.shadow' version '1.2.4'
     id 'com.github.maiflai.scalatest' version '0.22'
-    id 'scala'
 }
 
 shadowJar {
@@ -52,10 +51,6 @@ apply plugin: 'kotlin'
 
 
 dependencies {
-    compile 'org.scala-lang:scala-library:2.11.8'
-//    compile group: 'org.scala-lang', name: 'scala-reflect', version: '2.11.8'
-//    compile group: 'org.scala-lang', name: 'scala-compiler', version: 
'2.11.8'
-
     compile project(':common')
     compile project(':leader-common')
     compile project(':amaterasu-sdk')
diff --git 
a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
 
b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
new file mode 100644
index 0000000..63a4234
--- /dev/null
+++ 
b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
@@ -0,0 +1,87 @@
+package org.apache.amaterasu.frameworks.spark.dispatcher
+
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import 
org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers.PySparkRunnerProvider
+import 
org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers.SparkSubmitScalaRunnerProvider
+import org.apache.amaterasu.leader.common.utilities.DataLoader
+import org.apache.amaterasu.leader.common.utilities.MemoryFormatParser
+import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider
+import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
+import org.apache.amaterasu.sdk.frameworks.configuration.DriverConfiguration
+import org.apache.commons.lang.StringUtils
+import java.io.File
+
+class SparkSetupProvider : FrameworkSetupProvider {
+
+    private lateinit var env: String
+    private lateinit var conf: ClusterConfig
+
+    private val sparkExecConfigurations: Map<String, Any> by lazy {
+        val execData = DataLoader.getExecutorData(env, conf)
+        execData.configurations["spark"].orEmpty()
+    }
+
+    private lateinit var runnerProviders: Map<String, RunnerSetupProvider>
+
+    override fun init(env: String, conf: ClusterConfig) {
+
+        this.env = env
+        this.conf = conf
+
+        runnerProviders = mapOf(
+                "jar" to SparkSubmitScalaRunnerProvider(conf),
+                "pyspark" to PySparkRunnerProvider(env, conf)
+        )
+
+    }
+
+    override val environmentVariables: Map<String, String> by lazy {
+        when (conf.mode()) {
+            "mesos" -> mapOf("SPARK_HOME" to 
"spark-${conf.webserver().sparkVersion()}", "SPARK_HOME_DOCKER" to 
"/opt/spark/")
+            "yarn" -> mapOf("SPARK_HOME" to 
StringUtils.stripStart(conf.spark().home(), "/"))
+            else -> mapOf()
+        }
+    }
+
+    override val groupResources: Array<File> by lazy {
+        when (conf.mode()) {
+            "mesos" -> 
arrayOf(File("spark-${conf.webserver().sparkVersion()}.tgz"), 
File("spark-runner-${conf.version()}-all.jar"), 
File("spark-runtime-${conf.version()}.jar"))
+            "yarn" -> arrayOf(File("spark-runner-${conf.version()}-all.jar"), 
File("spark-runtime-${conf.version()}.jar"), 
File("executor-${conf.version()}-all.jar"), File(conf.spark().home()))
+            else -> arrayOf()
+        }
+    }
+
+    override fun getRunnerProvider(runnerId: String): RunnerSetupProvider {
+        return runnerProviders.getValue(runnerId)
+    }
+
+    override val groupIdentifier: String = "spark"
+    override val configurationItems = arrayOf("sparkConfiguration", 
"sparkExecutor")
+
+    override val driverConfiguration: DriverConfiguration
+        get() {
+            //TODO: Spark configuration should come from the ENV only
+            val sparkOpts = conf.spark().opts()
+            val cpu: Int = when {
+                sparkExecConfigurations.containsKey("spark.yarn.am.cores") -> 
sparkExecConfigurations["spark.yarn.am.cores"].toString().toInt()
+                sparkExecConfigurations.containsKey("spark.driver.cores") -> 
sparkExecConfigurations["spark.driver.cores"].toString().toInt()
+                sparkOpts.contains("yarn.am.cores") -> 
sparkOpts["yarn.am.cores"].toString().toInt()
+                sparkOpts.contains("driver.cores") -> 
sparkOpts["driver.cores"].toString().toInt()
+                conf.yarn().worker().cores() > 0 -> 
conf.yarn().worker().cores()
+                else -> 1
+            }
+
+            val mem: Int = when {
+                sparkExecConfigurations.containsKey("spark.yarn.am.memory") -> 
MemoryFormatParser.extractMegabytes(sparkExecConfigurations["spark.yarn.am.memory"].toString())
+                sparkExecConfigurations.containsKey("spark.driver.memeory") -> 
MemoryFormatParser.extractMegabytes(sparkExecConfigurations["spark.driver.memeory"].toString())
+                sparkOpts.contains("yarn.am.memory") -> 
MemoryFormatParser.extractMegabytes(sparkOpts["yarn.am.memory"].get())
+                sparkOpts.contains("driver.memory") -> 
MemoryFormatParser.extractMegabytes(sparkOpts["driver.memory"].get())
+                conf.yarn().worker().memoryMB() > 0 -> 
conf.yarn().worker().memoryMB()
+                conf.taskMem() > 0 -> conf.taskMem()
+                else -> 1024
+            }
+            return DriverConfiguration(mem, cpu)
+        }
+
+
+}
\ No newline at end of file
diff --git 
a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/BasicPythonRunnerProvider.kt
 
b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.kt
similarity index 54%
copy from 
frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/BasicPythonRunnerProvider.kt
copy to 
frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.kt
index a78e623..1d8e3ce 100644
--- 
a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/BasicPythonRunnerProvider.kt
+++ 
b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.kt
@@ -14,23 +14,36 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.amaterasu.frameworks.python.dispatcher.runners.providers
+package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
 
+import com.uchuhimo.konf.Config
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.dataobjects.ActionData
+import 
org.apache.amaterasu.frameworks.python.dispatcher.runners.providers.PythonRunnerProviderBase
+import org.apache.amaterasu.leader.common.configuration.Job
+
+class PySparkRunnerProvider(env: String, conf: ClusterConfig) : 
PythonRunnerProviderBase(env, conf) {
+
+    override fun getCommand(jobId: String, actionData: ActionData, env: 
Config, executorId: String, callbackAddress: String): String {
+        val command = super.getCommand(jobId, actionData , env, executorId, 
callbackAddress)
+        command + " && \$SPARK_HOME/bin/spark-submit --master 
${env[Job.master]} " +
+                "--conf spark.pyspark.python=${conf.pythonPath()} " +
+                "--conf spark.pyspark.driver.python=$virtualPythonPath " +
+                "--files \$SPARK_HOME/conf/hive-site.xml ${actionData.src}"
+
+        return command
+    }
+
+
+    override fun getActionUserResources(jobId: String, actionData: 
ActionData): Array<String> = arrayOf()
 
- class BasicPythonRunnerProvider(env: String, conf: ClusterConfig) : 
PythonRunnerProviderBase(env, conf) {
     override val runnerResources: Array<String>
         get() {
             var resources = super.runnerResources
-            resources += "amaterasu_python-${conf.version()}.zip"
+            resources += "amaterasu_pyspark-${conf.version()}.zip"
+            //log.info("PYSPARK RESOURCES ==> ${resources.toSet}")
             return resources
         }
 
-    override fun getActionUserResources(jobId: String, actionData: 
ActionData): Array<String> = arrayOf()
-
-    override fun getCommand(jobId: String, actionData: ActionData, env: 
String, executorId: String, callbackAddress: String): String {
-        return super.getCommand(jobId, actionData, env, executorId, 
callbackAddress) + " && $virtualPythonPath ${actionData.src}"
-    }
-
+    override val hasExecutor: Boolean = false
 }
\ No newline at end of file
diff --git 
a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt
 
b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt
new file mode 100644
index 0000000..a07747a
--- /dev/null
+++ 
b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt
@@ -0,0 +1,28 @@
+package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
+
+import com.uchuhimo.konf.Config
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.common.utils.ArtifactUtil
+import org.apache.amaterasu.leader.common.configuration.Job
+import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
+
+class SparkSubmitScalaRunnerProvider(val conf: ClusterConfig) : 
RunnerSetupProvider() {
+
+    override fun getCommand(jobId: String, actionData: ActionData, env: 
Config, executorId: String, callbackAddress: String): String {
+        val util = ArtifactUtil(listOf(actionData.repo), jobId)
+        val classParam = if (actionData.hasArtifact) " --class 
${actionData.entryClass}" else ""
+
+        return "\$SPARK_HOME/bin/spark-submit $classParam 
${util.getLocalArtifacts(actionData.artifact).first().name} " +
+                " --master ${env[Job.master]}" +
+                " --jars spark-runtime-${conf.version()}.jar >&1"
+    }
+
+    override fun getActionUserResources(jobId: String, actionData: 
ActionData): Array<String> = arrayOf()
+
+    override fun getActionDependencies(jobId: String, actionData: ActionData): 
Array<String> = arrayOf()
+
+    override val hasExecutor: Boolean = false
+    override val runnerResources: Array<String> = arrayOf()
+
+}
\ No newline at end of file
diff --git 
a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
 
b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
index cd16b33..76d0aa8 100644
--- 
a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
+++ 
b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
@@ -1,117 +1,117 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.dispatcher
-
-import java.io.File
-import java.util
-
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers._
-import org.apache.amaterasu.leader.common.utilities.{DataLoader, 
MemoryFormatParser}
-import org.apache.amaterasu.sdk.frameworks.configuration.DriverConfiguration
-import org.apache.amaterasu.sdk.frameworks.{FrameworkSetupProvider, 
RunnerSetupProvider}
-import org.apache.commons.lang.StringUtils
-
-import scala.collection.mutable
-import collection.JavaConversions._
-
-class SparkSetupProvider extends FrameworkSetupProvider {
-
-  private var env: String = _
-  private var conf: ClusterConfig = _
-  private lazy val sparkExecConfigurations: mutable.Map[String, Any] = 
loadSparkConfig
-
-  private val runnerProviders: mutable.Map[String, RunnerSetupProvider] = 
mutable.Map[String, RunnerSetupProvider]()
-
-  private def loadSparkConfig: mutable.Map[String, Any] = {
-
-    val execData = DataLoader.getExecutorData(env, conf)
-    val sparkExecConfiguration = execData.getConfigurations.get("spark")
-    if (sparkExecConfiguration.isEmpty) {
-      throw new Exception(s"Spark configuration files could not be loaded for 
the environment $env")
-    }
-    collection.mutable.Map(sparkExecConfiguration.toSeq: _*)
-
-  }
-
-  override def init(env: String, conf: ClusterConfig): Unit = {
-    this.env = env
-    this.conf = conf
-
-//    this.sparkExecConfigurations = loadSparkConfig
-    runnerProviders += ("scala" -> SparkScalaRunnerProvider(conf))
-    runnerProviders += ("jar" -> SparkSubmitScalaRunnerProvider(conf))
-    runnerProviders += ("pyspark" -> PySparkRunnerProvider(env, conf))
-
-  }
-
-  override def getGroupIdentifier: String = "spark"
-
-  override def getGroupResources: Array[File] = conf.mode match {
-    case "mesos" => Array[File](new 
File(s"spark-${conf.webserver.sparkVersion}.tgz"), new 
File(s"spark-runner-${conf.version}-all.jar"), new 
File(s"spark-runtime-${conf.version}.jar"))
-    case "yarn" => Array[File](new 
File(s"spark-runner-${conf.version}-all.jar"), new 
File(s"spark-runtime-${conf.version}.jar"), new 
File(s"executor-${conf.version}-all.jar"), new File(conf.spark.home))
-    case _ => Array[File]()
-  }
-
-
-  override def getEnvironmentVariables: util.Map[String, String] = conf.mode 
match {
-    case "mesos" => Map[String, String]("SPARK_HOME" -> 
s"spark-${conf.webserver.sparkVersion}", "SPARK_HOME_DOCKER" -> "/opt/spark/")
-    case "yarn" => Map[String, String]("SPARK_HOME" -> 
StringUtils.stripStart(conf.spark.home, "/"))
-    case _ => Map[String, String]()
-  }
-
-  override def getDriverConfiguration: DriverConfiguration = {
-    var cpu: Int = 0
-    if (sparkExecConfigurations.get("spark.yarn.am.cores").isDefined) {
-      cpu = sparkExecConfigurations("spark.yarn.am.cores").toString.toInt
-    } else if (sparkExecConfigurations.get("spark.driver.cores").isDefined) {
-      cpu = sparkExecConfigurations("spark.driver.cores").toString.toInt
-    } else if (conf.spark.opts.contains("yarn.am.cores")) {
-      cpu = conf.spark.opts("yarn.am.cores").toInt
-    } else if (conf.spark.opts.contains("driver.cores")) {
-      cpu = conf.spark.opts("driver.cores").toInt
-    } else if (conf.yarn.Worker.cores > 0) {
-      cpu = conf.yarn.Worker.cores
-    } else {
-      cpu = 1
-    }
-    var mem: Int = 0
-    if (sparkExecConfigurations.get("spark.yarn.am.memory").isDefined) {
-      mem = 
MemoryFormatParser.extractMegabytes(sparkExecConfigurations("spark.yarn.am.memory").toString)
-    } else if (sparkExecConfigurations.get("spark.driver.memeory").isDefined) {
-      mem = 
MemoryFormatParser.extractMegabytes(sparkExecConfigurations("spark.driver.memeory").toString)
-    } else if (conf.spark.opts.contains("yarn.am.memory")) {
-      mem = 
MemoryFormatParser.extractMegabytes(conf.spark.opts("yarn.am.memory"))
-    } else if (conf.spark.opts.contains("driver.memory")) {
-      mem = 
MemoryFormatParser.extractMegabytes(conf.spark.opts("driver.memory"))
-    } else if (conf.yarn.Worker.memoryMB > 0) {
-      mem = conf.yarn.Worker.memoryMB
-    } else if (conf.taskMem > 0) {
-      mem = conf.taskMem
-    } else {
-      mem = 1024
-    }
-
-    new DriverConfiguration(mem, cpu)
-  }
-
-  override def getRunnerProvider(runnerId: String): RunnerSetupProvider = {
-    runnerProviders(runnerId)
-  }
-
-  override def getConfigurationItems = Array("sparkConfiguration", 
"sparkExecutor")
-}
\ No newline at end of file
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements.  See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License.  You may obtain a copy of the License at
+// *
+// *      http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//package org.apache.amaterasu.frameworks.spark.dispatcher
+//
+//import java.io.File
+//import java.util
+//
+//import org.apache.amaterasu.common.configuration.ClusterConfig
+//import org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers._
+//import org.apache.amaterasu.leader.common.utilities.{DataLoader, 
MemoryFormatParser}
+//import org.apache.amaterasu.sdk.frameworks.configuration.DriverConfiguration
+//import org.apache.amaterasu.sdk.frameworks.{FrameworkSetupProvider, 
RunnerSetupProvider}
+//import org.apache.commons.lang.StringUtils
+//
+//import scala.collection.mutable
+//import collection.JavaConversions._
+//
+//class SparkSetupProvider extends FrameworkSetupProvider {
+//
+//  private var env: String = _
+//  private var conf: ClusterConfig = _
+//  private lazy val sparkExecConfigurations: mutable.Map[String, Any] = 
loadSparkConfig
+//
+//  private val runnerProviders: mutable.Map[String, RunnerSetupProvider] = 
mutable.Map[String, RunnerSetupProvider]()
+//
+//  private def loadSparkConfig: mutable.Map[String, Any] = {
+//
+//    val execData = DataLoader.getExecutorData(env, conf)
+//    val sparkExecConfiguration = execData.getConfigurations.get("spark")
+//    if (sparkExecConfiguration.isEmpty) {
+//      throw new Exception(s"Spark configuration files could not be loaded 
for the environment $env")
+//    }
+//    collection.mutable.Map(sparkExecConfiguration.toSeq: _*)
+//
+//  }
+//
+//  override def init(env: String, conf: ClusterConfig): Unit = {
+//    this.env = env
+//    this.conf = conf
+//
+////    this.sparkExecConfigurations = loadSparkConfig
+//    runnerProviders += ("scala" -> SparkScalaRunnerProvider(conf))
+//    runnerProviders += ("jar" -> SparkSubmitScalaRunnerProvider(conf))
+//    runnerProviders += ("pyspark" -> new PySparkRunnerProvider(env, conf))
+//
+//  }
+//
+//  override def getGroupIdentifier: String = "spark"
+//
+//  override def getGroupResources: Array[File] = conf.mode match {
+//    case "mesos" => Array[File](new 
File(s"spark-${conf.webserver.sparkVersion}.tgz"), new 
File(s"spark-runner-${conf.version}-all.jar"), new 
File(s"spark-runtime-${conf.version}.jar"))
+//    case "yarn" => Array[File](new 
File(s"spark-runner-${conf.version}-all.jar"), new 
File(s"spark-runtime-${conf.version}.jar"), new 
File(s"executor-${conf.version}-all.jar"), new File(conf.spark.home))
+//    case _ => Array[File]()
+//  }
+//
+//
+//  override def getEnvironmentVariables: util.Map[String, String] = conf.mode 
match {
+//    case "mesos" => Map[String, String]("SPARK_HOME" -> 
s"spark-${conf.webserver.sparkVersion}", "SPARK_HOME_DOCKER" -> "/opt/spark/")
+//    case "yarn" => Map[String, String]("SPARK_HOME" -> 
StringUtils.stripStart(conf.spark.home, "/"))
+//    case _ => Map[String, String]()
+//  }
+//
+//  override def getDriverConfiguration: DriverConfiguration = {
+//    var cpu: Int = 0
+//    if (sparkExecConfigurations.get("spark.yarn.am.cores").isDefined) {
+//      cpu = sparkExecConfigurations("spark.yarn.am.cores").toString.toInt
+//    } else if (sparkExecConfigurations.get("spark.driver.cores").isDefined) {
+//      cpu = sparkExecConfigurations("spark.driver.cores").toString.toInt
+//    } else if (conf.spark.opts.contains("yarn.am.cores")) {
+//      cpu = conf.spark.opts("yarn.am.cores").toInt
+//    } else if (conf.spark.opts.contains("driver.cores")) {
+//      cpu = conf.spark.opts("driver.cores").toInt
+//    } else if (conf.yarn.Worker.cores > 0) {
+//      cpu = conf.yarn.Worker.cores
+//    } else {
+//      cpu = 1
+//    }
+//    var mem: Int = 0
+//    if (sparkExecConfigurations.get("spark.yarn.am.memory").isDefined) {
+//      mem = 
MemoryFormatParser.extractMegabytes(sparkExecConfigurations("spark.yarn.am.memory").toString)
+//    } else if 
(sparkExecConfigurations.get("spark.driver.memeory").isDefined) {
+//      mem = 
MemoryFormatParser.extractMegabytes(sparkExecConfigurations("spark.driver.memeory").toString)
+//    } else if (conf.spark.opts.contains("yarn.am.memory")) {
+//      mem = 
MemoryFormatParser.extractMegabytes(conf.spark.opts("yarn.am.memory"))
+//    } else if (conf.spark.opts.contains("driver.memory")) {
+//      mem = 
MemoryFormatParser.extractMegabytes(conf.spark.opts("driver.memory"))
+//    } else if (conf.yarn.Worker.memoryMB > 0) {
+//      mem = conf.yarn.Worker.memoryMB
+//    } else if (conf.taskMem > 0) {
+//      mem = conf.taskMem
+//    } else {
+//      mem = 1024
+//    }
+//
+//    new DriverConfiguration(mem, cpu)
+//  }
+//
+//  override def getRunnerProvider(runnerId: String): RunnerSetupProvider = {
+//    runnerProviders(runnerId)
+//  }
+//
+//  override def getConfigurationItems = Array("sparkConfiguration", 
"sparkExecutor")
+//}
\ No newline at end of file
diff --git 
a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
 
b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
index ee20009..db1e3fc 100644
--- 
a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
+++ 
b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
@@ -1,41 +1,45 @@
-package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
-
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.dataobjects.ActionData
-import 
org.apache.amaterasu.frameworks.python.dispatcher.runners.providers.PythonRunnerProviderBase
-
-class PySparkRunnerProvider(val env: String, val conf: ClusterConfig) extends 
PythonRunnerProviderBase(env, conf) {
-
-  override def getCommand(jobId: String, actionData: ActionData, env: String, 
executorId: String, callbackAddress: String): String = {
-
-    val command = super.getCommand(jobId: String, actionData: ActionData, env: 
String, executorId: String, callbackAddress: String)
-    log.info(s"===> Cluster manager: ${conf.mode}")
-    command +
-      //s" $$SPARK_HOME/conf/spark-env.sh" +
-      // s" && env PYSPARK_PYTHON=$getVirtualPythonPath" +
-      //s" env PYSPARK_DRIVER_PYTHON=$getVirtualPythonPath" + d
-      s" && $$SPARK_HOME/bin/spark-submit --master yarn-client " +
-      s"--conf spark.pyspark.python=${conf.pythonPath} " +
-      s"--conf spark.pyspark.driver.python=$getVirtualPythonPath " +
-      s"--files $$SPARK_HOME/conf/hive-site.xml ${actionData.getSrc}"
-  }
-
-  override def getRunnerResources: Array[String] = {
-    var resources = super.getRunnerResources
-    resources = resources :+ s"amaterasu_pyspark-${conf.version}.zip"
-    log.info(s"PYSPARK RESOURCES ==> ${resources.toSet}")
-    resources
-  }
-
-
-  override def getHasExecutor: Boolean = false
-
-  override def getActionUserResources(jobId: String, actionData: ActionData): 
Array[String] = Array[String]()
-}
-
-object PySparkRunnerProvider {
-  def apply(env: String, conf: ClusterConfig): PySparkRunnerProvider = {
-    val result = new PySparkRunnerProvider(env, conf)
-    result
-  }
-}
\ No newline at end of file
+//package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
+//
+//import com.uchuhimo.konf.Config
+//import org.apache.amaterasu.common.configuration.ClusterConfig
+//import org.apache.amaterasu.common.dataobjects.ActionData
+//import 
org.apache.amaterasu.frameworks.python.dispatcher.runners.providers.PythonRunnerProviderBase
+//import org.apache.amaterasu.leader.common.configuration.Job
+//
+//class PySparkRunnerProvider(val env: String, val conf: ClusterConfig) 
extends PythonRunnerProviderBase(env, conf) {
+//
+//
+//
+//  override def getRunnerResources: Array[String] = {
+//    var resources = super.getRunnerResources
+//    resources = resources :+ s"amaterasu_pyspark-${conf.version}.zip"
+//    log.info(s"PYSPARK RESOURCES ==> ${resources.toSet}")
+//    resources
+//  }
+//
+//
+//  override def getHasExecutor: Boolean = false
+//
+//  override def getActionUserResources(jobId: String, actionData: 
ActionData): Array[String] = Array[String]()
+//
+//  override def getCommand(jobId: String, actionData: ActionData, env: 
Config, executorId: String, callbackAddress: String): String = {
+//
+//    val command = super.getCommand(jobId: String, actionData: ActionData, 
env: String, executorId: String, callbackAddress: String)
+//        log.info(s"===> Cluster manager: ${conf.mode}")
+//        command +
+//          //s" $$SPARK_HOME/conf/spark-env.sh" +
+//          // s" && env PYSPARK_PYTHON=$getVirtualPythonPath" +
+//          //s" env PYSPARK_DRIVER_PYTHON=$getVirtualPythonPath" + d
+//          s" && $$SPARK_HOME/bin/spark-submit --master ${env[Job.master]} " +
+//          s"--conf spark.pyspark.python=${conf.pythonPath} " +
+//          s"--conf spark.pyspark.driver.python=$getVirtualPythonPath " +
+//          s"--files $$SPARK_HOME/conf/hive-site.xml ${actionData.getSrc}"
+//  }
+//}
+//
+//object PySparkRunnerProvider {
+//  def apply(env: String, conf: ClusterConfig): PySparkRunnerProvider = {
+//    val result = new PySparkRunnerProvider(env, conf)
+//    result
+//  }
+//}
\ No newline at end of file
diff --git 
a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
 
b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
index 6cd63f5..40f9194 100644
--- 
a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
+++ 
b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
@@ -1,69 +1,69 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
-
-import java.net.URLEncoder
-
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.leader.common.utilities.DataLoader
-import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
-import org.apache.commons.lang.StringUtils
-import org.apache.hadoop.yarn.api.ApplicationConstants
-
-class SparkScalaRunnerProvider extends RunnerSetupProvider {
-
-  private var conf: ClusterConfig = _
-  private val libPath = System.getProperty("java.library.path")
-
-  override def getCommand(jobId: String, actionData: ActionData, env: String, 
executorId: String, callbackAddress: String): String = conf.mode match {
-    case "mesos" =>
-      s"env AMA_NODE=${sys.env("AMA_NODE")} env 
MESOS_NATIVE_JAVA_LIBRARY=${conf.mesos.libPath} env 
SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${conf.webserver.Port}/dist/spark-${conf.webserver.sparkVersion}.tgz
 " +
-      s"java -cp 
executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.webserver.sparkVersion}/jars/*
 " +
-      s"-Dscala.usejavacp=true -Djava.library.path=$libPath " +
-      s"org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor 
$jobId ${conf.master} ${actionData.getName}".stripMargin
-    case "yarn" =>
-      s"/bin/bash 
${StringUtils.stripStart(conf.spark.home,"/")}/conf/spark-env.sh && " +
-      s"java -cp 
${StringUtils.stripStart(conf.spark.home,"/")}/jars/*:executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:${StringUtils.stripStart(conf.spark.home,"/")}/conf/:${conf.yarn.hadoopHomeDir}/conf/
 " +
-      "-Xmx2G " +
-      "-Dscala.usejavacp=true " +
-      "-Dhdp.version=2.6.5.0-292 " +
-      "org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " +
-      s"'$jobId' '${conf.master}' '${actionData.getName}' 
'${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}' 
'${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}' 
'$executorId' '$callbackAddress' " +
-      s"1> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout " +
-      s"2> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr "
-    case _ => ""
-  }
-
-  override def getRunnerResources: Array[String] =
-    Array[String]()
-
-  override def getActionDependencies(jobId: String, actionData: ActionData): 
Array[String] =
-    Array[String]()
-
-  override def getHasExecutor: Boolean = true
-
-  override def getActionUserResources(jobId: String, actionData: ActionData): 
Array[String] = Array[String]()
-}
-
-object SparkScalaRunnerProvider {
-  def apply(conf: ClusterConfig): SparkScalaRunnerProvider = {
-    val result = new SparkScalaRunnerProvider
-    result.conf = conf
-    result
-  }
-}
\ No newline at end of file
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements.  See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License.  You may obtain a copy of the License at
+// *
+// *      http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
+//
+//import java.net.URLEncoder
+//
+//import org.apache.amaterasu.common.configuration.ClusterConfig
+//import org.apache.amaterasu.common.dataobjects.ActionData
+//import org.apache.amaterasu.leader.common.utilities.DataLoader
+//import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
+//import org.apache.commons.lang.StringUtils
+//import org.apache.hadoop.yarn.api.ApplicationConstants
+//
+//class SparkScalaRunnerProvider extends RunnerSetupProvider {
+//
+//  private var conf: ClusterConfig = _
+//  private val libPath = System.getProperty("java.library.path")
+//
+//  override def getCommand(jobId: String, actionData: ActionData, env: 
String, executorId: String, callbackAddress: String): String = conf.mode match {
+//    case "mesos" =>
+//      s"env AMA_NODE=${sys.env("AMA_NODE")} env 
MESOS_NATIVE_JAVA_LIBRARY=${conf.mesos.libPath} env 
SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${conf.webserver.Port}/dist/spark-${conf.webserver.sparkVersion}.tgz
 " +
+//      s"java -cp 
executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.webserver.sparkVersion}/jars/*
 " +
+//      s"-Dscala.usejavacp=true -Djava.library.path=$libPath " +
+//      s"org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor 
$jobId ${conf.master} ${actionData.getName}".stripMargin
+//    case "yarn" =>
+//      s"/bin/bash 
${StringUtils.stripStart(conf.spark.home,"/")}/conf/spark-env.sh && " +
+//      s"java -cp 
${StringUtils.stripStart(conf.spark.home,"/")}/jars/*:executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:${StringUtils.stripStart(conf.spark.home,"/")}/conf/:${conf.yarn.hadoopHomeDir}/conf/
 " +
+//      "-Xmx2G " +
+//      "-Dscala.usejavacp=true " +
+//      "-Dhdp.version=2.6.5.0-292 " +
+//      "org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher 
" +
+//      s"'$jobId' '${conf.master}' '${actionData.getName}' 
'${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}' 
'${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}' 
'$executorId' '$callbackAddress' " +
+//      s"1> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout " +
+//      s"2> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr "
+//    case _ => ""
+//  }
+//
+//  override def getRunnerResources: Array[String] =
+//    Array[String]()
+//
+//  override def getActionDependencies(jobId: String, actionData: ActionData): 
Array[String] =
+//    Array[String]()
+//
+//  override def getHasExecutor: Boolean = true
+//
+//  override def getActionUserResources(jobId: String, actionData: 
ActionData): Array[String] = Array[String]()
+//}
+//
+//object SparkScalaRunnerProvider {
+//  def apply(conf: ClusterConfig): SparkScalaRunnerProvider = {
+//    val result = new SparkScalaRunnerProvider
+//    result.conf = conf
+//    result
+//  }
+//}
\ No newline at end of file
diff --git 
a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala
 
b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala
index 8622800..b8e9677 100644
--- 
a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala
+++ 
b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala
@@ -1,50 +1,50 @@
-package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
-
-import java.io.File
-
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.common.utils.ArtifactUtil
-import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
-
-import scala.collection.JavaConverters._
-
-class SparkSubmitScalaRunnerProvider extends RunnerSetupProvider {
-
-  private var conf: ClusterConfig = _
-  val jarFile = new 
File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath)
-  val amaDist = new File(s"${new File(jarFile.getParent).getParent}/dist")
-  val amalocation = new File(s"${new File(jarFile.getParent).getParent}")
-
-  override def getCommand(jobId: String, actionData: ActionData, env: String, 
executorId: String, callbackAddress: String): String = {
-
-    val util = new ArtifactUtil(List(actionData.repo).asJava, jobId)
-    val classParam = if (actionData.getHasArtifact) s" --class 
${actionData.entryClass}" else ""
-    s"$$SPARK_HOME/bin/spark-submit $classParam 
${util.getLocalArtifacts(actionData.getArtifact).get(0).getName} --deploy-mode 
client --jars spark-runtime-${conf.version}.jar >&1"
-
-  }
-
-  override def getRunnerResources: Array[String] =
-    Array[String]()
-
-  override def getActionUserResources(jobId: String, actionData: ActionData): 
Array[String] =
-    Array[String]()
-
-
-  override def getActionDependencies(jobId: String, actionData: ActionData): 
Array[String] =
-    Array[String]()
-
-
-  override def getHasExecutor: Boolean = false
-
-
-}
-
-object SparkSubmitScalaRunnerProvider {
-  def apply(conf: ClusterConfig): SparkSubmitScalaRunnerProvider = {
-    val result = new SparkSubmitScalaRunnerProvider
-
-    result.conf = conf
-    result
-  }
-}
\ No newline at end of file
+//package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
+//
+//import java.io.File
+//
+//import org.apache.amaterasu.common.configuration.ClusterConfig
+//import org.apache.amaterasu.common.dataobjects.ActionData
+//import org.apache.amaterasu.common.utils.ArtifactUtil
+//import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
+//
+//import scala.collection.JavaConverters._
+//
+//class SparkSubmitScalaRunnerProvider extends RunnerSetupProvider {
+//
+//  private var conf: ClusterConfig = _
+//  val jarFile = new 
File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath)
+//  val amaDist = new File(s"${new File(jarFile.getParent).getParent}/dist")
+//  val amalocation = new File(s"${new File(jarFile.getParent).getParent}")
+//
+//  override def getCommand(jobId: String, actionData: ActionData, env: 
String, executorId: String, callbackAddress: String): String = {
+//
+//    val util = new ArtifactUtil(List(actionData.repo).asJava, jobId)
+//    val classParam = if (actionData.getHasArtifact) s" --class 
${actionData.entryClass}" else ""
+//    s"$$SPARK_HOME/bin/spark-submit $classParam 
${util.getLocalArtifacts(actionData.getArtifact).get(0).getName} --deploy-mode 
client --jars spark-runtime-${conf.version}.jar >&1"
+//
+//  }
+//
+//  override def getRunnerResources: Array[String] =
+//    Array[String]()
+//
+//  override def getActionUserResources(jobId: String, actionData: 
ActionData): Array[String] =
+//    Array[String]()
+//
+//
+//  override def getActionDependencies(jobId: String, actionData: ActionData): 
Array[String] =
+//    Array[String]()
+//
+//
+//  override def getHasExecutor: Boolean = false
+//
+//
+//}
+//
+//object SparkSubmitScalaRunnerProvider {
+//  def apply(conf: ClusterConfig): SparkSubmitScalaRunnerProvider = {
+//    val result = new SparkSubmitScalaRunnerProvider
+//
+//    result.conf = conf
+//    result
+//  }
+//}
\ No newline at end of file
diff --git 
a/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip
 
b/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip
index fa4e0eb..8d56b70 100644
Binary files 
a/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip
 and 
b/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip
 differ
diff --git 
a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworls/FrameworkProvidersFactory.kt
 
b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworls/FrameworkProvidersFactory.kt
index cf3e10d..78e06fa 100644
--- 
a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworls/FrameworkProvidersFactory.kt
+++ 
b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworls/FrameworkProvidersFactory.kt
@@ -32,7 +32,6 @@ class FrameworkProvidersFactory(val env: String, val config: 
ClusterConfig) : KL
 
         providers = runnerTypes.map  {
 
-
             val provider = it.newInstance()
 
             provider.init(env, config)
diff --git 
a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/DataLoader.kt
 
b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/DataLoader.kt
index 73c66fd..fa114b0 100644
--- 
a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/DataLoader.kt
+++ 
b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/DataLoader.kt
@@ -101,9 +101,19 @@ object DataLoader : KLogging() {
     fun getExecutorData(env: String, clusterConf: ClusterConfig): ExecData {
 
         // loading the job configuration
-        val envValue = File("repo/env/$env/job.yml").readText() //TODO: change 
this to YAML
-        val envData = ymlMapper.readValue<Environment>(envValue)
+        var envFile = File("repo/env/$env/job.yml")
+        val envValue = if (envFile.exists()) {
+            envFile.readText()
+        } else {
+            envFile = File("repo/env/$env/job.yaml")
+            if (envFile.exists()) {
+                envFile.readText()
+            } else {
+                ""
+            }
+        }
 
+        val envData = ymlMapper.readValue<Environment>(envValue)
         // loading all additional configurations
         val files = File("repo/env/$env/").listFiles().filter { it.isFile 
}.filter { it.name != "job.yml" }
         val config = files.map { yamlToMap(it) }.toMap()
diff --git 
a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
 
b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
index 861ca3f..075cfec 100644
--- 
a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
+++ 
b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
@@ -237,7 +237,9 @@ class ApplicationMaster : KLogging(), 
AMRMClientAsync.CallbackHandler {
                         val framework = 
frameworkFactory.getFramework(actionData.groupId)
                         val runnerProvider = 
framework.getRunnerProvider(actionData.typeId)
                         val ctx = 
Records.newRecord(ContainerLaunchContext::class.java)
-                        val commands: List<String> = 
listOf(runnerProvider.getCommand(jobManager.jobId, actionData, env, 
"${actionData.id}-${container.id.containerId}", address))
+
+                        val envConf = 
configManager.getActionConfiguration(actionData.name, actionData.config)
+                        val commands: List<String> = 
listOf(runnerProvider.getCommand(jobManager.jobId, actionData, envConf, 
"${actionData.id}-${container.id.containerId}", address))
 
                         notifier.info("container command 
${commands.joinToString(prefix = " ", postfix = " ")}")
                         ctx.commands = commands
diff --git 
a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
 
b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
index 19cb831..293b878 100755
--- 
a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
+++ 
b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
@@ -168,6 +168,7 @@ class JobScheduler extends AmaterasuScheduler {
             val envYaml = 
configManager.getActionConfigContent(actionData.getName, actionData.getConfig)
             writeConfigFile(envYaml, jobManager.getJobId, actionData.getName, 
"env.yaml")
 
+            val envConf = 
configManager.getActionConfiguration(actionData.getName, actionData.getConfig)
             val dataStores = DataLoader.getTaskData(actionData, env).getExports
             val writer = new StringWriter()
             yamlMapper.writeValue(writer, dataStores)
@@ -190,6 +191,7 @@ class JobScheduler extends AmaterasuScheduler {
             log.info(s">>>> Framework: ${actionData.getGroupId}")
             val frameworkProvider = 
frameworkFactory.providers(actionData.getGroupId)
             log.info(s">>>> Runner: ${actionData.getTypeId}")
+
             val runnerProvider = 
frameworkProvider.getRunnerProvider(actionData.getTypeId)
 
             // searching for an executor that already exist on the slave, if 
non exist
@@ -207,7 +209,7 @@ class JobScheduler extends AmaterasuScheduler {
             //                          if(!actionData.getSrc.isEmpty){
             //                            
copy(get(s"repo/src/${actionData.getSrc}"), 
get(s"dist/${jobManager.getJobId}/${actionData.getName}/${actionData.getSrc}"), 
REPLACE_EXISTING)
             //                          }
-            val commandStr = runnerProvider.getCommand(jobManager.getJobId, 
actionData, env, executorId, "")
+            val commandStr = runnerProvider.getCommand(jobManager.getJobId, 
actionData, envConf, executorId, "")
             log.info(s"===> Command: $commandStr")
             val command = CommandInfo
               .newBuilder
diff --git a/sdk/build.gradle b/sdk/build.gradle
index f299963..9335188 100644
--- a/sdk/build.gradle
+++ b/sdk/build.gradle
@@ -67,7 +67,10 @@ dependencies {
     testCompile group: 'junit', name: 'junit', version: '4.11'
     compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
     compile "org.jetbrains.kotlin:kotlin-reflect"
-
+    compile('com.uchuhimo:konf:0.11') {
+        exclude group: 'org.eclipse.jgit'
+    }
+    
     testCompile 'org.jetbrains.spek:spek-api:1.1.5'
     testCompile "org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version"
     testRuntime 'org.jetbrains.spek:spek-junit-platform-engine:1.1.5'
@@ -75,6 +78,7 @@ dependencies {
     // spek requires kotlin-reflect, can be omitted if already in the classpath
     testRuntimeOnly "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
 
+    
     testImplementation 'org.junit.platform:junit-platform-runner:1.0.0'
     testImplementation 'org.junit.platform:junit-platform-launcher:1.0.0'
     testImplementation 'org.junit.jupiter:junit-jupiter-engine:5.0.0'
diff --git 
a/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt
 
b/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt
index 499ce10..0c2baf1 100644
--- 
a/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt
+++ 
b/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt
@@ -19,16 +19,16 @@ package org.apache.amaterasu.sdk.frameworks
 import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.common.utils.ArtifactUtil
 import org.apache.amaterasu.common.utils.FileUtil
-import org.apache.amaterasu.common.logging.KLogging
 import org.apache.amaterasu.common.logging.Logging
+import com.uchuhimo.konf.Config
 
 abstract class RunnerSetupProvider : Logging() {
 
-    private val actionFiles = arrayOf("env.yaml", "runtime.yaml", 
"datastores.yaml", "datasets.yaml")
+    private val actionFiles = arrayOf("env.yaml", "runtime.yaml", 
"datasets.yaml")
 
     abstract val runnerResources: Array<String>
 
-    abstract fun getCommand(jobId: String, actionData: ActionData, env: 
String, executorId: String, callbackAddress: String): String
+    abstract fun getCommand(jobId: String, actionData: ActionData, env: 
Config, executorId: String, callbackAddress: String): String
 
     protected fun getDownloadableActionSrcPath(jobId: String, actionData: 
ActionData): String {
         return "$jobId/${actionData.name}/${actionData.src}"
diff --git a/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip 
b/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip
index b2d76c0..2b0e997 100644
Binary files a/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip and 
b/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip differ

Reply via email to