This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a4ce6f7a1a3 Modify the condition to skip compaction schedule after 
insertion compaction task selection (#14644)
a4ce6f7a1a3 is described below

commit a4ce6f7a1a309dbfd5dfec8d8b75849b620820ff
Author: shuwenwei <[email protected]>
AuthorDate: Thu Jan 9 14:27:45 2025 +0800

    Modify the condition to skip compaction schedule after insertion compaction 
task selection (#14644)
    
    * Modify the condition to skip compaction schedule after insertion 
compaction task selection
    
    * add ut
    
    * delay insertion compaction
    
    * fix ut
---
 .../db/storageengine/dataregion/DataRegion.java    | 66 ++++++++++------------
 .../schedule/CompactionScheduleContext.java        | 21 +++++++
 .../compaction/schedule/CompactionScheduler.java   | 60 +++++++++++---------
 .../impl/RewriteCrossSpaceCompactionSelector.java  | 11 +++-
 .../InsertionCrossSpaceCompactionSelectorTest.java |  4 +-
 .../cross/InsertionCrossSpaceCompactionTest.java   | 64 +++++++++++++++++++--
 6 files changed, 157 insertions(+), 69 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 54b469de3fe..84ff12e3dc7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -176,7 +176,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.Phaser;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -2719,31 +2718,32 @@ public class DataRegion implements IDataRegionForQuery {
     if (!isCompactionSelecting.compareAndSet(false, true)) {
       return 0;
     }
-    int trySubmitCount = 0;
+    CompactionScheduleContext context = new CompactionScheduleContext();
     try {
       List<Long> timePartitions = new 
ArrayList<>(tsFileManager.getTimePartitions());
       // Sort the time partition from largest to smallest
       timePartitions.sort(Comparator.reverseOrder());
 
-      CompactionScheduleContext context = new CompactionScheduleContext();
-
       // schedule insert compaction
-      trySubmitCount += executeInsertionCompaction(timePartitions, context);
-      context.incrementSubmitTaskNum(CompactionTaskType.INSERTION, 
trySubmitCount);
+      int[] submitCountOfTimePartitions = 
executeInsertionCompaction(timePartitions, context);
 
       // schedule the other compactions
-      if (trySubmitCount == 0) {
-        // the name of this variable is trySubmitCount, because the task 
submitted to the queue
-        // could be evicted due to the low priority of the task
-        for (long timePartition : timePartitions) {
-          CompactionScheduler.sharedLockCompactionSelection();
-          try {
-            trySubmitCount +=
-                CompactionScheduler.scheduleCompaction(tsFileManager, 
timePartition, context);
-          } finally {
-            context.clearTimePartitionDeviceInfoCache();
-            CompactionScheduler.sharedUnlockCompactionSelection();
-          }
+      for (int i = 0; i < timePartitions.size(); i++) {
+        boolean skipOtherCompactionSchedule =
+            submitCountOfTimePartitions[i] > 0
+                && !config
+                    .getDataRegionConsensusProtocolClass()
+                    .equals(ConsensusFactory.IOT_CONSENSUS_V2);
+        if (skipOtherCompactionSchedule) {
+          continue;
+        }
+        long timePartition = timePartitions.get(i);
+        CompactionScheduler.sharedLockCompactionSelection();
+        try {
+          CompactionScheduler.scheduleCompaction(tsFileManager, timePartition, 
context);
+        } finally {
+          context.clearTimePartitionDeviceInfoCache();
+          CompactionScheduler.sharedUnlockCompactionSelection();
         }
       }
       if (context.hasSubmitTask()) {
@@ -2756,7 +2756,7 @@ public class DataRegion implements IDataRegionForQuery {
     } finally {
       isCompactionSelecting.set(false);
     }
-    return trySubmitCount;
+    return context.getSubmitCompactionTaskNum();
   }
 
   /** Schedule settle compaction for ttl check. */
@@ -2803,40 +2803,36 @@ public class DataRegion implements IDataRegionForQuery {
     return trySubmitCount;
   }
 
-  protected int executeInsertionCompaction(
+  protected int[] executeInsertionCompaction(
       List<Long> timePartitions, CompactionScheduleContext context) throws 
InterruptedException {
-    int trySubmitCount = 0;
+    int[] trySubmitCountOfTimePartitions = new int[timePartitions.size()];
     CompactionScheduler.sharedLockCompactionSelection();
     try {
       while (true) {
         int currentSubmitCount = 0;
-        for (long timePartition : timePartitions) {
-          while (true) {
-            Phaser insertionTaskPhaser = new Phaser(1);
-            int selectedTaskNum =
-                CompactionScheduler.scheduleInsertionCompaction(
-                    tsFileManager, timePartition, insertionTaskPhaser, 
context);
-            
insertionTaskPhaser.awaitAdvanceInterruptibly(insertionTaskPhaser.arrive());
-            currentSubmitCount += selectedTaskNum;
-            if (selectedTaskNum <= 0) {
-              break;
-            }
-          }
+        for (int i = 0; i < timePartitions.size(); i++) {
+          long timePartition = timePartitions.get(i);
+          int selectedTaskNum =
+              CompactionScheduler.scheduleInsertionCompaction(
+                  tsFileManager, timePartition, context);
+          currentSubmitCount += selectedTaskNum;
+          trySubmitCountOfTimePartitions[i] += selectedTaskNum;
           context.clearTimePartitionDeviceInfoCache();
         }
         if (currentSubmitCount <= 0) {
           break;
         }
-        trySubmitCount += currentSubmitCount;
+        context.incrementSubmitTaskNum(CompactionTaskType.INSERTION, 
currentSubmitCount);
       }
     } catch (InterruptedException e) {
       throw e;
     } catch (Throwable e) {
       logger.error("Meet error in insertion compaction schedule.", e);
     } finally {
+      context.clearTimePartitionDeviceInfoCache();
       CompactionScheduler.sharedUnlockCompactionSelection();
     }
-    return trySubmitCount;
+    return trySubmitCountOfTimePartitions;
   }
 
   /**
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
index 4d51ba5010f..1f01cac0b25 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
@@ -31,7 +31,9 @@ import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 public class CompactionScheduleContext {
   private int submitSeqInnerSpaceCompactionTaskNum = 0;
@@ -49,8 +51,19 @@ public class CompactionScheduleContext {
   private final Map<TsFileResource, ArrayDeviceTimeIndex> 
partitionFileDeviceInfoCache;
   private long cachedDeviceInfoSize = 0;
 
+  private final Set<Long> timePartitionsDelayInsertionSelection;
+
   public CompactionScheduleContext() {
     this.partitionFileDeviceInfoCache = new HashMap<>();
+    this.timePartitionsDelayInsertionSelection = new HashSet<>();
+  }
+
+  public void delayInsertionSelection(long timePartitionId) {
+    timePartitionsDelayInsertionSelection.add(timePartitionId);
+  }
+
+  public boolean isInsertionSelectionDelayed(long timePartitionId) {
+    return timePartitionsDelayInsertionSelection.remove(timePartitionId);
   }
 
   public void addResourceDeviceTimeIndex(
@@ -135,6 +148,14 @@ public class CompactionScheduleContext {
     return submitSettleCompactionTaskNum;
   }
 
+  public int getSubmitCompactionTaskNum() {
+    return submitSeqInnerSpaceCompactionTaskNum
+        + submitUnseqInnerSpaceCompactionTaskNum
+        + submitCrossSpaceCompactionTaskNum
+        + submitInsertionCrossSpaceCompactionTaskNum
+        + submitSettleCompactionTaskNum;
+  }
+
   public boolean hasSubmitTask() {
     return submitCrossSpaceCompactionTaskNum
             + submitInsertionCrossSpaceCompactionTaskNum
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
index c465e6fa863..a8877f9ae8f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
@@ -93,29 +93,32 @@ public class CompactionScheduler {
    * @param context the context of compaction schedule
    * @return the count of submitted task
    */
-  public static int scheduleCompaction(
+  public static void scheduleCompaction(
       TsFileManager tsFileManager, long timePartition, 
CompactionScheduleContext context)
       throws InterruptedException {
     if (!tsFileManager.isAllowCompaction()) {
-      return 0;
+      return;
     }
     // the name of this variable is trySubmitCount, because the task submitted 
to the queue could be
     // evicted due to the low priority of the task
-    int trySubmitCount = 0;
     try {
-      trySubmitCount +=
+      int submitInnerTaskNum = 0;
+      submitInnerTaskNum +=
           tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, 
true, context);
-      trySubmitCount +=
+      submitInnerTaskNum +=
           tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition, 
false, context);
-      trySubmitCount += tryToSubmitCrossSpaceCompactionTask(tsFileManager, 
timePartition, context);
-      trySubmitCount +=
-          tryToSubmitSettleCompactionTask(tsFileManager, timePartition, 
context, false);
+      boolean executeDelayedInsertionSelection =
+          submitInnerTaskNum == 0 && 
context.isInsertionSelectionDelayed(timePartition);
+      if (executeDelayedInsertionSelection) {
+        scheduleInsertionCompaction(tsFileManager, timePartition, context);
+      }
+      tryToSubmitCrossSpaceCompactionTask(tsFileManager, timePartition, 
context);
+      tryToSubmitSettleCompactionTask(tsFileManager, timePartition, context, 
false);
     } catch (InterruptedException e) {
       throw e;
     } catch (Throwable e) {
       LOGGER.error("Meet error in compaction schedule.", e);
     }
-    return trySubmitCount;
   }
 
   @TestOnly
@@ -124,22 +127,6 @@ public class CompactionScheduler {
     scheduleCompaction(tsFileManager, timePartition, new 
CompactionScheduleContext());
   }
 
-  public static int scheduleInsertionCompaction(
-      TsFileManager tsFileManager,
-      long timePartition,
-      Phaser insertionTaskPhaser,
-      CompactionScheduleContext context)
-      throws InterruptedException {
-    if (!tsFileManager.isAllowCompaction()) {
-      return 0;
-    }
-    int trySubmitCount = 0;
-    trySubmitCount +=
-        tryToSubmitInsertionCompactionTask(
-            tsFileManager, timePartition, insertionTaskPhaser, context);
-    return trySubmitCount;
-  }
-
   public static int tryToSubmitInnerSpaceCompactionTask(
       TsFileManager tsFileManager,
       long timePartition,
@@ -223,13 +210,31 @@ public class CompactionScheduler {
     return true;
   }
 
-  private static int tryToSubmitInsertionCompactionTask(
+  public static int scheduleInsertionCompaction(
+      TsFileManager tsFileManager, long timePartition, 
CompactionScheduleContext context)
+      throws InterruptedException {
+    int count = 0;
+    while (true) {
+      Phaser insertionTaskPhaser = new Phaser(1);
+      int selectedTaskNum =
+          tryToSubmitInsertionCompactionTask(
+              tsFileManager, timePartition, insertionTaskPhaser, context);
+      
insertionTaskPhaser.awaitAdvanceInterruptibly(insertionTaskPhaser.arrive());
+      if (selectedTaskNum <= 0) {
+        break;
+      }
+      count += selectedTaskNum;
+    }
+    return count;
+  }
+
+  public static int tryToSubmitInsertionCompactionTask(
       TsFileManager tsFileManager,
       long timePartition,
       Phaser insertionTaskPhaser,
       CompactionScheduleContext context)
       throws InterruptedException {
-    if (!config.isEnableCrossSpaceCompaction()) {
+    if (!tsFileManager.isAllowCompaction() || 
!config.isEnableCrossSpaceCompaction()) {
       return 0;
     }
     String logicalStorageGroupName = tsFileManager.getStorageGroupName();
@@ -351,6 +356,7 @@ public class CompactionScheduler {
         trySubmitCount++;
       }
     }
+    context.incrementSubmitTaskNum(CompactionTaskType.SETTLE, trySubmitCount);
     return trySubmitCount;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
index ef1fe4e0208..804d441d2f2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
@@ -69,7 +69,8 @@ public class RewriteCrossSpaceCompactionSelector implements 
ICrossSpaceSelector
   protected TsFileManager tsFileManager;
 
   private static boolean hasPrintedLog = false;
-  private static int maxDeserializedFileNumToCheckInsertionCandidateValid = 
500;
+  private static int maxDeserializedFileNumToCheckInsertionCandidateValid = 
100;
+  private static int maxFileNumToSelectInsertionTaskInOnePartition = 200;
 
   private final long memoryBudget;
   private final int maxCrossCompactionFileNum;
@@ -166,6 +167,14 @@ public class RewriteCrossSpaceCompactionSelector 
implements ICrossSpaceSelector
           "Selecting insertion cross compaction task resources from {} 
seqFile, {} unseqFiles",
           candidate.getSeqFiles().size(),
           candidate.getUnseqFiles().size());
+      boolean delaySelection =
+          candidate.getSeqFiles().size() + candidate.getUnseqFiles().size()
+              > maxFileNumToSelectInsertionTaskInOnePartition;
+      if (delaySelection) {
+        context.delayInsertionSelection(timePartition);
+        return new InsertionCrossCompactionTaskResource();
+      }
+
       InsertionCrossCompactionTaskResource result =
           
insertionCrossSpaceCompactionSelector.executeInsertionCrossSpaceCompactionTaskSelection();
       if (result.isValid()) {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionSelectorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionSelectorTest.java
index 6a86b3955f9..2cb60ed2256 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionSelectorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionSelectorTest.java
@@ -146,7 +146,7 @@ public class InsertionCrossSpaceCompactionSelectorTest 
extends AbstractCompactio
 
     Phaser phaser = new Phaser(1);
     int submitTaskNum =
-        CompactionScheduler.scheduleInsertionCompaction(tsFileManager, 0, 
phaser, context);
+        CompactionScheduler.tryToSubmitInsertionCompactionTask(tsFileManager, 
0, phaser, context);
     Assert.assertEquals(1, submitTaskNum);
     // perform insertion compaction
     phaser.awaitAdvanceInterruptibly(phaser.arrive());
@@ -169,7 +169,7 @@ public class InsertionCrossSpaceCompactionSelectorTest 
extends AbstractCompactio
     // unseq resource2 d2[10, 20]
 
     submitTaskNum =
-        CompactionScheduler.scheduleInsertionCompaction(tsFileManager, 0, 
phaser, context);
+        CompactionScheduler.tryToSubmitInsertionCompactionTask(tsFileManager, 
0, phaser, context);
     Assert.assertEquals(0, submitTaskNum);
     Assert.assertTrue(
         
TsFileResourceUtils.validateTsFileResourcesHasNoOverlap(tsFileManager.getTsFileList(true)));
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionTest.java
index ddcb90f3ac8..0e0516a46c0 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionTest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.storageengine.dataregion.compaction.cross;
 
 import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.service.metrics.FileMetrics;
@@ -452,7 +453,7 @@ public class InsertionCrossSpaceCompactionTest extends 
AbstractCompactionTest {
   }
 
   @Test
-  public void testInsertionCompactionScheduleWithMultiTimePartitions()
+  public void testInsertionCompactionScheduleWithMultiTimePartitions1()
       throws IOException, InterruptedException {
     TsFileResource unseqResource1 =
         generateSingleNonAlignedSeriesFileWithDevices(
@@ -504,6 +505,60 @@ public class InsertionCrossSpaceCompactionTest extends 
AbstractCompactionTest {
         TsFileResourceManager.getInstance().getPriorityQueueSize());
   }
 
+  @Test
+  public void testInsertionCompactionScheduleWithMultiTimePartitions2()
+      throws IOException, InterruptedException {
+    IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+    int innerCompactionCandidateFileNum = 
config.getInnerCompactionCandidateFileNum();
+    config.setInnerCompactionCandidateFileNum(2);
+    try {
+      TsFileResource unseqResource1 =
+          generateSingleNonAlignedSeriesFileWithDevices(
+              "2-2-0-0.tsfile", new String[] {"d1"}, new TimeRange[] {new 
TimeRange(1, 4)}, false);
+      unseqResource1.setStatusForTest(TsFileResourceStatus.NORMAL);
+
+      TsFileResource unseqResource2 =
+          generateSingleNonAlignedSeriesFileWithDevices(
+              "3-3-0-0.tsfile", new String[] {"d1"}, new TimeRange[] {new 
TimeRange(6, 9)}, false);
+      unseqResource2.setStatusForTest(TsFileResourceStatus.NORMAL);
+      createTimePartitionDirIfNotExist(2808L);
+      TsFileResource unseqResource3 =
+          generateSingleNonAlignedSeriesFileWithDevicesWithTimePartition(
+              "4-4-0-0.tsfile",
+              new String[] {"d1"},
+              new TimeRange[] {new TimeRange(1698301490305L, 1698301490405L)},
+              2808L,
+              true);
+      TsFileResource unseqResource4 =
+          generateSingleNonAlignedSeriesFileWithDevicesWithTimePartition(
+              "5-5-0-0.tsfile",
+              new String[] {"d1"},
+              new TimeRange[] {new TimeRange(1698301490306L, 1698301490406L)},
+              2808L,
+              true);
+      unseqResource3.setStatusForTest(TsFileResourceStatus.NORMAL);
+      unseqResources.add(unseqResource1);
+      unseqResources.add(unseqResource2);
+      seqResources.add(unseqResource3);
+      seqResources.add(unseqResource4);
+
+      DataRegionForCompactionTest dataRegion = createDataRegion();
+      TsFileManager tsFileManager = dataRegion.getTsFileManager();
+      
TsFileResourceManager.getInstance().registerSealedTsFileResource(unseqResource1);
+      
TsFileResourceManager.getInstance().registerSealedTsFileResource(unseqResource2);
+      
TsFileResourceManager.getInstance().registerSealedTsFileResource(unseqResource3);
+      
TsFileResourceManager.getInstance().registerSealedTsFileResource(unseqResource4);
+      
tsFileManager.getOrCreateUnsequenceListByTimePartition(0).keepOrderInsert(unseqResource1);
+      
tsFileManager.getOrCreateUnsequenceListByTimePartition(0).keepOrderInsert(unseqResource2);
+      
tsFileManager.getOrCreateSequenceListByTimePartition(2808).keepOrderInsert(unseqResource3);
+      
tsFileManager.getOrCreateSequenceListByTimePartition(2808).keepOrderInsert(unseqResource4);
+      // 2 insertion task + 1 inner task
+      Assert.assertEquals(3, dataRegion.executeCompaction());
+    } finally {
+      
config.setInnerCompactionCandidateFileNum(innerCompactionCandidateFileNum);
+    }
+  }
+
   @Test
   public void testInsertionCompactionUpdateFileMetrics() throws IOException {
     TsFileResource unseqResource1 =
@@ -606,9 +661,10 @@ public class InsertionCrossSpaceCompactionTest extends 
AbstractCompactionTest {
     }
 
     public int executeInsertionCompaction() throws InterruptedException {
-      return super.executeInsertionCompaction(
-          new ArrayList<>(this.getTsFileManager().getTimePartitions()),
-          new CompactionScheduleContext());
+      CompactionScheduleContext context = new CompactionScheduleContext();
+      super.executeInsertionCompaction(
+          new ArrayList<>(this.getTsFileManager().getTimePartitions()), 
context);
+      return context.getSubmitCompactionTaskNum();
     }
   }
 }

Reply via email to