This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new 5f4f2c0c41 Fix recover method of ZookeeperServiceDiscovery (#10614)
5f4f2c0c41 is described below

commit 5f4f2c0c41f7f86ba9dc127791ae2302b501e828
Author: 灼华 <[email protected]>
AuthorDate: Thu Sep 15 14:29:22 2022 +0800

    Fix recover method of ZookeeperServiceDiscovery (#10614)
    
    * Fix recover method of ZookeeperServiceDiscovery
    
    * Fix ut
---
 .../dubbo/metadata/AbstractServiceNameMapping.java |  5 ++---
 .../apache/dubbo/metadata/MappingCacheManager.java |  7 ------
 .../registry/client/AbstractServiceDiscovery.java  |  6 ++++++
 .../dubbo/registry/support/AbstractRegistry.java   |  9 +++++++-
 .../support/CacheableFailbackRegistry.java         |  4 ++--
 .../registry/zookeeper/ZookeeperRegistry.java      | 25 ++++++++++++++++++++++
 .../zookeeper/ZookeeperServiceDiscovery.java       |  4 ++--
 .../ZookeeperServiceDiscoveryChangeWatcher.java    |  4 ++++
 8 files changed, 49 insertions(+), 15 deletions(-)

diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/AbstractServiceNameMapping.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/AbstractServiceNameMapping.java
index 534f00063a..dc4d1c7b6f 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/AbstractServiceNameMapping.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/AbstractServiceNameMapping.java
@@ -24,7 +24,6 @@ import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.config.ApplicationConfig;
 import org.apache.dubbo.rpc.model.ApplicationModel;
-import org.apache.dubbo.rpc.model.ScopeModelAware;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -50,7 +49,7 @@ import static org.apache.dubbo.common.utils.CollectionUtils.isEmpty;
 import static org.apache.dubbo.common.utils.CollectionUtils.toTreeSet;
 import static org.apache.dubbo.common.utils.StringUtils.isBlank;
 
-public abstract class AbstractServiceNameMapping implements ServiceNameMapping, ScopeModelAware {
+public abstract class AbstractServiceNameMapping implements ServiceNameMapping {
     protected final Logger logger = LoggerFactory.getLogger(getClass());
     protected ApplicationModel applicationModel;
     private final MappingCacheManager mappingCacheManager;
@@ -73,7 +72,7 @@ public abstract class AbstractServiceNameMapping implements ServiceNameMapping,
             .getBean(FrameworkExecutorRepository.class).getCacheRefreshingScheduledExecutor());
     }
 
-    @Override
+    // just for test
     public void setApplicationModel(ApplicationModel applicationModel) {
         this.applicationModel = applicationModel;
     }
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MappingCacheManager.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MappingCacheManager.java
index 90d44b3362..2365f6ce66 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MappingCacheManager.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MappingCacheManager.java
@@ -21,7 +21,6 @@ import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.rpc.model.ScopeModel;
 
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -66,10 +65,4 @@ public class MappingCacheManager extends AbstractCacheManager<Set<String>> {
     protected String getName() {
         return "mapping";
     }
-
-    public void update(Map<String, Set<String>> newCache) {
-        for (Map.Entry<String, Set<String>> entry : newCache.entrySet()) {
-            cache.put(entry.getKey(), entry.getValue());
-        }
-    }
 }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/AbstractServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/AbstractServiceDiscovery.java
index 4ede56efb4..0ccaa13043 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/AbstractServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/AbstractServiceDiscovery.java
@@ -91,6 +91,9 @@ public abstract class AbstractServiceDiscovery implements ServiceDiscovery {
 
     @Override
     public synchronized void register() throws RuntimeException {
+        if (isDestroy) {
+            return;
+        }
         this.serviceInstance = createServiceInstance(this.metadataInfo);
         if (!isValidInstance(this.serviceInstance)) {
             logger.warn("No valid instance found, stop registering instance address to registry.");
@@ -134,6 +137,9 @@ public abstract class AbstractServiceDiscovery implements ServiceDiscovery {
 
     @Override
     public synchronized void unregister() throws RuntimeException {
+        if (isDestroy) {
+            return;
+        }
         // fixme, this metadata info might still being shared by other instances
 //        unReportMetadata(this.metadataInfo);
         if (!isValidInstance(this.serviceInstance)) {
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java
index c65e519720..da86143533 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java
@@ -40,6 +40,7 @@ import java.nio.channels.OverlappingFileLockException;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -216,7 +217,7 @@ public abstract class AbstractRegistry implements Registry {
             }
 
             try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
-                 FileChannel channel = raf.getChannel()) {
+                FileChannel channel = raf.getChannel()) {
                 FileLock lock = channel.tryLock();
                 if (lock == null) {
 
@@ -321,6 +322,12 @@ public abstract class AbstractRegistry implements Registry {
     }
 
     public List<URL> getCacheUrls(URL url) {
+        Map<String, List<URL>> categoryNotified = notified.get(url);
+        if (CollectionUtils.isNotEmptyMap(categoryNotified)) {
+            List<URL> urls = categoryNotified.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
+            return urls;
+        }
+
         for (Map.Entry<Object, Object> entry : properties.entrySet()) {
             String key = (String) entry.getKey();
             String value = (String) entry.getValue();
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/CacheableFailbackRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/CacheableFailbackRegistry.java
index f747a3071f..0a98a118fc 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/CacheableFailbackRegistry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/CacheableFailbackRegistry.java
@@ -123,7 +123,7 @@ public abstract class CacheableFailbackRegistry extends FailbackRegistry {
     protected void evictURLCache(URL url) {
         Map<String, ServiceAddressURL> oldURLs = stringUrls.remove(url);
         try {
-            if (oldURLs != null && !oldURLs.isEmpty()) {
+            if (CollectionUtils.isNotEmptyMap(oldURLs)) {
                 logger.info("Evicting urls for service " + url.getServiceKey() + ", size " + oldURLs.size());
                 Long currentTimestamp = System.currentTimeMillis();
                 for (Map.Entry<String, ServiceAddressURL> entry : oldURLs.entrySet()) {
@@ -165,7 +165,7 @@ public abstract class CacheableFailbackRegistry extends FailbackRegistry {
 
         if (oldURLs == null) {
             for (String rawProvider : providers) {
-                // remove timestamp in provider url.
+                // remove VARIABLE_KEYS(timestamp,pid..) in provider url.
                 rawProvider = stripOffVariableKeys(rawProvider);
 
                 // create DubboServiceAddress object using provider url, consumer url, and extra parameters.
diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java
index 48b30056bd..d427cea98b 100644
--- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java
+++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java
@@ -33,6 +33,7 @@ import org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter;
 import org.apache.dubbo.rpc.RpcException;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -117,6 +118,29 @@ public class ZookeeperRegistry extends CacheableFailbackRegistry {
     @Override
     public void destroy() {
         super.destroy();
+
+        // remove child listener
+        Set<URL> urls = zkListeners.keySet();
+        for (URL url : urls) {
+            ConcurrentMap<NotifyListener, ChildListener> map = zkListeners.get(url);
+            if (CollectionUtils.isEmptyMap(map)) {
+                continue;
+            }
+            Collection<ChildListener> childListeners = map.values();
+            if (CollectionUtils.isEmpty(childListeners)) {
+                continue;
+            }
+            if (ANY_VALUE.equals(url.getServiceInterface())) {
+                String root = toRootPath();
+                childListeners.stream().forEach(childListener -> zkClient.removeChildListener(root, childListener));
+            } else {
+                for (String path : toCategoriesPath(url)) {
+                    childListeners.stream().forEach(childListener -> zkClient.removeChildListener(path, childListener));
+                }
+            }
+        }
+        zkListeners.clear();
+
         // Just release zkClient reference, but can not close zk client here for zk client is shared somewhere else.
         // See org.apache.dubbo.remoting.zookeeper.AbstractZookeeperTransporter#destroy()
         zkClient = null;
@@ -225,6 +249,7 @@ public class ZookeeperRegistry extends CacheableFailbackRegistry {
 
     @Override
     public void doUnsubscribe(URL url, NotifyListener listener) {
+        super.doUnsubscribe(url, listener);
         checkDestroyed();
         ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
         if (listeners != null) {
diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
index 11276f4815..9e2aea232e 100644
--- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
@@ -192,9 +192,9 @@ public class ZookeeperServiceDiscovery extends AbstractServiceDiscovery {
                 logger.error("Trying to recover from new zkClient session failed, path is " + path + ", error msg: " + e.getMessage());
             }
 
-            List<ServiceInstance> instances = this.getInstances(serviceName);
+            List<ServiceInstance> instances = this.getInstances(watcher.getServiceName());
             for (ServiceInstancesChangedListener listener : listeners) {
-                listener.onEvent(new ServiceInstancesChangedEvent(serviceName, instances));
+                listener.onEvent(new ServiceInstancesChangedEvent(watcher.getServiceName(), instances));
             }
             latch.countDown();
         });
diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
index 465d7dda90..681019c58a 100644
--- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
+++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
@@ -117,4 +117,8 @@ public class ZookeeperServiceDiscoveryChangeWatcher implements CuratorWatcher {
     public void setLatch(CountDownLatch latch) {
         this.latch = latch;
     }
+
+    public String getServiceName() {
+        return serviceName;
+    }
 }

Reply via email to