This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 0d8e70b RATIS-1472. make use CachedThreadPool configurable. (#566)
0d8e70b is described below
commit 0d8e70bb7197df7a8f71611c774718a322b3e0ff
Author: micah zhao <[email protected]>
AuthorDate: Sat Dec 18 14:22:56 2021 +0800
RATIS-1472. make use CachedThreadPool configurable. (#566)
---
.../ratis/netty/server/DataStreamManagement.java | 22 ++++++++++++++++------
.../apache/ratis/server/RaftServerConfigKeys.java | 12 ++++++++++++
2 files changed, 28 insertions(+), 6 deletions(-)
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 81621c3..a817372 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -68,6 +68,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -218,12 +219,21 @@ public class DataStreamManagement {
this.name = server.getId() + "-" +
JavaUtils.getClassSimpleName(getClass());
final RaftProperties properties = server.getProperties();
- this.requestExecutor = ConcurrentUtils.newCachedThreadPool(
- RaftServerConfigKeys.DataStream.asyncRequestThreadPoolSize(properties),
- ConcurrentUtils.newThreadFactory(name + "-request-"));
- this.writeExecutor = ConcurrentUtils.newCachedThreadPool(
- RaftServerConfigKeys.DataStream.asyncWriteThreadPoolSize(properties),
- ConcurrentUtils.newThreadFactory(name + "-write-"));
+ Boolean useCachedThreadPool =
RaftServerConfigKeys.DataStream.asyncRequestThreadPoolCached(properties);
+ if(useCachedThreadPool) {
+ this.requestExecutor = ConcurrentUtils.newCachedThreadPool(
+
RaftServerConfigKeys.DataStream.asyncRequestThreadPoolSize(properties),
+ ConcurrentUtils.newThreadFactory(name + "-request-"));
+ this.writeExecutor = ConcurrentUtils.newCachedThreadPool(
+ RaftServerConfigKeys.DataStream.asyncWriteThreadPoolSize(properties),
+ ConcurrentUtils.newThreadFactory(name + "-write-"));
+ } else {
+ this.requestExecutor = Executors.newFixedThreadPool(
+
RaftServerConfigKeys.DataStream.asyncRequestThreadPoolSize(properties));
+ this.writeExecutor = Executors.newFixedThreadPool(
+
RaftServerConfigKeys.DataStream.asyncWriteThreadPoolSize(properties));
+ }
+
}
private CompletableFuture<DataStream>
computeDataStreamIfAbsent(RaftClientRequest request) throws IOException {
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 3bf56a7..51178d3 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -457,6 +457,18 @@ public interface RaftServerConfigKeys {
interface DataStream {
String PREFIX = RaftServerConfigKeys.PREFIX + ".data-stream";
+ String ASYNC_REQUEST_THREAD_POOL_CACHED_KEY = PREFIX +
".async.request.thread.pool.cached";
+ boolean ASYNC_REQUEST_THREAD_POOL_CACHED_DEFAULT = false;
+
+ static boolean asyncRequestThreadPoolCached(RaftProperties properties) {
+ return getBoolean(properties::getBoolean,
ASYNC_REQUEST_THREAD_POOL_CACHED_KEY,
+ ASYNC_REQUEST_THREAD_POOL_CACHED_DEFAULT, getDefaultLog());
+ }
+
+ static void setAsyncRequestThreadPoolCached(RaftProperties properties,
boolean useCached) {
+ setBoolean(properties::setBoolean, ASYNC_REQUEST_THREAD_POOL_CACHED_KEY,
useCached);
+ }
+
String ASYNC_REQUEST_THREAD_POOL_SIZE_KEY = PREFIX +
".async.request.thread.pool.size";
int ASYNC_REQUEST_THREAD_POOL_SIZE_DEFAULT = 32;