This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new aaba0e0eb72 [SPARK-40320][CORE] Executor should exit when 
initialization failed for fatal error
aaba0e0eb72 is described below

commit aaba0e0eb72dd6d211dfae2a61255cd88dad0169
Author: chenliang.lu <[email protected]>
AuthorDate: Wed Oct 26 01:44:37 2022 -0500

    [SPARK-40320][CORE] Executor should exit when initialization failed for 
fatal error
    
    ### What changes were proposed in this pull request?
    Before when the Executor plugin fails to initialize for fatal error, the 
Executor shows active in sparkUI but does not accept any job. As expected 
Executor should exit when initialization failed with fatal error.
    The root cause (thanks for help mridulm and Ngone51 ):  The thread pool 
execution uses the inappropriate API `submit` instead of `execute`. This leads 
to  `SparkUncaughtExceptionHandler` can't catch the fatal error.
    
    ### Why are the changes needed?
    As expected Executor should exit when initialization failed with fatal 
error. Before Executor was active but can't do anything which is confused.
    
    ### Does this PR introduce any user-facing change?
    No
    ### How was this patch tested?
    new added test
    
    Closes #37779 from yabola/executor.
    
    Authored-by: chenliang.lu <[email protected]>
    Signed-off-by: Mridul <mridul<at>gmail.com>
---
 .../org/apache/spark/rpc/netty/MessageLoop.scala   |  6 ++-
 .../CoarseGrainedExecutorBackendSuite.scala        | 59 +++++++++++++++++++++-
 2 files changed, 63 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala 
b/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala
index c985c72f2ad..df7cd0b44c9 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala
@@ -171,7 +171,11 @@ private class DedicatedMessageLoop(
   }
 
   (1 to endpoint.threadCount()).foreach { _ =>
-    threadpool.submit(receiveLoopRunnable)
+    /**
+     * We need to be careful not to use [[ExecutorService#submit]].
+     * `submit` api will swallow uncaught exceptions in 
[[FutureTask#setException]].
+     * */
+    threadpool.execute(receiveLoopRunnable)
   }
 
   // Mark active to handle the OnStart message.
diff --git 
a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
 
b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
index a8b1304b76f..64789ca94e0 100644
--- 
a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -21,6 +21,7 @@ import java.io.File
 import java.nio.ByteBuffer
 import java.util.Properties
 import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.concurrent.TrieMap
 import scala.collection.mutable
@@ -36,11 +37,13 @@ import org.scalatestplus.mockito.MockitoSugar
 
 import org.apache.spark._
 import org.apache.spark.TestUtils._
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, 
PluginContext, SparkPlugin}
+import org.apache.spark.internal.config.PLUGINS
 import org.apache.spark.resource._
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.scheduler.TaskDescription
+import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, 
SparkListenerExecutorRemoved, TaskDescription}
 import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{KillTask, 
LaunchTask}
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.util.{SerializableBuffer, ThreadUtils, Utils}
@@ -535,6 +538,39 @@ class CoarseGrainedExecutorBackendSuite extends 
SparkFunSuite
     }
   }
 
+  /**
+   * A fatal error occurred when [[Executor]] was initialized, this should be 
caught by
+   * [[SparkUncaughtExceptionHandler]] and [[Executor]] can exit by itself.
+   */
+  test("SPARK-40320 Executor should exit when initialization failed for fatal 
error") {
+    val conf = new SparkConf()
+      .setMaster("local-cluster[1, 1, 1024]")
+      .set(PLUGINS, Seq(classOf[TestFatalErrorPlugin].getName))
+      .setAppName("test")
+    sc = new SparkContext(conf)
+    val executorAddCounter = new AtomicInteger(0)
+    val executorRemovedCounter = new AtomicInteger(0)
+
+    val listener = new SparkListener() {
+      override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): 
Unit = {
+        executorAddCounter.getAndIncrement()
+      }
+
+      override def onExecutorRemoved(executorRemoved: 
SparkListenerExecutorRemoved): Unit = {
+        executorRemovedCounter.getAndIncrement()
+      }
+    }
+    try {
+      sc.addSparkListener(listener)
+      eventually(timeout(15.seconds)) {
+        assert(executorAddCounter.get() >= 2)
+        assert(executorRemovedCounter.get() >= 2)
+      }
+    } finally {
+      sc.removeSparkListener(listener)
+    }
+  }
+
   private def createMockEnv(conf: SparkConf, serializer: JavaSerializer,
       rpcEnv: Option[RpcEnv] = None): SparkEnv = {
     val mockEnv = mock[SparkEnv]
@@ -547,3 +583,24 @@ class CoarseGrainedExecutorBackendSuite extends 
SparkFunSuite
     mockEnv
   }
 }
+
+private class TestFatalErrorPlugin extends SparkPlugin {
+  override def driverPlugin(): DriverPlugin = new TestDriverPlugin()
+
+  override def executorPlugin(): ExecutorPlugin = new TestErrorExecutorPlugin()
+}
+
+private class TestDriverPlugin extends DriverPlugin {
+}
+
+private class TestErrorExecutorPlugin extends ExecutorPlugin {
+
+  override def init(_ctx: PluginContext, extraConf: java.util.Map[String, 
String]): Unit = {
+    // scalastyle:off throwerror
+    /**
+     * A fatal error. See nonFatal definition in [[NonFatal]].
+     */
+    throw new UnsatisfiedLinkError("Mock throws fatal error.")
+    // scalastyle:on throwerror
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to