This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e34850cacc [IOTDB-3189] Fix compaction is not well-distributed across sgs (#6324)
e34850cacc is described below
commit e34850cacccf8085829ac907adb06d7efdc74519
Author: Liu Xuxin <[email protected]>
AuthorDate: Sun Jun 19 10:00:31 2022 +0800
[IOTDB-3189] Fix compaction is not well-distributed across sgs (#6324)
---
.../db/engine/compaction/CompactionScheduler.java | 6 +-
.../DefaultCompactionTaskComparatorImpl.java | 17 ++++-
.../compaction/cross/CrossSpaceCompactionTask.java | 6 +-
.../compaction/inner/InnerSpaceCompactionTask.java | 6 +-
.../compaction/task/AbstractCompactionTask.java | 9 ++-
.../db/engine/storagegroup/TsFileManager.java | 6 ++
.../compaction/CompactionTaskComparatorTest.java | 74 ++++++++++++++++++----
.../compaction/CompactionTaskManagerTest.java | 27 +++++---
.../compaction/cross/CrossSpaceCompactionTest.java | 9 ++-
.../cross/RewriteCrossSpaceCompactionTest.java | 12 ++--
.../inner/InnerCompactionEmptyTsFileTest.java | 3 +-
.../db/engine/storagegroup/DataRegionTest.java | 3 +-
.../storagegroup/StorageGroupProcessorTest.java | 3 +-
13 files changed, 139 insertions(+), 42 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java
index cffa50884a..ac5349abbd 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java
@@ -127,7 +127,8 @@ public class CompactionScheduler {
task,
sequence,
performer,
- CompactionTaskManager.currentTaskNum));
+ CompactionTaskManager.currentTaskNum,
+ tsFileManager.getNextCompactionTaskId()));
}
}
@@ -160,7 +161,8 @@ public class CompactionScheduler {
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
- CompactionTaskManager.currentTaskNum));
+ CompactionTaskManager.currentTaskNum,
+ tsFileManager.getNextCompactionTaskId()));
}
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/comparator/DefaultCompactionTaskComparatorImpl.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/comparator/DefaultCompactionTaskComparatorImpl.java
index 04a213b7e6..8f0d26c66e 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/comparator/DefaultCompactionTaskComparatorImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/comparator/DefaultCompactionTaskComparatorImpl.java
@@ -37,9 +37,7 @@ public class DefaultCompactionTaskComparatorImpl implements
ICompactionTaskCompa
if ((((o1 instanceof InnerSpaceCompactionTask) && (o2 instanceof
CrossSpaceCompactionTask))
|| ((o2 instanceof InnerSpaceCompactionTask)
&& (o1 instanceof CrossSpaceCompactionTask)))) {
- if (config.getCompactionPriority() == CompactionPriority.BALANCE) {
- return 0;
- } else if (config.getCompactionPriority() ==
CompactionPriority.INNER_CROSS) {
+ if (config.getCompactionPriority() != CompactionPriority.CROSS_INNER) {
return o1 instanceof InnerSpaceCompactionTask ? -1 : 1;
} else {
return o1 instanceof CrossSpaceCompactionTask ? -1 : 1;
@@ -86,6 +84,12 @@ public class DefaultCompactionTaskComparatorImpl implements
ICompactionTaskCompa
return selectedFilesOfO2.size() - selectedFilesOfO1.size();
}
+ // if the serial id of the tasks are different
+ // we prefer task with small serial id
+ if (o1.getSerialId() != o2.getSerialId()) {
+ return o1.getSerialId() > o2.getSerialId() ? 1 : -1;
+ }
+
// if the size of selected files are different
// we prefer to execute task with smaller file size
// because small files can be compacted quickly
@@ -103,6 +107,13 @@ public class DefaultCompactionTaskComparatorImpl
implements ICompactionTaskCompa
// because this type of tasks consume fewer memory during execution
return o1.getSelectedSequenceFiles().size() -
o2.getSelectedSequenceFiles().size();
}
+
+ // if the serial id of the tasks are different
+ // we prefer task with small serial id
+ if (o1.getSerialId() != o2.getSerialId()) {
+ return o1.getSerialId() > o2.getSerialId() ? 1 : -1;
+ }
+
// we prefer the task with more unsequence files
// because this type of tasks reduce more unsequence files
return o2.getSelectedUnsequenceFiles().size() -
o1.getSelectedUnsequenceFiles().size();
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
index d50e092f2b..ae47fc3916 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
@@ -64,12 +64,14 @@ public class CrossSpaceCompactionTask extends
AbstractCompactionTask {
List<TsFileResource> selectedSequenceFiles,
List<TsFileResource> selectedUnsequenceFiles,
ICrossCompactionPerformer performer,
- AtomicInteger currentTaskNum) {
+ AtomicInteger currentTaskNum,
+ long serialId) {
super(
tsFileManager.getStorageGroupName() + "-" +
tsFileManager.getDataRegion(),
timePartition,
tsFileManager,
- currentTaskNum);
+ currentTaskNum,
+ serialId);
this.selectedSequenceFiles = selectedSequenceFiles;
this.selectedUnsequenceFiles = selectedUnsequenceFiles;
this.seqTsFileResourceList =
tsFileManager.getSequenceListByTimePartition(timePartition);
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java
index 4b2760ce5b..91e31fd126 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java
@@ -66,12 +66,14 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
List<TsFileResource> selectedTsFileResourceList,
boolean sequence,
ICompactionPerformer performer,
- AtomicInteger currentTaskNum) {
+ AtomicInteger currentTaskNum,
+ long serialId) {
super(
tsFileManager.getStorageGroupName() + "-" +
tsFileManager.getDataRegion(),
timePartition,
tsFileManager,
- currentTaskNum);
+ currentTaskNum,
+ serialId);
this.selectedTsFileResourceList = selectedTsFileResourceList;
this.sequence = sequence;
this.performer = performer;
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
index e6752b754a..7a00e466be 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
@@ -49,16 +49,19 @@ public abstract class AbstractCompactionTask implements
Callable<CompactionTaskS
protected volatile boolean finished = false;
protected ICompactionPerformer performer;
protected int hashCode = -1;
+ protected long serialId;
public AbstractCompactionTask(
String fullStorageGroupName,
long timePartition,
TsFileManager tsFileManager,
- AtomicInteger currentTaskNum) {
+ AtomicInteger currentTaskNum,
+ long serialId) {
this.fullStorageGroupName = fullStorageGroupName;
this.timePartition = timePartition;
this.tsFileManager = tsFileManager;
this.currentTaskNum = currentTaskNum;
+ this.serialId = serialId;
}
public abstract void setSourceFilesToCompactionCandidate();
@@ -133,4 +136,8 @@ public abstract class AbstractCompactionTask implements
Callable<CompactionTaskS
public boolean isTaskFinished() {
return finished;
}
+
+ public long getSerialId() {
+ return serialId;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
index 009e151d87..cd76c44a72 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
@@ -36,6 +36,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -60,6 +61,7 @@ public class TsFileManager {
private List<TsFileResource> unsequenceRecoverTsFileResources = new
ArrayList<>();
private boolean allowCompaction = true;
+ private AtomicLong currentCompactionTaskSerialId = new AtomicLong(0);
public TsFileManager(String storageGroupName, String dataRegion, String
storageGroupDir) {
this.storageGroupName = storageGroupName;
@@ -425,4 +427,8 @@ public class TsFileManager {
return cmp;
}
}
+
+ public long getNextCompactionTaskId() {
+ return currentCompactionTaskSerialId.getAndIncrement();
+ }
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskComparatorTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskComparatorTest.java
index 0475026486..6bbba1a83c 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskComparatorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskComparatorTest.java
@@ -40,9 +40,12 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class CompactionTaskComparatorTest {
@@ -73,7 +76,8 @@ public class CompactionTaskComparatorTest {
new FakedTsFileResource(new File(String.format("%d-%d-0-0.tsfile",
i + j, i + j)), j));
}
compactionTasks[i] =
- new FakedInnerSpaceCompactionTask("fakeSg", 0, tsFileManager,
taskNum, true, resources);
+ new FakedInnerSpaceCompactionTask(
+ "fakeSg", 0, tsFileManager, taskNum, true, resources, 0);
compactionTaskQueue.put(compactionTasks[i]);
}
@@ -95,7 +99,8 @@ public class CompactionTaskComparatorTest {
new File(String.format("%d-%d-0-0.tsfile", i + j, i + j)), j -
i + 101));
}
compactionTasks[i] =
- new FakedInnerSpaceCompactionTask("fakeSg", 0, tsFileManager,
taskNum, true, resources);
+ new FakedInnerSpaceCompactionTask(
+ "fakeSg", 0, tsFileManager, taskNum, true, resources, 0);
compactionTaskQueue.put(compactionTasks[i]);
}
@@ -117,7 +122,8 @@ public class CompactionTaskComparatorTest {
new File(String.format("%d-%d-%d-0.tsfile", i + j, i + j, j -
i + 101)), 1));
}
compactionTasks[i] =
- new FakedInnerSpaceCompactionTask("fakeSg", 0, tsFileManager,
taskNum, true, resources);
+ new FakedInnerSpaceCompactionTask(
+ "fakeSg", 0, tsFileManager, taskNum, true, resources, 0);
compactionTaskQueue.put(compactionTasks[i]);
}
@@ -142,7 +148,8 @@ public class CompactionTaskComparatorTest {
new File(String.format("%d-%d-%d-0.tsfile", i + j, i + j, j -
i + 101)), 1));
}
compactionTasks[i] =
- new FakedInnerSpaceCompactionTask("fakeSg", 0, tsFileManager,
taskNum, true, resources);
+ new FakedInnerSpaceCompactionTask(
+ "fakeSg", 0, tsFileManager, taskNum, true, resources, 0);
limitQueue.add(compactionTasks[i]);
}
@@ -164,7 +171,8 @@ public class CompactionTaskComparatorTest {
new File(String.format("%d-%d-0-0.tsfile", i + j, i + j, j - i
+ 101)), 1));
}
compactionTasks[i] =
- new FakedInnerSpaceCompactionTask("fakeSg", 0, tsFileManager,
taskNum, true, resources);
+ new FakedInnerSpaceCompactionTask(
+ "fakeSg", 0, tsFileManager, taskNum, true, resources, 0);
compactionTaskQueue.put(compactionTasks[i]);
}
@@ -187,7 +195,8 @@ public class CompactionTaskComparatorTest {
new FakedTsFileResource(new File(String.format("%d-%d-0-0.tsfile",
i + j, i + j)), j));
}
innerCompactionTasks[i] =
- new FakedInnerSpaceCompactionTask("fakeSg", 0, tsFileManager,
taskNum, true, resources);
+ new FakedInnerSpaceCompactionTask(
+ "fakeSg", 0, tsFileManager, taskNum, true, resources, 0);
}
for (int i = 0; i < 100; ++i) {
@@ -203,7 +212,7 @@ public class CompactionTaskComparatorTest {
}
crossCompactionTasks[i] =
new FakeCrossSpaceCompactionTask(
- "fakeSg", 0, tsFileManager, taskNum, sequenceResources,
unsequenceResources);
+ "fakeSg", 0, tsFileManager, taskNum, sequenceResources,
unsequenceResources, 0);
}
for (int i = 0; i < 100; i++) {
@@ -240,7 +249,7 @@ public class CompactionTaskComparatorTest {
}
crossCompactionTasks[i] =
new FakeCrossSpaceCompactionTask(
- "fakeSg", 0, tsFileManager, taskNum, sequenceResources,
unsequenceResources);
+ "fakeSg", 0, tsFileManager, taskNum, sequenceResources,
unsequenceResources, 0);
compactionTaskQueue.put(crossCompactionTasks[i]);
}
for (int i = 100; i < 200; ++i) {
@@ -256,7 +265,7 @@ public class CompactionTaskComparatorTest {
}
crossCompactionTasks[i] =
new FakeCrossSpaceCompactionTask(
- "fakeSg", 0, tsFileManager, taskNum, sequenceResources,
unsequenceResources);
+ "fakeSg", 0, tsFileManager, taskNum, sequenceResources,
unsequenceResources, 0);
compactionTaskQueue.put(crossCompactionTasks[i]);
}
@@ -266,6 +275,41 @@ public class CompactionTaskComparatorTest {
}
}
+ @Test
+ public void testSerialId() throws InterruptedException {
+ AbstractCompactionTask[] compactionTasks = new AbstractCompactionTask[100];
+ TsFileManager[] tsFileManagers = new TsFileManager[10];
+ for (int i = 0; i < 10; ++i) {
+ tsFileManagers[i] = new TsFileManager("fakeSg" + i, "0", "/");
+ for (int j = 0; j < 10; ++j) {
+ List<TsFileResource> resources = new ArrayList<>();
+ // the j th compaction task for i th sg
+ for (int k = 0; k < 10; ++k) {
+ resources.add(
+ new FakedTsFileResource(
+ new File(String.format("%d-%d-0-0.tsfile", j * 10 + k, j * 10 + k)), 10));
+ }
+ compactionTaskQueue.put(
+ new FakedInnerSpaceCompactionTask(
+ "fakeSg" + i, 0, tsFileManagers[i], taskNum, true, resources, j));
+ }
+ }
+ Map<String, AtomicInteger> taskCount = new HashMap<>();
+ for (int i = 0; i < 10; ++i) {
+ taskCount.put("fakeSg" + i + "-0", new AtomicInteger(0));
+ }
+ long cnt = 0;
+ while (compactionTaskQueue.size() > 0) {
+ for (int i = 0; i < 10; ++i) {
+ taskCount.get(compactionTaskQueue.take().getFullStorageGroupName()).incrementAndGet();
+ }
+ cnt++;
+ for (int i = 0; i < 10; ++i) {
+ assertEquals(cnt, taskCount.get("fakeSg" + i + "-0").get());
+ }
+ }
+ }
+
private static class FakedInnerSpaceCompactionTask extends
InnerSpaceCompactionTask {
public FakedInnerSpaceCompactionTask(
@@ -274,14 +318,16 @@ public class CompactionTaskComparatorTest {
TsFileManager tsFileManager,
AtomicInteger currentTaskNum,
boolean sequence,
- List<TsFileResource> selectedTsFileResourceList) {
+ List<TsFileResource> selectedTsFileResourceList,
+ long serialId) {
super(
timePartition,
tsFileManager,
selectedTsFileResourceList,
sequence,
new ReadChunkCompactionPerformer(),
- currentTaskNum);
+ currentTaskNum,
+ serialId);
}
@Override
@@ -306,14 +352,16 @@ public class CompactionTaskComparatorTest {
TsFileManager tsFileManager,
AtomicInteger currentTaskNum,
List<TsFileResource> selectedSequenceFiles,
- List<TsFileResource> selectedUnsequenceFiles) {
+ List<TsFileResource> selectedUnsequenceFiles,
+ long serialId) {
super(
timePartition,
tsFileManager,
selectedSequenceFiles,
selectedUnsequenceFiles,
new ReadPointCompactionPerformer(),
- currentTaskNum);
+ currentTaskNum,
+ serialId);
}
@Override
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
index df68f4eb07..620c40137a 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
@@ -80,7 +80,8 @@ public class CompactionTaskManagerTest extends
InnerCompactionTest {
seqResources,
true,
new ReadChunkCompactionPerformer(seqResources),
- new AtomicInteger(0));
+ new AtomicInteger(0),
+ 0);
InnerSpaceCompactionTask task2 =
new InnerSpaceCompactionTask(
0,
@@ -88,7 +89,8 @@ public class CompactionTaskManagerTest extends
InnerCompactionTest {
seqResources,
true,
new ReadChunkCompactionPerformer(seqResources),
- new AtomicInteger(0));
+ new AtomicInteger(0),
+ 0);
seqResources.get(0).readLock();
CompactionTaskManager manager = CompactionTaskManager.getInstance();
try {
@@ -141,7 +143,8 @@ public class CompactionTaskManagerTest extends
InnerCompactionTest {
seqResources,
true,
new ReadChunkCompactionPerformer(seqResources),
- new AtomicInteger(0));
+ new AtomicInteger(0),
+ 0);
InnerSpaceCompactionTask task2 =
new InnerSpaceCompactionTask(
0,
@@ -149,7 +152,8 @@ public class CompactionTaskManagerTest extends
InnerCompactionTest {
seqResources,
true,
new ReadChunkCompactionPerformer(seqResources),
- new AtomicInteger(0));
+ new AtomicInteger(0),
+ 0);
seqResources.get(0).readLock();
try {
CompactionTaskManager manager = CompactionTaskManager.getInstance();
@@ -203,7 +207,8 @@ public class CompactionTaskManagerTest extends
InnerCompactionTest {
seqResources,
true,
new ReadChunkCompactionPerformer(seqResources),
- new AtomicInteger(0));
+ new AtomicInteger(0),
+ 0);
InnerSpaceCompactionTask task2 =
new InnerSpaceCompactionTask(
0,
@@ -211,7 +216,8 @@ public class CompactionTaskManagerTest extends
InnerCompactionTest {
seqResources,
true,
new ReadChunkCompactionPerformer(seqResources),
- new AtomicInteger(0));
+ new AtomicInteger(0),
+ 0);
CompactionTaskManager manager = CompactionTaskManager.getInstance();
Assert.assertTrue(manager.addTaskToWaitingQueue(task1));
manager.submitTaskFromTaskQueue();
@@ -254,7 +260,8 @@ public class CompactionTaskManagerTest extends
InnerCompactionTest {
seqResources,
true,
new ReadChunkCompactionPerformer(seqResources),
- new AtomicInteger(0));
+ new AtomicInteger(0),
+ 0);
CompactionTaskManager manager = CompactionTaskManager.getInstance();
manager.restart();
seqResources.get(0).readLock();
@@ -298,7 +305,8 @@ public class CompactionTaskManagerTest extends
InnerCompactionTest {
seqResources,
true,
new ReadChunkCompactionPerformer(seqResources),
- new AtomicInteger(0));
+ new AtomicInteger(0),
+ 0);
CompactionTaskManager.getInstance().addTaskToWaitingQueue(task);
for (TsFileResource resource : seqResources) {
@@ -326,7 +334,8 @@ public class CompactionTaskManagerTest extends
InnerCompactionTest {
seqResources,
unseqResources,
new ReadPointCompactionPerformer(),
- new AtomicInteger(0));
+ new AtomicInteger(0),
+ 0);
for (TsFileResource resource : seqResources) {
Assert.assertFalse(resource.isCompactionCandidate());
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java
index 4c3e8c595b..1d8b252fbd 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java
@@ -441,7 +441,8 @@ public class CrossSpaceCompactionTest {
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
- new AtomicInteger(0));
+ new AtomicInteger(0),
+ 0);
compactionTask.call();
List<TsFileResource> targetTsfileResourceList = new ArrayList<>();
for (TsFileResource seqResource : seqResources) {
@@ -745,7 +746,8 @@ public class CrossSpaceCompactionTest {
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
- new AtomicInteger(0));
+ new AtomicInteger(0),
+ 0);
compactionTask.call();
List<TsFileResource> targetTsfileResourceList = new ArrayList<>();
for (TsFileResource seqResource : seqResources.subList(1, 4)) {
@@ -1048,7 +1050,8 @@ public class CrossSpaceCompactionTest {
.getConfig()
.getCrossCompactionPerformer()
.createInstance(),
- new AtomicInteger(0));
+ new AtomicInteger(0),
+ 0);
compactionTask.call();
List<TsFileResource> targetTsfileResourceList = new ArrayList<>();
for (TsFileResource seqResource : seqResources.subList(1, 4)) {
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java
index be4d472d5f..f4f4527daf 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java
@@ -226,7 +226,8 @@ public class RewriteCrossSpaceCompactionTest extends
AbstractCompactionTest {
seqResources,
unseqResources,
new ReadPointCompactionPerformer(),
- new AtomicInteger(0));
+ new AtomicInteger(0),
+ 0);
task.call();
for (TsFileResource resource : seqResources) {
@@ -462,7 +463,8 @@ public class RewriteCrossSpaceCompactionTest extends
AbstractCompactionTest {
seqResources,
unseqResources,
new ReadPointCompactionPerformer(),
- new AtomicInteger(0));
+ new AtomicInteger(0),
+ 0);
task.call();
for (TsFileResource resource : seqResources) {
@@ -608,7 +610,8 @@ public class RewriteCrossSpaceCompactionTest extends
AbstractCompactionTest {
seqResources,
unseqResources,
new ReadPointCompactionPerformer(),
- new AtomicInteger(0));
+ new AtomicInteger(0),
+ 0);
task.setSourceFilesToCompactionCandidate();
task.checkValidAndSetMerging();
// delete data in source file during compaction
@@ -717,7 +720,8 @@ public class RewriteCrossSpaceCompactionTest extends
AbstractCompactionTest {
seqResources,
unseqResources,
new ReadPointCompactionPerformer(),
- new AtomicInteger(0));
+ new AtomicInteger(0),
+ 0);
task.setSourceFilesToCompactionCandidate();
task.checkValidAndSetMerging();
// delete data in source file during compaction
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java
index 5b1306ff6a..407c57f5af 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java
@@ -82,7 +82,8 @@ public class InnerCompactionEmptyTsFileTest extends
InnerCompactionTest {
unseqResources.subList(0, 3),
false,
new ReadPointCompactionPerformer(),
- new AtomicInteger(0));
+ new AtomicInteger(0),
+ 0);
Future<CompactionTaskSummary> future =
CompactionTaskManager.getInstance().submitTask(task);
Assert.assertTrue(future.get().isSuccess());
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
index 28b3b45b6d..f1bd004ab8 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
@@ -785,7 +785,8 @@ public class DataRegionTest {
dataRegion.getSequenceFileList(),
true,
new
ReadChunkCompactionPerformer(dataRegion.getSequenceFileList()),
- new AtomicInteger(0));
+ new AtomicInteger(0),
+ 0);
CompactionTaskManager.getInstance().submitTask(task);
Thread.sleep(20);
StorageEngine.getInstance().deleteStorageGroup(new
PartialPath(storageGroup));
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index c85201353c..88088dc826 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -729,7 +729,8 @@ public class StorageGroupProcessorTest {
processor.getSequenceFileList(),
true,
new
ReadChunkCompactionPerformer(processor.getSequenceFileList()),
- new AtomicInteger(0));
+ new AtomicInteger(0),
+ 0);
CompactionTaskManager.getInstance().submitTask(task);
Thread.sleep(20);
StorageEngine.getInstance().deleteStorageGroup(new
PartialPath(storageGroup));