This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/TableModelIngestion by this 
push:
     new bc6fe8d5ed2 finish distributed plan
bc6fe8d5ed2 is described below

commit bc6fe8d5ed24f190bfc13e9302ab5e51b685ea27
Author: jt2594838 <[email protected]>
AuthorDate: Tue Jun 18 17:59:53 2024 +0800

    finish distributed plan
---
 .../plan/planner/LogicalPlanVisitor.java           |   6 +-
 .../distribution/WriteFragmentParallelPlanner.java |  14 +-
 .../plan/planner/plan/node/WritePlanNode.java      |   6 +-
 .../plan/node/load/LoadSingleTsFileNode.java       |   2 +-
 .../planner/plan/node/load/LoadTsFileNode.java     |   2 +-
 .../plan/node/load/LoadTsFilePieceNode.java        |   2 +-
 .../node/metedata/write/ActivateTemplateNode.java  |   2 +-
 .../node/metedata/write/AlterTimeSeriesNode.java   |   2 +-
 .../metedata/write/BatchActivateTemplateNode.java  |   2 +-
 .../write/CreateAlignedTimeSeriesNode.java         |   2 +-
 .../metedata/write/CreateMultiTimeSeriesNode.java  |   2 +-
 .../node/metedata/write/CreateTimeSeriesNode.java  |   2 +-
 .../write/InternalBatchActivateTemplateNode.java   |   2 +-
 .../write/InternalCreateMultiTimeSeriesNode.java   |   2 +-
 .../write/InternalCreateTimeSeriesNode.java        |   2 +-
 .../metedata/write/view/CreateLogicalViewNode.java |   2 +-
 .../plan/node/pipe/PipeEnrichedDeleteDataNode.java |   4 +-
 .../plan/node/pipe/PipeEnrichedInsertNode.java     |   4 +-
 .../plan/node/pipe/PipeEnrichedWritePlanNode.java  |   4 +-
 .../planner/plan/node/write/DeleteDataNode.java    |   2 +-
 .../plan/node/write/InsertMultiTabletsNode.java    |   4 +-
 .../plan/planner/plan/node/write/InsertNode.java   |  33 +++
 .../planner/plan/node/write/InsertRowNode.java     |   2 +-
 .../planner/plan/node/write/InsertRowsNode.java    |   2 +-
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |   2 +-
 .../planner/plan/node/write/InsertTabletNode.java  | 259 +++++++++++++++------
 .../plan/relational/planner/LogicalPlanner.java    |   9 +
 .../plan/relational/planner/RelationPlanner.java   |  24 ++
 .../distribute/TableDistributionPlanner.java       |   6 +-
 .../relational/sql/ast/WrappedInsertStatement.java |  19 ++
 .../plan/statement/crud/InsertBaseStatement.java   |  10 +-
 .../plan/planner/node/load/LoadTsFileNodeTest.java |   4 +-
 .../planner/node/write/WritePlanNodeSplitTest.java |  12 +-
 33 files changed, 333 insertions(+), 119 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index 98f544c75f5..e95a44471e7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -464,7 +464,8 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
             insertTabletStatement.getTimes(),
             insertTabletStatement.getBitMaps(),
             insertTabletStatement.getColumns(),
-            insertTabletStatement.getRowCount());
+            insertTabletStatement.getRowCount(),
+            insertTabletStatement.getColumnCategories());
     
insertNode.setFailedMeasurementNumber(insertTabletStatement.getFailedMeasurementNumber());
     return insertNode;
   }
@@ -736,7 +737,8 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
               insertTabletStatement.getTimes(),
               insertTabletStatement.getBitMaps(),
               insertTabletStatement.getColumns(),
-              insertTabletStatement.getRowCount());
+              insertTabletStatement.getRowCount(),
+              insertTabletStatement.getColumnCategories());
       insertTabletNode.setFailedMeasurementNumber(
           insertTabletStatement.getFailedMeasurementNumber());
       insertMultiTabletsNode.addInsertTabletNode(insertTabletNode, i);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
index b1d1fe80123..07535f6f6b1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.queryengine.plan.planner.distribution;
 
+import java.util.function.BiFunction;
 import org.apache.iotdb.commons.partition.StorageExecutor;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
