This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch manage_flush_pool
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit fd7f5b692adfb359342399397d4b84e294511eb0
Author: lta <[email protected]>
AuthorDate: Wed Jul 24 12:26:23 2019 +0800

    add cli query flush info
---
 .../org/apache/iotdb/db/conf/IoTDBConstant.java | 2 ++
 .../apache/iotdb/db/engine/flush/FlushManager.java | 1 -
 .../db/engine/flush/pool/AbstractPoolManager.java | 5 +++-
 .../engine/flush/pool/FlushSubTaskPoolManager.java | 4 +--
 .../java/org/apache/iotdb/db/service/IoTDB.java | 2 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 29 +++++++++++++++++++---
 .../apache/iotdb/db/utils/EnvironmentUtils.java | 4 ++-
 7 files changed, 38 insertions(+), 9 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index a4779af..383a350 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -57,6 +57,8 @@ public class IoTDBConstant {
   // for cluster, set read consistency level
   public static final String SET_READ_CONSISTENCY_LEVEL_PATTERN = "set\\s+read.*level.*";
 
+  public static final String SHOW_FLUSH_TASK_INFO = "show\\s+flush\\s+task\\s+info";
+
   public static final String ROLE = "Role";
   public static final String USER = "User";
   public static final String PRIVILEGE = "Privilege";
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
index f996c18..ff5136f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
@@ -94,5 +94,4 @@ public class FlushManager implements IService {
 
     private static FlushManager instance = new FlushManager();
   }
-
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
index cd11ec1..d1a3dcc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
@@ -24,7 +24,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import org.apache.iotdb.db.exception.StartupException;
 import org.slf4j.Logger;
 
 public abstract class AbstractPoolManager {
@@ -70,6 +69,10 @@ public abstract class AbstractPoolManager {
     return ((ThreadPoolExecutor) pool).getQueue().size();
   }
 
+  public int getTotalTasks() {
+    return getActiveCnt() + getWaitingTasksNumber();
+  }
+
   public int getCorePoolSize() {
     return ((ThreadPoolExecutor) pool).getCorePoolSize();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
index 448fb49..94eff1b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
@@ -55,7 +55,7 @@ public class FlushSubTaskPoolManager extends AbstractPoolManager {
       this.pool = IoTDBThreadPoolFactory
           .newCachedThreadPool(ThreadName.FLUSH_SUB_TASK_SERVICE.getName());
     }
-    LOGGER.info("Flush encoding sub task manager started.");
+    LOGGER.info("Flush sub task manager started.");
   }
 
   @Override
@@ -64,7 +64,7 @@ public class FlushSubTaskPoolManager extends AbstractPoolManager {
       close();
       pool = null;
     }
-    LOGGER.info("Flush encoding sub task manager stopped");
+    LOGGER.info("Flush sub task manager stopped");
   }
 
   private static class InstanceHolder {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 0c70887..1c9b2a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -91,7 +91,6 @@ public class IoTDB implements IoTDBMBean {
     }
 
     initMManager();
-    registerManager.register(FlushManager.getInstance());
     registerManager.register(StorageEngine.getInstance());
     registerManager.register(MultiFileLogNodeManager.getInstance());
     registerManager.register(JMXService.getInstance());
@@ -101,6 +100,7 @@ public class IoTDB implements IoTDBMBean {
     registerManager.register(Measurement.INSTANCE);
     registerManager.register(SyncServerManager.getInstance());
     registerManager.register(TVListAllocator.getInstance());
+    registerManager.register(FlushManager.getInstance());
 
     JMXService.registerMBean(getInstance(), mbeanName);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 06e857b..28b40de 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.cost.statistic.Measurement;
 import org.apache.iotdb.db.cost.statistic.Operation;
 import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
 import org.apache.iotdb.db.exception.ArgsErrorException;
 import org.apache.iotdb.db.exception.MetadataErrorException;
 import org.apache.iotdb.db.exception.PathErrorException;
@@ -411,7 +412,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   @Override
   public TSExecuteBatchStatementResp executeBatchStatement(TSExecuteBatchStatementReq req) {
     long t1 = System.currentTimeMillis();
-    String currStmt = null;
     List<Integer> result = new ArrayList<>();
     try {
       if (!checkLogin()) {
@@ -425,7 +425,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
 
       for (String statement : statements) {
         long t2 = System.currentTimeMillis();
-        currStmt = statement;
         isAllSuccessful =
             isAllSuccessful && executeStatementInBatch(statement, batchErrorMessage, result);
         Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_ONE_SQL_IN_BATCH, t2);
@@ -491,8 +490,17 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       return getTSExecuteStatementResp(TS_StatusCode.SUCCESS_STATUS, "ADMIN_COMMAND_SUCCESS");
     }
 
+    if (execShowFlushInfo(statement)) {
+      String msg = String.format(
+          "There are %d flush tasks, %d flush tasks are in execution and %d flush tasks are waiting for execution.",
+          FlushTaskPoolManager.getInstance().getTotalTasks(),
+          FlushTaskPoolManager.getInstance().getActiveCnt(),
+          FlushTaskPoolManager.getInstance().getWaitingTasksNumber());
+      return getTSExecuteStatementResp(TS_StatusCode.SUCCESS_WITH_INFO_STATUS, msg);
+    }
+
     if (execSetConsistencyLevel(statement)) {
-      return getTSExecuteStatementResp(TS_StatusCode.SUCCESS_STATUS,
+      return getTSExecuteStatementResp(TS_StatusCode.SUCCESS_WITH_INFO_STATUS,
           "Execute set consistency level successfully");
     }
 
@@ -516,6 +524,21 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
   /**
    * Set consistency level
    */
+  private boolean execShowFlushInfo(String statement) {
+    if (statement == null) {
+      return false;
+    }
+    statement = statement.toLowerCase().trim();
+    if (Pattern.matches(IoTDBConstant.SHOW_FLUSH_TASK_INFO, statement)) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Set consistency level
+   */
   private boolean execSetConsistencyLevel(String statement) throws SQLException {
     if (statement == null) {
       return false;
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 78a846b..3a6a255 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache;
 import org.apache.iotdb.db.engine.cache.TsFileMetaDataCache;
+import org.apache.iotdb.db.engine.flush.FlushManager;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.metadata.MManager;
@@ -160,11 +161,12 @@ public class EnvironmentUtils {
     }
     StorageEngine.getInstance().reset();
     MultiFileLogNodeManager.getInstance().start();
+    FlushManager.getInstance().start();
     TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId();
     TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
   }
 
-  private static void createAllDir() throws IOException {
+  private static void createAllDir() {
     // create sequential files
     for (String path : directoryManager.getAllSequenceFileFolders()) {
       createDir(path);
