This is an automated email from the ASF dual-hosted git repository.
earthchen pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new f995ef506c Fix triple method override (#13224)
f995ef506c is described below
commit f995ef506c2e3306f9cbcd4bc33ad169a3da0b7e
Author: Albumen Kevin <[email protected]>
AuthorDate: Wed Oct 18 11:39:33 2023 +0800
Fix triple method override (#13224)
* Fix triple method override
* Fix server
* Fix test
---
.../rpc/protocol/tri/ReflectionPackableMethod.java | 19 +++++---------
.../dubbo/rpc/protocol/tri/TripleInvoker.java | 25 +++++++++++--------
.../dubbo/rpc/protocol/tri/TripleProtocol.java | 2 --
.../tri/call/ReflectionAbstractServerCall.java | 29 +++++++++++++++-------
.../tri/call/ReflectionServerCallTest.java | 4 +++
5 files changed, 45 insertions(+), 34 deletions(-)
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java
index da3e67dfa4..66f34d48c3 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ReflectionPackableMethod.java
@@ -22,16 +22,16 @@ import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.serialize.MultipleSerialization;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.config.Constants;
-import org.apache.dubbo.remoting.utils.UrlUtils;
import org.apache.dubbo.remoting.transport.CodecSupport;
+import org.apache.dubbo.remoting.utils.UrlUtils;
import org.apache.dubbo.rpc.model.MethodDescriptor;
import org.apache.dubbo.rpc.model.Pack;
import org.apache.dubbo.rpc.model.PackableMethod;
-
-import com.google.protobuf.Message;
import org.apache.dubbo.rpc.model.UnPack;
import org.apache.dubbo.rpc.model.WrapperUnPack;
+import com.google.protobuf.Message;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -42,7 +42,6 @@ import java.util.stream.Stream;
import static org.apache.dubbo.common.constants.CommonConstants.$ECHO;
import static
org.apache.dubbo.common.constants.CommonConstants.PROTOBUF_MESSAGE_CLASS_NAME;
-import static
org.apache.dubbo.rpc.protocol.tri.TripleProtocol.METHOD_ATTR_PACK;
public class ReflectionPackableMethod implements PackableMethod {
@@ -118,16 +117,10 @@ public class ReflectionPackableMethod implements
PackableMethod {
}
public static ReflectionPackableMethod init(MethodDescriptor
methodDescriptor, URL url) {
- Object stored = methodDescriptor.getAttribute(METHOD_ATTR_PACK);
- if (stored != null) {
- return (ReflectionPackableMethod) stored;
- }
- final String serializeName = UrlUtils.serializationOrDefault(url);
- final Collection<String> allSerialize =
UrlUtils.allSerializations(url);
- ReflectionPackableMethod reflectionPackableMethod = new
ReflectionPackableMethod(
+ String serializeName = UrlUtils.serializationOrDefault(url);
+ Collection<String> allSerialize = UrlUtils.allSerializations(url);
+ return new ReflectionPackableMethod(
methodDescriptor, url, serializeName, allSerialize);
- methodDescriptor.addAttribute(METHOD_ATTR_PACK,
reflectionPackableMethod);
- return reflectionPackableMethod;
}
static boolean isStreamType(Class<?> type) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
index b666c8c5e3..7903cf00db 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
@@ -62,9 +62,11 @@ import org.apache.dubbo.rpc.support.RpcUtils;
import io.netty.util.AsciiString;
import java.util.Arrays;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReentrantLock;
@@ -94,18 +96,22 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
private final TripleWriteQueue writeQueue = new TripleWriteQueue(256);
private static final boolean setFutureWhenSync =
Boolean.parseBoolean(System.getProperty(CommonConstants.SET_FUTURE_IN_SYNC_MODE,
"true"));
+ private final PackableMethodFactory packableMethodFactory;
+ private final Map<MethodDescriptor, PackableMethod> packableMethodCache =
new ConcurrentHashMap<>();
public TripleInvoker(Class<T> serviceType,
- URL url,
- String acceptEncodings,
- AbstractConnectionClient connectionClient,
- Set<Invoker<?>> invokers,
- ExecutorService streamExecutor) {
+ URL url,
+ String acceptEncodings,
+ AbstractConnectionClient connectionClient,
+ Set<Invoker<?>> invokers,
+ ExecutorService streamExecutor) {
super(serviceType, url, new String[]{INTERFACE_KEY, GROUP_KEY,
TOKEN_KEY});
this.invokers = invokers;
this.connectionClient = connectionClient;
this.acceptEncodings = acceptEncodings;
this.streamExecutor = streamExecutor;
+ this.packableMethodFactory =
url.getOrDefaultFrameworkModel().getExtensionLoader(PackableMethodFactory.class)
+
.getExtension(ConfigurationUtils.getGlobalConfiguration(url.getApplicationModel()).getString(DUBBO_PACKABLE_METHOD_FACTORY,
DEFAULT_KEY));
}
private static AsciiString getSchemeFromUrl(URL url) {
@@ -175,7 +181,7 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
}
}
- private static boolean isSync(MethodDescriptor methodDescriptor,
Invocation invocation){
+ private static boolean isSync(MethodDescriptor methodDescriptor,
Invocation invocation) {
if (!(invocation instanceof RpcInvocation)) {
return false;
}
@@ -230,7 +236,7 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
if (timeout <= 0) {
return AsyncRpcResult.newDefaultAsyncResult(new
RpcException(RpcException.TIMEOUT_TERMINATE,
"No time left for making the following call: " +
invocation.getServiceName() + "."
- + RpcUtils.getMethodName(invocation)+ ", terminate
directly."), invocation);
+ + RpcUtils.getMethodName(invocation) + ", terminate
directly."), invocation);
}
invocation.setAttachment(TIMEOUT_KEY, String.valueOf(timeout));
@@ -280,9 +286,8 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
if (methodDescriptor instanceof PackableMethod) {
meta.packableMethod = (PackableMethod) methodDescriptor;
} else {
- meta.packableMethod =
url.getOrDefaultFrameworkModel().getExtensionLoader(PackableMethodFactory.class)
-
.getExtension(ConfigurationUtils.getGlobalConfiguration(url.getApplicationModel()).getString(DUBBO_PACKABLE_METHOD_FACTORY,
DEFAULT_KEY))
- .create(methodDescriptor, url, TripleConstant.CONTENT_PROTO);
+ meta.packableMethod =
packableMethodCache.computeIfAbsent(methodDescriptor,
+ (md) -> packableMethodFactory.create(md, url,
TripleConstant.CONTENT_PROTO));
}
meta.convertNoLowerHeader = TripleProtocol.CONVERT_NO_LOWER_HEADER;
meta.ignoreDefaultVersion = TripleProtocol.IGNORE_1_0_0_VERSION;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
index 8f9925da79..f28d16287b 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
@@ -55,8 +55,6 @@ import static
org.apache.dubbo.rpc.Constants.H2_SUPPORT_NO_LOWER_HEADER_KEY;
public class TripleProtocol extends AbstractProtocol {
-
- public static final String METHOD_ATTR_PACK = "pack";
private static final Logger logger =
LoggerFactory.getLogger(TripleProtocol.class);
private final PathResolver pathResolver;
private final TriBuiltinService triBuiltinService;
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ReflectionAbstractServerCall.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ReflectionAbstractServerCall.java
index 7789474a70..80e7c3cbc9 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ReflectionAbstractServerCall.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ReflectionAbstractServerCall.java
@@ -17,7 +17,6 @@
package org.apache.dubbo.rpc.protocol.tri.call;
-import io.netty.handler.codec.http.HttpHeaderNames;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.constants.CommonConstants;
@@ -29,6 +28,7 @@ import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.MethodDescriptor;
import org.apache.dubbo.rpc.model.MethodDescriptor.RpcType;
+import org.apache.dubbo.rpc.model.PackableMethod;
import org.apache.dubbo.rpc.model.PackableMethodFactory;
import org.apache.dubbo.rpc.model.ProviderModel;
import org.apache.dubbo.rpc.model.ServiceDescriptor;
@@ -37,8 +37,12 @@ import
org.apache.dubbo.rpc.protocol.tri.TripleCustomerProtocolWapper;
import org.apache.dubbo.rpc.protocol.tri.stream.ServerStream;
import org.apache.dubbo.rpc.service.ServiceDescriptorInternalCache;
+import io.netty.handler.codec.http.HttpHeaderNames;
+
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_KEY;
@@ -46,6 +50,7 @@ import static
org.apache.dubbo.common.constants.CommonConstants.DUBBO_PACKABLE_M
public class ReflectionAbstractServerCall extends AbstractServerCall {
+ private static final String PACKABLE_METHOD_CACHE =
"PACKABLE_METHOD_CACHE";
private final List<HeaderFilter> headerFilters;
private List<MethodDescriptor> methodDescriptors;
@@ -120,10 +125,7 @@ public class ReflectionAbstractServerCall extends
AbstractServerCall {
}
}
if (methodDescriptor != null) {
- final URL url = invoker.getUrl();
- packableMethod =
frameworkModel.getExtensionLoader(PackableMethodFactory.class)
-
.getExtension(ConfigurationUtils.getGlobalConfiguration(url.getApplicationModel()).getString(DUBBO_PACKABLE_METHOD_FACTORY,
DEFAULT_KEY))
- .create(methodDescriptor, url, (String)
requestMetadata.get(HttpHeaderNames.CONTENT_TYPE.toString()));
+ loadPackableMethod(invoker.getUrl());
}
trySetListener();
if (listener == null) {
@@ -191,10 +193,19 @@ public class ReflectionAbstractServerCall extends
AbstractServerCall {
+ serviceDescriptor.getInterfaceName()), null);
return;
}
- final URL url = invoker.getUrl();
- packableMethod =
frameworkModel.getExtensionLoader(PackableMethodFactory.class)
-
.getExtension(ConfigurationUtils.getGlobalConfiguration(url.getApplicationModel()).getString(DUBBO_PACKABLE_METHOD_FACTORY,
DEFAULT_KEY))
- .create(methodDescriptor, url, (String)
requestMetadata.get(HttpHeaderNames.CONTENT_TYPE.toString()));
+ loadPackableMethod(invoker.getUrl());
+ }
+
+ @SuppressWarnings("unchecked")
+ private void loadPackableMethod(URL url) {
+ Map<MethodDescriptor, PackableMethod> cacheMap =
(Map<MethodDescriptor, PackableMethod>) url.getServiceModel()
+ .getServiceMetadata()
+ .getAttributeMap()
+ .computeIfAbsent(PACKABLE_METHOD_CACHE, (k) -> new
ConcurrentHashMap<>());
+ packableMethod = cacheMap.computeIfAbsent(methodDescriptor,
+ (md) ->
frameworkModel.getExtensionLoader(PackableMethodFactory.class)
+
.getExtension(ConfigurationUtils.getGlobalConfiguration(url.getApplicationModel()).getString(DUBBO_PACKABLE_METHOD_FACTORY,
DEFAULT_KEY))
+ .create(methodDescriptor, url, (String)
requestMetadata.get(HttpHeaderNames.CONTENT_TYPE.toString())));
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/call/ReflectionServerCallTest.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/call/ReflectionServerCallTest.java
index 7b841e5ac4..7b379dbcdd 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/call/ReflectionServerCallTest.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/call/ReflectionServerCallTest.java
@@ -24,6 +24,7 @@ import org.apache.dubbo.rpc.model.MethodDescriptor;
import org.apache.dubbo.rpc.model.ProviderModel;
import org.apache.dubbo.rpc.model.ReflectionMethodDescriptor;
import org.apache.dubbo.rpc.model.ServiceDescriptor;
+import org.apache.dubbo.rpc.model.ServiceMetadata;
import org.apache.dubbo.rpc.protocol.tri.DescriptorService;
import org.apache.dubbo.rpc.protocol.tri.HelloReply;
import org.apache.dubbo.rpc.protocol.tri.stream.TripleServerStream;
@@ -46,6 +47,7 @@ class ReflectionServerCallTest {
Invoker<?> invoker = Mockito.mock(Invoker.class);
TripleServerStream serverStream =
Mockito.mock(TripleServerStream.class);
ProviderModel providerModel = Mockito.mock(ProviderModel.class);
+ ServiceMetadata serviceMetadata = new ServiceMetadata();
Method method = DescriptorService.class.getMethod("sayHello",
HelloReply.class);
MethodDescriptor methodDescriptor = new
ReflectionMethodDescriptor(method);
URL url = Mockito.mock(URL.class);
@@ -53,6 +55,8 @@ class ReflectionServerCallTest {
.thenReturn(url);
when(url.getServiceModel())
.thenReturn(providerModel);
+ when(providerModel.getServiceMetadata())
+ .thenReturn(serviceMetadata);
String service = "testService";
String methodName = "method";