This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.13 by this push:
     new 1a18dbb807 [To rel/0.13][IOTDB-5183]Fix CI OOM (#8412)
1a18dbb807 is described below

commit 1a18dbb807d7bab64c63a8d666b5812ed96881a5
Author: 周沛辰 <[email protected]>
AuthorDate: Wed Dec 14 20:53:36 2022 +0800

    [To rel/0.13][IOTDB-5183]Fix CI OOM (#8412)
---
 .../iotdb/db/engine/compaction/CompactionTaskManager.java    | 12 +++++++++++-
 .../db/engine/compaction/cross/CrossSpaceCompactionTest.java |  2 ++
 .../db/engine/compaction/inner/InnerSeqCompactionTest.java   |  5 +++++
 .../db/engine/compaction/inner/InnerUnseqCompactionTest.java |  5 +++++
 4 files changed, 23 insertions(+), 1 deletion(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index 070f0224d1..ddac1512ec 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -134,6 +134,7 @@ public class CompactionTaskManager implements IService {
   @Override
   public void stop() {
     if (taskExecutionPool != null) {
+      subCompactionTaskExecutionPool.shutdownNow();
       taskExecutionPool.shutdownNow();
       compactionTaskSubmissionThreadPool.shutdownNow();
       logger.info("Waiting for task taskExecutionPool to shut down");
@@ -146,6 +147,7 @@ public class CompactionTaskManager implements IService {
   @Override
   public void waitAndStop(long milliseconds) {
     if (taskExecutionPool != null) {
+      awaitTermination(subCompactionTaskExecutionPool, milliseconds);
       awaitTermination(taskExecutionPool, milliseconds);
       awaitTermination(compactionTaskSubmissionThreadPool, milliseconds);
       logger.info("Waiting for task taskExecutionPool to shut down in {} ms", 
milliseconds);
@@ -184,7 +186,7 @@ public class CompactionTaskManager implements IService {
 
   private void waitTermination() {
     long startTime = System.currentTimeMillis();
-    while (!taskExecutionPool.isTerminated()) {
+    while (!subCompactionTaskExecutionPool.isTerminated() || 
!taskExecutionPool.isTerminated()) {
       int timeMillis = 0;
       try {
         Thread.sleep(200);
@@ -202,6 +204,7 @@ public class CompactionTaskManager implements IService {
       }
     }
     taskExecutionPool = null;
+    subCompactionTaskExecutionPool = null;
     storageGroupTasks.clear();
     logger.info("CompactionManager stopped");
   }
@@ -369,6 +372,8 @@ public class CompactionTaskManager implements IService {
   public void restart() throws InterruptedException {
     if 
(IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread() > 0) 
{
       if (taskExecutionPool != null) {
+        subCompactionTaskExecutionPool.shutdownNow();
+        subCompactionTaskExecutionPool.awaitTermination(Long.MAX_VALUE, 
TimeUnit.MILLISECONDS);
         this.taskExecutionPool.shutdownNow();
         this.taskExecutionPool.awaitTermination(Long.MAX_VALUE, 
TimeUnit.MILLISECONDS);
       }
@@ -377,6 +382,11 @@ public class CompactionTaskManager implements IService {
               IoTDBThreadPoolFactory.newScheduledThreadPool(
                   
IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread(),
                   ThreadName.COMPACTION_SERVICE.getName());
+      this.subCompactionTaskExecutionPool =
+          IoTDBThreadPoolFactory.newScheduledThreadPool(
+              
IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread()
+                  * 
IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum(),
+              ThreadName.COMPACTION_SUB_SERVICE.getName());
       this.compactionTaskSubmissionThreadPool =
           IoTDBThreadPoolFactory.newScheduledThreadPool(1, 
ThreadName.COMPACTION_SERVICE.getName());
       candidateCompactionTaskQueue.regsitPollLastHook(
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java
index eadfd6b71e..2e0905ff39 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java
@@ -110,6 +110,7 @@ public class CrossSpaceCompactionTest {
           Collections.emptyMap());
     }
     CompactionTaskManager.getInstance().start();
+    IoTDB.activated = true;
     Thread.currentThread().setName("pool-1-IoTDB-Compaction-1");
   }
 
@@ -121,6 +122,7 @@ public class CrossSpaceCompactionTest {
     TimeSeriesMetadataCache.getInstance().clear();
     IoTDB.metaManager.clear();
     CompactionTaskManager.getInstance().stop();
+    IoTDB.activated = false;
     EnvironmentUtils.cleanAllDir();
     Thread.currentThread().setName(oldThreadName);
     new CompactionConfigRestorer().restoreCompactionConfig();
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java
index d7703a1580..03142c891d 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.engine.compaction.inner;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import 
org.apache.iotdb.db.engine.compaction.inner.utils.InnerSpaceCompactionUtils;
 import org.apache.iotdb.db.engine.compaction.utils.CompactionCheckerUtils;
 import org.apache.iotdb.db.engine.compaction.utils.CompactionClearUtils;
@@ -98,10 +99,14 @@ public class InnerSeqCompactionTest {
           TSFileDescriptor.getInstance().getConfig().getCompressor(),
           Collections.emptyMap());
     }
+    CompactionTaskManager.getInstance().start();
+    IoTDB.activated = true;
   }
 
   @After
   public void tearDown() throws IOException, StorageEngineException {
+    IoTDB.activated = false;
+    CompactionTaskManager.getInstance().stop();
     new CompactionConfigRestorer().restoreCompactionConfig();
     CompactionClearUtils.clearAllCompactionFiles();
     ChunkCache.getInstance().clear();
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java
index 14707e6b96..a3ce39baa9 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine.compaction.inner;
 
 import org.apache.iotdb.db.engine.cache.ChunkCache;
 import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
 import org.apache.iotdb.db.engine.compaction.CompactionUtils;
 import 
org.apache.iotdb.db.engine.compaction.inner.utils.InnerSpaceCompactionUtils;
 import org.apache.iotdb.db.engine.compaction.utils.CompactionCheckerUtils;
@@ -110,10 +111,14 @@ public class InnerUnseqCompactionTest {
           Collections.emptyMap());
     }
     Thread.currentThread().setName("pool-1-IoTDB-Compaction-1");
+    CompactionTaskManager.getInstance().start();
+    IoTDB.activated = true;
   }
 
   @After
   public void tearDown() throws IOException, StorageEngineException {
+    IoTDB.activated = false;
+    CompactionTaskManager.getInstance().stop();
     new CompactionConfigRestorer().restoreCompactionConfig();
     CompactionClearUtils.clearAllCompactionFiles();
     ChunkCache.getInstance().clear();

Reply via email to