This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 1ea8d2d762 fix #10052, zookeeper service discovery cannot recover from session loss. (#10134)
1ea8d2d762 is described below

commit 1ea8d2d762d920e78782dfe8a7be5e5b7a2a6237
Author: ken.lj <[email protected]>
AuthorDate: Wed Jun 15 16:05:40 2022 +0800

    fix #10052, zookeeper service discovery cannot recover from session loss. (#10134)
    
    * fix #10052, zookeeper service discovery cannot recover from session loss
    
    * fix ut
---
 .../zookeeper/ZookeeperServiceDiscovery.java       | 40 +++++++++--
 .../ZookeeperServiceDiscoveryChangeWatcher.java    |  8 +++
 .../zookeeper/util/CuratorFrameworkParams.java     |  7 +-
 .../zookeeper/util/CuratorFrameworkUtils.java      | 78 +++++++++++++++++++++-
 .../zookeeper/util/CuratorFrameworkUtilsTest.java  |  4 +-
 5 files changed, 126 insertions(+), 11 deletions(-)

diff --git 
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
 
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
index 6d2f4381da..11276f4815 100644
--- 
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
+++ 
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
@@ -41,25 +41,29 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
 import static org.apache.dubbo.common.function.ThrowableFunction.execute;
-import static 
org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.ROOT_PATH;
 import static 
org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkUtils.build;
 import static 
org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkUtils.buildCuratorFramework;
 import static 
