This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.1 by this push:
new f03f374397 Enhance Zookeeper ServiceDiscovery update (#11220)
f03f374397 is described below
commit f03f374397b9710d16c9d5fd3842ab06cbd944ce
Author: Albumen Kevin <[email protected]>
AuthorDate: Fri Dec 30 15:52:26 2022 +0800
Enhance Zookeeper ServiceDiscovery update (#11220)
---
.github/workflows/build-and-test-pr.yml | 4 +-
.../zookeeper/ZookeeperServiceDiscovery.java | 81 ++++++----------------
.../ZookeeperServiceDiscoveryChangeWatcher.java | 67 +++++++-----------
.../zookeeper/util/CuratorFrameworkUtils.java | 59 ----------------
4 files changed, 49 insertions(+), 162 deletions(-)
diff --git a/.github/workflows/build-and-test-pr.yml
b/.github/workflows/build-and-test-pr.yml
index 446b1e6193..d01b0f0ece 100644
--- a/.github/workflows/build-and-test-pr.yml
+++ b/.github/workflows/build-and-test-pr.yml
@@ -198,12 +198,12 @@ jobs:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
source ${{ github.workspace }}/.tmp/decrypted-sonarcloud-token
- ./mvnw --batch-mode --no-snapshot-updates -e --no-transfer-progress
--fail-fast clean test verify
org.sonarsource.scanner.maven:sonar-maven-plugin:sonar -Pjacoco
-Dsonar.host.url=https://sonarcloud.io -Dsonar.organization=apache
-Dsonar.projectKey=apache_dubbo
-Dmaven.wagon.httpconnectionManager.ttlSeconds=120
-Dmaven.wagon.http.retryHandler.count=5 -DskipTests=false
-DskipIntegrationTests=false -Dcheckstyle.skip=false
-Dcheckstyle_unix.skip=false -Drat.skip=false -Dmaven.javad [...]
+ ./mvnw --batch-mode --no-snapshot-updates -e --no-transfer-progress
--fail-fast clean test verify
org.sonarsource.scanner.maven:sonar-maven-plugin:sonar -Pjacoco
-Dsonar.host.url=https://sonarcloud.io -Dsonar.organization=apache
-Dsonar.projectKey=apache_dubbo -DtrimStackTrace=false
-Dmaven.wagon.httpconnectionManager.ttlSeconds=120
-Dmaven.wagon.http.retryHandler.count=5 -DskipTests=false
-DskipIntegrationTests=false -Dcheckstyle.skip=false
-Dcheckstyle_unix.skip=false -Drat.s [...]
- name: "Test with Maven without SonarCloud Scan"
if: ${{ github.repository != 'apache/dubbo' }}
timeout-minutes: 70
run: |
- ./mvnw --batch-mode --no-snapshot-updates -e --no-transfer-progress
--fail-fast clean test verify
-Dmaven.wagon.httpconnectionManager.ttlSeconds=120 -Pjacoco
-Dmaven.wagon.http.retryHandler.count=5 -DskipTests=false
-DskipIntegrationTests=false -Dcheckstyle.skip=false
-Dcheckstyle_unix.skip=false -Drat.skip=false -Dmaven.javadoc.skip=true
-DembeddedZookeeperPath=${{ github.workspace }}/.tmp/zookeeper
+ ./mvnw --batch-mode --no-snapshot-updates -e --no-transfer-progress
--fail-fast clean test verify
-Dmaven.wagon.httpconnectionManager.ttlSeconds=120 -Pjacoco
-DtrimStackTrace=false -Dmaven.wagon.http.retryHandler.count=5
-DskipTests=false -DskipIntegrationTests=false -Dcheckstyle.skip=false
-Dcheckstyle_unix.skip=false -Drat.skip=false -Dmaven.javadoc.skip=true
-DembeddedZookeeperPath=${{ github.workspace }}/.tmp/zookeeper
- name: "Upload coverage to Codecov"
uses: codecov/codecov-action@v3
diff --git
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
index f0f2a6fc98..ef7b6daafd 100644
---
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
+++
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
@@ -31,8 +31,9 @@ import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.zookeeper.KeeperException;
+import org.apache.curator.x.discovery.ServiceCache;
+import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -144,13 +145,17 @@ public class ZookeeperServiceDiscovery extends
AbstractServiceDiscovery {
@Override
public void
removeServiceInstancesChangedListener(ServiceInstancesChangedListener listener)
throws IllegalArgumentException {
listener.getServiceNames().forEach(serviceName -> {
- String servicePath = buildServicePath(serviceName);
- ZookeeperServiceDiscoveryChangeWatcher watcher =
watcherCaches.get(servicePath);
+ ZookeeperServiceDiscoveryChangeWatcher watcher =
watcherCaches.get(serviceName);
if (watcher != null) {
watcher.getListeners().remove(listener);
if (watcher.getListeners().isEmpty()) {
- watcher.stopWatching();
- watcherCaches.remove(servicePath);
+ watcherCaches.remove(serviceName);
+ try {
+ watcher.getCacheInstance().close();
+ } catch (IOException e) {
+ logger.error(REGISTRY_ZOOKEEPER_EXCEPTION, "curator
stop watch failed", "",
+ "Curator Stop service discovery watch failed.
Service Name: " + serviceName);
+ }
}
}
});
@@ -166,66 +171,26 @@ public class ZookeeperServiceDiscovery extends
AbstractServiceDiscovery {
}
protected void registerServiceWatcher(String serviceName,
ServiceInstancesChangedListener listener) {
- String path = buildServicePath(serviceName);
- try {
- curatorFramework.create().creatingParentsIfNeeded().forPath(path);
- } catch (KeeperException.NodeExistsException e) {
- // ignored
- if (logger.isDebugEnabled()) {
- logger.debug(e);
- }
- } catch (Exception e) {
- throw new IllegalStateException("registerServiceWatcher create
path=" + path + " fail.", e);
- }
-
CountDownLatch latch = new CountDownLatch(1);
- ZookeeperServiceDiscoveryChangeWatcher watcher =
watcherCaches.computeIfAbsent(path, key -> {
- ZookeeperServiceDiscoveryChangeWatcher tmpWatcher = new
ZookeeperServiceDiscoveryChangeWatcher(this, serviceName, path, latch);
- try {
-
curatorFramework.getChildren().usingWatcher(tmpWatcher).forPath(path);
- } catch (KeeperException.NoNodeException e) {
- // ignored
- if (logger.isErrorEnabled()) {
- logger.error(REGISTRY_ZOOKEEPER_EXCEPTION, "", "",
e.getMessage());
- }
- } catch (Exception e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- return tmpWatcher;
- });
- watcher.addListener(listener);
- listener.onEvent(new ServiceInstancesChangedEvent(serviceName,
this.getInstances(serviceName)));
- latch.countDown();
- }
- /**
- * 1. re-register, taken care by curator ServiceDiscovery
- * 2. re-subscribe, register curator watcher and notify the latest
provider list
- */
- public void recover() {
- watcherCaches.forEach((path, watcher) -> {
- CountDownLatch latch = new CountDownLatch(1);
- Set<ServiceInstancesChangedListener> listeners =
watcher.getListeners();
+ ZookeeperServiceDiscoveryChangeWatcher watcher =
watcherCaches.computeIfAbsent(serviceName, name -> {
+ ServiceCache<ZookeeperInstance> serviceCache =
serviceDiscovery.serviceCacheBuilder()
+ .name(name)
+ .build();
+ ZookeeperServiceDiscoveryChangeWatcher newer = new
ZookeeperServiceDiscoveryChangeWatcher(this, serviceCache, name, latch);
+ serviceCache.addListener(newer);
+
try {
- watcher.setLatch(latch);
-
curatorFramework.getChildren().usingWatcher(watcher).forPath(path);
+ serviceCache.start();
} catch (Exception e) {
- logger.error(REGISTRY_ZOOKEEPER_EXCEPTION, "", "", "Trying to
recover from new zkClient session failed, path is " + path + ", error msg: " +
e.getMessage());
+ throw new RpcException(REGISTRY_EXCEPTION, "Failed subscribe
service: " + name, e);
}
- List<ServiceInstance> instances =
this.getInstances(watcher.getServiceName());
- for (ServiceInstancesChangedListener listener : listeners) {
- listener.onEvent(new
ServiceInstancesChangedEvent(watcher.getServiceName(), instances));
- }
- latch.countDown();
+ return newer;
});
- }
-
- public void reRegisterWatcher(ZookeeperServiceDiscoveryChangeWatcher
watcher) throws Exception {
-
curatorFramework.getChildren().usingWatcher(watcher).forPath(watcher.getPath());
- }
- private String buildServicePath(String serviceName) {
- return rootPath + "/" + serviceName;
+ watcher.addListener(listener);
+ listener.onEvent(new ServiceInstancesChangedEvent(serviceName,
this.getInstances(serviceName)));
+ latch.countDown();
}
}
diff --git
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
index 681019c58a..f4e5168431 100644
---
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
+++
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
@@ -25,17 +25,17 @@ import
org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
import
org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
import org.apache.dubbo.rpc.model.ScopeModelUtil;
+import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorWatcher;
-import org.apache.zookeeper.WatchedEvent;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.x.discovery.ServiceCache;
+import org.apache.curator.x.discovery.details.ServiceCacheListener;
import org.apache.zookeeper.Watcher;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
-import static org.apache.zookeeper.Watcher.Event.EventType.NodeChildrenChanged;
-import static org.apache.zookeeper.Watcher.Event.EventType.NodeDataChanged;
-
/**
* Zookeeper {@link ServiceDiscovery} Change {@link CuratorWatcher watcher}
only interests in
* {@link Watcher.Event.EventType#NodeChildrenChanged} and {@link
Watcher.Event.EventType#NodeDataChanged} event types,
@@ -43,29 +43,26 @@ import static
org.apache.zookeeper.Watcher.Event.EventType.NodeDataChanged;
*
* @since 2.7.5
*/
-public class ZookeeperServiceDiscoveryChangeWatcher implements CuratorWatcher {
- private Set<ServiceInstancesChangedListener> listeners = new
ConcurrentHashSet<>();
+public class ZookeeperServiceDiscoveryChangeWatcher implements
ServiceCacheListener {
+ private final Set<ServiceInstancesChangedListener> listeners = new
ConcurrentHashSet<>();
+
+ private final ServiceCache<ZookeeperInstance> cacheInstance;
private final ZookeeperServiceDiscovery zookeeperServiceDiscovery;
private final RegistryNotifier notifier;
- private volatile boolean keepWatching = true;
-
private final String serviceName;
- private final String path;
-
- private CountDownLatch latch;
+ private final CountDownLatch latch;
public ZookeeperServiceDiscoveryChangeWatcher(ZookeeperServiceDiscovery
zookeeperServiceDiscovery,
+
ServiceCache<ZookeeperInstance> cacheInstance,
String serviceName,
- String path,
CountDownLatch latch) {
this.zookeeperServiceDiscovery = zookeeperServiceDiscovery;
+ this.cacheInstance = cacheInstance;
this.serviceName = serviceName;
- this.path = path;
- this.latch = latch;
this.notifier = new
RegistryNotifier(zookeeperServiceDiscovery.getUrl(),
zookeeperServiceDiscovery.getDelay(),
ScopeModelUtil.getFrameworkModel(zookeeperServiceDiscovery.getUrl().getScopeModel()).getBeanFactory()
.getBean(FrameworkExecutorRepository.class).getServiceDiscoveryAddressNotificationExecutor())
{
@@ -74,51 +71,35 @@ public class ZookeeperServiceDiscoveryChangeWatcher
implements CuratorWatcher {
listeners.forEach(listener ->
listener.onEvent((ServiceInstancesChangedEvent) rawAddresses));
}
};
+ this.latch = latch;
}
@Override
- public void process(WatchedEvent event) throws Exception {
+ public void cacheChanged() {
try {
latch.await();
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
+ Thread.currentThread().interrupt();
}
- Watcher.Event.EventType eventType = event.getType();
-
- if (NodeChildrenChanged.equals(eventType) ||
NodeDataChanged.equals(eventType)) {
- if (shouldKeepWatching()) {
- zookeeperServiceDiscovery.reRegisterWatcher(this);
- List<ServiceInstance> instanceList =
zookeeperServiceDiscovery.getInstances(serviceName);
- notifier.notify(new ServiceInstancesChangedEvent(serviceName,
instanceList));
- }
- }
+ List<ServiceInstance> instanceList =
zookeeperServiceDiscovery.getInstances(serviceName);
+ notifier.notify(new ServiceInstancesChangedEvent(serviceName,
instanceList));
}
- public String getPath() {
- return path;
+ @Override
+ public void stateChanged(CuratorFramework curatorFramework,
ConnectionState connectionState) {
+ // ignore: taken care by curator ServiceDiscovery
}
- public void addListener(ServiceInstancesChangedListener listener) {
- listeners.add(listener);
+ public ServiceCache<ZookeeperInstance> getCacheInstance() {
+ return cacheInstance;
}
public Set<ServiceInstancesChangedListener> getListeners() {
return listeners;
}
- public boolean shouldKeepWatching() {
- return keepWatching;
- }
-
- public void stopWatching() {
- this.keepWatching = false;
- }
-
- public void setLatch(CountDownLatch latch) {
- this.latch = latch;
- }
-
- public String getServiceName() {
- return serviceName;
+ public void addListener(ServiceInstancesChangedListener listener) {
+ listeners.add(listener);
}
}
diff --git
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java
index 93fe405931..178757b5b9 100644
---
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java
+++
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/util/CuratorFrameworkUtils.java
@@ -17,8 +17,6 @@
package org.apache.dubbo.registry.zookeeper.util;
import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
-import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.registry.client.DefaultServiceInstance;
import org.apache.dubbo.registry.client.ServiceInstance;
@@ -31,8 +29,6 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
@@ -47,9 +43,6 @@ import java.util.stream.Collectors;
import static org.apache.curator.x.discovery.ServiceInstance.builder;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_SEPARATOR;
-import static org.apache.dubbo.common.constants.CommonConstants.SESSION_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
-import static
org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_ZOOKEEPER_EXCEPTION;
import static
org.apache.dubbo.registry.zookeeper.ZookeeperServiceDiscovery.DEFAULT_GROUP;
import static
org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.BASE_SLEEP_TIME;
import static
org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.BLOCK_UNTIL_CONNECTED_UNIT;
@@ -95,8 +88,6 @@ public abstract class CuratorFrameworkUtils {
}
CuratorFramework curatorFramework = builder.build();
- curatorFramework.getConnectionStateListenable().addListener(new
CuratorConnectionStateListener(connectionURL, serviceDiscovery));
-
curatorFramework.start();
curatorFramework.blockUntilConnected(BLOCK_UNTIL_CONNECTED_WAIT.getParameterValue(connectionURL),
BLOCK_UNTIL_CONNECTED_UNIT.getParameterValue(connectionURL));
@@ -169,54 +160,4 @@ public abstract class CuratorFrameworkUtils {
}
return group;
}
-
- private static class CuratorConnectionStateListener implements
ConnectionStateListener {
- private static final ErrorTypeAwareLogger logger =
LoggerFactory.getErrorTypeAwareLogger(CuratorConnectionStateListener.class);
- private final long UNKNOWN_SESSION_ID = -1L;
- protected final int DEFAULT_CONNECTION_TIMEOUT_MS = 30 * 1000;
- protected final int DEFAULT_SESSION_TIMEOUT_MS = 60 * 1000;
-
- private long lastSessionId;
- private final int timeout;
- private final int sessionExpireMs;
- private final ZookeeperServiceDiscovery serviceDiscovery;
-
- public CuratorConnectionStateListener(URL url,
ZookeeperServiceDiscovery serviceDiscovery) {
- this.timeout = url.getParameter(TIMEOUT_KEY,
DEFAULT_CONNECTION_TIMEOUT_MS);
- this.sessionExpireMs = url.getParameter(SESSION_KEY,
DEFAULT_SESSION_TIMEOUT_MS);
- this.serviceDiscovery = serviceDiscovery;
- }
-
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState
state) {
- long sessionId = UNKNOWN_SESSION_ID;
- try {
- sessionId =
client.getZookeeperClient().getZooKeeper().getSessionId();
- } catch (Exception e) {
- logger.warn(REGISTRY_ZOOKEEPER_EXCEPTION, "", "", "Curator
client state changed, but failed to get the related zk session instance.");
- }
-
- if (state == ConnectionState.LOST) {
- logger.warn(REGISTRY_ZOOKEEPER_EXCEPTION, "", "", "Curator
zookeeper session " + Long.toHexString(lastSessionId) + " expired.");
- } else if (state == ConnectionState.SUSPENDED) {
- logger.warn(REGISTRY_ZOOKEEPER_EXCEPTION, "", "", "Curator
zookeeper connection of session " + Long.toHexString(sessionId) + " timed out.
" +
- "connection timeout value is " + timeout + ", session
expire timeout value is " + sessionExpireMs);
- } else if (state == ConnectionState.CONNECTED) {
- lastSessionId = sessionId;
- logger.info("Curator zookeeper client instance initiated
successfully, session id is " + Long.toHexString(sessionId));
- } else if (state == ConnectionState.RECONNECTED) {
- if (lastSessionId == sessionId && sessionId !=
UNKNOWN_SESSION_ID) {
- logger.warn(REGISTRY_ZOOKEEPER_EXCEPTION, "", "", "Curator
zookeeper connection recovered from connection lose, " +
- "reuse the old session " +
Long.toHexString(sessionId));
- serviceDiscovery.recover();
- } else {
- logger.warn(REGISTRY_ZOOKEEPER_EXCEPTION, "", "", "New
session created after old session lost, " +
- "old session " + Long.toHexString(lastSessionId) + ",
new session " + Long.toHexString(sessionId));
- lastSessionId = sessionId;
- serviceDiscovery.recover();
- }
- }
- }
-
- }
}