This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/TableModelIngestion by this 
push:
     new ce1c5683233 add insertRow
ce1c5683233 is described below

commit ce1c5683233c76b346c1976bb5c8f4161bb0a681
Author: Tian Jiang <[email protected]>
AuthorDate: Thu Jul 4 19:47:18 2024 +0800

    add insertRow
---
 .../java/org/apache/iotdb/isession/ISession.java   |   8 ++
 .../java/org/apache/iotdb/session/Session.java     |  30 +++++
 .../dataregion/DataExecutionVisitor.java           |   5 +
 .../protocol/thrift/impl/ClientRPCServiceImpl.java |  30 +++--
 .../plan/parser/StatementGenerator.java            |   1 +
 .../plan/planner/plan/node/PlanVisitor.java        |   6 +
 .../plan/node/write/RelationalInsertRowNode.java   | 135 +++++++++++++++++++++
 .../plan/relational/planner/RelationPlanner.java   |  20 ++-
 .../plan/relational/sql/ast/AstVisitor.java        |   4 +
 .../plan/relational/sql/ast/InsertRow.java         |  92 ++++++++++++++
 .../plan/statement/crud/InsertRowStatement.java    |  34 ++++++
 .../thrift-datanode/src/main/thrift/client.thrift  |   2 +
 12 files changed, 356 insertions(+), 11 deletions(-)

diff --git 
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java 
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
index 7b07015f374..b2fa84dabf5 100644
--- 
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
+++ 
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
@@ -212,6 +212,14 @@ public interface ISession extends AutoCloseable {
       Object... values)
       throws IoTDBConnectionException, StatementExecutionException;
 
