This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/master by this push:
new 753b68e [ISSUE-#7125] Promise doSubscribe notify and childListen
notify synchronize. (#7126)
753b68e is described below
commit 753b68e9bb39fad3db01d56ee92c065229af2f5a
Author: 赵延 <[email protected]>
AuthorDate: Fri Mar 26 17:33:45 2021 +0800
[ISSUE-#7125] Promise doSubscribe notify and childListen notify
synchronize. (#7126)
fix #7125
---
.../registry/zookeeper/ZookeeperRegistry.java | 39 ++++++++++++++++++++--
1 file changed, 37 insertions(+), 2 deletions(-)
diff --git
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java
index 7bd2eaa..6e01fd6 100644
---
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java
+++
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java
@@ -38,6 +38,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
@@ -55,7 +56,6 @@ import static
org.apache.dubbo.common.constants.RegistryConstants.ROUTERS_CATEGO
/**
* ZookeeperRegistry
- *
*/
public class ZookeeperRegistry extends FailbackRegistry {
@@ -164,10 +164,14 @@ public class ZookeeperRegistry extends FailbackRegistry {
}
}
} else {
+ CountDownLatch latch = new CountDownLatch(1);
List<URL> urls = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners =
zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
- ChildListener zkListener =
listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) ->
ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath,
currentChilds)));
+ ChildListener zkListener =
listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, k,
latch));
+ if (zkListener instanceof RegistryChildListenerImpl) {
+ ((RegistryChildListenerImpl)
zkListener).setLatch(latch);
+ }
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path,
zkListener);
if (children != null) {
@@ -175,6 +179,8 @@ public class ZookeeperRegistry extends FailbackRegistry {
}
}
notify(url, listener, urls);
+ // tells the listener to run only after the sync notification of main thread finishes.
+ latch.countDown();
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to
zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
@@ -308,4 +314,33 @@ public class ZookeeperRegistry extends FailbackRegistry {
}
}
+    /**
+     * {@link ChildListener} that forwards child-node changes to a {@link NotifyListener},
+     * but only once the {@link CountDownLatch} it currently holds has reached zero.
+     *
+     * <p>The subscribing code installs a fresh latch (through the constructor or
+     * {@link #setLatch(CountDownLatch)}), performs its own synchronous notification and
+     * then counts the latch down; until that happens {@link #childChanged} blocks, so a
+     * change event cannot be delivered ahead of the initial notification.
+     */
+    private class RegistryChildListenerImpl implements ChildListener {
+
+        /** URL passed through to {@code notify} and {@code toUrlsWithEmpty}. */
+        private final URL url;
+
+        /** Listener that receives the converted child URLs. */
+        private final NotifyListener listener;
+
+        /**
+         * Gate awaited before every notification. Volatile because it is replaced via
+         * {@link #setLatch(CountDownLatch)} and read in {@link #childChanged}, which
+         * presumably run on different threads (subscriber vs. ZooKeeper event thread).
+         */
+        private volatile CountDownLatch latch;
+
+        /**
+         * @param url      URL the notifications are issued for
+         * @param listener listener to notify on child changes
+         * @param latch    latch that must reach zero before a change is forwarded
+         */
+        RegistryChildListenerImpl(URL url, NotifyListener listener, CountDownLatch latch) {
+            this.url = url;
+            this.listener = listener;
+            this.latch = latch;
+        }
+
+        /**
+         * Replaces the latch awaited by subsequent {@link #childChanged} calls.
+         *
+         * @param latch the new latch to wait on
+         */
+        void setLatch(CountDownLatch latch) {
+            this.latch = latch;
+        }
+
+        /**
+         * Waits for the current latch, then notifies the listener with the URLs derived
+         * from {@code children}.
+         *
+         * <p>If the wait is interrupted, the interrupt status is restored, a warning is
+         * logged and the notification is still delivered (best effort).
+         *
+         * @param path     path whose children changed
+         * @param children current child names under {@code path}
+         */
+        @Override
+        public void childChanged(String path, List<String> children) {
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                // Catching InterruptedException clears the interrupt flag; set it again
+                // so the thread's owner can still observe the interruption.
+                Thread.currentThread().interrupt();
+                logger.warn("Zookeeper children listener thread was interrupted unexpectedly, may cause race condition with the main thread.");
+            }
+            ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, path, children));
+        }
+    }
+
}