This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch TreeToTableView
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6829a689d084c0eefa818e7c5c8a0f0183d65429
Merge: af92ee2585d a877b82b819
Author: JackieTien97 <[email protected]>
AuthorDate: Fri Dec 20 09:22:28 2024 +0800

    resolve conflicts

 .../it/env/cluster/config/MppCommonConfig.java     |   6 +
 .../env/cluster/config/MppSharedCommonConfig.java  |   7 +
 .../org/apache/iotdb/itbase/env/CommonConfig.java  |   4 +
 .../iotdb/db/it/IoTDBSetConfigurationIT.java       |  52 +++
 .../IoTDBAlignByDeviceWithTemplateIT.java          |  28 +-
 .../db/it/query/IoTDBEncryptionValueQueryIT.java   |   2 +-
 .../iotdb/pipe/it/manual/IoTDBPipeInclusionIT.java |  12 +-
 .../pipe/it/manual/IoTDBPipeMetaHistoricalIT.java  |  12 +-
 .../it/tablemodel/IoTDBTablePatternFormatIT.java   |   2 +-
 .../it/query/old/IoTDBSimpleQueryTableIT.java      |  20 +-
 .../scalar/IoTDBFormatFunctionTableIT.java         | 190 +++++++++++
 .../relational/it/schema/IoTDBDatabaseIT.java      | 100 ++++++
 .../it/session}/IoTDBSessionRelationalIT.java      | 372 ++++++++++++++++++++-
 .../it/local/IoTDBSubscriptionBasicIT.java         |  69 ++++
 .../param/IoTDBTestParamPullConsumerIT.java        |   8 +-
 .../param/IoTDBTestParamPushConsumerIT.java        |   8 +-
 .../regression/param/IoTDBTestParamTopicIT.java    |  11 +-
 .../iotdb/rpc/subscription/config/TopicConfig.java |   9 +-
 .../rpc/subscription/config/TopicConstant.java     |   8 +-
 .../session/subscription/SubscriptionSession.java  |   9 +
 .../consumer/SubscriptionConsumer.java             |  25 +-
 .../consumer/SubscriptionPullConsumer.java         |   4 +-
 .../session/subscription/util/IdentifierUtils.java |  10 +-
 .../read/ainode/GetAINodeConfigurationPlan.java    |  15 -
 .../request/read/model/GetModelInfoPlan.java       |  16 -
 .../request/read/model/ShowModelPlan.java          |  20 --
 .../manager/partition/PartitionMetrics.java        |   3 +-
 .../PipeConfigPhysicalPlanPatternParseVisitor.java |  12 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  14 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  43 ++-
 .../exception/sql/StatementAnalyzeException.java   |   4 +
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  17 +-
 .../dataregion/IoTDBDataRegionExtractor.java       |   6 +-
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |   3 +
 .../resource/memory/InsertNodeMemoryEstimator.java |   3 +
 .../db/pipe/resource/memory/PipeMemoryBlock.java   |  14 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |   7 +-
 .../execution/executor/RegionWriteExecutor.java    |   7 +-
 .../operator/source/ShowQueriesOperator.java       |  23 +-
 .../AbstractAggregationTableScanOperator.java      | 171 +++++-----
 .../InformationSchemaContentSupplierFactory.java   |  33 +-
 .../TableAggregationTableScanOperator.java         |  23 +-
 ...eeAlignedDeviceViewAggregationScanOperator.java |  21 +-
 .../relational/ColumnTransformerBuilder.java       |   9 +
 .../db/queryengine/plan/analyze/AnalyzeUtils.java  |  10 +-
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |  11 +-
 .../plan/analyze/TemplatedAggregationAnalyze.java  |   6 +-
 .../queryengine/plan/analyze/TemplatedAnalyze.java |   7 +-
 .../analyze/cache/partition/PartitionCache.java    |   2 +-
 .../plan/analyze/schema/SchemaValidator.java       |   6 +-
 .../plan/execution/IQueryExecution.java            |   3 +-
 .../queryengine/plan/execution/QueryExecution.java |   9 +-
 .../plan/execution/config/ConfigExecution.java     |   5 +-
 .../execution/config/TableConfigTaskVisitor.java   |  37 +-
 .../config/executor/ClusterConfigTaskExecutor.java |  23 +-
 .../config/executor/IConfigTaskExecutor.java       |   4 +-
 .../config/metadata/relational/ShowDBTask.java     |  22 +-
 .../db/queryengine/plan/planner/IPlanner.java      |   4 +-
 .../plan/planner/TableOperatorGenerator.java       | 259 +++++++-------
 .../queryengine/plan/planner/TreeModelPlanner.java |   5 +-
 .../plan/node/write/RelationalDeleteDataNode.java  |  15 +-
 .../plan/relational/analyzer/Analysis.java         |   4 +-
 .../relational/analyzer/StatementAnalyzer.java     |   3 +
 .../relational/metadata/TableMetadataImpl.java     |  53 +--
 .../plan/relational/metadata/TableSchema.java      |  27 +-
 .../fetcher/TableHeaderSchemaValidator.java        |   3 +
 .../plan/relational/planner/RelationPlanner.java   |   4 +-
 .../plan/relational/planner/TableModelPlanner.java |   5 +-
 .../DataNodeLocationSupplierFactory.java           |  12 +-
 .../plan/relational/security/AccessControl.java    |   2 +-
 .../relational/sql/ast/AbstractTraverseDevice.java |   1 +
 .../relational/sql/ast/WrappedInsertStatement.java |  20 +-
 .../plan/relational/sql/parser/AstBuilder.java     |   4 +-
 .../plan/relational/sql/rewrite/ShowRewrite.java   |   4 +-
 .../plan/statement/crud/InsertBaseStatement.java   |   2 +-
 .../plan/statement/crud/InsertRowStatement.java    |   4 +-
 .../plan/statement/crud/InsertTabletStatement.java |   5 +-
 .../statement/metadata/ShowDatabaseStatement.java  |   6 +-
 .../unary/scalar/FormatColumnTransformer.java      | 119 +++++++
 .../schemaengine/schemaregion/utils/MetaUtils.java |   9 +-
 .../db/schemaengine/table/DataNodeTableCache.java  |  10 +-
 .../schemaengine/table/InformationSchemaUtils.java | 185 ++++++++++
 .../db/storageengine/dataregion/DataRegion.java    |   5 -
 .../compaction/repair/RepairDataFileScanUtil.java  |   2 +-
 .../compaction/schedule/CompactionTaskManager.java |  11 +-
 .../dataregion/memtable/AbstractMemTable.java      |   4 +-
 .../memtable/AlignedWritableMemChunk.java          |   6 +-
 .../memtable/AlignedWritableMemChunkGroup.java     |   5 +
 .../memtable/IWritableMemChunkGroup.java           |   2 +
 .../dataregion/memtable/WritableMemChunkGroup.java |   5 +
 .../response/SubscriptionEventTsFileResponse.java  |   2 +-
 .../org/apache/iotdb/db/utils/CommonUtils.java     |  86 -----
 .../org/apache/iotdb/db/utils/DateTimeUtils.java   |   5 +
 .../db/utils/datastructure/AlignedTVList.java      |  26 +-
 .../apache/iotdb/db/metadata/MetaUtilsTest.java    |   9 +-
 .../operator/MergeTreeSortOperatorTest.java        |   4 +-
 .../plan/relational/analyzer/ShowQueriesTest.java  |  20 +-
 .../plan/relational/analyzer/TestMatadata.java     |  10 +-
 .../planner/assertions/PlanMatchPattern.java       |  12 +
 .../plan/statement/InsertStatementTest.java        |   2 +-
 .../conf/iotdb-system.properties.template          |   7 +-
 .../config/constant/PipeExtractorConstant.java     |   4 +
 .../pipe/datastructure/pattern/TablePattern.java   |  16 +-
 .../schema/column/ColumnHeaderConstant.java        |  10 +-
 .../commons/schema/table/InformationSchema.java    | 123 +++++++
 .../schema/table/InformationSchemaTable.java       |  96 ------
 .../schema/table/column/IdColumnSchema.java        |   9 +-
 .../table/column/MeasurementColumnSchema.java      |  48 +--
 .../relational/TableBuiltinScalarFunction.java     |   1 +
 .../iotdb/library/dprofile/UDAFIntegral.java       |  41 +--
 .../iotdb/library/drepair/UDTFTimestampRepair.java |  47 ++-
 .../library/drepair/util/TimestampInterval.java    |  22 +-
 .../library/drepair/util/TimestampRepair.java      |   2 +-
 113 files changed, 2119 insertions(+), 865 deletions(-)

diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggregationTableScanOperator.java
index ae12f2f8751,8ef9ddbbf04..87db5a3589c
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggregationTableScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggregationTableScanOperator.java
@@@ -67,13 -66,13 +66,13 @@@ import java.util.stream.Collectors
  
  import static java.lang.String.format;
  import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.satisfiedTimeRange;
 -import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.CURRENT_DEVICE_INDEX_STRING;
 -import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE;
 -import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath;
 +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.CURRENT_DEVICE_INDEX_STRING;
++import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.TIME_COLUMN_TEMPLATE;
 +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.constructAlignedPath;
  import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter.DEVICE_NUMBER;
  import static org.apache.tsfile.read.common.block.TsBlockUtil.skipPointsOutOfTimeRange;
  
- public abstract class AbstractAggregationTableScanOperator
-     extends AbstractSeriesAggregationScanOperator {
 -public class TableAggregationTableScanOperator extends AbstractDataSourceOperator {
++public abstract class AbstractAggregationTableScanOperator extends AbstractDataSourceOperator {
  
    private static final long INSTANCE_SIZE =
        
RamUsageEstimator.shallowSizeOfInstance(TableAggregationTableScanOperator.class);
@@@ -117,13 -113,12 +113,12 @@@
  
    private boolean allAggregatorsHasFinalResult = false;
  
 -  public TableAggregationTableScanOperator(
 +  public AbstractAggregationTableScanOperator(
        PlanNodeId sourceId,
        OperatorContext context,
-       List<ColumnSchema> columnSchemas,
-       int[] columnsIndexArray,
+       List<ColumnSchema> aggColumnSchemas,
+       int[] aggColumnsIndexArray,
        List<DeviceEntry> deviceEntries,
-       Ordering scanOrder,
        SeriesScanOptions seriesScanOptions,
        List<String> measurementColumnNames,
        Set<String> allSensors,
@@@ -452,9 -444,12 +444,9 @@@
        case TIME:
          return inputRegion.getTimeColumn();
        case ID:
 -        // TODO avoid create deviceStatics multi times; count, sum can use 
time statistics
          String id =
 -            (String)
 -                deviceEntries
 -                    .get(currentDeviceIndex)
 -                    .getNthSegment(aggColumnsIndexArray[columnIdx] + 1);
 +            getNthIdColumnValue(
-                 deviceEntries.get(currentDeviceIndex), 
columnsIndexArray[columnIdx]);
++                deviceEntries.get(currentDeviceIndex), 
aggColumnsIndexArray[columnIdx]);
          return getIdOrAttrColumn(
              inputRegion.getTimeColumn().getPositionCount(),
              id == null ? null : new Binary(id, TSFileConfig.STRING_CHARSET));
@@@ -517,9 -515,12 +512,9 @@@
        case TIME:
          return timeStatistics;
        case ID:
 -        // TODO avoid create deviceStatics multi times; count, sum can use 
time statistics
          String id =
 -            (String)
 -                deviceEntries
 -                    .get(currentDeviceIndex)
 -                    .getNthSegment(aggColumnsIndexArray[columnIdx] + 1);
 +            getNthIdColumnValue(
-                 deviceEntries.get(currentDeviceIndex), 
columnsIndexArray[columnIdx]);
++                deviceEntries.get(currentDeviceIndex), 
aggColumnsIndexArray[columnIdx]);
          return getStatistics(
              timeStatistics, id == null ? null : new Binary(id, 
TSFileConfig.STRING_CHARSET));
        case ATTRIBUTE:
diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
index e62c4a507eb,8ef9ddbbf04..d0d5f03ce39
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
@@@ -7,7 -7,7 +7,7 @@@
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
-- *      http://www.apache.org/licenses/LICENSE-2.0
++ *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing,
   * software distributed under the License is distributed on an
@@@ -26,14 -33,85 +26,13 @@@ import org.apache.iotdb.db.queryengine.
  import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
  import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
  import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
--import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 -import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
 -import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
  
 -import org.apache.tsfile.block.column.Column;
 -import org.apache.tsfile.block.column.ColumnBuilder;
 -import org.apache.tsfile.common.conf.TSFileConfig;
 -import org.apache.tsfile.common.conf.TSFileDescriptor;
 -import org.apache.tsfile.enums.TSDataType;
 -import org.apache.tsfile.file.metadata.StringArrayDeviceID;
 -import org.apache.tsfile.file.metadata.statistics.Statistics;
 -import org.apache.tsfile.file.metadata.statistics.StringStatistics;
 -import org.apache.tsfile.read.common.TimeRange;
 -import org.apache.tsfile.read.common.block.TsBlock;
 -import org.apache.tsfile.read.common.block.TsBlockBuilder;
 -import org.apache.tsfile.read.common.block.column.BinaryColumn;
 -import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
 -import org.apache.tsfile.utils.Binary;
 -import org.apache.tsfile.utils.Pair;
 -import org.apache.tsfile.utils.RamUsageEstimator;
  import org.apache.tsfile.write.schema.IMeasurementSchema;
  
 -import java.io.IOException;
 -import java.util.ArrayList;
 -import java.util.Collections;
  import java.util.List;
 -import java.util.Optional;
  import java.util.Set;
 -import java.util.concurrent.TimeUnit;
 -import java.util.stream.Collectors;
  
 -import static java.lang.String.format;
 -import static 
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.satisfiedTimeRange;
 -import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.CURRENT_DEVICE_INDEX_STRING;
 -import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE;
 -import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath;
 -import static 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter.DEVICE_NUMBER;
 -import static 
org.apache.tsfile.read.common.block.TsBlockUtil.skipPointsOutOfTimeRange;
 -
 -public class TableAggregationTableScanOperator extends 
AbstractDataSourceOperator {
 -
 -  private static final long INSTANCE_SIZE =
 -      
RamUsageEstimator.shallowSizeOfInstance(TableAggregationTableScanOperator.class);
 -
 -  private boolean finished = false;
 -  private TsBlock inputTsBlock;
 -
 -  private final List<TableAggregator> tableAggregators;
 -  private final List<ColumnSchema> groupingKeySchemas;
 -  private final int[] groupingKeyIndex;
 -
 -  private final List<DeviceEntry> deviceEntries;
 -  private final int deviceCount;
 -  private int currentDeviceIndex;
 -  private final List<String> measurementColumnNames;
 -  private final Set<String> allSensors;
 -  private final List<IMeasurementSchema> measurementSchemas;
 -  private final List<TSDataType> measurementColumnTSDataTypes;
 -  private final int measurementCount;
 -
 -  private final List<ColumnSchema> aggColumnSchemas;
 -  private final int[] aggColumnsIndexArray;
 -
 -  private final SeriesScanOptions seriesScanOptions;
 -  private final boolean ascending;
 -  private final Ordering scanOrder;
 -  // Some special data types(like BLOB) cannot use statistics
 -  protected final boolean canUseStatistics;
 -  private final long cachedRawDataSize;
 -
 -  // stores all inputChannels of tableAggregators,
 -  // e.g. for aggregation `last(s1), count(s2), count(s1)`, the inputChannels 
should be [0, 1, 0]
 -  private final List<Integer> aggregatorInputChannels;
 -
 -  private QueryDataSource queryDataSource;
 -
 -  private final ITableTimeRangeIterator timeIterator;
 -
 -  private boolean allAggregatorsHasFinalResult = false;
 +public class TableAggregationTableScanOperator extends 
AbstractAggregationTableScanOperator {
  
    public TableAggregationTableScanOperator(
        PlanNodeId sourceId,
@@@ -53,30 -128,732 +49,25 @@@
        int[] groupingKeyIndex,
        ITableTimeRangeIterator tableTimeRangeIterator,
        boolean ascending,
-       long maxReturnSize,
        boolean canUseStatistics,
-       List<Integer> aggArguments) {
+       List<Integer> aggregatorInputChannels) {
 -
 -    this.sourceId = sourceId;
 -    this.operatorContext = context;
 -    this.canUseStatistics = canUseStatistics;
 -    this.tableAggregators = tableAggregators;
 -    this.groupingKeySchemas = groupingKeySchemas;
 -    this.groupingKeyIndex = groupingKeyIndex;
 -    this.aggColumnSchemas = aggColumnSchemas;
 -    this.aggColumnsIndexArray = aggColumnsIndexArray;
 -    this.deviceEntries = deviceEntries;
 -    this.deviceCount = deviceEntries.size();
 -    this.operatorContext.recordSpecifiedInfo(DEVICE_NUMBER, 
Integer.toString(this.deviceCount));
 -    this.ascending = ascending;
 -    this.scanOrder = ascending ? Ordering.ASC : Ordering.DESC;
 -    this.seriesScanOptions = seriesScanOptions;
 -    this.measurementColumnNames = measurementColumnNames;
 -    this.measurementCount = measurementColumnNames.size();
 -    this.cachedRawDataSize =
 -        (1L + this.measurementCount)
 -            * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
 -    this.allSensors = allSensors;
 -    this.measurementSchemas = measurementSchemas;
 -    this.measurementColumnTSDataTypes =
 -        
measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
 -    this.currentDeviceIndex = 0;
 -    this.operatorContext.recordSpecifiedInfo(CURRENT_DEVICE_INDEX_STRING, 
Integer.toString(0));
 -    this.aggregatorInputChannels = aggregatorInputChannels;
 -    this.timeIterator = tableTimeRangeIterator;
 -
 -    constructAlignedSeriesScanUtil();
 -  }
 -
 -  @Override
 -  public boolean isFinished() throws Exception {
 -    if (!finished) {
 -      finished = !hasNextWithTimer();
 -    }
 -    return finished;
 -  }
 -
 -  @Override
 -  public long calculateMaxPeekMemory() {
 -    return cachedRawDataSize + maxReturnSize;
 -  }
 -
 -  @Override
 -  public long calculateMaxReturnSize() {
 -    return maxReturnSize;
 -  }
 -
 -  @Override
 -  public long calculateRetainedSizeAfterCallingNext() {
 -    return timeIterator.getType() == 
ITableTimeRangeIterator.TimeIteratorType.DATE_BIN_TIME_ITERATOR
 -        ? cachedRawDataSize
 -        : 0;
 -  }
 -
 -  @Override
 -  public boolean hasNext() throws Exception {
 -    return timeIterator.hasCachedTimeRange()
 -        || timeIterator.hasNextTimeRange()
 -        || !resultTsBlockBuilder.isEmpty();
 -  }
 -
 -  @Override
 -  public TsBlock next() throws Exception {
 -
 -    // optimize for sql: select count(*) from (select count(s1), sum(s1) from 
table)
 -    if (tableAggregators.isEmpty()
 -        && timeIterator.getType() == 
ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR
 -        && resultTsBlockBuilder.getValueColumnBuilders().length == 0) {
 -      resultTsBlockBuilder.reset();
 -      currentDeviceIndex = deviceCount;
 -      timeIterator.setFinished();
 -      Column[] valueColumns = new Column[0];
 -      return new TsBlock(1, new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, 
1), valueColumns);
 -    }
 -
 -    // start stopwatch, reset leftRuntimeOfOneNextCall
 -    long start = System.nanoTime();
 -    leftRuntimeOfOneNextCall = 1000 * 
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
 -    long maxRuntime = leftRuntimeOfOneNextCall;
 -
 -    while (System.nanoTime() - start < maxRuntime
 -        && (timeIterator.hasCachedTimeRange() || 
timeIterator.hasNextTimeRange())
 -        && !resultTsBlockBuilder.isFull()) {
 -
 -      // calculate aggregation result on current time window
 -      // return true if current time window is calc finished
 -      if (calculateAggregationResultForCurrentTimeRange()) {
 -        timeIterator.resetCurTimeRange();
 -      }
 -    }
 -
 -    if (resultTsBlockBuilder.getPositionCount() > 0) {
 -      return buildResultTsBlock();
 -    } else {
 -      return null;
 -    }
 -  }
 -
 -  private TsBlock buildResultTsBlock() {
 -    int declaredPositions = resultTsBlockBuilder.getPositionCount();
 -    ColumnBuilder[] valueColumnBuilders = 
resultTsBlockBuilder.getValueColumnBuilders();
 -    Column[] valueColumns = new Column[valueColumnBuilders.length];
 -    for (int i = 0; i < valueColumns.length; i++) {
 -      valueColumns[i] = valueColumnBuilders[i].build();
 -      if (valueColumns[i].getPositionCount() != declaredPositions) {
 -        throw new IllegalStateException(
 -            format(
 -                "Declared positions (%s) does not match column %s's number of 
entries (%s)",
 -                declaredPositions, i, valueColumns[i].getPositionCount()));
 -      }
 -    }
 -
 -    TsBlock resultTsBlock =
 -        new TsBlock(
 -            resultTsBlockBuilder.getPositionCount(),
 -            new RunLengthEncodedColumn(
 -                TIME_COLUMN_TEMPLATE, 
resultTsBlockBuilder.getPositionCount()),
 -            valueColumns);
 -    resultTsBlockBuilder.reset();
 -    return resultTsBlock;
 -  }
 -
 -  protected void constructAlignedSeriesScanUtil() {
 -    DeviceEntry deviceEntry;
 -
 -    if (this.deviceEntries.isEmpty() || 
this.deviceEntries.get(this.currentDeviceIndex) == null) {
 -      // for device which is not exist
 -      deviceEntry = new DeviceEntry(new StringArrayDeviceID(""), 
Collections.emptyList());
 -    } else {
 -      deviceEntry = this.deviceEntries.get(this.currentDeviceIndex);
 -    }
 -
 -    AlignedFullPath alignedPath =
 -        constructAlignedPath(deviceEntry, measurementColumnNames, 
measurementSchemas, allSensors);
 -
 -    this.seriesScanUtil =
 -        new AlignedSeriesScanUtil(
 -            alignedPath,
 -            scanOrder,
 -            seriesScanOptions,
 -            operatorContext.getInstanceContext(),
 -            true,
 -            measurementColumnTSDataTypes);
 -  }
 -
 -  /** Return true if we have the result of this timeRange. */
 -  protected boolean calculateAggregationResultForCurrentTimeRange() {
 -    try {
 -      if (calcFromCachedData()) {
 -        updateResultTsBlock();
 -        checkIfAllAggregatorHasFinalResult();
 -        return true;
 -      }
 -
 -      if (readAndCalcFromPage()) {
 -        updateResultTsBlock();
 -        checkIfAllAggregatorHasFinalResult();
 -        return true;
 -      }
 -
 -      // only when all the page data has been consumed, we need to read the 
chunk data
 -      if (!seriesScanUtil.hasNextPage() && readAndCalcFromChunk()) {
 -        updateResultTsBlock();
 -        checkIfAllAggregatorHasFinalResult();
 -        return true;
 -      }
 -
 -      // only when all the page and chunk data has been consumed, we need to 
read the file data
 -      if (!seriesScanUtil.hasNextPage()
 -          && !seriesScanUtil.hasNextChunk()
 -          && readAndCalcFromFile()) {
 -        updateResultTsBlock();
 -        checkIfAllAggregatorHasFinalResult();
 -        return true;
 -      }
 -
 -      // If the TimeRange is (Long.MIN_VALUE, Long.MAX_VALUE), for 
Aggregators like countAggregator,
 -      // we have to consume all the data before we finish the aggregation 
calculation.
 -      if (seriesScanUtil.hasNextPage()
 -          || seriesScanUtil.hasNextChunk()
 -          || seriesScanUtil.hasNextFile()) {
 -        return false;
 -      } else {
 -        // all data of current device has been consumed
 -        updateResultTsBlock();
 -        timeIterator.resetCurTimeRange();
 -        nextDevice();
 -      }
 -
 -      if (currentDeviceIndex < deviceCount) {
 -        // construct AlignedSeriesScanUtil for next device
 -        constructAlignedSeriesScanUtil();
 -        queryDataSource.reset();
 -        this.seriesScanUtil.initQueryDataSource(queryDataSource);
 -      }
 -
 -      if (currentDeviceIndex >= deviceCount) {
 -        // all devices have been consumed
 -        timeIterator.setFinished();
 -        return true;
 -      } else {
 -        return false;
 -      }
 -    } catch (IOException e) {
 -      throw new RuntimeException("Error while scanning the file", e);
 -    }
 -  }
 -
 -  protected void updateResultTsBlock() {
 -    appendAggregationResult(resultTsBlockBuilder, tableAggregators);
 -    // after appendAggregationResult invoked, aggregators must be cleared
 -    resetTableAggregators();
 -  }
 -
 -  protected boolean calcFromCachedData() {
 -    return calcUsingRawData(inputTsBlock);
 -  }
 -
 -  protected boolean calcUsingRawData(TsBlock tsBlock) {
 -    Pair<Boolean, TsBlock> calcResult = 
calculateAggregationFromRawData(tsBlock, ascending);
 -    inputTsBlock = calcResult.getRight();
 -    return calcResult.getLeft();
 -  }
 -
 -  /**
 -   * Calculate aggregation value on the time range from the tsBlock 
containing raw data.
 -   *
 -   * @return left - whether the aggregation calculation of the current time 
range has done; right -
 -   *     remaining tsBlock
 -   */
 -  public Pair<Boolean, TsBlock> calculateAggregationFromRawData(
 -      TsBlock inputTsBlock, boolean ascending) {
 -    if (inputTsBlock == null || inputTsBlock.isEmpty()) {
 -      return new Pair<>(false, inputTsBlock);
 -    }
 -
 -    updateCurTimeRange(inputTsBlock.getStartTime());
 -
 -    TimeRange curTimeRange = timeIterator.getCurTimeRange();
 -    // check if the tsBlock does not contain points in current interval
 -    if (satisfiedTimeRange(inputTsBlock, curTimeRange, ascending)) {
 -      // skip points that cannot be calculated
 -      if ((ascending && inputTsBlock.getStartTime() < curTimeRange.getMin())
 -          || (!ascending && inputTsBlock.getStartTime() > 
curTimeRange.getMax())) {
 -        inputTsBlock = skipPointsOutOfTimeRange(inputTsBlock, curTimeRange, 
ascending);
 -      }
 -
 -      inputTsBlock = process(inputTsBlock, curTimeRange);
 -    }
 -
 -    // judge whether the calculation finished
 -    boolean isTsBlockOutOfBound =
 -        inputTsBlock != null
 -            && (ascending
 -                ? inputTsBlock.getEndTime() > curTimeRange.getMax()
 -                : inputTsBlock.getEndTime() < curTimeRange.getMin());
 -    return new Pair<>(
 -        isAllAggregatorsHasFinalResult(tableAggregators) || 
isTsBlockOutOfBound, inputTsBlock);
 -  }
 -
 -  private TsBlock process(TsBlock inputTsBlock, TimeRange curTimeRange) {
 -    // Get the row which need to be processed by aggregator
 -    IWindow curWindow = new TimeWindow(curTimeRange);
 -    Column timeColumn = inputTsBlock.getTimeColumn();
 -    int lastIndexToProcess = 0;
 -    for (int i = 0; i < inputTsBlock.getPositionCount(); i++) {
 -      if (!curWindow.satisfy(timeColumn, i)) {
 -        break;
 -      }
 -      lastIndexToProcess = i;
 -    }
 -
 -    TsBlock inputRegion = inputTsBlock.getRegion(0, lastIndexToProcess + 1);
 -    Column[] valueColumns = new Column[aggregatorInputChannels.size()];
 -    for (int idx : aggregatorInputChannels) {
 -      if (valueColumns[idx] != null) {
 -        continue;
 -      }
 -      valueColumns[idx] =
 -          buildValueColumn(aggColumnSchemas.get(idx).getColumnCategory(), 
inputRegion, idx);
 -    }
 -
 -    TsBlock tsBlock =
 -        new TsBlock(
 -            inputRegion.getPositionCount(),
 -            new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, 
inputRegion.getPositionCount()),
 -            valueColumns);
 -
 -    for (TableAggregator aggregator : tableAggregators) {
 -      // current agg method has been calculated
 -      if (aggregator.hasFinalResult()) {
 -        continue;
 -      }
 -
 -      aggregator.processBlock(tsBlock);
 -    }
 -
 -    int lastReadRowIndex = lastIndexToProcess + 1;
 -    if (lastReadRowIndex >= inputTsBlock.getPositionCount()) {
 -      return null;
 -    } else {
 -      return inputTsBlock.subTsBlock(lastReadRowIndex);
 -    }
 -  }
 -
 -  private Column buildValueColumn(
 -      TsTableColumnCategory columnSchemaCategory, TsBlock inputRegion, int 
columnIdx) {
 -    switch (columnSchemaCategory) {
 -      case TIME:
 -        return inputRegion.getTimeColumn();
 -      case ID:
 -        // TODO avoid create deviceStatics multi times; count, sum can use 
time statistics
 -        String id =
 -            (String)
 -                deviceEntries
 -                    .get(currentDeviceIndex)
 -                    .getNthSegment(aggColumnsIndexArray[columnIdx] + 1);
 -        return getIdOrAttrColumn(
 -            inputRegion.getTimeColumn().getPositionCount(),
 -            id == null ? null : new Binary(id, TSFileConfig.STRING_CHARSET));
 -      case ATTRIBUTE:
 -        Binary attr =
 -            deviceEntries
 -                .get(currentDeviceIndex)
 -                .getAttributeColumnValues()
 -                .get(aggColumnsIndexArray[columnIdx]);
 -        return 
getIdOrAttrColumn(inputRegion.getTimeColumn().getPositionCount(), attr);
 -      case MEASUREMENT:
 -        return inputRegion.getColumn(aggColumnsIndexArray[columnIdx]);
 -      default:
 -        throw new IllegalStateException("Unsupported column type: " + 
columnSchemaCategory);
 -    }
 -  }
 -
 -  private Column getIdOrAttrColumn(int positionCount, Binary columnName) {
 -    if (columnName == null) {
 -      return new RunLengthEncodedColumn(
 -          new BinaryColumn(1, Optional.of(new boolean[] {true}), new Binary[] 
{null}),
 -          positionCount);
 -    } else {
 -      return new RunLengthEncodedColumn(
 -          new BinaryColumn(1, Optional.of(new boolean[] {false}), new 
Binary[] {columnName}),
 -          positionCount);
 -    }
 -  }
 -
 -  protected void calcFromStatistics(Statistics timeStatistics, Statistics[] 
valueStatistics) {
 -    int idx = -1;
 -
 -    for (TableAggregator aggregator : tableAggregators) {
 -      if (aggregator.hasFinalResult()) {
 -        idx += aggregator.getChannelCount();
 -        continue;
 -      }
 -
 -      Statistics[] statisticsArray = new 
Statistics[aggregator.getChannelCount()];
 -      for (int i = 0; i < aggregator.getChannelCount(); i++) {
 -        idx++;
 -
 -        TsTableColumnCategory columnSchemaCategory =
 -            
aggColumnSchemas.get(aggregatorInputChannels.get(idx)).getColumnCategory();
 -        statisticsArray[i] =
 -            buildStatistics(
 -                columnSchemaCategory,
 -                timeStatistics,
 -                valueStatistics,
 -                aggregatorInputChannels.get(idx));
 -      }
 -
 -      aggregator.processStatistics(statisticsArray);
 -    }
 -  }
 -
 -  private Statistics buildStatistics(
 -      TsTableColumnCategory columnSchemaCategory,
 -      Statistics timeStatistics,
 -      Statistics[] valueStatistics,
 -      int columnIdx) {
 -    switch (columnSchemaCategory) {
 -      case TIME:
 -        return timeStatistics;
 -      case ID:
 -        // TODO avoid create deviceStatics multi times; count, sum can use 
time statistics
 -        String id =
 -            (String)
 -                deviceEntries
 -                    .get(currentDeviceIndex)
 -                    .getNthSegment(aggColumnsIndexArray[columnIdx] + 1);
 -        return getStatistics(
 -            timeStatistics, id == null ? null : new Binary(id, 
TSFileConfig.STRING_CHARSET));
 -      case ATTRIBUTE:
 -        Binary attr =
 -            deviceEntries
 -                .get(currentDeviceIndex)
 -                .getAttributeColumnValues()
 -                .get(aggColumnsIndexArray[columnIdx]);
 -        return getStatistics(timeStatistics, attr);
 -      case MEASUREMENT:
 -        return valueStatistics[aggColumnsIndexArray[columnIdx]];
 -      default:
 -        throw new IllegalStateException("Unsupported column type: " + 
columnSchemaCategory);
 -    }
 -  }
 -
 -  private Statistics getStatistics(Statistics timeStatistics, Binary 
columnName) {
 -    if (columnName == null) {
 -      return null;
 -    } else {
 -      StringStatistics stringStatics = new StringStatistics();
 -      stringStatics.setCount((int) timeStatistics.getCount());
 -      stringStatics.setStartTime(timeStatistics.getStartTime());
 -      stringStatics.setEndTime(timeStatistics.getEndTime());
 -      stringStatics.initializeStats(columnName, columnName, columnName, 
columnName);
 -      return stringStatics;
 -    }
 -  }
 -
 -  @SuppressWarnings({"squid:S3776", "squid:S135", "squid:S3740"})
 -  public boolean readAndCalcFromFile() throws IOException {
 -    // start stopwatch
 -    long start = System.nanoTime();
 -    while (System.nanoTime() - start < leftRuntimeOfOneNextCall && 
seriesScanUtil.hasNextFile()) {
 -      if (canUseStatistics && seriesScanUtil.canUseCurrentFileStatistics()) {
 -        Statistics fileTimeStatistics = 
seriesScanUtil.currentFileTimeStatistics();
 -
 -        updateCurTimeRange(fileTimeStatistics.getStartTime());
 -
 -        if (fileTimeStatistics.getStartTime() > 
timeIterator.getCurTimeRange().getMax()) {
 -          if (ascending) {
 -            return true;
 -          } else {
 -            seriesScanUtil.skipCurrentFile();
 -            continue;
 -          }
 -        }
 -
 -        // calc from fileMetaData
 -        if (timeIterator
 -            .getCurTimeRange()
 -            .contains(fileTimeStatistics.getStartTime(), 
fileTimeStatistics.getEndTime())) {
 -          Statistics[] statisticsList = new Statistics[measurementCount];
 -          for (int i = 0; i < measurementCount; i++) {
 -            statisticsList[i] = seriesScanUtil.currentFileStatistics(i);
 -          }
 -          calcFromStatistics(fileTimeStatistics, statisticsList);
 -          seriesScanUtil.skipCurrentFile();
 -          if (isAllAggregatorsHasFinalResult(tableAggregators)) {
 -            return true;
 -          } else {
 -            continue;
 -          }
 -        }
 -      }
 -
 -      // read chunk
 -      if (readAndCalcFromChunk()) {
 -        return true;
 -      }
 -    }
 -
 -    return false;
 -  }
 -
 -  @SuppressWarnings({"squid:S3776", "squid:S135", "squid:S3740"})
 -  protected boolean readAndCalcFromChunk() throws IOException {
 -    // start stopwatch
 -    long start = System.nanoTime();
 -    while (System.nanoTime() - start < leftRuntimeOfOneNextCall && 
seriesScanUtil.hasNextChunk()) {
 -      if (canUseStatistics && seriesScanUtil.canUseCurrentChunkStatistics()) {
 -        Statistics chunkTimeStatistics = 
seriesScanUtil.currentChunkTimeStatistics();
 -
 -        updateCurTimeRange(chunkTimeStatistics.getStartTime());
 -
 -        if (chunkTimeStatistics.getStartTime() > 
timeIterator.getCurTimeRange().getMax()) {
 -          if (ascending) {
 -            return true;
 -          } else {
 -            seriesScanUtil.skipCurrentChunk();
 -            continue;
 -          }
 -        }
 -
 -        // calc from chunkMetaData
 -        if (timeIterator
 -            .getCurTimeRange()
 -            .contains(chunkTimeStatistics.getStartTime(), 
chunkTimeStatistics.getEndTime())) {
 -          // calc from chunkMetaData
 -          Statistics[] statisticsList = new Statistics[measurementCount];
 -          for (int i = 0; i < measurementCount; i++) {
 -            statisticsList[i] = seriesScanUtil.currentChunkStatistics(i);
 -          }
 -          calcFromStatistics(chunkTimeStatistics, statisticsList);
 -          seriesScanUtil.skipCurrentChunk();
 -          if (isAllAggregatorsHasFinalResult(tableAggregators)) {
 -            return true;
 -          } else {
 -            continue;
 -          }
 -        }
 -      }
 -
 -      // read page
 -      if (readAndCalcFromPage()) {
 -        return true;
 -      }
 -    }
 -    return false;
 -  }
 -
 -  long leftRuntimeOfOneNextCall = Long.MAX_VALUE;
 -
 -  @SuppressWarnings({"squid:S3776", "squid:S135", "squid:S3740"})
 -  protected boolean readAndCalcFromPage() throws IOException {
 -    long start = System.nanoTime();
 -    try {
 -      while (System.nanoTime() - start < leftRuntimeOfOneNextCall && 
seriesScanUtil.hasNextPage()) {
 -        if (canUseStatistics && seriesScanUtil.canUseCurrentPageStatistics()) 
{
 -          Statistics pageTimeStatistics = 
seriesScanUtil.currentPageTimeStatistics();
 -
 -          updateCurTimeRange(pageTimeStatistics.getStartTime());
 -
 -          // There is no more eligible points in current time range
 -          // TODO(beyyes) will not appear in table model?
 -          if (pageTimeStatistics.getStartTime() > 
timeIterator.getCurTimeRange().getMax()) {
 -            if (ascending) {
 -              return true;
 -            } else {
 -              seriesScanUtil.skipCurrentPage();
 -              continue;
 -            }
 -          }
 -
 -          // can use pageHeader
 -          if (timeIterator
 -              .getCurTimeRange()
 -              .contains(pageTimeStatistics.getStartTime(), 
pageTimeStatistics.getEndTime())) {
 -            Statistics[] statisticsList = new Statistics[measurementCount];
 -            for (int i = 0; i < measurementCount; i++) {
 -              statisticsList[i] = seriesScanUtil.currentPageStatistics(i);
 -            }
 -            calcFromStatistics(pageTimeStatistics, statisticsList);
 -            seriesScanUtil.skipCurrentPage();
 -            if (isAllAggregatorsHasFinalResult(tableAggregators)) {
 -              return true;
 -            } else {
 -              continue;
 -            }
 -          }
 -        }
 -
 -        // calc from page data
 -        TsBlock originalTsBlock = seriesScanUtil.nextPage();
 -        if (originalTsBlock == null) {
 -          continue;
 -        }
 -
 -        // calc from raw data
 -        if (calcUsingRawData(originalTsBlock)) {
 -          return true;
 -        }
 -      }
 -
 -      return false;
 -    } finally {
 -      leftRuntimeOfOneNextCall -= (System.nanoTime() - start);
 -    }
 -  }
 -
 -  private void updateCurTimeRange(long startTime) {
 -    if (timeIterator.getType() == 
ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR) {
 -      timeIterator.updateCurTimeRange(startTime);
 -      return;
 -    }
 -
 -    if (!timeIterator.hasCachedTimeRange()) {
 -      timeIterator.updateCurTimeRange(startTime);
 -    } else if (timeIterator.canFinishCurrentTimeRange(startTime)) {
 -      updateResultTsBlock();
 -      timeIterator.resetCurTimeRange();
 -      timeIterator.updateCurTimeRange(startTime);
 -      resetTableAggregators();
 -    }
 -  }
 -
 -  /** Append a row of aggregation results to the result tsBlock. */
 -  public void appendAggregationResult(
 -      TsBlockBuilder tsBlockBuilder, List<? extends TableAggregator> 
aggregators) {
 -
 -    // no data in current time range, just output empty
 -    if (!timeIterator.hasCachedTimeRange()) {
 -      return;
 -    }
 -
 -    ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
 -
 -    int groupKeySize = groupingKeySchemas == null ? 0 : 
groupingKeySchemas.size();
 -    int dateBinSize =
 -        timeIterator.getType() == 
ITableTimeRangeIterator.TimeIteratorType.DATE_BIN_TIME_ITERATOR
 -            ? 1
 -            : 0;
 -
 -    if (groupingKeyIndex != null) {
 -      for (int i = 0; i < groupKeySize; i++) {
 -        if (TsTableColumnCategory.ID == 
groupingKeySchemas.get(i).getColumnCategory()) {
 -          String id =
 -              (String) 
deviceEntries.get(currentDeviceIndex).getNthSegment(groupingKeyIndex[i] + 1);
 -          if (id == null) {
 -            columnBuilders[i].appendNull();
 -          } else {
 -            columnBuilders[i].writeBinary(new Binary(id, 
TSFileConfig.STRING_CHARSET));
 -          }
 -        } else {
 -          Binary attribute =
 -              deviceEntries
 -                  .get(currentDeviceIndex)
 -                  .getAttributeColumnValues()
 -                  .get(groupingKeyIndex[i]);
 -          if (attribute == null) {
 -            columnBuilders[i].appendNull();
 -          } else {
 -            columnBuilders[i].writeBinary(attribute);
 -          }
 -        }
 -      }
 -    }
 -
 -    if (dateBinSize > 0) {
 -      
columnBuilders[groupKeySize].writeLong(timeIterator.getCurTimeRange().getMin());
 -    }
 -
 -    for (int i = 0; i < aggregators.size(); i++) {
 -      aggregators.get(i).evaluate(columnBuilders[groupKeySize + dateBinSize + 
i]);
 -    }
 -
 -    tsBlockBuilder.declarePosition();
 -  }
 -
 -  public boolean isAllAggregatorsHasFinalResult(List<TableAggregator> 
aggregators) {
 -    // In groupByDateBin, we need read real data to calc next time range
 -    if (timeIterator.getType() == 
ITableTimeRangeIterator.TimeIteratorType.DATE_BIN_TIME_ITERATOR) {
 -      return false;
 -    }
 -
 -    // no aggregation function, just output ids or attributes
 -    if (aggregators.isEmpty()) {
 -      return false;
 -    }
 -
 -    for (TableAggregator aggregator : aggregators) {
 -      if (!aggregator.hasFinalResult()) {
 -        return false;
 -      }
 -    }
 -
 -    this.allAggregatorsHasFinalResult = true;
 -    return true;
 -  }
 -
 -  private void checkIfAllAggregatorHasFinalResult() {
 -    if (allAggregatorsHasFinalResult
 -        && timeIterator.getType()
 -            == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR) 
{
 -      nextDevice();
 -      inputTsBlock = null;
 -
 -      if (currentDeviceIndex < deviceCount) {
 -        // construct AlignedSeriesScanUtil for next device
 -        constructAlignedSeriesScanUtil();
 -        queryDataSource.reset();
 -        this.seriesScanUtil.initQueryDataSource(queryDataSource);
 -      }
 -
 -      if (currentDeviceIndex >= deviceCount) {
 -        // all devices have been consumed
 -        timeIterator.setFinished();
 -      }
 -
 -      allAggregatorsHasFinalResult = false;
 -    }
 -  }
 -
 -  private void nextDevice() {
 -    currentDeviceIndex++;
 -    this.operatorContext.recordSpecifiedInfo(
 -        CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex));
 -  }
 -
 -  private void resetTableAggregators() {
 -    tableAggregators.forEach(TableAggregator::reset);
 -  }
 -
 -  @Override
 -  public List<TSDataType> getResultDataTypes() {
 -    int groupingKeySize = groupingKeySchemas != null ? 
groupingKeySchemas.size() : 0;
 -    int dateBinSize =
 -        timeIterator.getType() == 
ITableTimeRangeIterator.TimeIteratorType.DATE_BIN_TIME_ITERATOR
 -            ? 1
 -            : 0;
 -    List<TSDataType> resultDataTypes =
 -        new ArrayList<>(groupingKeySize + dateBinSize + 
tableAggregators.size());
 -
 -    if (groupingKeySchemas != null) {
 -      for (int i = 0; i < groupingKeySchemas.size(); i++) {
 -        resultDataTypes.add(TSDataType.STRING);
 -      }
 -    }
 -    if (dateBinSize > 0) {
 -      resultDataTypes.add(TSDataType.TIMESTAMP);
 -    }
 -    for (TableAggregator aggregator : tableAggregators) {
 -      resultDataTypes.add(aggregator.getType());
 -    }
 -
 -    return resultDataTypes;
 -  }
 -
 -  @Override
 -  public void initQueryDataSource(IQueryDataSource dataSource) {
 -    this.queryDataSource = (QueryDataSource) dataSource;
 -    this.seriesScanUtil.initQueryDataSource(queryDataSource);
 -    this.resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes());
 +    super(
 +        sourceId,
 +        context,
-         columnSchemas,
-         columnsIndexArray,
++        aggColumnSchemas,
++        aggColumnsIndexArray,
 +        deviceEntries,
-         scanOrder,
 +        seriesScanOptions,
 +        measurementColumnNames,
 +        allSensors,
 +        measurementSchemas,
-         maxTsBlockLineNum,
-         measurementCount,
 +        tableAggregators,
 +        groupingKeySchemas,
 +        groupingKeyIndex,
 +        tableTimeRangeIterator,
 +        ascending,
-         maxReturnSize,
 +        canUseStatistics,
-         aggArguments);
++        aggregatorInputChannels);
    }
  
    @Override
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeAlignedDeviceViewAggregationScanOperator.java
index 605b09277d1,00000000000..3efdf7638b0
mode 100644,000000..100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeAlignedDeviceViewAggregationScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeAlignedDeviceViewAggregationScanOperator.java
@@@ -1,104 -1,0 +1,95 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
 +
 +import 
