This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 3350b12ce [CELEBORN-2166] Fast fail reduce stage if shuffle data is 
lost because of worker lost
3350b12ce is described below

commit 3350b12ce0a3f3c38214b3e699351d726ad8276e
Author: Sanskar Modi <[email protected]>
AuthorDate: Wed Oct 29 10:04:03 2025 +0800

    [CELEBORN-2166] Fast fail reduce stage if shuffle data is lost because of 
worker lost
    
    - Fix the WorkerStatusTracker logic, so unknown workers are marked 
correctly in excluded workers.
    - Trigger shuffle data lost if the worker hosting the shuffle data is lost.
    
    This can be extended to –
    - fast fail mapper stages as well before the commit starts.
    - with push replicate enabled with multiple workers loss.
    
    Currently even if worker crashs or became unavailable for some reason and 
marked as lost by Master, reduce stage still try to read data from it and fail 
after running for sometime which is in-efficient. We can detect this early and 
fail the reduce stage with SHUFFLE_DATA_LOST before starting the stage.
    
    NA
    
    WIP
    
    Closes #3496 from s0nskar/CELEBORN-2166.
    
    Authored-by: Sanskar Modi <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
    (cherry picked from commit 1157d6a8c11966a2b02d0ab1a1f3501174421962)
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../org/apache/celeborn/client/LifecycleManager.scala   |  4 ++--
 .../apache/celeborn/client/WorkerStatusTracker.scala    |  6 +++---
 .../client/commit/ReducePartitionCommitHandler.scala    | 16 +++++++++++++++-
 .../celeborn/client/WorkerStatusTrackerSuite.scala      | 17 ++++++++++++-----
 .../scala/org/apache/celeborn/common/CelebornConf.scala |  9 +++++++++
 docs/configuration/client.md                            |  1 +
 6 files changed, 42 insertions(+), 11 deletions(-)

diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index ab04b319a..e8dc9ddee 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -590,7 +590,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
                 e)
               connectFailedWorkers.put(
                 workerInfo,
-                (StatusCode.WORKER_UNKNOWN, System.currentTimeMillis()))
+                (StatusCode.WORKER_UNRESPONSIVE, System.currentTimeMillis()))
           }
           iter.remove()
         }
