This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 3b37da52a [#1198] improvement: zerocopy from Protobuf's ByteString to
Netty's ByteBuf (#1199)
3b37da52a is described below
commit 3b37da52abec635400dbb98316635a75a5f59649
Author: Junfan Zhang <[email protected]>
AuthorDate: Sat Sep 16 19:10:34 2023 +0800
[#1198] improvement: zerocopy from Protobuf's ByteString to Netty's ByteBuf
(#1199)
### What changes were proposed in this pull request?
zerocopy from Protobuf's ByteString to Netty's ByteBuf
### Why are the changes needed?
#1198 . Improve the performance when using the GRpc
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
existing unit tests
---
.../main/java/org/apache/uniffle/common/util/ByteBufUtils.java | 8 ++++++++
.../java/org/apache/uniffle/server/ShuffleServerGrpcService.java | 5 ++++-
2 files changed, 12 insertions(+), 1 deletion(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/util/ByteBufUtils.java
b/common/src/main/java/org/apache/uniffle/common/util/ByteBufUtils.java
index 8288df8ff..7cb5eab6f 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/ByteBufUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/ByteBufUtils.java
@@ -17,9 +17,12 @@
package org.apache.uniffle.common.util;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
public class ByteBufUtils {
@@ -85,4 +88,9 @@ public class ByteBufUtils {
from.readBytes(to, offset, length);
from.resetReaderIndex();
}
+
+ public static ByteBuf byteStringToByteBuf(ByteString bytes) {
+ final ByteBuffer byteBuffer = bytes.asReadOnlyByteBuffer();
+ return Unpooled.wrappedBuffer(byteBuffer);
+ }
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 3b41af321..279180edb 100644
---
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -31,6 +31,7 @@ import com.google.protobuf.UnsafeByteOperations;
import io.grpc.Context;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
+import io.netty.buffer.ByteBuf;
import org.apache.commons.lang3.StringUtils;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
@@ -47,6 +48,7 @@ import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.exception.FileNotFoundException;
import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.common.util.ByteBufUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.proto.RssProtos.AppHeartBeatRequest;
@@ -892,6 +894,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
ShufflePartitionedBlock[] ret = new ShufflePartitionedBlock[blocks.size()];
int i = 0;
for (ShuffleBlock block : blocks) {
+ ByteBuf data = ByteBufUtils.byteStringToByteBuf(block.getData());
ret[i] =
new ShufflePartitionedBlock(
block.getLength(),
@@ -899,7 +902,7 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
block.getCrc(),
block.getBlockId(),
block.getTaskAttemptId(),
- block.getData().toByteArray());
+ data);
i++;
}
return ret;