This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c4574b6567a Fix active load cleanup lifecycle (#17947)
c4574b6567a is described below

commit c4574b6567a56bee8b245eeea96602c4cea2aad0
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 16 16:28:16 2026 +0800

    Fix active load cleanup lifecycle (#17947)
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 ++++++--
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  6 +++++
 .../iotdb/db/storageengine/StorageEngine.java      |  3 +++
 .../db/storageengine/load/LoadTsFileManager.java   | 13 +++++++++++
 .../storageengine/load/active/ActiveLoadAgent.java |  6 +++++
 .../load/active/ActiveLoadDirScanner.java          | 17 ++++++++++++--
 .../active/ActiveLoadScheduledExecutorService.java | 21 ++++++++++++++---
 .../load/active/ActiveLoadTsFileLoader.java        | 27 +++++++++++++++++++++-
 .../src/test/resources/iotdb-system.properties     |  1 +
 .../conf/iotdb-system.properties.template          | 12 ++++++++++
 10 files changed, 108 insertions(+), 8 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index e45085bd4be..7aadfebc917 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -4163,11 +4163,17 @@ public class IoTDBConfig {
   }
 
   public String getLoadActiveListeningPipeDir() {
-    return loadActiveListeningPipeDir;
+    return loadActiveListeningPipeDir == null || Objects.equals(loadActiveListeningPipeDir, "")
+        ? extDir
+            + File.separator
+            + IoTDBConstant.LOAD_TSFILE_FOLDER_NAME
+            + File.separator
+            + IoTDBConstant.PIPE_FOLDER_NAME
+        : loadActiveListeningPipeDir;
   }
 
   public void setLoadActiveListeningPipeDir(String loadActiveListeningPipeDir) {
-    this.loadActiveListeningPipeDir = loadActiveListeningPipeDir;
+    this.loadActiveListeningPipeDir = addDataHomeDir(loadActiveListeningPipeDir);
   }
 
   public String[] getLoadActiveListeningDirs() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 068d44d0540..13352020a15 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2503,6 +2503,9 @@ public class IoTDBDescriptor {
     conf.setLoadActiveListeningFailDir(
         properties.getProperty(
             "load_active_listening_fail_dir", conf.getLoadActiveListeningFailDir()));
+    conf.setLoadActiveListeningPipeDir(
+        properties.getProperty(
+            "load_active_listening_pipe_dir", conf.getLoadActiveListeningPipeDir()));
 
     final long loadActiveListeningCheckIntervalSeconds =
         Long.parseLong(
@@ -2637,6 +2640,9 @@ public class IoTDBDescriptor {
         properties.getProperty(
             "load_active_listening_fail_dir",
             ConfigurationFileUtils.getConfigurationDefaultValue("load_active_listening_fail_dir")));
+    conf.setLoadActiveListeningPipeDir(
+        properties.getProperty(
+            "load_active_listening_pipe_dir", conf.getLoadActiveListeningPipeDir()));
 
     conf.setLoadTsFileSpiltPartitionMaxSize(
         Integer.parseInt(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index db51775281d..532b6878577 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -327,6 +327,7 @@ public class StorageEngine implements IService {
     }
 
     asyncRecoverTsFileResource();
+    loadTsFileManager.start();
   }
 
   private void startTimedService() {
@@ -411,6 +412,7 @@ public class StorageEngine implements IService {
 
   @Override
   public void stop() {
+    loadTsFileManager.stop();
     for (DataRegion dataRegion : dataRegionMap.values()) {
       if (dataRegion != null) {
         CompactionScheduleTaskManager.getInstance().unregisterDataRegion(dataRegion);
@@ -429,6 +431,7 @@ public class StorageEngine implements IService {
 
   @Override
   public void shutdown(long milliseconds) throws ShutdownException {
+    loadTsFileManager.stop();
     try {
       for (DataRegion dataRegion : dataRegionMap.values()) {
         if (dataRegion != null) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index e3c6948563a..05e53a90763 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -130,9 +130,22 @@ public class LoadTsFileManager {
   public LoadTsFileManager() {
     registerCleanupTaskExecutor();
     recover();
+  }
+
+  public void start() {
     activeLoadAgent.start();
   }
 
+  public void stop() {
+    activeLoadAgent.stop();
+    synchronized (uuid2CleanupTask) {
+      uuid2CleanupTask.values().forEach(CleanupTask::cancel);
+      uuid2CleanupTask.clear();
+      cleanupTaskQueue.clear();
+    }
+    new HashSet<>(uuid2WriterManager.keySet()).forEach(this::forceCloseWriterManager);
+  }
+
   private long getCleanupTaskDelayInMs() {
     return CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() * 1000L;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java
index bec3bbe072b..f3532536111 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java
@@ -63,6 +63,12 @@ public class ActiveLoadAgent {
     activeLoadMetricsCollector.start();
   }
 
+  public synchronized void stop() {
+    activeLoadDirScanner.stop();
+    activeLoadMetricsCollector.stop();
+    activeLoadTsFileLoader.stop();
+  }
+
   /**
    * Clean up all listening directories for active load on DataNode first startup. This method will
    * clean up all files and subdirectories in the listening directories, including: 1. Pending
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
index 2a3ddee11f7..81de3f054cf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
@@ -54,6 +54,7 @@ public class ActiveLoadDirScanner extends ActiveLoadScheduledExecutorService {
   private static final Logger LOGGER = LoggerFactory.getLogger(ActiveLoadDirScanner.class);
 
   private final AtomicReference<String[]> listeningDirsConfig = new AtomicReference<>();
+  private final AtomicReference<String> pipeListeningDirConfig = new AtomicReference<>();
   private final Set<String> listeningDirs = new CopyOnWriteArraySet<>();
 
   private final Set<String> noPermissionDirs = new CopyOnWriteArraySet<>();
@@ -204,8 +205,20 @@ public class ActiveLoadDirScanner extends ActiveLoadScheduledExecutorService {
       } else {
         listeningDirs.clear();
       }
-      // Hot reload active load listening dir for pipe data sync
-      // Active load is always enabled for pipe data sync
+      if (!Objects.equals(
+          IOTDB_CONFIG.getLoadActiveListeningPipeDir(), pipeListeningDirConfig.get())) {
+        synchronized (this) {
+          if (!Objects.equals(
+              IOTDB_CONFIG.getLoadActiveListeningPipeDir(), pipeListeningDirConfig.get())) {
+            if (pipeListeningDirConfig.get() != null) {
+              listeningDirs.remove(pipeListeningDirConfig.get());
+            }
+            pipeListeningDirConfig.set(IOTDB_CONFIG.getLoadActiveListeningPipeDir());
+          }
+        }
+      }
+
+      // Active load is always enabled for pipe data sync.
       listeningDirs.add(IOTDB_CONFIG.getLoadActiveListeningPipeDir());
 
       // Create directories if not exists
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
index 6bd26645840..fb989e7923e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadScheduledExecutorService.java
@@ -46,14 +46,19 @@ public abstract class ActiveLoadScheduledExecutorService {
 
   private static final long MIN_EXECUTION_INTERVAL_SECONDS =
       IOTDB_CONFIG.getLoadActiveListeningCheckIntervalSeconds();
-  private final ScheduledExecutorService scheduledExecutorService;
+  private final ThreadName threadName;
+  private ScheduledExecutorService scheduledExecutorService;
   private Future<?> future;
 
   private final List<Pair<WrappedRunnable, Long>> jobs = new CopyOnWriteArrayList<>();
 
   protected ActiveLoadScheduledExecutorService(final ThreadName threadName) {
-    scheduledExecutorService =
-        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(threadName.name());
+    this.threadName = threadName;
+    scheduledExecutorService = newScheduledExecutorService();
+  }
+
+  private ScheduledExecutorService newScheduledExecutorService() {
+    return IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(threadName.name());
   }
 
   public void register(Runnable runnable) {
@@ -74,6 +79,9 @@ public abstract class ActiveLoadScheduledExecutorService {
 
   public synchronized void start() {
     if (future == null) {
+      if (scheduledExecutorService.isShutdown()) {
+        scheduledExecutorService = newScheduledExecutorService();
+      }
       future =
           ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
               scheduledExecutorService,
@@ -97,5 +105,12 @@ public abstract class ActiveLoadScheduledExecutorService {
       future = null;
       LOGGER.info(StorageEngineMessages.ACTIVE_LOAD_EXECUTOR_STOPPED);
     }
+    scheduledExecutorService.shutdownNow();
+    try {
+      scheduledExecutorService.awaitTermination(30, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.warn(StorageEngineMessages.STILL_NOT_EXIT_AFTER_30S, threadName.getName());
+      Thread.currentThread().interrupt();
+    }
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index 13ec94186b7..61e297a9e86 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -156,6 +156,27 @@ public class ActiveLoadTsFileLoader {
     }
   }
 
+  public void stop() {
+    final WrappedThreadPoolExecutor executor = activeLoadExecutor.getAndSet(null);
+    if (executor == null) {
+      return;
+    }
+
+    executor.shutdownNow();
+    try {
+      if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
+        LOGGER.warn(
+            StorageEngineMessages.STILL_NOT_EXIT_AFTER_30S,
+            ThreadName.ACTIVE_LOAD_TSFILE_LOADER.getName());
+      }
+    } catch (final InterruptedException e) {
+      LOGGER.warn(
+          StorageEngineMessages.STILL_NOT_EXIT_AFTER_30S,
+          ThreadName.ACTIVE_LOAD_TSFILE_LOADER.getName());
+      Thread.currentThread().interrupt();
+    }
+  }
+
   private void tryLoadPendingTsFiles() {
     final IClientSession session =
         new InternalClientSession(
@@ -202,18 +223,22 @@ public class ActiveLoadTsFileLoader {
         Math.max(1, IOTDB_CONFIG.getLoadActiveListeningCheckIntervalSeconds() << 1);
     long currentRetryTimes = 0;
 
-    while (true) {
+    while (!Thread.currentThread().isInterrupted()) {
       final ActiveLoadPendingQueue.ActiveLoadEntry entry = pendingQueue.dequeueFromPending();
       if (Objects.nonNull(entry)) {
         return Optional.of(entry);
       }
 
       LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
+      if (Thread.currentThread().isInterrupted()) {
+        return Optional.empty();
+      }
 
       if (currentRetryTimes++ >= maxRetryTimes) {
         return Optional.empty();
       }
     }
+    return Optional.empty();
   }
 
   private TSStatus loadTsFile(
diff --git a/iotdb-core/datanode/src/test/resources/iotdb-system.properties b/iotdb-core/datanode/src/test/resources/iotdb-system.properties
index af1d5c3914a..a7a2a08bd1f 100644
--- a/iotdb-core/datanode/src/test/resources/iotdb-system.properties
+++ b/iotdb-core/datanode/src/test/resources/iotdb-system.properties
@@ -32,6 +32,7 @@ udf_lib_dir=target/ext/udf
 trigger_lib_dir=target/ext/trigger
 pipe_lib_dir=target/ext/pipe
 load_active_listening_dirs=target/ext/load/pending
+load_active_listening_pipe_dir=target/ext/load/pipe
 load_active_listening_fail_dir=target/ext/load/failed
 
 ####################
diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 1e855a9704c..8e762c0840c 100644
--- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -2257,6 +2257,18 @@ load_active_listening_enable=true
 # If its prefix is "/", then the path is absolute. Otherwise, it is relative.
 load_active_listening_dirs=ext/load/pending
 
+# The directory to be actively listened for tsfile loading from Pipe.
+# Only one directory can be configured.
+# effectiveMode: hot_reload
+# Datatype: String
+# For windows platform
+# If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is absolute.
+# Otherwise, it is relative.
+# load_active_listening_pipe_dir=ext\\load\\pipe
+# For Linux platform
+# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
+load_active_listening_pipe_dir=ext/load/pipe
+
 # The directory where tsfile are moved if the active listening mode fails to load them.
 # Only one directory can be configured.
 # effectiveMode: hot_reload

Reply via email to