This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new ba4ed31c3 RATIS-1892 Unify the lifetime of the RaftServerProxy thread 
pool (#923)
ba4ed31c3 is described below

commit ba4ed31c30346744cc9001d3cb683caf993943fa
Author: Potato <[email protected]>
AuthorDate: Thu Sep 21 00:06:18 2023 +0800

    RATIS-1892 Unify the lifetime of the RaftServerProxy thread pool (#923)
---
 .../apache/ratis/server/impl/RaftServerProxy.java  | 36 ++++++++++++----------
 1 file changed, 19 insertions(+), 17 deletions(-)

diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index f93120b3a..7d384f601 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -49,6 +49,7 @@ import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.ProtoUtils;
 import org.apache.ratis.util.TimeDuration;
@@ -120,7 +121,7 @@ class RaftServerProxy implements RaftServer {
       isClosed = true;
       ConcurrentUtils.parallelForEachAsync(map.entrySet(),
           entry -> close(entry.getKey(), entry.getValue()),
-          executor);
+          executor.get());
     }
 
     private void close(RaftGroupId groupId, CompletableFuture<RaftServerImpl> 
future) {
@@ -192,8 +193,8 @@ class RaftServerProxy implements RaftServer {
   private final DataStreamServerRpc dataStreamServerRpc;
 
   private final ImplMap impls = new ImplMap();
-  private final ExecutorService implExecutor;
-  private final ExecutorService executor;
+  private final MemoizedSupplier<ExecutorService> implExecutor;
+  private final MemoizedSupplier<ExecutorService> executor;
 
   private final JvmPauseMonitor pauseMonitor;
   private final ThreadGroup threadGroup;
@@ -213,11 +214,12 @@ class RaftServerProxy implements RaftServer {
 
     this.dataStreamServerRpc = new DataStreamServerImpl(this, 
parameters).getServerRpc();
 
-    this.implExecutor = ConcurrentUtils.newSingleThreadExecutor(id + 
"-groupManagement");
-    this.executor = ConcurrentUtils.newThreadPoolWithMax(
+    this.implExecutor = MemoizedSupplier.valueOf(
+        () -> ConcurrentUtils.newSingleThreadExecutor(id + 
"-groupManagement"));
+    this.executor = MemoizedSupplier.valueOf(() -> 
ConcurrentUtils.newThreadPoolWithMax(
         RaftServerConfigKeys.ThreadPool.proxyCached(properties),
         RaftServerConfigKeys.ThreadPool.proxySize(properties),
-        id + "-impl");
+        id + "-impl"));
 
     final TimeDuration sleepDeviationThreshold = 
RaftServerConfigKeys.sleepDeviationThreshold(properties);
     final TimeDuration rpcSlownessTimeout = 
RaftServerConfigKeys.Rpc.slownessTimeout(properties);
@@ -253,7 +255,7 @@ class RaftServerProxy implements RaftServer {
             .map(Arrays::stream).orElse(Stream.empty())
             .filter(File::isDirectory)
             .forEach(sub -> initGroupDir(sub, shouldAdd)),
-        executor).join();
+        executor.get()).join();
     raftGroup.ifPresent(g -> addGroup(g, option));
   }
 
@@ -290,7 +292,7 @@ class RaftServerProxy implements RaftServer {
       } catch(IOException e) {
         throw new CompletionException(getId() + ": Failed to initialize server 
for " + group, e);
       }
-    }, implExecutor);
+    }, implExecutor.get());
   }
 
   private static String getIdStringFrom(RaftServerRpc rpc) {
@@ -399,7 +401,7 @@ class RaftServerProxy implements RaftServer {
   }
 
   private void startImpl() throws IOException {
-    ConcurrentUtils.parallelForEachAsync(getImpls(), RaftServerImpl::start, 
executor).join();
+    ConcurrentUtils.parallelForEachAsync(getImpls(), RaftServerImpl::start, 
executor.get()).join();
 
     LOG.info("{}: start RPC server", getId());
     getServerRpc().start();
@@ -410,12 +412,6 @@ class RaftServerProxy implements RaftServer {
 
   @Override
   public void close() {
-    try {
-      ConcurrentUtils.shutdownAndWait(implExecutor);
-    } catch (Exception ignored) {
-      LOG.warn(getId() + ": Failed to shutdown implExecutor", ignored);
-    }
-
     lifeCycle.checkStateAndClose(() -> {
       LOG.info("{}: close", getId());
       impls.close();
@@ -433,7 +429,13 @@ class RaftServerProxy implements RaftServer {
       }
 
       try {
-        ConcurrentUtils.shutdownAndWait(executor);
+        ConcurrentUtils.shutdownAndWait(implExecutor.get());
+      } catch (Exception ignored) {
+        LOG.warn(getId() + ": Failed to shutdown implExecutor", ignored);
+      }
+
+      try {
+        ConcurrentUtils.shutdownAndWait(executor.get());
       } catch (Exception ignored) {
         LOG.warn(getId() + ": Failed to shutdown executor", ignored);
       }
@@ -510,7 +512,7 @@ class RaftServerProxy implements RaftServer {
             throw new CompletionException(e);
           }
           return newImpl.newSuccessReply(request);
-        }, implExecutor)
+        }, implExecutor.get())
         .whenComplete((raftClientReply, throwable) -> {
           if (throwable != null) {
             if (!(throwable.getCause() instanceof AlreadyExistsException)) {

Reply via email to