This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch identitySink
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a6e9e95ba9f5a26e70fc8cc5660cc03288d6af68
Author: Alima777 <[email protected]>
AuthorDate: Thu Mar 2 11:39:08 2023 +0800

    IOTDB-5610 Don't pipeline cosumeAllNode and consumeOneByOneNode with only 
one child
---
 .../org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java  | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index efbb423fbf..5a3c88a318 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -2346,7 +2346,7 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
     // children after pipelining
     List<Operator> parentPipelineChildren = new ArrayList<>();
     int finalExchangeNum = context.getExchangeSumNum();
-    if (context.getDegreeOfParallelism() == 1) {
+    if (context.getDegreeOfParallelism() == 1 || node.getChildren().size() == 
1) {
       // If dop = 1, we don't create extra pipeline
       for (PlanNode localChild : node.getChildren()) {
         Operator childOperation = localChild.accept(this, context);
@@ -2411,6 +2411,7 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
           // Otherwise, the first group will belong to the parent pipeline
           if (i == 0) {
             for (int j = startIndex; j < endIndex; j++) {
+              context.setDegreeOfParallelism(1);
               Operator childOperation = node.getChildren().get(j).accept(this, 
context);
               parentPipelineChildren.add(childOperation);
               afterwardsNodes.add(node.getChildren().get(j));
@@ -2515,7 +2516,7 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
     int finalExchangeNum = context.getExchangeSumNum();
 
     // 1. divide every child to pipeline using the max dop
-    if (context.getDegreeOfParallelism() == 1) {
+    if (context.getDegreeOfParallelism() == 1 || node.getChildren().size() == 
1) {
       // If dop = 1, we don't create extra pipeline
       for (PlanNode childSource : node.getChildren()) {
         Operator childOperation = childSource.accept(this, context);

Reply via email to