Repository: spark
Updated Branches:
  refs/heads/master bfda53f63 -> eca58755f

[SPARK-16927][SPARK-16923] Override task properties at dispatcher.

## What changes were proposed in this pull request?

- enable setting default properties for all jobs submitted through the dispatcher [SPARK-16927]
- remove duplication of conf vars on cluster submitted jobs [SPARK-16923] (this is a small fix, so I'm including it in the same PR)

## How was this patch tested?

mesos/spark integration test suite

manual testing

Author: Timothy Chen <[email protected]>

Closes #14511 from mgummelt/override-props.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eca58755
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eca58755
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eca58755

Branch: refs/heads/master
Commit: eca58755fbbc11937b335ad953a3caff89b818e6
Parents: bfda53f
Author: Timothy Chen <[email protected]>
Authored: Wed Aug 10 10:11:03 2016 +0100
Committer: Sean Owen <[email protected]>
Committed: Wed Aug 10 10:11:03 2016 +0100

----------------------------------------------------------------------
 .../cluster/mesos/MesosClusterScheduler.scala | 44 ++++++++++----------
 docs/running-on-mesos.md                      | 11 +++++
 2 files changed, 33 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/eca58755/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 2189fca..bb6f6b3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -363,26 +363,21 @@ private[spark] class MesosClusterScheduler(
       .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
   }
 
-  private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
-    m.updated(k, f(m.getOrElse(k, default)))
-  }
-
   private def getDriverFrameworkID(desc: MesosDriverDescription): String = {
     s"${frameworkId}-${desc.submissionId}"
   }
 
-  private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
-    val env = {
-      val executorOpts = desc.conf.getAll.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
-      val executorEnv = Map("SPARK_EXECUTOR_OPTS" -> executorOpts)
-      val driverEnv = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.")
+  private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
+    m.updated(k, f(m.getOrElse(k, default)))
+  }
 
-      var commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
-        v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}"
-      )
+  private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
+    // TODO(mgummelt): Don't do this here.  This should be passed as a --conf
+    val commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
+      v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}"
+    )
 
-      driverEnv ++ executorEnv ++ commandEnv
-    }
+    val env = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.") ++ commandEnv
 
     val envBuilder = Environment.newBuilder()
     env.foreach { case (k, v) =>
@@ -457,12 +452,6 @@ private[spark] class MesosClusterScheduler(
       "--driver-cores", desc.cores.toString,
       "--driver-memory", s"${desc.mem}M")
 
-    val replicatedOptionsBlacklist = Set(
-      "spark.jars", // Avoids duplicate classes in classpath
-      "spark.submit.deployMode", // this would be set to `cluster`, but we need client
-      "spark.master" // this contains the address of the dispatcher, not master
-    )
-
     // Assume empty main class means we're running python
     if (!desc.command.mainClass.equals("")) {
       options ++= Seq("--class", desc.command.mainClass)
@@ -480,9 +469,20 @@ private[spark] class MesosClusterScheduler(
         .mkString(",")
       options ++= Seq("--py-files", formattedFiles)
     }
-    desc.conf.getAll
+
+    // --conf
+    val replicatedOptionsBlacklist = Set(
+      "spark.jars", // Avoids duplicate classes in classpath
+      "spark.submit.deployMode", // this would be set to `cluster`, but we need client
+      "spark.master" // this contains the address of the dispatcher, not master
+    )
+    val defaultConf = conf.getAllWithPrefix("spark.mesos.dispatcher.driverDefault.").toMap
+    val driverConf = desc.conf.getAll
       .filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) }
-      .foreach { case (key, value) => options ++= Seq("--conf", s"$key=${shellEscape(value)}") }
+      .toMap
+    (defaultConf ++ driverConf).foreach { case (key, value) =>
+      options ++= Seq("--conf", s"$key=${shellEscape(value)}") }
+
     options
   }
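
As an aside on the first hunk: the relocated adjust helper is a small map-update
combinator, used here to append the driver's framework id to SPARK_SUBMIT_OPTS.
Below is a standalone sketch of its behavior, not part of the patch; it uses an
immutable Map in place of collection.Map, and the object name and the framework
id value (fw-driver-001) are made up for illustration.

object AdjustSketch {
  // Update key `k` by applying `f` to its current value, falling back
  // to `default` when the key is absent (same body as the helper above).
  def adjust[A, B](m: Map[A, B], k: A, default: B)(f: B => B): Map[A, B] =
    m.updated(k, f(m.getOrElse(k, default)))

  def main(args: Array[String]): Unit = {
    val env = Map("SPARK_SUBMIT_OPTS" -> "-Xmx1g")
    // Mirrors how getDriverEnvironment appends the driver's framework id.
    val out = adjust(env, "SPARK_SUBMIT_OPTS", "")(
      v => s"$v -Dspark.mesos.driver.frameworkId=fw-driver-001")
    println(out("SPARK_SUBMIT_OPTS"))
    // prints: -Xmx1g -Dspark.mesos.driver.frameworkId=fw-driver-001
  }
}

Because adjust goes through getOrElse with a default, it works whether or not
the key is already present, which is why the old var-based version could be
replaced with a single val.
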

http://git-wip-us.apache.org/repos/asf/spark/blob/eca58755/docs/running-on-mesos.md
----------------------------------------------------------------------
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 613da68..a6ce34c 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -467,6 +467,17 @@ See the [configuration page](configuration.html) for information on Spark config
     Set the Spark Mesos dispatcher webui_url for interacting with the framework.
     If unset it will point to Spark's internal web UI.
   </td>
+</tr>
+<tr>
+  <td><code>spark.mesos.dispatcher.driverDefault.[PropertyName]</code></td>
+  <td><code>(none)</code></td>
+  <td>
+    Set default properties for drivers submitted through the
+    dispatcher. For example,
+    spark.mesos.dispatcher.driverDefault.spark.executor.memory=32g
+    causes the executors of all drivers submitted in cluster mode
+    to run in 32g containers.
+  </td>
 </tr>
 <tr>
   <td><code>spark.mesos.dispatcher.historyServer.url</code></td>
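
One nuance the table entry does not spell out: driverDefault properties are
defaults only. A property the driver submits itself still wins, because the
driver's conf is merged on the right-hand side of ++ in the scheduler hunk
above. Here is a self-contained sketch of that precedence, not part of the
patch; it is plain Scala with no Spark dependency, and the object name,
mergedConf, and the example values are illustrative.

object DriverDefaultPrecedenceSketch {
  private val Prefix = "spark.mesos.dispatcher.driverDefault."

  // Dispatcher-wide defaults are applied first, then the driver's own
  // conf, so a per-driver setting wins on any key collision.
  def mergedConf(
      dispatcherConf: Map[String, String],
      driverConf: Map[String, String]): Map[String, String] = {
    val defaults = dispatcherConf.collect {
      case (k, v) if k.startsWith(Prefix) => k.stripPrefix(Prefix) -> v
    }
    defaults ++ driverConf // right-hand operand wins in Map.++
  }

  def main(args: Array[String]): Unit = {
    val dispatcher = Map(Prefix + "spark.executor.memory" -> "32g")
    val driver = Map("spark.executor.memory" -> "8g")
    println(mergedConf(dispatcher, driver))
    // prints: Map(spark.executor.memory -> 8g)
  }
}

Each merged key/value pair then becomes a --conf key=value argument to the
driver's spark-submit invocation, as the foreach in the scheduler hunk shows.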
