This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch io_uring_tpc_compio in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 61c0d636ff444ea07682999fe215c1f811b65421 Author: numinex <[email protected]> AuthorDate: Fri Jul 4 07:51:06 2025 +0200 cargo fmt --- .../src/compat/index_rebuilding/index_rebuilder.rs | 28 ++++++++++++------- core/server/src/main.rs | 32 +++++++++++----------- core/server/src/quic/quic_sender.rs | 2 +- core/server/src/shard/system/snapshot/mod.rs | 8 ++++-- core/server/src/shard/system/storage.rs | 6 ++-- core/server/src/shard/system/topics.rs | 12 ++++---- core/server/src/state/file.rs | 2 +- core/server/src/streaming/partitions/storage.rs | 2 +- .../src/streaming/segments/indexes/index_reader.rs | 2 +- .../streaming/segments/messages/messages_writer.rs | 4 +-- core/server/src/streaming/segments/messages/mod.rs | 1 - core/server/src/tcp/sender.rs | 6 ++-- 12 files changed, 58 insertions(+), 47 deletions(-) diff --git a/core/server/src/compat/index_rebuilding/index_rebuilder.rs b/core/server/src/compat/index_rebuilding/index_rebuilder.rs index 06204bf8..dbc93ebe 100644 --- a/core/server/src/compat/index_rebuilding/index_rebuilder.rs +++ b/core/server/src/compat/index_rebuilding/index_rebuilder.rs @@ -16,11 +16,14 @@ * under the License. */ -use crate::streaming::utils::file; use crate::server_error::CompatError; +use crate::streaming::utils::file; use async_zip::tokio::write; +use compio::{ + fs::File, + io::{AsyncBufRead, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}, +}; use iggy_common::{IGGY_MESSAGE_HEADER_SIZE, IggyMessageHeader}; -use compio::{fs::File, io::{AsyncBufRead, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}}; use std::io::{Seek, SeekFrom}; pub struct IndexRebuilder { @@ -57,27 +60,32 @@ impl IndexRebuilder { // Write offset (4 bytes) - base_offset + last_offset_delta - start_offset let offset = start_offset - header.offset; debug_assert!(offset <= u32::MAX as u64); - let (result, _) = writer.write_all(Box::new(offset.to_le_bytes())).await.into(); + let (result, _) = writer + .write_all(Box::new(offset.to_le_bytes())) + .await + .into(); result?; // Write position (4 bytes) - let (result, _) = writer.write_all(Box::new(position.to_le_bytes())).await.into(); + let (result, _) = writer + .write_all(Box::new(position.to_le_bytes())) + .await + .into(); result?; // Write timestamp (8 bytes) let (result, _) = writer .write_all(Box::new(header.timestamp.to_le_bytes())) - .await.into(); + .await + .into(); result?; Ok(()) } pub async fn rebuild(&self) -> Result<(), CompatError> { - let read_cursor = std::io::Cursor::new(file::open(&self.messages_file_path).await?); - let write_cursor = std::io::Cursor::new( - file::overwrite(&self.index_path).await? - ); + let read_cursor = std::io::Cursor::new(file::open(&self.messages_file_path).await?); + let write_cursor = std::io::Cursor::new(file::overwrite(&self.index_path).await?); let mut reader = BufReader::new(read_cursor); let mut writer = BufWriter::new(write_cursor); let mut position = 0; @@ -96,7 +104,7 @@ impl IndexRebuilder { // Skip message payload and headers reader.consume( - header.payload_length as usize + header.user_headers_length as usize + header.payload_length as usize + header.user_headers_length as usize, ); // Update position for next iteration diff --git a/core/server/src/main.rs b/core/server/src/main.rs index 53151abf..e81cd902 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -30,8 +30,8 @@ use iggy_common::defaults::DEFAULT_ROOT_USER_ID; use iggy_common::{Aes256GcmEncryptor, EncryptorKind, IggyError}; use server::args::Args; use server::bootstrap::{ - create_directories, create_root_user, create_shard_connections, - create_shard_executor, load_config, resolve_persister, + create_directories, create_root_user, create_shard_connections, create_shard_executor, + load_config, resolve_persister, }; use server::configs::config_provider::{self}; use server::configs::server::ServerConfig; @@ -266,21 +266,21 @@ fn main() -> Result<(), ServerError> { let shutdown_handles_for_signal = shutdown_handles.clone(); /* - ::set_handler(move || { - info!("Received shutdown signal (SIGTERM/SIGINT), initiating graceful shutdown..."); - - for (shard_id, stop_sender) in &shutdown_handles_for_signal { - info!("Sending shutdown signal to shard {}", shard_id); - if let Err(e) = stop_sender.send_blocking(()) { - error!( - "Failed to send shutdown signal to shard {}: {}", - shard_id, e - ); + ::set_handler(move || { + info!("Received shutdown signal (SIGTERM/SIGINT), initiating graceful shutdown..."); + + for (shard_id, stop_sender) in &shutdown_handles_for_signal { + info!("Sending shutdown signal to shard {}", shard_id); + if let Err(e) = stop_sender.send_blocking(()) { + error!( + "Failed to send shutdown signal to shard {}: {}", + shard_id, e + ); + } } - } - }) - .expect("Error setting Ctrl-C handler"); -*/ + }) + .expect("Error setting Ctrl-C handler"); + */ info!("Iggy server is running. Press Ctrl+C or send SIGTERM to shutdown."); for (idx, handle) in handles.into_iter().enumerate() { diff --git a/core/server/src/quic/quic_sender.rs b/core/server/src/quic/quic_sender.rs index cc10c658..54bda604 100644 --- a/core/server/src/quic/quic_sender.rs +++ b/core/server/src/quic/quic_sender.rs @@ -20,9 +20,9 @@ use crate::quic::COMPONENT; use crate::streaming::utils::PooledBuffer; use crate::{binary::sender::Sender, server_error::ServerError}; use bytes::BytesMut; +use compio::buf::{IoBuf, IoBufMut}; use error_set::ErrContext; use iggy_common::IggyError; -use compio::buf::{IoBuf, IoBufMut}; use nix::libc; use quinn::{RecvStream, SendStream}; use std::io::IoSlice; diff --git a/core/server/src/shard/system/snapshot/mod.rs b/core/server/src/shard/system/snapshot/mod.rs index be6ac7dc..f6bf7857 100644 --- a/core/server/src/shard/system/snapshot/mod.rs +++ b/core/server/src/shard/system/snapshot/mod.rs @@ -23,9 +23,9 @@ use crate::shard::IggyShard; use crate::streaming::session::Session; use async_zip::tokio::write::ZipFileWriter; use async_zip::{Compression, ZipEntryBuilder}; +use compio::fs::{File, OpenOptions}; use compio::io::{AsyncReadAtExt, AsyncWriteAtExt}; use iggy_common::{IggyDuration, IggyError, Snapshot, SnapshotCompression, SystemSnapshotType}; -use compio::fs::{File, OpenOptions}; use std::io::Cursor; use std::path::PathBuf; use std::sync::Arc; @@ -172,7 +172,8 @@ async fn get_process_info() -> Result<NamedTempFile, std::io::Error> { let ps_output = Command::new("ps").arg("aux").output().await?; let (result, written) = file .write_all_at(b"=== Process List (ps aux) ===\n", 0) - .await.into(); + .await + .into(); result?; position += written.len() as u64; @@ -186,7 +187,8 @@ async fn get_process_info() -> Result<NamedTempFile, std::io::Error> { let (result, written) = file .write_all_at(b"=== Detailed Process Information ===\n", position) - .await.into(); + .await + .into(); result?; position += written.len() as u64; diff --git a/core/server/src/shard/system/storage.rs b/core/server/src/shard/system/storage.rs index 75104cc2..e1e7cf84 100644 --- a/core/server/src/shard/system/storage.rs +++ b/core/server/src/shard/system/storage.rs @@ -24,10 +24,10 @@ use crate::streaming::utils::PooledBuffer; use crate::streaming::utils::file; use anyhow::Context; use compio::io::AsyncReadAtExt; +use compio::io::AsyncReadExt; use error_set::ErrContext; use iggy_common::IggyError; use std::sync::Arc; -use compio::io::AsyncReadExt; use tracing::info; #[derive(Debug)] @@ -62,7 +62,9 @@ impl SystemInfoStorage for FileSystemInfoStorage { .map_err(|_| IggyError::CannotReadFileMetadata)? .len() as usize; - let file = file::open(&self.path).await.map_err(|_| IggyError::CannotReadFile)?; + let file = file::open(&self.path) + .await + .map_err(|_| IggyError::CannotReadFile)?; let mut buffer = PooledBuffer::with_capacity(file_size); buffer.put_bytes(0, file_size); let (result, buffer) = file.read_exact_at(buffer, 0).await.into(); diff --git a/core/server/src/shard/system/topics.rs b/core/server/src/shard/system/topics.rs index 462f47a4..0f8155fb 100644 --- a/core/server/src/shard/system/topics.rs +++ b/core/server/src/shard/system/topics.rs @@ -132,9 +132,9 @@ impl IggyShard { .with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to get topic with ID: {topic_id} in stream with ID: {stream_id}") })?; - topic.persist().await.with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to persist topic: {topic}") - })?; + topic.persist().await.with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to persist topic: {topic}") + })?; // TODO: Figure out a way how to distribute the shards table among different shards, // without the need to do code from below, everytime we handle a `ShardEvent`. @@ -204,9 +204,9 @@ impl IggyShard { .with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to get topic with ID: {topic_id} in stream with ID: {stream_id}") })?; - topic.persist().await.with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to persist topic: {topic}") - })?; + topic.persist().await.with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to persist topic: {topic}") + })?; let records = partition_ids.into_iter().map(|partition_id| { let namespace = IggyNamespace::new(stream_id, topic_id, partition_id); diff --git a/core/server/src/state/file.rs b/core/server/src/state/file.rs index 85a280cb..5716d7bb 100644 --- a/core/server/src/state/file.rs +++ b/core/server/src/state/file.rs @@ -23,13 +23,13 @@ use crate::streaming::utils::file; use crate::versioning::SemanticVersion; use bytes::{Buf, BufMut, Bytes, BytesMut}; use compio::fs::File; +use compio::io::{AsyncReadExt, AsyncWriteExt}; use error_set::ErrContext; use iggy_common::BytesSerializable; use iggy_common::EncryptorKind; use iggy_common::IggyByteSize; use iggy_common::IggyError; use iggy_common::IggyTimestamp; -use compio::io::{AsyncReadExt, AsyncWriteExt}; use std::fmt::Debug; use std::path::Path; use std::sync::Arc; diff --git a/core/server/src/streaming/partitions/storage.rs b/core/server/src/streaming/partitions/storage.rs index 69d2cd32..67e3d3f4 100644 --- a/core/server/src/streaming/partitions/storage.rs +++ b/core/server/src/streaming/partitions/storage.rs @@ -462,7 +462,7 @@ impl PartitionStorage for FilePartitionStorage { }) .map_err(|_| IggyError::CannotReadFile)?; // TODO: This is like awfull. - let mut cursor = std::io::Cursor::new(file); + let mut cursor = std::io::Cursor::new(file); let offset = cursor .read_u64_le() .await diff --git a/core/server/src/streaming/segments/indexes/index_reader.rs b/core/server/src/streaming/segments/indexes/index_reader.rs index bdb64279..ce235ef4 100644 --- a/core/server/src/streaming/segments/indexes/index_reader.rs +++ b/core/server/src/streaming/segments/indexes/index_reader.rs @@ -17,7 +17,7 @@ */ use super::IggyIndexesMut; -use crate::{streaming::utils::PooledBuffer}; +use crate::streaming::utils::PooledBuffer; use bytes::BytesMut; use compio::{ BufResult, diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs b/core/server/src/streaming/segments/messages/messages_writer.rs index 97b8a253..f3d43d1e 100644 --- a/core/server/src/streaming/segments/messages/messages_writer.rs +++ b/core/server/src/streaming/segments/messages/messages_writer.rs @@ -16,9 +16,7 @@ * under the License. */ -use crate::{ - streaming::segments::{IggyMessagesBatchSet, messages::write_batch}, -}; +use crate::streaming::segments::{IggyMessagesBatchSet, messages::write_batch}; use compio::fs::{File, OpenOptions}; use error_set::ErrContext; use iggy_common::{IggyByteSize, IggyError}; diff --git a/core/server/src/streaming/segments/messages/mod.rs b/core/server/src/streaming/segments/messages/mod.rs index cdf25abd..a3989a2e 100644 --- a/core/server/src/streaming/segments/messages/mod.rs +++ b/core/server/src/streaming/segments/messages/mod.rs @@ -19,7 +19,6 @@ mod messages_reader; mod messages_writer; - use super::IggyMessagesBatchSet; use compio::{fs::File, io::AsyncWriteAtExt}; use iggy_common::IggyError; diff --git a/core/server/src/tcp/sender.rs b/core/server/src/tcp/sender.rs index 61a6ce2b..2c0ecfa9 100644 --- a/core/server/src/tcp/sender.rs +++ b/core/server/src/tcp/sender.rs @@ -18,7 +18,9 @@ use bytes::{Bytes, BytesMut}; use compio::{ - buf::{IoBuf, IoBufMut}, io::{AsyncRead, AsyncReadAtExt, AsyncReadExt, AsyncWriteExt}, BufResult + BufResult, + buf::{IoBuf, IoBufMut}, + io::{AsyncRead, AsyncReadAtExt, AsyncReadExt, AsyncWriteExt}, }; use iggy_common::IggyError; use nix::libc; @@ -126,7 +128,7 @@ where ); let status = PooledBuffer::from(status); let length = PooledBuffer::from(length); - slices.splice(0..0, [status, length]); + slices.splice(0..0, [status, length]); stream .write_vectored_all(slices) .await
