This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e56b34f3535 fix mods file metrics & fix the issue of missing mods with 
concurrent deletion and compaction (#14765)
e56b34f3535 is described below

commit e56b34f3535accee21295c274a6e7dd25625f69c
Author: shuwenwei <[email protected]>
AuthorDate: Fri Jan 24 09:46:38 2025 +0800

    fix mods file metrics & fix the issue of missing mods with concurrent 
deletion and compaction (#14765)
    
    * fix mods file metrics
    
    * add lock for ModificationFile.remove
    
    * spotless
---
 .../db/storageengine/dataregion/DataRegion.java    | 14 ++++-
 .../compaction/execute/utils/CompactionUtils.java  |  5 +-
 .../dataregion/modification/ModificationFile.java  | 69 +++++++++++++++-------
 .../dataregion/tsfile/TsFileResource.java          | 23 +++++---
 4 files changed, 80 insertions(+), 31 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 3b28ec76b5b..ca758e26c8d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -501,7 +501,12 @@ public class DataRegion implements IDataRegionForQuery {
                     resource.getTsFile().length(),
                     true,
                     resource.getTsFile().getName());
-            resource.upgradeModFile(upgradeModFileThreadPool);
+            if 
(ModificationFile.getExclusiveMods(resource.getTsFile()).exists()) {
+              // update mods file metrics
+              resource.getExclusiveModFile();
+            } else {
+              resource.upgradeModFile(upgradeModFileThreadPool);
+            }
           }
         }
         while (!value.isEmpty()) {
@@ -530,7 +535,12 @@ public class DataRegion implements IDataRegionForQuery {
                     false,
                     resource.getTsFile().getName());
           }
