This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 35d70733c7bc33c53f91074dd7e27cc5013cf441
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Wed May 24 21:43:06 2023 +0800

    refactor TsFile status with migration
---
 .../execute/task/AbstractCompactionTask.java       |  2 +-
 .../impl/SizeTieredCompactionSelector.java         |  4 +---
 .../utils/CrossSpaceCompactionCandidate.java       | 11 ++---------
 .../db/engine/migration/LocalMigrationTask.java    |  3 +++
 .../iotdb/db/engine/migration/MigrationTask.java   | 13 ++++++-------
 .../db/engine/migration/MigrationTaskManager.java  | 22 +---------------------
 .../db/engine/migration/RemoteMigrationTask.java   |  3 +++
 .../db/engine/storagegroup/TsFileResource.java     | 21 ++++++++++-----------
 .../engine/storagegroup/TsFileResourceStatus.java  |  2 ++
 9 files changed, 29 insertions(+), 52 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java
index 8c91acbf556..3246a133866 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/AbstractCompactionTask.java
@@ -23,9 +23,9 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import 
org.apache.iotdb.db.engine.compaction.execute.performer.ICompactionPerformer;
 import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
-import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
+import org.apache.iotdb.db.service.metrics.recorder.CompactionMetricsManager;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
index c3dddf47402..805abe2f1d8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
@@ -114,9 +114,7 @@ public class SizeTieredCompactionSelector
         selectedFileSize = 0L;
         continue;
       }
