This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git
The following commit(s) were added to refs/heads/master by this push:
new d27fb1f Merge pull request #3295, unregister consumer url when client
destroyed (referenceconfig#destroy).
d27fb1f is described below
commit d27fb1f5aa46706a52f63b336932dc24ab95b82f
Author: yì jí <[email protected]>
AuthorDate: Wed Jan 23 12:44:37 2019 +0800
Merge pull request #3295, unregister consumer url when client destroyed
(referenceconfig#destroy).
* fix client reconnect offline provider.
* refactor cancel future.
* fix client reconnect offline provider.
* refactor cancel future.
* fix client reconnect offline provider.
* refactor cancel future.
* fix unregister when client destroyed
---
.../registry/integration/RegistryDirectory.java | 21 +++++++++++++-
.../registry/integration/RegistryProtocol.java | 5 ++--
.../dubbo/remoting/transport/AbstractClient.java | 32 +++++++++++++++++++---
3 files changed, 51 insertions(+), 7 deletions(-)
diff --git
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
index 41b55b2..8ec3bcb 100644
---
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
+++
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
@@ -23,10 +23,10 @@ import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.Assert;
+import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.common.utils.UrlUtils;
-import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.configcenter.DynamicConfiguration;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.Registry;
@@ -90,6 +90,8 @@ public class RegistryDirectory<T> extends
AbstractDirectory<T> implements Notify
private volatile URL overrideDirectoryUrl; // Initialization at
construction time, assertion not null, and always assign non null value
+ private volatile URL registeredConsumerUrl;
+
/**
* override rules
* Priority: override>-D>consumer>provider
@@ -158,6 +160,15 @@ public class RegistryDirectory<T> extends
AbstractDirectory<T> implements Notify
if (isDestroyed()) {
return;
}
+
+ // unregister.
+ try {
+ if (getRegisteredConsumerUrl() != null && registry != null &&
registry.isAvailable()) {
+ registry.unregister(getRegisteredConsumerUrl());
+ }
+ } catch (Throwable t) {
+ logger.warn("unexpected error when unregister service " +
serviceKey + "from registry" + registry.getUrl(), t);
+ }
// unsubscribe.
try {
if (getConsumerUrl() != null && registry != null &&
registry.isAvailable()) {
@@ -565,6 +576,14 @@ public class RegistryDirectory<T> extends
AbstractDirectory<T> implements Notify
return this.overrideDirectoryUrl;
}
+ public URL getRegisteredConsumerUrl() {
+ return registeredConsumerUrl;
+ }
+
+ public void setRegisteredConsumerUrl(URL registeredConsumerUrl) {
+ this.registeredConsumerUrl = registeredConsumerUrl;
+ }
+
@Override
public boolean isAvailable() {
if (isDestroyed()) {
diff --git
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
index a89e11c..5343e183 100644
---
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
+++
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
@@ -372,7 +372,8 @@ public class RegistryProtocol implements Protocol {
Map<String, String> parameters = new HashMap<String,
String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL,
parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) &&
url.getParameter(REGISTER_KEY, true)) {
- registry.register(getRegisteredConsumerUrl(subscribeUrl, url));
+
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
+ registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
@@ -383,7 +384,7 @@ public class RegistryProtocol implements Protocol {
return invoker;
}
- private URL getRegisteredConsumerUrl(final URL consumerUrl, URL
registryUrl) {
+ public URL getRegisteredConsumerUrl(final URL consumerUrl, URL
registryUrl) {
if (!registryUrl.getParameter(SIMPLIFIED_KEY, false)) {
return consumerUrl.addParameters(CATEGORY_KEY, CONSUMERS_CATEGORY,
CHECK_KEY, String.valueOf(false));
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
index 9b1c9ba..7280b50 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
@@ -153,6 +153,8 @@ public abstract class AbstractClient extends
AbstractEndpoint implements Client
@Override
public void run() {
try {
+ if (cancelFutureIfOffline()) return;
+
if (!isConnected()) {
connect();
} else {
@@ -173,7 +175,29 @@ public abstract class AbstractClient extends
AbstractEndpoint implements Client
}
}
}
+
+ private boolean cancelFutureIfOffline() {
+ /**
+ * If the provider service is detected offline,
+ * the client should not attempt to connect again.
+ *
+ * issue:
https://github.com/apache/incubator-dubbo/issues/3158
+ */
+ if(isClosed()) {
+ ScheduledFuture<?> future = reconnectExecutorFuture;
+ if(future != null && !future.isCancelled()){
+ /**
+ * Client has been destroyed and
+ * scheduled task should be cancelled.
+ */
+ future.cancel(true);
+ }
+ return true;
+ }
+ return false;
+ }
};
+
reconnectExecutorFuture =
reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand,
reconnect, reconnect, TimeUnit.MILLISECONDS);
}
}
@@ -345,14 +369,14 @@ public abstract class AbstractClient extends
AbstractEndpoint implements Client
@Override
public void close() {
try {
- if (executor != null) {
- ExecutorUtil.shutdownNow(executor, 100);
- }
+ super.close();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
- super.close();
+ if (executor != null) {
+ ExecutorUtil.shutdownNow(executor, 100);
+ }
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}