This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f5b88fb0e93 [IOTDB-6297] Optimize the distribute plan in the situation
of `aggregation with align by device`
f5b88fb0e93 is described below
commit f5b88fb0e932c01ff601a26c829be7429efb1f38
Author: Beyyes <[email protected]>
AuthorDate: Thu Feb 22 14:31:22 2024 +0800
[IOTDB-6297] Optimize the distribute plan in the situation of `aggregation
with align by device`
---
.../db/queryengine/plan/analyze/Analysis.java | 11 +
.../plan/planner/LogicalPlanBuilder.java | 1 -
.../planner/distribution/DistributionPlanner.java | 2 +-
.../planner/distribution/ExchangeNodeAdder.java | 62 +--
.../planner/distribution/NodeDistribution.java | 24 +-
.../plan/planner/distribution/SourceRewriter.java | 328 +++++++++------
...anNodeRewriter.java => BaseSourceRewriter.java} | 2 +-
.../plan/planner/plan/node/PlanGraphPrinter.java | 9 +
.../plan/planner/plan/node/PlanNode.java | 2 +-
.../plan/planner/plan/node/PlanNodeType.java | 7 +-
.../plan/planner/plan/node/PlanVisitor.java | 5 +
.../node/process/AggregationMergeSortNode.java | 143 +++++++
.../operator/AggregationOperatorTest.java | 17 +-
.../distribution/AggregationAlignByDeviceTest.java | 451 +++++++++++++++++++++
.../distribution/AggregationDistributionTest.java | 18 +-
15 files changed, 902 insertions(+), 180 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
index 4e9c5ec6b15..3014cdec431 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
@@ -171,8 +171,11 @@ public class Analysis {
// indicates whether DeviceView need special process when rewriteSource in
DistributionPlan,
// you can see SourceRewriter#visitDeviceView to get more information
+ // deviceViewSpecialProcess equals true when all Aggregation Functions and
DIFF
private boolean deviceViewSpecialProcess;
+ private boolean existDeviceCrossRegion;
+
/////////////////////////////////////////////////////////////////////////////////////////////////
// Query Common Analysis (above DeviceView)
/////////////////////////////////////////////////////////////////////////////////////////////////
@@ -655,6 +658,14 @@ public class Analysis {
this.deviceViewSpecialProcess = deviceViewSpecialProcess;
}
+ public boolean isExistDeviceCrossRegion() {
+ return existDeviceCrossRegion;
+ }
+
+ public void setExistDeviceCrossRegion() {
+ this.existDeviceCrossRegion = true;
+ }
+
public DeviceViewIntoPathDescriptor getDeviceViewIntoPathDescriptor() {
return deviceViewIntoPathDescriptor;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
index f72f28b3d93..0aebfcab1c7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
@@ -855,7 +855,6 @@ public class LogicalPlanBuilder {
-1);
this.root = mergeSortNode;
} else {
- // order by based on device, use DeviceViewNode
this.root =
addDeviceViewNode(
orderByParameter,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
index aa7d14608c7..755119b2607 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
@@ -144,7 +144,7 @@ public class DistributionPlanner {
if (child instanceof ExchangeNode) {
ExchangeNode exchangeNode = (ExchangeNode) child;
TRegionReplicaSet regionOfChild =
-
context.getNodeDistribution(exchangeNode.getChild().getPlanNodeId()).region;
+
context.getNodeDistribution(exchangeNode.getChild().getPlanNodeId()).getRegion();
MultiChildrenSinkNode newChild =
memo.computeIfAbsent(
regionOfChild,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
index 06043bd2e41..a8025719967 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
@@ -32,6 +32,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.Sche
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryOrderByHeatNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceMergeNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
@@ -115,13 +116,14 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
NodeDistribution nodeDistribution =
new NodeDistribution(NodeDistributionType.DIFFERENT_FROM_ALL_CHILDREN);
PlanNode newNode = node.clone();
- nodeDistribution.region =
calculateSchemaRegionByChildren(node.getChildren(), context);
+
nodeDistribution.setRegion(calculateSchemaRegionByChildren(node.getChildren(),
context));
context.putNodeDistribution(newNode.getPlanNodeId(), nodeDistribution);
node.getChildren()
.forEach(
child -> {
- if (!nodeDistribution.region.equals(
- context.getNodeDistribution(child.getPlanNodeId()).region)) {
+ if (!nodeDistribution
+ .getRegion()
+
.equals(context.getNodeDistribution(child.getPlanNodeId()).getRegion())) {
ExchangeNode exchangeNode =
new
ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
exchangeNode.setChild(child);
@@ -200,6 +202,12 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
return processMultiChildNode(node, context);
}
+ @Override
+ public PlanNode visitAggregationMergeSort(
+ AggregationMergeSortNode node, NodeGroupContext context) {
+ return processMultiChildNode(node, context);
+ }
+
@Override
public PlanNode visitDeviceMerge(DeviceMergeNode node, NodeGroupContext
context) {
return processMultiChildNode(node, context);
@@ -265,7 +273,7 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
// else we set the selected mostlyUsedDataRegion to this node
dataRegion =
isChildrenDistributionSame
- ? context.getNodeDistribution(leftChild.getPlanNodeId()).region
+ ?
context.getNodeDistribution(leftChild.getPlanNodeId()).getRegion()
: context.getMostlyUsedDataRegion();
context.putNodeDistribution(
newNode.getPlanNodeId(), new NodeDistribution(distributionType,
dataRegion));
@@ -284,7 +292,7 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
// Otherwise, we need to add ExchangeNode for the child whose DataRegion
is different from the
// parent.
- if
(!dataRegion.equals(context.getNodeDistribution(leftChild.getPlanNodeId()).region))
{
+ if
(!dataRegion.equals(context.getNodeDistribution(leftChild.getPlanNodeId()).getRegion()))
{
if (leftChild instanceof SingleDeviceViewNode) {
((SingleDeviceViewNode) leftChild).setCacheOutputColumnNames(true);
}
@@ -298,7 +306,7 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
newNode.setLeftChild(leftChild);
}
- if
(!dataRegion.equals(context.getNodeDistribution(rightChild.getPlanNodeId()).region))
{
+ if
(!dataRegion.equals(context.getNodeDistribution(rightChild.getPlanNodeId()).getRegion()))
{
if (rightChild instanceof SingleDeviceViewNode) {
((SingleDeviceViewNode) rightChild).setCacheOutputColumnNames(true);
}
@@ -377,7 +385,7 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
.map(child -> visit(child, context))
.collect(Collectors.toList());
- // DataRegion which node locates
+ // DataRegion in which node locates
TRegionReplicaSet dataRegion;
boolean isChildrenDistributionSame =
nodeDistributionIsSame(visitedChildren, context);
NodeDistributionType distributionType =
@@ -390,7 +398,7 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
// else we set the selected mostlyUsedDataRegion to this node
dataRegion =
isChildrenDistributionSame
- ?
context.getNodeDistribution(visitedChildren.get(0).getPlanNodeId()).region
+ ?
context.getNodeDistribution(visitedChildren.get(0).getPlanNodeId()).getRegion()
: context.getMostlyUsedDataRegion();
context.putNodeDistribution(
newNode.getPlanNodeId(), new NodeDistribution(distributionType,
dataRegion));
@@ -409,21 +417,17 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
// optimize `order by time|expression limit N align by device` query,
// to ensure that the number of ExchangeNode equals to DataRegionNum but
not equals to DeviceNum
if (node instanceof TopKNode) {
- return processTopNode(node, visitedChildren, context, newNode,
dataRegion);
+ return processTopKNode(node, visitedChildren, context, newNode,
dataRegion);
}
// Otherwise, we need to add ExchangeNode for the child whose DataRegion
is different from the
// parent.
for (PlanNode child : visitedChildren) {
- if
(!dataRegion.equals(context.getNodeDistribution(child.getPlanNodeId()).region))
{
+ if
(!dataRegion.equals(context.getNodeDistribution(child.getPlanNodeId()).getRegion()))
{
if (child instanceof SingleDeviceViewNode) {
((SingleDeviceViewNode) child).setCacheOutputColumnNames(true);
}
- ExchangeNode exchangeNode =
- new
ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
- exchangeNode.setChild(child);
- exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
- context.hasExchangeNode = true;
+ ExchangeNode exchangeNode = genExchangeNode(context, child);
newNode.addChild(exchangeNode);
} else {
newNode.addChild(child);
@@ -450,7 +454,7 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
return newNode;
}
- private PlanNode processTopNode(
+ private PlanNode processTopKNode(
MultiChildProcessNode node,
List<PlanNode> visitedChildren,
NodeGroupContext context,
@@ -459,7 +463,7 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
TopKNode rootNode = (TopKNode) node;
Map<TRegionReplicaSet, TopKNode> regionTopKNodeMap = new HashMap<>();
for (PlanNode child : visitedChildren) {
- TRegionReplicaSet region =
context.getNodeDistribution(child.getPlanNodeId()).region;
+ TRegionReplicaSet region =
context.getNodeDistribution(child.getPlanNodeId()).getRegion();
regionTopKNodeMap
.computeIfAbsent(
region,
@@ -483,11 +487,7 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
TopKNode topKNode = entry.getValue();
if (!dataRegion.equals(topKNodeLocatedRegion)) {
- ExchangeNode exchangeNode =
- new
ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
- exchangeNode.setChild(topKNode);
- exchangeNode.setOutputColumnNames(topKNode.getOutputColumnNames());
- context.hasExchangeNode = true;
+ ExchangeNode exchangeNode = genExchangeNode(context, topKNode);
newNode.addChild(exchangeNode);
} else {
newNode.addChild(topKNode);
@@ -496,6 +496,14 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
return newNode;
}
+ private ExchangeNode genExchangeNode(NodeGroupContext context, PlanNode
child) {
+ ExchangeNode exchangeNode = new
ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
+ exchangeNode.setChild(child);
+ exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
+ context.hasExchangeNode = true;
+ return exchangeNode;
+ }
+
@Override
public PlanNode visitSlidingWindowAggregation(
SlidingWindowAggregationNode node, NodeGroupContext context) {
@@ -506,7 +514,7 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
PlanNode newNode = node.clone();
PlanNode child = visit(node.getChildren().get(0), context);
newNode.addChild(child);
- TRegionReplicaSet dataRegion =
context.getNodeDistribution(child.getPlanNodeId()).region;
+ TRegionReplicaSet dataRegion =
context.getNodeDistribution(child.getPlanNodeId()).getRegion();
context.putNodeDistribution(
newNode.getPlanNodeId(),
new NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN,
dataRegion));
@@ -523,9 +531,9 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
Collectors.groupingBy(
child -> {
TRegionReplicaSet region =
-
context.getNodeDistribution(child.getPlanNodeId()).region;
+
context.getNodeDistribution(child.getPlanNodeId()).getRegion();
if (region == null
- &&
context.getNodeDistribution(child.getPlanNodeId()).type
+ &&
context.getNodeDistribution(child.getPlanNodeId()).getType()
== NodeDistributionType.SAME_WITH_ALL_CHILDREN) {
return
calculateSchemaRegionByChildren(child.getChildren(), context);
}
@@ -571,7 +579,7 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
private TRegionReplicaSet calculateSchemaRegionByChildren(
List<PlanNode> children, NodeGroupContext context) {
// We always make the schemaRegion of MetaMergeNode to be the same as its
first child.
- return context.getNodeDistribution(children.get(0).getPlanNodeId()).region;
+ return
context.getNodeDistribution(children.get(0).getPlanNodeId()).getRegion();
}
private boolean nodeDistributionIsSame(List<PlanNode> children,
NodeGroupContext context) {
@@ -579,7 +587,7 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
NodeDistribution first =
context.getNodeDistribution(children.get(0).getPlanNodeId());
for (int i = 1; i < children.size(); i++) {
NodeDistribution next =
context.getNodeDistribution(children.get(i).getPlanNodeId());
- if (first.region == null || !first.region.equals(next.region)) {
+ if (first.getRegion() == null ||
!first.getRegion().equals(next.getRegion())) {
return false;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeDistribution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeDistribution.java
index 4b2ca213c6d..2c793c2c39e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeDistribution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeDistribution.java
@@ -22,15 +22,31 @@ package
org.apache.iotdb.db.queryengine.plan.planner.distribution;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
public class NodeDistribution {
- protected NodeDistributionType type;
- protected TRegionReplicaSet region;
+ private NodeDistributionType type;
+ private TRegionReplicaSet region;
- protected NodeDistribution(NodeDistributionType type, TRegionReplicaSet
region) {
+ public NodeDistribution(NodeDistributionType type, TRegionReplicaSet region)
{
this.type = type;
this.region = region;
}
- protected NodeDistribution(NodeDistributionType type) {
+ public NodeDistribution(NodeDistributionType type) {
this.type = type;
}
+
+ public NodeDistributionType getType() {
+ return this.type;
+ }
+
+ public void setType(NodeDistributionType type) {
+ this.type = type;
+ }
+
+ public TRegionReplicaSet getRegion() {
+ return this.region;
+ }
+
+ public void setRegion(TRegionReplicaSet region) {
+ this.region = region;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index 7b5a82062f9..53814483e4c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -30,14 +30,15 @@ import
org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanBuilder;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.BaseSourceRewriter;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.SimplePlanNodeRewriter;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.CountSchemaMergeNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaFetchMergeNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaFetchScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByLevelNode;
@@ -49,6 +50,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChild
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.InnerTimeJoinNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
@@ -85,7 +87,7 @@ import java.util.stream.Collectors;
import static
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
-public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanContext> {
+public class SourceRewriter extends
BaseSourceRewriter<DistributionPlanContext> {
private final Analysis analysis;
@@ -170,37 +172,66 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
"size of devices and its children in DeviceViewNode should be same");
}
- // If the DeviceView is mixed with Function that need to merge data from
different Data Region,
- // it should be processed by a special logic.
- // Now the Functions are : all Aggregation Functions and DIFF
- if (analysis.isDeviceViewSpecialProcess()) {
- return processSpecialDeviceView(node, context);
- }
-
+ // Step 1: constructs DeviceViewSplits
Set<TRegionReplicaSet> relatedDataRegions = new HashSet<>();
-
List<DeviceViewSplit> deviceViewSplits = new ArrayList<>();
- // Step 1: constructs DeviceViewSplit
- Map<String, String> outputDeviceToQueriedDevicesMap =
- analysis.getOutputDeviceToQueriedDevicesMap();
for (int i = 0; i < node.getDevices().size(); i++) {
String outputDevice = node.getDevices().get(i);
PlanNode child = node.getChildren().get(i);
List<TRegionReplicaSet> regionReplicaSets =
- !analysis.useLogicalView()
+ analysis.useLogicalView()
? new ArrayList<>(
- analysis.getPartitionInfo(outputDevice,
context.getPartitionTimeFilter()))
- : new ArrayList<>(
analysis.getPartitionInfo(
- outputDeviceToQueriedDevicesMap.get(outputDevice),
- context.getPartitionTimeFilter()));
+
analysis.getOutputDeviceToQueriedDevicesMap().get(outputDevice),
+ context.getPartitionTimeFilter()))
+ : new ArrayList<>(
+ analysis.getPartitionInfo(outputDevice,
context.getPartitionTimeFilter()));
+ if (regionReplicaSets.size() > 1) {
+ // specialProcess and existDeviceCrossRegion, use the old aggregation
logic
+ analysis.setExistDeviceCrossRegion();
+ if (analysis.isDeviceViewSpecialProcess()) {
+ return processSpecialDeviceView(node, context);
+ }
+ }
deviceViewSplits.add(new DeviceViewSplit(outputDevice, child,
regionReplicaSets));
relatedDataRegions.addAll(regionReplicaSets);
}
// Step 2: Iterate all partition and create DeviceViewNode for each region
List<PlanNode> deviceViewNodeList = new ArrayList<>();
+ if (analysis.isExistDeviceCrossRegion()) {
+ constructDeviceViewNodeListWithCrossRegion(
+ deviceViewNodeList, relatedDataRegions, deviceViewSplits, node,
context);
+ } else {
+ constructDeviceViewNodeListWithoutCrossRegion(
+ deviceViewNodeList, deviceViewSplits, node, context, analysis);
+ }
+
+ // 1. Only one DeviceViewNode, the is no need to use MergeSortNode.
+ // 2. for DeviceView+SortNode case, the parent of DeviceViewNode will be
SortNode, MergeSortNode
+ // will be generated in {@link #visitSort}.
+ // 3. for DeviceView+TopKNode case, there is no need MergeSortNode,
TopKNode can output the
+ // sorted result.
+ if (deviceViewNodeList.size() == 1 || analysis.isHasSortNode() ||
analysis.isUseTopKNode()) {
+ return deviceViewNodeList;
+ }
+
+ MergeSortNode mergeSortNode =
+ new MergeSortNode(
+ context.queryContext.getQueryId().genPlanNodeId(),
+ node.getMergeOrderParameter(),
+ node.getOutputColumnNames());
+ deviceViewNodeList.forEach(mergeSortNode::addChild);
+ return Collections.singletonList(mergeSortNode);
+ }
+
+ private void constructDeviceViewNodeListWithCrossRegion(
+ List<PlanNode> deviceViewNodeList,
+ Set<TRegionReplicaSet> relatedDataRegions,
+ List<DeviceViewSplit> deviceViewSplits,
+ DeviceViewNode node,
+ DistributionPlanContext context) {
for (TRegionReplicaSet regionReplicaSet : relatedDataRegions) {
List<String> devices = new ArrayList<>();
List<PlanNode> children = new ArrayList<>();
@@ -216,20 +247,49 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
}
deviceViewNodeList.add(regionDeviceViewNode);
}
+ }
- if (deviceViewNodeList.size() == 1 || analysis.isHasSortNode() ||
analysis.isUseTopKNode()) {
- return deviceViewNodeList;
- }
+ private void constructDeviceViewNodeListWithoutCrossRegion(
+ List<PlanNode> deviceViewNodeList,
+ List<DeviceViewSplit> deviceViewSplits,
+ DeviceViewNode node,
+ DistributionPlanContext context,
+ Analysis analysis) {
- MergeSortNode mergeSortNode =
- new MergeSortNode(
- context.queryContext.getQueryId().genPlanNodeId(),
- node.getMergeOrderParameter(),
- node.getOutputColumnNames());
- for (PlanNode deviceViewNode : deviceViewNodeList) {
- mergeSortNode.addChild(deviceViewNode);
+ Map<TRegionReplicaSet, DeviceViewNode> regionDeviceViewMap = new
HashMap<>();
+ for (DeviceViewSplit split : deviceViewSplits) {
+ if (split.dataPartitions.size() != 1) {
+ throw new IllegalStateException(
+ "In non-cross data region device-view situation, "
+ + "each device should only have on data partition.");
+ }
+ TRegionReplicaSet region = split.dataPartitions.iterator().next();
+ DeviceViewNode regionDeviceViewNode =
+ regionDeviceViewMap.computeIfAbsent(
+ region,
+ k -> {
+ DeviceViewNode deviceViewNode =
cloneDeviceViewNodeWithoutChild(node, context);
+ deviceViewNodeList.add(deviceViewNode);
+ return deviceViewNode;
+ });
+ PlanNode childNode = split.buildPlanNodeInRegion(region,
context.queryContext);
+ if (analysis.isDeviceViewSpecialProcess()) {
+ List<PlanNode> rewriteResult = rewrite(childNode, context);
+ if (rewriteResult.size() != 1) {
+ throw new IllegalStateException(
+ "In non-cross data region aggregation device-view situation, "
+ + "each rewrite child node of DeviceView should only be
one.");
+ }
+ childNode = rewriteResult.get(0);
+ }
+ regionDeviceViewNode.addChildDeviceNode(split.device, childNode);
}
- return Collections.singletonList(mergeSortNode);
+ }
+
+ @Override
+ public List<PlanNode> visitAggregationMergeSort(
+ AggregationMergeSortNode node, DistributionPlanContext context) {
+ return null;
}
private List<PlanNode> processSpecialDeviceView(
@@ -253,41 +313,35 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
node.getDeviceToMeasurementIndexesMap());
}
- private static class DeviceViewSplit {
- protected String device;
- protected PlanNode root;
- protected Set<TRegionReplicaSet> dataPartitions;
-
- protected DeviceViewSplit(
- String device, PlanNode root, List<TRegionReplicaSet> dataPartitions) {
- this.device = device;
- this.root = root;
- this.dataPartitions = new HashSet<>();
- this.dataPartitions.addAll(dataPartitions);
- }
-
- protected PlanNode buildPlanNodeInRegion(
- TRegionReplicaSet regionReplicaSet, MPPQueryContext context) {
- return buildPlanNodeInRegion(this.root, regionReplicaSet, context);
+ @Override
+ public List<PlanNode> visitTransform(TransformNode node,
DistributionPlanContext context) {
+ List<PlanNode> children = rewrite(node.getChild(), context);
+ if (children.size() == 1) {
+ node.setChild(children.get(0));
+ return Collections.singletonList(node);
}
- protected boolean needDistributeTo(TRegionReplicaSet regionReplicaSet) {
- return this.dataPartitions.contains(regionReplicaSet);
- }
+ // for some query such as `select avg(s1)+avg(s2) from root.** where
count(s3)>1 align by
+ // device`,
+ // the children of TransformNode may be N(N>1) DeviceViewNodes, so need
return N TransformNodes
+ return children.stream()
+ .map(
+ child -> {
+ TransformNode transformNode =
cloneTransformNodeWithOutChild(node, context);
+ transformNode.addChild(child);
+ return transformNode;
+ })
+ .collect(Collectors.toList());
+ }
- private PlanNode buildPlanNodeInRegion(
- PlanNode root, TRegionReplicaSet regionReplicaSet, MPPQueryContext
context) {
- List<PlanNode> children =
- root.getChildren().stream()
- .map(child -> buildPlanNodeInRegion(child, regionReplicaSet,
context))
- .collect(Collectors.toList());
- PlanNode newRoot = root.cloneWithChildren(children);
- newRoot.setPlanNodeId(context.getQueryId().genPlanNodeId());
- if (newRoot instanceof SourceNode) {
- ((SourceNode) newRoot).setRegionReplicaSet(regionReplicaSet);
- }
- return newRoot;
- }
+ private TransformNode cloneTransformNodeWithOutChild(
+ TransformNode node, DistributionPlanContext context) {
+ return new TransformNode(
+ context.queryContext.getQueryId().genPlanNodeId(),
+ node.getOutputExpressions(),
+ node.isKeepNull(),
+ node.getZoneId(),
+ node.getScanOrder());
}
@Override
@@ -347,23 +401,21 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
}
});
schemaRegions.forEach(
- region -> {
- addSchemaSourceNode(
- root,
- seed.getPath(),
- region,
- context.queryContext.getQueryId().genPlanNodeId(),
- seed);
- });
+ region ->
+ addSchemaSourceNode(
+ root,
+ seed.getPath(),
+ region,
+ context.queryContext.getQueryId().genPlanNodeId(),
+ seed));
regionsOfSystemDatabase.forEach(
- region -> {
- addSchemaSourceNode(
- root,
- seed.getPath(),
- region,
- context.queryContext.getQueryId().genPlanNodeId(),
- seed);
- });
+ region ->
+ addSchemaSourceNode(
+ root,
+ seed.getPath(),
+ region,
+ context.queryContext.getQueryId().genPlanNodeId(),
+ seed));
} else {
// the path pattern may only overlap with part of storageGroup or
storageGroup.**, need filter
PathPatternTree patternTree = new PathPatternTree();
@@ -394,31 +446,29 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
List<PartialPath> filteredPathPatternList =
filterPathPattern(patternTree, storageGroup);
schemaRegionSet.forEach(
- region -> {
- addSchemaSourceNode(
- root,
- filteredPathPatternList.size() == 1
- ? filteredPathPatternList.get(0)
- : seed.getPath(),
- region,
- context.queryContext.getQueryId().genPlanNodeId(),
- seed);
- });
+ region ->
+ addSchemaSourceNode(
+ root,
+ filteredPathPatternList.size() == 1
+ ? filteredPathPatternList.get(0)
+ : seed.getPath(),
+ region,
+ context.queryContext.getQueryId().genPlanNodeId(),
+ seed));
});
if (!regionsOfSystemDatabase.isEmpty()) {
List<PartialPath> filteredPathPatternList =
filterPathPattern(patternTree, SchemaConstant.SYSTEM_DATABASE);
regionsOfSystemDatabase.forEach(
- region -> {
- addSchemaSourceNode(
- root,
- filteredPathPatternList.size() == 1
- ? filteredPathPatternList.get(0)
- : seed.getPath(),
- region,
- context.queryContext.getQueryId().genPlanNodeId(),
- seed);
- });
+ region ->
+ addSchemaSourceNode(
+ root,
+ filteredPathPatternList.size() == 1
+ ? filteredPathPatternList.get(0)
+ : seed.getPath(),
+ region,
+ context.queryContext.getQueryId().genPlanNodeId(),
+ seed));
}
}
return Collections.singletonList(root);
@@ -462,11 +512,10 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
.getSchemaPartitionInfo()
.getSchemaPartitionMap()
.forEach(
- (storageGroup, deviceGroup) -> {
- deviceGroup.forEach(
- (deviceGroupId, schemaRegionReplicaSet) ->
- schemaRegions.add(schemaRegionReplicaSet));
- });
+ (storageGroup, deviceGroup) ->
+ deviceGroup.forEach(
+ (deviceGroupId, schemaRegionReplicaSet) ->
+ schemaRegions.add(schemaRegionReplicaSet)));
schemaRegions.forEach(
region -> {
SchemaQueryScanNode schemaQueryScanNode = (SchemaQueryScanNode)
seed.clone();
@@ -564,14 +613,13 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
List<AggregationDescriptor> leafAggDescriptorList = new ArrayList<>();
node.getAggregationDescriptorList()
.forEach(
- descriptor -> {
- leafAggDescriptorList.add(
- new AggregationDescriptor(
- descriptor.getAggregationFuncName(),
- AggregationStep.PARTIAL,
- descriptor.getInputExpressions(),
- descriptor.getInputAttributes()));
- });
+ descriptor ->
+ leafAggDescriptorList.add(
+ new AggregationDescriptor(
+ descriptor.getAggregationFuncName(),
+ AggregationStep.PARTIAL,
+ descriptor.getInputExpressions(),
+ descriptor.getInputAttributes())));
leafAggDescriptorList.forEach(
d ->
LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
@@ -579,14 +627,13 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
node.getAggregationDescriptorList()
.forEach(
- descriptor -> {
- rootAggDescriptorList.add(
- new AggregationDescriptor(
- descriptor.getAggregationFuncName(),
- context.isRoot ? AggregationStep.FINAL :
AggregationStep.INTERMEDIATE,
- descriptor.getInputExpressions(),
- descriptor.getInputAttributes()));
- });
+ descriptor ->
+ rootAggDescriptorList.add(
+ new AggregationDescriptor(
+ descriptor.getAggregationFuncName(),
+ context.isRoot ? AggregationStep.FINAL :
AggregationStep.INTERMEDIATE,
+ descriptor.getInputExpressions(),
+ descriptor.getInputAttributes())));
AggregationNode aggregationNode =
new AggregationNode(
@@ -1436,14 +1483,14 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
source
.getAggregationDescriptorList()
.forEach(
- d -> {
+ descriptor -> {
if (isSingle) {
- d.setStep(AggregationStep.SINGLE);
+ descriptor.setStep(AggregationStep.SINGLE);
} else {
eachSeriesOneRegion[0] = false;
- d.setStep(AggregationStep.PARTIAL);
+ descriptor.setStep(AggregationStep.PARTIAL);
LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
- d, context.queryContext.getTypeProvider());
+ descriptor, context.queryContext.getTypeProvider());
}
});
}
@@ -1475,4 +1522,41 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
public List<PlanNode> visit(PlanNode node, DistributionPlanContext context) {
return node.accept(this, context);
}
+
+ private static class DeviceViewSplit {
+ protected String device;
+ protected PlanNode root;
+ protected Set<TRegionReplicaSet> dataPartitions;
+
+ protected DeviceViewSplit(
+ String device, PlanNode root, List<TRegionReplicaSet> dataPartitions) {
+ this.device = device;
+ this.root = root;
+ this.dataPartitions = new HashSet<>();
+ this.dataPartitions.addAll(dataPartitions);
+ }
+
+ protected PlanNode buildPlanNodeInRegion(
+ TRegionReplicaSet regionReplicaSet, MPPQueryContext context) {
+ return buildPlanNodeInRegion(this.root, regionReplicaSet, context);
+ }
+
+ protected boolean needDistributeTo(TRegionReplicaSet regionReplicaSet) {
+ return this.dataPartitions.contains(regionReplicaSet);
+ }
+
+ private PlanNode buildPlanNodeInRegion(
+ PlanNode root, TRegionReplicaSet regionReplicaSet, MPPQueryContext
context) {
+ List<PlanNode> children =
+ root.getChildren().stream()
+ .map(child -> buildPlanNodeInRegion(child, regionReplicaSet,
context))
+ .collect(Collectors.toList());
+ PlanNode newRoot = root.cloneWithChildren(children);
+ newRoot.setPlanNodeId(context.getQueryId().genPlanNodeId());
+ if (newRoot instanceof SourceNode) {
+ ((SourceNode) newRoot).setRegionReplicaSet(regionReplicaSet);
+ }
+ return newRoot;
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/SimplePlanNodeRewriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/BaseSourceRewriter.java
similarity index 95%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/SimplePlanNodeRewriter.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/BaseSourceRewriter.java
index 42329085405..11f8afd6448 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/SimplePlanNodeRewriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/BaseSourceRewriter.java
@@ -24,7 +24,7 @@ import java.util.List;
import static com.google.common.collect.ImmutableList.toImmutableList;
-public class SimplePlanNodeRewriter<C> extends PlanVisitor<List<PlanNode>, C> {
+public class BaseSourceRewriter<C> extends PlanVisitor<List<PlanNode>, C> {
@Override
public List<PlanNode> visitPlan(PlanNode node, C context) {
// TODO: (xingtanzjr) we apply no action for IWritePlanNode currently
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
index fcebd9b0c66..7bfd680c40e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceMergeNode;
@@ -188,6 +189,14 @@ public class PlanGraphPrinter extends
PlanVisitor<List<String>, PlanGraphPrinter
return render(node, boxValue, context);
}
+ @Override
+ public List<String> visitAggregationMergeSort(
+ AggregationMergeSortNode node, GraphContext context) {
+ List<String> boxValue = new ArrayList<>();
+ boxValue.add(String.format("AggregationMergeSort-%s",
node.getPlanNodeId().getId()));
+ return render(node, boxValue, context);
+ }
+
@Override
public List<String> visitMergeSort(MergeSortNode node, GraphContext context)
{
List<String> boxValue = new ArrayList<>();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
index 6d13dde35ff..d147d316318 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
@@ -87,7 +87,7 @@ public abstract class PlanNode implements IConsensusRequest {
*/
public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
throw new UnsupportedOperationException(
- String.format("Can't create subNode for %s",
this.getClass().toString()));
+ String.format("Can't create subNode for %s", this.getClass()));
}
public PlanNode cloneWithChildren(List<PlanNode> children) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 5eb722c7422..3b40eb38a1d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -60,6 +60,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedC
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWriteSchemaNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceMergeNode;
@@ -200,7 +201,9 @@ public enum PlanNodeType {
PIPE_ENRICHED_DELETE_SCHEMA((short) 86),
INNER_TIME_JOIN((short) 87),
- LEFT_OUTER_TIME_JOIN((short) 88);
+ LEFT_OUTER_TIME_JOIN((short) 88),
+
+ AGG_MERGE_SORT((short) 89);
public static final int BYTES = Short.BYTES;
@@ -425,6 +428,8 @@ public enum PlanNodeType {
return InnerTimeJoinNode.deserialize(buffer);
case 88:
return LeftOuterTimeJoinNode.deserialize(buffer);
+ case 89:
+ return AggregationMergeSortNode.deserialize(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index ed7072a76eb..1091d9a352a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -57,6 +57,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedC
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWriteSchemaNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceMergeNode;
@@ -231,6 +232,10 @@ public abstract class PlanVisitor<R, C> {
return visitMultiChildProcess(node, context);
}
+ public R visitAggregationMergeSort(AggregationMergeSortNode node, C context)
{
+ return visitMultiChildProcess(node, context);
+ }
+
public R visitDeviceMerge(DeviceMergeNode node, C context) {
return visitMultiChildProcess(node, context);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
new file mode 100644
index 00000000000..41c4cfb2b55
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.process;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.OrderByParameter;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class AggregationMergeSortNode extends MultiChildProcessNode {
+
+ private final OrderByParameter mergeOrderParameter;
+
+ private final List<String> outputColumns;
+
+ public AggregationMergeSortNode(
+ PlanNodeId id, OrderByParameter mergeOrderParameter, List<String>
outputColumns) {
+ super(id);
+ this.mergeOrderParameter = mergeOrderParameter;
+ this.outputColumns = outputColumns;
+ }
+
+ public AggregationMergeSortNode(
+ PlanNodeId id,
+ List<PlanNode> children,
+ OrderByParameter mergeOrderParameter,
+ List<String> outputColumns) {
+ super(id, children);
+ this.mergeOrderParameter = mergeOrderParameter;
+ this.outputColumns = outputColumns;
+ }
+
+ public OrderByParameter getMergeOrderParameter() {
+ return mergeOrderParameter;
+ }
+
+ @Override
+ public PlanNode clone() {
+ return new AggregationMergeSortNode(getPlanNodeId(),
getMergeOrderParameter(), outputColumns);
+ }
+
+ @Override
+ public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+ return new AggregationMergeSortNode(
+ new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+ new ArrayList<>(children.subList(startIndex, endIndex)),
+ getMergeOrderParameter(),
+ outputColumns);
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return outputColumns;
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitAggregationMergeSort(this, context);
+ }
+
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.AGG_MERGE_SORT.serialize(byteBuffer);
+ mergeOrderParameter.serializeAttributes(byteBuffer);
+ ReadWriteIOUtils.write(outputColumns.size(), byteBuffer);
+ for (String column : outputColumns) {
+ ReadWriteIOUtils.write(column, byteBuffer);
+ }
+ }
+
+ @Override
+ protected void serializeAttributes(DataOutputStream stream) throws
IOException {
+ PlanNodeType.AGG_MERGE_SORT.serialize(stream);
+ mergeOrderParameter.serializeAttributes(stream);
+ ReadWriteIOUtils.write(outputColumns.size(), stream);
+ for (String column : outputColumns) {
+ ReadWriteIOUtils.write(column, stream);
+ }
+ }
+
+ public static AggregationMergeSortNode deserialize(ByteBuffer byteBuffer) {
+ OrderByParameter orderByParameter =
OrderByParameter.deserialize(byteBuffer);
+ int columnSize = ReadWriteIOUtils.readInt(byteBuffer);
+ List<String> outputColumns = new ArrayList<>();
+ while (columnSize > 0) {
+ outputColumns.add(ReadWriteIOUtils.readString(byteBuffer));
+ columnSize--;
+ }
+ PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+ return new AggregationMergeSortNode(planNodeId, orderByParameter,
outputColumns);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ AggregationMergeSortNode that = (AggregationMergeSortNode) o;
+ return Objects.equals(mergeOrderParameter, that.getMergeOrderParameter());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), mergeOrderParameter);
+ }
+
+ @Override
+ public String toString() {
+ return "AggregationMergeSort-" + this.getPlanNodeId();
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AggregationOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AggregationOperatorTest.java
index ee98da4047d..c7644217bd3 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AggregationOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AggregationOperatorTest.java
@@ -125,9 +125,8 @@ public class AggregationOperatorTest {
if (!aggregationOperator.hasNext()) {
break;
}
- TsBlock resultTsBlock = null;
- resultTsBlock = aggregationOperator.next();
+ TsBlock resultTsBlock = aggregationOperator.next();
if (resultTsBlock == null) {
continue;
}
@@ -169,8 +168,7 @@ public class AggregationOperatorTest {
if (!aggregationOperator.hasNext()) {
break;
}
- TsBlock resultTsBlock = null;
- resultTsBlock = aggregationOperator.next();
+ TsBlock resultTsBlock = aggregationOperator.next();
if (resultTsBlock == null) {
continue;
}
@@ -221,8 +219,7 @@ public class AggregationOperatorTest {
if (!aggregationOperator.hasNext()) {
break;
}
- TsBlock resultTsBlock = null;
- resultTsBlock = aggregationOperator.next();
+ TsBlock resultTsBlock = aggregationOperator.next();
if (resultTsBlock == null) {
continue;
}
@@ -275,8 +272,7 @@ public class AggregationOperatorTest {
if (!aggregationOperator.hasNext()) {
break;
}
- TsBlock resultTsBlock = null;
- resultTsBlock = aggregationOperator.next();
+ TsBlock resultTsBlock = aggregationOperator.next();
if (resultTsBlock == null) {
continue;
}
@@ -321,10 +317,7 @@ public class AggregationOperatorTest {
driverContext.addOperatorContext(3, planNodeId3,
AggregationOperator.class.getSimpleName());
driverContext
.getOperatorContexts()
- .forEach(
- operatorContext -> {
- operatorContext.setMaxRunTime(TEST_TIME_SLICE);
- });
+ .forEach(operatorContext ->
OperatorContext.setMaxRunTime(TEST_TIME_SLICE));
MeasurementPath measurementPath1 =
new MeasurementPath(AGGREGATION_OPERATOR_TEST_SG + ".device0.sensor0",
TSDataType.INT32);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationAlignByDeviceTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationAlignByDeviceTest.java
new file mode 100644
index 00000000000..0ed46adfedb
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationAlignByDeviceTest.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.distribution;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.HorizontallyConcatNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class AggregationAlignByDeviceTest {
+ private static final long LIMIT_VALUE = 10;
+
+ QueryId queryId = new QueryId("test");
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+ String sql;
+ Analysis analysis;
+ PlanNode logicalPlanNode;
+ DistributionPlanner planner;
+ DistributedQueryPlan plan;
+ PlanNode firstFiRoot;
+ PlanNode secondFiRoot;
+ PlanNode firstFiTopNode;
+
+ // ================= no scaling situation, i.e., each device only in one
data region ===========
+
+ /*
+ * IdentitySinkNode-10
+ * └──MergeSort-7
+ * ├──DeviceView-5
+ * │ └──SeriesAggregationScanNode-1:[SeriesPath: root.sg.d22.s1,
+ * Descriptor: [AggregationDescriptor(first_value, SINGLE)],
+ * DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
+ * └──ExchangeNode-8: [SourceAddress:192.0.4.1/test.2.0/9]
+ *
+ * IdentitySinkNode-9
+ * └──DeviceView-6
+ * └──SeriesAggregationScanNode-2:[SeriesPath: root.sg.d55555.s1,
+ * Descriptor: [AggregationDescriptor(first_value, SINGLE)],
+ * DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
+ */
+ @Test
+ public void orderByDeviceTest1() {
+ // one aggregation measurement, two devices
+ sql = "select first_value(s1) from root.sg.d22, root.sg.d55555 align by
device";
+ analysis = Util.analyze(sql, context);
+ logicalPlanNode = Util.genLogicalPlan(analysis, context);
+ planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context,
logicalPlanNode));
+ plan = planner.planFragments();
+ assertEquals(2, plan.getInstances().size());
+
+ firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+ assertTrue(firstFiRoot instanceof IdentitySinkNode);
+ firstFiTopNode = firstFiRoot.getChildren().get(0);
+ assertTrue(firstFiTopNode instanceof MergeSortNode);
+ assertTrue(firstFiTopNode.getChildren().get(0) instanceof DeviceViewNode);
+ assertTrue(
+ firstFiTopNode.getChildren().get(0).getChildren().get(0)
+ instanceof SeriesAggregationScanNode);
+
+ secondFiRoot = plan.getInstances().get(1).getFragment().getPlanNodeTree();
+ assertTrue(secondFiRoot instanceof IdentitySinkNode);
+ assertTrue(secondFiRoot.getChildren().get(0) instanceof DeviceViewNode);
+ assertTrue(
+ secondFiRoot.getChildren().get(0).getChildren().get(0)
+ instanceof SeriesAggregationScanNode);
+
+ // one aggregation measurement, one device
+ sql = "select first_value(s1) from root.sg.d22 align by device";
+ analysis = Util.analyze(sql, context);
+ logicalPlanNode = Util.genLogicalPlan(analysis, context);
+ planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context,
logicalPlanNode));
+ plan = planner.planFragments();
+ assertEquals(1, plan.getInstances().size());
+
+ firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+ assertTrue(firstFiRoot instanceof IdentitySinkNode);
+ firstFiTopNode = firstFiRoot.getChildren().get(0);
+ assertTrue(firstFiTopNode instanceof DeviceViewNode);
+ assertTrue(firstFiTopNode.getChildren().get(0) instanceof
SeriesAggregationScanNode);
+
+ // two aggregation measurement, two devices
+ sql = "select first_value(s1), count(s2) from root.sg.d22, root.sg.d55555
align by device";
+ analysis = Util.analyze(sql, context);
+ logicalPlanNode = Util.genLogicalPlan(analysis, context);
+ planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context,
logicalPlanNode));
+ plan = planner.planFragments();
+ assertEquals(2, plan.getInstances().size());
+
+ firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+ assertTrue(firstFiRoot instanceof IdentitySinkNode);
+ firstFiTopNode = firstFiRoot.getChildren().get(0);
+ assertTrue(firstFiTopNode instanceof MergeSortNode);
+ assertTrue(firstFiTopNode.getChildren().get(0) instanceof DeviceViewNode);
+ assertTrue(
+ firstFiTopNode.getChildren().get(0).getChildren().get(0) instanceof
HorizontallyConcatNode);
+
+ secondFiRoot = plan.getInstances().get(1).getFragment().getPlanNodeTree();
+ assertTrue(secondFiRoot instanceof IdentitySinkNode);
+ assertTrue(secondFiRoot.getChildren().get(0) instanceof DeviceViewNode);
+ assertTrue(
+ secondFiRoot.getChildren().get(0).getChildren().get(0) instanceof
HorizontallyConcatNode);
+
+ // two aggregation measurement, one device
+ sql = "select first_value(s1), count(s2) from root.sg.d22 align by device";
+ analysis = Util.analyze(sql, context);
+ logicalPlanNode = Util.genLogicalPlan(analysis, context);
+ planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context,
logicalPlanNode));
+ plan = planner.planFragments();
+ assertEquals(1, plan.getInstances().size());
+
+ firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+ assertTrue(firstFiRoot instanceof IdentitySinkNode);
+ firstFiTopNode = firstFiRoot.getChildren().get(0);
+ assertTrue(firstFiTopNode instanceof DeviceViewNode);
+ assertTrue(firstFiTopNode.getChildren().get(0) instanceof
HorizontallyConcatNode);
+ }
+
+ /*
+ * IdentitySinkNode-31
+ * └──MergeSort-28
+ * ├──DeviceView-12
+ * │ └──AggregationNode-17
+ * │ └──FilterNode-16
+ * │ └──FullOuterTimeJoinNode-15
+ * │ ├──SeriesScanNode-18:[SeriesPath: root.sg.d22.s1,
+ * DataRegion: TConsensusGroupId(type:DataRegion,
id:3)]
+ * │ └──SeriesScanNode-19:[SeriesPath: root.sg.d22.s2,
+ * DataRegion: TConsensusGroupId(type:DataRegion,
id:3)]
+ * └──ExchangeNode-29: [SourceAddress:192.0.4.1/test.2.0/30]
+ *
+ * IdentitySinkNode-30
+ * └──DeviceView-20
+ * └──AggregationNode-25
+ * └──FilterNode-24
+ * └──FullOuterTimeJoinNode-23
+ * ├──SeriesScanNode-26:[SeriesPath: root.sg.d55555.s1,
+ * DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
+ * └──SeriesScanNode-27:[SeriesPath: root.sg.d55555.s2,
+ * DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
+ */
+ @Test
+ public void orderByDeviceTest2() {
+ // one aggregation measurement, two devices, with filter
+ sql = "select first_value(s1) from root.sg.d22, root.sg.d55555 where s2>1
align by device";
+ analysis = Util.analyze(sql, context);
+ logicalPlanNode = Util.genLogicalPlan(analysis, context);
+ planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context,
logicalPlanNode));
+ plan = planner.planFragments();
+ assertEquals(2, plan.getInstances().size());
+
+ firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+ assertTrue(firstFiRoot instanceof IdentitySinkNode);
+ firstFiTopNode = firstFiRoot.getChildren().get(0);
+ assertTrue(firstFiTopNode instanceof MergeSortNode);
+ assertTrue(firstFiTopNode.getChildren().get(0) instanceof DeviceViewNode);
+ assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(0)
instanceof AggregationNode);
+ assertTrue(
+
firstFiTopNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
+ instanceof FilterNode);
+ assertTrue(
+ firstFiTopNode
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0)
+ instanceof FullOuterTimeJoinNode);
+
+ secondFiRoot = plan.getInstances().get(1).getFragment().getPlanNodeTree();
+ assertTrue(secondFiRoot instanceof IdentitySinkNode);
+ assertTrue(secondFiRoot.getChildren().get(0) instanceof DeviceViewNode);
+ assertTrue(secondFiRoot.getChildren().get(0).getChildren().get(0)
instanceof AggregationNode);
+ assertTrue(
+
secondFiRoot.getChildren().get(0).getChildren().get(0).getChildren().get(0)
+ instanceof FilterNode);
+ assertTrue(
+ secondFiRoot
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0)
+ instanceof FullOuterTimeJoinNode);
+
+ // two aggregation measurement, two devices, with filter
+ sql =
+ "select first_value(s1), count(s2) from root.sg.d22, root.sg.d55555
where s2>1 align by device";
+ analysis = Util.analyze(sql, context);
+ logicalPlanNode = Util.genLogicalPlan(analysis, context);
+ planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context,
logicalPlanNode));
+ plan = planner.planFragments();
+ assertEquals(2, plan.getInstances().size());
+
+ firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+ assertTrue(firstFiRoot instanceof IdentitySinkNode);
+ firstFiTopNode = firstFiRoot.getChildren().get(0);
+ assertTrue(firstFiTopNode instanceof MergeSortNode);
+ assertTrue(firstFiTopNode.getChildren().get(0) instanceof DeviceViewNode);
+ assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(0)
instanceof AggregationNode);
+ assertTrue(
+
firstFiTopNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
+ instanceof FilterNode);
+ assertTrue(
+ firstFiTopNode
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0)
+ instanceof FullOuterTimeJoinNode);
+
+ secondFiRoot = plan.getInstances().get(1).getFragment().getPlanNodeTree();
+ assertTrue(secondFiRoot instanceof IdentitySinkNode);
+ assertTrue(secondFiRoot.getChildren().get(0) instanceof DeviceViewNode);
+ assertTrue(secondFiRoot.getChildren().get(0).getChildren().get(0)
instanceof AggregationNode);
+ assertTrue(
+
secondFiRoot.getChildren().get(0).getChildren().get(0).getChildren().get(0)
+ instanceof FilterNode);
+ assertTrue(
+ secondFiRoot
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0)
+ instanceof FullOuterTimeJoinNode);
+ }
+
+ /*
+ * IdentitySinkNode-32
+ * └──TransformNode-10
+ * └──MergeSort-27
+ * ├──SortNode-28
+ * │ └──TransformNode-25
+ * │ └──DeviceView-11
+ * │ └──HorizontallyConcatNode-17
+ * │ ├──SeriesAggregationScanNode-15:[SeriesPath:
root.sg.d22.s1,
+ * Descriptor: [AggregationDescriptor(avg,
SINGLE)],
+ * DataRegion:
TConsensusGroupId(type:DataRegion, id:3)]
+ * │ └──SeriesAggregationScanNode-16:[SeriesPath:
root.sg.d22.s2,
+ * Descriptor: [AggregationDescriptor(avg,
SINGLE),
+ * AggregationDescriptor(max_value, SINGLE)],
+ * DataRegion:
TConsensusGroupId(type:DataRegion, id:3)]
+ * └──ExchangeNode-30: [SourceAddress:192.0.4.1/test.2.0/31]
+ *
+ * IdentitySinkNode-31
+ * └──SortNode-29
+ * └──TransformNode-26
+ * └──DeviceView-18
+ * └──HorizontallyConcatNode-24
+ * ├──SeriesAggregationScanNode-22:[SeriesPath:
root.sg.d55555.s1,
+ * Descriptor: [AggregationDescriptor(avg, SINGLE)],
+ * DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
+ * └──SeriesAggregationScanNode-23:[SeriesPath:
root.sg.d55555.s2,
+ * Descriptor: [AggregationDescriptor(avg, SINGLE),
AggregationDescriptor(max_value, SINGLE)],
+ * DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
+ */
+ @Test
+ public void orderByExpressionTest() {
+ sql =
+ "select avg(s1)+avg(s2) from root.sg.d22, root.sg.d55555 order by
max_value(s2) align by device";
+ analysis = Util.analyze(sql, context);
+ logicalPlanNode = Util.genLogicalPlan(analysis, context);
+ planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context,
logicalPlanNode));
+ plan = planner.planFragments();
+ assertEquals(2, plan.getInstances().size());
+
+ firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+ assertTrue(firstFiRoot instanceof IdentitySinkNode);
+ firstFiTopNode = firstFiRoot.getChildren().get(0);
+ assertTrue(firstFiTopNode instanceof TransformNode);
+ assertTrue(firstFiTopNode.getChildren().get(0) instanceof MergeSortNode);
+ assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(0)
instanceof SortNode);
+ assertTrue(
+
firstFiTopNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
+ instanceof TransformNode);
+ assertTrue(
+ firstFiTopNode
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0)
+ instanceof DeviceViewNode);
+ assertTrue(
+ firstFiTopNode
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0)
+ instanceof HorizontallyConcatNode);
+
+ secondFiRoot = plan.getInstances().get(1).getFragment().getPlanNodeTree();
+ assertTrue(secondFiRoot instanceof IdentitySinkNode);
+ assertTrue(secondFiRoot.getChildren().get(0) instanceof SortNode);
+ assertTrue(secondFiRoot.getChildren().get(0).getChildren().get(0)
instanceof TransformNode);
+ assertTrue(
+
secondFiRoot.getChildren().get(0).getChildren().get(0).getChildren().get(0)
+ instanceof DeviceViewNode);
+ assertTrue(
+ secondFiRoot
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0)
+ instanceof HorizontallyConcatNode);
+ }
+
+ @Test
+ public void orderByTimeTest1() {
+ // one aggregation measurement, two devices
+ sql =
+ "select first_value(s1) from root.sg.d22, root.sg.d55555 order by time
desc align by device";
+ analysis = Util.analyze(sql, context);
+ logicalPlanNode = Util.genLogicalPlan(analysis, context);
+ planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context,
logicalPlanNode));
+ plan = planner.planFragments();
+ assertEquals(2, plan.getInstances().size());
+
+ firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+ assertTrue(firstFiRoot instanceof IdentitySinkNode);
+ firstFiTopNode = firstFiRoot.getChildren().get(0);
+ assertTrue(firstFiTopNode instanceof MergeSortNode);
+ assertTrue(firstFiTopNode.getChildren().get(0) instanceof
SingleDeviceViewNode);
+ assertTrue(
+ firstFiTopNode.getChildren().get(0).getChildren().get(0)
+ instanceof SeriesAggregationScanNode);
+
+ secondFiRoot = plan.getInstances().get(1).getFragment().getPlanNodeTree();
+ assertTrue(secondFiRoot instanceof ShuffleSinkNode);
+ assertTrue(secondFiRoot.getChildren().get(0) instanceof
SingleDeviceViewNode);
+ assertTrue(
+ secondFiRoot.getChildren().get(0).getChildren().get(0)
+ instanceof SeriesAggregationScanNode);
+
+ // one aggregation measurement, one device
+ sql = "select first_value(s1) from root.sg.d22 order by time desc align by
device";
+ analysis = Util.analyze(sql, context);
+ logicalPlanNode = Util.genLogicalPlan(analysis, context);
+ planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context,
logicalPlanNode));
+ plan = planner.planFragments();
+ assertEquals(1, plan.getInstances().size());
+
+ firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+ assertTrue(firstFiRoot instanceof IdentitySinkNode);
+ firstFiTopNode = firstFiRoot.getChildren().get(0);
+ assertTrue(firstFiTopNode instanceof DeviceViewNode);
+ assertTrue(firstFiTopNode.getChildren().get(0) instanceof
SeriesAggregationScanNode);
+
+ // two aggregation measurement, two devices
+ sql =
+ "select first_value(s1), count(s2) from root.sg.d22, root.sg.d55555 "
+ + "order by time desc align by device";
+ analysis = Util.analyze(sql, context);
+ logicalPlanNode = Util.genLogicalPlan(analysis, context);
+ planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context,
logicalPlanNode));
+ plan = planner.planFragments();
+ assertEquals(2, plan.getInstances().size());
+
+ firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+ assertTrue(firstFiRoot instanceof IdentitySinkNode);
+ firstFiTopNode = firstFiRoot.getChildren().get(0);
+ assertTrue(firstFiTopNode instanceof MergeSortNode);
+ assertTrue(firstFiTopNode.getChildren().get(0) instanceof
SingleDeviceViewNode);
+ assertTrue(
+ firstFiTopNode.getChildren().get(0).getChildren().get(0) instanceof
HorizontallyConcatNode);
+
+ secondFiRoot = plan.getInstances().get(1).getFragment().getPlanNodeTree();
+ assertTrue(secondFiRoot instanceof ShuffleSinkNode);
+ assertTrue(secondFiRoot.getChildren().get(0) instanceof
SingleDeviceViewNode);
+ assertTrue(
+ secondFiRoot.getChildren().get(0).getChildren().get(0) instanceof
HorizontallyConcatNode);
+
+ // two aggregation measurement, one device
+ sql = "select first_value(s1), count(s2) from root.sg.d22 order by time
desc align by device";
+ analysis = Util.analyze(sql, context);
+ logicalPlanNode = Util.genLogicalPlan(analysis, context);
+ planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context,
logicalPlanNode));
+ plan = planner.planFragments();
+ assertEquals(1, plan.getInstances().size());
+
+ firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+ assertTrue(firstFiRoot instanceof IdentitySinkNode);
+ firstFiTopNode = firstFiRoot.getChildren().get(0);
+ assertTrue(firstFiTopNode instanceof DeviceViewNode);
+ assertTrue(firstFiTopNode.getChildren().get(0) instanceof
HorizontallyConcatNode);
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java
index 812d5319e19..b04c6c4b847 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java
@@ -68,7 +68,7 @@ import static org.junit.Assert.assertTrue;
public class AggregationDistributionTest {
@Test
- public void testAggregation1Series2Regions() throws IllegalPathException {
+ public void testAggregation1Series2Regions() {
QueryId queryId = new QueryId("test_1_series_2_regions");
MPPQueryContext context =
new MPPQueryContext("", queryId, null, new TEndPoint(), new
TEndPoint());
@@ -95,7 +95,7 @@ public class AggregationDistributionTest {
}
@Test
- public void testAggregation1Series2RegionsWithSlidingWindow() throws
IllegalPathException {
+ public void testAggregation1Series2RegionsWithSlidingWindow() {
QueryId queryId = new QueryId("test_1_series_2_regions_sliding_window");
MPPQueryContext context =
new MPPQueryContext("", queryId, null, new TEndPoint(), new
TEndPoint());
@@ -130,7 +130,7 @@ public class AggregationDistributionTest {
}
@Test
- public void testTimeJoinAggregationSinglePerRegion() throws
IllegalPathException {
+ public void testTimeJoinAggregationSinglePerRegion() {
QueryId queryId = new QueryId("test_query_time_join_aggregation");
MPPQueryContext context =
new MPPQueryContext("", queryId, null, new TEndPoint(), new
TEndPoint());
@@ -166,9 +166,7 @@ public class AggregationDistributionTest {
SeriesAggregationSourceNode handle = (SeriesAggregationSourceNode) root;
List<AggregationDescriptor> descriptorList =
handle.getAggregationDescriptorList();
descriptorList.forEach(
- d -> {
-
assertEquals(expected.get(handle.getPartitionPath().getFullPath()),
d.getStep());
- });
+ d ->
assertEquals(expected.get(handle.getPartitionPath().getFullPath()),
d.getStep()));
}
root.getChildren().forEach(child -> verifyAggregationStep(expected,
child));
}
@@ -190,7 +188,7 @@ public class AggregationDistributionTest {
}
@Test
- public void testTimeJoinAggregationWithSlidingWindow() throws
IllegalPathException {
+ public void testTimeJoinAggregationWithSlidingWindow() {
QueryId queryId = new QueryId("test_query_time_join_agg_with_sliding");
MPPQueryContext context =
new MPPQueryContext("", queryId, null, new TEndPoint(), new
TEndPoint());
@@ -229,7 +227,7 @@ public class AggregationDistributionTest {
}
@Test
- public void testTimeJoinAggregationMultiPerRegion() throws
IllegalPathException {
+ public void testTimeJoinAggregationMultiPerRegion() {
QueryId queryId = new QueryId("test_query_time_join_aggregation");
MPPQueryContext context =
new MPPQueryContext("", queryId, null, new TEndPoint(), new
TEndPoint());
@@ -254,7 +252,7 @@ public class AggregationDistributionTest {
}
@Test
- public void testTimeJoinAggregationMultiPerRegion2() throws
IllegalPathException {
+ public void testTimeJoinAggregationMultiPerRegion2() {
QueryId queryId = new QueryId("test_query_time_join_aggregation");
MPPQueryContext context =
new MPPQueryContext("", queryId, null, new TEndPoint(), new
TEndPoint());
@@ -775,7 +773,7 @@ public class AggregationDistributionTest {
}
@Test
- public void testAlignByDevice1Device2Region() throws IllegalPathException {
+ public void testAlignByDevice1Device2Region() {
QueryId queryId = new QueryId("test_align_by_device_1_device_2_region");
MPPQueryContext context =
new MPPQueryContext("", queryId, null, new TEndPoint(), new
TEndPoint());