Repository: spark
Updated Branches:
  refs/heads/master 21a7bfd5c -> b3f9dbf48


[SPARK-19606][MESOS] Support constraints in spark-dispatcher

## What changes were proposed in this pull request?

As discussed in SPARK-19606, this adds a new config property named 
"spark.mesos.driver.constraints" for constraining where drivers run on a 
Mesos cluster.
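
The new key uses the same constraint-string syntax as the existing 
executor-side setting; for instance, mirroring the conf.set example in 
running-on-mesos.md (the value is illustrative):

    conf.set("spark.mesos.driver.constraints", "os:centos7;us-east-1:false")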

## How was this patch tested?

A corresponding unit test was added; also tested locally on a Mesos cluster.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Paul Mackles <pmack...@adobe.com>

Closes #19543 from pmackles/SPARK-19606.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b3f9dbf4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b3f9dbf4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b3f9dbf4

Branch: refs/heads/master
Commit: b3f9dbf48ec0938ff5c98833bb6b6855c620ef57
Parents: 21a7bfd
Author: Paul Mackles <pmack...@adobe.com>
Authored: Sun Nov 12 11:21:23 2017 -0800
Committer: Felix Cheung <felixche...@apache.org>
Committed: Sun Nov 12 11:21:23 2017 -0800

----------------------------------------------------------------------
 docs/running-on-mesos.md                        | 17 ++++++-
 .../org/apache/spark/deploy/mesos/config.scala  |  7 +++
 .../cluster/mesos/MesosClusterScheduler.scala   | 15 ++++---
 .../mesos/MesosClusterSchedulerSuite.scala      | 47 ++++++++++++++++++++
 .../spark/scheduler/cluster/mesos/Utils.scala   | 31 ++++++++-----
 5 files changed, 100 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b3f9dbf4/docs/running-on-mesos.md
----------------------------------------------------------------------
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 7a443ff..19ec7c1 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -263,7 +263,10 @@ resource offers will be accepted.
 conf.set("spark.mesos.constraints", "os:centos7;us-east-1:false")
 {% endhighlight %}
 
-For example, Let's say `spark.mesos.constraints` is set to `os:centos7;us-east-1:false`, then the resource offers will be checked to see if they meet both these constraints and only then will be accepted to start new executors.
+For example, let's say `spark.mesos.constraints` is set to `os:centos7;us-east-1:false`; the resource offers will then
+be checked to see if they meet both of these constraints, and only then will they be accepted to start new executors.
+
+To constrain where driver tasks are run, use `spark.mesos.driver.constraints`.
 
 # Mesos Docker Support
 
@@ -447,7 +450,9 @@ See the [configuration page](configuration.html) for information on Spark config
   <td><code>spark.mesos.constraints</code></td>
   <td>(none)</td>
   <td>
-    Attribute based constraints on mesos resource offers. By default, all resource offers will be accepted. Refer to <a href="http://mesos.apache.org/documentation/attributes-resources/">Mesos Attributes & Resources</a> for more information on attributes.
+    Attribute based constraints on mesos resource offers. By default, all resource offers will be accepted. This setting
+    applies only to executors. Refer to <a href="http://mesos.apache.org/documentation/attributes-resources/">Mesos
+    Attributes & Resources</a> for more information on attributes.
     <ul>
      <li>Scalar constraints are matched with "less than equal" semantics i.e. value in the constraint must be less than or equal to the value in the resource offer.</li>
      <li>Range constraints are matched with "contains" semantics i.e. value in the constraint must be within the resource offer's value.</li>
@@ -458,6 +463,14 @@ See the [configuration page](configuration.html) for information on Spark config
   </td>
 </tr>
 <tr>
+  <td><code>spark.mesos.driver.constraints</code></td>
+  <td>(none)</td>
+  <td>
+    Same as <code>spark.mesos.constraints</code> except applied to drivers when launched through the dispatcher. By default,
+    all offers with sufficient resources will be accepted.
+  </td>
+</tr>
+<tr>
   <td><code>spark.mesos.containerizer</code></td>
   <td><code>docker</code></td>
   <td>

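As context for the constraint-string syntax documented above, here is a 
minimal sketch of how a string such as "os:centos7;us-east-1:false" 
decomposes into attribute requirements. This is a simplification: the real 
parser, parseConstraintString in MesosSchedulerUtils, also handles 
multi-value ("zone:us-east-1a,us-east-1b") and valueless attributes, which 
this hypothetical helper does not.

    // hypothetical simplified parser; assumes every constraint carries a value
    def parse(constraints: String): Map[String, Set[String]] =
      constraints.split(";").filter(_.nonEmpty).map { kv =>
        val Array(attr, values) = kv.split(":", 2)
        attr -> values.split(",").filterNot(_.isEmpty).toSet
      }.toMap

    parse("os:centos7;us-east-1:false")
    // => Map("os" -> Set("centos7"), "us-east-1" -> Set("false"))
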
http://git-wip-us.apache.org/repos/asf/spark/blob/b3f9dbf4/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
index 821534e..d134847 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
@@ -122,4 +122,11 @@ package object config {
         "Example: key1:val1,key2:val2")
       .stringConf
       .createOptional
+
+  private[spark] val DRIVER_CONSTRAINTS =
+    ConfigBuilder("spark.mesos.driver.constraints")
+      .doc("Attribute based constraints on mesos resource offers. Applied by the dispatcher " +
+        "when launching drivers. Default is to accept all offers with sufficient resources.")
+      .stringConf
+      .createWithDefault("")
 }

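A note on the empty-string default above: the dispatcher runs the value 
through parseConstraintString (see the scheduler change below), and an empty 
string yields an empty requirements map, which matches every offer. Sketched 
(helper names are from MesosSchedulerUtils; these lines are an illustration, 
not code from the patch):

    // with the default "", the driver is effectively unconstrained
    val requirements = parseConstraintString("")           // an empty map
    // an empty map trivially satisfies the "all requirements met" check,
    // so any offer with sufficient cpu/mem is accepted
    matchesAttributeRequirements(requirements, Map.empty)  // true
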
http://git-wip-us.apache.org/repos/asf/spark/blob/b3f9dbf4/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index de846c8..c41283e 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -556,9 +556,10 @@ private[spark] class MesosClusterScheduler(
 
   private class ResourceOffer(
       val offer: Offer,
-      var remainingResources: JList[Resource]) {
+      var remainingResources: JList[Resource],
+      var attributes: JList[Attribute]) {
     override def toString(): String = {
-      s"Offer id: ${offer.getId}, resources: ${remainingResources}"
+      s"Offer id: ${offer.getId}, resources: ${remainingResources}, 
attributes: ${attributes}"
     }
   }
 
@@ -601,10 +602,14 @@ private[spark] class MesosClusterScheduler(
     for (submission <- candidates) {
       val driverCpu = submission.cores
       val driverMem = submission.mem
-      logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: 
$driverMem")
+      val driverConstraints =
+        parseConstraintString(submission.conf.get(config.DRIVER_CONSTRAINTS))
+      logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: 
$driverMem, " +
+        s"driverConstraints: $driverConstraints")
       val offerOption = currentOffers.find { offer =>
         getResource(offer.remainingResources, "cpus") >= driverCpu &&
-        getResource(offer.remainingResources, "mem") >= driverMem
+        getResource(offer.remainingResources, "mem") >= driverMem &&
+        matchesAttributeRequirements(driverConstraints, toAttributeMap(offer.attributes))
       }
       if (offerOption.isEmpty) {
         logDebug(s"Unable to find offer to launch driver id: 
${submission.submissionId}, " +
@@ -652,7 +657,7 @@ private[spark] class MesosClusterScheduler(
     val currentTime = new Date()
 
     val currentOffers = offers.asScala.map {
-      offer => new ResourceOffer(offer, offer.getResourcesList)
+      offer => new ResourceOffer(offer, offer.getResourcesList, offer.getAttributesList)
     }.toList
 
     stateLock.synchronized {

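The acceptance check above now gates on attributes as well as cpu and memory. 
For TEXT attributes, the kind the new test below exercises, the matching 
amounts to the following sketch over plain string maps (matchesText is a 
hypothetical helper; the real matchesAttributeRequirements in 
MesosSchedulerUtils operates on Mesos protobuf values and also covers scalar, 
range, and set attribute types):

    // every required attribute must be present on the offer with an allowed value
    def matchesText(
        requirements: Map[String, Set[String]],
        offerAttributes: Map[String, String]): Boolean =
      requirements.forall { case (name, allowed) =>
        offerAttributes.get(name).exists(v => allowed.isEmpty || allowed.contains(v))
      }

    matchesText(Map("c1" -> Set("a")), Map("c1" -> "a"))   // true: offer accepted
    matchesText(Map("c2" -> Set("b")), Map("c1" -> "a"))   // false: offer declined
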
http://git-wip-us.apache.org/repos/asf/spark/blob/b3f9dbf4/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index 77acee6..e534b9d 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -254,6 +254,53 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
     assert(networkInfos.get(0).getLabels.getLabels(1).getValue == "val2")
   }
 
+  test("accept/decline offers with driver constraints") {
+    setScheduler()
+
+    val mem = 1000
+    val cpu = 1
+    val s2Attributes = List(Utils.createTextAttribute("c1", "a"))
+    val s3Attributes = List(
+      Utils.createTextAttribute("c1", "a"),
+      Utils.createTextAttribute("c2", "b"))
+    val offers = List(
+      Utils.createOffer("o1", "s1", mem, cpu, None, 0),
+      Utils.createOffer("o2", "s2", mem, cpu, None, 0, s2Attributes),
+      Utils.createOffer("o3", "s3", mem, cpu, None, 0, s3Attributes))
+
+    def submitDriver(driverConstraints: String): Unit = {
+      val response = scheduler.submitDriver(
+        new MesosDriverDescription("d1", "jar", mem, cpu, true,
+          command,
+          Map("spark.mesos.executor.home" -> "test",
+            "spark.app.name" -> "test",
+            config.DRIVER_CONSTRAINTS.key -> driverConstraints),
+          "s1",
+          new Date()))
+      assert(response.success)
+    }
+
+    submitDriver("c1:x")
+    scheduler.resourceOffers(driver, offers.asJava)
+    offers.foreach(o => Utils.verifyTaskNotLaunched(driver, o.getId.getValue))
+
+    submitDriver("c1:y;c2:z")
+    scheduler.resourceOffers(driver, offers.asJava)
+    offers.foreach(o => Utils.verifyTaskNotLaunched(driver, o.getId.getValue))
+
+    submitDriver("")
+    scheduler.resourceOffers(driver, offers.asJava)
+    Utils.verifyTaskLaunched(driver, "o1")
+
+    submitDriver("c1:a")
+    scheduler.resourceOffers(driver, offers.asJava)
+    Utils.verifyTaskLaunched(driver, "o2")
+
+    submitDriver("c1:a;c2:b")
+    scheduler.resourceOffers(driver, offers.asJava)
+    Utils.verifyTaskLaunched(driver, "o3")
+  }
+
   test("supports spark.mesos.driver.labels") {
     setScheduler()
 

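To trace the expectations in the new test: o1 carries no attributes, o2 
carries c1=a, and o3 carries c1=a plus c2=b. So "c1:x" and "c1:y;c2:z" match 
none of the offers, the empty constraint matches the first offer o1, "c1:a" 
lands on o2 (o1 lacks the attribute entirely), and "c1:a;c2:b" is satisfied 
only by o3. The same matrix through the hypothetical matchesText sketched 
earlier:

    matchesText(Map("c1" -> Set("x")), Map("c1" -> "a"))                   // o2: false
    matchesText(Map.empty[String, Set[String]], Map.empty[String, String]) // o1: true
    matchesText(Map("c1" -> Set("a")), Map("c1" -> "a"))                   // o2: true
    matchesText(Map("c1" -> Set("a"), "c2" -> Set("b")),
                Map("c1" -> "a", "c2" -> "b"))                             // o3: true
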
http://git-wip-us.apache.org/repos/asf/spark/blob/b3f9dbf4/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
index 5636ac5..c9f4747 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
@@ -43,12 +43,13 @@ object Utils {
     .build()
 
   def createOffer(
-      offerId: String,
-      slaveId: String,
-      mem: Int,
-      cpus: Int,
-      ports: Option[(Long, Long)] = None,
-      gpus: Int = 0): Offer = {
+                   offerId: String,
+                   slaveId: String,
+                   mem: Int,
+                   cpus: Int,
+                   ports: Option[(Long, Long)] = None,
+                   gpus: Int = 0,
+                   attributes: List[Attribute] = List.empty): Offer = {
     val builder = Offer.newBuilder()
     builder.addResourcesBuilder()
       .setName("mem")
@@ -63,7 +64,7 @@ object Utils {
         .setName("ports")
         .setType(Value.Type.RANGES)
         .setRanges(Ranges.newBuilder().addRange(MesosRange.newBuilder()
-          .setBegin(resourcePorts._1).setEnd(resourcePorts._2).build()))
+        .setBegin(resourcePorts._1).setEnd(resourcePorts._2).build()))
     }
     if (gpus > 0) {
       builder.addResourcesBuilder()
@@ -73,9 +74,10 @@ object Utils {
     }
     builder.setId(createOfferId(offerId))
       .setFrameworkId(FrameworkID.newBuilder()
-        .setValue("f1"))
+      .setValue("f1"))
       .setSlaveId(SlaveID.newBuilder().setValue(slaveId))
       .setHostname(s"host${slaveId}")
+      .addAllAttributes(attributes.asJava)
       .build()
   }
 
@@ -125,7 +127,7 @@ object Utils {
       .getVariablesList
       .asScala
     assert(envVars
-      .count(!_.getName.startsWith("SPARK_")) == 2)  // user-defined secret env vars
+      .count(!_.getName.startsWith("SPARK_")) == 2) // user-defined secret env vars
     val variableOne = envVars.filter(_.getName == "SECRET_ENV_KEY").head
     assert(variableOne.getSecret.isInitialized)
     assert(variableOne.getSecret.getType == Secret.Type.REFERENCE)
@@ -154,7 +156,7 @@ object Utils {
       .getVariablesList
       .asScala
     assert(envVars
-      .count(!_.getName.startsWith("SPARK_")) == 2)  // user-defined secret env vars
+      .count(!_.getName.startsWith("SPARK_")) == 2) // user-defined secret env vars
     val variableOne = envVars.filter(_.getName == "USER").head
     assert(variableOne.getSecret.isInitialized)
     assert(variableOne.getSecret.getType == Secret.Type.VALUE)
@@ -212,4 +214,13 @@ object Utils {
     assert(secretVolTwo.getSource.getSecret.getValue.getData ==
       ByteString.copyFrom("password".getBytes))
   }
+
+  def createTextAttribute(name: String, value: String): Attribute = {
+    Attribute.newBuilder()
+      .setName(name)
+      .setType(Value.Type.TEXT)
+      .setText(Value.Text.newBuilder().setValue(value))
+      .build()
+  }
 }
+

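For completeness, a hypothetical use of the new test helpers together, 
mirroring how the suite builds attribute-bearing offers (all values 
illustrative):

    val attrs = List(Utils.createTextAttribute("c1", "a"))
    val offer = Utils.createOffer("o2", "s2", 1000, 1, None, 0, attrs)
    // offer.getAttributesList now carries the TEXT attribute c1=a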
