This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 22f5310  [3.0] Fix un-subscription memory leak and some enhancements 
to migration approach. (#7476)
22f5310 is described below

commit 22f531008499884b2a1246083ca9fe3c3b132a45
Author: ken.lj <[email protected]>
AuthorDate: Tue Mar 30 14:04:20 2021 +0800

    [3.0] Fix un-subscription memory leak and some enhancements to migration 
approach. (#7476)
---
 .../dubbo/config/bootstrap/DubboBootstrap.java     | 12 +++-
 dubbo-distribution/dubbo-all/pom.xml               |  7 ++-
 .../client/ServiceDiscoveryRegistryDirectory.java  | 14 ++++-
 .../listener/ServiceInstancesChangedListener.java  | 64 +++++++++++++--------
 .../metadata/store/RemoteMetadataServiceImpl.java  |  7 ++-
 .../client/migration/MigrationInvoker.java         | 66 ++++++++++++++--------
 .../migration/PreMigratingConditionChecker.java    | 25 ++++++++
 .../support/CacheableFailbackRegistry.java         |  2 +
 .../dubbo/registry/support/FailbackRegistry.java   |  4 ++
 9 files changed, 147 insertions(+), 54 deletions(-)

diff --git 
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
 
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
index 0ef8ef4..656b994 100644
--- 
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
+++ 
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
@@ -1159,13 +1159,21 @@ public class DubboBootstrap extends 
GenericEventListener {
 
         ServiceInstance serviceInstance = createServiceInstance(serviceName);
 
-        doRegisterServiceInstance(serviceInstance);
+        try {
+            doRegisterServiceInstance(serviceInstance);
+        } catch (Exception e) {
+            logger.error("Register instance error", e);
+        }
 
         // scheduled task for updating Metadata and ServiceInstance
         executorRepository.nextScheduledExecutor().scheduleAtFixedRate(() -> {
             InMemoryWritableMetadataService localMetadataService = 
(InMemoryWritableMetadataService) WritableMetadataService.getDefaultExtension();
             localMetadataService.blockUntilUpdated();
-            
ServiceInstanceMetadataUtils.refreshMetadataAndInstance(serviceInstance);
+            try {
+                
ServiceInstanceMetadataUtils.refreshMetadataAndInstance(serviceInstance);
+            } catch (Exception e) {
+                logger.error("Refresh instance and metadata error", e);
+            }
         }, 0, ConfigurationUtils.get(METADATA_PUBLISH_DELAY_KEY, 
DEFAULT_METADATA_PUBLISH_DELAY), TimeUnit.MILLISECONDS);
     }
 
diff --git a/dubbo-distribution/dubbo-all/pom.xml 
b/dubbo-distribution/dubbo-all/pom.xml
index b464e41..519fbc7 100644
--- a/dubbo-distribution/dubbo-all/pom.xml
+++ b/dubbo-distribution/dubbo-all/pom.xml
@@ -737,7 +737,12 @@
                                         
META-INF/dubbo/internal/org.apache.dubbo.remoting.api.WireProtocol
                                     </resource>
                                 </transformer>
-
+                                <transformer
+                                        
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                                    <resource>
+                                        
META-INF/dubbo/internal/org.apache.dubbo.registry.client.migration.PreMigratingConditionChecker
+                                    </resource>
+                                </transformer>
                             </transformers>
                             <filters>
                                 <filter>
diff --git 
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
 
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
index 6232097..f1af5bd 100644
--- 
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
+++ 
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
@@ -25,7 +25,6 @@ import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.common.utils.NetUtils;
 import org.apache.dubbo.registry.AddressListener;
 import org.apache.dubbo.registry.NotifyListener;
-import 
org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
 import org.apache.dubbo.registry.integration.DynamicDirectory;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Protocol;
@@ -51,8 +50,6 @@ public class ServiceDiscoveryRegistryDirectory<T> extends 
DynamicDirectory<T> im
     // instance address to invoker mapping.
     private volatile Map<String, Invoker<T>> urlInvokerMap; // The initial 
value is null and the midway may be assigned to null, please use the local 
variable reference
 
-    private ServiceInstancesChangedListener listener;
-
     public ServiceDiscoveryRegistryDirectory(Class<T> serviceType, URL url) {
         super(serviceType, url);
     }
@@ -102,6 +99,17 @@ public class ServiceDiscoveryRegistryDirectory<T> extends 
DynamicDirectory<T> im
         return true;
     }
 
+    /**
+     * This implementation makes sure that all application names related to 
serviceListener have received an address notification.
+     *
+     * FIXME, make sure the deprecated "interface-application" mapping item is 
cleared in time.
+     */
+    @Override
+    public boolean isNotificationReceived() {
+        return serviceListener.isDestroyed()
+                || serviceListener.getAllInstances().size() == 
serviceListener.getServiceNames().size();
+    }
+
     private void refreshInvoker(List<URL> invokerUrls) {
         Assert.notNull(invokerUrls, "invokerUrls should not be null, use empty 
url list to clear address.");
 
diff --git 
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
 
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
index dd0a0cf..62fb08f 100644
--- 
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
+++ 
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
@@ -48,8 +48,10 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.dubbo.common.constants.CommonConstants.REMOTE_METADATA_STORAGE_TYPE;
@@ -70,19 +72,18 @@ public class ServiceInstancesChangedListener implements 
ConditionalEventListener
     protected final ServiceDiscovery serviceDiscovery;
     protected URL url;
     protected Map<String, NotifyListener> listeners;
+    protected AtomicBoolean destroyed = new AtomicBoolean(false);
 
-    private Map<String, List<ServiceInstance>> allInstances;
-
-    private Map<String, Object> serviceUrls;
-
-    private Map<String, MetadataInfo> revisionToMetadata;
+    protected Map<String, List<ServiceInstance>> allInstances;
+    protected Map<String, Object> serviceUrls;
+    protected Map<String, MetadataInfo> revisionToMetadata;
 
     private volatile long lastRefreshTime;
     private volatile long lastFailureTime;
     private volatile AtomicInteger failureCounter = new AtomicInteger(0);
     private Semaphore retryPermission;
-
-    private ScheduledExecutorService scheduler;
+    private volatile ScheduledFuture<?> retryFuture;
+    private static ScheduledExecutorService scheduler = 
ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getMetadataRetryExecutor();
 
     public ServiceInstancesChangedListener(Set<String> serviceNames, 
ServiceDiscovery serviceDiscovery) {
         this.serviceNames = serviceNames;
@@ -92,7 +93,6 @@ public class ServiceInstancesChangedListener implements 
ConditionalEventListener
         this.serviceUrls = new HashMap<>();
         this.revisionToMetadata = new HashMap<>();
         retryPermission = new Semaphore(1);
-        this.scheduler = 
ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getMetadataRetryExecutor();
     }
 
     /**
@@ -134,11 +134,11 @@ public class ServiceInstancesChangedListener implements 
ConditionalEventListener
             }
         }
 
-        logger.info(newRevisionToMetadata.size() + " unique revisions. ");
+        logger.info(newRevisionToMetadata.size() + " unique revisions: " + 
newRevisionToMetadata.keySet());
 
         if (hasEmptyMetadata(newRevisionToMetadata)) {// retry every 10 seconds
             if (retryPermission.tryAcquire()) {
-                scheduler.schedule(new 
AddressRefreshRetryTask(retryPermission), 10000, TimeUnit.MILLISECONDS);
+                retryFuture = scheduler.schedule(new 
AddressRefreshRetryTask(retryPermission), 10000, TimeUnit.MILLISECONDS);
                 logger.warn("Address refresh try task submitted.");
             }
             logger.warn("Address refresh failed because of Metadata Server 
failure, wait for retry or new address refresh event.");
@@ -169,7 +169,7 @@ public class ServiceInstancesChangedListener implements 
ConditionalEventListener
 
     public synchronized void addListenerAndNotify(String serviceKey, 
NotifyListener listener) {
         this.listeners.put(serviceKey, listener);
-        List<URL> urls = getAddresses(serviceKey);
+        List<URL> urls = getAddresses(serviceKey, listener.getConsumerUrl());
         if (CollectionUtils.isNotEmpty(urls)) {
             listener.notify(urls);
         }
@@ -177,15 +177,13 @@ public class ServiceInstancesChangedListener implements 
ConditionalEventListener
 
     public void removeListener(String serviceKey) {
         listeners.remove(serviceKey);
+        logger.info("Interface listener of interface " + serviceKey + " 
removed.");
         if (listeners.isEmpty()) {
+            logger.info("No interface listeners exist, will stop instance 
listener for " + this.getServiceNames());
             serviceDiscovery.removeServiceInstancesChangedListener(this);
         }
     }
 
-    public List<URL> getUrls(String serviceKey) {
-        return toUrlsWithEmpty(getAddresses(serviceKey));
-    }
-
     /**
      * Get the correlative service name
      *
@@ -263,18 +261,22 @@ public class ServiceInstancesChangedListener implements 
ConditionalEventListener
 
     protected MetadataInfo getRemoteMetadata(ServiceInstance instance, String 
revision, Map<ServiceInfo, Set<String>> localServiceToRevisions, 
List<ServiceInstance> subInstances) {
         MetadataInfo metadata = revisionToMetadata.get(revision);
+
+        if (metadata != null && metadata != MetadataInfo.EMPTY) {
+            logger.info("MetadataInfo for instance " + instance.getAddress() + 
"?revision=" + revision + "&cluster=" + instance.getRegistryCluster() + ", " + 
metadata);
+        }
+
         if (metadata == null
                 || (metadata == MetadataInfo.EMPTY && (failureCounter.get() < 
3 || (System.currentTimeMillis() - lastFailureTime > 10000)))) {
             metadata = getMetadataInfo(instance);
 
             if (metadata != MetadataInfo.EMPTY) {
-                logger.info("MetadataInfo for instance " + 
instance.getAddress() + "?revision=" + revision + " is " + metadata);
                 failureCounter.set(0);
                 revisionToMetadata.putIfAbsent(revision, metadata);
                 parseMetadata(revision, metadata, localServiceToRevisions);
             } else {
                 logger.error("Failed to get MetadataInfo for instance " + 
instance.getAddress() + "?revision=" + revision
-                        + ", wait for retry.");
+                        + "&cluster=" + instance.getRegistryCluster() + ", 
wait for retry.");
                 lastFailureTime = System.currentTimeMillis();
                 failureCounter.incrementAndGet();
             }
@@ -313,9 +315,7 @@ public class ServiceInstancesChangedListener implements 
ConditionalEventListener
                 MetadataService metadataServiceProxy = 
MetadataUtils.getMetadataServiceProxy(instance, serviceDiscovery);
                 metadataInfo = 
metadataServiceProxy.getMetadataInfo(ServiceInstanceMetadataUtils.getExportedServicesRevision(instance));
             }
-            if (logger.isDebugEnabled()) {
-                logger.info("Metadata " + metadataInfo.toString());
-            }
+            logger.info("Metadata " + metadataInfo);
         } catch (Exception e) {
             logger.error("Failed to load service metadata, meta type is " + 
metadataType, e);
             metadataInfo = null;
@@ -346,14 +346,14 @@ public class ServiceInstancesChangedListener implements 
ConditionalEventListener
         return urls;
     }
 
-    protected List<URL> getAddresses(String serviceProtocolKey) {
+    protected List<URL> getAddresses(String serviceProtocolKey, URL 
consumerURL) {
         return (List<URL>) serviceUrls.get(serviceProtocolKey);
     }
 
     protected void notifyAddressChanged() {
         listeners.forEach((key, notifyListener) -> {
             //FIXME, group wildcard match
-            List<URL> urls = toUrlsWithEmpty(getAddresses(key));
+            List<URL> urls = toUrlsWithEmpty(getAddresses(key, 
notifyListener.getConsumerUrl()));
             logger.info("Notify service " + key + " with urls " + urls.size());
             notifyListener.notify(urls);
         });
@@ -366,6 +366,26 @@ public class ServiceInstancesChangedListener implements 
ConditionalEventListener
         return urls;
     }
 
+    /**
+     * Since this listener is shared among interfaces, destroy this listener 
only when all interface listeners are unsubscribed.
+     */
+    public void destroy() {
+        if (destroyed.compareAndSet(false, true)) {
+            if (CollectionUtils.isEmptyMap(listeners)) {
+                allInstances.clear();
+                serviceUrls.clear();
+                revisionToMetadata.clear();
+                if (retryFuture != null && !retryFuture.isDone()) {
+                    retryFuture.cancel(true);
+                }
+            }
+        }
+    }
+
+    public boolean isDestroyed() {
+        return destroyed.get();
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
diff --git 
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
 
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
index d217e2d..7410404 100644
--- 
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
+++ 
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
@@ -33,6 +33,7 @@ import 
org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils;
 import org.apache.dubbo.remoting.Constants;
 import org.apache.dubbo.rpc.RpcException;
 
+import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SIDE;
@@ -93,7 +94,11 @@ public class RemoteMetadataServiceImpl {
         if (metadataReport == null) {
             metadataReport = 
getMetadataReports().entrySet().iterator().next().getValue();
         }
-        return metadataReport.getAppMetadata(identifier, 
instance.getExtendParams());
+        Map<String, String> params = new HashMap<>(instance.getExtendParams());
+        if (instance.getRegistryCluster() != null && 
!instance.getRegistryCluster().equalsIgnoreCase(params.get(REGISTRY_CLUSTER_KEY)))
 {
+            params.put(REGISTRY_CLUSTER_KEY, instance.getRegistryCluster());
+        }
+        return metadataReport.getAppMetadata(identifier, params);
     }
 
     private void checkRemoteConfigured() {
diff --git 
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
 
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
index 2e94d3f..6befc9b 100644
--- 
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
+++ 
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.extension.ExtensionLoader;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.registry.Registry;
 import org.apache.dubbo.registry.client.migration.model.MigrationRule;
@@ -134,12 +135,16 @@ public class MigrationInvoker<T> implements 
MigrationClusterInvoker<T> {
     public void fallbackToInterfaceInvoker() {
         refreshInterfaceInvoker();
         setListener(invoker, () -> {
-            this.destroyServiceDiscoveryInvoker(this.serviceDiscoveryInvoker);
+            this.destroyServiceDiscoveryInvoker(this.serviceDiscoveryInvoker, 
true);
         });
     }
 
     @Override
     public void migrateToServiceDiscoveryInvoker(boolean forceMigrate) {
+        if (!checkMigratingConditionMatch(consumerUrl)) {
+            fallbackToInterfaceInvoker();
+            return;
+        }
         if (!forceMigrate) {
             refreshServiceDiscoveryInvoker();
             refreshInterfaceInvoker();
@@ -152,11 +157,20 @@ public class MigrationInvoker<T> implements 
MigrationClusterInvoker<T> {
         } else {
             refreshServiceDiscoveryInvoker();
             setListener(serviceDiscoveryInvoker, () -> {
-                this.destroyInterfaceInvoker(this.invoker);
+                this.destroyInterfaceInvoker(this.invoker, true);
             });
         }
     }
 
+    private boolean checkMigratingConditionMatch(URL consumerUrl) {
+        Set<PreMigratingConditionChecker> checkers = 
ExtensionLoader.getExtensionLoader(PreMigratingConditionChecker.class).getSupportedExtensionInstances();
+        if (CollectionUtils.isNotEmpty(checkers)) {
+            PreMigratingConditionChecker checker = checkers.iterator().next();
+            return checker.checkCondition(consumerUrl);
+        }
+        return true;
+    }
+
     @Override
     public void refreshServiceDiscoveryInvokerOnMappingCallback(boolean 
forceMigrate) {
         if (this.serviceDiscoveryInvoker != null) {
@@ -176,10 +190,10 @@ public class MigrationInvoker<T> implements 
MigrationClusterInvoker<T> {
         switch (step) {
             case APPLICATION_FIRST:
                 // FIXME, check ClusterInvoker.hasProxyInvokers() or 
ClusterInvoker.isAvailable()
-                if (checkInvokerAvailable(serviceDiscoveryInvoker)) {
-                    currentAvailableInvoker = serviceDiscoveryInvoker;
-                } else {
+                if (checkInvokerAvailable(invoker)) {
                     currentAvailableInvoker = invoker;
+                } else {
+                    currentAvailableInvoker = serviceDiscoveryInvoker;
                 }
                 break;
             case FORCE_APPLICATION:
@@ -291,27 +305,24 @@ public class MigrationInvoker<T> implements 
MigrationClusterInvoker<T> {
      */
     private synchronized void compareAddresses(ClusterInvoker<T> 
serviceDiscoveryInvoker, ClusterInvoker<T> invoker) {
         this.invokersChanged = true;
-        if (logger.isDebugEnabled()) {
-            logger.info("" + invoker.getDirectory().getAllInvokers().size());
-        }
-
         Set<MigrationAddressComparator> detectors = 
ExtensionLoader.getExtensionLoader(MigrationAddressComparator.class).getSupportedExtensionInstances();
         if (detectors != null && detectors.stream().allMatch(migrationDetector 
-> migrationDetector.shouldMigrate(serviceDiscoveryInvoker, invoker, rule))) {
             logger.info("serviceKey:" + invoker.getUrl().getServiceKey() + " 
switch to APP Level address");
-            destroyInterfaceInvoker(invoker);
+            destroyInterfaceInvoker(invoker, false);
         } else {
             logger.info("serviceKey:" + invoker.getUrl().getServiceKey() + " 
switch to Service Level address");
-            destroyServiceDiscoveryInvoker(serviceDiscoveryInvoker);
+            destroyServiceDiscoveryInvoker(serviceDiscoveryInvoker, false);
         }
     }
 
-    protected synchronized void 
destroyServiceDiscoveryInvoker(ClusterInvoker<?> serviceDiscoveryInvoker) {
+    protected synchronized void 
destroyServiceDiscoveryInvoker(ClusterInvoker<?> serviceDiscoveryInvoker, 
boolean force) {
         if (this.invoker != null) {
             this.currentAvailableInvoker = this.invoker;
+//            clearListener(this.serviceDiscoveryInvoker);
             updateConsumerModel(currentAvailableInvoker, 
serviceDiscoveryInvoker);
         }
-        if (serviceDiscoveryInvoker != null) {
-            if 
(serviceDiscoveryInvoker.getDirectory().isNotificationReceived()) {
+        if (serviceDiscoveryInvoker != null && 
!serviceDiscoveryInvoker.isDestroyed()) {
+            if (force || 
serviceDiscoveryInvoker.getDirectory().isNotificationReceived()) {
                 if (logger.isInfoEnabled()) {
                     logger.info("Destroying instance address invokers, will 
not listen for address changes until re-subscribed, " + type.getName());
                 }
@@ -325,12 +336,12 @@ public class MigrationInvoker<T> implements 
MigrationClusterInvoker<T> {
 //            this.currentAvailableInvoker = this.invoker;
 //            updateConsumerModel(currentAvailableInvoker, 
serviceDiscoveryInvoker);
 //        }
-//        if (serviceDiscoveryInvoker != null) {
+//        if (serviceDiscoveryInvoker != null && 
!serviceDiscoveryInvoker.isDestroyed()) {
 //            if (logger.isDebugEnabled()) {
 //                List<Invoker<T>> invokers = 
serviceDiscoveryInvoker.getDirectory().getAllInvokers();
 //                logger.debug("Discarding instance addresses, total size " + 
(invokers == null ? 0 : invokers.size()));
 //            }
-////            serviceDiscoveryInvoker.getDirectory().discordAddresses();
+//            serviceDiscoveryInvoker.getDirectory().discordAddresses();
 //        }
 //    }
 
@@ -356,13 +367,14 @@ public class MigrationInvoker<T> implements 
MigrationClusterInvoker<T> {
         }
     }
 
-    protected synchronized void destroyInterfaceInvoker(ClusterInvoker<T> 
invoker) {
+    protected synchronized void destroyInterfaceInvoker(ClusterInvoker<T> 
invoker, boolean force) {
         if (this.serviceDiscoveryInvoker != null) {
             this.currentAvailableInvoker = this.serviceDiscoveryInvoker;
+//            clearListener(this.serviceDiscoveryInvoker);
             updateConsumerModel(currentAvailableInvoker, invoker);
         }
-        if (invoker != null) {
-            if (invoker.getDirectory().isNotificationReceived()) {
+        if (invoker != null && !invoker.isDestroyed()) {
+            if (force || invoker.getDirectory().isNotificationReceived()) {
                 if (logger.isInfoEnabled()) {
                     logger.info("Destroying interface address invokers, will 
not listen for address changes until re-subscribed, " + type.getName());
                 }
@@ -370,18 +382,22 @@ public class MigrationInvoker<T> implements 
MigrationClusterInvoker<T> {
             }
         }
     }
+
 //
 //    protected synchronized void 
discardInterfaceInvokerAddress(ClusterInvoker<T> invoker) {
 //        if (this.serviceDiscoveryInvoker != null) {
 //            this.currentAvailableInvoker = this.serviceDiscoveryInvoker;
 //            updateConsumerModel(currentAvailableInvoker, invoker);
 //        }
-//        if (invoker != null) {
+//        if (invoker != null && !invoker.isDestroyed()) {
 //            if (logger.isDebugEnabled()) {
 //                List<Invoker<T>> invokers = 
invoker.getDirectory().getAllInvokers();
 //                logger.debug("Discarding interface addresses, total address 
size " + (invokers == null ? 0 : invokers.size()));
 //            }
-//            //invoker.getDirectory().discordAddresses();
+//            invoker.getDirectory().discordAddresses();
+////            if (invokerDestroyStatus == null) {
+////                invokerDestroyStatus = executorService.schedule(new 
InvokerDestroyTask(), destroyInterval, TimeUnit.MILLISECONDS);
+////            }
 //        }
 //    }
 
@@ -398,7 +414,7 @@ public class MigrationInvoker<T> implements 
MigrationClusterInvoker<T> {
     }
 
     private boolean needRefresh(ClusterInvoker<T> invoker) {
-        return invoker == null || invoker.isDestroyed();
+        return invoker == null || invoker.isDestroyed() || 
!invoker.hasProxyInvokers();
     }
 
     public boolean checkInvokerAvailable(ClusterInvoker<T> invoker) {
@@ -411,9 +427,9 @@ public class MigrationInvoker<T> implements 
MigrationClusterInvoker<T> {
             if (workingInvoker != null) {
                 
consumerModel.getServiceMetadata().addAttribute("currentClusterInvoker", 
workingInvoker);
             }
-//            if (backInvoker != null) {
-//                
consumerModel.getServiceMetadata().addAttribute("backupClusterInvoker", 
backInvoker);
-//            }
+            if (backInvoker != null) {
+                
consumerModel.getServiceMetadata().addAttribute("backupClusterInvoker", 
backInvoker);
+            }
         }
     }
 }
diff --git 
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/PreMigratingConditionChecker.java
 
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/PreMigratingConditionChecker.java
new file mode 100644
index 0000000..ccdd7e8
--- /dev/null
+++ 
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/PreMigratingConditionChecker.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.client.migration;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.SPI;
+
+@SPI
+public interface PreMigratingConditionChecker {
+    boolean checkCondition(URL consumerUrl);
+}
diff --git 
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/CacheableFailbackRegistry.java
 
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/CacheableFailbackRegistry.java
index 3289594..9f677c8 100644
--- 
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/CacheableFailbackRegistry.java
+++ 
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/CacheableFailbackRegistry.java
@@ -164,6 +164,8 @@ public abstract class CacheableFailbackRegistry extends 
FailbackRegistry {
         List<URL> urls;
         if (CollectionUtils.isEmpty(providers)) {
             urls = new ArrayList<>(1);
+            // clear cache on empty notification: unsubscribe or provider 
offline
+            stringUrls.remove(consumer);
         } else {
             String rawProvider = providers.iterator().next();
             if (rawProvider.startsWith(OVERRIDE_PROTOCOL) || 
rawProvider.startsWith(ROUTE_PROTOCOL) || 
rawProvider.startsWith(ROUTE_SCRIPT_PROTOCOL)) {
diff --git 
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java
 
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java
index cde24bd..6266b04 100644
--- 
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java
+++ 
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java
@@ -28,6 +28,7 @@ import org.apache.dubbo.registry.retry.FailedUnregisteredTask;
 import org.apache.dubbo.registry.retry.FailedUnsubscribedTask;
 import org.apache.dubbo.remoting.Constants;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -364,6 +365,9 @@ public abstract class FailbackRegistry extends 
AbstractRegistry {
         try {
             // Sending a canceling subscription request to the server side
             doUnsubscribe(url, listener);
+            //FIXME, the current thread and the registry event notification 
thread may have a concurrency issue, but when the unsubscribe occurs, we don't 
care much about the accuracy of the address list.
+            // The notify here is to try its best to clean up the invalid 
address cache when the unsubscribe action occurs
+            this.notify(url, listener, Collections.emptyList());
         } catch (Exception e) {
             Throwable t = e;
 

Reply via email to