This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch IOTDB-3007 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 10e7e258281d388004120232c6c6bea9dcaa07c8 Author: HTHou <[email protected]> AuthorDate: Tue Apr 26 09:52:15 2022 +0800 [IOTDB-3007] Separate DataNode and IoTDB --- .../apache/iotdb/commons/conf/IoTDBConstant.java | 2 + .../java/org/apache/iotdb/db/service/DataNode.java | 156 +++++++++++++++++++-- .../java/org/apache/iotdb/db/service/IoTDB.java | 8 +- 3 files changed, 154 insertions(+), 12 deletions(-) diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java index f6bad59128..07bb394ff1 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java @@ -25,6 +25,8 @@ public class IoTDBConstant { public static final String ENV_FILE_NAME = "iotdb-env"; public static final String IOTDB_CONF = "IOTDB_CONF"; public static final String GLOBAL_DB_NAME = "IoTDB"; + public static final String GLOBAL_DATANODE_NAME = "DataNode"; + // when running the program in IDE, we can not get the version info using // getImplementationVersion() public static final String VERSION = diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java index bedd697089..c80503ee6c 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -20,19 +20,43 @@ package org.apache.iotdb.db.service; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.concurrent.IoTDBDefaultThreadExceptionHandler; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.BadNodeUrlException; import org.apache.iotdb.commons.exception.ConfigurationException; import 
org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.commons.service.JMXService; import org.apache.iotdb.commons.service.RegisterManager; +import org.apache.iotdb.commons.service.StartupChecks; import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq; import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp; import org.apache.iotdb.db.client.ConfigNodeClient; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBConfigCheck; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor; import org.apache.iotdb.db.consensus.ConsensusImpl; +import org.apache.iotdb.db.engine.StorageEngineV2; +import org.apache.iotdb.db.engine.cache.CacheHitRatioMonitor; +import org.apache.iotdb.db.engine.compaction.CompactionTaskManager; +import org.apache.iotdb.db.engine.cq.ContinuousQueryService; +import org.apache.iotdb.db.engine.flush.FlushManager; +import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.mpp.buffer.DataBlockService; +import org.apache.iotdb.db.mpp.schedule.FragmentInstanceScheduler; +import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManager; +import org.apache.iotdb.db.protocol.rest.RestService; +import org.apache.iotdb.db.query.udf.service.TemporaryQueryDataFileService; +import org.apache.iotdb.db.query.udf.service.UDFClassLoaderManager; +import org.apache.iotdb.db.query.udf.service.UDFRegistrationService; +import org.apache.iotdb.db.service.basic.ServiceProvider; +import org.apache.iotdb.db.service.basic.StandaloneServiceProvider; +import org.apache.iotdb.db.service.metrics.MetricsService; +import org.apache.iotdb.db.service.thrift.impl.DataNodeTSIServiceImpl; +import org.apache.iotdb.db.sync.receiver.ReceiverService; +import org.apache.iotdb.db.sync.sender.service.SenderService; +import 
org.apache.iotdb.db.wal.WALManager; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.TSStatusCode; @@ -62,9 +86,8 @@ public class DataNode implements DataNodeMBean { // we do not init anything here, so that we can re-initialize the instance in IT. } - private final IoTDB iotdb = IoTDB.getInstance(); - - private final RegisterManager registerManager = new RegisterManager(); + private static final RegisterManager registerManager = new RegisterManager(); + public static ServiceProvider serviceProvider; // private IClientManager clientManager; @@ -172,7 +195,26 @@ public class DataNode implements DataNodeMBean { IoTDBDescriptor.getInstance().getConfig().setMppMode(true); IoTDBDescriptor.getInstance().getConfig().setClusterMode(true); // start iotdb server first - IoTDB.getInstance().active(); + StartupChecks checks = new StartupChecks().withDefaultTest(); + try { + checks.verify(); + } catch (StartupException e) { + // TODO: what are some checks + logger.error( + "{}: failed to start because some checks failed. 
", + IoTDBConstant.GLOBAL_DATANODE_NAME, + e); + return; + } + try { + setUp(); + } catch (StartupException | QueryProcessException e) { + logger.error("meet error while starting up.", e); + deactivate(); + logger.error("{} exit", IoTDBConstant.GLOBAL_DATANODE_NAME); + return; + } + logger.info("{} has started.", IoTDBConstant.GLOBAL_DATANODE_NAME); try { // TODO: Start consensus layer in some where else @@ -190,18 +232,116 @@ public class DataNode implements DataNodeMBean { } } + private void setUp() throws StartupException, QueryProcessException { + logger.info("Setting up DataNode..."); + + Runtime.getRuntime().addShutdownHook(new IoTDBShutdownHook()); + setUncaughtExceptionHandler(); + initServiceProvider(); + registerManager.register(MetricsService.getInstance()); + logger.info("recover the schema..."); + initConfigManager(); + registerManager.register(new JMXService()); + registerManager.register(FlushManager.getInstance()); + registerManager.register(CacheHitRatioMonitor.getInstance()); + registerManager.register(CompactionTaskManager.getInstance()); + JMXService.registerMBean(getInstance(), mbeanName); + registerManager.register(WALManager.getInstance()); + + // in mpp mode we need to start some other services + registerManager.register(StorageEngineV2.getInstance()); + registerManager.register(DataBlockService.getInstance()); + registerManager.register(InternalService.getInstance()); + registerManager.register(FragmentInstanceScheduler.getInstance()); + IoTDBDescriptor.getInstance() + .getConfig() + .setRpcImplClassName(DataNodeTSIServiceImpl.class.getName()); + + registerManager.register(TemporaryQueryDataFileService.getInstance()); + registerManager.register(UDFClassLoaderManager.getInstance()); + registerManager.register(UDFRegistrationService.getInstance()); + registerManager.register(ReceiverService.getInstance()); + registerManager.register(MetricsService.getInstance()); + + // in cluster mode, RPC service is not enabled. 
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableRpcService()) { + registerManager.register(RPCService.getInstance()); + } + + initProtocols(); + + if (IoTDBDescriptor.getInstance().getConfig().isEnableInfluxDBRpcService()) { + InfluxDBMetaManager.getInstance().recover(); + } + + logger.info( + "DataNode is setting up, some storage groups may not be ready now, please wait several seconds..."); + + while (!StorageEngineV2.getInstance().isAllSgReady()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + logger.warn("DataNode failed to set up.", e); + Thread.currentThread().interrupt(); + return; + } + } + + registerManager.register(SenderService.getInstance()); + registerManager.register(UpgradeSevice.getINSTANCE()); + // in mpp mode we temporarily don't start settle service because it uses StorageEngine directly + // in itself, but currently we need to use StorageEngineV2 instead of StorageEngine in mpp mode. + // registerManager.register(SettleService.getINSTANCE()); + registerManager.register(TriggerRegistrationService.getInstance()); + registerManager.register(ContinuousQueryService.getInstance()); + + // start reporter + MetricsService.getInstance().startAllReporter(); + + logger.info("Congratulation, DataNode is set up successfully. 
Now, enjoy yourself!"); + } + + private void initConfigManager() { + long time = System.currentTimeMillis(); + IoTDB.configManager.init(); + long end = System.currentTimeMillis() - time; + logger.info("spend {}ms to recover schema.", end); + logger.info( + "After initializing, sequence tsFile threshold is {}, unsequence tsFile threshold is {}", + IoTDBDescriptor.getInstance().getConfig().getSeqTsFileSize(), + IoTDBDescriptor.getInstance().getConfig().getUnSeqTsFileSize()); + } + public void stop() { deactivate(); } + private void initServiceProvider() throws QueryProcessException { + serviceProvider = new StandaloneServiceProvider(); + } + + public static void initProtocols() throws StartupException { + if (IoTDBDescriptor.getInstance().getConfig().isEnableInfluxDBRpcService()) { + registerManager.register(InfluxDBRPCService.getInstance()); + } + if (IoTDBDescriptor.getInstance().getConfig().isEnableMQTTService()) { + registerManager.register(MQTTService.getInstance()); + } + if (IoTDBRestServiceDescriptor.getInstance().getConfig().isEnableRestService()) { + registerManager.register(RestService.getInstance()); + } + } + private void deactivate() { - logger.info("Deactivating data node..."); + logger.info("Deactivating DataNode..."); // stopThreadPools(); registerManager.deregisterAll(); JMXService.deregisterMBean(mbeanName); - logger.info("Data node is deactivated."); - // stop the iotdb kernel - iotdb.stop(); + logger.info("DataNode is deactivated."); + } + + private void setUncaughtExceptionHandler() { + Thread.setDefaultUncaughtExceptionHandler(new IoTDBDefaultThreadExceptionHandler()); } private static class DataNodeHolder { diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java index b0324c3a1a..f9e1228893 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java +++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java @@ -173,7 +173,8 @@ public class IoTDB 
implements IoTDBMBean { initInfluxDBMManager(); } - logger.info("IoTDB is set up, now may some sgs are not ready, please wait several seconds..."); + logger.info( + "IoTDB is setting up, some storage groups may not be ready now, please wait several seconds..."); while (IoTDBDescriptor.getInstance().getConfig().isMppMode() ? !StorageEngineV2.getInstance().isAllSgReady() @@ -238,10 +239,9 @@ public class IoTDB implements IoTDBMBean { long end = System.currentTimeMillis() - time; logger.info("spend {}ms to recover schema.", end); logger.info( - "After initializing, sequence tsFile threshold is {}, unsequence tsFile threshold is {}, memtableSize is {}", + "After initializing, sequence tsFile threshold is {}, unsequence tsFile threshold is {}", IoTDBDescriptor.getInstance().getConfig().getSeqTsFileSize(), - IoTDBDescriptor.getInstance().getConfig().getUnSeqTsFileSize(), - IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold()); + IoTDBDescriptor.getInstance().getConfig().getUnSeqTsFileSize()); } @Override
