This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6a7f837d1d28d7135d49dd7081fc753d4f3fb190
Author: jt2594838 <[email protected]>
AuthorDate: Tue Jun 11 14:47:24 2024 +0800

    Add WrappedStatement
---
 .../protocol/thrift/impl/ClientRPCServiceImpl.java | 42 +++++++++------
 .../iotdb/db/queryengine/plan/Coordinator.java     |  1 +
 .../plan/parser/StatementGenerator.java            |  1 +
 .../plan/relational/sql/ast/InsertTablet.java      | 29 +++++++++++
 .../plan/relational/sql/ast/WrappedStatement.java  | 60 ++++++++++++++++++++++
 .../db/queryengine/plan/statement/Statement.java   |  4 ++
 .../plan/statement/crud/InsertTabletStatement.java |  9 ++++
 .../thrift-datanode/src/main/thrift/client.thrift  |  1 +
 8 files changed, 132 insertions(+), 15 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 0acddacb690..5a35238718d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -249,6 +249,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
 
   @FunctionalInterface
   public interface SelectResult {
+
     boolean apply(TSExecuteStatementResp resp, IQueryExecution queryExecution, 
int fetchSize)
         throws IoTDBException, IOException;
   }
@@ -724,7 +725,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
                 true,
                 true),
             AggregationStep.SINGLE,
-            Collections.singletonList(new InputLocation[] {new 
InputLocation(0, 0)}));
+            Collections.singletonList(new InputLocation[]{new InputLocation(0, 
0)}));
 
     GroupByTimeParameter groupByTimeParameter =
         new GroupByTimeParameter(
@@ -752,7 +753,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
               DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
               !TSDataType.BLOB.equals(dataType)
                   || (!TAggregationType.LAST_VALUE.equals(aggregationType)
-                      && 
!TAggregationType.FIRST_VALUE.equals(aggregationType)));
+                  && !TAggregationType.FIRST_VALUE.equals(aggregationType)));
     } else {
       path = new NonAlignedFullPath(Factory.DEFAULT_FACTORY.create(device), 
measurementSchema);
       operator =
@@ -768,7 +769,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
               DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
               !TSDataType.BLOB.equals(dataType)
                   || (!TAggregationType.LAST_VALUE.equals(aggregationType)
-                      && 
!TAggregationType.FIRST_VALUE.equals(aggregationType)));
+                  && !TAggregationType.FIRST_VALUE.equals(aggregationType)));
     }
 
     try {
@@ -1213,7 +1214,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
   public TSStatus closeSession(TSCloseSessionReq req) {
     return new TSStatus(
         !SESSION_MANAGER.closeSession(
-                SESSION_MANAGER.getCurrSession(), 
COORDINATOR::cleanupQueryExecution)
+            SESSION_MANAGER.getCurrSession(), 
COORDINATOR::cleanupQueryExecution)
             ? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN)
             : RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
   }
@@ -2069,15 +2070,22 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
 
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId();
-      ExecutionResult result =
-          COORDINATOR.executeForTreeModel(
-              statement,
-              queryId,
-              SESSION_MANAGER.getSessionInfo(clientSession),
-              "",
-              partitionFetcher,
-              schemaFetcher);
-
+      ExecutionResult result;
+      if (statement.isWriteToTable()) {
+        result = 
COORDINATOR.executeForTableModel(statement.toRelationalStatement(),
+            relationSqlParser, clientSession,
+            queryId,
+            SESSION_MANAGER.getSessionInfo(clientSession), "", metadata,
+            config.getConnectionTimeoutInMS());
+      } else {
+        result = COORDINATOR.executeForTreeModel(
+            statement,
+            queryId,
+            SESSION_MANAGER.getSessionInfo(clientSession),
+            "",
+            partitionFetcher,
+            schemaFetcher);
+      }
       return result.status;
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.INSERT_TABLET, 
e.getErrorCode());
@@ -2745,7 +2753,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
         "Log in failed. Either you are not authorized or the session has timed 
