This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new df61381242 Fix nacos removeServiceInstancesChangedListener (#10440)
df61381242 is described below

commit df613812420d547e14d2295498d79f821aff6316
Author: Albumen Kevin <[email protected]>
AuthorDate: Thu Aug 11 11:55:36 2022 +0800

    Fix nacos removeServiceInstancesChangedListener (#10440)
---
 .../registry/nacos/NacosNamingServiceWrapper.java  |  4 ++
 .../registry/nacos/NacosServiceDiscovery.java      | 76 +++++++++++++++++++---
 2 files changed, 70 insertions(+), 10 deletions(-)

diff --git a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosNamingServiceWrapper.java b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosNamingServiceWrapper.java
index 95a109d496..de4dc69480 100644
--- a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosNamingServiceWrapper.java
+++ b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosNamingServiceWrapper.java
@@ -47,6 +47,10 @@ public class NacosNamingServiceWrapper {
         namingService.subscribe(handleInnerSymbol(serviceName), group, eventListener);
     }
 
+    public void unsubscribe(String serviceName, String group, EventListener eventListener) throws NacosException {
+        namingService.unsubscribe(handleInnerSymbol(serviceName), group, eventListener);
+    }
+
     public List<Instance> getAllInstances(String serviceName, String group) throws NacosException {
         return namingService.getAllInstances(handleInnerSymbol(serviceName), group);
     }
diff --git a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosServiceDiscovery.java b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosServiceDiscovery.java
index e170e25be8..1fa01d46d9 100644
--- a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosServiceDiscovery.java
@@ -21,6 +21,7 @@ import org.apache.dubbo.common.config.ConfigurationUtils;
 import org.apache.dubbo.common.function.ThrowableFunction;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
 import org.apache.dubbo.registry.client.AbstractServiceDiscovery;
 import org.apache.dubbo.registry.client.ServiceDiscovery;
 import org.apache.dubbo.registry.client.ServiceInstance;
@@ -30,6 +31,8 @@ import org.apache.dubbo.registry.nacos.util.NacosNamingServiceUtils;
 import org.apache.dubbo.rpc.model.ApplicationModel;
 
 import com.alibaba.nacos.api.exception.NacosException;
+import com.alibaba.nacos.api.naming.listener.Event;
+import com.alibaba.nacos.api.naming.listener.EventListener;
 import com.alibaba.nacos.api.naming.listener.NamingEvent;
 import com.alibaba.nacos.api.naming.pojo.Instance;
 import com.alibaba.nacos.api.naming.pojo.ListView;
@@ -37,6 +40,7 @@ import com.alibaba.nacos.api.naming.pojo.ListView;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 import static com.alibaba.nacos.api.common.Constants.DEFAULT_GROUP;
@@ -61,6 +65,8 @@ public class NacosServiceDiscovery extends AbstractServiceDiscovery {
 
     private static final String NACOS_SD_USE_DEFAULT_GROUP_KEY = "dubbo.nacos-service-discovery.use-default-group";
 
+    private final ConcurrentHashMap<String, NacosEventListener> eventListeners = new ConcurrentHashMap<>();
+
     public NacosServiceDiscovery(ApplicationModel applicationModel, URL registryURL) {
         super(applicationModel, registryURL);
         this.namingService = createNamingService(registryURL);
@@ -114,18 +120,68 @@ public class NacosServiceDiscovery extends AbstractServiceDiscovery {
         if (!instanceListeners.add(listener)) {
             return;
         }
-        execute(namingService, service -> listener.getServiceNames().forEach(serviceName -> {
-            try {
-                service.subscribe(serviceName, group, e -> { // Register Nacos EventListener
-                    if (e instanceof NamingEvent) {
-                        NamingEvent event = (NamingEvent) e;
-                        handleEvent(event, listener);
+        for (String serviceName : listener.getServiceNames()) {
+            NacosEventListener nacosEventListener = eventListeners.get(serviceName);
+            if (nacosEventListener != null) {
+                nacosEventListener.addListener(listener);
+            } else {
+                try {
+                    nacosEventListener = new NacosEventListener();
+                    nacosEventListener.addListener(listener);
+                    namingService.subscribe(serviceName, group, nacosEventListener);
+                    eventListeners.put(serviceName, nacosEventListener);
+                } catch (NacosException e) {
+                    logger.error("add nacos service instances changed listener fail ", e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void removeServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws IllegalArgumentException {
+        if (!instanceListeners.remove(listener)) {
+            return;
+        }
+        for (String serviceName : listener.getServiceNames()) {
+            NacosEventListener nacosEventListener = eventListeners.get(serviceName);
+            if (nacosEventListener != null) {
+                nacosEventListener.removeListener(listener);
+                if (nacosEventListener.isEmpty()) {
+                    eventListeners.remove(serviceName);
+                    try {
+                        namingService.unsubscribe(serviceName, group, nacosEventListener);
+                    } catch (NacosException e) {
+                        logger.error("remove nacos service instances changed listener fail ", e);
                     }
-                });
-            } catch (NacosException e) {
-                logger.error("add nacos service instances changed listener fail ", e);
+                }
+            }
+        }
+    }
+
+    public class NacosEventListener implements EventListener {
+        private final Set<ServiceInstancesChangedListener> listeners = new ConcurrentHashSet<>();
+
+        @Override
+        public void onEvent(Event e) {
+            if (e instanceof NamingEvent) {
+                for (ServiceInstancesChangedListener listener : listeners) {
+                    NamingEvent event = (NamingEvent) e;
+                    handleEvent(event, listener);
+                }
             }
-        }));
+        }
+
+        public void addListener(ServiceInstancesChangedListener listener) {
+            listeners.add(listener);
+        }
+
+        public void removeListener(ServiceInstancesChangedListener listener) {
+            listeners.remove(listener);
+        }
+
+        public boolean isEmpty() {
+            return listeners.isEmpty();
+        }
     }
 
     @Override

Reply via email to