This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d8e70b  RATIS-1472. make use CachedThreadPool configurable. (#566)
0d8e70b is described below

commit 0d8e70bb7197df7a8f71611c774718a322b3e0ff
Author: micah zhao <[email protected]>
AuthorDate: Sat Dec 18 14:22:56 2021 +0800

    RATIS-1472. make use CachedThreadPool configurable. (#566)
---
 .../ratis/netty/server/DataStreamManagement.java   | 22 ++++++++++++++++------
 .../apache/ratis/server/RaftServerConfigKeys.java  | 12 ++++++++++++
 2 files changed, 28 insertions(+), 6 deletions(-)

diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 81621c3..a817372 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -68,6 +68,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -218,12 +219,21 @@ public class DataStreamManagement {
     this.name = server.getId() + "-" + 
JavaUtils.getClassSimpleName(getClass());
 
     final RaftProperties properties = server.getProperties();
-    this.requestExecutor = ConcurrentUtils.newCachedThreadPool(
-        RaftServerConfigKeys.DataStream.asyncRequestThreadPoolSize(properties),
-        ConcurrentUtils.newThreadFactory(name + "-request-"));
-    this.writeExecutor = ConcurrentUtils.newCachedThreadPool(
-        RaftServerConfigKeys.DataStream.asyncWriteThreadPoolSize(properties),
-        ConcurrentUtils.newThreadFactory(name + "-write-"));
+    Boolean useCachedThreadPool = 
RaftServerConfigKeys.DataStream.asyncRequestThreadPoolCached(properties);
+    if(useCachedThreadPool) {
+      this.requestExecutor = ConcurrentUtils.newCachedThreadPool(
+          
RaftServerConfigKeys.DataStream.asyncRequestThreadPoolSize(properties),
+          ConcurrentUtils.newThreadFactory(name + "-request-"));
+      this.writeExecutor = ConcurrentUtils.newCachedThreadPool(
+          RaftServerConfigKeys.DataStream.asyncWriteThreadPoolSize(properties),
+          ConcurrentUtils.newThreadFactory(name + "-write-"));
+    } else {
+      this.requestExecutor = Executors.newFixedThreadPool(
+          
RaftServerConfigKeys.DataStream.asyncRequestThreadPoolSize(properties));
+      this.writeExecutor = Executors.newFixedThreadPool(
+          
RaftServerConfigKeys.DataStream.asyncWriteThreadPoolSize(properties));
+    }
+
   }
 
   private CompletableFuture<DataStream> 
computeDataStreamIfAbsent(RaftClientRequest request) throws IOException {
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
 
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 3bf56a7..51178d3 100644
--- 
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ 
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -457,6 +457,18 @@ public interface RaftServerConfigKeys {
   interface DataStream {
     String PREFIX = RaftServerConfigKeys.PREFIX + ".data-stream";
 
+    String ASYNC_REQUEST_THREAD_POOL_CACHED_KEY = PREFIX + 
".async.request.thread.pool.cached";
+    boolean ASYNC_REQUEST_THREAD_POOL_CACHED_DEFAULT = false;
+
+    static boolean asyncRequestThreadPoolCached(RaftProperties properties) {
+      return getBoolean(properties::getBoolean, 
ASYNC_REQUEST_THREAD_POOL_CACHED_KEY,
+          ASYNC_REQUEST_THREAD_POOL_CACHED_DEFAULT, getDefaultLog());
+    }
+
+    static void setAsyncRequestThreadPoolCached(RaftProperties properties, 
boolean useCached) {
+      setBoolean(properties::setBoolean, ASYNC_REQUEST_THREAD_POOL_CACHED_KEY, 
useCached);
+    }
+
     String ASYNC_REQUEST_THREAD_POOL_SIZE_KEY = PREFIX + 
".async.request.thread.pool.size";
     int ASYNC_REQUEST_THREAD_POOL_SIZE_DEFAULT = 32;
 

Reply via email to