This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a56059aabb575766d28ce73b40eff601c699cc62
Merge: 0649dc76876 f8516ed32d8
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Tue May 23 13:39:02 2023 +0800

    merge master

 .../org/apache/iotdb/db/qp/sql/IdentifierParser.g4 |   1 +
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  43 ++-
 .../heartbeat/DataNodeHeartbeatHandler.java        |  25 +-
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |  49 +++
 .../confignode/conf/ConfigNodeDescriptor.java      |  16 +
 .../consensus/request/ConfigPhysicalPlan.java      |   8 +
 .../consensus/request/ConfigPhysicalPlanType.java  |   4 +
 .../request/read/pipe/task/ShowPipePlanV2.java     |  25 +-
 .../coordinator/PipeHandleLeaderChangePlan.java    |  74 +++++
 .../response/pipe/task/PipeTableResp.java          | 123 ++++++++
 .../statemachine/ConfigRegionStateMachine.java     |   7 +-
 .../iotdb/confignode/manager/ConfigManager.java    |  34 +--
 .../apache/iotdb/confignode/manager/IManager.java  |   9 +-
 .../iotdb/confignode/manager/ProcedureManager.java |  40 +++
 .../iotdb/confignode/manager/load/LoadManager.java |   1 +
 .../manager/load/balancer/RegionBalancer.java      |   2 +-
 .../confignode/manager/load/cache/LoadCache.java   |  11 +-
 .../manager/load/cache/route/RegionRouteCache.java |   4 +
 .../manager/load/service/HeartbeatService.java     |   4 +-
 .../manager/load/service/StatisticsService.java    |   9 +-
 .../iotdb/confignode/manager/node/NodeManager.java |   5 +-
 .../manager/partition/PartitionManager.java        |  17 +-
 .../manager/partition/PartitionMetrics.java        |   2 +-
 .../iotdb/confignode/manager/pipe/PipeManager.java |  10 +
 .../pipe/{ => plugin}/PipePluginCoordinator.java   |   2 +-
 .../manager/pipe/runtime/PipeMetaSyncer.java       |  95 ++++++
 .../pipe/runtime/PipeRuntimeCoordinator.java       |  98 ++++++
 .../pipe/{ => task}/PipeTaskCoordinator.java       |  37 ++-
 .../manager/{ => schema}/ClusterSchemaManager.java |  54 +++-
 .../schema/ClusterSchemaQuotaStatistics.java       |  28 +-
 .../persistence/executor/ConfigPlanExecutor.java   |   7 +
 .../partition/DatabasePartitionTable.java          |   6 +-
 .../persistence/partition/PartitionInfo.java       |  13 +
 .../persistence/pipe/PipePluginInfo.java           |  57 +++-
 .../confignode/persistence/pipe/PipeTaskInfo.java  | 127 ++++++--
 .../persistence/pipe/PipeTaskOperation.java        |   2 +
 .../procedure/env/ConfigNodeProcedureEnv.java      |   9 +-
 .../pipe/plugin/CreatePipePluginProcedure.java     |   2 +-
 .../impl/pipe/plugin/DropPipePluginProcedure.java  |   2 +-
 .../runtime/PipeHandleLeaderChangeProcedure.java   | 195 ++++++++++++
 .../impl/pipe/runtime/PipeMetaSyncProcedure.java   | 126 ++++++++
 .../pipe/task/AbstractOperatePipeProcedureV2.java  |  37 ++-
 .../impl/pipe/task/CreatePipeProcedureV2.java      |  41 ++-
 .../impl/pipe/task/DropPipeProcedureV2.java        |  14 +-
 .../impl/pipe/task/StartPipeProcedureV2.java       |  14 +-
 .../impl/pipe/task/StopPipeProcedureV2.java        |  14 +-
 .../impl/schema/DeleteDatabaseProcedure.java       |   8 +-
 .../procedure/store/ProcedureFactory.java          |  20 ++
 .../confignode/procedure/store/ProcedureType.java  |  12 +-
 .../iotdb/confignode/service/ConfigNode.java       |   6 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |   7 -
 .../request/ConfigPhysicalPlanSerDeTest.java       |  18 ++
 .../manager/ClusterSchemaManagerTest.java          |   2 +
 .../PipeHandleLeaderChangeProcedureTest.java       |  63 ++++
 .../pipe/runtime/PipeMetaSyncProcedureTest.java    |  55 ++++
 consensus/pom.xml                                  |   2 +-
 .../consensus/iot/IoTConsensusServerImpl.java      |   2 +-
 .../consensus/iot/logdispatcher/LogDispatcher.java |   2 +-
 .../consensus/iot/logdispatcher/SyncStatus.java    |   8 +-
 .../consensus/iot/wal/ConsensusReqReader.java      |   3 +
 .../ratis/ApplicationStateMachineProxy.java        |  10 +-
 .../apache/iotdb/consensus/ratis/RatisClient.java  |  18 +-
 .../iotdb/consensus/ratis/RatisConsensus.java      |   3 +-
 .../apache/iotdb/consensus/ratis/utils/Utils.java  |  17 ++
 .../iot/logdispatcher/SyncStatusTest.java          |  10 +-
 .../consensus/iot/util/FakeConsensusReqReader.java |   5 +
 distribution/pom.xml                               |  14 +
 distribution/src/assembly/spark-connector.xml      |  48 +++
 .../Ecosystem-Integration/Grafana-Plugin.md        |   8 +-
 .../UserGuide/Ecosystem-Integration/Spark-IoTDB.md | 273 +++++++++--------
 docs/UserGuide/Monitor-Alert/Metric-Tool.md        |   8 +-
 docs/UserGuide/Operate-Metadata/Node.md            |   4 +-
 docs/UserGuide/Operate-Metadata/Timeseries.md      |  89 ++++--
 docs/UserGuide/QuickStart/QuickStart.md            |   2 +-
 docs/UserGuide/Reference/Common-Config-Manual.md   |  68 ++++-
 docs/UserGuide/Reference/Keywords.md               |   1 +
 docs/UserGuide/Reference/Status-Codes.md           |   2 +
 .../Ecosystem-Integration/Grafana-Plugin.md        |   6 +-
 .../UserGuide/Ecosystem-Integration/Spark-IoTDB.md | 261 ++++++++--------
 docs/zh/UserGuide/Monitor-Alert/Metric-Tool.md     |  24 +-
 docs/zh/UserGuide/Operate-Metadata/Node.md         |   4 +-
 docs/zh/UserGuide/Operate-Metadata/Timeseries.md   |  84 +++++-
 docs/zh/UserGuide/QuickStart/QuickStart.md         |   3 +-
 .../zh/UserGuide/Reference/Common-Config-Manual.md |  66 ++++-
 docs/zh/UserGuide/Reference/Keywords.md            |   1 +
 docs/zh/UserGuide/Reference/Status-Codes.md        | 258 ++++++++--------
 integration-test/pom.xml                           |  18 ++
 .../iotdb/it/env/cluster/MppCommonConfig.java      |  12 +
 .../it/env/cluster/MppSharedCommonConfig.java      |  14 +
 .../iotdb/it/env/remote/RemoteCommonConfig.java    |  10 +
 .../org/apache/iotdb/it/utils/TsFileGenerator.java |  42 +++
 .../org/apache/iotdb/itbase/env/CommonConfig.java  |   4 +
 .../org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java  |  18 ++
 .../db/it/aligned/IoTDBAlignedDataDeletionIT.java  |  17 ++
 .../db/it/last/IoTDBLastQueryLastCacheIT.java      |  17 +-
 .../apache/iotdb/db/it/orderBy/IoTDBOrderByIT.java |  26 ++
 .../db/it/schema/IoTDBClusterDeviceQuotaIT.java    |  44 +++
 .../it/schema/IoTDBClusterMeasurementQuotaIT.java  | 126 ++++++++
 .../db/it/schema/IoTDBDeactivateTemplateIT.java    |  22 ++
 .../iotdb/db/it/schema/IoTDBExtendTemplateIT.java  |   9 +
 .../iotdb/db/it/schema/IoTDBMetadataFetchIT.java   | 103 ++++++-
 .../iotdb/db/it/schema/IoTDBSchemaTemplateIT.java  |  45 +++
 .../db/it/schema/IoTDBSortedShowTimeseriesIT.java  |   3 +-
 .../apache/iotdb/db/it/schema/IoTDBTagAlterIT.java |  10 +-
 .../org/apache/iotdb/db/it/schema/IoTDBTagIT.java  |  20 +-
 .../session/it/IoTDBSessionSchemaTemplateIT.java   | 107 +++++++
 .../org/apache/iotdb/spark/it/AbstractTest.java    |  55 ++++
 .../java/org/apache/iotdb/spark/it/ReadTest.java   | 145 +++++++++
 .../test/java/org/apache/iotdb/spark/it/Utils.java |  96 ++++++
 .../java/org/apache/iotdb/spark/it/WriteTest.java  | 116 ++++++++
 .../metrics/metricsets/jvm/JvmMemoryMetrics.java   |   6 +-
 .../resources/conf/iotdb-common.properties         |  51 +++-
 .../iotdb/commons/concurrent/ThreadName.java       |   1 +
 .../apache/iotdb/commons/conf/CommonConfig.java    |   6 +-
 .../iotdb/commons/conf/CommonDescriptor.java       |   8 -
 .../org/apache/iotdb/commons/path/AlignedPath.java |  15 +-
 .../apache/iotdb/commons/path/MeasurementPath.java |  31 +-
 .../org/apache/iotdb/commons/path/PartialPath.java |  47 ++-
 .../apache/iotdb/commons/path/PathPatternNode.java |   2 +-
 .../apache/iotdb/commons/path/PathPatternTree.java |  34 ++-
 .../apache/iotdb/commons/path/PathPatternUtil.java |  52 ++++
 .../iotdb/commons/path/fa/nfa/SimpleNFA.java       |   3 +-
 .../pipe/plugin/builtin/BuiltinPipePlugin.java     |   6 +-
 .../{DefaultCollector.java => IoTDBCollector.java} |   2 +-
 .../builtin/connector/DoNothingConnector.java      |   4 +-
 ...ingConnector.java => IoTDBThriftConnector.java} |  28 +-
 .../builtin/processor/DoNothingProcessor.java      |  11 +-
 .../iotdb/commons/pipe/task/meta/PipeMeta.java     |   9 +-
 .../commons/pipe/task/meta/PipeMetaKeeper.java     |   4 +
 .../commons/pipe/task/meta/PipeRuntimeMeta.java    |  27 +-
 .../commons/pipe/task/meta/PipeStaticMeta.java     |  88 ++++--
 .../iotdb/commons/pipe/task/meta/PipeTaskMeta.java |  59 +++-
 .../commons/schema/ClusterSchemaQuotaLevel.java    |  15 +-
 .../iotdb/commons/schema/filter/SchemaFilter.java  |  75 +++++
 .../commons/schema/filter/SchemaFilterType.java    |  42 +--
 .../commons/schema/filter/SchemaFilterVisitor.java |  59 ++++
 .../commons/schema/filter/impl/DataTypeFilter.java |  65 ++++
 .../schema/filter/impl/PathContainsFilter.java     |  64 ++++
 .../commons/schema/filter/impl/TagFilter.java      |  83 ++++++
 .../commons/schema/view}/LogicalViewSchema.java    |  23 +-
 .../view/viewExpression/ViewExpression.java        |  56 ++--
 .../view/viewExpression/ViewExpressionType.java    |   2 +-
 .../binary/BinaryViewExpression.java               |   6 +-
 .../binary/arithmetic/AdditionViewExpression.java  |   8 +-
 .../arithmetic/ArithmeticBinaryViewExpression.java |   8 +-
 .../binary/arithmetic/DivisionViewExpression.java  |   8 +-
 .../binary/arithmetic/ModuloViewExpression.java    |   8 +-
 .../arithmetic/MultiplicationViewExpression.java   |   8 +-
 .../arithmetic/SubtractionViewExpression.java      |   8 +-
 .../compare/CompareBinaryViewExpression.java       |   8 +-
 .../binary/compare/EqualToViewExpression.java      |   8 +-
 .../binary/compare/GreaterEqualViewExpression.java |   8 +-
 .../binary/compare/GreaterThanViewExpression.java  |   8 +-
 .../binary/compare/LessEqualViewExpression.java    |   8 +-
 .../binary/compare/LessThanViewExpression.java     |   8 +-
 .../binary/compare/NonEqualViewExpression.java     |   8 +-
 .../binary/logic/LogicAndViewExpression.java       |   8 +-
 .../binary/logic/LogicBinaryViewExpression.java    |   8 +-
 .../binary/logic/LogicOrViewExpression.java        |   8 +-
 .../viewExpression/leaf/ConstantViewOperand.java   |   6 +-
 .../view/viewExpression/leaf/LeafViewOperand.java  |   4 +-
 .../view/viewExpression/leaf/NullViewOperand.java  |   6 +-
 .../viewExpression/leaf/TimeSeriesViewOperand.java |   6 +-
 .../viewExpression/leaf/TimestampViewOperand.java  |   6 +-
 .../multi/FunctionViewExpression.java              |   8 +-
 .../ternary/BetweenViewExpression.java             |   8 +-
 .../ternary/TernaryViewExpression.java             |   6 +-
 .../viewExpression/unary/InViewExpression.java     |   8 +-
 .../viewExpression/unary/IsNullViewExpression.java |   8 +-
 .../viewExpression/unary/LikeViewExpression.java   |   8 +-
 .../unary/LogicNotViewExpression.java              |   8 +-
 .../unary/NegationViewExpression.java              |   8 +-
 .../unary/RegularViewExpression.java               |   8 +-
 .../viewExpression/unary/UnaryViewExpression.java  |   6 +-
 .../visitor/ViewExpressionVisitor.java             |  70 ++---
 .../apache/iotdb/commons/service/ServiceType.java  |   6 +-
 .../iotdb/commons/path/PathPatternTreeTest.java    |  61 +++-
 .../commons/pipe/task/meta/PipeMetaDeSerTest.java  |  77 +++++
 .../org/apache/iotdb/pipe/api/PipeCollector.java   |   1 +
 .../org/apache/iotdb/pipe/api/PipeConnector.java   |  14 +-
 .../org/apache/iotdb/pipe/api/PipeProcessor.java   |  10 +-
 .../iotdb/pipe/api/collector/EventCollector.java   |  39 +--
 .../org/apache/iotdb/pipe/api/event/Event.java     |   6 +-
 .../event/dml/insertion/TabletInsertionEvent.java  |   6 -
 .../event/dml/insertion/TsFileInsertionEvent.java  |   6 -
 pom.xml                                            |  35 +--
 .../schemaregion/rocksdb/RSchemaRegion.java        |  15 +-
 .../metadata/tagSchemaRegion/TagSchemaRegion.java  |  15 +-
 .../apache/iotdb/db/client/ConfigNodeClient.java   |  17 --
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 120 +++++++-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  67 ++++-
 .../org/apache/iotdb/db/conf/OperationType.java    |   3 +-
 .../db/consensus/DataRegionConsensusImpl.java      |   9 +
 .../statemachine/DataRegionStateMachine.java       |   3 +-
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  41 +++
 .../iotdb/db/engine/cache/BloomFilterCache.java    |   2 +-
 .../db/engine/cache/CacheHitRatioMonitor.java      |   2 +-
 .../db/engine/cache/TimeSeriesMetadataCache.java   |  13 +-
 .../compaction/constant/CompactionTaskStatus.java} |  11 +-
 .../compaction/constant/CompactionTaskType.java}   |  10 +-
 .../exception/CompactionExceptionHandler.java      |   2 +-
 .../execute/task/AbstractCompactionTask.java       |   1 -
 .../execute/task/CrossSpaceCompactionTask.java     |  39 +--
 .../execute/task/InnerSpaceCompactionTask.java     |   6 +-
 .../compaction/execute/utils/CompactionUtils.java  |  22 +-
 .../execute/utils/MultiTsFileDeviceIterator.java   |  16 +-
 .../compaction/schedule/CompactionScheduler.java   |  49 ++-
 .../compaction/schedule/CompactionTaskManager.java |  71 ++++-
 .../compaction/schedule/CompactionWorker.java      |   3 -
 .../impl/RewriteCrossSpaceCompactionSelector.java  |  26 +-
 .../impl/SizeTieredCompactionSelector.java         |   2 +-
 .../utils/CrossSpaceCompactionCandidate.java       |   6 +-
 .../memtable/AlignedWritableMemChunkGroup.java     |  10 +-
 .../db/engine/memtable/WritableMemChunkGroup.java  |  10 +-
 .../iotdb/db/engine/migration/MigrationTask.java   |   2 +-
 .../db/engine/migration/MigrationTaskManager.java  |   2 +-
 .../db/engine/settle/SettleRequestHandler.java     |   2 +-
 .../iotdb/db/engine/storagegroup/DataRegion.java   | 327 +++++----------------
 .../db/engine/storagegroup/TsFileManager.java      |   1 +
 .../db/engine/storagegroup/TsFileResource.java     |  40 ++-
 .../db/engine/storagegroup/TsFileResourceList.java |  62 ++--
 .../engine/storagegroup/TsFileResourceStatus.java  |   3 +-
 .../metadata/SchemaQuotaExceededException.java     |  37 +--
 .../db/exception/query/OutOfTTLException.java      |   4 +-
 .../db/metadata/cache/DataNodeSchemaCache.java     |  18 +-
 .../metadata/cache/DataNodeSchemaCacheMetrics.java |  31 +-
 .../cache/DeviceUsingTemplateSchemaCache.java      |  50 +++-
 .../iotdb/db/metadata/cache/SchemaCacheEntry.java  |  36 ++-
 .../db/metadata/cache/TimeSeriesSchemaCache.java   |   9 +-
 .../dualkeycache/impl/DualKeyCacheBuilder.java     |  18 +-
 .../dualkeycache/impl/DualKeyCachePolicy.java      |   3 +-
 .../dualkeycache/impl/FIFOCacheEntryManager.java   | 189 ++++++++++++
 .../metadata/mnode/mem/impl/LogicalViewMNode.java  |   3 +-
 .../metadata/mnode/mem/info/LogicalViewInfo.java   |  12 +-
 .../iotdb/db/metadata/mtree/ConfigMTree.java       |   8 +-
 .../db/metadata/mtree/MTreeBelowSGCachedImpl.java  |  33 ++-
 .../db/metadata/mtree/MTreeBelowSGMemoryImpl.java  |  32 +-
 .../mtree/snapshot/MemMTreeSnapshotUtil.java       |   8 +-
 .../db/metadata/mtree/store/CachedMTreeStore.java  |   3 +-
 .../db/metadata/mtree/store/MemMTreeStore.java     |   4 +
 .../db/metadata/mtree/traverser/Traverser.java     |   7 +
 .../mtree/traverser/basic/EntityTraverser.java     |  31 +-
 .../traverser/basic/MeasurementTraverser.java      |  36 ++-
 .../impl/SchemaRegionPlanDeserializer.java         |   2 +-
 .../impl/SchemaRegionPlanSerializer.java           |   2 +-
 .../impl/SchemaRegionPlanTxtSerializer.java        |   2 +-
 .../impl/read/SchemaRegionReadPlanFactory.java     |  37 +--
 .../impl/read/ShowDevicesPlanImpl.java             |  15 +-
 .../impl/read/ShowTimeSeriesPlanImpl.java          |  37 +--
 .../impl/write/CreateLogicalViewPlanImpl.java      |   2 +-
 .../plan/schemaregion/read/IShowDevicesPlan.java   |   4 +
 .../schemaregion/read/IShowTimeSeriesPlan.java     |   7 +-
 .../schemaregion/result/ShowTimeSeriesResult.java  |   3 +-
 .../schemaregion/write/ICreateLogicalViewPlan.java |   2 +-
 .../rescon/DataNodeSchemaQuotaManager.java         |  92 ++++++
 .../db/metadata/schemaregion/ISchemaRegion.java    |  16 +-
 .../db/metadata/schemaregion/SchemaEngine.java     |  92 ++++--
 .../schemaregion/SchemaRegionMemoryImpl.java       |  88 ++----
 .../schemaregion/SchemaRegionSchemaFileImpl.java   |  76 ++---
 .../apache/iotdb/db/metadata/tag/TagManager.java   |  16 +-
 .../metadata/template/ClusterTemplateManager.java  |   4 +-
 .../template/alter/TemplateExtendInfo.java         |  78 +++++
 .../apache/iotdb/db/metadata/utils/MetaUtils.java  |   3 +-
 .../visitor/GetSourcePathsVisitor.java             |  13 +-
 .../visitor/TransformToExpressionVisitor.java      |  67 ++---
 .../metadata/visitor/SchemaExecutionVisitor.java   |   2 +-
 .../mpp/common/schematree/ClusterSchemaTree.java   |   6 +-
 .../db/mpp/common/schematree/DeviceSchemaInfo.java |  18 +-
 .../common/schematree/IMeasurementSchemaInfo.java  |   8 +-
 .../common/schematree/MeasurementSchemaInfo.java   |  22 +-
 .../schematree/node/SchemaMeasurementNode.java     |  42 ++-
 .../iotdb/db/mpp/execution/driver/Driver.java      |  15 +-
 .../execution/executor/RegionWriteExecutor.java    | 124 +++++---
 .../process/join/HorizontallyConcatOperator.java   |   8 +-
 .../operator/schema/source/DeviceSchemaSource.java |  13 +-
 .../schema/source/SchemaSourceFactory.java         |  27 +-
 .../schema/source/TimeSeriesSchemaSource.java      |  17 +-
 .../mpp/metric/TimeSeriesMetadataCacheMetrics.java |   4 +-
 .../org/apache/iotdb/db/mpp/plan/Coordinator.java  |   2 +-
 .../apache/iotdb/db/mpp/plan/analyze/Analysis.java |  16 +-
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  | 100 ++++++-
 .../analyze/schema/AutoCreateSchemaExecutor.java   |  10 +-
 .../analyze/schema/ClusterSchemaFetchExecutor.java |   7 +-
 .../plan/analyze/schema/ClusterSchemaFetcher.java  |  50 +++-
 .../mpp/plan/analyze/schema/SchemaValidator.java   |  27 +-
 .../db/mpp/plan/execution/IQueryExecution.java     |   1 +
 .../db/mpp/plan/execution/QueryExecution.java      |  13 +-
 .../config/executor/ClusterConfigTaskExecutor.java |  26 +-
 .../visitor/TransformToViewExpressionVisitor.java  |  52 ++--
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       | 107 ++++---
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    |  47 ++-
 .../db/mpp/plan/planner/LogicalPlanVisitor.java    |  87 +++---
 .../db/mpp/plan/planner/OperatorTreeGenerator.java |  21 +-
 .../plan/node/load/LoadSingleTsFileNode.java       |   4 +
 .../node/metedata/read/DevicesSchemaScanNode.java  |  23 +-
 .../metedata/read/LevelTimeSeriesCountNode.java    |  47 +--
 .../node/metedata/read/TimeSeriesCountNode.java    |  43 +--
 .../metedata/read/TimeSeriesSchemaScanNode.java    |  95 ++----
 .../node/metedata/write/CreateLogicalViewNode.java |   2 +-
 .../write/InternalCreateMultiTimeSeriesNode.java   |   1 +
 .../planner/plan/node/write/BatchInsertNode.java   |  33 ---
 .../plan/node/write/InsertMultiTabletsNode.java    |  22 +-
 .../plan/planner/plan/node/write/InsertNode.java   | 131 +--------
 .../planner/plan/node/write/InsertRowNode.java     | 171 ++---------
 .../planner/plan/node/write/InsertRowsNode.java    |  33 +--
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |  32 +-
 .../planner/plan/node/write/InsertTabletNode.java  | 145 ++-------
 .../scheduler/FragmentInstanceDispatcherImpl.java  |   5 +-
 .../plan/scheduler/load/LoadTsFileScheduler.java   |   8 +-
 .../plan/statement/crud/InsertBaseStatement.java   | 157 +++++++++-
 .../crud/InsertMultiTabletsStatement.java          |  30 ++
 .../plan/statement/crud/InsertRowStatement.java    | 183 +++++++++++-
 .../crud/InsertRowsOfOneDeviceStatement.java       |  42 +++
 .../plan/statement/crud/InsertRowsStatement.java   |  41 +++
 .../plan/statement/crud/InsertTabletStatement.java | 155 +++++++++-
 .../db/mpp/plan/statement/crud/QueryStatement.java |  55 ++--
 .../metadata/CountLevelTimeSeriesStatement.java    |  30 +-
 .../metadata/CountTimeSeriesStatement.java         |  30 +-
 .../statement/metadata/ShowDevicesStatement.java   |  10 +
 .../metadata/ShowTimeSeriesStatement.java          |  30 +-
 .../org/apache/iotdb/db/pipe/agent/PipeAgent.java  |  12 +
 .../db/pipe/agent/plugin/PipePluginAgent.java      |   2 +-
 .../IoTDBThriftReceiver.java}                      |  24 +-
 .../db/pipe/agent/receiver/PipeReceiverAgent.java  |  90 ++++++
 .../db/pipe/agent/runtime/MetaSyncScheduler.java   |  22 --
 .../iotdb/db/pipe/agent/runtime/PipeLauncher.java  | 175 +++++++++++
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    |  47 ++-
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    | 242 ++++++++++++++-
 .../db/pipe/config/PipeCollectorConstant.java      |   5 +-
 .../apache/iotdb/db/pipe/config/PipeConfig.java    |  22 ++
 .../db/pipe/config/PipeConnectorConstant.java      |   3 +
 .../core/collector/IoTDBDataRegionCollector.java   |  39 ++-
 .../PipeHistoricalDataRegionTsFileCollector.java   |  13 +-
 .../realtime/PipeRealtimeDataRegionCollector.java  |   6 +-
 .../PipeRealtimeDataRegionHybridCollector.java     |  78 +++--
 .../realtime/assigner/PipeDataRegionAssigner.java  |  19 +-
 .../listener/PipeInsertionDataNodeListener.java    |   4 +-
 .../matcher/CachedSchemaPatternMatcher.java        |   6 +-
 .../realtime/matcher/PipeDataRegionMatcher.java    |   9 +-
 .../impl/iotdb/IoTDBThriftConnectorClient.java     |  70 +++++
 .../impl/iotdb/IoTDBThriftConnectorVersion.java}   |  16 +-
 .../impl/iotdb/v1/IoTDBThriftConnectorV1.java      | 231 +++++++++++++++
 .../impl/iotdb/v1/IoTDBThriftReceiverV1.java       | 275 +++++++++++++++++
 .../connector/impl/iotdb/v1/PipeRequestType.java}  |  38 ++-
 .../v1/reponse/PipeTransferFilePieceResp.java      |  80 +++++
 .../iotdb/v1/request/PipeTransferFilePieceReq.java |  88 ++++++
 .../iotdb/v1/request/PipeTransferFileSealReq.java  |  79 +++++
 .../iotdb/v1/request/PipeTransferHandshakeReq.java |  71 +++++
 .../v1/request/PipeTransferInsertNodeReq.java      | 102 +++++++
 .../PipeConnectorSubtaskLifeCycle.java             |   2 +-
 .../{ => manager}/PipeConnectorSubtaskManager.java |  37 ++-
 .../iotdb/db/pipe/core/event/EnrichedEvent.java    |  54 ++++
 .../core/event/impl/PipeTabletInsertionEvent.java  |  31 +-
 .../core/event/impl/PipeTsFileInsertionEvent.java  |  85 +++++-
 .../event/realtime/PipeRealtimeCollectEvent.java   |  20 +-
 .../realtime/PipeRealtimeCollectEventFactory.java  |   8 +-
 .../event/view/collector/PipeEventCollector.java   |  22 +-
 .../execution/executor/PipeSubtaskExecutor.java    |  41 +--
 .../execution/scheduler/PipeSubtaskScheduler.java  |  89 ++++++
 .../execution/scheduler/PipeTaskScheduler.java     |  74 -----
 .../db/pipe/resource/PipeFileResourceManager.java  | 186 ++++++++++++
 .../db/pipe/resource/PipeResourceManager.java      |  16 +-
 .../iotdb/db/pipe/resource/PipeTsFileHolder.java   |  22 --
 .../iotdb/db/pipe/resource/PipeWALHolder.java      |  22 --
 .../org/apache/iotdb/db/pipe/task/PipeBuilder.java |   8 +-
 .../apache/iotdb/db/pipe/task/PipeTaskBuilder.java |   2 +-
 .../db/pipe/task/stage/PipeTaskCollectorStage.java |  50 +++-
 .../db/pipe/task/stage/PipeTaskConnectorStage.java |  11 +-
 .../db/pipe/task/stage/PipeTaskProcessorStage.java |  59 +++-
 .../db/pipe/task/subtask/PipeConnectorSubtask.java |  34 ++-
 .../db/pipe/task/subtask/PipeProcessorSubtask.java |  31 +-
 .../iotdb/db/pipe/task/subtask/PipeSubtask.java    |  62 +++-
 .../java/org/apache/iotdb/db/service/DataNode.java | 103 +------
 .../metrics/recorder/CompactionMetricsManager.java |  81 ++---
 .../db/service/thrift/ProcessorWithMetrics.java    |   6 +-
 .../service/thrift/impl/ClientRPCServiceImpl.java  | 209 ++++++++-----
 .../impl/DataNodeInternalRPCServiceImpl.java       |  23 +-
 .../db/sync/common/ClusterSyncInfoFetcher.java     |  11 +-
 .../apache/iotdb/db/tools/IoTDBDataDirViewer.java  |   4 +-
 .../iotdb/db/tools/TsFileResourcePrinter.java      |   4 +-
 .../iotdb/db/tools/TsFileSplitByPartitionTool.java |   2 +-
 .../org/apache/iotdb/db/tools/TsFileSplitTool.java |   2 +-
 .../db/tools/settle/TsFileAndModSettleTool.java    |   6 +-
 .../db/tools/validate/TsFileValidationTool.java    |  16 +-
 .../org/apache/iotdb/db/utils/FileLoaderUtils.java |   2 +-
 .../org/apache/iotdb/db/utils/UpgradeUtils.java    |   2 +-
 .../db/utils/datastructure/AlignedTVList.java      |   6 +-
 .../datastructure/FixedPriorityBlockingQueue.java  |  10 +
 .../iotdb/db/wal/buffer/AbstractWALBuffer.java     |  26 +-
 .../org/apache/iotdb/db/wal/buffer/WALBuffer.java  |  32 +-
 .../org/apache/iotdb/db/wal/buffer/WALEntry.java   |   4 +-
 .../iotdb/db/wal/checkpoint/CheckpointManager.java | 118 ++++++--
 .../iotdb/db/wal/checkpoint/MemTableInfo.java      |  28 +-
 .../exception/MemTablePinException.java}           |  16 +-
 .../exception/WALPipeException.java}               |  16 +-
 .../apache/iotdb/db/wal/io/WALByteBufReader.java   |   6 +-
 .../org/apache/iotdb/db/wal/node/WALFakeNode.java  |  24 +-
 .../java/org/apache/iotdb/db/wal/node/WALNode.java |  57 +++-
 .../db/wal/recover/file/TsFilePlanRedoer.java      |   4 -
 .../iotdb/db/wal/utils/WALEntryPosition.java       | 157 ++++++++++
 .../apache/iotdb/db/wal/utils/WALFileUtils.java    |  11 +
 .../iotdb/db/wal/utils/WALInsertNodeCache.java     | 170 +++++++++++
 .../apache/iotdb/db/wal/utils/WALPipeHandler.java  | 132 +++++++++
 .../db/wal/utils/listener/WALFlushListener.java    |  13 +-
 .../iotdb/db/engine/cache/ChunkCacheTest.java      |   4 +-
 .../engine/compaction/AbstractCompactionTest.java  |  62 ++--
 .../FastCrossCompactionPerformerTest.java          |   5 +
 .../db/engine/compaction/MemoryControlTest.java    |  75 +++++
 .../ReadPointCompactionPerformerTest.java          |  16 +-
 .../cross/CrossSpaceCompactionExceptionTest.java   |   7 +-
 .../cross/CrossSpaceCompactionSelectorTest.java    | 171 +++++++++++
 .../CrossSpaceCompactionWithFastPerformerTest.java |   6 +-
 ...eCompactionWithFastPerformerValidationTest.java |  20 +-
 ...sSpaceCompactionWithReadPointPerformerTest.java |   6 +-
 .../db/engine/compaction/cross/MergeTest.java      |   6 +-
 .../cross/RewriteCompactionFileSelectorTest.java   |  46 +--
 .../RewriteCrossSpaceCompactionRecoverTest.java    |  10 +-
 ...eCrossSpaceCompactionWithFastPerformerTest.java |  21 +-
 ...sSpaceCompactionWithReadPointPerformerTest.java |  21 +-
 .../inner/AbstractInnerSpaceCompactionTest.java    |   4 +-
 .../inner/InnerCompactionMoreDataTest.java         |   2 +-
 .../compaction/inner/InnerCompactionTest.java      |   4 +-
 .../ReadChunkCompactionPerformerAlignedTest.java   |  76 +++++
 .../SizeTieredCompactionRecoverTest.java           |  52 ++--
 .../SizeTieredCompactionSelectorTest.java          |  38 +++
 .../inner/sizetiered/SizeTieredCompactionTest.java |  10 +-
 .../compaction/utils/CompactionCheckerUtils.java   |   4 +
 .../utils/MultiTsFileDeviceIteratorTest.java       |  26 +-
 .../db/engine/snapshot/IoTDBSnapshotTest.java      |   2 +-
 .../db/engine/storagegroup/DataRegionTest.java     |  16 +-
 .../engine/storagegroup/FakedTsFileResource.java   |   2 +-
 .../storagegroup/TsFileResourceListTest.java       | 110 +++++++
 .../db/engine/storagegroup/TsFileResourceTest.java |   2 +-
 .../cache/dualkeycache/DualKeyCacheTest.java       |  17 ++
 .../schemaRegion/SchemaRegionBasicTest.java        | 167 ++++++++++-
 .../schemaRegion/SchemaRegionTestUtil.java         |   4 +-
 .../metadata/view/ViewExpressionToStringTest.java  |  16 +-
 .../execution/operator/AlignedSeriesTestUtil.java  |   6 +-
 .../mpp/execution/operator/OperatorMemoryTest.java |  34 +++
 .../schema/SchemaFetchScanOperatorTest.java        |   3 +-
 .../schema/SchemaQueryScanOperatorTest.java        |   4 +-
 .../db/mpp/plan/analyze/FakeSchemaFetcherImpl.java |   7 +-
 .../iotdb/db/mpp/plan/plan/LogicalPlannerTest.java | 120 +++++++-
 .../distribution/DistributionPlannerBasicTest.java |  10 +-
 .../read/DeviceSchemaScanNodeSerdeTest.java        |   3 +-
 .../metadata/read/SchemaCountNodeSerdeTest.java    |   8 +-
 .../read/TimeSeriesSchemaScanNodeSerdeTest.java    |   3 +-
 .../collector/CachedSchemaPatternMatcherTest.java  |  12 +-
 .../core/collector/PipeRealtimeCollectTest.java    |  73 +++--
 .../pipe/core/connector/PipeThriftRequestTest.java | 118 ++++++++
 .../pipe/resource/PipeFileResourceManagerTest.java | 218 ++++++++++++++
 .../AlignedSeriesScanLimitOffsetPushDownTest.java  |  10 +-
 .../query/reader/series/SeriesReaderTestUtil.java  |   6 +-
 .../series/SeriesScanLimitOffsetPushDownTest.java  |   8 +-
 .../iotdb/db/rescon/ResourceManagerTest.java       |  12 +-
 .../db/utils/datastructure/VectorTVListTest.java   |  11 +
 .../org/apache/iotdb/db/wal/io/WALFileTest.java    |  25 +-
 .../iotdb/db/wal/node/ConsensusReqReaderTest.java  |  27 +-
 .../org/apache/iotdb/db/wal/node/WALNodeTest.java  |  33 +--
 .../iotdb/db/wal/node/WALPipeHandlerTest.java      | 256 ++++++++++++++++
 .../db/wal/recover/file/TsFilePlanRedoerTest.java  |  32 +-
 .../file/UnsealedTsFileRecoverPerformerTest.java   |   5 +-
 .../iotdb/db/wal/utils/WALInsertNodeCacheTest.java | 167 +++++++++++
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |  11 +-
 .../java/org/apache/iotdb/session/Session.java     |   4 +
 .../org/apache/iotdb/session/pool/SessionPool.java | 319 +++++++++++++++++++-
 spark-iotdb-connector/pom.xml                      |  84 +-----
 spark-iotdb-connector/{ => scala_2.11}/pom.xml     | 108 +++----
 spark-iotdb-connector/{ => scala_2.12}/pom.xml     | 103 +++----
 .../src/test/resources/iotdb-datanode.properties   |  23 --
 .../apache/iotdb/spark/db/EnvironmentUtils.java    | 264 -----------------
 .../org/apache/iotdb/spark/db/IoTDBTest.scala      | 150 ----------
 .../org/apache/iotdb/spark/db/IoTDBWriteTest.scala | 118 --------
 .../iotdb/spark/db/unit/DataFrameToolsTest.scala   |  96 ------
 spark-tsfile/pom.xml                               |  50 +---
 .../iotdb/spark/tsfile/qp/common/SQLConstant.java  | 118 ++++----
 .../src/main/thrift/confignode.thrift              |  14 +-
 thrift/src/main/thrift/client.thrift               |  13 +
 thrift/src/main/thrift/datanode.thrift             |  18 +-
 .../java/org/apache/iotdb/tsfile/utils/Binary.java |  20 +-
 .../tsfile/write/schema/IMeasurementSchema.java    |   4 +
 .../tsfile/write/schema/MeasurementSchema.java     |  10 +
 .../tsfile/write/schema/MeasurementSchemaType.java |  19 +-
 .../write/schema/VectorMeasurementSchema.java      |  10 +
 484 files changed, 13275 insertions(+), 4969 deletions(-)

