This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc by this push:
     new 678fec43 feat(io_uring): fix get_snapshot command (#2196)
678fec43 is described below

commit 678fec437af1c8ee06d3e738a236576dde5a83dc
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Wed Sep 24 18:54:35 2025 +0200

    feat(io_uring): fix get_snapshot command (#2196)
---
 Cargo.toml                                        |  1 -
 core/server/src/shard/namespace.rs                |  2 --
 core/server/src/shard/system/snapshot/mod.rs      | 34 ++++++-----------------
 core/server/src/shard/system/snapshot/procdump.rs | 18 ++++++------
 4 files changed, 17 insertions(+), 38 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index b860c7a0..c37e8557 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -53,7 +53,6 @@ async-broadcast = "0.7.2"
 async-dropper = { version = "0.3.1", features = ["tokio", "simple"] }
 async-trait = "0.1.89"
 async_zip = { version = "0.0.18", features = [
-    "tokio",
     "lzma",
     "bzip2",
     "xz",
diff --git a/core/server/src/shard/namespace.rs b/core/server/src/shard/namespace.rs
index 5c66e8db..235003ff 100644
--- a/core/server/src/shard/namespace.rs
+++ b/core/server/src/shard/namespace.rs
@@ -16,9 +16,7 @@
  * under the License.
  */
 
-use hash32::{Hasher, Murmur3Hasher};
 use iggy_common::Identifier;
-use std::hash::Hasher as _;
 
 use crate::slab::partitions;
 
diff --git a/core/server/src/shard/system/snapshot/mod.rs b/core/server/src/shard/system/snapshot/mod.rs
index f6bf7857..658c8a4b 100644
--- a/core/server/src/shard/system/snapshot/mod.rs
+++ b/core/server/src/shard/system/snapshot/mod.rs
@@ -21,18 +21,17 @@ mod procdump;
 use crate::configs::system::SystemConfig;
 use crate::shard::IggyShard;
 use crate::streaming::session::Session;
-use async_zip::tokio::write::ZipFileWriter;
+use async_zip::base::write::ZipFileWriter;
 use async_zip::{Compression, ZipEntryBuilder};
-use compio::fs::{File, OpenOptions};
+use compio::fs::{OpenOptions};
 use compio::io::{AsyncReadAtExt, AsyncWriteAtExt};
 use iggy_common::{IggyDuration, IggyError, Snapshot, SnapshotCompression, SystemSnapshotType};
-use std::io::Cursor;
 use std::path::PathBuf;
+// TODO: compio has an `process` module, consider using that instead, but read the docs carefully // https://compio.rs/docs/compio/process
+use std::process::Command;
 use std::sync::Arc;
 use std::time::Instant;
 use tempfile::NamedTempFile;
-use tokio::process::Command;
-use tokio_util::compat::TokioAsyncWriteCompatExt;
 use tracing::{error, info};
 
 impl IggyShard {
@@ -54,16 +53,7 @@ impl IggyShard {
             snapshot_types
         };
 
-        // TODO: Replace this with
-        // https://github.com/bearcove/rc-zip
-        // and impl the monoio async writer, based on this example:
-        // https://youtu.be/RYHYiXMJdZI?si=d2roKeHn5lJrw2ri&t=1140
-        // and rc-zip-tokio crate.
-
-        /*
-        let cursor = Cursor::new(Vec::new());
-        let mut zip_writer = ZipFileWriter::new(cursor.compat_write());
-
+        let mut zip_writer = ZipFileWriter::new(Vec::new());
         let compression = match compression {
             SnapshotCompression::Stored => Compression::Stored,
             SnapshotCompression::Deflated => Compression::Deflate,
@@ -118,33 +108,27 @@ impl IggyShard {
                 }
             }
         }
-        */
 
-        /*
         info!(
             "Snapshot commands {:?} finished in {}",
             snapshot_types,
             IggyDuration::new(now.elapsed())
         );
 
-        let writer = zip_writer
+        let zip_data = zip_writer
             .close()
             .await
             .map_err(|_| IggyError::SnapshotFileCompletionFailed)?;
 
-        let cursor = writer.into_inner();
-        let zip_data = cursor.into_inner();
-
         info!("Final zip size: {} bytes", zip_data.len());
-        */
-        Ok(Snapshot::new(vec![]))
+        Ok(Snapshot::new(zip_data))
     }
 }
 
 async fn write_command_output_to_temp_file(
     command: &mut Command,
 ) -> Result<NamedTempFile, std::io::Error> {
-    let output = command.output().await?;
+    let output = command.output()?;
     let temp_file = NamedTempFile::new()?;
     let mut file = OpenOptions::new()
         .write(true)
@@ -169,7 +153,7 @@ async fn get_process_info() -> Result<NamedTempFile, std::io::Error> {
         .await?;
 
     let mut position = 0;
-    let ps_output = Command::new("ps").arg("aux").output().await?;
+    let ps_output = Command::new("ps").arg("aux").output()?;
     let (result, written) = file
         .write_all_at(b"=== Process List (ps aux) ===\n", 0)
         .await
diff --git a/core/server/src/shard/system/snapshot/procdump.rs b/core/server/src/shard/system/snapshot/procdump.rs
index 2cb2add1..71c1bb24 100644
--- a/core/server/src/shard/system/snapshot/procdump.rs
+++ b/core/server/src/shard/system/snapshot/procdump.rs
@@ -16,8 +16,6 @@
  * under the License.
  */
 
-use tokio::fs::{self};
-
 /// Parse the contents of a /proc/[pid]/task/[tid]/stat file into a human-readable format
 fn parse_stat(contents: &str) -> String {
     let fields: Vec<&str> = contents.split_whitespace().collect();
@@ -137,7 +135,7 @@ pub async fn get_proc_info() -> Result<String, std::io::Error> {
     let mut result = String::new();
 
     async fn dump_file(result: &mut String, path: &str) -> Result<(), std::io::Error> {
-        match fs::read_to_string(path).await {
+        match std::fs::read_to_string(path) {
             Ok(contents) => {
                 result.push_str(&format!("=== {path} ===\n"));
 
@@ -150,13 +148,13 @@ pub async fn get_proc_info() -> Result<String, std::io::Error> {
                 result.push_str("\n\n");
             }
             Err(e) => {
-                if let Ok(metadata) = fs::metadata(path).await {
+                if let Ok(metadata) = std::fs::metadata(path) {
                     if metadata.is_dir() && path.ends_with("/fd") {
                         result.push_str(&format!("=== {path} (directory) ===\n"));
-                        if let Ok(mut rd) = fs::read_dir(path).await {
-                            while let Ok(Some(entry)) = rd.next_entry().await {
+                        if let Ok(mut rd) = std::fs::read_dir(path) {
+                            while let Some(Ok(entry)) = rd.next() {
                                 let fd_path = entry.path();
-                                match fs::read_link(&fd_path).await {
+                                match std::fs::read_link(&fd_path) {
                                     Ok(link) => {
                                         result.push_str(&format!(
                                             "{} -> {}\n",
@@ -189,9 +187,9 @@ pub async fn get_proc_info() -> Result<String, std::io::Error> {
         dump_file(&mut result, path).await?;
     }
 
-    let mut proc_dir = fs::read_dir("/proc").await?;
-    while let Ok(Some(entry)) = proc_dir.next_entry().await {
-        let file_type = entry.file_type().await?;
+    let mut proc_dir = std::fs::read_dir("/proc")?;
+    while let Some(Ok(entry)) = proc_dir.next() {
+        let file_type = entry.file_type()?;
         if file_type.is_dir()
             && let Ok(pid) = entry.file_name().to_string_lossy().parse::<u32>()
         {

Reply via email to