This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch support_copy in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 27d1afa3a06672584c251b46998a407385ea2c72 Author: Tian Jiang <[email protected]> AuthorDate: Thu Mar 12 12:13:16 2026 +0800 temp --- .../protocol/thrift/impl/ClientRPCServiceImpl.java | 235 ++++++++++++--------- .../relational/analyzer/StatementAnalyzer.java | 6 + .../plan/relational/planner/QueryPlanner.java | 5 + .../plan/relational/planner/RelationPlanner.java | 6 + .../relational/planner/TableLogicalPlanner.java | 14 +- .../distribute/TableDistributedPlanGenerator.java | 1 + .../plan/relational/sql/ast/AstVisitor.java | 4 + .../queryengine/plan/relational/sql/ast/Copy.java | 24 +++ .../plan/relational/sql/parser/AstBuilder.java | 5 + .../db/relational/grammar/sql/RelationalSql.g4 | 1 + 10 files changed, 198 insertions(+), 103 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index d3f6b7d91dd..41563e7bbad 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.protocol.thrift.impl; +import java.util.Map.Entry; import org.apache.iotdb.common.rpc.thrift.TAggregationType; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; @@ -33,6 +34,7 @@ import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.conf.ConfigurationFileUtils; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.partition.DataPartition; @@ -53,6 +55,7 @@ import org.apache.iotdb.db.protocol.client.ConfigNodeClient; import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.IClientSession.SqlDialect; import org.apache.iotdb.db.protocol.session.SessionManager; import org.apache.iotdb.db.protocol.thrift.OperationType; import org.apache.iotdb.db.queryengine.common.SessionInfo; @@ -83,7 +86,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; -import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions.Builder; import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache; import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache; @@ -93,6 +96,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.security.TreeAccessCheckC import org.apache.iotdb.db.queryengine.plan.relational.sql.ParameterExtractor; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BinaryLiteral; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Copy; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DoubleLiteral; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Execute; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Identifier; @@ -199,6 +203,7 @@ import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo; import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo; import io.airlift.units.Duration; +import org.apache.thrift.TBase; import org.apache.thrift.TException; import org.apache.tsfile.block.column.Column; import org.apache.tsfile.common.conf.TSFileConfig; @@ -315,6 +320,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { relationSqlParser = new SqlParser(); } + private static class ExecutionContext { + private OperationQuota quota = null; + private StatementType statementType = null; + private boolean finished = false; + private TSStatus status = null; + private boolean isDatabaseSetBefore; + } + private TSExecuteStatementResp executeStatementInternal( NativeStatementRequest request, SelectResult setResult) { IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); @@ -322,39 +335,35 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus()); } - boolean finished = false; + Long statementId = request.getStatementId(); long queryId = Long.MIN_VALUE; String statement = request.getSql(); // quota - OperationQuota quota = null; + ExecutionContext executionContext = new ExecutionContext(); long startTime = System.nanoTime(); - StatementType statementType = null; Throwable t = null; - boolean useDatabase = false; - final boolean isDatabaseSetBefore = Objects.nonNull(clientSession.getDatabaseName()); - boolean setSqlDialect = false; + executionContext.isDatabaseSetBefore = Objects.nonNull(clientSession.getDatabaseName()); + Statement treeStatement = null; + org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement tableStatement = null; try { // create and cache dataset ExecutionResult result; - if (clientSession.getSqlDialect() == IClientSession.SqlDialect.TREE) { - Statement s = request.getTreeStatement(clientSession.getZoneId()); - if (s instanceof SetSqlDialectStatement) { - setSqlDialect = true; - } + if (clientSession.getSqlDialect() == SqlDialect.TREE) { + treeStatement = request.getTreeStatement(clientSession.getZoneId()); - if (s == null) { + if (treeStatement == null) { return RpcUtils.getTSExecuteStatementResp( RpcUtils.getStatus( TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported")); } - if (s instanceof CreateTableViewStatement) { + if (treeStatement instanceof CreateTableViewStatement) { result = COORDINATOR.executeForTableModel( - ((CreateTableViewStatement) s).getCreateTableView(), + ((CreateTableViewStatement) treeStatement).getCreateTableView(), new SqlParser(), SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(), SESSION_MANAGER.requestQueryId(), @@ -363,12 +372,12 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { LocalExecutionPlanner.getInstance().metadata, config.getQueryTimeoutThreshold(), false, - s.isDebug()); + treeStatement.isDebug()); } else { // permission check TSStatus status = AuthorityChecker.checkAuthority( - s, + treeStatement, new TreeAccessCheckContext( clientSession.getUserId(), clientSession.getUsername(), @@ -378,18 +387,18 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { return RpcUtils.getTSExecuteStatementResp(status); } - quota = + executionContext.quota = DataNodeThrottleQuotaManager.getInstance() - .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); - statementType = s.getType(); + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), treeStatement); + executionContext.statementType = treeStatement.getType(); queryId = SESSION_MANAGER.requestQueryId(clientSession, statementId); // Split statement if needed to limit resource consumption during statement analysis - if (s.shouldSplit()) { + if (treeStatement.shouldSplit()) { result = executeBatchStatement( - s, + treeStatement, queryId, SESSION_MANAGER.getSessionInfo(clientSession), statement, @@ -400,7 +409,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } else { result = COORDINATOR.executeForTreeModel( - s, + treeStatement, queryId, SESSION_MANAGER.getSessionInfo(clientSession), statement, @@ -408,22 +417,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { schemaFetcher, request.getTimeout(), true, - s.isDebug()); + treeStatement.isDebug()); } } } else { - org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement s = + tableStatement = request.getTableStatement(relationSqlParser, clientSession.getZoneId(), clientSession); - if (s instanceof Use) { - useDatabase = true; - } - - if (s instanceof SetSqlDialect) { - setSqlDialect = true; - } - - if (s == null) { + if (tableStatement == null) { return RpcUtils.getTSExecuteStatementResp( RpcUtils.getStatus( TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported")); @@ -432,10 +433,10 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { queryId = SESSION_MANAGER.requestQueryId(clientSession, statementId); // Split statement if needed to limit resource consumption during statement analysis - if (s.shouldSplit()) { + if (tableStatement.shouldSplit()) { result = executeBatchTableStatement( - s, + tableStatement, relationSqlParser, clientSession, queryId, @@ -447,7 +448,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } else { result = COORDINATOR.executeForTableModel( - s, + tableStatement, relationSqlParser, clientSession, queryId, @@ -456,76 +457,43 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { metadata, request.getTimeout(), true, - s.isDebug()); + tableStatement.isDebug()); } } if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { - finished = true; + executionContext.finished = true; final TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(result.status); - if (isDatabaseSetBefore && Objects.isNull(clientSession.getDatabaseName())) { + if (executionContext.isDatabaseSetBefore && Objects.isNull(clientSession.getDatabaseName())) { // Previously unused resp.setOperationType("dropDB"); } return resp; } - - IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId); + executionContext.status = result.status; try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) { - TSExecuteStatementResp resp; - if (queryExecution != null && queryExecution.isQuery()) { - statementType = statementType == null ? StatementType.QUERY : statementType; - resp = createResponse(queryExecution.getDatasetHeader(), queryId); - resp.setStatus(result.status); - finished = setResult.apply(resp, queryExecution, request.getFetchSize()); - resp.setMoreData(!finished); - if (quota != null) { - quota.addReadResult(resp.getQueryResult()); - } - // Should return SUCCESS_MESSAGE for insert into query - if (queryExecution.getQueryType() == QueryType.READ_WRITE) { - resp.setColumns(null); - } - } else { - finished = true; - resp = RpcUtils.getTSExecuteStatementResp(result.status); - // set for use XX - if (useDatabase) { - resp.setDatabase(clientSession.getDatabaseName()); - } - if (isDatabaseSetBefore && Objects.isNull(clientSession.getDatabaseName())) { - // Previously unused - resp.setOperationType("dropDB"); - } - - if (setSqlDialect) { - resp.setTableModel( - SESSION_MANAGER.getCurrSessionAndUpdateIdleTime().getSqlDialect() - == IClientSession.SqlDialect.TABLE); - } - } - return resp; + return constructExecutionResponse(queryId, executionContext, request, setResult, clientSession, treeStatement, tableStatement); } } catch (ParsingException e) { - finished = true; + executionContext.finished = true; t = e; return RpcUtils.getTSExecuteStatementResp( RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR, e.getMessage())); } catch (Exception e) { - finished = true; + executionContext.finished = true; t = e; final TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp( onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT)); - if (isDatabaseSetBefore && Objects.isNull(clientSession.getDatabaseName())) { + if (executionContext.isDatabaseSetBefore && Objects.isNull(clientSession.getDatabaseName())) { // Previously unused resp.setOperationType("dropDB"); } return resp; } catch (Error error) { - finished = true; + executionContext.finished = true; t = error; throw error; } finally { @@ -533,25 +501,90 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { COORDINATOR.recordExecutionTime(queryId, currentOperationCost); // record each operation time cost - if (statementType != null) { + if (executionContext.statementType != null) { CommonUtils.addStatementExecutionLatency( - OperationType.EXECUTE_QUERY_STATEMENT, statementType.name(), currentOperationCost); + OperationType.EXECUTE_QUERY_STATEMENT, executionContext.statementType.name(), currentOperationCost); } - if (finished) { + if (executionContext.finished) { // record total time cost for one query long executionTime = COORDINATOR.getTotalExecutionTime(queryId); CommonUtils.addQueryLatency( - statementType, executionTime > 0 ? executionTime : currentOperationCost); + executionContext.statementType, executionTime > 0 ? executionTime : currentOperationCost); clearUp(clientSession, statementId, queryId, request, t); } SESSION_MANAGER.updateIdleTime(); - if (quota != null) { - quota.close(); + if (executionContext.quota != null) { + executionContext.quota.close(); } } } + private TSExecuteStatementResp constructExecutionResponse(long queryId, + ExecutionContext executionContext, NativeStatementRequest request, + SelectResult setResult, IClientSession clientSession, Statement treeStatement, + org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement tableStatement) throws IoTDBException, IOException { + IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId); + if (queryExecution != null && queryExecution.isQuery()) { + if (tableStatement instanceof Copy) { + return constructCopyExecutionResponse(executionContext, queryId, queryExecution, request, tableStatement); + } + return constructQueryExecutionResponse(executionContext, queryId, queryExecution, setResult, request); + } else { + return constructNonQueryExecutionResponse(executionContext, clientSession, + treeStatement, tableStatement); + } + } + + private TSExecuteStatementResp constructCopyExecutionResponse(ExecutionContext executionContext, + long queryId, IQueryExecution queryExecution, NativeStatementRequest request, + org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement statement) throws IoTDBException, IOException { + + } + + private TSExecuteStatementResp constructQueryExecutionResponse(ExecutionContext executionContext, + long queryId, IQueryExecution queryExecution, SelectResult setResult, + NativeStatementRequest request) throws IoTDBException, IOException { + TSExecuteStatementResp resp; + executionContext.statementType = executionContext.statementType == null ? StatementType.QUERY : executionContext.statementType; + resp = createResponse(queryExecution.getDatasetHeader(), queryId); + resp.setStatus(executionContext.status); + executionContext.finished = setResult.apply(resp, queryExecution, request.getFetchSize()); + resp.setMoreData(!executionContext.finished); + if (executionContext.quota != null) { + executionContext.quota.addReadResult(resp.getQueryResult()); + } + // Should return SUCCESS_MESSAGE for insert into query + if (queryExecution.getQueryType() == QueryType.READ_WRITE) { + resp.setColumns(null); + } + return resp; + } + + private TSExecuteStatementResp constructNonQueryExecutionResponse( + ExecutionContext executionContext, + IClientSession clientSession, Statement treeStatement, + org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement tableStatement) { + TSExecuteStatementResp resp; + executionContext.finished = true; + resp = RpcUtils.getTSExecuteStatementResp(executionContext.status); + // set for use XX + if (tableStatement instanceof Use) { + resp.setDatabase(clientSession.getDatabaseName()); + } + if (executionContext.isDatabaseSetBefore && Objects.isNull(clientSession.getDatabaseName())) { + // Previously unused + resp.setOperationType("dropDB"); + } + + if (tableStatement instanceof SetSqlDialect || treeStatement instanceof SetSqlDialectStatement) { + resp.setTableModel( + SESSION_MANAGER.getCurrSessionAndUpdateIdleTime().getSqlDialect() + == SqlDialect.TABLE); + } + return resp; + } + private void clearUp( IClientSession clientSession, Long statementId, @@ -566,7 +599,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { IClientSession clientSession, Long statementId, Long queryId, - org.apache.thrift.TBase<?, ?> req, + TBase<?, ?> req, Throwable t) { COORDINATOR.cleanupQueryExecution(queryId, req, t); clientSession.removeQueryId(statementId, queryId); @@ -1045,7 +1078,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { new OperatorContext(1, planNodeId, "SeriesAggregationScanOperator", driverContext); operatorContext.setMaxRunTime(DEFAULT_TIME_SLICE); - SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder(); + Builder scanOptionsBuilder = new Builder(); scanOptionsBuilder.withAllSensors(Collections.singleton(measurement)); scanOptionsBuilder.withGlobalTimeFilter(timeFilter); @@ -1198,13 +1231,13 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // 2.2 all sensors hit cache, return response ~= 20ms final TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(sensorNum); - for (final Map.Entry<TableId, Map<IDeviceID, Map<String, Pair<TSDataType, TimeValuePair>>>> + for (final Entry<TableId, Map<IDeviceID, Map<String, Pair<TSDataType, TimeValuePair>>>> result : resultMap.entrySet()) { - for (final Map.Entry<IDeviceID, Map<String, Pair<TSDataType, TimeValuePair>>> + for (final Entry<IDeviceID, Map<String, Pair<TSDataType, TimeValuePair>>> device2MeasurementLastEntry : result.getValue().entrySet()) { final String deviceWithSeparator = device2MeasurementLastEntry.getKey().toString() + TsFileConstant.PATH_SEPARATOR; - for (final Map.Entry<String, Pair<TSDataType, TimeValuePair>> measurementLastEntry : + for (final Entry<String, Pair<TSDataType, TimeValuePair>> measurementLastEntry : device2MeasurementLastEntry.getValue().entrySet()) { final TimeValuePair tvPair = measurementLastEntry.getValue().getRight(); if (tvPair != TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR) { @@ -1581,8 +1614,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException { - IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req); - IClientSession.SqlDialect sqlDialect; + ClientVersion clientVersion = parseClientVersion(req); + SqlDialect sqlDialect; try { sqlDialect = parseSqlDialect(req); } catch (IllegalArgumentException e) { @@ -1613,27 +1646,27 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { return resp.setSessionId(openSessionResp.getSessionId()).setConfiguration(configuration); } - private IoTDBConstant.ClientVersion parseClientVersion(TSOpenSessionReq req) { + private ClientVersion parseClientVersion(TSOpenSessionReq req) { Map<String, String> configuration = req.configuration; if (configuration != null && configuration.containsKey("version")) { - return IoTDBConstant.ClientVersion.valueOf(configuration.get("version")); + return ClientVersion.valueOf(configuration.get("version")); } - return IoTDBConstant.ClientVersion.V_0_12; + return ClientVersion.V_0_12; } - private IClientSession.SqlDialect parseSqlDialect(TSOpenSessionReq req) { + private SqlDialect parseSqlDialect(TSOpenSessionReq req) { Map<String, String> configuration = req.configuration; if (configuration != null && configuration.containsKey("sql_dialect")) { String sqlDialect = configuration.get("sql_dialect"); if ("tree".equalsIgnoreCase(sqlDialect)) { - return IClientSession.SqlDialect.TREE; + return SqlDialect.TREE; } else if ("table".equalsIgnoreCase(sqlDialect)) { - return IClientSession.SqlDialect.TABLE; + return SqlDialect.TABLE; } else { throw new IllegalArgumentException("Unknown sql_dialect: " + sqlDialect); } } else { - return IClientSession.SqlDialect.TREE; + return SqlDialect.TREE; } } @@ -2095,7 +2128,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { try { long queryId; ExecutionResult result; - if (clientSession.getSqlDialect() == IClientSession.SqlDialect.TREE) { + if (clientSession.getSqlDialect() == SqlDialect.TREE) { Statement s = StatementGenerator.createStatement(statement, clientSession.getZoneId()); if (s == null) { return RpcUtils.getStatus( @@ -3478,7 +3511,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { resp.setQueryId(queryId); resp.setTableModel( SESSION_MANAGER.getCurrSessionAndUpdateIdleTime().getSqlDialect() - == IClientSession.SqlDialect.TABLE); + == SqlDialect.TABLE); return resp; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java index bc6d54c37e7..3c99bb6ae93 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java @@ -68,6 +68,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Cast; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CoalesceExpression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Columns; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Copy; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CountDevice; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateDB; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateIndex; @@ -914,6 +915,11 @@ public class StatementAnalyzer { return queryScope; } + @Override + protected Scope visitCopyStatement(Copy node, Optional<Scope> context) { + + } + private List<Expression> descriptorToFields(Scope scope) { ImmutableList.Builder<Expression> builder = ImmutableList.builder(); for (int fieldIndex = 0; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java index 071a6a05da6..cc211426f3f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java @@ -46,6 +46,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValueFillNod import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Cast; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Copy; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Delete; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FieldReference; @@ -202,6 +203,10 @@ public class QueryPlanner { outerContext); } + public RelationPlan plan(Copy copy) { + + } + public RelationPlan plan(QuerySpecification node) { PlanBuilder builder = planFrom(node); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java index ded699588c1..e63425354f5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java @@ -88,6 +88,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AsofJoinOn; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CoalesceExpression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Copy; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Delete; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DereferenceExpression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Except; @@ -245,6 +246,11 @@ public class RelationPlanner extends AstVisitor<RelationPlan, Void> { .plan(node); } + @Override + protected RelationPlan visitCopyStatement(Copy node, Void context) { + + } + @Override protected RelationPlan visitTable(final Table table, final Void context) { // is this a recursive reference in expandable named query? If so, there's base relation already diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableLogicalPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableLogicalPlanner.java index 2d7ed174d2a..4d6d903a8db 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableLogicalPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableLogicalPlanner.java @@ -50,6 +50,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl; import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments.Builder; import org.apache.iotdb.db.queryengine.plan.relational.planner.ir.PredicateWithUncorrelatedScalarSubqueryReconstructor; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExplainAnalyzeNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; @@ -65,8 +66,10 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.Table import org.apache.iotdb.db.queryengine.plan.relational.planner.node.schema.TableDeviceQueryScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.LogicalOptimizeFactory; import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PlanOptimizer; +import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PlanOptimizer.Context; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AbstractQueryDeviceWithCache; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AbstractTraverseDevice; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Copy; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CountDevice; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateOrUpdateDevice; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Delete; @@ -177,7 +180,7 @@ public class TableLogicalPlanner { planNode = optimizer.optimize( planNode, - new PlanOptimizer.Context( + new Context( sessionInfo, analysis, metadata, @@ -257,10 +260,17 @@ public class TableLogicalPlanner { if (statement instanceof Insert) { return genInsertPlan(analysis, (Insert) statement); } + if (statement instanceof Copy) { + return createRelationPlan(analysis, ((Copy) statement)); + } throw new IllegalStateException( "Unsupported statement type: " + statement.getClass().getSimpleName()); } + private RelationPlan createRelationPlan(Analysis analysis, Copy statement) { + return getRelationPlanner(analysis).process(statement, null); + } + private RelationPlan genInsertPlan(final Analysis analysis, final Insert node) { // query plan and visible fields Query query = node.getQuery(); @@ -280,7 +290,7 @@ public class TableLogicalPlanner { Analysis.Insert insert = analysis.getInsert(); List<ColumnSchema> insertColumns = insert.getColumns(); - Assignments.Builder assignments = Assignments.builder(); + Builder assignments = Assignments.builder(); List<Symbol> neededInputColumnNames = new ArrayList<>(insertColumns.size()); for (int i = 0, size = insertColumns.size(); i < size; i++) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index 7072b5f519f..6257e701a27 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -36,6 +36,7 @@ import org.apache.iotdb.db.queryengine.plan.ClusterTopology; import org.apache.iotdb.db.queryengine.plan.planner.TableOperatorGenerator; import org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution; import org.apache.iotdb.db.queryengine.plan.planner.exceptions.RootFIPlacementException; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.CopyNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java index 13925ae3920..7ff4359ae8b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java @@ -901,4 +901,8 @@ public abstract class AstVisitor<R, C> { protected R visitRangeQuantifier(RangeQuantifier node, C context) { return visitPatternQuantifier(node, context); } + + protected R visitCopyStatement(Copy node, C context) { + return visitNode(node, context); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Copy.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Copy.java new file mode 100644 index 00000000000..6e13fdc648a --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Copy.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; + +public class Copy extends Statement { + +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java index 1721ad9b80d..90285bc4cb2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java @@ -2170,6 +2170,11 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> { body.getLimit()); } + @Override + public Node visitCopy(RelationalSqlParser.CopyStatementContext ctx) { + + } + @Override public Node visitWith(RelationalSqlParser.WithContext ctx) { return new With( diff --git a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 index 08a8b4c2e82..c99adaef857 100644 --- a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 +++ b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 @@ -45,6 +45,7 @@ standaloneRowPattern statement // Query Statement : queryStatement + | copyStatement // Database Statement | useDatabaseStatement
