This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch rebase_master
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit c7b623508a08000c38ca943a079cb06589a4924f
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Jun 30 19:22:40 2025 +0200

    feat(tpc): implement async walk_dir and remove_dir_all (#1942)
    
    This closes #1938.
---
 core/server/src/bootstrap.rs                    | 12 ++--
 core/server/src/io/fs_utils.rs                  | 93 +++++++++++++++++++++++++
 core/server/src/io/mod.rs                       |  1 +
 core/server/src/main.rs                         |  6 +-
 core/server/src/shard/mod.rs                    |  3 +-
 core/server/src/streaming/partitions/storage.rs |  9 ++-
 core/server/src/streaming/streams/storage.rs    |  6 +-
 core/server/src/streaming/topics/storage.rs     |  6 +-
 8 files changed, 115 insertions(+), 21 deletions(-)

diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 6f046fa8..ddd0599c 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -11,6 +11,7 @@ use tracing::info;
 use crate::{
     IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV,
     configs::{config_provider::ConfigProviderKind, server::ServerConfig, 
system::SystemConfig},
+    io::fs_utils,
     server_error::ServerError,
     shard::transmission::{
         connector::{ShardConnector, StopSender},
@@ -22,7 +23,7 @@ use crate::{
         utils::file::overwrite,
     },
 };
-use std::{env, fs::remove_dir_all, ops::Range, path::Path, sync::Arc};
+use std::{env, ops::Range, path::Path, sync::Arc};
 
 pub fn create_shard_connections(
     shards_set: Range<usize>,
@@ -59,10 +60,8 @@ pub async fn create_directories(config: &SystemConfig) -> 
Result<(), IggyError>
         return Err(IggyError::CannotCreateStateDirectory(state_path));
     }
     let state_log = config.get_state_messages_file_path();
-    if !Path::new(&state_log).exists() {
-        if let Err(_) = overwrite(&state_log).await {
-            return Err(IggyError::CannotCreateStateDirectory(state_log));
-        }
+    if !Path::new(&state_log).exists() && 
(overwrite(&state_log).await).is_err() {
+        return Err(IggyError::CannotCreateStateDirectory(state_log));
     }
 
     let streams_path = config.get_streams_path();
@@ -71,8 +70,7 @@ pub async fn create_directories(config: &SystemConfig) -> 
Result<(), IggyError>
     }
 
     let runtime_path = config.get_runtime_path();
-    // TODO: Change remove_dir_all to async version, once we implement the dir 
walk using monoio `remove_dir` method.
-    if Path::new(&runtime_path).exists() && 
remove_dir_all(&runtime_path).is_err() {
+    if Path::new(&runtime_path).exists() && 
fs_utils::remove_dir_all(&runtime_path).await.is_err() {
         return Err(IggyError::CannotRemoveRuntimeDirectory(runtime_path));
     }
 
diff --git a/core/server/src/io/fs_utils.rs b/core/server/src/io/fs_utils.rs
new file mode 100644
index 00000000..d9be21e7
--- /dev/null
+++ b/core/server/src/io/fs_utils.rs
@@ -0,0 +1,93 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use monoio::fs;
+use std::io;
+use std::path::{Path, PathBuf};
+
+#[derive(Debug, Clone)]
+pub struct DirEntry {
+    pub path: PathBuf,
+    pub is_dir: bool,
+}
+
+impl DirEntry {
+    fn file(path: PathBuf) -> Self {
+        Self {
+            path,
+            is_dir: false,
+        }
+    }
+
+    fn dir(path: PathBuf) -> Self {
+        Self { path, is_dir: true }
+    }
+}
+
+/// Asynchronously walks a directory tree iteratively (without recursion).
+/// Returns all entries with directories listed after their contents to enable
+/// safe deletion (contents before containers).
+/// Symlinks are treated as files and not followed.
+pub async fn walk_dir(root: impl AsRef<Path>) -> io::Result<Vec<DirEntry>> {
+    let root = root.as_ref();
+
+    let metadata = fs::metadata(root).await?;
+    if !metadata.is_dir() {
+        return Err(io::Error::new(
+            io::ErrorKind::InvalidInput,
+            "path is not a directory",
+        ));
+    }
+
+    let mut files = Vec::new();
+    let mut directories = Vec::new();
+    let mut stack = vec![root.to_path_buf()];
+
+    while let Some(current_dir) = stack.pop() {
+        directories.push(DirEntry::dir(current_dir.clone()));
+
+        for entry in std::fs::read_dir(&current_dir)? {
+            let entry = entry?;
+            let entry_path = entry.path();
+            let metadata = fs::symlink_metadata(&entry_path).await?;
+
+            if metadata.is_dir() {
+                stack.push(entry_path);
+            } else {
+                files.push(DirEntry::file(entry_path));
+            }
+        }
+    }
+
+    directories.reverse();
+    files.extend(directories);
+    Ok(files)
+}
+
+/// Removes a directory and all its contents.
+/// This is the equivalent of `tokio::fs::remove_dir_all` for monoio.
+/// Uses walk_dir to traverse the directory tree without recursion.
+pub async fn remove_dir_all(path: impl AsRef<Path>) -> io::Result<()> {
+    for entry in walk_dir(path).await? {
+        match entry.is_dir {
+            true => fs::remove_dir(&entry.path).await?,
+            false => fs::remove_file(&entry.path).await?,
+        }
+    }
+    Ok(())
+}
diff --git a/core/server/src/io/mod.rs b/core/server/src/io/mod.rs
index ea2bfab5..686717cd 100644
--- a/core/server/src/io/mod.rs
+++ b/core/server/src/io/mod.rs
@@ -1,3 +1,4 @@
 pub mod file;
+pub mod fs_utils;
 pub mod reader;
 pub mod writer;
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 2cd70c4d..53fa2d7b 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -35,6 +35,7 @@ use server::bootstrap::{
 };
 use server::configs::config_provider::{self};
 use server::configs::server::ServerConfig;
+use server::io::fs_utils;
 #[cfg(not(feature = "tokio-console"))]
 use server::log::logger::Logging;
 #[cfg(feature = "tokio-console")]
@@ -98,12 +99,9 @@ fn main() -> Result<(), ServerError> {
                         "Removing system path at: {} because `--fresh` flag 
was set",
                         system_path
                     );
-                    //TODO: Impl dir walk and remove the files
-                    /*
-                    if let Err(e) = 
tokio::fs::remove_dir_all(&system_path).await {
+                    if let Err(e) = 
fs_utils::remove_dir_all(&system_path).await {
                         eprintln!("Failed to remove system path at {}: {}", 
system_path, e);
                     }
-                    */
                 }
             }
 
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index cc5023e2..48bf183a 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -46,6 +46,7 @@ use transmission::connector::{Receiver, ShardConnector, 
StopReceiver, StopSender
 
 use crate::{
     configs::server::ServerConfig,
+    io::fs_utils,
     shard::{
         system::info::SystemInfo,
         task_registry::TaskRegistry,
@@ -341,7 +342,7 @@ impl IggyShard {
                 error!(
                     "Stream with ID: '{stream_id}' was not found in state, but 
exists on disk and will be removed."
                 );
-                if let Err(error) = std::fs::remove_dir_all(&dir_entry.path()) 
{
+                if let Err(error) = 
fs_utils::remove_dir_all(&dir_entry.path()).await {
                     error!("Cannot remove stream directory: {error}");
                 } else {
                     warn!("Stream with ID: '{stream_id}' was removed.");
diff --git a/core/server/src/streaming/partitions/storage.rs 
b/core/server/src/streaming/partitions/storage.rs
index 954025d1..6621ea19 100644
--- a/core/server/src/streaming/partitions/storage.rs
+++ b/core/server/src/streaming/partitions/storage.rs
@@ -19,6 +19,7 @@
 use crate::compat::index_rebuilding::index_rebuilder::IndexRebuilder;
 use crate::configs::cache_indexes::CacheIndexesConfig;
 use crate::io::file::IggyFile;
+use crate::io::fs_utils;
 use crate::state::system::PartitionState;
 use crate::streaming::partitions::COMPONENT;
 use crate::streaming::partitions::partition::{ConsumerOffset, Partition};
@@ -366,7 +367,10 @@ impl PartitionStorage for FilePartitionStorage {
             ));
         }
 
-        if std::fs::remove_dir_all(&partition.partition_path).is_err() {
+        if fs_utils::remove_dir_all(&partition.partition_path)
+            .await
+            .is_err()
+        {
             error!(
                 "Cannot delete partition directory: {} for partition with ID: 
{} for topic with ID: {} for stream with ID: {}.",
                 partition.partition_path,
@@ -474,8 +478,7 @@ impl PartitionStorage for FilePartitionStorage {
             return Ok(());
         }
 
-        //TODO: replace with async once its there
-        if std::fs::remove_dir_all(path).is_err() {
+        if fs_utils::remove_dir_all(path).await.is_err() {
             error!("Cannot delete consumer offsets directory: {}.", path);
             return Err(IggyError::CannotDeleteConsumerOffsetsDirectory(
                 path.to_owned(),
diff --git a/core/server/src/streaming/streams/storage.rs 
b/core/server/src/streaming/streams/storage.rs
index 50212014..d9848f03 100644
--- a/core/server/src/streaming/streams/storage.rs
+++ b/core/server/src/streaming/streams/storage.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+use crate::io::fs_utils;
 use crate::state::system::StreamState;
 use crate::streaming::storage::StreamStorage;
 use crate::streaming::streams::COMPONENT;
@@ -72,8 +73,7 @@ impl StreamStorage for FileStreamStorage {
                 error!(
                     "Topic with ID: '{topic_id}' for stream with ID: 
'{stream_id}' was not found in state, but exists on disk and will be removed."
                 );
-                // TODO: Replace this with the dir walk impl that is mentioed 
in main function.
-                if let Err(error) = std::fs::remove_dir_all(&dir_entry.path()) 
{
+                if let Err(error) = 
fs_utils::remove_dir_all(&dir_entry.path()).await {
                     error!("Cannot remove topic directory: {error}");
                 } else {
                     warn!(
@@ -223,7 +223,7 @@ impl StreamStorage for FileStreamStorage {
 
     async fn delete(&self, stream: &Stream) -> Result<(), IggyError> {
         info!("Deleting stream with ID: {}...", stream.stream_id);
-        if std::fs::remove_dir_all(&stream.path).is_err() {
+        if fs_utils::remove_dir_all(&stream.path).await.is_err() {
             return 
Err(IggyError::CannotDeleteStreamDirectory(stream.stream_id));
         }
         info!("Deleted stream with ID: {}.", stream.stream_id);
diff --git a/core/server/src/streaming/topics/storage.rs 
b/core/server/src/streaming/topics/storage.rs
index 46894d20..8d92b9d1 100644
--- a/core/server/src/streaming/topics/storage.rs
+++ b/core/server/src/streaming/topics/storage.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+use crate::io::fs_utils;
 use crate::state::system::TopicState;
 use crate::streaming::partitions::partition::Partition;
 use crate::streaming::storage::TopicStorage;
@@ -88,8 +89,7 @@ impl TopicStorage for FileTopicStorage {
                 error!(
                     "Partition with ID: '{partition_id}' for stream with ID: 
'{stream_id}' and topic with ID: '{topic_id}' was not found in state, but 
exists on disk and will be removed."
                 );
-                // TODO: Replace this with the dir walk impl that is mentioed 
in main function.
-                if let Err(error) = std::fs::remove_dir_all(&dir_entry.path()) 
{
+                if let Err(error) = 
fs_utils::remove_dir_all(&dir_entry.path()).await {
                     error!("Cannot remove partition directory: {error}");
                 } else {
                     warn!(
@@ -270,7 +270,7 @@ impl TopicStorage for FileTopicStorage {
 
     async fn delete(&self, topic: &Topic) -> Result<(), IggyError> {
         info!("Deleting topic {topic}...");
-        if std::fs::remove_dir_all(&topic.path).is_err() {
+        if fs_utils::remove_dir_all(&topic.path).await.is_err() {
             return Err(IggyError::CannotDeleteTopicDirectory(
                 topic.topic_id,
                 topic.stream_id,

Reply via email to