This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e51f5ef6a4 [IOTDB-3859] Fix wrong isBlocked() method implementation in
AggregationOperator (#6695)
e51f5ef6a4 is described below
commit e51f5ef6a40d6a9fd5ee2bf52cdc56f540e9f7e0
Author: liuminghui233 <[email protected]>
AuthorDate: Wed Jul 20 10:54:40 2022 +0800
[IOTDB-3859] Fix wrong isBlocked() method implementation in
AggregationOperator (#6695)
---
.../operator/process/AggregationOperator.java | 23 ++++++---
.../operator/AggregationOperatorTest.java | 54 ++++++++++++++++++----
2 files changed, 63 insertions(+), 14 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
index 76b81c2873..8cf4aef8b0 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java
@@ -35,6 +35,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import static com.google.common.util.concurrent.Futures.successfulAsList;
import static
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult;
import static
org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
@@ -75,6 +76,9 @@ public class AggregationOperator implements ProcessOperator {
this.inputOperatorsCount = children.size();
this.inputTsBlocks = new TsBlock[inputOperatorsCount];
this.canCallNext = new boolean[inputOperatorsCount];
+ for (int i = 0; i < inputOperatorsCount; i++) {
+ canCallNext[i] = false;
+ }
this.timeRangeIterator =
initTimeRangeIterator(groupByTimeParameter, ascending,
outputPartialTimeWindow);
@@ -93,13 +97,19 @@ public class AggregationOperator implements ProcessOperator
{
@Override
public ListenableFuture<?> isBlocked() {
+ List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
for (int i = 0; i < inputOperatorsCount; i++) {
ListenableFuture<?> blocked = children.get(i).isBlocked();
- if (!blocked.isDone()) {
- return blocked;
+ if (blocked.isDone()) {
+ canCallNext[i] = true;
+ } else {
+ if (isEmpty(i)) {
+ listenableFutures.add(blocked);
+ canCallNext[i] = true;
+ }
}
}
- return NOT_BLOCKED;
+ return listenableFutures.isEmpty() ? NOT_BLOCKED :
successfulAsList(listenableFutures);
}
@Override
@@ -115,9 +125,6 @@ public class AggregationOperator implements ProcessOperator
{
// reset operator state
resultTsBlockBuilder.reset();
- for (int i = 0; i < inputOperatorsCount; i++) {
- canCallNext[i] = true;
- }
while (System.nanoTime() - start < maxRuntime
&& (curTimeRange != null || timeRangeIterator.hasNextTimeRange())
@@ -199,4 +206,8 @@ public class AggregationOperator implements ProcessOperator
{
curTimeRange = null;
appendAggregationResult(resultTsBlockBuilder, aggregators,
timeRangeIterator);
}
+
+ private boolean isEmpty(int index) {
+ return inputTsBlocks[index] == null || inputTsBlocks[index].isEmpty();
+ }
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
index a8380c27f1..69b470845b 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/AggregationOperatorTest.java
@@ -46,6 +46,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.Duration;
import org.junit.After;
import org.junit.Before;
@@ -55,6 +56,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -90,7 +92,8 @@ public class AggregationOperatorTest {
/** Try to aggregate unary intermediate result of one time series without
group by interval. */
@Test
- public void testAggregateIntermediateResult1() throws IllegalPathException {
+ public void testAggregateIntermediateResult1()
+ throws IllegalPathException, ExecutionException, InterruptedException {
List<AggregationType> aggregationTypes = new ArrayList<>();
aggregationTypes.add(AggregationType.COUNT);
aggregationTypes.add(AggregationType.SUM);
@@ -108,8 +111,16 @@ public class AggregationOperatorTest {
AggregationOperator aggregationOperator =
initAggregationOperator(aggregationTypes, null, inputLocations);
int count = 0;
- while (aggregationOperator.hasNext()) {
+ while (true) {
+ ListenableFuture<?> blocked = aggregationOperator.isBlocked();
+ blocked.get();
+ if (!aggregationOperator.hasNext()) {
+ break;
+ }
TsBlock resultTsBlock = aggregationOperator.next();
+ if (resultTsBlock == null) {
+ continue;
+ }
assertEquals(500, resultTsBlock.getColumn(0).getLong(0));
assertEquals(6524750.0, resultTsBlock.getColumn(1).getDouble(0), 0.0001);
assertEquals(0, resultTsBlock.getColumn(2).getLong(0));
@@ -123,7 +134,8 @@ public class AggregationOperatorTest {
/** Try to aggregate binary intermediate result of one time series without
group by interval. */
@Test
- public void testAggregateIntermediateResult2() throws IllegalPathException {
+ public void testAggregateIntermediateResult2()
+ throws IllegalPathException, ExecutionException, InterruptedException {
List<AggregationType> aggregationTypes = new ArrayList<>();
aggregationTypes.add(AggregationType.AVG);
aggregationTypes.add(AggregationType.FIRST_VALUE);
@@ -140,8 +152,16 @@ public class AggregationOperatorTest {
AggregationOperator aggregationOperator =
initAggregationOperator(aggregationTypes, null, inputLocations);
int count = 0;
- while (aggregationOperator.hasNext()) {
+ while (true) {
+ ListenableFuture<?> blocked = aggregationOperator.isBlocked();
+ blocked.get();
+ if (!aggregationOperator.hasNext()) {
+ break;
+ }
TsBlock resultTsBlock = aggregationOperator.next();
+ if (resultTsBlock == null) {
+ continue;
+ }
assertEquals(13049.5, resultTsBlock.getColumn(0).getDouble(0), 0.001);
assertEquals(20000, resultTsBlock.getColumn(1).getInt(0));
assertEquals(10499, resultTsBlock.getColumn(2).getInt(0));
@@ -151,7 +171,8 @@ public class AggregationOperatorTest {
}
@Test
- public void testGroupByIntermediateResult1() throws IllegalPathException {
+ public void testGroupByIntermediateResult1()
+ throws IllegalPathException, ExecutionException, InterruptedException {
int[][] result =
new int[][] {
{100, 100, 100, 99},
@@ -179,8 +200,16 @@ public class AggregationOperatorTest {
AggregationOperator aggregationOperator =
initAggregationOperator(aggregationTypes, groupByTimeParameter,
inputLocations);
int count = 0;
- while (aggregationOperator.hasNext()) {
+ while (true) {
+ ListenableFuture<?> blocked = aggregationOperator.isBlocked();
+ blocked.get();
+ if (!aggregationOperator.hasNext()) {
+ break;
+ }
TsBlock resultTsBlock = aggregationOperator.next();
+ if (resultTsBlock == null) {
+ continue;
+ }
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos));
@@ -197,7 +226,8 @@ public class AggregationOperatorTest {
}
@Test
- public void testGroupByIntermediateResult2() throws IllegalPathException {
+ public void testGroupByIntermediateResult2()
+ throws IllegalPathException, ExecutionException, InterruptedException {
double[][] result =
new double[][] {
{20049.5, 20149.5, 6249.5, 8429.808},
@@ -221,8 +251,16 @@ public class AggregationOperatorTest {
AggregationOperator aggregationOperator =
initAggregationOperator(aggregationTypes, groupByTimeParameter,
inputLocations);
int count = 0;
- while (aggregationOperator.hasNext()) {
+ while (true) {
+ ListenableFuture<?> blocked = aggregationOperator.isBlocked();
+ blocked.get();
+ if (!aggregationOperator.hasNext()) {
+ break;
+ }
TsBlock resultTsBlock = aggregationOperator.next();
+ if (resultTsBlock == null) {
+ continue;
+ }
int positionCount = resultTsBlock.getPositionCount();
for (int pos = 0; pos < positionCount; pos++) {
assertEquals(100 * count, resultTsBlock.getTimeColumn().getLong(pos));