This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/Analyze in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2769b988d3b562176e84acf5c1db3094dc88b1b5 Author: JackieTien97 <[email protected]> AuthorDate: Tue Mar 12 17:18:47 2024 +0800 Refactor IQueryExecution to support Table Model in the future --- .../confignode1conf/iotdb-common.properties | 2 +- .../confignode2conf/iotdb-common.properties | 2 +- .../confignode3conf/iotdb-common.properties | 2 +- .../org/apache/iotdb/db/audit/AuditLogger.java | 2 +- .../protocol/writeback/WriteBackConnector.java | 2 +- .../legacy/IoTDBLegacyPipeReceiverAgent.java | 2 +- .../receiver/legacy/loader/DeletionLoader.java | 2 +- .../pipe/receiver/legacy/loader/TsFileLoader.java | 2 +- .../receiver/thrift/IoTDBDataNodeReceiver.java | 2 +- .../db/protocol/client/DataNodeInternalClient.java | 2 +- .../iotdb/db/protocol/mqtt/MPPPublishHandler.java | 2 +- .../rest/v1/impl/GrafanaApiServiceImpl.java | 6 +- .../protocol/rest/v1/impl/RestApiServiceImpl.java | 6 +- .../rest/v2/impl/GrafanaApiServiceImpl.java | 6 +- .../protocol/rest/v2/impl/RestApiServiceImpl.java | 8 +- .../protocol/thrift/impl/ClientRPCServiceImpl.java | 106 ++++++----- .../impl/DataNodeInternalRPCServiceImpl.java | 2 +- .../db/queryengine/common/MPPQueryContext.java | 2 +- .../fragment/FragmentInstanceContext.java | 9 +- .../iotdb/db/queryengine/plan/Coordinator.java | 107 ++++++----- .../db/queryengine/plan/analyze/Analysis.java | 45 ++++- .../IAnalysis.java} | 46 ++--- .../plan/analyze/LoadTsfileAnalyzer.java | 2 +- .../analyze/schema/AutoCreateSchemaExecutor.java | 2 +- .../analyze/schema/ClusterSchemaFetchExecutor.java | 2 +- .../plan/execution/IQueryExecution.java | 3 +- .../queryengine/plan/execution/QueryExecution.java | 209 +++++---------------- .../plan/execution/config/ConfigExecution.java | 32 ++-- .../config/executor/ClusterConfigTaskExecutor.java | 2 +- .../db/queryengine/plan/planner/IPlanner.java | 54 ++++++ .../queryengine/plan/planner/TreeModelPlanner.java | 197 +++++++++++++++++++ .../SimpleFragmentParallelPlanner.java | 3 +- 
.../distribution/WriteFragmentParallelPlanner.java | 3 +- .../plan/planner/plan/DistributedQueryPlan.java | 8 +- .../plan/planner/plan/FragmentInstance.java | 23 ++- .../plan/planner/plan/PlanFragment.java | 2 +- .../db/queryengine/plan/planner/plan/SubPlan.java | 2 +- .../plan/planner/plan/TimePredicate.java | 39 ++++ .../plan/planner/plan/TreeModelTimePredicate.java | 64 +++++++ .../metrics/IoTDBInternalLocalReporter.java | 3 +- .../execution/operator/MergeSortOperatorTest.java | 3 +- .../plan/planner/FragmentInstanceSerdeTest.java | 3 +- 42 files changed, 645 insertions(+), 376 deletions(-) diff --git a/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-common.properties b/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-common.properties index 8981b21285b..c69c63010db 100644 --- a/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-common.properties +++ b/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-common.properties @@ -21,7 +21,7 @@ timestamp_precision=ms data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensus schema_region_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus schema_replication_factor=3 -data_replication_factor=3 +data_replication_factor=2 udf_lib_dir=target/confignode1/ext/udf trigger_lib_dir=target/confignode1/ext/trigger pipe_lib_dir=target/confignode1/ext/pipe diff --git a/iotdb-core/confignode/src/test/resources/confignode2conf/iotdb-common.properties b/iotdb-core/confignode/src/test/resources/confignode2conf/iotdb-common.properties index a9789fedabb..219c2ff4d28 100644 --- a/iotdb-core/confignode/src/test/resources/confignode2conf/iotdb-common.properties +++ b/iotdb-core/confignode/src/test/resources/confignode2conf/iotdb-common.properties @@ -21,7 +21,7 @@ timestamp_precision=ms data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensus 
schema_region_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus schema_replication_factor=3 -data_replication_factor=3 +data_replication_factor=2 udf_lib_dir=target/confignode2/ext/udf trigger_lib_dir=target/confignode2/ext/trigger pipe_lib_dir=target/confignode2/ext/pipe diff --git a/iotdb-core/confignode/src/test/resources/confignode3conf/iotdb-common.properties b/iotdb-core/confignode/src/test/resources/confignode3conf/iotdb-common.properties index 6a95388bd72..3ad8b13bac4 100644 --- a/iotdb-core/confignode/src/test/resources/confignode3conf/iotdb-common.properties +++ b/iotdb-core/confignode/src/test/resources/confignode3conf/iotdb-common.properties @@ -21,7 +21,7 @@ timestamp_precision=ms data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensus schema_region_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus schema_replication_factor=3 -data_replication_factor=3 +data_replication_factor=2 udf_lib_dir=target/confignode3/ext/udf trigger_lib_dir=target/confignode3/ext/trigger pipe_lib_dir=target/confignode3/ext/pipe diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java index a08142e13ef..645dfbf016d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java @@ -115,7 +115,7 @@ public class AuditLogger { if (auditLogOperationList.contains(operation)) { if (auditLogStorageList.contains(AuditLogStorage.IOTDB)) { try { - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( generateInsertStatement(log, address, username), SESSION_MANAGER.requestQueryId(), sessionInfo, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java index 471474148b8..4b9437372ca 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java @@ -154,7 +154,7 @@ public class WriteBackConnector implements PipeConnector { private TSStatus executeStatement(InsertBaseStatement statement) { return Coordinator.getInstance() - .execute( + .executeForTreeModel( new PipeEnrichedStatement(statement), SessionManager.getInstance().requestQueryId(), new SessionInfo(0, AuthorityChecker.SUPER_USER, ZoneId.systemDefault()), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java index 56180f75b77..fe0b2ac4073 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.java @@ -136,7 +136,7 @@ public class IoTDBLegacyPipeReceiverAgent { long queryId = SessionManager.getInstance().requestQueryId(); ExecutionResult result = Coordinator.getInstance() - .execute( + .executeForTreeModel( statement, queryId, new SessionInfo(0, AuthorityChecker.SUPER_USER, ZoneId.systemDefault()), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/DeletionLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/DeletionLoader.java index a3f08af6413..f5711a826ab 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/DeletionLoader.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/DeletionLoader.java @@ -61,7 +61,7 @@ public class DeletionLoader implements ILoader { long queryId = SessionManager.getInstance().requestQueryId(); ExecutionResult result = Coordinator.getInstance() - .execute( + .executeForTreeModel( statement, queryId, new SessionInfo(0, AuthorityChecker.SUPER_USER, ZoneId.systemDefault()), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/TsFileLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/TsFileLoader.java index 6bcbe95c76f..080f5f105ec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/TsFileLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/legacy/loader/TsFileLoader.java @@ -63,7 +63,7 @@ public class TsFileLoader implements ILoader { long queryId = SessionManager.getInstance().requestQueryId(); ExecutionResult result = Coordinator.getInstance() - .execute( + .executeForTreeModel( statement, queryId, new SessionInfo(0, AuthorityChecker.SUPER_USER, ZoneId.systemDefault()), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBDataNodeReceiver.java index 0b99a5859ed..e01e6ff1e59 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBDataNodeReceiver.java @@ -321,7 +321,7 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { final ExecutionResult result = Coordinator.getInstance() - .execute( + .executeForTreeModel( statement, SessionManager.getInstance().requestQueryId(), new SessionInfo(0, AuthorityChecker.SUPER_USER, ZoneId.systemDefault()), diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeInternalClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeInternalClient.java index 925a4b450f7..cf1250aaecb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeInternalClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeInternalClient.java @@ -84,7 +84,7 @@ public class DataNodeInternalClient { // call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(session), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java index 3c0f3c05726..86c430ac0a5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java @@ -162,7 +162,7 @@ public class MPPPublishHandler extends AbstractInterceptHandler { long queryId = sessionManager.requestQueryId(); ExecutionResult result = Coordinator.getInstance() - .execute( + .executeForTreeModel( statement, queryId, sessionManager.getSessionInfo(session), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/GrafanaApiServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/GrafanaApiServiceImpl.java index 40bab03fe38..07c735faa7f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/GrafanaApiServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/GrafanaApiServiceImpl.java @@ -110,7 +110,7 @@ public class GrafanaApiServiceImpl extends GrafanaApiService { queryId = 
SESSION_MANAGER.requestQueryId(); // create and cache dataset ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()), @@ -177,7 +177,7 @@ public class GrafanaApiServiceImpl extends GrafanaApiService { queryId = SESSION_MANAGER.requestQueryId(); // create and cache dataset ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()), @@ -239,7 +239,7 @@ public class GrafanaApiServiceImpl extends GrafanaApiService { queryId = SESSION_MANAGER.requestQueryId(); // create and cache dataset ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/RestApiServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/RestApiServiceImpl.java index 47433bb5a68..428f2c3baaa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/RestApiServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/RestApiServiceImpl.java @@ -97,7 +97,7 @@ public class RestApiServiceImpl extends RestApiService { } queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()), @@ -151,7 +151,7 @@ public class RestApiServiceImpl extends RestApiService { queryId = SESSION_MANAGER.requestQueryId(); // create and cache dataset ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()), @@ -211,7 +211,7 @@ public class 
RestApiServiceImpl extends RestApiService { } queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( insertTabletStatement, queryId, SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/GrafanaApiServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/GrafanaApiServiceImpl.java index 3025cd83ba1..e3385885a01 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/GrafanaApiServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/GrafanaApiServiceImpl.java @@ -110,7 +110,7 @@ public class GrafanaApiServiceImpl extends GrafanaApiService { queryId = SESSION_MANAGER.requestQueryId(); // create and cache dataset ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()), @@ -177,7 +177,7 @@ public class GrafanaApiServiceImpl extends GrafanaApiService { queryId = SESSION_MANAGER.requestQueryId(); // create and cache dataset ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()), @@ -239,7 +239,7 @@ public class GrafanaApiServiceImpl extends GrafanaApiService { queryId = SESSION_MANAGER.requestQueryId(); // create and cache dataset ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java index 5cace589391..c0029b913eb 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java @@ -99,7 +99,7 @@ public class RestApiServiceImpl extends RestApiService { } queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()), @@ -154,7 +154,7 @@ public class RestApiServiceImpl extends RestApiService { queryId = SESSION_MANAGER.requestQueryId(); // create and cache dataset ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()), @@ -203,7 +203,7 @@ public class RestApiServiceImpl extends RestApiService { } queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( insertRowsStatement, SESSION_MANAGER.requestQueryId(), SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()), @@ -273,7 +273,7 @@ public class RestApiServiceImpl extends RestApiService { } queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( insertTabletStatement, SESSION_MANAGER.requestQueryId(), SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index fec0bbd87ec..5612d8777e1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -302,7 +302,7 @@ public class ClientRPCServiceImpl 
implements IClientRPCServiceWithHandler { queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId); // create and cache dataset ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( s, queryId, SESSION_MANAGER.getSessionInfo(clientSession), @@ -347,7 +347,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // record each operation time cost if (statementType != null) { addStatementExecutionLatency( - OperationType.EXECUTE_QUERY_STATEMENT, statementType, currentOperationCost); + OperationType.EXECUTE_QUERY_STATEMENT, statementType.name(), currentOperationCost); } if (finished) { @@ -393,7 +393,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId); // create and cache dataset ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( s, queryId, SESSION_MANAGER.getSessionInfo(clientSession), @@ -435,7 +435,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // record each operation time cost addStatementExecutionLatency( - OperationType.EXECUTE_RAW_DATA_QUERY, StatementType.QUERY, currentOperationCost); + OperationType.EXECUTE_RAW_DATA_QUERY, StatementType.QUERY.name(), currentOperationCost); if (finished) { // record total time cost for one query @@ -481,7 +481,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId); // create and cache dataset ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( s, queryId, SESSION_MANAGER.getSessionInfo(clientSession), @@ -525,7 +525,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // record each operation time cost addStatementExecutionLatency( - OperationType.EXECUTE_LAST_DATA_QUERY, StatementType.QUERY, currentOperationCost); + 
OperationType.EXECUTE_LAST_DATA_QUERY, StatementType.QUERY.name(), currentOperationCost); if (finished) { // record total time cost for one query @@ -568,7 +568,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId); // create and cache dataset ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( s, queryId, SESSION_MANAGER.getSessionInfo(clientSession), @@ -612,7 +612,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // record each operation time cost addStatementExecutionLatency( - OperationType.EXECUTE_AGG_QUERY, StatementType.QUERY, currentOperationCost); + OperationType.EXECUTE_AGG_QUERY, StatementType.QUERY.name(), currentOperationCost); if (finished) { // record total time cost for one query @@ -895,7 +895,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } // create and cache dataset ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( s, queryId, SESSION_MANAGER.getSessionInfo(clientSession), @@ -946,7 +946,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // record each operation time cost addStatementExecutionLatency( - OperationType.EXECUTE_LAST_DATA_QUERY, StatementType.QUERY, currentOperationCost); + OperationType.EXECUTE_LAST_DATA_QUERY, StatementType.QUERY.name(), currentOperationCost); if (finished) { // record total time cost for one query @@ -1051,7 +1051,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { public TSFetchResultsResp fetchResultsV2(TSFetchResultsReq req) { long startTime = System.nanoTime(); boolean finished = false; - StatementType statementType = null; + String statementType = null; Throwable t = null; try { IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); @@ -1067,7 +1067,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler { resp.setMoreData(false); return resp; } - statementType = queryExecution.getStatement().getType(); + statementType = queryExecution.getStatementType(); try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) { Pair<List<ByteBuffer>, Boolean> pair = @@ -1228,7 +1228,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(clientSession), @@ -1269,7 +1269,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(clientSession), @@ -1319,7 +1319,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(clientSession), @@ -1368,7 +1368,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(clientSession), @@ -1408,7 +1408,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(clientSession), @@ -1451,7 +1451,7 @@ public class 
ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(clientSession), @@ -1495,7 +1495,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { for (int i = 0; i < req.getStatements().size(); i++) { String statement = req.getStatements().get(i); long t2 = System.nanoTime(); - StatementType type = null; + String type = null; OperationQuota quota = null; try { Statement s = StatementGenerator.createStatement(statement, clientSession.getZoneId()); @@ -1518,10 +1518,10 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } long queryId = SESSION_MANAGER.requestQueryId(); - type = s.getType(); + type = s.getType() == null ? null : s.getType().name(); // create and cache dataset ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( s, queryId, SESSION_MANAGER.getSessionInfo(clientSession), @@ -1549,7 +1549,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } } finally { addStatementExecutionLatency( - OperationType.EXECUTE_BATCH_STATEMENT, StatementType.NULL, System.nanoTime() - t1); + OperationType.EXECUTE_BATCH_STATEMENT, StatementType.NULL.name(), System.nanoTime() - t1); SESSION_MANAGER.updateIdleTime(); } return isAllSuccessful @@ -1571,7 +1571,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { public TSFetchResultsResp fetchResults(TSFetchResultsReq req) { boolean finished = false; long startTime = System.nanoTime(); - StatementType statementType = null; + String statementType = null; Throwable t = null; try { IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); @@ -1587,7 +1587,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { 
resp.setMoreData(true); return resp; } - statementType = queryExecution.getStatement().getType(); + statementType = queryExecution.getStatementType(); try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) { Pair<TSQueryDataSet, Boolean> pair = @@ -1672,7 +1672,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(clientSession), @@ -1688,7 +1688,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { e, OperationType.INSERT_RECORDS, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { addStatementExecutionLatency( - OperationType.INSERT_RECORDS, StatementType.BATCH_INSERT_ROWS, System.nanoTime() - t1); + OperationType.INSERT_RECORDS, + StatementType.BATCH_INSERT_ROWS.name(), + System.nanoTime() - t1); SESSION_MANAGER.updateIdleTime(); if (quota != null) { quota.close(); @@ -1739,7 +1741,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(clientSession), @@ -1756,7 +1758,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } finally { addStatementExecutionLatency( OperationType.INSERT_RECORDS_OF_ONE_DEVICE, - StatementType.BATCH_INSERT_ONE_DEVICE, + StatementType.BATCH_INSERT_ONE_DEVICE.name(), System.nanoTime() - t1); SESSION_MANAGER.updateIdleTime(); if (quota != null) { @@ -1807,7 +1809,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.execute( + 
COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(clientSession), @@ -1827,7 +1829,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } finally { addStatementExecutionLatency( OperationType.INSERT_STRING_RECORDS_OF_ONE_DEVICE, - StatementType.BATCH_INSERT_ONE_DEVICE, + StatementType.BATCH_INSERT_ONE_DEVICE.name(), System.nanoTime() - t1); SESSION_MANAGER.updateIdleTime(); if (quota != null) { @@ -1876,7 +1878,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(clientSession), @@ -1892,7 +1894,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { e, OperationType.INSERT_RECORD, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { addStatementExecutionLatency( - OperationType.INSERT_RECORD, StatementType.INSERT, System.nanoTime() - t1); + OperationType.INSERT_RECORD, StatementType.INSERT.name(), System.nanoTime() - t1); SESSION_MANAGER.updateIdleTime(); if (quota != null) { quota.close(); @@ -1933,7 +1935,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(clientSession), @@ -1949,7 +1951,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { e, OperationType.INSERT_TABLETS, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { addStatementExecutionLatency( - OperationType.INSERT_TABLETS, StatementType.MULTI_BATCH_INSERT, System.nanoTime() - t1); + OperationType.INSERT_TABLETS, + StatementType.MULTI_BATCH_INSERT.name(), + System.nanoTime() - t1); 
SESSION_MANAGER.updateIdleTime(); if (quota != null) { quota.close(); @@ -1989,7 +1993,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(clientSession), @@ -2005,7 +2009,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { e, OperationType.INSERT_TABLET, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { addStatementExecutionLatency( - OperationType.INSERT_TABLET, StatementType.BATCH_INSERT, System.nanoTime() - t1); + OperationType.INSERT_TABLET, StatementType.BATCH_INSERT.name(), System.nanoTime() - t1); SESSION_MANAGER.updateIdleTime(); if (quota != null) { quota.close(); @@ -2054,7 +2058,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(clientSession), @@ -2071,7 +2075,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } finally { addStatementExecutionLatency( OperationType.INSERT_STRING_RECORDS, - StatementType.BATCH_INSERT_ROWS, + StatementType.BATCH_INSERT_ROWS.name(), System.nanoTime() - t1); SESSION_MANAGER.updateIdleTime(); if (quota != null) { @@ -2140,7 +2144,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(clientSession), @@ -2204,7 +2208,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - 
COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(clientSession), @@ -2308,7 +2312,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { long queryId = SESSION_MANAGER.requestQueryId(); // create and cache dataset ExecutionResult executionResult = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(clientSession), @@ -2352,7 +2356,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { return null; } finally { addStatementExecutionLatency( - OperationType.EXECUTE_STATEMENT, statement.getType(), System.nanoTime() - startTime); + OperationType.EXECUTE_STATEMENT, + statement.getType().name(), + System.nanoTime() - startTime); SESSION_MANAGER.updateIdleTime(); } } @@ -2385,7 +2391,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(clientSession), @@ -2433,7 +2439,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(clientSession), @@ -2478,7 +2484,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(clientSession), @@ -2522,7 +2528,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = 
SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(clientSession), @@ -2619,7 +2625,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( statement, queryId, SESSION_MANAGER.getSessionInfo(clientSession), @@ -2635,7 +2641,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { e, OperationType.INSERT_STRING_RECORD, TSStatusCode.EXECUTE_STATEMENT_ERROR); } finally { addStatementExecutionLatency( - OperationType.INSERT_STRING_RECORD, StatementType.INSERT, System.nanoTime() - t1); + OperationType.INSERT_STRING_RECORD, StatementType.INSERT.name(), System.nanoTime() - t1); SESSION_MANAGER.updateIdleTime(); if (quota != null) { quota.close(); @@ -2682,7 +2688,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { /** Add stat of operation into metrics */ private void addStatementExecutionLatency( - OperationType operation, StatementType statementType, long costTime) { + OperationType operation, String statementType, long costTime) { if (statementType == null) { return; } @@ -2696,7 +2702,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { Tag.INTERFACE.toString(), operation.toString(), Tag.TYPE.toString(), - statementType.name()); + statementType); } private String checkIdentifierAndRemoveBackQuotesIfNecessary(String identifier) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 76f023f5907..d7b6804ea3c 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -1099,7 +1099,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface SESSION_MANAGER.requestQueryId(session, SESSION_MANAGER.requestStatementId(session)); // Create and cache dataset ExecutionResult result = - COORDINATOR.execute( + COORDINATOR.executeForTreeModel( s, queryId, SESSION_MANAGER.getSessionInfo(session), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java index 4d7e5a4512f..0e50966d864 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java @@ -38,7 +38,7 @@ import java.util.List; */ public class MPPQueryContext { private String sql; - private QueryId queryId; + private final QueryId queryId; // LocalQueryId is kept to adapt to the old client, it's unique in current datanode. // Now it's only be used by EXPLAIN ANALYZE to get queryExecution. 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index 8c74170cef3..0aa6ba1deff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -27,8 +27,7 @@ import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.common.SessionInfo; import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet; import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet; -import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils; -import org.apache.iotdb.db.queryengine.plan.expression.Expression; +import org.apache.iotdb.db.queryengine.plan.planner.plan.TimePredicate; import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; @@ -132,7 +131,7 @@ public class FragmentInstanceContext extends QueryContext { FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo, IDataRegionForQuery dataRegion, - Expression globalTimePredicate, + TimePredicate globalTimePredicate, Map<QueryId, DataNodeQueryContext> dataNodeQueryContextMap) { FragmentInstanceContext instanceContext = new FragmentInstanceContext( @@ -167,14 +166,14 @@ public class FragmentInstanceContext extends QueryContext { FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo, IDataRegionForQuery dataRegion, - Expression globalTimePredicate, + TimePredicate globalTimePredicate, Map<QueryId, DataNodeQueryContext> dataNodeQueryContextMap) { this.id = id; this.stateMachine = stateMachine; 
this.executionEndTime.set(END_TIME_INITIAL_VALUE); this.sessionInfo = sessionInfo; this.dataRegion = dataRegion; - this.globalTimeFilter = PredicateUtils.convertPredicateToTimeFilter(globalTimePredicate); + this.globalTimeFilter = globalTimePredicate.convertPredicateToTimeFilter(); this.dataNodeQueryContextMap = dataNodeQueryContextMap; this.dataNodeQueryContext = dataNodeQueryContextMap.get(id.getQueryId()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index 327008e9c2a..8e9cedf2862 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -41,6 +41,8 @@ import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; import org.apache.iotdb.db.queryengine.plan.execution.QueryExecution; import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigExecution; +import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskVisitor; +import org.apache.iotdb.db.queryengine.plan.planner.TreeModelPlanner; import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.utils.SetThreadName; @@ -53,6 +55,7 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.function.BiFunction; import static org.apache.iotdb.commons.utils.StatusUtils.needRetry; @@ -100,39 +103,11 @@ public class Coordinator { this.scheduledExecutor = getScheduledExecutor(); } - private IQueryExecution createQueryExecution( - Statement statement, - MPPQueryContext queryContext, - IPartitionFetcher 
partitionFetcher, - ISchemaFetcher schemaFetcher, - long timeOut, - long startTime) { - queryContext.setTimeOut(timeOut); - queryContext.setStartTime(startTime); - if (statement instanceof IConfigStatement) { - queryContext.setQueryType(((IConfigStatement) statement).getQueryType()); - return new ConfigExecution(queryContext, statement, executor); - } - return new QueryExecution( - statement, - queryContext, - executor, - writeOperationExecutor, - scheduledExecutor, - partitionFetcher, - schemaFetcher, - SYNC_INTERNAL_SERVICE_CLIENT_MANAGER, - ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER); - } - - public ExecutionResult execute( - Statement statement, + private ExecutionResult execution( long queryId, SessionInfo session, String sql, - IPartitionFetcher partitionFetcher, - ISchemaFetcher schemaFetcher, - long timeOut) { + BiFunction<MPPQueryContext, Long, IQueryExecution> iQueryExecutionFactory) { long startTime = System.currentTimeMillis(); QueryId globalQueryId = queryIdGenerator.createNextQueryId(); MPPQueryContext queryContext = null; @@ -148,14 +123,7 @@ public class Coordinator { session, DataNodeEndPoints.LOCAL_HOST_DATA_BLOCK_ENDPOINT, DataNodeEndPoints.LOCAL_HOST_INTERNAL_ENDPOINT); - IQueryExecution execution = - createQueryExecution( - statement, - queryContext, - partitionFetcher, - schemaFetcher, - timeOut > 0 ? 
timeOut : CONFIG.getQueryTimeoutThreshold(), - startTime); + IQueryExecution execution = iQueryExecutionFactory.apply(queryContext, startTime); if (execution.isQuery()) { queryExecutionMap.put(queryId, execution); } else { @@ -170,25 +138,78 @@ public class Coordinator { } return result; } finally { - int lockNums = queryContext.getAcquiredLockNum(); - if (queryContext != null && lockNums > 0) { - for (int i = 0; i < lockNums; i++) DataNodeSchemaCache.getInstance().releaseInsertLock(); + if (queryContext != null && queryContext.getAcquiredLockNum() > 0) { + for (int i = 0, lockNums = queryContext.getAcquiredLockNum(); i < lockNums; i++) { + DataNodeSchemaCache.getInstance().releaseInsertLock(); + } } } } /** This method is called by the write method. So it does not set the timeout parameter. */ - public ExecutionResult execute( + public ExecutionResult executeForTreeModel( Statement statement, long queryId, SessionInfo session, String sql, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) { - return execute( + return executeForTreeModel( statement, queryId, session, sql, partitionFetcher, schemaFetcher, Long.MAX_VALUE); } + public ExecutionResult executeForTreeModel( + Statement statement, + long queryId, + SessionInfo session, + String sql, + IPartitionFetcher partitionFetcher, + ISchemaFetcher schemaFetcher, + long timeOut) { + return execution( + queryId, + session, + sql, + ((queryContext, startTime) -> + createQueryExecutionForTreeModel( + statement, + queryContext, + partitionFetcher, + schemaFetcher, + timeOut > 0 ? 
timeOut : CONFIG.getQueryTimeoutThreshold(), + startTime))); + } + + private IQueryExecution createQueryExecutionForTreeModel( + Statement statement, + MPPQueryContext queryContext, + IPartitionFetcher partitionFetcher, + ISchemaFetcher schemaFetcher, + long timeOut, + long startTime) { + queryContext.setTimeOut(timeOut); + queryContext.setStartTime(startTime); + if (statement instanceof IConfigStatement) { + queryContext.setQueryType(((IConfigStatement) statement).getQueryType()); + return new ConfigExecution( + queryContext, + statement.getType(), + executor, + statement.accept(new ConfigTaskVisitor(), queryContext)); + } + TreeModelPlanner treeModelPlanner = + new TreeModelPlanner( + statement, + executor, + writeOperationExecutor, + scheduledExecutor, + partitionFetcher, + schemaFetcher, + SYNC_INTERNAL_SERVICE_CLIENT_MANAGER, + ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER); + return new QueryExecution(treeModelPlanner, queryContext, executor); + } + public IQueryExecution getQueryExecution(Long queryId) { return queryExecutionMap.get(queryId); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java index e91d45d3575..3f109e86e9e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java @@ -28,9 +28,13 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.partition.DataPartition; import org.apache.iotdb.commons.partition.SchemaPartition; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.NodeRef; import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; import 
org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; +import org.apache.iotdb.db.queryengine.plan.execution.memory.StatementMemorySource; +import org.apache.iotdb.db.queryengine.plan.execution.memory.StatementMemorySourceContext; +import org.apache.iotdb.db.queryengine.plan.execution.memory.StatementMemorySourceVisitor; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.ExpressionType; import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; @@ -42,12 +46,14 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimePa import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.IntoPathDescriptor; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.OrderByParameter; import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.db.queryengine.plan.statement.StatementType; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem; import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement; import org.apache.iotdb.db.schemaengine.template.Template; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; @@ -63,7 +69,7 @@ import java.util.Set; import static com.google.common.base.Preconditions.checkArgument; /** Analysis used for planning a query. TODO: This class may need to store more info for a query. 
*/ -public class Analysis { +public class Analysis implements IAnalysis { ///////////////////////////////////////////////////////////////////////////////////////////////// // Common Analysis @@ -378,6 +384,7 @@ public class Analysis { this.globalTimePredicate = timeFilter; } + @Override public DatasetHeader getRespDatasetHeader() { return respDatasetHeader; } @@ -403,7 +410,13 @@ public class Analysis { return type; } - public boolean hasDataSource() { + @Override + public boolean canSkipExecute(MPPQueryContext context) { + return isFinishQueryAfterAnalyze() + || (context.getQueryType() == QueryType.READ && !hasDataSource()); + } + + private boolean hasDataSource() { return (dataPartition != null && !dataPartition.isEmpty()) || (schemaPartition != null && !schemaPartition.isEmpty()) || statement instanceof ShowQueriesStatement @@ -411,6 +424,32 @@ public class Analysis { && ((QueryStatement) statement).isAggregationQuery()); } + @Override + public TsBlock constructResultForMemorySource(MPPQueryContext context) { + StatementMemorySource memorySource = + new StatementMemorySourceVisitor() + .process(getStatement(), new StatementMemorySourceContext(context, this)); + setRespDatasetHeader(memorySource.getDatasetHeader()); + return memorySource.getTsBlock(); + } + + @Override + public boolean isQuery() { + return statement.isQuery(); + } + + @Override + public boolean needSetHighestPriority() { + // if this Statement is a ShowQueriesStatement, set its instances to the highest priority, so + // that the sub-tasks of the ShowQueries instances could be executed first. 
+ return StatementType.SHOW_QUERIES.equals(statement.getType()); + } + + @Override + public String getStatementType() { + return statement.getType().name(); + } + public Map<Expression, Set<Expression>> getCrossGroupByExpressions() { return crossGroupByExpressions; } @@ -487,10 +526,12 @@ public class Analysis { this.finishQueryAfterAnalyze = finishQueryAfterAnalyze; } + @Override public boolean isFailed() { return failStatus != null; } + @Override public TSStatus getFailStatus() { return this.failStatus; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java similarity index 52% copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java index 9652cbce155..ca85161c7ef 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java @@ -17,52 +17,28 @@ * under the License. 
*/ -package org.apache.iotdb.db.queryengine.plan.execution; +package org.apache.iotdb.db.queryengine.plan.analyze; -import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; -import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.tsfile.read.common.block.TsBlock; -import java.nio.ByteBuffer; -import java.util.Optional; +public interface IAnalysis { -public interface IQueryExecution { + boolean isFailed(); - void start(); + TSStatus getFailStatus(); - void stop(Throwable t); + boolean canSkipExecute(MPPQueryContext context); - void stopAndCleanup(); - - void stopAndCleanup(Throwable t); - - void cancel(); - - ExecutionResult getStatus(); - - Optional<TsBlock> getBatchResult() throws IoTDBException; - - Optional<ByteBuffer> getByteBufferBatchResult() throws IoTDBException; - - boolean hasNextResult(); - - int getOutputValueColumnCount(); - - DatasetHeader getDatasetHeader(); + TsBlock constructResultForMemorySource(MPPQueryContext context); boolean isQuery(); - String getQueryId(); - - long getStartExecutionTime(); - - void recordExecutionTime(long executionTime); - - /** @return cost time in ns */ - long getTotalExecutionTime(); + boolean needSetHighestPriority(); - Optional<String> getExecuteSQL(); + DatasetHeader getRespDatasetHeader(); - Statement getStatement(); + String getStatementType(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java index 1e32ea9cee1..b0d5a1b92eb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java @@ -475,7 +475,7 @@ public class LoadTsfileAnalyzer { final long queryId = SessionManager.getInstance().requestQueryId(); final ExecutionResult result = Coordinator.getInstance() - .execute( + .executeForTreeModel( statement, queryId, null, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java index 4fca45790ea..cbf85d67a82 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java @@ -83,7 +83,7 @@ class AutoCreateSchemaExecutor { private ExecutionResult executeStatement(Statement statement, MPPQueryContext context) { - return coordinator.execute( + return coordinator.executeForTreeModel( statement, SessionManager.getInstance().requestQueryId(), context == null ? 
null : context.getSession(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java index 0adbca34ad5..75e087307ff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java @@ -81,7 +81,7 @@ class ClusterSchemaFetchExecutor { if (context != null && context.getQueryType() == QueryType.READ) { sql += ", " + context.getQueryId() + " : " + context.getSql(); } - return coordinator.execute( + return coordinator.executeForTreeModel( statement, queryId, context == null ? null : context.getSession(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java index 9652cbce155..0a92914b571 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java @@ -21,7 +21,6 @@ package org.apache.iotdb.db.queryengine.plan.execution; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; -import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import java.nio.ByteBuffer; @@ -64,5 +63,5 @@ public interface IQueryExecution { Optional<String> getExecuteSQL(); - Statement getStatement(); + String getStatementType(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java index 99eb73e3425..8446c0bd4fe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java @@ -20,9 +20,6 @@ package org.apache.iotdb.db.queryengine.plan.execution; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.commons.client.IClientManager; -import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient; -import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics; @@ -40,31 +37,15 @@ import org.apache.iotdb.db.queryengine.execution.exchange.source.ISourceHandle; import org.apache.iotdb.db.queryengine.execution.exchange.source.SourceHandle; import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet; import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet; -import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; -import org.apache.iotdb.db.queryengine.plan.analyze.Analyzer; -import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher; +import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis; import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; -import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher; import org.apache.iotdb.db.queryengine.plan.execution.memory.MemorySourceHandle; -import org.apache.iotdb.db.queryengine.plan.execution.memory.StatementMemorySource; -import org.apache.iotdb.db.queryengine.plan.execution.memory.StatementMemorySourceContext; -import 
org.apache.iotdb.db.queryengine.plan.execution.memory.StatementMemorySourceVisitor; -import org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanner; -import org.apache.iotdb.db.queryengine.plan.planner.distribution.DistributionPlanner; +import org.apache.iotdb.db.queryengine.plan.planner.IPlanner; import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeUtil; -import org.apache.iotdb.db.queryengine.plan.scheduler.ClusterScheduler; import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler; -import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler; -import org.apache.iotdb.db.queryengine.plan.statement.Statement; -import org.apache.iotdb.db.queryengine.plan.statement.StatementType; -import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; -import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; -import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; -import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; -import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; import org.apache.iotdb.rpc.RpcUtils; @@ -77,7 +58,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.concurrent.CancellationException; @@ -99,9 +79,9 @@ import static org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.DIST * corresponding physical nodes. 3. Collect and monitor the progress/states of this query. 
*/ public class QueryExecution implements IQueryExecution { - private static final Logger logger = LoggerFactory.getLogger(QueryExecution.class); + private static final Logger LOGGER = LoggerFactory.getLogger(QueryExecution.class); - private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); private static final int MAX_RETRY_COUNT = 3; private static final long RETRY_INTERVAL_IN_MS = 2000; private int retryCount = 0; @@ -109,19 +89,12 @@ public class QueryExecution implements IQueryExecution { private IScheduler scheduler; private final QueryStateMachine stateMachine; - private final Statement rawStatement; - private Analysis analysis; + private final IPlanner planner; + + private IAnalysis analysis; private LogicalQueryPlan logicalPlan; private DistributedQueryPlan distributedPlan; - private final ExecutorService executor; - private final ExecutorService writeOperationExecutor; - private final ScheduledExecutorService scheduledExecutor; - // TODO need to use factory to decide standalone or cluster - private final IPartitionFetcher partitionFetcher; - // TODO need to use factory to decide standalone or cluster, - private final ISchemaFetcher schemaFetcher; - // The result of QueryExecution will be written to the MPPDataExchangeManager in current Node. // We use this SourceHandle to fetch the TsBlock from it. 
private ISourceHandle resultHandle; @@ -129,12 +102,6 @@ public class QueryExecution implements IQueryExecution { // used for cleaning resultHandle up exactly once private final AtomicBoolean resultHandleCleanUp; - private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> - syncInternalServiceClientManager; - - private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> - asyncInternalServiceClientManager; - private final AtomicBoolean stopped; // cost time in ns @@ -148,28 +115,11 @@ public class QueryExecution implements IQueryExecution { PerformanceOverviewMetrics.getInstance(); @SuppressWarnings("squid:S107") - public QueryExecution( - Statement statement, - MPPQueryContext context, - ExecutorService executor, - ExecutorService writeOperationExecutor, - ScheduledExecutorService scheduledExecutor, - IPartitionFetcher partitionFetcher, - ISchemaFetcher schemaFetcher, - IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> syncInternalServiceClientManager, - IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> - asyncInternalServiceClientManager) { - this.rawStatement = statement; - this.executor = executor; - this.writeOperationExecutor = writeOperationExecutor; - this.scheduledExecutor = scheduledExecutor; + public QueryExecution(IPlanner planner, MPPQueryContext context, ExecutorService executor) { this.context = context; - this.analysis = analyze(statement, context, partitionFetcher, schemaFetcher); + this.planner = planner; + this.analysis = analyze(context); this.stateMachine = new QueryStateMachine(context.getQueryId(), executor); - this.partitionFetcher = partitionFetcher; - this.schemaFetcher = schemaFetcher; - this.syncInternalServiceClientManager = syncInternalServiceClientManager; - this.asyncInternalServiceClientManager = asyncInternalServiceClientManager; // We add the abort logic inside the QueryExecution. // So that the other components can only focus on the state change. 
@@ -184,7 +134,7 @@ public class QueryExecution implements IQueryExecution { if (state == QueryState.FAILED || state == QueryState.ABORTED || state == QueryState.CANCELED) { - logger.debug("[ReleaseQueryResource] state is: {}", state); + LOGGER.debug("[ReleaseQueryResource] state is: {}", state); Throwable cause = stateMachine.getFailureException(); releaseResource(cause); } @@ -203,7 +153,7 @@ public class QueryExecution implements IQueryExecution { public void start() { final long startTime = System.nanoTime(); if (skipExecute()) { - logger.debug("[SkipExecute]"); + LOGGER.debug("[SkipExecute]"); if (context.getQueryType() == QueryType.WRITE && analysis.isFailed()) { stateMachine.transitionToFailed(analysis.getFailStatus()); } else { @@ -255,60 +205,52 @@ public class QueryExecution implements IQueryExecution { private ExecutionResult retry() { if (retryCount >= MAX_RETRY_COUNT) { - logger.warn("[ReachMaxRetryCount]"); + LOGGER.warn("[ReachMaxRetryCount]"); stateMachine.transitionToFailed(); return getStatus(); } - logger.warn("error when executing query. {}", stateMachine.getFailureMessage()); + LOGGER.warn("error when executing query. 
{}", stateMachine.getFailureMessage()); // stop and clean up resources the QueryExecution used this.stopAndCleanup(stateMachine.getFailureException()); - logger.info("[WaitBeforeRetry] wait {}ms.", RETRY_INTERVAL_IN_MS); + LOGGER.info("[WaitBeforeRetry] wait {}ms.", RETRY_INTERVAL_IN_MS); try { Thread.sleep(RETRY_INTERVAL_IN_MS); } catch (InterruptedException e) { - logger.warn("interrupted when waiting retry"); + LOGGER.warn("interrupted when waiting retry"); Thread.currentThread().interrupt(); } retryCount++; - logger.info("[Retry] retry count is: {}", retryCount); + LOGGER.info("[Retry] retry count is: {}", retryCount); stateMachine.transitionToQueued(); // force invalid PartitionCache - partitionFetcher.invalidAllCache(); + planner.invalidatePartitionCache(); // clear runtime variables in MPPQueryContext context.prepareForRetry(); // re-stop this.stopped.compareAndSet(true, false); this.resultHandleCleanUp.compareAndSet(true, false); // re-analyze the query - this.analysis = analyze(rawStatement, context, partitionFetcher, schemaFetcher); + this.analysis = analyze(context); // re-start the QueryExecution this.start(); return getStatus(); } private boolean skipExecute() { - return analysis.isFinishQueryAfterAnalyze() - || (context.getQueryType() == QueryType.READ && !analysis.hasDataSource()); + return analysis.canSkipExecute(context); } private void constructResultForMemorySource() { - StatementMemorySource memorySource = - new StatementMemorySourceVisitor() - .process(analysis.getStatement(), new StatementMemorySourceContext(context, analysis)); - this.resultHandle = new MemorySourceHandle(memorySource.getTsBlock()); - this.analysis.setRespDatasetHeader(memorySource.getDatasetHeader()); + TsBlock tsBlock = analysis.constructResultForMemorySource(context); + this.resultHandle = new MemorySourceHandle(tsBlock); } // Analyze the statement in QueryContext. 
Generate the analysis this query need - private Analysis analyze( - Statement statement, - MPPQueryContext context, - IPartitionFetcher partitionFetcher, - ISchemaFetcher schemaFetcher) { + private IAnalysis analyze(MPPQueryContext context) { final long startTime = System.nanoTime(); - Analysis result; + IAnalysis result; try { - result = new Analyzer(context, partitionFetcher, schemaFetcher).analyze(statement); + result = planner.analyze(context); } finally { long analyzeCost = System.nanoTime() - startTime; context.setAnalyzeCost(analyzeCost); @@ -319,45 +261,15 @@ public class QueryExecution implements IQueryExecution { private void schedule() { final long startTime = System.nanoTime(); - boolean isPipeEnrichedTsFileLoad = - rawStatement instanceof PipeEnrichedStatement - && ((PipeEnrichedStatement) rawStatement).getInnerStatement() - instanceof LoadTsFileStatement; - if (rawStatement instanceof LoadTsFileStatement || isPipeEnrichedTsFileLoad) { - this.scheduler = - new LoadTsFileScheduler( - distributedPlan, - context, - stateMachine, - syncInternalServiceClientManager, - partitionFetcher, - isPipeEnrichedTsFileLoad); - this.scheduler.start(); - return; - } - - // TODO: (xingtanzjr) initialize the query scheduler according to configuration - this.scheduler = - new ClusterScheduler( - context, - stateMachine, - distributedPlan.getInstances(), - context.getQueryType(), - executor, - writeOperationExecutor, - scheduledExecutor, - syncInternalServiceClientManager, - asyncInternalServiceClientManager); - this.scheduler.start(); + this.scheduler = planner.doSchedule(analysis, distributedPlan, context, stateMachine); PERFORMANCE_OVERVIEW_METRICS.recordScheduleCost(System.nanoTime() - startTime); } // Use LogicalPlanner to do the logical query plan and logical optimization public void doLogicalPlan() { - LogicalPlanner planner = new LogicalPlanner(this.context); - this.logicalPlan = planner.plan(this.analysis); - if (isQuery() && logger.isDebugEnabled()) { - 
logger.debug( + this.logicalPlan = planner.doLogicalPlan(analysis, context); + if (isQuery() && LOGGER.isDebugEnabled()) { + LOGGER.debug( "logical plan is: \n {}", PlanNodeUtil.nodeToString(this.logicalPlan.getRootNode())); } // check timeout after building logical plan because it could be time-consuming in some cases. @@ -367,10 +279,9 @@ public class QueryExecution implements IQueryExecution { // Generate the distributed plan and split it into fragments public void doDistributedPlan() { long startTime = System.nanoTime(); - DistributionPlanner planner = new DistributionPlanner(this.analysis, this.logicalPlan); - this.distributedPlan = planner.planFragments(); + this.distributedPlan = planner.doDistributionPlan(analysis, logicalPlan); - if (rawStatement.isQuery()) { + if (analysis.isQuery()) { long distributionPlanCost = System.nanoTime() - startTime; context.setDistributionPlanCost(distributionPlanCost); QUERY_PLAN_COST_METRIC_SET.recordPlanCost(DISTRIBUTION_PLANNER, distributionPlanCost); @@ -378,12 +289,12 @@ public class QueryExecution implements IQueryExecution { // if is this Statement is ShowQueryStatement, set its instances to the highest priority, so // that the sub-tasks of the ShowQueries instances could be executed first. - if (StatementType.SHOW_QUERIES.equals(rawStatement.getType())) { + if (analysis.needSetHighestPriority()) { distributedPlan.getInstances().forEach(instance -> instance.setHighestPriority(true)); } - if (isQuery() && logger.isDebugEnabled()) { - logger.debug( + if (isQuery() && LOGGER.isDebugEnabled()) { + LOGGER.debug( "distribution plan done. 
Fragment instance count is {}, details is: \n {}", distributedPlan.getInstances().size(), printFragmentInstances(distributedPlan.getInstances())); @@ -493,7 +404,7 @@ public class QueryExecution implements IQueryExecution { while (true) { try { if (resultHandle.isAborted()) { - logger.warn("[ResultHandleAborted]"); + LOGGER.warn("[ResultHandleAborted]"); stateMachine.transitionToAborted(); if (stateMachine.getFailureStatus() != null) { throw new IoTDBException( @@ -504,7 +415,7 @@ public class QueryExecution implements IQueryExecution { TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); } } else if (resultHandle.isFinished()) { - logger.debug("[ResultHandleFinished]"); + LOGGER.debug("[ResultHandleFinished]"); stateMachine.transitionToFinished(); return Optional.empty(); } @@ -695,41 +606,9 @@ public class QueryExecution implements IQueryExecution { // collect redirect info to client for writing // if 0.13_data_insert_adapt is true and ClientVersion is NOT V_1_0, stop returning redirect // info to client - if (analysis.getStatement() instanceof InsertBaseStatement - && !analysis.isFinishQueryAfterAnalyze() - && (!config.isEnable13DataInsertAdapt() - || IoTDBConstant.ClientVersion.V_1_0.equals(context.getSession().getVersion()))) { - InsertBaseStatement insertStatement = (InsertBaseStatement) analysis.getStatement(); - List<TEndPoint> redirectNodeList = analysis.getRedirectNodeList(); - if (insertStatement instanceof InsertRowsStatement - || insertStatement instanceof InsertMultiTabletsStatement) { - // multiple devices - if (statusCode == TSStatusCode.SUCCESS_STATUS) { - boolean needRedirect = false; - List<TSStatus> subStatus = new ArrayList<>(); - for (TEndPoint endPoint : redirectNodeList) { - // redirect writing only if the redirectEndPoint is not the current node - if (!config.getAddressAndPort().equals(endPoint)) { - subStatus.add( - RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS).setRedirectNode(endPoint)); - needRedirect = true; - } else { - 
subStatus.add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); - } - } - if (needRedirect) { - tsstatus.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()); - tsstatus.setSubStatus(subStatus); - } - } - } else { - // single device - TEndPoint redirectEndPoint = redirectNodeList.get(0); - // redirect writing only if the redirectEndPoint is not the current node - if (!config.getAddressAndPort().equals(redirectEndPoint)) { - tsstatus.setRedirectNode(redirectEndPoint); - } - } + if (!CONFIG.isEnable13DataInsertAdapt() + || IoTDBConstant.ClientVersion.V_1_0.equals(context.getSession().getVersion())) { + planner.setRedirectInfo(analysis, CONFIG.getAddressAndPort(), tsstatus, statusCode); } return new ExecutionResult(context.getQueryId(), tsstatus); @@ -739,10 +618,6 @@ public class QueryExecution implements IQueryExecution { return distributedPlan; } - public LogicalQueryPlan getLogicalPlan() { - return logicalPlan; - } - @Override public boolean isQuery() { return context.getQueryType() == QueryType.READ; @@ -774,8 +649,8 @@ public class QueryExecution implements IQueryExecution { } @Override - public Statement getStatement() { - return analysis.getStatement(); + public String getStatementType() { + return analysis.getStatementType(); } public MPPQueryContext getContext() { @@ -787,6 +662,6 @@ public class QueryExecution implements IQueryExecution { } public ScheduledExecutorService getScheduledExecutor() { - return scheduledExecutor; + return planner.getScheduledExecutorService(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java index 55ce059a20d..0fbef6973cb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java @@ -21,8 +21,6 @@ package org.apache.iotdb.db.queryengine.plan.execution.config; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.utils.TestOnly; -import org.apache.iotdb.db.conf.IoTDBConfig; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; import org.apache.iotdb.db.queryengine.execution.QueryStateMachine; @@ -31,7 +29,7 @@ import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; import org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor; import org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor; -import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.db.queryengine.plan.statement.StatementType; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.rpc.TSStatusCode; @@ -57,7 +55,7 @@ public class ConfigExecution implements IQueryExecution { private static final Logger LOGGER = LoggerFactory.getLogger(ConfigExecution.class); - private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + private static final TsBlockSerde serde = new TsBlockSerde(); private final MPPQueryContext context; private final ExecutorService executor; @@ -68,31 +66,29 @@ public class ConfigExecution implements IQueryExecution { private DatasetHeader datasetHeader; private boolean resultSetConsumed; private final IConfigTask task; - private IConfigTaskExecutor configTaskExecutor; - - private static final TsBlockSerde serde = new TsBlockSerde(); + private final IConfigTaskExecutor configTaskExecutor; - private Statement statement; + private 
final StatementType statementType; private long totalExecutionTime; - public ConfigExecution(MPPQueryContext context, Statement statement, ExecutorService executor) { + public ConfigExecution( + MPPQueryContext context, + StatementType statementType, + ExecutorService executor, + IConfigTask task) { this.context = context; - this.statement = statement; + this.statementType = statementType; this.executor = executor; this.stateMachine = new QueryStateMachine(context.getQueryId(), executor); this.taskFuture = SettableFuture.create(); - this.task = statement.accept(new ConfigTaskVisitor(), context); + this.task = task; this.resultSetConsumed = false; configTaskExecutor = ClusterConfigTaskExecutor.getInstance(); } @TestOnly public ConfigExecution(MPPQueryContext context, ExecutorService executor, IConfigTask task) { - this.context = context; - this.executor = executor; - this.stateMachine = new QueryStateMachine(context.getQueryId(), executor); - this.taskFuture = SettableFuture.create(); - this.task = task; + this(context, StatementType.NULL, executor, task); } @Override @@ -249,7 +245,7 @@ public class ConfigExecution implements IQueryExecution { } @Override - public Statement getStatement() { - return statement; + public String getStatementType() { + return statementType.name(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index db8ed28298f..287a9157f3f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -1938,7 +1938,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { 
createLogicalViewStatement.setViewExpressions(Collections.singletonList(viewExpression)); ExecutionResult executionResult = Coordinator.getInstance() - .execute( + .executeForTreeModel( createLogicalViewStatement, 0, null, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java new file mode 100644 index 00000000000..616cca3efb7 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.plan.planner; + +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.execution.QueryStateMachine; +import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis; +import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan; +import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; +import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler; +import org.apache.iotdb.rpc.TSStatusCode; + +import java.util.concurrent.ScheduledExecutorService; + +public interface IPlanner { + + IAnalysis analyze(MPPQueryContext context); + + LogicalQueryPlan doLogicalPlan(IAnalysis analysis, MPPQueryContext context); + + DistributedQueryPlan doDistributionPlan(IAnalysis analysis, LogicalQueryPlan logicalPlan); + + IScheduler doSchedule( + IAnalysis analysis, + DistributedQueryPlan distributedPlan, + MPPQueryContext context, + QueryStateMachine stateMachine); + + void invalidatePartitionCache(); + + ScheduledExecutorService getScheduledExecutorService(); + + void setRedirectInfo( + IAnalysis analysis, TEndPoint localEndPoint, TSStatus tsstatus, TSStatusCode statusCode); +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java new file mode 100644 index 00000000000..a42ff843f30 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.planner; + +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient; +import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.execution.QueryStateMachine; +import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; +import org.apache.iotdb.db.queryengine.plan.analyze.Analyzer; +import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis; +import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher; +import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher; +import org.apache.iotdb.db.queryengine.plan.planner.distribution.DistributionPlanner; +import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan; +import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; +import org.apache.iotdb.db.queryengine.plan.scheduler.ClusterScheduler; +import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler; +import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler; +import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; +import org.apache.iotdb.rpc.RpcUtils; +import org.apache.iotdb.rpc.TSStatusCode; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + +public class TreeModelPlanner implements IPlanner { + + private final Statement statement; + + private final ExecutorService executor; + private final ExecutorService writeOperationExecutor; + private final ScheduledExecutorService scheduledExecutor; + + private final IPartitionFetcher partitionFetcher; + + private final ISchemaFetcher schemaFetcher; + + private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> + syncInternalServiceClientManager; + + private final IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> + asyncInternalServiceClientManager; + + public TreeModelPlanner( + Statement statement, + ExecutorService executor, + ExecutorService writeOperationExecutor, + ScheduledExecutorService scheduledExecutor, + IPartitionFetcher partitionFetcher, + ISchemaFetcher schemaFetcher, + IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> syncInternalServiceClientManager, + IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> + asyncInternalServiceClientManager) { + this.statement = statement; + this.executor = executor; + this.writeOperationExecutor = writeOperationExecutor; + this.scheduledExecutor = scheduledExecutor; + this.partitionFetcher = partitionFetcher; + this.schemaFetcher = schemaFetcher; + this.syncInternalServiceClientManager = syncInternalServiceClientManager; + 
this.asyncInternalServiceClientManager = asyncInternalServiceClientManager; + } + + @Override + public IAnalysis analyze(MPPQueryContext context) { + return new Analyzer(context, partitionFetcher, schemaFetcher).analyze(statement); + } + + @Override + public LogicalQueryPlan doLogicalPlan(IAnalysis analysis, MPPQueryContext context) { + LogicalPlanner logicalPlanner = new LogicalPlanner(context); + return logicalPlanner.plan((Analysis) analysis); + } + + @Override + public DistributedQueryPlan doDistributionPlan(IAnalysis analysis, LogicalQueryPlan logicalPlan) { + DistributionPlanner planner = new DistributionPlanner((Analysis) analysis, logicalPlan); + return planner.planFragments(); + } + + @Override + public IScheduler doSchedule( + IAnalysis analysis, + DistributedQueryPlan distributedPlan, + MPPQueryContext context, + QueryStateMachine stateMachine) { + IScheduler scheduler; + + boolean isPipeEnrichedTsFileLoad = + statement instanceof PipeEnrichedStatement + && ((PipeEnrichedStatement) statement).getInnerStatement() + instanceof LoadTsFileStatement; + if (statement instanceof LoadTsFileStatement || isPipeEnrichedTsFileLoad) { + scheduler = + new LoadTsFileScheduler( + distributedPlan, + context, + stateMachine, + syncInternalServiceClientManager, + partitionFetcher, + isPipeEnrichedTsFileLoad); + } else { + scheduler = + new ClusterScheduler( + context, + stateMachine, + distributedPlan.getInstances(), + context.getQueryType(), + executor, + writeOperationExecutor, + scheduledExecutor, + syncInternalServiceClientManager, + asyncInternalServiceClientManager); + } + + scheduler.start(); + return scheduler; + } + + @Override + public void invalidatePartitionCache() { + partitionFetcher.invalidAllCache(); + } + + @Override + public ScheduledExecutorService getScheduledExecutorService() { + return scheduledExecutor; + } + + @Override + public void setRedirectInfo( + IAnalysis iAnalysis, TEndPoint localEndPoint, TSStatus tsstatus, TSStatusCode statusCode) { + 
Analysis analysis = (Analysis) iAnalysis; + if (analysis.getStatement() instanceof InsertBaseStatement + && !analysis.isFinishQueryAfterAnalyze()) { + InsertBaseStatement insertStatement = (InsertBaseStatement) analysis.getStatement(); + List<TEndPoint> redirectNodeList = analysis.getRedirectNodeList(); + if (insertStatement instanceof InsertRowsStatement + || insertStatement instanceof InsertMultiTabletsStatement) { + // multiple devices + if (statusCode == TSStatusCode.SUCCESS_STATUS) { + boolean needRedirect = false; + List<TSStatus> subStatus = new ArrayList<>(); + for (TEndPoint endPoint : redirectNodeList) { + // redirect writing only if the redirectEndPoint is not the current node + if (!localEndPoint.equals(endPoint)) { + subStatus.add( + RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS).setRedirectNode(endPoint)); + needRedirect = true; + } else { + subStatus.add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + } + } + if (needRedirect) { + tsstatus.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()); + tsstatus.setSubStatus(subStatus); + } + } + } else { + // single device + TEndPoint redirectEndPoint = redirectNodeList.get(0); + // redirect writing only if the redirectEndPoint is not the current node + if (!localEndPoint.equals(redirectEndPoint)) { + tsstatus.setRedirectNode(redirectEndPoint); + } + } + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java index 711ff6c25cc..e302ef1dc35 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java @@ -33,6 +33,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.IFragmentParallelPlaner; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment; import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan; +import org.apache.iotdb.db.queryengine.plan.planner.plan.TreeModelTimePredicate; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; @@ -141,7 +142,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner { new FragmentInstance( fragment, fragment.getId().genFragmentInstanceId(), - globalTimePredicate, + new TreeModelTimePredicate(globalTimePredicate), queryContext.getQueryType(), queryContext.getTimeOut(), queryContext.getSession(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java index 36cb4ecc9d1..542f2b6f5b8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.IFragmentParallelPlaner; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment; import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan; +import org.apache.iotdb.db.queryengine.plan.planner.plan.TreeModelTimePredicate; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; @@ -61,7 +62,7 @@ public class WriteFragmentParallelPlanner implements IFragmentParallelPlaner { new FragmentInstance( new PlanFragment(fragment.getId(), split), fragment.getId().genFragmentInstanceId(), - globalTimePredicate, + new TreeModelTimePredicate(globalTimePredicate), queryContext.getQueryType(), queryContext.getTimeOut(), queryContext.getSession()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/DistributedQueryPlan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/DistributedQueryPlan.java index ca133cc5b8d..cedc99669a7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/DistributedQueryPlan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/DistributedQueryPlan.java @@ -23,10 +23,10 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import java.util.List; public class DistributedQueryPlan { - private MPPQueryContext context; - private SubPlan rootSubPlan; - private List<PlanFragment> fragments; - private List<FragmentInstance> instances; + private final MPPQueryContext context; + private final SubPlan rootSubPlan; + private final List<PlanFragment> fragments; + private final List<FragmentInstance> instances; public DistributedQueryPlan( MPPQueryContext context, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java index 4c8204a2a22..4dd56beefa8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java @@ -31,7 +31,6 @@ import 
org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.common.SessionInfo; import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; -import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeUtil; import org.apache.iotdb.tsfile.utils.PublicBAOS; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; @@ -46,9 +45,9 @@ import java.util.Objects; public class FragmentInstance implements IConsensusRequest { - private static final Logger logger = LoggerFactory.getLogger(FragmentInstance.class); + private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstance.class); - private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); private final FragmentInstanceId id; private final QueryType type; @@ -60,7 +59,7 @@ public class FragmentInstance implements IConsensusRequest { private TDataNodeLocation hostDataNode; - private final Expression globalTimePredicate; + private final TimePredicate globalTimePredicate; private final long timeOut; @@ -86,7 +85,7 @@ public class FragmentInstance implements IConsensusRequest { public FragmentInstance( PlanFragment fragment, FragmentInstanceId id, - Expression globalTimePredicate, + TimePredicate globalTimePredicate, QueryType type, long timeOut, SessionInfo sessionInfo) { @@ -94,7 +93,7 @@ public class FragmentInstance implements IConsensusRequest { this.globalTimePredicate = globalTimePredicate; this.id = id; this.type = type; - this.timeOut = timeOut > 0 ? timeOut : config.getQueryTimeoutThreshold(); + this.timeOut = timeOut > 0 ? 
timeOut : CONFIG.getQueryTimeoutThreshold(); this.isRoot = false; this.sessionInfo = sessionInfo; } @@ -102,7 +101,7 @@ public class FragmentInstance implements IConsensusRequest { public FragmentInstance( PlanFragment fragment, FragmentInstanceId id, - Expression globalTimePredicate, + TimePredicate globalTimePredicate, QueryType type, long timeOut, SessionInfo sessionInfo, @@ -116,7 +115,7 @@ public class FragmentInstance implements IConsensusRequest { public FragmentInstance( PlanFragment fragment, FragmentInstanceId id, - Expression globalTimePredicate, + TimePredicate globalTimePredicate, QueryType type, long timeOut, SessionInfo sessionInfo, @@ -172,7 +171,7 @@ public class FragmentInstance implements IConsensusRequest { isHighestPriority = highestPriority; } - public Expression getGlobalTimePredicate() { + public TimePredicate getGlobalTimePredicate() { return globalTimePredicate; } @@ -214,7 +213,7 @@ public class FragmentInstance implements IConsensusRequest { boolean hasSessionInfo = ReadWriteIOUtils.readBool(buffer); SessionInfo sessionInfo = hasSessionInfo ? SessionInfo.deserializeFrom(buffer) : null; boolean hasTimePredicate = ReadWriteIOUtils.readBool(buffer); - Expression globalTimePredicate = hasTimePredicate ? Expression.deserialize(buffer) : null; + TimePredicate globalTimePredicate = hasTimePredicate ? 
TimePredicate.deserialize(buffer) : null; QueryType queryType = QueryType.values()[ReadWriteIOUtils.readInt(buffer)]; int dataNodeFINum = ReadWriteIOUtils.readInt(buffer); FragmentInstance fragmentInstance = @@ -239,7 +238,7 @@ public class FragmentInstance implements IConsensusRequest { } ReadWriteIOUtils.write(globalTimePredicate != null, outputStream); if (globalTimePredicate != null) { - Expression.serialize(globalTimePredicate, outputStream); + globalTimePredicate.serialize(outputStream); } ReadWriteIOUtils.write(type.ordinal(), outputStream); ReadWriteIOUtils.write(dataNodeFINum, outputStream); @@ -250,7 +249,7 @@ public class FragmentInstance implements IConsensusRequest { ReadWriteIOUtils.write(isExplainAnalyze, outputStream); return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size()); } catch (IOException e) { - logger.error("Unexpected error occurs when serializing this FragmentInstance.", e); + LOGGER.error("Unexpected error occurs when serializing this FragmentInstance.", e); throw new SerializationRunTimeException(e); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java index 7583b624b64..925bfe4416e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java @@ -38,7 +38,7 @@ import java.util.Objects; /** PlanFragment contains a sub-query of distributed query. 
*/ public class PlanFragment { - // TODO once you add field for this class you need to change the serialize and deserialize methods + // once you add field for this class you need to change the serialize and deserialize methods private final PlanFragmentId id; private PlanNode planNodeTree; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/SubPlan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/SubPlan.java index 0bceb53eb5c..909cf67acfe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/SubPlan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/SubPlan.java @@ -23,7 +23,7 @@ import java.util.ArrayList; import java.util.List; public class SubPlan { - private PlanFragment planFragment; + private final PlanFragment planFragment; private List<SubPlan> children; public SubPlan(PlanFragment planFragment) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/TimePredicate.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/TimePredicate.java new file mode 100644 index 00000000000..1de79f00809 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/TimePredicate.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.planner.plan; + +import org.apache.iotdb.db.queryengine.plan.expression.Expression; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +public interface TimePredicate { + + void serialize(DataOutputStream stream) throws IOException; + + Filter convertPredicateToTimeFilter(); + + static TimePredicate deserialize(ByteBuffer byteBuffer) { + // TODO will return another kind of TimePredicate like TableModelTimePredicate in the future + return new TreeModelTimePredicate(Expression.deserialize(byteBuffer)); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/TreeModelTimePredicate.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/TreeModelTimePredicate.java new file mode 100644 index 00000000000..139deed980c --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/TreeModelTimePredicate.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.planner.plan; + +import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils; +import org.apache.iotdb.db.queryengine.plan.expression.Expression; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Objects; + +public class TreeModelTimePredicate implements TimePredicate { + + private final Expression timePredicate; + + public TreeModelTimePredicate(Expression timePredicate) { + this.timePredicate = timePredicate; + } + + @Override + public void serialize(DataOutputStream stream) throws IOException { + Expression.serialize(timePredicate, stream); + } + + @Override + public Filter convertPredicateToTimeFilter() { + return PredicateUtils.convertPredicateToTimeFilter(timePredicate); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TreeModelTimePredicate that = (TreeModelTimePredicate) o; + return Objects.equals(timePredicate, that.timePredicate); + } + + @Override + public int hashCode() { + return Objects.hash(timePredicate); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java index e5afac118d4..87dc13ecd2e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java 
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java @@ -180,7 +180,8 @@ public class IoTDBInternalLocalReporter extends IoTDBInternalReporter { InsertRowStatement s = StatementGenerator.createStatement(request); final long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = - COORDINATOR.execute(s, queryId, sessionInfo, "", partitionFetcher, schemaFetcher); + COORDINATOR.executeForTreeModel( + s, queryId, sessionInfo, "", partitionFetcher, schemaFetcher); if (result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.error("Failed to update the value of metric with status {}", result.status); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java index 66e65b37150..61ec16a35da 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java @@ -48,7 +48,6 @@ import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; -import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem; @@ -1810,7 +1809,7 @@ public class MergeSortOperatorTest { } @Override - public Statement getStatement() { + public String getStatementType() { return null; } diff 
--git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/FragmentInstanceSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/FragmentInstanceSerdeTest.java index d19e8c29c0e..cc3363a097a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/FragmentInstanceSerdeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/FragmentInstanceSerdeTest.java @@ -35,6 +35,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; import org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment; +import org.apache.iotdb.db.queryengine.plan.planner.plan.TreeModelTimePredicate; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode; @@ -71,7 +72,7 @@ public class FragmentInstanceSerdeTest { new FragmentInstance( new PlanFragment(planFragmentId, constructPlanNodeTree()), planFragmentId.genFragmentInstanceId(), - ExpressionFactory.groupByTime(1, 2, 3, 4), + new TreeModelTimePredicate(ExpressionFactory.groupByTime(1, 2, 3, 4)), QueryType.READ, config.getQueryTimeoutThreshold(), sessionInfo);
