This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new aaba0e0eb72 [SPARK-40320][CORE] Executor should exit when
initialization failed for fatal error
aaba0e0eb72 is described below
commit aaba0e0eb72dd6d211dfae2a61255cd88dad0169
Author: chenliang.lu <[email protected]>
AuthorDate: Wed Oct 26 01:44:37 2022 -0500
[SPARK-40320][CORE] Executor should exit when initialization failed for
fatal error
### What changes were proposed in this pull request?
Previously, when the Executor plugin failed to initialize due to a fatal error, the
Executor showed as active in the Spark UI but did not accept any jobs. As expected,
the Executor should exit when initialization fails with a fatal error.
The root cause (thanks to mridulm and Ngone51 for their help): the thread pool
uses the inappropriate API `submit` instead of `execute`. As a result,
`SparkUncaughtExceptionHandler` cannot catch the fatal error.
### Why are the changes needed?
As expected, the Executor should exit when initialization fails with a fatal
error. Previously, the Executor remained active but could not do anything, which was confusing.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Newly added test.
Closes #37779 from yabola/executor.
Authored-by: chenliang.lu <[email protected]>
Signed-off-by: Mridul <mridul<at>gmail.com>
---
.../org/apache/spark/rpc/netty/MessageLoop.scala | 6 ++-
.../CoarseGrainedExecutorBackendSuite.scala | 59 +++++++++++++++++++++-
2 files changed, 63 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala
b/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala
index c985c72f2ad..df7cd0b44c9 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala
@@ -171,7 +171,11 @@ private class DedicatedMessageLoop(
}
(1 to endpoint.threadCount()).foreach { _ =>
- threadpool.submit(receiveLoopRunnable)
+ /**
+ * We need to be careful not to use [[ExecutorService#submit]].
+ * `submit` api will swallow uncaught exceptions in
[[FutureTask#setException]].
+ * */
+ threadpool.execute(receiveLoopRunnable)
}
// Mark active to handle the OnStart message.
diff --git
a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
index a8b1304b76f..64789ca94e0 100644
---
a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
+++
b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -21,6 +21,7 @@ import java.io.File
import java.nio.ByteBuffer
import java.util.Properties
import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap
import scala.collection.mutable
@@ -36,11 +37,13 @@ import org.scalatestplus.mockito.MockitoSugar
import org.apache.spark._
import org.apache.spark.TestUtils._
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin,
PluginContext, SparkPlugin}
+import org.apache.spark.internal.config.PLUGINS
import org.apache.spark.resource._
import org.apache.spark.resource.ResourceUtils._
import org.apache.spark.resource.TestResourceIDs._
import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.scheduler.TaskDescription
+import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded,
SparkListenerExecutorRemoved, TaskDescription}
import
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{KillTask,
LaunchTask}
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.util.{SerializableBuffer, ThreadUtils, Utils}
@@ -535,6 +538,39 @@ class CoarseGrainedExecutorBackendSuite extends
SparkFunSuite
}
}
+ /**
+ * A fatal error occurred when [[Executor]] was initialized, this should be
caught by
+ * [[SparkUncaughtExceptionHandler]] and [[Executor]] can exit by itself.
+ */
+ test("SPARK-40320 Executor should exit when initialization failed for fatal
error") {
+ val conf = new SparkConf()
+ .setMaster("local-cluster[1, 1, 1024]")
+ .set(PLUGINS, Seq(classOf[TestFatalErrorPlugin].getName))
+ .setAppName("test")
+ sc = new SparkContext(conf)
+ val executorAddCounter = new AtomicInteger(0)
+ val executorRemovedCounter = new AtomicInteger(0)
+
+ val listener = new SparkListener() {
+ override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded):
Unit = {
+ executorAddCounter.getAndIncrement()
+ }
+
+ override def onExecutorRemoved(executorRemoved:
SparkListenerExecutorRemoved): Unit = {
+ executorRemovedCounter.getAndIncrement()
+ }
+ }
+ try {
+ sc.addSparkListener(listener)
+ eventually(timeout(15.seconds)) {
+ assert(executorAddCounter.get() >= 2)
+ assert(executorRemovedCounter.get() >= 2)
+ }
+ } finally {
+ sc.removeSparkListener(listener)
+ }
+ }
+
private def createMockEnv(conf: SparkConf, serializer: JavaSerializer,
rpcEnv: Option[RpcEnv] = None): SparkEnv = {
val mockEnv = mock[SparkEnv]
@@ -547,3 +583,24 @@ class CoarseGrainedExecutorBackendSuite extends
SparkFunSuite
mockEnv
}
}
+
+private class TestFatalErrorPlugin extends SparkPlugin {
+ override def driverPlugin(): DriverPlugin = new TestDriverPlugin()
+
+ override def executorPlugin(): ExecutorPlugin = new TestErrorExecutorPlugin()
+}
+
+private class TestDriverPlugin extends DriverPlugin {
+}
+
+private class TestErrorExecutorPlugin extends ExecutorPlugin {
+
+ override def init(_ctx: PluginContext, extraConf: java.util.Map[String,
String]): Unit = {
+ // scalastyle:off throwerror
+ /**
+ * A fatal error. See nonFatal definition in [[NonFatal]].
+ */
+ throw new UnsatisfiedLinkError("Mock throws fatal error.")
+ // scalastyle:on throwerror
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]