This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch compaction_worker_refactor_0928
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to
refs/heads/compaction_worker_refactor_0928 by this push:
new 77bd0d88d10 remove read lock
77bd0d88d10 is described below
commit 77bd0d88d10d04b49fdaeb98123bdc59f0bd88ae
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Sat Oct 7 11:05:40 2023 +0800
remove read lock
---
.../execute/task/AbstractCompactionTask.java | 5 ++-
.../execute/task/CrossSpaceCompactionTask.java | 42 +++-------------------
.../execute/task/InnerSpaceCompactionTask.java | 23 +++---------
3 files changed, 10 insertions(+), 60 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
index a3cee65a3b4..de6792e0e25 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
@@ -96,10 +96,10 @@ public abstract class AbstractCompactionTask {
protected abstract boolean doCompaction();
- public boolean start() {
+ public void start() {
boolean isSuccess = false;
+ summary.start();
try {
- summary.start();
isSuccess = doCompaction();
} finally {
summary.finish(isSuccess);
@@ -107,7 +107,6 @@ public abstract class AbstractCompactionTask {
CompactionMetrics.getInstance()
.recordTaskFinishOrAbort(crossTask, innerSeqTask,
summary.getTimeCost());
}
- return isSuccess;
}
public String getStorageGroupName() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
index db9a8192d5a..5f61e5f10be 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
@@ -56,7 +56,6 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
protected TsFileResourceList unseqTsFileResourceList;
private File logFile;
protected List<TsFileResource> targetTsfileResourceList;
- protected List<TsFileResource> holdReadLockList = new ArrayList<>();
protected List<TsFileResource> holdWriteLockList = new ArrayList<>();
protected double selectedSeqFileSize = 0;
protected double selectedUnseqFileSize = 0;
@@ -191,8 +190,8 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
throw new CompactionValidationFailedException("Failed to pass
compaction validation");
}
- releaseReadAndLockWrite(selectedSequenceFiles);
- releaseReadAndLockWrite(selectedUnsequenceFiles);
+ lockWrite(selectedSequenceFiles);
+ lockWrite(selectedUnsequenceFiles);
for (TsFileResource sequenceResource : selectedSequenceFiles) {
if (sequenceResource.getModFile().exists()) {
@@ -294,14 +293,9 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
}
private void releaseAllLocksAndResetStatus() {
- resetCompactionCandidateStatusForAllSourceFiles();
- for (TsFileResource tsFileResource : holdReadLockList) {
- tsFileResource.readUnlock();
- }
for (TsFileResource tsFileResource : holdWriteLockList) {
tsFileResource.writeUnlock();
}
- holdReadLockList.clear();
holdWriteLockList.clear();
}
@@ -348,10 +342,8 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
return equalsOtherTask((CrossSpaceCompactionTask) other);
}
- private void releaseReadAndLockWrite(List<TsFileResource>
tsFileResourceList) {
+ private void lockWrite(List<TsFileResource> tsFileResourceList) {
for (TsFileResource tsFileResource : tsFileResourceList) {
- tsFileResource.readUnlock();
- holdReadLockList.remove(tsFileResource);
tsFileResource.writeLock();
holdWriteLockList.add(tsFileResource);
}
@@ -380,16 +372,7 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
resetCompactionCandidateStatusForAllSourceFiles();
return false;
}
-
- boolean addReadLockSuccess =
- addReadLock(selectedSequenceFiles) &&
addReadLock(selectedUnsequenceFiles);
- if (!addReadLockSuccess) {
- SystemInfo.getInstance().resetCompactionMemoryCost(memoryCost);
- SystemInfo.getInstance()
- .decreaseCompactionFileNumCost(
- selectedSequenceFiles.size() + selectedUnsequenceFiles.size());
- }
- return addReadLockSuccess;
+ return true;
}
@Override
@@ -402,23 +385,6 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
return selectedSequenceFiles.size() + selectedUnsequenceFiles.size();
}
- private boolean addReadLock(List<TsFileResource> tsFileResourceList) {
- try {
- for (TsFileResource tsFileResource : tsFileResourceList) {
- tsFileResource.readLock();
- holdReadLockList.add(tsFileResource);
- if (!tsFileResource.setStatus(TsFileResourceStatus.COMPACTING)) {
- releaseAllLocksAndResetStatus();
- return false;
- }
- }
- } catch (Exception e) {
- releaseAllLocksAndResetStatus();
- throw e;
- }
- return true;
- }
-
@Override
protected void createSummary() {
if (performer instanceof FastCompactionPerformer) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
index d4ed8be016b..ad5f638564c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -72,7 +72,6 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
protected TsFileResourceList tsFileResourceList;
protected List<TsFileResource> targetTsFileList;
- protected boolean[] isHoldingReadLock;
protected boolean[] isHoldingWriteLock;
protected long maxModsFileSize;
@@ -102,11 +101,9 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
innerSpaceEstimator = new FastCompactionInnerCompactionEstimator();
}
}
- isHoldingReadLock = new boolean[selectedTsFileResourceList.size()];
isHoldingWriteLock = new boolean[selectedTsFileResourceList.size()];
for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
isHoldingWriteLock[i] = false;
- isHoldingReadLock[i] = false;
}
if (sequence) {
tsFileResourceList =
tsFileManager.getOrCreateSequenceListByTimePartition(timePartition);
@@ -123,10 +120,6 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
@Override
@SuppressWarnings({"squid:S6541", "squid:S3776", "squid:S2142"})
protected boolean doCompaction() {
- if (!tsFileManager.isAllowCompaction()) {
- return true;
- }
-
long startTime = System.currentTimeMillis();
// get resource of target file
String dataDirectory =
selectedTsFileResourceList.get(0).getTsFile().getParent();
@@ -226,8 +219,6 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
// release the read lock of all source files, and get the write lock
of them to delete them
for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
- selectedTsFileResourceList.get(i).readUnlock();
- isHoldingReadLock[i] = false;
selectedTsFileResourceList.get(i).writeLock();
isHoldingWriteLock[i] = true;
}
@@ -324,7 +315,7 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
isSequence());
}
} finally {
- releaseAllLocksAndResetStatus();
+ releaseAllLocks();
}
return isSuccess;
}
@@ -431,13 +422,9 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
* release the read lock and write lock of files if it is held, and set the
merging status of
* selected files to false.
*/
- private void releaseAllLocksAndResetStatus() {
- resetCompactionCandidateStatusForAllSourceFiles();
+ private void releaseAllLocks() {
for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
TsFileResource resource = selectedTsFileResourceList.get(i);
- if (isHoldingReadLock[i]) {
- resource.readUnlock();
- }
if (isHoldingWriteLock[i]) {
resource.writeUnlock();
}
@@ -453,10 +440,8 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
try {
for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
TsFileResource resource = selectedTsFileResourceList.get(i);
- resource.readLock();
- isHoldingReadLock[i] = true;
if (!resource.setStatus(TsFileResourceStatus.COMPACTING)) {
- releaseAllLocksAndResetStatus();
+ releaseAllLocks();
return false;
}
}
@@ -475,7 +460,7 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
LOGGER.warn("No enough file num for current compaction task {}", this,
e);
SystemInfo.getInstance().resetCompactionMemoryCost(memoryCost);
}
- releaseAllLocksAndResetStatus();
+ releaseAllLocks();
return false;
} finally {
try {