This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e34850cacc [IOTDB-3189] Fix compaction is not well-distributed across 
sgs (#6324)
e34850cacc is described below

commit e34850cacccf8085829ac907adb06d7efdc74519
Author: Liu Xuxin <[email protected]>
AuthorDate: Sun Jun 19 10:00:31 2022 +0800

    [IOTDB-3189] Fix compaction is not well-distributed across sgs (#6324)
---
 .../db/engine/compaction/CompactionScheduler.java  |  6 +-
 .../DefaultCompactionTaskComparatorImpl.java       | 17 ++++-
 .../compaction/cross/CrossSpaceCompactionTask.java |  6 +-
 .../compaction/inner/InnerSpaceCompactionTask.java |  6 +-
 .../compaction/task/AbstractCompactionTask.java    |  9 ++-
 .../db/engine/storagegroup/TsFileManager.java      |  6 ++
 .../compaction/CompactionTaskComparatorTest.java   | 74 ++++++++++++++++++----
 .../compaction/CompactionTaskManagerTest.java      | 27 +++++---
 .../compaction/cross/CrossSpaceCompactionTest.java |  9 ++-
 .../cross/RewriteCrossSpaceCompactionTest.java     | 12 ++--
 .../inner/InnerCompactionEmptyTsFileTest.java      |  3 +-
 .../db/engine/storagegroup/DataRegionTest.java     |  3 +-
 .../storagegroup/StorageGroupProcessorTest.java    |  3 +-
 13 files changed, 139 insertions(+), 42 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java
index cffa50884a..ac5349abbd 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionScheduler.java
@@ -127,7 +127,8 @@ public class CompactionScheduler {
                   task,
                   sequence,
                   performer,
-                  CompactionTaskManager.currentTaskNum));
+                  CompactionTaskManager.currentTaskNum,
+                  tsFileManager.getNextCompactionTaskId()));
     }
   }
 
