This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch advancePipeline in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1adc1a9bbc00eaf018e01faacb8f75ae959ba7b5 Author: Alima777 <[email protected]> AuthorDate: Sat Feb 11 19:24:38 2023 +0800 implement createSubNode() for consumeAllNode --- .../plan/planner/distribution/SourceRewriter.java | 16 ++------------ .../db/mpp/plan/planner/plan/node/PlanNode.java | 2 +- .../planner/plan/node/process/AggregationNode.java | 25 ++++++++++++++-------- .../planner/plan/node/process/DeviceMergeNode.java | 7 ++---- .../plan/node/process/GroupByLevelNode.java | 7 ++---- .../planner/plan/node/process/GroupByTagNode.java | 12 +++-------- .../planner/plan/node/process/MergeSortNode.java | 3 +-- 7 files changed, 27 insertions(+), 45 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java index bbc973ab5f..ad0fbfcc09 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java @@ -1060,7 +1060,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte private Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> splitAggregationSourceByPartition(PlanNode root, DistributionPlanContext context) { // Step 0: get all SeriesAggregationSourceNode in PlanNodeTree - List<SeriesAggregationSourceNode> rawSources = findAggregationSourceNode(root); + List<SeriesAggregationSourceNode> rawSources = AggregationNode.findAggregationSourceNode(root); // Step 1: construct SeriesAggregationSourceNode for each data region of one Path List<SeriesAggregationSourceNode> sources = new ArrayList<>(); @@ -1092,7 +1092,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte boolean[] eachSeriesOneRegion, Map<PartialPath, Integer> regionCountPerSeries) { // Step 0: get all SeriesAggregationSourceNode in PlanNodeTree - List<SeriesAggregationSourceNode> rawSources = findAggregationSourceNode(root); + List<SeriesAggregationSourceNode> rawSources = AggregationNode.findAggregationSourceNode(root); // Step 1: construct SeriesAggregationSourceNode for each data region of one Path for (SeriesAggregationSourceNode child : rawSources) { @@ -1144,18 +1144,6 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte return dataDistribution.size(); } - private List<SeriesAggregationSourceNode> findAggregationSourceNode(PlanNode node) { - if (node == null) { - return new ArrayList<>(); - } - if (node instanceof SeriesAggregationSourceNode) { - return Collections.singletonList((SeriesAggregationSourceNode) node); - } - List<SeriesAggregationSourceNode> ret = new ArrayList<>(); - node.getChildren().forEach(child -> ret.addAll(findAggregationSourceNode(child))); - return ret; - } - public List<PlanNode> visit(PlanNode node, DistributionPlanContext context) { return node.accept(this, context); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java index ae66ddc843..280ebe48e9 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java @@ -75,7 +75,7 @@ public abstract class PlanNode implements IConsensusRequest { */ public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) { throw new UnsupportedOperationException( - String.format("Can't Create subNode for %s", this.getClass().toString())); + String.format("Can't create subNode for %s", this.getClass().toString())); } public PlanNode cloneWithChildren(List<PlanNode> children) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java index 69b8eb02db..479fa24996 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationSourceNode; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByParameter; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter; @@ -36,6 +37,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -167,16 +169,9 @@ public class AggregationNode extends MultiChildProcessNode { } public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) { - return new AggregationNode( + return new HorizontallyConcatNode( new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)), - children.subList(startIndex, endIndex), - // TODO figure out the relation of aggregationDescriptorList and children node - getAggregationDescriptorList().subList(startIndex, endIndex), - getGroupByTimeParameter(), - getGroupByParameter(), - getGroupByExpression(), - outputEndTime, - getScanOrder()); + new ArrayList<>(children.subList(startIndex, endIndex))); } @Override @@ -194,6 +189,18 @@ public class AggregationNode extends MultiChildProcessNode { return outputColumnNames; } + public static List<SeriesAggregationSourceNode> findAggregationSourceNode(PlanNode node) { + if (node == null) { + return new ArrayList<>(); + } + if (node instanceof SeriesAggregationSourceNode) { + return Collections.singletonList((SeriesAggregationSourceNode) node); + } + List<SeriesAggregationSourceNode> ret = new ArrayList<>(); + node.getChildren().forEach(child -> ret.addAll(findAggregationSourceNode(child))); + return ret; + } + @Override public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { return visitor.visitAggregation(this, context); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java index c56d965c49..faa21947d1 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java @@ -65,11 +65,8 @@ public class DeviceMergeNode extends MultiChildProcessNode { @Override public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) { - return new DeviceMergeNode( - new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)), - getMergeOrderParameter(), - // TODO figure out the relation of devices and children node - devices.subList(startIndex, endIndex)); + throw new UnsupportedOperationException( + "DeviceMergeNode should have only one local child in single data region."); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java index 8959eae92b..f1833a7cee 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java @@ -95,12 +95,9 @@ public class GroupByLevelNode extends MultiChildProcessNode { @Override public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) { - return new GroupByLevelNode( + return new HorizontallyConcatNode( new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)), - // TODO figure out the relation of aggregationDescriptorList and children node - this.groupByLevelDescriptors.subList(startIndex, endIndex), - this.groupByTimeParameter, - this.scanOrder); + new ArrayList<>(children.subList(startIndex, endIndex))); } public List<CrossSeriesAggregationDescriptor> getGroupByLevelDescriptors() { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java index 485aea5733..d0333e79a2 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java @@ -28,9 +28,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter; import org.apache.iotdb.db.mpp.plan.statement.component.Ordering; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; -import org.apache.commons.lang3.Validate; - import javax.annotation.Nullable; +import org.apache.commons.lang3.Validate; import java.io.DataOutputStream; import java.io.IOException; @@ -101,14 +100,9 @@ public class GroupByTagNode extends MultiChildProcessNode { @Override public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) { - return new GroupByTagNode( + return new HorizontallyConcatNode( new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)), - this.groupByTimeParameter, - this.scanOrder, - // TODO figure out the relation of aggregationDescriptorList and children node - this.tagKeys, - this.tagValuesToAggregationDescriptors, - this.outputColumnNames); + new ArrayList<>(children.subList(startIndex, endIndex))); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java index 6c69a70722..6fbd9e7ab4 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java @@ -59,8 +59,7 @@ public class MergeSortNode extends MultiChildProcessNode { return new MergeSortNode( new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)), getMergeOrderParameter(), - // TODO figure out the relation of outputColumns and children node - outputColumns.subList(startIndex, endIndex)); + outputColumns); } @Override
