This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3427b320b62 Fix compaction UTs and add concurrency control when schedule compaction (#11638)
3427b320b62 is described below

commit 3427b320b629bfd39e2e329203b0fd067f3e1aa4
Author: shuwenwei <[email protected]>
AuthorDate: Wed Dec 6 11:47:05 2023 +0800

    Fix compaction UTs and add concurrency control when schedule compaction (#11638)
---
 .../db/storageengine/dataregion/DataRegion.java    |  6 ++++++
 .../compaction/schedule/CompactionScheduler.java   | 13 +++++++++++
 .../compaction/settle/SettleRequestHandler.java    | 24 +++++++++++++--------
 .../compaction/CompactionSchedulerTest.java        | 25 +++++++++++++++++++---
 4 files changed, 56 insertions(+), 12 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index a884ba9fcd3..8b6c1d1289d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2322,6 +2322,7 @@ public class DataRegion implements IDataRegionForQuery {
      // the name of this variable is trySubmitCount, because the task submitted to the queue could
       // be
       // evicted due to the low priority of the task
+      CompactionScheduler.lockCompactionSelection();
       try {
         for (long timePartition : timePartitions) {
           trySubmitCount +=
@@ -2329,6 +2330,8 @@ public class DataRegion implements IDataRegionForQuery {
         }
       } catch (Throwable e) {
         logger.error("Meet error in compaction schedule.", e);
+      } finally {
+        CompactionScheduler.unlockCompactionSelection();
       }
     }
     if (summary.hasSubmitTask()) {
@@ -2349,6 +2352,7 @@ public class DataRegion implements IDataRegionForQuery {
 
   protected int executeInsertionCompaction(List<Long> timePartitions) {
     int trySubmitCount = 0;
+    CompactionScheduler.lockCompactionSelection();
     try {
       while (true) {
         int currentSubmitCount = 0;
@@ -2367,6 +2371,8 @@ public class DataRegion implements IDataRegionForQuery {
       }
     } catch (Throwable e) {
       logger.error("Meet error in compaction schedule.", e);
+    } finally {
+      CompactionScheduler.unlockCompactionSelection();
     }
     return trySubmitCount;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
index f36dea68747..d2534a427e7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
@@ -40,6 +40,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.concurrent.Phaser;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
 /**
@@ -59,6 +61,17 @@ public class CompactionScheduler {
 
   private CompactionScheduler() {}
 
+  /** avoid timed compaction schedule conflict with manually triggered schedule */
+  private static final Lock compactionTaskSelectionLock = new ReentrantLock();
+
+  public static void lockCompactionSelection() {
+    compactionTaskSelectionLock.lock();
+  }
+
+  public static void unlockCompactionSelection() {
+    compactionTaskSelectionLock.unlock();
+  }
+
   /**
    * Select compaction task and submit them to CompactionTaskManager.
    *
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleRequestHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleRequestHandler.java
index 60652442dad..9e1e90441a4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleRequestHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleRequestHandler.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduler;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
@@ -73,16 +74,21 @@ public class SettleRequestHandler {
   public TSStatus handleSettleRequest(TSettleReq req) {
     List<String> paths = req.getPaths();
 
-    SettleRequestContext context = new SettleRequestContext(paths);
-    TSStatus validationResult = context.validate();
-    if (validationResult.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      return validationResult;
-    }
-    if (testMode) {
-      return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+    CompactionScheduler.lockCompactionSelection();
+    try {
+      SettleRequestContext context = new SettleRequestContext(paths);
+      TSStatus validationResult = context.validate();
+      if (validationResult.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return validationResult;
+      }
+      if (testMode) {
+        return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+      }
+      List<TsFileResource> selectedTsFileResources = 
context.getTsFileResourcesByFileNames();
+      return context.submitCompactionTask(selectedTsFileResources);
+    } finally {
+      CompactionScheduler.unlockCompactionSelection();
     }
-    List<TsFileResource> selectedTsFileResources = 
context.getTsFileResourcesByFileNames();
-    return context.submitCompactionTask(selectedTsFileResources);
   }
 
   private static class SettleRequestContext {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java
index 52b6f091576..dd2d9651b61 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java
@@ -37,6 +37,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionC
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionFileGeneratorUtils;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
 import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.utils.constant.TestConstant;
@@ -1792,18 +1793,36 @@ public class CompactionSchedulerTest {
       CompactionScheduler.scheduleCompaction(tsFileManager, 0);
       Thread.sleep(100);
       long sleepTime = 0;
-      while (tsFileManager.getTsFileList(true).size() > 3) {
+      while (tsFileManager.getTsFileList(true).size() >= 2) {
         CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+        tsFileManager.readLock();
+        List<TsFileResource> resources = tsFileManager.getTsFileList(true);
+        int previousFileLevel =
+            
TsFileNameGenerator.getTsFileName(resources.get(0).getTsFile().getName())
+                .getInnerCompactionCnt();
+        boolean canMerge = false;
+        for (int i = 1; i < resources.size(); i++) {
+          int currentFileLevel =
+              
TsFileNameGenerator.getTsFileName(resources.get(i).getTsFile().getName())
+                  .getInnerCompactionCnt();
+          if (currentFileLevel == previousFileLevel) {
+            canMerge = true;
+            break;
+          }
+        }
+        tsFileManager.readUnlock();
+        if (!canMerge) {
+          break;
+        }
         Thread.sleep(100);
         sleepTime += 100;
-        if (sleepTime >= 20_000) {
+        if (sleepTime >= 200_000) {
           fail();
         }
       }
 
       stopCompactionTaskManager();
       tsFileManager.setAllowCompaction(false);
-      assertEquals(3, tsFileManager.getTsFileList(true).size());
     } finally {
       IoTDBDescriptor.getInstance()
           .getConfig()

Reply via email to