This is an automated email from the ASF dual-hosted git repository.

earthchen pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 5f39404b07 Support set actual content length to inv/res attributes (#12521)
5f39404b07 is described below

commit 5f39404b076734d93c4437994f56280daaaabcd0
Author: Albumen Kevin <[email protected]>
AuthorDate: Tue Jun 13 21:42:27 2023 +0800

    Support set actual content length to inv/res attributes (#12521)
---
 .../src/main/java/org/apache/dubbo/remoting/Constants.java          | 3 +++
 .../apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java    | 4 ++++
 .../org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java    | 4 ++++
 .../org/apache/dubbo/rpc/protocol/tri/call/AbstractServerCall.java  | 4 ++--
 .../dubbo/rpc/protocol/tri/call/BiStreamServerCallListener.java     | 2 +-
 .../java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java     | 3 ++-
 .../rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java  | 2 +-
 .../java/org/apache/dubbo/rpc/protocol/tri/call/ServerCall.java     | 3 ++-
 .../dubbo/rpc/protocol/tri/call/ServerStreamServerCallListener.java | 2 +-
 .../org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java    | 6 +++---
 .../apache/dubbo/rpc/protocol/tri/call/UnaryClientCallListener.java | 6 +++++-
 .../apache/dubbo/rpc/protocol/tri/call/UnaryServerCallListener.java | 4 +++-
 12 files changed, 31 insertions(+), 12 deletions(-)

diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Constants.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Constants.java
index a5f7fcebef..e3dfee7da7 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Constants.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Constants.java
@@ -163,4 +163,7 @@ public interface Constants {
     String OK_HTTP = "ok-http";
     String URL_CONNECTION = "url-connection";
     String APACHE_HTTP_CLIENT = "apache-http-client";
+
+    String CONTENT_LENGTH_KEY = "content-length";
+
 }
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
index 3f2127c0b5..f23a7a9345 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
@@ -27,6 +27,7 @@ import org.apache.dubbo.common.utils.ReflectUtils;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.remoting.Channel;
 import org.apache.dubbo.remoting.Codec;
+import org.apache.dubbo.remoting.Constants;
 import org.apache.dubbo.remoting.Decodeable;
 import org.apache.dubbo.remoting.RemotingException;
 import org.apache.dubbo.remoting.exchange.Request;
@@ -119,6 +120,9 @@ public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Dec
 
     @Override
     public Object decode(Channel channel, InputStream input) throws IOException {
+        int contentLength = input.available();
+        getAttributes().put(Constants.CONTENT_LENGTH_KEY, contentLength);
+
         ObjectInput in = CodecSupport.getSerialization(serializationType)
             .deserialize(channel.getUrl(), input);
         this.put(SERIALIZATION_ID_KEY, serializationType);
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
index ef3b89bb31..ea6f94ed89 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
@@ -27,6 +27,7 @@ import org.apache.dubbo.common.utils.Assert;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.remoting.Channel;
 import org.apache.dubbo.remoting.Codec;
+import org.apache.dubbo.remoting.Constants;
 import org.apache.dubbo.remoting.Decodeable;
 import org.apache.dubbo.remoting.exchange.Response;
 import org.apache.dubbo.remoting.transport.CodecSupport;
@@ -82,6 +83,9 @@ public class DecodeableRpcResult extends AppResponse implements Codec, Decodeabl
             log.debug("Decoding in thread -- [" + thread.getName() + "#" + thread.getId() + "]");
         }
 
+        int contentLength = input.available();
+        setAttribute(Constants.CONTENT_LENGTH_KEY, contentLength);
+
         // switch TCCL
         if (invocation != null && invocation.getServiceModel() != null) {
             
Thread.currentThread().setContextClassLoader(invocation.getServiceModel().getClassLoader());
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/AbstractServerCall.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/AbstractServerCall.java
index 623e731ec4..8dddc8daf6 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/AbstractServerCall.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/AbstractServerCall.java
@@ -52,8 +52,8 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_CREATE_STREAM_TRIPLE;
 import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_PARSE;
-import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_SERIALIZE_TRIPLE;
 import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_REQUEST;
+import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_SERIALIZE_TRIPLE;
 
 public abstract class AbstractServerCall implements ServerCall, ServerStream.Listener {
 
@@ -212,7 +212,7 @@ public abstract class AbstractServerCall implements ServerCall, ServerStream.Lis
             .getContextClassLoader();
         try {
             Object instance = parseSingleMessage(message);
-            listener.onMessage(instance);
+            listener.onMessage(instance, message.length);
         } catch (Exception e) {
             final TriRpcStatus status = TriRpcStatus.UNKNOWN.withDescription("Server error")
                 .withCause(e);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/BiStreamServerCallListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/BiStreamServerCallListener.java
index 754ef77f3c..2d3346b5c6 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/BiStreamServerCallListener.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/BiStreamServerCallListener.java
@@ -40,7 +40,7 @@ public class BiStreamServerCallListener extends AbstractServerCallListener {
     }
 
     @Override
-    public void onMessage(Object message) {
+    public void onMessage(Object message, int actualContentLength) {
         if (message instanceof Object[]) {
             message = ((Object[]) message)[0];
         }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
index 6f5b43b6b8..d99c8c3bf0 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
@@ -44,8 +44,9 @@ public interface ClientCall {
          * Callback when message received.
          *
          * @param message message received
+         * @param actualContentLength actual content length from body
          */
-        void onMessage(Object message);
+        void onMessage(Object message, int actualContentLength);
 
         /**
          * Callback when call is finished.
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java
index 5d427e05ee..f52bf4e8d1 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java
@@ -38,7 +38,7 @@ public class ObserverToClientCallListenerAdapter implements ClientCall.Listener
     }
 
     @Override
-    public void onMessage(Object message) {
+    public void onMessage(Object message, int actualContentLength) {
         delegate.onNext(message);
         if (call.isAutoRequest()) {
             call.request(1);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ServerCall.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ServerCall.java
index d43eaab973..45742b9a80 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ServerCall.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ServerCall.java
@@ -37,8 +37,9 @@ public interface ServerCall {
          * Callback when a request message is received.
          *
          * @param message message received
+         * @param actualContentLength actual content length from body
          */
-        void onMessage(Object message);
+        void onMessage(Object message, int actualContentLength);
 
         /**
          * @param status when the call is canceled.
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ServerStreamServerCallListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ServerStreamServerCallListener.java
index d518047321..c6b42e50e2 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ServerStreamServerCallListener.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ServerStreamServerCallListener.java
@@ -34,7 +34,7 @@ public class ServerStreamServerCallListener extends AbstractServerCallListener {
     }
 
     @Override
-    public void onMessage(Object message) {
+    public void onMessage(Object message, int actualContentLength) {
         if (message instanceof Object[]) {
             message = ((Object[]) message)[0];
         }
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
index 59aeede63b..4ba712e843 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
@@ -18,7 +18,6 @@
 package org.apache.dubbo.rpc.protocol.tri.call;
 
 import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
-import io.netty.handler.codec.http2.Http2Exception;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.stream.StreamObserver;
 import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
@@ -34,14 +33,15 @@ import org.apache.dubbo.rpc.protocol.tri.stream.TripleClientStream;
 import org.apache.dubbo.rpc.protocol.tri.transport.TripleWriteQueue;
 
 import io.netty.channel.Channel;
+import io.netty.handler.codec.http2.Http2Exception;
 
 import java.util.Map;
 import java.util.concurrent.Executor;
 
+import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
 import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_RESPONSE;
 import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_SERIALIZE_TRIPLE;
 import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_STREAM_LISTENER;
-import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
 
 public class TripleClientCall implements ClientCall, ClientStream.Listener {
     private static final ErrorTypeAwareLogger LOGGER = LoggerFactory.getErrorTypeAwareLogger(TripleClientCall.class);
@@ -78,7 +78,7 @@ public class TripleClientCall implements ClientCall, ClientStream.Listener {
         }
         try {
             final Object unpacked = requestMetadata.packableMethod.parseResponse(message, isReturnTriException);
-            listener.onMessage(unpacked);
+            listener.onMessage(unpacked, message.length);
         } catch (Throwable t) {
             TriRpcStatus status = TriRpcStatus.INTERNAL.withDescription("Deserialize response failed")
                 .withCause(t);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryClientCallListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryClientCallListener.java
index 57dbb625f8..b08ee0618f 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryClientCallListener.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryClientCallListener.java
@@ -17,6 +17,7 @@
 
 package org.apache.dubbo.rpc.protocol.tri.call;
 
+import org.apache.dubbo.remoting.Constants;
 import org.apache.dubbo.rpc.AppResponse;
 import org.apache.dubbo.rpc.TriRpcStatus;
 import org.apache.dubbo.rpc.protocol.tri.DeadlineFuture;
@@ -27,14 +28,16 @@ public class UnaryClientCallListener implements ClientCall.Listener {
 
     private final DeadlineFuture future;
     private Object appResponse;
+    private int actualContentLength;
 
     public UnaryClientCallListener(DeadlineFuture deadlineFuture) {
         this.future = deadlineFuture;
     }
 
     @Override
-    public void onMessage(Object message) {
+    public void onMessage(Object message, int actualContentLength) {
         this.appResponse = message;
+        this.actualContentLength = actualContentLength;
     }
 
     @Override
@@ -50,6 +53,7 @@ public class UnaryClientCallListener implements ClientCall.Listener {
          } else {
             result.setException(status.asException());
         }
+        result.setAttribute(Constants.CONTENT_LENGTH_KEY, actualContentLength);
         future.received(status, result);
     }
 
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryServerCallListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryServerCallListener.java
index 36011c5b3f..e53b72079a 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryServerCallListener.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryServerCallListener.java
@@ -17,6 +17,7 @@
 
 package org.apache.dubbo.rpc.protocol.tri.call;
 
+import org.apache.dubbo.remoting.Constants;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.RpcInvocation;
@@ -40,12 +41,13 @@ public class UnaryServerCallListener extends AbstractServerCallListener {
     }
 
     @Override
-    public void onMessage(Object message) {
+    public void onMessage(Object message, int actualContentLength) {
         if (message instanceof Object[]) {
             invocation.setArguments((Object[]) message);
         } else {
             invocation.setArguments(new Object[]{message});
         }
+        invocation.put(Constants.CONTENT_LENGTH_KEY, actualContentLength);
     }
 
     @Override

Reply via email to