This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 1ea8d2d762 fix #10052, zookeeper service discovery cannot recover from
session loss. (#10134)
1ea8d2d762 is described below
commit 1ea8d2d762d920e78782dfe8a7be5e5b7a2a6237
Author: ken.lj <[email protected]>
AuthorDate: Wed Jun 15 16:05:40 2022 +0800
fix #10052, zookeeper service discovery cannot recover from session loss.
(#10134)
* fix #10052, zookeeper service discovery cannot recover from session loss
* fix ut
---
.../zookeeper/ZookeeperServiceDiscovery.java | 40 +++++++++--
.../ZookeeperServiceDiscoveryChangeWatcher.java | 8 +++
.../zookeeper/util/CuratorFrameworkParams.java | 7 +-
.../zookeeper/util/CuratorFrameworkUtils.java | 78 +++++++++++++++++++++-
.../zookeeper/util/CuratorFrameworkUtilsTest.java | 4 +-
5 files changed, 126 insertions(+), 11 deletions(-)
diff --git
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
index 6d2f4381da..11276f4815 100644
---
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
+++
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
@@ -41,25 +41,29 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import static org.apache.dubbo.common.function.ThrowableFunction.execute;
-import static
org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.ROOT_PATH;
import static
org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkUtils.build;
import static
org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkUtils.buildCuratorFramework;
import static
org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkUtils.buildServiceDiscovery;
+import static
org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkUtils.getRootPath;
import static org.apache.dubbo.rpc.RpcException.REGISTRY_EXCEPTION;
/**
* Zookeeper {@link ServiceDiscovery} implementation based on
* <a href="https://curator.apache.org/curator-x-discovery/index.html">Apache
Curator X Discovery</a>
+ * <p>
+ * TODO, replace curator CuratorFramework with dubbo ZookeeperClient
*/
public class ZookeeperServiceDiscovery extends AbstractServiceDiscovery {
private final Logger logger = LoggerFactory.getLogger(getClass());
- private CuratorFramework curatorFramework;
+ public static final String DEFAULT_GROUP = "/services";
- private String rootPath;
+ private final CuratorFramework curatorFramework;
- private org.apache.curator.x.discovery.ServiceDiscovery<ZookeeperInstance>
serviceDiscovery;
+ private final String rootPath;
+
+ private final
org.apache.curator.x.discovery.ServiceDiscovery<ZookeeperInstance>
serviceDiscovery;
/**
* The Key is watched Zookeeper path, the value is an instance of {@link
CuratorWatcher}
@@ -69,8 +73,8 @@ public class ZookeeperServiceDiscovery extends
AbstractServiceDiscovery {
public ZookeeperServiceDiscovery(ApplicationModel applicationModel, URL
registryURL) {
super(applicationModel, registryURL);
try {
- this.curatorFramework = buildCuratorFramework(registryURL);
- this.rootPath = ROOT_PATH.getParameterValue(registryURL);
+ this.curatorFramework = buildCuratorFramework(registryURL, this);
+ this.rootPath = getRootPath(registryURL);
this.serviceDiscovery = buildServiceDiscovery(curatorFramework,
rootPath);
this.serviceDiscovery.start();
} catch (Exception e) {
@@ -82,6 +86,7 @@ public class ZookeeperServiceDiscovery extends
AbstractServiceDiscovery {
public void doDestroy() throws Exception {
serviceDiscovery.close();
curatorFramework.close();
+ watcherCaches.clear();
}
@Override
@@ -172,6 +177,29 @@ public class ZookeeperServiceDiscovery extends
AbstractServiceDiscovery {
latch.countDown();
}
+ /**
+ * 1. re-register, taken care by curator ServiceDiscovery
+ * 2. re-subscribe, register curator watcher and notify the latest
provider list
+ */
+ public void recover() {
+ watcherCaches.forEach((path, watcher) -> {
+ CountDownLatch latch = new CountDownLatch(1);
+ Set<ServiceInstancesChangedListener> listeners =
watcher.getListeners();
+ try {
+ watcher.setLatch(latch);
+
curatorFramework.getChildren().usingWatcher(watcher).forPath(path);
+ } catch (Exception e) {
+ logger.error("Trying to recover from new zkClient session
failed, path is " + path + ", error msg: " + e.getMessage());
+ }
+
+ List<ServiceInstance> instances = this.getInstances(serviceName);
+ for (ServiceInstancesChangedListener listener : listeners) {
+ listener.onEvent(new ServiceInstancesChangedEvent(serviceName,
instances));
+ }
+ latch.countDown();
+ });
+ }
+
public void reRegisterWatcher(ZookeeperServiceDiscoveryChangeWatcher
watcher) throws Exception {
curatorFramework.getChildren().usingWatcher(watcher).forPath(watcher.getPath());
}
diff --git
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
index 79fdd79a53..465d7dda90 100644
---
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
+++
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
@@ -102,6 +102,10 @@ public class ZookeeperServiceDiscoveryChangeWatcher
implements CuratorWatcher {
listeners.add(listener);
}
+ public Set<ServiceInstancesChangedListener> getListeners() {
+ return listeners;
+ }
+
public boolean shouldKeepWatching() {
return keepWatching;
}
@@ -109,4 +113,8 @@ public class ZookeeperServiceDiscoveryChangeWatcher
implements CuratorWatcher {
public void stopWatching() {
this.keepWatching = false;
}
+
+ public void setLatch(CountDownLatch latch) {
+ this.latch = latch;
+ }
}
diff --git
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkParams.java
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkParams.java
index 3e6ad64102..fd095a2ff6 100644
---
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkParams.java
+++
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkParams.java
@@ -24,6 +24,8 @@ import org.apache.curator.framework.CuratorFramework;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import static
org.apache.dubbo.registry.zookeeper.ZookeeperServiceDiscovery.DEFAULT_GROUP;
+
/**
* The enumeration for the parameters of {@link CuratorFramework}
*
@@ -31,11 +33,12 @@ import java.util.function.Function;
* @since 2.7.5
*/
public enum CuratorFrameworkParams {
-
/**
* The root path of Dubbo Service
*/
- ROOT_PATH("rootPath", "/services", value -> value),
+ ROOT_PATH("rootPath", DEFAULT_GROUP, value -> value),
+
+ GROUP_PATH("group", DEFAULT_GROUP, value -> value),
/**
* The host of current {@link ServiceInstance service instance} that will
be registered
diff --git
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java
index 2fb3e5ef37..f3d43c64e5 100644
---
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java
+++
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java
@@ -17,15 +17,20 @@
package org.apache.dubbo.registry.zookeeper.util;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.registry.client.DefaultServiceInstance;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.registry.zookeeper.ZookeeperInstance;
+import org.apache.dubbo.registry.zookeeper.ZookeeperServiceDiscovery;
import org.apache.dubbo.rpc.model.ScopeModelUtil;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
@@ -37,11 +42,17 @@ import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.curator.x.discovery.ServiceInstance.builder;
+import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
+import static org.apache.dubbo.common.constants.CommonConstants.SESSION_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+import static
org.apache.dubbo.registry.zookeeper.ZookeeperServiceDiscovery.DEFAULT_GROUP;
import static
org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.BASE_SLEEP_TIME;
import static
org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.BLOCK_UNTIL_CONNECTED_UNIT;
import static
org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.BLOCK_UNTIL_CONNECTED_WAIT;
+import static
org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.GROUP_PATH;
import static
org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.MAX_RETRIES;
import static
org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.MAX_SLEEP;
+import static
org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.ROOT_PATH;
/**
* Curator Framework Utilities Class
@@ -58,11 +69,14 @@ public abstract class CuratorFrameworkUtils {
.build();
}
- public static CuratorFramework buildCuratorFramework(URL connectionURL)
throws Exception {
+ public static CuratorFramework buildCuratorFramework(URL connectionURL,
ZookeeperServiceDiscovery serviceDiscovery) throws Exception {
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString(connectionURL.getBackupAddress())
.retryPolicy(buildRetryPolicy(connectionURL))
.build();
+
+ curatorFramework.getConnectionStateListenable().addListener(new
CuratorConnectionStateListener(connectionURL, serviceDiscovery));
+
curatorFramework.start();
curatorFramework.blockUntilConnected(BLOCK_UNTIL_CONNECTED_WAIT.getParameterValue(connectionURL),
BLOCK_UNTIL_CONNECTED_UNIT.getParameterValue(connectionURL));
@@ -74,6 +88,7 @@ public abstract class CuratorFrameworkUtils {
if (!curatorFramework.getZookeeperClient().isConnected()) {
throw new IllegalStateException("failed to connect to zookeeper
server");
}
+
return curatorFramework;
}
@@ -123,4 +138,65 @@ public abstract class CuratorFrameworkUtils {
public static String generateId(String host, int port) {
return host + ":" + port;
}
+
+ public static String getRootPath(URL registryURL) {
+ String group = ROOT_PATH.getParameterValue(registryURL);
+ if (group.equalsIgnoreCase(DEFAULT_GROUP)) {
+ group = GROUP_PATH.getParameterValue(registryURL);
+ if (!group.startsWith(PATH_SEPARATOR)) {
+ group = PATH_SEPARATOR + group;
+ }
+ }
+ return group;
+ }
+
+ private static class CuratorConnectionStateListener implements
ConnectionStateListener {
+ private static final Logger logger =
LoggerFactory.getLogger(CuratorConnectionStateListener.class);
+ private final long UNKNOWN_SESSION_ID = -1L;
+ protected final int DEFAULT_CONNECTION_TIMEOUT_MS = 30 * 1000;
+ protected final int DEFAULT_SESSION_TIMEOUT_MS = 60 * 1000;
+
+ private long lastSessionId;
+ private final int timeout;
+ private final int sessionExpireMs;
+ private final ZookeeperServiceDiscovery serviceDiscovery;
+
+ public CuratorConnectionStateListener(URL url,
ZookeeperServiceDiscovery serviceDiscovery) {
+ this.timeout = url.getParameter(TIMEOUT_KEY,
DEFAULT_CONNECTION_TIMEOUT_MS);
+ this.sessionExpireMs = url.getParameter(SESSION_KEY,
DEFAULT_SESSION_TIMEOUT_MS);
+ this.serviceDiscovery = serviceDiscovery;
+ }
+
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState
state) {
+ long sessionId = UNKNOWN_SESSION_ID;
+ try {
+ sessionId =
client.getZookeeperClient().getZooKeeper().getSessionId();
+ } catch (Exception e) {
+ logger.warn("Curator client state changed, but failed to get
the related zk session instance.");
+ }
+
+ if (state == ConnectionState.LOST) {
+ logger.warn("Curator zookeeper session " +
Long.toHexString(lastSessionId) + " expired.");
+ } else if (state == ConnectionState.SUSPENDED) {
+ logger.warn("Curator zookeeper connection of session " +
Long.toHexString(sessionId) + " timed out. " +
+ "connection timeout value is " + timeout + ", session
expire timeout value is " + sessionExpireMs);
+ } else if (state == ConnectionState.CONNECTED) {
+ lastSessionId = sessionId;
+ logger.info("Curator zookeeper client instance initiated
successfully, session id is " + Long.toHexString(sessionId));
+ } else if (state == ConnectionState.RECONNECTED) {
+ if (lastSessionId == sessionId && sessionId !=
UNKNOWN_SESSION_ID) {
+ logger.warn("Curator zookeeper connection recovered from
connection lose, " +
+ "reuse the old session " +
Long.toHexString(sessionId));
+ serviceDiscovery.recover();
+ } else {
+ logger.warn("New session created after old session lost, "
+
+ "old session " + Long.toHexString(lastSessionId) + ",
new session " + Long.toHexString(sessionId));
+ lastSessionId = sessionId;
+ serviceDiscovery.recover();
+ }
+ }
+ }
+
+ }
}
diff --git
a/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtilsTest.java
b/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtilsTest.java
index 04e2051580..01ed19db28 100644
---
a/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtilsTest.java
+++
b/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtilsTest.java
@@ -58,7 +58,7 @@ class CuratorFrameworkUtilsTest {
@Test
void testBuildCuratorFramework() throws Exception {
- CuratorFramework curatorFramework =
CuratorFrameworkUtils.buildCuratorFramework(registryUrl);
+ CuratorFramework curatorFramework =
CuratorFrameworkUtils.buildCuratorFramework(registryUrl, null);
Assertions.assertNotNull(curatorFramework);
Assertions.assertTrue(curatorFramework.getZookeeperClient().isConnected());
curatorFramework.getZookeeperClient().close();
@@ -66,7 +66,7 @@ class CuratorFrameworkUtilsTest {
@Test
void testBuildServiceDiscovery() throws Exception {
- CuratorFramework curatorFramework =
CuratorFrameworkUtils.buildCuratorFramework(registryUrl);
+ CuratorFramework curatorFramework =
CuratorFrameworkUtils.buildCuratorFramework(registryUrl, null);
ServiceDiscovery<ZookeeperInstance> discovery =
CuratorFrameworkUtils.buildServiceDiscovery(curatorFramework,
ROOT_PATH.getParameterValue(registryUrl));
Assertions.assertNotNull(discovery);
discovery.close();