This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b37da52a [#1198] improvement: zerocopy from Protobuf's ByteString to 
Netty's ByteBuf (#1199)
3b37da52a is described below

commit 3b37da52abec635400dbb98316635a75a5f59649
Author: Junfan Zhang <[email protected]>
AuthorDate: Sat Sep 16 19:10:34 2023 +0800

    [#1198] improvement: zerocopy from Protobuf's ByteString to Netty's ByteBuf 
(#1199)
    
    ### What changes were proposed in this pull request?
    
    Convert Protobuf's ByteString to Netty's ByteBuf with zero copy: wrap the
    ByteString's read-only ByteBuffer view instead of copying it via toByteArray().
    
    ### Why are the changes needed?
    
    #1198. Improve performance when using gRPC.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing unit tests.
---
 .../main/java/org/apache/uniffle/common/util/ByteBufUtils.java    | 8 ++++++++
 .../java/org/apache/uniffle/server/ShuffleServerGrpcService.java  | 5 ++++-
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/util/ByteBufUtils.java 
b/common/src/main/java/org/apache/uniffle/common/util/ByteBufUtils.java
index 8288df8ff..7cb5eab6f 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/ByteBufUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/ByteBufUtils.java
@@ -17,9 +17,12 @@
 
 package org.apache.uniffle.common.util;
 
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 
+import com.google.protobuf.ByteString;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 
 public class ByteBufUtils {
 
@@ -85,4 +88,9 @@ public class ByteBufUtils {
     from.readBytes(to, offset, length);
     from.resetReaderIndex();
   }
+
+  public static ByteBuf byteStringToByteBuf(ByteString bytes) {
+    final ByteBuffer byteBuffer = bytes.asReadOnlyByteBuffer();
+    return Unpooled.wrappedBuffer(byteBuffer);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 3b41af321..279180edb 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -31,6 +31,7 @@ import com.google.protobuf.UnsafeByteOperations;
 import io.grpc.Context;
 import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
+import io.netty.buffer.ByteBuf;
 import org.apache.commons.lang3.StringUtils;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
@@ -47,6 +48,7 @@ import org.apache.uniffle.common.ShufflePartitionedData;
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.exception.FileNotFoundException;
 import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.ByteBufUtils;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.proto.RssProtos;
 import org.apache.uniffle.proto.RssProtos.AppHeartBeatRequest;
@@ -892,6 +894,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
     ShufflePartitionedBlock[] ret = new ShufflePartitionedBlock[blocks.size()];
     int i = 0;
     for (ShuffleBlock block : blocks) {
+      ByteBuf data = ByteBufUtils.byteStringToByteBuf(block.getData());
       ret[i] =
           new ShufflePartitionedBlock(
               block.getLength(),
@@ -899,7 +902,7 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
               block.getCrc(),
               block.getBlockId(),
               block.getTaskAttemptId(),
-              block.getData().toByteArray());
+              data);
       i++;
     }
     return ret;

Reply via email to