This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch reduce_serialization_pool in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b686e94118036dd2885da46d83893b7cdf5b5ac6 Author: jt <[email protected]> AuthorDate: Tue Jan 5 10:22:30 2021 +0800 make serialization pool in LogDispatcher static to reduce the number of pools --- .../java/org/apache/iotdb/cluster/log/LogDispatcher.java | 12 ++++++------ .../java/org/apache/iotdb/cluster/server/RaftServer.java | 7 ++++++- .../org/apache/iotdb/cluster/integration/SingleNodeTest.java | 4 +++- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java index a813fa4..bd0bd3f 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java @@ -20,6 +20,7 @@ package org.apache.iotdb.cluster.log; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.concurrent.Future; import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest; @@ -53,8 +54,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; /** - * A LogDispatcher servers a raft leader by queuing logs that the leader wants to send to the - * follower and send the logs in an ordered manner so that the followers will not wait for previous + * A LogDispatcher serves a raft leader by queuing logs that the leader wants to send to its + * followers and send the logs in an ordered manner so that the followers will not wait for previous * logs for too long. For example: if the leader send 3 logs, log1, log2, log3, concurrently to * follower A, the actual reach order may be log3, log2, and log1. According to the protocol, log3 * and log2 must halt until log1 reaches, as a result, the total delay may increase significantly. 
@@ -68,12 +69,13 @@ public class LogDispatcher { private List<BlockingQueue<SendLogRequest>> nodeLogQueues = new ArrayList<>(); private ExecutorService executorService; - private ExecutorService serializationService; + private static ExecutorService serializationService = + Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DispatcherEncoder-%d").build()); public LogDispatcher(RaftMember member) { this.member = member; executorService = Executors.newCachedThreadPool(); - serializationService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); for (Node node : member.getAllNodes()) { if (!node.equals(member.getThisNode())) { nodeLogQueues.add(createQueueAndBindingThread(node)); @@ -85,8 +87,6 @@ public class LogDispatcher { public void close() throws InterruptedException { executorService.shutdownNow(); executorService.awaitTermination(10, TimeUnit.SECONDS); - serializationService.shutdownNow(); - serializationService.awaitTermination(10, TimeUnit.SECONDS); } public void offer(SendLogRequest log) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java index fe0cc63..2e925c1 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java @@ -19,6 +19,7 @@ package org.apache.iotdb.cluster.server; +import java.util.ConcurrentModificationException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.SynchronousQueue; @@ -144,7 +145,11 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService. 
return; } - poolServer.stop(); + try { + poolServer.stop(); + } catch (ConcurrentModificationException e) { + // ignore + } socket.close(); clientService.shutdownNow(); socket = null; diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/SingleNodeTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/SingleNodeTest.java index b1a30c9..5027ad4 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/SingleNodeTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/SingleNodeTest.java @@ -49,7 +49,9 @@ public class SingleNodeTest extends BaseSingleNodeTest { @After public void tearDown() throws Exception { super.tearDown(); - session.close(); + if (session != null) { + session.close(); + } } @Test
