This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new e1cfc037eb Fix triple client connection shareing race condition 
(#14718)
e1cfc037eb is described below

commit e1cfc037ebdc38f421e0bc2abde56b0c0abeabd1
Author: Ken Liu <[email protected]>
AuthorDate: Fri Sep 27 17:30:35 2024 +0800

    Fix triple client connection shareing race condition (#14718)
    
    * fix triple client connection management issue, #14717, #14716
    
    * fix triple client connection management issue, #14717, #14716
    
    * add comment
    
    * polish warning log format
    
    * polish warning log format
    
    * Make stub implement Destroyable
    
    * format code
---
 .../src/main/resources/Dubbo3TripleStub.mustache   |  8 ++++-
 .../api/connection/AbstractConnectionClient.java   | 11 +++++--
 .../SingleProtocolConnectionManager.java           | 34 +++++++++++++++++-----
 3 files changed, 41 insertions(+), 12 deletions(-)

diff --git 
a/dubbo-plugin/dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache 
b/dubbo-plugin/dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache
index 525eb8d634..6f19d17fb1 100644
--- a/dubbo-plugin/dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache
+++ b/dubbo-plugin/dubbo-compiler/src/main/resources/Dubbo3TripleStub.mustache
@@ -30,6 +30,7 @@ import org.apache.dubbo.rpc.model.MethodDescriptor;
 import org.apache.dubbo.rpc.model.ServiceDescriptor;
 import org.apache.dubbo.rpc.model.StubMethodDescriptor;
 import org.apache.dubbo.rpc.model.StubServiceDescriptor;
+import org.apache.dubbo.rpc.service.Destroyable;
 import org.apache.dubbo.rpc.stub.BiStreamMethodHandler;
 import org.apache.dubbo.rpc.stub.ServerStreamMethodHandler;
 import org.apache.dubbo.rpc.stub.StubInvocationUtil;
@@ -130,13 +131,18 @@ public final class {{className}} {
     {{/biStreamingWithoutClientStreamMethods}}
     }
 
-    public static class {{interfaceClassName}}Stub implements 
{{interfaceClassName}}{
+    public static class {{interfaceClassName}}Stub implements 
{{interfaceClassName}}, Destroyable {
         private final Invoker<{{interfaceClassName}}> invoker;
 
         public {{interfaceClassName}}Stub(Invoker<{{interfaceClassName}}> 
invoker) {
             this.invoker = invoker;
         }
 
+        @Override
+        public void $destroy() {
+              invoker.destroy();
+         }
+
     {{#unaryMethods}}
         {{#javaDoc}}
         {{{javaDoc}}}
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/AbstractConnectionClient.java
 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/AbstractConnectionClient.java
index 80aa710a28..2fea60ce9d 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/AbstractConnectionClient.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/AbstractConnectionClient.java
@@ -61,13 +61,17 @@ public abstract class AbstractConnectionClient extends 
AbstractClient {
     /**
      * Increments the reference count by 1.
      */
-    public final AbstractConnectionClient retain() {
+    public final boolean retain() {
         long oldCount = COUNTER_UPDATER.getAndIncrement(this);
         if (oldCount <= 0) {
             COUNTER_UPDATER.getAndDecrement(this);
-            throw new AssertionError("This instance has been destroyed");
+            logger.info(
+                    "Retain failed, because connection " + remote
+                            + " has been destroyed but not yet removed, will 
create a new one instead."
+                            + " Check logs below to confirm that this 
connection finally gets removed to make sure there's no potential memory 
leak!");
+            return false;
         }
-        return this;
+        return true;
     }
 
     /**
@@ -77,6 +81,7 @@ public abstract class AbstractConnectionClient extends 
AbstractClient {
         long remainingCount = COUNTER_UPDATER.decrementAndGet(this);
 
         if (remainingCount == 0) {
+            logger.info("Destroying connection to {}, because the reference 
count reaches 0", remote);
             destroy();
             return true;
         } else if (remainingCount <= -1) {
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/SingleProtocolConnectionManager.java
 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/SingleProtocolConnectionManager.java
index 3f84069671..10f002df61 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/SingleProtocolConnectionManager.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/SingleProtocolConnectionManager.java
@@ -17,6 +17,8 @@
 package org.apache.dubbo.remoting.api.connection;
 
 import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
+import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.remoting.ChannelHandler;
 import org.apache.dubbo.remoting.Constants;
 import org.apache.dubbo.rpc.model.FrameworkModel;
@@ -26,6 +28,9 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.function.Consumer;
 
 public class SingleProtocolConnectionManager implements ConnectionManager {
+    private static final ErrorTypeAwareLogger logger =
+            
LoggerFactory.getErrorTypeAwareLogger(SingleProtocolConnectionManager.class);
+
     public static final String NAME = "single";
 
     private final ConcurrentMap<String, AbstractConnectionClient> connections 
= new ConcurrentHashMap<>(16);
@@ -42,21 +47,34 @@ public class SingleProtocolConnectionManager implements 
ConnectionManager {
             throw new IllegalArgumentException("url == null");
         }
         return connections.compute(url.getAddress(), (address, conn) -> {
+            String transport = url.getParameter(Constants.TRANSPORTER_KEY, 
"netty4");
             if (conn == null) {
-                String transport = url.getParameter(Constants.TRANSPORTER_KEY, 
"netty4");
-                ConnectionManager manager = frameworkModel
-                        .getExtensionLoader(ConnectionManager.class)
-                        .getExtension(transport);
-                final AbstractConnectionClient connectionClient = 
manager.connect(url, handler);
-                connectionClient.addCloseListener(() -> 
connections.remove(address, connectionClient));
-                return connectionClient;
+                return createAbstractConnectionClient(url, handler, address, 
transport);
             } else {
-                conn.retain();
+                boolean shouldReuse = conn.retain();
+                if (!shouldReuse) {
+                    logger.info("Trying to create a new connection for {}.", 
address);
+                    return createAbstractConnectionClient(url, handler, 
address, transport);
+                }
                 return conn;
             }
         });
     }
 
+    private AbstractConnectionClient createAbstractConnectionClient(
+            URL url, ChannelHandler handler, String address, String transport) 
{
+        ConnectionManager manager =
+                
frameworkModel.getExtensionLoader(ConnectionManager.class).getExtension(transport);
+        final AbstractConnectionClient connectionClient = manager.connect(url, 
handler);
+        connectionClient.addCloseListener(() -> {
+            logger.info(
+                    "Remove closed connection (with reference count==0) for 
address {}, a new one will be created for upcoming RPC requests routing to this 
address.",
+                    address);
+            connections.remove(address, connectionClient);
+        });
+        return connectionClient;
+    }
+
     @Override
     public void forEachConnection(Consumer<AbstractConnectionClient> 
connectionConsumer) {
         connections.values().forEach(connectionConsumer);

Reply via email to