This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new f479974  RATIS-1497. Avoid using ForkJoinPool.commonPool() in GrpcClientProtocolService. (#587)
f479974 is described below

commit f47997461e2faee04ec68b76cc49eaaf72017f38
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Jan 24 01:05:20 2022 +0800

    RATIS-1497. Avoid using ForkJoinPool.commonPool() in GrpcClientProtocolService. (#587)
---
 .../org/apache/ratis/util/ConcurrentUtils.java     | 31 ++++++++++++++++++++++
 .../java/org/apache/ratis/grpc/GrpcConfigKeys.java | 21 +++++++++++++++
 .../GrpcClientProtocolService.java                 | 18 ++++++++-----
 .../org/apache/ratis/grpc/server/GrpcService.java  | 12 +++++++--
 .../ratis/netty/server/DataStreamManagement.java   | 19 ++++---------
 .../org/apache/ratis/OutputStreamBaseTest.java     |  1 -
 .../apache/ratis/grpc/TestGrpcOutputStream.java    |  7 +++--
 .../ratis/grpc/TestRaftOutputStreamWithGrpc.java   |  7 -----
 .../apache/ratis/grpc/TestRaftServerWithGrpc.java  |  2 --
 .../ratis/grpc/TestWatchRequestWithGrpc.java       |  2 --
 10 files changed, 83 insertions(+), 37 deletions(-)

diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
index a929595..f602c63 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
@@ -20,6 +20,7 @@ package org.apache.ratis.util;
 import org.apache.ratis.util.function.CheckedFunction;
 
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -83,4 +84,34 @@ public interface ConcurrentUtils {
     return new ThreadPoolExecutor(0, maximumPoolSize, 60L, TimeUnit.SECONDS,
         new LinkedBlockingQueue<>(), threadFactory);
   }
+
+  /**
+   * Create a new {@link ExecutorService} with a maximum pool size.
+   * If {@code cached} is true, this method is similar to {@link #newCachedThreadPool(int, ThreadFactory)}.
+   * Otherwise, this method is similar to {@link Executors#newFixedThreadPool(int, ThreadFactory)}.
+   *
+   * @param cached Use cached thread pool?  If not, use a fixed thread pool.
+   * @param maximumPoolSize the maximum number of threads to allow in the pool.
+   * @param namePrefix the prefix used in the name of the threads created.
+   * @return a new {@link ExecutorService}.
+   */
+  static ExecutorService newThreadPoolWithMax(boolean cached, int maximumPoolSize, String namePrefix) {
+    final ThreadFactory f = newThreadFactory(namePrefix);
+    return cached ? newCachedThreadPool(maximumPoolSize, f)
+        : Executors.newFixedThreadPool(maximumPoolSize, f);
+  }
+
+  /**
+   * Shut down the given executor and wait for its termination, for at most one day.
+   *
+   * @param executor The executor to be shut down.
+   */
+  static void shutdownAndWait(ExecutorService executor) {
+    try {
+      executor.shutdown(); // no new tasks are accepted; already-submitted tasks still run
+      Preconditions.assertTrue(executor.awaitTermination(1, TimeUnit.DAYS)); // false means it timed out
+    } catch (InterruptedException ignored) {
+      Thread.currentThread().interrupt(); // restore the interrupt status and stop waiting
+    }
+  }
 }
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index eca51ad..a9dddbb 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -157,6 +157,27 @@ public interface GrpcConfigKeys {
       setInt(properties::setInt, PORT_KEY, port);
     }
 
+    String ASYNC_REQUEST_THREAD_POOL_CACHED_KEY = PREFIX + ".async.request.thread.pool.cached";
+    boolean ASYNC_REQUEST_THREAD_POOL_CACHED_DEFAULT = true; // cached pool unless configured otherwise
+    static boolean asyncRequestThreadPoolCached(RaftProperties properties) {
+      return getBoolean(properties::getBoolean, ASYNC_REQUEST_THREAD_POOL_CACHED_KEY,
+          ASYNC_REQUEST_THREAD_POOL_CACHED_DEFAULT, getDefaultLog());
+    }
+    static void setAsyncRequestThreadPoolCached(RaftProperties properties, boolean useCached) {
+      setBoolean(properties::setBoolean, ASYNC_REQUEST_THREAD_POOL_CACHED_KEY, useCached);
+    }
+
+    String ASYNC_REQUEST_THREAD_POOL_SIZE_KEY = PREFIX + ".async.request.thread.pool.size";
+    int ASYNC_REQUEST_THREAD_POOL_SIZE_DEFAULT = 32;
+    static int asyncRequestThreadPoolSize(RaftProperties properties) {
+      return getInt(properties::getInt, ASYNC_REQUEST_THREAD_POOL_SIZE_KEY,
+          ASYNC_REQUEST_THREAD_POOL_SIZE_DEFAULT, getDefaultLog(),
+          requireMin(0), requireMax(65536)); // NOTE(review): a size of 0 cannot create a pool; should min be 1?
+    }
+    static void setAsyncRequestThreadPoolSize(RaftProperties properties, int size) {
+      setInt(properties::setInt, ASYNC_REQUEST_THREAD_POOL_SIZE_KEY, size);
+    }
+
     String TLS_CONF_PARAMETER = PREFIX + ".tls.conf";
     Class<GrpcTlsConfig> TLS_CONF_CLASS = TLS.CONF_CLASS;
     static GrpcTlsConfig tlsConf(Parameters parameters) {
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
similarity index 95%
rename from 
ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
rename to 
ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
index 7cd8c08..9c19684 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ratis.grpc.client;
+package org.apache.ratis.grpc.server;
 
 import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.grpc.GrpcUtil;
@@ -41,14 +41,15 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
-public class GrpcClientProtocolService extends 
RaftClientProtocolServiceImplBase {
-  public static final Logger LOG = 
LoggerFactory.getLogger(GrpcClientProtocolService.class);
+class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase {
+  private static final Logger LOG = 
LoggerFactory.getLogger(GrpcClientProtocolService.class);
 
   private static class PendingOrderedRequest implements 
SlidingWindow.ServerSideRequest<RaftClientReply> {
     private final RaftClientRequest request;
@@ -131,12 +132,15 @@ public class GrpcClientProtocolService extends 
RaftClientProtocolServiceImplBase
 
   private final Supplier<RaftPeerId> idSupplier;
   private final RaftClientAsynchronousProtocol protocol;
+  private final ExecutorService executor;
 
   private final OrderedStreamObservers orderedStreamObservers = new 
OrderedStreamObservers();
 
-  public GrpcClientProtocolService(Supplier<RaftPeerId> idSupplier, 
RaftClientAsynchronousProtocol protocol) {
+  GrpcClientProtocolService(Supplier<RaftPeerId> idSupplier, 
RaftClientAsynchronousProtocol protocol,
+      ExecutorService executor) {
     this.idSupplier = idSupplier;
     this.protocol = protocol;
+    this.executor = executor;
   }
 
   RaftPeerId getId() {
@@ -150,7 +154,7 @@ public class GrpcClientProtocolService extends 
RaftClientProtocolServiceImplBase
     return so;
   }
 
-  public void closeAllOrderedRequestStreamObservers(RaftGroupId groupId) {
+  void closeAllOrderedRequestStreamObservers(RaftGroupId groupId) {
     LOG.debug("{}: closeAllOrderedRequestStreamObservers", getId());
     orderedStreamObservers.closeAllExisting(groupId);
   }
@@ -218,9 +222,9 @@ public class GrpcClientProtocolService extends 
RaftClientProtocolServiceImplBase
 
     CompletableFuture<Void> processClientRequest(RaftClientRequest request, 
Consumer<RaftClientReply> replyHandler) {
       try {
-        String errMsg = LOG.isDebugEnabled() ? "processClientRequest for " + 
request.toString() : "";
+        final String errMsg = LOG.isDebugEnabled() ? "processClientRequest for 
" + request : "";
         return protocol.submitClientRequestAsync(request
-        ).thenAcceptAsync(replyHandler
+        ).thenAcceptAsync(replyHandler, executor
         ).exceptionally(exception -> {
           // TODO: the exception may be from either raft or state machine.
           // Currently we skip all the following responses when getting an
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index 0fbdbfc..3ce4fc5 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -17,9 +17,9 @@
  */
 package org.apache.ratis.grpc.server;
 
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.grpc.client.GrpcClientProtocolService;
 import org.apache.ratis.grpc.metrics.intercept.server.MetricServerInterceptor;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeerId;
@@ -44,6 +44,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 import java.util.function.Supplier;
 
 import static 
org.apache.ratis.thirdparty.io.netty.handler.ssl.SslProvider.OPENSSL;
@@ -107,6 +108,7 @@ public final class GrpcService extends 
RaftServerRpcWithProxy<GrpcServerProtocol
   private final Supplier<InetSocketAddress> clientServerAddressSupplier;
   private final Supplier<InetSocketAddress> adminServerAddressSupplier;
 
+  private final ExecutorService executor;
   private final GrpcClientProtocolService clientProtocolService;
 
   private final MetricServerInterceptor serverInterceptor;
@@ -143,7 +145,12 @@ public final class GrpcService extends 
RaftServerRpcWithProxy<GrpcServerProtocol
           + " > " + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + " = " + 
grpcMessageSizeMax);
     }
 
-    this.clientProtocolService = new GrpcClientProtocolService(idSupplier, 
raftServer);
+    final RaftProperties properties = raftServer.getProperties();
+    this.executor = ConcurrentUtils.newThreadPoolWithMax(
+        GrpcConfigKeys.Server.asyncRequestThreadPoolCached(properties),
+        GrpcConfigKeys.Server.asyncRequestThreadPoolSize(properties),
+        getId() + "-request-");
+    this.clientProtocolService = new GrpcClientProtocolService(idSupplier, 
raftServer, executor);
 
     this.serverInterceptor = new MetricServerInterceptor(
         idSupplier,
@@ -272,6 +279,7 @@ public final class GrpcService extends 
RaftServerRpcWithProxy<GrpcServerProtocol
     }
 
     serverInterceptor.close();
+    ConcurrentUtils.shutdownAndWait(executor);
   }
 
   @Override
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 31634bb..37cf90e 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -68,7 +68,6 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -219,21 +218,13 @@ public class DataStreamManagement {
     this.name = server.getId() + "-" + 
JavaUtils.getClassSimpleName(getClass());
 
     final RaftProperties properties = server.getProperties();
-    Boolean useCachedThreadPool = 
RaftServerConfigKeys.DataStream.asyncRequestThreadPoolCached(properties);
-    if(useCachedThreadPool) {
-      this.requestExecutor = ConcurrentUtils.newCachedThreadPool(
+    final boolean useCachedThreadPool = 
RaftServerConfigKeys.DataStream.asyncRequestThreadPoolCached(properties);
+    this.requestExecutor = 
ConcurrentUtils.newThreadPoolWithMax(useCachedThreadPool,
           
RaftServerConfigKeys.DataStream.asyncRequestThreadPoolSize(properties),
-          ConcurrentUtils.newThreadFactory(name + "-request-"));
-      this.writeExecutor = ConcurrentUtils.newCachedThreadPool(
+          name + "-request-");
+    this.writeExecutor = 
ConcurrentUtils.newThreadPoolWithMax(useCachedThreadPool,
           RaftServerConfigKeys.DataStream.asyncWriteThreadPoolSize(properties),
-          ConcurrentUtils.newThreadFactory(name + "-write-"));
-    } else {
-      this.requestExecutor = Executors.newFixedThreadPool(
-          
RaftServerConfigKeys.DataStream.asyncRequestThreadPoolSize(properties));
-      this.writeExecutor = Executors.newFixedThreadPool(
-          
RaftServerConfigKeys.DataStream.asyncWriteThreadPoolSize(properties));
-    }
-
+          name + "-write-");
   }
 
   private CompletableFuture<DataStream> 
computeDataStreamIfAbsent(RaftClientRequest request) throws IOException {
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java 
b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
index aa69fa4..8861e2a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
@@ -108,7 +108,6 @@ public abstract class OutputStreamBaseTest<CLUSTER extends 
MiniRaftCluster>
       final String message = "log " + entry + " " + log.getLogEntryBodyCase()
           + " " + StringUtils.bytes2HexString(logData)
           + ", expected=" + StringUtils.bytes2HexString(expected);
-      LOG.info(message);
       Assert.assertArrayEquals(message, expected, logData);
       count++;
     }
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcOutputStream.java 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcOutputStream.java
index 77c311c..87cf758 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcOutputStream.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcOutputStream.java
@@ -34,8 +34,11 @@ import java.io.OutputStream;
 public class TestGrpcOutputStream
     extends OutputStreamBaseTest<MiniRaftClusterWithGrpc>
     implements MiniRaftClusterWithGrpc.FactoryGet {
-  static {
-    Log4jUtils.setLogLevel(GrpcClientStreamer.LOG, Level.ALL);
+
+  { // configure the gRPC server's async-request executor for this test
+    final RaftProperties p = getProperties();
+    GrpcConfigKeys.Server.setAsyncRequestThreadPoolCached(p, false); // non-default: selects the fixed-size pool
+    GrpcConfigKeys.Server.setAsyncRequestThreadPoolSize(p, 8); // non-default pool size (default is 32)
+  }
 
   @Override
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftOutputStreamWithGrpc.java
 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftOutputStreamWithGrpc.java
index 4cc82ee..fc5c91d 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftOutputStreamWithGrpc.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftOutputStreamWithGrpc.java
@@ -17,18 +17,11 @@
  */
 package org.apache.ratis.grpc;
 
-import org.apache.log4j.Level;
 import org.apache.ratis.OutputStreamBaseTest;
-import org.apache.ratis.grpc.client.GrpcClientProtocolService;
-import org.apache.ratis.util.Log4jUtils;
 
 public class TestRaftOutputStreamWithGrpc
     extends OutputStreamBaseTest<MiniRaftClusterWithGrpc>
     implements MiniRaftClusterWithGrpc.FactoryGet {
-  {
-    Log4jUtils.setLogLevel(GrpcClientProtocolService.LOG, Level.TRACE);
-  }
-
   @Override
   public int getGlobalTimeoutSeconds() {
     return 30;
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index 230c488..8dcff75 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -39,7 +39,6 @@ import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.client.impl.RaftClientTestUtil;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.client.GrpcClientProtocolClient;
-import org.apache.ratis.grpc.client.GrpcClientProtocolService;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
 import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
@@ -78,7 +77,6 @@ import java.util.concurrent.atomic.AtomicLong;
 
 public class TestRaftServerWithGrpc extends BaseTest implements 
MiniRaftClusterWithGrpc.FactoryGet {
   {
-    Log4jUtils.setLogLevel(GrpcClientProtocolService.LOG, Level.ALL);
     Log4jUtils.setLogLevel(GrpcClientProtocolClient.LOG, Level.ALL);
   }
 
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestWatchRequestWithGrpc.java 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestWatchRequestWithGrpc.java
index 2861102..65c6fab 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestWatchRequestWithGrpc.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestWatchRequestWithGrpc.java
@@ -21,7 +21,6 @@ import org.apache.log4j.Level;
 import org.apache.ratis.WatchRequestTests;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.impl.UnorderedAsync;
-import org.apache.ratis.grpc.client.GrpcClientProtocolService;
 import org.apache.ratis.grpc.client.GrpcClientRpc;
 import org.apache.ratis.util.Log4jUtils;
 
@@ -29,7 +28,6 @@ public class TestWatchRequestWithGrpc
     extends WatchRequestTests<MiniRaftClusterWithGrpc>
     implements MiniRaftClusterWithGrpc.FactoryGet {
   {
-    Log4jUtils.setLogLevel(GrpcClientProtocolService.LOG, Level.ALL);
     Log4jUtils.setLogLevel(GrpcClientRpc.LOG, Level.ALL);
     Log4jUtils.setLogLevel(UnorderedAsync.LOG, Level.ALL);
     Log4jUtils.setLogLevel(RaftClient.LOG, Level.ALL);

Reply via email to