This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/isBlockedDebug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 34aff4e1969e1c09ae72e879d56c2aaeb7b8d6e1
Author: Minghui Liu <[email protected]>
AuthorDate: Mon Jul 18 12:00:47 2022 +0800

    fix isBlocked() bug in AggregationOperator
---
 .../operator/process/AggregationOperator.java      | 23 ++++++++++++++++------
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 76b81c2873..8cf4aef8b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -35,6 +35,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import static com.google.common.util.concurrent.Futures.successfulAsList;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 
@@ -75,6 +76,9 @@ public class AggregationOperator implements ProcessOperator {
     this.inputOperatorsCount = children.size();
     this.inputTsBlocks = new TsBlock[inputOperatorsCount];
     this.canCallNext = new boolean[inputOperatorsCount];
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      canCallNext[i] = false;
+    }
 
     this.timeRangeIterator =
         initTimeRangeIterator(groupByTimeParameter, ascending, outputPartialTimeWindow);
@@ -93,13 +97,19 @@ public class AggregationOperator implements ProcessOperator {
 
   @Override
   public ListenableFuture<?> isBlocked() {
+    List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
     for (int i = 0; i < inputOperatorsCount; i++) {
       ListenableFuture<?> blocked = children.get(i).isBlocked();
-      if (!blocked.isDone()) {
-        return blocked;
+      if (blocked.isDone()) {
+        canCallNext[i] = true;
+      } else {
+        if (isEmpty(i)) {
+          listenableFutures.add(blocked);
+          canCallNext[i] = true;
+        }
       }
     }
-    return NOT_BLOCKED;
+    return listenableFutures.isEmpty() ? NOT_BLOCKED : successfulAsList(listenableFutures);
   }
 
   @Override
@@ -115,9 +125,6 @@ public class AggregationOperator implements ProcessOperator {
 
     // reset operator state
     resultTsBlockBuilder.reset();
-    for (int i = 0; i < inputOperatorsCount; i++) {
-      canCallNext[i] = true;
-    }
 
     while (System.nanoTime() - start < maxRuntime
         && (curTimeRange != null || timeRangeIterator.hasNextTimeRange())
@@ -199,4 +206,8 @@ public class AggregationOperator implements ProcessOperator {
     curTimeRange = null;
     appendAggregationResult(resultTsBlockBuilder, aggregators, timeRangeIterator);
   }
+
+  private boolean isEmpty(int index) {
+    return inputTsBlocks[index] == null || inputTsBlocks[index].isEmpty();
+  }
 }

Reply via email to