This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch TreeToTableView
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 57029e8dfc087374eeb6ce95b0a8e9b62c63f9f9
Merge: c34ea18e34a 90e7a809d02
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Dec 31 19:33:58 2024 +0800

    resolve conflicts

 .github/workflows/cluster-it-1c1d.yml              |   2 +-
 .github/workflows/cluster-it-1c1d1a.yml            |   4 +-
 .github/workflows/cluster-it-1c3d.yml              |   4 +-
 .github/workflows/compile-check.yml                |   4 +-
 .github/workflows/dependency-check.yml             |   4 +-
 .github/workflows/multi-language-client.yml        |   4 +-
 .github/workflows/pipe-it-2cluster.yml             |   4 +-
 .github/workflows/sonar-codecov.yml                |   4 +-
 .github/workflows/table-cluster-it-1c1d.yml        |   6 +-
 .github/workflows/table-cluster-it-1c3d.yml        |   4 +-
 .github/workflows/todos-check.yml                  |   2 +-
 .github/workflows/unit-test.yml                    |   6 +-
 example/client-cpp-example/pom.xml                 |   4 +
 example/client-cpp-example/src/CMakeLists.txt      |   3 +
 .../src/TableModelSessionExample.cpp               | 129 +--
 .../java/org/apache/iotdb/TableHttpExample.java    |   4 +-
 .../java/org/apache/iotdb/TableHttpsExample.java   |   4 +-
 .../org/apache/iotdb/TableModelSessionExample.java |  14 +-
 .../apache/iotdb/TableModelSessionPoolExample.java |  28 +-
 .../apache/iotdb/udf/AggregateFunctionExample.java |   2 +-
 .../apache/iotdb/udf/ScalarFunctionExample.java    |   2 +-
 integration-test/pom.xml                           |  44 +-
 .../org/apache/iotdb/itbase/category/ManualIT.java |   9 +-
 .../org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java  |  73 +-
 .../iotdb/db/it/schema/IoTDBCreateDatabaseIT.java  | 193 ++++
 .../db/it/schema/IoTDBCreateStorageGroupIT.java    | 153 ----
 .../it/autocreate/IoTDBPipeAutoConflictIT.java     |  51 ++
 .../manual/IoTDBPipeTypeConversionISessionIT.java  |   2 +-
 .../IoTDBPipeTypeConversionISessionIT.java         | 498 +++++++++++
 .../it/tablemodel/IoTDBPipeTypeConversionIT.java   | 632 ++++++++++++++
 .../pipe/it/tablemodel/IoTDBPipeWithLoadIT.java    |  58 +-
 .../iotdb/pipe/it/tablemodel/TableModelUtils.java  |  20 +-
 .../it/db/it/IoTDBCaseWhenThenTableIT.java         |   6 +-
 .../relational/it/db/it/IoTDBDeletionTableIT.java  | 968 +++++++++++++++++++--
 .../it/db/it/IoTDBDisableDeletionTableIT.java      | 120 ---
 .../it/db/it/IoTDBExecuteBatchTableIT.java         |  26 +-
 .../it/db/it/IoTDBFlushQueryTableIT.java           |  30 +-
 .../it/db/it/IoTDBInsertAlignedValuesTableIT.java  | 122 ++-
 .../relational/it/db/it/IoTDBInsertTableIT.java    | 267 +++---
 .../it/db/it/IoTDBMultiDeviceTableIT.java          |  90 +-
 ...va => IoTDBMultiTAGsWithAttributesTableIT.java} | 135 ++-
 .../relational/it/db/it/IoTDBRecoverTableIT.java   |  16 +-
 .../it/db/it/IoTDBRecoverUnclosedTableIT.java      |  16 +-
 .../relational/it/db/it/IoTDBRestartTableIT.java   |  51 +-
 .../udf/IoTDBUserDefinedAggregateFunctionIT.java   |   2 +-
 .../it/udf/IoTDBUserDefinedScalarFunctionIT.java   |   4 +-
 .../it/query/old/IoTDBDatetimeFormatTableIT.java   |   2 +-
 .../it/query/old/IoTDBFilterBetweenTableIT.java    |   2 +-
 .../it/query/old/IoTDBFilterNullTableIT.java       |   2 +-
 .../it/query/old/IoTDBFilterTableIT.java           |  10 +-
 .../it/query/old/IoTDBNestedQueryTableIT.java      |   6 +-
 .../it/query/old/IoTDBSimpleQueryTableIT.java      |  39 +-
 .../alignbydevice/IoTDBAlignByDeviceTableIT.java   |   2 +-
 .../IoTDBAlignByDeviceWithTemplateTableIT.java     |   2 +-
 ...oTDBOrderByLimitOffsetAlignByDeviceTableIT.java |   4 +-
 .../IoTDBOrderByWithAlignByDeviceTableIT.java      |   6 +-
 .../IoTDBAlignedOffsetLimitPushDownTableIT.java    |   2 +-
 .../it/query/old/aligned/TableUtils.java           |   2 +-
 .../scalar/IoTDBCastFunctionTableIT.java           |   6 +-
 .../scalar/IoTDBCastFunctionTableSpecialIT.java    |   2 +-
 .../scalar/IoTDBDiffFunctionTableIT.java           |   2 +-
 .../scalar/IoTDBReplaceFunctionTableIT.java        |   2 +-
 .../scalar/IoTDBRoundFunctionTableIT.java          |   2 +-
 .../scalar/IoTDBScalarFunctionTableIT.java         |  62 +-
 .../scalar/IoTDBSubStringFunctionTableIT.java      |   2 +-
 .../it/query/old/orderBy/IoTDBOrderByTableIT.java  |  36 +-
 .../it/query/old/query/IoTDBArithmeticTableIT.java |   4 +-
 .../it/query/old/query/IoTDBFuzzyQueryTableIT.java |   2 +-
 .../it/query/old/query/IoTDBInTableIT.java         |   4 +-
 ...oTDBNoSelectExpressionAfterAnalyzedTableIT.java |   2 +-
 .../query/old/query/IoTDBNullOperandTableIT.java   |   2 +-
 .../it/query/old/query/IoTDBPaginationTableIT.java |   4 +-
 .../it/query/old/query/IoTDBQueryDemoTableIT.java  |   2 +-
 .../IoTDBQueryWithComplexValueFilterTableIT.java   |   2 +-
 .../it/query/old/query/IoTDBResultSetTableIT.java  |   6 +-
 .../query/IoTDBSelectCompareExpressionTableIT.java |   2 +-
 .../it/query/recent/IoTDBFillTableIT.java          |   6 +-
 .../it/query/recent/IoTDBGapFillTableIT.java       |   2 +-
 .../it/query/recent/IoTDBNullIdQueryIT.java        |   4 +-
 .../it/query/recent/IoTDBTableAggregationIT.java   |  32 +-
 .../recent/subquery/SubqueryDataSetUtils.java      |   4 +-
 .../it/rest/it/IoTDBRestServiceCaseWhenThenIT.java |   6 +-
 .../it/rest/it/IoTDBRestServiceFlushQueryIT.java   |  34 +-
 .../relational/it/rest/it/IoTDBRestServiceIT.java  |  54 +-
 .../it/IoTDBRestServiceInsertAlignedValuesIT.java  |  67 +-
 .../relational/it/schema/IoTDBDatabaseIT.java      |  18 +-
 .../iotdb/relational/it/schema/IoTDBDeviceIT.java  |  14 +-
 .../iotdb/relational/it/schema/IoTDBTableIT.java   |  56 +-
 .../it/session/IoTDBSessionRelationalIT.java       | 282 +++---
 .../it/session/IoTDBTableModelSessionIT.java       |   8 +-
 .../pool/IoTDBInsertTableSessionPoolIT.java        |  57 +-
 .../session/pool/IoTDBTableModelSessionPoolIT.java |   4 +-
 .../udf/api/relational/AggregateFunction.java      |   3 +-
 .../java/org/apache/iotdb/cli/AbstractCli.java     |  26 +-
 .../src/main/java/org/apache/iotdb/cli/Cli.java    |   4 +-
 iotdb-client/client-cpp/src/main/Session.h         |  37 +-
 iotdb-client/client-cpp/src/main/TableSession.cpp  |   2 +-
 iotdb-client/client-cpp/src/main/TableSession.h    |   2 +-
 .../src/test/cpp/sessionRelationalIT.cpp           |  26 +-
 iotdb-client/client-py/iotdb/utils/NumpyTablet.py  |   2 +-
 iotdb-client/client-py/iotdb/utils/Tablet.py       |   6 +-
 .../client-py/table_model_session_example.py       |   2 +-
 .../client-py/table_model_session_pool_example.py  |   2 +-
 .../tests/integration/test_relational_session.py   |  14 +-
 .../tests/integration/test_tablemodel_insert.py    | 408 ++++-----
 .../org/apache/iotdb/jdbc/IoTDBConnection.java     |   4 +
 .../apache/iotdb/rpc/TElasticFramedTransport.java  |  15 +-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   3 +
 .../iotdb/rpc/TElasticFramedTransportTest.java     |  71 ++
 .../session/subscription/SubscriptionSession.java  |   8 +-
 .../iotdb/session/SessionCacheLeaderTest.java      |  12 +-
 .../ainode/iotdb/ainode/model/model_factory.py     |   2 -
 iotdb-core/ainode/poetry.lock                      |   8 +-
 .../iotdb/confignode/manager/load/LoadManager.java |   9 -
 .../manager/load/balancer/RouteBalancer.java       |   7 +-
 .../router/priority/GreedyPriorityBalancer.java    |  42 +-
 .../router/priority/IPriorityBalancer.java         |   5 +-
 .../router/priority/LeaderPriorityBalancer.java    |  21 +-
 .../confignode/manager/load/cache/LoadCache.java   |  16 -
 .../load/cache/node/DataNodeHeartbeatCache.java    |   3 +-
 .../runtime/PipeRuntimeCoordinator.java            |   4 +-
 .../runtime/heartbeat/PipeHeartbeat.java           |   8 +-
 .../persistence/schema/ClusterSchemaInfo.java      |   6 +-
 .../confignode/persistence/schema/ConfigMTree.java |   2 +-
 .../procedure/env/RegionMaintainHandler.java       |  29 +-
 .../impl/region/AddRegionPeerProcedure.java        |  12 +-
 .../impl/region/RegionMigrateProcedure.java        |  34 +-
 .../impl/region/RemoveRegionPeerProcedure.java     |  22 +-
 .../request/ConfigPhysicalPlanSerDeTest.java       |  10 +-
 .../router/priority/GreedyPriorityTest.java        |  13 +-
 .../priority/LeaderPriorityBalancerTest.java       |  65 +-
 .../persistence/schema/ConfigMTreeTest.java        |   8 +-
 .../schema/table/AddTableColumnProcedureTest.java  |   4 +-
 .../schema/table/CreateTableProcedureTest.java     |   8 +-
 .../apache/iotdb/consensus/iot/IoTConsensus.java   |   8 +-
 .../consensus/iot/IoTConsensusServerImpl.java      |  28 +-
 .../service/IoTConsensusRPCServiceProcessor.java   |   6 +-
 .../iotdb/consensus/ratis/RatisConsensus.java      |  23 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  29 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  22 +-
 .../schemaregion/SchemaExecutionVisitor.java       |   5 +-
 .../db/exception/VerifyMetadataException.java      |  12 +-
 ...va => VerifyMetadataTypeMismatchException.java} |   8 +-
 .../{ => load}/LoadEmptyFileException.java         |   2 +-
 .../db/exception/{ => load}/LoadFileException.java |   2 +-
 .../{ => load}/LoadReadOnlyException.java          |   2 +-
 .../LoadRuntimeOutOfMemoryException.java           |   2 +-
 .../{ => load}/PartitionViolationException.java    |   2 +-
 .../load/RegionReplicaSetChangedException.java}    |  22 +-
 .../agent/runtime/SimpleProgressIndexAssigner.java |  36 +-
 .../connector/protocol/opcua/OpcUaNameSpace.java   |   4 +-
 .../deletion/persist/PageCacheDeletionBuffer.java  |   2 +-
 .../tablet/parser/TabletInsertionEventParser.java  |   8 +-
 ...ileInsertionEventTableParserTabletIterator.java |   2 +-
 .../protocol/legacy/loader/DeletionLoader.java     |   2 +-
 .../protocol/legacy/loader/TsFileLoader.java       |   3 +-
 .../pipeconsensus/PipeConsensusReceiver.java       |   2 +-
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |   1 +
 .../transform/converter/ValueConverter.java        |   4 +-
 .../statement/PipeConvertedInsertRowStatement.java |   7 +
 .../PipeConvertedInsertTabletStatement.java        |   7 +
 .../visitor/PipeStatementExceptionVisitor.java     |   2 +-
 .../resource/memory/InsertNodeMemoryEstimator.java |   2 +-
 .../table/v1/handler/RequestValidationHandler.java |  20 +-
 .../v1/handler/StatementConstructionHandler.java   |   4 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |  12 +-
 .../execution/fragment/QueryContext.java           |  18 +-
 .../schema/source/TableDeviceFetchSource.java      |   2 +-
 .../schema/source/TableDeviceQuerySource.java      |   2 +-
 ...ator.java => AbstractAggTableScanOperator.java} | 419 +++++----
 .../AbstractDefaultAggTableScanOperator.java       | 109 +++
 .../relational/AbstractTableScanOperator.java      |   4 +-
 ...lator.java => DefaultAggTableScanOperator.java} |  29 +-
 .../relational/LastQueryAggTableScanOperator.java  | 424 +++++++++
 .../TableAggregationTableScanOperator.java         |  78 --
 ...eeAlignedDeviceViewAggregationScanOperator.java |  47 +-
 .../relational/aggregation/AccumulatorFactory.java |  10 +-
 .../aggregation/AggregationOperator.java           |   1 +
 .../relational/aggregation/LastAccumulator.java    |  22 +-
 .../relational/aggregation/LastByAccumulator.java  |  28 +-
 .../aggregation/LastByDescAccumulator.java         |   5 +
 .../aggregation/LastDescAccumulator.java           |   9 +-
 .../relational/aggregation/TableAccumulator.java   |   4 +
 .../relational/aggregation/TableAggregator.java    |  16 +
 .../UserDefinedAggregateFunctionAccumulator.java   |   6 +
 .../aggregation/grouped/GroupedAccumulator.java    |   8 +
 .../aggregation/grouped/GroupedAggregator.java     |   4 +
 .../GroupedUserDefinedAggregateAccumulator.java    |   6 +
 .../grouped/HashAggregationOperator.java           |   1 +
 .../grouped/StreamingAggregationOperator.java      |   1 +
 .../db/queryengine/plan/analyze/AnalyzeUtils.java  |  24 +-
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |   4 -
 .../plan/analyze/load/LoadTsFileAnalyzer.java      |  78 +-
 .../analyze/load/LoadTsFileTableSchemaCache.java   |   9 +-
 .../load/LoadTsFileToTableModelAnalyzer.java       |   2 +-
 .../load/LoadTsFileToTreeModelAnalyzer.java        |   9 +-
 .../analyze/load/LoadTsFileTreeSchemaCache.java    |   2 +-
 .../load/TreeSchemaAutoCreatorAndVerifier.java     |  36 +-
 .../config/executor/ClusterConfigTaskExecutor.java |  11 +-
 .../db/queryengine/plan/parser/ASTVisitor.java     |   8 +-
 .../plan/planner/OperatorTreeGenerator.java        |   4 +-
 .../plan/planner/TableOperatorGenerator.java       | 288 ++++--
 .../plan/planner/plan/node/write/InsertNode.java   |   8 +-
 .../plan/node/write/RelationalDeleteDataNode.java  |  47 +-
 .../planner/plan/parameter/SeriesScanOptions.java  |  12 +
 .../relational/analyzer/StatementAnalyzer.java     |   7 +-
 .../predicate/ConvertPredicateToFilterVisitor.java |   2 +-
 .../schema/CheckSchemaPredicateVisitor.java        |   4 +-
 .../ConvertSchemaPredicateToFilterVisitor.java     |   4 +-
 .../plan/relational/metadata/TableSchema.java      |   2 +-
 .../metadata/fetcher/TableDeviceSchemaFetcher.java |   9 +-
 .../fetcher/TableHeaderSchemaValidator.java        |  24 +-
 .../fetcher/cache/TableDeviceSchemaCache.java      |   2 +-
 .../plan/relational/planner/RelationPlanner.java   |   2 +-
 .../distribute/TableDistributedPlanGenerator.java  |   4 +-
 .../relational/planner/node/TableScanNode.java     |   7 +-
 .../PushLimitOffsetIntoTableScan.java              |   5 +-
 .../optimizations/PushPredicateIntoTableScan.java  |   4 +-
 .../TransformAggregationToStreamable.java          |   4 +-
 .../optimizations/TransformSortToStreamSort.java   |   5 +-
 .../sql/ast/AbstractQueryDeviceWithCache.java      |   2 +-
 .../plan/relational/sql/ast/ColumnDefinition.java  |   2 +-
 .../plan/relational/sql/ast/LoadTsFile.java        |  16 +-
 .../relational/sql/ast/WrappedInsertStatement.java |  10 +-
 .../plan/relational/sql/parser/AstBuilder.java     |  16 +-
 .../scheduler/load/LoadTsFileDispatcherImpl.java   |   2 +-
 .../plan/scheduler/load/LoadTsFileScheduler.java   | 134 ++-
 .../plan/statement/crud/InsertBaseStatement.java   |  13 +-
 .../plan/statement/crud/LoadTsFileStatement.java   |  70 +-
 .../db/service/metrics/DataNodeMetricsHelper.java  |  82 +-
 .../db/service/metrics/file/TsFileMetrics.java     |  49 +-
 .../metrics/memory/ThresholdMemoryMetrics.java     | 194 +++++
 .../iotdb/db/storageengine/StorageEngine.java      |  13 +-
 .../db/storageengine/buffer/BloomFilterCache.java  |  28 +-
 .../iotdb/db/storageengine/buffer/ChunkCache.java  |  27 +-
 .../buffer/TimeSeriesMetadataCache.java            |  56 +-
 .../db/storageengine/dataregion/DataRegion.java    |  42 +-
 .../performer/impl/FastCompactionPerformer.java    |  15 +
 .../execute/task/CrossSpaceCompactionTask.java     |   2 +
 .../execute/task/InnerSpaceCompactionTask.java     |  32 +-
 .../task/InsertionCrossSpaceCompactionTask.java    |   2 +
 .../task/RepairUnsortedFileCompactionTask.java     |   2 +
 .../execute/utils/CompactionPathUtils.java         |  15 +-
 .../execute/utils/CompactionTableSchema.java       |   4 +-
 .../compaction/execute/utils/CompactionUtils.java  |  46 +
 .../executor/fast/SeriesCompactionExecutor.java    |  10 +-
 .../compaction/io/CompactionTsFileReader.java      |  14 +-
 .../compaction/io/CompactionTsFileWriter.java      |   2 +-
 .../compaction/schedule/CompactionTaskManager.java |   4 +-
 .../compaction/schedule/CompactionWorker.java      |   7 +-
 .../dataregion/memtable/AbstractMemTable.java      |   4 +-
 .../dataregion/memtable/TsFileProcessor.java       |  27 +-
 .../dataregion/modification/ModificationFile.java  |  31 +-
 .../reader/common/CachedPriorityMergeReader.java   |  97 ---
 .../reader/common/DescPriorityMergeReader.java     |  16 +-
 .../read/reader/common/PriorityMergeReader.java    |  22 +-
 .../dataregion/tsfile/TsFileResource.java          |  38 +-
 .../tsfile/timeindex/ArrayDeviceTimeIndex.java     |   2 +-
 .../dataregion/tsfile/timeindex/FileTimeIndex.java |   2 +-
 .../dataregion/tsfile/timeindex/ITimeIndex.java    |   2 +-
 .../db/storageengine/load/LoadTsFileManager.java   |   2 +-
 .../load/active/ActiveLoadTsFileLoader.java        |   1 +
 .../load/config/LoadTsFileConfigurator.java        |  29 +-
 .../LoadConvertedInsertTabletStatement.java        |  52 ++
 ...ertedInsertTabletStatementExceptionVisitor.java |  51 ++
 ...vertedInsertTabletStatementTSStatusVisitor.java |  65 ++
 ...leStatementDataTypeConvertExecutionVisitor.java | 143 +++
 ...eeStatementDataTypeConvertExecutionVisitor.java | 130 +++
 .../converter/LoadTsFileDataTypeConverter.java     | 107 +++
 .../memory/LoadTsFileDataCacheMemoryBlock.java     |   2 +-
 .../load/memory/LoadTsFileMemoryManager.java       |   2 +-
 .../load/splitter/TsFileSplitter.java              |  35 +-
 .../db/tools/schema/SRStatementGenerator.java      |   4 +-
 .../org/apache/iotdb/db/utils/CommonUtils.java     |   2 +-
 .../java/org/apache/iotdb/db/utils/MemUtils.java   |   2 +-
 .../db/utils/datastructure/AlignedTVList.java      |   3 +-
 .../iotdb/db/metadata/path/PatternTreeMapTest.java |  41 +-
 .../SchemaRegionSimpleRecoverTest.java             |   4 +-
 .../db/pipe/consensus/DeletionRecoverTest.java     |   3 +-
 .../db/pipe/consensus/DeletionResourceTest.java    |   9 +-
 .../plan/parser/StatementGeneratorTest.java        |  11 +-
 .../node/write/InsertRowsNodeSerdeTest.java        |  22 +-
 .../node/write/InsertTabletNodeSerdeTest.java      |  16 +-
 .../planner/node/write/WritePlanNodeSplitTest.java |   2 +-
 .../node/write/RelationalDeleteDataNodeTest.java   |   3 +-
 .../plan/relational/analyzer/TSBSMetadata.java     |  38 +-
 .../plan/relational/analyzer/TestMatadata.java     |  18 +-
 .../fetcher/cache/TableDeviceSchemaCacheTest.java  |  32 +-
 .../plan/relational/sql/ast/InsertTabletTest.java  |   4 +-
 .../plan/statement/InsertStatementTest.java        |  45 +-
 .../plan/statement/StatementTestUtils.java         |  14 +-
 .../iotdb/db/storageengine/StorageEngineTest.java  |   2 +
 .../storageengine/buffer/BloomFilterCacheTest.java |  29 +-
 .../BatchedCompactionWithTsFileSplitterTest.java   |  15 +-
 .../compaction/CompactionWorkerTest.java           |  33 +
 .../inner/InnerSequenceCompactionSpeedTest.java    |  43 +
 .../repair/RepairUnsortedFileCompactionTest.java   |  53 ++
 .../CompactionTableModelTestFileWriter.java        |   2 +-
 .../CompactionTableSchemaCollectorTest.java        |  44 +-
 .../TableModelCompactionWithTTLTest.java           |   9 +-
 .../compaction/utils/CompactionCheckerUtils.java   |  30 +-
 .../modification/ModificationFileTest.java         | 242 +++++-
 ...SeriesReader.java => AscFakedSeriesReader.java} |  28 +-
 .../read/reader/common/DescFakedSeriesReader.java  |  54 ++
 .../reader/common/PriorityMergeReaderTest.java     | 113 ++-
 .../reader/common/PriorityMergeReaderTest2.java    |  59 --
 .../dataregion/tsfile/TsFileResourceTest.java      | 144 +++
 .../dataregion/wal/io/WALFileTest.java             |   2 +-
 .../file/UnsealedTsFileRecoverPerformerTest.java   |   4 +-
 .../conf/iotdb-system.properties.template          |  11 +-
 .../apache/iotdb/commons/client/ThriftClient.java  |  17 +-
 .../commons/exception/IllegalPathException.java    |   4 +-
 .../apache/iotdb/commons/path/PathPatternNode.java | 105 ++-
 .../apache/iotdb/commons/path/PathPatternUtil.java |   9 +-
 .../commons/schema/table/InformationSchema.java    |  16 +-
 .../apache/iotdb/commons/schema/table/TsTable.java |  10 +-
 ...entColumnSchema.java => FieldColumnSchema.java} |  18 +-
 .../{IdColumnSchema.java => TagColumnSchema.java}  |  16 +-
 .../schema/table/column/TsTableColumnCategory.java |  24 +-
 .../table/column/TsTableColumnSchemaUtil.java      |  16 +-
 .../service/AbstractThriftServiceThread.java       |   7 +-
 .../iotdb/commons/service/metric/enums/Metric.java |   2 +
 .../iotdb/commons/service/metric/enums/Tag.java    |   4 +-
 .../db/relational/grammar/sql/RelationalSql.g4     |  16 +-
 .../src/main/openapi3/iotdb_rest_table_v1.yaml     |   2 +-
 .../iotdb/library/match/PatternExecutor.java       |   2 +-
 .../iotdb/library/match/UDAFPatternMatch.java      |  62 +-
 .../org/apache/iotdb/library/UDAFPatternTest.java  |  70 ++
 pom.xml                                            |   4 +-
 329 files changed, 7790 insertions(+), 3426 deletions(-)

diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
index bc264cf83d1,04c54394f03..ac4b611b2fd
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
@@@ -21,7 -21,7 +21,6 @@@ package org.apache.iotdb.db.queryengine
  
  import org.apache.iotdb.commons.path.AlignedFullPath;
  import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
--import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
  import 
org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator;
  import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
  import 
org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator;
@@@ -53,7 -52,7 +52,6 @@@ import org.apache.tsfile.read.common.bl
  import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
  import org.apache.tsfile.utils.Binary;
  import org.apache.tsfile.utils.Pair;
--import org.apache.tsfile.utils.RamUsageEstimator;
  import org.apache.tsfile.write.schema.IMeasurementSchema;
  
  import java.io.IOException;
@@@ -62,22 -61,21 +60,17 @@@ import java.util.Collections
  import java.util.List;
  import java.util.Optional;
  import java.util.Set;
--import java.util.concurrent.TimeUnit;
  import java.util.stream.Collectors;
  
- import static java.lang.String.format;
  import static 
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.satisfiedTimeRange;
- import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.CURRENT_DEVICE_INDEX_STRING;
- import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.TIME_COLUMN_TEMPLATE;
- import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.constructAlignedPath;
+ import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.CURRENT_DEVICE_INDEX_STRING;
+ import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE;
+ import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath;
  import static 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter.DEVICE_NUMBER;
  import static 
