This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new 678fec43 feat(io_uring): fix get_snapshot command (#2196)
678fec43 is described below
commit 678fec437af1c8ee06d3e738a236576dde5a83dc
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Wed Sep 24 18:54:35 2025 +0200
feat(io_uring): fix get_snapshot command (#2196)
---
Cargo.toml | 1 -
core/server/src/shard/namespace.rs | 2 --
core/server/src/shard/system/snapshot/mod.rs | 34 ++++++-----------------
core/server/src/shard/system/snapshot/procdump.rs | 18 ++++++------
4 files changed, 17 insertions(+), 38 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index b860c7a0..c37e8557 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -53,7 +53,6 @@ async-broadcast = "0.7.2"
async-dropper = { version = "0.3.1", features = ["tokio", "simple"] }
async-trait = "0.1.89"
async_zip = { version = "0.0.18", features = [
- "tokio",
"lzma",
"bzip2",
"xz",
diff --git a/core/server/src/shard/namespace.rs b/core/server/src/shard/namespace.rs
index 5c66e8db..235003ff 100644
--- a/core/server/src/shard/namespace.rs
+++ b/core/server/src/shard/namespace.rs
@@ -16,9 +16,7 @@
* under the License.
*/
-use hash32::{Hasher, Murmur3Hasher};
use iggy_common::Identifier;
-use std::hash::Hasher as _;
use crate::slab::partitions;
diff --git a/core/server/src/shard/system/snapshot/mod.rs b/core/server/src/shard/system/snapshot/mod.rs
index f6bf7857..658c8a4b 100644
--- a/core/server/src/shard/system/snapshot/mod.rs
+++ b/core/server/src/shard/system/snapshot/mod.rs
@@ -21,18 +21,17 @@ mod procdump;
use crate::configs::system::SystemConfig;
use crate::shard::IggyShard;
use crate::streaming::session::Session;
-use async_zip::tokio::write::ZipFileWriter;
+use async_zip::base::write::ZipFileWriter;
use async_zip::{Compression, ZipEntryBuilder};
-use compio::fs::{File, OpenOptions};
+use compio::fs::{OpenOptions};
use compio::io::{AsyncReadAtExt, AsyncWriteAtExt};
use iggy_common::{IggyDuration, IggyError, Snapshot, SnapshotCompression, SystemSnapshotType};
-use std::io::Cursor;
use std::path::PathBuf;
+// TODO: compio has an `process` module, consider using that instead, but read the docs carefully // https://compio.rs/docs/compio/process
+use std::process::Command;
use std::sync::Arc;
use std::time::Instant;
use tempfile::NamedTempFile;
-use tokio::process::Command;
-use tokio_util::compat::TokioAsyncWriteCompatExt;
use tracing::{error, info};
impl IggyShard {
@@ -54,16 +53,7 @@ impl IggyShard {
snapshot_types
};
- // TODO: Replace this with
- // https://github.com/bearcove/rc-zip
- // and impl the monoio async writer, based on this example:
- // https://youtu.be/RYHYiXMJdZI?si=d2roKeHn5lJrw2ri&t=1140
- // and rc-zip-tokio crate.
-
- /*
- let cursor = Cursor::new(Vec::new());
- let mut zip_writer = ZipFileWriter::new(cursor.compat_write());
-
+ let mut zip_writer = ZipFileWriter::new(Vec::new());
let compression = match compression {
SnapshotCompression::Stored => Compression::Stored,
SnapshotCompression::Deflated => Compression::Deflate,
@@ -118,33 +108,27 @@ impl IggyShard {
}
}
}
- */
- /*
info!(
"Snapshot commands {:?} finished in {}",
snapshot_types,
IggyDuration::new(now.elapsed())
);
- let writer = zip_writer
+ let zip_data = zip_writer
.close()
.await
.map_err(|_| IggyError::SnapshotFileCompletionFailed)?;
- let cursor = writer.into_inner();
- let zip_data = cursor.into_inner();
-
info!("Final zip size: {} bytes", zip_data.len());
- */
- Ok(Snapshot::new(vec![]))
+ Ok(Snapshot::new(zip_data))
}
}
async fn write_command_output_to_temp_file(
command: &mut Command,
) -> Result<NamedTempFile, std::io::Error> {
- let output = command.output().await?;
+ let output = command.output()?;
let temp_file = NamedTempFile::new()?;
let mut file = OpenOptions::new()
.write(true)
@@ -169,7 +153,7 @@ async fn get_process_info() -> Result<NamedTempFile, std::io::Error> {
.await?;
let mut position = 0;
- let ps_output = Command::new("ps").arg("aux").output().await?;
+ let ps_output = Command::new("ps").arg("aux").output()?;
let (result, written) = file
.write_all_at(b"=== Process List (ps aux) ===\n", 0)
.await
diff --git a/core/server/src/shard/system/snapshot/procdump.rs b/core/server/src/shard/system/snapshot/procdump.rs
index 2cb2add1..71c1bb24 100644
--- a/core/server/src/shard/system/snapshot/procdump.rs
+++ b/core/server/src/shard/system/snapshot/procdump.rs
@@ -16,8 +16,6 @@
* under the License.
*/
-use tokio::fs::{self};
-
/// Parse the contents of a /proc/[pid]/task/[tid]/stat file into a human-readable format
fn parse_stat(contents: &str) -> String {
let fields: Vec<&str> = contents.split_whitespace().collect();
@@ -137,7 +135,7 @@ pub async fn get_proc_info() -> Result<String, std::io::Error> {
let mut result = String::new();
async fn dump_file(result: &mut String, path: &str) -> Result<(), std::io::Error> {
- match fs::read_to_string(path).await {
+ match std::fs::read_to_string(path) {
Ok(contents) => {
result.push_str(&format!("=== {path} ===\n"));
@@ -150,13 +148,13 @@ pub async fn get_proc_info() -> Result<String, std::io::Error> {
result.push_str("\n\n");
}
Err(e) => {
- if let Ok(metadata) = fs::metadata(path).await {
+ if let Ok(metadata) = std::fs::metadata(path) {
if metadata.is_dir() && path.ends_with("/fd") {
result.push_str(&format!("=== {path} (directory) ===\n"));
- if let Ok(mut rd) = fs::read_dir(path).await {
- while let Ok(Some(entry)) = rd.next_entry().await {
+ if let Ok(mut rd) = std::fs::read_dir(path) {
+ while let Some(Ok(entry)) = rd.next() {
let fd_path = entry.path();
- match fs::read_link(&fd_path).await {
+ match std::fs::read_link(&fd_path) {
Ok(link) => {
result.push_str(&format!(
"{} -> {}\n",
@@ -189,9 +187,9 @@ pub async fn get_proc_info() -> Result<String, std::io::Error> {
dump_file(&mut result, path).await?;
}
- let mut proc_dir = fs::read_dir("/proc").await?;
- while let Ok(Some(entry)) = proc_dir.next_entry().await {
- let file_type = entry.file_type().await?;
+ let mut proc_dir = std::fs::read_dir("/proc")?;
+ while let Some(Ok(entry)) = proc_dir.next() {
+ let file_type = entry.file_type()?;
if file_type.is_dir()
&& let Ok(pid) = entry.file_name().to_string_lossy().parse::<u32>()
{