This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/Analyze
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2769b988d3b562176e84acf5c1db3094dc88b1b5
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Mar 12 17:18:47 2024 +0800

    Refactor IQueryExecution to support Table Model in the future
---
 .../confignode1conf/iotdb-common.properties        |   2 +-
 .../confignode2conf/iotdb-common.properties        |   2 +-
 .../confignode3conf/iotdb-common.properties        |   2 +-
 .../org/apache/iotdb/db/audit/AuditLogger.java     |   2 +-
 .../protocol/writeback/WriteBackConnector.java     |   2 +-
 .../legacy/IoTDBLegacyPipeReceiverAgent.java       |   2 +-
 .../receiver/legacy/loader/DeletionLoader.java     |   2 +-
 .../pipe/receiver/legacy/loader/TsFileLoader.java  |   2 +-
 .../receiver/thrift/IoTDBDataNodeReceiver.java     |   2 +-
 .../db/protocol/client/DataNodeInternalClient.java |   2 +-
 .../iotdb/db/protocol/mqtt/MPPPublishHandler.java  |   2 +-
 .../rest/v1/impl/GrafanaApiServiceImpl.java        |   6 +-
 .../protocol/rest/v1/impl/RestApiServiceImpl.java  |   6 +-
 .../rest/v2/impl/GrafanaApiServiceImpl.java        |   6 +-
 .../protocol/rest/v2/impl/RestApiServiceImpl.java  |   8 +-
 .../protocol/thrift/impl/ClientRPCServiceImpl.java | 106 ++++++-----
 .../impl/DataNodeInternalRPCServiceImpl.java       |   2 +-
 .../db/queryengine/common/MPPQueryContext.java     |   2 +-
 .../fragment/FragmentInstanceContext.java          |   9 +-
 .../iotdb/db/queryengine/plan/Coordinator.java     | 107 ++++++-----
 .../db/queryengine/plan/analyze/Analysis.java      |  45 ++++-
 .../IAnalysis.java}                                |  46 ++---
 .../plan/analyze/LoadTsfileAnalyzer.java           |   2 +-
 .../analyze/schema/AutoCreateSchemaExecutor.java   |   2 +-
 .../analyze/schema/ClusterSchemaFetchExecutor.java |   2 +-
 .../plan/execution/IQueryExecution.java            |   3 +-
 .../queryengine/plan/execution/QueryExecution.java | 209 +++++----------------
 .../plan/execution/config/ConfigExecution.java     |  32 ++--
 .../config/executor/ClusterConfigTaskExecutor.java |   2 +-
 .../db/queryengine/plan/planner/IPlanner.java      |  54 ++++++
 .../queryengine/plan/planner/TreeModelPlanner.java | 197 +++++++++++++++++++
 .../SimpleFragmentParallelPlanner.java             |   3 +-
 .../distribution/WriteFragmentParallelPlanner.java |   3 +-
 .../plan/planner/plan/DistributedQueryPlan.java    |   8 +-
 .../plan/planner/plan/FragmentInstance.java        |  23 ++-
 .../plan/planner/plan/PlanFragment.java            |   2 +-
 .../db/queryengine/plan/planner/plan/SubPlan.java  |   2 +-
 .../plan/planner/plan/TimePredicate.java           |  39 ++++
 .../plan/planner/plan/TreeModelTimePredicate.java  |  64 +++++++
 .../metrics/IoTDBInternalLocalReporter.java        |   3 +-
 .../execution/operator/MergeSortOperatorTest.java  |   3 +-
 .../plan/planner/FragmentInstanceSerdeTest.java    |   3 +-
 42 files changed, 645 insertions(+), 376 deletions(-)

diff --git a/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-common.properties b/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-common.properties
index 8981b21285b..c69c63010db 100644
--- a/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-common.properties
+++ b/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-common.properties
@@ -21,7 +21,7 @@ timestamp_precision=ms
 data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensus
 schema_region_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
 schema_replication_factor=3
-data_replication_factor=3
+data_replication_factor=2
 udf_lib_dir=target/confignode1/ext/udf
 trigger_lib_dir=target/confignode1/ext/trigger
 pipe_lib_dir=target/confignode1/ext/pipe
diff --git a/iotdb-core/confignode/src/test/resources/confignode2conf/iotdb-common.properties b/iotdb-core/confignode/src/test/resources/confignode2conf/iotdb-common.properties
index a9789fedabb..219c2ff4d28 100644
--- a/iotdb-core/confignode/src/test/resources/confignode2conf/iotdb-common.properties
+++ b/iotdb-core/confignode/src/test/resources/confignode2conf/iotdb-common.properties
@@ -21,7 +21,7 @@ timestamp_precision=ms
 data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensus
 schema_region_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
 schema_replication_factor=3
-data_replication_factor=3
+data_replication_factor=2
 udf_lib_dir=target/confignode2/ext/udf
 trigger_lib_dir=target/confignode2/ext/trigger
 pipe_lib_dir=target/confignode2/ext/pipe
diff --git a/iotdb-core/confignode/src/test/resources/confignode3conf/iotdb-common.properties b/iotdb-core/confignode/src/test/resources/confignode3conf/iotdb-common.properties
index 6a95388bd72..3ad8b13bac4 100644
--- a/iotdb-core/confignode/src/test/resources/confignode3conf/iotdb-common.properties
+++ b/iotdb-core/confignode/src/test/resources/confignode3conf/iotdb-common.properties
@@ -21,7 +21,7 @@ timestamp_precision=ms
 data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensus
 schema_region_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
 schema_replication_factor=3
