This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/master by this push:
new 01ccaf4 Nacos unsub support and RegistryDirectory destroy (#8907)
01ccaf4 is described below
commit 01ccaf46c0d0ea02395f5a7bffe443292ba06ae3
Author: Owen.Cai <[email protected]>
AuthorDate: Tue Sep 28 17:50:45 2021 +0800
Nacos unsub support and RegistryDirectory destroy (#8907)
* nacos_unsub_support & destroy RegistryDirectory
* delete idea import use *
* nacos_unsub_support & destroy RegistryDirectory(#8895)
* delete idea import use *
* change description for code
---
.../support/wrapper/MockClusterInvoker.java | 5 ++
.../registry/integration/DynamicDirectory.java | 3 +-
.../registry/nacos/NacosNamingServiceWrapper.java | 4 ++
.../apache/dubbo/registry/nacos/NacosRegistry.java | 64 +++++++++++++++++-----
4 files changed, 61 insertions(+), 15 deletions(-)
diff --git
a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvoker.java
b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvoker.java
index 51d0a49..46bab30 100644
---
a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvoker.java
+++
b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvoker.java
@@ -75,6 +75,11 @@ public class MockClusterInvoker<T> implements
ClusterInvoker<T> {
@Override
public void destroy() {
+ //directory need destroy, because do not have directory manager, so
who get the directory need destroy
+ //directory need support destroy multi times
+ //other ClusterInvoker maybe also have directory, so it also destroy
+ this.directory.destroy();
+
this.invoker.destroy();
}
diff --git
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
index 84bb67a..f5b64ff 100644
---
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
+++
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
@@ -238,7 +238,8 @@ public abstract class DynamicDirectory<T> extends
AbstractDirectory<T> implement
// unsubscribe.
try {
if (getSubscribeConsumerurl() != null && registry != null &&
registry.isAvailable()) {
- registry.unsubscribe(getSubscribeConsumerurl(), this);
+ //overwrite by child, so need call function
+ unSubscribe(getSubscribeConsumerurl());
}
} catch (Throwable t) {
logger.warn("unexpected error when unsubscribe service " +
serviceKey + "from registry" + registry.getUrl(), t);
diff --git
a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosNamingServiceWrapper.java
b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosNamingServiceWrapper.java
index 6aa2bdf..18c301b 100644
---
a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosNamingServiceWrapper.java
+++
b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosNamingServiceWrapper.java
@@ -49,6 +49,10 @@ public class NacosNamingServiceWrapper {
namingService.subscribe(handleInnerSymbol(serviceName), group,
eventListener);
}
+ public void unsubscribe(String serviceName, String group, EventListener
eventListener) throws NacosException {
+ namingService.unsubscribe(handleInnerSymbol(serviceName), group,
eventListener);
+ }
+
public List<Instance> getAllInstances(String serviceName, String group)
throws NacosException {
return namingService.getAllInstances(handleInnerSymbol(serviceName),
group);
}
diff --git
a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java
b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java
index a235013..9987986 100644
---
a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java
+++
b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java
@@ -48,6 +48,8 @@ import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.Collections;
@@ -122,6 +124,8 @@ public class NacosRegistry extends FailbackRegistry {
private final NacosNamingServiceWrapper namingService;
+ private final ConcurrentMap<URL, ConcurrentMap<NotifyListener,
EventListener>> nacosListeners = new ConcurrentHashMap<>();
+
public NacosRegistry(URL url, NacosNamingServiceWrapper namingService) {
super(url);
this.namingService = namingService;
@@ -240,6 +244,19 @@ public class NacosRegistry extends FailbackRegistry {
if (isAdminProtocol(url)) {
shutdownServiceNamesLookup();
}
+ else {
+ Set<String> serviceNames = getServiceNames(url, listener);
+
+ doUnsubscribe(url, listener, serviceNames);
+ }
+ }
+
+ private void doUnsubscribe(final URL url, final NotifyListener listener,
final Set<String> serviceNames) {
+ execute(namingService -> {
+ for (String serviceName : serviceNames) {
+ unsubscribeEventListener(serviceName, url, listener);
+ }
+ });
}
private void shutdownServiceNamesLookup() {
@@ -495,26 +512,45 @@ public class NacosRegistry extends FailbackRegistry {
private void subscribeEventListener(String serviceName, final URL url,
final NotifyListener listener)
throws NacosException {
- EventListener eventListener = event -> {
- if (event instanceof NamingEvent) {
- NamingEvent e = (NamingEvent) event;
- List<Instance> instances = e.getInstances();
+ ConcurrentMap<NotifyListener, EventListener> listeners =
nacosListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
+ EventListener nacosListener = listeners.computeIfAbsent(listener, k ->
{
+ EventListener eventListener = event -> {
+ if (event instanceof NamingEvent) {
+ NamingEvent e = (NamingEvent) event;
+ List<Instance> instances = e.getInstances();
- if (isServiceNamesWithCompatibleMode(url)) {
+ if (isServiceNamesWithCompatibleMode(url)) {
- // Get all instances with corresponding serviceNames to
avoid instance overwrite and but with empty instance mentioned
- // in https://github.com/apache/dubbo/issues/5885 and
https://github.com/apache/dubbo/issues/5899
-
NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName,
instances);
- instances =
NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(serviceName);
- }
+ // Get all instances with corresponding serviceNames
to avoid instance overwrite and but with empty instance mentioned
+ // in https://github.com/apache/dubbo/issues/5885 and
https://github.com/apache/dubbo/issues/5899
+
NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName,
instances);
+ instances =
NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(serviceName);
+ }
- notifySubscriber(url, listener, instances);
- }
- };
+ notifySubscriber(url, listener, instances);
+ }
+ };
+ return eventListener;
+ });
namingService.subscribe(serviceName,
getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP),
- eventListener);
+ nacosListener);
+ }
+
+ private void unsubscribeEventListener(String serviceName, final URL url,
final NotifyListener listener)
+ throws NacosException {
+ ConcurrentMap<NotifyListener, EventListener>
notifyListenerEventListenerConcurrentMap = nacosListeners.get(url);
+ if(notifyListenerEventListenerConcurrentMap == null){
+ return;
+ }
+ EventListener nacosListener =
notifyListenerEventListenerConcurrentMap.get(listener);
+ if(nacosListener == null){
+ return;
+ }
+ namingService.unsubscribe(serviceName,
+ getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP),
+ nacosListener);
}
/**