This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 02613e3 support service discovery unsubscribe and re-refer (#6692)
02613e3 is described below
commit 02613e35b14b517faa4ad88d284bba6b95db799f
Author: ken.lj <[email protected]>
AuthorDate: Thu Sep 3 17:59:27 2020 +0800
support service discovery unsubscribe and re-refer (#6692)
* remove not used classes.
* support service discovery unsubscribe and re-refer
---
.../rpc/protocol/thrift/ClassNameGenerator.java | 22 ----------------
.../org/apache/dubbo/metadata/MetadataInfo.java | 2 +-
.../dubbo/registry/client/ServiceDiscovery.java | 9 +++++++
.../registry/client/ServiceDiscoveryRegistry.java | 10 +++++++-
.../listener/ServiceInstancesChangedListener.java | 7 +++++
.../client/migration/MigrationClusterInvoker.java | 3 +++
.../client/migration/MigrationInvoker.java | 30 ++++++++++++++++++++++
.../client/migration/MigrationRuleListener.java | 8 +++---
.../registry/integration/RegistryProtocol.java | 19 +++++++-------
.../integration/RegistryProtocolListener.java | 4 +--
.../zookeeper/ZookeeperServiceDiscovery.java | 10 +++++++-
.../ZookeeperServiceDiscoveryChangeWatcher.java | 18 ++++++++++---
12 files changed, 98 insertions(+), 44 deletions(-)
diff --git a/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/protocol/thrift/ClassNameGenerator.java b/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/protocol/thrift/ClassNameGenerator.java
deleted file mode 100644
index dec29d3..0000000
--- a/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/protocol/thrift/ClassNameGenerator.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.dubbo.rpc.protocol.thrift;
-
-@Deprecated
-public interface ClassNameGenerator extends org.apache.dubbo.rpc.protocol.thrift.ClassNameGenerator {
-}
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
index 7ce4cfb..f2d20f5 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
@@ -178,7 +178,7 @@ public class MetadataInfo implements Serializable {
private String path; // most of the time, path is the same with the interface name.
private Map<String, String> params;
- // params configuried on consumer side,
+ // params configured on consumer side,
private transient Map<String, String> consumerParams;
// cached method params
private transient Map<String, Map<String, String>> methodParams;
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java
index 9800c35..3b6c4e5 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java
@@ -217,6 +217,15 @@ public interface ServiceDiscovery extends Prioritized {
}
/**
+ * unsubscribe to instances change event.
+ * @param listener
+ * @throws IllegalArgumentException
+ */
+ default void removeServiceInstancesChangedListener(ServiceInstancesChangedListener listener)
+ throws IllegalArgumentException {
+ }
+
+ /**
* Dispatch the {@link ServiceInstancesChangedEvent}
*
* @param serviceName the name of service whose service instances have been changed
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
index 1315c54..35f18ce 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
@@ -111,6 +111,7 @@ public class ServiceDiscoveryRegistry implements Registry {
/* apps - listener */
private final Map<String, ServiceInstancesChangedListener> serviceListeners = new HashMap<>();
+ private final Map<String, String> serviceToAppsMapping = new HashMap<>();
private URL registryURL;
@@ -280,6 +281,10 @@ public class ServiceDiscoveryRegistry implements Registry {
public void doUnsubscribe(URL url, NotifyListener listener) {
writableMetadataService.unsubscribeURL(url);
+ String protocolServiceKey = url.getServiceKey() + GROUP_CHAR_SEPARATOR + url.getParameter(PROTOCOL_KEY, DUBBO);
+ String serviceNamesKey = serviceToAppsMapping.remove(protocolServiceKey);
+ ServiceInstancesChangedListener instancesChangedListener = serviceListeners.get(serviceNamesKey);
+ instancesChangedListener.removeListener(protocolServiceKey);
}
@Override
@@ -308,6 +313,9 @@ public class ServiceDiscoveryRegistry implements Registry {
protected void subscribeURLs(URL url, NotifyListener listener, Set<String> serviceNames) {
String serviceNamesKey = serviceNames.toString();
+ String protocolServiceKey = url.getServiceKey() + GROUP_CHAR_SEPARATOR + url.getParameter(PROTOCOL_KEY, DUBBO);
+ serviceToAppsMapping.put(protocolServiceKey, serviceNamesKey);
+
// register ServiceInstancesChangedListener
ServiceInstancesChangedListener serviceListener = serviceListeners.computeIfAbsent(serviceNamesKey,
k -> new ServiceInstancesChangedListener(serviceNames, serviceDiscovery));
@@ -318,7 +326,6 @@ public class ServiceDiscoveryRegistry implements Registry {
List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
serviceListener.onEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances));
});
- String protocolServiceKey = url.getServiceKey() + GROUP_CHAR_SEPARATOR + url.getParameter(PROTOCOL_KEY, DUBBO);
listener.notify(serviceListener.getUrls(protocolServiceKey));
@@ -460,6 +467,7 @@ public class ServiceDiscoveryRegistry implements Registry {
return;
}
if (!CollectionUtils.equals(oldApps, newApps) && newApps.size() >= oldApps.size()) {
+ doUnsubscribe(url, listener);
subscribeURLs(url, listener, newApps);
}
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
index e99b40c..7a833e4 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
@@ -194,6 +194,13 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
this.listeners.put(serviceKey, listener);
}
+ public void removeListener(String serviceKey) {
+ listeners.remove(serviceKey);
+ if (listeners.isEmpty()) {
+ serviceDiscovery.removeServiceInstancesChangedListener(this);
+ }
+ }
+
public List<URL> getUrls(String serviceKey) {
return toUrlsWithEmpty(serviceUrls.get(serviceKey));
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationClusterInvoker.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationClusterInvoker.java
index 3bd6fd7..00c727f 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationClusterInvoker.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationClusterInvoker.java
@@ -16,6 +16,7 @@
*/
package org.apache.dubbo.registry.client.migration;
+import org.apache.dubbo.common.URL;
import org.apache.dubbo.registry.client.migration.model.MigrationStep;
import org.apache.dubbo.rpc.cluster.ClusterInvoker;
@@ -35,4 +36,6 @@ public interface MigrationClusterInvoker<T> extends ClusterInvoker<T> {
void fallbackToInterfaceInvoker();
void migrateToServiceDiscoveryInvoker(boolean forceMigrate);
+
+ void reRefer(URL newSubscribeUrl);
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
index 01c32dd..c6ba259 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.registry.client.migration;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.registry.Registry;
import org.apache.dubbo.registry.client.migration.model.MigrationStep;
import org.apache.dubbo.registry.integration.DynamicDirectory;
@@ -31,6 +32,8 @@ import org.apache.dubbo.rpc.cluster.Directory;
import java.util.Set;
+import static org.apache.dubbo.rpc.cluster.Constants.REFER_KEY;
+
public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
private URL url;
@@ -104,6 +107,33 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
}
@Override
+ public void reRefer(URL newSubscribeUrl) {
+ // update url to prepare for migration refresh
+ this.url = url.addParameter(REFER_KEY, StringUtils.toQueryString(newSubscribeUrl.getParameters()));
+
+ // re-subscribe immediately
+ if (invoker != null && !invoker.isDestroyed()) {
+ doReSubscribe(invoker, newSubscribeUrl);
+ }
+ if (serviceDiscoveryInvoker != null && !serviceDiscoveryInvoker.isDestroyed()) {
+ doReSubscribe(serviceDiscoveryInvoker, newSubscribeUrl);
+ }
+ }
+
+ private void doReSubscribe(ClusterInvoker<T> invoker, URL newSubscribeUrl) {
+ DynamicDirectory<T> directory = (DynamicDirectory<T>)invoker.getDirectory();
+ URL oldSubscribeUrl = directory.getRegisteredConsumerUrl();
+ Registry registry = directory.getRegistry();
+ registry.unregister(directory.getRegisteredConsumerUrl());
+ directory.unSubscribe(RegistryProtocol.toSubscribeUrl(oldSubscribeUrl));
+ registry.register(directory.getRegisteredConsumerUrl());
+
+ directory.setRegisteredConsumerUrl(newSubscribeUrl);
+ directory.buildRouterChain(newSubscribeUrl);
+ directory.subscribe(RegistryProtocol.toSubscribeUrl(newSubscribeUrl));
+ }
+
+ @Override
public synchronized void fallbackToInterfaceInvoker() {
refreshInterfaceInvoker();
destroyServiceDiscoveryInvoker();
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java
index 918d9aa..a300025 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java
@@ -28,7 +28,7 @@ import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.registry.integration.RegistryProtocol;
import org.apache.dubbo.registry.integration.RegistryProtocolListener;
import org.apache.dubbo.rpc.Exporter;
-import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.cluster.ClusterInvoker;
import org.apache.dubbo.rpc.model.ApplicationModel;
import java.util.Set;
@@ -77,10 +77,10 @@ public class MigrationRuleListener implements RegistryProtocolListener, Configur
}
@Override
- public synchronized <T> void onRefer(RegistryProtocol registryProtocol, Invoker<T> invoker) {
- MigrationInvoker<T> migrationInvoker = (MigrationInvoker<T>) invoker;
+ public synchronized void onRefer(RegistryProtocol registryProtocol, ClusterInvoker<?> invoker) {
+ MigrationInvoker<?> migrationInvoker = (MigrationInvoker<?>) invoker;
- MigrationRuleHandler<T> migrationListener = new MigrationRuleHandler<>(migrationInvoker);
+ MigrationRuleHandler<?> migrationListener = new MigrationRuleHandler<>(migrationInvoker);
listeners.add(migrationListener);
migrationListener.doMigrate(rawRule);
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
index 855b726..cf4a708 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
@@ -32,6 +32,7 @@ import org.apache.dubbo.registry.Registry;
import org.apache.dubbo.registry.RegistryFactory;
import org.apache.dubbo.registry.RegistryService;
import org.apache.dubbo.registry.client.ServiceDiscoveryRegistryDirectory;
+import org.apache.dubbo.registry.client.migration.MigrationClusterInvoker;
import org.apache.dubbo.registry.client.migration.ServiceDiscoveryMigrationInvoker;
import org.apache.dubbo.registry.retry.ReExportTask;
import org.apache.dubbo.registry.support.SkipFailbackWrapperException;
@@ -498,19 +499,17 @@ public class RegistryProtocol implements Protocol {
return (ClusterInvoker<T>) cluster.join(directory);
}
- public <T> void reRefer(DynamicDirectory<T> directory, URL newSubscribeUrl) {
- URL oldSubscribeUrl = directory.getRegisteredConsumerUrl();
- Registry registry = directory.getRegistry();
- registry.unregister(directory.getRegisteredConsumerUrl());
- directory.unSubscribe(toSubscribeUrl(oldSubscribeUrl));
- registry.register(directory.getRegisteredConsumerUrl());
+ public <T> void reRefer(ClusterInvoker<?> invoker, URL newSubscribeUrl) {
+ if (!(invoker instanceof MigrationClusterInvoker)) {
+ logger.error("Only invoker type of MigrationClusterInvoker supports reRefer, current invoker is " + invoker.getClass());
+ return;
+ }
- directory.setRegisteredConsumerUrl(newSubscribeUrl);
- directory.buildRouterChain(newSubscribeUrl);
- directory.subscribe(toSubscribeUrl(newSubscribeUrl));
+ MigrationClusterInvoker<?> migrationClusterInvoker = (MigrationClusterInvoker<?>)invoker;
+ migrationClusterInvoker.reRefer(newSubscribeUrl);
}
- protected static URL toSubscribeUrl(URL url) {
+ public static URL toSubscribeUrl(URL url) {
return url.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY);
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocolListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocolListener.java
index 50bdca1..41926b8 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocolListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocolListener.java
@@ -19,7 +19,7 @@ package org.apache.dubbo.registry.integration;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.SPI;
import org.apache.dubbo.rpc.Exporter;
-import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.cluster.ClusterInvoker;
/**
* RegistryProtocol listener is introduced to provide a chance to user to customize or change export and refer behavior
@@ -43,7 +43,7 @@ public interface RegistryProtocolListener {
* @param invoker invoker
* @see RegistryProtocol#refer(Class, URL)
*/
- <T> void onRefer(RegistryProtocol registryProtocol, Invoker<T> invoker);
+ void onRefer(RegistryProtocol registryProtocol, ClusterInvoker<?> invoker);
/**
* Notify RegistryProtocol's listeners when the protocol is destroyed
diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
index 593030d..d5f4f3d 100644
--- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
@@ -69,7 +69,7 @@ public class ZookeeperServiceDiscovery extends AbstractServiceDiscovery {
/**
* The Key is watched Zookeeper path, the value is an instance of {@link CuratorWatcher}
*/
- private final Map<String, CuratorWatcher> watcherCaches = new ConcurrentHashMap<>();
+ private final Map<String, ZookeeperServiceDiscoveryChangeWatcher> watcherCaches = new ConcurrentHashMap<>();
@Override
public void initialize(URL registryURL) throws Exception {
@@ -160,6 +160,14 @@ public class ZookeeperServiceDiscovery extends AbstractServiceDiscovery {
listener.getServiceNames().forEach(serviceName -> registerServiceWatcher(serviceName, listener));
}
+ @Override
+ public void removeServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws IllegalArgumentException {
+ listener.getServiceNames().forEach(serviceName -> {
+ ZookeeperServiceDiscoveryChangeWatcher watcher = watcherCaches.remove(serviceName);
+ watcher.stopWatching();
+ });
+ }
+
private void doInServiceRegistry(ThrowableConsumer<org.apache.curator.x.discovery.ServiceDiscovery> consumer) {
ThrowableConsumer.execute(serviceDiscovery, s -> {
consumer.accept(s);
diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
index 5ee429a..fa982d8 100644
--- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
+++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
@@ -39,6 +39,8 @@ public class ZookeeperServiceDiscoveryChangeWatcher implements CuratorWatcher {
private final ZookeeperServiceDiscovery zookeeperServiceDiscovery;
+ private boolean keepWatching;
+
private final String serviceName;
public ZookeeperServiceDiscoveryChangeWatcher(ZookeeperServiceDiscovery zookeeperServiceDiscovery,
@@ -55,9 +57,19 @@ public class ZookeeperServiceDiscoveryChangeWatcher implements CuratorWatcher {
Watcher.Event.EventType eventType = event.getType();
if (NodeChildrenChanged.equals(eventType) || NodeDataChanged.equals(eventType)) {
- listener.onEvent(new ServiceInstancesChangedEvent(serviceName, zookeeperServiceDiscovery.getInstances(serviceName)));
- zookeeperServiceDiscovery.registerServiceWatcher(serviceName, listener);
- zookeeperServiceDiscovery.dispatchServiceInstancesChangedEvent(serviceName);
+ if (shouldKeepWatching()) {
+ listener.onEvent(new ServiceInstancesChangedEvent(serviceName, zookeeperServiceDiscovery.getInstances(serviceName)));
+ zookeeperServiceDiscovery.registerServiceWatcher(serviceName, listener);
+ zookeeperServiceDiscovery.dispatchServiceInstancesChangedEvent(serviceName);
+ }
}
}
+
+ public boolean shouldKeepWatching() {
+ return keepWatching;
+ }
+
+ public void stopWatching() {
+ this.keepWatching = false;
+ }
}