This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 37e9ee0ee99 Fix compaction schedule task manager being unable to stop,
and adjust some log output (#12125)
37e9ee0ee99 is described below
commit 37e9ee0ee99ca95890d76629640bb140b1ecd384
Author: shuwenwei <[email protected]>
AuthorDate: Fri Mar 8 09:31:01 2024 +0800
Fix compaction schedule task manager being unable to stop, and adjust some
log output (#12125)
---
.../thrift/impl/DataNodeInternalRPCServiceImpl.java | 12 ++++++++++--
.../config/executor/ClusterConfigTaskExecutor.java | 15 ++++++++++++---
.../iotdb/db/storageengine/dataregion/DataRegion.java | 2 +-
.../compaction/execute/task/InnerSpaceCompactionTask.java | 5 +++++
.../execute/task/InsertionCrossSpaceCompactionTask.java | 2 +-
.../compaction/repair/RepairTimePartitionScanTask.java | 7 +++----
.../schedule/CompactionScheduleTaskManager.java | 13 ++++++++-----
.../impl/RewriteCrossSpaceCompactionSelector.java | 5 +++++
.../cross/InsertionCrossSpaceCompactionSelectorTest.java | 8 ++++----
9 files changed, 49 insertions(+), 20 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 42500a83ba9..a0dab7b8e9a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -122,6 +122,8 @@ import
org.apache.iotdb.db.schemaengine.template.TemplateInternalRPCUpdateType;
import org.apache.iotdb.db.service.DataNode;
import org.apache.iotdb.db.service.RegionMigrateService;
import org.apache.iotdb.db.storageengine.StorageEngine;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.repair.RepairTaskStatus;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.settle.SettleRequestHandler;
import
org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager;
import
org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeThrottleQuotaManager;
@@ -1336,8 +1338,14 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
if (storageEngine.repairData()) {
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} else {
- return RpcUtils.getStatus(
- TSStatusCode.EXECUTE_STATEMENT_ERROR, "already have a running
repair task");
+ if
(CompactionScheduleTaskManager.getRepairTaskManagerInstance().getRepairTaskStatus()
+ == RepairTaskStatus.STOPPING) {
+ return RpcUtils.getStatus(
+ TSStatusCode.EXECUTE_STATEMENT_ERROR, "previous repair task is
still stopping");
+ } else {
+ return RpcUtils.getStatus(
+ TSStatusCode.EXECUTE_STATEMENT_ERROR, "already have a running
repair task");
+ }
}
} catch (StorageEngineException e) {
return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
e.getMessage());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index a98c7aeff10..1c0d10cdc21 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -192,6 +192,8 @@ import
org.apache.iotdb.db.schemaengine.template.TemplateAlterOperationType;
import
org.apache.iotdb.db.schemaengine.template.alter.TemplateAlterOperationUtil;
import org.apache.iotdb.db.schemaengine.template.alter.TemplateExtendInfo;
import org.apache.iotdb.db.storageengine.StorageEngine;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.repair.RepairTaskStatus;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager;
import org.apache.iotdb.db.trigger.service.TriggerClassLoader;
import org.apache.iotdb.pipe.api.PipePlugin;
import org.apache.iotdb.rpc.RpcUtils;
@@ -1034,9 +1036,16 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
if (StorageEngine.getInstance().repairData()) {
tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} else {
- tsStatus =
- RpcUtils.getStatus(
- TSStatusCode.EXECUTE_STATEMENT_ERROR, "already have a
running repair task");
+ if
(CompactionScheduleTaskManager.getRepairTaskManagerInstance().getRepairTaskStatus()
+ == RepairTaskStatus.STOPPING) {
+ tsStatus =
+ RpcUtils.getStatus(
+ TSStatusCode.EXECUTE_STATEMENT_ERROR, "previous repair
task is still stopping");
+ } else {
+ tsStatus =
+ RpcUtils.getStatus(
+ TSStatusCode.EXECUTE_STATEMENT_ERROR, "already have a
running repair task");
+ }
}
} catch (Exception e) {
tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
e.getMessage());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 2de67afec9d..20cbfef2aee 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2446,7 +2446,7 @@ public class DataRegion implements IDataRegionForQuery {
tsFileManager, timePartition, insertionTaskPhaser);
}
trySubmitCount += currentSubmitCount;
- insertionTaskPhaser.arriveAndAwaitAdvance();
+
insertionTaskPhaser.awaitAdvanceInterruptibly(insertionTaskPhaser.arrive());
if (currentSubmitCount != 0) {
continue;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
index 5219ab6bbf1..d5d709a4df5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.tsfile.utils.TsFileUtils;
import java.io.File;
import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
@@ -486,6 +487,10 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
try {
memoryCost =
innerSpaceEstimator.estimateInnerCompactionMemory(selectedTsFileResourceList);
} catch (IOException e) {
+ if (e instanceof ClosedByInterruptException || Thread.interrupted()) {
+ Thread.currentThread().interrupt();
+ return -1;
+ }
innerSpaceEstimator.cleanup();
LOGGER.error("Meet error when estimate inner compaction memory", e);
return -1;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java
index 2c4baf23c9c..6f262a79af6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InsertionCrossSpaceCompactionTask.java
@@ -105,7 +105,7 @@ public class InsertionCrossSpaceCompactionTask extends
AbstractCompactionTask {
@Override
public void handleTaskCleanup() {
- if (phaser != null && phaser.getRegisteredParties() > 0) {
+ if (phaser != null) {
phaser.arrive();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartitionScanTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartitionScanTask.java
index ee86f6f10b5..5bdb967d1b1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartitionScanTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartitionScanTask.java
@@ -99,7 +99,7 @@ public class RepairTimePartitionScanTask implements
Callable<Void> {
scanUtil.scanTsFile();
checkTaskStatusAndMayStop();
if (scanUtil.isBrokenFile()) {
- LOGGER.warn("[RepairScheduler] file {} is skipped because it is
broken", sourceFile);
+ LOGGER.warn("[RepairScheduler] {} is skipped because it is broken",
sourceFile);
sourceFile.setTsFileRepairStatus(TsFileRepairStatus.CAN_NOT_REPAIR);
latch.countDown();
continue;
@@ -112,8 +112,7 @@ public class RepairTimePartitionScanTask implements
Callable<Void> {
sourceFile.readUnlock();
}
LOGGER.info(
- "[RepairScheduler] file {} need to repair because it has internal
unsorted data",
- sourceFile);
+ "[RepairScheduler] {} need to repair because it has internal
unsorted data", sourceFile);
TsFileManager tsFileManager = timePartition.getTsFileManager();
RepairUnsortedFileCompactionTask task =
new RepairUnsortedFileCompactionTask(
@@ -150,7 +149,7 @@ public class RepairTimePartitionScanTask implements
Callable<Void> {
false,
tsFileManager.getNextCompactionTaskId());
LOGGER.info(
- "[RepairScheduler] file {} need to repair because it is overlapped
with other files",
+ "[RepairScheduler] {} need to repair because it is overlapped with
other files",
overlapFile);
if (submitRepairFileTaskSafely(task)) {
latch.await();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java
index bc8a0bf8dd4..f449cb72ad0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java
@@ -150,13 +150,15 @@ public class CompactionScheduleTaskManager implements
IService {
}
try {
compactionScheduleTaskThreadPool.shutdownNow();
- compactionScheduleTaskThreadPool.awaitTermination(milliseconds,
TimeUnit.MILLISECONDS);
+ if (!compactionScheduleTaskThreadPool.awaitTermination(
+ milliseconds, TimeUnit.MILLISECONDS)) {
+ throw new InterruptedException();
+ }
} catch (InterruptedException e) {
logger.warn(
"compaction schedule task thread pool can not be closed in {} ms",
milliseconds);
Thread.currentThread().interrupt();
}
- waitForThreadPoolTerminated();
} finally {
lock.unlock();
}
@@ -289,10 +291,11 @@ public class CompactionScheduleTaskManager implements
IService {
public Future<Void> submitRepairScanTask(RepairTimePartitionScanTask
scanTask) {
lock.lock();
- if (repairTaskStatus.get() != RepairTaskStatus.RUNNING) {
- return null;
- }
try {
+ if (repairTaskStatus.get() != RepairTaskStatus.RUNNING) {
+ logger.info("[RepairTaskManager] skip current task because repair
task is stopping");
+ return null;
+ }
Future<Void> future =
compactionScheduleTaskThreadPool.submit(scanTask);
submitRepairScanTaskFutures.add(future);
return future;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
index e472e8ea8db..56efdfbf500 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
@@ -43,6 +43,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -128,6 +129,10 @@ public class RewriteCrossSpaceCompactionSelector
implements ICrossSpaceSelector
return executeTaskResourceSelection(candidate);
} catch (IOException e) {
+ if (e instanceof ClosedByInterruptException || Thread.interrupted()) {
+ Thread.currentThread().interrupt();
+ return new CrossCompactionTaskResource();
+ }
throw new MergeException(e);
} finally {
compactionEstimator.cleanup();
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionSelectorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionSelectorTest.java
index 66671b2265f..bce51162000 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionSelectorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionSelectorTest.java
@@ -1013,7 +1013,7 @@ public class InsertionCrossSpaceCompactionSelectorTest
extends AbstractCompactio
}
InsertionCrossSpaceCompactionTask task =
new InsertionCrossSpaceCompactionTask(
- new Phaser(),
+ new Phaser(1),
0,
tsFileManager,
taskResource,
@@ -1314,7 +1314,7 @@ public class InsertionCrossSpaceCompactionSelectorTest
extends AbstractCompactio
if (taskResource.isValid()) {
InsertionCrossSpaceCompactionTask task =
new InsertionCrossSpaceCompactionTask(
- new Phaser(),
+ new Phaser(1),
0,
tsFileManager,
taskResource,
@@ -1710,7 +1710,7 @@ public class InsertionCrossSpaceCompactionSelectorTest
extends AbstractCompactio
}
InsertionCrossSpaceCompactionTask task =
new InsertionCrossSpaceCompactionTask(
- new Phaser(),
+ new Phaser(1),
0,
tsFileManager,
taskResource,
@@ -2031,7 +2031,7 @@ public class InsertionCrossSpaceCompactionSelectorTest
extends AbstractCompactio
if (taskResource.isValid()) {
InsertionCrossSpaceCompactionTask task =
new InsertionCrossSpaceCompactionTask(
- new Phaser(),
+ new Phaser(1),
0,
tsFileManager,
taskResource,