This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch compaction_worker_refactor_0928
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to
refs/heads/compaction_worker_refactor_0928 by this push:
new 19d5772851b add compactionWorkerTest
19d5772851b is described below
commit 19d5772851b520664d4f1d79511ae1879fecc09b
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Sat Oct 7 12:34:24 2023 +0800
add compactionWorkerTest
---
.../compaction/schedule/CompactionWorker.java | 77 ++++++++++++----------
...yControlTest.java => CompactionWorkerTest.java} | 51 +++++++++-----
2 files changed, 75 insertions(+), 53 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java
index 64be3cdd09f..e722bb1bdaf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java
@@ -53,46 +53,53 @@ public class CompactionWorker implements Runnable {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
- AbstractCompactionTask task;
- try {
- task = compactionTaskQueue.take();
- } catch (InterruptedException e) {
- log.warn("CompactionThread-{} terminates because interruption",
threadId);
- return;
- }
+ processOneCompactionTask();
+ }
+ }
+
+ private void processOneCompactionTask() {
+ AbstractCompactionTask task;
+ try {
+ task = compactionTaskQueue.take();
+ } catch (InterruptedException e) {
+ log.warn("CompactionThread-{} terminates because interruption",
threadId);
+ Thread.currentThread().interrupt();
+ return;
+ }
+ long estimatedMemoryCost = 0L;
+ boolean memoryAcquired = false;
+ boolean fileHandleAcquired = false;
+ try {
if (task == null || !task.isCompactionAllowed()) {
log.info("Compaction task is not allowed to be executed by
TsFileManager. Task {}", task);
return;
}
- long estimatedMemoryCost = 0L;
- boolean memoryAcquired = false;
- boolean fileHandleAcquired = false;
- try {
- task.transitSourceFilesToMerging();
- estimatedMemoryCost = task.getEstimatedMemoryCost();
- memoryAcquired =
SystemInfo.getInstance().addCompactionMemoryCost(estimatedMemoryCost, 60);
- fileHandleAcquired =
-
SystemInfo.getInstance().addCompactionFileNum(task.getProcessedFileNum(), 60);
- CompactionTaskSummary summary = task.getSummary();
- CompactionTaskFuture future = new CompactionTaskFuture(summary);
- CompactionTaskManager.getInstance().recordTask(task, future);
- task.start();
- } catch (FileCannotTransitToCompactingException
- | IOException
- | CompactionMemoryNotEnoughException
- | CompactionFileCountExceededException e) {
- log.info("CompactionTask {} cannot be executed. Reason: {}", task, e);
- } catch (InterruptedException e) {
- log.warn("InterruptedException occurred when preparing compaction
task. {}", task, e);
- Thread.currentThread().interrupt();
- } finally {
+ task.transitSourceFilesToMerging();
+ estimatedMemoryCost = task.getEstimatedMemoryCost();
+ memoryAcquired =
SystemInfo.getInstance().addCompactionMemoryCost(estimatedMemoryCost, 60);
+ fileHandleAcquired =
+
SystemInfo.getInstance().addCompactionFileNum(task.getProcessedFileNum(), 60);
+ CompactionTaskSummary summary = task.getSummary();
+ CompactionTaskFuture future = new CompactionTaskFuture(summary);
+ CompactionTaskManager.getInstance().recordTask(task, future);
+ task.start();
+ } catch (FileCannotTransitToCompactingException
+ | IOException
+ | CompactionMemoryNotEnoughException
+ | CompactionFileCountExceededException e) {
+ log.info("CompactionTask {} cannot be executed. Reason: {}", task, e);
+ } catch (InterruptedException e) {
+ log.warn("InterruptedException occurred when preparing compaction task.
{}", task, e);
+ Thread.currentThread().interrupt();
+ } finally {
+ if (task != null) {
task.resetCompactionCandidateStatusForAllSourceFiles();
- if (memoryAcquired) {
-
SystemInfo.getInstance().resetCompactionMemoryCost(estimatedMemoryCost);
- }
- if (fileHandleAcquired) {
-
SystemInfo.getInstance().decreaseCompactionFileNumCost(task.getProcessedFileNum());
- }
+ }
+ if (memoryAcquired) {
+
SystemInfo.getInstance().resetCompactionMemoryCost(estimatedMemoryCost);
+ }
+ if (fileHandleAcquired) {
+
SystemInfo.getInstance().decreaseCompactionFileNumCost(task.getProcessedFileNum());
}
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/MemoryControlTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionWorkerTest.java
similarity index 79%
rename from
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/MemoryControlTest.java
rename to
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionWorkerTest.java
index 2bcde7dae9c..a6b11b38070 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/MemoryControlTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionWorkerTest.java
@@ -20,12 +20,15 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction;
import org.apache.iotdb.commons.exception.MetadataException;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CrossSpaceCompactionTask;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionWorker;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
+import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.junit.Assert;
@@ -38,7 +41,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-public class MemoryControlTest {
+public class CompactionWorkerTest {
@Before
public void setUp()
throws IOException, WriteProcessException, MetadataException,
InterruptedException {
@@ -75,8 +78,13 @@ public class MemoryControlTest {
null,
1024L * 1024L * 1024L * 50L,
0);
- boolean success = task.checkValidAndSetMerging();
- Assert.assertFalse(success);
+ CrossSpaceCompactionTask taskMock = Mockito.spy(task);
+ Mockito.doReturn(true).when(taskMock).start();
+ FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue =
+ Mockito.mock(FixedPriorityBlockingQueue.class);
+ Mockito.when(mockQueue.take()).thenReturn(taskMock).thenThrow(new
InterruptedException());
+ CompactionWorker worker = new CompactionWorker(0, mockQueue);
+ worker.run();
Assert.assertEquals(0,
SystemInfo.getInstance().getCompactionMemoryCost().get());
Assert.assertEquals(0,
SystemInfo.getInstance().getCompactionFileNumCost().get());
for (TsFileResource tsFileResource : sequenceFiles) {
@@ -90,7 +98,7 @@ public class MemoryControlTest {
}
@Test
- public void testFailedToAllocateFileNumInCrossTask() {
+ public void testFailedToAllocateFileNumInCrossTask() throws
InterruptedException {
int oldMaxCrossCompactionCandidateFileNum =
SystemInfo.getInstance().getTotalFileLimitForCrossTask();
SystemInfo.getInstance().setTotalFileLimitForCrossTask(2);
@@ -116,9 +124,13 @@ public class MemoryControlTest {
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0L, tsFileManager, sequenceFiles, unsequenceFiles, null, 1000,
0);
-
- boolean success = task.checkValidAndSetMerging();
- Assert.assertFalse(success);
+ CrossSpaceCompactionTask taskMock = Mockito.spy(task);
+ Mockito.doReturn(true).when(taskMock).start();
+ FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue =
+ Mockito.mock(FixedPriorityBlockingQueue.class);
+ Mockito.when(mockQueue.take()).thenReturn(taskMock).thenThrow(new
InterruptedException());
+ CompactionWorker worker = new CompactionWorker(0, mockQueue);
+ worker.run();
Assert.assertEquals(0,
SystemInfo.getInstance().getCompactionMemoryCost().get());
Assert.assertEquals(0,
SystemInfo.getInstance().getCompactionFileNumCost().get());
for (TsFileResource tsFileResource : sequenceFiles) {
@@ -140,7 +152,7 @@ public class MemoryControlTest {
* @throws Exception
*/
@Test
- public void testFailedToCheckValidInCrossTask() {
+ public void testFailedToCheckValidInCrossTask() throws InterruptedException {
List<TsFileResource> sequenceFiles = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
sequenceFiles.add(
@@ -156,15 +168,16 @@ public class MemoryControlTest {
TsFileResourceStatus.COMPACTION_CANDIDATE));
}
TsFileManager tsFileManager = Mockito.mock(TsFileManager.class);
- Mockito.when(tsFileManager.getStorageGroupName()).thenReturn("root.sg");
- Mockito.when(tsFileManager.getDataRegionId()).thenReturn("1");
-
+ Mockito.when(tsFileManager.isAllowCompaction()).thenReturn(false);
// fail to check valid when tsfile manager is not allowed to compaction in
cross task
CrossSpaceCompactionTask task =
new CrossSpaceCompactionTask(
0L, tsFileManager, sequenceFiles, unsequenceFiles, null, 1000, 0);
- boolean success = task.checkValidAndSetMerging();
- Assert.assertFalse(success);
+ FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue =
+ Mockito.mock(FixedPriorityBlockingQueue.class);
+ Mockito.when(mockQueue.take()).thenReturn(task).thenThrow(new
InterruptedException());
+ CompactionWorker worker = new CompactionWorker(0, mockQueue);
+ worker.run();
Assert.assertEquals(0,
SystemInfo.getInstance().getCompactionMemoryCost().get());
Assert.assertEquals(0,
SystemInfo.getInstance().getCompactionFileNumCost().get());
for (TsFileResource tsFileResource : sequenceFiles) {
@@ -183,7 +196,7 @@ public class MemoryControlTest {
* @throws Exception
*/
@Test
- public void testFailedToCheckValidInInnerTask() {
+ public void testFailedToCheckValidInInnerTask() throws InterruptedException {
List<TsFileResource> sequenceFiles = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
sequenceFiles.add(
@@ -192,14 +205,16 @@ public class MemoryControlTest {
TsFileResourceStatus.COMPACTION_CANDIDATE));
}
TsFileManager tsFileManager = Mockito.mock(TsFileManager.class);
- Mockito.when(tsFileManager.getStorageGroupName()).thenReturn("root.sg");
- Mockito.when(tsFileManager.getDataRegionId()).thenReturn("1");
+ Mockito.when(tsFileManager.isAllowCompaction()).thenReturn(false);
// fail to check valid when tsfile manager is not allowed to compaction in
inner task
InnerSpaceCompactionTask innerTask =
new InnerSpaceCompactionTask(0L, tsFileManager, sequenceFiles, true,
null, 0L);
- boolean success = innerTask.checkValidAndSetMerging();
- Assert.assertFalse(success);
+ FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue =
+ Mockito.mock(FixedPriorityBlockingQueue.class);
+ Mockito.when(mockQueue.take()).thenReturn(innerTask).thenThrow(new
InterruptedException());
+ CompactionWorker worker = new CompactionWorker(0, mockQueue);
+ worker.run();
Assert.assertEquals(0,
SystemInfo.getInstance().getCompactionMemoryCost().get());
Assert.assertEquals(0,
SystemInfo.getInstance().getCompactionFileNumCost().get());
for (TsFileResource tsFileResource : sequenceFiles) {