This is an automated email from the ASF dual-hosted git repository.
icodening pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 069fd5e1ce Fix triple client stream executor (#13178)
069fd5e1ce is described below
commit 069fd5e1ce1009e230b4d392602e8be327d34a85
Author: TomlongTK <[email protected]>
AuthorDate: Wed Oct 11 23:23:25 2023 +0800
Fix triple client stream executor (#13178)
* Fix error stream executor
* add ping pong for tri protocol (#12955)
* add ping pong for tri protocol
* add ping pong for tri protocol
* add ping pong for tri protocol
* add ping pong for tri protocol
* Revert "add ping pong for tri protocol (#12955)"
This reverts commit 5a3be17bf133e023c1a1627406c2c6539cdf0bb1.
---------
Co-authored-by: Albumen Kevin <[email protected]>
Co-authored-by: icodening <[email protected]>
---
.../threadpool/support/cached/CachedThreadPool.java | 17 +++++++++++++----
.../threadpool/support/limited/LimitedThreadPool.java | 17 +++++++++++++----
.../apache/dubbo/remoting/transport/AbstractClient.java | 3 +--
.../transport/netty4/NettyConnectionClient.java | 10 ++--------
.../apache/dubbo/rpc/protocol/tri/TripleProtocol.java | 14 +++++++++-----
5 files changed, 38 insertions(+), 23 deletions(-)
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/cached/CachedThreadPool.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/cached/CachedThreadPool.java
index b053bd9337..90419c4d90 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/cached/CachedThreadPool.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/cached/CachedThreadPool.java
@@ -22,6 +22,7 @@ import
org.apache.dubbo.common.threadpool.MemorySafeLinkedBlockingQueue;
import org.apache.dubbo.common.threadpool.ThreadPool;
import org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
@@ -53,10 +54,18 @@ public class CachedThreadPool implements ThreadPool {
int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
- return new ThreadPoolExecutor(cores, threads, alive,
TimeUnit.MILLISECONDS,
- queues == 0 ? new SynchronousQueue<Runnable>() :
- (queues < 0 ? new
MemorySafeLinkedBlockingQueue<Runnable>()
- : new LinkedBlockingQueue<Runnable>(queues)),
+
+ BlockingQueue<Runnable> blockingQueue;
+
+ if (queues == 0) {
+ blockingQueue = new SynchronousQueue<>();
+ } else if (queues < 0) {
+ blockingQueue = new MemorySafeLinkedBlockingQueue<>();
+ } else {
+ blockingQueue = new LinkedBlockingQueue<>(queues);
+ }
+
+ return new ThreadPoolExecutor(cores, threads, alive,
TimeUnit.MILLISECONDS, blockingQueue,
new NamedInternalThreadFactory(name, true), new
AbortPolicyWithReport(name, url));
}
}
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/limited/LimitedThreadPool.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/limited/LimitedThreadPool.java
index f0203e6676..de70f3c512 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/limited/LimitedThreadPool.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/limited/LimitedThreadPool.java
@@ -23,6 +23,7 @@ import
org.apache.dubbo.common.threadpool.MemorySafeLinkedBlockingQueue;
import org.apache.dubbo.common.threadpool.ThreadPool;
import org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
@@ -50,10 +51,18 @@ public class LimitedThreadPool implements ThreadPool {
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
- return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE,
TimeUnit.MILLISECONDS,
- queues == 0 ? new SynchronousQueue<Runnable>() :
- (queues < 0 ? new
MemorySafeLinkedBlockingQueue<Runnable>()
- : new LinkedBlockingQueue<Runnable>(queues)),
+
+ BlockingQueue<Runnable> blockingQueue;
+
+ if (queues == 0) {
+ blockingQueue = new SynchronousQueue<>();
+ } else if (queues < 0) {
+ blockingQueue = new MemorySafeLinkedBlockingQueue<>();
+ } else {
+ blockingQueue = new LinkedBlockingQueue<>(queues);
+ }
+
+ return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE,
TimeUnit.MILLISECONDS, blockingQueue,
new NamedInternalThreadFactory(name, true), new
AbortPolicyWithReport(name, url));
}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
index a2cba73bc4..71392adf82 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
@@ -40,14 +40,13 @@ import static
org.apache.dubbo.common.constants.CommonConstants.THREAD_NAME_KEY;
import static
org.apache.dubbo.common.constants.CommonConstants.LAZY_CONNECT_KEY;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CLOSE;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CONNECT_PROVIDER;
+import static org.apache.dubbo.config.Constants.CLIENT_THREAD_POOL_NAME;
/**
* AbstractClient
*/
public abstract class AbstractClient extends AbstractEndpoint implements
Client {
- protected static final String CLIENT_THREAD_POOL_NAME =
"DubboClientHandler";
-
private static final ErrorTypeAwareLogger logger =
LoggerFactory.getErrorTypeAwareLogger(AbstractClient.class);
private final Lock connectLock = new ReentrantLock();
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
index e585bff61b..d60807f836 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
@@ -20,7 +20,6 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
@@ -29,6 +28,7 @@ import org.apache.dubbo.remoting.api.WireProtocol;
import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
import org.apache.dubbo.remoting.transport.netty4.ssl.SslClientTlsHandler;
import org.apache.dubbo.remoting.transport.netty4.ssl.SslContexts;
+import org.apache.dubbo.remoting.utils.UrlUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
@@ -45,15 +45,12 @@ import io.netty.util.AttributeKey;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
-import org.apache.dubbo.remoting.utils.UrlUtils;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static
org.apache.dubbo.common.constants.CommonConstants.DEFAULT_CLIENT_THREADPOOL;
-import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_CLIENT_CONNECT_TIMEOUT;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CONNECT_PROVIDER;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_RECONNECT;
@@ -82,10 +79,7 @@ public class NettyConnectionClient extends
AbstractConnectionClient {
@Override
protected void initConnectionClient() {
- URL url = ExecutorUtil.setThreadName(getUrl(), "DubboClientHandler");
- url = url.addParameterIfAbsent(THREADPOOL_KEY,
DEFAULT_CLIENT_THREADPOOL);
- setUrl(url);
- this.protocol =
url.getOrDefaultFrameworkModel().getExtensionLoader(WireProtocol.class).getExtension(url.getProtocol());
+ this.protocol =
getUrl().getOrDefaultFrameworkModel().getExtensionLoader(WireProtocol.class).getExtension(getUrl().getProtocol());
this.remote = getConnectAddress();
this.connectingPromise = new AtomicReference<>();
this.connectionListener = new ConnectionListener();
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
index 4ec09e4a56..8f9925da79 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
@@ -44,6 +44,10 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
+import static
org.apache.dubbo.common.constants.CommonConstants.DEFAULT_CLIENT_THREADPOOL;
+import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
+import static
org.apache.dubbo.common.constants.CommonConstants.THREAD_NAME_KEY;
+import static org.apache.dubbo.config.Constants.CLIENT_THREAD_POOL_NAME;
import static org.apache.dubbo.config.Constants.SERVER_THREAD_POOL_NAME;
import static org.apache.dubbo.rpc.Constants.H2_IGNORE_1_0_0_KEY;
import static
org.apache.dubbo.rpc.Constants.H2_RESOLVE_FALLBACK_TO_DEFAULT_KEY;
@@ -154,8 +158,7 @@ public class TripleProtocol extends AbstractProtocol {
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
optimizeSerialization(url);
- ExecutorService streamExecutor = getOrCreateStreamExecutor(
- url.getOrDefaultApplicationModel(), url);
+ ExecutorService streamExecutor =
getOrCreateStreamExecutor(url.getOrDefaultApplicationModel(), url);
AbstractConnectionClient connectionClient =
PortUnificationExchanger.connect(url, new DefaultPuHandler());
TripleInvoker<T> invoker = new TripleInvoker<>(type, url,
acceptEncodings,
connectionClient, invokers, streamExecutor);
@@ -164,9 +167,10 @@ public class TripleProtocol extends AbstractProtocol {
}
private ExecutorService getOrCreateStreamExecutor(ApplicationModel
applicationModel, URL url) {
- ExecutorService executor =
ExecutorRepository.getInstance(applicationModel).createExecutorIfAbsent(ExecutorUtil.setThreadName(url,
SERVER_THREAD_POOL_NAME));
- Objects.requireNonNull(executor,
- String.format("No available executor found in %s", url));
+ url = url.addParameter(THREAD_NAME_KEY, CLIENT_THREAD_POOL_NAME)
+ .addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
+ ExecutorService executor =
ExecutorRepository.getInstance(applicationModel).createExecutorIfAbsent(url);
+ Objects.requireNonNull(executor, String.format("No available executor
found in %s", url));
return executor;
}