@@ -160,7 +161,8 @@ public class CompactionScheduler {
                       .getConfig()
                       .getCrossCompactionPerformer()
                       .createInstance(),
-                  CompactionTaskManager.currentTaskNum));
+                  CompactionTaskManager.currentTaskNum,
+                  tsFileManager.getNextCompactionTaskId()));
     }
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/comparator/DefaultCompactionTaskComparatorImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/comparator/DefaultCompactionTaskComparatorImpl.java
index 04a213b7e6..8f0d26c66e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/comparator/DefaultCompactionTaskComparatorImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/comparator/DefaultCompactionTaskComparatorImpl.java
@@ -37,9 +37,7 @@ public class DefaultCompactionTaskComparatorImpl implements 
ICompactionTaskCompa
     if ((((o1 instanceof InnerSpaceCompactionTask) && (o2 instanceof 
CrossSpaceCompactionTask))
         || ((o2 instanceof InnerSpaceCompactionTask)
             && (o1 instanceof CrossSpaceCompactionTask)))) {
-      if (config.getCompactionPriority() == CompactionPriority.BALANCE) {
-        return 0;
-      } else if (config.getCompactionPriority() == 
CompactionPriority.INNER_CROSS) {
+      if (config.getCompactionPriority() != CompactionPriority.CROSS_INNER) {
         return o1 instanceof InnerSpaceCompactionTask ? -1 : 1;
       } else {
         return o1 instanceof CrossSpaceCompactionTask ? -1 : 1;
@@ -86,6 +84,12 @@ public class DefaultCompactionTaskComparatorImpl implements 
ICompactionTaskCompa
       return selectedFilesOfO2.size() - selectedFilesOfO1.size();
     }
 
+    // if the serial id of the tasks are different
+    // we prefer task with small serial id
+    if (o1.getSerialId() != o2.getSerialId()) {
+      return o1.getSerialId() > o2.getSerialId() ? 1 : -1;
+    }
+
     // if the size of selected files are different
     // we prefer to execute task with smaller file size
     // because small files can be compacted quickly
@@ -103,6 +107,13 @@ public class DefaultCompactionTaskComparatorImpl 
implements ICompactionTaskCompa
       // because this type of tasks consume fewer memory during execution
       return o1.getSelectedSequenceFiles().size() - 
o2.getSelectedSequenceFiles().size();
     }
+
+    // if the serial id of the tasks are different
+    // we prefer task with small serial id
+    if (o1.getSerialId() != o2.getSerialId()) {
+      return o1.getSerialId() > o2.getSerialId() ? 1 : -1;
+    }
+
     // we prefer the task with more unsequence files
     // because this type of tasks reduce more unsequence files
     return o2.getSelectedUnsequenceFiles().size() - 
o1.getSelectedUnsequenceFiles().size();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
index d50e092f2b..ae47fc3916 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTask.java
@@ -64,12 +64,14 @@ public class CrossSpaceCompactionTask extends 
AbstractCompactionTask {
       List<TsFileResource> selectedSequenceFiles,
       List<TsFileResource> selectedUnsequenceFiles,
       ICrossCompactionPerformer performer,
-      AtomicInteger currentTaskNum) {
+      AtomicInteger currentTaskNum,
+      long serialId) {
     super(
         tsFileManager.getStorageGroupName() + "-" + 
tsFileManager.getDataRegion(),
         timePartition,
         tsFileManager,
-        currentTaskNum);
+        currentTaskNum,
+        serialId);
     this.selectedSequenceFiles = selectedSequenceFiles;
     this.selectedUnsequenceFiles = selectedUnsequenceFiles;
     this.seqTsFileResourceList = 
tsFileManager.getSequenceListByTimePartition(timePartition);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java
index 4b2760ce5b..91e31fd126 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java
@@ -66,12 +66,14 @@ public class InnerSpaceCompactionTask extends 
AbstractCompactionTask {
       List<TsFileResource> selectedTsFileResourceList,
       boolean sequence,
       ICompactionPerformer performer,
-      AtomicInteger currentTaskNum) {
+      AtomicInteger currentTaskNum,
+      long serialId) {
     super(
         tsFileManager.getStorageGroupName() + "-" + 
tsFileManager.getDataRegion(),
         timePartition,
         tsFileManager,
-        currentTaskNum);
+        currentTaskNum,
+        serialId);
     this.selectedTsFileResourceList = selectedTsFileResourceList;
     this.sequence = sequence;
     this.performer = performer;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
index e6752b754a..7a00e466be 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/task/AbstractCompactionTask.java
@@ -49,16 +49,19 @@ public abstract class AbstractCompactionTask implements 
Callable<CompactionTaskS
   protected volatile boolean finished = false;
   protected ICompactionPerformer performer;
   protected int hashCode = -1;
+  protected long serialId;
 
   public AbstractCompactionTask(
       String fullStorageGroupName,
       long timePartition,
       TsFileManager tsFileManager,
-      AtomicInteger currentTaskNum) {
+      AtomicInteger currentTaskNum,
+      long serialId) {
     this.fullStorageGroupName = fullStorageGroupName;
     this.timePartition = timePartition;
     this.tsFileManager = tsFileManager;
     this.currentTaskNum = currentTaskNum;
+    this.serialId = serialId;
   }
 
   public abstract void setSourceFilesToCompactionCandidate();
@@ -133,4 +136,8 @@ public abstract class AbstractCompactionTask implements 
Callable<CompactionTaskS
   public boolean isTaskFinished() {
     return finished;
   }
+
+  public long getSerialId() {
+    return serialId;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
index 009e151d87..cd76c44a72 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileManager.java
@@ -36,6 +36,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -60,6 +61,7 @@ public class TsFileManager {
   private List<TsFileResource> unsequenceRecoverTsFileResources = new 
ArrayList<>();
 
   private boolean allowCompaction = true;
+  private AtomicLong currentCompactionTaskSerialId = new AtomicLong(0);
 
   public TsFileManager(String storageGroupName, String dataRegion, String 
storageGroupDir) {
     this.storageGroupName = storageGroupName;
@@ -425,4 +427,8 @@ public class TsFileManager {
       return cmp;
     }
   }
+
+  public long getNextCompactionTaskId() {
+    return currentCompactionTaskSerialId.getAndIncrement();
+  }
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskComparatorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskComparatorTest.java
index 0475026486..6bbba1a83c 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskComparatorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskComparatorTest.java
@@ -40,9 +40,12 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class CompactionTaskComparatorTest {
@@ -73,7 +76,8 @@ public class CompactionTaskComparatorTest {
             new FakedTsFileResource(new File(String.format("%d-%d-0-0.tsfile", 
i + j, i + j)), j));
       }
       compactionTasks[i] =
-          new FakedInnerSpaceCompactionTask("fakeSg", 0, tsFileManager, 
taskNum, true, resources);
+          new FakedInnerSpaceCompactionTask(
+              "fakeSg", 0, tsFileManager, taskNum, true, resources, 0);
       compactionTaskQueue.put(compactionTasks[i]);
     }
 
@@ -95,7 +99,8 @@ public class CompactionTaskComparatorTest {
                 new File(String.format("%d-%d-0-0.tsfile", i + j, i + j)), j - 
i + 101));
       }
       compactionTasks[i] =
