Repository: spark Updated Branches: refs/heads/branch-1.4 d4e31bfcd -> 226033cff
[SPARK-7373] [MESOS] Add docker support for launching drivers in mesos cluster mode. Using the existing docker support for mesos, this change also enables the mesos cluster mode scheduler to launch Spark drivers in docker images. This also allows the executors launched by the drivers to run in the same Docker image by passing the docker settings. Author: Timothy Chen <[email protected]> Closes #5917 from tnachen/spark_cluster_docker and squashes the following commits: 1e842f5 [Timothy Chen] Add docker support for launching drivers in mesos cluster mode. (cherry picked from commit 4eecf550aa7e4fb448baca82281bfd4e8bc4a778) Signed-off-by: Andrew Or <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/226033cf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/226033cf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/226033cf Branch: refs/heads/branch-1.4 Commit: 226033cfffa2f37ebaf8bc2c653f094e91ef0c9b Parents: d4e31bf Author: Timothy Chen <[email protected]> Authored: Thu May 7 12:23:16 2015 -0700 Committer: Andrew Or <[email protected]> Committed: Thu May 7 12:23:22 2015 -0700 ---------------------------------------------------------------------- .../cluster/mesos/MesosClusterScheduler.scala | 30 +++++++++++++++----- 1 file changed, 23 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/226033cf/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index 06f0e28..1067a7f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -370,16 +370,21 @@ private[spark] class MesosClusterScheduler( val executorOpts = desc.schedulerProperties.map { case (k, v) => s"-D$k=$v" }.mkString(" ") envBuilder.addVariables( Variable.newBuilder().setName("SPARK_EXECUTOR_OPTS").setValue(executorOpts)) - val cmdOptions = generateCmdOption(desc) + val cmdOptions = generateCmdOption(desc).mkString(" ") + val dockerDefined = desc.schedulerProperties.contains("spark.mesos.executor.docker.image") val executorUri = desc.schedulerProperties.get("spark.executor.uri") .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI")) val appArguments = desc.command.arguments.mkString(" ") - val cmd = if (executorUri.isDefined) { + val (executable, jar) = if (dockerDefined) { + // Application jar is automatically downloaded in the mounted sandbox by Mesos, + // and the path to the mounted volume is stored in $MESOS_SANDBOX env variable. + ("./bin/spark-submit", s"$$MESOS_SANDBOX/${desc.jarUrl.split("/").last}") + } else if (executorUri.isDefined) { builder.addUris(CommandInfo.URI.newBuilder().setValue(executorUri.get).build()) val folderBasename = executorUri.get.split('/').last.split('.').head val cmdExecutable = s"cd $folderBasename*; $prefixEnv bin/spark-submit" val cmdJar = s"../${desc.jarUrl.split("/").last}" - s"$cmdExecutable ${cmdOptions.mkString(" ")} $cmdJar $appArguments" + (cmdExecutable, cmdJar) } else { val executorSparkHome = desc.schedulerProperties.get("spark.mesos.executor.home") .orElse(conf.getOption("spark.home")) @@ -389,9 +394,9 @@ private[spark] class MesosClusterScheduler( } val cmdExecutable = new File(executorSparkHome, "./bin/spark-submit").getCanonicalPath val cmdJar = desc.jarUrl.split("/").last - s"$cmdExecutable ${cmdOptions.mkString(" ")} $cmdJar $appArguments" + (cmdExecutable, cmdJar) } - builder.setValue(cmd) + builder.setValue(s"$executable $cmdOptions $jar $appArguments") 
builder.setEnvironment(envBuilder.build()) builder.build() } @@ -458,9 +463,20 @@ private[spark] class MesosClusterScheduler( .setCommand(commandInfo) .addResources(cpuResource) .addResources(memResource) - .build() + submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image => + val container = taskInfo.getContainerBuilder() + val volumes = submission.schedulerProperties + .get("spark.mesos.executor.docker.volumes") + .map(MesosSchedulerBackendUtil.parseVolumesSpec) + val portmaps = submission.schedulerProperties + .get("spark.mesos.executor.docker.portmaps") + .map(MesosSchedulerBackendUtil.parsePortMappingsSpec) + MesosSchedulerBackendUtil.addDockerInfo( + container, image, volumes = volumes, portmaps = portmaps) + taskInfo.setContainer(container.build()) + } val queuedTasks = tasks.getOrElseUpdate(offer.offer.getId, new ArrayBuffer[TaskInfo]) - queuedTasks += taskInfo + queuedTasks += taskInfo.build() logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " + submission.submissionId) val newState = new MesosClusterSubmissionState(submission, taskId, offer.offer.getSlaveId, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
