This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/aggrOpRefactor in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit aef556d6d0a14f874c88ba08b415bf8dd953013f Author: Minghui Liu <[email protected]> AuthorDate: Wed Jul 6 18:28:29 2022 +0800 refactor AggregationOperator to batch process --- .../operator/process/AggregationOperator.java | 72 ++++++++++++++++++---- 1 file changed, 60 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java index 0027328e1e..9f02abab9e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregationOperator.java @@ -34,8 +34,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult; import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator; -import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.updateResultTsBlockFromAggregators; +import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.skipOutOfTimeRangePoints; /** * AggregationOperator can process the situation: aggregation of intermediate aggregate result, it @@ -50,12 +51,16 @@ public class AggregationOperator implements ProcessOperator { private final int inputOperatorsCount; private final TsBlock[] inputTsBlocks; - private final TsBlockBuilder tsBlockBuilder; + private final TsBlockBuilder resultTsBlockBuilder; private final ITimeRangeIterator timeRangeIterator; // current interval of aggregation window [curStartTime, curEndTime) private TimeRange curTimeRange; + private final boolean ascending; + + private final boolean[] canCallNext; + public AggregationOperator( OperatorContext operatorContext, List<Aggregator> aggregators, @@ -73,9 +78,12 @@ public class AggregationOperator implements ProcessOperator { for (Aggregator aggregator : aggregators) { dataTypes.addAll(Arrays.asList(aggregator.getOutputType())); } - tsBlockBuilder = new TsBlockBuilder(dataTypes); + this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes); this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending, outputPartialTimeWindow); + this.ascending = ascending; + + this.canCallNext = new boolean[inputOperatorsCount]; } @Override @@ -96,30 +104,70 @@ public class AggregationOperator implements ProcessOperator { @Override public TsBlock next() { - // update input tsBlock - if (curTimeRange == null && timeRangeIterator.hasNextTimeRange()) { - curTimeRange = timeRangeIterator.nextTimeRange(); + resultTsBlockBuilder.reset(); + for (int i = 0; i < inputOperatorsCount; i++) { + canCallNext[i] = true; } + + while ((curTimeRange != null || timeRangeIterator.hasNextTimeRange()) + && !resultTsBlockBuilder.isFull()) { + if (hasCachedData()) { + calculateNextResult(); + } else { + break; + } + } + + if (resultTsBlockBuilder.getPositionCount() > 0) { + return resultTsBlockBuilder.build(); + } else { + return null; + } + } + + private boolean hasCachedData() { for (int i = 0; i < inputOperatorsCount; i++) { if (inputTsBlocks[i] != null) { continue; } + if (!canCallNext[i]) { + return false; + } + inputTsBlocks[i] = children.get(i).next(); + canCallNext[i] = false; if (inputTsBlocks[i] == null) { - return null; + return false; } } + return true; + } + + private void calculateNextResult() { + if (curTimeRange == null && timeRangeIterator.hasNextTimeRange()) { + curTimeRange = timeRangeIterator.nextTimeRange(); + // consume current input tsBlocks + for (Aggregator aggregator : aggregators) { + aggregator.reset(); + aggregator.updateTimeRange(curTimeRange); + } + } + // consume current input tsBlocks for (Aggregator aggregator : aggregators) { - aggregator.reset(); aggregator.processTsBlocks(inputTsBlocks); } - // output result from aggregator - curTimeRange = null; + for (int i = 0; i < inputOperatorsCount; i++) { - inputTsBlocks[i] = null; + inputTsBlocks[i] = skipOutOfTimeRangePoints(inputTsBlocks[i], curTimeRange, ascending); + if (inputTsBlocks[i].isEmpty()) { + inputTsBlocks[i] = null; + } } - return updateResultTsBlockFromAggregators(tsBlockBuilder, aggregators, timeRangeIterator); + curTimeRange = null; + + // output result from aggregator + appendAggregationResult(resultTsBlockBuilder, aggregators, timeRangeIterator); } @Override
