This is an automated email from the ASF dual-hosted git repository. hxd pushed a commit to branch cluster- in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9f71122d8adad47c5ebdfe1ed35a2c0acfcadb61 Author: xiangdong huang <[email protected]> AuthorDate: Thu Aug 12 00:11:26 2021 +0800 try to fix tests --- .../org/apache/iotdb/cluster/ClusterIoTDB.java | 27 ++++++++++----- .../iotdb/cluster/coordinator/Coordinator.java | 13 +++++--- .../apache/iotdb/cluster/log/LogDispatcher.java | 11 +++--- .../log/manage/PartitionedSnapshotLogManager.java | 6 ++-- .../iotdb/cluster/log/manage/RaftLogManager.java | 23 +++---------- .../apache/iotdb/cluster/metadata/CMManager.java | 3 ++ .../iotdb/cluster/query/ClusterPlanExecutor.java | 15 ++++++++- .../cluster/query/fill/ClusterPreviousFill.java | 4 ++- .../query/last/ClusterLastQueryExecutor.java | 5 +-- .../iotdb/cluster/server/ClusterRPCService.java | 26 +++++---------- .../cluster/server/PullSnapshotHintService.java | 4 +-- .../cluster/server/member/DataGroupMember.java | 16 +++++---- .../cluster/server/member/MetaGroupMember.java | 2 +- .../iotdb/cluster/server/member/RaftMember.java | 21 +++++------- .../cluster/server/raft/AbstractRaftService.java | 4 +-- .../server/service/DataGroupServiceImpls.java | 39 ++++++++++++++++++++-- .../cluster/server/service/MetaAsyncService.java | 31 +++++++++++++---- .../cluster/integration/BaseSingleNodeTest.java | 15 +++++---- .../cluster/server/member/MetaGroupMemberTest.java | 3 +- .../db/concurrent/IoTDBThreadPoolFactory.java | 28 ++++++++++++++++ .../org/apache/iotdb/db/service/ServiceType.java | 1 + 21 files changed, 192 insertions(+), 105 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java index 332c7e1..5913b80 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java @@ -33,6 +33,7 @@ import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable; import org.apache.iotdb.cluster.partition.slot.SlotStrategy; import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.server.ClusterRPCService; +import org.apache.iotdb.cluster.server.ClusterTSServiceImpl; import org.apache.iotdb.cluster.server.HardLinkCleaner; import org.apache.iotdb.cluster.server.Response; import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer; @@ -47,16 +48,17 @@ import org.apache.iotdb.cluster.server.service.MetaAsyncService; import org.apache.iotdb.cluster.server.service.MetaSyncService; import org.apache.iotdb.cluster.utils.ClusterUtils; import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor; +import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.db.conf.IoTDBConfigCheck; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.StartupException; +import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.service.JMXService; import org.apache.iotdb.db.service.RegisterManager; import org.apache.iotdb.db.service.thrift.ThriftServiceThread; import org.apache.iotdb.db.utils.TestOnly; - import org.apache.thrift.TException; import org.apache.thrift.async.TAsyncClientManager; import org.apache.thrift.protocol.TBinaryProtocol.Factory; @@ -68,7 +70,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashSet; import java.util.Set; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -113,13 +114,13 @@ public class ClusterIoTDB implements ClusterIoTDBMBean { * of all raft members in this node */ private ScheduledExecutorService reportThread = - Executors.newSingleThreadScheduledExecutor(n -> new Thread(n, "NodeReportThread")); + IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("NodeReportThread"); private boolean allowReport = true; /** hardLinkCleaner will periodically clean expired hardlinks created during snapshots */ private ScheduledExecutorService hardLinkCleanerThread = - Executors.newSingleThreadScheduledExecutor(n -> new Thread(n, "HardLinkCleaner")); + IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("HardLinkCleaner"); // currently, dataClientProvider is only used for those instances who do not belong to any // DataGroup.. @@ -127,6 +128,10 @@ public class ClusterIoTDB implements ClusterIoTDBMBean { private DataClientProvider dataClientProvider; private ClusterIoTDB() { + // we do not init anything here, so that we can re-initialize the instance in IT. + } + + public void initLocalEngines() { ClusterConfig config = ClusterDescriptor.getInstance().getConfig(); thisNode = new Node(); // set internal rpc ip and ports @@ -137,9 +142,6 @@ public class ClusterIoTDB implements ClusterIoTDBMBean { thisNode.setClientPort(config.getClusterRpcPort()); thisNode.setClientIp(IoTDBDescriptor.getInstance().getConfig().getRpcAddress()); coordinator = new Coordinator(); - } - - public void initLocalEngines() { // local engine TProtocolFactory protocolFactory = ThriftServiceThread.getProtocolFactory( @@ -256,6 +258,7 @@ public class ClusterIoTDB implements ClusterIoTDBMBean { // TODO fixme it is better to remove coordinator out of metaGroupEngine registerManager.register(metaGroupEngine); + registerManager.register(dataGroupEngine); // rpc service initialize if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { @@ -291,8 +294,15 @@ public class ClusterIoTDB implements ClusterIoTDBMBean { registerManager.register(ClusterMonitor.INSTANCE); // we must wait until the metaGroup established. // So that the ClusterRPCService can work. + ClusterTSServiceImpl clusterRPCServiceImpl = new ClusterTSServiceImpl(); + clusterRPCServiceImpl.setCoordinator(coordinator); + clusterRPCServiceImpl.setExecutor(metaGroupEngine); + ClusterRPCService.getInstance().initSyncedServiceImpl(clusterRPCServiceImpl); registerManager.register(ClusterRPCService.getInstance()); - } catch (StartupException | StartUpCheckFailureException | ConfigInconsistentException e) { + } catch (StartupException + | StartUpCheckFailureException + | ConfigInconsistentException + | QueryProcessException e) { logger.error("Fail to start server", e); stop(); } @@ -497,7 +507,6 @@ public class ClusterIoTDB implements ClusterIoTDBMBean { private void deactivate() { logger.info("Deactivating Cluster IoTDB..."); - // metaServer.stop(); stopThreadPools(); registerManager.deregisterAll(); JMXService.deregisterMBean(mbeanName); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java index 5066908..68518b1 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java @@ -58,11 +58,11 @@ import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan; import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan; import org.apache.iotdb.db.service.IoTDB; +import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.EndPoint; import org.apache.iotdb.service.rpc.thrift.TSStatus; - import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,16 +92,19 @@ public class Coordinator { "The following errors occurred when executing " + "the query, please retry or contact the DBA: "; + @TestOnly public Coordinator(MetaGroupMember metaGroupMember) { - this.metaGroupMember = metaGroupMember; - this.name = metaGroupMember.getName(); - this.thisNode = metaGroupMember.getThisNode(); + linkMetaGroupMember(metaGroupMember); } public Coordinator() {} - public void setMetaGroupMember(MetaGroupMember metaGroupMember) { + public void linkMetaGroupMember(MetaGroupMember metaGroupMember) { this.metaGroupMember = metaGroupMember; + if (metaGroupMember.getCoordinator() != null && metaGroupMember.getCoordinator() != this) { + logger.warn("MetadataGroupMember linked inconsistent Coordinator, will correct it."); + metaGroupMember.setCoordinator(this); + } this.name = metaGroupMember.getName(); this.thisNode = metaGroupMember.getThisNode(); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java index a600c4a..2ce4d76 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java @@ -30,11 +30,11 @@ import org.apache.iotdb.cluster.server.member.RaftMember; import org.apache.iotdb.cluster.server.monitor.Peer; import org.apache.iotdb.cluster.server.monitor.Timer; import org.apache.iotdb.cluster.utils.ClientUtils; +import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.utils.TestOnly; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.thrift.TException; import org.apache.thrift.async.AsyncMethodCallback; import org.slf4j.Logger; @@ -46,7 +46,6 @@ import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -69,13 +68,13 @@ public class LogDispatcher { private List<BlockingQueue<SendLogRequest>> nodeLogQueues = new ArrayList<>(); private ExecutorService executorService; private static ExecutorService serializationService = - Executors.newFixedThreadPool( - Runtime.getRuntime().availableProcessors(), - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DispatcherEncoder-%d").build()); + IoTDBThreadPoolFactory.newFixedThreadPoolWithDaemonThread( + Runtime.getRuntime().availableProcessors(), "DispatcherEncoder"); public LogDispatcher(RaftMember member) { this.member = member; - executorService = Executors.newCachedThreadPool(); + executorService = + IoTDBThreadPoolFactory.newCachedThreadPool("LogDispatcher-" + member.getName()); for (Node node : member.getAllNodes()) { if (!node.equals(member.getThisNode())) { nodeLogQueues.add(createQueueAndBindingThread(node)); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java index 20535fe..c2e3c1b 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java @@ -33,7 +33,6 @@ import org.apache.iotdb.db.metadata.mnode.IMNode; import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,7 +68,10 @@ public abstract class PartitionedSnapshotLogManager<T extends Snapshot> extends Node thisNode, SnapshotFactory<T> factory, DataGroupMember dataGroupMember) { - super(new SyncLogDequeSerializer(header.nodeIdentifier), logApplier, header.toString()); + super( + new SyncLogDequeSerializer(header.nodeIdentifier), + logApplier, + Integer.toString(header.getNodeIdentifier())); this.partitionTable = partitionTable; this.factory = factory; this.thisNode = thisNode; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java index 322b541..be05af4 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java @@ -31,10 +31,10 @@ import org.apache.iotdb.cluster.log.LogApplier; import org.apache.iotdb.cluster.log.Snapshot; import org.apache.iotdb.cluster.log.StableEntryManager; import org.apache.iotdb.cluster.server.monitor.Timer.Statistic; +import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.tsfile.utils.RamUsageEstimator; -import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,11 +44,9 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; public abstract class RaftLogManager { @@ -140,19 +138,10 @@ public abstract class RaftLogManager { this.blockedUnappliedLogList = new CopyOnWriteArrayList<>(); this.deleteLogExecutorService = - new ScheduledThreadPoolExecutor( - 1, - new BasicThreadFactory.Builder() - .namingPattern("raft-log-delete-" + name) - .daemon(true) - .build()); + IoTDBThreadPoolFactory.newScheduledThreadPoolWithDaemon(1, "raft-log-delete-" + name); this.checkLogApplierExecutorService = - Executors.newSingleThreadExecutor( - new BasicThreadFactory.Builder() - .namingPattern("check-log-applier-" + name) - .daemon(true) - .build()); + IoTDBThreadPoolFactory.newSingleThreadExecutorWithDaemon("check-log-applier-" + name); /** deletion check period of the submitted log */ int logDeleteCheckIntervalSecond = @@ -763,11 +752,7 @@ public abstract class RaftLogManager { this.blockAppliedCommitIndex = -1; this.blockedUnappliedLogList = new CopyOnWriteArrayList<>(); this.checkLogApplierExecutorService = - Executors.newSingleThreadExecutor( - new BasicThreadFactory.Builder() - .namingPattern("check-log-applier-" + name) - .daemon(true) - .build()); + IoTDBThreadPoolFactory.newSingleThreadExecutorWithDaemon("check-log-applier-" + name); this.checkLogApplierFuture = checkLogApplierExecutorService.submit(this::checkAppliedLogIndex); for (int i = 0; i < logUpdateConditions.length; i++) { logUpdateConditions[i] = new Object(); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java index f6b401f..b4e920c 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java @@ -1281,6 +1281,9 @@ public class CMManager extends MManager { List<PartialPath> originalPaths) { ConcurrentSkipListSet<PartialPath> fullPaths = new ConcurrentSkipListSet<>(); ConcurrentSkipListSet<PartialPath> nonExistPaths = new ConcurrentSkipListSet<>(); + // TODO it is not suitable for register and deregister an Object to JMX to such a frequent + // function call. + // BUT is it suitable to create a thread pool for each calling?? ExecutorService getAllPathsService = Executors.newFixedThreadPool(metaGroupMember.getPartitionTable().getGlobalGroups().size()); for (PartialPath pathStr : originalPaths) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java index 9884385..78892cdf 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java @@ -200,7 +200,9 @@ public class ClusterPlanExecutor extends PlanExecutor { if (groupPathMap.isEmpty()) { return result.get(); } - + // TODO it is not suitable for register and deregister an Object to JMX to such a frequent + // function call. + // BUT is it suitable to create a thread pool for each calling?? ExecutorService remoteQueryThreadPool = Executors.newFixedThreadPool(groupPathMap.size()); List<Future<Void>> remoteFutures = new ArrayList<>(); // query each data group separately @@ -301,6 +303,10 @@ public class ClusterPlanExecutor extends PlanExecutor { throws MetadataException { ConcurrentSkipListSet<PartialPath> nodeSet = new ConcurrentSkipListSet<>(); + + // TODO it is not suitable for register and deregister an Object to JMX to such a frequent + // function call. + // BUT is it suitable to create a thread pool for each calling?? ExecutorService pool = Executors.newFixedThreadPool(THREAD_POOL_SIZE); List<Future<Void>> futureList = new ArrayList<>(); @@ -319,6 +325,7 @@ public class ClusterPlanExecutor extends PlanExecutor { return null; })); } + // TODO seems there is a long-term block here. waitForThreadPool(futureList, pool, "getNodesList()"); return new ArrayList<>(nodeSet); } @@ -398,6 +405,9 @@ public class ClusterPlanExecutor extends PlanExecutor { protected Set<String> getNodeNextChildren(PartialPath path) throws MetadataException { ConcurrentSkipListSet<String> resultSet = new ConcurrentSkipListSet<>(); List<PartitionGroup> globalGroups = metaGroupMember.getPartitionTable().getGlobalGroups(); + // TODO it is not suitable for register and deregister an Object to JMX to such a frequent + // function call. + // BUT is it suitable to create a thread pool for each calling?? ExecutorService pool = Executors.newFixedThreadPool(THREAD_POOL_SIZE); List<Future<Void>> futureList = new ArrayList<>(); for (PartitionGroup group : globalGroups) { @@ -489,6 +499,9 @@ public class ClusterPlanExecutor extends PlanExecutor { @Override protected Set<String> getPathNextChildren(PartialPath path) throws MetadataException { ConcurrentSkipListSet<String> resultSet = new ConcurrentSkipListSet<>(); + // TODO it is not suitable for register and deregister an Object to JMX to such a frequent + // function call. + // BUT is it suitable to create a thread pool for each calling?? ExecutorService pool = Executors.newFixedThreadPool(THREAD_POOL_SIZE); List<Future<Void>> futureList = new ArrayList<>(); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java index 585b841..933d145 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java @@ -121,7 +121,9 @@ public class ClusterPreviousFill extends PreviousFill { } CountDownLatch latch = new CountDownLatch(partitionGroups.size()); PreviousFillHandler handler = new PreviousFillHandler(latch); - + // TODO it is not suitable for register and deregister an Object to JMX to such a frequent + // function call. + // BUT is it suitable to create a thread pool for each calling?? ExecutorService fillService = Executors.newFixedThreadPool(partitionGroups.size()); PreviousFillArguments arguments = new PreviousFillArguments(path, dataType, queryTime, beforeRange, deviceMeasurements); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java index 3ed7837..b4e9d5d 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java @@ -32,6 +32,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.server.member.DataGroupMember; import org.apache.iotdb.cluster.server.member.MetaGroupMember; import org.apache.iotdb.cluster.utils.ClusterQueryUtils; +import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.PartialPath; @@ -58,7 +59,6 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; public class ClusterLastQueryExecutor extends LastQueryExecutor { @@ -67,7 +67,8 @@ public class ClusterLastQueryExecutor extends LastQueryExecutor { private MetaGroupMember metaGroupMember; private static ExecutorService lastQueryPool = - Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + IoTDBThreadPoolFactory.newFixedThreadPool( + Runtime.getRuntime().availableProcessors(), "ClusterLastQuery"); public ClusterLastQueryExecutor(LastQueryPlan lastQueryPlan, MetaGroupMember metaGroupMember) { super(lastQueryPlan); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java index b8a2c00..fa2a2c9 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java @@ -20,12 +20,9 @@ package org.apache.iotdb.cluster.server; import org.apache.iotdb.cluster.config.ClusterDescriptor; -import org.apache.iotdb.cluster.coordinator.Coordinator; -import org.apache.iotdb.cluster.server.member.MetaGroupMember; import org.apache.iotdb.db.concurrent.ThreadName; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.exception.runtime.RPCServiceException; import org.apache.iotdb.db.service.RPCServiceThriftHandler; import org.apache.iotdb.db.service.ServiceType; @@ -50,12 +47,15 @@ public class ClusterRPCService extends ThriftService implements ClusterRPCServic } @Override - public void initTProcessor() throws IllegalAccessException, InstantiationException { - try { - impl = new ClusterTSServiceImpl(); - initSyncedServiceImpl(null); - } catch (QueryProcessException e) { - throw new InstantiationException(e.getMessage()); + public void initSyncedServiceImpl(Object serviceImpl) { + impl = (ClusterTSServiceImpl) serviceImpl; + super.initSyncedServiceImpl(serviceImpl); + } + + @Override + public void initTProcessor() throws InstantiationException { + if (impl == null) { + throw new InstantiationException("ClusterTSServiceImpl is null"); } processor = new Processor<>(impl); } @@ -100,14 +100,6 @@ public class ClusterRPCService extends ThriftService implements ClusterRPCServic return ClusterRPCServiceHolder.INSTANCE; } - public void assignExecutorToServiceImpl(MetaGroupMember member) throws QueryProcessException { - this.impl.setExecutor(member); - } - - public void assignCoordinator(Coordinator coordinator) { - this.impl.setCoordinator(coordinator); - } - private static class ClusterRPCServiceHolder { private static final ClusterRPCService INSTANCE = new ClusterRPCService(); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java index c137027..ed99a45 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java @@ -28,6 +28,7 @@ import org.apache.iotdb.cluster.partition.PartitionGroup; import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.rpc.thrift.RaftNode; import org.apache.iotdb.cluster.server.member.DataGroupMember; +import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -36,7 +37,6 @@ import org.slf4j.LoggerFactory; import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -56,7 +56,7 @@ public class PullSnapshotHintService { } public void start() { - this.service = Executors.newScheduledThreadPool(1); + this.service = IoTDBThreadPoolFactory.newScheduledThreadPool(1, "PullSnapshotHint"); this.service.scheduleAtFixedRate(this::sendHints, 0, 10, TimeUnit.MILLISECONDS); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java index aaba50f..0b1abca 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java @@ -74,6 +74,7 @@ import org.apache.iotdb.cluster.server.monitor.Timer; import org.apache.iotdb.cluster.server.monitor.Timer.Statistic; import org.apache.iotdb.cluster.utils.IOUtils; import org.apache.iotdb.cluster.utils.StatusUtils; +import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.StorageEngine; @@ -94,7 +95,6 @@ import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.service.rpc.thrift.EndPoint; import org.apache.iotdb.service.rpc.thrift.TSStatus; import org.apache.iotdb.tsfile.utils.Pair; - import org.apache.thrift.protocol.TProtocolFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -118,7 +118,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S; @@ -184,14 +183,15 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean } DataGroupMember(TProtocolFactory factory, PartitionGroup nodes, MetaGroupMember metaGroupMember) { + // The name is used in JMX, so we have to avoid to use "(" "," "=" ")" super( - "Data(" + "Data-" + nodes.getHeader().getNode().getInternalIp() - + ":" + + "-" + nodes.getHeader().getNode().getDataPort() - + ", raftId=" + + "-raftId-" + nodes.getId() - + ")", + + "", new AsyncClientPool(new AsyncDataClient.FactoryAsync(factory)), new SyncClientPool(new SyncDataClient.FactorySync(factory)), new AsyncClientPool(new AsyncDataHeartbeatClient.FactoryAsync(factory)), @@ -235,7 +235,9 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean JMXService.registerMBean(this, mbeanName); super.start(); heartBeatService.submit(new DataHeartbeatThread(this)); - pullSnapshotService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + pullSnapshotService = + IoTDBThreadPoolFactory.newFixedThreadPool( + Runtime.getRuntime().availableProcessors(), "pullSnapshot"); pullSnapshotHintService = new PullSnapshotHintService(this); pullSnapshotHintService.start(); resumePullSnapshotTasks(); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java index b3dcc7a..429459d 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java @@ -96,7 +96,6 @@ import org.apache.iotdb.db.utils.TimeValuePairUtils.Intervals; import org.apache.iotdb.service.rpc.thrift.EndPoint; import org.apache.iotdb.service.rpc.thrift.TSStatus; import org.apache.iotdb.tsfile.read.filter.basic.Filter; - import org.apache.thrift.TException; import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.transport.TTransportException; @@ -258,6 +257,7 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe // try loading the partition table if there was a previous cluster this.coordinator = coordinator; + coordinator.linkMetaGroupMember(this); loadPartitionTable(); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java index d714540..3bd68ed 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java @@ -19,7 +19,6 @@ package org.apache.iotdb.cluster.server.member; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.iotdb.cluster.ClusterIoTDB; import org.apache.iotdb.cluster.client.async.AsyncClientPool; import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor; @@ -64,6 +63,7 @@ import org.apache.iotdb.cluster.utils.IOUtils; import org.apache.iotdb.cluster.utils.PlanSerializer; import org.apache.iotdb.cluster.utils.StatusUtils; import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.db.concurrent.IoTThreadFactory; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.exception.BatchProcessException; import org.apache.iotdb.db.exception.IoTDBException; @@ -81,6 +81,7 @@ import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TSStatus; + import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,7 +101,6 @@ import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -289,16 +289,12 @@ public abstract class RaftMember implements RaftMemberMBean { } void startBackGroundThreads() { - heartBeatService = - Executors.newSingleThreadScheduledExecutor( - r -> new Thread(r, name + "-HeartbeatThread@" + System.currentTimeMillis())); - catchUpService = - Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setNameFormat(getName() + "-CatchUpThread%d").build()); + heartBeatService = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(name + "-Heartbeat"); + + catchUpService = IoTDBThreadPoolFactory.newCachedThreadPool(name + "-CatchUp"); appendLogThreadPool = - Executors.newFixedThreadPool( - Runtime.getRuntime().availableProcessors() * 10, - new ThreadFactoryBuilder().setNameFormat(getName() + "-AppendLog%d").build()); + IoTDBThreadPoolFactory.newFixedThreadPool( + Runtime.getRuntime().availableProcessors() * 10, name + "-AppendLog"); if (!config.isUseAsyncServer()) { serialToParallelPool = IoTDBThreadPoolFactory.newThreadPool( @@ -307,7 +303,8 @@ public abstract class RaftMember implements RaftMemberMBean { 1000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), - new ThreadFactoryBuilder().setNameFormat(getName() + "-SerialToParallel%d").build()); + new IoTThreadFactory(getName() + "-SerialToParallel"), + getName() + "-SerialToParallel"); } commitLogPool = IoTDBThreadPoolFactory.newSingleThreadExecutor("RaftCommitLog"); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java index d62024b..2a13550 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java @@ -19,12 +19,12 @@ package org.apache.iotdb.cluster.server.raft; import org.apache.iotdb.cluster.config.ClusterDescriptor; -import org.apache.iotdb.cluster.rpc.thrift.TSMetaService; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.runtime.RPCServiceException; import org.apache.iotdb.db.service.thrift.ThriftService; import org.apache.iotdb.db.service.thrift.ThriftServiceThread; +import org.apache.thrift.TBaseAsyncProcessor; public abstract class AbstractRaftService extends ThriftService { @@ -36,7 +36,7 @@ public abstract class AbstractRaftService extends ThriftService { if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { thriftServiceThread = new ThriftServiceThread( - (TSMetaService.AsyncProcessor) processor, + (TBaseAsyncProcessor) processor, getID().getName(), clientThreadName, getBindIP(), diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java index b355766..5eec743 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java @@ -32,16 +32,39 @@ import org.apache.iotdb.cluster.partition.NodeRemovalResult; import org.apache.iotdb.cluster.partition.PartitionGroup; import org.apache.iotdb.cluster.partition.PartitionTable; import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable; -import org.apache.iotdb.cluster.rpc.thrift.*; +import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest; +import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest; +import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest; +import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq; +import org.apache.iotdb.cluster.rpc.thrift.GetAggrResultRequest; +import org.apache.iotdb.cluster.rpc.thrift.GetAllPathsResult; +import org.apache.iotdb.cluster.rpc.thrift.GroupByRequest; +import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest; +import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse; +import org.apache.iotdb.cluster.rpc.thrift.LastQueryRequest; +import org.apache.iotdb.cluster.rpc.thrift.MultSeriesQueryRequest; +import org.apache.iotdb.cluster.rpc.thrift.Node; +import org.apache.iotdb.cluster.rpc.thrift.PreviousFillRequest; +import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest; +import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp; +import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest; +import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp; +import org.apache.iotdb.cluster.rpc.thrift.RaftNode; +import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse; +import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest; +import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest; +import org.apache.iotdb.cluster.rpc.thrift.TSDataService; import org.apache.iotdb.cluster.server.NodeCharacter; import org.apache.iotdb.cluster.server.StoppedMemberManager; import org.apache.iotdb.cluster.server.member.DataGroupMember; import org.apache.iotdb.cluster.server.member.MetaGroupMember; import org.apache.iotdb.cluster.server.monitor.NodeReport.DataMemberReport; import org.apache.iotdb.cluster.utils.IOUtils; +import org.apache.iotdb.db.exception.StartupException; +import org.apache.iotdb.db.service.IService; +import org.apache.iotdb.db.service.ServiceType; import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.service.rpc.thrift.TSStatus; - import org.apache.thrift.TException; import org.apache.thrift.async.AsyncMethodCallback; import org.apache.thrift.protocol.TProtocolFactory; @@ -62,7 +85,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; public class DataGroupServiceImpls - implements TSDataService.AsyncIface, TSDataService.Iface, DataGroupServiceImplsMBean { + implements TSDataService.AsyncIface, TSDataService.Iface, IService, DataGroupServiceImplsMBean { private static final Logger logger = LoggerFactory.getLogger(DataGroupServiceImpls.class); @@ -94,6 +117,11 @@ public class DataGroupServiceImpls this.stoppedMemberManager = new StoppedMemberManager(dataMemberFactory); } + @Override + public void start() throws StartupException { + // seems do nothing + } + // @Override // TODO public void stop() { @@ -103,6 +131,11 @@ public class DataGroupServiceImpls } } + @Override + public ServiceType getID() { + return ServiceType.CLUSTER_DATA_ENGINE; + } + /** * Add a DataGroupMember into this server, if a member with the same header exists, the old member * will be stopped and replaced by the new one. diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java index 4f547a3..8114425 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java @@ -38,7 +38,6 @@ import org.apache.iotdb.cluster.server.NodeCharacter; import org.apache.iotdb.cluster.server.Response; import org.apache.iotdb.cluster.server.member.MetaGroupMember; import org.apache.iotdb.cluster.utils.ClusterUtils; - import org.apache.thrift.TException; import org.apache.thrift.async.AsyncMethodCallback; import org.slf4j.Logger; @@ -47,7 +46,7 @@ import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; public class MetaAsyncService extends BaseAsyncService implements TSMetaService.AsyncIface { - + private static final String ERROR_MSG_META_NOT_READY = "The metadata not is not ready."; private static final Logger logger = LoggerFactory.getLogger(MetaAsyncService.class); private MetaGroupMember metaGroupMember; @@ -59,11 +58,19 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService. @Override public void appendEntry(AppendEntryRequest request, AsyncMethodCallback resultHandler) { - if (metaGroupMember.getPartitionTable() == null) { - // this node lacks information of the cluster and refuse to work - logger.debug("This node is blind to the cluster and cannot accept logs"); - resultHandler.onComplete(Response.RESPONSE_PARTITION_TABLE_UNAVAILABLE); - return; + // if the metaGroupMember is not ready (e.g., as a follower the PartitionTable is loaded + // locally, but the partition table is not verified), we do not handle the RPC requests. + if (!metaGroupMember.isReady()) { + // the only special case is that the leader will send an empty entry for letting followers + // submit previous log + // at this time, the partitionTable has been loaded but is not verified. So the PRC is not + // ready. + if (metaGroupMember.getPartitionTable() == null) { + // this node lacks information of the cluster and refuse to work + logger.debug("This node is blind to the cluster and cannot accept logs"); + resultHandler.onComplete(Response.RESPONSE_PARTITION_TABLE_UNAVAILABLE); + return; + } } super.appendEntry(request, resultHandler); @@ -72,6 +79,11 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService. @Override public void addNode( Node node, StartUpStatus startUpStatus, AsyncMethodCallback<AddNodeResponse> resultHandler) { + if (!metaGroupMember.isReady()) { + logger.debug(ERROR_MSG_META_NOT_READY); + resultHandler.onError(new TException(ERROR_MSG_META_NOT_READY)); + return; + } AddNodeResponse addNodeResponse = null; try { addNodeResponse = metaGroupMember.addNode(node, startUpStatus); @@ -160,6 +172,11 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService. @Override public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) { + if (!metaGroupMember.isReady()) { + logger.debug(ERROR_MSG_META_NOT_READY); + resultHandler.onError(new TException(ERROR_MSG_META_NOT_READY)); + return; + } long result; try { result = metaGroupMember.removeNode(node); diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java index 541af9e..cedaf7d 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java @@ -19,13 +19,12 @@ package org.apache.iotdb.cluster.integration; +import org.apache.iotdb.cluster.ClusterIoTDB; import org.apache.iotdb.cluster.config.ClusterDescriptor; -import org.apache.iotdb.cluster.server.member.MetaGroupMember; import org.apache.iotdb.cluster.utils.Constants; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.session.Session; - import org.junit.After; import org.junit.Before; @@ -34,7 +33,7 @@ import java.util.List; public abstract class BaseSingleNodeTest { - private MetaGroupMember metaServer; + private ClusterIoTDB daemon; private boolean useAsyncServer; private List<String> seedNodeUrls; @@ -44,24 +43,26 @@ public abstract class BaseSingleNodeTest { @Before public void setUp() throws Exception { initConfigs(); - metaServer = new MetaGroupMember(); - metaServer.start(); - metaServer.buildCluster(); + daemon = ClusterIoTDB.getInstance(); + daemon.initLocalEngines(); + daemon.activeStartNodeMode(); } @After public void tearDown() throws Exception { // TODO fixme - metaServer.stop(); + daemon.stop(); recoverConfigs(); EnvironmentUtils.cleanEnv(); } private void initConfigs() { + // remember the original values useAsyncServer = ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(); seedNodeUrls = ClusterDescriptor.getInstance().getConfig().getSeedNodeUrls(); replicaNum = ClusterDescriptor.getInstance().getConfig().getReplicationNum(); autoCreateSchema = ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema(); + // set the cluster as a single node cluster. ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(true); ClusterDescriptor.getInstance() .getConfig() diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java index 90884f2..73eb793 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java @@ -108,7 +108,6 @@ import org.apache.iotdb.tsfile.read.filter.TimeFilter; import org.apache.iotdb.tsfile.read.filter.ValueFilter; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema; - import org.apache.thrift.async.AsyncMethodCallback; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TCompactProtocol.Factory; @@ -538,7 +537,7 @@ public class MetaGroupMemberTest extends BaseMember { } } }; - metaGroupMember.getCoordinator().setMetaGroupMember(metaGroupMember); + metaGroupMember.getCoordinator().linkMetaGroupMember(metaGroupMember); metaGroupMember.setLeader(node); metaGroupMember.setAllNodes(allNodes); metaGroupMember.setCharacter(NodeCharacter.LEADER); diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java b/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java index 72dacc9..cdd31a3 100644 --- a/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java +++ b/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java @@ -22,6 +22,7 @@ import org.apache.iotdb.db.concurrent.threadpool.WrappedScheduledExecutorService import org.apache.iotdb.db.concurrent.threadpool.WrappedSingleThreadExecutorService; import org.apache.iotdb.db.concurrent.threadpool.WrappedSingleThreadScheduledExecutor; import org.apache.iotdb.db.concurrent.threadpool.WrappedThreadPoolExecutor; + import org.apache.thrift.server.TThreadPoolServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,6 +65,19 @@ public class IoTDBThreadPoolFactory { poolName); } + public static ExecutorService newFixedThreadPoolWithDaemonThread(int nThreads, String poolName) { + logger.info("new fixed thread pool: {}, thread number: {}", poolName, nThreads); + + return new WrappedThreadPoolExecutor( + nThreads, + nThreads, + 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), + new IoTDBDaemonThreadFactory(poolName), + poolName); + } + public static ExecutorService newFixedThreadPool( int nThreads, String poolName, Thread.UncaughtExceptionHandler handler) { logger.info("new fixed thread pool: {}, thread number: {}", poolName, nThreads); @@ -89,6 +103,12 @@ public class IoTDBThreadPoolFactory { Executors.newSingleThreadExecutor(new IoTThreadFactory(poolName)), poolName); } + public static ExecutorService newSingleThreadExecutorWithDaemon(String poolName) { + logger.info("new single thread pool: {}", poolName); + return new WrappedSingleThreadExecutorService( + Executors.newSingleThreadExecutor(new IoTDBDaemonThreadFactory(poolName)), poolName); + } + public static ExecutorService newSingleThreadExecutor( String poolName, Thread.UncaughtExceptionHandler handler) { logger.info("new single thread pool: {}", poolName); @@ -172,6 +192,14 @@ public class IoTDBThreadPoolFactory { Executors.newScheduledThreadPool(corePoolSize, new IoTThreadFactory(poolName)), poolName); } + public static ScheduledExecutorService newScheduledThreadPoolWithDaemon( + int corePoolSize, String poolName) { + logger.info("new scheduled thread pool: {}", poolName); + return new WrappedScheduledExecutorService( + Executors.newScheduledThreadPool(corePoolSize, new IoTDBDaemonThreadFactory(poolName)), + poolName); + } + public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, String poolName, Thread.UncaughtExceptionHandler handler) { logger.info("new scheduled thread pool: {}", poolName); diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java index 34d892c..55e99bd 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java +++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java @@ -64,6 +64,7 @@ public enum ServiceType { CLUSTER_DATA_HEART_BEAT_RPC_SERVICE( "Cluster Data Heartbeat RPC Service", "ClusterDataHeartbeatRPCService"), CLUSTER_META_ENGINE("Cluster Meta Engine", "ClusterMetaEngine"), + CLUSTER_DATA_ENGINE("Cluster Data Engine", "ClusterDataEngine"), ; private final String name;
