This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch tiered_storage in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a56059aabb575766d28ce73b40eff601c699cc62 Merge: 0649dc76876 f8516ed32d8 Author: Jinrui.Zhang <[email protected]> AuthorDate: Tue May 23 13:39:02 2023 +0800 merge master .../org/apache/iotdb/db/qp/sql/IdentifierParser.g4 | 1 + .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 43 ++- .../heartbeat/DataNodeHeartbeatHandler.java | 25 +- .../iotdb/confignode/conf/ConfigNodeConfig.java | 49 +++ .../confignode/conf/ConfigNodeDescriptor.java | 16 + .../consensus/request/ConfigPhysicalPlan.java | 8 + .../consensus/request/ConfigPhysicalPlanType.java | 4 + .../request/read/pipe/task/ShowPipePlanV2.java | 25 +- .../coordinator/PipeHandleLeaderChangePlan.java | 74 +++++ .../response/pipe/task/PipeTableResp.java | 123 ++++++++ .../statemachine/ConfigRegionStateMachine.java | 7 +- .../iotdb/confignode/manager/ConfigManager.java | 34 +-- .../apache/iotdb/confignode/manager/IManager.java | 9 +- .../iotdb/confignode/manager/ProcedureManager.java | 40 +++ .../iotdb/confignode/manager/load/LoadManager.java | 1 + .../manager/load/balancer/RegionBalancer.java | 2 +- .../confignode/manager/load/cache/LoadCache.java | 11 +- .../manager/load/cache/route/RegionRouteCache.java | 4 + .../manager/load/service/HeartbeatService.java | 4 +- .../manager/load/service/StatisticsService.java | 9 +- .../iotdb/confignode/manager/node/NodeManager.java | 5 +- .../manager/partition/PartitionManager.java | 17 +- .../manager/partition/PartitionMetrics.java | 2 +- .../iotdb/confignode/manager/pipe/PipeManager.java | 10 + .../pipe/{ => plugin}/PipePluginCoordinator.java | 2 +- .../manager/pipe/runtime/PipeMetaSyncer.java | 95 ++++++ .../pipe/runtime/PipeRuntimeCoordinator.java | 98 ++++++ .../pipe/{ => task}/PipeTaskCoordinator.java | 37 ++- .../manager/{ => schema}/ClusterSchemaManager.java | 54 +++- .../schema/ClusterSchemaQuotaStatistics.java | 28 +- .../persistence/executor/ConfigPlanExecutor.java | 7 + .../partition/DatabasePartitionTable.java | 6 +- .../persistence/partition/PartitionInfo.java | 13 + .../persistence/pipe/PipePluginInfo.java | 57 +++- .../confignode/persistence/pipe/PipeTaskInfo.java | 127 ++++++-- .../persistence/pipe/PipeTaskOperation.java | 2 + .../procedure/env/ConfigNodeProcedureEnv.java | 9 +- .../pipe/plugin/CreatePipePluginProcedure.java | 2 +- .../impl/pipe/plugin/DropPipePluginProcedure.java | 2 +- .../runtime/PipeHandleLeaderChangeProcedure.java | 195 ++++++++++++ .../impl/pipe/runtime/PipeMetaSyncProcedure.java | 126 ++++++++ .../pipe/task/AbstractOperatePipeProcedureV2.java | 37 ++- .../impl/pipe/task/CreatePipeProcedureV2.java | 41 ++- .../impl/pipe/task/DropPipeProcedureV2.java | 14 +- .../impl/pipe/task/StartPipeProcedureV2.java | 14 +- .../impl/pipe/task/StopPipeProcedureV2.java | 14 +- .../impl/schema/DeleteDatabaseProcedure.java | 8 +- .../procedure/store/ProcedureFactory.java | 20 ++ .../confignode/procedure/store/ProcedureType.java | 12 +- .../iotdb/confignode/service/ConfigNode.java | 6 +- .../thrift/ConfigNodeRPCServiceProcessor.java | 7 - .../request/ConfigPhysicalPlanSerDeTest.java | 18 ++ .../manager/ClusterSchemaManagerTest.java | 2 + .../PipeHandleLeaderChangeProcedureTest.java | 63 ++++ .../pipe/runtime/PipeMetaSyncProcedureTest.java | 55 ++++ consensus/pom.xml | 2 +- .../consensus/iot/IoTConsensusServerImpl.java | 2 +- .../consensus/iot/logdispatcher/LogDispatcher.java | 2 +- .../consensus/iot/logdispatcher/SyncStatus.java | 8 +- .../consensus/iot/wal/ConsensusReqReader.java | 3 + .../ratis/ApplicationStateMachineProxy.java | 10 +- .../apache/iotdb/consensus/ratis/RatisClient.java | 18 +- .../iotdb/consensus/ratis/RatisConsensus.java | 3 +- .../apache/iotdb/consensus/ratis/utils/Utils.java | 17 ++ .../iot/logdispatcher/SyncStatusTest.java | 10 +- .../consensus/iot/util/FakeConsensusReqReader.java | 5 + distribution/pom.xml | 14 + distribution/src/assembly/spark-connector.xml | 48 +++ .../Ecosystem-Integration/Grafana-Plugin.md | 8 +- .../UserGuide/Ecosystem-Integration/Spark-IoTDB.md | 273 +++++++++-------- docs/UserGuide/Monitor-Alert/Metric-Tool.md | 8 +- docs/UserGuide/Operate-Metadata/Node.md | 4 +- docs/UserGuide/Operate-Metadata/Timeseries.md | 89 ++++-- docs/UserGuide/QuickStart/QuickStart.md | 2 +- docs/UserGuide/Reference/Common-Config-Manual.md | 68 ++++- docs/UserGuide/Reference/Keywords.md | 1 + docs/UserGuide/Reference/Status-Codes.md | 2 + .../Ecosystem-Integration/Grafana-Plugin.md | 6 +- .../UserGuide/Ecosystem-Integration/Spark-IoTDB.md | 261 ++++++++-------- docs/zh/UserGuide/Monitor-Alert/Metric-Tool.md | 24 +- docs/zh/UserGuide/Operate-Metadata/Node.md | 4 +- docs/zh/UserGuide/Operate-Metadata/Timeseries.md | 84 +++++- docs/zh/UserGuide/QuickStart/QuickStart.md | 3 +- .../zh/UserGuide/Reference/Common-Config-Manual.md | 66 ++++- docs/zh/UserGuide/Reference/Keywords.md | 1 + docs/zh/UserGuide/Reference/Status-Codes.md | 258 ++++++++-------- integration-test/pom.xml | 18 ++ .../iotdb/it/env/cluster/MppCommonConfig.java | 12 + .../it/env/cluster/MppSharedCommonConfig.java | 14 + .../iotdb/it/env/remote/RemoteCommonConfig.java | 10 + .../org/apache/iotdb/it/utils/TsFileGenerator.java | 42 +++ .../org/apache/iotdb/itbase/env/CommonConfig.java | 4 + .../org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java | 18 ++ .../db/it/aligned/IoTDBAlignedDataDeletionIT.java | 17 ++ .../db/it/last/IoTDBLastQueryLastCacheIT.java | 17 +- .../apache/iotdb/db/it/orderBy/IoTDBOrderByIT.java | 26 ++ .../db/it/schema/IoTDBClusterDeviceQuotaIT.java | 44 +++ .../it/schema/IoTDBClusterMeasurementQuotaIT.java | 126 ++++++++ .../db/it/schema/IoTDBDeactivateTemplateIT.java | 22 ++ .../iotdb/db/it/schema/IoTDBExtendTemplateIT.java | 9 + .../iotdb/db/it/schema/IoTDBMetadataFetchIT.java | 103 ++++++- .../iotdb/db/it/schema/IoTDBSchemaTemplateIT.java | 45 +++ .../db/it/schema/IoTDBSortedShowTimeseriesIT.java | 3 +- .../apache/iotdb/db/it/schema/IoTDBTagAlterIT.java | 10 +- .../org/apache/iotdb/db/it/schema/IoTDBTagIT.java | 20 +- .../session/it/IoTDBSessionSchemaTemplateIT.java | 107 +++++++ .../org/apache/iotdb/spark/it/AbstractTest.java | 55 ++++ .../java/org/apache/iotdb/spark/it/ReadTest.java | 145 +++++++++ .../test/java/org/apache/iotdb/spark/it/Utils.java | 96 ++++++ .../java/org/apache/iotdb/spark/it/WriteTest.java | 116 ++++++++ .../metrics/metricsets/jvm/JvmMemoryMetrics.java | 6 +- .../resources/conf/iotdb-common.properties | 51 +++- .../iotdb/commons/concurrent/ThreadName.java | 1 + .../apache/iotdb/commons/conf/CommonConfig.java | 6 +- .../iotdb/commons/conf/CommonDescriptor.java | 8 - .../org/apache/iotdb/commons/path/AlignedPath.java | 15 +- .../apache/iotdb/commons/path/MeasurementPath.java | 31 +- .../org/apache/iotdb/commons/path/PartialPath.java | 47 ++- .../apache/iotdb/commons/path/PathPatternNode.java | 2 +- .../apache/iotdb/commons/path/PathPatternTree.java | 34 ++- .../apache/iotdb/commons/path/PathPatternUtil.java | 52 ++++ .../iotdb/commons/path/fa/nfa/SimpleNFA.java | 3 +- .../pipe/plugin/builtin/BuiltinPipePlugin.java | 6 +- .../{DefaultCollector.java => IoTDBCollector.java} | 2 +- .../builtin/connector/DoNothingConnector.java | 4 +- ...ingConnector.java => IoTDBThriftConnector.java} | 28 +- .../builtin/processor/DoNothingProcessor.java | 11 +- .../iotdb/commons/pipe/task/meta/PipeMeta.java | 9 +- .../commons/pipe/task/meta/PipeMetaKeeper.java | 4 + .../commons/pipe/task/meta/PipeRuntimeMeta.java | 27 +- .../commons/pipe/task/meta/PipeStaticMeta.java | 88 ++++-- .../iotdb/commons/pipe/task/meta/PipeTaskMeta.java | 59 +++- .../commons/schema/ClusterSchemaQuotaLevel.java | 15 +- .../iotdb/commons/schema/filter/SchemaFilter.java | 75 +++++ .../commons/schema/filter/SchemaFilterType.java | 42 +-- .../commons/schema/filter/SchemaFilterVisitor.java | 59 ++++ .../commons/schema/filter/impl/DataTypeFilter.java | 65 ++++ .../schema/filter/impl/PathContainsFilter.java | 64 ++++ .../commons/schema/filter/impl/TagFilter.java | 83 ++++++ .../commons/schema/view}/LogicalViewSchema.java | 23 +- .../view/viewExpression/ViewExpression.java | 56 ++-- .../view/viewExpression/ViewExpressionType.java | 2 +- .../binary/BinaryViewExpression.java | 6 +- .../binary/arithmetic/AdditionViewExpression.java | 8 +- .../arithmetic/ArithmeticBinaryViewExpression.java | 8 +- .../binary/arithmetic/DivisionViewExpression.java | 8 +- .../binary/arithmetic/ModuloViewExpression.java | 8 +- .../arithmetic/MultiplicationViewExpression.java | 8 +- .../arithmetic/SubtractionViewExpression.java | 8 +- .../compare/CompareBinaryViewExpression.java | 8 +- .../binary/compare/EqualToViewExpression.java | 8 +- .../binary/compare/GreaterEqualViewExpression.java | 8 +- .../binary/compare/GreaterThanViewExpression.java | 8 +- .../binary/compare/LessEqualViewExpression.java | 8 +- .../binary/compare/LessThanViewExpression.java | 8 +- .../binary/compare/NonEqualViewExpression.java | 8 +- .../binary/logic/LogicAndViewExpression.java | 8 +- .../binary/logic/LogicBinaryViewExpression.java | 8 +- .../binary/logic/LogicOrViewExpression.java | 8 +- .../viewExpression/leaf/ConstantViewOperand.java | 6 +- .../view/viewExpression/leaf/LeafViewOperand.java | 4 +- .../view/viewExpression/leaf/NullViewOperand.java | 6 +- .../viewExpression/leaf/TimeSeriesViewOperand.java | 6 +- .../viewExpression/leaf/TimestampViewOperand.java | 6 +- .../multi/FunctionViewExpression.java | 8 +- .../ternary/BetweenViewExpression.java | 8 +- .../ternary/TernaryViewExpression.java | 6 +- .../viewExpression/unary/InViewExpression.java | 8 +- .../viewExpression/unary/IsNullViewExpression.java | 8 +- .../viewExpression/unary/LikeViewExpression.java | 8 +- .../unary/LogicNotViewExpression.java | 8 +- .../unary/NegationViewExpression.java | 8 +- .../unary/RegularViewExpression.java | 8 +- .../viewExpression/unary/UnaryViewExpression.java | 6 +- .../visitor/ViewExpressionVisitor.java | 70 ++--- .../apache/iotdb/commons/service/ServiceType.java | 6 +- .../iotdb/commons/path/PathPatternTreeTest.java | 61 +++- .../commons/pipe/task/meta/PipeMetaDeSerTest.java | 77 +++++ .../org/apache/iotdb/pipe/api/PipeCollector.java | 1 + .../org/apache/iotdb/pipe/api/PipeConnector.java | 14 +- .../org/apache/iotdb/pipe/api/PipeProcessor.java | 10 +- .../iotdb/pipe/api/collector/EventCollector.java | 39 +-- .../org/apache/iotdb/pipe/api/event/Event.java | 6 +- .../event/dml/insertion/TabletInsertionEvent.java | 6 - .../event/dml/insertion/TsFileInsertionEvent.java | 6 - pom.xml | 35 +-- .../schemaregion/rocksdb/RSchemaRegion.java | 15 +- .../metadata/tagSchemaRegion/TagSchemaRegion.java | 15 +- .../apache/iotdb/db/client/ConfigNodeClient.java | 17 -- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 120 +++++++- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 67 ++++- .../org/apache/iotdb/db/conf/OperationType.java | 3 +- .../db/consensus/DataRegionConsensusImpl.java | 9 + .../statemachine/DataRegionStateMachine.java | 3 +- .../org/apache/iotdb/db/engine/StorageEngine.java | 41 +++ .../iotdb/db/engine/cache/BloomFilterCache.java | 2 +- .../db/engine/cache/CacheHitRatioMonitor.java | 2 +- .../db/engine/cache/TimeSeriesMetadataCache.java | 13 +- .../compaction/constant/CompactionTaskStatus.java} | 11 +- .../compaction/constant/CompactionTaskType.java} | 10 +- .../exception/CompactionExceptionHandler.java | 2 +- .../execute/task/AbstractCompactionTask.java | 1 - .../execute/task/CrossSpaceCompactionTask.java | 39 +-- .../execute/task/InnerSpaceCompactionTask.java | 6 +- .../compaction/execute/utils/CompactionUtils.java | 22 +- .../execute/utils/MultiTsFileDeviceIterator.java | 16 +- .../compaction/schedule/CompactionScheduler.java | 49 ++- .../compaction/schedule/CompactionTaskManager.java | 71 ++++- .../compaction/schedule/CompactionWorker.java | 3 - .../impl/RewriteCrossSpaceCompactionSelector.java | 26 +- .../impl/SizeTieredCompactionSelector.java | 2 +- .../utils/CrossSpaceCompactionCandidate.java | 6 +- .../memtable/AlignedWritableMemChunkGroup.java | 10 +- .../db/engine/memtable/WritableMemChunkGroup.java | 10 +- .../iotdb/db/engine/migration/MigrationTask.java | 2 +- .../db/engine/migration/MigrationTaskManager.java | 2 +- .../db/engine/settle/SettleRequestHandler.java | 2 +- .../iotdb/db/engine/storagegroup/DataRegion.java | 327 +++++---------------- .../db/engine/storagegroup/TsFileManager.java | 1 + .../db/engine/storagegroup/TsFileResource.java | 40 ++- .../db/engine/storagegroup/TsFileResourceList.java | 62 ++-- .../engine/storagegroup/TsFileResourceStatus.java | 3 +- .../metadata/SchemaQuotaExceededException.java | 37 +-- .../db/exception/query/OutOfTTLException.java | 4 +- .../db/metadata/cache/DataNodeSchemaCache.java | 18 +- .../metadata/cache/DataNodeSchemaCacheMetrics.java | 31 +- .../cache/DeviceUsingTemplateSchemaCache.java | 50 +++- .../iotdb/db/metadata/cache/SchemaCacheEntry.java | 36 ++- .../db/metadata/cache/TimeSeriesSchemaCache.java | 9 +- .../dualkeycache/impl/DualKeyCacheBuilder.java | 18 +- .../dualkeycache/impl/DualKeyCachePolicy.java | 3 +- .../dualkeycache/impl/FIFOCacheEntryManager.java | 189 ++++++++++++ .../metadata/mnode/mem/impl/LogicalViewMNode.java | 3 +- .../metadata/mnode/mem/info/LogicalViewInfo.java | 12 +- .../iotdb/db/metadata/mtree/ConfigMTree.java | 8 +- .../db/metadata/mtree/MTreeBelowSGCachedImpl.java | 33 ++- .../db/metadata/mtree/MTreeBelowSGMemoryImpl.java | 32 +- .../mtree/snapshot/MemMTreeSnapshotUtil.java | 8 +- .../db/metadata/mtree/store/CachedMTreeStore.java | 3 +- .../db/metadata/mtree/store/MemMTreeStore.java | 4 + .../db/metadata/mtree/traverser/Traverser.java | 7 + .../mtree/traverser/basic/EntityTraverser.java | 31 +- .../traverser/basic/MeasurementTraverser.java | 36 ++- .../impl/SchemaRegionPlanDeserializer.java | 2 +- .../impl/SchemaRegionPlanSerializer.java | 2 +- .../impl/SchemaRegionPlanTxtSerializer.java | 2 +- .../impl/read/SchemaRegionReadPlanFactory.java | 37 +-- .../impl/read/ShowDevicesPlanImpl.java | 15 +- .../impl/read/ShowTimeSeriesPlanImpl.java | 37 +-- .../impl/write/CreateLogicalViewPlanImpl.java | 2 +- .../plan/schemaregion/read/IShowDevicesPlan.java | 4 + .../schemaregion/read/IShowTimeSeriesPlan.java | 7 +- .../schemaregion/result/ShowTimeSeriesResult.java | 3 +- .../schemaregion/write/ICreateLogicalViewPlan.java | 2 +- .../rescon/DataNodeSchemaQuotaManager.java | 92 ++++++ .../db/metadata/schemaregion/ISchemaRegion.java | 16 +- .../db/metadata/schemaregion/SchemaEngine.java | 92 ++++-- .../schemaregion/SchemaRegionMemoryImpl.java | 88 ++---- .../schemaregion/SchemaRegionSchemaFileImpl.java | 76 ++--- .../apache/iotdb/db/metadata/tag/TagManager.java | 16 +- .../metadata/template/ClusterTemplateManager.java | 4 +- .../template/alter/TemplateExtendInfo.java | 78 +++++ .../apache/iotdb/db/metadata/utils/MetaUtils.java | 3 +- .../visitor/GetSourcePathsVisitor.java | 13 +- .../visitor/TransformToExpressionVisitor.java | 67 ++--- .../metadata/visitor/SchemaExecutionVisitor.java | 2 +- .../mpp/common/schematree/ClusterSchemaTree.java | 6 +- .../db/mpp/common/schematree/DeviceSchemaInfo.java | 18 +- .../common/schematree/IMeasurementSchemaInfo.java | 8 +- .../common/schematree/MeasurementSchemaInfo.java | 22 +- .../schematree/node/SchemaMeasurementNode.java | 42 ++- .../iotdb/db/mpp/execution/driver/Driver.java | 15 +- .../execution/executor/RegionWriteExecutor.java | 124 +++++--- .../process/join/HorizontallyConcatOperator.java | 8 +- .../operator/schema/source/DeviceSchemaSource.java | 13 +- .../schema/source/SchemaSourceFactory.java | 27 +- .../schema/source/TimeSeriesSchemaSource.java | 17 +- .../mpp/metric/TimeSeriesMetadataCacheMetrics.java | 4 +- .../org/apache/iotdb/db/mpp/plan/Coordinator.java | 2 +- .../apache/iotdb/db/mpp/plan/analyze/Analysis.java | 16 +- .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 100 ++++++- .../analyze/schema/AutoCreateSchemaExecutor.java | 10 +- .../analyze/schema/ClusterSchemaFetchExecutor.java | 7 +- .../plan/analyze/schema/ClusterSchemaFetcher.java | 50 +++- .../mpp/plan/analyze/schema/SchemaValidator.java | 27 +- .../db/mpp/plan/execution/IQueryExecution.java | 1 + .../db/mpp/plan/execution/QueryExecution.java | 13 +- .../config/executor/ClusterConfigTaskExecutor.java | 26 +- .../visitor/TransformToViewExpressionVisitor.java | 52 ++-- .../iotdb/db/mpp/plan/parser/ASTVisitor.java | 107 ++++--- .../db/mpp/plan/planner/LogicalPlanBuilder.java | 47 ++- .../db/mpp/plan/planner/LogicalPlanVisitor.java | 87 +++--- .../db/mpp/plan/planner/OperatorTreeGenerator.java | 21 +- .../plan/node/load/LoadSingleTsFileNode.java | 4 + .../node/metedata/read/DevicesSchemaScanNode.java | 23 +- .../metedata/read/LevelTimeSeriesCountNode.java | 47 +-- .../node/metedata/read/TimeSeriesCountNode.java | 43 +-- .../metedata/read/TimeSeriesSchemaScanNode.java | 95 ++---- .../node/metedata/write/CreateLogicalViewNode.java | 2 +- .../write/InternalCreateMultiTimeSeriesNode.java | 1 + .../planner/plan/node/write/BatchInsertNode.java | 33 --- .../plan/node/write/InsertMultiTabletsNode.java | 22 +- .../plan/planner/plan/node/write/InsertNode.java | 131 +-------- .../planner/plan/node/write/InsertRowNode.java | 171 ++--------- .../planner/plan/node/write/InsertRowsNode.java | 33 +-- .../plan/node/write/InsertRowsOfOneDeviceNode.java | 32 +- .../planner/plan/node/write/InsertTabletNode.java | 145 ++------- .../scheduler/FragmentInstanceDispatcherImpl.java | 5 +- .../plan/scheduler/load/LoadTsFileScheduler.java | 8 +- .../plan/statement/crud/InsertBaseStatement.java | 157 +++++++++- .../crud/InsertMultiTabletsStatement.java | 30 ++ .../plan/statement/crud/InsertRowStatement.java | 183 +++++++++++- .../crud/InsertRowsOfOneDeviceStatement.java | 42 +++ .../plan/statement/crud/InsertRowsStatement.java | 41 +++ .../plan/statement/crud/InsertTabletStatement.java | 155 +++++++++- .../db/mpp/plan/statement/crud/QueryStatement.java | 55 ++-- .../metadata/CountLevelTimeSeriesStatement.java | 30 +- .../metadata/CountTimeSeriesStatement.java | 30 +- .../statement/metadata/ShowDevicesStatement.java | 10 + .../metadata/ShowTimeSeriesStatement.java | 30 +- .../org/apache/iotdb/db/pipe/agent/PipeAgent.java | 12 + .../db/pipe/agent/plugin/PipePluginAgent.java | 2 +- .../IoTDBThriftReceiver.java} | 24 +- .../db/pipe/agent/receiver/PipeReceiverAgent.java | 90 ++++++ .../db/pipe/agent/runtime/MetaSyncScheduler.java | 22 -- .../iotdb/db/pipe/agent/runtime/PipeLauncher.java | 175 +++++++++++ .../db/pipe/agent/runtime/PipeRuntimeAgent.java | 47 ++- .../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 242 ++++++++++++++- .../db/pipe/config/PipeCollectorConstant.java | 5 +- .../apache/iotdb/db/pipe/config/PipeConfig.java | 22 ++ .../db/pipe/config/PipeConnectorConstant.java | 3 + .../core/collector/IoTDBDataRegionCollector.java | 39 ++- .../PipeHistoricalDataRegionTsFileCollector.java | 13 +- .../realtime/PipeRealtimeDataRegionCollector.java | 6 +- .../PipeRealtimeDataRegionHybridCollector.java | 78 +++-- .../realtime/assigner/PipeDataRegionAssigner.java | 19 +- .../listener/PipeInsertionDataNodeListener.java | 4 +- .../matcher/CachedSchemaPatternMatcher.java | 6 +- .../realtime/matcher/PipeDataRegionMatcher.java | 9 +- .../impl/iotdb/IoTDBThriftConnectorClient.java | 70 +++++ .../impl/iotdb/IoTDBThriftConnectorVersion.java} | 16 +- .../impl/iotdb/v1/IoTDBThriftConnectorV1.java | 231 +++++++++++++++ .../impl/iotdb/v1/IoTDBThriftReceiverV1.java | 275 +++++++++++++++++ .../connector/impl/iotdb/v1/PipeRequestType.java} | 38 ++- .../v1/reponse/PipeTransferFilePieceResp.java | 80 +++++ .../iotdb/v1/request/PipeTransferFilePieceReq.java | 88 ++++++ .../iotdb/v1/request/PipeTransferFileSealReq.java | 79 +++++ .../iotdb/v1/request/PipeTransferHandshakeReq.java | 71 +++++ .../v1/request/PipeTransferInsertNodeReq.java | 102 +++++++ .../PipeConnectorSubtaskLifeCycle.java | 2 +- .../{ => manager}/PipeConnectorSubtaskManager.java | 37 ++- .../iotdb/db/pipe/core/event/EnrichedEvent.java | 54 ++++ .../core/event/impl/PipeTabletInsertionEvent.java | 31 +- .../core/event/impl/PipeTsFileInsertionEvent.java | 85 +++++- .../event/realtime/PipeRealtimeCollectEvent.java | 20 +- .../realtime/PipeRealtimeCollectEventFactory.java | 8 +- .../event/view/collector/PipeEventCollector.java | 22 +- .../execution/executor/PipeSubtaskExecutor.java | 41 +-- .../execution/scheduler/PipeSubtaskScheduler.java | 89 ++++++ .../execution/scheduler/PipeTaskScheduler.java | 74 ----- .../db/pipe/resource/PipeFileResourceManager.java | 186 ++++++++++++ .../db/pipe/resource/PipeResourceManager.java | 16 +- .../iotdb/db/pipe/resource/PipeTsFileHolder.java | 22 -- .../iotdb/db/pipe/resource/PipeWALHolder.java | 22 -- .../org/apache/iotdb/db/pipe/task/PipeBuilder.java | 8 +- .../apache/iotdb/db/pipe/task/PipeTaskBuilder.java | 2 +- .../db/pipe/task/stage/PipeTaskCollectorStage.java | 50 +++- .../db/pipe/task/stage/PipeTaskConnectorStage.java | 11 +- .../db/pipe/task/stage/PipeTaskProcessorStage.java | 59 +++- .../db/pipe/task/subtask/PipeConnectorSubtask.java | 34 ++- .../db/pipe/task/subtask/PipeProcessorSubtask.java | 31 +- .../iotdb/db/pipe/task/subtask/PipeSubtask.java | 62 +++- .../java/org/apache/iotdb/db/service/DataNode.java | 103 +------ .../metrics/recorder/CompactionMetricsManager.java | 81 ++--- .../db/service/thrift/ProcessorWithMetrics.java | 6 +- .../service/thrift/impl/ClientRPCServiceImpl.java | 209 ++++++++----- .../impl/DataNodeInternalRPCServiceImpl.java | 23 +- .../db/sync/common/ClusterSyncInfoFetcher.java | 11 +- .../apache/iotdb/db/tools/IoTDBDataDirViewer.java | 4 +- .../iotdb/db/tools/TsFileResourcePrinter.java | 4 +- .../iotdb/db/tools/TsFileSplitByPartitionTool.java | 2 +- .../org/apache/iotdb/db/tools/TsFileSplitTool.java | 2 +- .../db/tools/settle/TsFileAndModSettleTool.java | 6 +- .../db/tools/validate/TsFileValidationTool.java | 16 +- .../org/apache/iotdb/db/utils/FileLoaderUtils.java | 2 +- .../org/apache/iotdb/db/utils/UpgradeUtils.java | 2 +- .../db/utils/datastructure/AlignedTVList.java | 6 +- .../datastructure/FixedPriorityBlockingQueue.java | 10 + .../iotdb/db/wal/buffer/AbstractWALBuffer.java | 26 +- .../org/apache/iotdb/db/wal/buffer/WALBuffer.java | 32 +- .../org/apache/iotdb/db/wal/buffer/WALEntry.java | 4 +- .../iotdb/db/wal/checkpoint/CheckpointManager.java | 118 ++++++-- .../iotdb/db/wal/checkpoint/MemTableInfo.java | 28 +- .../exception/MemTablePinException.java} | 16 +- .../exception/WALPipeException.java} | 16 +- .../apache/iotdb/db/wal/io/WALByteBufReader.java | 6 +- .../org/apache/iotdb/db/wal/node/WALFakeNode.java | 24 +- .../java/org/apache/iotdb/db/wal/node/WALNode.java | 57 +++- .../db/wal/recover/file/TsFilePlanRedoer.java | 4 - .../iotdb/db/wal/utils/WALEntryPosition.java | 157 ++++++++++ .../apache/iotdb/db/wal/utils/WALFileUtils.java | 11 + .../iotdb/db/wal/utils/WALInsertNodeCache.java | 170 +++++++++++ .../apache/iotdb/db/wal/utils/WALPipeHandler.java | 132 +++++++++ .../db/wal/utils/listener/WALFlushListener.java | 13 +- .../iotdb/db/engine/cache/ChunkCacheTest.java | 4 +- .../engine/compaction/AbstractCompactionTest.java | 62 ++-- .../FastCrossCompactionPerformerTest.java | 5 + .../db/engine/compaction/MemoryControlTest.java | 75 +++++ .../ReadPointCompactionPerformerTest.java | 16 +- .../cross/CrossSpaceCompactionExceptionTest.java | 7 +- .../cross/CrossSpaceCompactionSelectorTest.java | 171 +++++++++++ .../CrossSpaceCompactionWithFastPerformerTest.java | 6 +- ...eCompactionWithFastPerformerValidationTest.java | 20 +- ...sSpaceCompactionWithReadPointPerformerTest.java | 6 +- .../db/engine/compaction/cross/MergeTest.java | 6 +- .../cross/RewriteCompactionFileSelectorTest.java | 46 +-- .../RewriteCrossSpaceCompactionRecoverTest.java | 10 +- ...eCrossSpaceCompactionWithFastPerformerTest.java | 21 +- ...sSpaceCompactionWithReadPointPerformerTest.java | 21 +- .../inner/AbstractInnerSpaceCompactionTest.java | 4 +- .../inner/InnerCompactionMoreDataTest.java | 2 +- .../compaction/inner/InnerCompactionTest.java | 4 +- .../ReadChunkCompactionPerformerAlignedTest.java | 76 +++++ .../SizeTieredCompactionRecoverTest.java | 52 ++-- .../SizeTieredCompactionSelectorTest.java | 38 +++ .../inner/sizetiered/SizeTieredCompactionTest.java | 10 +- .../compaction/utils/CompactionCheckerUtils.java | 4 + .../utils/MultiTsFileDeviceIteratorTest.java | 26 +- .../db/engine/snapshot/IoTDBSnapshotTest.java | 2 +- .../db/engine/storagegroup/DataRegionTest.java | 16 +- .../engine/storagegroup/FakedTsFileResource.java | 2 +- .../storagegroup/TsFileResourceListTest.java | 110 +++++++ .../db/engine/storagegroup/TsFileResourceTest.java | 2 +- .../cache/dualkeycache/DualKeyCacheTest.java | 17 ++ .../schemaRegion/SchemaRegionBasicTest.java | 167 ++++++++++- .../schemaRegion/SchemaRegionTestUtil.java | 4 +- .../metadata/view/ViewExpressionToStringTest.java | 16 +- .../execution/operator/AlignedSeriesTestUtil.java | 6 +- .../mpp/execution/operator/OperatorMemoryTest.java | 34 +++ .../schema/SchemaFetchScanOperatorTest.java | 3 +- .../schema/SchemaQueryScanOperatorTest.java | 4 +- .../db/mpp/plan/analyze/FakeSchemaFetcherImpl.java | 7 +- .../iotdb/db/mpp/plan/plan/LogicalPlannerTest.java | 120 +++++++- .../distribution/DistributionPlannerBasicTest.java | 10 +- .../read/DeviceSchemaScanNodeSerdeTest.java | 3 +- .../metadata/read/SchemaCountNodeSerdeTest.java | 8 +- .../read/TimeSeriesSchemaScanNodeSerdeTest.java | 3 +- .../collector/CachedSchemaPatternMatcherTest.java | 12 +- .../core/collector/PipeRealtimeCollectTest.java | 73 +++-- .../pipe/core/connector/PipeThriftRequestTest.java | 118 ++++++++ .../pipe/resource/PipeFileResourceManagerTest.java | 218 ++++++++++++++ .../AlignedSeriesScanLimitOffsetPushDownTest.java | 10 +- .../query/reader/series/SeriesReaderTestUtil.java | 6 +- .../series/SeriesScanLimitOffsetPushDownTest.java | 8 +- .../iotdb/db/rescon/ResourceManagerTest.java | 12 +- .../db/utils/datastructure/VectorTVListTest.java | 11 + .../org/apache/iotdb/db/wal/io/WALFileTest.java | 25 +- .../iotdb/db/wal/node/ConsensusReqReaderTest.java | 27 +- .../org/apache/iotdb/db/wal/node/WALNodeTest.java | 33 +-- .../iotdb/db/wal/node/WALPipeHandlerTest.java | 256 ++++++++++++++++ .../db/wal/recover/file/TsFilePlanRedoerTest.java | 32 +- .../file/UnsealedTsFileRecoverPerformerTest.java | 5 +- .../iotdb/db/wal/utils/WALInsertNodeCacheTest.java | 167 +++++++++++ .../java/org/apache/iotdb/rpc/TSStatusCode.java | 11 +- .../java/org/apache/iotdb/session/Session.java | 4 + .../org/apache/iotdb/session/pool/SessionPool.java | 319 +++++++++++++++++++- spark-iotdb-connector/pom.xml | 84 +----- spark-iotdb-connector/{ => scala_2.11}/pom.xml | 108 +++---- spark-iotdb-connector/{ => scala_2.12}/pom.xml | 103 +++---- .../src/test/resources/iotdb-datanode.properties | 23 -- .../apache/iotdb/spark/db/EnvironmentUtils.java | 264 ----------------- .../org/apache/iotdb/spark/db/IoTDBTest.scala | 150 ---------- .../org/apache/iotdb/spark/db/IoTDBWriteTest.scala | 118 -------- .../iotdb/spark/db/unit/DataFrameToolsTest.scala | 96 ------ spark-tsfile/pom.xml | 50 +--- .../iotdb/spark/tsfile/qp/common/SQLConstant.java | 118 ++++---- .../src/main/thrift/confignode.thrift | 14 +- thrift/src/main/thrift/client.thrift | 13 + thrift/src/main/thrift/datanode.thrift | 18 +- .../java/org/apache/iotdb/tsfile/utils/Binary.java | 20 +- .../tsfile/write/schema/IMeasurementSchema.java | 4 + .../tsfile/write/schema/MeasurementSchema.java | 10 + .../tsfile/write/schema/MeasurementSchemaType.java | 19 +- .../write/schema/VectorMeasurementSchema.java | 10 + 484 files changed, 13275 insertions(+), 4969 deletions(-) diff --cc node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index 6a9360b93c6,d156287e1f3..d29ec62027f --- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java @@@ -71,8 -71,7 +71,9 @@@ public enum ThreadName PIPE_PROCESSOR_EXECUTOR_POOL("Pipe-Processor-Executor-Pool"), PIPE_CONNECTOR_EXECUTOR_POOL("Pipe-Connector-Executor-Pool"), PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL("Pipe-SubTask-Callback-Executor-Pool"), + MIGRATION_SCHEDULER("Migration-Scheduler"), + MIGRATION("Migration-Executor-Pool"), + PIPE_META_SYNC_SERVICE("Pipe-Meta-Sync-Service"), ; private final String name; diff --cc node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java index b7a6e3aceaa,7bde2af3102..d350305af87 --- a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java @@@ -75,9 -75,8 +75,9 @@@ public enum ServiceType IOT_CONSENSUS_SERVICE("IoTConsensus Service", "IoTConsensusRPCService"), PIPE_PLUGIN_CLASSLOADER_MANAGER_SERVICE( "Pipe Plugin Classloader Manager Service", "PipePluginClassLoader"), - MLNode_RPC_SERVICE("Rpc Service for MLNode", "MLNodeRPCService"), - MIGRATION_SERVICE("Migration Manager", "Migration Manager"); - ++ MIGRATION_SERVICE("Migration Manager", "Migration Manager"), + PIPE_RUNTIME_AGENT("Pipe Runtime Agent", "PipeRuntimeAgent"), + MLNode_RPC_SERVICE("Rpc Service for MLNode", "MLNodeRPCService"); private final String name; private final String jmxName; diff --cc server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 677f7c3d0ec,838e8e3e232..406f1dd4d09 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@@ -1106,16 -1128,40 +1129,48 @@@ public class IoTDBConfig */ private String RateLimiterType = "FixedIntervalRateLimiter"; + private int migrateThreadCount = 3; + + private String objectStorageName = "aws_s3"; + private String objectStorageBucket = "iotdb"; + private String objectStorageEndpoiont = "yourEndpoint"; + private String objectStorageAccessKey = "yourAccessKey"; + private String objectStorageAccessSecret = "yourAccessSecret"; + IoTDBConfig() {} + public int getMaxLogEntriesNumPerBatch() { + return maxLogEntriesNumPerBatch; + } + + public int getMaxSizePerBatch() { + return maxSizePerBatch; + } + + public int getMaxPendingBatchesNum() { + return maxPendingBatchesNum; + } + + public double getMaxMemoryRatioForQueue() { + return maxMemoryRatioForQueue; + } + + public void setMaxLogEntriesNumPerBatch(int maxLogEntriesNumPerBatch) { + this.maxLogEntriesNumPerBatch = maxLogEntriesNumPerBatch; + } + + public void setMaxSizePerBatch(int maxSizePerBatch) { + this.maxSizePerBatch = maxSizePerBatch; + } + + public void setMaxPendingBatchesNum(int maxPendingBatchesNum) { + this.maxPendingBatchesNum = maxPendingBatchesNum; + } + + public void setMaxMemoryRatioForQueue(double maxMemoryRatioForQueue) { + this.maxMemoryRatioForQueue = maxMemoryRatioForQueue; + } + public float getUdfMemoryBudgetInMB() { return udfMemoryBudgetInMB; } @@@ -3819,31 -3894,19 +3909,47 @@@ return sortTmpDir; } + public int getMigrateThreadCount() { + return migrateThreadCount; + } + + public void setMigrateThreadCount(int migrateThreadCount) { + this.migrateThreadCount = migrateThreadCount; + } + + public String getObjectStorageName() { + return objectStorageName; + } + + public String getObjectStorageBucket() { + return objectStorageBucket; + } + + public String getObjectStorageEndpoiont() { + return objectStorageEndpoiont; + } + + public String getObjectStorageAccessKey() { + return objectStorageAccessKey; + } + + public String getObjectStorageAccessSecret() { + return objectStorageAccessSecret; + } ++ + public String getClusterSchemaLimitLevel() { + return clusterSchemaLimitLevel; + } + + public void setClusterSchemaLimitLevel(String clusterSchemaLimitLevel) { + this.clusterSchemaLimitLevel = clusterSchemaLimitLevel; + } + + public long getClusterSchemaLimitThreshold() { + return clusterSchemaLimitThreshold; + } + + public void setClusterSchemaLimitThreshold(long clusterSchemaLimitThreshold) { + this.clusterSchemaLimitThreshold = clusterSchemaLimitThreshold; + } } diff --cc server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 5d57b5bd88f,e0bd3c536f3..595b7d71915 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@@ -357,15 -369,9 +368,14 @@@ public class IoTDBDescriptor conf.setQueryDir( FilePathUtils.regularizePath(conf.getSystemDir() + IoTDBConstant.QUERY_FOLDER_NAME)); -- - conf.setDataDirs( - properties.getProperty("dn_data_dirs", String.join(",", conf.getDataDirs())).split(",")); + String[] defaultTierDirs = new String[conf.getTierDataDirs().length]; + for (int i = 0; i < defaultTierDirs.length; ++i) { + defaultTierDirs[i] = String.join(",", conf.getTierDataDirs()[i]); + } + conf.setTierDataDirs( + parseDataDirs( + properties.getProperty( + "dn_data_dirs", String.join(IoTDBConstant.TIER_SEPARATOR, defaultTierDirs)))); conf.setConsensusDir(properties.getProperty("dn_consensus_dir", conf.getConsensusDir())); diff --cc server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java index bb0a2c8ee44,805abe2f1d8..105e03637c4 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java @@@ -114,7 -114,7 +114,7 @@@ public class SizeTieredCompactionSelect selectedFileSize = 0L; continue; } - if (currentFile.getStatus() != TsFileResourceStatus.CLOSED || currentFile.isMigrating()) { - if (currentFile.getStatus() != TsFileResourceStatus.NORMAL) { ++ if (currentFile.getStatus() != TsFileResourceStatus.NORMAL || currentFile.isMigrating()) { selectedFileList.clear(); selectedFileSize = 0L; continue; diff --cc server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java index 329c778f120,9125c9bd8a3..42a91acc792 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java @@@ -142,9 -142,7 +142,9 @@@ public class CrossSpaceCompactionCandid private List<TsFileResourceCandidate> filterUnseqResource(List<TsFileResource> unseqResources) { List<TsFileResourceCandidate> ret = new ArrayList<>(); for (TsFileResource resource : unseqResources) { - if (resource.getStatus() != TsFileResourceStatus.CLOSED - if (resource.getStatus() != TsFileResourceStatus.NORMAL || !resource.getTsFile().exists()) { ++ if (resource.getStatus() != TsFileResourceStatus.NORMAL + || resource.isMigrating() + || !resource.getTsFile().exists()) { break; } else if (resource.stillLives(ttlLowerBound)) { ret.add(new TsFileResourceCandidate(resource)); @@@ -200,8 -198,7 +200,8 @@@ // although we do the judgement here, the task should be validated before executing because // the status of file may be changed after the task is submitted to queue this.isValidCandidate = - tsFileResource.getStatus() == TsFileResourceStatus.CLOSED + tsFileResource.getStatus() == TsFileResourceStatus.NORMAL + && !tsFileResource.isMigrating() && tsFileResource.getTsFile().exists(); } diff --cc server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java index 0571d9d47f7,00000000000..998e903925c mode 100644,000000..100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java @@@ -1,93 -1,0 +1,93 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.engine.migration; + +import org.apache.iotdb.db.engine.modification.ModificationFile; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus; +import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; +import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory; +import org.apache.iotdb.tsfile.utils.FSUtils; + +import java.io.File; + +public abstract class MigrationTask implements Runnable { + protected static final FSFactory fsFactory = FSFactoryProducer.getFSFactory(); + + protected final MigrationCause cause; + protected final TsFileResource tsFile; + protected final String targetDir; + + protected final File srcTsFile; + protected final File destTsFile; + protected final File srcResourceFile; + protected final File destResourceFile; + protected final File srcModsFile; + protected final File destModsFile; + + MigrationTask(MigrationCause cause, TsFileResource tsFile, String targetDir) { + this.cause = cause; + this.tsFile = tsFile; + this.targetDir = targetDir; + this.srcTsFile = tsFile.getTsFile(); + this.destTsFile = fsFactory.getFile(targetDir, tsFile.getTsFile().getName()); + this.srcResourceFile = + fsFactory.getFile( + srcTsFile.getParentFile(), srcTsFile.getName() + TsFileResource.RESOURCE_SUFFIX); + this.destResourceFile = + fsFactory.getFile(targetDir, tsFile.getTsFile().getName() + TsFileResource.RESOURCE_SUFFIX); + this.srcModsFile = + fsFactory.getFile( + srcTsFile.getParentFile(), srcTsFile.getName() + ModificationFile.FILE_SUFFIX); + this.destModsFile = + fsFactory.getFile(targetDir, tsFile.getTsFile().getName() + ModificationFile.FILE_SUFFIX); + } + + public static MigrationTask newTask( + MigrationCause cause, TsFileResource sourceTsFile, String targetDir) { + if (FSUtils.isLocal(targetDir)) { + return new LocalMigrationTask(cause, sourceTsFile, targetDir); + } else { + return new RemoteMigrationTask(cause, sourceTsFile, targetDir); + } + } + + @Override + public void run() { + if (canMigrate()) { + tsFile.setIsMigrating(true); + if (!canMigrate()) { + tsFile.setIsMigrating(false); + return; + } + } else { + return; + } + + migrate(); + + tsFile.setIsMigrating(false); + } + + protected boolean canMigrate() { - return tsFile.getStatus() == TsFileResourceStatus.CLOSED; ++ return tsFile.getStatus() == TsFileResourceStatus.NORMAL; + } + + public abstract void migrate(); +} diff --cc server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java index 8f3b2872a34,00000000000..4bd9712c263 mode 100644,000000..100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java @@@ -1,197 -1,0 +1,197 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.engine.migration; + +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.concurrent.ThreadName; +import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.exception.StartupException; +import org.apache.iotdb.commons.service.IService; +import org.apache.iotdb.commons.service.ServiceType; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.conf.directories.TierManager; +import org.apache.iotdb.db.engine.StorageEngine; +import org.apache.iotdb.db.engine.storagegroup.DataRegion; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus; +import org.apache.iotdb.db.utils.DateTimeUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class MigrationTaskManager implements IService { + private static final Logger logger = LoggerFactory.getLogger(MigrationTaskManager.class); + private static final IoTDBConfig iotdbConfig = IoTDBDescriptor.getInstance().getConfig(); + private static final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig(); + private static final TierManager tierManager = TierManager.getInstance(); + private static final long CHECK_INTERVAL_IN_SECONDS = 10 * 60; + private static final double TIER_DISK_SPACE_WARN_THRESHOLD = + commonConfig.getDiskSpaceWarningThreshold() + 0.1; + private static final double TIER_DISK_SPACE_SAFE_THRESHOLD = + commonConfig.getDiskSpaceWarningThreshold() + 0.2; + /** single thread to schedule */ + private ScheduledExecutorService scheduler; + /** single thread to sync syncingBuffer to disk */ + private ExecutorService workers; + + @Override + public void start() throws StartupException { + if (iotdbConfig.getTierDataDirs().length == 1) { + logger.info("tiered storage status: disable"); + return; + } + scheduler = + IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( + ThreadName.MIGRATION_SCHEDULER.getName()); + workers = + IoTDBThreadPoolFactory.newFixedThreadPool( + iotdbConfig.getMigrateThreadCount(), ThreadName.MIGRATION.getName()); + ScheduledExecutorUtil.safelyScheduleAtFixedRate( + scheduler, + () -> new MigrationScheduleTask().run(), + CHECK_INTERVAL_IN_SECONDS, + CHECK_INTERVAL_IN_SECONDS, + TimeUnit.SECONDS); + } + + private class MigrationScheduleTask implements Runnable { + private final long[] tierDiskTotalSpace = tierManager.getTierDiskTotalSpace(); + private final long[] tierDiskUsableSpace = tierManager.getTierDiskUsableSpace(); + private final Set<Integer> needMigrationTiers = new HashSet<>(); + + public MigrationScheduleTask() { + for (int i = 0; i < tierManager.getTiersNum(); i++) { + double usage = tierDiskUsableSpace[i] * 1.0 / tierDiskTotalSpace[i]; + if (usage <= TIER_DISK_SPACE_WARN_THRESHOLD) { + needMigrationTiers.add(i); + } + } + } + + @Override + public void run() { + schedule(); + } + + private void schedule() { + // submit migration tasks + for (DataRegion dataRegion : StorageEngine.getInstance().getAllDataRegions()) { + List<TsFileResource> tsfiles = dataRegion.getSequenceFileList(); + tsfiles.addAll(dataRegion.getUnSequenceFileList()); + tsfiles.sort(this::compareMigrationPriority); + for (TsFileResource tsfile : tsfiles) { + try { + int tierLevel = tsfile.getTierLevel(); + // only migrate closed TsFiles not in the last tier - if (tsfile.getStatus() != TsFileResourceStatus.CLOSED ++ if (tsfile.getStatus() != TsFileResourceStatus.NORMAL + || tierLevel == iotdbConfig.getTierDataDirs().length - 1) { + continue; + } + // check tier ttl and disk space + long tierTTL = + DateTimeUtils.convertMilliTimeWithPrecision( + commonConfig.getTierTTLInMs()[tierLevel], iotdbConfig.getTimestampPrecision()); + if (tsfile.stillLives(tierTTL)) { + submitMigrationTask( + tierLevel, + MigrationCause.TTL, + tsfile, + tierManager.getNextFolderForTsFile(tierLevel, tsfile.isSeq())); + } else if (needMigrationTiers.contains(tierLevel)) { + submitMigrationTask( + tierLevel, + MigrationCause.DISK_SPACE, + tsfile, + tierManager.getNextFolderForTsFile(tierLevel, tsfile.isSeq())); + } + } catch (Exception e) { + logger.error( + "An error occurred when checking migration of TsFileResource {}", tsfile, e); + } + } + } + } + + private void submitMigrationTask( + int tierLevel, MigrationCause cause, TsFileResource sourceTsFile, String targetDir) { + MigrationTask task = MigrationTask.newTask(cause, sourceTsFile, targetDir); + workers.submit(task); + tierDiskUsableSpace[tierLevel] -= sourceTsFile.getTsFileSize(); + if (needMigrationTiers.contains(tierLevel)) { + double usage = tierDiskUsableSpace[tierLevel] * 1.0 / tierDiskTotalSpace[tierLevel]; + if (usage > TIER_DISK_SPACE_SAFE_THRESHOLD) { + needMigrationTiers.remove(tierLevel); + } + } + } + + private int compareMigrationPriority(TsFileResource f1, TsFileResource f2) { + // old time partitions first + int res = Long.compare(f1.getTimePartition(), f2.getTimePartition()); + // sequence files in one partition + if (res == 0) { + if (f1.isSeq() && !f2.isSeq()) { + res = -1; + } else if (!f1.isSeq() && f2.isSeq()) { + res = 1; + } + } + // old version files in one partition + if (res == 0) { + res = Long.compare(f1.getVersion(), f2.getVersion()); + } + return res; + } + } + + @Override + public void stop() { + if (scheduler != null) { + scheduler.shutdownNow(); + } + if (workers != null) { + workers.shutdownNow(); + } + } + + @Override + public ServiceType getID() { + return ServiceType.MIGRATION_SERVICE; + } + + public static MigrationTaskManager getInstance() { + return InstanceHolder.INSTANCE; + } + + private static class InstanceHolder { + private InstanceHolder() {} + + private static final MigrationTaskManager INSTANCE = new MigrationTaskManager(); + } +} diff --cc server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java index cb488eaf301,4b4d4cbd393..1cde01d65c5 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java @@@ -2689,60 -2536,26 +2554,26 @@@ public class DataRegion implements IDat boolean deleteOriginFile) throws LoadFileException, DiskSpaceInsufficientException { File targetFile; - switch (type) { - case LOAD_UNSEQUENCE: - targetFile = - fsFactory.getFile( - TierManager.getInstance().getNextFolderForTsFile(0, false), - databaseName - + File.separatorChar - + dataRegionId - + File.separatorChar - + filePartitionId - + File.separator - + tsFileResource.getTsFile().getName()); - tsFileResource.setFile(targetFile); - if (tsFileManager.contains(tsFileResource, false)) { - logger.error("The file {} has already been loaded in unsequence list", tsFileResource); - return false; - } - tsFileManager.add(tsFileResource, false); - logger.info( - "Load tsfile in unsequence list, move file from {} to {}", - tsFileToLoad.getAbsolutePath(), - targetFile.getAbsolutePath()); - break; - case LOAD_SEQUENCE: - targetFile = - fsFactory.getFile( - TierManager.getInstance().getNextFolderForTsFile(0, true), - databaseName - + File.separatorChar - + dataRegionId - + File.separatorChar - + filePartitionId - + File.separator - + tsFileResource.getTsFile().getName()); - tsFileResource.setFile(targetFile); - if (tsFileManager.contains(tsFileResource, true)) { - logger.error("The file {} has already been loaded in sequence list", tsFileResource); - return false; - } - if (insertPos == -1) { - tsFileManager.insertToPartitionFileList(tsFileResource, filePartitionId, true, 0); - } else { - tsFileManager.insertToPartitionFileList( - tsFileResource, filePartitionId, true, insertPos + 1); - } - logger.info( - "Load tsfile in sequence list, move file from {} to {}", - tsFileToLoad.getAbsolutePath(), - targetFile.getAbsolutePath()); - break; - default: - throw new LoadFileException(String.format("Unsupported type of loading tsfile : %s", type)); + targetFile = + fsFactory.getFile( - DirectoryManager.getInstance().getNextFolderForUnSequenceFile(), ++ TierManager.getInstance().getNextFolderForTsFile(0, false), + databaseName + + File.separatorChar + + dataRegionId + + File.separatorChar + + filePartitionId + + File.separator + + tsFileResource.getTsFile().getName()); + tsFileResource.setFile(targetFile); + if (tsFileManager.contains(tsFileResource, false)) { + logger.error("The file {} has already been loaded in unsequence list", tsFileResource); + return false; } + tsFileManager.add(tsFileResource, false); + logger.info( + "Load tsfile in unsequence list, move file from {} to {}", + tsFileToLoad.getAbsolutePath(), + targetFile.getAbsolutePath()); // move file from sync dir to data dir if (!targetFile.getParentFile().exists()) { @@@ -2898,7 -2711,7 +2729,7 @@@ * @param fileToBeUnloaded tsfile to be unloaded * @return whether the file to be unloaded exists. @UsedBy load external tsfile module. */ -- public boolean unloadTsfile(File fileToBeUnloaded, File targetDir) { ++ public boolean unloadTsfile(File fileToBeUnloaded, File targetDir) throws IOException { writeLock("unloadTsfile"); TsFileResource tsFileResourceToBeMoved = null; try { diff --cc server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java index 9f2bca44cfb,0e5949f41b4..9ffc5fca31c --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java @@@ -585,7 -576,7 +590,7 @@@ public class TsFileResource return true; } -- void moveTo(File targetDir) { ++ void moveTo(File targetDir) throws IOException { fsFactory.moveFile(file, fsFactory.getFile(targetDir, file.getName())); fsFactory.moveFile( fsFactory.getFile(file.getPath() + RESOURCE_SUFFIX), @@@ -632,18 -623,10 +637,18 @@@ return this.status == TsFileResourceStatus.COMPACTION_CANDIDATE; } + public boolean isMigrating() { + return isMigrating; + } + + public void setIsMigrating(boolean isMigrating) { + this.isMigrating = isMigrating; + } + public void setStatus(TsFileResourceStatus status) { switch (status) { - case CLOSED: - this.status = TsFileResourceStatus.CLOSED; + case NORMAL: + this.status = TsFileResourceStatus.NORMAL; break; case UNCLOSED: this.status = TsFileResourceStatus.UNCLOSED; diff --cc server/src/main/java/org/apache/iotdb/db/service/DataNode.java index 9c90bc94304,db28843d7c7..829494c6375 --- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java @@@ -86,10 -82,7 +84,9 @@@ import org.apache.iotdb.db.wal.WALManag import org.apache.iotdb.db.wal.utils.WALMode; import org.apache.iotdb.metrics.config.MetricConfigDescriptor; import org.apache.iotdb.metrics.utils.InternalReporterType; +import org.apache.iotdb.os.HybridFileInputFactoryDecorator; - import org.apache.iotdb.pipe.api.exception.PipeManagementException; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; import org.apache.iotdb.udf.api.exception.UDFManagementException; import org.apache.thrift.TException; @@@ -554,7 -540,8 +551,10 @@@ public class DataNode implements DataNo registerManager.register(RegionMigrateService.getInstance()); registerManager.register(CompactionTaskManager.getInstance()); ++ + registerManager.register(MigrationTaskManager.getInstance()); + + registerManager.register(PipeAgent.runtime()); } /** set up RPC and protocols after DataNode is available */
