This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/aggrOpRefactor in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b6848520a628dd32c73d8895a2ac1ab28a821793 Author: Minghui Liu <[email protected]> AuthorDate: Mon Jul 4 21:51:35 2022 +0800 refactor scan operator to batch process --- .../db/mpp/execution/operator/AggregationUtil.java | 11 +++++-- .../source/SeriesAggregationScanOperator.java | 37 ++++++++++++---------- 2 files changed, 29 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java index a7ec92ec8c..64b80d0bfe 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java @@ -35,11 +35,10 @@ import java.util.List; public class AggregationUtil { - public static TsBlock updateResultTsBlockFromAggregators( + public static void appendAggregationResult( TsBlockBuilder tsBlockBuilder, List<? extends Aggregator> aggregators, ITimeRangeIterator timeRangeIterator) { - tsBlockBuilder.reset(); TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder(); // Use start time of current time range as time column timeColumnBuilder.writeLong(timeRangeIterator.currentOutputTime()); @@ -54,6 +53,14 @@ public class AggregationUtil { aggregator.outputResult(columnBuilder); } tsBlockBuilder.declarePosition(); + } + + public static TsBlock updateResultTsBlockFromAggregators( + TsBlockBuilder tsBlockBuilder, + List<? extends Aggregator> aggregators, + ITimeRangeIterator timeRangeIterator) { + tsBlockBuilder.reset(); + appendAggregationResult(tsBlockBuilder, aggregators, timeRangeIterator); return tsBlockBuilder.build(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java index 06b4878d2b..1f0e11485c 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregationScanOperator.java @@ -42,10 +42,10 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult; import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator; import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.isEndCalc; import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.skipOutOfTimeRangePoints; -import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.updateResultTsBlockFromAggregators; /** * This operator is responsible to do the aggregation calculation for one series based on global @@ -72,8 +72,7 @@ public class SeriesAggregationScanOperator implements DataSourceOperator { protected TsBlock preCachedData; - protected final TsBlockBuilder tsBlockBuilder; - protected TsBlock resultTsBlock; + protected final TsBlockBuilder resultTsBlockBuilder; protected boolean finished = false; public SeriesAggregationScanOperator( @@ -118,7 +117,7 @@ public class SeriesAggregationScanOperator implements DataSourceOperator { for (Aggregator aggregator : aggregators) { dataTypes.addAll(Arrays.asList(aggregator.getOutputType())); } - tsBlockBuilder = new TsBlockBuilder(dataTypes); + this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes); this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending, true); this.isGroupByQuery = groupByTimeParameter != null; } @@ -135,20 +134,25 @@ public class SeriesAggregationScanOperator implements DataSourceOperator { @Override public TsBlock next() { - if (!timeRangeIterator.hasNextTimeRange()) { - return null; - } - curTimeRange = timeRangeIterator.nextTimeRange(); + resultTsBlockBuilder.reset(); + while (timeRangeIterator.hasNextTimeRange() && !resultTsBlockBuilder.isFull()) { + curTimeRange = timeRangeIterator.nextTimeRange(); - // 1. Clear previous aggregation result - for (Aggregator aggregator : aggregators) { - aggregator.reset(); - aggregator.updateTimeRange(curTimeRange); + // 1. Clear previous aggregation result + for (Aggregator aggregator : aggregators) { + aggregator.reset(); + aggregator.updateTimeRange(curTimeRange); + } + + // 2. Calculate aggregation result based on current time window + calculateNextResult(); } - // 2. Calculate aggregation result based on current time window - calculateNextResult(); - return resultTsBlock; + if (resultTsBlockBuilder.getPositionCount() > 0) { + return resultTsBlockBuilder.build(); + } else { + return null; + } } @Override @@ -224,8 +228,7 @@ public class SeriesAggregationScanOperator implements DataSourceOperator { } protected void updateResultTsBlock() { - resultTsBlock = - updateResultTsBlockFromAggregators(tsBlockBuilder, aggregators, timeRangeIterator); + appendAggregationResult(resultTsBlockBuilder, aggregators, timeRangeIterator); } /** @return if already get the result */