org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkUtils.buildServiceDiscovery;
+import static 
org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkUtils.getRootPath;
 import static org.apache.dubbo.rpc.RpcException.REGISTRY_EXCEPTION;
 
 /**
  * Zookeeper {@link ServiceDiscovery} implementation based on
  * <a href="https://curator.apache.org/curator-x-discovery/index.html">Apache Curator X Discovery</a>
+ * <p>
+ * TODO, replace curator CuratorFramework with dubbo ZookeeperClient
  */
 public class ZookeeperServiceDiscovery extends AbstractServiceDiscovery {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    private CuratorFramework curatorFramework;
+    public static final String DEFAULT_GROUP = "/services";
 
-    private String rootPath;
+    private final CuratorFramework curatorFramework;
 
-    private org.apache.curator.x.discovery.ServiceDiscovery<ZookeeperInstance> 
serviceDiscovery;
+    private final String rootPath;
+
+    private final 
org.apache.curator.x.discovery.ServiceDiscovery<ZookeeperInstance> 
serviceDiscovery;
 
     /**
      * The Key is watched Zookeeper path, the value is an instance of {@link 
CuratorWatcher}
@@ -69,8 +73,8 @@ public class ZookeeperServiceDiscovery extends 
AbstractServiceDiscovery {
     public ZookeeperServiceDiscovery(ApplicationModel applicationModel, URL 
registryURL) {
         super(applicationModel, registryURL);
         try {
-            this.curatorFramework = buildCuratorFramework(registryURL);
-            this.rootPath = ROOT_PATH.getParameterValue(registryURL);
+            this.curatorFramework = buildCuratorFramework(registryURL, this);
+            this.rootPath = getRootPath(registryURL);
             this.serviceDiscovery = buildServiceDiscovery(curatorFramework, 
rootPath);
             this.serviceDiscovery.start();
         } catch (Exception e) {
@@ -82,6 +86,7 @@ public class ZookeeperServiceDiscovery extends 
AbstractServiceDiscovery {
     public void doDestroy() throws Exception {
         serviceDiscovery.close();
         curatorFramework.close();
+        watcherCaches.clear(); // drop cached watchers so a later recover() has nothing to re-subscribe; NOTE(review): skipped if either close() above throws
     }
 
     @Override
@@ -172,6 +177,29 @@ public class ZookeeperServiceDiscovery extends 
AbstractServiceDiscovery {
         latch.countDown();
     }
 
+    /** Re-arms every cached path watcher and re-notifies its listeners, e.g. after a zk RECONNECTED event.
+     * 1. re-register, taken care by curator ServiceDiscovery
+     * 2. re-subscribe, register curator watcher and notify the latest 
provider list
+     */
+    public void recover() {
+        watcherCaches.forEach((path, watcher) -> {
+            CountDownLatch latch = new CountDownLatch(1); // NOTE(review): presumably makes the watcher's event handling wait until the re-notification below finishes; confirm in ZookeeperServiceDiscoveryChangeWatcher
+            Set<ServiceInstancesChangedListener> listeners = 
watcher.getListeners();
+            try {
+                watcher.setLatch(latch); // set before the watcher is re-registered below
+                
curatorFramework.getChildren().usingWatcher(watcher).forPath(path); // re-register the watcher on its cached path
+            } catch (Exception e) { // NOTE(review): failure is only logged; the listeners below are still notified
+                logger.error("Trying to recover from new zkClient session 
failed, path is " + path + ", error msg: " + e.getMessage()); // NOTE(review): only the message is logged, the stack trace is lost
+            }
+
+            List<ServiceInstance> instances = this.getInstances(serviceName); // NOTE(review): serviceName is not derived from path/watcher; it resolves to a member outside this view. If it is this application's own name, every watcher is notified with the same, wrong instance list; confirm
+            for (ServiceInstancesChangedListener listener : listeners) {
+                listener.onEvent(new ServiceInstancesChangedEvent(serviceName, 
instances)); // same serviceName concern as the getInstances call above
+            }
+            latch.countDown(); // NOTE(review): not in a finally block; an exception from getInstances/onEvent leaves the latch unreleased
+        });
+    }
+
     public void reRegisterWatcher(ZookeeperServiceDiscoveryChangeWatcher 
watcher) throws Exception {
         
curatorFramework.getChildren().usingWatcher(watcher).forPath(watcher.getPath());
     }
diff --git 
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
 
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
index 79fdd79a53..465d7dda90 100644
--- 
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
+++ 
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
@@ -102,6 +102,10 @@ public class ZookeeperServiceDiscoveryChangeWatcher 
implements CuratorWatcher {
         listeners.add(listener);
     }
 
+    public Set<ServiceInstancesChangedListener> getListeners() { // read by ZookeeperServiceDiscovery.recover() to re-notify listeners
+        return listeners; // NOTE(review): returns the internal set itself, not a copy
+    }
+
     public boolean shouldKeepWatching() {
         return keepWatching;
     }
@@ -109,4 +113,8 @@ public class ZookeeperServiceDiscoveryChangeWatcher 
implements CuratorWatcher {
     public void stopWatching() {
         this.keepWatching = false;
     }
+
+    public void setLatch(CountDownLatch latch) { // lets ZookeeperServiceDiscovery.recover() install a fresh latch before re-registering this watcher
+        this.latch = latch; // NOTE(review): the field is declared outside this view; if it is read on the zk event thread, confirm it is volatile or otherwise safely published
+    }
 }
diff --git 
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkParams.java
 
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkParams.java
index 3e6ad64102..fd095a2ff6 100644
--- 
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkParams.java
+++ 
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkParams.java
@@ -24,6 +24,8 @@ import org.apache.curator.framework.CuratorFramework;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
+import static 
org.apache.dubbo.registry.zookeeper.ZookeeperServiceDiscovery.DEFAULT_GROUP;
+
 /**
  * The enumeration for the parameters  of {@link CuratorFramework}
  *
@@ -31,11 +33,12 @@ import java.util.function.Function;
  * @since 2.7.5
  */
 public enum CuratorFrameworkParams {
-
     /**
      * The root path of Dubbo Service
      */
-    ROOT_PATH("rootPath", "/services", value -> value),
+    ROOT_PATH("rootPath", DEFAULT_GROUP, value -> value),
+
+    GROUP_PATH("group", DEFAULT_GROUP, value -> value),
 
     /**
      * The host of current {@link ServiceInstance service instance} that will 
be registered
diff --git 
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java
 
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java
index 2fb3e5ef37..f3d43c64e5 100644
--- 
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java
+++ 
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java
@@ -17,15 +17,20 @@
 package org.apache.dubbo.registry.zookeeper.util;
 
 import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.registry.client.DefaultServiceInstance;
 import org.apache.dubbo.registry.client.ServiceInstance;
 import org.apache.dubbo.registry.zookeeper.ZookeeperInstance;
+import org.apache.dubbo.registry.zookeeper.ZookeeperServiceDiscovery;
 import org.apache.dubbo.rpc.model.ScopeModelUtil;
 
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.x.discovery.ServiceDiscovery;
 import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
@@ -37,11 +42,17 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.apache.curator.x.discovery.ServiceInstance.builder;
+import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
+import static org.apache.dubbo.common.constants.CommonConstants.SESSION_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+import static 
org.apache.dubbo.registry.zookeeper.ZookeeperServiceDiscovery.DEFAULT_GROUP;
 import static 
org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.BASE_SLEEP_TIME;
 import static 
org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.BLOCK_UNTIL_CONNECTED_UNIT;
 import static 
org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.BLOCK_UNTIL_CONNECTED_WAIT;
+import static 
org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.GROUP_PATH;
 import static 
org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.MAX_RETRIES;
 import static 
org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.MAX_SLEEP;
+import static 
org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.ROOT_PATH;
 
 /**
  * Curator Framework Utilities Class
@@ -58,11 +69,14 @@ public abstract class CuratorFrameworkUtils {
             .build();
     }
 
-    public static CuratorFramework buildCuratorFramework(URL connectionURL) 
throws Exception {
+    public static CuratorFramework buildCuratorFramework(URL connectionURL, 
ZookeeperServiceDiscovery serviceDiscovery) throws Exception {
         CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
             .connectString(connectionURL.getBackupAddress())
             .retryPolicy(buildRetryPolicy(connectionURL))
             .build();
+
+        curatorFramework.getConnectionStateListenable().addListener(new 
CuratorConnectionStateListener(connectionURL, serviceDiscovery));
+
         curatorFramework.start();
         
curatorFramework.blockUntilConnected(BLOCK_UNTIL_CONNECTED_WAIT.getParameterValue(connectionURL),
             BLOCK_UNTIL_CONNECTED_UNIT.getParameterValue(connectionURL));
@@ -74,6 +88,7 @@ public abstract class CuratorFrameworkUtils {
         if (!curatorFramework.getZookeeperClient().isConnected()) {
             throw new IllegalStateException("failed to connect to zookeeper 
server");
         }
+
         return curatorFramework;
     }
 
@@ -123,4 +138,65 @@ public abstract class CuratorFrameworkUtils {
     public static String generateId(String host, int port) {
         return host + ":" + port;
     }
+
+    public static String getRootPath(URL registryURL) { // zk root path: an explicit "rootPath" wins, otherwise the "group" parameter is used
+        String group = ROOT_PATH.getParameterValue(registryURL); // NOTE(review): a non-default rootPath is returned as-is, without the leading-separator normalisation applied to group below
+        if (group.equalsIgnoreCase(DEFAULT_GROUP)) { // rootPath still equals DEFAULT_GROUP "/services" (its enum default, or set explicitly), compared case-insensitively
+            group = GROUP_PATH.getParameterValue(registryURL); // GROUP_PATH also defaults to DEFAULT_GROUP
+            if (!group.startsWith(PATH_SEPARATOR)) { // PATH_SEPARATOR is presumably "/"; turns e.g. "dubbo" into "/dubbo"
+                group = PATH_SEPARATOR + group;
+            }
+        }
+        return group;
+    }
+
+    private static class CuratorConnectionStateListener implements 
ConnectionStateListener { // logs curator connection-state changes and calls ZookeeperServiceDiscovery.recover() on RECONNECTED
+        private static final Logger logger = 
LoggerFactory.getLogger(CuratorConnectionStateListener.class);
+        private final long UNKNOWN_SESSION_ID = -1L; // sentinel used when the zk session id cannot be read
+        protected final int DEFAULT_CONNECTION_TIMEOUT_MS = 30 * 1000; // fallback when the URL has no TIMEOUT_KEY parameter
+        protected final int DEFAULT_SESSION_TIMEOUT_MS = 60 * 1000; // fallback when the URL has no SESSION_KEY parameter; NOTE(review): these three constants could be static
+
+        private long lastSessionId; // updated on CONNECTED and on RECONNECTED with a different session id
+        private final int timeout; // only used in the SUSPENDED log message
+        private final int sessionExpireMs; // only used in the SUSPENDED log message
+        private final ZookeeperServiceDiscovery serviceDiscovery; // NOTE(review): may be null (CuratorFrameworkUtilsTest passes null); recover() below would then throw NPE on RECONNECTED
+
+        public CuratorConnectionStateListener(URL url, 
ZookeeperServiceDiscovery serviceDiscovery) {
+            this.timeout = url.getParameter(TIMEOUT_KEY, 
DEFAULT_CONNECTION_TIMEOUT_MS);
+            this.sessionExpireMs = url.getParameter(SESSION_KEY, 
DEFAULT_SESSION_TIMEOUT_MS);
+            this.serviceDiscovery = serviceDiscovery;
+        }
+
+        @Override
+        public void stateChanged(CuratorFramework client, ConnectionState 
state) {
+            long sessionId = UNKNOWN_SESSION_ID; // stays at the sentinel if the lookup below fails
+            try {
+                sessionId = 
client.getZookeeperClient().getZooKeeper().getSessionId();
+            } catch (Exception e) { // NOTE(review): e is neither logged nor rethrown
+                logger.warn("Curator client state changed, but failed to get 
the related zk session instance.");
+            }
+
+            if (state == ConnectionState.LOST) { // logs the last recorded session id, not the current one
+                logger.warn("Curator zookeeper session " + 
Long.toHexString(lastSessionId) + " expired.");
+            } else if (state == ConnectionState.SUSPENDED) {
+                logger.warn("Curator zookeeper connection of session " + 
Long.toHexString(sessionId) + " timed out. " +
+                    "connection timeout value is " + timeout + ", session 
expire timeout value is " + sessionExpireMs);
+            } else if (state == ConnectionState.CONNECTED) { // remember the session id of this connection
+                lastSessionId = sessionId;
+                logger.info("Curator zookeeper client instance initiated 
successfully, session id is " + Long.toHexString(sessionId));
+            } else if (state == ConnectionState.RECONNECTED) { // both branches below re-subscribe via serviceDiscovery.recover()
+                if (lastSessionId == sessionId && sessionId != 
UNKNOWN_SESSION_ID) { // the previously recorded session survived the disconnect
+                    logger.warn("Curator zookeeper connection recovered from 
connection lose, " +
+                        "reuse the old session " + 
Long.toHexString(sessionId));
+                    serviceDiscovery.recover();
+                } else { // a different session id, or it could not be read
+                    logger.warn("New session created after old session lost, " 
+
+                        "old session " + Long.toHexString(lastSessionId) + ", 
new session " + Long.toHexString(sessionId));
+                    lastSessionId = sessionId;
+                    serviceDiscovery.recover();
+                }
+            }
+        }
+
+    }
 }
diff --git 
a/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtilsTest.java
 
b/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtilsTest.java
index 04e2051580..01ed19db28 100644
--- 
a/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtilsTest.java
+++ 
b/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtilsTest.java
@@ -58,7 +58,7 @@ class CuratorFrameworkUtilsTest {
 
     @Test
     void testBuildCuratorFramework() throws Exception {
-        CuratorFramework curatorFramework = 
CuratorFrameworkUtils.buildCuratorFramework(registryUrl);
+        CuratorFramework curatorFramework = 
CuratorFrameworkUtils.buildCuratorFramework(registryUrl, null);
         Assertions.assertNotNull(curatorFramework);
         
Assertions.assertTrue(curatorFramework.getZookeeperClient().isConnected());
         curatorFramework.getZookeeperClient().close();
@@ -66,7 +66,7 @@ class CuratorFrameworkUtilsTest {
 
     @Test
     void testBuildServiceDiscovery() throws Exception {
-        CuratorFramework curatorFramework = 
CuratorFrameworkUtils.buildCuratorFramework(registryUrl);
+        CuratorFramework curatorFramework = 
CuratorFrameworkUtils.buildCuratorFramework(registryUrl, null);
         ServiceDiscovery<ZookeeperInstance> discovery = 
CuratorFrameworkUtils.buildServiceDiscovery(curatorFramework, 
ROOT_PATH.getParameterValue(registryUrl));
         Assertions.assertNotNull(discovery);
         discovery.close();

Reply via email to