This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch aggregationOp in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1938ed555873cfa02d1a324ba8d734e2e1899b15 Author: Alima777 <[email protected]> AuthorDate: Mon May 9 11:37:46 2022 +0800 add aggregate Operator --- .../iotdb/db/mpp/aggregation/Aggregator.java | 1 + .../operator/process/AggregateOperator.java | 91 ++++++++++++++++++++-- ...Operator.java => RawDataAggregateOperator.java} | 42 +++++++++- .../source/SeriesAggregateScanOperator.java | 41 ++++------ 4 files changed, 139 insertions(+), 36 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java index a0a7b87f1b..f083f65fe9 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java @@ -105,6 +105,7 @@ public class Aggregator { } public void reset() { + timeRange = new TimeRange(0, Long.MAX_VALUE); accumulator.reset(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java index 71e7817b94..9926c46496 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java @@ -21,23 +21,60 @@ package org.apache.iotdb.db.mpp.execution.operator.process; import org.apache.iotdb.db.mpp.aggregation.Aggregator; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; +import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter; +import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; +import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; +import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder; import com.google.common.util.concurrent.ListenableFuture; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import static org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregateScanOperator.initTimeRangeIterator; + +/** + * AggregateOperator can process the situation: aggregation of intermediate aggregate result, it + * will output one result based on time interval too. One intermediate tsBlock input will only + * contain the result of one time interval exactly. + */ public class AggregateOperator implements ProcessOperator { private final OperatorContext operatorContext; private final List<Aggregator> aggregators; private final List<Operator> children; + private final int inputOperatorsCount; + private final TsBlock[] inputTsBlocks; + private final TsBlockBuilder tsBlockBuilder; + + private ITimeRangeIterator timeRangeIterator; + // current interval of aggregation window [curStartTime, curEndTime) + private TimeRange curTimeRange; + public AggregateOperator( - OperatorContext operatorContext, List<Aggregator> aggregators, List<Operator> children) { + OperatorContext operatorContext, + List<Aggregator> aggregators, + List<Operator> children, + boolean ascending, + GroupByTimeParameter groupByTimeParameter) { this.operatorContext = operatorContext; this.aggregators = aggregators; this.children = children; + + this.inputOperatorsCount = children.size(); + this.inputTsBlocks = new TsBlock[inputOperatorsCount]; + List<TSDataType> dataTypes = new ArrayList<>(); + for (Aggregator aggregator : aggregators) { + dataTypes.addAll(Arrays.asList(aggregator.getOutputType())); + } + tsBlockBuilder = new TsBlockBuilder(dataTypes); + this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending); } @Override @@ -47,26 +84,68 @@ public class AggregateOperator implements ProcessOperator { @Override public ListenableFuture<Void> isBlocked() { - return ProcessOperator.super.isBlocked(); + for (int i = 0; i < inputOperatorsCount; i++) { + ListenableFuture<Void> blocked = children.get(i).isBlocked(); + if (!blocked.isDone()) { + return blocked; + } + } + return NOT_BLOCKED; } @Override public TsBlock next() { - return null; + // update input tsBlock + for (int i = 0; i < inputOperatorsCount; i++) { + inputTsBlocks[i] = children.get(i).next(); + } + // consume current input tsBlocks + for (Aggregator aggregator : aggregators) { + aggregator.reset(); + aggregator.processTsBlocks(inputTsBlocks); + } + // output result from aggregator + return updateResultTsBlockFromAggregators(tsBlockBuilder, aggregators, curTimeRange); } @Override public boolean hasNext() { - return false; + if (!timeRangeIterator.hasNextTimeRange()) { + return false; + } + curTimeRange = timeRangeIterator.nextTimeRange(); + return true; } @Override public void close() throws Exception { - ProcessOperator.super.close(); + for (Operator child : children) { + child.close(); + } } @Override public boolean isFinished() { - return false; + return !this.hasNext(); + } + + public static TsBlock updateResultTsBlockFromAggregators( + TsBlockBuilder tsBlockBuilder, List<Aggregator> aggregators, TimeRange curTimeRange) { + tsBlockBuilder.reset(); + TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder(); + // Use start time of current time range as time column + timeColumnBuilder.writeLong(curTimeRange.getMin()); + ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders(); + int columnIndex = 0; + for (Aggregator aggregator : aggregators) { + ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length]; + columnBuilder[0] = columnBuilders[columnIndex++]; + if (columnBuilder.length > 1) { + columnBuilder[1] = columnBuilders[columnIndex++]; + } + aggregator.outputResult(columnBuilder); + } + tsBlockBuilder.declarePosition(); + return tsBlockBuilder.build(); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregateOperator.java similarity index 52% copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregateOperator.java index 71e7817b94..5fb705dc3a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AggregateOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregateOperator.java @@ -16,28 +16,64 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.mpp.execution.operator.process; import org.apache.iotdb.db.mpp.aggregation.Aggregator; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; +import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter; +import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; import com.google.common.util.concurrent.ListenableFuture; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; -public class AggregateOperator implements ProcessOperator { +import static org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregateScanOperator.initTimeRangeIterator; + +/** + * RawDataAggregateOperator is used to process raw data tsBlock input calculating using value + * filter. It's possible that there is more than one tsBlock input in one time interval. And it's + * also possible that one tsBlock can cover multiple time intervals too. + */ +public class RawDataAggregateOperator implements ProcessOperator { private final OperatorContext operatorContext; private final List<Aggregator> aggregators; private final List<Operator> children; - public AggregateOperator( - OperatorContext operatorContext, List<Aggregator> aggregators, List<Operator> children) { + private final int inputOperatorsCount; + private final TsBlock[] inputTsBlocks; + private final TsBlockBuilder tsBlockBuilder; + + private ITimeRangeIterator timeRangeIterator; + // current interval of aggregation window [curStartTime, curEndTime) + private TimeRange curTimeRange; + + public RawDataAggregateOperator( + OperatorContext operatorContext, + List<Aggregator> aggregators, + List<Operator> children, + boolean ascending, + GroupByTimeParameter groupByTimeParameter) { this.operatorContext = operatorContext; this.aggregators = aggregators; this.children = children; + + this.inputOperatorsCount = children.size(); + this.inputTsBlocks = new TsBlock[inputOperatorsCount]; + List<TSDataType> dataTypes = new ArrayList<>(); + for (Aggregator aggregator : aggregators) { + dataTypes.addAll(Arrays.asList(aggregator.getOutputType())); + } + tsBlockBuilder = new TsBlockBuilder(dataTypes); + this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java index fdc50a808b..f86b5e5deb 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/SeriesAggregateScanOperator.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.mpp.aggregation.Aggregator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; +import org.apache.iotdb.db.mpp.execution.operator.process.AggregateOperator; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter; import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator; @@ -33,8 +34,6 @@ import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockSingleColumnIterator; import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; -import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; -import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import com.google.common.util.concurrent.ListenableFuture; @@ -101,7 +100,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator { dataTypes.addAll(Arrays.asList(aggregator.getOutputType())); } tsBlockBuilder = new TsBlockBuilder(dataTypes); - this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter); + this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter, ascending); } /** @@ -109,7 +108,8 @@ public class SeriesAggregateScanOperator implements DataSourceOperator { * Aggregation query has only one time window and the result set of it does not contain a * timestamp, so it doesn't matter what the time range returns. */ - public ITimeRangeIterator initTimeRangeIterator(GroupByTimeParameter groupByTimeParameter) { + public static ITimeRangeIterator initTimeRangeIterator( + GroupByTimeParameter groupByTimeParameter, boolean ascending) { if (groupByTimeParameter == null) { return new SingleTimeWindowIterator(0, Long.MAX_VALUE); } else { @@ -164,19 +164,19 @@ public class SeriesAggregateScanOperator implements DataSourceOperator { // 2. Calculate aggregation result based on current time window if (calcFromCacheData(curTimeRange)) { - updateResultTsBlockUsingAggregateResult(); + updateResultTsBlockFromAggregators(); return true; } // read page data firstly if (readAndCalcFromPage(curTimeRange)) { - updateResultTsBlockUsingAggregateResult(); + updateResultTsBlockFromAggregators(); return true; } // read chunk data secondly if (readAndCalcFromChunk(curTimeRange)) { - updateResultTsBlockUsingAggregateResult(); + updateResultTsBlockFromAggregators(); return true; } @@ -185,7 +185,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator { Statistics fileStatistics = seriesScanUtil.currentFileStatistics(); if (fileStatistics.getStartTime() >= curTimeRange.getMax()) { if (ascending) { - updateResultTsBlockUsingAggregateResult(); + updateResultTsBlockFromAggregators(); return true; } else { seriesScanUtil.skipCurrentFile(); @@ -202,35 +202,22 @@ public class SeriesAggregateScanOperator implements DataSourceOperator { // read chunk if (readAndCalcFromChunk(curTimeRange)) { - updateResultTsBlockUsingAggregateResult(); + updateResultTsBlockFromAggregators(); return true; } } - updateResultTsBlockUsingAggregateResult(); + updateResultTsBlockFromAggregators(); return true; } catch (IOException e) { throw new RuntimeException("Error while scanning the file", e); } } - private void updateResultTsBlockUsingAggregateResult() { - tsBlockBuilder.reset(); - TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder(); - // Use start time of current time range as time column - timeColumnBuilder.writeLong(curTimeRange.getMin()); - ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders(); - int columnIndex = 0; - for (Aggregator aggregator : aggregators) { - ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length]; - columnBuilder[0] = columnBuilders[columnIndex++]; - if (columnBuilder.length > 1) { - columnBuilder[1] = columnBuilders[columnIndex++]; - } - aggregator.outputResult(columnBuilder); - } - tsBlockBuilder.declarePosition(); - resultTsBlock = tsBlockBuilder.build(); + private void updateResultTsBlockFromAggregators() { + resultTsBlock = + AggregateOperator.updateResultTsBlockFromAggregators( + tsBlockBuilder, aggregators, curTimeRange); hasCachedTsBlock = true; }
