This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new b0a9586  RATIS-1419. Use cached thread pools in DataStreamManagement. 
(#522)
b0a9586 is described below

commit b0a958664ac15534b6e977ad8370bcb2196ede54
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Nov 15 13:06:13 2021 +0800

    RATIS-1419. Use cached thread pools in DataStreamManagement. (#522)
---
 .../{AtomicUtils.java => ConcurrentUtils.java}     | 74 +++++++++++-----------
 .../ratis/netty/server/DataStreamManagement.java   | 12 ++--
 .../storage/RaftStorageMetadataFileImpl.java       |  6 +-
 .../datastream/DataStreamAsyncClusterTests.java    |  2 +-
 4 files changed, 47 insertions(+), 47 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/AtomicUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
similarity index 56%
rename from ratis-common/src/main/java/org/apache/ratis/util/AtomicUtils.java
rename to ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
index 8222702..a929595 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/AtomicUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
@@ -19,48 +19,18 @@ package org.apache.ratis.util;
 
 import org.apache.ratis.util.function.CheckedFunction;
 
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * Utilities related to atomic operations.
+ * Utilities related to concurrent programming.
  */
-public interface AtomicUtils {
-
-  /**
-   * Updates a AtomicLong which is supposed to maintain the minimum values. 
This method is not
-   * synchronized but is thread-safe.
-   */
-  static void updateMin(AtomicLong min, long value) {
-    while (true) {
-      long cur = min.get();
-      if (value >= cur) {
-        break;
-      }
-
-      if (min.compareAndSet(cur, value)) {
-        break;
-      }
-    }
-  }
-
-  /**
-   * Updates a AtomicLong which is supposed to maintain the maximum values. 
This method is not
-   * synchronized but is thread-safe.
-   */
-  static void updateMax(AtomicLong max, long value) {
-    while (true) {
-      long cur = max.get();
-      if (value <= cur) {
-        break;
-      }
-
-      if (max.compareAndSet(cur, value)) {
-        break;
-      }
-    }
-  }
-
+public interface ConcurrentUtils {
   /**
    * Similar to {@link 
AtomicReference#updateAndGet(java.util.function.UnaryOperator)}
    * except that the update function is checked.
@@ -85,4 +55,32 @@ public interface AtomicUtils {
     }
     return updated;
   }
+
+  /**
+   * Creates a {@link ThreadFactory} so that the threads created by the 
factory are named with the given name prefix.
+   *
+   * @param namePrefix the prefix used in the name of the threads created.
+   * @return a new {@link ThreadFactory}.
+   */
+  static ThreadFactory newThreadFactory(String namePrefix) {
+    final AtomicInteger numThread = new AtomicInteger();
+    return runnable -> {
+      final int id = numThread.incrementAndGet();
+      final Thread t = new Thread(runnable);
+      t.setName(namePrefix + "-thread" + id);
+      return t;
+    };
+  }
+
+  /**
+   * The same as {@link java.util.concurrent.Executors#newCachedThreadPool()}
+   * except that this method takes a maximumPoolSize parameter.
+   *
+   * @param maximumPoolSize the maximum number of threads to allow in the pool.
+   * @return a new {@link ExecutorService}.
+   */
+  static ExecutorService newCachedThreadPool(int maximumPoolSize, 
ThreadFactory threadFactory) {
+    return new ThreadPoolExecutor(0, maximumPoolSize, 60L, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(), threadFactory);
+  }
 }
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 0daa72b..84679f6 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -47,6 +47,7 @@ import org.apache.ratis.statemachine.StateMachine.DataStream;
 import org.apache.ratis.statemachine.StateMachine.DataChannel;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.ratis.util.ConcurrentUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.Preconditions;
@@ -65,7 +66,6 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -205,10 +205,12 @@ public class DataStreamManagement {
     this.name = server.getId() + "-" + 
JavaUtils.getClassSimpleName(getClass());
 
     final RaftProperties properties = server.getProperties();
-    this.requestExecutor = Executors.newFixedThreadPool(
-        
RaftServerConfigKeys.DataStream.asyncRequestThreadPoolSize(properties));
-    this.writeExecutor = Executors.newFixedThreadPool(
-        RaftServerConfigKeys.DataStream.asyncWriteThreadPoolSize(properties));
+    this.requestExecutor = ConcurrentUtils.newCachedThreadPool(
+        RaftServerConfigKeys.DataStream.asyncRequestThreadPoolSize(properties),
+        ConcurrentUtils.newThreadFactory(name + "-request-"));
+    this.writeExecutor = ConcurrentUtils.newCachedThreadPool(
+        RaftServerConfigKeys.DataStream.asyncWriteThreadPoolSize(properties),
+        ConcurrentUtils.newThreadFactory(name + "-write-"));
   }
 
   private CompletableFuture<DataStream> 
computeDataStreamIfAbsent(RaftClientRequest request) throws IOException {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadataFileImpl.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadataFileImpl.java
index 7c01da7..1fb723f 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadataFileImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadataFileImpl.java
@@ -19,7 +19,7 @@ package org.apache.ratis.server.storage;
 
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.util.AtomicFileOutputStream;
-import org.apache.ratis.util.AtomicUtils;
+import org.apache.ratis.util.ConcurrentUtils;
 import org.apache.ratis.util.JavaUtils;
 
 import java.io.BufferedReader;
@@ -52,12 +52,12 @@ class RaftStorageMetadataFileImpl implements 
RaftStorageMetadataFile {
 
   @Override
   public RaftStorageMetadata getMetadata() throws IOException {
-    return AtomicUtils.updateAndGet(metadata, value -> value != null? value: 
load(file));
+    return ConcurrentUtils.updateAndGet(metadata, value -> value != null? 
value: load(file));
   }
 
   @Override
   public void persist(RaftStorageMetadata newMetadata) throws IOException {
-    AtomicUtils.updateAndGet(metadata,
+    ConcurrentUtils.updateAndGet(metadata,
         old -> Objects.equals(old, newMetadata)? old: atomicWrite(newMetadata, 
file));
   }
 
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
 
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
index bdb571d..7ddde30 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
@@ -86,7 +86,7 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER 
extends MiniRaftCluste
 
     final List<CompletableFuture<Long>> futures = new ArrayList<>();
     futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, 
5, 10, 1_000_000, 10, stepDownLeader), executor));
-    futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, 
2, 20, 1_000, 10_000, stepDownLeader), executor));
+    futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, 
2, 20, 1_000, 5_000, stepDownLeader), executor));
     final long maxIndex = futures.stream()
         .map(CompletableFuture::join)
         .max(Long::compareTo)

Reply via email to