This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch DONTBlock in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3dae858983da70286c562ca1bc6ba967740ed4ed Author: Alima777 <[email protected]> AuthorDate: Fri Feb 17 13:13:53 2023 +0800 implement AggregationOperator --- .../operator/process/AggregationOperator.java | 35 ++++++++++++---------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java index c320839813..bee71bbe77 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java @@ -120,19 +120,21 @@ public class AggregationOperator implements ProcessOperator { @Override public ListenableFuture<?> isBlocked() { + boolean isBlocked = false; List<ListenableFuture<?>> listenableFutures = new ArrayList<>(); for (int i = 0; i < inputOperatorsCount; i++) { + if (!isEmpty(i)) { + continue; + } ListenableFuture<?> blocked = children.get(i).isBlocked(); if (blocked.isDone()) { + isBlocked = true; canCallNext[i] = true; - } else { - if (isEmpty(i)) { - listenableFutures.add(blocked); - canCallNext[i] = true; - } + } else if (!blocked.isDone()) { + listenableFutures.add(blocked); } } - return listenableFutures.isEmpty() ? NOT_BLOCKED : successfulAsList(listenableFutures); + return isBlocked ? NOT_BLOCKED : successfulAsList(listenableFutures); } @Override @@ -190,21 +192,22 @@ public class AggregationOperator implements ProcessOperator { } private boolean prepareInput() { + boolean allReady = true; for (int i = 0; i < inputOperatorsCount; i++) { - if (inputTsBlocks[i] != null) { + if (!isEmpty(i)) { continue; } - if (!canCallNext[i]) { - return false; - } - - inputTsBlocks[i] = children.get(i).nextWithTimer(); - canCallNext[i] = false; - if (inputTsBlocks[i] == null) { - return false; + if (canCallNext[i]) { + inputTsBlocks[i] = children.get(i).nextWithTimer(); + canCallNext[i] = false; + if (inputTsBlocks[i] == null) { + allReady = false; + } + } else { + allReady = false; } } - return true; + return allReady; } private void calculateNextAggregationResult() {