@@ -608,7 +608,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
         logError(s"Init rpc client failed for $shuffleId on $workerInfo during 
reserve slots, reason: Timeout.")
         connectFailedWorkers.put(
           workerInfo,
-          (StatusCode.WORKER_UNKNOWN, System.currentTimeMillis()))
+          (StatusCode.WORKER_UNRESPONSIVE, System.currentTimeMillis()))
       }
     }
   }
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala 
b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
index 698032014..2e94e6cb4 100644
--- a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
@@ -157,8 +157,7 @@ class WorkerStatusTracker(
       excludedWorkers.asScala.foreach {
         case (workerInfo: WorkerInfo, (statusCode, registerTime)) =>
           statusCode match {
-            case StatusCode.WORKER_UNKNOWN |
-                StatusCode.WORKER_UNRESPONSIVE |
+            case StatusCode.WORKER_UNRESPONSIVE |
                 StatusCode.COMMIT_FILE_EXCEPTION |
                 StatusCode.NO_AVAILABLE_WORKING_DIR |
                 StatusCode.RESERVE_SLOTS_FAILED |
@@ -185,7 +184,8 @@ class WorkerStatusTracker(
         }
       }
       for (worker <- res.unknownWorkers.asScala) {
-        if (!excludedWorkers.containsKey(worker)) {
+        if (!excludedWorkers.containsKey(worker) || excludedWorkers.get(
+            worker)._1 != StatusCode.WORKER_UNKNOWN) {
           excludedWorkers.put(worker, (StatusCode.WORKER_UNKNOWN, current))
           statusChanged = true
         }
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
 
b/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
index 41f8e7ab0..7fe781da6 100644
--- 
a/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
+++ 
b/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
@@ -134,10 +134,24 @@ class ReducePartitionCommitHandler(
     if (mockShuffleLost) {
       mockShuffleLostShuffle == shuffleId
     } else {
-      dataLostShuffleSet.contains(shuffleId)
+      dataLostShuffleSet.contains(shuffleId) || 
isStageDataLostInUnknownWorker(shuffleId)
     }
   }
 
+  private def isStageDataLostInUnknownWorker(shuffleId: Int): Boolean = {
+    if (conf.clientShuffleDataLostOnUnknownWorkerEnabled && 
!conf.clientPushReplicateEnabled) {
+      val allocatedWorkers = shuffleAllocatedWorkers.get(shuffleId)
+      if (allocatedWorkers != null) {
+        return workerStatusTracker.excludedWorkers.asScala.collect {
+          case (workerId, (status, _))
+              if status == StatusCode.WORKER_UNKNOWN && 
allocatedWorkers.contains(workerId) =>
+            workerId
+        }.nonEmpty
+      }
+    }
+    false
+  }
+
   override def isPartitionInProcess(shuffleId: Int, partitionId: Int): Boolean 
= {
     isStageEndOrInProcess(shuffleId)
   }
diff --git 
a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
 
b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
index efaeb8439..0355b4d86 100644
--- 
a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
+++ 
b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
@@ -36,7 +36,7 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite {
     val statusTracker = new WorkerStatusTracker(celebornConf, null)
 
     val registerTime = System.currentTimeMillis()
-    statusTracker.excludedWorkers.put(mock("host1"), 
(StatusCode.WORKER_UNKNOWN, registerTime))
+    statusTracker.excludedWorkers.put(mock("host1"), 
(StatusCode.WORKER_UNRESPONSIVE, registerTime))
     statusTracker.excludedWorkers.put(mock("host2"), 
(StatusCode.WORKER_SHUTDOWN, registerTime))
 
     // test reserve (only statusCode list in handleHeartbeatResponse)
@@ -46,7 +46,7 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite {
     // only reserve host1
     Assert.assertEquals(
       statusTracker.excludedWorkers.get(mock("host1")),
-      (StatusCode.WORKER_UNKNOWN, registerTime))
+      (StatusCode.WORKER_UNRESPONSIVE, registerTime))
     
Assert.assertFalse(statusTracker.excludedWorkers.containsKey(mock("host2")))
 
     // add shutdown/excluded worker
@@ -55,13 +55,20 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite {
     statusTracker.handleHeartbeatResponse(response1)
 
     // test keep Unknown register time
+    Assert.assertTrue(statusTracker.excludedWorkers.containsKey(mock("host1")))
     Assert.assertEquals(
-      statusTracker.excludedWorkers.get(mock("host1")),
-      (StatusCode.WORKER_UNKNOWN, registerTime))
+      statusTracker.excludedWorkers.get(mock("host1"))._1,
+      StatusCode.WORKER_UNKNOWN)
+    Assert.assertTrue(statusTracker.excludedWorkers.containsKey(mock("host3")))
+    Assert.assertEquals(
+      statusTracker.excludedWorkers.get(mock("host3"))._1,
+      StatusCode.WORKER_UNKNOWN)
 
     // test new added shutdown/excluded workers
     Assert.assertTrue(statusTracker.excludedWorkers.containsKey(mock("host0")))
-    Assert.assertTrue(statusTracker.excludedWorkers.containsKey(mock("host3")))
+    Assert.assertEquals(
+      statusTracker.excludedWorkers.get(mock("host0"))._1,
+      StatusCode.WORKER_EXCLUDED)
     
Assert.assertTrue(!statusTracker.excludedWorkers.containsKey(mock("host4")))
     Assert.assertTrue(statusTracker.shuttingWorkers.contains(mock("host4")))
 
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 666e3ff4f..7b8cf19e8 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1093,6 +1093,8 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
     get(CLIENT_PUSH_SENDBUFFERPOOL_CHECKEXPIREINTERVAL)
   def clientAdaptiveOptimizeSkewedPartitionReadEnabled: Boolean =
     get(CLIENT_ADAPTIVE_OPTIMIZE_SKEWED_PARTITION_READ_ENABLED)
+  def clientShuffleDataLostOnUnknownWorkerEnabled: Boolean =
+    get(CLIENT_SHUFFLE_DATA_LOST_ON_UNKNOWN_WORKER_ENABLED)
 
   // //////////////////////////////////////////////////////
   //                   Client Shuffle                    //
@@ -6666,4 +6668,11 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefaultString("true")
 
+  val CLIENT_SHUFFLE_DATA_LOST_ON_UNKNOWN_WORKER_ENABLED: ConfigEntry[Boolean] 
=
+    buildConf("celeborn.client.shuffleDataLostOnUnknownWorker.enabled")
+      .categories("client")
+      .version("0.7.0")
+      .doc("Whether to mark shuffle data lost when unknown worker is 
detected.")
+      .booleanConf
+      .createWithDefault(false)
 }
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 5b6ae85e2..fb56d8d72 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -121,6 +121,7 @@ license: |
 | celeborn.client.shuffle.rangeReadFilter.enabled | false | false | If a spark 
application have skewed partition, this value can set to true to improve 
performance. | 0.2.0 | celeborn.shuffle.rangeReadFilter.enabled | 
 | celeborn.client.shuffle.register.filterExcludedWorker.enabled | false | 
false | Whether to filter excluded worker when register shuffle. | 0.4.0 |  | 
 | celeborn.client.shuffle.reviseLostShuffles.enabled | false | false | Whether 
to revise lost shuffles. | 0.6.0 |  | 
+| celeborn.client.shuffleDataLostOnUnknownWorker.enabled | false | false | 
Whether to mark shuffle data lost when unknown worker is detected. | 0.7.0 |  | 
 | celeborn.client.slot.assign.maxWorkers | 10000 | false | Max workers that 
slots of one shuffle can be allocated on. Will choose the smaller positive one 
from Master side and Client side, see `celeborn.master.slot.assign.maxWorkers`. 
| 0.3.1 |  | 
 | celeborn.client.spark.fetch.cleanFailedShuffle | false | false | whether to 
clean those disk space occupied by shuffles which cannot be fetched | 0.6.0 |  
| 
 | celeborn.client.spark.fetch.cleanFailedShuffleInterval | 1s | false | the 
interval to clean the failed-to-fetch shuffle files, only valid when 
celeborn.client.spark.fetch.cleanFailedShuffle is enabled | 0.6.0 |  | 

Reply via email to