diff --cc 
node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 6a9360b93c6,d156287e1f3..d29ec62027f
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@@ -71,8 -71,7 +71,9 @@@ public enum ThreadName 
    PIPE_PROCESSOR_EXECUTOR_POOL("Pipe-Processor-Executor-Pool"),
    PIPE_CONNECTOR_EXECUTOR_POOL("Pipe-Connector-Executor-Pool"),
    PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL("Pipe-SubTask-Callback-Executor-Pool"),
 +  MIGRATION_SCHEDULER("Migration-Scheduler"),
 +  MIGRATION("Migration-Executor-Pool"),
+   PIPE_META_SYNC_SERVICE("Pipe-Meta-Sync-Service"),
    ;
  
    private final String name;
diff --cc 
node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index b7a6e3aceaa,7bde2af3102..d350305af87
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@@ -75,9 -75,8 +75,9 @@@ public enum ServiceType 
    IOT_CONSENSUS_SERVICE("IoTConsensus Service", "IoTConsensusRPCService"),
    PIPE_PLUGIN_CLASSLOADER_MANAGER_SERVICE(
        "Pipe Plugin Classloader Manager Service", "PipePluginClassLoader"),
-   MLNode_RPC_SERVICE("Rpc Service for MLNode", "MLNodeRPCService"),
-   MIGRATION_SERVICE("Migration Manager", "Migration Manager");
- 
++  MIGRATION_SERVICE("Migration Manager", "Migration Manager"),
+   PIPE_RUNTIME_AGENT("Pipe Runtime Agent", "PipeRuntimeAgent"),
+   MLNode_RPC_SERVICE("Rpc Service for MLNode", "MLNodeRPCService");
    private final String name;
    private final String jmxName;
  
