This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch TreeToTableView in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6829a689d084c0eefa818e7c5c8a0f0183d65429 Merge: af92ee2585d a877b82b819 Author: JackieTien97 <[email protected]> AuthorDate: Fri Dec 20 09:22:28 2024 +0800 resolve conflicts .../it/env/cluster/config/MppCommonConfig.java | 6 + .../env/cluster/config/MppSharedCommonConfig.java | 7 + .../org/apache/iotdb/itbase/env/CommonConfig.java | 4 + .../iotdb/db/it/IoTDBSetConfigurationIT.java | 52 +++ .../IoTDBAlignByDeviceWithTemplateIT.java | 28 +- .../db/it/query/IoTDBEncryptionValueQueryIT.java | 2 +- .../iotdb/pipe/it/manual/IoTDBPipeInclusionIT.java | 12 +- .../pipe/it/manual/IoTDBPipeMetaHistoricalIT.java | 12 +- .../it/tablemodel/IoTDBTablePatternFormatIT.java | 2 +- .../it/query/old/IoTDBSimpleQueryTableIT.java | 20 +- .../scalar/IoTDBFormatFunctionTableIT.java | 190 +++++++++++ .../relational/it/schema/IoTDBDatabaseIT.java | 100 ++++++ .../it/session}/IoTDBSessionRelationalIT.java | 372 ++++++++++++++++++++- .../it/local/IoTDBSubscriptionBasicIT.java | 69 ++++ .../param/IoTDBTestParamPullConsumerIT.java | 8 +- .../param/IoTDBTestParamPushConsumerIT.java | 8 +- .../regression/param/IoTDBTestParamTopicIT.java | 11 +- .../iotdb/rpc/subscription/config/TopicConfig.java | 9 +- .../rpc/subscription/config/TopicConstant.java | 8 +- .../session/subscription/SubscriptionSession.java | 9 + .../consumer/SubscriptionConsumer.java | 25 +- .../consumer/SubscriptionPullConsumer.java | 4 +- .../session/subscription/util/IdentifierUtils.java | 10 +- .../read/ainode/GetAINodeConfigurationPlan.java | 15 - .../request/read/model/GetModelInfoPlan.java | 16 - .../request/read/model/ShowModelPlan.java | 20 -- .../manager/partition/PartitionMetrics.java | 3 +- .../PipeConfigPhysicalPlanPatternParseVisitor.java | 12 +- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 14 +- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 43 ++- .../exception/sql/StatementAnalyzeException.java | 4 + .../common/tsfile/PipeTsFileInsertionEvent.java | 17 +- .../dataregion/IoTDBDataRegionExtractor.java | 6 +- .../protocol/thrift/IoTDBDataNodeReceiver.java | 3 + .../resource/memory/InsertNodeMemoryEstimator.java | 3 + .../db/pipe/resource/memory/PipeMemoryBlock.java | 14 +- .../impl/DataNodeInternalRPCServiceImpl.java | 7 +- .../execution/executor/RegionWriteExecutor.java | 7 +- .../operator/source/ShowQueriesOperator.java | 23 +- .../AbstractAggregationTableScanOperator.java | 171 +++++----- .../InformationSchemaContentSupplierFactory.java | 33 +- .../TableAggregationTableScanOperator.java | 23 +- ...eeAlignedDeviceViewAggregationScanOperator.java | 21 +- .../relational/ColumnTransformerBuilder.java | 9 + .../db/queryengine/plan/analyze/AnalyzeUtils.java | 10 +- .../queryengine/plan/analyze/AnalyzeVisitor.java | 11 +- .../plan/analyze/TemplatedAggregationAnalyze.java | 6 +- .../queryengine/plan/analyze/TemplatedAnalyze.java | 7 +- .../analyze/cache/partition/PartitionCache.java | 2 +- .../plan/analyze/schema/SchemaValidator.java | 6 +- .../plan/execution/IQueryExecution.java | 3 +- .../queryengine/plan/execution/QueryExecution.java | 9 +- .../plan/execution/config/ConfigExecution.java | 5 +- .../execution/config/TableConfigTaskVisitor.java | 37 +- .../config/executor/ClusterConfigTaskExecutor.java | 23 +- .../config/executor/IConfigTaskExecutor.java | 4 +- .../config/metadata/relational/ShowDBTask.java | 22 +- .../db/queryengine/plan/planner/IPlanner.java | 4 +- .../plan/planner/TableOperatorGenerator.java | 259 +++++++------- .../queryengine/plan/planner/TreeModelPlanner.java | 5 +- .../plan/node/write/RelationalDeleteDataNode.java | 15 +- .../plan/relational/analyzer/Analysis.java | 4 +- .../relational/analyzer/StatementAnalyzer.java | 3 + .../relational/metadata/TableMetadataImpl.java | 53 +-- .../plan/relational/metadata/TableSchema.java | 27 +- .../fetcher/TableHeaderSchemaValidator.java | 3 + .../plan/relational/planner/RelationPlanner.java | 4 +- .../plan/relational/planner/TableModelPlanner.java | 5 +- .../DataNodeLocationSupplierFactory.java | 12 +- .../plan/relational/security/AccessControl.java | 2 +- .../relational/sql/ast/AbstractTraverseDevice.java | 1 + .../relational/sql/ast/WrappedInsertStatement.java | 20 +- .../plan/relational/sql/parser/AstBuilder.java | 4 +- .../plan/relational/sql/rewrite/ShowRewrite.java | 4 +- .../plan/statement/crud/InsertBaseStatement.java | 2 +- .../plan/statement/crud/InsertRowStatement.java | 4 +- .../plan/statement/crud/InsertTabletStatement.java | 5 +- .../statement/metadata/ShowDatabaseStatement.java | 6 +- .../unary/scalar/FormatColumnTransformer.java | 119 +++++++ .../schemaengine/schemaregion/utils/MetaUtils.java | 9 +- .../db/schemaengine/table/DataNodeTableCache.java | 10 +- .../schemaengine/table/InformationSchemaUtils.java | 185 ++++++++++ .../db/storageengine/dataregion/DataRegion.java | 5 - .../compaction/repair/RepairDataFileScanUtil.java | 2 +- .../compaction/schedule/CompactionTaskManager.java | 11 +- .../dataregion/memtable/AbstractMemTable.java | 4 +- .../memtable/AlignedWritableMemChunk.java | 6 +- .../memtable/AlignedWritableMemChunkGroup.java | 5 + .../memtable/IWritableMemChunkGroup.java | 2 + .../dataregion/memtable/WritableMemChunkGroup.java | 5 + .../response/SubscriptionEventTsFileResponse.java | 2 +- .../org/apache/iotdb/db/utils/CommonUtils.java | 86 ----- .../org/apache/iotdb/db/utils/DateTimeUtils.java | 5 + .../db/utils/datastructure/AlignedTVList.java | 26 +- .../apache/iotdb/db/metadata/MetaUtilsTest.java | 9 +- .../operator/MergeTreeSortOperatorTest.java | 4 +- .../plan/relational/analyzer/ShowQueriesTest.java | 20 +- .../plan/relational/analyzer/TestMatadata.java | 10 +- .../planner/assertions/PlanMatchPattern.java | 12 + .../plan/statement/InsertStatementTest.java | 2 +- .../conf/iotdb-system.properties.template | 7 +- .../config/constant/PipeExtractorConstant.java | 4 + .../pipe/datastructure/pattern/TablePattern.java | 16 +- .../schema/column/ColumnHeaderConstant.java | 10 +- .../commons/schema/table/InformationSchema.java | 123 +++++++ .../schema/table/InformationSchemaTable.java | 96 ------ .../schema/table/column/IdColumnSchema.java | 9 +- .../table/column/MeasurementColumnSchema.java | 48 +-- .../relational/TableBuiltinScalarFunction.java | 1 + .../iotdb/library/dprofile/UDAFIntegral.java | 41 +-- .../iotdb/library/drepair/UDTFTimestampRepair.java | 47 ++- .../library/drepair/util/TimestampInterval.java | 22 +- .../library/drepair/util/TimestampRepair.java | 2 +- 113 files changed, 2119 insertions(+), 865 deletions(-) diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggregationTableScanOperator.java index ae12f2f8751,8ef9ddbbf04..87db5a3589c --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggregationTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggregationTableScanOperator.java @@@ -67,13 -66,13 +66,13 @@@ import java.util.stream.Collectors import static java.lang.String.format; import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.satisfiedTimeRange; -import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.CURRENT_DEVICE_INDEX_STRING; -import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE; -import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath; +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.CURRENT_DEVICE_INDEX_STRING; ++import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.TIME_COLUMN_TEMPLATE; +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.constructAlignedPath; import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter.DEVICE_NUMBER; import static org.apache.tsfile.read.common.block.TsBlockUtil.skipPointsOutOfTimeRange; - public abstract class AbstractAggregationTableScanOperator - extends AbstractSeriesAggregationScanOperator { -public class TableAggregationTableScanOperator extends AbstractDataSourceOperator { ++public abstract class AbstractAggregationTableScanOperator extends AbstractDataSourceOperator { private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(TableAggregationTableScanOperator.class); @@@ -117,13 -113,12 +113,12 @@@ private boolean allAggregatorsHasFinalResult = false; - public TableAggregationTableScanOperator( + public AbstractAggregationTableScanOperator( PlanNodeId sourceId, OperatorContext context, - List<ColumnSchema> columnSchemas, - int[] columnsIndexArray, + List<ColumnSchema> aggColumnSchemas, + int[] aggColumnsIndexArray, List<DeviceEntry> deviceEntries, - Ordering scanOrder, SeriesScanOptions seriesScanOptions, List<String> measurementColumnNames, Set<String> allSensors, @@@ -452,9 -444,12 +444,9 @@@ case TIME: return inputRegion.getTimeColumn(); case ID: - // TODO avoid create deviceStatics multi times; count, sum can use time statistics String id = - (String) - deviceEntries - .get(currentDeviceIndex) - .getNthSegment(aggColumnsIndexArray[columnIdx] + 1); + getNthIdColumnValue( - deviceEntries.get(currentDeviceIndex), columnsIndexArray[columnIdx]); ++ deviceEntries.get(currentDeviceIndex), aggColumnsIndexArray[columnIdx]); return getIdOrAttrColumn( inputRegion.getTimeColumn().getPositionCount(), id == null ? null : new Binary(id, TSFileConfig.STRING_CHARSET)); @@@ -517,9 -515,12 +512,9 @@@ case TIME: return timeStatistics; case ID: - // TODO avoid create deviceStatics multi times; count, sum can use time statistics String id = - (String) - deviceEntries - .get(currentDeviceIndex) - .getNthSegment(aggColumnsIndexArray[columnIdx] + 1); + getNthIdColumnValue( - deviceEntries.get(currentDeviceIndex), columnsIndexArray[columnIdx]); ++ deviceEntries.get(currentDeviceIndex), aggColumnsIndexArray[columnIdx]); return getStatistics( timeStatistics, id == null ? null : new Binary(id, TSFileConfig.STRING_CHARSET)); case ATTRIBUTE: diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java index e62c4a507eb,8ef9ddbbf04..d0d5f03ce39 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java @@@ -7,7 -7,7 +7,7 @@@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * -- * http://www.apache.org/licenses/LICENSE-2.0 ++ * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an @@@ -26,14 -33,85 +26,13 @@@ import org.apache.iotdb.db.queryengine. import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; --import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; -import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; -import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; -import org.apache.tsfile.block.column.Column; -import org.apache.tsfile.block.column.ColumnBuilder; -import org.apache.tsfile.common.conf.TSFileConfig; -import org.apache.tsfile.common.conf.TSFileDescriptor; -import org.apache.tsfile.enums.TSDataType; -import org.apache.tsfile.file.metadata.StringArrayDeviceID; -import org.apache.tsfile.file.metadata.statistics.Statistics; -import org.apache.tsfile.file.metadata.statistics.StringStatistics; -import org.apache.tsfile.read.common.TimeRange; -import org.apache.tsfile.read.common.block.TsBlock; -import org.apache.tsfile.read.common.block.TsBlockBuilder; -import org.apache.tsfile.read.common.block.column.BinaryColumn; -import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; -import org.apache.tsfile.utils.Binary; -import org.apache.tsfile.utils.Pair; -import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.write.schema.IMeasurementSchema; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import java.util.Optional; import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import static java.lang.String.format; -import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.satisfiedTimeRange; -import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.CURRENT_DEVICE_INDEX_STRING; -import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE; -import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath; -import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter.DEVICE_NUMBER; -import static org.apache.tsfile.read.common.block.TsBlockUtil.skipPointsOutOfTimeRange; - -public class TableAggregationTableScanOperator extends AbstractDataSourceOperator { - - private static final long INSTANCE_SIZE = - RamUsageEstimator.shallowSizeOfInstance(TableAggregationTableScanOperator.class); - - private boolean finished = false; - private TsBlock inputTsBlock; - - private final List<TableAggregator> tableAggregators; - private final List<ColumnSchema> groupingKeySchemas; - private final int[] groupingKeyIndex; - - private final List<DeviceEntry> deviceEntries; - private final int deviceCount; - private int currentDeviceIndex; - private final List<String> measurementColumnNames; - private final Set<String> allSensors; - private final List<IMeasurementSchema> measurementSchemas; - private final List<TSDataType> measurementColumnTSDataTypes; - private final int measurementCount; - - private final List<ColumnSchema> aggColumnSchemas; - private final int[] aggColumnsIndexArray; - - private final SeriesScanOptions seriesScanOptions; - private final boolean ascending; - private final Ordering scanOrder; - // Some special data types(like BLOB) cannot use statistics - protected final boolean canUseStatistics; - private final long cachedRawDataSize; - - // stores all inputChannels of tableAggregators, - // e.g. for aggregation `last(s1), count(s2), count(s1)`, the inputChannels should be [0, 1, 0] - private final List<Integer> aggregatorInputChannels; - - private QueryDataSource queryDataSource; - - private final ITableTimeRangeIterator timeIterator; - - private boolean allAggregatorsHasFinalResult = false; +public class TableAggregationTableScanOperator extends AbstractAggregationTableScanOperator { public TableAggregationTableScanOperator( PlanNodeId sourceId, @@@ -53,30 -128,732 +49,25 @@@ int[] groupingKeyIndex, ITableTimeRangeIterator tableTimeRangeIterator, boolean ascending, - long maxReturnSize, boolean canUseStatistics, - List<Integer> aggArguments) { + List<Integer> aggregatorInputChannels) { - - this.sourceId = sourceId; - this.operatorContext = context; - this.canUseStatistics = canUseStatistics; - this.tableAggregators = tableAggregators; - this.groupingKeySchemas = groupingKeySchemas; - this.groupingKeyIndex = groupingKeyIndex; - this.aggColumnSchemas = aggColumnSchemas; - this.aggColumnsIndexArray = aggColumnsIndexArray; - this.deviceEntries = deviceEntries; - this.deviceCount = deviceEntries.size(); - this.operatorContext.recordSpecifiedInfo(DEVICE_NUMBER, Integer.toString(this.deviceCount)); - this.ascending = ascending; - this.scanOrder = ascending ? Ordering.ASC : Ordering.DESC; - this.seriesScanOptions = seriesScanOptions; - this.measurementColumnNames = measurementColumnNames; - this.measurementCount = measurementColumnNames.size(); - this.cachedRawDataSize = - (1L + this.measurementCount) - * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(); - this.allSensors = allSensors; - this.measurementSchemas = measurementSchemas; - this.measurementColumnTSDataTypes = - measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList()); - this.currentDeviceIndex = 0; - this.operatorContext.recordSpecifiedInfo(CURRENT_DEVICE_INDEX_STRING, Integer.toString(0)); - this.aggregatorInputChannels = aggregatorInputChannels; - this.timeIterator = tableTimeRangeIterator; - - constructAlignedSeriesScanUtil(); - } - - @Override - public boolean isFinished() throws Exception { - if (!finished) { - finished = !hasNextWithTimer(); - } - return finished; - } - - @Override - public long calculateMaxPeekMemory() { - return cachedRawDataSize + maxReturnSize; - } - - @Override - public long calculateMaxReturnSize() { - return maxReturnSize; - } - - @Override - public long calculateRetainedSizeAfterCallingNext() { - return timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.DATE_BIN_TIME_ITERATOR - ? cachedRawDataSize - : 0; - } - - @Override - public boolean hasNext() throws Exception { - return timeIterator.hasCachedTimeRange() - || timeIterator.hasNextTimeRange() - || !resultTsBlockBuilder.isEmpty(); - } - - @Override - public TsBlock next() throws Exception { - - // optimize for sql: select count(*) from (select count(s1), sum(s1) from table) - if (tableAggregators.isEmpty() - && timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR - && resultTsBlockBuilder.getValueColumnBuilders().length == 0) { - resultTsBlockBuilder.reset(); - currentDeviceIndex = deviceCount; - timeIterator.setFinished(); - Column[] valueColumns = new Column[0]; - return new TsBlock(1, new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, 1), valueColumns); - } - - // start stopwatch, reset leftRuntimeOfOneNextCall - long start = System.nanoTime(); - leftRuntimeOfOneNextCall = 1000 * operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); - long maxRuntime = leftRuntimeOfOneNextCall; - - while (System.nanoTime() - start < maxRuntime - && (timeIterator.hasCachedTimeRange() || timeIterator.hasNextTimeRange()) - && !resultTsBlockBuilder.isFull()) { - - // calculate aggregation result on current time window - // return true if current time window is calc finished - if (calculateAggregationResultForCurrentTimeRange()) { - timeIterator.resetCurTimeRange(); - } - } - - if (resultTsBlockBuilder.getPositionCount() > 0) { - return buildResultTsBlock(); - } else { - return null; - } - } - - private TsBlock buildResultTsBlock() { - int declaredPositions = resultTsBlockBuilder.getPositionCount(); - ColumnBuilder[] valueColumnBuilders = resultTsBlockBuilder.getValueColumnBuilders(); - Column[] valueColumns = new Column[valueColumnBuilders.length]; - for (int i = 0; i < valueColumns.length; i++) { - valueColumns[i] = valueColumnBuilders[i].build(); - if (valueColumns[i].getPositionCount() != declaredPositions) { - throw new IllegalStateException( - format( - "Declared positions (%s) does not match column %s's number of entries (%s)", - declaredPositions, i, valueColumns[i].getPositionCount())); - } - } - - TsBlock resultTsBlock = - new TsBlock( - resultTsBlockBuilder.getPositionCount(), - new RunLengthEncodedColumn( - TIME_COLUMN_TEMPLATE, resultTsBlockBuilder.getPositionCount()), - valueColumns); - resultTsBlockBuilder.reset(); - return resultTsBlock; - } - - protected void constructAlignedSeriesScanUtil() { - DeviceEntry deviceEntry; - - if (this.deviceEntries.isEmpty() || this.deviceEntries.get(this.currentDeviceIndex) == null) { - // for device which is not exist - deviceEntry = new DeviceEntry(new StringArrayDeviceID(""), Collections.emptyList()); - } else { - deviceEntry = this.deviceEntries.get(this.currentDeviceIndex); - } - - AlignedFullPath alignedPath = - constructAlignedPath(deviceEntry, measurementColumnNames, measurementSchemas, allSensors); - - this.seriesScanUtil = - new AlignedSeriesScanUtil( - alignedPath, - scanOrder, - seriesScanOptions, - operatorContext.getInstanceContext(), - true, - measurementColumnTSDataTypes); - } - - /** Return true if we have the result of this timeRange. */ - protected boolean calculateAggregationResultForCurrentTimeRange() { - try { - if (calcFromCachedData()) { - updateResultTsBlock(); - checkIfAllAggregatorHasFinalResult(); - return true; - } - - if (readAndCalcFromPage()) { - updateResultTsBlock(); - checkIfAllAggregatorHasFinalResult(); - return true; - } - - // only when all the page data has been consumed, we need to read the chunk data - if (!seriesScanUtil.hasNextPage() && readAndCalcFromChunk()) { - updateResultTsBlock(); - checkIfAllAggregatorHasFinalResult(); - return true; - } - - // only when all the page and chunk data has been consumed, we need to read the file data - if (!seriesScanUtil.hasNextPage() - && !seriesScanUtil.hasNextChunk() - && readAndCalcFromFile()) { - updateResultTsBlock(); - checkIfAllAggregatorHasFinalResult(); - return true; - } - - // If the TimeRange is (Long.MIN_VALUE, Long.MAX_VALUE), for Aggregators like countAggregator, - // we have to consume all the data before we finish the aggregation calculation. - if (seriesScanUtil.hasNextPage() - || seriesScanUtil.hasNextChunk() - || seriesScanUtil.hasNextFile()) { - return false; - } else { - // all data of current device has been consumed - updateResultTsBlock(); - timeIterator.resetCurTimeRange(); - nextDevice(); - } - - if (currentDeviceIndex < deviceCount) { - // construct AlignedSeriesScanUtil for next device - constructAlignedSeriesScanUtil(); - queryDataSource.reset(); - this.seriesScanUtil.initQueryDataSource(queryDataSource); - } - - if (currentDeviceIndex >= deviceCount) { - // all devices have been consumed - timeIterator.setFinished(); - return true; - } else { - return false; - } - } catch (IOException e) { - throw new RuntimeException("Error while scanning the file", e); - } - } - - protected void updateResultTsBlock() { - appendAggregationResult(resultTsBlockBuilder, tableAggregators); - // after appendAggregationResult invoked, aggregators must be cleared - resetTableAggregators(); - } - - protected boolean calcFromCachedData() { - return calcUsingRawData(inputTsBlock); - } - - protected boolean calcUsingRawData(TsBlock tsBlock) { - Pair<Boolean, TsBlock> calcResult = calculateAggregationFromRawData(tsBlock, ascending); - inputTsBlock = calcResult.getRight(); - return calcResult.getLeft(); - } - - /** - * Calculate aggregation value on the time range from the tsBlock containing raw data. - * - * @return left - whether the aggregation calculation of the current time range has done; right - - * remaining tsBlock - */ - public Pair<Boolean, TsBlock> calculateAggregationFromRawData( - TsBlock inputTsBlock, boolean ascending) { - if (inputTsBlock == null || inputTsBlock.isEmpty()) { - return new Pair<>(false, inputTsBlock); - } - - updateCurTimeRange(inputTsBlock.getStartTime()); - - TimeRange curTimeRange = timeIterator.getCurTimeRange(); - // check if the tsBlock does not contain points in current interval - if (satisfiedTimeRange(inputTsBlock, curTimeRange, ascending)) { - // skip points that cannot be calculated - if ((ascending && inputTsBlock.getStartTime() < curTimeRange.getMin()) - || (!ascending && inputTsBlock.getStartTime() > curTimeRange.getMax())) { - inputTsBlock = skipPointsOutOfTimeRange(inputTsBlock, curTimeRange, ascending); - } - - inputTsBlock = process(inputTsBlock, curTimeRange); - } - - // judge whether the calculation finished - boolean isTsBlockOutOfBound = - inputTsBlock != null - && (ascending - ? inputTsBlock.getEndTime() > curTimeRange.getMax() - : inputTsBlock.getEndTime() < curTimeRange.getMin()); - return new Pair<>( - isAllAggregatorsHasFinalResult(tableAggregators) || isTsBlockOutOfBound, inputTsBlock); - } - - private TsBlock process(TsBlock inputTsBlock, TimeRange curTimeRange) { - // Get the row which need to be processed by aggregator - IWindow curWindow = new TimeWindow(curTimeRange); - Column timeColumn = inputTsBlock.getTimeColumn(); - int lastIndexToProcess = 0; - for (int i = 0; i < inputTsBlock.getPositionCount(); i++) { - if (!curWindow.satisfy(timeColumn, i)) { - break; - } - lastIndexToProcess = i; - } - - TsBlock inputRegion = inputTsBlock.getRegion(0, lastIndexToProcess + 1); - Column[] valueColumns = new Column[aggregatorInputChannels.size()]; - for (int idx : aggregatorInputChannels) { - if (valueColumns[idx] != null) { - continue; - } - valueColumns[idx] = - buildValueColumn(aggColumnSchemas.get(idx).getColumnCategory(), inputRegion, idx); - } - - TsBlock tsBlock = - new TsBlock( - inputRegion.getPositionCount(), - new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, inputRegion.getPositionCount()), - valueColumns); - - for (TableAggregator aggregator : tableAggregators) { - // current agg method has been calculated - if (aggregator.hasFinalResult()) { - continue; - } - - aggregator.processBlock(tsBlock); - } - - int lastReadRowIndex = lastIndexToProcess + 1; - if (lastReadRowIndex >= inputTsBlock.getPositionCount()) { - return null; - } else { - return inputTsBlock.subTsBlock(lastReadRowIndex); - } - } - - private Column buildValueColumn( - TsTableColumnCategory columnSchemaCategory, TsBlock inputRegion, int columnIdx) { - switch (columnSchemaCategory) { - case TIME: - return inputRegion.getTimeColumn(); - case ID: - // TODO avoid create deviceStatics multi times; count, sum can use time statistics - String id = - (String) - deviceEntries - .get(currentDeviceIndex) - .getNthSegment(aggColumnsIndexArray[columnIdx] + 1); - return getIdOrAttrColumn( - inputRegion.getTimeColumn().getPositionCount(), - id == null ? null : new Binary(id, TSFileConfig.STRING_CHARSET)); - case ATTRIBUTE: - Binary attr = - deviceEntries - .get(currentDeviceIndex) - .getAttributeColumnValues() - .get(aggColumnsIndexArray[columnIdx]); - return getIdOrAttrColumn(inputRegion.getTimeColumn().getPositionCount(), attr); - case MEASUREMENT: - return inputRegion.getColumn(aggColumnsIndexArray[columnIdx]); - default: - throw new IllegalStateException("Unsupported column type: " + columnSchemaCategory); - } - } - - private Column getIdOrAttrColumn(int positionCount, Binary columnName) { - if (columnName == null) { - return new RunLengthEncodedColumn( - new BinaryColumn(1, Optional.of(new boolean[] {true}), new Binary[] {null}), - positionCount); - } else { - return new RunLengthEncodedColumn( - new BinaryColumn(1, Optional.of(new boolean[] {false}), new Binary[] {columnName}), - positionCount); - } - } - - protected void calcFromStatistics(Statistics timeStatistics, Statistics[] valueStatistics) { - int idx = -1; - - for (TableAggregator aggregator : tableAggregators) { - if (aggregator.hasFinalResult()) { - idx += aggregator.getChannelCount(); - continue; - } - - Statistics[] statisticsArray = new Statistics[aggregator.getChannelCount()]; - for (int i = 0; i < aggregator.getChannelCount(); i++) { - idx++; - - TsTableColumnCategory columnSchemaCategory = - aggColumnSchemas.get(aggregatorInputChannels.get(idx)).getColumnCategory(); - statisticsArray[i] = - buildStatistics( - columnSchemaCategory, - timeStatistics, - valueStatistics, - aggregatorInputChannels.get(idx)); - } - - aggregator.processStatistics(statisticsArray); - } - } - - private Statistics buildStatistics( - TsTableColumnCategory columnSchemaCategory, - Statistics timeStatistics, - Statistics[] valueStatistics, - int columnIdx) { - switch (columnSchemaCategory) { - case TIME: - return timeStatistics; - case ID: - // TODO avoid create deviceStatics multi times; count, sum can use time statistics - String id = - (String) - deviceEntries - .get(currentDeviceIndex) - .getNthSegment(aggColumnsIndexArray[columnIdx] + 1); - return getStatistics( - timeStatistics, id == null ? null : new Binary(id, TSFileConfig.STRING_CHARSET)); - case ATTRIBUTE: - Binary attr = - deviceEntries - .get(currentDeviceIndex) - .getAttributeColumnValues() - .get(aggColumnsIndexArray[columnIdx]); - return getStatistics(timeStatistics, attr); - case MEASUREMENT: - return valueStatistics[aggColumnsIndexArray[columnIdx]]; - default: - throw new IllegalStateException("Unsupported column type: " + columnSchemaCategory); - } - } - - private Statistics getStatistics(Statistics timeStatistics, Binary columnName) { - if (columnName == null) { - return null; - } else { - StringStatistics stringStatics = new StringStatistics(); - stringStatics.setCount((int) timeStatistics.getCount()); - stringStatics.setStartTime(timeStatistics.getStartTime()); - stringStatics.setEndTime(timeStatistics.getEndTime()); - stringStatics.initializeStats(columnName, columnName, columnName, columnName); - return stringStatics; - } - } - - @SuppressWarnings({"squid:S3776", "squid:S135", "squid:S3740"}) - public boolean readAndCalcFromFile() throws IOException { - // start stopwatch - long start = System.nanoTime(); - while (System.nanoTime() - start < leftRuntimeOfOneNextCall && seriesScanUtil.hasNextFile()) { - if (canUseStatistics && seriesScanUtil.canUseCurrentFileStatistics()) { - Statistics fileTimeStatistics = seriesScanUtil.currentFileTimeStatistics(); - - updateCurTimeRange(fileTimeStatistics.getStartTime()); - - if (fileTimeStatistics.getStartTime() > timeIterator.getCurTimeRange().getMax()) { - if (ascending) { - return true; - } else { - seriesScanUtil.skipCurrentFile(); - continue; - } - } - - // calc from fileMetaData - if (timeIterator - .getCurTimeRange() - .contains(fileTimeStatistics.getStartTime(), fileTimeStatistics.getEndTime())) { - Statistics[] statisticsList = new Statistics[measurementCount]; - for (int i = 0; i < measurementCount; i++) { - statisticsList[i] = seriesScanUtil.currentFileStatistics(i); - } - calcFromStatistics(fileTimeStatistics, statisticsList); - seriesScanUtil.skipCurrentFile(); - if (isAllAggregatorsHasFinalResult(tableAggregators)) { - return true; - } else { - continue; - } - } - } - - // read chunk - if (readAndCalcFromChunk()) { - return true; - } - } - - return false; - } - - @SuppressWarnings({"squid:S3776", "squid:S135", "squid:S3740"}) - protected boolean readAndCalcFromChunk() throws IOException { - // start stopwatch - long start = System.nanoTime(); - while (System.nanoTime() - start < leftRuntimeOfOneNextCall && seriesScanUtil.hasNextChunk()) { - if (canUseStatistics && seriesScanUtil.canUseCurrentChunkStatistics()) { - Statistics chunkTimeStatistics = seriesScanUtil.currentChunkTimeStatistics(); - - updateCurTimeRange(chunkTimeStatistics.getStartTime()); - - if (chunkTimeStatistics.getStartTime() > timeIterator.getCurTimeRange().getMax()) { - if (ascending) { - return true; - } else { - seriesScanUtil.skipCurrentChunk(); - continue; - } - } - - // calc from chunkMetaData - if (timeIterator - .getCurTimeRange() - .contains(chunkTimeStatistics.getStartTime(), chunkTimeStatistics.getEndTime())) { - // calc from chunkMetaData - Statistics[] statisticsList = new Statistics[measurementCount]; - for (int i = 0; i < measurementCount; i++) { - statisticsList[i] = seriesScanUtil.currentChunkStatistics(i); - } - calcFromStatistics(chunkTimeStatistics, statisticsList); - seriesScanUtil.skipCurrentChunk(); - if (isAllAggregatorsHasFinalResult(tableAggregators)) { - return true; - } else { - continue; - } - } - } - - // read page - if (readAndCalcFromPage()) { - return true; - } - } - return false; - } - - long leftRuntimeOfOneNextCall = Long.MAX_VALUE; - - @SuppressWarnings({"squid:S3776", "squid:S135", "squid:S3740"}) - protected boolean readAndCalcFromPage() throws IOException { - long start = System.nanoTime(); - try { - while (System.nanoTime() - start < leftRuntimeOfOneNextCall && seriesScanUtil.hasNextPage()) { - if (canUseStatistics && seriesScanUtil.canUseCurrentPageStatistics()) { - Statistics pageTimeStatistics = seriesScanUtil.currentPageTimeStatistics(); - - updateCurTimeRange(pageTimeStatistics.getStartTime()); - - // There is no more eligible points in current time range - // TODO(beyyes) will not appear in table model? - if (pageTimeStatistics.getStartTime() > timeIterator.getCurTimeRange().getMax()) { - if (ascending) { - return true; - } else { - seriesScanUtil.skipCurrentPage(); - continue; - } - } - - // can use pageHeader - if (timeIterator - .getCurTimeRange() - .contains(pageTimeStatistics.getStartTime(), pageTimeStatistics.getEndTime())) { - Statistics[] statisticsList = new Statistics[measurementCount]; - for (int i = 0; i < measurementCount; i++) { - statisticsList[i] = seriesScanUtil.currentPageStatistics(i); - } - calcFromStatistics(pageTimeStatistics, statisticsList); - seriesScanUtil.skipCurrentPage(); - if (isAllAggregatorsHasFinalResult(tableAggregators)) { - return true; - } else { - continue; - } - } - } - - // calc from page data - TsBlock originalTsBlock = seriesScanUtil.nextPage(); - if (originalTsBlock == null) { - continue; - } - - // calc from raw data - if (calcUsingRawData(originalTsBlock)) { - return true; - } - } - - return false; - } finally { - leftRuntimeOfOneNextCall -= (System.nanoTime() - start); - } - } - - private void updateCurTimeRange(long startTime) { - if (timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR) { - timeIterator.updateCurTimeRange(startTime); - return; - } - - if (!timeIterator.hasCachedTimeRange()) { - timeIterator.updateCurTimeRange(startTime); - } else if (timeIterator.canFinishCurrentTimeRange(startTime)) { - updateResultTsBlock(); - timeIterator.resetCurTimeRange(); - timeIterator.updateCurTimeRange(startTime); - resetTableAggregators(); - } - } - - /** Append a row of aggregation results to the result tsBlock. */ - public void appendAggregationResult( - TsBlockBuilder tsBlockBuilder, List<? extends TableAggregator> aggregators) { - - // no data in current time range, just output empty - if (!timeIterator.hasCachedTimeRange()) { - return; - } - - ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders(); - - int groupKeySize = groupingKeySchemas == null ? 0 : groupingKeySchemas.size(); - int dateBinSize = - timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.DATE_BIN_TIME_ITERATOR - ? 1 - : 0; - - if (groupingKeyIndex != null) { - for (int i = 0; i < groupKeySize; i++) { - if (TsTableColumnCategory.ID == groupingKeySchemas.get(i).getColumnCategory()) { - String id = - (String) deviceEntries.get(currentDeviceIndex).getNthSegment(groupingKeyIndex[i] + 1); - if (id == null) { - columnBuilders[i].appendNull(); - } else { - columnBuilders[i].writeBinary(new Binary(id, TSFileConfig.STRING_CHARSET)); - } - } else { - Binary attribute = - deviceEntries - .get(currentDeviceIndex) - .getAttributeColumnValues() - .get(groupingKeyIndex[i]); - if (attribute == null) { - columnBuilders[i].appendNull(); - } else { - columnBuilders[i].writeBinary(attribute); - } - } - } - } - - if (dateBinSize > 0) { - columnBuilders[groupKeySize].writeLong(timeIterator.getCurTimeRange().getMin()); - } - - for (int i = 0; i < aggregators.size(); i++) { - aggregators.get(i).evaluate(columnBuilders[groupKeySize + dateBinSize + i]); - } - - tsBlockBuilder.declarePosition(); - } - - public boolean isAllAggregatorsHasFinalResult(List<TableAggregator> aggregators) { - // In groupByDateBin, we need read real data to calc next time range - if (timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.DATE_BIN_TIME_ITERATOR) { - return false; - } - - // no aggregation function, just output ids or attributes - if (aggregators.isEmpty()) { - return false; - } - - for (TableAggregator aggregator : aggregators) { - if (!aggregator.hasFinalResult()) { - return false; - } - } - - this.allAggregatorsHasFinalResult = true; - return true; - } - - private void checkIfAllAggregatorHasFinalResult() { - if (allAggregatorsHasFinalResult - && timeIterator.getType() - == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR) { - nextDevice(); - inputTsBlock = null; - - if (currentDeviceIndex < deviceCount) { - // construct AlignedSeriesScanUtil for next device - constructAlignedSeriesScanUtil(); - queryDataSource.reset(); - this.seriesScanUtil.initQueryDataSource(queryDataSource); - } - - if (currentDeviceIndex >= deviceCount) { - // all devices have been consumed - timeIterator.setFinished(); - } - - allAggregatorsHasFinalResult = false; - } - } - - private void nextDevice() { - currentDeviceIndex++; - this.operatorContext.recordSpecifiedInfo( - CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex)); - } - - private void resetTableAggregators() { - tableAggregators.forEach(TableAggregator::reset); - } - - @Override - public List<TSDataType> getResultDataTypes() { - int groupingKeySize = groupingKeySchemas != null ? groupingKeySchemas.size() : 0; - int dateBinSize = - timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.DATE_BIN_TIME_ITERATOR - ? 1 - : 0; - List<TSDataType> resultDataTypes = - new ArrayList<>(groupingKeySize + dateBinSize + tableAggregators.size()); - - if (groupingKeySchemas != null) { - for (int i = 0; i < groupingKeySchemas.size(); i++) { - resultDataTypes.add(TSDataType.STRING); - } - } - if (dateBinSize > 0) { - resultDataTypes.add(TSDataType.TIMESTAMP); - } - for (TableAggregator aggregator : tableAggregators) { - resultDataTypes.add(aggregator.getType()); - } - - return resultDataTypes; - } - - @Override - public void initQueryDataSource(IQueryDataSource dataSource) { - this.queryDataSource = (QueryDataSource) dataSource; - this.seriesScanUtil.initQueryDataSource(queryDataSource); - this.resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes()); + super( + sourceId, + context, - columnSchemas, - columnsIndexArray, ++ aggColumnSchemas, ++ aggColumnsIndexArray, + deviceEntries, - scanOrder, + seriesScanOptions, + measurementColumnNames, + allSensors, + measurementSchemas, - maxTsBlockLineNum, - measurementCount, + tableAggregators, + groupingKeySchemas, + groupingKeyIndex, + tableTimeRangeIterator, + ascending, - maxReturnSize, + canUseStatistics, - aggArguments); ++ aggregatorInputChannels); } @Override diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeAlignedDeviceViewAggregationScanOperator.java index 605b09277d1,00000000000..3efdf7638b0 mode 100644,000000..100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeAlignedDeviceViewAggregationScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeAlignedDeviceViewAggregationScanOperator.java @@@ -1,104 -1,0 +1,95 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.source.relational; + +import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; - import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; + +import org.apache.tsfile.write.schema.IMeasurementSchema; + +import java.util.List; +import java.util.Set; + +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeAlignedDeviceViewScanOperator.getNthIdColumnValueForTree; + +public class TreeAlignedDeviceViewAggregationScanOperator + extends AbstractAggregationTableScanOperator { + + // in iotdb, db level should at least be 2 level, like root.db + // if db level is 2, idColumnStartIndex is 0, and we use should treeDBLength to extract the first + // id column value + // if db level is larger than 2, idColumnStartIndex will be db level - 2 + private final int idColumnStartIndex; + + // only take effect, if db level is 2 level, for root.db.d1, IDeviceId will be [root.db.d1], + // treeDBLength will be 7 (root.db) + private final int treeDBLength; + + public TreeAlignedDeviceViewAggregationScanOperator( + PlanNodeId sourceId, + OperatorContext context, - List<ColumnSchema> columnSchemas, - int[] columnsIndexArray, ++ List<ColumnSchema> aggColumnSchemas, ++ int[] aggColumnsIndexArray, + List<DeviceEntry> deviceEntries, - Ordering scanOrder, + SeriesScanOptions seriesScanOptions, + List<String> measurementColumnNames, + Set<String> allSensors, + List<IMeasurementSchema> measurementSchemas, - int maxTsBlockLineNum, - int measurementCount, + List<TableAggregator> tableAggregators, + List<ColumnSchema> groupingKeySchemas, + int[] groupingKeyIndex, + ITableTimeRangeIterator tableTimeRangeIterator, + boolean ascending, - long maxReturnSize, + boolean canUseStatistics, - List<Integer> aggArguments, ++ List<Integer> aggregatorInputChannels, + int idColumnStartIndex, + int treeDBLength) { + super( + sourceId, + context, - columnSchemas, - columnsIndexArray, ++ aggColumnSchemas, ++ aggColumnsIndexArray, + deviceEntries, - scanOrder, + seriesScanOptions, + measurementColumnNames, + allSensors, + measurementSchemas, - maxTsBlockLineNum, - measurementCount, + tableAggregators, + groupingKeySchemas, + groupingKeyIndex, + tableTimeRangeIterator, + ascending, - maxReturnSize, + canUseStatistics, - aggArguments); ++ aggregatorInputChannels); + this.idColumnStartIndex = idColumnStartIndex; + this.treeDBLength = treeDBLength; + } + + @Override + String getNthIdColumnValue(DeviceEntry deviceEntry, int idColumnIndex) { + return getNthIdColumnValueForTree( + deviceEntry, idColumnIndex, this.idColumnStartIndex, this.treeDBLength); + } +} diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index 1989b1db555,3873973ca85..ed8b6f28ccc --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@@ -1900,54 -1890,23 +1894,27 @@@ public class TableOperatorGenerator ext context.getNextOperatorId(), node.getPlanNodeId(), TableAggregationTableScanOperator.class.getSimpleName()); - SeriesScanOptions.Builder scanOptionsBuilder = - node.getTimePredicate().isPresent() - ? getSeriesScanOptionsBuilder(context, node.getTimePredicate().get()) - : new SeriesScanOptions.Builder(); - scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit()); - scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset()); - scanOptionsBuilder.withPushLimitToEachDevice(node.isPushLimitToEachDevice()); - scanOptionsBuilder.withAllSensors(new HashSet<>(measurementColumnNames)); - Expression pushDownPredicate = node.getPushDownPredicate(); - if (pushDownPredicate != null) { - scanOptionsBuilder.withPushDownFilter( - convertPredicateToFilter( - pushDownPredicate, measurementColumnsIndexMap, columnSchemaMap, timeColumnName)); - } - - Set<String> allSensors = new HashSet<>(measurementColumnNames); - // for time column - allSensors.add(""); - TableAggregationTableScanOperator aggTableScanOperator = - new TableAggregationTableScanOperator( - node.getPlanNodeId(), - operatorContext, - columnSchemas, - columnsIndexArray, - node.getDeviceEntries(), - scanAscending ? Ordering.ASC : Ordering.DESC, - scanOptionsBuilder.build(), + SeriesScanOptions seriesScanOptions = + buildSeriesScanOptions( + context, + node.getAssignments(), measurementColumnNames, - allSensors, - measurementSchemas, - TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(), - measurementColumnCount, - aggregators, - groupingKeySchemas, - groupingKeyIndex, - timeRangeIterator, - scanAscending, - calculateMaxAggregationResultSize(), - canUseStatistic, - aggColumnIndexes); + measurementColumnsIndexMap, + timeColumnName, + node.getTimePredicate(), + node.getPushDownLimit(), + node.getPushDownOffset(), + node.isPushLimitToEachDevice(), + node.getPushDownPredicate()); - ((DataDriverContext) context.getDriverContext()).addSourceOperator(aggTableScanOperator); + Set<String> allSensors = new HashSet<>(measurementColumnNames); + allSensors.add(""); // for time column - for (int i = 0, size = node.getDeviceEntries().size(); i < size; i++) { + for (int i = 0; i < node.getDeviceEntries().size(); i++) { + if (node.getDeviceEntries().get(i) == null) { + throw new IllegalStateException( + "Device entries of index " + i + " in AggregationTableScanNode is empty"); + } AlignedFullPath alignedPath = constructAlignedPath( node.getDeviceEntries().get(i),
