This is an automated email from the ASF dual-hosted git repository.

icodening pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 069fd5e1ce Fix triple client stream executor (#13178)
069fd5e1ce is described below

commit 069fd5e1ce1009e230b4d392602e8be327d34a85
Author: TomlongTK <[email protected]>
AuthorDate: Wed Oct 11 23:23:25 2023 +0800

    Fix triple client stream executor (#13178)
    
    * Fix error stream executor
    
    * add ping pong for tri protocol (#12955)
    
    * add ping pong for tri protocol
    
    * add ping pong for tri protocol
    
    * add ping pong for tri protocol
    
    * add ping pong for tri protocol
    
    * Revert "add ping pong for tri protocol (#12955)"
    
    This reverts commit 5a3be17bf133e023c1a1627406c2c6539cdf0bb1.
    
    ---------
    
    Co-authored-by: Albumen Kevin <[email protected]>
    Co-authored-by: icodening <[email protected]>
---
 .../threadpool/support/cached/CachedThreadPool.java     | 17 +++++++++++++----
 .../threadpool/support/limited/LimitedThreadPool.java   | 17 +++++++++++++----
 .../apache/dubbo/remoting/transport/AbstractClient.java |  3 +--
 .../transport/netty4/NettyConnectionClient.java         | 10 ++--------
 .../apache/dubbo/rpc/protocol/tri/TripleProtocol.java   | 14 +++++++++-----
 5 files changed, 38 insertions(+), 23 deletions(-)

diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/cached/CachedThreadPool.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/cached/CachedThreadPool.java
index b053bd9337..90419c4d90 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/cached/CachedThreadPool.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/cached/CachedThreadPool.java
@@ -22,6 +22,7 @@ import org.apache.dubbo.common.threadpool.MemorySafeLinkedBlockingQueue;
 import org.apache.dubbo.common.threadpool.ThreadPool;
 import org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport;
 
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SynchronousQueue;
@@ -53,10 +54,18 @@ public class CachedThreadPool implements ThreadPool {
         int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
         int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
         int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
-        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
-                queues == 0 ? new SynchronousQueue<Runnable>() :
-                        (queues < 0 ? new MemorySafeLinkedBlockingQueue<Runnable>()
-                                : new LinkedBlockingQueue<Runnable>(queues)),
+
+        BlockingQueue<Runnable> blockingQueue;
+
+        if (queues == 0) {
+            blockingQueue = new SynchronousQueue<>();
+        } else if (queues < 0) {
+            blockingQueue = new MemorySafeLinkedBlockingQueue<>();
+        } else {
+            blockingQueue = new LinkedBlockingQueue<>(queues);
+        }
+
+        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, blockingQueue,
                 new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
     }
 }
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/limited/LimitedThreadPool.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/limited/LimitedThreadPool.java
index f0203e6676..de70f3c512 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/limited/LimitedThreadPool.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/limited/LimitedThreadPool.java
@@ -23,6 +23,7 @@ import org.apache.dubbo.common.threadpool.MemorySafeLinkedBlockingQueue;
 import org.apache.dubbo.common.threadpool.ThreadPool;
 import org.apache.dubbo.common.threadpool.support.AbortPolicyWithReport;
 
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SynchronousQueue;
@@ -50,10 +51,18 @@ public class LimitedThreadPool implements ThreadPool {
         int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
         int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
         int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
-        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
-                queues == 0 ? new SynchronousQueue<Runnable>() :
-                        (queues < 0 ? new MemorySafeLinkedBlockingQueue<Runnable>()
-                                : new LinkedBlockingQueue<Runnable>(queues)),
+
+        BlockingQueue<Runnable> blockingQueue;
+
+        if (queues == 0) {
+            blockingQueue = new SynchronousQueue<>();
+        } else if (queues < 0) {
+            blockingQueue = new MemorySafeLinkedBlockingQueue<>();
+        } else {
+            blockingQueue = new LinkedBlockingQueue<>(queues);
+        }
+
+        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS, blockingQueue,
                 new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
     }
 
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
index a2cba73bc4..71392adf82 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
@@ -40,14 +40,13 @@ import static org.apache.dubbo.common.constants.CommonConstants.THREAD_NAME_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.LAZY_CONNECT_KEY;
 import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CLOSE;
 import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CONNECT_PROVIDER;
+import static org.apache.dubbo.config.Constants.CLIENT_THREAD_POOL_NAME;
 
 /**
  * AbstractClient
  */
 public abstract class AbstractClient extends AbstractEndpoint implements Client {
 
-    protected static final String CLIENT_THREAD_POOL_NAME = "DubboClientHandler";
-
     private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbstractClient.class);
 
     private final Lock connectLock = new ReentrantLock();
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
index e585bff61b..d60807f836 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
@@ -20,7 +20,6 @@ import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.Version;
 import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
 import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.utils.ExecutorUtil;
 import org.apache.dubbo.common.utils.NetUtils;
 import org.apache.dubbo.remoting.Channel;
 import org.apache.dubbo.remoting.ChannelHandler;
@@ -29,6 +28,7 @@ import org.apache.dubbo.remoting.api.WireProtocol;
 import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
 import org.apache.dubbo.remoting.transport.netty4.ssl.SslClientTlsHandler;
 import org.apache.dubbo.remoting.transport.netty4.ssl.SslContexts;
+import org.apache.dubbo.remoting.utils.UrlUtils;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
@@ -45,15 +45,12 @@ import io.netty.util.AttributeKey;
 import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.GlobalEventExecutor;
 import io.netty.util.concurrent.Promise;
-import org.apache.dubbo.remoting.utils.UrlUtils;
 
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_CLIENT_THREADPOOL;
-import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
 import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_CLIENT_CONNECT_TIMEOUT;
 import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CONNECT_PROVIDER;
 import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_RECONNECT;
@@ -82,10 +79,7 @@ public class NettyConnectionClient extends AbstractConnectionClient {
 
     @Override
     protected void initConnectionClient() {
-        URL url = ExecutorUtil.setThreadName(getUrl(), "DubboClientHandler");
-        url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
-        setUrl(url);
-        this.protocol = url.getOrDefaultFrameworkModel().getExtensionLoader(WireProtocol.class).getExtension(url.getProtocol());
+        this.protocol = getUrl().getOrDefaultFrameworkModel().getExtensionLoader(WireProtocol.class).getExtension(getUrl().getProtocol());
         this.remote = getConnectAddress();
         this.connectingPromise = new AtomicReference<>();
         this.connectionListener = new ConnectionListener();
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
index 4ec09e4a56..8f9925da79 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
@@ -44,6 +44,10 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_CLIENT_THREADPOOL;
+import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.THREAD_NAME_KEY;
+import static org.apache.dubbo.config.Constants.CLIENT_THREAD_POOL_NAME;
 import static org.apache.dubbo.config.Constants.SERVER_THREAD_POOL_NAME;
 import static org.apache.dubbo.rpc.Constants.H2_IGNORE_1_0_0_KEY;
 import static org.apache.dubbo.rpc.Constants.H2_RESOLVE_FALLBACK_TO_DEFAULT_KEY;
@@ -154,8 +158,7 @@ public class TripleProtocol extends AbstractProtocol {
     @Override
     public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
         optimizeSerialization(url);
-        ExecutorService streamExecutor = getOrCreateStreamExecutor(
-            url.getOrDefaultApplicationModel(), url);
+        ExecutorService streamExecutor = getOrCreateStreamExecutor(url.getOrDefaultApplicationModel(), url);
         AbstractConnectionClient connectionClient = PortUnificationExchanger.connect(url, new DefaultPuHandler());
         TripleInvoker<T> invoker = new TripleInvoker<>(type, url, acceptEncodings,
             connectionClient, invokers, streamExecutor);
@@ -164,9 +167,10 @@ public class TripleProtocol extends AbstractProtocol {
     }
 
     private ExecutorService getOrCreateStreamExecutor(ApplicationModel applicationModel, URL url) {
-        ExecutorService executor = ExecutorRepository.getInstance(applicationModel).createExecutorIfAbsent(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME));
-        Objects.requireNonNull(executor,
-            String.format("No available executor found in %s", url));
+        url = url.addParameter(THREAD_NAME_KEY, CLIENT_THREAD_POOL_NAME)
+            .addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
+        ExecutorService executor = ExecutorRepository.getInstance(applicationModel).createExecutorIfAbsent(url);
+        Objects.requireNonNull(executor, String.format("No available executor found in %s", url));
         return executor;
     }
 

Reply via email to