This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch beyyes/fix_query_cycle
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/beyyes/fix_query_cycle by this
push:
new 035ca6c6716 fix cycle error
035ca6c6716 is described below
commit 035ca6c6716e5d106617662a6b2b62d14f25fe8c
Author: Beyyes <[email protected]>
AuthorDate: Thu Oct 19 12:30:47 2023 +0800
fix cycle error
---
.../distribution/DistributionPlanContext.java | 16 ++++
.../planner/distribution/ExchangeNodeAdder.java | 94 ++++++++++++----------
.../plan/planner/distribution/SourceRewriter.java | 13 ++-
.../distribution/DistributionPlannerCycleTest.java | 62 +++++++-------
.../queryengine/plan/plan/distribution/Util2.java | 8 +-
5 files changed, 113 insertions(+), 80 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
index bc88fef1f69..26116c012bb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
@@ -19,9 +19,12 @@
package org.apache.iotdb.db.queryengine.plan.planner.distribution;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import java.util.HashMap;
import java.util.Map;
public class DistributionPlanContext {
@@ -38,6 +41,8 @@ public class DistributionPlanContext {
// used by group by level
private Map<String, Expression> columnNameToExpression;
+ private Map<PlanNodeId, TRegionReplicaSet> preferDataRegion;
+
protected DistributionPlanContext(MPPQueryContext queryContext) {
this.isRoot = true;
this.queryContext = queryContext;
@@ -83,4 +88,15 @@ public class DistributionPlanContext {
public void setColumnNameToExpression(Map<String, Expression>
columnNameToExpression) {
this.columnNameToExpression = columnNameToExpression;
}
+
+ public void putNodePreferDataRegion(PlanNodeId id, TRegionReplicaSet region)
{
+ if (preferDataRegion == null) {
+ preferDataRegion = new HashMap<>();
+ }
+ preferDataRegion.put(id, region);
+ }
+
+ public Map<PlanNodeId, TRegionReplicaSet> getPreferDataRegion() {
+ return this.preferDataRegion;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
index 6b81b6ba0db..128a0378efa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
@@ -331,47 +331,7 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
// optimize `order by time limit N align by device` query,
// to ensure that the number of ExchangeNode equals to DataRegionNum but
not equals to DeviceNum
if (node instanceof TopKNode) {
- TopKNode rootNode = (TopKNode) node;
- Map<TRegionReplicaSet, TopKNode> regionTopKNodeMap = new HashMap<>();
- for (PlanNode child : visitedChildren) {
- if (child instanceof SingleDeviceViewNode) {
- ((SingleDeviceViewNode) child).setCacheOutputColumnNames(true);
- }
- TRegionReplicaSet region =
context.getNodeDistribution(child.getPlanNodeId()).region;
- regionTopKNodeMap
- .computeIfAbsent(
- region,
- k -> {
- TopKNode childTopKNode =
- new TopKNode(
- context.queryContext.getQueryId().genPlanNodeId(),
- rootNode.getTopValue(),
- rootNode.getMergeOrderParameter(),
- rootNode.getOutputColumnNames());
- context.putNodeDistribution(
- childTopKNode.getPlanNodeId(),
- new
NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN, region));
- return childTopKNode;
- })
- .addChild(child);
- }
-
- for (Map.Entry<TRegionReplicaSet, TopKNode> entry :
regionTopKNodeMap.entrySet()) {
- TRegionReplicaSet topKNodeLocatedRegion = entry.getKey();
- TopKNode topKNode = entry.getValue();
-
- if (!dataRegion.equals(topKNodeLocatedRegion)) {
- ExchangeNode exchangeNode =
- new
ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
- exchangeNode.setChild(topKNode);
- exchangeNode.setOutputColumnNames(topKNode.getOutputColumnNames());
- context.hasExchangeNode = true;
- newNode.addChild(exchangeNode);
- } else {
- newNode.addChild(topKNode);
- }
- }
- return newNode;
+ return processTopNode(node, visitedChildren, context, newNode,
dataRegion);
}
// Otherwise, we need to add ExchangeNode for the child whose DataRegion
is different from the
@@ -412,6 +372,55 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
return newNode;
}
+ private PlanNode processTopNode(
+ MultiChildProcessNode node,
+ List<PlanNode> visitedChildren,
+ NodeGroupContext context,
+ MultiChildProcessNode newNode,
+ TRegionReplicaSet dataRegion) {
+ TopKNode rootNode = (TopKNode) node;
+ Map<TRegionReplicaSet, TopKNode> regionTopKNodeMap = new HashMap<>();
+ for (PlanNode child : visitedChildren) {
+ if (child instanceof SingleDeviceViewNode) {
+ ((SingleDeviceViewNode) child).setCacheOutputColumnNames(true);
+ }
+ TRegionReplicaSet region =
context.getNodeDistribution(child.getPlanNodeId()).region;
+ regionTopKNodeMap
+ .computeIfAbsent(
+ region,
+ k -> {
+ TopKNode childTopKNode =
+ new TopKNode(
+ context.queryContext.getQueryId().genPlanNodeId(),
+ rootNode.getTopValue(),
+ rootNode.getMergeOrderParameter(),
+ rootNode.getOutputColumnNames());
+ context.putNodeDistribution(
+ childTopKNode.getPlanNodeId(),
+ new
NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN, region));
+ return childTopKNode;
+ })
+ .addChild(child);
+ }
+
+ for (Map.Entry<TRegionReplicaSet, TopKNode> entry :
regionTopKNodeMap.entrySet()) {
+ TRegionReplicaSet topKNodeLocatedRegion = entry.getKey();
+ TopKNode topKNode = entry.getValue();
+
+ if (!dataRegion.equals(topKNodeLocatedRegion)) {
+ ExchangeNode exchangeNode =
+ new
ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
+ exchangeNode.setChild(topKNode);
+ exchangeNode.setOutputColumnNames(topKNode.getOutputColumnNames());
+ context.hasExchangeNode = true;
+ newNode.addChild(exchangeNode);
+ } else {
+ newNode.addChild(topKNode);
+ }
+ }
+ return newNode;
+ }
+
@Override
public PlanNode visitSlidingWindowAggregation(
SlidingWindowAggregationNode node, NodeGroupContext context) {
@@ -468,6 +477,9 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
if (planNodeCount > maxCount) {
maxCount = planNodeCount;
result = region;
+ } else if (planNodeCount == maxCount
+ && region.getRegionId().getId() > result.getRegionId().getId()) {
+ result = region;
}
}
return result == null ? context.getMostlyUsedDataRegion() : result;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index 88b4526d1f7..076ed0860dc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -70,6 +70,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -735,12 +736,22 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
context.setQueryMultiRegion(true);
}
+ // sort by size of SourceNode list firstly (reversed)
+ // then sort by region id (reversed)
+ List<Map.Entry<TRegionReplicaSet, List<SourceNode>>> list =
+ new LinkedList<>(sourceGroup.entrySet());
+ list.sort(
+ Comparator.comparingInt(
+ (Map.Entry<TRegionReplicaSet, List<SourceNode>> o) ->
-o.getValue().size())
+ .thenComparingInt(o -> -o.getKey().getRegionId().getId()));
+
// Step 3: For the source nodes which belong to same data region, add a
TimeJoinNode for them
// and make the
// new TimeJoinNode as the child of current TimeJoinNode
// TODO: (xingtanzjr) optimize the procedure here to remove duplicated
TimeJoinNode
boolean addParent = false;
- for (List<SourceNode> seriesScanNodes : sourceGroup.values()) {
+ for (Map.Entry<TRegionReplicaSet, List<SourceNode>> entry : list) {
+ List<SourceNode> seriesScanNodes = entry.getValue();
if (seriesScanNodes.size() == 1 && (!context.isForceAddParent() ||
isTimeJoin)) {
root.addChild(seriesScanNodes.get(0));
continue;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/DistributionPlannerCycleTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/DistributionPlannerCycleTest.java
index b764a032fd0..1b0599d626f 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/DistributionPlannerCycleTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/DistributionPlannerCycleTest.java
@@ -20,51 +20,45 @@
package org.apache.iotdb.db.queryengine.plan.plan.distribution;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.path.MeasurementPath;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
-import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
import
org.apache.iotdb.db.queryengine.plan.planner.distribution.DistributionPlanner;
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
-import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.junit.Test;
-import java.util.Arrays;
-import java.util.Collections;
+import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class DistributionPlannerCycleTest {
- @Test
- public void aaTest() {
- QueryId queryId = new QueryId("test");
- MPPQueryContext context =
- new MPPQueryContext("", queryId, null, new TEndPoint(), new
TEndPoint());
- // 6. order by time, offset + limit
- // on top of TOP-K NODE, LIMIT-NODE is necessary
- String sql = "select * from root.sg.**";
- Analysis analysis = Util2.analyze(sql, context);
- PlanNode logicalPlanNode = Util2.genLogicalPlan(analysis, context);
- DistributionPlanner planner =
- new DistributionPlanner(analysis, new
LogicalQueryPlan(context, logicalPlanNode));
- DistributedQueryPlan plan = planner.planFragments();
- System.out.println("a");
- }
+ /**
+ * Query sql: `select * from root.sg.d1,root.sg.d2`
+ *
+ * <p>root.sg.d1 has 2 SeriesScanNodes, root.sg.d2 has 3 SeriesScanNodes.
+ *
+ * <p>Identity | d2-Scan1 d2-Scan2 d2-Scan3 TimeJoin | d1-Scan1 d1-Scan2
+ */
+ @Test
+ public void timeJoinNodeTest() {
+ QueryId queryId = new QueryId("test");
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new
TEndPoint());
+
+ String sql = "select * from root.sg.d1,root.sg.d2";
+ Analysis analysis = Util2.analyze(sql, context);
+ PlanNode logicalPlanNode = Util2.genLogicalPlan(analysis, context);
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context,
logicalPlanNode));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(2, plan.getInstances().size());
+ PlanNode firstNode =
+
plan.getInstances().get(0).getFragment().getPlanNodeTree().getChildren().get(0);
+ PlanNode secondNode =
+
plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0);
+ assertEquals(4, firstNode.getChildren().size());
+ assertEquals(2, secondNode.getChildren().size());
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/Util2.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/Util2.java
index c78d0bdf855..08f7010fbdc 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/Util2.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/Util2.java
@@ -80,8 +80,8 @@ public class Util2 {
Analysis analysis = new Analysis();
String device1 = "root.sg.d1";
- String device2 = "root.sg.d22";
- String device3 = "root.sg.d333";
+ String device2 = "root.sg.d2";
+ String device3 = "root.sg.d3";
TRegionReplicaSet dataRegion1 =
new TRegionReplicaSet(
@@ -178,8 +178,8 @@ public class Util2 {
d1.addChild("s1", s1);
d1.addChild("s2", s2);
- SchemaEntityNode d2 = new SchemaEntityNode("d22");
- sg.addChild("d22", d2);
+ SchemaEntityNode d2 = new SchemaEntityNode("d2");
+ sg.addChild("d2", d2);
d2.addChild("t1", t1);
d2.addChild("t2", t2);
d2.addChild("t3", t3);