This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.4 by this push:
new 93bdda517 [CELEBORN-1725][FOLLOWUP] Optimize `isAllMapTasksEnd` performance
93bdda517 is described below
commit 93bdda517814ede906c79d8e74c153b3c57c0f58
Author: Wang, Fei <[email protected]>
AuthorDate: Fri Nov 29 17:58:08 2024 +0800
[CELEBORN-1725][FOLLOWUP] Optimize `isAllMapTasksEnd` performance
Follow-up to https://github.com/apache/celeborn/pull/2905,
using the same logic to optimize the `isAllMapTasksEnd` method.
Address comments:
https://github.com/apache/celeborn/pull/2905#pullrequestreview-2457905880
No.
Same logic as https://github.com/apache/celeborn/pull/2905
Closes #2959 from turboFei/celeborn_1725_follow.
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit c84733fcf8d463f2c0e35c33747cbfbbc68da411)
Signed-off-by: Shuang <[email protected]>
---
.../org/apache/celeborn/client/ClientUtils.scala | 40 ++++++++++++++++++++++
.../apache/celeborn/client/LifecycleManager.scala | 6 ++--
.../commit/ReducePartitionCommitHandler.scala | 15 ++------
3 files changed, 45 insertions(+), 16 deletions(-)
diff --git a/client/src/main/scala/org/apache/celeborn/client/ClientUtils.scala
b/client/src/main/scala/org/apache/celeborn/client/ClientUtils.scala
new file mode 100644
index 000000000..d7dccb941
--- /dev/null
+++ b/client/src/main/scala/org/apache/celeborn/client/ClientUtils.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client
+
+object ClientUtils {
+
+ /**
+ * Check if all the mapper attempts are finished. If any of the attempts is
not finished, return false.
+ * This method checks the attempts array in reverse order, which can be
faster if the unfinished attempts
+ * are more likely to be towards the end of the array.
+ *
+ * @param attempts The mapper finished attemptId array. An attempt ID of -1
indicates that the mapper is not finished.
+ * @return True if all mapper attempts are finished, false otherwise.
+ */
+ def areAllMapperAttemptsFinished(attempts: Array[Int]): Boolean = {
+ var i = attempts.length - 1
+ while (i >= 0) {
+ if (attempts(i) < 0) {
+ return false
+ }
+ i -= 1
+ }
+ true
+ }
+}
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 88bbcab79..afa544c98 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -743,8 +743,8 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
return
}
- def isAllMaptaskEnd(shuffleId: Int): Boolean = {
- !commitManager.getMapperAttempts(shuffleId).exists(_ < 0)
+ def areAllMapTasksEnd(shuffleId: Int): Boolean = {
+
ClientUtils.areAllMapperAttemptsFinished(commitManager.getMapperAttempts(shuffleId))
}
shuffleIds.synchronized {
@@ -781,7 +781,7 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
s"unexpected! unknown appShuffleId $appShuffleId when checking
shuffle deterministic level"))
}
} else {
- shuffleIds.values.map(v => v._1).toSeq.reverse.find(isAllMaptaskEnd)
match {
+ shuffleIds.values.map(v => v._1).toSeq.reverse.find(areAllMapTasksEnd)
match {
case Some(shuffleId) =>
val pbGetShuffleIdResponse = {
logDebug(
diff --git
a/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
b/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
index 2877e8562..01f5f1d1e 100644
---
a/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
import com.google.common.cache.{Cache, CacheBuilder}
-import org.apache.celeborn.client.{ShuffleCommittedInfo, WorkerStatusTracker}
+import org.apache.celeborn.client.{ClientUtils, ShuffleCommittedInfo,
WorkerStatusTracker}
import org.apache.celeborn.client.CommitManager.CommittedPartitionInfo
import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers,
ShuffleFailedWorkers}
import org.apache.celeborn.common.CelebornConf
@@ -245,7 +245,7 @@ class ReducePartitionCommitHandler(
if (attempts(mapId) < 0) {
attempts(mapId) = attemptId
// Mapper with this attemptId finished, also check all other mapper
finished or not.
- (true, areAllMapperAttemptsFinished(attempts))
+ (true, ClientUtils.areAllMapperAttemptsFinished(attempts))
} else {
// Mapper with another attemptId finished, skip this request
(false, false)
@@ -326,15 +326,4 @@ class ReducePartitionCommitHandler(
(timeout <= 0, stageEndTimeout - timeout)
}
-
- private def areAllMapperAttemptsFinished(attempts: Array[Int]): Boolean = {
- var i = attempts.length - 1
- while (i >= 0) {
- if (attempts(i) < 0) {
- return false
- }
- i -= 1
- }
- true
- }
}