This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/TableModelIngestion by this
push:
new 2d58586b975 add RelationalInsertTabletNode
2d58586b975 is described below
commit 2d58586b9754a847f88947a798f388100cbbc443
Author: jt2594838 <[email protected]>
AuthorDate: Thu Jun 20 12:38:33 2024 +0800
add RelationalInsertTabletNode
---
.../dataregion/DataExecutionVisitor.java | 39 ++++-
.../dataregion/DataRegionStateMachine.java | 1 +
.../tablet/PipeInsertNodeTabletInsertionEvent.java | 1 +
.../visitor/PipePlanToStatementVisitor.java | 18 +--
.../execution/executor/RegionWriteExecutor.java | 7 +
.../plan/planner/LogicalPlanVisitor.java | 6 +-
.../distribution/WriteFragmentParallelPlanner.java | 2 +-
.../plan/planner/plan/node/PlanNodeType.java | 7 +-
.../plan/planner/plan/node/PlanVisitor.java | 5 +
.../plan/planner/plan/node/WritePlanNode.java | 36 +----
.../plan/node/load/LoadSingleTsFileNode.java | 2 +-
.../planner/plan/node/load/LoadTsFileNode.java | 2 +-
.../plan/node/load/LoadTsFilePieceNode.java | 2 +-
.../node/metedata/write/ActivateTemplateNode.java | 2 +-
.../node/metedata/write/AlterTimeSeriesNode.java | 2 +-
.../metedata/write/BatchActivateTemplateNode.java | 2 +-
.../write/CreateAlignedTimeSeriesNode.java | 2 +-
.../metedata/write/CreateMultiTimeSeriesNode.java | 2 +-
.../node/metedata/write/CreateTimeSeriesNode.java | 2 +-
.../write/InternalBatchActivateTemplateNode.java | 2 +-
.../write/InternalCreateMultiTimeSeriesNode.java | 2 +-
.../write/InternalCreateTimeSeriesNode.java | 2 +-
.../metedata/write/view/CreateLogicalViewNode.java | 2 +-
.../plan/node/pipe/PipeEnrichedDeleteDataNode.java | 4 +-
.../plan/node/pipe/PipeEnrichedInsertNode.java | 4 +-
.../plan/node/pipe/PipeEnrichedWritePlanNode.java | 4 +-
.../planner/plan/node/write/DeleteDataNode.java | 4 +-
.../plan/node/write/InsertMultiTabletsNode.java | 4 +-
.../planner/plan/node/write/InsertRowNode.java | 4 +-
.../planner/plan/node/write/InsertRowsNode.java | 4 +-
.../plan/node/write/InsertRowsOfOneDeviceNode.java | 2 +-
.../planner/plan/node/write/InsertTabletNode.java | 172 +++++++--------------
.../node/write/RelationalInsertTabletNode.java | 161 +++++++++++++++++++
.../plan/relational/planner/RelationPlanner.java | 5 +-
.../distribute/TableDistributionPlanner.java | 2 +-
.../plan/statement/crud/InsertTabletStatement.java | 21 +++
.../db/storageengine/dataregion/DataRegion.java | 30 ++--
.../db/trigger/executor/TriggerFireVisitor.java | 8 +
.../plan/planner/node/load/LoadTsFileNodeTest.java | 4 +-
.../planner/node/write/WritePlanNodeSplitTest.java | 12 +-
.../storageengine/dataregion/DataRegionTest.java | 28 ++--
41 files changed, 383 insertions(+), 238 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index 14b2229ed78..a46df8d1b5a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -38,6 +38,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -73,10 +74,46 @@ public class DataExecutionVisitor extends
PlanVisitor<TSStatus, DataRegion> {
}
}
+ @Override
+ public TSStatus visitRelationalInsertTablet(RelationalInsertTabletNode node,
DataRegion dataRegion) {
+ try {
+ dataRegion.insertRelationalTablet(node);
+ return StatusUtils.OK;
+ } catch (OutOfTTLException e) {
+ LOGGER.warn("Error in executing plan node: {}, caused by {}", node,
e.getMessage());
+ return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
+ } catch (WriteProcessRejectException e) {
+ LOGGER.warn("Reject in executing plan node: {}, caused by {}", node,
e.getMessage());
+ return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
+ } catch (WriteProcessException e) {
+ LOGGER.error("Error in executing plan node: {}", node, e);
+ return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
+ } catch (BatchProcessException e) {
+ LOGGER.warn(
+ "Batch failure in executing a InsertTabletNode. device: {},
startTime: {}, measurements: {}, failing status: {}",
+ node.getDevicePath(),
+ node.getTimes()[0],
+ node.getMeasurements(),
+ e.getFailingStatus());
+ // For each error
+ TSStatus firstStatus = null;
+ for (TSStatus status : e.getFailingStatus()) {
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ firstStatus = status;
+ }
+ // Return WRITE_PROCESS_REJECT directly for the consensus retry logic
+ if (status.getCode() ==
TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
+ return status;
+ }
+ }
+ return firstStatus;
+ }
+ }
+
@Override
public TSStatus visitInsertTablet(InsertTabletNode node, DataRegion
dataRegion) {
try {
- dataRegion.insertTablet(node);
+ dataRegion.insertTreeTablet(node);
return StatusUtils.OK;
} catch (OutOfTTLException e) {
LOGGER.warn("Error in executing plan node: {}, caused by {}", node,
e.getMessage());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
index 0f949fd4904..f04c61a640c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
@@ -186,6 +186,7 @@ public class DataRegionStateMachine extends
BaseStateMachine {
List<Integer> index = new ArrayList<>();
int i = 0;
switch (insertNodes.get(0).getType()) {
+ case RELATIONAL_INSERT_TABLET:
case INSERT_TABLET:
// merge to InsertMultiTabletsNode
List<InsertTabletNode> insertTabletNodes = new ArrayList<>(size);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index c4fd1a133c2..6e59dfa2652 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -278,6 +278,7 @@ public class PipeInsertNodeTabletInsertionEvent extends
EnrichedEvent
switch (node.getType()) {
case INSERT_ROW:
case INSERT_TABLET:
+ case RELATIONAL_INSERT_TABLET:
dataContainers.add(
new TabletInsertionDataContainer(pipeTaskMeta, this, node,
pipePattern));
break;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipePlanToStatementVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipePlanToStatementVisitor.java
index 593e3715c88..1e9055b8ed9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipePlanToStatementVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipePlanToStatementVisitor.java
@@ -37,6 +37,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNo
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.DeleteDataStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
@@ -86,19 +87,14 @@ public class PipePlanToStatementVisitor extends
PlanVisitor<Statement, Void> {
return statement;
}
+ @Override
+ public Statement visitRelationalInsertTablet(RelationalInsertTabletNode
node, Void context) {
+ return new InsertTabletStatement(node);
+ }
+
@Override
public InsertTabletStatement visitInsertTablet(final InsertTabletNode node,
final Void context) {
- final InsertTabletStatement statement = new InsertTabletStatement();
- statement.setDevicePath(node.getDevicePath());
- statement.setMeasurements(node.getMeasurements());
- statement.setTimes(node.getTimes());
- statement.setColumns(node.getColumns());
- statement.setBitMaps(node.getBitMaps());
- statement.setRowCount(node.getRowCount());
- statement.setDataTypes(node.getDataTypes());
- statement.setAligned(node.isAligned());
- statement.setMeasurementSchemas(node.getMeasurementSchemas());
- return statement;
+ return new InsertTabletStatement(node);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
index 41dc339c23d..83e41119cde 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
@@ -62,6 +62,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.iotdb.db.schemaengine.SchemaEngine;
import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
@@ -208,6 +209,12 @@ public class RegionWriteExecutor {
return executeDataInsert(node, context);
}
+ @Override
+ public RegionExecutionResult
visitRelationalInsertTablet(RelationalInsertTabletNode node,
+ WritePlanNodeExecutionContext context) {
+ return executeDataInsert(node, context);
+ }
+
@Override
public RegionExecutionResult visitInsertRows(
InsertRowsNode node, WritePlanNodeExecutionContext context) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index e95a44471e7..98f544c75f5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -464,8 +464,7 @@ public class LogicalPlanVisitor extends
StatementVisitor<PlanNode, MPPQueryConte
insertTabletStatement.getTimes(),
insertTabletStatement.getBitMaps(),
insertTabletStatement.getColumns(),
- insertTabletStatement.getRowCount(),
- insertTabletStatement.getColumnCategories());
+ insertTabletStatement.getRowCount());
insertNode.setFailedMeasurementNumber(insertTabletStatement.getFailedMeasurementNumber());
return insertNode;
}
@@ -737,8 +736,7 @@ public class LogicalPlanVisitor extends
StatementVisitor<PlanNode, MPPQueryConte
insertTabletStatement.getTimes(),
insertTabletStatement.getBitMaps(),
insertTabletStatement.getColumns(),
- insertTabletStatement.getRowCount(),
- insertTabletStatement.getColumnCategories());
+ insertTabletStatement.getRowCount());
insertTabletNode.setFailedMeasurementNumber(
insertTabletStatement.getFailedMeasurementNumber());
insertMultiTabletsNode.addInsertTabletNode(insertTabletNode, i);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
index 07535f6f6b1..511e3bc7466 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
@@ -45,7 +45,7 @@ public class WriteFragmentParallelPlanner implements
IFragmentParallelPlaner {
this.subPlan = subPlan;
this.analysis = analysis;
this.queryContext = queryContext;
- this.nodeSplitter = WritePlanNode::splitByTreePartition;
+ this.nodeSplitter = WritePlanNode::splitByPartition;
}
public WriteFragmentParallelPlanner(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 2d910f02c17..f536fa189a3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -109,6 +109,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNo
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.DataInputStream;
@@ -233,7 +234,9 @@ public enum PlanNodeType {
TABLE_OFFSET_NODE((short) 1005),
TABLE_SORT_NODE((short) 1006),
TABLE_MERGESORT_NODE((short) 1007),
- TABLE_TOPK_NODE((short) 1008);
+ TABLE_TOPK_NODE((short) 1008),
+
+ RELATIONAL_INSERT_TABLET((short) 2000);
public static final int BYTES = Short.BYTES;
@@ -503,6 +506,8 @@ public enum PlanNodeType {
.deserialize(buffer);
case 1008:
return TopKNode.deserialize(buffer);
+ case 2000:
+ return RelationalInsertTabletNode.deserialize(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index 2e5a61b7a83..4591ff7afc7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -112,6 +112,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
@@ -496,6 +497,10 @@ public abstract class PlanVisitor<R, C> {
return visitPlan(node, context);
}
+ public R visitRelationalInsertTablet(RelationalInsertTabletNode node, C
context) {
+ return visitInsertTablet(node, context);
+ }
+
public R visitInsertRows(InsertRowsNode node, C context) {
return visitPlan(node, context);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/WritePlanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/WritePlanNode.java
index 664a1dc5bf5..43cbb20a40a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/WritePlanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/WritePlanNode.java
@@ -19,49 +19,15 @@
package org.apache.iotdb.db.queryengine.plan.planner.plan.node;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
import java.util.List;
public abstract class WritePlanNode extends PlanNode implements
IPartitionRelatedNode {
- protected boolean writeToTable;
-
protected WritePlanNode(PlanNodeId id) {
super(id);
}
- public abstract List<WritePlanNode> splitByTreePartition(IAnalysis analysis);
-
- public List<WritePlanNode> splitByTablePartition(IAnalysis analysis) {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
- @Override
- public void serialize(ByteBuffer byteBuffer) {
- super.serialize(byteBuffer);
- byteBuffer.put(writeToTable ? (byte) 1 : (byte) 0);
- }
-
- @Override
- public void serialize(DataOutputStream stream) throws IOException {
- super.serialize(stream);
- stream.writeBoolean(writeToTable);
- }
-
- public boolean isWriteToTable() {
- return writeToTable;
- }
-
- public void setWriteToTable(boolean writeToTable) {
- this.writeToTable = writeToTable;
- }
-
- public int serializedSize() {
- // write to table
- return 1;
- }
+ public abstract List<WritePlanNode> splitByPartition(IAnalysis analysis);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
index 0316412f635..3623e976007 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
@@ -188,7 +188,7 @@ public class LoadSingleTsFileNode extends WritePlanNode {
}
@Override
- public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
throw new NotImplementedException("split load single TsFile is not
implemented");
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
index f7f4b2ccbb6..be5de4cc819 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
@@ -89,7 +89,7 @@ public class LoadTsFileNode extends WritePlanNode {
}
@Override
- public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
List<WritePlanNode> res = new ArrayList<>();
LoadTsFileStatement statement =
((Analysis) analysis).getTreeStatement() instanceof
PipeEnrichedStatement
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
index dc81b2e9d0f..c3c4c3fa2e3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
@@ -147,7 +147,7 @@ public class LoadTsFilePieceNode extends WritePlanNode {
}
@Override
- public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
throw new NotImplementedException("split load piece TsFile is not
implemented");
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/ActivateTemplateNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/ActivateTemplateNode.java
index 5276d1b2df1..6d4d704ac50 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/ActivateTemplateNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/ActivateTemplateNode.java
@@ -161,7 +161,7 @@ public class ActivateTemplateNode extends WritePlanNode
implements IActivateTemp
}
@Override
- public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
TRegionReplicaSet regionReplicaSet =
analysis
.getSchemaPartitionInfo()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
index 410f1fa6336..05b3cb89b1d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
@@ -373,7 +373,7 @@ public class AlterTimeSeriesNode extends WritePlanNode {
}
@Override
- public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
TRegionReplicaSet regionReplicaSet =
analysis.getSchemaPartitionInfo().getSchemaRegionReplicaSet(path.getIDeviceID());
setRegionReplicaSet(regionReplicaSet);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/BatchActivateTemplateNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/BatchActivateTemplateNode.java
index 15441601966..5b944718515 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/BatchActivateTemplateNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/BatchActivateTemplateNode.java
@@ -148,7 +148,7 @@ public class BatchActivateTemplateNode extends
WritePlanNode {
}
@Override
- public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
// gather devices to same target region
Map<TRegionReplicaSet, Map<PartialPath, Pair<Integer, Integer>>> splitMap
= new HashMap<>();
for (Map.Entry<PartialPath, Pair<Integer, Integer>> entry :
templateActivationMap.entrySet()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
index 19eea48cf9b..0028b90f49d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
@@ -455,7 +455,7 @@ public class CreateAlignedTimeSeriesNode extends
WritePlanNode
}
@Override
- public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
TRegionReplicaSet regionReplicaSet =
analysis
.getSchemaPartitionInfo()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java
index 56844c4a093..2c83be62330 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java
@@ -229,7 +229,7 @@ public class CreateMultiTimeSeriesNode extends
WritePlanNode {
}
@Override
- public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
// gather devices to same target region
Map<TRegionReplicaSet, Map<PartialPath, MeasurementGroup>> splitMap = new
HashMap<>();
for (Map.Entry<PartialPath, MeasurementGroup> entry :
measurementGroupMap.entrySet()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
index 00ad72b8271..676dcfc3e3e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
@@ -386,7 +386,7 @@ public class CreateTimeSeriesNode extends WritePlanNode
implements ICreateTimeSe
}
@Override
- public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
TRegionReplicaSet regionReplicaSet =
analysis.getSchemaPartitionInfo().getSchemaRegionReplicaSet(path.getIDeviceID());
setRegionReplicaSet(regionReplicaSet);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalBatchActivateTemplateNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalBatchActivateTemplateNode.java
index ea2b68a11f8..c9e2f5e1729 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalBatchActivateTemplateNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalBatchActivateTemplateNode.java
@@ -141,7 +141,7 @@ public class InternalBatchActivateTemplateNode extends
WritePlanNode {
}
@Override
- public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
// gather devices to same target region
Map<TRegionReplicaSet, Map<PartialPath, Pair<Integer, Integer>>> splitMap
= new HashMap<>();
for (Map.Entry<PartialPath, Pair<Integer, Integer>> entry :
templateActivationMap.entrySet()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalCreateMultiTimeSeriesNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalCreateMultiTimeSeriesNode.java
index 2a663a57a5b..e140b0fa528 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalCreateMultiTimeSeriesNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalCreateMultiTimeSeriesNode.java
@@ -143,7 +143,7 @@ public class InternalCreateMultiTimeSeriesNode extends
WritePlanNode {
}
@Override
- public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
// gather devices to same target region
Map<TRegionReplicaSet, Map<PartialPath, Pair<Boolean, MeasurementGroup>>>
splitMap =
new HashMap<>();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalCreateTimeSeriesNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalCreateTimeSeriesNode.java
index 40195e0fde2..a07b1c4eb7e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalCreateTimeSeriesNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalCreateTimeSeriesNode.java
@@ -136,7 +136,7 @@ public class InternalCreateTimeSeriesNode extends
WritePlanNode {
}
@Override
- public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
TRegionReplicaSet regionReplicaSet =
analysis
.getSchemaPartitionInfo()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/view/CreateLogicalViewNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/view/CreateLogicalViewNode.java
index 4d7c545adcd..cc6210671fd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/view/CreateLogicalViewNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/view/CreateLogicalViewNode.java
@@ -220,7 +220,7 @@ public class CreateLogicalViewNode extends WritePlanNode
implements ICreateLogic
}
@Override
- public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
Map<TRegionReplicaSet, Map<PartialPath, ViewExpression>> splitMap = new
HashMap<>();
for (Map.Entry<PartialPath, ViewExpression> entry :
this.viewPathToSourceMap.entrySet()) {
// for each entry in the map for target path to source expression,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
index 3800d61ddff..57450a3cd06 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
@@ -162,8 +162,8 @@ public class PipeEnrichedDeleteDataNode extends
DeleteDataNode {
}
@Override
- public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
- return deleteDataNode.splitByTreePartition(analysis).stream()
+ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+ return deleteDataNode.splitByPartition(analysis).stream()
.map(
plan ->
plan instanceof PipeEnrichedDeleteDataNode
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
index 02950cef32d..855fe79de5d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
@@ -134,8 +134,8 @@ public class PipeEnrichedInsertNode extends InsertNode {
}
@Override
- public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
- return insertNode.splitByTreePartition(analysis).stream()
+ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+ return insertNode.splitByPartition(analysis).stream()
.map(
plan ->
plan instanceof PipeEnrichedInsertNode
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedWritePlanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedWritePlanNode.java
index 85b28a91604..2e7f7eeb4ee 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedWritePlanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedWritePlanNode.java
@@ -184,8 +184,8 @@ public class PipeEnrichedWritePlanNode extends
WritePlanNode {
}
@Override
- public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
- return writePlanNode.splitByTreePartition(analysis).stream()
+ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+ return writePlanNode.splitByPartition(analysis).stream()
.map(
plan ->
plan instanceof PipeEnrichedWritePlanNode
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
index 555132e8dcc..2b846146403 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
@@ -143,7 +143,7 @@ public class DeleteDataNode extends WritePlanNode
implements WALEntryValue {
@Override
public int serializedSize() {
- int size = super.serializedSize() + FIXED_SERIALIZED_SIZE;
+ int size = FIXED_SERIALIZED_SIZE;
for (PartialPath path : pathList) {
size += ReadWriteIOUtils.sizeToWrite(path.getFullPath());
}
@@ -282,7 +282,7 @@ public class DeleteDataNode extends WritePlanNode
implements WALEntryValue {
}
@Override
- public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
ISchemaTree schemaTree = ((Analysis) analysis).getSchemaTree();
DataPartition dataPartition = analysis.getDataPartitionInfo();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index bfbef67faff..430ab19c7c9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -135,11 +135,11 @@ public class InsertMultiTabletsNode extends InsertNode {
}
@Override
- public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
Map<TRegionReplicaSet, InsertMultiTabletsNode> splitMap = new HashMap<>();
for (int i = 0; i < insertTabletNodeList.size(); i++) {
InsertTabletNode insertTabletNode = insertTabletNodeList.get(i);
- List<WritePlanNode> tmpResult =
insertTabletNode.splitByTreePartition(analysis);
+ List<WritePlanNode> tmpResult =
insertTabletNode.splitByPartition(analysis);
for (WritePlanNode subNode : tmpResult) {
TRegionReplicaSet dataRegionReplicaSet = ((InsertNode)
subNode).getDataRegionReplicaSet();
InsertMultiTabletsNode tmpNode = splitMap.get(dataRegionReplicaSet);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
index 1afb26ccbd6..1cdec3c1267 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
@@ -107,7 +107,7 @@ public class InsertRowNode extends InsertNode implements
WALEntryValue {
}
@Override
- public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
TTimePartitionSlot timePartitionSlot =
TimePartitionUtils.getTimePartitionSlot(time);
this.dataRegionReplicaSet =
analysis
@@ -504,7 +504,7 @@ public class InsertRowNode extends InsertNode implements
WALEntryValue {
/** Serialized size for wal. */
@Override
public int serializedSize() {
- return super.serializedSize() + Short.BYTES + Long.BYTES +
subSerializeSize();
+ return Short.BYTES + Long.BYTES + subSerializeSize();
}
protected int subSerializeSize() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 08f67b8a4eb..2fc54f9d080 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -232,7 +232,7 @@ public class InsertRowsNode extends InsertNode implements
WALEntryValue {
}
@Override
- public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
Map<TRegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>();
List<TEndPoint> redirectInfo = new ArrayList<>();
for (int i = 0; i < insertRowNodeList.size(); i++) {
@@ -284,7 +284,7 @@ public class InsertRowsNode extends InsertNode implements
WALEntryValue {
/** Serialized size for wal. */
@Override
public int serializedSize() {
- return super.serializedSize() + Short.BYTES + Long.BYTES +
subSerializeSize();
+ return Short.BYTES + Long.BYTES + subSerializeSize();
}
private int subSerializeSize() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index f77823ec569..4bc93873736 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -154,7 +154,7 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
}
@Override
- public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
List<WritePlanNode> result = new ArrayList<>();
Map<TRegionReplicaSet, Map<TTimePartitionSlot, List<InsertRowNode>>>
splitMap = new HashMap<>();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index d0c9f9c5aa6..608baa1b366 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
@@ -70,12 +69,12 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
private static final String DATATYPE_UNSUPPORTED = "Data type %s is not
supported.";
- private long[] times; // times should be sorted. It is done in the session
API.
+ protected long[] times; // times should be sorted. It is done in the session
API.
- private BitMap[] bitMaps;
- private Object[] columns;
+ protected BitMap[] bitMaps;
+ protected Object[] columns;
- private int rowCount = 0;
+ protected int rowCount = 0;
// When this plan is sub-plan split from another InsertTabletNode, this
indicates the original
// positions of values in
@@ -86,10 +85,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
// of the parent plan.
// this is usually used to back-propagate exceptions to the parent plan
without losing their
// proper positions.
- private List<Integer> range;
-
- // deviceId cache for Table-view insertion
- private IDeviceID[] deviceIDs;
+ protected List<Integer> range;
public InsertTabletNode(PlanNodeId id) {
super(id);
@@ -113,7 +109,6 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
this.rowCount = rowCount;
}
- @TestOnly
public InsertTabletNode(
PlanNodeId id,
PartialPath devicePath,
@@ -125,24 +120,7 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
BitMap[] bitMaps,
Object[] columns,
int rowCount) {
- this(id, devicePath, isAligned, measurements, dataTypes,
measurementSchemas, times, bitMaps,
- columns, rowCount,
- null);
- }
-
- public InsertTabletNode(
- PlanNodeId id,
- PartialPath devicePath,
- boolean isAligned,
- String[] measurements,
- TSDataType[] dataTypes,
- MeasurementSchema[] measurementSchemas,
- long[] times,
- BitMap[] bitMaps,
- Object[] columns,
- int rowCount,
- TsTableColumnCategory[] columnCategories) {
- super(id, devicePath, isAligned, measurements, dataTypes,
columnCategories);
+ super(id, devicePath, isAligned, measurements, dataTypes);
this.measurementSchemas = measurementSchemas;
this.times = times;
this.bitMaps = bitMaps;
@@ -219,24 +197,29 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
}
@Override
- public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
- return splitByPartition(analysis, i -> deviceID);
- }
+ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+ // only single device in single database
+ if (times.length == 0) {
+ return Collections.emptyList();
+ }
- public List<WritePlanNode> splitByTablePartition(IAnalysis analysis) {
- return splitByPartition(analysis, this::getTableDeviceID);
+ final Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap =
collectSplitRanges();
+ final Map<TRegionReplicaSet, List<Integer>> splitMap = splitByReplicaSet(
+ deviceIDSplitInfoMap, analysis);
+
+ return doSplit(splitMap);
}
- private Map<IDeviceID, PartitionSplitInfo>
collectSplitRanges(IntFunction<IDeviceID> rowNumDeviceIdMapper) {
+ private Map<IDeviceID, PartitionSplitInfo> collectSplitRanges() {
long upperBoundOfTimePartition =
TimePartitionUtils.getTimePartitionUpperBound(times[0]);
TTimePartitionSlot timePartitionSlot =
TimePartitionUtils.getTimePartitionSlot(times[0]);
int startLoc = 0; // included
- IDeviceID currDeviceId = rowNumDeviceIdMapper.apply(0);
+ IDeviceID currDeviceId = getDeviceID(0);
Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap = new HashMap<>();
for (int i = 1; i < times.length; i++) { // times are sorted in session
API.
- IDeviceID nextDeviceId = rowNumDeviceIdMapper.apply(i);
+ IDeviceID nextDeviceId = getDeviceID(i);
if (times[i] >= upperBoundOfTimePartition ||
!currDeviceId.equals(nextDeviceId)) {
final PartitionSplitInfo splitInfo =
deviceIDSplitInfoMap.computeIfAbsent(currDeviceId,
deviceID1 -> new PartitionSplitInfo());
@@ -262,7 +245,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
return deviceIDSplitInfoMap;
}
- public Map<TRegionReplicaSet, List<Integer>>
splitByReplicaSet(Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap,
IAnalysis analysis) {
+ private Map<TRegionReplicaSet, List<Integer>>
splitByReplicaSet(Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap,
IAnalysis analysis) {
Map<TRegionReplicaSet, List<Integer>> splitMap = new HashMap<>();
for (Entry<IDeviceID, PartitionSplitInfo> entry :
deviceIDSplitInfoMap.entrySet()) {
@@ -281,10 +264,10 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
.get(0)
.getClientRpcEndPoint());
for (int i = 0; i < replicaSets.size(); i++) {
- List<Integer> sub_ranges =
+ List<Integer> subRanges =
splitMap.computeIfAbsent(replicaSets.get(i), x -> new
ArrayList<>());
- sub_ranges.add(splitInfo.ranges.get(2 * i));
- sub_ranges.add(splitInfo.ranges.get(2 * i + 1));
+ subRanges.add(splitInfo.ranges.get(2 * i));
+ subRanges.add(splitInfo.ranges.get(2 * i + 1));
}
}
return splitMap;
@@ -310,6 +293,23 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
return result;
}
+ protected InsertTabletNode getEmptySplit(int count) {
+ long[] subTimes = new long[count];
+ Object[] values = initTabletValues(dataTypes.length, count, dataTypes);
+ BitMap[] newBitMaps = this.bitMaps == null ? null :
initBitmaps(dataTypes.length, count);
+ return new InsertTabletNode(
+ getPlanNodeId(),
+ devicePath,
+ isAligned,
+ measurements,
+ dataTypes,
+ measurementSchemas,
+ subTimes,
+ newBitMaps,
+ values,
+ subTimes.length);
+ }
+
private WritePlanNode generateOneSplit(Map.Entry<TRegionReplicaSet,
List<Integer>> entry) {
List<Integer> locs;
// generate a new times and values
@@ -321,22 +321,7 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
count += end - start;
}
- long[] subTimes = new long[count];
- Object[] values = initTabletValues(dataTypes.length, count, dataTypes);
- BitMap[] newBitMaps = this.bitMaps == null ? null :
initBitmaps(dataTypes.length, count);
- InsertTabletNode subNode =
- new InsertTabletNode(
- getPlanNodeId(),
- devicePath,
- isAligned,
- measurements,
- dataTypes,
- measurementSchemas,
- subTimes,
- newBitMaps,
- values,
- subTimes.length,
- columnCategories);
+ final InsertTabletNode subNode = getEmptySplit(count);
int destLoc = 0;
for (int i = 0; i < locs.size(); i += 2) {
@@ -344,13 +329,13 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
int end = locs.get(i + 1);
final int length = end - start;
- System.arraycopy(times, start, subTimes, destLoc, length);
- for (int k = 0; k < values.length; k++) {
+ System.arraycopy(times, start, subNode.times, destLoc, length);
+ for (int k = 0; k < subNode.columns.length; k++) {
if (dataTypes[k] != null) {
- System.arraycopy(columns[k], start, values[k], destLoc, length);
+ System.arraycopy(columns[k], start, subNode.columns[k], destLoc,
length);
}
- if (newBitMaps != null && this.bitMaps[k] != null) {
- BitMap.copyOfRange(this.bitMaps[k], start, newBitMaps[k], destLoc,
length);
+ if (subNode.bitMaps != null && this.bitMaps[k] != null) {
+ BitMap.copyOfRange(this.bitMaps[k], start, subNode.bitMaps[k],
destLoc, length);
}
}
destLoc += length;
@@ -361,19 +346,6 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
return subNode;
}
- public List<WritePlanNode> splitByPartition(IAnalysis analysis,
- IntFunction<IDeviceID> rowNumDeviceIdMapper) {
- // only single device in single database
- if (times.length == 0) {
- return Collections.emptyList();
- }
-
- final Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap =
collectSplitRanges(rowNumDeviceIdMapper);
- final Map<TRegionReplicaSet, List<Integer>> splitMap = splitByReplicaSet(
- deviceIDSplitInfoMap, analysis);
-
- return doSplit(splitMap);
- }
@TestOnly
public List<TTimePartitionSlot> getTimePartitionSlots() {
@@ -392,7 +364,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
return result;
}
- private Object[] initTabletValues(int columnSize, int rowSize, TSDataType[]
dataTypes) {
+ protected Object[] initTabletValues(int columnSize, int rowSize,
TSDataType[] dataTypes) {
Object[] values = new Object[columnSize];
for (int i = 0; i < values.length; i++) {
if (dataTypes[i] != null) {
@@ -424,7 +396,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
return values;
}
- private BitMap[] initBitmaps(int columnSize, int rowSize) {
+ protected BitMap[] initBitmaps(int columnSize, int rowSize) {
BitMap[] bitMaps = new BitMap[columnSize];
for (int i = 0; i < columnSize; i++) {
bitMaps[i] = new BitMap(rowSize);
@@ -449,13 +421,13 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
- PlanNodeType.INSERT_TABLET.serialize(byteBuffer);
+ getType().serialize(byteBuffer);
subSerialize(byteBuffer);
}
@Override
protected void serializeAttributes(DataOutputStream stream) throws
IOException {
- PlanNodeType.INSERT_TABLET.serialize(stream);
+ getType().serialize(stream);
subSerialize(stream);
}
@@ -479,6 +451,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
ReadWriteIOUtils.write((byte) (isAligned ? 1 : 0), stream);
}
+
/** Serialize measurements or measurement schemas, ignoring failed time
series */
private void writeMeasurementsOrSchemas(ByteBuffer buffer) {
ReadWriteIOUtils.write(measurements.length - getFailedMeasurementNumber(),
buffer);
@@ -714,9 +687,6 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
InsertTabletNode insertNode = new InsertTabletNode(new PlanNodeId(""));
insertNode.subDeserialize(byteBuffer);
insertNode.setPlanNodeId(PlanNodeId.deserialize(byteBuffer));
- if (byteBuffer.hasRemaining()) {
- insertNode.writeToTable = byteBuffer.get() == 1;
- }
return insertNode;
}
@@ -766,7 +736,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
/** Serialized size for wal */
@Override
public int serializedSize() {
- return super.serializedSize() + serializedSize(0, rowCount);
+ return serializedSize(0, rowCount);
}
/** Serialized size for wal */
@@ -808,8 +778,10 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
size += getColumnSize(dataTypes[i], columns[i], start, end);
}
}
-
+ // isAlign
size += Byte.BYTES;
+ // column category
+
return size;
}
@@ -855,7 +827,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
}
public void serializeToWAL(IWALByteBufferView buffer, int start, int end) {
- buffer.putShort(PlanNodeType.INSERT_TABLET.getNodeType());
+ buffer.putShort(getType().getNodeType());
subSerialize(buffer, start, end);
}
@@ -1187,21 +1159,8 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
return new TimeValuePair(times[lastIdx], value);
}
- public IDeviceID getTableDeviceID(int rowIdx) {
- if (deviceIDs == null) {
- deviceIDs = new IDeviceID[rowCount];
- }
- if (deviceIDs[rowIdx] == null) {
- String[] deviceIdSegments = new String[idColumnIndices.size() + 1];
- deviceIdSegments[0] = this.devicePath.getFullPath();
- for (int i = 0; i < idColumnIndices.size(); i++) {
- final Integer columnIndex = idColumnIndices.get(i);
- deviceIdSegments[i + 1] = ((Binary[])
columns[columnIndex])[rowIdx].toString();
- }
- deviceIDs[rowIdx] = Factory.DEFAULT_FACTORY.create(deviceIdSegments);
- }
-
- return deviceIDs[rowIdx];
+ public IDeviceID getDeviceID(int rowIdx) {
+ return deviceID;
}
private static class PartitionSplitInfo {
@@ -1218,19 +1177,6 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
* @return each the number in the pair is the end offset of a device
*/
public List<Pair<IDeviceID, Integer>> splitByDevice(int start, int end) {
- List<Pair<IDeviceID, Integer>> result = new ArrayList<>();
- IDeviceID prevDeviceId = getTableDeviceID(start);
-
- int i = start + 1;
- for (; i < end; i++) {
- IDeviceID currentDeviceId = getTableDeviceID(i);
- if (!currentDeviceId.equals(prevDeviceId)) {
- result.add(new Pair<>(prevDeviceId, i));
- prevDeviceId = getTableDeviceID(i);
- }
- }
- result.add(new Pair<>(prevDeviceId, start));
-
- return result;
+ return Collections.singletonList(new Pair<>(deviceID, end));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
new file mode 100644
index 00000000000..c60499e97f3
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.IDeviceID.Factory;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+
+public class RelationalInsertTabletNode extends InsertTabletNode {
+
+ // deviceId cache for Table-view insertion
+ private IDeviceID[] deviceIDs;
+
+ public RelationalInsertTabletNode(
+ PlanNodeId id,
+ PartialPath devicePath, boolean isAligned, String[] measurements,
+ TSDataType[] dataTypes,
+ MeasurementSchema[] measurementSchemas, long[] times,
+ BitMap[] bitMaps, Object[] columns, int rowCount,
+ TsTableColumnCategory[] columnCategories) {
+ super(id, devicePath, isAligned, measurements, dataTypes,
measurementSchemas, times, bitMaps,
+ columns, rowCount);
+ setColumnCategories(columnCategories);
+ }
+
+ public RelationalInsertTabletNode(PlanNodeId id) {
+ super(id);
+ }
+
+ public IDeviceID getDeviceID(int rowIdx) {
+ if (deviceIDs == null) {
+ deviceIDs = new IDeviceID[rowCount];
+ }
+ if (deviceIDs[rowIdx] == null) {
+ String[] deviceIdSegments = new String[idColumnIndices.size() + 1];
+ deviceIdSegments[0] = this.devicePath.getFullPath();
+ for (int i = 0; i < idColumnIndices.size(); i++) {
+ final Integer columnIndex = idColumnIndices.get(i);
+ deviceIdSegments[i + 1] = ((Binary[])
columns[columnIndex])[rowIdx].toString();
+ }
+ deviceIDs[rowIdx] = Factory.DEFAULT_FACTORY.create(deviceIdSegments);
+ }
+
+ return deviceIDs[rowIdx];
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitRelationalInsertTablet(this, context);
+ }
+
+ protected InsertTabletNode getEmptySplit(int count) {
+ long[] subTimes = new long[count];
+ Object[] values = initTabletValues(dataTypes.length, count, dataTypes);
+ BitMap[] newBitMaps = this.bitMaps == null ? null :
initBitmaps(dataTypes.length, count);
+ return new RelationalInsertTabletNode(
+ getPlanNodeId(),
+ devicePath,
+ isAligned,
+ measurements,
+ dataTypes,
+ measurementSchemas,
+ subTimes,
+ newBitMaps,
+ values,
+ subTimes.length,
+ columnCategories);
+ }
+
+ public static RelationalInsertTabletNode deserialize(ByteBuffer byteBuffer) {
+ RelationalInsertTabletNode insertNode = new RelationalInsertTabletNode(new
PlanNodeId(""));
+ insertNode.subDeserialize(byteBuffer);
+ insertNode.setPlanNodeId(PlanNodeId.deserialize(byteBuffer));
+ return insertNode;
+ }
+
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ super.serializeAttributes(byteBuffer);
+ for (int i = 0; i < dataTypes.length; i++) {
+ columnCategories[i].serialize(byteBuffer);
+ }
+ }
+
+ @Override
+ protected void serializeAttributes(DataOutputStream stream) throws
IOException {
+ super.serializeAttributes(stream);
+ for (int i = 0; i < dataTypes.length; i++) {
+ columnCategories[i].serialize(stream);
+ }
+ }
+
+ public void subDeserialize(ByteBuffer buffer) {
+ super.subDeserialize(buffer);
+ columnCategories = new TsTableColumnCategory[dataTypes.length];
+ for (int i = 0; i < dataTypes.length; i++) {
+ columnCategories[i] = TsTableColumnCategory.deserialize(buffer);
+ }
+ }
+
+ @Override
+ public int serializedSize() {
+ return super.serializedSize() + columnCategories.length * Byte.BYTES;
+ }
+
+ @Override
+ public PlanNodeType getType() {
+ return PlanNodeType.RELATIONAL_INSERT_TABLET;
+ }
+
+ public List<Pair<IDeviceID, Integer>> splitByDevice(int start, int end) {
+ List<Pair<IDeviceID, Integer>> result = new ArrayList<>();
+ IDeviceID prevDeviceId = getDeviceID(start);
+
+ int i = start + 1;
+ for (; i < end; i++) {
+ IDeviceID currentDeviceId = getDeviceID(i);
+ if (!currentDeviceId.equals(prevDeviceId)) {
+ result.add(new Pair<>(prevDeviceId, i));
+ prevDeviceId = getDeviceID(i);
+ }
+ }
+ result.add(new Pair<>(prevDeviceId, start));
+
+ return result;
+ }
+}
+
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index 6603a8cab08..4f8a76d3be0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -18,6 +18,7 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Field;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.NodeRef;
@@ -183,8 +184,8 @@ public class RelationPlanner extends
AstVisitor<RelationPlan, Void> {
@Override
protected RelationPlan visitInsertTablet(InsertTablet node, Void context) {
final InsertTabletStatement insertTabletStatement =
node.getInnerTreeStatement();
- InsertTabletNode insertNode =
- new InsertTabletNode(
+ RelationalInsertTabletNode insertNode =
+ new RelationalInsertTabletNode(
idAllocator.genPlanNodeId(),
insertTabletStatement.getDevicePath(),
insertTabletStatement.isAligned(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
index 5efc6f33fab..ab561ef09df 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
@@ -83,7 +83,7 @@ public class TableDistributionPlanner {
? new TableModelQueryFragmentPlanner(subPlan, analysis,
mppQueryContext).plan()
:
new WriteFragmentParallelPlanner(subPlan, analysis,
mppQueryContext,
- WritePlanNode::splitByTablePartition).parallelPlan();
+ WritePlanNode::splitByPartition).parallelPlan();
// Only execute this step for READ operation
if (mppQueryContext.getQueryType() == QueryType.READ) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index 45b77109425..9173dbada89 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -29,6 +29,8 @@ import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
@@ -83,6 +85,25 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
this.recordedEndOfLogicalViewSchemaList = 0;
}
+ public InsertTabletStatement(InsertTabletNode node) {
+ this();
+ setDevicePath(node.getDevicePath());
+ setMeasurements(node.getMeasurements());
+ setTimes(node.getTimes());
+ setColumns(node.getColumns());
+ setBitMaps(node.getBitMaps());
+ setRowCount(node.getRowCount());
+ setDataTypes(node.getDataTypes());
+ setAligned(node.isAligned());
+ setMeasurementSchemas(node.getMeasurementSchemas());
+ }
+
+ public InsertTabletStatement(RelationalInsertTabletNode node) {
+ this(((InsertTabletNode) node));
+ setColumnCategories(node.getColumnCategories());
+ setWriteToTable(true);
+ }
+
public int getRowCount() {
return rowCount;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 688cf838a50..506e814e09f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -965,10 +965,16 @@ public class DataRegion implements IDataRegionForQuery {
}
}
+ /**
+ * Insert a tablet (rows belonging to the same devices) into this database.
+ *
+ * @throws BatchProcessException if some of the rows failed to be inserted
+ */
+ @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive
Complexity warning
public void insertTreeTablet(InsertTabletNode insertTabletNode)
throws BatchProcessException, WriteProcessException {
final IDeviceID deviceID = insertTabletNode.getDeviceID();
- insertTablet(insertTabletNode, i -> deviceID, i ->
+ insertTablet(insertTabletNode, insertTabletNode::getDeviceID, i ->
config.isEnableSeparateData()
? lastFlushTimeMap.getFlushedTime(
TimePartitionUtils.getTimePartitionId(insertTabletNode.getTimes()[i]),
@@ -978,13 +984,13 @@ public class DataRegion implements IDataRegionForQuery {
);
}
- public void insertTableTablet(InsertTabletNode insertTabletNode)
+ public void insertRelationalTablet(InsertTabletNode insertTabletNode)
throws BatchProcessException, WriteProcessException {
- insertTablet(insertTabletNode, insertTabletNode::getTableDeviceID, i ->
+ insertTablet(insertTabletNode, insertTabletNode::getDeviceID, i ->
config.isEnableSeparateData()
? lastFlushTimeMap.getFlushedTime(
TimePartitionUtils.getTimePartitionId(insertTabletNode.getTimes()[i]),
- insertTabletNode.getTableDeviceID(i))
+ insertTabletNode.getDeviceID(i))
: Long.MAX_VALUE,
true
);
@@ -1147,20 +1153,6 @@ public class DataRegion implements IDataRegionForQuery {
return firstAliveLoc;
}
- /**
- * Insert a tablet (rows belonging to the same devices) into this database.
- *
- * @throws BatchProcessException if some of the rows failed to be inserted
- */
- @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive
Complexity warning
- public void insertTablet(InsertTabletNode insertTabletNode)
- throws BatchProcessException, WriteProcessException {
- if (insertTabletNode.isWriteToTable()) {
- insertTableTablet(insertTabletNode);
- } else {
- insertTreeTablet(insertTabletNode);
- }
- }
/**
* Check whether the time falls in TTL.
@@ -3554,7 +3546,7 @@ public class DataRegion implements IDataRegionForQuery {
for (int i = 0; i <
insertMultiTabletsNode.getInsertTabletNodeList().size(); i++) {
InsertTabletNode insertTabletNode =
insertMultiTabletsNode.getInsertTabletNodeList().get(i);
try {
- insertTablet(insertTabletNode);
+ insertTreeTablet(insertTabletNode);
} catch (WriteProcessException e) {
insertMultiTabletsNode
.getResults()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
index 9fc5e21b7a4..69e8987e7f9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
@@ -43,6 +43,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.iotdb.db.trigger.service.TriggerManagementService;
import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerReq;
import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerResp;
@@ -133,6 +134,13 @@ public class TriggerFireVisitor extends
PlanVisitor<TriggerFireResult, TriggerEv
return hasFailedTrigger ? TriggerFireResult.FAILED_NO_TERMINATION :
TriggerFireResult.SUCCESS;
}
+ @Override
+ public TriggerFireResult
visitRelationalInsertTablet(RelationalInsertTabletNode node,
+ TriggerEvent context) {
+ // TODO-Table: add support
+ return visitInsertTablet(node, context);
+ }
+
@Override
public TriggerFireResult visitInsertTablet(InsertTabletNode node,
TriggerEvent context) {
// group Triggers and measurements
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java
index 1896d854fc6..849d45c61c8 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java
@@ -52,7 +52,7 @@ public class LoadTsFileNodeTest {
} catch (NotImplementedException ignored) {
}
try {
- node.splitByTreePartition(new Analysis());
+ node.splitByPartition(new Analysis());
Assert.fail();
} catch (NotImplementedException ignored) {
}
@@ -75,7 +75,7 @@ public class LoadTsFileNodeTest {
} catch (NotImplementedException ignored) {
}
try {
- node.splitByTreePartition(new Analysis());
+ node.splitByPartition(new Analysis());
Assert.fail();
} catch (NotImplementedException ignored) {
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
index faefd94b638..8b70093595f 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
@@ -211,7 +211,7 @@ public class WritePlanNodeSplitTest {
Analysis analysis = new Analysis();
analysis.setDataPartitionInfo(dataPartition);
- List<WritePlanNode> insertTabletNodeList =
insertTabletNode.splitByTreePartition(analysis);
+ List<WritePlanNode> insertTabletNodeList =
insertTabletNode.splitByPartition(analysis);
Assert.assertEquals(6, insertTabletNodeList.size());
for (WritePlanNode insertNode : insertTabletNodeList) {
@@ -237,7 +237,7 @@ public class WritePlanNodeSplitTest {
analysis = new Analysis();
analysis.setDataPartitionInfo(dataPartition);
- insertTabletNodeList = insertTabletNode.splitByTreePartition(analysis);
+ insertTabletNodeList = insertTabletNode.splitByPartition(analysis);
Assert.assertEquals(5, insertTabletNodeList.size());
for (WritePlanNode insertNode : insertTabletNodeList) {
@@ -281,7 +281,7 @@ public class WritePlanNodeSplitTest {
Analysis analysis = new Analysis();
analysis.setDataPartitionInfo(dataPartition);
- List<WritePlanNode> insertTabletNodeList =
insertMultiTabletsNode.splitByTreePartition(analysis);
+ List<WritePlanNode> insertTabletNodeList =
insertMultiTabletsNode.splitByPartition(analysis);
Assert.assertEquals(6, insertTabletNodeList.size());
}
@@ -315,7 +315,7 @@ public class WritePlanNodeSplitTest {
Analysis analysis = new Analysis();
analysis.setDataPartitionInfo(dataPartition);
- List<WritePlanNode> insertRowsNodeList =
insertRowsNode.splitByTreePartition(analysis);
+ List<WritePlanNode> insertRowsNodeList =
insertRowsNode.splitByPartition(analysis);
Assert.assertEquals(8, insertRowsNodeList.size());
}
@@ -354,7 +354,7 @@ public class WritePlanNodeSplitTest {
analysis.setDataPartitionInfo(dataPartition);
List<WritePlanNode> insertRowsOfOneDeviceNodeList =
- insertRowsOfOneDeviceNode.splitByTreePartition(analysis);
+ insertRowsOfOneDeviceNode.splitByPartition(analysis);
Assert.assertEquals(5, insertRowsOfOneDeviceNodeList.size());
for (WritePlanNode insertNode : insertRowsOfOneDeviceNodeList) {
@@ -390,7 +390,7 @@ public class WritePlanNodeSplitTest {
analysis = new Analysis();
analysis.setDataPartitionInfo(dataPartition);
- insertRowsOfOneDeviceNodeList =
insertRowsOfOneDeviceNode.splitByTreePartition(analysis);
+ insertRowsOfOneDeviceNodeList =
insertRowsOfOneDeviceNode.splitByPartition(analysis);
Assert.assertEquals(5, insertRowsOfOneDeviceNodeList.size());
for (WritePlanNode insertNode : insertRowsOfOneDeviceNodeList) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
index 80b45ebf46c..b29ef2a48ac 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java
@@ -287,7 +287,7 @@ public class DataRegionTest {
columns,
times.length);
- dataRegion.insertTablet(insertTabletNode1);
+ dataRegion.insertTreeTablet(insertTabletNode1);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
for (int r = 50; r < 149; r++) {
@@ -309,7 +309,7 @@ public class DataRegionTest {
columns,
times.length);
- dataRegion.insertTablet(insertTabletNode2);
+ dataRegion.insertTreeTablet(insertTabletNode2);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
dataRegion.syncCloseAllWorkingTsFileProcessors();
@@ -365,7 +365,7 @@ public class DataRegionTest {
columns,
times.length);
- dataRegion.insertTablet(insertTabletNode1);
+ dataRegion.insertTreeTablet(insertTabletNode1);
for (int r = 50; r < 149; r++) {
times[r - 50] = r;
@@ -386,7 +386,7 @@ public class DataRegionTest {
columns,
times.length);
- dataRegion.insertTablet(insertTabletNode2);
+ dataRegion.insertTreeTablet(insertTabletNode2);
Assert.assertTrue(SystemInfo.getInstance().getTotalMemTableSize() > 0);
dataRegion.syncDeleteDataFiles();
Assert.assertEquals(0, SystemInfo.getInstance().getTotalMemTableSize());
@@ -440,7 +440,7 @@ public class DataRegionTest {
columns,
times.length);
- dataRegion.insertTablet(insertTabletNode1);
+ dataRegion.insertTreeTablet(insertTabletNode1);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
for (int r = 50; r < 149; r++) {
@@ -462,7 +462,7 @@ public class DataRegionTest {
columns,
times.length);
- dataRegion.insertTablet(insertTabletNode2);
+ dataRegion.insertTreeTablet(insertTabletNode2);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
dataRegion.syncCloseAllWorkingTsFileProcessors();
@@ -516,7 +516,7 @@ public class DataRegionTest {
times.length);
insertTabletNode1.setFailedMeasurementNumber(2);
- dataRegion.insertTablet(insertTabletNode1);
+ dataRegion.insertTreeTablet(insertTabletNode1);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
for (int r = 50; r < 149; r++) {
@@ -539,7 +539,7 @@ public class DataRegionTest {
times.length);
insertTabletNode2.setFailedMeasurementNumber(2);
- dataRegion.insertTablet(insertTabletNode2);
+ dataRegion.insertTreeTablet(insertTabletNode2);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
dataRegion.syncCloseAllWorkingTsFileProcessors();
@@ -708,7 +708,7 @@ public class DataRegionTest {
columns,
times.length);
- dataRegion.insertTablet(insertTabletNode1);
+ dataRegion.insertTreeTablet(insertTabletNode1);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
for (int r = 149; r >= 50; r--) {
@@ -729,7 +729,7 @@ public class DataRegionTest {
columns,
times.length);
- dataRegion.insertTablet(insertTabletNode2);
+ dataRegion.insertTreeTablet(insertTabletNode2);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
dataRegion.syncCloseAllWorkingTsFileProcessors();
@@ -793,7 +793,7 @@ public class DataRegionTest {
columns,
times.length);
- dataRegion.insertTablet(insertTabletNode1);
+ dataRegion.insertTreeTablet(insertTabletNode1);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
for (int r = 1249; r >= 50; r--) {
@@ -814,7 +814,7 @@ public class DataRegionTest {
columns,
times.length);
- dataRegion.insertTablet(insertTabletNode2);
+ dataRegion.insertTreeTablet(insertTabletNode2);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
dataRegion.syncCloseAllWorkingTsFileProcessors();
@@ -878,7 +878,7 @@ public class DataRegionTest {
columns,
times.length);
- dataRegion.insertTablet(insertTabletNode1);
+ dataRegion.insertTreeTablet(insertTabletNode1);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
for (int r = 1249; r >= 50; r--) {
@@ -899,7 +899,7 @@ public class DataRegionTest {
columns,
times.length);
- dataRegion.insertTablet(insertTabletNode2);
+ dataRegion.insertTreeTablet(insertTabletNode2);
dataRegion.asyncCloseAllWorkingTsFileProcessors();
dataRegion.syncCloseAllWorkingTsFileProcessors();