This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch aggregationOp in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 033b38ca0b0f736a7eaef048130bc9f4354efe2f Author: Alima777 <[email protected]> AuthorDate: Mon May 9 17:16:24 2022 +0800 add RawDataAggregateOperator --- .../iotdb/db/mpp/aggregation/Aggregator.java | 42 ++++---- .../operator/process/RawDataAggregateOperator.java | 115 ++++++++++++++++++--- .../source/SeriesAggregateScanOperator.java | 39 ++----- 3 files changed, 133 insertions(+), 63 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java index f083f65fe9..d38cbe88c6 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java @@ -55,32 +55,38 @@ public class Aggregator { this.inputLocationList = inputLocationList; } - // Used for SeriesAggregateScanOperator + // Used for SeriesAggregateScanOperator and RawDataAggregateOperator public void processTsBlock(TsBlock tsBlock) { checkArgument( - step.isInputRaw(), "Step in SeriesAggregateScanOperator can only process raw input"); + step.isInputRaw(), + "Step in SeriesAggregateScanOperator and RawDataAggregateOperator can only process raw input"); // TODO Aligned TimeSeries - accumulator.addInput(tsBlock.getTimeAndValueColumn(0), timeRange); + if (inputLocationList == null) { + accumulator.addInput(tsBlock.getTimeAndValueColumn(0), timeRange); + } else { + for (InputLocation[] inputLocations : inputLocationList) { + checkArgument( + inputLocations[0].getTsBlockIndex() == 1, + "RawDataAggregateOperator can only process one tsBlock input."); + Column[] timeValueColumn = new Column[2]; + timeValueColumn[0] = tsBlock.getTimeColumn(); + timeValueColumn[1] = tsBlock.getColumn(inputLocations[0].getValueColumnIndex()); + accumulator.addInput(timeValueColumn, timeRange); + } + } } - // Used for aggregateOperator + // Used for AggregateOperator public void processTsBlocks(TsBlock[] tsBlock) { + 
checkArgument(!step.isInputRaw(), "Step in AggregateOperator cannot process raw input"); for (InputLocation[] inputLocations : inputLocationList) { - if (step.isInputRaw()) { - TsBlock rawTsBlock = tsBlock[inputLocations[0].getTsBlockIndex()]; - Column[] timeValueColumn = new Column[2]; - timeValueColumn[0] = rawTsBlock.getTimeColumn(); - timeValueColumn[1] = rawTsBlock.getColumn(inputLocations[0].getValueColumnIndex()); - accumulator.addInput(timeValueColumn, timeRange); - } else { - Column[] columns = new Column[inputLocations.length]; - for (int i = 0; i < inputLocations.length; i++) { - columns[i] = - tsBlock[inputLocations[i].getTsBlockIndex()].getColumn( - inputLocations[i].getValueColumnIndex()); - } - accumulator.addIntermediate(columns); + Column[] columns = new Column[inputLocations.length]; + for (int i = 0; i < inputLocations.length; i++) { + columns[i] = + tsBlock[inputLocations[i].getTsBlockIndex()].getColumn( + inputLocations[i].getValueColumnIndex()); } + accumulator.addIntermediate(columns); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregateOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregateOperator.java index 5fb705dc3a..bbe6e6f041 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregateOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregateOperator.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator; import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; import com.google.common.util.concurrent.ListenableFuture; @@ -41,33 +42,38 @@ 
import static org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregateS * RawDataAggregateOperator is used to process raw data tsBlock input calculating using value * filter. It's possible that there is more than one tsBlock input in one time interval. And it's * also possible that one tsBlock can cover multiple time intervals too. + * + * <p>Since raw data query with value filter is processed by FilterOperator above TimeJoinOperator, + * we can treat RawDataAggregateOperator as a one-to-one (one input, one output) operator. + * + * <p>Return aggregation result in one time interval once. */ public class RawDataAggregateOperator implements ProcessOperator { private final OperatorContext operatorContext; private final List<Aggregator> aggregators; - private final List<Operator> children; - - private final int inputOperatorsCount; - private final TsBlock[] inputTsBlocks; - private final TsBlockBuilder tsBlockBuilder; - + private final Operator child; + private final boolean ascending; private ITimeRangeIterator timeRangeIterator; // current interval of aggregation window [curStartTime, curEndTime) private TimeRange curTimeRange; + private TsBlock preCachedData; + + // Used for building the result tsBlock + private final TsBlockBuilder tsBlockBuilder; + public RawDataAggregateOperator( OperatorContext operatorContext, List<Aggregator> aggregators, - List<Operator> children, + Operator child, boolean ascending, GroupByTimeParameter groupByTimeParameter) { this.operatorContext = operatorContext; this.aggregators = aggregators; - this.children = children; + this.child = child; + this.ascending = ascending; - this.inputOperatorsCount = children.size(); - this.inputTsBlocks = new TsBlock[inputOperatorsCount]; List<TSDataType> dataTypes = new ArrayList<>(); for (Aggregator aggregator : aggregators) { dataTypes.addAll(Arrays.asList(aggregator.getOutputType())); @@ -83,26 +89,105 @@ public class RawDataAggregateOperator implements ProcessOperator { @Override public 
ListenableFuture<Void> isBlocked() { - return ProcessOperator.super.isBlocked(); + return child.isBlocked(); } @Override public TsBlock next() { - return null; + // 1. Clear previous aggregation result + for (Aggregator aggregator : aggregators) { + aggregator.reset(); + aggregator.setTimeRange(curTimeRange); + } + + // 2. Calculate aggregation result based on current time window + while (!calcFromCacheData(curTimeRange)) { + preCachedData = child.next(); + } + + // 3. Update result using aggregators + return AggregateOperator.updateResultTsBlockFromAggregators( + tsBlockBuilder, aggregators, curTimeRange); } @Override public boolean hasNext() { - return false; + if (!timeRangeIterator.hasNextTimeRange()) { + return false; + } + curTimeRange = timeRangeIterator.nextTimeRange(); + return true; } @Override public void close() throws Exception { - ProcessOperator.super.close(); + child.close(); } @Override public boolean isFinished() { - return false; + return !this.hasNext(); + } + + /** @return if already get the result */ + private boolean calcFromCacheData(TimeRange curTimeRange) { + // check if the batchData does not contain points in current interval + if (preCachedData != null && satisfied(preCachedData, curTimeRange, ascending)) { + // skip points that cannot be calculated + preCachedData = skipOutOfTimeRangePoints(preCachedData, curTimeRange, ascending); + + for (Aggregator aggregator : aggregators) { + // current agg method has been calculated + if (aggregator.hasFinalResult()) { + continue; + } + + aggregator.processTsBlock(preCachedData); + } + } + // The result is calculated from the cache + return (preCachedData != null + && (ascending + ? 
preCachedData.getEndTime() >= curTimeRange.getMax() + : preCachedData.getStartTime() < curTimeRange.getMin())) + || isEndCalc(aggregators); + } + + // skip points that cannot be calculated + public static TsBlock skipOutOfTimeRangePoints( + TsBlock tsBlock, TimeRange curTimeRange, boolean ascending) { + TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator(); + if (ascending) { + while (tsBlockIterator.hasNext() && tsBlockIterator.currentTime() < curTimeRange.getMin()) { + tsBlockIterator.next(); + } + } else { + while (tsBlockIterator.hasNext() && tsBlockIterator.currentTime() >= curTimeRange.getMax()) { + tsBlockIterator.next(); + } + } + return tsBlock.subTsBlock(tsBlockIterator.getRowIndex()); + } + + private boolean satisfied(TsBlock tsBlock, TimeRange timeRange, boolean ascending) { + TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator(); + if (tsBlockIterator == null || !tsBlockIterator.hasNext()) { + return false; + } + + return ascending + ? 
(tsBlockIterator.getEndTime() >= timeRange.getMin() + && tsBlockIterator.currentTime() < timeRange.getMax()) + : (tsBlockIterator.getStartTime() < timeRange.getMax() + && tsBlockIterator.currentTime() >= timeRange.getMin()); + } + + public static boolean isEndCalc(List<Aggregator> aggregators) { + for (Aggregator aggregator : aggregators) { + if (!aggregator.hasFinalResult()) { + return false; + } + } + return true; } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java index f86b5e5deb..23c8bc28be 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java @@ -44,6 +44,9 @@ import java.util.Arrays; import java.util.List; import java.util.Set; +import static org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregateOperator.isEndCalc; +import static org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregateOperator.skipOutOfTimeRangePoints; + /** * This operator is responsible to do the aggregation calculation for one series based on global * time range and time split parameter. @@ -250,18 +253,18 @@ public class SeriesAggregateScanOperator implements DataSourceOperator { && (ascending ? 
preCachedData.getEndTime() >= curTimeRange.getMax() : preCachedData.getStartTime() < curTimeRange.getMin())) - || isEndCalc(); + || isEndCalc(aggregators); } @SuppressWarnings("squid:S3776") private void calcFromBatch(TsBlock tsBlock, TimeRange curTimeRange) { // check if the batchData does not contain points in current interval - if (tsBlock == null || !satisfied(tsBlock, curTimeRange)) { + if (tsBlock == null || !satisfied(tsBlock, curTimeRange, ascending)) { return; } // skip points that cannot be calculated - tsBlock = skipOutOfTimeRangePoints(tsBlock, curTimeRange); + tsBlock = skipOutOfTimeRangePoints(tsBlock, curTimeRange, ascending); for (Aggregator aggregator : aggregators) { // current agg method has been calculated @@ -278,22 +281,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator { } } - // skip points that cannot be calculated - private TsBlock skipOutOfTimeRangePoints(TsBlock tsBlock, TimeRange curTimeRange) { - TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator(); - if (ascending) { - while (tsBlockIterator.hasNext() && tsBlockIterator.currentTime() < curTimeRange.getMin()) { - tsBlockIterator.next(); - } - } else { - while (tsBlockIterator.hasNext() && tsBlockIterator.currentTime() >= curTimeRange.getMax()) { - tsBlockIterator.next(); - } - } - return tsBlock.subTsBlock(tsBlockIterator.getRowIndex()); - } - - private boolean satisfied(TsBlock tsBlock, TimeRange timeRange) { + private boolean satisfied(TsBlock tsBlock, TimeRange timeRange, boolean ascending) { TsBlockSingleColumnIterator tsBlockIterator = tsBlock.getTsBlockSingleColumnIterator(); if (tsBlockIterator == null || !tsBlockIterator.hasNext()) { return false; @@ -313,15 +301,6 @@ public class SeriesAggregateScanOperator implements DataSourceOperator { return true; } - private boolean isEndCalc() { - for (Aggregator aggregator : aggregators) { - if (!aggregator.hasFinalResult()) { - return false; - } - } - return true; - } - 
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning private boolean readAndCalcFromPage(TimeRange curTimeRange) throws IOException { while (seriesScanUtil.hasNextPage()) { @@ -342,7 +321,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator { && curTimeRange.contains(pageStatistics.getStartTime(), pageStatistics.getEndTime())) { calcFromStatistics(pageStatistics); seriesScanUtil.skipCurrentPage(); - if (isEndCalc()) { + if (isEndCalc(aggregators)) { return true; } continue; @@ -369,7 +348,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator { calcFromBatch(tsBlock, curTimeRange); // judge whether the calculation finished - if (isEndCalc() + if (isEndCalc(aggregators) || (tsBlockIterator.hasNext() && (ascending ? tsBlockIterator.currentTime() >= curTimeRange.getMax()
