This is an automated email from the ASF dual-hosted git repository.

wuyi pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 9229eb9  [SPARK-35714][FOLLOW-UP][CORE] Use a shared stopping flag for WorkerWatcher to avoid the duplicate System.exit
9229eb9 is described below

commit 9229eb9a94cfcfd8c030b7dff4ab81227fffc742
Author: yi.wu <[email protected]>
AuthorDate: Thu Jul 1 11:40:00 2021 +0800

    [SPARK-35714][FOLLOW-UP][CORE] Use a shared stopping flag for WorkerWatcher to avoid the duplicate System.exit
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to let `WorkerWatcher` reuse the `stopping` flag in `CoarseGrainedExecutorBackend` to avoid the duplicate call of `System.exit`.
    
    ### Why are the changes needed?
    
    As a followup of https://github.com/apache/spark/pull/32868, this PR tries to give a more robust fix.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass existing tests.
    
    Closes #33028 from Ngone51/spark-35714-followup.
    
    Lead-authored-by: yi.wu <[email protected]>
    Co-authored-by: wuyi <[email protected]>
    Signed-off-by: yi.wu <[email protected]>
    (cherry picked from commit 868a59470650cc12272de0d0b04c6d98b1fe076d)
    Signed-off-by: yi.wu <[email protected]>
---
 .../apache/spark/deploy/worker/WorkerWatcher.scala | 17 ++++++-----
 .../executor/CoarseGrainedExecutorBackend.scala    | 33 +++++++++++++---------
 2 files changed, 27 insertions(+), 23 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
index 43ec492..efffc9f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -17,13 +17,10 @@
 
 package org.apache.spark.deploy.worker
 
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.Future
-import scala.concurrent.duration._
+import java.util.concurrent.atomic.AtomicBoolean
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc._
-import org.apache.spark.util.ThreadUtils
 
 /**
  * Endpoint which connects to a worker process and terminates the JVM if the
@@ -31,7 +28,10 @@ import org.apache.spark.util.ThreadUtils
  * Provides fate sharing between a worker and its associated child processes.
  */
 private[spark] class WorkerWatcher(
-    override val rpcEnv: RpcEnv, workerUrl: String, isTesting: Boolean = false)
+    override val rpcEnv: RpcEnv,
+    workerUrl: String,
+    isTesting: Boolean = false,
+    isChildProcessStopping: AtomicBoolean = new AtomicBoolean(false))
   extends RpcEndpoint with Logging {
 
   logInfo(s"Connecting to worker $workerUrl")
@@ -53,10 +53,9 @@ private[spark] class WorkerWatcher(
   private def exitNonZero() =
     if (isTesting) {
       isShutDown = true
-    } else {
-      ThreadUtils.awaitResult(Future {
-        System.exit(-1)
-      }, 5.seconds)
+    } else if (isChildProcessStopping.compareAndSet(false, true)) {
+      // SPARK-35714: avoid the duplicate call of `System.exit` to avoid the dead lock
+      System.exit(-1)
     }
 
   override def receive: PartialFunction[Any, Unit] = {
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index d607ee8..95237c9 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -60,7 +60,7 @@ private[spark] class CoarseGrainedExecutorBackend(
 
   private implicit val formats = DefaultFormats
 
-  private[this] val stopping = new AtomicBoolean(false)
+  private[executor] val stopping = new AtomicBoolean(false)
   var executor: Executor = null
   @volatile var driver: Option[RpcEndpointRef] = None
 
@@ -261,18 +261,22 @@ private[spark] class CoarseGrainedExecutorBackend(
                              reason: String,
                              throwable: Throwable = null,
                              notifyDriver: Boolean = true) = {
-    val message = "Executor self-exiting due to : " + reason
-    if (throwable != null) {
-      logError(message, throwable)
-    } else {
-      logError(message)
-    }
+    if (stopping.compareAndSet(false, true)) {
+      val message = "Executor self-exiting due to : " + reason
+      if (throwable != null) {
+        logError(message, throwable)
+      } else {
+        logError(message)
+      }
 
-    if (notifyDriver && driver.nonEmpty) {
-      driver.get.send(RemoveExecutor(executorId, new ExecutorLossReason(reason)))
-    }
+      if (notifyDriver && driver.nonEmpty) {
+        driver.get.send(RemoveExecutor(executorId, new 
ExecutorLossReason(reason)))
+      }
 
-    System.exit(code)
+      System.exit(code)
+    } else {
+      logInfo("Skip exiting executor since it's been already asked to exit before.")
+    }
   }
 
   private def decommissionSelf(): Unit = {
@@ -441,10 +445,11 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
         arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
 
-      env.rpcEnv.setupEndpoint("Executor",
-        backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))
+      val backend = backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile)
+      env.rpcEnv.setupEndpoint("Executor", backend)
       arguments.workerUrl.foreach { url =>
-        env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
+        env.rpcEnv.setupEndpoint("WorkerWatcher",
+          new WorkerWatcher(env.rpcEnv, url, isChildProcessStopping = backend.stopping))
       }
       env.rpcEnv.awaitTermination()
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to