This is an automated email from the ASF dual-hosted git repository.

chenyz pushed a commit to branch udtf
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 602a726ae635d3d03c685cc5d999f2b11db518a7
Merge: 87fc892d072 263443dcb53
Author: Chen YZ <[email protected]>
AuthorDate: Mon Feb 3 16:06:46 2025 +0800

    merge master

 .github/workflows/cluster-it-1c1d.yml              |   2 +-
 .github/workflows/cluster-it-1c1d1a.yml            |   2 +-
 .github/workflows/cluster-it-1c3d.yml              |   2 +-
 .github/workflows/compile-check.yml                |   2 +-
 .github/workflows/daily-it.yml                     |   2 +-
 .github/workflows/daily-ut.yml                     |   2 +-
 .github/workflows/dependency-check.yml             |   2 +-
 .github/workflows/pipe-it-2cluster.yml             |   2 +-
 .github/workflows/sonar-codecov.yml                |   2 +-
 .github/workflows/table-cluster-it-1c1d.yml        |   2 +-
 .github/workflows/table-cluster-it-1c3d.yml        |   2 +-
 .github/workflows/unit-test.yml                    |   2 +-
 .github/workflows/vulnerability-check.yml          |   2 +-
 .mvn/develocity.xml                                |   3 +-
 .mvn/extensions.xml                                |   4 +-
 .../java/org/apache/iotdb/CountPointProcessor.java |   2 +
 .../iotdb/AlignedTimeseriesSessionExample.java     |  67 +-
 .../org/apache/iotdb/SessionConcurrentExample.java |  23 -
 .../main/java/org/apache/iotdb/SessionExample.java | 107 +---
 .../org/apache/iotdb/trigger/LoggerTrigger.java    |  12 +-
 .../iotdb/trigger/StatisticsUpdaterTrigger.java    |   4 +-
 .../apache/iotdb/udf/AggregateFunctionExample.java |  35 +-
 .../apache/iotdb/udf/ScalarFunctionExample.java    |  57 +-
 .../db/query/udf/example/relational/AllSum.java    |  49 +-
 .../query/udf/example/relational/ContainNull.java  |  21 +-
 .../relational/{DatePlusOne.java => DatePlus.java} |  31 +-
 .../query/udf/example/relational/FirstTwoSum.java  |  33 +-
 .../db/query/udf/example/relational/MyAvg.java     |  32 +-
 .../db/query/udf/example/relational/MyCount.java   |  19 +-
 .../java/org/apache/iotdb/it/env/EnvFactory.java   |   5 +
 .../iotdb/it/env/cluster/env/AbstractEnv.java      |  49 ++
 .../iotdb/it/env/cluster/node/AINodeWrapper.java   |   5 +
 .../it/env/cluster/node/AbstractNodeWrapper.java   |  12 +
 .../it/env/cluster/node/ConfigNodeWrapper.java     |   5 +
 .../iotdb/it/env/cluster/node/DataNodeWrapper.java |   5 +
 .../iotdb/it/env/remote/env/RemoteServerEnv.java   |  12 +
 .../org/apache/iotdb/it/utils/TsFileGenerator.java |  87 +--
 .../iotdb/it/utils/TsFileTableGenerator.java       | 186 ++++++
 .../java/org/apache/iotdb/itbase/env/BaseEnv.java  |   7 +
 ...ionMigrateDataNodeCrashITFrameworkForIoTV1.java |   2 +-
 ...ionMigrateDataNodeCrashITFrameworkForIoTV2.java |   2 +-
 ...oTDBRegionOperationReliabilityITFramework.java} | 174 ++++--
 .../IoTDBRegionGroupExpandAndShrinkForIoTV1IT.java | 157 +++++
 .../commit/IoTDBRegionReconstructForIoTV1IT.java   | 141 +++++
 .../IoTDBRegionMigrateNormalITForIoTV2Batch.java   |   4 +-
 .../IoTDBRegionMigrateOtherITForIoTV2Batch.java    |   4 +-
 .../IoTDBRegionMigrateNormalITForIoTV2Stream.java  |   4 +-
 .../IoTDBRegionMigrateOtherITForIoTV2Stream.java   |   4 +-
 .../IoTDBRegionMigrateDataNodeCrashForIoTV1IT.java |   4 +-
 ...TDBRegionMigrateDataNodeCrashForIoTV2Batch.java |   4 +-
 ...DBRegionMigrateDataNodeCrashForIoTV2Stream.java |   4 +-
 .../IoTDBRegionMigrateClusterCrashIoTV1IT.java     |   4 +-
 .../IoTDBRegionMigrateConfigNodeCrashIoTV1IT.java  |   4 +-
 .../IoTDBRegionMigrateClusterCrashIoTV2Batch.java  |   4 +-
 ...oTDBRegionMigrateConfigNodeCrashIoTV2Batch.java |   4 +-
 .../IoTDBRegionMigrateClusterCrashIoTV2Stream.java |   4 +-
 ...TDBRegionMigrateConfigNodeCrashIoTV2Stream.java |   4 +-
 .../IoTDBRemoveDataNodeITFramework.java            |  22 +-
 .../org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java  |  80 ++-
 .../iotdb/db/it/IoTDBLoadTsFileWithModIT.java      | 172 ++++++
 .../it/IoTDBSyntaxConventionStringLiteralIT.java   |   3 +-
 .../org/apache/iotdb/db/it/utils/TestUtils.java    | 279 ++++++++-
 .../iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java |   3 +-
 .../pipe/it/autocreate/IoTDBPipeAutoDropIT.java    |  10 +-
 .../pipe/it/autocreate/IoTDBPipeClusterIT.java     |  98 +++
 .../pipe/it/autocreate/IoTDBPipeIdempotentIT.java  |  93 +++
 .../pipe/it/autocreate/IoTDBPipeNullValueIT.java   |  28 +-
 .../it/autocreate/IoTDBPipeSwitchStatusIT.java     |  23 +-
 .../pipe/it/manual/IoTDBPipeReqAutoSliceIT.java    |   8 +-
 .../pipe/it/manual/IoTDBPipeTableManualIT.java     | 293 +++++++++
 .../manual/IoTDBPipeTypeConversionISessionIT.java  |  10 +-
 .../iotdb/pipe/it/tablemodel/IoTDBPipeAlterIT.java |  30 +-
 .../pipe/it/tablemodel/IoTDBPipeAutoDropIT.java    |  10 +-
 .../pipe/it/tablemodel/IoTDBPipeClusterIT.java     |  65 ++
 .../pipe/it/tablemodel/IoTDBPipeDataSinkIT.java    | 450 +++++++++++++-
 .../it/tablemodel/IoTDBPipeDoubleLivingIT.java     | 329 ++++++++++
 .../pipe/it/tablemodel/IoTDBPipeIsolationIT.java   | 377 ++++++++++++
 .../pipe/it/tablemodel/IoTDBPipeNullValueIT.java   |   4 +-
 .../pipe/it/tablemodel/IoTDBPipeProtocolIT.java    |   1 -
 .../it/tablemodel/IoTDBPipeSwitchStatusIT.java     |  23 +-
 .../pipe/it/tablemodel/IoTDBPipeSyntaxIT.java      |  35 +-
 .../IoTDBPipeTypeConversionISessionIT.java         |  97 ++-
 .../pipe/it/tablemodel/IoTDBPipeWithLoadIT.java    |   8 +-
 .../iotdb/pipe/it/tablemodel/TableModelUtils.java  | 614 +++++++++++++++----
 .../relational/it/db/it/IoTDBDeletionTableIT.java  |   2 +-
 .../relational/it/db/it/IoTDBInsertTableIT.java    |   2 +-
 .../it/db/it/udf/IoTDBSQLFunctionManagementIT.java |  19 +-
 .../udf/IoTDBUserDefinedAggregateFunctionIT.java   |  53 +-
 .../it/udf/IoTDBUserDefinedScalarFunctionIT.java   |   2 +-
 .../IoTDBAlignByDeviceWithTemplateTableIT.java     |   4 +-
 .../scalar/IoTDBFormatFunctionTableIT.java         |  21 +-
 ...oTDBNoSelectExpressionAfterAnalyzedTableIT.java |   6 +-
 .../query/old/query/IoTDBNullValueFillTableIT.java | 327 ----------
 .../it/query/old/query/IoTDBResultSetTableIT.java  |   2 +-
 .../query/old/query/IoTDBSelectSchemaTableIT.java  |  22 +-
 .../it/query/recent/IoTDBTableAggregationIT.java   | 656 +++++++++++++++++++-
 .../recent/subquery/SubqueryDataSetUtils.java      |  12 +-
 .../IoTDBUncorrelatedInPredicateSubqueryIT.java    | 319 ++++++++++
 .../IoTDBUncorrelatedQuantifiedComparisonIT.java   | 674 +++++++++++++++++++++
 .../IoTDBUncorrelatedScalarSubqueryIT.java         |   4 +-
 .../it/rest/it/IoTDBRestServiceCaseWhenThenIT.java |   7 +-
 .../it/rest/it/IoTDBRestServiceFlushQueryIT.java   |   6 +-
 .../relational/it/rest/it/IoTDBRestServiceIT.java  |   3 +-
 ...IT.java => IoTDBRestServiceInsertValuesIT.java} |  20 +-
 .../relational/it/schema/IoTDBDatabaseIT.java      |  41 ++
 .../iotdb/relational/it/schema/IoTDBDeviceIT.java  |  47 +-
 .../iotdb/relational/it/schema/IoTDBTableIT.java   | 122 ++--
 .../it/session/IoTDBSessionRelationalIT.java       |  29 +-
 .../iotdb/session/it/IoTDBSessionComplexIT.java    |   8 +-
 .../iotdb/session/it/IoTDBSessionSimpleIT.java     |  78 ++-
 .../session/it/IoTDBSessionSyntaxConventionIT.java |  18 +-
 .../org/apache/iotdb/session/it/SessionIT.java     |  32 +-
 .../it/local/IoTDBSubscriptionDataTypeIT.java      |   8 +-
 .../java/org/apache/iotdb/util/MagicUtils.java     |  61 ++
 .../java/org/apache/iotdb/pipe/api/PipePlugin.java |  37 ++
 .../iotdb/pipe/api/annotation/TableModel.java      |  23 +-
 .../iotdb/pipe/api/annotation/TreeModel.java       |  23 +-
 .../parameter/PipeParameterValidator.java          |  42 +-
 .../analysis/AggregateFunctionAnalysis.java        |  62 ++
 .../api/customizer/analysis/FunctionAnalysis.java} |   8 +-
 .../analysis/ScalarFunctionAnalysis.java           |  51 ++
 ...ctionParameters.java => FunctionArguments.java} |  28 +-
 .../exception/UDFArgumentNotValidException.java}   |   9 +-
 .../udf/api/relational/AggregateFunction.java      |  48 +-
 .../iotdb/udf/api/relational/ScalarFunction.java   |  47 +-
 .../apache/iotdb/tool/data/AbstractDataTool.java   |  32 +-
 .../org/apache/iotdb/tool/data/ExportData.java     |  43 +-
 .../org/apache/iotdb/tool/data/ImportData.java     |  24 +-
 .../org/apache/iotdb/tool/tsfile/ExportTsFile.java |   3 -
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   3 +
 .../java/org/apache/iotdb/session/Session.java     |  27 +-
 .../payload/SubscriptionFileHandler.java           |  24 +-
 .../payload/SubscriptionSessionDataSet.java        |  12 +-
 .../org/apache/iotdb/session/util/RetryUtils.java  |  30 +-
 .../apache/iotdb/session/util/SessionUtils.java    |  25 +-
 .../java/org/apache/iotdb/session/TabletTest.java  |  28 +-
 .../iotdb/session/util/SessionUtilsTest.java       |  16 +-
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  16 +-
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |   8 +
 .../consensus/request/ConfigPhysicalPlan.java      |  10 +-
 .../consensus/request/ConfigPhysicalPlanType.java  |   4 +
 .../request/ConfigPhysicalPlanVisitor.java         | 110 +++-
 .../table/DescTable4InformationSchemaPlan.java}    |  12 +-
 .../table/ShowTable4InformationSchemaPlan.java}    |  16 +-
 .../write/pipe/payload/PipeCreateTablePlan.java}   |  18 +-
 .../pipe/payload/PipeDeactivateTemplatePlan.java   |  22 +-
 .../write/pipe/payload/PipeDeleteDevicesPlan.java  | 130 ++++
 .../request/write/table/PreCreateTablePlan.java    |  11 +-
 .../response/pipe/plugin/PipePluginTableResp.java  |  27 +-
 .../response/pipe/task/PipeTableResp.java          |   7 +
 .../DescTable4InformationSchemaResp.java}          |  29 +-
 .../ShowTable4InformationSchemaResp.java}          |  29 +-
 .../iotdb/confignode/manager/ClusterManager.java   |   2 +-
 .../iotdb/confignode/manager/ConfigManager.java    |  66 +-
 .../apache/iotdb/confignode/manager/IManager.java  |  42 +-
 .../iotdb/confignode/manager/ProcedureManager.java | 491 +++++++++++----
 .../payload/PipeTransferConfigSnapshotSealReq.java |  32 +-
 .../protocol/IoTDBConfigRegionAirGapConnector.java |   8 +
 .../protocol/IoTDBConfigRegionConnector.java       |  10 +-
 .../coordinator/plugin/PipePluginCoordinator.java  |  29 +
 .../pipe/coordinator/task/PipeTaskCoordinator.java |  74 ++-
 .../pipe/event/PipeConfigRegionSnapshotEvent.java  |   3 +-
 .../extractor/ConfigRegionListeningFilter.java     |  89 +--
 .../pipe/extractor/ConfigRegionListeningQueue.java |  23 +-
 .../pipe/extractor/IoTDBConfigRegionExtractor.java |  74 ++-
 ...ConfigPhysicalPlanTablePatternParseVisitor.java | 135 +++++
 ...ConfigPhysicalPlanTreePatternParseVisitor.java} |   6 +-
 .../receiver/protocol/IoTDBConfigNodeReceiver.java | 170 +++++-
 .../PipeConfigPhysicalPlanTSStatusVisitor.java     |  76 +++
 .../manager/schema/ClusterSchemaManager.java       | 158 +++--
 .../persistence/executor/ConfigPlanExecutor.java   |   5 +
 .../confignode/persistence/pipe/PipeInfo.java      |   2 +-
 .../persistence/pipe/PipePluginInfo.java           |  39 +-
 .../confignode/persistence/pipe/PipeTaskInfo.java  |  28 +-
 .../schema/CNPhysicalPlanGenerator.java            |  77 ++-
 .../persistence/schema/ClusterSchemaInfo.java      |  77 +++
 .../confignode/persistence/schema/ConfigMTree.java |  29 +-
 .../schema/ConfignodeSnapshotParser.java           |  30 +-
 .../procedure/env/ConfigNodeProcedureEnv.java      |  23 +-
 .../procedure/env/RegionMaintainHandler.java       |  13 +-
 .../impl/pipe/task/AlterPipeProcedureV2.java       |   4 +-
 .../impl/pipe/task/CreatePipeProcedureV2.java      |  42 +-
 .../impl/region/AddRegionPeerProcedure.java        |  84 +--
 .../impl/region/ReconstructRegionProcedure.java    | 209 +++++++
 .../impl/region/RegionMigrateProcedure.java        |  39 +-
 .../impl/region/RegionOperationProcedure.java      |  39 +-
 .../impl/region/RemoveRegionPeerProcedure.java     |  52 +-
 .../impl/schema/DeleteDatabaseProcedure.java       |   2 +-
 .../impl/schema/DeleteTimeSeriesProcedure.java     |   4 +-
 .../table/AbstractAlterOrDropTableProcedure.java   |  10 +-
 .../impl/schema/table/AddTableColumnProcedure.java |  16 +-
 .../impl/schema/table/CreateTableProcedure.java    |  20 +-
 .../impl/schema/table/DeleteDevicesProcedure.java  |  43 +-
 .../schema/table/DropTableColumnProcedure.java     |  21 +-
 .../impl/schema/table/DropTableProcedure.java      |  24 +-
 .../schema/table/RenameTableColumnProcedure.java   |  18 +-
 .../schema/table/SetTablePropertiesProcedure.java  |  18 +-
 .../impl/trigger/CreateTriggerProcedure.java       |   9 +-
 .../procedure/state/ReconstructRegionState.java    |  10 +-
 .../procedure/store/ProcedureFactory.java          |  40 +-
 .../confignode/procedure/store/ProcedureType.java  |  12 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  52 +-
 .../request/ConfigPhysicalPlanSerDeTest.java       |  40 ++
 .../response/pipe/PipePluginTableRespTest.java     |   4 +-
 .../connector/PipeConfigNodeThriftRequestTest.java |  12 +-
 ...igPhysicalPlanTablePatternParseVisitorTest.java | 133 ++++
 ...igPhysicalPlanTreePatternParseVisitorTest.java} |  44 +-
 .../pipe/annotation/PipePluginAnnotationTest.java} |  23 +-
 .../pipe/receiver/PipeEnrichedProcedureTest.java   | 194 ++++++
 .../schema/table/AddTableColumnProcedureTest.java  |   8 +-
 .../schema/table/CreateTableProcedureTest.java     |   5 +-
 .../schema/table/DeleteDevicesProcedureTest.java   |   5 +-
 .../schema/table/DropTableColumnProcedureTest.java |   4 +-
 .../impl/schema/table/DropTableProcedureTest.java  |   4 +-
 .../table/RenameTableColumnProcedureTest.java      |   4 +-
 .../table/SetTablePropertiesProcedureTest.java     |   6 +-
 .../apache/iotdb/consensus/pipe/PipeConsensus.java |  57 +-
 .../apache/iotdb/consensus/ratis/RatisClient.java  |   6 +-
 .../apache/iotdb/consensus/ratis/utils/Utils.java  |   1 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   4 +-
 .../dataregion/DataExecutionVisitor.java           |   9 +-
 .../schemaregion/SchemaExecutionVisitor.java       |  15 +-
 .../schemaregion/SchemaRegionStateMachine.java     |  15 +-
 .../LoadAnalyzeException.java}                     |   8 +-
 .../LoadAnalyzeTableColumnDisorderException.java}  |   7 +-
 .../LoadAnalyzeTypeMismatchException.java}         |   6 +-
 .../pipe/agent/plugin/PipeDataNodePluginAgent.java |  20 +-
 .../dataregion/PipeDataRegionPluginAgent.java      |  49 ++
 .../agent/task/connection/PipeEventCollector.java  |  35 +-
 .../client/IoTDBDataNodeAsyncClientManager.java    |   2 +-
 .../batch/PipeTabletEventTsFileBatch.java          | 384 +++---------
 .../PipeTransferDataNodeHandshakeV1Req.java        |  10 +-
 .../evolvable/request/PipeTransferPlanNodeReq.java |  18 +-
 .../request/PipeTransferSchemaSnapshotSealReq.java |  94 ++-
 .../request/PipeTransferTabletInsertNodeReqV2.java |   2 +-
 .../request/PipeTransferTabletRawReq.java          |   4 +-
 .../request/PipeTransferTabletRawReqV2.java        |   9 +-
 .../request/PipeTransferTsFileSealWithModReq.java  |  42 +-
 .../airgap/IoTDBDataNodeAirGapConnector.java       |   4 +
 .../airgap/IoTDBDataRegionAirGapConnector.java     |   4 +
 .../airgap/IoTDBSchemaRegionAirGapConnector.java   |  19 +-
 .../protocol/legacy/IoTDBLegacyPipeConnector.java  |   2 +
 .../connector/protocol/opcua/OpcUaConnector.java   |   4 +
 .../connector/protocol/opcua/OpcUaNameSpace.java   |  44 +-
 .../pipeconsensus/PipeConsensusAsyncConnector.java |   4 +
 .../pipeconsensus/PipeConsensusSyncConnector.java  |   4 +
 .../request/PipeConsensusDeleteNodeReq.java        |   4 +-
 .../async/IoTDBDataRegionAsyncConnector.java       |  14 +-
 .../PipeTransferTabletBatchEventHandler.java       |   2 +-
 .../async/handler/PipeTransferTsFileHandler.java   |  13 +-
 .../thrift/sync/IoTDBDataNodeSyncConnector.java    |   4 +
 .../thrift/sync/IoTDBDataRegionSyncConnector.java  |  23 +-
 .../thrift/sync/IoTDBSchemaRegionConnector.java    |  26 +-
 .../protocol/websocket/WebSocketConnector.java     |   2 +
 .../protocol/writeback/WriteBackConnector.java     | 199 +++++-
 .../util/builder/PipeTableModeTsFileBuilder.java   | 273 +++++++++
 .../util/builder/PipeTreeModelTsFileBuilder.java   | 268 ++++++++
 .../connector/util/builder/PipeTsFileBuilder.java  | 162 +++++
 .../util/{ => cacher}/LeaderCacheUtils.java        |   2 +-
 .../sorter/PipeTableModelTabletEventSorter.java    | 273 +++++++++
 .../util/{ => sorter}/PipeTabletEventSorter.java   | 107 +---
 .../sorter/PipeTreeModelTabletEventSorter.java     | 123 ++++
 .../pipe/consensus/deletion/DeletionResource.java  |   2 +-
 .../db/pipe/event/common/PipeInsertionEvent.java   |  15 +-
 .../common/deletion/PipeDeleteDataNodeEvent.java   |  24 +-
 .../db/pipe/event/common/row/PipeRowCollector.java |   2 +-
 .../schema/PipeSchemaRegionSnapshotEvent.java      |  75 ++-
 .../schema/PipeSchemaSerializableEventType.java    |  15 +-
 .../common/tablet/PipeRawTabletInsertionEvent.java |   2 +-
 .../tablet/parser/TabletInsertionEventParser.java  |   8 +-
 .../TabletInsertionEventTreePatternParser.java     |  13 +-
 .../scan/TsFileInsertionEventScanParser.java       |  42 +-
 .../table/TsFileInsertionEventTableParser.java     |   2 +-
 .../dataregion/DataRegionListeningFilter.java      |  21 +-
 .../dataregion/IoTDBDataRegionExtractor.java       | 156 ++---
 ...oricalDataRegionTsFileAndDeletionExtractor.java |   4 +-
 .../listener/PipeInsertionDataNodeListener.java    |  27 +-
 .../schemaregion/IoTDBSchemaRegionExtractor.java   |  28 +-
 .../PipePlanTablePatternParseVisitor.java          |  77 +++
 ...r.java => PipePlanTreePatternParseVisitor.java} |   7 +-
 .../schemaregion/SchemaRegionListeningFilter.java  |  87 +--
 .../schemaregion/SchemaRegionListeningQueue.java   |   8 +-
 .../pipe/metric/PipeDataNodeReceiverMetrics.java   |  48 +-
 .../processor/aggregate/AggregateProcessor.java    |   2 +
 .../StandardStatisticsOperatorProcessor.java       |   2 +
 .../processor/TumblingWindowingProcessor.java      |   2 +
 .../changing/ChangingValueSamplingProcessor.java   |   2 +
 .../sdt/SwingingDoorTrendingSamplingProcessor.java |   2 +
 .../tumbling/TumblingTimeSamplingProcessor.java    |   2 +
 .../pipeconsensus/PipeConsensusProcessor.java      |   4 +
 .../schemachange/RenameDatabaseProcessor.java      |  11 +-
 .../twostage/plugin/TwoStageCountProcessor.java    |  37 +-
 .../pipeconsensus/PipeConsensusReceiver.java       |  33 +-
 .../protocol/thrift/IoTDBDataNodeReceiver.java     | 223 +++++--
 .../statement/PipeConvertedInsertRowStatement.java |  30 +-
 .../PipeConvertedInsertTabletStatement.java        |  52 +-
 .../visitor/PipePlanToStatementVisitor.java        |  54 +-
 .../PipeStatementTablePatternParseVisitor.java     |  33 +-
 ...a => PipeStatementTreePatternParseVisitor.java} |   6 +-
 ...leStatementDataTypeConvertExecutionVisitor.java |  10 +-
 ...eeStatementDataTypeConvertExecutionVisitor.java |   3 +-
 ...r.java => PipeTreeStatementToBatchVisitor.java} |   2 +-
 .../pipe/resource/memory/PipeMemoryWeightUtil.java |  21 +-
 .../resource/tsfile/PipeTsFileResourceManager.java |   5 +-
 .../iotdb/db/protocol/client/ConfigNodeClient.java |  57 ++
 .../impl/DataNodeInternalRPCServiceImpl.java       |   2 +
 .../execution/executor/RegionWriteExecutor.java    |  55 +-
 .../function/table/ExcludeColumnFunction.java      |   4 +-
 .../execution/function/table/HOPTableFunction.java |   1 +
 .../operator/process/EnforceSingleRowOperator.java |   5 +-
 .../merge/comparator/JoinKeyComparatorFactory.java |   2 +-
 .../schema/source/TableDeviceQuerySource.java      |   4 +-
 .../InformationSchemaContentSupplierFactory.java   | 372 ++++++++++--
 .../relational/MergeSortSemiJoinOperator.java      | 228 +++++++
 .../relational/aggregation/AccumulatorFactory.java | 265 +++++++-
 .../relational/aggregation/AggregationMask.java    | 204 +++++++
 .../relational/aggregation/AvgAccumulator.java     | 122 +++-
 .../relational/aggregation/CountAccumulator.java   |  22 +-
 ...IfAccumulator.java => CountAllAccumulator.java} |  48 +-
 .../relational/aggregation/CountIfAccumulator.java |  22 +-
 .../relational/aggregation/ExtremeAccumulator.java |  94 ++-
 .../relational/aggregation/FirstAccumulator.java   | 158 +++--
 .../relational/aggregation/FirstByAccumulator.java | 164 +++--
 .../aggregation/FirstByDescAccumulator.java        | 132 +++-
 .../aggregation/FirstDescAccumulator.java          | 126 +++-
 .../relational/aggregation/LastAccumulator.java    | 140 ++++-
 .../relational/aggregation/LastByAccumulator.java  | 146 ++++-
 .../aggregation/LastByDescAccumulator.java         | 150 ++++-
 .../aggregation/LastDescAccumulator.java           | 144 ++++-
 ...dAccumulator.java => MaskedRecordIterator.java} |  40 +-
 .../relational/aggregation/MaxAccumulator.java     | 140 ++++-
 .../relational/aggregation/MinAccumulator.java     | 140 ++++-
 .../relational/aggregation/SumAccumulator.java     | 114 +++-
 .../relational/aggregation/TableAccumulator.java   |   2 +-
 .../relational/aggregation/TableAggregator.java    |   5 +-
 .../aggregation/TableMaxMinByBaseAccumulator.java  | 146 +++--
 .../aggregation/TableModeAccumulator.java          | 192 ++++--
 .../aggregation/TableVarianceAccumulator.java      | 158 +++--
 .../UserDefinedAggregateFunctionAccumulator.java   |  35 +-
 .../aggregation/grouped/GroupedAccumulator.java    |   4 +-
 .../aggregation/grouped/GroupedAggregator.java     |   6 +-
 .../aggregation/grouped/GroupedAvgAccumulator.java | 119 +++-
 .../grouped/GroupedCountAccumulator.java           |  22 +-
 .../grouped/GroupedCountIfAccumulator.java         |  22 +-
 .../grouped/GroupedExtremeAccumulator.java         |  95 ++-
 .../grouped/GroupedFirstAccumulator.java           | 153 ++++-
 .../grouped/GroupedFirstByAccumulator.java         | 191 ++++--
 .../grouped/GroupedLastAccumulator.java            | 133 +++-
 .../grouped/GroupedLastByAccumulator.java          | 150 ++++-
 .../aggregation/grouped/GroupedMaxAccumulator.java | 141 ++++-
 .../grouped/GroupedMaxMinByBaseAccumulator.java    | 147 ++++-
 .../aggregation/grouped/GroupedMinAccumulator.java | 141 ++++-
 .../grouped/GroupedModeAccumulator.java            | 246 ++++++--
 .../aggregation/grouped/GroupedSumAccumulator.java | 111 +++-
 .../GroupedUserDefinedAggregateAccumulator.java    |  14 +-
 .../grouped/GroupedVarianceAccumulator.java        | 159 +++--
 .../aggregation/grouped/hash/GroupByHash.java      |   2 +
 .../aggregation/grouped/hash/MarkDistinctHash.java |  90 +++
 .../relational/ColumnTransformerBuilder.java       |  20 +-
 .../iotdb/db/queryengine/plan/Coordinator.java     |   8 +
 .../db/queryengine/plan/analyze/AnalyzeUtils.java  |   9 +-
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |  30 +-
 .../plan/analyze/load/LoadTsFileAnalyzer.java      |  27 +-
 .../analyze/load/LoadTsFileTableSchemaCache.java   |  17 +-
 .../load/LoadTsFileToTableModelAnalyzer.java       |  34 +-
 .../load/LoadTsFileToTreeModelAnalyzer.java        |  22 +-
 .../analyze/load/LoadTsFileTreeSchemaCache.java    |   3 +-
 .../load/TreeSchemaAutoCreatorAndVerifier.java     |  30 +-
 .../execution/config/TableConfigTaskVisitor.java   | 141 ++++-
 .../execution/config/TreeConfigTaskVisitor.java    |  30 +-
 .../config/executor/ClusterConfigTaskExecutor.java | 159 ++++-
 .../config/executor/IConfigTaskExecutor.java       |  19 +-
 .../config/metadata/DropPipePluginTask.java        |  13 +-
 .../config/metadata/ShowPipePluginsTask.java       |  20 +-
 .../ExtendRegionTask.java}                         |  19 +-
 .../metadata/{ => region}/MigrateRegionTask.java   |   4 +-
 .../ReconstructRegionTask.java}                    |  19 +-
 .../RemoveRegionTask.java}                         |  19 +-
 .../config/metadata/relational/ShowDBTask.java     |   4 +-
 .../metadata/relational/ShowTablesDetailsTask.java |  47 +-
 .../config/metadata/relational/ShowTablesTask.java |  35 +-
 .../execution/config/sys/pipe/AlterPipeTask.java   |   8 +-
 .../execution/config/sys/pipe/DropPipeTask.java    |   7 +-
 .../execution/config/sys/pipe/ShowPipeTask.java    |   3 +-
 .../execution/config/sys/pipe/StartPipeTask.java   |   7 +-
 .../execution/config/sys/pipe/StopPipeTask.java    |   7 +-
 .../db/queryengine/plan/parser/ASTVisitor.java     |  38 +-
 .../plan/planner/LogicalPlanVisitor.java           |   7 +-
 .../plan/planner/TableOperatorGenerator.java       |  95 ++-
 .../plan/planner/plan/node/PlanGraphPrinter.java   |  12 +
 .../plan/planner/plan/node/PlanNodeType.java       |   4 +
 .../plan/planner/plan/node/PlanVisitor.java        |   5 +
 .../node/metadata/write/AlterTimeSeriesNode.java   |   9 +-
 .../node/metadata/write/CreateTimeSeriesNode.java  |  74 +--
 .../plan/node/pipe/PipeEnrichedDeleteDataNode.java |  66 +-
 .../plan/node/pipe/PipeEnrichedInsertNode.java     |  44 +-
 .../plan/node/pipe/PipeEnrichedWritePlanNode.java  |  23 +-
 .../relational/analyzer/ExpressionAnalyzer.java    |  37 +-
 .../plan/relational/analyzer/Scope.java            |   8 +-
 .../relational/analyzer/StatementAnalyzer.java     | 116 ++--
 .../plan/relational/metadata/Metadata.java         |   4 +-
 .../relational/metadata/TableMetadataImpl.java     |  53 +-
 .../metadata/fetcher/TableDeviceSchemaFetcher.java |  14 +-
 .../fetcher/TableHeaderSchemaValidator.java        |  11 +-
 .../plan/relational/planner/IrTypeAnalyzer.java    |  15 +-
 .../plan/relational/planner/QueryPlanner.java      |   2 +-
 .../plan/relational/planner/RelationPlanner.java   |  91 +--
 .../relational/planner/SimplePlanRewriter.java     |  88 +++
 .../plan/relational/planner/SubqueryPlanner.java   |  84 ++-
 .../relational/planner/TableLogicalPlanner.java    |  15 +-
 .../distribute/TableDistributedPlanGenerator.java  |  24 +
 .../planner/iterative/rule/PruneApplyColumns.java  | 138 +++++
 .../iterative/rule/PruneApplyCorrelation.java      |  70 +++
 .../iterative/rule/PruneApplySourceColumns.java    |  95 +++
 .../rule/RemoveUnreferencedScalarApplyNodes.java   |  42 ++
 .../rule/RemoveUnreferencedScalarSubqueries.java   |  70 +++
 .../TransformFilteringSemiJoinToInnerJoin.java     | 150 +++++
 ...mUncorrelatedInPredicateSubqueryToSemiJoin.java |  95 +++
 .../plan/relational/planner/node/Patterns.java     |   7 +-
 .../plan/relational/planner/node/SemiJoinNode.java | 181 ++++++
 .../node/schema/CreateOrUpdateTableDeviceNode.java |  14 +-
 .../DataNodeLocationSupplierFactory.java           |  11 +
 .../optimizations/LogicalOptimizeFactory.java      |  26 +-
 .../PushAggregationIntoTableScan.java              |   4 +
 .../optimizations/PushPredicateIntoTableScan.java  | 235 ++++++-
 ...mQuantifiedComparisonApplyToCorrelatedJoin.java | 341 +++++++++++
 .../optimizations/UnaliasSymbolReferences.java     |  29 +
 .../sql/ast/AbstractQueryDeviceWithCache.java      |   2 +
 .../relational/sql/ast/AbstractTraverseDevice.java |   8 +
 .../plan/relational/sql/ast/AlterPipe.java         |  22 +-
 .../plan/relational/sql/ast/AstVisitor.java        |  14 +-
 .../sql/ast/{Update.java => CountStatement.java}   |  59 +-
 .../relational/sql/ast/CreateOrUpdateDevice.java   |  31 +-
 .../plan/relational/sql/ast/CreatePipe.java        |   4 +-
 .../plan/relational/sql/ast/Delete.java            |  28 +-
 .../plan/relational/sql/ast/DeleteDevice.java      |   2 -
 .../plan/relational/sql/ast/DropColumn.java        |   3 -
 .../plan/relational/sql/ast/DropPipe.java          |  10 +-
 .../plan/relational/sql/ast/LoadTsFile.java        |   9 +
 .../plan/relational/sql/ast/PipeEnriched.java      |   8 +-
 .../plan/relational/sql/ast/ShowPipes.java         |  10 +-
 .../plan/relational/sql/ast/StartPipe.java         |   8 +-
 .../ast/{Statement.java => StartRepairData.java}   |  12 +-
 .../plan/relational/sql/ast/Statement.java         |   4 +-
 .../plan/relational/sql/ast/StopPipe.java          |   8 +-
 .../ast/{Statement.java => StopRepairData.java}    |  12 +-
 .../plan/relational/sql/ast/Update.java            |   6 +-
 .../plan/relational/sql/parser/AstBuilder.java     |  67 +-
 .../plan/relational/sql/rewrite/ShowRewrite.java   |  51 +-
 .../plan/scheduler/load/LoadTsFileScheduler.java   |   9 +-
 .../plan/statement/StatementVisitor.java           |  18 +-
 .../plan/statement/crud/InsertBaseStatement.java   |  22 +-
 .../plan/statement/crud/InsertRowStatement.java    |  18 +-
 .../plan/statement/crud/LoadTsFileStatement.java   |   9 +
 .../metadata/AlterTimeSeriesStatement.java         |   7 +-
 .../metadata/pipe/AlterPipeStatement.java          |  31 +-
 .../metadata/pipe/DropPipePluginStatement.java     |  17 +-
 .../statement/metadata/pipe/DropPipeStatement.java |  23 +-
 .../metadata/pipe/ShowPipePluginsStatement.java    |  23 +-
 .../metadata/pipe/ShowPipesStatement.java          |  18 +-
 .../metadata/pipe/StartPipeStatement.java          |  17 +-
 .../statement/metadata/pipe/StopPipeStatement.java |  17 +-
 .../ExtendRegionStatement.java}                    |  32 +-
 .../{ => region}/MigrateRegionStatement.java       |   2 +-
 .../ReconstructRegionStatement.java}               |  42 +-
 .../RemoveRegionStatement.java}                    |  32 +-
 .../plan/statement/pipe/PipeEnrichedStatement.java |  10 +-
 .../attribute/DeviceAttributeStore.java            |  10 +-
 .../attribute/IDeviceAttributeStore.java           |   6 +-
 .../schemaregion/impl/SchemaRegionMemoryImpl.java  | 136 +++--
 .../schemaregion/mtree/impl/mem/MemMTreeStore.java |  52 +-
 .../mtree/impl/mem/mnode/basic/BasicMNode.java     |  22 +-
 .../impl/mem/snapshot/MemMTreeSnapshotUtil.java    |  37 +-
 .../schemaengine/table/InformationSchemaUtils.java |  11 +-
 .../db/service/metrics/DataNodeMetricsHelper.java  |   4 +-
 .../metrics/memory/ConsensusMemoryMetrics.java     |  73 +++
 .../metrics/memory/GlobalMemoryMetrics.java        | 132 ++++
 .../metrics/memory/QueryEngineMemoryMetrics.java   | 179 ++++++
 .../metrics/memory/SchemaEngineMemoryMetrics.java  | 124 ++++
 .../metrics/memory/StorageEngineMemoryMetrics.java | 210 +++++++
 .../metrics/memory/StreamEngineMemoryMetrics.java  |  73 +++
 .../metrics/memory/ThresholdMemoryMetrics.java     | 194 ------
 .../db/storageengine/dataregion/DataRegion.java    | 123 ++--
 .../task/InsertionCrossSpaceCompactionTask.java    |   3 -
 .../utils/CompactionTableSchemaCollector.java      |   2 +-
 .../compaction/repair/RepairDataFileScanUtil.java  |  30 +-
 .../schedule/CompactionScheduleContext.java        |  21 +
 .../compaction/schedule/CompactionScheduler.java   |  60 +-
 .../impl/RewriteCrossSpaceCompactionSelector.java  |  11 +-
 .../dataregion/memtable/TsFileProcessor.java       |   4 +
 .../dataregion/modification/ModificationFile.java  |  89 ++-
 .../modification/PartitionLevelModFileManager.java |   5 +-
 .../dataregion/snapshot/SnapshotTaker.java         |   2 +-
 .../dataregion/tsfile/TsFileResource.java          |  10 +-
 .../tsfile/timeindex/ArrayDeviceTimeIndex.java     |  20 +
 .../db/storageengine/load/LoadTsFileManager.java   |  23 +-
 .../load/active/ActiveLoadDirScanner.java          |  27 +-
 .../load/active/ActiveLoadTsFileLoader.java        |  13 +-
 .../LoadConvertedInsertTabletStatement.java        |  18 +-
 ...leStatementDataTypeConvertExecutionVisitor.java |  10 +-
 ...eeStatementDataTypeConvertExecutionVisitor.java |   3 +-
 .../converter/LoadTsFileDataTypeConverter.java     |  78 ++-
 .../load/splitter/TsFileSplitter.java              |   5 +-
 .../broker/SubscriptionPrefetchingQueue.java       |   2 +
 .../db/subscription/event/SubscriptionEvent.java   |  21 +-
 .../batch/SubscriptionPipeTabletEventBatch.java    |  24 +-
 .../batch/SubscriptionPipeTsFileEventBatch.java    |  11 +-
 .../db/tools/schema/SRStatementGenerator.java      | 208 +++++--
 .../tools/schema/SchemaRegionSnapshotParser.java   |  32 +-
 .../iotdb/db/utils/constant/SqlConstant.java       |   1 +
 .../iotdb/db/utils/datastructure/BinaryTVList.java |   6 +-
 .../db/utils/datastructure/BooleanTVList.java      |   6 +-
 .../iotdb/db/utils/datastructure/DoubleTVList.java |   6 +-
 .../iotdb/db/utils/datastructure/FloatTVList.java  |   6 +-
 .../iotdb/db/utils/datastructure/IntTVList.java    |   6 +-
 .../iotdb/db/utils/datastructure/LongTVList.java   |   6 +-
 .../connector/PipeDataNodeThriftRequestTest.java   |  22 +-
 .../PipeStatementTablePatternParseVisitorTest.java |  57 ++
 ... PipeStatementTreePatternParseVisitorTest.java} |  20 +-
 .../pipe/connector/PipeTabletEventSorterTest.java  | 263 ++++++--
 .../db/pipe/consensus/DeletionResourceTest.java    |  39 +-
 .../pipe/event/PipeTabletInsertionEventTest.java   |  25 +-
 .../PipePlanTablePatternParseVisitorTest.java      | 129 ++++
 ...va => PipePlanTreePatternParseVisitorTest.java} |  32 +-
 .../annotation/PipePluginAnnotationTest.java}      |  23 +-
 .../resource/PipeTsFileResourceManagerTest.java    |   2 +-
 .../metadata/write/MetadataWriteNodeSerDeTest.java |   2 +-
 .../plan/relational/analyzer/DistinctTest.java     |  91 +++
 .../plan/relational/planner/SubqueryTest.java      | 310 ++++++++++
 .../planner/assertions/PlanMatchPattern.java       |  17 +
 .../planner/assertions/SemiJoinMatcher.java        |  79 +++
 .../storageengine/buffer/BloomFilterCacheTest.java |   5 +-
 .../storageengine/dataregion/DataRegionTest.java   |  24 +-
 .../compaction/CompactionValidationTest.java       |   7 +-
 .../InsertionCrossSpaceCompactionSelectorTest.java |   4 +-
 .../cross/InsertionCrossSpaceCompactionTest.java   |  64 +-
 .../inner/InnerSpaceCompactionSelectorTest.java    |   2 +-
 .../repair/RepairDataFileScanUtilTest.java         |   2 +-
 .../TableModelFastCompactionPerformerTest.java     |  13 +-
 ...TableModelReadChunkCompactionPerformerTest.java |  13 +-
 ...TableModelReadPointCompactionPerformerTest.java |  13 +-
 .../modification/ModificationFileTest.java         |  90 ++-
 .../reader/chunk/MemAlignedChunkLoaderTest.java    |   2 +-
 .../read/reader/chunk/MemChunkLoaderTest.java      |  12 +-
 .../file/UnsealedTsFileRecoverPerformerTest.java   |   4 +-
 .../iotdb/db/tools/TsFileAndModSettleToolTest.java |   2 +-
 .../iotdb/db/tools/TsFileSelfCheckToolTest.java    |   6 +-
 .../iotdb/db/tools/TsFileSketchToolTest.java       |  10 +-
 .../db/utils/SchemaRegionSnapshotParserTest.java   | 200 ++++--
 iotdb-core/node-commons/pom.xml                    |   4 +
 .../conf/iotdb-system.properties.template          |   6 +-
 .../exception}/table/ColumnNotExistsException.java |   2 +-
 .../table/TableAlreadyExistsException.java         |   2 +-
 .../exception}/table/TableNotExistsException.java  |   2 +-
 .../commons/pipe/agent/plugin/PipePluginAgent.java |  17 +-
 .../builtin/connector/PlaceholderConnector.java    |   4 +
 .../connector/donothing/DoNothingConnector.java    |   4 +
 .../iotdb/airgap/IoTDBAirGapConnector.java         |   4 +
 .../consensus/PipeConsensusAsyncConnector.java     |   4 +
 .../iotdb/thrift/IoTDBLegacyPipeConnector.java     |   2 +
 .../iotdb/thrift/IoTDBThriftAsyncConnector.java    |   5 +
 .../iotdb/thrift/IoTDBThriftConnector.java         |   4 +
 .../iotdb/thrift/IoTDBThriftSslConnector.java      |   4 +
 .../iotdb/thrift/IoTDBThriftSyncConnector.java     |   5 +
 .../builtin/connector/opcua/OpcUaConnector.java    |   4 +
 .../connector/websocket/WebSocketConnector.java    |   2 +
 .../connector/writeback/WriteBackConnector.java    |   4 +
 .../extractor/donothing/DoNothingExtractor.java    |   4 +
 .../builtin/extractor/iotdb/IoTDBExtractor.java    |   4 +
 .../processor/donothing/DoNothingProcessor.java    |   4 +
 .../throwing/ThrowingExceptionProcessor.java       |   4 +
 .../agent/plugin/meta/PipePluginMetaKeeper.java    |  47 +-
 .../commons/pipe/agent/task/PipeTaskAgent.java     |   2 +-
 .../pipe/agent/task/meta/PipeMetaKeeper.java       |  12 +-
 .../pipe/agent/task/meta/PipeStaticMeta.java       |  14 +
 .../config/constant/PipeExtractorConstant.java     |   3 +
 .../connector/client/IoTDBSyncClientManager.java   |  28 +-
 .../payload/thrift/request/PipeRequestType.java    |   4 +-
 .../thrift/request/PipeTransferFileSealReqV2.java  |  13 +-
 .../connector/protocol/IoTDBAirGapConnector.java   |  14 +-
 .../pipe/connector/protocol/IoTDBConnector.java    |  10 +
 .../connector/protocol/IoTDBSslSyncConnector.java  |   4 +
 .../options/PipeInclusionOptions.java              |  43 +-
 .../pipe/datastructure/pattern/TablePattern.java   |  29 +-
 .../pipe/datastructure/pattern/TreePattern.java    |  25 +-
 .../{ => resource}/PersistentResource.java         |   2 +-
 .../commons/pipe/datastructure/result/Result.java} |  37 +-
 .../pipe/datastructure/visibility/Visibility.java  |   9 +-
 .../visibility/VisibilityTestUtils.java            | 135 +++++
 .../datastructure/visibility/VisibilityUtils.java  | 133 ++++
 .../commons/pipe/extractor/IoTDBExtractor.java     | 103 +++-
 .../extractor/IoTDBNonDataRegionExtractor.java     |  24 +-
 .../commons/pipe/receiver/IoTDBFileReceiver.java   |  49 +-
 .../commons/pipe/receiver/IoTDBReceiverAgent.java  |   7 +-
 .../snapshot/PipeSnapshotResourceManager.java      |   9 +-
 .../apache/iotdb/commons/schema/node/IMNode.java   |  18 +-
 .../commons/schema/node/visitor/MNodeVisitor.java  |   6 +-
 .../iotdb/commons/service/metric/enums/Metric.java |   3 +-
 .../iotdb/commons/udf/access/RecordIterator.java   |   9 +-
 .../TableBuiltinAggregationFunction.java           |   2 +
 .../org/apache/iotdb/commons/utils/FileUtils.java  |  49 ++
 .../org/apache/iotdb/commons/utils/PathUtils.java  |   2 +-
 .../apache/iotdb/commons/utils/RetryUtils.java}    |  30 +-
 .../annotation/PipePluginAnnotationTest.java}      |  23 +-
 .../db/relational/grammar/sql/RelationalSql.g4     |  11 +-
 .../thrift-commons/src/main/thrift/common.thrift   |   3 +-
 .../src/main/thrift/confignode.thrift              |  69 +++
 pom.xml                                            |   6 +-
 608 files changed, 23175 insertions(+), 6095 deletions(-)

diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/ExcludeColumnFunction.java
index 2b3998365a5,00000000000..e01428e4736
mode 100644,000000..100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/ExcludeColumnFunction.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/ExcludeColumnFunction.java
@@@ -1,105 -1,0 +1,103 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.db.queryengine.execution.function.table;
 +
 +import org.apache.iotdb.udf.api.exception.UDFException;
 +import org.apache.iotdb.udf.api.relational.TableFunction;
 +import org.apache.iotdb.udf.api.relational.access.Record;
 +import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
 +import 
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
 +import org.apache.iotdb.udf.api.relational.table.argument.Argument;
 +import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
 +import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
 +import org.apache.iotdb.udf.api.relational.table.argument.TableArgument;
 +import 
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor;
 +import 
org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification;
 +import 
org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification;
 +import 
org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification;
 +import org.apache.iotdb.udf.api.type.Type;
 +
 +import com.google.common.collect.ImmutableList;
 +import org.apache.tsfile.block.column.ColumnBuilder;
 +
 +import java.util.Arrays;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Optional;
 +
 +public class ExcludeColumnFunction implements TableFunction {
 +  private final String TBL_PARAM = "DATA";
 +  private final String COL_PARAM = "EXCLUDE";
 +
 +  @Override
 +  public List<ParameterSpecification> getArgumentsSpecification() {
 +    return Arrays.asList(
 +        
TableParameterSpecification.builder().name(TBL_PARAM).rowSemantics().build(),
 +        
ScalarParameterSpecification.builder().name(COL_PARAM).type(Type.STRING).build());
 +  }
 +
 +  @Override
 +  public TableFunctionAnalysis analyze(Map<String, Argument> arguments) 
throws UDFException {
 +    TableArgument tableArgument = (TableArgument) arguments.get(TBL_PARAM);
 +    if (tableArgument == null) {
 +      throw new UDFException("Table argument is missing");
 +    }
 +    String excludeColumn = (String) ((ScalarArgument) 
arguments.get(COL_PARAM)).getValue();
 +    ImmutableList.Builder<Integer> requiredColumns = ImmutableList.builder();
 +    DescribedSchema.Builder schemaBuilder = DescribedSchema.builder();
 +    for (int i = 0; i < tableArgument.getFieldNames().size(); i++) {
 +      Optional<String> fieldName = tableArgument.getFieldNames().get(i);
 +      if (!fieldName.isPresent() || 
!fieldName.get().equalsIgnoreCase(excludeColumn)) {
 +        requiredColumns.add(i);
-         schemaBuilder.addField(
-             fieldName,
-             tableArgument.getFieldTypes().get(i));
++        schemaBuilder.addField(fieldName, 
tableArgument.getFieldTypes().get(i));
 +      }
 +    }
 +    return TableFunctionAnalysis.builder()
 +        .properColumnSchema(schemaBuilder.build())
 +        .requiredColumns(TBL_PARAM, requiredColumns.build())
 +        .build();
 +  }
 +
 +  @Override
 +  public TableFunctionProcessorProvider getProcessorProvider(Map<String, 
Argument> arguments) {
 +    return new TableFunctionProcessorProvider() {
 +      @Override
 +      public TableFunctionDataProcessor getDataProcessor() {
 +        return new TableFunctionDataProcessor() {
 +          @Override
 +          public void process(Record input, List<ColumnBuilder> 
columnBuilders) {
 +            for (int i = 0; i < input.size(); i++) {
 +              if (input.isNull(i)) {
 +                columnBuilders.get(i).appendNull();
 +              } else {
 +                columnBuilders.get(i).writeObject(input.getObject(i));
 +              }
 +            }
 +          }
 +
 +          @Override
 +          public void finish(List<ColumnBuilder> columnBuilders) {
 +            // do nothing
 +          }
 +        };
 +      }
 +    };
 +  }
 +}
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/HOPTableFunction.java
index 6bf8df15f04,00000000000..4da85a53f12
mode 100644,000000..100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/HOPTableFunction.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/HOPTableFunction.java
@@@ -1,176 -1,0 +1,177 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +
 +package org.apache.iotdb.db.queryengine.execution.function.table;
 +
 +import org.apache.iotdb.udf.api.exception.UDFException;
 +import org.apache.iotdb.udf.api.relational.TableFunction;
 +import org.apache.iotdb.udf.api.relational.access.Record;
 +import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
 +import 
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
 +import org.apache.iotdb.udf.api.relational.table.argument.Argument;
 +import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
 +import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
 +import org.apache.iotdb.udf.api.relational.table.argument.TableArgument;
 +import 
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor;
 +import 
