Repository: spark
Updated Branches:
  refs/heads/master dc4c35183 -> 8da3f7041


[SPARK-21000][MESOS] Add Mesos labels support to the Spark Dispatcher

## What changes were proposed in this pull request?

Add Mesos labels support to the Spark Dispatcher

## How was this patch tested?

unit tests

Author: Michael Gummelt <mgumm...@mesosphere.io>

Closes #18220 from mgummelt/SPARK-21000-dispatcher-labels.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8da3f704
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8da3f704
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8da3f704

Branch: refs/heads/master
Commit: 8da3f7041aafa71d7596b531625edb899970fec2
Parents: dc4c351
Author: Michael Gummelt <mgumm...@mesosphere.io>
Authored: Sun Jun 11 09:49:39 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Sun Jun 11 09:49:39 2017 +0100

----------------------------------------------------------------------
 docs/running-on-mesos.md                        | 14 +++++-
 .../org/apache/spark/deploy/mesos/config.scala  |  7 +++
 .../cluster/mesos/MesosClusterScheduler.scala   | 10 ++--
 .../MesosCoarseGrainedSchedulerBackend.scala    | 28 ++---------
 .../cluster/mesos/MesosProtoUtils.scala         | 53 ++++++++++++++++++++
 .../mesos/MesosClusterSchedulerSuite.scala      | 27 ++++++++++
 ...esosCoarseGrainedSchedulerBackendSuite.scala | 23 ---------
 .../cluster/mesos/MesosProtoUtilsSuite.scala    | 48 ++++++++++++++++++
 8 files changed, 157 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8da3f704/docs/running-on-mesos.md
----------------------------------------------------------------------
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 8745e76..ec130c1 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -382,8 +382,9 @@ See the [configuration page](configuration.html) for 
information on Spark config
   <td>(none)</td>
   <td>
     Set the Mesos labels to add to each task. Labels are free-form key-value 
pairs.
-    Key-value pairs should be separated by a colon, and commas used to list 
more than one.
-    Ex. key:value,key2:value2.
+    Key-value pairs should be separated by a colon, and commas used to
+    list more than one.  If your label includes a colon or comma, you
+    can escape it with a backslash.  Ex. key:value,key2:a\:b.
   </td>
 </tr>
 <tr>
@@ -469,6 +470,15 @@ See the [configuration page](configuration.html) for 
information on Spark config
   </td>
 </tr>
 <tr>
+  <td><code>spark.mesos.driver.labels</code></td>
+  <td><code>(none)</code></td>
+  <td>
+    Mesos labels to add to the driver.  See 
<code>spark.mesos.task.labels</code>
+    for formatting information.
+  </td>
+</tr>
+
+<tr>
   <td><code>spark.mesos.driverEnv.[EnvironmentVariableName]</code></td>
   <td><code>(none)</code></td>
   <td>

http://git-wip-us.apache.org/repos/asf/spark/blob/8da3f704/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
 
b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
index 19e2533..56d697f 100644
--- 
a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
+++ 
b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
@@ -56,4 +56,11 @@ package object config {
       .stringConf
       .createOptional
 
+  private[spark] val DRIVER_LABELS =
+    ConfigBuilder("spark.mesos.driver.labels")
+      .doc("Mesos labels to add to the driver.  Labels are free-form key-value 
pairs.  Key-value " +
+        "pairs should be separated by a colon, and commas used to list more 
than one. " +
+        "Ex. key:value,key2:value2")
+      .stringConf
+      .createOptional
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8da3f704/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 1bc6f71..577f9a8 100644
--- 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -30,11 +30,13 @@ import org.apache.mesos.Protos.Environment.Variable
 import org.apache.mesos.Protos.TaskStatus.Reason
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState}
+import org.apache.spark.deploy.mesos.config
 import org.apache.spark.deploy.mesos.MesosDriverDescription
 import org.apache.spark.deploy.rest.{CreateSubmissionResponse, 
KillSubmissionResponse, SubmissionStatusResponse}
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.util.Utils
 