org.apache.tsfile.read.common.block.TsBlockUtil.skipPointsOutOfTimeRange;
  
- public abstract class AbstractAggregationTableScanOperator extends 
AbstractDataSourceOperator {
- 
-   private static final long INSTANCE_SIZE =
-       
RamUsageEstimator.shallowSizeOfInstance(TableAggregationTableScanOperator.class);
+ public abstract class AbstractAggTableScanOperator extends 
AbstractDataSourceOperator {
  
 -  private static final long INSTANCE_SIZE =
 -      
RamUsageEstimator.shallowSizeOfInstance(AbstractAggTableScanOperator.class);
 -
    private boolean finished = false;
    private TsBlock inputTsBlock;
  
@@@ -114,51 -116,57 +111,43 @@@
  
    private boolean allAggregatorsHasFinalResult = false;
  
-   public AbstractAggregationTableScanOperator(
 -  public AbstractAggTableScanOperator(
--      PlanNodeId sourceId,
--      OperatorContext context,
--      List<ColumnSchema> aggColumnSchemas,
--      int[] aggColumnsIndexArray,
--      List<DeviceEntry> deviceEntries,
 -      int deviceCount,
--      SeriesScanOptions seriesScanOptions,
--      List<String> measurementColumnNames,
--      Set<String> allSensors,
--      List<IMeasurementSchema> measurementSchemas,
--      List<TableAggregator> tableAggregators,
--      List<ColumnSchema> groupingKeySchemas,
--      int[] groupingKeyIndex,
--      ITableTimeRangeIterator tableTimeRangeIterator,
--      boolean ascending,
--      boolean canUseStatistics,
--      List<Integer> aggregatorInputChannels) {
--
--    this.sourceId = sourceId;
--    this.operatorContext = context;
--    this.canUseStatistics = canUseStatistics;
--    this.tableAggregators = tableAggregators;
--    this.groupingKeySchemas = groupingKeySchemas;
--    this.groupingKeyIndex = groupingKeyIndex;
 -    this.groupingKeySize = groupingKeySchemas == null ? 0 : 
groupingKeySchemas.size();
--    this.aggColumnSchemas = aggColumnSchemas;
--    this.aggColumnsIndexArray = aggColumnsIndexArray;
--    this.deviceEntries = deviceEntries;
-     this.deviceCount = deviceEntries.size();
 -    this.deviceCount = deviceCount;
++  public AbstractAggTableScanOperator(AbstractAggTableScanOperatorParameter 
parameter) {
++
++    this.sourceId = parameter.sourceId;
++    this.operatorContext = parameter.context;
++    this.canUseStatistics = parameter.canUseStatistics;
++    this.tableAggregators = parameter.tableAggregators;
++    this.groupingKeySchemas = parameter.groupingKeySchemas;
++    this.groupingKeyIndex = parameter.groupingKeyIndex;
++    this.groupingKeySize =
++        parameter.groupingKeySchemas == null ? 0 : 
parameter.groupingKeySchemas.size();
++    this.aggColumnSchemas = parameter.aggColumnSchemas;
++    this.aggColumnsIndexArray = parameter.aggColumnsIndexArray;
++    this.deviceEntries = parameter.deviceEntries;
++    this.deviceCount = parameter.deviceCount;
      this.operatorContext.recordSpecifiedInfo(DEVICE_NUMBER, 
Integer.toString(this.deviceCount));
--    this.ascending = ascending;
--    this.scanOrder = ascending ? Ordering.ASC : Ordering.DESC;
--    this.seriesScanOptions = seriesScanOptions;
--    this.measurementColumnNames = measurementColumnNames;
++    this.ascending = parameter.ascending;
++    this.scanOrder = parameter.ascending ? Ordering.ASC : Ordering.DESC;
++    this.seriesScanOptions = parameter.seriesScanOptions;
++    this.measurementColumnNames = parameter.measurementColumnNames;
      this.measurementCount = measurementColumnNames.size();
      this.cachedRawDataSize =
          (1L + this.measurementCount)
              * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
--    this.allSensors = allSensors;
--    this.measurementSchemas = measurementSchemas;
++    this.allSensors = parameter.allSensors;
++    this.measurementSchemas = parameter.measurementSchemas;
      this.measurementColumnTSDataTypes =
--        
measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
++        parameter.measurementSchemas.stream()
++            .map(IMeasurementSchema::getType)
++            .collect(Collectors.toList());
      this.currentDeviceIndex = 0;
      this.operatorContext.recordSpecifiedInfo(CURRENT_DEVICE_INDEX_STRING, 
Integer.toString(0));
--    this.aggregatorInputChannels = aggregatorInputChannels;
--    this.timeIterator = tableTimeRangeIterator;
++    this.aggregatorInputChannels = parameter.aggregatorInputChannels;
++    this.timeIterator = parameter.tableTimeRangeIterator;
+     this.dateBinSize =
+         timeIterator.getType() == 
ITableTimeRangeIterator.TimeIteratorType.DATE_BIN_TIME_ITERATOR
+             ? 1
+             : 0;
  
      constructAlignedSeriesScanUtil();
    }
@@@ -171,89 -179,64 +160,14 @@@
      return finished;
    }
  
-   @Override
-   public long calculateMaxPeekMemory() {
-     return cachedRawDataSize + maxReturnSize;
-   }
- 
-   @Override
-   public long calculateMaxReturnSize() {
-     return maxReturnSize;
-   }
- 
-   @Override
-   public long calculateRetainedSizeAfterCallingNext() {
-     return timeIterator.getType() == 
ITableTimeRangeIterator.TimeIteratorType.DATE_BIN_TIME_ITERATOR
-         ? cachedRawDataSize
-         : 0;
-   }
- 
--  @Override
--  public boolean hasNext() throws Exception {
-     return timeIterator.hasCachedTimeRange()
-         || timeIterator.hasNextTimeRange()
-         || !resultTsBlockBuilder.isEmpty();
 -    if (retainedTsBlock != null) {
 -      return true;
 -    }
 -
 -    return timeIterator.hasCachedTimeRange() || 
timeIterator.hasNextTimeRange();
--  }
--
--  @Override
--  public TsBlock next() throws Exception {
 -    if (retainedTsBlock != null) {
 -      return getResultFromRetainedTsBlock();
 -    }
--
--    // optimize for sql: select count(*) from (select count(s1), sum(s1) from 
table)
--    if (tableAggregators.isEmpty()
--        && timeIterator.getType() == 
ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR
--        && resultTsBlockBuilder.getValueColumnBuilders().length == 0) {
--      resultTsBlockBuilder.reset();
--      currentDeviceIndex = deviceCount;
--      timeIterator.setFinished();
--      Column[] valueColumns = new Column[0];
--      return new TsBlock(1, new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, 
1), valueColumns);
--    }
--
--    // start stopwatch, reset leftRuntimeOfOneNextCall
--    long start = System.nanoTime();
--    leftRuntimeOfOneNextCall = 1000 * 
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
--    long maxRuntime = leftRuntimeOfOneNextCall;
--
--    while (System.nanoTime() - start < maxRuntime
--        && (timeIterator.hasCachedTimeRange() || 
timeIterator.hasNextTimeRange())
--        && !resultTsBlockBuilder.isFull()) {
--
--      // calculate aggregation result on current time window
--      // return true if current time window is calc finished
--      if (calculateAggregationResultForCurrentTimeRange()) {
--        timeIterator.resetCurTimeRange();
--      }
--    }
--
-     if (resultTsBlockBuilder.getPositionCount() > 0) {
-       return buildResultTsBlock();
-     } else {
 -    if (resultTsBlockBuilder.isEmpty()) {
--      return null;
--    }
 -
 -    buildResultTsBlock();
 -    return checkTsBlockSizeAndGetResult();
--  }
 -
+   protected abstract void updateResultTsBlock();
  
-   private TsBlock buildResultTsBlock() {
-     int declaredPositions = resultTsBlockBuilder.getPositionCount();
-     ColumnBuilder[] valueColumnBuilders = 
resultTsBlockBuilder.getValueColumnBuilders();
-     Column[] valueColumns = new Column[valueColumnBuilders.length];
-     for (int i = 0; i < valueColumns.length; i++) {
-       valueColumns[i] = valueColumnBuilders[i].build();
-       if (valueColumns[i].getPositionCount() != declaredPositions) {
-         throw new IllegalStateException(
-             format(
-                 "Declared positions (%s) does not match column %s's number of 
entries (%s)",
-                 declaredPositions, i, valueColumns[i].getPositionCount()));
-       }
-     }
- 
-     TsBlock resultTsBlock =
-         new TsBlock(
-             resultTsBlockBuilder.getPositionCount(),
+   protected void buildResultTsBlock() {
+     resultTsBlock =
+         resultTsBlockBuilder.build(
              new RunLengthEncodedColumn(
-                 TIME_COLUMN_TEMPLATE, 
resultTsBlockBuilder.getPositionCount()),
-             valueColumns);
+                 TIME_COLUMN_TEMPLATE, 
resultTsBlockBuilder.getPositionCount()));
      resultTsBlockBuilder.reset();
-     return resultTsBlock;
    }
  
    protected void constructAlignedSeriesScanUtil() {
@@@ -444,10 -421,13 +352,10 @@@
      switch (columnSchemaCategory) {
        case TIME:
          return inputRegion.getTimeColumn();
-       case ID:
+       case TAG:
 -        // TODO avoid create deviceStatics multi times; count, sum can use 
time statistics
          String id =
 -            (String)
 -                deviceEntries
 -                    .get(currentDeviceIndex)
 -                    .getNthSegment(aggColumnsIndexArray[columnIdx] + 1);
 +            getNthIdColumnValue(
 +                deviceEntries.get(currentDeviceIndex), 
aggColumnsIndexArray[columnIdx]);
          return getIdOrAttrColumn(
              inputRegion.getTimeColumn().getPositionCount(),
              id == null ? null : new Binary(id, TSFileConfig.STRING_CHARSET));
@@@ -512,10 -492,13 +420,10 @@@
      switch (columnSchemaCategory) {
        case TIME:
          return timeStatistics;
-       case ID:
+       case TAG:
 -        // TODO avoid create deviceStatics multi times; count, sum can use 
time statistics
          String id =
 -            (String)
 -                deviceEntries
 -                    .get(currentDeviceIndex)
 -                    .getNthSegment(aggColumnsIndexArray[columnIdx] + 1);
 +            getNthIdColumnValue(
 +                deviceEntries.get(currentDeviceIndex), 
aggColumnsIndexArray[columnIdx]);
          return getStatistics(
              timeStatistics, id == null ? null : new Binary(id, 
TSFileConfig.STRING_CHARSET));
        case ATTRIBUTE:
@@@ -721,48 -702,43 +627,43 @@@
        return;
      }
  
-     ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
- 
-     int groupKeySize = groupingKeySchemas == null ? 0 : 
groupingKeySchemas.size();
-     int dateBinSize =
-         timeIterator.getType() == 
ITableTimeRangeIterator.TimeIteratorType.DATE_BIN_TIME_ITERATOR
-             ? 1
-             : 0;
- 
-     if (groupingKeyIndex != null) {
-       for (int i = 0; i < groupKeySize; i++) {
-         if (TsTableColumnCategory.ID == 
groupingKeySchemas.get(i).getColumnCategory()) {
-           String id =
-               getNthIdColumnValue(deviceEntries.get(currentDeviceIndex), 
groupingKeyIndex[i]);
-           if (id == null) {
-             columnBuilders[i].appendNull();
-           } else {
-             columnBuilders[i].writeBinary(new Binary(id, 
TSFileConfig.STRING_CHARSET));
-           }
-         } else {
-           Binary attribute =
-               deviceEntries
-                   .get(currentDeviceIndex)
-                   .getAttributeColumnValues()
-                   .get(groupingKeyIndex[i]);
-           if (attribute == null) {
-             columnBuilders[i].appendNull();
-           } else {
-             columnBuilders[i].writeBinary(attribute);
-           }
-         }
-       }
-     }
+     appendGroupKeysToResult(deviceEntries, currentDeviceIndex);
  
      if (dateBinSize > 0) {
-       
columnBuilders[groupKeySize].writeLong(timeIterator.getCurTimeRange().getMin());
+       
resultTsBlockBuilder.getValueColumnBuilders()[groupingKeySize].writeLong(
+           timeIterator.getCurTimeRange().getMin());
      }
  
-     for (int i = 0; i < aggregators.size(); i++) {
-       aggregators.get(i).evaluate(columnBuilders[groupKeySize + dateBinSize + 
i]);
+     for (int i = 0; i < tableAggregators.size(); i++) {
+       tableAggregators
+           .get(i)
+           .evaluate(
+               resultTsBlockBuilder.getValueColumnBuilders()[groupingKeySize + 
dateBinSize + i]);
      }
  
-     tsBlockBuilder.declarePosition();
+     resultTsBlockBuilder.declarePosition();
+   }
+ 
+   protected void appendGroupKeysToResult(List<DeviceEntry> deviceEntries, int 
deviceIndex) {
+     ColumnBuilder[] columnBuilders = 
resultTsBlockBuilder.getValueColumnBuilders();
+     for (int i = 0; i < groupingKeySize; i++) {
+       if (TsTableColumnCategory.TAG == 
groupingKeySchemas.get(i).getColumnCategory()) {
 -        String id = (String) 
deviceEntries.get(deviceIndex).getNthSegment(groupingKeyIndex[i] + 1);
++        String id = getNthIdColumnValue(deviceEntries.get(deviceIndex), 
groupingKeyIndex[i]);
+         if (id == null) {
+           columnBuilders[i].appendNull();
+         } else {
+           columnBuilders[i].writeBinary(new Binary(id, 
TSFileConfig.STRING_CHARSET));
+         }
+       } else {
+         Binary attribute =
+             
deviceEntries.get(deviceIndex).getAttributeColumnValues().get(groupingKeyIndex[i]);
+         if (attribute == null) {
+           columnBuilders[i].appendNull();
+         } else {
+           columnBuilders[i].writeBinary(attribute);
+         }
+       }
+     }
    }
  
    public boolean isAllAggregatorsHasFinalResult(List<TableAggregator> 
aggregators) {
@@@ -852,14 -823,35 +748,117 @@@
    }
  
    @Override
-   public long ramBytesUsed() {
-     return INSTANCE_SIZE
-         + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(seriesScanUtil)
-         + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
-         + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceId)
-         + (resultTsBlockBuilder == null ? 0 : 
resultTsBlockBuilder.getRetainedSizeInBytes())
-         + RamUsageEstimator.sizeOfCollection(deviceEntries);
+   public long calculateMaxPeekMemory() {
+     return cachedRawDataSize + maxReturnSize;
+   }
+ 
+   @Override
+   public long calculateMaxReturnSize() {
+     return maxReturnSize;
+   }
+ 
+   @Override
+   public long calculateRetainedSizeAfterCallingNext() {
+     return timeIterator.getType() == 
ITableTimeRangeIterator.TimeIteratorType.DATE_BIN_TIME_ITERATOR
+         ? cachedRawDataSize
+         : 0;
+   }
+ 
 -  @Override
 -  public long ramBytesUsed() {
 -    return INSTANCE_SIZE
 -        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(seriesScanUtil)
 -        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
 -        + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceId)
 -        + (resultTsBlockBuilder == null ? 0 : 
resultTsBlockBuilder.getRetainedSizeInBytes())
 -        + RamUsageEstimator.sizeOfCollection(deviceEntries);
 -  }
 -
+   @Override
+   public void close() throws Exception {
+     super.close();
+     tableAggregators.forEach(TableAggregator::close);
    }
 +
 +  abstract String getNthIdColumnValue(DeviceEntry deviceEntry, int 
idColumnIndex);
++
++  public static class AbstractAggTableScanOperatorParameter {
++    private final String timeColumnName;
++    protected final PlanNodeId sourceId;
++    protected final OperatorContext context;
++    protected final List<ColumnSchema> aggColumnSchemas;
++    protected final int[] aggColumnsIndexArray;
++    protected final SeriesScanOptions seriesScanOptions;
++    protected final List<String> measurementColumnNames;
++    protected final Set<String> allSensors;
++    protected final List<IMeasurementSchema> measurementSchemas;
++    protected final List<TableAggregator> tableAggregators;
++    protected final List<ColumnSchema> groupingKeySchemas;
++    protected final int[] groupingKeyIndex;
++    protected final ITableTimeRangeIterator tableTimeRangeIterator;
++    protected final boolean ascending;
++    protected final boolean canUseStatistics;
++    protected final List<Integer> aggregatorInputChannels;
++
++    protected List<DeviceEntry> deviceEntries;
++    protected int deviceCount;
++
++    public AbstractAggTableScanOperatorParameter(
++        PlanNodeId sourceId,
++        OperatorContext context,
++        List<ColumnSchema> aggColumnSchemas,
++        int[] aggColumnsIndexArray,
++        List<DeviceEntry> deviceEntries,
++        int deviceCount,
++        SeriesScanOptions seriesScanOptions,
++        List<String> measurementColumnNames,
++        Set<String> allSensors,
++        List<IMeasurementSchema> measurementSchemas,
++        List<TableAggregator> tableAggregators,
++        List<ColumnSchema> groupingKeySchemas,
++        int[] groupingKeyIndex,
++        ITableTimeRangeIterator tableTimeRangeIterator,
++        boolean ascending,
++        boolean canUseStatistics,
++        List<Integer> aggregatorInputChannels,
++        String timeColumnName) {
++      this.sourceId = sourceId;
++      this.context = context;
++      this.aggColumnSchemas = aggColumnSchemas;
++      this.aggColumnsIndexArray = aggColumnsIndexArray;
++      this.deviceEntries = deviceEntries;
++      this.deviceCount = deviceCount;
++      this.seriesScanOptions = seriesScanOptions;
++      this.measurementColumnNames = measurementColumnNames;
++      this.allSensors = allSensors;
++      this.measurementSchemas = measurementSchemas;
++      this.tableAggregators = tableAggregators;
++      this.groupingKeySchemas = groupingKeySchemas;
++      this.groupingKeyIndex = groupingKeyIndex;
++      this.tableTimeRangeIterator = tableTimeRangeIterator;
++      this.ascending = ascending;
++      this.canUseStatistics = canUseStatistics;
++      this.aggregatorInputChannels = aggregatorInputChannels;
++      this.timeColumnName = timeColumnName;
++    }
++
++    public List<TableAggregator> getTableAggregators() {
++      return tableAggregators;
++    }
++
++    public SeriesScanOptions getSeriesScanOptions() {
++      return seriesScanOptions;
++    }
++
++    public Set<String> getAllSensors() {
++      return allSensors;
++    }
++
++    public List<String> getMeasurementColumnNames() {
++      return measurementColumnNames;
++    }
++
++    public List<IMeasurementSchema> getMeasurementSchemas() {
++      return measurementSchemas;
++    }
++
++    public String getTimeColumnName() {
++      return timeColumnName;
++    }
++
++    public void setDeviceEntries(List<DeviceEntry> deviceEntries) {
++      this.deviceEntries = deviceEntries;
++      this.deviceCount = deviceEntries.size();
++    }
++  }
  }
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractDefaultAggTableScanOperator.java
index 00000000000,00000000000..f88aa22aabc
new file mode 100644
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractDefaultAggTableScanOperator.java
@@@ -1,0 -1,0 +1,109 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
++
++import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
++import 
org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator;
++
++import org.apache.tsfile.block.column.Column;
++import org.apache.tsfile.read.common.block.TsBlock;
++import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
++import org.apache.tsfile.utils.RamUsageEstimator;
++
++import java.util.concurrent.TimeUnit;
++
++import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.TIME_COLUMN_TEMPLATE;
++
++public abstract class AbstractDefaultAggTableScanOperator extends 
AbstractAggTableScanOperator {
++
++  private static final long INSTANCE_SIZE =
++      
RamUsageEstimator.shallowSizeOfInstance(AbstractDefaultAggTableScanOperator.class);
++
++  public 
AbstractDefaultAggTableScanOperator(AbstractAggTableScanOperatorParameter 
parameter) {
++    super(parameter);
++  }
++
++  @Override
++  public boolean hasNext() throws Exception {
++    if (retainedTsBlock != null) {
++      return true;
++    }
++
++    return timeIterator.hasCachedTimeRange() || 
timeIterator.hasNextTimeRange();
++  }
++
++  @Override
++  public TsBlock next() throws Exception {
++    if (retainedTsBlock != null) {
++      return getResultFromRetainedTsBlock();
++    }
++
++    // optimize for sql: select count(*) from (select count(s1), sum(s1) from 
table)
++    if (tableAggregators.isEmpty()
++        && timeIterator.getType() == 
ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR
++        && resultTsBlockBuilder.getValueColumnBuilders().length == 0) {
++      resultTsBlockBuilder.reset();
++      currentDeviceIndex = deviceCount;
++      timeIterator.setFinished();
++      Column[] valueColumns = new Column[0];
++      return new TsBlock(1, new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, 
1), valueColumns);
++    }
++
++    // start stopwatch, reset leftRuntimeOfOneNextCall
++    long start = System.nanoTime();
++    leftRuntimeOfOneNextCall = 1000 * 
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
++    long maxRuntime = leftRuntimeOfOneNextCall;
++
++    while (System.nanoTime() - start < maxRuntime
++        && (timeIterator.hasCachedTimeRange() || 
timeIterator.hasNextTimeRange())
++        && !resultTsBlockBuilder.isFull()) {
++
++      // calculate aggregation result on current time window
++      // return true if current time window is calc finished
++      if (calculateAggregationResultForCurrentTimeRange()) {
++        timeIterator.resetCurTimeRange();
++      }
++    }
++
++    if (resultTsBlockBuilder.isEmpty()) {
++      return null;
++    }
++
++    buildResultTsBlock();
++    return checkTsBlockSizeAndGetResult();
++  }
++
++  @Override
++  protected void updateResultTsBlock() {
++    appendAggregationResult();
++    // after appendAggregationResult invoked, aggregators must be cleared
++    resetTableAggregators();
++  }
++
++  @Override
++  public long ramBytesUsed() {
++    return INSTANCE_SIZE
++        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(seriesScanUtil)
++        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
++        + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceId)
++        + (resultTsBlockBuilder == null ? 0 : 
resultTsBlockBuilder.getRetainedSizeInBytes())
++        + RamUsageEstimator.sizeOfCollection(deviceEntries);
++  }
++}
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java
index ca8c9725adc,6abf101ab60..d8f7a5f3359
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java
@@@ -205,9 -213,10 +205,9 @@@ public abstract class AbstractTableScan
      Column[] valueColumns = new Column[columnsIndexArray.length];
      for (int i = 0; i < columnsIndexArray.length; i++) {
        switch (columnSchemas.get(i).getColumnCategory()) {
-         case ID:
+         case TAG:
 -          // +1 for skip the table name segment
 -          String idColumnValue =
 -              ((String) currentDeviceEntry.getNthSegment(columnsIndexArray[i] 
+ 1));
 +          String idColumnValue = getNthIdColumnValue(currentDeviceEntry, 
columnsIndexArray[i]);
 +
            valueColumns[i] =
                getIdOrAttributeValueColumn(
                    idColumnValue == null
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DefaultAggTableScanOperator.java
index 7fe6b41e838,a49446500d5..3ebd7bed0b3
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DefaultAggTableScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DefaultAggTableScanOperator.java
@@@ -17,26 -17,65 +17,19 @@@
   * under the License.
   */
  
- package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped;
+ package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
  
- import org.apache.tsfile.block.column.Column;
- import org.apache.tsfile.block.column.ColumnBuilder;
 -import 
org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator;
 -import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
 -import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator;
 -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 -import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
 -import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+ import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
  
- public interface GroupedAccumulator {
 -import org.apache.tsfile.write.schema.IMeasurementSchema;
++public class DefaultAggTableScanOperator extends 
AbstractDefaultAggTableScanOperator {
  
-   long getEstimatedSize();
 -import java.util.List;
 -import java.util.Set;
 -
 -public class DefaultAggTableScanOperator extends AbstractAggTableScanOperator 
{
 -
 -  public DefaultAggTableScanOperator(
 -      PlanNodeId sourceId,
 -      OperatorContext context,
 -      List<ColumnSchema> aggColumnSchemas,
 -      int[] aggColumnsIndexArray,
 -      List<DeviceEntry> deviceEntries,
 -      int deviceCount,
 -      SeriesScanOptions seriesScanOptions,
 -      List<String> measurementColumnNames,
 -      Set<String> allSensors,
 -      List<IMeasurementSchema> measurementSchemas,
 -      List<TableAggregator> tableAggregators,
 -      List<ColumnSchema> groupingKeySchemas,
 -      int[] groupingKeyIndex,
 -      ITableTimeRangeIterator tableTimeRangeIterator,
 -      boolean ascending,
 -      boolean canUseStatistics,
 -      List<Integer> aggregatorInputChannels) {
 -    super(
 -        sourceId,
 -        context,
 -        aggColumnSchemas,
 -        aggColumnsIndexArray,
 -        deviceEntries,
 -        deviceCount,
 -        seriesScanOptions,
 -        measurementColumnNames,
 -        allSensors,
 -        measurementSchemas,
 -        tableAggregators,
 -        groupingKeySchemas,
 -        groupingKeyIndex,
 -        tableTimeRangeIterator,
 -        ascending,
 -        canUseStatistics,
 -        aggregatorInputChannels);
++  public DefaultAggTableScanOperator(AbstractAggTableScanOperatorParameter 
parameter) {
++    super(parameter);
+   }
  
-   void setGroupCount(long groupCount);
- 
-   void addInput(int[] groupIds, Column[] arguments);
- 
-   void addIntermediate(int[] groupIds, Column argument);
- 
-   void evaluateIntermediate(int groupId, ColumnBuilder columnBuilder);
- 
-   void evaluateFinal(int groupId, ColumnBuilder columnBuilder);
- 
-   void prepareFinal();
- 
-   void reset();
+   @Override
 -  protected void updateResultTsBlock() {
 -    appendAggregationResult();
 -    // after appendAggregationResult invoked, aggregators must be cleared
 -    resetTableAggregators();
++  String getNthIdColumnValue(DeviceEntry deviceEntry, int idColumnIndex) {
++    // +1 for skipping the table name segment
++    return ((String) deviceEntry.getNthSegment(idColumnIndex + 1));
+   }
  }
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java
index 00000000000,a1ae289f95c..b74fb4bdf54
mode 000000,100644..100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java
@@@ -1,0 -1,458 +1,424 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ 
+ package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
+ 
+ import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+ import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
 -import 
org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator;
 -import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+ import 
org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryUtil;
+ import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastAccumulator;
+ import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastByDescAccumulator;
+ import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastDescAccumulator;
+ import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator;
 -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 -import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
+ import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+ import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+ import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
+ import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache;
+ 
+ import org.apache.tsfile.block.column.ColumnBuilder;
+ import org.apache.tsfile.common.conf.TSFileConfig;
+ import org.apache.tsfile.read.TimeValuePair;
+ import org.apache.tsfile.read.common.block.TsBlock;
+ import org.apache.tsfile.utils.Binary;
+ import org.apache.tsfile.utils.Pair;
+ import org.apache.tsfile.utils.RamUsageEstimator;
+ import org.apache.tsfile.utils.TsPrimitiveType;
+ import org.apache.tsfile.write.UnSupportedDataTypeException;
 -import org.apache.tsfile.write.schema.IMeasurementSchema;
+ 
+ import java.util.ArrayList;
+ import java.util.List;
+ import java.util.OptionalLong;
 -import java.util.Set;
+ import java.util.concurrent.TimeUnit;
+ 
+ import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.Utils.serializeTimeValue;
+ import static 
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE;
+ import static 
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType;
+ 
+ /**
+  * This class is used to execute aggregation table scan when apply {@code 
canUseLastCacheOptimize()}
+  */
+ public class LastQueryAggTableScanOperator extends 
AbstractAggTableScanOperator {
+ 
+   private static final long INSTANCE_SIZE =
+       
RamUsageEstimator.shallowSizeOfInstance(LastQueryAggTableScanOperator.class);
+ 
+   private static final TableDeviceSchemaCache TABLE_DEVICE_SCHEMA_CACHE =
+       TableDeviceSchemaCache.getInstance();
+ 
 -  private boolean finished = false;
+   private final String dbName;
+   private int outputDeviceIndex;
+   private DeviceEntry currentDeviceEntry;
+   private final List<DeviceEntry> cachedDeviceEntries;
+   private final int allDeviceCount;
+ 
+   private final boolean needUpdateCache;
+   private final boolean needUpdateNullEntry;
+   private final List<Integer> hitCachesIndexes;
+   private final List<Pair<OptionalLong, TsPrimitiveType[]>> hitCachedResults;
+   private int currentHitCacheIndex = 0;
+ 
+   // indicates the index of last(time) aggregation
+   private int lastTimeAggregationIdx = -1;
+ 
+   public LastQueryAggTableScanOperator(
 -      PlanNodeId sourceId,
 -      OperatorContext context,
 -      List<ColumnSchema> aggColumnSchemas,
 -      int[] aggColumnsIndexArray,
 -      List<DeviceEntry> unCachedDeviceEntries,
++      AbstractAggTableScanOperatorParameter parameter,
+       List<DeviceEntry> cachedDeviceEntries,
 -      SeriesScanOptions seriesScanOptions,
 -      List<String> measurementColumnNames,
 -      Set<String> allSensors,
 -      List<IMeasurementSchema> measurementSchemas,
 -      List<TableAggregator> tableAggregators,
 -      List<ColumnSchema> groupingKeySchemas,
 -      int[] groupingKeyIndex,
 -      ITableTimeRangeIterator tableTimeRangeIterator,
 -      boolean ascending,
 -      boolean canUseStatistics,
 -      List<Integer> aggregatorInputChannels,
+       QualifiedObjectName qualifiedObjectName,
+       List<Integer> hitCachesIndexes,
+       List<Pair<OptionalLong, TsPrimitiveType[]>> hitCachedResults) {
+ 
 -    super(
 -        sourceId,
 -        context,
 -        aggColumnSchemas,
 -        aggColumnsIndexArray,
 -        unCachedDeviceEntries,
 -        unCachedDeviceEntries.size(),
 -        seriesScanOptions,
 -        measurementColumnNames,
 -        allSensors,
 -        measurementSchemas,
 -        tableAggregators,
 -        groupingKeySchemas,
 -        groupingKeyIndex,
 -        tableTimeRangeIterator,
 -        ascending,
 -        canUseStatistics,
 -        aggregatorInputChannels);
++    super(parameter);
+ 
+     // notice that: deviceEntries store all unCachedDeviceEntries
 -    this.allDeviceCount = cachedDeviceEntries.size() + 
unCachedDeviceEntries.size();
++    this.allDeviceCount = cachedDeviceEntries.size() + parameter.deviceCount;
+     this.cachedDeviceEntries = cachedDeviceEntries;
 -    this.needUpdateCache = 
LastQueryUtil.needUpdateCache(seriesScanOptions.getGlobalTimeFilter());
++    this.needUpdateCache =
++        
LastQueryUtil.needUpdateCache(parameter.seriesScanOptions.getGlobalTimeFilter());
+     this.needUpdateNullEntry =
 -        
LastQueryUtil.needUpdateNullEntry(seriesScanOptions.getGlobalTimeFilter());
++        
LastQueryUtil.needUpdateNullEntry(parameter.seriesScanOptions.getGlobalTimeFilter());
+     this.hitCachesIndexes = hitCachesIndexes;
+     this.hitCachedResults = hitCachedResults;
+     this.dbName = qualifiedObjectName.getDatabaseName();
+ 
 -    for (int i = 0; i < tableAggregators.size(); i++) {
 -      if (tableAggregators.get(i).getAccumulator() instanceof 
LastAccumulator) {
++    for (int i = 0; i < parameter.tableAggregators.size(); i++) {
++      if (parameter.tableAggregators.get(i).getAccumulator() instanceof 
LastAccumulator) {
+         lastTimeAggregationIdx = i;
+       }
+     }
+   }
+ 
+   @Override
+   public boolean hasNext() throws Exception {
+     if (retainedTsBlock != null) {
+       return true;
+     }
+ 
+     return outputDeviceIndex < allDeviceCount;
+   }
+ 
+   @Override
+   public TsBlock next() throws Exception {
+     long maxRuntime = 
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+     long start = System.nanoTime();
+ 
+     if (retainedTsBlock != null) {
+       return getResultFromRetainedTsBlock();
+     }
+ 
+     while (System.nanoTime() - start < maxRuntime
+         && !resultTsBlockBuilder.isFull()
+         && outputDeviceIndex < allDeviceCount) {
+       processCurrentDevice();
+     }
+ 
+     if (resultTsBlockBuilder.isEmpty()) {
+       return null;
+     }
+ 
+     buildResultTsBlock();
+     return checkTsBlockSizeAndGetResult();
+   }
+ 
+   /** Main process logic, calc the last aggregation results of current 
device. */
+   private void processCurrentDevice() {
+     if (currentHitCacheIndex < hitCachesIndexes.size()
+         && outputDeviceIndex == hitCachesIndexes.get(currentHitCacheIndex)) {
+       currentDeviceEntry = cachedDeviceEntries.get(currentHitCacheIndex);
+       buildResultUseLastCache();
+       return;
+     }
+ 
+     if (calculateAggregationResultForCurrentTimeRange()) {
+       timeIterator.resetCurTimeRange();
+     }
+   }
+ 
+   private void buildResultUseLastCache() {
+     appendGroupKeysToResult(cachedDeviceEntries, currentHitCacheIndex);
+     Pair<OptionalLong, TsPrimitiveType[]> currentHitResult =
+         hitCachedResults.get(currentHitCacheIndex);
+     long lastTime = currentHitResult.getLeft().getAsLong();
+     int channel = 0;
+     for (int i = 0; i < tableAggregators.size(); i++) {
+       TableAggregator aggregator = tableAggregators.get(i);
+       ColumnBuilder columnBuilder = 
resultTsBlockBuilder.getColumnBuilder(groupingKeySize + i);
+       int columnIdx = aggregatorInputChannels.get(channel);
+       ColumnSchema schema = aggColumnSchemas.get(columnIdx);
+       TsTableColumnCategory category = schema.getColumnCategory();
+       switch (category) {
+         case TAG:
+           String id =
 -              (String)
 -                  cachedDeviceEntries
 -                      .get(currentHitCacheIndex)
 -                      .getNthSegment(aggColumnsIndexArray[columnIdx] + 1);
++              getNthIdColumnValue(
++                  cachedDeviceEntries.get(currentHitCacheIndex), 
aggColumnsIndexArray[columnIdx]);
+           if (id == null) {
+             if (aggregator.getStep().isOutputPartial()) {
+               columnBuilder.writeBinary(
+                   new Binary(
+                       serializeTimeValue(getTSDataType(schema.getType()), 
lastTime, true, null)));
+             } else {
+               columnBuilder.appendNull();
+             }
+           } else {
+             if (aggregator.getStep().isOutputPartial()) {
+               columnBuilder.writeBinary(
+                   new Binary(
+                       serializeTimeValue(
+                           getTSDataType(schema.getType()),
+                           lastTime,
+                           false,
+                           new TsPrimitiveType.TsBinary(
+                               new Binary(id, TSFileConfig.STRING_CHARSET)))));
+             } else {
+               columnBuilder.writeBinary(new Binary(id, 
TSFileConfig.STRING_CHARSET));
+             }
+           }
+           break;
+         case ATTRIBUTE:
+           Binary attribute =
+               cachedDeviceEntries
+                   .get(currentHitCacheIndex)
+                   .getAttributeColumnValues()
+                   .get(aggColumnsIndexArray[columnIdx]);
+           if (attribute == null) {
+             if (aggregator.getStep().isOutputPartial()) {
+               columnBuilder.writeBinary(
+                   new Binary(
+                       serializeTimeValue(getTSDataType(schema.getType()), 
lastTime, true, null)));
+             } else {
+               columnBuilder.appendNull();
+             }
+           } else {
+             if (aggregator.getStep().isOutputPartial()) {
+               columnBuilder.writeBinary(
+                   new Binary(
+                       serializeTimeValue(
+                           getTSDataType(schema.getType()),
+                           lastTime,
+                           false,
+                           new TsPrimitiveType.TsBinary(attribute))));
+             } else {
+               columnBuilder.writeBinary(attribute);
+             }
+           }
+           break;
+         case TIME:
+           if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
+             // for last(time) aggregation
+             if (aggregator.getStep().isOutputPartial()) {
+               columnBuilder.writeBinary(
+                   new Binary(
+                       serializeTimeValue(
+                           getTSDataType(schema.getType()),
+                           lastTime,
+                           new TsPrimitiveType.TsLong(lastTime))));
+             } else {
+               columnBuilder.writeTsPrimitiveType(new 
TsPrimitiveType.TsLong(lastTime));
+             }
+           } else {
+             // for last_by(time,time) aggregation
+             if (aggregator.getStep().isOutputPartial()) {
+               columnBuilder.writeBinary(
+                   new Binary(
+                       serializeTimeValue(
+                           getTSDataType(schema.getType()),
+                           lastTime,
+                           false,
+                           new TsPrimitiveType.TsLong(lastTime))));
+             } else {
+               columnBuilder.writeTsPrimitiveType(new 
TsPrimitiveType.TsLong(lastTime));
+             }
+           }
+           break;
+         case FIELD:
+           int measurementIdx = 
aggColumnsIndexArray[aggregatorInputChannels.get(channel)];
+           TsPrimitiveType tsPrimitiveType =
+               
hitCachedResults.get(currentHitCacheIndex).getRight()[measurementIdx];
+           long lastByTime = 
hitCachedResults.get(currentHitCacheIndex).getLeft().getAsLong();
+           if (tsPrimitiveType == EMPTY_PRIMITIVE_TYPE) {
+             // there is no data for this time series
+             if (aggregator.getStep().isOutputPartial()) {
+               columnBuilder.writeBinary(
+                   new Binary(
+                       serializeTimeValue(getTSDataType(schema.getType()), 
lastByTime, true, null)));
+             } else {
+               columnBuilder.appendNull();
+             }
+           } else {
+             if (aggregator.getStep().isOutputPartial()) {
+               columnBuilder.writeBinary(
+                   new Binary(
+                       serializeTimeValue(
+                           getTSDataType(schema.getType()), lastByTime, false, 
tsPrimitiveType)));
+             } else {
+               columnBuilder.writeTsPrimitiveType(tsPrimitiveType);
+             }
+           }
+           break;
+         default:
+           throw new IllegalStateException("Unsupported category: " + 
category);
+       }
+ 
+       channel += aggregator.getChannelCount();
+     }
+ 
+     resultTsBlockBuilder.declarePosition();
+     outputDeviceIndex++;
+     currentHitCacheIndex++;
+   }
+ 
+   private void updateLastCacheIfPossible() {
+     if (!needUpdateCache) {
+       return;
+     }
+ 
+     int channel = 0;
+     List<String> updateMeasurementList = new ArrayList<>();
+     List<TimeValuePair> updateTimeValuePairList = new ArrayList<>();
+     boolean hasSetLastTime = false;
+     for (int i = 0; i < tableAggregators.size(); i++) {
+       TableAggregator tableAggregator = tableAggregators.get(i);
+       ColumnSchema schema = 
aggColumnSchemas.get(aggregatorInputChannels.get(channel));
+ 
+       switch (schema.getColumnCategory()) {
+         case TIME:
+           if (!hasSetLastTime) {
+             hasSetLastTime = true;
+             if (i == lastTimeAggregationIdx) {
+               LastDescAccumulator lastAccumulator =
+                   (LastDescAccumulator) tableAggregator.getAccumulator();
+               if (lastAccumulator.hasInitResult()) {
+                 updateMeasurementList.add("");
+                 updateTimeValuePairList.add(
+                     new TimeValuePair(
+                         lastAccumulator.getMaxTime(),
+                         new 
TsPrimitiveType.TsLong(lastAccumulator.getMaxTime())));
+               }
+             } else {
+               LastByDescAccumulator lastByAccumulator =
+                   (LastByDescAccumulator) tableAggregator.getAccumulator();
+               if (lastByAccumulator.hasInitResult() && 
!lastByAccumulator.isXNull()) {
+                 updateMeasurementList.add("");
+                 updateTimeValuePairList.add(
+                     new TimeValuePair(
+                         lastByAccumulator.getLastTimeOfY(),
+                         new 
TsPrimitiveType.TsLong(lastByAccumulator.getLastTimeOfY())));
+               }
+             }
+           }
+           break;
+         case FIELD:
+           LastByDescAccumulator lastByAccumulator =
+               (LastByDescAccumulator) tableAggregator.getAccumulator();
+           // only can update LastCache when last_by return non-null value
+           if (lastByAccumulator.hasInitResult() && 
!lastByAccumulator.isXNull()) {
+             long lastByTime = lastByAccumulator.getLastTimeOfY();
+ 
+             if (!hasSetLastTime) {
+               hasSetLastTime = true;
+               updateMeasurementList.add("");
+               updateTimeValuePairList.add(
+                   new TimeValuePair(lastByTime, new 
TsPrimitiveType.TsLong(lastByTime)));
+             }
+ 
+             updateMeasurementList.add(schema.getName());
+             updateTimeValuePairList.add(
+                 new TimeValuePair(
+                     lastByTime, 
cloneTsPrimitiveType(lastByAccumulator.getXResult())));
+           }
+           break;
+         default:
+           break;
+       }
+ 
+       channel += tableAggregator.getChannelCount();
+     }
+ 
+     if (!updateMeasurementList.isEmpty()) {
+       String[] updateMeasurementArray = updateMeasurementList.toArray(new 
String[0]);
+       TimeValuePair[] updateTimeValuePairArray =
+           updateTimeValuePairList.toArray(new TimeValuePair[0]);
+       currentDeviceEntry = deviceEntries.get(currentDeviceIndex);
+       TABLE_DEVICE_SCHEMA_CACHE.initOrInvalidateLastCache(
+           dbName, currentDeviceEntry.getDeviceID(), updateMeasurementArray, 
false);
+       TABLE_DEVICE_SCHEMA_CACHE.updateLastCacheIfExists(
+           dbName,
+           currentDeviceEntry.getDeviceID(),
+           updateMeasurementArray,
+           updateTimeValuePairArray);
+     }
+   }
+ 
+   @Override
+   protected void updateResultTsBlock() {
+     appendAggregationResult();
+ 
+     if (timeIterator.hasCachedTimeRange()) {
+       updateLastCacheIfPossible();
+     }
+ 
+     outputDeviceIndex++;
+ 
+     // after appendAggregationResult invoked, aggregators must be cleared
+     resetTableAggregators();
+   }
+ 
++  @Override
++  String getNthIdColumnValue(DeviceEntry deviceEntry, int idColumnIndex) {
++    // +1 for skipping the table name segment
++    return ((String) deviceEntry.getNthSegment(idColumnIndex + 1));
++  }
++
+   private TsPrimitiveType cloneTsPrimitiveType(TsPrimitiveType originalValue) 
{
+     switch (originalValue.getDataType()) {
+       case BOOLEAN:
+         return new TsPrimitiveType.TsBoolean(originalValue.getBoolean());
+       case INT32:
+       case DATE:
+         return new TsPrimitiveType.TsInt(originalValue.getInt());
+       case INT64:
+       case TIMESTAMP:
+         return new TsPrimitiveType.TsLong(originalValue.getLong());
+       case FLOAT:
+         return new TsPrimitiveType.TsFloat(originalValue.getFloat());
+       case DOUBLE:
+         return new TsPrimitiveType.TsDouble(originalValue.getDouble());
+       case TEXT:
+       case BLOB:
+       case STRING:
+         return new TsPrimitiveType.TsBinary(originalValue.getBinary());
+       case VECTOR:
+         return new TsPrimitiveType.TsVector(originalValue.getVector());
+       default:
+         throw new UnSupportedDataTypeException(
+             "Unsupported data type:" + originalValue.getDataType());
+     }
+   }
+ 
+   @Override
+   public long ramBytesUsed() {
+     return INSTANCE_SIZE
+         + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(seriesScanUtil)
+         + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
+         + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceId)
+         + (resultTsBlockBuilder == null ? 0 : 
resultTsBlockBuilder.getRetainedSizeInBytes())
+         + RamUsageEstimator.sizeOfCollection(deviceEntries)
+         + RamUsageEstimator.sizeOfCollection(cachedDeviceEntries)
+         + RamUsageEstimator.sizeOfCollection(hitCachedResults);
+   }
+ }
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeAlignedDeviceViewAggregationScanOperator.java
index 84b9199447f,9ab432c1e35..4bc073e63dc
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeAlignedDeviceViewAggregationScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeAlignedDeviceViewAggregationScanOperator.java
@@@ -17,67 -17,20 +17,26 @@@
   * under the License.
   */
  
 -package org.apache.iotdb.db.queryengine.plan.planner.plan.node.source;
 +package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
  
- import 
org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator;
- import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
- import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator;
 -import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 -import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.IPartitionRelatedNode;
 -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
--import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
- import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
- import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
 +import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
  
 -public abstract class SourceNode extends PlanNode implements AutoCloseable, 
IPartitionRelatedNode {
 +import org.apache.tsfile.file.metadata.IDeviceID;
- import org.apache.tsfile.write.schema.IMeasurementSchema;
- 
- import java.util.List;
- import java.util.Set;
  
 -  protected SourceNode(final PlanNodeId id) {
 -    super(id);
 -  }
 +public class TreeAlignedDeviceViewAggregationScanOperator
-     extends AbstractAggregationTableScanOperator {
++    extends AbstractDefaultAggTableScanOperator {
 +
 +  private final IDeviceID.TreeDeviceIdColumnValueExtractor extractor;
  
 -  public abstract void open() throws Exception;
 +  public TreeAlignedDeviceViewAggregationScanOperator(
-       PlanNodeId sourceId,
-       OperatorContext context,
-       List<ColumnSchema> aggColumnSchemas,
-       int[] aggColumnsIndexArray,
-       List<DeviceEntry> deviceEntries,
-       SeriesScanOptions seriesScanOptions,
-       List<String> measurementColumnNames,
-       Set<String> allSensors,
-       List<IMeasurementSchema> measurementSchemas,
-       List<TableAggregator> tableAggregators,
-       List<ColumnSchema> groupingKeySchemas,
-       int[] groupingKeyIndex,
-       ITableTimeRangeIterator tableTimeRangeIterator,
-       boolean ascending,
-       boolean canUseStatistics,
-       List<Integer> aggregatorInputChannels,
++      AbstractAggTableScanOperatorParameter parameter,
 +      IDeviceID.TreeDeviceIdColumnValueExtractor extractor) {
-     super(
-         sourceId,
-         context,
-         aggColumnSchemas,
-         aggColumnsIndexArray,
-         deviceEntries,
-         seriesScanOptions,
-         measurementColumnNames,
-         allSensors,
-         measurementSchemas,
-         tableAggregators,
-         groupingKeySchemas,
-         groupingKeyIndex,
-         tableTimeRangeIterator,
-         ascending,
-         canUseStatistics,
-         aggregatorInputChannels);
++    super(parameter);
 +    this.extractor = extractor;
 +  }
  
 -  public abstract void setRegionReplicaSet(TRegionReplicaSet 
regionReplicaSet);
 +  @Override
 +  String getNthIdColumnValue(DeviceEntry deviceEntry, int idColumnIndex) {
 +    return (String) extractor.extract(deviceEntry.getDeviceID(), 
idColumnIndex);
 +  }
  }
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 9a7ef7d8c62,741e05a64bd..d1cf042ef52
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@@ -20,9 -20,8 +20,10 @@@
  package org.apache.iotdb.db.queryengine.plan.planner;
  
  import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+ import org.apache.iotdb.commons.conf.CommonDescriptor;
 +import org.apache.iotdb.commons.exception.IllegalPathException;
  import org.apache.iotdb.commons.path.AlignedFullPath;
 +import org.apache.iotdb.commons.path.PartialPath;
  import org.apache.iotdb.commons.schema.column.ColumnHeader;
  import org.apache.iotdb.db.conf.IoTDBDescriptor;
  import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
@@@ -72,15 -72,17 +74,21 @@@ import org.apache.iotdb.db.queryengine.
  import 
org.apache.iotdb.db.queryengine.execution.operator.schema.source.DevicePredicateFilter;
  import 
org.apache.iotdb.db.queryengine.execution.operator.schema.source.SchemaSourceFactory;
  import 
org.apache.iotdb.db.queryengine.execution.operator.sink.IdentitySinkOperator;
++import 
org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator;
  import 
org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator;
+ import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractAggTableScanOperator;
 +import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator;
+ import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.DefaultAggTableScanOperator;
  import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.InformationSchemaTableScanOperator;
+ import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.LastQueryAggTableScanOperator;
  import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.MergeSortFullOuterJoinOperator;
  import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.MergeSortInnerJoinOperator;
- import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableAggregationTableScanOperator;
  import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator;
++import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeAlignedDeviceViewAggregationScanOperator;
 +import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeAlignedDeviceViewScanOperator;
  import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationOperator;
+ import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastByDescAccumulator;
+ import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastDescAccumulator;
  import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAccumulator;
  import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator;
  import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedAccumulator;
@@@ -160,10 -162,7 +171,11 @@@ import org.apache.tsfile.block.column.C
  import org.apache.tsfile.common.conf.TSFileConfig;
  import org.apache.tsfile.common.conf.TSFileDescriptor;
  import org.apache.tsfile.enums.TSDataType;
 +import org.apache.tsfile.file.metadata.IDeviceID;
 +import org.apache.tsfile.file.metadata.idcolumn.FourOrHigherLevelDBExtractor;
 +import org.apache.tsfile.file.metadata.idcolumn.ThreeLevelDBExtractor;
 +import org.apache.tsfile.file.metadata.idcolumn.TwoLevelDBExtractor;
+ import org.apache.tsfile.read.TimeValuePair;
  import org.apache.tsfile.read.common.TimeRange;
  import org.apache.tsfile.read.common.block.column.BinaryColumn;
  import org.apache.tsfile.read.common.block.column.BooleanColumn;
@@@ -327,94 -330,8 +343,95 @@@ public class TableOperatorGenerator ext
    }
  
    @Override
 -  public Operator visitDeviceTableScan(
 -      DeviceTableScanNode node, LocalExecutionPlanContext context) {
 +  public Operator visitTreeNonAlignedDeviceViewScan(
 +      TreeNonAlignedDeviceViewScanNode node, LocalExecutionPlanContext 
context) {
 +    throw new UnsupportedOperationException(
 +        "view for non aligned devices in tree is not supported");
 +  }
 +
 +  public static IDeviceID.TreeDeviceIdColumnValueExtractor 
createTreeDeviceIdColumnValueExtractor(
 +      String treeDBName) {
 +    try {
 +      PartialPath db = new PartialPath(treeDBName);
 +      int dbLevel = db.getNodes().length;
 +      if (dbLevel == 2) {
 +        return new TwoLevelDBExtractor(treeDBName.length());
 +      } else if (dbLevel == 3) {
 +        return new ThreeLevelDBExtractor(treeDBName.length());
 +      } else if (dbLevel >= 4) {
 +        return new FourOrHigherLevelDBExtractor(dbLevel);
 +      } else {
 +        throw new IllegalArgumentException(
 +            "tree db name should at least be two level: " + treeDBName);
 +      }
 +    } catch (IllegalPathException e) {
 +      throw new IllegalArgumentException(e);
 +    }
 +  }
 +
 +  @Override
 +  public Operator visitTreeAlignedDeviceViewScan(
 +      TreeAlignedDeviceViewScanNode node, LocalExecutionPlanContext context) {
 +
 +    IDeviceID.TreeDeviceIdColumnValueExtractor idColumnValueExtractor =
 +        createTreeDeviceIdColumnValueExtractor(node.getTreeDBName());
 +
 +    AbstractTableScanOperator.AbstractTableScanOperatorParameter parameter =
 +        constructAbstractTableScanOperatorParameter(
 +            node,
 +            context,
 +            TreeAlignedDeviceViewScanOperator.class.getSimpleName(),
 +            node.getMeasurementColumnNameMap());
 +
 +    TreeAlignedDeviceViewScanOperator treeAlignedDeviceViewScanOperator =
 +        new TreeAlignedDeviceViewScanOperator(parameter, 
idColumnValueExtractor);
 +
 +    addSource(
 +        treeAlignedDeviceViewScanOperator,
 +        context,
 +        node,
 +        parameter.measurementColumnNames,
 +        parameter.measurementSchemas,
-         parameter.allSensors);
++        parameter.allSensors,
++        TreeAlignedDeviceViewScanNode.class.getSimpleName());
 +
 +    return treeAlignedDeviceViewScanOperator;
 +  }
 +
 +  private void addSource(
-       AbstractTableScanOperator tableScanOperator,
++      AbstractDataSourceOperator sourceOperator,
 +      LocalExecutionPlanContext context,
 +      DeviceTableScanNode node,
 +      List<String> measurementColumnNames,
 +      List<IMeasurementSchema> measurementSchemas,
-       Set<String> allSensors) {
++      Set<String> allSensors,
++      String planNodeName) {
 +
-     ((DataDriverContext) 
context.getDriverContext()).addSourceOperator(tableScanOperator);
++    ((DataDriverContext) 
context.getDriverContext()).addSourceOperator(sourceOperator);
 +
 +    for (int i = 0, size = node.getDeviceEntries().size(); i < size; i++) {
 +      if (node.getDeviceEntries().get(i) == null) {
 +        throw new IllegalStateException(
-             "Device entries of index " + i + " in DeviceTableScanNode is 
empty");
++            "Device entries of index " + i + " in " + planNodeName + " is 
empty");
 +      }
 +      AlignedFullPath alignedPath =
 +          constructAlignedPath(
 +              node.getDeviceEntries().get(i),
 +              measurementColumnNames,
 +              measurementSchemas,
 +              allSensors);
 +      ((DataDriverContext) context.getDriverContext()).addPath(alignedPath);
 +    }
 +
 +    context.getDriverContext().setInputDriver(true);
 +  }
 +
-   // fieldColumnsRenameMap will always
 +  private AbstractTableScanOperator.AbstractTableScanOperatorParameter
 +      constructAbstractTableScanOperatorParameter(
 +          DeviceTableScanNode node,
 +          LocalExecutionPlanContext context,
 +          String className,
 +          Map<String, String> fieldColumnsRenameMap) {
  
      List<Symbol> outputColumnNames = node.getOutputSymbols();
      int outputColumnCount = outputColumnNames.size();
@@@ -440,16 -357,12 +457,16 @@@
                    idAndAttributeColumnsIndexMap.get(columnName), columnName + 
" is null");
            columnSchemas.add(schema);
            break;
-         case MEASUREMENT:
+         case FIELD:
            columnsIndexArray[idx++] = measurementColumnCount;
            measurementColumnCount++;
 -          measurementColumnNames.add(schema.getName());
 +
 +          String realMeasurementName =
 +              fieldColumnsRenameMap.getOrDefault(schema.getName(), 
schema.getName());
 +
 +          measurementColumnNames.add(realMeasurementName);
            measurementSchemas.add(
 -              new MeasurementSchema(schema.getName(), 
getTSDataType(schema.getType())));
 +              new MeasurementSchema(realMeasurementName, 
getTSDataType(schema.getType())));
            columnSchemas.add(schema);
            measurementColumnsIndexMap.put(columnName.getName(), 
measurementColumnCount - 1);
            break;
@@@ -466,16 -379,12 +483,15 @@@
  
      Set<Symbol> outputSet = new HashSet<>(outputColumnNames);
      for (Map.Entry<Symbol, ColumnSchema> entry : 
node.getAssignments().entrySet()) {
-       if (!outputSet.contains(entry.getKey())
-           && entry.getValue().getColumnCategory() == MEASUREMENT) {
+       if (!outputSet.contains(entry.getKey()) && 
entry.getValue().getColumnCategory() == FIELD) {
          measurementColumnCount++;
 -        measurementColumnNames.add(entry.getValue().getName());
 +        String realMeasurementName =
 +            fieldColumnsRenameMap.getOrDefault(
 +                entry.getValue().getName(), entry.getValue().getName());
 +
 +        measurementColumnNames.add(realMeasurementName);
          measurementSchemas.add(
 -            new MeasurementSchema(
 -                entry.getValue().getName(), 
getTSDataType(entry.getValue().getType())));
 +            new MeasurementSchema(realMeasurementName, 
getTSDataType(entry.getValue().getType())));
          measurementColumnsIndexMap.put(entry.getKey().getName(), 
measurementColumnCount - 1);
        } else if (entry.getValue().getColumnCategory() == TIME) {
          timeColumnName = entry.getKey().getName();
@@@ -511,45 -423,33 +527,46 @@@
      Set<String> allSensors = new HashSet<>(measurementColumnNames);
      // for time column
      allSensors.add("");
 -    TableScanOperator tableScanOperator =
 -        new TableScanOperator(
 -            operatorContext,
 -            node.getPlanNodeId(),
 -            columnSchemas,
 -            columnsIndexArray,
 -            node.getDeviceEntries(),
 -            node.getScanOrder(),
 -            seriesScanOptions,
 -            measurementColumnNames,
 -            allSensors,
 -            measurementSchemas,
 -            maxTsBlockLineNum);
  
 -    ((DataDriverContext) 
context.getDriverContext()).addSourceOperator(tableScanOperator);
 +    return new AbstractTableScanOperator.AbstractTableScanOperatorParameter(
 +        allSensors,
 +        operatorContext,
 +        node.getPlanNodeId(),
 +        columnSchemas,
 +        columnsIndexArray,
 +        node.getDeviceEntries(),
 +        node.getScanOrder(),
 +        seriesScanOptions,
 +        measurementColumnNames,
 +        measurementSchemas,
 +        maxTsBlockLineNum);
 +  }
  
 -    for (int i = 0, size = node.getDeviceEntries().size(); i < size; i++) {
 -      AlignedFullPath alignedPath =
 -          constructAlignedPath(
 -              node.getDeviceEntries().get(i),
 -              measurementColumnNames,
 -              measurementSchemas,
 -              allSensors);
 -      ((DataDriverContext) context.getDriverContext()).addPath(alignedPath);
 -    }
 +  // used for TableScanOperator
 +  private AbstractTableScanOperator.AbstractTableScanOperatorParameter
 +      constructAbstractTableScanOperatorParameter(
 +          DeviceTableScanNode node, LocalExecutionPlanContext context) {
 +    return constructAbstractTableScanOperatorParameter(
 +        node, context, TableScanOperator.class.getSimpleName(), 
Collections.emptyMap());
 +  }
  
 -    context.getDriverContext().setInputDriver(true);
 +  @Override
 +  public Operator visitDeviceTableScan(
 +      DeviceTableScanNode node, LocalExecutionPlanContext context) {
 +
 +    AbstractTableScanOperator.AbstractTableScanOperatorParameter parameter =
 +        constructAbstractTableScanOperatorParameter(node, context);
 +
 +    TableScanOperator tableScanOperator = new TableScanOperator(parameter);
 +
 +    addSource(
 +        tableScanOperator,
 +        context,
 +        node,
 +        parameter.measurementColumnNames,
 +        parameter.measurementSchemas,
-         parameter.allSensors);
++        parameter.allSensors,
++        DeviceTableScanNode.class.getSimpleName());
  
      return tableScanOperator;
    }
@@@ -1855,16 -1755,8 +1872,38 @@@
    }
  
    @Override
 -  public Operator visitAggregationTableScan(
 -      AggregationTableScanNode node, LocalExecutionPlanContext context) {
 +  public Operator visitAggregationTreeDeviceViewScan(
 +      AggregationTreeDeviceViewScanNode node, LocalExecutionPlanContext 
context) {
 +    IDeviceID.TreeDeviceIdColumnValueExtractor idColumnValueExtractor =
 +        createTreeDeviceIdColumnValueExtractor(node.getTreeDBName());
-     return super.visitAggregationTreeDeviceViewScan(node, context);
++
++    AbstractAggTableScanOperator.AbstractAggTableScanOperatorParameter 
parameter =
++        constructAbstractAggTableScanOperatorParameter(
++            node,
++            context,
++            
TreeAlignedDeviceViewAggregationScanOperator.class.getSimpleName(),
++            node.getMeasurementColumnNameMap());
++
++    TreeAlignedDeviceViewAggregationScanOperator 
treeAlignedDeviceViewAggregationScanOperator =
++        new TreeAlignedDeviceViewAggregationScanOperator(parameter, 
idColumnValueExtractor);
++
++    addSource(
++        treeAlignedDeviceViewAggregationScanOperator,
++        context,
++        node,
++        parameter.getMeasurementColumnNames(),
++        parameter.getMeasurementSchemas(),
++        parameter.getAllSensors(),
++        AggregationTreeDeviceViewScanNode.class.getSimpleName());
++    return treeAlignedDeviceViewAggregationScanOperator;
 +  }
 +
-   @Override
-   public Operator visitAggregationTableScan(
-       AggregationTableScanNode node, LocalExecutionPlanContext context) {
++  private AbstractAggTableScanOperator.AbstractAggTableScanOperatorParameter
++      constructAbstractAggTableScanOperatorParameter(
++          AggregationTableScanNode node,
++          LocalExecutionPlanContext context,
++          String className,
++          Map<String, String> fieldColumnsRenameMap) {
  
      List<String> measurementColumnNames = new ArrayList<>();
      List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
@@@ -1903,12 -1795,12 +1942,14 @@@
                aggColumnsIndexArray[channel] =
                    
requireNonNull(node.getIdAndAttributeIndexMap().get(symbol), symbol + " is 
null");
                break;
-             case MEASUREMENT:
+             case FIELD:
                aggColumnsIndexArray[channel] = measurementColumnCount;
                measurementColumnCount++;
--              measurementColumnNames.add(schema.getName());
++              String realMeasurementName =
++                  fieldColumnsRenameMap.getOrDefault(schema.getName(), 
schema.getName());
++              measurementColumnNames.add(realMeasurementName);
                measurementSchemas.add(
--                  new MeasurementSchema(schema.getName(), 
getTSDataType(schema.getType())));
++                  new MeasurementSchema(realMeasurementName, 
getTSDataType(schema.getType())));
                measurementColumnsIndexMap.put(symbol.getName(), 
measurementColumnCount - 1);
                break;
              case TIME:
@@@ -1931,12 -1823,12 +1972,14 @@@
  
      for (Map.Entry<Symbol, ColumnSchema> entry : 
node.getAssignments().entrySet()) {
        if (!aggColumnLayout.containsKey(entry.getKey())
-           && entry.getValue().getColumnCategory() == MEASUREMENT) {
+           && entry.getValue().getColumnCategory() == FIELD) {
          measurementColumnCount++;
--        measurementColumnNames.add(entry.getValue().getName());
++        String realMeasurementName =
++            fieldColumnsRenameMap.getOrDefault(
++                entry.getValue().getName(), entry.getValue().getName());
++        measurementColumnNames.add(realMeasurementName);
          measurementSchemas.add(
--            new MeasurementSchema(
--                entry.getValue().getName(), 
getTSDataType(entry.getValue().getType())));
++            new MeasurementSchema(realMeasurementName, 
getTSDataType(entry.getValue().getType())));
          measurementColumnsIndexMap.put(entry.getKey().getName(), 
measurementColumnCount - 1);
        } else if (entry.getValue().getColumnCategory() == TIME) {
          timeColumnName = entry.getKey().getName();
@@@ -2007,10 -1899,10 +2050,7 @@@
      final OperatorContext operatorContext =
          context
              .getDriverContext()
--            .addOperatorContext(
--                context.getNextOperatorId(),
--                node.getPlanNodeId(),
-                 TableAggregationTableScanOperator.class.getSimpleName());
 -                AbstractAggTableScanOperator.class.getSimpleName());
++            .addOperatorContext(context.getNextOperatorId(), 
node.getPlanNodeId(), className);
      SeriesScanOptions seriesScanOptions =
          buildSeriesScanOptions(
              context,
@@@ -2026,43 -1918,124 +2066,136 @@@
  
      Set<String> allSensors = new HashSet<>(measurementColumnNames);
      allSensors.add(""); // for time column
+     context.getDriverContext().setInputDriver(true);
+ 
 -    if (canUseLastCacheOptimize(aggregators, node, timeColumnName)) {
 -      List<Integer> hitCachesIndexes = new ArrayList<>();
 -      List<Pair<OptionalLong, TsPrimitiveType[]>> hitCachedResults = new 
ArrayList<>();
 -      List<DeviceEntry> cachedDeviceEntries = new ArrayList<>();
 -      List<DeviceEntry> unCachedDeviceEntries = new ArrayList<>();
 -      long tableTTL =
 -          DataNodeTTLCache.getInstance()
 -              .getTTLForTable(
++    return new 
AbstractAggTableScanOperator.AbstractAggTableScanOperatorParameter(
++        node.getPlanNodeId(),
++        operatorContext,
++        aggColumnSchemas,
++        aggColumnsIndexArray,
++        node.getDeviceEntries(),
++        node.getDeviceEntries().size(),
++        seriesScanOptions,
++        measurementColumnNames,
++        allSensors,
++        measurementSchemas,
++        aggregators,
++        groupingKeySchemas,
++        groupingKeyIndex,
++        timeRangeIterator,
++        scanAscending,
++        canUseStatistic,
++        aggregatorInputChannels,
++        timeColumnName);
++  }
++
++  // used for AggregationTableScanNode
++  private AbstractAggTableScanOperator.AbstractAggTableScanOperatorParameter
++      constructAbstractAggTableScanOperatorParameter(
++          AggregationTableScanNode node, LocalExecutionPlanContext context) {
++    return constructAbstractAggTableScanOperatorParameter(
++        node, context, AbstractAggTableScanOperator.class.getSimpleName(), 
Collections.emptyMap());
++  }
++
++  @Override
++  public Operator visitAggregationTableScan(
++      AggregationTableScanNode node, LocalExecutionPlanContext context) {
++
++    AbstractAggTableScanOperator.AbstractAggTableScanOperatorParameter 
parameter =
++        constructAbstractAggTableScanOperatorParameter(node, context);
 +
++    if (canUseLastCacheOptimize(
++        parameter.getTableAggregators(), node, 
parameter.getTimeColumnName())) {
++      return constructLastQueryAggTableScanOperator(node, parameter, context);
++    } else {
++      DefaultAggTableScanOperator aggTableScanOperator = new 
DefaultAggTableScanOperator(parameter);
++
++      addSource(
++          aggTableScanOperator,
++          context,
++          node,
++          parameter.getMeasurementColumnNames(),
++          parameter.getMeasurementSchemas(),
++          parameter.getAllSensors(),
++          AggregationTableScanNode.class.getSimpleName());
++      return aggTableScanOperator;
++    }
++  }
++
++  private LastQueryAggTableScanOperator 
constructLastQueryAggTableScanOperator(
++      AggregationTableScanNode node,
++      AbstractAggTableScanOperator.AbstractAggTableScanOperatorParameter 
parameter,
++      LocalExecutionPlanContext context) {
++    List<Integer> hitCachesIndexes = new ArrayList<>();
++    List<Pair<OptionalLong, TsPrimitiveType[]>> hitCachedResults = new 
ArrayList<>();
++    List<DeviceEntry> cachedDeviceEntries = new ArrayList<>();
++    List<DeviceEntry> unCachedDeviceEntries = new ArrayList<>();
++    long tableTTL =
++        DataNodeTTLCache.getInstance()
++            .getTTLForTable(
++                node.getQualifiedObjectName().getDatabaseName(),
++                node.getQualifiedObjectName().getObjectName());
++    Filter updateTimeFilter =
++        
updateFilterUsingTTL(parameter.getSeriesScanOptions().getGlobalTimeFilter(), 
tableTTL);
 +    for (int i = 0; i < node.getDeviceEntries().size(); i++) {
-       if (node.getDeviceEntries().get(i) == null) {
-         throw new IllegalStateException(
-             "Device entries of index " + i + " in AggregationTableScanNode is 
empty");
++      Optional<Pair<OptionalLong, TsPrimitiveType[]>> lastByResult =
++          TableDeviceSchemaCache.getInstance()
++              .getLastRow(
+                   node.getQualifiedObjectName().getDatabaseName(),
 -                  node.getQualifiedObjectName().getObjectName());
 -      Filter updateTimeFilter =
 -          updateFilterUsingTTL(seriesScanOptions.getGlobalTimeFilter(), 
tableTTL);
 -      for (int i = 0; i < node.getDeviceEntries().size(); i++) {
 -        Optional<Pair<OptionalLong, TsPrimitiveType[]>> lastByResult =
 -            TableDeviceSchemaCache.getInstance()
 -                .getLastRow(
 -                    node.getQualifiedObjectName().getDatabaseName(),
 -                    node.getDeviceEntries().get(i).getDeviceID(),
 -                    "",
 -                    measurementColumnNames);
 -        boolean allHitCache = true;
 -        if (lastByResult.isPresent() && 
lastByResult.get().getLeft().isPresent()) {
 -          for (int j = 0; j < lastByResult.get().getRight().length; j++) {
 -            TsPrimitiveType tsPrimitiveType = 
lastByResult.get().getRight()[j];
 -            if (tsPrimitiveType == null
 -                || (updateTimeFilter != null
 -                    && !LastQueryUtil.satisfyFilter(
 -                        updateTimeFilter,
 -                        new TimeValuePair(
 -                            lastByResult.get().getLeft().getAsLong(), 
tsPrimitiveType)))) {
 -              // the process logic is different from tree model which examine 
if
 -              // `isFilterGtOrGe(seriesScanOptions.getGlobalTimeFilter())`, 
set
 -              // `lastByResult.get().getRight()[j] = EMPTY_PRIMITIVE_TYPE`,
 -              // but it should skip in table model
 -              allHitCache = false;
 -              break;
 -            }
++                  node.getDeviceEntries().get(i).getDeviceID(),
++                  "",
++                  parameter.getMeasurementColumnNames());
++      boolean allHitCache = true;
++      if (lastByResult.isPresent() && 
lastByResult.get().getLeft().isPresent()) {
++        for (int j = 0; j < lastByResult.get().getRight().length; j++) {
++          TsPrimitiveType tsPrimitiveType = lastByResult.get().getRight()[j];
++          if (tsPrimitiveType == null
++              || (updateTimeFilter != null
++                  && !LastQueryUtil.satisfyFilter(
++                      updateTimeFilter,
++                      new TimeValuePair(
++                          lastByResult.get().getLeft().getAsLong(), 
tsPrimitiveType)))) {
++            // the process logic is different from tree model which examine if
++            // `isFilterGtOrGe(seriesScanOptions.getGlobalTimeFilter())`, set
++            // `lastByResult.get().getRight()[j] = EMPTY_PRIMITIVE_TYPE`,
++            // but it should skip in table model
++            allHitCache = false;
++            break;
+           }
 -        } else {
 -          allHitCache = false;
 -        }
 -
 -        if (!allHitCache) {
 -          AlignedFullPath alignedPath =
 -              constructAlignedPath(
 -                  node.getDeviceEntries().get(i),
 -                  measurementColumnNames,
 -                  measurementSchemas,
 -                  allSensors);
 -          ((DataDriverContext) 
context.getDriverContext()).addPath(alignedPath);
 -          unCachedDeviceEntries.add(node.getDeviceEntries().get(i));
 -        } else {
 -          hitCachesIndexes.add(i);
 -          hitCachedResults.add(lastByResult.get());
 -          cachedDeviceEntries.add(node.getDeviceEntries().get(i));
+         }
++      } else {
++        allHitCache = false;
+       }
+ 
 -      // context add TableLastQueryOperator
 -      LastQueryAggTableScanOperator lastQueryOperator =
 -          new LastQueryAggTableScanOperator(
 -              node.getPlanNodeId(),
 -              operatorContext,
 -              aggColumnSchemas,
 -              aggColumnsIndexArray,
 -              unCachedDeviceEntries,
 -              cachedDeviceEntries,
 -              seriesScanOptions,
 -              measurementColumnNames,
 -              allSensors,
 -              measurementSchemas,
 -              aggregators,
 -              groupingKeySchemas,
 -              groupingKeyIndex,
 -              timeRangeIterator,
 -              scanAscending,
 -              canUseStatistic,
 -              aggregatorInputChannels,
 -              node.getQualifiedObjectName(),
 -              hitCachesIndexes,
 -              hitCachedResults);
 -
 -      ((DataDriverContext) 
context.getDriverContext()).addSourceOperator(lastQueryOperator);
 -      return lastQueryOperator;
 -    } else {
 -      DefaultAggTableScanOperator aggTableScanOperator =
 -          new DefaultAggTableScanOperator(
 -              node.getPlanNodeId(),
 -              operatorContext,
 -              aggColumnSchemas,
 -              aggColumnsIndexArray,
 -              node.getDeviceEntries(),
 -              node.getDeviceEntries().size(),
 -              seriesScanOptions,
 -              measurementColumnNames,
 -              allSensors,
 -              measurementSchemas,
 -              aggregators,
 -              groupingKeySchemas,
 -              groupingKeyIndex,
 -              timeRangeIterator,
 -              scanAscending,
 -              canUseStatistic,
 -              aggregatorInputChannels);
 -      for (int i = 0; i < node.getDeviceEntries().size(); i++) {
++      if (!allHitCache) {
+         AlignedFullPath alignedPath =
+             constructAlignedPath(
+                 node.getDeviceEntries().get(i),
 -                measurementColumnNames,
 -                measurementSchemas,
 -                allSensors);
++                parameter.getMeasurementColumnNames(),
++                parameter.getMeasurementSchemas(),
++                parameter.getAllSensors());
+         ((DataDriverContext) context.getDriverContext()).addPath(alignedPath);
++        unCachedDeviceEntries.add(node.getDeviceEntries().get(i));
++      } else {
++        hitCachesIndexes.add(i);
++        hitCachedResults.add(lastByResult.get());
++        cachedDeviceEntries.add(node.getDeviceEntries().get(i));
        }
-       AlignedFullPath alignedPath =
-           constructAlignedPath(
-               node.getDeviceEntries().get(i),
-               measurementColumnNames,
-               measurementSchemas,
-               allSensors);
-       ((DataDriverContext) context.getDriverContext()).addPath(alignedPath);
 -      ((DataDriverContext) 
context.getDriverContext()).addSourceOperator(aggTableScanOperator);
 -      return aggTableScanOperator;
      }
 +
-     context.getDriverContext().setInputDriver(true);
++    parameter.setDeviceEntries(unCachedDeviceEntries);
 +
-     TableAggregationTableScanOperator aggTableScanOperator =
-         new TableAggregationTableScanOperator(
-             node.getPlanNodeId(),
-             operatorContext,
-             aggColumnSchemas,
-             aggColumnsIndexArray,
-             node.getDeviceEntries(),
-             seriesScanOptions,
-             measurementColumnNames,
-             allSensors,
-             measurementSchemas,
-             aggregators,
-             groupingKeySchemas,
-             groupingKeyIndex,
-             timeRangeIterator,
-             scanAscending,
-             canUseStatistic,
-             aggregatorInputChannels);
-     ((DataDriverContext) 
context.getDriverContext()).addSourceOperator(aggTableScanOperator);
-     return aggTableScanOperator;
++    // context add TableLastQueryOperator
++    LastQueryAggTableScanOperator lastQueryOperator =
++        new LastQueryAggTableScanOperator(
++            parameter,
++            cachedDeviceEntries,
++            node.getQualifiedObjectName(),
++            hitCachesIndexes,
++            hitCachedResults);
++
++    ((DataDriverContext) 
context.getDriverContext()).addSourceOperator(lastQueryOperator);
++    return lastQueryOperator;
    }
  
    private SeriesScanOptions buildSeriesScanOptions(
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 7eb324be575,470d616f44d..53d6516e2e3
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@@ -960,19 -809,9 +960,19 @@@ public class TableDistributedPlanGenera
          break;
        }
        if (deviceTableScanNode.getAssignments().get(symbol).getColumnCategory()
-           == TsTableColumnCategory.ID) {
+           == TsTableColumnCategory.TAG) {
 -        // segments[0] is always tableName
 -        orderingRules.add(deviceEntry -> (String) 
deviceEntry.getNthSegment(idx + 1));
 +
++        // segments[0] is always tableName for table model
 +        Function<DeviceEntry, String> iDColumnFunction =
 +            extractor
 +                .<Function<DeviceEntry, String>>map(
 +                    treeDeviceIdColumnValueExtractor ->
 +                        deviceEntry ->
 +                            (String)
 +                                treeDeviceIdColumnValueExtractor.extract(
 +                                    deviceEntry.getDeviceID(), idx))
 +                .orElseGet(() -> deviceEntry -> (String) 
deviceEntry.getNthSegment(idx + 1));
-         // segments[0] is always tableName for table model
 +        orderingRules.add(iDColumnFunction);
        } else {
          orderingRules.add(
              deviceEntry ->

Reply via email to