This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c492b1  [IOTDB-640] Move flush and merge to anltr (Enable flush and merge in Session) (#1143)
5c492b1 is described below

commit 5c492b14ad5c65bd4067d6d11d927db8221a088c
Author: Boris <[email protected]>
AuthorDate: Wed May 6 19:43:20 2020 +0800

    [IOTDB-640] Move flush and merge to anltr (Enable flush and merge in Session) (#1143)
    
    * Move flush and merge to anltr (Enable flush and merge in Session)
---
 .../1-DDL Data Definition Language.md              | 31 ++++++++++
 .../1-DDL Data Definition Language.md              | 24 ++++++++
 .../org/apache/iotdb/db/qp/strategy/SqlBase.g4     |  7 ++-
 .../org/apache/iotdb/db/mqtt/PublishHandler.java   |  7 ++-
 .../main/java/org/apache/iotdb/db/qp/Planner.java  |  2 +
 .../apache/iotdb/db/qp/constant/SQLConstant.java   |  4 ++
 .../apache/iotdb/db/qp/executor/IPlanExecutor.java |  4 +-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  | 47 ++++++++++++++-
 .../org/apache/iotdb/db/qp/logical/Operator.java   |  2 +-
 .../apache/iotdb/db/qp/logical/RootOperator.java   |  1 -
 .../{RootOperator.java => sys/FlushOperator.java}  | 37 +++++++++---
 .../{RootOperator.java => sys/MergeOperator.java}  | 16 +++---
 .../sys/FlushPlan.java}                            | 30 +++++++---
 .../sys/MergePlan.java}                            | 26 ++++++---
 .../iotdb/db/qp/strategy/LogicalGenerator.java     | 36 ++++++++++++
 .../iotdb/db/qp/strategy/PhysicalGenerator.java    |  9 +++
 .../org/apache/iotdb/db/service/TSServiceImpl.java | 66 +---------------------
 .../org/apache/iotdb/session/IoTDBSessionIT.java   | 11 ++--
 18 files changed, 247 insertions(+), 113 deletions(-)

diff --git a/docs/UserGuide/5-Operation Manual/1-DDL Data Definition Language.md b/docs/UserGuide/5-Operation Manual/1-DDL Data Definition Language.md
index d1f9267..a281094 100644
--- a/docs/UserGuide/5-Operation Manual/1-DDL Data Definition Language.md       
+++ b/docs/UserGuide/5-Operation Manual/1-DDL Data Definition Language.md       
@@ -200,6 +200,7 @@ Example:
 ## Count Timeseries
 
 IoTDB is able to use `COUNT TIMESERIES <Path>` to count the number of timeseries in the path. SQL statements are as follows:
+
 ```
 IoTDB > COUNT TIMESERIES root
 IoTDB > COUNT TIMESERIES root.ln
@@ -232,6 +233,7 @@ You will get following results:
 ## Count Nodes
 
 IoTDB is able to use `COUNT NODES <Path> LEVEL=<INTEGER>` to count the number of nodes at the given level in current Metadata Tree. This could be used to query the number of devices. The usage are as follows:
+
 ```
 IoTDB > COUNT NODES root LEVEL=2
 IoTDB > COUNT NODES root.ln LEVEL=2
@@ -249,6 +251,7 @@ As for the above mentioned example and Metadata tree, you can get following resu
 To delete the timeseries we created before, we are able to use `DELETE TimeSeries <PrefixPath>` statement.
 
 The usage are as follows:
+
 ```
 IoTDB> delete timeseries root.ln.wf01.wt01.status
 IoTDB> delete timeseries root.ln.wf01.wt01.temperature, root.ln.wf02.wt02.hardware
@@ -261,7 +264,9 @@ Similar to `Show Timeseries`, IoTDB also supports two ways of viewing devices:
 
 * `SHOW DEVICES` statement presents all devices information, which is equal to `SHOW DEVICES root`.
 * `SHOW DEVICES <PrefixPath>` statement specifies the `PrefixPath` and returns the devices information under the given level.
+
 SQL statement is as follows:
+
 ```
 IoTDB> show devices
 IoTDB> show devices root.ln
@@ -274,17 +279,43 @@ IoTDB supports storage-level TTL settings, which means it is able to delete old
 ## Set TTL
 
 The SQL Statement for setting TTL is as follow:
+
 ```
 IoTDB> set ttl to root.ln 3600000
 ```
