This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/fix_query_cycle
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/beyyes/fix_query_cycle by this 
push:
     new 035ca6c6716 fix cycle error
035ca6c6716 is described below

commit 035ca6c6716e5d106617662a6b2b62d14f25fe8c
Author: Beyyes <[email protected]>
AuthorDate: Thu Oct 19 12:30:47 2023 +0800

    fix cycle error
---
 .../distribution/DistributionPlanContext.java      | 16 ++++
 .../planner/distribution/ExchangeNodeAdder.java    | 94 ++++++++++++----------
 .../plan/planner/distribution/SourceRewriter.java  | 13 ++-
 .../distribution/DistributionPlannerCycleTest.java | 62 +++++++-------
 .../queryengine/plan/plan/distribution/Util2.java  |  8 +-
 5 files changed, 113 insertions(+), 80 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
index bc88fef1f69..26116c012bb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
@@ -19,9 +19,12 @@
 
 package org.apache.iotdb.db.queryengine.plan.planner.distribution;
 
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 
+import java.util.HashMap;
 import java.util.Map;
 
 public class DistributionPlanContext {
@@ -38,6 +41,8 @@ public class DistributionPlanContext {
   // used by group by level
   private Map<String, Expression> columnNameToExpression;
 
+  private Map<PlanNodeId, TRegionReplicaSet> preferDataRegion;
+
   protected DistributionPlanContext(MPPQueryContext queryContext) {
     this.isRoot = true;
     this.queryContext = queryContext;
@@ -83,4 +88,15 @@ public class DistributionPlanContext {
   public void setColumnNameToExpression(Map<String, Expression> 
columnNameToExpression) {
     this.columnNameToExpression = columnNameToExpression;
   }
+
+  public void putNodePreferDataRegion(PlanNodeId id, TRegionReplicaSet region) 
{
+    if (preferDataRegion == null) {
+      preferDataRegion = new HashMap<>();
+    }
+    preferDataRegion.put(id, region);
+  }
+
+  public Map<PlanNodeId, TRegionReplicaSet> getPreferDataRegion() {
+    return this.preferDataRegion;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
index 6b81b6ba0db..128a0378efa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
@@ -331,47 +331,7 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
     // optimize `order by time limit N align by device` query,
     // to ensure that the number of ExchangeNode equals to DataRegionNum but 
not equals to DeviceNum
     if (node instanceof TopKNode) {
-      TopKNode rootNode = (TopKNode) node;
-      Map<TRegionReplicaSet, TopKNode> regionTopKNodeMap = new HashMap<>();
-      for (PlanNode child : visitedChildren) {
-        if (child instanceof SingleDeviceViewNode) {
-          ((SingleDeviceViewNode) child).setCacheOutputColumnNames(true);
-        }
-        TRegionReplicaSet region = 
context.getNodeDistribution(child.getPlanNodeId()).region;
-        regionTopKNodeMap
-            .computeIfAbsent(
-                region,
-                k -> {
-                  TopKNode childTopKNode =
-                      new TopKNode(
-                          context.queryContext.getQueryId().genPlanNodeId(),
-                          rootNode.getTopValue(),
-                          rootNode.getMergeOrderParameter(),
-                          rootNode.getOutputColumnNames());
-                  context.putNodeDistribution(
-                      childTopKNode.getPlanNodeId(),
-                      new 
NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN, region));
-                  return childTopKNode;
-                })
-            .addChild(child);
-      }
-
-      for (Map.Entry<TRegionReplicaSet, TopKNode> entry : 
regionTopKNodeMap.entrySet()) {
-        TRegionReplicaSet topKNodeLocatedRegion = entry.getKey();
-        TopKNode topKNode = entry.getValue();
-
-        if (!dataRegion.equals(topKNodeLocatedRegion)) {
-          ExchangeNode exchangeNode =
-              new 
ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
-          exchangeNode.setChild(topKNode);
-          exchangeNode.setOutputColumnNames(topKNode.getOutputColumnNames());
-          context.hasExchangeNode = true;
-          newNode.addChild(exchangeNode);
-        } else {
-          newNode.addChild(topKNode);
-        }
-      }
-      return newNode;
+      return processTopNode(node, visitedChildren, context, newNode, 
dataRegion);
     }
 
     // Otherwise, we need to add ExchangeNode for the child whose DataRegion 
is different from the
@@ -412,6 +372,55 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
     return newNode;
   }
 
+  private PlanNode processTopNode(
+      MultiChildProcessNode node,
+      List<PlanNode> visitedChildren,
+      NodeGroupContext context,
+      MultiChildProcessNode newNode,
+      TRegionReplicaSet dataRegion) {
+    TopKNode rootNode = (TopKNode) node;
+    Map<TRegionReplicaSet, TopKNode> regionTopKNodeMap = new HashMap<>();
+    for (PlanNode child : visitedChildren) {
+      if (child instanceof SingleDeviceViewNode) {
+        ((SingleDeviceViewNode) child).setCacheOutputColumnNames(true);
+      }
+      TRegionReplicaSet region = 
context.getNodeDistribution(child.getPlanNodeId()).region;
+      regionTopKNodeMap
+          .computeIfAbsent(
+              region,
+              k -> {
+                TopKNode childTopKNode =
+                    new TopKNode(
+                        context.queryContext.getQueryId().genPlanNodeId(),
+                        rootNode.getTopValue(),
+                        rootNode.getMergeOrderParameter(),
+                        rootNode.getOutputColumnNames());
+                context.putNodeDistribution(
+                    childTopKNode.getPlanNodeId(),
+                    new 
NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN, region));
+                return childTopKNode;
+              })
+          .addChild(child);
+    }
+
+    for (Map.Entry<TRegionReplicaSet, TopKNode> entry : 
regionTopKNodeMap.entrySet()) {
+      TRegionReplicaSet topKNodeLocatedRegion = entry.getKey();
+      TopKNode topKNode = entry.getValue();
+
+      if (!dataRegion.equals(topKNodeLocatedRegion)) {
+        ExchangeNode exchangeNode =
+            new 
ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
+        exchangeNode.setChild(topKNode);
+        exchangeNode.setOutputColumnNames(topKNode.getOutputColumnNames());
+        context.hasExchangeNode = true;
+        newNode.addChild(exchangeNode);
+      } else {
+        newNode.addChild(topKNode);
+      }
+    }
+    return newNode;
+  }
+
   @Override
   public PlanNode visitSlidingWindowAggregation(
       SlidingWindowAggregationNode node, NodeGroupContext context) {
@@ -468,6 +477,9 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
       if (planNodeCount > maxCount) {
         maxCount = planNodeCount;
         result = region;
+      } else if (planNodeCount == maxCount
+          && region.getRegionId().getId() > result.getRegionId().getId()) {
+        result = region;
       }
     }
     return result == null ? context.getMostlyUsedDataRegion() : result;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index 88b4526d1f7..076ed0860dc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -70,6 +70,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -735,12 +736,22 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
       context.setQueryMultiRegion(true);
     }
 
+    // sort by size of SourceNode list firstly (reversed)
+    // then sort by region id (reversed)
+    List<Map.Entry<TRegionReplicaSet, List<SourceNode>>> list =
+        new LinkedList<>(sourceGroup.entrySet());
+    list.sort(
+        Comparator.comparingInt(
+                (Map.Entry<TRegionReplicaSet, List<SourceNode>> o) -> 
-o.getValue().size())
+            .thenComparingInt(o -> -o.getKey().getRegionId().getId()));
+
     // Step 3: For the source nodes which belong to same data region, add a 
TimeJoinNode for them
     // and make the
     // new TimeJoinNode as the child of current TimeJoinNode
     // TODO: (xingtanzjr) optimize the procedure here to remove duplicated 
TimeJoinNode
     boolean addParent = false;
-    for (List<SourceNode> seriesScanNodes : sourceGroup.values()) {
+    for (Map.Entry<TRegionReplicaSet, List<SourceNode>> entry : list) {
+      List<SourceNode> seriesScanNodes = entry.getValue();
       if (seriesScanNodes.size() == 1 && (!context.isForceAddParent() || 
isTimeJoin)) {
         root.addChild(seriesScanNodes.get(0));
         continue;
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/DistributionPlannerCycleTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/DistributionPlannerCycleTest.java
index b764a032fd0..1b0599d626f 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/DistributionPlannerCycleTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/DistributionPlannerCycleTest.java
@@ -20,51 +20,45 @@
 package org.apache.iotdb.db.queryengine.plan.plan.distribution;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.path.MeasurementPath;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.schema.SchemaConstant;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
-import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
 import 
org.apache.iotdb.db.queryengine.plan.planner.distribution.DistributionPlanner;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
-import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.junit.Test;
 
-import java.util.Arrays;
-import java.util.Collections;
+import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
 public class DistributionPlannerCycleTest {
-    @Test
-    public void aaTest() {
-        QueryId queryId = new QueryId("test");
-        MPPQueryContext context =
-                new MPPQueryContext("", queryId, null, new TEndPoint(), new 
TEndPoint());
 
-        // 6. order by time, offset + limit
-        // on top of TOP-K NODE, LIMIT-NODE is necessary
-        String sql = "select * from root.sg.**";
-        Analysis analysis = Util2.analyze(sql, context);
-        PlanNode logicalPlanNode = Util2.genLogicalPlan(analysis, context);
-        DistributionPlanner planner =
-                new DistributionPlanner(analysis, new 
LogicalQueryPlan(context, logicalPlanNode));
-        DistributedQueryPlan plan = planner.planFragments();
-        System.out.println("a");
-    }
+  /**
+   * Query sql: `select * from root.sg.d1,root.sg.d2`
+   *
+   * <p>root.sg.d1 has 2 SeriesScanNodes, root.sg.d2 has 3 SeriesScanNodes.
+   *
+   * <p>Identity | d2-Scan1 d2-Scan2 d2-Scan3 TimeJoin | d1-Scan1 d1-Scan2
+   */
+  @Test
+  public void timeJoinNodeTest() {
+    QueryId queryId = new QueryId("test");
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new 
TEndPoint());
+
+    String sql = "select * from root.sg.d1,root.sg.d2";
+    Analysis analysis = Util2.analyze(sql, context);
+    PlanNode logicalPlanNode = Util2.genLogicalPlan(analysis, context);
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(2, plan.getInstances().size());
+    PlanNode firstNode =
+        
plan.getInstances().get(0).getFragment().getPlanNodeTree().getChildren().get(0);
+    PlanNode secondNode =
+        
plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0);
+    assertEquals(4, firstNode.getChildren().size());
+    assertEquals(2, secondNode.getChildren().size());
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/Util2.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/Util2.java
index c78d0bdf855..08f7010fbdc 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/Util2.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/Util2.java
@@ -80,8 +80,8 @@ public class Util2 {
     Analysis analysis = new Analysis();
 
     String device1 = "root.sg.d1";
-    String device2 = "root.sg.d22";
-    String device3 = "root.sg.d333";
+    String device2 = "root.sg.d2";
+    String device3 = "root.sg.d3";
 
     TRegionReplicaSet dataRegion1 =
         new TRegionReplicaSet(
@@ -178,8 +178,8 @@ public class Util2 {
     d1.addChild("s1", s1);
     d1.addChild("s2", s2);
 
-    SchemaEntityNode d2 = new SchemaEntityNode("d22");
-    sg.addChild("d22", d2);
+    SchemaEntityNode d2 = new SchemaEntityNode("d2");
+    sg.addChild("d2", d2);
     d2.addChild("t1", t1);
     d2.addChild("t2", t2);
     d2.addChild("t3", t3);

Reply via email to