This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 22f5310 [3.0] Fix un-subscription memory leak and some enhancements
to migration approach. (#7476)
22f5310 is described below
commit 22f531008499884b2a1246083ca9fe3c3b132a45
Author: ken.lj <[email protected]>
AuthorDate: Tue Mar 30 14:04:20 2021 +0800
[3.0] Fix un-subscription memory leak and some enhancements to migration
approach. (#7476)
---
.../dubbo/config/bootstrap/DubboBootstrap.java | 12 +++-
dubbo-distribution/dubbo-all/pom.xml | 7 ++-
.../client/ServiceDiscoveryRegistryDirectory.java | 14 ++++-
.../listener/ServiceInstancesChangedListener.java | 64 +++++++++++++--------
.../metadata/store/RemoteMetadataServiceImpl.java | 7 ++-
.../client/migration/MigrationInvoker.java | 66 ++++++++++++++--------
.../migration/PreMigratingConditionChecker.java | 25 ++++++++
.../support/CacheableFailbackRegistry.java | 2 +
.../dubbo/registry/support/FailbackRegistry.java | 4 ++
9 files changed, 147 insertions(+), 54 deletions(-)
diff --git
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
index 0ef8ef4..656b994 100644
---
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
+++
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
@@ -1159,13 +1159,21 @@ public class DubboBootstrap extends
GenericEventListener {
ServiceInstance serviceInstance = createServiceInstance(serviceName);
- doRegisterServiceInstance(serviceInstance);
+ try {
+ doRegisterServiceInstance(serviceInstance);
+ } catch (Exception e) {
+ logger.error("Register instance error", e);
+ }
// scheduled task for updating Metadata and ServiceInstance
executorRepository.nextScheduledExecutor().scheduleAtFixedRate(() -> {
InMemoryWritableMetadataService localMetadataService =
(InMemoryWritableMetadataService) WritableMetadataService.getDefaultExtension();
localMetadataService.blockUntilUpdated();
-
ServiceInstanceMetadataUtils.refreshMetadataAndInstance(serviceInstance);
+ try {
+
ServiceInstanceMetadataUtils.refreshMetadataAndInstance(serviceInstance);
+ } catch (Exception e) {
+ logger.error("Refresh instance and metadata error", e);
+ }
}, 0, ConfigurationUtils.get(METADATA_PUBLISH_DELAY_KEY,
DEFAULT_METADATA_PUBLISH_DELAY), TimeUnit.MILLISECONDS);
}
diff --git a/dubbo-distribution/dubbo-all/pom.xml
b/dubbo-distribution/dubbo-all/pom.xml
index b464e41..519fbc7 100644
--- a/dubbo-distribution/dubbo-all/pom.xml
+++ b/dubbo-distribution/dubbo-all/pom.xml
@@ -737,7 +737,12 @@
META-INF/dubbo/internal/org.apache.dubbo.remoting.api.WireProtocol
</resource>
</transformer>
-
+ <transformer
+
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+ <resource>
+
META-INF/dubbo/internal/org.apache.dubbo.registry.client.migration.PreMigratingConditionChecker
+ </resource>
+ </transformer>
</transformers>
<filters>
<filter>
diff --git
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
index 6232097..f1af5bd 100644
---
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
+++
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
@@ -25,7 +25,6 @@ import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.registry.AddressListener;
import org.apache.dubbo.registry.NotifyListener;
-import
org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
import org.apache.dubbo.registry.integration.DynamicDirectory;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Protocol;
@@ -51,8 +50,6 @@ public class ServiceDiscoveryRegistryDirectory<T> extends
DynamicDirectory<T> im
// instance address to invoker mapping.
private volatile Map<String, Invoker<T>> urlInvokerMap; // The initial
value is null and the midway may be assigned to null, please use the local
variable reference
- private ServiceInstancesChangedListener listener;
-
public ServiceDiscoveryRegistryDirectory(Class<T> serviceType, URL url) {
super(serviceType, url);
}
@@ -102,6 +99,17 @@ public class ServiceDiscoveryRegistryDirectory<T> extends
DynamicDirectory<T> im
return true;
}
+ /**
+ * This implementation checks that every application name related to
serviceListener has received an address notification.
+ *
+ * FIXME: make sure deprecated "interface-application" mapping items are
cleared in time.
+ */
+ @Override
+ public boolean isNotificationReceived() {
+ return serviceListener.isDestroyed()
+ || serviceListener.getAllInstances().size() ==
serviceListener.getServiceNames().size();
+ }
+
private void refreshInvoker(List<URL> invokerUrls) {
Assert.notNull(invokerUrls, "invokerUrls should not be null, use empty
url list to clear address.");
diff --git
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
index dd0a0cf..62fb08f 100644
---
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
+++
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
@@ -48,8 +48,10 @@ import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static
org.apache.dubbo.common.constants.CommonConstants.REMOTE_METADATA_STORAGE_TYPE;
@@ -70,19 +72,18 @@ public class ServiceInstancesChangedListener implements
ConditionalEventListener
protected final ServiceDiscovery serviceDiscovery;
protected URL url;
protected Map<String, NotifyListener> listeners;
+ protected AtomicBoolean destroyed = new AtomicBoolean(false);
- private Map<String, List<ServiceInstance>> allInstances;
-
- private Map<String, Object> serviceUrls;
-
- private Map<String, MetadataInfo> revisionToMetadata;
+ protected Map<String, List<ServiceInstance>> allInstances;
+ protected Map<String, Object> serviceUrls;
+ protected Map<String, MetadataInfo> revisionToMetadata;
private volatile long lastRefreshTime;
private volatile long lastFailureTime;
private volatile AtomicInteger failureCounter = new AtomicInteger(0);
private Semaphore retryPermission;
-
- private ScheduledExecutorService scheduler;
+ private volatile ScheduledFuture<?> retryFuture;
+ private static ScheduledExecutorService scheduler =
ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getMetadataRetryExecutor();
public ServiceInstancesChangedListener(Set<String> serviceNames,
ServiceDiscovery serviceDiscovery) {
this.serviceNames = serviceNames;
@@ -92,7 +93,6 @@ public class ServiceInstancesChangedListener implements
ConditionalEventListener
this.serviceUrls = new HashMap<>();
this.revisionToMetadata = new HashMap<>();
retryPermission = new Semaphore(1);
- this.scheduler =
ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getMetadataRetryExecutor();
}
/**
@@ -134,11 +134,11 @@ public class ServiceInstancesChangedListener implements
ConditionalEventListener
}
}
- logger.info(newRevisionToMetadata.size() + " unique revisions. ");
+ logger.info(newRevisionToMetadata.size() + " unique revisions: " +
newRevisionToMetadata.keySet());
if (hasEmptyMetadata(newRevisionToMetadata)) {// retry every 10 seconds
if (retryPermission.tryAcquire()) {
- scheduler.schedule(new
AddressRefreshRetryTask(retryPermission), 10000, TimeUnit.MILLISECONDS);
+ retryFuture = scheduler.schedule(new
AddressRefreshRetryTask(retryPermission), 10000, TimeUnit.MILLISECONDS);
logger.warn("Address refresh try task submitted.");
}
logger.warn("Address refresh failed because of Metadata Server
failure, wait for retry or new address refresh event.");
@@ -169,7 +169,7 @@ public class ServiceInstancesChangedListener implements
ConditionalEventListener
public synchronized void addListenerAndNotify(String serviceKey,
NotifyListener listener) {
this.listeners.put(serviceKey, listener);
- List<URL> urls = getAddresses(serviceKey);
+ List<URL> urls = getAddresses(serviceKey, listener.getConsumerUrl());
if (CollectionUtils.isNotEmpty(urls)) {
listener.notify(urls);
}
@@ -177,15 +177,13 @@ public class ServiceInstancesChangedListener implements
ConditionalEventListener
public void removeListener(String serviceKey) {
listeners.remove(serviceKey);
+ logger.info("Interface listener of interface " + serviceKey + "
removed.");
if (listeners.isEmpty()) {
+ logger.info("No interface listeners exist, will stop instance
listener for " + this.getServiceNames());
serviceDiscovery.removeServiceInstancesChangedListener(this);
}
}
- public List<URL> getUrls(String serviceKey) {
- return toUrlsWithEmpty(getAddresses(serviceKey));
- }
-
/**
* Get the correlative service name
*
@@ -263,18 +261,22 @@ public class ServiceInstancesChangedListener implements
ConditionalEventListener
protected MetadataInfo getRemoteMetadata(ServiceInstance instance, String
revision, Map<ServiceInfo, Set<String>> localServiceToRevisions,
List<ServiceInstance> subInstances) {
MetadataInfo metadata = revisionToMetadata.get(revision);
+
+ if (metadata != null && metadata != MetadataInfo.EMPTY) {
+ logger.info("MetadataInfo for instance " + instance.getAddress() +
"?revision=" + revision + "&cluster=" + instance.getRegistryCluster() + ", " +
metadata);
+ }
+
if (metadata == null
|| (metadata == MetadataInfo.EMPTY && (failureCounter.get() <
3 || (System.currentTimeMillis() - lastFailureTime > 10000)))) {
metadata = getMetadataInfo(instance);
if (metadata != MetadataInfo.EMPTY) {
- logger.info("MetadataInfo for instance " +
instance.getAddress() + "?revision=" + revision + " is " + metadata);
failureCounter.set(0);
revisionToMetadata.putIfAbsent(revision, metadata);
parseMetadata(revision, metadata, localServiceToRevisions);
} else {
logger.error("Failed to get MetadataInfo for instance " +
instance.getAddress() + "?revision=" + revision
- + ", wait for retry.");
+ + "&cluster=" + instance.getRegistryCluster() + ",
wait for retry.");
lastFailureTime = System.currentTimeMillis();
failureCounter.incrementAndGet();
}
@@ -313,9 +315,7 @@ public class ServiceInstancesChangedListener implements
ConditionalEventListener
MetadataService metadataServiceProxy =
MetadataUtils.getMetadataServiceProxy(instance, serviceDiscovery);
metadataInfo =
metadataServiceProxy.getMetadataInfo(ServiceInstanceMetadataUtils.getExportedServicesRevision(instance));
}
- if (logger.isDebugEnabled()) {
- logger.info("Metadata " + metadataInfo.toString());
- }
+ logger.info("Metadata " + metadataInfo);
} catch (Exception e) {
logger.error("Failed to load service metadata, meta type is " +
metadataType, e);
metadataInfo = null;
@@ -346,14 +346,14 @@ public class ServiceInstancesChangedListener implements
ConditionalEventListener
return urls;
}
- protected List<URL> getAddresses(String serviceProtocolKey) {
+ protected List<URL> getAddresses(String serviceProtocolKey, URL
consumerURL) {
return (List<URL>) serviceUrls.get(serviceProtocolKey);
}
protected void notifyAddressChanged() {
listeners.forEach((key, notifyListener) -> {
//FIXME, group wildcard match
- List<URL> urls = toUrlsWithEmpty(getAddresses(key));
+ List<URL> urls = toUrlsWithEmpty(getAddresses(key,
notifyListener.getConsumerUrl()));
logger.info("Notify service " + key + " with urls " + urls.size());
notifyListener.notify(urls);
});
@@ -366,6 +366,26 @@ public class ServiceInstancesChangedListener implements
ConditionalEventListener
return urls;
}
+ /**
+ * Since this listener is shared among interfaces, destroy this listener
only when all interface listeners are unsubscribed.
+ */
+ public void destroy() {
+ if (destroyed.compareAndSet(false, true)) {
+ if (CollectionUtils.isEmptyMap(listeners)) {
+ allInstances.clear();
+ serviceUrls.clear();
+ revisionToMetadata.clear();
+ if (retryFuture != null && !retryFuture.isDone()) {
+ retryFuture.cancel(true);
+ }
+ }
+ }
+ }
+
+ public boolean isDestroyed() {
+ return destroyed.get();
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
diff --git
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
index d217e2d..7410404 100644
---
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
+++
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
@@ -33,6 +33,7 @@ import
org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.rpc.RpcException;
+import java.util.HashMap;
import java.util.Map;
import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SIDE;
@@ -93,7 +94,11 @@ public class RemoteMetadataServiceImpl {
if (metadataReport == null) {
metadataReport =
getMetadataReports().entrySet().iterator().next().getValue();
}
- return metadataReport.getAppMetadata(identifier,
instance.getExtendParams());
+ Map<String, String> params = new HashMap<>(instance.getExtendParams());
+ if (instance.getRegistryCluster() != null &&
!instance.getRegistryCluster().equalsIgnoreCase(params.get(REGISTRY_CLUSTER_KEY)))
{
+ params.put(REGISTRY_CLUSTER_KEY, instance.getRegistryCluster());
+ }
+ return metadataReport.getAppMetadata(identifier, params);
}
private void checkRemoteConfigured() {
diff --git
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
index 2e94d3f..6befc9b 100644
---
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
+++
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationInvoker.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.registry.Registry;
import org.apache.dubbo.registry.client.migration.model.MigrationRule;
@@ -134,12 +135,16 @@ public class MigrationInvoker<T> implements
MigrationClusterInvoker<T> {
public void fallbackToInterfaceInvoker() {
refreshInterfaceInvoker();
setListener(invoker, () -> {
- this.destroyServiceDiscoveryInvoker(this.serviceDiscoveryInvoker);
+ this.destroyServiceDiscoveryInvoker(this.serviceDiscoveryInvoker,
true);
});
}
@Override
public void migrateToServiceDiscoveryInvoker(boolean forceMigrate) {
+ if (!checkMigratingConditionMatch(consumerUrl)) {
+ fallbackToInterfaceInvoker();
+ return;
+ }
if (!forceMigrate) {
refreshServiceDiscoveryInvoker();
refreshInterfaceInvoker();
@@ -152,11 +157,20 @@ public class MigrationInvoker<T> implements
MigrationClusterInvoker<T> {
} else {
refreshServiceDiscoveryInvoker();
setListener(serviceDiscoveryInvoker, () -> {
- this.destroyInterfaceInvoker(this.invoker);
+ this.destroyInterfaceInvoker(this.invoker, true);
});
}
}
+ private boolean checkMigratingConditionMatch(URL consumerUrl) {
+ Set<PreMigratingConditionChecker> checkers =
ExtensionLoader.getExtensionLoader(PreMigratingConditionChecker.class).getSupportedExtensionInstances();
+ if (CollectionUtils.isNotEmpty(checkers)) {
+ PreMigratingConditionChecker checker = checkers.iterator().next();
+ return checker.checkCondition(consumerUrl);
+ }
+ return true;
+ }
+
@Override
public void refreshServiceDiscoveryInvokerOnMappingCallback(boolean
forceMigrate) {
if (this.serviceDiscoveryInvoker != null) {
@@ -176,10 +190,10 @@ public class MigrationInvoker<T> implements
MigrationClusterInvoker<T> {
switch (step) {
case APPLICATION_FIRST:
// FIXME, check ClusterInvoker.hasProxyInvokers() or
ClusterInvoker.isAvailable()
- if (checkInvokerAvailable(serviceDiscoveryInvoker)) {
- currentAvailableInvoker = serviceDiscoveryInvoker;
- } else {
+ if (checkInvokerAvailable(invoker)) {
currentAvailableInvoker = invoker;
+ } else {
+ currentAvailableInvoker = serviceDiscoveryInvoker;
}
break;
case FORCE_APPLICATION:
@@ -291,27 +305,24 @@ public class MigrationInvoker<T> implements
MigrationClusterInvoker<T> {
*/
private synchronized void compareAddresses(ClusterInvoker<T>
serviceDiscoveryInvoker, ClusterInvoker<T> invoker) {
this.invokersChanged = true;
- if (logger.isDebugEnabled()) {
- logger.info("" + invoker.getDirectory().getAllInvokers().size());
- }
-
Set<MigrationAddressComparator> detectors =
ExtensionLoader.getExtensionLoader(MigrationAddressComparator.class).getSupportedExtensionInstances();
if (detectors != null && detectors.stream().allMatch(migrationDetector
-> migrationDetector.shouldMigrate(serviceDiscoveryInvoker, invoker, rule))) {
logger.info("serviceKey:" + invoker.getUrl().getServiceKey() + "
switch to APP Level address");
- destroyInterfaceInvoker(invoker);
+ destroyInterfaceInvoker(invoker, false);
} else {
logger.info("serviceKey:" + invoker.getUrl().getServiceKey() + "
switch to Service Level address");
- destroyServiceDiscoveryInvoker(serviceDiscoveryInvoker);
+ destroyServiceDiscoveryInvoker(serviceDiscoveryInvoker, false);
}
}
- protected synchronized void
destroyServiceDiscoveryInvoker(ClusterInvoker<?> serviceDiscoveryInvoker) {
+ protected synchronized void
destroyServiceDiscoveryInvoker(ClusterInvoker<?> serviceDiscoveryInvoker,
boolean force) {
if (this.invoker != null) {
this.currentAvailableInvoker = this.invoker;
+// clearListener(this.serviceDiscoveryInvoker);
updateConsumerModel(currentAvailableInvoker,
serviceDiscoveryInvoker);
}
- if (serviceDiscoveryInvoker != null) {
- if
(serviceDiscoveryInvoker.getDirectory().isNotificationReceived()) {
+ if (serviceDiscoveryInvoker != null &&
!serviceDiscoveryInvoker.isDestroyed()) {
+ if (force ||
serviceDiscoveryInvoker.getDirectory().isNotificationReceived()) {
if (logger.isInfoEnabled()) {
logger.info("Destroying instance address invokers, will
not listen for address changes until re-subscribed, " + type.getName());
}
@@ -325,12 +336,12 @@ public class MigrationInvoker<T> implements
MigrationClusterInvoker<T> {
// this.currentAvailableInvoker = this.invoker;
// updateConsumerModel(currentAvailableInvoker,
serviceDiscoveryInvoker);
// }
-// if (serviceDiscoveryInvoker != null) {
+// if (serviceDiscoveryInvoker != null &&
!serviceDiscoveryInvoker.isDestroyed()) {
// if (logger.isDebugEnabled()) {
// List<Invoker<T>> invokers =
serviceDiscoveryInvoker.getDirectory().getAllInvokers();
// logger.debug("Discarding instance addresses, total size " +
(invokers == null ? 0 : invokers.size()));
// }
-//// serviceDiscoveryInvoker.getDirectory().discordAddresses();
+// serviceDiscoveryInvoker.getDirectory().discordAddresses();
// }
// }
@@ -356,13 +367,14 @@ public class MigrationInvoker<T> implements
MigrationClusterInvoker<T> {
}
}
- protected synchronized void destroyInterfaceInvoker(ClusterInvoker<T>
invoker) {
+ protected synchronized void destroyInterfaceInvoker(ClusterInvoker<T>
invoker, boolean force) {
if (this.serviceDiscoveryInvoker != null) {
this.currentAvailableInvoker = this.serviceDiscoveryInvoker;
+// clearListener(this.serviceDiscoveryInvoker);
updateConsumerModel(currentAvailableInvoker, invoker);
}
- if (invoker != null) {
- if (invoker.getDirectory().isNotificationReceived()) {
+ if (invoker != null && !invoker.isDestroyed()) {
+ if (force || invoker.getDirectory().isNotificationReceived()) {
if (logger.isInfoEnabled()) {
logger.info("Destroying interface address invokers, will
not listen for address changes until re-subscribed, " + type.getName());
}
@@ -370,18 +382,22 @@ public class MigrationInvoker<T> implements
MigrationClusterInvoker<T> {
}
}
}
+
//
// protected synchronized void
discardInterfaceInvokerAddress(ClusterInvoker<T> invoker) {
// if (this.serviceDiscoveryInvoker != null) {
// this.currentAvailableInvoker = this.serviceDiscoveryInvoker;
// updateConsumerModel(currentAvailableInvoker, invoker);
// }
-// if (invoker != null) {
+// if (invoker != null && !invoker.isDestroyed()) {
// if (logger.isDebugEnabled()) {
// List<Invoker<T>> invokers =
invoker.getDirectory().getAllInvokers();
// logger.debug("Discarding interface addresses, total address
size " + (invokers == null ? 0 : invokers.size()));
// }
-// //invoker.getDirectory().discordAddresses();
+// invoker.getDirectory().discordAddresses();
+//// if (invokerDestroyStatus == null) {
+//// invokerDestroyStatus = executorService.schedule(new
InvokerDestroyTask(), destroyInterval, TimeUnit.MILLISECONDS);
+//// }
// }
// }
@@ -398,7 +414,7 @@ public class MigrationInvoker<T> implements
MigrationClusterInvoker<T> {
}
private boolean needRefresh(ClusterInvoker<T> invoker) {
- return invoker == null || invoker.isDestroyed();
+ return invoker == null || invoker.isDestroyed() ||
!invoker.hasProxyInvokers();
}
public boolean checkInvokerAvailable(ClusterInvoker<T> invoker) {
@@ -411,9 +427,9 @@ public class MigrationInvoker<T> implements
MigrationClusterInvoker<T> {
if (workingInvoker != null) {
consumerModel.getServiceMetadata().addAttribute("currentClusterInvoker",
workingInvoker);
}
-// if (backInvoker != null) {
-//
consumerModel.getServiceMetadata().addAttribute("backupClusterInvoker",
backInvoker);
-// }
+ if (backInvoker != null) {
+
consumerModel.getServiceMetadata().addAttribute("backupClusterInvoker",
backInvoker);
+ }
}
}
}
diff --git
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/PreMigratingConditionChecker.java
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/PreMigratingConditionChecker.java
new file mode 100644
index 0000000..ccdd7e8
--- /dev/null
+++
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/PreMigratingConditionChecker.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.client.migration;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.SPI;
+
+@SPI
+public interface PreMigratingConditionChecker {
+ boolean checkCondition(URL consumerUrl);
+}
diff --git
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/CacheableFailbackRegistry.java
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/CacheableFailbackRegistry.java
index 3289594..9f677c8 100644
---
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/CacheableFailbackRegistry.java
+++
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/CacheableFailbackRegistry.java
@@ -164,6 +164,8 @@ public abstract class CacheableFailbackRegistry extends
FailbackRegistry {
List<URL> urls;
if (CollectionUtils.isEmpty(providers)) {
urls = new ArrayList<>(1);
+ // clear cache on empty notification: unsubscribe or provider
offline
+ stringUrls.remove(consumer);
} else {
String rawProvider = providers.iterator().next();
if (rawProvider.startsWith(OVERRIDE_PROTOCOL) ||
rawProvider.startsWith(ROUTE_PROTOCOL) ||
rawProvider.startsWith(ROUTE_SCRIPT_PROTOCOL)) {
diff --git
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java
index cde24bd..6266b04 100644
---
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java
+++
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/FailbackRegistry.java
@@ -28,6 +28,7 @@ import org.apache.dubbo.registry.retry.FailedUnregisteredTask;
import org.apache.dubbo.registry.retry.FailedUnsubscribedTask;
import org.apache.dubbo.remoting.Constants;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -364,6 +365,9 @@ public abstract class FailbackRegistry extends
AbstractRegistry {
try {
// Sending a canceling subscription request to the server side
doUnsubscribe(url, listener);
+ // FIXME: the current thread and the registry event notification
thread may have a concurrency issue, but when the unsubscribe occurs, we don't
care much about the accuracy of the address list.
+ // The notify here is a best-effort attempt to clean up the invalid
address cache when the unsubscribe action occurs.
+ this.notify(url, listener, Collections.emptyList());
} catch (Exception e) {
Throwable t = e;