This is an automated email from the ASF dual-hosted git repository.
earthchen pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 5f39404b07 Support set actual content length to inv/res attributes
(#12521)
5f39404b07 is described below
commit 5f39404b076734d93c4437994f56280daaaabcd0
Author: Albumen Kevin <[email protected]>
AuthorDate: Tue Jun 13 21:42:27 2023 +0800
Support set actual content length to inv/res attributes (#12521)
---
.../src/main/java/org/apache/dubbo/remoting/Constants.java | 3 +++
.../apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java | 4 ++++
.../org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java | 4 ++++
.../org/apache/dubbo/rpc/protocol/tri/call/AbstractServerCall.java | 4 ++--
.../dubbo/rpc/protocol/tri/call/BiStreamServerCallListener.java | 2 +-
.../java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java | 3 ++-
.../rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java | 2 +-
.../java/org/apache/dubbo/rpc/protocol/tri/call/ServerCall.java | 3 ++-
.../dubbo/rpc/protocol/tri/call/ServerStreamServerCallListener.java | 2 +-
.../org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java | 6 +++---
.../apache/dubbo/rpc/protocol/tri/call/UnaryClientCallListener.java | 6 +++++-
.../apache/dubbo/rpc/protocol/tri/call/UnaryServerCallListener.java | 4 +++-
12 files changed, 31 insertions(+), 12 deletions(-)
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Constants.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Constants.java
index a5f7fcebef..e3dfee7da7 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Constants.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Constants.java
@@ -163,4 +163,7 @@ public interface Constants {
String OK_HTTP = "ok-http";
String URL_CONNECTION = "url-connection";
String APACHE_HTTP_CLIENT = "apache-http-client";
+
+ String CONTENT_LENGTH_KEY = "content-length";
+
}
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
index 3f2127c0b5..f23a7a9345 100644
---
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
@@ -27,6 +27,7 @@ import org.apache.dubbo.common.utils.ReflectUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.Codec;
+import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.Decodeable;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.Request;
@@ -119,6 +120,9 @@ public class DecodeableRpcInvocation extends RpcInvocation
implements Codec, Dec
@Override
public Object decode(Channel channel, InputStream input) throws
IOException {
+ int contentLength = input.available();
+ getAttributes().put(Constants.CONTENT_LENGTH_KEY, contentLength);
+
ObjectInput in = CodecSupport.getSerialization(serializationType)
.deserialize(channel.getUrl(), input);
this.put(SERIALIZATION_ID_KEY, serializationType);
diff --git
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
index ef3b89bb31..ea6f94ed89 100644
---
a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
+++
b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java
@@ -27,6 +27,7 @@ import org.apache.dubbo.common.utils.Assert;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.Codec;
+import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.Decodeable;
import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.transport.CodecSupport;
@@ -82,6 +83,9 @@ public class DecodeableRpcResult extends AppResponse
implements Codec, Decodeabl
log.debug("Decoding in thread -- [" + thread.getName() + "#" +
thread.getId() + "]");
}
+ int contentLength = input.available();
+ setAttribute(Constants.CONTENT_LENGTH_KEY, contentLength);
+
// switch TCCL
if (invocation != null && invocation.getServiceModel() != null) {
Thread.currentThread().setContextClassLoader(invocation.getServiceModel().getClassLoader());
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/AbstractServerCall.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/AbstractServerCall.java
index 623e731ec4..8dddc8daf6 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/AbstractServerCall.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/AbstractServerCall.java
@@ -52,8 +52,8 @@ import java.util.concurrent.TimeUnit;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_CREATE_STREAM_TRIPLE;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_PARSE;
-import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_SERIALIZE_TRIPLE;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_REQUEST;
+import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_SERIALIZE_TRIPLE;
public abstract class AbstractServerCall implements ServerCall,
ServerStream.Listener {
@@ -212,7 +212,7 @@ public abstract class AbstractServerCall implements
ServerCall, ServerStream.Lis
.getContextClassLoader();
try {
Object instance = parseSingleMessage(message);
- listener.onMessage(instance);
+ listener.onMessage(instance, message.length);
} catch (Exception e) {
final TriRpcStatus status =
TriRpcStatus.UNKNOWN.withDescription("Server error")
.withCause(e);
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/BiStreamServerCallListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/BiStreamServerCallListener.java
index 754ef77f3c..2d3346b5c6 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/BiStreamServerCallListener.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/BiStreamServerCallListener.java
@@ -40,7 +40,7 @@ public class BiStreamServerCallListener extends
AbstractServerCallListener {
}
@Override
- public void onMessage(Object message) {
+ public void onMessage(Object message, int actualContentLength) {
if (message instanceof Object[]) {
message = ((Object[]) message)[0];
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
index 6f5b43b6b8..d99c8c3bf0 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
@@ -44,8 +44,9 @@ public interface ClientCall {
* Callback when message received.
*
* @param message message received
+ * @param actualContentLength actual content length from body
*/
- void onMessage(Object message);
+ void onMessage(Object message, int actualContentLength);
/**
* Callback when call is finished.
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java
index 5d427e05ee..f52bf4e8d1 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java
@@ -38,7 +38,7 @@ public class ObserverToClientCallListenerAdapter implements
ClientCall.Listener
}
@Override
- public void onMessage(Object message) {
+ public void onMessage(Object message, int actualContentLength) {
delegate.onNext(message);
if (call.isAutoRequest()) {
call.request(1);
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ServerCall.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ServerCall.java
index d43eaab973..45742b9a80 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ServerCall.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ServerCall.java
@@ -37,8 +37,9 @@ public interface ServerCall {
* Callback when a request message is received.
*
* @param message message received
+ * @param actualContentLength actual content length from body
*/
- void onMessage(Object message);
+ void onMessage(Object message, int actualContentLength);
/**
* @param status when the call is canceled.
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ServerStreamServerCallListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ServerStreamServerCallListener.java
index d518047321..c6b42e50e2 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ServerStreamServerCallListener.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ServerStreamServerCallListener.java
@@ -34,7 +34,7 @@ public class ServerStreamServerCallListener extends
AbstractServerCallListener {
}
@Override
- public void onMessage(Object message) {
+ public void onMessage(Object message, int actualContentLength) {
if (message instanceof Object[]) {
message = ((Object[]) message)[0];
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
index 59aeede63b..4ba712e843 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
@@ -18,7 +18,6 @@
package org.apache.dubbo.rpc.protocol.tri.call;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
-import io.netty.handler.codec.http2.Http2Exception;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
@@ -34,14 +33,15 @@ import
org.apache.dubbo.rpc.protocol.tri.stream.TripleClientStream;
import org.apache.dubbo.rpc.protocol.tri.transport.TripleWriteQueue;
import io.netty.channel.Channel;
+import io.netty.handler.codec.http2.Http2Exception;
import java.util.Map;
import java.util.concurrent.Executor;
+import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_RESPONSE;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_SERIALIZE_TRIPLE;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_STREAM_LISTENER;
-import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
public class TripleClientCall implements ClientCall, ClientStream.Listener {
private static final ErrorTypeAwareLogger LOGGER =
LoggerFactory.getErrorTypeAwareLogger(TripleClientCall.class);
@@ -78,7 +78,7 @@ public class TripleClientCall implements ClientCall,
ClientStream.Listener {
}
try {
final Object unpacked =
requestMetadata.packableMethod.parseResponse(message, isReturnTriException);
- listener.onMessage(unpacked);
+ listener.onMessage(unpacked, message.length);
} catch (Throwable t) {
TriRpcStatus status =
TriRpcStatus.INTERNAL.withDescription("Deserialize response failed")
.withCause(t);
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryClientCallListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryClientCallListener.java
index 57dbb625f8..b08ee0618f 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryClientCallListener.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryClientCallListener.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.rpc.protocol.tri.call;
+import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.protocol.tri.DeadlineFuture;
@@ -27,14 +28,16 @@ public class UnaryClientCallListener implements
ClientCall.Listener {
private final DeadlineFuture future;
private Object appResponse;
+ private int actualContentLength;
public UnaryClientCallListener(DeadlineFuture deadlineFuture) {
this.future = deadlineFuture;
}
@Override
- public void onMessage(Object message) {
+ public void onMessage(Object message, int actualContentLength) {
this.appResponse = message;
+ this.actualContentLength = actualContentLength;
}
@Override
@@ -50,6 +53,7 @@ public class UnaryClientCallListener implements
ClientCall.Listener {
} else {
result.setException(status.asException());
}
+ result.setAttribute(Constants.CONTENT_LENGTH_KEY, actualContentLength);
future.received(status, result);
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryServerCallListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryServerCallListener.java
index 36011c5b3f..e53b72079a 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryServerCallListener.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryServerCallListener.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.rpc.protocol.tri.call;
+import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
@@ -40,12 +41,13 @@ public class UnaryServerCallListener extends
AbstractServerCallListener {
}
@Override
- public void onMessage(Object message) {
+ public void onMessage(Object message, int actualContentLength) {
if (message instanceof Object[]) {
invocation.setArguments((Object[]) message);
} else {
invocation.setArguments(new Object[]{message});
}
+ invocation.put(Constants.CONTENT_LENGTH_KEY, actualContentLength);
}
@Override