This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch reduce_serialization_pool
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b686e94118036dd2885da46d83893b7cdf5b5ac6
Author: jt <[email protected]>
AuthorDate: Tue Jan 5 10:22:30 2021 +0800

    make serialization pool in LogDispatcher static to reduece the number of 
pools
---
 .../java/org/apache/iotdb/cluster/log/LogDispatcher.java     | 12 ++++++------
 .../java/org/apache/iotdb/cluster/server/RaftServer.java     |  7 ++++++-
 .../org/apache/iotdb/cluster/integration/SingleNodeTest.java |  4 +++-
 3 files changed, 15 insertions(+), 8 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index a813fa4..bd0bd3f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.cluster.log;
 
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.concurrent.Future;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
@@ -53,8 +54,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * A LogDispatcher servers a raft leader by queuing logs that the leader wants 
to send to the
- * follower and send the logs in an ordered manner so that the followers will 
not wait for previous
+ * A LogDispatcher serves a raft leader by queuing logs that the leader wants 
to send to its
+ * followers and send the logs in an ordered manner so that the followers will 
not wait for previous
  * logs for too long. For example: if the leader send 3 logs, log1, log2, 
log3, concurrently to
  * follower A, the actual reach order may be log3, log2, and log1. According 
to the protocol, log3
  * and log2 must halt until log1 reaches, as a result, the total delay may 
increase significantly.
@@ -68,12 +69,13 @@ public class LogDispatcher {
   private List<BlockingQueue<SendLogRequest>> nodeLogQueues =
       new ArrayList<>();
   private ExecutorService executorService;
-  private ExecutorService serializationService;
+  private static ExecutorService serializationService =
+      Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
+          new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("DispatcherEncoder-%d").build());
 
   public LogDispatcher(RaftMember member) {
     this.member = member;
     executorService = Executors.newCachedThreadPool();
-    serializationService = 
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
     for (Node node : member.getAllNodes()) {
       if (!node.equals(member.getThisNode())) {
         nodeLogQueues.add(createQueueAndBindingThread(node));
@@ -85,8 +87,6 @@ public class LogDispatcher {
   public void close() throws InterruptedException {
     executorService.shutdownNow();
     executorService.awaitTermination(10, TimeUnit.SECONDS);
-    serializationService.shutdownNow();
-    serializationService.awaitTermination(10, TimeUnit.SECONDS);
   }
 
   public void offer(SendLogRequest log) {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
index fe0cc63..2e925c1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.cluster.server;
 
+import java.util.ConcurrentModificationException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.SynchronousQueue;
@@ -144,7 +145,11 @@ public abstract class RaftServer implements 
RaftService.AsyncIface, RaftService.
       return;
     }
 
-    poolServer.stop();
+    try {
+      poolServer.stop();
+    } catch (ConcurrentModificationException e) {
+      // ignore
+    }
     socket.close();
     clientService.shutdownNow();
     socket = null;
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/integration/SingleNodeTest.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/integration/SingleNodeTest.java
index b1a30c9..5027ad4 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/integration/SingleNodeTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/integration/SingleNodeTest.java
@@ -49,7 +49,9 @@ public class SingleNodeTest extends BaseSingleNodeTest {
   @After
   public void tearDown() throws Exception {
     super.tearDown();
-    session.close();
+    if (session != null) {
+      session.close();
+    }
   }
 
   @Test

Reply via email to