This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-2_tmp in repository https://gitbox.apache.org/repos/asf/ratis.git
commit f348cac8b07ff111e57af702a0efd94999a7e2c5 Author: Potato <[email protected]> AuthorDate: Wed Feb 22 06:15:36 2023 +0800 RATIS-1785. Use SingleThreadExecutor to manage the lifetime of single thread (#824) (cherry picked from commit d838c56772568211b9af617d78c5182af641faac) --- .../java/org/apache/ratis/util/ConcurrentUtils.java | 15 +++++++++++++++ .../org/apache/ratis/server/impl/RaftServerProxy.java | 3 ++- .../raftlog/segmented/SegmentedRaftLogWorker.java | 19 +++++++------------ 3 files changed, 24 insertions(+), 13 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java index df214b388..372fa62bc 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java @@ -80,6 +80,21 @@ public interface ConcurrentUtils { }; } + /** + * This method is similar to {@link java.util.concurrent.Executors#newSingleThreadExecutor(ThreadFactory)} + * except that this method takes a specific thread name as there is only one thread. + * + * @param name the thread name for only one thread. + * @return a new {@link ExecutorService}. + */ + static ExecutorService newSingleThreadExecutor(String name) { + return Executors.newSingleThreadExecutor(runnable -> { + final Thread t = new Thread(runnable); + t.setName(name); + return t; + }); + } + /** * The same as {@link java.util.concurrent.Executors#newCachedThreadPool(ThreadFactory)} * except that this method takes a maximumPoolSize parameter. 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index bef72ee0d..54cf15c65 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -189,7 +189,7 @@ class RaftServerProxy implements RaftServer { private final DataStreamServerRpc dataStreamServerRpc; private final ImplMap impls = new ImplMap(); - private final ExecutorService implExecutor = Executors.newSingleThreadExecutor(); + private final ExecutorService implExecutor; private final ExecutorService executor; private final JvmPauseMonitor pauseMonitor; @@ -210,6 +210,7 @@ class RaftServerProxy implements RaftServer { this.dataStreamServerRpc = new DataStreamServerImpl(this, parameters).getServerRpc(); + this.implExecutor = ConcurrentUtils.newSingleThreadExecutor(id + "-groupManagement"); this.executor = ConcurrentUtils.newThreadPoolWithMax( RaftServerConfigKeys.ThreadPool.proxyCached(properties), RaftServerConfigKeys.ThreadPool.proxySize(properties), diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java index 7e49522fb..a7bbcab23 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java @@ -142,8 +142,7 @@ class SegmentedRaftLogWorker { private final DataBlockingQueue<Task> queue; private final WriteLogTasks writeTasks = new WriteLogTasks(); private volatile boolean running = true; - private final Thread workerThread; - + private final ExecutorService workerThreadExecutor; private final RaftStorage storage; private volatile SegmentedRaftLogOutputStream out; private final 
Runnable submitUpdateCommitEvent; @@ -205,7 +204,7 @@ class SegmentedRaftLogWorker { this.stateMachineDataPolicy = new StateMachineDataPolicy(properties, metricRegistry); - this.workerThread = new Thread(this::run, name); + this.workerThreadExecutor = ConcurrentUtils.newSingleThreadExecutor(name); // Server Id can be null in unit tests metricRegistry.addDataQueueSizeGauge(queue); @@ -228,7 +227,7 @@ class SegmentedRaftLogWorker { " and " + RaftServerConfigKeys.Log.ASYNC_FLUSH_ENABLED_KEY); } this.flushExecutor = (!asyncFlush && !unsafeFlush)? null - : Executors.newSingleThreadExecutor(ConcurrentUtils.newThreadFactory(name + "-flush")); + : ConcurrentUtils.newSingleThreadExecutor(name + "-flush"); } void start(long latestIndex, long evictIndex, File openSegmentFile) throws IOException { @@ -240,19 +239,15 @@ class SegmentedRaftLogWorker { Preconditions.assertTrue(openSegmentFile.exists()); allocateSegmentedRaftLogOutputStream(openSegmentFile, true); } - workerThread.start(); + workerThreadExecutor.submit(this::run); } void close() { this.running = false; sharedBuffer.set(null); - workerThread.interrupt(); Optional.ofNullable(flushExecutor).ifPresent(ExecutorService::shutdown); - try { - workerThread.join(3000); - } catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); - } + ConcurrentUtils.shutdownAndWait(TimeDuration.ONE_SECOND.multiply(3), + workerThreadExecutor, timeout -> LOG.warn("{}: shutdown timeout in " + timeout, name)); IOUtils.cleanup(LOG, out); LOG.info("{} close()", name); } @@ -300,7 +295,7 @@ class SegmentedRaftLogWorker { } boolean isAlive() { - return running && workerThread.isAlive(); + return running && !workerThreadExecutor.isTerminated(); } private void run() {
