This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new eeac1c3eaf Multiple registries (#10323)
eeac1c3eaf is described below

commit eeac1c3eaf0b06be5e1fd275c7447f085f48a48b
Author: ken.lj <[email protected]>
AuthorDate: Fri Sep 9 15:03:37 2022 +0800

    Multiple registries (#10323)
---
 .../dubbo/registry/multiple/MultipleRegistry.java  | 63 ++++++++++++++++++++--
 .../multiple/MultipleRegistry2S2RTest.java         | 19 +++++++
 2 files changed, 79 insertions(+), 3 deletions(-)

diff --git a/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleRegistry.java b/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleRegistry.java
index 0b1e5d923e..7a24a40779 100644
--- a/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleRegistry.java
+++ b/dubbo-registry/dubbo-registry-multiple/src/main/java/org/apache/dubbo/registry/multiple/MultipleRegistry.java
@@ -19,7 +19,10 @@ package org.apache.dubbo.registry.multiple;
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.registry.NotifyListener;
 import org.apache.dubbo.registry.Registry;
 import org.apache.dubbo.registry.RegistryFactory;
@@ -35,15 +38,18 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 import static org.apache.dubbo.common.constants.CommonConstants.CHECK_KEY;
+import static 
org.apache.dubbo.common.constants.CommonConstants.COMMA_SEPARATOR;
 import static 
org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
 
 /**
  * MultipleRegistry
  */
 public class MultipleRegistry extends AbstractRegistry {
+    public static final Logger LOGGER = 
LoggerFactory.getLogger(MultipleRegistry.class);
 
     public static final String REGISTRY_FOR_SERVICE = "service-registry";
     public static final String REGISTRY_FOR_REFERENCE = "reference-registry";
+    public static final String REGISTRY_SEPARATOR = "separator";
     private final Map<String, Registry> serviceRegistries = new 
ConcurrentHashMap<>(4);
     private final Map<String, Registry> referenceRegistries = new 
ConcurrentHashMap<>(4);
     private final Map<NotifyListener, MultipleNotifyListenerWrapper> 
multipleNotifyListenerMap = new ConcurrentHashMap<>(32);
@@ -86,7 +92,9 @@ public class MultipleRegistry extends AbstractRegistry {
     }
 
     protected void initServiceRegistry(URL url, Map<String, Registry> 
registryMap) {
-        origServiceRegistryURLs = url.getParameter(REGISTRY_FOR_SERVICE, new 
ArrayList<>());
+        String serviceRegistryString = url.getParameter(REGISTRY_FOR_SERVICE);
+        char separator = url.getParameter(REGISTRY_SEPARATOR, 
COMMA_SEPARATOR).charAt(0);
+        origServiceRegistryURLs = 
StringUtils.splitToList(serviceRegistryString, separator);
         effectServiceRegistryURLs = 
this.filterServiceRegistry(origServiceRegistryURLs);
         for (String tmpUrl : effectServiceRegistryURLs) {
             if (registryMap.get(tmpUrl) != null) {
@@ -101,7 +109,9 @@ public class MultipleRegistry extends AbstractRegistry {
     }
 
     protected void initReferenceRegistry(URL url, Map<String, Registry> 
registryMap) {
-        origReferenceRegistryURLs = url.getParameter(REGISTRY_FOR_REFERENCE, 
new ArrayList<>());
+        String serviceRegistryString = 
url.getParameter(REGISTRY_FOR_REFERENCE);
+        char separator = url.getParameter(REGISTRY_SEPARATOR, 
COMMA_SEPARATOR).charAt(0);
+        origReferenceRegistryURLs = 
StringUtils.splitToList(serviceRegistryString, separator);
         effectReferenceRegistryURLs = 
this.filterReferenceRegistry(origReferenceRegistryURLs);
         for (String tmpUrl : effectReferenceRegistryURLs) {
             if (registryMap.get(tmpUrl) != null) {
@@ -292,15 +302,56 @@ public class MultipleRegistry extends AbstractRegistry {
                     }
                     continue;
                 }
-                notifyURLs.addAll(tmpUrls);
+                URL registryURL = singleNotifyListener.getRegistry().getUrl();
+                aggregateRegistryUrls(notifyURLs, tmpUrls, registryURL);
             }
             // if no notify URL, add empty protocol URL
             if (emptyURL != null && notifyURLs.isEmpty()) {
                 notifyURLs.add(emptyURL);
+                LOGGER.info("No provider after aggregation, notify url with 
EMPTY protocol.");
+            } else {
+                LOGGER.info("Aggregated provider url size " + 
notifyURLs.size());
             }
+
             this.notify(notifyURLs);
         }
 
+        /**
+         * Aggregate urls from different registries into one unified list 
while appending registry specific 'attachments' into each url.
+         *
+         * These 'attachments' can be very useful for traffic management among 
registries.
+         *
+         * @param notifyURLs unified url list
+         * @param singleURLs single registry url list
+         * @param registryURL single registry configuration url
+         */
+        public static void aggregateRegistryUrls(List<URL> notifyURLs, 
List<URL> singleURLs, URL registryURL) {
+            String registryAttachments = 
registryURL.getParameter("attachments");
+            if (StringUtils.isNotBlank(registryAttachments)) {
+                LOGGER.info("Registry attachments " + registryAttachments + " 
found, will append to provider urls, urls size " + singleURLs.size());
+                String[] pairs = registryAttachments.split(COMMA_SEPARATOR);
+                Map<String, String> attachments = new HashMap<>(pairs.length);
+                for (String rawPair : pairs) {
+                   String[] keyValuePair = rawPair.split("=");
+                   if (keyValuePair.length == 2) {
+                       String key = keyValuePair[0];
+                       String value = keyValuePair[1];
+                       attachments.put(key, value);
+                   }
+                }
+
+                for (URL tmpUrl : singleURLs) {
+                    for (Map.Entry<String, String> entry : 
attachments.entrySet()) {
+                        tmpUrl = tmpUrl.addParameterIfAbsent(entry.getKey(), 
entry.getValue());
+                    }
+                    notifyURLs.add(tmpUrl);
+                }
+            } else {
+                LOGGER.info("Single registry " + registryURL + " has url size 
" + singleURLs.size());
+                notifyURLs.addAll(singleURLs);
+            }
+        }
+
         @Override
         public void notify(List<URL> urls) {
             sourceNotifyListener.notify(urls);
@@ -338,5 +389,11 @@ public class MultipleRegistry extends AbstractRegistry {
         public List<URL> getUrlList() {
             return urlList;
         }
+
+        public Registry getRegistry() {
+            return registry;
+        }
+
+
     }
 }
diff --git a/dubbo-registry/dubbo-registry-multiple/src/test/java/org/apache/dubbo/registry/multiple/MultipleRegistry2S2RTest.java b/dubbo-registry/dubbo-registry-multiple/src/test/java/org/apache/dubbo/registry/multiple/MultipleRegistry2S2RTest.java
index 441956a349..dfa04ecae8 100644
--- a/dubbo-registry/dubbo-registry-multiple/src/test/java/org/apache/dubbo/registry/multiple/MultipleRegistry2S2RTest.java
+++ b/dubbo-registry/dubbo-registry-multiple/src/test/java/org/apache/dubbo/registry/multiple/MultipleRegistry2S2RTest.java
@@ -175,4 +175,23 @@ public class MultipleRegistry2S2RTest {
         Assertions.assertEquals("empty", list.get(0).getProtocol());
     }
 
+    @Test
+    public void testAggregation() {
+        List<URL> result = new ArrayList<URL>();
+        List<URL> listToAggregate = new ArrayList<URL>();
+        URL url1= URL.valueOf("dubbo://127.0.0.1:20880/service1");
+        URL url2= URL.valueOf("dubbo://127.0.0.1:20880/service1");
+        listToAggregate.add(url1);
+        listToAggregate.add(url2);
+
+        URL registryURL = 
URL.valueOf("mock://127.0.0.1/RegistryService?attachments=zone=hangzhou,tag=middleware&enable-empty-protection=false");
+
+        
MultipleRegistry.MultipleNotifyListenerWrapper.aggregateRegistryUrls(result, 
listToAggregate, registryURL);
+
+        Assertions.assertEquals(2, result.size());
+        Assertions.assertEquals(2, result.get(0).getParameters().size());
+        Assertions.assertEquals("hangzhou", 
result.get(0).getParameter("zone"));
+        Assertions.assertEquals("middleware", 
result.get(1).getParameter("tag"));
+    }
+
 }

Reply via email to