This is an automated email from the ASF dual-hosted git repository.

irashid pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 95e73b3  [SPARK-27112][CORE] : Create a resource ordering between threads to r…
95e73b3 is described below

commit 95e73b328ac883be2ced9099f20c8878e498e297
Author: pgandhi <pgan...@verizonmedia.com>
AuthorDate: Tue Mar 19 16:22:40 2019 -0500

    [SPARK-27112][CORE] : Create a resource ordering between threads to r…
    
    …esolve the deadlocks encountered when trying to kill executors either due to dynamic allocation or blacklisting
    
    Closes #24072 from pgandhi999/SPARK-27112-2.
    
    Authored-by: pgandhi <pgandhi@verizonmedia.com>
    Signed-off-by: Imran Rashid <irashid@cloudera.com>
    
    ## What changes were proposed in this pull request?
    
    There are two deadlocks as a result of the interplay between three different threads (a sketch of the inverse lock orders follows the list):
    
    **task-result-getter thread**
    
    **spark-dynamic-executor-allocation thread**
    
    **dispatcher-event-loop thread (makeOffers())**
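    
    As a minimal sketch of the problem (the lock objects and method names below are hypothetical stand-ins, not the actual Spark code paths): two threads that take the same pair of monitors in opposite orders can each end up holding one monitor while blocking forever on the other.
    
    ```scala
    object DeadlockSketch {
      // Illustration-only stand-ins for the two monitors involved.
      private val schedulerLock = new Object
      private val backendLock = new Object
    
      // One thread takes the scheduler's monitor first, then the backend's.
      def lockSchedulerThenBackend(): Unit = schedulerLock.synchronized {
        backendLock.synchronized { /* do work under both monitors */ }
      }
    
      // Another thread takes the same monitors in the opposite order; run
      // concurrently with lockSchedulerThenBackend(), each thread can hold
      // one monitor while waiting forever for the other.
      def lockBackendThenScheduler(): Unit = backendLock.synchronized {
        schedulerLock.synchronized { /* do work under both monitors */ }
      }
    }
    ```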
    
    The fix enforces a lock-ordering constraint: the lock on `TaskSchedulerImpl` is acquired before the lock on `CoarseGrainedSchedulerBackend` in both `makeOffers()` and `killExecutors()`. This consistent resource ordering between the threads fixes the deadlocks.
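    
    In miniature, the ordering discipline looks like the sketch below (hypothetical names; in the actual patch the helper is the `withLock` method shown in the diff): every code path that needs both monitors routes through a single helper that always acquires them in the same order, so a circular wait can no longer form.
    
    ```scala
    object OrderedLockingSketch {
      private val schedulerLock = new Object  // stand-in for TaskSchedulerImpl
      private val backendLock = new Object    // stand-in for CoarseGrainedSchedulerBackend
    
      // The single choke point: both monitors are always taken in the same order.
      private def withBothLocks[T](fn: => T): T = schedulerLock.synchronized {
        backendLock.synchronized { fn }
      }
    
      // Both contending code paths now share one acquisition order.
      def makeOffers(): Unit = withBothLocks { /* offer resources to executors */ }
      def killExecutors(): Unit = withBothLocks { /* mark executors for removal */ }
    }
    ```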
    
    ## How was this patch tested?
    
    Manual Tests
    
    Closes #24134 from pgandhi999/branch-2.4-SPARK-27112.
    
    Authored-by: pgandhi <pgan...@verizonmedia.com>
    Signed-off-by: Imran Rashid <iras...@cloudera.com>
---
 .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala   | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index de7c0d8..aa4e638 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -237,7 +237,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     // Make fake resource offers on all executors
     private def makeOffers() {
       // Make sure no executor is killed while some task is launching on it
-      val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
+      val taskDescs = withLock {
         // Filter out executors under killing
         val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
         val workOffers = activeExecutors.map {
@@ -263,7 +263,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     // Make fake resource offers on just one executor
     private def makeOffers(executorId: String) {
       // Make sure no executor is killed while some task is launching on it
-      val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
+      val taskDescs = withLock {
         // Filter out executors under killing
         if (executorIsAlive(executorId)) {
           val executorData = executorDataMap(executorId)
@@ -607,7 +607,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       force: Boolean): Seq[String] = {
     logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
 
-    val response = synchronized {
+    val response = withLock {
      val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
       unknownExecutors.foreach { id =>
         logWarning(s"Executor to kill $id does not exist!")
@@ -685,6 +685,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   }
 
   protected def fetchHadoopDelegationTokens(): Option[Array[Byte]] = { None }
+
+  // SPARK-27112: We need to ensure that there is ordering of lock acquisition
+  // between TaskSchedulerImpl and CoarseGrainedSchedulerBackend objects in order to fix
+  // the deadlock issue exposed in SPARK-27112
+  private def withLock[T](fn: => T): T = scheduler.synchronized {
+    CoarseGrainedSchedulerBackend.this.synchronized { fn }
+  }
 }
 
 private[spark] object CoarseGrainedSchedulerBackend {

