This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fc2ff7d80a4 Fix compaction worker stopped after drop database (#12357)
fc2ff7d80a4 is described below

commit fc2ff7d80a469dd07e6a5f4c5dce6b27cef85c77
Author: shuwenwei <[email protected]>
AuthorDate: Thu Apr 18 09:57:28 2024 +0800

    Fix compaction worker stopped after drop database (#12357)
    
    * fix compaction worker stopped after drop database
    
    * fix restart
---
 .../compaction/schedule/CompactionTaskManager.java           |  9 +++++++++
 .../dataregion/compaction/schedule/CompactionWorker.java     | 12 ++++++++++--
 2 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
index 7fa2e1158aa..aa3073bdce7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
@@ -69,6 +69,7 @@ public class CompactionTaskManager implements IService {
   // The thread pool that executes the compaction task. The default number of threads for this pool
   // is 10.
   private WrappedThreadPoolExecutor taskExecutionPool;
+  private volatile boolean stopAllCompactionWorker = false;
 
   // The thread pool that executes the sub compaction task.
   private WrappedThreadPoolExecutor subCompactionTaskExecutionPool;
@@ -90,6 +91,10 @@ public class CompactionTaskManager implements IService {
     return INSTANCE;
   }
 
+  public boolean isStopAllCompactionWorker() {
+    return stopAllCompactionWorker;
+  }
+
   @Override
   public synchronized void start() {
     if (taskExecutionPool == null
@@ -125,6 +130,7 @@ public class CompactionTaskManager implements IService {
 
   @Override
   public void stop() {
+    stopAllCompactionWorker = true;
     if (taskExecutionPool != null) {
       subCompactionTaskExecutionPool.shutdownNow();
       taskExecutionPool.shutdownNow();
@@ -137,6 +143,7 @@ public class CompactionTaskManager implements IService {
 
   @Override
   public void waitAndStop(long milliseconds) {
+    stopAllCompactionWorker = true;
     if (taskExecutionPool != null) {
       awaitTermination(subCompactionTaskExecutionPool, milliseconds);
       awaitTermination(taskExecutionPool, milliseconds);
@@ -412,6 +419,7 @@ public class CompactionTaskManager implements IService {
   }
 
   public void restart() throws InterruptedException {
+    stopAllCompactionWorker = true;
     if (IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount() > 0) {
       if (subCompactionTaskExecutionPool != null) {
         this.subCompactionTaskExecutionPool.shutdownNow();
@@ -438,6 +446,7 @@ public class CompactionTaskManager implements IService {
       init = true;
     }
     init = true;
+    stopAllCompactionWorker = false;
     logger.info("Compaction task manager started.");
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java
index 903c16a5898..455f375962d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java
@@ -50,14 +50,22 @@ public class CompactionWorker implements Runnable {
   @SuppressWarnings("squid:S2142")
   @Override
   public void run() {
-    while (!Thread.currentThread().isInterrupted()) {
+    while (true) {
+      if (Thread.currentThread().isInterrupted()) {
+        // If the interrupt is caused by `drop database`, clear the status
+        if (!CompactionTaskManager.getInstance().isStopAllCompactionWorker()) {
+          Thread.interrupted();
+          continue;
+        }
+        return;
+      }
       AbstractCompactionTask task;
       try {
         task = compactionTaskQueue.take();
       } catch (InterruptedException e) {
         LOGGER.warn("CompactionThread-{} terminates because interruption", threadId);
         Thread.currentThread().interrupt();
-        return;
+        continue;
       }
       processOneCompactionTask(task);
     }

Reply via email to