-data_replication_factor=3
+data_replication_factor=2
 udf_lib_dir=target/confignode3/ext/udf
 trigger_lib_dir=target/confignode3/ext/trigger
 pipe_lib_dir=target/confignode3/ext/pipe
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
index a08142e13ef..645dfbf016d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
@@ -115,7 +115,7 @@ public class AuditLogger {
     if (auditLogOperationList.contains(operation)) {
       if (auditLogStorageList.contains(AuditLogStorage.IOTDB)) {
         try {
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               generateInsertStatement(log, address, username),
               SESSION_MANAGER.requestQueryId(),
               sessionInfo,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
index 471474148b8..4b9437372ca 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
@@ -154,7 +154,7 @@ public class WriteBackConnector implements PipeConnector {
 
   private TSStatus executeStatement(InsertBaseStatement statement) {
     return Coordinator.getInstance()
-        .execute(
+        .executeForTreeModel(
             new PipeEnrichedStatement(statement),
             SessionManager.getInstance().requestQueryId(),
             new SessionInfo(0, AuthorityChecker.SUPER_USER, ZoneId.systemDefault()),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
index 56180f75b77..fe0b2ac4073 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
@@ -136,7 +136,7 @@ public class IoTDBLegacyPipeReceiverAgent {
       long queryId = SessionManager.getInstance().requestQueryId();
       ExecutionResult result =
           Coordinator.getInstance()
-              .execute(
+              .executeForTreeModel(
                   statement,
                   queryId,
                   new SessionInfo(0, AuthorityChecker.SUPER_USER, 
ZoneId.systemDefault()),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/DeletionLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/DeletionLoader.java
index a3f08af6413..f5711a826ab 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/DeletionLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/DeletionLoader.java
@@ -61,7 +61,7 @@ public class DeletionLoader implements ILoader {
       long queryId = SessionManager.getInstance().requestQueryId();
       ExecutionResult result =
           Coordinator.getInstance()
-              .execute(
+              .executeForTreeModel(
                   statement,
                   queryId,
                   new SessionInfo(0, AuthorityChecker.SUPER_USER, 
ZoneId.systemDefault()),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/TsFileLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/TsFileLoader.java
index 6bcbe95c76f..080f5f105ec 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/TsFileLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/TsFileLoader.java
@@ -63,7 +63,7 @@ public class TsFileLoader implements ILoader {
       long queryId = SessionManager.getInstance().requestQueryId();
       ExecutionResult result =
           Coordinator.getInstance()
-              .execute(
+              .executeForTreeModel(
                   statement,
                   queryId,
                   new SessionInfo(0, AuthorityChecker.SUPER_USER, 
ZoneId.systemDefault()),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBDataNodeReceiver.java
index 0b99a5859ed..e01e6ff1e59 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBDataNodeReceiver.java
@@ -321,7 +321,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
 
     final ExecutionResult result =
         Coordinator.getInstance()
-            .execute(
+            .executeForTreeModel(
                 statement,
                 SessionManager.getInstance().requestQueryId(),
                 new SessionInfo(0, AuthorityChecker.SUPER_USER, 
ZoneId.systemDefault()),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeInternalClient.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeInternalClient.java
index 925a4b450f7..cf1250aaecb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeInternalClient.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeInternalClient.java
@@ -84,7 +84,7 @@ public class DataNodeInternalClient {
       // call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId();
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(session),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index 3c0f3c05726..86c430ac0a5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -162,7 +162,7 @@ public class MPPPublishHandler extends 
AbstractInterceptHandler {
             long queryId = sessionManager.requestQueryId();
             ExecutionResult result =
                 Coordinator.getInstance()
-                    .execute(
+                    .executeForTreeModel(
                         statement,
                         queryId,
                         sessionManager.getSessionInfo(session),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/GrafanaApiServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/GrafanaApiServiceImpl.java
index 40bab03fe38..07c735faa7f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/GrafanaApiServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/GrafanaApiServiceImpl.java
@@ -110,7 +110,7 @@ public class GrafanaApiServiceImpl extends 
GrafanaApiService {
       queryId = SESSION_MANAGER.requestQueryId();
       // create and cache dataset
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
@@ -177,7 +177,7 @@ public class GrafanaApiServiceImpl extends 
GrafanaApiService {
       queryId = SESSION_MANAGER.requestQueryId();
       // create and cache dataset
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
@@ -239,7 +239,7 @@ public class GrafanaApiServiceImpl extends 
GrafanaApiService {
         queryId = SESSION_MANAGER.requestQueryId();
         // create and cache dataset
         ExecutionResult result =
-            COORDINATOR.execute(
+            COORDINATOR.executeForTreeModel(
                 statement,
                 queryId,
                 
SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/RestApiServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/RestApiServiceImpl.java
index 47433bb5a68..428f2c3baaa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/RestApiServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/RestApiServiceImpl.java
@@ -97,7 +97,7 @@ public class RestApiServiceImpl extends RestApiService {
       }
       queryId = SESSION_MANAGER.requestQueryId();
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
@@ -151,7 +151,7 @@ public class RestApiServiceImpl extends RestApiService {
       queryId = SESSION_MANAGER.requestQueryId();
       // create and cache dataset
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
@@ -211,7 +211,7 @@ public class RestApiServiceImpl extends RestApiService {
       }
       queryId = SESSION_MANAGER.requestQueryId();
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               insertTabletStatement,
               queryId,
               SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/GrafanaApiServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/GrafanaApiServiceImpl.java
index 3025cd83ba1..e3385885a01 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/GrafanaApiServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/GrafanaApiServiceImpl.java
@@ -110,7 +110,7 @@ public class GrafanaApiServiceImpl extends 
GrafanaApiService {
       queryId = SESSION_MANAGER.requestQueryId();
       // create and cache dataset
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
@@ -177,7 +177,7 @@ public class GrafanaApiServiceImpl extends 
GrafanaApiService {
       queryId = SESSION_MANAGER.requestQueryId();
       // create and cache dataset
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
@@ -239,7 +239,7 @@ public class GrafanaApiServiceImpl extends 
GrafanaApiService {
         queryId = SESSION_MANAGER.requestQueryId();
         // create and cache dataset
         ExecutionResult result =
-            COORDINATOR.execute(
+            COORDINATOR.executeForTreeModel(
                 statement,
                 queryId,
                 
SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java
index 5cace589391..c0029b913eb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java
@@ -99,7 +99,7 @@ public class RestApiServiceImpl extends RestApiService {
       }
       queryId = SESSION_MANAGER.requestQueryId();
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
@@ -154,7 +154,7 @@ public class RestApiServiceImpl extends RestApiService {
       queryId = SESSION_MANAGER.requestQueryId();
       // create and cache dataset
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
@@ -203,7 +203,7 @@ public class RestApiServiceImpl extends RestApiService {
       }
       queryId = SESSION_MANAGER.requestQueryId();
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               insertRowsStatement,
               SESSION_MANAGER.requestQueryId(),
               SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
@@ -273,7 +273,7 @@ public class RestApiServiceImpl extends RestApiService {
       }
       queryId = SESSION_MANAGER.requestQueryId();
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               insertTabletStatement,
               SESSION_MANAGER.requestQueryId(),
               SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index fec0bbd87ec..5612d8777e1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -302,7 +302,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
       // create and cache dataset
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               s,
               queryId,
               SESSION_MANAGER.getSessionInfo(clientSession),
@@ -347,7 +347,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       // record each operation time cost
       if (statementType != null) {
         addStatementExecutionLatency(
-            OperationType.EXECUTE_QUERY_STATEMENT, statementType, 
currentOperationCost);
+            OperationType.EXECUTE_QUERY_STATEMENT, statementType.name(), 
currentOperationCost);
       }
 
       if (finished) {
@@ -393,7 +393,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
       // create and cache dataset
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               s,
               queryId,
               SESSION_MANAGER.getSessionInfo(clientSession),
@@ -435,7 +435,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
 
       // record each operation time cost
       addStatementExecutionLatency(
-          OperationType.EXECUTE_RAW_DATA_QUERY, StatementType.QUERY, 
currentOperationCost);
+          OperationType.EXECUTE_RAW_DATA_QUERY, StatementType.QUERY.name(), 
currentOperationCost);
 
       if (finished) {
         // record total time cost for one query
@@ -481,7 +481,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
       // create and cache dataset
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               s,
               queryId,
               SESSION_MANAGER.getSessionInfo(clientSession),
@@ -525,7 +525,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
 
       // record each operation time cost
       addStatementExecutionLatency(
-          OperationType.EXECUTE_LAST_DATA_QUERY, StatementType.QUERY, 
currentOperationCost);
+          OperationType.EXECUTE_LAST_DATA_QUERY, StatementType.QUERY.name(), 
currentOperationCost);
 
       if (finished) {
         // record total time cost for one query
@@ -568,7 +568,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
       // create and cache dataset
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               s,
               queryId,
               SESSION_MANAGER.getSessionInfo(clientSession),
@@ -612,7 +612,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
 
       // record each operation time cost
       addStatementExecutionLatency(
-          OperationType.EXECUTE_AGG_QUERY, StatementType.QUERY, 
currentOperationCost);
+          OperationType.EXECUTE_AGG_QUERY, StatementType.QUERY.name(), 
currentOperationCost);
 
       if (finished) {
         // record total time cost for one query
@@ -895,7 +895,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       }
       // create and cache dataset
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               s,
               queryId,
               SESSION_MANAGER.getSessionInfo(clientSession),
@@ -946,7 +946,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
 
       // record each operation time cost
       addStatementExecutionLatency(
-          OperationType.EXECUTE_LAST_DATA_QUERY, StatementType.QUERY, 
currentOperationCost);
+          OperationType.EXECUTE_LAST_DATA_QUERY, StatementType.QUERY.name(), 
currentOperationCost);
 
       if (finished) {
         // record total time cost for one query
@@ -1051,7 +1051,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
   public TSFetchResultsResp fetchResultsV2(TSFetchResultsReq req) {
     long startTime = System.nanoTime();
     boolean finished = false;
-    StatementType statementType = null;
+    String statementType = null;
     Throwable t = null;
     try {
       IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
@@ -1067,7 +1067,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         resp.setMoreData(false);
         return resp;
       }
-      statementType = queryExecution.getStatement().getType();
+      statementType = queryExecution.getStatementType();
 
       try (SetThreadName queryName = new 
SetThreadName(queryExecution.getQueryId())) {
         Pair<List<ByteBuffer>, Boolean> pair =
@@ -1228,7 +1228,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId();
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(clientSession),
@@ -1269,7 +1269,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId();
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(clientSession),
@@ -1319,7 +1319,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId();
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(clientSession),
@@ -1368,7 +1368,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId();
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(clientSession),
@@ -1408,7 +1408,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId();
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(clientSession),
@@ -1451,7 +1451,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId();
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(clientSession),
@@ -1495,7 +1495,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       for (int i = 0; i < req.getStatements().size(); i++) {
         String statement = req.getStatements().get(i);
         long t2 = System.nanoTime();
-        StatementType type = null;
+        String type = null;
         OperationQuota quota = null;
         try {
           Statement s = StatementGenerator.createStatement(statement, 
clientSession.getZoneId());
@@ -1518,10 +1518,10 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
           }
 
           long queryId = SESSION_MANAGER.requestQueryId();
-          type = s.getType();
+          type = s.getType() == null ? null : s.getType().name();
           // create and cache dataset
           ExecutionResult result =
-              COORDINATOR.execute(
+              COORDINATOR.executeForTreeModel(
                   s,
                   queryId,
                   SESSION_MANAGER.getSessionInfo(clientSession),
@@ -1549,7 +1549,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       }
     } finally {
       addStatementExecutionLatency(
-          OperationType.EXECUTE_BATCH_STATEMENT, StatementType.NULL, 
System.nanoTime() - t1);
+          OperationType.EXECUTE_BATCH_STATEMENT, StatementType.NULL.name(), 
System.nanoTime() - t1);
       SESSION_MANAGER.updateIdleTime();
     }
     return isAllSuccessful
@@ -1571,7 +1571,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
   public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
     boolean finished = false;
     long startTime = System.nanoTime();
-    StatementType statementType = null;
+    String statementType = null;
     Throwable t = null;
     try {
       IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
@@ -1587,7 +1587,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         resp.setMoreData(true);
         return resp;
       }
-      statementType = queryExecution.getStatement().getType();
+      statementType = queryExecution.getStatementType();
 
       try (SetThreadName queryName = new 
SetThreadName(queryExecution.getQueryId())) {
         Pair<TSQueryDataSet, Boolean> pair =
@@ -1672,7 +1672,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId();
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(clientSession),
@@ -1688,7 +1688,9 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
           e, OperationType.INSERT_RECORDS, 
TSStatusCode.EXECUTE_STATEMENT_ERROR);
     } finally {
       addStatementExecutionLatency(
-          OperationType.INSERT_RECORDS, StatementType.BATCH_INSERT_ROWS, 
System.nanoTime() - t1);
+          OperationType.INSERT_RECORDS,
+          StatementType.BATCH_INSERT_ROWS.name(),
+          System.nanoTime() - t1);
       SESSION_MANAGER.updateIdleTime();
       if (quota != null) {
         quota.close();
@@ -1739,7 +1741,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId();
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(clientSession),
@@ -1756,7 +1758,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
     } finally {
       addStatementExecutionLatency(
           OperationType.INSERT_RECORDS_OF_ONE_DEVICE,
-          StatementType.BATCH_INSERT_ONE_DEVICE,
+          StatementType.BATCH_INSERT_ONE_DEVICE.name(),
           System.nanoTime() - t1);
       SESSION_MANAGER.updateIdleTime();
       if (quota != null) {
@@ -1807,7 +1809,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId();
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(clientSession),
@@ -1827,7 +1829,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
     } finally {
       addStatementExecutionLatency(
           OperationType.INSERT_STRING_RECORDS_OF_ONE_DEVICE,
-          StatementType.BATCH_INSERT_ONE_DEVICE,
+          StatementType.BATCH_INSERT_ONE_DEVICE.name(),
           System.nanoTime() - t1);
       SESSION_MANAGER.updateIdleTime();
       if (quota != null) {
@@ -1876,7 +1878,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId();
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(clientSession),
@@ -1892,7 +1894,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
           e, OperationType.INSERT_RECORD, 
TSStatusCode.EXECUTE_STATEMENT_ERROR);
     } finally {
       addStatementExecutionLatency(
-          OperationType.INSERT_RECORD, StatementType.INSERT, System.nanoTime() 
- t1);
+          OperationType.INSERT_RECORD, StatementType.INSERT.name(), 
System.nanoTime() - t1);
       SESSION_MANAGER.updateIdleTime();
       if (quota != null) {
         quota.close();
@@ -1933,7 +1935,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId();
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(clientSession),
@@ -1949,7 +1951,9 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
           e, OperationType.INSERT_TABLETS, 
TSStatusCode.EXECUTE_STATEMENT_ERROR);
     } finally {
       addStatementExecutionLatency(
-          OperationType.INSERT_TABLETS, StatementType.MULTI_BATCH_INSERT, 
System.nanoTime() - t1);
+          OperationType.INSERT_TABLETS,
+          StatementType.MULTI_BATCH_INSERT.name(),
+          System.nanoTime() - t1);
       SESSION_MANAGER.updateIdleTime();
       if (quota != null) {
         quota.close();
@@ -1989,7 +1993,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId();
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(clientSession),
@@ -2005,7 +2009,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
           e, OperationType.INSERT_TABLET, 
TSStatusCode.EXECUTE_STATEMENT_ERROR);
     } finally {
       addStatementExecutionLatency(
-          OperationType.INSERT_TABLET, StatementType.BATCH_INSERT, 
System.nanoTime() - t1);
+          OperationType.INSERT_TABLET, StatementType.BATCH_INSERT.name(), 
System.nanoTime() - t1);
       SESSION_MANAGER.updateIdleTime();
       if (quota != null) {
         quota.close();
@@ -2054,7 +2058,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
 
       long queryId = SESSION_MANAGER.requestQueryId();
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(clientSession),
@@ -2071,7 +2075,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
     } finally {
       addStatementExecutionLatency(
           OperationType.INSERT_STRING_RECORDS,
-          StatementType.BATCH_INSERT_ROWS,
+          StatementType.BATCH_INSERT_ROWS.name(),
           System.nanoTime() - t1);
       SESSION_MANAGER.updateIdleTime();
       if (quota != null) {
@@ -2140,7 +2144,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
 
       long queryId = SESSION_MANAGER.requestQueryId();
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(clientSession),
@@ -2204,7 +2208,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId();
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(clientSession),
@@ -2308,7 +2312,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       long queryId = SESSION_MANAGER.requestQueryId();
       // create and cache dataset
       ExecutionResult executionResult =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(clientSession),
@@ -2352,7 +2356,9 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       return null;
     } finally {
       addStatementExecutionLatency(
-          OperationType.EXECUTE_STATEMENT, statement.getType(), 
System.nanoTime() - startTime);
+          OperationType.EXECUTE_STATEMENT,
+          statement.getType().name(),
+          System.nanoTime() - startTime);
       SESSION_MANAGER.updateIdleTime();
     }
   }
@@ -2385,7 +2391,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId();
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(clientSession),
@@ -2433,7 +2439,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId();
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(clientSession),
@@ -2478,7 +2484,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId();
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(clientSession),
@@ -2522,7 +2528,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId();
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(clientSession),
@@ -2619,7 +2625,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId();
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               statement,
               queryId,
               SESSION_MANAGER.getSessionInfo(clientSession),
@@ -2635,7 +2641,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
           e, OperationType.INSERT_STRING_RECORD, 
TSStatusCode.EXECUTE_STATEMENT_ERROR);
     } finally {
       addStatementExecutionLatency(
-          OperationType.INSERT_STRING_RECORD, StatementType.INSERT, 
System.nanoTime() - t1);
+          OperationType.INSERT_STRING_RECORD, StatementType.INSERT.name(), 
System.nanoTime() - t1);
       SESSION_MANAGER.updateIdleTime();
       if (quota != null) {
         quota.close();
@@ -2682,7 +2688,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
 
   /** Add stat of operation into metrics */
   private void addStatementExecutionLatency(
-      OperationType operation, StatementType statementType, long costTime) {
+      OperationType operation, String statementType, long costTime) {
     if (statementType == null) {
       return;
     }
@@ -2696,7 +2702,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
             Tag.INTERFACE.toString(),
             operation.toString(),
             Tag.TYPE.toString(),
-            statementType.name());
+            statementType);
   }
 
   private String checkIdentifierAndRemoveBackQuotesIfNecessary(String 
identifier) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 76f023f5907..d7b6804ea3c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1099,7 +1099,7 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
           SESSION_MANAGER.requestQueryId(session, 
SESSION_MANAGER.requestStatementId(session));
       // Create and cache dataset
       ExecutionResult result =
-          COORDINATOR.execute(
+          COORDINATOR.executeForTreeModel(
               s,
               queryId,
               SESSION_MANAGER.getSessionInfo(session),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index 4d7e5a4512f..0e50966d864 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -38,7 +38,7 @@ import java.util.List;
  */
 public class MPPQueryContext {
   private String sql;
-  private QueryId queryId;
+  private final QueryId queryId;
 
   // LocalQueryId is kept to adapt to the old client, it's unique in current 
datanode.
   // Now it's only be used by EXPLAIN ANALYZE to get queryExecution.
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 8c74170cef3..0aa6ba1deff 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -27,8 +27,7 @@ import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
 import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
-import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils;
-import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.TimePredicate;
 import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery;
 import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
 import 
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
@@ -132,7 +131,7 @@ public class FragmentInstanceContext extends QueryContext {
       FragmentInstanceStateMachine stateMachine,
       SessionInfo sessionInfo,
       IDataRegionForQuery dataRegion,
-      Expression globalTimePredicate,
+      TimePredicate globalTimePredicate,
       Map<QueryId, DataNodeQueryContext> dataNodeQueryContextMap) {
     FragmentInstanceContext instanceContext =
         new FragmentInstanceContext(
@@ -167,14 +166,14 @@ public class FragmentInstanceContext extends QueryContext 
{
       FragmentInstanceStateMachine stateMachine,
       SessionInfo sessionInfo,
       IDataRegionForQuery dataRegion,
-      Expression globalTimePredicate,
+      TimePredicate globalTimePredicate,
       Map<QueryId, DataNodeQueryContext> dataNodeQueryContextMap) {
     this.id = id;
     this.stateMachine = stateMachine;
     this.executionEndTime.set(END_TIME_INITIAL_VALUE);
     this.sessionInfo = sessionInfo;
     this.dataRegion = dataRegion;
-    this.globalTimeFilter = 
PredicateUtils.convertPredicateToTimeFilter(globalTimePredicate);
+    this.globalTimeFilter = globalTimePredicate.convertPredicateToTimeFilter();
     this.dataNodeQueryContextMap = dataNodeQueryContextMap;
     this.dataNodeQueryContext = dataNodeQueryContextMap.get(id.getQueryId());
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 327008e9c2a..8e9cedf2862 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -41,6 +41,8 @@ import 
org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.queryengine.plan.execution.QueryExecution;
 import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigExecution;
+import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskVisitor;
+import org.apache.iotdb.db.queryengine.plan.planner.TreeModelPlanner;
 import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.db.utils.SetThreadName;
@@ -53,6 +55,7 @@ import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.BiFunction;
 
 import static org.apache.iotdb.commons.utils.StatusUtils.needRetry;
 
@@ -100,39 +103,11 @@ public class Coordinator {
     this.scheduledExecutor = getScheduledExecutor();
   }
 
-  private IQueryExecution createQueryExecution(
-      Statement statement,
-      MPPQueryContext queryContext,
-      IPartitionFetcher partitionFetcher,
-      ISchemaFetcher schemaFetcher,
-      long timeOut,
-      long startTime) {
-    queryContext.setTimeOut(timeOut);
-    queryContext.setStartTime(startTime);
-    if (statement instanceof IConfigStatement) {
-      queryContext.setQueryType(((IConfigStatement) statement).getQueryType());
-      return new ConfigExecution(queryContext, statement, executor);
-    }
-    return new QueryExecution(
-        statement,
-        queryContext,
-        executor,
-        writeOperationExecutor,
-        scheduledExecutor,
-        partitionFetcher,
-        schemaFetcher,
-        SYNC_INTERNAL_SERVICE_CLIENT_MANAGER,
-        ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER);
-  }
-
-  public ExecutionResult execute(
-      Statement statement,
+  private ExecutionResult execution(
       long queryId,
       SessionInfo session,
       String sql,
-      IPartitionFetcher partitionFetcher,
-      ISchemaFetcher schemaFetcher,
-      long timeOut) {
+      BiFunction<MPPQueryContext, Long, IQueryExecution> 
iQueryExecutionFactory) {
     long startTime = System.currentTimeMillis();
     QueryId globalQueryId = queryIdGenerator.createNextQueryId();
     MPPQueryContext queryContext = null;
@@ -148,14 +123,7 @@ public class Coordinator {
               session,
               DataNodeEndPoints.LOCAL_HOST_DATA_BLOCK_ENDPOINT,
               DataNodeEndPoints.LOCAL_HOST_INTERNAL_ENDPOINT);
-      IQueryExecution execution =
-          createQueryExecution(
-              statement,
-              queryContext,
-              partitionFetcher,
-              schemaFetcher,
-              timeOut > 0 ? timeOut : CONFIG.getQueryTimeoutThreshold(),
-              startTime);
+      IQueryExecution execution = iQueryExecutionFactory.apply(queryContext, 
startTime);
       if (execution.isQuery()) {
         queryExecutionMap.put(queryId, execution);
       } else {
@@ -170,25 +138,78 @@ public class Coordinator {
       }
       return result;
     } finally {
-      int lockNums = queryContext.getAcquiredLockNum();
-      if (queryContext != null && lockNums > 0) {
-        for (int i = 0; i < lockNums; i++) 
DataNodeSchemaCache.getInstance().releaseInsertLock();
+      if (queryContext != null && queryContext.getAcquiredLockNum() > 0) {
+        for (int i = 0, lockNums = queryContext.getAcquiredLockNum(); i < 
lockNums; i++) {
+          DataNodeSchemaCache.getInstance().releaseInsertLock();
+        }
       }
     }
   }
 
   /** This method is called by the write method. So it does not set the 
timeout parameter. */
-  public ExecutionResult execute(
+  public ExecutionResult executeForTreeModel(
       Statement statement,
       long queryId,
       SessionInfo session,
       String sql,
       IPartitionFetcher partitionFetcher,
       ISchemaFetcher schemaFetcher) {
-    return execute(
+    return executeForTreeModel(
         statement, queryId, session, sql, partitionFetcher, schemaFetcher, 
Long.MAX_VALUE);
   }
 
+  public ExecutionResult executeForTreeModel(
+      Statement statement,
+      long queryId,
+      SessionInfo session,
+      String sql,
+      IPartitionFetcher partitionFetcher,
+      ISchemaFetcher schemaFetcher,
+      long timeOut) {
+    return execution(
+        queryId,
+        session,
+        sql,
+        ((queryContext, startTime) ->
+            createQueryExecutionForTreeModel(
+                statement,
+                queryContext,
+                partitionFetcher,
+                schemaFetcher,
+                timeOut > 0 ? timeOut : CONFIG.getQueryTimeoutThreshold(),
+                startTime)));
+  }
+
+  private IQueryExecution createQueryExecutionForTreeModel(
+      Statement statement,
+      MPPQueryContext queryContext,
+      IPartitionFetcher partitionFetcher,
+      ISchemaFetcher schemaFetcher,
+      long timeOut,
+      long startTime) {
+    queryContext.setTimeOut(timeOut);
+    queryContext.setStartTime(startTime);
+    if (statement instanceof IConfigStatement) {
+      queryContext.setQueryType(((IConfigStatement) statement).getQueryType());
+      return new ConfigExecution(
+          queryContext,
+          statement.getType(),
+          executor,
+          statement.accept(new ConfigTaskVisitor(), queryContext));
+    }
+    TreeModelPlanner treeModelPlanner =
+        new TreeModelPlanner(
+            statement,
+            executor,
+            writeOperationExecutor,
+            scheduledExecutor,
+            partitionFetcher,
+            schemaFetcher,
+            SYNC_INTERNAL_SERVICE_CLIENT_MANAGER,
+            ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER);
+    return new QueryExecution(treeModelPlanner, queryContext, executor);
+  }
+
   public IQueryExecution getQueryExecution(Long queryId) {
     return queryExecutionMap.get(queryId);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
index e91d45d3575..3f109e86e9e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
@@ -28,9 +28,13 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.partition.SchemaPartition;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.NodeRef;
 import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
 import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
+import 
org.apache.iotdb.db.queryengine.plan.execution.memory.StatementMemorySource;
+import 
org.apache.iotdb.db.queryengine.plan.execution.memory.StatementMemorySourceContext;
+import 
org.apache.iotdb.db.queryengine.plan.execution.memory.StatementMemorySourceVisitor;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType;
 import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
@@ -42,12 +46,14 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimePa
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.IntoPathDescriptor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.OrderByParameter;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
 import org.apache.iotdb.db.schemaengine.template.Template;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@@ -63,7 +69,7 @@ import java.util.Set;
 import static com.google.common.base.Preconditions.checkArgument;
 
 /** Analysis used for planning a query. TODO: This class may need to store 
more info for a query. */
-public class Analysis {
+public class Analysis implements IAnalysis {
 
   
/////////////////////////////////////////////////////////////////////////////////////////////////
   // Common Analysis
@@ -378,6 +384,7 @@ public class Analysis {
     this.globalTimePredicate = timeFilter;
   }
 
+  @Override
   public DatasetHeader getRespDatasetHeader() {
     return respDatasetHeader;
   }
@@ -403,7 +410,13 @@ public class Analysis {
     return type;
   }
 
-  public boolean hasDataSource() {
+  @Override
+  public boolean canSkipExecute(MPPQueryContext context) {
+    return isFinishQueryAfterAnalyze()
+        || (context.getQueryType() == QueryType.READ && !hasDataSource());
+  }
+
+  private boolean hasDataSource() {
     return (dataPartition != null && !dataPartition.isEmpty())
         || (schemaPartition != null && !schemaPartition.isEmpty())
         || statement instanceof ShowQueriesStatement
@@ -411,6 +424,32 @@ public class Analysis {
             && ((QueryStatement) statement).isAggregationQuery());
   }
 
+  @Override
+  public TsBlock constructResultForMemorySource(MPPQueryContext context) {
+    StatementMemorySource memorySource =
+        new StatementMemorySourceVisitor()
+            .process(getStatement(), new StatementMemorySourceContext(context, 
this));
+    setRespDatasetHeader(memorySource.getDatasetHeader());
+    return memorySource.getTsBlock();
+  }
+
+  @Override
+  public boolean isQuery() {
+    return statement.isQuery();
+  }
+
+  @Override
+  public boolean needSetHighestPriority() {
+    // If this statement is a SHOW QUERIES statement, give its instances the 
highest priority, so
+    // that the sub-tasks of the ShowQueries instances can be executed first.
+    return StatementType.SHOW_QUERIES.equals(statement.getType());
+  }
+
+  @Override
+  public String getStatementType() {
+    return statement.getType().name();
+  }
+
   public Map<Expression, Set<Expression>> getCrossGroupByExpressions() {
     return crossGroupByExpressions;
   }
@@ -487,10 +526,12 @@ public class Analysis {
     this.finishQueryAfterAnalyze = finishQueryAfterAnalyze;
   }
 
+  @Override
   public boolean isFailed() {
     return failStatus != null;
   }
 
+  @Override
   public TSStatus getFailStatus() {
     return this.failStatus;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
similarity index 52%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
index 9652cbce155..ca85161c7ef 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
@@ -17,52 +17,28 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.plan.execution;
+package org.apache.iotdb.db.queryengine.plan.analyze;
 
-import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
-import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
-import java.nio.ByteBuffer;
-import java.util.Optional;
+public interface IAnalysis {
 
-public interface IQueryExecution {
+  boolean isFailed();
 
-  void start();
+  TSStatus getFailStatus();
 
-  void stop(Throwable t);
+  boolean canSkipExecute(MPPQueryContext context);
 
-  void stopAndCleanup();
-
-  void stopAndCleanup(Throwable t);
-
-  void cancel();
-
-  ExecutionResult getStatus();
-
-  Optional<TsBlock> getBatchResult() throws IoTDBException;
-
-  Optional<ByteBuffer> getByteBufferBatchResult() throws IoTDBException;
-
-  boolean hasNextResult();
-
-  int getOutputValueColumnCount();
-
-  DatasetHeader getDatasetHeader();
+  TsBlock constructResultForMemorySource(MPPQueryContext context);
 
   boolean isQuery();
 
-  String getQueryId();
-
-  long getStartExecutionTime();
-
-  void recordExecutionTime(long executionTime);
-
-  /** @return cost time in ns */
-  long getTotalExecutionTime();
+  boolean needSetHighestPriority();
 
-  Optional<String> getExecuteSQL();
+  DatasetHeader getRespDatasetHeader();
 
-  Statement getStatement();
+  String getStatementType();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
index 1e32ea9cee1..b0d5a1b92eb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
@@ -475,7 +475,7 @@ public class LoadTsfileAnalyzer {
       final long queryId = SessionManager.getInstance().requestQueryId();
       final ExecutionResult result =
           Coordinator.getInstance()
-              .execute(
+              .executeForTreeModel(
                   statement,
                   queryId,
                   null,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
index 4fca45790ea..cbf85d67a82 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
@@ -83,7 +83,7 @@ class AutoCreateSchemaExecutor {
 
   private ExecutionResult executeStatement(Statement statement, 
MPPQueryContext context) {
 
-    return coordinator.execute(
+    return coordinator.executeForTreeModel(
         statement,
         SessionManager.getInstance().requestQueryId(),
         context == null ? null : context.getSession(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index 0adbca34ad5..75e087307ff 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -81,7 +81,7 @@ class ClusterSchemaFetchExecutor {
     if (context != null && context.getQueryType() == QueryType.READ) {
       sql += ", " + context.getQueryId() + " : " + context.getSql();
     }
-    return coordinator.execute(
+    return coordinator.executeForTreeModel(
         statement,
         queryId,
         context == null ? null : context.getSession(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
index 9652cbce155..0a92914b571 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.queryengine.plan.execution;
 
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
-import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
 import java.nio.ByteBuffer;
@@ -64,5 +63,5 @@ public interface IQueryExecution {
 
   Optional<String> getExecuteSQL();
 
-  Statement getStatement();
+  String getStatementType();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index 99eb73e3425..8446c0bd4fe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -20,9 +20,6 @@ package org.apache.iotdb.db.queryengine.plan.execution;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.client.IClientManager;
-import 
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
@@ -40,31 +37,15 @@ import 
org.apache.iotdb.db.queryengine.execution.exchange.source.ISourceHandle;
 import org.apache.iotdb.db.queryengine.execution.exchange.source.SourceHandle;
 import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
 import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
-import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
-import org.apache.iotdb.db.queryengine.plan.analyze.Analyzer;
-import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
-import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
 import 
org.apache.iotdb.db.queryengine.plan.execution.memory.MemorySourceHandle;
-import 
org.apache.iotdb.db.queryengine.plan.execution.memory.StatementMemorySource;
-import 
org.apache.iotdb.db.queryengine.plan.execution.memory.StatementMemorySourceContext;
-import 
org.apache.iotdb.db.queryengine.plan.execution.memory.StatementMemorySourceVisitor;
-import org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanner;
-import 
org.apache.iotdb.db.queryengine.plan.planner.distribution.DistributionPlanner;
+import org.apache.iotdb.db.queryengine.plan.planner.IPlanner;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeUtil;
-import org.apache.iotdb.db.queryengine.plan.scheduler.ClusterScheduler;
 import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
-import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler;
-import org.apache.iotdb.db.queryengine.plan.statement.Statement;
-import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -77,7 +58,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CancellationException;
@@ -99,9 +79,9 @@ import static 
org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.DIST
  * corresponding physical nodes. 3. Collect and monitor the progress/states of 
this query.
  */
 public class QueryExecution implements IQueryExecution {
-  private static final Logger logger = 
LoggerFactory.getLogger(QueryExecution.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryExecution.class);
 
-  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+  private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
   private static final int MAX_RETRY_COUNT = 3;
   private static final long RETRY_INTERVAL_IN_MS = 2000;
   private int retryCount = 0;
@@ -109,19 +89,12 @@ public class QueryExecution implements IQueryExecution {
   private IScheduler scheduler;
   private final QueryStateMachine stateMachine;
 
-  private final Statement rawStatement;
-  private Analysis analysis;
+  private final IPlanner planner;
+
+  private IAnalysis analysis;
   private LogicalQueryPlan logicalPlan;
   private DistributedQueryPlan distributedPlan;
 
-  private final ExecutorService executor;
-  private final ExecutorService writeOperationExecutor;
-  private final ScheduledExecutorService scheduledExecutor;
-  // TODO need to use factory to decide standalone or cluster
-  private final IPartitionFetcher partitionFetcher;
-  // TODO need to use factory to decide standalone or cluster,
-  private final ISchemaFetcher schemaFetcher;
-
   // The result of QueryExecution will be written to the 
MPPDataExchangeManager in current Node.
   // We use this SourceHandle to fetch the TsBlock from it.
   private ISourceHandle resultHandle;
@@ -129,12 +102,6 @@ public class QueryExecution implements IQueryExecution {
   // used for cleaning resultHandle up exactly once
   private final AtomicBoolean resultHandleCleanUp;
 
-  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
-      syncInternalServiceClientManager;
-
-  private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
-      asyncInternalServiceClientManager;
-
   private final AtomicBoolean stopped;
 
   // cost time in ns
@@ -148,28 +115,11 @@ public class QueryExecution implements IQueryExecution {
       PerformanceOverviewMetrics.getInstance();
 
   @SuppressWarnings("squid:S107")
-  public QueryExecution(
-      Statement statement,
-      MPPQueryContext context,
-      ExecutorService executor,
-      ExecutorService writeOperationExecutor,
-      ScheduledExecutorService scheduledExecutor,
-      IPartitionFetcher partitionFetcher,
-      ISchemaFetcher schemaFetcher,
-      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> 
syncInternalServiceClientManager,
-      IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
-          asyncInternalServiceClientManager) {
-    this.rawStatement = statement;
-    this.executor = executor;
-    this.writeOperationExecutor = writeOperationExecutor;
-    this.scheduledExecutor = scheduledExecutor;
+  public QueryExecution(IPlanner planner, MPPQueryContext context, 
ExecutorService executor) {
     this.context = context;
-    this.analysis = analyze(statement, context, partitionFetcher, 
schemaFetcher);
+    this.planner = planner;
+    this.analysis = analyze(context);
     this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
-    this.partitionFetcher = partitionFetcher;
-    this.schemaFetcher = schemaFetcher;
-    this.syncInternalServiceClientManager = syncInternalServiceClientManager;
-    this.asyncInternalServiceClientManager = asyncInternalServiceClientManager;
 
     // We add the abort logic inside the QueryExecution.
     // So that the other components can only focus on the state change.
@@ -184,7 +134,7 @@ public class QueryExecution implements IQueryExecution {
             if (state == QueryState.FAILED
                 || state == QueryState.ABORTED
                 || state == QueryState.CANCELED) {
-              logger.debug("[ReleaseQueryResource] state is: {}", state);
+              LOGGER.debug("[ReleaseQueryResource] state is: {}", state);
               Throwable cause = stateMachine.getFailureException();
               releaseResource(cause);
             }
@@ -203,7 +153,7 @@ public class QueryExecution implements IQueryExecution {
   public void start() {
     final long startTime = System.nanoTime();
     if (skipExecute()) {
-      logger.debug("[SkipExecute]");
+      LOGGER.debug("[SkipExecute]");
       if (context.getQueryType() == QueryType.WRITE && analysis.isFailed()) {
         stateMachine.transitionToFailed(analysis.getFailStatus());
       } else {
@@ -255,60 +205,52 @@ public class QueryExecution implements IQueryExecution {
 
   private ExecutionResult retry() {
     if (retryCount >= MAX_RETRY_COUNT) {
-      logger.warn("[ReachMaxRetryCount]");
+      LOGGER.warn("[ReachMaxRetryCount]");
       stateMachine.transitionToFailed();
       return getStatus();
     }
-    logger.warn("error when executing query. {}", 
stateMachine.getFailureMessage());
+    LOGGER.warn("error when executing query. {}", 
stateMachine.getFailureMessage());
     // stop and clean up resources the QueryExecution used
     this.stopAndCleanup(stateMachine.getFailureException());
-    logger.info("[WaitBeforeRetry] wait {}ms.", RETRY_INTERVAL_IN_MS);
+    LOGGER.info("[WaitBeforeRetry] wait {}ms.", RETRY_INTERVAL_IN_MS);
     try {
       Thread.sleep(RETRY_INTERVAL_IN_MS);
     } catch (InterruptedException e) {
-      logger.warn("interrupted when waiting retry");
+      LOGGER.warn("interrupted when waiting retry");
       Thread.currentThread().interrupt();
     }
     retryCount++;
-    logger.info("[Retry] retry count is: {}", retryCount);
+    LOGGER.info("[Retry] retry count is: {}", retryCount);
     stateMachine.transitionToQueued();
     // force invalid PartitionCache
-    partitionFetcher.invalidAllCache();
+    planner.invalidatePartitionCache();
     // clear runtime variables in MPPQueryContext
     context.prepareForRetry();
     // re-stop
     this.stopped.compareAndSet(true, false);
     this.resultHandleCleanUp.compareAndSet(true, false);
     // re-analyze the query
-    this.analysis = analyze(rawStatement, context, partitionFetcher, 
schemaFetcher);
+    this.analysis = analyze(context);
     // re-start the QueryExecution
     this.start();
     return getStatus();
   }
 
   private boolean skipExecute() {
-    return analysis.isFinishQueryAfterAnalyze()
-        || (context.getQueryType() == QueryType.READ && 
!analysis.hasDataSource());
+    return analysis.canSkipExecute(context);
   }
 
   private void constructResultForMemorySource() {
-    StatementMemorySource memorySource =
-        new StatementMemorySourceVisitor()
-            .process(analysis.getStatement(), new 
StatementMemorySourceContext(context, analysis));
-    this.resultHandle = new MemorySourceHandle(memorySource.getTsBlock());
-    this.analysis.setRespDatasetHeader(memorySource.getDatasetHeader());
+    TsBlock tsBlock = analysis.constructResultForMemorySource(context);
+    this.resultHandle = new MemorySourceHandle(tsBlock);
   }
 
   // Analyze the statement in QueryContext. Generate the analysis this query 
need
-  private Analysis analyze(
-      Statement statement,
-      MPPQueryContext context,
-      IPartitionFetcher partitionFetcher,
-      ISchemaFetcher schemaFetcher) {
+  private IAnalysis analyze(MPPQueryContext context) {
     final long startTime = System.nanoTime();
-    Analysis result;
+    IAnalysis result;
     try {
-      result = new Analyzer(context, partitionFetcher, 
schemaFetcher).analyze(statement);
+      result = planner.analyze(context);
     } finally {
       long analyzeCost = System.nanoTime() - startTime;
       context.setAnalyzeCost(analyzeCost);
@@ -319,45 +261,15 @@ public class QueryExecution implements IQueryExecution {
 
   private void schedule() {
     final long startTime = System.nanoTime();
-    boolean isPipeEnrichedTsFileLoad =
-        rawStatement instanceof PipeEnrichedStatement
-            && ((PipeEnrichedStatement) rawStatement).getInnerStatement()
-                instanceof LoadTsFileStatement;
-    if (rawStatement instanceof LoadTsFileStatement || 
isPipeEnrichedTsFileLoad) {
-      this.scheduler =
-          new LoadTsFileScheduler(
-              distributedPlan,
-              context,
-              stateMachine,
-              syncInternalServiceClientManager,
-              partitionFetcher,
-              isPipeEnrichedTsFileLoad);
-      this.scheduler.start();
-      return;
-    }
-
-    // TODO: (xingtanzjr) initialize the query scheduler according to 
configuration
-    this.scheduler =
-        new ClusterScheduler(
-            context,
-            stateMachine,
-            distributedPlan.getInstances(),
-            context.getQueryType(),
-            executor,
-            writeOperationExecutor,
-            scheduledExecutor,
-            syncInternalServiceClientManager,
-            asyncInternalServiceClientManager);
-    this.scheduler.start();
+    this.scheduler = planner.doSchedule(analysis, distributedPlan, context, 
stateMachine);
     PERFORMANCE_OVERVIEW_METRICS.recordScheduleCost(System.nanoTime() - 
startTime);
   }
 
   // Use LogicalPlanner to do the logical query plan and logical optimization
   public void doLogicalPlan() {
-    LogicalPlanner planner = new LogicalPlanner(this.context);
-    this.logicalPlan = planner.plan(this.analysis);
-    if (isQuery() && logger.isDebugEnabled()) {
-      logger.debug(
+    this.logicalPlan = planner.doLogicalPlan(analysis, context);
+    if (isQuery() && LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
           "logical plan is: \n {}", 
PlanNodeUtil.nodeToString(this.logicalPlan.getRootNode()));
     }
     // check timeout after building logical plan because it could be 
time-consuming in some cases.
@@ -367,10 +279,9 @@ public class QueryExecution implements IQueryExecution {
   // Generate the distributed plan and split it into fragments
   public void doDistributedPlan() {
     long startTime = System.nanoTime();
-    DistributionPlanner planner = new DistributionPlanner(this.analysis, 
this.logicalPlan);
-    this.distributedPlan = planner.planFragments();
+    this.distributedPlan = planner.doDistributionPlan(analysis, logicalPlan);
 
-    if (rawStatement.isQuery()) {
+    if (analysis.isQuery()) {
       long distributionPlanCost = System.nanoTime() - startTime;
       context.setDistributionPlanCost(distributionPlanCost);
       QUERY_PLAN_COST_METRIC_SET.recordPlanCost(DISTRIBUTION_PLANNER, 
distributionPlanCost);
@@ -378,12 +289,12 @@ public class QueryExecution implements IQueryExecution {
 
     // if is this Statement is ShowQueryStatement, set its instances to the 
highest priority, so
     // that the sub-tasks of the ShowQueries instances could be executed first.
-    if (StatementType.SHOW_QUERIES.equals(rawStatement.getType())) {
+    if (analysis.needSetHighestPriority()) {
       distributedPlan.getInstances().forEach(instance -> 
instance.setHighestPriority(true));
     }
 
-    if (isQuery() && logger.isDebugEnabled()) {
-      logger.debug(
+    if (isQuery() && LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
           "distribution plan done. Fragment instance count is {}, details is: 
\n {}",
           distributedPlan.getInstances().size(),
           printFragmentInstances(distributedPlan.getInstances()));
@@ -493,7 +404,7 @@ public class QueryExecution implements IQueryExecution {
     while (true) {
       try {
         if (resultHandle.isAborted()) {
-          logger.warn("[ResultHandleAborted]");
+          LOGGER.warn("[ResultHandleAborted]");
           stateMachine.transitionToAborted();
           if (stateMachine.getFailureStatus() != null) {
             throw new IoTDBException(
@@ -504,7 +415,7 @@ public class QueryExecution implements IQueryExecution {
                 TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
           }
         } else if (resultHandle.isFinished()) {
-          logger.debug("[ResultHandleFinished]");
+          LOGGER.debug("[ResultHandleFinished]");
           stateMachine.transitionToFinished();
           return Optional.empty();
         }
@@ -695,41 +606,9 @@ public class QueryExecution implements IQueryExecution {
     // collect redirect info to client for writing
     // if 0.13_data_insert_adapt is true and ClientVersion is NOT V_1_0, stop 
returning redirect
     // info to client
-    if (analysis.getStatement() instanceof InsertBaseStatement
-        && !analysis.isFinishQueryAfterAnalyze()
-        && (!config.isEnable13DataInsertAdapt()
-            || 
IoTDBConstant.ClientVersion.V_1_0.equals(context.getSession().getVersion()))) {
-      InsertBaseStatement insertStatement = (InsertBaseStatement) 
analysis.getStatement();
-      List<TEndPoint> redirectNodeList = analysis.getRedirectNodeList();
-      if (insertStatement instanceof InsertRowsStatement
-          || insertStatement instanceof InsertMultiTabletsStatement) {
-        // multiple devices
-        if (statusCode == TSStatusCode.SUCCESS_STATUS) {
-          boolean needRedirect = false;
-          List<TSStatus> subStatus = new ArrayList<>();
-          for (TEndPoint endPoint : redirectNodeList) {
-            // redirect writing only if the redirectEndPoint is not the 
current node
-            if (!config.getAddressAndPort().equals(endPoint)) {
-              subStatus.add(
-                  
RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS).setRedirectNode(endPoint));
-              needRedirect = true;
-            } else {
-              subStatus.add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
-            }
-          }
-          if (needRedirect) {
-            
tsstatus.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode());
-            tsstatus.setSubStatus(subStatus);
-          }
-        }
-      } else {
-        // single device
-        TEndPoint redirectEndPoint = redirectNodeList.get(0);
-        // redirect writing only if the redirectEndPoint is not the current 
node
-        if (!config.getAddressAndPort().equals(redirectEndPoint)) {
-          tsstatus.setRedirectNode(redirectEndPoint);
-        }
-      }
+    if (!CONFIG.isEnable13DataInsertAdapt()
+        || 
IoTDBConstant.ClientVersion.V_1_0.equals(context.getSession().getVersion())) {
+      planner.setRedirectInfo(analysis, CONFIG.getAddressAndPort(), tsstatus, 
statusCode);
     }
 
     return new ExecutionResult(context.getQueryId(), tsstatus);
@@ -739,10 +618,6 @@ public class QueryExecution implements IQueryExecution {
     return distributedPlan;
   }
 
-  public LogicalQueryPlan getLogicalPlan() {
-    return logicalPlan;
-  }
-
   @Override
   public boolean isQuery() {
     return context.getQueryType() == QueryType.READ;
@@ -774,8 +649,8 @@ public class QueryExecution implements IQueryExecution {
   }
 
   @Override
-  public Statement getStatement() {
-    return analysis.getStatement();
+  public String getStatementType() {
+    return analysis.getStatementType();
   }
 
   public MPPQueryContext getContext() {
@@ -787,6 +662,6 @@ public class QueryExecution implements IQueryExecution {
   }
 
   public ScheduledExecutorService getScheduledExecutor() {
-    return scheduledExecutor;
+    return planner.getScheduledExecutorService();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
index 55ce059a20d..0fbef6973cb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
@@ -21,8 +21,6 @@ package org.apache.iotdb.db.queryengine.plan.execution.config;
 
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
 import org.apache.iotdb.db.queryengine.execution.QueryStateMachine;
@@ -31,7 +29,7 @@ import 
org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
-import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -57,7 +55,7 @@ public class ConfigExecution implements IQueryExecution {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ConfigExecution.class);
 
-  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+  private static final TsBlockSerde serde = new TsBlockSerde();
 
   private final MPPQueryContext context;
   private final ExecutorService executor;
@@ -68,31 +66,29 @@ public class ConfigExecution implements IQueryExecution {
   private DatasetHeader datasetHeader;
   private boolean resultSetConsumed;
   private final IConfigTask task;
-  private IConfigTaskExecutor configTaskExecutor;
-
-  private static final TsBlockSerde serde = new TsBlockSerde();
+  private final IConfigTaskExecutor configTaskExecutor;
 
-  private Statement statement;
+  private final StatementType statementType;
   private long totalExecutionTime;
 
-  public ConfigExecution(MPPQueryContext context, Statement statement, 
ExecutorService executor) {
+  public ConfigExecution(
+      MPPQueryContext context,
+      StatementType statementType,
+      ExecutorService executor,
+      IConfigTask task) {
     this.context = context;
-    this.statement = statement;
+    this.statementType = statementType;
     this.executor = executor;
     this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
     this.taskFuture = SettableFuture.create();
-    this.task = statement.accept(new ConfigTaskVisitor(), context);
+    this.task = task;
     this.resultSetConsumed = false;
     configTaskExecutor = ClusterConfigTaskExecutor.getInstance();
   }
 
   @TestOnly
   public ConfigExecution(MPPQueryContext context, ExecutorService executor, 
IConfigTask task) {
-    this.context = context;
-    this.executor = executor;
-    this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
-    this.taskFuture = SettableFuture.create();
-    this.task = task;
+    this(context, StatementType.NULL, executor, task);
   }
 
   @Override
@@ -249,7 +245,7 @@ public class ConfigExecution implements IQueryExecution {
   }
 
   @Override
-  public Statement getStatement() {
-    return statement;
+  public String getStatementType() {
+    return statementType.name();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index db8ed28298f..287a9157f3f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -1938,7 +1938,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
     
createLogicalViewStatement.setViewExpressions(Collections.singletonList(viewExpression));
     ExecutionResult executionResult =
         Coordinator.getInstance()
-            .execute(
+            .executeForTreeModel(
                 createLogicalViewStatement,
                 0,
                 null,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java
new file mode 100644
index 00000000000..616cca3efb7
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.execution.QueryStateMachine;
+import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * Planning steps that {@code QueryExecution} delegates to: analysis, logical
+ * planning, distribution planning and scheduling, plus the partition-cache,
+ * executor and redirect hooks it calls on its planner.
+ */
+public interface IPlanner {
+
+  /** Produces the analysis for the query, using the given query context. */
+  IAnalysis analyze(MPPQueryContext context);
+
+  /** Builds the logical query plan from a previously produced analysis. */
+  LogicalQueryPlan doLogicalPlan(IAnalysis analysis, MPPQueryContext context);
+
+  /** Turns the logical plan into a distributed query plan. */
+  DistributedQueryPlan doDistributionPlan(IAnalysis analysis, LogicalQueryPlan 
logicalPlan);
+
+  /**
+   * Creates the scheduler for the distributed plan and returns it.
+   *
+   * <p>{@code QueryExecution} stores the returned scheduler without calling
+   * {@code start()} on it, and {@code TreeModelPlanner} starts the scheduler
+   * before returning, so implementations are presumably expected to return an
+   * already started scheduler.
+   */
+  IScheduler doSchedule(
+      IAnalysis analysis,
+      DistributedQueryPlan distributedPlan,
+      MPPQueryContext context,
+      QueryStateMachine stateMachine);
+
+  /** Called by {@code QueryExecution} on retry, before the query is re-analyzed. */
+  void invalidatePartitionCache();
+
+  /** Returns the scheduled executor exposed via {@code QueryExecution}. */
+  ScheduledExecutorService getScheduledExecutorService();
+
+  /**
+   * Lets the planner add redirect information to {@code tsstatus}.
+   *
+   * <p>{@code QueryExecution} passes the configured address and port of the
+   * current node as {@code localEndPoint}, and only calls this method when
+   * 0.13 insert adaptation is disabled or the client version is V_1_0.
+   */
+  void setRedirectInfo(
+      IAnalysis analysis, TEndPoint localEndPoint, TSStatus tsstatus, 
TSStatusCode statusCode);
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
new file mode 100644
index 00000000000..a42ff843f30
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import 
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.execution.QueryStateMachine;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analyzer;
+import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
+import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
+import 
org.apache.iotdb.db.queryengine.plan.planner.distribution.DistributionPlanner;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.scheduler.ClusterScheduler;
+import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
+import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * {@link IPlanner} implementation bound to a single {@link Statement}. It
+ * analyzes the statement with {@link Analyzer}, plans it with
+ * {@link LogicalPlanner} and {@link DistributionPlanner}, and schedules it
+ * with either {@link LoadTsFileScheduler} or {@link ClusterScheduler}.
+ */
+public class TreeModelPlanner implements IPlanner {
+
+  // the statement this planner analyzes and schedules
+  private final Statement statement;
+
+  // executors handed to ClusterScheduler in doSchedule
+  private final ExecutorService executor;
+  private final ExecutorService writeOperationExecutor;
+  private final ScheduledExecutorService scheduledExecutor;
+
+  // used by the Analyzer and LoadTsFileScheduler, and for cache invalidation
+  private final IPartitionFetcher partitionFetcher;
+
+  // used by the Analyzer
+  private final ISchemaFetcher schemaFetcher;
+
+  // passed to both schedulers
+  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+      syncInternalServiceClientManager;
+
+  // passed to ClusterScheduler only
+  private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
+      asyncInternalServiceClientManager;
+
+  /** Stores the statement and all collaborators; performs no other work. */
+  public TreeModelPlanner(
+      Statement statement,
+      ExecutorService executor,
+      ExecutorService writeOperationExecutor,
+      ScheduledExecutorService scheduledExecutor,
+      IPartitionFetcher partitionFetcher,
+      ISchemaFetcher schemaFetcher,
+      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> 
syncInternalServiceClientManager,
+      IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
+          asyncInternalServiceClientManager) {
+    this.statement = statement;
+    this.executor = executor;
+    this.writeOperationExecutor = writeOperationExecutor;
+    this.scheduledExecutor = scheduledExecutor;
+    this.partitionFetcher = partitionFetcher;
+    this.schemaFetcher = schemaFetcher;
+    this.syncInternalServiceClientManager = syncInternalServiceClientManager;
+    this.asyncInternalServiceClientManager = asyncInternalServiceClientManager;
+  }
+
+  /** Analyzes the held statement with a new {@link Analyzer} on every call. */
+  @Override
+  public IAnalysis analyze(MPPQueryContext context) {
+    return new Analyzer(context, partitionFetcher, 
schemaFetcher).analyze(statement);
+  }
+
+  /**
+   * Builds the logical plan with a new {@link LogicalPlanner}. The analysis
+   * is cast to {@link Analysis}, so it must be an instance of that class.
+   */
+  @Override
+  public LogicalQueryPlan doLogicalPlan(IAnalysis analysis, MPPQueryContext 
context) {
+    LogicalPlanner logicalPlanner = new LogicalPlanner(context);
+    return logicalPlanner.plan((Analysis) analysis);
+  }
+
+  /**
+   * Builds the distributed plan with a new {@link DistributionPlanner}. The
+   * analysis is cast to {@link Analysis}, so it must be an instance of it.
+   */
+  @Override
+  public DistributedQueryPlan doDistributionPlan(IAnalysis analysis, 
LogicalQueryPlan logicalPlan) {
+    DistributionPlanner planner = new DistributionPlanner((Analysis) analysis, 
logicalPlan);
+    return planner.planFragments();
+  }
+
+  /**
+   * Creates, starts and returns the scheduler for the distributed plan.
+   *
+   * <p>A {@link LoadTsFileScheduler} is used when the held statement is a
+   * {@link LoadTsFileStatement}, or a {@link PipeEnrichedStatement} whose
+   * inner statement is one; every other statement gets a
+   * {@link ClusterScheduler}. The {@code analysis} argument is not read here.
+   */
+  @Override
+  public IScheduler doSchedule(
+      IAnalysis analysis,
+      DistributedQueryPlan distributedPlan,
+      MPPQueryContext context,
+      QueryStateMachine stateMachine) {
+    IScheduler scheduler;
+
+    boolean isPipeEnrichedTsFileLoad =
+        statement instanceof PipeEnrichedStatement
+            && ((PipeEnrichedStatement) statement).getInnerStatement()
+                instanceof LoadTsFileStatement;
+    if (statement instanceof LoadTsFileStatement || isPipeEnrichedTsFileLoad) {
+      scheduler =
+          new LoadTsFileScheduler(
+              distributedPlan,
+              context,
+              stateMachine,
+              syncInternalServiceClientManager,
+              partitionFetcher,
+              isPipeEnrichedTsFileLoad);
+    } else {
+      scheduler =
+          new ClusterScheduler(
+              context,
+              stateMachine,
+              distributedPlan.getInstances(),
+              context.getQueryType(),
+              executor,
+              writeOperationExecutor,
+              scheduledExecutor,
+              syncInternalServiceClientManager,
+              asyncInternalServiceClientManager);
+    }
+
+    // the scheduler is started here; the caller only keeps the reference
+    scheduler.start();
+    return scheduler;
+  }
+
+  /** Delegates to {@code partitionFetcher.invalidAllCache()}. */
+  @Override
+  public void invalidatePartitionCache() {
+    partitionFetcher.invalidAllCache();
+  }
+
+  /** Returns the scheduled executor passed to the constructor. */
+  @Override
+  public ScheduledExecutorService getScheduledExecutorService() {
+    return scheduledExecutor;
+  }
+
+  /**
+   * Adds write-redirect information to {@code tsstatus}.
+   *
+   * <p>Nothing happens unless the analyzed statement is an
+   * {@link InsertBaseStatement} and the analysis is not marked as finished
+   * after analyze. For {@link InsertRowsStatement} and
+   * {@link InsertMultiTabletsStatement}, and only when {@code statusCode} is
+   * SUCCESS_STATUS, one sub-status is built per redirect node; if any node
+   * differs from {@code localEndPoint}, the status code becomes
+   * REDIRECTION_RECOMMEND and the sub-statuses are attached. For any other
+   * insert statement, the first redirect node is set on {@code tsstatus}
+   * when it differs from {@code localEndPoint}.
+   *
+   * <p>The analysis is cast to {@link Analysis}, so it must be an instance
+   * of that class.
+   */
+  @Override
+  public void setRedirectInfo(
+      IAnalysis iAnalysis, TEndPoint localEndPoint, TSStatus tsstatus, 
TSStatusCode statusCode) {
+    Analysis analysis = (Analysis) iAnalysis;
+    if (analysis.getStatement() instanceof InsertBaseStatement
+        && !analysis.isFinishQueryAfterAnalyze()) {
+      InsertBaseStatement insertStatement = (InsertBaseStatement) 
analysis.getStatement();
+      List<TEndPoint> redirectNodeList = analysis.getRedirectNodeList();
+      if (insertStatement instanceof InsertRowsStatement
+          || insertStatement instanceof InsertMultiTabletsStatement) {
+        // multiple devices
+        if (statusCode == TSStatusCode.SUCCESS_STATUS) {
+          boolean needRedirect = false;
+          List<TSStatus> subStatus = new ArrayList<>();
+          for (TEndPoint endPoint : redirectNodeList) {
+            // redirect writing only if the redirectEndPoint is not the 
current node
+            if (!localEndPoint.equals(endPoint)) {
+              subStatus.add(
+                  
RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS).setRedirectNode(endPoint));
+              needRedirect = true;
+            } else {
+              subStatus.add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+            }
+          }
+          if (needRedirect) {
+            
tsstatus.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode());
+            tsstatus.setSubStatus(subStatus);
+          }
+        }
+      } else {
+        // single device
+        // NOTE(review): get(0) assumes the redirect node list is non-null
+        // and non-empty on this path -- confirm against the analyzer.
+        TEndPoint redirectEndPoint = redirectNodeList.get(0);
+        // redirect writing only if the redirectEndPoint is not the current 
node
+        if (!localEndPoint.equals(redirectEndPoint)) {
+          tsstatus.setRedirectNode(redirectEndPoint);
+        }
+      }
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 711ff6c25cc..e302ef1dc35 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -33,6 +33,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.IFragmentParallelPlaner;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.TreeModelTimePredicate;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
@@ -141,7 +142,7 @@ public class SimpleFragmentParallelPlanner implements 
IFragmentParallelPlaner {
         new FragmentInstance(
             fragment,
             fragment.getId().genFragmentInstanceId(),
-            globalTimePredicate,
+            new TreeModelTimePredicate(globalTimePredicate),
             queryContext.getQueryType(),
             queryContext.getTimeOut(),
             queryContext.getSession(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
index 36cb4ecc9d1..542f2b6f5b8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.IFragmentParallelPlaner;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.TreeModelTimePredicate;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
 
@@ -61,7 +62,7 @@ public class WriteFragmentParallelPlanner implements 
IFragmentParallelPlaner {
           new FragmentInstance(
               new PlanFragment(fragment.getId(), split),
               fragment.getId().genFragmentInstanceId(),
-              globalTimePredicate,
+              new TreeModelTimePredicate(globalTimePredicate),
               queryContext.getQueryType(),
               queryContext.getTimeOut(),
               queryContext.getSession());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/DistributedQueryPlan.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/DistributedQueryPlan.java
index ca133cc5b8d..cedc99669a7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/DistributedQueryPlan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/DistributedQueryPlan.java
@@ -23,10 +23,10 @@ import 
org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import java.util.List;
 
 public class DistributedQueryPlan {
-  private MPPQueryContext context;
-  private SubPlan rootSubPlan;
-  private List<PlanFragment> fragments;
-  private List<FragmentInstance> instances;
+  private final MPPQueryContext context;
+  private final SubPlan rootSubPlan;
+  private final List<PlanFragment> fragments;
+  private final List<FragmentInstance> instances;
 
   public DistributedQueryPlan(
       MPPQueryContext context,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
index 4c8204a2a22..4dd56beefa8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
 import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
-import org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -46,9 +45,9 @@ import java.util.Objects;
 
 public class FragmentInstance implements IConsensusRequest {
 
-  private static final Logger logger = 
LoggerFactory.getLogger(FragmentInstance.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FragmentInstance.class);
 
-  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+  private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
 
   private final FragmentInstanceId id;
   private final QueryType type;
@@ -60,7 +59,7 @@ public class FragmentInstance implements IConsensusRequest {
 
   private TDataNodeLocation hostDataNode;
 
-  private final Expression globalTimePredicate;
+  private final TimePredicate globalTimePredicate;
 
   private final long timeOut;
 
@@ -86,7 +85,7 @@ public class FragmentInstance implements IConsensusRequest {
   public FragmentInstance(
       PlanFragment fragment,
       FragmentInstanceId id,
-      Expression globalTimePredicate,
+      TimePredicate globalTimePredicate,
       QueryType type,
       long timeOut,
       SessionInfo sessionInfo) {
@@ -94,7 +93,7 @@ public class FragmentInstance implements IConsensusRequest {
     this.globalTimePredicate = globalTimePredicate;
     this.id = id;
     this.type = type;
-    this.timeOut = timeOut > 0 ? timeOut : config.getQueryTimeoutThreshold();
+    this.timeOut = timeOut > 0 ? timeOut : CONFIG.getQueryTimeoutThreshold();
     this.isRoot = false;
     this.sessionInfo = sessionInfo;
   }
@@ -102,7 +101,7 @@ public class FragmentInstance implements IConsensusRequest {
   public FragmentInstance(
       PlanFragment fragment,
       FragmentInstanceId id,
-      Expression globalTimePredicate,
+      TimePredicate globalTimePredicate,
       QueryType type,
       long timeOut,
       SessionInfo sessionInfo,
@@ -116,7 +115,7 @@ public class FragmentInstance implements IConsensusRequest {
   public FragmentInstance(
       PlanFragment fragment,
       FragmentInstanceId id,
-      Expression globalTimePredicate,
+      TimePredicate globalTimePredicate,
       QueryType type,
       long timeOut,
       SessionInfo sessionInfo,
@@ -172,7 +171,7 @@ public class FragmentInstance implements IConsensusRequest {
     isHighestPriority = highestPriority;
   }
 
-  public Expression getGlobalTimePredicate() {
+  public TimePredicate getGlobalTimePredicate() {
     return globalTimePredicate;
   }
 
@@ -214,7 +213,7 @@ public class FragmentInstance implements IConsensusRequest {
     boolean hasSessionInfo = ReadWriteIOUtils.readBool(buffer);
     SessionInfo sessionInfo = hasSessionInfo ? 
SessionInfo.deserializeFrom(buffer) : null;
     boolean hasTimePredicate = ReadWriteIOUtils.readBool(buffer);
-    Expression globalTimePredicate = hasTimePredicate ? 
Expression.deserialize(buffer) : null;
+    TimePredicate globalTimePredicate = hasTimePredicate ? 
TimePredicate.deserialize(buffer) : null;
     QueryType queryType = QueryType.values()[ReadWriteIOUtils.readInt(buffer)];
     int dataNodeFINum = ReadWriteIOUtils.readInt(buffer);
     FragmentInstance fragmentInstance =
@@ -239,7 +238,7 @@ public class FragmentInstance implements IConsensusRequest {
       }
       ReadWriteIOUtils.write(globalTimePredicate != null, outputStream);
       if (globalTimePredicate != null) {
-        Expression.serialize(globalTimePredicate, outputStream);
+        globalTimePredicate.serialize(outputStream);
       }
       ReadWriteIOUtils.write(type.ordinal(), outputStream);
       ReadWriteIOUtils.write(dataNodeFINum, outputStream);
@@ -250,7 +249,7 @@ public class FragmentInstance implements IConsensusRequest {
       ReadWriteIOUtils.write(isExplainAnalyze, outputStream);
       return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
     } catch (IOException e) {
-      logger.error("Unexpected error occurs when serializing this 
FragmentInstance.", e);
+      LOGGER.error("Unexpected error occurs when serializing this 
FragmentInstance.", e);
       throw new SerializationRunTimeException(e);
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
index 7583b624b64..925bfe4416e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
@@ -38,7 +38,7 @@ import java.util.Objects;
 
 /** PlanFragment contains a sub-query of distributed query. */
 public class PlanFragment {
-  // TODO once you add field for this class you need to change the serialize and deserialize methods
+  // once you add field for this class you need to change the serialize and deserialize methods
   private final PlanFragmentId id;
   private PlanNode planNodeTree;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/SubPlan.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/SubPlan.java
index 0bceb53eb5c..909cf67acfe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/SubPlan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/SubPlan.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class SubPlan {
-  private PlanFragment planFragment;
+  private final PlanFragment planFragment;
   private List<SubPlan> children;
 
   public SubPlan(PlanFragment planFragment) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/TimePredicate.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/TimePredicate.java
new file mode 100644
index 00000000000..1de79f00809
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/TimePredicate.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan;
+
+import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/** A time predicate that can be written to a stream and converted into a {@link Filter}. */
+public interface TimePredicate {
+
+  /**
+   * Writes this predicate to the given stream.
+   *
+   * @param stream destination of the serialized form
+   * @throws IOException if writing to the stream fails
+   */
+  void serialize(DataOutputStream stream) throws IOException;
+
+  /**
+   * Converts this predicate into a time filter.
+   *
+   * @return the {@link Filter} built from this predicate
+   */
+  Filter convertPredicateToTimeFilter();
+
+  /**
+   * Reads a predicate from the buffer. Only the tree-model form exists so far, so the content is
+   * always decoded with {@link Expression#deserialize} and wrapped in a
+   * {@link TreeModelTimePredicate}.
+   *
+   * @param byteBuffer buffer holding a serialized expression
+   * @return the decoded predicate
+   */
+  static TimePredicate deserialize(ByteBuffer byteBuffer) {
+    // TODO will return another kind of TimePredicate like TableModelTimePredicate in the future
+    return new TreeModelTimePredicate(Expression.deserialize(byteBuffer));
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/TreeModelTimePredicate.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/TreeModelTimePredicate.java
new file mode 100644
index 00000000000..139deed980c
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/TreeModelTimePredicate.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan;
+
+import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils;
+import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Objects;
+
+/** Tree-model {@link TimePredicate} backed by a single {@link Expression}. */
+public class TreeModelTimePredicate implements TimePredicate {
+
+  private final Expression timePredicate;
+
+  /**
+   * Wraps the given expression; the reference is stored as is, without a null check.
+   *
+   * @param timePredicate expression to wrap
+   */
+  public TreeModelTimePredicate(Expression timePredicate) {
+    this.timePredicate = timePredicate;
+  }
+
+  /**
+   * Writes the wrapped expression to the stream via {@code Expression.serialize}.
+   *
+   * <p>NOTE(review): the expression is forwarded as is; if it can be {@code null} here, confirm
+   * that {@code Expression.serialize} accepts null.
+   *
+   * @param stream destination of the serialized form
+   * @throws IOException if writing to the stream fails
+   */
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    Expression.serialize(timePredicate, stream);
+  }
+
+  /**
+   * Converts the wrapped expression with {@link PredicateUtils#convertPredicateToTimeFilter}.
+   *
+   * @return the filter returned by that conversion
+   */
+  @Override
+  public Filter convertPredicateToTimeFilter() {
+    return PredicateUtils.convertPredicateToTimeFilter(timePredicate);
+  }
+
+  /** Equal when the other object has exactly this class and an equal wrapped expression. */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (o != null && o.getClass() == getClass()) {
+      TreeModelTimePredicate other = (TreeModelTimePredicate) o;
+      return Objects.equals(timePredicate, other.timePredicate);
+    }
+    return false;
+  }
+
+  /** Hash code derived from the wrapped expression only, consistent with {@link #equals}. */
+  @Override
+  public int hashCode() {
+    // Same value as Objects.hash(timePredicate): 31 * 1 + the element's hash (0 for null).
+    return 31 + Objects.hashCode(timePredicate);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java
index e5afac118d4..87dc13ecd2e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java
@@ -180,7 +180,8 @@ public class IoTDBInternalLocalReporter extends 
IoTDBInternalReporter {
             InsertRowStatement s = StatementGenerator.createStatement(request);
             final long queryId = SESSION_MANAGER.requestQueryId();
             ExecutionResult result =
-                COORDINATOR.execute(s, queryId, sessionInfo, "", 
partitionFetcher, schemaFetcher);
+                COORDINATOR.executeForTreeModel(
+                    s, queryId, sessionInfo, "", partitionFetcher, 
schemaFetcher);
             if (result.status.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
               LOGGER.error("Failed to update the value of metric with status 
{}", result.status);
             }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
index 66e65b37150..61ec16a35da 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
@@ -48,7 +48,6 @@ import 
org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
-import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem;
@@ -1810,7 +1809,7 @@ public class MergeSortOperatorTest {
     }
 
     @Override
-    public Statement getStatement() {
+    public String getStatementType() {
       return null;
     }
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/FragmentInstanceSerdeTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/FragmentInstanceSerdeTest.java
index d19e8c29c0e..cc3363a097a 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/FragmentInstanceSerdeTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/FragmentInstanceSerdeTest.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
 import org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.TreeModelTimePredicate;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode;
@@ -71,7 +72,7 @@ public class FragmentInstanceSerdeTest {
         new FragmentInstance(
             new PlanFragment(planFragmentId, constructPlanNodeTree()),
             planFragmentId.genFragmentInstanceId(),
-            ExpressionFactory.groupByTime(1, 2, 3, 4),
+            new TreeModelTimePredicate(ExpressionFactory.groupByTime(1, 2, 3, 
4)),
             QueryType.READ,
             config.getQueryTimeoutThreshold(),
             sessionInfo);

Reply via email to