This is an automated email from the ASF dual-hosted git repository.

lancelly pushed a commit to branch prepareInput
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ecaa9f1a4c2ecad2f834b2ea08e1c2bbebfe37a2
Author: lancelly <[email protected]>
AuthorDate: Tue Oct 10 16:35:03 2023 +0800

    add time watch on prepareInput of AbstractConsumeAllOperator
---
 .../process/AbstractConsumeAllOperator.java        | 100 ++++++++++++++++-----
 .../operator/process/MergeSortOperator.java        |  56 ++++++------
 .../process/join/RowBasedTimeJoinOperator.java     |  57 ++++++------
 3 files changed, 127 insertions(+), 86 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractConsumeAllOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractConsumeAllOperator.java
index e677ce5eafa..c078bfb1308 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractConsumeAllOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractConsumeAllOperator.java
@@ -28,6 +28,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import static com.google.common.util.concurrent.Futures.successfulAsList;
 
@@ -37,11 +38,17 @@ public abstract class AbstractConsumeAllOperator extends 
AbstractOperator
   protected final List<Operator> children;
   protected final int inputOperatorsCount;
   /** TsBlock from child operator. Only one cache now. */
-  protected final TsBlock[] inputTsBlocks;
+  protected TsBlock[] inputTsBlocks;
 
   protected final boolean[] canCallNext;
   protected int readyChildIndex;
 
