This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/TableModelIngestion by this
push:
new bc6fe8d5ed2 finish distributed plan
bc6fe8d5ed2 is described below
commit bc6fe8d5ed24f190bfc13e9302ab5e51b685ea27
Author: jt2594838 <[email protected]>
AuthorDate: Tue Jun 18 17:59:53 2024 +0800
finish distributed plan
---
.../plan/planner/LogicalPlanVisitor.java | 6 +-
.../distribution/WriteFragmentParallelPlanner.java | 14 +-
.../plan/planner/plan/node/WritePlanNode.java | 6 +-
.../plan/node/load/LoadSingleTsFileNode.java | 2 +-
.../planner/plan/node/load/LoadTsFileNode.java | 2 +-
.../plan/node/load/LoadTsFilePieceNode.java | 2 +-
.../node/metedata/write/ActivateTemplateNode.java | 2 +-
.../node/metedata/write/AlterTimeSeriesNode.java | 2 +-
.../metedata/write/BatchActivateTemplateNode.java | 2 +-
.../write/CreateAlignedTimeSeriesNode.java | 2 +-
.../metedata/write/CreateMultiTimeSeriesNode.java | 2 +-
.../node/metedata/write/CreateTimeSeriesNode.java | 2 +-
.../write/InternalBatchActivateTemplateNode.java | 2 +-
.../write/InternalCreateMultiTimeSeriesNode.java | 2 +-
.../write/InternalCreateTimeSeriesNode.java | 2 +-
.../metedata/write/view/CreateLogicalViewNode.java | 2 +-
.../plan/node/pipe/PipeEnrichedDeleteDataNode.java | 4 +-
.../plan/node/pipe/PipeEnrichedInsertNode.java | 4 +-
.../plan/node/pipe/PipeEnrichedWritePlanNode.java | 4 +-
.../planner/plan/node/write/DeleteDataNode.java | 2 +-
.../plan/node/write/InsertMultiTabletsNode.java | 4 +-
.../plan/planner/plan/node/write/InsertNode.java | 33 +++
.../planner/plan/node/write/InsertRowNode.java | 2 +-
.../planner/plan/node/write/InsertRowsNode.java | 2 +-
.../plan/node/write/InsertRowsOfOneDeviceNode.java | 2 +-
.../planner/plan/node/write/InsertTabletNode.java | 259 +++++++++++++++------
.../plan/relational/planner/LogicalPlanner.java | 9 +
.../plan/relational/planner/RelationPlanner.java | 24 ++
.../distribute/TableDistributionPlanner.java | 6 +-
.../relational/sql/ast/WrappedInsertStatement.java | 19 ++
.../plan/statement/crud/InsertBaseStatement.java | 10 +-
.../plan/planner/node/load/LoadTsFileNodeTest.java | 4 +-
.../planner/node/write/WritePlanNodeSplitTest.java | 12 +-
33 files changed, 333 insertions(+), 119 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index 98f544c75f5..e95a44471e7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -464,7 +464,8 @@ public class LogicalPlanVisitor extends
StatementVisitor<PlanNode, MPPQueryConte
insertTabletStatement.getTimes(),
insertTabletStatement.getBitMaps(),
insertTabletStatement.getColumns(),
- insertTabletStatement.getRowCount());
+ insertTabletStatement.getRowCount(),
+ insertTabletStatement.getColumnCategories());
insertNode.setFailedMeasurementNumber(insertTabletStatement.getFailedMeasurementNumber());
return insertNode;
}
@@ -736,7 +737,8 @@ public class LogicalPlanVisitor extends
StatementVisitor<PlanNode, MPPQueryConte
insertTabletStatement.getTimes(),
insertTabletStatement.getBitMaps(),
insertTabletStatement.getColumns(),
- insertTabletStatement.getRowCount());
+ insertTabletStatement.getRowCount(),
+ insertTabletStatement.getColumnCategories());
insertTabletNode.setFailedMeasurementNumber(
insertTabletStatement.getFailedMeasurementNumber());
insertMultiTabletsNode.addInsertTabletNode(insertTabletNode, i);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
index b1d1fe80123..07535f6f6b1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.queryengine.plan.planner.distribution;
+import java.util.function.BiFunction;
import org.apache.iotdb.commons.partition.StorageExecutor;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
@@ -37,12 +38,23 @@ public class WriteFragmentParallelPlanner implements
IFragmentParallelPlaner {
private SubPlan subPlan;
private IAnalysis analysis;
private MPPQueryContext queryContext;
+ private BiFunction<WritePlanNode, IAnalysis, List<WritePlanNode>>
nodeSplitter;
public WriteFragmentParallelPlanner(
SubPlan subPlan, IAnalysis analysis, MPPQueryContext queryContext) {
this.subPlan = subPlan;
this.analysis = analysis;
this.queryContext = queryContext;
+ this.nodeSplitter = WritePlanNode::splitByTreePartition;
+ }
+
+ public WriteFragmentParallelPlanner(
+ SubPlan subPlan, IAnalysis analysis, MPPQueryContext queryContext,
+ BiFunction<WritePlanNode, IAnalysis, List<WritePlanNode>> nodeSplitter) {
+ this.subPlan = subPlan;
+ this.analysis = analysis;
+ this.queryContext = queryContext;
+ this.nodeSplitter = nodeSplitter;
}
@Override
@@ -52,7 +64,7 @@ public class WriteFragmentParallelPlanner implements
IFragmentParallelPlaner {
if (!(node instanceof WritePlanNode)) {
throw new IllegalArgumentException("PlanNode should be IWritePlanNode in
WRITE operation");
}
- List<WritePlanNode> splits = ((WritePlanNode)
node).splitByPartition(analysis);
+ List<WritePlanNode> splits = nodeSplitter.apply(((WritePlanNode) node),
analysis);
List<FragmentInstance> ret = new ArrayList<>();
for (WritePlanNode split : splits) {
FragmentInstance instance =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/WritePlanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/WritePlanNode.java
index 43cbb20a40a..dd383b52c95 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/WritePlanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/WritePlanNode.java
@@ -29,5 +29,9 @@ public abstract class WritePlanNode extends PlanNode
implements IPartitionRelate
super(id);
}
- public abstract List<WritePlanNode> splitByPartition(IAnalysis analysis);
+ public abstract List<WritePlanNode> splitByTreePartition(IAnalysis analysis);
+
+ public List<WritePlanNode> splitByTablePartition(IAnalysis analysis) {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
index 3623e976007..0316412f635 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
@@ -188,7 +188,7 @@ public class LoadSingleTsFileNode extends WritePlanNode {
}
@Override
- public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
throw new NotImplementedException("split load single TsFile is not
implemented");
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
index be5de4cc819..f7f4b2ccbb6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
@@ -89,7 +89,7 @@ public class LoadTsFileNode extends WritePlanNode {
}
@Override
- public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
List<WritePlanNode> res = new ArrayList<>();
LoadTsFileStatement statement =
((Analysis) analysis).getTreeStatement() instanceof
PipeEnrichedStatement
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
index c3c4c3fa2e3..dc81b2e9d0f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
@@ -147,7 +147,7 @@ public class LoadTsFilePieceNode extends WritePlanNode {
}
@Override
- public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
throw new NotImplementedException("split load piece TsFile is not
implemented");
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/ActivateTemplateNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/ActivateTemplateNode.java
index 6d4d704ac50..5276d1b2df1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/ActivateTemplateNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/ActivateTemplateNode.java
@@ -161,7 +161,7 @@ public class ActivateTemplateNode extends WritePlanNode
implements IActivateTemp
}
@Override
- public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
TRegionReplicaSet regionReplicaSet =
analysis
.getSchemaPartitionInfo()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
index 05b3cb89b1d..410f1fa6336 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
@@ -373,7 +373,7 @@ public class AlterTimeSeriesNode extends WritePlanNode {
}
@Override
- public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
TRegionReplicaSet regionReplicaSet =
analysis.getSchemaPartitionInfo().getSchemaRegionReplicaSet(path.getIDeviceID());
setRegionReplicaSet(regionReplicaSet);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/BatchActivateTemplateNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/BatchActivateTemplateNode.java
index 5b944718515..15441601966 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/BatchActivateTemplateNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/BatchActivateTemplateNode.java
@@ -148,7 +148,7 @@ public class BatchActivateTemplateNode extends
WritePlanNode {
}
@Override
- public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
// gather devices to same target region
Map<TRegionReplicaSet, Map<PartialPath, Pair<Integer, Integer>>> splitMap
= new HashMap<>();
for (Map.Entry<PartialPath, Pair<Integer, Integer>> entry :
templateActivationMap.entrySet()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
index 0028b90f49d..19eea48cf9b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
@@ -455,7 +455,7 @@ public class CreateAlignedTimeSeriesNode extends
WritePlanNode
}
@Override
- public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
TRegionReplicaSet regionReplicaSet =
analysis
.getSchemaPartitionInfo()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java
index 2c83be62330..56844c4a093 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java
@@ -229,7 +229,7 @@ public class CreateMultiTimeSeriesNode extends
WritePlanNode {
}
@Override
- public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
// gather devices to same target region
Map<TRegionReplicaSet, Map<PartialPath, MeasurementGroup>> splitMap = new
HashMap<>();
for (Map.Entry<PartialPath, MeasurementGroup> entry :
measurementGroupMap.entrySet()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
index 676dcfc3e3e..00ad72b8271 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
@@ -386,7 +386,7 @@ public class CreateTimeSeriesNode extends WritePlanNode
implements ICreateTimeSe
}
@Override
- public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
TRegionReplicaSet regionReplicaSet =
analysis.getSchemaPartitionInfo().getSchemaRegionReplicaSet(path.getIDeviceID());
setRegionReplicaSet(regionReplicaSet);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalBatchActivateTemplateNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalBatchActivateTemplateNode.java
index c9e2f5e1729..ea2b68a11f8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalBatchActivateTemplateNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalBatchActivateTemplateNode.java
@@ -141,7 +141,7 @@ public class InternalBatchActivateTemplateNode extends
WritePlanNode {
}
@Override
- public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
// gather devices to same target region
Map<TRegionReplicaSet, Map<PartialPath, Pair<Integer, Integer>>> splitMap
= new HashMap<>();
for (Map.Entry<PartialPath, Pair<Integer, Integer>> entry :
templateActivationMap.entrySet()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalCreateMultiTimeSeriesNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalCreateMultiTimeSeriesNode.java
index e140b0fa528..2a663a57a5b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalCreateMultiTimeSeriesNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalCreateMultiTimeSeriesNode.java
@@ -143,7 +143,7 @@ public class InternalCreateMultiTimeSeriesNode extends
WritePlanNode {
}
@Override
- public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
// gather devices to same target region
Map<TRegionReplicaSet, Map<PartialPath, Pair<Boolean, MeasurementGroup>>>
splitMap =
new HashMap<>();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalCreateTimeSeriesNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalCreateTimeSeriesNode.java
index a07b1c4eb7e..40195e0fde2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalCreateTimeSeriesNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalCreateTimeSeriesNode.java
@@ -136,7 +136,7 @@ public class InternalCreateTimeSeriesNode extends
WritePlanNode {
}
@Override
- public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
TRegionReplicaSet regionReplicaSet =
analysis
.getSchemaPartitionInfo()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/view/CreateLogicalViewNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/view/CreateLogicalViewNode.java
index cc6210671fd..4d7c545adcd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/view/CreateLogicalViewNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/view/CreateLogicalViewNode.java
@@ -220,7 +220,7 @@ public class CreateLogicalViewNode extends WritePlanNode
implements ICreateLogic
}
@Override
- public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
Map<TRegionReplicaSet, Map<PartialPath, ViewExpression>> splitMap = new
HashMap<>();
for (Map.Entry<PartialPath, ViewExpression> entry :
this.viewPathToSourceMap.entrySet()) {
// for each entry in the map for target path to source expression,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
index 57450a3cd06..3800d61ddff 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
@@ -162,8 +162,8 @@ public class PipeEnrichedDeleteDataNode extends
DeleteDataNode {
}
@Override
- public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
- return deleteDataNode.splitByPartition(analysis).stream()
+ public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
+ return deleteDataNode.splitByTreePartition(analysis).stream()
.map(
plan ->
plan instanceof PipeEnrichedDeleteDataNode
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
index 855fe79de5d..02950cef32d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
@@ -134,8 +134,8 @@ public class PipeEnrichedInsertNode extends InsertNode {
}
@Override
- public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
- return insertNode.splitByPartition(analysis).stream()
+ public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
+ return insertNode.splitByTreePartition(analysis).stream()
.map(
plan ->
plan instanceof PipeEnrichedInsertNode
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedWritePlanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedWritePlanNode.java
index 2e7f7eeb4ee..85b28a91604 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedWritePlanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedWritePlanNode.java
@@ -184,8 +184,8 @@ public class PipeEnrichedWritePlanNode extends
WritePlanNode {
}
@Override
- public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
- return writePlanNode.splitByPartition(analysis).stream()
+ public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
+ return writePlanNode.splitByTreePartition(analysis).stream()
.map(
plan ->
plan instanceof PipeEnrichedWritePlanNode
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
index 2b846146403..7fbf5e03814 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
@@ -282,7 +282,7 @@ public class DeleteDataNode extends WritePlanNode
implements WALEntryValue {
}
@Override
- public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
ISchemaTree schemaTree = ((Analysis) analysis).getSchemaTree();
DataPartition dataPartition = analysis.getDataPartitionInfo();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index 430ab19c7c9..bfbef67faff 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -135,11 +135,11 @@ public class InsertMultiTabletsNode extends InsertNode {
}
@Override
- public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
Map<TRegionReplicaSet, InsertMultiTabletsNode> splitMap = new HashMap<>();
for (int i = 0; i < insertTabletNodeList.size(); i++) {
InsertTabletNode insertTabletNode = insertTabletNodeList.get(i);
- List<WritePlanNode> tmpResult =
insertTabletNode.splitByPartition(analysis);
+ List<WritePlanNode> tmpResult =
insertTabletNode.splitByTreePartition(analysis);
for (WritePlanNode subNode : tmpResult) {
TRegionReplicaSet dataRegionReplicaSet = ((InsertNode)
subNode).getDataRegionReplicaSet();
InsertMultiTabletsNode tmpNode = splitMap.get(dataRegionReplicaSet);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
index 054770c30ed..f81150db2a8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
@@ -19,10 +19,13 @@
package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -59,6 +62,9 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
protected String[] measurements;
protected TSDataType[] dataTypes;
+ protected TsTableColumnCategory[] columnCategories;
+ protected List<Integer> idColumnIndices;
+
protected int failedMeasurementNumber = 0;
/**
@@ -90,11 +96,22 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
boolean isAligned,
String[] measurements,
TSDataType[] dataTypes) {
+ this(id, devicePath, isAligned, measurements, dataTypes, null);
+ }
+
+ protected InsertNode(
+ PlanNodeId id,
+ PartialPath devicePath,
+ boolean isAligned,
+ String[] measurements,
+ TSDataType[] dataTypes,
+ TsTableColumnCategory[] columnCategories) {
super(id);
this.devicePath = devicePath;
this.isAligned = isAligned;
this.measurements = measurements;
this.dataTypes = dataTypes;
+ setColumnCategories(columnCategories);
}
public TRegionReplicaSet getDataRegionReplicaSet() {
@@ -310,4 +327,20 @@ public abstract class InsertNode extends WritePlanNode
implements ComparableCons
result = 31 * result + Arrays.hashCode(dataTypes);
return result;
}
+
+ public TsTableColumnCategory[] getColumnCategories() {
+ return columnCategories;
+ }
+
+ public void setColumnCategories(TsTableColumnCategory[] columnCategories) {
+ this.columnCategories = columnCategories;
+ if (columnCategories != null) {
+ idColumnIndices = new ArrayList<>();
+ for (int i = 0; i < columnCategories.length; i++) {
+ if (columnCategories[i].equals(TsTableColumnCategory.ID)) {
+ idColumnIndices.add(i);
+ }
+ }
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
index 1cdec3c1267..bc3dcf8a774 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
@@ -107,7 +107,7 @@ public class InsertRowNode extends InsertNode implements
WALEntryValue {
}
@Override
- public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
TTimePartitionSlot timePartitionSlot =
TimePartitionUtils.getTimePartitionSlot(time);
this.dataRegionReplicaSet =
analysis
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 2fc54f9d080..22a5483ec5b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -232,7 +232,7 @@ public class InsertRowsNode extends InsertNode implements
WALEntryValue {
}
@Override
- public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
Map<TRegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>();
List<TEndPoint> redirectInfo = new ArrayList<>();
for (int i = 0; i < insertRowNodeList.size(); i++) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 4bc93873736..f77823ec569 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -154,7 +154,7 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
}
@Override
- public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+ public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
List<WritePlanNode> result = new ArrayList<>();
Map<TRegionReplicaSet, Map<TTimePartitionSlot, List<InsertRowNode>>>
splitMap = new HashMap<>();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index bc0adcbc855..cef96317f8c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -19,10 +19,14 @@
package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
+import java.util.Map.Entry;
+import java.util.function.Function;
+import java.util.function.IntFunction;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
@@ -39,10 +43,13 @@ import org.apache.iotdb.db.utils.QueryDataSetUtils;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.NotImplementedException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.IDeviceID.Factory;
import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.utils.TsPrimitiveType;
import org.apache.tsfile.write.UnSupportedDataTypeException;
@@ -82,6 +89,9 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
// proper positions.
private List<Integer> range;
+ // deviceId cache for Table-view insertion
+ private IDeviceID[] deviceIDs;
+
public InsertTabletNode(PlanNodeId id) {
super(id);
}
@@ -104,6 +114,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
this.rowCount = rowCount;
}
+ @TestOnly
public InsertTabletNode(
PlanNodeId id,
PartialPath devicePath,
@@ -115,7 +126,24 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
BitMap[] bitMaps,
Object[] columns,
int rowCount) {
- super(id, devicePath, isAligned, measurements, dataTypes);
+ this(id, devicePath, isAligned, measurements, dataTypes,
measurementSchemas, times, bitMaps,
+ columns, rowCount,
+ null);
+ }
+
+ public InsertTabletNode(
+ PlanNodeId id,
+ PartialPath devicePath,
+ boolean isAligned,
+ String[] measurements,
+ TSDataType[] dataTypes,
+ MeasurementSchema[] measurementSchemas,
+ long[] times,
+ BitMap[] bitMaps,
+ Object[] columns,
+ int rowCount,
+ TsTableColumnCategory[] columnCategories) {
+ super(id, devicePath, isAligned, measurements, dataTypes,
columnCategories);
this.measurementSchemas = measurementSchemas;
this.times = times;
this.bitMaps = bitMaps;
@@ -192,107 +220,160 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
}
@Override
- public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
- // only single device in single database
- List<WritePlanNode> result = new ArrayList<>();
- if (times.length == 0) {
- return Collections.emptyList();
- }
+ public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
+   // tree model: the row index is ignored, every row is attributed to this node's deviceID
+   return splitByPartition(analysis, rowIdx -> deviceID);
+ }
+
+ /**
+  * Splits this tablet for table-model insertion: the device ID of each row is obtained from
+  * {@link #getTableDeviceID(int)} instead of using the single deviceID of the node.
+  *
+  * @param analysis supplies the data partition info used to locate the target regions
+  * @return the write nodes produced by the split
+  */
+ @Override
+ public List<WritePlanNode> splitByTablePartition(IAnalysis analysis) {
+   return splitByPartition(analysis, this::getTableDeviceID);
+ }
+
+ /**
+  * Walks the rows in order and cuts them into [start, end) ranges. A new range starts whenever
+  * a row's time reaches the upper bound of the current time partition or the row maps to a
+  * different device than the previous row. Each finished range is stored, together with its
+  * time partition slot, in the SplitInfo of the device its rows belong to.
+  *
+  * <p>times[0] is read unconditionally, so the caller must not pass an empty tablet.
+  *
+  * @param rowNumDeviceIdMapper maps a row index to the device ID of that row
+  * @return ranges and time partition slots, keyed by device ID
+  */
+ private Map<IDeviceID, SplitInfo> collectSplitRanges(IntFunction<IDeviceID>
rowNumDeviceIdMapper) {
long upperBoundOfTimePartition =
TimePartitionUtils.getTimePartitionUpperBound(times[0]);
TTimePartitionSlot timePartitionSlot =
TimePartitionUtils.getTimePartitionSlot(times[0]);
int startLoc = 0; // included
+ IDeviceID currDeviceId = rowNumDeviceIdMapper.apply(0);
+
+ Map<IDeviceID, SplitInfo> deviceIDSplitInfoMap = new HashMap<>();
- List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>();
- // for each List in split, they are range1.start, range1.end,
range2.start, range2.end, ...
- List<Integer> ranges = new ArrayList<>();
for (int i = 1; i < times.length; i++) { // times are sorted in session
API.
- if (times[i] >= upperBoundOfTimePartition) {
+ IDeviceID nextDeviceId = rowNumDeviceIdMapper.apply(i);
+ // close the current range on a time partition change or a device change
+ if (times[i] >= upperBoundOfTimePartition ||
!currDeviceId.equals(nextDeviceId)) {
+ final SplitInfo splitInfo =
deviceIDSplitInfoMap.computeIfAbsent(currDeviceId,
+ deviceID1 -> new SplitInfo());
// a new range.
- ranges.add(startLoc); // included
- ranges.add(i); // excluded
- timePartitionSlots.add(timePartitionSlot);
+ splitInfo.ranges.add(startLoc); // included
+ splitInfo.ranges.add(i); // excluded
+ splitInfo.timePartitionSlots.add(timePartitionSlot);
// next init
startLoc = i;
upperBoundOfTimePartition =
TimePartitionUtils.getTimePartitionUpperBound(times[i]);
timePartitionSlot = TimePartitionUtils.getTimePartitionSlot(times[i]);
+ currDeviceId = nextDeviceId;
}
}
+ SplitInfo splitInfo = deviceIDSplitInfoMap.computeIfAbsent(currDeviceId,
+ deviceID1 -> new SplitInfo());
// the final range
- ranges.add(startLoc); // included
- ranges.add(times.length); // excluded
- timePartitionSlots.add(timePartitionSlot);
-
- // data region for each time partition
- List<TRegionReplicaSet> dataRegionReplicaSets =
- analysis
- .getDataPartitionInfo()
- .getDataRegionReplicaSetForWriting(
- devicePath.getIDeviceIDAsFullDevice(), timePartitionSlots);
-
- // collect redirectInfo
- analysis.addEndPointToRedirectNodeList(
- dataRegionReplicaSets
- .get(dataRegionReplicaSets.size() - 1)
- .getDataNodeLocations()
- .get(0)
- .getClientRpcEndPoint());
+ splitInfo.ranges.add(startLoc); // included
+ splitInfo.ranges.add(times.length); // excluded
+ splitInfo.timePartitionSlots.add(timePartitionSlot);
+
+ return deviceIDSplitInfoMap;
+ }
+ /**
+  * Looks up the data region replica sets for the time partition slots of every device and
+  * regroups the devices' ranges by replica set. As side effects, the looked-up replica sets are
+  * stored in each SplitInfo, and for every device the client RPC endpoint of the first data node
+  * of its last replica set is added to the redirect node list of the analysis.
+  *
+  * @param deviceIDSplitInfoMap ranges and time partition slots per device
+  * @param analysis supplies the data partition info and receives the redirect endpoints
+  * @return flattened [start, end) index pairs, keyed by replica set
+  */
+ // NOTE(review): this method is public while its parameter type SplitInfo is a private nested
+ // class - confirm the intended visibility
+ public Map<TRegionReplicaSet, List<Integer>>
splitByReplicaSet(Map<IDeviceID, SplitInfo> deviceIDSplitInfoMap, IAnalysis
analysis) {
Map<TRegionReplicaSet, List<Integer>> splitMap = new HashMap<>();
- for (int i = 0; i < dataRegionReplicaSets.size(); i++) {
- List<Integer> sub_ranges =
- splitMap.computeIfAbsent(dataRegionReplicaSets.get(i), x -> new
ArrayList<>());
- sub_ranges.add(ranges.get(2 * i));
- sub_ranges.add(ranges.get(2 * i + 1));
+
+ for (Entry<IDeviceID, SplitInfo> entry : deviceIDSplitInfoMap.entrySet()) {
+ final IDeviceID deviceID = entry.getKey();
+ final SplitInfo splitInfo = entry.getValue();
+ final List<TRegionReplicaSet> replicaSets = analysis
+ .getDataPartitionInfo()
+ .getDataRegionReplicaSetForWriting(
+ deviceID, splitInfo.timePartitionSlots);
+ splitInfo.replicaSets = replicaSets;
+ // collect redirectInfo
+ analysis.addEndPointToRedirectNodeList(
+ replicaSets
+ .get(replicaSets.size() - 1)
+ .getDataNodeLocations()
+ .get(0)
+ .getClientRpcEndPoint());
+ // assumes the i-th replica set belongs to the i-th time partition slot, i.e. to the
+ // i-th [start, end) pair of this device - TODO confirm against the partition API
+ for (int i = 0; i < replicaSets.size(); i++) {
+ List<Integer> sub_ranges =
+ splitMap.computeIfAbsent(replicaSets.get(i), x -> new
ArrayList<>());
+ sub_ranges.add(splitInfo.ranges.get(2 * i));
+ sub_ranges.add(splitInfo.ranges.get(2 * i + 1));
+ }
}
+ return splitMap;
+ }
- List<Integer> locs;
- for (Map.Entry<TRegionReplicaSet, List<Integer>> entry :
splitMap.entrySet()) {
- // generate a new times and values
- locs = entry.getValue();
- // Avoid using system arraycopy when there is no need to split
- if (splitMap.size() == 1 && locs.size() == 2) {
- setRange(locs);
+ /**
+  * Turns the per-replica-set ranges into write nodes. If there is exactly one replica set with
+  * exactly one range, this node is returned itself after its range and replica set are set;
+  * otherwise one copied sub-node is generated per replica set.
+  *
+  * @param splitMap flattened [start, end) index pairs, keyed by replica set
+  * @return the resulting write nodes
+  */
+ private List<WritePlanNode> doSplit(Map<TRegionReplicaSet, List<Integer>>
splitMap) {
+ List<WritePlanNode> result = new ArrayList<>();
+
+ if (splitMap.size() == 1) {
+ final Entry<TRegionReplicaSet, List<Integer>> entry =
splitMap.entrySet().iterator().next();
+ // two entries form a single [start, end) pair
+ if (entry.getValue().size() == 2) {
+ // Avoid using system arraycopy when there is no need to split
+ setRange(entry.getValue());
setDataRegionReplicaSet(entry.getKey());
result.add(this);
return result;
}
- for (int i = 0; i < locs.size(); i += 2) {
- int start = locs.get(i);
- int end = locs.get(i + 1);
- int count = end - start;
- long[] subTimes = new long[count];
- int destLoc = 0;
- Object[] values = initTabletValues(dataTypes.length, count, dataTypes);
- BitMap[] bitMaps = this.bitMaps == null ? null :
initBitmaps(dataTypes.length, count);
- System.arraycopy(times, start, subTimes, destLoc, end - start);
- for (int k = 0; k < values.length; k++) {
- if (dataTypes[k] != null) {
- System.arraycopy(columns[k], start, values[k], destLoc, end -
start);
- }
- if (bitMaps != null && this.bitMaps[k] != null) {
- BitMap.copyOfRange(this.bitMaps[k], start, bitMaps[k], destLoc,
end - start);
- }
+ }
+
+ for (Map.Entry<TRegionReplicaSet, List<Integer>> entry :
splitMap.entrySet()) {
+ result.add(generateOneSplit(entry));
+ }
+ return result;
+ }
+
+ /**
+  * Builds a new InsertTabletNode that holds the rows of all ranges of one replica set. The
+  * ranges are copied back to back, in the order they appear in the entry's list, into freshly
+  * allocated time, value and (if this node has bitmaps) bitmap arrays.
+  *
+  * @param entry the target replica set and its flattened [start, end) index pairs
+  * @return the sub-node, carrying this node's failed measurement number, the entry's index
+  *     pairs as range and the entry's replica set
+  */
+ private WritePlanNode generateOneSplit(Map.Entry<TRegionReplicaSet,
List<Integer>> entry) {
+ List<Integer> locs;
+ // generate a new times and values
+ locs = entry.getValue();
+ // first pass: total number of rows over all ranges, needed to size the new arrays
+ int count = 0;
+ for (int i = 0; i < locs.size(); i += 2) {
+ int start = locs.get(i);
+ int end = locs.get(i + 1);
+ count += end - start;
+ }
+
+ long[] subTimes = new long[count];
+ Object[] values = initTabletValues(dataTypes.length, count, dataTypes);
+ BitMap[] newBitMaps = this.bitMaps == null ? null :
initBitmaps(dataTypes.length, count);
+ InsertTabletNode subNode =
+ new InsertTabletNode(
+ getPlanNodeId(),
+ devicePath,
+ isAligned,
+ measurements,
+ dataTypes,
+ measurementSchemas,
+ subTimes,
+ newBitMaps,
+ values,
+ subTimes.length,
+ columnCategories);
+ // second pass: copy each range to the next free position of the new arrays
+ int destLoc = 0;
+
+ for (int i = 0; i < locs.size(); i += 2) {
+ int start = locs.get(i);
+ int end = locs.get(i + 1);
+ final int length = end - start;
+
+ System.arraycopy(times, start, subTimes, destLoc, length);
+ for (int k = 0; k < values.length; k++) {
+ if (dataTypes[k] != null) {
+ System.arraycopy(columns[k], start, values[k], destLoc, length);
+ }
+ if (newBitMaps != null && this.bitMaps[k] != null) {
+ BitMap.copyOfRange(this.bitMaps[k], start, newBitMaps[k], destLoc,
length);
}
- InsertTabletNode subNode =
- new InsertTabletNode(
- getPlanNodeId(),
- devicePath,
- isAligned,
- measurements,
- dataTypes,
- measurementSchemas,
- subTimes,
- bitMaps,
- values,
- subTimes.length);
- subNode.setFailedMeasurementNumber(getFailedMeasurementNumber());
- subNode.setRange(locs);
- subNode.setDataRegionReplicaSet(entry.getKey());
- result.add(subNode);
}
+ destLoc += length;
}
- return result;
+ subNode.setFailedMeasurementNumber(getFailedMeasurementNumber());
+ subNode.setRange(locs);
+ subNode.setDataRegionReplicaSet(entry.getKey());
+ return subNode;
+ }
+
+ public List<WritePlanNode> splitByPartition(IAnalysis analysis,
+     IntFunction<IDeviceID> rowNumDeviceIdMapper) {
+   // an empty tablet yields no nodes; this guard also protects the times[0] access made
+   // while collecting the ranges
+   if (times.length == 0) {
+     return Collections.emptyList();
+   }
+
+   // rows -> ranges per device -> ranges per replica set -> write nodes
+   return doSplit(splitByReplicaSet(collectSplitRanges(rowNumDeviceIdMapper), analysis));
}
@TestOnly
@@ -1103,4 +1184,28 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
}
return new TimeValuePair(times[lastIdx], value);
}
+
+ /**
+  * Returns the device ID of one row for table-model insertion. The ID is built on first use
+  * and cached in {@code deviceIDs}, which is allocated lazily with {@code rowCount} entries.
+  *
+  * <p>The segments of the ID are {@code devicePath.getFullPath()} followed by the row's value
+  * in each column listed in {@code idColumnIndices}, in list order.
+  *
+  * @param rowIdx index of the row within this tablet
+  * @return the cached or newly created device ID of that row
+  */
+ public IDeviceID getTableDeviceID(int rowIdx) {
+   if (deviceIDs == null) {
+     deviceIDs = new IDeviceID[rowCount];
+   }
+   if (deviceIDs[rowIdx] == null) {
+     String[] deviceIdSegments = new String[idColumnIndices.size() + 1];
+     deviceIdSegments[0] = this.devicePath.getFullPath();
+     for (int i = 0; i < idColumnIndices.size(); i++) {
+       final int columnIndex = idColumnIndices.get(i);
+       // a missing ID value becomes a null segment instead of a NullPointerException
+       // NOTE(review): confirm that the factory accepts null segments; bitMaps are not
+       // consulted here - confirm whether marked positions should yield null as well
+       final Binary idValue = ((Binary[]) columns[columnIndex])[rowIdx];
+       deviceIdSegments[i + 1] = idValue == null ? null : idValue.toString();
+     }
+     deviceIDs[rowIdx] = Factory.DEFAULT_FACTORY.create(deviceIdSegments);
+   }
+
+   return deviceIDs[rowIdx];
+ }
+
+ /**
+  * Ranges of one device together with the time partition slot of each range. Declared static
+  * because it never touches the enclosing node and should not keep a reference to it.
+  */
+ private static class SplitInfo {
+   // flattened [start, end) pairs: range1.start, range1.end, range2.start, range2.end, ...
+   private final List<Integer> ranges = new ArrayList<>();
+   // one slot per [start, end) pair, in the same order
+   private final List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>();
+   // replica sets looked up for timePartitionSlots; assigned by splitByReplicaSet
+   private List<TRegionReplicaSet> replicaSets;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
index a7537c4e680..37ccd002762 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
@@ -40,6 +40,8 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Table;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WrappedInsertStatement;
+import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WrappedStatement;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.type.Type;
import org.apache.tsfile.write.UnSupportedDataTypeException;
@@ -106,6 +108,9 @@ public class LogicalPlanner {
if (statement instanceof Explain) {
return createRelationPlan(analysis, (Query) ((Explain)
statement).getStatement());
}
+ if (statement instanceof WrappedStatement) {
+ return createRelationPlan(analysis, ((WrappedStatement) statement));
+ }
throw new IllegalStateException(
"Unsupported statement type: " + statement.getClass().getSimpleName());
}
@@ -144,6 +149,10 @@ public class LogicalPlanner {
return outputNode;
}
+ // Plans a WrappedStatement by handing it to the planner obtained from getRelationPlanner;
+ // null is passed as the visitor context.
+ private RelationPlan createRelationPlan(Analysis analysis, WrappedStatement
statement) {
+ return getRelationPlanner(analysis).process(statement, null);
+ }
+
private RelationPlan createRelationPlan(Analysis analysis, Query query) {
return getRelationPlanner(analysis).process(query, null);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index c0386bcbbb2..6603a8cab08 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -13,9 +13,11 @@
*/
package org.apache.iotdb.db.queryengine.plan.relational.planner;
+import java.util.Collections;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Field;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.NodeRef;
@@ -25,6 +27,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNod
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AliasedRelation;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Except;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertTablet;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Intersect;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node;
@@ -42,6 +45,7 @@ import com.google.common.collect.ImmutableMap;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import static java.util.Objects.requireNonNull;
@@ -175,4 +179,24 @@ public class RelationPlanner extends
AstVisitor<RelationPlan, Void> {
protected RelationPlan visitExcept(Except node, Void context) {
throw new IllegalStateException("Except is not supported in current
version.");
}
+
+ @Override
+ protected RelationPlan visitInsertTablet(InsertTablet node, Void context) {
+   // copy the fields of the wrapped InsertTabletStatement into a plan node
+   final InsertTabletStatement treeStatement = node.getInnerTreeStatement();
+   final InsertTabletNode tabletNode =
+       new InsertTabletNode(
+           idAllocator.genPlanNodeId(),
+           treeStatement.getDevicePath(),
+           treeStatement.isAligned(),
+           treeStatement.getMeasurements(),
+           treeStatement.getDataTypes(),
+           treeStatement.getMeasurementSchemas(),
+           treeStatement.getTimes(),
+           treeStatement.getBitMaps(),
+           treeStatement.getColumns(),
+           treeStatement.getRowCount(),
+           treeStatement.getColumnCategories());
+   tabletNode.setFailedMeasurementNumber(treeStatement.getFailedMeasurementNumber());
+
+   // the plan consists of the insert node, the scope recorded for this AST node and an
+   // empty list
+   return new RelationPlan(tabletNode, analysis.getScope(node), Collections.emptyList());
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
index ccfd5b91cd0..5efc6f33fab 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
@@ -38,6 +39,7 @@ import java.util.stream.Collectors;
import static
org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand.TIMESTAMP_EXPRESSION_STRING;
public class TableDistributionPlanner {
+
private final Analysis analysis;
private final LogicalQueryPlan logicalQueryPlan;
private final MPPQueryContext mppQueryContext;
@@ -79,7 +81,9 @@ public class TableDistributionPlanner {
List<FragmentInstance> fragmentInstances =
mppQueryContext.getQueryType() == QueryType.READ
? new TableModelQueryFragmentPlanner(subPlan, analysis,
mppQueryContext).plan()
- : new WriteFragmentParallelPlanner(subPlan, analysis,
mppQueryContext).parallelPlan();
+ :
+ new WriteFragmentParallelPlanner(subPlan, analysis,
mppQueryContext,
+ WritePlanNode::splitByTablePartition).parallelPlan();
// Only execute this step for READ operation
if (mppQueryContext.getQueryType() == QueryType.READ) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
index 6aa4568a6ff..f81a26dc594 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
import org.apache.iotdb.db.exception.query.QueryProcessException;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
index 6a48221189e..b7964aaf333 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
@@ -248,10 +248,12 @@ public abstract class InsertBaseStatement extends
Statement {
public void setColumnCategories(TsTableColumnCategory[] columnCategories) {
this.columnCategories = columnCategories;
- idColumnIndices = new ArrayList<>();
- for (int i = 0; i < columnCategories.length; i++) {
- if (columnCategories[i].equals(TsTableColumnCategory.ID)) {
- idColumnIndices.add(i);
+ // rebuild the positions of the ID columns; a null array skips the rebuild
+ // NOTE(review): with a null argument idColumnIndices keeps whatever an earlier call
+ // computed - confirm that it should not be reset here
+ if (columnCategories != null) {
+ idColumnIndices = new ArrayList<>();
+ for (int i = 0; i < columnCategories.length; i++) {
+ if (columnCategories[i].equals(TsTableColumnCategory.ID)) {
+ idColumnIndices.add(i);
+ }
}
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java
index 849d45c61c8..1896d854fc6 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java
@@ -52,7 +52,7 @@ public class LoadTsFileNodeTest {
} catch (NotImplementedException ignored) {
}
try {
- node.splitByPartition(new Analysis());
+ node.splitByTreePartition(new Analysis());
Assert.fail();
} catch (NotImplementedException ignored) {
}
@@ -75,7 +75,7 @@ public class LoadTsFileNodeTest {
} catch (NotImplementedException ignored) {
}
try {
- node.splitByPartition(new Analysis());
+ node.splitByTreePartition(new Analysis());
Assert.fail();
} catch (NotImplementedException ignored) {
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
index 8b70093595f..faefd94b638 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
@@ -211,7 +211,7 @@ public class WritePlanNodeSplitTest {
Analysis analysis = new Analysis();
analysis.setDataPartitionInfo(dataPartition);
- List<WritePlanNode> insertTabletNodeList =
insertTabletNode.splitByPartition(analysis);
+ List<WritePlanNode> insertTabletNodeList =
insertTabletNode.splitByTreePartition(analysis);
Assert.assertEquals(6, insertTabletNodeList.size());
for (WritePlanNode insertNode : insertTabletNodeList) {
@@ -237,7 +237,7 @@ public class WritePlanNodeSplitTest {
analysis = new Analysis();
analysis.setDataPartitionInfo(dataPartition);
- insertTabletNodeList = insertTabletNode.splitByPartition(analysis);
+ insertTabletNodeList = insertTabletNode.splitByTreePartition(analysis);
Assert.assertEquals(5, insertTabletNodeList.size());
for (WritePlanNode insertNode : insertTabletNodeList) {
@@ -281,7 +281,7 @@ public class WritePlanNodeSplitTest {
Analysis analysis = new Analysis();
analysis.setDataPartitionInfo(dataPartition);
- List<WritePlanNode> insertTabletNodeList =
insertMultiTabletsNode.splitByPartition(analysis);
+ List<WritePlanNode> insertTabletNodeList =
insertMultiTabletsNode.splitByTreePartition(analysis);
Assert.assertEquals(6, insertTabletNodeList.size());
}
@@ -315,7 +315,7 @@ public class WritePlanNodeSplitTest {
Analysis analysis = new Analysis();
analysis.setDataPartitionInfo(dataPartition);
- List<WritePlanNode> insertRowsNodeList =
insertRowsNode.splitByPartition(analysis);
+ List<WritePlanNode> insertRowsNodeList =
insertRowsNode.splitByTreePartition(analysis);
Assert.assertEquals(8, insertRowsNodeList.size());
}
@@ -354,7 +354,7 @@ public class WritePlanNodeSplitTest {
analysis.setDataPartitionInfo(dataPartition);
List<WritePlanNode> insertRowsOfOneDeviceNodeList =
- insertRowsOfOneDeviceNode.splitByPartition(analysis);
+ insertRowsOfOneDeviceNode.splitByTreePartition(analysis);
Assert.assertEquals(5, insertRowsOfOneDeviceNodeList.size());
for (WritePlanNode insertNode : insertRowsOfOneDeviceNodeList) {
@@ -390,7 +390,7 @@ public class WritePlanNodeSplitTest {
analysis = new Analysis();
analysis.setDataPartitionInfo(dataPartition);
- insertRowsOfOneDeviceNodeList =
insertRowsOfOneDeviceNode.splitByPartition(analysis);
+ insertRowsOfOneDeviceNodeList =
insertRowsOfOneDeviceNode.splitByTreePartition(analysis);
Assert.assertEquals(5, insertRowsOfOneDeviceNodeList.size());
for (WritePlanNode insertNode : insertRowsOfOneDeviceNodeList) {