This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch advancePipeline in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4b2941e1db1f0e129f0434282afc1d60d60d79ba Author: Alima777 <[email protected]> AuthorDate: Fri Feb 10 13:54:46 2023 +0800 Fix consumeAll index out of bounds bug --- .../db/mpp/plan/planner/OperatorTreeGenerator.java | 82 ++++++++++++++-------- .../plan/node/process/HorizontallyConcatNode.java | 2 +- .../planner/plan/node/process/TimeJoinNode.java | 2 +- 3 files changed, 55 insertions(+), 31 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java index 845cb2be29..21156e3f82 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java @@ -212,6 +212,7 @@ import org.apache.commons.lang3.Validate; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -2215,42 +2216,48 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP // children after pipelining LinkedList<Operator> parentPipelineChildren = new LinkedList<>(); int finalExchangeNum = context.getExchangeSumNum(); - // 1. exclude ExchangeOperator first - Iterator<PlanNode> childrenIterator = node.getChildren().listIterator(); - while (childrenIterator.hasNext()) { - PlanNode childSource = childrenIterator.next(); - if (childSource instanceof ExchangeNode) { - Operator childOperation = childSource.accept(this, context); - finalExchangeNum += 1; - parentPipelineChildren.add(childOperation); - // Remove exchangeNode directly for later use - childrenIterator.remove(); - } - } - List<PlanNode> localChildren = node.getChildren(); if (context.getDegreeOfParallelism() == 1) { // If dop = 1, we don't create extra pipeline - for (PlanNode localChild : localChildren) { + for (PlanNode localChild : node.getChildren()) { Operator childOperation = localChild.accept(this, context); parentPipelineChildren.add(childOperation); } } else { + // Keep it since we may change the structure of origin children nodes + LinkedList<PlanNode> afterwardsNodes = new LinkedList<>(); + // 1. exclude ExchangeOperator first + Iterator<PlanNode> childrenIterator = node.getChildren().listIterator(); + while (childrenIterator.hasNext()) { + PlanNode childSource = childrenIterator.next(); + if (childSource instanceof ExchangeNode) { + Operator childOperation = childSource.accept(this, context); + finalExchangeNum += 1; + parentPipelineChildren.add(childOperation); + afterwardsNodes.add(childSource); + // Remove exchangeNode directly for later use + childrenIterator.remove(); + } + } + // 2. divide every childNumInEachPipeline localChildren to different pipeline - int childNumInEachPipeline = - Math.max(1, localChildren.size() / context.getDegreeOfParallelism()); + List<PlanNode> localChildren = node.getChildren(); + int[] childNumInEachPipeline = + getChildNumInEachPipeline(localChildren.size(), context.getDegreeOfParallelism()); // If dop > size(children) + 1, we can allocate extra dop to child node // Extra dop = dop - size(children), since dop = 1 means serial but not 0 - int maxDop = Math.min(context.getDegreeOfParallelism(), localChildren.size() + 1); + int childGroupNum = Math.min(context.getDegreeOfParallelism(), localChildren.size()); int dopForChild = Math.max(1, context.getDegreeOfParallelism() - localChildren.size()); - - for (int i = 0; i < maxDop && i < localChildren.size(); i++) { + int startIndex, endIndex = 0; + for (int i = 0; i < childGroupNum; i++) { + startIndex = endIndex; + endIndex += childNumInEachPipeline[i]; // Only if dop >= size(children) + 1, split all children to new pipeline - // Otherwise, the first group but not last will belong to the parent pipeline since the - // children number of last group is greaterEqual than the first group + // Otherwise, the first group will belong to the parent pipeline if (i == 0 && context.getDegreeOfParallelism() < localChildren.size() + 1) { - for (int j = 0; j < childNumInEachPipeline; j++) { + for (int j = startIndex; j < endIndex; j++) { Operator childOperation = localChildren.get(j).accept(this, context); parentPipelineChildren.addFirst(childOperation); + afterwardsNodes.addFirst(localChildren.get(j)); } continue; } @@ -2259,18 +2266,14 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP // Create partial parent operator for children PlanNode partialParentNode = null; Operator partialParentOperator = null; - if (childNumInEachPipeline == 1) { + + if (endIndex - startIndex == 1) { partialParentNode = localChildren.get(i); partialParentOperator = localChildren.get(i).accept(this, subContext); } else { // PartialParentNode is equals to parentNode except children - int startIndex = i * childNumInEachPipeline, - endIndex = i < (maxDop - 1) ? (i + 1) * childNumInEachPipeline : localChildren.size(); partialParentNode = node.createSubNode(i, startIndex, endIndex); - for (int j = startIndex; j < endIndex; j++) { - partialParentNode.addChild(localChildren.get(i)); - } - partialParentOperator = partialParentNode.accept(this, context); + partialParentOperator = partialParentNode.accept(this, subContext); } ISinkHandle localSinkHandle = MPP_DATA_EXCHANGE_MANAGER.createLocalSinkHandleForPipeline( @@ -2293,14 +2296,35 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP .getTimeSliceAllocator() .recordExecutionWeight(sourceOperator.getOperatorContext(), 1); parentPipelineChildren.add(sourceOperator); + afterwardsNodes.add(partialParentNode); context.addExchangeOperator(sourceOperator); finalExchangeNum += subContext.getExchangeSumNum() - context.getExchangeSumNum() + 1; } + ((MultiChildProcessNode) node).setChildren(afterwardsNodes); } context.setExchangeSumNum(finalExchangeNum); return parentPipelineChildren; } + /** + * Now, we allocate children to each pipeline as average as possible. For example, 5 children with + * 3 dop, the children group will be [1, 2, 2]. After we can estimate the workload of each + * operator, maybe we can allocate based on workload rather than child number. + */ + private int[] getChildNumInEachPipeline(int childrenSize, int dop) { + int[] childNumInEachPipeline = new int[Math.min(childrenSize, dop)]; + if (childrenSize <= dop) { + Arrays.fill(childNumInEachPipeline, 1); + } else { + int avgChildNum = childrenSize / dop; + int splitIndex = childNumInEachPipeline.length - childrenSize % dop; + Arrays.fill(childNumInEachPipeline, 0, splitIndex, avgChildNum); + Arrays.fill( + childNumInEachPipeline, splitIndex, childNumInEachPipeline.length, avgChildNum + 1); + } + return childNumInEachPipeline; + } + private List<Operator> dealWithConsumeChildrenOneByOneNode( PlanNode node, LocalExecutionPlanContext context) { List<Operator> parentPipelineChildren = new ArrayList<>(); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/HorizontallyConcatNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/HorizontallyConcatNode.java index 28ec4896f8..f6c785f8fc 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/HorizontallyConcatNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/HorizontallyConcatNode.java @@ -55,7 +55,7 @@ public class HorizontallyConcatNode extends MultiChildProcessNode { public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) { return new HorizontallyConcatNode( new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)), - children.subList(startIndex, endIndex)); + new ArrayList<>(children.subList(startIndex, endIndex))); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java index 2e89b841d0..ed1b8e4fb0 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java @@ -67,7 +67,7 @@ public class TimeJoinNode extends MultiChildProcessNode { return new TimeJoinNode( new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)), getMergeOrder(), - children.subList(startIndex, endIndex)); + new ArrayList<>(children.subList(startIndex, endIndex))); } @Override
