This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c6d901ca76c Refactor IQueryExecution to support Table Model in the
future
c6d901ca76c is described below
commit c6d901ca76c78de79a689eac0e1d2a4c62aee4ba
Author: Jackie Tien <[email protected]>
AuthorDate: Wed Mar 13 14:43:42 2024 +0800
Refactor IQueryExecution to support Table Model in the future
---
.../org/apache/iotdb/db/audit/AuditLogger.java | 2 +-
.../protocol/writeback/WriteBackConnector.java | 2 +-
.../legacy/IoTDBLegacyPipeReceiverAgent.java | 2 +-
.../receiver/legacy/loader/DeletionLoader.java | 2 +-
.../pipe/receiver/legacy/loader/TsFileLoader.java | 2 +-
.../receiver/thrift/IoTDBDataNodeReceiver.java | 2 +-
.../db/protocol/client/DataNodeInternalClient.java | 2 +-
.../iotdb/db/protocol/mqtt/MPPPublishHandler.java | 2 +-
.../rest/v1/impl/GrafanaApiServiceImpl.java | 6 +-
.../protocol/rest/v1/impl/RestApiServiceImpl.java | 6 +-
.../rest/v2/impl/GrafanaApiServiceImpl.java | 6 +-
.../protocol/rest/v2/impl/RestApiServiceImpl.java | 8 +-
.../protocol/thrift/impl/ClientRPCServiceImpl.java | 106 ++++++-----
.../impl/DataNodeInternalRPCServiceImpl.java | 2 +-
.../db/queryengine/common/MPPQueryContext.java | 2 +-
.../fragment/FragmentInstanceContext.java | 10 +-
.../iotdb/db/queryengine/plan/Coordinator.java | 107 ++++++-----
.../db/queryengine/plan/analyze/Analysis.java | 45 ++++-
.../IAnalysis.java} | 46 ++---
.../plan/analyze/LoadTsfileAnalyzer.java | 2 +-
.../analyze/schema/AutoCreateSchemaExecutor.java | 2 +-
.../analyze/schema/ClusterSchemaFetchExecutor.java | 2 +-
.../plan/execution/IQueryExecution.java | 3 +-
.../queryengine/plan/execution/QueryExecution.java | 209 +++++----------------
.../plan/execution/config/ConfigExecution.java | 32 ++--
.../config/executor/ClusterConfigTaskExecutor.java | 2 +-
.../db/queryengine/plan/planner/IPlanner.java | 54 ++++++
.../queryengine/plan/planner/TreeModelPlanner.java | 197 +++++++++++++++++++
.../SimpleFragmentParallelPlanner.java | 3 +-
.../distribution/WriteFragmentParallelPlanner.java | 3 +-
.../plan/planner/plan/DistributedQueryPlan.java | 8 +-
.../plan/planner/plan/FragmentInstance.java | 23 ++-
.../plan/planner/plan/PlanFragment.java | 2 +-
.../db/queryengine/plan/planner/plan/SubPlan.java | 2 +-
.../plan/planner/plan/TimePredicate.java | 39 ++++
.../plan/planner/plan/TreeModelTimePredicate.java | 64 +++++++
.../metrics/IoTDBInternalLocalReporter.java | 3 +-
.../execution/operator/MergeSortOperatorTest.java | 3 +-
.../plan/planner/FragmentInstanceSerdeTest.java | 3 +-
39 files changed, 643 insertions(+), 373 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
index a08142e13ef..645dfbf016d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
@@ -115,7 +115,7 @@ public class AuditLogger {
if (auditLogOperationList.contains(operation)) {
if (auditLogStorageList.contains(AuditLogStorage.IOTDB)) {
try {
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
generateInsertStatement(log, address, username),
SESSION_MANAGER.requestQueryId(),
sessionInfo,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
index 471474148b8..4b9437372ca 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
@@ -154,7 +154,7 @@ public class WriteBackConnector implements PipeConnector {
private TSStatus executeStatement(InsertBaseStatement statement) {
return Coordinator.getInstance()
- .execute(
+ .executeForTreeModel(
new PipeEnrichedStatement(statement),
SessionManager.getInstance().requestQueryId(),
new SessionInfo(0, AuthorityChecker.SUPER_USER,
ZoneId.systemDefault()),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
index 56180f75b77..fe0b2ac4073 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java
@@ -136,7 +136,7 @@ public class IoTDBLegacyPipeReceiverAgent {
long queryId = SessionManager.getInstance().requestQueryId();
ExecutionResult result =
Coordinator.getInstance()
- .execute(
+ .executeForTreeModel(
statement,
queryId,
new SessionInfo(0, AuthorityChecker.SUPER_USER,
ZoneId.systemDefault()),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/DeletionLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/DeletionLoader.java
index a3f08af6413..f5711a826ab 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/DeletionLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/DeletionLoader.java
@@ -61,7 +61,7 @@ public class DeletionLoader implements ILoader {
long queryId = SessionManager.getInstance().requestQueryId();
ExecutionResult result =
Coordinator.getInstance()
- .execute(
+ .executeForTreeModel(
statement,
queryId,
new SessionInfo(0, AuthorityChecker.SUPER_USER,
ZoneId.systemDefault()),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/TsFileLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/TsFileLoader.java
index 6bcbe95c76f..080f5f105ec 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/TsFileLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/TsFileLoader.java
@@ -63,7 +63,7 @@ public class TsFileLoader implements ILoader {
long queryId = SessionManager.getInstance().requestQueryId();
ExecutionResult result =
Coordinator.getInstance()
- .execute(
+ .executeForTreeModel(
statement,
queryId,
new SessionInfo(0, AuthorityChecker.SUPER_USER,
ZoneId.systemDefault()),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBDataNodeReceiver.java
index 946a456645c..fcb41ee3d0f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBDataNodeReceiver.java
@@ -322,7 +322,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
final ExecutionResult result =
Coordinator.getInstance()
- .execute(
+ .executeForTreeModel(
statement,
SessionManager.getInstance().requestQueryId(),
new SessionInfo(0, AuthorityChecker.SUPER_USER,
ZoneId.systemDefault()),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeInternalClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeInternalClient.java
index 925a4b450f7..cf1250aaecb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeInternalClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeInternalClient.java
@@ -84,7 +84,7 @@ public class DataNodeInternalClient {
// call the coordinator
long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(session),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index 3c0f3c05726..86c430ac0a5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -162,7 +162,7 @@ public class MPPPublishHandler extends
AbstractInterceptHandler {
long queryId = sessionManager.requestQueryId();
ExecutionResult result =
Coordinator.getInstance()
- .execute(
+ .executeForTreeModel(
statement,
queryId,
sessionManager.getSessionInfo(session),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/GrafanaApiServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/GrafanaApiServiceImpl.java
index 40bab03fe38..07c735faa7f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/GrafanaApiServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/GrafanaApiServiceImpl.java
@@ -110,7 +110,7 @@ public class GrafanaApiServiceImpl extends
GrafanaApiService {
queryId = SESSION_MANAGER.requestQueryId();
// create and cache dataset
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
@@ -177,7 +177,7 @@ public class GrafanaApiServiceImpl extends
GrafanaApiService {
queryId = SESSION_MANAGER.requestQueryId();
// create and cache dataset
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
@@ -239,7 +239,7 @@ public class GrafanaApiServiceImpl extends
GrafanaApiService {
queryId = SESSION_MANAGER.requestQueryId();
// create and cache dataset
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/RestApiServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/RestApiServiceImpl.java
index 47433bb5a68..428f2c3baaa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/RestApiServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/RestApiServiceImpl.java
@@ -97,7 +97,7 @@ public class RestApiServiceImpl extends RestApiService {
}
queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
@@ -151,7 +151,7 @@ public class RestApiServiceImpl extends RestApiService {
queryId = SESSION_MANAGER.requestQueryId();
// create and cache dataset
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
@@ -211,7 +211,7 @@ public class RestApiServiceImpl extends RestApiService {
}
queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
insertTabletStatement,
queryId,
SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/GrafanaApiServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/GrafanaApiServiceImpl.java
index 3025cd83ba1..e3385885a01 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/GrafanaApiServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/GrafanaApiServiceImpl.java
@@ -110,7 +110,7 @@ public class GrafanaApiServiceImpl extends
GrafanaApiService {
queryId = SESSION_MANAGER.requestQueryId();
// create and cache dataset
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
@@ -177,7 +177,7 @@ public class GrafanaApiServiceImpl extends
GrafanaApiService {
queryId = SESSION_MANAGER.requestQueryId();
// create and cache dataset
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
@@ -239,7 +239,7 @@ public class GrafanaApiServiceImpl extends
GrafanaApiService {
queryId = SESSION_MANAGER.requestQueryId();
// create and cache dataset
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java
index 5cace589391..c0029b913eb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java
@@ -99,7 +99,7 @@ public class RestApiServiceImpl extends RestApiService {
}
queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
@@ -154,7 +154,7 @@ public class RestApiServiceImpl extends RestApiService {
queryId = SESSION_MANAGER.requestQueryId();
// create and cache dataset
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
@@ -203,7 +203,7 @@ public class RestApiServiceImpl extends RestApiService {
}
queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
insertRowsStatement,
SESSION_MANAGER.requestQueryId(),
SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
@@ -273,7 +273,7 @@ public class RestApiServiceImpl extends RestApiService {
}
queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
insertTabletStatement,
SESSION_MANAGER.requestQueryId(),
SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index fec0bbd87ec..5612d8777e1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -302,7 +302,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
// create and cache dataset
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
s,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
@@ -347,7 +347,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
// record each operation time cost
if (statementType != null) {
addStatementExecutionLatency(
- OperationType.EXECUTE_QUERY_STATEMENT, statementType,
currentOperationCost);
+ OperationType.EXECUTE_QUERY_STATEMENT, statementType.name(),
currentOperationCost);
}
if (finished) {
@@ -393,7 +393,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
// create and cache dataset
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
s,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
@@ -435,7 +435,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
// record each operation time cost
addStatementExecutionLatency(
- OperationType.EXECUTE_RAW_DATA_QUERY, StatementType.QUERY,
currentOperationCost);
+ OperationType.EXECUTE_RAW_DATA_QUERY, StatementType.QUERY.name(),
currentOperationCost);
if (finished) {
// record total time cost for one query
@@ -481,7 +481,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
// create and cache dataset
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
s,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
@@ -525,7 +525,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
// record each operation time cost
addStatementExecutionLatency(
- OperationType.EXECUTE_LAST_DATA_QUERY, StatementType.QUERY,
currentOperationCost);
+ OperationType.EXECUTE_LAST_DATA_QUERY, StatementType.QUERY.name(),
currentOperationCost);
if (finished) {
// record total time cost for one query
@@ -568,7 +568,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
// create and cache dataset
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
s,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
@@ -612,7 +612,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
// record each operation time cost
addStatementExecutionLatency(
- OperationType.EXECUTE_AGG_QUERY, StatementType.QUERY,
currentOperationCost);
+ OperationType.EXECUTE_AGG_QUERY, StatementType.QUERY.name(),
currentOperationCost);
if (finished) {
// record total time cost for one query
@@ -895,7 +895,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
}
// create and cache dataset
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
s,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
@@ -946,7 +946,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
// record each operation time cost
addStatementExecutionLatency(
- OperationType.EXECUTE_LAST_DATA_QUERY, StatementType.QUERY,
currentOperationCost);
+ OperationType.EXECUTE_LAST_DATA_QUERY, StatementType.QUERY.name(),
currentOperationCost);
if (finished) {
// record total time cost for one query
@@ -1051,7 +1051,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
public TSFetchResultsResp fetchResultsV2(TSFetchResultsReq req) {
long startTime = System.nanoTime();
boolean finished = false;
- StatementType statementType = null;
+ String statementType = null;
Throwable t = null;
try {
IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
@@ -1067,7 +1067,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
resp.setMoreData(false);
return resp;
}
- statementType = queryExecution.getStatement().getType();
+ statementType = queryExecution.getStatementType();
try (SetThreadName queryName = new
SetThreadName(queryExecution.getQueryId())) {
Pair<List<ByteBuffer>, Boolean> pair =
@@ -1228,7 +1228,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
@@ -1269,7 +1269,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
@@ -1319,7 +1319,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
@@ -1368,7 +1368,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
@@ -1408,7 +1408,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
@@ -1451,7 +1451,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
@@ -1495,7 +1495,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
for (int i = 0; i < req.getStatements().size(); i++) {
String statement = req.getStatements().get(i);
long t2 = System.nanoTime();
- StatementType type = null;
+ String type = null;
OperationQuota quota = null;
try {
Statement s = StatementGenerator.createStatement(statement,
clientSession.getZoneId());
@@ -1518,10 +1518,10 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
}
long queryId = SESSION_MANAGER.requestQueryId();
- type = s.getType();
+ type = s.getType() == null ? null : s.getType().name();
// create and cache dataset
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
s,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
@@ -1549,7 +1549,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
}
} finally {
addStatementExecutionLatency(
- OperationType.EXECUTE_BATCH_STATEMENT, StatementType.NULL,
System.nanoTime() - t1);
+ OperationType.EXECUTE_BATCH_STATEMENT, StatementType.NULL.name(),
System.nanoTime() - t1);
SESSION_MANAGER.updateIdleTime();
}
return isAllSuccessful
@@ -1571,7 +1571,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
boolean finished = false;
long startTime = System.nanoTime();
- StatementType statementType = null;
+ String statementType = null;
Throwable t = null;
try {
IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
@@ -1587,7 +1587,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
resp.setMoreData(true);
return resp;
}
- statementType = queryExecution.getStatement().getType();
+ statementType = queryExecution.getStatementType();
try (SetThreadName queryName = new
SetThreadName(queryExecution.getQueryId())) {
Pair<TSQueryDataSet, Boolean> pair =
@@ -1672,7 +1672,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
@@ -1688,7 +1688,9 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
e, OperationType.INSERT_RECORDS,
TSStatusCode.EXECUTE_STATEMENT_ERROR);
} finally {
addStatementExecutionLatency(
- OperationType.INSERT_RECORDS, StatementType.BATCH_INSERT_ROWS,
System.nanoTime() - t1);
+ OperationType.INSERT_RECORDS,
+ StatementType.BATCH_INSERT_ROWS.name(),
+ System.nanoTime() - t1);
SESSION_MANAGER.updateIdleTime();
if (quota != null) {
quota.close();
@@ -1739,7 +1741,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
@@ -1756,7 +1758,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
} finally {
addStatementExecutionLatency(
OperationType.INSERT_RECORDS_OF_ONE_DEVICE,
- StatementType.BATCH_INSERT_ONE_DEVICE,
+ StatementType.BATCH_INSERT_ONE_DEVICE.name(),
System.nanoTime() - t1);
SESSION_MANAGER.updateIdleTime();
if (quota != null) {
@@ -1807,7 +1809,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
@@ -1827,7 +1829,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
} finally {
addStatementExecutionLatency(
OperationType.INSERT_STRING_RECORDS_OF_ONE_DEVICE,
- StatementType.BATCH_INSERT_ONE_DEVICE,
+ StatementType.BATCH_INSERT_ONE_DEVICE.name(),
System.nanoTime() - t1);
SESSION_MANAGER.updateIdleTime();
if (quota != null) {
@@ -1876,7 +1878,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
@@ -1892,7 +1894,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
e, OperationType.INSERT_RECORD,
TSStatusCode.EXECUTE_STATEMENT_ERROR);
} finally {
addStatementExecutionLatency(
- OperationType.INSERT_RECORD, StatementType.INSERT, System.nanoTime()
- t1);
+ OperationType.INSERT_RECORD, StatementType.INSERT.name(),
System.nanoTime() - t1);
SESSION_MANAGER.updateIdleTime();
if (quota != null) {
quota.close();
@@ -1933,7 +1935,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
@@ -1949,7 +1951,9 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
e, OperationType.INSERT_TABLETS,
TSStatusCode.EXECUTE_STATEMENT_ERROR);
} finally {
addStatementExecutionLatency(
- OperationType.INSERT_TABLETS, StatementType.MULTI_BATCH_INSERT,
System.nanoTime() - t1);
+ OperationType.INSERT_TABLETS,
+ StatementType.MULTI_BATCH_INSERT.name(),
+ System.nanoTime() - t1);
SESSION_MANAGER.updateIdleTime();
if (quota != null) {
quota.close();
@@ -1989,7 +1993,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
@@ -2005,7 +2009,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
e, OperationType.INSERT_TABLET,
TSStatusCode.EXECUTE_STATEMENT_ERROR);
} finally {
addStatementExecutionLatency(
- OperationType.INSERT_TABLET, StatementType.BATCH_INSERT,
System.nanoTime() - t1);
+ OperationType.INSERT_TABLET, StatementType.BATCH_INSERT.name(),
System.nanoTime() - t1);
SESSION_MANAGER.updateIdleTime();
if (quota != null) {
quota.close();
@@ -2054,7 +2058,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
@@ -2071,7 +2075,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
} finally {
addStatementExecutionLatency(
OperationType.INSERT_STRING_RECORDS,
- StatementType.BATCH_INSERT_ROWS,
+ StatementType.BATCH_INSERT_ROWS.name(),
System.nanoTime() - t1);
SESSION_MANAGER.updateIdleTime();
if (quota != null) {
@@ -2140,7 +2144,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
@@ -2204,7 +2208,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
@@ -2308,7 +2312,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
long queryId = SESSION_MANAGER.requestQueryId();
// create and cache dataset
ExecutionResult executionResult =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
@@ -2352,7 +2356,9 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
return null;
} finally {
addStatementExecutionLatency(
- OperationType.EXECUTE_STATEMENT, statement.getType(),
System.nanoTime() - startTime);
+ OperationType.EXECUTE_STATEMENT,
+ statement.getType().name(),
+ System.nanoTime() - startTime);
SESSION_MANAGER.updateIdleTime();
}
}
@@ -2385,7 +2391,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
@@ -2433,7 +2439,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
@@ -2478,7 +2484,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
@@ -2522,7 +2528,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
@@ -2619,7 +2625,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
statement,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession),
@@ -2635,7 +2641,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
e, OperationType.INSERT_STRING_RECORD,
TSStatusCode.EXECUTE_STATEMENT_ERROR);
} finally {
addStatementExecutionLatency(
- OperationType.INSERT_STRING_RECORD, StatementType.INSERT,
System.nanoTime() - t1);
+ OperationType.INSERT_STRING_RECORD, StatementType.INSERT.name(),
System.nanoTime() - t1);
SESSION_MANAGER.updateIdleTime();
if (quota != null) {
quota.close();
@@ -2682,7 +2688,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
/** Add stat of operation into metrics */
private void addStatementExecutionLatency(
- OperationType operation, StatementType statementType, long costTime) {
+ OperationType operation, String statementType, long costTime) {
if (statementType == null) {
return;
}
@@ -2696,7 +2702,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
Tag.INTERFACE.toString(),
operation.toString(),
Tag.TYPE.toString(),
- statementType.name());
+ statementType);
}
private String checkIdentifierAndRemoveBackQuotesIfNecessary(String
identifier) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 76f023f5907..d7b6804ea3c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1099,7 +1099,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
SESSION_MANAGER.requestQueryId(session,
SESSION_MANAGER.requestStatementId(session));
// Create and cache dataset
ExecutionResult result =
- COORDINATOR.execute(
+ COORDINATOR.executeForTreeModel(
s,
queryId,
SESSION_MANAGER.getSessionInfo(session),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index 4d7e5a4512f..0e50966d864 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -38,7 +38,7 @@ import java.util.List;
*/
public class MPPQueryContext {
private String sql;
- private QueryId queryId;
+ private final QueryId queryId;
// LocalQueryId is kept to adapt to the old client, it's unique in current
datanode.
// Now it's only be used by EXPLAIN ANALYZE to get queryExecution.
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 8c74170cef3..89a2e60ef8e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -27,8 +27,7 @@ import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
-import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils;
-import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.TimePredicate;
import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery;
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
import
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
@@ -132,7 +131,7 @@ public class FragmentInstanceContext extends QueryContext {
FragmentInstanceStateMachine stateMachine,
SessionInfo sessionInfo,
IDataRegionForQuery dataRegion,
- Expression globalTimePredicate,
+ TimePredicate globalTimePredicate,
Map<QueryId, DataNodeQueryContext> dataNodeQueryContextMap) {
FragmentInstanceContext instanceContext =
new FragmentInstanceContext(
@@ -167,14 +166,15 @@ public class FragmentInstanceContext extends QueryContext
{
FragmentInstanceStateMachine stateMachine,
SessionInfo sessionInfo,
IDataRegionForQuery dataRegion,
- Expression globalTimePredicate,
+ TimePredicate globalTimePredicate,
Map<QueryId, DataNodeQueryContext> dataNodeQueryContextMap) {
this.id = id;
this.stateMachine = stateMachine;
this.executionEndTime.set(END_TIME_INITIAL_VALUE);
this.sessionInfo = sessionInfo;
this.dataRegion = dataRegion;
- this.globalTimeFilter =
PredicateUtils.convertPredicateToTimeFilter(globalTimePredicate);
+ this.globalTimeFilter =
+ globalTimePredicate == null ? null :
globalTimePredicate.convertPredicateToTimeFilter();
this.dataNodeQueryContextMap = dataNodeQueryContextMap;
this.dataNodeQueryContext = dataNodeQueryContextMap.get(id.getQueryId());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 327008e9c2a..8e9cedf2862 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -41,6 +41,8 @@ import
org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
import org.apache.iotdb.db.queryengine.plan.execution.QueryExecution;
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigExecution;
+import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskVisitor;
+import org.apache.iotdb.db.queryengine.plan.planner.TreeModelPlanner;
import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.utils.SetThreadName;
@@ -53,6 +55,7 @@ import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.BiFunction;
import static org.apache.iotdb.commons.utils.StatusUtils.needRetry;
@@ -100,39 +103,11 @@ public class Coordinator {
this.scheduledExecutor = getScheduledExecutor();
}
- private IQueryExecution createQueryExecution(
- Statement statement,
- MPPQueryContext queryContext,
- IPartitionFetcher partitionFetcher,
- ISchemaFetcher schemaFetcher,
- long timeOut,
- long startTime) {
- queryContext.setTimeOut(timeOut);
- queryContext.setStartTime(startTime);
- if (statement instanceof IConfigStatement) {
- queryContext.setQueryType(((IConfigStatement) statement).getQueryType());
- return new ConfigExecution(queryContext, statement, executor);
- }
- return new QueryExecution(
- statement,
- queryContext,
- executor,
- writeOperationExecutor,
- scheduledExecutor,
- partitionFetcher,
- schemaFetcher,
- SYNC_INTERNAL_SERVICE_CLIENT_MANAGER,
- ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER);
- }
-
- public ExecutionResult execute(
- Statement statement,
+ private ExecutionResult execution(
long queryId,
SessionInfo session,
String sql,
- IPartitionFetcher partitionFetcher,
- ISchemaFetcher schemaFetcher,
- long timeOut) {
+ BiFunction<MPPQueryContext, Long, IQueryExecution>
iQueryExecutionFactory) {
long startTime = System.currentTimeMillis();
QueryId globalQueryId = queryIdGenerator.createNextQueryId();
MPPQueryContext queryContext = null;
@@ -148,14 +123,7 @@ public class Coordinator {
session,
DataNodeEndPoints.LOCAL_HOST_DATA_BLOCK_ENDPOINT,
DataNodeEndPoints.LOCAL_HOST_INTERNAL_ENDPOINT);
- IQueryExecution execution =
- createQueryExecution(
- statement,
- queryContext,
- partitionFetcher,
- schemaFetcher,
- timeOut > 0 ? timeOut : CONFIG.getQueryTimeoutThreshold(),
- startTime);
+ IQueryExecution execution = iQueryExecutionFactory.apply(queryContext,
startTime);
if (execution.isQuery()) {
queryExecutionMap.put(queryId, execution);
} else {
@@ -170,25 +138,78 @@ public class Coordinator {
}
return result;
} finally {
- int lockNums = queryContext.getAcquiredLockNum();
- if (queryContext != null && lockNums > 0) {
- for (int i = 0; i < lockNums; i++)
DataNodeSchemaCache.getInstance().releaseInsertLock();
+ if (queryContext != null && queryContext.getAcquiredLockNum() > 0) {
+ for (int i = 0, lockNums = queryContext.getAcquiredLockNum(); i <
lockNums; i++) {
+ DataNodeSchemaCache.getInstance().releaseInsertLock();
+ }
}
}
}
/** This method is called by the write method. So it does not set the
timeout parameter. */
- public ExecutionResult execute(
+ public ExecutionResult executeForTreeModel(
Statement statement,
long queryId,
SessionInfo session,
String sql,
IPartitionFetcher partitionFetcher,
ISchemaFetcher schemaFetcher) {
- return execute(
+ return executeForTreeModel(
statement, queryId, session, sql, partitionFetcher, schemaFetcher,
Long.MAX_VALUE);
}
+ public ExecutionResult executeForTreeModel(
+ Statement statement,
+ long queryId,
+ SessionInfo session,
+ String sql,
+ IPartitionFetcher partitionFetcher,
+ ISchemaFetcher schemaFetcher,
+ long timeOut) {
+ return execution(
+ queryId,
+ session,
+ sql,
+ ((queryContext, startTime) ->
+ createQueryExecutionForTreeModel(
+ statement,
+ queryContext,
+ partitionFetcher,
+ schemaFetcher,
+ timeOut > 0 ? timeOut : CONFIG.getQueryTimeoutThreshold(),
+ startTime)));
+ }
+
+ private IQueryExecution createQueryExecutionForTreeModel(
+ Statement statement,
+ MPPQueryContext queryContext,
+ IPartitionFetcher partitionFetcher,
+ ISchemaFetcher schemaFetcher,
+ long timeOut,
+ long startTime) {
+ queryContext.setTimeOut(timeOut);
+ queryContext.setStartTime(startTime);
+ if (statement instanceof IConfigStatement) {
+ queryContext.setQueryType(((IConfigStatement) statement).getQueryType());
+ return new ConfigExecution(
+ queryContext,
+ statement.getType(),
+ executor,
+ statement.accept(new ConfigTaskVisitor(), queryContext));
+ }
+ TreeModelPlanner treeModelPlanner =
+ new TreeModelPlanner(
+ statement,
+ executor,
+ writeOperationExecutor,
+ scheduledExecutor,
+ partitionFetcher,
+ schemaFetcher,
+ SYNC_INTERNAL_SERVICE_CLIENT_MANAGER,
+ ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER);
+ return new QueryExecution(treeModelPlanner, queryContext, executor);
+ }
+
public IQueryExecution getQueryExecution(Long queryId) {
return queryExecutionMap.get(queryId);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
index e91d45d3575..3f109e86e9e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
@@ -28,9 +28,13 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.NodeRef;
import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
+import
org.apache.iotdb.db.queryengine.plan.execution.memory.StatementMemorySource;
+import
org.apache.iotdb.db.queryengine.plan.execution.memory.StatementMemorySourceContext;
+import
org.apache.iotdb.db.queryengine.plan.execution.memory.StatementMemorySourceVisitor;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
@@ -42,12 +46,14 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimePa
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.IntoPathDescriptor;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.OrderByParameter;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem;
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
import org.apache.iotdb.db.schemaengine.template.Template;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@@ -63,7 +69,7 @@ import java.util.Set;
import static com.google.common.base.Preconditions.checkArgument;
/** Analysis used for planning a query. TODO: This class may need to store
more info for a query. */
-public class Analysis {
+public class Analysis implements IAnalysis {
/////////////////////////////////////////////////////////////////////////////////////////////////
// Common Analysis
@@ -378,6 +384,7 @@ public class Analysis {
this.globalTimePredicate = timeFilter;
}
+ @Override
public DatasetHeader getRespDatasetHeader() {
return respDatasetHeader;
}
@@ -403,7 +410,13 @@ public class Analysis {
return type;
}
- public boolean hasDataSource() {
+ @Override
+ public boolean canSkipExecute(MPPQueryContext context) {
+ return isFinishQueryAfterAnalyze()
+ || (context.getQueryType() == QueryType.READ && !hasDataSource());
+ }
+
+ private boolean hasDataSource() {
return (dataPartition != null && !dataPartition.isEmpty())
|| (schemaPartition != null && !schemaPartition.isEmpty())
|| statement instanceof ShowQueriesStatement
@@ -411,6 +424,32 @@ public class Analysis {
&& ((QueryStatement) statement).isAggregationQuery());
}
+ @Override
+ public TsBlock constructResultForMemorySource(MPPQueryContext context) {
+ StatementMemorySource memorySource =
+ new StatementMemorySourceVisitor()
+ .process(getStatement(), new StatementMemorySourceContext(context,
this));
+ setRespDatasetHeader(memorySource.getDatasetHeader());
+ return memorySource.getTsBlock();
+ }
+
+ @Override
+ public boolean isQuery() {
+ return statement.isQuery();
+ }
+
+ @Override
+ public boolean needSetHighestPriority() {
+ // if is this Statement is ShowQueryStatement, set its instances to the
highest priority, so
+ // that the sub-tasks of the ShowQueries instances could be executed first.
+ return StatementType.SHOW_QUERIES.equals(statement.getType());
+ }
+
+ @Override
+ public String getStatementType() {
+ return statement.getType().name();
+ }
+
public Map<Expression, Set<Expression>> getCrossGroupByExpressions() {
return crossGroupByExpressions;
}
@@ -487,10 +526,12 @@ public class Analysis {
this.finishQueryAfterAnalyze = finishQueryAfterAnalyze;
}
+ @Override
public boolean isFailed() {
return failStatus != null;
}
+ @Override
public TSStatus getFailStatus() {
return this.failStatus;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
similarity index 52%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
index 9652cbce155..ca85161c7ef 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
@@ -17,52 +17,28 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.plan.execution;
+package org.apache.iotdb.db.queryengine.plan.analyze;
-import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
-import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import java.nio.ByteBuffer;
-import java.util.Optional;
+public interface IAnalysis {
-public interface IQueryExecution {
+ boolean isFailed();
- void start();
+ TSStatus getFailStatus();
- void stop(Throwable t);
+ boolean canSkipExecute(MPPQueryContext context);
- void stopAndCleanup();
-
- void stopAndCleanup(Throwable t);
-
- void cancel();
-
- ExecutionResult getStatus();
-
- Optional<TsBlock> getBatchResult() throws IoTDBException;
-
- Optional<ByteBuffer> getByteBufferBatchResult() throws IoTDBException;
-
- boolean hasNextResult();
-
- int getOutputValueColumnCount();
-
- DatasetHeader getDatasetHeader();
+ TsBlock constructResultForMemorySource(MPPQueryContext context);
boolean isQuery();
- String getQueryId();
-
- long getStartExecutionTime();
-
- void recordExecutionTime(long executionTime);
-
- /** @return cost time in ns */
- long getTotalExecutionTime();
+ boolean needSetHighestPriority();
- Optional<String> getExecuteSQL();
+ DatasetHeader getRespDatasetHeader();
- Statement getStatement();
+ String getStatementType();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
index 1e32ea9cee1..b0d5a1b92eb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
@@ -475,7 +475,7 @@ public class LoadTsfileAnalyzer {
final long queryId = SessionManager.getInstance().requestQueryId();
final ExecutionResult result =
Coordinator.getInstance()
- .execute(
+ .executeForTreeModel(
statement,
queryId,
null,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
index 4fca45790ea..cbf85d67a82 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
@@ -83,7 +83,7 @@ class AutoCreateSchemaExecutor {
private ExecutionResult executeStatement(Statement statement,
MPPQueryContext context) {
- return coordinator.execute(
+ return coordinator.executeForTreeModel(
statement,
SessionManager.getInstance().requestQueryId(),
context == null ? null : context.getSession(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index 0adbca34ad5..75e087307ff 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -81,7 +81,7 @@ class ClusterSchemaFetchExecutor {
if (context != null && context.getQueryType() == QueryType.READ) {
sql += ", " + context.getQueryId() + " : " + context.getSql();
}
- return coordinator.execute(
+ return coordinator.executeForTreeModel(
statement,
queryId,
context == null ? null : context.getSession(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
index 9652cbce155..0a92914b571 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.queryengine.plan.execution;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
-import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import java.nio.ByteBuffer;
@@ -64,5 +63,5 @@ public interface IQueryExecution {
Optional<String> getExecuteSQL();
- Statement getStatement();
+ String getStatementType();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index 99eb73e3425..8446c0bd4fe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -20,9 +20,6 @@ package org.apache.iotdb.db.queryengine.plan.execution;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.client.IClientManager;
-import
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
@@ -40,31 +37,15 @@ import
org.apache.iotdb.db.queryengine.execution.exchange.source.ISourceHandle;
import org.apache.iotdb.db.queryengine.execution.exchange.source.SourceHandle;
import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
-import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
-import org.apache.iotdb.db.queryengine.plan.analyze.Analyzer;
-import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
-import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
import
org.apache.iotdb.db.queryengine.plan.execution.memory.MemorySourceHandle;
-import
org.apache.iotdb.db.queryengine.plan.execution.memory.StatementMemorySource;
-import
org.apache.iotdb.db.queryengine.plan.execution.memory.StatementMemorySourceContext;
-import
org.apache.iotdb.db.queryengine.plan.execution.memory.StatementMemorySourceVisitor;
-import org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanner;
-import
org.apache.iotdb.db.queryengine.plan.planner.distribution.DistributionPlanner;
+import org.apache.iotdb.db.queryengine.plan.planner.IPlanner;
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeUtil;
-import org.apache.iotdb.db.queryengine.plan.scheduler.ClusterScheduler;
import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
-import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler;
-import org.apache.iotdb.db.queryengine.plan.statement.Statement;
-import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
-import
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.rpc.RpcUtils;
@@ -77,7 +58,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CancellationException;
@@ -99,9 +79,9 @@ import static
org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.DIST
* corresponding physical nodes. 3. Collect and monitor the progress/states of
this query.
*/
public class QueryExecution implements IQueryExecution {
- private static final Logger logger =
LoggerFactory.getLogger(QueryExecution.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(QueryExecution.class);
- private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
+ private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
private static final int MAX_RETRY_COUNT = 3;
private static final long RETRY_INTERVAL_IN_MS = 2000;
private int retryCount = 0;
@@ -109,19 +89,12 @@ public class QueryExecution implements IQueryExecution {
private IScheduler scheduler;
private final QueryStateMachine stateMachine;
- private final Statement rawStatement;
- private Analysis analysis;
+ private final IPlanner planner;
+
+ private IAnalysis analysis;
private LogicalQueryPlan logicalPlan;
private DistributedQueryPlan distributedPlan;
- private final ExecutorService executor;
- private final ExecutorService writeOperationExecutor;
- private final ScheduledExecutorService scheduledExecutor;
- // TODO need to use factory to decide standalone or cluster
- private final IPartitionFetcher partitionFetcher;
- // TODO need to use factory to decide standalone or cluster,
- private final ISchemaFetcher schemaFetcher;
-
// The result of QueryExecution will be written to the
MPPDataExchangeManager in current Node.
// We use this SourceHandle to fetch the TsBlock from it.
private ISourceHandle resultHandle;
@@ -129,12 +102,6 @@ public class QueryExecution implements IQueryExecution {
// used for cleaning resultHandle up exactly once
private final AtomicBoolean resultHandleCleanUp;
- private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
- syncInternalServiceClientManager;
-
- private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
- asyncInternalServiceClientManager;
-
private final AtomicBoolean stopped;
// cost time in ns
@@ -148,28 +115,11 @@ public class QueryExecution implements IQueryExecution {
PerformanceOverviewMetrics.getInstance();
@SuppressWarnings("squid:S107")
- public QueryExecution(
- Statement statement,
- MPPQueryContext context,
- ExecutorService executor,
- ExecutorService writeOperationExecutor,
- ScheduledExecutorService scheduledExecutor,
- IPartitionFetcher partitionFetcher,
- ISchemaFetcher schemaFetcher,
- IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
syncInternalServiceClientManager,
- IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
- asyncInternalServiceClientManager) {
- this.rawStatement = statement;
- this.executor = executor;
- this.writeOperationExecutor = writeOperationExecutor;
- this.scheduledExecutor = scheduledExecutor;
+ public QueryExecution(IPlanner planner, MPPQueryContext context,
ExecutorService executor) {
this.context = context;
- this.analysis = analyze(statement, context, partitionFetcher,
schemaFetcher);
+ this.planner = planner;
+ this.analysis = analyze(context);
this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
- this.partitionFetcher = partitionFetcher;
- this.schemaFetcher = schemaFetcher;
- this.syncInternalServiceClientManager = syncInternalServiceClientManager;
- this.asyncInternalServiceClientManager = asyncInternalServiceClientManager;
// We add the abort logic inside the QueryExecution.
// So that the other components can only focus on the state change.
@@ -184,7 +134,7 @@ public class QueryExecution implements IQueryExecution {
if (state == QueryState.FAILED
|| state == QueryState.ABORTED
|| state == QueryState.CANCELED) {
- logger.debug("[ReleaseQueryResource] state is: {}", state);
+ LOGGER.debug("[ReleaseQueryResource] state is: {}", state);
Throwable cause = stateMachine.getFailureException();
releaseResource(cause);
}
@@ -203,7 +153,7 @@ public class QueryExecution implements IQueryExecution {
public void start() {
final long startTime = System.nanoTime();
if (skipExecute()) {
- logger.debug("[SkipExecute]");
+ LOGGER.debug("[SkipExecute]");
if (context.getQueryType() == QueryType.WRITE && analysis.isFailed()) {
stateMachine.transitionToFailed(analysis.getFailStatus());
} else {
@@ -255,60 +205,52 @@ public class QueryExecution implements IQueryExecution {
private ExecutionResult retry() {
if (retryCount >= MAX_RETRY_COUNT) {
- logger.warn("[ReachMaxRetryCount]");
+ LOGGER.warn("[ReachMaxRetryCount]");
stateMachine.transitionToFailed();
return getStatus();
}
- logger.warn("error when executing query. {}",
stateMachine.getFailureMessage());
+ LOGGER.warn("error when executing query. {}",
stateMachine.getFailureMessage());
// stop and clean up resources the QueryExecution used
this.stopAndCleanup(stateMachine.getFailureException());
- logger.info("[WaitBeforeRetry] wait {}ms.", RETRY_INTERVAL_IN_MS);
+ LOGGER.info("[WaitBeforeRetry] wait {}ms.", RETRY_INTERVAL_IN_MS);
try {
Thread.sleep(RETRY_INTERVAL_IN_MS);
} catch (InterruptedException e) {
- logger.warn("interrupted when waiting retry");
+ LOGGER.warn("interrupted when waiting retry");
Thread.currentThread().interrupt();
}
retryCount++;
- logger.info("[Retry] retry count is: {}", retryCount);
+ LOGGER.info("[Retry] retry count is: {}", retryCount);
stateMachine.transitionToQueued();
// force invalid PartitionCache
- partitionFetcher.invalidAllCache();
+ planner.invalidatePartitionCache();
// clear runtime variables in MPPQueryContext
context.prepareForRetry();
// re-stop
this.stopped.compareAndSet(true, false);
this.resultHandleCleanUp.compareAndSet(true, false);
// re-analyze the query
- this.analysis = analyze(rawStatement, context, partitionFetcher,
schemaFetcher);
+ this.analysis = analyze(context);
// re-start the QueryExecution
this.start();
return getStatus();
}
private boolean skipExecute() {
- return analysis.isFinishQueryAfterAnalyze()
- || (context.getQueryType() == QueryType.READ &&
!analysis.hasDataSource());
+ return analysis.canSkipExecute(context);
}
private void constructResultForMemorySource() {
- StatementMemorySource memorySource =
- new StatementMemorySourceVisitor()
- .process(analysis.getStatement(), new
StatementMemorySourceContext(context, analysis));
- this.resultHandle = new MemorySourceHandle(memorySource.getTsBlock());
- this.analysis.setRespDatasetHeader(memorySource.getDatasetHeader());
+ TsBlock tsBlock = analysis.constructResultForMemorySource(context);
+ this.resultHandle = new MemorySourceHandle(tsBlock);
}
// Analyze the statement in QueryContext. Generate the analysis this query
need
- private Analysis analyze(
- Statement statement,
- MPPQueryContext context,
- IPartitionFetcher partitionFetcher,
- ISchemaFetcher schemaFetcher) {
+ private IAnalysis analyze(MPPQueryContext context) {
final long startTime = System.nanoTime();
- Analysis result;
+ IAnalysis result;
try {
- result = new Analyzer(context, partitionFetcher,
schemaFetcher).analyze(statement);
+ result = planner.analyze(context);
} finally {
long analyzeCost = System.nanoTime() - startTime;
context.setAnalyzeCost(analyzeCost);
@@ -319,45 +261,15 @@ public class QueryExecution implements IQueryExecution {
private void schedule() {
final long startTime = System.nanoTime();
- boolean isPipeEnrichedTsFileLoad =
- rawStatement instanceof PipeEnrichedStatement
- && ((PipeEnrichedStatement) rawStatement).getInnerStatement()
- instanceof LoadTsFileStatement;
- if (rawStatement instanceof LoadTsFileStatement ||
isPipeEnrichedTsFileLoad) {
- this.scheduler =
- new LoadTsFileScheduler(
- distributedPlan,
- context,
- stateMachine,
- syncInternalServiceClientManager,
- partitionFetcher,
- isPipeEnrichedTsFileLoad);
- this.scheduler.start();
- return;
- }
-
- // TODO: (xingtanzjr) initialize the query scheduler according to
configuration
- this.scheduler =
- new ClusterScheduler(
- context,
- stateMachine,
- distributedPlan.getInstances(),
- context.getQueryType(),
- executor,
- writeOperationExecutor,
- scheduledExecutor,
- syncInternalServiceClientManager,
- asyncInternalServiceClientManager);
- this.scheduler.start();
+ this.scheduler = planner.doSchedule(analysis, distributedPlan, context,
stateMachine);
PERFORMANCE_OVERVIEW_METRICS.recordScheduleCost(System.nanoTime() -
startTime);
}
// Use LogicalPlanner to do the logical query plan and logical optimization
public void doLogicalPlan() {
- LogicalPlanner planner = new LogicalPlanner(this.context);
- this.logicalPlan = planner.plan(this.analysis);
- if (isQuery() && logger.isDebugEnabled()) {
- logger.debug(
+ this.logicalPlan = planner.doLogicalPlan(analysis, context);
+ if (isQuery() && LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
"logical plan is: \n {}",
PlanNodeUtil.nodeToString(this.logicalPlan.getRootNode()));
}
// check timeout after building logical plan because it could be
time-consuming in some cases.
@@ -367,10 +279,9 @@ public class QueryExecution implements IQueryExecution {
// Generate the distributed plan and split it into fragments
public void doDistributedPlan() {
long startTime = System.nanoTime();
- DistributionPlanner planner = new DistributionPlanner(this.analysis,
this.logicalPlan);
- this.distributedPlan = planner.planFragments();
+ this.distributedPlan = planner.doDistributionPlan(analysis, logicalPlan);
- if (rawStatement.isQuery()) {
+ if (analysis.isQuery()) {
long distributionPlanCost = System.nanoTime() - startTime;
context.setDistributionPlanCost(distributionPlanCost);
QUERY_PLAN_COST_METRIC_SET.recordPlanCost(DISTRIBUTION_PLANNER,
distributionPlanCost);
@@ -378,12 +289,12 @@ public class QueryExecution implements IQueryExecution {
// if is this Statement is ShowQueryStatement, set its instances to the
highest priority, so
// that the sub-tasks of the ShowQueries instances could be executed first.
- if (StatementType.SHOW_QUERIES.equals(rawStatement.getType())) {
+ if (analysis.needSetHighestPriority()) {
distributedPlan.getInstances().forEach(instance ->
instance.setHighestPriority(true));
}
- if (isQuery() && logger.isDebugEnabled()) {
- logger.debug(
+ if (isQuery() && LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
"distribution plan done. Fragment instance count is {}, details is:
\n {}",
distributedPlan.getInstances().size(),
printFragmentInstances(distributedPlan.getInstances()));
@@ -493,7 +404,7 @@ public class QueryExecution implements IQueryExecution {
while (true) {
try {
if (resultHandle.isAborted()) {
- logger.warn("[ResultHandleAborted]");
+ LOGGER.warn("[ResultHandleAborted]");
stateMachine.transitionToAborted();
if (stateMachine.getFailureStatus() != null) {
throw new IoTDBException(
@@ -504,7 +415,7 @@ public class QueryExecution implements IQueryExecution {
TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
}
} else if (resultHandle.isFinished()) {
- logger.debug("[ResultHandleFinished]");
+ LOGGER.debug("[ResultHandleFinished]");
stateMachine.transitionToFinished();
return Optional.empty();
}
@@ -695,41 +606,9 @@ public class QueryExecution implements IQueryExecution {
// collect redirect info to client for writing
// if 0.13_data_insert_adapt is true and ClientVersion is NOT V_1_0, stop
returning redirect
// info to client
- if (analysis.getStatement() instanceof InsertBaseStatement
- && !analysis.isFinishQueryAfterAnalyze()
- && (!config.isEnable13DataInsertAdapt()
- ||
IoTDBConstant.ClientVersion.V_1_0.equals(context.getSession().getVersion()))) {
- InsertBaseStatement insertStatement = (InsertBaseStatement)
analysis.getStatement();
- List<TEndPoint> redirectNodeList = analysis.getRedirectNodeList();
- if (insertStatement instanceof InsertRowsStatement
- || insertStatement instanceof InsertMultiTabletsStatement) {
- // multiple devices
- if (statusCode == TSStatusCode.SUCCESS_STATUS) {
- boolean needRedirect = false;
- List<TSStatus> subStatus = new ArrayList<>();
- for (TEndPoint endPoint : redirectNodeList) {
- // redirect writing only if the redirectEndPoint is not the
current node
- if (!config.getAddressAndPort().equals(endPoint)) {
- subStatus.add(
-
RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS).setRedirectNode(endPoint));
- needRedirect = true;
- } else {
- subStatus.add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
- }
- }
- if (needRedirect) {
-
tsstatus.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode());
- tsstatus.setSubStatus(subStatus);
- }
- }
- } else {
- // single device
- TEndPoint redirectEndPoint = redirectNodeList.get(0);
- // redirect writing only if the redirectEndPoint is not the current
node
- if (!config.getAddressAndPort().equals(redirectEndPoint)) {
- tsstatus.setRedirectNode(redirectEndPoint);
- }
- }
+ if (!CONFIG.isEnable13DataInsertAdapt()
+ ||
IoTDBConstant.ClientVersion.V_1_0.equals(context.getSession().getVersion())) {
+ planner.setRedirectInfo(analysis, CONFIG.getAddressAndPort(), tsstatus,
statusCode);
}
return new ExecutionResult(context.getQueryId(), tsstatus);
@@ -739,10 +618,6 @@ public class QueryExecution implements IQueryExecution {
return distributedPlan;
}
- public LogicalQueryPlan getLogicalPlan() {
- return logicalPlan;
- }
-
@Override
public boolean isQuery() {
return context.getQueryType() == QueryType.READ;
@@ -774,8 +649,8 @@ public class QueryExecution implements IQueryExecution {
}
@Override
- public Statement getStatement() {
- return analysis.getStatement();
+ public String getStatementType() {
+ return analysis.getStatementType();
}
public MPPQueryContext getContext() {
@@ -787,6 +662,6 @@ public class QueryExecution implements IQueryExecution {
}
public ScheduledExecutorService getScheduledExecutor() {
- return scheduledExecutor;
+ return planner.getScheduledExecutorService();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
index 55ce059a20d..0fbef6973cb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
@@ -21,8 +21,6 @@ package org.apache.iotdb.db.queryengine.plan.execution.config;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
import org.apache.iotdb.db.queryengine.execution.QueryStateMachine;
@@ -31,7 +29,7 @@ import
org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
import
org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor;
import
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
-import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -57,7 +55,7 @@ public class ConfigExecution implements IQueryExecution {
private static final Logger LOGGER =
LoggerFactory.getLogger(ConfigExecution.class);
- private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
+ private static final TsBlockSerde serde = new TsBlockSerde();
private final MPPQueryContext context;
private final ExecutorService executor;
@@ -68,31 +66,29 @@ public class ConfigExecution implements IQueryExecution {
private DatasetHeader datasetHeader;
private boolean resultSetConsumed;
private final IConfigTask task;
- private IConfigTaskExecutor configTaskExecutor;
-
- private static final TsBlockSerde serde = new TsBlockSerde();
+ private final IConfigTaskExecutor configTaskExecutor;
- private Statement statement;
+ private final StatementType statementType;
private long totalExecutionTime;
- public ConfigExecution(MPPQueryContext context, Statement statement,
ExecutorService executor) {
+ public ConfigExecution(
+ MPPQueryContext context,
+ StatementType statementType,
+ ExecutorService executor,
+ IConfigTask task) {
this.context = context;
- this.statement = statement;
+ this.statementType = statementType;
this.executor = executor;
this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
this.taskFuture = SettableFuture.create();
- this.task = statement.accept(new ConfigTaskVisitor(), context);
+ this.task = task;
this.resultSetConsumed = false;
configTaskExecutor = ClusterConfigTaskExecutor.getInstance();
}
@TestOnly
public ConfigExecution(MPPQueryContext context, ExecutorService executor,
IConfigTask task) {
- this.context = context;
- this.executor = executor;
- this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
- this.taskFuture = SettableFuture.create();
- this.task = task;
+ this(context, StatementType.NULL, executor, task);
}
@Override
@@ -249,7 +245,7 @@ public class ConfigExecution implements IQueryExecution {
}
@Override
- public Statement getStatement() {
- return statement;
+ public String getStatementType() {
+ return statementType.name();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index db8ed28298f..287a9157f3f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -1938,7 +1938,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
createLogicalViewStatement.setViewExpressions(Collections.singletonList(viewExpression));
ExecutionResult executionResult =
Coordinator.getInstance()
- .execute(
+ .executeForTreeModel(
createLogicalViewStatement,
0,
null,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java
new file mode 100644
index 00000000000..616cca3efb7
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.execution.QueryStateMachine;
+import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+public interface IPlanner {
+
+ IAnalysis analyze(MPPQueryContext context);
+
+ LogicalQueryPlan doLogicalPlan(IAnalysis analysis, MPPQueryContext context);
+
+ DistributedQueryPlan doDistributionPlan(IAnalysis analysis, LogicalQueryPlan
logicalPlan);
+
+ IScheduler doSchedule(
+ IAnalysis analysis,
+ DistributedQueryPlan distributedPlan,
+ MPPQueryContext context,
+ QueryStateMachine stateMachine);
+
+ void invalidatePartitionCache();
+
+ ScheduledExecutorService getScheduledExecutorService();
+
+ void setRedirectInfo(
+ IAnalysis analysis, TEndPoint localEndPoint, TSStatus tsstatus,
TSStatusCode statusCode);
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
new file mode 100644
index 00000000000..a42ff843f30
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.execution.QueryStateMachine;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analyzer;
+import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
+import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
+import
org.apache.iotdb.db.queryengine.plan.planner.distribution.DistributionPlanner;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.scheduler.ClusterScheduler;
+import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
+import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+
+public class TreeModelPlanner implements IPlanner {
+
+ private final Statement statement;
+
+ private final ExecutorService executor;
+ private final ExecutorService writeOperationExecutor;
+ private final ScheduledExecutorService scheduledExecutor;
+
+ private final IPartitionFetcher partitionFetcher;
+
+ private final ISchemaFetcher schemaFetcher;
+
+ private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+ syncInternalServiceClientManager;
+
+ private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
+ asyncInternalServiceClientManager;
+
+ public TreeModelPlanner(
+ Statement statement,
+ ExecutorService executor,
+ ExecutorService writeOperationExecutor,
+ ScheduledExecutorService scheduledExecutor,
+ IPartitionFetcher partitionFetcher,
+ ISchemaFetcher schemaFetcher,
+ IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
syncInternalServiceClientManager,
+ IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
+ asyncInternalServiceClientManager) {
+ this.statement = statement;
+ this.executor = executor;
+ this.writeOperationExecutor = writeOperationExecutor;
+ this.scheduledExecutor = scheduledExecutor;
+ this.partitionFetcher = partitionFetcher;
+ this.schemaFetcher = schemaFetcher;
+ this.syncInternalServiceClientManager = syncInternalServiceClientManager;
+ this.asyncInternalServiceClientManager = asyncInternalServiceClientManager;
+ }
+
+ @Override
+ public IAnalysis analyze(MPPQueryContext context) {
+ return new Analyzer(context, partitionFetcher,
schemaFetcher).analyze(statement);
+ }
+
+ @Override
+ public LogicalQueryPlan doLogicalPlan(IAnalysis analysis, MPPQueryContext
context) {
+ LogicalPlanner logicalPlanner = new LogicalPlanner(context);
+ return logicalPlanner.plan((Analysis) analysis);
+ }
+
+ @Override
+ public DistributedQueryPlan doDistributionPlan(IAnalysis analysis,
LogicalQueryPlan logicalPlan) {
+ DistributionPlanner planner = new DistributionPlanner((Analysis) analysis,
logicalPlan);
+ return planner.planFragments();
+ }
+
+ @Override
+ public IScheduler doSchedule(
+ IAnalysis analysis,
+ DistributedQueryPlan distributedPlan,
+ MPPQueryContext context,
+ QueryStateMachine stateMachine) {
+ IScheduler scheduler;
+
+ boolean isPipeEnrichedTsFileLoad =
+ statement instanceof PipeEnrichedStatement
+ && ((PipeEnrichedStatement) statement).getInnerStatement()
+ instanceof LoadTsFileStatement;
+ if (statement instanceof LoadTsFileStatement || isPipeEnrichedTsFileLoad) {
+ scheduler =
+ new LoadTsFileScheduler(
+ distributedPlan,
+ context,
+ stateMachine,
+ syncInternalServiceClientManager,
+ partitionFetcher,
+ isPipeEnrichedTsFileLoad);
+ } else {
+ scheduler =
+ new ClusterScheduler(
+ context,
+ stateMachine,
+ distributedPlan.getInstances(),
+ context.getQueryType(),
+ executor,
+ writeOperationExecutor,
+ scheduledExecutor,
+ syncInternalServiceClientManager,
+ asyncInternalServiceClientManager);
+ }
+
+ scheduler.start();
+ return scheduler;
+ }
+
+ @Override
+ public void invalidatePartitionCache() {
+ partitionFetcher.invalidAllCache();
+ }
+
+ @Override
+ public ScheduledExecutorService getScheduledExecutorService() {
+ return scheduledExecutor;
+ }
+
+ @Override
+ public void setRedirectInfo(
+ IAnalysis iAnalysis, TEndPoint localEndPoint, TSStatus tsstatus,
TSStatusCode statusCode) {
+ Analysis analysis = (Analysis) iAnalysis;
+ if (analysis.getStatement() instanceof InsertBaseStatement
+ && !analysis.isFinishQueryAfterAnalyze()) {
+ InsertBaseStatement insertStatement = (InsertBaseStatement)
analysis.getStatement();
+ List<TEndPoint> redirectNodeList = analysis.getRedirectNodeList();
+ if (insertStatement instanceof InsertRowsStatement
+ || insertStatement instanceof InsertMultiTabletsStatement) {
+ // multiple devices
+ if (statusCode == TSStatusCode.SUCCESS_STATUS) {
+ boolean needRedirect = false;
+ List<TSStatus> subStatus = new ArrayList<>();
+ for (TEndPoint endPoint : redirectNodeList) {
+ // redirect writing only if the redirectEndPoint is not the
current node
+ if (!localEndPoint.equals(endPoint)) {
+ subStatus.add(
+
RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS).setRedirectNode(endPoint));
+ needRedirect = true;
+ } else {
+ subStatus.add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+ }
+ }
+ if (needRedirect) {
+
tsstatus.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode());
+ tsstatus.setSubStatus(subStatus);
+ }
+ }
+ } else {
+ // single device
+ TEndPoint redirectEndPoint = redirectNodeList.get(0);
+ // redirect writing only if the redirectEndPoint is not the current
node
+ if (!localEndPoint.equals(redirectEndPoint)) {
+ tsstatus.setRedirectNode(redirectEndPoint);
+ }
+ }
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 711ff6c25cc..156206d9f14 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -33,6 +33,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.IFragmentParallelPlaner;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.TreeModelTimePredicate;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
@@ -141,7 +142,7 @@ public class SimpleFragmentParallelPlanner implements
IFragmentParallelPlaner {
new FragmentInstance(
fragment,
fragment.getId().genFragmentInstanceId(),
- globalTimePredicate,
+ globalTimePredicate == null ? null : new
TreeModelTimePredicate(globalTimePredicate),
queryContext.getQueryType(),
queryContext.getTimeOut(),
queryContext.getSession(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
index 36cb4ecc9d1..6fb32419a4b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.IFragmentParallelPlaner;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.TreeModelTimePredicate;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
@@ -61,7 +62,7 @@ public class WriteFragmentParallelPlanner implements
IFragmentParallelPlaner {
new FragmentInstance(
new PlanFragment(fragment.getId(), split),
fragment.getId().genFragmentInstanceId(),
- globalTimePredicate,
+ globalTimePredicate == null ? null : new
TreeModelTimePredicate(globalTimePredicate),
queryContext.getQueryType(),
queryContext.getTimeOut(),
queryContext.getSession());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/DistributedQueryPlan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/DistributedQueryPlan.java
index ca133cc5b8d..cedc99669a7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/DistributedQueryPlan.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/DistributedQueryPlan.java
@@ -23,10 +23,10 @@ import
org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import java.util.List;
public class DistributedQueryPlan {
- private MPPQueryContext context;
- private SubPlan rootSubPlan;
- private List<PlanFragment> fragments;
- private List<FragmentInstance> instances;
+ private final MPPQueryContext context;
+ private final SubPlan rootSubPlan;
+ private final List<PlanFragment> fragments;
+ private final List<FragmentInstance> instances;
public DistributedQueryPlan(
MPPQueryContext context,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
index 4c8204a2a22..4dd56beefa8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
-import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -46,9 +45,9 @@ import java.util.Objects;
public class FragmentInstance implements IConsensusRequest {
- private static final Logger logger =
LoggerFactory.getLogger(FragmentInstance.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FragmentInstance.class);
- private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
+ private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
private final FragmentInstanceId id;
private final QueryType type;
@@ -60,7 +59,7 @@ public class FragmentInstance implements IConsensusRequest {
private TDataNodeLocation hostDataNode;
- private final Expression globalTimePredicate;
+ private final TimePredicate globalTimePredicate;
private final long timeOut;
@@ -86,7 +85,7 @@ public class FragmentInstance implements IConsensusRequest {
public FragmentInstance(
PlanFragment fragment,
FragmentInstanceId id,
- Expression globalTimePredicate,
+ TimePredicate globalTimePredicate,
QueryType type,
long timeOut,
SessionInfo sessionInfo) {
@@ -94,7 +93,7 @@ public class FragmentInstance implements IConsensusRequest {
this.globalTimePredicate = globalTimePredicate;
this.id = id;
this.type = type;
- this.timeOut = timeOut > 0 ? timeOut : config.getQueryTimeoutThreshold();
+ this.timeOut = timeOut > 0 ? timeOut : CONFIG.getQueryTimeoutThreshold();
this.isRoot = false;
this.sessionInfo = sessionInfo;
}
@@ -102,7 +101,7 @@ public class FragmentInstance implements IConsensusRequest {
public FragmentInstance(
PlanFragment fragment,
FragmentInstanceId id,
- Expression globalTimePredicate,
+ TimePredicate globalTimePredicate,
QueryType type,
long timeOut,
SessionInfo sessionInfo,
@@ -116,7 +115,7 @@ public class FragmentInstance implements IConsensusRequest {
public FragmentInstance(
PlanFragment fragment,
FragmentInstanceId id,
- Expression globalTimePredicate,
+ TimePredicate globalTimePredicate,
QueryType type,
long timeOut,
SessionInfo sessionInfo,
@@ -172,7 +171,7 @@ public class FragmentInstance implements IConsensusRequest {
isHighestPriority = highestPriority;
}
- public Expression getGlobalTimePredicate() {
+ public TimePredicate getGlobalTimePredicate() {
return globalTimePredicate;
}
@@ -214,7 +213,7 @@ public class FragmentInstance implements IConsensusRequest {
boolean hasSessionInfo = ReadWriteIOUtils.readBool(buffer);
SessionInfo sessionInfo = hasSessionInfo ?
SessionInfo.deserializeFrom(buffer) : null;
boolean hasTimePredicate = ReadWriteIOUtils.readBool(buffer);
- Expression globalTimePredicate = hasTimePredicate ?
Expression.deserialize(buffer) : null;
+ TimePredicate globalTimePredicate = hasTimePredicate ?
TimePredicate.deserialize(buffer) : null;
QueryType queryType = QueryType.values()[ReadWriteIOUtils.readInt(buffer)];
int dataNodeFINum = ReadWriteIOUtils.readInt(buffer);
FragmentInstance fragmentInstance =
@@ -239,7 +238,7 @@ public class FragmentInstance implements IConsensusRequest {
}
ReadWriteIOUtils.write(globalTimePredicate != null, outputStream);
if (globalTimePredicate != null) {
- Expression.serialize(globalTimePredicate, outputStream);
+ globalTimePredicate.serialize(outputStream);
}
ReadWriteIOUtils.write(type.ordinal(), outputStream);
ReadWriteIOUtils.write(dataNodeFINum, outputStream);
@@ -250,7 +249,7 @@ public class FragmentInstance implements IConsensusRequest {
ReadWriteIOUtils.write(isExplainAnalyze, outputStream);
return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
} catch (IOException e) {
- logger.error("Unexpected error occurs when serializing this
FragmentInstance.", e);
+ LOGGER.error("Unexpected error occurs when serializing this
FragmentInstance.", e);
throw new SerializationRunTimeException(e);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
index 7583b624b64..925bfe4416e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
@@ -38,7 +38,7 @@ import java.util.Objects;
/** PlanFragment contains a sub-query of distributed query. */
public class PlanFragment {
- // TODO once you add field for this class you need to change the serialize
and deserialize methods
+ // once you add field for this class you need to change the serialize and
deserialize methods
private final PlanFragmentId id;
private PlanNode planNodeTree;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/SubPlan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/SubPlan.java
index 0bceb53eb5c..909cf67acfe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/SubPlan.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/SubPlan.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
import java.util.List;
public class SubPlan {
- private PlanFragment planFragment;
+ private final PlanFragment planFragment;
private List<SubPlan> children;
public SubPlan(PlanFragment planFragment) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/TimePredicate.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/TimePredicate.java
new file mode 100644
index 00000000000..1de79f00809
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/TimePredicate.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan;
+
+import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public interface TimePredicate {
+
+ void serialize(DataOutputStream stream) throws IOException;
+
+ Filter convertPredicateToTimeFilter();
+
+ static TimePredicate deserialize(ByteBuffer byteBuffer) {
+ // TODO will return another kind of TimePredicate like TableModelTimePredicate in the future
+ return new TreeModelTimePredicate(Expression.deserialize(byteBuffer));
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/TreeModelTimePredicate.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/TreeModelTimePredicate.java
new file mode 100644
index 00000000000..139deed980c
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/TreeModelTimePredicate.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan;
+
+import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils;
+import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Objects;
+
+public class TreeModelTimePredicate implements TimePredicate {
+
+ private final Expression timePredicate;
+
+ public TreeModelTimePredicate(Expression timePredicate) {
+ this.timePredicate = timePredicate;
+ }
+
+ @Override
+ public void serialize(DataOutputStream stream) throws IOException {
+ Expression.serialize(timePredicate, stream);
+ }
+
+ @Override
+ public Filter convertPredicateToTimeFilter() {
+ return PredicateUtils.convertPredicateToTimeFilter(timePredicate);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TreeModelTimePredicate that = (TreeModelTimePredicate) o;
+ return Objects.equals(timePredicate, that.timePredicate);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(timePredicate);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java
index e5afac118d4..87dc13ecd2e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java
@@ -180,7 +180,8 @@ public class IoTDBInternalLocalReporter extends
IoTDBInternalReporter {
InsertRowStatement s = StatementGenerator.createStatement(request);
final long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
- COORDINATOR.execute(s, queryId, sessionInfo, "", partitionFetcher, schemaFetcher);
+ COORDINATOR.executeForTreeModel(
+ s, queryId, sessionInfo, "", partitionFetcher, schemaFetcher);
if (result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.error("Failed to update the value of metric with status {}", result.status);
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
index 66e65b37150..61ec16a35da 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java
@@ -48,7 +48,6 @@ import
org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
-import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem;
@@ -1810,7 +1809,7 @@ public class MergeSortOperatorTest {
}
@Override
- public Statement getStatement() {
+ public String getStatementType() {
return null;
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/FragmentInstanceSerdeTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/FragmentInstanceSerdeTest.java
index d19e8c29c0e..cc3363a097a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/FragmentInstanceSerdeTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/FragmentInstanceSerdeTest.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
import org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.TreeModelTimePredicate;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode;
@@ -71,7 +72,7 @@ public class FragmentInstanceSerdeTest {
new FragmentInstance(
new PlanFragment(planFragmentId, constructPlanNodeTree()),
planFragmentId.genFragmentInstanceId(),
- ExpressionFactory.groupByTime(1, 2, 3, 4),
+ new TreeModelTimePredicate(ExpressionFactory.groupByTime(1, 2, 3, 4)),
QueryType.READ,
config.getQueryTimeoutThreshold(),
sessionInfo);