This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a4391364dd24 [SPARK-50247][CORE] Define
`BLOCK_MANAGER_REREGISTRATION_FAILED` as `ExecutorExitCode`
a4391364dd24 is described below
commit a4391364dd240b82bb0980732b6ff0473b299bd4
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Thu Nov 7 10:38:09 2024 +0800
[SPARK-50247][CORE] Define `BLOCK_MANAGER_REREGISTRATION_FAILED` as
`ExecutorExitCode`
### What changes were proposed in this pull request?
This PR aims to define a new error code,
`BLOCK_MANAGER_REREGISTRATION_FAILED`, as an `ExecutorExitCode` officially from
Apache Spark 4.0, like the existing `HEARTBEAT_FAILURE`.
https://github.com/apache/spark/blob/0cb7a42731076b9960eb9ad27067cab7ae570356/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala#L46
### Why are the changes needed?
Until Spark 3, the Spark executor fails with exit code `-1`, as shown below,
without providing a way to handle this specific error.
https://github.com/apache/spark/blob/0cb7a42731076b9960eb9ad27067cab7ae570356/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L673-L674
### Does this PR introduce _any_ user-facing change?
Yes. A dedicated exit code is now reported, allowing users to handle this
executor failure reason properly.
### How was this patch tested?
Passes with the newly added test cases.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #48776 from dongjoon-hyun/SPARK-50247.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
.../apache/spark/executor/ExecutorExitCode.scala | 5 +++
.../cluster/StandaloneSchedulerBackend.scala | 3 ++
.../org/apache/spark/storage/BlockManager.scala | 4 +--
.../scala/org/apache/spark/SparkContextSuite.scala | 37 ++++++++++++++++++++++
4 files changed, 47 insertions(+), 2 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
index 5300598ef53e..7f0be5c1b704 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
@@ -49,6 +49,9 @@ object ExecutorExitCode {
* TaskReaper. */
val KILLED_BY_TASK_REAPER = 57
+ /** Executor is unable to re-register BlockManager. */
+ val BLOCK_MANAGER_REREGISTRATION_FAILED = 58
+
def explainExitCode(exitCode: Int): String = {
exitCode match {
case UNCAUGHT_EXCEPTION => "Uncaught exception"
@@ -63,6 +66,8 @@ object ExecutorExitCode {
"ExternalBlockStore failed to create a local temporary directory."
case HEARTBEAT_FAILURE =>
"Unable to send heartbeats to driver."
+ case BLOCK_MANAGER_REREGISTRATION_FAILED =>
+ "Executor killed due to a failure of block manager re-registration."
case KILLED_BY_TASK_REAPER =>
"Executor killed by TaskReaper."
case _ =>
diff --git
a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index f4caecd7d674..eb408a95589f 100644
---
a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++
b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -186,6 +186,9 @@ private[spark] class StandaloneSchedulerBackend(
val reason: ExecutorLossReason = exitStatus match {
case Some(ExecutorExitCode.HEARTBEAT_FAILURE) =>
ExecutorExited(ExecutorExitCode.HEARTBEAT_FAILURE, exitCausedByApp =
false, message)
+ case Some(ExecutorExitCode.BLOCK_MANAGER_REREGISTRATION_FAILED) =>
+ ExecutorExited(ExecutorExitCode.BLOCK_MANAGER_REREGISTRATION_FAILED,
+ exitCausedByApp = false, message)
case Some(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) =>
ExecutorExited(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR,
exitCausedByApp = false, message)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a15b9f91cf26..06857dff99c1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -40,7 +40,7 @@ import org.apache.commons.io.IOUtils
import org.apache.spark._
import org.apache.spark.errors.SparkCoreErrors
-import org.apache.spark.executor.DataReadMethod
+import org.apache.spark.executor.{DataReadMethod, ExecutorExitCode}
import org.apache.spark.internal.{config, Logging, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.internal.config.{Network,
RDD_CACHE_VISIBILITY_TRACKING_ENABLED, Tests}
@@ -671,7 +671,7 @@ private[spark] class BlockManager(
reportAllBlocks()
} else {
logError("Exiting executor due to block manager re-registration failure")
- System.exit(-1)
+ System.exit(ExecutorExitCode.BLOCK_MANAGER_REREGISTRATION_FAILED)
}
}
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 44b2da603a1f..dd42549e46d9 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -1423,6 +1423,43 @@ class SparkContextSuite extends SparkFunSuite with
LocalSparkContext with Eventu
sc = new SparkContext(conf)
sc.stop()
}
+
+ test("SPARK-50247: BLOCK_MANAGER_REREGISTRATION_FAILED should be counted as
network failure") {
+ // This test case follows the test structure of HEARTBEAT_FAILURE error
code (SPARK-39957)
+ val conf = new SparkConf().set(TASK_MAX_FAILURES, 1)
+ val sc = new SparkContext("local-cluster[1, 1, 1024]", "test-exit-code",
conf)
+ val result = sc.parallelize(1 to 10, 1).map { x =>
+ val context = org.apache.spark.TaskContext.get()
+ if (context.taskAttemptId() == 0) {
+ System.exit(ExecutorExitCode.BLOCK_MANAGER_REREGISTRATION_FAILED)
+ } else {
+ x
+ }
+ }.count()
+ assert(result == 10L)
+ sc.stop()
+ }
+
+ test("SPARK-50247: BLOCK_MANAGER_REREGISTRATION_FAILED will be counted as
task failure when " +
+ "EXECUTOR_REMOVE_DELAY is disabled") {
+ // This test case follows the test structure of HEARTBEAT_FAILURE error
code (SPARK-39957)
+ val conf = new SparkConf().set(TASK_MAX_FAILURES,
1).set(EXECUTOR_REMOVE_DELAY.key, "0s")
+ val sc = new SparkContext("local-cluster[1, 1, 1024]", "test-exit-code",
conf)
+ eventually(timeout(30.seconds), interval(1.seconds)) {
+ val e = intercept[SparkException] {
+ sc.parallelize(1 to 10, 1).map { x =>
+ val context = org.apache.spark.TaskContext.get()
+ if (context.taskAttemptId() == 0) {
+ System.exit(ExecutorExitCode.BLOCK_MANAGER_REREGISTRATION_FAILED)
+ } else {
+ x
+ }
+ }.count()
+ }
+ assert(e.getMessage.contains("Remote RPC client disassociated"))
+ }
+ sc.stop()
+ }
}
object SparkContextSuite {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]