Repository: spark
Updated Branches:
  refs/heads/master dee801adb -> 2eaeafe8a


[SPARK-12330][MESOS] Fix mesos coarse mode cleanup

In the current implementation the mesos coarse scheduler does not wait for the
mesos tasks to complete before ending the driver. This causes a race in which a
task must finish cleaning up before the mesos driver terminates it with a
SIGINT (followed by a SIGKILL after 3 seconds if the SIGINT doesn't work).

This PR makes the mesos coarse scheduler wait for the mesos tasks to finish,
with a timeout defined by `spark.mesos.coarse.shutdown.ms` (default: 10s).
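
As a usage note (not part of this commit), a job whose executors need longer
to clean up could raise the timeout when building its SparkConf. The 30s value
and app name below are purely illustrative:

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical driver-side setup: allow executors up to 30 seconds
    // to finish cleanup before the Mesos driver is stopped. The config
    // key and its time-string format come from this patch; the value
    // and app name are made up for the example.
    val conf = new SparkConf()
      .setAppName("cleanup-sensitive-job")
      .set("spark.mesos.coarse.shutdown.ms", "30s")
    val sc = new SparkContext(conf)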

This PR also fixes a regression introduced by [SPARK-10987] whereby submitting
a shutdown causes a race between the local shutdown procedure and the
notification of the scheduler driver disconnection. If the scheduler driver
disconnection wins the race, the coarse executor incorrectly exits with status
1 (instead of the proper status 0).
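
The guard for that race is a single atomic flag checked in the disconnection
handler. A minimal sketch of the pattern, with class and method names
simplified from the actual CoarseGrainedExecutorBackend change in the diff
below:

    import java.util.concurrent.atomic.AtomicBoolean

    // Sketch of the race guard: once an orderly shutdown has started, a
    // later driver-disconnect notification is expected and must not be
    // treated as a failure.
    class ShutdownAwareBackend {
      private val stopping = new AtomicBoolean(false)

      def onStopCommand(): Unit = {
        stopping.set(true) // record that a shutdown was requested
        // ... run the local shutdown procedure ...
      }

      def onDisconnected(remoteAddress: String): Unit = {
        if (stopping.get()) {
          // Expected during shutdown: keep the status-0 exit path.
          println(s"Driver from $remoteAddress disconnected during shutdown")
        } else {
          // Unexpected loss of the driver: exit with status 1.
          println(s"Driver $remoteAddress disassociated! Shutting down.")
          sys.exit(1)
        }
      }
    }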

With this patch the mesos coarse scheduler terminates properly, the executors
clean up, and the tasks are reported as `FINISHED` in the Mesos console (as
opposed to `KILLED` in Spark < 1.6 or `FAILED` in 1.6 and later).

Author: Charles Allen <[email protected]>

Closes #10319 from drcrallen/SPARK-12330.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2eaeafe8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2eaeafe8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2eaeafe8

Branch: refs/heads/master
Commit: 2eaeafe8a2aa31be9b230b8d53d3baccd32535b1
Parents: dee801a
Author: Charles Allen <[email protected]>
Authored: Thu Feb 4 10:27:25 2016 -0800
Committer: Andrew Or <[email protected]>
Committed: Thu Feb 4 10:27:25 2016 -0800

----------------------------------------------------------------------
 .../executor/CoarseGrainedExecutorBackend.scala |  8 +++-
 .../mesos/CoarseMesosSchedulerBackend.scala     | 39 +++++++++++++++++++-
 2 files changed, 45 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2eaeafe8/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 136cf4a..3b5cb18 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -19,6 +19,7 @@ package org.apache.spark.executor
 
 import java.net.URL
 import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.mutable
 import scala.util.{Failure, Success}
@@ -42,6 +43,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     env: SparkEnv)
   extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {
 
+  private[this] val stopping = new AtomicBoolean(false)
   var executor: Executor = null
   @volatile var driver: Option[RpcEndpointRef] = None
 
@@ -102,19 +104,23 @@ private[spark] class CoarseGrainedExecutorBackend(
       }
 
     case StopExecutor =>
+      stopping.set(true)
       logInfo("Driver commanded a shutdown")
       // Cannot shutdown here because an ack may need to be sent back to the caller. So send
       // a message to self to actually do the shutdown.
       self.send(Shutdown)
 
     case Shutdown =>
+      stopping.set(true)
       executor.stop()
       stop()
       rpcEnv.shutdown()
   }
 
   override def onDisconnected(remoteAddress: RpcAddress): Unit = {
-    if (driver.exists(_.address == remoteAddress)) {
+    if (stopping.get()) {
+      logInfo(s"Driver from $remoteAddress disconnected during shutdown")
+    } else if (driver.exists(_.address == remoteAddress)) {
       logError(s"Driver $remoteAddress disassociated! Shutting down.")
       System.exit(1)
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/2eaeafe8/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 2f095b8..722293b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -19,11 +19,13 @@ package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
 import java.util.{Collections, List => JList}
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap, HashSet}
 
+import com.google.common.base.Stopwatch
 import com.google.common.collect.HashBiMap
 import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver}
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
@@ -60,6 +62,12 @@ private[spark] class CoarseMesosSchedulerBackend(
   // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
   val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
 
+  private[this] val shutdownTimeoutMS = conf.getTimeAsMs("spark.mesos.coarse.shutdown.ms", "10s")
+    .ensuring(_ >= 0, "spark.mesos.coarse.shutdown.ms must be >= 0")
+
+  // Synchronization protected by stateLock
+  private[this] var stopCalled: Boolean = false
+
   // If shuffle service is enabled, the Spark driver will register with the shuffle service.
   // This is for cleaning up shuffle files reliably.
   private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
@@ -245,6 +253,13 @@ private[spark] class CoarseMesosSchedulerBackend(
    */
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     stateLock.synchronized {
+      if (stopCalled) {
+        logDebug("Ignoring offers during shutdown")
+        // Driver should simply return a stopped status on race
+        // condition between this.stop() and completing here
+        offers.asScala.map(_.getId).foreach(d.declineOffer)
+        return
+      }
       val filters = Filters.newBuilder().setRefuseSeconds(5).build()
       for (offer <- offers.asScala) {
         val offerAttributes = toAttributeMap(offer.getAttributesList)
@@ -364,7 +379,29 @@ private[spark] class CoarseMesosSchedulerBackend(
   }
 
   override def stop() {
-    super.stop()
+    // Make sure we're not launching tasks during shutdown
+    stateLock.synchronized {
+      if (stopCalled) {
+        logWarning("Stop called multiple times, ignoring")
+        return
+      }
+      stopCalled = true
+      super.stop()
+    }
+    // Wait for executors to report done, or else mesosDriver.stop() will forcefully kill them.
+    // See SPARK-12330
+    val stopwatch = new Stopwatch()
+    stopwatch.start()
+    // slaveIdsWithExecutors has no memory barrier, so this is eventually consistent
+    while (slaveIdsWithExecutors.nonEmpty &&
+      stopwatch.elapsed(TimeUnit.MILLISECONDS) < shutdownTimeoutMS) {
+      Thread.sleep(100)
+    }
+    if (slaveIdsWithExecutors.nonEmpty) {
+      logWarning(s"Timed out waiting for ${slaveIdsWithExecutors.size} 
remaining executors "
+        + s"to terminate within $shutdownTimeoutMS ms. This may leave 
temporary files "
+        + "on the mesos nodes.")
+    }
     if (mesosDriver != null) {
       mesosDriver.stop()
     }
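
The bounded wait added to stop() is a simple deadline poll. A standalone
sketch of the same idea, using System.nanoTime instead of Guava's Stopwatch
and a hypothetical `remainingExecutors` accessor standing in for the
backend's `slaveIdsWithExecutors` map:

    // Returns true if all executors reported done before the timeout.
    // Polls every 100 ms, matching the patch; remainingExecutors is a
    // stand-in for the backend's slaveIdsWithExecutors bookkeeping.
    def awaitExecutorShutdown(remainingExecutors: () => Int, timeoutMs: Long): Boolean = {
      val deadlineNanos = System.nanoTime() + timeoutMs * 1000000L
      while (remainingExecutors() > 0 && System.nanoTime() < deadlineNanos) {
        Thread.sleep(100)
      }
      remainingExecutors() == 0
    }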

