This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/master by this push:
     new 753b68e  [ISSUE-#7125]  Promise doSubscribe notify and childListen 
notify synchronize. (#7126)
753b68e is described below

commit 753b68e9bb39fad3db01d56ee92c065229af2f5a
Author: 赵延 <[email protected]>
AuthorDate: Fri Mar 26 17:33:45 2021 +0800

    [ISSUE-#7125]  Promise doSubscribe notify and childListen notify 
synchronize. (#7126)
    
    fix #7125
---
 .../registry/zookeeper/ZookeeperRegistry.java      | 39 ++++++++++++++++++++--
 1 file changed, 37 insertions(+), 2 deletions(-)

diff --git 
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java
 
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java
index 7bd2eaa..6e01fd6 100644
--- 
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java
+++ 
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java
@@ -38,6 +38,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
 
 import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
 import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
@@ -55,7 +56,6 @@ import static 
org.apache.dubbo.common.constants.RegistryConstants.ROUTERS_CATEGO
 
 /**
  * ZookeeperRegistry
- *
  */
 public class ZookeeperRegistry extends FailbackRegistry {
 
@@ -164,10 +164,14 @@ public class ZookeeperRegistry extends FailbackRegistry {
                     }
                 }
             } else {
+                CountDownLatch latch = new CountDownLatch(1);
                 List<URL> urls = new ArrayList<>();
                 for (String path : toCategoriesPath(url)) {
                     ConcurrentMap<NotifyListener, ChildListener> listeners = 
zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
-                    ChildListener zkListener = 
listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> 
ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, 
currentChilds)));
+                    ChildListener zkListener = 
listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, k, 
latch));
+                    if (zkListener instanceof RegistryChildListenerImpl) {
+                        ((RegistryChildListenerImpl) 
zkListener).setLatch(latch);
+                    }
                     zkClient.create(path, false);
                     List<String> children = zkClient.addChildListener(path, 
zkListener);
                     if (children != null) {
@@ -175,6 +179,8 @@ public class ZookeeperRegistry extends FailbackRegistry {
                     }
                 }
                 notify(url, listener, urls);
+                // tells the listener to run only after the sync notification 
of main thread finishes.
+                latch.countDown();
             }
         } catch (Throwable e) {
             throw new RpcException("Failed to subscribe " + url + " to 
zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
@@ -308,4 +314,33 @@ public class ZookeeperRegistry extends FailbackRegistry {
         }
     }
 
+    private class RegistryChildListenerImpl implements ChildListener {
+
+        private URL url;
+
+        private NotifyListener listener;
+
+        private volatile CountDownLatch latch;
+
+        RegistryChildListenerImpl(URL url, NotifyListener listener, 
CountDownLatch latch) {
+            this.url = url;
+            this.listener = listener;
+            this.latch = latch;
+        }
+
+        void setLatch(CountDownLatch latch) {
+            this.latch = latch;
+        }
+
+        @Override
+        public void childChanged(String path, List<String> children) {
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                logger.warn("Zookeeper children listener thread was 
interrupted unexpectedly, may cause race condition with the main thread.");
+            }
+            ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, 
path, children));
+        }
+    }
+
 }

Reply via email to