This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/TableModelIngestion by this 
push:
     new e6ce64880b9 add distributed plan test
e6ce64880b9 is described below

commit e6ce64880b90fb030a373cfb1d800c1fc264b550
Author: DESKTOP-L0L5GPJ\jt <[email protected]>
AuthorDate: Fri Jun 28 11:58:43 2024 +0800

    add distributed plan test
---
 .../db/queryengine/plan/analyze/Analysis.java      | 12 +++
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |  3 +
 .../db/queryengine/plan/analyze/Analyzer.java      |  1 +
 .../db/queryengine/plan/analyze/IAnalysis.java     |  4 +
 .../config/executor/ClusterConfigTaskExecutor.java |  1 +
 .../distribution/WriteFragmentParallelPlanner.java |  2 +-
 .../planner/plan/node/write/InsertTabletNode.java  | 85 ++++++++++++++++------
 .../plan/relational/analyzer/Analysis.java         | 20 ++++-
 .../plan/relational/analyzer/Analyzer.java         |  1 +
 .../relational/analyzer/ExpressionAnalyzer.java    |  1 +
 .../plan/relational/planner/LogicalPlanner.java    |  4 +
 .../plan/relational/analyzer/AnalyzerTest.java     | 38 ++++++----
 .../plan/statement/StatementTestUtils.java         |  4 +-
 .../iotdb/commons/partition/DataPartition.java     |  5 +-
 14 files changed, 135 insertions(+), 46 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
index e7fdd26fde8..294caa6b99f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
@@ -82,6 +82,8 @@ public class Analysis implements IAnalysis {
   // Common Analysis
   
/////////////////////////////////////////////////////////////////////////////////////////////////
 
+  private String databaseName;
+
   // Statement
   private Statement statement;
 
@@ -988,4 +990,14 @@ public class Analysis implements IAnalysis {
   public boolean fromWhere(FilterNode filterNode) {
     return fromWhereFilterNodes.contains(NodeRef.of(filterNode));
   }
+
+  @Override
+  public void setDatabaseName(String database) {
+    this.databaseName = database;
+  }
+
+  @Override
+  public String getDatabaseName() {
+    return databaseName;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 365020f52a7..e0e68b99368 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -247,6 +247,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     Analysis analysis = visitQuery(explainStatement.getQueryStatement(), 
context);
     analysis.setRealStatement(explainStatement);
     analysis.setFinishQueryAfterAnalyze(true);
+    analysis.setDatabaseName(context.getSession().getDatabaseName().get());
     return analysis;
   }
 
@@ -261,6 +262,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
             Collections.singletonList(
                 new ColumnHeader(ColumnHeaderConstant.EXPLAIN_ANALYZE, 
TSDataType.TEXT, null)),
             true));
+    analysis.setDatabaseName(context.getSession().getDatabaseName().get());
     return analysis;
   }
 