org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification;
 +import 
org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification;
 +import 
org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification;
 +import org.apache.iotdb.udf.api.type.Type;
 +
 +import org.apache.tsfile.block.column.ColumnBuilder;
 +
 +import java.util.Arrays;
++import java.util.Collections;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Optional;
 +import java.util.stream.Collectors;
 +import java.util.stream.IntStream;
 +
 +public class HOPTableFunction implements TableFunction {
 +
 +  private static final String DATA_PARAMETER_NAME = "DATA";
 +  private static final String TIMECOL_PARAMETER_NAME = "TIMECOL";
 +  private static final String SLIDE_PARAMETER_NAME = "SLIDE";
 +  private static final String SIZE_PARAMETER_NAME = "SIZE";
 +  private static final String START_PARAMETER_NAME = "START";
 +
 +  @Override
 +  public List<ParameterSpecification> getArgumentsSpecification() {
 +    return Arrays.asList(
 +        TableParameterSpecification.builder()
 +            .name(DATA_PARAMETER_NAME)
 +            .passThroughColumns()
 +            .keepWhenEmpty()
 +            .build(),
 +        ScalarParameterSpecification.builder()
 +            .name(TIMECOL_PARAMETER_NAME)
 +            .type(Type.STRING)
 +            .build(),
 +        
ScalarParameterSpecification.builder().name(SLIDE_PARAMETER_NAME).type(Type.INT64).build(),
 +        
ScalarParameterSpecification.builder().name(SIZE_PARAMETER_NAME).type(Type.INT64).build(),
 +        ScalarParameterSpecification.builder()
 +            .name(START_PARAMETER_NAME)
 +            .type(Type.TIMESTAMP)
 +            .defaultValue(Long.MIN_VALUE)
 +            .build());
 +  }
 +
 +  private int findTimeColumnIndex(TableArgument tableArgument, String 
expectedFieldName) {
 +    int requiredIndex = -1;
 +    for (int i = 0; i < tableArgument.getFieldTypes().size(); i++) {
 +      Optional<String> fieldName = tableArgument.getFieldNames().get(i);
 +      if (fieldName.isPresent() && 
expectedFieldName.equalsIgnoreCase(fieldName.get())) {
 +        requiredIndex = i;
 +        break;
 +      }
 +    }
 +    return requiredIndex;
 +  }
 +
 +  // TODO: ImmutableMap
 +  @Override
 +  public TableFunctionAnalysis analyze(Map<String, Argument> arguments) {
 +    TableArgument tableArgument = (TableArgument) 
arguments.get(DATA_PARAMETER_NAME);
 +    String expectedFieldName =
 +        (String) ((ScalarArgument) 
arguments.get(TIMECOL_PARAMETER_NAME)).getValue();
 +    int requiredIndex = findTimeColumnIndex(tableArgument, expectedFieldName);
 +    if (requiredIndex == -1) {
 +      throw new UDFException("The required field is not found in the input 
table");
 +    }
 +    DescribedSchema properColumnSchema =
 +        new DescribedSchema.Builder()
 +            .addField("window_start", Type.TIMESTAMP)
 +            .addField("window_end", Type.TIMESTAMP)
 +            .build();
 +
 +    // outputColumnSchema
 +    return TableFunctionAnalysis.builder()
 +        .properColumnSchema(properColumnSchema)
 +        .requiredColumns(
 +            DATA_PARAMETER_NAME,
 +            IntStream.range(0, tableArgument.getFieldTypes().size())
 +                .boxed()
 +                .collect(Collectors.toList()))
 +        .build();
 +  }
 +
 +  @Override
 +  public TableFunctionProcessorProvider getProcessorProvider(Map<String, 
Argument> arguments) {
 +    TableArgument tableArgument = (TableArgument) 
arguments.get(DATA_PARAMETER_NAME);
 +    String expectedFieldName =
 +        (String) ((ScalarArgument) 
arguments.get(TIMECOL_PARAMETER_NAME)).getValue();
 +    int requiredIndex = findTimeColumnIndex(tableArgument, expectedFieldName);
 +
 +    return new TableFunctionProcessorProvider() {
 +      @Override
 +      public TableFunctionDataProcessor getDataProcessor() {
 +        return new HOPDataProcessor(
 +            (Long) ((ScalarArgument) 
arguments.get(START_PARAMETER_NAME)).getValue(),
 +            (Long) ((ScalarArgument) 
arguments.get(SLIDE_PARAMETER_NAME)).getValue(),
 +            (Long) ((ScalarArgument) 
arguments.get(SIZE_PARAMETER_NAME)).getValue(),
 +            requiredIndex);
 +      }
 +    };
 +  }
 +
 +  private static class HOPDataProcessor implements TableFunctionDataProcessor 
{
 +
 +    private final long slide;
 +    private final long size;
 +    private final int timeColumnIndex;
 +    private long curTime;
 +
 +    public HOPDataProcessor(long startTime, long slide, long size, int 
timeColumnIndex) {
 +      this.slide = slide;
 +      this.size = size;
 +      this.curTime = startTime;
 +      this.timeColumnIndex = timeColumnIndex;
 +    }
 +
 +    @Override
 +    public void process(Record input, List<ColumnBuilder> columnBuilders) {
 +      long timeValue = input.getLong(timeColumnIndex);
 +      if (curTime == Long.MIN_VALUE) {
 +        curTime = timeValue;
 +      }
 +      if (curTime + size <= timeValue) {
 +        // jump to appropriate window
 +        long move = (timeValue - curTime - size) / slide + 1;
 +        curTime += move * slide;
 +      }
 +      long slideTime = curTime;
 +      while (slideTime <= timeValue && slideTime + size > timeValue) {
 +        for (int i = 0; i < input.size(); i++) {
 +          if (input.isNull(i)) {
 +            columnBuilders.get(i + 2).appendNull();
 +          } else {
 +            columnBuilders.get(i + 2).writeObject(input.getObject(i));
 +          }
 +        }
 +        columnBuilders.get(0).writeLong(slideTime);
 +        columnBuilders.get(1).writeLong(slideTime + size);
 +        slideTime += slide;
 +      }
 +    }
 +
 +    @Override
 +    public void finish(List<ColumnBuilder> columnBuilders) {}
 +  }
 +}
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java
index 95df45ae6b6,dbf74b711ca..3e0cabe0422
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java
@@@ -74,18 -76,18 +76,22 @@@ public class GroupedUserDefinedAggregat
    }
  
    @Override
