This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new bdccbe4  [SPARK-36532][CORE][3.1] Fix deadlock in CoarseGrainedExecutorBackend.onDisconnected to avoid executor shutdown hang
bdccbe4 is described below

commit bdccbe4841e1a34a116ff2aa86063e0a64d45022
Author: yi.wu <[email protected]>
AuthorDate: Fri Aug 20 20:04:09 2021 +0800

    [SPARK-36532][CORE][3.1] Fix deadlock in CoarseGrainedExecutorBackend.onDisconnected to avoid executor shutdown hang
    
    ### What changes were proposed in this pull request?
    
    Instead of exiting the executor within the RpcEnv's thread, exit the 
executor in a separate thread.
    
    ### Why are the changes needed?
    
    The way `onDisconnected` currently exits can cause a deadlock, which has
    exactly the same root cause as https://github.com/apache/spark/pull/12012:
    
    * `onDisconnected` and then `System.exit` are called in sequence on a
      thread of `MessageLoop.threadpool`.
    * `System.exit` triggers the shutdown hooks, and `executor.stop` is one of
      them.
    * `executor.stop` stops the `Dispatcher`, which in turn waits for
      `MessageLoop.threadpool` to shut down.
    * Thus, the thread that runs `System.exit` waits for the hooks to finish,
      while the hook waits for `MessageLoop.threadpool`, and therefore for that
      same thread, to finish. This mutual dependence results in the deadlock.
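    
    The cycle described above can be reproduced in miniature. The following is
    only a sketch under stated assumptions, not Spark code: the single-thread
    pool stands in for `MessageLoop.threadpool`, the local `stop()` stands in
    for `executor.stop`, and the names `ShutdownDeadlockSketch`, `run` and
    `stopFromLoopThread` are hypothetical. A bounded wait replaces the
    unbounded one so that the example terminates instead of hanging.
    
    ```scala
    import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
    import java.util.concurrent.atomic.AtomicBoolean
    
    object ShutdownDeadlockSketch {
      // Returns true if the pool terminated within the timeout.
      // stopFromLoopThread = true mimics the old path, where the stop logic
      // runs on the loop thread itself; false mimics handing it to a new thread.
      def run(stopFromLoopThread: Boolean): Boolean = {
        val loop = Executors.newSingleThreadExecutor() // stand-in for the message loop pool
        val terminated = new AtomicBoolean(false)
        val done = new CountDownLatch(1)
    
        // Stand-in for executor.stop(): shut the pool down and wait for it.
        // The real wait is unbounded; a timeout is used here so the sketch ends.
        def stop(): Unit = {
          loop.shutdown()
          terminated.set(loop.awaitTermination(500, TimeUnit.MILLISECONDS))
          done.countDown()
        }
    
        loop.execute(new Runnable {
          override def run(): Unit = {
            if (stopFromLoopThread) {
              // The pool cannot terminate while this task is still running,
              // so the wait inside stop() can only time out.
              stop()
            } else {
              // The task returns immediately; the pool is free to terminate.
              new Thread("stop-executor") {
                override def run(): Unit = stop()
              }.start()
            }
          }
        })
    
        done.await()
        if (!terminated.get()) loop.shutdownNow() // clean up after the timed-out case
        terminated.get()
      }
    }
    ```
    
    With these assumptions, `ShutdownDeadlockSketch.run(stopFromLoopThread = true)`
    reports that the pool did not terminate in time, while
    `run(stopFromLoopThread = false)` reports a clean termination, which is the
    behaviour this change relies on.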
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, the executor shutdown won't hang.
    
    ### How was this patch tested?
    
    Pass existing tests.
    
    Closes #33759 from Ngone51/fix-executor-shutdown-hang.
    
    Authored-by: yi.wu <yi.wudatabricks.com>
    Signed-off-by: Wenchen Fan <wenchendatabricks.com>
    
    Closes #33795 from Ngone51/cherry-pick-spark-36532.
    
    Authored-by: yi.wu <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/executor/CoarseGrainedExecutorBackend.scala | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 95237c9..f8024b0 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -192,11 +192,17 @@ private[spark] class CoarseGrainedExecutorBackend(
       stopping.set(true)
       new Thread("CoarseGrainedExecutorBackend-stop-executor") {
         override def run(): Unit = {
-          // executor.stop() will call `SparkEnv.stop()` which waits until RpcEnv stops totally.
-          // However, if `executor.stop()` runs in some thread of RpcEnv, RpcEnv won't be able to
-          // stop until `executor.stop()` returns, which becomes a dead-lock (See SPARK-14180).
-          // Therefore, we put this line in a new thread.
-          executor.stop()
+          // `executor` can be null if there's any error in `CoarseGrainedExecutorBackend.onStart`
+          // or fail to create `Executor`.
+          if (executor == null) {
+            System.exit(1)
+          } else {
+            // executor.stop() will call `SparkEnv.stop()` which waits until RpcEnv stops totally.
+            // However, if `executor.stop()` runs in some thread of RpcEnv, RpcEnv won't be able to
+            // stop until `executor.stop()` returns, which becomes a dead-lock (See SPARK-14180).
+            // Therefore, we put this line in a new thread.
+            executor.stop()
+          }
         }
       }.start()
 
@@ -272,8 +278,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (notifyDriver && driver.nonEmpty) {
         driver.get.send(RemoveExecutor(executorId, new ExecutorLossReason(reason)))
       }
-
-      System.exit(code)
+      self.send(Shutdown)
     } else {
       logInfo("Skip exiting executor since it's been already asked to exit before.")
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
