This is an automated email from the ASF dual-hosted git repository.

kerwinzhang pushed a commit to branch 0.3.1-speed
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git

commit dea208deacbd2a9bc2301dfd4bb9bd63fdcb83b6
Author: xiyu.zk <[email protected]>
AuthorDate: Wed Oct 25 15:33:13 2023 +0800

    test
---
 .../apache/celeborn/client/LifecycleManager.scala  |  2 +-
 .../common/meta/ShufflePartitionLocationInfo.scala | 27 ++++++++++++++++------
 2 files changed, 21 insertions(+), 8 deletions(-)

diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 807c4a660..144fe837f 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -319,7 +319,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
           val initialLocs = workerSnapshots(shuffleId)
             .values()
             .asScala
-            .flatMap(_.getAllPrimaryLocationsWithMinEpoch().asScala)
+            .flatMap(_.getAllPrimaryLocationsWithMinEpoch())
             .filter(p =>
               (partitionType == PartitionType.REDUCE && p.getEpoch == 0) || 
(partitionType == PartitionType.MAP && p.getId == partitionId))
             .toArray
diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/ShufflePartitionLocationInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/ShufflePartitionLocationInfo.scala
index 1cf318a58..240560d2a 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/ShufflePartitionLocationInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/ShufflePartitionLocationInfo.scala
@@ -19,17 +19,16 @@ package org.apache.celeborn.common.meta
 
 import java.util
 import java.util.concurrent.ConcurrentHashMap
-
 import scala.collection.JavaConverters._
-
 import org.apache.celeborn.common.protocol.PartitionLocation
 
+import scala.collection.mutable.ArrayBuffer
+
 class ShufflePartitionLocationInfo {
   type PartitionInfo = ConcurrentHashMap[Int, util.Set[PartitionLocation]]
 
   private val primaryPartitionLocations = new PartitionInfo
   private val replicaPartitionLocations = new PartitionInfo
-  implicit val partitionOrdering: Ordering[PartitionLocation] = 
Ordering.by(_.getEpoch)
 
   def addPrimaryPartitions(primaryLocations: util.List[PartitionLocation]) = {
     addPartitions(primaryPartitionLocations, primaryLocations)
@@ -89,10 +88,24 @@ class ShufflePartitionLocationInfo {
     }
   }
 
-  def getAllPrimaryLocationsWithMinEpoch(): util.Set[PartitionLocation] = {
-    primaryPartitionLocations.values().asScala.map { partitionLocations =>
-      partitionLocations.asScala.min
-    }.toSet.asJava
+  // Collects, for each partition id, the primary location with the smallest epoch.
+  def getAllPrimaryLocationsWithMinEpoch(): ArrayBuffer[PartitionLocation] = {
+    val minEpochLocations = new ArrayBuffer[PartitionLocation](primaryPartitionLocations.size())
+    val partitionSets = primaryPartitionLocations.values().iterator()
+    while (partitionSets.hasNext) {
+      val locations = partitionSets.next().iterator()
+      // An empty set has no minimum; skip it rather than appending null for callers to trip on.
+      if (locations.hasNext) {
+        var min = locations.next()
+        while (locations.hasNext) {
+          val candidate = locations.next()
+          // Strict comparison: on an epoch tie the first location seen is kept.
+          if (candidate.getEpoch < min.getEpoch) min = candidate
+        }
+        minEpochLocations += min
+      }
+    }
+    minEpochLocations
+  }
 
   private def addPartitions(

Reply via email to