This is an automated email from the ASF dual-hosted git repository. liujun pushed a commit to branch dev-metadata in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git
commit 91fddde79a718b3223b5caa421c4112bf5744f69 Author: ken.lj <[email protected]> AuthorDate: Wed Oct 17 14:05:47 2018 +0800 Improvements for dynamic config and router: 1. support route in StaticDirectory, multi-registry & group merger 2. tolerance of abnormal configs from configserver. 3. add check policy when failed to connect to configserver --- .../java/org/apache/dubbo/rpc/cluster/Router.java | 4 +- .../org/apache/dubbo/rpc/cluster/RouterChain.java | 14 +- .../rpc/cluster/directory/AbstractDirectory.java | 11 +- .../rpc/cluster/directory/StaticDirectory.java | 15 +- .../dubbo/rpc/cluster/router/AbstractRouter.java | 5 + .../condition/config/ConfigConditionRouter.java | 30 ++- .../router/{ => mock}/MockInvokersSelector.java | 229 +++++++++------------ .../rpc/cluster/router/mock/MockRouterFactory.java | 32 ++- .../dubbo/rpc/cluster/router/tag/TagRouter.java | 13 +- .../cluster/support/AbstractClusterInvoker.java | 6 +- .../dubbo/rpc/cluster/support/ClusterUtils.java | 9 +- .../cluster/support/MergeableClusterInvoker.java | 29 ++- .../rpc/cluster/support/RegistryAwareCluster.java | 30 +-- .../support/RegistryAwareClusterInvoker.java | 59 ++++++ .../internal/org.apache.dubbo.rpc.cluster.Cluster | 3 +- .../org.apache.dubbo.rpc.cluster.RouterFactory | 3 +- .../java/org/apache/dubbo/common/Constants.java | 1 + .../dubbo/common/extension/ExtensionLoader.java | 4 + .../org/apache/dubbo/config/ReferenceConfig.java | 9 +- dubbo-config/dubbo-config-dynamic/pom.xml | 2 +- .../dynamic/AbstractDynamicConfiguration.java | 4 +- .../support/apollo/ApolloDynamicConfiguration.java | 49 ++--- .../archaius/ArchaiusDynamicConfiguration.java | 3 +- .../sources/ZooKeeperConfigurationSource.java | 16 +- .../support/nop/NopDynamicConfiguration.java | 2 +- .../registry/integration/RegistryDirectory.java | 86 +++++--- .../java/org/apache/dubbo/rpc/RpcException.java | 5 + 27 files changed, 375 insertions(+), 298 deletions(-) diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Router.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Router.java index 65cf69f..3aaf298 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Router.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Router.java @@ -63,9 +63,7 @@ public interface Router extends Comparable<Router> { return ""; } - default boolean isRuntime() { - return true; - } + boolean isRuntime(); default String getKey() { return TreeNode.FAILOVER_KEY; diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/RouterChain.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/RouterChain.java index a1cb9d5..d5db399 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/RouterChain.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/RouterChain.java @@ -75,7 +75,7 @@ public class RouterChain<T> { /** * @param methodInvokers * @param url - * @param invocation TODO has no been used yet + * @param invocation TODO has not being used yet */ public void preRoute(Map<String, List<Invoker<T>>> methodInvokers, URL url, Invocation invocation) { if (CollectionUtils.isEmpty(routers)) { @@ -103,7 +103,7 @@ public class RouterChain<T> { }); } - public List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) { + public List<Invoker<T>> route(URL url, Invocation invocation) { List<Invoker<T>> finalInvokers = treeCache.getInvokers(treeCache.getTree(), url, invocation); for (Router router : routers) { if (router.isRuntime()) { @@ -113,6 +113,16 @@ public class RouterChain<T> { return finalInvokers; } + public List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) { + List<Invoker<T>> finalInvokers = invokers; + for (Router router : routers) { + if (router.isRuntime()) { + finalInvokers = router.route(invokers, url, invocation); + } + } + return finalInvokers; + } + public void notifyRuleChanged() { preRoute(this.fullMethodInvokers, url, null); } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java index 07e6dd3..2cee248 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java @@ -68,16 +68,7 @@ public abstract class AbstractDirectory<T> implements Directory<T> { throw new RpcException("Directory already destroyed .url: " + getUrl()); } - List<Invoker<T>> invokers = doList(invocation); - - try { - // Get invokers from cache, only runtime routers will be executed. - return routerChain.route(invokers, getConsumerUrl(), invocation); - } catch (Throwable t) { - logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t); - } - - return invokers; + return doList(invocation); } @Override diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/StaticDirectory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/StaticDirectory.java index 9367bdf..154fafe 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/StaticDirectory.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/StaticDirectory.java @@ -17,6 +17,8 @@ package org.apache.dubbo.rpc.cluster.directory; import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.RpcException; @@ -31,6 +33,7 @@ import java.util.List; * */ public class StaticDirectory<T> extends AbstractDirectory<T> { + private static final Logger logger = LoggerFactory.getLogger(StaticDirectory.class); private final List<Invoker<T>> invokers; @@ -85,8 +88,16 @@ public class StaticDirectory<T> extends AbstractDirectory<T> { @Override protected List<Invoker<T>> doList(Invocation invocation) throws RpcException { - - return invokers; + List<Invoker<T>> finalInvokers = invokers; + if (routerChain != null) { + try { + // Get invokers from cache, only runtime routers will be executed. + finalInvokers = routerChain.route(getConsumerUrl(), invocation); + } catch (Throwable t) { + logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t); + } + } + return finalInvokers; } } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/AbstractRouter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/AbstractRouter.java index f814336..bb76236 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/AbstractRouter.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/AbstractRouter.java @@ -74,6 +74,11 @@ public abstract class AbstractRouter implements Router { } @Override + public boolean isRuntime() { + return true; + } + + @Override public boolean isForce() { return force; } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/ConfigConditionRouter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/ConfigConditionRouter.java index 8605afe..1d60ee3 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/ConfigConditionRouter.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/ConfigConditionRouter.java @@ -72,23 +72,23 @@ public class ConfigConditionRouter extends AbstractRouter implements Configurati generateAppConditions(); } } catch (Exception e) { - throw new IllegalStateException(e.getMessage(), e); + throw new IllegalStateException("Failed to init the condition router.", e); } } @Override public void process(ConfigChangeEvent event) { - try { - if (event.getChangeType().equals(ConfigChangeType.DELETED)) { - // Now, we can only recognize if it's a app level or service level change by try to match event key. - if (event.getKey().endsWith(this.url.getParameter(Constants.APPLICATION_KEY) + Constants.ROUTERS_SUFFIX)) { - appRouterRule = null; - conditionRouters.clear(); - } else { - routerRule = null; - appConditionRouters.clear(); - } + if (event.getChangeType().equals(ConfigChangeType.DELETED)) { + // Now, we can only recognize if it's a app level or service level change by try to match event key. + if (event.getKey().endsWith(this.url.getParameter(Constants.APPLICATION_KEY) + Constants.ROUTERS_SUFFIX)) { + appRouterRule = null; + conditionRouters.clear(); } else { + routerRule = null; + appConditionRouters.clear(); + } + } else { + try { if (event.getKey().endsWith(this.url.getParameter(Constants.APPLICATION_KEY) + Constants.ROUTERS_SUFFIX)) { appRouterRule = ConditionRuleParser.parse(event.getNewValue()); generateAppConditions(); @@ -96,12 +96,11 @@ public class ConfigConditionRouter extends AbstractRouter implements Configurati routerRule = ConditionRuleParser.parse(event.getNewValue()); generateConditions(); } + } catch (Exception e) { + logger.error("Failed to parse the raw condition rule and it will not take effect, please check if the condition rule matches with the template, the raw rule is:\n " + event.getNewValue(), e); } - routerChain.notifyRuleChanged(); - } catch (Exception e) { - logger.error(e); - // TODO } + routerChain.notifyRuleChanged(); } @Override @@ -139,7 +138,6 @@ public class ConfigConditionRouter extends AbstractRouter implements Configurati return invokers; } - if (isAppRuleEnabled()) { for (Router router : appConditionRouters) { invokers = router.route(invokers, url, invocation); diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/MockInvokersSelector.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/mock/MockInvokersSelector.java similarity index 76% rename from dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/MockInvokersSelector.java rename to dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/mock/MockInvokersSelector.java index f775f89..5dc954a 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/MockInvokersSelector.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/mock/MockInvokersSelector.java @@ -1,133 +1,96 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.rpc.cluster.router; - -import org.apache.dubbo.common.Constants; -import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.utils.CollectionUtils; -import org.apache.dubbo.rpc.Invocation; -import org.apache.dubbo.rpc.Invoker; -import org.apache.dubbo.rpc.RpcException; -import org.apache.dubbo.rpc.cluster.Router; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * A specific Router designed to realize mock feature. - * If a request is configured to use mock, then this router guarantees that only the invokers with protocol MOCK appear in final the invoker list, all other invokers will be excluded. - * - */ -public class MockInvokersSelector implements Router { - - @Override - public <T> List<Invoker<T>> route(final List<Invoker<T>> invokers, - URL url, final Invocation invocation) throws RpcException { - if (invocation.getAttachments() == null) { - return getNormalInvokers(invokers); - } else { - String value = invocation.getAttachments().get(Constants.INVOCATION_NEED_MOCK); - if (value == null) - return getNormalInvokers(invokers); - else if (Boolean.TRUE.toString().equalsIgnoreCase(value)) { - return getMockedInvokers(invokers); - } - } - return invokers; - } - - @Override - public <T> Map<String, List<Invoker<T>>> preRoute(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException { - Map<String, List<Invoker<T>>> map = new HashMap<>(); - - if (CollectionUtils.isEmpty(invokers)) { - return map; - } - - if (isRuntime()) { - map.put(TreeNode.FAILOVER_KEY, invokers); - return map; - } - return map; - } - - @Override - public boolean isRuntime() { - return true; - } - - @Override - public String getKey() { - return TreeNode.FAILOVER_KEY; - } - - @Override - public boolean isForce() { - return false; - } - - private <T> List<Invoker<T>> getMockedInvokers(final List<Invoker<T>> invokers) { - if (!hasMockProviders(invokers)) { - return null; - } - List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(1); - for (Invoker<T> invoker : invokers) { - if (invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) { - sInvokers.add(invoker); - } - } - return sInvokers; - } - - private <T> List<Invoker<T>> getNormalInvokers(final List<Invoker<T>> invokers) { - if (!hasMockProviders(invokers)) { - return invokers; - } else { - List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(invokers.size()); - for (Invoker<T> invoker : invokers) { - if (!invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) { - sInvokers.add(invoker); - } - } - return sInvokers; - } - } - - private <T> boolean hasMockProviders(final List<Invoker<T>> invokers) { - boolean hasMockProvider = false; - for (Invoker<T> invoker : invokers) { - if (invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) { - hasMockProvider = true; - break; - } - } - return hasMockProvider; - } - - @Override - public URL getUrl() { - return null; - } - - @Override - public int compareTo(Router o) { - return 1; - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.cluster.router.mock; + +import org.apache.dubbo.common.Constants; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.cluster.Router; +import org.apache.dubbo.rpc.cluster.router.AbstractRouter; + +import java.util.ArrayList; +import java.util.List; + +/** + * A specific Router designed to realize mock feature. + * If a request is configured to use mock, then this router guarantees that only the invokers with protocol MOCK appear in final the invoker list, all other invokers will be excluded. + * + */ +public class MockInvokersSelector extends AbstractRouter { + + @Override + public <T> List<Invoker<T>> route(final List<Invoker<T>> invokers, + URL url, final Invocation invocation) throws RpcException { + if (invocation.getAttachments() == null) { + return getNormalInvokers(invokers); + } else { + String value = invocation.getAttachments().get(Constants.INVOCATION_NEED_MOCK); + if (value == null) + return getNormalInvokers(invokers); + else if (Boolean.TRUE.toString().equalsIgnoreCase(value)) { + return getMockedInvokers(invokers); + } + } + return invokers; + } + + private <T> List<Invoker<T>> getMockedInvokers(final List<Invoker<T>> invokers) { + if (!hasMockProviders(invokers)) { + return null; + } + List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(1); + for (Invoker<T> invoker : invokers) { + if (invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) { + sInvokers.add(invoker); + } + } + return sInvokers; + } + + private <T> List<Invoker<T>> getNormalInvokers(final List<Invoker<T>> invokers) { + if (!hasMockProviders(invokers)) { + return invokers; + } else { + List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(invokers.size()); + for (Invoker<T> invoker : invokers) { + if (!invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) { + sInvokers.add(invoker); + } + } + return sInvokers; + } + } + + private <T> boolean hasMockProviders(final List<Invoker<T>> invokers) { + boolean hasMockProvider = false; + for (Invoker<T> invoker : invokers) { + if (invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) { + hasMockProvider = true; + break; + } + } + return hasMockProvider; + } + + @Override + public int compareTo(Router o) { + return 1; + } + +} diff --git a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/nop/NopDynamicConfiguration.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/mock/MockRouterFactory.java similarity index 57% copy from dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/nop/NopDynamicConfiguration.java copy to dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/mock/MockRouterFactory.java index 75e8bf9..79b9b99 100644 --- a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/nop/NopDynamicConfiguration.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/mock/MockRouterFactory.java @@ -14,33 +14,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dubbo.config.dynamic.support.nop; +package org.apache.dubbo.rpc.cluster.router.mock; -import org.apache.dubbo.config.dynamic.AbstractDynamicConfiguration; -import org.apache.dubbo.config.dynamic.ConfigurationListener; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.extension.Activate; +import org.apache.dubbo.config.dynamic.DynamicConfiguration; +import org.apache.dubbo.rpc.cluster.Router; +import org.apache.dubbo.rpc.cluster.RouterFactory; /** * */ -public class NopDynamicConfiguration extends AbstractDynamicConfiguration { +@Activate +public class MockRouterFactory implements RouterFactory { @Override - public void init() { - - } - - @Override - protected String getInternalProperty(String key, String group, long timeout, ConfigurationListener listener) { - return null; - } - - @Override - protected void addTargetListener(String key, Object o) { - + public Router getRouter(URL url) { + return new MockInvokersSelector(); } @Override - protected Object createTargetConfigListener(String key, ConfigurationListener listener) { - return null; + public Router getRouter(DynamicConfiguration dynamicConfiguration, URL url) { + return getRouter(url); } -} +} \ No newline at end of file diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouter.java index c970974..4aadc82 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouter.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouter.java @@ -77,9 +77,13 @@ public class TagRouter extends AbstractRouter implements Comparable<Router>, Con if (StringUtils.isEmpty(application)) { logger.error("TagRouter must getConfig from or subscribe to a specific application, but the application in this TagRouter is not specified."); } - String rawRule = this.configuration.getConfig(application + TAGROUTERRULES_DATAID, "dubbo", this); - if (StringUtils.isNotEmpty(rawRule)) { - this.tagRouterRule = TagRuleParser.parse(rawRule); + try { + String rawRule = this.configuration.getConfig(application + TAGROUTERRULES_DATAID, "dubbo", this); + if (StringUtils.isNotEmpty(rawRule)) { + this.tagRouterRule = TagRuleParser.parse(rawRule); + } + } catch (Exception e) { + logger.error("Failed to parse the raw tag router rule and it will not take effect, please check if the rule matches with the template, the raw rule is:\n ", e); } } @@ -93,8 +97,7 @@ public class TagRouter extends AbstractRouter implements Comparable<Router>, Con } routerChain.notifyRuleChanged(); } catch (Exception e) { - // TODO - logger.error(e); + logger.error("Failed to parse the raw tag router rule and it will not take effect, please check if the rule matches with the template, the raw rule is:\n ", e); } } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java index 51fe092..92d2ee7 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java @@ -26,10 +26,10 @@ import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.common.utils.NetUtils; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; -import org.apache.dubbo.rpc.RpcContext; -import org.apache.dubbo.rpc.RpcInvocation; import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcContext; import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.RpcInvocation; import org.apache.dubbo.rpc.cluster.Directory; import org.apache.dubbo.rpc.cluster.LoadBalance; import org.apache.dubbo.rpc.support.RpcUtils; @@ -254,7 +254,7 @@ public abstract class AbstractClusterInvoker<T> implements Invoker<T> { protected void checkInvokers(List<Invoker<T>> invokers, Invocation invocation) { if (invokers == null || invokers.isEmpty()) { - throw new RpcException("Failed to invoke the method " + throw new RpcException(RpcException.NO_INVOKER_AVAILABLE_AFTER_FILTER, "Failed to invoke the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + ". No provider available for the service " + directory.getUrl().getServiceKey() + " from registry " + directory.getUrl().getAddress() diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ClusterUtils.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ClusterUtils.java index 1c62d9f..5b4b589 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ClusterUtils.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ClusterUtils.java @@ -35,7 +35,6 @@ public class ClusterUtils { Map<String, String> map = new HashMap<String, String>(); Map<String, String> remoteMap = remoteUrl.getParameters(); - if (remoteMap != null && remoteMap.size() > 0) { map.putAll(remoteMap); @@ -66,6 +65,10 @@ public class ClusterUtils { } if (localMap != null && localMap.size() > 0) { + // All providers come to here have been filtered by group, which means only those providers that have the exact same group value with the consumer could come to here. + // So, generally, we don't need to care about the group value here. + // But when comes to group merger, there is an exception, the consumer group may be '*' while the provider group can be empty or any other values. + localMap.remove(Constants.GROUP_KEY); map.putAll(localMap); } if (remoteMap != null && remoteMap.size() > 0) { @@ -78,10 +81,6 @@ public class ClusterUtils { if (version != null && version.length() > 0) { map.put(Constants.VERSION_KEY, version); } - String group = remoteMap.get(Constants.GROUP_KEY); - if (group != null && group.length() > 0) { - map.put(Constants.GROUP_KEY, group); - } String methods = remoteMap.get(Constants.METHODS_KEY); if (methods != null && methods.length() > 0) { map.put(Constants.METHODS_KEY, methods); diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvoker.java index 0c68eab..82a0e8a 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvoker.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvoker.java @@ -30,6 +30,7 @@ import org.apache.dubbo.rpc.RpcException; import org.apache.dubbo.rpc.RpcInvocation; import org.apache.dubbo.rpc.RpcResult; import org.apache.dubbo.rpc.cluster.Directory; +import org.apache.dubbo.rpc.cluster.LoadBalance; import org.apache.dubbo.rpc.cluster.Merger; import org.apache.dubbo.rpc.cluster.merger.MergerFactory; @@ -47,26 +48,31 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @SuppressWarnings("unchecked") -public class MergeableClusterInvoker<T> implements Invoker<T> { +public class MergeableClusterInvoker<T> extends AbstractClusterInvoker<T> { private static final Logger log = LoggerFactory.getLogger(MergeableClusterInvoker.class); - private final Directory<T> directory; private ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("mergeable-cluster-executor", true)); public MergeableClusterInvoker(Directory<T> directory) { - this.directory = directory; + super(directory); } @Override - @SuppressWarnings("rawtypes") - public Result invoke(final Invocation invocation) throws RpcException { - List<Invoker<T>> invokers = directory.list(invocation); - + protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { + checkInvokers(invokers, invocation); String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY); if (ConfigUtils.isEmpty(merger)) { // If a method doesn't have a merger, only invoke one Group for (final Invoker<T> invoker : invokers) { if (invoker.isAvailable()) { - return invoker.invoke(invocation); + try { + return invoker.invoke(invocation); + } catch (RpcException e) { + if (e.isNoInvokerAvailableAfterFilter()) { + log.debug("No available provider for service" + directory.getUrl().getServiceKey() + " on group " + invoker.getUrl().getParameter(Constants.GROUP_KEY) + ", will continue to try another group."); + } else { + throw e; + } + } } } return invokers.iterator().next().invoke(invocation); @@ -101,8 +107,8 @@ public class MergeableClusterInvoker<T> implements Invoker<T> { try { Result r = future.get(timeout, TimeUnit.MILLISECONDS); if (r.hasException()) { - log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) + - " failed: " + r.getException().getMessage(), + log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) + + " failed: " + r.getException().getMessage(), r.getException()); } else { resultList.add(r); @@ -128,7 +134,7 @@ public class MergeableClusterInvoker<T> implements Invoker<T> { try { method = returnType.getMethod(merger, returnType); } catch (NoSuchMethodException e) { - throw new RpcException("Can not merge result because missing method [ " + merger + " ] in class [ " + + throw new RpcException("Can not merge result because missing method [ " + merger + " ] in class [ " + returnType.getClass().getName() + " ]"); } if (!Modifier.isPublic(method.getModifiers())) { @@ -170,6 +176,7 @@ public class MergeableClusterInvoker<T> implements Invoker<T> { return new RpcResult(result); } + @Override public Class<T> getInterface() { return directory.getInterface(); diff --git a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/nop/NopDynamicConfiguration.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/RegistryAwareCluster.java similarity index 55% copy from dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/nop/NopDynamicConfiguration.java copy to dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/RegistryAwareCluster.java index 75e8bf9..81b21a3 100644 --- a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/nop/NopDynamicConfiguration.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/RegistryAwareCluster.java @@ -14,33 +14,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dubbo.config.dynamic.support.nop; +package org.apache.dubbo.rpc.cluster.support; -import org.apache.dubbo.config.dynamic.AbstractDynamicConfiguration; -import org.apache.dubbo.config.dynamic.ConfigurationListener; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.cluster.Cluster; +import org.apache.dubbo.rpc.cluster.Directory; /** * */ -public class NopDynamicConfiguration extends AbstractDynamicConfiguration { +public class RegistryAwareCluster implements Cluster { - @Override - public void init() { - - } + public final static String NAME = "registryaware"; @Override - protected String getInternalProperty(String key, String group, long timeout, ConfigurationListener listener) { - return null; + public <T> Invoker<T> join(Directory<T> directory) throws RpcException { + return new RegistryAwareClusterInvoker<T>(directory); } - @Override - protected void addTargetListener(String key, Object o) { - - } - - @Override - protected Object createTargetConfigListener(String key, ConfigurationListener listener) { - return null; - } -} +} \ No newline at end of file diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/RegistryAwareClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/RegistryAwareClusterInvoker.java new file mode 100644 index 0000000..dfd8156 --- /dev/null +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/RegistryAwareClusterInvoker.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.cluster.support; + +import org.apache.dubbo.common.Constants; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.cluster.Directory; +import org.apache.dubbo.rpc.cluster.LoadBalance; + +import java.util.List; + +/** + * + */ +public class RegistryAwareClusterInvoker<T> extends AbstractClusterInvoker<T> { + + private static final Logger logger = LoggerFactory.getLogger(RegistryAwareClusterInvoker.class); + + public RegistryAwareClusterInvoker(Directory<T> directory) { + super(directory); + } + + @Override + @SuppressWarnings({"unchecked", "rawtypes"}) + public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { + // First, pick the invoker (XXXClusterInvoker) that comes from the local registry, distinguish by a 'default' key. + for (Invoker<T> invoker : invokers) { + if (invoker.getUrl().getParameter(Constants.REGISTRY_KEY + "." + Constants.DEFAULT_KEY, false)) { + return invoker.invoke(invocation); + } + } + // If none of the invokers has a local signal, pick the first one available. + for (Invoker<T> invoker : invokers) { + if (invoker.isAvailable()) { + return invoker.invoke(invocation); + } + } + throw new RpcException("No provider available in " + invokers); + } +} diff --git a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.Cluster b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.Cluster index ef212d7..ba30247 100644 --- a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.Cluster +++ b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.Cluster @@ -6,4 +6,5 @@ failback=org.apache.dubbo.rpc.cluster.support.FailbackCluster forking=org.apache.dubbo.rpc.cluster.support.ForkingCluster available=org.apache.dubbo.rpc.cluster.support.AvailableCluster mergeable=org.apache.dubbo.rpc.cluster.support.MergeableCluster -broadcast=org.apache.dubbo.rpc.cluster.support.BroadcastCluster \ No newline at end of file +broadcast=org.apache.dubbo.rpc.cluster.support.BroadcastCluster +registryaware=org.apache.dubbo.rpc.cluster.support.RegistryAwareCluster \ No newline at end of file diff --git a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory index 3c36311..4a27434 100644 --- a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory +++ b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory @@ -2,4 +2,5 @@ file=org.apache.dubbo.rpc.cluster.router.file.FileRouterFactory script=org.apache.dubbo.rpc.cluster.router.script.ScriptRouterFactory condition=org.apache.dubbo.rpc.cluster.router.condition.ConditionRouterFactory configcondition=org.apache.dubbo.rpc.cluster.router.condition.config.ConfigConditionRouterFactory -tag=org.apache.dubbo.rpc.cluster.router.tag.TagRouterFactory \ No newline at end of file +tag=org.apache.dubbo.rpc.cluster.router.tag.TagRouterFactory +mock=org.apache.dubbo.rpc.cluster.router.mock.MockRouterFactory \ No newline at end of file diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java index 49b0bd1..8b99c9d 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java @@ -61,6 +61,7 @@ public class Constants { public static final String CONFIG_ENV_KEY = "config.env"; public static final String CONFIG_CLUSTER_KEY = "config.cluster"; public static final String CONFIG_NAMESPACE_KEY = "config.namespace"; + public static final String CONFIG_CHECK_KEY = "config.check"; public static final String DEFAULT_CATEGORY = PROVIDERS_CATEGORY; diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/extension/ExtensionLoader.java b/dubbo-common/src/main/java/org/apache/dubbo/common/extension/ExtensionLoader.java index 10e9fbb..e402a21 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/extension/ExtensionLoader.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/extension/ExtensionLoader.java @@ -24,6 +24,7 @@ import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.utils.ConcurrentHashSet; import org.apache.dubbo.common.utils.ConfigUtils; import org.apache.dubbo.common.utils.Holder; +import org.apache.dubbo.common.utils.ReflectUtils; import org.apache.dubbo.common.utils.StringUtils; import java.io.BufferedReader; @@ -528,6 +529,9 @@ public class ExtensionLoader<T> { && method.getParameterTypes().length == 1 && Modifier.isPublic(method.getModifiers())) { Class<?> pt = method.getParameterTypes()[0]; + if (ReflectUtils.isPrimitives(pt)) { + continue; + } try { String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : ""; Object object = objectFactory.getExtension(pt, property); diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java index 43e3ca0..b6cea16 100644 --- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java +++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java @@ -36,8 +36,8 @@ import org.apache.dubbo.rpc.ProxyFactory; import org.apache.dubbo.rpc.StaticContext; import org.apache.dubbo.rpc.cluster.Cluster; import org.apache.dubbo.rpc.cluster.directory.StaticDirectory; -import org.apache.dubbo.rpc.cluster.support.AvailableCluster; import org.apache.dubbo.rpc.cluster.support.ClusterUtils; +import org.apache.dubbo.rpc.cluster.support.RegistryAwareCluster; import org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol; import org.apache.dubbo.rpc.service.GenericService; import org.apache.dubbo.rpc.support.ProtocolUtils; @@ -404,10 +404,11 @@ public class ReferenceConfig<T> extends AbstractReferenceConfig { } } if (registryURL != null) { // registry url is available - // use AvailableCluster only when register's cluster is available - URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); + // use RegistryAwareCluster only when register's cluster is available + URL u = registryURL.addParameter(Constants.CLUSTER_KEY, RegistryAwareCluster.NAME); + // The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker invoker = cluster.join(new StaticDirectory(u, invokers)); - } else { // not a registry url + } else { // not a registry url, must be direct invoke. invoker = cluster.join(new StaticDirectory(invokers)); } } diff --git a/dubbo-config/dubbo-config-dynamic/pom.xml b/dubbo-config/dubbo-config-dynamic/pom.xml index 3243be6..b08bc84 100644 --- a/dubbo-config/dubbo-config-dynamic/pom.xml +++ b/dubbo-config/dubbo-config-dynamic/pom.xml @@ -43,7 +43,7 @@ <dependency> <groupId>com.ctrip.framework.apollo</groupId> <artifactId>apollo-client</artifactId> - <version>1.0.0</version> + <version>1.1.1</version> </dependency> <dependency> <groupId>com.netflix.archaius</groupId> diff --git a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/AbstractDynamicConfiguration.java b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/AbstractDynamicConfiguration.java index 5b8a189..a75db46 100644 --- a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/AbstractDynamicConfiguration.java +++ b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/AbstractDynamicConfiguration.java @@ -57,7 +57,7 @@ public abstract class AbstractDynamicConfiguration<TargetConfigListener> impleme if (listener != null) { this.addListener(key, listener); } - return getInternalProperty(key, group, timeout, listener); + return getInternalProperty(key, group, timeout); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } @@ -71,7 +71,7 @@ public abstract class AbstractDynamicConfiguration<TargetConfigListener> impleme this.url = url; } - protected abstract String getInternalProperty(String key, String group, long timeout, ConfigurationListener listener); + protected abstract String getInternalProperty(String key, String group, long timeout); protected abstract void addTargetListener(String key, TargetConfigListener listener); diff --git a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/apollo/ApolloDynamicConfiguration.java b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/apollo/ApolloDynamicConfiguration.java index 489bf47..a70b113 100644 --- a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/apollo/ApolloDynamicConfiguration.java +++ b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/apollo/ApolloDynamicConfiguration.java @@ -19,11 +19,14 @@ package org.apache.dubbo.config.dynamic.support.apollo; import com.ctrip.framework.apollo.Config; import com.ctrip.framework.apollo.ConfigChangeListener; import com.ctrip.framework.apollo.ConfigService; +import com.ctrip.framework.apollo.enums.ConfigSourceType; import com.ctrip.framework.apollo.enums.PropertyChangeType; import com.ctrip.framework.apollo.model.ConfigChange; import com.ctrip.framework.apollo.model.ConfigChangeEvent; import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.config.dynamic.AbstractDynamicConfiguration; import org.apache.dubbo.config.dynamic.ConfigChangeType; @@ -37,15 +40,13 @@ import java.util.Set; * */ public class ApolloDynamicConfiguration extends AbstractDynamicConfiguration<ConfigChangeListener> { + private static final Logger logger = LoggerFactory.getLogger(ApolloDynamicConfiguration.class); private static final String APOLLO_ENV_KEY = "env"; private static final String APOLLO_ADDR_KEY = "apollo.meta"; private static final String APOLLO_CLUSTER_KEY = "apollo.cluster"; private static final String APPLO_DEFAULT_NAMESPACE = "dubbo"; - /** - * support two namespaces: application -> dubbo - */ + private Config dubboConfig; - private Config appConfig; public ApolloDynamicConfiguration() { @@ -72,33 +73,28 @@ public class ApolloDynamicConfiguration extends AbstractDynamicConfiguration<Con } dubboConfig = ConfigService.getConfig(url.getParameter(Constants.CONFIG_NAMESPACE_KEY, APPLO_DEFAULT_NAMESPACE)); - appConfig = ConfigService.getAppConfig(); - } - - @Override - public void addListener(String key, ConfigurationListener listener) { - Set<String> keys = new HashSet<>(1); - keys.add(key); - this.appConfig.addChangeListener(new ApolloListener(listener), keys); - this.dubboConfig.addChangeListener(new ApolloListener(listener), keys); + // Decide to fail or to continue when failed to connect to remote server. + boolean check = url.getParameter(Constants.CONFIG_CHECK_KEY, false); + if (dubboConfig.getSourceType() != ConfigSourceType.REMOTE) { + if (check) { + throw new IllegalStateException("Failed to connect to ConfigCenter, the ConfigCenter is Apollo, the address is: " + (StringUtils.isNotEmpty(configAddr) ? configAddr : configEnv)); + } else { + logger.warn("Failed to connect to ConfigCenter, the ConfigCenter is Apollo, " + + "the address is: " + (StringUtils.isNotEmpty(configAddr) ? configAddr : configEnv) + + ". will use the local cache value instead before finally connected."); + } + } } @Override - protected String getInternalProperty(String key, String group, long timeout, ConfigurationListener listener) { - // FIXME According to Apollo, if it fails to get a value from one namespace, it will keep logging warning msg. They are working to improve it. - String value = appConfig.getProperty(key, null); - if (value == null) { - value = dubboConfig.getProperty(key, null); - } - - return value; + protected String getInternalProperty(String key, String group, long timeout) { + return dubboConfig.getProperty(key, null); } @Override protected void addTargetListener(String key, ConfigChangeListener listener) { Set<String> keys = new HashSet<>(1); keys.add(key); - this.appConfig.addChangeListener(listener, keys); this.dubboConfig.addChangeListener(listener, keys); } @@ -107,8 +103,8 @@ public class ApolloDynamicConfiguration extends AbstractDynamicConfiguration<Con return new ApolloListener(listener); } - public ConfigChangeType getChangeType(PropertyChangeType changeType) { - if (changeType.equals(PropertyChangeType.DELETED)) { + public ConfigChangeType getChangeType(ConfigChange change) { + if (change.getChangeType() == PropertyChangeType.DELETED || StringUtils.isEmpty(change.getNewValue())) { return ConfigChangeType.DELETED; } return ConfigChangeType.MODIFIED; @@ -127,16 +123,15 @@ public class ApolloDynamicConfiguration extends AbstractDynamicConfiguration<Con this.listener = listener; } - // FIXME will Apollo consider an empty value "" as deleted? @Override public void onChange(ConfigChangeEvent changeEvent) { for (String key : changeEvent.changedKeys()) { ConfigChange change = changeEvent.getChange(key); // TODO Maybe we no longer need to identify the type of change. Because there's no scenario that a callback will subscribe for both configurators and routers if (change.getPropertyName().endsWith(Constants.CONFIGURATORS_SUFFIX)) { - listener.process(new org.apache.dubbo.config.dynamic.ConfigChangeEvent(key, change.getNewValue(), ConfigType.CONFIGURATORS, getChangeType(change.getChangeType()))); + listener.process(new org.apache.dubbo.config.dynamic.ConfigChangeEvent(key, change.getNewValue(), ConfigType.CONFIGURATORS, getChangeType(change))); } else { - listener.process(new org.apache.dubbo.config.dynamic.ConfigChangeEvent(key, change.getNewValue(), ConfigType.ROUTERS, getChangeType(change.getChangeType()))); + listener.process(new org.apache.dubbo.config.dynamic.ConfigChangeEvent(key, change.getNewValue(), ConfigType.ROUTERS, getChangeType(change))); } } } diff --git a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/ArchaiusDynamicConfiguration.java b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/ArchaiusDynamicConfiguration.java index feb177a..6639584 100644 --- a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/ArchaiusDynamicConfiguration.java +++ b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/ArchaiusDynamicConfiguration.java @@ -48,6 +48,7 @@ public class ArchaiusDynamicConfiguration extends AbstractDynamicConfiguration<R System.setProperty(ZooKeeperConfigurationSource.ARCHAIUS_SOURCE_ADDRESS_KEY, address); } System.setProperty(ZooKeeperConfigurationSource.ARCHAIUS_CONFIG_ROOT_PATH_KEY, url.getParameter(Constants.CONFIG_NAMESPACE_KEY, ZooKeeperConfigurationSource.DEFAULT_CONFIG_ROOT_PATH)); + System.setProperty(ZooKeeperConfigurationSource.ARCHAIUS_CONFIG_CHECK_KEY, url.getParameter(Constants.CONFIG_CHECK_KEY, "false")); try { ZooKeeperConfigurationSource zkConfigSource = new ZooKeeperConfigurationSource(); @@ -63,7 +64,7 @@ public class ArchaiusDynamicConfiguration extends AbstractDynamicConfiguration<R } @Override - protected String getInternalProperty(String key, String group, long timeout, ConfigurationListener listener) { + protected String getInternalProperty(String key, String group, long timeout) { return DynamicPropertyFactory.getInstance() .getStringProperty(key, null) .get(); diff --git a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/sources/ZooKeeperConfigurationSource.java b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/sources/ZooKeeperConfigurationSource.java index 41d6861..fdfcba7 100644 --- a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/sources/ZooKeeperConfigurationSource.java +++ b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/archaius/sources/ZooKeeperConfigurationSource.java @@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit; public class ZooKeeperConfigurationSource implements WatchedConfigurationSource, Closeable { public static final String ARCHAIUS_SOURCE_ADDRESS_KEY = "archaius.zk.address"; public static final String ARCHAIUS_CONFIG_ROOT_PATH_KEY = "archaius.zk.rootpath"; + public static final String ARCHAIUS_CONFIG_CHECK_KEY = "archaius.zk.check"; public static final String DEFAULT_CONFIG_ROOT_PATH = "/dubbo/config"; private static final Logger logger = LoggerFactory.getLogger(com.netflix.config.source.ZooKeeperConfigurationSource.class); @@ -63,7 +64,7 @@ public class ZooKeeperConfigurationSource implements WatchedConfigurationSource, private List<WatchedUpdateListener> listeners = new CopyOnWriteArrayList<WatchedUpdateListener>(); public ZooKeeperConfigurationSource() { - this(System.getProperty(ARCHAIUS_SOURCE_ADDRESS_KEY), 60 * 1000, 60 * 1000, System.getProperty(ARCHAIUS_CONFIG_ROOT_PATH_KEY, DEFAULT_CONFIG_ROOT_PATH)); + this(System.getProperty(ARCHAIUS_SOURCE_ADDRESS_KEY), 60 * 1000, 10000, System.getProperty(ARCHAIUS_CONFIG_ROOT_PATH_KEY, DEFAULT_CONFIG_ROOT_PATH)); } public ZooKeeperConfigurationSource(int sessionTimeout, int connectTimeout, String configRootPath) { @@ -80,12 +81,17 @@ public class ZooKeeperConfigurationSource implements WatchedConfigurationSource, new ExponentialBackoffRetry(1000, 3)); client.start(); try { - connected = client.blockUntilConnected(connectTimeout * 4, TimeUnit.MILLISECONDS); + connected = client.blockUntilConnected(connectTimeout, TimeUnit.MILLISECONDS); if (!connected) { - logger.warn("Cannot connect to ConfigCenter at zookeeper " + connectString + " in " + connectTimeout * 4 + "ms"); + boolean check = Boolean.parseBoolean(System.getProperty(ARCHAIUS_CONFIG_CHECK_KEY, "false")); + if (check) { + throw new IllegalStateException("Failed to connect to ConfigCenter Zookeeper : " + connectString + " in " + connectTimeout + "ms."); + } else { + logger.warn("Cannot connect to ConfigCenter at zookeeper " + connectString + " in " + connectTimeout + "ms"); + } } } catch (InterruptedException e) { - logger.error("The thread was interrupted unexpectedly when try connecting to zookeeper " + connectString + " as ConfigCenter, ", e); + throw new IllegalStateException("The thread was interrupted unexpectedly when try connecting to zookeeper " + connectString + " as ConfigCenter, ", e); } this.client = client; this.configRootPath = configRootPath; @@ -181,7 +187,7 @@ public class ZooKeeperConfigurationSource implements WatchedConfigurationSource, Map<String, Object> all = new HashMap<>(); if (!connected) { - logger.warn("ConfigServer is not connected yet, zookeeper don't support local snapshot yet, so there's no old data to use!"); + logger.warn("ConfigCenter is not connected yet, zookeeper don't support local snapshot yet, so there's no old data to use!"); return all; } diff --git a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/nop/NopDynamicConfiguration.java b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/nop/NopDynamicConfiguration.java index 75e8bf9..83f475e 100644 --- a/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/nop/NopDynamicConfiguration.java +++ b/dubbo-config/dubbo-config-dynamic/src/main/java/org/apache/dubbo/config/dynamic/support/nop/NopDynamicConfiguration.java @@ -30,7 +30,7 @@ public class NopDynamicConfiguration extends AbstractDynamicConfiguration { } @Override - protected String getInternalProperty(String key, String group, long timeout, ConfigurationListener listener) { + protected String getInternalProperty(String key, String group, long timeout) { return null; } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java index c3403df..0270e96 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java @@ -53,7 +53,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -116,13 +115,22 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify this.serviceType = serviceType; this.serviceKey = url.getServiceKey(); this.queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); - this.overrideDirectoryUrl = this.directoryUrl = url.setPath(url.getServiceInterface()).clearParameters().addParameters(queryMap).removeParameter(Constants.MONITOR_KEY); + this.overrideDirectoryUrl = this.directoryUrl = turnRegistryUrlToConsumerUrl(url); String group = directoryUrl.getParameter(Constants.GROUP_KEY, ""); this.multiGroup = group != null && ("*".equals(group) || group.contains(",")); String methods = queryMap.get(Constants.METHODS_KEY); this.serviceMethods = methods == null ? null : Constants.COMMA_SPLIT_PATTERN.split(methods); } + private URL turnRegistryUrlToConsumerUrl(URL url) { + // save any parameter in registry that will be useful to the new url. + String isDefault = url.getParameter(Constants.DEFAULT_KEY); + if (StringUtils.isNotEmpty(isDefault)) { + queryMap.put(Constants.REGISTRY_KEY + "." + Constants.DEFAULT_KEY, isDefault); + } + return url.setPath(url.getServiceInterface()).clearParameters().addParameters(queryMap).removeParameter(Constants.MONITOR_KEY); + } + /** * Convert override urls to map for use when re-refer. * Send all rules every time, the urls will be reassembled and calculated @@ -176,14 +184,24 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify public void subscribe(URL url) { setConsumerUrl(url); - String rawConfig = dynamicConfiguration.getConfig(url.getServiceKey() + Constants.CONFIGURATORS_SUFFIX, "dubbo", this); - String rawConfigApp = dynamicConfiguration.getConfig(url.getParameter(Constants.APPLICATION_KEY) + Constants.CONFIGURATORS_SUFFIX, "dubbo", this); - - if (StringUtils.isNotEmpty(rawConfig)) { - this.dynamicConfigurators = configToConfiguratiors(rawConfig); + String rawConfig = null; + try { + rawConfig = dynamicConfiguration.getConfig(url.getServiceKey() + Constants.CONFIGURATORS_SUFFIX, "dubbo", this); + if (StringUtils.isNotEmpty(rawConfig)) { + this.dynamicConfigurators = configToConfiguratiors(rawConfig); + } + } catch (Exception e) { + logger.error("Failed to load or parse dynamic config (service level), the raw config is: " + rawConfig, e); } - if (StringUtils.isNotEmpty(rawConfigApp)) { - this.appDynamicConfigurators = configToConfiguratiors(rawConfigApp); + + String rawConfigApp = null; + try { + rawConfigApp = dynamicConfiguration.getConfig(url.getParameter(Constants.APPLICATION_KEY) + Constants.CONFIGURATORS_SUFFIX, "dubbo", this); + if (StringUtils.isNotEmpty(rawConfigApp)) { + this.appDynamicConfigurators = configToConfiguratiors(rawConfigApp); + } + } catch (Exception e) { + logger.error("Failed to load or parse dynamic config (application level), the raw config is: " + rawConfigApp, e); } registry.subscribe(url, this); @@ -306,10 +324,12 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString())); return; } + // pre-route and build cache, notice that route cache should build on original Invoker list. + // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers should be routed. + routerChain.notifyFullInvokers(newMethodInvokerMap, getConsumerUrl()); this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap; this.urlInvokerMap = newUrlInvokerMap; - // Route and build cache - routerChain.notifyFullInvokers(methodInvokerMap, getConsumerUrl()); + try { destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker } catch (Exception e) { @@ -338,7 +358,9 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify } else if (groupMap.size() > 1) { List<Invoker<T>> groupInvokers = new ArrayList<Invoker<T>>(); for (List<Invoker<T>> groupList : groupMap.values()) { - groupInvokers.add(cluster.join(new StaticDirectory<T>(groupList))); + StaticDirectory<T> staticDirectory = new StaticDirectory<>(groupList); + staticDirectory.setRouterChain(routerChain); + groupInvokers.add(cluster.join(staticDirectory)); } result.put(method, groupInvokers); } else { @@ -624,18 +646,27 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist)."); } + + if (multiGroup) { + Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference + String methodName = RpcUtils.getMethodName(invocation); + return localMethodInvokerMap.get(methodName); + } + List<Invoker<T>> invokers = null; - Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference + try { + // Get invokers from cache, only runtime routers will be executed. + invokers = routerChain.route(getConsumerUrl(), invocation); + } catch (Throwable t) { + logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t); + } + + + // FIXME Is there any need of failing back to Constants.ANY_VALUE or the first available method invokers when invokers is null? + /*Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) { String methodName = RpcUtils.getMethodName(invocation); - Object[] args = RpcUtils.getArguments(invocation); - if (args != null && args.length > 0 && args[0] != null - && (args[0] instanceof String || args[0].getClass().isEnum())) { - invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter - } - if (invokers == null) { - invokers = localMethodInvokerMap.get(methodName); - } + invokers = localMethodInvokerMap.get(methodName); if (invokers == null) { invokers = localMethodInvokerMap.get(Constants.ANY_VALUE); } @@ -645,8 +676,8 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify invokers = iterator.next(); } } - } - return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers; + }*/ + return invokers == null ? new ArrayList<>(0) : invokers; } @Override @@ -656,7 +687,7 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify @Override public void process(ConfigChangeEvent event) { - List<URL> urls; + List<URL> urls = new ArrayList<>(); if (event.getChangeType().equals(ConfigChangeType.DELETED)) { URL url = getConsumerUrl().clearParameters().setProtocol(Constants.EMPTY_PROTOCOL); if (event.getKey().endsWith(this.queryMap.get(APPLICATION_KEY) + CONFIGURATORS_SUFFIX)) { @@ -664,10 +695,13 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify } else { url = url.addParameter(Constants.CATEGORY_KEY, Constants.DYNAMIC_CONFIGURATORS_CATEGORY); } - urls = new ArrayList<>(); urls.add(url); } else { - urls = ConfigParser.parseConfigurators(event.getNewValue()); + try { + urls = ConfigParser.parseConfigurators(event.getNewValue()); + } catch (Exception e) { + logger.error("Failed to parse raw dynamic config and it will not take effect, the raw config is: " + event.getNewValue(), e); + } } notify(urls); } diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcException.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcException.java index 2596912..deb37ca 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcException.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcException.java @@ -32,6 +32,7 @@ public /**final**/ class RpcException extends RuntimeException { public static final int BIZ_EXCEPTION = 3; public static final int FORBIDDEN_EXCEPTION = 4; public static final int SERIALIZATION_EXCEPTION = 5; + public static final int NO_INVOKER_AVAILABLE_AFTER_FILTER = 6; private static final long serialVersionUID = 7815426752583648734L; private int code; // RpcException cannot be extended, use error code for exception type to keep compatibility @@ -98,4 +99,8 @@ public /**final**/ class RpcException extends RuntimeException { public boolean isSerialization() { return code == SERIALIZATION_EXCEPTION; } + + public boolean isNoInvokerAvailableAfterFilter() { + return code == NO_INVOKER_AVAILABLE_AFTER_FILTER; + } } \ No newline at end of file
