This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a4391364dd24 [SPARK-50247][CORE] Define 
`BLOCK_MANAGER_REREGISTRATION_FAILED` as `ExecutorExitCode`
a4391364dd24 is described below

commit a4391364dd240b82bb0980732b6ff0473b299bd4
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Thu Nov 7 10:38:09 2024 +0800

    [SPARK-50247][CORE] Define `BLOCK_MANAGER_REREGISTRATION_FAILED` as 
`ExecutorExitCode`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to define a new exit code, 
`BLOCK_MANAGER_REREGISTRATION_FAILED`, in `ExecutorExitCode` officially from 
Apache Spark 4.0, like the existing `HEARTBEAT_FAILURE`.
    
    
https://github.com/apache/spark/blob/0cb7a42731076b9960eb9ad27067cab7ae570356/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala#L46
    
    ### Why are the changes needed?
    
    Until Spark 3, a Spark executor exits with `-1` like the following, without 
providing a way to handle this specific error.
    
    
https://github.com/apache/spark/blob/0cb7a42731076b9960eb9ad27067cab7ae570356/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L673-L674
    
    ### Does this PR introduce _any_ user-facing change?
    
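    Yes. When block manager re-registration fails, the executor now exits with 
`ExecutorExitCode.BLOCK_MANAGER_REREGISTRATION_FAILED` (58) instead of `-1`, 
so that this executor failure reason can be handled properly.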
    
    ### How was this patch tested?
    
    Verified by the newly added test cases in `SparkContextSuite`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #48776 from dongjoon-hyun/SPARK-50247.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Kent Yao <[email protected]>
---
 .../apache/spark/executor/ExecutorExitCode.scala   |  5 +++
 .../cluster/StandaloneSchedulerBackend.scala       |  3 ++
 .../org/apache/spark/storage/BlockManager.scala    |  4 +--
 .../scala/org/apache/spark/SparkContextSuite.scala | 37 ++++++++++++++++++++++
 4 files changed, 47 insertions(+), 2 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala 
b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
index 5300598ef53e..7f0be5c1b704 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
@@ -49,6 +49,9 @@ object ExecutorExitCode {
    * TaskReaper. */
   val KILLED_BY_TASK_REAPER = 57
 
+  /** Executor is unable to re-register BlockManager. */
+  val BLOCK_MANAGER_REREGISTRATION_FAILED = 58
+
   def explainExitCode(exitCode: Int): String = {
     exitCode match {
       case UNCAUGHT_EXCEPTION => "Uncaught exception"
@@ -63,6 +66,8 @@ object ExecutorExitCode {
         "ExternalBlockStore failed to create a local temporary directory."
       case HEARTBEAT_FAILURE =>
         "Unable to send heartbeats to driver."
+      case BLOCK_MANAGER_REREGISTRATION_FAILED =>
+        "Executor killed due to a failure of block manager re-registration."
       case KILLED_BY_TASK_REAPER =>
         "Executor killed by TaskReaper."
       case _ =>
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index f4caecd7d674..eb408a95589f 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -186,6 +186,9 @@ private[spark] class StandaloneSchedulerBackend(
     val reason: ExecutorLossReason = exitStatus match {
       case Some(ExecutorExitCode.HEARTBEAT_FAILURE) =>
         ExecutorExited(ExecutorExitCode.HEARTBEAT_FAILURE, exitCausedByApp = 
false, message)
+      case Some(ExecutorExitCode.BLOCK_MANAGER_REREGISTRATION_FAILED) =>
+        ExecutorExited(ExecutorExitCode.BLOCK_MANAGER_REREGISTRATION_FAILED,
+          exitCausedByApp = false, message)
       case Some(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) =>
         ExecutorExited(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR,
           exitCausedByApp = false, message)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a15b9f91cf26..06857dff99c1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -40,7 +40,7 @@ import org.apache.commons.io.IOUtils
 
 import org.apache.spark._
 import org.apache.spark.errors.SparkCoreErrors
-import org.apache.spark.executor.DataReadMethod
+import org.apache.spark.executor.{DataReadMethod, ExecutorExitCode}
 import org.apache.spark.internal.{config, Logging, MDC}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.internal.config.{Network, 
RDD_CACHE_VISIBILITY_TRACKING_ENABLED, Tests}
@@ -671,7 +671,7 @@ private[spark] class BlockManager(
       reportAllBlocks()
     } else {
       logError("Exiting executor due to block manager re-registration failure")
-      System.exit(-1)
+      System.exit(ExecutorExitCode.BLOCK_MANAGER_REREGISTRATION_FAILED)
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 44b2da603a1f..dd42549e46d9 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -1423,6 +1423,43 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
     sc = new SparkContext(conf)
     sc.stop()
   }
+
+  test("SPARK-50247: BLOCK_MANAGER_REREGISTRATION_FAILED should be counted as 
network failure") {
+    // This test case follows the test structure of HEARTBEAT_FAILURE error 
code (SPARK-39957)
+    val conf = new SparkConf().set(TASK_MAX_FAILURES, 1)
+    val sc = new SparkContext("local-cluster[1, 1, 1024]", "test-exit-code", 
conf)
+    val result = sc.parallelize(1 to 10, 1).map { x =>
+      val context = org.apache.spark.TaskContext.get()
+      if (context.taskAttemptId() == 0) {
+        System.exit(ExecutorExitCode.BLOCK_MANAGER_REREGISTRATION_FAILED)
+      } else {
+        x
+      }
+    }.count()
+    assert(result == 10L)
+    sc.stop()
+  }
+
+  test("SPARK-50247: BLOCK_MANAGER_REREGISTRATION_FAILED will be counted as 
task failure when " +
+    "EXECUTOR_REMOVE_DELAY is disabled") {
+    // This test case follows the test structure of HEARTBEAT_FAILURE error 
code (SPARK-39957)
+    val conf = new SparkConf().set(TASK_MAX_FAILURES, 
1).set(EXECUTOR_REMOVE_DELAY.key, "0s")
+    val sc = new SparkContext("local-cluster[1, 1, 1024]", "test-exit-code", 
conf)
+    eventually(timeout(30.seconds), interval(1.seconds)) {
+      val e = intercept[SparkException] {
+        sc.parallelize(1 to 10, 1).map { x =>
+          val context = org.apache.spark.TaskContext.get()
+          if (context.taskAttemptId() == 0) {
+            System.exit(ExecutorExitCode.BLOCK_MANAGER_REREGISTRATION_FAILED)
+          } else {
+            x
+          }
+        }.count()
+      }
+      assert(e.getMessage.contains("Remote RPC client disassociated"))
+    }
+    sc.stop()
+  }
 }
 
 object SparkContextSuite {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to