maobaolong commented on code in PR #2182:
URL:
https://github.com/apache/incubator-uniffle/pull/2182#discussion_r1806061199
##########
common/src/main/java/org/apache/uniffle/common/netty/protocol/Decoders.java:
##########
@@ -39,13 +40,15 @@ public static ShuffleServerInfo
decodeShuffleServerInfo(ByteBuf byteBuf) {
return new ShuffleServerInfo(id, host, grpcPort, nettyPort);
}
- public static ShuffleBlockInfo decodeShuffleBlockInfo(ByteBuf byteBuf) {
- int partId = byteBuf.readInt();
- long blockId = byteBuf.readLong();
- int length = byteBuf.readInt();
- int shuffleId = byteBuf.readInt();
- long crc = byteBuf.readLong();
- long taskAttemptId = byteBuf.readLong();
+ public static Pair<Integer, ShufflePartitionedBlock>
decodeShuffleBlockInfo(ByteBuf byteBuf) {
+ // partId Int
Review Comment:
It would be better to create another issue to track the next plan base on
this PR, and it could be clear and easy to track it by adding a issue link here
as a `TODO` comment.
##########
common/src/main/java/org/apache/uniffle/common/netty/protocol/Encoders.java:
##########
@@ -59,19 +59,27 @@ public static int
encodeLengthOfShuffleServerInfo(ShuffleServerInfo shuffleServe
+ 2 * Integer.BYTES;
}
+ public static int encodeLengthShuffleBlockInfoCommon() {
Review Comment:
This could be a Constant?
##########
common/src/main/java/org/apache/uniffle/common/netty/protocol/Decoders.java:
##########
@@ -39,13 +40,15 @@ public static ShuffleServerInfo
decodeShuffleServerInfo(ByteBuf byteBuf) {
return new ShuffleServerInfo(id, host, grpcPort, nettyPort);
}
- public static ShuffleBlockInfo decodeShuffleBlockInfo(ByteBuf byteBuf) {
- int partId = byteBuf.readInt();
- long blockId = byteBuf.readLong();
- int length = byteBuf.readInt();
- int shuffleId = byteBuf.readInt();
- long crc = byteBuf.readLong();
- long taskAttemptId = byteBuf.readLong();
+ public static Pair<Integer, ShufflePartitionedBlock>
decodeShuffleBlockInfo(ByteBuf byteBuf) {
+ // partId Int
Review Comment:
<img width="774" alt="image"
src="https://github.com/user-attachments/assets/399772e1-2235-4d42-9903-e64291d6619c">
##########
server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java:
##########
@@ -912,11 +905,9 @@ private ServerRpcAuditContext createAuditContext(
auditLogger = AUDIT_LOGGER;
}
ServerRpcAuditContext auditContext = new
ServerRpcAuditContext(auditLogger);
+ auditContext.withCreationTimeNs(System.nanoTime());
Review Comment:
This could be another fix?
##########
common/src/main/java/org/apache/uniffle/common/netty/protocol/Decoders.java:
##########
@@ -54,19 +57,18 @@ public static ShuffleBlockInfo
decodeShuffleBlockInfo(ByteBuf byteBuf) {
for (int k = 0; k < lengthOfShuffleServers; k++) {
serverInfos.add(decodeShuffleServerInfo(byteBuf));
}
- int uncompressLength = byteBuf.readInt();
- long freeMemory = byteBuf.readLong();
- return new ShuffleBlockInfo(
- shuffleId,
- partId,
- blockId,
- length,
- crc,
- data,
- serverInfos,
- uncompressLength,
- freeMemory,
- taskAttemptId);
+ final int uncompressLength = byteBuf.readInt();
+ // freeMemory Long
+ byteBuf.skipBytes(8);
+
+ int shuffleBlockInfoLength =
Review Comment:
You mean shuffleBlockInfoLength to shuffleBlockInfoEncodeLength?
##########
common/src/main/java/org/apache/uniffle/common/netty/protocol/Decoders.java:
##########
@@ -54,19 +57,18 @@ public static ShuffleBlockInfo
decodeShuffleBlockInfo(ByteBuf byteBuf) {
for (int k = 0; k < lengthOfShuffleServers; k++) {
serverInfos.add(decodeShuffleServerInfo(byteBuf));
}
- int uncompressLength = byteBuf.readInt();
- long freeMemory = byteBuf.readLong();
- return new ShuffleBlockInfo(
- shuffleId,
- partId,
- blockId,
- length,
- crc,
- data,
- serverInfos,
- uncompressLength,
- freeMemory,
- taskAttemptId);
+ final int uncompressLength = byteBuf.readInt();
+ // freeMemory Long
+ byteBuf.skipBytes(8);
+
+ int shuffleBlockInfoLength =
+ Encoders.encodeLengthShuffleBlockInfoCommon()
+ + length
+ + Encoders.encodeLengthOfShuffleServerInfos(serverInfos);
+
+ return Pair.of(
+ shuffleBlockInfoLength,
Review Comment:
It could be better to calc the decodedLength by the byteBuf position
##########
common/src/main/java/org/apache/uniffle/common/netty/protocol/Decoders.java:
##########
@@ -39,13 +40,15 @@ public static ShuffleServerInfo
decodeShuffleServerInfo(ByteBuf byteBuf) {
return new ShuffleServerInfo(id, host, grpcPort, nettyPort);
}
- public static ShuffleBlockInfo decodeShuffleBlockInfo(ByteBuf byteBuf) {
- int partId = byteBuf.readInt();
- long blockId = byteBuf.readLong();
- int length = byteBuf.readInt();
- int shuffleId = byteBuf.readInt();
- long crc = byteBuf.readLong();
- long taskAttemptId = byteBuf.readLong();
+ public static Pair<Integer, ShufflePartitionedBlock>
decodeShuffleBlockInfo(ByteBuf byteBuf) {
+ // partId Int
Review Comment:
We should deprecate the SEND_SHUFFLE_DATA_REQUEST and removed in the future
when release a new major version
##########
common/src/main/java/org/apache/uniffle/common/netty/protocol/Decoders.java:
##########
@@ -39,13 +40,15 @@ public static ShuffleServerInfo
decodeShuffleServerInfo(ByteBuf byteBuf) {
return new ShuffleServerInfo(id, host, grpcPort, nettyPort);
}
- public static ShuffleBlockInfo decodeShuffleBlockInfo(ByteBuf byteBuf) {
- int partId = byteBuf.readInt();
- long blockId = byteBuf.readLong();
- int length = byteBuf.readInt();
- int shuffleId = byteBuf.readInt();
- long crc = byteBuf.readLong();
- long taskAttemptId = byteBuf.readLong();
+ public static Pair<Integer, ShufflePartitionedBlock>
decodeShuffleBlockInfo(ByteBuf byteBuf) {
+ // partId Int
Review Comment:
When we upgrade to V2, we could send the encoded length to server, to avoid
re-compute the encodedLength in the server side again.
##########
common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java:
##########
@@ -95,15 +115,46 @@ public void encode(ByteBuf buf) {
buf.writeLong(timestamp);
}
- private static Map<Integer, List<ShuffleBlockInfo>>
decodePartitionData(ByteBuf byteBuf) {
- Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
+ public void decodeShuffleData(ByteBuf byteBuf) {
+ this.appId = ByteBufUtils.readLengthAndString(byteBuf);
+ encodedLength += ByteBufUtils.encodedLength(appId);
+ this.shuffleId = byteBuf.readInt();
+ encodedLength += Integer.BYTES;
+ encodedLength += Integer.BYTES; // stageAttemptNumber not in use, keep
same with encodedLength.
+ this.requireId = byteBuf.readLong();
+ encodedLength += Long.BYTES;
+ this.partitionToBlocksInServer = decodePartitionData(byteBuf);
+ this.timestamp = byteBuf.readLong();
+ encodedLength += Long.BYTES;
+ }
+
+ public int getEncodedLength() {
+ return encodedLength;
Review Comment:
This make me confusing, what is the different between `getEncodedLength()`
and `encodedLength`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]