This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch manage_flush_pool
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/manage_flush_pool by this push:
new 3df5bff add cli query flush info
3df5bff is described below
commit 3df5bff8b9ab5eb5faccfa85492ff010f5ad13dd
Author: lta <[email protected]>
AuthorDate: Wed Jul 24 12:26:23 2019 +0800
add cli query flush info
---
server/iotdb/data/system/schema/system.properties | 3 +++
server/iotdb/data/system/users/root.profile | Bin 0 -> 50 bytes
.../org/apache/iotdb/db/conf/IoTDBConstant.java | 2 ++
.../apache/iotdb/db/engine/flush/FlushManager.java | 1 -
.../db/engine/flush/pool/AbstractPoolManager.java | 5 +++-
.../engine/flush/pool/FlushSubTaskPoolManager.java | 4 +--
.../java/org/apache/iotdb/db/service/IoTDB.java | 2 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 29 ++++++++++++++++++---
.../apache/iotdb/db/utils/EnvironmentUtils.java | 4 ++-
9 files changed, 41 insertions(+), 9 deletions(-)
diff --git a/server/iotdb/data/system/schema/system.properties
b/server/iotdb/data/system/schema/system.properties
new file mode 100644
index 0000000..9569a50
--- /dev/null
+++ b/server/iotdb/data/system/schema/system.properties
@@ -0,0 +1,3 @@
+#System properties:
+#Wed Jul 24 12:14:04 CST 2019
+timestamp_precision=ms
diff --git a/server/iotdb/data/system/users/root.profile
b/server/iotdb/data/system/users/root.profile
new file mode 100644
index 0000000..9d0b87a
Binary files /dev/null and b/server/iotdb/data/system/users/root.profile differ
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index a4779af..383a350 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -57,6 +57,8 @@ public class IoTDBConstant {
// for cluster, set read consistency level
public static final String SET_READ_CONSISTENCY_LEVEL_PATTERN =
"set\\s+read.*level.*";
+ public static final String SHOW_FLUSH_TASK_INFO =
"show\\s+flush\\s+task\\s+info";
+
public static final String ROLE = "Role";
public static final String USER = "User";
public static final String PRIVILEGE = "Privilege";
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
index f996c18..ff5136f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
@@ -94,5 +94,4 @@ public class FlushManager implements IService {
private static FlushManager instance = new FlushManager();
}
-
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
index cd11ec1..d1a3dcc 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/AbstractPoolManager.java
@@ -24,7 +24,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import org.apache.iotdb.db.exception.StartupException;
import org.slf4j.Logger;
public abstract class AbstractPoolManager {
@@ -70,6 +69,10 @@ public abstract class AbstractPoolManager {
return ((ThreadPoolExecutor) pool).getQueue().size();
}
+ public int getTotalTasks() { // total = getActiveCnt() + getWaitingTasksNumber()
+ return getActiveCnt() + getWaitingTasksNumber(); // NOTE(review): two separate reads, presumably not an atomic snapshot - confirm
+ }
+
public int getCorePoolSize() {
return ((ThreadPoolExecutor) pool).getCorePoolSize();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
index 448fb49..94eff1b 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/flush/pool/FlushSubTaskPoolManager.java
@@ -55,7 +55,7 @@ public class FlushSubTaskPoolManager extends
AbstractPoolManager {
this.pool = IoTDBThreadPoolFactory
.newCachedThreadPool(ThreadName.FLUSH_SUB_TASK_SERVICE.getName());
}
- LOGGER.info("Flush encoding sub task manager started.");
+ LOGGER.info("Flush sub task manager started.");
}
@Override
@@ -64,7 +64,7 @@ public class FlushSubTaskPoolManager extends
AbstractPoolManager {
close();
pool = null;
}
- LOGGER.info("Flush encoding sub task manager stopped");
+ LOGGER.info("Flush sub task manager stopped");
}
private static class InstanceHolder {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 0c70887..1c9b2a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -91,7 +91,6 @@ public class IoTDB implements IoTDBMBean {
}
initMManager();
- registerManager.register(FlushManager.getInstance());
registerManager.register(StorageEngine.getInstance());
registerManager.register(MultiFileLogNodeManager.getInstance());
registerManager.register(JMXService.getInstance());
@@ -101,6 +100,7 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(Measurement.INSTANCE);
registerManager.register(SyncServerManager.getInstance());
registerManager.register(TVListAllocator.getInstance());
+ registerManager.register(FlushManager.getInstance());
JMXService.registerMBean(getInstance(), mbeanName);
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 06e857b..28b40de 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.cost.statistic.Measurement;
import org.apache.iotdb.db.cost.statistic.Operation;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
import org.apache.iotdb.db.exception.ArgsErrorException;
import org.apache.iotdb.db.exception.MetadataErrorException;
import org.apache.iotdb.db.exception.PathErrorException;
@@ -411,7 +412,6 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
@Override
public TSExecuteBatchStatementResp
executeBatchStatement(TSExecuteBatchStatementReq req) {
long t1 = System.currentTimeMillis();
- String currStmt = null;
List<Integer> result = new ArrayList<>();
try {
if (!checkLogin()) {
@@ -425,7 +425,6 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
for (String statement : statements) {
long t2 = System.currentTimeMillis();
- currStmt = statement;
isAllSuccessful =
isAllSuccessful && executeStatementInBatch(statement,
batchErrorMessage, result);
Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_ONE_SQL_IN_BATCH,
t2);
@@ -491,8 +490,17 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
return getTSExecuteStatementResp(TS_StatusCode.SUCCESS_STATUS,
"ADMIN_COMMAND_SUCCESS");
}
+ if (execShowFlushInfo(statement)) {
+ String msg = String.format(
+ "There are %d flush tasks, %d flush tasks are in execution and %d
flush tasks are waiting for execution.",
+ FlushTaskPoolManager.getInstance().getTotalTasks(),
+ FlushTaskPoolManager.getInstance().getActiveCnt(),
+ FlushTaskPoolManager.getInstance().getWaitingTasksNumber());
+ return
getTSExecuteStatementResp(TS_StatusCode.SUCCESS_WITH_INFO_STATUS, msg);
+ }
+
if (execSetConsistencyLevel(statement)) {
- return getTSExecuteStatementResp(TS_StatusCode.SUCCESS_STATUS,
+ return
getTSExecuteStatementResp(TS_StatusCode.SUCCESS_WITH_INFO_STATUS,
"Execute set consistency level successfully");
}
@@ -516,6 +524,21 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
+ /**
+ * Check whether the statement is the "show flush task info" command.
+ *
+ * @return true if the lower-cased, trimmed statement matches the pattern
+ */
+ private boolean execShowFlushInfo(String statement) {
+ if (statement == null) {
+ return false;
+ }
+ // Locale.ROOT keeps the match independent of the JVM default locale
+ // (a Turkish locale lower-cases "INFO" with a dotless i and never matches).
+ String normalized = statement.toLowerCase(java.util.Locale.ROOT).trim();
+ return Pattern.matches(IoTDBConstant.SHOW_FLUSH_TASK_INFO, normalized);
+ }
+
/**
* Set consistency level
*/
private boolean execSetConsistencyLevel(String statement) throws
SQLException {
if (statement == null) {
return false;
diff --git
a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 78a846b..3a6a255 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache;
import org.apache.iotdb.db.engine.cache.TsFileMetaDataCache;
+import org.apache.iotdb.db.engine.flush.FlushManager;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.metadata.MManager;
@@ -160,11 +161,12 @@ public class EnvironmentUtils {
}
StorageEngine.getInstance().reset();
MultiFileLogNodeManager.getInstance().start();
+ FlushManager.getInstance().start();
TEST_QUERY_JOB_ID = QueryResourceManager.getInstance().assignJobId();
TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
}
- private static void createAllDir() throws IOException {
+ private static void createAllDir() {
// create sequential files
for (String path : directoryManager.getAllSequenceFileFolders()) {
createDir(path);