This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a7063c1  [IOTDB-2166] Avoid TTL deleting source files in running 
compaction tasks (#4589)
a7063c1 is described below

commit a7063c1e40fbdca5873311f301946f76b80295d7
Author: liuxuxin <[email protected]>
AuthorDate: Tue Dec 21 23:56:06 2021 +0800

    [IOTDB-2166] Avoid TTL deleting source files in running compaction tasks 
(#4589)
---
 .../SizeTieredCompactionRecoverTask.java           |  2 +-
 .../inner/sizetiered/SizeTieredCompactionTask.java | 80 ++++++++++++----------
 .../engine/storagegroup/StorageGroupProcessor.java |  7 +-
 3 files changed, 50 insertions(+), 39 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTask.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTask.java
index 8e4fe4c..dea6d4f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTask.java
@@ -60,7 +60,7 @@ public class SizeTieredCompactionRecoverTask extends 
SizeTieredCompactionTask {
         timePartition,
         null,
         null,
-        null,
+        new ArrayList<>(),
         sequence,
         currentTaskNum);
     this.compactionLogFile = compactionLogFile;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
index fb9fe11..a0b9566 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
@@ -34,7 +34,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -49,6 +48,8 @@ public class SizeTieredCompactionTask extends 
AbstractInnerSpaceCompactionTask {
   private static final Logger LOGGER = LoggerFactory.getLogger("COMPACTION");
   protected TsFileResourceList tsFileResourceList;
   protected TsFileManager tsFileManager;
+  protected boolean[] isHoldingReadLock;
+  protected boolean[] isHoldingWriteLock;
 
   public SizeTieredCompactionTask(
       String logicalStorageGroupName,
@@ -67,6 +68,12 @@ public class SizeTieredCompactionTask extends 
AbstractInnerSpaceCompactionTask {
         selectedTsFileResourceList);
     this.tsFileResourceList = tsFileResourceList;
     this.tsFileManager = tsFileManager;
+    isHoldingReadLock = new boolean[selectedTsFileResourceList.size()];
+    isHoldingWriteLock = new boolean[selectedTsFileResourceList.size()];
+    for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
+      isHoldingWriteLock[i] = false;
+      isHoldingReadLock[i] = false;
+    }
   }
 
   @Override
@@ -86,19 +93,6 @@ public class SizeTieredCompactionTask extends 
AbstractInnerSpaceCompactionTask {
         selectedTsFileResourceList.size());
     File logFile = null;
     SizeTieredCompactionLogger sizeTieredCompactionLogger = null;
-    // to mark if we got the write lock or read lock of the selected tsfile
-    boolean[] isHoldingReadLock = new 
boolean[selectedTsFileResourceList.size()];
-    boolean[] isHoldingWriteLock = new 
boolean[selectedTsFileResourceList.size()];
-    for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
-      isHoldingReadLock[i] = false;
-      isHoldingWriteLock[i] = false;
-    }
-    LOGGER.info(
-        "{} [Compaction] Try to get the read lock of all selected files", 
fullStorageGroupName);
-    for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
-      selectedTsFileResourceList.get(i).readLock();
-      isHoldingReadLock[i] = true;
-    }
 
     try {
       logFile =
@@ -212,16 +206,8 @@ public class SizeTieredCompactionTask extends 
AbstractInnerSpaceCompactionTask {
           tsFileManager,
           tsFileResourceList);
     } finally {
+      releaseFileLocksAndResetMergingStatus(true);
       tsFileManager.writeUnlock();
-      for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
-        if (isHoldingReadLock[i]) {
-          selectedTsFileResourceList.get(i).readUnlock();
-        }
-        if (isHoldingWriteLock[i]) {
-          selectedTsFileResourceList.get(i).writeUnlock();
-        }
-        selectedTsFileResourceList.get(i).setMerging(false);
-      }
     }
   }
 
@@ -239,24 +225,46 @@ public class SizeTieredCompactionTask extends 
AbstractInnerSpaceCompactionTask {
 
   @Override
   public boolean checkValidAndSetMerging() {
-    long minVersionNum = Long.MAX_VALUE;
+    tsFileResourceList.readLock();
     try {
-      for (TsFileResource resource : selectedTsFileResourceList) {
-        if (resource.isMerging() | !resource.isClosed() || 
!resource.getTsFile().exists()) {
+      for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
+        TsFileResource resource = selectedTsFileResourceList.get(i);
+        resource.readLock();
+        isHoldingReadLock[i] = true;
+        if (resource.isMerging() | !resource.isClosed()
+            || !resource.getTsFile().exists()
+            || resource.isDeleted()) {
+          // this source file cannot be compacted
+          // release the lock of locked files, and return
+          releaseFileLocksAndResetMergingStatus(false);
           return false;
         }
-        TsFileNameGenerator.TsFileName tsFileName =
-            TsFileNameGenerator.getTsFileName(resource.getTsFile().getName());
-        if (tsFileName.getVersion() < minVersionNum) {
-          minVersionNum = tsFileName.getVersion();
-        }
       }
-    } catch (IOException e) {
-      LOGGER.error("CompactionTask exists while check valid", e);
+
+      for (TsFileResource resource : selectedTsFileResourceList) {
+        resource.setMerging(true);
+      }
+      return true;
+    } finally {
+      tsFileResourceList.readUnlock();
     }
-    for (TsFileResource resource : selectedTsFileResourceList) {
-      resource.setMerging(true);
+  }
+
+  /**
+   * release the read lock and write lock of files if it is held, and set the 
merging status of
+   * selected files to false
+   */
+  private void releaseFileLocksAndResetMergingStatus(boolean 
resetCompactingStatus) {
+    for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
+      if (isHoldingReadLock[i]) {
+        selectedTsFileResourceList.get(i).readUnlock();
+      }
+      if (isHoldingWriteLock[i]) {
+        selectedTsFileResourceList.get(i).writeUnlock();
+      }
+      if (resetCompactingStatus) {
+        selectedTsFileResourceList.get(i).setMerging(false);
+      }
     }
-    return true;
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 688ab42..49ee221 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1534,7 +1534,9 @@ public class StorageGroupProcessor {
       return;
     }
 
-    writeLock("checkFileTTL");
+    TsFileResourceList resourceList =
+        
tsFileManager.getSequenceListByTimePartition(resource.getTimePartition());
+    resourceList.writeLock();
     try {
       // prevent new merges and queries from choosing this file
       resource.setDeleted(true);
@@ -1544,6 +1546,7 @@ public class StorageGroupProcessor {
         try {
           // physical removal
           resource.remove();
+          resourceList.remove(resource);
           if (logger.isInfoEnabled()) {
             logger.info(
                 "Removed a file {} before {} by ttl ({}ms)",
@@ -1557,7 +1560,7 @@ public class StorageGroupProcessor {
         }
       }
     } finally {
-      writeUnlock();
+      resourceList.writeUnlock();
     }
   }
 

Reply via email to