Repository: spark Updated Branches: refs/heads/master 7a2b5f93b -> 89bf370e4
[SPARK-15555][MESOS] Driver with --supervise option cannot be killed in Mesos mode ## What changes were proposed in this pull request? Do not add killed applications for retry. ## How was this patch tested? I have verified manually in the Mesos cluster; with the changes, the killed applications move to the Finished Drivers section and will not be retried. Author: Devaraj K <deva...@apache.org> Closes #13323 from devaraj-kavali/SPARK-15555. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/89bf370e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/89bf370e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/89bf370e Branch: refs/heads/master Commit: 89bf370e4f53c02b018b23adc653cd718869489e Parents: 7a2b5f9 Author: Devaraj K <deva...@apache.org> Authored: Tue Jan 3 11:02:42 2017 -0800 Committer: Marcelo Vanzin <van...@cloudera.com> Committed: Tue Jan 3 11:02:42 2017 -0800 ---------------------------------------------------------------------- .../cluster/mesos/MesosClusterScheduler.scala | 1 - .../mesos/MesosClusterSchedulerSuite.scala | 57 +++++++++++++++++++- 2 files changed, 56 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/89bf370e/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index f384290..c5bbcb9 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ 
-647,7 +647,6 @@ private[spark] class MesosClusterScheduler( */ private def shouldRelaunch(state: MesosTaskState): Boolean = { state == MesosTaskState.TASK_FAILED || - state == MesosTaskState.TASK_KILLED || state == MesosTaskState.TASK_LOST } http://git-wip-us.apache.org/repos/asf/spark/blob/89bf370e/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala index 74e5ce2..b9d0984 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala @@ -21,7 +21,7 @@ import java.util.{Collection, Collections, Date} import scala.collection.JavaConverters._ -import org.apache.mesos.Protos._ +import org.apache.mesos.Protos.{TaskState => MesosTaskState, _} import org.apache.mesos.Protos.Value.{Scalar, Type} import org.apache.mesos.SchedulerDriver import org.mockito.{ArgumentCaptor, Matchers} @@ -236,4 +236,59 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi assert(networkInfos.size == 1) assert(networkInfos.get(0).getName == "test-network-name") } + + test("can kill supervised drivers") { + val driver = mock[SchedulerDriver] + val conf = new SparkConf() + conf.setMaster("mesos://localhost:5050") + conf.setAppName("spark mesos") + scheduler = new MesosClusterScheduler( + new BlackHoleMesosClusterPersistenceEngineFactory, conf) { + override def start(): Unit = { + ready = true + mesosDriver = driver + } + } + scheduler.start() + + val response = scheduler.submitDriver( + new MesosDriverDescription("d1", 
"jar", 100, 1, true, command, + Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")), "s1", new Date())) + assert(response.success) + val slaveId = SlaveID.newBuilder().setValue("s1").build() + val offer = Offer.newBuilder() + .addResources( + Resource.newBuilder().setRole("*") + .setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR)) + .addResources( + Resource.newBuilder().setRole("*") + .setScalar(Scalar.newBuilder().setValue(1000).build()) + .setName("mem") + .setType(Type.SCALAR)) + .setId(OfferID.newBuilder().setValue("o1").build()) + .setFrameworkId(FrameworkID.newBuilder().setValue("f1").build()) + .setSlaveId(slaveId) + .setHostname("host1") + .build() + // Offer the resource to launch the submitted driver + scheduler.resourceOffers(driver, Collections.singletonList(offer)) + var state = scheduler.getSchedulerState() + assert(state.launchedDrivers.size == 1) + // Issue the request to kill the launched driver + val killResponse = scheduler.killDriver(response.submissionId) + assert(killResponse.success) + + val taskStatus = TaskStatus.newBuilder() + .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build()) + .setSlaveId(slaveId) + .setState(MesosTaskState.TASK_KILLED) + .build() + // Update the status of the killed task + scheduler.statusUpdate(driver, taskStatus) + // Driver should be moved to finishedDrivers for kill + state = scheduler.getSchedulerState() + assert(state.pendingRetryDrivers.isEmpty) + assert(state.launchedDrivers.isEmpty) + assert(state.finishedDrivers.size == 1) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org