diff --cc server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 677f7c3d0ec,838e8e3e232..406f1dd4d09
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@@ -1106,16 -1128,40 +1129,48 @@@ public class IoTDBConfig 
     */
    private String RateLimiterType = "FixedIntervalRateLimiter";
  
 +  private int migrateThreadCount = 3;
 +
 +  private String objectStorageName = "aws_s3";
 +  private String objectStorageBucket = "iotdb";
 +  private String objectStorageEndpoiont = "yourEndpoint";
 +  private String objectStorageAccessKey = "yourAccessKey";
 +  private String objectStorageAccessSecret = "yourAccessSecret";
 +
    IoTDBConfig() {}
  
+   public int getMaxLogEntriesNumPerBatch() {
+     return maxLogEntriesNumPerBatch;
+   }
+ 
+   public int getMaxSizePerBatch() {
+     return maxSizePerBatch;
+   }
+ 
+   public int getMaxPendingBatchesNum() {
+     return maxPendingBatchesNum;
+   }
+ 
+   public double getMaxMemoryRatioForQueue() {
+     return maxMemoryRatioForQueue;
+   }
+ 
+   public void setMaxLogEntriesNumPerBatch(int maxLogEntriesNumPerBatch) {
+     this.maxLogEntriesNumPerBatch = maxLogEntriesNumPerBatch;
+   }
+ 
+   public void setMaxSizePerBatch(int maxSizePerBatch) {
+     this.maxSizePerBatch = maxSizePerBatch;
+   }
+ 
+   public void setMaxPendingBatchesNum(int maxPendingBatchesNum) {
+     this.maxPendingBatchesNum = maxPendingBatchesNum;
+   }
+ 
+   public void setMaxMemoryRatioForQueue(double maxMemoryRatioForQueue) {
+     this.maxMemoryRatioForQueue = maxMemoryRatioForQueue;
+   }
+ 
    public float getUdfMemoryBudgetInMB() {
      return udfMemoryBudgetInMB;
    }
