This is an automated email from the ASF dual-hosted git repository. chenyz pushed a commit to branch udtf in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 602a726ae635d3d03c685cc5d999f2b11db518a7 Merge: 87fc892d072 263443dcb53 Author: Chen YZ <[email protected]> AuthorDate: Mon Feb 3 16:06:46 2025 +0800 merge master .github/workflows/cluster-it-1c1d.yml | 2 +- .github/workflows/cluster-it-1c1d1a.yml | 2 +- .github/workflows/cluster-it-1c3d.yml | 2 +- .github/workflows/compile-check.yml | 2 +- .github/workflows/daily-it.yml | 2 +- .github/workflows/daily-ut.yml | 2 +- .github/workflows/dependency-check.yml | 2 +- .github/workflows/pipe-it-2cluster.yml | 2 +- .github/workflows/sonar-codecov.yml | 2 +- .github/workflows/table-cluster-it-1c1d.yml | 2 +- .github/workflows/table-cluster-it-1c3d.yml | 2 +- .github/workflows/unit-test.yml | 2 +- .github/workflows/vulnerability-check.yml | 2 +- .mvn/develocity.xml | 3 +- .mvn/extensions.xml | 4 +- .../java/org/apache/iotdb/CountPointProcessor.java | 2 + .../iotdb/AlignedTimeseriesSessionExample.java | 67 +- .../org/apache/iotdb/SessionConcurrentExample.java | 23 - .../main/java/org/apache/iotdb/SessionExample.java | 107 +--- .../org/apache/iotdb/trigger/LoggerTrigger.java | 12 +- .../iotdb/trigger/StatisticsUpdaterTrigger.java | 4 +- .../apache/iotdb/udf/AggregateFunctionExample.java | 35 +- .../apache/iotdb/udf/ScalarFunctionExample.java | 57 +- .../db/query/udf/example/relational/AllSum.java | 49 +- .../query/udf/example/relational/ContainNull.java | 21 +- .../relational/{DatePlusOne.java => DatePlus.java} | 31 +- .../query/udf/example/relational/FirstTwoSum.java | 33 +- .../db/query/udf/example/relational/MyAvg.java | 32 +- .../db/query/udf/example/relational/MyCount.java | 19 +- .../java/org/apache/iotdb/it/env/EnvFactory.java | 5 + .../iotdb/it/env/cluster/env/AbstractEnv.java | 49 ++ .../iotdb/it/env/cluster/node/AINodeWrapper.java | 5 + .../it/env/cluster/node/AbstractNodeWrapper.java | 12 + .../it/env/cluster/node/ConfigNodeWrapper.java | 5 + .../iotdb/it/env/cluster/node/DataNodeWrapper.java | 5 + .../iotdb/it/env/remote/env/RemoteServerEnv.java | 12 + .../org/apache/iotdb/it/utils/TsFileGenerator.java | 87 +-- .../iotdb/it/utils/TsFileTableGenerator.java | 186 ++++++ .../java/org/apache/iotdb/itbase/env/BaseEnv.java | 7 + ...ionMigrateDataNodeCrashITFrameworkForIoTV1.java | 2 +- ...ionMigrateDataNodeCrashITFrameworkForIoTV2.java | 2 +- ...oTDBRegionOperationReliabilityITFramework.java} | 174 ++++-- .../IoTDBRegionGroupExpandAndShrinkForIoTV1IT.java | 157 +++++ .../commit/IoTDBRegionReconstructForIoTV1IT.java | 141 +++++ .../IoTDBRegionMigrateNormalITForIoTV2Batch.java | 4 +- .../IoTDBRegionMigrateOtherITForIoTV2Batch.java | 4 +- .../IoTDBRegionMigrateNormalITForIoTV2Stream.java | 4 +- .../IoTDBRegionMigrateOtherITForIoTV2Stream.java | 4 +- .../IoTDBRegionMigrateDataNodeCrashForIoTV1IT.java | 4 +- ...TDBRegionMigrateDataNodeCrashForIoTV2Batch.java | 4 +- ...DBRegionMigrateDataNodeCrashForIoTV2Stream.java | 4 +- .../IoTDBRegionMigrateClusterCrashIoTV1IT.java | 4 +- .../IoTDBRegionMigrateConfigNodeCrashIoTV1IT.java | 4 +- .../IoTDBRegionMigrateClusterCrashIoTV2Batch.java | 4 +- ...oTDBRegionMigrateConfigNodeCrashIoTV2Batch.java | 4 +- .../IoTDBRegionMigrateClusterCrashIoTV2Stream.java | 4 +- ...TDBRegionMigrateConfigNodeCrashIoTV2Stream.java | 4 +- .../IoTDBRemoveDataNodeITFramework.java | 22 +- .../org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java | 80 ++- .../iotdb/db/it/IoTDBLoadTsFileWithModIT.java | 172 ++++++ .../it/IoTDBSyntaxConventionStringLiteralIT.java | 3 +- .../org/apache/iotdb/db/it/utils/TestUtils.java | 279 ++++++++- .../iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java | 3 +- .../pipe/it/autocreate/IoTDBPipeAutoDropIT.java | 10 +- .../pipe/it/autocreate/IoTDBPipeClusterIT.java | 98 +++ .../pipe/it/autocreate/IoTDBPipeIdempotentIT.java | 93 +++ .../pipe/it/autocreate/IoTDBPipeNullValueIT.java | 28 +- .../it/autocreate/IoTDBPipeSwitchStatusIT.java | 23 +- .../pipe/it/manual/IoTDBPipeReqAutoSliceIT.java | 8 +- .../pipe/it/manual/IoTDBPipeTableManualIT.java | 293 +++++++++ .../manual/IoTDBPipeTypeConversionISessionIT.java | 10 +- .../iotdb/pipe/it/tablemodel/IoTDBPipeAlterIT.java | 30 +- .../pipe/it/tablemodel/IoTDBPipeAutoDropIT.java | 10 +- .../pipe/it/tablemodel/IoTDBPipeClusterIT.java | 65 ++ .../pipe/it/tablemodel/IoTDBPipeDataSinkIT.java | 450 +++++++++++++- .../it/tablemodel/IoTDBPipeDoubleLivingIT.java | 329 ++++++++++ .../pipe/it/tablemodel/IoTDBPipeIsolationIT.java | 377 ++++++++++++ .../pipe/it/tablemodel/IoTDBPipeNullValueIT.java | 4 +- .../pipe/it/tablemodel/IoTDBPipeProtocolIT.java | 1 - .../it/tablemodel/IoTDBPipeSwitchStatusIT.java | 23 +- .../pipe/it/tablemodel/IoTDBPipeSyntaxIT.java | 35 +- .../IoTDBPipeTypeConversionISessionIT.java | 97 ++- .../pipe/it/tablemodel/IoTDBPipeWithLoadIT.java | 8 +- .../iotdb/pipe/it/tablemodel/TableModelUtils.java | 614 +++++++++++++++---- .../relational/it/db/it/IoTDBDeletionTableIT.java | 2 +- .../relational/it/db/it/IoTDBInsertTableIT.java | 2 +- .../it/db/it/udf/IoTDBSQLFunctionManagementIT.java | 19 +- .../udf/IoTDBUserDefinedAggregateFunctionIT.java | 53 +- .../it/udf/IoTDBUserDefinedScalarFunctionIT.java | 2 +- .../IoTDBAlignByDeviceWithTemplateTableIT.java | 4 +- .../scalar/IoTDBFormatFunctionTableIT.java | 21 +- ...oTDBNoSelectExpressionAfterAnalyzedTableIT.java | 6 +- .../query/old/query/IoTDBNullValueFillTableIT.java | 327 ---------- .../it/query/old/query/IoTDBResultSetTableIT.java | 2 +- .../query/old/query/IoTDBSelectSchemaTableIT.java | 22 +- .../it/query/recent/IoTDBTableAggregationIT.java | 656 +++++++++++++++++++- .../recent/subquery/SubqueryDataSetUtils.java | 12 +- .../IoTDBUncorrelatedInPredicateSubqueryIT.java | 319 ++++++++++ .../IoTDBUncorrelatedQuantifiedComparisonIT.java | 674 +++++++++++++++++++++ .../IoTDBUncorrelatedScalarSubqueryIT.java | 4 +- .../it/rest/it/IoTDBRestServiceCaseWhenThenIT.java | 7 +- .../it/rest/it/IoTDBRestServiceFlushQueryIT.java | 6 +- .../relational/it/rest/it/IoTDBRestServiceIT.java | 3 +- ...IT.java => IoTDBRestServiceInsertValuesIT.java} | 20 +- .../relational/it/schema/IoTDBDatabaseIT.java | 41 ++ .../iotdb/relational/it/schema/IoTDBDeviceIT.java | 47 +- .../iotdb/relational/it/schema/IoTDBTableIT.java | 122 ++-- .../it/session/IoTDBSessionRelationalIT.java | 29 +- .../iotdb/session/it/IoTDBSessionComplexIT.java | 8 +- .../iotdb/session/it/IoTDBSessionSimpleIT.java | 78 ++- .../session/it/IoTDBSessionSyntaxConventionIT.java | 18 +- .../org/apache/iotdb/session/it/SessionIT.java | 32 +- .../it/local/IoTDBSubscriptionDataTypeIT.java | 8 +- .../java/org/apache/iotdb/util/MagicUtils.java | 61 ++ .../java/org/apache/iotdb/pipe/api/PipePlugin.java | 37 ++ .../iotdb/pipe/api/annotation/TableModel.java | 23 +- .../iotdb/pipe/api/annotation/TreeModel.java | 23 +- .../parameter/PipeParameterValidator.java | 42 +- .../analysis/AggregateFunctionAnalysis.java | 62 ++ .../api/customizer/analysis/FunctionAnalysis.java} | 8 +- .../analysis/ScalarFunctionAnalysis.java | 51 ++ ...ctionParameters.java => FunctionArguments.java} | 28 +- .../exception/UDFArgumentNotValidException.java} | 9 +- .../udf/api/relational/AggregateFunction.java | 48 +- .../iotdb/udf/api/relational/ScalarFunction.java | 47 +- .../apache/iotdb/tool/data/AbstractDataTool.java | 32 +- .../org/apache/iotdb/tool/data/ExportData.java | 43 +- .../org/apache/iotdb/tool/data/ImportData.java | 24 +- .../org/apache/iotdb/tool/tsfile/ExportTsFile.java | 3 - .../java/org/apache/iotdb/rpc/TSStatusCode.java | 3 + .../java/org/apache/iotdb/session/Session.java | 27 +- .../payload/SubscriptionFileHandler.java | 24 +- .../payload/SubscriptionSessionDataSet.java | 12 +- .../org/apache/iotdb/session/util/RetryUtils.java | 30 +- .../apache/iotdb/session/util/SessionUtils.java | 25 +- .../java/org/apache/iotdb/session/TabletTest.java | 28 +- .../iotdb/session/util/SessionUtilsTest.java | 16 +- .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 16 +- .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 8 + .../consensus/request/ConfigPhysicalPlan.java | 10 +- .../consensus/request/ConfigPhysicalPlanType.java | 4 + .../request/ConfigPhysicalPlanVisitor.java | 110 +++- .../table/DescTable4InformationSchemaPlan.java} | 12 +- .../table/ShowTable4InformationSchemaPlan.java} | 16 +- .../write/pipe/payload/PipeCreateTablePlan.java} | 18 +- .../pipe/payload/PipeDeactivateTemplatePlan.java | 22 +- .../write/pipe/payload/PipeDeleteDevicesPlan.java | 130 ++++ .../request/write/table/PreCreateTablePlan.java | 11 +- .../response/pipe/plugin/PipePluginTableResp.java | 27 +- .../response/pipe/task/PipeTableResp.java | 7 + .../DescTable4InformationSchemaResp.java} | 29 +- .../ShowTable4InformationSchemaResp.java} | 29 +- .../iotdb/confignode/manager/ClusterManager.java | 2 +- .../iotdb/confignode/manager/ConfigManager.java | 66 +- .../apache/iotdb/confignode/manager/IManager.java | 42 +- .../iotdb/confignode/manager/ProcedureManager.java | 491 +++++++++++---- .../payload/PipeTransferConfigSnapshotSealReq.java | 32 +- .../protocol/IoTDBConfigRegionAirGapConnector.java | 8 + .../protocol/IoTDBConfigRegionConnector.java | 10 +- .../coordinator/plugin/PipePluginCoordinator.java | 29 + .../pipe/coordinator/task/PipeTaskCoordinator.java | 74 ++- .../pipe/event/PipeConfigRegionSnapshotEvent.java | 3 +- .../extractor/ConfigRegionListeningFilter.java | 89 +-- .../pipe/extractor/ConfigRegionListeningQueue.java | 23 +- .../pipe/extractor/IoTDBConfigRegionExtractor.java | 74 ++- ...ConfigPhysicalPlanTablePatternParseVisitor.java | 135 +++++ ...ConfigPhysicalPlanTreePatternParseVisitor.java} | 6 +- .../receiver/protocol/IoTDBConfigNodeReceiver.java | 170 +++++- .../PipeConfigPhysicalPlanTSStatusVisitor.java | 76 +++ .../manager/schema/ClusterSchemaManager.java | 158 +++-- .../persistence/executor/ConfigPlanExecutor.java | 5 + .../confignode/persistence/pipe/PipeInfo.java | 2 +- .../persistence/pipe/PipePluginInfo.java | 39 +- .../confignode/persistence/pipe/PipeTaskInfo.java | 28 +- .../schema/CNPhysicalPlanGenerator.java | 77 ++- .../persistence/schema/ClusterSchemaInfo.java | 77 +++ .../confignode/persistence/schema/ConfigMTree.java | 29 +- .../schema/ConfignodeSnapshotParser.java | 30 +- .../procedure/env/ConfigNodeProcedureEnv.java | 23 +- .../procedure/env/RegionMaintainHandler.java | 13 +- .../impl/pipe/task/AlterPipeProcedureV2.java | 4 +- .../impl/pipe/task/CreatePipeProcedureV2.java | 42 +- .../impl/region/AddRegionPeerProcedure.java | 84 +-- .../impl/region/ReconstructRegionProcedure.java | 209 +++++++ .../impl/region/RegionMigrateProcedure.java | 39 +- .../impl/region/RegionOperationProcedure.java | 39 +- .../impl/region/RemoveRegionPeerProcedure.java | 52 +- .../impl/schema/DeleteDatabaseProcedure.java | 2 +- .../impl/schema/DeleteTimeSeriesProcedure.java | 4 +- .../table/AbstractAlterOrDropTableProcedure.java | 10 +- .../impl/schema/table/AddTableColumnProcedure.java | 16 +- .../impl/schema/table/CreateTableProcedure.java | 20 +- .../impl/schema/table/DeleteDevicesProcedure.java | 43 +- .../schema/table/DropTableColumnProcedure.java | 21 +- .../impl/schema/table/DropTableProcedure.java | 24 +- .../schema/table/RenameTableColumnProcedure.java | 18 +- .../schema/table/SetTablePropertiesProcedure.java | 18 +- .../impl/trigger/CreateTriggerProcedure.java | 9 +- .../procedure/state/ReconstructRegionState.java | 10 +- .../procedure/store/ProcedureFactory.java | 40 +- .../confignode/procedure/store/ProcedureType.java | 12 +- .../thrift/ConfigNodeRPCServiceProcessor.java | 52 +- .../request/ConfigPhysicalPlanSerDeTest.java | 40 ++ .../response/pipe/PipePluginTableRespTest.java | 4 +- .../connector/PipeConfigNodeThriftRequestTest.java | 12 +- ...igPhysicalPlanTablePatternParseVisitorTest.java | 133 ++++ ...igPhysicalPlanTreePatternParseVisitorTest.java} | 44 +- .../pipe/annotation/PipePluginAnnotationTest.java} | 23 +- .../pipe/receiver/PipeEnrichedProcedureTest.java | 194 ++++++ .../schema/table/AddTableColumnProcedureTest.java | 8 +- .../schema/table/CreateTableProcedureTest.java | 5 +- .../schema/table/DeleteDevicesProcedureTest.java | 5 +- .../schema/table/DropTableColumnProcedureTest.java | 4 +- .../impl/schema/table/DropTableProcedureTest.java | 4 +- .../table/RenameTableColumnProcedureTest.java | 4 +- .../table/SetTablePropertiesProcedureTest.java | 6 +- .../apache/iotdb/consensus/pipe/PipeConsensus.java | 57 +- .../apache/iotdb/consensus/ratis/RatisClient.java | 6 +- .../apache/iotdb/consensus/ratis/utils/Utils.java | 1 + .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 4 +- .../dataregion/DataExecutionVisitor.java | 9 +- .../schemaregion/SchemaExecutionVisitor.java | 15 +- .../schemaregion/SchemaRegionStateMachine.java | 15 +- .../LoadAnalyzeException.java} | 8 +- .../LoadAnalyzeTableColumnDisorderException.java} | 7 +- .../LoadAnalyzeTypeMismatchException.java} | 6 +- .../pipe/agent/plugin/PipeDataNodePluginAgent.java | 20 +- .../dataregion/PipeDataRegionPluginAgent.java | 49 ++ .../agent/task/connection/PipeEventCollector.java | 35 +- .../client/IoTDBDataNodeAsyncClientManager.java | 2 +- .../batch/PipeTabletEventTsFileBatch.java | 384 +++--------- .../PipeTransferDataNodeHandshakeV1Req.java | 10 +- .../evolvable/request/PipeTransferPlanNodeReq.java | 18 +- .../request/PipeTransferSchemaSnapshotSealReq.java | 94 ++- .../request/PipeTransferTabletInsertNodeReqV2.java | 2 +- .../request/PipeTransferTabletRawReq.java | 4 +- .../request/PipeTransferTabletRawReqV2.java | 9 +- .../request/PipeTransferTsFileSealWithModReq.java | 42 +- .../airgap/IoTDBDataNodeAirGapConnector.java | 4 + .../airgap/IoTDBDataRegionAirGapConnector.java | 4 + .../airgap/IoTDBSchemaRegionAirGapConnector.java | 19 +- .../protocol/legacy/IoTDBLegacyPipeConnector.java | 2 + .../connector/protocol/opcua/OpcUaConnector.java | 4 + .../connector/protocol/opcua/OpcUaNameSpace.java | 44 +- .../pipeconsensus/PipeConsensusAsyncConnector.java | 4 + .../pipeconsensus/PipeConsensusSyncConnector.java | 4 + .../request/PipeConsensusDeleteNodeReq.java | 4 +- .../async/IoTDBDataRegionAsyncConnector.java | 14 +- .../PipeTransferTabletBatchEventHandler.java | 2 +- .../async/handler/PipeTransferTsFileHandler.java | 13 +- .../thrift/sync/IoTDBDataNodeSyncConnector.java | 4 + .../thrift/sync/IoTDBDataRegionSyncConnector.java | 23 +- .../thrift/sync/IoTDBSchemaRegionConnector.java | 26 +- .../protocol/websocket/WebSocketConnector.java | 2 + .../protocol/writeback/WriteBackConnector.java | 199 +++++- .../util/builder/PipeTableModeTsFileBuilder.java | 273 +++++++++ .../util/builder/PipeTreeModelTsFileBuilder.java | 268 ++++++++ .../connector/util/builder/PipeTsFileBuilder.java | 162 +++++ .../util/{ => cacher}/LeaderCacheUtils.java | 2 +- .../sorter/PipeTableModelTabletEventSorter.java | 273 +++++++++ .../util/{ => sorter}/PipeTabletEventSorter.java | 107 +--- .../sorter/PipeTreeModelTabletEventSorter.java | 123 ++++ .../pipe/consensus/deletion/DeletionResource.java | 2 +- .../db/pipe/event/common/PipeInsertionEvent.java | 15 +- .../common/deletion/PipeDeleteDataNodeEvent.java | 24 +- .../db/pipe/event/common/row/PipeRowCollector.java | 2 +- .../schema/PipeSchemaRegionSnapshotEvent.java | 75 ++- .../schema/PipeSchemaSerializableEventType.java | 15 +- .../common/tablet/PipeRawTabletInsertionEvent.java | 2 +- .../tablet/parser/TabletInsertionEventParser.java | 8 +- .../TabletInsertionEventTreePatternParser.java | 13 +- .../scan/TsFileInsertionEventScanParser.java | 42 +- .../table/TsFileInsertionEventTableParser.java | 2 +- .../dataregion/DataRegionListeningFilter.java | 21 +- .../dataregion/IoTDBDataRegionExtractor.java | 156 ++--- ...oricalDataRegionTsFileAndDeletionExtractor.java | 4 +- .../listener/PipeInsertionDataNodeListener.java | 27 +- .../schemaregion/IoTDBSchemaRegionExtractor.java | 28 +- .../PipePlanTablePatternParseVisitor.java | 77 +++ ...r.java => PipePlanTreePatternParseVisitor.java} | 7 +- .../schemaregion/SchemaRegionListeningFilter.java | 87 +-- .../schemaregion/SchemaRegionListeningQueue.java | 8 +- .../pipe/metric/PipeDataNodeReceiverMetrics.java | 48 +- .../processor/aggregate/AggregateProcessor.java | 2 + .../StandardStatisticsOperatorProcessor.java | 2 + .../processor/TumblingWindowingProcessor.java | 2 + .../changing/ChangingValueSamplingProcessor.java | 2 + .../sdt/SwingingDoorTrendingSamplingProcessor.java | 2 + .../tumbling/TumblingTimeSamplingProcessor.java | 2 + .../pipeconsensus/PipeConsensusProcessor.java | 4 + .../schemachange/RenameDatabaseProcessor.java | 11 +- .../twostage/plugin/TwoStageCountProcessor.java | 37 +- .../pipeconsensus/PipeConsensusReceiver.java | 33 +- .../protocol/thrift/IoTDBDataNodeReceiver.java | 223 +++++-- .../statement/PipeConvertedInsertRowStatement.java | 30 +- .../PipeConvertedInsertTabletStatement.java | 52 +- .../visitor/PipePlanToStatementVisitor.java | 54 +- .../PipeStatementTablePatternParseVisitor.java | 33 +- ...a => PipeStatementTreePatternParseVisitor.java} | 6 +- ...leStatementDataTypeConvertExecutionVisitor.java | 10 +- ...eeStatementDataTypeConvertExecutionVisitor.java | 3 +- ...r.java => PipeTreeStatementToBatchVisitor.java} | 2 +- .../pipe/resource/memory/PipeMemoryWeightUtil.java | 21 +- .../resource/tsfile/PipeTsFileResourceManager.java | 5 +- .../iotdb/db/protocol/client/ConfigNodeClient.java | 57 ++ .../impl/DataNodeInternalRPCServiceImpl.java | 2 + .../execution/executor/RegionWriteExecutor.java | 55 +- .../function/table/ExcludeColumnFunction.java | 4 +- .../execution/function/table/HOPTableFunction.java | 1 + .../operator/process/EnforceSingleRowOperator.java | 5 +- .../merge/comparator/JoinKeyComparatorFactory.java | 2 +- .../schema/source/TableDeviceQuerySource.java | 4 +- .../InformationSchemaContentSupplierFactory.java | 372 ++++++++++-- .../relational/MergeSortSemiJoinOperator.java | 228 +++++++ .../relational/aggregation/AccumulatorFactory.java | 265 +++++++- .../relational/aggregation/AggregationMask.java | 204 +++++++ .../relational/aggregation/AvgAccumulator.java | 122 +++- .../relational/aggregation/CountAccumulator.java | 22 +- ...IfAccumulator.java => CountAllAccumulator.java} | 48 +- .../relational/aggregation/CountIfAccumulator.java | 22 +- .../relational/aggregation/ExtremeAccumulator.java | 94 ++- .../relational/aggregation/FirstAccumulator.java | 158 +++-- .../relational/aggregation/FirstByAccumulator.java | 164 +++-- .../aggregation/FirstByDescAccumulator.java | 132 +++- .../aggregation/FirstDescAccumulator.java | 126 +++- .../relational/aggregation/LastAccumulator.java | 140 ++++- .../relational/aggregation/LastByAccumulator.java | 146 ++++- .../aggregation/LastByDescAccumulator.java | 150 ++++- .../aggregation/LastDescAccumulator.java | 144 ++++- ...dAccumulator.java => MaskedRecordIterator.java} | 40 +- .../relational/aggregation/MaxAccumulator.java | 140 ++++- .../relational/aggregation/MinAccumulator.java | 140 ++++- .../relational/aggregation/SumAccumulator.java | 114 +++- .../relational/aggregation/TableAccumulator.java | 2 +- .../relational/aggregation/TableAggregator.java | 5 +- .../aggregation/TableMaxMinByBaseAccumulator.java | 146 +++-- .../aggregation/TableModeAccumulator.java | 192 ++++-- .../aggregation/TableVarianceAccumulator.java | 158 +++-- .../UserDefinedAggregateFunctionAccumulator.java | 35 +- .../aggregation/grouped/GroupedAccumulator.java | 4 +- .../aggregation/grouped/GroupedAggregator.java | 6 +- .../aggregation/grouped/GroupedAvgAccumulator.java | 119 +++- .../grouped/GroupedCountAccumulator.java | 22 +- .../grouped/GroupedCountIfAccumulator.java | 22 +- .../grouped/GroupedExtremeAccumulator.java | 95 ++- .../grouped/GroupedFirstAccumulator.java | 153 ++++- .../grouped/GroupedFirstByAccumulator.java | 191 ++++-- .../grouped/GroupedLastAccumulator.java | 133 +++- .../grouped/GroupedLastByAccumulator.java | 150 ++++- .../aggregation/grouped/GroupedMaxAccumulator.java | 141 ++++- .../grouped/GroupedMaxMinByBaseAccumulator.java | 147 ++++- .../aggregation/grouped/GroupedMinAccumulator.java | 141 ++++- .../grouped/GroupedModeAccumulator.java | 246 ++++++-- .../aggregation/grouped/GroupedSumAccumulator.java | 111 +++- .../GroupedUserDefinedAggregateAccumulator.java | 14 +- .../grouped/GroupedVarianceAccumulator.java | 159 +++-- .../aggregation/grouped/hash/GroupByHash.java | 2 + .../aggregation/grouped/hash/MarkDistinctHash.java | 90 +++ .../relational/ColumnTransformerBuilder.java | 20 +- .../iotdb/db/queryengine/plan/Coordinator.java | 8 + .../db/queryengine/plan/analyze/AnalyzeUtils.java | 9 +- .../queryengine/plan/analyze/AnalyzeVisitor.java | 30 +- .../plan/analyze/load/LoadTsFileAnalyzer.java | 27 +- .../analyze/load/LoadTsFileTableSchemaCache.java | 17 +- .../load/LoadTsFileToTableModelAnalyzer.java | 34 +- .../load/LoadTsFileToTreeModelAnalyzer.java | 22 +- .../analyze/load/LoadTsFileTreeSchemaCache.java | 3 +- .../load/TreeSchemaAutoCreatorAndVerifier.java | 30 +- .../execution/config/TableConfigTaskVisitor.java | 141 ++++- .../execution/config/TreeConfigTaskVisitor.java | 30 +- .../config/executor/ClusterConfigTaskExecutor.java | 159 ++++- .../config/executor/IConfigTaskExecutor.java | 19 +- .../config/metadata/DropPipePluginTask.java | 13 +- .../config/metadata/ShowPipePluginsTask.java | 20 +- .../ExtendRegionTask.java} | 19 +- .../metadata/{ => region}/MigrateRegionTask.java | 4 +- .../ReconstructRegionTask.java} | 19 +- .../RemoveRegionTask.java} | 19 +- .../config/metadata/relational/ShowDBTask.java | 4 +- .../metadata/relational/ShowTablesDetailsTask.java | 47 +- .../config/metadata/relational/ShowTablesTask.java | 35 +- .../execution/config/sys/pipe/AlterPipeTask.java | 8 +- .../execution/config/sys/pipe/DropPipeTask.java | 7 +- .../execution/config/sys/pipe/ShowPipeTask.java | 3 +- .../execution/config/sys/pipe/StartPipeTask.java | 7 +- .../execution/config/sys/pipe/StopPipeTask.java | 7 +- .../db/queryengine/plan/parser/ASTVisitor.java | 38 +- .../plan/planner/LogicalPlanVisitor.java | 7 +- .../plan/planner/TableOperatorGenerator.java | 95 ++- .../plan/planner/plan/node/PlanGraphPrinter.java | 12 + .../plan/planner/plan/node/PlanNodeType.java | 4 + .../plan/planner/plan/node/PlanVisitor.java | 5 + .../node/metadata/write/AlterTimeSeriesNode.java | 9 +- .../node/metadata/write/CreateTimeSeriesNode.java | 74 +-- .../plan/node/pipe/PipeEnrichedDeleteDataNode.java | 66 +- .../plan/node/pipe/PipeEnrichedInsertNode.java | 44 +- .../plan/node/pipe/PipeEnrichedWritePlanNode.java | 23 +- .../relational/analyzer/ExpressionAnalyzer.java | 37 +- .../plan/relational/analyzer/Scope.java | 8 +- .../relational/analyzer/StatementAnalyzer.java | 116 ++-- .../plan/relational/metadata/Metadata.java | 4 +- .../relational/metadata/TableMetadataImpl.java | 53 +- .../metadata/fetcher/TableDeviceSchemaFetcher.java | 14 +- .../fetcher/TableHeaderSchemaValidator.java | 11 +- .../plan/relational/planner/IrTypeAnalyzer.java | 15 +- .../plan/relational/planner/QueryPlanner.java | 2 +- .../plan/relational/planner/RelationPlanner.java | 91 +-- .../relational/planner/SimplePlanRewriter.java | 88 +++ .../plan/relational/planner/SubqueryPlanner.java | 84 ++- .../relational/planner/TableLogicalPlanner.java | 15 +- .../distribute/TableDistributedPlanGenerator.java | 24 + .../planner/iterative/rule/PruneApplyColumns.java | 138 +++++ .../iterative/rule/PruneApplyCorrelation.java | 70 +++ .../iterative/rule/PruneApplySourceColumns.java | 95 +++ .../rule/RemoveUnreferencedScalarApplyNodes.java | 42 ++ .../rule/RemoveUnreferencedScalarSubqueries.java | 70 +++ .../TransformFilteringSemiJoinToInnerJoin.java | 150 +++++ ...mUncorrelatedInPredicateSubqueryToSemiJoin.java | 95 +++ .../plan/relational/planner/node/Patterns.java | 7 +- .../plan/relational/planner/node/SemiJoinNode.java | 181 ++++++ .../node/schema/CreateOrUpdateTableDeviceNode.java | 14 +- .../DataNodeLocationSupplierFactory.java | 11 + .../optimizations/LogicalOptimizeFactory.java | 26 +- .../PushAggregationIntoTableScan.java | 4 + .../optimizations/PushPredicateIntoTableScan.java | 235 ++++++- ...mQuantifiedComparisonApplyToCorrelatedJoin.java | 341 +++++++++++ .../optimizations/UnaliasSymbolReferences.java | 29 + .../sql/ast/AbstractQueryDeviceWithCache.java | 2 + .../relational/sql/ast/AbstractTraverseDevice.java | 8 + .../plan/relational/sql/ast/AlterPipe.java | 22 +- .../plan/relational/sql/ast/AstVisitor.java | 14 +- .../sql/ast/{Update.java => CountStatement.java} | 59 +- .../relational/sql/ast/CreateOrUpdateDevice.java | 31 +- .../plan/relational/sql/ast/CreatePipe.java | 4 +- .../plan/relational/sql/ast/Delete.java | 28 +- .../plan/relational/sql/ast/DeleteDevice.java | 2 - .../plan/relational/sql/ast/DropColumn.java | 3 - .../plan/relational/sql/ast/DropPipe.java | 10 +- .../plan/relational/sql/ast/LoadTsFile.java | 9 + .../plan/relational/sql/ast/PipeEnriched.java | 8 +- .../plan/relational/sql/ast/ShowPipes.java | 10 +- .../plan/relational/sql/ast/StartPipe.java | 8 +- .../ast/{Statement.java => StartRepairData.java} | 12 +- .../plan/relational/sql/ast/Statement.java | 4 +- .../plan/relational/sql/ast/StopPipe.java | 8 +- .../ast/{Statement.java => StopRepairData.java} | 12 +- .../plan/relational/sql/ast/Update.java | 6 +- .../plan/relational/sql/parser/AstBuilder.java | 67 +- .../plan/relational/sql/rewrite/ShowRewrite.java | 51 +- .../plan/scheduler/load/LoadTsFileScheduler.java | 9 +- .../plan/statement/StatementVisitor.java | 18 +- .../plan/statement/crud/InsertBaseStatement.java | 22 +- .../plan/statement/crud/InsertRowStatement.java | 18 +- .../plan/statement/crud/LoadTsFileStatement.java | 9 + .../metadata/AlterTimeSeriesStatement.java | 7 +- .../metadata/pipe/AlterPipeStatement.java | 31 +- .../metadata/pipe/DropPipePluginStatement.java | 17 +- .../statement/metadata/pipe/DropPipeStatement.java | 23 +- .../metadata/pipe/ShowPipePluginsStatement.java | 23 +- .../metadata/pipe/ShowPipesStatement.java | 18 +- .../metadata/pipe/StartPipeStatement.java | 17 +- .../statement/metadata/pipe/StopPipeStatement.java | 17 +- .../ExtendRegionStatement.java} | 32 +- .../{ => region}/MigrateRegionStatement.java | 2 +- .../ReconstructRegionStatement.java} | 42 +- .../RemoveRegionStatement.java} | 32 +- .../plan/statement/pipe/PipeEnrichedStatement.java | 10 +- .../attribute/DeviceAttributeStore.java | 10 +- .../attribute/IDeviceAttributeStore.java | 6 +- .../schemaregion/impl/SchemaRegionMemoryImpl.java | 136 +++-- .../schemaregion/mtree/impl/mem/MemMTreeStore.java | 52 +- .../mtree/impl/mem/mnode/basic/BasicMNode.java | 22 +- .../impl/mem/snapshot/MemMTreeSnapshotUtil.java | 37 +- .../schemaengine/table/InformationSchemaUtils.java | 11 +- .../db/service/metrics/DataNodeMetricsHelper.java | 4 +- .../metrics/memory/ConsensusMemoryMetrics.java | 73 +++ .../metrics/memory/GlobalMemoryMetrics.java | 132 ++++ .../metrics/memory/QueryEngineMemoryMetrics.java | 179 ++++++ .../metrics/memory/SchemaEngineMemoryMetrics.java | 124 ++++ .../metrics/memory/StorageEngineMemoryMetrics.java | 210 +++++++ .../metrics/memory/StreamEngineMemoryMetrics.java | 73 +++ .../metrics/memory/ThresholdMemoryMetrics.java | 194 ------ .../db/storageengine/dataregion/DataRegion.java | 123 ++-- .../task/InsertionCrossSpaceCompactionTask.java | 3 - .../utils/CompactionTableSchemaCollector.java | 2 +- .../compaction/repair/RepairDataFileScanUtil.java | 30 +- .../schedule/CompactionScheduleContext.java | 21 + .../compaction/schedule/CompactionScheduler.java | 60 +- .../impl/RewriteCrossSpaceCompactionSelector.java | 11 +- .../dataregion/memtable/TsFileProcessor.java | 4 + .../dataregion/modification/ModificationFile.java | 89 ++- .../modification/PartitionLevelModFileManager.java | 5 +- .../dataregion/snapshot/SnapshotTaker.java | 2 +- .../dataregion/tsfile/TsFileResource.java | 10 +- .../tsfile/timeindex/ArrayDeviceTimeIndex.java | 20 + .../db/storageengine/load/LoadTsFileManager.java | 23 +- .../load/active/ActiveLoadDirScanner.java | 27 +- .../load/active/ActiveLoadTsFileLoader.java | 13 +- .../LoadConvertedInsertTabletStatement.java | 18 +- ...leStatementDataTypeConvertExecutionVisitor.java | 10 +- ...eeStatementDataTypeConvertExecutionVisitor.java | 3 +- .../converter/LoadTsFileDataTypeConverter.java | 78 ++- .../load/splitter/TsFileSplitter.java | 5 +- .../broker/SubscriptionPrefetchingQueue.java | 2 + .../db/subscription/event/SubscriptionEvent.java | 21 +- .../batch/SubscriptionPipeTabletEventBatch.java | 24 +- .../batch/SubscriptionPipeTsFileEventBatch.java | 11 +- .../db/tools/schema/SRStatementGenerator.java | 208 +++++-- .../tools/schema/SchemaRegionSnapshotParser.java | 32 +- .../iotdb/db/utils/constant/SqlConstant.java | 1 + .../iotdb/db/utils/datastructure/BinaryTVList.java | 6 +- .../db/utils/datastructure/BooleanTVList.java | 6 +- .../iotdb/db/utils/datastructure/DoubleTVList.java | 6 +- .../iotdb/db/utils/datastructure/FloatTVList.java | 6 +- .../iotdb/db/utils/datastructure/IntTVList.java | 6 +- .../iotdb/db/utils/datastructure/LongTVList.java | 6 +- .../connector/PipeDataNodeThriftRequestTest.java | 22 +- .../PipeStatementTablePatternParseVisitorTest.java | 57 ++ ... PipeStatementTreePatternParseVisitorTest.java} | 20 +- .../pipe/connector/PipeTabletEventSorterTest.java | 263 ++++++-- .../db/pipe/consensus/DeletionResourceTest.java | 39 +- .../pipe/event/PipeTabletInsertionEventTest.java | 25 +- .../PipePlanTablePatternParseVisitorTest.java | 129 ++++ ...va => PipePlanTreePatternParseVisitorTest.java} | 32 +- .../annotation/PipePluginAnnotationTest.java} | 23 +- .../resource/PipeTsFileResourceManagerTest.java | 2 +- .../metadata/write/MetadataWriteNodeSerDeTest.java | 2 +- .../plan/relational/analyzer/DistinctTest.java | 91 +++ .../plan/relational/planner/SubqueryTest.java | 310 ++++++++++ .../planner/assertions/PlanMatchPattern.java | 17 + .../planner/assertions/SemiJoinMatcher.java | 79 +++ .../storageengine/buffer/BloomFilterCacheTest.java | 5 +- .../storageengine/dataregion/DataRegionTest.java | 24 +- .../compaction/CompactionValidationTest.java | 7 +- .../InsertionCrossSpaceCompactionSelectorTest.java | 4 +- .../cross/InsertionCrossSpaceCompactionTest.java | 64 +- .../inner/InnerSpaceCompactionSelectorTest.java | 2 +- .../repair/RepairDataFileScanUtilTest.java | 2 +- .../TableModelFastCompactionPerformerTest.java | 13 +- ...TableModelReadChunkCompactionPerformerTest.java | 13 +- ...TableModelReadPointCompactionPerformerTest.java | 13 +- .../modification/ModificationFileTest.java | 90 ++- .../reader/chunk/MemAlignedChunkLoaderTest.java | 2 +- .../read/reader/chunk/MemChunkLoaderTest.java | 12 +- .../file/UnsealedTsFileRecoverPerformerTest.java | 4 +- .../iotdb/db/tools/TsFileAndModSettleToolTest.java | 2 +- .../iotdb/db/tools/TsFileSelfCheckToolTest.java | 6 +- .../iotdb/db/tools/TsFileSketchToolTest.java | 10 +- .../db/utils/SchemaRegionSnapshotParserTest.java | 200 ++++-- iotdb-core/node-commons/pom.xml | 4 + .../conf/iotdb-system.properties.template | 6 +- .../exception}/table/ColumnNotExistsException.java | 2 +- .../table/TableAlreadyExistsException.java | 2 +- .../exception}/table/TableNotExistsException.java | 2 +- .../commons/pipe/agent/plugin/PipePluginAgent.java | 17 +- .../builtin/connector/PlaceholderConnector.java | 4 + .../connector/donothing/DoNothingConnector.java | 4 + .../iotdb/airgap/IoTDBAirGapConnector.java | 4 + .../consensus/PipeConsensusAsyncConnector.java | 4 + .../iotdb/thrift/IoTDBLegacyPipeConnector.java | 2 + .../iotdb/thrift/IoTDBThriftAsyncConnector.java | 5 + .../iotdb/thrift/IoTDBThriftConnector.java | 4 + .../iotdb/thrift/IoTDBThriftSslConnector.java | 4 + .../iotdb/thrift/IoTDBThriftSyncConnector.java | 5 + .../builtin/connector/opcua/OpcUaConnector.java | 4 + .../connector/websocket/WebSocketConnector.java | 2 + .../connector/writeback/WriteBackConnector.java | 4 + .../extractor/donothing/DoNothingExtractor.java | 4 + .../builtin/extractor/iotdb/IoTDBExtractor.java | 4 + .../processor/donothing/DoNothingProcessor.java | 4 + .../throwing/ThrowingExceptionProcessor.java | 4 + .../agent/plugin/meta/PipePluginMetaKeeper.java | 47 +- .../commons/pipe/agent/task/PipeTaskAgent.java | 2 +- .../pipe/agent/task/meta/PipeMetaKeeper.java | 12 +- .../pipe/agent/task/meta/PipeStaticMeta.java | 14 + .../config/constant/PipeExtractorConstant.java | 3 + .../connector/client/IoTDBSyncClientManager.java | 28 +- .../payload/thrift/request/PipeRequestType.java | 4 +- .../thrift/request/PipeTransferFileSealReqV2.java | 13 +- .../connector/protocol/IoTDBAirGapConnector.java | 14 +- .../pipe/connector/protocol/IoTDBConnector.java | 10 + .../connector/protocol/IoTDBSslSyncConnector.java | 4 + .../options/PipeInclusionOptions.java | 43 +- .../pipe/datastructure/pattern/TablePattern.java | 29 +- .../pipe/datastructure/pattern/TreePattern.java | 25 +- .../{ => resource}/PersistentResource.java | 2 +- .../commons/pipe/datastructure/result/Result.java} | 37 +- .../pipe/datastructure/visibility/Visibility.java | 9 +- .../visibility/VisibilityTestUtils.java | 135 +++++ .../datastructure/visibility/VisibilityUtils.java | 133 ++++ .../commons/pipe/extractor/IoTDBExtractor.java | 103 +++- .../extractor/IoTDBNonDataRegionExtractor.java | 24 +- .../commons/pipe/receiver/IoTDBFileReceiver.java | 49 +- .../commons/pipe/receiver/IoTDBReceiverAgent.java | 7 +- .../snapshot/PipeSnapshotResourceManager.java | 9 +- .../apache/iotdb/commons/schema/node/IMNode.java | 18 +- .../commons/schema/node/visitor/MNodeVisitor.java | 6 +- .../iotdb/commons/service/metric/enums/Metric.java | 3 +- .../iotdb/commons/udf/access/RecordIterator.java | 9 +- .../TableBuiltinAggregationFunction.java | 2 + .../org/apache/iotdb/commons/utils/FileUtils.java | 49 ++ .../org/apache/iotdb/commons/utils/PathUtils.java | 2 +- .../apache/iotdb/commons/utils/RetryUtils.java} | 30 +- .../annotation/PipePluginAnnotationTest.java} | 23 +- .../db/relational/grammar/sql/RelationalSql.g4 | 11 +- .../thrift-commons/src/main/thrift/common.thrift | 3 +- .../src/main/thrift/confignode.thrift | 69 +++ pom.xml | 6 +- 608 files changed, 23175 insertions(+), 6095 deletions(-) diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/ExcludeColumnFunction.java index 2b3998365a5,00000000000..e01428e4736 mode 100644,000000..100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/ExcludeColumnFunction.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/ExcludeColumnFunction.java @@@ -1,105 -1,0 +1,103 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.function.table; + +import org.apache.iotdb.udf.api.exception.UDFException; +import org.apache.iotdb.udf.api.relational.TableFunction; +import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; +import org.apache.iotdb.udf.api.relational.table.argument.Argument; +import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; +import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument; +import org.apache.iotdb.udf.api.relational.table.argument.TableArgument; +import org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor; +import org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification; +import org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification; +import org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification; +import org.apache.iotdb.udf.api.type.Type; + +import com.google.common.collect.ImmutableList; +import org.apache.tsfile.block.column.ColumnBuilder; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class ExcludeColumnFunction implements TableFunction { + private final String TBL_PARAM = "DATA"; + private final String COL_PARAM = "EXCLUDE"; + + @Override + public List<ParameterSpecification> getArgumentsSpecification() { + return Arrays.asList( + TableParameterSpecification.builder().name(TBL_PARAM).rowSemantics().build(), + ScalarParameterSpecification.builder().name(COL_PARAM).type(Type.STRING).build()); + } + + @Override + public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws UDFException { + TableArgument tableArgument = (TableArgument) arguments.get(TBL_PARAM); + if (tableArgument == null) { + throw new UDFException("Table argument is missing"); + } + String excludeColumn = (String) ((ScalarArgument) arguments.get(COL_PARAM)).getValue(); + ImmutableList.Builder<Integer> requiredColumns = ImmutableList.builder(); + DescribedSchema.Builder schemaBuilder = DescribedSchema.builder(); + for (int i = 0; i < tableArgument.getFieldNames().size(); i++) { + Optional<String> fieldName = tableArgument.getFieldNames().get(i); + if (!fieldName.isPresent() || !fieldName.get().equalsIgnoreCase(excludeColumn)) { + requiredColumns.add(i); - schemaBuilder.addField( - fieldName, - tableArgument.getFieldTypes().get(i)); ++ schemaBuilder.addField(fieldName, tableArgument.getFieldTypes().get(i)); + } + } + return TableFunctionAnalysis.builder() + .properColumnSchema(schemaBuilder.build()) + .requiredColumns(TBL_PARAM, requiredColumns.build()) + .build(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) { + return new TableFunctionProcessorProvider() { + @Override + public TableFunctionDataProcessor getDataProcessor() { + return new TableFunctionDataProcessor() { + @Override + public void process(Record input, List<ColumnBuilder> columnBuilders) { + for (int i = 0; i < input.size(); i++) { + if (input.isNull(i)) { + columnBuilders.get(i).appendNull(); + } else { + columnBuilders.get(i).writeObject(input.getObject(i)); + } + } + } + + @Override + public void finish(List<ColumnBuilder> columnBuilders) { + // do nothing + } + }; + } + }; + } +} diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/HOPTableFunction.java index 6bf8df15f04,00000000000..4da85a53f12 mode 100644,000000..100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/HOPTableFunction.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/HOPTableFunction.java @@@ -1,176 -1,0 +1,177 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.function.table; + +import org.apache.iotdb.udf.api.exception.UDFException; +import org.apache.iotdb.udf.api.relational.TableFunction; +import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; +import org.apache.iotdb.udf.api.relational.table.argument.Argument; +import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; +import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument; +import org.apache.iotdb.udf.api.relational.table.argument.TableArgument; +import org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor; +import org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification; +import org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification; +import org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification; +import org.apache.iotdb.udf.api.type.Type; + +import org.apache.tsfile.block.column.ColumnBuilder; + +import java.util.Arrays; ++import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class HOPTableFunction implements TableFunction { + + private static final String DATA_PARAMETER_NAME = "DATA"; + private static final String TIMECOL_PARAMETER_NAME = "TIMECOL"; + private static final String SLIDE_PARAMETER_NAME = "SLIDE"; + private static final String SIZE_PARAMETER_NAME = "SIZE"; + private static final String START_PARAMETER_NAME = "START"; + + @Override + public List<ParameterSpecification> getArgumentsSpecification() { + return Arrays.asList( + TableParameterSpecification.builder() + .name(DATA_PARAMETER_NAME) + .passThroughColumns() + .keepWhenEmpty() + .build(), + ScalarParameterSpecification.builder() + .name(TIMECOL_PARAMETER_NAME) + .type(Type.STRING) + .build(), + ScalarParameterSpecification.builder().name(SLIDE_PARAMETER_NAME).type(Type.INT64).build(), + ScalarParameterSpecification.builder().name(SIZE_PARAMETER_NAME).type(Type.INT64).build(), + ScalarParameterSpecification.builder() + .name(START_PARAMETER_NAME) + .type(Type.TIMESTAMP) + .defaultValue(Long.MIN_VALUE) + .build()); + } + + private int findTimeColumnIndex(TableArgument tableArgument, String expectedFieldName) { + int requiredIndex = -1; + for (int i = 0; i < tableArgument.getFieldTypes().size(); i++) { + Optional<String> fieldName = tableArgument.getFieldNames().get(i); + if (fieldName.isPresent() && expectedFieldName.equalsIgnoreCase(fieldName.get())) { + requiredIndex = i; + break; + } + } + return requiredIndex; + } + + // TODO: ImmutableMap + @Override + public TableFunctionAnalysis analyze(Map<String, Argument> arguments) { + TableArgument tableArgument = (TableArgument) arguments.get(DATA_PARAMETER_NAME); + String expectedFieldName = + (String) ((ScalarArgument) arguments.get(TIMECOL_PARAMETER_NAME)).getValue(); + int requiredIndex = findTimeColumnIndex(tableArgument, expectedFieldName); + if (requiredIndex == -1) { + throw new UDFException("The required field is not found in the input table"); + } + DescribedSchema properColumnSchema = + new DescribedSchema.Builder() + .addField("window_start", Type.TIMESTAMP) + .addField("window_end", Type.TIMESTAMP) + .build(); + + // outputColumnSchema + return TableFunctionAnalysis.builder() + .properColumnSchema(properColumnSchema) + .requiredColumns( + DATA_PARAMETER_NAME, + IntStream.range(0, tableArgument.getFieldTypes().size()) + .boxed() + .collect(Collectors.toList())) + .build(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) { + TableArgument tableArgument = (TableArgument) arguments.get(DATA_PARAMETER_NAME); + String expectedFieldName = + (String) ((ScalarArgument) arguments.get(TIMECOL_PARAMETER_NAME)).getValue(); + int requiredIndex = findTimeColumnIndex(tableArgument, expectedFieldName); + + return new TableFunctionProcessorProvider() { + @Override + public TableFunctionDataProcessor getDataProcessor() { + return new HOPDataProcessor( + (Long) ((ScalarArgument) arguments.get(START_PARAMETER_NAME)).getValue(), + (Long) ((ScalarArgument) arguments.get(SLIDE_PARAMETER_NAME)).getValue(), + (Long) ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue(), + requiredIndex); + } + }; + } + + private static class HOPDataProcessor implements TableFunctionDataProcessor { + + private final long slide; + private final long size; + private final int timeColumnIndex; + private long curTime; + + public HOPDataProcessor(long startTime, long slide, long size, int timeColumnIndex) { + this.slide = slide; + this.size = size; + this.curTime = startTime; + this.timeColumnIndex = timeColumnIndex; + } + + @Override + public void process(Record input, List<ColumnBuilder> columnBuilders) { + long timeValue = input.getLong(timeColumnIndex); + if (curTime == Long.MIN_VALUE) { + curTime = timeValue; + } + if (curTime + size <= timeValue) { + // jump to appropriate window + long move = (timeValue - curTime - size) / slide + 1; + curTime += move * slide; + } + long slideTime = curTime; + while (slideTime <= timeValue && slideTime + size > timeValue) { + for (int i = 0; i < input.size(); i++) { + if (input.isNull(i)) { + columnBuilders.get(i + 2).appendNull(); + } else { + columnBuilders.get(i + 2).writeObject(input.getObject(i)); + } + } + columnBuilders.get(0).writeLong(slideTime); + columnBuilders.get(1).writeLong(slideTime + size); + slideTime += slide; + } + } + + @Override + public void finish(List<ColumnBuilder> columnBuilders) {} + } +} diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java index 95df45ae6b6,dbf74b711ca..3e0cabe0422 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java @@@ -74,18 -76,18 +76,22 @@@ public class GroupedUserDefinedAggregat } @Override - public void addInput(int[] groupIds, Column[] arguments) { + public void addInput(int[] groupIds, Column[] arguments, AggregationMask mask) { RecordIterator iterator = - new RecordIterator( - Arrays.asList(arguments), inputDataTypes, arguments[0].getPositionCount()); + mask.isSelectAll() + ? new RecordIterator( + Arrays.asList(arguments), inputDataTypes, arguments[0].getPositionCount()) + : new MaskedRecordIterator(Arrays.asList(arguments), inputDataTypes, mask); + int[] selectedPositions = mask.getSelectedPositions(); int index = 0; while (iterator.hasNext()) { - int groupId = groupIds[index++]; + int groupId = groupIds[selectedPositions[index]]; + index++; State state = getOrCreateState(groupId); + if (state == null) { + state = aggregateFunction.createState(); + stateArray.set(groupId, state); + } aggregateFunction.addInput(state, iterator.next()); } } diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index 01377a469ff,e94136cf74c..f28c338b6ac --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@@ -145,10 -145,9 +147,11 @@@ import org.apache.iotdb.db.queryengine. import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; + import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeAlignedDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeNonAlignedDeviceViewScanNode; @@@ -220,8 -217,8 +223,9 @@@ import java.util.function.BiFunction import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; + import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.Iterables.getOnlyElement; import static java.util.Objects.requireNonNull; import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.FIELD; import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TIME; diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java index e7c3c1de1bf,442d59f8be3..441b31b2c42 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java @@@ -72,7 -72,7 +72,8 @@@ import org.apache.iotdb.db.queryengine. import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode; + import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValueFillNode; @@@ -942,72 -935,18 +943,83 @@@ public class PlanGraphPrinter extends P } return render(node, boxValue, context); } + + @Override + public List<String> visitSemiJoin(SemiJoinNode node, GraphContext context) { + List<String> boxValue = new ArrayList<>(); + boxValue.add(String.format("SemiJoin-%s", node.getPlanNodeId().getId())); + boxValue.add(String.format("OutputSymbols: %s", node.getOutputSymbols())); + boxValue.add(String.format("SourceJoinSymbol: %s", node.getSourceJoinSymbol())); + boxValue.add( + String.format("FilteringSourceJoinSymbol: %s", node.getFilteringSourceJoinSymbol())); + return render(node, boxValue, context); + } + @Override + public List<String> visitTableFunctionProcessor( + TableFunctionProcessorNode node, GraphContext context) { + List<String> boxValue = new ArrayList<>(); + boxValue.add(String.format("TableFunctionProcessor-%s", node.getPlanNodeId().getId())); + boxValue.add(String.format("Function: %s", node.getName())); + boxValue.add(String.format("ProperOutputs: %s", node.getProperOutputs())); + boxValue.add(String.format("OutputSymbols: %s", node.getOutputSymbols())); + boxValue.add(String.format("RequiredSymbols: %s", node.getRequiredSymbols())); + if (node.isPruneWhenEmpty()) { + boxValue.add("Prune when empty"); + } else { + boxValue.add("Keep when empty"); + } + node.getDataOrganizationSpecification() + .ifPresent( + specification -> { + if (!specification.getPartitionBy().isEmpty()) { + boxValue.add( + "Partition by: [" + Joiner.on(", ").join(specification.getPartitionBy()) + "]"); + } + specification + .getOrderingScheme() + .ifPresent(orderingScheme -> boxValue.add("Order by: " + orderingScheme)); + }); + if (!node.getArguments().isEmpty()) { + node.getArguments().forEach((key, value) -> boxValue.add(formatArgument(key, value))); + } + return render(node, boxValue, context); + } + + private String formatArgument(String argumentName, Argument argument) { + if (argument instanceof ScalarArgument) { + return formatScalarArgument(argumentName, (ScalarArgument) argument); + } + if (argument instanceof DescriptorArgument) { + return formatDescriptorArgument(argumentName, (DescriptorArgument) argument); + } else { + return formatTableArgument(argumentName, (TableArgument) argument); + } + } + + private String formatScalarArgument(String argumentName, ScalarArgument argument) { + return format( + "%s => ScalarArgument{type=%s, value=%s}", + argumentName, argument.getType(), argument.getValue()); + } + + private String formatDescriptorArgument(String argumentName, DescriptorArgument argument) { + String descriptor; + if (argument.getDescriptor().isPresent()) { + descriptor = + argument.getDescriptor().get().getFields().stream() + .map(field -> field.getName() + field.getType().map(type -> " " + type).orElse("")) + .collect(joining(", ", "(", ")")); + } else { + descriptor = "NULL"; + } + return format("%s => DescriptorArgument{%s}", argumentName, descriptor); + } + + private String formatTableArgument(String argumentName, TableArgument argument) { + return format("%s => TableArgument", argumentName); + } + private String printRegion(TRegionReplicaSet regionReplicaSet) { return String.format( "Partition: %s", diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index 99db38ae149,d04c57e77fb..69e948a3fa0 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@@ -127,8 -127,7 +127,9 @@@ import org.apache.iotdb.db.queryengine. import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode; + import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeAlignedDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeDeviceViewScanNode; diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java index bbdb55815ac,65ae1c14835..d79fcca4476 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java @@@ -41,8 -36,8 +41,9 @@@ import org.apache.iotdb.db.queryengine. import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; + import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl; import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema; +import org.apache.iotdb.db.queryengine.plan.relational.planner.IrExpressionInterpreter; import org.apache.iotdb.db.queryengine.plan.relational.planner.PlannerContext; import org.apache.iotdb.db.queryengine.plan.relational.planner.ScopeAware; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java index 2a556889336,607b1c37135..55f36ec4e89 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java @@@ -52,12 -52,12 +55,13 @@@ import org.apache.iotdb.db.queryengine. import org.apache.iotdb.db.queryengine.plan.relational.type.TypeSignature; import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; import org.apache.iotdb.db.utils.constant.SqlConstant; - import org.apache.iotdb.udf.api.customizer.config.AggregateFunctionConfig; - import org.apache.iotdb.udf.api.customizer.config.ScalarFunctionConfig; - import org.apache.iotdb.udf.api.customizer.parameter.FunctionParameters; + import org.apache.iotdb.rpc.TSStatusCode; + import org.apache.iotdb.udf.api.customizer.analysis.AggregateFunctionAnalysis; + import org.apache.iotdb.udf.api.customizer.analysis.ScalarFunctionAnalysis; + import org.apache.iotdb.udf.api.customizer.parameter.FunctionArguments; import org.apache.iotdb.udf.api.relational.AggregateFunction; import org.apache.iotdb.udf.api.relational.ScalarFunction; +import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.common.type.BlobType; diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index c987c94268c,a90c07eaaf8..637c7abc7c0 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@@ -57,9 -57,9 +57,10 @@@ import org.apache.iotdb.db.queryengine. import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; + import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeAlignedDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeDeviceViewScanNode; diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java index 058bad7cb1b,2b414438983..c6fd7862bea --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java @@@ -46,9 -45,8 +46,10 @@@ import org.apache.iotdb.db.queryengine. import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; + import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValueFillNode; @@@ -735,114 -733,33 +736,142 @@@ public class UnaliasSymbolReferences im outputMapping); } + @Override + public PlanAndMappings visitSemiJoin(SemiJoinNode node, UnaliasContext context) { + // it is assumed that symbols are distinct between SemiJoin source and filtering source. Only + // symbols from outer correlation might be the exception + PlanAndMappings rewrittenSource = node.getSource().accept(this, context); + PlanAndMappings rewrittenFilteringSource = node.getFilteringSource().accept(this, context); + + Map<Symbol, Symbol> outputMapping = new HashMap<>(); + outputMapping.putAll(rewrittenSource.getMappings()); + outputMapping.putAll(rewrittenFilteringSource.getMappings()); + + SymbolMapper mapper = symbolMapper(outputMapping); + + Symbol newSourceJoinSymbol = mapper.map(node.getSourceJoinSymbol()); + Symbol newFilteringSourceJoinSymbol = mapper.map(node.getFilteringSourceJoinSymbol()); + Symbol newSemiJoinOutput = mapper.map(node.getSemiJoinOutput()); + + return new PlanAndMappings( + new SemiJoinNode( + node.getPlanNodeId(), + rewrittenSource.getRoot(), + rewrittenFilteringSource.getRoot(), + newSourceJoinSymbol, + newFilteringSourceJoinSymbol, + newSemiJoinOutput), + outputMapping); + } ++ + @Override + public PlanAndMappings visitTableFunction(TableFunctionNode node, UnaliasContext context) { + Map<Symbol, Symbol> mapping = new HashMap<>(context.getCorrelationMapping()); + SymbolMapper mapper = symbolMapper(mapping); + + List<Symbol> newProperOutputs = mapper.map(node.getProperOutputs()); + + ImmutableList.Builder<PlanNode> newSources = ImmutableList.builder(); + ImmutableList.Builder<TableFunctionNode.TableArgumentProperties> newTableArgumentProperties = + ImmutableList.builder(); + + for (int i = 0; i < node.getChildren().size(); i++) { + PlanAndMappings newSource = node.getChildren().get(i).accept(this, context); + newSources.add(newSource.getRoot()); + + SymbolMapper inputMapper = symbolMapper(new HashMap<>(newSource.getMappings())); + TableFunctionNode.TableArgumentProperties properties = + node.getTableArgumentProperties().get(i); + Optional<DataOrganizationSpecification> newSpecification = + properties.getDataOrganizationSpecification().map(inputMapper::mapAndDistinct); + TableFunctionNode.PassThroughSpecification newPassThroughSpecification = + new TableFunctionNode.PassThroughSpecification( + properties.getPassThroughSpecification().isDeclaredAsPassThrough(), + properties.getPassThroughSpecification().getColumns().stream() + .map( + column -> + new TableFunctionNode.PassThroughColumn( + inputMapper.map(column.getSymbol()), column.isPartitioningColumn())) + .collect(toImmutableList())); + newTableArgumentProperties.add( + new TableFunctionNode.TableArgumentProperties( + properties.getArgumentName(), + properties.isRowSemantics(), + properties.isPruneWhenEmpty(), + newPassThroughSpecification, + inputMapper.map(properties.getRequiredColumns()), + newSpecification)); + } + + return new PlanAndMappings( + new TableFunctionNode( + node.getPlanNodeId(), + node.getName(), + node.getArguments(), + newProperOutputs, + newSources.build(), + newTableArgumentProperties.build()), + mapping); + } + + @Override + public PlanAndMappings visitTableFunctionProcessor( + TableFunctionProcessorNode node, UnaliasContext context) { + if (node.getChildren().isEmpty()) { + Map<Symbol, Symbol> mapping = new HashMap<>(context.getCorrelationMapping()); + SymbolMapper mapper = symbolMapper(mapping); + return new PlanAndMappings( + new TableFunctionProcessorNode( + node.getPlanNodeId(), + node.getName(), + mapper.map(node.getProperOutputs()), + Optional.empty(), + node.isPruneWhenEmpty(), + ImmutableList.of(), + ImmutableList.of(), + Optional.empty(), + node.getArguments()), + mapping); + } + + PlanAndMappings rewrittenSource = node.getChild().accept(this, context); + Map<Symbol, Symbol> mapping = new HashMap<>(rewrittenSource.getMappings()); + SymbolMapper mapper = symbolMapper(mapping); + + List<TableFunctionNode.PassThroughSpecification> newPassThroughSpecification = + node.getPassThroughSpecifications().stream() + .map( + passThroughSpecification -> + new TableFunctionNode.PassThroughSpecification( + passThroughSpecification.isDeclaredAsPassThrough(), + passThroughSpecification.getColumns().stream() + .map( + column -> + new TableFunctionNode.PassThroughColumn( + mapper.map(column.getSymbol()), + column.isPartitioningColumn())) + .collect(toImmutableList()))) + .collect(Collectors.toList()); + List<List<Symbol>> newRequiredSymbols = + node.getRequiredSymbols().stream().map(mapper::map).collect(toImmutableList()); + + Optional<DataOrganizationSpecification> newSpecification = + node.getDataOrganizationSpecification().map(mapper::mapAndDistinct); + + TableFunctionProcessorNode rewrittenTableFunctionProcessor = + new TableFunctionProcessorNode( + node.getPlanNodeId(), + node.getName(), + mapper.map(node.getProperOutputs()), + Optional.of(rewrittenSource.getRoot()), + node.isPruneWhenEmpty(), + newPassThroughSpecification, + newRequiredSymbols, + newSpecification, + node.getArguments()); + + return new PlanAndMappings(rewrittenTableFunctionProcessor, mapping); + } } private static class UnaliasContext {
