This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5c492b1 [IOTDB-640] Move flush and merge to anltr (Enable flush and
merge in Session) (#1143)
5c492b1 is described below
commit 5c492b14ad5c65bd4067d6d11d927db8221a088c
Author: Boris <[email protected]>
AuthorDate: Wed May 6 19:43:20 2020 +0800
[IOTDB-640] Move flush and merge to anltr (Enable flush and merge in
Session) (#1143)
* Move flush and merge to anltr (Enable flush and merge in Session)
---
.../1-DDL Data Definition Language.md | 31 ++++++++++
.../1-DDL Data Definition Language.md | 24 ++++++++
.../org/apache/iotdb/db/qp/strategy/SqlBase.g4 | 7 ++-
.../org/apache/iotdb/db/mqtt/PublishHandler.java | 7 ++-
.../main/java/org/apache/iotdb/db/qp/Planner.java | 2 +
.../apache/iotdb/db/qp/constant/SQLConstant.java | 4 ++
.../apache/iotdb/db/qp/executor/IPlanExecutor.java | 4 +-
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 47 ++++++++++++++-
.../org/apache/iotdb/db/qp/logical/Operator.java | 2 +-
.../apache/iotdb/db/qp/logical/RootOperator.java | 1 -
.../{RootOperator.java => sys/FlushOperator.java} | 37 +++++++++---
.../{RootOperator.java => sys/MergeOperator.java} | 16 +++---
.../sys/FlushPlan.java} | 30 +++++++---
.../sys/MergePlan.java} | 26 ++++++---
.../iotdb/db/qp/strategy/LogicalGenerator.java | 36 ++++++++++++
.../iotdb/db/qp/strategy/PhysicalGenerator.java | 9 +++
.../org/apache/iotdb/db/service/TSServiceImpl.java | 66 +---------------------
.../org/apache/iotdb/session/IoTDBSessionIT.java | 11 ++--
18 files changed, 247 insertions(+), 113 deletions(-)
diff --git a/docs/UserGuide/5-Operation Manual/1-DDL Data Definition
Language.md b/docs/UserGuide/5-Operation Manual/1-DDL Data Definition
Language.md
index d1f9267..a281094 100644
--- a/docs/UserGuide/5-Operation Manual/1-DDL Data Definition Language.md
+++ b/docs/UserGuide/5-Operation Manual/1-DDL Data Definition Language.md
@@ -200,6 +200,7 @@ Example:
## Count Timeseries
IoTDB is able to use `COUNT TIMESERIES <Path>` to count the number of
timeseries in the path. SQL statements are as follows:
+
```
IoTDB > COUNT TIMESERIES root
IoTDB > COUNT TIMESERIES root.ln
@@ -232,6 +233,7 @@ You will get following results:
## Count Nodes
IoTDB is able to use `COUNT NODES <Path> LEVEL=<INTEGER>` to count the number
of nodes at the given level in current Metadata Tree. This could be used to
query the number of devices. The usage are as follows:
+
```
IoTDB > COUNT NODES root LEVEL=2
IoTDB > COUNT NODES root.ln LEVEL=2
@@ -249,6 +251,7 @@ As for the above mentioned example and Metadata tree, you
can get following resu
To delete the timeseries we created before, we are able to use `DELETE
TimeSeries <PrefixPath>` statement.
The usage are as follows:
+
```
IoTDB> delete timeseries root.ln.wf01.wt01.status
IoTDB> delete timeseries root.ln.wf01.wt01.temperature,
root.ln.wf02.wt02.hardware
@@ -261,7 +264,9 @@ Similar to `Show Timeseries`, IoTDB also supports two ways
of viewing devices:
* `SHOW DEVICES` statement presents all devices information, which is equal to
`SHOW DEVICES root`.
* `SHOW DEVICES <PrefixPath>` statement specifies the `PrefixPath` and returns
the devices information under the given level.
+
SQL statement is as follows:
+
```
IoTDB> show devices
IoTDB> show devices root.ln
@@ -274,17 +279,43 @@ IoTDB supports storage-level TTL settings, which means it
is able to delete old
## Set TTL
The SQL Statement for setting TTL is as follow:
+
```
IoTDB> set ttl to root.ln 3600000
```
+
This example means that for data in `root.ln`, only that of the latest 1 hour
will remain, the older one is removed or made invisible.
## Unset TTL
To unset TTL, we can use follwing SQL statement:
+
```
IoTDB> unset ttl to root.ln
```
+
After unset TTL, all data will be accepted in `root.ln`
+## FLUSH
+
+Persist all the data points in the memory table of the storage group to the
disk, and seal the data file.
+
+```
+IoTDB> FLUSH
+IoTDB> FLUSH root.ln
+IoTDB> FLUSH root.sg1,root.sg2
+```
+
+## MERGE
+
+Merge sequence and unsequence data. Currently IoTDB supports the following two
types of SQL to manually trigger the merge process of data files:
+
+* `MERGE` Only rewrite overlapped Chunks, the merge speed is quick, while
there will be redundant data on the disk eventually.
+* `FULL MERGE` Rewrite all data in overlapped files, the merge speed is slow,
but there will be no redundant data on the disk eventually.
+
+```
+IoTDB> MERGE
+IoTDB> FULL MERGE
+```
+
diff --git a/docs/zh/UserGuide/5-Operation Manual/1-DDL Data Definition
Language.md b/docs/zh/UserGuide/5-Operation Manual/1-DDL Data Definition
Language.md
index 7e6c3bc..eedc259 100644
--- a/docs/zh/UserGuide/5-Operation Manual/1-DDL Data Definition Language.md
+++ b/docs/zh/UserGuide/5-Operation Manual/1-DDL Data Definition Language.md
@@ -281,9 +281,33 @@ IoTDB> set ttl to root.ln 3600000
## 取消 TTL
取消TTL的SQL语句如下所示:
+
```
IoTDB> unset ttl to root.ln
```
+
取消设置TTL后,存储组`root.ln`中所有的数据都会被保存。
+## FLUSH
+
+将指定存储组的内存缓存区Memory Table的数据持久化到磁盘上,并将数据文件封口。
+
+```
+IoTDB> FLUSH
+IoTDB> FLUSH root.ln
+IoTDB> FLUSH root.sg1,root.sg2
+```
+
+## MERGE
+
+合并顺序和乱序数据。当前IoTDB支持使用如下两种SQL手动触发数据文件的合并:
+
+* `MERGE` 仅重写重复的Chunk,整理速度快,但是最终磁盘会存在多余数据。
+* `FULL MERGE` 将需要合并的顺序和乱序文件的所有数据都重新写一份,整理速度慢,最终磁盘将不存在无用的数据。
+
+```
+IoTDB> MERGE
+IoTDB> FULL MERGE
+```
+
diff --git a/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4
b/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4
index c181d38..e75f4d1 100644
--- a/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4
+++ b/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4
@@ -36,7 +36,9 @@ statement
| DESCRIBE prefixPath #describePath // not support yet
| CREATE INDEX ON fullPath USING function=ID indexWithClause? whereClause?
#createIndex //not support yet
| DROP INDEX function=ID ON fullPath #dropIndex //not support yet
- | MERGE #merge //not support yet
+ | MERGE #merge
+ | FLUSH prefixPath? (COMMA prefixPath)* (ID)?#flush //ID is true or false
+ | FULL MERGE #fullMerge
| CREATE USER userName=ID password=STRING_LITERAL #createUser
| ALTER USER userName=(ROOT|ID) SET PASSWORD password=STRING_LITERAL
#alterUser
| DROP USER userName=ID #dropUser
@@ -817,6 +819,9 @@ RENAME
: R E N A M E
;
+FULL
+ : F U L L
+ ;
//============================
// End of the keywords list
//============================
diff --git a/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
b/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
index 57cb3aa..fcbaef2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
@@ -23,6 +23,8 @@ import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.executor.IPlanExecutor;
import org.apache.iotdb.db.qp.executor.PlanExecutor;
@@ -92,7 +94,7 @@ public class PublishHandler extends AbstractInterceptHandler {
boolean status;
try {
status = executeNonQuery(plan);
- } catch (QueryProcessException e) {
+ } catch (QueryProcessException | StorageGroupNotSetException |
StorageEngineException e ) {
throw new RuntimeException(e);
}
@@ -100,7 +102,8 @@ public class PublishHandler extends
AbstractInterceptHandler {
}
}
- private boolean executeNonQuery(PhysicalPlan plan) throws
QueryProcessException {
+ private boolean executeNonQuery(PhysicalPlan plan)
+ throws QueryProcessException, StorageGroupNotSetException,
StorageEngineException {
if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
throw new QueryProcessException(
"Current system mode is read-only, does not support
non-query operation");
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
index 15a3a14..fce62a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
@@ -95,6 +95,8 @@ public class Planner {
case LOAD_FILES:
case REMOVE_FILE:
case MOVE_FILE:
+ case FLUSH:
+ case MERGE:
return operator;
case QUERY:
case UPDATE:
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
index a3771a8..e7258a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
@@ -141,6 +141,10 @@ public class SQLConstant {
public static final int TOK_METADATA_ALTER = 80;
+ public static final int TOK_FLUSH = 81;
+ public static final int TOK_MERGE = 82;
+ public static final int TOK_FULL_MERGE = 83;
+
public static final Map<Integer, String> tokenSymbol = new HashMap<>();
public static final Map<Integer, String> tokenNames = new HashMap<>();
public static final Map<Integer, Integer> reverseWords = new HashMap<>();
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
index addf757..7e70ed2 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.sql.SQLException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
@@ -51,7 +52,8 @@ public interface IPlanExecutor {
*
* @param plan Physical Non-Query Plan
*/
- boolean processNonQuery(PhysicalPlan plan) throws QueryProcessException;
+ boolean processNonQuery(PhysicalPlan plan)
+ throws QueryProcessException, StorageGroupNotSetException,
StorageEngineException;
/**
* execute update command and return whether the operator is successful.
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 92f363e..92b71e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -69,12 +69,14 @@ import
org.apache.iotdb.db.exception.metadata.DeleteFailedException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.mnode.InternalMNode;
import org.apache.iotdb.db.metadata.mnode.LeafMNode;
import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
import org.apache.iotdb.db.qp.logical.sys.AuthorOperator.AuthorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -97,6 +99,8 @@ import
org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
+import org.apache.iotdb.db.qp.physical.sys.MergePlan;
import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
@@ -173,7 +177,8 @@ public class PlanExecutor implements IPlanExecutor {
}
@Override
- public boolean processNonQuery(PhysicalPlan plan) throws
QueryProcessException {
+ public boolean processNonQuery(PhysicalPlan plan)
+ throws QueryProcessException, StorageGroupNotSetException,
StorageEngineException {
switch (plan.getOperatorType()) {
case DELETE:
delete((DeletePlan) plan);
@@ -229,12 +234,47 @@ public class PlanExecutor implements IPlanExecutor {
case MOVE_FILE:
operateMoveFile((OperateFilePlan) plan);
return true;
+ case FLUSH:
+ operateFlush((FlushPlan) plan);
+ return true;
+ case MERGE:
+ operateMerge((MergePlan) plan);
+ return true;
+ case FULL_MERGE:
+ operateMerge((MergePlan) plan);
+ return true;
default:
throw new UnsupportedOperationException(
String.format("operation %s is not supported",
plan.getOperatorType()));
}
}
+ private void operateMerge(MergePlan plan) throws StorageEngineException {
+ if(plan.getOperatorType() == OperatorType.FULL_MERGE) {
+ StorageEngine.getInstance().mergeAll(true);
+ } else {
+ StorageEngine.getInstance()
+
.mergeAll(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+ }
+ }
+
+ private void operateFlush(FlushPlan plan) throws StorageGroupNotSetException
{
+ if(plan.getPaths() == null) {
+ StorageEngine.getInstance().syncCloseAllProcessor();
+ } else {
+ if(plan.isSeq() == null) {
+ for (Path storageGroup : plan.getPaths()) {
+
StorageEngine.getInstance().asyncCloseProcessor(storageGroup.toString(), true);
+
StorageEngine.getInstance().asyncCloseProcessor(storageGroup.toString(), false);
+ }
+ } else {
+ for (Path storageGroup : plan.getPaths()) {
+
StorageEngine.getInstance().asyncCloseProcessor(storageGroup.toString(),
plan.isSeq());
+ }
+ }
+ }
+ }
+
protected QueryDataSet processDataQuery(QueryPlan queryPlan, QueryContext
context)
throws StorageEngineException, QueryFilterOptimizationException,
QueryProcessException,
IOException {
@@ -1035,7 +1075,7 @@ public class PlanExecutor implements IPlanExecutor {
if (!failedNames.isEmpty()) {
throw new DeleteFailedException(String.join(",", failedNames));
}
- } catch (MetadataException e) {
+ } catch (MetadataException | StorageEngineException e) {
throw new QueryProcessException(e);
}
return true;
@@ -1113,7 +1153,8 @@ public class PlanExecutor implements IPlanExecutor {
*
* @param pathList deleted paths
*/
- private void deleteDataOfTimeSeries(List<Path> pathList) throws
QueryProcessException {
+ private void deleteDataOfTimeSeries(List<Path> pathList)
+ throws QueryProcessException, StorageGroupNotSetException,
StorageEngineException {
for (Path p : pathList) {
DeletePlan deletePlan = new DeletePlan();
deletePlan.addPath(p);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index 37c1d10..50ccba6 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -75,6 +75,6 @@ public abstract class Operator {
LIST_USER_PRIVILEGE, LIST_ROLE_PRIVILEGE, LIST_USER_ROLES, LIST_ROLE_USERS,
GRANT_WATERMARK_EMBEDDING, REVOKE_WATERMARK_EMBEDDING,
TTL, DELETE_STORAGE_GROUP, LOAD_CONFIGURATION, SHOW, LOAD_FILES,
REMOVE_FILE, MOVE_FILE, LAST, GROUP_BY_FILL,
- ALTER_TIMESERIES
+ ALTER_TIMESERIES, FLUSH, MERGE, FULL_MERGE
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java
b/server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java
index 98da176..adfa8c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java
@@ -27,5 +27,4 @@ public abstract class RootOperator extends Operator {
public RootOperator(int tokenIntType) {
super(tokenIntType);
}
-
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java
b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/FlushOperator.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java
copy to
server/src/main/java/org/apache/iotdb/db/qp/logical/sys/FlushOperator.java
index 98da176..ef9c370 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/FlushOperator.java
@@ -16,16 +16,37 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.qp.logical;
+package org.apache.iotdb.db.qp.logical.sys;
-/**
- * RootOperator indicates the operator that could be executed as a entire
command. RootOperator
- * consists of SFWOperator, like INSERT/UPDATE/DELETE, and other Operators.
- */
-public abstract class RootOperator extends Operator {
+import java.util.List;
+import org.apache.iotdb.db.qp.logical.RootOperator;
+import org.apache.iotdb.tsfile.read.common.Path;
- public RootOperator(int tokenIntType) {
- super(tokenIntType);
+public class FlushOperator extends RootOperator {
+
+ public List<Path> getStorageGroupList() {
+ return storageGroupList;
}
+ public void setStorageGroupList(
+ List<Path> storageGroupList) {
+ this.storageGroupList = storageGroupList;
+ }
+
+ private List<Path> storageGroupList;
+
+ public Boolean isSeq() {
+ return isSeq;
+ }
+
+ public void setSeq(boolean seq) {
+ isSeq = seq;
+ }
+
+ private Boolean isSeq;
+
+ public FlushOperator(int tokenIntType) {
+ super(tokenIntType);
+ operatorType = OperatorType.FLUSH;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java
b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/MergeOperator.java
similarity index 72%
copy from server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java
copy to
server/src/main/java/org/apache/iotdb/db/qp/logical/sys/MergeOperator.java
index 98da176..16b9a5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/MergeOperator.java
@@ -1,3 +1,7 @@
+package org.apache.iotdb.db.qp.logical.sys;
+
+import org.apache.iotdb.db.qp.logical.RootOperator;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -16,16 +20,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.qp.logical;
+public class MergeOperator extends RootOperator {
-/**
- * RootOperator indicates the operator that could be executed as a entire
command. RootOperator
- * consists of SFWOperator, like INSERT/UPDATE/DELETE, and other Operators.
- */
-public abstract class RootOperator extends Operator {
-
- public RootOperator(int tokenIntType) {
+ public MergeOperator(int tokenIntType) {
super(tokenIntType);
+ operatorType = OperatorType.MERGE;
}
-
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/FlushPlan.java
similarity index 56%
copy from server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java
copy to server/src/main/java/org/apache/iotdb/db/qp/physical/sys/FlushPlan.java
index 98da176..d21c26e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/FlushPlan.java
@@ -16,16 +16,30 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.qp.logical;
+package org.apache.iotdb.db.qp.physical.sys;
-/**
- * RootOperator indicates the operator that could be executed as a entire
command. RootOperator
- * consists of SFWOperator, like INSERT/UPDATE/DELETE, and other Operators.
- */
-public abstract class RootOperator extends Operator {
+import java.util.List;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.tsfile.read.common.Path;
+
+public class FlushPlan extends PhysicalPlan {
+ private List<Path> storeGroups;
+
+ public Boolean isSeq() {
+ return isSeq;
+ }
+
+ private Boolean isSeq;
- public RootOperator(int tokenIntType) {
- super(tokenIntType);
+ public FlushPlan(Boolean isSeq, List<Path> storeGroups) {
+ super(false, OperatorType.FLUSH);
+ this.storeGroups = storeGroups;
+ this.isSeq = isSeq;
}
+ @Override
+ public List<Path> getPaths() {
+ return storeGroups;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MergePlan.java
similarity index 60%
copy from server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java
copy to server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MergePlan.java
index 98da176..d443002 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MergePlan.java
@@ -16,16 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.qp.logical;
+package org.apache.iotdb.db.qp.physical.sys;
-/**
- * RootOperator indicates the operator that could be executed as a entire
command. RootOperator
- * consists of SFWOperator, like INSERT/UPDATE/DELETE, and other Operators.
- */
-public abstract class RootOperator extends Operator {
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.tsfile.read.common.Path;
+
+public class MergePlan extends PhysicalPlan {
- public RootOperator(int tokenIntType) {
- super(tokenIntType);
+ public MergePlan(OperatorType operatorType) {
+ super(false, operatorType);
}
+ public MergePlan() {
+ super(false, OperatorType.MERGE);
+ }
+
+ @Override
+ public List<Path> getPaths() {
+ return new ArrayList<>();
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
index 561ffe0..8647ab2 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.qp.strategy;
+import org.antlr.v4.runtime.misc.ParseCancellationException;
import org.antlr.v4.runtime.tree.TerminalNode;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.runtime.SQLParserException;
@@ -81,6 +82,41 @@ public class LogicalGenerator extends SqlBaseBaseListener {
}
@Override
+ public void enterFlush(FlushContext ctx) {
+ super.enterFlush(ctx);
+ FlushOperator flushOperator = new FlushOperator(SQLConstant.TOK_FLUSH);
+ if(ctx.ID() != null) {
+ if(ctx.ID().getText().equalsIgnoreCase("true")
+ || ctx.ID().getText().equalsIgnoreCase("false")) {
+ flushOperator.setSeq(Boolean.parseBoolean(ctx.ID().getText()));
+ } else {
+ throw new ParseCancellationException("Should be true or false");
+ }
+ }
+ if(ctx.prefixPath(0) != null) {
+ List<Path> storageGroups = new ArrayList<>();
+ for(PrefixPathContext prefixPathContext : ctx.prefixPath()) {
+ storageGroups.add(parsePrefixPath(prefixPathContext));
+ }
+ flushOperator.setStorageGroupList(storageGroups);
+ }
+
+ initializedOperator = flushOperator;
+ }
+
+ @Override
+ public void enterMerge(MergeContext ctx) {
+ super.enterMerge(ctx);
+ initializedOperator = new MergeOperator(SQLConstant.TOK_MERGE);
+ }
+
+ @Override
+ public void enterFullMerge(FullMergeContext ctx) {
+ super.enterFullMerge(ctx);
+ initializedOperator = new MergeOperator(SQLConstant.TOK_FULL_MERGE);
+ }
+
+ @Override
public void enterCountNodes(CountNodesContext ctx) {
super.enterCountNodes(ctx);
initializedOperator = new CountOperator(SQLConstant.TOK_COUNT_NODES,
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index d64413a..7f97a8a 100644
---
a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++
b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -126,6 +126,15 @@ public class PhysicalGenerator {
insert.getTime(),
insert.getMeasurementList(),
insert.getValueList());
+ case MERGE:
+ if(operator.getTokenIntType() == SQLConstant.TOK_FULL_MERGE) {
+ return new MergePlan(OperatorType.FULL_MERGE);
+ } else {
+ return new MergePlan();
+ }
+ case FLUSH:
+ FlushOperator flushOperator = (FlushOperator) operator;
+ return new FlushPlan(flushOperator.isSeq(),
flushOperator.getStorageGroupList());
case QUERY:
QueryOperator query = (QueryOperator) operator;
return transformQuery(query);
diff --git
a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index b774ba3..2caef9d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.cost.statistic.Measurement;
import org.apache.iotdb.db.cost.statistic.Operation;
-import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.QueryInBatchStatementException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -322,60 +321,6 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
return MManager.getInstance().getAllTimeseriesName(path);
}
- /**
- * Judge whether the statement is ADMIN COMMAND and if true, execute it.
- *
- * @param statement command
- * @return true if the statement is ADMIN COMMAND
- */
- private boolean execAdminCommand(String statement, long sessionId) throws
StorageEngineException {
- if (!"root".equals(sessionIdUsernameMap.get(sessionId))) {
- return false;
- }
- if (statement == null) {
- return false;
- }
- statement = statement.toLowerCase();
- if (statement.startsWith("flush")) {
- try {
- execFlush(statement);
- } catch (StorageGroupNotSetException e) {
- throw new StorageEngineException(e);
- }
- return true;
- }
- switch (statement) {
- case "merge":
- StorageEngine.getInstance()
-
.mergeAll(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
- return true;
- case "full merge":
- StorageEngine.getInstance().mergeAll(true);
- return true;
- default:
- return false;
- }
- }
-
- private void execFlush(String statement) throws StorageGroupNotSetException {
- String[] args = statement.split("\\s+");
- if (args.length == 1) {
- StorageEngine.getInstance().syncCloseAllProcessor();
- } else if (args.length == 2) {
- String[] storageGroups = args[1].split(",");
- for (String storageGroup : storageGroups) {
- StorageEngine.getInstance().asyncCloseProcessor(storageGroup, true);
- StorageEngine.getInstance().asyncCloseProcessor(storageGroup, false);
- }
- } else {
- String[] storageGroups = args[1].split(",");
- boolean isSeq = Boolean.parseBoolean(args[2]);
- for (String storageGroup : storageGroups) {
- StorageEngine.getInstance().asyncCloseProcessor(storageGroup, isSeq);
- }
- }
- }
-
@Override
public TSExecuteBatchStatementResp
executeBatchStatement(TSExecuteBatchStatementReq req) {
long t1 = System.currentTimeMillis();
@@ -465,10 +410,6 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
}
String statement = req.getStatement();
- if (execAdminCommand(statement, req.getSessionId())) {
- return RpcUtils.getTSExecuteStatementResp(
- RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS,
"ADMIN_COMMAND_SUCCESS"));
- }
PhysicalPlan physicalPlan =
processor.parseSQLToPhysicalPlan(statement,
sessionIdZoneIdMap.get(req.getSessionId()));
if (physicalPlan.isQuery()) {
@@ -489,10 +430,6 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
return RpcUtils.getTSExecuteStatementResp(
RpcUtils.getStatus(TSStatusCode.QUERY_PROCESS_ERROR,
"Meet error in query process: " + e.getMessage()));
- } catch (StorageEngineException e) {
- logger.info(ERROR_PARSING_SQL, e.getMessage());
- return RpcUtils.getTSExecuteStatementResp(
- RpcUtils.getStatus(TSStatusCode.READ_ONLY_SYSTEM_ERROR,
e.getMessage()));
} catch (Exception e) {
logger.error("{}: server Internal Error: ",
IoTDBConstant.GLOBAL_DB_NAME, e);
return
RpcUtils.getTSExecuteStatementResp(TSStatusCode.INTERNAL_SERVER_ERROR,
e.getMessage());
@@ -960,7 +897,8 @@ public class TSServiceImpl implements TSIService.Iface,
ServerContext {
return resp;
}
- private boolean executeNonQuery(PhysicalPlan plan) throws
QueryProcessException {
+ private boolean executeNonQuery(PhysicalPlan plan)
+ throws QueryProcessException, StorageGroupNotSetException,
StorageEngineException {
if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
throw new QueryProcessException(
"Current system mode is read-only, does not support non-query
operation");
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
index 820dc3c..1940e35 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
@@ -130,13 +130,10 @@ public class IoTDBSessionIT {
insertTabletTest2("root.sg1.d1");
// flush
- Class.forName(Config.JDBC_DRIVER_NAME);
- try (Connection connection = DriverManager
- .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root",
"root");
- Statement statement = connection.createStatement()) {
- statement.execute("FLUSH");
- }
- //
+ session.executeNonQueryStatement("FLUSH");
+ session.executeNonQueryStatement("FLUSH root.sg1");
+ session.executeNonQueryStatement("MERGE");
+ session.executeNonQueryStatement("FULL MERGE");
insertTabletTest3("root.sg1.d1");
queryForBatchSeqAndUnseq();