-   public void addInput(int[] groupIds, Column[] arguments) {
+   public void addInput(int[] groupIds, Column[] arguments, AggregationMask 
mask) {
      RecordIterator iterator =
-         new RecordIterator(
-             Arrays.asList(arguments), inputDataTypes, 
arguments[0].getPositionCount());
+         mask.isSelectAll()
+             ? new RecordIterator(
+                 Arrays.asList(arguments), inputDataTypes, 
arguments[0].getPositionCount())
+             : new MaskedRecordIterator(Arrays.asList(arguments), 
inputDataTypes, mask);
+     int[] selectedPositions = mask.getSelectedPositions();
      int index = 0;
      while (iterator.hasNext()) {
-       int groupId = groupIds[index++];
+       int groupId = groupIds[selectedPositions[index]];
+       index++;
        State state = getOrCreateState(groupId);
 +      if (state == null) {
 +        state = aggregateFunction.createState();
 +        stateArray.set(groupId, state);
 +      }
        aggregateFunction.addInput(state, iterator.next());
      }
    }
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 01377a469ff,e94136cf74c..f28c338b6ac
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@@ -145,10 -145,9 +147,11 @@@ import org.apache.iotdb.db.queryengine.
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode;
  import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
 +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionNode;
 +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode;
  import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeAlignedDeviceViewScanNode;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeNonAlignedDeviceViewScanNode;
@@@ -220,8 -217,8 +223,9 @@@ import java.util.function.BiFunction
  import java.util.stream.Collectors;
  
  import static com.google.common.base.Preconditions.checkArgument;
+ import static com.google.common.base.Preconditions.checkNotNull;
  import static com.google.common.collect.ImmutableList.toImmutableList;
 +import static com.google.common.collect.Iterables.getOnlyElement;
  import static java.util.Objects.requireNonNull;
  import static 
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.FIELD;
  import static 
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TIME;
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
index e7c3c1de1bf,442d59f8be3..441b31b2c42
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
@@@ -72,7 -72,7 +72,8 @@@ import org.apache.iotdb.db.queryengine.
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode;
+ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode;
 +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeDeviceViewScanNode;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValueFillNode;
@@@ -942,72 -935,18 +943,83 @@@ public class PlanGraphPrinter extends P
      }
      return render(node, boxValue, context);
    }
