This is an automated email from the ASF dual-hosted git repository.

oxsean pushed a commit to branch 3.3-dev
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.3-dev by this push:
     new 9355b35e50 Implement full zero-copy output for Dubbo Triple protocol  
(#15616)
9355b35e50 is described below

commit 9355b35e50cc4e561bfd1d3c621d25abe0eee19f
Author: heliang <[email protected]>
AuthorDate: Thu Oct 30 18:39:28 2025 +0800

    Implement full zero-copy output for Dubbo Triple protocol  (#15616)
    
    * git commit -m "feat: implement zero-copy output for Triple Protobuf 
serialization"
    
    * code format
    
    * refactor: fix pack path
---
 .../main/java/org/apache/dubbo/rpc/model/Pack.java |  22 +++
 .../org/apache/dubbo/rpc/model/PackContext.java    |  69 +++++++++
 .../src/main/resources/Dubbo3TripleStub.mustache   | 156 ++++++++++++++++++++-
 .../main/resources/MutinyDubbo3TripleStub.mustache | 104 +++++++++++++-
 .../resources/ReactorDubbo3TripleStub.mustache     | 104 +++++++++++++-
 .../dubbo/rpc/protocol/tri/PbArrayPacker.java      |  45 ++++++
 .../rpc/protocol/tri/ReflectionPackableMethod.java |   5 +-
 .../protocol/tri/h12/grpc/GrpcCompositeCodec.java  |  42 +++++-
 8 files changed, 524 insertions(+), 23 deletions(-)

diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/Pack.java 
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/Pack.java
index 340b668e42..0eb24a3e3a 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/Pack.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/Pack.java
@@ -19,9 +19,31 @@ package org.apache.dubbo.rpc.model;
 public interface Pack {
 
     /**
+     * Pack object to byte array
      * @param obj instance
      * @return byte array
      * @throws Exception when error occurs
      */
     byte[] pack(Object obj) throws Exception;
+
+    /**
+     * Check if this Pack implementation supports zero-copy stream packing
+     * @return true if supports stream packing
+     */
+    default boolean supportsStreamPacking() {
+        return false;
+    }
+
+    /**
+     * Create a PackContext for zero-copy optimization.
+     * The context encapsulates both size calculation and stream writing.
+     *
+     * @param obj instance to pack
+     * @return PackContext instance
+     * @throws Exception when error occurs
+     */
+    default PackContext createPackContext(Object obj) throws Exception {
+        byte[] bytes = pack(obj);
+        return PackContext.of(bytes);
+    }
 }
diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/PackContext.java 
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/PackContext.java
new file mode 100644
index 0000000000..70cc5ca5dc
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/PackContext.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.model;
+
+import java.io.OutputStream;
+
+/**
+ * Pack context for zero-copy optimization.
+ * Encapsulates both size calculation and stream writing capability.
+ */
+public interface PackContext {
+
+    /**
+     * Get the packed size in bytes
+     * @return size in bytes
+     */
+    int getSize();
+
+    /**
+     * Write packed data to output stream
+     * @param os output stream
+     * @throws Exception if write fails
+     */
+    void writeTo(OutputStream os) throws Exception;
+
+    /**
+     * Create a PackContext from byte array (fallback implementation)
+     * @param bytes byte array
+     * @return PackContext instance
+     */
+    static PackContext of(byte[] bytes) {
+        return new ByteArrayPackContext(bytes);
+    }
+
+    /**
+     * Default implementation backed by byte array
+     */
+    class ByteArrayPackContext implements PackContext {
+        private final byte[] bytes;
+
+        ByteArrayPackContext(byte[] bytes) {
+            this.bytes = bytes;
+        }
+
+        @Override
+        public int getSize() {
+            return bytes.length;
+        }
+
+        @Override
+        public void writeTo(OutputStream os) throws Exception {
+            os.write(bytes);
+        }
+    }
+}
diff --git 
a/dubbo-plugin/dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache 
b/dubbo-plugin/dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache
index 4bdf318303..e09a6c0a56 100644
--- a/dubbo-plugin/dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache
+++ b/dubbo-plugin/dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache
@@ -66,38 +66,182 @@ public final class {{className}} {
 
     private static final StubMethodDescriptor {{methodName}}Method = new 
StubMethodDescriptor("{{originMethodName}}",
     {{inputType}}.class, {{outputType}}.class, MethodDescriptor.RpcType.UNARY,
-    obj -> ((com.google.protobuf.Message) obj).toByteArray(),obj -> 
((com.google.protobuf.Message) obj).toByteArray(), {{inputType}}::parseFrom,
+    obj -> ((com.google.protobuf.Message) obj).toByteArray(), new 
org.apache.dubbo.rpc.model.Pack() {
+        @Override
+        public byte[] pack(Object obj) throws Exception {
+            return ((com.google.protobuf.Message) obj).toByteArray();
+        }
+        @Override
+        public boolean supportsStreamPacking() {
+            return true;
+        }
+        @Override
+        public org.apache.dubbo.rpc.model.PackContext createPackContext(Object 
obj) throws Exception {
+            final com.google.protobuf.Message message = 
(com.google.protobuf.Message) obj;
+            return new org.apache.dubbo.rpc.model.PackContext() {
+                private final int size = message.getSerializedSize();
+                @Override
+                public int getSize() {
+                    return size;
+                }
+                @Override
+                public void writeTo(java.io.OutputStream os) throws Exception {
+                    message.writeTo(os);
+                }
+            };
+        }
+    }, {{inputType}}::parseFrom,
     {{outputType}}::parseFrom);
 
     private static final StubMethodDescriptor {{methodName}}AsyncMethod = new 
StubMethodDescriptor("{{originMethodName}}",
     {{inputType}}.class, java.util.concurrent.CompletableFuture.class, 
MethodDescriptor.RpcType.UNARY,
-    obj -> ((com.google.protobuf.Message) obj).toByteArray(), obj -> 
((com.google.protobuf.Message) obj).toByteArray(), {{inputType}}::parseFrom,
+    obj -> ((com.google.protobuf.Message) obj).toByteArray(), new 
org.apache.dubbo.rpc.model.Pack() {
+        @Override
+        public byte[] pack(Object obj) throws Exception {
+            return ((com.google.protobuf.Message) obj).toByteArray();
+        }
+        @Override
+        public boolean supportsStreamPacking() {
+            return true;
+        }
+        @Override
+        public org.apache.dubbo.rpc.model.PackContext createPackContext(Object 
obj) throws Exception {
+            final com.google.protobuf.Message message = 
(com.google.protobuf.Message) obj;
+            return new org.apache.dubbo.rpc.model.PackContext() {
+                private final int size = message.getSerializedSize();
+                @Override
+                public int getSize() {
+                    return size;
+                }
+                @Override
+                public void writeTo(java.io.OutputStream os) throws Exception {
+                    message.writeTo(os);
+                }
+            };
+        }
+    }, {{inputType}}::parseFrom,
     {{outputType}}::parseFrom);
 
     private static final StubMethodDescriptor {{methodName}}ProxyAsyncMethod = 
new StubMethodDescriptor("{{originMethodName}}Async",
     {{inputType}}.class, {{outputType}}.class, MethodDescriptor.RpcType.UNARY,
-    obj -> ((com.google.protobuf.Message) obj).toByteArray(), obj -> 
((com.google.protobuf.Message) obj).toByteArray(), {{inputType}}::parseFrom,
+    obj -> ((com.google.protobuf.Message) obj).toByteArray(), new 
org.apache.dubbo.rpc.model.Pack() {
+        @Override
+        public byte[] pack(Object obj) throws Exception {
+            return ((com.google.protobuf.Message) obj).toByteArray();
+        }
+        @Override
+        public boolean supportsStreamPacking() {
+            return true;
+        }
+        @Override
+        public org.apache.dubbo.rpc.model.PackContext createPackContext(Object 
obj) throws Exception {
+            final com.google.protobuf.Message message = 
(com.google.protobuf.Message) obj;
+            return new org.apache.dubbo.rpc.model.PackContext() {
+                private final int size = message.getSerializedSize();
+                @Override
+                public int getSize() {
+                    return size;
+                }
+                @Override
+                public void writeTo(java.io.OutputStream os) throws Exception {
+                    message.writeTo(os);
+                }
+            };
+        }
+    }, {{inputType}}::parseFrom,
     {{outputType}}::parseFrom);
 {{/unaryMethods}}
 {{#serverStreamingMethods}}
 
     private static final StubMethodDescriptor {{methodName}}Method = new 
StubMethodDescriptor("{{originMethodName}}",
     {{inputType}}.class, {{outputType}}.class, 
MethodDescriptor.RpcType.SERVER_STREAM,
-    obj -> ((com.google.protobuf.Message) obj).toByteArray(), obj -> 
((com.google.protobuf.Message) obj).toByteArray(), {{inputType}}::parseFrom,
+    obj -> ((com.google.protobuf.Message) obj).toByteArray(), new 
org.apache.dubbo.rpc.model.Pack() {
+        @Override
+        public byte[] pack(Object obj) throws Exception {
+            return ((com.google.protobuf.Message) obj).toByteArray();
+        }
+        @Override
+        public boolean supportsStreamPacking() {
+            return true;
+        }
+        @Override
+        public org.apache.dubbo.rpc.model.PackContext createPackContext(Object 
obj) throws Exception {
+            final com.google.protobuf.Message message = 
(com.google.protobuf.Message) obj;
+            return new org.apache.dubbo.rpc.model.PackContext() {
+                private final int size = message.getSerializedSize();
+                @Override
+                public int getSize() {
+                    return size;
+                }
+                @Override
+                public void writeTo(java.io.OutputStream os) throws Exception {
+                    message.writeTo(os);
+                }
+            };
+        }
+    }, {{inputType}}::parseFrom,
     {{outputType}}::parseFrom);
 {{/serverStreamingMethods}}
 {{#clientStreamingMethods}}
 
     private static final StubMethodDescriptor {{methodName}}Method = new 
StubMethodDescriptor("{{originMethodName}}",
     {{inputType}}.class, {{outputType}}.class, 
MethodDescriptor.RpcType.CLIENT_STREAM,
-    obj -> ((com.google.protobuf.Message) obj).toByteArray(), obj -> 
((com.google.protobuf.Message) obj).toByteArray(), {{inputType}}::parseFrom,
+    obj -> ((com.google.protobuf.Message) obj).toByteArray(), new 
org.apache.dubbo.rpc.model.Pack() {
+        @Override
+        public byte[] pack(Object obj) throws Exception {
+            return ((com.google.protobuf.Message) obj).toByteArray();
+        }
+        @Override
+        public boolean supportsStreamPacking() {
+            return true;
+        }
+        @Override
+        public org.apache.dubbo.rpc.model.PackContext createPackContext(Object 
obj) throws Exception {
+            final com.google.protobuf.Message message = 
(com.google.protobuf.Message) obj;
+            return new org.apache.dubbo.rpc.model.PackContext() {
+                private final int size = message.getSerializedSize();
+                @Override
+                public int getSize() {
+                    return size;
+                }
+                @Override
+                public void writeTo(java.io.OutputStream os) throws Exception {
+                    message.writeTo(os);
+                }
+            };
+        }
+    }, {{inputType}}::parseFrom,
     {{outputType}}::parseFrom);
 {{/clientStreamingMethods}}
 {{#biStreamingWithoutClientStreamMethods}}
 
     private static final StubMethodDescriptor {{methodName}}Method = new 
StubMethodDescriptor("{{originMethodName}}",
     {{inputType}}.class, {{outputType}}.class, 
MethodDescriptor.RpcType.BI_STREAM,
-    obj -> ((com.google.protobuf.Message) obj).toByteArray(), obj -> 
((com.google.protobuf.Message) obj).toByteArray(), {{inputType}}::parseFrom,
+    obj -> ((com.google.protobuf.Message) obj).toByteArray(), new 
org.apache.dubbo.rpc.model.Pack() {
+        @Override
+        public byte[] pack(Object obj) throws Exception {
+            return ((com.google.protobuf.Message) obj).toByteArray();
+        }
+        @Override
+        public boolean supportsStreamPacking() {
+            return true;
+        }
+        @Override
+        public org.apache.dubbo.rpc.model.PackContext createPackContext(Object 
obj) throws Exception {
+            final com.google.protobuf.Message message = 
(com.google.protobuf.Message) obj;
+            return new org.apache.dubbo.rpc.model.PackContext() {
+                private final int size = message.getSerializedSize();
+                @Override
+                public int getSize() {
+                    return size;
+                }
+                @Override
+                public void writeTo(java.io.OutputStream os) throws Exception {
+                    message.writeTo(os);
+                }
+            };
+        }
+    }, {{inputType}}::parseFrom,
     {{outputType}}::parseFrom);
 {{/biStreamingWithoutClientStreamMethods}}
 
diff --git 
a/dubbo-plugin/dubbo-compiler/src/main/resources/MutinyDubbo3TripleStub.mustache
 
b/dubbo-plugin/dubbo-compiler/src/main/resources/MutinyDubbo3TripleStub.mustache
index 33c30c7858..34d6f712ee 100644
--- 
a/dubbo-plugin/dubbo-compiler/src/main/resources/MutinyDubbo3TripleStub.mustache
+++ 
b/dubbo-plugin/dubbo-compiler/src/main/resources/MutinyDubbo3TripleStub.mustache
@@ -71,7 +71,31 @@ public final class {{className}} {
     {{/javaDoc}}
     private static final StubMethodDescriptor {{methodName}}Method = new 
StubMethodDescriptor("{{originMethodName}}",
         {{inputType}}.class, {{outputType}}.class, 
MethodDescriptor.RpcType.UNARY,
-        obj -> ((com.google.protobuf.Message) obj).toByteArray(), obj -> 
((com.google.protobuf.Message) obj).toByteArray(), {{inputType}}::parseFrom,
+        obj -> ((com.google.protobuf.Message) obj).toByteArray(), new 
org.apache.dubbo.rpc.model.Pack() {
+            @Override
+            public byte[] pack(Object obj) throws Exception {
+                return ((com.google.protobuf.Message) obj).toByteArray();
+            }
+            @Override
+            public boolean supportsStreamPacking() {
+                return true;
+            }
+            @Override
+            public org.apache.dubbo.rpc.model.PackContext 
createPackContext(Object obj) throws Exception {
+                final com.google.protobuf.Message message = 
(com.google.protobuf.Message) obj;
+                return new org.apache.dubbo.rpc.model.PackContext() {
+                    private final int size = message.getSerializedSize();
+                    @Override
+                    public int getSize() {
+                        return size;
+                    }
+                    @Override
+                    public void writeTo(java.io.OutputStream os) throws 
Exception {
+                        message.writeTo(os);
+                    }
+                };
+            }
+        }, {{inputType}}::parseFrom,
         {{outputType}}::parseFrom);
 {{/unaryMethods}}
 
@@ -81,7 +105,31 @@ public final class {{className}} {
     {{/javaDoc}}
     private static final StubMethodDescriptor {{methodName}}Method = new 
StubMethodDescriptor("{{originMethodName}}",
         {{inputType}}.class, {{outputType}}.class, 
MethodDescriptor.RpcType.SERVER_STREAM,
-        obj -> ((com.google.protobuf.Message) obj).toByteArray(), obj -> 
((com.google.protobuf.Message) obj).toByteArray(), {{inputType}}::parseFrom,
+        obj -> ((com.google.protobuf.Message) obj).toByteArray(), new 
org.apache.dubbo.rpc.model.Pack() {
+            @Override
+            public byte[] pack(Object obj) throws Exception {
+                return ((com.google.protobuf.Message) obj).toByteArray();
+            }
+            @Override
+            public boolean supportsStreamPacking() {
+                return true;
+            }
+            @Override
+            public org.apache.dubbo.rpc.model.PackContext 
createPackContext(Object obj) throws Exception {
+                final com.google.protobuf.Message message = 
(com.google.protobuf.Message) obj;
+                return new org.apache.dubbo.rpc.model.PackContext() {
+                    private final int size = message.getSerializedSize();
+                    @Override
+                    public int getSize() {
+                        return size;
+                    }
+                    @Override
+                    public void writeTo(java.io.OutputStream os) throws 
Exception {
+                        message.writeTo(os);
+                    }
+                };
+            }
+        }, {{inputType}}::parseFrom,
         {{outputType}}::parseFrom);
 {{/serverStreamingMethods}}
 
@@ -91,7 +139,31 @@ public final class {{className}} {
     {{/javaDoc}}
     private static final StubMethodDescriptor {{methodName}}Method = new 
StubMethodDescriptor("{{originMethodName}}",
         {{inputType}}.class, {{outputType}}.class, 
MethodDescriptor.RpcType.CLIENT_STREAM,
-        obj -> ((com.google.protobuf.Message) obj).toByteArray(), obj -> 
((com.google.protobuf.Message) obj).toByteArray(), {{inputType}}::parseFrom,
+        obj -> ((com.google.protobuf.Message) obj).toByteArray(), new 
org.apache.dubbo.rpc.model.Pack() {
+            @Override
+            public byte[] pack(Object obj) throws Exception {
+                return ((com.google.protobuf.Message) obj).toByteArray();
+            }
+            @Override
+            public boolean supportsStreamPacking() {
+                return true;
+            }
+            @Override
+            public org.apache.dubbo.rpc.model.PackContext 
createPackContext(Object obj) throws Exception {
+                final com.google.protobuf.Message message = 
(com.google.protobuf.Message) obj;
+                return new org.apache.dubbo.rpc.model.PackContext() {
+                    private final int size = message.getSerializedSize();
+                    @Override
+                    public int getSize() {
+                        return size;
+                    }
+                    @Override
+                    public void writeTo(java.io.OutputStream os) throws 
Exception {
+                        message.writeTo(os);
+                    }
+                };
+            }
+        }, {{inputType}}::parseFrom,
         {{outputType}}::parseFrom);
 {{/clientStreamingMethods}}
 
@@ -101,7 +173,31 @@ public final class {{className}} {
     {{/javaDoc}}
     private static final StubMethodDescriptor {{methodName}}Method = new 
StubMethodDescriptor("{{originMethodName}}",
         {{inputType}}.class, {{outputType}}.class, 
MethodDescriptor.RpcType.BI_STREAM,
-        obj -> ((com.google.protobuf.Message) obj).toByteArray(), obj -> 
((com.google.protobuf.Message) obj).toByteArray(), {{inputType}}::parseFrom,
+        obj -> ((com.google.protobuf.Message) obj).toByteArray(), new 
org.apache.dubbo.rpc.model.Pack() {
+            @Override
+            public byte[] pack(Object obj) throws Exception {
+                return ((com.google.protobuf.Message) obj).toByteArray();
+            }
+            @Override
+            public boolean supportsStreamPacking() {
+                return true;
+            }
+            @Override
+            public org.apache.dubbo.rpc.model.PackContext 
createPackContext(Object obj) throws Exception {
+                final com.google.protobuf.Message message = 
(com.google.protobuf.Message) obj;
+                return new org.apache.dubbo.rpc.model.PackContext() {
+                    private final int size = message.getSerializedSize();
+                    @Override
+                    public int getSize() {
+                        return size;
+                    }
+                    @Override
+                    public void writeTo(java.io.OutputStream os) throws 
Exception {
+                        message.writeTo(os);
+                    }
+                };
+            }
+        }, {{inputType}}::parseFrom,
         {{outputType}}::parseFrom);
 {{/biStreamingWithoutClientStreamMethods}}
 
diff --git 
a/dubbo-plugin/dubbo-compiler/src/main/resources/ReactorDubbo3TripleStub.mustache
 
b/dubbo-plugin/dubbo-compiler/src/main/resources/ReactorDubbo3TripleStub.mustache
index 6003703c03..11b2c92aa4 100644
--- 
a/dubbo-plugin/dubbo-compiler/src/main/resources/ReactorDubbo3TripleStub.mustache
+++ 
b/dubbo-plugin/dubbo-compiler/src/main/resources/ReactorDubbo3TripleStub.mustache
@@ -71,7 +71,31 @@ public final class {{className}} {
     {{/javaDoc}}
     private static final StubMethodDescriptor {{methodName}}Method = new 
StubMethodDescriptor("{{originMethodName}}",
         {{inputType}}.class, {{outputType}}.class, 
MethodDescriptor.RpcType.UNARY,
-        obj -> ((com.google.protobuf.Message) obj).toByteArray(), obj -> 
((com.google.protobuf.Message) obj).toByteArray(), {{inputType}}::parseFrom,
+        obj -> ((com.google.protobuf.Message) obj).toByteArray(), new 
org.apache.dubbo.rpc.model.Pack() {
+            @Override
+            public byte[] pack(Object obj) throws Exception {
+                return ((com.google.protobuf.Message) obj).toByteArray();
+            }
+            @Override
+            public boolean supportsStreamPacking() {
+                return true;
+            }
+            @Override
+            public org.apache.dubbo.rpc.model.PackContext 
createPackContext(Object obj) throws Exception {
+                final com.google.protobuf.Message message = 
(com.google.protobuf.Message) obj;
+                return new org.apache.dubbo.rpc.model.PackContext() {
+                    private final int size = message.getSerializedSize();
+                    @Override
+                    public int getSize() {
+                        return size;
+                    }
+                    @Override
+                    public void writeTo(java.io.OutputStream os) throws 
Exception {
+                        message.writeTo(os);
+                    }
+                };
+            }
+        }, {{inputType}}::parseFrom,
         {{outputType}}::parseFrom);
 {{/unaryMethods}}
 
@@ -81,7 +105,31 @@ public final class {{className}} {
     {{/javaDoc}}
     private static final StubMethodDescriptor {{methodName}}Method = new 
StubMethodDescriptor("{{originMethodName}}",
         {{inputType}}.class, {{outputType}}.class, 
MethodDescriptor.RpcType.SERVER_STREAM,
-        obj -> ((com.google.protobuf.Message) obj).toByteArray(), obj -> 
((com.google.protobuf.Message) obj).toByteArray(), {{inputType}}::parseFrom,
+        obj -> ((com.google.protobuf.Message) obj).toByteArray(), new 
org.apache.dubbo.rpc.model.Pack() {
+            @Override
+            public byte[] pack(Object obj) throws Exception {
+                return ((com.google.protobuf.Message) obj).toByteArray();
+            }
+            @Override
+            public boolean supportsStreamPacking() {
+                return true;
+            }
+            @Override
+            public org.apache.dubbo.rpc.model.PackContext 
createPackContext(Object obj) throws Exception {
+                final com.google.protobuf.Message message = 
(com.google.protobuf.Message) obj;
+                return new org.apache.dubbo.rpc.model.PackContext() {
+                    private final int size = message.getSerializedSize();
+                    @Override
+                    public int getSize() {
+                        return size;
+                    }
+                    @Override
+                    public void writeTo(java.io.OutputStream os) throws 
Exception {
+                        message.writeTo(os);
+                    }
+                };
+            }
+        }, {{inputType}}::parseFrom,
         {{outputType}}::parseFrom);
 {{/serverStreamingMethods}}
 
@@ -91,7 +139,31 @@ public final class {{className}} {
     {{/javaDoc}}
     private static final StubMethodDescriptor {{methodName}}Method = new 
StubMethodDescriptor("{{originMethodName}}",
         {{inputType}}.class, {{outputType}}.class, 
MethodDescriptor.RpcType.CLIENT_STREAM,
-        obj -> ((com.google.protobuf.Message) obj).toByteArray(), obj -> 
((com.google.protobuf.Message) obj).toByteArray(), {{inputType}}::parseFrom,
+        obj -> ((com.google.protobuf.Message) obj).toByteArray(), new 
org.apache.dubbo.rpc.model.Pack() {
+            @Override
+            public byte[] pack(Object obj) throws Exception {
+                return ((com.google.protobuf.Message) obj).toByteArray();
+            }
+            @Override
+            public boolean supportsStreamPacking() {
+                return true;
+            }
+            @Override
+            public org.apache.dubbo.rpc.model.PackContext 
createPackContext(Object obj) throws Exception {
+                final com.google.protobuf.Message message = 
(com.google.protobuf.Message) obj;
+                return new org.apache.dubbo.rpc.model.PackContext() {
+                    private final int size = message.getSerializedSize();
+                    @Override
+                    public int getSize() {
+                        return size;
+                    }
+                    @Override
+                    public void writeTo(java.io.OutputStream os) throws 
Exception {
+                        message.writeTo(os);
+                    }
+                };
+            }
+        }, {{inputType}}::parseFrom,
         {{outputType}}::parseFrom);
 {{/clientStreamingMethods}}
 
@@ -101,7 +173,31 @@ public final class {{className}} {
     {{/javaDoc}}
     private static final StubMethodDescriptor {{methodName}}Method = new 
StubMethodDescriptor("{{originMethodName}}",
         {{inputType}}.class, {{outputType}}.class, 
MethodDescriptor.RpcType.BI_STREAM,
-        obj -> ((com.google.protobuf.Message) obj).toByteArray(), obj -> 
((com.google.protobuf.Message) obj).toByteArray(), {{inputType}}::parseFrom,
+        obj -> ((com.google.protobuf.Message) obj).toByteArray(), new 
org.apache.dubbo.rpc.model.Pack() {
+            @Override
+            public byte[] pack(Object obj) throws Exception {
+                return ((com.google.protobuf.Message) obj).toByteArray();
+            }
+            @Override
+            public boolean supportsStreamPacking() {
+                return true;
+            }
+            @Override
+            public org.apache.dubbo.rpc.model.PackContext 
createPackContext(Object obj) throws Exception {
+                final com.google.protobuf.Message message = 
(com.google.protobuf.Message) obj;
+                return new org.apache.dubbo.rpc.model.PackContext() {
+                    private final int size = message.getSerializedSize();
+                    @Override
+                    public int getSize() {
+                        return size;
+                    }
+                    @Override
+                    public void writeTo(java.io.OutputStream os) throws 
Exception {
+                        message.writeTo(os);
+                    }
+                };
+            }
+        }, {{inputType}}::parseFrom,
         {{outputType}}::parseFrom);
 {{/biStreamingWithoutClientStreamMethods}}
 
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PbArrayPacker.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PbArrayPacker.java
index b91d4b940f..7fd5efec21 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PbArrayPacker.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PbArrayPacker.java
@@ -17,6 +17,9 @@
 package org.apache.dubbo.rpc.protocol.tri;
 
 import org.apache.dubbo.rpc.model.Pack;
+import org.apache.dubbo.rpc.model.PackContext;
+
+import java.io.OutputStream;
 
 import com.google.protobuf.Message;
 
@@ -37,4 +40,46 @@ public class PbArrayPacker implements Pack {
         }
         return PB_PACK.pack(obj);
     }
+
+    @Override
+    public boolean supportsStreamPacking() {
+        return true;
+    }
+
+    @Override
+    public PackContext createPackContext(Object obj) throws Exception {
+        Message message = extractMessage(obj);
+        return new ProtobufPackContext(message);
+    }
+
+    public boolean isSingleArgument() {
+        return singleArgument;
+    }
+
+    private Message extractMessage(Object obj) {
+        if (!singleArgument) {
+            obj = ((Object[]) obj)[0];
+        }
+        return (Message) obj;
+    }
+
+    private static class ProtobufPackContext implements PackContext {
+        private final Message message;
+        private final int size;
+
+        ProtobufPackContext(Message message) {
+            this.message = message;
+            this.size = message.getSerializedSize();
+        }
+
+        @Override
+        public int getSize() {
+            return size;
+        }
+
+        @Override
+        public void writeTo(OutputStream os) throws Exception {
+            message.writeTo(os);
+        }
+    }
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java
index ae19f2a464..72fd6d29e4 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java
@@ -38,8 +38,6 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.stream.Stream;
 
-import com.google.protobuf.Message;
-
 import static org.apache.dubbo.common.constants.CommonConstants.$ECHO;
 import static org.apache.dubbo.common.utils.ProtobufUtils.isProtobufClass;
 
@@ -50,7 +48,6 @@ public class ReflectionPackableMethod implements 
PackableMethod {
     private static final String REACTOR_RETURN_CLASS = 
"reactor.core.publisher.Mono";
     private static final String RX_RETURN_CLASS = "io.reactivex.Single";
     private static final String GRPC_STREAM_CLASS = 
"io.grpc.stub.StreamObserver";
-    private static final Pack PB_PACK = o -> ((Message) o).toByteArray();
 
     private final Pack requestPack;
     private final Pack responsePack;
@@ -89,7 +86,7 @@ public class ReflectionPackableMethod implements 
PackableMethod {
         this.needWrapper = needWrap(method, actualRequestTypes, 
actualResponseType);
         if (!needWrapper) {
             requestPack = new PbArrayPacker(singleArgument);
-            responsePack = PB_PACK;
+            responsePack = new PbArrayPacker(true);
             requestUnpack = new PbUnpack<>(actualRequestTypes[0]);
             responseUnpack = new PbUnpack<>(actualResponseType);
         } else {
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
index e66eebad79..09dafb2022 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
@@ -24,11 +24,14 @@ import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
 import org.apache.dubbo.common.utils.UrlUtils;
 import org.apache.dubbo.remoting.http12.exception.DecodeException;
 import org.apache.dubbo.remoting.http12.exception.EncodeException;
+import org.apache.dubbo.remoting.http12.exception.HttpOverPayloadException;
 import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
 import org.apache.dubbo.remoting.http12.message.HttpMessageCodec;
 import org.apache.dubbo.remoting.http12.message.MediaType;
 import org.apache.dubbo.rpc.model.FrameworkModel;
 import org.apache.dubbo.rpc.model.MethodDescriptor;
+import org.apache.dubbo.rpc.model.Pack;
+import org.apache.dubbo.rpc.model.PackContext;
 import org.apache.dubbo.rpc.model.PackableMethod;
 import org.apache.dubbo.rpc.model.PackableMethodFactory;
 
@@ -38,6 +41,9 @@ import java.io.OutputStream;
 import java.nio.charset.Charset;
 import java.util.concurrent.ConcurrentHashMap;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+
 import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_KEY;
 import static 
org.apache.dubbo.common.constants.CommonConstants.DUBBO_PACKABLE_METHOD_FACTORY;
 
@@ -78,12 +84,38 @@ public class GrpcCompositeCodec implements HttpMessageCodec 
{
 
     @Override
     public void encode(OutputStream outputStream, Object data, Charset 
charset) throws EncodeException {
-        // protobuf
-        // TODO int compressed = 
Identity.MESSAGE_ENCODING.equals(requestMetadata.compressor.getMessageEncoding())
 ? 0 :
-        // 1;
         try {
-            int compressed = 0;
-            outputStream.write(compressed);
+            if (packableMethod != null
+                    && !packableMethod.needWrapper()
+                    && outputStream instanceof ByteBufOutputStream) {
+
+                Pack responsePack = packableMethod.getResponsePack();
+
+                if (responsePack.supportsStreamPacking()) {
+                    ByteBufOutputStream bbos = (ByteBufOutputStream) 
outputStream;
+                    ByteBuf buffer = bbos.buffer();
+
+                    try {
+                        PackContext ctx = responsePack.createPackContext(data);
+                        int payloadSize = ctx.getSize();
+                        int totalSize = 5 + payloadSize;
+                        buffer.ensureWritable(totalSize);
+
+                        buffer.writeByte(0);
+                        buffer.writeInt(payloadSize);
+                        ctx.writeTo(outputStream);
+
+                        return;
+
+                    } catch (IndexOutOfBoundsException | 
HttpOverPayloadException e) {
+                        if (e instanceof HttpOverPayloadException) {
+                            throw e;
+                        }
+                    }
+                }
+            }
+
+            outputStream.write(0);
             byte[] bytes = packableMethod.packResponse(data);
             writeLength(outputStream, bytes.length);
             outputStream.write(bytes);


Reply via email to