This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 069e5b6c1 [CELEBORN-1769] Fix packed partition location cause 
GetReducerFileGroupResponse lose location
069e5b6c1 is described below

commit 069e5b6c18e11d53c1ade6e4a85484856f22fba9
Author: mingji <[email protected]>
AuthorDate: Tue Dec 10 18:03:00 2024 +0800

    [CELEBORN-1769] Fix packed partition location cause 
GetReducerFileGroupResponse lose location
    
    ### What changes were proposed in this pull request?
    Fix the issue of losing partition locations whose primary is missing (only 
the replica remains) when parsing `GetReducerFileGroupResponse` from 
`LifecycleManager`.
    
    ### Why are the changes needed?
    In a previous optimization, I introduced packed partition locations to 
reduce the size of RPC messages, based on the assumption that primary partition 
locations would always be available. However, in some test scenarios where data 
replication is enabled and workers are randomly terminated, the primary 
location may be lost while the replica location remains. Because the response 
parser kept only the primary locations, such replica-only locations were 
dropped, which causes data loss.
    
    ### Does this PR introduce _any_ user-facing change?
    NO.
    
    ### How was this patch tested?
    GA and cluster testing, plus the new round-trip unit tests added to 
`PbSerDeUtilsTest`.
    
    Closes #2986 from FMX/b1769.
    
    Authored-by: mingji <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../common/protocol/message/ControlMessages.scala  | 15 +++-
 .../celeborn/common/util/PbSerDeUtilsTest.scala    | 99 +++++++++++++++++++++-
 2 files changed, 110 insertions(+), 4 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 3c9b22c54..0e465196d 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -1158,10 +1158,21 @@ object ControlMessages extends Logging {
           .parseFrom(message.getPayload)
         val fileGroup = 
pbGetReducerFileGroupResponse.getFileGroupsMap.asScala.map {
           case (partitionId, fileGroup) =>
+            val locationsSet: java.util.Set[PartitionLocation] =
+              new util.LinkedHashSet[PartitionLocation]()
+
+            // In PbGetReducerFileGroupResponse, location with same
+            // uniqueId will not be put into the location set
+            // check out the logic 
@org.apache.celeborn.client.commit.CommitHandler.parallelCommitFiles
+            // This is why we should join the primary location list and 
replica location list
+
+            val (pris, reps) = PbSerDeUtils.fromPbPackedPartitionLocationsPair(
+              fileGroup.getPartitionLocationsPair)
+            locationsSet.addAll(pris)
+            locationsSet.addAll(reps)
             (
               partitionId,
-              PbSerDeUtils.fromPbPackedPartitionLocationsPair(
-                fileGroup.getPartitionLocationsPair)._1.asScala.toSet.asJava)
+              locationsSet)
         }.asJava
 
         val attempts = 
pbGetReducerFileGroupResponse.getAttemptsList.asScala.map(_.toInt).toArray
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala 
b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
index bd886e931..9e1d442fb 100644
--- 
a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
+++ 
b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
@@ -21,15 +21,18 @@ import java.io.File
 import java.util
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.util.Random
 
 import org.apache.hadoop.shaded.org.apache.commons.lang3.RandomStringUtils
 
 import org.apache.celeborn.CelebornFunSuite
 import org.apache.celeborn.common.identity.UserIdentifier
-import org.apache.celeborn.common.meta.{ApplicationMeta, DeviceInfo, 
DiskFileInfo, DiskInfo, MapFileMeta, ReduceFileMeta, WorkerEventInfo, 
WorkerInfo, WorkerStatus}
+import org.apache.celeborn.common.meta._
 import org.apache.celeborn.common.protocol.{PartitionLocation, 
PbPackedWorkerResource, PbWorkerResource, StorageInfo}
-import 
org.apache.celeborn.common.protocol.message.ControlMessages.WorkerResource
+import org.apache.celeborn.common.protocol.PartitionLocation.Mode
+import org.apache.celeborn.common.protocol.message.{ControlMessages, 
StatusCode}
+import 
org.apache.celeborn.common.protocol.message.ControlMessages.{GetReducerFileGroupResponse,
 WorkerResource}
 import org.apache.celeborn.common.quota.ResourceConsumption
 import 
org.apache.celeborn.common.util.PbSerDeUtils.{fromPbPackedPartitionLocationsPair,
 toPbPackedPartitionLocationsPair}
 
@@ -470,4 +473,96 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
     testSerializationPerformance(100)
   }
 
