Repository: spark
Updated Branches:
  refs/heads/master dd784a882 -> 978cd5f12
[SPARK-15271][MESOS] Allow force pulling executor docker images ## What changes were proposed in this pull request? Mesos agents by default will not pull docker images which are cached locally already. In order to run Spark executors from mutable tags like `:latest` this commit introduces a Spark setting `spark.mesos.executor.docker.forcePullImage`. Setting this flag to true will tell the Mesos agent to force pull the docker image (default is `false` which is consistent with the previous implementation and Mesos' default behaviour). ## How was this patch tested? I ran a sample application including this change on a Mesos cluster and verified the correct behaviour for both, with and without, force pulling the executor image. As expected the image is being force pulled if the flag is set. Author: Philipp Hoffmann <m...@philipphoffmann.de> Closes #13051 from philipphoffmann/force-pull-image. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/978cd5f1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/978cd5f1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/978cd5f1 Branch: refs/heads/master Commit: 978cd5f125eb5a410bad2e60bf8385b11cf1b978 Parents: dd784a8 Author: Philipp Hoffmann <m...@philipphoffmann.de> Authored: Mon Jul 25 20:14:47 2016 +0100 Committer: Sean Owen <so...@cloudera.com> Committed: Mon Jul 25 20:14:47 2016 +0100 ---------------------------------------------------------------------- .../cluster/mesos/MesosClusterScheduler.scala | 14 ++--- .../MesosCoarseGrainedSchedulerBackend.scala | 7 ++- .../MesosFineGrainedSchedulerBackend.scala | 7 ++- .../mesos/MesosSchedulerBackendUtil.scala | 20 ++++--- ...esosCoarseGrainedSchedulerBackendSuite.scala | 63 ++++++++++++++++++++ .../MesosFineGrainedSchedulerBackendSuite.scala | 2 + dev/deps/spark-deps-hadoop-2.2 | 2 +- dev/deps/spark-deps-hadoop-2.3 | 2 +- dev/deps/spark-deps-hadoop-2.4 | 2 +- dev/deps/spark-deps-hadoop-2.6 | 2 +- dev/deps/spark-deps-hadoop-2.7 | 2 +- docs/_config.yml | 2 +- docs/running-on-mesos.md | 12 ++++ pom.xml | 2 +- 14 files changed, 110 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/978cd5f1/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index 39b0f4d..1e9644d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -537,16 +537,10 @@ private[spark] class MesosClusterScheduler( .addAllResources(memResourcesToUse.asJava) offer.resources = finalResources.asJava submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image => - val container = taskInfo.getContainerBuilder() - val volumes = submission.schedulerProperties - .get("spark.mesos.executor.docker.volumes") - .map(MesosSchedulerBackendUtil.parseVolumesSpec) - val portmaps = submission.schedulerProperties - .get("spark.mesos.executor.docker.portmaps") - .map(MesosSchedulerBackendUtil.parsePortMappingsSpec) - MesosSchedulerBackendUtil.addDockerInfo( - container, image, volumes = volumes, portmaps = portmaps) - 
taskInfo.setContainer(container.build()) + MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo( + image, + submission.schedulerProperties.get, + taskInfo.getContainerBuilder()) } val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo]) queuedTasks += taskInfo.build() http://git-wip-us.apache.org/repos/asf/spark/blob/978cd5f1/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 99e6d39..52993ca 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -408,8 +408,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( .addAllResources(memResourcesToUse.asJava) sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => - MesosSchedulerBackendUtil - .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder) + MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo( + image, + sc.conf.getOption, + taskBuilder.getContainerBuilder + ) } tasks(offer.getId) ::= taskBuilder.build() http://git-wip-us.apache.org/repos/asf/spark/blob/978cd5f1/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala index e08dc3b..8d4fc9e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala @@ -151,8 +151,11 @@ private[spark] class MesosFineGrainedSchedulerBackend( .setData(ByteString.copyFrom(createExecArg())) sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => - MesosSchedulerBackendUtil - .setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder()) + MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo( + image, + sc.conf.getOption, + executorInfo.getContainerBuilder() + ) } (executorInfo.build(), resourcesAfterMem.asJava) http://git-wip-us.apache.org/repos/asf/spark/blob/978cd5f1/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala index 05b2b08..aa669f0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala @@ -20,7 +20,6 @@ package org.apache.spark.scheduler.cluster.mesos import org.apache.mesos.Protos.{ContainerInfo, Volume} import org.apache.mesos.Protos.ContainerInfo.DockerInfo -import org.apache.spark.SparkConf import 
org.apache.spark.internal.Logging /** @@ -105,11 +104,14 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { def addDockerInfo( container: ContainerInfo.Builder, image: String, + forcePullImage: Boolean = false, volumes: Option[List[Volume]] = None, network: Option[ContainerInfo.DockerInfo.Network] = None, portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = { - val docker = ContainerInfo.DockerInfo.newBuilder().setImage(image) + val docker = ContainerInfo.DockerInfo.newBuilder() + .setImage(image) + .setForcePullImage(forcePullImage) network.foreach(docker.setNetwork) portmaps.foreach(_.foreach(docker.addPortMappings)) @@ -119,21 +121,23 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { } /** - * Setup a docker containerizer + * Setup a docker containerizer from MesosDriverDescription scheduler properties */ def setupContainerBuilderDockerInfo( imageName: String, - conf: SparkConf, + conf: String => Option[String], builder: ContainerInfo.Builder): Unit = { - val volumes = conf - .getOption("spark.mesos.executor.docker.volumes") + val forcePullImage = conf("spark.mesos.executor.docker.forcePullImage") + .exists(_.equals("true")) + val volumes = conf("spark.mesos.executor.docker.volumes") .map(parseVolumesSpec) - val portmaps = conf - .getOption("spark.mesos.executor.docker.portmaps") + val portmaps = conf("spark.mesos.executor.docker.portmaps") .map(parsePortMappingsSpec) + addDockerInfo( builder, imageName, + forcePullImage = forcePullImage, volumes = volumes, portmaps = portmaps) logDebug("setupContainerDockerInfo: using docker image: " + imageName) http://git-wip-us.apache.org/repos/asf/spark/blob/978cd5f1/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index c2779d7..d3a85c6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -252,6 +252,69 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite backend.start() } + test("docker settings are reflected in created tasks") { + setBackend(Map( + "spark.mesos.executor.docker.image" -> "some_image", + "spark.mesos.executor.docker.forcePullImage" -> "true", + "spark.mesos.executor.docker.volumes" -> "/host_vol:/container_vol:ro", + "spark.mesos.executor.docker.portmaps" -> "8080:80:tcp" + )) + + val (mem, cpu) = (backend.executorMemory(sc), 4) + + val offer1 = createOffer("o1", "s1", mem, cpu) + backend.resourceOffers(driver, List(offer1).asJava) + + val launchedTasks = verifyTaskLaunched("o1").asScala + assert(launchedTasks.size == 1) + + val containerInfo = launchedTasks.head.getContainer + assert(containerInfo.getType == ContainerInfo.Type.DOCKER) + + val volumes = containerInfo.getVolumesList.asScala + assert(volumes.size == 1) + + val volume = volumes.head + assert(volume.getHostPath == "/host_vol") + assert(volume.getContainerPath == "/container_vol") + assert(volume.getMode == Volume.Mode.RO) + + val dockerInfo = containerInfo.getDocker + + assert(dockerInfo.getImage == "some_image") + assert(dockerInfo.getForcePullImage) + + val 
portMappings = dockerInfo.getPortMappingsList.asScala + assert(portMappings.size == 1) + + val portMapping = portMappings.head + assert(portMapping.getHostPort == 8080) + assert(portMapping.getContainerPort == 80) + assert(portMapping.getProtocol == "tcp") + } + + test("force-pull-image option is disabled by default") { + setBackend(Map( + "spark.mesos.executor.docker.image" -> "some_image" + )) + + val (mem, cpu) = (backend.executorMemory(sc), 4) + + val offer1 = createOffer("o1", "s1", mem, cpu) + backend.resourceOffers(driver, List(offer1).asJava) + + val launchedTasks = verifyTaskLaunched("o1").asScala + assert(launchedTasks.size == 1) + + val containerInfo = launchedTasks.head.getContainer + assert(containerInfo.getType == ContainerInfo.Type.DOCKER) + + val dockerInfo = containerInfo.getDocker + + assert(dockerInfo.getImage == "some_image") + assert(!dockerInfo.getForcePullImage) + } + private def verifyDeclinedOffer(driver: SchedulerDriver, offerId: OfferID, filter: Boolean = false): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/978cd5f1/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala index 41693b1..fcf39f6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala @@ -150,6 +150,7 @@ class MesosFineGrainedSchedulerBackendSuite val conf = new SparkConf() .set("spark.mesos.executor.docker.image", "spark/mock") + .set("spark.mesos.executor.docker.forcePullImage", "true") .set("spark.mesos.executor.docker.volumes", "/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro") .set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp") @@ -169,6 +170,7 @@ class MesosFineGrainedSchedulerBackendSuite val (execInfo, _) = backend.createExecutorInfo( Arrays.asList(backend.createResource("cpus", 4)), "mockExecutor") assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock")) + assert(execInfo.getContainer.getDocker.getForcePullImage.equals(true)) val portmaps = execInfo.getContainer.getDocker.getPortMappingsList assert(portmaps.get(0).getHostPort.equals(80)) assert(portmaps.get(0).getContainerPort.equals(8080)) http://git-wip-us.apache.org/repos/asf/spark/blob/978cd5f1/dev/deps/spark-deps-hadoop-2.2 ---------------------------------------------------------------------- diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2 index 5d536b7..ff15873 100644 --- a/dev/deps/spark-deps-hadoop-2.2 +++ b/dev/deps/spark-deps-hadoop-2.2 @@ -116,7 +116,7 @@ libfb303-0.9.2.jar libthrift-0.9.2.jar log4j-1.2.17.jar lz4-1.3.0.jar -mesos-0.21.1-shaded-protobuf.jar +mesos-0.22.2-shaded-protobuf.jar metrics-core-3.1.2.jar metrics-graphite-3.1.2.jar metrics-json-3.1.2.jar http://git-wip-us.apache.org/repos/asf/spark/blob/978cd5f1/dev/deps/spark-deps-hadoop-2.3 ---------------------------------------------------------------------- diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3 index d16f42a..2b5764f 100644 --- a/dev/deps/spark-deps-hadoop-2.3 +++ b/dev/deps/spark-deps-hadoop-2.3 @@ -122,7 +122,7 @@ libthrift-0.9.2.jar log4j-1.2.17.jar lz4-1.3.0.jar 
mail-1.4.7.jar -mesos-0.21.1-shaded-protobuf.jar +mesos-0.22.2-shaded-protobuf.jar metrics-core-3.1.2.jar metrics-graphite-3.1.2.jar metrics-json-3.1.2.jar http://git-wip-us.apache.org/repos/asf/spark/blob/978cd5f1/dev/deps/spark-deps-hadoop-2.4 ---------------------------------------------------------------------- diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4 index 2e261cb..3f53fdb 100644 --- a/dev/deps/spark-deps-hadoop-2.4 +++ b/dev/deps/spark-deps-hadoop-2.4 @@ -122,7 +122,7 @@ libthrift-0.9.2.jar log4j-1.2.17.jar lz4-1.3.0.jar mail-1.4.7.jar -mesos-0.21.1-shaded-protobuf.jar +mesos-0.22.2-shaded-protobuf.jar metrics-core-3.1.2.jar metrics-graphite-3.1.2.jar metrics-json-3.1.2.jar http://git-wip-us.apache.org/repos/asf/spark/blob/978cd5f1/dev/deps/spark-deps-hadoop-2.6 ---------------------------------------------------------------------- diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6 index 67f38f4..d3a7ab8 100644 --- a/dev/deps/spark-deps-hadoop-2.6 +++ b/dev/deps/spark-deps-hadoop-2.6 @@ -130,7 +130,7 @@ libthrift-0.9.2.jar log4j-1.2.17.jar lz4-1.3.0.jar mail-1.4.7.jar -mesos-0.21.1-shaded-protobuf.jar +mesos-0.22.2-shaded-protobuf.jar metrics-core-3.1.2.jar metrics-graphite-3.1.2.jar metrics-json-3.1.2.jar http://git-wip-us.apache.org/repos/asf/spark/blob/978cd5f1/dev/deps/spark-deps-hadoop-2.7 ---------------------------------------------------------------------- diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7 index 0758396..05317a0 100644 --- a/dev/deps/spark-deps-hadoop-2.7 +++ b/dev/deps/spark-deps-hadoop-2.7 @@ -131,7 +131,7 @@ libthrift-0.9.2.jar log4j-1.2.17.jar lz4-1.3.0.jar mail-1.4.7.jar -mesos-0.21.1-shaded-protobuf.jar +mesos-0.22.2-shaded-protobuf.jar metrics-core-3.1.2.jar metrics-graphite-3.1.2.jar metrics-json-3.1.2.jar http://git-wip-us.apache.org/repos/asf/spark/blob/978cd5f1/docs/_config.yml ---------------------------------------------------------------------- diff --git a/docs/_config.yml b/docs/_config.yml index be3d8a2..bbb576e 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -18,6 +18,6 @@ SPARK_VERSION: 2.1.0-SNAPSHOT SPARK_VERSION_SHORT: 2.1.0 SCALA_BINARY_VERSION: "2.11" SCALA_VERSION: "2.11.7" -MESOS_VERSION: 0.21.0 +MESOS_VERSION: 0.22.0 SPARK_ISSUE_TRACKER_URL: https://issues.apache.org/jira/browse/SPARK SPARK_GITHUB_URL: https://github.com/apache/spark http://git-wip-us.apache.org/repos/asf/spark/blob/978cd5f1/docs/running-on-mesos.md ---------------------------------------------------------------------- diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 10dc9ce..ce888b5 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -260,6 +260,10 @@ have Mesos download Spark via the usual methods. Requires Mesos version 0.20.1 or later. +Note that by default Mesos agents will not pull the image if it already exists on the agent. If you use mutable image +tags you can set `spark.mesos.executor.docker.forcePullImage` to `true` in order to force the agent to always pull the +image before running the executor. Force pulling images is only available in Mesos version 0.22 and above. 
+ # Running Alongside Hadoop You can run Spark and Mesos alongside your existing Hadoop cluster by just launching them as a @@ -335,6 +339,14 @@ See the [configuration page](configuration.html) for information on Spark config </td> </tr> <tr> + <td><code>spark.mesos.executor.docker.forcePullImage</code></td> + <td>false</td> + <td> + Force Mesos agents to pull the image specified in <code>spark.mesos.executor.docker.image</code>. + By default Mesos agents will not pull images they already have cached. + </td> +</tr> +<tr> <td><code>spark.mesos.executor.docker.volumes</code></td> <td>(none)</td> <td> http://git-wip-us.apache.org/repos/asf/spark/blob/978cd5f1/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d064cb5..b69292d 100644 --- a/pom.xml +++ b/pom.xml @@ -119,7 +119,7 @@ <java.version>1.7</java.version> <maven.version>3.3.9</maven.version> <sbt.project.name>spark</sbt.project.name> - <mesos.version>0.21.1</mesos.version> + <mesos.version>0.22.2</mesos.version> <mesos.classifier>shaded-protobuf</mesos.classifier> <slf4j.version>1.7.16</slf4j.version> <log4j.version>1.2.17</log4j.version>
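A minimal usage sketch of the new setting, assuming a hypothetical application and image (the master URL, application name, and image tag below are placeholders, not values from this patch):

    import org.apache.spark.SparkConf

    // Ask the Mesos agent to always pull the executor image before launching;
    // the default is "false", which keeps the previous behaviour and Mesos'
    // own default of reusing a locally cached image.
    val conf = new SparkConf()
      .setMaster("mesos://zk://zk-host:2181/mesos")                             // placeholder master URL
      .setAppName("force-pull-example")                                         // placeholder app name
      .set("spark.mesos.executor.docker.image", "myrepo/spark-executor:latest") // placeholder image tag
      .set("spark.mesos.executor.docker.forcePullImage", "true")

The same flag can also be passed at submit time with `--conf spark.mesos.executor.docker.forcePullImage=true`.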
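The refactored `MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo` now takes a lookup function of type `String => Option[String]` instead of a `SparkConf`, which is what lets the cluster, coarse-grained and fine-grained schedulers in this patch share the same helper (`sc.conf.getOption` and `submission.schedulerProperties.get` both fit that type). A simplified sketch of the idea, not the helper itself:

    import org.apache.spark.SparkConf

    // Both a SparkConf and a driver submission's property map can be handed
    // to the same lookup type, String => Option[String].
    def readForcePull(conf: String => Option[String]): Boolean =
      conf("spark.mesos.executor.docker.forcePullImage").exists(_.equals("true"))

    val sparkConf = new SparkConf()
      .set("spark.mesos.executor.docker.forcePullImage", "true")
    val schedulerProperties = Map("spark.mesos.executor.docker.forcePullImage" -> "true")

    readForcePull(sparkConf.getOption)      // true -- the path taken by the scheduler backends
    readForcePull(schedulerProperties.get)  // true -- the path taken by MesosClusterScheduler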