-          new FakedInnerSpaceCompactionTask("fakeSg", 0, tsFileManager, 
taskNum, true, resources);
+          new FakedInnerSpaceCompactionTask(
+              "fakeSg", 0, tsFileManager, taskNum, true, resources, 0);
       compactionTaskQueue.put(compactionTasks[i]);
     }
 
@@ -117,7 +122,8 @@ public class CompactionTaskComparatorTest {
                 new File(String.format("%d-%d-%d-0.tsfile", i + j, i + j, j - 
i + 101)), 1));
       }
       compactionTasks[i] =
-          new FakedInnerSpaceCompactionTask("fakeSg", 0, tsFileManager, 
taskNum, true, resources);
+          new FakedInnerSpaceCompactionTask(
+              "fakeSg", 0, tsFileManager, taskNum, true, resources, 0);
       compactionTaskQueue.put(compactionTasks[i]);
     }
 
@@ -142,7 +148,8 @@ public class CompactionTaskComparatorTest {
                 new File(String.format("%d-%d-%d-0.tsfile", i + j, i + j, j - 
i + 101)), 1));
       }
       compactionTasks[i] =
-          new FakedInnerSpaceCompactionTask("fakeSg", 0, tsFileManager, 
taskNum, true, resources);
+          new FakedInnerSpaceCompactionTask(
+              "fakeSg", 0, tsFileManager, taskNum, true, resources, 0);
       limitQueue.add(compactionTasks[i]);
     }
 
@@ -164,7 +171,8 @@ public class CompactionTaskComparatorTest {
                 new File(String.format("%d-%d-0-0.tsfile", i + j, i + j, j - i 
+ 101)), 1));
       }
       compactionTasks[i] =
-          new FakedInnerSpaceCompactionTask("fakeSg", 0, tsFileManager, 
taskNum, true, resources);
+          new FakedInnerSpaceCompactionTask(
+              "fakeSg", 0, tsFileManager, taskNum, true, resources, 0);
       compactionTaskQueue.put(compactionTasks[i]);
     }
 
@@ -187,7 +195,8 @@ public class CompactionTaskComparatorTest {
             new FakedTsFileResource(new File(String.format("%d-%d-0-0.tsfile", 
i + j, i + j)), j));
       }
       innerCompactionTasks[i] =