+  test("GetReduceFileGroup with primary and replica locations") {
+    val shuffleMap: util.Map[Integer, util.Set[PartitionLocation]] =
+      JavaUtils.newConcurrentHashMap()
+    val locationSet = new util.LinkedHashSet[PartitionLocation]()
+    val uniqueIds = mutable.Set("0-0", "1-0", "2-0", "3-0", "4-0")
+    locationSet.add(new PartitionLocation(0, 0, "h", 1, 1, 1, 1, Mode.REPLICA))
+    locationSet.add(new PartitionLocation(1, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
+    locationSet.add(new PartitionLocation(2, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
+    locationSet.add(new PartitionLocation(3, 0, "h", 1, 1, 1, 1, Mode.REPLICA))
+    locationSet.add(new PartitionLocation(4, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
+    shuffleMap.put(1, locationSet)
+    val attempts = Array.fill(10)(10)
+    val succeedPartitions = 
Array.fill(10)(10).map(java.lang.Integer.valueOf).toSet.asJava
+
+    val GetReducerFileGroupResponseMsg = GetReducerFileGroupResponse(
+      StatusCode.SUCCESS,
+      shuffleMap,
+      attempts,
+      succeedPartitions)
+
+    val transportGetReducerFileGroup =
+      ControlMessages.toTransportMessage(GetReducerFileGroupResponseMsg)
+    val fromTransportGetReducerFileGroup: GetReducerFileGroupResponse =
+      ControlMessages.fromTransportMessage(
+        transportGetReducerFileGroup).asInstanceOf[GetReducerFileGroupResponse]
+
+    val locations = fromTransportGetReducerFileGroup.fileGroup.get(1)
+    locations.asScala.foreach(p => uniqueIds.remove(p.getUniqueId))
+    assert(uniqueIds.isEmpty)
+  }
+
+  test("GetReduceFileGroup with primary location only") {
+    val shuffleMap: util.Map[Integer, util.Set[PartitionLocation]] =
+      JavaUtils.newConcurrentHashMap()
+    val locationSet = new util.LinkedHashSet[PartitionLocation]()
+    val uniqueIds = mutable.Set("0-0", "1-0", "2-0", "3-0", "4-0")
+    locationSet.add(new PartitionLocation(0, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
+    locationSet.add(new PartitionLocation(1, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
+    locationSet.add(new PartitionLocation(2, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
+    locationSet.add(new PartitionLocation(3, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
+    locationSet.add(new PartitionLocation(4, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
+    shuffleMap.put(1, locationSet)
+    val attempts = Array.fill(10)(10)
+    val succeedPartitions = 
Array.fill(10)(10).map(java.lang.Integer.valueOf).toSet.asJava
+
+    val GetReducerFileGroupResponseMsg = GetReducerFileGroupResponse(
+      StatusCode.SUCCESS,
+      shuffleMap,
+      attempts,
+      succeedPartitions)
+
+    val transportGetReducerFileGroup =
+      ControlMessages.toTransportMessage(GetReducerFileGroupResponseMsg)
+    val fromTransportGetReducerFileGroup: GetReducerFileGroupResponse =
+      ControlMessages.fromTransportMessage(
+        transportGetReducerFileGroup).asInstanceOf[GetReducerFileGroupResponse]
+
+    val locations = fromTransportGetReducerFileGroup.fileGroup.get(1)
+    locations.asScala.foreach(p => uniqueIds.remove(p.getUniqueId))
+    assert(uniqueIds.isEmpty)
+  }
+
+  test("GetReduceFileGroup with replica location only") {
+    val shuffleMap: util.Map[Integer, util.Set[PartitionLocation]] =
+      JavaUtils.newConcurrentHashMap()
+    val locationSet = new util.LinkedHashSet[PartitionLocation]()
+    val uniqueIds = mutable.Set("0-0", "1-0", "2-0", "3-0", "4-0")
+    locationSet.add(new PartitionLocation(0, 0, "h", 1, 1, 1, 1, Mode.REPLICA))
+    locationSet.add(new PartitionLocation(1, 0, "h", 1, 1, 1, 1, Mode.REPLICA))
+    locationSet.add(new PartitionLocation(2, 0, "h", 1, 1, 1, 1, Mode.REPLICA))
+    locationSet.add(new PartitionLocation(3, 0, "h", 1, 1, 1, 1, Mode.REPLICA))
+    locationSet.add(new PartitionLocation(4, 0, "h", 1, 1, 1, 1, Mode.REPLICA))
+    shuffleMap.put(1, locationSet)
+    val attempts = Array.fill(10)(10)
+    val succeedPartitions = 
Array.fill(10)(10).map(java.lang.Integer.valueOf).toSet.asJava
+
+    val GetReducerFileGroupResponseMsg = GetReducerFileGroupResponse(
+      StatusCode.SUCCESS,
+      shuffleMap,
+      attempts,
+      succeedPartitions)
+
+    val transportGetReducerFileGroup =
+      ControlMessages.toTransportMessage(GetReducerFileGroupResponseMsg)
+    val fromTransportGetReducerFileGroup: GetReducerFileGroupResponse =
+      ControlMessages.fromTransportMessage(
+        transportGetReducerFileGroup).asInstanceOf[GetReducerFileGroupResponse]
+
+    val locations = fromTransportGetReducerFileGroup.fileGroup.get(1)
+    locations.asScala.foreach(p => uniqueIds.remove(p.getUniqueId))
+    assert(uniqueIds.isEmpty)
+  }
 }

Reply via email to