This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new ba4ed31c3 RATIS-1892 Unify the lifetime of the RaftServerProxy thread
pool (#923)
ba4ed31c3 is described below
commit ba4ed31c30346744cc9001d3cb683caf993943fa
Author: Potato <[email protected]>
AuthorDate: Thu Sep 21 00:06:18 2023 +0800
RATIS-1892 Unify the lifetime of the RaftServerProxy thread pool (#923)
---
.../apache/ratis/server/impl/RaftServerProxy.java | 36 ++++++++++++----------
1 file changed, 19 insertions(+), 17 deletions(-)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index f93120b3a..7d384f601 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -49,6 +49,7 @@ import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.TimeDuration;
@@ -120,7 +121,7 @@ class RaftServerProxy implements RaftServer {
isClosed = true;
ConcurrentUtils.parallelForEachAsync(map.entrySet(),
entry -> close(entry.getKey(), entry.getValue()),
- executor);
+ executor.get());
}
private void close(RaftGroupId groupId, CompletableFuture<RaftServerImpl>
future) {
@@ -192,8 +193,8 @@ class RaftServerProxy implements RaftServer {
private final DataStreamServerRpc dataStreamServerRpc;
private final ImplMap impls = new ImplMap();
- private final ExecutorService implExecutor;
- private final ExecutorService executor;
+ private final MemoizedSupplier<ExecutorService> implExecutor;
+ private final MemoizedSupplier<ExecutorService> executor;
private final JvmPauseMonitor pauseMonitor;
private final ThreadGroup threadGroup;
@@ -213,11 +214,12 @@ class RaftServerProxy implements RaftServer {
this.dataStreamServerRpc = new DataStreamServerImpl(this,
parameters).getServerRpc();
- this.implExecutor = ConcurrentUtils.newSingleThreadExecutor(id +
"-groupManagement");
- this.executor = ConcurrentUtils.newThreadPoolWithMax(
+ this.implExecutor = MemoizedSupplier.valueOf(
+ () -> ConcurrentUtils.newSingleThreadExecutor(id +
"-groupManagement"));
+ this.executor = MemoizedSupplier.valueOf(() ->
ConcurrentUtils.newThreadPoolWithMax(
RaftServerConfigKeys.ThreadPool.proxyCached(properties),
RaftServerConfigKeys.ThreadPool.proxySize(properties),
- id + "-impl");
+ id + "-impl"));
final TimeDuration sleepDeviationThreshold =
RaftServerConfigKeys.sleepDeviationThreshold(properties);
final TimeDuration rpcSlownessTimeout =
RaftServerConfigKeys.Rpc.slownessTimeout(properties);
@@ -253,7 +255,7 @@ class RaftServerProxy implements RaftServer {
.map(Arrays::stream).orElse(Stream.empty())
.filter(File::isDirectory)
.forEach(sub -> initGroupDir(sub, shouldAdd)),
- executor).join();
+ executor.get()).join();
raftGroup.ifPresent(g -> addGroup(g, option));
}
@@ -290,7 +292,7 @@ class RaftServerProxy implements RaftServer {
} catch(IOException e) {
throw new CompletionException(getId() + ": Failed to initialize server
for " + group, e);
}
- }, implExecutor);
+ }, implExecutor.get());
}
private static String getIdStringFrom(RaftServerRpc rpc) {
@@ -399,7 +401,7 @@ class RaftServerProxy implements RaftServer {
}
private void startImpl() throws IOException {
- ConcurrentUtils.parallelForEachAsync(getImpls(), RaftServerImpl::start,
executor).join();
+ ConcurrentUtils.parallelForEachAsync(getImpls(), RaftServerImpl::start,
executor.get()).join();
LOG.info("{}: start RPC server", getId());
getServerRpc().start();
@@ -410,12 +412,6 @@ class RaftServerProxy implements RaftServer {
@Override
public void close() {
- try {
- ConcurrentUtils.shutdownAndWait(implExecutor);
- } catch (Exception ignored) {
- LOG.warn(getId() + ": Failed to shutdown implExecutor", ignored);
- }
-
lifeCycle.checkStateAndClose(() -> {
LOG.info("{}: close", getId());
impls.close();
@@ -433,7 +429,13 @@ class RaftServerProxy implements RaftServer {
}
try {
- ConcurrentUtils.shutdownAndWait(executor);
+ ConcurrentUtils.shutdownAndWait(implExecutor.get());
+ } catch (Exception ignored) {
+ LOG.warn(getId() + ": Failed to shutdown implExecutor", ignored);
+ }
+
+ try {
+ ConcurrentUtils.shutdownAndWait(executor.get());
} catch (Exception ignored) {
LOG.warn(getId() + ": Failed to shutdown executor", ignored);
}
@@ -510,7 +512,7 @@ class RaftServerProxy implements RaftServer {
throw new CompletionException(e);
}
return newImpl.newSuccessReply(request);
- }, implExecutor)
+ }, implExecutor.get())
.whenComplete((raftClientReply, throwable) -> {
if (throwable != null) {
if (!(throwable.getCause() instanceof AlreadyExistsException)) {