This is an automated email from the ASF dual-hosted git repository.

heiming pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fbe596118ae8df1e2e77b79d913eedbc9e0b6273
Author: HeimingZ <[email protected]>
AuthorDate: Mon May 22 19:53:50 2023 +0800

    add migration
---
 .../resources/conf/iotdb-common.properties         |  18 ++
 .../iotdb/commons/concurrent/ThreadName.java       |   2 +
 .../apache/iotdb/commons/service/ServiceType.java  |   4 +-
 .../org/apache/iotdb/os/fileSystem/OSFile.java     |  13 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  55 ++++--
 .../iotdb/db/conf/directories/FolderManager.java   |   4 +-
 .../iotdb/db/conf/directories/TierManager.java     | 127 ++++++++++++-
 .../impl/SizeTieredCompactionSelector.java         |   2 +-
 .../utils/CrossSpaceCompactionCandidate.java       |   5 +-
 .../db/engine/migration/LocalMigrationTask.java    |  70 ++++++++
 .../iotdb/db/engine/migration/MigrationCause.java  |  24 +++
 .../iotdb/db/engine/migration/MigrationTask.java   |  93 ++++++++++
 .../db/engine/migration/MigrationTaskManager.java  | 196 +++++++++++++++++++++
 .../db/engine/migration/RemoteMigrationTask.java   |  60 +++++++
 .../iotdb/db/engine/storagegroup/DataRegion.java   |   4 +-
 .../engine/storagegroup/TsFileNameGenerator.java   |   5 +-
 .../db/engine/storagegroup/TsFileResource.java     |  30 +++-
 .../java/org/apache/iotdb/db/service/DataNode.java |   2 +
 .../tsfile/fileSystem/fsFactory/FSFactory.java     |  10 +-
 .../tsfile/fileSystem/fsFactory/HDFSFactory.java   |   3 +
 .../fileSystem/fsFactory/HybridFSFactory.java      |   5 +
 .../fileSystem/fsFactory/LocalFSFactory.java       |   5 +
 .../tsfile/fileSystem/fsFactory/OSFSFactory.java   |   3 +
 .../org/apache/iotdb/tsfile/utils/FSUtils.java     |   6 +-
 24 files changed, 703 insertions(+), 43 deletions(-)

diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties 
b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index d0e00cb5d38..a3c906e4cf0 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -174,6 +174,14 @@ cluster_name=defaultCluster
 # Datatype: long
 # heartbeat_interval_in_ms=1000
 
+####################
+### Disk management
+####################
+
+# thread pool size for migrate operation in the DataNode's data directories.
+# Datatype: int
+# migrate_thread_count=3
+
 # Disk remaining threshold at which DataNode is set to ReadOnly status
 # Datatype: double(percentage)
 # disk_space_warning_threshold=0.05
@@ -1125,3 +1133,13 @@ cluster_name=defaultCluster
 
 # Datatype: int
 # influxdb_rpc_port=8086
