This is an automated email from the ASF dual-hosted git repository. liujun pushed a commit to branch 3.0 in repository https://gitbox.apache.org/repos/asf/dubbo.git
commit 552112ed0157a0360f1499e7b04152047b350472 Author: ken.lj <[email protected]> AuthorDate: Tue Mar 16 23:18:43 2021 +0800 Revert "3.0 enhancement, do not merge (#7381)" (#7387) This reverts commit d6c9bf69e00b468d7415a0c6db102f5bc0e2136b. --- .../org/apache/dubbo/rpc/cluster/Directory.java | 4 - .../dubbo/rpc/cluster/filter/ClusterFilter.java | 24 --- .../cluster/filter/DefaultFilterChainBuilder.java | 10 +- .../rpc/cluster/filter/FilterChainBuilder.java | 36 +--- .../rpc/cluster/filter/ProtocolFilterWrapper.java | 9 +- .../cluster/interceptor/ClusterInterceptor.java | 1 - .../ConsumerContextClusterInterceptor.java | 60 +++++++ .../ZoneAwareClusterInterceptor.java} | 19 +-- .../cluster/support/wrapper/AbstractCluster.java | 190 ++++++++++----------- ...g.apache.dubbo.rpc.cluster.filter.ClusterFilter | 2 - ...ubbo.rpc.cluster.interceptor.ClusterInterceptor | 2 + dubbo-common/pom.xml | 4 - .../java/org/apache/dubbo/common/URLStrParser.java | 9 +- .../apache/dubbo/common/config/Environment.java | 20 --- .../dubbo/common/constants/CommonConstants.java | 6 - .../dubbo/common/url/component/URLAddress.java | 2 +- .../dubbo/common/url/component/URLItemCache.java | 44 ++--- .../dubbo/common/url/component/URLParam.java | 4 +- .../org/apache/dubbo/common/utils/ConfigUtils.java | 47 ----- .../org/apache/dubbo/config/AbstractConfig.java | 3 +- .../java/org/apache/dubbo/config/Constants.java | 2 - .../dubbo/config/bootstrap/DubboBootstrap.java | 3 +- .../ServiceInstanceHostPortCustomizer.java | 5 +- .../apache/dubbo/config/AbstractConfigTest.java | 4 +- .../src/main/resources/dubbo-migration.yaml | 3 - dubbo-dependencies-bom/pom.xml | 6 - dubbo-distribution/dubbo-all/pom.xml | 8 - .../org/apache/dubbo/metadata/MetadataInfo.java | 4 - .../dubbo/monitor/support/MonitorFilter.java | 3 +- .../dubbo/auth/filter/ConsumerSignFilter.java | 2 +- .../org/apache/dubbo/registry/NotifyListener.java | 4 - .../registry/client/DefaultServiceInstance.java | 58 +++---- .../client/EventPublishingServiceDiscovery.java | 15 -- .../client/FileSystemServiceDiscovery.java | 2 +- .../client/SelfHostMetaServiceDiscovery.java | 2 +- .../dubbo/registry/client/ServiceDiscovery.java | 5 - .../registry/client/ServiceDiscoveryRegistry.java | 2 +- .../client/ServiceDiscoveryRegistryDirectory.java | 5 +- .../dubbo/registry/client/ServiceInstance.java | 13 +- .../listener/ServiceInstancesChangedListener.java | 117 ++++++------- .../registry/client/metadata/MetadataUtils.java | 2 +- .../metadata/store/RemoteMetadataServiceImpl.java | 2 +- .../DefaultMigrationAddressComparator.java | 4 +- .../client/migration/MigrationInvoker.java | 89 +++++----- .../client/migration/MigrationRuleHandler.java | 4 +- .../client/migration/MigrationRuleListener.java | 8 +- .../registry/integration/DynamicDirectory.java | 8 +- .../client/DefaultServiceInstanceTest.java | 3 +- .../client/FileSystemServiceDiscoveryTest.java | 2 - .../multiple/MultipleServiceDiscovery.java | 38 ++--- .../nacos/util/NacosNamingServiceUtils.java | 4 +- .../zookeeper/ZookeeperServiceDiscovery.java | 34 ++-- .../ZookeeperServiceDiscoveryChangeWatcher.java | 38 +---- .../zookeeper/util/CuratorFrameworkUtils.java | 2 +- .../zookeeper/ZookeeperServiceDiscoveryTest.java | 3 +- .../main/java/org/apache/dubbo/rpc/BaseFilter.java | 31 ---- .../src/main/java/org/apache/dubbo/rpc/Filter.java | 40 ++--- .../dubbo/rpc/filter}/ConsumerContextFilter.java | 26 +-- .../apache/dubbo/rpc/protocol/AbstractInvoker.java | 2 +- .../dubbo/internal/org.apache.dubbo.rpc.Filter | 2 + .../rpc/protocol/dubbo/filter/FutureFilter.java | 4 +- .../dubbo/internal/org.apache.dubbo.rpc.Filter | 3 +- ...g.apache.dubbo.rpc.cluster.filter.ClusterFilter | 1 - .../dubbo/rpc/protocol/dubbo/FutureFilterTest.java | 4 +- 64 files changed, 425 insertions(+), 688 deletions(-) diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Directory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Directory.java index 5a92d97..9fd3ca2 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Directory.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Directory.java @@ -65,8 +65,4 @@ public interface Directory<T> extends Node { void discordAddresses(); RouterChain<T> getRouterChain(); - - default boolean isNotificationReceived() { - return false; - } } \ No newline at end of file diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ClusterFilter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ClusterFilter.java deleted file mode 100644 index 7d48dc9..0000000 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ClusterFilter.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.rpc.cluster.filter; - -import org.apache.dubbo.common.extension.SPI; -import org.apache.dubbo.rpc.BaseFilter; - -@SPI -public interface ClusterFilter extends BaseFilter { -} diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilder.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilder.java index e2e26d2..982008b 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilder.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilder.java @@ -27,9 +27,6 @@ import java.util.List; @Activate(order = 0) public class DefaultFilterChainBuilder implements FilterChainBuilder { - /** - * build consumer/provider filter chain - */ @Override public <T> Invoker<T> buildInvokerChain(final Invoker<T> originalInvoker, String key, String group) { Invoker<T> last = originalInvoker; @@ -46,17 +43,14 @@ public class DefaultFilterChainBuilder implements FilterChainBuilder { return last; } - /** - * build consumer cluster filter chain - */ @Override public <T> ClusterInvoker<T> buildClusterInvokerChain(final ClusterInvoker<T> originalInvoker, String key, String group) { ClusterInvoker<T> last = originalInvoker; - List<ClusterFilter> filters = ExtensionLoader.getExtensionLoader(ClusterFilter.class).getActivateExtension(originalInvoker.getUrl(), key, group); + List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(originalInvoker.getUrl(), key, group); if (!filters.isEmpty()) { for (int i = filters.size() - 1; i >= 0; i--) { - final ClusterFilter filter = filters.get(i); + final Filter filter = filters.get(i); final Invoker<T> next = last; last = new ClusterFilterChainNode<>(originalInvoker, next, filter); } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/FilterChainBuilder.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/FilterChainBuilder.java index 949a66c..20275e2 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/FilterChainBuilder.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/FilterChainBuilder.java @@ -18,7 +18,6 @@ package org.apache.dubbo.rpc.cluster.filter; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.extension.SPI; -import org.apache.dubbo.rpc.BaseFilter; import org.apache.dubbo.rpc.Filter; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; @@ -30,27 +29,16 @@ import org.apache.dubbo.rpc.cluster.Directory; @SPI("default") public interface FilterChainBuilder { - /** - * build consumer/provider filter chain - */ <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group); - /** - * build consumer cluster filter chain - */ <T> ClusterInvoker<T> buildClusterInvokerChain(final ClusterInvoker<T> invoker, String key, String group); - /** - * Works on provider side - * @param <T> - * @param <TYPE> - */ - class FilterChainNode<T, TYPE extends Invoker<T>, FILTER extends BaseFilter> implements Invoker<T>{ + class FilterChainNode<T, TYPE extends Invoker<T>> implements Invoker<T>{ TYPE originalInvoker; Invoker<T> nextNode; - FILTER filter; + Filter filter; - public FilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, FILTER filter) { + public FilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, Filter filter) { this.originalInvoker = originalInvoker; this.nextNode = nextNode; this.filter = filter; @@ -91,8 +79,8 @@ public interface FilterChainBuilder { } finally { listenableFilter.removeListener(invocation); } - } else if (filter instanceof FILTER.Listener) { - FILTER.Listener listener = (FILTER.Listener) filter; + } else if (filter instanceof Filter.Listener) { + Filter.Listener listener = (Filter.Listener) filter; listener.onError(e, originalInvoker, invocation); } throw e; @@ -114,8 +102,8 @@ public interface FilterChainBuilder { } finally { listenableFilter.removeListener(invocation); } - } else if (filter instanceof FILTER.Listener) { - FILTER.Listener listener = (FILTER.Listener) filter; + } else if (filter instanceof Filter.Listener) { + Filter.Listener listener = (Filter.Listener) filter; if (t == null) { listener.onResponse(r, originalInvoker, invocation); } else { @@ -136,14 +124,8 @@ public interface FilterChainBuilder { } } - /** - * Works on consumer side - * @param <T> - * @param <TYPE> - */ - class ClusterFilterChainNode<T, TYPE extends ClusterInvoker<T>, FILTER extends BaseFilter> - extends FilterChainNode<T, TYPE, FILTER> implements ClusterInvoker<T> { - public ClusterFilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, FILTER filter) { + class ClusterFilterChainNode<T, TYPE extends ClusterInvoker<T>> extends FilterChainNode<T, TYPE> implements ClusterInvoker<T> { + public ClusterFilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, Filter filter) { super(originalInvoker, nextNode, filter); } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ProtocolFilterWrapper.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ProtocolFilterWrapper.java index 389b173..65e8813 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ProtocolFilterWrapper.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/ProtocolFilterWrapper.java @@ -28,9 +28,10 @@ import org.apache.dubbo.rpc.ProtocolServer; import org.apache.dubbo.rpc.RpcException; import java.util.List; +import java.util.Objects; -import static org.apache.dubbo.common.constants.CommonConstants.REFERENCE_FILTER_KEY; import static org.apache.dubbo.common.constants.CommonConstants.SERVICE_FILTER_KEY; +import static org.apache.dubbo.rpc.cluster.Constants.PEER_KEY; /** * ListenerProtocol @@ -67,7 +68,11 @@ public class ProtocolFilterWrapper implements Protocol { if (UrlUtils.isRegistry(url)) { return protocol.refer(type, url); } - return builder.buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER); + // if it's peer-to-peer url + if (!Objects.isNull(url.getAttribute(PEER_KEY))) { + return builder.buildInvokerChain(protocol.refer(type, url), SERVICE_FILTER_KEY, CommonConstants.CONSUMER); + } + return protocol.refer(type, url); } @Override diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ClusterInterceptor.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ClusterInterceptor.java index 821dd2e..199361f 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ClusterInterceptor.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ClusterInterceptor.java @@ -26,7 +26,6 @@ import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker; /** * Different from {@link Filter}, ClusterInterceptor works at the outmost layer, before one specific address/invoker is picked. */ -@Deprecated @SPI public interface ClusterInterceptor { diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ConsumerContextClusterInterceptor.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ConsumerContextClusterInterceptor.java new file mode 100644 index 0000000..053bc87 --- /dev/null +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ConsumerContextClusterInterceptor.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.cluster.interceptor; + +import org.apache.dubbo.common.extension.Activate; +import org.apache.dubbo.common.utils.NetUtils; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcContext; +import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.RpcInvocation; +import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker; + +@Activate +public class ConsumerContextClusterInterceptor implements ClusterInterceptor, ClusterInterceptor.Listener { + + @Override + public void before(AbstractClusterInvoker<?> invoker, Invocation invocation) { + RpcContext context = RpcContext.getContext(); + context.setInvocation(invocation).setLocalAddress(NetUtils.getLocalHost(), 0); + if (invocation instanceof RpcInvocation) { + ((RpcInvocation) invocation).setInvoker(invoker); + } + RpcContext.removeServerContext(); + } + + @Override + public void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) { + RpcContext.removeContext(true); + } + + @Override + public Result intercept(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) throws RpcException { + return clusterInvoker.invoke(invocation); + } + + @Override + public void onMessage(Result appResponse, AbstractClusterInvoker<?> invoker, Invocation invocation) { + RpcContext.getServerContext().setObjectAttachments(appResponse.getObjectAttachments()); + } + + @Override + public void onError(Throwable t, AbstractClusterInvoker<?> invoker, Invocation invocation) { + + } +} diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ZoneAwareFilter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ZoneAwareClusterInterceptor.java similarity index 80% rename from dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ZoneAwareFilter.java rename to dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ZoneAwareClusterInterceptor.java index cd0a7ab..6daec08 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ZoneAwareFilter.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ZoneAwareClusterInterceptor.java @@ -14,19 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dubbo.rpc.cluster.filter.support; +package org.apache.dubbo.rpc.cluster.interceptor; -import org.apache.dubbo.common.constants.CommonConstants; import org.apache.dubbo.common.extension.Activate; import org.apache.dubbo.common.extension.ExtensionLoader; import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.rpc.Invocation; -import org.apache.dubbo.rpc.Invoker; -import org.apache.dubbo.rpc.Result; import org.apache.dubbo.rpc.RpcContext; -import org.apache.dubbo.rpc.RpcException; import org.apache.dubbo.rpc.ZoneDetector; -import org.apache.dubbo.rpc.cluster.filter.ClusterFilter; +import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker; import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_ZONE; import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_ZONE_FORCE; @@ -36,11 +32,11 @@ import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_ZONE_ * * active only when url has key 'cluster=zone-aware' */ -@Activate(group = CommonConstants.CONSUMER, value = "cluster:zone-aware") -public class ZoneAwareFilter implements ClusterFilter { +@Activate(value = "cluster:zone-aware") +public class ZoneAwareClusterInterceptor implements ClusterInterceptor { @Override - public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { + public void before(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) { RpcContext rpcContext = RpcContext.getContext(); String zone = (String) rpcContext.getAttachment(REGISTRY_ZONE); String force = (String) rpcContext.getAttachment(REGISTRY_ZONE_FORCE); @@ -57,7 +53,10 @@ public class ZoneAwareFilter implements ClusterFilter { if (StringUtils.isNotEmpty(force)) { invocation.setAttachment(REGISTRY_ZONE_FORCE, force); } + } + + @Override + public void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) { - return invoker.invoke(invocation); } } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractCluster.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractCluster.java index 1c2d047..9ce920d 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractCluster.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractCluster.java @@ -17,7 +17,6 @@ package org.apache.dubbo.rpc.cluster.support.wrapper; import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.config.ConfigurationUtils; import org.apache.dubbo.common.constants.CommonConstants; import org.apache.dubbo.common.extension.ExtensionLoader; import org.apache.dubbo.common.utils.CollectionUtils; @@ -37,7 +36,6 @@ import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker; import java.util.List; -import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_INTERCEPTOR_COMPATIBLE_KEY; import static org.apache.dubbo.common.constants.CommonConstants.INVOCATION_INTERCEPTOR_KEY; import static org.apache.dubbo.common.constants.CommonConstants.REFERENCE_FILTER_KEY; import static org.apache.dubbo.common.constants.CommonConstants.REFERENCE_INTERCEPTOR_KEY; @@ -46,10 +44,15 @@ public abstract class AbstractCluster implements Cluster { private <T> Invoker<T> buildClusterInterceptors(AbstractClusterInvoker<T> clusterInvoker, String key) { // AbstractClusterInvoker<T> last = clusterInvoker; - AbstractClusterInvoker<T> last = buildInterceptorInvoker(new ClusterFilterInvoker<>(clusterInvoker)); + AbstractClusterInvoker<T> last = buildInterceptorInvoker(new FilterInvoker<>(clusterInvoker)); + List<ClusterInterceptor> interceptors = ExtensionLoader.getExtensionLoader(ClusterInterceptor.class).getActivateExtensions(); - if (Boolean.parseBoolean(ConfigurationUtils.getProperty(CLUSTER_INTERCEPTOR_COMPATIBLE_KEY, "false"))) { - return build27xCompatibleClusterInterceptors(clusterInvoker, last); + if (!interceptors.isEmpty()) { + for (int i = interceptors.size() - 1; i >= 0; i--) { + final ClusterInterceptor interceptor = interceptors.get(i); + final AbstractClusterInvoker<T> next = last; + last = new InterceptorInvokerNode<>(clusterInvoker, interceptor, next); + } } return last; } @@ -67,15 +70,90 @@ public abstract class AbstractCluster implements Cluster { if (CollectionUtils.isEmpty(builders)) { return invoker; } - return new InvocationInterceptorInvoker<>(invoker, builders); + return new InterceptorInvoker<>(invoker, builders); } protected abstract <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException; - static class ClusterFilterInvoker<T> extends AbstractClusterInvoker<T> { + static class InterceptorInvokerNode<T> extends AbstractClusterInvoker<T> { + + private AbstractClusterInvoker<T> clusterInvoker; + private ClusterInterceptor interceptor; + private AbstractClusterInvoker<T> next; + + public InterceptorInvokerNode(AbstractClusterInvoker<T> clusterInvoker, + ClusterInterceptor interceptor, + AbstractClusterInvoker<T> next) { + this.clusterInvoker = clusterInvoker; + this.interceptor = interceptor; + this.next = next; + } + + @Override + public Class<T> getInterface() { + return clusterInvoker.getInterface(); + } + + @Override + public URL getUrl() { + return clusterInvoker.getUrl(); + } + + @Override + public boolean isAvailable() { + return clusterInvoker.isAvailable(); + } + + @Override + public Result invoke(Invocation invocation) throws RpcException { + Result asyncResult; + try { + interceptor.before(next, invocation); + asyncResult = interceptor.intercept(next, invocation); + } catch (Exception e) { + // onError callback + if (interceptor instanceof ClusterInterceptor.Listener) { + ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor; + listener.onError(e, clusterInvoker, invocation); + } + throw e; + } finally { + interceptor.after(next, invocation); + } + return asyncResult.whenCompleteWithContext((r, t) -> { + // onResponse callback + if (interceptor instanceof ClusterInterceptor.Listener) { + ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor; + if (t == null) { + listener.onMessage(r, clusterInvoker, invocation); + } else { + listener.onError(t, clusterInvoker, invocation); + } + } + }); + } + + @Override + public void destroy() { + clusterInvoker.destroy(); + } + + @Override + public String toString() { + return clusterInvoker.toString(); + } + + @Override + protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { + // The only purpose is to build a interceptor chain, so the cluster related logic doesn't matter. + return null; + } + } + + static class FilterInvoker<T> extends AbstractClusterInvoker<T> { private ClusterInvoker<T> filterInvoker; - public ClusterFilterInvoker(AbstractClusterInvoker<T> invoker) { + public FilterInvoker(AbstractClusterInvoker<T> invoker) { List<FilterChainBuilder> builders = ExtensionLoader.getExtensionLoader(FilterChainBuilder.class).getActivateExtensions(); if (CollectionUtils.isEmpty(builders)) { filterInvoker = invoker; @@ -125,10 +203,10 @@ public abstract class AbstractCluster implements Cluster { } } - static class InvocationInterceptorInvoker<T> extends AbstractClusterInvoker<T> { + static class InterceptorInvoker<T> extends AbstractClusterInvoker<T> { private ClusterInvoker<T> interceptorInvoker; - public InvocationInterceptorInvoker(AbstractClusterInvoker<T> invoker, List<InvocationInterceptorBuilder> builders) { + public InterceptorInvoker(AbstractClusterInvoker<T> invoker, List<InvocationInterceptorBuilder> builders) { ClusterInvoker<T> tmpInvoker = invoker; for (InvocationInterceptorBuilder builder : builders) { tmpInvoker = builder.buildClusterInterceptorChain(tmpInvoker, INVOCATION_INTERCEPTOR_KEY, CommonConstants.CONSUMER); @@ -170,96 +248,4 @@ public abstract class AbstractCluster implements Cluster { return null; } } - - @Deprecated - private <T> ClusterInvoker<T> build27xCompatibleClusterInterceptors(AbstractClusterInvoker<T> clusterInvoker, AbstractClusterInvoker<T> last) { - List<ClusterInterceptor> interceptors = ExtensionLoader.getExtensionLoader(ClusterInterceptor.class).getActivateExtensions(); - - if (!interceptors.isEmpty()) { - for (int i = interceptors.size() - 1; i >= 0; i--) { - final ClusterInterceptor interceptor = interceptors.get(i); - final AbstractClusterInvoker<T> next = last; - last = new InterceptorInvokerNode<>(clusterInvoker, interceptor, next); - } - } - return last; - } - - @Deprecated - static class InterceptorInvokerNode<T> extends AbstractClusterInvoker<T> { - - private AbstractClusterInvoker<T> clusterInvoker; - private ClusterInterceptor interceptor; - private AbstractClusterInvoker<T> next; - - public InterceptorInvokerNode(AbstractClusterInvoker<T> clusterInvoker, - ClusterInterceptor interceptor, - AbstractClusterInvoker<T> next) { - this.clusterInvoker = clusterInvoker; - this.interceptor = interceptor; - this.next = next; - } - - @Override - public Class<T> getInterface() { - return clusterInvoker.getInterface(); - } - - @Override - public URL getUrl() { - return clusterInvoker.getUrl(); - } - - @Override - public boolean isAvailable() { - return clusterInvoker.isAvailable(); - } - - @Override - public Result invoke(Invocation invocation) throws RpcException { - Result asyncResult; - try { - interceptor.before(next, invocation); - asyncResult = interceptor.intercept(next, invocation); - } catch (Exception e) { - // onError callback - if (interceptor instanceof ClusterInterceptor.Listener) { - ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor; - listener.onError(e, clusterInvoker, invocation); - } - throw e; - } finally { - interceptor.after(next, invocation); - } - return asyncResult.whenCompleteWithContext((r, t) -> { - // onResponse callback - if (interceptor instanceof ClusterInterceptor.Listener) { - ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor; - if (t == null) { - listener.onMessage(r, clusterInvoker, invocation); - } else { - listener.onError(t, clusterInvoker, invocation); - } - } - }); - } - - @Override - public void destroy() { - clusterInvoker.destroy(); - } - - @Override - public String toString() { - return clusterInvoker.toString(); - } - - @Override - protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { - // The only purpose is to build a interceptor chain, so the cluster related logic doesn't matter. - return null; - } - } - - } diff --git a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter deleted file mode 100644 index 8f70d31..0000000 --- a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter +++ /dev/null @@ -1,2 +0,0 @@ -zone-aware=org.apache.dubbo.rpc.cluster.filter.support.ZoneAwareFilter -consumercontext=org.apache.dubbo.rpc.cluster.filter.support.ConsumerContextFilter \ No newline at end of file diff --git a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.interceptor.ClusterInterceptor b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.interceptor.ClusterInterceptor new file mode 100644 index 0000000..3f3f008 --- /dev/null +++ b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.interceptor.ClusterInterceptor @@ -0,0 +1,2 @@ +context=org.apache.dubbo.rpc.cluster.interceptor.ConsumerContextClusterInterceptor +zone-aware=org.apache.dubbo.rpc.cluster.interceptor.ZoneAwareClusterInterceptor \ No newline at end of file diff --git a/dubbo-common/pom.xml b/dubbo-common/pom.xml index b424661..44d9fab 100644 --- a/dubbo-common/pom.xml +++ b/dubbo-common/pom.xml @@ -72,10 +72,6 @@ <groupId>javax.annotation</groupId> <artifactId>javax.annotation-api</artifactId> </dependency> - <dependency> - <groupId>org.eclipse.collections</groupId> - <artifactId>eclipse-collections</artifactId> - </dependency> </dependencies> </project> \ No newline at end of file diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/URLStrParser.java b/dubbo-common/src/main/java/org/apache/dubbo/common/URLStrParser.java index 5bd2970..619b1bb 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/URLStrParser.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/URLStrParser.java @@ -21,9 +21,8 @@ import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.url.component.ServiceConfigURL; import org.apache.dubbo.common.url.component.URLItemCache; -import org.eclipse.collections.impl.map.mutable.UnifiedMap; - import java.util.Collections; +import java.util.HashMap; import java.util.Map; import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_KEY_PREFIX; @@ -74,7 +73,7 @@ public final class URLStrParser { } TempBuf tempBuf = DECODE_TEMP_BUF.get(); - Map<String, String> params = new UnifiedMap<>(); + Map<String, String> params = new HashMap<>(); int nameStart = from; int valueStart = -1; int i; @@ -170,7 +169,7 @@ public final class URLStrParser { } // check cache - protocol = URLItemCache.intern(protocol); + protocol = URLItemCache.checkProtocol(protocol); path = URLItemCache.checkPath(path); return new ServiceConfigURL(protocol, username, password, host, port, path, parameters); @@ -234,7 +233,7 @@ public final class URLStrParser { } TempBuf tempBuf = DECODE_TEMP_BUF.get(); - Map<String, String> params = new UnifiedMap<>(); + Map<String, String> params = new HashMap<>(); int nameStart = from; int valueStart = -1; int i; diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/config/Environment.java b/dubbo-common/src/main/java/org/apache/dubbo/common/config/Environment.java index 3da6cca..ea4b4e5 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/config/Environment.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/config/Environment.java @@ -17,13 +17,11 @@ package org.apache.dubbo.common.config; import org.apache.dubbo.common.config.configcenter.DynamicConfiguration; -import org.apache.dubbo.common.constants.CommonConstants; import org.apache.dubbo.common.context.FrameworkExt; import org.apache.dubbo.common.context.LifecycleAdapter; import org.apache.dubbo.common.extension.DisableInject; import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; -import org.apache.dubbo.common.utils.ConfigUtils; import org.apache.dubbo.config.AbstractConfig; import org.apache.dubbo.config.ConfigCenterConfig; import org.apache.dubbo.config.context.ConfigConfigurationAdapter; @@ -56,7 +54,6 @@ public class Environment extends LifecycleAdapter implements FrameworkExt { private boolean configCenterFirst = true; private DynamicConfiguration dynamicConfiguration; - private String localMigrationRule; public Environment() { this.propertiesConfiguration = new PropertiesConfiguration(); @@ -79,19 +76,6 @@ public class Environment extends LifecycleAdapter implements FrameworkExt { this.externalConfiguration.setProperties(externalConfigurationMap); this.appExternalConfiguration.setProperties(appExternalConfigurationMap); - - loadMigrationRule(); - } - - private void loadMigrationRule() { - String path = System.getProperty(CommonConstants.DUBBO_MIGRATION_KEY); - if (path == null || path.length() == 0) { - path = System.getenv(CommonConstants.DUBBO_MIGRATION_KEY); - if (path == null || path.length() == 0) { - path = CommonConstants.DEFAULT_DUBBO_MIGRATION_FILE; - } - } - this.localMigrationRule = ConfigUtils.loadMigrationRule(path); } @DisableInject @@ -236,10 +220,6 @@ public class Environment extends LifecycleAdapter implements FrameworkExt { return appExternalConfiguration; } - public String getLocalMigrationRule() { - return localMigrationRule; - } - // For test public void clearExternalConfigs() { this.externalConfiguration.clear(); diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java index 953b11b..268c313 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java @@ -45,10 +45,6 @@ public interface CommonConstants { String DEFAULT_DUBBO_PROPERTIES = "dubbo.properties"; - String DUBBO_MIGRATION_KEY = "dubbo.migration.file"; - - String DEFAULT_DUBBO_MIGRATION_FILE = "dubbo-migration.yaml"; - String ANY_VALUE = "*"; /** @@ -380,6 +376,4 @@ public interface CommonConstants { String CACHE_CLEAR_TASK_INTERVAL = "dubbo.application.url.cache.task.interval"; String CACHE_CLEAR_WAITING_THRESHOLD = "dubbo.application.url.cache.clear.waiting"; - String CLUSTER_INTERCEPTOR_COMPATIBLE_KEY = "dubbo.application.cluster.interceptor.compatible"; - } diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLAddress.java b/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLAddress.java index a0e89f1..63d140b 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLAddress.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLAddress.java @@ -252,7 +252,7 @@ public class URLAddress implements Serializable { } // check cache - protocol = URLItemCache.intern(protocol); + protocol = URLItemCache.checkProtocol(protocol); path = URLItemCache.checkPath(path); return new PathURLAddress(protocol, username, password, path, host, port, rawAddress); diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLItemCache.java b/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLItemCache.java index 54372c2..2384493 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLItemCache.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLItemCache.java @@ -19,13 +19,14 @@ package org.apache.dubbo.common.url.component; import org.apache.dubbo.common.utils.LRUCache; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class URLItemCache { // thread safe with limited size, by default 1000 private static final Map<String, String> PARAM_KEY_CACHE = new LRUCache<>(10000); - private static final Map<String, String> PARAM_VALUE_CACHE = new LRUCache<>(50000); + private static final Map<String, String> PARAM_VALUE_CACHE = new LRUCache<>(100000); private static final Map<String, String> PATH_CACHE = new LRUCache<>(10000); - private static final Map<String, String> REVISION_CACHE = new LRUCache<>(10000); + private static final Map<String, String> PROTOCOL_CACHE = new ConcurrentHashMap<>(); public static void putParams(Map<String, String> params, String key, String value) { String cachedKey = PARAM_KEY_CACHE.get(key); @@ -42,6 +43,17 @@ public class URLItemCache { params.put(cachedKey, cachedValue); } + public static String checkProtocol(String _protocol) { + if (_protocol == null) { + return _protocol; + } + String cachedProtocol = PROTOCOL_CACHE.putIfAbsent(_protocol, _protocol); + if (cachedProtocol != null) { + return cachedProtocol; + } + return _protocol; + } + public static String checkPath(String _path) { if (_path == null) { return _path; @@ -52,32 +64,4 @@ public class URLItemCache { } return _path; } - - public static String checkRevision(String _revision) { - if (_revision == null) { - return _revision; - } - String revision = REVISION_CACHE.putIfAbsent(_revision, _revision); - if (revision != null) { - return revision; - } - return _revision; - } - - public static String intern(String _protocol) { - if (_protocol == null) { - return _protocol; - } - return _protocol.intern(); - } - - public static void putParamsIntern(Map<String, String> params, String key, String value) { - if (key == null || value == null) { - params.put(key, value); - return; - } - key = key.intern(); - value = value.intern(); - params.put(key, value); - } } diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLParam.java b/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLParam.java index cea92cb..b2dca0c 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLParam.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/url/component/URLParam.java @@ -21,8 +21,6 @@ import org.apache.dubbo.common.URLStrParser; import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.common.utils.StringUtils; -import org.eclipse.collections.impl.map.mutable.UnifiedMap; - import java.io.Serializable; import java.util.Collections; import java.util.HashMap; @@ -265,7 +263,7 @@ public class URLParam implements Serializable { public static URLParam parse(String rawParam) { String[] parts = rawParam.split("&"); - Map<String, String> parameters = new UnifiedMap<>((int) (parts.length/.75f) + 1); + Map<String, String> parameters = new HashMap<>((int) (parts.length/.75f) + 1); for (String part : parts) { part = part.trim(); if (part.length() > 0) { diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ConfigUtils.java b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ConfigUtils.java index d0d0693..c1f4711 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ConfigUtils.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ConfigUtils.java @@ -21,16 +21,12 @@ import org.apache.dubbo.common.extension.ExtensionLoader; import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; -import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; -import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; import java.util.ArrayList; -import java.util.Arrays; import java.util.Enumeration; import java.util.List; import java.util.Map; @@ -300,49 +296,6 @@ public class ConfigUtils { return properties; } - public static String loadMigrationRule(String fileName) { - String rawRule = ""; - if (checkFileNameExist(fileName)) { - try { - try (FileInputStream input = new FileInputStream(fileName)) { - rawRule = readString(input); - } - } catch (Throwable e) { - logger.warn("Failed to load " + fileName + " file from " + fileName + "(ignore this file): " + e.getMessage(), e); - } - return rawRule; - } - - try { - InputStream is = ClassUtils.getClassLoader().getResourceAsStream(fileName); - if (is != null) { - rawRule = readString(is); - } - } catch (Throwable e) { - logger.warn("Failed to load " + fileName + " file from " + fileName + "(ignore this file): " + e.getMessage(), e); - } - return rawRule; - } - - private static String readString(InputStream is) { - StringBuilder stringBuilder = new StringBuilder(); - char[] buffer = new char[10]; - try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))){ - int n; - while ((n = reader.read(buffer)) != -1) { - if (n < 10) { - buffer = Arrays.copyOf(buffer, n); - } - stringBuilder.append(String.valueOf(buffer)); - buffer = new char[10]; - } - } catch (IOException e) { - logger.error("Read migration file error.", e); - } - - return stringBuilder.toString(); - } - /** * check if the fileName can be found in filesystem * diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractConfig.java b/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractConfig.java index 47556c5..4e7bbf4 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractConfig.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractConfig.java @@ -36,7 +36,6 @@ import javax.annotation.PostConstruct; import java.io.Serializable; import java.lang.reflect.Method; import java.lang.reflect.Modifier; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -337,7 +336,7 @@ public abstract class AbstractConfig implements Serializable { String value = entry.getValue(); result.put(pre + key, value); // For compatibility, key like "registry-type" will has a duplicate key "registry.type" - if (Arrays.binarySearch(Constants.DOT_COMPATIBLE_KEYS, key) != -1) { + if (key.contains("-")) { result.put(pre + key.replace('-', '.'), value); } } diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/Constants.java b/dubbo-common/src/main/java/org/apache/dubbo/config/Constants.java index 894f138..f8fed84 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/config/Constants.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/config/Constants.java @@ -117,6 +117,4 @@ public interface Constants { String REGISTER_KEY = "register"; String MULTI_SERIALIZATION_KEY = "serialize.multiple"; - - String[] DOT_COMPATIBLE_KEYS = new String[]{"qos-enable", "qos-port", "qos-accept-foreign-ip"}; } diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java index cdf66dd..a4116f4 100644 --- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java +++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java @@ -520,7 +520,6 @@ public class DubboBootstrap extends GenericEventListener { } ApplicationModel.initFrameworkExts(); - startConfigCenter(); @@ -1163,7 +1162,7 @@ public class DubboBootstrap extends GenericEventListener { private void doRegisterServiceInstance(ServiceInstance serviceInstance) { // register instance only when at least one service is exported. - if (serviceInstance.getPort() > 0) { + if (serviceInstance.getPort() != null && serviceInstance.getPort() != -1) { publishMetadataToRemote(serviceInstance); logger.info("Start registering instance address to registry."); getServiceDiscoveries().forEach(serviceDiscovery -> diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/metadata/ServiceInstanceHostPortCustomizer.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/metadata/ServiceInstanceHostPortCustomizer.java index 8693828..3831337 100644 --- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/metadata/ServiceInstanceHostPortCustomizer.java +++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/metadata/ServiceInstanceHostPortCustomizer.java @@ -36,14 +36,14 @@ public class ServiceInstanceHostPortCustomizer implements ServiceInstanceCustomi @Override public void customize(ServiceInstance serviceInstance) { - if (serviceInstance.getPort() > 0) { + if (serviceInstance.getPort() != null) { return; } WritableMetadataService writableMetadataService = WritableMetadataService.getDefaultExtension(); String host = null; - int port = -1; + Integer port = null; Set<URL> urls = writableMetadataService.getExportedServiceURLs(); if (CollectionUtils.isNotEmpty(urls)) { String preferredProtocol = ApplicationModel.getApplicationConfig().getProtocol(); @@ -64,6 +64,7 @@ public class ServiceInstanceHostPortCustomizer implements ServiceInstanceCustomi DefaultServiceInstance instance = (DefaultServiceInstance) serviceInstance; instance.setHost(host); instance.setPort(port); + instance.setId(host + ":" + port); } } } diff --git a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/AbstractConfigTest.java b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/AbstractConfigTest.java index 04d5f72..041cd75 100644 --- a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/AbstractConfigTest.java +++ b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/AbstractConfigTest.java @@ -116,6 +116,8 @@ public class AbstractConfigTest { Assertions.assertEquals("ONE,1", parameters.get("prefix.num")); Assertions.assertEquals("hello%2Fworld", parameters.get("prefix.naming")); Assertions.assertEquals("30", parameters.get("prefix.age")); + Assertions.assertTrue(parameters.containsKey("prefix.key-2")); + Assertions.assertTrue(parameters.containsKey("prefix.key.2")); Assertions.assertFalse(parameters.containsKey("prefix.secret")); } @@ -805,7 +807,7 @@ public class AbstractConfigTest { public Map getParameters() { Map<String, String> map = new HashMap<String, String>(); map.put("key.1", "one"); - map.put("key.2", "two"); + map.put("key-2", "two"); return map; } } diff --git a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/resources/dubbo-migration.yaml b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/resources/dubbo-migration.yaml deleted file mode 100644 index dc2e8f2..0000000 --- a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/resources/dubbo-migration.yaml +++ /dev/null @@ -1,3 +0,0 @@ -key: demo-consumer -step: FORCE_APPLICATION -threshold: 0.1 \ No newline at end of file diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml index 8cc1d43..e6b67c1 100644 --- a/dubbo-dependencies-bom/pom.xml +++ b/dubbo-dependencies-bom/pom.xml @@ -91,7 +91,6 @@ <!-- Common libs --> <spring_version>4.3.16.RELEASE</spring_version> <javassist_version>3.20.0-GA</javassist_version> - <eclipse_collections_version>10.4.0</eclipse_collections_version> <netty_version>3.2.5.Final</netty_version> <netty4_version>4.1.56.Final</netty4_version> <mina_version>1.1.7</mina_version> @@ -187,11 +186,6 @@ <version>${javassist_version}</version> </dependency> <dependency> - <groupId>org.eclipse.collections</groupId> - <artifactId>eclipse-collections</artifactId> - <version>${eclipse_collections_version}</version> - </dependency> - <dependency> <groupId>org.jboss.netty</groupId> <artifactId>netty</artifactId> <version>${netty_version}</version> diff --git a/dubbo-distribution/dubbo-all/pom.xml b/dubbo-distribution/dubbo-all/pom.xml index b464e41..5729680 100644 --- a/dubbo-distribution/dubbo-all/pom.xml +++ b/dubbo-distribution/dubbo-all/pom.xml @@ -312,10 +312,6 @@ <artifactId>javassist</artifactId> </dependency> <dependency> - <groupId>org.eclipse.collections</groupId> - <artifactId>eclipse-collections</artifactId> - </dependency> - <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> </dependency> @@ -517,10 +513,6 @@ </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> - <resource>META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter</resource> - </transformer> - <transformer - implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/dubbo/internal/org.apache.dubbo.rpc.InvokerListener</resource> </transformer> <transformer diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java index 4106ad2..024328e 100644 --- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java +++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java @@ -42,8 +42,6 @@ import static org.apache.dubbo.common.constants.CommonConstants.GROUP_CHAR_SEPAR import static org.apache.dubbo.common.constants.CommonConstants.METHODS_KEY; public class MetadataInfo implements Serializable { - public static final MetadataInfo EMPTY = new MetadataInfo(); - private String app; private String revision; private Map<String, ServiceInfo> services; @@ -52,8 +50,6 @@ public class MetadataInfo implements Serializable { private transient Map<String, String> extendParams; private transient AtomicBoolean reported = new AtomicBoolean(false); - public MetadataInfo() {} - public MetadataInfo(String app) { this(app, null, null); } diff --git a/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorFilter.java b/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorFilter.java index 47e4bc7..1492e0b 100644 --- a/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorFilter.java +++ b/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorFilter.java @@ -36,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER; import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SIDE; import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY; @@ -48,7 +49,7 @@ import static org.apache.dubbo.rpc.Constants.OUTPUT_KEY; /** * MonitorFilter. (SPI, Singleton, ThreadSafe) */ -@Activate(group = {PROVIDER}) +@Activate(group = {PROVIDER, CONSUMER}) public class MonitorFilter implements Filter, Filter.Listener { private static final Logger logger = LoggerFactory.getLogger(MonitorFilter.class); diff --git a/dubbo-plugin/dubbo-auth/src/main/java/org/apache/dubbo/auth/filter/ConsumerSignFilter.java b/dubbo-plugin/dubbo-auth/src/main/java/org/apache/dubbo/auth/filter/ConsumerSignFilter.java index 96438c5..cf984a5 100644 --- a/dubbo-plugin/dubbo-auth/src/main/java/org/apache/dubbo/auth/filter/ConsumerSignFilter.java +++ b/dubbo-plugin/dubbo-auth/src/main/java/org/apache/dubbo/auth/filter/ConsumerSignFilter.java @@ -33,7 +33,7 @@ import org.apache.dubbo.rpc.RpcException; * * @see org.apache.dubbo.rpc.Filter */ -@Activate(group = CommonConstants.CONSUMER, value = Constants.SERVICE_AUTH, order = -10000) +@Activate(group = CommonConstants.CONSUMER, order = -10000) public class ConsumerSignFilter implements Filter { @Override diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/NotifyListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/NotifyListener.java index 4c02cdc..89e3e75 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/NotifyListener.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/NotifyListener.java @@ -45,8 +45,4 @@ public interface NotifyListener { default void addServiceListener(ServiceInstancesChangedListener instanceListener) { } - default URL getConsumerUrl() { - return null; - } - } \ No newline at end of file diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java index 1bcf284..93ba70e 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java @@ -19,7 +19,6 @@ package org.apache.dubbo.registry.client; import org.apache.dubbo.metadata.MetadataInfo; import com.alibaba.fastjson.JSON; -import org.eclipse.collections.impl.map.mutable.UnifiedMap; import java.util.HashMap; import java.util.List; @@ -40,23 +39,24 @@ public class DefaultServiceInstance implements ServiceInstance { private static final long serialVersionUID = 1149677083747278100L; + private String id; + private String serviceName; private String host; - private int port; + private Integer port; private boolean enabled; private boolean healthy; - private Map<String, String> metadata = new UnifiedMap<>(); + private Map<String, String> metadata = new HashMap<>(); private transient String address; private transient MetadataInfo serviceMetadata; // used at runtime - private transient String registryCluster; // extendParams can be more flexiable, but one single property uses less space - private transient Map<String, String> extendParams; + private transient Map<String, String> extendParams = new HashMap<>(); private transient List<Endpoint> endpoints; public DefaultServiceInstance() { @@ -70,16 +70,17 @@ public class DefaultServiceInstance implements ServiceInstance { this.healthy = other.healthy; this.metadata = other.metadata; this.serviceMetadata = other.serviceMetadata; - this.registryCluster = other.registryCluster; this.extendParams = other.extendParams; this.endpoints = other.endpoints; this.address = null; + this.id = null; } - public DefaultServiceInstance(String serviceName, String host, Integer port) { + public DefaultServiceInstance(String id, String serviceName, String host, Integer port) { if (port != null && port.intValue() < 1) { throw new IllegalArgumentException("The port must be greater than zero!"); } + this.id = id; this.serviceName = serviceName; this.host = host; this.port = port; @@ -87,10 +88,18 @@ public class DefaultServiceInstance implements ServiceInstance { this.healthy = true; } + public DefaultServiceInstance(String serviceName, String host, Integer port) { + this(host + ":" + port, serviceName, host, port); + } + public DefaultServiceInstance(String serviceName) { this.serviceName = serviceName; } + public void setId(String id) { + this.id = id; + } + public void setServiceName(String serviceName) { this.serviceName = serviceName; } @@ -100,6 +109,11 @@ public class DefaultServiceInstance implements ServiceInstance { } @Override + public String getId() { + return id; + } + + @Override public String getServiceName() { return serviceName; } @@ -109,12 +123,12 @@ public class DefaultServiceInstance implements ServiceInstance { return host; } - public void setPort(int port) { + public void setPort(Integer port) { this.port = port; } @Override - public int getPort() { + public Integer getPort() { return port; } @@ -159,19 +173,7 @@ public class DefaultServiceInstance implements ServiceInstance { } @Override - public String getRegistryCluster() { - return registryCluster; - } - - public void setRegistryCluster(String registryCluster) { - this.registryCluster = registryCluster; - } - - @Override public Map<String, String> getExtendParams() { - if (extendParams == null) { - extendParams = new HashMap<>(); - } return extendParams; } @@ -185,19 +187,16 @@ public class DefaultServiceInstance implements ServiceInstance { public DefaultServiceInstance copy(Endpoint endpoint) { DefaultServiceInstance copyOfInstance = new DefaultServiceInstance(this); copyOfInstance.setPort(endpoint.getPort()); + copyOfInstance.setId(copyOfInstance.getAddress()); return copyOfInstance; } @Override public Map<String, String> getAllParams() { - if (extendParams == null) { - return metadata; - } else { - Map<String, String> allParams = new HashMap<>((int) ((metadata.size() + extendParams.size()) / 0.75f + 1)); - allParams.putAll(metadata); - allParams.putAll(extendParams); - return allParams; - } + Map<String, String> allParams = new HashMap<>((int) ((metadata.size() + extendParams.size()) / 0.75f + 1)); + allParams.putAll(metadata); + allParams.putAll(extendParams); + return allParams; } public void setMetadata(Map<String, String> metadata) { @@ -250,6 +249,7 @@ public class DefaultServiceInstance implements ServiceInstance { @Override public String toString() { return "DefaultServiceInstance{" + + "id='" + id + '\'' + ", serviceName='" + serviceName + '\'' + ", host='" + host + '\'' + ", port=" + port + diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java index f9c8019..ee99000 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java @@ -224,21 +224,6 @@ final class EventPublishingServiceDiscovery implements ServiceDiscovery { } @Override - public ServiceInstancesChangedListener createListener(Set<String> serviceNames) { - return serviceDiscovery.createListener(serviceNames); - } - - @Override - public void removeServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws IllegalArgumentException { - serviceDiscovery.removeServiceInstancesChangedListener(listener); - } - - @Override - public long getDelay() { - return serviceDiscovery.getDelay(); - } - - @Override public URL getUrl() { return serviceDiscovery.getUrl(); } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java index 1482a9d..2a51168 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java @@ -110,7 +110,7 @@ public class FileSystemServiceDiscovery implements ServiceDiscovery, EventListen } private String getServiceInstanceId(ServiceInstance serviceInstance) { - String id = serviceInstance.getAddress(); + String id = serviceInstance.getId(); if (StringUtils.isBlank(id)) { return serviceInstance.getHost() + "." + serviceInstance.getPort(); } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java index 7537090..1034420 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java @@ -207,7 +207,7 @@ public abstract class SelfHostMetaServiceDiscovery implements ServiceDiscovery { @SuppressWarnings("unchecked") public final void fillServiceInstance(DefaultServiceInstance serviceInstance) { - String hostId = serviceInstance.getAddress(); + String hostId = serviceInstance.getId(); if (metadataMap.containsKey(hostId)) { // Use cached metadata. // Metadata will be updated by provider callback diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java index aa318d4..90ba196 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java @@ -219,7 +219,6 @@ public interface ServiceDiscovery extends Prioritized { /** * unsubscribe to instances change event. - * * @param listener * @throws IllegalArgumentException */ @@ -227,10 +226,6 @@ public interface ServiceDiscovery extends Prioritized { throws IllegalArgumentException { } - default ServiceInstancesChangedListener createListener(Set<String> serviceNames) { - return new ServiceInstancesChangedListener(serviceNames, this); - } - /** * Dispatch the {@link ServiceInstancesChangedEvent} * diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java index 31ae6f9..d19c8de 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java @@ -291,7 +291,7 @@ public class ServiceDiscoveryRegistry implements Registry { // register ServiceInstancesChangedListener ServiceInstancesChangedListener serviceListener = serviceListeners.computeIfAbsent(serviceNamesKey, k -> { - ServiceInstancesChangedListener serviceInstancesChangedListener = serviceDiscovery.createListener(serviceNames); + ServiceInstancesChangedListener serviceInstancesChangedListener = new ServiceInstancesChangedListener(serviceNames, serviceDiscovery); serviceInstancesChangedListener.setUrl(url); serviceNames.forEach(serviceName -> { List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName); diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java index 6232097..c41e028 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java @@ -32,10 +32,9 @@ import org.apache.dubbo.rpc.Protocol; import org.apache.dubbo.rpc.RpcContext; import org.apache.dubbo.rpc.cluster.RouterChain; -import org.eclipse.collections.impl.map.mutable.UnifiedMap; - import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -153,7 +152,7 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im * @return invokers */ private Map<String, Invoker<T>> toInvokers(List<URL> urls) { - Map<String, Invoker<T>> newUrlInvokerMap = new UnifiedMap<>(); + Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>(); if (urls == null || urls.isEmpty()) { return newUrlInvokerMap; } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java index 3a55c4e..5bb2bfe 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java @@ -29,6 +29,13 @@ import java.util.SortedMap; public interface ServiceInstance extends Serializable { /** + * The id of the registered service instance. + * + * @return nullable + */ + String getId(); + + /** * The name of service that current instance belongs to. * * @return non-null @@ -47,7 +54,7 @@ public interface ServiceInstance extends Serializable { * * @return the positive integer if present */ - int getPort(); + Integer getPort(); String getAddress(); @@ -80,10 +87,6 @@ public interface ServiceInstance extends Serializable { SortedMap<String, String> getSortedMetadata(); - String getRegistryCluster(); - - void setRegistryCluster(String registryCluster); - Map<String, String> getExtendParams(); Map<String, String> getAllParams(); diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java index dd0a0cf..22280bd 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java @@ -53,6 +53,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.dubbo.common.constants.CommonConstants.REMOTE_METADATA_STORAGE_TYPE; +import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_CLUSTER_KEY; import static org.apache.dubbo.metadata.RevisionResolver.EMPTY_REVISION; import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.getExportedServicesRevision; @@ -66,14 +67,14 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener private static final Logger logger = LoggerFactory.getLogger(ServiceInstancesChangedListener.class); - protected final Set<String> serviceNames; - protected final ServiceDiscovery serviceDiscovery; - protected URL url; - protected Map<String, NotifyListener> listeners; + private final Set<String> serviceNames; + private final ServiceDiscovery serviceDiscovery; + private URL url; + private Map<String, NotifyListener> listeners; private Map<String, List<ServiceInstance>> allInstances; - private Map<String, Object> serviceUrls; + private Map<String, List<URL>> serviceUrls; private Map<String, MetadataInfo> revisionToMetadata; @@ -111,8 +112,8 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener Map<String, List<ServiceInstance>> revisionToInstances = new HashMap<>(); Map<ServiceInfo, Set<String>> localServiceToRevisions = new HashMap<>(); - Map<String, Map<Set<String>, Object>> protocolRevisionsToUrls = new HashMap<>(); - Map<String, Object> newServiceUrls = new HashMap<>();//TODO + Map<String, Map<Set<String>, List<URL>>> protocolRevisionsToUrls = new HashMap<>(); + Map<String, List<URL>> newServiceUrls = new HashMap<>();//TODO Map<String, MetadataInfo> newRevisionToMetadata = new HashMap<>(); for (Map.Entry<String, List<ServiceInstance>> entry : allInstances.entrySet()) { @@ -150,14 +151,27 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener localServiceToRevisions.forEach((serviceInfo, revisions) -> { String protocol = serviceInfo.getProtocol(); - Map<Set<String>, Object> revisionsToUrls = protocolRevisionsToUrls.computeIfAbsent(protocol, k -> { + Map<Set<String>, List<URL>> revisionsToUrls = protocolRevisionsToUrls.computeIfAbsent(protocol, k -> { return new HashMap<>(); }); - Object urls = revisionsToUrls.get(revisions); + List<URL> urls = revisionsToUrls.get(revisions); if (urls != null) { newServiceUrls.put(serviceInfo.getMatchKey(), urls); } else { - urls = getServiceUrlsCache(revisionToInstances, revisions, protocol); + urls = new ArrayList<>(); + for (String r : revisions) { + for (ServiceInstance i : revisionToInstances.get(r)) { + // different protocols may have ports specified in meta + if (ServiceInstanceMetadataUtils.hasEndpoints(i)) { + DefaultServiceInstance.Endpoint endpoint = ServiceInstanceMetadataUtils.getEndpoint(i, protocol); + if (endpoint != null && !endpoint.getPort().equals(i.getPort())) { + urls.add(((DefaultServiceInstance)i).copy(endpoint).toURL()); + break; + } + } + urls.add(i.toURL()); + } + } revisionsToUrls.put(revisions, urls); newServiceUrls.put(serviceInfo.getMatchKey(), urls); } @@ -169,7 +183,7 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener public synchronized void addListenerAndNotify(String serviceKey, NotifyListener listener) { this.listeners.put(serviceKey, listener); - List<URL> urls = getAddresses(serviceKey); + List<URL> urls = this.serviceUrls.get(serviceKey); if (CollectionUtils.isNotEmpty(urls)) { listener.notify(urls); } @@ -183,7 +197,7 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener } public List<URL> getUrls(String serviceKey) { - return toUrlsWithEmpty(getAddresses(serviceKey)); + return toUrlsWithEmpty(serviceUrls.get(serviceKey)); } /** @@ -227,7 +241,7 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener return serviceNames.contains(event.getServiceName()); } - protected boolean isRetryAndExpired(ServiceInstancesChangedEvent event) { + private boolean isRetryAndExpired(ServiceInstancesChangedEvent event) { String appName = event.getServiceName(); List<ServiceInstance> appInstances = event.getServiceInstances(); @@ -247,13 +261,13 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener return false; } - protected boolean hasEmptyMetadata(Map<String, MetadataInfo> revisionToMetadata) { + private boolean hasEmptyMetadata(Map<String, MetadataInfo> revisionToMetadata) { if (revisionToMetadata == null) { return false; } boolean result = false; for (Map.Entry<String, MetadataInfo> entry : revisionToMetadata.entrySet()) { - if (entry.getValue() == MetadataInfo.EMPTY) { + if (entry.getValue() == null) { result = true; break; } @@ -261,31 +275,31 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener return result; } - protected MetadataInfo getRemoteMetadata(ServiceInstance instance, String revision, Map<ServiceInfo, Set<String>> localServiceToRevisions, List<ServiceInstance> subInstances) { + private MetadataInfo getRemoteMetadata(ServiceInstance instance, String revision, Map<ServiceInfo, Set<String>> localServiceToRevisions, List<ServiceInstance> subInstances) { MetadataInfo metadata = revisionToMetadata.get(revision); - if (metadata == null - || (metadata == MetadataInfo.EMPTY && (failureCounter.get() < 3 || (System.currentTimeMillis() - lastFailureTime > 10000)))) { - metadata = getMetadataInfo(instance); - - if (metadata != MetadataInfo.EMPTY) { - logger.info("MetadataInfo for instance " + instance.getAddress() + "?revision=" + revision + " is " + metadata); - failureCounter.set(0); - revisionToMetadata.putIfAbsent(revision, metadata); - parseMetadata(revision, metadata, localServiceToRevisions); - } else { - logger.error("Failed to get MetadataInfo for instance " + instance.getAddress() + "?revision=" + revision - + ", wait for retry."); - lastFailureTime = System.currentTimeMillis(); - failureCounter.incrementAndGet(); + if (metadata == null) { + if (failureCounter.get() < 3 || (System.currentTimeMillis() - lastFailureTime > 10000)) { + metadata = getMetadataInfo(instance); + if (metadata != null) { + logger.info("MetadataInfo for instance " + instance.getAddress() + "?revision=" + revision + " is " + metadata); + failureCounter.set(0); + revisionToMetadata.putIfAbsent(revision, metadata); + parseMetadata(revision, metadata, localServiceToRevisions); + } else { + logger.error("Failed to get MetadataInfo for instance " + instance.getAddress() + "?revision=" + revision + + ", wait for retry."); + lastFailureTime = System.currentTimeMillis(); + failureCounter.incrementAndGet(); + } } - } else if (metadata != MetadataInfo.EMPTY && subInstances.size() == 1) { + } else if (subInstances.size() == 1) { // "subInstances.size() >= 2" means metadata of this revision has been parsed, ignore parseMetadata(revision, metadata, localServiceToRevisions); } return metadata; } - protected Map<ServiceInfo, Set<String>> parseMetadata(String revision, MetadataInfo metadata, Map<ServiceInfo, Set<String>> localServiceToRevisions) { + private Map<ServiceInfo, Set<String>> parseMetadata(String revision, MetadataInfo metadata, Map<ServiceInfo, Set<String>> localServiceToRevisions) { Map<String, ServiceInfo> serviceInfos = metadata.getServices(); for (Map.Entry<String, ServiceInfo> entry : serviceInfos.entrySet()) { Set<String> set = localServiceToRevisions.computeIfAbsent(entry.getValue(), k -> new TreeSet<>()); @@ -295,12 +309,10 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener return localServiceToRevisions; } - protected MetadataInfo getMetadataInfo(ServiceInstance instance) { + private MetadataInfo getMetadataInfo(ServiceInstance instance) { String metadataType = ServiceInstanceMetadataUtils.getMetadataStorageType(instance); // FIXME, check "REGISTRY_CLUSTER_KEY" must be set by every registry implementation. - if (instance.getRegistryCluster() == null) { - instance.setRegistryCluster(RegistryClusterIdentifier.getExtension(url).consumerKey(url)); - } + instance.getExtendParams().putIfAbsent(REGISTRY_CLUSTER_KEY, RegistryClusterIdentifier.getExtension(url).consumerKey(url)); MetadataInfo metadataInfo; try { if (logger.isDebugEnabled()) { @@ -320,46 +332,19 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener logger.error("Failed to load service metadata, meta type is " + metadataType, e); metadataInfo = null; } - - if (metadataInfo == null) { - metadataInfo = MetadataInfo.EMPTY; - } return metadataInfo; } - protected Object getServiceUrlsCache(Map<String, List<ServiceInstance>> revisionToInstances, Set<String> revisions, String protocol) { - List<URL> urls; - urls = new ArrayList<>(); - for (String r : revisions) { - for (ServiceInstance i : revisionToInstances.get(r)) { - // different protocols may have ports specified in meta - if (ServiceInstanceMetadataUtils.hasEndpoints(i)) { - DefaultServiceInstance.Endpoint endpoint = ServiceInstanceMetadataUtils.getEndpoint(i, protocol); - if (endpoint != null && !endpoint.getPort().equals(i.getPort())) { - urls.add(((DefaultServiceInstance) i).copy(endpoint).toURL()); - break; - } - } - urls.add(i.toURL()); - } - } - return urls; - } - - protected List<URL> getAddresses(String serviceProtocolKey) { - return (List<URL>) serviceUrls.get(serviceProtocolKey); - } - - protected void notifyAddressChanged() { + private void notifyAddressChanged() { listeners.forEach((key, notifyListener) -> { //FIXME, group wildcard match - List<URL> urls = toUrlsWithEmpty(getAddresses(key)); + List<URL> urls = toUrlsWithEmpty(serviceUrls.get(key)); logger.info("Notify service " + key + " with urls " + urls.size()); notifyListener.notify(urls); }); } - protected List<URL> toUrlsWithEmpty(List<URL> urls) { + private List<URL> toUrlsWithEmpty(List<URL> urls) { if (urls == null) { urls = Collections.emptyList(); } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java index 42382f8..27f66f5 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java @@ -86,7 +86,7 @@ public class MetadataUtils { } public static String computeKey(ServiceInstance serviceInstance) { - return serviceInstance.getServiceName() + "##" + serviceInstance.getAddress() + "##" + + return serviceInstance.getServiceName() + "##" + serviceInstance.getId() + "##" + ServiceInstanceMetadataUtils.getExportedServicesRevision(serviceInstance); } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java index d217e2d..bca273b 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java @@ -85,7 +85,7 @@ public class RemoteMetadataServiceImpl { SubscriberMetadataIdentifier identifier = new SubscriberMetadataIdentifier(instance.getServiceName(), ServiceInstanceMetadataUtils.getExportedServicesRevision(instance)); - String registryCluster = instance.getRegistryCluster(); + String registryCluster = instance.getExtendParams().get(REGISTRY_CLUSTER_KEY); checkRemoteConfigured(); diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java index a9c7b8c..30f1ea1 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/DefaultMigrationAddressComparator.java @@ -39,11 +39,11 @@ public class DefaultMigrationAddressComparator implements MigrationAddressCompar @Override public <T> boolean shouldMigrate(ClusterInvoker<T> serviceDiscoveryInvoker, ClusterInvoker<T> invoker, MigrationRule rule) { if (!serviceDiscoveryInvoker.hasProxyInvokers()) { - logger.info("No instance address available, stop compare."); + logger.info("No instance address available, will not migrate."); return false; } if (!invoker.hasProxyInvokers()) { - logger.info("No interface address available, stop compare."); + logger.info("No interface address available, will migrate."); return true; } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java index 2e94d3f..15e2434 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java @@ -27,6 +27,7 @@ import org.apache.dubbo.registry.client.migration.model.MigrationStep; import org.apache.dubbo.registry.integration.DynamicDirectory; import org.apache.dubbo.registry.integration.RegistryProtocol; import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.Result; import org.apache.dubbo.rpc.RpcException; import org.apache.dubbo.rpc.cluster.Cluster; @@ -35,6 +36,7 @@ import org.apache.dubbo.rpc.cluster.Directory; import org.apache.dubbo.rpc.model.ApplicationModel; import org.apache.dubbo.rpc.model.ConsumerModel; +import java.util.List; import java.util.Set; import static org.apache.dubbo.rpc.cluster.Constants.REFER_KEY; @@ -286,9 +288,6 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> { private volatile boolean invokersChanged; - /** - * Need to know which invoker change triggered this compare. - */ private synchronized void compareAddresses(ClusterInvoker<T> serviceDiscoveryInvoker, ClusterInvoker<T> invoker) { this.invokersChanged = true; if (logger.isDebugEnabled()) { @@ -298,10 +297,10 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> { Set<MigrationAddressComparator> detectors = ExtensionLoader.getExtensionLoader(MigrationAddressComparator.class).getSupportedExtensionInstances(); if (detectors != null && detectors.stream().allMatch(migrationDetector -> migrationDetector.shouldMigrate(serviceDiscoveryInvoker, invoker, rule))) { logger.info("serviceKey:" + invoker.getUrl().getServiceKey() + " switch to APP Level address"); - destroyInterfaceInvoker(invoker); + discardInterfaceInvokerAddress(invoker); } else { logger.info("serviceKey:" + invoker.getUrl().getServiceKey() + " switch to Service Level address"); - destroyServiceDiscoveryInvoker(serviceDiscoveryInvoker); + discardServiceDiscoveryInvokerAddress(serviceDiscoveryInvoker); } } @@ -311,28 +310,26 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> { updateConsumerModel(currentAvailableInvoker, serviceDiscoveryInvoker); } if (serviceDiscoveryInvoker != null) { - if (serviceDiscoveryInvoker.getDirectory().isNotificationReceived()) { - if (logger.isInfoEnabled()) { - logger.info("Destroying instance address invokers, will not listen for address changes until re-subscribed, " + type.getName()); - } - serviceDiscoveryInvoker.destroy(); + if (logger.isDebugEnabled()) { + logger.debug("Destroying instance address invokers, will not listen for address changes until re-subscribed, " + type.getName()); } + serviceDiscoveryInvoker.destroy(); } } -// protected synchronized void discardServiceDiscoveryInvokerAddress(ClusterInvoker<T> serviceDiscoveryInvoker) { -// if (this.invoker != null) { -// this.currentAvailableInvoker = this.invoker; -// updateConsumerModel(currentAvailableInvoker, serviceDiscoveryInvoker); -// } -// if (serviceDiscoveryInvoker != null) { -// if (logger.isDebugEnabled()) { -// List<Invoker<T>> invokers = serviceDiscoveryInvoker.getDirectory().getAllInvokers(); -// logger.debug("Discarding instance addresses, total size " + (invokers == null ? 0 : invokers.size())); -// } -//// serviceDiscoveryInvoker.getDirectory().discordAddresses(); -// } -// } + protected synchronized void discardServiceDiscoveryInvokerAddress(ClusterInvoker<T> serviceDiscoveryInvoker) { + if (this.invoker != null) { + this.currentAvailableInvoker = this.invoker; + updateConsumerModel(currentAvailableInvoker, serviceDiscoveryInvoker); + } + if (serviceDiscoveryInvoker != null) { + if (logger.isDebugEnabled()) { + List<Invoker<T>> invokers = serviceDiscoveryInvoker.getDirectory().getAllInvokers(); + logger.debug("Discarding instance addresses, total size " + (invokers == null ? 0 : invokers.size())); + } +// serviceDiscoveryInvoker.getDirectory().discordAddresses(); + } + } protected void refreshServiceDiscoveryInvoker() { clearListener(serviceDiscoveryInvoker); @@ -341,6 +338,8 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> { logger.debug("Re-subscribing instance addresses, current interface " + type.getName()); } serviceDiscoveryInvoker = registryProtocol.getServiceDiscoveryInvoker(cluster, registry, type, url); + } else { + ((DynamicDirectory) serviceDiscoveryInvoker.getDirectory()).markInvokersChanged(); } } @@ -353,6 +352,8 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> { } invoker = registryProtocol.getInvoker(cluster, registry, type, url); + } else { + ((DynamicDirectory) invoker.getDirectory()).markInvokersChanged(); } } @@ -362,28 +363,26 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> { updateConsumerModel(currentAvailableInvoker, invoker); } if (invoker != null) { - if (invoker.getDirectory().isNotificationReceived()) { - if (logger.isInfoEnabled()) { - logger.info("Destroying interface address invokers, will not listen for address changes until re-subscribed, " + type.getName()); - } - invoker.destroy(); + if (logger.isDebugEnabled()) { + logger.debug("Destroying interface address invokers, will not listen for address changes until re-subscribed, " + type.getName()); + } + invoker.destroy(); + } + } + + protected synchronized void discardInterfaceInvokerAddress(ClusterInvoker<T> invoker) { + if (this.serviceDiscoveryInvoker != null) { + this.currentAvailableInvoker = this.serviceDiscoveryInvoker; + updateConsumerModel(currentAvailableInvoker, invoker); + } + if (invoker != null) { + if (logger.isDebugEnabled()) { + List<Invoker<T>> invokers = invoker.getDirectory().getAllInvokers(); + logger.debug("Discarding interface addresses, total address size " + (invokers == null ? 0 : invokers.size())); } + //invoker.getDirectory().discordAddresses(); } } -// -// protected synchronized void discardInterfaceInvokerAddress(ClusterInvoker<T> invoker) { -// if (this.serviceDiscoveryInvoker != null) { -// this.currentAvailableInvoker = this.serviceDiscoveryInvoker; -// updateConsumerModel(currentAvailableInvoker, invoker); -// } -// if (invoker != null) { -// if (logger.isDebugEnabled()) { -// List<Invoker<T>> invokers = invoker.getDirectory().getAllInvokers(); -// logger.debug("Discarding interface addresses, total address size " + (invokers == null ? 0 : invokers.size())); -// } -// //invoker.getDirectory().discordAddresses(); -// } -// } private void clearListener(ClusterInvoker<T> invoker) { if (invoker == null) return; @@ -411,9 +410,9 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> { if (workingInvoker != null) { consumerModel.getServiceMetadata().addAttribute("currentClusterInvoker", workingInvoker); } -// if (backInvoker != null) { -// consumerModel.getServiceMetadata().addAttribute("backupClusterInvoker", backInvoker); -// } + if (backInvoker != null) { + consumerModel.getServiceMetadata().addAttribute("backupClusterInvoker", backInvoker); + } } } } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java index b9718f4..5972e06 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleHandler.java @@ -30,8 +30,8 @@ import java.util.Set; @Activate public class MigrationRuleHandler<T> { - public static final String DUBBO_SERVICEDISCOVERY_MIGRATION = "dubbo.application.service-discovery.migration"; private static final Logger logger = LoggerFactory.getLogger(MigrationRuleHandler.class); + private static final String DUBBO_SERVICEDISCOVERY_MIGRATION = "dubbo.application.service-discovery.migration"; private MigrationClusterInvoker<T> migrationInvoker; private MigrationStep currentStep; @@ -119,10 +119,8 @@ public class MigrationRuleHandler<T> { } if (step == MigrationStep.APPLICATION_FIRST) { - setCurrentStepAndThreshold(step, threshold); migrationInvoker.refreshServiceDiscoveryInvokerOnMappingCallback(false); } else if (step == MigrationStep.FORCE_APPLICATION) { - setCurrentStepAndThreshold(step, threshold); migrationInvoker.refreshServiceDiscoveryInvokerOnMappingCallback(true); } } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java index 85c7349..399a260 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java @@ -67,24 +67,20 @@ public class MigrationRuleListener implements RegistryProtocolListener, Configur public MigrationRuleListener() { this.configuration = ApplicationModel.getEnvironment().getDynamicConfiguration().orElse(null); - - String localRawRule = ApplicationModel.getEnvironment().getLocalMigrationRule(); - String defaultRawRule = StringUtils.isEmpty(localRawRule) ? INIT : localRawRule; - if (this.configuration != null) { logger.info("Listening for migration rules on dataId " + RULE_KEY + ", group " + DUBBO_SERVICEDISCOVERY_MIGRATION); configuration.addListener(RULE_KEY, DUBBO_SERVICEDISCOVERY_MIGRATION, this); String rawRule = configuration.getConfig(RULE_KEY, DUBBO_SERVICEDISCOVERY_MIGRATION); if (StringUtils.isEmpty(rawRule)) { - rawRule = defaultRawRule; + rawRule = INIT; } this.rawRule = rawRule; } else { if (logger.isWarnEnabled()) { logger.warn("Using default configuration rule because config center is not configured!"); } - rawRule = defaultRawRule; + rawRule = INIT; } // process(new ConfigChangedEvent(RULE_KEY, DUBBO_SERVICEDISCOVERY_MIGRATION, rawRule)); } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java index c7b0daa..8b29483 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java @@ -251,18 +251,20 @@ public abstract class DynamicDirectory<T> extends AbstractDirectory<T> implement this.invokersChangedListener = listener; if (invokersChangedListener != null && invokersChanged) { invokersChangedListener.onChange(); + invokersChanged = false; } } protected synchronized void invokersChanged() { invokersChanged = true; - if (invokersChangedListener != null) { + if (invokersChangedListener != null && invokersChanged) { invokersChangedListener.onChange(); + invokersChanged = false; } } - public boolean isNotificationReceived() { - return invokersChanged; + public synchronized void markInvokersChanged() { + this.invokersChanged = true; } protected abstract void destroyAllInvokers(); diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/DefaultServiceInstanceTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/DefaultServiceInstanceTest.java index e2909ce..85b63de 100644 --- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/DefaultServiceInstanceTest.java +++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/DefaultServiceInstanceTest.java @@ -19,6 +19,7 @@ package org.apache.dubbo.registry.client; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import static java.lang.String.valueOf; import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.METADATA_SERVICE_URLS_PROPERTY_NAME; import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -36,7 +37,7 @@ public class DefaultServiceInstanceTest { public DefaultServiceInstance instance; public static DefaultServiceInstance createInstance() { - DefaultServiceInstance instance = new DefaultServiceInstance("A", "127.0.0.1", 8080); + DefaultServiceInstance instance = new DefaultServiceInstance(valueOf(System.nanoTime()), "A", "127.0.0.1", 8080); instance.getMetadata().put("dubbo.metadata-service.urls", "[ \"dubbo://192.168.0.102:20881/com.alibaba.cloud.dubbo.service.DubboMetadataService?anyhost=true&application=spring-cloud-alibaba-dubbo-provider&bind.ip=192.168.0.102&bind.port=20881&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=spring-cloud-alibaba-dubbo-provider&interface=com.alibaba.cloud.dubbo.service.DubboMetadataService&methods=getAllServiceKeys,getServiceRestMetadata,getExportedURLs,getAllExportedU [...] instance.getMetadata().put("dubbo.metadata-service.url-params", "{\"dubbo\":{\"application\":\"dubbo-provider-demo\",\"deprecated\":\"false\",\"group\":\"dubbo-provider-demo\",\"version\":\"1.0.0\",\"timestamp\":\"1564845042651\",\"dubbo\":\"2.0.2\",\"provider.host\":\"192.168.0.102\",\"provider.port\":\"20880\"}}"); return instance; diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/FileSystemServiceDiscoveryTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/FileSystemServiceDiscoveryTest.java index 65527df..8677381 100644 --- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/FileSystemServiceDiscoveryTest.java +++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/FileSystemServiceDiscoveryTest.java @@ -20,7 +20,6 @@ import org.apache.dubbo.common.URLBuilder; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import static org.apache.dubbo.registry.client.DefaultServiceInstanceTest.createInstance; @@ -30,7 +29,6 @@ import static org.apache.dubbo.registry.client.DefaultServiceInstanceTest.create * * @since 2.7.5 */ -@Disabled("FileSystemServiceDiscovery implementation is not stable enough at present") public class FileSystemServiceDiscoveryTest { private FileSystemServiceDiscovery serviceDiscovery; diff --git a/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleServiceDiscovery.java b/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleServiceDiscovery.java index 15bf2d1..084376b 100644 --- a/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleServiceDiscovery.java @@ -20,6 +20,7 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.constants.CommonConstants; import org.apache.dubbo.common.utils.DefaultPage; import org.apache.dubbo.common.utils.Page; +import org.apache.dubbo.event.ConditionalEventListener; import org.apache.dubbo.registry.client.ServiceDiscovery; import org.apache.dubbo.registry.client.ServiceDiscoveryFactory; import org.apache.dubbo.registry.client.ServiceInstance; @@ -32,7 +33,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Function; public class MultipleServiceDiscovery implements ServiceDiscovery { public static final String REGISTRY_PREFIX_KEY = "child."; @@ -88,23 +88,16 @@ public class MultipleServiceDiscovery implements ServiceDiscovery { @Override public void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws NullPointerException, IllegalArgumentException { - MultiServiceInstancesChangedListener multiListener = (MultiServiceInstancesChangedListener) listener; + MultiServiceInstancesChangedListener multiListener = new MultiServiceInstancesChangedListener(listener); for (String registryKey : serviceDiscoveries.keySet()) { - ServiceDiscovery serviceDiscovery = serviceDiscoveries.get(registryKey); - SingleServiceInstancesChangedListener singleListener = multiListener.getAndComputeIfAbsent(registryKey, k -> { - return new SingleServiceInstancesChangedListener(listener.getServiceNames(), serviceDiscovery, multiListener); - }); - serviceDiscovery.addServiceInstancesChangedListener(singleListener); + SingleServiceInstancesChangedListener singleListener = new SingleServiceInstancesChangedListener(listener.getServiceNames(), serviceDiscoveries.get(registryKey), multiListener); + multiListener.putSingleListener(registryKey, singleListener); + serviceDiscoveries.get(registryKey).addServiceInstancesChangedListener(singleListener); } } @Override - public ServiceInstancesChangedListener createListener(Set<String> serviceNames) { - return new MultiServiceInstancesChangedListener(serviceNames, this); - } - - @Override public Page<ServiceInstance> getInstances(String serviceName, int offset, int pageSize, boolean healthyOnly) throws NullPointerException, IllegalArgumentException, UnsupportedOperationException { List<ServiceInstance> serviceInstanceList = new ArrayList<>(); @@ -130,12 +123,17 @@ public class MultipleServiceDiscovery implements ServiceDiscovery { return serviceInstance; } - protected static class MultiServiceInstancesChangedListener extends ServiceInstancesChangedListener { - private final Map<String, SingleServiceInstancesChangedListener> singleListenerMap; + protected static class MultiServiceInstancesChangedListener implements ConditionalEventListener<ServiceInstancesChangedEvent> { + private final ServiceInstancesChangedListener sourceListener; + private final Map<String, SingleServiceInstancesChangedListener> singleListenerMap = new ConcurrentHashMap<>(); - public MultiServiceInstancesChangedListener(Set<String> serviceNames, ServiceDiscovery serviceDiscovery) { - super(serviceNames, serviceDiscovery); - this.singleListenerMap = new ConcurrentHashMap<>(); + public MultiServiceInstancesChangedListener(ServiceInstancesChangedListener sourceListener) { + this.sourceListener = sourceListener; + } + + @Override + public boolean accept(ServiceInstancesChangedEvent event) { + return sourceListener.getServiceNames().contains(event.getServiceName()); } @Override @@ -151,16 +149,12 @@ public class MultipleServiceDiscovery implements ServiceDiscovery { } } - super.onEvent(new ServiceInstancesChangedEvent(event.getServiceName(), serviceInstances)); + sourceListener.onEvent(new ServiceInstancesChangedEvent(event.getServiceName(), serviceInstances)); } public void putSingleListener(String registryKey, SingleServiceInstancesChangedListener singleListener) { singleListenerMap.put(registryKey, singleListener); } - - public SingleServiceInstancesChangedListener getAndComputeIfAbsent(String registryKey, Function<String, SingleServiceInstancesChangedListener> func) { - return singleListenerMap.computeIfAbsent(registryKey, func); - } } protected static class SingleServiceInstancesChangedListener extends ServiceInstancesChangedListener { diff --git a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/util/NacosNamingServiceUtils.java b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/util/NacosNamingServiceUtils.java index abc7467..7542b5e 100644 --- a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/util/NacosNamingServiceUtils.java +++ b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/util/NacosNamingServiceUtils.java @@ -57,6 +57,7 @@ public class NacosNamingServiceUtils { */ public static Instance toInstance(ServiceInstance serviceInstance) { Instance instance = new Instance(); + instance.setInstanceId(serviceInstance.getId()); instance.setServiceName(serviceInstance.getServiceName()); instance.setIp(serviceInstance.getHost()); instance.setPort(serviceInstance.getPort()); @@ -74,7 +75,8 @@ public class NacosNamingServiceUtils { * @since 2.7.5 */ public static ServiceInstance toServiceInstance(Instance instance) { - DefaultServiceInstance serviceInstance = new DefaultServiceInstance(NamingUtils.getServiceName(instance.getServiceName()), instance.getIp(), instance.getPort()); + DefaultServiceInstance serviceInstance = new DefaultServiceInstance(instance.getInstanceId(), + NamingUtils.getServiceName(instance.getServiceName()), instance.getIp(), instance.getPort()); serviceInstance.setMetadata(instance.getMetadata()); serviceInstance.setEnabled(instance.isEnabled()); serviceInstance.setHealthy(instance.isHealthy()); diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java index 549b803..dce7d20 100644 --- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java @@ -26,7 +26,6 @@ import org.apache.dubbo.common.utils.Page; import org.apache.dubbo.registry.client.AbstractServiceDiscovery; import org.apache.dubbo.registry.client.ServiceDiscovery; import org.apache.dubbo.registry.client.ServiceInstance; -import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent; import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener; import org.apache.dubbo.rpc.RpcException; @@ -41,7 +40,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; import static org.apache.dubbo.common.function.ThrowableFunction.execute; import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.isInstanceUpdated; @@ -201,28 +199,18 @@ public class ZookeeperServiceDiscovery extends AbstractServiceDiscovery { throw new IllegalStateException("registerServiceWatcher create path=" + path + " fail.", e); } - CountDownLatch latch = new CountDownLatch(1); - ZookeeperServiceDiscoveryChangeWatcher watcher = watcherCaches.computeIfAbsent(path, key -> { - ZookeeperServiceDiscoveryChangeWatcher tmpWatcher = new ZookeeperServiceDiscoveryChangeWatcher(this, serviceName, path, latch); - try { - curatorFramework.getChildren().usingWatcher(tmpWatcher).forPath(path); - } catch (KeeperException.NoNodeException e) { - // ignored - if (logger.isErrorEnabled()) { - logger.error(e.getMessage()); - } - } catch (Exception e) { - throw new IllegalStateException(e.getMessage(), e); + CuratorWatcher watcher = watcherCaches.computeIfAbsent(path, key -> + new ZookeeperServiceDiscoveryChangeWatcher(this, serviceName, listener)); + try { + curatorFramework.getChildren().usingWatcher(watcher).forPath(path); + } catch (KeeperException.NoNodeException e) { + // ignored + if (logger.isErrorEnabled()) { + logger.error(e.getMessage()); } - return tmpWatcher; - }); - watcher.addListener(listener); - listener.onEvent(new ServiceInstancesChangedEvent(serviceName, this.getInstances(serviceName))); - latch.countDown(); - } - - public void reRegisterWatcher(ZookeeperServiceDiscoveryChangeWatcher watcher) throws Exception { - curatorFramework.getChildren().usingWatcher(watcher).forPath(watcher.getPath()); + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } } private String buildServicePath(String serviceName) { diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java index 8cffc3e..e34d3e4 100644 --- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java +++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java @@ -16,10 +16,8 @@ */ package org.apache.dubbo.registry.zookeeper; -import org.apache.dubbo.common.utils.ConcurrentHashSet; import org.apache.dubbo.registry.RegistryNotifier; import org.apache.dubbo.registry.client.ServiceDiscovery; -import org.apache.dubbo.registry.client.ServiceInstance; import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent; import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener; @@ -27,10 +25,6 @@ import org.apache.curator.framework.api.CuratorWatcher; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CountDownLatch; - import static org.apache.dubbo.rpc.model.ApplicationModel.getExecutorRepository; import static org.apache.zookeeper.Watcher.Event.EventType.NodeChildrenChanged; import static org.apache.zookeeper.Watcher.Event.EventType.NodeDataChanged; @@ -43,7 +37,7 @@ import static org.apache.zookeeper.Watcher.Event.EventType.NodeDataChanged; * @since 2.7.5 */ public class ZookeeperServiceDiscoveryChangeWatcher implements CuratorWatcher { - private Set<ServiceInstancesChangedListener> listeners = new ConcurrentHashSet<>(); + private ServiceInstancesChangedListener listener; private final ZookeeperServiceDiscovery zookeeperServiceDiscovery; @@ -53,52 +47,34 @@ public class ZookeeperServiceDiscoveryChangeWatcher implements CuratorWatcher { private final String serviceName; - private final String path; - - private CountDownLatch latch; - public ZookeeperServiceDiscoveryChangeWatcher(ZookeeperServiceDiscovery zookeeperServiceDiscovery, String serviceName, - String path, - CountDownLatch latch) { + ServiceInstancesChangedListener listener) { this.zookeeperServiceDiscovery = zookeeperServiceDiscovery; this.serviceName = serviceName; - this.path = path; - this.latch = latch; + this.listener = listener; this.notifier = new RegistryNotifier(zookeeperServiceDiscovery.getDelay(), getExecutorRepository().getServiceDiscoveryAddressNotificationExecutor()) { @Override protected void doNotify(Object rawAddresses) { - listeners.forEach(listener -> listener.onEvent((ServiceInstancesChangedEvent)rawAddresses)); + listener.onEvent((ServiceInstancesChangedEvent)rawAddresses); } }; } @Override public void process(WatchedEvent event) throws Exception { - try { - latch.await(); - } catch (InterruptedException e) { - } Watcher.Event.EventType eventType = event.getType(); if (NodeChildrenChanged.equals(eventType) || NodeDataChanged.equals(eventType)) { if (shouldKeepWatching()) { - zookeeperServiceDiscovery.reRegisterWatcher(this); - List<ServiceInstance> instanceList = zookeeperServiceDiscovery.getInstances(serviceName); - notifier.notify(new ServiceInstancesChangedEvent(serviceName, instanceList)); + notifier.notify(new ServiceInstancesChangedEvent(serviceName, zookeeperServiceDiscovery.getInstances(serviceName))); + zookeeperServiceDiscovery.registerServiceWatcher(serviceName, listener); + zookeeperServiceDiscovery.dispatchServiceInstancesChangedEvent(serviceName); } } } - public String getPath() { - return path; - } - - public void addListener(ServiceInstancesChangedListener listener) { - listeners.add(listener); - } - public boolean shouldKeepWatching() { return keepWatching; } diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java index ed9be54..278aae7 100644 --- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java +++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java @@ -85,7 +85,7 @@ public abstract class CuratorFrameworkUtils { String host = instance.getAddress(); int port = instance.getPort(); ZookeeperInstance zookeeperInstance = instance.getPayload(); - DefaultServiceInstance serviceInstance = new DefaultServiceInstance(name, host, port); + DefaultServiceInstance serviceInstance = new DefaultServiceInstance(instance.getId(), name, host, port); serviceInstance.setMetadata(zookeeperInstance.getMetadata()); return serviceInstance; } diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryTest.java b/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryTest.java index 359d62a..5f3b680 100644 --- a/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryTest.java +++ b/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryTest.java @@ -34,6 +34,7 @@ import java.util.Map; import static java.util.Arrays.asList; import static org.apache.dubbo.common.utils.NetUtils.getAvailablePort; import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.INSTANCE_REVISION_UPDATED_KEY; +import static org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkUtils.generateId; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -103,7 +104,7 @@ public class ZookeeperServiceDiscoveryTest { } private DefaultServiceInstance createServiceInstance(String serviceName, String host, int port) { - return new DefaultServiceInstance(serviceName, host, port); + return new DefaultServiceInstance(generateId(host, port), serviceName, host, port); } // @Test diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/BaseFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/BaseFilter.java deleted file mode 100644 index d0b7848..0000000 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/BaseFilter.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.rpc; - -public interface BaseFilter { - /** - * Make sure call invoker.invoke() in your implementation. - */ - Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException; - - interface Listener { - - void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation); - - void onError(Throwable t, Invoker<?> invoker, Invocation invocation); - } -} diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java index 74acb51..58e8215 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java @@ -33,32 +33,6 @@ import org.apache.dubbo.common.extension.SPI; * Caching is implemented in dubbo using filter approach. If cache is configured for invocation then before * remote call configured caching type's (e.g. Thread Local, JCache etc) implementation invoke method gets called. * </pre> - * - * Start from 3.0, the semantics of the Filter component at the consumer side has changed. - * Instead of intercepting a specific instance of invoker, Filter in 3.0 now intercepts ClusterInvoker. A new SPI named - * InstanceFilter is introduced to work as the same semantic as Filter in 2.x. - * - * The difference of Filter is as follows: - * - * 3.x Filter - * - * -> InstanceFilter -> Invoker - * - * Proxy -> Filter -> Filter -> ClusterInvoker -> InstanceFilter -> Invoker - * - * -> InstanceFilter -> Invoker - * - * - * 2.x Filter - * - * Filter -> Filter -> Invoker - * - * Proxy -> ClusterInvoker -> Filter -> Filter -> Invoker - * - * Filter -> Filter -> Invoker - * - * If you want to a Filter - * * Filter. (SPI, Singleton, ThreadSafe) * * @see org.apache.dubbo.rpc.filter.GenericFilter @@ -67,5 +41,17 @@ import org.apache.dubbo.common.extension.SPI; * @see org.apache.dubbo.rpc.filter.TpsLimitFilter */ @SPI -public interface Filter extends BaseFilter { +public interface Filter { + /** + * Make sure call invoker.invoke() in your implementation. + */ + Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException; + + interface Listener { + + void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation); + + void onError(Throwable t, Invoker<?> invoker, Invocation invocation); + } + } \ No newline at end of file diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java similarity index 83% rename from dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java rename to dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java index 4a1ed3b..0fead6b 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dubbo.rpc.cluster.filter.support; +package org.apache.dubbo.rpc.filter; import org.apache.dubbo.common.extension.Activate; import org.apache.dubbo.common.utils.CollectionUtils; @@ -28,7 +28,6 @@ import org.apache.dubbo.rpc.RpcContext; import org.apache.dubbo.rpc.RpcException; import org.apache.dubbo.rpc.RpcInvocation; import org.apache.dubbo.rpc.TimeoutCountDown; -import org.apache.dubbo.rpc.cluster.filter.ClusterFilter; import java.util.Map; @@ -40,11 +39,11 @@ import static org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_K * ConsumerContextFilter set current RpcContext with invoker,invocation, local host, remote host and port * for consumer invoker.It does it to make the requires info available to execution thread's RpcContext. * - * @see Filter + * @see org.apache.dubbo.rpc.Filter * @see RpcContext */ @Activate(group = CONSUMER, order = -10000) -public class ConsumerContextFilter implements ClusterFilter, ClusterFilter.Listener { +public class ConsumerContextFilter implements Filter { @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { @@ -77,24 +76,7 @@ public class ConsumerContextFilter implements ClusterFilter, ClusterFilter.Liste + invocation.getMethodName() + ", terminate directly."), invocation); } } - - try { - RpcContext.removeServerContext(); - return invoker.invoke(invocation); - } finally { - RpcContext.removeContext(); - } - } - - @Override - public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) { - // pass attachments to result - RpcContext.getServerContext().setObjectAttachments(appResponse.getObjectAttachments()); - } - - @Override - public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) { - + return invoker.invoke(invocation); } } diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java index 33a9b64..d7e8c44 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java @@ -52,7 +52,7 @@ import java.util.concurrent.TimeUnit; */ public abstract class AbstractInvoker<T> implements Invoker<T> { - protected static final Logger logger = LoggerFactory.getLogger(AbstractInvoker.class); + protected final Logger logger = LoggerFactory.getLogger(getClass()); private final Class<T> type; diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter index 7c526c2..5255f55 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter +++ b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter @@ -3,8 +3,10 @@ generic=org.apache.dubbo.rpc.filter.GenericFilter genericimpl=org.apache.dubbo.rpc.filter.GenericImplFilter token=org.apache.dubbo.rpc.filter.TokenFilter accesslog=org.apache.dubbo.rpc.filter.AccessLogFilter +activelimit=org.apache.dubbo.rpc.filter.ActiveLimitFilter classloader=org.apache.dubbo.rpc.filter.ClassLoaderFilter context=org.apache.dubbo.rpc.filter.ContextFilter +consumercontext=org.apache.dubbo.rpc.filter.ConsumerContextFilter exception=org.apache.dubbo.rpc.filter.ExceptionFilter executelimit=org.apache.dubbo.rpc.filter.ExecuteLimitFilter deprecated=org.apache.dubbo.rpc.filter.DeprecatedFilter diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/filter/FutureFilter.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/filter/FutureFilter.java index ad95481..ee922af 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/filter/FutureFilter.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/filter/FutureFilter.java @@ -20,11 +20,11 @@ import org.apache.dubbo.common.constants.CommonConstants; import org.apache.dubbo.common.extension.Activate; import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.rpc.Filter; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.Result; import org.apache.dubbo.rpc.RpcException; -import org.apache.dubbo.rpc.cluster.filter.ClusterFilter; import org.apache.dubbo.rpc.model.ApplicationModel; import org.apache.dubbo.rpc.model.AsyncMethodInfo; import org.apache.dubbo.rpc.model.ConsumerModel; @@ -39,7 +39,7 @@ import static org.apache.dubbo.rpc.protocol.dubbo.Constants.ASYNC_METHOD_INFO; * EventFilter */ @Activate(group = CommonConstants.CONSUMER) -public class FutureFilter implements ClusterFilter, ClusterFilter.Listener { +public class FutureFilter implements Filter, Filter.Listener { protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class); diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter b/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter index ee41594..79a7a38 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter @@ -1 +1,2 @@ -trace=org.apache.dubbo.rpc.protocol.dubbo.filter.TraceFilter \ No newline at end of file +trace=org.apache.dubbo.rpc.protocol.dubbo.filter.TraceFilter +future=org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter \ No newline at end of file diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter b/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter deleted file mode 100644 index 4783214..0000000 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.filter.ClusterFilter +++ /dev/null @@ -1 +0,0 @@ -future=org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter \ No newline at end of file diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/FutureFilterTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/FutureFilterTest.java index d34f49a..1a1b0fd 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/FutureFilterTest.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/FutureFilterTest.java @@ -18,11 +18,11 @@ package org.apache.dubbo.rpc.protocol.dubbo; import org.apache.dubbo.common.URL; import org.apache.dubbo.rpc.AppResponse; +import org.apache.dubbo.rpc.Filter; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.Result; import org.apache.dubbo.rpc.RpcException; import org.apache.dubbo.rpc.RpcInvocation; -import org.apache.dubbo.rpc.cluster.filter.ClusterFilter; import org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter; import org.apache.dubbo.rpc.protocol.dubbo.support.DemoService; @@ -40,7 +40,7 @@ import static org.mockito.Mockito.mock; */ public class FutureFilterTest { private static RpcInvocation invocation; - private ClusterFilter eventFilter = new FutureFilter(); + private Filter eventFilter = new FutureFilter(); @BeforeAll public static void setUp() {
