This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6961e2154a7 Fix NPE when concurrent closing mod files (#14708)
6961e2154a7 is described below
commit 6961e2154a7e73d014bcc53a845a9bccef4d8a39
Author: Jiang Tian <[email protected]>
AuthorDate: Thu Jan 16 20:27:33 2025 +0800
Fix NPE when concurrent closing mod files (#14708)
---
.../dataregion/memtable/TsFileProcessor.java | 4 ++
.../dataregion/modification/ModificationFile.java | 18 ++++---
.../modification/ModificationFileTest.java | 58 ++++++++++++++++++++++
3 files changed, 74 insertions(+), 6 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 21e8b751550..3b2c5fe2944 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -93,6 +93,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -1652,6 +1653,9 @@ public class TsFileProcessor {
// Truncate broken metadata
try {
writer.reset();
+ } catch (ClosedChannelException e1) {
+ // the file is closed
+ break;
} catch (IOException e1) {
logger.error(
"{}: {} truncate corrupted data meets error",
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
index c649b578b4c..396255df037 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
@@ -172,12 +172,13 @@ public class ModificationFile implements AutoCloseable {
@Override
public void close() throws IOException {
- if (fileOutputStream == null) {
- return;
- }
-
lock.writeLock().lock();
+
try {
+ if (fileOutputStream == null) {
+ return;
+ }
+
fileOutputStream.close();
fileOutputStream = null;
channel.force(true);
@@ -342,8 +343,13 @@ public class ModificationFile implements AutoCloseable {
}
public void truncate(long size) throws IOException {
- if (channel != null) {
- channel.truncate(size);
+ lock.writeLock().lock();
+ try {
+ if (channel != null) {
+ channel.truncate(size);
+ }
+ } finally {
+ lock.writeLock().unlock();
}
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFileTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFileTest.java
index 00053c54fca..a0a9885ecf0 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFileTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFileTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.modification;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.MeasurementPath;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.recover.CompactionRecoverManager;
import
org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.FullExactMatch;
@@ -37,6 +38,11 @@ import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -460,4 +466,56 @@ public class ModificationFileTest {
Files.delete(new File(originModsFileName).toPath());
}
}
+
+  /**
+   * Verifies that closing a {@link ModificationFile} from several threads while another thread
+   * keeps writing to it loses no entries and raises no exception.
+   *
+   * <p>One task writes {@code maxWrite} entries; {@code closeFutureNum} tasks call {@code close()}
+   * each time at least {@code closeInterval} more writes have been counted. Any failure inside a
+   * task surfaces through {@code Future.get()}.
+   */
+  @Test
+  public void testConcurrentClose() throws ExecutionException, InterruptedException, IOException {
+    String modsFileName = TestConstant.BASE_OUTPUT_PATH.concat("concurrentClose.mods");
+    // Created outside the try block so that the finally clause can always stop the workers.
+    ExecutorService threadPool = Executors.newCachedThreadPool();
+    try (ModificationFile modsFile = new ModificationFile(modsFileName, false)) {
+      AtomicInteger writeCounter = new AtomicInteger();
+      int maxWrite = 10000;
+      int closeInterval = 100;
+      int closeFutureNum = 5;
+      Future<Void> writeFuture = threadPool.submit(() -> write(modsFile, maxWrite, writeCounter));
+      List<Future<Void>> closeFutures = new ArrayList<>();
+      for (int i = 0; i < closeFutureNum; i++) {
+        closeFutures.add(
+            threadPool.submit(() -> close(modsFile, writeCounter, maxWrite, closeInterval)));
+      }
+      writeFuture.get();
+      for (Future<Void> closeFuture : closeFutures) {
+        closeFuture.get();
+      }
+      assertEquals(maxWrite, modsFile.getAllMods().size());
+    } finally {
+      // If the writer fails, writeCounter never reaches maxWrite and the closer tasks would keep
+      // polling forever on non-daemon pool threads; interrupting them lets the test JVM exit.
+      threadPool.shutdownNow();
+      Files.delete(new File(modsFileName).toPath());
+    }
+  }
+
+  /**
+   * Writer task body: appends {@code maxWrite} deletion entries for {@code root.db1.d1.s1} to the
+   * given file, incrementing {@code writeCounter} after every successful write.
+   *
+   * @return always {@code null}, so the method can be submitted as a {@code Callable<Void>}
+   */
+  private Void write(ModificationFile modificationFile, int maxWrite, AtomicInteger writeCounter)
+      throws IllegalPathException, IOException {
+    int next = 0;
+    while (next < maxWrite) {
+      MeasurementPath path = new MeasurementPath("root.db1.d1.s1");
+      modificationFile.write(new TreeDeletionEntry(path, next, next));
+      writeCounter.incrementAndGet();
+      next++;
+    }
+    return null;
+  }
+
+  /**
+   * Closer task body: polls {@code writeCounter} every 10 ms until it reaches {@code maxWrite},
+   * closing the file whenever at least {@code closeInterval} writes have been counted since this
+   * task last closed it.
+   *
+   * @return always {@code null}, so the method can be submitted as a {@code Callable<Void>}
+   */
+  private Void close(
+      ModificationFile modificationFile,
+      AtomicInteger writeCounter,
+      int maxWrite,
+      int closeInterval)
+      throws IOException, InterruptedException {
+    int lastClosedAt = 0;
+    while (true) {
+      if (writeCounter.get() >= maxWrite) {
+        return null;
+      }
+      int written = writeCounter.get();
+      boolean closeDue = written - lastClosedAt >= closeInterval;
+      if (closeDue) {
+        modificationFile.close();
+        lastClosedAt = written;
+      }
+      Thread.sleep(10);
+    }
+  }
}