This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9397c134b3dc2f3aae930c9037379d195e908f47
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Dec 10 11:39:49 2025 +0800

    Active Load: Add cleanup for active load listening directories on DataNode 
first startup (#16854)
    
    * Add cleanup for active load listening directories on DataNode first 
startup
    
    - Add cleanupListeningDirectories() method in ActiveLoadAgent to clean up 
all listening directories
    - Call cleanup method when DataNode starts for the first time
    - Clean up pending, pipe, and failed directories
    - Silent execution with minimal logging
    
    * update
    
    * fix
    
    (cherry picked from commit bfa71e00e763c62c3d4ef3f5b459d1a814d91ddb)
---
 .../java/org/apache/iotdb/db/service/DataNode.java |  3 +
 .../storageengine/load/active/ActiveLoadAgent.java | 90 ++++++++++++++++++++++
 2 files changed, 93 insertions(+)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index c4b08f8f05a..1fac05012fe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -111,6 +111,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.flush.FlushManager;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
 import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
+import org.apache.iotdb.db.storageengine.load.active.ActiveLoadAgent;
 import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
 import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
 import org.apache.iotdb.db.trigger.executor.TriggerExecutor;
@@ -245,6 +246,8 @@ public class DataNode extends ServerCommandLine implements 
DataNodeMBean {
         sendRegisterRequestToConfigNode(false);
         saveSecretKey();
         saveHardwareCode();
+        // Clean up active load listening directories on first startup
+        ActiveLoadAgent.cleanupListeningDirectories();
       } else {
         /* Check encrypt magic string */
         try {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java
index f060bd9a96f..6065c349c8c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java
@@ -19,8 +19,21 @@
 
 package org.apache.iotdb.db.storageengine.load.active;
 
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
 public class ActiveLoadAgent {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ActiveLoadAgent.class);
+
   private final ActiveLoadTsFileLoader activeLoadTsFileLoader;
   private final ActiveLoadDirScanner activeLoadDirScanner;
   private final ActiveLoadMetricsCollector activeLoadMetricsCollector;
@@ -48,4 +61,81 @@ public class ActiveLoadAgent {
     activeLoadDirScanner.start();
     activeLoadMetricsCollector.start();
   }
+
+  /**
+   * Clean up all listening directories for active load on DataNode first 
startup. This method will
+   * clean up all files and subdirectories in the listening directories, 
including: 1. Pending
+   * directories (configured by load_active_listening_dirs) 2. Pipe directory 
(for pipe data sync)
+   * 3. Failed directory (for failed files)
+   *
+   * <p>This method is called during DataNode startup and must not throw any 
exceptions to ensure
+   * startup can proceed normally. All exceptions are caught and logged 
internally.
+   */
+  public static void cleanupListeningDirectories() {
+    try {
+      final List<String> dirsToClean = new ArrayList<>();
+
+      dirsToClean.addAll(
+          
Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs()));
+
+      // Add pipe dir
+      
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningPipeDir());
+
+      // Add failed dir
+      
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir());
+
+      // Clean up each directory
+      for (final String dirPath : dirsToClean) {
+        try {
+          if (dirPath == null || dirPath.isEmpty()) {
+            continue;
+          }
+
+          final File dir = new File(dirPath);
+
+          // Check if directory exists and is a directory
+          // These methods may throw SecurityException if access is denied
+          try {
+            if (!dir.exists() || !dir.isDirectory()) {
+              continue;
+            }
+          } catch (Exception e) {
+            LOGGER.debug("Failed to check directory: {}", dirPath, e);
+            continue;
+          }
+
+          // Only delete contents inside the directory, not the directory 
itself
+          // listFiles() may throw SecurityException if access is denied
+          File[] files = null;
+          try {
+            files = dir.listFiles();
+          } catch (Exception e) {
+            LOGGER.warn("Failed to list files in directory: {}", dirPath, e);
+            continue;
+          }
+
+          if (files != null) {
+            for (final File file : files) {
+              // FileUtils.deleteFileOrDirectory internally calls 
file.isDirectory() and
+              // file.listFiles() without try-catch, so exceptions may 
propagate here.
+              // We need to catch it to prevent one file failure from stopping 
the cleanup.
+              try {
+                FileUtils.deleteFileOrDirectory(file, true);
+              } catch (Exception e) {
+                LOGGER.debug("Failed to delete file or directory: {}", 
file.getAbsolutePath(), e);
+              }
+            }
+          }
+        } catch (Exception e) {
+          LOGGER.warn("Failed to cleanup directory: {}", dirPath, e);
+        }
+      }
+
+      LOGGER.info("Cleaned up active load listening directories");
+    } catch (Throwable t) {
+      // Catch all exceptions and errors (including OutOfMemoryError, etc.)
+      // to ensure startup process is not affected
+      LOGGER.warn("Unexpected error during cleanup of active load listening 
directories", t);
+    }
+  }
 }

Reply via email to