This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new b0a9586 RATIS-1419. Use cached thread pools in DataStreamManagement.
(#522)
b0a9586 is described below
commit b0a958664ac15534b6e977ad8370bcb2196ede54
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Nov 15 13:06:13 2021 +0800
RATIS-1419. Use cached thread pools in DataStreamManagement. (#522)
---
.../{AtomicUtils.java => ConcurrentUtils.java} | 74 +++++++++++-----------
.../ratis/netty/server/DataStreamManagement.java | 12 ++--
.../storage/RaftStorageMetadataFileImpl.java | 6 +-
.../datastream/DataStreamAsyncClusterTests.java | 2 +-
4 files changed, 47 insertions(+), 47 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/AtomicUtils.java
b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
similarity index 56%
rename from ratis-common/src/main/java/org/apache/ratis/util/AtomicUtils.java
rename to ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
index 8222702..a929595 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/AtomicUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
@@ -19,48 +19,18 @@ package org.apache.ratis.util;
import org.apache.ratis.util.function.CheckedFunction;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
- * Utilities related to atomic operations.
+ * Utilities related to concurrent programming.
*/
-public interface AtomicUtils {
-
- /**
- * Updates a AtomicLong which is supposed to maintain the minimum values.
This method is not
- * synchronized but is thread-safe.
- */
- static void updateMin(AtomicLong min, long value) {
- while (true) {
- long cur = min.get();
- if (value >= cur) {
- break;
- }
-
- if (min.compareAndSet(cur, value)) {
- break;
- }
- }
- }
-
- /**
- * Updates a AtomicLong which is supposed to maintain the maximum values.
This method is not
- * synchronized but is thread-safe.
- */
- static void updateMax(AtomicLong max, long value) {
- while (true) {
- long cur = max.get();
- if (value <= cur) {
- break;
- }
-
- if (max.compareAndSet(cur, value)) {
- break;
- }
- }
- }
-
+public interface ConcurrentUtils {
/**
* Similar to {@link
AtomicReference#updateAndGet(java.util.function.UnaryOperator)}
* except that the update function is checked.
@@ -85,4 +55,32 @@ public interface AtomicUtils {
}
return updated;
}
+
+ /**
+ * Creates a {@link ThreadFactory} so that the threads created by the
factory are named with the given name prefix.
+ *
+ * @param namePrefix the prefix used in the name of the threads created.
+ * @return a new {@link ThreadFactory}.
+ */
+ static ThreadFactory newThreadFactory(String namePrefix) {
+ final AtomicInteger numThread = new AtomicInteger();
+ return runnable -> {
+ final int id = numThread.incrementAndGet();
+ final Thread t = new Thread(runnable);
+ t.setName(namePrefix + "-thread" + id);
+ return t;
+ };
+ }
+
+ /**
+ * The same as {@link java.util.concurrent.Executors#newCachedThreadPool()}
+ * except that this method takes a maximumPoolSize parameter.
+ *
+ * @param maximumPoolSize the maximum number of threads to allow in the pool.
+ * @return a new {@link ExecutorService}.
+ */
+ static ExecutorService newCachedThreadPool(int maximumPoolSize,
ThreadFactory threadFactory) {
+ return new ThreadPoolExecutor(0, maximumPoolSize, 60L, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(), threadFactory);
+ }
}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 0daa72b..84679f6 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -47,6 +47,7 @@ import org.apache.ratis.statemachine.StateMachine.DataStream;
import org.apache.ratis.statemachine.StateMachine.DataChannel;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.ratis.util.ConcurrentUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
@@ -65,7 +66,6 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -205,10 +205,12 @@ public class DataStreamManagement {
this.name = server.getId() + "-" +
JavaUtils.getClassSimpleName(getClass());
final RaftProperties properties = server.getProperties();
- this.requestExecutor = Executors.newFixedThreadPool(
-
RaftServerConfigKeys.DataStream.asyncRequestThreadPoolSize(properties));
- this.writeExecutor = Executors.newFixedThreadPool(
- RaftServerConfigKeys.DataStream.asyncWriteThreadPoolSize(properties));
+ this.requestExecutor = ConcurrentUtils.newCachedThreadPool(
+ RaftServerConfigKeys.DataStream.asyncRequestThreadPoolSize(properties),
+ ConcurrentUtils.newThreadFactory(name + "-request-"));
+ this.writeExecutor = ConcurrentUtils.newCachedThreadPool(
+ RaftServerConfigKeys.DataStream.asyncWriteThreadPoolSize(properties),
+ ConcurrentUtils.newThreadFactory(name + "-write-"));
}
private CompletableFuture<DataStream>
computeDataStreamIfAbsent(RaftClientRequest request) throws IOException {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadataFileImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadataFileImpl.java
index 7c01da7..1fb723f 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadataFileImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadataFileImpl.java
@@ -19,7 +19,7 @@ package org.apache.ratis.server.storage;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.util.AtomicFileOutputStream;
-import org.apache.ratis.util.AtomicUtils;
+import org.apache.ratis.util.ConcurrentUtils;
import org.apache.ratis.util.JavaUtils;
import java.io.BufferedReader;
@@ -52,12 +52,12 @@ class RaftStorageMetadataFileImpl implements
RaftStorageMetadataFile {
@Override
public RaftStorageMetadata getMetadata() throws IOException {
- return AtomicUtils.updateAndGet(metadata, value -> value != null? value:
load(file));
+ return ConcurrentUtils.updateAndGet(metadata, value -> value != null?
value: load(file));
}
@Override
public void persist(RaftStorageMetadata newMetadata) throws IOException {
- AtomicUtils.updateAndGet(metadata,
+ ConcurrentUtils.updateAndGet(metadata,
old -> Objects.equals(old, newMetadata)? old: atomicWrite(newMetadata,
file));
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
index bdb571d..7ddde30 100644
---
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
+++
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
@@ -86,7 +86,7 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER
extends MiniRaftCluste
final List<CompletableFuture<Long>> futures = new ArrayList<>();
futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster,
5, 10, 1_000_000, 10, stepDownLeader), executor));
- futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster,
2, 20, 1_000, 10_000, stepDownLeader), executor));
+ futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster,
2, 20, 1_000, 5_000, stepDownLeader), executor));
final long maxIndex = futures.stream()
.map(CompletableFuture::join)
.max(Long::compareTo)