This is an automated email from the ASF dual-hosted git repository. liujun pushed a commit to branch 3.0 in repository https://gitbox.apache.org/repos/asf/dubbo.git
commit 3124dd8ac5812a16c4774a5fbe03f985ea826f7e Author: ken.lj <[email protected]> AuthorDate: Fri Jul 17 17:12:09 2020 +0800 set metadata proxy timeout --- .../rpc/cluster/directory/MockDirInvocation.java | 5 + .../router/condition/ConditionRouterTest.java | 2 +- .../support/AbstractClusterInvokerTest.java | 6 +- .../org/apache/dubbo/common/ConfigurationURL.java | 12 +- .../src/main/java/org/apache/dubbo/common/URL.java | 290 +++++++++++++++++++-- .../org/apache/dubbo/config/cache/CacheTest.java | 2 +- .../apache/dubbo/metadata/MetadataConstants.java | 2 + .../org/apache/dubbo/metadata/MetadataInfo.java | 18 ++ .../dubbo/monitor/support/MonitorFilterTest.java | 8 +- .../registry/client/DefaultServiceInstance.java | 8 + .../dubbo/registry/client/InstanceAddressURL.java | 187 ++++++++++--- .../client/ServiceDiscoveryRegistryDirectory.java | 72 ++--- .../dubbo/registry/client/ServiceInstance.java | 2 + .../listener/ServiceInstancesChangedListener.java | 14 +- .../registry/client/metadata/MetadataUtils.java | 3 +- .../StandardMetadataServiceURLBuilder.java | 7 +- .../main/java/org/apache/dubbo/rpc/Invocation.java | 2 + .../main/java/org/apache/dubbo/rpc/RpcContext.java | 66 ++--- .../java/org/apache/dubbo/rpc/RpcInvocation.java | 32 ++- .../org/apache/dubbo/rpc/filter/GenericFilter.java | 2 +- .../apache/dubbo/rpc/protocol/AbstractInvoker.java | 2 +- .../dubbo/rpc/proxy/AbstractProxyInvoker.java | 2 +- .../dubbo/rpc/proxy/InvokerInvocationHandler.java | 9 +- .../dubbo/rpc/filter/ExceptionFilterTest.java | 8 +- .../apache/dubbo/rpc/filter/GenericFilterTest.java | 8 +- .../dubbo/rpc/filter/GenericImplFilterTest.java | 8 +- .../apache/dubbo/rpc/proxy/AbstractProxyTest.java | 4 +- .../apache/dubbo/rpc/support/MockInvocation.java | 5 + .../org/apache/dubbo/rpc/support/RpcUtilsTest.java | 48 ++-- .../rpc/protocol/dubbo/CallbackServiceCodec.java | 10 +- .../dubbo/rpc/protocol/dubbo/DubboProtocol.java | 7 +- .../dubbo/ReferenceCountExchangeClient.java | 9 +- 32 files changed, 638 insertions(+), 222 deletions(-) diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/directory/MockDirInvocation.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/directory/MockDirInvocation.java index 7a4ffb9..bc237b9 100644 --- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/directory/MockDirInvocation.java +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/directory/MockDirInvocation.java @@ -52,6 +52,11 @@ public class MockDirInvocation implements Invocation { return null; } + @Override + public String getProtocolServiceKey() { + return null; + } + public String getMethodName() { return "echo"; } diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/condition/ConditionRouterTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/condition/ConditionRouterTest.java index 00fdf53..204ce9a 100644 --- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/condition/ConditionRouterTest.java +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/condition/ConditionRouterTest.java @@ -137,7 +137,7 @@ public class ConditionRouterTest { @Test public void testRoute_methodRoute() { - Invocation invocation = new RpcInvocation("getFoo", "com.foo.BarService", new Class<?>[0], new Object[0]); + Invocation invocation = new RpcInvocation("getFoo", "com.foo.BarService", "", new Class<?>[0], new Object[0]); // More than one methods, mismatch Router router = new ConditionRouterFactory().getRouter(getRouteUrl("methods=getFoo => host = 1.2.3.4")); boolean matchWhen = ((ConditionRouter) router).matchWhen( diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java index a27a70e..3f94910 100644 --- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java @@ -494,21 +494,21 @@ public class AbstractClusterInvokerTest { Directory<DemoService> directory = new StaticDirectory<DemoService>(invokers); FailoverClusterInvoker<DemoService> failoverClusterInvoker = new FailoverClusterInvoker<DemoService>(directory); try { - failoverClusterInvoker.invoke(new RpcInvocation("sayHello", DemoService.class.getName(), new Class<?>[0], new Object[0])); + failoverClusterInvoker.invoke(new RpcInvocation("sayHello", DemoService.class.getName(), "", new Class<?>[0], new Object[0])); Assertions.fail(); } catch (RpcException e) { Assertions.assertEquals(RpcException.TIMEOUT_EXCEPTION, e.getCode()); } ForkingClusterInvoker<DemoService> forkingClusterInvoker = new ForkingClusterInvoker<DemoService>(directory); try { - forkingClusterInvoker.invoke(new RpcInvocation("sayHello", DemoService.class.getName(), new Class<?>[0], new Object[0])); + forkingClusterInvoker.invoke(new RpcInvocation("sayHello", DemoService.class.getName(), "", new Class<?>[0], new Object[0])); Assertions.fail(); } catch (RpcException e) { Assertions.assertEquals(RpcException.TIMEOUT_EXCEPTION, e.getCode()); } FailfastClusterInvoker<DemoService> failfastClusterInvoker = new FailfastClusterInvoker<DemoService>(directory); try { - failfastClusterInvoker.invoke(new RpcInvocation("sayHello", DemoService.class.getName(), new Class<?>[0], new Object[0])); + failfastClusterInvoker.invoke(new RpcInvocation("sayHello", DemoService.class.getName(), "", new Class<?>[0], new Object[0])); Assertions.fail(); } catch (RpcException e) { Assertions.assertEquals(RpcException.TIMEOUT_EXCEPTION, e.getCode()); diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/ConfigurationURL.java similarity index 56% copy from dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataConstants.java copy to dubbo-common/src/main/java/org/apache/dubbo/common/ConfigurationURL.java index 7ba0a43..2042277 100644 --- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataConstants.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/ConfigurationURL.java @@ -14,15 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dubbo.metadata; +package org.apache.dubbo.common; -public class MetadataConstants { - public static final String KEY_SEPARATOR = ":"; - public static final String DEFAULT_PATH_TAG = "metadata"; - public static final String KEY_REVISON_PREFIX = "revision"; - public static final String META_DATA_STORE_TAG = ".metaData"; - public static final String SERVICE_META_DATA_STORE_TAG = ".smd"; - public static final String CONSUMER_META_DATA_STORE_TAG = ".cmd"; - public static final String METADATA_PUBLISH_DELAY_KEY = "dubbo.application.metadata.delay"; - public static final int DEFAULT_METADATA_PUBLISH_DELAY = 5000; +public class ConfigurationURL extends URL { } diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java index 9742ebb..211d6f8 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java @@ -602,7 +602,7 @@ class URL implements Serializable { return Arrays.asList(strArray); } - private Map<String, Number> getNumbers() { + protected Map<String, Number> getNumbers() { // concurrent initialization is tolerant if (numbers == null) { numbers = new ConcurrentHashMap<>(); @@ -610,7 +610,7 @@ class URL implements Serializable { return numbers; } - private Map<String, Map<String, Number>> getMethodNumbers() { + protected Map<String, Map<String, Number>> getMethodNumbers() { if (methodNumbers == null) { // concurrent initialization is tolerant methodNumbers = new ConcurrentHashMap<>(); } @@ -795,7 +795,7 @@ class URL implements Serializable { } public String getMethodParameterStrict(String method, String key) { - Map<String, String> keyMap = methodParameters.get(method); + Map<String, String> keyMap = getMethodParameters().get(method); String value = null; if (keyMap != null) { value = keyMap.get(key); @@ -804,7 +804,7 @@ class URL implements Serializable { } public String getMethodParameter(String method, String key) { - Map<String, String> keyMap = methodParameters.get(method); + Map<String, String> keyMap = getMethodParameters().get(method); String value = null; if (keyMap != null) { value = keyMap.get(key); @@ -1653,16 +1653,276 @@ class URL implements Serializable { subParameter.put(key, value); } -// public String getServiceParameter(String service, String key) { -// return getParameter(key); -// } -// -// public String getServiceMethodParameter(String service, String key) { -// return getParameter(key); -// } -// -// public String getServiceParameter(String service, String key) { -// return getParameter(key); -// } + /* add service scope operations, see InstanceAddressURL */ + public Map<String, String> getServiceParameters(String service) { + return getParameters(); + } + + public String getServiceParameter(String service, String key) { + return getParameter(key); + } + + public String getServiceParameter(String service, String key, String defaultValue) { + String value = getServiceParameter(service, key); + return StringUtils.isEmpty(value) ? defaultValue : value; + } + + public int getServiceParameter(String service, String key, int defaultValue) { + return getParameter(key, defaultValue); + } + + public double getServiceParameter(String service, String key, double defaultValue) { + Number n = getServiceNumbers(service).get(key); + if (n != null) { + return n.doubleValue(); + } + String value = getServiceParameter(service, key); + if (StringUtils.isEmpty(value)) { + return defaultValue; + } + double d = Double.parseDouble(value); + getNumbers().put(key, d); + return d; + } + + public float getServiceParameter(String service, String key, float defaultValue) { + Number n = getNumbers().get(key); + if (n != null) { + return n.floatValue(); + } + String value = getServiceParameter(service, key); + if (StringUtils.isEmpty(value)) { + return defaultValue; + } + float f = Float.parseFloat(value); + getNumbers().put(key, f); + return f; + } + + public long getServiceParameter(String service, String key, long defaultValue) { + Number n = getNumbers().get(key); + if (n != null) { + return n.longValue(); + } + String value = getServiceParameter(service, key); + if (StringUtils.isEmpty(value)) { + return defaultValue; + } + long l = Long.parseLong(value); + getNumbers().put(key, l); + return l; + } + + public short getServiceParameter(String service, String key, short defaultValue) { + Number n = getNumbers().get(key); + if (n != null) { + return n.shortValue(); + } + String value = getServiceParameter(service, key); + if (StringUtils.isEmpty(value)) { + return defaultValue; + } + short s = Short.parseShort(value); + getNumbers().put(key, s); + return s; + } + + public byte getServiceParameter(String service, String key, byte defaultValue) { + Number n = getNumbers().get(key); + if (n != null) { + return n.byteValue(); + } + String value = getServiceParameter(service, key); + if (StringUtils.isEmpty(value)) { + return defaultValue; + } + byte b = Byte.parseByte(value); + getNumbers().put(key, b); + return b; + } + + public char getServiceParameter(String service, String key, char defaultValue) { + String value = getServiceParameter(service, key); + return StringUtils.isEmpty(value) ? defaultValue : value.charAt(0); + } + + public boolean getServiceParameter(String service, String key, boolean defaultValue) { + String value = getServiceParameter(service, key); + return StringUtils.isEmpty(value) ? defaultValue : Boolean.parseBoolean(value); + } + + public boolean hasServiceParameter(String service, String key) { + String value = getServiceParameter(service, key); + return value != null && value.length() > 0; + } + + public float getPositiveServiceParameter(String service, String key, float defaultValue) { + if (defaultValue <= 0) { + throw new IllegalArgumentException("defaultValue <= 0"); + } + float value = getServiceParameter(service, key, defaultValue); + return value <= 0 ? defaultValue : value; + } + + public double getPositiveServiceParameter(String service, String key, double defaultValue) { + if (defaultValue <= 0) { + throw new IllegalArgumentException("defaultValue <= 0"); + } + double value = getServiceParameter(service, key, defaultValue); + return value <= 0 ? defaultValue : value; + } + + public long getPositiveServiceParameter(String service, String key, long defaultValue) { + if (defaultValue <= 0) { + throw new IllegalArgumentException("defaultValue <= 0"); + } + long value = getServiceParameter(service, key, defaultValue); + return value <= 0 ? defaultValue : value; + } + + public int getPositiveServiceParameter(String service, String key, int defaultValue) { + if (defaultValue <= 0) { + throw new IllegalArgumentException("defaultValue <= 0"); + } + int value = getServiceParameter(service, key, defaultValue); + return value <= 0 ? defaultValue : value; + } + + public short getPositiveServiceParameter(String service, String key, short defaultValue) { + if (defaultValue <= 0) { + throw new IllegalArgumentException("defaultValue <= 0"); + } + short value = getServiceParameter(service, key, defaultValue); + return value <= 0 ? defaultValue : value; + } + + public byte getPositiveServiceParameter(String service, String key, byte defaultValue) { + if (defaultValue <= 0) { + throw new IllegalArgumentException("defaultValue <= 0"); + } + byte value = getServiceParameter(service, key, defaultValue); + return value <= 0 ? defaultValue : value; + } + + public String getServiceMethodParameterAndDecoded(String service, String method, String key) { + return URL.decode(getServiceMethodParameter(service, method, key)); + } + + public String getServiceMethodParameterAndDecoded(String service, String method, String key, String defaultValue) { + return URL.decode(getServiceMethodParameter(service, method, key, defaultValue)); + } + + public String getServiceMethodParameterStrict(String service, String method, String key) { + return getMethodParameterStrict(method, key); + } + + public String getServiceMethodParameter(String service, String method, String key) { + return getMethodParameter(method, key); + } + + public String getServiceMethodParameter(String service, String method, String key, String defaultValue) { + String value = getServiceMethodParameter(service, method, key); + return StringUtils.isEmpty(value) ? defaultValue : value; + } + + public double getServiceMethodParameter(String service, String method, String key, double defaultValue) { + Number n = getCachedNumber(method, key); + if (n != null) { + return n.doubleValue(); + } + String value = getServiceMethodParameter(service, method, key); + if (StringUtils.isEmpty(value)) { + return defaultValue; + } + double d = Double.parseDouble(value); + updateCachedNumber(method, key, d); + return d; + } + + public float getServiceMethodParameter(String service, String method, String key, float defaultValue) { + Number n = getCachedNumber(method, key); + if (n != null) { + return n.floatValue(); + } + String value = getServiceMethodParameter(service, method, key); + if (StringUtils.isEmpty(value)) { + return defaultValue; + } + float f = Float.parseFloat(value); + updateCachedNumber(method, key, f); + return f; + } + + public long getServiceMethodParameter(String service, String method, String key, long defaultValue) { + Number n = getCachedNumber(method, key); + if (n != null) { + return n.longValue(); + } + String value = getServiceMethodParameter(service, method, key); + if (StringUtils.isEmpty(value)) { + return defaultValue; + } + long l = Long.parseLong(value); + updateCachedNumber(method, key, l); + return l; + } + + public int getServiceMethodParameter(String service, String method, String key, int defaultValue) { + Number n = getCachedNumber(method, key); + if (n != null) { + return n.intValue(); + } + String value = getServiceMethodParameter(service, method, key); + if (StringUtils.isEmpty(value)) { + return defaultValue; + } + int i = Integer.parseInt(value); + updateCachedNumber(method, key, i); + return i; + } + + public short getMethodParameter(String service, String method, String key, short defaultValue) { + Number n = getCachedNumber(method, key); + if (n != null) { + return n.shortValue(); + } + String value = getServiceMethodParameter(service, method, key); + if (StringUtils.isEmpty(value)) { + return defaultValue; + } + short s = Short.parseShort(value); + updateCachedNumber(method, key, s); + return s; + } + + public byte getServiceMethodParameter(String service, String method, String key, byte defaultValue) { + Number n = getCachedNumber(method, key); + if (n != null) { + return n.byteValue(); + } + String value = getServiceMethodParameter(service, method, key); + if (StringUtils.isEmpty(value)) { + return defaultValue; + } + byte b = Byte.parseByte(value); + updateCachedNumber(method, key, b); + return b; + } + + public boolean hasServiceMethodParameter(String service, String method, String key) { + return hasMethodParameter(method, key); + } + + public boolean hasServiceMethodParameter(String service, String method) { + return hasMethodParameter(method); + } + + protected Map<String, Number> getServiceNumbers(String service) { + return getNumbers(); + } + + protected Map<String, Map<String, Number>> getServiceMethodNumbers(String service) { + return getMethodNumbers(); + } } diff --git a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/cache/CacheTest.java b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/cache/CacheTest.java index 5cb18c0..ed7cb5e 100644 --- a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/cache/CacheTest.java +++ b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/cache/CacheTest.java @@ -131,7 +131,7 @@ public class CacheTest { parameters.put("findCache.cache", "threadlocal"); URL url = new URL("dubbo", "127.0.0.1", 29582, "org.apache.dubbo.config.cache.CacheService", parameters); - Invocation invocation = new RpcInvocation("findCache", CacheService.class.getName(), new Class[]{String.class}, new String[]{"0"}, null, null, null); + Invocation invocation = new RpcInvocation("findCache", CacheService.class.getName(), "", new Class[]{String.class}, new String[]{"0"}, null, null, null); Cache cache = cacheFactory.getCache(url, invocation); assertTrue(cache instanceof ThreadLocalCache); diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataConstants.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataConstants.java index 7ba0a43..6bf38c9 100644 --- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataConstants.java +++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataConstants.java @@ -25,4 +25,6 @@ public class MetadataConstants { public static final String CONSUMER_META_DATA_STORE_TAG = ".cmd"; public static final String METADATA_PUBLISH_DELAY_KEY = "dubbo.application.metadata.delay"; public static final int DEFAULT_METADATA_PUBLISH_DELAY = 5000; + public static final String METADATA_PROXY_TIMEOUT_KEY = "dubbo.application.metadata.delay"; + public static final int DEFAULT_METADATA_TIMEOUT_VALUE = 5000; } diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java index 931a331..e3f6881 100644 --- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java +++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Objects; import java.util.SortedSet; import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.dubbo.common.constants.CommonConstants.DOT_SEPARATOR; @@ -176,6 +177,8 @@ public class MetadataInfo implements Serializable { private transient Map<String, String> consumerParams; private transient Map<String, Map<String, String>> methodParams; private transient Map<String, Map<String, String>> consumerMethodParams; + private volatile transient Map<String, Number> numbers; + private volatile transient Map<String, Map<String, Number>> methodNumbers; private transient String serviceKey; private transient String matchKey; @@ -382,6 +385,21 @@ public class MetadataInfo implements Serializable { } } + public Map<String, Number> getNumbers() { + // concurrent initialization is tolerant + if (numbers == null) { + numbers = new ConcurrentHashMap<>(); + } + return numbers; + } + + public Map<String, Map<String, Number>> getMethodNumbers() { + if (methodNumbers == null) { // concurrent initialization is tolerant + methodNumbers = new ConcurrentHashMap<>(); + } + return methodNumbers; + } + @Override public boolean equals(Object obj) { if (obj == null) { diff --git a/dubbo-monitor/dubbo-monitor-api/src/test/java/org/apache/dubbo/monitor/support/MonitorFilterTest.java b/dubbo-monitor/dubbo-monitor-api/src/test/java/org/apache/dubbo/monitor/support/MonitorFilterTest.java index 2180b27..dce7e51 100644 --- a/dubbo-monitor/dubbo-monitor-api/src/test/java/org/apache/dubbo/monitor/support/MonitorFilterTest.java +++ b/dubbo-monitor/dubbo-monitor-api/src/test/java/org/apache/dubbo/monitor/support/MonitorFilterTest.java @@ -119,7 +119,7 @@ public class MonitorFilterTest { public void testFilter() throws Exception { MonitorFilter monitorFilter = new MonitorFilter(); monitorFilter.setMonitorFactory(monitorFactory); - Invocation invocation = new RpcInvocation("aaa", MonitorService.class.getName(), new Class<?>[0], new Object[0]); + Invocation invocation = new RpcInvocation("aaa", MonitorService.class.getName(), "", new Class<?>[0], new Object[0]); RpcContext.getContext().setRemoteAddress(NetUtils.getLocalHost(), 20880).setLocalAddress(NetUtils.getLocalHost(), 2345); Result result = monitorFilter.invoke(serviceInvoker, invocation); result.whenCompleteWithContext((r, t) -> { @@ -149,7 +149,7 @@ public class MonitorFilterTest { MonitorFilter monitorFilter = new MonitorFilter(); MonitorFactory mockMonitorFactory = mock(MonitorFactory.class); monitorFilter.setMonitorFactory(mockMonitorFactory); - Invocation invocation = new RpcInvocation("aaa", MonitorService.class.getName(), new Class<?>[0], new Object[0]); + Invocation invocation = new RpcInvocation("aaa", MonitorService.class.getName(), "", new Class<?>[0], new Object[0]); Invoker invoker = mock(Invoker.class); given(invoker.getUrl()).willReturn(URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":20880?" + APPLICATION_KEY + "=abc&" + SIDE_KEY + "=" + CONSUMER_SIDE)); @@ -162,7 +162,7 @@ public class MonitorFilterTest { public void testGenericFilter() throws Exception { MonitorFilter monitorFilter = new MonitorFilter(); monitorFilter.setMonitorFactory(monitorFactory); - Invocation invocation = new RpcInvocation("$invoke", MonitorService.class.getName(), new Class<?>[]{String.class, String[].class, Object[].class}, new Object[]{"xxx", new String[]{}, new Object[]{}}); + Invocation invocation = new RpcInvocation("$invoke", MonitorService.class.getName(), "", new Class<?>[]{String.class, String[].class, Object[].class}, new Object[]{"xxx", new String[]{}, new Object[]{}}); RpcContext.getContext().setRemoteAddress(NetUtils.getLocalHost(), 20880).setLocalAddress(NetUtils.getLocalHost(), 2345); Result result = monitorFilter.invoke(serviceInvoker, invocation); result.whenCompleteWithContext((r, t) -> { @@ -196,7 +196,7 @@ public class MonitorFilterTest { monitorFilter.setMonitorFactory(mockMonitorFactory); given(mockMonitorFactory.getMonitor(any(URL.class))).willReturn(mockMonitor); - Invocation invocation = new RpcInvocation("aaa", MonitorService.class.getName(), new Class<?>[0], new Object[0]); + Invocation invocation = new RpcInvocation("aaa", MonitorService.class.getName(), "", new Class<?>[0], new Object[0]); monitorFilter.invoke(serviceInvoker, invocation); } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java index 3ee5dd2..3c7204d 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java @@ -146,6 +146,14 @@ public class DefaultServiceInstance implements ServiceInstance { return extendParams; } + @Override + public Map<String, String> getAllParams() { + Map<String, String> allParams = new HashMap<>((int) ((metadata.size() + extendParams.size()) / 0.75f + 1)); + allParams.putAll(metadata); + allParams.putAll(extendParams); + return allParams; + } + public void setMetadata(Map<String, String> metadata) { this.metadata = metadata; } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java index 149c6e2..ca8f9be 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java @@ -23,30 +23,31 @@ import org.apache.dubbo.rpc.RpcContext; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY; import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY; -/** - * FIXME, replace RpcContext operations with explicitly defined APIs - */ public class InstanceAddressURL extends URL { private ServiceInstance instance; private MetadataInfo metadataInfo; + private volatile transient Map<String, Number> numbers; + private volatile transient Map<String, Map<String, Number>> methodNumbers; + + public InstanceAddressURL() { + } public InstanceAddressURL( ServiceInstance instance, MetadataInfo metadataInfo ) { -// super() this.instance = instance; this.metadataInfo = metadataInfo; this.host = instance.getHost(); this.port = instance.getPort(); } - public ServiceInstance getInstance() { return instance; } @@ -74,6 +75,11 @@ public class InstanceAddressURL extends URL { } @Override + public String getProtocolServiceKey() { + return RpcContext.getContext().getProtocolServiceKey(); + } + + @Override public String getServiceKey() { return RpcContext.getContext().getServiceKey(); } @@ -99,33 +105,32 @@ public class InstanceAddressURL extends URL { return getServiceInterface(); } - String value = getInstanceMetadata().get(key); - if (StringUtils.isEmpty(value) && metadataInfo != null) { - value = metadataInfo.getParameter(key, RpcContext.getContext().getProtocolServiceKey()); + String protocolServiceKey = getProtocolServiceKey(); + if (protocolServiceKey == null) { + return getInstanceParameter(key); } - return value; + return getServiceParameter(protocolServiceKey, key); } @Override - public String getParameter(String key, String defaultValue) { - if (VERSION_KEY.equals(key)) { - return getVersion(); - } else if (GROUP_KEY.equals(key)) { - return getGroup(); - } else if (INTERFACE_KEY.equals(key)) { - return getServiceInterface(); - } - - String value = getParameter(key); - if (StringUtils.isEmpty(value)) { - return defaultValue; + public String getServiceParameter(String service, String key) { + String value = getInstanceParameter(key); + if (StringUtils.isEmpty(value) && metadataInfo != null) { + value = metadataInfo.getParameter(key, service); } return value; } + /** + * method parameter only exists in ServiceInfo + * + * @param method + * @param key + * @return + */ @Override - public String getMethodParameter(String method, String key) { - MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(getProtocolServiceKey()); + public String getServiceMethodParameter(String protocolServiceKey, String method, String key) { + MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(protocolServiceKey); String value = serviceInfo.getMethodParameter(method, key, null); if (StringUtils.isNotEmpty(value)) { return value; @@ -134,8 +139,24 @@ public class InstanceAddressURL extends URL { } @Override - public boolean hasMethodParameter(String method, String key) { - MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(getProtocolServiceKey()); + public String getMethodParameter(String method, String key) { + String protocolServiceKey = getProtocolServiceKey(); + if (protocolServiceKey == null) { + return null; + } + return getServiceMethodParameter(protocolServiceKey, method, key); + } + + /** + * method parameter only exists in ServiceInfo + * + * @param method + * @param key + * @return + */ + @Override + public boolean hasServiceMethodParameter(String protocolServiceKey, String method, String key) { + MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(protocolServiceKey); if (method == null) { String suffix = "." + key; @@ -160,20 +181,44 @@ public class InstanceAddressURL extends URL { } @Override - public boolean hasMethodParameter(String method) { - MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(getProtocolServiceKey()); + public boolean hasMethodParameter(String method, String key) { + String protocolServiceKey = getProtocolServiceKey(); + if (protocolServiceKey == null) { + return false; + } + return hasServiceMethodParameter(protocolServiceKey, method, key); + } + + /** + * method parameter only exists in ServiceInfo + * + * @param method + * @return + */ + @Override + public boolean hasServiceMethodParameter(String protocolServiceKey, String method) { + MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(protocolServiceKey); return serviceInfo.hasMethodParameter(method); } + @Override + public boolean hasMethodParameter(String method) { + String protocolServiceKey = getProtocolServiceKey(); + if (protocolServiceKey == null) { + return false; + } + return hasServiceMethodParameter(protocolServiceKey, method); + } + /** * Avoid calling this method in RPC call. * * @return */ @Override - public Map<String, String> getParameters() { + public Map<String, String> getServiceParameters(String protocolServiceKey) { Map<String, String> instanceParams = getInstanceMetadata(); - Map<String, String> metadataParams = (metadataInfo == null ? new HashMap<>() : metadataInfo.getParameters(RpcContext.getContext().getProtocolServiceKey())); + Map<String, String> metadataParams = (metadataInfo == null ? new HashMap<>() : metadataInfo.getParameters(protocolServiceKey)); int i = instanceParams == null ? 0 : instanceParams.size(); int j = metadataParams == null ? 0 : metadataParams.size(); Map<String, String> params = new HashMap<>((int) ((i + j) / 0.75) + 1); @@ -186,8 +231,13 @@ public class InstanceAddressURL extends URL { return params; } - private Map<String, String> getInstanceMetadata() { - return this.instance.getMetadata(); + @Override + public Map<String, String> getParameters() { + String protocolServiceKey = getProtocolServiceKey(); + if (protocolServiceKey == null) { + return getInstance().getAllParams(); + } + return getServiceParameters(protocolServiceKey); } @Override @@ -196,8 +246,7 @@ public class InstanceAddressURL extends URL { return this; } - String protocolServiceKey = RpcContext.getContext().getProtocolServiceKey(); - getMetadataInfo().getServiceInfo(protocolServiceKey).addParameter(key, value); + getInstance().getExtendParams().put(key, value); return this; } @@ -207,18 +256,84 @@ public class InstanceAddressURL extends URL { return this; } - String protocolServiceKey = RpcContext.getContext().getProtocolServiceKey(); + getInstance().getExtendParams().putIfAbsent(key, value); + return this; + } + + public URL addServiceParameter(String protocolServiceKey, String key, String value) { + if (StringUtils.isEmpty(key) || StringUtils.isEmpty(value)) { + return this; + } + + getMetadataInfo().getServiceInfo(protocolServiceKey).addParameter(key, value); + return this; + } + + public URL addServiceParameterIfAbsent(String protocolServiceKey, String key, String value) { + if (StringUtils.isEmpty(key) || StringUtils.isEmpty(value)) { + return this; + } + getMetadataInfo().getServiceInfo(protocolServiceKey).addParameterIfAbsent(key, value); return this; } - public URL addConsumerParams(Map<String, String> params) { - String protocolServiceKey = RpcContext.getContext().getProtocolServiceKey(); + public URL addConsumerParams(String protocolServiceKey, Map<String, String> params) { getMetadataInfo().getServiceInfo(protocolServiceKey).addConsumerParams(params); return this; } @Override + protected Map<String, Number> getServiceNumbers(String protocolServiceKey) { + return getServiceInfo(protocolServiceKey).getNumbers(); + } + + @Override + protected Map<String, Number> getNumbers() { + String protocolServiceKey = getProtocolServiceKey(); + if (protocolServiceKey == null) { + if (numbers == null) { // concurrent initialization is tolerant + numbers = new ConcurrentHashMap<>(); + } + return numbers; + } + return getServiceNumbers(protocolServiceKey); + } + + @Override + protected Map<String, Map<String, Number>> getServiceMethodNumbers(String protocolServiceKey) { + return getServiceInfo(protocolServiceKey).getMethodNumbers(); + } + + @Override + protected Map<String, Map<String, Number>> getMethodNumbers() { + String protocolServiceKey = getProtocolServiceKey(); + if (protocolServiceKey == null) { + if (methodNumbers == null) { // concurrent initialization is tolerant + methodNumbers = new ConcurrentHashMap<>(); + } + return methodNumbers; + } + return getServiceMethodNumbers(protocolServiceKey); + } + + private MetadataInfo.ServiceInfo getServiceInfo(String protocolServiceKey) { + return metadataInfo.getServiceInfo(protocolServiceKey); + } + + private String getInstanceParameter(String key) { + String value = this.instance.getMetadata().get(key); + if (StringUtils.isNotEmpty(value)) { + return value; + } + return this.instance.getExtendParams().get(key); + } + + private Map<String, String> getInstanceMetadata() { + return this.instance.getMetadata(); + } + + @Override public boolean equals(Object obj) { // instance metadata equals if (obj == null) { diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java index e01a36e..9c725ac 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java @@ -57,49 +57,53 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im public synchronized void notify(List<URL> instanceUrls) { // Set the context of the address notification thread. RpcContext.setRpcContext(getConsumerUrl()); - if (CollectionUtils.isEmpty(instanceUrls)) { - // FIXME, empty protocol - } refreshInvoker(instanceUrls); } private void refreshInvoker(List<URL> invokerUrls) { - Assert.notNull(invokerUrls, "invokerUrls should not be null, use empty:// to clear address."); + Assert.notNull(invokerUrls, "invokerUrls should not be null, use empty InstanceAddressURL to clear address."); - if (invokerUrls.size() == 1 - && invokerUrls.get(0) != null - && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { - this.forbidden = true; // Forbid to access - this.invokers = Collections.emptyList(); - routerChain.setInvokers(this.invokers); - destroyAllInvokers(); // Close all invokers - } else { - this.forbidden = false; // Allow to access - Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference - if (CollectionUtils.isEmpty(invokerUrls)) { - return; + if (invokerUrls.size() == 1) { + URL url = invokerUrls.get(0); + if (!(url instanceof InstanceAddressURL)) { + throw new IllegalStateException("use empty InstanceAddressURL to clear address"); + } else { + InstanceAddressURL instanceAddressURL = (InstanceAddressURL) url; + if (instanceAddressURL.getInstance() == null) { + this.forbidden = true; // Forbid to access + this.invokers = Collections.emptyList(); + routerChain.setInvokers(this.invokers); + destroyAllInvokers(); // Close all invokers + return; + } } + } - Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map + this.forbidden = false; // Allow to access + Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference + if (CollectionUtils.isEmpty(invokerUrls)) { + return; + } - if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) { - logger.error(new IllegalStateException("Cannot create invokers from url address list (total " + invokerUrls.size() + ")")); - return; - } + Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map - List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values())); - // pre-route and build cache, notice that route cache should build on original Invoker list. - // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed. - routerChain.setInvokers(newInvokers); - this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers; - this.urlInvokerMap = newUrlInvokerMap; + if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) { + logger.error(new IllegalStateException("Cannot create invokers from url address list (total " + invokerUrls.size() + ")")); + return; + } - if (oldUrlInvokerMap != null) { - try { - destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker - } catch (Exception e) { - logger.warn("destroyUnusedInvokers error. ", e); - } + List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values())); + // pre-route and build cache, notice that route cache should build on original Invoker list. + // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed. + routerChain.setInvokers(newInvokers); + this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers; + this.urlInvokerMap = newUrlInvokerMap; + + if (oldUrlInvokerMap != null) { + try { + destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker + } catch (Exception e) { + logger.warn("destroyUnusedInvokers error. ", e); } } } @@ -129,7 +133,7 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im } // FIXME, some keys may need to be removed. - instanceAddressURL.addConsumerParams(queryMap); + instanceAddressURL.addConsumerParams(getConsumerUrl().getProtocolServiceKey(), queryMap); Invoker<T> invoker = urlInvokerMap == null ? null : urlInvokerMap.get(instanceAddressURL.getAddress()); if (invoker == null || urlChanged(invoker, instanceAddressURL)) { // Not in the cache, refer again diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java index 9362811..7b1890d 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java @@ -86,6 +86,8 @@ public interface ServiceInstance extends Serializable { Map<String, String> getExtendParams(); + Map<String, String> getAllParams(); + /** * @return the hash code of current instance. */ diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java index 8af48a3..37b6bf0 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java @@ -19,6 +19,7 @@ package org.apache.dubbo.registry.client.event.listener; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.event.ConditionalEventListener; import org.apache.dubbo.event.EventListener; import org.apache.dubbo.metadata.MetadataInfo; @@ -26,6 +27,7 @@ import org.apache.dubbo.metadata.MetadataInfo.ServiceInfo; import org.apache.dubbo.metadata.MetadataService; import org.apache.dubbo.registry.NotifyListener; import org.apache.dubbo.registry.client.DefaultServiceInstance; +import org.apache.dubbo.registry.client.InstanceAddressURL; import org.apache.dubbo.registry.client.RegistryClusterIdentifier; import org.apache.dubbo.registry.client.ServiceDiscovery; import org.apache.dubbo.registry.client.ServiceInstance; @@ -161,12 +163,13 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener RemoteMetadataServiceImpl remoteMetadataService = MetadataUtils.getRemoteMetadataService(); metadataInfo = remoteMetadataService.getMetadata(instance); } else { - MetadataService metadataServiceProxy = MetadataUtils.getMetadataServiceProxy(instance); + MetadataService metadataServiceProxy = MetadataUtils.getMetadataServiceProxy(instance, serviceDiscovery); metadataInfo = metadataServiceProxy.getMetadataInfo(ServiceInstanceMetadataUtils.getExportedServicesRevision(instance)); } } catch (Exception e) { - // TODO, load metadata backup + logger.error("Failed to load service metadata, metadta type is " + metadataType, e); metadataInfo = null; + // TODO, load metadata backup. Stop getting metadata after x times of failure for one revision? } return metadataInfo; } @@ -174,7 +177,12 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener private void notifyAddressChanged() { listeners.forEach((key, notifyListener) -> { //FIXME, group wildcard match - notifyListener.notify(serviceUrls.get(key)); + List<URL> urls = serviceUrls.get(key); + if (CollectionUtils.isEmpty(urls)) { + urls = new ArrayList<>(); + urls.add(new InstanceAddressURL()); + } + notifyListener.notify(urls); }); } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java index 2685a4a..b65dba1 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java @@ -22,6 +22,7 @@ import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.metadata.MetadataService; import org.apache.dubbo.metadata.WritableMetadataService; +import org.apache.dubbo.registry.client.ServiceDiscovery; import org.apache.dubbo.registry.client.ServiceInstance; import org.apache.dubbo.registry.client.metadata.store.RemoteMetadataServiceImpl; import org.apache.dubbo.rpc.Invoker; @@ -67,7 +68,7 @@ public class MetadataUtils { getRemoteMetadataService().publishServiceDefinition(url); } - public static MetadataService getMetadataServiceProxy(ServiceInstance instance) { + public static MetadataService getMetadataServiceProxy(ServiceInstance instance, ServiceDiscovery serviceDiscovery) { String key = instance.getServiceName() + "##" + ServiceInstanceMetadataUtils.getExportedServicesRevision(instance); return metadataServiceProxies.computeIfAbsent(key, k -> { diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/StandardMetadataServiceURLBuilder.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/StandardMetadataServiceURLBuilder.java index 026146e..6f6a501 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/StandardMetadataServiceURLBuilder.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/StandardMetadataServiceURLBuilder.java @@ -18,6 +18,7 @@ package org.apache.dubbo.registry.client.metadata; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.URLBuilder; +import org.apache.dubbo.common.config.ConfigurationUtils; import org.apache.dubbo.metadata.MetadataService; import org.apache.dubbo.registry.client.ServiceInstance; @@ -28,6 +29,9 @@ import java.util.Map; import static java.lang.String.valueOf; import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; import static org.apache.dubbo.common.constants.CommonConstants.PORT_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY; +import static org.apache.dubbo.metadata.MetadataConstants.DEFAULT_METADATA_TIMEOUT_VALUE; +import static org.apache.dubbo.metadata.MetadataConstants.METADATA_PROXY_TIMEOUT_KEY; import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.getMetadataServiceURLsParams; /** @@ -64,7 +68,8 @@ public class StandardMetadataServiceURLBuilder implements MetadataServiceURLBuil .setHost(host) .setPort(port) .setProtocol(protocol) - .setPath(MetadataService.class.getName()); + .setPath(MetadataService.class.getName()) + .addParameter(TIMEOUT_KEY, ConfigurationUtils.get(METADATA_PROXY_TIMEOUT_KEY, DEFAULT_METADATA_TIMEOUT_VALUE)); // add parameters params.forEach((name, value) -> urlBuilder.addParameter(name, valueOf(value))); diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Invocation.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Invocation.java index ac02fee..8422baf 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Invocation.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Invocation.java @@ -32,6 +32,8 @@ public interface Invocation { String getTargetServiceUniqueName(); + String getProtocolServiceKey(); + /** * get method name. * diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java index 2c3d2b8..8077067 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java @@ -800,60 +800,48 @@ public class RpcContext { } // RPC service context updated before each service call. - private String group; - private String version; - private String interfaceName; - private String protocol; - private String serviceKey; - private String protocolServiceKey; private URL consumerUrl; public String getGroup() { - return group; - } - - public void setGroup(String group) { - this.group = group; + if (consumerUrl == null) { + return null; + } + return consumerUrl.getParameter(GROUP_KEY); } public String getVersion() { - return version; - } - - public void setVersion(String version) { - this.version = version; + if (consumerUrl == null) { + return null; + } + return consumerUrl.getParameter(VERSION_KEY); } public String getInterfaceName() { - return interfaceName; - } - - public void setInterfaceName(String interfaceName) { - this.interfaceName = interfaceName; + if (consumerUrl == null) { + return null; + } + return consumerUrl.getServiceInterface(); } public String getProtocol() { - return protocol; - } - - public void setProtocol(String protocol) { - this.protocol = protocol; + if (consumerUrl == null) { + return null; + } + return consumerUrl.getParameter(PROTOCOL_KEY, DUBBO); } public String getServiceKey() { - return serviceKey; - } - - public void setServiceKey(String serviceKey) { - this.serviceKey = serviceKey; + if (consumerUrl == null) { + return null; + } + return consumerUrl.getServiceKey(); } public String getProtocolServiceKey() { - return protocolServiceKey; - } - - public void setProtocolServiceKey(String protocolServiceKey) { - this.protocolServiceKey = protocolServiceKey; + if (consumerUrl == null) { + return null; + } + return consumerUrl.getProtocolServiceKey(); } public URL getConsumerUrl() { @@ -867,11 +855,5 @@ public class RpcContext { public static void setRpcContext(URL url) { RpcContext rpcContext = RpcContext.getContext(); rpcContext.setConsumerUrl(url); - rpcContext.setInterfaceName(url.getServiceInterface()); - rpcContext.setVersion(url.getParameter(VERSION_KEY)); - rpcContext.setGroup(url.getParameter(GROUP_KEY)); - rpcContext.setProtocol(url.getParameter(PROTOCOL_KEY, DUBBO)); - rpcContext.setServiceKey(url.getServiceKey()); - rpcContext.setProtocolServiceKey(url.getProtocolServiceKey()); } } \ No newline at end of file diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java index 87baa42..4a78cd6 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java @@ -51,6 +51,7 @@ public class RpcInvocation implements Invocation, Serializable { private static final long serialVersionUID = -4355285085441097045L; private String targetServiceUniqueName; + private String protocolServiceKey; private String methodName; private String serviceName; @@ -83,8 +84,8 @@ public class RpcInvocation implements Invocation, Serializable { } public RpcInvocation(Invocation invocation, Invoker<?> invoker) { - this(invocation.getMethodName(), invocation.getServiceName(), invocation.getParameterTypes(), - invocation.getArguments(), new HashMap<>(invocation.getObjectAttachments()), + this(invocation.getMethodName(), invocation.getServiceName(), invocation.getProtocolServiceKey(), + invocation.getParameterTypes(), invocation.getArguments(), new HashMap<>(invocation.getObjectAttachments()), invocation.getInvoker(), invocation.getAttributes()); if (invoker != null) { URL url = invoker.getUrl(); @@ -109,35 +110,37 @@ public class RpcInvocation implements Invocation, Serializable { } } this.targetServiceUniqueName = invocation.getTargetServiceUniqueName(); + this.protocolServiceKey = invocation.getProtocolServiceKey(); } public RpcInvocation(Invocation invocation) { - this(invocation.getMethodName(), invocation.getServiceName(), invocation.getParameterTypes(), + this(invocation.getMethodName(), invocation.getServiceName(), invocation.getProtocolServiceKey(), invocation.getParameterTypes(), invocation.getArguments(), invocation.getObjectAttachments(), invocation.getInvoker(), invocation.getAttributes()); this.targetServiceUniqueName = invocation.getTargetServiceUniqueName(); } - public RpcInvocation(Method method, String serviceName, Object[] arguments) { - this(method, serviceName, arguments, null, null); + public RpcInvocation(Method method, String serviceName, String protocolServiceKey, Object[] arguments) { + this(method, serviceName, protocolServiceKey, arguments, null, null); } - public RpcInvocation(Method method, String serviceName, Object[] arguments, Map<String, Object> attachment, Map<Object, Object> attributes) { - this(method.getName(), serviceName, method.getParameterTypes(), arguments, attachment, null, attributes); + public RpcInvocation(Method method, String serviceName, String protocolServiceKey, Object[] arguments, Map<String, Object> attachment, Map<Object, Object> attributes) { + this(method.getName(), serviceName, protocolServiceKey, method.getParameterTypes(), arguments, attachment, null, attributes); this.returnType = method.getReturnType(); } - public RpcInvocation(String methodName, String serviceName, Class<?>[] parameterTypes, Object[] arguments) { - this(methodName, serviceName, parameterTypes, arguments, null, null, null); + public RpcInvocation(String methodName, String serviceName, String protocolServiceKey, Class<?>[] parameterTypes, Object[] arguments) { + this(methodName, serviceName, protocolServiceKey, parameterTypes, arguments, null, null, null); } - public RpcInvocation(String methodName, String serviceName, Class<?>[] parameterTypes, Object[] arguments, Map<String, Object> attachments) { - this(methodName, serviceName, parameterTypes, arguments, attachments, null, null); + public RpcInvocation(String methodName, String serviceName, String protocolServiceKey, Class<?>[] parameterTypes, Object[] arguments, Map<String, Object> attachments) { + this(methodName, serviceName, protocolServiceKey, parameterTypes, arguments, attachments, null, null); } - public RpcInvocation(String methodName, String serviceName, Class<?>[] parameterTypes, Object[] arguments, + public RpcInvocation(String methodName, String serviceName, String protocolServiceKey, Class<?>[] parameterTypes, Object[] arguments, Map<String, Object> attachments, Invoker<?> invoker, Map<Object, Object> attributes) { this.methodName = methodName; this.serviceName = serviceName; + this.protocolServiceKey = protocolServiceKey; this.parameterTypes = parameterTypes == null ? new Class<?>[0] : parameterTypes; this.arguments = arguments == null ? new Object[0] : arguments; this.attachments = attachments == null ? new HashMap<>() : attachments; @@ -194,6 +197,11 @@ public class RpcInvocation implements Invocation, Serializable { return targetServiceUniqueName; } + @Override + public String getProtocolServiceKey() { + return protocolServiceKey; + } + public void setTargetServiceUniqueName(String targetServiceUniqueName) { this.targetServiceUniqueName = targetServiceUniqueName; } diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/GenericFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/GenericFilter.java index 13c29b6..0234139 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/GenericFilter.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/GenericFilter.java @@ -140,7 +140,7 @@ public class GenericFilter implements Filter, Filter.Listener { } } - RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args, inv.getObjectAttachments(), inv.getAttributes()); + RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), invoker.getUrl().getProtocolServiceKey(), args, inv.getObjectAttachments(), inv.getAttributes()); rpcInvocation.setInvoker(inv.getInvoker()); rpcInvocation.setTargetServiceUniqueName(inv.getTargetServiceUniqueName()); diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java index 90e101d..0865f08 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java @@ -45,7 +45,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; /** - * AbstractInvoker. + * This Invoker works on Consumer side. */ public abstract class AbstractInvoker<T> implements Invoker<T> { diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java index 6d74825..1b1d592 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java @@ -33,7 +33,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; /** - * InvokerWrapper + * This Invoker works on provider side, delegates RPC to interface implementation. */ public abstract class AbstractProxyInvoker<T> implements Invoker<T> { Logger logger = LoggerFactory.getLogger(AbstractProxyInvoker.class); diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java index 0eae664..69f1d06 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java @@ -16,6 +16,7 @@ */ package org.apache.dubbo.rpc.proxy; +import org.apache.dubbo.common.URL; import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.rpc.Constants; @@ -35,10 +36,14 @@ public class InvokerInvocationHandler implements InvocationHandler { private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class); private final Invoker<?> invoker; private ConsumerModel consumerModel; + private URL url; + private String protocolServiceKey; public InvokerInvocationHandler(Invoker<?> handler) { this.invoker = handler; - String serviceKey = invoker.getUrl().getServiceKey(); + this.url = invoker.getUrl(); + String serviceKey = this.url.getServiceKey(); + this.protocolServiceKey = this.url.getProtocolServiceKey(); if (serviceKey != null) { this.consumerModel = ApplicationModel.getConsumerModel(serviceKey); } @@ -63,7 +68,7 @@ public class InvokerInvocationHandler implements InvocationHandler { } else if (parameterTypes.length == 1 && "equals".equals(methodName)) { return invoker.equals(args[0]); } - RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args); + RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), protocolServiceKey, args); String serviceKey = invoker.getUrl().getServiceKey(); rpcInvocation.setTargetServiceUniqueName(serviceKey); diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ExceptionFilterTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ExceptionFilterTest.java index 53f8921..2f32ff6 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ExceptionFilterTest.java +++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ExceptionFilterTest.java @@ -51,7 +51,7 @@ public class ExceptionFilterTest { RpcException exception = new RpcException("TestRpcException"); ExceptionFilter exceptionFilter = new ExceptionFilter(); - RpcInvocation invocation = new RpcInvocation("sayHello", DemoService.class.getName(), new Class<?>[]{String.class}, new Object[]{"world"}); + RpcInvocation invocation = new RpcInvocation("sayHello", DemoService.class.getName(), "", new Class<?>[]{String.class}, new Object[]{"world"}); Invoker<DemoService> invoker = mock(Invoker.class); given(invoker.getInterface()).willReturn(DemoService.class); given(invoker.invoke(eq(invocation))).willThrow(exception); @@ -75,7 +75,7 @@ public class ExceptionFilterTest { public void testJavaException() { ExceptionFilter exceptionFilter = new ExceptionFilter(); - RpcInvocation invocation = new RpcInvocation("sayHello", DemoService.class.getName(), new Class<?>[]{String.class}, new Object[]{"world"}); + RpcInvocation invocation = new RpcInvocation("sayHello", DemoService.class.getName(), "", new Class<?>[]{String.class}, new Object[]{"world"}); AppResponse appResponse = new AppResponse(); appResponse.setException(new IllegalArgumentException("java")); @@ -95,7 +95,7 @@ public class ExceptionFilterTest { public void testRuntimeException() { ExceptionFilter exceptionFilter = new ExceptionFilter(); - RpcInvocation invocation = new RpcInvocation("sayHello", DemoService.class.getName(), new Class<?>[]{String.class}, new Object[]{"world"}); + RpcInvocation invocation = new RpcInvocation("sayHello", DemoService.class.getName(), "", new Class<?>[]{String.class}, new Object[]{"world"}); AppResponse appResponse = new AppResponse(); appResponse.setException(new LocalException("localException")); @@ -115,7 +115,7 @@ public class ExceptionFilterTest { public void testConvertToRunTimeException() throws Exception { ExceptionFilter exceptionFilter = new ExceptionFilter(); - RpcInvocation invocation = new RpcInvocation("sayHello", DemoService.class.getName(), new Class<?>[]{String.class}, new Object[]{"world"}); + RpcInvocation invocation = new RpcInvocation("sayHello", DemoService.class.getName(), "", new Class<?>[]{String.class}, new Object[]{"world"}); AppResponse mockRpcResult = new AppResponse(); mockRpcResult.setException(new HessianException("hessian")); diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/GenericFilterTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/GenericFilterTest.java index e06d2b7..c49aa92 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/GenericFilterTest.java +++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/GenericFilterTest.java @@ -54,7 +54,7 @@ public class GenericFilterTest { person.put("name", "dubbo"); person.put("age", 10); - RpcInvocation invocation = new RpcInvocation($INVOKE, GenericService.class.getName(), genericInvoke.getParameterTypes(), + RpcInvocation invocation = new RpcInvocation($INVOKE, GenericService.class.getName(), "", genericInvoke.getParameterTypes(), new Object[]{"getPerson", new String[]{Person.class.getCanonicalName()}, new Object[]{person}}); URL url = URL.valueOf("test://test:11/org.apache.dubbo.rpc.support.DemoService?" + @@ -82,7 +82,7 @@ public class GenericFilterTest { person.put("name", "dubbo"); person.put("age", 10); - RpcInvocation invocation = new RpcInvocation($INVOKE, GenericService.class.getName(), genericInvoke.getParameterTypes(), + RpcInvocation invocation = new RpcInvocation($INVOKE, GenericService.class.getName(), "", genericInvoke.getParameterTypes(), new Object[]{"getPerson", new String[]{Person.class.getCanonicalName()}, new Object[]{person}}); invocation.setAttachment(GENERIC_KEY, GENERIC_SERIALIZATION_NATIVE_JAVA); @@ -106,7 +106,7 @@ public class GenericFilterTest { person.put("name", "dubbo"); person.put("age", 10); - RpcInvocation invocation = new RpcInvocation("sayHi", GenericService.class.getName(), genericInvoke.getParameterTypes() + RpcInvocation invocation = new RpcInvocation("sayHi", GenericService.class.getName(), "", genericInvoke.getParameterTypes() , new Object[]{"getPerson", new String[]{Person.class.getCanonicalName()}, new Object[]{person}}); URL url = URL.valueOf("test://test:11/org.apache.dubbo.rpc.support.DemoService?" + @@ -130,7 +130,7 @@ public class GenericFilterTest { person.put("name", "dubbo"); person.put("age", 10); - RpcInvocation invocation = new RpcInvocation($INVOKE, GenericService.class.getName(), genericInvoke.getParameterTypes() + RpcInvocation invocation = new RpcInvocation($INVOKE, GenericService.class.getName(), "", genericInvoke.getParameterTypes() , new Object[]{"getPerson", new String[]{Person.class.getCanonicalName()}}); URL url = URL.valueOf("test://test:11/org.apache.dubbo.rpc.support.DemoService?" + diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/GenericImplFilterTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/GenericImplFilterTest.java index a1908fb..b7828aa 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/GenericImplFilterTest.java +++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/GenericImplFilterTest.java @@ -49,7 +49,7 @@ public class GenericImplFilterTest { public void testInvoke() throws Exception { RpcInvocation invocation = new RpcInvocation("getPerson", "org.apache.dubbo.rpc.support.DemoService", - new Class[]{Person.class}, new Object[]{new Person("dubbo", 10)}); + "org.apache.dubbo.rpc.support.DemoService:dubbo", new Class[]{Person.class}, new Object[]{new Person("dubbo", 10)}); URL url = URL.valueOf("test://test:11/org.apache.dubbo.rpc.support.DemoService?" + @@ -77,7 +77,7 @@ public class GenericImplFilterTest { public void testInvokeWithException() throws Exception { RpcInvocation invocation = new RpcInvocation("getPerson", "org.apache.dubbo.rpc.support.DemoService", - new Class[]{Person.class}, new Object[]{new Person("dubbo", 10)}); + "org.apache.dubbo.rpc.support.DemoService:dubbo", new Class[]{Person.class}, new Object[]{new Person("dubbo", 10)}); URL url = URL.valueOf("test://test:11/org.apache.dubbo.rpc.support.DemoService?" + "accesslog=true&group=dubbo&version=1.1&generic=true"); @@ -104,8 +104,8 @@ public class GenericImplFilterTest { person.put("name", "dubbo"); person.put("age", 10); - RpcInvocation invocation = new RpcInvocation($INVOKE, GenericService.class.getName(), genericInvoke.getParameterTypes(), - new Object[]{"getPerson", new String[]{Person.class.getCanonicalName()}, new Object[]{person}}); + RpcInvocation invocation = new RpcInvocation($INVOKE, GenericService.class.getName(), "org.apache.dubbo.rpc.support.DemoService:dubbo", + genericInvoke.getParameterTypes(), new Object[]{"getPerson", new String[]{Person.class.getCanonicalName()}, new Object[]{person}}); URL url = URL.valueOf("test://test:11/org.apache.dubbo.rpc.support.DemoService?" + "accesslog=true&group=dubbo&version=1.1&generic=true"); diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/proxy/AbstractProxyTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/proxy/AbstractProxyTest.java index 1abaf0e..b505ee8 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/proxy/AbstractProxyTest.java +++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/proxy/AbstractProxyTest.java @@ -51,7 +51,7 @@ public abstract class AbstractProxyTest { //Assertions.assertEquals(proxy.toString(), invoker.toString()); //Assertions.assertEquals(proxy.hashCode(), invoker.hashCode()); - Assertions.assertEquals(invoker.invoke(new RpcInvocation("echo", DemoService.class.getName(), new Class[]{String.class}, new Object[]{"aa"})).getValue() + Assertions.assertEquals(invoker.invoke(new RpcInvocation("echo", DemoService.class.getName(), DemoService.class.getName() + ":dubbo", new Class[]{String.class}, new Object[]{"aa"})).getValue() , proxy.echo("aa")); } @@ -65,7 +65,7 @@ public abstract class AbstractProxyTest { Assertions.assertEquals(invoker.getInterface(), DemoService.class); - Assertions.assertEquals(invoker.invoke(new RpcInvocation("echo", DemoService.class.getName(), new Class[]{String.class}, new Object[]{"aa"})).getValue(), + Assertions.assertEquals(invoker.invoke(new RpcInvocation("echo", DemoService.class.getName(), DemoService.class.getName() + ":dubbo", new Class[]{String.class}, new Object[]{"aa"})).getValue(), origin.echo("aa")); } diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/MockInvocation.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/MockInvocation.java index bdf77b3..586c990 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/MockInvocation.java +++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/MockInvocation.java @@ -52,6 +52,11 @@ public class MockInvocation implements Invocation { return null; } + @Override + public String getProtocolServiceKey() { + return null; + } + public String getMethodName() { return "echo"; } diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/RpcUtilsTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/RpcUtilsTest.java index f70d8b6..00deb91 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/RpcUtilsTest.java +++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/RpcUtilsTest.java @@ -51,7 +51,7 @@ public class RpcUtilsTest { URL url = URL.valueOf("dubbo://localhost/?test.async=true"); Map<String, Object> attachments = new HashMap<>(); attachments.put("aa", "bb"); - Invocation inv = new RpcInvocation("test", "DemoService", new Class[]{}, new String[]{}, attachments); + Invocation inv = new RpcInvocation("test", "DemoService", "", new Class[]{}, new String[]{}, attachments); RpcUtils.attachInvocationIdIfAsync(url, inv); long id1 = RpcUtils.getInvocationId(inv); RpcUtils.attachInvocationIdIfAsync(url, inv); @@ -68,7 +68,7 @@ public class RpcUtilsTest { @Test public void testAttachInvocationIdIfAsync_sync() { URL url = URL.valueOf("dubbo://localhost/"); - Invocation inv = new RpcInvocation("test", "DemoService", new Class[]{}, new String[]{}); + Invocation inv = new RpcInvocation("test", "DemoService", "", new Class[]{}, new String[]{}); RpcUtils.attachInvocationIdIfAsync(url, inv); assertNull(RpcUtils.getInvocationId(inv)); } @@ -80,7 +80,7 @@ public class RpcUtilsTest { @Test public void testAttachInvocationIdIfAsync_nullAttachments() { URL url = URL.valueOf("dubbo://localhost/?test.async=true"); - Invocation inv = new RpcInvocation("test", "DemoService", new Class[]{}, new String[]{}); + Invocation inv = new RpcInvocation("test", "DemoService", "", new Class[]{}, new String[]{}); RpcUtils.attachInvocationIdIfAsync(url, inv); assertTrue(RpcUtils.getInvocationId(inv) >= 0L); } @@ -92,7 +92,7 @@ public class RpcUtilsTest { @Test public void testAttachInvocationIdIfAsync_forceNotAttache() { URL url = URL.valueOf("dubbo://localhost/?test.async=true&" + AUTO_ATTACH_INVOCATIONID_KEY + "=false"); - Invocation inv = new RpcInvocation("test", "DemoService", new Class[]{}, new String[]{}); + Invocation inv = new RpcInvocation("test", "DemoService", "", new Class[]{}, new String[]{}); RpcUtils.attachInvocationIdIfAsync(url, inv); assertNull(RpcUtils.getInvocationId(inv)); } @@ -104,7 +104,7 @@ public class RpcUtilsTest { @Test public void testAttachInvocationIdIfAsync_forceAttache() { URL url = URL.valueOf("dubbo://localhost/?" + AUTO_ATTACH_INVOCATIONID_KEY + "=true"); - Invocation inv = new RpcInvocation("test", "DemoService", new Class[]{}, new String[]{}); + Invocation inv = new RpcInvocation("test", "DemoService", "", new Class[]{}, new String[]{}); RpcUtils.attachInvocationIdIfAsync(url, inv); assertNotNull(RpcUtils.getInvocationId(inv)); } @@ -117,30 +117,30 @@ public class RpcUtilsTest { given(invoker.getUrl()).willReturn(URL.valueOf("test://127.0.0.1:1/org.apache.dubbo.rpc.support.DemoService?interface=org.apache.dubbo.rpc.support.DemoService")); // void sayHello(String name); - RpcInvocation inv = new RpcInvocation("sayHello", serviceName, new Class<?>[]{String.class}, null, null, invoker, null); + RpcInvocation inv = new RpcInvocation("sayHello", serviceName, "", new Class<?>[]{String.class}, null, null, invoker, null); Class<?> returnType = RpcUtils.getReturnType(inv); Assertions.assertNull(returnType); //String echo(String text); - RpcInvocation inv1 = new RpcInvocation("echo", serviceName, new Class<?>[]{String.class}, null, null, invoker, null); + RpcInvocation inv1 = new RpcInvocation("echo", serviceName, "", new Class<?>[]{String.class}, null, null, invoker, null); Class<?> returnType1 = RpcUtils.getReturnType(inv1); Assertions.assertNotNull(returnType1); Assertions.assertEquals(String.class, returnType1); //int getSize(String[] strs); - RpcInvocation inv2 = new RpcInvocation("getSize", serviceName, new Class<?>[]{String[].class}, null, null, invoker, null); + RpcInvocation inv2 = new RpcInvocation("getSize", serviceName, "", new Class<?>[]{String[].class}, null, null, invoker, null); Class<?> returnType2 = RpcUtils.getReturnType(inv2); Assertions.assertNotNull(returnType2); Assertions.assertEquals(int.class, returnType2); //Person getPerson(Person person); - RpcInvocation inv3 = new RpcInvocation("getPerson", serviceName, new Class<?>[]{Person.class}, null, null, invoker, null); + RpcInvocation inv3 = new RpcInvocation("getPerson", serviceName, "", new Class<?>[]{Person.class}, null, null, invoker, null); Class<?> returnType3 = RpcUtils.getReturnType(inv3); Assertions.assertNotNull(returnType3); Assertions.assertEquals(Person.class, returnType3); //List<String> testReturnType1(String str); - RpcInvocation inv4 = new RpcInvocation("testReturnType1", serviceName, new Class<?>[]{String.class}, null, null, invoker, null); + RpcInvocation inv4 = new RpcInvocation("testReturnType1", serviceName, "", new Class<?>[]{String.class}, null, null, invoker, null); Class<?> returnType4 = RpcUtils.getReturnType(inv4); Assertions.assertNotNull(returnType4); Assertions.assertEquals(List.class, returnType4); @@ -154,7 +154,7 @@ public class RpcUtilsTest { Invoker invoker = mock(Invoker.class); given(invoker.getUrl()).willReturn(URL.valueOf("test://127.0.0.1:1/org.apache.dubbo.rpc.support.DemoService?interface=org.apache.dubbo.rpc.support.DemoService")); - RpcInvocation inv = new RpcInvocation("testReturnType", serviceName, new Class<?>[]{String.class}, null, null, invoker, null); + RpcInvocation inv = new RpcInvocation("testReturnType", serviceName, "", new Class<?>[]{String.class}, null, null, invoker, null); Type[] types = RpcUtils.getReturnTypes(inv); Assertions.assertNotNull(types); Assertions.assertEquals(2, types.length); @@ -162,7 +162,7 @@ public class RpcUtilsTest { Assertions.assertEquals(String.class, types[1]); Assertions.assertArrayEquals(types, inv.getReturnTypes()); - RpcInvocation inv1 = new RpcInvocation("testReturnType1", serviceName, new Class<?>[]{String.class}, null, null, invoker, null); + RpcInvocation inv1 = new RpcInvocation("testReturnType1", serviceName, "", new Class<?>[]{String.class}, null, null, invoker, null); java.lang.reflect.Type[] types1 = RpcUtils.getReturnTypes(inv1); Assertions.assertNotNull(types1); Assertions.assertEquals(2, types1.length); @@ -170,7 +170,7 @@ public class RpcUtilsTest { Assertions.assertEquals(demoServiceClass.getMethod("testReturnType1", String.class).getGenericReturnType(), types1[1]); Assertions.assertArrayEquals(types1, inv1.getReturnTypes()); - RpcInvocation inv2 = new RpcInvocation("testReturnType2", serviceName, new Class<?>[]{String.class}, null, null, invoker, null); + RpcInvocation inv2 = new RpcInvocation("testReturnType2", serviceName, "", new Class<?>[]{String.class}, null, null, invoker, null); java.lang.reflect.Type[] types2 = RpcUtils.getReturnTypes(inv2); Assertions.assertNotNull(types2); Assertions.assertEquals(2, types2.length); @@ -178,7 +178,7 @@ public class RpcUtilsTest { Assertions.assertEquals(String.class, types2[1]); Assertions.assertArrayEquals(types2, inv2.getReturnTypes()); - RpcInvocation inv3 = new RpcInvocation("testReturnType3", serviceName, new Class<?>[]{String.class}, null, null, invoker, null); + RpcInvocation inv3 = new RpcInvocation("testReturnType3", serviceName, "", new Class<?>[]{String.class}, null, null, invoker, null); java.lang.reflect.Type[] types3 = RpcUtils.getReturnTypes(inv3); Assertions.assertNotNull(types3); Assertions.assertEquals(2, types3.length); @@ -187,7 +187,7 @@ public class RpcUtilsTest { Assertions.assertEquals(((ParameterizedType) genericReturnType3).getActualTypeArguments()[0], types3[1]); Assertions.assertArrayEquals(types3, inv3.getReturnTypes()); - RpcInvocation inv4 = new RpcInvocation("testReturnType4", serviceName, new Class<?>[]{String.class}, null, null, invoker, null); + RpcInvocation inv4 = new RpcInvocation("testReturnType4", serviceName, "", new Class<?>[]{String.class}, null, null, invoker, null); java.lang.reflect.Type[] types4 = RpcUtils.getReturnTypes(inv4); Assertions.assertNotNull(types4); Assertions.assertEquals(2, types4.length); @@ -195,7 +195,7 @@ public class RpcUtilsTest { Assertions.assertNull(types4[1]); Assertions.assertArrayEquals(types4, inv4.getReturnTypes()); - RpcInvocation inv5 = new RpcInvocation("testReturnType5", serviceName, new Class<?>[]{String.class}, null, null, invoker, null); + RpcInvocation inv5 = new RpcInvocation("testReturnType5", serviceName, "", new Class<?>[]{String.class}, null, null, invoker, null); java.lang.reflect.Type[] types5 = RpcUtils.getReturnTypes(inv5); Assertions.assertNotNull(types5); Assertions.assertEquals(2, types5.length); @@ -212,7 +212,7 @@ public class RpcUtilsTest { Invoker invoker = mock(Invoker.class); // void sayHello(String name); - RpcInvocation inv1 = new RpcInvocation("sayHello", serviceName, + RpcInvocation inv1 = new RpcInvocation("sayHello", serviceName, "", new Class<?>[]{String.class}, null, null, invoker, null); Class<?>[] parameterTypes1 = RpcUtils.getParameterTypes(inv1); Assertions.assertNotNull(parameterTypes1); @@ -220,12 +220,12 @@ public class RpcUtilsTest { Assertions.assertEquals(String.class, parameterTypes1[0]); //long timestamp(); - RpcInvocation inv2 = new RpcInvocation("timestamp", serviceName, null, null, null, invoker, null); + RpcInvocation inv2 = new RpcInvocation("timestamp", serviceName, "", null, null, null, invoker, null); Class<?>[] parameterTypes2 = RpcUtils.getParameterTypes(inv2); Assertions.assertEquals(0, parameterTypes2.length); //Type enumlength(Type... types); - RpcInvocation inv3 = new RpcInvocation("enumlength", serviceName, + RpcInvocation inv3 = new RpcInvocation("enumlength", serviceName, "", new Class<?>[]{Type.class, Type.class}, null, null, invoker, null); Class<?>[] parameterTypes3 = RpcUtils.getParameterTypes(inv3); Assertions.assertNotNull(parameterTypes3); @@ -234,7 +234,7 @@ public class RpcUtilsTest { Assertions.assertEquals(Type.class, parameterTypes3[1]); //byte getbyte(byte arg); - RpcInvocation inv4 = new RpcInvocation("getbyte", serviceName, + RpcInvocation inv4 = new RpcInvocation("getbyte", serviceName, "", new Class<?>[]{byte.class}, null, null, invoker, null); Class<?>[] parameterTypes4 = RpcUtils.getParameterTypes(inv4); Assertions.assertNotNull(parameterTypes4); @@ -242,7 +242,7 @@ public class RpcUtilsTest { Assertions.assertEquals(byte.class, parameterTypes4[0]); //void $invoke(String s1, String s2); - RpcInvocation inv5 = new RpcInvocation("$invoke", serviceName, + RpcInvocation inv5 = new RpcInvocation("$invoke", serviceName, "", new Class<?>[]{String.class, String[].class}, new Object[]{"method", new String[]{"java.lang.String", "void", "java.lang.Object"}}, null, invoker, null); @@ -265,7 +265,7 @@ public class RpcUtilsTest { String serviceName = demoServiceClass.getName(); Invoker invoker = mock(Invoker.class); - RpcInvocation inv1 = new RpcInvocation(methodName, serviceName, + RpcInvocation inv1 = new RpcInvocation(methodName, serviceName, "", new Class<?>[]{String.class}, null, null, invoker, null); String actual = RpcUtils.getMethodName(inv1); Assertions.assertNotNull(actual); @@ -283,7 +283,7 @@ public class RpcUtilsTest { String serviceName = demoServiceClass.getName(); Invoker invoker = mock(Invoker.class); - RpcInvocation inv = new RpcInvocation("$invoke", serviceName, + RpcInvocation inv = new RpcInvocation("$invoke", serviceName, "", new Class<?>[]{String.class, String[].class}, new Object[]{method, new String[]{"java.lang.String", "void", "java.lang.Object"}}, null, invoker, null); @@ -300,7 +300,7 @@ public class RpcUtilsTest { String serviceName = demoServiceClass.getName(); Invoker invoker = mock(Invoker.class); - RpcInvocation inv = new RpcInvocation("$invoke", serviceName, + RpcInvocation inv = new RpcInvocation("$invoke", serviceName, "", new Class<?>[]{String.class, String[].class, Object[].class}, new Object[]{"method", new String[]{}, args}, null, invoker, null); diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java index 7a60e0e..6502e8d 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java @@ -63,11 +63,11 @@ class CallbackServiceCodec { private static final byte CALLBACK_DESTROY = 0x2; private static final String INV_ATT_CALLBACK_KEY = "sys_callback_arg-"; - private static byte isCallBack(URL url, String methodName, int argIndex) { + private static byte isCallBack(URL url, String protocolServiceKey, String methodName, int argIndex) { // parameter callback rule: method-name.parameter-index(starting from 0).callback byte isCallback = CALLBACK_NONE; - if (url != null && url.hasMethodParameter(methodName)) { - String callback = url.getParameter(methodName + "." + argIndex + ".callback"); + if (url != null && url.hasServiceMethodParameter(protocolServiceKey, methodName)) { + String callback = url.getServiceParameter(protocolServiceKey, methodName + "." + argIndex + ".callback"); if (callback != null) { if ("true".equalsIgnoreCase(callback)) { isCallback = CALLBACK_CREATE; @@ -266,7 +266,7 @@ class CallbackServiceCodec { public static Object encodeInvocationArgument(Channel channel, RpcInvocation inv, int paraIndex) throws IOException { // get URL directly URL url = inv.getInvoker() == null ? null : inv.getInvoker().getUrl(); - byte callbackStatus = isCallBack(url, inv.getMethodName(), paraIndex); + byte callbackStatus = isCallBack(url, inv.getProtocolServiceKey(), inv.getMethodName(), paraIndex); Object[] args = inv.getArguments(); Class<?>[] pts = inv.getParameterTypes(); switch (callbackStatus) { @@ -293,7 +293,7 @@ class CallbackServiceCodec { } return inObject; } - byte callbackstatus = isCallBack(url, inv.getMethodName(), paraIndex); + byte callbackstatus = isCallBack(url, inv.getProtocolServiceKey(), inv.getMethodName(), paraIndex); switch (callbackstatus) { case CallbackServiceCodec.CALLBACK_CREATE: try { diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index e095523..791df67 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -195,7 +195,7 @@ public class DubboProtocol extends AbstractProtocol { return null; } - RpcInvocation invocation = new RpcInvocation(method, url.getParameter(INTERFACE_KEY), new Class<?>[0], new Object[0]); + RpcInvocation invocation = new RpcInvocation(method, url.getParameter(INTERFACE_KEY), "", new Class<?>[0], new Object[0]); invocation.setAttachment(PATH_KEY, url.getPath()); invocation.setAttachment(GROUP_KEY, url.getParameter(GROUP_KEY)); invocation.setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY)); @@ -572,10 +572,9 @@ public class DubboProtocol extends AbstractProtocol { // client type setting. String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT)); -// url = url.addParameter(CODEC_KEY, DubboCodec.NAME); + url = url.addParameter(CODEC_KEY, DubboCodec.NAME); // enable heartbeat by default - // FIXME, -// url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT)); + url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT)); // BIO is not allowed since it has severe performance issue. if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java index 9229363..8c4d07b 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java @@ -19,7 +19,6 @@ package org.apache.dubbo.rpc.protocol.dubbo; import org.apache.dubbo.common.Parameters; import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.URLBuilder; import org.apache.dubbo.remoting.ChannelHandler; import org.apache.dubbo.remoting.RemotingException; import org.apache.dubbo.remoting.exchange.ExchangeClient; @@ -181,14 +180,10 @@ final class ReferenceCountExchangeClient implements ExchangeClient { */ private void replaceWithLazyClient() { // this is a defensive operation to avoid client is closed by accident, the initial state of the client is false - URL lazyUrl = URLBuilder.from(url) - .addParameter(LAZY_CONNECT_INITIAL_STATE_KEY, Boolean.TRUE) + URL lazyUrl = url.addParameter(LAZY_CONNECT_INITIAL_STATE_KEY, Boolean.TRUE) .addParameter(RECONNECT_KEY, Boolean.FALSE) .addParameter(SEND_RECONNECT_KEY, Boolean.TRUE.toString()) - .addParameter("warning", Boolean.TRUE.toString()) - .addParameter(LazyConnectExchangeClient.REQUEST_WITH_WARNING_KEY, true) - .addParameter("_client_memo", "referencecounthandler.replacewithlazyclient") - .build(); + .addParameter(LazyConnectExchangeClient.REQUEST_WITH_WARNING_KEY, true); /** * the order of judgment in the if statement cannot be changed.