out.");
   }
 
-  /** Add stat of whole stage query into metrics */
+  /**
+   * Add stat of whole stage query into metrics
+   */
   private void addQueryLatency(StatementType statementType, long 
costTimeInNanos) {
     if (statementType == null) {
       return;
@@ -2763,7 +2773,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
             statementType.name());
   }
 
-  /** Add stat of operation into metrics */
+  /**
+   * Add stat of operation into metrics
+   */
   private void addStatementExecutionLatency(
       OperationType operation, String statementType, long costTime) {
     if (statementType == null) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 1b42abe62d8..0ace106a035 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -293,6 +293,7 @@ public class Coordinator {
     return new QueryExecution(relationalModelPlanner, queryContext, executor);
   }
 
+
   public IQueryExecution getQueryExecution(Long queryId) {
     return queryExecutionMap.get(queryId);
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
index b0385c8e642..43a117002c6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
@@ -336,6 +336,7 @@ public class StatementGenerator {
     insertStatement.setDataTypes(dataTypes);
     insertStatement.setAligned(insertTabletReq.isAligned);
     PERFORMANCE_OVERVIEW_METRICS.recordParseCost(System.nanoTime() - 
startTime);
+    insertStatement.setWriteToTable(insertTabletReq.isWriteToTable());
     return insertStatement;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java
new file mode 100644
index 00000000000..2b858bdb8a7
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.sql.tree;
+
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
+public class InsertTablet extends WrappedStatement {
+
+  public InsertTablet(InsertTabletStatement insertTabletStatement) {
+    super(insertTabletStatement);
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedStatement.java
new file mode 100644
index 00000000000..62c7a7dee77
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedStatement.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.sql.tree;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+public abstract class WrappedStatement extends Statement {
+  private final org.apache.iotdb.db.queryengine.plan.statement.Statement 
innerTreeStatement;
+
+  public 
WrappedStatement(org.apache.iotdb.db.queryengine.plan.statement.Statement 
innerTreeStatement) {
+    super(null);
+    this.innerTreeStatement = innerTreeStatement;
+  }
+
+  @Override
+  public List<? extends Node> getChildren() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public int hashCode() {
+    return innerTreeStatement.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    WrappedStatement that = (WrappedStatement) o;
+    return Objects.equals(innerTreeStatement, that.innerTreeStatement);
+  }
+
+  @Override
+  public String toString() {
+    return innerTreeStatement.toString();
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java
index f74d210c667..a19c61439c0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java
@@ -66,4 +66,8 @@ public abstract class Statement extends StatementNode {
         AuthorityChecker.SUPER_USER.equals(userName),
         "Only the admin user can perform this operation");
   }
+
+  public org.apache.iotdb.db.queryengine.plan.relational.sql.tree.Statement 
toRelationalStatement() {
+    throw new UnsupportedOperationException("Method not implemented yet");
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index a5dd5fbf511..35a0a6f18f6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -67,6 +67,7 @@ public class InsertTabletStatement extends InsertBaseStatement implements ISchem
    * views.
    */
   private boolean[] measurementIsAligned;
+  private boolean writeToTable = false;
 
   public InsertTabletStatement() {
     super();
@@ -400,4 +401,12 @@ public class InsertTabletStatement extends InsertBaseStatement implements ISchem
     return new Pair<>(
         this.recordedBeginOfLogicalViewSchemaList, 
this.recordedEndOfLogicalViewSchemaList);
   }
+
+  public boolean isWriteToTable() {
+    return writeToTable;
+  }
+
+  public void setWriteToTable(boolean writeToTable) {
+    this.writeToTable = writeToTable;
+  }
 }
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
index a99a8e599c9..2838cd82128 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
@@ -233,6 +233,7 @@ struct TSInsertTabletReq {
   6: required list<i32> types
   7: required i32 size
   8: optional bool isAligned
+  9: optional bool writeToTable
 }
 
 struct TSInsertTabletsReq {

Reply via email to