-          new FakedInnerSpaceCompactionTask("fakeSg", 0, tsFileManager, 
taskNum, true, resources);
+          new FakedInnerSpaceCompactionTask(
+              "fakeSg", 0, tsFileManager, taskNum, true, resources, 0);
     }
 
     for (int i = 0; i < 100; ++i) {
@@ -203,7 +212,7 @@ public class CompactionTaskComparatorTest {
       }
       crossCompactionTasks[i] =
           new FakeCrossSpaceCompactionTask(
-              "fakeSg", 0, tsFileManager, taskNum, sequenceResources, 
unsequenceResources);
+              "fakeSg", 0, tsFileManager, taskNum, sequenceResources, 
unsequenceResources, 0);
     }
 
     for (int i = 0; i < 100; i++) {
@@ -240,7 +249,7 @@ public class CompactionTaskComparatorTest {
       }
       crossCompactionTasks[i] =
           new FakeCrossSpaceCompactionTask(
-              "fakeSg", 0, tsFileManager, taskNum, sequenceResources, 
unsequenceResources);
+              "fakeSg", 0, tsFileManager, taskNum, sequenceResources, 
unsequenceResources, 0);
       compactionTaskQueue.put(crossCompactionTasks[i]);
     }
     for (int i = 100; i < 200; ++i) {
@@ -256,7 +265,7 @@ public class CompactionTaskComparatorTest {
       }
       crossCompactionTasks[i] =
           new FakeCrossSpaceCompactionTask(
-              "fakeSg", 0, tsFileManager, taskNum, sequenceResources, 
unsequenceResources);
+              "fakeSg", 0, tsFileManager, taskNum, sequenceResources, 
unsequenceResources, 0);
       compactionTaskQueue.put(crossCompactionTasks[i]);
     }
 
@@ -266,6 +275,41 @@ public class CompactionTaskComparatorTest {
     }
   }
 
+  @Test
+  public void testSerialId() throws InterruptedException {
+    AbstractCompactionTask[] compactionTasks = new AbstractCompactionTask[100];
+    TsFileManager[] tsFileManagers = new TsFileManager[10];
+    for (int i = 0; i < 10; ++i) {
+      tsFileManagers[i] = new TsFileManager("fakeSg" + i, "0", "/");
+      for (int j = 0; j < 10; ++j) {
+        List<TsFileResource> resources = new ArrayList<>();
+        // the j th compaction task for i th sg
+        for (int k = 0; k < 10; ++k) {
+          resources.add(
+              new FakedTsFileResource(
+                  new File(String.format("%d-%d-0-0.tsfile", j * 10 + k, j * 
10 + k)), 10));
+        }
+        compactionTaskQueue.put(
+            new FakedInnerSpaceCompactionTask(
+                "fakeSg" + i, 0, tsFileManagers[i], taskNum, true, resources, 
j));
+      }
+    }
+    Map<String, AtomicInteger> taskCount = new HashMap<>();
+    for (int i = 0; i < 10; ++i) {
+      taskCount.put("fakeSg" + i + "-0", new AtomicInteger(0));
+    }
+    long cnt = 0;
+    while (compactionTaskQueue.size() > 0) {
+      for (int i = 0; i < 10; ++i) {
+        
taskCount.get(compactionTaskQueue.take().getFullStorageGroupName()).incrementAndGet();
+      }
+      cnt++;
+      for (int i = 0; i < 10; ++i) {
+        assertEquals(cnt, taskCount.get("fakeSg" + i + "-0").get());
+      }
+    }
+  }
+
   private static class FakedInnerSpaceCompactionTask extends 
