This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch TableModelIngestion in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6a7f837d1d28d7135d49dd7081fc753d4f3fb190 Author: jt2594838 <[email protected]> AuthorDate: Tue Jun 11 14:47:24 2024 +0800 Add WrappedStatement --- .../protocol/thrift/impl/ClientRPCServiceImpl.java | 42 +++++++++------ .../iotdb/db/queryengine/plan/Coordinator.java | 1 + .../plan/parser/StatementGenerator.java | 1 + .../plan/relational/sql/ast/InsertTablet.java | 29 +++++++++++ .../plan/relational/sql/ast/WrappedStatement.java | 60 ++++++++++++++++++++++ .../db/queryengine/plan/statement/Statement.java | 4 ++ .../plan/statement/crud/InsertTabletStatement.java | 9 ++++ .../thrift-datanode/src/main/thrift/client.thrift | 1 + 8 files changed, 132 insertions(+), 15 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 0acddacb690..5a35238718d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -249,6 +249,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @FunctionalInterface public interface SelectResult { + boolean apply(TSExecuteStatementResp resp, IQueryExecution queryExecution, int fetchSize) throws IoTDBException, IOException; } @@ -724,7 +725,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { true, true), AggregationStep.SINGLE, - Collections.singletonList(new InputLocation[] {new InputLocation(0, 0)})); + Collections.singletonList(new InputLocation[]{new InputLocation(0, 0)})); GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter( @@ -752,7 +753,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, !TSDataType.BLOB.equals(dataType) || (!TAggregationType.LAST_VALUE.equals(aggregationType) - && !TAggregationType.FIRST_VALUE.equals(aggregationType))); + && !TAggregationType.FIRST_VALUE.equals(aggregationType))); } else { path = new NonAlignedFullPath(Factory.DEFAULT_FACTORY.create(device), measurementSchema); operator = @@ -768,7 +769,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, !TSDataType.BLOB.equals(dataType) || (!TAggregationType.LAST_VALUE.equals(aggregationType) - && !TAggregationType.FIRST_VALUE.equals(aggregationType))); + && !TAggregationType.FIRST_VALUE.equals(aggregationType))); } try { @@ -1213,7 +1214,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { public TSStatus closeSession(TSCloseSessionReq req) { return new TSStatus( !SESSION_MANAGER.closeSession( - SESSION_MANAGER.getCurrSession(), COORDINATOR::cleanupQueryExecution) + SESSION_MANAGER.getCurrSession(), COORDINATOR::cleanupQueryExecution) ? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN) : RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); } @@ -2069,15 +2070,22 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // Step 2: call the coordinator long queryId = SESSION_MANAGER.requestQueryId(); - ExecutionResult result = - COORDINATOR.executeForTreeModel( - statement, - queryId, - SESSION_MANAGER.getSessionInfo(clientSession), - "", - partitionFetcher, - schemaFetcher); - + ExecutionResult result; + if (statement.isWriteToTable()) { + result = COORDINATOR.executeForTableModel(statement.toRelationalStatement(), + relationSqlParser, clientSession, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), "", metadata, + config.getConnectionTimeoutInMS()); + } else { + result = COORDINATOR.executeForTreeModel( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher); + } return result.status; } catch (IoTDBException e) { return onIoTDBException(e, OperationType.INSERT_TABLET, e.getErrorCode()); @@ -2745,7 +2753,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { "Log in failed. Either you are not authorized or the session has timed out."); } - /** Add stat of whole stage query into metrics */ + /** + * Add stat of whole stage query into metrics + */ private void addQueryLatency(StatementType statementType, long costTimeInNanos) { if (statementType == null) { return; @@ -2763,7 +2773,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { statementType.name()); } - /** Add stat of operation into metrics */ + /** + * Add stat of operation into metrics + */ private void addStatementExecutionLatency( OperationType operation, String statementType, long costTime) { if (statementType == null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index 1b42abe62d8..0ace106a035 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -293,6 +293,7 @@ public class Coordinator { return new QueryExecution(relationalModelPlanner, queryContext, executor); } + public IQueryExecution getQueryExecution(Long queryId) { return queryExecutionMap.get(queryId); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java index b0385c8e642..43a117002c6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java @@ -336,6 +336,7 @@ public class StatementGenerator { insertStatement.setDataTypes(dataTypes); insertStatement.setAligned(insertTabletReq.isAligned); PERFORMANCE_OVERVIEW_METRICS.recordParseCost(System.nanoTime() - startTime); + insertStatement.setWriteToTable(insertTabletReq.isWriteToTable()); return insertStatement; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java new file mode 100644 index 00000000000..2b858bdb8a7 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql.tree; + +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; + +public class InsertTablet extends WrappedStatement { + + public InsertTablet(InsertTabletStatement insertTabletStatement) { + super(insertTabletStatement); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedStatement.java new file mode 100644 index 00000000000..62c7a7dee77 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedStatement.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql.tree; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +public abstract class WrappedStatement extends Statement { + private final org.apache.iotdb.db.queryengine.plan.statement.Statement innerTreeStatement; + + public WrappedStatement(org.apache.iotdb.db.queryengine.plan.statement.Statement innerTreeStatement) { + super(null); + this.innerTreeStatement = innerTreeStatement; + } + + @Override + public List<? extends Node> getChildren() { + return Collections.emptyList(); + } + + @Override + public int hashCode() { + return innerTreeStatement.hashCode(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + WrappedStatement that = (WrappedStatement) o; + return Objects.equals(innerTreeStatement, that.innerTreeStatement); + } + + @Override + public String toString() { + return innerTreeStatement.toString(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java index f74d210c667..a19c61439c0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java @@ -66,4 +66,8 @@ public abstract class Statement extends StatementNode { AuthorityChecker.SUPER_USER.equals(userName), "Only the admin user can perform this operation"); } + + public org.apache.iotdb.db.queryengine.plan.relational.sql.tree.Statement toRelationalStatement() { + throw new UnsupportedOperationException("Method not implemented yet"); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java index a5dd5fbf511..35a0a6f18f6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java @@ -67,6 +67,7 @@ public class InsertTabletStatement extends InsertBaseStatement implements ISchem * views. */ private boolean[] measurementIsAligned; + private boolean writeToTable = false; public InsertTabletStatement() { super(); @@ -400,4 +401,12 @@ public class InsertTabletStatement extends InsertBaseStatement implements ISchem return new Pair<>( this.recordedBeginOfLogicalViewSchemaList, this.recordedEndOfLogicalViewSchemaList); } + + public boolean isWriteToTable() { + return writeToTable; + } + + public void setWriteToTable(boolean writeToTable) { + this.writeToTable = writeToTable; + } } diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift index a99a8e599c9..2838cd82128 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift @@ -233,6 +233,7 @@ struct TSInsertTabletReq { 6: required list<i32> types 7: required i32 size 8: optional bool isAligned + 9: optional bool writeToTable } struct TSInsertTabletsReq {