-      if (currentFile.getStatus() != TsFileResourceStatus.NORMAL
-          || currentFile.onRemote()
-          || currentFile.isMigrating()) {
+      if (currentFile.getStatus() != TsFileResourceStatus.NORMAL) {
         selectedFileList.clear();
         selectedFileSize = 0L;
         continue;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java
index b70cf0f0b0b..26d0ee0b1ab 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java
@@ -142,10 +142,7 @@ public class CrossSpaceCompactionCandidate {
   private List<TsFileResourceCandidate> 
filterUnseqResource(List<TsFileResource> unseqResources) {
     List<TsFileResourceCandidate> ret = new ArrayList<>();
     for (TsFileResource resource : unseqResources) {
-      if (resource.getStatus() != TsFileResourceStatus.NORMAL
-          || resource.onRemote()
-          || resource.isMigrating()
-          || !resource.getTsFile().exists()) {
+      if (resource.getStatus() != TsFileResourceStatus.NORMAL) {
         break;
       } else if (resource.stillLives(ttlLowerBound)) {
         ret.add(new TsFileResourceCandidate(resource));
@@ -200,11 +197,7 @@ public class CrossSpaceCompactionCandidate {
       this.selected = false;
       // although we do the judgement here, the task should be validated 
before executing because
       // the status of file may be changed after the task is submitted to queue
-      this.isValidCandidate =
-          tsFileResource.getStatus() == TsFileResourceStatus.NORMAL
-              && !tsFileResource.onRemote()
-              && !tsFileResource.isMigrating()
-              && tsFileResource.getTsFile().exists();
+      this.isValidCandidate = tsFileResource.getStatus() == 
TsFileResourceStatus.NORMAL;
     }
 
     /**
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java
index 93fcd99f82c..99b0a78c8d7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.engine.migration;
 
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,6 +59,8 @@ public class LocalMigrationTask extends MigrationTask {
         fsFactory.copyFile(srcModsFile, destModsFile);
       }
       tsFileResource.setFile(destTsFile);
+      tsFileResource.increaseTierLevel();
+      tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
     } catch (Exception e) {
       logger.error("Fail to copy mods file from local {} to local {}", 
srcModsFile, destModsFile);
       destTsFile.delete();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java 
b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java
index a894334a84c..e84a5074803 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java
@@ -76,13 +76,12 @@ public abstract class MigrationTask implements Runnable {
 
   @Override
   public void run() {
-    migrate();
-    tsFileResource.increaseTierLevel();
-    tsFileResource.setIsMigrating(false);
-  }
-
-  protected boolean canMigrate() {
-    return tsFileResource.getStatus() == TsFileResourceStatus.NORMAL;
+    try {
+      migrate();
+    } finally {
+      // try to set the final status to NORMAL to avoid migrate failure
+      tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
+    }
   }
 
   public abstract void migrate();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
index ede3ecce410..53ea1a5ad4f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
@@ -144,7 +144,7 @@ public class MigrationTaskManager implements IService {
     private void submitMigrationTask(
         int tierLevel, MigrationCause cause, TsFileResource sourceTsFile, 
String targetDir)
         throws IOException {
-      if (!checkAndMarkMigrate(sourceTsFile)) {
+      if (!sourceTsFile.setStatus(TsFileResourceStatus.MIGRATING)) {
         return;
       }
       MigrationTask task = MigrationTask.newTask(cause, sourceTsFile, 
targetDir);
@@ -158,26 +158,6 @@ public class MigrationTaskManager implements IService {
       }
     }
 
-    private boolean checkAndMarkMigrate(TsFileResource tsFile) {
-      if (canMigrate(tsFile)) {
-        tsFile.setIsMigrating(true);
-        if (occupiedByCompaction(tsFile)) {
-          tsFile.setIsMigrating(false);
-          return false;
-        }
-        return true;
-      }
-      return false;
-    }
-
-    private boolean canMigrate(TsFileResource tsFile) {
-      return tsFile.getStatus() == TsFileResourceStatus.NORMAL && 
!tsFile.isMigrating();
-    }
-
-    private boolean occupiedByCompaction(TsFileResource tsFile) {
-      return tsFile.getStatus() != TsFileResourceStatus.NORMAL;
-    }
-
     private int compareMigrationPriority(TsFileResource f1, TsFileResource f2) 
{
       // old time partitions first
       int res = Long.compare(f1.getTimePartition(), f2.getTimePartition());
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java
index fa4bbe51cd6..ee209234019 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.engine.migration;
 
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,6 +61,8 @@ public class RemoteMigrationTask extends MigrationTask {
     tsFileResource.writeLock();
     try {
       srcFile.delete();
+      tsFileResource.increaseTierLevel();
+      tsFileResource.setStatus(TsFileResourceStatus.NORMAL_ON_REMOTE);
     } catch (Exception e) {
       logger.error("Fail to delete local TsFile {}", srcFile);
     } finally {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 1e627629c26..4dc2e1bede7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -135,8 +135,6 @@ public class TsFileResource {
 
   private volatile int tierLevel = 0;
 
-  private volatile boolean isMigrating = false;
-
   private volatile long tsFileSize = -1L;
 
   private TsFileProcessor processor;
@@ -190,6 +188,9 @@ public class TsFileResource {
     // This method is invoked when DataNode recovers, so the tierLevel should 
be calculated when
     // restarting
     this.tierLevel = TierManager.getInstance().getFileTierLevel(file);
+    if (onRemote()) {
+      this.setAtomicStatus(TsFileResourceStatus.NORMAL_ON_REMOTE);
+    }
   }
 
   /** Used for compaction to create target files. */
@@ -652,16 +653,8 @@ public class TsFileResource {
     return getStatus() == TsFileResourceStatus.COMPACTION_CANDIDATE;
   }
 
-  public boolean isMigrating() {
-    return isMigrating;
-  }
-
-  public void setIsMigrating(boolean isMigrating) {
-    this.isMigrating = isMigrating;
-  }
-
   public boolean onRemote() {
-    return !file.exists();
+    return !isDeleted() && !file.exists();
   }
 
   private boolean compareAndSetStatus(
@@ -682,9 +675,13 @@ public class TsFileResource {
     switch (status) {
       case NORMAL:
         return compareAndSetStatus(TsFileResourceStatus.UNCLOSED, 
TsFileResourceStatus.NORMAL)
+            || compareAndSetStatus(TsFileResourceStatus.MIGRATING, 
TsFileResourceStatus.NORMAL)
             || compareAndSetStatus(TsFileResourceStatus.COMPACTING, 
TsFileResourceStatus.NORMAL)
             || compareAndSetStatus(
                 TsFileResourceStatus.COMPACTION_CANDIDATE, 
TsFileResourceStatus.NORMAL);
+      case NORMAL_ON_REMOTE:
+        return compareAndSetStatus(
+            TsFileResourceStatus.MIGRATING, 
TsFileResourceStatus.NORMAL_ON_REMOTE);
       case UNCLOSED:
         // TsFile cannot be set back to UNCLOSED so false is always returned
         return false;
@@ -698,6 +695,8 @@ public class TsFileResource {
       case COMPACTION_CANDIDATE:
         return compareAndSetStatus(
             TsFileResourceStatus.NORMAL, 
TsFileResourceStatus.COMPACTION_CANDIDATE);
+      case MIGRATING:
+        return compareAndSetStatus(TsFileResourceStatus.NORMAL, 
TsFileResourceStatus.COMPACTING);
       default:
         return false;
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceStatus.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceStatus.java
index 4c1c95ca7e6..f389492553e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceStatus.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceStatus.java
@@ -22,7 +22,9 @@ public enum TsFileResourceStatus {
   UNCLOSED,
   /** The resource in status NORMAL, COMPACTION_CANDIDATE, COMPACTING, DELETED 
is all CLOSED. */
   NORMAL,
+  NORMAL_ON_REMOTE,
   COMPACTION_CANDIDATE,
   COMPACTING,
+  MIGRATING,
   DELETED
 }

Reply via email to