This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch compaction_worker_refactor_0928
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to
refs/heads/compaction_worker_refactor_0928 by this push:
new 29ea891a701 fix UT
29ea891a701 is described below
commit 29ea891a701b814ecbe08f849ae94e66e74c669b
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Sat Oct 7 17:44:11 2023 +0800
fix UT
---
.../execute/task/AbstractCompactionTask.java | 8 ---
.../execute/task/CrossSpaceCompactionTask.java | 37 +----------
.../execute/task/InnerSpaceCompactionTask.java | 50 +--------------
.../compaction/schedule/CompactionWorker.java | 7 ++-
.../compaction/CompactionTaskComparatorTest.java | 10 ---
.../FastCrossCompactionPerformerTest.java | 15 +++--
.../cross/CrossSpaceCompactionSelectorTest.java | 72 +++++++++++++++++++---
...eCrossSpaceCompactionWithFastPerformerTest.java | 7 ++-
...sSpaceCompactionWithReadPointPerformerTest.java | 7 ++-
.../InnerSeqCompactionWithFastPerformerTest.java | 4 +-
...nerSeqCompactionWithReadChunkPerformerTest.java | 5 +-
.../inner/InnerSpaceCompactionSelectorTest.java | 47 +++++++++++---
12 files changed, 137 insertions(+), 132 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
index 7cb14b3290b..6babcaa1cf9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
@@ -124,14 +124,6 @@ public abstract class AbstractCompactionTask {
public abstract boolean equalsOtherTask(AbstractCompactionTask otherTask);
- /**
- * Check if the compaction task is valid (selected files are not merging,
closed and exist). If
- * the task is valid, then set the merging status of selected files to true.
- *
- * @return true if the task is valid else false
- */
- public abstract boolean checkValidAndSetMerging();
-
public void transitSourceFilesToMerging() throws
FileCannotTransitToCompactingException {
for (TsFileResource f : getAllSourceTsFiles()) {
if (!f.setStatus(TsFileResourceStatus.COMPACTING)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
index 5f61e5f10be..08e36eb244c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
@@ -23,8 +23,6 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import org.apache.iotdb.db.service.metrics.FileMetrics;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionExceptionHandler;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionFileCountExceededException;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionMemoryNotEnoughException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionValidationFailedException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICrossCompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
@@ -37,7 +35,6 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceList;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
-import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
@@ -272,11 +269,7 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
false,
true);
} finally {
- SystemInfo.getInstance().resetCompactionMemoryCost(memoryCost);
- SystemInfo.getInstance()
- .decreaseCompactionFileNumCost(
- selectedSequenceFiles.size() + selectedUnsequenceFiles.size());
- releaseAllLocksAndResetStatus();
+ releaseAllLocks();
}
return isSuccess;
}
@@ -292,7 +285,7 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
&&
this.performer.getClass().isInstance(otherCrossCompactionTask.performer);
}
- private void releaseAllLocksAndResetStatus() {
+ private void releaseAllLocks() {
for (TsFileResource tsFileResource : holdWriteLockList) {
tsFileResource.writeUnlock();
}
@@ -349,32 +342,6 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
}
}
- @Override
- public boolean checkValidAndSetMerging() {
- if (!tsFileManager.isAllowCompaction()) {
- resetCompactionCandidateStatusForAllSourceFiles();
- return false;
- }
- try {
- SystemInfo.getInstance().addCompactionMemoryCost(memoryCost, 60);
- SystemInfo.getInstance()
- .addCompactionFileNum(selectedSequenceFiles.size() +
selectedUnsequenceFiles.size(), 60);
- } catch (Exception e) {
- if (e instanceof InterruptedException) {
- LOGGER.warn("Interrupted when allocating memory for compaction", e);
- Thread.currentThread().interrupt();
- } else if (e instanceof CompactionMemoryNotEnoughException) {
- LOGGER.info("No enough memory for current compaction task {}", this,
e);
- } else if (e instanceof CompactionFileCountExceededException) {
- LOGGER.info("No enough file num for current compaction task {}", this,
e);
- SystemInfo.getInstance().resetCompactionMemoryCost(memoryCost);
- }
- resetCompactionCandidateStatusForAllSourceFiles();
- return false;
- }
- return true;
- }
-
@Override
public long getEstimatedMemoryCost() {
return memoryCost;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
index ad5f638564c..c67c0b60f88 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import org.apache.iotdb.db.service.metrics.FileMetrics;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionExceptionHandler;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionFileCountExceededException;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionMemoryNotEnoughException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionValidationFailedException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
@@ -42,7 +40,6 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceList;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
-import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
@@ -97,7 +94,7 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
if
(IoTDBDescriptor.getInstance().getConfig().isEnableCompactionMemControl()) {
if (this.performer instanceof ReadChunkCompactionPerformer) {
innerSpaceEstimator = new ReadChunkInnerCompactionEstimator();
- } else if (!sequence && this.performer instanceof
FastCompactionInnerCompactionEstimator) {
+ } else if (!sequence && this.performer instanceof
FastCompactionPerformer) {
innerSpaceEstimator = new FastCompactionInnerCompactionEstimator();
}
}
@@ -431,52 +428,9 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
}
}
- @Override
- public boolean checkValidAndSetMerging() {
- if (!tsFileManager.isAllowCompaction()) {
- resetCompactionCandidateStatusForAllSourceFiles();
- return false;
- }
- try {
- for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
- TsFileResource resource = selectedTsFileResourceList.get(i);
- if (!resource.setStatus(TsFileResourceStatus.COMPACTING)) {
- releaseAllLocks();
- return false;
- }
- }
- if (innerSpaceEstimator != null) {
- memoryCost =
innerSpaceEstimator.estimateInnerCompactionMemory(selectedTsFileResourceList);
- }
- SystemInfo.getInstance().addCompactionMemoryCost(memoryCost, 60);
-
SystemInfo.getInstance().addCompactionFileNum(selectedTsFileResourceList.size(),
60);
- } catch (Exception e) {
- if (e instanceof InterruptedException) {
- LOGGER.warn("Interrupted when allocating memory for compaction", e);
- Thread.currentThread().interrupt();
- } else if (e instanceof CompactionMemoryNotEnoughException) {
- LOGGER.warn("No enough memory for current compaction task {}", this,
e);
- } else if (e instanceof CompactionFileCountExceededException) {
- LOGGER.warn("No enough file num for current compaction task {}", this,
e);
- SystemInfo.getInstance().resetCompactionMemoryCost(memoryCost);
- }
- releaseAllLocks();
- return false;
- } finally {
- try {
- if (innerSpaceEstimator != null) {
- innerSpaceEstimator.close();
- }
- } catch (IOException e) {
- LOGGER.warn("Failed to close InnerSpaceCompactionMemoryEstimator");
- }
- }
- return true;
- }
-
@Override
public long getEstimatedMemoryCost() throws IOException {
- if (memoryCost == 0L) {
+ if (innerSpaceEstimator != null && memoryCost == 0L) {
memoryCost =
innerSpaceEstimator.estimateInnerCompactionMemory(selectedTsFileResourceList);
}
return memoryCost;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java
index e722bb1bdaf..36421205502 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.schedule;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionFileCountExceededException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionMemoryNotEnoughException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.FileCannotTransitToCompactingException;
@@ -75,8 +76,10 @@ public class CompactionWorker implements Runnable {
return;
}
task.transitSourceFilesToMerging();
- estimatedMemoryCost = task.getEstimatedMemoryCost();
- memoryAcquired =
SystemInfo.getInstance().addCompactionMemoryCost(estimatedMemoryCost, 60);
+ if
(IoTDBDescriptor.getInstance().getConfig().isEnableCompactionMemControl()) {
+ estimatedMemoryCost = task.getEstimatedMemoryCost();
+ memoryAcquired =
SystemInfo.getInstance().addCompactionMemoryCost(estimatedMemoryCost, 60);
+ }
fileHandleAcquired =
SystemInfo.getInstance().addCompactionFileNum(task.getProcessedFileNum(), 60);
CompactionTaskSummary summary = task.getSummary();
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionTaskComparatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionTaskComparatorTest.java
index bea68900384..ed7d0a70d2a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionTaskComparatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionTaskComparatorTest.java
@@ -424,11 +424,6 @@ public class CompactionTaskComparatorTest {
public boolean equalsOtherTask(AbstractCompactionTask other) {
return false;
}
-
- @Override
- public boolean checkValidAndSetMerging() {
- return true;
- }
}
private static class FakeCrossSpaceCompactionTask extends
CrossSpaceCompactionTask {
@@ -459,11 +454,6 @@ public class CompactionTaskComparatorTest {
public boolean equalsOtherTask(AbstractCompactionTask other) {
return false;
}
-
- @Override
- public boolean checkValidAndSetMerging() {
- return true;
- }
}
private static class FakedTsFileResource extends TsFileResource {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastCrossCompactionPerformerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastCrossCompactionPerformerTest.java
index 6c5c336e6d9..d67bf6be322 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastCrossCompactionPerformerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastCrossCompactionPerformerTest.java
@@ -27,17 +27,20 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CrossSpaceCompactionTask;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.reader.IDataBlockReader;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.reader.SeriesDataBlockReader;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionWorker;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionFileGeneratorUtils;
import
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.db.tools.validate.TsFileValidationTool;
import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -52,6 +55,7 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
@@ -4029,7 +4033,7 @@ public class FastCrossCompactionPerformerTest extends
AbstractCompactionTest {
@Test
public void testReleaseFileNumAndMemoryAfterCrossTask()
- throws IOException, MetadataException, WriteProcessException {
+ throws IOException, MetadataException, WriteProcessException,
InterruptedException {
int oldMaxCrossCompactionCandidateFileNum =
SystemInfo.getInstance().getTotalFileLimitForCrossTask();
SystemInfo.getInstance().setTotalFileLimitForCrossTask(15);
@@ -4050,9 +4054,12 @@ public class FastCrossCompactionPerformerTest extends
AbstractCompactionTest {
1000,
0);
Assert.assertTrue(task.setSourceFilesToCompactionCandidate());
- boolean success = task.checkValidAndSetMerging();
- Assert.assertTrue(success);
- Assert.assertTrue(task.start());
+
+ FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue =
+ Mockito.mock(FixedPriorityBlockingQueue.class);
+ Mockito.when(mockQueue.take()).thenReturn(task).thenThrow(new
InterruptedException());
+ CompactionWorker worker = new CompactionWorker(0, mockQueue);
+ worker.run();
Assert.assertEquals(0,
SystemInfo.getInstance().getCompactionFileNumCost().get());
Assert.assertEquals(0,
SystemInfo.getInstance().getCompactionMemoryCost().get());
} finally {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionSelectorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionSelectorTest.java
index ac4f8d4a939..8b9358ac969 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionSelectorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionSelectorTest.java
@@ -22,7 +22,10 @@ import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.FileCannotTransitToCompactingException;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CrossSpaceCompactionTask;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionWorker;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl.RewriteCrossSpaceCompactionSelector;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.CrossCompactionTaskResource;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.CrossSpaceCompactionCandidate;
@@ -30,12 +33,14 @@ import
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManag
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
+import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import java.io.IOException;
import java.util.List;
@@ -176,7 +181,7 @@ public class CrossSpaceCompactionSelectorTest extends
AbstractCompactionTest {
@Test
public void testSelectWithTooManySourceFiles()
- throws IOException, MetadataException, WriteProcessException {
+ throws IOException, MetadataException, WriteProcessException,
InterruptedException {
int oldMaxFileNumForCompaction =
SystemInfo.getInstance().getTotalFileLimitForCrossTask();
SystemInfo.getInstance().setTotalFileLimitForCrossTask(1);
SystemInfo.getInstance().getCompactionFileNumCost().set(0);
@@ -207,7 +212,14 @@ public class CrossSpaceCompactionSelectorTest extends
AbstractCompactionTest {
// set file status to COMPACTION_CANDIDATE
Assert.assertTrue(crossSpaceCompactionTask.setSourceFilesToCompactionCandidate());
- Assert.assertFalse(crossSpaceCompactionTask.checkValidAndSetMerging());
+ FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue =
+ Mockito.mock(FixedPriorityBlockingQueue.class);
+ Mockito.when(mockQueue.take())
+ .thenReturn(crossSpaceCompactionTask)
+ .thenThrow(new InterruptedException());
+ CompactionWorker worker = new CompactionWorker(0, mockQueue);
+ worker.run();
+
Assert.assertEquals(0,
SystemInfo.getInstance().getCompactionMemoryCost().get());
Assert.assertEquals(0,
SystemInfo.getInstance().getCompactionFileNumCost().get());
for (TsFileResource resource : seqResources) {
@@ -564,9 +576,19 @@ public class CrossSpaceCompactionSelectorTest extends
AbstractCompactionTest {
cd1.countDown();
cd2.await();
- if (crossSpaceCompactionTask.checkValidAndSetMerging()) {
- throw new RuntimeException("cross space compaction task
should be invalid.");
+ try {
+ crossSpaceCompactionTask.transitSourceFilesToMerging();
+ Assert.fail("cross space compaction task should be
invalid.");
+ } catch (FileCannotTransitToCompactingException e) {
+
}
+ FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue =
+ Mockito.mock(FixedPriorityBlockingQueue.class);
+ Mockito.when(mockQueue.take())
+ .thenReturn(crossSpaceCompactionTask)
+ .thenThrow(new InterruptedException());
+ CompactionWorker worker = new CompactionWorker(0, mockQueue);
+ worker.run();
for (int i = 0; i < seqResources.size(); i++) {
TsFileResource resource = seqResources.get(i);
@@ -906,9 +928,19 @@ public class CrossSpaceCompactionSelectorTest extends
AbstractCompactionTest {
cd1.countDown();
cd2.await();
- if (crossSpaceCompactionTask.checkValidAndSetMerging()) {
- throw new RuntimeException("cross space compaction task
should be invalid.");
+ try {
+ crossSpaceCompactionTask.transitSourceFilesToMerging();
+ Assert.fail("cross space compaction task should be
invalid.");
+ } catch (FileCannotTransitToCompactingException e) {
+
}
+ FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue =
+ Mockito.mock(FixedPriorityBlockingQueue.class);
+ Mockito.when(mockQueue.take())
+ .thenReturn(crossSpaceCompactionTask)
+ .thenThrow(new InterruptedException());
+ CompactionWorker worker = new CompactionWorker(0, mockQueue);
+ worker.run();
for (int i = 0; i < seqResources.size(); i++) {
TsFileResource resource = seqResources.get(i);
@@ -1382,9 +1414,19 @@ public class CrossSpaceCompactionSelectorTest extends
AbstractCompactionTest {
cd1.countDown();
cd2.await();
- if (crossSpaceCompactionTask.checkValidAndSetMerging()) {
- throw new RuntimeException("cross space compaction task
should be invalid.");
+ try {
+ crossSpaceCompactionTask.transitSourceFilesToMerging();
+ Assert.fail("cross space compaction task should be
invalid.");
+ } catch (FileCannotTransitToCompactingException e) {
+
}
+ FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue =
+ Mockito.mock(FixedPriorityBlockingQueue.class);
+ Mockito.when(mockQueue.take())
+ .thenReturn(crossSpaceCompactionTask)
+ .thenThrow(new InterruptedException());
+ CompactionWorker worker = new CompactionWorker(0, mockQueue);
+ worker.run();
for (int i = 0; i < unseqResources.size(); i++) {
TsFileResource resource = unseqResources.get(i);
@@ -1723,9 +1765,19 @@ public class CrossSpaceCompactionSelectorTest extends
AbstractCompactionTest {
cd1.countDown();
cd2.await();
- if (crossSpaceCompactionTask.checkValidAndSetMerging()) {
- throw new RuntimeException("cross space compaction task
should be invalid.");
+ try {
+ crossSpaceCompactionTask.transitSourceFilesToMerging();
+ Assert.fail("cross space compaction task should be
invalid.");
+ } catch (FileCannotTransitToCompactingException e) {
+
}
+ FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue =
+ Mockito.mock(FixedPriorityBlockingQueue.class);
+ Mockito.when(mockQueue.take())
+ .thenReturn(crossSpaceCompactionTask)
+ .thenThrow(new InterruptedException());
+ CompactionWorker worker = new CompactionWorker(0, mockQueue);
+ worker.run();
for (int i = 0; i < unseqResources.size(); i++) {
TsFileResource resource = unseqResources.get(i);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java
index a0738f8f36b..5d9f43868ce 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java
@@ -37,6 +37,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy;
import
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import
org.apache.iotdb.db.storageengine.dataregion.wal.recover.WALRecoverManager;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -596,7 +597,8 @@ public class
RewriteCrossSpaceCompactionWithFastPerformerTest extends AbstractCo
0,
0);
task.setSourceFilesToCompactionCandidate();
- task.checkValidAndSetMerging();
+ seqResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING));
+ unseqResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING));
// delete data in source file during compaction
vsgp.deleteByDevice(
new PartialPath(
@@ -714,7 +716,8 @@ public class
RewriteCrossSpaceCompactionWithFastPerformerTest extends AbstractCo
0,
0);
task.setSourceFilesToCompactionCandidate();
- task.checkValidAndSetMerging();
+ seqResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING));
+ unseqResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING));
// delete data in source file during compaction
vsgp.deleteByDevice(
new PartialPath(
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithReadPointPerformerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithReadPointPerformerTest.java
index 89ac14094da..24bf2305eff 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithReadPointPerformerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithReadPointPerformerTest.java
@@ -37,6 +37,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy;
import
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import
org.apache.iotdb.db.storageengine.dataregion.wal.recover.WALRecoverManager;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -596,7 +597,8 @@ public class
RewriteCrossSpaceCompactionWithReadPointPerformerTest extends Abstr
0,
0);
task.setSourceFilesToCompactionCandidate();
- task.checkValidAndSetMerging();
+ seqResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING));
+ unseqResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING));
// delete data in source file during compaction
vsgp.deleteByDevice(
new PartialPath(
@@ -714,7 +716,8 @@ public class
RewriteCrossSpaceCompactionWithReadPointPerformerTest extends Abstr
0,
0);
task.setSourceFilesToCompactionCandidate();
- task.checkValidAndSetMerging();
+ seqResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING));
+ unseqResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING));
// delete data in source file during compaction
vsgp.deleteByDevice(
new PartialPath(
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSeqCompactionWithFastPerformerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSeqCompactionWithFastPerformerTest.java
index b0ac91e2fed..e8c01e2b0e1 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSeqCompactionWithFastPerformerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSeqCompactionWithFastPerformerTest.java
@@ -39,6 +39,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionF
import
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTimeseriesType;
import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.constant.TestConstant;
@@ -1136,7 +1137,8 @@ public class InnerSeqCompactionWithFastPerformerTest {
0, vsgp.getTsFileResourceManager(), sourceResources, true,
performer, 0);
task.setSourceFilesToCompactionCandidate();
- task.checkValidAndSetMerging();
+ // set the source files to COMPACTING manually to simulate the concurrent
scenario
+ sourceResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING));
// delete data during compaction
vsgp.deleteByDevice(new PartialPath(fullPaths[0]), 0, 1200, 0);
vsgp.deleteByDevice(new PartialPath(fullPaths[0]), 0, 1800, 0);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSeqCompactionWithReadChunkPerformerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSeqCompactionWithReadChunkPerformerTest.java
index 59537af962f..83b5933c60e 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSeqCompactionWithReadChunkPerformerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSeqCompactionWithReadChunkPerformerTest.java
@@ -39,6 +39,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionF
import
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTimeseriesType;
import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.constant.TestConstant;
@@ -1059,7 +1060,7 @@ public class InnerSeqCompactionWithReadChunkPerformerTest
{
@Test
public void testCompactionWithDeletionsDuringCompactions()
- throws MetadataException, IOException, DataRegionException {
+ throws MetadataException, IOException, DataRegionException,
InterruptedException {
// create source seq files
List<TsFileResource> sourceResources = new ArrayList<>();
List<List<Long>> chunkPagePointsNum = new ArrayList<>();
@@ -1102,7 +1103,7 @@ public class InnerSeqCompactionWithReadChunkPerformerTest
{
new ReadChunkCompactionPerformer(),
0);
task.setSourceFilesToCompactionCandidate();
- task.checkValidAndSetMerging();
+ sourceResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING));
// delete data during compaction
vsgp.deleteByDevice(new PartialPath(fullPaths[0]), 0, 1200, 0);
vsgp.deleteByDevice(new PartialPath(fullPaths[0]), 0, 1800, 0);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSpaceCompactionSelectorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSpaceCompactionSelectorTest.java
index f07bc50031c..2e491ec2595 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSpaceCompactionSelectorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSpaceCompactionSelectorTest.java
@@ -23,20 +23,25 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.FileCannotTransitToCompactingException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionWorker;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl.SizeTieredCompactionSelector;
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import java.io.IOException;
import java.util.List;
@@ -280,9 +285,20 @@ public class InnerSpaceCompactionSelectorTest extends
AbstractCompactionTest {
cd1.countDown();
cd2.await();
- if (innerSpaceCompactionTask.checkValidAndSetMerging()) {
- throw new RuntimeException("cross space compaction task
should be invalid.");
+ try {
+ innerSpaceCompactionTask.transitSourceFilesToMerging();
+ Assert.fail("inner space compaction task should be
invalid.");
+ } catch (FileCannotTransitToCompactingException e) {
+
}
+ FixedPriorityBlockingQueue<AbstractCompactionTask>
mockQueue =
+ Mockito.mock(FixedPriorityBlockingQueue.class);
+ Mockito.when(mockQueue.take())
+ .thenReturn(innerSpaceCompactionTask)
+ .thenThrow(new InterruptedException());
+ CompactionWorker worker = new CompactionWorker(0,
mockQueue);
+ worker.run();
+
for (int i = 0; i < task.size(); i++) {
TsFileResource resource = task.get(i);
if (i == 1) {
@@ -294,8 +310,10 @@ public class InnerSpaceCompactionSelectorTest extends
AbstractCompactionTest {
}
}
} else {
- if (!innerSpaceCompactionTask.checkValidAndSetMerging()) {
- throw new RuntimeException("cross space compaction task
should be valid.");
+ try {
+ innerSpaceCompactionTask.transitSourceFilesToMerging();
+ } catch (FileCannotTransitToCompactingException e) {
+ Assert.fail("inner space compaction task should be
valid.");
}
for (int i = 0; i < task.size(); i++) {
TsFileResource resource = task.get(i);
@@ -566,9 +584,20 @@ public class InnerSpaceCompactionSelectorTest extends
AbstractCompactionTest {
cd1.countDown();
cd2.await();
- if (innerSpaceCompactionTask.checkValidAndSetMerging()) {
- throw new RuntimeException("cross space compaction task
should be invalid.");
+ try {
+ innerSpaceCompactionTask.transitSourceFilesToMerging();
+ Assert.fail("inner space compaction task should be
invalid.");
+ } catch (FileCannotTransitToCompactingException e) {
+
}
+ FixedPriorityBlockingQueue<AbstractCompactionTask>
mockQueue =
+ Mockito.mock(FixedPriorityBlockingQueue.class);
+ Mockito.when(mockQueue.take())
+ .thenReturn(innerSpaceCompactionTask)
+ .thenThrow(new InterruptedException());
+ CompactionWorker worker = new CompactionWorker(0,
mockQueue);
+ worker.run();
+
for (int i = 0; i < task.size(); i++) {
TsFileResource resource = task.get(i);
if (i == 1) {
@@ -580,8 +609,10 @@ public class InnerSpaceCompactionSelectorTest extends
AbstractCompactionTest {
}
}
} else {
- if (!innerSpaceCompactionTask.checkValidAndSetMerging()) {
- throw new RuntimeException("cross space compaction task
should be valid.");
+ try {
+ innerSpaceCompactionTask.transitSourceFilesToMerging();
+ } catch (FileCannotTransitToCompactingException e) {
+ Assert.fail("inner space compaction task should be
valid.");
}
for (int i = 0; i < task.size(); i++) {
TsFileResource resource = task.get(i);