This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 069e5b6c1 [CELEBORN-1769] Fix packed partition location cause
GetReducerFileGroupResponse lose location
069e5b6c1 is described below
commit 069e5b6c18e11d53c1ade6e4a85484856f22fba9
Author: mingji <[email protected]>
AuthorDate: Tue Dec 10 18:03:00 2024 +0800
[CELEBORN-1769] Fix packed partition location cause
GetReducerFileGroupResponse lose location
### What changes were proposed in this pull request?
Fix the loss of partition locations that have only a replica (no surviving
primary) when parsing `GetReducerFileGroupResponse` from `LifecycleManager`.
### Why are the changes needed?
In previous optimizations, I introduced packed partition locations to
reduce the size of RPC calls, based on the assumption that primary partition
locations would always be available. However, in some test scenarios where data
replication is enabled and workers are randomly terminated, the primary
location may be lost while the replica location remains. Because only the
primary list was read when unpacking the response, those replica locations
were ignored, which causes data loss.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA and cluster.
Closes #2986 from FMX/b1769.
Authored-by: mingji <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../common/protocol/message/ControlMessages.scala | 15 +++-
.../celeborn/common/util/PbSerDeUtilsTest.scala | 99 +++++++++++++++++++++-
2 files changed, 110 insertions(+), 4 deletions(-)
diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 3c9b22c54..0e465196d 100644
--- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -1158,10 +1158,21 @@ object ControlMessages extends Logging {
.parseFrom(message.getPayload)
val fileGroup =
pbGetReducerFileGroupResponse.getFileGroupsMap.asScala.map {
case (partitionId, fileGroup) =>
+ val locationsSet: java.util.Set[PartitionLocation] =
+ new util.LinkedHashSet[PartitionLocation]()
+
+ // In PbGetReducerFileGroupResponse, location with same
+ // uniqueId will not be put into the location set
+ // check out the logic @org.apache.celeborn.client.commit.CommitHandler.parallelCommitFiles
+ // This is why we should join the primary location list and replica location list
+
+ val (pris, reps) = PbSerDeUtils.fromPbPackedPartitionLocationsPair(
+ fileGroup.getPartitionLocationsPair)
+ locationsSet.addAll(pris)
+ locationsSet.addAll(reps)
(
partitionId,
- PbSerDeUtils.fromPbPackedPartitionLocationsPair(
- fileGroup.getPartitionLocationsPair)._1.asScala.toSet.asJava)
+ locationsSet)
}.asJava
val attempts =
pbGetReducerFileGroupResponse.getAttemptsList.asScala.map(_.toInt).toArray
diff --git a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
index bd886e931..9e1d442fb 100644
--- a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
@@ -21,15 +21,18 @@ import java.io.File
import java.util
import scala.collection.JavaConverters._
+import scala.collection.mutable
import scala.util.Random
import org.apache.hadoop.shaded.org.apache.commons.lang3.RandomStringUtils
import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.identity.UserIdentifier
-import org.apache.celeborn.common.meta.{ApplicationMeta, DeviceInfo, DiskFileInfo, DiskInfo, MapFileMeta, ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus}
+import org.apache.celeborn.common.meta._
import org.apache.celeborn.common.protocol.{PartitionLocation, PbPackedWorkerResource, PbWorkerResource, StorageInfo}
-import org.apache.celeborn.common.protocol.message.ControlMessages.WorkerResource
+import org.apache.celeborn.common.protocol.PartitionLocation.Mode
+import org.apache.celeborn.common.protocol.message.{ControlMessages, StatusCode}
+import org.apache.celeborn.common.protocol.message.ControlMessages.{GetReducerFileGroupResponse, WorkerResource}
import org.apache.celeborn.common.quota.ResourceConsumption
import org.apache.celeborn.common.util.PbSerDeUtils.{fromPbPackedPartitionLocationsPair, toPbPackedPartitionLocationsPair}
@@ -470,4 +473,96 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
testSerializationPerformance(100)
}
+ test("GetReduceFileGroup with primary and replica locations") {
+ val shuffleMap: util.Map[Integer, util.Set[PartitionLocation]] =
+ JavaUtils.newConcurrentHashMap()
+ val locationSet = new util.LinkedHashSet[PartitionLocation]()
+ val uniqueIds = mutable.Set("0-0", "1-0", "2-0", "3-0", "4-0")
+ locationSet.add(new PartitionLocation(0, 0, "h", 1, 1, 1, 1, Mode.REPLICA))
+ locationSet.add(new PartitionLocation(1, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
+ locationSet.add(new PartitionLocation(2, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
+ locationSet.add(new PartitionLocation(3, 0, "h", 1, 1, 1, 1, Mode.REPLICA))
+ locationSet.add(new PartitionLocation(4, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
+ shuffleMap.put(1, locationSet)
+ val attempts = Array.fill(10)(10)
+ val succeedPartitions = Array.fill(10)(10).map(java.lang.Integer.valueOf).toSet.asJava
+
+ val GetReducerFileGroupResponseMsg = GetReducerFileGroupResponse(
+ StatusCode.SUCCESS,
+ shuffleMap,
+ attempts,
+ succeedPartitions)
+
+ val transportGetReducerFileGroup =
+ ControlMessages.toTransportMessage(GetReducerFileGroupResponseMsg)
+ val fromTransportGetReducerFileGroup: GetReducerFileGroupResponse =
+ ControlMessages.fromTransportMessage(
+ transportGetReducerFileGroup).asInstanceOf[GetReducerFileGroupResponse]
+
+ val locations = fromTransportGetReducerFileGroup.fileGroup.get(1)
+ locations.asScala.foreach(p => uniqueIds.remove(p.getUniqueId))
+ assert(uniqueIds.isEmpty)
+ }
+
+ test("GetReduceFileGroup with primary location only") {
+ val shuffleMap: util.Map[Integer, util.Set[PartitionLocation]] =
+ JavaUtils.newConcurrentHashMap()
+ val locationSet = new util.LinkedHashSet[PartitionLocation]()
+ val uniqueIds = mutable.Set("0-0", "1-0", "2-0", "3-0", "4-0")
+ locationSet.add(new PartitionLocation(0, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
+ locationSet.add(new PartitionLocation(1, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
+ locationSet.add(new PartitionLocation(2, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
+ locationSet.add(new PartitionLocation(3, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
+ locationSet.add(new PartitionLocation(4, 0, "h", 1, 1, 1, 1, Mode.PRIMARY))
+ shuffleMap.put(1, locationSet)
+ val attempts = Array.fill(10)(10)
+ val succeedPartitions = Array.fill(10)(10).map(java.lang.Integer.valueOf).toSet.asJava
+
+ val GetReducerFileGroupResponseMsg = GetReducerFileGroupResponse(
+ StatusCode.SUCCESS,
+ shuffleMap,
+ attempts,
+ succeedPartitions)
+
+ val transportGetReducerFileGroup =
+ ControlMessages.toTransportMessage(GetReducerFileGroupResponseMsg)
+ val fromTransportGetReducerFileGroup: GetReducerFileGroupResponse =
+ ControlMessages.fromTransportMessage(
+ transportGetReducerFileGroup).asInstanceOf[GetReducerFileGroupResponse]
+
+ val locations = fromTransportGetReducerFileGroup.fileGroup.get(1)
+ locations.asScala.foreach(p => uniqueIds.remove(p.getUniqueId))
+ assert(uniqueIds.isEmpty)
+ }
+
+ test("GetReduceFileGroup with replica location only") {
+ val shuffleMap: util.Map[Integer, util.Set[PartitionLocation]] =
+ JavaUtils.newConcurrentHashMap()
+ val locationSet = new util.LinkedHashSet[PartitionLocation]()
+ val uniqueIds = mutable.Set("0-0", "1-0", "2-0", "3-0", "4-0")
+ locationSet.add(new PartitionLocation(0, 0, "h", 1, 1, 1, 1, Mode.REPLICA))
+ locationSet.add(new PartitionLocation(1, 0, "h", 1, 1, 1, 1, Mode.REPLICA))
+ locationSet.add(new PartitionLocation(2, 0, "h", 1, 1, 1, 1, Mode.REPLICA))
+ locationSet.add(new PartitionLocation(3, 0, "h", 1, 1, 1, 1, Mode.REPLICA))
+ locationSet.add(new PartitionLocation(4, 0, "h", 1, 1, 1, 1, Mode.REPLICA))
+ shuffleMap.put(1, locationSet)
+ val attempts = Array.fill(10)(10)
+ val succeedPartitions = Array.fill(10)(10).map(java.lang.Integer.valueOf).toSet.asJava
+
+ val GetReducerFileGroupResponseMsg = GetReducerFileGroupResponse(
+ StatusCode.SUCCESS,
+ shuffleMap,
+ attempts,
+ succeedPartitions)
+
+ val transportGetReducerFileGroup =
+ ControlMessages.toTransportMessage(GetReducerFileGroupResponseMsg)
+ val fromTransportGetReducerFileGroup: GetReducerFileGroupResponse =
+ ControlMessages.fromTransportMessage(
+ transportGetReducerFileGroup).asInstanceOf[GetReducerFileGroupResponse]
+
+ val locations = fromTransportGetReducerFileGroup.fileGroup.get(1)
+ locations.asScala.foreach(p => uniqueIds.remove(p.getUniqueId))
+ assert(uniqueIds.isEmpty)
+ }
}