+ 
+   @Override
+   public List<String> visitSemiJoin(SemiJoinNode node, GraphContext context) {
+     List<String> boxValue = new ArrayList<>();
+     boxValue.add(String.format("SemiJoin-%s", node.getPlanNodeId().getId()));
+     boxValue.add(String.format("OutputSymbols: %s", node.getOutputSymbols()));
+     boxValue.add(String.format("SourceJoinSymbol: %s", 
node.getSourceJoinSymbol()));
+     boxValue.add(
+         String.format("FilteringSourceJoinSymbol: %s", 
node.getFilteringSourceJoinSymbol()));
+     return render(node, boxValue, context);
+   }
  
 +  @Override
 +  public List<String> visitTableFunctionProcessor(
 +      TableFunctionProcessorNode node, GraphContext context) {
 +    List<String> boxValue = new ArrayList<>();
 +    boxValue.add(String.format("TableFunctionProcessor-%s", 
node.getPlanNodeId().getId()));
 +    boxValue.add(String.format("Function: %s", node.getName()));
 +    boxValue.add(String.format("ProperOutputs: %s", node.getProperOutputs()));
 +    boxValue.add(String.format("OutputSymbols: %s", node.getOutputSymbols()));
 +    boxValue.add(String.format("RequiredSymbols: %s", 
node.getRequiredSymbols()));
 +    if (node.isPruneWhenEmpty()) {
 +      boxValue.add("Prune when empty");
 +    } else {
 +      boxValue.add("Keep when empty");
 +    }
 +    node.getDataOrganizationSpecification()
 +        .ifPresent(
 +            specification -> {
 +              if (!specification.getPartitionBy().isEmpty()) {
 +                boxValue.add(
 +                    "Partition by: [" + Joiner.on(", 
").join(specification.getPartitionBy()) + "]");
 +              }
 +              specification
 +                  .getOrderingScheme()
 +                  .ifPresent(orderingScheme -> boxValue.add("Order by: " + 
orderingScheme));
 +            });
 +    if (!node.getArguments().isEmpty()) {
 +      node.getArguments().forEach((key, value) -> 
boxValue.add(formatArgument(key, value)));
 +    }
 +    return render(node, boxValue, context);
 +  }
 +
 +  private String formatArgument(String argumentName, Argument argument) {
 +    if (argument instanceof ScalarArgument) {
 +      return formatScalarArgument(argumentName, (ScalarArgument) argument);
 +    }
 +    if (argument instanceof DescriptorArgument) {
 +      return formatDescriptorArgument(argumentName, (DescriptorArgument) 
argument);
 +    } else {
 +      return formatTableArgument(argumentName, (TableArgument) argument);
 +    }
 +  }
 +
 +  private String formatScalarArgument(String argumentName, ScalarArgument 
argument) {
 +    return format(
 +        "%s => ScalarArgument{type=%s, value=%s}",
 +        argumentName, argument.getType(), argument.getValue());
 +  }
 +
 +  private String formatDescriptorArgument(String argumentName, 
DescriptorArgument argument) {
 +    String descriptor;
 +    if (argument.getDescriptor().isPresent()) {
 +      descriptor =
 +          argument.getDescriptor().get().getFields().stream()
 +              .map(field -> field.getName() + field.getType().map(type -> " " 
+ type).orElse(""))
 +              .collect(joining(", ", "(", ")"));
 +    } else {
 +      descriptor = "NULL";
 +    }
 +    return format("%s => DescriptorArgument{%s}", argumentName, descriptor);
 +  }
 +
 +  private String formatTableArgument(String argumentName, TableArgument 
argument) {
 +    return format("%s => TableArgument", argumentName);
 +  }
 +
    private String printRegion(TRegionReplicaSet regionReplicaSet) {
      return String.format(
          "Partition: %s",
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index 99db38ae149,d04c57e77fb..69e948a3fa0
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@@ -127,8 -127,7 +127,9 @@@ import org.apache.iotdb.db.queryengine.
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode;
+ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode;
 +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionNode;
 +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeAlignedDeviceViewScanNode;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeDeviceViewScanNode;
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
index bbdb55815ac,65ae1c14835..d79fcca4476
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
@@@ -41,8 -36,8 +41,9 @@@ import org.apache.iotdb.db.queryengine.
  import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
  import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
  import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
+ import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl;
  import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
 +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.IrExpressionInterpreter;
  import org.apache.iotdb.db.queryengine.plan.relational.planner.PlannerContext;
  import org.apache.iotdb.db.queryengine.plan.relational.planner.ScopeAware;
  import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
index 2a556889336,607b1c37135..55f36ec4e89
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
@@@ -52,12 -52,12 +55,13 @@@ import org.apache.iotdb.db.queryengine.
  import org.apache.iotdb.db.queryengine.plan.relational.type.TypeSignature;
  import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
  import org.apache.iotdb.db.utils.constant.SqlConstant;
- import org.apache.iotdb.udf.api.customizer.config.AggregateFunctionConfig;
- import org.apache.iotdb.udf.api.customizer.config.ScalarFunctionConfig;
- import org.apache.iotdb.udf.api.customizer.parameter.FunctionParameters;
+ import org.apache.iotdb.rpc.TSStatusCode;
+ import org.apache.iotdb.udf.api.customizer.analysis.AggregateFunctionAnalysis;
+ import org.apache.iotdb.udf.api.customizer.analysis.ScalarFunctionAnalysis;
+ import org.apache.iotdb.udf.api.customizer.parameter.FunctionArguments;
  import org.apache.iotdb.udf.api.relational.AggregateFunction;
  import org.apache.iotdb.udf.api.relational.ScalarFunction;
 +import org.apache.iotdb.udf.api.relational.TableFunction;
  
  import org.apache.tsfile.file.metadata.IDeviceID;
  import org.apache.tsfile.read.common.type.BlobType;
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index c987c94268c,a90c07eaaf8..637c7abc7c0
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@@ -57,9 -57,9 +57,10 @@@ import org.apache.iotdb.db.queryengine.
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode;
  import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
 +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode;
  import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeAlignedDeviceViewScanNode;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeDeviceViewScanNode;
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
index 058bad7cb1b,2b414438983..c6fd7862bea
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
@@@ -46,9 -45,8 +46,10 @@@ import org.apache.iotdb.db.queryengine.
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode;
  import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
 +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionNode;
 +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode;
  import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeDeviceViewScanNode;
  import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValueFillNode;
@@@ -735,114 -733,33 +736,142 @@@ public class UnaliasSymbolReferences im
            outputMapping);
      }
  
+     @Override
+     public PlanAndMappings visitSemiJoin(SemiJoinNode node, UnaliasContext 
context) {
+       // it is assumed that symbols are distinct between SemiJoin source and 
filtering source. Only
+       // symbols from outer correlation might be the exception
+       PlanAndMappings rewrittenSource = node.getSource().accept(this, 
context);
+       PlanAndMappings rewrittenFilteringSource = 
node.getFilteringSource().accept(this, context);
+ 
+       Map<Symbol, Symbol> outputMapping = new HashMap<>();
+       outputMapping.putAll(rewrittenSource.getMappings());
+       outputMapping.putAll(rewrittenFilteringSource.getMappings());
+ 
+       SymbolMapper mapper = symbolMapper(outputMapping);
+ 
+       Symbol newSourceJoinSymbol = mapper.map(node.getSourceJoinSymbol());
+       Symbol newFilteringSourceJoinSymbol = 
mapper.map(node.getFilteringSourceJoinSymbol());
+       Symbol newSemiJoinOutput = mapper.map(node.getSemiJoinOutput());
+ 
+       return new PlanAndMappings(
+           new SemiJoinNode(
+               node.getPlanNodeId(),
+               rewrittenSource.getRoot(),
+               rewrittenFilteringSource.getRoot(),
+               newSourceJoinSymbol,
+               newFilteringSourceJoinSymbol,
+               newSemiJoinOutput),
+           outputMapping);
+     }
++
 +    @Override
 +    public PlanAndMappings visitTableFunction(TableFunctionNode node, 
UnaliasContext context) {
 +      Map<Symbol, Symbol> mapping = new 
HashMap<>(context.getCorrelationMapping());
 +      SymbolMapper mapper = symbolMapper(mapping);
 +
 +      List<Symbol> newProperOutputs = mapper.map(node.getProperOutputs());
 +
 +      ImmutableList.Builder<PlanNode> newSources = ImmutableList.builder();
 +      ImmutableList.Builder<TableFunctionNode.TableArgumentProperties> 
newTableArgumentProperties =
 +          ImmutableList.builder();
 +
 +      for (int i = 0; i < node.getChildren().size(); i++) {
 +        PlanAndMappings newSource = node.getChildren().get(i).accept(this, 
context);
 +        newSources.add(newSource.getRoot());
 +
 +        SymbolMapper inputMapper = symbolMapper(new 
HashMap<>(newSource.getMappings()));
 +        TableFunctionNode.TableArgumentProperties properties =
 +            node.getTableArgumentProperties().get(i);
 +        Optional<DataOrganizationSpecification> newSpecification =
 +            
properties.getDataOrganizationSpecification().map(inputMapper::mapAndDistinct);
 +        TableFunctionNode.PassThroughSpecification 
newPassThroughSpecification =
 +            new TableFunctionNode.PassThroughSpecification(
 +                
properties.getPassThroughSpecification().isDeclaredAsPassThrough(),
 +                properties.getPassThroughSpecification().getColumns().stream()
 +                    .map(
 +                        column ->
 +                            new TableFunctionNode.PassThroughColumn(
 +                                inputMapper.map(column.getSymbol()), 
column.isPartitioningColumn()))
 +                    .collect(toImmutableList()));
 +        newTableArgumentProperties.add(
 +            new TableFunctionNode.TableArgumentProperties(
 +                properties.getArgumentName(),
 +                properties.isRowSemantics(),
 +                properties.isPruneWhenEmpty(),
 +                newPassThroughSpecification,
 +                inputMapper.map(properties.getRequiredColumns()),
 +                newSpecification));
 +      }
 +
 +      return new PlanAndMappings(
 +          new TableFunctionNode(
 +              node.getPlanNodeId(),
 +              node.getName(),
 +              node.getArguments(),
 +              newProperOutputs,
 +              newSources.build(),
 +              newTableArgumentProperties.build()),
 +          mapping);
 +    }
 +
 +    @Override
 +    public PlanAndMappings visitTableFunctionProcessor(
 +        TableFunctionProcessorNode node, UnaliasContext context) {
 +      if (node.getChildren().isEmpty()) {
 +        Map<Symbol, Symbol> mapping = new 
HashMap<>(context.getCorrelationMapping());
 +        SymbolMapper mapper = symbolMapper(mapping);
 +        return new PlanAndMappings(
 +            new TableFunctionProcessorNode(
 +                node.getPlanNodeId(),
 +                node.getName(),
 +                mapper.map(node.getProperOutputs()),
 +                Optional.empty(),
 +                node.isPruneWhenEmpty(),
 +                ImmutableList.of(),
 +                ImmutableList.of(),
 +                Optional.empty(),
 +                node.getArguments()),
 +            mapping);
 +      }
 +
 +      PlanAndMappings rewrittenSource = node.getChild().accept(this, context);
 +      Map<Symbol, Symbol> mapping = new 
HashMap<>(rewrittenSource.getMappings());
 +      SymbolMapper mapper = symbolMapper(mapping);
 +
 +      List<TableFunctionNode.PassThroughSpecification> 
newPassThroughSpecification =
 +          node.getPassThroughSpecifications().stream()
 +              .map(
 +                  passThroughSpecification ->
 +                      new TableFunctionNode.PassThroughSpecification(
 +                          passThroughSpecification.isDeclaredAsPassThrough(),
 +                          passThroughSpecification.getColumns().stream()
 +                              .map(
 +                                  column ->
 +                                      new TableFunctionNode.PassThroughColumn(
 +                                          mapper.map(column.getSymbol()),
 +                                          column.isPartitioningColumn()))
 +                              .collect(toImmutableList())))
 +              .collect(Collectors.toList());
 +      List<List<Symbol>> newRequiredSymbols =
 +          
node.getRequiredSymbols().stream().map(mapper::map).collect(toImmutableList());
 +
 +      Optional<DataOrganizationSpecification> newSpecification =
 +          node.getDataOrganizationSpecification().map(mapper::mapAndDistinct);
 +
 +      TableFunctionProcessorNode rewrittenTableFunctionProcessor =
 +          new TableFunctionProcessorNode(
 +              node.getPlanNodeId(),
 +              node.getName(),
 +              mapper.map(node.getProperOutputs()),
 +              Optional.of(rewrittenSource.getRoot()),
 +              node.isPruneWhenEmpty(),
 +              newPassThroughSpecification,
 +              newRequiredSymbols,
 +              newSpecification,
 +              node.getArguments());
 +
 +      return new PlanAndMappings(rewrittenTableFunctionProcessor, mapping);
 +    }
    }
  
    private static class UnaliasContext {

Reply via email to