This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6961e2154a7 Fix NPE when concurrent closing mod files (#14708)
6961e2154a7 is described below

commit 6961e2154a7e73d014bcc53a845a9bccef4d8a39
Author: Jiang Tian <[email protected]>
AuthorDate: Thu Jan 16 20:27:33 2025 +0800

    Fix NPE when concurrent closing mod files (#14708)
---
 .../dataregion/memtable/TsFileProcessor.java       |  4 ++
 .../dataregion/modification/ModificationFile.java  | 18 ++++---
 .../modification/ModificationFileTest.java         | 58 ++++++++++++++++++++++
 3 files changed, 74 insertions(+), 6 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 21e8b751550..3b2c5fe2944 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -93,6 +93,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -1652,6 +1653,9 @@ public class TsFileProcessor {
         // Truncate broken metadata
         try {
           writer.reset();
+        } catch (ClosedChannelException e1) {
+          // the file is closed
+          break;
         } catch (IOException e1) {
           logger.error(
               "{}: {} truncate corrupted data meets error",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
index c649b578b4c..396255df037 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
@@ -172,12 +172,13 @@ public class ModificationFile implements AutoCloseable {
 
   @Override
   public void close() throws IOException {
-    if (fileOutputStream == null) {
-      return;
-    }
-
     lock.writeLock().lock();
+
     try {
+      if (fileOutputStream == null) {
+        return;
+      }
+
       fileOutputStream.close();
       fileOutputStream = null;
       channel.force(true);
@@ -342,8 +343,13 @@ public class ModificationFile implements AutoCloseable {
   }
 
   public void truncate(long size) throws IOException {
-    if (channel != null) {
-      channel.truncate(size);
+    lock.writeLock().lock();
+    try {
+      if (channel != null) {
+        channel.truncate(size);
+      }
+    } finally {
+      lock.writeLock().unlock();
     }
   }
 
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFileTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFileTest.java
index 00053c54fca..a0a9885ecf0 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFileTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFileTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.modification;
 
+import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.recover.CompactionRecoverManager;
 import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.FullExactMatch;
@@ -37,6 +38,11 @@ import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -460,4 +466,56 @@ public class ModificationFileTest {
       Files.delete(new File(originModsFileName).toPath());
     }
   }
+
+  @Test
+  public void testConcurrentClose() throws ExecutionException, InterruptedException, IOException {
+    String modsFileName = TestConstant.BASE_OUTPUT_PATH.concat("concurrentClose.mods");
+    try (ModificationFile modsFile = new ModificationFile(modsFileName, false)) {
+      ExecutorService threadPool = Executors.newCachedThreadPool();
+      AtomicInteger writeCounter = new AtomicInteger();
+      int maxWrite = 10000;
+      int closeInterval = 100;
+      int closeFutureNum = 5;
+      Future<Void> writeFuture = threadPool.submit(() -> write(modsFile, maxWrite, writeCounter));
+      List<Future<Void>> closeFutures = new ArrayList<>();
+      for (int i = 0; i < closeFutureNum; i++) {
+        closeFutures.add(
+            threadPool.submit(() -> close(modsFile, writeCounter, maxWrite, closeInterval)));
+      }
+      writeFuture.get();
+      for (Future<Void> closeFuture : closeFutures) {
+        closeFuture.get();
+      }
+      assertEquals(maxWrite, modsFile.getAllMods().size());
+    } finally {
+      Files.delete(new File(modsFileName).toPath());
+    }
+  }
+
+  private Void write(ModificationFile modificationFile, int maxWrite, AtomicInteger writeCounter)
+      throws IllegalPathException, IOException {
+    for (int i = 0; i < maxWrite; i++) {
+      modificationFile.write(new TreeDeletionEntry(new MeasurementPath("root.db1.d1.s1"), i, i));
+      writeCounter.incrementAndGet();
+    }
+    return null;
+  }
+
+  private Void close(
+      ModificationFile modificationFile,
+      AtomicInteger writeCounter,
+      int maxWrite,
+      int closeInterval)
+      throws IOException, InterruptedException {
+    int prevWriteCnt = 0;
+    while (writeCounter.get() < maxWrite) {
+      int writeCnt = writeCounter.get();
+      if (writeCnt - prevWriteCnt >= closeInterval) {
+        modificationFile.close();
+        prevWriteCnt = writeCnt;
+      }
+      Thread.sleep(10);
+    }
+    return null;
+  }
 }

Reply via email to