+
 This example means that for data in `root.ln`, only that of the latest 1 hour will remain, the older one is removed or made invisible.
 
 ## Unset TTL
 
 To unset TTL, we can use follwing SQL statement:
+
 ```
 IoTDB> unset ttl to root.ln
 ```
+
 After unset TTL, all data will be accepted in `root.ln`
 
+## FLUSH
+
+Persist all the data points in the memory table of the storage group to the disk, and seal the data file.
+
+```
+IoTDB> FLUSH 
+IoTDB> FLUSH root.ln
+IoTDB> FLUSH root.sg1,root.sg2
+```
+
+## MERGE
+
+Merge sequence and unsequence data. Currently IoTDB supports the following two types of SQL to manually trigger the merge process of data files:
+
+* `MERGE` Only rewrite overlapped Chunks, the merge speed is quick, while there will be redundant data on the disk eventually.
+* `FULL MERGE` Rewrite all data in overlapped files, the merge speed is slow, but there will be no redundant data on the disk eventually.
+
+```
+IoTDB> MERGE
+IoTDB> FULL MERGE
+```
+
 
diff --git a/docs/zh/UserGuide/5-Operation Manual/1-DDL Data Definition Language.md b/docs/zh/UserGuide/5-Operation Manual/1-DDL Data Definition Language.md
index 7e6c3bc..eedc259 100644
--- a/docs/zh/UserGuide/5-Operation Manual/1-DDL Data Definition Language.md    
+++ b/docs/zh/UserGuide/5-Operation Manual/1-DDL Data Definition Language.md    
@@ -281,9 +281,33 @@ IoTDB> set ttl to root.ln 3600000
 ## 取消 TTL
 
 取消TTL的SQL语句如下所示:
+
 ```
 IoTDB> unset ttl to root.ln
 ```
+
 取消设置TTL后,存储组`root.ln`中所有的数据都会被保存。
 
+## FLUSH
+
+将指定存储组的内存缓存区Memory Table的数据持久化到磁盘上,并将数据文件封口。
+
+```
+IoTDB> FLUSH 
+IoTDB> FLUSH root.ln
+IoTDB> FLUSH root.sg1,root.sg2
+```
+
+## MERGE
+
+合并顺序和乱序数据。当前IoTDB支持使用如下两种SQL手动触发数据文件的合并:
+
+* `MERGE` 仅重写重复的Chunk,整理速度快,但是最终磁盘会存在多余数据。
+* `FULL MERGE` 将需要合并的顺序和乱序文件的所有数据都重新写一份,整理速度慢,最终磁盘将不存在无用的数据。
+
+```
+IoTDB> MERGE
+IoTDB> FULL MERGE
+```
+
 
diff --git a/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4 b/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4
index c181d38..e75f4d1 100644
--- a/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4
+++ b/server/src/main/antlr4/org/apache/iotdb/db/qp/strategy/SqlBase.g4
@@ -36,7 +36,9 @@ statement
     | DESCRIBE prefixPath #describePath // not support yet
     | CREATE INDEX ON fullPath USING function=ID indexWithClause? whereClause? #createIndex //not support yet
     | DROP INDEX function=ID ON fullPath #dropIndex //not support yet
-    | MERGE #merge //not support yet
+    | MERGE #merge
+    | FLUSH prefixPath? (COMMA prefixPath)* (ID)?#flush //ID is true or false
+    | FULL MERGE #fullMerge
     | CREATE USER userName=ID password=STRING_LITERAL #createUser
     | ALTER USER userName=(ROOT|ID) SET PASSWORD password=STRING_LITERAL #alterUser
     | DROP USER userName=ID #dropUser
@@ -817,6 +819,9 @@ RENAME
     : R E N A M E
     ;
 
+FULL
+    : F U L L
+    ;
 //============================
 // End of the keywords list
 //============================
diff --git a/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java b/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
index 57cb3aa..fcbaef2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mqtt/PublishHandler.java
@@ -23,6 +23,8 @@ import io.netty.buffer.ByteBuf;
 import io.netty.handler.codec.mqtt.MqttQoS;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.executor.IPlanExecutor;
 import org.apache.iotdb.db.qp.executor.PlanExecutor;
@@ -92,7 +94,7 @@ public class PublishHandler extends AbstractInterceptHandler {
             boolean status;
             try {
                 status = executeNonQuery(plan);
-            } catch (QueryProcessException e) {
+            } catch (QueryProcessException | StorageGroupNotSetException | StorageEngineException e ) {
                 throw new RuntimeException(e);
             }
 
@@ -100,7 +102,8 @@ public class PublishHandler extends AbstractInterceptHandler {
         }
     }
 
