This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch advancePipeline in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7f9f6e056881833dba743d68ffa266862391ba79 Author: Alima777 <[email protected]> AuthorDate: Wed Feb 15 11:33:37 2023 +0800 change pipeline logic --- .../db/mpp/plan/planner/OperatorTreeGenerator.java | 169 +++++++++++++-------- .../db/mpp/plan/plan/PipelineBuilderTest.java | 9 +- 2 files changed, 109 insertions(+), 69 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java index 5d99a29ad1..f24dd4103b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java @@ -2243,77 +2243,86 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP // Keep it since we may change the structure of origin children nodes List<PlanNode> afterwardsNodes = new ArrayList<>(); // 1. Calculate localChildren size - int localChildrenSize = 0; - for (PlanNode child : node.getChildren()) { - if (!(child instanceof ExchangeNode)) { + int localChildrenSize = 0, firstChildIndex = -1; + for (int i = 0; i < node.getChildren().size(); i++) { + if (!(node.getChildren().get(i) instanceof ExchangeNode)) { localChildrenSize++; + firstChildIndex = firstChildIndex == -1 ? i : firstChildIndex; + // deal with exchangeNode at head + } else if (firstChildIndex == -1) { + Operator childOperation = node.getChildren().get(i).accept(this, context); + finalExchangeNum += 1; + parentPipelineChildren.add(childOperation); + afterwardsNodes.add(node.getChildren().get(i)); } } - // 2. divide every childNumInEachPipeline localChildren to different pipeline - int[] childNumInEachPipeline = - getChildNumInEachPipeline( - node.getChildren(), localChildrenSize, context.getDegreeOfParallelism()); - // If dop > size(children) + 1, we can allocate extra dop to child node - // Extra dop = dop - size(children), since dop = 1 means serial but not 0 - int childGroupNum = Math.min(context.getDegreeOfParallelism(), localChildrenSize); + if (firstChildIndex == -1) { + context.setExchangeSumNum(finalExchangeNum); + return parentPipelineChildren; + } + // If dop > localChildrenSize + 1, we can allocate extra dop to child node + // Extra dop = dop - localChildrenSize, since dop = 1 means serial but not 0 int dopForChild = Math.max(1, context.getDegreeOfParallelism() - localChildrenSize); - int startIndex, endIndex = 0; - for (int i = 0; i < childGroupNum; i++) { - startIndex = endIndex; - endIndex += childNumInEachPipeline[i]; - // Only if dop >= size(children) + 1, split all children to new pipeline - // Otherwise, the first group will belong to the parent pipeline - if (i == 0 && context.getDegreeOfParallelism() < localChildrenSize + 1) { - for (int j = startIndex; j < endIndex; j++) { - Operator childOperation = node.getChildren().get(j).accept(this, context); + // If dop > localChildrenSize, we create one new pipeline for each child + if (context.getDegreeOfParallelism() > localChildrenSize) { + for (int i = firstChildIndex; i < node.getChildren().size(); i++) { + PlanNode childNode = node.getChildren().get(i); + if (childNode instanceof ExchangeNode) { + Operator childOperation = childNode.accept(this, context); + finalExchangeNum += 1; parentPipelineChildren.add(childOperation); - afterwardsNodes.add(node.getChildren().get(j)); + } else { + LocalExecutionPlanContext subContext = context.createSubContext(); + subContext.setDegreeOfParallelism(dopForChild); + + int originPipeNum = context.getPipelineNumber(); + Operator sourceOperator = createNewPipelineForChildNode(context, subContext, childNode); + parentPipelineChildren.add(sourceOperator); + dopForChild = + Math.max(1, dopForChild - (subContext.getPipelineNumber() - 1 - originPipeNum)); + finalExchangeNum += subContext.getExchangeSumNum() - context.getExchangeSumNum() + 1; } - continue; } - LocalExecutionPlanContext subContext = context.createSubContext(); - subContext.setDegreeOfParallelism(dopForChild); - // Create partial parent operator for children - PlanNode partialParentNode = null; - Operator partialParentOperator = null; - - int originPipeNum = context.getPipelineNumber(); - if (endIndex - startIndex == 1) { - partialParentNode = node.getChildren().get(i); - partialParentOperator = node.getChildren().get(i).accept(this, subContext); - } else { - // PartialParentNode is equals to parentNode except children - partialParentNode = node.createSubNode(i, startIndex, endIndex); - partialParentOperator = partialParentNode.accept(this, subContext); + } else { + // If dop <= localChildrenSize, we have to divide every childNumInEachPipeline localChildren + // to different pipeline + int[] childNumInEachPipeline = + getChildNumInEachPipeline( + node.getChildren(), localChildrenSize, context.getDegreeOfParallelism()); + int childGroupNum = Math.min(context.getDegreeOfParallelism(), localChildrenSize); + int startIndex, endIndex = firstChildIndex; + for (int i = 0; i < childGroupNum; i++) { + startIndex = endIndex; + endIndex += childNumInEachPipeline[i]; + // Only if dop >= size(children) + 1, split all children to new pipeline + // Otherwise, the first group will belong to the parent pipeline + if (i == 0) { + for (int j = startIndex; j < endIndex; j++) { + Operator childOperation = node.getChildren().get(j).accept(this, context); + parentPipelineChildren.add(childOperation); + afterwardsNodes.add(node.getChildren().get(j)); + } + continue; + } + LocalExecutionPlanContext subContext = context.createSubContext(); + subContext.setDegreeOfParallelism(1); + // Create partial parent operator for children + PlanNode partialParentNode = null; + if (endIndex - startIndex == 1) { + partialParentNode = node.getChildren().get(i); + } else { + // PartialParentNode is equals to parentNode except children + partialParentNode = node.createSubNode(i, startIndex, endIndex); + } + + Operator sourceOperator = + createNewPipelineForChildNode(context, subContext, partialParentNode); + parentPipelineChildren.add(sourceOperator); + afterwardsNodes.add(partialParentNode); + finalExchangeNum += subContext.getExchangeSumNum() - context.getExchangeSumNum() + 1; } - // update dop for child - dopForChild = Math.max(1, dopForChild - (subContext.getPipelineNumber() - originPipeNum)); - ISinkHandle localSinkHandle = - MPP_DATA_EXCHANGE_MANAGER.createLocalSinkHandleForPipeline( - // Attention, there is no parent node, use first child node instead - subContext.getDriverContext(), node.getChildren().get(i).getPlanNodeId().getId()); - subContext.setSinkHandle(localSinkHandle); - subContext.addPipelineDriverFactory(partialParentOperator, subContext.getDriverContext()); - - ExchangeOperator sourceOperator = - new ExchangeOperator( - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), null, ExchangeOperator.class.getSimpleName()), - MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForPipeline( - ((LocalSinkHandle) localSinkHandle).getSharedTsBlockQueue(), - context.getDriverContext()), - partialParentNode.getPlanNodeId()); - context - .getTimeSliceAllocator() - .recordExecutionWeight(sourceOperator.getOperatorContext(), 1); - parentPipelineChildren.add(sourceOperator); - afterwardsNodes.add(partialParentNode); - context.addExchangeOperator(sourceOperator); - finalExchangeNum += subContext.getExchangeSumNum() - context.getExchangeSumNum() + 1; + ((MultiChildProcessNode) node).setChildren(afterwardsNodes); } - ((MultiChildProcessNode) node).setChildren(afterwardsNodes); } context.setExchangeSumNum(finalExchangeNum); return parentPipelineChildren; @@ -2332,9 +2341,13 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP int[] childNumInEachPipeline = new int[maxPipelineNum]; int avgChildNum = Math.max(1, localChildrenSize / dop); // allocate remaining child to group from splitIndex - int splitIndex = - localChildrenSize <= dop ? maxPipelineNum : maxPipelineNum - localChildrenSize % dop; - int pipelineIndex = 0, childIndex = 0; + int splitIndex = maxPipelineNum - localChildrenSize % dop; + int childIndex = 0; + // Skip ExchangeNode at head + while (childIndex < allChildren.size() && allChildren.get(childIndex) instanceof ExchangeNode) { + childIndex++; + } + int pipelineIndex = 0; while (pipelineIndex < maxPipelineNum) { int childNum = pipelineIndex < splitIndex ? avgChildNum : avgChildNum + 1; int originChildIndex = childIndex; @@ -2353,6 +2366,32 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP return childNumInEachPipeline; } + private Operator createNewPipelineForChildNode( + LocalExecutionPlanContext context, LocalExecutionPlanContext subContext, PlanNode childNode) { + Operator childOperation = childNode.accept(this, subContext); + ISinkHandle localSinkHandle = + MPP_DATA_EXCHANGE_MANAGER.createLocalSinkHandleForPipeline( + // Attention, there is no parent node, use first child node instead + subContext.getDriverContext(), childNode.getPlanNodeId().getId()); + subContext.setSinkHandle(localSinkHandle); + subContext.addPipelineDriverFactory(childOperation, subContext.getDriverContext()); + + ExchangeOperator sourceOperator = + new ExchangeOperator( + context + .getDriverContext() + .addOperatorContext( + context.getNextOperatorId(), null, ExchangeOperator.class.getSimpleName()), + MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForPipeline( + ((LocalSinkHandle) localSinkHandle).getSharedTsBlockQueue(), + context.getDriverContext()), + childNode.getPlanNodeId()); + + context.getTimeSliceAllocator().recordExecutionWeight(sourceOperator.getOperatorContext(), 1); + context.addExchangeOperator(sourceOperator); + return sourceOperator; + } + public List<Operator> dealWithConsumeChildrenOneByOneNode( PlanNode node, LocalExecutionPlanContext context) { List<Operator> parentPipelineChildren = new ArrayList<>(); diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java index c0ee46142b..d3db6d7811 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java @@ -679,22 +679,23 @@ public class PipelineBuilderTest { @Test public void testGetChildNumInEachPipeline() { List<PlanNode> allChildren = new ArrayList<>(); - allChildren.add(new SeriesScanNode(new PlanNodeId("localNode1"), null)); allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode1"))); + allChildren.add(new SeriesScanNode(new PlanNodeId("localNode1"), null)); + allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode2"))); allChildren.add(new SeriesScanNode(new PlanNodeId("localNode2"), null)); int[] childNumInEachPipeline = - operatorTreeGenerator.getChildNumInEachPipeline(allChildren, 2, 3); + operatorTreeGenerator.getChildNumInEachPipeline(allChildren, 2, 2); assertEquals(2, childNumInEachPipeline.length); assertEquals(2, childNumInEachPipeline[0]); assertEquals(1, childNumInEachPipeline[1]); allChildren.add(new SeriesScanNode(new PlanNodeId("localNode3"), null)); allChildren.add(new SeriesScanNode(new PlanNodeId("localNode4"), null)); - allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode2"))); allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode3"))); - allChildren.add(new SeriesScanNode(new PlanNodeId("localNode5"), null)); allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode4"))); + allChildren.add(new SeriesScanNode(new PlanNodeId("localNode5"), null)); + allChildren.add(new ExchangeNode(new PlanNodeId("remoteNode5"))); childNumInEachPipeline = operatorTreeGenerator.getChildNumInEachPipeline(allChildren, 5, 3); assertEquals(3, childNumInEachPipeline.length); assertEquals(2, childNumInEachPipeline[0]);
