This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 503776b3c88 delay estimate memory of InnerSpaceCompactionTask (#12314)
503776b3c88 is described below

commit 503776b3c8839e5e01b106203f0c2b09b6f76be9
Author: shuwenwei <[email protected]>
AuthorDate: Wed Apr 10 20:36:26 2024 +0800

    delay estimate memory of InnerSpaceCompactionTask (#12314)
---
 .../compaction/schedule/CompactionScheduler.java   |  6 ---
 .../compaction/schedule/CompactionTaskQueue.java   | 52 ++++++++++------------
 2 files changed, 24 insertions(+), 34 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
index 707f251ded7..6ebb5578670 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
@@ -200,12 +200,6 @@ public class CompactionScheduler {
           "Compaction task start check failed because disk free ratio is less 
than disk_space_warning_threshold");
       return false;
     }
-    // check task memory cost
-    long allocatedTotalCompactionMemory = 
SystemInfo.getInstance().getMemorySizeForCompaction();
-    long estimatedTaskMemoryCost = task.getEstimatedMemoryCost();
-    if (estimatedTaskMemoryCost < 0 || estimatedTaskMemoryCost > 
allocatedTotalCompactionMemory) {
-      return false;
-    }
     return true;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java
index 66a6900afb1..9f0a5dbbc7d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java
@@ -36,48 +36,44 @@ public class CompactionTaskQueue extends FixedPriorityBlockingQueue<AbstractComp
   public AbstractCompactionTask take() throws InterruptedException {
     final ReentrantLock lock = this.lock;
     while (true) {
+      AbstractCompactionTask task = null;
       lock.lockInterruptibly();
       try {
         while (queue.isEmpty()) {
           notEmpty.await();
         }
-        AbstractCompactionTask task = tryPollExecutableTask();
-        // task == null indicates that there is no runnable task now
-        if (task != null) {
-          return task;
-        }
+        task = queue.pollFirst();
       } finally {
         lock.unlock();
       }
-      Thread.sleep(TimeUnit.SECONDS.toMillis(1));
-    }
-  }
-
-  private AbstractCompactionTask tryPollExecutableTask() {
-    while (true) {
-      if (queue.isEmpty()) {
-        return null;
-      }
-      AbstractCompactionTask task = queue.pollFirst();
-      if (task == null) {
-        continue;
-      }
-      if (!checkTaskValid(task)) {
-        dropCompactionTask(task);
-        continue;
-      }
-      if (!task.tryOccupyResourcesForRunning()) {
-        queue.add(task);
-        return null;
-      }
-      if (!transitTaskFileStatus(task)) {
-        dropCompactionTask(task);
+      boolean prepareTaskSuccess = prepareTask(task);
+      if (!prepareTaskSuccess) {
+        Thread.sleep(TimeUnit.SECONDS.toMillis(1));
         continue;
       }
       return task;
     }
   }
 
+  private boolean prepareTask(AbstractCompactionTask task) throws 
InterruptedException {
+    if (task == null) {
+      return false;
+    }
+    if (!checkTaskValid(task)) {
+      dropCompactionTask(task);
+      return false;
+    }
+    if (!task.tryOccupyResourcesForRunning()) {
+      put(task);
+      return false;
+    }
+    if (!transitTaskFileStatus(task)) {
+      dropCompactionTask(task);
+      return false;
+    }
+    return true;
+  }
+
   private void dropCompactionTask(AbstractCompactionTask task) {
     task.resetCompactionCandidateStatusForAllSourceFiles();
     task.handleTaskCleanup();

Reply via email to