@@@ -3819,31 -3894,19 +3909,47 @@@
      return sortTmpDir;
    }
  
 +  public int getMigrateThreadCount() {
 +    return migrateThreadCount;
 +  }
 +
 +  public void setMigrateThreadCount(int migrateThreadCount) {
 +    this.migrateThreadCount = migrateThreadCount;
 +  }
 +
 +  public String getObjectStorageName() {
 +    return objectStorageName;
 +  }
 +
 +  public String getObjectStorageBucket() {
 +    return objectStorageBucket;
 +  }
 +
 +  public String getObjectStorageEndpoiont() {
 +    return objectStorageEndpoiont;
 +  }
 +
 +  public String getObjectStorageAccessKey() {
 +    return objectStorageAccessKey;
 +  }
 +
 +  public String getObjectStorageAccessSecret() {
 +    return objectStorageAccessSecret;
 +  }
++
+   public String getClusterSchemaLimitLevel() {
+     return clusterSchemaLimitLevel;
+   }
+ 
+   public void setClusterSchemaLimitLevel(String clusterSchemaLimitLevel) {
+     this.clusterSchemaLimitLevel = clusterSchemaLimitLevel;
+   }
+ 
+   public long getClusterSchemaLimitThreshold() {
+     return clusterSchemaLimitThreshold;
+   }
+ 
+   public void setClusterSchemaLimitThreshold(long 
clusterSchemaLimitThreshold) {
+     this.clusterSchemaLimitThreshold = clusterSchemaLimitThreshold;
+   }
  }
