Repository: spark Updated Branches: refs/heads/master 21a7bfd5c -> b3f9dbf48
[SPARK-19606][MESOS] Support constraints in spark-dispatcher ## What changes were proposed in this pull request? As discussed in SPARK-19606, the addition of a new config property named "spark.mesos.driver.constraints" for constraining drivers running on a Mesos cluster. ## How was this patch tested? Corresponding unit test added; also tested locally on a Mesos cluster. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Paul Mackles <pmack...@adobe.com> Closes #19543 from pmackles/SPARK-19606. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b3f9dbf4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b3f9dbf4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b3f9dbf4 Branch: refs/heads/master Commit: b3f9dbf48ec0938ff5c98833bb6b6855c620ef57 Parents: 21a7bfd Author: Paul Mackles <pmack...@adobe.com> Authored: Sun Nov 12 11:21:23 2017 -0800 Committer: Felix Cheung <felixche...@apache.org> Committed: Sun Nov 12 11:21:23 2017 -0800 ---------------------------------------------------------------------- docs/running-on-mesos.md | 17 ++++++- .../org/apache/spark/deploy/mesos/config.scala | 7 +++ .../cluster/mesos/MesosClusterScheduler.scala | 15 ++++--- .../mesos/MesosClusterSchedulerSuite.scala | 47 ++++++++++++++++++++ .../spark/scheduler/cluster/mesos/Utils.scala | 31 ++++++++----- 5 files changed, 100 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/b3f9dbf4/docs/running-on-mesos.md ---------------------------------------------------------------------- diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 7a443ff..19ec7c1 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -263,7 +263,10 @@ resource offers will be accepted. 
conf.set("spark.mesos.constraints", "os:centos7;us-east-1:false") {% endhighlight %} -For example, Let's say `spark.mesos.constraints` is set to `os:centos7;us-east-1:false`, then the resource offers will be checked to see if they meet both these constraints and only then will be accepted to start new executors. +For example, Let's say `spark.mesos.constraints` is set to `os:centos7;us-east-1:false`, then the resource offers will +be checked to see if they meet both these constraints and only then will be accepted to start new executors. + +To constrain where driver tasks are run, use `spark.mesos.driver.constraints` # Mesos Docker Support @@ -447,7 +450,9 @@ See the [configuration page](configuration.html) for information on Spark config <td><code>spark.mesos.constraints</code></td> <td>(none)</td> <td> - Attribute based constraints on mesos resource offers. By default, all resource offers will be accepted. Refer to <a href="http://mesos.apache.org/documentation/attributes-resources/">Mesos Attributes & Resources</a> for more information on attributes. + Attribute based constraints on mesos resource offers. By default, all resource offers will be accepted. This setting + applies only to executors. Refer to <a href="http://mesos.apache.org/documentation/attributes-resources/">Mesos + Attributes & Resources</a> for more information on attributes. <ul> <li>Scalar constraints are matched with "less than equal" semantics i.e. value in the constraint must be less than or equal to the value in the resource offer.</li> <li>Range constraints are matched with "contains" semantics i.e. value in the constraint must be within the resource offer's value.</li> @@ -458,6 +463,14 @@ See the [configuration page](configuration.html) for information on Spark config </td> </tr> <tr> + <td><code>spark.mesos.driver.constraints</code></td> + <td>(none)</td> + <td> + Same as <code>spark.mesos.constraints</code> except applied to drivers when launched through the dispatcher. 
By default, + all offers with sufficient resources will be accepted. + </td> +</tr> +<tr> <td><code>spark.mesos.containerizer</code></td> <td><code>docker</code></td> <td> http://git-wip-us.apache.org/repos/asf/spark/blob/b3f9dbf4/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala index 821534e..d134847 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala @@ -122,4 +122,11 @@ package object config { "Example: key1:val1,key2:val2") .stringConf .createOptional + + private[spark] val DRIVER_CONSTRAINTS = + ConfigBuilder("spark.mesos.driver.constraints") + .doc("Attribute based constraints on mesos resource offers. Applied by the dispatcher " + + "when launching drivers. 
Default is to accept all offers with sufficient resources.") + .stringConf + .createWithDefault("") } http://git-wip-us.apache.org/repos/asf/spark/blob/b3f9dbf4/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index de846c8..c41283e 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -556,9 +556,10 @@ private[spark] class MesosClusterScheduler( private class ResourceOffer( val offer: Offer, - var remainingResources: JList[Resource]) { + var remainingResources: JList[Resource], + var attributes: JList[Attribute]) { override def toString(): String = { - s"Offer id: ${offer.getId}, resources: ${remainingResources}" + s"Offer id: ${offer.getId}, resources: ${remainingResources}, attributes: ${attributes}" } } @@ -601,10 +602,14 @@ private[spark] class MesosClusterScheduler( for (submission <- candidates) { val driverCpu = submission.cores val driverMem = submission.mem - logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem") + val driverConstraints = + parseConstraintString(submission.conf.get(config.DRIVER_CONSTRAINTS)) + logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem, " + + s"driverConstraints: $driverConstraints") val offerOption = currentOffers.find { offer => getResource(offer.remainingResources, "cpus") >= driverCpu && - getResource(offer.remainingResources, "mem") >= driverMem + getResource(offer.remainingResources, "mem") >= driverMem && + 
matchesAttributeRequirements(driverConstraints, toAttributeMap(offer.attributes)) } if (offerOption.isEmpty) { logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId}, " + @@ -652,7 +657,7 @@ private[spark] class MesosClusterScheduler( val currentTime = new Date() val currentOffers = offers.asScala.map { - offer => new ResourceOffer(offer, offer.getResourcesList) + offer => new ResourceOffer(offer, offer.getResourcesList, offer.getAttributesList) }.toList stateLock.synchronized { http://git-wip-us.apache.org/repos/asf/spark/blob/b3f9dbf4/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala index 77acee6..e534b9d 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala @@ -254,6 +254,53 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi assert(networkInfos.get(0).getLabels.getLabels(1).getValue == "val2") } + test("accept/decline offers with driver constraints") { + setScheduler() + + val mem = 1000 + val cpu = 1 + val s2Attributes = List(Utils.createTextAttribute("c1", "a")) + val s3Attributes = List( + Utils.createTextAttribute("c1", "a"), + Utils.createTextAttribute("c2", "b")) + val offers = List( + Utils.createOffer("o1", "s1", mem, cpu, None, 0), + Utils.createOffer("o2", "s2", mem, cpu, None, 0, s2Attributes), + Utils.createOffer("o3", "s3", mem, cpu, None, 0, s3Attributes)) + + def submitDriver(driverConstraints: String): Unit = { + val response = 
scheduler.submitDriver( + new MesosDriverDescription("d1", "jar", mem, cpu, true, + command, + Map("spark.mesos.executor.home" -> "test", + "spark.app.name" -> "test", + config.DRIVER_CONSTRAINTS.key -> driverConstraints), + "s1", + new Date())) + assert(response.success) + } + + submitDriver("c1:x") + scheduler.resourceOffers(driver, offers.asJava) + offers.foreach(o => Utils.verifyTaskNotLaunched(driver, o.getId.getValue)) + + submitDriver("c1:y;c2:z") + scheduler.resourceOffers(driver, offers.asJava) + offers.foreach(o => Utils.verifyTaskNotLaunched(driver, o.getId.getValue)) + + submitDriver("") + scheduler.resourceOffers(driver, offers.asJava) + Utils.verifyTaskLaunched(driver, "o1") + + submitDriver("c1:a") + scheduler.resourceOffers(driver, offers.asJava) + Utils.verifyTaskLaunched(driver, "o2") + + submitDriver("c1:a;c2:b") + scheduler.resourceOffers(driver, offers.asJava) + Utils.verifyTaskLaunched(driver, "o3") + } + test("supports spark.mesos.driver.labels") { setScheduler() http://git-wip-us.apache.org/repos/asf/spark/blob/b3f9dbf4/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala index 5636ac5..c9f4747 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala @@ -43,12 +43,13 @@ object Utils { .build() def createOffer( - offerId: String, - slaveId: String, - mem: Int, - cpus: Int, - ports: Option[(Long, Long)] = None, - gpus: Int = 0): Offer = { + offerId: String, + slaveId: String, + mem: Int, + cpus: Int, + ports: Option[(Long, Long)] = None, + gpus: Int = 0, + attributes: List[Attribute] = 
List.empty): Offer = { val builder = Offer.newBuilder() builder.addResourcesBuilder() .setName("mem") @@ -63,7 +64,7 @@ object Utils { .setName("ports") .setType(Value.Type.RANGES) .setRanges(Ranges.newBuilder().addRange(MesosRange.newBuilder() - .setBegin(resourcePorts._1).setEnd(resourcePorts._2).build())) + .setBegin(resourcePorts._1).setEnd(resourcePorts._2).build())) } if (gpus > 0) { builder.addResourcesBuilder() @@ -73,9 +74,10 @@ object Utils { } builder.setId(createOfferId(offerId)) .setFrameworkId(FrameworkID.newBuilder() - .setValue("f1")) + .setValue("f1")) .setSlaveId(SlaveID.newBuilder().setValue(slaveId)) .setHostname(s"host${slaveId}") + .addAllAttributes(attributes.asJava) .build() } @@ -125,7 +127,7 @@ object Utils { .getVariablesList .asScala assert(envVars - .count(!_.getName.startsWith("SPARK_")) == 2) // user-defined secret env vars + .count(!_.getName.startsWith("SPARK_")) == 2) // user-defined secret env vars val variableOne = envVars.filter(_.getName == "SECRET_ENV_KEY").head assert(variableOne.getSecret.isInitialized) assert(variableOne.getSecret.getType == Secret.Type.REFERENCE) @@ -154,7 +156,7 @@ object Utils { .getVariablesList .asScala assert(envVars - .count(!_.getName.startsWith("SPARK_")) == 2) // user-defined secret env vars + .count(!_.getName.startsWith("SPARK_")) == 2) // user-defined secret env vars val variableOne = envVars.filter(_.getName == "USER").head assert(variableOne.getSecret.isInitialized) assert(variableOne.getSecret.getType == Secret.Type.VALUE) @@ -212,4 +214,13 @@ object Utils { assert(secretVolTwo.getSource.getSecret.getValue.getData == ByteString.copyFrom("password".getBytes)) } + + def createTextAttribute(name: String, value: String): Attribute = { + Attribute.newBuilder() + .setName(name) + .setType(Value.Type.TEXT) + .setText(Value.Text.newBuilder().setValue(value)) + .build() + } } + --------------------------------------------------------------------- To unsubscribe, e-mail: 
commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org