This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.1 by this push:
new df61381242 Fix nacos removeServiceInstancesChangedListener (#10440)
df61381242 is described below
commit df613812420d547e14d2295498d79f821aff6316
Author: Albumen Kevin <[email protected]>
AuthorDate: Thu Aug 11 11:55:36 2022 +0800
Fix nacos removeServiceInstancesChangedListener (#10440)
---
.../registry/nacos/NacosNamingServiceWrapper.java | 4 ++
.../registry/nacos/NacosServiceDiscovery.java | 76 +++++++++++++++++++---
2 files changed, 70 insertions(+), 10 deletions(-)
diff --git a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosNamingServiceWrapper.java b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosNamingServiceWrapper.java
index 95a109d496..de4dc69480 100644
--- a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosNamingServiceWrapper.java
+++ b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosNamingServiceWrapper.java
@@ -47,6 +47,10 @@ public class NacosNamingServiceWrapper {
namingService.subscribe(handleInnerSymbol(serviceName), group,
eventListener);
}
+ public void unsubscribe(String serviceName, String group, EventListener
eventListener) throws NacosException {
+ namingService.unsubscribe(handleInnerSymbol(serviceName), group,
eventListener);
+ }
+
public List<Instance> getAllInstances(String serviceName, String group)
throws NacosException {
return namingService.getAllInstances(handleInnerSymbol(serviceName),
group);
}
diff --git a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosServiceDiscovery.java b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosServiceDiscovery.java
index e170e25be8..1fa01d46d9 100644
--- a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosServiceDiscovery.java
@@ -21,6 +21,7 @@ import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.function.ThrowableFunction;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.registry.client.AbstractServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceInstance;
@@ -30,6 +31,8 @@ import org.apache.dubbo.registry.nacos.util.NacosNamingServiceUtils;
import org.apache.dubbo.rpc.model.ApplicationModel;
import com.alibaba.nacos.api.exception.NacosException;
+import com.alibaba.nacos.api.naming.listener.Event;
+import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ListView;
@@ -37,6 +40,7 @@ import com.alibaba.nacos.api.naming.pojo.ListView;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static com.alibaba.nacos.api.common.Constants.DEFAULT_GROUP;
@@ -61,6 +65,8 @@ public class NacosServiceDiscovery extends AbstractServiceDiscovery {
private static final String NACOS_SD_USE_DEFAULT_GROUP_KEY =
"dubbo.nacos-service-discovery.use-default-group";
+ private final ConcurrentHashMap<String, NacosEventListener> eventListeners
= new ConcurrentHashMap<>();
+
public NacosServiceDiscovery(ApplicationModel applicationModel, URL
registryURL) {
super(applicationModel, registryURL);
this.namingService = createNamingService(registryURL);
@@ -114,18 +120,68 @@ public class NacosServiceDiscovery extends AbstractServiceDiscovery {
if (!instanceListeners.add(listener)) {
return;
}
- execute(namingService, service ->
listener.getServiceNames().forEach(serviceName -> {
- try {
- service.subscribe(serviceName, group, e -> { // Register Nacos
EventListener
- if (e instanceof NamingEvent) {
- NamingEvent event = (NamingEvent) e;
- handleEvent(event, listener);
+ for (String serviceName : listener.getServiceNames()) {
+ NacosEventListener nacosEventListener =
eventListeners.get(serviceName);
+ if (nacosEventListener != null) {
+ nacosEventListener.addListener(listener);
+ } else {
+ try {
+ nacosEventListener = new NacosEventListener();
+ nacosEventListener.addListener(listener);
+ namingService.subscribe(serviceName, group,
nacosEventListener);
+ eventListeners.put(serviceName, nacosEventListener);
+ } catch (NacosException e) {
+ logger.error("add nacos service instances changed listener
fail ", e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void
removeServiceInstancesChangedListener(ServiceInstancesChangedListener listener)
throws IllegalArgumentException {
+ if (!instanceListeners.remove(listener)) {
+ return;
+ }
+ for (String serviceName : listener.getServiceNames()) {
+ NacosEventListener nacosEventListener =
eventListeners.get(serviceName);
+ if (nacosEventListener != null) {
+ nacosEventListener.removeListener(listener);
+ if (nacosEventListener.isEmpty()) {
+ eventListeners.remove(serviceName);
+ try {
+ namingService.unsubscribe(serviceName, group,
nacosEventListener);
+ } catch (NacosException e) {
+ logger.error("remove nacos service instances changed
listener fail ", e);
}
- });
- } catch (NacosException e) {
- logger.error("add nacos service instances changed listener
fail ", e);
+ }
+ }
+ }
+ }
+
+ public class NacosEventListener implements EventListener {
+ private final Set<ServiceInstancesChangedListener> listeners = new
ConcurrentHashSet<>();
+
+ @Override
+ public void onEvent(Event e) {
+ if (e instanceof NamingEvent) {
+ for (ServiceInstancesChangedListener listener : listeners) {
+ NamingEvent event = (NamingEvent) e;
+ handleEvent(event, listener);
+ }
}
- }));
+ }
+
+ public void addListener(ServiceInstancesChangedListener listener) {
+ listeners.add(listener);
+ }
+
+ public void removeListener(ServiceInstancesChangedListener listener) {
+ listeners.remove(listener);
+ }
+
+ public boolean isEmpty() {
+ return listeners.isEmpty();
+ }
}
@Override