This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git
The following commit(s) were added to refs/heads/master by this push:
new c84511c Merge pull request #3044, code review around RouterChain.
c84511c is described below
commit c84511c6a7a6665dcf03e1dd06cdf077c13c008a
Author: Ian Luo <[email protected]>
AuthorDate: Wed Dec 26 10:29:39 2018 +0800
Merge pull request #3044, code review around RouterChain.
---
.../java/org/apache/dubbo/rpc/cluster/Router.java | 20 ++--
.../org/apache/dubbo/rpc/cluster/RouterChain.java | 110 ++++++++-------------
.../rpc/cluster/directory/AbstractDirectory.java | 2 +-
.../rpc/cluster/directory/StaticDirectory.java | 2 +-
.../dubbo/rpc/cluster/router/AbstractRouter.java | 11 ---
.../config/AbstractConfigConditionRouter.java | 3 -
.../dubbo/rpc/cluster/router/tag/TagRouter.java | 66 +++++--------
.../cluster/router/file/FileRouterEngineTest.java | 3 +-
.../java/com/alibaba/dubbo/rpc/cluster/Router.java | 5 -
.../registry/integration/RegistryDirectory.java | 7 +-
10 files changed, 80 insertions(+), 149 deletions(-)
diff --git
a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Router.java
b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Router.java
index e775714..46a2c49 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Router.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Router.java
@@ -22,7 +22,6 @@ import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
import java.util.List;
-import java.util.Map;
/**
* Router. (SPI, Prototype, ThreadSafe)
@@ -43,24 +42,25 @@ public interface Router extends Comparable<Router> {
/**
* Filter invokers with current routing rule and only return the invokers
that comply with the rule.
*
- * @param invokers
+ * @param invokers invoker list
* @param url refer url
- * @param invocation
+ * @param invocation invocation
* @return routed invokers
* @throws RpcException
*/
<T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation
invocation) throws RpcException;
- default <T> Map<String, List<Invoker<T>>> preRoute(List<Invoker<T>>
invokers, URL url, Invocation invocation) throws RpcException {
- return null;
- }
/**
- * Each router has a reference of the router chain.
+ * Notify the router the invoker list. Invoker list may change from time
to time. This method gives the router a
+ * chance to prepare before {@link Router#route(List, URL, Invocation)}
gets called.
*
- * @param routerChain
+ * @param invokers invoker list
+ * @param <T> invoker's type
*/
- void addRouterChain(RouterChain routerChain);
+ default <T> void notify(List<Invoker<T>> invokers) {
+
+ }
/**
* To decide whether this router need to execute every time an RPC comes
or should only execute when addresses or rule change.
@@ -83,4 +83,4 @@ public interface Router extends Comparable<Router> {
* @return
*/
int getPriority();
-}
\ No newline at end of file
+}
diff --git
a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/RouterChain.java
b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/RouterChain.java
index 33374af..b370744 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/RouterChain.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/RouterChain.java
@@ -18,7 +18,6 @@ package org.apache.dubbo.rpc.cluster;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
-import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
@@ -33,92 +32,78 @@ import java.util.stream.Collectors;
public class RouterChain<T> {
// full list of addresses from registry, classified by method name.
- private List<Invoker<T>> fullInvokers;
+ private List<Invoker<T>> invokers = Collections.emptyList();
private URL url;
// containing all routers, reconstruct every time 'route://' urls change.
- private List<Router> routers = new CopyOnWriteArrayList<>();
- // Fixed router instances: ConfigConditionRouter, TagRouter, e.g., the
rule for each instance may change but the instance will never delete or
recreate.
- private List<Router> residentRouters;
+ private volatile List<Router> routers = Collections.emptyList();
- public static <T> RouterChain<T> buildChain(URL url) {
- RouterChain<T> routerChain = new RouterChain<>(url);
- List<RouterFactory> extensionFactories =
ExtensionLoader.getExtensionLoader(RouterFactory.class).getActivateExtension(url,
(String[]) null);
- List<Router> routers = extensionFactories.stream()
- .map(factory -> {
- Router router = factory.getRouter(url);
- router.addRouterChain(routerChain);
- return router;
- }).collect(Collectors.toList());
- routerChain.setResidentRouters(routers);
- return routerChain;
- }
+ // Fixed router instances: ConfigConditionRouter, TagRouter, e.g., the
rule for each instance may change but the
+ // instance will never delete or recreate.
+ private List<Router> builtinRouters = Collections.emptyList();
- protected RouterChain(List<Router> routers) {
- this.routers.addAll(routers);
+ public static <T> RouterChain<T> buildChain(URL url) {
+ return new RouterChain<>(url);
}
- protected RouterChain(URL url) {
+ private RouterChain(URL url) {
this.url = url;
+
+ List<RouterFactory> extensionFactories =
ExtensionLoader.getExtensionLoader(RouterFactory.class)
+ .getActivateExtension(url, (String[]) null);
+
+ List<Router> routers = extensionFactories.stream()
+ .map(factory -> factory.getRouter(url))
+ .collect(Collectors.toList());
+
+ initWithRouters(routers);
}
/**
* the resident routers must being initialized before address notification.
- *
- * @param residentRouters
+ * FIXME: this method should not be public
*/
- public void setResidentRouters(List<Router> residentRouters) {
- this.residentRouters = residentRouters;
- this.routers.addAll(residentRouters);
+ public void initWithRouters(List<Router> builtinRouters) {
+ this.builtinRouters = builtinRouters;
+ this.routers = new CopyOnWriteArrayList<>(builtinRouters);
this.sort();
}
+ public void addRouter(Router router) {
+ this.routers.add(router);
+ }
+
/**
* If we use route:// protocol in version before 2.7.0, each URL will
generate a Router instance,
* so we should keep the routers up to date, that is, each time router
URLs changes, we should update the routers list,
- * only keep the residentRouters which are available all the time and the
latest notified routers which are generated from URLs.
+ * only keep the builtinRouters which are available all the time and the
latest notified routers which are generated from URLs.
*
- * @param generatedRouters routers from 'router://' rules in 2.6.x or
before.
+ * @param routers routers from 'router://' rules in 2.6.x or before.
*/
- public void setGeneratedRouters(List<Router> generatedRouters) {
+ public void addRouters(List<Router> routers) {
+ // FIXME will sort cause concurrent problem? since it's kind of a
write operation.
List<Router> newRouters = new CopyOnWriteArrayList<>();
- newRouters.addAll(residentRouters);
- newRouters.addAll(generatedRouters);
+ newRouters.addAll(builtinRouters);
+ newRouters.addAll(routers);
this.routers = newRouters;
- // FIXME will sort cause concurrent problem? since it's kind of a
write operation.
this.sort();
- /* if (fullInvokers != null) {
- this.preRoute(fullInvokers, url, null);
+ /* if (invokers != null) {
+ this.rebuild(invokers, url, null);
}*/
}
- public void sort() {
+ private void sort() {
Collections.sort(routers);
}
/**
- * TODO
- *
- * Building of router cache can be triggered from within different
threads, for example, registry notification and governance notification.
- * So this operation should be synchronized.
- * @param invokers
- * @param url
- * @param invocation
- */
- public void preRoute(List<Invoker<T>> invokers, URL url, Invocation
invocation) {
- for (Router router : routers) {
- router.preRoute(invokers, url, invocation);
- }
- }
-
- /**
*
* @param url
* @param invocation
* @return
*/
public List<Invoker<T>> route(URL url, Invocation invocation) {
- List<Invoker<T>> finalInvokers = fullInvokers;
+ List<Invoker<T>> finalInvokers = invokers;
for (Router router : routers) {
// if (router.isRuntime()) {
// finalInvokers = router.route(finalInvokers, url, invocation);
@@ -129,28 +114,13 @@ public class RouterChain<T> {
}
/**
- * When any of the router's rule changed, notify the router chain to
rebuild cache from scratch.
- */
- public void notifyRuleChanged() {
- if (CollectionUtils.isEmpty(this.fullInvokers)) {
- return;
- }
- preRoute(this.fullInvokers, url, null);
- }
-
- /**
* Notify router chain of the initial addresses from registry at the first
time.
* Notify whenever addresses in registry change.
- *
- * @param invokers
- * @param url
*/
- public void notifyFullInvokers(List<Invoker<T>> invokers, URL url) {
- setFullMethodInvokers(invokers);
- preRoute(invokers, url, null);
- }
-
- public void setFullMethodInvokers(List<Invoker<T>> fullInvokers) {
- this.fullInvokers = (fullInvokers == null ? Collections.emptyList() :
fullInvokers);
+ public void setInvokers(List<Invoker<T>> invokers) {
+ if (invokers != null) {
+ this.invokers = invokers;
+ routers.forEach(router -> router.notify(invokers));
+ }
}
}
diff --git
a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
index 6f3a316..0024227 100644
---
a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
+++
b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
@@ -98,7 +98,7 @@ public abstract class AbstractDirectory<T> implements
Directory<T> {
protected void addRouters(List<Router> routers) {
// copy list
routers = routers == null ? new ArrayList<>() : new
ArrayList<>(routers);
- routerChain.setGeneratedRouters(routers);
+ routerChain.addRouters(routers);
}
public URL getConsumerUrl() {
diff --git
a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/StaticDirectory.java
b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/StaticDirectory.java
index e35a404..e868bd7 100644
---
a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/StaticDirectory.java
+++
b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/StaticDirectory.java
@@ -87,7 +87,7 @@ public class StaticDirectory<T> extends AbstractDirectory<T> {
public void buildRouterChain() {
RouterChain<T> routerChain = RouterChain.buildChain(getUrl());
- routerChain.notifyFullInvokers(invokers, getUrl());
+ routerChain.setInvokers(invokers);
this.setRouterChain(routerChain);
}
diff --git
a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/AbstractRouter.java
b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/AbstractRouter.java
index 6192f7a..d19e90f 100644
---
a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/AbstractRouter.java
+++
b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/AbstractRouter.java
@@ -19,10 +19,6 @@ package org.apache.dubbo.rpc.cluster.router;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.configcenter.DynamicConfiguration;
import org.apache.dubbo.rpc.cluster.Router;
-import org.apache.dubbo.rpc.cluster.RouterChain;
-
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
/**
* TODO Extract more code to here if necessary
@@ -31,7 +27,6 @@ public abstract class AbstractRouter implements Router {
protected int priority;
protected boolean force = false;
protected boolean enabled = true;
- protected List<RouterChain> routerChains = new CopyOnWriteArrayList<>();
protected URL url;
protected DynamicConfiguration configuration;
@@ -53,12 +48,6 @@ public abstract class AbstractRouter implements Router {
this.url = url;
}
- @Override
- public void addRouterChain(RouterChain routerChain) {
- this.routerChains.add(routerChain);
- }
-
-
public void setConfiguration(DynamicConfiguration configuration) {
this.configuration = configuration;
}
diff --git
a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/AbstractConfigConditionRouter.java
b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/AbstractConfigConditionRouter.java
index 5f27a29..c5edaba 100644
---
a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/AbstractConfigConditionRouter.java
+++
b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/condition/config/AbstractConfigConditionRouter.java
@@ -28,7 +28,6 @@ import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Router;
-import org.apache.dubbo.rpc.cluster.RouterChain;
import org.apache.dubbo.rpc.cluster.router.AbstractRouter;
import org.apache.dubbo.rpc.cluster.router.condition.ConditionRouter;
import
org.apache.dubbo.rpc.cluster.router.condition.config.model.BlackWhiteListRule;
@@ -73,8 +72,6 @@ public abstract class AbstractConfigConditionRouter extends
AbstractRouter imple
.getValue(), e);
}
}
-
- routerChains.forEach(RouterChain::notifyRuleChanged);
}
@Override
diff --git
a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouter.java
b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouter.java
index 8843b33..c0dee51 100644
---
a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouter.java
+++
b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/router/tag/TagRouter.java
@@ -30,13 +30,11 @@ import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Router;
-import org.apache.dubbo.rpc.cluster.RouterChain;
import org.apache.dubbo.rpc.cluster.router.AbstractRouter;
import org.apache.dubbo.rpc.cluster.router.tag.model.TagRouterRule;
import org.apache.dubbo.rpc.cluster.router.tag.model.TagRuleParser;
import java.util.List;
-import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -51,15 +49,10 @@ public class TagRouter extends AbstractRouter implements
Comparable<Router>, Con
private TagRouterRule tagRouterRule;
private String application;
- private boolean inited = false;
-
public TagRouter(DynamicConfiguration configuration, URL url) {
super(configuration, url);
}
- protected TagRouter() {
- }
-
@Override
public synchronized void process(ConfigChangeEvent event) {
if (logger.isDebugEnabled()) {
@@ -73,9 +66,6 @@ public class TagRouter extends AbstractRouter implements
Comparable<Router>, Con
} else {
this.tagRouterRule = TagRuleParser.parse(event.getValue());
}
-
- routerChains.forEach(RouterChain::notifyRuleChanged);
-
} catch (Exception e) {
logger.error("Failed to parse the raw tag router rule and it will
not take effect, please check if the rule matches with the template, the raw
rule is:\n ", e);
}
@@ -157,25 +147,6 @@ public class TagRouter extends AbstractRouter implements
Comparable<Router>, Con
}
}
- /**
- * This method is reserved for building router cache.
- * Currently, we rely on this method to do the init task since it will get
triggered before route() really happens.
- *
- * @param invokers
- * @param url
- * @param invocation
- * @param <T>
- * @return
- * @throws RpcException
- */
- @Override
- public <T> Map<String, List<Invoker<T>>> preRoute(List<Invoker<T>>
invokers, URL url, Invocation invocation) throws RpcException {
- if (CollectionUtils.isNotEmpty(invokers)) {
- checkAndInit(invokers.get(0).getUrl());
- }
- return super.preRoute(invokers, url, invocation);
- }
-
@Override
public int getPriority() {
return DEFAULT_PRIORITY;
@@ -211,25 +182,34 @@ public class TagRouter extends AbstractRouter implements
Comparable<Router>, Con
this.application = app;
}
- private synchronized void checkAndInit(URL url) {
- String providerApplication =
url.getParameter(Constants.REMOTE_APPLICATION_KEY);
- if (StringUtils.isEmpty(application) ||
!application.equals(providerApplication)) {
- setApplication(providerApplication);
- inited = false;
+ @Override
+ public <T> void notify(List<Invoker<T>> invokers) {
+ if (invokers == null || invokers.isEmpty()) {
+ return;
}
- if (StringUtils.isEmpty(application)) {
- logger.error("TagRouter must getConfig from or subscribe to a
specific application, but the application in this TagRouter is not specified.");
+ Invoker<T> invoker = invokers.get(0);
+ URL url = invoker.getUrl();
+ String providerApplication =
url.getParameter(Constants.REMOTE_APPLICATION_KEY);
+
+ if (StringUtils.isEmpty(providerApplication)) {
+ logger.error("TagRouter must getConfig from or subscribe to a
specific application, but the application " +
+ "in this TagRouter is not specified.");
return;
}
- if (!inited) {
- inited = true;
- String key = application + TAGROUTERRULES_DATAID;
- configuration.addListener(key, this);
- String rawRule = configuration.getConfig(key);
- if (rawRule != null) {
- this.process(new ConfigChangeEvent(key, rawRule));
+ synchronized (this) {
+ if (!providerApplication.equals(application)) {
+ if (!StringUtils.isEmpty(application)) {
+ configuration.removeListener(application +
TAGROUTERRULES_DATAID, this);
+ }
+ String key = providerApplication + TAGROUTERRULES_DATAID;
+ configuration.addListener(key, this);
+ application = providerApplication;
+ String rawRule = configuration.getConfig(key);
+ if (rawRule != null) {
+ this.process(new ConfigChangeEvent(key, rawRule));
+ }
}
}
}
diff --git
a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/file/FileRouterEngineTest.java
b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/file/FileRouterEngineTest.java
index 95a50a0..ba376cb 100644
---
a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/file/FileRouterEngineTest.java
+++
b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/router/file/FileRouterEngineTest.java
@@ -159,9 +159,10 @@ public class FileRouterEngineTest {
}
private void initDic(URL url) {
+ // FIXME: this exposes the design flaw in RouterChain
dic = new StaticDirectory<>(url, invokers);
dic.buildRouterChain();
-
dic.getRouterChain().setResidentRouters(Arrays.asList(routerFactory.getRouter(url)));
+
dic.getRouterChain().initWithRouters(Arrays.asList(routerFactory.getRouter(url)));
}
static class MockClusterInvoker<T> extends AbstractClusterInvoker<T> {
diff --git
a/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/cluster/Router.java
b/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/cluster/Router.java
index afe3252..2fb30d1 100644
--- a/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/cluster/Router.java
+++ b/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/cluster/Router.java
@@ -21,7 +21,6 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
-import org.apache.dubbo.rpc.cluster.RouterChain;
import java.util.List;
import java.util.stream.Collectors;
@@ -51,10 +50,6 @@ public interface Router extends
org.apache.dubbo.rpc.cluster.Router {
}
@Override
- default void addRouterChain(RouterChain routerChain) {
- }
-
- @Override
default boolean isRuntime() {
return true;
}
diff --git
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
index 8a2bcd6..34093da 100644
---
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
+++
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
@@ -228,7 +228,7 @@ public class RegistryDirectory<T> extends
AbstractDirectory<T> implements Notify
.getProtocol())) {
this.forbidden = true; // Forbid to access
this.invokers = null;
- routerChain.notifyFullInvokers(this.invokers, getConsumerUrl());
+ routerChain.setInvokers(this.invokers);
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
@@ -259,7 +259,7 @@ public class RegistryDirectory<T> extends
AbstractDirectory<T> implements Notify
List<Invoker<T>> newInvokers = Collections.unmodifiableList(new
ArrayList<>(newUrlInvokerMap.values()));
// pre-route and build cache, notice that route cache should build
on original Invoker list.
// toMergeMethodInvokerMap() will wrap some invokers having
different groups, those wrapped invokers not should be routed.
- routerChain.notifyFullInvokers(newInvokers, getConsumerUrl());
+ routerChain.setInvokers(newInvokers);
// this.methodInvokerMap = multiGroup ?
toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) :
newInvokers;
this.urlInvokerMap = newUrlInvokerMap;
@@ -351,8 +351,7 @@ public class RegistryDirectory<T> extends
AbstractDirectory<T> implements Notify
}
try {
Router router = routerFactory.getRouter(url);
- router.addRouterChain(routerChain);
-// routerChain.addRouter(router);
+ routerChain.addRouter(router);
if (!routers.contains(router)) routers.add(router);
} catch (Throwable t) {
logger.error("convert router url to router error, url: " +
url, t);