InnerSpaceCompactionTask {
 
     public FakedInnerSpaceCompactionTask(
@@ -274,14 +318,16 @@ public class CompactionTaskComparatorTest {
         TsFileManager tsFileManager,
         AtomicInteger currentTaskNum,
         boolean sequence,
-        List<TsFileResource> selectedTsFileResourceList) {
+        List<TsFileResource> selectedTsFileResourceList,
+        long serialId) {
       super(
           timePartition,
           tsFileManager,
           selectedTsFileResourceList,
           sequence,
           new ReadChunkCompactionPerformer(),
-          currentTaskNum);
+          currentTaskNum,
+          serialId);
     }
 
     @Override
@@ -306,14 +352,16 @@ public class CompactionTaskComparatorTest {
         TsFileManager tsFileManager,
         AtomicInteger currentTaskNum,
         List<TsFileResource> selectedSequenceFiles,
-        List<TsFileResource> selectedUnsequenceFiles) {
+        List<TsFileResource> selectedUnsequenceFiles,
+        long serialId) {
       super(
           timePartition,
           tsFileManager,
           selectedSequenceFiles,
           selectedUnsequenceFiles,
           new ReadPointCompactionPerformer(),
-          currentTaskNum);
+          currentTaskNum,
+          serialId);
     }
 
     @Override
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
index df68f4eb07..620c40137a 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
@@ -80,7 +80,8 @@ public class CompactionTaskManagerTest extends 
InnerCompactionTest {
             seqResources,
             true,
             new ReadChunkCompactionPerformer(seqResources),
-            new AtomicInteger(0));
+            new AtomicInteger(0),
+            0);
     InnerSpaceCompactionTask task2 =
         new InnerSpaceCompactionTask(
             0,
@@ -88,7 +89,8 @@ public class CompactionTaskManagerTest extends 
InnerCompactionTest {
             seqResources,
             true,
             new ReadChunkCompactionPerformer(seqResources),
-            new AtomicInteger(0));
+            new AtomicInteger(0),
+            0);
     seqResources.get(0).readLock();
     CompactionTaskManager manager = CompactionTaskManager.getInstance();
     try {
@@ -141,7 +143,8 @@ public class CompactionTaskManagerTest extends 
InnerCompactionTest {
             seqResources,
             true,
             new ReadChunkCompactionPerformer(seqResources),
-            new AtomicInteger(0));
+            new AtomicInteger(0),
+            0);
     InnerSpaceCompactionTask task2 =
         new InnerSpaceCompactionTask(
             0,
@@ -149,7 +152,8 @@ public class CompactionTaskManagerTest extends 
InnerCompactionTest {
             seqResources,
             true,
             new ReadChunkCompactionPerformer(seqResources),
-            new AtomicInteger(0));
+            new AtomicInteger(0),
+            0);
     seqResources.get(0).readLock();
     try {
       CompactionTaskManager manager = CompactionTaskManager.getInstance();
@@ -203,7 +207,8 @@ public class CompactionTaskManagerTest extends 
InnerCompactionTest {
             seqResources,
             true,
             new ReadChunkCompactionPerformer(seqResources),
-            new AtomicInteger(0));
+            new AtomicInteger(0),
+            0);
     InnerSpaceCompactionTask task2 =
         new InnerSpaceCompactionTask(
             0,
@@ -211,7 +216,8 @@ public class CompactionTaskManagerTest extends 
InnerCompactionTest {
             seqResources,
             true,
             new ReadChunkCompactionPerformer(seqResources),
-            new AtomicInteger(0));
+            new AtomicInteger(0),
+            0);
     CompactionTaskManager manager = CompactionTaskManager.getInstance();
     Assert.assertTrue(manager.addTaskToWaitingQueue(task1));
     manager.submitTaskFromTaskQueue();
@@ -254,7 +260,8 @@ public class CompactionTaskManagerTest extends 
InnerCompactionTest {
             seqResources,
             true,
             new ReadChunkCompactionPerformer(seqResources),
-            new AtomicInteger(0));
+            new AtomicInteger(0),
+            0);
     CompactionTaskManager manager = CompactionTaskManager.getInstance();
     manager.restart();
     seqResources.get(0).readLock();
