This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch compaction_worker_refactor_0928 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 981e3595205885137f24c5b543378dd6f6686315 Author: Jinrui.Zhang <[email protected]> AuthorDate: Sat Oct 7 11:05:40 2023 +0800 remove read lock --- .../execute/task/AbstractCompactionTask.java | 5 ++- .../execute/task/CrossSpaceCompactionTask.java | 42 +++------------------- .../execute/task/InnerSpaceCompactionTask.java | 23 +++--------- 3 files changed, 10 insertions(+), 60 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java index f3d6810c43f..564f0aac939 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java @@ -111,10 +111,10 @@ public abstract class AbstractCompactionTask { protected abstract boolean doCompaction(); - public boolean start() { + public void start() { boolean isSuccess = false; + summary.start(); try { - summary.start(); isSuccess = doCompaction(); } finally { summary.finish(isSuccess); @@ -122,7 +122,6 @@ public abstract class AbstractCompactionTask { CompactionMetrics.getInstance() .recordTaskFinishOrAbort(crossTask, innerSeqTask, summary.getTimeCost()); } - return isSuccess; } public String getStorageGroupName() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java index 5c460f7c79b..4d67f8f7022 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java @@ -56,7 +56,6 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask { protected TsFileResourceList unseqTsFileResourceList; private File logFile; protected List<TsFileResource> targetTsfileResourceList; - protected List<TsFileResource> holdReadLockList = new ArrayList<>(); protected List<TsFileResource> holdWriteLockList = new ArrayList<>(); protected double selectedSeqFileSize = 0; protected double selectedUnseqFileSize = 0; @@ -191,8 +190,8 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask { throw new CompactionValidationFailedException("Failed to pass compaction validation"); } - releaseReadAndLockWrite(selectedSequenceFiles); - releaseReadAndLockWrite(selectedUnsequenceFiles); + lockWrite(selectedSequenceFiles); + lockWrite(selectedUnsequenceFiles); for (TsFileResource sequenceResource : selectedSequenceFiles) { if (sequenceResource.getModFile().exists()) { @@ -294,14 +293,9 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask { } private void releaseAllLocksAndResetStatus() { - resetCompactionCandidateStatusForAllSourceFiles(); - for (TsFileResource tsFileResource : holdReadLockList) { - tsFileResource.readUnlock(); - } for (TsFileResource tsFileResource : holdWriteLockList) { tsFileResource.writeUnlock(); } - holdReadLockList.clear(); holdWriteLockList.clear(); } @@ -348,10 +342,8 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask { return equalsOtherTask((CrossSpaceCompactionTask) other); } - private void releaseReadAndLockWrite(List<TsFileResource> tsFileResourceList) { + private void lockWrite(List<TsFileResource> tsFileResourceList) { for (TsFileResource tsFileResource : tsFileResourceList) { - tsFileResource.readUnlock(); - holdReadLockList.remove(tsFileResource); tsFileResource.writeLock(); holdWriteLockList.add(tsFileResource); } @@ -385,16 +377,7 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask { resetCompactionCandidateStatusForAllSourceFiles(); return false; } - - boolean addReadLockSuccess = - addReadLock(selectedSequenceFiles) && addReadLock(selectedUnsequenceFiles); - if (!addReadLockSuccess) { - SystemInfo.getInstance().resetCompactionMemoryCost(memoryCost); - SystemInfo.getInstance() - .decreaseCompactionFileNumCost( - selectedSequenceFiles.size() + selectedUnsequenceFiles.size()); - } - return addReadLockSuccess; + return true; } @Override @@ -407,23 +390,6 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask { return selectedSequenceFiles.size() + selectedUnsequenceFiles.size(); } - private boolean addReadLock(List<TsFileResource> tsFileResourceList) { - try { - for (TsFileResource tsFileResource : tsFileResourceList) { - tsFileResource.readLock(); - holdReadLockList.add(tsFileResource); - if (!tsFileResource.setStatus(TsFileResourceStatus.COMPACTING)) { - releaseAllLocksAndResetStatus(); - return false; - } - } - } catch (Exception e) { - releaseAllLocksAndResetStatus(); - throw e; - } - return true; - } - @Override protected void createSummary() { if (performer instanceof FastCompactionPerformer) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java index 40cb99d8104..f52048c24ff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java @@ -72,7 +72,6 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { protected TsFileResourceList tsFileResourceList; protected List<TsFileResource> targetTsFileList; - protected boolean[] isHoldingReadLock; protected boolean[] isHoldingWriteLock; protected long maxModsFileSize; @@ -121,11 +120,9 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { innerSpaceEstimator = new FastCompactionInnerCompactionEstimator(); } } - isHoldingReadLock = new boolean[selectedTsFileResourceList.size()]; isHoldingWriteLock = new boolean[selectedTsFileResourceList.size()]; for (int i = 0; i < selectedTsFileResourceList.size(); ++i) { isHoldingWriteLock[i] = false; - isHoldingReadLock[i] = false; } if (sequence) { tsFileResourceList = tsFileManager.getOrCreateSequenceListByTimePartition(timePartition); @@ -142,10 +139,6 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { @Override @SuppressWarnings({"squid:S6541", "squid:S3776", "squid:S2142"}) protected boolean doCompaction() { - if (!tsFileManager.isAllowCompaction()) { - return true; - } - long startTime = System.currentTimeMillis(); // get resource of target file String dataDirectory = selectedTsFileResourceList.get(0).getTsFile().getParent(); @@ -245,8 +238,6 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { // release the read lock of all source files, and get the write lock of them to delete them for (int i = 0; i < selectedTsFileResourceList.size(); ++i) { - selectedTsFileResourceList.get(i).readUnlock(); - isHoldingReadLock[i] = false; selectedTsFileResourceList.get(i).writeLock(); isHoldingWriteLock[i] = true; } @@ -343,7 +334,7 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { isSequence()); } } finally { - releaseAllLocksAndResetStatus(); + releaseAllLocks(); } return isSuccess; } @@ -450,13 +441,9 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { * release the read lock and write lock of files if it is held, and set the merging status of * selected files to false. */ - private void releaseAllLocksAndResetStatus() { - resetCompactionCandidateStatusForAllSourceFiles(); + private void releaseAllLocks() { for (int i = 0; i < selectedTsFileResourceList.size(); ++i) { TsFileResource resource = selectedTsFileResourceList.get(i); - if (isHoldingReadLock[i]) { - resource.readUnlock(); - } if (isHoldingWriteLock[i]) { resource.writeUnlock(); } @@ -477,10 +464,8 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { try { for (int i = 0; i < selectedTsFileResourceList.size(); ++i) { TsFileResource resource = selectedTsFileResourceList.get(i); - resource.readLock(); - isHoldingReadLock[i] = true; if (!resource.setStatus(TsFileResourceStatus.COMPACTING)) { - releaseAllLocksAndResetStatus(); + releaseAllLocks(); return false; } } @@ -499,7 +484,7 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { LOGGER.warn("No enough file num for current compaction task {}", this, e); SystemInfo.getInstance().resetCompactionMemoryCost(memoryCost); } - releaseAllLocksAndResetStatus(); + releaseAllLocks(); return false; } finally { try {