-    private boolean executeNonQuery(PhysicalPlan plan) throws QueryProcessException {
+    private boolean executeNonQuery(PhysicalPlan plan)
+        throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
         if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
             throw new QueryProcessException(
                     "Current system mode is read-only, does not support non-query operation");
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
index 15a3a14..fce62a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
@@ -95,6 +95,8 @@ public class Planner {
       case LOAD_FILES:
       case REMOVE_FILE:
       case MOVE_FILE:
+      case FLUSH:
+      case MERGE:
         return operator;
       case QUERY:
       case UPDATE:
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
index a3771a8..e7258a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/constant/SQLConstant.java
@@ -141,6 +141,10 @@ public class SQLConstant {
 
   public static final int TOK_METADATA_ALTER = 80;
 
+  public static final int TOK_FLUSH = 81;
+  public static final int TOK_MERGE = 82;
+  public static final int TOK_FULL_MERGE = 83;
+
   public static final Map<Integer, String> tokenSymbol = new HashMap<>();
   public static final Map<Integer, String> tokenNames = new HashMap<>();
   public static final Map<Integer, Integer> reverseWords = new HashMap<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
index addf757..7e70ed2 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/IPlanExecutor.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.sql.SQLException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
@@ -51,7 +52,8 @@ public interface IPlanExecutor {
    *
    * @param plan Physical Non-Query Plan
    */
-  boolean processNonQuery(PhysicalPlan plan) throws QueryProcessException;
+  boolean processNonQuery(PhysicalPlan plan)
+      throws QueryProcessException, StorageGroupNotSetException, StorageEngineException;
 
   /**
    * execute update command and return whether the operator is successful.
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 92f363e..92b71e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -69,12 +69,14 @@ import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.metadata.mnode.InternalMNode;
 import org.apache.iotdb.db.metadata.mnode.LeafMNode;
 import org.apache.iotdb.db.metadata.mnode.MNode;
 import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
 import org.apache.iotdb.db.qp.logical.sys.AuthorOperator.AuthorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -97,6 +99,8 @@ import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
+import org.apache.iotdb.db.qp.physical.sys.MergePlan;
 import org.apache.iotdb.db.qp.physical.sys.OperateFilePlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
@@ -173,7 +177,8 @@ public class PlanExecutor implements IPlanExecutor {
   }
 
   @Override
-  public boolean processNonQuery(PhysicalPlan plan) throws QueryProcessException {
+  public boolean processNonQuery(PhysicalPlan plan)
+      throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
     switch (plan.getOperatorType()) {
       case DELETE:
         delete((DeletePlan) plan);
@@ -229,12 +234,47 @@ public class PlanExecutor implements IPlanExecutor {
       case MOVE_FILE:
         operateMoveFile((OperateFilePlan) plan);
         return true;
+      case FLUSH:
+        operateFlush((FlushPlan) plan);
+        return true;
+      case MERGE:
+        operateMerge((MergePlan) plan);
+        return true;
+      case FULL_MERGE:
+        operateMerge((MergePlan) plan);
+        return true;
       default:
         throw new UnsupportedOperationException(
             String.format("operation %s is not supported", plan.getOperatorType()));
     }
   }
 
+  private void operateMerge(MergePlan plan) throws StorageEngineException {
+    if(plan.getOperatorType() == OperatorType.FULL_MERGE) {
+      StorageEngine.getInstance().mergeAll(true);
+    } else {
+      StorageEngine.getInstance()
+          .mergeAll(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
+    }
+  }
+
+  private void operateFlush(FlushPlan plan) throws StorageGroupNotSetException {
+    if(plan.getPaths() == null) {
+      StorageEngine.getInstance().syncCloseAllProcessor();
+    } else {
+      if(plan.isSeq() == null) {
+        for (Path storageGroup : plan.getPaths()) {
+          StorageEngine.getInstance().asyncCloseProcessor(storageGroup.toString(), true);
+          StorageEngine.getInstance().asyncCloseProcessor(storageGroup.toString(), false);
+        }
+      } else {
+        for (Path storageGroup : plan.getPaths()) {
+          StorageEngine.getInstance().asyncCloseProcessor(storageGroup.toString(), plan.isSeq());
+        }
+      }
+    }
+  }
+
   protected QueryDataSet processDataQuery(QueryPlan queryPlan, QueryContext context)
       throws StorageEngineException, QueryFilterOptimizationException, QueryProcessException,
           IOException {
@@ -1035,7 +1075,7 @@ public class PlanExecutor implements IPlanExecutor {
       if (!failedNames.isEmpty()) {
         throw new DeleteFailedException(String.join(",", failedNames));
       }
-    } catch (MetadataException e) {
+    } catch (MetadataException | StorageEngineException e) {
       throw new QueryProcessException(e);
     }
     return true;
@@ -1113,7 +1153,8 @@ public class PlanExecutor implements IPlanExecutor {
    *
    * @param pathList deleted paths
    */
-  private void deleteDataOfTimeSeries(List<Path> pathList) throws QueryProcessException {
+  private void deleteDataOfTimeSeries(List<Path> pathList)
+      throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
     for (Path p : pathList) {
       DeletePlan deletePlan = new DeletePlan();
       deletePlan.addPath(p);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index 37c1d10..50ccba6 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -75,6 +75,6 @@ public abstract class Operator {
     LIST_USER_PRIVILEGE, LIST_ROLE_PRIVILEGE, LIST_USER_ROLES, LIST_ROLE_USERS,
     GRANT_WATERMARK_EMBEDDING, REVOKE_WATERMARK_EMBEDDING,
     TTL, DELETE_STORAGE_GROUP, LOAD_CONFIGURATION, SHOW, LOAD_FILES, REMOVE_FILE, MOVE_FILE, LAST, GROUP_BY_FILL,
-    ALTER_TIMESERIES
+    ALTER_TIMESERIES, FLUSH, MERGE, FULL_MERGE
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java
index 98da176..adfa8c7 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java
@@ -27,5 +27,4 @@ public abstract class RootOperator extends Operator {
   public RootOperator(int tokenIntType) {
     super(tokenIntType);
   }
-
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/FlushOperator.java
similarity index 55%
copy from server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java
copy to server/src/main/java/org/apache/iotdb/db/qp/logical/sys/FlushOperator.java
index 98da176..ef9c370 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/FlushOperator.java
@@ -16,16 +16,37 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.qp.logical;
+package org.apache.iotdb.db.qp.logical.sys;
 
-/**
- * RootOperator indicates the operator that could be executed as a entire command. RootOperator
- * consists of SFWOperator, like INSERT/UPDATE/DELETE, and other Operators.
- */
-public abstract class RootOperator extends Operator {
+import java.util.List;
+import org.apache.iotdb.db.qp.logical.RootOperator;
+import org.apache.iotdb.tsfile.read.common.Path;
 
-  public RootOperator(int tokenIntType) {
-    super(tokenIntType);
+public class FlushOperator extends RootOperator {
+
+  public List<Path> getStorageGroupList() {
+    return storageGroupList;
   }
 
+  public void setStorageGroupList(
+      List<Path> storageGroupList) {
+    this.storageGroupList = storageGroupList;
+  }
+
+  private List<Path> storageGroupList;
+
+  public Boolean isSeq() {
+    return isSeq;
+  }
+
+  public void setSeq(boolean seq) {
+    isSeq = seq;
+  }
+
+  private Boolean isSeq;
+
+  public FlushOperator(int tokenIntType) {
+    super(tokenIntType);
+    operatorType = OperatorType.FLUSH;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/MergeOperator.java
similarity index 72%
copy from server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java
copy to server/src/main/java/org/apache/iotdb/db/qp/logical/sys/MergeOperator.java
index 98da176..16b9a5f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/MergeOperator.java
@@ -1,3 +1,7 @@
+package org.apache.iotdb.db.qp.logical.sys;
+
+import org.apache.iotdb.db.qp.logical.RootOperator;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -16,16 +20,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.qp.logical;
+public class MergeOperator extends RootOperator {
 
-/**
- * RootOperator indicates the operator that could be executed as a entire command. RootOperator
- * consists of SFWOperator, like INSERT/UPDATE/DELETE, and other Operators.
- */
-public abstract class RootOperator extends Operator {
-
-  public RootOperator(int tokenIntType) {
+  public MergeOperator(int tokenIntType) {
     super(tokenIntType);
+    operatorType = OperatorType.MERGE;
   }
-
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/FlushPlan.java
similarity index 56%
copy from server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java
copy to server/src/main/java/org/apache/iotdb/db/qp/physical/sys/FlushPlan.java
index 98da176..d21c26e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/FlushPlan.java
@@ -16,16 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.qp.logical;
+package org.apache.iotdb.db.qp.physical.sys;
 
-/**
- * RootOperator indicates the operator that could be executed as a entire command. RootOperator
- * consists of SFWOperator, like INSERT/UPDATE/DELETE, and other Operators.
- */
-public abstract class RootOperator extends Operator {
+import java.util.List;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.tsfile.read.common.Path;
+
+public class FlushPlan extends PhysicalPlan {
+  private List<Path> storeGroups;
+
+  public Boolean isSeq() {
+    return isSeq;
+  }
+
+  private Boolean isSeq;
 
-  public RootOperator(int tokenIntType) {
-    super(tokenIntType);
+  public FlushPlan(Boolean isSeq, List<Path> storeGroups) {
+    super(false, OperatorType.FLUSH);
+    this.storeGroups = storeGroups;
+    this.isSeq = isSeq;
   }
 
+  @Override
+  public List<Path> getPaths() {
+    return storeGroups;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MergePlan.java
similarity index 60%
copy from server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java
copy to server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MergePlan.java
index 98da176..d443002 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/RootOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/MergePlan.java
@@ -16,16 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.qp.logical;
+package org.apache.iotdb.db.qp.physical.sys;
 
-/**
- * RootOperator indicates the operator that could be executed as a entire command. RootOperator
- * consists of SFWOperator, like INSERT/UPDATE/DELETE, and other Operators.
- */
-public abstract class RootOperator extends Operator {
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.tsfile.read.common.Path;
+
+public class MergePlan extends PhysicalPlan {
 
-  public RootOperator(int tokenIntType) {
-    super(tokenIntType);
+  public MergePlan(OperatorType operatorType) {
+    super(false, operatorType);
   }
 
+  public MergePlan() {
+    super(false, OperatorType.MERGE);
+  }
+
+  @Override
+  public List<Path> getPaths() {
+    return new ArrayList<>();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
index 561ffe0..8647ab2 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalGenerator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.qp.strategy;
 
+import org.antlr.v4.runtime.misc.ParseCancellationException;
 import org.antlr.v4.runtime.tree.TerminalNode;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.runtime.SQLParserException;
@@ -81,6 +82,41 @@ public class LogicalGenerator extends SqlBaseBaseListener {
   }
 
   @Override
+  public void enterFlush(FlushContext ctx) {
+    super.enterFlush(ctx);
+    FlushOperator flushOperator = new FlushOperator(SQLConstant.TOK_FLUSH);
+    if(ctx.ID() != null) {
+      if(ctx.ID().getText().equalsIgnoreCase("true")
+          || ctx.ID().getText().equalsIgnoreCase("false")) {
+        flushOperator.setSeq(Boolean.parseBoolean(ctx.ID().getText()));
+      } else {
+        throw new ParseCancellationException("Should be true or false");
+      }
+    }
+    if(ctx.prefixPath(0) != null) {
+      List<Path> storageGroups = new ArrayList<>();
+      for(PrefixPathContext prefixPathContext : ctx.prefixPath()) {
+        storageGroups.add(parsePrefixPath(prefixPathContext));
+      }
+      flushOperator.setStorageGroupList(storageGroups);
+    }
+
+    initializedOperator = flushOperator;
+  }
+
+  @Override
+  public void enterMerge(MergeContext ctx) {
+    super.enterMerge(ctx);
+    initializedOperator = new MergeOperator(SQLConstant.TOK_MERGE);
+  }
+
+  @Override
+  public void enterFullMerge(FullMergeContext ctx) {
+    super.enterFullMerge(ctx);
+    initializedOperator = new MergeOperator(SQLConstant.TOK_FULL_MERGE);
+  }
+
+  @Override
   public void enterCountNodes(CountNodesContext ctx) {
     super.enterCountNodes(ctx);
     initializedOperator = new CountOperator(SQLConstant.TOK_COUNT_NODES,
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
index d64413a..7f97a8a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java
@@ -126,6 +126,15 @@ public class PhysicalGenerator {
             insert.getTime(),
             insert.getMeasurementList(),
             insert.getValueList());
+      case MERGE:
+        if(operator.getTokenIntType() == SQLConstant.TOK_FULL_MERGE) {
+          return new MergePlan(OperatorType.FULL_MERGE);
+        } else {
+          return new MergePlan();
+        }
+      case FLUSH:
+        FlushOperator flushOperator = (FlushOperator) operator;
+        return new FlushPlan(flushOperator.isSeq(), flushOperator.getStorageGroupList());
       case QUERY:
         QueryOperator query = (QueryOperator) operator;
         return transformQuery(query);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index b774ba3..2caef9d 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.cost.statistic.Measurement;
 import org.apache.iotdb.db.cost.statistic.Operation;
-import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.exception.QueryInBatchStatementException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -322,60 +321,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     return MManager.getInstance().getAllTimeseriesName(path);
   }
 
-  /**
-   * Judge whether the statement is ADMIN COMMAND and if true, execute it.
-   *
-   * @param statement command
-   * @return true if the statement is ADMIN COMMAND
-   */
-  private boolean execAdminCommand(String statement, long sessionId) throws StorageEngineException {
-    if (!"root".equals(sessionIdUsernameMap.get(sessionId))) {
-      return false;
-    }
-    if (statement == null) {
-      return false;
-    }
-    statement = statement.toLowerCase();
-    if (statement.startsWith("flush")) {
-      try {
-        execFlush(statement);
-      } catch (StorageGroupNotSetException e) {
-        throw new StorageEngineException(e);
-      }
-      return true;
-    }
-    switch (statement) {
-      case "merge":
-        StorageEngine.getInstance()
-            .mergeAll(IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
-        return true;
-      case "full merge":
-        StorageEngine.getInstance().mergeAll(true);
-        return true;
-      default:
-        return false;
-    }
-  }
-
-  private void execFlush(String statement) throws StorageGroupNotSetException {
-    String[] args = statement.split("\\s+");
-    if (args.length == 1) {
-      StorageEngine.getInstance().syncCloseAllProcessor();
-    } else if (args.length == 2) {
-      String[] storageGroups = args[1].split(",");
-      for (String storageGroup : storageGroups) {
-        StorageEngine.getInstance().asyncCloseProcessor(storageGroup, true);
-        StorageEngine.getInstance().asyncCloseProcessor(storageGroup, false);
-      }
-    } else {
-      String[] storageGroups = args[1].split(",");
-      boolean isSeq = Boolean.parseBoolean(args[2]);
-      for (String storageGroup : storageGroups) {
-        StorageEngine.getInstance().asyncCloseProcessor(storageGroup, isSeq);
-      }
-    }
-  }
-
   @Override
   public TSExecuteBatchStatementResp executeBatchStatement(TSExecuteBatchStatementReq req) {
     long t1 = System.currentTimeMillis();
@@ -465,10 +410,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       }
       String statement = req.getStatement();
 
-      if (execAdminCommand(statement, req.getSessionId())) {
-        return RpcUtils.getTSExecuteStatementResp(
-            RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "ADMIN_COMMAND_SUCCESS"));
-      }
       PhysicalPlan physicalPlan =
           processor.parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(req.getSessionId()));
       if (physicalPlan.isQuery()) {
@@ -489,10 +430,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
       return RpcUtils.getTSExecuteStatementResp(
           RpcUtils.getStatus(TSStatusCode.QUERY_PROCESS_ERROR,
               "Meet error in query process: " + e.getMessage()));
-    } catch (StorageEngineException e) {
-      logger.info(ERROR_PARSING_SQL, e.getMessage());
-      return RpcUtils.getTSExecuteStatementResp(
-          RpcUtils.getStatus(TSStatusCode.READ_ONLY_SYSTEM_ERROR, e.getMessage()));
     } catch (Exception e) {
       logger.error("{}: server Internal Error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
       return RpcUtils.getTSExecuteStatementResp(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
@@ -960,7 +897,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     return resp;
   }
 
-  private boolean executeNonQuery(PhysicalPlan plan) throws QueryProcessException {
+  private boolean executeNonQuery(PhysicalPlan plan)
+      throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
     if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
       throw new QueryProcessException(
           "Current system mode is read-only, does not support non-query operation");
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
index 820dc3c..1940e35 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
@@ -130,13 +130,10 @@ public class IoTDBSessionIT {
 
     insertTabletTest2("root.sg1.d1");
     // flush
-    Class.forName(Config.JDBC_DRIVER_NAME);
-    try (Connection connection = DriverManager
-        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
-        Statement statement = connection.createStatement()) {
-      statement.execute("FLUSH");
-    }
-    //
+    session.executeNonQueryStatement("FLUSH");
+    session.executeNonQueryStatement("FLUSH root.sg1");
+    session.executeNonQueryStatement("MERGE");
+    session.executeNonQueryStatement("FULL MERGE");
     insertTabletTest3("root.sg1.d1");
 
     queryForBatchSeqAndUnseq();
