This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch fix-compaction-blocking in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a766abf779301423ee946082bbdcd511a0fa74aa Author: Liu Xuxin <[email protected]> AuthorDate: Sun May 7 19:04:54 2023 +0800 [To rel/1.1] [IOTDB-5844] Fix compaction module getting stuck (#9776) --- .../CompactionMemoryNotEnoughException.java | 27 ++++++++++++++++++++++ .../execute/task/CrossSpaceCompactionTask.java | 6 ++++- .../org/apache/iotdb/db/rescon/SystemInfo.java | 20 +++++++++++++++- 3 files changed, 51 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/exception/CompactionMemoryNotEnoughException.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/exception/CompactionMemoryNotEnoughException.java new file mode 100644 index 0000000000..bd96afb613 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/exception/CompactionMemoryNotEnoughException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.engine.compaction.execute.exception; + +public class CompactionMemoryNotEnoughException extends Exception { + + public CompactionMemoryNotEnoughException(String message) { + super(message); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java index 69f2de7f8c..1eddd86077 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CrossSpaceCompactionTask.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.TsFileMetricManager; import org.apache.iotdb.db.engine.compaction.execute.exception.CompactionExceptionHandler; +import org.apache.iotdb.db.engine.compaction.execute.exception.CompactionMemoryNotEnoughException; import org.apache.iotdb.db.engine.compaction.execute.performer.ICrossCompactionPerformer; import org.apache.iotdb.db.engine.compaction.execute.performer.impl.FastCompactionPerformer; import org.apache.iotdb.db.engine.compaction.execute.task.subtask.FastCompactionTaskSummary; @@ -98,10 +99,13 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask { @Override public boolean doCompaction() { try { - SystemInfo.getInstance().addCompactionMemoryCost(memoryCost); + SystemInfo.getInstance().addCompactionMemoryCost(memoryCost, 60); } catch (InterruptedException e) { LOGGER.error("Interrupted when allocating memory for compaction", e); return false; + } catch (CompactionMemoryNotEnoughException e) { + LOGGER.error("No enough memory for current compaction task {}", this, e); + return false; } boolean isSuccess = true; try { diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java 
b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java index d85531a509..c77825ddd6 100644 --- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java +++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.compaction.execute.exception.CompactionMemoryNotEnoughException; import org.apache.iotdb.db.engine.flush.FlushManager; import org.apache.iotdb.db.engine.storagegroup.DataRegionInfo; import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor; @@ -181,13 +182,30 @@ public class SystemInfo { this.flushingMemTablesCost -= flushingMemTableCost; } - public void addCompactionMemoryCost(long memoryCost) throws InterruptedException { + public void addCompactionMemoryCost(long memoryCost, long timeOutInSecond) + throws InterruptedException, CompactionMemoryNotEnoughException { if (!config.isEnableCompactionMemControl()) { return; } + if (memoryCost > memorySizeForCompaction) { + // required memory cost is greater than the total memory budget for compaction + throw new CompactionMemoryNotEnoughException( + String.format( + "Required memory cost %d bytes is greater than " + + "the total memory budget for compaction %d bytes", + memoryCost, memorySizeForCompaction)); + } + long startTime = System.currentTimeMillis(); long originSize = this.compactionMemoryCost.get(); while (originSize + memoryCost > memorySizeForCompaction || !compactionMemoryCost.compareAndSet(originSize, originSize + memoryCost)) { + if (System.currentTimeMillis() - startTime >= timeOutInSecond * 1000L) { + throw new CompactionMemoryNotEnoughException( + String.format( + "Failed to allocate %d bytes memory for compaction after %d seconds, " + + "total memory budget for compaction module is %d bytes, %d bytes is used", + 
memoryCost, timeOutInSecond, memorySizeForCompaction, originSize)); + } Thread.sleep(100); originSize = this.compactionMemoryCost.get(); }
