This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new e1cfc037eb Fix triple client connection sharing race condition
(#14718)
e1cfc037eb is described below
commit e1cfc037ebdc38f421e0bc2abde56b0c0abeabd1
Author: Ken Liu <[email protected]>
AuthorDate: Fri Sep 27 17:30:35 2024 +0800
Fix triple client connection sharing race condition (#14718)
* fix triple client connection management issue, #14717, #14716
* fix triple client connection management issue, #14717, #14716
* add comment
* polish warning log format
* polish warning log format
* Make stub implement Destroyable
* format code
---
.../src/main/resources/Dubbo3TripleStub.mustache | 8 ++++-
.../api/connection/AbstractConnectionClient.java | 11 +++++--
.../SingleProtocolConnectionManager.java | 34 +++++++++++++++++-----
3 files changed, 41 insertions(+), 12 deletions(-)
diff --git
a/dubbo-plugin/dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache
b/dubbo-plugin/dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache
index 525eb8d634..6f19d17fb1 100644
--- a/dubbo-plugin/dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache
+++ b/dubbo-plugin/dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache
@@ -30,6 +30,7 @@ import org.apache.dubbo.rpc.model.MethodDescriptor;
import org.apache.dubbo.rpc.model.ServiceDescriptor;
import org.apache.dubbo.rpc.model.StubMethodDescriptor;
import org.apache.dubbo.rpc.model.StubServiceDescriptor;
+import org.apache.dubbo.rpc.service.Destroyable;
import org.apache.dubbo.rpc.stub.BiStreamMethodHandler;
import org.apache.dubbo.rpc.stub.ServerStreamMethodHandler;
import org.apache.dubbo.rpc.stub.StubInvocationUtil;
@@ -130,13 +131,18 @@ public final class {{className}} {
{{/biStreamingWithoutClientStreamMethods}}
}
- public static class {{interfaceClassName}}Stub implements
{{interfaceClassName}}{
+ public static class {{interfaceClassName}}Stub implements
{{interfaceClassName}}, Destroyable {
private final Invoker<{{interfaceClassName}}> invoker;
public {{interfaceClassName}}Stub(Invoker<{{interfaceClassName}}>
invoker) {
this.invoker = invoker;
}
+ @Override
+ public void $destroy() {
+ invoker.destroy();
+ }
+
{{#unaryMethods}}
{{#javaDoc}}
{{{javaDoc}}}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/AbstractConnectionClient.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/AbstractConnectionClient.java
index 80aa710a28..2fea60ce9d 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/AbstractConnectionClient.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/AbstractConnectionClient.java
@@ -61,13 +61,17 @@ public abstract class AbstractConnectionClient extends
AbstractClient {
/**
* Increments the reference count by 1.
*/
- public final AbstractConnectionClient retain() {
+ public final boolean retain() {
long oldCount = COUNTER_UPDATER.getAndIncrement(this);
if (oldCount <= 0) {
COUNTER_UPDATER.getAndDecrement(this);
- throw new AssertionError("This instance has been destroyed");
+ logger.info(
+ "Retain failed, because connection " + remote
+ + " has been destroyed but not yet removed, will
create a new one instead."
+ + " Check logs below to confirm that this
connection finally gets removed to make sure there's no potential memory
leak!");
+ return false;
}
- return this;
+ return true;
}
/**
@@ -77,6 +81,7 @@ public abstract class AbstractConnectionClient extends
AbstractClient {
long remainingCount = COUNTER_UPDATER.decrementAndGet(this);
if (remainingCount == 0) {
+ logger.info("Destroying connection to {}, because the reference
count reaches 0", remote);
destroy();
return true;
} else if (remainingCount <= -1) {
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/SingleProtocolConnectionManager.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/SingleProtocolConnectionManager.java
index 3f84069671..10f002df61 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/SingleProtocolConnectionManager.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/SingleProtocolConnectionManager.java
@@ -17,6 +17,8 @@
package org.apache.dubbo.remoting.api.connection;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
+import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.rpc.model.FrameworkModel;
@@ -26,6 +28,9 @@ import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
public class SingleProtocolConnectionManager implements ConnectionManager {
+ private static final ErrorTypeAwareLogger logger =
+
LoggerFactory.getErrorTypeAwareLogger(SingleProtocolConnectionManager.class);
+
public static final String NAME = "single";
private final ConcurrentMap<String, AbstractConnectionClient> connections
= new ConcurrentHashMap<>(16);
@@ -42,21 +47,34 @@ public class SingleProtocolConnectionManager implements
ConnectionManager {
throw new IllegalArgumentException("url == null");
}
return connections.compute(url.getAddress(), (address, conn) -> {
+ String transport = url.getParameter(Constants.TRANSPORTER_KEY,
"netty4");
if (conn == null) {
- String transport = url.getParameter(Constants.TRANSPORTER_KEY,
"netty4");
- ConnectionManager manager = frameworkModel
- .getExtensionLoader(ConnectionManager.class)
- .getExtension(transport);
- final AbstractConnectionClient connectionClient =
manager.connect(url, handler);
- connectionClient.addCloseListener(() ->
connections.remove(address, connectionClient));
- return connectionClient;
+ return createAbstractConnectionClient(url, handler, address,
transport);
} else {
- conn.retain();
+ boolean shouldReuse = conn.retain();
+ if (!shouldReuse) {
+ logger.info("Trying to create a new connection for {}.",
address);
+ return createAbstractConnectionClient(url, handler,
address, transport);
+ }
return conn;
}
});
}
+ private AbstractConnectionClient createAbstractConnectionClient(
+ URL url, ChannelHandler handler, String address, String transport)
{
+ ConnectionManager manager =
+
frameworkModel.getExtensionLoader(ConnectionManager.class).getExtension(transport);
+ final AbstractConnectionClient connectionClient = manager.connect(url,
handler);
+ connectionClient.addCloseListener(() -> {
+ logger.info(
+ "Remove closed connection (with reference count==0) for
address {}, a new one will be created for upcoming RPC requests routing to this
address.",
+ address);
+ connections.remove(address, connectionClient);
+ });
+ return connectionClient;
+ }
+
@Override
public void forEachConnection(Consumer<AbstractConnectionClient>
connectionConsumer) {
connections.values().forEach(connectionConsumer);