This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 37e9ee0ee99 Fix compaction schedule task manager cannot stop and modify some output in log (#12125)
37e9ee0ee99 is described below

commit 37e9ee0ee99ca95890d76629640bb140b1ecd384
Author: shuwenwei <[email protected]>
AuthorDate: Fri Mar 8 09:31:01 2024 +0800

    Fix compaction schedule task manager cannot stop and modify some output in log (#12125)
---
 .../thrift/impl/DataNodeInternalRPCServiceImpl.java       | 12 ++++++++++--
 .../config/executor/ClusterConfigTaskExecutor.java        | 15 ++++++++++++---
 .../iotdb/db/storageengine/dataregion/DataRegion.java     |  2 +-
 .../compaction/execute/task/InnerSpaceCompactionTask.java |  5 +++++
 .../execute/task/InsertionCrossSpaceCompactionTask.java   |  2 +-
 .../compaction/repair/RepairTimePartitionScanTask.java    |  7 +++----
 .../schedule/CompactionScheduleTaskManager.java           | 13 ++++++++-----
 .../impl/RewriteCrossSpaceCompactionSelector.java         |  5 +++++
 .../cross/InsertionCrossSpaceCompactionSelectorTest.java  |  8 ++++----
 9 files changed, 49 insertions(+), 20 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 42500a83ba9..a0dab7b8e9a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -122,6 +122,8 @@ import 
org.apache.iotdb.db.schemaengine.template.TemplateInternalRPCUpdateType;
 import org.apache.iotdb.db.service.DataNode;
 import org.apache.iotdb.db.service.RegionMigrateService;
 import org.apache.iotdb.db.storageengine.StorageEngine;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.repair.RepairTaskStatus;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.settle.SettleRequestHandler;
 import 
org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager;
 import 
org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeThrottleQuotaManager;
@@ -1336,8 +1338,14 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
       if (storageEngine.repairData()) {
         return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
       } else {
-        return RpcUtils.getStatus(
-            TSStatusCode.EXECUTE_STATEMENT_ERROR, "already have a running 
repair task");
+        if 
(CompactionScheduleTaskManager.getRepairTaskManagerInstance().getRepairTaskStatus()
+            == RepairTaskStatus.STOPPING) {
+          return RpcUtils.getStatus(
+              TSStatusCode.EXECUTE_STATEMENT_ERROR, "previous repair task is 
still stopping");
+        } else {
+          return RpcUtils.getStatus(
+              TSStatusCode.EXECUTE_STATEMENT_ERROR, "already have a running 
repair task");
+        }
       }
     } catch (StorageEngineException e) {
       return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, 
e.getMessage());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index a98c7aeff10..1c0d10cdc21 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -192,6 +192,8 @@ import 
org.apache.iotdb.db.schemaengine.template.TemplateAlterOperationType;
 import 
org.apache.iotdb.db.schemaengine.template.alter.TemplateAlterOperationUtil;
 import org.apache.iotdb.db.schemaengine.template.alter.TemplateExtendInfo;
 import org.apache.iotdb.db.storageengine.StorageEngine;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.repair.RepairTaskStatus;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager;
 import org.apache.iotdb.db.trigger.service.TriggerClassLoader;
 import org.apache.iotdb.pipe.api.PipePlugin;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -1034,9 +1036,16 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
         if (StorageEngine.getInstance().repairData()) {
           tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
         } else {
-          tsStatus =
-              RpcUtils.getStatus(
-                  TSStatusCode.EXECUTE_STATEMENT_ERROR, "already have a 
running repair task");
+          if 
(CompactionScheduleTaskManager.getRepairTaskManagerInstance().getRepairTaskStatus()
+              == RepairTaskStatus.STOPPING) {
+            tsStatus =
+                RpcUtils.getStatus(
+                    TSStatusCode.EXECUTE_STATEMENT_ERROR, "previous repair 
task is still stopping");
+          } else {
+            tsStatus =
+                RpcUtils.getStatus(
+                    TSStatusCode.EXECUTE_STATEMENT_ERROR, "already have a 
running repair task");
+          }
         }
       } catch (Exception e) {
         tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, 
e.getMessage());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 2de67afec9d..20cbfef2aee 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2446,7 +2446,7 @@ public class DataRegion implements IDataRegionForQuery {
                   tsFileManager, timePartition, insertionTaskPhaser);
         }
         trySubmitCount += currentSubmitCount;
-        insertionTaskPhaser.arriveAndAwaitAdvance();
+        
insertionTaskPhaser.awaitAdvanceInterruptibly(insertionTaskPhaser.arrive());
         if (currentSubmitCount != 0) {
           continue;
         }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
index 5219ab6bbf1..d5d709a4df5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.tsfile.utils.TsFileUtils;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -486,6 +487,10 @@ public class InnerSpaceCompactionTask extends 
AbstractCompactionTask {
       try {
         memoryCost = 
innerSpaceEstimator.estimateInnerCompactionMemory(selectedTsFileResourceList);
       } catch (IOException e) {
+        if (e instanceof ClosedByInterruptException || Thread.interrupted()) {
+          Thread.currentThread().interrupt();
+          return -1;
+        }
         innerSpaceEstimator.cleanup();
         LOGGER.error("Meet error when estimate inner compaction memory", e);
         return -1;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java
index 2c4baf23c9c..6f262a79af6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java
@@ -105,7 +105,7 @@ public class InsertionCrossSpaceCompactionTask extends 
AbstractCompactionTask {
 
   @Override
   public void handleTaskCleanup() {
-    if (phaser != null && phaser.getRegisteredParties() > 0) {
+    if (phaser != null) {
       phaser.arrive();
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartitionScanTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartitionScanTask.java
index ee86f6f10b5..5bdb967d1b1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartitionScanTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartitionScanTask.java
@@ -99,7 +99,7 @@ public class RepairTimePartitionScanTask implements 
Callable<Void> {
         scanUtil.scanTsFile();
         checkTaskStatusAndMayStop();
         if (scanUtil.isBrokenFile()) {
-          LOGGER.warn("[RepairScheduler] file {} is skipped because it is 
broken", sourceFile);
+          LOGGER.warn("[RepairScheduler] {} is skipped because it is broken", 
sourceFile);
           sourceFile.setTsFileRepairStatus(TsFileRepairStatus.CAN_NOT_REPAIR);
           latch.countDown();
           continue;
@@ -112,8 +112,7 @@ public class RepairTimePartitionScanTask implements 
Callable<Void> {
         sourceFile.readUnlock();
       }
       LOGGER.info(
-          "[RepairScheduler] file {} need to repair because it has internal 
unsorted data",
-          sourceFile);
+          "[RepairScheduler] {} need to repair because it has internal 
unsorted data", sourceFile);
       TsFileManager tsFileManager = timePartition.getTsFileManager();
       RepairUnsortedFileCompactionTask task =
           new RepairUnsortedFileCompactionTask(
@@ -150,7 +149,7 @@ public class RepairTimePartitionScanTask implements 
Callable<Void> {
               false,
               tsFileManager.getNextCompactionTaskId());
       LOGGER.info(
-          "[RepairScheduler] file {} need to repair because it is overlapped 
with other files",
+          "[RepairScheduler] {} need to repair because it is overlapped with 
other files",
           overlapFile);
       if (submitRepairFileTaskSafely(task)) {
         latch.await();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java
index bc8a0bf8dd4..f449cb72ad0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java
@@ -150,13 +150,15 @@ public class CompactionScheduleTaskManager implements 
IService {
       }
       try {
         compactionScheduleTaskThreadPool.shutdownNow();
-        compactionScheduleTaskThreadPool.awaitTermination(milliseconds, 
TimeUnit.MILLISECONDS);
+        if (!compactionScheduleTaskThreadPool.awaitTermination(
+            milliseconds, TimeUnit.MILLISECONDS)) {
+          throw new InterruptedException();
+        }
       } catch (InterruptedException e) {
         logger.warn(
             "compaction schedule task thread pool can not be closed in {} ms", 
milliseconds);
         Thread.currentThread().interrupt();
       }
-      waitForThreadPoolTerminated();
     } finally {
       lock.unlock();
     }
@@ -289,10 +291,11 @@ public class CompactionScheduleTaskManager implements 
IService {
 
     public Future<Void> submitRepairScanTask(RepairTimePartitionScanTask 
scanTask) {
       lock.lock();
-      if (repairTaskStatus.get() != RepairTaskStatus.RUNNING) {
-        return null;
-      }
       try {
+        if (repairTaskStatus.get() != RepairTaskStatus.RUNNING) {
+          logger.info("[RepairTaskManager] skip current task because repair 
task is stopping");
+          return null;
+        }
         Future<Void> future = 
compactionScheduleTaskThreadPool.submit(scanTask);
         submitRepairScanTaskFutures.add(future);
         return future;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
index e472e8ea8db..56efdfbf500 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
@@ -43,6 +43,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -128,6 +129,10 @@ public class RewriteCrossSpaceCompactionSelector 
implements ICrossSpaceSelector
 
       return executeTaskResourceSelection(candidate);
     } catch (IOException e) {
+      if (e instanceof ClosedByInterruptException || Thread.interrupted()) {
+        Thread.currentThread().interrupt();
+        return new CrossCompactionTaskResource();
+      }
       throw new MergeException(e);
     } finally {
       compactionEstimator.cleanup();
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionSelectorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionSelectorTest.java
index 66671b2265f..bce51162000 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionSelectorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionSelectorTest.java
@@ -1013,7 +1013,7 @@ public class InsertionCrossSpaceCompactionSelectorTest 
extends AbstractCompactio
       }
       InsertionCrossSpaceCompactionTask task =
           new InsertionCrossSpaceCompactionTask(
-              new Phaser(),
+              new Phaser(1),
               0,
               tsFileManager,
               taskResource,
@@ -1314,7 +1314,7 @@ public class InsertionCrossSpaceCompactionSelectorTest 
extends AbstractCompactio
       if (taskResource.isValid()) {
         InsertionCrossSpaceCompactionTask task =
             new InsertionCrossSpaceCompactionTask(
-                new Phaser(),
+                new Phaser(1),
                 0,
                 tsFileManager,
                 taskResource,
@@ -1710,7 +1710,7 @@ public class InsertionCrossSpaceCompactionSelectorTest 
extends AbstractCompactio
       }
       InsertionCrossSpaceCompactionTask task =
           new InsertionCrossSpaceCompactionTask(
-              new Phaser(),
+              new Phaser(1),
               0,
               tsFileManager,
               taskResource,
@@ -2031,7 +2031,7 @@ public class InsertionCrossSpaceCompactionSelectorTest 
extends AbstractCompactio
       if (taskResource.isValid()) {
         InsertionCrossSpaceCompactionTask task =
             new InsertionCrossSpaceCompactionTask(
-                new Phaser(),
+                new Phaser(1),
                 0,
                 tsFileManager,
                 taskResource,

Reply via email to