+
 /**
  * Tracks the current state of a Mesos Task that runs a Spark driver.
  * @param driverDescription Submitted driver description from
@@ -525,15 +527,17 @@ private[spark] class MesosClusterScheduler(
     offer.remainingResources = finalResources.asJava
 
     val appName = desc.conf.get("spark.app.name")
-    val taskInfo = TaskInfo.newBuilder()
+
+    TaskInfo.newBuilder()
       .setTaskId(taskId)
       .setName(s"Driver for ${appName}")
       .setSlaveId(offer.offer.getSlaveId)
       .setCommand(buildDriverCommand(desc))
       .addAllResources(cpuResourcesToUse.asJava)
       .addAllResources(memResourcesToUse.asJava)
-    taskInfo.setContainer(MesosSchedulerBackendUtil.containerInfo(desc.conf))
-    taskInfo.build
+      
.setLabels(MesosProtoUtils.mesosLabels(desc.conf.get(config.DRIVER_LABELS).getOrElse("")))
+      .setContainer(MesosSchedulerBackendUtil.containerInfo(desc.conf))
+      .build
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/8da3f704/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index ac7aec7..871685c 100644
--- 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -419,16 +419,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
             .setSlaveId(offer.getSlaveId)
             .setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, 
taskId))
             .setName(s"${sc.appName} $taskId")
-
-          taskBuilder.addAllResources(resourcesToUse.asJava)
-          
taskBuilder.setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf))
-
-          val labelsBuilder = taskBuilder.getLabelsBuilder
-          val labels = buildMesosLabels().asJava
-
-          labelsBuilder.addAllLabels(labels)
-
-          taskBuilder.setLabels(labelsBuilder)
+            .setLabels(MesosProtoUtils.mesosLabels(taskLabels))
+            .addAllResources(resourcesToUse.asJava)
+            .setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf))
 
           tasks(offer.getId) ::= taskBuilder.build()
           remainingResources(offerId) = resourcesLeft.asJava
@@ -444,21 +437,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     tasks.toMap
   }
 
-  private def buildMesosLabels(): List[Label] = {
-   taskLabels.split(",").flatMap(label =>
-      label.split(":") match {
-        case Array(key, value) =>
-          Some(Label.newBuilder()
-            .setKey(key)
-            .setValue(value)
-            .build())
-        case _ =>
-          logWarning(s"Unable to parse $label into a key:value label for the 
task.")
-          None
-      }
-    ).toList
-  }
-
   /** Extracts task needed resources from a list of available resources. */
   private def partitionTaskResources(
       resources: JList[Resource],

http://git-wip-us.apache.org/repos/asf/spark/blob/8da3f704/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosProtoUtils.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosProtoUtils.scala
 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosProtoUtils.scala
new file mode 100644
index 0000000..fea01c7
--- /dev/null
+++ 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosProtoUtils.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import scala.collection.JavaConverters._
+
+import org.apache.mesos.Protos
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+
+object MesosProtoUtils extends Logging {
+
+  /** Parses a label string of the format specified in 
spark.mesos.task.labels. */
+  def mesosLabels(labelsStr: String): Protos.Labels.Builder = {
+    val labels: Seq[Protos.Label] = if (labelsStr == "") {
+      Seq()
+    } else {
+      labelsStr.split("""(?<!\\),""").toSeq.map { labelStr =>
+        val parts = labelStr.split("""(?<!\\):""")
+        if (parts.length != 2) {
+          throw new SparkException(s"Malformed label: ${labelStr}")
+        }
+
+        val cleanedParts = parts
+          .map(part => part.replaceAll("""\\,""", ","))
+          .map(part => part.replaceAll("""\\:""", ":"))
+
+        Protos.Label.newBuilder()
+          .setKey(cleanedParts(0))
+          .setValue(cleanedParts(1))
+          .build()
+      }
+    }
+
+    Protos.Labels.newBuilder().addAllLabels(labels.asJava)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/8da3f704/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
 
b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index 32967b0..0bb4790 100644
--- 
a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ 
b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -248,6 +248,33 @@ class MesosClusterSchedulerSuite extends SparkFunSuite 
with LocalSparkContext wi
     assert(networkInfos.get(0).getName == "test-network-name")
   }
 
+  test("supports spark.mesos.driver.labels") {
+    setScheduler()
+
+    val mem = 1000
+    val cpu = 1
+
+    val response = scheduler.submitDriver(
+      new MesosDriverDescription("d1", "jar", mem, cpu, true,
+        command,
+        Map("spark.mesos.executor.home" -> "test",
+          "spark.app.name" -> "test",
+          "spark.mesos.driver.labels" -> "key:value"),
+        "s1",
+        new Date()))
+
+    assert(response.success)
+
+    val offer = Utils.createOffer("o1", "s1", mem, cpu)
+    scheduler.resourceOffers(driver, List(offer).asJava)
+
+    val launchedTasks = Utils.verifyTaskLaunched(driver, "o1")
+    val labels = launchedTasks.head.getLabels
+    assert(labels.getLabelsCount == 1)
+    assert(labels.getLabels(0).getKey == "key")
+    assert(labels.getLabels(0).getValue == "value")
+  }
+
   test("can kill supervised drivers") {
     val conf = new SparkConf()
     conf.setMaster("mesos://localhost:5050")

http://git-wip-us.apache.org/repos/asf/spark/blob/8da3f704/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
 
b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 0418bfb..7cca5fe 100644
--- 
a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ 
b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -532,29 +532,6 @@ class MesosCoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite
     assert(launchedTasks.head.getLabels.equals(taskLabels))
   }
 
-  test("mesos ignored invalid labels and sets configurable labels on tasks") {
-    val taskLabelsString = "mesos:test,label:test,incorrect:label:here"
-    setBackend(Map(
-      "spark.mesos.task.labels" -> taskLabelsString
-    ))
-
-    // Build up the labels
-    val taskLabels = Protos.Labels.newBuilder()
-      .addLabels(Protos.Label.newBuilder()
-        .setKey("mesos").setValue("test").build())
-      .addLabels(Protos.Label.newBuilder()
-        .setKey("label").setValue("test").build())
-      .build()
-
-    val offers = List(Resources(backend.executorMemory(sc), 1))
-    offerResources(offers)
-    val launchedTasks = verifyTaskLaunched(driver, "o1")
-
-    val labels = launchedTasks.head.getLabels
-
-    assert(launchedTasks.head.getLabels.equals(taskLabels))
-  }
-
   test("mesos supports spark.mesos.network.name") {
     setBackend(Map(
       "spark.mesos.network.name" -> "test-network-name"

http://git-wip-us.apache.org/repos/asf/spark/blob/8da3f704/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosProtoUtilsSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosProtoUtilsSuite.scala
 
b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosProtoUtilsSuite.scala
new file mode 100644
index 0000000..36a4c1a
--- /dev/null
+++ 
b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosProtoUtilsSuite.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import org.apache.spark.SparkFunSuite
+
+class MesosProtoUtilsSuite extends SparkFunSuite {
+  test("mesosLabels") {
+    val labels = MesosProtoUtils.mesosLabels("key:value")
+    assert(labels.getLabelsCount == 1)
+    val label = labels.getLabels(0)
+    assert(label.getKey == "key")
+    assert(label.getValue == "value")
+
+    val labels2 = MesosProtoUtils.mesosLabels("key:value\\:value")
+    assert(labels2.getLabelsCount == 1)
+    val label2 = labels2.getLabels(0)
+    assert(label2.getKey == "key")
+    assert(label2.getValue == "value:value")
+
+    val labels3 = MesosProtoUtils.mesosLabels("key:value,key2:value2")
+    assert(labels3.getLabelsCount == 2)
+    assert(labels3.getLabels(0).getKey == "key")
+    assert(labels3.getLabels(0).getValue == "value")
+    assert(labels3.getLabels(1).getKey == "key2")
+    assert(labels3.getLabels(1).getValue == "value2")
+
+    val labels4 = MesosProtoUtils.mesosLabels("key:value\\,value")
+    assert(labels4.getLabelsCount == 1)
+    assert(labels4.getLabels(0).getKey == "key")
+    assert(labels4.getLabels(0).getValue == "value,value")
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to