This is an automated email from the ASF dual-hosted git repository. kerwinzhang pushed a commit to branch issue-1102 in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit a00a66525be9078c5127ad8fcca04386341bab0e Author: xiyu.zk <[email protected]> AuthorDate: Wed Oct 25 15:33:13 2023 +0800 [CELEBORN-1102] Optimize the performance of getAllPrimaryLocationsWithMinEpoch --- .../apache/celeborn/client/LifecycleManager.scala | 2 +- .../common/meta/ShufflePartitionLocationInfo.scala | 24 +++++++++++++++++----- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index ea1517a21..bd8527db3 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -319,7 +319,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends val initialLocs = workerSnapshots(shuffleId) .values() .asScala - .flatMap(_.getAllPrimaryLocationsWithMinEpoch().asScala) + .flatMap(_.getAllPrimaryLocationsWithMinEpoch()) .filter(p => (partitionType == PartitionType.REDUCE && p.getEpoch == 0) || (partitionType == PartitionType.MAP && p.getId == partitionId)) .toArray diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/ShufflePartitionLocationInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/ShufflePartitionLocationInfo.scala index 1cf318a58..12e848830 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/ShufflePartitionLocationInfo.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/ShufflePartitionLocationInfo.scala @@ -21,6 +21,7 @@ import java.util import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer import org.apache.celeborn.common.protocol.PartitionLocation @@ -29,7 +30,6 @@ class ShufflePartitionLocationInfo { private val primaryPartitionLocations = new PartitionInfo private val replicaPartitionLocations = new PartitionInfo - implicit val partitionOrdering: Ordering[PartitionLocation] = Ordering.by(_.getEpoch) def addPrimaryPartitions(primaryLocations: util.List[PartitionLocation]) = { addPartitions(primaryPartitionLocations, primaryLocations) @@ -89,10 +89,24 @@ class ShufflePartitionLocationInfo { } } - def getAllPrimaryLocationsWithMinEpoch(): util.Set[PartitionLocation] = { - primaryPartitionLocations.values().asScala.map { partitionLocations => - partitionLocations.asScala.min - }.toSet.asJava + def getAllPrimaryLocationsWithMinEpoch(): ArrayBuffer[PartitionLocation] = { + val set = new ArrayBuffer[PartitionLocation](primaryPartitionLocations.size()) + val locationsIterator = primaryPartitionLocations.values().iterator() + while (locationsIterator.hasNext) { + val locationIterator = locationsIterator.next().iterator() + var min: PartitionLocation = null + if (locationIterator.hasNext) { + min = locationIterator.next(); + } + while (locationIterator.hasNext) { + val next = locationIterator.next() + if (min.getEpoch > next.getEpoch) { + min = next; + } + } + set += min; + } + set } private def addPartitions(
