This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new d838c5677 RATIS-1785. Use SingleThreadExecutor to manage the lifetime
of single thread (#824)
d838c5677 is described below
commit d838c56772568211b9af617d78c5182af641faac
Author: Potato <[email protected]>
AuthorDate: Wed Feb 22 06:15:36 2023 +0800
RATIS-1785. Use SingleThreadExecutor to manage the lifetime of single
thread (#824)
---
.../java/org/apache/ratis/util/ConcurrentUtils.java | 15 +++++++++++++++
.../org/apache/ratis/server/impl/RaftServerProxy.java | 3 ++-
.../raftlog/segmented/SegmentedRaftLogWorker.java | 19 +++++++------------
3 files changed, 24 insertions(+), 13 deletions(-)
diff --git
a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
index df214b388..372fa62bc 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
@@ -80,6 +80,21 @@ public interface ConcurrentUtils {
};
}
+ /**
+ * This method is similar to {@link
java.util.concurrent.Executors#newSingleThreadExecutor(ThreadFactory)}
+ * except that this method takes a specific thread name as there is only
one thread.g
+ *
+ * @param name the thread name for only one thread.
+ * @return a new {@link ExecutorService}.
+ */
+ static ExecutorService newSingleThreadExecutor(String name) {
+ return Executors.newSingleThreadExecutor(runnable -> {
+ final Thread t = new Thread(runnable);
+ t.setName(name);
+ return t;
+ });
+ }
+
/**
* The same as {@link
java.util.concurrent.Executors#newCachedThreadPool(ThreadFactory)}
* except that this method takes a maximumPoolSize parameter.
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index dca32d83c..a09bda02a 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -192,7 +192,7 @@ class RaftServerProxy implements RaftServer {
private final DataStreamServerRpc dataStreamServerRpc;
private final ImplMap impls = new ImplMap();
- private final ExecutorService implExecutor =
Executors.newSingleThreadExecutor();
+ private final ExecutorService implExecutor;
private final ExecutorService executor;
private final JvmPauseMonitor pauseMonitor;
@@ -213,6 +213,7 @@ class RaftServerProxy implements RaftServer {
this.dataStreamServerRpc = new DataStreamServerImpl(this,
parameters).getServerRpc();
+ this.implExecutor = ConcurrentUtils.newSingleThreadExecutor(id +
"-groupManagement");
this.executor = ConcurrentUtils.newThreadPoolWithMax(
RaftServerConfigKeys.ThreadPool.proxyCached(properties),
RaftServerConfigKeys.ThreadPool.proxySize(properties),
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index dad94db93..c867914fe 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -141,8 +141,7 @@ class SegmentedRaftLogWorker {
private final DataBlockingQueue<Task> queue;
private final WriteLogTasks writeTasks = new WriteLogTasks();
private volatile boolean running = true;
- private final Thread workerThread;
-
+ private final ExecutorService workerThreadExecutor;
private final RaftStorage storage;
private volatile SegmentedRaftLogOutputStream out;
private final Runnable submitUpdateCommitEvent;
@@ -199,7 +198,7 @@ class SegmentedRaftLogWorker {
this.stateMachineDataPolicy = new StateMachineDataPolicy(properties,
metricRegistry);
- this.workerThread = new Thread(this::run, name);
+ this.workerThreadExecutor = ConcurrentUtils.newSingleThreadExecutor(name);
// Server Id can be null in unit tests
metricRegistry.addDataQueueSizeGauge(queue::getNumElements);
@@ -218,7 +217,7 @@ class SegmentedRaftLogWorker {
" and " + RaftServerConfigKeys.Log.ASYNC_FLUSH_ENABLED_KEY);
}
this.flushExecutor = (!asyncFlush && !unsafeFlush)? null
- :
Executors.newSingleThreadExecutor(ConcurrentUtils.newThreadFactory(name +
"-flush"));
+ : ConcurrentUtils.newSingleThreadExecutor(name + "-flush");
}
void start(long latestIndex, long evictIndex, File openSegmentFile) throws
IOException {
@@ -230,19 +229,15 @@ class SegmentedRaftLogWorker {
Preconditions.assertTrue(openSegmentFile.exists());
allocateSegmentedRaftLogOutputStream(openSegmentFile, true);
}
- workerThread.start();
+ workerThreadExecutor.submit(this::run);
}
void close() {
this.running = false;
sharedBuffer.set(null);
- workerThread.interrupt();
Optional.ofNullable(flushExecutor).ifPresent(ExecutorService::shutdown);
- try {
- workerThread.join(3000);
- } catch (InterruptedException ignored) {
- Thread.currentThread().interrupt();
- }
+ ConcurrentUtils.shutdownAndWait(TimeDuration.ONE_SECOND.multiply(3),
+ workerThreadExecutor, timeout -> LOG.warn("{}: shutdown timeout in " +
timeout, name));
IOUtils.cleanup(LOG, out);
LOG.info("{} close()", name);
}
@@ -288,7 +283,7 @@ class SegmentedRaftLogWorker {
}
boolean isAlive() {
- return running && workerThread.isAlive();
+ return running && !workerThreadExecutor.isTerminated();
}
private void run() {