roadan closed pull request #20: PySpark fixes for YARN and Mesos
URL: https://github.com/apache/incubator-amaterasu/pull/20
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
index 7c9f924..3661b48 100755
--- a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
+++ b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
@@ -71,8 +71,8 @@ class ClusterConfig extends Logging {
     var memoryMB: Int = 1024

     def load(props: Properties): Unit = {
-      if (props.containsKey("yarn.master.cores")) this.cores = props.getProperty("yarn.master.cores").asInstanceOf[Int]
-      if (props.containsKey("yarn.master.memoryMB")) this.memoryMB = props.getProperty("yarn.master.memoryMB").asInstanceOf[Int]
+      if (props.containsKey("yarn.master.cores")) this.cores = props.getProperty("yarn.master.cores").toInt
+      if (props.containsKey("yarn.master.memoryMB")) this.memoryMB = props.getProperty("yarn.master.memoryMB").toInt
     }
   }

@@ -83,8 +83,8 @@ class ClusterConfig extends Logging {
     var memoryMB: Int = 1024

     def load(props: Properties): Unit = {
-      if (props.containsKey("yarn.worker.cores")) this.cores = props.getProperty("yarn.worker.cores").asInstanceOf[Int]
-      if (props.containsKey("yarn.worker.memoryMB")) this.memoryMB = props.getProperty("yarn.worker.memoryMB").asInstanceOf[Int]
+      if (props.containsKey("yarn.worker.cores")) this.cores = props.getProperty("yarn.worker.cores").toInt
+      if (props.containsKey("yarn.worker.memoryMB")) this.memoryMB = props.getProperty("yarn.worker.memoryMB").toInt
     }
   }

@@ -133,9 +133,9 @@ class ClusterConfig extends Logging {

     def load(props: Properties): Unit = {
-      if (props.containsKey("jobs.cpu")) cpus = props.getProperty("jobs.cpu").asInstanceOf[Double]
-      if (props.containsKey("jobs.mem")) mem = props.getProperty("jobs.mem").asInstanceOf[Long]
-      if (props.containsKey("jobs.repoSize")) repoSize = props.getProperty("jobs.repoSize").asInstanceOf[Long]
+      if (props.containsKey("jobs.cpu")) cpus = props.getProperty("jobs.cpu").toDouble
+      if (props.containsKey("jobs.mem")) mem = props.getProperty("jobs.mem").toLong
+      if (props.containsKey("jobs.repoSize")) repoSize = props.getProperty("jobs.repoSize").toLong

       Tasks.load(props)
     }

@@ -148,9 +148,9 @@ class ClusterConfig extends Logging {

       def load(props: Properties): Unit = {
-        if (props.containsKey("jobs.tasks.attempts")) attempts = props.getProperty("jobs.tasks.attempts").asInstanceOf[Int]
-        if (props.containsKey("jobs.tasks.cpus")) attempts = props.getProperty("jobs.tasks.cpus").asInstanceOf[Int]
-        if (props.containsKey("jobs.tasks.mem")) attempts = props.getProperty("jobs.tasks.mem").asInstanceOf[Int]
+        if (props.containsKey("jobs.tasks.attempts")) attempts = props.getProperty("jobs.tasks.attempts").toInt
+        if (props.containsKey("jobs.tasks.cpus")) cpus = props.getProperty("jobs.tasks.cpus").toInt
+        if (props.containsKey("jobs.tasks.mem")) mem = props.getProperty("jobs.tasks.mem").toInt
       }
     }

@@ -209,7 +209,7 @@ class ClusterConfig extends Logging {
     if (props.containsKey("timeout")) timeout = props.getProperty("timeout").asInstanceOf[Double]
     if (props.containsKey("mode")) mode = props.getProperty("mode")
     if (props.containsKey("workingFolder")) workingFolder = props.getProperty("workingFolder", s"/user/$user")
-
+    if (props.containsKey("pysparkPath")) pysparkPath = props.getProperty("pysparkPath")
     // TODO: rethink this
     Jar = this.getClass.getProtectionDomain.getCodeSource.getLocation.toURI.getPath
     JarName = Paths.get(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath).getFileName.toString
diff --git a/executor/src/main/resources/spark_intp.py b/executor/src/main/resources/spark_intp.py
index 0faae2b..f3c9fc0 100755
--- a/executor/src/main/resources/spark_intp.py
+++ b/executor/src/main/resources/spark_intp.py
@@ -21,6 +21,7 @@
 import os
 import sys
 import zipimport
+sys.path.append(os.getcwd())
 from runtime import AmaContext, Environment

 # os.chdir(os.getcwd() + '/build/resources/test/')
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala
index 94b8056..79fe18a 100755
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala
@@ -16,19 +16,21 @@
  */
 package org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark

-import java.io.{File, PrintWriter, StringWriter}
+import java.io.File
 import java.util

 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.execution.dependencies.{PythonDependencies, PythonPackage}
+import org.apache.amaterasu.common.execution.dependencies.PythonDependencies
 import org.apache.amaterasu.common.logging.Logging
 import org.apache.amaterasu.common.runtime.Environment
 import org.apache.amaterasu.sdk.AmaterasuRunner
 import org.apache.spark.SparkEnv
 import org.apache.spark.sql.SparkSession

-import scala.sys.process.Process
+import scala.sys.process.{Process, ProcessLogger}
+
+

 class PySparkRunner extends AmaterasuRunner with Logging {

@@ -69,6 +71,15 @@ class PySparkRunner extends AmaterasuRunner with Logging {

 object PySparkRunner {

+  def collectCondaPackages(): String = {
+    val pkgsDirs = new File("./miniconda/pkgs")
+    (pkgsDirs.listFiles.filter {
+      file => file.getName.endsWith(".tar.bz2")
+    }.map {
+      file => s"./miniconda/pkgs/${file.getName}"
+    }.toBuffer ++ "dist/codegen.py").mkString(",")
+  }
+
   def apply(env: Environment,
             jobId: String,
             notifier: Notifier,
@@ -77,14 +88,13 @@ object PySparkRunner {
             pyDeps: PythonDependencies,
             config: ClusterConfig): PySparkRunner = {

+    val shellLoger = ProcessLogger(
+      (o: String) => println(o),
+      (e: String) => println(e)
+    )
+
     //TODO: can we make this less ugly?
-    var pysparkPython = "/usr/bin/python"
-    if (pyDeps != null &&
-      pyDeps.packages.nonEmpty) {
-      loadPythonDependencies(pyDeps, notifier)
-      pysparkPython = "miniconda/bin/python"
-    }

     val result = new PySparkRunner

@@ -98,87 +108,44 @@ object PySparkRunner {
       intpPath = s"spark_intp.py"
     }
     var pysparkPath = ""
-    if (env.configuration.contains("pysparkPath")) {
-      pysparkPath = env.configuration("pysparkPath")
-    } else {
-      pysparkPath = s"${config.spark.home}/bin/spark-submit"
-    }
-    val proc = Process(Seq(pysparkPath, intpPath, port.toString), None,
-      "PYTHONPATH" -> pypath,
-      "PYSPARK_PYTHON" -> pysparkPython,
-      "PYTHONHASHSEED" -> 0.toString) #> System.out
+    var condaPkgs = ""
+    if (pyDeps != null)
+      condaPkgs = collectCondaPackages()
+    var sparkCmd: Seq[String] = Seq()
+    config.mode match {
+      case "yarn" =>
+        pysparkPath = s"spark/bin/spark-submit"
+        sparkCmd = Seq(pysparkPath, "--py-files", condaPkgs, "--master", "yarn", intpPath, port.toString)
+        val proc = Process(sparkCmd, None,
+          "PYTHONPATH" -> pypath,
+          "PYTHONHASHSEED" -> 0.toString)
+
+        proc.run(shellLoger)
+      case "mesos" =>
+        pysparkPath = config.pysparkPath
+        if (pysparkPath.endsWith("spark-submit")) {
+          sparkCmd = Seq(pysparkPath, "--py-files", condaPkgs, intpPath, port.toString)
+        }
+        else {
+          sparkCmd = Seq(pysparkPath, intpPath, port.toString)
+        }
+        var pysparkPython = "/usr/bin/python"
-    proc.run()
+        if (pyDeps != null &&
+          pyDeps.packages.nonEmpty) {
+          pysparkPython = "./miniconda/bin/python"
+        }
+        val proc = Process(sparkCmd, None,
+          "PYTHONPATH" -> pypath,
+          "PYSPARK_PYTHON" -> pysparkPython,
+          "PYTHONHASHSEED" -> 0.toString)
+        proc.run(shellLoger)
+    }

     result.notifier = notifier
     result
   }

-  /**
-    * This installs the required python dependencies.
-    * We basically need 2 packages to make pyspark work with customer's scripts:
-    * 1. py4j - supplied by spark, for communication between Python and Java runtimes.
-    * 2. codegen - for dynamically parsing and converting customer's scripts into executable Python code objects.
-    * Currently we only know how to install packages using Anaconda, the reason is 3rd party OS libraries, e.g. libevent
-    * Anaconda has the capabilities to automatically resolve the required OS libraries per Python package and install them.
-    *
-    * TODO - figure out if we really want to support pip directly, or if Anaconda is enough.
-    * @param deps All of the customer's supplied Python dependencies, this currently comes from job-repo/deps/python.yml
-    * @param notifier
-    */
-  private def loadPythonDependencies(deps: PythonDependencies, notifier: Notifier): Unit = {
-    notifier.info("loading anaconda evn")
-    installAnacondaOnNode()
-    val codegenPackage = PythonPackage("codegen", channel = Option("auto"))
-    installAnacondaPackage(codegenPackage)
-    try {
-      deps.packages.foreach(pack => {
-        pack.index.getOrElse("anaconda").toLowerCase match {
-          case "anaconda" => installAnacondaPackage(pack)
-          // case "pypi" => installPyPiPackage(pack) TODO: See if we can support this
-        }
-      })
-    }
-    catch {
-
-      case rte: RuntimeException =>
-        val sw = new StringWriter
-        rte.printStackTrace(new PrintWriter(sw))
-        notifier.error("", s"Failed to activate environment (runtime) - cause: ${rte.getCause}, message: ${rte.getMessage}, Stack: \n${sw.toString}")
-      case e: Exception =>
-        val sw = new StringWriter
-        e.printStackTrace(new PrintWriter(sw))
-        notifier.error("", s"Failed to activate environment (other) - type: ${e.getClass.getName}, cause: ${e.getCause}, message: ${e.getMessage}, Stack: \n${sw.toString}")
-    }
-  }
-
-
-  /**
-    * Installs one python package using Anaconda.
-    * Anaconda works with multiple channels, or better called, repositories.
-    * Normally, if a channel isn't specified, Anaconda will fetch the package from the default conda channel.
-    * The reason we need to use channels, is that sometimes the required package doesn't exist on the default channel.
-    * @param pythonPackage This comes from parsing the python.yml dep file.
-    */
-  private def installAnacondaPackage(pythonPackage: PythonPackage): Unit = {
-    val channel = pythonPackage.channel.getOrElse("anaconda")
-    if (channel == "anaconda") {
-      Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y ${pythonPackage.packageId}")
-    } else {
-      Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.packageId}")
-    }
-  }
-
-  /**
-    * Installs Anaconda and then links it with the local spark that was installed on the executor.
-    */
-  private def installAnacondaOnNode(): Unit = {
-    Seq("bash", "-c", "sh Miniconda2-latest-Linux-x86_64.sh -b -p $PWD/miniconda")
-    Seq("bash", "-c", "$PWD/miniconda/bin/python -m conda install -y conda-build")
-    Seq("bash", "-c", "ln -s $PWD/spark-2.2.1-bin-hadoop2.7/python/pyspark $PWD/miniconda/pkgs/pyspark")
-  }
-
-
 }
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
index ff56d8c..ba7ff03 100644
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
@@ -47,6 +47,7 @@ class SparkRunnersProvider extends RunnersProvider with Logging {
   )
   private var conf: Option[Map[String, Any]] = _
   private var executorEnv: Option[Map[String, Any]] = _
+  private var clusterConfig: ClusterConfig = _

   override def init(execData: ExecData,
                     jobId: String,
@@ -60,7 +61,7 @@ class SparkRunnersProvider extends RunnersProvider with Logging {
       (o: String) => log.info(o),
       (e: String) => log.error("", e)
     )
-
+    clusterConfig = config
     var jars = Seq.empty[String]

     if (execData.deps != null) {
@@ -83,9 +84,15 @@ class SparkRunnersProvider extends RunnersProvider with Logging {
     sparkScalaRunner.initializeAmaContext(execData.env)

     runners.put(sparkScalaRunner.getIdentifier, sparkScalaRunner)
-
+    var pypath = ""
     // TODO: get rid of hard-coded version
-    lazy val pySparkRunner = PySparkRunner(execData.env, jobId, notifier, spark, s"${config.spark.home}/python:${config.spark.home}/python/pyspark:${config.spark.home}/python/pyspark/build:${config.spark.home}/python/pyspark/lib/py4j-0.10.4-src.zip", execData.pyDeps, config)
+    config.mode match {
+      case "yarn" =>
+        pypath = s"$$PYTHONPATH:$$SPARK_HOME/python:$$SPARK_HOME/python/build:${config.spark.home}/python:${config.spark.home}/python/pyspark:${config.spark.home}/python/pyspark/build:${config.spark.home}/python/pyspark/lib/py4j-0.10.4-src.zip:${new File(".").getAbsolutePath}"
+      case "mesos" =>
+        pypath = s"${new File(".").getAbsolutePath}/miniconda/pkgs:${new File(".").getAbsolutePath}"
+    }
+    lazy val pySparkRunner = PySparkRunner(execData.env, jobId, notifier, spark, pypath, execData.pyDeps, config)
     runners.put(pySparkRunner.getIdentifier, pySparkRunner)

     lazy val sparkSqlRunner = SparkSqlRunner(execData.env, jobId, notifier, spark)
@@ -95,17 +102,22 @@ class SparkRunnersProvider extends RunnersProvider with Logging {
   private def installAnacondaPackage(pythonPackage: PythonPackage): Unit = {
     val channel = pythonPackage.channel.getOrElse("anaconda")
     if (channel == "anaconda") {
-      Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y ${pythonPackage.packageId}") ! shellLoger
+      Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y ${pythonPackage.packageId}") ! shellLoger
     } else {
-      Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.packageId}") ! shellLoger
+      Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.packageId}") ! shellLoger
     }
   }

   private def installAnacondaOnNode(): Unit = {
     // TODO: get rid of hard-coded version
-    Seq("bash", "-c", "sh Miniconda2-latest-Linux-x86_64.sh -b -p $PWD/miniconda") ! shellLoger
-    Seq("bash", "-c", "$PWD/miniconda/bin/python -m conda install -y conda-build") ! shellLoger
-    Seq("bash", "-c", "ln -s $PWD/spark-2.2.1-bin-hadoop2.7/python/pyspark $PWD/miniconda/pkgs/pyspark") ! shellLoger
+
+    this.clusterConfig.mode match {
+      case "yarn" => Seq("sh", "-c", "export HOME=$PWD && ./miniconda.sh -b -p miniconda") ! shellLoger
+      case "mesos" => Seq("sh", "Miniconda2-latest-Linux-x86_64.sh", "-b", "-p", "miniconda") ! shellLoger
+    }
+
+    Seq("bash", "-c", "export HOME=$PWD && ./miniconda/bin/python -m conda install -y conda-build") ! shellLoger
+    Seq("bash", "-c", "ln -s spark/python/pyspark miniconda/pkgs/pyspark") ! shellLoger
   }

   private def loadPythonDependencies(deps: PythonDependencies, notifier: Notifier): Unit = {
diff --git a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
index 0bf7337..f2c2afa 100644
--- a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
+++ b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
@@ -152,9 +152,9 @@ object SparkRunnerHelper extends Logging {
       .set("spark.history.kerberos.principal", "none")

       .set("spark.master", master)
-      .set("spark.executor.instances", "1") // TODO: change this
+      .set("spark.executor.instances", config.spark.opts.getOrElse("executor.instances", "1"))
       .set("spark.yarn.jars", s"spark/jars/*")
-      .set("spark.executor.memory", "1g")
+      .set("spark.executor.memory", config.spark.opts.getOrElse("executor.memory", "1g"))
       .set("spark.dynamicAllocation.enabled", "false")
       .set("spark.eventLog.enabled", "false")
       .set("spark.history.fs.logDirectory", "hdfs:///spark2-history/")
diff --git a/executor/src/test/resources/amaterasu.properties b/executor/src/test/resources/amaterasu.properties
index 19cb189..d402fed 100755
--- a/executor/src/test/resources/amaterasu.properties
+++ b/executor/src/test/resources/amaterasu.properties
@@ -6,3 +6,4 @@ mode=mesos
 webserver.port=8000
 webserver.root=dist
 spark.version=2.1.1-bin-hadoop2.7
+pysparkPath = /usr/bin/python
diff --git a/executor/src/test/resources/spark_intp.py b/executor/src/test/resources/spark_intp.py
index a427e92..fd8dc0e 100755
--- a/executor/src/test/resources/spark_intp.py
+++ b/executor/src/test/resources/spark_intp.py
@@ -31,6 +31,7 @@ zip.extractall()

 sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/pyspark')
 sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/py4j')
+sys.path.append(os.getcwd())

 # py4j_path = 'spark-2.2.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip'
 # py4j_importer = zipimport.zipimporter(py4j_path)
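A note on the ClusterConfig changes above: java.util.Properties stores values as strings, so props.getProperty(...).asInstanceOf[Int] compiles but fails at runtime with a ClassCastException, while .toInt actually parses the value (the same hunk also fixes the copy-paste bug where jobs.tasks.cpus and jobs.tasks.mem were assigned to attempts). A minimal, self-contained sketch, not taken from the PR (the object name is illustrative only), showing the difference:

import java.util.Properties

object PropsParsingSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("yarn.master.cores", "2")

    // toInt parses the string value "2" into the Int 2.
    val parsed: Int = props.getProperty("yarn.master.cores").toInt
    println(s"parsed with toInt: $parsed")

    // asInstanceOf[Int] only casts; the stored value is a String, not a boxed
    // Integer, so this compiles but throws ClassCastException at runtime.
    try {
      val cast: Int = props.getProperty("yarn.master.cores").asInstanceOf[Int]
      println(s"cast with asInstanceOf: $cast")
    } catch {
      case e: ClassCastException => println(s"asInstanceOf failed: $e")
    }
  }
}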
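A note on the switch from proc.run() to proc.run(shellLoger): scala.sys.process lets a ProcessLogger receive the child process's stdout and stderr line by line through callbacks, instead of relying on stream redirection such as #> System.out, which is how the PR surfaces spark-submit output in the executor logs. A small sketch of the same pattern, using echo purely as a stand-in for the spark-submit command:

import scala.sys.process.{Process, ProcessLogger}

object ProcessLoggerSketch {
  def main(args: Array[String]): Unit = {
    // Line-by-line callbacks for the child's stdout and stderr.
    val shellLogger = ProcessLogger(
      (out: String) => println(s"child stdout: $out"),
      (err: String) => println(s"child stderr: $err")
    )

    // run() starts the process asynchronously and returns a handle.
    val handle = Process(Seq("echo", "hello from the child")).run(shellLogger)

    // exitValue() blocks until the child terminates.
    println(s"child exited with code ${handle.exitValue()}")
  }
}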
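A note on collectCondaPackages and --py-files: in YARN mode the PR gathers the conda package archives (*.tar.bz2) under ./miniconda/pkgs, plus dist/codegen.py, into a comma-separated list handed to spark-submit via --py-files so the Python dependencies ship with the job. A rough standalone sketch of that idea (paths mirror the PR; the Option wrapper around listFiles, which returns null for a missing directory, is an addition of this sketch and not part of the PR's code):

import java.io.File

object CondaPackagesSketch {
  // Build the comma-separated archive list passed to spark-submit --py-files.
  def collectCondaPackages(pkgsDir: File): String = {
    val archives = Option(pkgsDir.listFiles)   // listFiles is null when the directory is missing
      .getOrElse(Array.empty[File])
      .filter(_.getName.endsWith(".tar.bz2"))
      .map(f => s"${pkgsDir.getPath}/${f.getName}")
    (archives :+ "dist/codegen.py").mkString(",")
  }

  def main(args: Array[String]): Unit =
    println(collectCondaPackages(new File("./miniconda/pkgs")))
}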