This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch advancePipeline in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8051908a265c74c351d1c6d7b9da6fdc1008248b
Author: Alima777 <[email protected]>
AuthorDate: Wed Feb 8 11:29:18 2023 +0800

    implement consumeAllChildren pipeline divided by dop
---
 .../plan/planner/LocalExecutionPlanContext.java    |  9 +++
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 82 ++++++++++++++++++----
 .../db/mpp/plan/planner/plan/node/PlanNode.java    | 16 +++++
 .../planner/plan/node/process/AggregationNode.java | 18 ++++-
 .../planner/plan/node/process/DeviceMergeNode.java | 12 ++++
 .../plan/node/process/GroupByLevelNode.java        | 13 ++++
 .../planner/plan/node/process/GroupByTagNode.java  | 18 ++++-
 .../planner/plan/node/process/MergeSortNode.java   | 12 ++++
 .../planner/plan/node/process/TimeJoinNode.java    |  9 +++
 .../plan/node/process/VerticallyConcatNode.java    |  9 +++
 10 files changed, 180 insertions(+), 18 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index 2d8e6d7051..32988da5ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -59,6 +59,7 @@ public class LocalExecutionPlanContext {
   private final AtomicInteger nextOperatorId;
   private final TypeProvider typeProvider;
   private final Map<String, Set<String>> allSensorsMap;
+  private int degreeOfParallelism = 4;
   // this is shared with all subContexts
   private AtomicInteger nextPipelineId;
   private List<PipelineDriverFactory> pipelineDriverFactories;
@@ -142,6 +143,14 @@ public class LocalExecutionPlanContext {
     return driverContext;
   }
 
+  public int getDegreeOfParallelism() {
+    return degreeOfParallelism;
+  }
+
+  public void setDegreeOfParallelism(int degreeOfParallelism) {
+    this.degreeOfParallelism = degreeOfParallelism;
+  }
+
   private int getNextPipelineId() {
     return nextPipelineId.getAndIncrement();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index de21989478..d397db9980 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -216,7 +216,9 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -2211,22 +2213,74 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
   private List<Operator> dealWithConsumeAllChildrenPipelineBreaker(
       PlanNode node, LocalExecutionPlanContext context) {
     // children after pipelining
-    List<Operator> children = new ArrayList<>();
+    LinkedList<Operator> parentPipelineChildren = new LinkedList<>();
     int finalExchangeNum = context.getExchangeSumNum();
-    for (PlanNode childSource : node.getChildren()) {
-      // Create pipelines for children
-      LocalExecutionPlanContext subContext = context.createSubContext();
-      Operator childOperation = childSource.accept(this, subContext);
-      // If the child belongs to another fragment instance, we don't create pipeline for it
-      if (childOperation instanceof ExchangeOperator) {
-        children.add(childOperation);
+    // 1. exclude ExchangeOperator first
+    Iterator<PlanNode> childrenIterator = node.getChildren().listIterator();
+    while (childrenIterator.hasNext()) {
+      PlanNode childSource = childrenIterator.next();
+      if (childSource instanceof ExchangeNode) {
+        Operator childOperation = childSource.accept(this, context);
         finalExchangeNum += 1;
-      } else {
+        parentPipelineChildren.add(childOperation);
+        // Remove exchangeNode so that the remaining children (and their indexes) are all local
+        childrenIterator.remove();
+      }
+    }
+    List<PlanNode> localChildren = node.getChildren();
+    int dop = context.getDegreeOfParallelism();
+    if (dop <= 1) {
+      // If dop = 1, we don't create extra pipeline
+      for (PlanNode localChild : localChildren) {
+        Operator childOperation = localChild.accept(this, context);
+        parentPipelineChildren.add(childOperation);
+      }
+    } else {
+      // 2. divide localChildren into groups, every group is executed in its own pipeline
+      int childNumInEachPipeline = Math.max(1, localChildren.size() / dop);
+      // Every group owns at least one child, so there are at most size(children) groups
+      int groupNum = Math.min(dop, localChildren.size());
+      // Only if dop > size(children), all children are split to new pipelines. Otherwise, the
+      // first group but not last will belong to the parent pipeline since the children number
+      // of last group is greaterEqual than the first group
+      boolean firstGroupInParentPipeline = dop <= localChildren.size();
+      // Extra dop = dop - size(children), since dop = 1 means serial but not 0
+      int dopForChild = Math.max(1, dop - localChildren.size());
+
+      for (int i = 0; i < groupNum; i++) {
+        int startIndex = i * childNumInEachPipeline;
+        // The last group also takes the remainder of the division
+        int endIndex =
+            i < groupNum - 1 ? startIndex + childNumInEachPipeline : localChildren.size();
+        if (i == 0 && firstGroupInParentPipeline) {
+          for (int j = startIndex; j < endIndex; j++) {
+            Operator childOperation = localChildren.get(j).accept(this, context);
+            // Insert at index j so that the group keeps its plan order at the head of the list
+            parentPipelineChildren.add(j, childOperation);
+          }
+          continue;
+        }
+        LocalExecutionPlanContext subContext = context.createSubContext();
+        subContext.setDegreeOfParallelism(dopForChild);
+        // Create partial parent operator for children
+        PlanNode partialParentNode;
+        if (endIndex - startIndex == 1) {
+          // A single child is the root of the new pipeline itself
+          partialParentNode = localChildren.get(startIndex);
+        } else {
+          // PartialParentNode is equal to parentNode except that it only owns the children in
+          // [startIndex, endIndex), which are already attached by createSubNode
+          partialParentNode = node.createSubNode(i, startIndex, endIndex);
+        }
+        // The operator is the root of the new pipeline, so it must be built in subContext
+        Operator partialParentOperator = partialParentNode.accept(this, subContext);
         ISinkHandle localSinkHandle =
             MPP_DATA_EXCHANGE_MANAGER.createLocalSinkHandleForPipeline(
-                subContext.getDriverContext(), childSource.getPlanNodeId().getId());
+                // Attention, there is no parent node, use first child node instead
+                subContext.getDriverContext(),
+                localChildren.get(startIndex).getPlanNodeId().getId());
         subContext.setSinkHandle(localSinkHandle);
-        subContext.addPipelineDriverFactory(childOperation, subContext.getDriverContext());
+        subContext.addPipelineDriverFactory(partialParentOperator, subContext.getDriverContext());
 
         ExchangeOperator sourceOperator =
             new ExchangeOperator(
@@ -2237,17 +2291,17 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
                 MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForPipeline(
                     ((LocalSinkHandle) localSinkHandle).getSharedTsBlockQueue(),
                     context.getDriverContext()),
-                childSource.getPlanNodeId());
+                partialParentNode.getPlanNodeId());
         context
             .getTimeSliceAllocator()
             .recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
-        children.add(sourceOperator);
+        parentPipelineChildren.add(sourceOperator);
         context.addExchangeOperator(sourceOperator);
         finalExchangeNum += subContext.getExchangeSumNum() - context.getExchangeSumNum() + 1;
       }
     }
     context.setExchangeSumNum(finalExchangeNum);
-    return children;
+    return parentPipelineChildren;
   }
 
   private List<Operator> dealWithConsumeChildrenOneByOneNode(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
index 80ec99d1f3..ae66ddc843 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
@@ -65,6 +65,22 @@ public abstract class PlanNode implements IConsensusRequest {
   @Override
   public abstract PlanNode clone();
 
+  /**
+   * Create a sub node which has exactly the same function as the origin node, except that it only
+   * owns the children in [startIndex, endIndex) of the origin children list. The returned node
+   * already has these children attached.
+   *
+   * @param subNodeId the sub node id
+   * @param startIndex the start index (inclusive) of origin children
+   * @param endIndex the end index (exclusive) of origin children
+   * @return the sub node with the selected children attached
+   * @throws UnsupportedOperationException if this kind of node can not be split
+   */
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    throw new UnsupportedOperationException(
+        String.format("Can't Create subNode for %s", this.getClass().toString()));
+  }
+
   public PlanNode cloneWithChildren(List<PlanNode> children) {
     if (!(children == null
         || allowedChildCount() == CHILD_COUNT_NO_LIMIT
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
index 6c0c7703e8..69b8eb02db 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/AggregationNode.java
@@ -50,8 +50,7 @@ import java.util.stream.Collectors;
 public class AggregationNode extends MultiChildProcessNode {
 
   // The list of aggregate functions, each AggregateDescriptor will be output as one or two column
-  // of
-  // result TsBlock
+  // of result TsBlock
   protected List<AggregationDescriptor> aggregationDescriptorList;
 
   // The parameter of `group by time`.
@@ -167,6 +166,21 @@ public class AggregationNode extends MultiChildProcessNode {
         getScanOrder());
   }
 
+  @Override
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    return new AggregationNode(
+        new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+        // Copy the sub list, subList() only returns a view backed by the origin children
+        new ArrayList<>(children.subList(startIndex, endIndex)),
+        // TODO figure out the relation of aggregationDescriptorList and children node
+        getAggregationDescriptorList().subList(startIndex, endIndex),
+        getGroupByTimeParameter(),
+        getGroupByParameter(),
+        getGroupByExpression(),
+        outputEndTime,
+        getScanOrder());
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
     List<String> outputColumnNames = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
index d3ec660e7f..c56d965c49 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/DeviceMergeNode.java
@@ -63,6 +63,18 @@ public class DeviceMergeNode extends MultiChildProcessNode {
     return new DeviceMergeNode(getPlanNodeId(), getMergeOrderParameter(), getDevices());
   }
 
+  @Override
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    DeviceMergeNode subNode =
+        new DeviceMergeNode(
+            new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+            getMergeOrderParameter(),
+            // TODO figure out the relation of devices and children node
+            devices.subList(startIndex, endIndex));
+    children.subList(startIndex, endIndex).forEach(subNode::addChild);
+    return subNode;
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
     return children.stream()
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
index da7b8dc9c0..8959eae92b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByLevelNode.java
@@ -93,6 +93,19 @@ public class GroupByLevelNode extends MultiChildProcessNode {
         getPlanNodeId(), getGroupByLevelDescriptors(), this.groupByTimeParameter, this.scanOrder);
   }
 
+  @Override
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    GroupByLevelNode subNode =
+        new GroupByLevelNode(
+            new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+            // TODO figure out the relation of aggregationDescriptorList and children node
+            this.groupByLevelDescriptors.subList(startIndex, endIndex),
+            this.groupByTimeParameter,
+            this.scanOrder);
+    children.subList(startIndex, endIndex).forEach(subNode::addChild);
+    return subNode;
+  }
+
   public List<CrossSeriesAggregationDescriptor> getGroupByLevelDescriptors() {
     return groupByLevelDescriptors;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
index c2e558bdc7..87580783f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/GroupByTagNode.java
@@ -28,9 +28,8 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import org.apache.commons.lang3.Validate;
-
 import javax.annotation.Nullable;
+import org.apache.commons.lang3.Validate;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -99,6 +98,21 @@ public class GroupByTagNode extends MultiChildProcessNode {
         this.outputColumnNames);
   }
 
+  @Override
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    GroupByTagNode subNode =
+        new GroupByTagNode(
+            new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+            this.groupByTimeParameter,
+            this.scanOrder,
+            // TODO figure out the relation of aggregationDescriptorList and children node
+            this.tagKeys,
+            this.tagValuesToAggregationDescriptors,
+            this.outputColumnNames);
+    children.subList(startIndex, endIndex).forEach(subNode::addChild);
+    return subNode;
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
     List<String> ret = new ArrayList<>(tagKeys);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java
index 564a5dbfa4..6c69a70722 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java
@@ -54,6 +54,18 @@ public class MergeSortNode extends MultiChildProcessNode {
     return new MergeSortNode(getPlanNodeId(), getMergeOrderParameter(), outputColumns);
   }
 
+  @Override
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    MergeSortNode subNode =
+        new MergeSortNode(
+            new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+            getMergeOrderParameter(),
+            // TODO figure out the relation of outputColumns and children node
+            outputColumns.subList(startIndex, endIndex));
+    children.subList(startIndex, endIndex).forEach(subNode::addChild);
+    return subNode;
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
     return outputColumns;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
index 2598cd4e28..2e89b841d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
@@ -62,6 +62,15 @@ public class TimeJoinNode extends MultiChildProcessNode {
     return new TimeJoinNode(getPlanNodeId(), getMergeOrder());
   }
 
+  @Override
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    TimeJoinNode subNode =
+        new TimeJoinNode(
+            new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)), getMergeOrder());
+    children.subList(startIndex, endIndex).forEach(subNode::addChild);
+    return subNode;
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
     return children.stream()
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/VerticallyConcatNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/VerticallyConcatNode.java
index 24760a4228..ab6ef28fda 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/VerticallyConcatNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/VerticallyConcatNode.java
@@ -47,6 +47,15 @@ public class VerticallyConcatNode extends MultiChildProcessNode {
     return new VerticallyConcatNode(getPlanNodeId());
   }
 
+  @Override
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    VerticallyConcatNode subNode =
+        new VerticallyConcatNode(
+            new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)));
+    children.subList(startIndex, endIndex).forEach(subNode::addChild);
+    return subNode;
+  }
+
   @Override
   public List<String> getOutputColumnNames() {
     return children.stream()
