Repository: spark Updated Branches: refs/heads/master 86430cc4e -> d89bfc923
[SPARK-18232][MESOS] Support CNI ## What changes were proposed in this pull request? Adds support for CNI-isolated containers ## How was this patch tested? I launched SparkPi both with and without `spark.mesos.network.name`, and verified the job completed successfully. Author: Michael Gummelt <mgumm...@mesosphere.io> Closes #15740 from mgummelt/spark-342-cni. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d89bfc92 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d89bfc92 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d89bfc92 Branch: refs/heads/master Commit: d89bfc92302424406847ac7a9cfca714e6b742fc Parents: 86430cc Author: Michael Gummelt <mgumm...@mesosphere.io> Authored: Mon Nov 14 23:46:54 2016 -0800 Committer: Reynold Xin <r...@databricks.com> Committed: Mon Nov 14 23:46:54 2016 -0800 ---------------------------------------------------------------------- docs/running-on-mesos.md | 27 +++-- .../cluster/mesos/MesosClusterScheduler.scala | 8 +- .../MesosCoarseGrainedSchedulerBackend.scala | 23 ++-- .../MesosFineGrainedSchedulerBackend.scala | 9 +- .../mesos/MesosSchedulerBackendUtil.scala | 120 +++++++++---------- .../mesos/MesosClusterSchedulerSuite.scala | 26 ++++ ...esosCoarseGrainedSchedulerBackendSuite.scala | 19 ++- 7 files changed, 131 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d89bfc92/docs/running-on-mesos.md ---------------------------------------------------------------------- diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 923d8db..8d5ad12 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -368,17 +368,6 @@ See the [configuration page](configuration.html) for information on Spark config </td> </tr> <tr> - <td><code>spark.mesos.executor.docker.portmaps</code></td> - <td>(none)</td> - <td> - Set the list of incoming 
ports exposed by the Docker image, which was set using - <code>spark.mesos.executor.docker.image</code>. The format of this property is a comma-separated list of - mappings which take the form: - - <pre>host_port:container_port[:tcp|:udp]</pre> - </td> -</tr> -<tr> <td><code>spark.mesos.executor.home</code></td> <td>driver side <code>SPARK_HOME</code></td> <td> @@ -505,12 +494,26 @@ See the [configuration page](configuration.html) for information on Spark config Set the maximum number of GPU resources to acquire for this job. Note that executors will still launch when no GPU resources are found since this configuration is just an upper limit and not a guaranteed amount. </td> + </tr> +<tr> + <td><code>spark.mesos.network.name</code></td> + <td><code>(none)</code></td> + <td> + Attach containers to the given named network. If this job is + launched in cluster mode, also launch the driver in the given named + network. See + <a href="http://mesos.apache.org/documentation/latest/cni/">the Mesos CNI docs</a> + for more details. 
+ </td> </tr> <tr> <td><code>spark.mesos.fetcherCache.enable</code></td> <td><code>false</code></td> <td> - If set to `true`, all URIs (example: `spark.executor.uri`, `spark.mesos.uris`) will be cached by the [Mesos fetcher cache](http://mesos.apache.org/documentation/latest/fetcher/) + If set to `true`, all URIs (example: `spark.executor.uri`, + `spark.mesos.uris`) will be cached by the <a + href="http://mesos.apache.org/documentation/latest/fetcher/">Mesos + Fetcher Cache</a> </td> </tr> </table> http://git-wip-us.apache.org/repos/asf/spark/blob/d89bfc92/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---------------------------------------------------------------------- diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index 8db1d12..f384290 100644 --- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -531,13 +531,7 @@ private[spark] class MesosClusterScheduler( .setCommand(buildDriverCommand(desc)) .addAllResources(cpuResourcesToUse.asJava) .addAllResources(memResourcesToUse.asJava) - - desc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => - MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(image, - desc.conf, - taskInfo.getContainerBuilder) - } - + taskInfo.setContainer(MesosSchedulerBackendUtil.containerInfo(desc.conf)) taskInfo.build } http://git-wip-us.apache.org/repos/asf/spark/blob/d89bfc92/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala 
b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 842c05e..3258b09 100644 --- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -213,7 +213,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( .format(prefixEnv, runScript) + s" --driver-url $driverURL" + s" --executor-id $taskId" + - s" --hostname ${offer.getHostname}" + + s" --hostname ${executorHostname(offer)}" + s" --cores $numCores" + s" --app-id $appId") } else { @@ -225,7 +225,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" + s" --driver-url $driverURL" + s" --executor-id $taskId" + - s" --hostname ${offer.getHostname}" + + s" --hostname ${executorHostname(offer)}" + s" --cores $numCores" + s" --app-id $appId") command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get).setCache(useFetcherCache)) @@ -418,16 +418,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( .setSlaveId(offer.getSlaveId) .setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId)) .setName("Task " + taskId) - taskBuilder.addAllResources(resourcesToUse.asJava) - - sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => - MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo( - image, - sc.conf, - taskBuilder.getContainerBuilder - ) - } + taskBuilder.setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf)) tasks(offer.getId) ::= taskBuilder.build() remainingResources(offerId) = resourcesLeft.asJava @@ -658,6 +650,15 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( private def numExecutors(): Int = { slaves.values.map(_.taskIDs.size).sum } + + private def executorHostname(offer: Offer): String = { + if 
(sc.conf.getOption("spark.mesos.network.name").isDefined) { + // The agent's IP is not visible in a CNI container, so we bind to 0.0.0.0 + "0.0.0.0" + } else { + offer.getHostname + } + } } private class Slave(val hostname: String) { http://git-wip-us.apache.org/repos/asf/spark/blob/d89bfc92/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala index c1aa001..779ffb5 100644 --- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala +++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala @@ -155,14 +155,7 @@ private[spark] class MesosFineGrainedSchedulerBackend( .setCommand(command) .setData(ByteString.copyFrom(createExecArg())) - sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => - MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo( - image, - sc.conf, - executorInfo.getContainerBuilder() - ) - } - + executorInfo.setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf)) (executorInfo.build(), resourcesAfterMem.asJava) } http://git-wip-us.apache.org/repos/asf/spark/blob/d89bfc92/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala ---------------------------------------------------------------------- diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala index 3fe0674..a2adb22 100644 --- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala +++ 
b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala @@ -17,8 +17,8 @@ package org.apache.spark.scheduler.cluster.mesos -import org.apache.mesos.Protos.{ContainerInfo, Image, Volume} -import org.apache.mesos.Protos.ContainerInfo.DockerInfo +import org.apache.mesos.Protos.{ContainerInfo, Image, NetworkInfo, Volume} +import org.apache.mesos.Protos.ContainerInfo.{DockerInfo, MesosInfo} import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging @@ -99,67 +99,67 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { .toList } - /** - * Construct a DockerInfo structure and insert it into a ContainerInfo - */ - def addDockerInfo( - container: ContainerInfo.Builder, - image: String, - containerizer: String, - forcePullImage: Boolean = false, - volumes: Option[List[Volume]] = None, - portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = { - - containerizer match { - case "docker" => - container.setType(ContainerInfo.Type.DOCKER) - val docker = ContainerInfo.DockerInfo.newBuilder() - .setImage(image) - .setForcePullImage(forcePullImage) - // TODO (mgummelt): Remove this. Portmaps have no effect, - // as we don't support bridge networking. 
- portmaps.foreach(_.foreach(docker.addPortMappings)) - container.setDocker(docker) - case "mesos" => - container.setType(ContainerInfo.Type.MESOS) - val imageProto = Image.newBuilder() - .setType(Image.Type.DOCKER) - .setDocker(Image.Docker.newBuilder().setName(image)) - .setCached(!forcePullImage) - container.setMesos(ContainerInfo.MesosInfo.newBuilder().setImage(imageProto)) - case _ => - throw new SparkException( - "spark.mesos.containerizer must be one of {\"docker\", \"mesos\"}") + def containerInfo(conf: SparkConf): ContainerInfo = { + val containerType = if (conf.contains("spark.mesos.executor.docker.image") && + conf.get("spark.mesos.containerizer", "docker") == "docker") { + ContainerInfo.Type.DOCKER + } else { + ContainerInfo.Type.MESOS } - volumes.foreach(_.foreach(container.addVolumes)) + val containerInfo = ContainerInfo.newBuilder() + .setType(containerType) + + conf.getOption("spark.mesos.executor.docker.image").map { image => + val forcePullImage = conf + .getOption("spark.mesos.executor.docker.forcePullImage") + .exists(_.equals("true")) + + val portMaps = conf + .getOption("spark.mesos.executor.docker.portmaps") + .map(parsePortMappingsSpec) + .getOrElse(List.empty) + + if (containerType == ContainerInfo.Type.DOCKER) { + containerInfo.setDocker(dockerInfo(image, forcePullImage, portMaps)) + } else { + containerInfo.setMesos(mesosInfo(image, forcePullImage)) + } + + val volumes = conf + .getOption("spark.mesos.executor.docker.volumes") + .map(parseVolumesSpec) + + volumes.foreach(_.foreach(containerInfo.addVolumes(_))) + } + + conf.getOption("spark.mesos.network.name").map { name => + val info = NetworkInfo.newBuilder().setName(name).build() + containerInfo.addNetworkInfos(info) + } + + containerInfo.build() } - /** - * Setup a docker containerizer from MesosDriverDescription scheduler properties - */ - def setupContainerBuilderDockerInfo( - imageName: String, - conf: SparkConf, - builder: ContainerInfo.Builder): Unit = { - val forcePullImage = 
conf - .getOption("spark.mesos.executor.docker.forcePullImage") - .exists(_.equals("true")) - val volumes = conf - .getOption("spark.mesos.executor.docker.volumes") - .map(parseVolumesSpec) - val portmaps = conf - .getOption("spark.mesos.executor.docker.portmaps") - .map(parsePortMappingsSpec) - - val containerizer = conf.get("spark.mesos.containerizer", "docker") - addDockerInfo( - builder, - imageName, - containerizer, - forcePullImage = forcePullImage, - volumes = volumes, - portmaps = portmaps) - logDebug("setupContainerDockerInfo: using docker image: " + imageName) + private def dockerInfo( + image: String, + forcePullImage: Boolean, + portMaps: List[ContainerInfo.DockerInfo.PortMapping]): DockerInfo = { + val dockerBuilder = ContainerInfo.DockerInfo.newBuilder() + .setImage(image) + .setForcePullImage(forcePullImage) + portMaps.foreach(dockerBuilder.addPortMappings(_)) + + dockerBuilder.build + } + + private def mesosInfo(image: String, forcePullImage: Boolean): MesosInfo = { + val imageProto = Image.newBuilder() + .setType(Image.Type.DOCKER) + .setDocker(Image.Docker.newBuilder().setName(image)) + .setCached(!forcePullImage) + ContainerInfo.MesosInfo.newBuilder() + .setImage(imageProto) + .build } } http://git-wip-us.apache.org/repos/asf/spark/blob/d89bfc92/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala ---------------------------------------------------------------------- diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala index 87d9080..74e5ce2 100644 --- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala +++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala @@ -210,4 +210,30 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi 
(v.getName, v.getValue)).toMap assert(env.getOrElse("TEST_ENV", null) == "TEST_VAL") } + + test("supports spark.mesos.network.name") { + setScheduler() + + val mem = 1000 + val cpu = 1 + + val response = scheduler.submitDriver( + new MesosDriverDescription("d1", "jar", mem, cpu, true, + command, + Map("spark.mesos.executor.home" -> "test", + "spark.app.name" -> "test", + "spark.mesos.network.name" -> "test-network-name"), + "s1", + new Date())) + + assert(response.success) + + val offer = Utils.createOffer("o1", "s1", mem, cpu) + scheduler.resourceOffers(driver, List(offer).asJava) + + val launchedTasks = Utils.verifyTaskLaunched(driver, "o1") + val networkInfos = launchedTasks.head.getContainer.getNetworkInfosList + assert(networkInfos.size == 1) + assert(networkInfos.get(0).getName == "test-network-name") + } } http://git-wip-us.apache.org/repos/asf/spark/blob/d89bfc92/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala ---------------------------------------------------------------------- diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index f73638f..a674da4 100644 --- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -388,9 +388,6 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite val dockerInfo = containerInfo.getDocker - assert(dockerInfo.getImage == "some_image") - assert(dockerInfo.getForcePullImage) - val portMappings = dockerInfo.getPortMappingsList.asScala assert(portMappings.size == 1) @@ -491,6 +488,22 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite assert(!uris.asScala.head.getCache) } + test("mesos 
supports spark.mesos.network.name") { + setBackend(Map( + "spark.mesos.network.name" -> "test-network-name" + )) + + val (mem, cpu) = (backend.executorMemory(sc), 4) + + val offer1 = createOffer("o1", "s1", mem, cpu) + backend.resourceOffers(driver, List(offer1).asJava) + + val launchedTasks = verifyTaskLaunched(driver, "o1") + val networkInfos = launchedTasks.head.getContainer.getNetworkInfosList + assert(networkInfos.size == 1) + assert(networkInfos.get(0).getName == "test-network-name") + } + private case class Resources(mem: Int, cpus: Int, gpus: Int = 0) private def verifyDeclinedOffer(driver: SchedulerDriver, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org