This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 503776b3c88 delay estimate memory of InnerSpaceCompactionTask (#12314)
503776b3c88 is described below
commit 503776b3c8839e5e01b106203f0c2b09b6f76be9
Author: shuwenwei <[email protected]>
AuthorDate: Wed Apr 10 20:36:26 2024 +0800
delay estimate memory of InnerSpaceCompactionTask (#12314)
---
.../compaction/schedule/CompactionScheduler.java | 6 ---
.../compaction/schedule/CompactionTaskQueue.java | 52 ++++++++++------------
2 files changed, 24 insertions(+), 34 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
index 707f251ded7..6ebb5578670 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
@@ -200,12 +200,6 @@ public class CompactionScheduler {
"Compaction task start check failed because disk free ratio is less than disk_space_warning_threshold");
return false;
}
- // check task memory cost
- long allocatedTotalCompactionMemory = SystemInfo.getInstance().getMemorySizeForCompaction();
- long estimatedTaskMemoryCost = task.getEstimatedMemoryCost();
- if (estimatedTaskMemoryCost < 0 || estimatedTaskMemoryCost > allocatedTotalCompactionMemory) {
- return false;
- }
return true;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java
index 66a6900afb1..9f0a5dbbc7d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java
@@ -36,48 +36,44 @@ public class CompactionTaskQueue extends FixedPriorityBlockingQueue<AbstractComp
public AbstractCompactionTask take() throws InterruptedException {
final ReentrantLock lock = this.lock;
while (true) {
+ AbstractCompactionTask task = null;
lock.lockInterruptibly();
try {
while (queue.isEmpty()) {
notEmpty.await();
}
- AbstractCompactionTask task = tryPollExecutableTask();
- // task == null indicates that there is no runnable task now
- if (task != null) {
- return task;
- }
+ task = queue.pollFirst();
} finally {
lock.unlock();
}
- Thread.sleep(TimeUnit.SECONDS.toMillis(1));
- }
- }
-
- private AbstractCompactionTask tryPollExecutableTask() {
- while (true) {
- if (queue.isEmpty()) {
- return null;
- }
- AbstractCompactionTask task = queue.pollFirst();
- if (task == null) {
- continue;
- }
- if (!checkTaskValid(task)) {
- dropCompactionTask(task);
- continue;
- }
- if (!task.tryOccupyResourcesForRunning()) {
- queue.add(task);
- return null;
- }
- if (!transitTaskFileStatus(task)) {
- dropCompactionTask(task);
+ boolean prepareTaskSuccess = prepareTask(task);
+ if (!prepareTaskSuccess) {
+ Thread.sleep(TimeUnit.SECONDS.toMillis(1));
continue;
}
return task;
}
}
+ private boolean prepareTask(AbstractCompactionTask task) throws InterruptedException {
+ if (task == null) {
+ return false;
+ }
+ if (!checkTaskValid(task)) {
+ dropCompactionTask(task);
+ return false;
+ }
+ if (!task.tryOccupyResourcesForRunning()) {
+ put(task);
+ return false;
+ }
+ if (!transitTaskFileStatus(task)) {
+ dropCompactionTask(task);
+ return false;
+ }
+ return true;
+ }
+
private void dropCompactionTask(AbstractCompactionTask task) {
task.resetCompactionCandidateStatusForAllSourceFiles();
task.handleTaskCleanup();