This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 2a0c981  3.0 instance listener (#8128)
2a0c981 is described below

commit 2a0c981d2091fa0fa0a490ef71575b1bab391d78
Author: ken.lj <[email protected]>
AuthorDate: Thu Jun 24 15:17:19 2021 +0800

    3.0 instance listener (#8128)
---
 .../registry/client/ServiceDiscoveryRegistry.java  | 58 ++++++++--------------
 1 file changed, 21 insertions(+), 37 deletions(-)

diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
index a28c0c5..9937357 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
@@ -259,13 +259,10 @@ public class ServiceDiscoveryRegistry implements Registry {
             String serviceNamesKey = toStringKeys(serviceNames);
             ServiceInstancesChangedListener instancesChangedListener = 
serviceListeners.get(serviceNamesKey);
             if (instancesChangedListener != null) {
-                String listenerId = createListenerId(url, 
instancesChangedListener);
-
                 instancesChangedListener.removeListener(protocolServiceKey);
                 if (!instancesChangedListener.hasListeners()) {
                     serviceListeners.remove(serviceNamesKey);
                 }
-                registeredListeners.remove(listenerId);
             }
         }
     }
@@ -299,39 +296,30 @@ public class ServiceDiscoveryRegistry implements Registry {
         String protocolServiceKey = url.getServiceKey() + GROUP_CHAR_SEPARATOR 
+ url.getParameter(PROTOCOL_KEY, DUBBO);
 
         // register ServiceInstancesChangedListener
-        ServiceInstancesChangedListener serviceListener = 
serviceListeners.computeIfAbsent(serviceNamesKey, k -> {
-            ServiceInstancesChangedListener serviceInstancesChangedListener = 
serviceDiscovery.createListener(serviceNames);
-            serviceInstancesChangedListener.setUrl(url);
-            serviceNames.forEach(serviceName -> {
-                List<ServiceInstance> serviceInstances = 
serviceDiscovery.getInstances(serviceName);
-                if (CollectionUtils.isNotEmpty(serviceInstances)) {
-                    serviceInstancesChangedListener.onEvent(new 
ServiceInstancesChangedEvent(serviceName, serviceInstances));
+        boolean serviceListenerRegistered = true;
+        ServiceInstancesChangedListener serviceInstancesChangedListener;
+        synchronized (this) {
+            serviceInstancesChangedListener = 
serviceListeners.get(serviceNamesKey);
+            if (serviceInstancesChangedListener == null) {
+                serviceInstancesChangedListener = 
serviceDiscovery.createListener(serviceNames);
+                serviceInstancesChangedListener.setUrl(url);
+                for (String serviceName : serviceNames) {
+                    List<ServiceInstance> serviceInstances = 
serviceDiscovery.getInstances(serviceName);
+                    if (CollectionUtils.isNotEmpty(serviceInstances)) {
+                        serviceInstancesChangedListener.onEvent(new 
ServiceInstancesChangedEvent(serviceName, serviceInstances));
+                    }
                 }
-            });
-            return serviceInstancesChangedListener;
-        });
-
-        serviceListener.setUrl(url);
-        listener.addServiceListener(serviceListener);
-        serviceListener.addListenerAndNotify(protocolServiceKey, listener);
-        registerServiceInstancesChangedListener(url, serviceListener);
-    }
-
-    /**
-     * Register the {@link ServiceInstancesChangedListener} If absent
-     *
-     * @param url      {@link URL}
-     * @param listener the {@link ServiceInstancesChangedListener}
-     */
-    private void registerServiceInstancesChangedListener(URL url, 
ServiceInstancesChangedListener listener) {
-        String listenerId = createListenerId(url, listener);
-        if (registeredListeners.add(listenerId)) {
-            serviceDiscovery.addServiceInstancesChangedListener(listener);
+                serviceListenerRegistered = false;
+                serviceListeners.put(serviceNamesKey, 
serviceInstancesChangedListener);
+            }
         }
-    }
 
-    private String createListenerId(URL url, ServiceInstancesChangedListener 
listener) {
-        return listener.getServiceNames() + ":" + url.toString(VERSION_KEY, 
GROUP_KEY, PROTOCOL_KEY);
+        serviceInstancesChangedListener.setUrl(url);
+        listener.addServiceListener(serviceInstancesChangedListener);
+        
serviceInstancesChangedListener.addListenerAndNotify(protocolServiceKey, 
listener);
+        if (!serviceListenerRegistered) {
+            
serviceDiscovery.addServiceInstancesChangedListener(serviceInstancesChangedListener);
+        }
     }
 
 //    public void doSubscribe(URL url, NotifyListener listener) {
@@ -417,10 +405,6 @@ public class ServiceDiscoveryRegistry implements Registry {
                 || Objects.equals(protocol, targetURL.getProtocol());
     }
 
-    public Set<String> getRegisteredListeners() {
-        return registeredListeners;
-    }
-
     public Map<String, ServiceInstancesChangedListener> getServiceListeners() {
         return serviceListeners;
     }

Reply via email to