org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator;
 +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
 +import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator;
 +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 +import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
 +import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
 +import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
- import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 +
 +import org.apache.tsfile.write.schema.IMeasurementSchema;
 +
 +import java.util.List;
 +import java.util.Set;
 +
 +import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeAlignedDeviceViewScanOperator.getNthIdColumnValueForTree;
 +
 +public class TreeAlignedDeviceViewAggregationScanOperator
 +    extends AbstractAggregationTableScanOperator {
 +
 +  // in iotdb, db level should at least be 2 level, like root.db
 +  // if db level is 2, idColumnStartIndex is 0, and we should use 
treeDBLength to extract the first
 +  // id column value
 +  // if db level is larger than 2, idColumnStartIndex will be db level - 2
 +  private final int idColumnStartIndex;
 +
 +  // only takes effect if db level is 2; for root.db.d1, IDeviceId will 
be [root.db.d1],
 +  // treeDBLength will be 7 (root.db)
 +  private final int treeDBLength;
 +
 +  public TreeAlignedDeviceViewAggregationScanOperator(
 +      PlanNodeId sourceId,
 +      OperatorContext context,
-       List<ColumnSchema> columnSchemas,
-       int[] columnsIndexArray,
++      List<ColumnSchema> aggColumnSchemas,
++      int[] aggColumnsIndexArray,
 +      List<DeviceEntry> deviceEntries,
-       Ordering scanOrder,
 +      SeriesScanOptions seriesScanOptions,
 +      List<String> measurementColumnNames,
 +      Set<String> allSensors,
 +      List<IMeasurementSchema> measurementSchemas,
-       int maxTsBlockLineNum,
-       int measurementCount,
 +      List<TableAggregator> tableAggregators,
 +      List<ColumnSchema> groupingKeySchemas,
 +      int[] groupingKeyIndex,
 +      ITableTimeRangeIterator tableTimeRangeIterator,
 +      boolean ascending,
-       long maxReturnSize,
 +      boolean canUseStatistics,
-       List<Integer> aggArguments,
++      List<Integer> aggregatorInputChannels,
 +      int idColumnStartIndex,
 +      int treeDBLength) {
 +    super(
 +        sourceId,
 +        context,
-         columnSchemas,
-         columnsIndexArray,
++        aggColumnSchemas,
++        aggColumnsIndexArray,
 +        deviceEntries,
-         scanOrder,
 +        seriesScanOptions,
 +        measurementColumnNames,
 +        allSensors,
 +        measurementSchemas,
-         maxTsBlockLineNum,
-         measurementCount,
 +        tableAggregators,
 +        groupingKeySchemas,
 +        groupingKeyIndex,
 +        tableTimeRangeIterator,
 +        ascending,
-         maxReturnSize,
 +        canUseStatistics,
-         aggArguments);
++        aggregatorInputChannels);
 +    this.idColumnStartIndex = idColumnStartIndex;
 +    this.treeDBLength = treeDBLength;
 +  }
 +
 +  @Override
 +  String getNthIdColumnValue(DeviceEntry deviceEntry, int idColumnIndex) {
 +    return getNthIdColumnValueForTree(
 +        deviceEntry, idColumnIndex, this.idColumnStartIndex, 
this.treeDBLength);
 +  }
 +}
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 1989b1db555,3873973ca85..ed8b6f28ccc
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@@ -1900,54 -1890,23 +1894,27 @@@ public class TableOperatorGenerator ext
                  context.getNextOperatorId(),
                  node.getPlanNodeId(),
                  TableAggregationTableScanOperator.class.getSimpleName());
