This is an automated email from the ASF dual-hosted git repository.
earthchen pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 678d2a7831 Refactor tri pack & unpack use stream (#16045)
678d2a7831 is described below
commit 678d2a7831a5e0bb5c5b66af8f35b9023ecc353b
Author: earthchen <[email protected]>
AuthorDate: Thu Feb 5 14:02:08 2026 +0800
Refactor tri pack & unpack use stream (#16045)
* refactor: enhance backpressure handling with byte tracking in HTTP/2
streams
* Update
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
Co-authored-by: Copilot <[email protected]>
* Update
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java
Co-authored-by: Copilot <[email protected]>
* fix
* refactor: expose methods for byte tracking in AbstractTripleClientStream
and Http2ServerChannelObserver
* refactor: enhance error handling and byte tracking in HTTP/2 stream
observers
* refactor: improve closing logic and state management in
LengthFieldStreamingDecoder
* fix: prevent race condition in stream initialization
* refactor: unify client and server deframer for Triple protocol
This commit unifies the deframer implementation between client and server
sides for the Triple protocol, eliminating code duplication and improving
maintainability.
## Key Changes
### Deleted Files
- TriDecoder.java: Removed client-specific deframer implementation
- Deframer.java: Removed client deframer interface
- RecordListener.java: Removed test helper
- TriDecoderTest.java: Removed corresponding test
### Core Modifications
1. **LengthFieldStreamingDecoder**: Added BoundedInputStream to support:
- Message boundary isolation (prevents reading beyond current message)
- mark/reset support required by Hessian2 and other deserializers
- Zero-copy optimization by reading directly from accumulate stream
2. **StreamingDecoder.FragmentListener**: Added messageLength parameter to
onFragmentMessage() method for better flow control
3. **AbstractTripleClientStream**: Migrated from TriDecoder to
GrpcStreamingDecoder,
using ByteBufInputStream to adapt Netty's ByteBuf to InputStream
4. **Stream.Listener.onMessage**: Changed parameter from byte[] to
InputStream
for unified handling and memory optimization
5. **PackableMethod**: Added default parseResponse(InputStream) method for
backward compatibility
6. **CompositeInputStream**: Improved stream release logic to prevent
exceptions
7. **DescriptorUtils**: Added mark() call before reading stream to support
reset
## Architecture Change
Before:
- Client: ByteBuf -> TriDecoder (Netty-specific) -> byte[] -> deserializer
- Server: InputStream -> LengthFieldStreamingDecoder -> byte[] ->
deserializer
After:
- Client: ByteBuf -> ByteBufInputStream -> GrpcStreamingDecoder ->
BoundedInputStream -> deserializer
- Server: InputStream -> GrpcStreamingDecoder -> BoundedInputStream ->
deserializer
## Bug Fix
Fixed BoundedInputStream.reset() not restoring the 'remaining' counter,
which caused streams to return EOF after mark/reset. This was the root cause
of POJO method invocation failures with "Unexpected serialization
type:null" error.
* refactor: update tri serialization methods to use InputStream and
OutputStream
* refactor: update tri serialization methods to use InputStream and
OutputStream
* Update
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
Co-authored-by: Copilot <[email protected]>
* refactor: enhance error handling in deframer by using try-with-resources
for InputStream
* refactor: unify client and server deframer for Triple protocol
This commit unifies the deframer implementation between client and server
sides for the Triple protocol, eliminating code duplication and improving
maintainability.
## Key Changes
### Deleted Files
- TriDecoder.java: Removed client-specific deframer implementation
- Deframer.java: Removed client deframer interface
- RecordListener.java: Removed test helper
- TriDecoderTest.java: Removed corresponding test
### Core Modifications
1. **LengthFieldStreamingDecoder**: Added BoundedInputStream to support:
- Message boundary isolation (prevents reading beyond current message)
- mark/reset support required by Hessian2 and other deserializers
- Zero-copy optimization by reading directly from accumulate stream
2. **StreamingDecoder.FragmentListener**: Added messageLength parameter to
onFragmentMessage() method for better flow control
3. **AbstractTripleClientStream**: Migrated from TriDecoder to
GrpcStreamingDecoder,
using ByteBufInputStream to adapt Netty's ByteBuf to InputStream
4. **Stream.Listener.onMessage**: Changed parameter from byte[] to
InputStream
for unified handling and memory optimization
5. **PackableMethod**: Added default parseResponse(InputStream) method for
backward compatibility
6. **CompositeInputStream**: Improved stream release logic to prevent
exceptions
7. **DescriptorUtils**: Added mark() call before reading stream to support
reset
## Architecture Change
Before:
- Client: ByteBuf -> TriDecoder (Netty-specific) -> byte[] -> deserializer
- Server: InputStream -> LengthFieldStreamingDecoder -> byte[] ->
deserializer
After:
- Client: ByteBuf -> ByteBufInputStream -> GrpcStreamingDecoder ->
BoundedInputStream -> deserializer
- Server: InputStream -> GrpcStreamingDecoder -> BoundedInputStream ->
deserializer
## Bug Fix
Fixed BoundedInputStream.reset() not restoring the 'remaining' counter,
which caused streams to return EOF after mark/reset. This was the root cause
of POJO method invocation failures with "Unexpected serialization
type:null" error.
* refactor: enhance error handling in deframer by using try-with-resources
for InputStream
* refactor: update tri serialization methods to use InputStream and
OutputStream
* fix
* refactor: update tri serialization methods to use InputStream and
OutputStream
* fix
---------
Co-authored-by: Copilot <[email protected]>
Co-authored-by: zrlw <[email protected]>
---
.../main/java/org/apache/dubbo/rpc/model/Pack.java | 11 +-
.../org/apache/dubbo/rpc/model/PackableMethod.java | 63 ++++++----
.../java/org/apache/dubbo/rpc/model/UnPack.java | 18 ++-
.../org/apache/dubbo/rpc/model/WrapperUnPack.java | 23 ++++
.../dubbo/rpc/protocol/tri/DescriptorUtils.java | 4 +-
.../dubbo/rpc/protocol/tri/PbArrayPacker.java | 10 ++
.../apache/dubbo/rpc/protocol/tri/PbUnpack.java | 6 +
.../rpc/protocol/tri/ReflectionPackableMethod.java | 53 ++++++++-
.../tri/TripleCustomerProtocolWrapper.java | 131 +++++++++++++++++++++
.../protocol/tri/h12/grpc/GrpcCompositeCodec.java | 12 +-
10 files changed, 294 insertions(+), 37 deletions(-)
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/Pack.java
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/Pack.java
index 340b668e42..f0d4d76c29 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/Pack.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/Pack.java
@@ -16,12 +16,17 @@
*/
package org.apache.dubbo.rpc.model;
+import java.io.OutputStream;
+
public interface Pack {
/**
- * @param obj instance
- * @return byte array
- * @throws Exception when error occurs
+ * @deprecated use {@link #pack(Object, OutputStream)} instead
*/
+ @Deprecated
byte[] pack(Object obj) throws Exception;
+
+ default void pack(Object obj, OutputStream out) throws Exception {
+ out.write(pack(obj));
+ }
}
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/PackableMethod.java
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/PackableMethod.java
index 2065726c22..c043833b90 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/PackableMethod.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/PackableMethod.java
@@ -16,8 +16,8 @@
*/
package org.apache.dubbo.rpc.model;
-import java.io.ByteArrayOutputStream;
import java.io.InputStream;
+import java.io.OutputStream;
/**
* A packable method is used to customize serialization for methods. It can
provide a common wrapper
@@ -25,14 +25,26 @@ import java.io.InputStream;
*/
public interface PackableMethod {
+ /**
+ * @deprecated use {@link #parseRequest(InputStream)} instead
+ */
+ @Deprecated
default Object parseRequest(byte[] data) throws Exception {
return getRequestUnpack().unpack(data);
}
+ /**
+ * @deprecated use {@link #parseResponse(InputStream)} instead
+ */
+ @Deprecated
default Object parseResponse(byte[] data) throws Exception {
return parseResponse(data, false);
}
+ /**
+ * @deprecated use {@link #parseResponse(InputStream, boolean)} instead
+ */
+ @Deprecated
default Object parseResponse(byte[] data, boolean isReturnTriException)
throws Exception {
UnPack unPack = getResponseUnpack();
if (unPack instanceof WrapperUnPack) {
@@ -42,34 +54,45 @@ public interface PackableMethod {
}
/**
- * Parse response from InputStream.
- * Default implementation reads all bytes and delegates to byte[] version.
- *
- * @param inputStream the input stream containing the response data
- * @param isReturnTriException whether the response is a Triple exception
- * @return the parsed response object
- * @throws Exception if parsing fails
+ * @deprecated use {@link #packRequest(Object, OutputStream)} instead
*/
- default Object parseResponse(InputStream inputStream, boolean
isReturnTriException) throws Exception {
- // Read all bytes from InputStream
- ByteArrayOutputStream buffer = new ByteArrayOutputStream();
- byte[] tmp = new byte[4096];
- int len;
- while ((len = inputStream.read(tmp)) != -1) {
- buffer.write(tmp, 0, len);
- }
- byte[] data = buffer.toByteArray();
- return parseResponse(data, isReturnTriException);
- }
-
+ @Deprecated
default byte[] packRequest(Object request) throws Exception {
return getRequestPack().pack(request);
}
+ /**
+ * @deprecated use {@link #packResponse(Object, OutputStream)} instead
+ */
+ @Deprecated
default byte[] packResponse(Object response) throws Exception {
return getResponsePack().pack(response);
}
+ default Object parseRequest(InputStream inputStream) throws Exception {
+ return getRequestUnpack().unpack(inputStream);
+ }
+
+ default Object parseResponse(InputStream inputStream) throws Exception {
+ return parseResponse(inputStream, false);
+ }
+
+ default Object parseResponse(InputStream inputStream, boolean
isReturnTriException) throws Exception {
+ UnPack unPack = getResponseUnpack();
+ if (unPack instanceof WrapperUnPack) {
+ return ((WrapperUnPack) unPack).unpack(inputStream,
isReturnTriException);
+ }
+ return unPack.unpack(inputStream);
+ }
+
+ default void packRequest(Object request, OutputStream outputStream) throws
Exception {
+ getRequestPack().pack(request, outputStream);
+ }
+
+ default void packResponse(Object response, OutputStream outputStream)
throws Exception {
+ getResponsePack().pack(response, outputStream);
+ }
+
default boolean needWrapper() {
return false;
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/UnPack.java
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/UnPack.java
index d3c6b8ac6f..698c4dd5ae 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/UnPack.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/UnPack.java
@@ -16,12 +16,24 @@
*/
package org.apache.dubbo.rpc.model;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+
public interface UnPack {
/**
- * @param data byte array
- * @return object instance
- * @throws Exception exception
+ * @deprecated use {@link #unpack(InputStream)} instead
*/
+ @Deprecated
Object unpack(byte[] data) throws Exception;
+
+ default Object unpack(InputStream inputStream) throws Exception {
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ byte[] tmp = new byte[4096];
+ int len;
+ while ((len = inputStream.read(tmp)) != -1) {
+ buffer.write(tmp, 0, len);
+ }
+ return unpack(buffer.toByteArray());
+ }
}
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/WrapperUnPack.java
b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/WrapperUnPack.java
index 1fd216b807..63d68b0ea8 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/WrapperUnPack.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/WrapperUnPack.java
@@ -16,11 +16,34 @@
*/
package org.apache.dubbo.rpc.model;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+
public interface WrapperUnPack extends UnPack {
+ @Override
default Object unpack(byte[] data) throws Exception {
return unpack(data, false);
}
+ /**
+ * @deprecated use {@link #unpack(InputStream, boolean)} instead
+ */
+ @Deprecated
Object unpack(byte[] data, boolean isReturnTriException) throws Exception;
+
+ @Override
+ default Object unpack(InputStream inputStream) throws Exception {
+ return unpack(inputStream, false);
+ }
+
+ default Object unpack(InputStream inputStream, boolean
isReturnTriException) throws Exception {
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ byte[] tmp = new byte[4096];
+ int len;
+ while ((len = inputStream.read(tmp)) != -1) {
+ buffer.write(tmp, 0, len);
+ }
+ return unpack(buffer.toByteArray(), isReturnTriException);
+ }
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DescriptorUtils.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DescriptorUtils.java
index dbc2ac0b60..e8f34431e2 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DescriptorUtils.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DescriptorUtils.java
@@ -18,7 +18,6 @@ package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.io.StreamUtils;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.remoting.http12.exception.UnimplementedException;
import org.apache.dubbo.rpc.Invoker;
@@ -126,9 +125,8 @@ public final class DescriptorUtils {
MethodDescriptor methodDescriptor =
findReflectionMethodDescriptor(serviceDescriptor, methodName);
if (methodDescriptor == null) {
rawMessage.mark(Integer.MAX_VALUE);
- byte[] data = StreamUtils.readBytes(rawMessage);
List<MethodDescriptor> methodDescriptors =
serviceDescriptor.getMethods(methodName);
- TripleRequestWrapper request =
TripleRequestWrapper.parseFrom(data);
+ TripleRequestWrapper request =
TripleRequestWrapper.parseFrom(rawMessage);
String[] paramTypes = request.getArgTypes().toArray(new String[0]);
// wrapper mode the method can overload so maybe list
for (MethodDescriptor descriptor : methodDescriptors) {
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PbArrayPacker.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PbArrayPacker.java
index b91d4b940f..c13bcd1bda 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PbArrayPacker.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PbArrayPacker.java
@@ -18,6 +18,8 @@ package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.rpc.model.Pack;
+import java.io.OutputStream;
+
import com.google.protobuf.Message;
public class PbArrayPacker implements Pack {
@@ -37,4 +39,12 @@ public class PbArrayPacker implements Pack {
}
return PB_PACK.pack(obj);
}
+
+ @Override
+ public void pack(Object obj, OutputStream out) throws Exception {
+ if (!singleArgument) {
+ obj = ((Object[]) obj)[0];
+ }
+ ((Message) obj).writeTo(out);
+ }
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PbUnpack.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PbUnpack.java
index 81d12d2420..4aa333790e 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PbUnpack.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/PbUnpack.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.rpc.model.UnPack;
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStream;
public class PbUnpack<T> implements UnPack {
@@ -34,4 +35,9 @@ public class PbUnpack<T> implements UnPack {
final ByteArrayInputStream bais = new ByteArrayInputStream(data);
return SingleProtobufUtils.deserialize(bais, clz);
}
+
+ @Override
+ public Object unpack(InputStream inputStream) throws IOException {
+ return SingleProtobufUtils.deserialize(inputStream, clz);
+ }
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java
index ae19f2a464..b60a8156c7 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java
@@ -32,6 +32,8 @@ import org.apache.dubbo.rpc.model.WrapperUnPack;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collection;
@@ -325,6 +327,18 @@ public class ReflectionPackableMethod implements
PackableMethod {
.build()
.toByteArray();
}
+
+ @Override
+ public void pack(Object obj, OutputStream out) throws IOException {
+ ByteArrayOutputStream dataBos = new ByteArrayOutputStream();
+ multipleSerialization.serialize(url, requestSerialize,
actualResponseType, obj, dataBos);
+
TripleCustomerProtocolWrapper.TripleResponseWrapper.Builder.newBuilder()
+ .setSerializeType(requestSerialize)
+ .setType(actualResponseType.getName())
+ .setData(dataBos.toByteArray())
+ .build()
+ .writeTo(out);
+ }
}
private static class WrapResponseUnpack implements WrapperUnPack {
@@ -348,9 +362,23 @@ public class ReflectionPackableMethod implements
PackableMethod {
return unpack(data, false);
}
+ @Override
public Object unpack(byte[] data, boolean isReturnTriException) throws
IOException, ClassNotFoundException {
TripleCustomerProtocolWrapper.TripleResponseWrapper wrapper =
TripleCustomerProtocolWrapper.TripleResponseWrapper.parseFrom(data);
+ return deserializeFromWrapper(wrapper, isReturnTriException);
+ }
+
+ @Override
+ public Object unpack(InputStream inputStream, boolean
isReturnTriException) throws Exception {
+ TripleCustomerProtocolWrapper.TripleResponseWrapper wrapper =
+
TripleCustomerProtocolWrapper.TripleResponseWrapper.parseFrom(inputStream);
+ return deserializeFromWrapper(wrapper, isReturnTriException);
+ }
+
+ private Object deserializeFromWrapper(
+ TripleCustomerProtocolWrapper.TripleResponseWrapper wrapper,
boolean isReturnTriException)
+ throws IOException, ClassNotFoundException {
final String serializeType =
convertHessianFromWrapper(wrapper.getSerializeType());
CodecSupport.checkSerialization(serializeType, allSerialize);
@@ -389,6 +417,15 @@ public class ReflectionPackableMethod implements
PackableMethod {
@Override
public byte[] pack(Object obj) throws IOException {
+ return buildWrapper(obj).toByteArray();
+ }
+
+ @Override
+ public void pack(Object obj, OutputStream out) throws IOException {
+ buildWrapper(obj).writeTo(out);
+ }
+
+ private TripleCustomerProtocolWrapper.TripleRequestWrapper
buildWrapper(Object obj) throws IOException {
Object[] arguments;
if (singleArgument) {
arguments = new Object[] {obj};
@@ -402,7 +439,7 @@ public class ReflectionPackableMethod implements
PackableMethod {
builder.addArgTypes(type);
}
if (actualRequestTypes == null || actualRequestTypes.length == 0) {
- return builder.build().toByteArray();
+ return builder.build();
}
ByteArrayOutputStream bos = new ByteArrayOutputStream();
for (int i = 0; i < arguments.length; i++) {
@@ -411,7 +448,7 @@ public class ReflectionPackableMethod implements
PackableMethod {
builder.addArgs(bos.toByteArray());
bos.reset();
}
- return builder.build().toByteArray();
+ return builder.build();
}
/**
@@ -449,10 +486,22 @@ public class ReflectionPackableMethod implements
PackableMethod {
this.allSerialize = allSerialize;
}
+ @Override
public Object unpack(byte[] data, boolean isReturnTriException) throws
IOException, ClassNotFoundException {
TripleCustomerProtocolWrapper.TripleRequestWrapper wrapper =
TripleCustomerProtocolWrapper.TripleRequestWrapper.parseFrom(data);
+ return deserializeFromWrapper(wrapper);
+ }
+
+ @Override
+ public Object unpack(InputStream inputStream, boolean
isReturnTriException) throws Exception {
+ TripleCustomerProtocolWrapper.TripleRequestWrapper wrapper =
+
TripleCustomerProtocolWrapper.TripleRequestWrapper.parseFrom(inputStream);
+ return deserializeFromWrapper(wrapper);
+ }
+ private Object[]
deserializeFromWrapper(TripleCustomerProtocolWrapper.TripleRequestWrapper
wrapper)
+ throws IOException, ClassNotFoundException {
String wrapperSerializeType =
convertHessianFromWrapper(wrapper.getSerializeType());
CodecSupport.checkSerialization(wrapperSerializeType,
allSerialize);
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleCustomerProtocolWrapper.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleCustomerProtocolWrapper.java
index 7b57a71d25..e9abb8dd73 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleCustomerProtocolWrapper.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleCustomerProtocolWrapper.java
@@ -19,6 +19,9 @@ package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.utils.Assert;
import org.apache.dubbo.common.utils.CollectionUtils;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -71,6 +74,36 @@ public class TripleCustomerProtocolWrapper {
return val;
}
+ public static int readRawVarint32(InputStream inputStream) throws
IOException {
+ int result = 0;
+ int shift = 0;
+ while (shift < 35) {
+ int b = inputStream.read();
+ if (b == -1) {
+ throw new IOException("Unexpected end of stream while reading
varint");
+ }
+ result |= (b & 0x7F) << shift;
+ if ((b & 0x80) == 0) {
+ return result;
+ }
+ shift += 7;
+ }
+ throw new IOException("Malformed varint");
+ }
+
+ private static byte[] readExactly(InputStream inputStream, int n) throws
IOException {
+ byte[] data = new byte[n];
+ int offset = 0;
+ while (offset < n) {
+ int read = inputStream.read(data, offset, n - offset);
+ if (read == -1) {
+ throw new IOException("Unexpected end of stream, expected " +
n + " bytes but got " + offset);
+ }
+ offset += read;
+ }
+ return data;
+ }
+
public static int extractFieldNumFromTag(int tag) {
return tag >> 3;
}
@@ -168,6 +201,54 @@ public class TripleCustomerProtocolWrapper {
return byteBuffer.array();
}
+ public static TripleResponseWrapper parseFrom(InputStream inputStream)
throws IOException {
+ TripleResponseWrapper tripleResponseWrapper = new
TripleResponseWrapper();
+ int b;
+ while ((b = inputStream.read()) != -1) {
+ int tag = b;
+ if ((b & 0x80) != 0) {
+ int shift = 7;
+ tag = b & 0x7F;
+ while (shift < 35) {
+ b = inputStream.read();
+ if (b == -1) {
+ throw new IOException("Unexpected end of stream
while reading tag");
+ }
+ tag |= (b & 0x7F) << shift;
+ if ((b & 0x80) == 0) {
+ break;
+ }
+ shift += 7;
+ }
+ }
+
+ int fieldNum = extractFieldNumFromTag(tag);
+ int wireType = extractWireTypeFromTag(tag);
+ if (wireType != 2) {
+ throw new RuntimeException(
+ String.format("unexpected wireType, expect %d
realType %d", 2, wireType));
+ }
+
+ int length = readRawVarint32(inputStream);
+ byte[] fieldData = readExactly(inputStream, length);
+
+ if (fieldNum == 1) {
+ tripleResponseWrapper.serializeType = new
String(fieldData);
+ } else if (fieldNum == 2) {
+ tripleResponseWrapper.data = fieldData;
+ } else if (fieldNum == 3) {
+ tripleResponseWrapper.type = new String(fieldData);
+ } else {
+ throw new RuntimeException("fieldNum should in (1,2,3)");
+ }
+ }
+ return tripleResponseWrapper;
+ }
+
+ public void writeTo(OutputStream outputStream) throws IOException {
+ outputStream.write(toByteArray());
+ }
+
public static final class Builder {
private String serializeType;
@@ -317,6 +398,56 @@ public class TripleCustomerProtocolWrapper {
return byteBuffer.array();
}
+ public static TripleRequestWrapper parseFrom(InputStream inputStream)
throws IOException {
+ TripleRequestWrapper tripleRequestWrapper = new
TripleRequestWrapper();
+ tripleRequestWrapper.args = new ArrayList<>();
+ tripleRequestWrapper.argTypes = new ArrayList<>();
+ int b;
+ while ((b = inputStream.read()) != -1) {
+ int tag = b;
+ if ((b & 0x80) != 0) {
+ int shift = 7;
+ tag = b & 0x7F;
+ while (shift < 35) {
+ b = inputStream.read();
+ if (b == -1) {
+ throw new IOException("Unexpected end of stream
while reading tag");
+ }
+ tag |= (b & 0x7F) << shift;
+ if ((b & 0x80) == 0) {
+ break;
+ }
+ shift += 7;
+ }
+ }
+
+ int fieldNum = extractFieldNumFromTag(tag);
+ int wireType = extractWireTypeFromTag(tag);
+ if (wireType != 2) {
+ throw new RuntimeException(
+ String.format("unexpected wireType, expect %d
realType %d", 2, wireType));
+ }
+
+ int length = readRawVarint32(inputStream);
+ byte[] fieldData = readExactly(inputStream, length);
+
+ if (fieldNum == 1) {
+ tripleRequestWrapper.serializeType = new String(fieldData);
+ } else if (fieldNum == 2) {
+ tripleRequestWrapper.args.add(fieldData);
+ } else if (fieldNum == 3) {
+ tripleRequestWrapper.argTypes.add(new String(fieldData));
+ } else {
+ throw new RuntimeException("fieldNum should in (1,2,3)");
+ }
+ }
+ return tripleRequestWrapper;
+ }
+
+ public void writeTo(OutputStream outputStream) throws IOException {
+ outputStream.write(toByteArray());
+ }
+
public static final class Builder {
private String serializeType;
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
index e66eebad79..54e6533137 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/grpc/GrpcCompositeCodec.java
@@ -18,7 +18,6 @@ package org.apache.dubbo.rpc.protocol.tri.h12.grpc;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.ConfigurationUtils;
-import org.apache.dubbo.common.io.StreamUtils;
import org.apache.dubbo.common.utils.ArrayUtils;
import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.UrlUtils;
@@ -32,6 +31,7 @@ import org.apache.dubbo.rpc.model.MethodDescriptor;
import org.apache.dubbo.rpc.model.PackableMethod;
import org.apache.dubbo.rpc.model.PackableMethodFactory;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -84,9 +84,10 @@ public class GrpcCompositeCodec implements HttpMessageCodec {
try {
int compressed = 0;
outputStream.write(compressed);
- byte[] bytes = packableMethod.packResponse(data);
- writeLength(outputStream, bytes.length);
- outputStream.write(bytes);
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ packableMethod.packResponse(data, buffer);
+ writeLength(outputStream, buffer.size());
+ buffer.writeTo(outputStream);
} catch (HttpStatusException e) {
throw e;
} catch (Exception e) {
@@ -97,8 +98,7 @@ public class GrpcCompositeCodec implements HttpMessageCodec {
@Override
public Object decode(InputStream inputStream, Class<?> targetType, Charset
charset) throws DecodeException {
try {
- byte[] data = StreamUtils.readBytes(inputStream);
- return packableMethod.parseRequest(data);
+ return packableMethod.parseRequest(inputStream);
} catch (HttpStatusException e) {
throw e;
} catch (Exception e) {