This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch rebase_master in repository https://gitbox.apache.org/repos/asf/iggy.git
commit c7b623508a08000c38ca943a079cb06589a4924f
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Jun 30 19:22:40 2025 +0200

    feat(tpc): implement async walk_dir and remove_dir_all (#1942)

    This closes #1938.
---
 core/server/src/bootstrap.rs                    | 12 ++--
 core/server/src/io/fs_utils.rs                  | 93 +++++++++++++++++++++++++
 core/server/src/io/mod.rs                       |  1 +
 core/server/src/main.rs                         |  6 +-
 core/server/src/shard/mod.rs                    |  3 +-
 core/server/src/streaming/partitions/storage.rs |  9 ++-
 core/server/src/streaming/streams/storage.rs    |  6 +-
 core/server/src/streaming/topics/storage.rs     |  6 +-
 8 files changed, 115 insertions(+), 21 deletions(-)

diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 6f046fa8..ddd0599c 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -11,6 +11,7 @@ use tracing::info;
 use crate::{
     IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV,
     configs::{config_provider::ConfigProviderKind, server::ServerConfig, system::SystemConfig},
+    io::fs_utils,
     server_error::ServerError,
     shard::transmission::{
         connector::{ShardConnector, StopSender},
@@ -22,7 +23,7 @@ use crate::{
         utils::file::overwrite,
     },
 };
-use std::{env, fs::remove_dir_all, ops::Range, path::Path, sync::Arc};
+use std::{env, ops::Range, path::Path, sync::Arc};
 
 pub fn create_shard_connections(
     shards_set: Range<usize>,
@@ -59,10 +60,8 @@ pub async fn create_directories(config: &SystemConfig) -> Result<(), IggyError>
         return Err(IggyError::CannotCreateStateDirectory(state_path));
     }
     let state_log = config.get_state_messages_file_path();
-    if !Path::new(&state_log).exists() {
-        if let Err(_) = overwrite(&state_log).await {
-            return Err(IggyError::CannotCreateStateDirectory(state_log));
-        }
+    if !Path::new(&state_log).exists() && (overwrite(&state_log).await).is_err() {
+        return Err(IggyError::CannotCreateStateDirectory(state_log));
     }
 
     let streams_path = config.get_streams_path();
@@ -71,8 +70,7 @@ pub async fn create_directories(config: &SystemConfig) -> Result<(), IggyError>
     }
 
     let runtime_path = config.get_runtime_path();
-    // TODO: Change remove_dir_all to async version, once we implement the dir walk using monoio `remove_dir` method.
-    if Path::new(&runtime_path).exists() && remove_dir_all(&runtime_path).is_err() {
+    if Path::new(&runtime_path).exists() && fs_utils::remove_dir_all(&runtime_path).await.is_err() {
         return Err(IggyError::CannotRemoveRuntimeDirectory(runtime_path));
     }
 
diff --git a/core/server/src/io/fs_utils.rs b/core/server/src/io/fs_utils.rs
new file mode 100644
index 00000000..d9be21e7
--- /dev/null
+++ b/core/server/src/io/fs_utils.rs
@@ -0,0 +1,93 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use monoio::fs;
+use std::io;
+use std::path::{Path, PathBuf};
+
+#[derive(Debug, Clone)]
+pub struct DirEntry {
+    pub path: PathBuf,
+    pub is_dir: bool,
+}
+
+impl DirEntry {
+    fn file(path: PathBuf) -> Self {
+        Self {
+            path,
+            is_dir: false,
+        }
+    }
+
+    fn dir(path: PathBuf) -> Self {
+        Self { path, is_dir: true }
+    }
+}
+
+/// Asynchronously walks a directory tree iteratively (without recursion).
+/// Returns all entries with directories listed after their contents to enable
+/// safe deletion (contents before containers).
+/// Symlinks are treated as files and not followed.
+pub async fn walk_dir(root: impl AsRef<Path>) -> io::Result<Vec<DirEntry>> {
+    let root = root.as_ref();
+
+    let metadata = fs::metadata(root).await?;
+    if !metadata.is_dir() {
+        return Err(io::Error::new(
+            io::ErrorKind::InvalidInput,
+            "path is not a directory",
+        ));
+    }
+
+    let mut files = Vec::new();
+    let mut directories = Vec::new();
+    let mut stack = vec![root.to_path_buf()];
+
+    while let Some(current_dir) = stack.pop() {
+        directories.push(DirEntry::dir(current_dir.clone()));
+
+        for entry in std::fs::read_dir(&current_dir)? {
+            let entry = entry?;
+            let entry_path = entry.path();
+            let metadata = fs::symlink_metadata(&entry_path).await?;
+
+            if metadata.is_dir() {
+                stack.push(entry_path);
+            } else {
+                files.push(DirEntry::file(entry_path));
+            }
+        }
+    }
+
+    directories.reverse();
+    files.extend(directories);
+    Ok(files)
+}
+
+/// Removes a directory and all its contents.
+/// This is the equivalent of `tokio::fs::remove_dir_all` for monoio.
+/// Uses walk_dir to traverse the directory tree without recursion.
+pub async fn remove_dir_all(path: impl AsRef<Path>) -> io::Result<()> {
+    for entry in walk_dir(path).await? {
+        match entry.is_dir {
+            true => fs::remove_dir(&entry.path).await?,
+            false => fs::remove_file(&entry.path).await?,
+        }
+    }
+    Ok(())
+}
diff --git a/core/server/src/io/mod.rs b/core/server/src/io/mod.rs
index ea2bfab5..686717cd 100644
--- a/core/server/src/io/mod.rs
+++ b/core/server/src/io/mod.rs
@@ -1,3 +1,4 @@
 pub mod file;
+pub mod fs_utils;
 pub mod reader;
 pub mod writer;
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 2cd70c4d..53fa2d7b 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -35,6 +35,7 @@ use server::bootstrap::{
 };
 use server::configs::config_provider::{self};
 use server::configs::server::ServerConfig;
+use server::io::fs_utils;
 #[cfg(not(feature = "tokio-console"))]
 use server::log::logger::Logging;
 #[cfg(feature = "tokio-console")]
@@ -98,12 +99,9 @@ fn main() -> Result<(), ServerError> {
                     "Removing system path at: {} because `--fresh` flag was set",
                     system_path
                 );
-                //TODO: Impl dir walk and remove the files
-                /*
-                if let Err(e) = tokio::fs::remove_dir_all(&system_path).await {
+                if let Err(e) = fs_utils::remove_dir_all(&system_path).await {
                     eprintln!("Failed to remove system path at {}: {}", system_path, e);
                 }
-                */
             }
         }
 
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index cc5023e2..48bf183a 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -46,6 +46,7 @@ use transmission::connector::{Receiver, ShardConnector, StopReceiver, StopSender
 
 use crate::{
     configs::server::ServerConfig,
+    io::fs_utils,
     shard::{
         system::info::SystemInfo,
         task_registry::TaskRegistry,
@@ -341,7 +342,7 @@ impl IggyShard {
                     error!(
                         "Stream with ID: '{stream_id}' was not found in state, but exists on disk and will be removed."
                     );
-                    if let Err(error) = std::fs::remove_dir_all(&dir_entry.path()) {
+                    if let Err(error) = fs_utils::remove_dir_all(&dir_entry.path()).await {
                         error!("Cannot remove stream directory: {error}");
                     } else {
                         warn!("Stream with ID: '{stream_id}' was removed.");
diff --git a/core/server/src/streaming/partitions/storage.rs b/core/server/src/streaming/partitions/storage.rs
index 954025d1..6621ea19 100644
--- a/core/server/src/streaming/partitions/storage.rs
+++ b/core/server/src/streaming/partitions/storage.rs
@@ -19,6 +19,7 @@
 use crate::compat::index_rebuilding::index_rebuilder::IndexRebuilder;
 use crate::configs::cache_indexes::CacheIndexesConfig;
 use crate::io::file::IggyFile;
+use crate::io::fs_utils;
 use crate::state::system::PartitionState;
 use crate::streaming::partitions::COMPONENT;
 use crate::streaming::partitions::partition::{ConsumerOffset, Partition};
@@ -366,7 +367,10 @@ impl PartitionStorage for FilePartitionStorage {
             ));
         }
 
-        if std::fs::remove_dir_all(&partition.partition_path).is_err() {
+        if fs_utils::remove_dir_all(&partition.partition_path)
+            .await
+            .is_err()
+        {
             error!(
                 "Cannot delete partition directory: {} for partition with ID: {} for topic with ID: {} for stream with ID: {}.",
                 partition.partition_path,
@@ -474,8 +478,7 @@ impl PartitionStorage for FilePartitionStorage {
             return Ok(());
         }
 
-        //TODO: replace with async once its there
-        if std::fs::remove_dir_all(path).is_err() {
+        if fs_utils::remove_dir_all(path).await.is_err() {
             error!("Cannot delete consumer offsets directory: {}.", path);
             return Err(IggyError::CannotDeleteConsumerOffsetsDirectory(
                 path.to_owned(),
diff --git a/core/server/src/streaming/streams/storage.rs b/core/server/src/streaming/streams/storage.rs
index 50212014..d9848f03 100644
--- a/core/server/src/streaming/streams/storage.rs
+++ b/core/server/src/streaming/streams/storage.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+use crate::io::fs_utils;
 use crate::state::system::StreamState;
 use crate::streaming::storage::StreamStorage;
 use crate::streaming::streams::COMPONENT;
@@ -72,8 +73,7 @@ impl StreamStorage for FileStreamStorage {
                     error!(
                         "Topic with ID: '{topic_id}' for stream with ID: '{stream_id}' was not found in state, but exists on disk and will be removed."
                     );
-                    // TODO: Replace this with the dir walk impl that is mentioed in main function.
-                    if let Err(error) = std::fs::remove_dir_all(&dir_entry.path()) {
+                    if let Err(error) = fs_utils::remove_dir_all(&dir_entry.path()).await {
                         error!("Cannot remove topic directory: {error}");
                     } else {
                         warn!(
@@ -223,7 +223,7 @@ impl StreamStorage for FileStreamStorage {
 
     async fn delete(&self, stream: &Stream) -> Result<(), IggyError> {
         info!("Deleting stream with ID: {}...", stream.stream_id);
-        if std::fs::remove_dir_all(&stream.path).is_err() {
+        if fs_utils::remove_dir_all(&stream.path).await.is_err() {
             return Err(IggyError::CannotDeleteStreamDirectory(stream.stream_id));
         }
         info!("Deleted stream with ID: {}.", stream.stream_id);
diff --git a/core/server/src/streaming/topics/storage.rs b/core/server/src/streaming/topics/storage.rs
index 46894d20..8d92b9d1 100644
--- a/core/server/src/streaming/topics/storage.rs
+++ b/core/server/src/streaming/topics/storage.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+use crate::io::fs_utils;
 use crate::state::system::TopicState;
 use crate::streaming::partitions::partition::Partition;
 use crate::streaming::storage::TopicStorage;
@@ -88,8 +89,7 @@ impl TopicStorage for FileTopicStorage {
                     error!(
                         "Partition with ID: '{partition_id}' for stream with ID: '{stream_id}' and topic with ID: '{topic_id}' was not found in state, but exists on disk and will be removed."
                     );
-                    // TODO: Replace this with the dir walk impl that is mentioed in main function.
-                    if let Err(error) = std::fs::remove_dir_all(&dir_entry.path()) {
+                    if let Err(error) = fs_utils::remove_dir_all(&dir_entry.path()).await {
                         error!("Cannot remove partition directory: {error}");
                     } else {
                         warn!(
@@ -270,7 +270,7 @@ impl TopicStorage for FileTopicStorage {
 
     async fn delete(&self, topic: &Topic) -> Result<(), IggyError> {
         info!("Deleting topic {topic}...");
-        if std::fs::remove_dir_all(&topic.path).is_err() {
+        if fs_utils::remove_dir_all(&topic.path).await.is_err() {
             return Err(IggyError::CannotDeleteTopicDirectory(
                 topic.topic_id,
                 topic.stream_id,
