This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e51f5ef6a4 [IOTDB-3859] Fix wrong isBlocked() method implementation in AggregationOperator (#6695)
e51f5ef6a4 is described below

commit e51f5ef6a40d6a9fd5ee2bf52cdc56f540e9f7e0
Author: liuminghui233 <[email protected]>
AuthorDate: Wed Jul 20 10:54:40 2022 +0800

    [IOTDB-3859] Fix wrong isBlocked() method implementation in AggregationOperator (#6695)
---
 .../operator/process/AggregationOperator.java      | 23 ++++++---
 .../operator/AggregationOperatorTest.java          | 54 ++++++++++++++++++----
 2 files changed, 63 insertions(+), 14 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 76b81c2873..8cf4aef8b0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -35,6 +35,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import static com.google.common.util.concurrent.Futures.successfulAsList;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
 import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
 
@@ -75,6 +76,9 @@ public class AggregationOperator implements ProcessOperator {
     this.inputOperatorsCount = children.size();
     this.inputTsBlocks = new TsBlock[inputOperatorsCount];
     this.canCallNext = new boolean[inputOperatorsCount];
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      canCallNext[i] = false;
+    }
 
     this.timeRangeIterator =
         initTimeRangeIterator(groupByTimeParameter, ascending, outputPartialTimeWindow);
@@ -93,13 +97,19 @@ public class AggregationOperator implements ProcessOperator {
 
   @Override
   public ListenableFuture<?> isBlocked() {
+    List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
     for (int i = 0; i < inputOperatorsCount; i++) {
       ListenableFuture<?> blocked = children.get(i).isBlocked();
-      if (!blocked.isDone()) {
-        return blocked;
+      if (blocked.isDone()) {
+        canCallNext[i] = true;
+      } else {
+        if (isEmpty(i)) {
+          listenableFutures.add(blocked);
+          canCallNext[i] = true;
+        }
       }
     }
-    return NOT_BLOCKED;
+    return listenableFutures.isEmpty() ? NOT_BLOCKED : successfulAsList(listenableFutures);
   }
 
   @Override
@@ -115,9 +125,6 @@ public class AggregationOperator implements ProcessOperator {
 
     // reset operator state
     resultTsBlockBuilder.reset();
-    for (int i = 0; i < inputOperatorsCount; i++) {
-      canCallNext[i] = true;
-    }
 
     while (System.nanoTime() - start < maxRuntime
         && (curTimeRange != null || timeRangeIterator.hasNextTimeRange())
@@ -199,4 +206,8 @@ public class AggregationOperator implements ProcessOperator {
     curTimeRange = null;
     appendAggregationResult(resultTsBlockBuilder, aggregators, timeRangeIterator);
   }
+
+  private boolean isEmpty(int index) {
+    return inputTsBlocks[index] == null || inputTsBlocks[index].isEmpty();
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
index a8380c27f1..69b470845b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
@@ -46,6 +46,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import io.airlift.units.Duration;
 import org.junit.After;
 import org.junit.Before;
@@ -55,6 +56,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -90,7 +92,8 @@ public class AggregationOperatorTest {
 
   /** Try to aggregate unary intermediate result of one time series without group by interval. */
   @Test
-  public void testAggregateIntermediateResult1() throws IllegalPathException {
+  public void testAggregateIntermediateResult1()
+      throws IllegalPathException, ExecutionException, InterruptedException {
     List<AggregationType> aggregationTypes = new ArrayList<>();
     aggregationTypes.add(AggregationType.COUNT);
     aggregationTypes.add(AggregationType.SUM);
@@ -108,8 +111,16 @@ public class AggregationOperatorTest {
     AggregationOperator aggregationOperator =
         initAggregationOperator(aggregationTypes, null, inputLocations);
     int count = 0;
-    while (aggregationOperator.hasNext()) {
+    while (true) {
+      ListenableFuture<?> blocked = aggregationOperator.isBlocked();
+      blocked.get();
+      if (!aggregationOperator.hasNext()) {
+        break;
+      }
       TsBlock resultTsBlock = aggregationOperator.next();
+      if (resultTsBlock == null) {
+        continue;
+      }
       assertEquals(500, resultTsBlock.getColumn(0).getLong(0));
       assertEquals(6524750.0, resultTsBlock.getColumn(1).getDouble(0), 0.0001);
       assertEquals(0, resultTsBlock.getColumn(2).getLong(0));
@@ -123,7 +134,8 @@ public class AggregationOperatorTest {
 
   /** Try to aggregate binary intermediate result of one time series without group by interval. */
   @Test
-  public void testAggregateIntermediateResult2() throws IllegalPathException {
+  public void testAggregateIntermediateResult2()
+      throws IllegalPathException, ExecutionException, InterruptedException {
     List<AggregationType> aggregationTypes = new ArrayList<>();
     aggregationTypes.add(AggregationType.AVG);
     aggregationTypes.add(AggregationType.FIRST_VALUE);
@@ -140,8 +152,16 @@ public class AggregationOperatorTest {
     AggregationOperator aggregationOperator =
         initAggregationOperator(aggregationTypes, null, inputLocations);
     int count = 0;
-    while (aggregationOperator.hasNext()) {
+    while (true) {
+      ListenableFuture<?> blocked = aggregationOperator.isBlocked();
+      blocked.get();
+      if (!aggregationOperator.hasNext()) {
+        break;
+      }
       TsBlock resultTsBlock = aggregationOperator.next();
+      if (resultTsBlock == null) {
+        continue;
+      }
       assertEquals(13049.5, resultTsBlock.getColumn(0).getDouble(0), 0.001);
       assertEquals(20000, resultTsBlock.getColumn(1).getInt(0));
       assertEquals(10499, resultTsBlock.getColumn(2).getInt(0));
@@ -151,7 +171,8 @@ public class AggregationOperatorTest {
   }
 
   @Test
-  public void testGroupByIntermediateResult1() throws IllegalPathException {
+  public void testGroupByIntermediateResult1()
+      throws IllegalPathException, ExecutionException, InterruptedException {
     int[][] result =
         new int[][] {
           {100, 100, 100, 99},
@@ -179,8 +200,16 @@ public class AggregationOperatorTest {
     AggregationOperator aggregationOperator =
         initAggregationOperator(aggregationTypes, groupByTimeParameter, inputLocations);
     int count = 0;
-    while (aggregationOperator.hasNext()) {
+    while (true) {
+      ListenableFuture<?> blocked = aggregationOperator.isBlocked();
+      blocked.get();
+      if (!aggregationOperator.hasNext()) {
+        break;
+      }
       TsBlock resultTsBlock = aggregationOperator.next();
+      if (resultTsBlock == null) {
+        continue;
+      }
       int positionCount = resultTsBlock.getPositionCount();
       for (int pos = 0; pos < positionCount; pos++) {
         assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos));
@@ -197,7 +226,8 @@ public class AggregationOperatorTest {
   }
 
   @Test
-  public void testGroupByIntermediateResult2() throws IllegalPathException {
+  public void testGroupByIntermediateResult2()
+      throws IllegalPathException, ExecutionException, InterruptedException {
     double[][] result =
         new double[][] {
           {20049.5, 20149.5, 6249.5, 8429.808},
@@ -221,8 +251,16 @@ public class AggregationOperatorTest {
     AggregationOperator aggregationOperator =
         initAggregationOperator(aggregationTypes, groupByTimeParameter, inputLocations);
     int count = 0;
-    while (aggregationOperator.hasNext()) {
+    while (true) {
+      ListenableFuture<?> blocked = aggregationOperator.isBlocked();
+      blocked.get();
+      if (!aggregationOperator.hasNext()) {
+        break;
+      }
       TsBlock resultTsBlock = aggregationOperator.next();
+      if (resultTsBlock == null) {
+        continue;
+      }
       int positionCount = resultTsBlock.getPositionCount();
       for (int pos = 0; pos < positionCount; pos++) {
         assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos));

Reply via email to