This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 02613e3  support service discovery unsubscribe and re-refer (#6692)
02613e3 is described below

commit 02613e35b14b517faa4ad88d284bba6b95db799f
Author: ken.lj <[email protected]>
AuthorDate: Thu Sep 3 17:59:27 2020 +0800

    support service discovery unsubscribe and re-refer (#6692)
    
    * remove unused classes.
    * support service discovery unsubscribe and re-refer
---
 .../rpc/protocol/thrift/ClassNameGenerator.java    | 22 ----------------
 .../org/apache/dubbo/metadata/MetadataInfo.java    |  2 +-
 .../dubbo/registry/client/ServiceDiscovery.java    |  9 +++++++
 .../registry/client/ServiceDiscoveryRegistry.java  | 10 +++++++-
 .../listener/ServiceInstancesChangedListener.java  |  7 +++++
 .../client/migration/MigrationClusterInvoker.java  |  3 +++
 .../client/migration/MigrationInvoker.java         | 30 ++++++++++++++++++++++
 .../client/migration/MigrationRuleListener.java    |  8 +++---
 .../registry/integration/RegistryProtocol.java     | 19 +++++++-------
 .../integration/RegistryProtocolListener.java      |  4 +--
 .../zookeeper/ZookeeperServiceDiscovery.java       | 10 +++++++-
 .../ZookeeperServiceDiscoveryChangeWatcher.java    | 18 ++++++++++---
 12 files changed, 98 insertions(+), 44 deletions(-)

diff --git a/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/protocol/thrift/ClassNameGenerator.java b/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/protocol/thrift/ClassNameGenerator.java
deleted file mode 100644
index dec29d3..0000000
--- a/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/protocol/thrift/ClassNameGenerator.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.dubbo.rpc.protocol.thrift;
-
-@Deprecated
-public interface ClassNameGenerator extends org.apache.dubbo.rpc.protocol.thrift.ClassNameGenerator {
-}
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
index 7ce4cfb..f2d20f5 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
@@ -178,7 +178,7 @@ public class MetadataInfo implements Serializable {
         private String path; // most of the time, path is the same with the interface name.
         private Map<String, String> params;
 
-        // params configuried on consumer side,
+        // params configured on consumer side,
         private transient Map<String, String> consumerParams;
         // cached method params
         private transient Map<String, Map<String, String>> methodParams;
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java
index 9800c35..3b6c4e5 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java
@@ -217,6 +217,15 @@ public interface ServiceDiscovery extends Prioritized {
     }
 
     /**
+     * unsubscribe to instances change event.
+     * @param listener
+     * @throws IllegalArgumentException
+     */
+    default void removeServiceInstancesChangedListener(ServiceInstancesChangedListener listener)
+            throws IllegalArgumentException {
+    }
+
+    /**
      * Dispatch the {@link ServiceInstancesChangedEvent}
      *
      * @param serviceName the name of service whose service instances have been changed
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
index 1315c54..35f18ce 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
@@ -111,6 +111,7 @@ public class ServiceDiscoveryRegistry implements Registry {
 
     /* apps - listener */
     private final Map<String, ServiceInstancesChangedListener> serviceListeners = new HashMap<>();
+    private final Map<String, String> serviceToAppsMapping = new HashMap<>();
 
     private URL registryURL;
 
@@ -280,6 +281,10 @@ public class ServiceDiscoveryRegistry implements Registry {
 
     public void doUnsubscribe(URL url, NotifyListener listener) {
         writableMetadataService.unsubscribeURL(url);
+        String protocolServiceKey = url.getServiceKey() + GROUP_CHAR_SEPARATOR + url.getParameter(PROTOCOL_KEY, DUBBO);
+        String serviceNamesKey = serviceToAppsMapping.remove(protocolServiceKey);
+        ServiceInstancesChangedListener instancesChangedListener = serviceListeners.get(serviceNamesKey);
+        instancesChangedListener.removeListener(protocolServiceKey);
     }
 
     @Override
@@ -308,6 +313,9 @@ public class ServiceDiscoveryRegistry implements Registry {
 
     protected void subscribeURLs(URL url, NotifyListener listener, Set<String> serviceNames) {
         String serviceNamesKey = serviceNames.toString();
+        String protocolServiceKey = url.getServiceKey() + GROUP_CHAR_SEPARATOR + url.getParameter(PROTOCOL_KEY, DUBBO);
+        serviceToAppsMapping.put(protocolServiceKey, serviceNamesKey);
+
         // register ServiceInstancesChangedListener
         ServiceInstancesChangedListener serviceListener = serviceListeners.computeIfAbsent(serviceNamesKey,
                 k -> new ServiceInstancesChangedListener(serviceNames, serviceDiscovery));
@@ -318,7 +326,6 @@ public class ServiceDiscoveryRegistry implements Registry {
             List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
             serviceListener.onEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances));
         });
-        String protocolServiceKey = url.getServiceKey() + GROUP_CHAR_SEPARATOR + url.getParameter(PROTOCOL_KEY, DUBBO);
 
         listener.notify(serviceListener.getUrls(protocolServiceKey));
 
@@ -460,6 +467,7 @@ public class ServiceDiscoveryRegistry implements Registry {
                 return;
             }
             if (!CollectionUtils.equals(oldApps, newApps) && newApps.size() >= oldApps.size()) {
+                doUnsubscribe(url, listener);
                 subscribeURLs(url, listener, newApps);
             }
         }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
index e99b40c..7a833e4 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
@@ -194,6 +194,13 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
         this.listeners.put(serviceKey, listener);
     }
 
+    public void removeListener(String serviceKey) {
+        listeners.remove(serviceKey);
+        if (listeners.isEmpty()) {
+            serviceDiscovery.removeServiceInstancesChangedListener(this);
+        }
+    }
+
     public List<URL> getUrls(String serviceKey) {
         return toUrlsWithEmpty(serviceUrls.get(serviceKey));
     }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationClusterInvoker.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationClusterInvoker.java
index 3bd6fd7..00c727f 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationClusterInvoker.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationClusterInvoker.java
@@ -16,6 +16,7 @@
  */
 package org.apache.dubbo.registry.client.migration;
 
+import org.apache.dubbo.common.URL;
 import org.apache.dubbo.registry.client.migration.model.MigrationStep;
 import org.apache.dubbo.rpc.cluster.ClusterInvoker;
 
@@ -35,4 +36,6 @@ public interface MigrationClusterInvoker<T> extends ClusterInvoker<T> {
     void fallbackToInterfaceInvoker();
 
     void migrateToServiceDiscoveryInvoker(boolean forceMigrate);
+
+    void reRefer(URL newSubscribeUrl);
 }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
index 01c32dd..c6ba259 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.registry.client.migration;
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.registry.Registry;
 import org.apache.dubbo.registry.client.migration.model.MigrationStep;
 import org.apache.dubbo.registry.integration.DynamicDirectory;
@@ -31,6 +32,8 @@ import org.apache.dubbo.rpc.cluster.Directory;
 
 import java.util.Set;
 
+import static org.apache.dubbo.rpc.cluster.Constants.REFER_KEY;
+
 public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
 
     private URL url;
@@ -104,6 +107,33 @@ public class MigrationInvoker<T> implements MigrationClusterInvoker<T> {
     }
 
     @Override
+    public void reRefer(URL newSubscribeUrl) {
+        // update url to prepare for migration refresh
+        this.url = url.addParameter(REFER_KEY, StringUtils.toQueryString(newSubscribeUrl.getParameters()));
+
+        // re-subscribe immediately
+        if (invoker != null && !invoker.isDestroyed()) {
+            doReSubscribe(invoker, newSubscribeUrl);
+        }
+        if (serviceDiscoveryInvoker != null && !serviceDiscoveryInvoker.isDestroyed()) {
+            doReSubscribe(serviceDiscoveryInvoker, newSubscribeUrl);
+        }
+    }
+
+    private void doReSubscribe(ClusterInvoker<T> invoker, URL newSubscribeUrl) {
+        DynamicDirectory<T> directory = (DynamicDirectory<T>)invoker.getDirectory();
+        URL oldSubscribeUrl = directory.getRegisteredConsumerUrl();
+        Registry registry = directory.getRegistry();
+        registry.unregister(directory.getRegisteredConsumerUrl());
+        directory.unSubscribe(RegistryProtocol.toSubscribeUrl(oldSubscribeUrl));
+        registry.register(directory.getRegisteredConsumerUrl());
+
+        directory.setRegisteredConsumerUrl(newSubscribeUrl);
+        directory.buildRouterChain(newSubscribeUrl);
+        directory.subscribe(RegistryProtocol.toSubscribeUrl(newSubscribeUrl));
+    }
+
+    @Override
     public synchronized void fallbackToInterfaceInvoker() {
         refreshInterfaceInvoker();
         destroyServiceDiscoveryInvoker();
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java
index 918d9aa..a300025 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java
@@ -28,7 +28,7 @@ import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.registry.integration.RegistryProtocol;
 import org.apache.dubbo.registry.integration.RegistryProtocolListener;
 import org.apache.dubbo.rpc.Exporter;
-import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.cluster.ClusterInvoker;
 import org.apache.dubbo.rpc.model.ApplicationModel;
 
 import java.util.Set;
@@ -77,10 +77,10 @@ public class MigrationRuleListener implements RegistryProtocolListener, Configur
     }
 
     @Override
-    public synchronized <T> void onRefer(RegistryProtocol registryProtocol, Invoker<T> invoker) {
-        MigrationInvoker<T> migrationInvoker = (MigrationInvoker<T>) invoker;
+    public synchronized void onRefer(RegistryProtocol registryProtocol, ClusterInvoker<?> invoker) {
+        MigrationInvoker<?> migrationInvoker = (MigrationInvoker<?>) invoker;
 
-        MigrationRuleHandler<T> migrationListener = new MigrationRuleHandler<>(migrationInvoker);
+        MigrationRuleHandler<?> migrationListener = new MigrationRuleHandler<>(migrationInvoker);
         listeners.add(migrationListener);
 
         migrationListener.doMigrate(rawRule);
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
index 855b726..cf4a708 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
@@ -32,6 +32,7 @@ import org.apache.dubbo.registry.Registry;
 import org.apache.dubbo.registry.RegistryFactory;
 import org.apache.dubbo.registry.RegistryService;
 import org.apache.dubbo.registry.client.ServiceDiscoveryRegistryDirectory;
+import org.apache.dubbo.registry.client.migration.MigrationClusterInvoker;
 import org.apache.dubbo.registry.client.migration.ServiceDiscoveryMigrationInvoker;
 import org.apache.dubbo.registry.retry.ReExportTask;
 import org.apache.dubbo.registry.support.SkipFailbackWrapperException;
@@ -498,19 +499,17 @@ public class RegistryProtocol implements Protocol {
         return (ClusterInvoker<T>) cluster.join(directory);
     }
 
-    public <T> void reRefer(DynamicDirectory<T> directory, URL newSubscribeUrl) {
-        URL oldSubscribeUrl = directory.getRegisteredConsumerUrl();
-        Registry registry = directory.getRegistry();
-        registry.unregister(directory.getRegisteredConsumerUrl());
-        directory.unSubscribe(toSubscribeUrl(oldSubscribeUrl));
-        registry.register(directory.getRegisteredConsumerUrl());
+    public <T> void reRefer(ClusterInvoker<?> invoker, URL newSubscribeUrl) {
+        if (!(invoker instanceof MigrationClusterInvoker)) {
+            logger.error("Only invoker type of MigrationClusterInvoker supports reRefer, current invoker is " + invoker.getClass());
+            return;
+        }
 
-        directory.setRegisteredConsumerUrl(newSubscribeUrl);
-        directory.buildRouterChain(newSubscribeUrl);
-        directory.subscribe(toSubscribeUrl(newSubscribeUrl));
+        MigrationClusterInvoker<?> migrationClusterInvoker = (MigrationClusterInvoker<?>)invoker;
+        migrationClusterInvoker.reRefer(newSubscribeUrl);
     }
 
-    protected static URL toSubscribeUrl(URL url) {
+    public static URL toSubscribeUrl(URL url) {
         return url.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY);
     }
 
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocolListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocolListener.java
index 50bdca1..41926b8 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocolListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocolListener.java
@@ -19,7 +19,7 @@ package org.apache.dubbo.registry.integration;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.extension.SPI;
 import org.apache.dubbo.rpc.Exporter;
-import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.cluster.ClusterInvoker;
 
 /**
  * RegistryProtocol listener is introduced to provide a chance to user to customize or change export and refer behavior
@@ -43,7 +43,7 @@ public interface RegistryProtocolListener {
      * @param invoker          invoker
      * @see RegistryProtocol#refer(Class, URL)
      */
-    <T> void onRefer(RegistryProtocol registryProtocol, Invoker<T> invoker);
+    void onRefer(RegistryProtocol registryProtocol, ClusterInvoker<?> invoker);
 
     /**
      * Notify RegistryProtocol's listeners when the protocol is destroyed
diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
index 593030d..d5f4f3d 100644
--- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
@@ -69,7 +69,7 @@ public class ZookeeperServiceDiscovery extends AbstractServiceDiscovery {
     /**
      * The Key is watched Zookeeper path, the value is an instance of {@link CuratorWatcher}
      */
-    private final Map<String, CuratorWatcher> watcherCaches = new ConcurrentHashMap<>();
+    private final Map<String, ZookeeperServiceDiscoveryChangeWatcher> watcherCaches = new ConcurrentHashMap<>();
 
     @Override
     public void initialize(URL registryURL) throws Exception {
@@ -160,6 +160,14 @@ public class ZookeeperServiceDiscovery extends AbstractServiceDiscovery {
         listener.getServiceNames().forEach(serviceName -> registerServiceWatcher(serviceName, listener));
     }
 
+    @Override
+    public void removeServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws IllegalArgumentException {
+        listener.getServiceNames().forEach(serviceName -> {
+            ZookeeperServiceDiscoveryChangeWatcher watcher = watcherCaches.remove(serviceName);
+            watcher.stopWatching();
+        });
+    }
+
     private void doInServiceRegistry(ThrowableConsumer<org.apache.curator.x.discovery.ServiceDiscovery> consumer) {
         ThrowableConsumer.execute(serviceDiscovery, s -> {
             consumer.accept(s);
diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
index 5ee429a..fa982d8 100644
--- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
+++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
@@ -39,6 +39,8 @@ public class ZookeeperServiceDiscoveryChangeWatcher implements CuratorWatcher {
 
     private final ZookeeperServiceDiscovery zookeeperServiceDiscovery;
 
+    private boolean keepWatching;
+
     private final String serviceName;
 
     public ZookeeperServiceDiscoveryChangeWatcher(ZookeeperServiceDiscovery zookeeperServiceDiscovery,
@@ -55,9 +57,19 @@ public class ZookeeperServiceDiscoveryChangeWatcher implements CuratorWatcher {
         Watcher.Event.EventType eventType = event.getType();
 
         if (NodeChildrenChanged.equals(eventType) || NodeDataChanged.equals(eventType)) {
-            listener.onEvent(new ServiceInstancesChangedEvent(serviceName, zookeeperServiceDiscovery.getInstances(serviceName)));
-            zookeeperServiceDiscovery.registerServiceWatcher(serviceName, listener);
-            zookeeperServiceDiscovery.dispatchServiceInstancesChangedEvent(serviceName);
+            if (shouldKeepWatching()) {
+                listener.onEvent(new ServiceInstancesChangedEvent(serviceName, zookeeperServiceDiscovery.getInstances(serviceName)));
+                zookeeperServiceDiscovery.registerServiceWatcher(serviceName, listener);
+                zookeeperServiceDiscovery.dispatchServiceInstancesChangedEvent(serviceName);
+            }
         }
     }
+
+    public boolean shouldKeepWatching() {
+        return keepWatching;
+    }
+
+    public void stopWatching() {
+        this.keepWatching = false;
+    }
 }

Reply via email to