mridulm commented on code in PR #2456:
URL: https://github.com/apache/celeborn/pull/2456#discussion_r1563116925


##########
common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala:
##########
@@ -490,4 +490,171 @@ object PbSerDeUtils {
       pbWorkerEventInfo.getWorkerEventType.getNumber,
       pbWorkerEventInfo.getEventStartTime())
   }
+
+  private def toPackedPartitionLocation(
+      pbPackedLocationsBuilder: PbPackedPartitionLocations.Builder,
+      workerIdList: util.ArrayList[String],
+      mountPointsList: util.ArrayList[String],
+      pLoc: PartitionLocation) = {

Review Comment:
   Add an explicit return type to the method definition.



##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -606,14 +606,23 @@ private ConcurrentHashMap<Integer, PartitionLocation> registerShuffleInternal(
         StatusCode respStatus = Utils.toStatusCode(response.getStatus());
         if (StatusCode.SUCCESS.equals(respStatus)) {
           ConcurrentHashMap<Integer, PartitionLocation> result = JavaUtils.newConcurrentHashMap();
-          for (int i = 0; i < response.getPartitionLocationsList().size(); i++) {
-            PartitionLocation partitionLoc =
-                PbSerDeUtils.fromPbPartitionLocation(response.getPartitionLocationsList().get(i));
-            pushExcludedWorkers.remove(partitionLoc.hostAndPushPort());
-            if (partitionLoc.hasPeer()) {
-              pushExcludedWorkers.remove(partitionLoc.getPeer().hostAndPushPort());
+          if (response.getPartitionLocationsList().isEmpty()) {
+            Tuple2<List<PartitionLocation>, List<PartitionLocation>> locations =
+                PbSerDeUtils.fromPbPackedPartitionLocationsPair(
+                    response.getPackedPartitionLocationsPair());
+            for (PartitionLocation location : locations._1) {
+              result.put(location.getId(), location);
+            }
+          } else {
+            for (int i = 0; i < response.getPartitionLocationsList().size(); i++) {

Review Comment:
   nit: I know this is existing code, but could we pull
   `response.getPartitionLocationsList()` into a local variable?
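
   A minimal sketch of the shape I have in mind. `Loc` and `Response` are hypothetical stand-ins for `PartitionLocation` and the protobuf response, so treat this as an illustration of the loop only, not as the actual change:

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

class LocalVariableSketch {
  // Hypothetical stand-in for PartitionLocation.
  static final class Loc {
    private final int id;
    Loc(int id) { this.id = id; }
    int getId() { return id; }
  }

  // Hypothetical stand-in for the protobuf response.
  static final class Response {
    private final List<Loc> locations;
    Response(List<Loc> locations) { this.locations = locations; }
    List<Loc> getPartitionLocationsList() { return locations; }
  }

  static ConcurrentHashMap<Integer, Loc> collect(Response response) {
    ConcurrentHashMap<Integer, Loc> result = new ConcurrentHashMap<>();
    // Read the list once and reuse it for the bound check and element access.
    List<Loc> partitionLocations = response.getPartitionLocationsList();
    for (int i = 0; i < partitionLocations.size(); i++) {
      Loc loc = partitionLocations.get(i);
      result.put(loc.getId(), loc);
    }
    return result;
  }
}
```

   The same local could also serve the `isEmpty()` check that selects between the two branches.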



##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -606,14 +606,23 @@ private ConcurrentHashMap<Integer, PartitionLocation> registerShuffleInternal(
         StatusCode respStatus = Utils.toStatusCode(response.getStatus());
         if (StatusCode.SUCCESS.equals(respStatus)) {
           ConcurrentHashMap<Integer, PartitionLocation> result = JavaUtils.newConcurrentHashMap();
-          for (int i = 0; i < response.getPartitionLocationsList().size(); i++) {
-            PartitionLocation partitionLoc =
-                PbSerDeUtils.fromPbPartitionLocation(response.getPartitionLocationsList().get(i));
-            pushExcludedWorkers.remove(partitionLoc.hostAndPushPort());
-            if (partitionLoc.hasPeer()) {
-              pushExcludedWorkers.remove(partitionLoc.getPeer().hostAndPushPort());
+          if (response.getPartitionLocationsList().isEmpty()) {
+            Tuple2<List<PartitionLocation>, List<PartitionLocation>> locations =
+                PbSerDeUtils.fromPbPackedPartitionLocationsPair(
+                    response.getPackedPartitionLocationsPair());
+            for (PartitionLocation location : locations._1) {
+              result.put(location.getId(), location);

Review Comment:
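   QQ: Shouldn't the `pushExcludedWorkers` cleanup apply here as well? The
   removed loop dropped each location (and its peer) from it, but this
   branch does not.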
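
   If it should apply, something along these lines might work. This is only a sketch under assumptions: `Loc` is a hypothetical stand-in for `PartitionLocation`, `pushExcludedWorkers` is assumed to be a `Set<String>` keyed by `hostAndPushPort()`, and I am assuming the unpacked primary locations already have their peer attached:

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class ExcludedWorkersSketch {
  // Hypothetical stand-in for PartitionLocation; only the methods used in the diff are modeled.
  static final class Loc {
    private final int id;
    private final String hostAndPushPort;
    private final Loc peer; // null when the location has no replica

    Loc(int id, String hostAndPushPort, Loc peer) {
      this.id = id;
      this.hostAndPushPort = hostAndPushPort;
      this.peer = peer;
    }

    int getId() { return id; }
    String hostAndPushPort() { return hostAndPushPort; }
    boolean hasPeer() { return peer != null; }
    Loc getPeer() { return peer; }
  }

  // Mirrors the removed loop: un-exclude each primary and its peer, then record the primary.
  static ConcurrentHashMap<Integer, Loc> register(
      List<Loc> primaries, Set<String> pushExcludedWorkers) {
    ConcurrentHashMap<Integer, Loc> result = new ConcurrentHashMap<>();
    for (Loc location : primaries) {
      pushExcludedWorkers.remove(location.hostAndPushPort());
      if (location.hasPeer()) {
        pushExcludedWorkers.remove(location.getPeer().hostAndPushPort());
      }
      result.put(location.getId(), location);
    }
    return result;
  }
}
```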



##########
common/src/main/proto/TransportMessages.proto:
##########
@@ -758,3 +763,27 @@ message PbApplicationMeta {
 message PbApplicationMetaRequest {
   string appId = 1;
 }
+
+message PbPackedPartitionLocations {
+  repeated int32 ids = 1;
+  repeated int32 epoches = 2;
+  repeated int32 workerIds = 3;
+  repeated string workerIdsSet = 4;
+  repeated bytes mapIdBitMap = 5;
+  repeated int32 types = 6;
+  repeated int32 mountPoints = 7;
+  repeated string mountPointsSet = 8;
+  repeated bool finalResult = 9 ;
+  repeated string filePaths = 10;
+  repeated int32 availableStorageTypes = 11;
+}
+
+message PbPackedPartitionLocationsPair {
+  PbPackedPartitionLocations locations = 1;
+  repeated int32 replicates = 2;

Review Comment:
   Rename to `replicas`?


