This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-spi-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new 9312264  [Dubbo-SPECIFY-ADDRESS]support v2 ip spec (#179)
9312264 is described below

commit 9312264235fd906d7a5b7a150ae3731b3dcfd82b
Author: wxbty <[email protected]>
AuthorDate: Mon Dec 5 11:16:03 2022 +0800

    [Dubbo-SPECIFY-ADDRESS]support v2 ip spec (#179)
---
 .../specifyaddress/UserSpecifiedAddressRouter.java | 178 ++++++++++++++++++---
 ...bo.common.threadpool.manager.ExecutorRepository |   1 +
 .../UserSpecifiedAddressRouterTest.java            |  47 ++++--
 3 files changed, 187 insertions(+), 39 deletions(-)

diff --git 
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouter.java
 
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouter.java
index 7c8bd6e..139ce5c 100644
--- 
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouter.java
+++ 
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouter.java
@@ -17,37 +17,61 @@
 package org.apache.dubbo.rpc.cluster.specifyaddress;
 
 import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.URLBuilder;
+import org.apache.dubbo.common.extension.ExtensionLoader;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
+import org.apache.dubbo.common.utils.ClassUtils;
 import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Protocol;
 import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.cluster.router.AbstractRouter;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Iterator;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-public class UserSpecifiedAddressRouter extends AbstractRouter {
+import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;
+import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
+
+
+public class UserSpecifiedAddressRouter<T> extends AbstractRouter {
     private final static Logger logger = 
LoggerFactory.getLogger(UserSpecifiedAddressRouter.class);
     // protected for ut purpose
     protected static int EXPIRE_TIME = 10 * 60 * 1000;
 
-    private volatile List<Invoker<?>> invokers = Collections.emptyList();
-    private volatile Map<String, Invoker<?>> ip2Invoker;
-    private volatile Map<String, Invoker<?>> address2Invoker;
+    private volatile List<Invoker<T>> invokers = Collections.emptyList();
+    private volatile Map<String, Invoker<T>> ip2Invoker;
+    private volatile Map<String, Invoker<T>> address2Invoker;
+    private final Protocol protocol;
 
     private final Lock cacheLock = new ReentrantLock();
 
+    private final ScheduledExecutorService scheduledExecutorService;
+    private final AtomicBoolean launchRemovalTask = new AtomicBoolean(false);
+
+
+    private final Map<URL, InvokerCache<T>> newInvokerCache = new 
LinkedHashMap<>(16, 0.75f, true);
+
     public UserSpecifiedAddressRouter(URL referenceUrl) {
         super(referenceUrl);
+        this.protocol = 
ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
+        this.scheduledExecutorService = 
ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().nextScheduledExecutor();
     }
 
     @Override
@@ -73,14 +97,14 @@ public class UserSpecifiedAddressRouter extends 
AbstractRouter {
 
         // 2. check if set address url
         if (address.getUrlAddress() != null) {
-            Invoker<?> invoker = getInvokerByURL(address, invocation);
+            Invoker<?> invoker = getInvokerByURL(address);
             result.add((Invoker) invoker);
             return result;
         }
 
         // 3. check if set ip and port
         if (StringUtils.isNotEmpty(address.getIp())) {
-            Invoker<?> invoker = getInvokerByIp(address, invocation);
+            Invoker<?> invoker = getInvokerByIp(address);
             result.add((Invoker) invoker);
             return result;
         }
@@ -88,7 +112,7 @@ public class UserSpecifiedAddressRouter extends 
AbstractRouter {
         return invokers;
     }
 
-    private Invoker<?> getInvokerByURL(Address address, Invocation invocation) 
{
+    private Invoker<?> getInvokerByURL(Address address) {
         tryLoadSpecifiedMap();
 
         // try to find in directory
@@ -112,11 +136,11 @@ public class UserSpecifiedAddressRouter extends 
AbstractRouter {
             }
         }
 
-        // create new one
-        throw new RpcException("User specified server address not support 
refer new url in Dubbo 2.x. Please upgrade to Dubbo 3.x and use 
dubbo-cluster-specify-address-dubbo3.");
+        URL newUrl = rebuildAddress(address, getUrl());
+        return getOrBuildInvokerCache(newUrl);
     }
 
-    public Invoker<?> getInvokerByIp(Address address, Invocation invocation) {
+    public Invoker<?> getInvokerByIp(Address address) {
         tryLoadSpecifiedMap();
 
         String ip = address.getIp();
@@ -136,29 +160,31 @@ public class UserSpecifiedAddressRouter extends 
AbstractRouter {
         }
 
         if (!address.isNeedToCreate()) {
-            throwException(invocation, address);
+            throwException(address);
         }
 
-        throw new RpcException("User specified server address not support 
refer new url in Dubbo 2.x. Please upgrade to Dubbo 3.x and use 
dubbo-cluster-specify-address-dubbo3.");
+        URL newUrl = buildAddress(invokers, address, getUrl());
+        return getOrBuildInvokerCache(newUrl);
     }
 
-    private void throwException(Invocation invocation, Address address) {
+
+    private void throwException(Address address) {
         throw new RpcException("user specified server address : [" + address + 
"] is not a valid provider for service: ["
-            + getUrl().getServiceKey() + "]");
+                + getUrl().getServiceKey() + "]");
     }
 
 
-    private Map<String, Invoker<?>> processIp(List<Invoker<?>> invokerList) {
-        Map<String, Invoker<?>> ip2Invoker = new HashMap<>();
-        for (Invoker<?> invoker : invokerList) {
+    private Map<String, Invoker<T>> processIp(List<Invoker<T>> invokerList) {
+        Map<String, Invoker<T>> ip2Invoker = new HashMap<>();
+        for (Invoker<T> invoker : invokerList) {
             ip2Invoker.put(invoker.getUrl().getHost(), invoker);
         }
         return Collections.unmodifiableMap(ip2Invoker);
     }
 
-    private Map<String, Invoker<?>> processAddress(List<Invoker<?>> addresses) 
{
-        Map<String, Invoker<?>> address2Invoker = new HashMap<>();
-        for (Invoker<?> invoker : addresses) {
+    private Map<String, Invoker<T>> processAddress(List<Invoker<T>> addresses) 
{
+        Map<String, Invoker<T>> address2Invoker = new HashMap<>();
+        for (Invoker<T> invoker : addresses) {
             address2Invoker.put(invoker.getUrl().getHost() + ":" + 
invoker.getUrl().getPort(), invoker);
         }
         return Collections.unmodifiableMap(address2Invoker);
@@ -166,19 +192,19 @@ public class UserSpecifiedAddressRouter extends 
AbstractRouter {
 
     // For ut only
     @Deprecated
-    protected Map<String, Invoker<?>> getIp2Invoker() {
+    protected Map<String, Invoker<T>> getIp2Invoker() {
         return ip2Invoker;
     }
 
     // For ut only
     @Deprecated
-    protected Map<String, Invoker<?>> getAddress2Invoker() {
+    protected Map<String, Invoker<T>> getAddress2Invoker() {
         return address2Invoker;
     }
 
     // For ut only
     @Deprecated
-    protected List<Invoker<?>> getInvokers() {
+    protected List<Invoker<T>> getInvokers() {
         return invokers;
     }
 
@@ -190,7 +216,7 @@ public class UserSpecifiedAddressRouter extends 
AbstractRouter {
             if (ip2Invoker != null) {
                 return;
             }
-            List<Invoker<?>> invokers = this.invokers;
+            List<Invoker<T>> invokers = this.invokers;
             if (CollectionUtils.isEmpty(invokers)) {
                 address2Invoker = Collections.unmodifiableMap(new HashMap<>());
                 ip2Invoker = Collections.unmodifiableMap(new HashMap<>());
@@ -200,4 +226,108 @@ public class UserSpecifiedAddressRouter extends 
AbstractRouter {
             ip2Invoker = processIp(invokers);
         }
     }
+
+
+    public <T> URL buildAddress(List<Invoker<T>> invokers, Address address, 
URL consumerUrl) {
+        if (!invokers.isEmpty()) {
+            URL template = invokers.iterator().next().getUrl();
+            template = template.setHost(address.getIp());
+            if (address.getPort() != 0) {
+                template = template.setPort(address.getPort());
+            }
+            return template;
+        } else {
+            String ip = address.getIp();
+            int port = address.getPort();
+            if (port == 0) {
+                port = 
ExtensionLoader.getExtensionLoader(Protocol.class).getDefaultExtension().getDefaultPort();
+            }
+            return copyConsumerUrl(consumerUrl, ip, port, new HashMap<>());
+        }
+    }
+
+    private URL copyConsumerUrl(URL url, String ip, int port, Map<String, 
String> parameters) {
+        return URLBuilder.from(url)
+                .setHost(ip)
+                .setPort(port)
+                .setProtocol(url.getProtocol() == null ? DUBBO : 
url.getProtocol())
+                .setPath(url.getPath())
+                .clearParameters()
+                .addParameters(parameters)
+                .removeParameter(MONITOR_KEY)
+                .build();
+    }
+
+    public URL rebuildAddress(Address address, URL consumerUrl) {
+        URL url = address.getUrlAddress();
+        Map<String, String> parameters = new HashMap<>(url.getParameters());
+        parameters.put(VERSION_KEY, consumerUrl.getParameter(VERSION_KEY, 
"0.0.0"));
+        parameters.put(GROUP_KEY, consumerUrl.getParameter(GROUP_KEY));
+        parameters.putAll(consumerUrl.getParameters());
+        return copyConsumerUrl(consumerUrl, url.getHost(), 
url.getPort(),parameters);
+    }
+
+    private Invoker<T> getOrBuildInvokerCache(URL url) {
+        logger.info("Unable to find a proper invoker from directory. Try to 
create new invoker. New URL: " + url);
+
+        InvokerCache<T> cache;
+        cacheLock.lock();
+        try {
+            cache = newInvokerCache.get(url);
+        } finally {
+            cacheLock.unlock();
+        }
+        if (cache == null) {
+            Invoker<T> invoker = refer(url);
+            cacheLock.lock();
+            try {
+                cache = newInvokerCache.get(url);
+                if (cache == null) {
+                    cache = new InvokerCache<>(invoker);
+                    newInvokerCache.put(url, cache);
+                    if (launchRemovalTask.compareAndSet(false, true)) {
+                        scheduledExecutorService.scheduleAtFixedRate(new 
RemovalTask(), EXPIRE_TIME / 2, EXPIRE_TIME / 2, TimeUnit.MILLISECONDS);
+                    }
+                } else {
+                    invoker.destroy();
+                }
+            } finally {
+                cacheLock.unlock();
+            }
+        }
+        return cache.getInvoker();
+    }
+
+    private Invoker<T> refer(URL url) {
+
+        try {
+            Class interfaceClass = 
Class.forName(getUrl().getServiceInterface(), true, 
ClassUtils.getClassLoader());
+            return this.protocol.refer(interfaceClass, url);
+        } catch (ClassNotFoundException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    private class RemovalTask implements Runnable {
+        @Override
+        public void run() {
+            cacheLock.lock();
+            try {
+                if (newInvokerCache.size() > 0) {
+                    Iterator<Map.Entry<URL, InvokerCache<T>>> iterator = 
newInvokerCache.entrySet().iterator();
+                    while (iterator.hasNext()) {
+                        Map.Entry<URL, InvokerCache<T>> entry = 
iterator.next();
+                        if (System.currentTimeMillis() - 
entry.getValue().getLastAccess() > EXPIRE_TIME) {
+                            iterator.remove();
+                            entry.getValue().getInvoker().destroy();
+                        } else {
+                            break;
+                        }
+                    }
+                }
+            } finally {
+                cacheLock.unlock();
+            }
+        }
+    }
 }
diff --git 
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/resources/META-INF/dubbo/org.apache.dubbo.common.threadpool.manager.ExecutorRepository
 
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/resources/META-INF/dubbo/org.apache.dubbo.common.threadpool.manager.ExecutorRepository
new file mode 100644
index 0000000..44199b0
--- /dev/null
+++ 
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/resources/META-INF/dubbo/org.apache.dubbo.common.threadpool.manager.ExecutorRepository
@@ -0,0 +1 @@
+default=org.apache.dubbo.common.threadpool.manager.DefaultExecutorRepository
\ No newline at end of file
diff --git 
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterTest.java
 
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterTest.java
index 372eb0f..34cef9e 100644
--- 
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterTest.java
+++ 
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterTest.java
@@ -37,8 +37,8 @@ public class UserSpecifiedAddressRouterTest {
 
     @BeforeEach
     public void setup() {
-        consumerUrl = URL.valueOf("127.0.0.2:20880").addParameter("Test", 
"Value").addParameter("check", "false")
-            .addParameter("version", "1.0.0").addParameter("group", "Dubbo");
+        consumerUrl = URL.valueOf("127.0.0.2:20880").addParameter("Test", 
"Value").addParameter("check", "false").addParameter("lazy","true")
+                .addParameter("version", "1.0.0").addParameter("group", 
"Dubbo").addParameter("interface", DemoService.class.getName());
     }
 
     @Test
@@ -56,7 +56,7 @@ public class UserSpecifiedAddressRouterTest {
 
         // no address
         Assertions.assertThrows(RpcException.class, () ->
-            userSpecifiedAddressRouter.route(Collections.emptyList(), 
consumerUrl, Mockito.mock(Invocation.class)));
+                userSpecifiedAddressRouter.route(Collections.emptyList(), 
consumerUrl, Mockito.mock(Invocation.class)));
 
         
Assertions.assertNotNull(userSpecifiedAddressRouter.getAddress2Invoker());
         Assertions.assertNotNull(userSpecifiedAddressRouter.getIp2Invoker());
@@ -72,18 +72,23 @@ public class UserSpecifiedAddressRouterTest {
         UserSpecifiedAddressRouter userSpecifiedAddressRouter = new 
UserSpecifiedAddressRouter(consumerUrl);
 
         Assertions.assertEquals(Collections.emptyList(),
-            userSpecifiedAddressRouter.route(Collections.emptyList(), 
consumerUrl, Mockito.mock(Invocation.class)));
+                userSpecifiedAddressRouter.route(Collections.emptyList(), 
consumerUrl, Mockito.mock(Invocation.class)));
 
-        UserSpecifiedAddressUtil.setAddress(new 
Address(URL.valueOf("127.0.0.1:20880")));
-        Assertions.assertThrows(RpcException.class, () ->
-            userSpecifiedAddressRouter.route(Collections.emptyList(), 
consumerUrl, Mockito.mock(Invocation.class)));
+        UserSpecifiedAddressUtil.setAddress(new 
Address(URL.valueOf("127.0.0.1:20880?lazy=true")));
+        List<Invoker<Object>> invokers = 
userSpecifiedAddressRouter.route(Collections.emptyList(), consumerUrl, 
Mockito.mock(Invocation.class));
+        Assertions.assertEquals(1, invokers.size());
+        Assertions.assertEquals("127.0.0.1", 
invokers.get(0).getUrl().getHost());
+        Assertions.assertEquals(20880, invokers.get(0).getUrl().getPort());
+        Assertions.assertEquals("Value", 
invokers.get(0).getUrl().getParameter("Test"));
+        Assertions.assertEquals(consumerUrl.getParameter("version"), 
invokers.get(0).getUrl().getParameter("version"));
+        Assertions.assertEquals(consumerUrl.getParameter("group"), 
invokers.get(0).getUrl().getParameter("group"));
 
         Invoker<Object> mockInvoker = Mockito.mock(Invoker.class);
         
Mockito.when(mockInvoker.getUrl()).thenReturn(URL.valueOf("simple://127.0.0.1:20880?Test1=Value"));
 
         userSpecifiedAddressRouter.notify(new 
LinkedList<>(Collections.singletonList(mockInvoker)));
         UserSpecifiedAddressUtil.setAddress(new 
Address(URL.valueOf("127.0.0.1:20880")));
-        List<Invoker<Object>> invokers = userSpecifiedAddressRouter.route(new 
LinkedList<>(Collections.singletonList(mockInvoker)), consumerUrl, 
Mockito.mock(Invocation.class));
+        invokers = userSpecifiedAddressRouter.route(new 
LinkedList<>(Collections.singletonList(mockInvoker)), consumerUrl, 
Mockito.mock(Invocation.class));
         Assertions.assertEquals(1, invokers.size());
         Assertions.assertEquals(mockInvoker, invokers.get(0));
 
@@ -100,8 +105,15 @@ public class UserSpecifiedAddressRouterTest {
         Assertions.assertEquals(mockInvoker, invokers.get(0));
 
         UserSpecifiedAddressUtil.setAddress(new 
Address(URL.valueOf("127.0.0.1:20880?Test1=Value&Test2=Value&Test3=Value")));
-        Assertions.assertThrows(RpcException.class, () ->
-            userSpecifiedAddressRouter.route(Collections.emptyList(), 
consumerUrl, Mockito.mock(Invocation.class)));
+        invokers = userSpecifiedAddressRouter.route(Collections.emptyList(), 
consumerUrl, Mockito.mock(Invocation.class));
+        Assertions.assertEquals(1, invokers.size());
+        Assertions.assertEquals("127.0.0.1", 
invokers.get(0).getUrl().getHost());
+        Assertions.assertEquals(20880, invokers.get(0).getUrl().getPort());
+        Assertions.assertEquals("Value", 
invokers.get(0).getUrl().getParameter("Test1"));
+        Assertions.assertEquals("Value", 
invokers.get(0).getUrl().getParameter("Test2"));
+        Assertions.assertEquals("Value", 
invokers.get(0).getUrl().getParameter("Test3"));
+        Assertions.assertEquals(consumerUrl.getParameter("version"), 
invokers.get(0).getUrl().getParameter("version"));
+        Assertions.assertEquals(consumerUrl.getParameter("group"), 
invokers.get(0).getUrl().getParameter("group"));
     }
 
     @Test
@@ -109,7 +121,7 @@ public class UserSpecifiedAddressRouterTest {
         UserSpecifiedAddressRouter userSpecifiedAddressRouter = new 
UserSpecifiedAddressRouter(consumerUrl);
 
         Assertions.assertEquals(Collections.emptyList(),
-            userSpecifiedAddressRouter.route(Collections.emptyList(), 
consumerUrl, Mockito.mock(Invocation.class)));
+                userSpecifiedAddressRouter.route(Collections.emptyList(), 
consumerUrl, Mockito.mock(Invocation.class)));
 
         Invoker<Object> mockInvoker = Mockito.mock(Invoker.class);
         Mockito.when(mockInvoker.getUrl()).thenReturn(consumerUrl);
@@ -128,14 +140,19 @@ public class UserSpecifiedAddressRouterTest {
 
         UserSpecifiedAddressUtil.setAddress(new Address("127.0.0.2", 20770));
         Assertions.assertThrows(RpcException.class, () ->
-            userSpecifiedAddressRouter.route(new 
LinkedList<>(Collections.singletonList(mockInvoker)), consumerUrl, 
Mockito.mock(Invocation.class)));
+                userSpecifiedAddressRouter.route(new 
LinkedList<>(Collections.singletonList(mockInvoker)), consumerUrl, 
Mockito.mock(Invocation.class)));
 
         UserSpecifiedAddressUtil.setAddress(new Address("127.0.0.3", 20880));
         Assertions.assertThrows(RpcException.class, () ->
-            userSpecifiedAddressRouter.route(new 
LinkedList<>(Collections.singletonList(mockInvoker)), consumerUrl, 
Mockito.mock(Invocation.class)));
+                userSpecifiedAddressRouter.route(new 
LinkedList<>(Collections.singletonList(mockInvoker)), consumerUrl, 
Mockito.mock(Invocation.class)));
 
         UserSpecifiedAddressUtil.setAddress(new Address("127.0.0.2", 20770, 
true));
-        Assertions.assertThrows(RpcException.class, () ->
-            userSpecifiedAddressRouter.route(Collections.emptyList(), 
consumerUrl, Mockito.mock(Invocation.class)));
+        invokers = userSpecifiedAddressRouter.route(Collections.emptyList(), 
consumerUrl, Mockito.mock(Invocation.class));
+        Assertions.assertEquals(1, invokers.size());
+        Assertions.assertEquals("127.0.0.2", 
invokers.get(0).getUrl().getHost());
+        Assertions.assertEquals(20770, invokers.get(0).getUrl().getPort());
+        Assertions.assertEquals("Value", 
invokers.get(0).getUrl().getParameter("Test"));
+        Assertions.assertEquals(consumerUrl.getParameter("version"), 
invokers.get(0).getUrl().getParameter("version"));
+        Assertions.assertEquals(consumerUrl.getParameter("group"), 
invokers.get(0).getUrl().getParameter("group"));
     }
 }

Reply via email to