@@ -298,7 +305,8 @@ public class CompactionTaskManagerTest extends 
InnerCompactionTest {
             seqResources,
             true,
             new ReadChunkCompactionPerformer(seqResources),
-            new AtomicInteger(0));
+            new AtomicInteger(0),
+            0);
     CompactionTaskManager.getInstance().addTaskToWaitingQueue(task);
 
     for (TsFileResource resource : seqResources) {
@@ -326,7 +334,8 @@ public class CompactionTaskManagerTest extends 
InnerCompactionTest {
             seqResources,
             unseqResources,
             new ReadPointCompactionPerformer(),
-            new AtomicInteger(0));
+            new AtomicInteger(0),
+            0);
 
     for (TsFileResource resource : seqResources) {
       Assert.assertFalse(resource.isCompactionCandidate());
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java
index 4c3e8c595b..1d8b252fbd 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java
@@ -441,7 +441,8 @@ public class CrossSpaceCompactionTest {
                         .getConfig()
                         .getCrossCompactionPerformer()
                         .createInstance(),
-                    new AtomicInteger(0));
+                    new AtomicInteger(0),
+                    0);
             compactionTask.call();
             List<TsFileResource> targetTsfileResourceList = new ArrayList<>();
             for (TsFileResource seqResource : seqResources) {
@@ -745,7 +746,8 @@ public class CrossSpaceCompactionTest {
                         .getConfig()
                         .getCrossCompactionPerformer()
                         .createInstance(),
-                    new AtomicInteger(0));
+                    new AtomicInteger(0),
+                    0);
             compactionTask.call();
             List<TsFileResource> targetTsfileResourceList = new ArrayList<>();
             for (TsFileResource seqResource : seqResources.subList(1, 4)) {
@@ -1048,7 +1050,8 @@ public class CrossSpaceCompactionTest {
                         .getConfig()
                         .getCrossCompactionPerformer()
                         .createInstance(),
-                    new AtomicInteger(0));
+                    new AtomicInteger(0),
+                    0);
             compactionTask.call();
             List<TsFileResource> targetTsfileResourceList = new ArrayList<>();
             for (TsFileResource seqResource : seqResources.subList(1, 4)) {
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java
index be4d472d5f..f4f4527daf 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java
@@ -226,7 +226,8 @@ public class RewriteCrossSpaceCompactionTest extends 
AbstractCompactionTest {
             seqResources,
             unseqResources,
             new ReadPointCompactionPerformer(),
-            new AtomicInteger(0));
+            new AtomicInteger(0),
+            0);
     task.call();
 
     for (TsFileResource resource : seqResources) {
@@ -462,7 +463,8 @@ public class RewriteCrossSpaceCompactionTest extends 
AbstractCompactionTest {
             seqResources,
             unseqResources,
             new ReadPointCompactionPerformer(),
-            new AtomicInteger(0));
+            new AtomicInteger(0),
+            0);
     task.call();
 
     for (TsFileResource resource : seqResources) {
@@ -608,7 +610,8 @@ public class RewriteCrossSpaceCompactionTest extends 
AbstractCompactionTest {
             seqResources,
             unseqResources,
             new ReadPointCompactionPerformer(),
-            new AtomicInteger(0));
+            new AtomicInteger(0),
+            0);
     task.setSourceFilesToCompactionCandidate();
     task.checkValidAndSetMerging();
     // delete data in source file during compaction
@@ -717,7 +720,8 @@ public class RewriteCrossSpaceCompactionTest extends 
AbstractCompactionTest {
             seqResources,
             unseqResources,
             new ReadPointCompactionPerformer(),
-            new AtomicInteger(0));
+            new AtomicInteger(0),
+            0);
     task.setSourceFilesToCompactionCandidate();
     task.checkValidAndSetMerging();
     // delete data in source file during compaction
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java
index 5b1306ff6a..407c57f5af 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionEmptyTsFileTest.java
@@ -82,7 +82,8 @@ public class InnerCompactionEmptyTsFileTest extends 
InnerCompactionTest {
             unseqResources.subList(0, 3),
             false,
             new ReadPointCompactionPerformer(),
-            new AtomicInteger(0));
+            new AtomicInteger(0),
+            0);
     Future<CompactionTaskSummary> future = 
CompactionTaskManager.getInstance().submitTask(task);
     Assert.assertTrue(future.get().isSuccess());
   }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
index 28b3b45b6d..f1bd004ab8 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
@@ -785,7 +785,8 @@ public class DataRegionTest {
               dataRegion.getSequenceFileList(),
               true,
               new 
ReadChunkCompactionPerformer(dataRegion.getSequenceFileList()),
-              new AtomicInteger(0));
+              new AtomicInteger(0),
+              0);
       CompactionTaskManager.getInstance().submitTask(task);
       Thread.sleep(20);
       StorageEngine.getInstance().deleteStorageGroup(new 
PartialPath(storageGroup));
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index c85201353c..88088dc826 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -729,7 +729,8 @@ public class StorageGroupProcessorTest {
               processor.getSequenceFileList(),
               true,
               new 
ReadChunkCompactionPerformer(processor.getSequenceFileList()),
-              new AtomicInteger(0));
+              new AtomicInteger(0),
+              0);
       CompactionTaskManager.getInstance().submitTask(task);
       Thread.sleep(20);
       StorageEngine.getInstance().deleteStorageGroup(new 
PartialPath(storageGroup));

Reply via email to