maobaolong commented on code in PR #2182:
URL: https://github.com/apache/incubator-uniffle/pull/2182#discussion_r1806061199


##########
common/src/main/java/org/apache/uniffle/common/netty/protocol/Decoders.java:
##########
@@ -39,13 +40,15 @@ public static ShuffleServerInfo decodeShuffleServerInfo(ByteBuf byteBuf) {
     return new ShuffleServerInfo(id, host, grpcPort, nettyPort);
   }
 
-  public static ShuffleBlockInfo decodeShuffleBlockInfo(ByteBuf byteBuf) {
-    int partId = byteBuf.readInt();
-    long blockId = byteBuf.readLong();
-    int length = byteBuf.readInt();
-    int shuffleId = byteBuf.readInt();
-    long crc = byteBuf.readLong();
-    long taskAttemptId = byteBuf.readLong();
+  public static Pair<Integer, ShufflePartitionedBlock> decodeShuffleBlockInfo(ByteBuf byteBuf) {
+    // partId Int

Review Comment:
   It would be better to create another issue to track the follow-up plan based 
on this PR. Adding the issue link here as a `TODO` comment would make it clear 
and easy to track.



##########
common/src/main/java/org/apache/uniffle/common/netty/protocol/Encoders.java:
##########
@@ -59,19 +59,27 @@ public static int encodeLengthOfShuffleServerInfo(ShuffleServerInfo shuffleServe
         + 2 * Integer.BYTES;
   }
 
+  public static int encodeLengthShuffleBlockInfoCommon() {

Review Comment:
   Could this be a constant?
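
   For illustration, a minimal sketch of the constant form. It assumes the common part covers only the fixed-width fields visible in the removed decoder code (four `int` and four `long` values). The class and constant names are hypothetical, and the real `encodeLengthShuffleBlockInfoCommon()` may count additional fields.

```java
// Hypothetical sketch: the fixed-width portion of an encoded block as a constant.
// Assumption: the "common" part is exactly the fixed-width fields seen in the
// decoder diff; the real Encoders method may include more.
final class BlockInfoLengths {
  // partId, length, shuffleId, uncompressLength
  private static final int INT_FIELDS = 4;
  // blockId, crc, taskAttemptId, freeMemory
  private static final int LONG_FIELDS = 4;

  static final int SHUFFLE_BLOCK_INFO_COMMON_LENGTH =
      INT_FIELDS * Integer.BYTES + LONG_FIELDS * Long.BYTES;

  private BlockInfoLengths() {}
}
```

   Since every term is a compile-time constant, the value is computed once and needs no method call on the hot path.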



##########
common/src/main/java/org/apache/uniffle/common/netty/protocol/Decoders.java:
##########
@@ -39,13 +40,15 @@ public static ShuffleServerInfo decodeShuffleServerInfo(ByteBuf byteBuf) {
     return new ShuffleServerInfo(id, host, grpcPort, nettyPort);
   }
 
-  public static ShuffleBlockInfo decodeShuffleBlockInfo(ByteBuf byteBuf) {
-    int partId = byteBuf.readInt();
-    long blockId = byteBuf.readLong();
-    int length = byteBuf.readInt();
-    int shuffleId = byteBuf.readInt();
-    long crc = byteBuf.readLong();
-    long taskAttemptId = byteBuf.readLong();
+  public static Pair<Integer, ShufflePartitionedBlock> decodeShuffleBlockInfo(ByteBuf byteBuf) {
+    // partId Int

Review Comment:
   <img width="774" alt="image" src="https://github.com/user-attachments/assets/399772e1-2235-4d42-9903-e64291d6619c">
   



##########
server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java:
##########
@@ -912,11 +905,9 @@ private ServerRpcAuditContext createAuditContext(
       auditLogger = AUDIT_LOGGER;
     }
     ServerRpcAuditContext auditContext = new ServerRpcAuditContext(auditLogger);
+    auditContext.withCreationTimeNs(System.nanoTime());

Review Comment:
   Could this be a separate fix?



##########
common/src/main/java/org/apache/uniffle/common/netty/protocol/Decoders.java:
##########
@@ -54,19 +57,18 @@ public static ShuffleBlockInfo decodeShuffleBlockInfo(ByteBuf byteBuf) {
     for (int k = 0; k < lengthOfShuffleServers; k++) {
       serverInfos.add(decodeShuffleServerInfo(byteBuf));
     }
-    int uncompressLength = byteBuf.readInt();
-    long freeMemory = byteBuf.readLong();
-    return new ShuffleBlockInfo(
-        shuffleId,
-        partId,
-        blockId,
-        length,
-        crc,
-        data,
-        serverInfos,
-        uncompressLength,
-        freeMemory,
-        taskAttemptId);
+    final int uncompressLength = byteBuf.readInt();
+    // freeMemory Long
+    byteBuf.skipBytes(8);
+
+    int shuffleBlockInfoLength =

Review Comment:
   Do you mean renaming `shuffleBlockInfoLength` to `shuffleBlockInfoEncodeLength`?



##########
common/src/main/java/org/apache/uniffle/common/netty/protocol/Decoders.java:
##########
@@ -54,19 +57,18 @@ public static ShuffleBlockInfo decodeShuffleBlockInfo(ByteBuf byteBuf) {
     for (int k = 0; k < lengthOfShuffleServers; k++) {
       serverInfos.add(decodeShuffleServerInfo(byteBuf));
     }
-    int uncompressLength = byteBuf.readInt();
-    long freeMemory = byteBuf.readLong();
-    return new ShuffleBlockInfo(
-        shuffleId,
-        partId,
-        blockId,
-        length,
-        crc,
-        data,
-        serverInfos,
-        uncompressLength,
-        freeMemory,
-        taskAttemptId);
+    final int uncompressLength = byteBuf.readInt();
+    // freeMemory Long
+    byteBuf.skipBytes(8);
+
+    int shuffleBlockInfoLength =
+        Encoders.encodeLengthShuffleBlockInfoCommon()
+            + length
+            + Encoders.encodeLengthOfShuffleServerInfos(serverInfos);
+
+    return Pair.of(
+        shuffleBlockInfoLength,

Review Comment:
   It would be better to calculate the decoded length from the `byteBuf` position.
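
   A rough sketch of the idea: record the position before decoding and subtract it afterwards. To keep the example dependency-free, `java.nio.ByteBuffer` stands in for Netty's `ByteBuf`; with `ByteBuf`, `readerIndex()` would play the role of `position()`. The record layout and the names `PositionBasedLength` and `decodeAndMeasure` are hypothetical.

```java
import java.nio.ByteBuffer;

// Sketch only: java.nio.ByteBuffer stands in for Netty's ByteBuf so the example
// runs without dependencies. With ByteBuf, readerIndex() would play the role
// of position().
final class PositionBasedLength {
  private PositionBasedLength() {}

  // Decodes one hypothetical record (int partId, long blockId, int length,
  // then `length` payload bytes) and returns the number of bytes consumed.
  static int decodeAndMeasure(ByteBuffer buf) {
    int start = buf.position();
    buf.getInt(); // partId
    buf.getLong(); // blockId
    int length = buf.getInt();
    buf.position(buf.position() + length); // skip the payload
    return buf.position() - start;
  }
}
```

   The measured length then stays correct when a field is added to the decoder, without a matching change in a separate length formula.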



##########
common/src/main/java/org/apache/uniffle/common/netty/protocol/Decoders.java:
##########
@@ -39,13 +40,15 @@ public static ShuffleServerInfo decodeShuffleServerInfo(ByteBuf byteBuf) {
     return new ShuffleServerInfo(id, host, grpcPort, nettyPort);
   }
 
-  public static ShuffleBlockInfo decodeShuffleBlockInfo(ByteBuf byteBuf) {
-    int partId = byteBuf.readInt();
-    long blockId = byteBuf.readLong();
-    int length = byteBuf.readInt();
-    int shuffleId = byteBuf.readInt();
-    long crc = byteBuf.readLong();
-    long taskAttemptId = byteBuf.readLong();
+  public static Pair<Integer, ShufflePartitionedBlock> decodeShuffleBlockInfo(ByteBuf byteBuf) {
+    // partId Int

Review Comment:
   We should deprecate `SEND_SHUFFLE_DATA_REQUEST` and remove it in the future 
when we release a new major version.



##########
common/src/main/java/org/apache/uniffle/common/netty/protocol/Decoders.java:
##########
@@ -39,13 +40,15 @@ public static ShuffleServerInfo decodeShuffleServerInfo(ByteBuf byteBuf) {
     return new ShuffleServerInfo(id, host, grpcPort, nettyPort);
   }
 
-  public static ShuffleBlockInfo decodeShuffleBlockInfo(ByteBuf byteBuf) {
-    int partId = byteBuf.readInt();
-    long blockId = byteBuf.readLong();
-    int length = byteBuf.readInt();
-    int shuffleId = byteBuf.readInt();
-    long crc = byteBuf.readLong();
-    long taskAttemptId = byteBuf.readLong();
+  public static Pair<Integer, ShufflePartitionedBlock> decodeShuffleBlockInfo(ByteBuf byteBuf) {
+    // partId Int

Review Comment:
   When we upgrade to V2, we could send the encoded length to the server to 
avoid recomputing `encodedLength` on the server side.
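
   One possible shape for that, as a sketch only: the client writes the encoded length as a prefix, and the server reads it instead of recomputing it. `java.nio.ByteBuffer` stands in for Netty's `ByteBuf`, and the frame layout and the name `LengthPrefixedFrame` are hypothetical, not Uniffle's actual wire format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical V2-style framing: the sender writes the encoded length first so
// the receiver can read it instead of recomputing it. ByteBuffer stands in for
// Netty's ByteBuf; this is not Uniffle's actual wire format.
final class LengthPrefixedFrame {
  private LengthPrefixedFrame() {}

  // Client side: compute the body length once and send it as a prefix.
  static ByteBuffer encode(String appId, long requireId) {
    byte[] appIdBytes = appId.getBytes(StandardCharsets.UTF_8);
    int encodedLength = Integer.BYTES + appIdBytes.length + Long.BYTES;
    ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + encodedLength);
    buf.putInt(encodedLength); // length prefix
    buf.putInt(appIdBytes.length);
    buf.put(appIdBytes);
    buf.putLong(requireId);
    buf.flip();
    return buf;
  }

  // Server side: the length is read from the wire, not recomputed.
  static int readEncodedLength(ByteBuffer buf) {
    return buf.getInt();
  }
}
```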



##########
common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java:
##########
@@ -95,15 +115,46 @@ public void encode(ByteBuf buf) {
     buf.writeLong(timestamp);
   }
 
-  private static Map<Integer, List<ShuffleBlockInfo>> decodePartitionData(ByteBuf byteBuf) {
-    Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
+  public void decodeShuffleData(ByteBuf byteBuf) {
+    this.appId = ByteBufUtils.readLengthAndString(byteBuf);
+    encodedLength += ByteBufUtils.encodedLength(appId);
+    this.shuffleId = byteBuf.readInt();
+    encodedLength += Integer.BYTES;
+    encodedLength += Integer.BYTES; // stageAttemptNumber not in use, keep same with encodedLength.
+    this.requireId = byteBuf.readLong();
+    encodedLength += Long.BYTES;
+    this.partitionToBlocksInServer = decodePartitionData(byteBuf);
+    this.timestamp = byteBuf.readLong();
+    encodedLength += Long.BYTES;
+  }
+
+  public int getEncodedLength() {
+    return encodedLength;

Review Comment:
   This confuses me. What is the difference between `getEncodedLength()` and 
`encodedLength`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

