This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.1 by this push:
new 5f4f2c0c41 Fix recover method of ZookeeperServiceDiscovery (#10614)
5f4f2c0c41 is described below
commit 5f4f2c0c41f7f86ba9dc127791ae2302b501e828
Author: 灼华 <[email protected]>
AuthorDate: Thu Sep 15 14:29:22 2022 +0800
Fix recover method of ZookeeperServiceDiscovery (#10614)
* Fix recover method of ZookeeperServiceDiscovery
* Fix ut
---
.../dubbo/metadata/AbstractServiceNameMapping.java | 5 ++---
.../apache/dubbo/metadata/MappingCacheManager.java | 7 ------
.../registry/client/AbstractServiceDiscovery.java | 6 ++++++
.../dubbo/registry/support/AbstractRegistry.java | 9 +++++++-
.../support/CacheableFailbackRegistry.java | 4 ++--
.../registry/zookeeper/ZookeeperRegistry.java | 25 ++++++++++++++++++++++
.../zookeeper/ZookeeperServiceDiscovery.java | 4 ++--
.../ZookeeperServiceDiscoveryChangeWatcher.java | 4 ++++
8 files changed, 49 insertions(+), 15 deletions(-)
diff --git
a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/AbstractServiceNameMapping.java
b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/AbstractServiceNameMapping.java
index 534f00063a..dc4d1c7b6f 100644
---
a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/AbstractServiceNameMapping.java
+++
b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/AbstractServiceNameMapping.java
@@ -24,7 +24,6 @@ import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.rpc.model.ApplicationModel;
-import org.apache.dubbo.rpc.model.ScopeModelAware;
import java.util.Collections;
import java.util.HashMap;
@@ -50,7 +49,7 @@ import static
org.apache.dubbo.common.utils.CollectionUtils.isEmpty;
import static org.apache.dubbo.common.utils.CollectionUtils.toTreeSet;
import static org.apache.dubbo.common.utils.StringUtils.isBlank;
-public abstract class AbstractServiceNameMapping implements
ServiceNameMapping, ScopeModelAware {
+public abstract class AbstractServiceNameMapping implements ServiceNameMapping
{
protected final Logger logger = LoggerFactory.getLogger(getClass());
protected ApplicationModel applicationModel;
private final MappingCacheManager mappingCacheManager;
@@ -73,7 +72,7 @@ public abstract class AbstractServiceNameMapping implements
ServiceNameMapping,
.getBean(FrameworkExecutorRepository.class).getCacheRefreshingScheduledExecutor());
}
- @Override
+ // just for test
public void setApplicationModel(ApplicationModel applicationModel) {
this.applicationModel = applicationModel;
}
diff --git
a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MappingCacheManager.java
b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MappingCacheManager.java
index 90d44b3362..2365f6ce66 100644
---
a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MappingCacheManager.java
+++
b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MappingCacheManager.java
@@ -21,7 +21,6 @@ import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.model.ScopeModel;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
@@ -66,10 +65,4 @@ public class MappingCacheManager extends
AbstractCacheManager<Set<String>> {
protected String getName() {
return "mapping";
}
-
- public void update(Map<String, Set<String>> newCache) {
- for (Map.Entry<String, Set<String>> entry : newCache.entrySet()) {
- cache.put(entry.getKey(), entry.getValue());
- }
- }
}
diff --git
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/AbstractServiceDiscovery.java
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/AbstractServiceDiscovery.java
index 4ede56efb4..0ccaa13043 100644
---
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/AbstractServiceDiscovery.java
+++
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/AbstractServiceDiscovery.java
@@ -91,6 +91,9 @@ public abstract class AbstractServiceDiscovery implements
ServiceDiscovery {
@Override
public synchronized void register() throws RuntimeException {
+ if (isDestroy) {
+ return;
+ }
this.serviceInstance = createServiceInstance(this.metadataInfo);
if (!isValidInstance(this.serviceInstance)) {
logger.warn("No valid instance found, stop registering instance
address to registry.");
@@ -134,6 +137,9 @@ public abstract class AbstractServiceDiscovery implements
ServiceDiscovery {
@Override
public synchronized void unregister() throws RuntimeException {
+ if (isDestroy) {
+ return;
+ }
// fixme, this metadata info might still being shared by other
instances
// unReportMetadata(this.metadataInfo);
if (!isValidInstance(this.serviceInstance)) {
diff --git
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java
index c65e519720..da86143533 100644
---
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java
+++
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistry.java
@@ -40,6 +40,7 @@ import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -216,7 +217,7 @@ public abstract class AbstractRegistry implements Registry {
}
try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
- FileChannel channel = raf.getChannel()) {
+ FileChannel channel = raf.getChannel()) {
FileLock lock = channel.tryLock();
if (lock == null) {
@@ -321,6 +322,12 @@ public abstract class AbstractRegistry implements Registry
{
}
public List<URL> getCacheUrls(URL url) {
+ Map<String, List<URL>> categoryNotified = notified.get(url);
+ if (CollectionUtils.isNotEmptyMap(categoryNotified)) {
+ List<URL> urls =
categoryNotified.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
+ return urls;
+ }
+
for (Map.Entry<Object, Object> entry : properties.entrySet()) {
String key = (String) entry.getKey();
String value = (String) entry.getValue();
diff --git
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/CacheableFailbackRegistry.java
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/CacheableFailbackRegistry.java
index f747a3071f..0a98a118fc 100644
---
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/CacheableFailbackRegistry.java
+++
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/CacheableFailbackRegistry.java
@@ -123,7 +123,7 @@ public abstract class CacheableFailbackRegistry extends
FailbackRegistry {
protected void evictURLCache(URL url) {
Map<String, ServiceAddressURL> oldURLs = stringUrls.remove(url);
try {
- if (oldURLs != null && !oldURLs.isEmpty()) {
+ if (CollectionUtils.isNotEmptyMap(oldURLs)) {
logger.info("Evicting urls for service " + url.getServiceKey()
+ ", size " + oldURLs.size());
Long currentTimestamp = System.currentTimeMillis();
for (Map.Entry<String, ServiceAddressURL> entry :
oldURLs.entrySet()) {
@@ -165,7 +165,7 @@ public abstract class CacheableFailbackRegistry extends
FailbackRegistry {
if (oldURLs == null) {
for (String rawProvider : providers) {
- // remove timestamp in provider url.
+ // remove VARIABLE_KEYS(timestamp,pid..) in provider url.
rawProvider = stripOffVariableKeys(rawProvider);
// create DubboServiceAddress object using provider url,
consumer url, and extra parameters.
diff --git
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java
index 48b30056bd..d427cea98b 100644
---
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java
+++
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java
@@ -33,6 +33,7 @@ import
org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter;
import org.apache.dubbo.rpc.RpcException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -117,6 +118,29 @@ public class ZookeeperRegistry extends
CacheableFailbackRegistry {
@Override
public void destroy() {
super.destroy();
+
+ // remove child listener
+ Set<URL> urls = zkListeners.keySet();
+ for (URL url : urls) {
+ ConcurrentMap<NotifyListener, ChildListener> map =
zkListeners.get(url);
+ if (CollectionUtils.isEmptyMap(map)) {
+ continue;
+ }
+ Collection<ChildListener> childListeners = map.values();
+ if (CollectionUtils.isEmpty(childListeners)) {
+ continue;
+ }
+ if (ANY_VALUE.equals(url.getServiceInterface())) {
+ String root = toRootPath();
+ childListeners.stream().forEach(childListener ->
zkClient.removeChildListener(root, childListener));
+ } else {
+ for (String path : toCategoriesPath(url)) {
+ childListeners.stream().forEach(childListener ->
zkClient.removeChildListener(path, childListener));
+ }
+ }
+ }
+ zkListeners.clear();
+
// Just release zkClient reference, but can not close zk client here
for zk client is shared somewhere else.
// See
org.apache.dubbo.remoting.zookeeper.AbstractZookeeperTransporter#destroy()
zkClient = null;
@@ -225,6 +249,7 @@ public class ZookeeperRegistry extends
CacheableFailbackRegistry {
@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
+ super.doUnsubscribe(url, listener);
checkDestroyed();
ConcurrentMap<NotifyListener, ChildListener> listeners =
zkListeners.get(url);
if (listeners != null) {
diff --git
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
index 11276f4815..9e2aea232e 100644
---
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
+++
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
@@ -192,9 +192,9 @@ public class ZookeeperServiceDiscovery extends
AbstractServiceDiscovery {
logger.error("Trying to recover from new zkClient session
failed, path is " + path + ", error msg: " + e.getMessage());
}
- List<ServiceInstance> instances = this.getInstances(serviceName);
+ List<ServiceInstance> instances =
this.getInstances(watcher.getServiceName());
for (ServiceInstancesChangedListener listener : listeners) {
- listener.onEvent(new ServiceInstancesChangedEvent(serviceName,
instances));
+ listener.onEvent(new
ServiceInstancesChangedEvent(watcher.getServiceName(), instances));
}
latch.countDown();
});
diff --git
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
index 465d7dda90..681019c58a 100644
---
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
+++
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
@@ -117,4 +117,8 @@ public class ZookeeperServiceDiscoveryChangeWatcher
implements CuratorWatcher {
public void setLatch(CountDownLatch latch) {
this.latch = latch;
}
+
+ public String getServiceName() {
+ return serviceName;
+ }
}