This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch DONTBlock
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3dae858983da70286c562ca1bc6ba967740ed4ed
Author: Alima777 <[email protected]>
AuthorDate: Fri Feb 17 13:13:53 2023 +0800

    implement AggregationOperator
---
 .../operator/process/AggregationOperator.java      | 35 ++++++++++++----------
 1 file changed, 19 insertions(+), 16 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index c320839813..bee71bbe77 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -120,19 +120,21 @@ public class AggregationOperator implements ProcessOperator {
 
   @Override
   public ListenableFuture<?> isBlocked() {
+    boolean isBlocked = false;
     List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
     for (int i = 0; i < inputOperatorsCount; i++) {
+      if (!isEmpty(i)) {
+        continue;
+      }
       ListenableFuture<?> blocked = children.get(i).isBlocked();
       if (blocked.isDone()) {
+        isBlocked = true;
         canCallNext[i] = true;
-      } else {
-        if (isEmpty(i)) {
-          listenableFutures.add(blocked);
-          canCallNext[i] = true;
-        }
+      } else if (!blocked.isDone()) {
+        listenableFutures.add(blocked);
       }
     }
-    return listenableFutures.isEmpty() ? NOT_BLOCKED : successfulAsList(listenableFutures);
+    return isBlocked ? NOT_BLOCKED : successfulAsList(listenableFutures);
   }
 
   @Override
@@ -190,21 +192,22 @@ public class AggregationOperator implements ProcessOperator {
   }
 
   private boolean prepareInput() {
+    boolean allReady = true;
     for (int i = 0; i < inputOperatorsCount; i++) {
-      if (inputTsBlocks[i] != null) {
+      if (!isEmpty(i)) {
         continue;
       }
-      if (!canCallNext[i]) {
-        return false;
-      }
-
-      inputTsBlocks[i] = children.get(i).nextWithTimer();
-      canCallNext[i] = false;
-      if (inputTsBlocks[i] == null) {
-        return false;
+      if (canCallNext[i]) {
+        inputTsBlocks[i] = children.get(i).nextWithTimer();
+        canCallNext[i] = false;
+        if (inputTsBlocks[i] == null) {
+          allReady = false;
+        }
+      } else {
+        allReady = false;
       }
     }
-    return true;
+    return allReady;
   }
 
   private void calculateNextAggregationResult() {

Reply via email to