This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch fix_snapshot in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 19f3675355424336f6c58c25cd5089e6bb22b4d9 Author: numminex <[email protected]> AuthorDate: Wed Sep 24 11:22:38 2025 +0200 feat(io_uring): fix get_snapshot command --- Cargo.toml | 1 - core/server/src/shard/namespace.rs | 2 -- core/server/src/shard/system/snapshot/mod.rs | 34 ++++++----------------- core/server/src/shard/system/snapshot/procdump.rs | 18 ++++++------ 4 files changed, 17 insertions(+), 38 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b860c7a0..c37e8557 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,7 +53,6 @@ async-broadcast = "0.7.2" async-dropper = { version = "0.3.1", features = ["tokio", "simple"] } async-trait = "0.1.89" async_zip = { version = "0.0.18", features = [ - "tokio", "lzma", "bzip2", "xz", diff --git a/core/server/src/shard/namespace.rs b/core/server/src/shard/namespace.rs index 5c66e8db..235003ff 100644 --- a/core/server/src/shard/namespace.rs +++ b/core/server/src/shard/namespace.rs @@ -16,9 +16,7 @@ * under the License. */ -use hash32::{Hasher, Murmur3Hasher}; use iggy_common::Identifier; -use std::hash::Hasher as _; use crate::slab::partitions; diff --git a/core/server/src/shard/system/snapshot/mod.rs b/core/server/src/shard/system/snapshot/mod.rs index f6bf7857..658c8a4b 100644 --- a/core/server/src/shard/system/snapshot/mod.rs +++ b/core/server/src/shard/system/snapshot/mod.rs @@ -21,18 +21,17 @@ mod procdump; use crate::configs::system::SystemConfig; use crate::shard::IggyShard; use crate::streaming::session::Session; -use async_zip::tokio::write::ZipFileWriter; +use async_zip::base::write::ZipFileWriter; use async_zip::{Compression, ZipEntryBuilder}; -use compio::fs::{File, OpenOptions}; +use compio::fs::{OpenOptions}; use compio::io::{AsyncReadAtExt, AsyncWriteAtExt}; use iggy_common::{IggyDuration, IggyError, Snapshot, SnapshotCompression, SystemSnapshotType}; -use std::io::Cursor; use std::path::PathBuf; +// TODO: compio has an `process` module, consider using that instead, but read the docs carefully // https://compio.rs/docs/compio/process +use std::process::Command; use std::sync::Arc; use std::time::Instant; use tempfile::NamedTempFile; -use tokio::process::Command; -use tokio_util::compat::TokioAsyncWriteCompatExt; use tracing::{error, info}; impl IggyShard { @@ -54,16 +53,7 @@ impl IggyShard { snapshot_types }; - // TODO: Replace this with - // https://github.com/bearcove/rc-zip - // and impl the monoio async writer, based on this example: - // https://youtu.be/RYHYiXMJdZI?si=d2roKeHn5lJrw2ri&t=1140 - // and rc-zip-tokio crate. - - /* - let cursor = Cursor::new(Vec::new()); - let mut zip_writer = ZipFileWriter::new(cursor.compat_write()); - + let mut zip_writer = ZipFileWriter::new(Vec::new()); let compression = match compression { SnapshotCompression::Stored => Compression::Stored, SnapshotCompression::Deflated => Compression::Deflate, @@ -118,33 +108,27 @@ impl IggyShard { } } } - */ - /* info!( "Snapshot commands {:?} finished in {}", snapshot_types, IggyDuration::new(now.elapsed()) ); - let writer = zip_writer + let zip_data = zip_writer .close() .await .map_err(|_| IggyError::SnapshotFileCompletionFailed)?; - let cursor = writer.into_inner(); - let zip_data = cursor.into_inner(); - info!("Final zip size: {} bytes", zip_data.len()); - */ - Ok(Snapshot::new(vec![])) + Ok(Snapshot::new(zip_data)) } } async fn write_command_output_to_temp_file( command: &mut Command, ) -> Result<NamedTempFile, std::io::Error> { - let output = command.output().await?; + let output = command.output()?; let temp_file = NamedTempFile::new()?; let mut file = OpenOptions::new() .write(true) @@ -169,7 +153,7 @@ async fn get_process_info() -> Result<NamedTempFile, std::io::Error> { .await?; let mut position = 0; - let ps_output = Command::new("ps").arg("aux").output().await?; + let ps_output = Command::new("ps").arg("aux").output()?; let (result, written) = file .write_all_at(b"=== Process List (ps aux) ===\n", 0) .await diff --git a/core/server/src/shard/system/snapshot/procdump.rs b/core/server/src/shard/system/snapshot/procdump.rs index 2cb2add1..71c1bb24 100644 --- a/core/server/src/shard/system/snapshot/procdump.rs +++ b/core/server/src/shard/system/snapshot/procdump.rs @@ -16,8 +16,6 @@ * under the License. */ -use tokio::fs::{self}; - /// Parse the contents of a /proc/[pid]/task/[tid]/stat file into a human-readable format fn parse_stat(contents: &str) -> String { let fields: Vec<&str> = contents.split_whitespace().collect(); @@ -137,7 +135,7 @@ pub async fn get_proc_info() -> Result<String, std::io::Error> { let mut result = String::new(); async fn dump_file(result: &mut String, path: &str) -> Result<(), std::io::Error> { - match fs::read_to_string(path).await { + match std::fs::read_to_string(path) { Ok(contents) => { result.push_str(&format!("=== {path} ===\n")); @@ -150,13 +148,13 @@ pub async fn get_proc_info() -> Result<String, std::io::Error> { result.push_str("\n\n"); } Err(e) => { - if let Ok(metadata) = fs::metadata(path).await { + if let Ok(metadata) = std::fs::metadata(path) { if metadata.is_dir() && path.ends_with("/fd") { result.push_str(&format!("=== {path} (directory) ===\n")); - if let Ok(mut rd) = fs::read_dir(path).await { - while let Ok(Some(entry)) = rd.next_entry().await { + if let Ok(mut rd) = std::fs::read_dir(path) { + while let Some(Ok(entry)) = rd.next() { let fd_path = entry.path(); - match fs::read_link(&fd_path).await { + match std::fs::read_link(&fd_path) { Ok(link) => { result.push_str(&format!( "{} -> {}\n", @@ -189,9 +187,9 @@ pub async fn get_proc_info() -> Result<String, std::io::Error> { dump_file(&mut result, path).await?; } - let mut proc_dir = fs::read_dir("/proc").await?; - while let Ok(Some(entry)) = proc_dir.next_entry().await { - let file_type = entry.file_type().await?; + let mut proc_dir = std::fs::read_dir("/proc")?; + while let Some(Ok(entry)) = proc_dir.next() { + let file_type = entry.file_type()?; if file_type.is_dir() && let Ok(pid) = entry.file_name().to_string_lossy().parse::<u32>() {
