This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new c84733fcf [CELEBORN-1725][FOLLOWUP] Optimize `isAllMapTasksEnd` 
performance
c84733fcf is described below

commit c84733fcf8d463f2c0e35c33747cbfbbc68da411
Author: Wang, Fei <[email protected]>
AuthorDate: Fri Nov 29 17:58:08 2024 +0800

    [CELEBORN-1725][FOLLOWUP] Optimize `isAllMapTasksEnd` performance
    
    ### What changes were proposed in this pull request?
    
    Follow-up to https://github.com/apache/celeborn/pull/2905:
    
    use the same reverse-order array scan to optimize the `isAllMaptaskEnd` method (renamed to `areAllMapTasksEnd`).
    
    ### Why are the changes needed?
    Address comments: 
https://github.com/apache/celeborn/pull/2905#pullrequestreview-2457905880
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Same logic as https://github.com/apache/celeborn/pull/2905
    
    Closes #2959 from turboFei/celeborn_1725_follow.
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../org/apache/celeborn/client/ClientUtils.scala   | 40 ++++++++++++++++++++++
 .../apache/celeborn/client/LifecycleManager.scala  |  6 ++--
 .../commit/ReducePartitionCommitHandler.scala      | 15 ++------
 3 files changed, 45 insertions(+), 16 deletions(-)

diff --git a/client/src/main/scala/org/apache/celeborn/client/ClientUtils.scala 
b/client/src/main/scala/org/apache/celeborn/client/ClientUtils.scala
new file mode 100644
index 000000000..d7dccb941
--- /dev/null
+++ b/client/src/main/scala/org/apache/celeborn/client/ClientUtils.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client
+
+object ClientUtils {
+
+  /**
+   * Check if all the mapper attempts are finished. If any of the attempts is not finished, return false.
+   * This method checks the attempts array in reverse order, which can be faster if the unfinished attempts
+   * are more likely to be towards the end of the array.
+   *
+   * @param attempts The mapper finished attemptId array. An attempt ID of -1 indicates that the mapper is not finished.
+   * @return True if all mapper attempts are finished, false otherwise.
+   */
+  def areAllMapperAttemptsFinished(attempts: Array[Int]): Boolean = {
+    var i = attempts.length - 1
+    while (i >= 0) {
+      if (attempts(i) < 0) {
+        return false
+      }
+      i -= 1
+    }
+    true
+  }
+}
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 056747e22..dbb1f0b3e 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -862,8 +862,8 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
       return
     }
 
-    def isAllMaptaskEnd(shuffleId: Int): Boolean = {
-      !commitManager.getMapperAttempts(shuffleId).exists(_ < 0)
+    def areAllMapTasksEnd(shuffleId: Int): Boolean = {
+      
ClientUtils.areAllMapperAttemptsFinished(commitManager.getMapperAttempts(shuffleId))
     }
 
     shuffleIds.synchronized {
@@ -912,7 +912,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
         }
       } else {
         shuffleIds.values.filter(v => v._2).map(v => v._1).toSeq.reverse.find(
-          isAllMaptaskEnd) match {
+          areAllMapTasksEnd) match {
           case Some(shuffleId) =>
             val pbGetShuffleIdResponse = {
               logDebug(
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
 
b/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
index 9148b9501..a2b7ac11a 100644
--- 
a/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
+++ 
b/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
 
 import com.google.common.cache.{Cache, CacheBuilder}
 
-import org.apache.celeborn.client.{ShuffleCommittedInfo, WorkerStatusTracker}
+import org.apache.celeborn.client.{ClientUtils, ShuffleCommittedInfo, 
WorkerStatusTracker}
 import org.apache.celeborn.client.CommitManager.CommittedPartitionInfo
 import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers, 
ShuffleFailedWorkers}
 import org.apache.celeborn.common.CelebornConf
@@ -251,7 +251,7 @@ class ReducePartitionCommitHandler(
       if (attempts(mapId) < 0) {
         attempts(mapId) = attemptId
         // Mapper with this attemptId finished, also check all other mapper 
finished or not.
-        (true, areAllMapperAttemptsFinished(attempts))
+        (true, ClientUtils.areAllMapperAttemptsFinished(attempts))
       } else {
         // Mapper with another attemptId finished, skip this request
         (false, false)
@@ -336,15 +336,4 @@ class ReducePartitionCommitHandler(
 
     (timeout <= 0, stageEndTimeout - timeout)
   }
-
-  private def areAllMapperAttemptsFinished(attempts: Array[Int]): Boolean = {
-    var i = attempts.length - 1
-    while (i >= 0) {
-      if (attempts(i) < 0) {
-        return false
-      }
-      i -= 1
-    }
-    true
-  }
 }

Reply via email to