This is an automated email from the ASF dual-hosted git repository.

lancelly pushed a commit to branch support_uncorrelated_in_predicate
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit cd7dbedc61ad21b1d2b1c330d81f7828a56d772d
Merge: 6fbbf92e5b3 325b8d8fd01
Author: lancelly <[email protected]>
AuthorDate: Mon Dec 16 12:00:40 2024 +0800

    merge with master

 .github/workflows/pipe-it-2cluster.yml             |   2 +-
 .github/workflows/todos-check.yml                  |  40 ++
 dependencies.json                                  |   2 +-
 .../org/apache/iotdb/TableModelSessionExample.java |  29 +-
 .../apache/iotdb/TableModelSessionPoolExample.java |  29 +-
 .../it/env/cluster/config/MppCommonConfig.java     |   6 +
 .../env/cluster/config/MppSharedCommonConfig.java  |   7 +
 .../iotdb/it/env/cluster/env/AbstractEnv.java      |  15 +
 .../it/env/remote/config/RemoteCommonConfig.java   |   5 +
 .../iotdb/it/env/remote/env/RemoteServerEnv.java   |   9 +
 .../iotdb/itbase/constant/UDFTestConstant.java     |   1 +
 .../java/org/apache/iotdb/itbase/env/BaseEnv.java  |   3 +
 .../org/apache/iotdb/itbase/env/CommonConfig.java  |   2 +
 .../org/apache/iotdb/ainode/it/AINodeBasicIT.java  |   1 +
 .../it/cluster/IoTDBClusterNodeGetterIT.java       |   6 +-
 .../IoTDBRegionMigrateNormalITForIoTV2Batch.java}  |   5 +-
 .../IoTDBRegionMigrateOtherITForIoTV2Batch.java}   |   5 +-
 .../IoTDBRegionMigrateNormalITForIoTV2Stream.java} |  19 +-
 .../IoTDBRegionMigrateOtherITForIoTV2Stream.java}  |  19 +-
 ...torCrashWhenRemoveRemotePeerForIoTV2Batch.java} |   4 +-
 ...DBRegionMigrateDataNodeCrashForIoTV2Batch.java} |   4 +-
 ...inalCrashWhenDeleteLocalPeerForIoTV2Batch.java} |   4 +-
 ...nalCrashWhenRemoveRemotePeerForIoTV2Batch.java} |   4 +-
 ...orCrashWhenRemoveRemotePeerForIoTV2Stream.java} |  17 +-
 ...BRegionMigrateDataNodeCrashForIoTV2Stream.java} |  17 +-
 ...nalCrashWhenDeleteLocalPeerForIoTV2Stream.java} |  18 +-
 ...alCrashWhenRemoveRemotePeerForIoTV2Stream.java} |  18 +-
 .../IoTDBRegionMigrateClusterCrashIoTV2Batch.java} |   4 +-
 ...TDBRegionMigrateConfigNodeCrashIoTV2Batch.java} |   4 +-
 ...IoTDBRegionMigrateClusterCrashIoTV2Stream.java} |  17 +-
 ...DBRegionMigrateConfigNodeCrashIoTV2Stream.java} |  18 +-
 .../confignode/it/utils/ConfigNodeTestUtils.java   |   4 +-
 .../org/apache/iotdb/db/it/IoTDBNestedQueryIT.java |   1 +
 .../org/apache/iotdb/db/it/IoTDBRestartIT.java     |   1 +
 .../db/it/IoTDBSyntaxConventionIdentifierIT.java   |   1 +
 .../it/IoTDBSyntaxConventionStringLiteralIT.java   |   1 +
 .../aligned/IoTDBAggregationWithDeletion2IT.java   |   1 +
 .../db/it/aligned/IoTDBAlignedSeriesQuery4IT.java  |   1 +
 .../scalar/IoTDBRoundFunctionIT.java               |   1 +
 .../db/it/groupby/IoTDBGroupByNaturalMonthIT.java  |  28 +
 .../apache/iotdb/db/it/path/IoTDBQuotedPathIT.java |   1 +
 .../db/it/query/IoTDBEncryptionValueQueryIT.java   |   4 +-
 .../db/it/query/IoTDBLoadEncryptedTsFileIT.java    |  55 +-
 .../iotdb/db/it/query/IoTDBLoadPlainTsFileIT.java  | 145 -----
 .../db/it/trigger/IoTDBTriggerManagementIT.java    |  47 ++
 .../iotdb/db/it/udf/IoTDBUDFBlockQueryIT.java      |  12 +
 .../iotdb/db/it/udf/IoTDBUDFManagementIT.java      |   1 +
 .../iotdb/db/it/udf/IoTDBUDFWindowQueryIT.java     |   1 +
 .../db/it/udf/IoTDBUDTFAlignByTimeQueryIT.java     |   1 +
 .../iotdb/db/it/udf/IoTDBUDTFHybridQueryIT.java    |   1 +
 .../iotdb/db/it/udf/IoTDBUDTFNonAlignQueryIT.java  |   1 +
 .../org/apache/iotdb/db/it/utils/TestUtils.java    | 170 +++++-
 .../pipe/it/autocreate/IoTDBPipeLifeCycleIT.java   |   2 +-
 .../manual/IoTDBPipeTypeConversionISessionIT.java  |   3 +-
 .../iotdb/pipe/it/tablemodel/IoTDBPipeAlterIT.java |  15 +-
 .../it/tablemodel/IoTDBPipeAutoConflictIT.java     |  23 +-
 .../pipe/it/tablemodel/IoTDBPipeAutoDropIT.java    |  88 ++-
 .../pipe/it/tablemodel/IoTDBPipeClusterIT.java     |  68 ++-
 .../IoTDBPipeConnectorCompressionIT.java           |  30 +-
 .../tablemodel/IoTDBPipeConnectorParallelIT.java   |  14 +-
 .../pipe/it/tablemodel/IoTDBPipeDataSinkIT.java    |  31 +-
 .../pipe/it/tablemodel/IoTDBPipeExtractorIT.java   | 105 +++-
 .../pipe/it/tablemodel/IoTDBPipeLifeCycleIT.java   |  90 ++-
 .../pipe/it/tablemodel/IoTDBPipeNullValueIT.java   |  11 +-
 .../pipe/it/tablemodel/IoTDBPipeProtocolIT.java    |  36 +-
 .../it/tablemodel/IoTDBPipeSwitchStatusIT.java     |   7 +-
 .../pipe/it/tablemodel/IoTDBPipeWithLoadIT.java    |   9 +-
 .../it/tablemodel/IoTDBTablePatternFormatIT.java   |  58 +-
 .../iotdb/pipe/it/tablemodel/TableModelUtils.java  |  32 +-
 .../it/db/it/IoTDBCaseWhenThenTableIT.java         |  23 +-
 .../db/it/IoTDBMultiIDsWithAttributesTableIT.java  | 245 +++++++-
 .../relational/it/db/it/IoTDBRestartTableIT.java   |   1 +
 .../it/db/it/IoTDBSetConfigurationTableIT.java     |  39 +-
 .../it/db/it/udf/IoTDBSQLFunctionManagementIT.java |  16 +-
 .../it/db/it/udf/scalar/IoTDBScalarFunctionIT.java |  21 +-
 .../aligned/IoTDBAlignedSeriesQueryTable4IT.java   |   1 +
 .../scalar/IoTDBCastFunctionTableSpecialIT.java    |  23 +-
 .../it/query/old/query/IoTDBArithmeticTableIT.java |  23 +-
 .../it/query/recent/IoTDBTableAggregationIT.java   | 129 +++++
 .../iotdb/relational/it/schema/IoTDBDeviceIT.java  |  36 +-
 .../api/exception/PipeConnectionException.java     |   4 +-
 .../iotdb/pipe/api/exception/PipeException.java    |   6 +-
 .../java/org/apache/iotdb/cli/AbstractCli.java     |  14 +-
 .../org/apache/iotdb/cli/utils/JlineUtils.java     |  32 +-
 iotdb-client/client-py/iotdb/Session.py            |  12 +-
 .../client-py/iotdb/sqlalchemy/IoTDBDialect.py     |  15 +-
 .../client-py/iotdb/utils/IoTDBRpcDataSet.py       |   3 +
 iotdb-client/client-py/requirements.txt            |   2 +-
 .../tests/integration/sqlalchemy/test_dialect.py   |  41 +-
 .../consumer/SubscriptionConsumer.java             |   8 +-
 .../session/subscription/util/CollectionUtils.java |  18 +-
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |   2 +-
 .../client/sync/SyncDataNodeClientPool.java        |   2 +-
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |  24 +-
 .../confignode/conf/ConfigNodeDescriptor.java      |  25 +-
 .../consensus/request/ConfigPhysicalPlan.java      |   4 +
 .../consensus/response/ttl/ShowTTLResp.java        |   1 +
 .../iotdb/confignode/manager/ConfigManager.java    |  39 +-
 .../iotdb/confignode/manager/ProcedureManager.java | 639 +++++++++------------
 .../iotdb/confignode/manager/TTLManager.java       |   1 +
 .../PartiteGraphPlacementRegionGroupAllocator.java |   7 +-
 .../receiver/protocol/IoTDBConfigNodeReceiver.java |  12 +
 .../manager/schema/ClusterSchemaManager.java       |  11 +-
 .../iotdb/confignode/persistence/TTLInfo.java      |   1 +
 .../confignode/persistence/node/NodeInfo.java      |   1 +
 .../confignode/procedure/ProcedureExecutor.java    |  24 -
 .../confignode/procedure/store/ProcedureType.java  |   1 +
 .../request/ConfigPhysicalPlanSerDeTest.java       |  18 +
 .../region/AllocatorScatterWidthManualTest.java    |   4 +-
 .../GreedyCopySetRegionGroupAllocatorTest.java     |   4 +-
 .../confignode/persistence/TriggerInfoTest.java    |   1 +
 .../iotdb/confignode/persistence/UDFInfoTest.java  |   1 +
 .../apache/iotdb/consensus/iot/IoTConsensus.java   | 157 ++---
 .../consensus/iot/IoTConsensusServerImpl.java      |  20 +-
 .../apache/iotdb/consensus/pipe/PipeConsensus.java |  28 +-
 .../apache/iotdb/consensus/iot/StabilityTest.java  |  36 --
 .../assembly/resources/conf/logback-datanode.xml   |  18 +
 .../ArithmeticBinaryColumnTransformer.ftl          |   1 -
 .../templates/ArithmeticColumnTransformerApi.ftl   |  23 +-
 .../templates/ArithmeticUnaryColumnTransformer.ftl |  23 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  23 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 120 +++-
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |   9 +-
 .../subtask/connector/PipeConnectorSubtask.java    |  34 --
 .../client/IoTDBDataNodeAsyncClientManager.java    |  45 +-
 .../connector/protocol/opcua/OpcUaNameSpace.java   |  13 +-
 .../pipeconsensus/PipeConsensusAsyncConnector.java |   4 +-
 .../pipeconsensus/PipeConsensusSyncConnector.java  |  56 +-
 .../deletion/persist/PageCacheDeletionBuffer.java  |   2 +-
 .../PipeDataNodeRemainingEventAndTimeOperator.java |   2 +-
 .../protocol/airgap/IoTDBAirGapReceiver.java       |  52 +-
 .../protocol/airgap/IoTDBAirGapReceiverAgent.java  |   7 +-
 .../legacy/IoTDBLegacyPipeReceiverAgent.java       |   3 +-
 .../protocol/legacy/loader/DeletionLoader.java     |   3 +-
 .../protocol/legacy/loader/TsFileLoader.java       |   3 +-
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |  56 +-
 .../statement/PipeConvertedInsertRowStatement.java |   6 +
 .../PipeConvertedInsertTabletStatement.java        |   7 +
 ...eStatementDataTypeConvertExecutionVisitor.java} | 101 +++-
 ...eStatementDataTypeConvertExecutionVisitor.java} |  11 +-
 .../iotdb/db/protocol/mqtt/MPPPublishHandler.java  |   3 +-
 .../rest/v1/impl/GrafanaApiServiceImpl.java        |   9 +-
 .../protocol/rest/v1/impl/RestApiServiceImpl.java  |   9 +-
 .../rest/v2/impl/GrafanaApiServiceImpl.java        |   9 +-
 .../protocol/rest/v2/impl/RestApiServiceImpl.java  |  12 +-
 .../protocol/thrift/impl/ClientRPCServiceImpl.java |  55 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |   3 +-
 .../thrift/impl/DataNodeRegionManager.java         |   4 +-
 .../db/queryengine/common/MPPQueryContext.java     |  14 +-
 .../queryengine/execution/FutureStateChange.java   |  23 +-
 .../db/queryengine/execution/StateMachine.java     |  23 +-
 .../timerangeiterator/AggrWindowIterator.java      |  26 +-
 .../PreAggrWindowWithNaturalMonthIterator.java     |   8 +-
 .../TimeRangeIteratorFactory.java                  |   9 +-
 .../execution/operator/AggregationUtil.java        |   7 +-
 .../process/SlidingWindowAggregationOperator.java  |   7 +-
 .../comparator/AscBinaryTypeJoinKeyComparator.java |  48 +-
 .../AscBooleanTypeJoinKeyComparator.java           |  38 +-
 .../comparator/AscDoubleTypeJoinKeyComparator.java |  38 +-
 .../comparator/AscFloatTypeJoinKeyComparator.java  |  38 +-
 .../comparator/AscIntTypeJoinKeyComparator.java    |  38 +-
 .../comparator/AscLongTypeJoinKeyComparator.java   |  38 +-
 .../DescBinaryTypeJoinKeyComparator.java           |  48 +-
 .../DescBooleanTypeJoinKeyComparator.java          |  38 +-
 .../DescDoubleTypeJoinKeyComparator.java           |  38 +-
 .../comparator/DescFloatTypeJoinKeyComparator.java |  38 +-
 .../comparator/DescIntTypeJoinKeyComparator.java   |  38 +-
 .../comparator/DescLongTypeJoinKeyComparator.java  |  38 +-
 .../join/merge/comparator/JoinKeyComparator.java   |   8 +-
 .../merge/comparator/JoinKeyComparatorFactory.java |  13 +
 .../relational/AbstractMergeSortJoinOperator.java  | 176 ++++--
 .../relational/MergeSortFullOuterJoinOperator.java |  99 ++--
 .../relational/MergeSortInnerJoinOperator.java     |  71 ++-
 .../relational/MergeSortSemiJoinOperator.java      |  46 +-
 .../relational/aggregation/AvgAccumulator.java     |  73 +++
 .../relational/aggregation/CountAccumulator.java   |  20 +
 .../relational/aggregation/SumAccumulator.java     |  69 +++
 .../relational/aggregation/TableAccumulator.java   |  32 +-
 .../relational/aggregation/TableAggregator.java    |  24 +-
 .../aggregation/TableModeAccumulator.java          | 102 ++++
 .../aggregation/TableVarianceAccumulator.java      |  86 +++
 .../aggregation/grouped/GroupedAccumulator.java    |  24 +-
 .../aggregation/grouped/GroupedAggregator.java     |  24 +-
 .../aggregation/grouped/UpdateMemory.java          |  24 +-
 .../aggregation/grouped/array/BigArrays.java       |  24 +-
 .../aggregation/grouped/array/BinaryBigArray.java  |  24 +-
 .../aggregation/grouped/array/BooleanBigArray.java |  24 +-
 .../aggregation/grouped/array/ByteBigArray.java    |  24 +-
 .../aggregation/grouped/array/DoubleBigArray.java  |  24 +-
 .../aggregation/grouped/array/FloatBigArray.java   |  24 +-
 .../aggregation/grouped/array/IntBigArray.java     |  24 +-
 .../aggregation/grouped/array/IntBigArrays.java    |  24 +-
 .../aggregation/grouped/array/IntComparator.java   |  24 +-
 .../aggregation/grouped/array/LongBigArray.java    |  24 +-
 .../aggregation/grouped/array/MapBigArray.java     |  24 +-
 .../aggregation/grouped/array/ObjectBigArray.java  |  24 +-
 .../aggregation/grouped/array/ShortBigArray.java   |  24 +-
 .../grouped/builder/HashAggregationBuilder.java    |  24 +-
 .../builder/InMemoryHashAggregationBuilder.java    |  24 +-
 .../grouped/hash/CombineHashFunction.java          |  24 +-
 .../aggregation/grouped/hash/FlatGroupByHash.java  |  24 +-
 .../aggregation/grouped/hash/FlatHash.java         |  24 +-
 .../aggregation/grouped/hash/FlatHashStrategy.java |  24 +-
 .../aggregation/grouped/hash/GroupByHash.java      |  24 +-
 .../grouped/hash/VariableWidthData.java            |  24 +-
 .../iotdb/db/queryengine/plan/Coordinator.java     |  37 +-
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |   4 +-
 .../plan/analyze/TemplatedAggregationAnalyze.java  |   4 +-
 .../analyze/cache/schema/DataNodeTTLCache.java     |   1 +
 .../load/TreeSchemaAutoCreatorAndVerifier.java     |   3 +-
 .../analyze/schema/AutoCreateSchemaExecutor.java   |   3 +-
 .../analyze/schema/ClusterSchemaFetchExecutor.java |   3 +-
 .../plan/execution/ExecutionResult.java            |   1 +
 .../plan/execution/IQueryExecution.java            |   2 +
 .../queryengine/plan/execution/QueryExecution.java |   5 +
 .../plan/execution/config/ConfigExecution.java     |   5 +
 .../execution/config/TableConfigTaskVisitor.java   |  21 +-
 .../execution/config/TreeConfigTaskVisitor.java    |  45 +-
 .../config/executor/ClusterConfigTaskExecutor.java |  28 +-
 .../plan/optimization/LimitOffsetPushDown.java     |  20 +-
 .../db/queryengine/plan/parser/ASTVisitor.java     |   4 +-
 .../plan/planner/LocalExecutionPlanner.java        |   1 +
 .../queryengine/plan/planner/LogicalPlanner.java   |   1 +
 .../plan/planner/OperatorTreeGenerator.java        |  31 +-
 .../plan/planner/TableOperatorGenerator.java       | 110 ++--
 .../planner/distribution/DistributionPlanner.java  |   1 +
 .../plan/planner/plan/DistributedQueryPlan.java    |   1 +
 .../plan/planner/plan/LogicalQueryPlan.java        |   1 +
 .../plan/planner/plan/PlanFragment.java            |   1 +
 .../plan/planner/plan/node/PlanNodeId.java         |   1 +
 .../planner/plan/node/process/DeviceViewNode.java  |   1 +
 .../plan/planner/plan/node/process/FillNode.java   |   1 +
 .../plan/planner/plan/node/process/FilterNode.java |   1 +
 .../plan/node/process/GroupByLevelNode.java        |   1 +
 .../plan/node/process/HorizontallyConcatNode.java  |   1 +
 .../plan/planner/plan/node/process/LimitNode.java  |   1 +
 .../planner/plan/node/process/ProcessNode.java     |   1 +
 .../plan/planner/plan/node/process/SortNode.java   |   1 +
 .../node/process/join/FullOuterTimeJoinNode.java   |   1 +
 .../plan/planner/plan/node/write/InsertNode.java   |  10 +
 .../plan/relational/analyzer/Analysis.java         |   7 +-
 .../relational/analyzer/StatementAnalyzer.java     |  50 +-
 .../querystats/PlanOptimizersStatsCollector.java   |  24 +-
 .../querystats/QueryPlanOptimizerStatistics.java   |  24 +-
 .../querystats/QueryPlanOptimizerStats.java        |  24 +-
 .../function/arithmetic/AdditionResolver.java      |  23 +-
 .../function/arithmetic/DivisionResolver.java      |  23 +-
 .../function/arithmetic/ModulusResolver.java       |  23 +-
 .../arithmetic/MultiplicationResolver.java         |  23 +-
 .../function/arithmetic/SubtractionResolver.java   |  23 +-
 .../metadata/fetcher/TableDeviceSchemaFetcher.java |   6 +-
 .../fetcher/TableDeviceSchemaValidator.java        |   3 +-
 .../plan/relational/planner/Assignments.java       |  24 +-
 .../relational/planner/ExpressionExtractor.java    |  24 +-
 .../planner/ExpressionSymbolInliner.java           |  24 +-
 .../plan/relational/planner/NodeAndMappings.java   |  24 +-
 .../plan/relational/planner/OrderingScheme.java    |  23 +-
 .../relational/planner/OrderingTranslator.java     |  24 +-
 .../plan/relational/planner/PlanBuilder.java       |  24 +-
 .../plan/relational/planner/PlanNodeSearcher.java  |  24 +-
 .../plan/relational/planner/QueryPlanner.java      |  44 +-
 .../plan/relational/planner/RelationPlan.java      |  24 +-
 .../plan/relational/planner/RelationPlanner.java   |  24 +-
 .../plan/relational/planner/SortOrder.java         |  24 +-
 .../plan/relational/planner/SubqueryPlanner.java   |  24 +-
 .../plan/relational/planner/SymbolAllocator.java   |  24 +-
 .../plan/relational/planner/SymbolsExtractor.java  |  24 +-
 .../relational/planner/TableLogicalPlanner.java    |  23 +-
 .../plan/relational/planner/TranslationMap.java    |  24 +-
 .../planner/distribute/SubPlanGenerator.java       |  23 +-
 .../distribute/TableDistributedPlanGenerator.java  |  23 +-
 .../distribute/TableDistributedPlanner.java        |  24 +-
 .../distribute/TableModelQueryFragmentPlanner.java |  24 +-
 .../TableModelTypeProviderExtractor.java           |  23 +-
 .../planner/ir/DefaultTraversalVisitor.java        |  23 +-
 .../planner/ir/DeterminismEvaluator.java           |  23 +-
 .../planner/ir/ExpressionTreeRewriter.java         |   1 +
 .../plan/relational/planner/ir/IrUtils.java        |  23 +-
 .../planner/ir/ReplaceSymbolInExpression.java      |  24 +-
 .../planner/iterative/GroupReference.java          |  24 +-
 .../planner/iterative/IterativeOptimizer.java      |  24 +-
 .../plan/relational/planner/iterative/Lookup.java  |  24 +-
 .../plan/relational/planner/iterative/Memo.java    |  24 +-
 .../plan/relational/planner/iterative/Plans.java   |  24 +-
 .../relational/planner/iterative/RuleIndex.java    |  23 +-
 .../relational/planner/iterative/RuleStats.java    |  24 +-
 .../planner/iterative/RuleStatsRecorder.java       |  24 +-
 .../rule/EliminateLimitProjectWithTableScan.java   |  24 +-
 .../rule/EliminateLimitWithTableScan.java          |  24 +-
 .../planner/iterative/rule/InlineProjections.java  |  24 +-
 .../planner/iterative/rule/MergeFilters.java       |  23 +-
 .../iterative/rule/MergeLimitWithMergeSort.java    |  24 +-
 .../planner/iterative/rule/MergeLimits.java        |  24 +-
 .../iterative/rule/ProjectOffPushDownRule.java     |  24 +-
 .../iterative/rule/PruneAggregationColumns.java    |  24 +-
 .../rule/PruneAggregationSourceColumns.java        |  24 +-
 .../iterative/rule/PruneDistinctAggregation.java   | 139 +++++
 .../planner/iterative/rule/PruneFilterColumns.java |  24 +-
 .../planner/iterative/rule/PruneLimitColumns.java  |  24 +-
 .../planner/iterative/rule/PruneOffsetColumns.java |  24 +-
 .../iterative/rule/PruneOutputSourceColumns.java   |  24 +-
 .../iterative/rule/PruneProjectColumns.java        |  24 +-
 .../planner/iterative/rule/PruneSortColumns.java   |  24 +-
 .../iterative/rule/PruneTableScanColumns.java      |  24 +-
 .../iterative/rule/PushLimitThroughOffset.java     |  23 +-
 .../rule/RemoveRedundantIdentityProjections.java   |  24 +-
 .../rule/TransformUncorrelatedSubqueryToJoin.java  |  24 +-
 .../relational/planner/node/AggregationNode.java   |  24 +-
 .../relational/planner/node/ChildReplacer.java     |  24 +-
 .../planner/node/DeviceTableScanNode.java          |  23 +-
 .../planner/node/ExplainAnalyzeNode.java           |  23 +-
 .../plan/relational/planner/node/JoinNode.java     |   3 +-
 .../plan/relational/planner/node/LimitNode.java    |  23 +-
 .../plan/relational/planner/node/OffsetNode.java   |  23 +-
 .../plan/relational/planner/node/OutputNode.java   |  23 +-
 .../plan/relational/planner/node/Patterns.java     |  24 +-
 .../plan/relational/planner/node/ProjectNode.java  |  23 +-
 .../plan/relational/planner/node/SortNode.java     |  23 +-
 .../relational/planner/node/TableScanNode.java     |  23 +-
 .../optimizations/AdaptivePlanOptimizer.java       |  23 +-
 .../planner/optimizations/JoinUtils.java           |  35 +-
 .../optimizations/LogicalOptimizeFactory.java      |   3 +
 .../planner/optimizations/PlanOptimizer.java       |  24 +-
 .../PushAggregationIntoTableScan.java              |  23 +-
 .../PushLimitOffsetIntoTableScan.java              |  23 +-
 .../optimizations/PushPredicateIntoTableScan.java  | 137 ++---
 .../optimizations/QueryCardinalityUtil.java        |  17 +
 .../planner/optimizations/SymbolMapper.java        |  24 +-
 .../TransformAggregationToStreamable.java          |  23 +-
 .../optimizations/UnaliasSymbolReferences.java     |  24 +-
 .../plan/relational/sql/rewrite/ShowRewrite.java   |  24 +-
 .../relational/sql/rewrite/StatementRewrite.java   |  24 +-
 .../plan/relational/utils/DisjointSet.java         |   1 +
 .../plan/relational/utils/MoreLists.java           |  24 +-
 .../db/queryengine/plan/scheduler/IScheduler.java  |   1 +
 .../plan/statement/crud/InsertBaseStatement.java   |   4 +-
 .../crud/InsertMultiTabletsStatement.java          |   2 +-
 .../plan/statement/crud/InsertRowStatement.java    |   9 +-
 .../crud/InsertRowsOfOneDeviceStatement.java       |   2 +-
 .../plan/statement/crud/InsertRowsStatement.java   |   2 +-
 .../plan/statement/crud/InsertTabletStatement.java |  12 +-
 .../plan/statement/crud/LoadTsFileStatement.java   |   2 +-
 .../plan/statement/pipe/PipeEnrichedStatement.java |   3 +-
 .../AbstractCaseWhenThenColumnTransformer.java     |  23 +-
 .../column/TableCaseWhenThenColumnTransformer.java |  23 +-
 .../column/TreeCaseWhenThenColumnTransformer.java  |  23 +-
 .../AbstractCastFunctionColumnTransformer.java     |  23 +-
 .../scalar/DateBinFunctionColumnTransformer.java   |  23 +-
 .../scalar/TryCastFunctionColumnTransformer.java   |  23 +-
 .../read/resp/info/impl/ShowSchemaResult.java      |   1 +
 .../read/resp/info/impl/ShowTimeSeriesResult.java  |   1 +
 .../schemaregion/utils/ResourceByPathUtils.java    |   1 +
 .../iotdb/db/service/RegionMigrateService.java     |  49 +-
 .../iotdb/db/service/metrics/WritingMetrics.java   |  32 ++
 .../db/service/metrics/file/TsFileMetrics.java     |  33 ++
 .../iotdb/db/storageengine/StorageEngine.java      |   4 +
 .../storageengine/buffer/CacheHitRatioMonitor.java |   4 +-
 .../db/storageengine/dataregion/DataRegion.java    |   7 +-
 .../dataregion/DataRegionMetrics.java              |  14 +-
 .../impl/ReadPointCompactionPerformer.java         |   6 +-
 .../RepairUnsortedFileCompactionPerformer.java     |  13 +
 .../compaction/selector/ICompactionSelector.java   |   1 +
 .../compaction/selector/ICrossSpaceSelector.java   |   1 +
 .../selector/IInnerSeqSpaceSelector.java           |   1 +
 .../dataregion/memtable/AbstractMemTable.java      |   1 -
 .../memtable/TsFileProcessorInfoMetrics.java       |   8 +-
 .../dataregion/memtable/WritableMemChunk.java      |   1 +
 .../dataregion/modification/ModEntry.java          |   2 +-
 .../filescan/model/AlignedDeviceChunkMetaData.java |   1 +
 .../dataregion/wal/buffer/WALBuffer.java           |   6 +-
 .../dataregion/wal/buffer/WALEntry.java            |   2 +
 .../dataregion/wal/buffer/WALInfoEntry.java        |  34 ++
 .../dataregion/wal/buffer/WALSignalEntry.java      |   5 +
 .../storageengine/dataregion/wal/node/WALNode.java |   4 +-
 .../wal/utils/MemoryControlledWALEntryQueue.java   |  81 +++
 .../load/active/ActiveLoadTsFileLoader.java        |   3 +-
 .../rescon/memory/MemTableManager.java             |   1 +
 .../db/storageengine/rescon/memory/SystemInfo.java |  31 +-
 .../rescon/memory/TsFileResourceManager.java       |  18 +
 .../task/subtask/SubscriptionConnectorSubtask.java |   9 -
 .../org/apache/iotdb/db/utils/CommonUtils.java     |  13 +-
 .../org/apache/iotdb/db/utils/DateTimeUtils.java   |   9 +-
 .../java/org/apache/iotdb/db/utils/MmapUtil.java   |   1 +
 .../iotdb/db/utils/annotations/TableModel.java     |  33 --
 .../iotdb/db/utils/annotations/TreeModel.java      |  33 --
 .../schemaRegion/SchemaRegionTableDeviceTest.java  |  13 +-
 .../connector/PipeDataNodeThriftRequestTest.java   |  22 +-
 .../aggregation/TimeRangeIteratorTest.java         |  53 +-
 .../operator/AggregationOperatorTest.java          |   7 +-
 .../AlignedSeriesAggregationScanOperatorTest.java  |   3 +-
 .../operator/HorizontallyConcatOperatorTest.java   |   5 +-
 .../execution/operator/LastQueryOperatorTest.java  |   9 +-
 .../operator/LastQueryTreeSortOperatorTest.java    |   9 +-
 .../operator/MergeTreeSortOperatorTest.java        |   5 +
 .../execution/operator/OperatorMemoryTest.java     |  16 +-
 .../operator/RawDataAggregationOperatorTest.java   |   3 +-
 .../SeriesAggregationScanOperatorTest.java         |   3 +-
 .../SlidingWindowAggregationOperatorTest.java      |   8 +-
 .../operator/UpdateLastCacheOperatorTest.java      |   3 +-
 .../plan/optimization/LimitOffsetPushDownTest.java |  11 +-
 .../plan/relational/analyzer/DistinctTest.java     | 119 ++++
 .../plan/relational/analyzer/JoinTest.java         | 291 ++++++++--
 .../analyzer/LimitOffsetPushDownTest.java          |  23 +-
 .../plan/relational/analyzer/TSBSMetadata.java     |  23 +-
 .../plan/relational/analyzer/TestMatadata.java     |  23 +-
 .../plan/relational/analyzer/TestPlanBuilder.java  |   1 +
 .../plan/relational/analyzer/TestUtils.java        |   1 +
 .../plan/relational/planner/SubqueryTest.java      |   1 +
 .../AggregationDeviceTableScanMatcher.java         |  24 +-
 .../planner/assertions/AggregationFunction.java    |  24 +-
 .../assertions/AggregationFunctionMatcher.java     |  24 +-
 .../assertions/AggregationFunctionProvider.java    |  24 +-
 .../planner/assertions/AggregationMatcher.java     |  24 +-
 .../planner/assertions/AggregationStepMatcher.java |  24 +-
 .../planner/assertions/AliasMatcher.java           |  24 +-
 .../planner/assertions/AliasPresent.java           |  24 +-
 .../assertions/BaseStrictSymbolsMatcher.java       |  24 +-
 .../planner/assertions/ColumnReference.java        |  24 +-
 .../planner/assertions/DeviceTableScanMatcher.java |  24 +-
 .../planner/assertions/EquiJoinClauseProvider.java |  28 +-
 .../planner/assertions/ExchangeNodeMatcher.java    |  24 +-
 .../planner/assertions/ExpectedValueProvider.java  |  24 +-
 .../planner/assertions/ExpressionMatcher.java      |  24 +-
 .../planner/assertions/ExpressionVerifier.java     |  24 +-
 .../planner/assertions/FilterMatcher.java          |  24 +-
 .../assertions/IdentityProjectionMatcher.java      |  24 +-
 .../InformationSchemaTableScanMatcher.java         |  24 +-
 .../relational/planner/assertions/JoinMatcher.java |  24 +-
 .../planner/assertions/LimitMatcher.java           |  24 +-
 .../relational/planner/assertions/MatchResult.java |  24 +-
 .../relational/planner/assertions/Matcher.java     |  24 +-
 .../planner/assertions/NotPlanNodeMatcher.java     |  24 +-
 .../planner/assertions/OffsetMatcher.java          |  24 +-
 .../planner/assertions/OutputMatcher.java          |  24 +-
 .../relational/planner/assertions/PlanAssert.java  |  24 +-
 .../planner/assertions/PlanMatchPattern.java       |  26 +-
 .../planner/assertions/PlanMatchingState.java      |  24 +-
 .../planner/assertions/PlanMatchingVisitor.java    |  24 +-
 .../planner/assertions/PlanNodeMatcher.java        |  24 +-
 .../planner/assertions/PlanTestSymbol.java         |  24 +-
 .../planner/assertions/PredicateMatcher.java       |  24 +-
 .../planner/assertions/RvalueMatcher.java          |  24 +-
 .../relational/planner/assertions/SortMatcher.java |  24 +-
 .../assertions/StrictAssignedSymbolsMatcher.java   |  24 +-
 .../planner/assertions/StrictSymbolsMatcher.java   |  24 +-
 .../relational/planner/assertions/SymbolAlias.java |  28 +-
 .../planner/assertions/SymbolAliases.java          |  24 +-
 .../planner/assertions/TableScanMatcher.java       |  24 +-
 .../plan/relational/planner/assertions/Util.java   |  24 +-
 .../column/unary/scalar/DateBinFunctionTest.java   |  23 +-
 .../buffer/CacheHitRatioMonitorTest.java           |   4 +-
 .../compaction/AbstractCompactionTest.java         |   1 +
 .../compaction/CompactionTaskComparatorTest.java   |   1 +
 .../compaction/CompactionTaskManagerTest.java      |   1 +
 .../ReadPointCompactionPerformerTest.java          |   1 +
 .../inner/InnerCompactionSchedulerTest.java        |   1 +
 .../inner/InnerSpaceCompactionExceptionTest.java   |   1 +
 .../recover/SizeTieredCompactionRecoverTest.java   |   1 +
 .../repair/RepairUnsortedFileCompactionTest.java   |  44 ++
 .../utils/MultiTsFileDeviceIteratorTest.java       |   1 +
 .../dataregion/memtable/MemTableFlushTaskTest.java |   1 +
 .../dataregion/tsfile/FakedTsFileResource.java     |   1 +
 .../rescon/memory/ResourceManagerTest.java         |  40 ++
 .../iotdb/metrics/core/IoTDBMetricManager.java     |   2 +-
 .../iotdb/metrics/AbstractMetricManager.java       |  12 +-
 .../iotdb/metrics/AbstractMetricService.java       |  40 +-
 .../iotdb/metrics/impl/DoNothingMetricManager.java |   2 +-
 iotdb-core/node-commons/pom.xml                    |   4 +-
 .../conf/iotdb-system.properties.template          |  60 +-
 .../async/AsyncPipeDataTransferServiceClient.java  |  28 +-
 .../commons/client/sync/ByteBuddyEnhancer.java     |  75 +++
 .../sync/SyncThriftClientWithErrorHandler.java     |  30 +-
 .../iotdb/commons/concurrent/ThreadName.java       |   4 +
 .../commons/concurrent/ThreadPoolMetrics.java      | 207 +++----
 .../WrappedScheduledExecutorService.java           |   2 +
 .../WrappedSingleThreadExecutorService.java        |   2 +
 .../WrappedSingleThreadScheduledExecutor.java      |   2 +
 .../threadpool/WrappedThreadPoolExecutor.java      |   2 +
 .../apache/iotdb/commons/conf/CommonConfig.java    |  50 ++
 .../iotdb/commons/conf/ConfigurationFileUtils.java |  25 +-
 .../apache/iotdb/commons/conf/IoTDBConstant.java   |   2 +
 .../iotdb/commons/exception/ShutdownException.java |   1 +
 .../PipeRuntimeConnectorCriticalException.java     |  14 +-
 ...meConnectorRetryTimesConfigurableException.java |   3 +-
 .../pipe/PipeRuntimeCriticalException.java         |  14 +-
 .../exception/pipe/PipeRuntimeException.java       |  10 +-
 .../commons/executable/ExecutableManager.java      |  11 +
 .../iotdb/commons/partition/DataPartition.java     |   1 +
 .../commons/partition/DataPartitionQueryParam.java |   1 +
 .../apache/iotdb/commons/partition/Partition.java  |   1 +
 .../apache/iotdb/commons/path/MeasurementPath.java |   1 +
 .../org/apache/iotdb/commons/path/PartialPath.java |   1 +
 .../iotdb/commons/path/PathDeserializeUtil.java    |   1 +
 .../connector/protocol/IoTDBAirGapConnector.java   |   6 +-
 .../pipe/datastructure/pattern/TablePattern.java   |   2 +-
 .../commons/pipe/receiver/IoTDBFileReceiver.java   |  76 ++-
 .../commons/pipe/receiver/IoTDBReceiverAgent.java  |   6 +-
 .../filter/impl/StringValueFilterVisitor.java      |   5 +-
 .../schema/filter/impl/values/LikeFilter.java      |   2 +-
 .../apache/iotdb/commons/schema/ttl/TTLCache.java  |   1 +
 .../iotdb/commons/service/metric/enums/Metric.java |   1 +
 .../meta/consumer/ConsumerGroupMeta.java           |  34 +-
 .../meta/consumer/ConsumerGroupMetaKeeper.java     |   7 +-
 .../iotdb/commons/trigger/TriggerInformation.java  |   1 +
 .../apache/iotdb/commons/trigger/TriggerTable.java |   1 +
 .../commons/udf/utils/UDFBinaryTransformer.java    |   1 +
 .../commons/udf/utils/UDFDataTypeTransformer.java  |   1 +
 .../src/main/thrift/confignode.thrift              |   4 +-
 .../iotdb/library/dprofile/UDTFResample.java       |   6 +-
 .../iotdb/library/dquality/UDTFCompleteness.java   |   3 +-
 .../iotdb/library/dquality/UDTFConsistency.java    |   2 +-
 .../iotdb/library/dquality/UDTFTimeliness.java     |   3 +-
 .../iotdb/library/dquality/UDTFValidity.java       |   3 +-
 .../apache/iotdb/library/frequency/UDTFIFFT.java   |   7 +-
 .../library/series/UDTFConsecutiveSequences.java   |   5 +-
 .../library/series/UDTFConsecutiveWindows.java     |  11 +-
 .../java/org/apache/iotdb/library/util/Util.java   |  90 ++-
 pom.xml                                            |  12 +-
 518 files changed, 8012 insertions(+), 3930 deletions(-)

diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/JoinKeyComparatorFactory.java
index 995d568802c,a64ff35257d..33620690fed
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/JoinKeyComparatorFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/JoinKeyComparatorFactory.java
@@@ -21,9 -21,22 +21,22 @@@ package org.apache.iotdb.db.queryengine
  
  import org.apache.tsfile.read.common.type.Type;
  
+ import java.util.ArrayList;
+ import java.util.List;
+ 
  public class JoinKeyComparatorFactory {
+   private JoinKeyComparatorFactory() {}
+ 
+   public static List<JoinKeyComparator> getComparators(
+       List<Type> joinKeyTypes, boolean isAscending) {
+     List<JoinKeyComparator> comparators = new 
ArrayList<>(joinKeyTypes.size());
+     for (Type joinKeyType : joinKeyTypes) {
+       comparators.add(getComparator(joinKeyType, isAscending));
+     }
+     return comparators;
+   }
  
 -  private static JoinKeyComparator getComparator(Type type, boolean 
isAscending) {
 +  public static JoinKeyComparator getComparator(Type type, boolean 
isAscending) {
      switch (type.getTypeEnum()) {
        case INT32:
        case DATE:
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MergeSortSemiJoinOperator.java
index 95b0e6d0848,00000000000..fee63beb627
mode 100644,000000..100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MergeSortSemiJoinOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MergeSortSemiJoinOperator.java
@@@ -1,197 -1,0 +1,209 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
 +
 +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
 +import org.apache.iotdb.db.queryengine.execution.operator.Operator;
 +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
 +import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.comparator.JoinKeyComparator;
 +
 +import org.apache.tsfile.block.column.ColumnBuilder;
 +import org.apache.tsfile.enums.TSDataType;
- import org.apache.tsfile.read.common.type.Type;
 +import org.apache.tsfile.utils.RamUsageEstimator;
 +
++import java.util.Collections;
 +import java.util.List;
 +
 +public class MergeSortSemiJoinOperator extends AbstractMergeSortJoinOperator {
 +  private static final long INSTANCE_SIZE =
 +      
RamUsageEstimator.shallowSizeOfInstance(MergeSortSemiJoinOperator.class);
 +
 +  private final int outputColumnNum;
 +
 +  public MergeSortSemiJoinOperator(
 +      OperatorContext operatorContext,
 +      Operator leftChild,
 +      int leftJoinKeyPosition,
 +      int[] leftOutputSymbolIdx,
 +      Operator rightChild,
 +      int rightJoinKeyPosition,
 +      JoinKeyComparator joinKeyComparator,
-       List<TSDataType> dataTypes,
-       Type joinKeyType) {
++      List<TSDataType> dataTypes) {
 +    super(
 +        operatorContext,
 +        leftChild,
-         leftJoinKeyPosition,
++        new int[] {leftJoinKeyPosition},
 +        leftOutputSymbolIdx,
 +        rightChild,
-         rightJoinKeyPosition,
++        new int[] {rightJoinKeyPosition},
 +        null,
-         joinKeyComparator,
-         dataTypes,
-         joinKeyType);
++        Collections.singletonList(joinKeyComparator),
++        dataTypes);
 +    outputColumnNum = dataTypes.size();
 +  }
 +
 +  @Override
 +  public boolean hasNext() throws Exception {
 +    if (retainedTsBlock != null) {
 +      return true;
 +    }
 +
 +    return !leftFinished;
 +  }
 +
 +  @Override
 +  protected boolean prepareInput() throws Exception {
 +    gotCandidateBlocks();
 +    if (rightFinished) {
 +      return leftBlockNotEmpty();
 +    }
 +    return leftBlockNotEmpty() && rightBlockNotEmpty() && gotNextRightBlock();
 +  }
 +
 +  @Override
 +  protected boolean processFinished() {
 +    if (rightFinished) {
 +      appendAllLeftBlock();
 +      return true;
 +    }
 +    // all the join keys in rightTsBlock are less than leftTsBlock, just skip 
right
 +    if (allRightLessThanLeft()) {
 +      resetRightBlockList();
 +      return true;
 +    }
 +
 +    // all the join keys in leftTsBlock are less than rightTsBlock, just 
append the left value with
 +    // match flag as false
 +    if (allLeftLessThanRight()) {
 +      appendAllLeftBlock();
 +      resetLeftBlock();
 +      return true;
 +    }
 +
++    // skip all NULL values in right, because NULL value will not match the 
left value
++    while (currentRightHasNullValue()) {
++      if (rightFinishedWithIncIndex()) {
++        return true;
++      }
++    }
++
 +    // continue right < left, until right >= left
-     while (comparator.lessThan(
++    while (lessThan(
 +        rightBlockList.get(rightBlockListIdx),
-         rightJoinKeyPosition,
++        rightJoinKeyPositions,
 +        rightIndex,
 +        leftBlock,
-         leftJoinKeyPosition,
++        leftJoinKeyPositions,
 +        leftIndex)) {
 +      if (rightFinishedWithIncIndex()) {
 +        return true;
 +      }
 +    }
 +    if (currentRoundNeedStop()) {
 +      return true;
 +    }
 +
++    // skip all NULL values in left, because NULL value will not match the 
right value
++    while (currentLeftHasNullValue()) {
++      if (leftFinishedWithIncIndex()) {
++        return true;
++      }
++    }
++
 +    // continue left < right, until left >= right
-     while (comparator.lessThan(
++    while (lessThan(
 +        leftBlock,
-         leftJoinKeyPosition,
++        leftJoinKeyPositions,
 +        leftIndex,
 +        rightBlockList.get(rightBlockListIdx),
-         rightJoinKeyPosition,
++        rightJoinKeyPositions,
 +        rightIndex)) {
 +      // current left won't match any right, append left with false SemiJoin 
result
 +      appendValueToResult(false);
 +      leftIndex++;
 +      if (leftIndex >= leftBlock.getPositionCount()) {
 +        resetLeftBlock();
 +        return true;
 +      }
 +    }
 +    if (currentRoundNeedStop()) {
 +      return true;
 +    }
 +
 +    // has right values equal to current left, append to join result, inc 
leftIndex
 +    if (hasMatchedRightValueToProbeLeft()) {
 +      leftIndex++;
 +      if (leftIndex >= leftBlock.getPositionCount()) {
 +        resetLeftBlock();
 +        return true;
 +      }
 +    }
 +
 +    return false;
 +  }
 +
 +  @Override
 +  protected boolean hasMatchedRightValueToProbeLeft() {
 +    boolean matches =
-         comparator.equalsTo(
++        equalsTo(
 +            leftBlock,
-             leftJoinKeyPosition,
++            leftJoinKeyPositions,
 +            leftIndex,
 +            rightBlockList.get(rightBlockListIdx),
-             rightJoinKeyPosition,
++            rightJoinKeyPositions,
 +            rightIndex);
 +    appendValueToResult(matches);
 +    return matches;
 +  }
 +
 +  private void appendValueToResult(boolean matches) {
 +    appendLeftBlockData(leftOutputSymbolIdx, resultBuilder, leftBlock, 
leftIndex);
 +    appendSemiJoinOutput(matches);
 +    resultBuilder.declarePosition();
 +  }
 +
 +  private void appendSemiJoinOutput(boolean value) {
 +    ColumnBuilder columnBuilder = 
resultBuilder.getColumnBuilder(outputColumnNum - 1);
 +    columnBuilder.writeBoolean(value);
 +  }
 +
 +  private void appendAllLeftBlock() {
 +    while (leftBlockNotEmpty()) {
 +      appendValueToResult(false);
 +      leftIndex++;
 +    }
 +  }
 +
 +  @Override
 +  protected void recordsWhenDataMatches() {
 +    // do nothing
 +  }
 +
 +  @Override
 +  public long ramBytesUsed() {
 +    return INSTANCE_SIZE
 +        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(leftChild)
 +        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(rightChild)
 +        + RamUsageEstimator.sizeOf(leftOutputSymbolIdx)
 +        + RamUsageEstimator.sizeOf(rightOutputSymbolIdx)
 +        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
 +        + resultBuilder.getRetainedSizeInBytes();
 +  }
 +}
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 6d9dc549144,7dcdbe04c95..740c7584454
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@@ -1378,63 -1364,6 +1367,62 @@@ public class TableOperatorGenerator ext
      }
    }
  
 +  @Override
 +  public Operator visitSemiJoin(SemiJoinNode node, LocalExecutionPlanContext 
context) {
 +    List<TSDataType> dataTypes = getOutputColumnTypes(node, 
context.getTypeProvider());
 +
 +    Operator leftChild = node.getLeftChild().accept(this, context);
 +    Operator rightChild = node.getRightChild().accept(this, context);
 +
 +    ImmutableMap<Symbol, Integer> sourceColumnNamesMap =
 +        makeLayoutFromOutputSymbols(node.getSource().getOutputSymbols());
 +    List<Symbol> sourceOutputSymbols = node.getSource().getOutputSymbols();
 +    int[] sourceOutputSymbolIdx = new 
int[node.getSource().getOutputSymbols().size()];
 +    for (int i = 0; i < sourceOutputSymbolIdx.length; i++) {
 +      Integer index = sourceColumnNamesMap.get(sourceOutputSymbols.get(i));
 +      checkNotNull(index, "Source of SemiJoinNode doesn't contain 
sourceOutputSymbol.");
 +      sourceOutputSymbolIdx[i] = index;
 +    }
 +
 +    ImmutableMap<Symbol, Integer> filteringSourceColumnNamesMap =
 +        makeLayoutFromOutputSymbols(node.getRightChild().getOutputSymbols());
 +
 +    Integer sourceJoinKeyPosition = 
sourceColumnNamesMap.get(node.getSourceJoinSymbol());
 +    checkNotNull(sourceJoinKeyPosition, "Source of SemiJoinNode doesn't 
contain sourceJoinSymbol.");
 +
 +    Integer filteringSourceJoinKeyPosition =
 +        
filteringSourceColumnNamesMap.get(node.getFilteringSourceJoinSymbol());
 +    checkNotNull(
 +        filteringSourceJoinKeyPosition,
 +        "FilteringSource of SemiJoinNode doesn't contain 
filteringSourceJoinSymbol.");
 +
 +    Type sourceJoinKeyType = getJoinKeyType(context, 
node.getSourceJoinSymbol());
 +
 +    checkArgument(
 +        sourceJoinKeyType == getJoinKeyType(context, 
node.getFilteringSourceJoinSymbol()),
 +        "Join key type mismatch.");
 +    OperatorContext operatorContext =
 +        context
 +            .getDriverContext()
 +            .addOperatorContext(
 +                context.getNextOperatorId(),
 +                node.getPlanNodeId(),
 +                MergeSortSemiJoinOperator.class.getSimpleName());
 +    return new MergeSortSemiJoinOperator(
 +        operatorContext,
 +        leftChild,
 +        sourceJoinKeyPosition,
 +        sourceOutputSymbolIdx,
 +        rightChild,
 +        filteringSourceJoinKeyPosition,
 +        JoinKeyComparatorFactory.getComparator(sourceJoinKeyType, true),
-         dataTypes,
-         sourceJoinKeyType);
++        dataTypes);
 +  }
 +
 +  private Type getJoinKeyType(LocalExecutionPlanContext context, Symbol 
symbol) {
 +    return context.getTypeProvider().getTableModelType(symbol);
 +  }
 +
    @Override
    public Operator visitEnforceSingleRow(
        EnforceSingleRowNode node, LocalExecutionPlanContext context) {
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
index ce18b64c291,d52c23cad31..62bdd1bf1b9
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
@@@ -32,11 -32,9 +32,12 @@@ import org.apache.iotdb.db.queryengine.
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimits;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneAggregationColumns;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneAggregationSourceColumns;
 +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneApplyColumns;
 +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneApplyCorrelation;
 +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneApplySourceColumns;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneCorrelatedJoinColumns;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneCorrelatedJoinCorrelation;
+ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneDistinctAggregation;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneEnforceSingleRowColumns;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneFillColumns;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneFilterColumns;
@@@ -211,17 -204,13 +212,19 @@@ public class LogicalOptimizeFactory 
              plannerContext,
              ruleStats,
              ImmutableSet.of(
 -                new RemoveRedundantEnforceSingleRowNode(),
 -                new TransformUncorrelatedSubqueryToJoin())),
 +                new RemoveRedundantEnforceSingleRowNode(), new 
RemoveUnreferencedScalarSubqueries(),
 +                new TransformUncorrelatedSubqueryToJoin(),
 +                    new 
TransformUncorrelatedInPredicateSubqueryToSemiJoin())),
          new CheckSubqueryNodesAreRewritten(),
+         new IterativeOptimizer(
+             plannerContext, ruleStats, ImmutableSet.of(new 
PruneDistinctAggregation())),
          simplifyOptimizer,
          new PushPredicateIntoTableScan(),
 +        // Currently, Distinct is not supported, so we can't use this rule for 
now.
 +        //        new IterativeOptimizer(
 +        //            plannerContext,
 +        //            ruleStats,
 +        //            ImmutableSet.of(new 
TransformFilteringSemiJoinToInnerJoin())),
          // redo columnPrune and inlineProjections after 
pushPredicateIntoTableScan
          columnPruningOptimizer,
          inlineProjectionLimitFiltersOptimizer,
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
index 7ba4d2514fd,26dd85498f8..f5313b1a537
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
@@@ -98,8 -98,8 +101,9 @@@ import static org.apache.iotdb.db.query
  import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.JoinUtils.extractJoinPredicate;
  import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.JoinUtils.joinEqualityExpression;
  import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.JoinUtils.processInnerJoin;
+ import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.QueryCardinalityUtil.extractCardinality;
  import static 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.TRUE_LITERAL;
 +import static 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression.Operator.EQUAL;
  
  /**
   * <b>Optimization phase:</b> Logical plan planning.
@@@ -742,202 -716,30 +720,225 @@@ public class PushPredicateIntoTableSca
        return symbolAllocator.newSymbol(expression, 
analysis.getType(expression));
      }
  
+     private void appendSortNodeForMergeSortJoin(JoinNode joinNode) {
+       int size = joinNode.getCriteria().size();
+       List<Symbol> leftOrderBy = new ArrayList<>(size);
+       List<Symbol> rightOrderBy = new ArrayList<>(size);
+       Map<Symbol, SortOrder> leftOrderings = new HashMap<>(size);
+       Map<Symbol, SortOrder> rightOrderings = new HashMap<>(size);
+       for (JoinNode.EquiJoinClause equiJoinClause : joinNode.getCriteria()) {
+         leftOrderBy.add(equiJoinClause.getLeft());
+         leftOrderings.put(equiJoinClause.getLeft(), ASC_NULLS_LAST);
+         rightOrderBy.add(equiJoinClause.getRight());
+         rightOrderings.put(equiJoinClause.getRight(), ASC_NULLS_LAST);
+       }
+       OrderingScheme leftOrderingScheme = new OrderingScheme(leftOrderBy, 
leftOrderings);
+       OrderingScheme rightOrderingScheme = new OrderingScheme(rightOrderBy, 
rightOrderings);
+       SortNode leftSortNode =
+           new SortNode(
+               queryId.genPlanNodeId(), joinNode.getLeftChild(), 
leftOrderingScheme, false, false);
+       SortNode rightSortNode =
+           new SortNode(
+               queryId.genPlanNodeId(), joinNode.getRightChild(), 
rightOrderingScheme, false, false);
+       joinNode.setLeftChild(leftSortNode);
+       joinNode.setRightChild(rightSortNode);
+     }
+ 
 +    @Override
 +    public PlanNode visitSemiJoin(SemiJoinNode node, RewriteContext context) {
 +      Expression inheritedPredicate =
 +          context.inheritedPredicate != null ? context.inheritedPredicate : 
TRUE_LITERAL;
 +      if (!extractConjuncts(inheritedPredicate)
 +          .contains(node.getSemiJoinOutput().toSymbolReference())) {
 +        return visitNonFilteringSemiJoin(node, context);
 +      }
 +      return visitFilteringSemiJoin(node, context);
 +    }
 +
 +    private PlanNode visitNonFilteringSemiJoin(SemiJoinNode node, 
RewriteContext context) {
 +      Expression inheritedPredicate =
 +          context.inheritedPredicate != null ? context.inheritedPredicate : 
TRUE_LITERAL;
 +
 +      List<Expression> sourceConjuncts = new ArrayList<>();
 +      List<Expression> postJoinConjuncts = new ArrayList<>();
 +
 +      // TODO: see if there are predicates that can be inferred from the semi 
join output
 +      PlanNode rewrittenFilteringSource =
 +          node.getFilteringSource().accept(this, new RewriteContext());
 +
 +      // Push inheritedPredicates down to the source if they don't involve 
the semi join output
 +      ImmutableSet<Symbol> sourceScope = 
ImmutableSet.copyOf(node.getSource().getOutputSymbols());
 +      EqualityInference inheritedInference = new EqualityInference(metadata, 
inheritedPredicate);
 +      EqualityInference.nonInferrableConjuncts(metadata, inheritedPredicate)
 +          .forEach(
 +              conjunct -> {
 +                Expression rewrittenConjunct = 
inheritedInference.rewrite(conjunct, sourceScope);
 +                // Since each source row is reflected exactly once in the 
output, ok to push
 +                // non-deterministic predicates down
 +                if (rewrittenConjunct != null) {
 +                  sourceConjuncts.add(rewrittenConjunct);
 +                } else {
 +                  postJoinConjuncts.add(conjunct);
 +                }
 +              });
 +
 +      // Add the inherited equality predicates back in
 +      EqualityInference.EqualityPartition equalityPartition =
 +          inheritedInference.generateEqualitiesPartitionedBy(sourceScope);
 +      sourceConjuncts.addAll(equalityPartition.getScopeEqualities());
 +      
postJoinConjuncts.addAll(equalityPartition.getScopeComplementEqualities());
 +      
postJoinConjuncts.addAll(equalityPartition.getScopeStraddlingEqualities());
 +
 +      PlanNode rewrittenSource =
 +          node.getSource().accept(this, new 
RewriteContext(combineConjuncts(sourceConjuncts)));
 +
-       PlanNode output = appendSortNode(node, rewrittenSource, 
rewrittenFilteringSource);
++      PlanNode output = appendSortNodeForSemiJoin(node, rewrittenSource, 
rewrittenFilteringSource);
 +
 +      if (!postJoinConjuncts.isEmpty()) {
 +        output =
 +            new FilterNode(queryId.genPlanNodeId(), output, 
combineConjuncts(postJoinConjuncts));
 +      }
 +      return output;
 +    }
 +
-     private SemiJoinNode appendSortNode(
++    private SemiJoinNode appendSortNodeForSemiJoin(
 +        SemiJoinNode node, PlanNode rewrittenSource, PlanNode 
rewrittenFilteringSource) {
 +      OrderingScheme sourceOrderingScheme =
 +          new OrderingScheme(
 +              ImmutableList.of(node.getSourceJoinSymbol()),
 +              ImmutableMap.of(node.getSourceJoinSymbol(), ASC_NULLS_LAST));
 +      OrderingScheme filteringSourceOrderingScheme =
 +          new OrderingScheme(
 +              ImmutableList.of(node.getFilteringSourceJoinSymbol()),
 +              ImmutableMap.of(node.getFilteringSourceJoinSymbol(), 
ASC_NULLS_LAST));
 +      SortNode sourceSortNode =
 +          new SortNode(
 +              queryId.genPlanNodeId(), rewrittenSource, sourceOrderingScheme, 
false, false);
 +      SortNode filteringSourceSortNode =
 +          new SortNode(
 +              queryId.genPlanNodeId(),
 +              rewrittenFilteringSource,
 +              filteringSourceOrderingScheme,
 +              false,
 +              false);
 +      return new SemiJoinNode(
 +          node.getPlanNodeId(),
 +          sourceSortNode,
 +          filteringSourceSortNode,
 +          node.getSourceJoinSymbol(),
 +          node.getFilteringSourceJoinSymbol(),
 +          node.getSemiJoinOutput());
 +    }
 +
 +    private PlanNode visitFilteringSemiJoin(SemiJoinNode node, RewriteContext 
context) {
 +      Expression inheritedPredicate =
 +          context.inheritedPredicate != null ? context.inheritedPredicate : 
TRUE_LITERAL;
 +      Expression deterministicInheritedPredicate = 
filterDeterministicConjuncts(inheritedPredicate);
 +      Expression sourceEffectivePredicate = TRUE_LITERAL;
 +      Expression filteringSourceEffectivePredicate = TRUE_LITERAL;
 +      // Expression sourceEffectivePredicate =
 +      // 
filterDeterministicConjuncts(effectivePredicateExtractor.extract(session, 
node.getSource(),
 +      // types, typeAnalyzer));
 +      // Expression filteringSourceEffectivePredicate = 
filterDeterministicConjuncts(metadata,
 +      // effectivePredicateExtractor.extract(session, 
node.getFilteringSource(), types,
 +      // typeAnalyzer));
 +      Expression joinExpression =
 +          new ComparisonExpression(
 +              EQUAL,
 +              node.getSourceJoinSymbol().toSymbolReference(),
 +              node.getFilteringSourceJoinSymbol().toSymbolReference());
 +
 +      List<Symbol> sourceSymbols = node.getSource().getOutputSymbols();
 +      List<Symbol> filteringSourceSymbols = 
node.getFilteringSource().getOutputSymbols();
 +
 +      List<Expression> sourceConjuncts = new ArrayList<>();
 +      List<Expression> filteringSourceConjuncts = new ArrayList<>();
 +      List<Expression> postJoinConjuncts = new ArrayList<>();
 +
 +      // Generate equality inferences
 +      EqualityInference allInference =
 +          new EqualityInference(
 +              metadata,
 +              deterministicInheritedPredicate,
 +              sourceEffectivePredicate,
 +              filteringSourceEffectivePredicate,
 +              joinExpression);
 +      EqualityInference allInferenceWithoutSourceInferred =
 +          new EqualityInference(
 +              metadata,
 +              deterministicInheritedPredicate,
 +              filteringSourceEffectivePredicate,
 +              joinExpression);
 +      EqualityInference allInferenceWithoutFilteringSourceInferred =
 +          new EqualityInference(
 +              metadata, deterministicInheritedPredicate, 
sourceEffectivePredicate, joinExpression);
 +
 +      // Push inheritedPredicates down to the source if they don't involve 
the semi join output
 +      Set<Symbol> sourceScope = ImmutableSet.copyOf(sourceSymbols);
 +      EqualityInference.nonInferrableConjuncts(metadata, inheritedPredicate)
 +          .forEach(
 +              conjunct -> {
 +                Expression rewrittenConjunct = allInference.rewrite(conjunct, 
sourceScope);
 +                // Since each source row is reflected exactly once in the 
output, ok to push
 +                // non-deterministic predicates down
 +                if (rewrittenConjunct != null) {
 +                  sourceConjuncts.add(rewrittenConjunct);
 +                } else {
 +                  postJoinConjuncts.add(conjunct);
 +                }
 +              });
 +
 +      // Push inheritedPredicates down to the filtering source if possible
 +      Set<Symbol> filterScope = ImmutableSet.copyOf(filteringSourceSymbols);
 +      EqualityInference.nonInferrableConjuncts(metadata, 
deterministicInheritedPredicate)
 +          .forEach(
 +              conjunct -> {
 +                Expression rewrittenConjunct = allInference.rewrite(conjunct, 
filterScope);
 +                // We cannot push non-deterministic predicates to filtering 
side. Each filtering
 +                // side row has to be
 +                // logically reevaluated for each source row.
 +                if (rewrittenConjunct != null) {
 +                  filteringSourceConjuncts.add(rewrittenConjunct);
 +                }
 +              });
 +
 +      // move effective predicate conjuncts source <-> filter
 +      // See if we can push the filtering source effective predicate to the 
source side
 +      EqualityInference.nonInferrableConjuncts(metadata, 
filteringSourceEffectivePredicate)
 +          .map(conjunct -> allInference.rewrite(conjunct, sourceScope))
 +          .filter(Objects::nonNull)
 +          .forEach(sourceConjuncts::add);
 +
 +      // See if we can push the source effective predicate to the filtering 
source side
 +      EqualityInference.nonInferrableConjuncts(metadata, 
sourceEffectivePredicate)
 +          .map(conjunct -> allInference.rewrite(conjunct, filterScope))
 +          .filter(Objects::nonNull)
 +          .forEach(filteringSourceConjuncts::add);
 +
 +      // Add equalities from the inference back in
 +      sourceConjuncts.addAll(
 +          allInferenceWithoutSourceInferred
 +              .generateEqualitiesPartitionedBy(sourceScope)
 +              .getScopeEqualities());
 +      filteringSourceConjuncts.addAll(
 +          allInferenceWithoutFilteringSourceInferred
 +              .generateEqualitiesPartitionedBy(filterScope)
 +              .getScopeEqualities());
 +
 +      PlanNode rewrittenSource =
 +          node.getSource().accept(this, new 
RewriteContext(combineConjuncts(sourceConjuncts)));
 +      PlanNode rewrittenFilteringSource =
 +          node.getFilteringSource()
 +              .accept(this, new 
RewriteContext(combineConjuncts(filteringSourceConjuncts)));
 +
-       PlanNode output = appendSortNode(node, rewrittenSource, 
rewrittenFilteringSource);
++      PlanNode output = appendSortNodeForSemiJoin(node, rewrittenSource, 
rewrittenFilteringSource);
 +      if (!postJoinConjuncts.isEmpty()) {
 +        output =
 +            new FilterNode(queryId.genPlanNodeId(), output, 
combineConjuncts(postJoinConjuncts));
 +      }
 +      return output;
 +    }
 +
      @Override
      public PlanNode visitInsertTablet(InsertTabletNode node, RewriteContext 
context) {
        return node;

Reply via email to