This is an automated email from the ASF dual-hosted git repository. lancelly pushed a commit to branch prepareInput in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ecaa9f1a4c2ecad2f834b2ea08e1c2bbebfe37a2 Author: lancelly <[email protected]> AuthorDate: Tue Oct 10 16:35:03 2023 +0800 add time watch on prepareInput of AbstractConsumeAllOperator --- .../process/AbstractConsumeAllOperator.java | 100 ++++++++++++++++----- .../operator/process/MergeSortOperator.java | 56 ++++++------ .../process/join/RowBasedTimeJoinOperator.java | 57 ++++++------ 3 files changed, 127 insertions(+), 86 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractConsumeAllOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractConsumeAllOperator.java index e677ce5eafa..c078bfb1308 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractConsumeAllOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractConsumeAllOperator.java @@ -28,6 +28,7 @@ import com.google.common.util.concurrent.ListenableFuture; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import static com.google.common.util.concurrent.Futures.successfulAsList; @@ -37,11 +38,17 @@ public abstract class AbstractConsumeAllOperator extends AbstractOperator protected final List<Operator> children; protected final int inputOperatorsCount; /** TsBlock from child operator. Only one cache now. */ - protected final TsBlock[] inputTsBlocks; + protected TsBlock[] inputTsBlocks; protected final boolean[] canCallNext; protected int readyChildIndex; + /** Index of the child that is currently fetching input */ + protected int currentChildIndex = 0; + + /** Indicate whether we found an empty child input in one loop */ + protected boolean hasEmptyChild = false; + protected AbstractConsumeAllOperator(OperatorContext operatorContext, List<Operator> children) { this.operatorContext = operatorContext; this.children = children; @@ -84,35 +91,51 @@ public abstract class AbstractConsumeAllOperator extends AbstractOperator * @throws Exception errors happened while getting tsblock from children */ protected boolean prepareInput() throws Exception { - boolean allReady = true; - for (int i = 0; i < inputOperatorsCount; i++) { - if (!isEmpty(i) || children.get(i) == null) { + + // start stopwatch + long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); + long start = System.nanoTime(); + + while (System.nanoTime() - start < maxRuntime && currentChildIndex < inputOperatorsCount) { + if (canSkipCurrentChild(currentChildIndex)) { + currentChildIndex++; continue; } - if (canCallNext[i] && children.get(i).hasNextWithTimer()) { - inputTsBlocks[i] = getNextTsBlock(i); - canCallNext[i] = false; - // child operator has next but return an empty TsBlock which means that it may not - // finish calculation in given time slice. - // In such case, TimeJoinOperator can't go on calculating, so we just return null. - // We can also use the while loop here to continuously call the hasNext() and next() - // methods of the child operator until its hasNext() returns false or the next() gets - // the data that is not empty, but this will cause the execution time of the while loop - // to be uncontrollable and may exceed all allocated time slice - if (isEmpty(i)) { - allReady = false; + if (canCallNext[currentChildIndex]) { + if (children.get(currentChildIndex).hasNextWithTimer()) { + inputTsBlocks[currentChildIndex] = getNextTsBlock(currentChildIndex); + canCallNext[currentChildIndex] = false; + // child operator has next but return an empty TsBlock which means that it may not + // finish calculation in given time slice. + // In such case, TimeJoinOperator can't go on calculating, so we just return null. + // We can also use the while loop here to continuously call the hasNext() and next() + // methods of the child operator until its hasNext() returns false or the next() gets + // the data that is not empty, but this will cause the execution time of the while loop + // to be uncontrollable and may exceed all allocated time slice + if (isEmpty(currentChildIndex)) { + hasEmptyChild = true; + } else { + processCurrentInputTsBlock(currentChildIndex); + } + } else { + handleFinishedChild(currentChildIndex); } } else { - allReady = false; - if (canCallNext[i]) { - // canCallNext[i] == true means children.get(i).hasNext == false - // we can close the finished children - children.get(i).close(); - children.set(i, null); - } + hasEmptyChild = true; + } + currentChildIndex++; + } + + if (currentChildIndex == inputOperatorsCount) { + // start a new loop + currentChildIndex = 0; + if (!hasEmptyChild) { + return true; + } else { + hasEmptyChild = false; } } - return allReady; + return false; } /** If the tsBlock is null or has no more data in the tsBlock, return true; else return false. */ @@ -120,6 +143,32 @@ public abstract class AbstractConsumeAllOperator extends AbstractOperator return inputTsBlocks[index] == null || inputTsBlocks[index].isEmpty(); } + // region helper function used in prepareInput, the subclass can have its own implementation + + /** + * @param currentChildIndex the index of the child + * @return true if we can skip the currentChild in prepareInput + */ + protected boolean canSkipCurrentChild(int currentChildIndex) { + return !isEmpty(currentChildIndex) || children.get(currentChildIndex) == null; + } + + /** @param currentInputIndex index of the input TsBlock */ + protected void processCurrentInputTsBlock(int currentInputIndex) { + // do nothing here, the subclass have its own implementation + } + + /** + * @param currentChildIndex the index of the child + * @throws Exception Potential Exception thrown by Operator.close() + */ + protected void handleFinishedChild(int currentChildIndex) throws Exception { + children.get(currentChildIndex).close(); + children.set(currentChildIndex, null); + } + + // endregion + @Override public void close() throws Exception { for (Operator child : children) { @@ -127,6 +176,9 @@ public abstract class AbstractConsumeAllOperator extends AbstractOperator child.close(); } } + children.clear(); + // friendly for gc + inputTsBlocks = null; } protected TsBlock getNextTsBlock(int childIndex) throws Exception { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/MergeSortOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/MergeSortOperator.java index e0644204893..5c5d8373b61 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/MergeSortOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/MergeSortOperator.java @@ -216,40 +216,36 @@ public class MergeSortOperator extends AbstractConsumeAllOperator { return currentRetainedSize - minChildReturnSize; } + // region helper function used in prepareInput + /** - * Try to cache one result of each child. - * - * @return true if results of all children are ready or have no more TsBlocks. Return false if - * some children is blocked or return null. + * @param currentChildIndex the index of the child + * @return true if we can skip the currentChild in prepareInput */ @Override - protected boolean prepareInput() throws Exception { - boolean allReady = true; - for (int i = 0; i < inputOperatorsCount; i++) { - if (needCallNext(i)) { - continue; - } - if (canCallNext[i]) { - if (children.get(i).hasNextWithTimer()) { - inputTsBlocks[i] = getNextTsBlock(i); - canCallNext[i] = false; - if (isEmpty(i)) { - allReady = false; - } else { - mergeSortHeap.push(new MergeSortKey(inputTsBlocks[i], 0, i)); - } - } else { - noMoreTsBlocks[i] = true; - inputTsBlocks[i] = null; - } - } else { - allReady = false; - } - } - return allReady; + protected boolean canSkipCurrentChild(int currentChildIndex) { + return noMoreTsBlocks[currentChildIndex] + || !isEmpty(currentChildIndex) + || children.get(currentChildIndex) == null; } - private boolean needCallNext(int i) { - return noMoreTsBlocks[i] || !isEmpty(i) || children.get(i) == null; + /** @param currentInputIndex index of the input TsBlock */ + @Override + protected void processCurrentInputTsBlock(int currentInputIndex) { + mergeSortHeap.push(new MergeSortKey(inputTsBlocks[currentInputIndex], 0, currentInputIndex)); } + + /** + * @param currentChildIndex the index of the child + * @throws Exception Potential Exception thrown by Operator.close() + */ + @Override + protected void handleFinishedChild(int currentChildIndex) throws Exception { + noMoreTsBlocks[currentChildIndex] = true; + inputTsBlocks[currentChildIndex] = null; + children.get(currentChildIndex).close(); + children.set(currentChildIndex, null); + } + + // endregion } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/RowBasedTimeJoinOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/RowBasedTimeJoinOperator.java index 0f4de48a92a..65de39f5717 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/RowBasedTimeJoinOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/RowBasedTimeJoinOperator.java @@ -276,46 +276,39 @@ public class RowBasedTimeJoinOperator extends AbstractConsumeAllOperator { timeSelector.add(inputTsBlocks[index].getTimeByIndex(inputIndex[index])); } + // region helper function used in prepareInput + /** - * Try to cache one result of each child. - * - * @return true if results of all children are ready or have no more TsBlocks. Return false if - * some children is blocked or return null. + * @param currentChildIndex the index of the child + * @return true if we can skip the currentChild in prepareInput */ @Override - protected boolean prepareInput() throws Exception { - boolean allReady = true; - for (int i = 0; i < inputOperatorsCount; i++) { - if (needCallNext(i)) { - continue; - } - if (canCallNext[i]) { - if (children.get(i).hasNextWithTimer()) { - inputTsBlocks[i] = getNextTsBlock(i); - canCallNext[i] = false; - if (isEmpty(i)) { - allReady = false; - } else { - updateTimeSelector(i); - } - } else { - noMoreTsBlocks[i] = true; - inputTsBlocks[i] = null; - children.get(i).close(); - children.set(i, null); - } + protected boolean canSkipCurrentChild(int currentChildIndex) { + return noMoreTsBlocks[currentChildIndex] + || !isEmpty(currentChildIndex) + || children.get(currentChildIndex) == null; + } - } else { - allReady = false; - } - } - return allReady; + /** @param currentInputIndex index of the input TsBlock */ + @Override + protected void processCurrentInputTsBlock(int currentInputIndex) { + updateTimeSelector(currentInputIndex); } - private boolean needCallNext(int i) { - return noMoreTsBlocks[i] || !isEmpty(i) || children.get(i) == null; + /** + * @param currentChildIndex the index of the child + * @throws Exception Potential Exception thrown by Operator.close() + */ + @Override + protected void handleFinishedChild(int currentChildIndex) throws Exception { + noMoreTsBlocks[currentChildIndex] = true; + inputTsBlocks[currentChildIndex] = null; + children.get(currentChildIndex).close(); + children.set(currentChildIndex, null); } + // endregion + @TestOnly public List<Operator> getChildren() { return children;
