This is an automated email from the ASF dual-hosted git repository.
earthchen pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new f03823eec1 Use ExecutorService instead of event loop for Netty
connection (#13904)
f03823eec1 is described below
commit f03823eec1610ab40fa894e279614703617ab29b
Author: hanpen24 <[email protected]>
AuthorDate: Sun Apr 7 11:31:23 2024 +0900
Use ExecutorService instead of event loop for Netty connection (#13904)
* Use ExecutorService instead of event loop for Netty connection
This change addresses the issue of the event loop being blocked for an
extended period, improving overall performance and responsiveness.
* fix log getConnectAddress
* apply format
* apply format
* apply format
* apply format
* apply format
* Use an independent ExecutorService
* Use ScheduledExecutorService for scheduling tasks
* delete unnecessary files
* Modify to stop ExecutorService using shutdownNow
* Modify to use ScheduledExecutor managed by FrameworkExecutorRepository
* Synchronize reconnectDuration with HeaderExchangeClient's
reconnectDuration
* get framework model directly
use awaitility
---
.../dubbo/remoting/transport/AbstractClient.java | 42 ++++++++++++++++++++++
.../transport/netty4/NettyConnectionClient.java | 21 ++++++++---
.../remoting/transport/netty4/ConnectionTest.java | 3 ++
3 files changed, 61 insertions(+), 5 deletions(-)
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
index 7b465c6649..7be50fff2e 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
@@ -21,6 +21,7 @@ import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
+import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
@@ -28,9 +29,11 @@ import org.apache.dubbo.remoting.Client;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.transport.dispatcher.ChannelHandlers;
+import org.apache.dubbo.rpc.model.FrameworkModel;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -41,6 +44,11 @@ import static
org.apache.dubbo.common.constants.CommonConstants.THREAD_NAME_KEY;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CLOSE;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CONNECT_PROVIDER;
import static org.apache.dubbo.config.Constants.CLIENT_THREAD_POOL_NAME;
+import static org.apache.dubbo.remoting.Constants.HEARTBEAT_CHECK_TICK;
+import static org.apache.dubbo.remoting.Constants.LEAST_HEARTBEAT_DURATION;
+import static org.apache.dubbo.remoting.Constants.LEAST_RECONNECT_DURATION;
+import static org.apache.dubbo.remoting.Constants.LEAST_RECONNECT_DURATION_KEY;
+import static org.apache.dubbo.remoting.utils.UrlUtils.getIdleTimeout;
/**
* AbstractClient
@@ -55,13 +63,23 @@ public abstract class AbstractClient extends
AbstractEndpoint implements Client
protected volatile ExecutorService executor;
+ protected volatile ScheduledExecutorService connectivityExecutor;
+
+ private FrameworkModel frameworkModel;
+
+ protected long reconnectDuaration;
+
public AbstractClient(URL url, ChannelHandler handler) throws
RemotingException {
super(url, handler);
// set default needReconnect true when channel is not connected
needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, true);
+ frameworkModel = url.getOrDefaultFrameworkModel();
+
initExecutor(url);
+ reconnectDuaration = getReconnectDuration(url);
+
try {
doOpen();
} catch (Throwable t) {
@@ -134,6 +152,11 @@ public abstract class AbstractClient extends
AbstractEndpoint implements Client
url = url.addParameter(THREAD_NAME_KEY, CLIENT_THREAD_POOL_NAME)
.addParameterIfAbsent(THREADPOOL_KEY,
DEFAULT_CLIENT_THREADPOOL);
executor = executorRepository.createExecutorIfAbsent(url);
+
+ connectivityExecutor = frameworkModel
+ .getBeanFactory()
+ .getBean(FrameworkExecutorRepository.class)
+ .getConnectivityScheduledExecutor();
}
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler
handler) {
@@ -296,6 +319,25 @@ public abstract class AbstractClient extends
AbstractEndpoint implements Client
}
}
+ private long getReconnectDuration(URL url) {
+ int idleTimeout = getIdleTimeout(url);
+ long heartbeatTimeoutTick = calculateLeastDuration(idleTimeout);
+ return calculateReconnectDuration(url, heartbeatTimeoutTick);
+ }
+
+ private long calculateLeastDuration(int time) {
+ if (time / HEARTBEAT_CHECK_TICK <= 0) {
+ return LEAST_HEARTBEAT_DURATION;
+ } else {
+ return time / HEARTBEAT_CHECK_TICK;
+ }
+ }
+
+ private long calculateReconnectDuration(URL url, long tick) {
+ long leastReconnectDuration =
url.getParameter(LEAST_RECONNECT_DURATION_KEY, LEAST_RECONNECT_DURATION);
+ return Math.max(leastReconnectDuration, tick);
+ }
+
@Override
public void reconnect() throws RemotingException {
connectLock.lock();
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
index ab8f39bcf7..85cd565173 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
@@ -43,7 +43,6 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoop;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.timeout.IdleStateHandler;
@@ -75,6 +74,8 @@ public class NettyConnectionClient extends
AbstractConnectionClient {
public static final AttributeKey<AbstractConnectionClient> CONNECTION =
AttributeKey.valueOf("connection");
+ private AtomicBoolean isReconnecting;
+
public NettyConnectionClient(URL url, ChannelHandler handler) throws
RemotingException {
super(url, handler);
}
@@ -91,6 +92,7 @@ public class NettyConnectionClient extends
AbstractConnectionClient {
this.closePromise = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
this.init = new AtomicBoolean(false);
this.increase();
+ this.isReconnecting = new AtomicBoolean(false);
}
@Override
@@ -158,6 +160,10 @@ public class NettyConnectionClient extends
AbstractConnectionClient {
@Override
protected void doConnect() throws RemotingException {
+ if (!isReconnecting.compareAndSet(false, true)) {
+ return;
+ }
+
if (isClosed()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
@@ -347,6 +353,11 @@ public class NettyConnectionClient extends
AbstractConnectionClient {
@Override
public void operationComplete(ChannelFuture future) {
+
+ if (!isReconnecting.compareAndSet(true, false)) {
+ return;
+ }
+
if (future.isSuccess()) {
return;
}
@@ -364,8 +375,8 @@ public class NettyConnectionClient extends
AbstractConnectionClient {
"%s is reconnecting, attempt=%d cause=%s",
connectionClient, 0, future.cause().getMessage()));
}
- final EventLoop loop = future.channel().eventLoop();
- loop.schedule(
+
+ connectivityExecutor.schedule(
() -> {
try {
connectionClient.doConnect();
@@ -377,8 +388,8 @@ public class NettyConnectionClient extends
AbstractConnectionClient {
"Failed to connect to server: " +
getConnectAddress());
}
},
- 1L,
- TimeUnit.SECONDS);
+ reconnectDuaration,
+ TimeUnit.MILLISECONDS);
}
}
}
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ConnectionTest.java
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ConnectionTest.java
index 37b4439bba..c90418fa4d 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ConnectionTest.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ConnectionTest.java
@@ -28,6 +28,7 @@ import org.apache.dubbo.remoting.api.pu.DefaultPuHandler;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ModuleModel;
+import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -39,6 +40,7 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import static
org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_DEFAULT;
+import static org.awaitility.Awaitility.await;
public class ConnectionTest {
@@ -138,6 +140,7 @@ public class ConnectionTest {
nettyPortUnificationServer.bind();
// auto reconnect
+ await().atMost(Duration.ofSeconds(100)).until(() ->
connectionClient.isAvailable());
Assertions.assertTrue(connectionClient.isAvailable());
connectionClient.close();