Polish thread poll create method
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/6593294f Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/6593294f Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/6593294f Branch: refs/heads/rocketmq5 Commit: 6593294f07dec7039668e7f5807f530433c3bbda Parents: 489b1d8 Author: yukon <yu...@apache.org> Authored: Wed Sep 20 17:39:46 2017 +0800 Committer: yukon <yu...@apache.org> Committed: Wed Sep 20 20:37:15 2017 +0800 ---------------------------------------------------------------------- remoting-core/pom.xml | 2 +- .../rocketmq/remoting/external/ThreadUtils.java | 26 +++++++++++++------- .../impl/netty/NettyRemotingAbstract.java | 8 ++---- .../rpc/impl/client/SimpleClientImpl.java | 10 +++----- .../rpc/impl/server/SimpleServerImpl.java | 10 +++----- .../rpc/impl/service/RpcInstanceAbstract.java | 18 ++++---------- .../rpc/impl/service/RpcProxyCommon.java | 19 +++++--------- 7 files changed, 37 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6593294f/remoting-core/pom.xml ---------------------------------------------------------------------- diff --git a/remoting-core/pom.xml b/remoting-core/pom.xml index 702b826..997011b 100644 --- a/remoting-core/pom.xml +++ b/remoting-core/pom.xml @@ -77,7 +77,7 @@ <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> - <version>4.1.6.Final</version> + <version>4.1.15.Final</version> </dependency> <dependency> <groupId>com.alibaba</groupId> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6593294f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java index 1a80d20..5a50089 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java @@ -20,6 +20,7 @@ package org.apache.rocketmq.remoting.external; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -43,25 +44,32 @@ public final class ThreadUtils { int maximumPoolSize, long keepAliveTime, TimeUnit unit, - BlockingQueue<Runnable> workQueue, String processName, boolean isDaemon) { - return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newThreadFactory(processName, isDaemon)); + BlockingQueue<Runnable> workQueue, + String processName, boolean isDaemon) { + return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newGenericThreadFactory(processName, isDaemon)); + } + + public static ExecutorService newFixedThreadPool(int nThreads, int workQueueCapacity, String processName, boolean isDaemon) { + return new ThreadPoolExecutor( + nThreads, + nThreads, + 0, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(workQueueCapacity), + newGenericThreadFactory(processName, isDaemon)); } public static ExecutorService newSingleThreadExecutor(String processName, boolean isDaemon) { - return Executors.newSingleThreadExecutor(newThreadFactory(processName, isDaemon)); + return Executors.newSingleThreadExecutor(newGenericThreadFactory(processName, isDaemon)); } public static ScheduledExecutorService newSingleThreadScheduledExecutor(String processName, boolean isDaemon) { - return Executors.newSingleThreadScheduledExecutor(newThreadFactory(processName, isDaemon)); + return Executors.newSingleThreadScheduledExecutor(newGenericThreadFactory(processName, isDaemon)); } public static ScheduledExecutorService newFixedThreadScheduledPool(int nThreads, String processName, boolean isDaemon) { - return Executors.newScheduledThreadPool(nThreads, newThreadFactory(processName, isDaemon)); - } - - public static ThreadFactory newThreadFactory(String processName, boolean isDaemon) { - return newGenericThreadFactory("Remoting-" + processName, isDaemon); + return Executors.newScheduledThreadPool(nThreads, newGenericThreadFactory(processName, isDaemon)); } public static ThreadFactory newGenericThreadFactory(String processName) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6593294f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java index a5c2118..82b17f4 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java @@ -92,13 +92,9 @@ public abstract class NettyRemotingAbstract implements RemotingService { NettyRemotingAbstract(RemotingConfig clientConfig, RemotingCommandFactoryMeta remotingCommandFactoryMeta) { this.semaphoreOneway = new Semaphore(clientConfig.getClientOnewayInvokeSemaphore(), true); this.semaphoreAsync = new Semaphore(clientConfig.getClientAsyncInvokeSemaphore(), true); - this.publicExecutor = ThreadUtils.newThreadPoolExecutor( + this.publicExecutor = ThreadUtils.newFixedThreadPool( clientConfig.getClientAsyncCallbackExecutorThreads(), - clientConfig.getClientAsyncCallbackExecutorThreads(), - 60, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(10000), - "PublicExecutor", true); + 10000, "Remoting-PublicExecutor", true); this.remotingCommandFactory = new RemotingCommandFactoryImpl(remotingCommandFactoryMeta); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6593294f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java index 4483ca3..35931b2 100644 --- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java @@ -19,7 +19,6 @@ package org.apache.rocketmq.rpc.impl.client; import java.util.Properties; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.api.RemotingClient; import org.apache.rocketmq.remoting.api.RemotingService; @@ -48,13 +47,10 @@ public class SimpleClientImpl extends RpcInstanceAbstract implements SimpleClien super(rpcCommonConfig); this.remotingClient = remotingClient; this.rpcCommonConfig = rpcCommonConfig; - this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor( + this.callServiceThreadPool = ThreadUtils.newFixedThreadPool( rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getServiceThreadKeepAliveTime(), - TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()), - "clientCallServiceThread", true); + rpcCommonConfig.getServiceThreadBlockQueueSize(), + "RPC-ClientCallServiceThread", true); } public void initialize() { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6593294f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java index 1fdda49..469e0c7 100644 --- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java @@ -19,7 +19,6 @@ package org.apache.rocketmq.rpc.impl.server; import java.util.Properties; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.api.RemotingServer; import org.apache.rocketmq.remoting.api.RemotingService; @@ -40,13 +39,10 @@ public class SimpleServerImpl extends RpcInstanceAbstract implements SimpleServe public SimpleServerImpl(final RpcCommonConfig remotingConfig) { this(remotingConfig, RemotingBootstrapFactory.createRemotingServer(remotingConfig)); this.rpcCommonConfig = remotingConfig; - this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor( + this.callServiceThreadPool = ThreadUtils.newFixedThreadPool( rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getServiceThreadKeepAliveTime(), - TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<Runnable>(remotingConfig.getServiceThreadBlockQueueSize()), - "serverCallServiceThread", true); + remotingConfig.getServiceThreadBlockQueueSize(), + "RPC-ServerCallServiceThread", true); } public SimpleServerImpl(final RpcCommonConfig remotingConfig, final RemotingServer remotingServer) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6593294f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java index 8c0ddf2..7ece4a8 100644 --- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java @@ -18,8 +18,6 @@ package org.apache.rocketmq.rpc.impl.service; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.external.ThreadUtils; import org.apache.rocketmq.rpc.impl.command.RpcRequestCode; @@ -29,15 +27,13 @@ import org.apache.rocketmq.rpc.impl.metrics.DefaultServiceAPIImpl; import org.apache.rocketmq.rpc.impl.metrics.ThreadStats; import org.apache.rocketmq.rpc.impl.processor.RpcRequestProcessor; -import static org.apache.rocketmq.remoting.external.ThreadUtils.newThreadFactory; - public abstract class RpcInstanceAbstract extends RpcProxyCommon { protected final RpcRequestProcessor rpcRequestProcessor; protected final ThreadLocal<RpcProviderContext> threadLocalProviderContext = new ThreadLocal<RpcProviderContext>(); protected final RpcCommonConfig rpcCommonConfig; protected ThreadStats threadStats; private DefaultServiceAPIImpl defaultServiceAPI; - private ThreadPoolExecutor invokeServiceThreadPool; + private ExecutorService invokeServiceThreadPool; public RpcInstanceAbstract(RpcCommonConfig rpcCommonConfig) { super(rpcCommonConfig); @@ -45,13 +41,9 @@ public abstract class RpcInstanceAbstract extends RpcProxyCommon { this.rpcCommonConfig = rpcCommonConfig; this.rpcRequestProcessor = new RpcRequestProcessor(this.threadLocalProviderContext, this, serviceStats); - this.invokeServiceThreadPool = new ThreadPoolExecutor( - rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + this.invokeServiceThreadPool = ThreadUtils.newFixedThreadPool( rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - 60, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()), - newThreadFactory("rpcInvokeServiceThread", true)); + rpcCommonConfig.getServiceThreadBlockQueueSize(),"RPC-InvokeServiceThread", true); } @@ -81,11 +73,11 @@ public abstract class RpcInstanceAbstract extends RpcProxyCommon { public abstract void registerServiceListener(); - public ThreadPoolExecutor getInvokeServiceThreadPool() { + public ExecutorService getInvokeServiceThreadPool() { return invokeServiceThreadPool; } - public void setInvokeServiceThreadPool(ThreadPoolExecutor invokeServiceThreadPool) { + public void setInvokeServiceThreadPool(ExecutorService invokeServiceThreadPool) { this.invokeServiceThreadPool = invokeServiceThreadPool; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6593294f/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java index 2487f79..ac8c208 100644 --- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java @@ -25,7 +25,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.api.AsyncHandler; import org.apache.rocketmq.remoting.api.RemotingService; @@ -68,20 +67,14 @@ public abstract class RpcProxyCommon { public RpcProxyCommon(RpcCommonConfig rpcCommonConfig) { this.rpcCommonConfig = rpcCommonConfig; this.serviceStats = new ServiceStats(); - this.promiseExecutorService = ThreadUtils.newThreadPoolExecutor( + this.promiseExecutorService = ThreadUtils.newFixedThreadPool( rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getServiceThreadBlockQueueSize(), + "Remoting-PromiseExecutorService", true); + this.callServiceThreadPool = ThreadUtils.newFixedThreadPool( rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getServiceThreadKeepAliveTime(), - TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()), - "promiseExecutorService", true); - this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor( - rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getServiceThreadKeepAliveTime(), - TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()), - "callServiceThread", true); + rpcCommonConfig.getServiceThreadBlockQueueSize(), + "Remoting-CallServiceThread", true); } private RemotingCommand createRemoteRequest(RemoteService serviceExport, Method method, Object[] args,