+
+####################
+### Object storage management
+####################
+
+# object_storage_name=aws_s3
+# object_storage_bucket=iotdb
+# object_storage_endpoiont=yourEndpoint
+# object_storage_access_key=yourAccessKey
+# object_storage_access_secret=yourAccessSecret
\ No newline at end of file
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 5d6fa3b53b4..6a9360b93c6 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -71,6 +71,8 @@ public enum ThreadName {
   PIPE_PROCESSOR_EXECUTOR_POOL("Pipe-Processor-Executor-Pool"),
   PIPE_CONNECTOR_EXECUTOR_POOL("Pipe-Connector-Executor-Pool"),
   PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL("Pipe-SubTask-Callback-Executor-Pool"),
+  MIGRATION_SCHEDULER("Migration-Scheduler"),
+  MIGRATION("Migration-Executor-Pool"),
   ;
 
   private final String name;
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java 
b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index 94d524defcf..b7a6e3aceaa 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -75,7 +75,9 @@ public enum ServiceType {
   IOT_CONSENSUS_SERVICE("IoTConsensus Service", "IoTConsensusRPCService"),
   PIPE_PLUGIN_CLASSLOADER_MANAGER_SERVICE(
       "Pipe Plugin Classloader Manager Service", "PipePluginClassLoader"),
-  MLNode_RPC_SERVICE("Rpc Service for MLNode", "MLNodeRPCService");
+  MLNode_RPC_SERVICE("Rpc Service for MLNode", "MLNodeRPCService"),
+  MIGRATION_SERVICE("Migration Manager", "Migration Manager");
+
   private final String name;
   private final String jmxName;
 
diff --git 
a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java 
b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java
index 31de138b9a5..d04eaa397ea 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java
@@ -260,12 +260,12 @@ public class OSFile extends File {
 
   @Override
   public boolean mkdir() {
-    throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+    return true;
   }
 
   @Override
   public boolean mkdirs() {
-    throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+    return true;
   }
 
   @Override
@@ -326,17 +326,20 @@ public class OSFile extends File {
 
   @Override
   public long getTotalSpace() {
-    throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+    // object storage has infinity space
+    return Long.MAX_VALUE;
   }
 
   @Override
   public long getFreeSpace() {
-    throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+    // object storage has infinity space
+    return Long.MAX_VALUE;
   }
 
   @Override
   public long getUsableSpace() {
-    throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+    // object storage has infinity space
+    return Long.MAX_VALUE;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index fed4db2feb9..1b042e42b25 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.audit.AuditLogOperation;
 import org.apache.iotdb.db.audit.AuditLogStorage;
+import org.apache.iotdb.db.conf.directories.TierManager;
 import 
org.apache.iotdb.db.engine.compaction.execute.performer.constant.CrossCompactionPerformer;
 import 
org.apache.iotdb.db.engine.compaction.execute.performer.constant.InnerSeqCompactionPerformer;
 import 
org.apache.iotdb.db.engine.compaction.execute.performer.constant.InnerUnseqCompactionPerformer;
@@ -1105,6 +1106,12 @@ public class IoTDBConfig {
    */
   private String RateLimiterType = "FixedIntervalRateLimiter";
 
+  private String objectStorageName = "aws_s3";
+  private String objectStorageBucket = "iotdb";
+  private String objectStorageEndpoiont = "yourEndpoint";
+  private String objectStorageAccessKey = "yourAccessKey";
+  private String objectStorageAccessSecret = "yourAccessSecret";
+
   IoTDBConfig() {}
 
   public float getUdfMemoryBudgetInMB() {
@@ -1211,17 +1218,18 @@ public class IoTDBConfig {
   private void formulateDataDirs(String[][] tierDataDirs) {
     for (int i = 0; i < tierDataDirs.length; i++) {
       for (int j = 0; j < tierDataDirs[i].length; j++) {
-        switch (FSUtils.getFSType(tierDataDirs[i][j])) {
-          case HDFS:
-            tierDataDirs[i][j] = getHdfsDir() + File.separatorChar + 
tierDataDirs[i][j];
-            break;
-          case OBJECT_STORAGE:
-            // TODO(zhm) 对象存储路径配置
-            break;
-          case LOCAL:
-          default:
-            tierDataDirs[i][j] = addDataHomeDir(tierDataDirs[i][j]);
-            break;
+        if (tierDataDirs[i][j].equals("object_storage")) {
+          tierDataDirs[i][j] = FSUtils.getOSDefaultPath(objectStorageBucket, 
dataNodeId);
+        } else {
+          switch (FSUtils.getFSType(tierDataDirs[i][j])) {
+            case HDFS:
+              tierDataDirs[i][j] = getHdfsDir() + File.separatorChar + 
tierDataDirs[i][j];
+              break;
+            case LOCAL:
+            default:
+              tierDataDirs[i][j] = addDataHomeDir(tierDataDirs[i][j]);
+              break;
+          }
         }
       }
     }
@@ -1230,7 +1238,7 @@ public class IoTDBConfig {
   void reloadDataDirs(String[][] tierDataDirs) throws 
LoadConfigurationException {
     // format data directories
     formulateDataDirs(tierDataDirs);
-    // make sure old data directories not removed, TODO(zhm) 
层级关系是否可以变化,当前实现仅支持在最后添加层级
+    // make sure old data directories not removed
     for (int i = 0; i < this.tierDataDirs.length; ++i) {
       HashSet<String> newDirs = new HashSet<>(Arrays.asList(tierDataDirs[i]));
       for (String oldDir : this.tierDataDirs[i]) {
@@ -1243,7 +1251,7 @@ public class IoTDBConfig {
       }
     }
     this.tierDataDirs = tierDataDirs;
-    //    TierManager.getInstance().updateFileFolders();
+    TierManager.getInstance().resetFolders();
   }
 
   // if IOTDB_DATA_HOME is not set, then we keep dataHomeDir prefix being the 
same with IOTDB_HOME
@@ -1313,6 +1321,7 @@ public class IoTDBConfig {
   }
 
   public void setTierDataDirs(String[][] tierDataDirs) {
+    formulateDataDirs(tierDataDirs);
     this.tierDataDirs = tierDataDirs;
     // TODO(szywilliam): rewrite the logic here when ratis supports complete 
snapshot semantic
     setRatisDataRegionSnapshotDir(
@@ -3810,4 +3819,24 @@ public class IoTDBConfig {
   public String getSortTmpDir() {
     return sortTmpDir;
   }
+
+  public String getObjectStorageName() {
+    return objectStorageName;
+  }
+
+  public String getObjectStorageBucket() {
+    return objectStorageBucket;
+  }
+
+  public String getObjectStorageEndpoiont() {
+    return objectStorageEndpoiont;
+  }
+
+  public String getObjectStorageAccessKey() {
+    return objectStorageAccessKey;
+  }
+
+  public String getObjectStorageAccessSecret() {
+    return objectStorageAccessSecret;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/conf/directories/FolderManager.java 
b/server/src/main/java/org/apache/iotdb/db/conf/directories/FolderManager.java
index 571a07a9ad4..9fb73749bc0 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/conf/directories/FolderManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/conf/directories/FolderManager.java
@@ -61,7 +61,7 @@ public class FolderManager {
     try {
       this.selectStrategy.setFolders(folders);
     } catch (DiskSpaceInsufficientException e) {
-      logger.error("All disks of wal folders are full, change system mode to 
read-only.", e);
+      logger.error("All folders are full, change system mode to read-only.", 
e);
       
CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly);
       throw e;
     }
@@ -71,7 +71,7 @@ public class FolderManager {
     try {
       return folders.get(selectStrategy.nextFolderIndex());
     } catch (DiskSpaceInsufficientException e) {
-      logger.error("All disks of wal folders are full, change system mode to 
read-only.", e);
+      logger.error("All folders are full, change system mode to read-only.", 
e);
       
CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly);
       throw e;
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java 
b/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java
index 2daeb089ff4..027c3e35971 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java
@@ -33,11 +33,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileStore;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /** The main class of multiple directories. Used to allocate folders to data 
files. */
@@ -58,6 +65,8 @@ public class TierManager {
   private final Map<String, Integer> seqDir2TierLevel = new HashMap<>();
   /** unSeq file folder's rawFsPath path -> tier level */
   private final Map<String, Integer> unSeqDir2TierLevel = new HashMap<>();
+  /** total space of each tier, Long.MAX_VALUE when one tier contains remote 
storage */
+  private long[] tierDiskTotalSpace;
 
   private TierManager() {
     try {
@@ -78,10 +87,26 @@ public class TierManager {
 
   public void resetFolders() {
     String[][] tierDirs = config.getTierDataDirs();
+    for (int i = 0; i < tierDirs.length; ++i) {
+      for (int j = 0; j < tierDirs[i].length; ++j) {
+        if (FSUtils.isLocal(tierDirs[i][j])) {
+          try {
+            tierDirs[i][j] = new File(tierDirs[i][j]).getCanonicalPath();
+          } catch (IOException e) {
+            logger.error("Fail to get canonical path of data dir {}", 
tierDirs[i][j], e);
+          }
+        }
+      }
+    }
+
     for (int tierLevel = 0; tierLevel < tierDirs.length; ++tierLevel) {
       List<String> seqDirs =
           Arrays.stream(tierDirs[tierLevel])
-              .map(v -> v + File.separator + 
IoTDBConstant.SEQUENCE_FLODER_NAME)
+              .map(
+                  v ->
+                      FSFactoryProducer.getFSFactory()
+                          .getFile(v, IoTDBConstant.SEQUENCE_FLODER_NAME)
+                          .getPath())
               .collect(Collectors.toList());
       mkDataDirs(seqDirs);
       try {
@@ -95,7 +120,11 @@ public class TierManager {
 
       List<String> unSeqDirs =
           Arrays.stream(tierDirs[tierLevel])
-              .map(v -> v + File.separator + 
IoTDBConstant.UNSEQUENCE_FLODER_NAME)
+              .map(
+                  v ->
+                      FSFactoryProducer.getFSFactory()
+                          .getFile(v, IoTDBConstant.UNSEQUENCE_FLODER_NAME)
+                          .getPath())
               .collect(Collectors.toList());
       mkDataDirs(unSeqDirs);
       try {
@@ -107,6 +136,8 @@ public class TierManager {
         unSeqDir2TierLevel.put(dir, tierLevel);
       }
     }
+
+    tierDiskTotalSpace = getTierDiskSpace(DiskSpaceType.TOTAL);
   }
 
   private void mkDataDirs(List<String> folders) {
@@ -121,13 +152,11 @@ public class TierManager {
     }
   }
 
-  public String getNextFolderForSequenceFile(int tierLevel) throws 
DiskSpaceInsufficientException {
-    return seqTiers.get(tierLevel).getNextFolder();
-  }
-
-  public String getNextFolderForUnSequenceFile(int tierLevel)
+  public String getNextFolderForTsFile(int tierLevel, boolean sequence)
       throws DiskSpaceInsufficientException {
-    return unSeqTiers.get(tierLevel).getNextFolder();
+    return sequence
+        ? seqTiers.get(tierLevel).getNextFolder()
+        : unSeqTiers.get(tierLevel).getNextFolder();
   }
 
   public List<String> getAllFilesFolders() {
@@ -162,6 +191,88 @@ public class TierManager {
     return seqTiers.size();
   }
 
+  public int getFileTierLevel(File file) {
+    String filePath;
+    try {
+      filePath = file.getCanonicalPath();
+    } catch (IOException e) {
+      logger.error("Fail to get canonical path of data dir {}", file, e);
+      filePath = file.getPath();
+    }
+
+    for (Map.Entry<String, Integer> entry : seqDir2TierLevel.entrySet()) {
+      if (filePath.startsWith(entry.getKey())) {
+        return entry.getValue();
+      }
+    }
+    for (Map.Entry<String, Integer> entry : unSeqDir2TierLevel.entrySet()) {
+      if (filePath.startsWith(entry.getKey())) {
+        return entry.getValue();
+      }
+    }
+    throw new RuntimeException(String.format("%s is not a legal TsFile path", 
file));
+  }
+
+  public long[] getTierDiskTotalSpace() {
+    return Arrays.copyOf(tierDiskTotalSpace, tierDiskTotalSpace.length);
+  }
+
+  public long[] getTierDiskUsableSpace() {
+    return getTierDiskSpace(DiskSpaceType.USABLE);
+  }
+
+  private long[] getTierDiskSpace(DiskSpaceType type) {
+    String[][] tierDirs = config.getTierDataDirs();
+    long[] tierDiskSpace = new long[tierDirs.length];
+    for (int tierLevel = 0; tierLevel < tierDirs.length; ++tierLevel) {
+      Set<FileStore> tierFileStores = new HashSet<>();
+      for (String dir : tierDirs[tierLevel]) {
+        if (!FSUtils.isLocal(dir)) {
+          tierDiskSpace[tierLevel] = Long.MAX_VALUE;
+          break;
+        }
+        // get the FileStore of each local dir
+        Path path = Paths.get(dir);
+        FileStore fileStore = null;
+        try {
+          fileStore = Files.getFileStore(path);
+        } catch (IOException e) {
+          // check parent if path is not exists
+          path = path.getParent();
+          try {
+            fileStore = Files.getFileStore(path);
+          } catch (IOException innerException) {
+            logger.error("Failed to get storage path of {}, because", dir, 
innerException);
+          }
+        }
+        // update space info
+        if (fileStore != null && !tierFileStores.contains(fileStore)) {
+          tierFileStores.add(fileStore);
+          try {
+            switch (type) {
+              case TOTAL:
+                tierDiskSpace[tierLevel] += fileStore.getTotalSpace();
+                break;
+              case USABLE:
+                tierDiskSpace[tierLevel] += fileStore.getUsableSpace();
+                break;
+              default:
+                break;
+            }
+          } catch (IOException e) {
+            logger.error("Failed to statistic the size of {}, because", 
fileStore, e);
+          }
+        }
+      }
+    }
+    return tierDiskSpace;
+  }
+
+  private enum DiskSpaceType {
+    TOTAL,
+    USABLE,
+  }
+
   public static TierManager getInstance() {
     return TierManagerHolder.INSTANCE;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
index df88248bf2e..bb0a2c8ee44 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
@@ -114,7 +114,7 @@ public class SizeTieredCompactionSelector
         selectedFileSize = 0L;
         continue;
       }
-      if (currentFile.getStatus() != TsFileResourceStatus.CLOSED) {
+      if (currentFile.getStatus() != TsFileResourceStatus.CLOSED || 
currentFile.isMigrating()) {
         selectedFileList.clear();
         selectedFileSize = 0L;
         continue;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java
index bedf2e2fbcc..329c778f120 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java
@@ -142,7 +142,9 @@ public class CrossSpaceCompactionCandidate {
   private List<TsFileResourceCandidate> 
filterUnseqResource(List<TsFileResource> unseqResources) {
     List<TsFileResourceCandidate> ret = new ArrayList<>();
     for (TsFileResource resource : unseqResources) {
-      if (resource.getStatus() != TsFileResourceStatus.CLOSED || 
!resource.getTsFile().exists()) {
+      if (resource.getStatus() != TsFileResourceStatus.CLOSED
+          || resource.isMigrating()
+          || !resource.getTsFile().exists()) {
         break;
       } else if (resource.stillLives(ttlLowerBound)) {
         ret.add(new TsFileResourceCandidate(resource));
@@ -199,6 +201,7 @@ public class CrossSpaceCompactionCandidate {
       // the status of file may be changed after the task is submitted to queue
       this.isValidCandidate =
           tsFileResource.getStatus() == TsFileResourceStatus.CLOSED
+              && !tsFileResource.isMigrating()
               && tsFileResource.getTsFile().exists();
     }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java
new file mode 100644
index 00000000000..347452e26ed
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.migration;
+
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class LocalMigrationTask extends MigrationTask {
+  private static final Logger logger = 
LoggerFactory.getLogger(LocalMigrationTask.class);
+
+  LocalMigrationTask(MigrationCause cause, TsFileResource tsFile, String 
targetDir) {
+    super(cause, tsFile, targetDir);
+  }
+
+  @Override
+  public void migrate() {
+    // copy TsFile and resource file
+    tsFile.readLock();
+    try {
+      fsFactory.copyFile(srcTsFile, destTsFile);
+      fsFactory.copyFile(srcResourceFile, destResourceFile);
+    } catch (IOException e) {
+      logger.error("Fail to copy TsFile {}", srcTsFile);
+      destTsFile.delete();
+      destResourceFile.delete();
+      return;
+    } finally {
+      tsFile.readUnlock();
+    }
+    // close mods file and replace TsFile path
+    tsFile.writeLock();
+    try {
+      tsFile.resetModFile();
+      fsFactory.copyFile(srcModsFile, destModsFile);
+      tsFile.setFile(destTsFile);
+    } catch (IOException e) {
+      logger.error("Fail to copy mods file {}", srcModsFile);
+      destTsFile.delete();
+      destResourceFile.delete();
+      destModsFile.delete();
+      return;
+    } finally {
+      tsFile.writeUnlock();
+    }
+    // clear src files
+    srcTsFile.delete();
+    srcResourceFile.delete();
+    srcModsFile.delete();
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationCause.java 
b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationCause.java
new file mode 100644
index 00000000000..dc1d270ba61
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationCause.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.migration;
+
+public enum MigrationCause {
+  TTL,
+  DISK_SPACE
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java 
b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java
new file mode 100644
index 00000000000..0571d9d47f7
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.migration;
+
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
+import org.apache.iotdb.tsfile.utils.FSUtils;
+
+import java.io.File;
+
+public abstract class MigrationTask implements Runnable {
+  protected static final FSFactory fsFactory = 
FSFactoryProducer.getFSFactory();
+
+  protected final MigrationCause cause;
+  protected final TsFileResource tsFile;
+  protected final String targetDir;
+
+  protected final File srcTsFile;
+  protected final File destTsFile;
+  protected final File srcResourceFile;
+  protected final File destResourceFile;
+  protected final File srcModsFile;
+  protected final File destModsFile;
+
+  MigrationTask(MigrationCause cause, TsFileResource tsFile, String targetDir) 
{
+    this.cause = cause;
+    this.tsFile = tsFile;
+    this.targetDir = targetDir;
+    this.srcTsFile = tsFile.getTsFile();
+    this.destTsFile = fsFactory.getFile(targetDir, 
tsFile.getTsFile().getName());
+    this.srcResourceFile =
+        fsFactory.getFile(
+            srcTsFile.getParentFile(), srcTsFile.getName() + 
TsFileResource.RESOURCE_SUFFIX);
+    this.destResourceFile =
+        fsFactory.getFile(targetDir, tsFile.getTsFile().getName() + 
TsFileResource.RESOURCE_SUFFIX);
+    this.srcModsFile =
+        fsFactory.getFile(
+            srcTsFile.getParentFile(), srcTsFile.getName() + 
ModificationFile.FILE_SUFFIX);
+    this.destModsFile =
+        fsFactory.getFile(targetDir, tsFile.getTsFile().getName() + 
ModificationFile.FILE_SUFFIX);
+  }
+
+  public static MigrationTask newTask(
+      MigrationCause cause, TsFileResource sourceTsFile, String targetDir) {
+    if (FSUtils.isLocal(targetDir)) {
+      return new LocalMigrationTask(cause, sourceTsFile, targetDir);
+    } else {
+      return new RemoteMigrationTask(cause, sourceTsFile, targetDir);
+    }
+  }
+
+  @Override
+  public void run() {
+    if (canMigrate()) {
+      tsFile.setIsMigrating(true);
+      if (!canMigrate()) {
+        tsFile.setIsMigrating(false);
+        return;
+      }
+    } else {
+      return;
+    }
+
+    migrate();
+
+    tsFile.setIsMigrating(false);
+  }
+
+  protected boolean canMigrate() {
+    return tsFile.getStatus() == TsFileResourceStatus.CLOSED;
+  }
+
+  public abstract void migrate();
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
new file mode 100644
index 00000000000..37957aa8b0a
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.migration;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.service.IService;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.TierManager;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
+import org.apache.iotdb.db.utils.DateTimeUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class MigrationTaskManager implements IService {
+  private static final Logger logger = 
LoggerFactory.getLogger(MigrationTaskManager.class);
+  private static final IoTDBConfig iotdbConfig = 
IoTDBDescriptor.getInstance().getConfig();
+  private static final CommonConfig commonConfig = 
CommonDescriptor.getInstance().getConfig();
+  private static final TierManager tierManager = TierManager.getInstance();
+  private static final long CHECK_INTERVAL_IN_SECONDS = 10 * 60;
+  private static final double TIER_DISK_SPACE_WARN_THRESHOLD =
+      commonConfig.getDiskSpaceWarningThreshold() + 0.1;
+  private static final double TIER_DISK_SPACE_SAFE_THRESHOLD =
+      commonConfig.getDiskSpaceWarningThreshold() + 0.2;
+  /** single thread to schedule */
+  private ScheduledExecutorService scheduler;
+  /** single thread to sync syncingBuffer to disk */
+  private ExecutorService workers;
+
+  @Override
+  public void start() throws StartupException {
+    if (iotdbConfig.getTierDataDirs().length == 1) {
+      return;
+    }
+    scheduler =
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+            ThreadName.MIGRATION_SCHEDULER.getName());
+    workers =
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            iotdbConfig.getCompactionThreadCount(), 
ThreadName.MIGRATION.getName());
+    ScheduledExecutorUtil.safelyScheduleAtFixedRate(
+        scheduler,
+        () -> new MigrationScheduleTask().run(),
+        CHECK_INTERVAL_IN_SECONDS,
+        CHECK_INTERVAL_IN_SECONDS,
+        TimeUnit.SECONDS);
+  }
+
+  private class MigrationScheduleTask implements Runnable {
+    private final long[] tierDiskTotalSpace = 
tierManager.getTierDiskTotalSpace();
+    private final long[] tierDiskUsableSpace = 
tierManager.getTierDiskUsableSpace();
+    private final Set<Integer> needMigrationTiers = new HashSet<>();
+
+    public MigrationScheduleTask() {
+      for (int i = 0; i < tierManager.getTiersNum(); i++) {
+        double usage = tierDiskUsableSpace[i] * 1.0 / tierDiskTotalSpace[i];
+        if (usage <= TIER_DISK_SPACE_WARN_THRESHOLD) {
+          needMigrationTiers.add(i);
+        }
+      }
+    }
+
+    @Override
+    public void run() {
+      schedule();
+    }
+
+    private void schedule() {
+      // submit migration tasks
+      for (DataRegion dataRegion : 
StorageEngine.getInstance().getAllDataRegions()) {
+        List<TsFileResource> tsfiles = dataRegion.getSequenceFileList();
+        tsfiles.addAll(dataRegion.getUnSequenceFileList());
+        tsfiles.sort(this::compareMigrationPriority);
+        for (TsFileResource tsfile : tsfiles) {
+          try {
+            int tierLevel = tsfile.getTierLevel();
+            // only migrate closed TsFiles not in the last tier
+            if (tsfile.getStatus() != TsFileResourceStatus.CLOSED
+                || tierLevel == iotdbConfig.getTierDataDirs().length - 1) {
+              continue;
+            }
+            // check tier ttl and disk space
+            long tierTTL =
+                DateTimeUtils.convertMilliTimeWithPrecision(
+                    commonConfig.getTierTTLInMs()[tierLevel], 
iotdbConfig.getTimestampPrecision());
+            if (tsfile.stillLives(tierTTL)) {
+              submitMigrationTask(
+                  tierLevel,
+                  MigrationCause.TTL,
+                  tsfile,
+                  tierManager.getNextFolderForTsFile(tierLevel, 
tsfile.isSeq()));
+            } else if (needMigrationTiers.contains(tierLevel)) {
+              submitMigrationTask(
+                  tierLevel,
+                  MigrationCause.DISK_SPACE,
+                  tsfile,
+                  tierManager.getNextFolderForTsFile(tierLevel, 
tsfile.isSeq()));
+            }
+          } catch (Exception e) {
+            logger.error(
+                "An error occurred when checking migration of TsFileResource 
{}", tsfile, e);
+          }
+        }
+      }
+    }
+
+    private void submitMigrationTask(
+        int tierLevel, MigrationCause cause, TsFileResource sourceTsFile, 
String targetDir) {
+      MigrationTask task = MigrationTask.newTask(cause, sourceTsFile, 
targetDir);
+      workers.submit(task);
+      tierDiskUsableSpace[tierLevel] -= sourceTsFile.getTsFileSize();
+      if (needMigrationTiers.contains(tierLevel)) {
+        double usage = tierDiskUsableSpace[tierLevel] * 1.0 / 
tierDiskTotalSpace[tierLevel];
+        if (usage > TIER_DISK_SPACE_SAFE_THRESHOLD) {
+          needMigrationTiers.remove(tierLevel);
+        }
+      }
+    }
+
+    private int compareMigrationPriority(TsFileResource f1, TsFileResource f2) 
{
+      // old time partitions first
+      int res = Long.compare(f1.getTimePartition(), f2.getTimePartition());
+      // sequence files in one partition
+      if (res == 0) {
+        if (f1.isSeq() && !f2.isSeq()) {
+          res = -1;
+        } else if (!f1.isSeq() && f2.isSeq()) {
+          res = 1;
+        }
+      }
+      // old version files in one partition
+      if (res == 0) {
+        res = Long.compare(f1.getVersion(), f2.getVersion());
+      }
+      return res;
+    }
+  }
+
+  @Override
+  public void stop() {
+    if (scheduler != null) {
+      scheduler.shutdownNow();
+    }
+    if (workers != null) {
+      workers.shutdownNow();
+    }
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.MIGRATION_SERVICE;
+  }
+
+  public static MigrationTaskManager getInstance() {
+    return InstanceHolder.INSTANCE;
+  }
+
+  private static class InstanceHolder {
+    private InstanceHolder() {}
+
+    private static final MigrationTaskManager INSTANCE = new 
MigrationTaskManager();
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java
new file mode 100644
index 00000000000..18136da0c46
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.migration;
+
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class RemoteMigrationTask extends MigrationTask {
+  private static final Logger logger = 
LoggerFactory.getLogger(RemoteMigrationTask.class);
+
+  RemoteMigrationTask(MigrationCause cause, TsFileResource tsFile, String 
targetDir) {
+    super(cause, tsFile, targetDir);
+  }
+
+  @Override
+  public void migrate() {
+    // copy TsFile and resource file
+    tsFile.readLock();
+    try {
+      fsFactory.copyFile(srcTsFile, destTsFile);
+      fsFactory.copyFile(srcResourceFile, destResourceFile);
+    } catch (IOException e) {
+      logger.error("Fail to copy TsFile {}", srcTsFile);
+      destTsFile.delete();
+      destResourceFile.delete();
+      return;
+    } finally {
+      tsFile.readUnlock();
+    }
+    // replace TsFile path
+    tsFile.writeLock();
+    try {
+      tsFile.setFile(destTsFile);
+    } finally {
+      tsFile.writeUnlock();
+    }
+    // clear src files
+    srcTsFile.delete();
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index b72150a93d5..850fda3ba78 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -2676,7 +2676,7 @@ public class DataRegion implements IDataRegionForQuery {
       case LOAD_UNSEQUENCE:
         targetFile =
             fsFactory.getFile(
-                TierManager.getInstance().getNextFolderForUnSequenceFile(0),
+                TierManager.getInstance().getNextFolderForTsFile(0, false),
                 databaseName
                     + File.separatorChar
                     + dataRegionId
@@ -2698,7 +2698,7 @@ public class DataRegion implements IDataRegionForQuery {
       case LOAD_SEQUENCE:
         targetFile =
             fsFactory.getFile(
-                TierManager.getInstance().getNextFolderForSequenceFile(0),
+                TierManager.getInstance().getNextFolderForTsFile(0, true),
                 databaseName
                     + File.separatorChar
                     + dataRegionId
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java
index c69f9bfe023..b4e936a6cfc 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java
@@ -79,10 +79,7 @@ public class TsFileNameGenerator {
       long timePartitionId)
       throws DiskSpaceInsufficientException {
     TierManager tierManager = TierManager.getInstance();
-    String baseDir =
-        sequence
-            ? tierManager.getNextFolderForSequenceFile(0)
-            : tierManager.getNextFolderForUnSequenceFile(0);
+    String baseDir = tierManager.getNextFolderForTsFile(0, sequence);
     return baseDir
         + File.separator
         + logicalStorageGroup
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 5ab10f08b59..9f2bca44cfb 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.TierManager;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion.SettleTsFileCallBack;
@@ -130,6 +131,10 @@ public class TsFileResource {
 
   private long ramSize;
 
+  private volatile int tierLevel = 0;
+
+  private volatile boolean isMigrating = false;
+
   private volatile long tsFileSize = -1L;
 
   private TsFileProcessor processor;
@@ -170,6 +175,7 @@ public class TsFileResource {
     this.minPlanIndex = other.minPlanIndex;
     this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName());
     this.tsFileSize = other.tsFileSize;
+    this.tierLevel = other.tierLevel;
   }
 
   /** for sealed TsFile, call setClosed to close TsFileResource */
@@ -177,6 +183,7 @@ public class TsFileResource {
     this.file = file;
     this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName());
     this.timeIndex = CONFIG.getTimeIndexLevel().getTimeIndex();
+    this.tierLevel = TierManager.getInstance().getFileTierLevel(file);
   }
 
   /** Used for compaction to create target files. */
@@ -191,6 +198,7 @@ public class TsFileResource {
     this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName());
     this.timeIndex = CONFIG.getTimeIndexLevel().getTimeIndex();
     this.processor = processor;
+    this.tierLevel = TierManager.getInstance().getFileTierLevel(file);
   }
 
   /** unsealed TsFile, for query */
@@ -206,6 +214,7 @@ public class TsFileResource {
     this.pathToChunkMetadataListMap.put(path, chunkMetadataList);
     this.originTsFileResource = originTsFileResource;
     this.version = originTsFileResource.version;
+    this.tierLevel = originTsFileResource.tierLevel;
   }
 
   /** unsealed TsFile, for query */
@@ -221,6 +230,7 @@ public class TsFileResource {
     generatePathToTimeSeriesMetadataMap();
     this.originTsFileResource = originTsFileResource;
     this.version = originTsFileResource.version;
+    this.tierLevel = originTsFileResource.tierLevel;
   }
 
   @TestOnly
@@ -355,9 +365,10 @@ public class TsFileResource {
     return compactionModFile;
   }
 
-  public void resetModFile() {
+  public void resetModFile() throws IOException {
     if (modFile != null) {
       synchronized (this) {
+        modFile.close();
         modFile = null;
       }
     }
@@ -375,6 +386,10 @@ public class TsFileResource {
     return file.getPath();
   }
 
+  public int getTierLevel() {
+    return tierLevel;
+  }
+
   public long getTsFileSize() {
     if (isClosed()) {
       if (tsFileSize == -1) {
@@ -419,8 +434,7 @@ public class TsFileResource {
   public DeviceTimeIndex buildDeviceTimeIndex() throws IOException {
     readLock();
     try (InputStream inputStream =
-        FSFactoryProducer.getFSFactory()
-            .getBufferedInputStream(file.getPath() + 
TsFileResource.RESOURCE_SUFFIX)) {
+        FSFactoryProducer.getFSFactory().getBufferedInputStream(file.getPath() 
+ RESOURCE_SUFFIX)) {
       ReadWriteIOUtils.readByte(inputStream);
       ITimeIndex timeIndexFromResourceFile = 
ITimeIndex.createTimeIndex(inputStream);
       if (!(timeIndexFromResourceFile instanceof DeviceTimeIndex)) {
@@ -429,7 +443,7 @@ public class TsFileResource {
       return (DeviceTimeIndex) timeIndexFromResourceFile;
     } catch (Exception e) {
       throw new IOException(
-          "Can't read file " + file.getPath() + TsFileResource.RESOURCE_SUFFIX 
+ " from disk", e);
+          "Can't read file " + file.getPath() + RESOURCE_SUFFIX + " from 
disk", e);
     } finally {
       readUnlock();
     }
@@ -618,6 +632,14 @@ public class TsFileResource {
     return this.status == TsFileResourceStatus.COMPACTION_CANDIDATE;
   }
 
+  public boolean isMigrating() {
+    return isMigrating;
+  }
+
+  public void setIsMigrating(boolean isMigrating) {
+    this.isMigrating = isMigrating;
+  }
+
   public void setStatus(TsFileResourceStatus status) {
     switch (status) {
       case CLOSED:
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java 
b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 3e0f39ac4ff..1e95ffe09f9 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -66,6 +66,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.cache.CacheHitRatioMonitor;
 import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
 import org.apache.iotdb.db.engine.flush.FlushManager;
+import org.apache.iotdb.db.engine.migration.MigrationTaskManager;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
@@ -543,6 +544,7 @@ public class DataNode implements DataNodeMBean {
     registerManager.register(RegionMigrateService.getInstance());
 
     registerManager.register(CompactionTaskManager.getInstance());
+    registerManager.register(MigrationTaskManager.getInstance());
   }
 
   /** set up RPC and protocols after DataNode is available */
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/FSFactory.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/FSFactory.java
index 8029bcca1d5..386cc210baa 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/FSFactory.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/FSFactory.java
@@ -105,13 +105,21 @@ public interface FSFactory {
   BufferedOutputStream getBufferedOutputStream(String filePath);
 
   /**
-   * move file
+   * TODO(zhm) move file
    *
    * @param srcFile src file
    * @param destFile dest file
    */
   void moveFile(File srcFile, File destFile);
 
+  /**
+   * TODO(zhm) copy file
+   *
+   * @param srcFile src file
+   * @param destFile dest file
+   */
+  void copyFile(File srcFile, File destFile) throws IOException;
+
   /**
    * list file by suffix
    *
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
index ddf971b29cd..328c4f9e39e 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
@@ -199,6 +199,9 @@ public class HDFSFactory implements FSFactory {
     }
   }
 
+  @Override
+  public void copyFile(File srcFile, File destFile) throws IOException {}
+
   @Override
   public File[] listFilesBySuffix(String fileFolder, String suffix) {
     try {
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HybridFSFactory.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HybridFSFactory.java
index b2d58149753..325b0ba5e52 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HybridFSFactory.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HybridFSFactory.java
@@ -103,6 +103,11 @@ public class HybridFSFactory implements FSFactory {
     // TODO
   }
 
+  @Override
+  public void copyFile(File srcFile, File destFile) throws IOException {
+    // TODO
+  }
+
   @Override
   public File[] listFilesBySuffix(String fileFolder, String suffix) {
     FSPath folder = FSUtils.parse(fileFolder);
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/LocalFSFactory.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/LocalFSFactory.java
index 53c1dcb31a6..b6d46989e88 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/LocalFSFactory.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/LocalFSFactory.java
@@ -123,6 +123,11 @@ public class LocalFSFactory implements FSFactory {
     }
   }
 
+  @Override
+  public void copyFile(File srcFile, File destFile) throws IOException {
+    FileUtils.copyFile(srcFile, destFile);
+  }
+
   @Override
   public File[] listFilesBySuffix(String fileFolder, String suffix) {
     return new File(fileFolder).listFiles(file -> 
file.getName().endsWith(suffix));
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java
 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java
index eb2b4c837ec..1f58fc6ffd6 100644
--- 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java
+++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java
@@ -198,6 +198,9 @@ public class OSFSFactory implements FSFactory {
     }
   }
 
+  @Override
+  public void copyFile(File srcFile, File destFile) throws IOException {}
+
   @Override
   public File[] listFilesBySuffix(String fileFolder, String suffix) {
     try {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java
index 0c9f80a591e..6ab36b56634 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java
@@ -95,7 +95,11 @@ public class FSUtils {
     return new FSPath(type, path);
   }
 
-  public static FSPath parseLocalTsFile2OSFile(File lcoalFile, String bucket, 
String dataNodeId)
+  public static String getOSDefaultPath(String bucket, int dataNodeId) {
+    return new FSPath(FSType.OBJECT_STORAGE, fsPrefix[0] + "/" + 
dataNodeId).getPath();
+  }
+
+  public static FSPath parseLocalTsFile2OSFile(File lcoalFile, String bucket, 
int dataNodeId)
       throws IOException {
     String canonicalPath = lcoalFile.getCanonicalPath();
     int startIdx = canonicalPath.lastIndexOf("unsequence");

Reply via email to