This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 69b429b8d7 [IOTDB-3333] Distribution planning for
SlidingWindowAggregationNode (#6103)
69b429b8d7 is described below
commit 69b429b8d7c44f9b7bc914dd7be72cefec9e8fca
Author: Zhang.Jinrui <[email protected]>
AuthorDate: Wed Jun 1 13:54:11 2022 +0800
[IOTDB-3333] Distribution planning for SlidingWindowAggregationNode (#6103)
---
.../distribution/DistributionPlanContext.java | 11 +
.../planner/distribution/ExchangeNodeAdder.java | 18 ++
.../plan/planner/distribution/SourceRewriter.java | 182 ++++++++++----
.../node/process/SlidingWindowAggregationNode.java | 10 +-
.../distribution/AggregationDistributionTest.java | 277 +++++++++++++++++++++
5 files changed, 454 insertions(+), 44 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
index 6f9e16e6ee..88f13eddf4 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
@@ -22,9 +22,20 @@ package org.apache.iotdb.db.mpp.plan.planner.distribution;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
public class DistributionPlanContext {
+ protected boolean isRoot;
protected MPPQueryContext queryContext;
protected DistributionPlanContext(MPPQueryContext queryContext) {
+ this.isRoot = true;
this.queryContext = queryContext;
}
+
+ protected DistributionPlanContext copy() {
+ return new DistributionPlanContext(queryContext);
+ }
+
+ protected DistributionPlanContext setRoot(boolean isRoot) {
+ this.isRoot = isRoot;
+ return this;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
index 4de9558101..3373722007 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
@@ -37,6 +37,7 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
@@ -250,6 +251,23 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
return newNode;
}
+ @Override
+ public PlanNode visitSlidingWindowAggregation(
+ SlidingWindowAggregationNode node, NodeGroupContext context) {
+ return processOneChildNode(node, context);
+ }
+
+ private PlanNode processOneChildNode(PlanNode node, NodeGroupContext
context) {
+ PlanNode newNode = node.clone();
+ PlanNode child = visit(node.getChildren().get(0), context);
+ newNode.addChild(child);
+ TRegionReplicaSet dataRegion =
context.getNodeDistribution(child.getPlanNodeId()).region;
+ context.putNodeDistribution(
+ newNode.getPlanNodeId(),
+ new NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN,
dataRegion));
+ return newNode;
+ }
+
private TRegionReplicaSet calculateDataRegionByChildren(
List<PlanNode> children, NodeGroupContext context) {
// Step 1: calculate the count of children group by DataRegion.
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 7cbc01469a..6c20206302 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -37,6 +37,7 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LastQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildNode;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
@@ -52,6 +53,7 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@@ -452,6 +454,17 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
return false;
}
+ // This method is only used to process the PlanNodeTree whose root is
SlidingWindowAggregationNode
+ @Override
+ public PlanNode visitSlidingWindowAggregation(
+ SlidingWindowAggregationNode node, DistributionPlanContext context) {
+ DistributionPlanContext childContext = context.copy().setRoot(false);
+ PlanNode child = visit(node.getChild(), childContext);
+ PlanNode newRoot = node.clone();
+ newRoot.addChild(child);
+ return newRoot;
+ }
+
private PlanNode planAggregationWithTimeJoin(TimeJoinNode root,
DistributionPlanContext context) {
List<SeriesAggregationSourceNode> sources =
splitAggregationSourceByPartition(root, context);
@@ -469,7 +482,7 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
rootAggDescriptorList.add(
new AggregationDescriptor(
descriptor.getAggregationType(),
- AggregationStep.FINAL,
+ context.isRoot ? AggregationStep.FINAL :
AggregationStep.PARTIAL,
descriptor.getInputExpressions()));
});
}
@@ -512,6 +525,64 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup =
sources.stream().collect(Collectors.groupingBy(SourceNode::getRegionReplicaSet));
+ boolean containsSlidingWindow =
+ root.getChildren().size() == 1
+ && root.getChildren().get(0) instanceof
SlidingWindowAggregationNode;
+
+ GroupByLevelNode newRoot =
+ containsSlidingWindow
+ ? groupSourcesForGroupByLevelWithSlidingWindow(
+ root,
+ (SlidingWindowAggregationNode) root.getChildren().get(0),
+ sourceGroup,
+ context)
+ : groupSourcesForGroupByLevel(root, sourceGroup, context);
+
+ // Then, we calculate the attributes for GroupByLevelNode in each level
+ calculateGroupByLevelNodeAttributes(newRoot, 0);
+ return newRoot;
+ }
+
+ private GroupByLevelNode groupSourcesForGroupByLevelWithSlidingWindow(
+ GroupByLevelNode root,
+ SlidingWindowAggregationNode slidingWindowNode,
+ Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup,
+ DistributionPlanContext context) {
+ GroupByLevelNode newRoot = (GroupByLevelNode) root.clone();
+ List<SlidingWindowAggregationNode> groups = new ArrayList<>();
+ sourceGroup.forEach(
+ (dataRegion, sourceNodes) -> {
+ SlidingWindowAggregationNode parentOfGroup =
+ (SlidingWindowAggregationNode) slidingWindowNode.clone();
+
parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+ if (sourceNodes.size() == 1) {
+ parentOfGroup.addChild(sourceNodes.get(0));
+ } else {
+ TimeJoinNode timeJoinNode =
+ new TimeJoinNode(
+ context.queryContext.getQueryId().genPlanNodeId(),
root.getScanOrder());
+ sourceNodes.forEach(timeJoinNode::addChild);
+ parentOfGroup.addChild(timeJoinNode);
+ }
+ groups.add(parentOfGroup);
+ });
+ for (int i = 0; i < groups.size(); i++) {
+ if (i == 0) {
+ newRoot.addChild(groups.get(i));
+ continue;
+ }
+ GroupByLevelNode parent = (GroupByLevelNode) root.clone();
+ parent.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+ parent.addChild(groups.get(i));
+ newRoot.addChild(parent);
+ }
+ return newRoot;
+ }
+
+ private GroupByLevelNode groupSourcesForGroupByLevel(
+ GroupByLevelNode root,
+ Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup,
+ DistributionPlanContext context) {
GroupByLevelNode newRoot = (GroupByLevelNode) root.clone();
final boolean[] addParent = {false};
sourceGroup.forEach(
@@ -523,8 +594,6 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
sourceNodes.forEach(newRoot::addChild);
addParent[0] = true;
} else {
- // We clone a TimeJoinNode from root to make the params to be
consistent.
- // But we need to assign a new ID to it
GroupByLevelNode parentOfGroup = (GroupByLevelNode) root.clone();
parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
sourceNodes.forEach(parentOfGroup::addChild);
@@ -532,55 +601,69 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
}
}
});
-
- // Then, we calculate the attributes for GroupByLevelNode in each level
- calculateGroupByLevelNodeAttributes(newRoot, 0);
return newRoot;
}
+ // TODO: (xingtanzjr) consider implementing the descriptor construction in
every class
private void calculateGroupByLevelNodeAttributes(PlanNode node, int level) {
if (node == null) {
return;
}
node.getChildren().forEach(child ->
calculateGroupByLevelNodeAttributes(child, level + 1));
- if (!(node instanceof GroupByLevelNode)) {
- return;
- }
- GroupByLevelNode handle = (GroupByLevelNode) node;
// Construct all outputColumns from children. Using Set here to avoid
duplication
Set<String> childrenOutputColumns = new HashSet<>();
- handle
- .getChildren()
- .forEach(child ->
childrenOutputColumns.addAll(child.getOutputColumnNames()));
-
- // Check every OutputColumn of GroupByLevelNode and set the Expression of
corresponding
- // AggregationDescriptor
- List<GroupByLevelDescriptor> descriptorList = new ArrayList<>();
- for (GroupByLevelDescriptor originalDescriptor :
handle.getGroupByLevelDescriptors()) {
- List<Expression> descriptorExpression = new ArrayList<>();
- for (String childColumn : childrenOutputColumns) {
- // If this condition matched, the childColumn should come from
GroupByLevelNode
- if (isAggColumnMatchExpression(childColumn,
originalDescriptor.getOutputExpression())) {
- descriptorExpression.add(originalDescriptor.getOutputExpression());
- continue;
- }
- for (Expression exp : originalDescriptor.getInputExpressions()) {
- if (isAggColumnMatchExpression(childColumn, exp)) {
- descriptorExpression.add(exp);
+ node.getChildren().forEach(child ->
childrenOutputColumns.addAll(child.getOutputColumnNames()));
+
+ if (node instanceof SlidingWindowAggregationNode) {
+ SlidingWindowAggregationNode handle = (SlidingWindowAggregationNode)
node;
+ List<AggregationDescriptor> descriptorList = new ArrayList<>();
+ for (AggregationDescriptor originalDescriptor :
handle.getAggregationDescriptorList()) {
+ boolean keep = false;
+ for (String childColumn : childrenOutputColumns) {
+ for (Expression exp : originalDescriptor.getInputExpressions()) {
+ if (isAggColumnMatchExpression(childColumn, exp)) {
+ keep = true;
+ }
}
}
+ if (keep) {
+ descriptorList.add(originalDescriptor);
+ }
}
- if (descriptorExpression.size() == 0) {
- continue;
- }
- GroupByLevelDescriptor descriptor = originalDescriptor.deepClone();
- descriptor.setStep(level == 0 ? AggregationStep.FINAL :
AggregationStep.PARTIAL);
- descriptor.setInputExpressions(descriptorExpression);
+ handle.setAggregationDescriptorList(descriptorList);
+ }
- descriptorList.add(descriptor);
+ if (node instanceof GroupByLevelNode) {
+ GroupByLevelNode handle = (GroupByLevelNode) node;
+ // Check every OutputColumn of GroupByLevelNode and set the Expression
of corresponding
+ // AggregationDescriptor
+ List<GroupByLevelDescriptor> descriptorList = new ArrayList<>();
+ for (GroupByLevelDescriptor originalDescriptor :
handle.getGroupByLevelDescriptors()) {
+ List<Expression> descriptorExpression = new ArrayList<>();
+ for (String childColumn : childrenOutputColumns) {
+ // If this condition matched, the childColumn should come from
GroupByLevelNode
+ if (isAggColumnMatchExpression(childColumn,
originalDescriptor.getOutputExpression())) {
+ descriptorExpression.add(originalDescriptor.getOutputExpression());
+ continue;
+ }
+ for (Expression exp : originalDescriptor.getInputExpressions()) {
+ if (isAggColumnMatchExpression(childColumn, exp)) {
+ descriptorExpression.add(exp);
+ }
+ }
+ }
+ if (descriptorExpression.size() == 0) {
+ continue;
+ }
+ GroupByLevelDescriptor descriptor = originalDescriptor.deepClone();
+ descriptor.setStep(level == 0 ? AggregationStep.FINAL :
AggregationStep.PARTIAL);
+ descriptor.setInputExpressions(descriptorExpression);
+
+ descriptorList.add(descriptor);
+ }
+ handle.setGroupByLevelDescriptors(descriptorList);
}
- handle.setGroupByLevelDescriptors(descriptorList);
}
// TODO: (xingtanzjr) need to confirm the logic when processing UDF
@@ -592,26 +675,27 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
}
private List<SeriesAggregationSourceNode> splitAggregationSourceByPartition(
- MultiChildNode root, DistributionPlanContext context) {
+ PlanNode root, DistributionPlanContext context) {
+ // Step 0: get all SeriesAggregationSourceNode in PlanNodeTree
+ List<SeriesAggregationSourceNode> rawSources =
findAggregationSourceNode(root);
// Step 1: split SeriesAggregationSourceNode according to data partition
List<SeriesAggregationSourceNode> sources = new ArrayList<>();
Map<PartialPath, Integer> regionCountPerSeries = new HashMap<>();
- for (PlanNode child : root.getChildren()) {
- SeriesAggregationSourceNode handle = (SeriesAggregationSourceNode) child;
+ for (SeriesAggregationSourceNode child : rawSources) {
List<TRegionReplicaSet> dataDistribution =
- analysis.getPartitionInfo(handle.getPartitionPath(),
handle.getPartitionTimeFilter());
+ analysis.getPartitionInfo(child.getPartitionPath(),
child.getPartitionTimeFilter());
for (TRegionReplicaSet dataRegion : dataDistribution) {
- SeriesAggregationSourceNode split = (SeriesAggregationSourceNode)
handle.clone();
+ SeriesAggregationSourceNode split = (SeriesAggregationSourceNode)
child.clone();
split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
split.setRegionReplicaSet(dataRegion);
// Let each split reference different object of
AggregationDescriptorList
split.setAggregationDescriptorList(
- handle.getAggregationDescriptorList().stream()
+ child.getAggregationDescriptorList().stream()
.map(AggregationDescriptor::deepClone)
.collect(Collectors.toList()));
sources.add(split);
}
- regionCountPerSeries.put(handle.getPartitionPath(),
dataDistribution.size());
+ regionCountPerSeries.put(child.getPartitionPath(),
dataDistribution.size());
}
// Step 2: change the step for each SeriesAggregationSourceNode according
to its split count
@@ -626,6 +710,18 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
return sources;
}
+ private List<SeriesAggregationSourceNode> findAggregationSourceNode(PlanNode
node) {
+ if (node == null) {
+ return new ArrayList<>();
+ }
+ if (node instanceof SeriesAggregationSourceNode) {
+ return Collections.singletonList((SeriesAggregationSourceNode) node);
+ }
+ List<SeriesAggregationSourceNode> ret = new ArrayList<>();
+ node.getChildren().forEach(child ->
ret.addAll(findAggregationSourceNode(child)));
+ return ret;
+ }
+
public PlanNode visit(PlanNode node, DistributionPlanContext context) {
return node.accept(this, context);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java
index d0df526b93..9362e40227 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SlidingWindowAggregationNode.java
@@ -40,7 +40,7 @@ public class SlidingWindowAggregationNode extends ProcessNode
{
// The list of aggregate functions, each AggregateDescriptor will be output
as one column of
// result TsBlock
- private final List<AggregationDescriptor> aggregationDescriptorList;
+ private List<AggregationDescriptor> aggregationDescriptorList;
// The parameter of `group by time`.
private final GroupByTimeParameter groupByTimeParameter;
@@ -74,6 +74,10 @@ public class SlidingWindowAggregationNode extends
ProcessNode {
return aggregationDescriptorList;
}
+ public void setAggregationDescriptorList(List<AggregationDescriptor>
aggregationDescriptorList) {
+ this.aggregationDescriptorList = aggregationDescriptorList;
+ }
+
public GroupByTimeParameter getGroupByTimeParameter() {
return groupByTimeParameter;
}
@@ -175,4 +179,8 @@ public class SlidingWindowAggregationNode extends
ProcessNode {
public int hashCode() {
return Objects.hash(super.hashCode(), aggregationDescriptorList,
groupByTimeParameter, child);
}
+
+ public String toString() {
+ return String.format("SlidingWindowAggregationNode-%s", getPlanNodeId());
+ }
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
index cfa79db920..e47068a7ff 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
@@ -35,8 +35,10 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
@@ -45,10 +47,12 @@ import
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByLevelDescriptor;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
@@ -57,6 +61,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -101,6 +106,54 @@ public class AggregationDistributionTest {
root.getChildren().forEach(child -> verifyAggregationStep(expected,
child));
}
+ @Test
+ public void testTimeJoinAggregationWithSlidingWindow() throws
IllegalPathException {
+ QueryId queryId = new QueryId("test_query_time_join_agg_with_sliding");
+ TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(),
OrderBy.TIMESTAMP_ASC);
+ String d1s1Path = "root.sg.d1.s1";
+ timeJoinNode.addChild(genAggregationSourceNode(queryId, d1s1Path,
AggregationType.COUNT));
+
+ String d3s1Path = "root.sg.d333.s1";
+ timeJoinNode.addChild(genAggregationSourceNode(queryId, d3s1Path,
AggregationType.COUNT));
+
+ SlidingWindowAggregationNode slidingWindowAggregationNode =
+ genSlidingWindowAggregationNode(
+ queryId,
+ Arrays.asList(new PartialPath(d1s1Path), new
PartialPath(d3s1Path)),
+ AggregationType.COUNT,
+ AggregationStep.PARTIAL,
+ null);
+
+ slidingWindowAggregationNode.addChild(timeJoinNode);
+
+ Analysis analysis = Util.constructAnalysis();
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new
TEndPoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(
+ analysis, new LogicalQueryPlan(context,
slidingWindowAggregationNode));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(3, plan.getInstances().size());
+ Map<String, AggregationStep> expectedStep = new HashMap<>();
+ expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+ expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
+ List<FragmentInstance> fragmentInstances = plan.getInstances();
+ fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep,
f.getFragment().getRoot()));
+ AggregationNode aggregationNode =
+ (AggregationNode)
+ fragmentInstances
+ .get(0)
+ .getFragment()
+ .getRoot()
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0);
+ aggregationNode
+ .getAggregationDescriptorList()
+ .forEach(d -> Assert.assertEquals(AggregationStep.PARTIAL,
d.getStep()));
+ }
+
@Test
public void testTimeJoinAggregationMultiPerRegion() throws
IllegalPathException {
QueryId queryId = new QueryId("test_query_time_join_aggregation");
@@ -234,6 +287,89 @@ public class AggregationDistributionTest {
(GroupByLevelNode)
fragmentInstances.get(1).getFragment().getRoot().getChildren().get(0));
}
+ @Test
+ public void testGroupByLevelNodeWithSlidingWindow() throws
IllegalPathException {
+ QueryId queryId = new QueryId("test_group_by_level_with_sliding_window");
+ String d3s1Path = "root.sg.d333.s1";
+ String d4s1Path = "root.sg.d4444.s1";
+ String groupedPath = "root.sg.*.s1";
+
+ SlidingWindowAggregationNode slidingWindowAggregationNode =
+ genSlidingWindowAggregationNode(
+ queryId,
+ Arrays.asList(new PartialPath(d3s1Path), new
PartialPath(d4s1Path)),
+ AggregationType.COUNT,
+ AggregationStep.PARTIAL,
+ null);
+ TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(),
OrderBy.TIMESTAMP_ASC);
+ timeJoinNode.addChild(genAggregationSourceNode(queryId, d3s1Path,
AggregationType.COUNT));
+ timeJoinNode.addChild(genAggregationSourceNode(queryId, d4s1Path,
AggregationType.COUNT));
+ slidingWindowAggregationNode.addChild(timeJoinNode);
+
+ GroupByLevelNode groupByLevelNode =
+ new GroupByLevelNode(
+ new PlanNodeId("TestGroupByLevelNode"),
+ Collections.singletonList(slidingWindowAggregationNode),
+ Collections.singletonList(
+ new GroupByLevelDescriptor(
+ AggregationType.COUNT,
+ AggregationStep.FINAL,
+ Arrays.asList(
+ new TimeSeriesOperand(new PartialPath(d3s1Path)),
+ new TimeSeriesOperand(new PartialPath(d4s1Path))),
+ new TimeSeriesOperand(new PartialPath(groupedPath)))),
+ null,
+ OrderBy.TIMESTAMP_ASC);
+
+ Analysis analysis = Util.constructAnalysis();
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new
TEndPoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context,
groupByLevelNode));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(2, plan.getInstances().size());
+ Map<String, AggregationStep> expectedStep = new HashMap<>();
+ expectedStep.put(d3s1Path, AggregationStep.PARTIAL);
+ expectedStep.put(d4s1Path, AggregationStep.PARTIAL);
+ List<FragmentInstance> fragmentInstances = plan.getInstances();
+ fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep,
f.getFragment().getRoot()));
+
+ Map<String, List<String>> expectedDescriptorValue = new HashMap<>();
+ expectedDescriptorValue.put(groupedPath, Arrays.asList(groupedPath,
d3s1Path, d4s1Path));
+ verifyGroupByLevelDescriptor(
+ expectedDescriptorValue,
+ (GroupByLevelNode)
fragmentInstances.get(0).getFragment().getRoot().getChildren().get(0));
+
+ Map<String, List<String>> expectedDescriptorValue2 = new HashMap<>();
+ expectedDescriptorValue2.put(groupedPath, Arrays.asList(d3s1Path,
d4s1Path));
+ verifyGroupByLevelDescriptor(
+ expectedDescriptorValue2,
+ (GroupByLevelNode)
fragmentInstances.get(1).getFragment().getRoot().getChildren().get(0));
+
+ verifySlidingWindowDescriptor(
+ Arrays.asList(d3s1Path, d4s1Path),
+ (SlidingWindowAggregationNode)
+ fragmentInstances
+ .get(0)
+ .getFragment()
+ .getRoot()
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0));
+ verifySlidingWindowDescriptor(
+ Arrays.asList(d3s1Path, d4s1Path),
+ (SlidingWindowAggregationNode)
+ fragmentInstances
+ .get(1)
+ .getFragment()
+ .getRoot()
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0));
+ }
+
@Test
public void testGroupByLevelTwoSeries() throws IllegalPathException {
QueryId queryId = new QueryId("test_group_by_level_two_series");
@@ -349,6 +485,118 @@ public class AggregationDistributionTest {
(GroupByLevelNode)
fragmentInstances.get(2).getFragment().getRoot().getChildren().get(0));
}
+ @Test
+ public void testGroupByLevelWithSliding2Series2Devices3Regions() throws
IllegalPathException {
+ QueryId queryId = new QueryId("test_group_by_level_two_series");
+ String d1s1Path = "root.sg.d1.s1";
+ String d1s2Path = "root.sg.d1.s2";
+ String d2s1Path = "root.sg.d22.s1";
+ String groupedPathS1 = "root.sg.*.s1";
+ String groupedPathS2 = "root.sg.*.s2";
+
+ TimeJoinNode timeJoinNode = new TimeJoinNode(queryId.genPlanNodeId(),
OrderBy.TIMESTAMP_ASC);
+ timeJoinNode.addChild(genAggregationSourceNode(queryId, d1s1Path,
AggregationType.COUNT));
+ timeJoinNode.addChild(genAggregationSourceNode(queryId, d1s2Path,
AggregationType.COUNT));
+ timeJoinNode.addChild(genAggregationSourceNode(queryId, d2s1Path,
AggregationType.COUNT));
+
+ SlidingWindowAggregationNode slidingWindowAggregationNode =
+ genSlidingWindowAggregationNode(
+ queryId,
+ Arrays.asList(
+ new PartialPath(d1s1Path), new PartialPath(d1s2Path), new
PartialPath(d2s1Path)),
+ AggregationType.COUNT,
+ AggregationStep.PARTIAL,
+ null);
+ slidingWindowAggregationNode.addChild(timeJoinNode);
+
+ GroupByLevelNode groupByLevelNode =
+ new GroupByLevelNode(
+ new PlanNodeId("TestGroupByLevelNode"),
+ Collections.singletonList(slidingWindowAggregationNode),
+ Arrays.asList(
+ new GroupByLevelDescriptor(
+ AggregationType.COUNT,
+ AggregationStep.FINAL,
+ Arrays.asList(
+ new TimeSeriesOperand(new PartialPath(d1s1Path)),
+ new TimeSeriesOperand(new PartialPath(d2s1Path))),
+ new TimeSeriesOperand(new PartialPath(groupedPathS1))),
+ new GroupByLevelDescriptor(
+ AggregationType.COUNT,
+ AggregationStep.FINAL,
+ Collections.singletonList(new TimeSeriesOperand(new
PartialPath(d1s2Path))),
+ new TimeSeriesOperand(new PartialPath(groupedPathS2)))),
+ null,
+ OrderBy.TIMESTAMP_ASC);
+ Analysis analysis = Util.constructAnalysis();
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new
TEndPoint());
+ DistributionPlanner planner =
+ new DistributionPlanner(analysis, new LogicalQueryPlan(context,
groupByLevelNode));
+ DistributedQueryPlan plan = planner.planFragments();
+ assertEquals(3, plan.getInstances().size());
+ Map<String, AggregationStep> expectedStep = new HashMap<>();
+ expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+ expectedStep.put(d1s2Path, AggregationStep.PARTIAL);
+ expectedStep.put(d2s1Path, AggregationStep.PARTIAL);
+ List<FragmentInstance> fragmentInstances = plan.getInstances();
+ fragmentInstances.forEach(f -> verifyAggregationStep(expectedStep,
f.getFragment().getRoot()));
+
+ Map<String, List<String>> expectedDescriptorValue = new HashMap<>();
+ expectedDescriptorValue.put(groupedPathS1, Arrays.asList(groupedPathS1,
d1s1Path));
+ expectedDescriptorValue.put(groupedPathS2, Arrays.asList(groupedPathS2,
d1s2Path));
+ verifyGroupByLevelDescriptor(
+ expectedDescriptorValue,
+ (GroupByLevelNode)
fragmentInstances.get(0).getFragment().getRoot().getChildren().get(0));
+
+ Map<String, List<String>> expectedDescriptorValue2 = new HashMap<>();
+ expectedDescriptorValue2.put(groupedPathS1,
Collections.singletonList(d2s1Path));
+ verifyGroupByLevelDescriptor(
+ expectedDescriptorValue2,
+ (GroupByLevelNode)
fragmentInstances.get(1).getFragment().getRoot().getChildren().get(0));
+
+ Map<String, List<String>> expectedDescriptorValue3 = new HashMap<>();
+ expectedDescriptorValue3.put(groupedPathS1,
Collections.singletonList(d1s1Path));
+ expectedDescriptorValue3.put(groupedPathS2,
Collections.singletonList(d1s2Path));
+ verifyGroupByLevelDescriptor(
+ expectedDescriptorValue3,
+ (GroupByLevelNode)
fragmentInstances.get(2).getFragment().getRoot().getChildren().get(0));
+
+ verifySlidingWindowDescriptor(
+ Arrays.asList(d1s1Path, d1s2Path),
+ (SlidingWindowAggregationNode)
+ fragmentInstances
+ .get(0)
+ .getFragment()
+ .getRoot()
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0));
+ verifySlidingWindowDescriptor(
+ Collections.singletonList(d2s1Path),
+ (SlidingWindowAggregationNode)
+ fragmentInstances
+ .get(1)
+ .getFragment()
+ .getRoot()
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0));
+ verifySlidingWindowDescriptor(
+ Arrays.asList(d1s1Path, d1s2Path),
+ (SlidingWindowAggregationNode)
+ fragmentInstances
+ .get(2)
+ .getFragment()
+ .getRoot()
+ .getChildren()
+ .get(0)
+ .getChildren()
+ .get(0));
+ }
+
@Test
public void testAggregation1Series1Region() throws IllegalPathException {
QueryId queryId = new QueryId("test_aggregation_1_series_1_region");
@@ -378,6 +626,35 @@ public class AggregationDistributionTest {
}
}
+ private void verifySlidingWindowDescriptor(
+ List<String> expected, SlidingWindowAggregationNode node) {
+ List<AggregationDescriptor> descriptorList =
node.getAggregationDescriptorList();
+ assertEquals(expected.size(), descriptorList.size());
+ Map<String, Integer> verification = new HashMap<>();
+ descriptorList.forEach(
+ d ->
verification.put(d.getInputExpressions().get(0).getExpressionString(), 1));
+ assertEquals(expected.size(), verification.size());
+ expected.forEach(v -> assertEquals(1, (int) verification.get(v)));
+ }
+
+ private SlidingWindowAggregationNode genSlidingWindowAggregationNode(
+ QueryId queryId,
+ List<PartialPath> paths,
+ AggregationType type,
+ AggregationStep step,
+ GroupByTimeParameter groupByTimeParameter) {
+ return new SlidingWindowAggregationNode(
+ queryId.genPlanNodeId(),
+ paths.stream()
+ .map(
+ path ->
+ new AggregationDescriptor(
+ type, step, Collections.singletonList(new
TimeSeriesOperand(path))))
+ .collect(Collectors.toList()),
+ groupByTimeParameter,
+ OrderBy.TIMESTAMP_ASC);
+ }
+
private SeriesAggregationSourceNode genAggregationSourceNode(
QueryId queryId, String path, AggregationType type) throws
IllegalPathException {
List<AggregationDescriptor> descriptors = new ArrayList<>();