This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new f3f00ea6 feat(tpc): implement async walk_dir and remove_dir_all (#1942)
f3f00ea6 is described below
commit f3f00ea65863ba016b43ac3bb81200a5a1625b60
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Jun 30 19:22:40 2025 +0200
feat(tpc): implement async walk_dir and remove_dir_all (#1942)
This closes #1938.
---
core/server/src/bootstrap.rs | 12 ++--
core/server/src/io/fs_utils.rs | 93 +++++++++++++++++++++++++
core/server/src/io/mod.rs | 1 +
core/server/src/main.rs | 6 +-
core/server/src/shard/mod.rs | 3 +-
core/server/src/streaming/partitions/storage.rs | 9 ++-
core/server/src/streaming/streams/storage.rs | 6 +-
core/server/src/streaming/topics/storage.rs | 6 +-
8 files changed, 115 insertions(+), 21 deletions(-)
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 6f046fa8..ddd0599c 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -11,6 +11,7 @@ use tracing::info;
use crate::{
IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV,
configs::{config_provider::ConfigProviderKind, server::ServerConfig,
system::SystemConfig},
+ io::fs_utils,
server_error::ServerError,
shard::transmission::{
connector::{ShardConnector, StopSender},
@@ -22,7 +23,7 @@ use crate::{
utils::file::overwrite,
},
};
-use std::{env, fs::remove_dir_all, ops::Range, path::Path, sync::Arc};
+use std::{env, ops::Range, path::Path, sync::Arc};
pub fn create_shard_connections(
shards_set: Range<usize>,
@@ -59,10 +60,8 @@ pub async fn create_directories(config: &SystemConfig) ->
Result<(), IggyError>
return Err(IggyError::CannotCreateStateDirectory(state_path));
}
let state_log = config.get_state_messages_file_path();
- if !Path::new(&state_log).exists() {
- if let Err(_) = overwrite(&state_log).await {
- return Err(IggyError::CannotCreateStateDirectory(state_log));
- }
+ if !Path::new(&state_log).exists() &&
(overwrite(&state_log).await).is_err() {
+ return Err(IggyError::CannotCreateStateDirectory(state_log));
}
let streams_path = config.get_streams_path();
@@ -71,8 +70,7 @@ pub async fn create_directories(config: &SystemConfig) ->
Result<(), IggyError>
}
let runtime_path = config.get_runtime_path();
- // TODO: Change remove_dir_all to async version, once we implement the dir
walk using monoio `remove_dir` method.
- if Path::new(&runtime_path).exists() &&
remove_dir_all(&runtime_path).is_err() {
+ if Path::new(&runtime_path).exists() &&
fs_utils::remove_dir_all(&runtime_path).await.is_err() {
return Err(IggyError::CannotRemoveRuntimeDirectory(runtime_path));
}
diff --git a/core/server/src/io/fs_utils.rs b/core/server/src/io/fs_utils.rs
new file mode 100644
index 00000000..d9be21e7
--- /dev/null
+++ b/core/server/src/io/fs_utils.rs
@@ -0,0 +1,93 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use monoio::fs;
+use std::io;
+use std::path::{Path, PathBuf};
+
+#[derive(Debug, Clone)]
+pub struct DirEntry {
+ pub path: PathBuf,
+ pub is_dir: bool,
+}
+
+impl DirEntry {
+ fn file(path: PathBuf) -> Self {
+ Self {
+ path,
+ is_dir: false,
+ }
+ }
+
+ fn dir(path: PathBuf) -> Self {
+ Self { path, is_dir: true }
+ }
+}
+
+/// Asynchronously walks a directory tree iteratively (without recursion).
+/// Returns all entries with directories listed after their contents to enable
+/// safe deletion (contents before containers).
+/// Symlinks are treated as files and not followed.
+pub async fn walk_dir(root: impl AsRef<Path>) -> io::Result<Vec<DirEntry>> {
+ let root = root.as_ref();
+
+ let metadata = fs::metadata(root).await?;
+ if !metadata.is_dir() {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "path is not a directory",
+ ));
+ }
+
+ let mut files = Vec::new();
+ let mut directories = Vec::new();
+ let mut stack = vec![root.to_path_buf()];
+
+ while let Some(current_dir) = stack.pop() {
+ directories.push(DirEntry::dir(current_dir.clone()));
+
+        for entry in std::fs::read_dir(&current_dir)? {
+ let entry = entry?;
+ let entry_path = entry.path();
+ let metadata = fs::symlink_metadata(&entry_path).await?;
+
+ if metadata.is_dir() {
+ stack.push(entry_path);
+ } else {
+ files.push(DirEntry::file(entry_path));
+ }
+ }
+ }
+
+ directories.reverse();
+ files.extend(directories);
+ Ok(files)
+}
+
+/// Removes a directory and all its contents.
+/// This is the equivalent of `tokio::fs::remove_dir_all` for monoio.
+/// Uses walk_dir to traverse the directory tree without recursion.
+pub async fn remove_dir_all(path: impl AsRef<Path>) -> io::Result<()> {
+ for entry in walk_dir(path).await? {
+ match entry.is_dir {
+ true => fs::remove_dir(&entry.path).await?,
+ false => fs::remove_file(&entry.path).await?,
+ }
+ }
+ Ok(())
+}
diff --git a/core/server/src/io/mod.rs b/core/server/src/io/mod.rs
index ea2bfab5..686717cd 100644
--- a/core/server/src/io/mod.rs
+++ b/core/server/src/io/mod.rs
@@ -1,3 +1,4 @@
pub mod file;
+pub mod fs_utils;
pub mod reader;
pub mod writer;
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 2cd70c4d..53fa2d7b 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -35,6 +35,7 @@ use server::bootstrap::{
};
use server::configs::config_provider::{self};
use server::configs::server::ServerConfig;
+use server::io::fs_utils;
#[cfg(not(feature = "tokio-console"))]
use server::log::logger::Logging;
#[cfg(feature = "tokio-console")]
@@ -98,12 +99,9 @@ fn main() -> Result<(), ServerError> {
"Removing system path at: {} because `--fresh` flag
was set",
system_path
);
- //TODO: Impl dir walk and remove the files
- /*
- if let Err(e) =
tokio::fs::remove_dir_all(&system_path).await {
+ if let Err(e) =
fs_utils::remove_dir_all(&system_path).await {
eprintln!("Failed to remove system path at {}: {}",
system_path, e);
}
- */
}
}
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index cc5023e2..48bf183a 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -46,6 +46,7 @@ use transmission::connector::{Receiver, ShardConnector,
StopReceiver, StopSender
use crate::{
configs::server::ServerConfig,
+ io::fs_utils,
shard::{
system::info::SystemInfo,
task_registry::TaskRegistry,
@@ -341,7 +342,7 @@ impl IggyShard {
error!(
"Stream with ID: '{stream_id}' was not found in state, but
exists on disk and will be removed."
);
- if let Err(error) = std::fs::remove_dir_all(&dir_entry.path())
{
+ if let Err(error) =
fs_utils::remove_dir_all(&dir_entry.path()).await {
error!("Cannot remove stream directory: {error}");
} else {
warn!("Stream with ID: '{stream_id}' was removed.");
diff --git a/core/server/src/streaming/partitions/storage.rs
b/core/server/src/streaming/partitions/storage.rs
index 954025d1..6621ea19 100644
--- a/core/server/src/streaming/partitions/storage.rs
+++ b/core/server/src/streaming/partitions/storage.rs
@@ -19,6 +19,7 @@
use crate::compat::index_rebuilding::index_rebuilder::IndexRebuilder;
use crate::configs::cache_indexes::CacheIndexesConfig;
use crate::io::file::IggyFile;
+use crate::io::fs_utils;
use crate::state::system::PartitionState;
use crate::streaming::partitions::COMPONENT;
use crate::streaming::partitions::partition::{ConsumerOffset, Partition};
@@ -366,7 +367,10 @@ impl PartitionStorage for FilePartitionStorage {
));
}
- if std::fs::remove_dir_all(&partition.partition_path).is_err() {
+ if fs_utils::remove_dir_all(&partition.partition_path)
+ .await
+ .is_err()
+ {
error!(
"Cannot delete partition directory: {} for partition with ID:
{} for topic with ID: {} for stream with ID: {}.",
partition.partition_path,
@@ -474,8 +478,7 @@ impl PartitionStorage for FilePartitionStorage {
return Ok(());
}
- //TODO: replace with async once its there
- if std::fs::remove_dir_all(path).is_err() {
+ if fs_utils::remove_dir_all(path).await.is_err() {
error!("Cannot delete consumer offsets directory: {}.", path);
return Err(IggyError::CannotDeleteConsumerOffsetsDirectory(
path.to_owned(),
diff --git a/core/server/src/streaming/streams/storage.rs
b/core/server/src/streaming/streams/storage.rs
index 50212014..d9848f03 100644
--- a/core/server/src/streaming/streams/storage.rs
+++ b/core/server/src/streaming/streams/storage.rs
@@ -16,6 +16,7 @@
* under the License.
*/
+use crate::io::fs_utils;
use crate::state::system::StreamState;
use crate::streaming::storage::StreamStorage;
use crate::streaming::streams::COMPONENT;
@@ -72,8 +73,7 @@ impl StreamStorage for FileStreamStorage {
error!(
"Topic with ID: '{topic_id}' for stream with ID:
'{stream_id}' was not found in state, but exists on disk and will be removed."
);
- // TODO: Replace this with the dir walk impl that is mentioed
in main function.
- if let Err(error) = std::fs::remove_dir_all(&dir_entry.path())
{
+ if let Err(error) =
fs_utils::remove_dir_all(&dir_entry.path()).await {
error!("Cannot remove topic directory: {error}");
} else {
warn!(
@@ -223,7 +223,7 @@ impl StreamStorage for FileStreamStorage {
async fn delete(&self, stream: &Stream) -> Result<(), IggyError> {
info!("Deleting stream with ID: {}...", stream.stream_id);
- if std::fs::remove_dir_all(&stream.path).is_err() {
+ if fs_utils::remove_dir_all(&stream.path).await.is_err() {
return
Err(IggyError::CannotDeleteStreamDirectory(stream.stream_id));
}
info!("Deleted stream with ID: {}.", stream.stream_id);
diff --git a/core/server/src/streaming/topics/storage.rs
b/core/server/src/streaming/topics/storage.rs
index 46894d20..8d92b9d1 100644
--- a/core/server/src/streaming/topics/storage.rs
+++ b/core/server/src/streaming/topics/storage.rs
@@ -16,6 +16,7 @@
* under the License.
*/
+use crate::io::fs_utils;
use crate::state::system::TopicState;
use crate::streaming::partitions::partition::Partition;
use crate::streaming::storage::TopicStorage;
@@ -88,8 +89,7 @@ impl TopicStorage for FileTopicStorage {
error!(
"Partition with ID: '{partition_id}' for stream with ID:
'{stream_id}' and topic with ID: '{topic_id}' was not found in state, but
exists on disk and will be removed."
);
- // TODO: Replace this with the dir walk impl that is mentioed
in main function.
- if let Err(error) = std::fs::remove_dir_all(&dir_entry.path())
{
+ if let Err(error) =
fs_utils::remove_dir_all(&dir_entry.path()).await {
error!("Cannot remove partition directory: {error}");
} else {
warn!(
@@ -270,7 +270,7 @@ impl TopicStorage for FileTopicStorage {
async fn delete(&self, topic: &Topic) -> Result<(), IggyError> {
info!("Deleting topic {topic}...");
- if std::fs::remove_dir_all(&topic.path).is_err() {
+ if fs_utils::remove_dir_all(&topic.path).await.is_err() {
return Err(IggyError::CannotDeleteTopicDirectory(
topic.topic_id,
topic.stream_id,