-          resource.upgradeModFile(upgradeModFileThreadPool);
+          if 
(ModificationFile.getExclusiveMods(resource.getTsFile()).exists()) {
+            // update mods file metrics
+            resource.getExclusiveModFile();
+          } else {
+            resource.upgradeModFile(upgradeModFileThreadPool);
+          }
         }
         while (!value.isEmpty()) {
           TsFileResource tsFileResource = value.get(value.size() - 1);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
index f776aa23022..6e1eafd1cf0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
@@ -148,7 +148,7 @@ public class CompactionUtils {
     Set<ModEntry> modifications = new HashSet<>();
     // get compaction mods from all source unseq files
     for (TsFileResource unseqFile : unseqResources) {
-      
modifications.addAll(ModificationFile.getCompactionMods(unseqFile).getAllMods());
+      
modifications.addAll(ModificationFile.readAllCompactionModifications(unseqFile.getTsFile()));
     }
 
     // write target mods file
@@ -158,7 +158,8 @@ public class CompactionUtils {
         continue;
       }
       Set<ModEntry> seqModifications =
-          new 
HashSet<>(ModificationFile.getCompactionMods(seqResources.get(i)).getAllMods());
+          new HashSet<>(
+              
ModificationFile.readAllCompactionModifications(seqResources.get(i).getTsFile()));
       modifications.addAll(seqModifications);
       updateOneTargetMods(targetResource, modifications);
       modifications.removeAll(seqModifications);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
index 396255df037..435f1fe8d3a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
@@ -73,6 +73,7 @@ public class ModificationFile implements AutoCloseable {
   private boolean hasCompacted = false;
   private boolean fileExists = false;
   private final boolean updateMetrics;
+  private boolean removed = false;
 
   private Set<ModificationFile> cascadeFiles = null;
 
@@ -89,19 +90,29 @@ public class ModificationFile implements AutoCloseable {
     }
   }
 
+  public void writeLock() {
+    this.lock.writeLock().lock();
+  }
+
+  public void writeUnlock() {
+    this.lock.writeLock().unlock();
+  }
+
   @SuppressWarnings("java:S2093") // cannot use try-with-resource, should not 
close here
   public void write(ModEntry entry) throws IOException {
     int updateFileNum = 0;
     lock.writeLock().lock();
     long size = 0;
     try {
-      if (fileOutputStream == null) {
-        fileOutputStream =
-            new BufferedOutputStream(Files.newOutputStream(file.toPath(), 
CREATE, APPEND));
-        channel = FileChannel.open(file.toPath(), CREATE, APPEND);
+      if (!removed) {
+        if (fileOutputStream == null) {
+          fileOutputStream =
+              new BufferedOutputStream(Files.newOutputStream(file.toPath(), 
CREATE, APPEND));
+          channel = FileChannel.open(file.toPath(), CREATE, APPEND);
+        }
+        size += entry.serialize(fileOutputStream);
+        fileOutputStream.flush();
       }
-      size += entry.serialize(fileOutputStream);
-      fileOutputStream.flush();
 
       if (cascadeFiles != null) {
         for (ModificationFile cascadeFile : cascadeFiles) {
@@ -124,15 +135,17 @@ public class ModificationFile implements AutoCloseable {
     lock.writeLock().lock();
     long size = 0;
     try {
-      if (fileOutputStream == null) {
-        fileOutputStream =
-            new BufferedOutputStream(Files.newOutputStream(file.toPath(), 
CREATE, APPEND));
-        channel = FileChannel.open(file.toPath(), CREATE, APPEND);
-      }
-      for (ModEntry entry : entries) {
-        size += entry.serialize(fileOutputStream);
+      if (!removed) {
+        if (fileOutputStream == null) {
+          fileOutputStream =
+              new BufferedOutputStream(Files.newOutputStream(file.toPath(), 
CREATE, APPEND));
+          channel = FileChannel.open(file.toPath(), CREATE, APPEND);
+        }
+        for (ModEntry entry : entries) {
+          size += entry.serialize(fileOutputStream);
+        }
+        fileOutputStream.flush();
       }
-      fileOutputStream.flush();
 
       if (cascadeFiles != null) {
         for (ModificationFile cascadeFile : cascadeFiles) {
@@ -150,7 +163,7 @@ public class ModificationFile implements AutoCloseable {
   }
 
   private void updateModFileMetric(int num, long size) {
-    if (updateMetrics) {
+    if (!removed && updateMetrics) {
       FileMetrics.getInstance().increaseModFileNum(num);
       FileMetrics.getInstance().increaseModFileSize(size);
     }
@@ -214,6 +227,16 @@ public class ModificationFile implements AutoCloseable {
     return new long[] {levelNum, modNum};
   }
 
+  public static List<ModEntry> readAllCompactionModifications(File tsfile) 
throws IOException {
+    try (ModificationFile modificationFile =
+        new ModificationFile(ModificationFile.getCompactionMods(tsfile), 
false)) {
+      if (modificationFile.exists()) {
+        return modificationFile.getAllMods();
+      }
+    }
+    return Collections.emptyList();
+  }
+
   public static List<ModEntry> readAllModifications(
       File tsfile, boolean readOldModFileIfNewModFileNotExists) throws 
IOException {
     try (ModificationFile modificationFile =
@@ -313,12 +336,18 @@ public class ModificationFile implements AutoCloseable {
   }
 
   public void remove() throws IOException {
-    close();
-    FileUtils.deleteFileOrDirectory(file);
-    if (fileExists) {
-      updateModFileMetric(-1, -getFileLength());
+    lock.writeLock().lock();
+    try {
+      close();
+      FileUtils.deleteFileOrDirectory(file);
+      if (fileExists) {
+        updateModFileMetric(-1, -getFileLength());
+      }
+      fileExists = false;
+      removed = true;
+    } finally {
+      lock.writeLock().unlock();
     }
-    fileExists = false;
   }
 
   public static ModificationFile getExclusiveMods(TsFileResource 
tsFileResource) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 48ca35fbc7d..4e6fa190b60 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -391,13 +391,23 @@ public class TsFileResource implements PersistentResource 
{
   }
 
   public void linkModFile(TsFileResource target) throws IOException {
-    if (exclusiveModFileExists()) {
-      File modsFileForTargetResource = 
ModificationFile.getExclusiveMods(target.getTsFile());
-      Files.createLink(
-          modsFileForTargetResource.toPath(),
-          ModificationFile.getExclusiveMods(getTsFile()).toPath());
-      target.setExclusiveModFile(new 
ModificationFile(modsFileForTargetResource, true));
+    File targetModsFile = 
ModificationFile.getExclusiveMods(target.getTsFile());
+    ModificationFile sourceModFile = this.getExclusiveModFile();
+    ModificationFile targetModsFileObject;
+    sourceModFile.writeLock();
+    try {
+      if (sourceModFile.exists()) {
+        Files.createLink(
+            targetModsFile.toPath(), 
ModificationFile.getExclusiveMods(getTsFile()).toPath());
+        targetModsFileObject = new ModificationFile(targetModsFile, true);
+      } else {
+        targetModsFileObject = new ModificationFile(targetModsFile, true);
+        
sourceModFile.setCascadeFile(Collections.singleton(targetModsFileObject));
+      }
+    } finally {
+      sourceModFile.writeUnlock();
     }
+    target.setExclusiveModFile(targetModsFileObject);
     if (sharedModFileExists()) {
       modFileManagement.addReference(target, sharedModFile);
       target.setSharedModFile(this.getSharedModFile(), false);
@@ -770,7 +780,6 @@ public class TsFileResource implements PersistentResource {
     if (getExclusiveModFile().exists()) {
       getExclusiveModFile().remove();
     }
-    exclusiveModFile = null;
 
     if (getSharedModFile() != null && modFileManagement != null) {
       modFileManagement.releaseFor(this, sharedModFile);

Reply via email to