This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch advancePipeline
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4b2941e1db1f0e129f0434282afc1d60d60d79ba
Author: Alima777 <[email protected]>
AuthorDate: Fri Feb 10 13:54:46 2023 +0800

    Fix consumeAll index out of bounds bug
---
 .../db/mpp/plan/planner/OperatorTreeGenerator.java | 82 ++++++++++++++--------
 .../plan/node/process/HorizontallyConcatNode.java  |  2 +-
 .../planner/plan/node/process/TimeJoinNode.java    |  2 +-
 3 files changed, 55 insertions(+), 31 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 845cb2be29..21156e3f82 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -212,6 +212,7 @@ import org.apache.commons.lang3.Validate;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -2215,42 +2216,48 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
     // children after pipelining
     LinkedList<Operator> parentPipelineChildren = new LinkedList<>();
     int finalExchangeNum = context.getExchangeSumNum();
-    // 1. exclude ExchangeOperator first
-    Iterator<PlanNode> childrenIterator = node.getChildren().listIterator();
-    while (childrenIterator.hasNext()) {
-      PlanNode childSource = childrenIterator.next();
-      if (childSource instanceof ExchangeNode) {
-        Operator childOperation = childSource.accept(this, context);
-        finalExchangeNum += 1;
-        parentPipelineChildren.add(childOperation);
-        // Remove exchangeNode directly for later use
-        childrenIterator.remove();
-      }
-    }
-    List<PlanNode> localChildren = node.getChildren();
     if (context.getDegreeOfParallelism() == 1) {
       // If dop = 1, we don't create extra pipeline
-      for (PlanNode localChild : localChildren) {
+      for (PlanNode localChild : node.getChildren()) {
         Operator childOperation = localChild.accept(this, context);
         parentPipelineChildren.add(childOperation);
       }
     } else {
+      // Keep it since we may change the structure of origin children nodes
+      LinkedList<PlanNode> afterwardsNodes = new LinkedList<>();
+      // 1. exclude ExchangeOperator first
+      Iterator<PlanNode> childrenIterator = node.getChildren().listIterator();
+      while (childrenIterator.hasNext()) {
+        PlanNode childSource = childrenIterator.next();
+        if (childSource instanceof ExchangeNode) {
+          Operator childOperation = childSource.accept(this, context);
+          finalExchangeNum += 1;
+          parentPipelineChildren.add(childOperation);
+          afterwardsNodes.add(childSource);
+          // Remove exchangeNode directly for later use
+          childrenIterator.remove();
+        }
+      }
+
       // 2. divide every childNumInEachPipeline localChildren to different 
pipeline
-      int childNumInEachPipeline =
-          Math.max(1, localChildren.size() / context.getDegreeOfParallelism());
+      List<PlanNode> localChildren = node.getChildren();
+      int[] childNumInEachPipeline =
+          getChildNumInEachPipeline(localChildren.size(), 
context.getDegreeOfParallelism());
       // If dop > size(children) + 1, we can allocate extra dop to child node
       // Extra dop = dop - size(children), since dop = 1 means serial but not 0
-      int maxDop = Math.min(context.getDegreeOfParallelism(), 
localChildren.size() + 1);
+      int childGroupNum = Math.min(context.getDegreeOfParallelism(), 
localChildren.size());
       int dopForChild = Math.max(1, context.getDegreeOfParallelism() - 
localChildren.size());
-
-      for (int i = 0; i < maxDop && i < localChildren.size(); i++) {
+      int startIndex, endIndex = 0;
+      for (int i = 0; i < childGroupNum; i++) {
+        startIndex = endIndex;
+        endIndex += childNumInEachPipeline[i];
         // Only if dop >= size(children) + 1, split all children to new 
pipeline
-        // Otherwise, the first group but not last will belong to the parent 
pipeline since the
-        // children number of last group is greaterEqual than the first group
+        // Otherwise, the first group will belong to the parent pipeline
         if (i == 0 && context.getDegreeOfParallelism() < localChildren.size() 
+ 1) {
-          for (int j = 0; j < childNumInEachPipeline; j++) {
+          for (int j = startIndex; j < endIndex; j++) {
             Operator childOperation = localChildren.get(j).accept(this, 
context);
             parentPipelineChildren.addFirst(childOperation);
+            afterwardsNodes.addFirst(localChildren.get(j));
           }
           continue;
         }
@@ -2259,18 +2266,14 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
         // Create partial parent operator for children
         PlanNode partialParentNode = null;
         Operator partialParentOperator = null;
-        if (childNumInEachPipeline == 1) {
+
+        if (endIndex - startIndex == 1) {
           partialParentNode = localChildren.get(i);
           partialParentOperator = localChildren.get(i).accept(this, 
subContext);
         } else {
           // PartialParentNode is equals to parentNode except children
-          int startIndex = i * childNumInEachPipeline,
-              endIndex = i < (maxDop - 1) ? (i + 1) * childNumInEachPipeline : 
localChildren.size();
           partialParentNode = node.createSubNode(i, startIndex, endIndex);
-          for (int j = startIndex; j < endIndex; j++) {
-            partialParentNode.addChild(localChildren.get(i));
-          }
-          partialParentOperator = partialParentNode.accept(this, context);
+          partialParentOperator = partialParentNode.accept(this, subContext);
         }
         ISinkHandle localSinkHandle =
             MPP_DATA_EXCHANGE_MANAGER.createLocalSinkHandleForPipeline(
@@ -2293,14 +2296,35 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
             .getTimeSliceAllocator()
             .recordExecutionWeight(sourceOperator.getOperatorContext(), 1);
         parentPipelineChildren.add(sourceOperator);
+        afterwardsNodes.add(partialParentNode);
         context.addExchangeOperator(sourceOperator);
         finalExchangeNum += subContext.getExchangeSumNum() - 
context.getExchangeSumNum() + 1;
       }
+      ((MultiChildProcessNode) node).setChildren(afterwardsNodes);
     }
     context.setExchangeSumNum(finalExchangeNum);
     return parentPipelineChildren;
   }
 
+  /**
+   * Now, we allocate children to each pipeline as average as possible. For 
example, 5 children with
+   * 3 dop, the children group will be [1, 2, 2]. After we can estimate the 
workload of each
+   * operator, maybe we can allocate based on workload rather than child 
number.
+   */
+  private int[] getChildNumInEachPipeline(int childrenSize, int dop) {
+    int[] childNumInEachPipeline = new int[Math.min(childrenSize, dop)];
+    if (childrenSize <= dop) {
+      Arrays.fill(childNumInEachPipeline, 1);
+    } else {
+      int avgChildNum = childrenSize / dop;
+      int splitIndex = childNumInEachPipeline.length - childrenSize % dop;
+      Arrays.fill(childNumInEachPipeline, 0, splitIndex, avgChildNum);
+      Arrays.fill(
+          childNumInEachPipeline, splitIndex, childNumInEachPipeline.length, 
avgChildNum + 1);
+    }
+    return childNumInEachPipeline;
+  }
+
   private List<Operator> dealWithConsumeChildrenOneByOneNode(
       PlanNode node, LocalExecutionPlanContext context) {
     List<Operator> parentPipelineChildren = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/HorizontallyConcatNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/HorizontallyConcatNode.java
index 28ec4896f8..f6c785f8fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/HorizontallyConcatNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/HorizontallyConcatNode.java
@@ -55,7 +55,7 @@ public class HorizontallyConcatNode extends MultiChildProcessNode {
   public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
     return new HorizontallyConcatNode(
         new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
-        children.subList(startIndex, endIndex));
+        new ArrayList<>(children.subList(startIndex, endIndex)));
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
index 2e89b841d0..ed1b8e4fb0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TimeJoinNode.java
@@ -67,7 +67,7 @@ public class TimeJoinNode extends MultiChildProcessNode {
     return new TimeJoinNode(
         new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
         getMergeOrder(),
-        children.subList(startIndex, endIndex));
+        new ArrayList<>(children.subList(startIndex, endIndex)));
   }
 
   @Override

Reply via email to