This is an automated email from the ASF dual-hosted git repository.
oxsean pushed a commit to branch 3.3-dev
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3-dev by this push:
new 9355b35e50 Implement full zero-copy output for Dubbo Triple protocol
(#15616)
9355b35e50 is described below
commit 9355b35e50cc4e561bfd1d3c621d25abe0eee19f
Author: heliang <[email protected]>
AuthorDate: Thu Oct 30 18:39:28 2025 +0800
Implement full zero-copy output for Dubbo Triple protocol (#15616)
* git commit -m "feat: implement zero-copy output for Triple Protobuf
serialization"
* code format
* refactor: fix pack path
---
.../main/java/org/apache/dubbo/rpc/model/Pack.java | 22 +++
.../org/apache/dubbo/rpc/model/PackContext.java | 69 +++++++++
.../src/main/resources/Dubbo3TripleStub.mustache | 156 ++++++++++++++++++++-
.../main/resources/MutinyDubbo3TripleStub.mustache | 104 +++++++++++++-
.../resources/ReactorDubbo3TripleStub.mustache | 104 +++++++++++++-
.../dubbo/rpc/protocol/tri/PbArrayPacker.java | 45 ++++++
.../rpc/protocol/tri/ReflectionPackableMethod.java | 5 +-
.../protocol/tri/h12/grpc/GrpcCompositeCodec.java | 42 +++++-
8 files changed, 524 insertions(+), 23 deletions(-)
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/Pack.java
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/Pack.java
index 340b668e42..0eb24a3e3a 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/Pack.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/Pack.java
@@ -19,9 +19,31 @@ package org.apache.dubbo.rpc.model;
public interface Pack {
/**
+ * Pack object to byte array
* @param obj instance
* @return byte array
* @throws Exception when error occurs
*/
byte[] pack(Object obj) throws Exception;
+
+ /**
+ * Check if this Pack implementation supports zero-copy stream packing
+ * @return true if supports stream packing
+ */
+ default boolean supportsStreamPacking() {
+ return false;
+ }
+
+ /**
+ * Create a PackContext for zero-copy optimization.
+ * The context encapsulates both size calculation and stream writing.
+ *
+ * @param obj instance to pack
+ * @return PackContext instance
+ * @throws Exception when error occurs
+ */
+ default PackContext createPackContext(Object obj) throws Exception {
+ byte[] bytes = pack(obj);
+ return PackContext.of(bytes);
+ }
}
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/PackContext.java
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/PackContext.java
new file mode 100644
index 0000000000..70cc5ca5dc
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/PackContext.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.model;
+
+import java.io.OutputStream;
+
+/**
+ * Pack context for zero-copy optimization.
+ * Encapsulates both size calculation and stream writing capability.
+ */
+public interface PackContext {
+
+ /**
+ * Get the packed size in bytes
+ * @return size in bytes
+ */
+ int getSize();
+
+ /**
+ * Write packed data to output stream
+ * @param os output stream
+ * @throws Exception if write fails
+ */
+ void writeTo(OutputStream os) throws Exception;
+
+ /**
+ * Create a PackContext from byte array (fallback implementation)
+ * @param bytes byte array
+ * @return PackContext instance
+ */
+ static PackContext of(byte[] bytes) {
+ return new ByteArrayPackContext(bytes);
+ }
+
+ /**
+ * Default implementation backed by byte array
+ */
+ class ByteArrayPackContext implements PackContext {
+ private final byte[] bytes;
+
+ ByteArrayPackContext(byte[] bytes) {
+ this.bytes = bytes;
+ }
+
+ @Override
+ public int getSize() {
+ return bytes.length;
+ }
+
+ @Override
+ public void writeTo(OutputStream os) throws Exception {
+ os.write(bytes);
+ }
+ }
+}
diff --git
a/dubbo-plugin/dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache
b/dubbo-plugin/dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache
index 4bdf318303..e09a6c0a56 100644
--- a/dubbo-plugin/dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache
+++ b/dubbo-plugin/dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache
@@ -66,38 +66,182 @@ public final class {{className}} {
private static final StubMethodDescriptor {{methodName}}Method = new
StubMethodDescriptor("{{originMethodName}}",
{{inputType}}.class, {{outputType}}.class, MethodDescriptor.RpcType.UNARY,
- obj -> ((com.google.protobuf.Message) obj).toByteArray(),obj ->
((com.google.protobuf.Message) obj).toByteArray(), {{inputType}}::parseFrom,
+ obj -> ((com.google.protobuf.Message) obj).toByteArray(), new
org.apache.dubbo.rpc.model.Pack() {
+ @Override
+ public byte[] pack(Object obj) throws Exception {
+ return ((com.google.protobuf.Message) obj).toByteArray();
+ }
+ @Override
+ public boolean supportsStreamPacking() {
+ return true;
+ }
+ @Override
+ public org.apache.dubbo.rpc.model.PackContext createPackContext(Object
obj) throws Exception {
+ final com.google.protobuf.Message message =
(com.google.protobuf.Message) obj;
+ return new org.apache.dubbo.rpc.model.PackContext() {
+ private final int size = message.getSerializedSize();
+ @Override
+ public int getSize() {
+ return size;
+ }
+ @Override
+ public void writeTo(java.io.OutputStream os) throws Exception {
+ message.writeTo(os);
+ }
+ };
+ }
+ }, {{inputType}}::parseFrom,
{{outputType}}::parseFrom);
private static final StubMethodDescriptor {{methodName}}AsyncMethod = new
StubMethodDescriptor("{{originMethodName}}",
{{inputType}}.class, java.util.concurrent.CompletableFuture.class,
MethodDescriptor.RpcType.UNARY,
- obj -> ((com.google.protobuf.Message) obj).toByteArray(), obj ->
((com.google.protobuf.Message) obj).toByteArray(), {{inputType}}::parseFrom,
+ obj -> ((com.google.protobuf.Message) obj).toByteArray(), new
org.apache.dubbo.rpc.model.Pack() {
+ @Override
+ public byte[] pack(Object obj) throws Exception {
+ return ((com.google.protobuf.Message) obj).toByteArray();
+ }
+ @Override
+ public boolean supportsStreamPacking() {
+ return true;
+ }
+ @Override
+ public org.apache.dubbo.rpc.model.PackContext createPackContext(Object
obj) throws Exception {
+ final com.google.protobuf.Message message =
(com.google.protobuf.Message) obj;
+ return new org.apache.dubbo.rpc.model.PackContext() {
+ private final int size = message.getSerializedSize();
+ @Override
+ public int getSize() {
+ return size;
+ }
+ @Override
+ public void writeTo(java.io.OutputStream os) throws Exception {
+ message.writeTo(os);
+ }
+ };
+ }
+ }, {{inputType}}::parseFrom,
{{outputType}}::parseFrom);
private static final StubMethodDescriptor {{methodName}}ProxyAsyncMethod =
new StubMethodDescriptor("{{originMethodName}}Async",
{{inputType}}.class, {{outputType}}.class, MethodDescriptor.RpcType.UNARY,
- obj -> ((com.google.protobuf.Message) obj).toByteArray(), obj ->
((com.google.protobuf.Message) obj).toByteArray(), {{inputType}}::parseFrom,
+ obj -> ((com.google.protobuf.Message) obj).toByteArray(), new
org.apache.dubbo.rpc.model.Pack() {
+ @Override
+ public byte[] pack(Object obj) throws Exception {
+ return ((com.google.protobuf.Message) obj).toByteArray();
+ }
+ @Override
+ public boolean supportsStreamPacking() {
+ return true;
+ }
+ @Override
+ public org.apache.dubbo.rpc.model.PackContext createPackContext(Object
obj) throws Exception {
+ final com.google.protobuf.Message message =
(com.google.protobuf.Message) obj;
+ return new org.apache.dubbo.rpc.model.PackContext() {
+ private final int size = message.getSerializedSize();
+ @Override
+ public int getSize() {
+ return size;
+ }
+ @Override
+ public void writeTo(java.io.OutputStream os) throws Exception {
+ message.writeTo(os);
+ }
+ };
+ }
+ }, {{inputType}}::parseFrom,
{{outputType}}::parseFrom);
{{/unaryMethods}}
{{#serverStreamingMethods}}
private static final StubMethodDescriptor {{methodName}}Method = new
StubMethodDescriptor("{{originMethodName}}",
{{inputType}}.class, {{outputType}}.class,
MethodDescriptor.RpcType.SERVER_STREAM,
- obj -> ((com.google.protobuf.Message) obj).toByteArray(), obj ->
((com.google.protobuf.Message) obj).toByteArray(), {{inputType}}::parseFrom,
+ obj -> ((com.google.protobuf.Message) obj).toByteArray(), new
org.apache.dubbo.rpc.model.Pack() {
+ @Override
+ public byte[] pack(Object obj) throws Exception {
+ return ((com.google.protobuf.Message) obj).toByteArray();
+ }
+ @Override
+ public boolean supportsStreamPacking() {
+ return true;
+ }
+ @Override
+ public org.apache.dubbo.rpc.model.PackContext createPackContext(Object
obj) throws Exception {
+ final com.google.protobuf.Message message =
(com.google.protobuf.Message) obj;
+ return new org.apache.dubbo.rpc.model.PackContext() {
+ private final int size = message.getSerializedSize();
+ @Override
+ public int getSize() {
+ return size;
+ }
+ @Override
+ public void writeTo(java.io.OutputStream os) throws Exception {
+ message.writeTo(os);
+ }
+ };
+ }
+ }, {{inputType}}::parseFrom,
{{outputType}}::parseFrom);
{{/serverStreamingMethods}}
{{#clientStreamingMethods}}
private static final StubMethodDescriptor {{methodName}}Method = new
StubMethodDescriptor("{{originMethodName}}",
{{inputType}}.class, {{outputType}}.class,
MethodDescriptor.RpcType.CLIENT_STREAM,
- obj -> ((com.google.protobuf.Message) obj).toByteArray(), obj ->
((com.google.protobuf.Message) obj).toByteArray(), {{inputType}}::parseFrom,
+ obj -> ((com.google.protobuf.Message) obj).toByteArray(), new
org.apache.dubbo.rpc.model.Pack() {
+ @Override
+ public byte[] pack(Object obj) throws Exception {
+ return ((com.google.protobuf.Message) obj).toByteArray();
+ }
+ @Override
+ public boolean supportsStreamPacking() {
+ return true;
+ }
+ @Override
+ public org.apache.dubbo.rpc.model.PackContext createPackContext(Object
obj) throws Exception {
+ final com.google.protobuf.Message message =
(com.google.protobuf.Message) obj;
+ return new org.apache.dubbo.rpc.model.PackContext() {
+ private final int size = message.getSerializedSize();
+ @Override
+ public int getSize() {
+ return size;
+ }
+ @Override
+ public void writeTo(java.io.OutputStream os) throws Exception {
+ message.writeTo(os);
+ }
+ };
+ }
+ }, {{inputType}}::parseFrom,
{{outputType}}::parseFrom);
{{/clientStreamingMethods}}
{{#biStreamingWithoutClientStreamMethods}}
private static final StubMethodDescriptor {{methodName}}Method = new
StubMethodDescriptor("{{originMethodName}}",
{{inputType}}.class, {{outputType}}.class,
MethodDescriptor.RpcType.BI_STREAM,
- obj -> ((com.google.protobuf.Message) obj).toByteArray(), obj ->
((com.google.protobuf.Message) obj).toByteArray(), {{inputType}}::parseFrom,
+ obj -> ((com.google.protobuf.Message) obj).toByteArray(), new
org.apache.dubbo.rpc.model.Pack() {
+ @Override
+ public byte[] pack(Object obj) throws Exception {
+ return ((com.google.protobuf.Message) obj).toByteArray();
+ }
+ @Override
+ public boolean supportsStreamPacking() {
+ return true;
+ }
+ @Override
+ public org.apache.dubbo.rpc.model.PackContext createPackContext(Object
obj) throws Exception {
+ final com.google.protobuf.Message message =
(com.google.protobuf.Message) obj;
+ return new org.apache.dubbo.rpc.model.PackContext() {
+ private final int size = message.getSerializedSize();
+ @Override
+ public int getSize() {
+ return size;
+ }
+ @Override
+ public void writeTo(java.io.OutputStream os) throws Exception {
+ message.writeTo(os);
+ }
+ };
+ }
+ }, {{inputType}}::parseFrom,
{{outputType}}::parseFrom);
{{/biStreamingWithoutClientStreamMethods}}
diff --git
a/dubbo-plugin/dubbo-compiler/src/main/resources/MutinyDubbo3TripleStub.mustache
b/dubbo-plugin/dubbo-compiler/src/main/resources/MutinyDubbo3TripleStub.mustache
index 33c30c7858..34d6f712ee 100644
---
a/dubbo-plugin/dubbo-compiler/src/main/resources/MutinyDubbo3TripleStub.mustache
+++
b/dubbo-plugin/dubbo-compiler/src/main/resources/MutinyDubbo3TripleStub.mustache
@@ -71,7 +71,31 @@ public final class {{className}} {
{{/javaDoc}}
private static final StubMethodDescriptor {{methodName}}Method = new
StubMethodDescriptor("{{originMethodName}}",
{{inputType}}.class, {{outputType}}.class,
MethodDescriptor.RpcType.UNARY,
- obj -> ((com.google.protobuf.Message) obj).toByteArray(), obj ->
((com.google.protobuf.Message) obj).toByteArray(), {{inputType}}::parseFrom,
+ obj -> ((com.google.protobuf.Message) obj).toByteArray(), new
org.apache.dubbo.rpc.model.Pack() {
+ @Override
+ public byte[] pack(Object obj) throws Exception {
+ return ((com.google.protobuf.Message) obj).toByteArray();
+ }
+ @Override
+ public boolean supportsStreamPacking() {
+ return true;
+ }
+ @Override
+ public org.apache.dubbo.rpc.model.PackContext
createPackContext(Object obj) throws Exception {
+ final com.google.protobuf.Message message =
(com.google.protobuf.Message) obj;
+ return new org.apache.dubbo.rpc.model.PackContext() {
+ private final int size = message.getSerializedSize();
+ @Override
+ public int getSize() {
+ return size;
+ }
+ @Override
+ public void writeTo(java.io.OutputStream os) throws
Exception {
+ message.writeTo(os);
+ }
+ };
+ }
+ }, {{inputType}}::parseFrom,
{{outputType}}::parseFrom);
{{/unaryMethods}}
@@ -81,7 +105,31 @@ public final class {{className}} {
{{/javaDoc}}
private static final StubMethodDescriptor {{methodName}}Method = new
StubMethodDescriptor("{{originMethodName}}",
{{inputType}}.class, {{outputType}}.class,
MethodDescriptor.RpcType.SERVER_STREAM,
- obj -> ((com.google.protobuf.Message) obj).toByteArray(), obj ->
((com.google.protobuf.Message) obj).toByteArray(), {{inputType}}::parseFrom,
+ obj -> ((com.google.protobuf.Message) obj).toByteArray(), new
org.apache.dubbo.rpc.model.Pack() {
+ @Override
+ public byte[] pack(Object obj) throws Exception {
+ return ((com.google.protobuf.Message) obj).toByteArray();
+ }
+ @Override
+ public boolean supportsStreamPacking() {
+ return true;
+ }
+ @Override
+ public org.apache.dubbo.rpc.model.PackContext
createPackContext(Object obj) throws Exception {
+ final com.google.protobuf.Message message =
(com.google.protobuf.Message) obj;
+ return new org.apache.dubbo.rpc.model.PackContext() {
+ private final int size = message.getSerializedSize();
+ @Override
+ public int getSize() {
+ return size;
+ }
+ @Override
+ public void writeTo(java.io.OutputStream os) throws
Exception {
+ message.writeTo(os);
+ }
+ };
+ }
+ }, {{inputType}}::parseFrom,
{{outputType}}::parseFrom);
{{/serverStreamingMethods}}
@@ -91,7 +139,31 @@ public final class {{className}} {
{{/javaDoc}}
private static final StubMethodDescriptor {{methodName}}Method = new
StubMethodDescriptor("{{originMethodName}}",
{{inputType}}.class, {{outputType}}.class,
MethodDescriptor.RpcType.CLIENT_STREAM,
- obj -> ((com.google.protobuf.Message) obj).toByteArray(), obj ->
((com.google.protobuf.Message) obj).toByteArray(), {{inputType}}::parseFrom,
+ obj -> ((com.google.protobuf.Message) obj).toByteArray(), new
org.apache.dubbo.rpc.model.Pack() {
+ @Override
+ public byte[] pack(Object obj) throws Exception {
+ return ((com.google.protobuf.Message) obj).toByteArray();
+ }
+ @Override
+ public boolean supportsStreamPacking() {
+ return true;
+ }
+ @Override
+ public org.apache.dubbo.rpc.model.PackContext
createPackContext(Object obj) throws Exception {
+ final com.google.protobuf.Message message =
(com.google.protobuf.Message) obj;
+ return new org.apache.dubbo.rpc.model.PackContext() {
+ private final int size = message.getSerializedSize();
+ @Override
+ public int getSize() {
+ return size;
+ }
+ @Override
+ public void writeTo(java.io.OutputStream os) throws
Exception {
+ message.writeTo(os);
+ }
+ };
+ }
+ }, {{inputType}}::parseFrom,
{{outputType}}::parseFrom);
{{/clientStreamingMethods}}
@@ -101,7 +173,31 @@ public final class {{className}} {
{{/javaDoc}}
private static final StubMethodDescriptor {{methodName}}Method = new
StubMethodDescriptor("{{originMethodName}}",
{{inputType}}.class, {{outputType}}.class,
MethodDescriptor.RpcType.BI_STREAM,
- obj -> ((com.google.protobuf.Message) obj).toByteArray(), obj ->
((com.google.protobuf.Message) obj).toByteArray(), {{inputType}}::parseFrom,
+ obj -> ((com.google.protobuf.Message) obj).toByteArray(), new
org.apache.dubbo.rpc.model.Pack() {
+ @Override
+ public byte[] pack(Object obj) throws Exception {
+ return ((com.google.protobuf.Message) obj).toByteArray();
+ }
+ @Override
+ public boolean supportsStreamPacking() {
+ return true;
+ }
+ @Override
+ public org.apache.dubbo.rpc.model.PackContext
createPackContext(Object obj) throws Exception {
+ final com.google.protobuf.Message message =
(com.google.protobuf.Message) obj;
+ return new org.apache.dubbo.rpc.model.PackContext() {
+ private final int size = message.getSerializedSize();
+ @Override
+ public int getSize() {
+ return size;
+ }
+ @Override
+ public void writeTo(java.io.OutputStream os) throws
Exception {
+ message.writeTo(os);
+ }
+ };
+ }
+ }, {{inputType}}::parseFrom,
{{outputType}}::parseFrom);
{{/biStreamingWithoutClientStreamMethods}}
diff --git
a/dubbo-plugin/dubbo-compiler/src/main/resources/ReactorDubbo3TripleStub.mustache
b/dubbo-plugin/dubbo-compiler/src/main/resources/ReactorDubbo3TripleStub.mustache
index 6003703c03..11b2c92aa4 100644
---
a/dubbo-plugin/dubbo-compiler/src/main/resources/ReactorDubbo3TripleStub.mustache
+++
b/dubbo-plugin/dubbo-compiler/src/main/resources/ReactorDubbo3TripleStub.mustache
@@ -71,7 +71,31 @@ public final class {{className}} {
{{/javaDoc}}
private static final StubMethodDescriptor {{methodName}}Method = new
StubMethodDescriptor("{{originMethodName}}",
{{inputType}}.class, {{outputType}}.class,
MethodDescriptor.RpcType.UNARY,
- obj -> ((com.google.protobuf.Message) obj).toByteArray(), obj ->
((com.google.protobuf.Message) obj).toByteArray(), {{inputType}}::parseFrom,
+ obj -> ((com.google.protobuf.Message) obj).toByteArray(), new
org.apache.dubbo.rpc.model.Pack() {
+ @Override
+ public byte[] pack(Object obj) throws Exception {
+ return ((com.google.protobuf.Message) obj).toByteArray();
+ }
+ @Override
+ public boolean supportsStreamPacking() {
+ return true;
+ }
+ @Override
+ public org.apache.dubbo.rpc.model.PackContext
createPackContext(Object obj) throws Exception {
+ final com.google.protobuf.Message message =
(com.google.protobuf.Message) obj;
+ return new org.apache.dubbo.rpc.model.PackContext() {
+ private final int size = message.getSerializedSize();
+ @Override
+ public int getSize() {
+ return size;
+ }
+ @Override
+ public void writeTo(java.io.OutputStream os) throws
Exception {
+ message.writeTo(os);
+ }
+ };
+ }
+ }, {{inputType}}::parseFrom,
{{outputType}}::parseFrom);
{{/unaryMethods}}
@@ -81,7 +105,31 @@ public final class {{className}} {
{{/javaDoc}}
private static final StubMethodDescriptor {{methodName}}Method = new
StubMethodDescriptor("{{originMethodName}}",
{{inputType}}.class, {{outputType}}.class,
MethodDescriptor.RpcType.SERVER_STREAM,
- obj -> ((com.google.protobuf.Message) obj).toByteArray(), obj ->
((com.google.protobuf.Message) obj).toByteArray(), {{inputType}}::parseFrom,
+ obj -> ((com.google.protobuf.Message) obj).toByteArray(), new
org.apache.dubbo.rpc.model.Pack() {
+ @Override
+ public byte[] pack(Object obj) throws Exception {
+ return ((com.google.protobuf.Message) obj).toByteArray();
+ }
+ @Override
+ public boolean supportsStreamPacking() {
+ return true;
+ }
+ @Override
+ public org.apache.dubbo.rpc.model.PackContext
createPackContext(Object obj) throws Exception {
+ final com.google.protobuf.Message message =
(com.google.protobuf.Message) obj;
+ return new org.apache.dubbo.rpc.model.PackContext() {
+ private final int size = message.getSerializedSize();
+ @Override
+ public int getSize() {
+ return size;
+ }
+ @Override
+ public void writeTo(java.io.OutputStream os) throws
Exception {
+ message.writeTo(os);
+ }
+ };
+ }
+ }, {{inputType}}::parseFrom,
{{outputType}}::parseFrom);
{{/serverStreamingMethods}}
@@ -91,7 +139,31 @@ public final class {{className}} {
{{/javaDoc}}
private static final StubMethodDescriptor {{methodName}}Method = new
StubMethodDescriptor("{{originMethodName}}",
{{inputType}}.class, {{outputType}}.class,
MethodDescriptor.RpcType.CLIENT_STREAM,
- obj -> ((com.google.protobuf.Message) obj).toByteArray(), obj ->
((com.google.protobuf.Message) obj).toByteArray(), {{inputType}}::parseFrom,
+ obj -> ((com.google.protobuf.Message) obj).toByteArray(), new
org.apache.dubbo.rpc.model.Pack() {
+ @Override
+ public byte[] pack(Object obj) throws Exception {
+ return ((com.google.protobuf.Message) obj).toByteArray();
+ }
+ @Override
+ public boolean supportsStreamPacking() {
+ return true;
+ }
+ @Override
+ public org.apache.dubbo.rpc.model.PackContext
createPackContext(Object obj) throws Exception {
+ final com.google.protobuf.Message message =
(com.google.protobuf.Message) obj;
+ return new org.apache.dubbo.rpc.model.PackContext() {
+ private final int size = message.getSerializedSize();
+ @Override
+ public int getSize() {
+ return size;
+ }
+ @Override
+ public void writeTo(java.io.OutputStream os) throws
Exception {
+ message.writeTo(os);
+ }
+ };
+ }
+ }, {{inputType}}::parseFrom,
{{outputType}}::parseFrom);
{{/clientStreamingMethods}}
@@ -101,7 +173,31 @@ public final class {{className}} {
{{/javaDoc}}
private static final StubMethodDescriptor {{methodName}}Method = new
StubMethodDescriptor("{{originMethodName}}",
{{inputType}}.class, {{outputType}}.class,
MethodDescriptor.RpcType.BI_STREAM,
- obj -> ((com.google.protobuf.Message) obj).toByteArray(), obj ->
((com.google.protobuf.Message) obj).toByteArray(), {{inputType}}::parseFrom,
+ obj -> ((com.google.protobuf.Message) obj).toByteArray(), new
org.apache.dubbo.rpc.model.Pack() {
+ @Override
+ public byte[] pack(Object obj) throws Exception {
+ return ((com.google.protobuf.Message) obj).toByteArray();
+ }
+ @Override
+ public boolean supportsStreamPacking() {
+ return true;
+ }
+ @Override
+ public org.apache.dubbo.rpc.model.PackContext
createPackContext(Object obj) throws Exception {
+ final com.google.protobuf.Message message =
(com.google.protobuf.Message) obj;
+ return new org.apache.dubbo.rpc.model.PackContext() {
+ private final int size = message.getSerializedSize();
+ @Override
+ public int getSize() {
+ return size;
+ }
+ @Override
+ public void writeTo(java.io.OutputStream os) throws
Exception {
+ message.writeTo(os);
+ }
+ };
+ }
+ }, {{inputType}}::parseFrom,
{{outputType}}::parseFrom);
{{/biStreamingWithoutClientStreamMethods}}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PbArrayPacker.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PbArrayPacker.java
index b91d4b940f..7fd5efec21 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PbArrayPacker.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PbArrayPacker.java
@@ -17,6 +17,9 @@
package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.rpc.model.Pack;
+import org.apache.dubbo.rpc.model.PackContext;
+
+import java.io.OutputStream;
import com.google.protobuf.Message;
@@ -37,4 +40,46 @@ public class PbArrayPacker implements Pack {
}
return PB_PACK.pack(obj);
}
+
+ @Override
+ public boolean supportsStreamPacking() {
+ return true;
+ }
+
+ @Override
+ public PackContext createPackContext(Object obj) throws Exception {
+ Message message = extractMessage(obj);
+ return new ProtobufPackContext(message);
+ }
+
+ public boolean isSingleArgument() {
+ return singleArgument;
+ }
+
+ private Message extractMessage(Object obj) {
+ if (!singleArgument) {
+ obj = ((Object[]) obj)[0];
+ }
+ return (Message) obj;
+ }
+
+ private static class ProtobufPackContext implements PackContext {
+ private final Message message;
+ private final int size;
+
+ ProtobufPackContext(Message message) {
+ this.message = message;
+ this.size = message.getSerializedSize();
+ }
+
+ @Override
+ public int getSize() {
+ return size;
+ }
+
+ @Override
+ public void writeTo(OutputStream os) throws Exception {
+ message.writeTo(os);
+ }
+ }
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java
index ae19f2a464..72fd6d29e4 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java
@@ -38,8 +38,6 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.stream.Stream;
-import com.google.protobuf.Message;
-
import static org.apache.dubbo.common.constants.CommonConstants.$ECHO;
import static org.apache.dubbo.common.utils.ProtobufUtils.isProtobufClass;
@@ -50,7 +48,6 @@ public class ReflectionPackableMethod implements
PackableMethod {
private static final String REACTOR_RETURN_CLASS =
"reactor.core.publisher.Mono";
private static final String RX_RETURN_CLASS = "io.reactivex.Single";
private static final String GRPC_STREAM_CLASS =
"io.grpc.stub.StreamObserver";
- private static final Pack PB_PACK = o -> ((Message) o).toByteArray();
private final Pack requestPack;
private final Pack responsePack;
@@ -89,7 +86,7 @@ public class ReflectionPackableMethod implements
PackableMethod {
this.needWrapper = needWrap(method, actualRequestTypes,
actualResponseType);
if (!needWrapper) {
requestPack = new PbArrayPacker(singleArgument);
- responsePack = PB_PACK;
+ responsePack = new PbArrayPacker(true);
requestUnpack = new PbUnpack<>(actualRequestTypes[0]);
responseUnpack = new PbUnpack<>(actualResponseType);
} else {
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
index e66eebad79..09dafb2022 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
@@ -24,11 +24,14 @@ import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.remoting.http12.exception.DecodeException;
import org.apache.dubbo.remoting.http12.exception.EncodeException;
+import org.apache.dubbo.remoting.http12.exception.HttpOverPayloadException;
import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
import org.apache.dubbo.remoting.http12.message.MediaType;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.MethodDescriptor;
+import org.apache.dubbo.rpc.model.Pack;
+import org.apache.dubbo.rpc.model.PackContext;
import org.apache.dubbo.rpc.model.PackableMethod;
import org.apache.dubbo.rpc.model.PackableMethodFactory;
@@ -38,6 +41,9 @@ import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.concurrent.ConcurrentHashMap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_KEY;
import static
org.apache.dubbo.common.constants.CommonConstants.DUBBO_PACKABLE_METHOD_FACTORY;
@@ -78,12 +84,38 @@ public class GrpcCompositeCodec implements HttpMessageCodec
{
@Override
public void encode(OutputStream outputStream, Object data, Charset
charset) throws EncodeException {
- // protobuf
- // TODO int compressed =
Identity.MESSAGE_ENCODING.equals(requestMetadata.compressor.getMessageEncoding())
? 0 :
- // 1;
try {
- int compressed = 0;
- outputStream.write(compressed);
+ if (packableMethod != null
+ && !packableMethod.needWrapper()
+ && outputStream instanceof ByteBufOutputStream) {
+
+ Pack responsePack = packableMethod.getResponsePack();
+
+ if (responsePack.supportsStreamPacking()) {
+ ByteBufOutputStream bbos = (ByteBufOutputStream)
outputStream;
+ ByteBuf buffer = bbos.buffer();
+
+ try {
+ PackContext ctx = responsePack.createPackContext(data);
+ int payloadSize = ctx.getSize();
+ int totalSize = 5 + payloadSize;
+ buffer.ensureWritable(totalSize);
+
+ buffer.writeByte(0);
+ buffer.writeInt(payloadSize);
+ ctx.writeTo(outputStream);
+
+ return;
+
+ } catch (IndexOutOfBoundsException |
HttpOverPayloadException e) {
+ if (e instanceof HttpOverPayloadException) {
+ throw e;
+ }
+ }
+ }
+ }
+
+ outputStream.write(0);
byte[] bytes = packableMethod.packResponse(data);
writeLength(outputStream, bytes.length);
outputStream.write(bytes);