This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2_tmp
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit f348cac8b07ff111e57af702a0efd94999a7e2c5
Author: Potato <[email protected]>
AuthorDate: Wed Feb 22 06:15:36 2023 +0800

    RATIS-1785. Use SingleThreadExecutor to manage the lifetime of single 
thread (#824)
    
    (cherry picked from commit d838c56772568211b9af617d78c5182af641faac)
---
 .../java/org/apache/ratis/util/ConcurrentUtils.java   | 15 +++++++++++++++
 .../org/apache/ratis/server/impl/RaftServerProxy.java |  3 ++-
 .../raftlog/segmented/SegmentedRaftLogWorker.java     | 19 +++++++------------
 3 files changed, 24 insertions(+), 13 deletions(-)

diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
index df214b388..372fa62bc 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
@@ -80,6 +80,21 @@ public interface ConcurrentUtils {
     };
   }
 
+  /**
+    * This method is similar to {@link 
java.util.concurrent.Executors#newSingleThreadExecutor(ThreadFactory)}
+    * except that this method takes a specific thread name as there is only 
one thread.g
+    *
+    * @param name the thread name for only one thread.
+    * @return a new {@link ExecutorService}.
+    */
+  static ExecutorService newSingleThreadExecutor(String name) {
+      return Executors.newSingleThreadExecutor(runnable -> {
+          final Thread t = new Thread(runnable);
+          t.setName(name);
+          return t;
+        });
+  }
+
   /**
    * The same as {@link 
java.util.concurrent.Executors#newCachedThreadPool(ThreadFactory)}
    * except that this method takes a maximumPoolSize parameter.
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index bef72ee0d..54cf15c65 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -189,7 +189,7 @@ class RaftServerProxy implements RaftServer {
   private final DataStreamServerRpc dataStreamServerRpc;
 
   private final ImplMap impls = new ImplMap();
-  private final ExecutorService implExecutor = 
Executors.newSingleThreadExecutor();
+  private final ExecutorService implExecutor;
   private final ExecutorService executor;
 
   private final JvmPauseMonitor pauseMonitor;
@@ -210,6 +210,7 @@ class RaftServerProxy implements RaftServer {
 
     this.dataStreamServerRpc = new DataStreamServerImpl(this, 
parameters).getServerRpc();
 
+    this.implExecutor = ConcurrentUtils.newSingleThreadExecutor(id + 
"-groupManagement");
     this.executor = ConcurrentUtils.newThreadPoolWithMax(
         RaftServerConfigKeys.ThreadPool.proxyCached(properties),
         RaftServerConfigKeys.ThreadPool.proxySize(properties),
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 7e49522fb..a7bbcab23 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -142,8 +142,7 @@ class SegmentedRaftLogWorker {
   private final DataBlockingQueue<Task> queue;
   private final WriteLogTasks writeTasks = new WriteLogTasks();
   private volatile boolean running = true;
-  private final Thread workerThread;
-
+  private final ExecutorService workerThreadExecutor;
   private final RaftStorage storage;
   private volatile SegmentedRaftLogOutputStream out;
   private final Runnable submitUpdateCommitEvent;
@@ -205,7 +204,7 @@ class SegmentedRaftLogWorker {
 
     this.stateMachineDataPolicy = new StateMachineDataPolicy(properties, 
metricRegistry);
 
-    this.workerThread = new Thread(this::run, name);
+    this.workerThreadExecutor = ConcurrentUtils.newSingleThreadExecutor(name);
 
     // Server Id can be null in unit tests
     metricRegistry.addDataQueueSizeGauge(queue);
@@ -228,7 +227,7 @@ class SegmentedRaftLogWorker {
           " and " + RaftServerConfigKeys.Log.ASYNC_FLUSH_ENABLED_KEY);
     }
     this.flushExecutor = (!asyncFlush && !unsafeFlush)? null
-        : 
Executors.newSingleThreadExecutor(ConcurrentUtils.newThreadFactory(name + 
"-flush"));
+        : ConcurrentUtils.newSingleThreadExecutor(name + "-flush");
   }
 
   void start(long latestIndex, long evictIndex, File openSegmentFile) throws 
IOException {
@@ -240,19 +239,15 @@ class SegmentedRaftLogWorker {
       Preconditions.assertTrue(openSegmentFile.exists());
       allocateSegmentedRaftLogOutputStream(openSegmentFile, true);
     }
-    workerThread.start();
+    workerThreadExecutor.submit(this::run);
   }
 
   void close() {
     this.running = false;
     sharedBuffer.set(null);
-    workerThread.interrupt();
     Optional.ofNullable(flushExecutor).ifPresent(ExecutorService::shutdown);
-    try {
-      workerThread.join(3000);
-    } catch (InterruptedException ignored) {
-      Thread.currentThread().interrupt();
-    }
+    ConcurrentUtils.shutdownAndWait(TimeDuration.ONE_SECOND.multiply(3),
+        workerThreadExecutor, timeout -> LOG.warn("{}: shutdown timeout in " + 
timeout, name));
     IOUtils.cleanup(LOG, out);
     LOG.info("{} close()", name);
   }
@@ -300,7 +295,7 @@ class SegmentedRaftLogWorker {
   }
 
   boolean isAlive() {
-    return running && workerThread.isAlive();
+    return running && !workerThreadExecutor.isTerminated();
   }
 
   private void run() {

Reply via email to