This is an automated email from the ASF dual-hosted git repository.
joshrosen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 54587638685b [SPARK-48541][CORE] Add a new exit code for executors killed by TaskReaper
54587638685b is described below
commit 54587638685bd633cb3840a23afd5a809d796d47
Author: Bo Zhang <[email protected]>
AuthorDate: Wed Jun 19 11:03:24 2024 -0700
[SPARK-48541][CORE] Add a new exit code for executors killed by TaskReaper
### What changes were proposed in this pull request?
This change adds a new exit code, 57, for executors killed by TaskReaper.
### Why are the changes needed?
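Previously, an executor killed by TaskReaper exited through the uncaught exception handler with the generic uncaught-exception exit code. That made these kills indistinguishable from other uncaught failures. A dedicated exit code makes it easier to monitor cases where executors are killed by TaskReaper.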
### Does this PR introduce _any_ user-facing change?
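Yes. When the uncaught exception handler is configured to exit, executors killed by TaskReaper now exit with code 57 (`ExecutorExitCode.KILLED_BY_TASK_REAPER`). Previously they exited with the generic uncaught-exception exit code (`SparkExitCode.UNCAUGHT_EXCEPTION`).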
### How was this patch tested?
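Updated unit tests:
- `JobCancellationSuite` now verifies that executors killed by TaskReaper are reported as removed with exit code 57.
- `SparkUncaughtExceptionHandlerSuite` now covers `KilledByTaskReaperException`.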
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #46883 from bozhang2820/spark-48541.
Authored-by: Bo Zhang <[email protected]>
Signed-off-by: Josh Rosen <[email protected]>
---
.../main/scala/org/apache/spark/executor/Executor.scala | 7 ++++---
.../org/apache/spark/executor/ExecutorExitCode.scala | 6 ++++++
.../spark/util/SparkUncaughtExceptionHandler.scala | 3 +++
.../scala/org/apache/spark/JobCancellationSuite.scala | 17 +++++++++++++++--
.../spark/util/SparkUncaughtExceptionHandlerSuite.scala | 5 +++++
5 files changed, 33 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 4e5d151468d8..7317d3c47c08 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -1041,9 +1041,8 @@ private[spark] class Executor(
} else {
// In non-local-mode, the exception thrown here will bubble up to
the uncaught exception
// handler and cause the executor JVM to exit.
- throw SparkException.internalError(
- s"Killing executor JVM because killed task $taskId could not be
stopped within " +
- s"$killTimeoutMs ms.", category = "EXECUTOR")
+ throw new KilledByTaskReaperException(s"Killing executor JVM
because killed task " +
+ s"$taskId could not be stopped within $killTimeoutMs ms.")
}
}
} finally {
@@ -1328,3 +1327,5 @@ private[spark] object Executor {
}
}
}
+
+class KilledByTaskReaperException(message: String) extends
SparkException(message)
diff --git
a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
index 99858f785600..5300598ef53e 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
@@ -45,6 +45,10 @@ object ExecutorExitCode {
*/
val HEARTBEAT_FAILURE = 56
+ /** The default uncaught exception handler was reached and the exception was
thrown by
+ * TaskReaper. */
+ val KILLED_BY_TASK_REAPER = 57
+
def explainExitCode(exitCode: Int): String = {
exitCode match {
case UNCAUGHT_EXCEPTION => "Uncaught exception"
@@ -59,6 +63,8 @@ object ExecutorExitCode {
"ExternalBlockStore failed to create a local temporary directory."
case HEARTBEAT_FAILURE =>
"Unable to send heartbeats to driver."
+ case KILLED_BY_TASK_REAPER =>
+ "Executor killed by TaskReaper."
case _ =>
"Unknown executor exit code (" + exitCode + ")" + (
if (exitCode > 128) {
diff --git
a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
index 730b762a3948..c1ea4f929101 100644
---
a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
+++
b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
@@ -17,6 +17,7 @@
package org.apache.spark.util
+import org.apache.spark.executor.{ExecutorExitCode,
KilledByTaskReaperException}
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.THREAD
@@ -56,6 +57,8 @@ private[spark] class SparkUncaughtExceptionHandler(val
exitOnUncaughtException:
// SPARK-24294: This is defensive code, in case that
SparkFatalException is
// misused and uncaught.
System.exit(SparkExitCode.OOM)
+ case _: KilledByTaskReaperException if exitOnUncaughtException =>
+ System.exit(ExecutorExitCode.KILLED_BY_TASK_REAPER)
case _ if exitOnUncaughtException =>
System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
case _ =>
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index c15fdf098bb5..58cf14e969e5 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark
import java.util.concurrent.{Semaphore, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
+import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{ExecutionContext, Future}
// scalastyle:off executioncontextglobal
import scala.concurrent.ExecutionContext.Implicits.global
@@ -29,9 +30,10 @@ import scala.concurrent.duration._
import org.scalatest.BeforeAndAfter
import org.scalatest.matchers.must.Matchers
+import org.apache.spark.executor.ExecutorExitCode
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Deploy._
-import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd,
SparkListenerJobStart, SparkListenerStageCompleted, SparkListenerTaskEnd,
SparkListenerTaskStart}
+import org.apache.spark.scheduler.{SparkListener,
SparkListenerExecutorRemoved, SparkListenerJobEnd, SparkListenerJobStart,
SparkListenerStageCompleted, SparkListenerTaskEnd, SparkListenerTaskStart}
import org.apache.spark.util.ThreadUtils
/**
@@ -429,12 +431,20 @@ class JobCancellationSuite extends SparkFunSuite with
Matchers with BeforeAndAft
.set(TASK_REAPER_KILL_TIMEOUT.key, "5s")
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
- // Add a listener to release the semaphore once any tasks are launched.
+ // Add a listener to release a semaphore once any tasks are launched, and
another semaphore
+ // once an executor is removed.
val sem = new Semaphore(0)
+ val semExec = new Semaphore(0)
+ val execLossReason = new ArrayBuffer[String]()
sc.addSparkListener(new SparkListener {
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
sem.release()
}
+
+ override def onExecutorRemoved(executorRemoved:
SparkListenerExecutorRemoved): Unit = {
+ execLossReason += executorRemoved.reason
+ semExec.release()
+ }
})
// jobA is the one to be cancelled.
@@ -455,6 +465,9 @@ class JobCancellationSuite extends SparkFunSuite with
Matchers with BeforeAndAft
sc.cancelJobGroup("jobA")
val e = intercept[SparkException] { ThreadUtils.awaitResult(jobA,
15.seconds) }.getCause
assert(e.getMessage contains "cancel")
+ semExec.acquire(2)
+ val expectedReason = s"Command exited with code
${ExecutorExitCode.KILLED_BY_TASK_REAPER}"
+ assert(execLossReason == Seq(expectedReason, expectedReason))
// Once A is cancelled, job B should finish fairly quickly.
assert(ThreadUtils.awaitResult(jobB, 1.minute) === 100)
diff --git
a/core/src/test/scala/org/apache/spark/util/SparkUncaughtExceptionHandlerSuite.scala
b/core/src/test/scala/org/apache/spark/util/SparkUncaughtExceptionHandlerSuite.scala
index 9e23b25493df..484340966155 100644
---
a/core/src/test/scala/org/apache/spark/util/SparkUncaughtExceptionHandlerSuite.scala
+++
b/core/src/test/scala/org/apache/spark/util/SparkUncaughtExceptionHandlerSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
import scala.util.Try
import org.apache.spark.SparkFunSuite
+import org.apache.spark.executor.{ExecutorExitCode,
KilledByTaskReaperException}
class SparkUncaughtExceptionHandlerSuite extends SparkFunSuite {
@@ -33,6 +34,8 @@ class SparkUncaughtExceptionHandlerSuite extends
SparkFunSuite {
(ThrowableTypes.RuntimeException, false, 0),
(ThrowableTypes.OutOfMemoryError, true, SparkExitCode.OOM),
(ThrowableTypes.OutOfMemoryError, false, SparkExitCode.OOM),
+ (ThrowableTypes.KilledByTaskReaperException, true,
ExecutorExitCode.KILLED_BY_TASK_REAPER),
+ (ThrowableTypes.KilledByTaskReaperException, false, 0),
(ThrowableTypes.SparkFatalRuntimeException, true,
SparkExitCode.UNCAUGHT_EXCEPTION),
(ThrowableTypes.SparkFatalRuntimeException, false, 0),
(ThrowableTypes.SparkFatalOutOfMemoryError, true, SparkExitCode.OOM),
@@ -64,6 +67,8 @@ object ThrowableTypes extends Enumeration {
val RuntimeException = ThrowableTypesVal("RuntimeException", new
RuntimeException)
val OutOfMemoryError = ThrowableTypesVal("OutOfMemoryError", new
OutOfMemoryError)
+ val KilledByTaskReaperException =
ThrowableTypesVal("KilledByTaskReaperException",
+ new KilledByTaskReaperException("dummy message"))
val SparkFatalRuntimeException =
ThrowableTypesVal("SparkFatalException(RuntimeException)",
new SparkFatalException(new RuntimeException))
val SparkFatalOutOfMemoryError =
ThrowableTypesVal("SparkFatalException(OutOfMemoryError)",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]