+  void insertRelationalRecord(
+      String deviceId,
+      long time,
+      List<String> measurements,
+      List<TSDataType> types,
+      Object... values)
+      throws IoTDBConnectionException, StatementExecutionException;
+
   void insertRecord(
       String deviceId,
       long time,
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index 47b058ab673..d1f62fdef6c 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -1186,6 +1186,36 @@ public class Session implements ISession {
     insertRecord(deviceId, request);
   }
 
+  /**
+   * insert data in one row to the table model, if you want to improve your 
performance, please use insertRecords
+   * method or insertTablet method
+   *
+   * @see Session#insertRecords(List, List, List, List, List)
+   * @see Session#insertTablet(Tablet)
+   */
+  @Override
+  public void insertRelationalRecord(
+      String deviceId,
+      long time,
+      List<String> measurements,
+      List<TSDataType> types,
+      Object... values)
+      throws IoTDBConnectionException, StatementExecutionException {
+    TSInsertRecordReq request;
+    try {
+      request =
+          filterAndGenTSInsertRecordReq(
+              deviceId, time, measurements, types, Arrays.asList(values), 
false);
+      request.setIsWriteToTable(true);
+    } catch (NoValidValueException e) {
+      logger.warn(ALL_VALUES_ARE_NULL, deviceId, time, measurements);
+      return;
+    }
+
+    insertRecord(deviceId, request);
+  }
+
+
   private void insertRecord(String prefixPath, TSInsertRecordReq request)
       throws IoTDBConnectionException, StatementExecutionException {
     try {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index 73c83918f66..78d42986077 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -38,6 +38,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -57,6 +58,10 @@ public class DataExecutionVisitor extends 
PlanVisitor<TSStatus, DataRegion> {
     return null;
   }
 
+  public TSStatus visitRelationalInsertRow(RelationalInsertRowNode node, 
DataRegion context) {
+    return visitInsertRow(node, context);
+  }
+
   @Override
   public TSStatus visitInsertRow(InsertRowNode node, DataRegion dataRegion) {
     try {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index dbd2bfd1609..766fe4190ec 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -1979,15 +1979,27 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
 
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId();
-      ExecutionResult result =
-          COORDINATOR.executeForTreeModel(
-              statement,
-              queryId,
-              SESSION_MANAGER.getSessionInfo(clientSession),
-              "",
-              partitionFetcher,
-              schemaFetcher);
-
+      ExecutionResult result;
+      if (statement.isWriteToTable()) {
+        result = COORDINATOR.executeForTableModel(
+            statement,
+            relationSqlParser,
+            clientSession,
+            queryId,
+            SESSION_MANAGER.getSessionInfo(clientSession),
+            "",
+            metadata,
+            config.getConnectionTimeoutInMS()
+            );
+      } else {
+        result = COORDINATOR.executeForTreeModel(
+            statement,
+            queryId,
+            SESSION_MANAGER.getSessionInfo(clientSession),
+            "",
+            partitionFetcher,
+            schemaFetcher);
+      }
       return result.status;
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.INSERT_RECORD, 
e.getErrorCode());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
index ed1cfc5b1bf..9f300e520ef 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
@@ -284,6 +284,7 @@ public class StatementGenerator {
     
insertStatement.setMeasurements(insertRecordReq.getMeasurements().toArray(new 
String[0]));
     insertStatement.setAligned(insertRecordReq.isAligned);
     insertStatement.fillValues(insertRecordReq.values);
+    insertStatement.setWriteToTable(insertRecordReq.isIsWriteToTable());
     PERFORMANCE_OVERVIEW_METRICS.recordParseCost(System.nanoTime() - 
startTime);
     return insertStatement;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index 6b110606954..289c13d7615 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -116,6 +116,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CreateTableDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
@@ -511,6 +512,11 @@ public abstract class PlanVisitor<R, C> {
     return visitPlan(node, context);
   }
 
+  public R visitRelationalInsertRow(RelationalInsertRowNode node, C context) {
+    return visitPlan(node, context);
+  }
+
+
   public R visitInsertTablet(InsertTabletNode node, C context) {
     return visitPlan(node, context);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
new file mode 100644
index 00000000000..79910ceea8c
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.IntToLongFunction;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.exception.query.OutOfTTLException;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.IDeviceID.Factory;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+
+public class RelationalInsertRowNode extends InsertRowNode {
+  // deviceId cache for Table-view insertion
+  private IDeviceID deviceID;
+
+  public RelationalInsertRowNode(
+      PlanNodeId id,
+      PartialPath devicePath,
+      boolean isAligned,
+      String[] measurements,
+      TSDataType[] dataTypes,
+      long time,
+      Object[] values,
+      boolean isNeedInferType,
+      TsTableColumnCategory[] columnCategories) {
+    super(
+        id,
+        devicePath,
+        isAligned,
+        measurements,
+        dataTypes,
+        time,
+        values,
+        isNeedInferType);
+    setColumnCategories(columnCategories);
+  }
+
+  public RelationalInsertRowNode(PlanNodeId id) {
+    super(id);
+  }
+
+  public IDeviceID getDeviceID() {
+    if (deviceID == null) {
+      String[] deviceIdSegments = new String[idColumnIndices.size() + 1];
+      deviceIdSegments[0] = this.getTableName();
+      for (int i = 0; i < idColumnIndices.size(); i++) {
+        final Integer columnIndex = idColumnIndices.get(i);
+        deviceIdSegments[i + 1] = getValues()[columnIndex].toString();
+      }
+      deviceID = Factory.DEFAULT_FACTORY.create(deviceIdSegments);
+    }
+
+    return deviceID;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitRelationalInsertRow(this, context);
+  }
+
+  public static RelationalInsertRowNode deserialize(ByteBuffer byteBuffer) {
+    RelationalInsertRowNode insertNode = new RelationalInsertRowNode(new 
PlanNodeId(""));
+    insertNode.subDeserialize(byteBuffer);
+    insertNode.setPlanNodeId(PlanNodeId.deserialize(byteBuffer));
+    return insertNode;
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    super.serializeAttributes(byteBuffer);
+    for (int i = 0; i < dataTypes.length; i++) {
+      columnCategories[i].serialize(byteBuffer);
+    }
+  }
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
+    super.serializeAttributes(stream);
+    for (int i = 0; i < dataTypes.length; i++) {
+      columnCategories[i].serialize(stream);
+    }
+  }
+
+  public void subDeserialize(ByteBuffer buffer) {
+    super.subDeserialize(buffer);
+    columnCategories = new TsTableColumnCategory[dataTypes.length];
+    for (int i = 0; i < dataTypes.length; i++) {
+      columnCategories[i] = TsTableColumnCategory.deserialize(buffer);
+    }
+  }
+
+  @Override
+  public int serializedSize() {
+    return super.serializedSize() + columnCategories.length * Byte.BYTES;
+  }
+
+  @Override
+  public PlanNodeType getType() {
+    return PlanNodeType.RELATIONAL_INSERT_TABLET;
+  }
+
+  public String getTableName() {
+    return devicePath.getFullPath();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index fcd69c179f6..8c40135fd56 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -16,6 +16,7 @@ package 
org.apache.iotdb.db.queryengine.plan.relational.planner;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Field;
@@ -27,6 +28,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNod
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AliasedRelation;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Except;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertRow;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertTablet;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Intersect;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join;
@@ -39,6 +41,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Table;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.TableSubquery;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Union;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Values;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 
 import com.google.common.collect.ImmutableList;
@@ -52,6 +55,7 @@ import java.util.Map;
 import static java.util.Objects.requireNonNull;
 
 public class RelationPlanner extends AstVisitor<RelationPlan, Void> {
+
   private final Analysis analysis;
   private final SymbolAllocator symbolAllocator;
   private final MPPQueryContext queryContext;
@@ -82,7 +86,7 @@ public class RelationPlanner extends AstVisitor<RelationPlan, 
Void> {
   @Override
   protected RelationPlan visitQuery(Query node, Void context) {
     return new QueryPlanner(
-            analysis, symbolAllocator, queryContext, sessionInfo, 
recursiveSubqueries)
+        analysis, symbolAllocator, queryContext, sessionInfo, 
recursiveSubqueries)
         .plan(node);
   }
 
@@ -137,7 +141,7 @@ public class RelationPlanner extends 
AstVisitor<RelationPlan, Void> {
   @Override
   protected RelationPlan visitQuerySpecification(QuerySpecification node, Void 
context) {
     return new QueryPlanner(
-            analysis, symbolAllocator, queryContext, sessionInfo, 
recursiveSubqueries)
+        analysis, symbolAllocator, queryContext, sessionInfo, 
recursiveSubqueries)
         .plan(node);
   }
 
@@ -208,4 +212,16 @@ public class RelationPlanner extends 
AstVisitor<RelationPlan, Void> {
     
insertNode.setFailedMeasurementNumber(insertTabletStatement.getFailedMeasurementNumber());
     return new RelationPlan(insertNode, analysis.getRootScope(), 
Collections.emptyList());
   }
+
+  protected RelationPlan visitInsertRow(InsertRow node, Void context) {
+    InsertRowStatement insertRowStatement = node.getInnerTreeStatement();
+    RelationalInsertRowNode insertNode =
+        new RelationalInsertRowNode(idAllocator.genPlanNodeId(), 
insertRowStatement.getDevicePath(),
+            insertRowStatement.isAligned(),
+            insertRowStatement.getMeasurements(), 
insertRowStatement.getDataTypes(),
+            insertRowStatement.getTime(), insertRowStatement.getValues(),
+            insertRowStatement.isNeedInferType(), 
insertRowStatement.getColumnCategories());
+    
insertNode.setFailedMeasurementNumber(insertRowStatement.getFailedMeasurementNumber());
+    return new RelationPlan(insertNode, analysis.getRootScope(), 
Collections.emptyList());
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
index 94479a0da72..55c074545bb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
@@ -375,6 +375,10 @@ public abstract class AstVisitor<R, C> {
     return visitStatement(node, context);
   }
 
+  protected R visitInsertRow(InsertRow node, C context) {
+    return visitStatement(node, context);
+  }
+
   protected R visitDelete(Delete node, C context) {
     return visitStatement(node, context);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertRow.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertRow.java
new file mode 100644
index 00000000000..2d8ecc3d4e8
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertRow.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.tsfile.file.metadata.IDeviceID;
+
+public class InsertRow extends WrappedInsertStatement {
+
+  public InsertRow(InsertRowStatement insertRowStatement, MPPQueryContext 
context) {
+    super(insertRowStatement, context);
+  }
+
+  @Override
+  public <R, C> R accept(AstVisitor<R, C> visitor, C context) {
+    return visitor.visitInsertRow(this, context);
+  }
+
+  @Override
+  public InsertRowStatement getInnerTreeStatement() {
+    return ((InsertRowStatement) super.getInnerTreeStatement());
+  }
+
+  @Override
+  public void updateAfterSchemaValidation(MPPQueryContext context) throws 
QueryProcessException {
+    getInnerTreeStatement().updateAfterSchemaValidation(context);
+  }
+
+  @Override
+  public String getDatabase() {
+    return context.getSession().getDatabaseName().orElse(null);
+  }
+
+  @Override
+  public String getTableName() {
+    return getInnerTreeStatement().getDevicePath().getFullPath();
+  }
+
+  @Override
+  public List<Object[]> getDeviceIdList() {
+    final InsertRowStatement insertRowStatement = getInnerTreeStatement();
+    return 
Collections.singletonList(insertRowStatement.getTableDeviceID().getSegments());
+  }
+
+  @Override
+  public List<String> getAttributeColumnNameList() {
+    final InsertRowStatement insertRowStatement = getInnerTreeStatement();
+    List<String> result = new ArrayList<>();
+    for (int i = 0; i < insertRowStatement.getColumnCategories().length; i++) {
+      if (insertRowStatement.getColumnCategories()[i] == 
TsTableColumnCategory.ATTRIBUTE) {
+        result.add(insertRowStatement.getMeasurements()[i]);
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public List<Object[]> getAttributeValueList() {
+    final InsertRowStatement insertRowStatement = getInnerTreeStatement();
+    final List<Integer> attrColumnIndices = 
insertRowStatement.getAttrColumnIndices();
+    Object[] attrValues = new Object[attrColumnIndices.size()];
+    for (int j = 0; j < attrColumnIndices.size(); j++) {
+      final int columnIndex = attrColumnIndices.get(j);
+      attrValues[j] = insertRowStatement.getValues()[columnIndex];
+    }
+    return Collections.singletonList(attrValues);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
index a5b864e30f4..29579f8e11e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
@@ -32,12 +32,16 @@ import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import 
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertRow;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.utils.CommonUtils;
 import org.apache.iotdb.db.utils.TypeInferenceUtils;
 
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.IDeviceID.Factory;
 import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.utils.Pair;
@@ -72,6 +76,9 @@ public class InsertRowStatement extends InsertBaseStatement 
implements ISchemaVa
    */
   private boolean[] measurementIsAligned;
 
+  private boolean isWriteToTable = false;
+  private IDeviceID deviceID;
+
   public InsertRowStatement() {
     super();
     statementType = StatementType.INSERT;
@@ -444,4 +451,31 @@ public class InsertRowStatement extends 
InsertBaseStatement implements ISchemaVa
     return new Pair<>(
         this.recordedBeginOfLogicalViewSchemaList, 
this.recordedEndOfLogicalViewSchemaList);
   }
+
+  public boolean isWriteToTable() {
+    return isWriteToTable;
+  }
+
+  public void setWriteToTable(boolean writeToTable) {
+    isWriteToTable = writeToTable;
+  }
+
+  public IDeviceID getTableDeviceID() {
+    if (deviceID == null) {
+      String[] deviceIdSegments = new String[getIdColumnIndices().size() + 1];
+      deviceIdSegments[0] = this.devicePath.getFullPath();
+      for (int i = 0; i < getIdColumnIndices().size(); i++) {
+        final Integer columnIndex = getIdColumnIndices().get(i);
+        deviceIdSegments[i + 1] =  values[columnIndex].toString();
+      }
+      deviceID = Factory.DEFAULT_FACTORY.create(deviceIdSegments);
+    }
+
+    return deviceID;
+  }
+
+  @Override
+  public Statement toRelationalStatement(MPPQueryContext context) {
+    return new InsertRow(this, context);
+  }
 }
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
index 4e4ffffc2cf..8c26e738bc9 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
@@ -212,6 +212,8 @@ struct TSInsertRecordReq {
   4: required binary values
   5: required i64 timestamp
   6: optional bool isAligned
+  7: optional bool isWriteToTable
+  8: optional list<byte> columnCategoryies
 }
 
 struct TSInsertStringRecordReq {

Reply via email to