This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new f03f374397 Enhance Zookeeper ServiceDiscovery update (#11220)
f03f374397 is described below

commit f03f374397b9710d16c9d5fd3842ab06cbd944ce
Author: Albumen Kevin <[email protected]>
AuthorDate: Fri Dec 30 15:52:26 2022 +0800

    Enhance Zookeeper ServiceDiscovery update (#11220)
---
 .github/workflows/build-and-test-pr.yml            |  4 +-
 .../zookeeper/ZookeeperServiceDiscovery.java       | 81 ++++++----------------
 .../ZookeeperServiceDiscoveryChangeWatcher.java    | 67 +++++++-----------
 .../zookeeper/util/CuratorFrameworkUtils.java      | 59 ----------------
 4 files changed, 49 insertions(+), 162 deletions(-)

diff --git a/.github/workflows/build-and-test-pr.yml 
b/.github/workflows/build-and-test-pr.yml
index 446b1e6193..d01b0f0ece 100644
--- a/.github/workflows/build-and-test-pr.yml
+++ b/.github/workflows/build-and-test-pr.yml
@@ -198,12 +198,12 @@ jobs:
           GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
         run: |
           source ${{ github.workspace }}/.tmp/decrypted-sonarcloud-token
-          ./mvnw --batch-mode --no-snapshot-updates -e --no-transfer-progress 
--fail-fast clean test verify 
org.sonarsource.scanner.maven:sonar-maven-plugin:sonar -Pjacoco 
-Dsonar.host.url=https://sonarcloud.io -Dsonar.organization=apache 
-Dsonar.projectKey=apache_dubbo 
-Dmaven.wagon.httpconnectionManager.ttlSeconds=120 
-Dmaven.wagon.http.retryHandler.count=5 -DskipTests=false 
-DskipIntegrationTests=false -Dcheckstyle.skip=false 
-Dcheckstyle_unix.skip=false -Drat.skip=false -Dmaven.javad [...]
+          ./mvnw --batch-mode --no-snapshot-updates -e --no-transfer-progress 
--fail-fast clean test verify 
org.sonarsource.scanner.maven:sonar-maven-plugin:sonar -Pjacoco 
-Dsonar.host.url=https://sonarcloud.io -Dsonar.organization=apache 
-Dsonar.projectKey=apache_dubbo -DtrimStackTrace=false 
-Dmaven.wagon.httpconnectionManager.ttlSeconds=120 
-Dmaven.wagon.http.retryHandler.count=5 -DskipTests=false 
-DskipIntegrationTests=false -Dcheckstyle.skip=false 
-Dcheckstyle_unix.skip=false -Drat.s [...]
       - name: "Test with Maven without SonarCloud Scan"
         if: ${{ github.repository != 'apache/dubbo' }}
         timeout-minutes: 70
         run: |
-          ./mvnw --batch-mode --no-snapshot-updates -e --no-transfer-progress 
--fail-fast clean test verify 
-Dmaven.wagon.httpconnectionManager.ttlSeconds=120 -Pjacoco 
-Dmaven.wagon.http.retryHandler.count=5 -DskipTests=false 
-DskipIntegrationTests=false -Dcheckstyle.skip=false 
-Dcheckstyle_unix.skip=false -Drat.skip=false -Dmaven.javadoc.skip=true 
-DembeddedZookeeperPath=${{ github.workspace }}/.tmp/zookeeper
+          ./mvnw --batch-mode --no-snapshot-updates -e --no-transfer-progress 
--fail-fast clean test verify 
-Dmaven.wagon.httpconnectionManager.ttlSeconds=120 -Pjacoco 
-DtrimStackTrace=false -Dmaven.wagon.http.retryHandler.count=5 
-DskipTests=false -DskipIntegrationTests=false -Dcheckstyle.skip=false 
-Dcheckstyle_unix.skip=false -Drat.skip=false -Dmaven.javadoc.skip=true 
-DembeddedZookeeperPath=${{ github.workspace }}/.tmp/zookeeper
       - name: "Upload coverage to Codecov"
         uses: codecov/codecov-action@v3
 
diff --git 
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
 
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
index f0f2a6fc98..ef7b6daafd 100644
--- 
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
+++ 
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
@@ -31,8 +31,9 @@ import org.apache.dubbo.rpc.model.ApplicationModel;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.zookeeper.KeeperException;
+import org.apache.curator.x.discovery.ServiceCache;
 
+import java.io.IOException;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -144,13 +145,17 @@ public class ZookeeperServiceDiscovery extends 
AbstractServiceDiscovery {
     @Override
     public void 
removeServiceInstancesChangedListener(ServiceInstancesChangedListener listener) 
throws IllegalArgumentException {
         listener.getServiceNames().forEach(serviceName -> {
-            String servicePath = buildServicePath(serviceName);
-            ZookeeperServiceDiscoveryChangeWatcher watcher = 
watcherCaches.get(servicePath);
+            ZookeeperServiceDiscoveryChangeWatcher watcher = 
watcherCaches.get(serviceName);
             if (watcher != null) {
                 watcher.getListeners().remove(listener);
                 if (watcher.getListeners().isEmpty()) {
-                    watcher.stopWatching();
-                    watcherCaches.remove(servicePath);
+                    watcherCaches.remove(serviceName);
+                    try {
+                        watcher.getCacheInstance().close();
+                    } catch (IOException e) {
+                        logger.error(REGISTRY_ZOOKEEPER_EXCEPTION, "curator 
stop watch failed", "",
+                            "Curator Stop service discovery watch failed. 
Service Name: " + serviceName);
+                    }
                 }
             }
         });
@@ -166,66 +171,26 @@ public class ZookeeperServiceDiscovery extends 
AbstractServiceDiscovery {
     }
 
     protected void registerServiceWatcher(String serviceName, 
ServiceInstancesChangedListener listener) {
-        String path = buildServicePath(serviceName);
-        try {
-            curatorFramework.create().creatingParentsIfNeeded().forPath(path);
-        } catch (KeeperException.NodeExistsException e) {
-            // ignored
-            if (logger.isDebugEnabled()) {
-                logger.debug(e);
-            }
-        } catch (Exception e) {
-            throw new IllegalStateException("registerServiceWatcher create 
path=" + path + " fail.", e);
-        }
-
         CountDownLatch latch = new CountDownLatch(1);
-        ZookeeperServiceDiscoveryChangeWatcher watcher = 
watcherCaches.computeIfAbsent(path, key -> {
-            ZookeeperServiceDiscoveryChangeWatcher tmpWatcher = new 
ZookeeperServiceDiscoveryChangeWatcher(this, serviceName, path, latch);
-            try {
-                
curatorFramework.getChildren().usingWatcher(tmpWatcher).forPath(path);
-            } catch (KeeperException.NoNodeException e) {
-                // ignored
-                if (logger.isErrorEnabled()) {
-                    logger.error(REGISTRY_ZOOKEEPER_EXCEPTION, "", "", 
e.getMessage());
-                }
-            } catch (Exception e) {
-                throw new IllegalStateException(e.getMessage(), e);
-            }
-            return tmpWatcher;
-        });
-        watcher.addListener(listener);
-        listener.onEvent(new ServiceInstancesChangedEvent(serviceName, 
this.getInstances(serviceName)));
-        latch.countDown();
-    }
 
-    /**
-     * 1. re-register, taken care by curator ServiceDiscovery
-     * 2. re-subscribe, register curator watcher and notify the latest 
provider list
-     */
-    public void recover() {
-        watcherCaches.forEach((path, watcher) -> {
-            CountDownLatch latch = new CountDownLatch(1);
-            Set<ServiceInstancesChangedListener> listeners = 
watcher.getListeners();
+        ZookeeperServiceDiscoveryChangeWatcher watcher = 
watcherCaches.computeIfAbsent(serviceName, name -> {
+            ServiceCache<ZookeeperInstance> serviceCache = 
serviceDiscovery.serviceCacheBuilder()
+                .name(name)
+                .build();
+            ZookeeperServiceDiscoveryChangeWatcher newer = new 
ZookeeperServiceDiscoveryChangeWatcher(this, serviceCache, name, latch);
+            serviceCache.addListener(newer);
+
             try {
-                watcher.setLatch(latch);
-                
curatorFramework.getChildren().usingWatcher(watcher).forPath(path);
+                serviceCache.start();
             } catch (Exception e) {
-                logger.error(REGISTRY_ZOOKEEPER_EXCEPTION, "", "", "Trying to 
recover from new zkClient session failed, path is " + path + ", error msg: " + 
e.getMessage());
+                throw new RpcException(REGISTRY_EXCEPTION, "Failed subscribe 
service: " + name, e);
             }
 
-            List<ServiceInstance> instances = 
this.getInstances(watcher.getServiceName());
-            for (ServiceInstancesChangedListener listener : listeners) {
-                listener.onEvent(new 
ServiceInstancesChangedEvent(watcher.getServiceName(), instances));
-            }
-            latch.countDown();
+            return newer;
         });
-    }
-
-    public void reRegisterWatcher(ZookeeperServiceDiscoveryChangeWatcher 
watcher) throws Exception {
-        
curatorFramework.getChildren().usingWatcher(watcher).forPath(watcher.getPath());
-    }
 
-    private String buildServicePath(String serviceName) {
-        return rootPath + "/" + serviceName;
+        watcher.addListener(listener);
+        listener.onEvent(new ServiceInstancesChangedEvent(serviceName, 
this.getInstances(serviceName)));
+        latch.countDown();
     }
 }
diff --git 
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
 
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
index 681019c58a..f4e5168431 100644
--- 
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
+++ 
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
@@ -25,17 +25,17 @@ import 
org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
 import 
org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
 import org.apache.dubbo.rpc.model.ScopeModelUtil;
 
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.zookeeper.WatchedEvent;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.x.discovery.ServiceCache;
+import org.apache.curator.x.discovery.details.ServiceCacheListener;
 import org.apache.zookeeper.Watcher;
 
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 
-import static org.apache.zookeeper.Watcher.Event.EventType.NodeChildrenChanged;
-import static org.apache.zookeeper.Watcher.Event.EventType.NodeDataChanged;
-
 /**
  * Zookeeper {@link ServiceDiscovery} Change {@link CuratorWatcher watcher} 
only interests in
  * {@link Watcher.Event.EventType#NodeChildrenChanged} and {@link 
Watcher.Event.EventType#NodeDataChanged} event types,
@@ -43,29 +43,26 @@ import static 
org.apache.zookeeper.Watcher.Event.EventType.NodeDataChanged;
  *
  * @since 2.7.5
  */
-public class ZookeeperServiceDiscoveryChangeWatcher implements CuratorWatcher {
-    private Set<ServiceInstancesChangedListener> listeners = new 
ConcurrentHashSet<>();
+public class ZookeeperServiceDiscoveryChangeWatcher implements 
ServiceCacheListener {
+    private final Set<ServiceInstancesChangedListener> listeners = new 
ConcurrentHashSet<>();
+
+    private final ServiceCache<ZookeeperInstance> cacheInstance;
 
     private final ZookeeperServiceDiscovery zookeeperServiceDiscovery;
 
     private final RegistryNotifier notifier;
 
-    private volatile boolean keepWatching = true;
-
     private final String serviceName;
 
-    private final String path;
-
-    private CountDownLatch latch;
+    private final CountDownLatch latch;
 
     public ZookeeperServiceDiscoveryChangeWatcher(ZookeeperServiceDiscovery 
zookeeperServiceDiscovery,
+                                                  
ServiceCache<ZookeeperInstance> cacheInstance,
                                                   String serviceName,
-                                                  String path,
                                                   CountDownLatch latch) {
         this.zookeeperServiceDiscovery = zookeeperServiceDiscovery;
+        this.cacheInstance = cacheInstance;
         this.serviceName = serviceName;
-        this.path = path;
-        this.latch = latch;
         this.notifier = new 
RegistryNotifier(zookeeperServiceDiscovery.getUrl(), 
zookeeperServiceDiscovery.getDelay(),
             
ScopeModelUtil.getFrameworkModel(zookeeperServiceDiscovery.getUrl().getScopeModel()).getBeanFactory()
                 
.getBean(FrameworkExecutorRepository.class).getServiceDiscoveryAddressNotificationExecutor())
 {
@@ -74,51 +71,35 @@ public class ZookeeperServiceDiscoveryChangeWatcher 
implements CuratorWatcher {
                 listeners.forEach(listener -> 
listener.onEvent((ServiceInstancesChangedEvent) rawAddresses));
             }
         };
+        this.latch = latch;
     }
 
     @Override
-    public void process(WatchedEvent event) throws Exception {
+    public void cacheChanged() {
         try {
             latch.await();
-        } catch (InterruptedException e) {
+        } catch (InterruptedException ignore) {
+            Thread.currentThread().interrupt();
         }
 
-        Watcher.Event.EventType eventType = event.getType();
-
-        if (NodeChildrenChanged.equals(eventType) || 
NodeDataChanged.equals(eventType)) {
-            if (shouldKeepWatching()) {
-                zookeeperServiceDiscovery.reRegisterWatcher(this);
-                List<ServiceInstance> instanceList = 
zookeeperServiceDiscovery.getInstances(serviceName);
-                notifier.notify(new ServiceInstancesChangedEvent(serviceName, 
instanceList));
-            }
-        }
+        List<ServiceInstance> instanceList = 
zookeeperServiceDiscovery.getInstances(serviceName);
+        notifier.notify(new ServiceInstancesChangedEvent(serviceName, 
instanceList));
     }
 
-    public String getPath() {
-        return path;
+    @Override
+    public void stateChanged(CuratorFramework curatorFramework, 
ConnectionState connectionState) {
+        // ignore: taken care by curator ServiceDiscovery
     }
 
-    public void addListener(ServiceInstancesChangedListener listener) {
-        listeners.add(listener);
+    public ServiceCache<ZookeeperInstance> getCacheInstance() {
+        return cacheInstance;
     }
 
     public Set<ServiceInstancesChangedListener> getListeners() {
         return listeners;
     }
 
-    public boolean shouldKeepWatching() {
-        return keepWatching;
-    }
-
-    public void stopWatching() {
-        this.keepWatching = false;
-    }
-
-    public void setLatch(CountDownLatch latch) {
-        this.latch = latch;
-    }
-
-    public String getServiceName() {
-        return serviceName;
+    public void addListener(ServiceInstancesChangedListener listener) {
+        listeners.add(listener);
     }
 }
diff --git 
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java
 
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java
index 93fe405931..178757b5b9 100644
--- 
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java
+++ 
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java
@@ -17,8 +17,6 @@
 package org.apache.dubbo.registry.zookeeper.util;
 
 import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
-import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.registry.client.DefaultServiceInstance;
 import org.apache.dubbo.registry.client.ServiceInstance;
@@ -31,8 +29,6 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.ACLProvider;
 import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.x.discovery.ServiceDiscovery;
 import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
@@ -47,9 +43,6 @@ import java.util.stream.Collectors;
 
 import static org.apache.curator.x.discovery.ServiceInstance.builder;
 import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
-import static org.apache.dubbo.common.constants.CommonConstants.SESSION_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
-import static 
org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_ZOOKEEPER_EXCEPTION;
 import static 
org.apache.dubbo.registry.zookeeper.ZookeeperServiceDiscovery.DEFAULT_GROUP;
 import static 
org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.BASE_SLEEP_TIME;
 import static 
org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.BLOCK_UNTIL_CONNECTED_UNIT;
@@ -95,8 +88,6 @@ public abstract class CuratorFrameworkUtils {
         }
         CuratorFramework curatorFramework = builder.build();
 
-        curatorFramework.getConnectionStateListenable().addListener(new 
CuratorConnectionStateListener(connectionURL, serviceDiscovery));
-
         curatorFramework.start();
         
curatorFramework.blockUntilConnected(BLOCK_UNTIL_CONNECTED_WAIT.getParameterValue(connectionURL),
             BLOCK_UNTIL_CONNECTED_UNIT.getParameterValue(connectionURL));
@@ -169,54 +160,4 @@ public abstract class CuratorFrameworkUtils {
         }
         return group;
     }
-
-    private static class CuratorConnectionStateListener implements 
ConnectionStateListener {
-        private static final ErrorTypeAwareLogger logger = 
LoggerFactory.getErrorTypeAwareLogger(CuratorConnectionStateListener.class);
-        private final long UNKNOWN_SESSION_ID = -1L;
-        protected final int DEFAULT_CONNECTION_TIMEOUT_MS = 30 * 1000;
-        protected final int DEFAULT_SESSION_TIMEOUT_MS = 60 * 1000;
-
-        private long lastSessionId;
-        private final int timeout;
-        private final int sessionExpireMs;
-        private final ZookeeperServiceDiscovery serviceDiscovery;
-
-        public CuratorConnectionStateListener(URL url, 
ZookeeperServiceDiscovery serviceDiscovery) {
-            this.timeout = url.getParameter(TIMEOUT_KEY, 
DEFAULT_CONNECTION_TIMEOUT_MS);
-            this.sessionExpireMs = url.getParameter(SESSION_KEY, 
DEFAULT_SESSION_TIMEOUT_MS);
-            this.serviceDiscovery = serviceDiscovery;
-        }
-
-        @Override
-        public void stateChanged(CuratorFramework client, ConnectionState 
state) {
-            long sessionId = UNKNOWN_SESSION_ID;
-            try {
-                sessionId = 
client.getZookeeperClient().getZooKeeper().getSessionId();
-            } catch (Exception e) {
-                logger.warn(REGISTRY_ZOOKEEPER_EXCEPTION, "", "", "Curator 
client state changed, but failed to get the related zk session instance.");
-            }
-
-            if (state == ConnectionState.LOST) {
-                logger.warn(REGISTRY_ZOOKEEPER_EXCEPTION, "", "", "Curator 
zookeeper session " + Long.toHexString(lastSessionId) + " expired.");
-            } else if (state == ConnectionState.SUSPENDED) {
-                logger.warn(REGISTRY_ZOOKEEPER_EXCEPTION, "", "", "Curator 
zookeeper connection of session " + Long.toHexString(sessionId) + " timed out. 
" +
-                    "connection timeout value is " + timeout + ", session 
expire timeout value is " + sessionExpireMs);
-            } else if (state == ConnectionState.CONNECTED) {
-                lastSessionId = sessionId;
-                logger.info("Curator zookeeper client instance initiated 
successfully, session id is " + Long.toHexString(sessionId));
-            } else if (state == ConnectionState.RECONNECTED) {
-                if (lastSessionId == sessionId && sessionId != 
UNKNOWN_SESSION_ID) {
-                    logger.warn(REGISTRY_ZOOKEEPER_EXCEPTION, "", "", "Curator 
zookeeper connection recovered from connection lose, " +
-                        "reuse the old session " + 
Long.toHexString(sessionId));
-                    serviceDiscovery.recover();
-                } else {
-                    logger.warn(REGISTRY_ZOOKEEPER_EXCEPTION, "", "", "New 
session created after old session lost, " +
-                        "old session " + Long.toHexString(lastSessionId) + ", 
new session " + Long.toHexString(sessionId));
-                    lastSessionId = sessionId;
-                    serviceDiscovery.recover();
-                }
-            }
-        }
-
-    }
 }

Reply via email to