This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c04d3fa9cc [IOTDB-4835] Fix InnerSpaceCompaction may be blocked (#7901)
c04d3fa9cc is described below

commit c04d3fa9cc2e1f150e8442e1bb27cb06dff57668
Author: Liu Xuxin <[email protected]>
AuthorDate: Wed Nov 9 20:51:11 2022 +0800

    [IOTDB-4835] Fix InnerSpaceCompaction may be blocked (#7901)
---
 .../engine/compaction/CompactionTaskManager.java   |   1 +
 .../sizetiered/SizeTieredCompactionSelector.java   |  78 +++++++++-------
 .../engine/compaction/CompactionSchedulerTest.java | 103 ++++++++++++++++++++-
 3 files changed, 146 insertions(+), 36 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index 649a277716..1720ad6c71 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -385,6 +385,7 @@ public class CompactionTaskManager implements IService {
       init = true;
     }
     currentTaskNum = new AtomicInteger(0);
+    init = true;
     logger.info("Compaction task manager started.");
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
index 6261787398..d8430029c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
@@ -78,38 +78,6 @@ public class SizeTieredCompactionSelector
     hasNextTimePartition = tsFileManager.hasNextTimePartition(timePartition, sequence);
   }
 
-  /**
-   * This method searches for a batch of files to be compacted from layer 0 to the highest layer. If
-   * there are more than a batch of files to be merged on a certain layer, it does not search to
-   * higher layers. It creates a compaction thread for each batch of files and put it into the
-   * candidateCompactionTaskQueue of the {@link CompactionTaskManager}.
-   *
-   * @return Returns whether the file was found and submits the merge task
-   */
-  @Override
-  public List<List<TsFileResource>> selectInnerSpaceTask(List<TsFileResource> tsFileResources) {
-    this.tsFileResources = tsFileResources;
-    PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue =
-        new PriorityQueue<>(new SizeTieredCompactionTaskComparator());
-    try {
-      int maxLevel = searchMaxFileLevel();
-      for (int currentLevel = 0; currentLevel <= maxLevel; currentLevel++) {
-        if (!selectLevelTask(currentLevel, taskPriorityQueue)) {
-          break;
-        }
-      }
-      List<List<TsFileResource>> taskList = new LinkedList<>();
-      while (taskPriorityQueue.size() > 0) {
-        List<TsFileResource> resources = taskPriorityQueue.poll().left;
-        taskList.add(resources);
-      }
-      return taskList;
-    } catch (Exception e) {
-      LOGGER.error("Exception occurs while selecting files", e);
-    }
-    return Collections.emptyList();
-  }
-
   /**
    * This method searches for all files on the given level. If there are consecutive files on the
    * level that meet the system preset conditions (the number exceeds 10 or the total file size
@@ -135,8 +103,16 @@ public class SizeTieredCompactionSelector
     for (TsFileResource currentFile : tsFileResources) {
       TsFileNameGenerator.TsFileName currentName =
           TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());
-      if (currentName.getInnerCompactionCnt() != level
-          || currentFile.getStatus() != TsFileResourceStatus.CLOSED) {
+      if (currentName.getInnerCompactionCnt() != level) {
+        if (selectedFileList.size() > 1) {
+          taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
+          shouldContinueToSearch = false;
+        }
+        selectedFileList = new ArrayList<>();
+        selectedFileSize = 0L;
+        continue;
+      }
+      if (currentFile.getStatus() != TsFileResourceStatus.CLOSED) {
         selectedFileList.clear();
         selectedFileSize = 0L;
         continue;
@@ -155,10 +131,10 @@ public class SizeTieredCompactionSelector
         // submit the task
         if (selectedFileList.size() > 1) {
           taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
+          shouldContinueToSearch = false;
         }
         selectedFileList = new ArrayList<>();
         selectedFileSize = 0L;
-        shouldContinueToSearch = false;
       }
     }
 
@@ -171,6 +147,38 @@ public class SizeTieredCompactionSelector
     return shouldContinueToSearch;
   }
 
+  /**
+   * This method searches for a batch of files to be compacted from layer 0 to the highest layer. If
+   * there are more than a batch of files to be merged on a certain layer, it does not search to
+   * higher layers. It creates a compaction thread for each batch of files and put it into the
+   * candidateCompactionTaskQueue of the {@link CompactionTaskManager}.
+   *
+   * @return Returns whether the file was found and submits the merge task
+   */
+  @Override
+  public List<List<TsFileResource>> selectInnerSpaceTask(List<TsFileResource> tsFileResources) {
+    this.tsFileResources = tsFileResources;
+    PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue =
+        new PriorityQueue<>(new SizeTieredCompactionTaskComparator());
+    try {
+      int maxLevel = searchMaxFileLevel();
+      for (int currentLevel = 0; currentLevel <= maxLevel; currentLevel++) {
+        if (!selectLevelTask(currentLevel, taskPriorityQueue)) {
+          break;
+        }
+      }
+      List<List<TsFileResource>> taskList = new LinkedList<>();
+      while (taskPriorityQueue.size() > 0) {
+        List<TsFileResource> resources = taskPriorityQueue.poll().left;
+        taskList.add(resources);
+      }
+      return taskList;
+    } catch (Exception e) {
+      LOGGER.error("Exception occurs while selecting files", e);
+    }
+    return Collections.emptyList();
+  }
+
   private int searchMaxFileLevel() throws IOException {
     int maxLevel = -1;
     for (TsFileResource currentFile : tsFileResources) {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
index b9440b03e5..e498fcf1b2 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.engine.compaction;
 
+import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.constant.TestConstant;
@@ -1627,7 +1628,6 @@ public class CompactionSchedulerTest {
         fullPath.add(sgName + device);
       }
       for (int i = 0; i < 100; i++) {
-
         List<List<Long>> chunkPagePointsNum = new ArrayList<>();
         List<Long> pagePointsNum = new ArrayList<>();
         pagePointsNum.add(100L);
@@ -1705,6 +1705,107 @@ public class CompactionSchedulerTest {
     }
   }
 
+  @Test
+  public void testLargeFileInLowerLevel() throws Exception {
+    logger.warn("Running test16");
+    int prevMaxCompactionCandidateFileNum =
+        IoTDBDescriptor.getInstance().getConfig().getMaxInnerCompactionCandidateFileNum();
+    IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(2);
+    long originTargetSize = IoTDBDescriptor.getInstance().getConfig().getTargetCompactionFileSize();
+    IoTDBDescriptor.getInstance().getConfig().setTargetCompactionFileSize(1024 * 1024);
+    String sgName = COMPACTION_TEST_SG + "test17";
+    try {
+      CompactionTaskManager.getInstance().restart();
+      TsFileManager tsFileManager = new TsFileManager(sgName, "0", "target");
+      Set<String> fullPath = new HashSet<>();
+      for (String device : fullPaths) {
+        fullPath.add(sgName + device);
+      }
+      for (int i = 0; i < 10; i++) {
+        List<List<Long>> chunkPagePointsNum = new ArrayList<>();
+        List<Long> pagePointsNum = new ArrayList<>();
+        pagePointsNum.add(100L);
+        chunkPagePointsNum.add(pagePointsNum);
+        TsFileResource tsFileResource =
+            new TsFileResource(
+                new File(
+                    TestConstant.BASE_OUTPUT_PATH
+                        .concat(File.separator)
+                        .concat("sequence")
+                        .concat(File.separator)
+                        .concat(sgName)
+                        .concat(File.separator)
+                        .concat("0")
+                        .concat(File.separator)
+                        .concat("0")
+                        .concat(File.separator)
+                        .concat(
+                            (i + 1)
+                                + IoTDBConstant.FILE_NAME_SEPARATOR
+                                + (i + 1)
+                                + IoTDBConstant.FILE_NAME_SEPARATOR
+                                + 1
+                                + IoTDBConstant.FILE_NAME_SEPARATOR
+                                + 0
+                                + ".tsfile")));
+        CompactionFileGeneratorUtils.writeTsFile(
+            fullPath, chunkPagePointsNum, 100 * i + 100, tsFileResource);
+        tsFileManager.add(tsFileResource, true);
+      }
+
+      List<List<Long>> chunkPagePointsNum = new ArrayList<>();
+      List<Long> pagePointsNum = new ArrayList<>();
+      pagePointsNum.add(100000L);
+      chunkPagePointsNum.add(pagePointsNum);
+      TsFileResource tsFileResource =
+          new TsFileResource(
+              new File(
+                  TestConstant.BASE_OUTPUT_PATH
+                      .concat(File.separator)
+                      .concat("sequence")
+                      .concat(File.separator)
+                      .concat(sgName)
+                      .concat(File.separator)
+                      .concat("0")
+                      .concat(File.separator)
+                      .concat("0")
+                      .concat(File.separator)
+                      .concat(
+                          11
+                              + IoTDBConstant.FILE_NAME_SEPARATOR
+                              + 11
+                              + IoTDBConstant.FILE_NAME_SEPARATOR
+                              + 0
+                              + IoTDBConstant.FILE_NAME_SEPARATOR
+                              + 0
+                              + ".tsfile")));
+      CompactionFileGeneratorUtils.writeTsFile(
+          fullPath, chunkPagePointsNum, 100 * 10 + 100, tsFileResource);
+      tsFileManager.add(tsFileResource, true);
+
+      CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+      Thread.sleep(100);
+      long sleepTime = 0;
+      while (tsFileManager.getTsFileList(true).size() > 3) {
+        CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+        Thread.sleep(100);
+        sleepTime += 100;
+        if (sleepTime >= 20_000) {
+          fail();
+        }
+      }
+
+      stopCompactionTaskManager();
+      tsFileManager.setAllowCompaction(false);
+      assertEquals(3, tsFileManager.getTsFileList(true).size());
+    } finally {
+      IoTDBDescriptor.getInstance()
+          .getConfig()
+          .setMaxInnerCompactionCandidateFileNum(prevMaxCompactionCandidateFileNum);
+      IoTDBDescriptor.getInstance().getConfig().setTargetCompactionFileSize(originTargetSize);
+    }
+  }
+
   public void stopCompactionTaskManager() {
     CompactionTaskManager.getInstance().clearCandidateQueue();
     while (CompactionTaskManager.getInstance().getRunningCompactionTaskList().size() > 0) {

Reply via email to