This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch identitySink in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a6e9e95ba9f5a26e70fc8cc5660cc03288d6af68 Author: Alima777 <[email protected]> AuthorDate: Thu Mar 2 11:39:08 2023 +0800 IOTDB-5610 Don't pipeline consumeAllNode and consumeOneByOneNode with only one child --- .../org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java index efbb423fbf..5a3c88a318 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java @@ -2346,7 +2346,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP // children after pipelining List<Operator> parentPipelineChildren = new ArrayList<>(); int finalExchangeNum = context.getExchangeSumNum(); - if (context.getDegreeOfParallelism() == 1) { + if (context.getDegreeOfParallelism() == 1 || node.getChildren().size() == 1) { // If dop = 1, we don't create extra pipeline for (PlanNode localChild : node.getChildren()) { Operator childOperation = localChild.accept(this, context); @@ -2411,6 +2411,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP // Otherwise, the first group will belong to the parent pipeline if (i == 0) { for (int j = startIndex; j < endIndex; j++) { + context.setDegreeOfParallelism(1); Operator childOperation = node.getChildren().get(j).accept(this, context); parentPipelineChildren.add(childOperation); afterwardsNodes.add(node.getChildren().get(j)); @@ -2515,7 +2516,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP int finalExchangeNum = context.getExchangeSumNum(); // 1. 
divide every child to pipeline using the max dop - if (context.getDegreeOfParallelism() == 1) { + if (context.getDegreeOfParallelism() == 1 || node.getChildren().size() == 1) { // If dop = 1, we don't create extra pipeline for (PlanNode childSource : node.getChildren()) { Operator childOperation = childSource.accept(this, context);
