This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-spi-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new 9312264 [Dubbo-SPECIFY-ADDRESS]support v2 ip spec (#179)
9312264 is described below
commit 9312264235fd906d7a5b7a150ae3731b3dcfd82b
Author: wxbty <[email protected]>
AuthorDate: Mon Dec 5 11:16:03 2022 +0800
[Dubbo-SPECIFY-ADDRESS]support v2 ip spec (#179)
---
.../specifyaddress/UserSpecifiedAddressRouter.java | 178 ++++++++++++++++++---
...bo.common.threadpool.manager.ExecutorRepository | 1 +
.../UserSpecifiedAddressRouterTest.java | 47 ++++--
3 files changed, 187 insertions(+), 39 deletions(-)
diff --git a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouter.java b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouter.java
index 7c8bd6e..139ce5c 100644
--- a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouter.java
+++ b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouter.java
@@ -17,37 +17,61 @@
package org.apache.dubbo.rpc.cluster.specifyaddress;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.URLBuilder;
+import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
+import org.apache.dubbo.common.utils.ClassUtils;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Protocol;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.router.AbstractRouter;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Iterator;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-public class UserSpecifiedAddressRouter extends AbstractRouter {
+import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;
+import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
+
+
+public class UserSpecifiedAddressRouter<T> extends AbstractRouter {
private final static Logger logger =
LoggerFactory.getLogger(UserSpecifiedAddressRouter.class);
// protected for ut purpose
protected static int EXPIRE_TIME = 10 * 60 * 1000;
- private volatile List<Invoker<?>> invokers = Collections.emptyList();
- private volatile Map<String, Invoker<?>> ip2Invoker;
- private volatile Map<String, Invoker<?>> address2Invoker;
+ private volatile List<Invoker<T>> invokers = Collections.emptyList();
+ private volatile Map<String, Invoker<T>> ip2Invoker;
+ private volatile Map<String, Invoker<T>> address2Invoker;
+ private final Protocol protocol;
private final Lock cacheLock = new ReentrantLock();
+ private final ScheduledExecutorService scheduledExecutorService;
+ private final AtomicBoolean launchRemovalTask = new AtomicBoolean(false);
+
+
+ private final Map<URL, InvokerCache<T>> newInvokerCache = new
LinkedHashMap<>(16, 0.75f, true);
+
public UserSpecifiedAddressRouter(URL referenceUrl) {
super(referenceUrl);
+ this.protocol =
ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
+ this.scheduledExecutorService =
ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().nextScheduledExecutor();
}
@Override
@@ -73,14 +97,14 @@ public class UserSpecifiedAddressRouter extends AbstractRouter {
// 2. check if set address url
if (address.getUrlAddress() != null) {
- Invoker<?> invoker = getInvokerByURL(address, invocation);
+ Invoker<?> invoker = getInvokerByURL(address);
result.add((Invoker) invoker);
return result;
}
// 3. check if set ip and port
if (StringUtils.isNotEmpty(address.getIp())) {
- Invoker<?> invoker = getInvokerByIp(address, invocation);
+ Invoker<?> invoker = getInvokerByIp(address);
result.add((Invoker) invoker);
return result;
}
@@ -88,7 +112,7 @@ public class UserSpecifiedAddressRouter extends AbstractRouter {
return invokers;
}
- private Invoker<?> getInvokerByURL(Address address, Invocation invocation)
{
+ private Invoker<?> getInvokerByURL(Address address) {
tryLoadSpecifiedMap();
// try to find in directory
@@ -112,11 +136,11 @@ public class UserSpecifiedAddressRouter extends AbstractRouter {
}
}
- // create new one
- throw new RpcException("User specified server address not support refer new url in Dubbo 2.x. Please upgrade to Dubbo 3.x and use dubbo-cluster-specify-address-dubbo3.");
+ URL newUrl = rebuildAddress(address, getUrl());
+ return getOrBuildInvokerCache(newUrl);
}
- public Invoker<?> getInvokerByIp(Address address, Invocation invocation) {
+ public Invoker<?> getInvokerByIp(Address address) {
tryLoadSpecifiedMap();
String ip = address.getIp();
@@ -136,29 +160,31 @@ public class UserSpecifiedAddressRouter extends AbstractRouter {
}
if (!address.isNeedToCreate()) {
- throwException(invocation, address);
+ throwException(address);
}
- throw new RpcException("User specified server address not support refer new url in Dubbo 2.x. Please upgrade to Dubbo 3.x and use dubbo-cluster-specify-address-dubbo3.");
+ URL newUrl = buildAddress(invokers, address, getUrl());
+ return getOrBuildInvokerCache(newUrl);
}
- private void throwException(Invocation invocation, Address address) {
+
+ private void throwException(Address address) {
throw new RpcException("user specified server address : [" + address +
"] is not a valid provider for service: ["
- + getUrl().getServiceKey() + "]");
+ + getUrl().getServiceKey() + "]");
}
- private Map<String, Invoker<?>> processIp(List<Invoker<?>> invokerList) {
- Map<String, Invoker<?>> ip2Invoker = new HashMap<>();
- for (Invoker<?> invoker : invokerList) {
+ private Map<String, Invoker<T>> processIp(List<Invoker<T>> invokerList) {
+ Map<String, Invoker<T>> ip2Invoker = new HashMap<>();
+ for (Invoker<T> invoker : invokerList) {
ip2Invoker.put(invoker.getUrl().getHost(), invoker);
}
return Collections.unmodifiableMap(ip2Invoker);
}
- private Map<String, Invoker<?>> processAddress(List<Invoker<?>> addresses)
{
- Map<String, Invoker<?>> address2Invoker = new HashMap<>();
- for (Invoker<?> invoker : addresses) {
+ private Map<String, Invoker<T>> processAddress(List<Invoker<T>> addresses)
{
+ Map<String, Invoker<T>> address2Invoker = new HashMap<>();
+ for (Invoker<T> invoker : addresses) {
address2Invoker.put(invoker.getUrl().getHost() + ":" +
invoker.getUrl().getPort(), invoker);
}
return Collections.unmodifiableMap(address2Invoker);
@@ -166,19 +192,19 @@ public class UserSpecifiedAddressRouter extends AbstractRouter {
// For ut only
@Deprecated
- protected Map<String, Invoker<?>> getIp2Invoker() {
+ protected Map<String, Invoker<T>> getIp2Invoker() {
return ip2Invoker;
}
// For ut only
@Deprecated
- protected Map<String, Invoker<?>> getAddress2Invoker() {
+ protected Map<String, Invoker<T>> getAddress2Invoker() {
return address2Invoker;
}
// For ut only
@Deprecated
- protected List<Invoker<?>> getInvokers() {
+ protected List<Invoker<T>> getInvokers() {
return invokers;
}
@@ -190,7 +216,7 @@ public class UserSpecifiedAddressRouter extends AbstractRouter {
if (ip2Invoker != null) {
return;
}
- List<Invoker<?>> invokers = this.invokers;
+ List<Invoker<T>> invokers = this.invokers;
if (CollectionUtils.isEmpty(invokers)) {
address2Invoker = Collections.unmodifiableMap(new HashMap<>());
ip2Invoker = Collections.unmodifiableMap(new HashMap<>());
@@ -200,4 +226,108 @@ public class UserSpecifiedAddressRouter extends AbstractRouter {
ip2Invoker = processIp(invokers);
}
}
+
+
+ public <T> URL buildAddress(List<Invoker<T>> invokers, Address address,
URL consumerUrl) {
+ if (!invokers.isEmpty()) {
+ URL template = invokers.iterator().next().getUrl();
+ template = template.setHost(address.getIp());
+ if (address.getPort() != 0) {
+ template = template.setPort(address.getPort());
+ }
+ return template;
+ } else {
+ String ip = address.getIp();
+ int port = address.getPort();
+ if (port == 0) {
+ port =
ExtensionLoader.getExtensionLoader(Protocol.class).getDefaultExtension().getDefaultPort();
+ }
+ return copyConsumerUrl(consumerUrl, ip, port, new HashMap<>());
+ }
+ }
+
+ private URL copyConsumerUrl(URL url, String ip, int port, Map<String,
String> parameters) {
+ return URLBuilder.from(url)
+ .setHost(ip)
+ .setPort(port)
+ .setProtocol(url.getProtocol() == null ? DUBBO :
url.getProtocol())
+ .setPath(url.getPath())
+ .clearParameters()
+ .addParameters(parameters)
+ .removeParameter(MONITOR_KEY)
+ .build();
+ }
+
+ public URL rebuildAddress(Address address, URL consumerUrl) {
+ URL url = address.getUrlAddress();
+ Map<String, String> parameters = new HashMap<>(url.getParameters());
+ parameters.put(VERSION_KEY, consumerUrl.getParameter(VERSION_KEY,
"0.0.0"));
+ parameters.put(GROUP_KEY, consumerUrl.getParameter(GROUP_KEY));
+ parameters.putAll(consumerUrl.getParameters());
+ return copyConsumerUrl(consumerUrl, url.getHost(),
url.getPort(),parameters);
+ }
+
+ private Invoker<T> getOrBuildInvokerCache(URL url) {
+ logger.info("Unable to find a proper invoker from directory. Try to create new invoker. New URL: " + url);
+
+ InvokerCache<T> cache;
+ cacheLock.lock();
+ try {
+ cache = newInvokerCache.get(url);
+ } finally {
+ cacheLock.unlock();
+ }
+ if (cache == null) {
+ Invoker<T> invoker = refer(url);
+ cacheLock.lock();
+ try {
+ cache = newInvokerCache.get(url);
+ if (cache == null) {
+ cache = new InvokerCache<>(invoker);
+ newInvokerCache.put(url, cache);
+ if (launchRemovalTask.compareAndSet(false, true)) {
+ scheduledExecutorService.scheduleAtFixedRate(new
RemovalTask(), EXPIRE_TIME / 2, EXPIRE_TIME / 2, TimeUnit.MILLISECONDS);
+ }
+ } else {
+ invoker.destroy();
+ }
+ } finally {
+ cacheLock.unlock();
+ }
+ }
+ return cache.getInvoker();
+ }
+
+ private Invoker<T> refer(URL url) {
+
+ try {
+ Class interfaceClass =
Class.forName(getUrl().getServiceInterface(), true,
ClassUtils.getClassLoader());
+ return this.protocol.refer(interfaceClass, url);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ private class RemovalTask implements Runnable {
+ @Override
+ public void run() {
+ cacheLock.lock();
+ try {
+ if (newInvokerCache.size() > 0) {
+ Iterator<Map.Entry<URL, InvokerCache<T>>> iterator =
newInvokerCache.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<URL, InvokerCache<T>> entry =
iterator.next();
+ if (System.currentTimeMillis() -
entry.getValue().getLastAccess() > EXPIRE_TIME) {
+ iterator.remove();
+ entry.getValue().getInvoker().destroy();
+ } else {
+ break;
+ }
+ }
+ }
+ } finally {
+ cacheLock.unlock();
+ }
+ }
+ }
}
diff --git a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/resources/META-INF/dubbo/org.apache.dubbo.common.threadpool.manager.ExecutorRepository b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/resources/META-INF/dubbo/org.apache.dubbo.common.threadpool.manager.ExecutorRepository
new file mode 100644
index 0000000..44199b0
--- /dev/null
+++ b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/resources/META-INF/dubbo/org.apache.dubbo.common.threadpool.manager.ExecutorRepository
@@ -0,0 +1 @@
+default=org.apache.dubbo.common.threadpool.manager.DefaultExecutorRepository
\ No newline at end of file
diff --git a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterTest.java b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterTest.java
index 372eb0f..34cef9e 100644
--- a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterTest.java
+++ b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterTest.java
@@ -37,8 +37,8 @@ public class UserSpecifiedAddressRouterTest {
@BeforeEach
public void setup() {
- consumerUrl = URL.valueOf("127.0.0.2:20880").addParameter("Test",
"Value").addParameter("check", "false")
- .addParameter("version", "1.0.0").addParameter("group", "Dubbo");
+ consumerUrl = URL.valueOf("127.0.0.2:20880").addParameter("Test",
"Value").addParameter("check", "false").addParameter("lazy","true")
+ .addParameter("version", "1.0.0").addParameter("group",
"Dubbo").addParameter("interface", DemoService.class.getName());
}
@Test
@@ -56,7 +56,7 @@ public class UserSpecifiedAddressRouterTest {
// no address
Assertions.assertThrows(RpcException.class, () ->
- userSpecifiedAddressRouter.route(Collections.emptyList(),
consumerUrl, Mockito.mock(Invocation.class)));
+ userSpecifiedAddressRouter.route(Collections.emptyList(),
consumerUrl, Mockito.mock(Invocation.class)));
Assertions.assertNotNull(userSpecifiedAddressRouter.getAddress2Invoker());
Assertions.assertNotNull(userSpecifiedAddressRouter.getIp2Invoker());
@@ -72,18 +72,23 @@ public class UserSpecifiedAddressRouterTest {
UserSpecifiedAddressRouter userSpecifiedAddressRouter = new
UserSpecifiedAddressRouter(consumerUrl);
Assertions.assertEquals(Collections.emptyList(),
- userSpecifiedAddressRouter.route(Collections.emptyList(),
consumerUrl, Mockito.mock(Invocation.class)));
+ userSpecifiedAddressRouter.route(Collections.emptyList(),
consumerUrl, Mockito.mock(Invocation.class)));
- UserSpecifiedAddressUtil.setAddress(new
Address(URL.valueOf("127.0.0.1:20880")));
- Assertions.assertThrows(RpcException.class, () ->
- userSpecifiedAddressRouter.route(Collections.emptyList(),
consumerUrl, Mockito.mock(Invocation.class)));
+ UserSpecifiedAddressUtil.setAddress(new
Address(URL.valueOf("127.0.0.1:20880?lazy=true")));
+ List<Invoker<Object>> invokers =
userSpecifiedAddressRouter.route(Collections.emptyList(), consumerUrl,
Mockito.mock(Invocation.class));
+ Assertions.assertEquals(1, invokers.size());
+ Assertions.assertEquals("127.0.0.1",
invokers.get(0).getUrl().getHost());
+ Assertions.assertEquals(20880, invokers.get(0).getUrl().getPort());
+ Assertions.assertEquals("Value",
invokers.get(0).getUrl().getParameter("Test"));
+ Assertions.assertEquals(consumerUrl.getParameter("version"),
invokers.get(0).getUrl().getParameter("version"));
+ Assertions.assertEquals(consumerUrl.getParameter("group"),
invokers.get(0).getUrl().getParameter("group"));
Invoker<Object> mockInvoker = Mockito.mock(Invoker.class);
Mockito.when(mockInvoker.getUrl()).thenReturn(URL.valueOf("simple://127.0.0.1:20880?Test1=Value"));
userSpecifiedAddressRouter.notify(new
LinkedList<>(Collections.singletonList(mockInvoker)));
UserSpecifiedAddressUtil.setAddress(new
Address(URL.valueOf("127.0.0.1:20880")));
- List<Invoker<Object>> invokers = userSpecifiedAddressRouter.route(new
LinkedList<>(Collections.singletonList(mockInvoker)), consumerUrl,
Mockito.mock(Invocation.class));
+ invokers = userSpecifiedAddressRouter.route(new
LinkedList<>(Collections.singletonList(mockInvoker)), consumerUrl,
Mockito.mock(Invocation.class));
Assertions.assertEquals(1, invokers.size());
Assertions.assertEquals(mockInvoker, invokers.get(0));
@@ -100,8 +105,15 @@ public class UserSpecifiedAddressRouterTest {
Assertions.assertEquals(mockInvoker, invokers.get(0));
UserSpecifiedAddressUtil.setAddress(new
Address(URL.valueOf("127.0.0.1:20880?Test1=Value&Test2=Value&Test3=Value")));
- Assertions.assertThrows(RpcException.class, () ->
- userSpecifiedAddressRouter.route(Collections.emptyList(),
consumerUrl, Mockito.mock(Invocation.class)));
+ invokers = userSpecifiedAddressRouter.route(Collections.emptyList(),
consumerUrl, Mockito.mock(Invocation.class));
+ Assertions.assertEquals(1, invokers.size());
+ Assertions.assertEquals("127.0.0.1",
invokers.get(0).getUrl().getHost());
+ Assertions.assertEquals(20880, invokers.get(0).getUrl().getPort());
+ Assertions.assertEquals("Value",
invokers.get(0).getUrl().getParameter("Test1"));
+ Assertions.assertEquals("Value",
invokers.get(0).getUrl().getParameter("Test2"));
+ Assertions.assertEquals("Value",
invokers.get(0).getUrl().getParameter("Test3"));
+ Assertions.assertEquals(consumerUrl.getParameter("version"),
invokers.get(0).getUrl().getParameter("version"));
+ Assertions.assertEquals(consumerUrl.getParameter("group"),
invokers.get(0).getUrl().getParameter("group"));
}
@Test
@@ -109,7 +121,7 @@ public class UserSpecifiedAddressRouterTest {
UserSpecifiedAddressRouter userSpecifiedAddressRouter = new
UserSpecifiedAddressRouter(consumerUrl);
Assertions.assertEquals(Collections.emptyList(),
- userSpecifiedAddressRouter.route(Collections.emptyList(),
consumerUrl, Mockito.mock(Invocation.class)));
+ userSpecifiedAddressRouter.route(Collections.emptyList(),
consumerUrl, Mockito.mock(Invocation.class)));
Invoker<Object> mockInvoker = Mockito.mock(Invoker.class);
Mockito.when(mockInvoker.getUrl()).thenReturn(consumerUrl);
@@ -128,14 +140,19 @@ public class UserSpecifiedAddressRouterTest {
UserSpecifiedAddressUtil.setAddress(new Address("127.0.0.2", 20770));
Assertions.assertThrows(RpcException.class, () ->
- userSpecifiedAddressRouter.route(new
LinkedList<>(Collections.singletonList(mockInvoker)), consumerUrl,
Mockito.mock(Invocation.class)));
+ userSpecifiedAddressRouter.route(new
LinkedList<>(Collections.singletonList(mockInvoker)), consumerUrl,
Mockito.mock(Invocation.class)));
UserSpecifiedAddressUtil.setAddress(new Address("127.0.0.3", 20880));
Assertions.assertThrows(RpcException.class, () ->
- userSpecifiedAddressRouter.route(new
LinkedList<>(Collections.singletonList(mockInvoker)), consumerUrl,
Mockito.mock(Invocation.class)));
+ userSpecifiedAddressRouter.route(new
LinkedList<>(Collections.singletonList(mockInvoker)), consumerUrl,
Mockito.mock(Invocation.class)));
UserSpecifiedAddressUtil.setAddress(new Address("127.0.0.2", 20770,
true));
- Assertions.assertThrows(RpcException.class, () ->
- userSpecifiedAddressRouter.route(Collections.emptyList(),
consumerUrl, Mockito.mock(Invocation.class)));
+ invokers = userSpecifiedAddressRouter.route(Collections.emptyList(),
consumerUrl, Mockito.mock(Invocation.class));
+ Assertions.assertEquals(1, invokers.size());
+ Assertions.assertEquals("127.0.0.2",
invokers.get(0).getUrl().getHost());
+ Assertions.assertEquals(20770, invokers.get(0).getUrl().getPort());
+ Assertions.assertEquals("Value",
invokers.get(0).getUrl().getParameter("Test"));
+ Assertions.assertEquals(consumerUrl.getParameter("version"),
invokers.get(0).getUrl().getParameter("version"));
+ Assertions.assertEquals(consumerUrl.getParameter("group"),
invokers.get(0).getUrl().getParameter("group"));
}
}