This is an automated email from the ASF dual-hosted git repository. qiaojialin pushed a commit to branch fix_upgrade_master in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 2ba311deace6f8b43cfd2f1b6c102e3d0bcc3d52 Author: qiaojialin <[email protected]> AuthorDate: Thu Apr 9 10:10:34 2020 +0800 start API service at last --- .../main/java/org/apache/iotdb/SessionExample.java | 2 +- .../org/apache/iotdb/db/concurrent/ThreadName.java | 4 +-- .../java/org/apache/iotdb/db/service/IoTDB.java | 29 +++++++++------ .../service/{JDBCService.java => RPCService.java} | 42 +++++++++++----------- ...{JDBCServiceMBean.java => RPCServiceMBean.java} | 4 +-- .../org/apache/iotdb/db/service/ServiceType.java | 2 +- 6 files changed, 45 insertions(+), 38 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index 70b5411..9da6413 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -44,7 +44,7 @@ public class SessionExample { session = new Session("127.0.0.1", 6667, "root", "root"); session.open(false); - session.setStorageGroup("root.sg1"); +// session.setStorageGroup("root.sg1"); if (session.checkTimeseriesExists("root.sg1.d1.s1")) { session.createTimeseries("root.sg1.d1.s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java index 926b554..1b56e02 100644 --- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java +++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java @@ -20,8 +20,8 @@ package org.apache.iotdb.db.concurrent; public enum ThreadName { METRICS_SERVICE("Metrics-ServerServiceImpl"), - JDBC_SERVICE("JDBC-ServerServiceImpl"), - JDBC_CLIENT("JDBC-Client"), + RPC_SERVICE("RPC-ServerServiceImpl"), + RPC_CLIENT("RPC-Client"), MERGE_SERVICE("Merge-ServerServiceImpl"), CLOSE_MERGE_SERVICE("Close-Merge-ServerServiceImpl"), CLOSE_MERGE_DAEMON("Close-Merge-Daemon-Thread"), diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java index a0cbd48..7275d21 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java +++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java @@ -66,7 +66,9 @@ public class IoTDB implements IoTDBMBean { return; } try { - setUp(); + setUpServerService(); + setUpAPIService(); + logger.info("IoTDB is set up."); } catch (StartupException e) { logger.error("meet error while starting up.", e); deactivate(); @@ -76,17 +78,18 @@ public class IoTDB implements IoTDBMBean { logger.info("{} has started.", IoTDBConstant.GLOBAL_DB_NAME); } - private void setUp() throws StartupException { + + private void setUpServerService() throws StartupException { logger.info("Setting up IoTDB..."); Runtime.getRuntime().addShutdownHook(new IoTDBShutdownHook()); setUncaughtExceptionHandler(); initMManager(); + registerManager.register(StorageEngine.getInstance()); registerManager.register(JMXService.getInstance()); registerManager.register(FlushManager.getInstance()); registerManager.register(MultiFileLogNodeManager.getInstance()); - registerManager.register(JDBCService.getInstance()); registerManager.register(Monitor.getInstance()); registerManager.register(StatMonitor.getInstance()); registerManager.register(Measurement.INSTANCE); @@ -96,14 +99,8 @@ public class IoTDB implements IoTDBMBean { registerManager.register(UpgradeSevice.getINSTANCE()); registerManager.register(MergeManager.getINSTANCE()); registerManager.register(CacheHitRatioMonitor.getInstance()); - if (IoTDBDescriptor.getInstance().getConfig().isEnableMetricService()) { - registerManager.register(MetricsService.getInstance()); - } - if (IoTDBDescriptor.getInstance().getConfig().isEnableMQTTService()) { - registerManager.register(MQTTService.getInstance()); - } JMXService.registerMBean(getInstance(), mbeanName); - registerManager.register(StorageEngine.getInstance()); + // When registering statMonitor, we should start recovering some statistics // with latest values stored @@ -111,10 +108,20 @@ public class IoTDB implements IoTDBMBean { if (IoTDBDescriptor.getInstance().getConfig().isEnableStatMonitor()) { StatMonitor.getInstance().recovery(); } + } - logger.info("IoTDB is set up."); + private void setUpAPIService() throws StartupException { + // start api services at last + if (IoTDBDescriptor.getInstance().getConfig().isEnableMetricService()) { + registerManager.register(MetricsService.getInstance()); + } + if (IoTDBDescriptor.getInstance().getConfig().isEnableMQTTService()) { + registerManager.register(MQTTService.getInstance()); + } + registerManager.register(RPCService.getInstance()); } + private void deactivate() { logger.info("Deactivating IoTDB..."); registerManager.deregisterAll(); diff --git a/server/src/main/java/org/apache/iotdb/db/service/JDBCService.java b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java similarity index 88% rename from server/src/main/java/org/apache/iotdb/db/service/JDBCService.java rename to server/src/main/java/org/apache/iotdb/db/service/RPCService.java index e4ae5ef..36a6737 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/JDBCService.java +++ b/server/src/main/java/org/apache/iotdb/db/service/RPCService.java @@ -43,15 +43,15 @@ import org.slf4j.LoggerFactory; /** * A service to handle jdbc request from client. */ -public class JDBCService implements JDBCServiceMBean, IService { +public class RPCService implements RPCServiceMBean, IService { - private static final Logger logger = LoggerFactory.getLogger(JDBCService.class); + private static final Logger logger = LoggerFactory.getLogger(RPCService.class); private static final String STATUS_UP = "UP"; private static final String STATUS_DOWN = "DOWN"; private final String mbeanName = String .format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE, getID().getJmxName()); - private Thread jdbcServiceThread; + private Thread rpcServiceThread; private TProtocolFactory protocolFactory; private Processor<TSIService.Iface> processor; private TThreadPoolServer.Args poolArgs; @@ -59,15 +59,15 @@ public class JDBCService implements JDBCServiceMBean, IService { private CountDownLatch startLatch; private CountDownLatch stopLatch; - private JDBCService() { + private RPCService() { } - public static final JDBCService getInstance() { - return JDBCServiceHolder.INSTANCE; + public static final RPCService getInstance() { + return RPCServiceHolder.INSTANCE; } @Override - public String getJDBCServiceStatus() { + public String getRPCServiceStatus() { // TODO debug log, will be deleted in production env if(startLatch == null) { logger.info("Start latch is null when getting status"); @@ -113,12 +113,12 @@ public class JDBCService implements JDBCServiceMBean, IService { @Override public ServiceType getID() { - return ServiceType.JDBC_SERVICE; + return ServiceType.RPC_SERVICE; } @Override public synchronized void startService() throws StartupException { - if (STATUS_UP.equals(getJDBCServiceStatus())) { + if (STATUS_UP.equals(getRPCServiceStatus())) { logger.info("{}: {} has been already running now", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName()); return; @@ -126,9 +126,9 @@ public class JDBCService implements JDBCServiceMBean, IService { logger.info("{}: start {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName()); try { reset(); - jdbcServiceThread = new JDBCServiceThread(startLatch, stopLatch); - jdbcServiceThread.setName(ThreadName.JDBC_SERVICE.getName()); - jdbcServiceThread.start(); + rpcServiceThread = new RPCServiceThread(startLatch, stopLatch); + rpcServiceThread.setName(ThreadName.RPC_SERVICE.getName()); + rpcServiceThread.start(); startLatch.await(); } catch (InterruptedException | ClassNotFoundException | IllegalAccessException | InstantiationException e) { @@ -154,13 +154,13 @@ public class JDBCService implements JDBCServiceMBean, IService { @Override public synchronized void stopService() { - if (STATUS_DOWN.equals(getJDBCServiceStatus())) { + if (STATUS_DOWN.equals(getRPCServiceStatus())) { logger.info("{}: {} isn't running now", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName()); return; } logger.info("{}: closing {}...", IoTDBConstant.GLOBAL_DB_NAME, this.getID().getName()); - if (jdbcServiceThread != null) { - ((JDBCServiceThread) jdbcServiceThread).close(); + if (rpcServiceThread != null) { + ((RPCServiceThread) rpcServiceThread).close(); } try { stopLatch.await(); @@ -172,22 +172,22 @@ public class JDBCService implements JDBCServiceMBean, IService { } } - private static class JDBCServiceHolder { + private static class RPCServiceHolder { - private static final JDBCService INSTANCE = new JDBCService(); + private static final RPCService INSTANCE = new RPCService(); - private JDBCServiceHolder() { + private RPCServiceHolder() { } } - private class JDBCServiceThread extends Thread { + private class RPCServiceThread extends Thread { private TServerSocket serverTransport; private TServer poolServer; private CountDownLatch threadStartLatch; private CountDownLatch threadStopLatch; - public JDBCServiceThread(CountDownLatch threadStartLatch, CountDownLatch threadStopLatch) + public RPCServiceThread(CountDownLatch threadStartLatch, CountDownLatch threadStopLatch) throws ClassNotFoundException, IllegalAccessException, InstantiationException { if(IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) { protocolFactory = new TCompactProtocol.Factory(); @@ -214,7 +214,7 @@ public class JDBCService implements JDBCServiceMBean, IService { .stopTimeoutVal( IoTDBDescriptor.getInstance().getConfig().getThriftServerAwaitTimeForStopService()); poolArgs.executorService = IoTDBThreadPoolFactory.createThriftRpcClientThreadPool(poolArgs, - ThreadName.JDBC_CLIENT.getName()); + ThreadName.RPC_CLIENT.getName()); poolArgs.processor(processor); poolArgs.protocolFactory(protocolFactory); poolServer = new TThreadPoolServer(poolArgs); diff --git a/server/src/main/java/org/apache/iotdb/db/service/JDBCServiceMBean.java b/server/src/main/java/org/apache/iotdb/db/service/RPCServiceMBean.java similarity index 93% rename from server/src/main/java/org/apache/iotdb/db/service/JDBCServiceMBean.java rename to server/src/main/java/org/apache/iotdb/db/service/RPCServiceMBean.java index 4a5fee0..ef8d023 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/JDBCServiceMBean.java +++ b/server/src/main/java/org/apache/iotdb/db/service/RPCServiceMBean.java @@ -20,9 +20,9 @@ package org.apache.iotdb.db.service; import org.apache.iotdb.db.exception.StartupException; -public interface JDBCServiceMBean { +public interface RPCServiceMBean { - String getJDBCServiceStatus(); + String getRPCServiceStatus(); int getRPCPort(); diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java index 256cccb..e1498bc 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java +++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java @@ -25,7 +25,7 @@ public enum ServiceType { STORAGE_ENGINE_SERVICE("Storage Engine ServerService", ""), JMX_SERVICE("JMX ServerService", "JMX ServerService"), METRICS_SERVICE("Metrics ServerService","MetricsService"), - JDBC_SERVICE("JDBC ServerService", "JDBCService"), + RPC_SERVICE("RPC ServerService", "RPCService"), MQTT_SERVICE("MQTTService", ""), MONITOR_SERVICE("Monitor ServerService", "Monitor"), STAT_MONITOR_SERVICE("Statistics ServerService", ""),