diff --cc server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 5d57b5bd88f,e0bd3c536f3..595b7d71915
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@@ -357,15 -369,9 +368,14 @@@ public class IoTDBDescriptor 
  
      conf.setQueryDir(
          FilePathUtils.regularizePath(conf.getSystemDir() + 
IoTDBConstant.QUERY_FOLDER_NAME));
--
 -    conf.setDataDirs(
 -        properties.getProperty("dn_data_dirs", String.join(",", 
conf.getDataDirs())).split(","));
 +    String[] defaultTierDirs = new String[conf.getTierDataDirs().length];
 +    for (int i = 0; i < defaultTierDirs.length; ++i) {
 +      defaultTierDirs[i] = String.join(",", conf.getTierDataDirs()[i]);
 +    }
 +    conf.setTierDataDirs(
 +        parseDataDirs(
 +            properties.getProperty(
 +                "dn_data_dirs", String.join(IoTDBConstant.TIER_SEPARATOR, 
defaultTierDirs))));
  
      conf.setConsensusDir(properties.getProperty("dn_consensus_dir", 
conf.getConsensusDir()));
  
diff --cc 
server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
index bb0a2c8ee44,805abe2f1d8..105e03637c4
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
@@@ -114,7 -114,7 +114,7 @@@ public class SizeTieredCompactionSelect
          selectedFileSize = 0L;
          continue;
        }
