This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.11 by this push:
     new f57782a  [IOTDB-1419][To rel/0.11] avoid stack overflow risk in 
recover (#3384)
f57782a is described below

commit f57782a0cfdfc59be252b8f671f35d5f227b7039
Author: Jialin Qiao <[email protected]>
AuthorDate: Thu Jun 10 02:46:54 2021 -0500

    [IOTDB-1419][To rel/0.11] avoid stack overflow risk in recover (#3384)
---
 .../engine/compaction/CompactionMergeTaskPoolManager.java |  7 ++++++-
 .../iotdb/db/engine/compaction/TsFileManagement.java      |  6 ++++--
 .../db/engine/storagegroup/StorageGroupProcessor.java     | 15 ++++-----------
 3 files changed, 14 insertions(+), 14 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index f949cfa..19f2ac7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -25,6 +25,7 @@ import java.io.File;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
@@ -46,7 +47,7 @@ public class CompactionMergeTaskPoolManager implements 
IService {
   private static final Logger logger = LoggerFactory
       .getLogger(CompactionMergeTaskPoolManager.class);
   private static final CompactionMergeTaskPoolManager INSTANCE = new 
CompactionMergeTaskPoolManager();
-  private ExecutorService pool;
+  private ScheduledExecutorService pool;
 
   private static ConcurrentHashMap<String, Boolean> sgCompactionStatus = new 
ConcurrentHashMap<>();
   public static CompactionMergeTaskPoolManager getInstance() {
@@ -147,6 +148,10 @@ public class CompactionMergeTaskPoolManager implements 
IService {
     sgCompactionStatus.put(storageGroupName, false);
   }
 
+  public void init(Runnable function) {
+    pool.scheduleWithFixedDelay(function, 1, 1, TimeUnit.SECONDS);
+  }
+
   public synchronized void submitTask(StorageGroupCompactionTask 
storageGroupCompactionTask)
       throws RejectedExecutionException {
     if (pool != null && !pool.isTerminated()) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 2625b92..da92f4f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -154,11 +154,13 @@ public abstract class TsFileManagement {
 
   public class CompactionOnePartitionUtil {
 
-    private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
     private long timePartitionId;
 
+    private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
+
     public CompactionOnePartitionUtil(
-        CloseCompactionMergeCallBack closeCompactionMergeCallBack, long 
timePartitionId) {
+        CloseCompactionMergeCallBack closeCompactionMergeCallBack,
+        long timePartitionId) {
       this.closeCompactionMergeCallBack = closeCompactionMergeCallBack;
       this.timePartitionId = timePartitionId;
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 9f0cf36..6b70bf5 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -391,13 +391,12 @@ public class StorageGroupProcessor {
 
   private void recoverCompaction() {
     if (!CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
-      logger.info("{} submit a compaction merge task", storageGroupName);
       try {
         CompactionMergeTaskPoolManager.getInstance()
             .submitTask(
                 tsFileManagement.new 
CompactionRecoverTask(this::closeCompactionRecoverCallBack));
+        logger.info("{} submit a compaction merge task", storageGroupName);
       } catch (RejectedExecutionException e) {
-        this.closeCompactionRecoverCallBack(false, 0);
         logger.error("{} compaction submit task failed", storageGroupName);
       }
     } else {
@@ -1888,7 +1887,6 @@ public class StorageGroupProcessor {
           .new CompactionOnePartitionUtil(this::closeCompactionMergeCallBack, 
timePartition)
           .run();
     } catch (IOException e) {
-      this.closeCompactionMergeCallBack(false, timePartition);
       logger.error("{} compaction submit task failed", storageGroupName);
     }
   }
@@ -1898,18 +1896,13 @@ public class StorageGroupProcessor {
     
CompactionMergeTaskPoolManager.getInstance().clearCompactionStatus(storageGroupName);
     if 
(IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
       logger.info("{} recover finished, submit continuous compaction task", 
storageGroupName);
-      CompactionMergeTaskPoolManager.getInstance()
-          .submitTask(new CompactionAllPartitionTask(storageGroupName));
+      CompactionMergeTaskPoolManager.getInstance().init(this::merge);
     }
   }
 
   /** close compaction merge callback, to release some locks */
-  private void closeCompactionMergeCallBack(boolean isMerge, long 
timePartitionId) {
-    if (isMerge && 
IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) {
-      syncCompactOnePartition(
-          timePartitionId, 
IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
-    }
-  }
+  private void closeCompactionMergeCallBack(boolean isMerge, long 
timePartitionId) {}
+
 
   /**
    * count all Tsfiles in the storage group which need to be upgraded

Reply via email to