-     SeriesScanOptions.Builder scanOptionsBuilder =
-         node.getTimePredicate().isPresent()
-             ? getSeriesScanOptionsBuilder(context, 
node.getTimePredicate().get())
-             : new SeriesScanOptions.Builder();
-     scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit());
-     scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset());
-     
scanOptionsBuilder.withPushLimitToEachDevice(node.isPushLimitToEachDevice());
-     scanOptionsBuilder.withAllSensors(new HashSet<>(measurementColumnNames));
-     Expression pushDownPredicate = node.getPushDownPredicate();
-     if (pushDownPredicate != null) {
-       scanOptionsBuilder.withPushDownFilter(
-           convertPredicateToFilter(
-               pushDownPredicate, measurementColumnsIndexMap, columnSchemaMap, 
timeColumnName));
-     }
- 
-     Set<String> allSensors = new HashSet<>(measurementColumnNames);
-     // for time column
-     allSensors.add("");
-     TableAggregationTableScanOperator aggTableScanOperator =
-         new TableAggregationTableScanOperator(
-             node.getPlanNodeId(),
-             operatorContext,
-             columnSchemas,
-             columnsIndexArray,
-             node.getDeviceEntries(),
-             scanAscending ? Ordering.ASC : Ordering.DESC,
-             scanOptionsBuilder.build(),
+     SeriesScanOptions seriesScanOptions =
+         buildSeriesScanOptions(
+             context,
+             node.getAssignments(),
              measurementColumnNames,
-             allSensors,
-             measurementSchemas,
-             
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(),
-             measurementColumnCount,
-             aggregators,
-             groupingKeySchemas,
-             groupingKeyIndex,
-             timeRangeIterator,
-             scanAscending,
-             calculateMaxAggregationResultSize(),
-             canUseStatistic,
-             aggColumnIndexes);
+             measurementColumnsIndexMap,
+             timeColumnName,
+             node.getTimePredicate(),
+             node.getPushDownLimit(),
+             node.getPushDownOffset(),
+             node.isPushLimitToEachDevice(),
+             node.getPushDownPredicate());
  
-     ((DataDriverContext) 
context.getDriverContext()).addSourceOperator(aggTableScanOperator);
+     Set<String> allSensors = new HashSet<>(measurementColumnNames);
+     allSensors.add(""); // for time column
  
-     for (int i = 0, size = node.getDeviceEntries().size(); i < size; i++) {
+     for (int i = 0; i < node.getDeviceEntries().size(); i++) {
 +      if (node.getDeviceEntries().get(i) == null) {
 +        throw new IllegalStateException(
 +            "Device entries of index " + i + " in AggregationTableScanNode is 
empty");
 +      }
        AlignedFullPath alignedPath =
            constructAlignedPath(
                node.getDeviceEntries().get(i),

Reply via email to