-       if (currentFile.getStatus() != TsFileResourceStatus.CLOSED || 
currentFile.isMigrating()) {
 -      if (currentFile.getStatus() != TsFileResourceStatus.NORMAL) {
++      if (currentFile.getStatus() != TsFileResourceStatus.NORMAL || 
currentFile.isMigrating()) {
          selectedFileList.clear();
          selectedFileSize = 0L;
          continue;
diff --cc 
server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java
index 329c778f120,9125c9bd8a3..42a91acc792
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java
@@@ -142,9 -142,7 +142,9 @@@ public class CrossSpaceCompactionCandid
    private List<TsFileResourceCandidate> 
filterUnseqResource(List<TsFileResource> unseqResources) {
      List<TsFileResourceCandidate> ret = new ArrayList<>();
      for (TsFileResource resource : unseqResources) {
-       if (resource.getStatus() != TsFileResourceStatus.CLOSED
 -      if (resource.getStatus() != TsFileResourceStatus.NORMAL || 
!resource.getTsFile().exists()) {
++      if (resource.getStatus() != TsFileResourceStatus.NORMAL
 +          || resource.isMigrating()
 +          || !resource.getTsFile().exists()) {
          break;
        } else if (resource.stillLives(ttlLowerBound)) {
          ret.add(new TsFileResourceCandidate(resource));
@@@ -200,8 -198,7 +200,8 @@@
        // although we do the judgement here, the task should be validated 
before executing because
        // the status of file may be changed after the task is submitted to 
queue
        this.isValidCandidate =
-           tsFileResource.getStatus() == TsFileResourceStatus.CLOSED
+           tsFileResource.getStatus() == TsFileResourceStatus.NORMAL
 +              && !tsFileResource.isMigrating()
                && tsFileResource.getTsFile().exists();
      }
  
diff --cc 
server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java
index 0571d9d47f7,00000000000..998e903925c
mode 100644,000000..100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java
@@@ -1,93 -1,0 +1,93 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.iotdb.db.engine.migration;
 +
 +import org.apache.iotdb.db.engine.modification.ModificationFile;
 +import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 +import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
 +import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 +import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
 +import org.apache.iotdb.tsfile.utils.FSUtils;
 +
 +import java.io.File;
 +
 +public abstract class MigrationTask implements Runnable {
 +  protected static final FSFactory fsFactory = 
FSFactoryProducer.getFSFactory();
 +
 +  protected final MigrationCause cause;
 +  protected final TsFileResource tsFile;
 +  protected final String targetDir;
 +
 +  protected final File srcTsFile;
 +  protected final File destTsFile;
 +  protected final File srcResourceFile;
 +  protected final File destResourceFile;
 +  protected final File srcModsFile;
 +  protected final File destModsFile;
 +
 +  MigrationTask(MigrationCause cause, TsFileResource tsFile, String 
targetDir) {
 +    this.cause = cause;
 +    this.tsFile = tsFile;
 +    this.targetDir = targetDir;
 +    this.srcTsFile = tsFile.getTsFile();
 +    this.destTsFile = fsFactory.getFile(targetDir, 
tsFile.getTsFile().getName());
 +    this.srcResourceFile =
 +        fsFactory.getFile(
 +            srcTsFile.getParentFile(), srcTsFile.getName() + 
TsFileResource.RESOURCE_SUFFIX);
 +    this.destResourceFile =
 +        fsFactory.getFile(targetDir, tsFile.getTsFile().getName() + 
TsFileResource.RESOURCE_SUFFIX);
 +    this.srcModsFile =
 +        fsFactory.getFile(
 +            srcTsFile.getParentFile(), srcTsFile.getName() + 
ModificationFile.FILE_SUFFIX);
 +    this.destModsFile =
 +        fsFactory.getFile(targetDir, tsFile.getTsFile().getName() + 
ModificationFile.FILE_SUFFIX);
 +  }
 +
 +  public static MigrationTask newTask(
 +      MigrationCause cause, TsFileResource sourceTsFile, String targetDir) {
 +    if (FSUtils.isLocal(targetDir)) {
 +      return new LocalMigrationTask(cause, sourceTsFile, targetDir);
 +    } else {
 +      return new RemoteMigrationTask(cause, sourceTsFile, targetDir);
 +    }
 +  }
 +
 +  @Override
 +  public void run() {
 +    if (canMigrate()) {
 +      tsFile.setIsMigrating(true);
 +      if (!canMigrate()) {
 +        tsFile.setIsMigrating(false);
 +        return;
 +      }
 +    } else {
 +      return;
 +    }
 +
 +    migrate();
 +
 +    tsFile.setIsMigrating(false);
 +  }
 +
 +  protected boolean canMigrate() {
-     return tsFile.getStatus() == TsFileResourceStatus.CLOSED;
++    return tsFile.getStatus() == TsFileResourceStatus.NORMAL;
 +  }
 +
 +  public abstract void migrate();
 +}
diff --cc 
server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
index 8f3b2872a34,00000000000..4bd9712c263
mode 100644,000000..100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
@@@ -1,197 -1,0 +1,197 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.iotdb.db.engine.migration;
 +
 +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 +import org.apache.iotdb.commons.concurrent.ThreadName;
 +import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 +import org.apache.iotdb.commons.conf.CommonConfig;
 +import org.apache.iotdb.commons.conf.CommonDescriptor;
 +import org.apache.iotdb.commons.exception.StartupException;
 +import org.apache.iotdb.commons.service.IService;
 +import org.apache.iotdb.commons.service.ServiceType;
 +import org.apache.iotdb.db.conf.IoTDBConfig;
 +import org.apache.iotdb.db.conf.IoTDBDescriptor;
 +import org.apache.iotdb.db.conf.directories.TierManager;
 +import org.apache.iotdb.db.engine.StorageEngine;
 +import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 +import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 +import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
 +import org.apache.iotdb.db.utils.DateTimeUtils;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Set;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.ScheduledExecutorService;
 +import java.util.concurrent.TimeUnit;
 +
 +public class MigrationTaskManager implements IService {
 +  private static final Logger logger = 
LoggerFactory.getLogger(MigrationTaskManager.class);
 +  private static final IoTDBConfig iotdbConfig = 
IoTDBDescriptor.getInstance().getConfig();
 +  private static final CommonConfig commonConfig = 
CommonDescriptor.getInstance().getConfig();
 +  private static final TierManager tierManager = TierManager.getInstance();
 +  private static final long CHECK_INTERVAL_IN_SECONDS = 10 * 60;
 +  private static final double TIER_DISK_SPACE_WARN_THRESHOLD =
 +      commonConfig.getDiskSpaceWarningThreshold() + 0.1;
 +  private static final double TIER_DISK_SPACE_SAFE_THRESHOLD =
 +      commonConfig.getDiskSpaceWarningThreshold() + 0.2;
 +  /** single thread to schedule */
 +  private ScheduledExecutorService scheduler;
 +  /** single thread to sync syncingBuffer to disk */
 +  private ExecutorService workers;
 +
 +  @Override
 +  public void start() throws StartupException {
 +    if (iotdbConfig.getTierDataDirs().length == 1) {
 +      logger.info("tiered storage status: disable");
 +      return;
 +    }
 +    scheduler =
 +        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
 +            ThreadName.MIGRATION_SCHEDULER.getName());
 +    workers =
 +        IoTDBThreadPoolFactory.newFixedThreadPool(
 +            iotdbConfig.getMigrateThreadCount(), 
ThreadName.MIGRATION.getName());
 +    ScheduledExecutorUtil.safelyScheduleAtFixedRate(
 +        scheduler,
 +        () -> new MigrationScheduleTask().run(),
 +        CHECK_INTERVAL_IN_SECONDS,
 +        CHECK_INTERVAL_IN_SECONDS,
 +        TimeUnit.SECONDS);
 +  }
 +
 +  private class MigrationScheduleTask implements Runnable {
 +    private final long[] tierDiskTotalSpace = 
tierManager.getTierDiskTotalSpace();
 +    private final long[] tierDiskUsableSpace = 
tierManager.getTierDiskUsableSpace();
 +    private final Set<Integer> needMigrationTiers = new HashSet<>();
 +
 +    public MigrationScheduleTask() {
 +      for (int i = 0; i < tierManager.getTiersNum(); i++) {
 +        double usage = tierDiskUsableSpace[i] * 1.0 / tierDiskTotalSpace[i];
 +        if (usage <= TIER_DISK_SPACE_WARN_THRESHOLD) {
 +          needMigrationTiers.add(i);
 +        }
 +      }
 +    }
 +
 +    @Override
 +    public void run() {
 +      schedule();
 +    }
 +
 +    private void schedule() {
 +      // submit migration tasks
 +      for (DataRegion dataRegion : 
StorageEngine.getInstance().getAllDataRegions()) {
 +        List<TsFileResource> tsfiles = dataRegion.getSequenceFileList();
 +        tsfiles.addAll(dataRegion.getUnSequenceFileList());
 +        tsfiles.sort(this::compareMigrationPriority);
 +        for (TsFileResource tsfile : tsfiles) {
 +          try {
 +            int tierLevel = tsfile.getTierLevel();
 +            // only migrate closed TsFiles not in the last tier
-             if (tsfile.getStatus() != TsFileResourceStatus.CLOSED
++            if (tsfile.getStatus() != TsFileResourceStatus.NORMAL
 +                || tierLevel == iotdbConfig.getTierDataDirs().length - 1) {
 +              continue;
 +            }
 +            // check tier ttl and disk space
 +            long tierTTL =
 +                DateTimeUtils.convertMilliTimeWithPrecision(
 +                    commonConfig.getTierTTLInMs()[tierLevel], 
iotdbConfig.getTimestampPrecision());
 +            if (tsfile.stillLives(tierTTL)) {
 +              submitMigrationTask(
 +                  tierLevel,
 +                  MigrationCause.TTL,
 +                  tsfile,
 +                  tierManager.getNextFolderForTsFile(tierLevel, 
tsfile.isSeq()));
 +            } else if (needMigrationTiers.contains(tierLevel)) {
 +              submitMigrationTask(
 +                  tierLevel,
 +                  MigrationCause.DISK_SPACE,
 +                  tsfile,
 +                  tierManager.getNextFolderForTsFile(tierLevel, 
tsfile.isSeq()));
 +            }
 +          } catch (Exception e) {
 +            logger.error(
 +                "An error occurred when checking migration of TsFileResource 
{}", tsfile, e);
 +          }
 +        }
 +      }
 +    }
 +
 +    private void submitMigrationTask(
 +        int tierLevel, MigrationCause cause, TsFileResource sourceTsFile, 
String targetDir) {
 +      MigrationTask task = MigrationTask.newTask(cause, sourceTsFile, 
targetDir);
 +      workers.submit(task);
 +      tierDiskUsableSpace[tierLevel] -= sourceTsFile.getTsFileSize();
 +      if (needMigrationTiers.contains(tierLevel)) {
 +        double usage = tierDiskUsableSpace[tierLevel] * 1.0 / 
tierDiskTotalSpace[tierLevel];
 +        if (usage > TIER_DISK_SPACE_SAFE_THRESHOLD) {
 +          needMigrationTiers.remove(tierLevel);
 +        }
 +      }
 +    }
 +
 +    private int compareMigrationPriority(TsFileResource f1, TsFileResource 
f2) {
 +      // old time partitions first
 +      int res = Long.compare(f1.getTimePartition(), f2.getTimePartition());
 +      // sequence files in one partition
 +      if (res == 0) {
 +        if (f1.isSeq() && !f2.isSeq()) {
 +          res = -1;
 +        } else if (!f1.isSeq() && f2.isSeq()) {
 +          res = 1;
 +        }
 +      }
 +      // old version files in one partition
 +      if (res == 0) {
 +        res = Long.compare(f1.getVersion(), f2.getVersion());
 +      }
 +      return res;
 +    }
 +  }
 +
 +  @Override
 +  public void stop() {
 +    if (scheduler != null) {
 +      scheduler.shutdownNow();
 +    }
 +    if (workers != null) {
 +      workers.shutdownNow();
 +    }
 +  }
 +
 +  @Override
 +  public ServiceType getID() {
 +    return ServiceType.MIGRATION_SERVICE;
 +  }
 +
 +  public static MigrationTaskManager getInstance() {
 +    return InstanceHolder.INSTANCE;
 +  }
 +
 +  private static class InstanceHolder {
 +    private InstanceHolder() {}
 +
 +    private static final MigrationTaskManager INSTANCE = new 
MigrationTaskManager();
 +  }
 +}
diff --cc 
server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index cb488eaf301,4b4d4cbd393..1cde01d65c5
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@@ -2689,60 -2536,26 +2554,26 @@@ public class DataRegion implements IDat
        boolean deleteOriginFile)
        throws LoadFileException, DiskSpaceInsufficientException {
      File targetFile;
-     switch (type) {
-       case LOAD_UNSEQUENCE:
-         targetFile =
-             fsFactory.getFile(
-                 TierManager.getInstance().getNextFolderForTsFile(0, false),
-                 databaseName
-                     + File.separatorChar
-                     + dataRegionId
-                     + File.separatorChar
-                     + filePartitionId
-                     + File.separator
-                     + tsFileResource.getTsFile().getName());
-         tsFileResource.setFile(targetFile);
-         if (tsFileManager.contains(tsFileResource, false)) {
-           logger.error("The file {} has already been loaded in unsequence 
list", tsFileResource);
-           return false;
-         }
-         tsFileManager.add(tsFileResource, false);
-         logger.info(
-             "Load tsfile in unsequence list, move file from {} to {}",
-             tsFileToLoad.getAbsolutePath(),
-             targetFile.getAbsolutePath());
-         break;
-       case LOAD_SEQUENCE:
-         targetFile =
-             fsFactory.getFile(
-                 TierManager.getInstance().getNextFolderForTsFile(0, true),
-                 databaseName
-                     + File.separatorChar
-                     + dataRegionId
-                     + File.separatorChar
-                     + filePartitionId
-                     + File.separator
-                     + tsFileResource.getTsFile().getName());
-         tsFileResource.setFile(targetFile);
-         if (tsFileManager.contains(tsFileResource, true)) {
-           logger.error("The file {} has already been loaded in sequence 
list", tsFileResource);
-           return false;
-         }
-         if (insertPos == -1) {
-           tsFileManager.insertToPartitionFileList(tsFileResource, 
filePartitionId, true, 0);
-         } else {
-           tsFileManager.insertToPartitionFileList(
-               tsFileResource, filePartitionId, true, insertPos + 1);
-         }
-         logger.info(
-             "Load tsfile in sequence list, move file from {} to {}",
-             tsFileToLoad.getAbsolutePath(),
-             targetFile.getAbsolutePath());
-         break;
-       default:
-         throw new LoadFileException(String.format("Unsupported type of 
loading tsfile : %s", type));
+     targetFile =
+         fsFactory.getFile(
 -            DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
++            TierManager.getInstance().getNextFolderForTsFile(0, false),
+             databaseName
+                 + File.separatorChar
+                 + dataRegionId
+                 + File.separatorChar
+                 + filePartitionId
+                 + File.separator
+                 + tsFileResource.getTsFile().getName());
+     tsFileResource.setFile(targetFile);
+     if (tsFileManager.contains(tsFileResource, false)) {
+       logger.error("The file {} has already been loaded in unsequence list", 
tsFileResource);
+       return false;
      }
+     tsFileManager.add(tsFileResource, false);
+     logger.info(
+         "Load tsfile in unsequence list, move file from {} to {}",
+         tsFileToLoad.getAbsolutePath(),
+         targetFile.getAbsolutePath());
  
      // move file from sync dir to data dir
      if (!targetFile.getParentFile().exists()) {
@@@ -2898,7 -2711,7 +2729,7 @@@
     * @param fileToBeUnloaded tsfile to be unloaded
     * @return whether the file to be unloaded exists. @UsedBy load external 
tsfile module.
     */
--  public boolean unloadTsfile(File fileToBeUnloaded, File targetDir) {
++  public boolean unloadTsfile(File fileToBeUnloaded, File targetDir) throws 
IOException {
      writeLock("unloadTsfile");
      TsFileResource tsFileResourceToBeMoved = null;
      try {
diff --cc 
server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 9f2bca44cfb,0e5949f41b4..9ffc5fca31c
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@@ -585,7 -576,7 +590,7 @@@ public class TsFileResource 
      return true;
    }
  
--  void moveTo(File targetDir) {
++  void moveTo(File targetDir) throws IOException {
      fsFactory.moveFile(file, fsFactory.getFile(targetDir, file.getName()));
      fsFactory.moveFile(
          fsFactory.getFile(file.getPath() + RESOURCE_SUFFIX),
@@@ -632,18 -623,10 +637,18 @@@
      return this.status == TsFileResourceStatus.COMPACTION_CANDIDATE;
    }
  
 +  public boolean isMigrating() {
 +    return isMigrating;
 +  }
 +
 +  public void setIsMigrating(boolean isMigrating) {
 +    this.isMigrating = isMigrating;
 +  }
 +
    public void setStatus(TsFileResourceStatus status) {
      switch (status) {
-       case CLOSED:
-         this.status = TsFileResourceStatus.CLOSED;
+       case NORMAL:
+         this.status = TsFileResourceStatus.NORMAL;
          break;
        case UNCLOSED:
          this.status = TsFileResourceStatus.UNCLOSED;
diff --cc server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 9c90bc94304,db28843d7c7..829494c6375
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@@ -86,10 -82,7 +84,9 @@@ import org.apache.iotdb.db.wal.WALManag
  import org.apache.iotdb.db.wal.utils.WALMode;
  import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
  import org.apache.iotdb.metrics.utils.InternalReporterType;
 +import org.apache.iotdb.os.HybridFileInputFactoryDecorator;
- import org.apache.iotdb.pipe.api.exception.PipeManagementException;
  import org.apache.iotdb.rpc.TSStatusCode;
 +import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
  import org.apache.iotdb.udf.api.exception.UDFManagementException;
  
  import org.apache.thrift.TException;
@@@ -554,7 -540,8 +551,10 @@@ public class DataNode implements DataNo
      registerManager.register(RegionMigrateService.getInstance());
  
      registerManager.register(CompactionTaskManager.getInstance());
++
 +    registerManager.register(MigrationTaskManager.getInstance());
+ 
+     registerManager.register(PipeAgent.runtime());
    }
  
    /** set up RPC and protocols after DataNode is available */

Reply via email to