This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new dec0f31de [CELEBORN-1769] Fix packed partition location cause 
GetReducerFileGroupResponse lose location
dec0f31de is described below

commit dec0f31dede527a31f4b8b5b7ca1bafec7c50fb1
Author: mingji <[email protected]>
AuthorDate: Tue Dec 10 18:03:00 2024 +0800

    [CELEBORN-1769] Fix packed partition location cause 
GetReducerFileGroupResponse lose location
    
    ### What changes were proposed in this pull request?
    Fix the issue of losing the primary location when parsing 
`GetReducerFileGroupResponse` from `LifecycleManager`.
    
    ### Why are the changes needed?
    In previous optimizations, I introduced packed partition locations to 
reduce the size of RPC calls, based on the assumption that primary partition 
locations would always be available. However, in some test scenarios where data 
replication is enabled and workers are randomly terminated, the primary 
location may be lost while the replica location remains. This causes the 
replica locations to be ignored, which will cause data loss.
    
    ### Does this PR introduce _any_ user-facing change?
    NO.
    
    ### How was this patch tested?
    GA and cluster.
    
    Closes #2986 from FMX/b1769.
    
    Authored-by: mingji <[email protected]>
    Signed-off-by: Shuang <[email protected]>
    (cherry picked from commit 069e5b6c18e11d53c1ade6e4a85484856f22fba9)
    Signed-off-by: Shuang <[email protected]>
---
 .../common/protocol/message/ControlMessages.scala  | 15 +++-
 .../celeborn/common/util/PbSerDeUtilsTest.scala    | 99 +++++++++++++++++++++-
 2 files changed, 110 insertions(+), 4 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 6870ba1ca..f5cd7ff74 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -1063,10 +1063,21 @@ object ControlMessages extends Logging {
           .parseFrom(message.getPayload)
         val fileGroup = 
pbGetReducerFileGroupResponse.getFileGroupsMap.asScala.map {
           case (partitionId, fileGroup) =>
+            val locationsSet: java.util.Set[PartitionLocation] =
+              new util.LinkedHashSet[PartitionLocation]()
+
+            // In PbGetReducerFileGroupResponse, location with same
+            // uniqueId will not be put into the location set
+            // check out the logic 
@org.apache.celeborn.client.commit.CommitHandler.parallelCommitFiles
+            // This is why we should join the primary location list and 
replica location list
+
+            val (pris, reps) = PbSerDeUtils.fromPbPackedPartitionLocationsPair(
+              fileGroup.getPartitionLocationsPair)
+            locationsSet.addAll(pris)
+            locationsSet.addAll(reps)
             (
               partitionId,
-              PbSerDeUtils.fromPbPackedPartitionLocationsPair(
-                fileGroup.getPartitionLocationsPair)._1.asScala.toSet.asJava)
+              locationsSet)
         }.asJava
 
         val attempts = 
pbGetReducerFileGroupResponse.getAttemptsList.asScala.map(_.toInt).toArray
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala 
b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
index 485662cda..94c73204a 100644
--- 
a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
+++ 
b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
@@ -21,15 +21,18 @@ import java.io.File
 import java.util
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.util.Random
 
 import org.apache.hadoop.shaded.org.apache.commons.lang3.RandomStringUtils
 
 import org.apache.celeborn.CelebornFunSuite
 import org.apache.celeborn.common.identity.UserIdentifier
-import org.apache.celeborn.common.meta.{ApplicationMeta, DeviceInfo, 
DiskFileInfo, DiskInfo, MapFileMeta, ReduceFileMeta, WorkerEventInfo, 
WorkerInfo, WorkerStatus}
+import org.apache.celeborn.common.meta._
 import org.apache.celeborn.common.protocol.{PartitionLocation, 
PbPackedWorkerResource, PbWorkerResource, StorageInfo}
-import 
org.apache.celeborn.common.protocol.message.ControlMessages.WorkerResource
+import org.apache.celeborn.common.protocol.PartitionLocation.Mode
+import org.apache.celeborn.common.protocol.message.{ControlMessages, 
StatusCode}
+import 
org.apache.celeborn.common.protocol.message.ControlMessages.{GetReducerFileGroupResponse,
 WorkerResource}
 import org.apache.celeborn.common.quota.ResourceConsumption
 import 
org.apache.celeborn.common.util.PbSerDeUtils.{fromPbPackedPartitionLocationsPair,
 toPbPackedPartitionLocationsPair}
 
@@ -466,4 +469,96 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
     testSerializationPerformance(100)
   }
 
