This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 960952e0a7d [IOTDB-6201] Fix possible query timeout error in 
TimeJoinNode and possible query cycle in last query.
960952e0a7d is described below

commit 960952e0a7d1eaf8d589fcc394bbc997ce0f12ba
Author: Beyyes <[email protected]>
AuthorDate: Fri Oct 20 09:29:51 2023 +0800

    [IOTDB-6201] Fix possible query timeout error in TimeJoinNode and possible 
query cycle in last query.
---
 .../db/queryengine/common/MPPQueryContext.java     |  13 +
 .../distribution/DistributionPlanContext.java      |   6 +-
 .../planner/distribution/ExchangeNodeAdder.java    | 103 ++++---
 .../planner/distribution/NodeGroupContext.java     |   8 +-
 .../plan/planner/distribution/SourceRewriter.java  |  96 ++++---
 .../distribution/DistributionPlannerCycleTest.java |  81 ++++++
 .../queryengine/plan/plan/distribution/Util2.java  | 309 +++++++++++++++++++++
 7 files changed, 525 insertions(+), 91 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index 863b7acdfe0..9791b1d4423 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.queryengine.common;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
 import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
 
@@ -42,6 +43,10 @@ public class MPPQueryContext {
   private TEndPoint localInternalEndpoint;
   private ResultNodeContext resultNodeContext;
 
+  // Main FragmentInstance; the other FragmentInstances should push their data
+  // results to this FragmentInstance
+  private TRegionReplicaSet mainFragmentLocatedRegion;
+
   // When some DataNode cannot be connected, its endPoint will be put
   // in this list. And the following retry will avoid planning fragment
   // onto this node.
@@ -129,6 +134,14 @@ public class MPPQueryContext {
     return endPointBlackList;
   }
 
+  public TRegionReplicaSet getMainFragmentLocatedRegion() {
+    return this.mainFragmentLocatedRegion;
+  }
+
+  public void setMainFragmentLocatedRegion(TRegionReplicaSet region) {
+    this.mainFragmentLocatedRegion = region;
+  }
+
   public TypeProvider getTypeProvider() {
     return typeProvider;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
index ce6b25c26dc..7c1e963d526 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
@@ -52,7 +52,11 @@ public class DistributionPlanContext {
     return this;
   }
 
-  protected void setForceAddParent() {
+  public boolean isForceAddParent() {
+    return this.forceAddParent;
+  }
+
+  public void setForceAddParent() {
     this.forceAddParent = true;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
index 6b81b6ba0db..d22958a5a1f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
@@ -331,47 +331,7 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
     // optimize `order by time limit N align by device` query,
     // to ensure that the number of ExchangeNode equals to DataRegionNum but 
not equals to DeviceNum
     if (node instanceof TopKNode) {
-      TopKNode rootNode = (TopKNode) node;
-      Map<TRegionReplicaSet, TopKNode> regionTopKNodeMap = new HashMap<>();
-      for (PlanNode child : visitedChildren) {
-        if (child instanceof SingleDeviceViewNode) {
-          ((SingleDeviceViewNode) child).setCacheOutputColumnNames(true);
-        }
-        TRegionReplicaSet region = 
context.getNodeDistribution(child.getPlanNodeId()).region;
-        regionTopKNodeMap
-            .computeIfAbsent(
-                region,
-                k -> {
-                  TopKNode childTopKNode =
-                      new TopKNode(
-                          context.queryContext.getQueryId().genPlanNodeId(),
-                          rootNode.getTopValue(),
-                          rootNode.getMergeOrderParameter(),
-                          rootNode.getOutputColumnNames());
-                  context.putNodeDistribution(
-                      childTopKNode.getPlanNodeId(),
-                      new 
NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN, region));
-                  return childTopKNode;
-                })
-            .addChild(child);
-      }
-
-      for (Map.Entry<TRegionReplicaSet, TopKNode> entry : 
regionTopKNodeMap.entrySet()) {
-        TRegionReplicaSet topKNodeLocatedRegion = entry.getKey();
-        TopKNode topKNode = entry.getValue();
-
-        if (!dataRegion.equals(topKNodeLocatedRegion)) {
-          ExchangeNode exchangeNode =
-              new 
ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
-          exchangeNode.setChild(topKNode);
-          exchangeNode.setOutputColumnNames(topKNode.getOutputColumnNames());
-          context.hasExchangeNode = true;
-          newNode.addChild(exchangeNode);
-        } else {
-          newNode.addChild(topKNode);
-        }
-      }
-      return newNode;
+      return processTopNode(node, visitedChildren, context, newNode, 
dataRegion);
     }
 
     // Otherwise, we need to add ExchangeNode for the child whose DataRegion 
is different from the
@@ -412,6 +372,55 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
     return newNode;
   }
 
+  private PlanNode processTopNode(
+      MultiChildProcessNode node,
+      List<PlanNode> visitedChildren,
+      NodeGroupContext context,
+      MultiChildProcessNode newNode,
+      TRegionReplicaSet dataRegion) {
+    TopKNode rootNode = (TopKNode) node;
+    Map<TRegionReplicaSet, TopKNode> regionTopKNodeMap = new HashMap<>();
+    for (PlanNode child : visitedChildren) {
+      if (child instanceof SingleDeviceViewNode) {
+        ((SingleDeviceViewNode) child).setCacheOutputColumnNames(true);
+      }
+      TRegionReplicaSet region = 
context.getNodeDistribution(child.getPlanNodeId()).region;
+      regionTopKNodeMap
+          .computeIfAbsent(
+              region,
+              k -> {
+                TopKNode childTopKNode =
+                    new TopKNode(
+                        context.queryContext.getQueryId().genPlanNodeId(),
+                        rootNode.getTopValue(),
+                        rootNode.getMergeOrderParameter(),
+                        rootNode.getOutputColumnNames());
+                context.putNodeDistribution(
+                    childTopKNode.getPlanNodeId(),
+                    new 
NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN, region));
+                return childTopKNode;
+              })
+          .addChild(child);
+    }
+
+    for (Map.Entry<TRegionReplicaSet, TopKNode> entry : 
regionTopKNodeMap.entrySet()) {
+      TRegionReplicaSet topKNodeLocatedRegion = entry.getKey();
+      TopKNode topKNode = entry.getValue();
+
+      if (!dataRegion.equals(topKNodeLocatedRegion)) {
+        ExchangeNode exchangeNode =
+            new 
ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
+        exchangeNode.setChild(topKNode);
+        exchangeNode.setOutputColumnNames(topKNode.getOutputColumnNames());
+        context.hasExchangeNode = true;
+        newNode.addChild(exchangeNode);
+      } else {
+        newNode.addChild(topKNode);
+      }
+    }
+    return newNode;
+  }
+
   @Override
   public PlanNode visitSlidingWindowAggregation(
       SlidingWindowAggregationNode node, NodeGroupContext context) {
@@ -455,22 +464,28 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
 
     // Step 2: return the RegionReplicaSet with max node count
     long maxCount = -1;
-    TRegionReplicaSet result = null;
+    TRegionReplicaSet result = DataPartition.NOT_ASSIGNED;
     for (Map.Entry<TRegionReplicaSet, Long> entry : groupByRegion.entrySet()) {
       TRegionReplicaSet region = entry.getKey();
-      long planNodeCount = entry.getValue();
       if (DataPartition.NOT_ASSIGNED.equals(region)) {
         continue;
       }
+      if (region.equals(context.queryContext.getMainFragmentLocatedRegion())) {
+        return context.queryContext.getMainFragmentLocatedRegion();
+      }
       if (region.equals(context.getMostlyUsedDataRegion())) {
         return region;
       }
+      long planNodeCount = entry.getValue();
       if (planNodeCount > maxCount) {
         maxCount = planNodeCount;
         result = region;
+      } else if (planNodeCount == maxCount
+          && region.getRegionId().getId() < result.getRegionId().getId()) {
+        result = region;
       }
     }
-    return result == null ? context.getMostlyUsedDataRegion() : result;
+    return result;
   }
 
   private TRegionReplicaSet calculateSchemaRegionByChildren(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java
index 9ea36e7994c..2cdd071c931 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java
@@ -54,12 +54,12 @@ public class NodeGroupContext {
   }
 
   private TRegionReplicaSet getMostlyUsedDataRegion(PlanNode root) {
-    Map<TRegionReplicaSet, Long> regionCount = new HashMap<>();
-    countRegionOfSourceNodes(root, regionCount);
-    if (regionCount.isEmpty()) {
+    Map<TRegionReplicaSet, Long> regionCountMap = new HashMap<>();
+    countRegionOfSourceNodes(root, regionCountMap);
+    if (regionCountMap.isEmpty()) {
       return DataPartition.NOT_ASSIGNED;
     }
-    return Collections.max(regionCount.entrySet(), 
Map.Entry.comparingByValue()).getKey();
+    return Collections.max(regionCountMap.entrySet(), 
Map.Entry.comparingByValue()).getKey();
   }
 
   private void countRegionOfSourceNodes(PlanNode root, Map<TRegionReplicaSet, 
Long> result) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index f19d036822f..02e160864dd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -622,7 +622,7 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
     // For last query, we need to keep every FI's root node is 
LastQueryMergeNode. So we
     // force every region group have a parent node even if there is only 1 
child for it.
     context.setForceAddParent();
-    PlanNode root = processRawMultiChildNode(node, context, true);
+    PlanNode root = processRawMultiChildNode(node, context, false);
     if (context.queryMultiRegion) {
       PlanNode newRoot = genLastQueryRootNode(node, context);
       // add sort op for each if we add LastQueryMergeNode as root
@@ -694,53 +694,23 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
     if (containsAggregationSource(node)) {
       return planAggregationWithTimeJoin(node, context);
     }
-    return Collections.singletonList(processRawMultiChildNode(node, context, 
false));
+    return Collections.singletonList(processRawMultiChildNode(node, context, 
true));
   }
 
+  // Only `visitTimeJoin` and `visitLastQuery` invoke this method
   private PlanNode processRawMultiChildNode(
-      MultiChildProcessNode node, DistributionPlanContext context, boolean 
isLast) {
+      MultiChildProcessNode node, DistributionPlanContext context, boolean 
isTimeJoin) {
     MultiChildProcessNode root = (MultiChildProcessNode) node.clone();
-    // Step 1: Get all source nodes. For the node which is not source, add it 
as the child of
-    // current TimeJoinNode
-    List<SourceNode> sources = new ArrayList<>();
-    for (PlanNode child : node.getChildren()) {
-      if (child instanceof SeriesSourceNode) {
-        // If the child is SeriesScanNode, we need to check whether this node 
should be seperated
-        // into several splits.
-        SeriesSourceNode sourceNode = (SeriesSourceNode) child;
-        List<TRegionReplicaSet> dataDistribution =
-            analysis.getPartitionInfo(
-                sourceNode.getPartitionPath(), 
sourceNode.getPartitionTimeFilter());
-        if (dataDistribution.size() > 1) {
-          // We mark this variable to `true` if there is some series which is 
distributed in multi
-          // DataRegions
-          context.setOneSeriesInMultiRegion(true);
-        }
-        // If the size of dataDistribution is m, this SeriesScanNode should be 
seperated into m
-        // SeriesScanNode.
-        for (TRegionReplicaSet dataRegion : dataDistribution) {
-          SeriesSourceNode split = (SeriesSourceNode) sourceNode.clone();
-          
split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
-          split.setRegionReplicaSet(dataRegion);
-          sources.add(split);
-        }
-      }
-    }
-
-    // Step 2: For the source nodes, group them by the DataRegion.
-    Map<TRegionReplicaSet, List<SourceNode>> sourceGroup =
-        
sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
-    if (sourceGroup.size() > 1) {
-      context.setQueryMultiRegion(true);
-    }
+    Map<TRegionReplicaSet, List<SourceNode>> sourceGroup = 
groupBySourceNodes(node, context);
 
-    // Step 3: For the source nodes which belong to same data region, add a 
TimeJoinNode for them
-    // and make the
-    // new TimeJoinNode as the child of current TimeJoinNode
+    // For the source nodes which belong to same data region, add a 
TimeJoinNode for them
+    // and make the new TimeJoinNode as the child of current TimeJoinNode
     // TODO: (xingtanzjr) optimize the procedure here to remove duplicated 
TimeJoinNode
     boolean addParent = false;
-    for (List<SourceNode> seriesScanNodes : sourceGroup.values()) {
-      if (seriesScanNodes.size() == 1 && (!context.forceAddParent || !isLast)) 
{
+    for (Map.Entry<TRegionReplicaSet, List<SourceNode>> entry : 
sourceGroup.entrySet()) {
+      TRegionReplicaSet region = entry.getKey();
+      List<SourceNode> seriesScanNodes = entry.getValue();
+      if (seriesScanNodes.size() == 1 && (!context.isForceAddParent() || 
isTimeJoin)) {
         root.addChild(seriesScanNodes.get(0));
         continue;
       }
@@ -750,8 +720,12 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
       // At last, we can use the parameter `addParent` to judge whether to 
create new
       // MultiChildNode.
       boolean appendToRootDirectly =
-          sourceGroup.size() == 1 || (!addParent && !context.forceAddParent);
+          sourceGroup.size() == 1 || (!addParent && 
!context.isForceAddParent());
       if (appendToRootDirectly) {
+        // In non-last query, this code can be reached at most once
+        // And we set this region as the MainFragmentLocatedRegion; the other regions
+        // should transfer data to this region
+        context.queryContext.setMainFragmentLocatedRegion(region);
         seriesScanNodes.forEach(root::addChild);
         addParent = true;
       } else {
@@ -777,6 +751,44 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
     return root;
   }
 
+  private Map<TRegionReplicaSet, List<SourceNode>> groupBySourceNodes(
+      MultiChildProcessNode node, DistributionPlanContext context) {
+    // Step 1: Get all source nodes. For the node which is not source, add it 
as the child of
+    // current TimeJoinNode
+    List<SourceNode> sources = new ArrayList<>();
+    for (PlanNode child : node.getChildren()) {
+      if (child instanceof SeriesSourceNode) {
+        // If the child is SeriesScanNode, we need to check whether this node 
should be separated
+        // into several splits.
+        SeriesSourceNode sourceNode = (SeriesSourceNode) child;
+        List<TRegionReplicaSet> dataDistribution =
+            analysis.getPartitionInfo(
+                sourceNode.getPartitionPath(), 
sourceNode.getPartitionTimeFilter());
+        if (dataDistribution.size() > 1) {
+          // If there is some series which is distributed in multi DataRegions
+          context.setOneSeriesInMultiRegion(true);
+        }
+        // If the size of dataDistribution is N, this SeriesScanNode should be 
separated into N
+        // SeriesScanNode.
+        for (TRegionReplicaSet dataRegion : dataDistribution) {
+          SeriesSourceNode split = (SeriesSourceNode) sourceNode.clone();
+          
split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+          split.setRegionReplicaSet(dataRegion);
+          sources.add(split);
+        }
+      }
+    }
+
+    // Step 2: For the source nodes, group them by the DataRegion.
+    Map<TRegionReplicaSet, List<SourceNode>> sourceGroup =
+        
sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
+    if (sourceGroup.size() > 1) {
+      context.setQueryMultiRegion(true);
+    }
+
+    return sourceGroup;
+  }
+
   private boolean containsAggregationSource(TimeJoinNode node) {
     for (PlanNode child : node.getChildren()) {
       if (child instanceof SeriesAggregationScanNode
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/DistributionPlannerCycleTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/DistributionPlannerCycleTest.java
new file mode 100644
index 00000000000..8961c1e207f
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/DistributionPlannerCycleTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.plan.distribution;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import 
org.apache.iotdb.db.queryengine.plan.planner.distribution.DistributionPlanner;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class DistributionPlannerCycleTest {
+
+  // Query sql: `select * from root.sg.d1,root.sg.d2`
+  // root.sg.d1 has 2 SeriesScanNodes, root.sg.d2 has 3 SeriesScanNodes.
+  //
+  // 
------------------------------------------------------------------------------------------------
+  // Note: d1.s1[1] means a SeriesScanNode with target series d1.s1 and its 
data region is 1
+  //
+  //                                       IdentityNode
+  //                           ____________________|_____________
+  //                           |      |                          \
+  //                      d1.s1[1]   d1.s2[1]                  Exchange
+  //                                                             |
+  //                                                          TimeJoinNode
+  //                                                         /      \      \
+  //                                                    d2.s1[2]  d2.s2[2] 
d2.s3[2]
+  // 
------------------------------------------------------------------------------------------------
+  @Test
+  public void timeJoinNodeTest() {
+    QueryId queryId = new QueryId("test");
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new 
TEndPoint());
+
+    String sql = "select * from root.sg.d1,root.sg.d2";
+    Analysis analysis = Util2.analyze(sql, context);
+    PlanNode logicalPlanNode = Util2.genLogicalPlan(analysis, context);
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(2, plan.getInstances().size());
+    PlanNode firstNode =
+        
plan.getInstances().get(0).getFragment().getPlanNodeTree().getChildren().get(0);
+    PlanNode secondNode =
+        
plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0);
+    assertEquals(3, firstNode.getChildren().size());
+    assertTrue(firstNode.getChildren().get(0) instanceof SeriesScanNode);
+    assertTrue(firstNode.getChildren().get(1) instanceof SeriesScanNode);
+    assertTrue(firstNode.getChildren().get(2) instanceof ExchangeNode);
+    assertEquals(3, secondNode.getChildren().size());
+    assertTrue(secondNode.getChildren().get(0) instanceof SeriesScanNode);
+    assertTrue(secondNode.getChildren().get(1) instanceof SeriesScanNode);
+    assertTrue(secondNode.getChildren().get(2) instanceof SeriesScanNode);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/Util2.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/Util2.java
new file mode 100644
index 00000000000..5124c61c28b
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/Util2.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.plan.distribution;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
+import org.apache.iotdb.commons.partition.SchemaPartition;
+import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree;
+import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.queryengine.common.schematree.node.SchemaEntityNode;
+import 
org.apache.iotdb.db.queryengine.common.schematree.node.SchemaInternalNode;
+import 
org.apache.iotdb.db.queryengine.common.schematree.node.SchemaMeasurementNode;
+import org.apache.iotdb.db.queryengine.common.schematree.node.SchemaNode;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analyzer;
+import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaComputationWithAutoCreation;
+import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
+import org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanner;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.schemaengine.template.Template;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.mockito.Mockito;
+
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Util2 {
+  public static final Analysis ANALYSIS = constructAnalysis();
+
+  public static Analysis constructAnalysis() {
+    TRegionReplicaSet dataRegion1 =
+        new TRegionReplicaSet(
+            new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
+            Collections.singletonList(genDataNodeLocation(11, "192.0.1.1")));
+    List<TRegionReplicaSet> d1DataRegions = new ArrayList<>();
+    d1DataRegions.add(dataRegion1);
+    Map<TTimePartitionSlot, List<TRegionReplicaSet>> d1DataRegionMap = new 
HashMap<>();
+    d1DataRegionMap.put(new TTimePartitionSlot(), d1DataRegions);
+
+    TRegionReplicaSet dataRegion2 =
+        new TRegionReplicaSet(
+            new TConsensusGroupId(TConsensusGroupType.DataRegion, 2),
+            Collections.singletonList(genDataNodeLocation(21, "192.0.1.1")));
+    List<TRegionReplicaSet> d2DataRegions = new ArrayList<>();
+    d2DataRegions.add(dataRegion2);
+    Map<TTimePartitionSlot, List<TRegionReplicaSet>> d2DataRegionMap = new 
HashMap<>();
+    d2DataRegionMap.put(new TTimePartitionSlot(), d2DataRegions);
+
+    DataPartition dataPartition =
+        new DataPartition(
+            
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+            
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+    Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TRegionReplicaSet>>>>
+        dataPartitionMap = new HashMap<>();
+    Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TRegionReplicaSet>>> sgPartitionMap =
+        new HashMap<>();
+    String device1 = "root.sg.d1";
+    String device2 = "root.sg.d2";
+    String device3 = "root.sg.d3";
+    SeriesPartitionExecutor executor =
+        SeriesPartitionExecutor.getSeriesPartitionExecutor(
+            
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+            
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+    sgPartitionMap.put(executor.getSeriesPartitionSlot(device1), 
d1DataRegionMap);
+    sgPartitionMap.put(executor.getSeriesPartitionSlot(device2), 
d2DataRegionMap);
+    dataPartitionMap.put("root.sg", sgPartitionMap);
+    dataPartition.setDataPartitionMap(dataPartitionMap);
+
+    Analysis analysis = new Analysis();
+    analysis.setDataPartitionInfo(dataPartition);
+
+    // construct schema partition
+    TRegionReplicaSet schemaRegion1 =
+        new TRegionReplicaSet(
+            new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 11),
+            Collections.singletonList(genDataNodeLocation(11, "192.0.1.1")));
+
+    TRegionReplicaSet schemaRegion2 =
+        new TRegionReplicaSet(
+            new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 21),
+            Collections.singletonList(genDataNodeLocation(21, "192.0.1.1")));
+
+    Map<TSeriesPartitionSlot, TRegionReplicaSet> schemaRegionMap = new 
HashMap<>();
+    schemaRegionMap.put(executor.getSeriesPartitionSlot(device1), 
schemaRegion1);
+    schemaRegionMap.put(executor.getSeriesPartitionSlot(device2), 
schemaRegion2);
+    schemaRegionMap.put(executor.getSeriesPartitionSlot(device3), 
schemaRegion2);
+    Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> 
schemaPartitionMap = new HashMap<>();
+    schemaPartitionMap.put("root.sg", schemaRegionMap);
+    SchemaPartition schemaPartition =
+        new SchemaPartition(
+            
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+            
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+    schemaPartition.setSchemaPartitionMap(schemaPartitionMap);
+
+    analysis.setDataPartitionInfo(dataPartition);
+    analysis.setSchemaPartitionInfo(schemaPartition);
+    analysis.setSchemaTree(genSchemaTree());
+    // to avoid some special case which is not the point of test
+    analysis.setStatement(Mockito.mock(QueryStatement.class));
+    Mockito.when(analysis.getStatement().isQuery()).thenReturn(false);
+    return analysis;
+  }
+
+  private static ISchemaTree genSchemaTree() {
+    SchemaNode root = new SchemaInternalNode("root");
+
+    SchemaNode sg = new SchemaInternalNode("sg");
+    root.addChild("sg", sg);
+
+    SchemaEntityNode d1 = new SchemaEntityNode("d1");
+    SchemaMeasurementNode s1 =
+        new SchemaMeasurementNode("s1", new MeasurementSchema("s1", 
TSDataType.INT32));
+    SchemaMeasurementNode s2 =
+        new SchemaMeasurementNode("s2", new MeasurementSchema("s2", 
TSDataType.DOUBLE));
+    sg.addChild("d1", d1);
+    d1.addChild("s1", s1);
+    d1.addChild("s2", s2);
+
+    SchemaEntityNode d2 = new SchemaEntityNode("d2");
+    SchemaMeasurementNode t1 =
+        new SchemaMeasurementNode("s1", new MeasurementSchema("t1", 
TSDataType.INT32));
+    SchemaMeasurementNode t2 =
+        new SchemaMeasurementNode("s2", new MeasurementSchema("t2", 
TSDataType.DOUBLE));
+    SchemaMeasurementNode t3 =
+        new SchemaMeasurementNode("s3", new MeasurementSchema("t3", 
TSDataType.DOUBLE));
+    sg.addChild("d2", d2);
+    d2.addChild("s1", t1);
+    d2.addChild("s2", t2);
+    d2.addChild("s3", t3);
+
+    ClusterSchemaTree tree = new ClusterSchemaTree(root);
+    tree.setDatabases(Collections.singleton("root.sg"));
+
+    return tree;
+  }
+
+  public static Analysis analyze(String sql, MPPQueryContext context) {
+    Statement statement = StatementGenerator.createStatement(sql, 
ZonedDateTime.now().getOffset());
+    Analyzer analyzer = new Analyzer(context, getFakePartitionFetcher(), 
getFakeSchemaFetcher());
+    return analyzer.analyze(statement);
+  }
+
+  public static PlanNode genLogicalPlan(Analysis analysis, MPPQueryContext 
context) {
+    LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
+    return planner.plan(analysis).getRootNode();
+  }
+
  /**
   * Returns an {@link ISchemaFetcher} stub for distribution-planner tests: every schema lookup is
   * answered from the pre-built {@code ANALYSIS} fixture (defined earlier in this class), so no
   * real schema region is contacted. Template-related queries report "no template"
   * ({@code null}), and the auto-create variants are deliberate no-ops.
   */
  private static ISchemaFetcher getFakeSchemaFetcher() {
    return new ISchemaFetcher() {
      @Override
      public ISchemaTree fetchSchema(PathPatternTree patternTree, MPPQueryContext context) {
        // The pattern is ignored; the fixture's schema tree is served unconditionally.
        return ANALYSIS.getSchemaTree();
      }

      @Override
      public ISchemaTree fetchSchemaWithTags(PathPatternTree patternTree, MPPQueryContext context) {
        return ANALYSIS.getSchemaTree();
      }

      @Override
      public void fetchAndComputeSchemaWithAutoCreate(
          ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation,
          MPPQueryContext context) {}

      @Override
      public void fetchAndComputeSchemaWithAutoCreate(
          List<? extends ISchemaComputationWithAutoCreation> schemaComputationWithAutoCreationList,
          MPPQueryContext context) {}

      @Override
      public ISchemaTree fetchSchemaListWithAutoCreate(
          List<PartialPath> devicePath,
          List<String[]> measurements,
          List<TSDataType[]> tsDataTypes,
          List<TSEncoding[]> encodings,
          List<CompressionType[]> compressionTypes,
          List<Boolean> aligned,
          MPPQueryContext context) {
        // Nothing is created; the fixture tree stands in for whatever would be auto-created.
        return ANALYSIS.getSchemaTree();
      }

      @Override
      public Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath devicePath) {
        // Tests here do not exercise schema templates.
        return null;
      }

      @Override
      public Pair<Template, PartialPath> checkTemplateSetAndPreSetInfo(
          PartialPath timeSeriesPath, String alias) {
        return null;
      }

      @Override
      public Map<Integer, Template> checkAllRelatedTemplate(PathPatternTree pathPattern) {
        return null;
      }

      @Override
      public Pair<Template, List<PartialPath>> getAllPathsSetTemplate(String templateName) {
        return null;
      }
    };
  }
+
  /**
   * Returns an {@link IPartitionFetcher} stub for distribution-planner tests: schema- and
   * data-partition requests are answered from the pre-built {@code ANALYSIS} fixture (defined
   * earlier in this class) rather than from the config node. Node-management lookups return
   * {@code null} and cache operations are no-ops, since the tests never reach them.
   */
  private static IPartitionFetcher getFakePartitionFetcher() {
    return new IPartitionFetcher() {
      @Override
      public SchemaPartition getSchemaPartition(PathPatternTree patternTree) {
        // The pattern is ignored; the fixture's partition info is served unconditionally.
        return ANALYSIS.getSchemaPartitionInfo();
      }

      @Override
      public SchemaPartition getOrCreateSchemaPartition(
          PathPatternTree patternTree, String userName) {
        return ANALYSIS.getSchemaPartitionInfo();
      }

      @Override
      public DataPartition getDataPartition(
          Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
        return ANALYSIS.getDataPartitionInfo();
      }

      @Override
      public DataPartition getDataPartitionWithUnclosedTimeRange(
          Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
        return ANALYSIS.getDataPartitionInfo();
      }

      @Override
      public DataPartition getOrCreateDataPartition(
          Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
        return ANALYSIS.getDataPartitionInfo();
      }

      @Override
      public DataPartition getOrCreateDataPartition(
          List<DataPartitionQueryParam> dataPartitionQueryParams, String userName) {
        return ANALYSIS.getDataPartitionInfo();
      }

      @Override
      public SchemaNodeManagementPartition getSchemaNodeManagementPartitionWithLevel(
          PathPatternTree patternTree, PathPatternTree scope, Integer level) {
        return null;
      }

      @Override
      public boolean updateRegionCache(TRegionRouteReq req) {
        // Report "not updated"; there is no cache behind this stub.
        return false;
      }

      @Override
      public void invalidAllCache() {}
    };
  }
+
+  private static TDataNodeLocation genDataNodeLocation(int dataNodeId, String 
ip) {
+    return new TDataNodeLocation()
+        .setDataNodeId(dataNodeId)
+        .setClientRpcEndPoint(new TEndPoint(ip, 9000))
+        .setMPPDataExchangeEndPoint(new TEndPoint(ip, 9001))
+        .setInternalEndPoint(new TEndPoint(ip, 9002));
+  }
+}


Reply via email to