This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a7063c1 [IOTDB-2166] Avoid TTL deleting source files in running
compaction tasks (#4589)
a7063c1 is described below
commit a7063c1e40fbdca5873311f301946f76b80295d7
Author: liuxuxin <[email protected]>
AuthorDate: Tue Dec 21 23:56:06 2021 +0800
[IOTDB-2166] Avoid TTL deleting source files in running compaction tasks
(#4589)
---
.../SizeTieredCompactionRecoverTask.java | 2 +-
.../inner/sizetiered/SizeTieredCompactionTask.java | 80 ++++++++++++----------
.../engine/storagegroup/StorageGroupProcessor.java | 7 +-
3 files changed, 50 insertions(+), 39 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTask.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTask.java
index 8e4fe4c..dea6d4f 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionRecoverTask.java
@@ -60,7 +60,7 @@ public class SizeTieredCompactionRecoverTask extends
SizeTieredCompactionTask {
timePartition,
null,
null,
- null,
+ new ArrayList<>(),
sequence,
currentTaskNum);
this.compactionLogFile = compactionLogFile;
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
index fb9fe11..a0b9566 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionTask.java
@@ -34,7 +34,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@@ -49,6 +48,8 @@ public class SizeTieredCompactionTask extends
AbstractInnerSpaceCompactionTask {
private static final Logger LOGGER = LoggerFactory.getLogger("COMPACTION");
protected TsFileResourceList tsFileResourceList;
protected TsFileManager tsFileManager;
+ protected boolean[] isHoldingReadLock;
+ protected boolean[] isHoldingWriteLock;
public SizeTieredCompactionTask(
String logicalStorageGroupName,
@@ -67,6 +68,12 @@ public class SizeTieredCompactionTask extends
AbstractInnerSpaceCompactionTask {
selectedTsFileResourceList);
this.tsFileResourceList = tsFileResourceList;
this.tsFileManager = tsFileManager;
+ isHoldingReadLock = new boolean[selectedTsFileResourceList.size()];
+ isHoldingWriteLock = new boolean[selectedTsFileResourceList.size()];
+ for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
+ isHoldingWriteLock[i] = false;
+ isHoldingReadLock[i] = false;
+ }
}
@Override
@@ -86,19 +93,6 @@ public class SizeTieredCompactionTask extends
AbstractInnerSpaceCompactionTask {
selectedTsFileResourceList.size());
File logFile = null;
SizeTieredCompactionLogger sizeTieredCompactionLogger = null;
- // to mark if we got the write lock or read lock of the selected tsfile
- boolean[] isHoldingReadLock = new
boolean[selectedTsFileResourceList.size()];
- boolean[] isHoldingWriteLock = new
boolean[selectedTsFileResourceList.size()];
- for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
- isHoldingReadLock[i] = false;
- isHoldingWriteLock[i] = false;
- }
- LOGGER.info(
- "{} [Compaction] Try to get the read lock of all selected files",
fullStorageGroupName);
- for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
- selectedTsFileResourceList.get(i).readLock();
- isHoldingReadLock[i] = true;
- }
try {
logFile =
@@ -212,16 +206,8 @@ public class SizeTieredCompactionTask extends
AbstractInnerSpaceCompactionTask {
tsFileManager,
tsFileResourceList);
} finally {
+ releaseFileLocksAndResetMergingStatus(true);
tsFileManager.writeUnlock();
- for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
- if (isHoldingReadLock[i]) {
- selectedTsFileResourceList.get(i).readUnlock();
- }
- if (isHoldingWriteLock[i]) {
- selectedTsFileResourceList.get(i).writeUnlock();
- }
- selectedTsFileResourceList.get(i).setMerging(false);
- }
}
}
@@ -239,24 +225,46 @@ public class SizeTieredCompactionTask extends
AbstractInnerSpaceCompactionTask {
@Override
public boolean checkValidAndSetMerging() {
- long minVersionNum = Long.MAX_VALUE;
+ tsFileResourceList.readLock();
try {
- for (TsFileResource resource : selectedTsFileResourceList) {
- if (resource.isMerging() | !resource.isClosed() ||
!resource.getTsFile().exists()) {
+ for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
+ TsFileResource resource = selectedTsFileResourceList.get(i);
+ resource.readLock();
+ isHoldingReadLock[i] = true;
+ if (resource.isMerging() | !resource.isClosed()
+ || !resource.getTsFile().exists()
+ || resource.isDeleted()) {
+ // this source file cannot be compacted
+ // release the lock of locked files, and return
+ releaseFileLocksAndResetMergingStatus(false);
return false;
}
- TsFileNameGenerator.TsFileName tsFileName =
- TsFileNameGenerator.getTsFileName(resource.getTsFile().getName());
- if (tsFileName.getVersion() < minVersionNum) {
- minVersionNum = tsFileName.getVersion();
- }
}
- } catch (IOException e) {
- LOGGER.error("CompactionTask exists while check valid", e);
+
+ for (TsFileResource resource : selectedTsFileResourceList) {
+ resource.setMerging(true);
+ }
+ return true;
+ } finally {
+ tsFileResourceList.readUnlock();
}
- for (TsFileResource resource : selectedTsFileResourceList) {
- resource.setMerging(true);
+ }
+
+ /**
+ * release the read lock and write lock of files if it is held, and set the
merging status of
+ * selected files to false
+ */
+ private void releaseFileLocksAndResetMergingStatus(boolean
resetCompactingStatus) {
+ for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
+ if (isHoldingReadLock[i]) {
+ selectedTsFileResourceList.get(i).readUnlock();
+ }
+ if (isHoldingWriteLock[i]) {
+ selectedTsFileResourceList.get(i).writeUnlock();
+ }
+ if (resetCompactingStatus) {
+ selectedTsFileResourceList.get(i).setMerging(false);
+ }
}
- return true;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 688ab42..49ee221 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1534,7 +1534,9 @@ public class StorageGroupProcessor {
return;
}
- writeLock("checkFileTTL");
+ TsFileResourceList resourceList =
+
tsFileManager.getSequenceListByTimePartition(resource.getTimePartition());
+ resourceList.writeLock();
try {
// prevent new merges and queries from choosing this file
resource.setDeleted(true);
@@ -1544,6 +1546,7 @@ public class StorageGroupProcessor {
try {
// physical removal
resource.remove();
+ resourceList.remove(resource);
if (logger.isInfoEnabled()) {
logger.info(
"Removed a file {} before {} by ttl ({}ms)",
@@ -1557,7 +1560,7 @@ public class StorageGroupProcessor {
}
}
} finally {
- writeUnlock();
+ resourceList.writeUnlock();
}
}