This is an automated email from the ASF dual-hosted git repository. liujun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git
The following commit(s) were added to refs/heads/master by this push: new c84511c Merge pull request #3044, code review around RouterChain. c84511c is described below commit c84511c6a7a6665dcf03e1dd06cdf077c13c008a Author: Ian Luo <ian....@gmail.com> AuthorDate: Wed Dec 26 10:29:39 2018 +0800 Merge pull request #3044, code review around RouterChain. --- .../java/org/apache/dubbo/rpc/cluster/Router.java | 20 ++-- .../org/apache/dubbo/rpc/cluster/RouterChain.java | 110 ++++++++------------- .../rpc/cluster/directory/AbstractDirectory.java | 2 +- .../rpc/cluster/directory/StaticDirectory.java | 2 +- .../dubbo/rpc/cluster/router/AbstractRouter.java | 11 --- .../config/AbstractConfigConditionRouter.java | 3 - .../dubbo/rpc/cluster/router/tag/TagRouter.java | 66 +++++-------- .../cluster/router/file/FileRouterEngineTest.java | 3 +- .../java/com/alibaba/dubbo/rpc/cluster/Router.java | 5 - .../registry/integration/RegistryDirectory.java | 7 +- 10 files changed, 80 insertions(+), 149 deletions(-) diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Router.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Router.java index e775714..46a2c49 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Router.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Router.java @@ -22,7 +22,6 @@ import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.RpcException; import java.util.List; -import java.util.Map; /** * Router. (SPI, Prototype, ThreadSafe) @@ -43,24 +42,25 @@ public interface Router extends Comparable<Router> { /** * Filter invokers with current routing rule and only return the invokers that comply with the rule. * - * @param invokers + * @param invokers invoker list * @param url refer url - * @param invocation + * @param invocation invocation * @return routed invokers * @throws RpcException */ <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException; - default <T> Map<String, List<Invoker<T>>> preRoute(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException { - return null; - } /** - * Each router has a reference of the router chain. + * Notify the router the invoker list. Invoker list may change from time to time. This method gives the router a + * chance to prepare before {@link Router#route(List, URL, Invocation)} gets called. * - * @param routerChain + * @param invokers invoker list + * @param <T> invoker's type */ - void addRouterChain(RouterChain routerChain); + default <T> void notify(List<Invoker<T>> invokers) { + + } /** * To decide whether this router need to execute every time an RPC comes or should only execute when addresses or rule change. @@ -83,4 +83,4 @@ public interface Router extends Comparable<Router> { * @return */ int getPriority(); -} \ No newline at end of file +} diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/RouterChain.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/RouterChain.java index 33374af..b370744 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/RouterChain.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/RouterChain.java @@ -18,7 +18,6 @@ package org.apache.dubbo.rpc.cluster; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.extension.ExtensionLoader; -import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; @@ -33,92 +32,78 @@ import java.util.stream.Collectors; public class RouterChain<T> { // full list of addresses from registry, classified by method name. - private List<Invoker<T>> fullInvokers; + private List<Invoker<T>> invokers = Collections.emptyList(); private URL url; // containing all routers, reconstruct every time 'route://' urls change. - private List<Router> routers = new CopyOnWriteArrayList<>(); - // Fixed router instances: ConfigConditionRouter, TagRouter, e.g., the rule for each instance may change but the instance will never delete or recreate. - private List<Router> residentRouters; + private volatile List<Router> routers = Collections.emptyList(); - public static <T> RouterChain<T> buildChain(URL url) { - RouterChain<T> routerChain = new RouterChain<>(url); - List<RouterFactory> extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class).getActivateExtension(url, (String[]) null); - List<Router> routers = extensionFactories.stream() - .map(factory -> { - Router router = factory.getRouter(url); - router.addRouterChain(routerChain); - return router; - }).collect(Collectors.toList()); - routerChain.setResidentRouters(routers); - return routerChain; - } + // Fixed router instances: ConfigConditionRouter, TagRouter, e.g., the rule for each instance may change but the + // instance will never delete or recreate. + private List<Router> builtinRouters = Collections.emptyList(); - protected RouterChain(List<Router> routers) { - this.routers.addAll(routers); + public static <T> RouterChain<T> buildChain(URL url) { + return new RouterChain<>(url); } - protected RouterChain(URL url) { + private RouterChain(URL url) { this.url = url; + + List<RouterFactory> extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class) + .getActivateExtension(url, (String[]) null); + + List<Router> routers = extensionFactories.stream() + .map(factory -> factory.getRouter(url)) + .collect(Collectors.toList()); + + initWithRouters(routers); } /** * the resident routers must being initialized before address notification. - * - * @param residentRouters + * FIXME: this method should not be public */ - public void setResidentRouters(List<Router> residentRouters) { - this.residentRouters = residentRouters; - this.routers.addAll(residentRouters); + public void initWithRouters(List<Router> builtinRouters) { + this.builtinRouters = builtinRouters; + this.routers = new CopyOnWriteArrayList<>(builtinRouters); this.sort(); } + public void addRouter(Router router) { + this.routers.add(router); + } + /** * If we use route:// protocol in version before 2.7.0, each URL will generate a Router instance, * so we should keep the routers up to date, that is, each time router URLs changes, we should update the routers list, - * only keep the residentRouters which are available all the time and the latest notified routers which are generated from URLs. + * only keep the builtinRouters which are available all the time and the latest notified routers which are generated from URLs. * - * @param generatedRouters routers from 'router://' rules in 2.6.x or before. + * @param routers routers from 'router://' rules in 2.6.x or before. */ - public void setGeneratedRouters(List<Router> generatedRouters) { + public void addRouters(List<Router> routers) { + // FIXME will sort cause concurrent problem? since it's kind of a write operation. List<Router> newRouters = new CopyOnWriteArrayList<>(); - newRouters.addAll(residentRouters); - newRouters.addAll(generatedRouters); + newRouters.addAll(builtinRouters); + newRouters.addAll(routers); this.routers = newRouters; - // FIXME will sort cause concurrent problem? since it's kind of a write operation. this.sort(); - /* if (fullInvokers != null) { - this.preRoute(fullInvokers, url, null); + /* if (invokers != null) { + this.rebuild(invokers, url, null); }*/ } - public void sort() { + private void sort() { Collections.sort(routers); } /** - * TODO - * - * Building of router cache can be triggered from within different threads, for example, registry notification and governance notification. - * So this operation should be synchronized. - * @param invokers - * @param url - * @param invocation - */ - public void preRoute(List<Invoker<T>> invokers, URL url, Invocation invocation) { - for (Router router : routers) { - router.preRoute(invokers, url, invocation); - } - } - - /** * * @param url * @param invocation * @return */ public List<Invoker<T>> route(URL url, Invocation invocation) { - List<Invoker<T>> finalInvokers = fullInvokers; + List<Invoker<T>> finalInvokers = invokers; for (Router router : routers) { // if (router.isRuntime()) { // finalInvokers = router.route(finalInvokers, url, invocation); @@ -129,28 +114,13 @@ public class RouterChain<T> { } /** - * When any of the router's rule changed, notify the router chain to rebuild cache from scratch. - */ - public void notifyRuleChanged() { - if (CollectionUtils.isEmpty(this.fullInvokers)) { - return; - } - preRoute(this.fullInvokers, url, null); - } - - /** * Notify router chain of the initial addresses from registry at the first time. * Notify whenever addresses in registry change. - * - * @param invokers - * @param url */ - public void notifyFullInvokers(List<Invoker<T>> invokers, URL url) { - setFullMethodInvokers(invokers); - preRoute(invokers, url, null); - } - - public void setFullMethodInvokers(List<Invoker<T>> fullInvokers) { - this.fullInvokers = (fullInvokers == null ? Collections.emptyList() : fullInvokers); + public void setInvokers(List<Invoker<T>> invokers) { + if (invokers != null) { + this.invokers = invokers; + routers.forEach(router -> router.notify(invokers)); + } } } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java index 6f3a316..0024227 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java @@ -98,7 +98,7 @@ public abstract class AbstractDirectory<T> implements Directory<T> { protected void addRouters(List<Router> routers) { // copy list routers = routers == null ? new ArrayList<>() : new ArrayList<>(routers); - routerChain.setGeneratedRouters(routers); + routerChain.addRouters(routers); } public URL getConsumerUrl() { diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/StaticDirectory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/StaticDirectory.java index e35a404..e868bd7 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/StaticDirectory.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/StaticDirectory.java @@ -87,7 +87,7 @@ public class StaticDirectory<T> extends AbstractDirectory<T> { public void buildRouterChain() { RouterChain<T> routerChain = RouterChain.buildChain(getUrl()); - routerChain.notifyFullInvokers(invokers, getUrl()); + routerChain.setInvokers(invokers); this.setRouterChain(routerChain); } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/AbstractRouter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/AbstractRouter.java index 6192f7a..d19e90f 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/AbstractRouter.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/AbstractRouter.java @@ -19,10 +19,6 @@ package org.apache.dubbo.rpc.cluster.router; import org.apache.dubbo.common.URL; import org.apache.dubbo.configcenter.DynamicConfiguration; import org.apache.dubbo.rpc.cluster.Router; -import org.apache.dubbo.rpc.cluster.RouterChain; - -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; /** * TODO Extract more code to here if necessary @@ -31,7 +27,6 @@ public abstract class AbstractRouter implements Router { protected int priority; protected boolean force = false; protected boolean enabled = true; - protected List<RouterChain> routerChains = new CopyOnWriteArrayList<>(); protected URL url; protected DynamicConfiguration configuration; @@ -53,12 +48,6 @@ public abstract class AbstractRouter implements Router { this.url = url; } - @Override - public void addRouterChain(RouterChain routerChain) { - this.routerChains.add(routerChain); - } - - public void setConfiguration(DynamicConfiguration configuration) { this.configuration = configuration; } diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/AbstractConfigConditionRouter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/AbstractConfigConditionRouter.java index 5f27a29..c5edaba 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/AbstractConfigConditionRouter.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/AbstractConfigConditionRouter.java @@ -28,7 +28,6 @@ import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.RpcException; import org.apache.dubbo.rpc.cluster.Router; -import org.apache.dubbo.rpc.cluster.RouterChain; import org.apache.dubbo.rpc.cluster.router.AbstractRouter; import org.apache.dubbo.rpc.cluster.router.condition.ConditionRouter; import org.apache.dubbo.rpc.cluster.router.condition.config.model.BlackWhiteListRule; @@ -73,8 +72,6 @@ public abstract class AbstractConfigConditionRouter extends AbstractRouter imple .getValue(), e); } } - - routerChains.forEach(RouterChain::notifyRuleChanged); } @Override diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouter.java index 8843b33..c0dee51 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouter.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouter.java @@ -30,13 +30,11 @@ import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.RpcException; import org.apache.dubbo.rpc.cluster.Router; -import org.apache.dubbo.rpc.cluster.RouterChain; import org.apache.dubbo.rpc.cluster.router.AbstractRouter; import org.apache.dubbo.rpc.cluster.router.tag.model.TagRouterRule; import org.apache.dubbo.rpc.cluster.router.tag.model.TagRuleParser; import java.util.List; -import java.util.Map; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -51,15 +49,10 @@ public class TagRouter extends AbstractRouter implements Comparable<Router>, Con private TagRouterRule tagRouterRule; private String application; - private boolean inited = false; - public TagRouter(DynamicConfiguration configuration, URL url) { super(configuration, url); } - protected TagRouter() { - } - @Override public synchronized void process(ConfigChangeEvent event) { if (logger.isDebugEnabled()) { @@ -73,9 +66,6 @@ public class TagRouter extends AbstractRouter implements Comparable<Router>, Con } else { this.tagRouterRule = TagRuleParser.parse(event.getValue()); } - - routerChains.forEach(RouterChain::notifyRuleChanged); - } catch (Exception e) { logger.error("Failed to parse the raw tag router rule and it will not take effect, please check if the rule matches with the template, the raw rule is:\n ", e); } @@ -157,25 +147,6 @@ public class TagRouter extends AbstractRouter implements Comparable<Router>, Con } } - /** - * This method is reserved for building router cache. - * Currently, we rely on this method to do the init task since it will get triggered before route() really happens. - * - * @param invokers - * @param url - * @param invocation - * @param <T> - * @return - * @throws RpcException - */ - @Override - public <T> Map<String, List<Invoker<T>>> preRoute(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException { - if (CollectionUtils.isNotEmpty(invokers)) { - checkAndInit(invokers.get(0).getUrl()); - } - return super.preRoute(invokers, url, invocation); - } - @Override public int getPriority() { return DEFAULT_PRIORITY; @@ -211,25 +182,34 @@ public class TagRouter extends AbstractRouter implements Comparable<Router>, Con this.application = app; } - private synchronized void checkAndInit(URL url) { - String providerApplication = url.getParameter(Constants.REMOTE_APPLICATION_KEY); - if (StringUtils.isEmpty(application) || !application.equals(providerApplication)) { - setApplication(providerApplication); - inited = false; + @Override + public <T> void notify(List<Invoker<T>> invokers) { + if (invokers == null || invokers.isEmpty()) { + return; } - if (StringUtils.isEmpty(application)) { - logger.error("TagRouter must getConfig from or subscribe to a specific application, but the application in this TagRouter is not specified."); + Invoker<T> invoker = invokers.get(0); + URL url = invoker.getUrl(); + String providerApplication = url.getParameter(Constants.REMOTE_APPLICATION_KEY); + + if (StringUtils.isEmpty(providerApplication)) { + logger.error("TagRouter must getConfig from or subscribe to a specific application, but the application " + + "in this TagRouter is not specified."); return; } - if (!inited) { - inited = true; - String key = application + TAGROUTERRULES_DATAID; - configuration.addListener(key, this); - String rawRule = configuration.getConfig(key); - if (rawRule != null) { - this.process(new ConfigChangeEvent(key, rawRule)); + synchronized (this) { + if (!providerApplication.equals(application)) { + if (!StringUtils.isEmpty(application)) { + configuration.removeListener(application + TAGROUTERRULES_DATAID, this); + } + String key = providerApplication + TAGROUTERRULES_DATAID; + configuration.addListener(key, this); + application = providerApplication; + String rawRule = configuration.getConfig(key); + if (rawRule != null) { + this.process(new ConfigChangeEvent(key, rawRule)); + } } } } diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/file/FileRouterEngineTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/file/FileRouterEngineTest.java index 95a50a0..ba376cb 100644 --- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/file/FileRouterEngineTest.java +++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/file/FileRouterEngineTest.java @@ -159,9 +159,10 @@ public class FileRouterEngineTest { } private void initDic(URL url) { + // FIXME: this exposes the design flaw in RouterChain dic = new StaticDirectory<>(url, invokers); dic.buildRouterChain(); - dic.getRouterChain().setResidentRouters(Arrays.asList(routerFactory.getRouter(url))); + dic.getRouterChain().initWithRouters(Arrays.asList(routerFactory.getRouter(url))); } static class MockClusterInvoker<T> extends AbstractClusterInvoker<T> { diff --git a/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/cluster/Router.java b/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/cluster/Router.java index afe3252..2fb30d1 100644 --- a/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/cluster/Router.java +++ b/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/cluster/Router.java @@ -21,7 +21,6 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.RpcException; -import org.apache.dubbo.rpc.cluster.RouterChain; import java.util.List; import java.util.stream.Collectors; @@ -51,10 +50,6 @@ public interface Router extends org.apache.dubbo.rpc.cluster.Router { } @Override - default void addRouterChain(RouterChain routerChain) { - } - - @Override default boolean isRuntime() { return true; } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java index 8a2bcd6..34093da 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java @@ -228,7 +228,7 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify .getProtocol())) { this.forbidden = true; // Forbid to access this.invokers = null; - routerChain.notifyFullInvokers(this.invokers, getConsumerUrl()); + routerChain.setInvokers(this.invokers); destroyAllInvokers(); // Close all invokers } else { this.forbidden = false; // Allow to access @@ -259,7 +259,7 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values())); // pre-route and build cache, notice that route cache should build on original Invoker list. // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed. - routerChain.notifyFullInvokers(newInvokers, getConsumerUrl()); + routerChain.setInvokers(newInvokers); // this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap; this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers; this.urlInvokerMap = newUrlInvokerMap; @@ -351,8 +351,7 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify } try { Router router = routerFactory.getRouter(url); - router.addRouterChain(routerChain); -// routerChain.addRouter(router); + routerChain.addRouter(router); if (!routers.contains(router)) routers.add(router); } catch (Throwable t) { logger.error("convert router url to router error, url: " + url, t);