This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e56b34f3535 fix mods file metrics & fix the issue of missing mods with
concurrent deletion and compaction (#14765)
e56b34f3535 is described below
commit e56b34f3535accee21295c274a6e7dd25625f69c
Author: shuwenwei <[email protected]>
AuthorDate: Fri Jan 24 09:46:38 2025 +0800
fix mods file metrics & fix the issue of missing mods with concurrent
deletion and compaction (#14765)
* fix mods file metrics
* add lock for ModificationFile.remove
* spotless
---
.../db/storageengine/dataregion/DataRegion.java | 14 ++++-
.../compaction/execute/utils/CompactionUtils.java | 5 +-
.../dataregion/modification/ModificationFile.java | 69 +++++++++++++++-------
.../dataregion/tsfile/TsFileResource.java | 23 +++++---
4 files changed, 80 insertions(+), 31 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 3b28ec76b5b..ca758e26c8d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -501,7 +501,12 @@ public class DataRegion implements IDataRegionForQuery {
resource.getTsFile().length(),
true,
resource.getTsFile().getName());
- resource.upgradeModFile(upgradeModFileThreadPool);
+ if
(ModificationFile.getExclusiveMods(resource.getTsFile()).exists()) {
+ // update mods file metrics
+ resource.getExclusiveModFile();
+ } else {
+ resource.upgradeModFile(upgradeModFileThreadPool);
+ }
}
}
while (!value.isEmpty()) {
@@ -530,7 +535,12 @@ public class DataRegion implements IDataRegionForQuery {
false,
resource.getTsFile().getName());
}
- resource.upgradeModFile(upgradeModFileThreadPool);
+ if
(ModificationFile.getExclusiveMods(resource.getTsFile()).exists()) {
+ // update mods file metrics
+ resource.getExclusiveModFile();
+ } else {
+ resource.upgradeModFile(upgradeModFileThreadPool);
+ }
}
while (!value.isEmpty()) {
TsFileResource tsFileResource = value.get(value.size() - 1);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
index f776aa23022..6e1eafd1cf0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
@@ -148,7 +148,7 @@ public class CompactionUtils {
Set<ModEntry> modifications = new HashSet<>();
// get compaction mods from all source unseq files
for (TsFileResource unseqFile : unseqResources) {
-
modifications.addAll(ModificationFile.getCompactionMods(unseqFile).getAllMods());
+
modifications.addAll(ModificationFile.readAllCompactionModifications(unseqFile.getTsFile()));
}
// write target mods file
@@ -158,7 +158,8 @@ public class CompactionUtils {
continue;
}
Set<ModEntry> seqModifications =
- new
HashSet<>(ModificationFile.getCompactionMods(seqResources.get(i)).getAllMods());
+ new HashSet<>(
+
ModificationFile.readAllCompactionModifications(seqResources.get(i).getTsFile()));
modifications.addAll(seqModifications);
updateOneTargetMods(targetResource, modifications);
modifications.removeAll(seqModifications);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
index 396255df037..435f1fe8d3a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java
@@ -73,6 +73,7 @@ public class ModificationFile implements AutoCloseable {
private boolean hasCompacted = false;
private boolean fileExists = false;
private final boolean updateMetrics;
+ private boolean removed = false;
private Set<ModificationFile> cascadeFiles = null;
@@ -89,19 +90,29 @@ public class ModificationFile implements AutoCloseable {
}
}
+ public void writeLock() {
+ this.lock.writeLock().lock();
+ }
+
+ public void writeUnlock() {
+ this.lock.writeLock().unlock();
+ }
+
@SuppressWarnings("java:S2093") // cannot use try-with-resource, should not
close here
public void write(ModEntry entry) throws IOException {
int updateFileNum = 0;
lock.writeLock().lock();
long size = 0;
try {
- if (fileOutputStream == null) {
- fileOutputStream =
- new BufferedOutputStream(Files.newOutputStream(file.toPath(),
CREATE, APPEND));
- channel = FileChannel.open(file.toPath(), CREATE, APPEND);
+ if (!removed) {
+ if (fileOutputStream == null) {
+ fileOutputStream =
+ new BufferedOutputStream(Files.newOutputStream(file.toPath(),
CREATE, APPEND));
+ channel = FileChannel.open(file.toPath(), CREATE, APPEND);
+ }
+ size += entry.serialize(fileOutputStream);
+ fileOutputStream.flush();
}
- size += entry.serialize(fileOutputStream);
- fileOutputStream.flush();
if (cascadeFiles != null) {
for (ModificationFile cascadeFile : cascadeFiles) {
@@ -124,15 +135,17 @@ public class ModificationFile implements AutoCloseable {
lock.writeLock().lock();
long size = 0;
try {
- if (fileOutputStream == null) {
- fileOutputStream =
- new BufferedOutputStream(Files.newOutputStream(file.toPath(),
CREATE, APPEND));
- channel = FileChannel.open(file.toPath(), CREATE, APPEND);
- }
- for (ModEntry entry : entries) {
- size += entry.serialize(fileOutputStream);
+ if (!removed) {
+ if (fileOutputStream == null) {
+ fileOutputStream =
+ new BufferedOutputStream(Files.newOutputStream(file.toPath(),
CREATE, APPEND));
+ channel = FileChannel.open(file.toPath(), CREATE, APPEND);
+ }
+ for (ModEntry entry : entries) {
+ size += entry.serialize(fileOutputStream);
+ }
+ fileOutputStream.flush();
}
- fileOutputStream.flush();
if (cascadeFiles != null) {
for (ModificationFile cascadeFile : cascadeFiles) {
@@ -150,7 +163,7 @@ public class ModificationFile implements AutoCloseable {
}
private void updateModFileMetric(int num, long size) {
- if (updateMetrics) {
+ if (!removed && updateMetrics) {
FileMetrics.getInstance().increaseModFileNum(num);
FileMetrics.getInstance().increaseModFileSize(size);
}
@@ -214,6 +227,16 @@ public class ModificationFile implements AutoCloseable {
return new long[] {levelNum, modNum};
}
+ public static List<ModEntry> readAllCompactionModifications(File tsfile)
throws IOException {
+ try (ModificationFile modificationFile =
+ new ModificationFile(ModificationFile.getCompactionMods(tsfile),
false)) {
+ if (modificationFile.exists()) {
+ return modificationFile.getAllMods();
+ }
+ }
+ return Collections.emptyList();
+ }
+
public static List<ModEntry> readAllModifications(
File tsfile, boolean readOldModFileIfNewModFileNotExists) throws
IOException {
try (ModificationFile modificationFile =
@@ -313,12 +336,18 @@ public class ModificationFile implements AutoCloseable {
}
public void remove() throws IOException {
- close();
- FileUtils.deleteFileOrDirectory(file);
- if (fileExists) {
- updateModFileMetric(-1, -getFileLength());
+ lock.writeLock().lock();
+ try {
+ close();
+ FileUtils.deleteFileOrDirectory(file);
+ if (fileExists) {
+ updateModFileMetric(-1, -getFileLength());
+ }
+ fileExists = false;
+ removed = true;
+ } finally {
+ lock.writeLock().unlock();
}
- fileExists = false;
}
public static ModificationFile getExclusiveMods(TsFileResource
tsFileResource) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 48ca35fbc7d..4e6fa190b60 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -391,13 +391,23 @@ public class TsFileResource implements PersistentResource
{
}
public void linkModFile(TsFileResource target) throws IOException {
- if (exclusiveModFileExists()) {
- File modsFileForTargetResource =
ModificationFile.getExclusiveMods(target.getTsFile());
- Files.createLink(
- modsFileForTargetResource.toPath(),
- ModificationFile.getExclusiveMods(getTsFile()).toPath());
- target.setExclusiveModFile(new
ModificationFile(modsFileForTargetResource, true));
+ File targetModsFile =
ModificationFile.getExclusiveMods(target.getTsFile());
+ ModificationFile sourceModFile = this.getExclusiveModFile();
+ ModificationFile targetModsFileObject;
+ sourceModFile.writeLock();
+ try {
+ if (sourceModFile.exists()) {
+ Files.createLink(
+ targetModsFile.toPath(),
ModificationFile.getExclusiveMods(getTsFile()).toPath());
+ targetModsFileObject = new ModificationFile(targetModsFile, true);
+ } else {
+ targetModsFileObject = new ModificationFile(targetModsFile, true);
+
sourceModFile.setCascadeFile(Collections.singleton(targetModsFileObject));
+ }
+ } finally {
+ sourceModFile.writeUnlock();
}
+ target.setExclusiveModFile(targetModsFileObject);
if (sharedModFileExists()) {
modFileManagement.addReference(target, sharedModFile);
target.setSharedModFile(this.getSharedModFile(), false);
@@ -770,7 +780,6 @@ public class TsFileResource implements PersistentResource {
if (getExclusiveModFile().exists()) {
getExclusiveModFile().remove();
}
- exclusiveModFile = null;
if (getSharedModFile() != null && modFileManagement != null) {
modFileManagement.releaseFor(this, sharedModFile);