This is an automated email from the ASF dual-hosted git repository.

earthchen pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 678d2a7831 Refactor tri pack & unpack use stream (#16045)
678d2a7831 is described below

commit 678d2a7831a5e0bb5c5b66af8f35b9023ecc353b
Author: earthchen <[email protected]>
AuthorDate: Thu Feb 5 14:02:08 2026 +0800

    Refactor tri pack & unpack use stream (#16045)
    
    * refactor: enhance backpressure handling with byte tracking in HTTP/2 
streams
    
    * Update 
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
    
    Co-authored-by: Copilot <[email protected]>
    
    * Update 
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
    
    Co-authored-by: Copilot <[email protected]>
    
    * fix
    
    * refactor: expose methods for byte tracking in AbstractTripleClientStream 
and Http2ServerChannelObserver
    
    * refactor: enhance error handling and byte tracking in HTTP/2 stream 
observers
    
    * refactor: improve closing logic and state management in 
LengthFieldStreamingDecoder
    
    * fix: prevent race condition in stream initialization
    
    * refactor: unify client and server deframer for Triple protocol
    
    This commit unifies the deframer implementation between client and server
    sides for the Triple protocol, eliminating code duplication and improving
    maintainability.
    
    ## Key Changes
    
    ### Deleted Files
    - TriDecoder.java: Removed client-specific deframer implementation
    - Deframer.java: Removed client deframer interface
    - RecordListener.java: Removed test helper
    - TriDecoderTest.java: Removed corresponding test
    
    ### Core Modifications
    
    1. **LengthFieldStreamingDecoder**: Added BoundedInputStream to support:
       - Message boundary isolation (prevents reading beyond current message)
       - mark/reset support required by Hessian2 and other deserializers
       - Zero-copy optimization by reading directly from accumulate stream
    
    2. **StreamingDecoder.FragmentListener**: Added messageLength parameter to
       onFragmentMessage() method for better flow control
    
    3. **AbstractTripleClientStream**: Migrated from TriDecoder to 
GrpcStreamingDecoder,
       using ByteBufInputStream to adapt Netty's ByteBuf to InputStream
    
    4. **Stream.Listener.onMessage**: Changed parameter from byte[] to 
InputStream
       for unified handling and memory optimization
    
    5. **PackableMethod**: Added default parseResponse(InputStream) method for
       backward compatibility
    
    6. **CompositeInputStream**: Improved stream release logic to prevent 
exceptions
    
    7. **DescriptorUtils**: Added mark() call before reading stream to support 
reset
    
    ## Architecture Change
    
    Before:
    - Client: ByteBuf -> TriDecoder (Netty-specific) -> byte[] -> deserializer
    - Server: InputStream -> LengthFieldStreamingDecoder -> byte[] -> 
deserializer
    
    After:
    - Client: ByteBuf -> ByteBufInputStream -> GrpcStreamingDecoder -> 
BoundedInputStream -> deserializer
    - Server: InputStream -> GrpcStreamingDecoder -> BoundedInputStream -> 
deserializer
    
    ## Bug Fix
    
    Fixed BoundedInputStream.reset() not restoring the 'remaining' counter,
    which caused streams to return EOF after mark/reset. This was the root cause
    of POJO method invocation failures with "Unexpected serialization 
type:null" error.
    
    * refactor: update tri serialization methods to use InputStream and 
OutputStream
    
    * refactor: update tri serialization methods to use InputStream and 
OutputStream
    
    * Update 
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
    
    Co-authored-by: Copilot <[email protected]>
    
    * refactor: enhance error handling in deframer by using try-with-resources 
for InputStream
    
    * refactor: unify client and server deframer for Triple protocol
    
    This commit unifies the deframer implementation between client and server
    sides for the Triple protocol, eliminating code duplication and improving
    maintainability.
    
    ## Key Changes
    
    ### Deleted Files
    - TriDecoder.java: Removed client-specific deframer implementation
    - Deframer.java: Removed client deframer interface
    - RecordListener.java: Removed test helper
    - TriDecoderTest.java: Removed corresponding test
    
    ### Core Modifications
    
    1. **LengthFieldStreamingDecoder**: Added BoundedInputStream to support:
       - Message boundary isolation (prevents reading beyond current message)
       - mark/reset support required by Hessian2 and other deserializers
       - Zero-copy optimization by reading directly from accumulate stream
    
    2. **StreamingDecoder.FragmentListener**: Added messageLength parameter to
       onFragmentMessage() method for better flow control
    
    3. **AbstractTripleClientStream**: Migrated from TriDecoder to 
GrpcStreamingDecoder,
       using ByteBufInputStream to adapt Netty's ByteBuf to InputStream
    
    4. **Stream.Listener.onMessage**: Changed parameter from byte[] to 
InputStream
       for unified handling and memory optimization
    
    5. **PackableMethod**: Added default parseResponse(InputStream) method for
       backward compatibility
    
    6. **CompositeInputStream**: Improved stream release logic to prevent 
exceptions
    
    7. **DescriptorUtils**: Added mark() call before reading stream to support 
reset
    
    ## Architecture Change
    
    Before:
    - Client: ByteBuf -> TriDecoder (Netty-specific) -> byte[] -> deserializer
    - Server: InputStream -> LengthFieldStreamingDecoder -> byte[] -> 
deserializer
    
    After:
    - Client: ByteBuf -> ByteBufInputStream -> GrpcStreamingDecoder -> 
BoundedInputStream -> deserializer
    - Server: InputStream -> GrpcStreamingDecoder -> BoundedInputStream -> 
deserializer
    
    ## Bug Fix
    
    Fixed BoundedInputStream.reset() not restoring the 'remaining' counter,
    which caused streams to return EOF after mark/reset. This was the root cause
    of POJO method invocation failures with "Unexpected serialization 
type:null" error.
    
    * refactor: enhance error handling in deframer by using try-with-resources 
for InputStream
    
    * refactor: update tri serialization methods to use InputStream and 
OutputStream
    
    * fix
    
    * refactor: update tri serialization methods to use InputStream and 
OutputStream
    
    * fix
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
    Co-authored-by: zrlw <[email protected]>
---
 .../main/java/org/apache/dubbo/rpc/model/Pack.java |  11 +-
 .../org/apache/dubbo/rpc/model/PackableMethod.java |  63 ++++++----
 .../java/org/apache/dubbo/rpc/model/UnPack.java    |  18 ++-
 .../org/apache/dubbo/rpc/model/WrapperUnPack.java  |  23 ++++
 .../dubbo/rpc/protocol/tri/DescriptorUtils.java    |   4 +-
 .../dubbo/rpc/protocol/tri/PbArrayPacker.java      |  10 ++
 .../apache/dubbo/rpc/protocol/tri/PbUnpack.java    |   6 +
 .../rpc/protocol/tri/ReflectionPackableMethod.java |  53 ++++++++-
 .../tri/TripleCustomerProtocolWrapper.java         | 131 +++++++++++++++++++++
 .../protocol/tri/h12/grpc/GrpcCompositeCodec.java  |  12 +-
 10 files changed, 294 insertions(+), 37 deletions(-)

diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/Pack.java 
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/Pack.java
index 340b668e42..f0d4d76c29 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/Pack.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/Pack.java
@@ -16,12 +16,17 @@
  */
 package org.apache.dubbo.rpc.model;
 
+import java.io.OutputStream;
+
 public interface Pack {
 
     /**
-     * @param obj instance
-     * @return byte array
-     * @throws Exception when error occurs
+     * @deprecated use {@link #pack(Object, OutputStream)} instead
      */
+    @Deprecated
     byte[] pack(Object obj) throws Exception;
+
+    default void pack(Object obj, OutputStream out) throws Exception {
+        out.write(pack(obj));
+    }
 }
diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/PackableMethod.java 
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/PackableMethod.java
index 2065726c22..c043833b90 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/PackableMethod.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/PackableMethod.java
@@ -16,8 +16,8 @@
  */
 package org.apache.dubbo.rpc.model;
 
-import java.io.ByteArrayOutputStream;
 import java.io.InputStream;
+import java.io.OutputStream;
 
 /**
  * A packable method is used to customize serialization for methods. It can 
provide a common wrapper
@@ -25,14 +25,26 @@ import java.io.InputStream;
  */
 public interface PackableMethod {
 
+    /**
+     * @deprecated use {@link #parseRequest(InputStream)} instead
+     */
+    @Deprecated
     default Object parseRequest(byte[] data) throws Exception {
         return getRequestUnpack().unpack(data);
     }
 
+    /**
+     * @deprecated use {@link #parseResponse(InputStream)} instead
+     */
+    @Deprecated
     default Object parseResponse(byte[] data) throws Exception {
         return parseResponse(data, false);
     }
 
+    /**
+     * @deprecated use {@link #parseResponse(InputStream, boolean)} instead
+     */
+    @Deprecated
     default Object parseResponse(byte[] data, boolean isReturnTriException) 
throws Exception {
         UnPack unPack = getResponseUnpack();
         if (unPack instanceof WrapperUnPack) {
@@ -42,34 +54,45 @@ public interface PackableMethod {
     }
 
     /**
-     * Parse response from InputStream.
-     * Default implementation reads all bytes and delegates to byte[] version.
-     *
-     * @param inputStream the input stream containing the response data
-     * @param isReturnTriException whether the response is a Triple exception
-     * @return the parsed response object
-     * @throws Exception if parsing fails
+     * @deprecated use {@link #packRequest(Object, OutputStream)} instead
      */
-    default Object parseResponse(InputStream inputStream, boolean 
isReturnTriException) throws Exception {
-        // Read all bytes from InputStream
-        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
-        byte[] tmp = new byte[4096];
-        int len;
-        while ((len = inputStream.read(tmp)) != -1) {
-            buffer.write(tmp, 0, len);
-        }
-        byte[] data = buffer.toByteArray();
-        return parseResponse(data, isReturnTriException);
-    }
-
+    @Deprecated
     default byte[] packRequest(Object request) throws Exception {
         return getRequestPack().pack(request);
     }
 
+    /**
+     * @deprecated use {@link #packResponse(Object, OutputStream)} instead
+     */
+    @Deprecated
     default byte[] packResponse(Object response) throws Exception {
         return getResponsePack().pack(response);
     }
 
+    default Object parseRequest(InputStream inputStream) throws Exception {
+        return getRequestUnpack().unpack(inputStream);
+    }
+
+    default Object parseResponse(InputStream inputStream) throws Exception {
+        return parseResponse(inputStream, false);
+    }
+
+    default Object parseResponse(InputStream inputStream, boolean 
isReturnTriException) throws Exception {
+        UnPack unPack = getResponseUnpack();
+        if (unPack instanceof WrapperUnPack) {
+            return ((WrapperUnPack) unPack).unpack(inputStream, 
isReturnTriException);
+        }
+        return unPack.unpack(inputStream);
+    }
+
+    default void packRequest(Object request, OutputStream outputStream) throws 
Exception {
+        getRequestPack().pack(request, outputStream);
+    }
+
+    default void packResponse(Object response, OutputStream outputStream) 
throws Exception {
+        getResponsePack().pack(response, outputStream);
+    }
+
     default boolean needWrapper() {
         return false;
     }
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/UnPack.java 
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/UnPack.java
index d3c6b8ac6f..698c4dd5ae 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/UnPack.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/UnPack.java
@@ -16,12 +16,24 @@
  */
 package org.apache.dubbo.rpc.model;
 
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+
 public interface UnPack {
 
     /**
-     * @param data byte array
-     * @return object instance
-     * @throws Exception            exception
+     * @deprecated use {@link #unpack(InputStream)} instead
      */
+    @Deprecated
     Object unpack(byte[] data) throws Exception;
+
+    default Object unpack(InputStream inputStream) throws Exception {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        byte[] tmp = new byte[4096];
+        int len;
+        while ((len = inputStream.read(tmp)) != -1) {
+            buffer.write(tmp, 0, len);
+        }
+        return unpack(buffer.toByteArray());
+    }
 }
diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/WrapperUnPack.java 
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/WrapperUnPack.java
index 1fd216b807..63d68b0ea8 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/WrapperUnPack.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/WrapperUnPack.java
@@ -16,11 +16,34 @@
  */
 package org.apache.dubbo.rpc.model;
 
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+
 public interface WrapperUnPack extends UnPack {
 
+    @Override
     default Object unpack(byte[] data) throws Exception {
         return unpack(data, false);
     }
 
+    /**
+     * @deprecated use {@link #unpack(InputStream, boolean)} instead
+     */
+    @Deprecated
     Object unpack(byte[] data, boolean isReturnTriException) throws Exception;
+
+    @Override
+    default Object unpack(InputStream inputStream) throws Exception {
+        return unpack(inputStream, false);
+    }
+
+    default Object unpack(InputStream inputStream, boolean 
isReturnTriException) throws Exception {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        byte[] tmp = new byte[4096];
+        int len;
+        while ((len = inputStream.read(tmp)) != -1) {
+            buffer.write(tmp, 0, len);
+        }
+        return unpack(buffer.toByteArray(), isReturnTriException);
+    }
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DescriptorUtils.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DescriptorUtils.java
index dbc2ac0b60..e8f34431e2 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DescriptorUtils.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DescriptorUtils.java
@@ -18,7 +18,6 @@ package org.apache.dubbo.rpc.protocol.tri;
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.io.StreamUtils;
 import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.remoting.http12.exception.UnimplementedException;
 import org.apache.dubbo.rpc.Invoker;
@@ -126,9 +125,8 @@ public final class DescriptorUtils {
         MethodDescriptor methodDescriptor = 
findReflectionMethodDescriptor(serviceDescriptor, methodName);
         if (methodDescriptor == null) {
             rawMessage.mark(Integer.MAX_VALUE);
-            byte[] data = StreamUtils.readBytes(rawMessage);
             List<MethodDescriptor> methodDescriptors = 
serviceDescriptor.getMethods(methodName);
-            TripleRequestWrapper request = 
TripleRequestWrapper.parseFrom(data);
+            TripleRequestWrapper request = 
TripleRequestWrapper.parseFrom(rawMessage);
             String[] paramTypes = request.getArgTypes().toArray(new String[0]);
             // wrapper mode the method can overload so maybe list
             for (MethodDescriptor descriptor : methodDescriptors) {
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PbArrayPacker.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PbArrayPacker.java
index b91d4b940f..c13bcd1bda 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PbArrayPacker.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PbArrayPacker.java
@@ -18,6 +18,8 @@ package org.apache.dubbo.rpc.protocol.tri;
 
 import org.apache.dubbo.rpc.model.Pack;
 
+import java.io.OutputStream;
+
 import com.google.protobuf.Message;
 
 public class PbArrayPacker implements Pack {
@@ -37,4 +39,12 @@ public class PbArrayPacker implements Pack {
         }
         return PB_PACK.pack(obj);
     }
+
+    @Override
+    public void pack(Object obj, OutputStream out) throws Exception {
+        if (!singleArgument) {
+            obj = ((Object[]) obj)[0];
+        }
+        ((Message) obj).writeTo(out);
+    }
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PbUnpack.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PbUnpack.java
index 81d12d2420..4aa333790e 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PbUnpack.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PbUnpack.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.rpc.model.UnPack;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 
 public class PbUnpack<T> implements UnPack {
 
@@ -34,4 +35,9 @@ public class PbUnpack<T> implements UnPack {
         final ByteArrayInputStream bais = new ByteArrayInputStream(data);
         return SingleProtobufUtils.deserialize(bais, clz);
     }
+
+    @Override
+    public Object unpack(InputStream inputStream) throws IOException {
+        return SingleProtobufUtils.deserialize(inputStream, clz);
+    }
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java
index ae19f2a464..b60a8156c7 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java
@@ -32,6 +32,8 @@ import org.apache.dubbo.rpc.model.WrapperUnPack;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.util.Collection;
@@ -325,6 +327,18 @@ public class ReflectionPackableMethod implements 
PackableMethod {
                     .build()
                     .toByteArray();
         }
+
+        @Override
+        public void pack(Object obj, OutputStream out) throws IOException {
+            ByteArrayOutputStream dataBos = new ByteArrayOutputStream();
+            multipleSerialization.serialize(url, requestSerialize, 
actualResponseType, obj, dataBos);
+            
TripleCustomerProtocolWrapper.TripleResponseWrapper.Builder.newBuilder()
+                    .setSerializeType(requestSerialize)
+                    .setType(actualResponseType.getName())
+                    .setData(dataBos.toByteArray())
+                    .build()
+                    .writeTo(out);
+        }
     }
 
     private static class WrapResponseUnpack implements WrapperUnPack {
@@ -348,9 +362,23 @@ public class ReflectionPackableMethod implements 
PackableMethod {
             return unpack(data, false);
         }
 
+        @Override
         public Object unpack(byte[] data, boolean isReturnTriException) throws 
IOException, ClassNotFoundException {
             TripleCustomerProtocolWrapper.TripleResponseWrapper wrapper =
                     
TripleCustomerProtocolWrapper.TripleResponseWrapper.parseFrom(data);
+            return deserializeFromWrapper(wrapper, isReturnTriException);
+        }
+
+        @Override
+        public Object unpack(InputStream inputStream, boolean 
isReturnTriException) throws Exception {
+            TripleCustomerProtocolWrapper.TripleResponseWrapper wrapper =
+                    
TripleCustomerProtocolWrapper.TripleResponseWrapper.parseFrom(inputStream);
+            return deserializeFromWrapper(wrapper, isReturnTriException);
+        }
+
+        private Object deserializeFromWrapper(
+                TripleCustomerProtocolWrapper.TripleResponseWrapper wrapper, 
boolean isReturnTriException)
+                throws IOException, ClassNotFoundException {
             final String serializeType = 
convertHessianFromWrapper(wrapper.getSerializeType());
 
             CodecSupport.checkSerialization(serializeType, allSerialize);
@@ -389,6 +417,15 @@ public class ReflectionPackableMethod implements 
PackableMethod {
 
         @Override
         public byte[] pack(Object obj) throws IOException {
+            return buildWrapper(obj).toByteArray();
+        }
+
+        @Override
+        public void pack(Object obj, OutputStream out) throws IOException {
+            buildWrapper(obj).writeTo(out);
+        }
+
+        private TripleCustomerProtocolWrapper.TripleRequestWrapper 
buildWrapper(Object obj) throws IOException {
             Object[] arguments;
             if (singleArgument) {
                 arguments = new Object[] {obj};
@@ -402,7 +439,7 @@ public class ReflectionPackableMethod implements 
PackableMethod {
                 builder.addArgTypes(type);
             }
             if (actualRequestTypes == null || actualRequestTypes.length == 0) {
-                return builder.build().toByteArray();
+                return builder.build();
             }
             ByteArrayOutputStream bos = new ByteArrayOutputStream();
             for (int i = 0; i < arguments.length; i++) {
@@ -411,7 +448,7 @@ public class ReflectionPackableMethod implements 
PackableMethod {
                 builder.addArgs(bos.toByteArray());
                 bos.reset();
             }
-            return builder.build().toByteArray();
+            return builder.build();
         }
 
         /**
@@ -449,10 +486,22 @@ public class ReflectionPackableMethod implements 
PackableMethod {
             this.allSerialize = allSerialize;
         }
 
+        @Override
         public Object unpack(byte[] data, boolean isReturnTriException) throws 
IOException, ClassNotFoundException {
             TripleCustomerProtocolWrapper.TripleRequestWrapper wrapper =
                     
TripleCustomerProtocolWrapper.TripleRequestWrapper.parseFrom(data);
+            return deserializeFromWrapper(wrapper);
+        }
+
+        @Override
+        public Object unpack(InputStream inputStream, boolean 
isReturnTriException) throws Exception {
+            TripleCustomerProtocolWrapper.TripleRequestWrapper wrapper =
+                    
TripleCustomerProtocolWrapper.TripleRequestWrapper.parseFrom(inputStream);
+            return deserializeFromWrapper(wrapper);
+        }
 
+        private Object[] 
deserializeFromWrapper(TripleCustomerProtocolWrapper.TripleRequestWrapper 
wrapper)
+                throws IOException, ClassNotFoundException {
             String wrapperSerializeType = 
convertHessianFromWrapper(wrapper.getSerializeType());
             CodecSupport.checkSerialization(wrapperSerializeType, 
allSerialize);
 
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleCustomerProtocolWrapper.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleCustomerProtocolWrapper.java
index 7b57a71d25..e9abb8dd73 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleCustomerProtocolWrapper.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleCustomerProtocolWrapper.java
@@ -19,6 +19,9 @@ package org.apache.dubbo.rpc.protocol.tri;
 import org.apache.dubbo.common.utils.Assert;
 import org.apache.dubbo.common.utils.CollectionUtils;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -71,6 +74,36 @@ public class TripleCustomerProtocolWrapper {
         return val;
     }
 
+    public static int readRawVarint32(InputStream inputStream) throws 
IOException {
+        int result = 0;
+        int shift = 0;
+        while (shift < 35) {
+            int b = inputStream.read();
+            if (b == -1) {
+                throw new IOException("Unexpected end of stream while reading 
varint");
+            }
+            result |= (b & 0x7F) << shift;
+            if ((b & 0x80) == 0) {
+                return result;
+            }
+            shift += 7;
+        }
+        throw new IOException("Malformed varint");
+    }
+
+    private static byte[] readExactly(InputStream inputStream, int n) throws 
IOException {
+        byte[] data = new byte[n];
+        int offset = 0;
+        while (offset < n) {
+            int read = inputStream.read(data, offset, n - offset);
+            if (read == -1) {
+                throw new IOException("Unexpected end of stream, expected " + 
n + " bytes but got " + offset);
+            }
+            offset += read;
+        }
+        return data;
+    }
+
     public static int extractFieldNumFromTag(int tag) {
         return tag >> 3;
     }
@@ -168,6 +201,54 @@ public class TripleCustomerProtocolWrapper {
             return byteBuffer.array();
         }
 
+        public static TripleResponseWrapper parseFrom(InputStream inputStream) 
throws IOException {
+            TripleResponseWrapper tripleResponseWrapper = new 
TripleResponseWrapper();
+            int b;
+            while ((b = inputStream.read()) != -1) {
+                int tag = b;
+                if ((b & 0x80) != 0) {
+                    int shift = 7;
+                    tag = b & 0x7F;
+                    while (shift < 35) {
+                        b = inputStream.read();
+                        if (b == -1) {
+                            throw new IOException("Unexpected end of stream 
while reading tag");
+                        }
+                        tag |= (b & 0x7F) << shift;
+                        if ((b & 0x80) == 0) {
+                            break;
+                        }
+                        shift += 7;
+                    }
+                }
+
+                int fieldNum = extractFieldNumFromTag(tag);
+                int wireType = extractWireTypeFromTag(tag);
+                if (wireType != 2) {
+                    throw new RuntimeException(
+                            String.format("unexpected wireType, expect %d 
realType %d", 2, wireType));
+                }
+
+                int length = readRawVarint32(inputStream);
+                byte[] fieldData = readExactly(inputStream, length);
+
+                if (fieldNum == 1) {
+                    tripleResponseWrapper.serializeType = new 
String(fieldData);
+                } else if (fieldNum == 2) {
+                    tripleResponseWrapper.data = fieldData;
+                } else if (fieldNum == 3) {
+                    tripleResponseWrapper.type = new String(fieldData);
+                } else {
+                    throw new RuntimeException("fieldNum should in (1,2,3)");
+                }
+            }
+            return tripleResponseWrapper;
+        }
+
+        public void writeTo(OutputStream outputStream) throws IOException {
+            outputStream.write(toByteArray());
+        }
+
         public static final class Builder {
             private String serializeType;
 
@@ -317,6 +398,56 @@ public class TripleCustomerProtocolWrapper {
             return byteBuffer.array();
         }
 
+        public static TripleRequestWrapper parseFrom(InputStream inputStream) 
throws IOException {
+            TripleRequestWrapper tripleRequestWrapper = new 
TripleRequestWrapper();
+            tripleRequestWrapper.args = new ArrayList<>();
+            tripleRequestWrapper.argTypes = new ArrayList<>();
+            int b;
+            while ((b = inputStream.read()) != -1) {
+                int tag = b;
+                if ((b & 0x80) != 0) {
+                    int shift = 7;
+                    tag = b & 0x7F;
+                    while (shift < 35) {
+                        b = inputStream.read();
+                        if (b == -1) {
+                            throw new IOException("Unexpected end of stream 
while reading tag");
+                        }
+                        tag |= (b & 0x7F) << shift;
+                        if ((b & 0x80) == 0) {
+                            break;
+                        }
+                        shift += 7;
+                    }
+                }
+
+                int fieldNum = extractFieldNumFromTag(tag);
+                int wireType = extractWireTypeFromTag(tag);
+                if (wireType != 2) {
+                    throw new RuntimeException(
+                            String.format("unexpected wireType, expect %d 
realType %d", 2, wireType));
+                }
+
+                int length = readRawVarint32(inputStream);
+                byte[] fieldData = readExactly(inputStream, length);
+
+                if (fieldNum == 1) {
+                    tripleRequestWrapper.serializeType = new String(fieldData);
+                } else if (fieldNum == 2) {
+                    tripleRequestWrapper.args.add(fieldData);
+                } else if (fieldNum == 3) {
+                    tripleRequestWrapper.argTypes.add(new String(fieldData));
+                } else {
+                    throw new RuntimeException("fieldNum should in (1,2,3)");
+                }
+            }
+            return tripleRequestWrapper;
+        }
+
+        public void writeTo(OutputStream outputStream) throws IOException {
+            outputStream.write(toByteArray());
+        }
+
         public static final class Builder {
 
             private String serializeType;
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
index e66eebad79..54e6533137 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
@@ -18,7 +18,6 @@ package org.apache.dubbo.rpc.protocol.tri.h12.grpc;
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.config.ConfigurationUtils;
-import org.apache.dubbo.common.io.StreamUtils;
 import org.apache.dubbo.common.utils.ArrayUtils;
 import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
 import org.apache.dubbo.common.utils.UrlUtils;
@@ -32,6 +31,7 @@ import org.apache.dubbo.rpc.model.MethodDescriptor;
 import org.apache.dubbo.rpc.model.PackableMethod;
 import org.apache.dubbo.rpc.model.PackableMethodFactory;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -84,9 +84,10 @@ public class GrpcCompositeCodec implements HttpMessageCodec {
         try {
             int compressed = 0;
             outputStream.write(compressed);
-            byte[] bytes = packableMethod.packResponse(data);
-            writeLength(outputStream, bytes.length);
-            outputStream.write(bytes);
+            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+            packableMethod.packResponse(data, buffer);
+            writeLength(outputStream, buffer.size());
+            buffer.writeTo(outputStream);
         } catch (HttpStatusException e) {
             throw e;
         } catch (Exception e) {
@@ -97,8 +98,7 @@ public class GrpcCompositeCodec implements HttpMessageCodec {
     @Override
     public Object decode(InputStream inputStream, Class<?> targetType, Charset 
charset) throws DecodeException {
         try {
-            byte[] data = StreamUtils.readBytes(inputStream);
-            return packableMethod.parseRequest(data);
+            return packableMethod.parseRequest(inputStream);
         } catch (HttpStatusException e) {
             throw e;
         } catch (Exception e) {

Reply via email to