This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 960952e0a7d [IOTDB-6201] Fix possible query timeout error in
TimeJoinNode and possible query cycle in last query.
960952e0a7d is described below
commit 960952e0a7d1eaf8d589fcc394bbc997ce0f12ba
Author: Beyyes <[email protected]>
AuthorDate: Fri Oct 20 09:29:51 2023 +0800
[IOTDB-6201] Fix possible query timeout error in TimeJoinNode and possible
query cycle in last query.
---
.../db/queryengine/common/MPPQueryContext.java | 13 +
.../distribution/DistributionPlanContext.java | 6 +-
.../planner/distribution/ExchangeNodeAdder.java | 103 ++++---
.../planner/distribution/NodeGroupContext.java | 8 +-
.../plan/planner/distribution/SourceRewriter.java | 96 ++++---
.../distribution/DistributionPlannerCycleTest.java | 81 ++++++
.../queryengine/plan/plan/distribution/Util2.java | 309 +++++++++++++++++++++
7 files changed, 525 insertions(+), 91 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index 863b7acdfe0..9791b1d4423 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.common;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
@@ -42,6 +43,10 @@ public class MPPQueryContext {
private TEndPoint localInternalEndpoint;
private ResultNodeContext resultNodeContext;
+ // Main FragmentInstance, the other FragmentInstance should push data result
to this
+ // FragmentInstance
+ private TRegionReplicaSet mainFragmentLocatedRegion;
+
// When some DataNode cannot be connected, its endPoint will be put
// in this list. And the following retry will avoid planning fragment
// onto this node.
@@ -129,6 +134,14 @@ public class MPPQueryContext {
return endPointBlackList;
}
+ public TRegionReplicaSet getMainFragmentLocatedRegion() {
+ return this.mainFragmentLocatedRegion;
+ }
+
+ public void setMainFragmentLocatedRegion(TRegionReplicaSet region) {
+ this.mainFragmentLocatedRegion = region;
+ }
+
public TypeProvider getTypeProvider() {
return typeProvider;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
index ce6b25c26dc..7c1e963d526 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanContext.java
@@ -52,7 +52,11 @@ public class DistributionPlanContext {
return this;
}
- protected void setForceAddParent() {
+ public boolean isForceAddParent() {
+ return this.forceAddParent;
+ }
+
+ public void setForceAddParent() {
this.forceAddParent = true;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
index 6b81b6ba0db..d22958a5a1f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
@@ -331,47 +331,7 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
// optimize `order by time limit N align by device` query,
// to ensure that the number of ExchangeNode equals to DataRegionNum but
not equals to DeviceNum
if (node instanceof TopKNode) {
- TopKNode rootNode = (TopKNode) node;
- Map<TRegionReplicaSet, TopKNode> regionTopKNodeMap = new HashMap<>();
- for (PlanNode child : visitedChildren) {
- if (child instanceof SingleDeviceViewNode) {
- ((SingleDeviceViewNode) child).setCacheOutputColumnNames(true);
- }
- TRegionReplicaSet region =
context.getNodeDistribution(child.getPlanNodeId()).region;
- regionTopKNodeMap
- .computeIfAbsent(
- region,
- k -> {
- TopKNode childTopKNode =
- new TopKNode(
- context.queryContext.getQueryId().genPlanNodeId(),
- rootNode.getTopValue(),
- rootNode.getMergeOrderParameter(),
- rootNode.getOutputColumnNames());
- context.putNodeDistribution(
- childTopKNode.getPlanNodeId(),
- new
NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN, region));
- return childTopKNode;
- })
- .addChild(child);
- }
-
- for (Map.Entry<TRegionReplicaSet, TopKNode> entry :
regionTopKNodeMap.entrySet()) {
- TRegionReplicaSet topKNodeLocatedRegion = entry.getKey();
- TopKNode topKNode = entry.getValue();
-
- if (!dataRegion.equals(topKNodeLocatedRegion)) {
- ExchangeNode exchangeNode =
- new
ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
- exchangeNode.setChild(topKNode);
- exchangeNode.setOutputColumnNames(topKNode.getOutputColumnNames());
- context.hasExchangeNode = true;
- newNode.addChild(exchangeNode);
- } else {
- newNode.addChild(topKNode);
- }
- }
- return newNode;
+ return processTopNode(node, visitedChildren, context, newNode,
dataRegion);
}
// Otherwise, we need to add ExchangeNode for the child whose DataRegion
is different from the
@@ -412,6 +372,55 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
return newNode;
}
+ private PlanNode processTopNode(
+ MultiChildProcessNode node,
+ List<PlanNode> visitedChildren,
+ NodeGroupContext context,
+ MultiChildProcessNode newNode,
+ TRegionReplicaSet dataRegion) {
+ TopKNode rootNode = (TopKNode) node;
+ Map<TRegionReplicaSet, TopKNode> regionTopKNodeMap = new HashMap<>();
+ for (PlanNode child : visitedChildren) {
+ if (child instanceof SingleDeviceViewNode) {
+ ((SingleDeviceViewNode) child).setCacheOutputColumnNames(true);
+ }
+ TRegionReplicaSet region =
context.getNodeDistribution(child.getPlanNodeId()).region;
+ regionTopKNodeMap
+ .computeIfAbsent(
+ region,
+ k -> {
+ TopKNode childTopKNode =
+ new TopKNode(
+ context.queryContext.getQueryId().genPlanNodeId(),
+ rootNode.getTopValue(),
+ rootNode.getMergeOrderParameter(),
+ rootNode.getOutputColumnNames());
+ context.putNodeDistribution(
+ childTopKNode.getPlanNodeId(),
+ new
NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN, region));
+ return childTopKNode;
+ })
+ .addChild(child);
+ }
+
+ for (Map.Entry<TRegionReplicaSet, TopKNode> entry :
regionTopKNodeMap.entrySet()) {
+ TRegionReplicaSet topKNodeLocatedRegion = entry.getKey();
+ TopKNode topKNode = entry.getValue();
+
+ if (!dataRegion.equals(topKNodeLocatedRegion)) {
+ ExchangeNode exchangeNode =
+ new
ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
+ exchangeNode.setChild(topKNode);
+ exchangeNode.setOutputColumnNames(topKNode.getOutputColumnNames());
+ context.hasExchangeNode = true;
+ newNode.addChild(exchangeNode);
+ } else {
+ newNode.addChild(topKNode);
+ }
+ }
+ return newNode;
+ }
+
@Override
public PlanNode visitSlidingWindowAggregation(
SlidingWindowAggregationNode node, NodeGroupContext context) {
@@ -455,22 +464,28 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
// Step 2: return the RegionReplicaSet with max node count
long maxCount = -1;
- TRegionReplicaSet result = null;
+ TRegionReplicaSet result = DataPartition.NOT_ASSIGNED;
for (Map.Entry<TRegionReplicaSet, Long> entry : groupByRegion.entrySet()) {
TRegionReplicaSet region = entry.getKey();
- long planNodeCount = entry.getValue();
if (DataPartition.NOT_ASSIGNED.equals(region)) {
continue;
}
+ if (region.equals(context.queryContext.getMainFragmentLocatedRegion())) {
+ return context.queryContext.getMainFragmentLocatedRegion();
+ }
if (region.equals(context.getMostlyUsedDataRegion())) {
return region;
}
+ long planNodeCount = entry.getValue();
if (planNodeCount > maxCount) {
maxCount = planNodeCount;
result = region;
+ } else if (planNodeCount == maxCount
+ && region.getRegionId().getId() < result.getRegionId().getId()) {
+ result = region;
}
}
- return result == null ? context.getMostlyUsedDataRegion() : result;
+ return result;
}
private TRegionReplicaSet calculateSchemaRegionByChildren(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java
index 9ea36e7994c..2cdd071c931 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java
@@ -54,12 +54,12 @@ public class NodeGroupContext {
}
private TRegionReplicaSet getMostlyUsedDataRegion(PlanNode root) {
- Map<TRegionReplicaSet, Long> regionCount = new HashMap<>();
- countRegionOfSourceNodes(root, regionCount);
- if (regionCount.isEmpty()) {
+ Map<TRegionReplicaSet, Long> regionCountMap = new HashMap<>();
+ countRegionOfSourceNodes(root, regionCountMap);
+ if (regionCountMap.isEmpty()) {
return DataPartition.NOT_ASSIGNED;
}
- return Collections.max(regionCount.entrySet(),
Map.Entry.comparingByValue()).getKey();
+ return Collections.max(regionCountMap.entrySet(),
Map.Entry.comparingByValue()).getKey();
}
private void countRegionOfSourceNodes(PlanNode root, Map<TRegionReplicaSet,
Long> result) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index f19d036822f..02e160864dd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -622,7 +622,7 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
// For last query, we need to keep every FI's root node is
LastQueryMergeNode. So we
// force every region group have a parent node even if there is only 1
child for it.
context.setForceAddParent();
- PlanNode root = processRawMultiChildNode(node, context, true);
+ PlanNode root = processRawMultiChildNode(node, context, false);
if (context.queryMultiRegion) {
PlanNode newRoot = genLastQueryRootNode(node, context);
// add sort op for each if we add LastQueryMergeNode as root
@@ -694,53 +694,23 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
if (containsAggregationSource(node)) {
return planAggregationWithTimeJoin(node, context);
}
- return Collections.singletonList(processRawMultiChildNode(node, context,
false));
+ return Collections.singletonList(processRawMultiChildNode(node, context,
true));
}
+ // Only `visitTimeJoin` and `visitLastQuery` invoke this method
private PlanNode processRawMultiChildNode(
- MultiChildProcessNode node, DistributionPlanContext context, boolean
isLast) {
+ MultiChildProcessNode node, DistributionPlanContext context, boolean
isTimeJoin) {
MultiChildProcessNode root = (MultiChildProcessNode) node.clone();
- // Step 1: Get all source nodes. For the node which is not source, add it
as the child of
- // current TimeJoinNode
- List<SourceNode> sources = new ArrayList<>();
- for (PlanNode child : node.getChildren()) {
- if (child instanceof SeriesSourceNode) {
- // If the child is SeriesScanNode, we need to check whether this node
should be seperated
- // into several splits.
- SeriesSourceNode sourceNode = (SeriesSourceNode) child;
- List<TRegionReplicaSet> dataDistribution =
- analysis.getPartitionInfo(
- sourceNode.getPartitionPath(),
sourceNode.getPartitionTimeFilter());
- if (dataDistribution.size() > 1) {
- // We mark this variable to `true` if there is some series which is
distributed in multi
- // DataRegions
- context.setOneSeriesInMultiRegion(true);
- }
- // If the size of dataDistribution is m, this SeriesScanNode should be
seperated into m
- // SeriesScanNode.
- for (TRegionReplicaSet dataRegion : dataDistribution) {
- SeriesSourceNode split = (SeriesSourceNode) sourceNode.clone();
-
split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
- split.setRegionReplicaSet(dataRegion);
- sources.add(split);
- }
- }
- }
-
- // Step 2: For the source nodes, group them by the DataRegion.
- Map<TRegionReplicaSet, List<SourceNode>> sourceGroup =
-
sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
- if (sourceGroup.size() > 1) {
- context.setQueryMultiRegion(true);
- }
+ Map<TRegionReplicaSet, List<SourceNode>> sourceGroup =
groupBySourceNodes(node, context);
- // Step 3: For the source nodes which belong to same data region, add a
TimeJoinNode for them
- // and make the
- // new TimeJoinNode as the child of current TimeJoinNode
+ // For the source nodes which belong to same data region, add a
TimeJoinNode for them
+ // and make the new TimeJoinNode as the child of current TimeJoinNode
// TODO: (xingtanzjr) optimize the procedure here to remove duplicated
TimeJoinNode
boolean addParent = false;
- for (List<SourceNode> seriesScanNodes : sourceGroup.values()) {
- if (seriesScanNodes.size() == 1 && (!context.forceAddParent || !isLast))
{
+ for (Map.Entry<TRegionReplicaSet, List<SourceNode>> entry :
sourceGroup.entrySet()) {
+ TRegionReplicaSet region = entry.getKey();
+ List<SourceNode> seriesScanNodes = entry.getValue();
+ if (seriesScanNodes.size() == 1 && (!context.isForceAddParent() ||
isTimeJoin)) {
root.addChild(seriesScanNodes.get(0));
continue;
}
@@ -750,8 +720,12 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
// At last, we can use the parameter `addParent` to judge whether to
create new
// MultiChildNode.
boolean appendToRootDirectly =
- sourceGroup.size() == 1 || (!addParent && !context.forceAddParent);
+ sourceGroup.size() == 1 || (!addParent &&
!context.isForceAddParent());
if (appendToRootDirectly) {
+ // In non-last query, this code can be reached at most once
+ // And we set region as MainFragmentLocatedRegion, the others Region
should transfer data to
+ // this region
+ context.queryContext.setMainFragmentLocatedRegion(region);
seriesScanNodes.forEach(root::addChild);
addParent = true;
} else {
@@ -777,6 +751,44 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
return root;
}
+ private Map<TRegionReplicaSet, List<SourceNode>> groupBySourceNodes(
+ MultiChildProcessNode node, DistributionPlanContext context) {
+ // Step 1: Get all source nodes. For the node which is not source, add it
as the child of
+ // current TimeJoinNode
+ List<SourceNode> sources = new ArrayList<>();
+ for (PlanNode child : node.getChildren()) {
+ if (child instanceof SeriesSourceNode) {
+ // If the child is SeriesScanNode, we need to check whether this node
should be seperated
+ // into several splits.
+ SeriesSourceNode sourceNode = (SeriesSourceNode) child;
+ List<TRegionReplicaSet> dataDistribution =
+ analysis.getPartitionInfo(
+ sourceNode.getPartitionPath(),
sourceNode.getPartitionTimeFilter());
+ if (dataDistribution.size() > 1) {
+ // If there is some series which is distributed in multi DataRegions
+ context.setOneSeriesInMultiRegion(true);
+ }
+ // If the size of dataDistribution is N, this SeriesScanNode should be
seperated into N
+ // SeriesScanNode.
+ for (TRegionReplicaSet dataRegion : dataDistribution) {
+ SeriesSourceNode split = (SeriesSourceNode) sourceNode.clone();
+
split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+ split.setRegionReplicaSet(dataRegion);
+ sources.add(split);
+ }
+ }
+ }
+
+ // Step 2: For the source nodes, group them by the DataRegion.
+ Map<TRegionReplicaSet, List<SourceNode>> sourceGroup =
+
sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
+ if (sourceGroup.size() > 1) {
+ context.setQueryMultiRegion(true);
+ }
+
+ return sourceGroup;
+ }
+
private boolean containsAggregationSource(TimeJoinNode node) {
for (PlanNode child : node.getChildren()) {
if (child instanceof SeriesAggregationScanNode
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/DistributionPlannerCycleTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/DistributionPlannerCycleTest.java
new file mode 100644
index 00000000000..8961c1e207f
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/DistributionPlannerCycleTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.plan.distribution;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import
org.apache.iotdb.db.queryengine.plan.planner.distribution.DistributionPlanner;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class DistributionPlannerCycleTest {
+
+ // Query sql: `select * from root.sg.d1,root.sg.d2`
+ // root.sg.d1 has 2 SeriesScanNodes, root.sg.d2 has 3 SeriesScanNodes.
+ //
+ //
------------------------------------------------------------------------------------------------
+ // Note: d1.s1[1] means a SeriesScanNode with target series d1.s1 and its
data region is 1
+ //
+ // IdentityNode
+ // ____________________|_____________
+ // | | \
+ // d1.s1[1] d1.s2[1] Exchange
+ // |
+ // TimeJoinNode
+ // / \ \
+ // d2.s1[2] d2.s2[2]
d2.s3[2]
+ //
------------------------------------------------------------------------------------------------
+ @Test
+ public void timeJoinNodeTest() {
+ QueryId queryId = new QueryId("test");
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new
TEndPoint());
+
+ String sql = "select * from root.sg.d1,root.sg.d2";
+ Analysis analysis = Util2.analyze(sql, context);
+ PlanNode logicalPlanNode = Util2.genLogicalPlan(analysis, context);
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context,
logicalPlanNode));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(2, plan.getInstances().size());
+ PlanNode firstNode =
+
plan.getInstances().get(0).getFragment().getPlanNodeTree().getChildren().get(0);
+ PlanNode secondNode =
+
plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0);
+ assertEquals(3, firstNode.getChildren().size());
+ assertTrue(firstNode.getChildren().get(0) instanceof SeriesScanNode);
+ assertTrue(firstNode.getChildren().get(1) instanceof SeriesScanNode);
+ assertTrue(firstNode.getChildren().get(2) instanceof ExchangeNode);
+ assertEquals(3, secondNode.getChildren().size());
+ assertTrue(secondNode.getChildren().get(0) instanceof SeriesScanNode);
+ assertTrue(secondNode.getChildren().get(1) instanceof SeriesScanNode);
+ assertTrue(secondNode.getChildren().get(2) instanceof SeriesScanNode);
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/Util2.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/Util2.java
new file mode 100644
index 00000000000..5124c61c28b
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/Util2.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.plan.distribution;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
+import org.apache.iotdb.commons.partition.SchemaPartition;
+import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree;
+import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.queryengine.common.schematree.node.SchemaEntityNode;
+import
org.apache.iotdb.db.queryengine.common.schematree.node.SchemaInternalNode;
+import
org.apache.iotdb.db.queryengine.common.schematree.node.SchemaMeasurementNode;
+import org.apache.iotdb.db.queryengine.common.schematree.node.SchemaNode;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analyzer;
+import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
+import
org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaComputationWithAutoCreation;
+import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
+import org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanner;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.schemaengine.template.Template;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.mockito.Mockito;
+
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Util2 {
+ public static final Analysis ANALYSIS = constructAnalysis();
+
+ public static Analysis constructAnalysis() {
+ TRegionReplicaSet dataRegion1 =
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
+ Collections.singletonList(genDataNodeLocation(11, "192.0.1.1")));
+ List<TRegionReplicaSet> d1DataRegions = new ArrayList<>();
+ d1DataRegions.add(dataRegion1);
+ Map<TTimePartitionSlot, List<TRegionReplicaSet>> d1DataRegionMap = new
HashMap<>();
+ d1DataRegionMap.put(new TTimePartitionSlot(), d1DataRegions);
+
+ TRegionReplicaSet dataRegion2 =
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 2),
+ Collections.singletonList(genDataNodeLocation(21, "192.0.1.1")));
+ List<TRegionReplicaSet> d2DataRegions = new ArrayList<>();
+ d2DataRegions.add(dataRegion2);
+ Map<TTimePartitionSlot, List<TRegionReplicaSet>> d2DataRegionMap = new
HashMap<>();
+ d2DataRegionMap.put(new TTimePartitionSlot(), d2DataRegions);
+
+ DataPartition dataPartition =
+ new DataPartition(
+
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+ Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>>>
+ dataPartitionMap = new HashMap<>();
+ Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>> sgPartitionMap =
+ new HashMap<>();
+ String device1 = "root.sg.d1";
+ String device2 = "root.sg.d2";
+ String device3 = "root.sg.d3";
+ SeriesPartitionExecutor executor =
+ SeriesPartitionExecutor.getSeriesPartitionExecutor(
+
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+ sgPartitionMap.put(executor.getSeriesPartitionSlot(device1),
d1DataRegionMap);
+ sgPartitionMap.put(executor.getSeriesPartitionSlot(device2),
d2DataRegionMap);
+ dataPartitionMap.put("root.sg", sgPartitionMap);
+ dataPartition.setDataPartitionMap(dataPartitionMap);
+
+ Analysis analysis = new Analysis();
+ analysis.setDataPartitionInfo(dataPartition);
+
+ // construct schema partition
+ TRegionReplicaSet schemaRegion1 =
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 11),
+ Collections.singletonList(genDataNodeLocation(11, "192.0.1.1")));
+
+ TRegionReplicaSet schemaRegion2 =
+ new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 21),
+ Collections.singletonList(genDataNodeLocation(21, "192.0.1.1")));
+
+ Map<TSeriesPartitionSlot, TRegionReplicaSet> schemaRegionMap = new
HashMap<>();
+ schemaRegionMap.put(executor.getSeriesPartitionSlot(device1),
schemaRegion1);
+ schemaRegionMap.put(executor.getSeriesPartitionSlot(device2),
schemaRegion2);
+ schemaRegionMap.put(executor.getSeriesPartitionSlot(device3),
schemaRegion2);
+ Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>>
schemaPartitionMap = new HashMap<>();
+ schemaPartitionMap.put("root.sg", schemaRegionMap);
+ SchemaPartition schemaPartition =
+ new SchemaPartition(
+
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+ schemaPartition.setSchemaPartitionMap(schemaPartitionMap);
+
+ analysis.setDataPartitionInfo(dataPartition);
+ analysis.setSchemaPartitionInfo(schemaPartition);
+ analysis.setSchemaTree(genSchemaTree());
+ // to avoid some special case which is not the point of test
+ analysis.setStatement(Mockito.mock(QueryStatement.class));
+ Mockito.when(analysis.getStatement().isQuery()).thenReturn(false);
+ return analysis;
+ }
+
+ private static ISchemaTree genSchemaTree() {
+ SchemaNode root = new SchemaInternalNode("root");
+
+ SchemaNode sg = new SchemaInternalNode("sg");
+ root.addChild("sg", sg);
+
+ SchemaEntityNode d1 = new SchemaEntityNode("d1");
+ SchemaMeasurementNode s1 =
+ new SchemaMeasurementNode("s1", new MeasurementSchema("s1",
TSDataType.INT32));
+ SchemaMeasurementNode s2 =
+ new SchemaMeasurementNode("s2", new MeasurementSchema("s2",
TSDataType.DOUBLE));
+ sg.addChild("d1", d1);
+ d1.addChild("s1", s1);
+ d1.addChild("s2", s2);
+
+ SchemaEntityNode d2 = new SchemaEntityNode("d2");
+ SchemaMeasurementNode t1 =
+ new SchemaMeasurementNode("s1", new MeasurementSchema("t1",
TSDataType.INT32));
+ SchemaMeasurementNode t2 =
+ new SchemaMeasurementNode("s2", new MeasurementSchema("t2",
TSDataType.DOUBLE));
+ SchemaMeasurementNode t3 =
+ new SchemaMeasurementNode("s3", new MeasurementSchema("t3",
TSDataType.DOUBLE));
+ sg.addChild("d2", d2);
+ d2.addChild("s1", t1);
+ d2.addChild("s2", t2);
+ d2.addChild("s3", t3);
+
+ ClusterSchemaTree tree = new ClusterSchemaTree(root);
+ tree.setDatabases(Collections.singleton("root.sg"));
+
+ return tree;
+ }
+
+ public static Analysis analyze(String sql, MPPQueryContext context) {
+ Statement statement = StatementGenerator.createStatement(sql,
ZonedDateTime.now().getOffset());
+ Analyzer analyzer = new Analyzer(context, getFakePartitionFetcher(),
getFakeSchemaFetcher());
+ return analyzer.analyze(statement);
+ }
+
+ public static PlanNode genLogicalPlan(Analysis analysis, MPPQueryContext
context) {
+ LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
+ return planner.plan(analysis).getRootNode();
+ }
+
+ private static ISchemaFetcher getFakeSchemaFetcher() {
+ return new ISchemaFetcher() {
+ @Override
+ public ISchemaTree fetchSchema(PathPatternTree patternTree,
MPPQueryContext context) {
+ return ANALYSIS.getSchemaTree();
+ }
+
+ @Override
+ public ISchemaTree fetchSchemaWithTags(PathPatternTree patternTree,
MPPQueryContext context) {
+ return ANALYSIS.getSchemaTree();
+ }
+
+ @Override
+ public void fetchAndComputeSchemaWithAutoCreate(
+ ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation,
+ MPPQueryContext context) {}
+
+ @Override
+ public void fetchAndComputeSchemaWithAutoCreate(
+ List<? extends ISchemaComputationWithAutoCreation>
schemaComputationWithAutoCreationList,
+ MPPQueryContext context) {}
+
+ @Override
+ public ISchemaTree fetchSchemaListWithAutoCreate(
+ List<PartialPath> devicePath,
+ List<String[]> measurements,
+ List<TSDataType[]> tsDataTypes,
+ List<TSEncoding[]> encodings,
+ List<CompressionType[]> compressionTypes,
+ List<Boolean> aligned,
+ MPPQueryContext context) {
+ return ANALYSIS.getSchemaTree();
+ }
+
+ @Override
+ public Pair<Template, PartialPath> checkTemplateSetInfo(PartialPath
devicePath) {
+ return null;
+ }
+
+ @Override
+ public Pair<Template, PartialPath> checkTemplateSetAndPreSetInfo(
+ PartialPath timeSeriesPath, String alias) {
+ return null;
+ }
+
+ @Override
+ public Map<Integer, Template> checkAllRelatedTemplate(PartialPath
pathPattern) {
+ return null;
+ }
+
+ @Override
+ public Pair<Template, List<PartialPath>> getAllPathsSetTemplate(String
templateName) {
+ return null;
+ }
+ };
+ }
+
+ private static IPartitionFetcher getFakePartitionFetcher() {
+ return new IPartitionFetcher() {
+ @Override
+ public SchemaPartition getSchemaPartition(PathPatternTree patternTree) {
+ return ANALYSIS.getSchemaPartitionInfo();
+ }
+
+ @Override
+ public SchemaPartition getOrCreateSchemaPartition(
+ PathPatternTree patternTree, String userName) {
+ return ANALYSIS.getSchemaPartitionInfo();
+ }
+
+ @Override
+ public DataPartition getDataPartition(
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+ return ANALYSIS.getDataPartitionInfo();
+ }
+
+ @Override
+ public DataPartition getDataPartitionWithUnclosedTimeRange(
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+ return ANALYSIS.getDataPartitionInfo();
+ }
+
+ @Override
+ public DataPartition getOrCreateDataPartition(
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+ return ANALYSIS.getDataPartitionInfo();
+ }
+
+ @Override
+ public DataPartition getOrCreateDataPartition(
+ List<DataPartitionQueryParam> dataPartitionQueryParams, String
userName) {
+ return ANALYSIS.getDataPartitionInfo();
+ }
+
+ @Override
+ public SchemaNodeManagementPartition
getSchemaNodeManagementPartitionWithLevel(
+ PathPatternTree patternTree, PathPatternTree scope, Integer level) {
+ return null;
+ }
+
+ @Override
+ public boolean updateRegionCache(TRegionRouteReq req) {
+ return false;
+ }
+
+ @Override
+ public void invalidAllCache() {}
+ };
+ }
+
+ private static TDataNodeLocation genDataNodeLocation(int dataNodeId, String
ip) {
+ return new TDataNodeLocation()
+ .setDataNodeId(dataNodeId)
+ .setClientRpcEndPoint(new TEndPoint(ip, 9000))
+ .setMPPDataExchangeEndPoint(new TEndPoint(ip, 9001))
+ .setInternalEndPoint(new TEndPoint(ip, 9002));
+ }
+}