+  /** Index of the child that is currently fetching input */
+  protected int currentChildIndex = 0;
+
+  /** Indicate whether we found an empty child input in one loop */
+  protected boolean hasEmptyChild = false;
+
   protected AbstractConsumeAllOperator(OperatorContext operatorContext, 
List<Operator> children) {
     this.operatorContext = operatorContext;
     this.children = children;
@@ -84,35 +91,51 @@ public abstract class AbstractConsumeAllOperator extends 
AbstractOperator
    * @throws Exception errors happened while getting tsblock from children
    */
   protected boolean prepareInput() throws Exception {
-    boolean allReady = true;
-    for (int i = 0; i < inputOperatorsCount; i++) {
-      if (!isEmpty(i) || children.get(i) == null) {
+
+    // start stopwatch
+    long maxRuntime = 
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+    long start = System.nanoTime();
+
+    while (System.nanoTime() - start < maxRuntime && currentChildIndex < 
inputOperatorsCount) {
+      if (canSkipCurrentChild(currentChildIndex)) {
+        currentChildIndex++;
         continue;
       }
-      if (canCallNext[i] && children.get(i).hasNextWithTimer()) {
-        inputTsBlocks[i] = getNextTsBlock(i);
-        canCallNext[i] = false;
-        // child operator has next but return an empty TsBlock which means 
that it may not
-        // finish calculation in given time slice.
-        // In such case, TimeJoinOperator can't go on calculating, so we just 
return null.
-        // We can also use the while loop here to continuously call the 
hasNext() and next()
-        // methods of the child operator until its hasNext() returns false or 
the next() gets
-        // the data that is not empty, but this will cause the execution time 
of the while loop
-        // to be uncontrollable and may exceed all allocated time slice
-        if (isEmpty(i)) {
-          allReady = false;
+      if (canCallNext[currentChildIndex]) {
+        if (children.get(currentChildIndex).hasNextWithTimer()) {
+          inputTsBlocks[currentChildIndex] = getNextTsBlock(currentChildIndex);
+          canCallNext[currentChildIndex] = false;
+          // child operator has next but return an empty TsBlock which means 
that it may not
+          // finish calculation in given time slice.
+          // In such case, TimeJoinOperator can't go on calculating, so we 
just return null.
+          // We can also use the while loop here to continuously call the 
hasNext() and next()
+          // methods of the child operator until its hasNext() returns false 
or the next() gets
+          // the data that is not empty, but this will cause the execution 
time of the while loop
+          // to be uncontrollable and may exceed all allocated time slice
+          if (isEmpty(currentChildIndex)) {
+            hasEmptyChild = true;
+          } else {
+            processCurrentInputTsBlock(currentChildIndex);
+          }
+        } else {
+          handleFinishedChild(currentChildIndex);
         }
       } else {
-        allReady = false;
-        if (canCallNext[i]) {
-          // canCallNext[i] == true means children.get(i).hasNext == false
-          // we can close the finished children
-          children.get(i).close();
-          children.set(i, null);
-        }
+        hasEmptyChild = true;
+      }
+      currentChildIndex++;
+    }
+
+    if (currentChildIndex == inputOperatorsCount) {
+      // start a new loop
+      currentChildIndex = 0;
+      if (!hasEmptyChild) {
+        return true;
+      } else {
+        hasEmptyChild = false;
       }
     }
-    return allReady;
+    return false;
   }
 
   /** If the tsBlock is null or has no more data in the tsBlock, return true; 
else return false. */
@@ -120,6 +143,32 @@ public abstract class AbstractConsumeAllOperator extends 
AbstractOperator
     return inputTsBlocks[index] == null || inputTsBlocks[index].isEmpty();
   }
 
+  // region helper function used in prepareInput, the subclass can have its 
own implementation
+
+  /**
+   * @param currentChildIndex the index of the child
+   * @return true if we can skip the currentChild in prepareInput
+   */
+  protected boolean canSkipCurrentChild(int currentChildIndex) {
+    return !isEmpty(currentChildIndex) || children.get(currentChildIndex) == 
null;
+  }
+
+  /** @param currentInputIndex index of the input TsBlock */
+  protected void processCurrentInputTsBlock(int currentInputIndex) {
+    // do nothing here, the subclass have its own implementation
+  }
+
+  /**
+   * @param currentChildIndex the index of the child
+   * @throws Exception Potential Exception thrown by Operator.close()
+   */
+  protected void handleFinishedChild(int currentChildIndex) throws Exception {
+    children.get(currentChildIndex).close();
+    children.set(currentChildIndex, null);
+  }
+
+  // endregion
+
   @Override
   public void close() throws Exception {
     for (Operator child : children) {
@@ -127,6 +176,9 @@ public abstract class AbstractConsumeAllOperator extends 
AbstractOperator
         child.close();
       }
     }
+    children.clear();
+    // friendly for gc
+    inputTsBlocks = null;
   }
 
   protected TsBlock getNextTsBlock(int childIndex) throws Exception {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/MergeSortOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/MergeSortOperator.java
index e0644204893..5c5d8373b61 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/MergeSortOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/MergeSortOperator.java
@@ -216,40 +216,36 @@ public class MergeSortOperator extends 
AbstractConsumeAllOperator {
     return currentRetainedSize - minChildReturnSize;
   }
 
+  // region helper function used in prepareInput
+
   /**
-   * Try to cache one result of each child.
-   *
-   * @return true if results of all children are ready or have no more 
TsBlocks. Return false if
-   *     some children is blocked or return null.
+   * @param currentChildIndex the index of the child
+   * @return true if we can skip the currentChild in prepareInput
    */
   @Override
-  protected boolean prepareInput() throws Exception {
-    boolean allReady = true;
-    for (int i = 0; i < inputOperatorsCount; i++) {
-      if (needCallNext(i)) {
-        continue;
-      }
-      if (canCallNext[i]) {
-        if (children.get(i).hasNextWithTimer()) {
-          inputTsBlocks[i] = getNextTsBlock(i);
-          canCallNext[i] = false;
-          if (isEmpty(i)) {
-            allReady = false;
-          } else {
-            mergeSortHeap.push(new MergeSortKey(inputTsBlocks[i], 0, i));
-          }
-        } else {
-          noMoreTsBlocks[i] = true;
-          inputTsBlocks[i] = null;
-        }
-      } else {
-        allReady = false;
-      }
-    }
-    return allReady;
+  protected boolean canSkipCurrentChild(int currentChildIndex) {
+    return noMoreTsBlocks[currentChildIndex]
+        || !isEmpty(currentChildIndex)
+        || children.get(currentChildIndex) == null;
   }
 
-  private boolean needCallNext(int i) {
-    return noMoreTsBlocks[i] || !isEmpty(i) || children.get(i) == null;
+  /** @param currentInputIndex index of the input TsBlock */
+  @Override
+  protected void processCurrentInputTsBlock(int currentInputIndex) {
+    mergeSortHeap.push(new MergeSortKey(inputTsBlocks[currentInputIndex], 0, 
currentInputIndex));
   }
+
+  /**
+   * @param currentChildIndex the index of the child
+   * @throws Exception Potential Exception thrown by Operator.close()
+   */
+  @Override
+  protected void handleFinishedChild(int currentChildIndex) throws Exception {
+    noMoreTsBlocks[currentChildIndex] = true;
+    inputTsBlocks[currentChildIndex] = null;
+    children.get(currentChildIndex).close();
+    children.set(currentChildIndex, null);
+  }
+
+  // endregion
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/RowBasedTimeJoinOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/RowBasedTimeJoinOperator.java
index 0f4de48a92a..65de39f5717 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/RowBasedTimeJoinOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/RowBasedTimeJoinOperator.java
@@ -276,46 +276,39 @@ public class RowBasedTimeJoinOperator extends 
AbstractConsumeAllOperator {
     timeSelector.add(inputTsBlocks[index].getTimeByIndex(inputIndex[index]));
   }
 
+  // region helper function used in prepareInput
+
   /**
-   * Try to cache one result of each child.
-   *
-   * @return true if results of all children are ready or have no more 
TsBlocks. Return false if
-   *     some children is blocked or return null.
+   * @param currentChildIndex the index of the child
+   * @return true if we can skip the currentChild in prepareInput
    */
   @Override
-  protected boolean prepareInput() throws Exception {
-    boolean allReady = true;
-    for (int i = 0; i < inputOperatorsCount; i++) {
-      if (needCallNext(i)) {
-        continue;
-      }
-      if (canCallNext[i]) {
-        if (children.get(i).hasNextWithTimer()) {
-          inputTsBlocks[i] = getNextTsBlock(i);
-          canCallNext[i] = false;
-          if (isEmpty(i)) {
-            allReady = false;
-          } else {
-            updateTimeSelector(i);
-          }
-        } else {
-          noMoreTsBlocks[i] = true;
-          inputTsBlocks[i] = null;
-          children.get(i).close();
-          children.set(i, null);
-        }
+  protected boolean canSkipCurrentChild(int currentChildIndex) {
+    return noMoreTsBlocks[currentChildIndex]
+        || !isEmpty(currentChildIndex)
+        || children.get(currentChildIndex) == null;
+  }
 
-      } else {
-        allReady = false;
-      }
-    }
-    return allReady;
+  /** @param currentInputIndex index of the input TsBlock */
+  @Override
+  protected void processCurrentInputTsBlock(int currentInputIndex) {
+    updateTimeSelector(currentInputIndex);
   }
 
-  private boolean needCallNext(int i) {
-    return noMoreTsBlocks[i] || !isEmpty(i) || children.get(i) == null;
+  /**
+   * @param currentChildIndex the index of the child
+   * @throws Exception Potential Exception thrown by Operator.close()
+   */
+  @Override
+  protected void handleFinishedChild(int currentChildIndex) throws Exception {
+    noMoreTsBlocks[currentChildIndex] = true;
+    inputTsBlocks[currentChildIndex] = null;
+    children.get(currentChildIndex).close();
+    children.set(currentChildIndex, null);
   }
 
+  // endregion
+
   @TestOnly
   public List<Operator> getChildren() {
     return children;

Reply via email to