This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/TableModelIngestion by this
push:
new ce1c5683233 add insertRow
ce1c5683233 is described below
commit ce1c5683233c76b346c1976bb5c8f4161bb0a681
Author: Tian Jiang <[email protected]>
AuthorDate: Thu Jul 4 19:47:18 2024 +0800
add insertRow
---
.../java/org/apache/iotdb/isession/ISession.java | 8 ++
.../java/org/apache/iotdb/session/Session.java | 30 +++++
.../dataregion/DataExecutionVisitor.java | 5 +
.../protocol/thrift/impl/ClientRPCServiceImpl.java | 30 +++--
.../plan/parser/StatementGenerator.java | 1 +
.../plan/planner/plan/node/PlanVisitor.java | 6 +
.../plan/node/write/RelationalInsertRowNode.java | 135 +++++++++++++++++++++
.../plan/relational/planner/RelationPlanner.java | 20 ++-
.../plan/relational/sql/ast/AstVisitor.java | 4 +
.../plan/relational/sql/ast/InsertRow.java | 92 ++++++++++++++
.../plan/statement/crud/InsertRowStatement.java | 34 ++++++
.../thrift-datanode/src/main/thrift/client.thrift | 2 +
12 files changed, 356 insertions(+), 11 deletions(-)
diff --git
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
index 7b07015f374..b2fa84dabf5 100644
---
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
+++
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
@@ -212,6 +212,14 @@ public interface ISession extends AutoCloseable {
Object... values)
throws IoTDBConnectionException, StatementExecutionException;
+ /**
+ * Inserts a single row into the table model.
+ *
+ * @param deviceId target of the insertion (presumably the table name for table-model writes;
+ * confirm against the server-side handling)
+ * @param time timestamp of the row
+ * @param measurements names of the columns being written
+ * @param types data type of each column (assumed to be in the same order as {@code
+ * measurements}; confirm)
+ * @param values value of each column (assumed to be in the same order as {@code measurements};
+ * confirm)
+ * @throws IoTDBConnectionException declared by the signature; see implementations for the
+ * exact conditions
+ * @throws StatementExecutionException declared by the signature; see implementations for the
+ * exact conditions
+ */
+ void insertRelationalRecord(
+ String deviceId,
+ long time,
+ List<String> measurements,
+ List<TSDataType> types,
+ Object... values)
+ throws IoTDBConnectionException, StatementExecutionException;
+
void insertRecord(
String deviceId,
long time,
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index 47b058ab673..d1f62fdef6c 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -1186,6 +1186,36 @@ public class Session implements ISession {
insertRecord(deviceId, request);
}
+  /**
+   * Inserts one row of data into the table model. For better performance, prefer the
+   * insertRecords method or the insertTablet method.
+   *
+   * <p>The request is built exactly like a regular record insertion and is then flagged as a
+   * table write. When no valid request can be built (a {@code NoValidValueException} is raised),
+   * a warning is logged and nothing is sent.
+   *
+   * @see Session#insertRecords(List, List, List, List, List)
+   * @see Session#insertTablet(Tablet)
+   */
+  @Override
+  public void insertRelationalRecord(
+      String deviceId,
+      long time,
+      List<String> measurements,
+      List<TSDataType> types,
+      Object... values)
+      throws IoTDBConnectionException, StatementExecutionException {
+    TSInsertRecordReq tableRequest;
+    try {
+      tableRequest =
+          filterAndGenTSInsertRecordReq(
+              deviceId, time, measurements, types, Arrays.asList(values), false);
+    } catch (NoValidValueException e) {
+      // Nothing to write: report it and skip the RPC entirely.
+      logger.warn(ALL_VALUES_ARE_NULL, deviceId, time, measurements);
+      return;
+    }
+
+    // Mark the request so that the receiver routes it to the table model.
+    tableRequest.setIsWriteToTable(true);
+    insertRecord(deviceId, tableRequest);
+  }
+
+
private void insertRecord(String prefixPath, TSInsertRecordReq request)
throws IoTDBConnectionException, StatementExecutionException {
try {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index 73c83918f66..78d42986077 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -38,6 +38,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.rpc.RpcUtils;
@@ -57,6 +58,10 @@ public class DataExecutionVisitor extends
PlanVisitor<TSStatus, DataRegion> {
return null;
}
+  /**
+   * Handles a table-model single-row insertion by delegating to {@link #visitInsertRow}, i.e. a
+   * relational row node is executed on the data region exactly like a regular row node.
+   *
+   * @param node the relational insert-row node to execute
+   * @param context the data region the row is written to
+   * @return the status produced by {@link #visitInsertRow}
+   */
+  @Override
+  public TSStatus visitRelationalInsertRow(RelationalInsertRowNode node, DataRegion context) {
+    return visitInsertRow(node, context);
+  }
+
@Override
public TSStatus visitInsertRow(InsertRowNode node, DataRegion dataRegion) {
try {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index dbd2bfd1609..766fe4190ec 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -1979,15 +1979,27 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId();
- ExecutionResult result =
- COORDINATOR.executeForTreeModel(
- statement,
- queryId,
- SESSION_MANAGER.getSessionInfo(clientSession),
- "",
- partitionFetcher,
- schemaFetcher);
-
+ ExecutionResult result;
+ if (statement.isWriteToTable()) {
+ result = COORDINATOR.executeForTableModel(
+ statement,
+ relationSqlParser,
+ clientSession,
+ queryId,
+ SESSION_MANAGER.getSessionInfo(clientSession),
+ "",
+ metadata,
+ config.getConnectionTimeoutInMS()
+ );
+ } else {
+ result = COORDINATOR.executeForTreeModel(
+ statement,
+ queryId,
+ SESSION_MANAGER.getSessionInfo(clientSession),
+ "",
+ partitionFetcher,
+ schemaFetcher);
+ }
return result.status;
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.INSERT_RECORD,
e.getErrorCode());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
index ed1cfc5b1bf..9f300e520ef 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
@@ -284,6 +284,7 @@ public class StatementGenerator {
insertStatement.setMeasurements(insertRecordReq.getMeasurements().toArray(new
String[0]));
insertStatement.setAligned(insertRecordReq.isAligned);
insertStatement.fillValues(insertRecordReq.values);
+ insertStatement.setWriteToTable(insertRecordReq.isIsWriteToTable());
PERFORMANCE_OVERVIEW_METRICS.recordParseCost(System.nanoTime() -
startTime);
return insertStatement;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index 6b110606954..289c13d7615 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -116,6 +116,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CreateTableDeviceNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
@@ -511,6 +512,11 @@ public abstract class PlanVisitor<R, C> {
return visitPlan(node, context);
}
+ /**
+ * Visits a {@link RelationalInsertRowNode}. The default implementation falls back to {@link
+ * #visitPlan}; visitors that need dedicated handling override this method.
+ */
+ public R visitRelationalInsertRow(RelationalInsertRowNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+
public R visitInsertTablet(InsertTabletNode node, C context) {
return visitPlan(node, context);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
new file mode 100644
index 00000000000..79910ceea8c
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.IntToLongFunction;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.exception.query.OutOfTTLException;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.IDeviceID.Factory;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+
+public class RelationalInsertRowNode extends InsertRowNode {
+ // deviceId cache for Table-view insertion
+ private IDeviceID deviceID;
+
+ public RelationalInsertRowNode(
+ PlanNodeId id,
+ PartialPath devicePath,
+ boolean isAligned,
+ String[] measurements,
+ TSDataType[] dataTypes,
+ long time,
+ Object[] values,
+ boolean isNeedInferType,
+ TsTableColumnCategory[] columnCategories) {
+ super(
+ id,
+ devicePath,
+ isAligned,
+ measurements,
+ dataTypes,
+ time,
+ values,
+ isNeedInferType);
+ setColumnCategories(columnCategories);
+ }
+
+ public RelationalInsertRowNode(PlanNodeId id) {
+ super(id);
+ }
+
+ public IDeviceID getDeviceID() {
+ if (deviceID == null) {
+ String[] deviceIdSegments = new String[idColumnIndices.size() + 1];
+ deviceIdSegments[0] = this.getTableName();
+ for (int i = 0; i < idColumnIndices.size(); i++) {
+ final Integer columnIndex = idColumnIndices.get(i);
+ deviceIdSegments[i + 1] = getValues()[columnIndex].toString();
+ }
+ deviceID = Factory.DEFAULT_FACTORY.create(deviceIdSegments);
+ }
+
+ return deviceID;
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitRelationalInsertRow(this, context);
+ }
+
+ public static RelationalInsertRowNode deserialize(ByteBuffer byteBuffer) {
+ RelationalInsertRowNode insertNode = new RelationalInsertRowNode(new
PlanNodeId(""));
+ insertNode.subDeserialize(byteBuffer);
+ insertNode.setPlanNodeId(PlanNodeId.deserialize(byteBuffer));
+ return insertNode;
+ }
+
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ super.serializeAttributes(byteBuffer);
+ for (int i = 0; i < dataTypes.length; i++) {
+ columnCategories[i].serialize(byteBuffer);
+ }
+ }
+
+ @Override
+ protected void serializeAttributes(DataOutputStream stream) throws
IOException {
+ super.serializeAttributes(stream);
+ for (int i = 0; i < dataTypes.length; i++) {
+ columnCategories[i].serialize(stream);
+ }
+ }
+
+ public void subDeserialize(ByteBuffer buffer) {
+ super.subDeserialize(buffer);
+ columnCategories = new TsTableColumnCategory[dataTypes.length];
+ for (int i = 0; i < dataTypes.length; i++) {
+ columnCategories[i] = TsTableColumnCategory.deserialize(buffer);
+ }
+ }
+
+ @Override
+ public int serializedSize() {
+ return super.serializedSize() + columnCategories.length * Byte.BYTES;
+ }
+
+ @Override
+ public PlanNodeType getType() {
+ return PlanNodeType.RELATIONAL_INSERT_TABLET;
+ }
+
+ public String getTableName() {
+ return devicePath.getFullPath();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index fcd69c179f6..8c40135fd56 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -16,6 +16,7 @@ package
org.apache.iotdb.db.queryengine.plan.relational.planner;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Field;
@@ -27,6 +28,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNod
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AliasedRelation;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Except;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertRow;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertTablet;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Intersect;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join;
@@ -39,6 +41,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Table;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.TableSubquery;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Union;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Values;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import com.google.common.collect.ImmutableList;
@@ -52,6 +55,7 @@ import java.util.Map;
import static java.util.Objects.requireNonNull;
public class RelationPlanner extends AstVisitor<RelationPlan, Void> {
+
private final Analysis analysis;
private final SymbolAllocator symbolAllocator;
private final MPPQueryContext queryContext;
@@ -82,7 +86,7 @@ public class RelationPlanner extends AstVisitor<RelationPlan,
Void> {
@Override
protected RelationPlan visitQuery(Query node, Void context) {
return new QueryPlanner(
- analysis, symbolAllocator, queryContext, sessionInfo,
recursiveSubqueries)
+ analysis, symbolAllocator, queryContext, sessionInfo,
recursiveSubqueries)
.plan(node);
}
@@ -137,7 +141,7 @@ public class RelationPlanner extends
AstVisitor<RelationPlan, Void> {
@Override
protected RelationPlan visitQuerySpecification(QuerySpecification node, Void
context) {
return new QueryPlanner(
- analysis, symbolAllocator, queryContext, sessionInfo,
recursiveSubqueries)
+ analysis, symbolAllocator, queryContext, sessionInfo,
recursiveSubqueries)
.plan(node);
}
@@ -208,4 +212,16 @@ public class RelationPlanner extends
AstVisitor<RelationPlan, Void> {
insertNode.setFailedMeasurementNumber(insertTabletStatement.getFailedMeasurementNumber());
return new RelationPlan(insertNode, analysis.getRootScope(),
Collections.emptyList());
}
+
+  /**
+   * Plans a table-model single-row insertion: the wrapped tree-model statement is converted into
+   * a {@link RelationalInsertRowNode} that becomes the root of the returned plan.
+   *
+   * @param node the relational insert-row statement
+   * @param context unused
+   * @return a plan with the insert node as root, the analysis root scope and no output symbols
+   */
+  @Override
+  protected RelationPlan visitInsertRow(InsertRow node, Void context) {
+    InsertRowStatement insertRowStatement = node.getInnerTreeStatement();
+    RelationalInsertRowNode insertNode =
+        new RelationalInsertRowNode(
+            idAllocator.genPlanNodeId(),
+            insertRowStatement.getDevicePath(),
+            insertRowStatement.isAligned(),
+            insertRowStatement.getMeasurements(),
+            insertRowStatement.getDataTypes(),
+            insertRowStatement.getTime(),
+            insertRowStatement.getValues(),
+            insertRowStatement.isNeedInferType(),
+            insertRowStatement.getColumnCategories());
+    // Carry over the failed-measurement count recorded on the statement.
+    insertNode.setFailedMeasurementNumber(insertRowStatement.getFailedMeasurementNumber());
+    return new RelationPlan(insertNode, analysis.getRootScope(), Collections.emptyList());
+  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
index 94479a0da72..55c074545bb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
@@ -375,6 +375,10 @@ public abstract class AstVisitor<R, C> {
return visitStatement(node, context);
}
+ /**
+ * Visits an {@link InsertRow} statement. The default implementation falls back to {@link
+ * #visitStatement}; visitors that need dedicated handling override this method.
+ */
+ protected R visitInsertRow(InsertRow node, C context) {
+ return visitStatement(node, context);
+ }
+
protected R visitDelete(Delete node, C context) {
return visitStatement(node, context);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertRow.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertRow.java
new file mode 100644
index 00000000000..2d8ecc3d4e8
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertRow.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.tsfile.file.metadata.IDeviceID;
+
+/**
+ * Relational (table-model) wrapper around a tree-model {@link InsertRowStatement}. It exposes
+ * the table name, the device ID and the attribute columns of the single row being inserted.
+ */
+public class InsertRow extends WrappedInsertStatement {
+
+  public InsertRow(InsertRowStatement insertRowStatement, MPPQueryContext context) {
+    super(insertRowStatement, context);
+  }
+
+  @Override
+  public <R, C> R accept(AstVisitor<R, C> visitor, C context) {
+    return visitor.visitInsertRow(this, context);
+  }
+
+  /** Returns the wrapped statement, narrowed to {@link InsertRowStatement}. */
+  @Override
+  public InsertRowStatement getInnerTreeStatement() {
+    return (InsertRowStatement) super.getInnerTreeStatement();
+  }
+
+  /** Forwards the post-validation update to the wrapped statement. */
+  @Override
+  public void updateAfterSchemaValidation(MPPQueryContext context) throws QueryProcessException {
+    final InsertRowStatement rowStatement = getInnerTreeStatement();
+    rowStatement.updateAfterSchemaValidation(context);
+  }
+
+  /** Returns the database name of the session, or {@code null} when the session has none. */
+  @Override
+  public String getDatabase() {
+    return context.getSession().getDatabaseName().orElse(null);
+  }
+
+  /** Returns the full path of the wrapped statement's device path, used as the table name. */
+  @Override
+  public String getTableName() {
+    final InsertRowStatement rowStatement = getInnerTreeStatement();
+    return rowStatement.getDevicePath().getFullPath();
+  }
+
+  /** Returns a one-element list holding the segments of the row's table device ID. */
+  @Override
+  public List<Object[]> getDeviceIdList() {
+    return Collections.singletonList(getInnerTreeStatement().getTableDeviceID().getSegments());
+  }
+
+  /** Returns the names of all columns whose category is {@code ATTRIBUTE}, in column order. */
+  @Override
+  public List<String> getAttributeColumnNameList() {
+    final InsertRowStatement rowStatement = getInnerTreeStatement();
+    final List<String> attributeNames = new ArrayList<>();
+    int columnIndex = 0;
+    for (TsTableColumnCategory category : rowStatement.getColumnCategories()) {
+      if (category == TsTableColumnCategory.ATTRIBUTE) {
+        attributeNames.add(rowStatement.getMeasurements()[columnIndex]);
+      }
+      columnIndex++;
+    }
+    return attributeNames;
+  }
+
+  /**
+   * Returns a one-element list holding the row's attribute values, in the order given by the
+   * statement's attribute column indices.
+   */
+  @Override
+  public List<Object[]> getAttributeValueList() {
+    final InsertRowStatement rowStatement = getInnerTreeStatement();
+    final List<Integer> attributeIndices = rowStatement.getAttrColumnIndices();
+    final Object[] attributeValues = new Object[attributeIndices.size()];
+    int slot = 0;
+    for (int columnIndex : attributeIndices) {
+      attributeValues[slot++] = rowStatement.getValues()[columnIndex];
+    }
+    return Collections.singletonList(attributeValues);
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
index a5b864e30f4..29579f8e11e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
@@ -32,12 +32,16 @@ import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertRow;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.IDeviceID.Factory;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.utils.Pair;
@@ -72,6 +76,9 @@ public class InsertRowStatement extends InsertBaseStatement
implements ISchemaVa
*/
private boolean[] measurementIsAligned;
+ private boolean isWriteToTable = false;
+ private IDeviceID deviceID;
+
public InsertRowStatement() {
super();
statementType = StatementType.INSERT;
@@ -444,4 +451,31 @@ public class InsertRowStatement extends
InsertBaseStatement implements ISchemaVa
return new Pair<>(
this.recordedBeginOfLogicalViewSchemaList,
this.recordedEndOfLogicalViewSchemaList);
}
+
+ /** Returns whether this insertion is flagged as a write to the table model. */
+ public boolean isWriteToTable() {
+ return isWriteToTable;
+ }
+
+ /** Sets the flag reported by {@code isWriteToTable()}. */
+ public void setWriteToTable(boolean writeToTable) {
+ isWriteToTable = writeToTable;
+ }
+
+  /**
+   * Returns the table-model device ID of this row, building and caching it on first use. The
+   * first segment is the full path of the device path; the following segments are the string
+   * forms of the ID column values, in the order given by {@code getIdColumnIndices()}.
+   *
+   * <p>A null ID value is kept as a null segment instead of failing with a {@link
+   * NullPointerException}.
+   */
+  public IDeviceID getTableDeviceID() {
+    if (deviceID == null) {
+      String[] deviceIdSegments = new String[getIdColumnIndices().size() + 1];
+      deviceIdSegments[0] = this.devicePath.getFullPath();
+      for (int i = 0; i < getIdColumnIndices().size(); i++) {
+        final Integer columnIndex = getIdColumnIndices().get(i);
+        final Object idValue = values[columnIndex];
+        // NOTE(review): assumes the device ID factory accepts null segments - confirm.
+        deviceIdSegments[i + 1] = idValue == null ? null : idValue.toString();
+      }
+      deviceID = Factory.DEFAULT_FACTORY.create(deviceIdSegments);
+    }
+
+    return deviceID;
+  }
+
+ /** Wraps this statement in an {@link InsertRow} created with the given query context. */
+ @Override
+ public Statement toRelationalStatement(MPPQueryContext context) {
+ return new InsertRow(this, context);
+ }
}
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
index 4e4ffffc2cf..8c26e738bc9 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
@@ -212,6 +212,8 @@ struct TSInsertRecordReq {
4: required binary values
5: required i64 timestamp
6: optional bool isAligned
+ 7: optional bool isWriteToTable
+ 8: optional list<byte> columnCategoryies
}
struct TSInsertStringRecordReq {