This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch fixBugs-0414 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 09d5ac4788d4440c03ebb52447abcf2f1ed6c5b0 Author: shuwenwei <[email protected]> AuthorDate: Tue Apr 14 11:00:27 2026 +0800 Fix: send set configuration only to target nodes and harden compaction schedule interruption handling --- .../relational/it/db/it/IoTDBSetConfigurationTableIT.java | 14 ++++++++++++++ .../apache/iotdb/confignode/manager/node/NodeManager.java | 2 +- .../iotdb/db/storageengine/dataregion/DataRegion.java | 6 +++++- .../schedule/CompactionScheduleTaskManager.java | 9 +++++++++ .../compaction/schedule/CompactionScheduleTaskWorker.java | 15 +++++++++++++-- .../dataregion/compaction/schedule/TTLScheduleTask.java | 13 +++++++++++-- 6 files changed, 53 insertions(+), 6 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBSetConfigurationTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBSetConfigurationTableIT.java index 2295b88f1bc..8a4bc388a59 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBSetConfigurationTableIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBSetConfigurationTableIT.java @@ -109,6 +109,9 @@ public class IoTDBSetConfigurationTableIT { statement.execute( "set configuration inner_compaction_candidate_file_num='1',max_cross_compaction_candidate_file_num='1' on " + dnId); + if (dnId == 5) { + statement.execute("set configuration compaction_schedule_thread_num='2' on 5"); + } } } catch (Exception e) { Assert.fail(e.getMessage()); @@ -131,6 +134,17 @@ public class IoTDBSetConfigurationTableIT { "enable_cross_space_compaction=false", "inner_compaction_candidate_file_num=1", "max_cross_compaction_candidate_file_num=1")); + boolean scheduleThreadNumChanged = + checkConfigFileContains( + dnId, + EnvFactory.getEnv().getDataNodeWrapperList().get(i), + "compaction_schedule_thread_num=2"); + if (scheduleThreadNumChanged && dnId != 5) { + Assert.fail(); + } + if (!scheduleThreadNumChanged && dnId == 5) { + Assert.fail(); + } } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java index e3d775259d6..2e50c6b787b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java @@ -1052,7 +1052,7 @@ public class NodeManager { if (!targetDataNodes.isEmpty()) { DataNodeAsyncRequestContext<Object, TSStatus> clientHandler = new DataNodeAsyncRequestContext<>( - CnToDnAsyncRequestType.SET_CONFIGURATION, req, dataNodeLocationMap); + CnToDnAsyncRequestType.SET_CONFIGURATION, req, targetDataNodes); CnToDnInternalServiceAsyncRequestManager.getInstance() .sendAsyncRequestWithRetry(clientHandler); responseList.addAll(clientHandler.getResponseList()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index eb37cb1d5d2..c65105ae54c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -3705,7 +3705,11 @@ public class DataRegion implements IDataRegionForQuery { if (!regionObjectDir.isDirectory()) { continue; } - CompactionUtils.executeTTLCheckObjectFilesForTableModel(regionObjectDir, databaseName); + try { + CompactionUtils.executeTTLCheckObjectFilesForTableModel(regionObjectDir, databaseName); + } catch (Exception e) { + logger.error("Failed to execute object ttl check", e); + } } CompactionMetrics.getInstance() .updateTTLCheckForObjectFileCost(System.currentTimeMillis() - startTime); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java index 8348b878137..516b1489a20 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java @@ -65,6 +65,7 @@ public class CompactionScheduleTaskManager implements IService { ConcurrentHashMap.newKeySet(); private ReentrantLock lock = new ReentrantLock(); private volatile boolean init = false; + private volatile boolean isStoppingAllScheduleTask = false; @Override public void start() throws StartupException { @@ -76,8 +77,13 @@ public class CompactionScheduleTaskManager implements IService { logger.info("Compaction schedule task manager started."); } + public boolean isStoppingAllScheduleTask() { + return isStoppingAllScheduleTask; + } + public void stopCompactionScheduleTasks() throws InterruptedException { lock.lock(); + isStoppingAllScheduleTask = true; try { for (Future<Void> task : submitCompactionScheduleTaskFutures) { task.cancel(true); @@ -121,6 +127,7 @@ public class CompactionScheduleTaskManager implements IService { public void startScheduleTasks() { lock.lock(); + isStoppingAllScheduleTask = false; try { // compaction selector for (int workerId = 0; workerId < compactionSelectorNum; workerId++) { @@ -144,6 +151,7 @@ public class CompactionScheduleTaskManager implements IService { @Override public void stop() { lock.lock(); + isStoppingAllScheduleTask = true; try { if (!init) { return; @@ -160,6 +168,7 @@ public class CompactionScheduleTaskManager implements IService { @Override public void waitAndStop(long milliseconds) { lock.lock(); + isStoppingAllScheduleTask = true; try { if (!init) { return; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java index 17ad0dd4334..f5646fbed4a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java @@ -72,9 +72,20 @@ public class CompactionScheduleTaskWorker implements Callable<Void> { dataRegion.executeCompaction(); } } catch (InterruptedException ignored) { + boolean isStoppedByUser = + CompactionScheduleTaskManager.getInstance().isStoppingAllScheduleTask(); logger.info( - "[CompactionScheduleTaskWorker-{}] compaction schedule is interrupted", workerId); - return null; + "[CompactionScheduleTaskWorker-{}] compaction schedule is interrupted, isStopByUser: {}", + workerId, + isStoppedByUser); + if (isStoppedByUser) { + return null; + } + } catch (Exception e) { + logger.error( + "[CompactionScheduleTaskWorker-{}] Failed to execute compaction schedule task", + workerId, + e); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java index e15a908ac24..c7b5271f352 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java @@ -78,8 +78,17 @@ public class TTLScheduleTask implements Callable<Void> { } } } catch (InterruptedException ignored) { - logger.info("[TTLCheckTask-{}] TTL checker is interrupted", workerId); - return null; + boolean isStoppedByUser = + CompactionScheduleTaskManager.getInstance().isStoppingAllScheduleTask(); + logger.info( + "[TTLCheckTask-{}] TTL checker is interrupted, isStoppedByUser: {}", + workerId, + isStoppedByUser); + if (isStoppedByUser) { + return null; + } + } catch (Exception e) { + logger.error("[TTLCheckTask-{}] Failed to execute ttl check", workerId, e); } } }
