This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch force_ci/alter_column_datatype in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3d768032f0226c0096866fb521de8c5dc79d2bb9 Merge: d07c81865bd 52da69813b0 Author: Tian Jiang <[email protected]> AuthorDate: Fri Jan 24 15:35:47 2025 +0800 Merge branch 'master' into force_ci/alter_column_datatype # Conflicts: # integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java # integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java # integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java # iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java # iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnSchema.java # pom.xml .github/workflows/cluster-it-1c1d.yml | 2 +- .github/workflows/cluster-it-1c1d1a.yml | 2 +- .github/workflows/cluster-it-1c3d.yml | 2 +- .github/workflows/compile-check.yml | 2 +- .github/workflows/daily-it.yml | 38 +- .github/workflows/daily-ut.yml | 2 +- .github/workflows/dependency-check.yml | 2 +- .github/workflows/pipe-it-2cluster.yml | 2 +- .github/workflows/sonar-codecov.yml | 2 +- .github/workflows/table-cluster-it-1c1d.yml | 2 +- .github/workflows/table-cluster-it-1c3d.yml | 2 +- .github/workflows/unit-test.yml | 2 +- .github/workflows/vulnerability-check.yml | 2 +- .mvn/develocity.xml | 3 +- .mvn/extensions.xml | 4 +- .../java/org/apache/iotdb/CountPointProcessor.java | 2 + .../iotdb/AlignedTimeseriesSessionExample.java | 67 +- .../org/apache/iotdb/SessionConcurrentExample.java | 23 - .../main/java/org/apache/iotdb/SessionExample.java | 107 +-- .../apache/iotdb/SubscriptionSessionExample.java | 35 +- .../TableModelSubscriptionSessionExample.java | 173 ++++ .../org/apache/iotdb/trigger/LoggerTrigger.java | 12 +- .../iotdb/trigger/StatisticsUpdaterTrigger.java | 4 +- integration-test/pom.xml | 2 + .../java/org/apache/iotdb/it/env/EnvFactory.java | 5 + .../iotdb/it/env/cluster/ClusterConstant.java | 2 + .../it/env/cluster/config/MppCommonConfig.java | 
12 + .../it/env/cluster/config/MppDataNodeConfig.java | 4 +- .../env/cluster/config/MppSharedCommonConfig.java | 14 + .../iotdb/it/env/cluster/env/AbstractEnv.java | 44 + .../iotdb/it/env/cluster/node/AINodeWrapper.java | 5 + .../it/env/cluster/node/AbstractNodeWrapper.java | 12 + .../it/env/cluster/node/ConfigNodeWrapper.java | 5 + .../iotdb/it/env/cluster/node/DataNodeWrapper.java | 11 + .../it/env/remote/config/RemoteCommonConfig.java | 10 + .../it/env/remote/config/RemoteDataNodeConfig.java | 2 +- .../iotdb/it/env/remote/env/RemoteServerEnv.java | 7 + .../org/apache/iotdb/it/utils/TsFileGenerator.java | 87 +- .../iotdb/it/utils/TsFileTableGenerator.java | 78 +- .../java/org/apache/iotdb/itbase/env/BaseEnv.java | 4 + .../org/apache/iotdb/itbase/env/CommonConfig.java | 4 + .../apache/iotdb/itbase/env/DataNodeConfig.java | 2 +- .../IoTDBPartitionTableAutoCleanTest.java | 135 ++++ ...ionMigrateDataNodeCrashITFrameworkForIoTV1.java | 2 +- ...ionMigrateDataNodeCrashITFrameworkForIoTV2.java | 2 +- ...oTDBRegionOperationReliabilityITFramework.java} | 174 ++-- .../IoTDBRegionGroupExpandAndShrinkForIoTV1IT.java | 157 ++++ .../commit/IoTDBRegionReconstructForIoTV1IT.java | 149 ++++ .../IoTDBRegionMigrateNormalITForIoTV2Batch.java | 4 +- .../IoTDBRegionMigrateOtherITForIoTV2Batch.java | 4 +- .../IoTDBRegionMigrateNormalITForIoTV2Stream.java | 4 +- .../IoTDBRegionMigrateOtherITForIoTV2Stream.java | 4 +- .../IoTDBRegionMigrateDataNodeCrashForIoTV1IT.java | 4 +- ...TDBRegionMigrateDataNodeCrashForIoTV2Batch.java | 4 +- ...DBRegionMigrateDataNodeCrashForIoTV2Stream.java | 4 +- .../IoTDBRegionMigrateClusterCrashIoTV1IT.java | 4 +- .../IoTDBRegionMigrateConfigNodeCrashIoTV1IT.java | 4 +- .../IoTDBRegionMigrateClusterCrashIoTV2Batch.java | 4 +- ...oTDBRegionMigrateConfigNodeCrashIoTV2Batch.java | 4 +- .../IoTDBRegionMigrateClusterCrashIoTV2Stream.java | 4 +- ...TDBRegionMigrateConfigNodeCrashIoTV2Stream.java | 4 +- .../IoTDBRemoveDataNodeITFramework.java | 113 ++- 
.../IoTDBRemoveDataNodeNormalIT.java | 25 +- .../confignode/it/removedatanode/SQLModel.java | 10 +- .../org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java | 2 +- .../iotdb/db/it/IoTDBLoadTsFileWithModIT.java | 172 ++++ .../it/IoTDBSyntaxConventionStringLiteralIT.java | 3 +- .../org/apache/iotdb/db/it/utils/TestUtils.java | 256 +++++- .../pipe/it/autocreate/IoTDBPipeAutoDropIT.java | 10 +- .../pipe/it/autocreate/IoTDBPipeClusterIT.java | 98 +++ .../pipe/it/autocreate/IoTDBPipeIdempotentIT.java | 93 +++ .../pipe/it/autocreate/IoTDBPipeNullValueIT.java | 28 +- .../pipe/it/manual/IoTDBPipeReqAutoSliceIT.java | 8 +- .../pipe/it/manual/IoTDBPipeTableManualIT.java | 293 +++++++ .../manual/IoTDBPipeTypeConversionISessionIT.java | 10 +- .../pipe/it/tablemodel/IoTDBPipeAutoDropIT.java | 10 +- .../pipe/it/tablemodel/IoTDBPipeClusterIT.java | 65 ++ .../pipe/it/tablemodel/IoTDBPipeDataSinkIT.java | 43 +- .../pipe/it/tablemodel/IoTDBPipeProtocolIT.java | 1 - .../pipe/it/tablemodel/IoTDBPipeSyntaxIT.java | 2 +- .../IoTDBPipeTypeConversionISessionIT.java | 22 +- .../iotdb/pipe/it/tablemodel/TableModelUtils.java | 53 +- .../relational/it/db/it/IoTDBDeletionTableIT.java | 282 ++++++- .../udf/IoTDBUserDefinedAggregateFunctionIT.java | 53 +- .../scalar/IoTDBFormatFunctionTableIT.java | 18 +- .../IoTDBTableAggregationFunctionDistinctIT.java | 374 +++++++++ .../it/query/recent/IoTDBTableAggregationIT.java | 893 ++++++++++++++++++++- .../IoTDBUncorrelatedQuantifiedComparisonIT.java | 674 ++++++++++++++++ .../it/rest/it/IoTDBRestServiceCaseWhenThenIT.java | 7 +- .../it/rest/it/IoTDBRestServiceFlushQueryIT.java | 6 +- .../relational/it/rest/it/IoTDBRestServiceIT.java | 3 +- ...IT.java => IoTDBRestServiceInsertValuesIT.java} | 20 +- .../it/schema/IoTDBAlterColumnTypeIT.java | 2 +- .../iotdb/session/it/IoTDBSessionComplexIT.java | 8 +- .../iotdb/session/it/IoTDBSessionSimpleIT.java | 42 +- .../session/it/IoTDBSessionSyntaxConventionIT.java | 18 +- .../org/apache/iotdb/session/it/SessionIT.java | 
32 +- .../subscription/it/AbstractSubscriptionIT.java | 2 +- .../it/cluster/IoTDBSubscriptionRestartIT.java | 34 +- .../it/dual/IoTDBSubscriptionConsumerGroupIT.java | 16 +- .../it/dual/IoTDBSubscriptionTimePrecisionIT.java | 10 +- .../it/dual/IoTDBSubscriptionTopicIT.java | 44 +- .../it/local/IoTDBSubscriptionBasicIT.java | 42 +- .../it/local/IoTDBSubscriptionDataTypeIT.java | 18 +- .../it/local/IoTDBSubscriptionIdempotentIT.java | 24 +- .../it/local/IoTDBSubscriptionTopicIT.java | 10 +- .../it/triple/AbstractSubscriptionTripleIT.java | 2 +- .../it/triple/IoTDBSubscriptionSharingIT.java | 32 +- .../AbstractSubscriptionRegressionIT.java | 26 +- .../IoTDBDefaultPullConsumerDataSetIT.java | 4 +- .../IoTDBDefaultTsfilePushConsumerIT.java | 6 +- .../IoTDBRootDatasetPushConsumerIT.java | 6 +- .../IoTDBRootPullConsumeTsfileIT.java | 6 +- ...merPullConsumerWith1TopicShareProcessMixIT.java | 12 +- .../param/IoTDBTestParamPullConsumerIT.java | 93 ++- .../param/IoTDBTestParamPushConsumerIT.java | 47 +- .../param/IoTDBTestParamSubscriptionSessionIT.java | 12 +- .../regression/param/IoTDBTestParamTopicIT.java | 6 +- ...DBTestAutoCommitFalseDataSetPullConsumerIT.java | 6 +- ...TDBTestAutoCommitTrueDataSetPullConsumerIT.java | 6 +- .../format/IoTDBDBDataSetPullConsumerIT.java | 6 +- .../format/IoTDBDBTsfilePullConsumerIT.java | 6 +- .../IoTDBAllTsDatasetPullConsumerIT.java | 4 +- .../IoTDBAllTsTsfilePullConsumerIT.java | 6 +- .../IoTDBAllTsfilePullConsumerSnapshotIT.java | 6 +- .../IoTDBPathDeviceDataSetPullConsumerIT.java | 4 +- .../IoTDBPathDeviceTsfilePullConsumerIT.java | 6 +- .../IoTDBTimeTsDatasetPullConsumerIT.java | 4 +- .../IoTDBTimeTsTsfilePullConsumerIT.java | 6 +- .../IoTDBSnapshotDevicePullConsumerDataSetIT.java | 4 +- .../IoTDBSnapshotDevicePullConsumerTsfileIT.java | 6 +- ...DBConsumer2With1TopicShareProcessDataSetIT.java | 6 +- ...TDBConsumer2With1TopicShareProcessTsfileIT.java | 6 +- .../multi/IoTDBMultiGroupVsMultiConsumerIT.java | 12 +- 
.../IoTDBOneConsumerMultiTopicsDatasetIT.java | 4 +- .../multi/IoTDBOneConsumerMultiTopicsMixIT.java | 4 +- .../multi/IoTDBOneConsumerMultiTopicsTsfileIT.java | 4 +- .../pattern/IoTDBDBPatternPullConsumeTsfileIT.java | 6 +- .../IoTDBDBPatternPullConsumerDataSetIT.java | 4 +- .../IoTDBDefaultPatternPullConsumerDataSetIT.java | 4 +- .../IoTDBDevicePatternPullConsumeTsfileIT.java | 6 +- .../IoTDBDevicePatternPullConsumerDataSetIT.java | 4 +- ...DBMiddleMatch2PatternPullConsumerDataSetIT.java | 4 +- ...IoTDBMiddleMatchPatternPullConsumeTsfileIT.java | 6 +- ...TDBMiddleMatchPatternPullConsumerDataSetIT.java | 4 +- .../IoTDBRootPatternPullConsumeTsfileIT.java | 6 +- .../pattern/IoTDBTSPatternPullConsumeTsfileIT.java | 6 +- .../IoTDBTSPatternPullConsumerDataSetIT.java | 4 +- .../time/IoTDBAllPullConsumerDataSetIT.java | 4 +- .../time/IoTDBHistoryPullConsumerDataSetIT.java | 4 +- .../time/IoTDBRealTimePullConsumerDataSetIT.java | 4 +- ...oTDBTimeRangeAccuratePullConsumerDataSetIT.java | 4 +- .../time/IoTDBTimeRangePullConsumerDataSetIT.java | 4 +- .../format/IoTDBTestPushConsumeDataSetIT.java | 6 +- .../IoTDBTestPushConsumeNoTargetDirTsfileIT.java | 6 +- .../format/IoTDBTestPushConsumeTsfileIT.java | 6 +- .../IoTDBLooseAllTsDatasetPushConsumerIT.java | 6 +- ...TDBLooseAllTsDatasetPushConsumerSnapshotIT.java | 6 +- .../IoTDBLooseAllTsfilePushConsumerIT.java | 6 +- .../IoTDBPathLooseDeviceTsfilePushConsumerIT.java | 6 +- .../IoTDBPathLooseTsDatasetPushConsumerIT.java | 6 +- .../IoTDBPathLooseTsfilePushConsumerIT.java | 6 +- .../IoTDBPathTsLooseDatasetPushConsumerIT.java | 6 +- .../IoTDBTimeLooseTsDatasetPushConsumerIT.java | 6 +- .../IoTDBTimeLooseTsTsfilePushConsumerIT.java | 6 +- .../IoTDBTimeLooseTsfilePushConsumerIT.java | 6 +- .../IoTDBTimeTsLooseDatasetPushConsumerIT.java | 6 +- ...oTDBSnapshotTSPatternDatasetPushConsumerIT.java | 6 +- ...IoTDBSnapshotTSPatternTsfilePushConsumerIT.java | 6 +- ...DBConsumer2With1TopicShareProcessDataSetIT.java | 10 +- 
...TDBConsumer2With1TopicShareProcessTsfileIT.java | 10 +- .../multi/IoTDBMultiGroupVsMultiConsumerIT.java | 26 +- .../IoTDBOneConsumerMultiTopicsDatasetIT.java | 6 +- .../multi/IoTDBOneConsumerMultiTopicsMixIT.java | 6 +- .../multi/IoTDBOneConsumerMultiTopicsTsfileIT.java | 6 +- .../IoTDBDBPatternDatasetPushConsumerIT.java | 6 +- .../IoTDBDBPatternTsfilePushConsumerIT.java | 6 +- .../IoTDBDefaultPatternTsfilePushConsumerIT.java | 6 +- .../IoTDBDevicePatternDatasetPushConsumerIT.java | 6 +- .../IoTDBDevicePatternTsfilePushConsumerIT.java | 6 +- .../IoTDBRootPatternDatasetPushConsumerIT.java | 6 +- .../IoTDBTSPatternDatasetPushConsumerIT.java | 6 +- .../IoTDBTSPatternTsfilePushConsumerIT.java | 6 +- .../IoTDBHistoryRootDatasetPushConsumerIT.java | 6 +- .../time/IoTDBHistoryRootTsFilePushConsumerIT.java | 6 +- .../time/IoTDBRealTimeDBDatasetPushConsumerIT.java | 6 +- .../time/IoTDBRealTimeDBTsfilePushConsumerIT.java | 6 +- ...DBTimeRangeAccurateDBDataSetPushConsumerIT.java | 6 +- .../IoTDBTimeRangeDBDataSetPushConsumerIT.java | 6 +- .../time/IoTDBTimeRangeDBTsfilePushConsumerIT.java | 6 +- .../topic/IoTDBDataSet1TopicConsumerSpecialIT.java | 4 +- .../regression/topic/IoTDBTestTopicNameIT.java | 4 +- .../regression/user/IoTDBOtherUserConsumerIT.java | 6 +- .../java/org/apache/iotdb/util/MagicUtils.java | 61 ++ .../java/org/apache/iotdb/pipe/api/PipePlugin.java | 37 + .../iotdb/pipe/api/annotation/TableModel.java | 23 +- .../iotdb/pipe/api/annotation/TreeModel.java | 23 +- .../parameter/PipeParameterValidator.java | 42 +- .../org/apache/iotdb/tool/tsfile/ExportTsFile.java | 3 - .../java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java | 1 + .../java/org/apache/iotdb/rpc/TSStatusCode.java | 3 + .../iotdb/rpc/subscription/config/TopicConfig.java | 22 + .../subscription/payload/poll/TabletsPayload.java | 11 +- .../java/org/apache/iotdb/session/Session.java | 27 +- .../subscription/AbstractSubscriptionSession.java | 219 +++++ .../subscription/ISubscriptionTableSession.java | 177 
++++ .../subscription/ISubscriptionTreeSession.java | 177 ++++ .../session/subscription/SubscriptionSession.java | 337 -------- .../SubscriptionSessionConnection.java | 26 +- .../subscription/SubscriptionSessionWrapper.java | 72 ++ .../subscription/SubscriptionTableSession.java | 113 +++ .../SubscriptionTableSessionBuilder.java | 66 ++ .../subscription/SubscriptionTreeSession.java | 180 +++++ .../SubscriptionTreeSessionBuilder.java | 63 ++ .../consumer/ISubscriptionTablePullConsumer.java | 181 +++++ .../consumer/ISubscriptionTablePushConsumer.java | 86 ++ .../consumer/ISubscriptionTreePullConsumer.java | 181 +++++ .../consumer/ISubscriptionTreePushConsumer.java | 86 ++ .../AbstractSubscriptionConsumer.java} | 170 +--- .../base/AbstractSubscriptionConsumerBuilder.java | 150 ++++ .../AbstractSubscriptionProvider.java} | 47 +- .../AbstractSubscriptionProviders.java} | 58 +- .../AbstractSubscriptionPullConsumer.java} | 155 +--- .../AbstractSubscriptionPullConsumerBuilder.java | 122 +++ .../AbstractSubscriptionPushConsumer.java} | 147 +--- .../AbstractSubscriptionPushConsumerBuilder.java | 139 ++++ .../SubscriptionExecutorServiceManager.java | 2 +- .../consumer/table/SubscriptionTableProvider.java | 49 ++ .../table/SubscriptionTablePullConsumer.java | 152 ++++ .../SubscriptionTablePullConsumerBuilder.java | 125 +++ .../table/SubscriptionTablePushConsumer.java | 95 +++ .../SubscriptionTablePushConsumerBuilder.java | 138 ++++ .../consumer/tree/SubscriptionTreeProvider.java | 49 ++ .../tree/SubscriptionTreePullConsumer.java | 317 ++++++++ .../tree/SubscriptionTreePullConsumerBuilder.java | 124 +++ .../tree/SubscriptionTreePushConsumer.java | 290 +++++++ .../tree/SubscriptionTreePushConsumerBuilder.java | 139 ++++ .../session/subscription/model/Subscription.java | 3 +- .../iotdb/session/subscription/model/Topic.java | 2 +- .../payload/SubscriptionSessionDataSet.java | 12 +- .../apache/iotdb/session/util/SessionUtils.java | 25 +- 
.../java/org/apache/iotdb/session/TabletTest.java | 28 +- .../iotdb/session/util/SessionUtilsTest.java | 16 +- .../org/apache/iotdb/db/qp/sql/IdentifierParser.g4 | 1 + .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 21 +- .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 12 + .../iotdb/confignode/conf/ConfigNodeConfig.java | 9 + .../consensus/request/ConfigPhysicalPlan.java | 14 +- .../consensus/request/ConfigPhysicalPlanType.java | 3 + .../request/ConfigPhysicalPlanVisitor.java | 110 ++- .../partition/AutoCleanPartitionTablePlan.java | 99 +++ .../write/pipe/payload/PipeCreateTablePlan.java} | 27 +- .../pipe/payload/PipeDeactivateTemplatePlan.java | 22 +- .../write/pipe/payload/PipeDeleteDevicesPlan.java | 130 +++ .../request/write/table/PreCreateTablePlan.java | 11 +- .../response/pipe/plugin/PipePluginTableResp.java | 27 +- .../response/pipe/task/PipeTableResp.java | 2 +- .../iotdb/confignode/manager/ClusterManager.java | 2 +- .../iotdb/confignode/manager/ConfigManager.java | 38 +- .../apache/iotdb/confignode/manager/IManager.java | 13 + .../iotdb/confignode/manager/ProcedureManager.java | 500 +++++++++--- .../iotdb/confignode/manager/TTLManager.java | 11 + .../confignode/manager/load/cache/LoadCache.java | 13 +- .../load/cache/region/RegionGroupCache.java | 39 +- .../manager/partition/PartitionManager.java | 13 +- .../manager/partition/RegionGroupStatus.java | 25 +- .../payload/PipeTransferConfigSnapshotSealReq.java | 32 +- .../protocol/IoTDBConfigRegionAirGapConnector.java | 8 + .../protocol/IoTDBConfigRegionConnector.java | 10 +- .../coordinator/plugin/PipePluginCoordinator.java | 29 + .../pipe/event/PipeConfigRegionSnapshotEvent.java | 3 +- .../extractor/ConfigRegionListeningFilter.java | 89 +- .../pipe/extractor/ConfigRegionListeningQueue.java | 23 +- .../pipe/extractor/IoTDBConfigRegionExtractor.java | 74 +- ...ConfigPhysicalPlanTablePatternParseVisitor.java | 135 ++++ ...ConfigPhysicalPlanTreePatternParseVisitor.java} | 6 +- 
.../receiver/protocol/IoTDBConfigNodeReceiver.java | 170 +++- .../PipeConfigPhysicalPlanTSStatusVisitor.java | 76 ++ .../manager/schema/ClusterSchemaManager.java | 29 +- .../iotdb/confignode/persistence/TTLInfo.java | 16 + .../persistence/executor/ConfigPlanExecutor.java | 4 + .../partition/DatabasePartitionTable.java | 10 + .../persistence/partition/PartitionInfo.java | 19 + .../confignode/persistence/pipe/PipeInfo.java | 2 +- .../persistence/pipe/PipePluginInfo.java | 39 +- .../confignode/persistence/pipe/PipeTaskInfo.java | 7 +- .../schema/CNPhysicalPlanGenerator.java | 77 +- .../persistence/schema/ClusterSchemaInfo.java | 6 +- .../confignode/persistence/schema/ConfigMTree.java | 5 +- .../schema/ConfignodeSnapshotParser.java | 30 +- .../confignode/procedure/InternalProcedure.java | 5 - .../procedure/PartitionTableAutoCleaner.java | 81 ++ .../iotdb/confignode/procedure/Procedure.java | 78 -- .../confignode/procedure/ProcedureExecutor.java | 115 +-- .../procedure/env/ConfigNodeProcedureEnv.java | 23 +- .../procedure/env/RegionMaintainHandler.java | 13 +- .../procedure/impl/StateMachineProcedure.java | 181 ++--- .../impl/pipe/task/CreatePipeProcedureV2.java | 42 +- .../impl/region/AddRegionPeerProcedure.java | 84 +- .../impl/region/ReconstructRegionProcedure.java | 209 +++++ .../impl/region/RegionMigrateProcedure.java | 39 +- .../impl/region/RegionOperationProcedure.java | 38 +- .../impl/region/RemoveRegionPeerProcedure.java | 52 +- .../impl/schema/DeleteDatabaseProcedure.java | 2 +- .../impl/schema/DeleteTimeSeriesProcedure.java | 4 +- .../table/AbstractAlterOrDropTableProcedure.java | 10 +- .../impl/schema/table/AddTableColumnProcedure.java | 16 +- .../table/AlterTableColumnDataTypeProcedure.java | 9 +- .../impl/schema/table/CreateTableProcedure.java | 20 +- .../impl/schema/table/DeleteDevicesProcedure.java | 43 +- .../schema/table/DropTableColumnProcedure.java | 21 +- .../impl/schema/table/DropTableProcedure.java | 24 +- 
.../schema/table/RenameTableColumnProcedure.java | 18 +- .../schema/table/SetTablePropertiesProcedure.java | 18 +- .../procedure/impl/sync/StartPipeProcedure.java | 6 - .../impl/trigger/CreateTriggerProcedure.java | 9 +- .../procedure/state/ReconstructRegionState.java | 10 +- .../procedure/store/ProcedureFactory.java | 42 +- .../confignode/procedure/store/ProcedureType.java | 12 +- .../thrift/ConfigNodeRPCServiceProcessor.java | 24 + .../request/ConfigPhysicalPlanSerDeTest.java | 59 ++ .../response/pipe/PipePluginTableRespTest.java | 4 +- .../confignode/manager/load/LoadManagerTest.java | 12 +- .../manager/load/cache/RegionGroupCacheTest.java | 132 +-- .../connector/PipeConfigNodeThriftRequestTest.java | 12 +- ...igPhysicalPlanTablePatternParseVisitorTest.java | 133 +++ ...igPhysicalPlanTreePatternParseVisitorTest.java} | 44 +- .../pipe/annotation/PipePluginAnnotationTest.java} | 30 +- .../confignode/procedure/entity/IncProcedure.java | 5 - .../confignode/procedure/entity/NoopProcedure.java | 5 - .../procedure/entity/SimpleLockProcedure.java | 5 - .../procedure/entity/SleepProcedure.java | 5 - .../procedure/entity/StuckProcedure.java | 5 - .../pipe/receiver/PipeEnrichedProcedureTest.java | 194 +++++ .../schema/table/AddTableColumnProcedureTest.java | 8 +- .../schema/table/CreateTableProcedureTest.java | 5 +- .../schema/table/DeleteDevicesProcedureTest.java | 5 +- .../schema/table/DropTableColumnProcedureTest.java | 4 +- .../impl/schema/table/DropTableProcedureTest.java | 4 +- .../table/RenameTableColumnProcedureTest.java | 4 +- .../table/SetTablePropertiesProcedureTest.java | 6 +- .../apache/iotdb/consensus/ratis/RatisClient.java | 6 +- .../apache/iotdb/consensus/ratis/utils/Utils.java | 1 + .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 - .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 20 +- .../dataregion/DataExecutionVisitor.java | 9 +- .../dataregion/DataRegionStateMachine.java | 144 +--- .../schemaregion/SchemaRegionStateMachine.java | 15 +- 
.../pipe/agent/plugin/PipeDataNodePluginAgent.java | 20 +- .../dataregion/PipeDataRegionPluginAgent.java | 49 ++ .../agent/task/connection/PipeEventCollector.java | 35 +- .../client/IoTDBDataNodeAsyncClientManager.java | 2 +- .../PipeTransferDataNodeHandshakeV1Req.java | 10 +- .../evolvable/request/PipeTransferPlanNodeReq.java | 18 +- .../request/PipeTransferSchemaSnapshotSealReq.java | 94 ++- .../request/PipeTransferTabletInsertNodeReqV2.java | 2 +- .../request/PipeTransferTsFileSealWithModReq.java | 42 +- .../airgap/IoTDBDataNodeAirGapConnector.java | 4 + .../airgap/IoTDBDataRegionAirGapConnector.java | 4 + .../airgap/IoTDBSchemaRegionAirGapConnector.java | 19 +- .../protocol/legacy/IoTDBLegacyPipeConnector.java | 2 + .../connector/protocol/opcua/OpcUaConnector.java | 4 + .../connector/protocol/opcua/OpcUaNameSpace.java | 36 +- .../pipeconsensus/PipeConsensusAsyncConnector.java | 4 + .../pipeconsensus/PipeConsensusSyncConnector.java | 4 + .../async/IoTDBDataRegionAsyncConnector.java | 4 + .../thrift/sync/IoTDBDataNodeSyncConnector.java | 4 + .../thrift/sync/IoTDBDataRegionSyncConnector.java | 4 + .../thrift/sync/IoTDBSchemaRegionConnector.java | 26 +- .../protocol/websocket/WebSocketConnector.java | 2 + .../protocol/writeback/WriteBackConnector.java | 199 ++++- .../util/builder/PipeTableModeTsFileBuilder.java | 6 +- .../util/builder/PipeTreeModelTsFileBuilder.java | 4 +- .../sorter/PipeTableModelTabletEventSorter.java | 54 +- .../sorter/PipeTreeModelTabletEventSorter.java | 24 +- .../pipe/consensus/deletion/DeletionResource.java | 2 +- .../db/pipe/event/common/PipeInsertionEvent.java | 15 +- .../common/deletion/PipeDeleteDataNodeEvent.java | 24 +- .../db/pipe/event/common/row/PipeRowCollector.java | 2 +- .../schema/PipeSchemaRegionSnapshotEvent.java | 75 +- .../schema/PipeSchemaSerializableEventType.java | 15 +- .../common/tablet/PipeRawTabletInsertionEvent.java | 2 +- .../tablet/parser/TabletInsertionEventParser.java | 8 +- 
.../TabletInsertionEventTreePatternParser.java | 13 +- .../scan/TsFileInsertionEventScanParser.java | 61 +- .../table/TsFileInsertionEventTableParser.java | 2 +- .../dataregion/DataRegionListeningFilter.java | 21 +- .../dataregion/IoTDBDataRegionExtractor.java | 136 ++-- ...oricalDataRegionTsFileAndDeletionExtractor.java | 4 +- .../listener/PipeInsertionDataNodeListener.java | 27 +- .../schemaregion/IoTDBSchemaRegionExtractor.java | 28 +- .../PipePlanTablePatternParseVisitor.java | 77 ++ ...r.java => PipePlanTreePatternParseVisitor.java} | 7 +- .../schemaregion/SchemaRegionListeningFilter.java | 87 +- .../schemaregion/SchemaRegionListeningQueue.java | 8 +- .../pipe/metric/PipeDataNodeReceiverMetrics.java | 48 +- .../processor/aggregate/AggregateProcessor.java | 2 + .../StandardStatisticsOperatorProcessor.java | 2 + .../processor/TumblingWindowingProcessor.java | 2 + .../changing/ChangingValueSamplingProcessor.java | 2 + .../sdt/SwingingDoorTrendingSamplingProcessor.java | 2 + .../tumbling/TumblingTimeSamplingProcessor.java | 2 + .../pipeconsensus/PipeConsensusProcessor.java | 4 + .../schemachange/RenameDatabaseProcessor.java | 11 +- .../twostage/plugin/TwoStageCountProcessor.java | 37 +- .../pipeconsensus/PipeConsensusReceiver.java | 2 + .../protocol/thrift/IoTDBDataNodeReceiver.java | 216 +++-- .../visitor/PipePlanToStatementVisitor.java | 54 +- .../PipeStatementTablePatternParseVisitor.java | 32 +- ...a => PipeStatementTreePatternParseVisitor.java} | 6 +- ...r.java => PipeTreeStatementToBatchVisitor.java} | 2 +- .../pipe/resource/memory/PipeMemoryWeightUtil.java | 69 +- .../iotdb/db/protocol/client/ConfigNodeClient.java | 29 + .../iotdb/db/protocol/session/ClientSession.java | 20 +- .../iotdb/db/protocol/session/IClientSession.java | 1 + .../db/protocol/session/InternalClientSession.java | 5 +- .../protocol/thrift/impl/ClientRPCServiceImpl.java | 40 +- .../impl/DataNodeInternalRPCServiceImpl.java | 3 +- .../thrift/impl/DataNodeRegionManager.java | 4 +- 
.../execution/executor/RegionWriteExecutor.java | 66 +- .../source/relational/MarkDistinctOperator.java | 170 ++++ .../relational/aggregation/AccumulatorFactory.java | 247 +++++- .../relational/aggregation/AggregationMask.java | 204 +++++ .../relational/aggregation/AvgAccumulator.java | 122 ++- .../relational/aggregation/CountAccumulator.java | 22 +- ...IfAccumulator.java => CountAllAccumulator.java} | 48 +- .../relational/aggregation/CountIfAccumulator.java | 22 +- .../relational/aggregation/ExtremeAccumulator.java | 94 ++- .../relational/aggregation/FirstAccumulator.java | 158 +++- .../relational/aggregation/FirstByAccumulator.java | 164 +++- .../aggregation/FirstByDescAccumulator.java | 132 ++- .../aggregation/FirstDescAccumulator.java | 126 ++- .../relational/aggregation/LastAccumulator.java | 140 +++- .../relational/aggregation/LastByAccumulator.java | 146 +++- .../aggregation/LastByDescAccumulator.java | 150 +++- .../aggregation/LastDescAccumulator.java | 144 +++- ...dAccumulator.java => MaskedRecordIterator.java} | 40 +- .../relational/aggregation/MaxAccumulator.java | 140 +++- .../relational/aggregation/MinAccumulator.java | 140 +++- .../relational/aggregation/SumAccumulator.java | 114 ++- .../relational/aggregation/TableAccumulator.java | 2 +- .../relational/aggregation/TableAggregator.java | 9 +- .../aggregation/TableMaxMinByBaseAccumulator.java | 146 +++- .../aggregation/TableModeAccumulator.java | 192 +++-- .../aggregation/TableVarianceAccumulator.java | 158 +++- .../UserDefinedAggregateFunctionAccumulator.java | 8 +- .../aggregation/grouped/GroupedAccumulator.java | 4 +- .../aggregation/grouped/GroupedAggregator.java | 6 +- .../aggregation/grouped/GroupedAvgAccumulator.java | 119 ++- .../grouped/GroupedCountAccumulator.java | 22 +- .../grouped/GroupedCountIfAccumulator.java | 22 +- .../grouped/GroupedExtremeAccumulator.java | 95 ++- .../grouped/GroupedFirstAccumulator.java | 153 +++- .../grouped/GroupedFirstByAccumulator.java | 191 +++-- 
.../grouped/GroupedLastAccumulator.java | 133 ++- .../grouped/GroupedLastByAccumulator.java | 150 +++- .../aggregation/grouped/GroupedMaxAccumulator.java | 141 +++- .../grouped/GroupedMaxMinByBaseAccumulator.java | 147 +++- .../aggregation/grouped/GroupedMinAccumulator.java | 141 +++- .../grouped/GroupedModeAccumulator.java | 246 ++++-- .../aggregation/grouped/GroupedSumAccumulator.java | 111 ++- .../GroupedUserDefinedAggregateAccumulator.java | 14 +- .../grouped/GroupedVarianceAccumulator.java | 159 +++- .../aggregation/grouped/hash/GroupByHash.java | 2 + .../aggregation/grouped/hash/MarkDistinctHash.java | 90 +++ .../relational/ColumnTransformerBuilder.java | 6 +- .../iotdb/db/queryengine/plan/Coordinator.java | 8 +- .../db/queryengine/plan/analyze/AnalyzeUtils.java | 9 +- .../queryengine/plan/analyze/AnalyzeVisitor.java | 4 +- .../analyze/load/LoadTsFileTableSchemaCache.java | 3 +- .../load/LoadTsFileToTableModelAnalyzer.java | 16 +- .../load/LoadTsFileToTreeModelAnalyzer.java | 5 +- .../analyze/load/LoadTsFileTreeSchemaCache.java | 3 +- .../queryengine/plan/execution/QueryExecution.java | 2 - .../execution/config/TableConfigTaskVisitor.java | 156 +++- .../execution/config/TreeConfigTaskVisitor.java | 38 +- .../config/executor/ClusterConfigTaskExecutor.java | 209 ++++- .../config/executor/IConfigTaskExecutor.java | 22 +- .../config/metadata/DropPipePluginTask.java | 13 +- ...rateRegionTask.java => RemoveDataNodeTask.java} | 12 +- .../config/metadata/ShowPipePluginsTask.java | 20 +- .../ExtendRegionTask.java} | 19 +- .../metadata/{ => region}/MigrateRegionTask.java | 4 +- .../ReconstructRegionTask.java} | 19 +- .../RemoveRegionTask.java} | 19 +- .../metadata/relational/ShowTablesDetailsTask.java | 47 +- .../config/metadata/relational/ShowTablesTask.java | 35 +- .../db/queryengine/plan/parser/ASTVisitor.java | 41 +- .../plan/planner/LogicalPlanVisitor.java | 7 +- .../plan/planner/TableOperatorGenerator.java | 54 +- .../plan/planner/plan/node/PlanGraphPrinter.java 
| 41 +- .../plan/planner/plan/node/PlanNodeType.java | 4 + .../plan/planner/plan/node/PlanVisitor.java | 5 + .../node/metadata/write/AlterTimeSeriesNode.java | 9 +- .../node/metadata/write/CreateTimeSeriesNode.java | 74 +- .../plan/node/pipe/PipeEnrichedDeleteDataNode.java | 79 +- .../plan/node/pipe/PipeEnrichedInsertNode.java | 49 +- .../plan/node/pipe/PipeEnrichedWritePlanNode.java | 23 +- .../ContinuousSameSearchIndexSeparatorNode.java | 6 + .../planner/plan/node/write/DeleteDataNode.java | 36 + .../plan/node/write/InsertMultiTabletsNode.java | 5 + .../plan/planner/plan/node/write/InsertNode.java | 21 + .../planner/plan/node/write/InsertRowNode.java | 12 + .../planner/plan/node/write/InsertRowsNode.java | 15 + .../plan/node/write/InsertRowsOfOneDeviceNode.java | 5 + .../planner/plan/node/write/InsertTabletNode.java | 11 + .../plan/node/write/RelationalDeleteDataNode.java | 23 +- .../plan/planner/plan/node/write/SearchNode.java | 4 + .../relational/analyzer/StatementAnalyzer.java | 93 ++- .../relational/metadata/TableMetadataImpl.java | 6 +- .../fetcher/TableHeaderSchemaValidator.java | 14 +- .../fetcher/cache/TableDeviceLastCache.java | 2 +- .../fetcher/cache/TableDeviceSchemaCache.java | 19 + .../plan/relational/planner/IrTypeAnalyzer.java | 4 +- .../plan/relational/planner/RelationPlanner.java | 91 ++- .../relational/planner/SimplePlanRewriter.java | 88 ++ .../relational/planner/TableLogicalPlanner.java | 15 +- .../distribute/TableDistributedPlanGenerator.java | 24 + .../MultipleDistinctAggregationToMarkDistinct.java | 202 +++++ .../iterative/rule/PruneMarkDistinctColumns.java | 58 ++ .../rule/SingleDistinctAggregationToGroupBy.java | 160 ++++ .../relational/planner/node/AggregationNode.java | 6 +- .../relational/planner/node/MarkDistinctNode.java | 162 ++++ .../plan/relational/planner/node/Patterns.java | 7 +- .../node/schema/CreateOrUpdateTableDeviceNode.java | 14 +- .../optimizations/LogicalOptimizeFactory.java | 23 +- .../PushAggregationIntoTableScan.java 
| 4 + .../optimizations/PushPredicateIntoTableScan.java | 125 ++- ...mQuantifiedComparisonApplyToCorrelatedJoin.java | 341 ++++++++ .../optimizations/UnaliasSymbolReferences.java | 21 + .../relational/sql/ast/AbstractTraverseDevice.java | 8 + .../plan/relational/sql/ast/AstVisitor.java | 14 +- .../relational/sql/ast/CreateOrUpdateDevice.java | 31 +- .../plan/relational/sql/ast/CreatePipe.java | 4 +- .../plan/relational/sql/ast/Delete.java | 28 +- .../plan/relational/sql/ast/PipeEnriched.java | 8 +- .../ast/{PipeEnriched.java => RemoveDataNode.java} | 37 +- .../ast/{Statement.java => StartRepairData.java} | 12 +- .../plan/relational/sql/ast/Statement.java | 4 +- .../ast/{Statement.java => StopRepairData.java} | 12 +- .../plan/relational/sql/ast/Update.java | 6 +- .../plan/relational/sql/parser/AstBuilder.java | 38 +- .../plan/scheduler/ClusterScheduler.java | 4 - .../plan/scheduler/load/LoadTsFileScheduler.java | 7 +- .../plan/statement/StatementVisitor.java | 23 +- .../plan/statement/crud/InsertBaseStatement.java | 16 +- .../plan/statement/crud/InsertRowStatement.java | 4 +- .../plan/statement/crud/InsertTabletStatement.java | 4 +- .../metadata/AlterTimeSeriesStatement.java | 7 +- ...Statement.java => RemoveDataNodeStatement.java} | 43 +- .../metadata/pipe/DropPipePluginStatement.java | 17 +- .../metadata/pipe/ShowPipePluginsStatement.java | 23 +- .../ExtendRegionStatement.java} | 47 +- .../{ => region}/MigrateRegionStatement.java | 16 +- .../ReconstructRegionStatement.java} | 47 +- .../RemoveRegionStatement.java} | 47 +- .../plan/statement/pipe/PipeEnrichedStatement.java | 10 +- .../AbstractCastFunctionColumnTransformer.java | 43 +- .../scalar/CastFunctionColumnTransformer.java | 4 +- .../scalar/TryCastFunctionColumnTransformer.java | 4 +- .../attribute/DeviceAttributeStore.java | 10 +- .../attribute/IDeviceAttributeStore.java | 6 +- .../schemaregion/impl/SchemaRegionMemoryImpl.java | 136 ++-- .../schemaregion/mtree/impl/mem/MemMTreeStore.java | 52 +- 
.../mtree/impl/mem/mnode/basic/BasicMNode.java | 22 +- .../impl/mem/snapshot/MemMTreeSnapshotUtil.java | 37 +- .../db/schemaengine/table/DataNodeTableCache.java | 108 ++- .../db/storageengine/dataregion/DataRegion.java | 39 +- .../task/InsertionCrossSpaceCompactionTask.java | 3 - .../utils/CompactionTableSchemaCollector.java | 2 +- .../compaction/execute/utils/CompactionUtils.java | 5 +- .../compaction/repair/RepairDataFileScanUtil.java | 30 +- .../compaction/schedule/TTLScheduleTask.java | 4 +- .../selector/impl/SettleSelectorImpl.java | 3 + .../dataregion/memtable/TsFileProcessor.java | 4 + .../dataregion/modification/ModificationFile.java | 152 +++- .../modification/PartitionLevelModFileManager.java | 5 +- .../dataregion/snapshot/SnapshotTaker.java | 2 +- .../dataregion/tsfile/TsFileResource.java | 27 +- .../tsfile/timeindex/ArrayDeviceTimeIndex.java | 20 + .../db/storageengine/load/LoadTsFileManager.java | 2 +- .../load/splitter/TsFileSplitter.java | 5 +- .../broker/SubscriptionPrefetchingQueue.java | 47 +- .../broker/SubscriptionPrefetchingQueueStates.java | 96 ++- .../broker/SubscriptionPrefetchingTabletQueue.java | 2 +- .../broker/SubscriptionPrefetchingTsFileQueue.java | 2 +- .../db/subscription/event/SubscriptionEvent.java | 80 +- .../event/batch/SubscriptionPipeEventBatch.java | 30 - .../batch/SubscriptionPipeTabletEventBatch.java | 68 +- .../batch/SubscriptionPipeTsFileEventBatch.java | 15 - .../cache/CachedSubscriptionPollResponse.java | 5 + .../event/pipe/SubscriptionPipeEmptyEvent.java | 4 +- .../pipe/SubscriptionPipeTabletBatchEvents.java | 46 +- .../pipe/SubscriptionPipeTsFileBatchEvents.java | 8 +- .../pipe/SubscriptionPipeTsFilePlainEvent.java | 8 +- .../SubscriptionEventExtendableResponse.java | 1 + .../event/response/SubscriptionEventResponse.java | 10 +- .../response/SubscriptionEventSingleResponse.java | 1 + .../response/SubscriptionEventTabletResponse.java | 201 ++++- .../response/SubscriptionEventTsFileResponse.java | 32 +- 
.../db/tools/schema/SRStatementGenerator.java | 208 +++-- .../tools/schema/SchemaRegionSnapshotParser.java | 32 +- .../iotdb/db/utils/constant/SqlConstant.java | 1 + .../dataregion/DataRegionStateMachineTest.java | 4 +- .../connector/PipeDataNodeThriftRequestTest.java | 22 +- .../PipeStatementTablePatternParseVisitorTest.java | 57 ++ ... PipeStatementTreePatternParseVisitorTest.java} | 20 +- .../pipe/connector/PipeTabletEventSorterTest.java | 61 +- .../db/pipe/consensus/DeletionResourceTest.java | 39 +- .../pipe/event/PipeTabletInsertionEventTest.java | 25 +- .../PipePlanTablePatternParseVisitorTest.java | 129 +++ ...va => PipePlanTreePatternParseVisitorTest.java} | 32 +- .../annotation/PipePluginAnnotationTest.java} | 30 +- .../resource/PipeTsFileResourceManagerTest.java | 2 +- .../metadata/write/MetadataWriteNodeSerDeTest.java | 2 +- .../plan/relational/analyzer/DistinctTest.java | 209 +++++ .../plan/relational/planner/SubqueryTest.java | 203 +++++ .../planner/assertions/MarkDistinctMatcher.java | 93 +++ .../planner/assertions/PlanMatchPattern.java | 28 + .../storageengine/buffer/BloomFilterCacheTest.java | 5 +- .../compaction/CompactionValidationTest.java | 7 +- .../inner/InnerSpaceCompactionSelectorTest.java | 2 +- .../repair/RepairDataFileScanUtilTest.java | 2 +- .../TableModelFastCompactionPerformerTest.java | 13 +- ...TableModelReadChunkCompactionPerformerTest.java | 13 +- ...TableModelReadPointCompactionPerformerTest.java | 13 +- .../modification/ModificationFileTest.java | 90 ++- .../file/UnsealedTsFileRecoverPerformerTest.java | 4 +- .../iotdb/db/tools/TsFileAndModSettleToolTest.java | 2 +- .../iotdb/db/tools/TsFileSelfCheckToolTest.java | 6 +- .../iotdb/db/tools/TsFileSketchToolTest.java | 10 +- .../db/utils/SchemaRegionSnapshotParserTest.java | 200 +++-- iotdb-core/node-commons/pom.xml | 4 + .../conf/iotdb-system.properties.template | 6 +- .../apache/iotdb/commons/conf/CommonConfig.java | 87 +- .../iotdb/commons/conf/CommonDescriptor.java | 42 +- 
.../commons/partition/DataPartitionTable.java | 12 + .../commons/partition/SeriesPartitionTable.java | 16 + .../org/apache/iotdb/commons/path/PartialPath.java | 8 +- .../commons/pipe/agent/plugin/PipePluginAgent.java | 17 +- .../builtin/connector/PlaceholderConnector.java | 4 + .../connector/donothing/DoNothingConnector.java | 4 + .../iotdb/airgap/IoTDBAirGapConnector.java | 4 + .../consensus/PipeConsensusAsyncConnector.java | 4 + .../iotdb/thrift/IoTDBLegacyPipeConnector.java | 2 + .../iotdb/thrift/IoTDBThriftAsyncConnector.java | 5 + .../iotdb/thrift/IoTDBThriftConnector.java | 4 + .../iotdb/thrift/IoTDBThriftSslConnector.java | 4 + .../iotdb/thrift/IoTDBThriftSyncConnector.java | 5 + .../builtin/connector/opcua/OpcUaConnector.java | 4 + .../connector/websocket/WebSocketConnector.java | 2 + .../connector/writeback/WriteBackConnector.java | 4 + .../extractor/donothing/DoNothingExtractor.java | 4 + .../builtin/extractor/iotdb/IoTDBExtractor.java | 4 + .../processor/donothing/DoNothingProcessor.java | 4 + .../throwing/ThrowingExceptionProcessor.java | 4 + .../agent/plugin/meta/PipePluginMetaKeeper.java | 47 +- .../commons/pipe/agent/task/PipeTaskAgent.java | 2 +- .../commons/pipe/agent/task/meta/PipeMeta.java | 40 - .../pipe/agent/task/meta/PipeMetaKeeper.java | 6 +- .../pipe/agent/task/meta/PipeStaticMeta.java | 14 + .../connector/client/IoTDBSyncClientManager.java | 28 +- .../payload/thrift/request/PipeRequestType.java | 4 +- .../thrift/request/PipeTransferFileSealReqV2.java | 13 +- .../connector/protocol/IoTDBAirGapConnector.java | 14 +- .../pipe/connector/protocol/IoTDBConnector.java | 10 + .../connector/protocol/IoTDBSslSyncConnector.java | 4 + .../options/PipeInclusionOptions.java | 43 +- .../pipe/datastructure/pattern/TablePattern.java | 36 +- .../pipe/datastructure/pattern/TreePattern.java | 33 +- .../{ => resource}/PersistentResource.java | 2 +- .../commons/pipe/datastructure/result/Result.java | 39 +- .../pipe/datastructure/visibility/Visibility.java | 9 
+- .../visibility/VisibilityTestUtils.java | 135 ++++ .../datastructure/visibility/VisibilityUtils.java | 133 +++ .../commons/pipe/extractor/IoTDBExtractor.java | 38 +- .../extractor/IoTDBNonDataRegionExtractor.java | 24 +- .../commons/pipe/receiver/IoTDBFileReceiver.java | 32 +- .../apache/iotdb/commons/schema/node/IMNode.java | 18 +- .../commons/schema/node/visitor/MNodeVisitor.java | 6 +- .../apache/iotdb/commons/schema/table/TsTable.java | 4 + .../schema/table/column/TsTableColumnSchema.java | 19 + .../apache/iotdb/commons/schema/ttl/TTLCache.java | 27 + .../subscription/config/SubscriptionConfig.java | 47 +- .../commons/subscription/meta/topic/TopicMeta.java | 4 + .../iotdb/commons/udf/access/RecordIterator.java | 9 +- .../TableBuiltinAggregationFunction.java | 2 + .../org/apache/iotdb/commons/utils/PathUtils.java | 2 +- .../iotdb/commons/utils/TimePartitionUtils.java | 13 + .../annotation/PipePluginAnnotationTest.java} | 30 +- .../db/relational/grammar/sql/RelationalSql.g4 | 22 +- .../thrift-commons/src/main/thrift/common.thrift | 3 +- .../src/main/thrift/confignode.thrift | 31 + .../thrift-datanode/src/main/thrift/client.thrift | 1 + pom.xml | 13 +- 695 files changed, 23079 insertions(+), 6266 deletions(-) diff --cc integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java index 93c269b66bb,00000000000..8b11c20b89b mode 100644,000000..100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBAlterColumnTypeIT.java @@@ -1,826 -1,0 +1,826 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.relational.it.schema; + +import org.apache.iotdb.commons.utils.MetadataUtils; +import org.apache.iotdb.isession.ITableSession; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.ColumnSchema; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.read.common.RowRecord; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.record.Tablet.ColumnCategory; +import org.apache.tsfile.write.v4.ITsFileWriter; +import org.apache.tsfile.write.v4.TsFileWriterBuilder; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import 
java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.iotdb.relational.it.session.IoTDBSessionRelationalIT.genValue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@SuppressWarnings("ResultOfMethodCallIgnored") +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBAlterColumnTypeIT { + + @BeforeClass + public static void setUp() throws Exception { - EnvFactory.getEnv().getConfig().getDataNodeConfig().setCompactionScheduleIntervalInMs(1000); ++ EnvFactory.getEnv().getConfig().getDataNodeConfig().setCompactionScheduleInterval(1000); + EnvFactory.getEnv().initClusterEnvironment(); + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { + session.executeNonQueryStatement("CREATE DATABASE IF NOT EXISTS test"); + } + } + + @AfterClass + public static void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testWriteAndAlter() throws IoTDBConnectionException, StatementExecutionException { + Set<TSDataType> typesToTest = new HashSet<>(); + Collections.addAll(typesToTest, TSDataType.values()); + typesToTest.remove(TSDataType.VECTOR); + typesToTest.remove(TSDataType.UNKNOWN); + + // doWriteAndAlter(TSDataType.INT32, TSDataType.FLOAT, false); + // doWriteAndAlter(TSDataType.INT32, TSDataType.FLOAT, true); + + for (TSDataType from : typesToTest) { + for (TSDataType to : typesToTest) { + System.out.printf("testing %s to %s%n", from, to); + doWriteAndAlter(from, to, false); + doWriteAndAlter(from, to, true); + } + } + } + + private void doWriteAndAlter(TSDataType from, TSDataType to, boolean flush) + 
throws IoTDBConnectionException, StatementExecutionException { + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) { + // create a table with type of "from" + session.executeNonQueryStatement( + "CREATE TABLE IF NOT EXISTS write_and_alter_column_type (s1 " + from + ")"); + + // write a point of "from" + Tablet tablet = + new Tablet( + "write_and_alter_column_type", + Collections.singletonList("s1"), + Collections.singletonList(from), + Collections.singletonList(ColumnCategory.FIELD)); + tablet.addTimestamp(0, 1); + tablet.addValue("s1", 0, genValue(from, 1)); + session.insert(tablet); + tablet.reset(); + + if (flush) { + session.executeNonQueryStatement("FLUSH"); + } + + // alter the type to "to" + boolean isCompatible = MetadataUtils.canAlter(from, to); + if (isCompatible) { + session.executeNonQueryStatement( + "ALTER TABLE write_and_alter_column_type ALTER COLUMN s1 SET DATA TYPE " + to); + } else { + try { + session.executeNonQueryStatement( + "ALTER TABLE write_and_alter_column_type ALTER COLUMN s1 SET DATA TYPE " + to); + } catch (StatementExecutionException e) { + assertEquals( + "701: New type " + to + " is not compatible with the existing one " + from, + e.getMessage()); + } + } + + SessionDataSet dataSet = + session.executeQueryStatement("select * from write_and_alter_column_type order by time"); + RowRecord rec = dataSet.next(); + TSDataType newType = isCompatible ? 
to : from; + assertEquals(1, rec.getFields().get(0).getLongV()); + if (newType == TSDataType.BLOB) { + assertEquals(genValue(newType, 1), rec.getFields().get(1).getBinaryV()); + } else if (newType == TSDataType.DATE) { + assertEquals(genValue(newType, 1), rec.getFields().get(1).getDateV()); + } else { + assertEquals(genValue(newType, 1).toString(), rec.getFields().get(1).toString()); + } + + // write a point + tablet = + new Tablet( + "write_and_alter_column_type", + Collections.singletonList("s1"), + Collections.singletonList(newType), + Collections.singletonList(ColumnCategory.FIELD)); + tablet.addTimestamp(0, 2); + tablet.addValue("s1", 0, genValue(newType, 2)); + session.insert(tablet); + tablet.reset(); + + if (flush) { + session.executeNonQueryStatement("FLUSH"); + } + + dataSet = + session.executeQueryStatement("select * from write_and_alter_column_type order by time"); + rec = dataSet.next(); + assertEquals(1, rec.getFields().get(0).getLongV()); + if (newType == TSDataType.BLOB) { + assertEquals(genValue(newType, 1), rec.getFields().get(1).getBinaryV()); + } else if (newType == TSDataType.DATE) { + assertEquals(genValue(newType, 1), rec.getFields().get(1).getDateV()); + } else { + assertEquals(genValue(newType, 1).toString(), rec.getFields().get(1).toString()); + } + + rec = dataSet.next(); + assertEquals(2, rec.getFields().get(0).getLongV()); + if (newType == TSDataType.BLOB) { + assertEquals(genValue(newType, 2), rec.getFields().get(1).getBinaryV()); + } else if (newType == TSDataType.DATE) { + assertEquals(genValue(newType, 2), rec.getFields().get(1).getDateV()); + } else { + assertEquals(genValue(newType, 2).toString(), rec.getFields().get(1).toString()); + } + + dataSet = + session.executeQueryStatement( + "select min(s1),max(s1),first(s1),last(s1) from write_and_alter_column_type"); + rec = dataSet.next(); + for (int i = 0; i < 4; i++) { + if (newType == TSDataType.BLOB) { + assertEquals(genValue(newType, i % 2 + 1), 
rec.getFields().get(i).getBinaryV()); + } else if (newType == TSDataType.DATE) { + assertEquals(genValue(newType, i % 2 + 1), rec.getFields().get(i).getDateV()); + } else { + assertEquals(genValue(newType, i % 2 + 1).toString(), rec.getFields().get(i).toString()); + } + } + assertFalse(dataSet.hasNext()); + + if (newType.isNumeric()) { + dataSet = + session.executeQueryStatement( + "select avg(s1),sum(s1) from write_and_alter_column_type"); + rec = dataSet.next(); + assertEquals(1.5, rec.getFields().get(0).getDoubleV(), 0.001); + assertEquals(3.0, rec.getFields().get(1).getDoubleV(), 0.001); + assertFalse(dataSet.hasNext()); + } + + session.executeNonQueryStatement("DROP TABLE write_and_alter_column_type"); + } + } + + @Test + public void testAlterWithoutWrite() throws IoTDBConnectionException, StatementExecutionException { + Set<TSDataType> typesToTest = new HashSet<>(); + Collections.addAll(typesToTest, TSDataType.values()); + typesToTest.remove(TSDataType.VECTOR); + typesToTest.remove(TSDataType.UNKNOWN); + + for (TSDataType from : typesToTest) { + for (TSDataType to : typesToTest) { + System.out.printf("testing %s to %s%n", from, to); + doAlterWithoutWrite(from, to, false); + doAlterWithoutWrite(from, to, true); + } + } + } + + private void doAlterWithoutWrite(TSDataType from, TSDataType to, boolean flush) + throws IoTDBConnectionException, StatementExecutionException { + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) { + // create a table with type of "from" + session.executeNonQueryStatement( + "CREATE TABLE IF NOT EXISTS just_alter_column_type (s1 " + from + ")"); + + // alter the type to "to" + boolean isCompatible = MetadataUtils.canAlter(from, to); + if (isCompatible) { + session.executeNonQueryStatement( + "ALTER TABLE just_alter_column_type ALTER COLUMN s1 SET DATA TYPE " + to); + } else { + try { + session.executeNonQueryStatement( + "ALTER TABLE just_alter_column_type ALTER COLUMN s1 SET DATA TYPE " + to); + 
} catch (StatementExecutionException e) { + assertEquals( + "701: New type " + to + " is not compatible with the existing one " + from, + e.getMessage()); + } + } + + TSDataType newType = isCompatible ? to : from; + + // write a point + Tablet tablet = + new Tablet( + "just_alter_column_type", + Collections.singletonList("s1"), + Collections.singletonList(newType), + Collections.singletonList(ColumnCategory.FIELD)); + tablet.addTimestamp(0, 1); + tablet.addValue("s1", 0, genValue(newType, 1)); + session.insert(tablet); + tablet.reset(); + + if (flush) { + session.executeNonQueryStatement("FLUSH"); + } + + SessionDataSet dataSet = + session.executeQueryStatement("select * from just_alter_column_type order by time"); + RowRecord rec = dataSet.next(); + assertEquals(1, rec.getFields().get(0).getLongV()); + if (newType == TSDataType.BLOB) { + assertEquals(genValue(newType, 1), rec.getFields().get(1).getBinaryV()); + } else if (newType == TSDataType.DATE) { + assertEquals(genValue(newType, 1), rec.getFields().get(1).getDateV()); + } else { + assertEquals(genValue(newType, 1).toString(), rec.getFields().get(1).toString()); + } + + assertFalse(dataSet.hasNext()); + + session.executeNonQueryStatement("DROP TABLE just_alter_column_type"); + } + } + + @Test + public void testAlterNonExist() throws IoTDBConnectionException, StatementExecutionException { + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) { + try { + session.executeNonQueryStatement( + "ALTER TABLE non_exist ALTER COLUMN s1 SET DATA TYPE INT64"); + fail("Should throw exception"); + } catch (StatementExecutionException e) { + assertEquals("550: Table 'test.non_exist' does not exist", e.getMessage()); + } + session.executeNonQueryStatement( + "ALTER TABLE IF EXISTS non_exist ALTER COLUMN s1 SET DATA TYPE INT64"); + + session.executeNonQueryStatement("CREATE TABLE IF NOT EXISTS non_exist (s1 int32)"); + + try { + session.executeNonQueryStatement( + "ALTER TABLE non_exist 
ALTER COLUMN s2 SET DATA TYPE INT64"); + fail("Should throw exception"); + } catch (StatementExecutionException e) { + assertEquals("616: Column s2 in table 'test.non_exist' does not exist.", e.getMessage()); + } + session.executeNonQueryStatement( + "ALTER TABLE non_exist ALTER COLUMN IF EXISTS s2 SET DATA TYPE INT64"); + } + } + + @Test + public void testAlterWrongType() throws IoTDBConnectionException, StatementExecutionException { + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) { + session.executeNonQueryStatement("CREATE TABLE IF NOT EXISTS wrong_type (s1 int32)"); + + try { + session.executeNonQueryStatement( + "ALTER TABLE wrong_type ALTER COLUMN s1 SET DATA TYPE VECTOR"); + fail("Should throw exception"); + } catch (StatementExecutionException e) { + assertEquals("701: Unknown type: VECTOR", e.getMessage()); + } + } + } + + @Test + public void testDropAndAlter() throws IoTDBConnectionException, StatementExecutionException { + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) { + session.executeNonQueryStatement("CREATE TABLE IF NOT EXISTS drop_and_alter (s1 int32)"); + + // time=1 and time=2 are INT32 and deleted by drop column + Tablet tablet = + new Tablet( + "drop_and_alter", + Collections.singletonList("s1"), + Collections.singletonList(TSDataType.INT32), + Collections.singletonList(ColumnCategory.FIELD)); + tablet.addTimestamp(0, 1); + tablet.addValue("s1", 0, genValue(TSDataType.INT32, 1)); + session.insert(tablet); + tablet.reset(); + + session.executeNonQueryStatement("FLUSH"); + + tablet = + new Tablet( + "drop_and_alter", + Collections.singletonList("s1"), + Collections.singletonList(TSDataType.INT32), + Collections.singletonList(ColumnCategory.FIELD)); + tablet.addTimestamp(0, 2); + tablet.addValue("s1", 0, genValue(TSDataType.INT32, 2)); + session.insert(tablet); + tablet.reset(); + + session.executeNonQueryStatement("ALTER TABLE drop_and_alter DROP COLUMN s1"); +
+ // time=3 and time=4 are STRING + tablet = + new Tablet( + "drop_and_alter", + Collections.singletonList("s1"), + Collections.singletonList(TSDataType.STRING), + Collections.singletonList(ColumnCategory.FIELD)); + tablet.addTimestamp(0, 3); + tablet.addValue("s1", 0, genValue(TSDataType.STRING, 3)); + session.insert(tablet); + tablet.reset(); + + session.executeNonQueryStatement("FLUSH"); + + tablet = + new Tablet( + "drop_and_alter", + Collections.singletonList("s1"), + Collections.singletonList(TSDataType.STRING), + Collections.singletonList(ColumnCategory.FIELD)); + tablet.addTimestamp(0, 4); + tablet.addValue("s1", 0, genValue(TSDataType.STRING, 4)); + session.insert(tablet); + tablet.reset(); + + session.executeNonQueryStatement( + "ALTER TABLE drop_and_alter ALTER COLUMN s1 SET DATA TYPE TEXT"); + + // time=5 and time=6 are TEXT + tablet = + new Tablet( + "drop_and_alter", + Collections.singletonList("s1"), + Collections.singletonList(TSDataType.TEXT), + Collections.singletonList(ColumnCategory.FIELD)); + tablet.addTimestamp(0, 5); + tablet.addValue("s1", 0, genValue(TSDataType.STRING, 5)); + session.insert(tablet); + tablet.reset(); + + session.executeNonQueryStatement("FLUSH"); + + tablet = + new Tablet( + "drop_and_alter", + Collections.singletonList("s1"), + Collections.singletonList(TSDataType.TEXT), + Collections.singletonList(ColumnCategory.FIELD)); + tablet.addTimestamp(0, 6); + tablet.addValue("s1", 0, genValue(TSDataType.STRING, 6)); + session.insert(tablet); + tablet.reset(); + + SessionDataSet dataSet = + session.executeQueryStatement("select * from drop_and_alter order by time"); + // s1 is dropped but the time should remain + RowRecord rec; + for (int i = 1; i < 3; i++) { + rec = dataSet.next(); + assertEquals(i, rec.getFields().get(0).getLongV()); + assertNull(rec.getFields().get(1).getDataType()); + } + for (int i = 3; i < 7; i++) { + rec = dataSet.next(); + assertEquals(i, rec.getFields().get(0).getLongV()); + 
assertEquals(genValue(TSDataType.STRING, i).toString(), rec.getFields().get(1).toString()); + } + assertFalse(dataSet.hasNext()); + } + } + + @Test + public void testContinuousAlter() throws IoTDBConnectionException, StatementExecutionException { + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) { + session.executeNonQueryStatement("CREATE TABLE IF NOT EXISTS alter_and_alter (s1 int32)"); + + // time=1 and time=2 are INT32 + Tablet tablet = + new Tablet( + "alter_and_alter", + Collections.singletonList("s1"), + Collections.singletonList(TSDataType.INT32), + Collections.singletonList(ColumnCategory.FIELD)); + tablet.addTimestamp(0, 1); + tablet.addValue("s1", 0, genValue(TSDataType.INT32, 1)); + session.insert(tablet); + tablet.reset(); + + session.executeNonQueryStatement("FLUSH"); + + tablet = + new Tablet( + "alter_and_alter", + Collections.singletonList("s1"), + Collections.singletonList(TSDataType.INT32), + Collections.singletonList(ColumnCategory.FIELD)); + tablet.addTimestamp(0, 2); + tablet.addValue("s1", 0, genValue(TSDataType.INT32, 2)); + session.insert(tablet); + tablet.reset(); + + // time=3 and time=4 are FLOAT + session.executeNonQueryStatement( + "ALTER TABLE alter_and_alter ALTER COLUMN s1 SET DATA TYPE FLOAT"); + tablet = + new Tablet( + "alter_and_alter", + Collections.singletonList("s1"), + Collections.singletonList(TSDataType.FLOAT), + Collections.singletonList(ColumnCategory.FIELD)); + tablet.addTimestamp(0, 3); + tablet.addValue("s1", 0, genValue(TSDataType.FLOAT, 3)); + session.insert(tablet); + tablet.reset(); + + session.executeNonQueryStatement("FLUSH"); + + tablet = + new Tablet( + "alter_and_alter", + Collections.singletonList("s1"), + Collections.singletonList(TSDataType.FLOAT), + Collections.singletonList(ColumnCategory.FIELD)); + tablet.addTimestamp(0, 4); + tablet.addValue("s1", 0, genValue(TSDataType.FLOAT, 4)); + session.insert(tablet); + tablet.reset(); + + // time=5 and time=6 are DOUBLE 
+ session.executeNonQueryStatement( + "ALTER TABLE alter_and_alter ALTER COLUMN s1 SET DATA TYPE DOUBLE"); + tablet = + new Tablet( + "alter_and_alter", + Collections.singletonList("s1"), + Collections.singletonList(TSDataType.DOUBLE), + Collections.singletonList(ColumnCategory.FIELD)); + tablet.addTimestamp(0, 5); + tablet.addValue("s1", 0, genValue(TSDataType.DOUBLE, 5)); + session.insert(tablet); + tablet.reset(); + + session.executeNonQueryStatement("FLUSH"); + + tablet = + new Tablet( + "alter_and_alter", + Collections.singletonList("s1"), + Collections.singletonList(TSDataType.DOUBLE), + Collections.singletonList(ColumnCategory.FIELD)); + tablet.addTimestamp(0, 6); + tablet.addValue("s1", 0, genValue(TSDataType.DOUBLE, 6)); + session.insert(tablet); + tablet.reset(); + + SessionDataSet dataSet = + session.executeQueryStatement("select * from alter_and_alter order by time"); + RowRecord rec; + for (int i = 1; i < 7; i++) { + rec = dataSet.next(); + assertEquals(i, rec.getFields().get(0).getLongV()); + assertEquals(genValue(TSDataType.DOUBLE, i).toString(), rec.getFields().get(1).toString()); + } + assertFalse(dataSet.hasNext()); + } + } + + @Test + public void testConcurrentWriteAndAlter() + throws IoTDBConnectionException, StatementExecutionException, InterruptedException { + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) { + session.executeNonQueryStatement( + "CREATE TABLE IF NOT EXISTS concurrent_write_and_alter (s1 int32)"); + // session.executeNonQueryStatement("SET CONFIGURATION + // enable_seq_space_compaction='false'"); + } + + ExecutorService threadPool = Executors.newCachedThreadPool(); + AtomicInteger writeCounter = new AtomicInteger(0); + int maxWrite = 10000; + int flushInterval = 100; + int alterStart = 5000; + threadPool.submit( + () -> { + try { + write(writeCounter, maxWrite, flushInterval); + } catch (IoTDBConnectionException | StatementExecutionException e) { + throw new RuntimeException(e); + } + 
}); + threadPool.submit( + () -> { + try { + alter(writeCounter, alterStart); + } catch (InterruptedException + | IoTDBConnectionException + | StatementExecutionException e) { + throw new RuntimeException(e); + } + }); + threadPool.shutdown(); + assertTrue(threadPool.awaitTermination(1, TimeUnit.MINUTES)); + + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) { + SessionDataSet dataSet = + session.executeQueryStatement("select count(s1) from concurrent_write_and_alter"); + RowRecord rec; + rec = dataSet.next(); + assertEquals(maxWrite, rec.getFields().get(0).getLongV()); + assertFalse(dataSet.hasNext()); + } + } + + private void write(AtomicInteger writeCounter, int maxWrite, int flushInterval) + throws IoTDBConnectionException, StatementExecutionException { + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) { + int writtenCnt = 0; + do { + session.executeNonQueryStatement( + String.format( + "INSERT INTO concurrent_write_and_alter (time, s1) VALUES (%d, %d)", + writtenCnt, writtenCnt)); + if (((writtenCnt + 1) % flushInterval) == 0) { + session.executeNonQueryStatement("FLUSH"); + } + } while ((writtenCnt = writeCounter.incrementAndGet()) < maxWrite); + } + } + + private void alter(AtomicInteger writeCounter, int alterStart) + throws InterruptedException, IoTDBConnectionException, StatementExecutionException { + while (writeCounter.get() < alterStart) { + Thread.sleep(10); + } + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) { + session.executeNonQueryStatement( + "ALTER TABLE concurrent_write_and_alter ALTER COLUMN s1 SET DATA TYPE DOUBLE"); + } + } + + @Test + public void testLoadAndAlter() + throws IoTDBConnectionException, + StatementExecutionException, + IOException, + WriteProcessException { + // file1-file4 s1=INT32 + TableSchema schema1 = + new TableSchema( + "load_and_alter", + Arrays.asList( + new ColumnSchema("dId", 
TSDataType.STRING, ColumnCategory.TAG), + new ColumnSchema("s1", TSDataType.INT32, ColumnCategory.FIELD))); + // file1-file3 single device small range ([1, 1]), may load without split + List<File> filesToLoad = new ArrayList<>(); + for (int i = 1; i <= 3; i++) { + File file = new File("target", "f" + i + ".tsfile"); + try (ITsFileWriter tsFileWriter = + new TsFileWriterBuilder().file(file).tableSchema(schema1).build()) { + Tablet tablet = + new Tablet( + schema1.getTableName(), + Arrays.asList("dId", "s1"), + Arrays.asList(TSDataType.STRING, TSDataType.INT32), + Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD)); + tablet.addTimestamp(0, 1); + tablet.addValue("dId", 0, "d" + i); + tablet.addValue("s1", 0, 1); + tsFileWriter.write(tablet); + } + filesToLoad.add(file); + } + // file4 multi device large range ([2, 100_000_000]), load with split + File file = new File("target", "f" + 4 + ".tsfile"); + try (ITsFileWriter tsFileWriter = + new TsFileWriterBuilder().file(file).tableSchema(schema1).build()) { + Tablet tablet = + new Tablet( + schema1.getTableName(), + Arrays.asList("dId", "s1"), + Arrays.asList(TSDataType.STRING, TSDataType.INT32), + Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD)); + int rowIndex = 0; + for (int i = 1; i <= 3; i++) { + tablet.addTimestamp(rowIndex, 2); + tablet.addValue("dId", rowIndex, "d" + i); + tablet.addValue("s1", rowIndex, 2); + rowIndex++; + tablet.addTimestamp(rowIndex, 100_000_000); + tablet.addValue("dId", rowIndex, "d" + i); + tablet.addValue("s1", rowIndex, 100_000_000); + rowIndex++; + } + tsFileWriter.write(tablet); + } + filesToLoad.add(file); + + // load file1-file4 + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) { + for (File f : filesToLoad) { + session.executeNonQueryStatement("LOAD '" + f.getAbsolutePath() + "'"); + } + } + // check load result + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) { + SessionDataSet dataSet 
= + session.executeQueryStatement("select count(s1) from load_and_alter"); + RowRecord rec; + rec = dataSet.next(); + assertEquals(9, rec.getFields().get(0).getLongV()); + assertFalse(dataSet.hasNext()); + } + + filesToLoad.forEach(File::delete); + filesToLoad.clear(); + + // file5-file8 s1=DOUBLE + TableSchema schema2 = + new TableSchema( + "load_and_alter", + Arrays.asList( + new ColumnSchema("dId", TSDataType.STRING, ColumnCategory.TAG), + new ColumnSchema("s1", TSDataType.DOUBLE, ColumnCategory.FIELD))); + // file5-file7 single device small range ([3, 3]), may load without split + for (int i = 5; i <= 7; i++) { + file = new File("target", "f" + i + ".tsfile"); + try (ITsFileWriter tsFileWriter = + new TsFileWriterBuilder().file(file).tableSchema(schema2).build()) { + Tablet tablet = + new Tablet( + schema2.getTableName(), + Arrays.asList("dId", "s1"), + Arrays.asList(TSDataType.STRING, TSDataType.DOUBLE), + Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD)); + tablet.addTimestamp(0, 3); + tablet.addValue("dId", 0, "d" + i); + tablet.addValue("s1", 0, 3.0); + tsFileWriter.write(tablet); + } + filesToLoad.add(file); + } + // file8 multi device large range ([4, 100_000_001]), load with split + file = new File("target", "f" + 8 + ".tsfile"); + try (ITsFileWriter tsFileWriter = + new TsFileWriterBuilder().file(file).tableSchema(schema2).build()) { + Tablet tablet = + new Tablet( + schema2.getTableName(), + Arrays.asList("dId", "s1"), + Arrays.asList(TSDataType.STRING, TSDataType.DOUBLE), + Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD)); + int rowIndex = 0; + for (int i = 1; i <= 3; i++) { + tablet.addTimestamp(rowIndex, 4); + tablet.addValue("dId", rowIndex, "d" + i); + tablet.addValue("s1", rowIndex, 4.0); + rowIndex++; + tablet.addTimestamp(rowIndex, 100_000_001); + tablet.addValue("dId", rowIndex, "d" + i); + tablet.addValue("s1", rowIndex, 100_000_001.0); + rowIndex++; + } + tsFileWriter.write(tablet); + } + filesToLoad.add(file); + + // load
file5-file8 + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) { + for (File f : filesToLoad) { + session.executeNonQueryStatement("LOAD '" + f.getAbsolutePath() + "'"); + } + } + // check load result + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) { + SessionDataSet dataSet = + session.executeQueryStatement("select count(s1) from load_and_alter"); + RowRecord rec; + rec = dataSet.next(); + assertEquals(18, rec.getFields().get(0).getLongV()); + assertFalse(dataSet.hasNext()); + } + + // alter s1 to double + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) { + session.executeNonQueryStatement( + "ALTER TABLE load_and_alter ALTER COLUMN s1 SET DATA TYPE DOUBLE"); + } + + filesToLoad.forEach(File::delete); + filesToLoad.clear(); + + // file9-file12 s1=INT32 + // file9-file11 single device small range ([5, 5]), may load without split + for (int i = 9; i <= 11; i++) { + file = new File("target", "f" + i + ".tsfile"); + try (ITsFileWriter tsFileWriter = + new TsFileWriterBuilder().file(file).tableSchema(schema1).build()) { + Tablet tablet = + new Tablet( + schema1.getTableName(), + Arrays.asList("dId", "s1"), + Arrays.asList(TSDataType.STRING, TSDataType.INT32), + Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD)); + tablet.addTimestamp(0, 5); + tablet.addValue("dId", 0, "d" + i); + tablet.addValue("s1", 0, 5); + tsFileWriter.write(tablet); + } + filesToLoad.add(file); + } + // file12 multi device large range ([6, 100_000_002]), load with split + file = new File("target", "f" + 12 + ".tsfile"); + try (ITsFileWriter tsFileWriter = + new TsFileWriterBuilder().file(file).tableSchema(schema1).build()) { + Tablet tablet = + new Tablet( + schema1.getTableName(), + Arrays.asList("dId", "s1"), + Arrays.asList(TSDataType.STRING, TSDataType.INT32), + Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD)); + int rowIndex = 0; + for (int i = 1; i <= 
3; i++) { + tablet.addTimestamp(rowIndex, 6); + tablet.addValue("dId", rowIndex, "d" + i); + tablet.addValue("s1", rowIndex, 6); + rowIndex++; + tablet.addTimestamp(rowIndex, 100_000_002); + tablet.addValue("dId", rowIndex, "d" + i); + tablet.addValue("s1", rowIndex, 100_000_002); + rowIndex++; + } + tsFileWriter.write(tablet); + } + filesToLoad.add(file); + + // load file9-file12, should succeed + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) { + for (File f : filesToLoad) { + session.executeNonQueryStatement("LOAD '" + f.getAbsolutePath() + "'"); + } + } + // check load result + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnectionWithDB("test")) { + SessionDataSet dataSet = + session.executeQueryStatement("select count(s1) from load_and_alter"); + RowRecord rec; + rec = dataSet.next(); + assertEquals(27, rec.getFields().get(0).getLongV()); + assertFalse(dataSet.hasNext()); + } + + filesToLoad.forEach(File::delete); + } +} diff --cc iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 36c3a3c097e,2b2acd14019..00b37a9b0da --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@@ -136,7 -143,7 +144,8 @@@ import org.apache.iotdb.db.schemaengine import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; + import org.apache.ratis.util.AutoCloseableLock; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.ReadWriteIOUtils; @@@ -1538,24 -1813,13 +1815,29 @@@ public class ProcedureManager req.queryId, ProcedureType.DROP_TABLE_COLUMN_PROCEDURE, new DropTableColumnProcedure( - req.database, req.tableName, req.queryId, ReadWriteIOUtils.readString(req.updateInfo))); + req.database, + req.tableName, + 
req.queryId, + ReadWriteIOUtils.readString(req.updateInfo), + false)); } + public TSStatus alterTableColumnDataType(TAlterOrDropTableReq req) { + return executeWithoutDuplicate( + req.database, + null, + req.tableName, + req.queryId, + ProcedureType.ALTER_TABLE_COLUMN_DATATYPE_PROCEDURE, + new AlterTableColumnDataTypeProcedure( + req.database, + req.tableName, + req.queryId, + ReadWriteIOUtils.readVarIntString(req.updateInfo), - TSDataType.deserialize(req.updateInfo.get()))); ++ TSDataType.deserialize(req.updateInfo.get()), ++ false)); + } + public TSStatus dropTable(final TAlterOrDropTableReq req) { return executeWithoutDuplicate( req.database, diff --cc iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/AlterTableColumnDataTypeProcedure.java index 24ab17f8266,00000000000..cd666379d0d mode 100644,000000..100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/AlterTableColumnDataTypeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/table/AlterTableColumnDataTypeProcedure.java @@@ -1,210 -1,0 +1,211 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.confignode.procedure.impl.schema.table; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.schema.table.TsTable; +import org.apache.iotdb.confignode.consensus.request.write.table.AlterColumnDataTypePlan; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.confignode.procedure.exception.ProcedureException; +import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException; +import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException; +import org.apache.iotdb.confignode.procedure.impl.schema.SchemaUtils; +import org.apache.iotdb.confignode.procedure.state.schema.AlterTableColumnDataTypeState; +import org.apache.iotdb.confignode.procedure.store.ProcedureType; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Objects; + +public class AlterTableColumnDataTypeProcedure + extends AbstractAlterOrDropTableProcedure<AlterTableColumnDataTypeState> { + private static final Logger LOGGER = + LoggerFactory.getLogger(AlterTableColumnDataTypeProcedure.class); + + private String columnName; + private TSDataType dataType; + - public AlterTableColumnDataTypeProcedure() { - super(); ++ public AlterTableColumnDataTypeProcedure(final boolean isGeneratedByPipe) { ++ super(isGeneratedByPipe); + } + + public AlterTableColumnDataTypeProcedure( + final String database, + final String tableName, + final String queryId, + final String columnName, - final TSDataType dataType) { - super(database, tableName, queryId); ++ final 
TSDataType dataType, ++ final boolean isGeneratedByPipe) { ++ super(database, tableName, queryId, isGeneratedByPipe); + this.columnName = columnName; + this.dataType = dataType; + } + + @Override + protected String getActionMessage() { + return "Alter table column data type"; + } + + @Override + protected Flow executeFromState( + final ConfigNodeProcedureEnv env, final AlterTableColumnDataTypeState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + final long startTime = System.currentTimeMillis(); + try { + switch (state) { + case CHECK_AND_INVALIDATE_COLUMN: + LOGGER.info( + "Check and invalidate column {} in {}.{} when altering column data type", + columnName, + database, + tableName); + checkAndPreAlterColumn(env); + break; + case PRE_RELEASE: + LOGGER.info("Pre-release info of table {}.{} when altering column", database, tableName); + preRelease(env); + break; + case ALTER_TABLE_COLUMN_DATA_TYPE: + LOGGER.info("Altering column {} in {}.{} on configNode", columnName, database, tableName); + alterColumnDataType(env); + break; + case COMMIT_RELEASE: + LOGGER.info( + "Commit release info of table {}.{} when altering column", database, tableName); + commitRelease(env); + return Flow.NO_MORE_STATE; + default: + setFailure( + new ProcedureException("Unrecognized AlterTableColumnDataTypeProcedure " + state)); + return Flow.NO_MORE_STATE; + } + return Flow.HAS_MORE_STATE; + } finally { + LOGGER.info( + "AlterTableColumnDataType-{}.{}-{} costs {}ms", + database, + tableName, + state, + (System.currentTimeMillis() - startTime)); + } + } + + @Override + protected void preRelease(ConfigNodeProcedureEnv env) { + super.preRelease(env); + setNextState(AlterTableColumnDataTypeState.ALTER_TABLE_COLUMN_DATA_TYPE); + } + + private void checkAndPreAlterColumn(final ConfigNodeProcedureEnv env) { + try { + final Pair<TSStatus, TsTable> result = + env.getConfigManager() + .getClusterSchemaManager() + 
.tableColumnCheckForColumnAltering(database, tableName, columnName, dataType); + final TSStatus status = result.getLeft(); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + setFailure( + new ProcedureException(new IoTDBException(status.getMessage(), status.getCode()))); + return; + } + table = result.getRight(); + setNextState(AlterTableColumnDataTypeState.PRE_RELEASE); + } catch (final MetadataException e) { + setFailure(new ProcedureException(e)); + } + } + + private void alterColumnDataType(final ConfigNodeProcedureEnv env) { + final TSStatus status = + SchemaUtils.executeInConsensusLayer( + new AlterColumnDataTypePlan(database, tableName, columnName, dataType), env, LOGGER); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + setFailure(new ProcedureException(new IoTDBException(status.getMessage(), status.getCode()))); + } + setNextState(AlterTableColumnDataTypeState.COMMIT_RELEASE); + } + + @Override + protected boolean isRollbackSupported(final AlterTableColumnDataTypeState state) { + return false; + } + + @Override + protected void rollbackState( + final ConfigNodeProcedureEnv configNodeProcedureEnv, + final AlterTableColumnDataTypeState alterTableColumnDataTypeState) + throws IOException, InterruptedException, ProcedureException { + // Do nothing + } + + @Override + protected AlterTableColumnDataTypeState getState(final int stateId) { + return AlterTableColumnDataTypeState.values()[stateId]; + } + + @Override + protected int getStateId(final AlterTableColumnDataTypeState alterTableColumnDataTypeState) { + return alterTableColumnDataTypeState.ordinal(); + } + + @Override + protected AlterTableColumnDataTypeState getInitialState() { + return AlterTableColumnDataTypeState.CHECK_AND_INVALIDATE_COLUMN; + } + + @Override + public void serialize(final DataOutputStream stream) throws IOException { + stream.writeShort(ProcedureType.ALTER_TABLE_COLUMN_DATATYPE_PROCEDURE.getTypeCode()); + super.serialize(stream); + + 
ReadWriteIOUtils.write(columnName, stream); + ReadWriteIOUtils.write(dataType, stream); + } + + @Override + public void deserialize(final ByteBuffer byteBuffer) { + super.deserialize(byteBuffer); + + this.columnName = ReadWriteIOUtils.readString(byteBuffer); + this.dataType = ReadWriteIOUtils.readDataType(byteBuffer); + } + + @Override + public boolean equals(final Object o) { + return super.equals(o) + && Objects.equals(columnName, ((AlterTableColumnDataTypeProcedure) o).columnName) + && Objects.equals(dataType, ((AlterTableColumnDataTypeProcedure) o).dataType); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), columnName, dataType); + } +} diff --cc iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java index 0dbd4a33e1b,9641c9fe565..37732df5ab0 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java @@@ -194,28 -196,25 +197,28 @@@ public class ProcedureFactory implement procedure = new UnsetTemplateProcedure(false); break; case CREATE_TABLE_PROCEDURE: - procedure = new CreateTableProcedure(); + procedure = new CreateTableProcedure(false); break; case ADD_TABLE_COLUMN_PROCEDURE: - procedure = new AddTableColumnProcedure(); + procedure = new AddTableColumnProcedure(false); break; case SET_TABLE_PROPERTIES_PROCEDURE: - procedure = new SetTablePropertiesProcedure(); + procedure = new SetTablePropertiesProcedure(false); break; case RENAME_TABLE_COLUMN_PROCEDURE: - procedure = new RenameTableColumnProcedure(); + procedure = new RenameTableColumnProcedure(false); break; case DROP_TABLE_COLUMN_PROCEDURE: - procedure = new DropTableColumnProcedure(); + procedure = new DropTableColumnProcedure(false); break; + case ALTER_TABLE_COLUMN_DATATYPE_PROCEDURE: - procedure = new AlterTableColumnDataTypeProcedure(); ++ procedure = new 
AlterTableColumnDataTypeProcedure(false); + break; case DROP_TABLE_PROCEDURE: - procedure = new DropTableProcedure(); + procedure = new DropTableProcedure(false); break; case DELETE_DEVICES_PROCEDURE: - procedure = new DeleteDevicesProcedure(); + procedure = new DeleteDevicesProcedure(false); break; case CREATE_PIPE_PLUGIN_PROCEDURE: procedure = new CreatePipePluginProcedure(); diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java index acd70459748,86b355f929b..e90e063494f --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java @@@ -403,28 -429,9 +431,28 @@@ public class TableConfigTaskVisitor ext table.addColumnSchema( TableHeaderSchemaValidator.generateColumnSchema(category, columnName, dataType)); } - return new CreateTableTask(table, databaseTablePair.getLeft(), node.isIfNotExists()); + return new CreateTableTask(table, database, node.isIfNotExists()); } + @Override + protected IConfigTask visitAlterColumnDataType( + AlterColumnDataType node, MPPQueryContext context) { + context.setQueryType(QueryType.WRITE); + final Pair<String, String> databaseTablePair = splitQualifiedName(node.getTableName(), true); + final String columnName = node.getColumnName().getValue(); + final DataType dataType = node.getDataType(); + final boolean ifTableExists = node.isIfTableExists(); + final boolean ifColumnExists = node.isIfColumnExists(); + return new AlterColumnDataTypeTask( + databaseTablePair.getLeft(), + databaseTablePair.getRight(), + context.getQueryId().getId(), + ifTableExists, + ifColumnExists, + columnName, + getDataType(dataType)); + } + private boolean checkTimeColumnIdempotent( final TsTableColumnCategory category, final String columnName, final TSDataType dataType) { if (category == TsTableColumnCategory.TIME 
|| columnName.equals(TsTable.TIME_COLUMN_NAME)) { diff --cc iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnSchema.java index 9ad6114aea9,371bd5c6dac..9b40ce2943f --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnSchema.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnSchema.java @@@ -77,5 -87,12 +87,14 @@@ public abstract class TsTableColumnSche this.dataType = dataType; } + public abstract TsTableColumnSchema copy(); ++ + @Override + public String toString() { + return toStringHelper(this) + .add("columnName", columnName) + .add("dataType", dataType) + .add("props", props) + .toString(); + } } diff --cc pom.xml index ee9194757a7,40164fc32ed..96542b9446c --- a/pom.xml +++ b/pom.xml @@@ -167,7 -167,7 +167,7 @@@ <thrift.version>0.14.1</thrift.version> <xz.version>1.9</xz.version> <zstd-jni.version>1.5.6-3</zstd-jni.version> - <tsfile.version>2.0.0-typeconv-SNAPSHOT</tsfile.version> - <tsfile.version>2.0.0-250118-SNAPSHOT</tsfile.version> ++ <tsfile.version>2.1.0-250124-SNAPSHOT</tsfile.version> </properties> <!-- if we claim dependencies in dependencyManagement, then we do not claim
