This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch avoid_stack_overflow_0.11 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8638e9db23f5a169c9bf70ef6ac5c470fb9a986e Author: qiaojialin <[email protected]> AuthorDate: Thu Jun 10 15:00:18 2021 +0800 avoid stack overflow risk in recover --- .../compaction/CompactionMergeTaskPoolManager.java | 7 ++++++- .../iotdb/db/engine/compaction/TsFileManagement.java | 6 +----- .../db/engine/storagegroup/StorageGroupProcessor.java | 19 +++---------------- 3 files changed, 10 insertions(+), 22 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java index f949cfa..19f2ac7 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java @@ -25,6 +25,7 @@ import java.io.File; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.db.concurrent.ThreadName; @@ -46,7 +47,7 @@ public class CompactionMergeTaskPoolManager implements IService { private static final Logger logger = LoggerFactory .getLogger(CompactionMergeTaskPoolManager.class); private static final CompactionMergeTaskPoolManager INSTANCE = new CompactionMergeTaskPoolManager(); - private ExecutorService pool; + private ScheduledExecutorService pool; private static ConcurrentHashMap<String, Boolean> sgCompactionStatus = new ConcurrentHashMap<>(); public static CompactionMergeTaskPoolManager getInstance() { @@ -147,6 +148,10 @@ public class CompactionMergeTaskPoolManager implements IService { sgCompactionStatus.put(storageGroupName, false); } + public void init(Runnable function) { + pool.scheduleWithFixedDelay(function, 1, 1, 
TimeUnit.SECONDS); + } + public synchronized void submitTask(StorageGroupCompactionTask storageGroupCompactionTask) throws RejectedExecutionException { if (pool != null && !pool.isTerminated()) { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java index 2625b92..953de2a 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java @@ -154,18 +154,14 @@ public abstract class TsFileManagement { public class CompactionOnePartitionUtil { - private CloseCompactionMergeCallBack closeCompactionMergeCallBack; private long timePartitionId; - public CompactionOnePartitionUtil( - CloseCompactionMergeCallBack closeCompactionMergeCallBack, long timePartitionId) { - this.closeCompactionMergeCallBack = closeCompactionMergeCallBack; + public CompactionOnePartitionUtil(long timePartitionId) { this.timePartitionId = timePartitionId; } public void run() { merge(timePartitionId); - closeCompactionMergeCallBack.call(isMergeExecutedInCurrentTask, timePartitionId); } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 9f0cf36..9d7357f 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -391,13 +391,12 @@ public class StorageGroupProcessor { private void recoverCompaction() { if (!CompactionMergeTaskPoolManager.getInstance().isTerminated()) { - logger.info("{} submit a compaction merge task", storageGroupName); try { CompactionMergeTaskPoolManager.getInstance() .submitTask( tsFileManagement.new CompactionRecoverTask(this::closeCompactionRecoverCallBack)); + 
logger.info("{} submit a compaction merge task", storageGroupName); } catch (RejectedExecutionException e) { - this.closeCompactionRecoverCallBack(false, 0); logger.error("{} compaction submit task failed", storageGroupName); } } else { @@ -1884,11 +1883,8 @@ public class StorageGroupProcessor { // fork and filter current tsfile, then commit then to compaction merge tsFileManagement.forkCurrentFileList(timePartition); tsFileManagement.setForceFullMerge(fullMerge); - tsFileManagement - .new CompactionOnePartitionUtil(this::closeCompactionMergeCallBack, timePartition) - .run(); + tsFileManagement.new CompactionOnePartitionUtil(timePartition).run(); } catch (IOException e) { - this.closeCompactionMergeCallBack(false, timePartition); logger.error("{} compaction submit task failed", storageGroupName); } } @@ -1898,16 +1894,7 @@ public class StorageGroupProcessor { CompactionMergeTaskPoolManager.getInstance().clearCompactionStatus(storageGroupName); if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) { logger.info("{} recover finished, submit continuous compaction task", storageGroupName); - CompactionMergeTaskPoolManager.getInstance() - .submitTask(new CompactionAllPartitionTask(storageGroupName)); - } - } - - /** close compaction merge callback, to release some locks */ - private void closeCompactionMergeCallBack(boolean isMerge, long timePartitionId) { - if (isMerge && IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) { - syncCompactOnePartition( - timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge()); + CompactionMergeTaskPoolManager.getInstance().init(this::merge); } }
