This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new c84733fcf [CELEBORN-1725][FOLLOWUP] Optimize `isAllMapTasksEnd` performance
c84733fcf is described below
commit c84733fcf8d463f2c0e35c33747cbfbbc68da411
Author: Wang, Fei <[email protected]>
AuthorDate: Fri Nov 29 17:58:08 2024 +0800
[CELEBORN-1725][FOLLOWUP] Optimize `isAllMapTasksEnd` performance
### What changes were proposed in this pull request?
Follow-up to https://github.com/apache/celeborn/pull/2905,
using the same logic to optimize the `isAllMapTasksEnd` method (`LifecycleManager.isAllMaptaskEnd`, renamed to `areAllMapTasksEnd`).
### Why are the changes needed?
Addresses review comments:
https://github.com/apache/celeborn/pull/2905#pullrequestreview-2457905880
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Same logic as https://github.com/apache/celeborn/pull/2905
Closes #2959 from turboFei/celeborn_1725_follow.
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../org/apache/celeborn/client/ClientUtils.scala | 40 ++++++++++++++++++++++
.../apache/celeborn/client/LifecycleManager.scala | 6 ++--
.../commit/ReducePartitionCommitHandler.scala | 15 ++------
3 files changed, 45 insertions(+), 16 deletions(-)
diff --git a/client/src/main/scala/org/apache/celeborn/client/ClientUtils.scala b/client/src/main/scala/org/apache/celeborn/client/ClientUtils.scala
new file mode 100644
index 000000000..d7dccb941
--- /dev/null
+++ b/client/src/main/scala/org/apache/celeborn/client/ClientUtils.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.client
+
+object ClientUtils {
+
+ /**
+ * Check whether every mapper has a finished attempt recorded in `attempts`.
+ * The array is scanned from the last index to the first, returning false at the first
+ * unfinished entry; this can be faster when unfinished mappers tend to sit near the end.
+ *
+ * @param attempts finished attempt ID per map ID; a negative value (e.g. -1) means unfinished.
+ * @return true if no entry is negative (including for an empty array), false otherwise.
+ */
+ def areAllMapperAttemptsFinished(attempts: Array[Int]): Boolean = {
+ var i = attempts.length - 1 // start at the last map ID and walk backwards
+ while (i >= 0) {
+ if (attempts(i) < 0) {
+ return false // first unfinished mapper found; no need to scan further
+ }
+ i -= 1
+ }
+ true
+ }
+}
diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 056747e22..dbb1f0b3e 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -862,8 +862,8 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
return
}
- def isAllMaptaskEnd(shuffleId: Int): Boolean = {
- !commitManager.getMapperAttempts(shuffleId).exists(_ < 0)
+ def areAllMapTasksEnd(shuffleId: Int): Boolean = { // true once every mapper of shuffleId has finished
+ ClientUtils.areAllMapperAttemptsFinished(commitManager.getMapperAttempts(shuffleId))
 }
shuffleIds.synchronized {
@@ -912,7 +912,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
}
} else {
shuffleIds.values.filter(v => v._2).map(v => v._1).toSeq.reverse.find(
- isAllMaptaskEnd) match {
+ areAllMapTasksEnd) match {
case Some(shuffleId) =>
val pbGetShuffleIdResponse = {
logDebug(
diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
index 9148b9501..a2b7ac11a 100644
--- a/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
import com.google.common.cache.{Cache, CacheBuilder}
-import org.apache.celeborn.client.{ShuffleCommittedInfo, WorkerStatusTracker}
+import org.apache.celeborn.client.{ClientUtils, ShuffleCommittedInfo,
WorkerStatusTracker}
import org.apache.celeborn.client.CommitManager.CommittedPartitionInfo
import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers,
ShuffleFailedWorkers}
import org.apache.celeborn.common.CelebornConf
@@ -251,7 +251,7 @@ class ReducePartitionCommitHandler(
if (attempts(mapId) < 0) {
attempts(mapId) = attemptId
// Mapper with this attemptId finished, also check all other mapper
finished or not.
- (true, areAllMapperAttemptsFinished(attempts))
+ (true, ClientUtils.areAllMapperAttemptsFinished(attempts))
} else {
// Mapper with another attemptId finished, skip this request
(false, false)
@@ -336,15 +336,4 @@ class ReducePartitionCommitHandler(
(timeout <= 0, stageEndTimeout - timeout)
}
-
- private def areAllMapperAttemptsFinished(attempts: Array[Int]): Boolean = {
- var i = attempts.length - 1
- while (i >= 0) {
- if (attempts(i) < 0) {
- return false
- }
- i -= 1
- }
- true
- }
}