This is an automated email from the ASF dual-hosted git repository. kerwinzhang pushed a commit to branch 0.3.1-speed in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit dea208deacbd2a9bc2301dfd4bb9bd63fdcb83b6 Author: xiyu.zk <[email protected]> AuthorDate: Wed Oct 25 15:33:13 2023 +0800 test --- .../apache/celeborn/client/LifecycleManager.scala | 2 +- .../common/meta/ShufflePartitionLocationInfo.scala | 27 ++++++++++++++++------ 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index 807c4a660..144fe837f 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -319,7 +319,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends val initialLocs = workerSnapshots(shuffleId) .values() .asScala - .flatMap(_.getAllPrimaryLocationsWithMinEpoch().asScala) + .flatMap(_.getAllPrimaryLocationsWithMinEpoch()) .filter(p => (partitionType == PartitionType.REDUCE && p.getEpoch == 0) || (partitionType == PartitionType.MAP && p.getId == partitionId)) .toArray diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/ShufflePartitionLocationInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/ShufflePartitionLocationInfo.scala index 1cf318a58..240560d2a 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/ShufflePartitionLocationInfo.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/ShufflePartitionLocationInfo.scala @@ -19,17 +19,16 @@ package org.apache.celeborn.common.meta import java.util import java.util.concurrent.ConcurrentHashMap - import scala.collection.JavaConverters._ - import org.apache.celeborn.common.protocol.PartitionLocation +import scala.collection.mutable.ArrayBuffer + class ShufflePartitionLocationInfo { type PartitionInfo = ConcurrentHashMap[Int, util.Set[PartitionLocation]] private val primaryPartitionLocations = new PartitionInfo private val replicaPartitionLocations = new PartitionInfo - implicit val partitionOrdering: Ordering[PartitionLocation] = Ordering.by(_.getEpoch) def addPrimaryPartitions(primaryLocations: util.List[PartitionLocation]) = { addPartitions(primaryPartitionLocations, primaryLocations) @@ -89,10 +88,24 @@ class ShufflePartitionLocationInfo { } } - def getAllPrimaryLocationsWithMinEpoch(): util.Set[PartitionLocation] = { - primaryPartitionLocations.values().asScala.map { partitionLocations => - partitionLocations.asScala.min - }.toSet.asJava + def getAllPrimaryLocationsWithMinEpoch(): ArrayBuffer[PartitionLocation] = { + val set = new ArrayBuffer[PartitionLocation](primaryPartitionLocations.size()) + val ite = primaryPartitionLocations.values().iterator() + while (ite.hasNext) { + val ite1 = ite.next().iterator() + var min: PartitionLocation = null + if (ite1.hasNext) { + min = ite1.next(); + } + while (ite1.hasNext) { + val next = ite1.next() + if (min.getEpoch > next.getEpoch) { + min = next; + } + } + set += min; + } + set } private def addPartitions(
