This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/isBlockedDebug
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 34aff4e1969e1c09ae72e879d56c2aaeb7b8d6e1 Author: Minghui Liu <[email protected]> AuthorDate: Mon Jul 18 12:00:47 2022 +0800 fix isBlocked() bug in AggregationOperator --- .../operator/process/AggregationOperator.java | 23 ++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java index 76b81c2873..8cf4aef8b0 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java @@ -35,6 +35,7 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; +import static com.google.common.util.concurrent.Futures.successfulAsList; import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult; import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator; @@ -75,6 +76,9 @@ public class AggregationOperator implements ProcessOperator { this.inputOperatorsCount = children.size(); this.inputTsBlocks = new TsBlock[inputOperatorsCount]; this.canCallNext = new boolean[inputOperatorsCount]; + for (int i = 0; i < inputOperatorsCount; i++) { + canCallNext[i] = false; + } this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending, outputPartialTimeWindow); @@ -93,13 +97,19 @@ public class AggregationOperator implements ProcessOperator { @Override public ListenableFuture<?> isBlocked() { + List<ListenableFuture<?>> listenableFutures = new ArrayList<>(); for (int i = 0; i < inputOperatorsCount; i++) { ListenableFuture<?> blocked = children.get(i).isBlocked(); - if (!blocked.isDone()) { - return blocked; + if (blocked.isDone()) { + canCallNext[i] = true; + } else { + if (isEmpty(i)) { + 
listenableFutures.add(blocked); + canCallNext[i] = true; + } } } - return NOT_BLOCKED; + return listenableFutures.isEmpty() ? NOT_BLOCKED : successfulAsList(listenableFutures); } @Override @@ -115,9 +125,6 @@ public class AggregationOperator implements ProcessOperator { // reset operator state resultTsBlockBuilder.reset(); - for (int i = 0; i < inputOperatorsCount; i++) { - canCallNext[i] = true; - } while (System.nanoTime() - start < maxRuntime && (curTimeRange != null || timeRangeIterator.hasNextTimeRange()) @@ -199,4 +206,8 @@ public class AggregationOperator implements ProcessOperator { curTimeRange = null; appendAggregationResult(resultTsBlockBuilder, aggregators, timeRangeIterator); } + + private boolean isEmpty(int index) { + return inputTsBlocks[index] == null || inputTsBlocks[index].isEmpty(); + } }
