This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch fixBugs-0414
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 09d5ac4788d4440c03ebb52447abcf2f1ed6c5b0
Author: shuwenwei <[email protected]>
AuthorDate: Tue Apr 14 11:00:27 2026 +0800

    Fix: send set configuration only to target nodes and harden compaction schedule interruption handling
---
 .../relational/it/db/it/IoTDBSetConfigurationTableIT.java | 14 ++++++++++++++
 .../apache/iotdb/confignode/manager/node/NodeManager.java |  2 +-
 .../iotdb/db/storageengine/dataregion/DataRegion.java     |  6 +++++-
 .../schedule/CompactionScheduleTaskManager.java           |  9 +++++++++
 .../compaction/schedule/CompactionScheduleTaskWorker.java | 15 +++++++++++++--
 .../dataregion/compaction/schedule/TTLScheduleTask.java   | 13 +++++++++++--
 6 files changed, 53 insertions(+), 6 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBSetConfigurationTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBSetConfigurationTableIT.java
index 2295b88f1bc..8a4bc388a59 100644
--- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBSetConfigurationTableIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBSetConfigurationTableIT.java
@@ -109,6 +109,9 @@ public class IoTDBSetConfigurationTableIT {
         statement.execute(
             "set configuration inner_compaction_candidate_file_num='1',max_cross_compaction_candidate_file_num='1' on "
                 + dnId);
+        if (dnId == 5) {
+          statement.execute("set configuration compaction_schedule_thread_num='2' on 5");
+        }
       }
     } catch (Exception e) {
       Assert.fail(e.getMessage());
@@ -131,6 +134,17 @@ public class IoTDBSetConfigurationTableIT {
               "enable_cross_space_compaction=false",
               "inner_compaction_candidate_file_num=1",
               "max_cross_compaction_candidate_file_num=1"));
+      boolean scheduleThreadNumChanged =
+          checkConfigFileContains(
+              dnId,
+              EnvFactory.getEnv().getDataNodeWrapperList().get(i),
+              "compaction_schedule_thread_num=2");
+      if (scheduleThreadNumChanged && dnId != 5) {
+        Assert.fail();
+      }
+      if (!scheduleThreadNumChanged && dnId == 5) {
+        Assert.fail();
+      }
     }
   }
 
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index e3d775259d6..2e50c6b787b 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -1052,7 +1052,7 @@ public class NodeManager {
     if (!targetDataNodes.isEmpty()) {
       DataNodeAsyncRequestContext<Object, TSStatus> clientHandler =
           new DataNodeAsyncRequestContext<>(
-              CnToDnAsyncRequestType.SET_CONFIGURATION, req, dataNodeLocationMap);
+              CnToDnAsyncRequestType.SET_CONFIGURATION, req, targetDataNodes);
       CnToDnInternalServiceAsyncRequestManager.getInstance()
           .sendAsyncRequestWithRetry(clientHandler);
       responseList.addAll(clientHandler.getResponseList());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index eb37cb1d5d2..c65105ae54c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -3705,7 +3705,11 @@ public class DataRegion implements IDataRegionForQuery {
       if (!regionObjectDir.isDirectory()) {
         continue;
       }
-      CompactionUtils.executeTTLCheckObjectFilesForTableModel(regionObjectDir, databaseName);
+      try {
+        CompactionUtils.executeTTLCheckObjectFilesForTableModel(regionObjectDir, databaseName);
+      } catch (Exception e) {
+        logger.error("Failed to execute object ttl check", e);
+      }
     }
     CompactionMetrics.getInstance()
         .updateTTLCheckForObjectFileCost(System.currentTimeMillis() - startTime);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java
index 8348b878137..516b1489a20 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java
@@ -65,6 +65,7 @@ public class CompactionScheduleTaskManager implements IService {
       ConcurrentHashMap.newKeySet();
   private ReentrantLock lock = new ReentrantLock();
   private volatile boolean init = false;
+  private volatile boolean isStoppingAllScheduleTask = false;
 
   @Override
   public void start() throws StartupException {
@@ -76,8 +77,13 @@ public class CompactionScheduleTaskManager implements IService {
     logger.info("Compaction schedule task manager started.");
   }
 
+  public boolean isStoppingAllScheduleTask() {
+    return isStoppingAllScheduleTask;
+  }
+
   public void stopCompactionScheduleTasks() throws InterruptedException {
     lock.lock();
+    isStoppingAllScheduleTask = true;
     try {
       for (Future<Void> task : submitCompactionScheduleTaskFutures) {
         task.cancel(true);
@@ -121,6 +127,7 @@ public class CompactionScheduleTaskManager implements IService {
 
   public void startScheduleTasks() {
     lock.lock();
+    isStoppingAllScheduleTask = false;
     try {
       // compaction selector
       for (int workerId = 0; workerId < compactionSelectorNum; workerId++) {
@@ -144,6 +151,7 @@ public class CompactionScheduleTaskManager implements IService {
   @Override
   public void stop() {
     lock.lock();
+    isStoppingAllScheduleTask = true;
     try {
       if (!init) {
         return;
@@ -160,6 +168,7 @@ public class CompactionScheduleTaskManager implements IService {
   @Override
   public void waitAndStop(long milliseconds) {
     lock.lock();
+    isStoppingAllScheduleTask = true;
     try {
       if (!init) {
         return;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
index 17ad0dd4334..f5646fbed4a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java
@@ -72,9 +72,20 @@ public class CompactionScheduleTaskWorker implements Callable<Void> {
           dataRegion.executeCompaction();
         }
       } catch (InterruptedException ignored) {
+        boolean isStoppedByUser =
+            CompactionScheduleTaskManager.getInstance().isStoppingAllScheduleTask();
         logger.info(
-            "[CompactionScheduleTaskWorker-{}] compaction schedule is interrupted", workerId);
-        return null;
+            "[CompactionScheduleTaskWorker-{}] compaction schedule is interrupted, isStopByUser: {}",
+            workerId,
+            isStoppedByUser);
+        if (isStoppedByUser) {
+          return null;
+        }
+      } catch (Exception e) {
+        logger.error(
+            "[CompactionScheduleTaskWorker-{}] Failed to execute compaction schedule task",
+            workerId,
+            e);
       }
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
index e15a908ac24..c7b5271f352 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java
@@ -78,8 +78,17 @@ public class TTLScheduleTask implements Callable<Void> {
           }
         }
       } catch (InterruptedException ignored) {
-        logger.info("[TTLCheckTask-{}] TTL checker is interrupted", workerId);
-        return null;
+        boolean isStoppedByUser =
+            CompactionScheduleTaskManager.getInstance().isStoppingAllScheduleTask();
+        logger.info(
+            "[TTLCheckTask-{}] TTL checker is interrupted, isStoppedByUser: {}",
+            workerId,
+            isStoppedByUser);
+        if (isStoppedByUser) {
+          return null;
+        }
+      } catch (Exception e) {
+        logger.error("[TTLCheckTask-{}] Failed to execute ttl check", workerId, e);
       }
     }
   }

Reply via email to