mridulm commented on code in PR #2456:
URL: https://github.com/apache/celeborn/pull/2456#discussion_r1591956378
##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -512,7 +512,7 @@ private ConcurrentHashMap<Integer, PartitionLocation>
registerShuffle(
numPartitions,
() ->
lifecycleManagerRef.askSync(
- RegisterShuffle$.MODULE$.apply(shuffleId, numMappers,
numPartitions),
+ RegisterShuffle$.MODULE$.apply(shuffleId, numMappers,
numPartitions, true),
Review Comment:
Since `packed` has a default value, should we drop the explicit argument when it matches the default? (Here and in subsequent files.)
##########
common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala:
##########
@@ -490,4 +490,170 @@ object PbSerDeUtils {
pbWorkerEventInfo.getWorkerEventType.getNumber,
pbWorkerEventInfo.getEventStartTime())
}
+
+ private def toPackedPartitionLocation(
+ pbPackedLocationsBuilder: PbPackedPartitionLocations.Builder,
+ workerIdIndex: Map[String, Int],
+ mountPointsIndex: Map[String, Int],
+ location: PartitionLocation): PbPackedPartitionLocations.Builder = {
+ pbPackedLocationsBuilder.addIds(location.getId)
+ pbPackedLocationsBuilder.addEpoches(location.getEpoch)
+
pbPackedLocationsBuilder.addWorkerIds(workerIdIndex(location.getWorker.toUniqueId()))
+ pbPackedLocationsBuilder.addMapIdBitMap(
+ Utils.roaringBitmapToByteString(location.getMapIdBitMap))
+ pbPackedLocationsBuilder.addTypes(location.getStorageInfo.getType.getValue)
+ pbPackedLocationsBuilder.addMountPoints(
+ mountPointsIndex(location.getStorageInfo.getMountPoint))
+
pbPackedLocationsBuilder.addFinalResult(location.getStorageInfo.isFinalResult)
+ if (location.getStorageInfo.getFilePath != null &&
location.getStorageInfo.getFilePath.nonEmpty) {
+ pbPackedLocationsBuilder.addFilePaths(location.getStorageInfo.getFilePath
+ .substring(location.getStorageInfo.getMountPoint.length))
+ } else {
+ pbPackedLocationsBuilder.addFilePaths("")
+ }
+
pbPackedLocationsBuilder.addAvailableStorageTypes(location.getStorageInfo.availableStorageTypes)
+ pbPackedLocationsBuilder.addModes(location.getMode.mode())
+ }
+
+ def toPbPackedPartitionLocationsPair(inputLocations: List[PartitionLocation])
+ : PbPackedPartitionLocationsPair = {
+ val packedLocationPairsBuilder =
PbPackedPartitionLocationsPair.newBuilder()
+ val packedLocationsBuilder = PbPackedPartitionLocations.newBuilder()
+
+ val implicateLocations = inputLocations.map(_.getPeer).filterNot(_ == null)
+
+ val allLocations = (inputLocations ++ implicateLocations)
+ val workerIdList = new util.ArrayList[String](
+ allLocations.map(_.getWorker.toUniqueId()).toSet.asJava)
+ val workerIdIndex = workerIdList.asScala.zipWithIndex.toMap
+ val mountPointsList = new util.ArrayList[String](
+ allLocations.map(
+ _.getStorageInfo.getMountPoint).toSet.asJava)
+ val mountPointsIndex = mountPointsList.asScala.zipWithIndex.toMap
+
+ packedLocationsBuilder.addAllWorkerIdsSet(workerIdList)
+ packedLocationsBuilder.addAllMountPointsSet(mountPointsList)
+
+ val locationIndexes = allLocations.map(_.hashCode()).zipWithIndex.toMap
+
+ for (location <- allLocations) {
+ toPackedPartitionLocation(
+ packedLocationsBuilder,
+ workerIdIndex,
+ mountPointsIndex,
+ location)
+ if (location.getPeer != null) {
+ packedLocationPairsBuilder.addPeerIndexes(
+ locationIndexes(location.getPeer.hashCode()))
+ } else {
+ packedLocationPairsBuilder.addPeerIndexes(-1)
Review Comment:
`PartitionLocation.hashCode` can return `-1` for a valid peer.
If we want to assume that `-1` means "no peer", we should enforce that in `PartitionLocation.hashCode`.
For example, change it to something like:
```
public int hashCode() {
int code = (id + epoch + host + rpcPort + pushPort +
fetchPort).hashCode();
// assuming MISSING_PEER_ID == -1
return MISSING_PEER_ID == code ? 0 : code;
}
```
##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -313,6 +315,7 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
val shuffleId = pb.getShuffleId
val numMappers = pb.getNumMappers
val numPartitions = pb.getNumPartitions
+ val packed = pb.getPacked
Review Comment:
Can this line be removed?
##########
client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala:
##########
@@ -111,7 +111,7 @@ class ReducePartitionCommitHandler(
// In case of stage with no shuffle data, register shuffle will not be
called,
// so here we still need to check null.
if (requests != null && !requests.isEmpty) {
- requests.asScala.foreach(replyGetReducerFileGroup(_, shuffleId))
+ requests.asScala.foreach(r => replyGetReducerFileGroup(r, shuffleId))
Review Comment:
Revert this change? The explicit lambda is equivalent to the original placeholder form.
##########
client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala:
##########
@@ -269,13 +269,16 @@ class ReducePartitionCommitHandler(
}
}
- private def replyGetReducerFileGroup(context: RpcCallContext, shuffleId:
Int): Unit = {
+ private def replyGetReducerFileGroup(
+ context: RpcCallContext,
+ shuffleId: Int): Unit = {
if (isStageDataLost(shuffleId)) {
context.reply(
GetReducerFileGroupResponse(
StatusCode.SHUFFLE_DATA_LOST,
JavaUtils.newConcurrentHashMap(),
- Array.empty))
+ Array.empty,
+ new util.HashSet[Integer]()))
Review Comment:
Use `Collections.emptySet[Integer]()` instead? (Here and below.)
##########
common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala:
##########
@@ -179,6 +188,7 @@ object ControlMessages extends Logging {
maxWorkers: Int,
availableStorageTypes: Int,
excludedWorkerSet: Set[WorkerInfo] = Set.empty,
+ packed: Boolean = false,
Review Comment:
Should we consistently use `packed: Boolean = true` in all cases?
Right now, the defaults are a mix of `true`, `false`, and no default value.
##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -136,7 +136,9 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
locations.asScala.foreach(location => map.put(location.getId, location))
}
- case class RegisterCallContext(context: RpcCallContext, partitionId: Int =
-1) {
+ case class RegisterCallContext(
+ context: RpcCallContext,
+ partitionId: Int = -1) {
Review Comment:
Revert?
There seem to be a number of whitespace/newline-only changes. Are they due to a Spotless update? If not, can we revert them?
##########
common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala:
##########
@@ -490,4 +490,170 @@ object PbSerDeUtils {
pbWorkerEventInfo.getWorkerEventType.getNumber,
pbWorkerEventInfo.getEventStartTime())
}
+
+ private def toPackedPartitionLocation(
+ pbPackedLocationsBuilder: PbPackedPartitionLocations.Builder,
+ workerIdIndex: Map[String, Int],
+ mountPointsIndex: Map[String, Int],
+ location: PartitionLocation): PbPackedPartitionLocations.Builder = {
+ pbPackedLocationsBuilder.addIds(location.getId)
+ pbPackedLocationsBuilder.addEpoches(location.getEpoch)
+
pbPackedLocationsBuilder.addWorkerIds(workerIdIndex(location.getWorker.toUniqueId()))
+ pbPackedLocationsBuilder.addMapIdBitMap(
+ Utils.roaringBitmapToByteString(location.getMapIdBitMap))
+ pbPackedLocationsBuilder.addTypes(location.getStorageInfo.getType.getValue)
+ pbPackedLocationsBuilder.addMountPoints(
+ mountPointsIndex(location.getStorageInfo.getMountPoint))
+
pbPackedLocationsBuilder.addFinalResult(location.getStorageInfo.isFinalResult)
+ if (location.getStorageInfo.getFilePath != null &&
location.getStorageInfo.getFilePath.nonEmpty) {
+ pbPackedLocationsBuilder.addFilePaths(location.getStorageInfo.getFilePath
+ .substring(location.getStorageInfo.getMountPoint.length))
+ } else {
+ pbPackedLocationsBuilder.addFilePaths("")
+ }
+
pbPackedLocationsBuilder.addAvailableStorageTypes(location.getStorageInfo.availableStorageTypes)
+ pbPackedLocationsBuilder.addModes(location.getMode.mode())
+ }
+
+ def toPbPackedPartitionLocationsPair(inputLocations: List[PartitionLocation])
+ : PbPackedPartitionLocationsPair = {
+ val packedLocationPairsBuilder =
PbPackedPartitionLocationsPair.newBuilder()
+ val packedLocationsBuilder = PbPackedPartitionLocations.newBuilder()
+
+ val implicateLocations = inputLocations.map(_.getPeer).filterNot(_ == null)
+
+ val allLocations = (inputLocations ++ implicateLocations)
+ val workerIdList = new util.ArrayList[String](
+ allLocations.map(_.getWorker.toUniqueId()).toSet.asJava)
+ val workerIdIndex = workerIdList.asScala.zipWithIndex.toMap
+ val mountPointsList = new util.ArrayList[String](
+ allLocations.map(
+ _.getStorageInfo.getMountPoint).toSet.asJava)
+ val mountPointsIndex = mountPointsList.asScala.zipWithIndex.toMap
+
+ packedLocationsBuilder.addAllWorkerIdsSet(workerIdList)
+ packedLocationsBuilder.addAllMountPointsSet(mountPointsList)
+
+ val locationIndexes = allLocations.map(_.hashCode()).zipWithIndex.toMap
+
+ for (location <- allLocations) {
+ toPackedPartitionLocation(
+ packedLocationsBuilder,
+ workerIdIndex,
+ mountPointsIndex,
+ location)
+ if (location.getPeer != null) {
+ packedLocationPairsBuilder.addPeerIndexes(
+ locationIndexes(location.getPeer.hashCode()))
+ } else {
+ packedLocationPairsBuilder.addPeerIndexes(-1)
+ }
+ }
+
+ packedLocationPairsBuilder.setInputLocationSize(inputLocations.size)
+
packedLocationPairsBuilder.setLocations(packedLocationsBuilder.build()).build()
+ }
+
+ def fromPbPackedPartitionLocationsPair(pbPartitionLocationsPair:
PbPackedPartitionLocationsPair)
+ : (util.List[PartitionLocation], util.List[PartitionLocation]) = {
+ val primaryLocations = new util.ArrayList[PartitionLocation]()
+ val replicateLocations = new util.ArrayList[PartitionLocation]()
+ val pbPackedPartitionLocations = pbPartitionLocationsPair.getLocations
+ val inputLocationSize = pbPartitionLocationsPair.getInputLocationSize
+ val idList = pbPackedPartitionLocations.getIdsList
+ val locationCount = idList.size()
+ var index = 0
+
+ val locations = new util.ArrayList[PartitionLocation]()
+ while (index < locationCount) {
+ val loc =
+ fromPackedPartitionLocations(pbPackedPartitionLocations, index)
+ if (index < inputLocationSize) {
Review Comment:
Do we need this check?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]