This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.4 by this push:
     new de436f715 [CELEBORN-1783] Fix Pending task in commitThreadPool won't be 
canceled
de436f715 is described below

commit de436f7151ee4859e4ab62efa41aae3122086506
Author: zhengtao <[email protected]>
AuthorDate: Thu Dec 19 14:24:36 2024 +0800

    [CELEBORN-1783] Fix Pending task in commitThreadPool won't be canceled
    
    1. Cancel all commit file jobs when handleCommitFiles timeout.
    2. Fix timeout commit jobs won't be set to `CommitInfo.COMMIT_FINISHED`
    
    1. Pending tasks in commitThreadPool won't be canceled.
    2. Timeout commit jobs should set `commitInfo.status`.
    
![image](https://github.com/user-attachments/assets/38528460-8114-4c42-8dc2-a47ec396f99e)
    
    No.
    
    UT test.
    The `commitInfo.status` should be `COMMIT_FINISHED` when commitFile jobs 
time out.
    Cluster test.
    
![image](https://github.com/user-attachments/assets/01beb183-da7e-4e44-85e1-3836fcad3c79)
    
    Closes #3004 from zaynt4606/clb1783.
    
    Authored-by: zhengtao <[email protected]>
    Signed-off-by: Shuang <[email protected]>
    (cherry picked from commit 67971df68fd87fb90f60b17ca70e943abb3036c7)
    Signed-off-by: Shuang <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala  |  8 +--
 .../client/LifecycleManagerCommitFilesSuite.scala  | 74 +++++++++++++++++++++-
 .../service/deploy/worker/Controller.scala         | 34 ++++++----
 3 files changed, 96 insertions(+), 20 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index ea0147d5d..203b08c1a 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1092,7 +1092,7 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   // //////////////////////////////////////////////////////
   def testFetchFailure: Boolean = get(TEST_CLIENT_FETCH_FAILURE)
   def testMockDestroySlotsFailure: Boolean = 
get(TEST_CLIENT_MOCK_DESTROY_SLOTS_FAILURE)
-  def testMockCommitFilesFailure: Boolean = 
get(TEST_CLIENT_MOCK_COMMIT_FILES_FAILURE)
+  def testMockCommitFilesFailure: Boolean = get(TEST_MOCK_COMMIT_FILES_FAILURE)
   def testPushPrimaryDataTimeout: Boolean = 
get(TEST_CLIENT_PUSH_PRIMARY_DATA_TIMEOUT)
   def testPushReplicaDataTimeout: Boolean = 
get(TEST_WORKER_PUSH_REPLICA_DATA_TIMEOUT)
   def testRetryRevive: Boolean = get(TEST_CLIENT_RETRY_REVIVE)
@@ -3181,10 +3181,10 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(false)
 
-  val TEST_CLIENT_MOCK_COMMIT_FILES_FAILURE: ConfigEntry[Boolean] =
-    buildConf("celeborn.test.client.mockCommitFilesFailure")
+  val TEST_MOCK_COMMIT_FILES_FAILURE: ConfigEntry[Boolean] =
+    buildConf("celeborn.test.mockCommitFilesFailure")
       .internal
-      .categories("test", "client")
+      .categories("test", "client", "worker")
       .doc("Fail commit files request for test")
       .version("0.3.2")
       .booleanConf
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerCommitFilesSuite.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerCommitFilesSuite.scala
index 23452bafa..7ccf6c74a 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerCommitFilesSuite.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerCommitFilesSuite.scala
@@ -52,7 +52,7 @@ class LifecycleManagerCommitFilesSuite extends 
WithShuffleClientSuite with MiniC
   test("test commit files without mocking failure") {
     val shuffleId = nextShuffleId
     val conf = celebornConf.clone
-    conf.set(CelebornConf.TEST_CLIENT_MOCK_COMMIT_FILES_FAILURE.key, "false")
+    conf.set(CelebornConf.TEST_MOCK_COMMIT_FILES_FAILURE.key, "false")
     val lifecycleManager: LifecycleManager = new LifecycleManager(APP, conf)
     val ids = new util.ArrayList[Integer](10)
     0 until 10 foreach {
@@ -102,7 +102,7 @@ class LifecycleManagerCommitFilesSuite extends 
WithShuffleClientSuite with MiniC
   test("test commit files with mocking failure") {
     val shuffleId = nextShuffleId
     val conf = celebornConf.clone
-    conf.set(CelebornConf.TEST_CLIENT_MOCK_COMMIT_FILES_FAILURE.key, "true")
+    conf.set(CelebornConf.TEST_MOCK_COMMIT_FILES_FAILURE.key, "true")
     val lifecycleManager: LifecycleManager = new LifecycleManager(APP, conf)
     val ids = new util.ArrayList[Integer](10)
     0 until 10 foreach {
@@ -149,6 +149,76 @@ class LifecycleManagerCommitFilesSuite extends 
WithShuffleClientSuite with MiniC
     lifecycleManager.stop()
   }
 
+  test("test commit files with timeout failure") {
+    if (workerInfos.nonEmpty) {
+      shutdownMiniCluster()
+    }
+    celebornConf
+      .set(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key, "false")
+    val workerConf0 = Map(
+      s"${CelebornConf.WORKER_SHUFFLE_COMMIT_TIMEOUT.key}" -> "100",
+      s"${CelebornConf.WORKER_COMMIT_THREADS.key}" -> "1",
+      s"${CelebornConf.TEST_MOCK_COMMIT_FILES_FAILURE.key}" -> "true")
+    val (master, _) = setupMiniClusterWithRandomPorts(workerConf = workerConf0)
+    celebornConf.set(
+      CelebornConf.MASTER_ENDPOINTS.key,
+      master.conf.get(CelebornConf.MASTER_ENDPOINTS.key))
+
+    val shuffleId = nextShuffleId
+    val conf = celebornConf.clone
+    conf.set(CelebornConf.TEST_MOCK_COMMIT_FILES_FAILURE.key, "true")
+    val lifecycleManager: LifecycleManager = new LifecycleManager(APP, conf)
+    val ids = new util.ArrayList[Integer](1000)
+    0 until 1000 foreach {
+      ids.add(_)
+    }
+    val res = lifecycleManager.requestMasterRequestSlotsWithRetry(shuffleId, 
ids)
+    assert(res.status == StatusCode.SUCCESS)
+
+    lifecycleManager.setupEndpoints(
+      res.workerResource.keySet(),
+      shuffleId,
+      new ShuffleFailedWorkers())
+
+    lifecycleManager.reserveSlotsWithRetry(
+      shuffleId,
+      new util.HashSet(res.workerResource.keySet()),
+      res.workerResource,
+      updateEpoch = false)
+
+    lifecycleManager.commitManager.registerShuffle(shuffleId, 1, false)
+    0 until 1000 foreach { partitionId =>
+      lifecycleManager.commitManager.finishMapperAttempt(shuffleId, 0, 0, 1, 
partitionId)
+    }
+
+    val commitHandler = 
lifecycleManager.commitManager.getCommitHandler(shuffleId)
+    val params = new ArrayBuffer[CommitFilesParam](res.workerResource.size())
+    res.workerResource.asScala.foreach { case (workerInfo, (primaryIds, 
replicaIds)) =>
+      params += (CommitFilesParam(
+        workerInfo,
+        primaryIds.asScala.map(_.getUniqueId).toList.asJava,
+        replicaIds.asScala.map(_.getUniqueId).toList.asJava))
+    }
+    commitHandler.doParallelCommitFiles(
+      shuffleId,
+      lifecycleManager.commitManager.committedPartitionInfo.get(shuffleId),
+      params,
+      new ShuffleFailedWorkers)
+
+    workerInfos.keySet.foreach { worker =>
+      val commitInfoList =
+        worker.controller.shuffleCommitInfos.get(Utils.makeShuffleKey(APP, 
shuffleId))
+      assert(worker.controller.commitThreadPool.getQueue.size() == 0)
+      if (commitInfoList != null) {
+        commitInfoList.values().asScala.foreach { commitInfo =>
+          assert(commitInfo.status == CommitInfo.COMMIT_FINISHED)
+          assert(commitInfo.response.status == 
StatusCode.COMMIT_FILE_EXCEPTION)
+        }
+      }
+    }
+    lifecycleManager.stop()
+  }
+
   override def afterAll(): Unit = {
     logInfo("all test complete , stop celeborn mini cluster")
     shutdownMiniCluster()
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index 4bc8599c2..0e96944d8 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, 
AtomicIntegerArray, AtomicRef
 import java.util.function.BiFunction
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 
 import io.netty.util.{HashedWheelTimer, Timeout, TimerTask}
 import org.roaringbitmap.RoaringBitmap
@@ -61,6 +62,7 @@ private[deploy] class Controller(
   val minPartitionSizeToEstimate = conf.minPartitionSizeToEstimate
   var shutdown: AtomicBoolean = _
   val defaultPushdataTimeout = conf.pushDataTimeoutMs
+  val mockCommitFilesFailure = conf.testMockCommitFilesFailure
 
   def init(worker: Worker): Unit = {
     storageManager = worker.storageManager
@@ -278,9 +280,9 @@ private[deploy] class Controller(
       committedStorageInfos: ConcurrentHashMap[String, StorageInfo],
       committedMapIdBitMap: ConcurrentHashMap[String, RoaringBitmap],
       partitionSizeList: LinkedBlockingQueue[Long],
-      isPrimary: Boolean = true): CompletableFuture[Void] = {
-    var future: CompletableFuture[Void] = null
-
+      isPrimary: Boolean = true)
+      : (CompletableFuture[Void], ArrayBuffer[CompletableFuture[Void]]) = {
+    val tasks = ArrayBuffer[CompletableFuture[Void]]()
     if (uniqueIds != null) {
       uniqueIds.asScala.foreach { uniqueId =>
         val task = CompletableFuture.runAsync(
@@ -320,6 +322,9 @@ private[deploy] class Controller(
                 } else {
                   emptyFileIds.add(uniqueId)
                 }
+                if (mockCommitFilesFailure) {
+                  Thread.sleep(10)
+                }
               } catch {
                 case e: IOException =>
                   logError(s"Commit file for $shuffleKey $uniqueId failed.", e)
@@ -328,16 +333,12 @@ private[deploy] class Controller(
             }
           },
           commitThreadPool)
-
-        if (future == null) {
-          future = task
-        } else {
-          future = CompletableFuture.allOf(future, task)
-        }
+        tasks.append(task)
       }
     }
-
-    future
+    val future: CompletableFuture[Void] =
+      if (tasks.isEmpty) null else CompletableFuture.allOf(tasks.toSeq: _*)
+    (future, tasks)
   }
 
   private def waitMapPartitionRegionFinished(fileWriter: FileWriter, 
waitTimeout: Long): Unit = {
@@ -467,7 +468,7 @@ private[deploy] class Controller(
     val committedMapIdBitMap = JavaUtils.newConcurrentHashMap[String, 
RoaringBitmap]()
     val partitionSizeList = new LinkedBlockingQueue[Long]()
 
-    val primaryFuture =
+    val (primaryFuture, primaryTasks) =
       commitFiles(
         shuffleKey,
         primaryIds,
@@ -477,7 +478,7 @@ private[deploy] class Controller(
         committedPrimaryStorageInfos,
         committedMapIdBitMap,
         partitionSizeList)
-    val replicaFuture = commitFiles(
+    val (replicaFuture, replicaTasks) = commitFiles(
       shuffleKey,
       replicaIds,
       committedReplicaIds,
@@ -499,6 +500,8 @@ private[deploy] class Controller(
         null
       }
 
+    val tasks = primaryTasks ++ replicaTasks
+
     def reply(): Unit = {
       // release slots before reply.
       val releasePrimaryLocations =
@@ -582,7 +585,10 @@ private[deploy] class Controller(
         new TimerTask {
           override def run(timeout: Timeout): Unit = {
             if (result.get() != null) {
-              result.get().cancel(true)
+              future.cancel(true)
+              tasks.foreach { task =>
+                task.cancel(true)
+              }
               logWarning(s"After waiting $shuffleCommitTimeout ms, cancel all 
commit file jobs.")
             }
           }

Reply via email to