This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new fea527da259 [To dev/1.3] Active Load: Add cleanup for active load
listening directories on DataNode first startup (#16854) (#16866)
fea527da259 is described below
commit fea527da2590ded327504186122a3dd5410b45b3
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Dec 11 09:18:53 2025 +0800
[To dev/1.3] Active Load: Add cleanup for active load listening directories
on DataNode first startup (#16854) (#16866)
* Add cleanup for active load listening directories on DataNode first
startup
- Add cleanupListeningDirectories() method in ActiveLoadAgent to clean up
all listening directories
- Call cleanup method when DataNode starts for the first time
- Clean up pending, pipe, and failed directories
- Silent execution with minimal logging
# Conflicts:
#
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
* update
* fix
---
.../java/org/apache/iotdb/db/service/DataNode.java | 3 +
.../storageengine/load/active/ActiveLoadAgent.java | 90 ++++++++++++++++++++++
2 files changed, 93 insertions(+)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 147b8783694..c557ac61b61 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -100,6 +100,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.flush.FlushManager;
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
+import org.apache.iotdb.db.storageengine.load.active.ActiveLoadAgent;
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.db.trigger.executor.TriggerExecutor;
@@ -223,6 +224,8 @@ public class DataNode extends ServerCommandLine implements
DataNodeMBean {
ConfigNodeInfo.getInstance().storeConfigNodeList();
// Register this DataNode to the cluster when first start
sendRegisterRequestToConfigNode(false);
+ // Clean up active load listening directories on first startup
+ ActiveLoadAgent.cleanupListeningDirectories();
} else {
// Send restart request of this DataNode
sendRestartRequestToConfigNode();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java
index f060bd9a96f..6065c349c8c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java
@@ -19,8 +19,21 @@
package org.apache.iotdb.db.storageengine.load.active;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
public class ActiveLoadAgent {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ActiveLoadAgent.class);
+
private final ActiveLoadTsFileLoader activeLoadTsFileLoader;
private final ActiveLoadDirScanner activeLoadDirScanner;
private final ActiveLoadMetricsCollector activeLoadMetricsCollector;
@@ -48,4 +61,81 @@ public class ActiveLoadAgent {
activeLoadDirScanner.start();
activeLoadMetricsCollector.start();
}
+
+ /**
+ * Deletes the contents of every active load listening directory on DataNode first startup.
+ * Targets: the pending dirs from {@code getLoadActiveListeningDirs()}, the pipe dir from
+ * {@code getLoadActiveListeningPipeDir()} and the failed dir from
+ * {@code getLoadActiveListeningFailDir()}. Only entries inside are deleted, never the dir itself.
+ *
+ * <p>Must not throw, so that startup can proceed: per-entry and per-directory failures are
+ * logged and skipped, and any remaining {@code Throwable} is caught and logged.
+ */
+ public static void cleanupListeningDirectories() {
+ try {
+ final List<String> dirsToClean = new ArrayList<>();
+ // Add the pending listening directories
+ dirsToClean.addAll(
+
Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs()));
+
+ // Add the pipe listening directory
+
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningPipeDir());
+
+ // Add the failed-files directory
+
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir());
+
+ // Clean up each directory independently; a failure in one must not stop the others
+ for (final String dirPath : dirsToClean) {
+ try {
+ if (dirPath == null || dirPath.isEmpty()) {
+ continue;
+ }
+
+ final File dir = new File(dirPath);
+
+ // Skip paths that do not exist or are not directories
+ // These methods may throw SecurityException if access is denied
+ try {
+ if (!dir.exists() || !dir.isDirectory()) {
+ continue;
+ }
+ } catch (Exception e) {
+ LOGGER.debug("Failed to check directory: {}", dirPath, e);
+ continue;
+ }
+
+ // Only delete contents inside the directory, not the directory
itself
+ // listFiles() may throw SecurityException if access is denied; null is handled below
+ File[] files = null;
+ try {
+ files = dir.listFiles();
+ } catch (Exception e) {
+ LOGGER.warn("Failed to list files in directory: {}", dirPath, e);
+ continue;
+ }
+
+ if (files != null) {
+ for (final File file : files) {
+ // FileUtils.deleteFileOrDirectory internally calls
file.isDirectory() and
+ // file.listFiles() without try-catch, so exceptions may
propagate here.
+ // We need to catch it to prevent one file failure from stopping
the cleanup.
+ try {
+ FileUtils.deleteFileOrDirectory(file, true);
+ } catch (Exception e) {
+ LOGGER.debug("Failed to delete file or directory: {}",
file.getAbsolutePath(), e);
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Failed to cleanup directory: {}", dirPath, e);
+ }
+ }
+ // NOTE(review): reached even if deletions above failed (those are only logged at debug/warn)
+ LOGGER.info("Cleaned up active load listening directories");
+ } catch (Throwable t) {
+ // Catch all exceptions and errors (including OutOfMemoryError, etc.)
+ // to ensure startup process is not affected
+ LOGGER.warn("Unexpected error during cleanup of active load listening
directories", t);
+ }
+ }
}