+  test("GetReduceFileGroup with primary and replica locations") {
+    val shuffleMap: util.Map[Integer, util.Set[PartitionLocation]] =
+      JavaUtils.newConcurrentHashMap()
+    val locationSet = new util.LinkedHashSet[PartitionLocation]()
+    val uniqueIds = mutable.Set("0-0", "1-0", "2-0", "3-0", "4-0")
+    locationSet.add(new PartitionLocation(0, 0, "h", 1, 1, 1, 1, Mode.REPLICA))
+    locationSet.add(new PartitionLocation(1, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
+    locationSet.add(new PartitionLocation(2, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
+    locationSet.add(new PartitionLocation(3, 0, "h", 1, 1, 1, 1, Mode.REPLICA))
+    locationSet.add(new PartitionLocation(4, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
+    shuffleMap.put(1, locationSet)
+    val attempts = Array.fill(10)(10)
+    val succeedPartitions = 
Array.fill(10)(10).map(java.lang.Integer.valueOf).toSet.asJava
+
+    val GetReducerFileGroupResponseMsg = GetReducerFileGroupResponse(
+      StatusCode.SUCCESS,
+      shuffleMap,
+      attempts,
+      succeedPartitions)
+
+    val transportGetReducerFileGroup =
+      ControlMessages.toTransportMessage(GetReducerFileGroupResponseMsg)
+    val fromTransportGetReducerFileGroup: GetReducerFileGroupResponse =
+      ControlMessages.fromTransportMessage(
+        transportGetReducerFileGroup).asInstanceOf[GetReducerFileGroupResponse]
+
+    val locations = fromTransportGetReducerFileGroup.fileGroup.get(1)
+    locations.asScala.foreach(p => uniqueIds.remove(p.getUniqueId))
+    assert(uniqueIds.isEmpty)
+  }
+
+  test("GetReduceFileGroup with primary location only") {
+    val shuffleMap: util.Map[Integer, util.Set[PartitionLocation]] =
+      JavaUtils.newConcurrentHashMap()
+    val locationSet = new util.LinkedHashSet[PartitionLocation]()
+    val uniqueIds = mutable.Set("0-0", "1-0", "2-0", "3-0", "4-0")
+    locationSet.add(new PartitionLocation(0, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
+    locationSet.add(new PartitionLocation(1, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
+    locationSet.add(new PartitionLocation(2, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
+    locationSet.add(new PartitionLocation(3, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
+    locationSet.add(new PartitionLocation(4, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
+    shuffleMap.put(1, locationSet)
+    val attempts = Array.fill(10)(10)
+    val succeedPartitions = 
Array.fill(10)(10).map(java.lang.Integer.valueOf).toSet.asJava
+
+    val GetReducerFileGroupResponseMsg = GetReducerFileGroupResponse(
+      StatusCode.SUCCESS,
+      shuffleMap,
+      attempts,
+      succeedPartitions)
+
+    val transportGetReducerFileGroup =
+      ControlMessages.toTransportMessage(GetReducerFileGroupResponseMsg)
+    val fromTransportGetReducerFileGroup: GetReducerFileGroupResponse =
+      ControlMessages.fromTransportMessage(
+        transportGetReducerFileGroup).asInstanceOf[GetReducerFileGroupResponse]
+
+    val locations = fromTransportGetReducerFileGroup.fileGroup.get(1)
+    locations.asScala.foreach(p => uniqueIds.remove(p.getUniqueId))
+    assert(uniqueIds.isEmpty)
+  }
+
+  test("GetReduceFileGroup with replica location only") {
+    val shuffleMap: util.Map[Integer, util.Set[PartitionLocation]] =
+      JavaUtils.newConcurrentHashMap()
+    val locationSet = new util.LinkedHashSet[PartitionLocation]()
+    val uniqueIds = mutable.Set("0-0", "1-0", "2-0", "3-0", "4-0")
+    locationSet.add(new PartitionLocation(0, 0, "h", 1, 1, 1, 1, Mode.REPLICA))
+    locationSet.add(new PartitionLocation(1, 0, "h", 1, 1, 1, 1, Mode.REPLICA))
+    locationSet.add(new PartitionLocation(2, 0, "h", 1, 1, 1, 1, Mode.REPLICA))
+    locationSet.add(new PartitionLocation(3, 0, "h", 1, 1, 1, 1, Mode.REPLICA))
+    locationSet.add(new PartitionLocation(4, 0, "h", 1, 1, 1, 1, Mode.REPLICA))
+    shuffleMap.put(1, locationSet)
+    val attempts = Array.fill(10)(10)
+    val succeedPartitions = 
Array.fill(10)(10).map(java.lang.Integer.valueOf).toSet.asJava
+
+    val GetReducerFileGroupResponseMsg = GetReducerFileGroupResponse(
+      StatusCode.SUCCESS,
+      shuffleMap,
+      attempts,
+      succeedPartitions)
+
+    val transportGetReducerFileGroup =
+      ControlMessages.toTransportMessage(GetReducerFileGroupResponseMsg)
+    val fromTransportGetReducerFileGroup: GetReducerFileGroupResponse =
+      ControlMessages.fromTransportMessage(
+        transportGetReducerFileGroup).asInstanceOf[GetReducerFileGroupResponse]
+
+    val locations = fromTransportGetReducerFileGroup.fileGroup.get(1)
+    locations.asScala.foreach(p => uniqueIds.remove(p.getUniqueId))
+    assert(uniqueIds.isEmpty)
+  }
 }

Reply via email to