Repository: spark
Updated Branches:
  refs/heads/master 7a2b5f93b -> 89bf370e4


[SPARK-15555][MESOS] Driver with --supervise option cannot be killed in Mesos mode

## What changes were proposed in this pull request?

Stop adding killed applications to the retry list: `TASK_KILLED` is removed from the task states that `shouldRelaunch` treats as relaunchable, so a driver submitted with `--supervise` stays killed instead of being relaunched by the dispatcher.
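
For context, here is a minimal, self-contained sketch of the decision this patch changes. This is an illustration, not Spark's actual `MesosClusterScheduler`: only the `org.apache.mesos.Protos.TaskState` enum is real (it requires the Mesos protobuf jar on the classpath), while the buffers and the `onTerminalStatus` helper are hypothetical stand-ins for the scheduler's internal bookkeeping.

```scala
import org.apache.mesos.Protos.TaskState

import scala.collection.mutable

object RelaunchDecisionSketch {
  // After this patch, TASK_KILLED is no longer treated as relaunchable;
  // only TASK_FAILED and TASK_LOST queue a supervised driver for retry.
  private def shouldRelaunch(state: TaskState): Boolean =
    state == TaskState.TASK_FAILED ||
      state == TaskState.TASK_LOST

  def main(args: Array[String]): Unit = {
    val pendingRetryDrivers = mutable.Buffer[String]()
    val finishedDrivers = mutable.Buffer[String]()

    // Hypothetical stand-in for the terminal-status handling in statusUpdate().
    def onTerminalStatus(submissionId: String, state: TaskState): Unit =
      if (shouldRelaunch(state)) pendingRetryDrivers += submissionId
      else finishedDrivers += submissionId

    onTerminalStatus("driver-1", TaskState.TASK_KILLED)
    // The killed driver lands among the finished drivers, not the retries.
    assert(pendingRetryDrivers.isEmpty)
    assert(finishedDrivers == mutable.Buffer("driver-1"))
  }
}
```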
## How was this patch tested?

Verified manually on a Mesos cluster: with this change, killed applications move to the Finished Drivers section and are not retried.
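
For reference, killing a cluster-mode submission is done through spark-submit's `--kill` flag (supported for standalone and Mesos cluster mode); the dispatcher URL and submission ID below are placeholders:

```
./bin/spark-submit --master mesos://dispatcher-host:7077 --kill <submissionId>
```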

Author: Devaraj K <deva...@apache.org>

Closes #13323 from devaraj-kavali/SPARK-15555.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/89bf370e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/89bf370e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/89bf370e

Branch: refs/heads/master
Commit: 89bf370e4f53c02b018b23adc653cd718869489e
Parents: 7a2b5f9
Author: Devaraj K <deva...@apache.org>
Authored: Tue Jan 3 11:02:42 2017 -0800
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Tue Jan 3 11:02:42 2017 -0800

----------------------------------------------------------------------
 .../cluster/mesos/MesosClusterScheduler.scala   |  1 -
 .../mesos/MesosClusterSchedulerSuite.scala      | 57 +++++++++++++++++++-
 2 files changed, 56 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/89bf370e/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index f384290..c5bbcb9 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -647,7 +647,6 @@ private[spark] class MesosClusterScheduler(
    */
   private def shouldRelaunch(state: MesosTaskState): Boolean = {
     state == MesosTaskState.TASK_FAILED ||
-      state == MesosTaskState.TASK_KILLED ||
       state == MesosTaskState.TASK_LOST
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/89bf370e/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index 74e5ce2..b9d0984 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -21,7 +21,7 @@ import java.util.{Collection, Collections, Date}
 
 import scala.collection.JavaConverters._
 
-import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
 import org.apache.mesos.Protos.Value.{Scalar, Type}
 import org.apache.mesos.SchedulerDriver
 import org.mockito.{ArgumentCaptor, Matchers}
@@ -236,4 +236,59 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
     assert(networkInfos.size == 1)
     assert(networkInfos.get(0).getName == "test-network-name")
   }
+
+  test("can kill supervised drivers") {
+    val driver = mock[SchedulerDriver]
+    val conf = new SparkConf()
+    conf.setMaster("mesos://localhost:5050")
+    conf.setAppName("spark mesos")
+    scheduler = new MesosClusterScheduler(
+      new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
+      override def start(): Unit = {
+        ready = true
+        mesosDriver = driver
+      }
+    }
+    scheduler.start()
+
+    val response = scheduler.submitDriver(
+      new MesosDriverDescription("d1", "jar", 100, 1, true, command,
+        Map(("spark.mesos.executor.home", "test"), ("spark.app.name", 
"test")), "s1", new Date()))
+    assert(response.success)
+    val slaveId = SlaveID.newBuilder().setValue("s1").build()
+    val offer = Offer.newBuilder()
+      .addResources(
+        Resource.newBuilder().setRole("*")
+          .setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR))
+      .addResources(
+        Resource.newBuilder().setRole("*")
+          .setScalar(Scalar.newBuilder().setValue(1000).build())
+          .setName("mem")
+          .setType(Type.SCALAR))
+      .setId(OfferID.newBuilder().setValue("o1").build())
+      .setFrameworkId(FrameworkID.newBuilder().setValue("f1").build())
+      .setSlaveId(slaveId)
+      .setHostname("host1")
+      .build()
+    // Offer the resource to launch the submitted driver
+    scheduler.resourceOffers(driver, Collections.singletonList(offer))
+    var state = scheduler.getSchedulerState()
+    assert(state.launchedDrivers.size == 1)
+    // Issue the request to kill the launched driver
+    val killResponse = scheduler.killDriver(response.submissionId)
+    assert(killResponse.success)
+
+    val taskStatus = TaskStatus.newBuilder()
+      .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
+      .setSlaveId(slaveId)
+      .setState(MesosTaskState.TASK_KILLED)
+      .build()
+    // Update the status of the killed task
+    scheduler.statusUpdate(driver, taskStatus)
+    // Driver should be moved to finishedDrivers for kill
+    state = scheduler.getSchedulerState()
+    assert(state.pendingRetryDrivers.isEmpty)
+    assert(state.launchedDrivers.isEmpty)
+    assert(state.finishedDrivers.size == 1)
+  }
 }