@@ -2790,6 +2792,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
   public Analysis visitPipeEnrichedStatement(
       PipeEnrichedStatement pipeEnrichedStatement, MPPQueryContext context) {
     Analysis analysis = pipeEnrichedStatement.getInnerStatement().accept(this, 
context);
+    analysis.setDatabaseName(context.getSession().getDatabaseName().get());
 
     // statement may be changed because of logical view
     pipeEnrichedStatement.setInnerStatement(analysis.getTreeStatement());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java
index f7ca04f7d05..ed3b686b977 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java
@@ -45,6 +45,7 @@ public class Analyzer {
     long startTime = System.nanoTime();
     Analysis analysis =
         new AnalyzeVisitor(partitionFetcher, schemaFetcher).process(statement, 
context);
+    analysis.setDatabaseName(context.getSession().getDatabaseName().get());
 
     if (statement.isQuery()) {
       QueryPlanCostMetricSet.getInstance().recordPlanCost(ANALYZER, 
System.nanoTime() - startTime);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
index ce9f043c9bb..05c3638a95d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
@@ -71,4 +71,8 @@ public interface IAnalysis {
   void addEndPointToRedirectNodeList(TEndPoint endPoint);
 
   TimePredicate getCovertedTimePredicate();
+
+  void setDatabaseName(String databaseName);
+
+  String getDatabaseName();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index ef900fddafe..aaa3eb01bd8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -2272,6 +2272,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
     
createLogicalViewStatement.setQueryStatement(alterLogicalViewStatement.getQueryStatement());
 
     final Analysis analysis = Analyzer.analyze(createLogicalViewStatement, 
context);
+    analysis.setDatabaseName(context.getSession().getDatabaseName().get());
     if (analysis.isFailed()) {
       future.setException(
           new IoTDBException(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
index 975e0c1736e..3f44175f404 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
@@ -64,7 +64,7 @@ public class WriteFragmentParallelPlanner implements 
IFragmentParallelPlaner {
     PlanFragment fragment = subPlan.getPlanFragment();
     PlanNode node = fragment.getPlanNodeTree();
     if (!(node instanceof WritePlanNode)) {
-      throw new IllegalArgumentException("PlanNode should be IWritePlanNode in 
WRITE operation");
+      throw new IllegalArgumentException("PlanNode should be IWritePlanNode in 
WRITE operation:" + node.getClass());
     }
     List<WritePlanNode> splits = nodeSplitter.apply(((WritePlanNode) node), 
analysis);
     List<FragmentInstance> ret = new ArrayList<>();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index 87d3bac372e..46289ce6d0d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
@@ -176,7 +177,8 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
   }
 
   @Override
-  public void addChild(PlanNode child) {}
+  public void addChild(PlanNode child) {
+  }
 
   @Override
   public PlanNodeType getType() {
@@ -258,7 +260,8 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
       final List<TRegionReplicaSet> replicaSets =
           analysis
               .getDataPartitionInfo()
-              .getDataRegionReplicaSetForWriting(deviceID, 
splitInfo.timePartitionSlots);
+              .getDataRegionReplicaSetForWriting(deviceID, 
splitInfo.timePartitionSlots,
+                  analysis.getDatabaseName());
       splitInfo.replicaSets = replicaSets;
       // collect redirectInfo
       analysis.addEndPointToRedirectNodeList(
@@ -374,9 +377,11 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
         switch (dataTypes[i]) {
           case TEXT:
           case BLOB:
-          case STRING:
             values[i] = new Binary[rowSize];
             break;
+          case STRING:
+            values[i] = new String[rowSize];
+            break;
           case FLOAT:
             values[i] = new float[rowSize];
             break;
@@ -454,7 +459,9 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
     ReadWriteIOUtils.write((byte) (isAligned ? 1 : 0), stream);
   }
 
-  /** Serialize measurements or measurement schemas, ignoring failed time 
series */
+  /**
+   * Serialize measurements or measurement schemas, ignoring failed time series
+   */
   private void writeMeasurementsOrSchemas(ByteBuffer buffer) {
     ReadWriteIOUtils.write(measurements.length - getFailedMeasurementNumber(), 
buffer);
     ReadWriteIOUtils.write((byte) (measurementSchemas != null ? 1 : 0), 
buffer);
@@ -473,7 +480,9 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
     }
   }
 
-  /** Serialize measurements or measurement schemas, ignoring failed time 
series */
+  /**
+   * Serialize measurements or measurement schemas, ignoring failed time series
+   */
   private void writeMeasurementsOrSchemas(DataOutputStream stream) throws 
IOException {
     ReadWriteIOUtils.write(measurements.length - getFailedMeasurementNumber(), 
stream);
     ReadWriteIOUtils.write((byte) (measurementSchemas != null ? 1 : 0), 
stream);
@@ -492,7 +501,9 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
     }
   }
 
-  /** Serialize data types, ignoring failed time series */
+  /**
+   * Serialize data types, ignoring failed time series
+   */
   private void writeDataTypes(ByteBuffer buffer) {
     for (int i = 0; i < dataTypes.length; i++) {
       // ignore failed partial insert
@@ -503,7 +514,9 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
     }
   }
 
-  /** Serialize data types, ignoring failed time series */
+  /**
+   * Serialize data types, ignoring failed time series
+   */
   private void writeDataTypes(DataOutputStream stream) throws IOException {
     for (int i = 0; i < dataTypes.length; i++) {
       // ignore failed partial insert
@@ -528,7 +541,9 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
     }
   }
 
-  /** Serialize bitmaps, ignoring failed time series */
+  /**
+   * Serialize bitmaps, ignoring failed time series
+   */
   private void writeBitMaps(ByteBuffer buffer) {
     ReadWriteIOUtils.write(BytesUtils.boolToByte(bitMaps != null), buffer);
     if (bitMaps != null) {
@@ -548,7 +563,9 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
     }
   }
 
-  /** Serialize bitmaps, ignoring failed time series */
+  /**
+   * Serialize bitmaps, ignoring failed time series
+   */
   private void writeBitMaps(DataOutputStream stream) throws IOException {
     ReadWriteIOUtils.write(BytesUtils.boolToByte(bitMaps != null), stream);
     if (bitMaps != null) {
@@ -568,7 +585,9 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
     }
   }
 
-  /** Serialize values, ignoring failed time series */
+  /**
+   * Serialize values, ignoring failed time series
+   */
   private void writeValues(ByteBuffer buffer) {
     for (int i = 0; i < columns.length; i++) {
       // ignore failed partial insert
@@ -579,7 +598,9 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
     }
   }
 
-  /** Serialize values, ignoring failed time series */
+  /**
+   * Serialize values, ignoring failed time series
+   */
   private void writeValues(DataOutputStream stream) throws IOException {
     for (int i = 0; i < columns.length; i++) {
       // ignore failed partial insert
@@ -737,13 +758,18 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
   }
 
   // region serialize & deserialize methods for WAL
-  /** Serialized size for wal */
+
+  /**
+   * Serialized size for wal
+   */
   @Override
   public int serializedSize() {
     return serializedSize(0, rowCount);
   }
 
-  /** Serialized size for wal */
+  /**
+   * Serialized size for wal
+   */
   public int serializedSize(int start, int end) {
     return Short.BYTES + subSerializeSize(start, end);
   }
@@ -846,7 +872,9 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
     buffer.put((byte) (isAligned ? 1 : 0));
   }
 
-  /** Serialize measurement schemas, ignoring failed time series */
+  /**
+   * Serialize measurement schemas, ignoring failed time series
+   */
   private void writeMeasurementSchemas(IWALByteBufferView buffer) {
     buffer.putInt(measurements.length - getFailedMeasurementNumber());
     serializeMeasurementSchemasToWAL(buffer);
@@ -859,7 +887,9 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
     }
   }
 
-  /** Serialize bitmaps, ignoring failed time series */
+  /**
+   * Serialize bitmaps, ignoring failed time series
+   */
   private void writeBitMaps(IWALByteBufferView buffer, int start, int end) {
     buffer.put(BytesUtils.boolToByte(bitMaps != null));
     if (bitMaps != null) {
@@ -882,7 +912,9 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
     }
   }
 
-  /** Serialize values, ignoring failed time series */
+  /**
+   * Serialize values, ignoring failed time series
+   */
   private void writeValues(IWALByteBufferView buffer, int start, int end) {
     for (int i = 0; i < columns.length; i++) {
       // ignore failed partial insert
@@ -942,7 +974,9 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
     }
   }
 
-  /** Deserialize from wal */
+  /**
+   * Deserialize from wal
+   */
   public static InsertTabletNode deserializeFromWAL(DataInputStream stream) 
throws IOException {
     // we do not store plan node id in wal entry
     InsertTabletNode insertNode = new InsertTabletNode(new PlanNodeId(""));
@@ -1034,9 +1068,15 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-    if (!super.equals(o)) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
     InsertTabletNode that = (InsertTabletNode) o;
     return rowCount == that.rowCount
         && Arrays.equals(times, that.times)
@@ -1170,6 +1210,7 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
   }
 
   private static class PartitionSplitInfo {
+
     // for each List in split, they are range1.start, range1.end, 
range2.start, range2.end, ...
     private List<Integer> ranges = new ArrayList<>();
     private List<TTimePartitionSlot> timePartitionSlots = new ArrayList<>();
@@ -1180,7 +1221,7 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
    * Split the tablet of the given range according to Table deviceID.
    *
    * @param start inclusive
-   * @param end exclusive
+   * @param end   exclusive
    * @return each the number in the pair is the end offset of a device
    */
   public List<Pair<IDeviceID, Integer>> splitByDevice(int start, int end) {
@@ -1188,7 +1229,7 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
   }
 
   /**
-   * @param results insertion result of each row
+   * @param results      insertion result of each row
    * @param rowTTLGetter the ttl associated with each row
    * @return the position of the first alive row
    * @throws OutOfTTLException if all rows have expired the TTL
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
index 525bd8251b0..6bb868ec0ea 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
@@ -91,6 +91,9 @@ import static java.util.Objects.requireNonNull;
 
 public class Analysis implements IAnalysis {
 
+  private String databaseName;
+  private List<TEndPoint> redirectNodeList;
+
   @Nullable private Statement root;
 
   private final Map<NodeRef<Parameter>, Expression> parameters;
@@ -687,12 +690,15 @@ public class Analysis implements IAnalysis {
 
   @Override
   public void setRedirectNodeList(List<TEndPoint> redirectNodeList) {
-    throw new UnsupportedOperationException();
+    this.redirectNodeList = redirectNodeList;
   }
 
   @Override
   public void addEndPointToRedirectNodeList(TEndPoint endPoint) {
-    throw new UnsupportedOperationException();
+    if (redirectNodeList == null) {
+      redirectNodeList = new ArrayList<>();
+    }
+    redirectNodeList.add(endPoint);
   }
 
   @Override
@@ -950,4 +956,14 @@ public class Analysis implements IAnalysis {
       return unmodifiableList(quantifiedComparisonSubqueries);
     }
   }
+
+  @Override
+  public void setDatabaseName(String databaseName) {
+    this.databaseName = databaseName;
+  }
+
+  @Override
+  public String getDatabaseName() {
+    return databaseName;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analyzer.java
index 91f625a7fe9..88f235902e0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analyzer.java
@@ -57,6 +57,7 @@ public class Analyzer {
 
   public Analysis analyze(Statement statement) {
     Analysis analysis = new Analysis(statement, parameterLookup);
+    analysis.setDatabaseName(session.getDatabaseName().get());
     StatementAnalyzer analyzer =
         statementAnalyzerFactory.createStatementAnalyzer(
             analysis, session, warningCollector, CorrelationSupport.ALLOWED);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ExpressionAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ExpressionAnalyzer.java
index f5969d4e40d..96df7aee5e9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ExpressionAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ExpressionAnalyzer.java
@@ -1328,6 +1328,7 @@ public class ExpressionAnalyzer {
       Map<NodeRef<Parameter>, Expression> parameters,
       WarningCollector warningCollector) {
     Analysis analysis = new Analysis(null, parameters);
+    analysis.setDatabaseName(session.getDatabaseName().get());
     ExpressionAnalyzer analyzer =
         new ExpressionAnalyzer(
             metadata,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
index 6d582fdcc9f..f1dcea2f310 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Field;
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.RelationType;
@@ -122,6 +123,9 @@ public class LogicalPlanner {
   }
 
   private PlanNode createOutputPlan(RelationPlan plan, Analysis analysis) {
+    if (plan.getRoot() instanceof WritePlanNode) {
+      return plan.getRoot();
+    }
     ImmutableList.Builder<Symbol> outputs = ImmutableList.builder();
     ImmutableList.Builder<String> names = ImmutableList.builder();
     List<ColumnHeader> columnHeaders = new ArrayList<>();
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
index 2a6f64e6bb6..3c6c642ca44 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
 import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
 import org.apache.iotdb.commons.partition.SchemaPartition;
+import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
 import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.db.protocol.session.IClientSession;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -537,6 +538,10 @@ public class AnalyzerTest {
       public DataPartition getOrCreateDataPartition(
           List<DataPartitionQueryParam> dataPartitionQueryParams, String 
userName) {
         int seriesSlotNum = 1000;
+        String partitionExecutorName = 
"org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor";
+        SeriesPartitionExecutor seriesPartitionExecutor = 
SeriesPartitionExecutor.getSeriesPartitionExecutor(
+            partitionExecutorName, seriesSlotNum);
+
         Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TRegionReplicaSet>>>>
             dataPartitionMap = new HashMap<>();
         assertEquals(3, dataPartitionQueryParams.size());
@@ -548,34 +553,31 @@ public class AnalyzerTest {
           String tableName = 
dataPartitionQueryParam.getDeviceID().getTableName();
           assertEquals(StatementTestUtils.tableName(), tableName);
 
-          int partitionSlot = Math.abs(tableName.hashCode()) % seriesSlotNum;
-          TSeriesPartitionSlot seriesPartitionSlot = new 
TSeriesPartitionSlot(partitionSlot);
+          TSeriesPartitionSlot partitionSlot = 
seriesPartitionExecutor.getSeriesPartitionSlot(
+              dataPartitionQueryParam.getDeviceID());
           for (TTimePartitionSlot tTimePartitionSlot : 
dataPartitionQueryParam.getTimePartitionSlotList()) {
             dataPartitionMap.computeIfAbsent(databaseName, d -> new 
HashMap<>())
-                .computeIfAbsent(seriesPartitionSlot, slot -> new HashMap<>())
+                .computeIfAbsent(partitionSlot, slot -> new HashMap<>())
                 .computeIfAbsent(tTimePartitionSlot, slot -> new ArrayList<>())
                 .add(new TRegionReplicaSet(new TConsensusGroupId(
-                    TConsensusGroupType.DataRegion, partitionSlot), 
Collections.singletonList(
-                    new TDataNodeLocation(partitionSlot, null, null, null, 
null, null))));
+                    TConsensusGroupType.DataRegion, partitionSlot.slotId), 
Collections.singletonList(
+                    new TDataNodeLocation(partitionSlot.slotId, null, null, 
null, null, null))));
           }
         }
-        return new DataPartition(dataPartitionMap, "dummy", seriesSlotNum);
+        return new DataPartition(dataPartitionMap, partitionExecutorName, 
seriesSlotNum);
       }
     };
 
     InsertTabletStatement insertTabletStatement = 
StatementTestUtils.genInsertTabletStatement(true);
     context = new MPPQueryContext("", queryId, sessionInfo, null, null);
     actualAnalysis = 
analyzeStatement(insertTabletStatement.toRelationalStatement(context),
-        mockMetadata, new SqlParser());
+        mockMetadata, new SqlParser(), sessionInfo);
     logicalQueryPlan =
         new LogicalPlanner(
             context, mockMetadata, sessionInfo, getFakePartitionFetcher(), 
WarningCollector.NOOP)
             .plan(actualAnalysis);
 
-    OutputNode node = (OutputNode) logicalQueryPlan.getRootNode();
-    assertEquals(1, node.getChildren().size());
-    RelationalInsertTabletNode insertTabletNode = (RelationalInsertTabletNode) 
node.getChildren()
-        .get(0);
+    RelationalInsertTabletNode insertTabletNode = (RelationalInsertTabletNode) 
logicalQueryPlan.getRootNode();
 
     assertEquals(insertTabletNode.getTableName(), 
StatementTestUtils.tableName());
     assertEquals(3, insertTabletNode.getRowCount());
@@ -586,20 +588,24 @@ public class AnalyzerTest {
     }
     assertEquals(columns, insertTabletNode.getColumns());
     assertArrayEquals(StatementTestUtils.genTimestamps(), 
insertTabletNode.getTimes());
+
+    distributionPlanner = new TableDistributionPlanner(actualAnalysis, 
logicalQueryPlan, context);
+    distributedQueryPlan = distributionPlanner.plan();
+    assertEquals(3, distributedQueryPlan.getInstances().size());
   }
 
   public static Analysis analyzeSQL(String sql, Metadata metadata) {
     SqlParser sqlParser = new SqlParser();
     Statement statement = sqlParser.createStatement(sql, 
ZoneId.systemDefault());
-    return analyzeStatement(statement, metadata, sqlParser);
+    SessionInfo session =
+        new SessionInfo(
+            0, "test", ZoneId.systemDefault(), "testdb", 
IClientSession.SqlDialect.TABLE);
+    return analyzeStatement(statement, metadata, sqlParser, session);
   }
 
   public static Analysis analyzeStatement(Statement statement, Metadata 
metadata,
-      SqlParser sqlParser) {
+      SqlParser sqlParser, SessionInfo session) {
     try {
-      SessionInfo session =
-          new SessionInfo(
-              0, "test", ZoneId.systemDefault(), "testdb", 
IClientSession.SqlDialect.TABLE);
       StatementAnalyzerFactory statementAnalyzerFactory =
           new StatementAnalyzerFactory(metadata, sqlParser, nopAccessControl);
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/StatementTestUtils.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/StatementTestUtils.java
index 12abb28878e..008144db4d9 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/StatementTestUtils.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/StatementTestUtils.java
@@ -47,7 +47,7 @@ public class StatementTestUtils {
   }
 
   public static TSDataType[] genDataTypes() {
-    return new TSDataType[]{TSDataType.TEXT, TSDataType.TEXT, 
TSDataType.DOUBLE};
+    return new TSDataType[]{TSDataType.STRING, TSDataType.STRING, 
TSDataType.DOUBLE};
   }
 
   public static TsTableColumnCategory[] genColumnCategories() {
@@ -78,7 +78,7 @@ public class StatementTestUtils {
     return new Object[]{
         new String[]{"a", "b", "c"},
         new String[]{"x", "y", "z"},
-        new Double[]{1.0, 2.0, 3.0}
+        new double[]{1.0, 2.0, 3.0}
     };
   }
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index 926dca39697..a84692aa8e5 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -155,17 +155,16 @@ public class DataPartition extends Partition {
   }
 
   public List<TRegionReplicaSet> getDataRegionReplicaSetForWriting(
-      IDeviceID deviceID, List<TTimePartitionSlot> timePartitionSlotList) {
+      IDeviceID deviceID, List<TTimePartitionSlot> timePartitionSlotList, 
String databaseName) {
     // A list of data region replica sets will store data in a same time 
partition.
     // We will insert data to the last set in the list.
     // TODO return the latest dataRegionReplicaSet for each time partition
-    String storageGroup = getStorageGroupByDevice(deviceID);
     TSeriesPartitionSlot seriesPartitionSlot = 
calculateDeviceGroupId(deviceID);
     // IMPORTANT TODO: (xingtanzjr) need to handle the situation for write 
operation that there are
     // more than 1 Regions for one timeSlot
     List<TRegionReplicaSet> dataRegionReplicaSets = new ArrayList<>();
     Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>
-        dataBasePartitionMap = dataPartitionMap.get(storageGroup);
+        dataBasePartitionMap = dataPartitionMap.get(databaseName);
     Map<TTimePartitionSlot, List<TRegionReplicaSet>> slotReplicaSetMap =
         dataBasePartitionMap.get(seriesPartitionSlot);
     for (TTimePartitionSlot timePartitionSlot : timePartitionSlotList) {

Reply via email to