This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch TreeToTableView in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 57029e8dfc087374eeb6ce95b0a8e9b62c63f9f9 Merge: c34ea18e34a 90e7a809d02 Author: JackieTien97 <[email protected]> AuthorDate: Tue Dec 31 19:33:58 2024 +0800 resolve conflicts .github/workflows/cluster-it-1c1d.yml | 2 +- .github/workflows/cluster-it-1c1d1a.yml | 4 +- .github/workflows/cluster-it-1c3d.yml | 4 +- .github/workflows/compile-check.yml | 4 +- .github/workflows/dependency-check.yml | 4 +- .github/workflows/multi-language-client.yml | 4 +- .github/workflows/pipe-it-2cluster.yml | 4 +- .github/workflows/sonar-codecov.yml | 4 +- .github/workflows/table-cluster-it-1c1d.yml | 6 +- .github/workflows/table-cluster-it-1c3d.yml | 4 +- .github/workflows/todos-check.yml | 2 +- .github/workflows/unit-test.yml | 6 +- example/client-cpp-example/pom.xml | 4 + example/client-cpp-example/src/CMakeLists.txt | 3 + .../src/TableModelSessionExample.cpp | 129 +-- .../java/org/apache/iotdb/TableHttpExample.java | 4 +- .../java/org/apache/iotdb/TableHttpsExample.java | 4 +- .../org/apache/iotdb/TableModelSessionExample.java | 14 +- .../apache/iotdb/TableModelSessionPoolExample.java | 28 +- .../apache/iotdb/udf/AggregateFunctionExample.java | 2 +- .../apache/iotdb/udf/ScalarFunctionExample.java | 2 +- integration-test/pom.xml | 44 +- .../org/apache/iotdb/itbase/category/ManualIT.java | 9 +- .../org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java | 73 +- .../iotdb/db/it/schema/IoTDBCreateDatabaseIT.java | 193 ++++ .../db/it/schema/IoTDBCreateStorageGroupIT.java | 153 ---- .../it/autocreate/IoTDBPipeAutoConflictIT.java | 51 ++ .../manual/IoTDBPipeTypeConversionISessionIT.java | 2 +- .../IoTDBPipeTypeConversionISessionIT.java | 498 +++++++++++ .../it/tablemodel/IoTDBPipeTypeConversionIT.java | 632 ++++++++++++++ .../pipe/it/tablemodel/IoTDBPipeWithLoadIT.java | 58 +- .../iotdb/pipe/it/tablemodel/TableModelUtils.java | 20 +- .../it/db/it/IoTDBCaseWhenThenTableIT.java | 6 +- .../relational/it/db/it/IoTDBDeletionTableIT.java | 968 +++++++++++++++++++-- .../it/db/it/IoTDBDisableDeletionTableIT.java | 120 --- .../it/db/it/IoTDBExecuteBatchTableIT.java | 26 +- .../it/db/it/IoTDBFlushQueryTableIT.java | 30 +- .../it/db/it/IoTDBInsertAlignedValuesTableIT.java | 122 ++- .../relational/it/db/it/IoTDBInsertTableIT.java | 267 +++--- .../it/db/it/IoTDBMultiDeviceTableIT.java | 90 +- ...va => IoTDBMultiTAGsWithAttributesTableIT.java} | 135 ++- .../relational/it/db/it/IoTDBRecoverTableIT.java | 16 +- .../it/db/it/IoTDBRecoverUnclosedTableIT.java | 16 +- .../relational/it/db/it/IoTDBRestartTableIT.java | 51 +- .../udf/IoTDBUserDefinedAggregateFunctionIT.java | 2 +- .../it/udf/IoTDBUserDefinedScalarFunctionIT.java | 4 +- .../it/query/old/IoTDBDatetimeFormatTableIT.java | 2 +- .../it/query/old/IoTDBFilterBetweenTableIT.java | 2 +- .../it/query/old/IoTDBFilterNullTableIT.java | 2 +- .../it/query/old/IoTDBFilterTableIT.java | 10 +- .../it/query/old/IoTDBNestedQueryTableIT.java | 6 +- .../it/query/old/IoTDBSimpleQueryTableIT.java | 39 +- .../alignbydevice/IoTDBAlignByDeviceTableIT.java | 2 +- .../IoTDBAlignByDeviceWithTemplateTableIT.java | 2 +- ...oTDBOrderByLimitOffsetAlignByDeviceTableIT.java | 4 +- .../IoTDBOrderByWithAlignByDeviceTableIT.java | 6 +- .../IoTDBAlignedOffsetLimitPushDownTableIT.java | 2 +- .../it/query/old/aligned/TableUtils.java | 2 +- .../scalar/IoTDBCastFunctionTableIT.java | 6 +- .../scalar/IoTDBCastFunctionTableSpecialIT.java | 2 +- .../scalar/IoTDBDiffFunctionTableIT.java | 2 +- .../scalar/IoTDBReplaceFunctionTableIT.java | 2 +- .../scalar/IoTDBRoundFunctionTableIT.java | 2 +- .../scalar/IoTDBScalarFunctionTableIT.java | 62 +- .../scalar/IoTDBSubStringFunctionTableIT.java | 2 +- .../it/query/old/orderBy/IoTDBOrderByTableIT.java | 36 +- .../it/query/old/query/IoTDBArithmeticTableIT.java | 4 +- .../it/query/old/query/IoTDBFuzzyQueryTableIT.java | 2 +- .../it/query/old/query/IoTDBInTableIT.java | 4 +- ...oTDBNoSelectExpressionAfterAnalyzedTableIT.java | 2 +- .../query/old/query/IoTDBNullOperandTableIT.java | 2 +- .../it/query/old/query/IoTDBPaginationTableIT.java | 4 +- .../it/query/old/query/IoTDBQueryDemoTableIT.java | 2 +- .../IoTDBQueryWithComplexValueFilterTableIT.java | 2 +- .../it/query/old/query/IoTDBResultSetTableIT.java | 6 +- .../query/IoTDBSelectCompareExpressionTableIT.java | 2 +- .../it/query/recent/IoTDBFillTableIT.java | 6 +- .../it/query/recent/IoTDBGapFillTableIT.java | 2 +- .../it/query/recent/IoTDBNullIdQueryIT.java | 4 +- .../it/query/recent/IoTDBTableAggregationIT.java | 32 +- .../recent/subquery/SubqueryDataSetUtils.java | 4 +- .../it/rest/it/IoTDBRestServiceCaseWhenThenIT.java | 6 +- .../it/rest/it/IoTDBRestServiceFlushQueryIT.java | 34 +- .../relational/it/rest/it/IoTDBRestServiceIT.java | 54 +- .../it/IoTDBRestServiceInsertAlignedValuesIT.java | 67 +- .../relational/it/schema/IoTDBDatabaseIT.java | 18 +- .../iotdb/relational/it/schema/IoTDBDeviceIT.java | 14 +- .../iotdb/relational/it/schema/IoTDBTableIT.java | 56 +- .../it/session/IoTDBSessionRelationalIT.java | 282 +++--- .../it/session/IoTDBTableModelSessionIT.java | 8 +- .../pool/IoTDBInsertTableSessionPoolIT.java | 57 +- .../session/pool/IoTDBTableModelSessionPoolIT.java | 4 +- .../udf/api/relational/AggregateFunction.java | 3 +- .../java/org/apache/iotdb/cli/AbstractCli.java | 26 +- .../src/main/java/org/apache/iotdb/cli/Cli.java | 4 +- iotdb-client/client-cpp/src/main/Session.h | 37 +- iotdb-client/client-cpp/src/main/TableSession.cpp | 2 +- iotdb-client/client-cpp/src/main/TableSession.h | 2 +- .../src/test/cpp/sessionRelationalIT.cpp | 26 +- iotdb-client/client-py/iotdb/utils/NumpyTablet.py | 2 +- iotdb-client/client-py/iotdb/utils/Tablet.py | 6 +- .../client-py/table_model_session_example.py | 2 +- .../client-py/table_model_session_pool_example.py | 2 +- .../tests/integration/test_relational_session.py | 14 +- .../tests/integration/test_tablemodel_insert.py | 408 ++++----- .../org/apache/iotdb/jdbc/IoTDBConnection.java | 4 + .../apache/iotdb/rpc/TElasticFramedTransport.java | 15 +- .../java/org/apache/iotdb/rpc/TSStatusCode.java | 3 + .../iotdb/rpc/TElasticFramedTransportTest.java | 71 ++ .../session/subscription/SubscriptionSession.java | 8 +- .../iotdb/session/SessionCacheLeaderTest.java | 12 +- .../ainode/iotdb/ainode/model/model_factory.py | 2 - iotdb-core/ainode/poetry.lock | 8 +- .../iotdb/confignode/manager/load/LoadManager.java | 9 - .../manager/load/balancer/RouteBalancer.java | 7 +- .../router/priority/GreedyPriorityBalancer.java | 42 +- .../router/priority/IPriorityBalancer.java | 5 +- .../router/priority/LeaderPriorityBalancer.java | 21 +- .../confignode/manager/load/cache/LoadCache.java | 16 - .../load/cache/node/DataNodeHeartbeatCache.java | 3 +- .../runtime/PipeRuntimeCoordinator.java | 4 +- .../runtime/heartbeat/PipeHeartbeat.java | 8 +- .../persistence/schema/ClusterSchemaInfo.java | 6 +- .../confignode/persistence/schema/ConfigMTree.java | 2 +- .../procedure/env/RegionMaintainHandler.java | 29 +- .../impl/region/AddRegionPeerProcedure.java | 12 +- .../impl/region/RegionMigrateProcedure.java | 34 +- .../impl/region/RemoveRegionPeerProcedure.java | 22 +- .../request/ConfigPhysicalPlanSerDeTest.java | 10 +- .../router/priority/GreedyPriorityTest.java | 13 +- .../priority/LeaderPriorityBalancerTest.java | 65 +- .../persistence/schema/ConfigMTreeTest.java | 8 +- .../schema/table/AddTableColumnProcedureTest.java | 4 +- .../schema/table/CreateTableProcedureTest.java | 8 +- .../apache/iotdb/consensus/iot/IoTConsensus.java | 8 +- .../consensus/iot/IoTConsensusServerImpl.java | 28 +- .../service/IoTConsensusRPCServiceProcessor.java | 6 +- .../iotdb/consensus/ratis/RatisConsensus.java | 23 +- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 29 +- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 22 +- .../schemaregion/SchemaExecutionVisitor.java | 5 +- .../db/exception/VerifyMetadataException.java | 12 +- ...va => VerifyMetadataTypeMismatchException.java} | 8 +- .../{ => load}/LoadEmptyFileException.java | 2 +- .../db/exception/{ => load}/LoadFileException.java | 2 +- .../{ => load}/LoadReadOnlyException.java | 2 +- .../LoadRuntimeOutOfMemoryException.java | 2 +- .../{ => load}/PartitionViolationException.java | 2 +- .../load/RegionReplicaSetChangedException.java} | 22 +- .../agent/runtime/SimpleProgressIndexAssigner.java | 36 +- .../connector/protocol/opcua/OpcUaNameSpace.java | 4 +- .../deletion/persist/PageCacheDeletionBuffer.java | 2 +- .../tablet/parser/TabletInsertionEventParser.java | 8 +- ...ileInsertionEventTableParserTabletIterator.java | 2 +- .../protocol/legacy/loader/DeletionLoader.java | 2 +- .../protocol/legacy/loader/TsFileLoader.java | 3 +- .../pipeconsensus/PipeConsensusReceiver.java | 2 +- .../protocol/thrift/IoTDBDataNodeReceiver.java | 1 + .../transform/converter/ValueConverter.java | 4 +- .../statement/PipeConvertedInsertRowStatement.java | 7 + .../PipeConvertedInsertTabletStatement.java | 7 + .../visitor/PipeStatementExceptionVisitor.java | 2 +- .../resource/memory/InsertNodeMemoryEstimator.java | 2 +- .../table/v1/handler/RequestValidationHandler.java | 20 +- .../v1/handler/StatementConstructionHandler.java | 4 +- .../impl/DataNodeInternalRPCServiceImpl.java | 12 +- .../execution/fragment/QueryContext.java | 18 +- .../schema/source/TableDeviceFetchSource.java | 2 +- .../schema/source/TableDeviceQuerySource.java | 2 +- ...ator.java => AbstractAggTableScanOperator.java} | 419 +++++---- .../AbstractDefaultAggTableScanOperator.java | 109 +++ .../relational/AbstractTableScanOperator.java | 4 +- ...lator.java => DefaultAggTableScanOperator.java} | 29 +- .../relational/LastQueryAggTableScanOperator.java | 424 +++++++++ .../TableAggregationTableScanOperator.java | 78 -- ...eeAlignedDeviceViewAggregationScanOperator.java | 47 +- .../relational/aggregation/AccumulatorFactory.java | 10 +- .../aggregation/AggregationOperator.java | 1 + .../relational/aggregation/LastAccumulator.java | 22 +- .../relational/aggregation/LastByAccumulator.java | 28 +- .../aggregation/LastByDescAccumulator.java | 5 + .../aggregation/LastDescAccumulator.java | 9 +- .../relational/aggregation/TableAccumulator.java | 4 + .../relational/aggregation/TableAggregator.java | 16 + .../UserDefinedAggregateFunctionAccumulator.java | 6 + .../aggregation/grouped/GroupedAccumulator.java | 8 + .../aggregation/grouped/GroupedAggregator.java | 4 + .../GroupedUserDefinedAggregateAccumulator.java | 6 + .../grouped/HashAggregationOperator.java | 1 + .../grouped/StreamingAggregationOperator.java | 1 + .../db/queryengine/plan/analyze/AnalyzeUtils.java | 24 +- .../queryengine/plan/analyze/AnalyzeVisitor.java | 4 - .../plan/analyze/load/LoadTsFileAnalyzer.java | 78 +- .../analyze/load/LoadTsFileTableSchemaCache.java | 9 +- .../load/LoadTsFileToTableModelAnalyzer.java | 2 +- .../load/LoadTsFileToTreeModelAnalyzer.java | 9 +- .../analyze/load/LoadTsFileTreeSchemaCache.java | 2 +- .../load/TreeSchemaAutoCreatorAndVerifier.java | 36 +- .../config/executor/ClusterConfigTaskExecutor.java | 11 +- .../db/queryengine/plan/parser/ASTVisitor.java | 8 +- .../plan/planner/OperatorTreeGenerator.java | 4 +- .../plan/planner/TableOperatorGenerator.java | 288 ++++-- .../plan/planner/plan/node/write/InsertNode.java | 8 +- .../plan/node/write/RelationalDeleteDataNode.java | 47 +- .../planner/plan/parameter/SeriesScanOptions.java | 12 + .../relational/analyzer/StatementAnalyzer.java | 7 +- .../predicate/ConvertPredicateToFilterVisitor.java | 2 +- .../schema/CheckSchemaPredicateVisitor.java | 4 +- .../ConvertSchemaPredicateToFilterVisitor.java | 4 +- .../plan/relational/metadata/TableSchema.java | 2 +- .../metadata/fetcher/TableDeviceSchemaFetcher.java | 9 +- .../fetcher/TableHeaderSchemaValidator.java | 24 +- .../fetcher/cache/TableDeviceSchemaCache.java | 2 +- .../plan/relational/planner/RelationPlanner.java | 2 +- .../distribute/TableDistributedPlanGenerator.java | 4 +- .../relational/planner/node/TableScanNode.java | 7 +- .../PushLimitOffsetIntoTableScan.java | 5 +- .../optimizations/PushPredicateIntoTableScan.java | 4 +- .../TransformAggregationToStreamable.java | 4 +- .../optimizations/TransformSortToStreamSort.java | 5 +- .../sql/ast/AbstractQueryDeviceWithCache.java | 2 +- .../plan/relational/sql/ast/ColumnDefinition.java | 2 +- .../plan/relational/sql/ast/LoadTsFile.java | 16 +- .../relational/sql/ast/WrappedInsertStatement.java | 10 +- .../plan/relational/sql/parser/AstBuilder.java | 16 +- .../scheduler/load/LoadTsFileDispatcherImpl.java | 2 +- .../plan/scheduler/load/LoadTsFileScheduler.java | 134 ++- .../plan/statement/crud/InsertBaseStatement.java | 13 +- .../plan/statement/crud/LoadTsFileStatement.java | 70 +- .../db/service/metrics/DataNodeMetricsHelper.java | 82 +- .../db/service/metrics/file/TsFileMetrics.java | 49 +- .../metrics/memory/ThresholdMemoryMetrics.java | 194 +++++ .../iotdb/db/storageengine/StorageEngine.java | 13 +- .../db/storageengine/buffer/BloomFilterCache.java | 28 +- .../iotdb/db/storageengine/buffer/ChunkCache.java | 27 +- .../buffer/TimeSeriesMetadataCache.java | 56 +- .../db/storageengine/dataregion/DataRegion.java | 42 +- .../performer/impl/FastCompactionPerformer.java | 15 + .../execute/task/CrossSpaceCompactionTask.java | 2 + .../execute/task/InnerSpaceCompactionTask.java | 32 +- .../task/InsertionCrossSpaceCompactionTask.java | 2 + .../task/RepairUnsortedFileCompactionTask.java | 2 + .../execute/utils/CompactionPathUtils.java | 15 +- .../execute/utils/CompactionTableSchema.java | 4 +- .../compaction/execute/utils/CompactionUtils.java | 46 + .../executor/fast/SeriesCompactionExecutor.java | 10 +- .../compaction/io/CompactionTsFileReader.java | 14 +- .../compaction/io/CompactionTsFileWriter.java | 2 +- .../compaction/schedule/CompactionTaskManager.java | 4 +- .../compaction/schedule/CompactionWorker.java | 7 +- .../dataregion/memtable/AbstractMemTable.java | 4 +- .../dataregion/memtable/TsFileProcessor.java | 27 +- .../dataregion/modification/ModificationFile.java | 31 +- .../reader/common/CachedPriorityMergeReader.java | 97 --- .../reader/common/DescPriorityMergeReader.java | 16 +- .../read/reader/common/PriorityMergeReader.java | 22 +- .../dataregion/tsfile/TsFileResource.java | 38 +- .../tsfile/timeindex/ArrayDeviceTimeIndex.java | 2 +- .../dataregion/tsfile/timeindex/FileTimeIndex.java | 2 +- .../dataregion/tsfile/timeindex/ITimeIndex.java | 2 +- .../db/storageengine/load/LoadTsFileManager.java | 2 +- .../load/active/ActiveLoadTsFileLoader.java | 1 + .../load/config/LoadTsFileConfigurator.java | 29 +- .../LoadConvertedInsertTabletStatement.java | 52 ++ ...ertedInsertTabletStatementExceptionVisitor.java | 51 ++ ...vertedInsertTabletStatementTSStatusVisitor.java | 65 ++ ...leStatementDataTypeConvertExecutionVisitor.java | 143 +++ ...eeStatementDataTypeConvertExecutionVisitor.java | 130 +++ .../converter/LoadTsFileDataTypeConverter.java | 107 +++ .../memory/LoadTsFileDataCacheMemoryBlock.java | 2 +- .../load/memory/LoadTsFileMemoryManager.java | 2 +- .../load/splitter/TsFileSplitter.java | 35 +- .../db/tools/schema/SRStatementGenerator.java | 4 +- .../org/apache/iotdb/db/utils/CommonUtils.java | 2 +- .../java/org/apache/iotdb/db/utils/MemUtils.java | 2 +- .../db/utils/datastructure/AlignedTVList.java | 3 +- .../iotdb/db/metadata/path/PatternTreeMapTest.java | 41 +- .../SchemaRegionSimpleRecoverTest.java | 4 +- .../db/pipe/consensus/DeletionRecoverTest.java | 3 +- .../db/pipe/consensus/DeletionResourceTest.java | 9 +- .../plan/parser/StatementGeneratorTest.java | 11 +- .../node/write/InsertRowsNodeSerdeTest.java | 22 +- .../node/write/InsertTabletNodeSerdeTest.java | 16 +- .../planner/node/write/WritePlanNodeSplitTest.java | 2 +- .../node/write/RelationalDeleteDataNodeTest.java | 3 +- .../plan/relational/analyzer/TSBSMetadata.java | 38 +- .../plan/relational/analyzer/TestMatadata.java | 18 +- .../fetcher/cache/TableDeviceSchemaCacheTest.java | 32 +- .../plan/relational/sql/ast/InsertTabletTest.java | 4 +- .../plan/statement/InsertStatementTest.java | 45 +- .../plan/statement/StatementTestUtils.java | 14 +- .../iotdb/db/storageengine/StorageEngineTest.java | 2 + .../storageengine/buffer/BloomFilterCacheTest.java | 29 +- .../BatchedCompactionWithTsFileSplitterTest.java | 15 +- .../compaction/CompactionWorkerTest.java | 33 + .../inner/InnerSequenceCompactionSpeedTest.java | 43 + .../repair/RepairUnsortedFileCompactionTest.java | 53 ++ .../CompactionTableModelTestFileWriter.java | 2 +- .../CompactionTableSchemaCollectorTest.java | 44 +- .../TableModelCompactionWithTTLTest.java | 9 +- .../compaction/utils/CompactionCheckerUtils.java | 30 +- .../modification/ModificationFileTest.java | 242 +++++- ...SeriesReader.java => AscFakedSeriesReader.java} | 28 +- .../read/reader/common/DescFakedSeriesReader.java | 54 ++ .../reader/common/PriorityMergeReaderTest.java | 113 ++- .../reader/common/PriorityMergeReaderTest2.java | 59 -- .../dataregion/tsfile/TsFileResourceTest.java | 144 +++ .../dataregion/wal/io/WALFileTest.java | 2 +- .../file/UnsealedTsFileRecoverPerformerTest.java | 4 +- .../conf/iotdb-system.properties.template | 11 +- .../apache/iotdb/commons/client/ThriftClient.java | 17 +- .../commons/exception/IllegalPathException.java | 4 +- .../apache/iotdb/commons/path/PathPatternNode.java | 105 ++- .../apache/iotdb/commons/path/PathPatternUtil.java | 9 +- .../commons/schema/table/InformationSchema.java | 16 +- .../apache/iotdb/commons/schema/table/TsTable.java | 10 +- ...entColumnSchema.java => FieldColumnSchema.java} | 18 +- .../{IdColumnSchema.java => TagColumnSchema.java} | 16 +- .../schema/table/column/TsTableColumnCategory.java | 24 +- .../table/column/TsTableColumnSchemaUtil.java | 16 +- .../service/AbstractThriftServiceThread.java | 7 +- .../iotdb/commons/service/metric/enums/Metric.java | 2 + .../iotdb/commons/service/metric/enums/Tag.java | 4 +- .../db/relational/grammar/sql/RelationalSql.g4 | 16 +- .../src/main/openapi3/iotdb_rest_table_v1.yaml | 2 +- .../iotdb/library/match/PatternExecutor.java | 2 +- .../iotdb/library/match/UDAFPatternMatch.java | 62 +- .../org/apache/iotdb/library/UDAFPatternTest.java | 70 ++ pom.xml | 4 +- 329 files changed, 7790 insertions(+), 3426 deletions(-) diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java index bc264cf83d1,04c54394f03..ac4b611b2fd --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java @@@ -21,7 -21,7 +21,6 @@@ package org.apache.iotdb.db.queryengine import org.apache.iotdb.commons.path.AlignedFullPath; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; --import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator; @@@ -53,7 -52,7 +52,6 @@@ import org.apache.tsfile.read.common.bl import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.Pair; --import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.write.schema.IMeasurementSchema; import java.io.IOException; @@@ -62,22 -61,21 +60,17 @@@ import java.util.Collections import java.util.List; import java.util.Optional; import java.util.Set; --import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; - import static java.lang.String.format; import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.satisfiedTimeRange; - import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.CURRENT_DEVICE_INDEX_STRING; - import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.TIME_COLUMN_TEMPLATE; - import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.constructAlignedPath; + import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.CURRENT_DEVICE_INDEX_STRING; + import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE; + import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath; import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter.DEVICE_NUMBER; import static org.apache.tsfile.read.common.block.TsBlockUtil.skipPointsOutOfTimeRange; - public abstract class AbstractAggregationTableScanOperator extends AbstractDataSourceOperator { - - private static final long INSTANCE_SIZE = - RamUsageEstimator.shallowSizeOfInstance(TableAggregationTableScanOperator.class); + public abstract class AbstractAggTableScanOperator extends AbstractDataSourceOperator { - private static final long INSTANCE_SIZE = - RamUsageEstimator.shallowSizeOfInstance(AbstractAggTableScanOperator.class); - private boolean finished = false; private TsBlock inputTsBlock; @@@ -114,51 -116,57 +111,43 @@@ private boolean allAggregatorsHasFinalResult = false; - public AbstractAggregationTableScanOperator( - public AbstractAggTableScanOperator( -- PlanNodeId sourceId, -- OperatorContext context, -- List<ColumnSchema> aggColumnSchemas, -- int[] aggColumnsIndexArray, -- List<DeviceEntry> deviceEntries, - int deviceCount, -- SeriesScanOptions seriesScanOptions, -- List<String> measurementColumnNames, -- Set<String> allSensors, -- List<IMeasurementSchema> measurementSchemas, -- List<TableAggregator> tableAggregators, -- List<ColumnSchema> groupingKeySchemas, -- int[] groupingKeyIndex, -- ITableTimeRangeIterator tableTimeRangeIterator, -- boolean ascending, -- boolean canUseStatistics, -- List<Integer> aggregatorInputChannels) { -- -- this.sourceId = sourceId; -- this.operatorContext = context; -- this.canUseStatistics = canUseStatistics; -- this.tableAggregators = tableAggregators; -- this.groupingKeySchemas = groupingKeySchemas; -- this.groupingKeyIndex = groupingKeyIndex; - this.groupingKeySize = groupingKeySchemas == null ? 0 : groupingKeySchemas.size(); -- this.aggColumnSchemas = aggColumnSchemas; -- this.aggColumnsIndexArray = aggColumnsIndexArray; -- this.deviceEntries = deviceEntries; - this.deviceCount = deviceEntries.size(); - this.deviceCount = deviceCount; ++ public AbstractAggTableScanOperator(AbstractAggTableScanOperatorParameter parameter) { ++ ++ this.sourceId = parameter.sourceId; ++ this.operatorContext = parameter.context; ++ this.canUseStatistics = parameter.canUseStatistics; ++ this.tableAggregators = parameter.tableAggregators; ++ this.groupingKeySchemas = parameter.groupingKeySchemas; ++ this.groupingKeyIndex = parameter.groupingKeyIndex; ++ this.groupingKeySize = ++ parameter.groupingKeySchemas == null ? 0 : parameter.groupingKeySchemas.size(); ++ this.aggColumnSchemas = parameter.aggColumnSchemas; ++ this.aggColumnsIndexArray = parameter.aggColumnsIndexArray; ++ this.deviceEntries = parameter.deviceEntries; ++ this.deviceCount = parameter.deviceCount; this.operatorContext.recordSpecifiedInfo(DEVICE_NUMBER, Integer.toString(this.deviceCount)); -- this.ascending = ascending; -- this.scanOrder = ascending ? Ordering.ASC : Ordering.DESC; -- this.seriesScanOptions = seriesScanOptions; -- this.measurementColumnNames = measurementColumnNames; ++ this.ascending = parameter.ascending; ++ this.scanOrder = parameter.ascending ? Ordering.ASC : Ordering.DESC; ++ this.seriesScanOptions = parameter.seriesScanOptions; ++ this.measurementColumnNames = parameter.measurementColumnNames; this.measurementCount = measurementColumnNames.size(); this.cachedRawDataSize = (1L + this.measurementCount) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(); -- this.allSensors = allSensors; -- this.measurementSchemas = measurementSchemas; ++ this.allSensors = parameter.allSensors; ++ this.measurementSchemas = parameter.measurementSchemas; this.measurementColumnTSDataTypes = -- measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList()); ++ parameter.measurementSchemas.stream() ++ .map(IMeasurementSchema::getType) ++ .collect(Collectors.toList()); this.currentDeviceIndex = 0; this.operatorContext.recordSpecifiedInfo(CURRENT_DEVICE_INDEX_STRING, Integer.toString(0)); -- this.aggregatorInputChannels = aggregatorInputChannels; -- this.timeIterator = tableTimeRangeIterator; ++ this.aggregatorInputChannels = parameter.aggregatorInputChannels; ++ this.timeIterator = parameter.tableTimeRangeIterator; + this.dateBinSize = + timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.DATE_BIN_TIME_ITERATOR + ? 1 + : 0; constructAlignedSeriesScanUtil(); } @@@ -171,89 -179,64 +160,14 @@@ return finished; } - @Override - public long calculateMaxPeekMemory() { - return cachedRawDataSize + maxReturnSize; - } - - @Override - public long calculateMaxReturnSize() { - return maxReturnSize; - } - - @Override - public long calculateRetainedSizeAfterCallingNext() { - return timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.DATE_BIN_TIME_ITERATOR - ? cachedRawDataSize - : 0; - } - -- @Override -- public boolean hasNext() throws Exception { - return timeIterator.hasCachedTimeRange() - || timeIterator.hasNextTimeRange() - || !resultTsBlockBuilder.isEmpty(); - if (retainedTsBlock != null) { - return true; - } - - return timeIterator.hasCachedTimeRange() || timeIterator.hasNextTimeRange(); -- } -- -- @Override -- public TsBlock next() throws Exception { - if (retainedTsBlock != null) { - return getResultFromRetainedTsBlock(); - } -- -- // optimize for sql: select count(*) from (select count(s1), sum(s1) from table) -- if (tableAggregators.isEmpty() -- && timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR -- && resultTsBlockBuilder.getValueColumnBuilders().length == 0) { -- resultTsBlockBuilder.reset(); -- currentDeviceIndex = deviceCount; -- timeIterator.setFinished(); -- Column[] valueColumns = new Column[0]; -- return new TsBlock(1, new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, 1), valueColumns); -- } -- -- // start stopwatch, reset leftRuntimeOfOneNextCall -- long start = System.nanoTime(); -- leftRuntimeOfOneNextCall = 1000 * operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); -- long maxRuntime = leftRuntimeOfOneNextCall; -- -- while (System.nanoTime() - start < maxRuntime -- && (timeIterator.hasCachedTimeRange() || timeIterator.hasNextTimeRange()) -- && !resultTsBlockBuilder.isFull()) { -- -- // calculate aggregation result on current time window -- // return true if current time window is calc finished -- if (calculateAggregationResultForCurrentTimeRange()) { -- timeIterator.resetCurTimeRange(); -- } -- } -- - if (resultTsBlockBuilder.getPositionCount() > 0) { - return buildResultTsBlock(); - } else { - if (resultTsBlockBuilder.isEmpty()) { -- return null; -- } - - buildResultTsBlock(); - return checkTsBlockSizeAndGetResult(); -- } - + protected abstract void updateResultTsBlock(); - private TsBlock buildResultTsBlock() { - int declaredPositions = resultTsBlockBuilder.getPositionCount(); - ColumnBuilder[] valueColumnBuilders = resultTsBlockBuilder.getValueColumnBuilders(); - Column[] valueColumns = new Column[valueColumnBuilders.length]; - for (int i = 0; i < valueColumns.length; i++) { - valueColumns[i] = valueColumnBuilders[i].build(); - if (valueColumns[i].getPositionCount() != declaredPositions) { - throw new IllegalStateException( - format( - "Declared positions (%s) does not match column %s's number of entries (%s)", - declaredPositions, i, valueColumns[i].getPositionCount())); - } - } - - TsBlock resultTsBlock = - new TsBlock( - resultTsBlockBuilder.getPositionCount(), + protected void buildResultTsBlock() { + resultTsBlock = + resultTsBlockBuilder.build( new RunLengthEncodedColumn( - TIME_COLUMN_TEMPLATE, resultTsBlockBuilder.getPositionCount()), - valueColumns); + TIME_COLUMN_TEMPLATE, resultTsBlockBuilder.getPositionCount())); resultTsBlockBuilder.reset(); - return resultTsBlock; } protected void constructAlignedSeriesScanUtil() { @@@ -444,10 -421,13 +352,10 @@@ switch (columnSchemaCategory) { case TIME: return inputRegion.getTimeColumn(); - case ID: + case TAG: - // TODO avoid create deviceStatics multi times; count, sum can use time statistics String id = - (String) - deviceEntries - .get(currentDeviceIndex) - .getNthSegment(aggColumnsIndexArray[columnIdx] + 1); + getNthIdColumnValue( + deviceEntries.get(currentDeviceIndex), aggColumnsIndexArray[columnIdx]); return getIdOrAttrColumn( inputRegion.getTimeColumn().getPositionCount(), id == null ? null : new Binary(id, TSFileConfig.STRING_CHARSET)); @@@ -512,10 -492,13 +420,10 @@@ switch (columnSchemaCategory) { case TIME: return timeStatistics; - case ID: + case TAG: - // TODO avoid create deviceStatics multi times; count, sum can use time statistics String id = - (String) - deviceEntries - .get(currentDeviceIndex) - .getNthSegment(aggColumnsIndexArray[columnIdx] + 1); + getNthIdColumnValue( + deviceEntries.get(currentDeviceIndex), aggColumnsIndexArray[columnIdx]); return getStatistics( timeStatistics, id == null ? null : new Binary(id, TSFileConfig.STRING_CHARSET)); case ATTRIBUTE: @@@ -721,48 -702,43 +627,43 @@@ return; } - ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders(); - - int groupKeySize = groupingKeySchemas == null ? 0 : groupingKeySchemas.size(); - int dateBinSize = - timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.DATE_BIN_TIME_ITERATOR - ? 1 - : 0; - - if (groupingKeyIndex != null) { - for (int i = 0; i < groupKeySize; i++) { - if (TsTableColumnCategory.ID == groupingKeySchemas.get(i).getColumnCategory()) { - String id = - getNthIdColumnValue(deviceEntries.get(currentDeviceIndex), groupingKeyIndex[i]); - if (id == null) { - columnBuilders[i].appendNull(); - } else { - columnBuilders[i].writeBinary(new Binary(id, TSFileConfig.STRING_CHARSET)); - } - } else { - Binary attribute = - deviceEntries - .get(currentDeviceIndex) - .getAttributeColumnValues() - .get(groupingKeyIndex[i]); - if (attribute == null) { - columnBuilders[i].appendNull(); - } else { - columnBuilders[i].writeBinary(attribute); - } - } - } - } + appendGroupKeysToResult(deviceEntries, currentDeviceIndex); if (dateBinSize > 0) { - columnBuilders[groupKeySize].writeLong(timeIterator.getCurTimeRange().getMin()); + resultTsBlockBuilder.getValueColumnBuilders()[groupingKeySize].writeLong( + timeIterator.getCurTimeRange().getMin()); } - for (int i = 0; i < aggregators.size(); i++) { - aggregators.get(i).evaluate(columnBuilders[groupKeySize + dateBinSize + i]); + for (int i = 0; i < tableAggregators.size(); i++) { + tableAggregators + .get(i) + .evaluate( + resultTsBlockBuilder.getValueColumnBuilders()[groupingKeySize + dateBinSize + i]); } - tsBlockBuilder.declarePosition(); + resultTsBlockBuilder.declarePosition(); + } + + protected void appendGroupKeysToResult(List<DeviceEntry> deviceEntries, int deviceIndex) { + ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders(); + for (int i = 0; i < groupingKeySize; i++) { + if (TsTableColumnCategory.TAG == groupingKeySchemas.get(i).getColumnCategory()) { - String id = (String) deviceEntries.get(deviceIndex).getNthSegment(groupingKeyIndex[i] + 1); ++ String id = getNthIdColumnValue(deviceEntries.get(deviceIndex), groupingKeyIndex[i]); + if (id == null) { + columnBuilders[i].appendNull(); + } else { + columnBuilders[i].writeBinary(new Binary(id, TSFileConfig.STRING_CHARSET)); + } + } else { + Binary attribute = + deviceEntries.get(deviceIndex).getAttributeColumnValues().get(groupingKeyIndex[i]); + if (attribute == null) { + columnBuilders[i].appendNull(); + } else { + columnBuilders[i].writeBinary(attribute); + } + } + } } public boolean isAllAggregatorsHasFinalResult(List<TableAggregator> aggregators) { @@@ -852,14 -823,35 +748,117 @@@ } @Override - public long ramBytesUsed() { - return INSTANCE_SIZE - + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(seriesScanUtil) - + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext) - + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceId) - + (resultTsBlockBuilder == null ? 0 : resultTsBlockBuilder.getRetainedSizeInBytes()) - + RamUsageEstimator.sizeOfCollection(deviceEntries); + public long calculateMaxPeekMemory() { + return cachedRawDataSize + maxReturnSize; + } + + @Override + public long calculateMaxReturnSize() { + return maxReturnSize; + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.DATE_BIN_TIME_ITERATOR + ? cachedRawDataSize + : 0; + } + - @Override - public long ramBytesUsed() { - return INSTANCE_SIZE - + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(seriesScanUtil) - + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext) - + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceId) - + (resultTsBlockBuilder == null ? 0 : resultTsBlockBuilder.getRetainedSizeInBytes()) - + RamUsageEstimator.sizeOfCollection(deviceEntries); - } - + @Override + public void close() throws Exception { + super.close(); + tableAggregators.forEach(TableAggregator::close); } + + abstract String getNthIdColumnValue(DeviceEntry deviceEntry, int idColumnIndex); ++ ++ public static class AbstractAggTableScanOperatorParameter { ++ private final String timeColumnName; ++ protected final PlanNodeId sourceId; ++ protected final OperatorContext context; ++ protected final List<ColumnSchema> aggColumnSchemas; ++ protected final int[] aggColumnsIndexArray; ++ protected final SeriesScanOptions seriesScanOptions; ++ protected final List<String> measurementColumnNames; ++ protected final Set<String> allSensors; ++ protected final List<IMeasurementSchema> measurementSchemas; ++ protected final List<TableAggregator> tableAggregators; ++ protected final List<ColumnSchema> groupingKeySchemas; ++ protected final int[] groupingKeyIndex; ++ protected final ITableTimeRangeIterator tableTimeRangeIterator; ++ protected final boolean ascending; ++ protected final boolean canUseStatistics; ++ protected final List<Integer> aggregatorInputChannels; ++ ++ protected List<DeviceEntry> deviceEntries; ++ protected int deviceCount; ++ ++ public AbstractAggTableScanOperatorParameter( ++ PlanNodeId sourceId, ++ OperatorContext context, ++ List<ColumnSchema> aggColumnSchemas, ++ int[] aggColumnsIndexArray, ++ List<DeviceEntry> deviceEntries, ++ int deviceCount, ++ SeriesScanOptions seriesScanOptions, ++ List<String> measurementColumnNames, ++ Set<String> allSensors, ++ List<IMeasurementSchema> measurementSchemas, ++ List<TableAggregator> tableAggregators, ++ List<ColumnSchema> groupingKeySchemas, ++ int[] groupingKeyIndex, ++ ITableTimeRangeIterator tableTimeRangeIterator, ++ boolean ascending, ++ boolean canUseStatistics, ++ List<Integer> aggregatorInputChannels, ++ String timeColumnName) { ++ this.sourceId = sourceId; ++ this.context = context; ++ this.aggColumnSchemas = aggColumnSchemas; ++ this.aggColumnsIndexArray = aggColumnsIndexArray; ++ this.deviceEntries = deviceEntries; ++ this.deviceCount = deviceCount; ++ this.seriesScanOptions = seriesScanOptions; ++ this.measurementColumnNames = measurementColumnNames; ++ this.allSensors = allSensors; ++ this.measurementSchemas = measurementSchemas; ++ this.tableAggregators = tableAggregators; ++ this.groupingKeySchemas = groupingKeySchemas; ++ this.groupingKeyIndex = groupingKeyIndex; ++ this.tableTimeRangeIterator = tableTimeRangeIterator; ++ this.ascending = ascending; ++ this.canUseStatistics = canUseStatistics; ++ this.aggregatorInputChannels = aggregatorInputChannels; ++ this.timeColumnName = timeColumnName; ++ } ++ ++ public List<TableAggregator> getTableAggregators() { ++ return tableAggregators; ++ } ++ ++ public SeriesScanOptions getSeriesScanOptions() { ++ return seriesScanOptions; ++ } ++ ++ public Set<String> getAllSensors() { ++ return allSensors; ++ } ++ ++ public List<String> getMeasurementColumnNames() { ++ return measurementColumnNames; ++ } ++ ++ public List<IMeasurementSchema> getMeasurementSchemas() { ++ return measurementSchemas; ++ } ++ ++ public String getTimeColumnName() { ++ return timeColumnName; ++ } ++ ++ public void setDeviceEntries(List<DeviceEntry> deviceEntries) { ++ this.deviceEntries = deviceEntries; ++ this.deviceCount = deviceEntries.size(); ++ } ++ } } diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractDefaultAggTableScanOperator.java index 00000000000,00000000000..f88aa22aabc new file mode 100644 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractDefaultAggTableScanOperator.java @@@ -1,0 -1,0 +1,109 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++ ++package org.apache.iotdb.db.queryengine.execution.operator.source.relational; ++ ++import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; ++import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator; ++ ++import org.apache.tsfile.block.column.Column; ++import org.apache.tsfile.read.common.block.TsBlock; ++import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; ++import org.apache.tsfile.utils.RamUsageEstimator; ++ ++import java.util.concurrent.TimeUnit; ++ ++import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.TIME_COLUMN_TEMPLATE; ++ ++public abstract class AbstractDefaultAggTableScanOperator extends AbstractAggTableScanOperator { ++ ++ private static final long INSTANCE_SIZE = ++ RamUsageEstimator.shallowSizeOfInstance(AbstractDefaultAggTableScanOperator.class); ++ ++ public AbstractDefaultAggTableScanOperator(AbstractAggTableScanOperatorParameter parameter) { ++ super(parameter); ++ } ++ ++ @Override ++ public boolean hasNext() throws Exception { ++ if (retainedTsBlock != null) { ++ return true; ++ } ++ ++ return timeIterator.hasCachedTimeRange() || timeIterator.hasNextTimeRange(); ++ } ++ ++ @Override ++ public TsBlock next() throws Exception { ++ if (retainedTsBlock != null) { ++ return getResultFromRetainedTsBlock(); ++ } ++ ++ // optimize for sql: select count(*) from (select count(s1), sum(s1) from table) ++ if (tableAggregators.isEmpty() ++ && timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR ++ && resultTsBlockBuilder.getValueColumnBuilders().length == 0) { ++ resultTsBlockBuilder.reset(); ++ currentDeviceIndex = deviceCount; ++ timeIterator.setFinished(); ++ Column[] valueColumns = new Column[0]; ++ return new TsBlock(1, new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, 1), valueColumns); ++ } ++ ++ // start stopwatch, reset leftRuntimeOfOneNextCall ++ long start = System.nanoTime(); ++ leftRuntimeOfOneNextCall = 1000 * operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); ++ long maxRuntime = leftRuntimeOfOneNextCall; ++ ++ while (System.nanoTime() - start < maxRuntime ++ && (timeIterator.hasCachedTimeRange() || timeIterator.hasNextTimeRange()) ++ && !resultTsBlockBuilder.isFull()) { ++ ++ // calculate aggregation result on current time window ++ // return true if current time window is calc finished ++ if (calculateAggregationResultForCurrentTimeRange()) { ++ timeIterator.resetCurTimeRange(); ++ } ++ } ++ ++ if (resultTsBlockBuilder.isEmpty()) { ++ return null; ++ } ++ ++ buildResultTsBlock(); ++ return checkTsBlockSizeAndGetResult(); ++ } ++ ++ @Override ++ protected void updateResultTsBlock() { ++ appendAggregationResult(); ++ // after appendAggregationResult invoked, aggregators must be cleared ++ resetTableAggregators(); ++ } ++ ++ @Override ++ public long ramBytesUsed() { ++ return INSTANCE_SIZE ++ + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(seriesScanUtil) ++ + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext) ++ + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceId) ++ + (resultTsBlockBuilder == null ? 0 : resultTsBlockBuilder.getRetainedSizeInBytes()) ++ + RamUsageEstimator.sizeOfCollection(deviceEntries); ++ } ++} diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java index ca8c9725adc,6abf101ab60..d8f7a5f3359 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java @@@ -205,9 -213,10 +205,9 @@@ public abstract class AbstractTableScan Column[] valueColumns = new Column[columnsIndexArray.length]; for (int i = 0; i < columnsIndexArray.length; i++) { switch (columnSchemas.get(i).getColumnCategory()) { - case ID: + case TAG: - // +1 for skip the table name segment - String idColumnValue = - ((String) currentDeviceEntry.getNthSegment(columnsIndexArray[i] + 1)); + String idColumnValue = getNthIdColumnValue(currentDeviceEntry, columnsIndexArray[i]); + valueColumns[i] = getIdOrAttributeValueColumn( idColumnValue == null diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DefaultAggTableScanOperator.java index 7fe6b41e838,a49446500d5..3ebd7bed0b3 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DefaultAggTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DefaultAggTableScanOperator.java @@@ -17,26 -17,65 +17,19 @@@ * under the License. */ - package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped; + package org.apache.iotdb.db.queryengine.execution.operator.source.relational; - import org.apache.tsfile.block.column.Column; - import org.apache.tsfile.block.column.ColumnBuilder; -import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator; -import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; -import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; -import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; -import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; + import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; - public interface GroupedAccumulator { -import org.apache.tsfile.write.schema.IMeasurementSchema; ++public class DefaultAggTableScanOperator extends AbstractDefaultAggTableScanOperator { - long getEstimatedSize(); -import java.util.List; -import java.util.Set; - -public class DefaultAggTableScanOperator extends AbstractAggTableScanOperator { - - public DefaultAggTableScanOperator( - PlanNodeId sourceId, - OperatorContext context, - List<ColumnSchema> aggColumnSchemas, - int[] aggColumnsIndexArray, - List<DeviceEntry> deviceEntries, - int deviceCount, - SeriesScanOptions seriesScanOptions, - List<String> measurementColumnNames, - Set<String> allSensors, - List<IMeasurementSchema> measurementSchemas, - List<TableAggregator> tableAggregators, - List<ColumnSchema> groupingKeySchemas, - int[] groupingKeyIndex, - ITableTimeRangeIterator tableTimeRangeIterator, - boolean ascending, - boolean canUseStatistics, - List<Integer> aggregatorInputChannels) { - super( - sourceId, - context, - aggColumnSchemas, - aggColumnsIndexArray, - deviceEntries, - deviceCount, - seriesScanOptions, - measurementColumnNames, - allSensors, - measurementSchemas, - tableAggregators, - groupingKeySchemas, - groupingKeyIndex, - tableTimeRangeIterator, - ascending, - canUseStatistics, - aggregatorInputChannels); ++ public DefaultAggTableScanOperator(AbstractAggTableScanOperatorParameter parameter) { ++ super(parameter); + } - void setGroupCount(long groupCount); - - void addInput(int[] groupIds, Column[] arguments); - - void addIntermediate(int[] groupIds, Column argument); - - void evaluateIntermediate(int groupId, ColumnBuilder columnBuilder); - - void evaluateFinal(int groupId, ColumnBuilder columnBuilder); - - void prepareFinal(); - - void reset(); + @Override - protected void updateResultTsBlock() { - appendAggregationResult(); - // after appendAggregationResult invoked, aggregators must be cleared - resetTableAggregators(); ++ String getNthIdColumnValue(DeviceEntry deviceEntry, int idColumnIndex) { ++ // +1 for skipping the table name segment ++ return ((String) deviceEntry.getNthSegment(idColumnIndex + 1)); + } } diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java index 00000000000,a1ae289f95c..b74fb4bdf54 mode 000000,100644..100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java @@@ -1,0 -1,458 +1,424 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + package org.apache.iotdb.db.queryengine.execution.operator.source.relational; + + import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; + import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; -import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator; -import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; + import org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryUtil; + import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastAccumulator; + import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastByDescAccumulator; + import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastDescAccumulator; + import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; -import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; + import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; + import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; + import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; + import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache; + + import org.apache.tsfile.block.column.ColumnBuilder; + import org.apache.tsfile.common.conf.TSFileConfig; + import org.apache.tsfile.read.TimeValuePair; + import org.apache.tsfile.read.common.block.TsBlock; + import org.apache.tsfile.utils.Binary; + import org.apache.tsfile.utils.Pair; + import org.apache.tsfile.utils.RamUsageEstimator; + import org.apache.tsfile.utils.TsPrimitiveType; + import org.apache.tsfile.write.UnSupportedDataTypeException; -import org.apache.tsfile.write.schema.IMeasurementSchema; + + import java.util.ArrayList; + import java.util.List; + import java.util.OptionalLong; -import java.util.Set; + import java.util.concurrent.TimeUnit; + + import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.Utils.serializeTimeValue; + import static org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE; + import static org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType; + + /** + * This class is used to execute aggregation table scan when apply {@code canUseLastCacheOptimize()} + */ + public class LastQueryAggTableScanOperator extends AbstractAggTableScanOperator { + + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(LastQueryAggTableScanOperator.class); + + private static final TableDeviceSchemaCache TABLE_DEVICE_SCHEMA_CACHE = + TableDeviceSchemaCache.getInstance(); + - private boolean finished = false; + private final String dbName; + private int outputDeviceIndex; + private DeviceEntry currentDeviceEntry; + private final List<DeviceEntry> cachedDeviceEntries; + private final int allDeviceCount; + + private final boolean needUpdateCache; + private final boolean needUpdateNullEntry; + private final List<Integer> hitCachesIndexes; + private final List<Pair<OptionalLong, TsPrimitiveType[]>> hitCachedResults; + private int currentHitCacheIndex = 0; + + // indicates the index of last(time) aggregation + private int lastTimeAggregationIdx = -1; + + public LastQueryAggTableScanOperator( - PlanNodeId sourceId, - OperatorContext context, - List<ColumnSchema> aggColumnSchemas, - int[] aggColumnsIndexArray, - List<DeviceEntry> unCachedDeviceEntries, ++ AbstractAggTableScanOperatorParameter parameter, + List<DeviceEntry> cachedDeviceEntries, - SeriesScanOptions seriesScanOptions, - List<String> measurementColumnNames, - Set<String> allSensors, - List<IMeasurementSchema> measurementSchemas, - List<TableAggregator> tableAggregators, - List<ColumnSchema> groupingKeySchemas, - int[] groupingKeyIndex, - ITableTimeRangeIterator tableTimeRangeIterator, - boolean ascending, - boolean canUseStatistics, - List<Integer> aggregatorInputChannels, + QualifiedObjectName qualifiedObjectName, + List<Integer> hitCachesIndexes, + List<Pair<OptionalLong, TsPrimitiveType[]>> hitCachedResults) { + - super( - sourceId, - context, - aggColumnSchemas, - aggColumnsIndexArray, - unCachedDeviceEntries, - unCachedDeviceEntries.size(), - seriesScanOptions, - measurementColumnNames, - allSensors, - measurementSchemas, - tableAggregators, - groupingKeySchemas, - groupingKeyIndex, - tableTimeRangeIterator, - ascending, - canUseStatistics, - aggregatorInputChannels); ++ super(parameter); + + // notice that: deviceEntries store all unCachedDeviceEntries - this.allDeviceCount = cachedDeviceEntries.size() + unCachedDeviceEntries.size(); ++ this.allDeviceCount = cachedDeviceEntries.size() + parameter.deviceCount; + this.cachedDeviceEntries = cachedDeviceEntries; - this.needUpdateCache = LastQueryUtil.needUpdateCache(seriesScanOptions.getGlobalTimeFilter()); ++ this.needUpdateCache = ++ LastQueryUtil.needUpdateCache(parameter.seriesScanOptions.getGlobalTimeFilter()); + this.needUpdateNullEntry = - LastQueryUtil.needUpdateNullEntry(seriesScanOptions.getGlobalTimeFilter()); ++ LastQueryUtil.needUpdateNullEntry(parameter.seriesScanOptions.getGlobalTimeFilter()); + this.hitCachesIndexes = hitCachesIndexes; + this.hitCachedResults = hitCachedResults; + this.dbName = qualifiedObjectName.getDatabaseName(); + - for (int i = 0; i < tableAggregators.size(); i++) { - if (tableAggregators.get(i).getAccumulator() instanceof LastAccumulator) { ++ for (int i = 0; i < parameter.tableAggregators.size(); i++) { ++ if (parameter.tableAggregators.get(i).getAccumulator() instanceof LastAccumulator) { + lastTimeAggregationIdx = i; + } + } + } + + @Override + public boolean hasNext() throws Exception { + if (retainedTsBlock != null) { + return true; + } + + return outputDeviceIndex < allDeviceCount; + } + + @Override + public TsBlock next() throws Exception { + long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); + long start = System.nanoTime(); + + if (retainedTsBlock != null) { + return getResultFromRetainedTsBlock(); + } + + while (System.nanoTime() - start < maxRuntime + && !resultTsBlockBuilder.isFull() + && outputDeviceIndex < allDeviceCount) { + processCurrentDevice(); + } + + if (resultTsBlockBuilder.isEmpty()) { + return null; + } + + buildResultTsBlock(); + return checkTsBlockSizeAndGetResult(); + } + + /** Main process logic, calc the last aggregation results of current device. */ + private void processCurrentDevice() { + if (currentHitCacheIndex < hitCachesIndexes.size() + && outputDeviceIndex == hitCachesIndexes.get(currentHitCacheIndex)) { + currentDeviceEntry = cachedDeviceEntries.get(currentHitCacheIndex); + buildResultUseLastCache(); + return; + } + + if (calculateAggregationResultForCurrentTimeRange()) { + timeIterator.resetCurTimeRange(); + } + } + + private void buildResultUseLastCache() { + appendGroupKeysToResult(cachedDeviceEntries, currentHitCacheIndex); + Pair<OptionalLong, TsPrimitiveType[]> currentHitResult = + hitCachedResults.get(currentHitCacheIndex); + long lastTime = currentHitResult.getLeft().getAsLong(); + int channel = 0; + for (int i = 0; i < tableAggregators.size(); i++) { + TableAggregator aggregator = tableAggregators.get(i); + ColumnBuilder columnBuilder = resultTsBlockBuilder.getColumnBuilder(groupingKeySize + i); + int columnIdx = aggregatorInputChannels.get(channel); + ColumnSchema schema = aggColumnSchemas.get(columnIdx); + TsTableColumnCategory category = schema.getColumnCategory(); + switch (category) { + case TAG: + String id = - (String) - cachedDeviceEntries - .get(currentHitCacheIndex) - .getNthSegment(aggColumnsIndexArray[columnIdx] + 1); ++ getNthIdColumnValue( ++ cachedDeviceEntries.get(currentHitCacheIndex), aggColumnsIndexArray[columnIdx]); + if (id == null) { + if (aggregator.getStep().isOutputPartial()) { + columnBuilder.writeBinary( + new Binary( + serializeTimeValue(getTSDataType(schema.getType()), lastTime, true, null))); + } else { + columnBuilder.appendNull(); + } + } else { + if (aggregator.getStep().isOutputPartial()) { + columnBuilder.writeBinary( + new Binary( + serializeTimeValue( + getTSDataType(schema.getType()), + lastTime, + false, + new TsPrimitiveType.TsBinary( + new Binary(id, TSFileConfig.STRING_CHARSET))))); + } else { + columnBuilder.writeBinary(new Binary(id, TSFileConfig.STRING_CHARSET)); + } + } + break; + case ATTRIBUTE: + Binary attribute = + cachedDeviceEntries + .get(currentHitCacheIndex) + .getAttributeColumnValues() + .get(aggColumnsIndexArray[columnIdx]); + if (attribute == null) { + if (aggregator.getStep().isOutputPartial()) { + columnBuilder.writeBinary( + new Binary( + serializeTimeValue(getTSDataType(schema.getType()), lastTime, true, null))); + } else { + columnBuilder.appendNull(); + } + } else { + if (aggregator.getStep().isOutputPartial()) { + columnBuilder.writeBinary( + new Binary( + serializeTimeValue( + getTSDataType(schema.getType()), + lastTime, + false, + new TsPrimitiveType.TsBinary(attribute)))); + } else { + columnBuilder.writeBinary(attribute); + } + } + break; + case TIME: + if (aggregator.getAccumulator() instanceof LastDescAccumulator) { + // for last(time) aggregation + if (aggregator.getStep().isOutputPartial()) { + columnBuilder.writeBinary( + new Binary( + serializeTimeValue( + getTSDataType(schema.getType()), + lastTime, + new TsPrimitiveType.TsLong(lastTime)))); + } else { + columnBuilder.writeTsPrimitiveType(new TsPrimitiveType.TsLong(lastTime)); + } + } else { + // for last_by(time,time) aggregation + if (aggregator.getStep().isOutputPartial()) { + columnBuilder.writeBinary( + new Binary( + serializeTimeValue( + getTSDataType(schema.getType()), + lastTime, + false, + new TsPrimitiveType.TsLong(lastTime)))); + } else { + columnBuilder.writeTsPrimitiveType(new TsPrimitiveType.TsLong(lastTime)); + } + } + break; + case FIELD: + int measurementIdx = aggColumnsIndexArray[aggregatorInputChannels.get(channel)]; + TsPrimitiveType tsPrimitiveType = + hitCachedResults.get(currentHitCacheIndex).getRight()[measurementIdx]; + long lastByTime = hitCachedResults.get(currentHitCacheIndex).getLeft().getAsLong(); + if (tsPrimitiveType == EMPTY_PRIMITIVE_TYPE) { + // there is no data for this time series + if (aggregator.getStep().isOutputPartial()) { + columnBuilder.writeBinary( + new Binary( + serializeTimeValue(getTSDataType(schema.getType()), lastByTime, true, null))); + } else { + columnBuilder.appendNull(); + } + } else { + if (aggregator.getStep().isOutputPartial()) { + columnBuilder.writeBinary( + new Binary( + serializeTimeValue( + getTSDataType(schema.getType()), lastByTime, false, tsPrimitiveType))); + } else { + columnBuilder.writeTsPrimitiveType(tsPrimitiveType); + } + } + break; + default: + throw new IllegalStateException("Unsupported category: " + category); + } + + channel += aggregator.getChannelCount(); + } + + resultTsBlockBuilder.declarePosition(); + outputDeviceIndex++; + currentHitCacheIndex++; + } + + private void updateLastCacheIfPossible() { + if (!needUpdateCache) { + return; + } + + int channel = 0; + List<String> updateMeasurementList = new ArrayList<>(); + List<TimeValuePair> updateTimeValuePairList = new ArrayList<>(); + boolean hasSetLastTime = false; + for (int i = 0; i < tableAggregators.size(); i++) { + TableAggregator tableAggregator = tableAggregators.get(i); + ColumnSchema schema = aggColumnSchemas.get(aggregatorInputChannels.get(channel)); + + switch (schema.getColumnCategory()) { + case TIME: + if (!hasSetLastTime) { + hasSetLastTime = true; + if (i == lastTimeAggregationIdx) { + LastDescAccumulator lastAccumulator = + (LastDescAccumulator) tableAggregator.getAccumulator(); + if (lastAccumulator.hasInitResult()) { + updateMeasurementList.add(""); + updateTimeValuePairList.add( + new TimeValuePair( + lastAccumulator.getMaxTime(), + new TsPrimitiveType.TsLong(lastAccumulator.getMaxTime()))); + } + } else { + LastByDescAccumulator lastByAccumulator = + (LastByDescAccumulator) tableAggregator.getAccumulator(); + if (lastByAccumulator.hasInitResult() && !lastByAccumulator.isXNull()) { + updateMeasurementList.add(""); + updateTimeValuePairList.add( + new TimeValuePair( + lastByAccumulator.getLastTimeOfY(), + new TsPrimitiveType.TsLong(lastByAccumulator.getLastTimeOfY()))); + } + } + } + break; + case FIELD: + LastByDescAccumulator lastByAccumulator = + (LastByDescAccumulator) tableAggregator.getAccumulator(); + // only can update LastCache when last_by return non-null value + if (lastByAccumulator.hasInitResult() && !lastByAccumulator.isXNull()) { + long lastByTime = lastByAccumulator.getLastTimeOfY(); + + if (!hasSetLastTime) { + hasSetLastTime = true; + updateMeasurementList.add(""); + updateTimeValuePairList.add( + new TimeValuePair(lastByTime, new TsPrimitiveType.TsLong(lastByTime))); + } + + updateMeasurementList.add(schema.getName()); + updateTimeValuePairList.add( + new TimeValuePair( + lastByTime, cloneTsPrimitiveType(lastByAccumulator.getXResult()))); + } + break; + default: + break; + } + + channel += tableAggregator.getChannelCount(); + } + + if (!updateMeasurementList.isEmpty()) { + String[] updateMeasurementArray = updateMeasurementList.toArray(new String[0]); + TimeValuePair[] updateTimeValuePairArray = + updateTimeValuePairList.toArray(new TimeValuePair[0]); + currentDeviceEntry = deviceEntries.get(currentDeviceIndex); + TABLE_DEVICE_SCHEMA_CACHE.initOrInvalidateLastCache( + dbName, currentDeviceEntry.getDeviceID(), updateMeasurementArray, false); + TABLE_DEVICE_SCHEMA_CACHE.updateLastCacheIfExists( + dbName, + currentDeviceEntry.getDeviceID(), + updateMeasurementArray, + updateTimeValuePairArray); + } + } + + @Override + protected void updateResultTsBlock() { + appendAggregationResult(); + + if (timeIterator.hasCachedTimeRange()) { + updateLastCacheIfPossible(); + } + + outputDeviceIndex++; + + // after appendAggregationResult invoked, aggregators must be cleared + resetTableAggregators(); + } + ++ @Override ++ String getNthIdColumnValue(DeviceEntry deviceEntry, int idColumnIndex) { ++ // +1 for skipping the table name segment ++ return ((String) deviceEntry.getNthSegment(idColumnIndex + 1)); ++ } ++ + private TsPrimitiveType cloneTsPrimitiveType(TsPrimitiveType originalValue) { + switch (originalValue.getDataType()) { + case BOOLEAN: + return new TsPrimitiveType.TsBoolean(originalValue.getBoolean()); + case INT32: + case DATE: + return new TsPrimitiveType.TsInt(originalValue.getInt()); + case INT64: + case TIMESTAMP: + return new TsPrimitiveType.TsLong(originalValue.getLong()); + case FLOAT: + return new TsPrimitiveType.TsFloat(originalValue.getFloat()); + case DOUBLE: + return new TsPrimitiveType.TsDouble(originalValue.getDouble()); + case TEXT: + case BLOB: + case STRING: + return new TsPrimitiveType.TsBinary(originalValue.getBinary()); + case VECTOR: + return new TsPrimitiveType.TsVector(originalValue.getVector()); + default: + throw new UnSupportedDataTypeException( + "Unsupported data type:" + originalValue.getDataType()); + } + } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(seriesScanUtil) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext) + + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceId) + + (resultTsBlockBuilder == null ? 0 : resultTsBlockBuilder.getRetainedSizeInBytes()) + + RamUsageEstimator.sizeOfCollection(deviceEntries) + + RamUsageEstimator.sizeOfCollection(cachedDeviceEntries) + + RamUsageEstimator.sizeOfCollection(hitCachedResults); + } + } diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeAlignedDeviceViewAggregationScanOperator.java index 84b9199447f,9ab432c1e35..4bc073e63dc --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeAlignedDeviceViewAggregationScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeAlignedDeviceViewAggregationScanOperator.java @@@ -17,67 -17,20 +17,26 @@@ * under the License. */ -package org.apache.iotdb.db.queryengine.plan.planner.plan.node.source; +package org.apache.iotdb.db.queryengine.execution.operator.source.relational; - import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator; - import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; - import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator; -import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.IPartitionRelatedNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; --import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; - import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; - import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; -public abstract class SourceNode extends PlanNode implements AutoCloseable, IPartitionRelatedNode { +import org.apache.tsfile.file.metadata.IDeviceID; - import org.apache.tsfile.write.schema.IMeasurementSchema; - - import java.util.List; - import java.util.Set; - protected SourceNode(final PlanNodeId id) { - super(id); - } +public class TreeAlignedDeviceViewAggregationScanOperator - extends AbstractAggregationTableScanOperator { ++ extends AbstractDefaultAggTableScanOperator { + + private final IDeviceID.TreeDeviceIdColumnValueExtractor extractor; - public abstract void open() throws Exception; + public TreeAlignedDeviceViewAggregationScanOperator( - PlanNodeId sourceId, - OperatorContext context, - List<ColumnSchema> aggColumnSchemas, - int[] aggColumnsIndexArray, - List<DeviceEntry> deviceEntries, - SeriesScanOptions seriesScanOptions, - List<String> measurementColumnNames, - Set<String> allSensors, - List<IMeasurementSchema> measurementSchemas, - List<TableAggregator> tableAggregators, - List<ColumnSchema> groupingKeySchemas, - int[] groupingKeyIndex, - ITableTimeRangeIterator tableTimeRangeIterator, - boolean ascending, - boolean canUseStatistics, - List<Integer> aggregatorInputChannels, ++ AbstractAggTableScanOperatorParameter parameter, + IDeviceID.TreeDeviceIdColumnValueExtractor extractor) { - super( - sourceId, - context, - aggColumnSchemas, - aggColumnsIndexArray, - deviceEntries, - seriesScanOptions, - measurementColumnNames, - allSensors, - measurementSchemas, - tableAggregators, - groupingKeySchemas, - groupingKeyIndex, - tableTimeRangeIterator, - ascending, - canUseStatistics, - aggregatorInputChannels); ++ super(parameter); + this.extractor = extractor; + } - public abstract void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet); + @Override + String getNthIdColumnValue(DeviceEntry deviceEntry, int idColumnIndex) { + return (String) extractor.extract(deviceEntry.getDeviceID(), idColumnIndex); + } } diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index 9a7ef7d8c62,741e05a64bd..d1cf042ef52 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@@ -20,9 -20,8 +20,10 @@@ package org.apache.iotdb.db.queryengine.plan.planner; import org.apache.iotdb.common.rpc.thrift.TEndPoint; + import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.AlignedFullPath; +import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.schema.column.ColumnHeader; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; @@@ -72,15 -72,17 +74,21 @@@ import org.apache.iotdb.db.queryengine. import org.apache.iotdb.db.queryengine.execution.operator.schema.source.DevicePredicateFilter; import org.apache.iotdb.db.queryengine.execution.operator.schema.source.SchemaSourceFactory; import org.apache.iotdb.db.queryengine.execution.operator.sink.IdentitySinkOperator; ++import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator; + import org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractAggTableScanOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator; + import org.apache.iotdb.db.queryengine.execution.operator.source.relational.DefaultAggTableScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.InformationSchemaTableScanOperator; + import org.apache.iotdb.db.queryengine.execution.operator.source.relational.LastQueryAggTableScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.MergeSortFullOuterJoinOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.MergeSortInnerJoinOperator; - import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableAggregationTableScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator; ++import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeAlignedDeviceViewAggregationScanOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeAlignedDeviceViewScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationOperator; + import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastByDescAccumulator; + import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastDescAccumulator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAccumulator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedAccumulator; @@@ -160,10 -162,7 +171,11 @@@ import org.apache.tsfile.block.column.C import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.idcolumn.FourOrHigherLevelDBExtractor; +import org.apache.tsfile.file.metadata.idcolumn.ThreeLevelDBExtractor; +import org.apache.tsfile.file.metadata.idcolumn.TwoLevelDBExtractor; + import org.apache.tsfile.read.TimeValuePair; import org.apache.tsfile.read.common.TimeRange; import org.apache.tsfile.read.common.block.column.BinaryColumn; import org.apache.tsfile.read.common.block.column.BooleanColumn; @@@ -327,94 -330,8 +343,95 @@@ public class TableOperatorGenerator ext } @Override - public Operator visitDeviceTableScan( - DeviceTableScanNode node, LocalExecutionPlanContext context) { + public Operator visitTreeNonAlignedDeviceViewScan( + TreeNonAlignedDeviceViewScanNode node, LocalExecutionPlanContext context) { + throw new UnsupportedOperationException( + "view for non aligned devices in tree is not supported"); + } + + public static IDeviceID.TreeDeviceIdColumnValueExtractor createTreeDeviceIdColumnValueExtractor( + String treeDBName) { + try { + PartialPath db = new PartialPath(treeDBName); + int dbLevel = db.getNodes().length; + if (dbLevel == 2) { + return new TwoLevelDBExtractor(treeDBName.length()); + } else if (dbLevel == 3) { + return new ThreeLevelDBExtractor(treeDBName.length()); + } else if (dbLevel >= 4) { + return new FourOrHigherLevelDBExtractor(dbLevel); + } else { + throw new IllegalArgumentException( + "tree db name should at least be two level: " + treeDBName); + } + } catch (IllegalPathException e) { + throw new IllegalArgumentException(e); + } + } + + @Override + public Operator visitTreeAlignedDeviceViewScan( + TreeAlignedDeviceViewScanNode node, LocalExecutionPlanContext context) { + + IDeviceID.TreeDeviceIdColumnValueExtractor idColumnValueExtractor = + createTreeDeviceIdColumnValueExtractor(node.getTreeDBName()); + + AbstractTableScanOperator.AbstractTableScanOperatorParameter parameter = + constructAbstractTableScanOperatorParameter( + node, + context, + TreeAlignedDeviceViewScanOperator.class.getSimpleName(), + node.getMeasurementColumnNameMap()); + + TreeAlignedDeviceViewScanOperator treeAlignedDeviceViewScanOperator = + new TreeAlignedDeviceViewScanOperator(parameter, idColumnValueExtractor); + + addSource( + treeAlignedDeviceViewScanOperator, + context, + node, + parameter.measurementColumnNames, + parameter.measurementSchemas, - parameter.allSensors); ++ parameter.allSensors, ++ TreeAlignedDeviceViewScanNode.class.getSimpleName()); + + return treeAlignedDeviceViewScanOperator; + } + + private void addSource( - AbstractTableScanOperator tableScanOperator, ++ AbstractDataSourceOperator sourceOperator, + LocalExecutionPlanContext context, + DeviceTableScanNode node, + List<String> measurementColumnNames, + List<IMeasurementSchema> measurementSchemas, - Set<String> allSensors) { ++ Set<String> allSensors, ++ String planNodeName) { + - ((DataDriverContext) context.getDriverContext()).addSourceOperator(tableScanOperator); ++ ((DataDriverContext) context.getDriverContext()).addSourceOperator(sourceOperator); + + for (int i = 0, size = node.getDeviceEntries().size(); i < size; i++) { + if (node.getDeviceEntries().get(i) == null) { + throw new IllegalStateException( - "Device entries of index " + i + " in DeviceTableScanNode is empty"); ++ "Device entries of index " + i + " in " + planNodeName + " is empty"); + } + AlignedFullPath alignedPath = + constructAlignedPath( + node.getDeviceEntries().get(i), + measurementColumnNames, + measurementSchemas, + allSensors); + ((DataDriverContext) context.getDriverContext()).addPath(alignedPath); + } + + context.getDriverContext().setInputDriver(true); + } + - // fieldColumnsRenameMap will always + private AbstractTableScanOperator.AbstractTableScanOperatorParameter + constructAbstractTableScanOperatorParameter( + DeviceTableScanNode node, + LocalExecutionPlanContext context, + String className, + Map<String, String> fieldColumnsRenameMap) { List<Symbol> outputColumnNames = node.getOutputSymbols(); int outputColumnCount = outputColumnNames.size(); @@@ -440,16 -357,12 +457,16 @@@ idAndAttributeColumnsIndexMap.get(columnName), columnName + " is null"); columnSchemas.add(schema); break; - case MEASUREMENT: + case FIELD: columnsIndexArray[idx++] = measurementColumnCount; measurementColumnCount++; - measurementColumnNames.add(schema.getName()); + + String realMeasurementName = + fieldColumnsRenameMap.getOrDefault(schema.getName(), schema.getName()); + + measurementColumnNames.add(realMeasurementName); measurementSchemas.add( - new MeasurementSchema(schema.getName(), getTSDataType(schema.getType()))); + new MeasurementSchema(realMeasurementName, getTSDataType(schema.getType()))); columnSchemas.add(schema); measurementColumnsIndexMap.put(columnName.getName(), measurementColumnCount - 1); break; @@@ -466,16 -379,12 +483,15 @@@ Set<Symbol> outputSet = new HashSet<>(outputColumnNames); for (Map.Entry<Symbol, ColumnSchema> entry : node.getAssignments().entrySet()) { - if (!outputSet.contains(entry.getKey()) - && entry.getValue().getColumnCategory() == MEASUREMENT) { + if (!outputSet.contains(entry.getKey()) && entry.getValue().getColumnCategory() == FIELD) { measurementColumnCount++; - measurementColumnNames.add(entry.getValue().getName()); + String realMeasurementName = + fieldColumnsRenameMap.getOrDefault( + entry.getValue().getName(), entry.getValue().getName()); + + measurementColumnNames.add(realMeasurementName); measurementSchemas.add( - new MeasurementSchema( - entry.getValue().getName(), getTSDataType(entry.getValue().getType()))); + new MeasurementSchema(realMeasurementName, getTSDataType(entry.getValue().getType()))); measurementColumnsIndexMap.put(entry.getKey().getName(), measurementColumnCount - 1); } else if (entry.getValue().getColumnCategory() == TIME) { timeColumnName = entry.getKey().getName(); @@@ -511,45 -423,33 +527,46 @@@ Set<String> allSensors = new HashSet<>(measurementColumnNames); // for time column allSensors.add(""); - TableScanOperator tableScanOperator = - new TableScanOperator( - operatorContext, - node.getPlanNodeId(), - columnSchemas, - columnsIndexArray, - node.getDeviceEntries(), - node.getScanOrder(), - seriesScanOptions, - measurementColumnNames, - allSensors, - measurementSchemas, - maxTsBlockLineNum); - ((DataDriverContext) context.getDriverContext()).addSourceOperator(tableScanOperator); + return new AbstractTableScanOperator.AbstractTableScanOperatorParameter( + allSensors, + operatorContext, + node.getPlanNodeId(), + columnSchemas, + columnsIndexArray, + node.getDeviceEntries(), + node.getScanOrder(), + seriesScanOptions, + measurementColumnNames, + measurementSchemas, + maxTsBlockLineNum); + } - for (int i = 0, size = node.getDeviceEntries().size(); i < size; i++) { - AlignedFullPath alignedPath = - constructAlignedPath( - node.getDeviceEntries().get(i), - measurementColumnNames, - measurementSchemas, - allSensors); - ((DataDriverContext) context.getDriverContext()).addPath(alignedPath); - } + // used for TableScanOperator + private AbstractTableScanOperator.AbstractTableScanOperatorParameter + constructAbstractTableScanOperatorParameter( + DeviceTableScanNode node, LocalExecutionPlanContext context) { + return constructAbstractTableScanOperatorParameter( + node, context, TableScanOperator.class.getSimpleName(), Collections.emptyMap()); + } - context.getDriverContext().setInputDriver(true); + @Override + public Operator visitDeviceTableScan( + DeviceTableScanNode node, LocalExecutionPlanContext context) { + + AbstractTableScanOperator.AbstractTableScanOperatorParameter parameter = + constructAbstractTableScanOperatorParameter(node, context); + + TableScanOperator tableScanOperator = new TableScanOperator(parameter); + + addSource( + tableScanOperator, + context, + node, + parameter.measurementColumnNames, + parameter.measurementSchemas, - parameter.allSensors); ++ parameter.allSensors, ++ DeviceTableScanNode.class.getSimpleName()); return tableScanOperator; } @@@ -1855,16 -1755,8 +1872,38 @@@ } @Override - public Operator visitAggregationTableScan( - AggregationTableScanNode node, LocalExecutionPlanContext context) { + public Operator visitAggregationTreeDeviceViewScan( + AggregationTreeDeviceViewScanNode node, LocalExecutionPlanContext context) { + IDeviceID.TreeDeviceIdColumnValueExtractor idColumnValueExtractor = + createTreeDeviceIdColumnValueExtractor(node.getTreeDBName()); - return super.visitAggregationTreeDeviceViewScan(node, context); ++ ++ AbstractAggTableScanOperator.AbstractAggTableScanOperatorParameter parameter = ++ constructAbstractAggTableScanOperatorParameter( ++ node, ++ context, ++ TreeAlignedDeviceViewAggregationScanOperator.class.getSimpleName(), ++ node.getMeasurementColumnNameMap()); ++ ++ TreeAlignedDeviceViewAggregationScanOperator treeAlignedDeviceViewAggregationScanOperator = ++ new TreeAlignedDeviceViewAggregationScanOperator(parameter, idColumnValueExtractor); ++ ++ addSource( ++ treeAlignedDeviceViewAggregationScanOperator, ++ context, ++ node, ++ parameter.getMeasurementColumnNames(), ++ parameter.getMeasurementSchemas(), ++ parameter.getAllSensors(), ++ AggregationTreeDeviceViewScanNode.class.getSimpleName()); ++ return treeAlignedDeviceViewAggregationScanOperator; + } + - @Override - public Operator visitAggregationTableScan( - AggregationTableScanNode node, LocalExecutionPlanContext context) { ++ private AbstractAggTableScanOperator.AbstractAggTableScanOperatorParameter ++ constructAbstractAggTableScanOperatorParameter( ++ AggregationTableScanNode node, ++ LocalExecutionPlanContext context, ++ String className, ++ Map<String, String> fieldColumnsRenameMap) { List<String> measurementColumnNames = new ArrayList<>(); List<IMeasurementSchema> measurementSchemas = new ArrayList<>(); @@@ -1903,12 -1795,12 +1942,14 @@@ aggColumnsIndexArray[channel] = requireNonNull(node.getIdAndAttributeIndexMap().get(symbol), symbol + " is null"); break; - case MEASUREMENT: + case FIELD: aggColumnsIndexArray[channel] = measurementColumnCount; measurementColumnCount++; -- measurementColumnNames.add(schema.getName()); ++ String realMeasurementName = ++ fieldColumnsRenameMap.getOrDefault(schema.getName(), schema.getName()); ++ measurementColumnNames.add(realMeasurementName); measurementSchemas.add( -- new MeasurementSchema(schema.getName(), getTSDataType(schema.getType()))); ++ new MeasurementSchema(realMeasurementName, getTSDataType(schema.getType()))); measurementColumnsIndexMap.put(symbol.getName(), measurementColumnCount - 1); break; case TIME: @@@ -1931,12 -1823,12 +1972,14 @@@ for (Map.Entry<Symbol, ColumnSchema> entry : node.getAssignments().entrySet()) { if (!aggColumnLayout.containsKey(entry.getKey()) - && entry.getValue().getColumnCategory() == MEASUREMENT) { + && entry.getValue().getColumnCategory() == FIELD) { measurementColumnCount++; -- measurementColumnNames.add(entry.getValue().getName()); ++ String realMeasurementName = ++ fieldColumnsRenameMap.getOrDefault( ++ entry.getValue().getName(), entry.getValue().getName()); ++ measurementColumnNames.add(realMeasurementName); measurementSchemas.add( -- new MeasurementSchema( -- entry.getValue().getName(), getTSDataType(entry.getValue().getType()))); ++ new MeasurementSchema(realMeasurementName, getTSDataType(entry.getValue().getType()))); measurementColumnsIndexMap.put(entry.getKey().getName(), measurementColumnCount - 1); } else if (entry.getValue().getColumnCategory() == TIME) { timeColumnName = entry.getKey().getName(); @@@ -2007,10 -1899,10 +2050,7 @@@ final OperatorContext operatorContext = context .getDriverContext() -- .addOperatorContext( -- context.getNextOperatorId(), -- node.getPlanNodeId(), - TableAggregationTableScanOperator.class.getSimpleName()); - AbstractAggTableScanOperator.class.getSimpleName()); ++ .addOperatorContext(context.getNextOperatorId(), node.getPlanNodeId(), className); SeriesScanOptions seriesScanOptions = buildSeriesScanOptions( context, @@@ -2026,43 -1918,124 +2066,136 @@@ Set<String> allSensors = new HashSet<>(measurementColumnNames); allSensors.add(""); // for time column + context.getDriverContext().setInputDriver(true); + - if (canUseLastCacheOptimize(aggregators, node, timeColumnName)) { - List<Integer> hitCachesIndexes = new ArrayList<>(); - List<Pair<OptionalLong, TsPrimitiveType[]>> hitCachedResults = new ArrayList<>(); - List<DeviceEntry> cachedDeviceEntries = new ArrayList<>(); - List<DeviceEntry> unCachedDeviceEntries = new ArrayList<>(); - long tableTTL = - DataNodeTTLCache.getInstance() - .getTTLForTable( ++ return new AbstractAggTableScanOperator.AbstractAggTableScanOperatorParameter( ++ node.getPlanNodeId(), ++ operatorContext, ++ aggColumnSchemas, ++ aggColumnsIndexArray, ++ node.getDeviceEntries(), ++ node.getDeviceEntries().size(), ++ seriesScanOptions, ++ measurementColumnNames, ++ allSensors, ++ measurementSchemas, ++ aggregators, ++ groupingKeySchemas, ++ groupingKeyIndex, ++ timeRangeIterator, ++ scanAscending, ++ canUseStatistic, ++ aggregatorInputChannels, ++ timeColumnName); ++ } ++ ++ // used for AggregationTableScanNode ++ private AbstractAggTableScanOperator.AbstractAggTableScanOperatorParameter ++ constructAbstractAggTableScanOperatorParameter( ++ AggregationTableScanNode node, LocalExecutionPlanContext context) { ++ return constructAbstractAggTableScanOperatorParameter( ++ node, context, AbstractAggTableScanOperator.class.getSimpleName(), Collections.emptyMap()); ++ } ++ ++ @Override ++ public Operator visitAggregationTableScan( ++ AggregationTableScanNode node, LocalExecutionPlanContext context) { ++ ++ AbstractAggTableScanOperator.AbstractAggTableScanOperatorParameter parameter = ++ constructAbstractAggTableScanOperatorParameter(node, context); + ++ if (canUseLastCacheOptimize( ++ parameter.getTableAggregators(), node, parameter.getTimeColumnName())) { ++ return constructLastQueryAggTableScanOperator(node, parameter, context); ++ } else { ++ DefaultAggTableScanOperator aggTableScanOperator = new DefaultAggTableScanOperator(parameter); ++ ++ addSource( ++ aggTableScanOperator, ++ context, ++ node, ++ parameter.getMeasurementColumnNames(), ++ parameter.getMeasurementSchemas(), ++ parameter.getAllSensors(), ++ AggregationTableScanNode.class.getSimpleName()); ++ return aggTableScanOperator; ++ } ++ } ++ ++ private LastQueryAggTableScanOperator constructLastQueryAggTableScanOperator( ++ AggregationTableScanNode node, ++ AbstractAggTableScanOperator.AbstractAggTableScanOperatorParameter parameter, ++ LocalExecutionPlanContext context) { ++ List<Integer> hitCachesIndexes = new ArrayList<>(); ++ List<Pair<OptionalLong, TsPrimitiveType[]>> hitCachedResults = new ArrayList<>(); ++ List<DeviceEntry> cachedDeviceEntries = new ArrayList<>(); ++ List<DeviceEntry> unCachedDeviceEntries = new ArrayList<>(); ++ long tableTTL = ++ DataNodeTTLCache.getInstance() ++ .getTTLForTable( ++ node.getQualifiedObjectName().getDatabaseName(), ++ node.getQualifiedObjectName().getObjectName()); ++ Filter updateTimeFilter = ++ updateFilterUsingTTL(parameter.getSeriesScanOptions().getGlobalTimeFilter(), tableTTL); + for (int i = 0; i < node.getDeviceEntries().size(); i++) { - if (node.getDeviceEntries().get(i) == null) { - throw new IllegalStateException( - "Device entries of index " + i + " in AggregationTableScanNode is empty"); ++ Optional<Pair<OptionalLong, TsPrimitiveType[]>> lastByResult = ++ TableDeviceSchemaCache.getInstance() ++ .getLastRow( + node.getQualifiedObjectName().getDatabaseName(), - node.getQualifiedObjectName().getObjectName()); - Filter updateTimeFilter = - updateFilterUsingTTL(seriesScanOptions.getGlobalTimeFilter(), tableTTL); - for (int i = 0; i < node.getDeviceEntries().size(); i++) { - Optional<Pair<OptionalLong, TsPrimitiveType[]>> lastByResult = - TableDeviceSchemaCache.getInstance() - .getLastRow( - node.getQualifiedObjectName().getDatabaseName(), - node.getDeviceEntries().get(i).getDeviceID(), - "", - measurementColumnNames); - boolean allHitCache = true; - if (lastByResult.isPresent() && lastByResult.get().getLeft().isPresent()) { - for (int j = 0; j < lastByResult.get().getRight().length; j++) { - TsPrimitiveType tsPrimitiveType = lastByResult.get().getRight()[j]; - if (tsPrimitiveType == null - || (updateTimeFilter != null - && !LastQueryUtil.satisfyFilter( - updateTimeFilter, - new TimeValuePair( - lastByResult.get().getLeft().getAsLong(), tsPrimitiveType)))) { - // the process logic is different from tree model which examine if - // `isFilterGtOrGe(seriesScanOptions.getGlobalTimeFilter())`, set - // `lastByResult.get().getRight()[j] = EMPTY_PRIMITIVE_TYPE`, - // but it should skip in table model - allHitCache = false; - break; - } ++ node.getDeviceEntries().get(i).getDeviceID(), ++ "", ++ parameter.getMeasurementColumnNames()); ++ boolean allHitCache = true; ++ if (lastByResult.isPresent() && lastByResult.get().getLeft().isPresent()) { ++ for (int j = 0; j < lastByResult.get().getRight().length; j++) { ++ TsPrimitiveType tsPrimitiveType = lastByResult.get().getRight()[j]; ++ if (tsPrimitiveType == null ++ || (updateTimeFilter != null ++ && !LastQueryUtil.satisfyFilter( ++ updateTimeFilter, ++ new TimeValuePair( ++ lastByResult.get().getLeft().getAsLong(), tsPrimitiveType)))) { ++ // the process logic is different from tree model which examine if ++ // `isFilterGtOrGe(seriesScanOptions.getGlobalTimeFilter())`, set ++ // `lastByResult.get().getRight()[j] = EMPTY_PRIMITIVE_TYPE`, ++ // but it should skip in table model ++ allHitCache = false; ++ break; + } - } else { - allHitCache = false; - } - - if (!allHitCache) { - AlignedFullPath alignedPath = - constructAlignedPath( - node.getDeviceEntries().get(i), - measurementColumnNames, - measurementSchemas, - allSensors); - ((DataDriverContext) context.getDriverContext()).addPath(alignedPath); - unCachedDeviceEntries.add(node.getDeviceEntries().get(i)); - } else { - hitCachesIndexes.add(i); - hitCachedResults.add(lastByResult.get()); - cachedDeviceEntries.add(node.getDeviceEntries().get(i)); + } ++ } else { ++ allHitCache = false; + } + - // context add TableLastQueryOperator - LastQueryAggTableScanOperator lastQueryOperator = - new LastQueryAggTableScanOperator( - node.getPlanNodeId(), - operatorContext, - aggColumnSchemas, - aggColumnsIndexArray, - unCachedDeviceEntries, - cachedDeviceEntries, - seriesScanOptions, - measurementColumnNames, - allSensors, - measurementSchemas, - aggregators, - groupingKeySchemas, - groupingKeyIndex, - timeRangeIterator, - scanAscending, - canUseStatistic, - aggregatorInputChannels, - node.getQualifiedObjectName(), - hitCachesIndexes, - hitCachedResults); - - ((DataDriverContext) context.getDriverContext()).addSourceOperator(lastQueryOperator); - return lastQueryOperator; - } else { - DefaultAggTableScanOperator aggTableScanOperator = - new DefaultAggTableScanOperator( - node.getPlanNodeId(), - operatorContext, - aggColumnSchemas, - aggColumnsIndexArray, - node.getDeviceEntries(), - node.getDeviceEntries().size(), - seriesScanOptions, - measurementColumnNames, - allSensors, - measurementSchemas, - aggregators, - groupingKeySchemas, - groupingKeyIndex, - timeRangeIterator, - scanAscending, - canUseStatistic, - aggregatorInputChannels); - for (int i = 0; i < node.getDeviceEntries().size(); i++) { ++ if (!allHitCache) { + AlignedFullPath alignedPath = + constructAlignedPath( + node.getDeviceEntries().get(i), - measurementColumnNames, - measurementSchemas, - allSensors); ++ parameter.getMeasurementColumnNames(), ++ parameter.getMeasurementSchemas(), ++ parameter.getAllSensors()); + ((DataDriverContext) context.getDriverContext()).addPath(alignedPath); ++ unCachedDeviceEntries.add(node.getDeviceEntries().get(i)); ++ } else { ++ hitCachesIndexes.add(i); ++ hitCachedResults.add(lastByResult.get()); ++ cachedDeviceEntries.add(node.getDeviceEntries().get(i)); } - AlignedFullPath alignedPath = - constructAlignedPath( - node.getDeviceEntries().get(i), - measurementColumnNames, - measurementSchemas, - allSensors); - ((DataDriverContext) context.getDriverContext()).addPath(alignedPath); - ((DataDriverContext) context.getDriverContext()).addSourceOperator(aggTableScanOperator); - return aggTableScanOperator; } + - context.getDriverContext().setInputDriver(true); ++ parameter.setDeviceEntries(unCachedDeviceEntries); + - TableAggregationTableScanOperator aggTableScanOperator = - new TableAggregationTableScanOperator( - node.getPlanNodeId(), - operatorContext, - aggColumnSchemas, - aggColumnsIndexArray, - node.getDeviceEntries(), - seriesScanOptions, - measurementColumnNames, - allSensors, - measurementSchemas, - aggregators, - groupingKeySchemas, - groupingKeyIndex, - timeRangeIterator, - scanAscending, - canUseStatistic, - aggregatorInputChannels); - ((DataDriverContext) context.getDriverContext()).addSourceOperator(aggTableScanOperator); - return aggTableScanOperator; ++ // context add TableLastQueryOperator ++ LastQueryAggTableScanOperator lastQueryOperator = ++ new LastQueryAggTableScanOperator( ++ parameter, ++ cachedDeviceEntries, ++ node.getQualifiedObjectName(), ++ hitCachesIndexes, ++ hitCachedResults); ++ ++ ((DataDriverContext) context.getDriverContext()).addSourceOperator(lastQueryOperator); ++ return lastQueryOperator; } private SeriesScanOptions buildSeriesScanOptions( diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index 7eb324be575,470d616f44d..53d6516e2e3 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@@ -960,19 -809,9 +960,19 @@@ public class TableDistributedPlanGenera break; } if (deviceTableScanNode.getAssignments().get(symbol).getColumnCategory() - == TsTableColumnCategory.ID) { + == TsTableColumnCategory.TAG) { - // segments[0] is always tableName - orderingRules.add(deviceEntry -> (String) deviceEntry.getNthSegment(idx + 1)); + ++ // segments[0] is always tableName for table model + Function<DeviceEntry, String> iDColumnFunction = + extractor + .<Function<DeviceEntry, String>>map( + treeDeviceIdColumnValueExtractor -> + deviceEntry -> + (String) + treeDeviceIdColumnValueExtractor.extract( + deviceEntry.getDeviceID(), idx)) + .orElseGet(() -> deviceEntry -> (String) deviceEntry.getNthSegment(idx + 1)); - // segments[0] is always tableName for table model + orderingRules.add(iDColumnFunction); } else { orderingRules.add( deviceEntry ->
