This is an automated email from the ASF dual-hosted git repository.

earthchen pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new f03823eec1 Use ExecutorService instead of event loop for Netty 
connection (#13904)
f03823eec1 is described below

commit f03823eec1610ab40fa894e279614703617ab29b
Author: hanpen24 <[email protected]>
AuthorDate: Sun Apr 7 11:31:23 2024 +0900

    Use ExecutorService instead of event loop for Netty connection (#13904)
    
    * Use ExecutorService instead of event loop for Netty connection
    
    This change addresses the issue of the event loop being blocked for an 
extended period, improving overall performance and responsiveness.
    
    * fix log getConnectAddress
    
    * applay format
    
    * applay format
    
    * applay format
    
    * applay format
    
    * applay format
    
    * Use an independent ExecutorService
    
    * Use ScheduledExecutorService for scheduling tasks
    
    * delete unnecessary files
    
    * Modify to stop ExecutorService using shutdownNow
    
    * Modify to use ScheduledExecutor managed by FrameworkExecutorRepository
    
    * Synchronize reconnectDuration with HeaderExchangeClient's 
reconnectDuration
    
    * get framework model directly
    
    use awaitility
---
 .../dubbo/remoting/transport/AbstractClient.java   | 42 ++++++++++++++++++++++
 .../transport/netty4/NettyConnectionClient.java    | 21 ++++++++---
 .../remoting/transport/netty4/ConnectionTest.java  |  3 ++
 3 files changed, 61 insertions(+), 5 deletions(-)

diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
index 7b465c6649..7be50fff2e 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
@@ -21,6 +21,7 @@ import org.apache.dubbo.common.Version;
 import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
+import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
 import org.apache.dubbo.common.utils.NetUtils;
 import org.apache.dubbo.remoting.Channel;
 import org.apache.dubbo.remoting.ChannelHandler;
@@ -28,9 +29,11 @@ import org.apache.dubbo.remoting.Client;
 import org.apache.dubbo.remoting.Constants;
 import org.apache.dubbo.remoting.RemotingException;
 import org.apache.dubbo.remoting.transport.dispatcher.ChannelHandlers;
+import org.apache.dubbo.rpc.model.FrameworkModel;
 
 import java.net.InetSocketAddress;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -41,6 +44,11 @@ import static 
org.apache.dubbo.common.constants.CommonConstants.THREAD_NAME_KEY;
 import static 
org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CLOSE;
 import static 
org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CONNECT_PROVIDER;
 import static org.apache.dubbo.config.Constants.CLIENT_THREAD_POOL_NAME;
+import static org.apache.dubbo.remoting.Constants.HEARTBEAT_CHECK_TICK;
+import static org.apache.dubbo.remoting.Constants.LEAST_HEARTBEAT_DURATION;
+import static org.apache.dubbo.remoting.Constants.LEAST_RECONNECT_DURATION;
+import static org.apache.dubbo.remoting.Constants.LEAST_RECONNECT_DURATION_KEY;
+import static org.apache.dubbo.remoting.utils.UrlUtils.getIdleTimeout;
 
 /**
  * AbstractClient
@@ -55,13 +63,23 @@ public abstract class AbstractClient extends 
AbstractEndpoint implements Client
 
     protected volatile ExecutorService executor;
 
+    protected volatile ScheduledExecutorService connectivityExecutor;
+
+    private FrameworkModel frameworkModel;
+
+    protected long reconnectDuaration;
+
     public AbstractClient(URL url, ChannelHandler handler) throws 
RemotingException {
         super(url, handler);
         // set default needReconnect true when channel is not connected
         needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, true);
 
+        frameworkModel = url.getOrDefaultFrameworkModel();
+
         initExecutor(url);
 
+        reconnectDuaration = getReconnectDuration(url);
+
         try {
             doOpen();
         } catch (Throwable t) {
@@ -134,6 +152,11 @@ public abstract class AbstractClient extends 
AbstractEndpoint implements Client
         url = url.addParameter(THREAD_NAME_KEY, CLIENT_THREAD_POOL_NAME)
                 .addParameterIfAbsent(THREADPOOL_KEY, 
DEFAULT_CLIENT_THREADPOOL);
         executor = executorRepository.createExecutorIfAbsent(url);
+
+        connectivityExecutor = frameworkModel
+                .getBeanFactory()
+                .getBean(FrameworkExecutorRepository.class)
+                .getConnectivityScheduledExecutor();
     }
 
     protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler 
handler) {
@@ -296,6 +319,25 @@ public abstract class AbstractClient extends 
AbstractEndpoint implements Client
         }
     }
 
+    private long getReconnectDuration(URL url) {
+        int idleTimeout = getIdleTimeout(url);
+        long heartbeatTimeoutTick = calculateLeastDuration(idleTimeout);
+        return calculateReconnectDuration(url, heartbeatTimeoutTick);
+    }
+
+    private long calculateLeastDuration(int time) {
+        if (time / HEARTBEAT_CHECK_TICK <= 0) {
+            return LEAST_HEARTBEAT_DURATION;
+        } else {
+            return time / HEARTBEAT_CHECK_TICK;
+        }
+    }
+
+    private long calculateReconnectDuration(URL url, long tick) {
+        long leastReconnectDuration = 
url.getParameter(LEAST_RECONNECT_DURATION_KEY, LEAST_RECONNECT_DURATION);
+        return Math.max(leastReconnectDuration, tick);
+    }
+
     @Override
     public void reconnect() throws RemotingException {
         connectLock.lock();
diff --git 
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
 
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
index ab8f39bcf7..85cd565173 100644
--- 
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
+++ 
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
@@ -43,7 +43,6 @@ import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoop;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.timeout.IdleStateHandler;
@@ -75,6 +74,8 @@ public class NettyConnectionClient extends 
AbstractConnectionClient {
 
     public static final AttributeKey<AbstractConnectionClient> CONNECTION = 
AttributeKey.valueOf("connection");
 
+    private AtomicBoolean isReconnecting;
+
     public NettyConnectionClient(URL url, ChannelHandler handler) throws 
RemotingException {
         super(url, handler);
     }
@@ -91,6 +92,7 @@ public class NettyConnectionClient extends 
AbstractConnectionClient {
         this.closePromise = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
         this.init = new AtomicBoolean(false);
         this.increase();
+        this.isReconnecting = new AtomicBoolean(false);
     }
 
     @Override
@@ -158,6 +160,10 @@ public class NettyConnectionClient extends 
AbstractConnectionClient {
 
     @Override
     protected void doConnect() throws RemotingException {
+        if (!isReconnecting.compareAndSet(false, true)) {
+            return;
+        }
+
         if (isClosed()) {
             if (LOGGER.isDebugEnabled()) {
                 LOGGER.debug(
@@ -347,6 +353,11 @@ public class NettyConnectionClient extends 
AbstractConnectionClient {
 
         @Override
         public void operationComplete(ChannelFuture future) {
+
+            if (!isReconnecting.compareAndSet(true, false)) {
+                return;
+            }
+
             if (future.isSuccess()) {
                 return;
             }
@@ -364,8 +375,8 @@ public class NettyConnectionClient extends 
AbstractConnectionClient {
                         "%s is reconnecting, attempt=%d cause=%s",
                         connectionClient, 0, future.cause().getMessage()));
             }
-            final EventLoop loop = future.channel().eventLoop();
-            loop.schedule(
+
+            connectivityExecutor.schedule(
                     () -> {
                         try {
                             connectionClient.doConnect();
@@ -377,8 +388,8 @@ public class NettyConnectionClient extends 
AbstractConnectionClient {
                                     "Failed to connect to server: " + 
getConnectAddress());
                         }
                     },
-                    1L,
-                    TimeUnit.SECONDS);
+                    reconnectDuaration,
+                    TimeUnit.MILLISECONDS);
         }
     }
 }
diff --git 
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ConnectionTest.java
 
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ConnectionTest.java
index 37b4439bba..c90418fa4d 100644
--- 
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ConnectionTest.java
+++ 
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ConnectionTest.java
@@ -28,6 +28,7 @@ import org.apache.dubbo.remoting.api.pu.DefaultPuHandler;
 import org.apache.dubbo.rpc.model.ApplicationModel;
 import org.apache.dubbo.rpc.model.ModuleModel;
 
+import java.time.Duration;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -39,6 +40,7 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 import static 
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_DEFAULT;
+import static org.awaitility.Awaitility.await;
 
 public class ConnectionTest {
 
@@ -138,6 +140,7 @@ public class ConnectionTest {
 
         nettyPortUnificationServer.bind();
         // auto reconnect
+        await().atMost(Duration.ofSeconds(100)).until(() -> 
connectionClient.isAvailable());
         Assertions.assertTrue(connectionClient.isAvailable());
 
         connectionClient.close();

Reply via email to