@@ -37,12 +38,23 @@ public class WriteFragmentParallelPlanner implements 
IFragmentParallelPlaner {
   private SubPlan subPlan;
   private IAnalysis analysis;
   private MPPQueryContext queryContext;
+  private BiFunction<WritePlanNode, IAnalysis, List<WritePlanNode>> 
nodeSplitter;
 
   public WriteFragmentParallelPlanner(
       SubPlan subPlan, IAnalysis analysis, MPPQueryContext queryContext) {
     this.subPlan = subPlan;
     this.analysis = analysis;
     this.queryContext = queryContext;
+    this.nodeSplitter = WritePlanNode::splitByTreePartition;
+  }
+
+  public WriteFragmentParallelPlanner(
+      SubPlan subPlan, IAnalysis analysis, MPPQueryContext queryContext,
+      BiFunction<WritePlanNode, IAnalysis, List<WritePlanNode>> nodeSplitter) {
+    this.subPlan = subPlan;
+    this.analysis = analysis;
+    this.queryContext = queryContext;
+    this.nodeSplitter = nodeSplitter;
   }
 
   @Override
@@ -52,7 +64,7 @@ public class WriteFragmentParallelPlanner implements 
IFragmentParallelPlaner {
     if (!(node instanceof WritePlanNode)) {
       throw new IllegalArgumentException("PlanNode should be IWritePlanNode in 
WRITE operation");
     }
-    List<WritePlanNode> splits = ((WritePlanNode) 
node).splitByPartition(analysis);
+    List<WritePlanNode> splits = nodeSplitter.apply(((WritePlanNode) node), 
analysis);
     List<FragmentInstance> ret = new ArrayList<>();
     for (WritePlanNode split : splits) {
       FragmentInstance instance =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/WritePlanNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/WritePlanNode.java
index 43cbb20a40a..dd383b52c95 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/WritePlanNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/WritePlanNode.java
@@ -29,5 +29,9 @@ public abstract class WritePlanNode extends PlanNode 
implements IPartitionRelate
     super(id);
   }
 
-  public abstract List<WritePlanNode> splitByPartition(IAnalysis analysis);
+  public abstract List<WritePlanNode> splitByTreePartition(IAnalysis analysis);
+
+  public List<WritePlanNode> splitByTablePartition(IAnalysis analysis) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
index 3623e976007..0316412f635 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
@@ -188,7 +188,7 @@ public class LoadSingleTsFileNode extends WritePlanNode {
   }
 
   @Override
-  public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+  public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
     throw new NotImplementedException("split load single TsFile is not 
implemented");
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
index be5de4cc819..f7f4b2ccbb6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
@@ -89,7 +89,7 @@ public class LoadTsFileNode extends WritePlanNode {
   }
 
   @Override
-  public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+  public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
     List<WritePlanNode> res = new ArrayList<>();
     LoadTsFileStatement statement =
         ((Analysis) analysis).getTreeStatement() instanceof 
PipeEnrichedStatement
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
index c3c4c3fa2e3..dc81b2e9d0f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
@@ -147,7 +147,7 @@ public class LoadTsFilePieceNode extends WritePlanNode {
   }
 
   @Override
-  public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+  public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
     throw new NotImplementedException("split load piece TsFile is not 
implemented");
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/ActivateTemplateNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/ActivateTemplateNode.java
index 6d4d704ac50..5276d1b2df1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/ActivateTemplateNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/ActivateTemplateNode.java
@@ -161,7 +161,7 @@ public class ActivateTemplateNode extends WritePlanNode 
implements IActivateTemp
   }
 
   @Override
-  public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+  public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
     TRegionReplicaSet regionReplicaSet =
         analysis
             .getSchemaPartitionInfo()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
index 05b3cb89b1d..410f1fa6336 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
@@ -373,7 +373,7 @@ public class AlterTimeSeriesNode extends WritePlanNode {
   }
 
   @Override
-  public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+  public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
     TRegionReplicaSet regionReplicaSet =
         
analysis.getSchemaPartitionInfo().getSchemaRegionReplicaSet(path.getIDeviceID());
     setRegionReplicaSet(regionReplicaSet);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/BatchActivateTemplateNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/BatchActivateTemplateNode.java
index 5b944718515..15441601966 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/BatchActivateTemplateNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/BatchActivateTemplateNode.java
@@ -148,7 +148,7 @@ public class BatchActivateTemplateNode extends 
WritePlanNode {
   }
 
   @Override
-  public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+  public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
     // gather devices to same target region
     Map<TRegionReplicaSet, Map<PartialPath, Pair<Integer, Integer>>> splitMap 
= new HashMap<>();
     for (Map.Entry<PartialPath, Pair<Integer, Integer>> entry : 
templateActivationMap.entrySet()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
index 0028b90f49d..19eea48cf9b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateAlignedTimeSeriesNode.java
@@ -455,7 +455,7 @@ public class CreateAlignedTimeSeriesNode extends 
WritePlanNode
   }
 
   @Override
-  public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+  public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
     TRegionReplicaSet regionReplicaSet =
         analysis
             .getSchemaPartitionInfo()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java
index 2c83be62330..56844c4a093 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateMultiTimeSeriesNode.java
@@ -229,7 +229,7 @@ public class CreateMultiTimeSeriesNode extends 
WritePlanNode {
   }
 
   @Override
-  public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+  public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
     // gather devices to same target region
     Map<TRegionReplicaSet, Map<PartialPath, MeasurementGroup>> splitMap = new 
HashMap<>();
     for (Map.Entry<PartialPath, MeasurementGroup> entry : 
measurementGroupMap.entrySet()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
index 676dcfc3e3e..00ad72b8271 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
@@ -386,7 +386,7 @@ public class CreateTimeSeriesNode extends WritePlanNode 
implements ICreateTimeSe
   }
 
   @Override
-  public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+  public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
     TRegionReplicaSet regionReplicaSet =
         
analysis.getSchemaPartitionInfo().getSchemaRegionReplicaSet(path.getIDeviceID());
     setRegionReplicaSet(regionReplicaSet);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalBatchActivateTemplateNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalBatchActivateTemplateNode.java
index c9e2f5e1729..ea2b68a11f8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalBatchActivateTemplateNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalBatchActivateTemplateNode.java
@@ -141,7 +141,7 @@ public class InternalBatchActivateTemplateNode extends 
WritePlanNode {
   }
 
   @Override
-  public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+  public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
     // gather devices to same target region
     Map<TRegionReplicaSet, Map<PartialPath, Pair<Integer, Integer>>> splitMap 
= new HashMap<>();
     for (Map.Entry<PartialPath, Pair<Integer, Integer>> entry : 
templateActivationMap.entrySet()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalCreateMultiTimeSeriesNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalCreateMultiTimeSeriesNode.java
index e140b0fa528..2a663a57a5b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalCreateMultiTimeSeriesNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalCreateMultiTimeSeriesNode.java
@@ -143,7 +143,7 @@ public class InternalCreateMultiTimeSeriesNode extends 
WritePlanNode {
   }
 
   @Override
-  public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+  public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
     // gather devices to same target region
     Map<TRegionReplicaSet, Map<PartialPath, Pair<Boolean, MeasurementGroup>>> 
splitMap =
         new HashMap<>();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalCreateTimeSeriesNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalCreateTimeSeriesNode.java
index a07b1c4eb7e..40195e0fde2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalCreateTimeSeriesNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/InternalCreateTimeSeriesNode.java
@@ -136,7 +136,7 @@ public class InternalCreateTimeSeriesNode extends 
WritePlanNode {
   }
 
   @Override
-  public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+  public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
     TRegionReplicaSet regionReplicaSet =
         analysis
             .getSchemaPartitionInfo()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/view/CreateLogicalViewNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/view/CreateLogicalViewNode.java
index cc6210671fd..4d7c545adcd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/view/CreateLogicalViewNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/write/view/CreateLogicalViewNode.java
@@ -220,7 +220,7 @@ public class CreateLogicalViewNode extends WritePlanNode 
implements ICreateLogic
   }
 
   @Override
-  public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+  public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
     Map<TRegionReplicaSet, Map<PartialPath, ViewExpression>> splitMap = new 
HashMap<>();
     for (Map.Entry<PartialPath, ViewExpression> entry : 
this.viewPathToSourceMap.entrySet()) {
       // for each entry in the map for target path to source expression,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
index 57450a3cd06..3800d61ddff 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
@@ -162,8 +162,8 @@ public class PipeEnrichedDeleteDataNode extends 
DeleteDataNode {
   }
 
   @Override
-  public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
-    return deleteDataNode.splitByPartition(analysis).stream()
+  public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
+    return deleteDataNode.splitByTreePartition(analysis).stream()
         .map(
             plan ->
                 plan instanceof PipeEnrichedDeleteDataNode
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
index 855fe79de5d..02950cef32d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
@@ -134,8 +134,8 @@ public class PipeEnrichedInsertNode extends InsertNode {
   }
 
   @Override
-  public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
-    return insertNode.splitByPartition(analysis).stream()
+  public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
+    return insertNode.splitByTreePartition(analysis).stream()
         .map(
             plan ->
                 plan instanceof PipeEnrichedInsertNode
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedWritePlanNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedWritePlanNode.java
index 2e7f7eeb4ee..85b28a91604 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedWritePlanNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedWritePlanNode.java
@@ -184,8 +184,8 @@ public class PipeEnrichedWritePlanNode extends 
WritePlanNode {
   }
 
   @Override
-  public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
-    return writePlanNode.splitByPartition(analysis).stream()
+  public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
+    return writePlanNode.splitByTreePartition(analysis).stream()
         .map(
             plan ->
                 plan instanceof PipeEnrichedWritePlanNode
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
index 2b846146403..7fbf5e03814 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
@@ -282,7 +282,7 @@ public class DeleteDataNode extends WritePlanNode 
implements WALEntryValue {
   }
 
   @Override
-  public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+  public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
     ISchemaTree schemaTree = ((Analysis) analysis).getSchemaTree();
     DataPartition dataPartition = analysis.getDataPartitionInfo();
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index 430ab19c7c9..bfbef67faff 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -135,11 +135,11 @@ public class InsertMultiTabletsNode extends InsertNode {
   }
 
   @Override
-  public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+  public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
     Map<TRegionReplicaSet, InsertMultiTabletsNode> splitMap = new HashMap<>();
     for (int i = 0; i < insertTabletNodeList.size(); i++) {
       InsertTabletNode insertTabletNode = insertTabletNodeList.get(i);
-      List<WritePlanNode> tmpResult = 
insertTabletNode.splitByPartition(analysis);
+      List<WritePlanNode> tmpResult = 
insertTabletNode.splitByTreePartition(analysis);
       for (WritePlanNode subNode : tmpResult) {
         TRegionReplicaSet dataRegionReplicaSet = ((InsertNode) 
subNode).getDataRegionReplicaSet();
         InsertMultiTabletsNode tmpNode = splitMap.get(dataRegionReplicaSet);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
index 054770c30ed..f81150db2a8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java
@@ -19,10 +19,13 @@
 
 package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
 
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -59,6 +62,9 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
   protected String[] measurements;
   protected TSDataType[] dataTypes;
 
+  protected TsTableColumnCategory[] columnCategories;
+  protected List<Integer> idColumnIndices;
+
   protected int failedMeasurementNumber = 0;
 
   /**
@@ -90,11 +96,22 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
       boolean isAligned,
       String[] measurements,
       TSDataType[] dataTypes) {
+    this(id, devicePath, isAligned, measurements, dataTypes, null);
+  }
+
+  protected InsertNode(
+      PlanNodeId id,
+      PartialPath devicePath,
+      boolean isAligned,
+      String[] measurements,
+      TSDataType[] dataTypes,
+      TsTableColumnCategory[] columnCategories) {
     super(id);
     this.devicePath = devicePath;
     this.isAligned = isAligned;
     this.measurements = measurements;
     this.dataTypes = dataTypes;
+    setColumnCategories(columnCategories);
   }
 
   public TRegionReplicaSet getDataRegionReplicaSet() {
@@ -310,4 +327,20 @@ public abstract class InsertNode extends WritePlanNode 
implements ComparableCons
     result = 31 * result + Arrays.hashCode(dataTypes);
     return result;
   }
+
+  public TsTableColumnCategory[] getColumnCategories() {
+    return columnCategories;
+  }
+
+  public void setColumnCategories(TsTableColumnCategory[] columnCategories) {
+    this.columnCategories = columnCategories;
+    if (columnCategories != null) {
+      idColumnIndices = new ArrayList<>();
+      for (int i = 0; i < columnCategories.length; i++) {
+        if (columnCategories[i].equals(TsTableColumnCategory.ID)) {
+          idColumnIndices.add(i);
+        }
+      }
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
index 1cdec3c1267..bc3dcf8a774 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
@@ -107,7 +107,7 @@ public class InsertRowNode extends InsertNode implements 
WALEntryValue {
   }
 
   @Override
-  public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+  public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
     TTimePartitionSlot timePartitionSlot = 
TimePartitionUtils.getTimePartitionSlot(time);
     this.dataRegionReplicaSet =
         analysis
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 2fc54f9d080..22a5483ec5b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -232,7 +232,7 @@ public class InsertRowsNode extends InsertNode implements 
WALEntryValue {
   }
 
   @Override
-  public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+  public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
     Map<TRegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>();
     List<TEndPoint> redirectInfo = new ArrayList<>();
     for (int i = 0; i < insertRowNodeList.size(); i++) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 4bc93873736..f77823ec569 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -154,7 +154,7 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
   }
 
   @Override
-  public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+  public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
     List<WritePlanNode> result = new ArrayList<>();
 
     Map<TRegionReplicaSet, Map<TTimePartitionSlot, List<InsertRowNode>>> 
splitMap = new HashMap<>();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index bc0adcbc855..cef96317f8c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -19,10 +19,14 @@
 
 package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
 
+import java.util.Map.Entry;
+import java.util.function.Function;
+import java.util.function.IntFunction;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
 import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
@@ -39,10 +43,13 @@ import org.apache.iotdb.db.utils.QueryDataSetUtils;
 
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.exception.NotImplementedException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.IDeviceID.Factory;
 import org.apache.tsfile.read.TimeValuePair;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.BitMap;
 import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.apache.tsfile.utils.TsPrimitiveType;
 import org.apache.tsfile.write.UnSupportedDataTypeException;
@@ -82,6 +89,9 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
   // proper positions.
   private List<Integer> range;
 
+  // deviceId cache for Table-view insertion
+  private IDeviceID[] deviceIDs;
+
   public InsertTabletNode(PlanNodeId id) {
     super(id);
   }
@@ -104,6 +114,7 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
     this.rowCount = rowCount;
   }
 
+  @TestOnly
   public InsertTabletNode(
       PlanNodeId id,
       PartialPath devicePath,
@@ -115,7 +126,24 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
       BitMap[] bitMaps,
       Object[] columns,
       int rowCount) {
-    super(id, devicePath, isAligned, measurements, dataTypes);
+    this(id, devicePath, isAligned, measurements, dataTypes, 
measurementSchemas, times, bitMaps,
+        columns, rowCount,
+        null);
+  }
+
+  public InsertTabletNode(
+      PlanNodeId id,
+      PartialPath devicePath,
+      boolean isAligned,
+      String[] measurements,
+      TSDataType[] dataTypes,
+      MeasurementSchema[] measurementSchemas,
+      long[] times,
+      BitMap[] bitMaps,
+      Object[] columns,
+      int rowCount,
+      TsTableColumnCategory[] columnCategories) {
+    super(id, devicePath, isAligned, measurements, dataTypes, 
columnCategories);
     this.measurementSchemas = measurementSchemas;
     this.times = times;
     this.bitMaps = bitMaps;
@@ -192,107 +220,160 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
   }
 
   @Override
-  public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
-    // only single device in single database
-    List<WritePlanNode> result = new ArrayList<>();
-    if (times.length == 0) {
-      return Collections.emptyList();
-    }
+  public List<WritePlanNode> splitByTreePartition(IAnalysis analysis) {
+    return splitByPartition(analysis, i -> deviceID);
+  }
+
+  public List<WritePlanNode> splitByTablePartition(IAnalysis analysis) {
+    return splitByPartition(analysis, this::getTableDeviceID);
+  }
+
+  private Map<IDeviceID, SplitInfo> collectSplitRanges(IntFunction<IDeviceID> 
rowNumDeviceIdMapper) {
     long upperBoundOfTimePartition = 
TimePartitionUtils.getTimePartitionUpperBound(times[0]);
     TTimePartitionSlot timePartitionSlot = 
TimePartitionUtils.getTimePartitionSlot(times[0]);
     int startLoc = 0; // included
+    IDeviceID currDeviceId = rowNumDeviceIdMapper.apply(0);
+
+    Map<IDeviceID, SplitInfo> deviceIDSplitInfoMap = new HashMap<>();
 
-    List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>();
-    // for each List in split, they are range1.start, range1.end, 
range2.start, range2.end, ...
-    List<Integer> ranges = new ArrayList<>();
     for (int i = 1; i < times.length; i++) { // times are sorted in session 
API.
-      if (times[i] >= upperBoundOfTimePartition) {
+      IDeviceID nextDeviceId = rowNumDeviceIdMapper.apply(i);
+      if (times[i] >= upperBoundOfTimePartition || 
!currDeviceId.equals(nextDeviceId)) {
+        final SplitInfo splitInfo = 
deviceIDSplitInfoMap.computeIfAbsent(currDeviceId,
+            deviceID1 -> new SplitInfo());
         // a new range.
-        ranges.add(startLoc); // included
-        ranges.add(i); // excluded
-        timePartitionSlots.add(timePartitionSlot);
+        splitInfo.ranges.add(startLoc); // included
+        splitInfo.ranges.add(i); // excluded
+        splitInfo.timePartitionSlots.add(timePartitionSlot);
         // next init
         startLoc = i;
         upperBoundOfTimePartition = 
TimePartitionUtils.getTimePartitionUpperBound(times[i]);
         timePartitionSlot = TimePartitionUtils.getTimePartitionSlot(times[i]);
+        currDeviceId = nextDeviceId;
       }
     }
 
+    SplitInfo splitInfo = deviceIDSplitInfoMap.computeIfAbsent(currDeviceId,
+        deviceID1 -> new SplitInfo());
     // the final range
-    ranges.add(startLoc); // included
-    ranges.add(times.length); // excluded
-    timePartitionSlots.add(timePartitionSlot);
-
-    // data region for each time partition
-    List<TRegionReplicaSet> dataRegionReplicaSets =
-        analysis
-            .getDataPartitionInfo()
-            .getDataRegionReplicaSetForWriting(
-                devicePath.getIDeviceIDAsFullDevice(), timePartitionSlots);
-
-    // collect redirectInfo
-    analysis.addEndPointToRedirectNodeList(
-        dataRegionReplicaSets
-            .get(dataRegionReplicaSets.size() - 1)
-            .getDataNodeLocations()
-            .get(0)
-            .getClientRpcEndPoint());
+    splitInfo.ranges.add(startLoc); // included
+    splitInfo.ranges.add(times.length); // excluded
+    splitInfo.timePartitionSlots.add(timePartitionSlot);
+
+    return deviceIDSplitInfoMap;
+  }
 
+  public  Map<TRegionReplicaSet, List<Integer>> 
splitByReplicaSet(Map<IDeviceID, SplitInfo> deviceIDSplitInfoMap, IAnalysis 
analysis) {
     Map<TRegionReplicaSet, List<Integer>> splitMap = new HashMap<>();
-    for (int i = 0; i < dataRegionReplicaSets.size(); i++) {
-      List<Integer> sub_ranges =
-          splitMap.computeIfAbsent(dataRegionReplicaSets.get(i), x -> new 
ArrayList<>());
-      sub_ranges.add(ranges.get(2 * i));
-      sub_ranges.add(ranges.get(2 * i + 1));
+
+    for (Entry<IDeviceID, SplitInfo> entry : deviceIDSplitInfoMap.entrySet()) {
+      final IDeviceID deviceID = entry.getKey();
+      final SplitInfo splitInfo = entry.getValue();
+      final List<TRegionReplicaSet> replicaSets = analysis
+          .getDataPartitionInfo()
+          .getDataRegionReplicaSetForWriting(
+              deviceID, splitInfo.timePartitionSlots);
+      splitInfo.replicaSets = replicaSets;
+      // collect redirectInfo
+      analysis.addEndPointToRedirectNodeList(
+          replicaSets
+              .get(replicaSets.size() - 1)
+              .getDataNodeLocations()
+              .get(0)
+              .getClientRpcEndPoint());
+      for (int i = 0; i < replicaSets.size(); i++) {
+        List<Integer> sub_ranges =
+            splitMap.computeIfAbsent(replicaSets.get(i), x -> new 
ArrayList<>());
+        sub_ranges.add(splitInfo.ranges.get(2 * i));
+        sub_ranges.add(splitInfo.ranges.get(2 * i + 1));
+      }
     }
+    return splitMap;
+  }
 
-    List<Integer> locs;
-    for (Map.Entry<TRegionReplicaSet, List<Integer>> entry : 
splitMap.entrySet()) {
-      // generate a new times and values
-      locs = entry.getValue();
-      // Avoid using system arraycopy when there is no need to split
-      if (splitMap.size() == 1 && locs.size() == 2) {
-        setRange(locs);
+  private List<WritePlanNode> doSplit(Map<TRegionReplicaSet, List<Integer>> 
splitMap) {
+    List<WritePlanNode> result = new ArrayList<>();
+
+    if (splitMap.size() == 1) {
+      final Entry<TRegionReplicaSet, List<Integer>> entry = 
splitMap.entrySet().iterator().next();
+      if (entry.getValue().size() == 2) {
+        // Avoid using system arraycopy when there is no need to split
+        setRange(entry.getValue());
         setDataRegionReplicaSet(entry.getKey());
         result.add(this);
         return result;
       }
-      for (int i = 0; i < locs.size(); i += 2) {
-        int start = locs.get(i);
-        int end = locs.get(i + 1);
-        int count = end - start;
-        long[] subTimes = new long[count];
-        int destLoc = 0;
-        Object[] values = initTabletValues(dataTypes.length, count, dataTypes);
-        BitMap[] bitMaps = this.bitMaps == null ? null : 
initBitmaps(dataTypes.length, count);
-        System.arraycopy(times, start, subTimes, destLoc, end - start);
-        for (int k = 0; k < values.length; k++) {
-          if (dataTypes[k] != null) {
-            System.arraycopy(columns[k], start, values[k], destLoc, end - 
start);
-          }
-          if (bitMaps != null && this.bitMaps[k] != null) {
-            BitMap.copyOfRange(this.bitMaps[k], start, bitMaps[k], destLoc, 
end - start);
-          }
+    }
+
+    for (Map.Entry<TRegionReplicaSet, List<Integer>> entry : 
splitMap.entrySet()) {
+      result.add(generateOneSplit(entry));
+    }
+    return result;
+  }
+
+  private WritePlanNode generateOneSplit(Map.Entry<TRegionReplicaSet, 
List<Integer>> entry) {
+    List<Integer> locs;
+    // generate a new times and values
+    locs = entry.getValue();
+    int count = 0;
+    for (int i = 0; i < locs.size(); i += 2) {
+      int start = locs.get(i);
+      int end = locs.get(i + 1);
+      count += end - start;
+    }
+
+    long[] subTimes = new long[count];
+    Object[] values = initTabletValues(dataTypes.length, count, dataTypes);
+    BitMap[] newBitMaps = this.bitMaps == null ? null : 
initBitmaps(dataTypes.length, count);
+    InsertTabletNode subNode =
+        new InsertTabletNode(
+            getPlanNodeId(),
+            devicePath,
+            isAligned,
+            measurements,
+            dataTypes,
+            measurementSchemas,
+            subTimes,
+            newBitMaps,
+            values,
+            subTimes.length,
+            columnCategories);
+    int destLoc = 0;
+
+    for (int i = 0; i < locs.size(); i += 2) {
+      int start = locs.get(i);
+      int end = locs.get(i + 1);
+      final int length = end - start;
+
+      System.arraycopy(times, start, subTimes, destLoc, length);
+      for (int k = 0; k < values.length; k++) {
+        if (dataTypes[k] != null) {
+          System.arraycopy(columns[k], start, values[k], destLoc, length);
+        }
+        if (newBitMaps != null && this.bitMaps[k] != null) {
+          BitMap.copyOfRange(this.bitMaps[k], start, newBitMaps[k], destLoc, 
length);
         }
-        InsertTabletNode subNode =
-            new InsertTabletNode(
-                getPlanNodeId(),
-                devicePath,
-                isAligned,
-                measurements,
-                dataTypes,
-                measurementSchemas,
-                subTimes,
-                bitMaps,
-                values,
-                subTimes.length);
-        subNode.setFailedMeasurementNumber(getFailedMeasurementNumber());
-        subNode.setRange(locs);
-        subNode.setDataRegionReplicaSet(entry.getKey());
-        result.add(subNode);
       }
+      destLoc += length;
     }
-    return result;
+    subNode.setFailedMeasurementNumber(getFailedMeasurementNumber());
+    subNode.setRange(locs);
+    subNode.setDataRegionReplicaSet(entry.getKey());
+    return subNode;
+  }
+
+  public List<WritePlanNode> splitByPartition(IAnalysis analysis,
+      IntFunction<IDeviceID> rowNumDeviceIdMapper) {
+    // only single device in single database
+    if (times.length == 0) {
+      return Collections.emptyList();
+    }
+
+    final Map<IDeviceID, SplitInfo> deviceIDSplitInfoMap = 
collectSplitRanges(rowNumDeviceIdMapper);
+    final Map<TRegionReplicaSet, List<Integer>> splitMap = splitByReplicaSet(
+        deviceIDSplitInfoMap, analysis);
+
+    return doSplit(splitMap);
   }
 
   @TestOnly
@@ -1103,4 +1184,28 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
     }
     return new TimeValuePair(times[lastIdx], value);
   }
+
+  public IDeviceID getTableDeviceID(int rowIdx) {
+    if (deviceIDs == null) {
+      deviceIDs = new IDeviceID[rowCount];
+    }
+    if (deviceIDs[rowIdx] == null) {
+      String[] deviceIdSegments = new String[idColumnIndices.size() + 1];
+      deviceIdSegments[0] = this.devicePath.getFullPath();
+      for (int i = 0; i < idColumnIndices.size(); i++) {
+        final Integer columnIndex = idColumnIndices.get(i);
+        deviceIdSegments[i + 1] = ((Binary[]) 
columns[columnIndex])[rowIdx].toString();
+      }
+      deviceIDs[rowIdx] = Factory.DEFAULT_FACTORY.create(deviceIdSegments);
+    }
+
+    return deviceIDs[rowIdx];
+  }
+
+  private class SplitInfo {
+    // for each List in split, they are range1.start, range1.end, 
range2.start, range2.end, ...
+    private List<Integer> ranges = new ArrayList<>();
+    private List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>();
+    private List<TRegionReplicaSet> replicaSets;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
index a7537c4e680..37ccd002762 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
@@ -40,6 +40,8 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Table;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WrappedInsertStatement;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WrappedStatement;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.type.Type;
 import org.apache.tsfile.write.UnSupportedDataTypeException;
@@ -106,6 +108,9 @@ public class LogicalPlanner {
     if (statement instanceof Explain) {
       return createRelationPlan(analysis, (Query) ((Explain) 
statement).getStatement());
     }
+    if (statement instanceof WrappedStatement) {
+      return createRelationPlan(analysis, ((WrappedStatement) statement));
+    }
     throw new IllegalStateException(
         "Unsupported statement type: " + statement.getClass().getSimpleName());
   }
@@ -144,6 +149,10 @@ public class LogicalPlanner {
     return outputNode;
   }
 
+  private RelationPlan createRelationPlan(Analysis analysis, WrappedStatement 
statement) {
+    return getRelationPlanner(analysis).process(statement, null);
+  }
+
   private RelationPlan createRelationPlan(Analysis analysis, Query query) {
     return getRelationPlanner(analysis).process(query, null);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index c0386bcbbb2..6603a8cab08 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -13,9 +13,11 @@
  */
 package org.apache.iotdb.db.queryengine.plan.relational.planner;
 
+import java.util.Collections;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Field;
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.NodeRef;
@@ -25,6 +27,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNod
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AliasedRelation;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Except;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertTablet;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Intersect;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node;
@@ -42,6 +45,7 @@ import com.google.common.collect.ImmutableMap;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 
 import static java.util.Objects.requireNonNull;
 
@@ -175,4 +179,24 @@ public class RelationPlanner extends 
AstVisitor<RelationPlan, Void> {
   protected RelationPlan visitExcept(Except node, Void context) {
     throw new IllegalStateException("Except is not supported in current 
version.");
   }
+
+  @Override
+  protected RelationPlan visitInsertTablet(InsertTablet node, Void context) {
+    final InsertTabletStatement insertTabletStatement = 
node.getInnerTreeStatement();
+    InsertTabletNode insertNode =
+        new InsertTabletNode(
+            idAllocator.genPlanNodeId(),
+            insertTabletStatement.getDevicePath(),
+            insertTabletStatement.isAligned(),
+            insertTabletStatement.getMeasurements(),
+            insertTabletStatement.getDataTypes(),
+            insertTabletStatement.getMeasurementSchemas(),
+            insertTabletStatement.getTimes(),
+            insertTabletStatement.getBitMaps(),
+            insertTabletStatement.getColumns(),
+            insertTabletStatement.getRowCount(),
+            insertTabletStatement.getColumnCategories());
+    
insertNode.setFailedMeasurementNumber(insertTabletStatement.getFailedMeasurementNumber());
+    return new RelationPlan(insertNode, analysis.getScope(node), 
Collections.emptyList());
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
index ccfd5b91cd0..5efc6f33fab 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
@@ -38,6 +39,7 @@ import java.util.stream.Collectors;
 import static 
org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand.TIMESTAMP_EXPRESSION_STRING;
 
 public class TableDistributionPlanner {
+
   private final Analysis analysis;
   private final LogicalQueryPlan logicalQueryPlan;
   private final MPPQueryContext mppQueryContext;
@@ -79,7 +81,9 @@ public class TableDistributionPlanner {
     List<FragmentInstance> fragmentInstances =
         mppQueryContext.getQueryType() == QueryType.READ
             ? new TableModelQueryFragmentPlanner(subPlan, analysis, 
mppQueryContext).plan()
-            : new WriteFragmentParallelPlanner(subPlan, analysis, 
mppQueryContext).parallelPlan();
+            :
+                new WriteFragmentParallelPlanner(subPlan, analysis, 
mppQueryContext,
+                    WritePlanNode::splitByTablePartition).parallelPlan();
 
     // Only execute this step for READ operation
     if (mppQueryContext.getQueryType() == QueryType.READ) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
index 6aa4568a6ff..f81a26dc594 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
index 6a48221189e..b7964aaf333 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
@@ -248,10 +248,12 @@ public abstract class InsertBaseStatement extends 
Statement {
 
   public void setColumnCategories(TsTableColumnCategory[] columnCategories) {
     this.columnCategories = columnCategories;
-    idColumnIndices = new ArrayList<>();
-    for (int i = 0; i < columnCategories.length; i++) {
-      if (columnCategories[i].equals(TsTableColumnCategory.ID)) {
-        idColumnIndices.add(i);
+    if (columnCategories != null) {
+      idColumnIndices = new ArrayList<>();
+      for (int i = 0; i < columnCategories.length; i++) {
+        if (columnCategories[i].equals(TsTableColumnCategory.ID)) {
+          idColumnIndices.add(i);
+        }
       }
     }
   }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java
index 849d45c61c8..1896d854fc6 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java
@@ -52,7 +52,7 @@ public class LoadTsFileNodeTest {
     } catch (NotImplementedException ignored) {
     }
     try {
-      node.splitByPartition(new Analysis());
+      node.splitByTreePartition(new Analysis());
       Assert.fail();
     } catch (NotImplementedException ignored) {
     }
@@ -75,7 +75,7 @@ public class LoadTsFileNodeTest {
     } catch (NotImplementedException ignored) {
     }
     try {
-      node.splitByPartition(new Analysis());
+      node.splitByTreePartition(new Analysis());
       Assert.fail();
     } catch (NotImplementedException ignored) {
     }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
index 8b70093595f..faefd94b638 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
@@ -211,7 +211,7 @@ public class WritePlanNodeSplitTest {
     Analysis analysis = new Analysis();
     analysis.setDataPartitionInfo(dataPartition);
 
-    List<WritePlanNode> insertTabletNodeList = 
insertTabletNode.splitByPartition(analysis);
+    List<WritePlanNode> insertTabletNodeList = 
insertTabletNode.splitByTreePartition(analysis);
 
     Assert.assertEquals(6, insertTabletNodeList.size());
     for (WritePlanNode insertNode : insertTabletNodeList) {
@@ -237,7 +237,7 @@ public class WritePlanNodeSplitTest {
     analysis = new Analysis();
     analysis.setDataPartitionInfo(dataPartition);
 
-    insertTabletNodeList = insertTabletNode.splitByPartition(analysis);
+    insertTabletNodeList = insertTabletNode.splitByTreePartition(analysis);
 
     Assert.assertEquals(5, insertTabletNodeList.size());
     for (WritePlanNode insertNode : insertTabletNodeList) {
@@ -281,7 +281,7 @@ public class WritePlanNodeSplitTest {
     Analysis analysis = new Analysis();
     analysis.setDataPartitionInfo(dataPartition);
 
-    List<WritePlanNode> insertTabletNodeList = 
insertMultiTabletsNode.splitByPartition(analysis);
+    List<WritePlanNode> insertTabletNodeList = 
insertMultiTabletsNode.splitByTreePartition(analysis);
 
     Assert.assertEquals(6, insertTabletNodeList.size());
   }
@@ -315,7 +315,7 @@ public class WritePlanNodeSplitTest {
     Analysis analysis = new Analysis();
     analysis.setDataPartitionInfo(dataPartition);
 
-    List<WritePlanNode> insertRowsNodeList = 
insertRowsNode.splitByPartition(analysis);
+    List<WritePlanNode> insertRowsNodeList = 
insertRowsNode.splitByTreePartition(analysis);
 
     Assert.assertEquals(8, insertRowsNodeList.size());
   }
@@ -354,7 +354,7 @@ public class WritePlanNodeSplitTest {
     analysis.setDataPartitionInfo(dataPartition);
 
     List<WritePlanNode> insertRowsOfOneDeviceNodeList =
-        insertRowsOfOneDeviceNode.splitByPartition(analysis);
+        insertRowsOfOneDeviceNode.splitByTreePartition(analysis);
 
     Assert.assertEquals(5, insertRowsOfOneDeviceNodeList.size());
     for (WritePlanNode insertNode : insertRowsOfOneDeviceNodeList) {
@@ -390,7 +390,7 @@ public class WritePlanNodeSplitTest {
     analysis = new Analysis();
     analysis.setDataPartitionInfo(dataPartition);
 
-    insertRowsOfOneDeviceNodeList = 
insertRowsOfOneDeviceNode.splitByPartition(analysis);
+    insertRowsOfOneDeviceNodeList = 
insertRowsOfOneDeviceNode.splitByTreePartition(analysis);
 
     Assert.assertEquals(5, insertRowsOfOneDeviceNodeList.size());
     for (WritePlanNode insertNode : insertRowsOfOneDeviceNodeList) {

Reply via email to