This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch perfect_tableagg in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 622e3a5c16ba61e4fc62a55159fc8b5e6ad8eb58 Author: Beyyes <[email protected]> AuthorDate: Fri Dec 20 00:29:33 2024 +0800 perfect TableAggregationTableScanOperator --- .../TableAggregationTableScanOperator.java | 159 ++++++------- .../plan/planner/TableOperatorGenerator.java | 251 ++++++++++----------- 2 files changed, 192 insertions(+), 218 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java index e963963bf7d..8ef9ddbbf04 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java @@ -24,7 +24,7 @@ import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; -import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractSeriesAggregationScanOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanUtil; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator; import org.apache.iotdb.db.queryengine.execution.operator.window.IWindow; @@ -49,7 +49,6 @@ import org.apache.tsfile.read.common.TimeRange; import org.apache.tsfile.read.common.block.TsBlock; import org.apache.tsfile.read.common.block.TsBlockBuilder; import org.apache.tsfile.read.common.block.column.BinaryColumn; -import org.apache.tsfile.read.common.block.column.LongColumn; import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.Pair; @@ -68,120 +67,97 @@ import java.util.stream.Collectors; import static java.lang.String.format; import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.satisfiedTimeRange; import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.CURRENT_DEVICE_INDEX_STRING; +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE; import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath; import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter.DEVICE_NUMBER; import static org.apache.tsfile.read.common.block.TsBlockUtil.skipPointsOutOfTimeRange; -public class TableAggregationTableScanOperator extends AbstractSeriesAggregationScanOperator { +public class TableAggregationTableScanOperator extends AbstractDataSourceOperator { private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(TableAggregationTableScanOperator.class); - private final List<TableAggregator> tableAggregators; + private boolean finished = false; + private TsBlock inputTsBlock; + private final List<TableAggregator> tableAggregators; private final List<ColumnSchema> groupingKeySchemas; private final int[] groupingKeyIndex; - public static final LongColumn TIME_COLUMN_TEMPLATE = - new LongColumn(1, Optional.empty(), new long[] {0}); - - private final List<ColumnSchema> columnSchemas; - - private final int[] columnsIndexArray; - private final List<DeviceEntry> deviceEntries; - private final int deviceCount; - - private final Ordering scanOrder; - private final SeriesScanOptions seriesScanOptions; - + private int currentDeviceIndex; private final List<String> measurementColumnNames; private final Set<String> allSensors; - private final List<IMeasurementSchema> measurementSchemas; - private final List<TSDataType> measurementColumnTSDataTypes; + private final int measurementCount; - // TODO calc maxTsBlockLineNum using date_bin - private final int maxTsBlockLineNum; + private final List<ColumnSchema> aggColumnSchemas; + private final int[] aggColumnsIndexArray; - // for different aggregations aiming to same column, use this variable to point to same column - private final List<Integer> aggArguments; + private final SeriesScanOptions seriesScanOptions; + private final boolean ascending; + private final Ordering scanOrder; + // Some special data types(like BLOB) cannot use statistics + protected final boolean canUseStatistics; + private final long cachedRawDataSize; - private QueryDataSource queryDataSource; + // stores all inputChannels of tableAggregators, + // e.g. for aggregation `last(s1), count(s2), count(s1)`, the inputChannels should be [0, 1, 0] + private final List<Integer> aggregatorInputChannels; - private int currentDeviceIndex; + private QueryDataSource queryDataSource; - ITableTimeRangeIterator timeIterator; + private final ITableTimeRangeIterator timeIterator; private boolean allAggregatorsHasFinalResult = false; public TableAggregationTableScanOperator( PlanNodeId sourceId, OperatorContext context, - List<ColumnSchema> columnSchemas, - int[] columnsIndexArray, + List<ColumnSchema> aggColumnSchemas, + int[] aggColumnsIndexArray, List<DeviceEntry> deviceEntries, - Ordering scanOrder, SeriesScanOptions seriesScanOptions, List<String> measurementColumnNames, Set<String> allSensors, List<IMeasurementSchema> measurementSchemas, - int maxTsBlockLineNum, - int measurementCount, List<TableAggregator> tableAggregators, List<ColumnSchema> groupingKeySchemas, int[] groupingKeyIndex, ITableTimeRangeIterator tableTimeRangeIterator, boolean ascending, - long maxReturnSize, boolean canUseStatistics, - List<Integer> aggArguments) { - - super( - sourceId, - context, - null, - measurementCount, - null, - null, - ascending, - false, - null, - maxReturnSize, - (1L + measurementCount) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(), - canUseStatistics); + List<Integer> aggregatorInputChannels) { + this.sourceId = sourceId; + this.operatorContext = context; + this.canUseStatistics = canUseStatistics; this.tableAggregators = tableAggregators; this.groupingKeySchemas = groupingKeySchemas; this.groupingKeyIndex = groupingKeyIndex; - - this.sourceId = sourceId; - this.operatorContext = context; - this.columnSchemas = columnSchemas; - this.columnsIndexArray = columnsIndexArray; + this.aggColumnSchemas = aggColumnSchemas; + this.aggColumnsIndexArray = aggColumnsIndexArray; this.deviceEntries = deviceEntries; this.deviceCount = deviceEntries.size(); this.operatorContext.recordSpecifiedInfo(DEVICE_NUMBER, Integer.toString(this.deviceCount)); - this.scanOrder = scanOrder; + this.ascending = ascending; + this.scanOrder = ascending ? Ordering.ASC : Ordering.DESC; this.seriesScanOptions = seriesScanOptions; this.measurementColumnNames = measurementColumnNames; + this.measurementCount = measurementColumnNames.size(); + this.cachedRawDataSize = + (1L + this.measurementCount) + * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(); this.allSensors = allSensors; this.measurementSchemas = measurementSchemas; this.measurementColumnTSDataTypes = measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList()); this.currentDeviceIndex = 0; this.operatorContext.recordSpecifiedInfo(CURRENT_DEVICE_INDEX_STRING, Integer.toString(0)); - this.aggArguments = aggArguments; + this.aggregatorInputChannels = aggregatorInputChannels; this.timeIterator = tableTimeRangeIterator; - if (tableTimeRangeIterator.getType() - == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR) { - curTimeRange = new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE); - } - - this.maxReturnSize = maxReturnSize; - this.maxTsBlockLineNum = maxTsBlockLineNum; constructAlignedSeriesScanUtil(); } @@ -192,9 +168,23 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation finished = !hasNextWithTimer(); } return finished; + } + + @Override + public long calculateMaxPeekMemory() { + return cachedRawDataSize + maxReturnSize; + } - // return (retainedTsBlock == null) - // && (currentDeviceIndex >= deviceCount || seriesScanOptions.limitConsumedUp()); + @Override + public long calculateMaxReturnSize() { + return maxReturnSize; + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.DATE_BIN_TIME_ITERATOR + ? cachedRawDataSize + : 0; } @Override @@ -231,7 +221,6 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation // return true if current time window is calc finished if (calculateAggregationResultForCurrentTimeRange()) { timeIterator.resetCurTimeRange(); - // curTimeRange = null; } } @@ -266,7 +255,7 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation return resultTsBlock; } - private void constructAlignedSeriesScanUtil() { + protected void constructAlignedSeriesScanUtil() { DeviceEntry deviceEntry; if (this.deviceEntries.isEmpty() || this.deviceEntries.get(this.currentDeviceIndex) == null) { @@ -290,7 +279,6 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation } /** Return true if we have the result of this timeRange. */ - @Override protected boolean calculateAggregationResultForCurrentTimeRange() { try { if (calcFromCachedData()) { @@ -353,14 +341,12 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation } } - @Override protected void updateResultTsBlock() { appendAggregationResult(resultTsBlockBuilder, tableAggregators); // after appendAggregationResult invoked, aggregators must be cleared resetTableAggregators(); } - @Override protected boolean calcFromCachedData() { return calcUsingRawData(inputTsBlock); } @@ -420,13 +406,13 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation } TsBlock inputRegion = inputTsBlock.getRegion(0, lastIndexToProcess + 1); - Column[] valueColumns = new Column[aggArguments.size()]; - for (int idx : aggArguments) { + Column[] valueColumns = new Column[aggregatorInputChannels.size()]; + for (int idx : aggregatorInputChannels) { if (valueColumns[idx] != null) { continue; } valueColumns[idx] = - buildValueColumn(columnSchemas.get(idx).getColumnCategory(), inputRegion, idx); + buildValueColumn(aggColumnSchemas.get(idx).getColumnCategory(), inputRegion, idx); } TsBlock tsBlock = @@ -463,7 +449,7 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation (String) deviceEntries .get(currentDeviceIndex) - .getNthSegment(columnsIndexArray[columnIdx] + 1); + .getNthSegment(aggColumnsIndexArray[columnIdx] + 1); return getIdOrAttrColumn( inputRegion.getTimeColumn().getPositionCount(), id == null ? null : new Binary(id, TSFileConfig.STRING_CHARSET)); @@ -472,10 +458,10 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation deviceEntries .get(currentDeviceIndex) .getAttributeColumnValues() - .get(columnsIndexArray[columnIdx]); + .get(aggColumnsIndexArray[columnIdx]); return getIdOrAttrColumn(inputRegion.getTimeColumn().getPositionCount(), attr); case MEASUREMENT: - return inputRegion.getColumn(columnsIndexArray[columnIdx]); + return inputRegion.getColumn(aggColumnsIndexArray[columnIdx]); default: throw new IllegalStateException("Unsupported column type: " + columnSchemaCategory); } @@ -507,10 +493,13 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation idx++; TsTableColumnCategory columnSchemaCategory = - columnSchemas.get(aggArguments.get(idx)).getColumnCategory(); + aggColumnSchemas.get(aggregatorInputChannels.get(idx)).getColumnCategory(); statisticsArray[i] = buildStatistics( - columnSchemaCategory, timeStatistics, valueStatistics, aggArguments.get(idx)); + columnSchemaCategory, + timeStatistics, + valueStatistics, + aggregatorInputChannels.get(idx)); } aggregator.processStatistics(statisticsArray); @@ -531,7 +520,7 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation (String) deviceEntries .get(currentDeviceIndex) - .getNthSegment(columnsIndexArray[columnIdx] + 1); + .getNthSegment(aggColumnsIndexArray[columnIdx] + 1); return getStatistics( timeStatistics, id == null ? null : new Binary(id, TSFileConfig.STRING_CHARSET)); case ATTRIBUTE: @@ -539,10 +528,10 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation deviceEntries .get(currentDeviceIndex) .getAttributeColumnValues() - .get(columnsIndexArray[columnIdx]); + .get(aggColumnsIndexArray[columnIdx]); return getStatistics(timeStatistics, attr); case MEASUREMENT: - return valueStatistics[columnsIndexArray[columnIdx]]; + return valueStatistics[aggColumnsIndexArray[columnIdx]]; default: throw new IllegalStateException("Unsupported column type: " + columnSchemaCategory); } @@ -562,7 +551,6 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation } @SuppressWarnings({"squid:S3776", "squid:S135", "squid:S3740"}) - @Override public boolean readAndCalcFromFile() throws IOException { // start stopwatch long start = System.nanoTime(); @@ -585,8 +573,8 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation if (timeIterator .getCurTimeRange() .contains(fileTimeStatistics.getStartTime(), fileTimeStatistics.getEndTime())) { - Statistics[] statisticsList = new Statistics[subSensorSize]; - for (int i = 0; i < subSensorSize; i++) { + Statistics[] statisticsList = new Statistics[measurementCount]; + for (int i = 0; i < measurementCount; i++) { statisticsList[i] = seriesScanUtil.currentFileStatistics(i); } calcFromStatistics(fileTimeStatistics, statisticsList); @@ -632,8 +620,8 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation .getCurTimeRange() .contains(chunkTimeStatistics.getStartTime(), chunkTimeStatistics.getEndTime())) { // calc from chunkMetaData - Statistics[] statisticsList = new Statistics[subSensorSize]; - for (int i = 0; i < subSensorSize; i++) { + Statistics[] statisticsList = new Statistics[measurementCount]; + for (int i = 0; i < measurementCount; i++) { statisticsList[i] = seriesScanUtil.currentChunkStatistics(i); } calcFromStatistics(chunkTimeStatistics, statisticsList); @@ -681,8 +669,8 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation if (timeIterator .getCurTimeRange() .contains(pageTimeStatistics.getStartTime(), pageTimeStatistics.getEndTime())) { - Statistics[] statisticsList = new Statistics[subSensorSize]; - for (int i = 0; i < subSensorSize; i++) { + Statistics[] statisticsList = new Statistics[measurementCount]; + for (int i = 0; i < measurementCount; i++) { statisticsList[i] = seriesScanUtil.currentPageStatistics(i); } calcFromStatistics(pageTimeStatistics, statisticsList); @@ -866,7 +854,6 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation this.queryDataSource = (QueryDataSource) dataSource; this.seriesScanUtil.initQueryDataSource(queryDataSource); this.resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes()); - this.resultTsBlockBuilder.setMaxTsBlockLineNumber(this.maxTsBlockLineNum); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index 7dcdbe04c95..3873973ca85 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -378,21 +378,18 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution } } - SeriesScanOptions.Builder scanOptionsBuilder = - node.getTimePredicate().isPresent() - ? getSeriesScanOptionsBuilder(context, node.getTimePredicate().get()) - : new SeriesScanOptions.Builder(); - scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit()); - scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset()); - scanOptionsBuilder.withPushLimitToEachDevice(node.isPushLimitToEachDevice()); - scanOptionsBuilder.withAllSensors(new HashSet<>(measurementColumnNames)); - - Expression pushDownPredicate = node.getPushDownPredicate(); - if (pushDownPredicate != null) { - scanOptionsBuilder.withPushDownFilter( - convertPredicateToFilter( - pushDownPredicate, measurementColumnsIndexMap, columnSchemaMap, timeColumnName)); - } + SeriesScanOptions seriesScanOptions = + buildSeriesScanOptions( + context, + columnSchemaMap, + measurementColumnNames, + measurementColumnsIndexMap, + timeColumnName, + node.getTimePredicate(), + node.getPushDownLimit(), + node.getPushDownOffset(), + node.isPushLimitToEachDevice(), + node.getPushDownPredicate()); OperatorContext operatorContext = context @@ -421,7 +418,7 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution columnsIndexArray, node.getDeviceEntries(), node.getScanOrder(), - scanOptionsBuilder.build(), + seriesScanOptions, measurementColumnNames, allSensors, measurementSchemas, @@ -1748,74 +1745,71 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution public Operator visitAggregationTableScan( AggregationTableScanNode node, LocalExecutionPlanContext context) { + List<String> measurementColumnNames = new ArrayList<>(); + List<IMeasurementSchema> measurementSchemas = new ArrayList<>(); + Map<String, Integer> measurementColumnsIndexMap = new HashMap<>(); + List<TableAggregator> aggregators = new ArrayList<>(node.getAggregations().size()); - Map<Symbol, Integer> columnLayout = new HashMap<>(node.getAggregations().size()); + List<Integer> aggregatorInputChannels = + new ArrayList<>( + (int) + node.getAggregations().values().stream() + .mapToLong(aggregation -> aggregation.getArguments().size()) + .sum()); + int aggDistinctArgumentCount = + (int) + node.getAggregations().values().stream() + .flatMap(aggregation -> aggregation.getArguments().stream()) + .map(Symbol::from) + .distinct() + .count(); + List<ColumnSchema> aggColumnSchemas = new ArrayList<>(aggDistinctArgumentCount); + Map<Symbol, Integer> aggColumnLayout = new HashMap<>(aggDistinctArgumentCount); + int[] aggColumnsIndexArray = new int[aggDistinctArgumentCount]; - int distinctArgumentCount = node.getAssignments().size(); - int aggregationsCount = node.getAggregations().size(); - List<Integer> aggColumnIndexes = new ArrayList<>(); + String timeColumnName = null; int channel = 0; - int idx = -1; int measurementColumnCount = 0; - Map<Symbol, Integer> idAndAttributeColumnsIndexMap = node.getIdAndAttributeIndexMap(); - Map<Symbol, ColumnSchema> columnSchemaMap = node.getAssignments(); - List<ColumnSchema> columnSchemas = new ArrayList<>(aggregationsCount); - int[] columnsIndexArray = new int[distinctArgumentCount]; - List<String> measurementColumnNames = new ArrayList<>(); - Map<String, Integer> measurementColumnsIndexMap = new HashMap<>(); - String timeColumnName = null; - List<IMeasurementSchema> measurementSchemas = new ArrayList<>(); - for (Map.Entry<Symbol, AggregationNode.Aggregation> entry : node.getAggregations().entrySet()) { - AggregationNode.Aggregation aggregation = entry.getValue(); - - for (Expression argument : aggregation.getArguments()) { - idx++; + for (Expression argument : entry.getValue().getArguments()) { Symbol symbol = Symbol.from(argument); - ColumnSchema schema = requireNonNull(columnSchemaMap.get(symbol), symbol + " is null"); - switch (schema.getColumnCategory()) { - case ID: - case ATTRIBUTE: - if (!columnLayout.containsKey(symbol)) { - columnsIndexArray[channel] = - requireNonNull(idAndAttributeColumnsIndexMap.get(symbol), symbol + " is null"); - columnSchemas.add(schema); - } - break; - case MEASUREMENT: - if (!columnLayout.containsKey(symbol)) { - columnsIndexArray[channel] = measurementColumnCount; + ColumnSchema schema = + requireNonNull(node.getAssignments().get(symbol), symbol + " is null"); + if (!aggColumnLayout.containsKey(symbol)) { + switch (schema.getColumnCategory()) { + case ID: + case ATTRIBUTE: + aggColumnsIndexArray[channel] = + requireNonNull(node.getIdAndAttributeIndexMap().get(symbol), symbol + " is null"); + break; + case MEASUREMENT: + aggColumnsIndexArray[channel] = measurementColumnCount; measurementColumnCount++; measurementColumnNames.add(schema.getName()); measurementSchemas.add( new MeasurementSchema(schema.getName(), getTSDataType(schema.getType()))); - columnSchemas.add(schema); measurementColumnsIndexMap.put(symbol.getName(), measurementColumnCount - 1); - } - break; - case TIME: - if (!columnLayout.containsKey(symbol)) { - columnsIndexArray[channel] = -1; - columnSchemas.add(schema); + break; + case TIME: + aggColumnsIndexArray[channel] = -1; timeColumnName = symbol.getName(); - } - break; - default: - throw new IllegalArgumentException( - "Unexpected column category: " + schema.getColumnCategory()); - } + break; + default: + throw new IllegalArgumentException( + "Unexpected column category: " + schema.getColumnCategory()); + } - if (!columnLayout.containsKey(symbol)) { - aggColumnIndexes.add(channel); - columnLayout.put(symbol, channel++); + aggColumnSchemas.add(schema); + aggregatorInputChannels.add(channel); + aggColumnLayout.put(symbol, channel++); } else { - aggColumnIndexes.add(columnLayout.get(symbol)); + aggregatorInputChannels.add(aggColumnLayout.get(symbol)); } } } for (Map.Entry<Symbol, ColumnSchema> entry : node.getAssignments().entrySet()) { - if (!columnLayout.containsKey(entry.getKey()) + if (!aggColumnLayout.containsKey(entry.getKey()) && entry.getValue().getColumnCategory() == MEASUREMENT) { measurementColumnCount++; measurementColumnNames.add(entry.getValue().getName()); @@ -1835,7 +1829,7 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution for (Map.Entry<Symbol, AggregationNode.Aggregation> entry : node.getAggregations().entrySet()) { aggregators.add( buildAggregator( - columnLayout, + aggColumnLayout, entry.getKey(), entry.getValue(), node.getStep(), @@ -1853,9 +1847,9 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution for (int i = 0; i < node.getGroupingKeys().size(); i++) { Symbol groupingKey = node.getGroupingKeys().get(i); - if (idAndAttributeColumnsIndexMap.containsKey(groupingKey)) { - groupingKeySchemas.add(columnSchemaMap.get(groupingKey)); - groupingKeyIndex[i] = idAndAttributeColumnsIndexMap.get(groupingKey); + if (node.getIdAndAttributeIndexMap().containsKey(groupingKey)) { + groupingKeySchemas.add(node.getAssignments().get(groupingKey)); + groupingKeyIndex[i] = node.getIdAndAttributeIndexMap().get(groupingKey); } else { if (node.getProjection() != null && !node.getProjection().getMap().isEmpty() @@ -1896,62 +1890,81 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution context.getNextOperatorId(), node.getPlanNodeId(), TableAggregationTableScanOperator.class.getSimpleName()); - SeriesScanOptions.Builder scanOptionsBuilder = - node.getTimePredicate().isPresent() - ? getSeriesScanOptionsBuilder(context, node.getTimePredicate().get()) - : new SeriesScanOptions.Builder(); - scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit()); - scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset()); - scanOptionsBuilder.withPushLimitToEachDevice(node.isPushLimitToEachDevice()); - scanOptionsBuilder.withAllSensors(new HashSet<>(measurementColumnNames)); - Expression pushDownPredicate = node.getPushDownPredicate(); - if (pushDownPredicate != null) { - scanOptionsBuilder.withPushDownFilter( - convertPredicateToFilter( - pushDownPredicate, measurementColumnsIndexMap, columnSchemaMap, timeColumnName)); - } + SeriesScanOptions seriesScanOptions = + buildSeriesScanOptions( + context, + node.getAssignments(), + measurementColumnNames, + measurementColumnsIndexMap, + timeColumnName, + node.getTimePredicate(), + node.getPushDownLimit(), + node.getPushDownOffset(), + node.isPushLimitToEachDevice(), + node.getPushDownPredicate()); Set<String> allSensors = new HashSet<>(measurementColumnNames); - // for time column - allSensors.add(""); + allSensors.add(""); // for time column + + for (int i = 0; i < node.getDeviceEntries().size(); i++) { + AlignedFullPath alignedPath = + constructAlignedPath( + node.getDeviceEntries().get(i), + measurementColumnNames, + measurementSchemas, + allSensors); + ((DataDriverContext) context.getDriverContext()).addPath(alignedPath); + } + + context.getDriverContext().setInputDriver(true); + TableAggregationTableScanOperator aggTableScanOperator = new TableAggregationTableScanOperator( node.getPlanNodeId(), operatorContext, - columnSchemas, - columnsIndexArray, + aggColumnSchemas, + aggColumnsIndexArray, node.getDeviceEntries(), - scanAscending ? Ordering.ASC : Ordering.DESC, - scanOptionsBuilder.build(), + seriesScanOptions, measurementColumnNames, allSensors, measurementSchemas, - TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(), - measurementColumnCount, aggregators, groupingKeySchemas, groupingKeyIndex, timeRangeIterator, scanAscending, - calculateMaxAggregationResultSize(), canUseStatistic, - aggColumnIndexes); - + aggregatorInputChannels); ((DataDriverContext) context.getDriverContext()).addSourceOperator(aggTableScanOperator); + return aggTableScanOperator; + } - for (int i = 0, size = node.getDeviceEntries().size(); i < size; i++) { - AlignedFullPath alignedPath = - constructAlignedPath( - node.getDeviceEntries().get(i), - measurementColumnNames, - measurementSchemas, - allSensors); - ((DataDriverContext) context.getDriverContext()).addPath(alignedPath); + private SeriesScanOptions buildSeriesScanOptions( + LocalExecutionPlanContext context, + Map<Symbol, ColumnSchema> columnSchemaMap, + List<String> measurementColumnNames, + Map<String, Integer> measurementColumnsIndexMap, + String timeColumnName, + Optional<Expression> timePredicate, + long pushDownLimit, + long pushDownOffset, + boolean pushLimitToEachDevice, + Expression pushDownPredicate) { + SeriesScanOptions.Builder scanOptionsBuilder = + timePredicate + .map(expression -> getSeriesScanOptionsBuilder(context, expression)) + .orElseGet(SeriesScanOptions.Builder::new); + scanOptionsBuilder.withPushDownLimit(pushDownLimit); + scanOptionsBuilder.withPushDownOffset(pushDownOffset); + scanOptionsBuilder.withPushLimitToEachDevice(pushLimitToEachDevice); + scanOptionsBuilder.withAllSensors(new HashSet<>(measurementColumnNames)); + if (pushDownPredicate != null) { + scanOptionsBuilder.withPushDownFilter( + convertPredicateToFilter( + pushDownPredicate, measurementColumnsIndexMap, columnSchemaMap, timeColumnName)); } - - context.getDriverContext().setInputDriver(true); - - return aggTableScanOperator; + return scanOptionsBuilder.build(); } @Override @@ -2035,30 +2048,4 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution } return new boolean[] {canUseStatistic, isAscending}; } - - public static long calculateMaxAggregationResultSize( - // List<? extends AggregationDescriptor> aggregationDescriptors, - // ITimeRangeIterator timeRangeIterator - ) { - // TODO perfect max aggregation result size logic - return TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); - - // long timeValueColumnsSizePerLine = TimeColumn.SIZE_IN_BYTES_PER_POSITION; - // for (AggregationDescriptor descriptor : aggregationDescriptors) { - // List<TSDataType> outPutDataTypes = - // descriptor.getOutputColumnNames().stream() - // .map(typeProvider::getTableModelType) - // .collect(Collectors.toList()); - // for (TSDataType tsDataType : outPutDataTypes) { - // timeValueColumnsSizePerLine += getOutputColumnSizePerLine(tsDataType); - // } - // } - // - // return Math.min( - // TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(), - // Math.min( - // TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(), - // timeRangeIterator.getTotalIntervalNum()) - // * timeValueColumnsSizePerLine); - } }
