This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc_compio
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 61c0d636ff444ea07682999fe215c1f811b65421
Author: numinex <[email protected]>
AuthorDate: Fri Jul 4 07:51:06 2025 +0200

    cargo fmt
---
 .../src/compat/index_rebuilding/index_rebuilder.rs | 28 ++++++++++++-------
 core/server/src/main.rs                            | 32 +++++++++++-----------
 core/server/src/quic/quic_sender.rs                |  2 +-
 core/server/src/shard/system/snapshot/mod.rs       |  8 ++++--
 core/server/src/shard/system/storage.rs            |  6 ++--
 core/server/src/shard/system/topics.rs             | 12 ++++----
 core/server/src/state/file.rs                      |  2 +-
 core/server/src/streaming/partitions/storage.rs    |  2 +-
 .../src/streaming/segments/indexes/index_reader.rs |  2 +-
 .../streaming/segments/messages/messages_writer.rs |  4 +--
 core/server/src/streaming/segments/messages/mod.rs |  1 -
 core/server/src/tcp/sender.rs                      |  6 ++--
 12 files changed, 58 insertions(+), 47 deletions(-)

diff --git a/core/server/src/compat/index_rebuilding/index_rebuilder.rs b/core/server/src/compat/index_rebuilding/index_rebuilder.rs
index 06204bf8..dbc93ebe 100644
--- a/core/server/src/compat/index_rebuilding/index_rebuilder.rs
+++ b/core/server/src/compat/index_rebuilding/index_rebuilder.rs
@@ -16,11 +16,14 @@
  * under the License.
  */
 
-use crate::streaming::utils::file;
 use crate::server_error::CompatError;
+use crate::streaming::utils::file;
 use async_zip::tokio::write;
+use compio::{
+    fs::File,
+    io::{AsyncBufRead, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter},
+};
 use iggy_common::{IGGY_MESSAGE_HEADER_SIZE, IggyMessageHeader};
-use compio::{fs::File, io::{AsyncBufRead, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}};
 use std::io::{Seek, SeekFrom};
 
 pub struct IndexRebuilder {
@@ -57,27 +60,32 @@ impl IndexRebuilder {
         // Write offset (4 bytes) - base_offset + last_offset_delta - start_offset
         let offset = start_offset - header.offset;
         debug_assert!(offset <= u32::MAX as u64);
-        let (result, _) = writer.write_all(Box::new(offset.to_le_bytes())).await.into();
+        let (result, _) = writer
+            .write_all(Box::new(offset.to_le_bytes()))
+            .await
+            .into();
         result?;
 
         // Write position (4 bytes)
-        let (result, _) = writer.write_all(Box::new(position.to_le_bytes())).await.into();
+        let (result, _) = writer
+            .write_all(Box::new(position.to_le_bytes()))
+            .await
+            .into();
         result?;
 
         // Write timestamp (8 bytes)
         let (result, _) = writer
             .write_all(Box::new(header.timestamp.to_le_bytes()))
-            .await.into();
+            .await
+            .into();
         result?;
 
         Ok(())
     }
 
     pub async fn rebuild(&self) -> Result<(), CompatError> {
-        let read_cursor  =  std::io::Cursor::new(file::open(&self.messages_file_path).await?);
-        let write_cursor = std::io::Cursor::new(
-            file::overwrite(&self.index_path).await?
-        );
+        let read_cursor = std::io::Cursor::new(file::open(&self.messages_file_path).await?);
+        let write_cursor = std::io::Cursor::new(file::overwrite(&self.index_path).await?);
         let mut reader = BufReader::new(read_cursor);
         let mut writer = BufWriter::new(write_cursor);
         let mut position = 0;
@@ -96,7 +104,7 @@ impl IndexRebuilder {
 
                     // Skip message payload and headers
                     reader.consume(
-                        header.payload_length as usize + header.user_headers_length as usize
+                        header.payload_length as usize + header.user_headers_length as usize,
                     );
 
                     // Update position for next iteration
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 53151abf..e81cd902 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -30,8 +30,8 @@ use iggy_common::defaults::DEFAULT_ROOT_USER_ID;
 use iggy_common::{Aes256GcmEncryptor, EncryptorKind, IggyError};
 use server::args::Args;
 use server::bootstrap::{
-    create_directories, create_root_user, create_shard_connections,
-    create_shard_executor, load_config, resolve_persister,
+    create_directories, create_root_user, create_shard_connections, create_shard_executor,
+    load_config, resolve_persister,
 };
 use server::configs::config_provider::{self};
 use server::configs::server::ServerConfig;
@@ -266,21 +266,21 @@ fn main() -> Result<(), ServerError> {
 
     let shutdown_handles_for_signal = shutdown_handles.clone();
     /*
-    ::set_handler(move || {
-        info!("Received shutdown signal (SIGTERM/SIGINT), initiating graceful shutdown...");
-
-        for (shard_id, stop_sender) in &shutdown_handles_for_signal {
-            info!("Sending shutdown signal to shard {}", shard_id);
-            if let Err(e) = stop_sender.send_blocking(()) {
-                error!(
-                    "Failed to send shutdown signal to shard {}: {}",
-                    shard_id, e
-                );
+        ::set_handler(move || {
+            info!("Received shutdown signal (SIGTERM/SIGINT), initiating graceful shutdown...");
+
+            for (shard_id, stop_sender) in &shutdown_handles_for_signal {
+                info!("Sending shutdown signal to shard {}", shard_id);
+                if let Err(e) = stop_sender.send_blocking(()) {
+                    error!(
+                        "Failed to send shutdown signal to shard {}: {}",
+                        shard_id, e
+                    );
+                }
             }
-        }
-    })
-    .expect("Error setting Ctrl-C handler");
-*/
+        })
+        .expect("Error setting Ctrl-C handler");
+    */
 
     info!("Iggy server is running. Press Ctrl+C or send SIGTERM to shutdown.");
     for (idx, handle) in handles.into_iter().enumerate() {
diff --git a/core/server/src/quic/quic_sender.rs b/core/server/src/quic/quic_sender.rs
index cc10c658..54bda604 100644
--- a/core/server/src/quic/quic_sender.rs
+++ b/core/server/src/quic/quic_sender.rs
@@ -20,9 +20,9 @@ use crate::quic::COMPONENT;
 use crate::streaming::utils::PooledBuffer;
 use crate::{binary::sender::Sender, server_error::ServerError};
 use bytes::BytesMut;
+use compio::buf::{IoBuf, IoBufMut};
 use error_set::ErrContext;
 use iggy_common::IggyError;
-use compio::buf::{IoBuf, IoBufMut};
 use nix::libc;
 use quinn::{RecvStream, SendStream};
 use std::io::IoSlice;
diff --git a/core/server/src/shard/system/snapshot/mod.rs b/core/server/src/shard/system/snapshot/mod.rs
index be6ac7dc..f6bf7857 100644
--- a/core/server/src/shard/system/snapshot/mod.rs
+++ b/core/server/src/shard/system/snapshot/mod.rs
@@ -23,9 +23,9 @@ use crate::shard::IggyShard;
 use crate::streaming::session::Session;
 use async_zip::tokio::write::ZipFileWriter;
 use async_zip::{Compression, ZipEntryBuilder};
+use compio::fs::{File, OpenOptions};
 use compio::io::{AsyncReadAtExt, AsyncWriteAtExt};
 use iggy_common::{IggyDuration, IggyError, Snapshot, SnapshotCompression, SystemSnapshotType};
-use compio::fs::{File, OpenOptions};
 use std::io::Cursor;
 use std::path::PathBuf;
 use std::sync::Arc;
@@ -172,7 +172,8 @@ async fn get_process_info() -> Result<NamedTempFile, std::io::Error> {
     let ps_output = Command::new("ps").arg("aux").output().await?;
     let (result, written) = file
         .write_all_at(b"=== Process List (ps aux) ===\n", 0)
-        .await.into();
+        .await
+        .into();
     result?;
     position += written.len() as u64;
 
@@ -186,7 +187,8 @@ async fn get_process_info() -> Result<NamedTempFile, std::io::Error> {
 
     let (result, written) = file
         .write_all_at(b"=== Detailed Process Information ===\n", position)
-        .await.into();
+        .await
+        .into();
     result?;
     position += written.len() as u64;
 
diff --git a/core/server/src/shard/system/storage.rs b/core/server/src/shard/system/storage.rs
index 75104cc2..e1e7cf84 100644
--- a/core/server/src/shard/system/storage.rs
+++ b/core/server/src/shard/system/storage.rs
@@ -24,10 +24,10 @@ use crate::streaming::utils::PooledBuffer;
 use crate::streaming::utils::file;
 use anyhow::Context;
 use compio::io::AsyncReadAtExt;
+use compio::io::AsyncReadExt;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use std::sync::Arc;
-use compio::io::AsyncReadExt;
 use tracing::info;
 
 #[derive(Debug)]
@@ -62,7 +62,9 @@ impl SystemInfoStorage for FileSystemInfoStorage {
             .map_err(|_| IggyError::CannotReadFileMetadata)?
             .len() as usize;
 
-        let file = file::open(&self.path).await.map_err(|_| IggyError::CannotReadFile)?;
+        let file = file::open(&self.path)
+            .await
+            .map_err(|_| IggyError::CannotReadFile)?;
         let mut buffer = PooledBuffer::with_capacity(file_size);
         buffer.put_bytes(0, file_size);
         let (result, buffer) = file.read_exact_at(buffer, 0).await.into();
diff --git a/core/server/src/shard/system/topics.rs b/core/server/src/shard/system/topics.rs
index 462f47a4..0f8155fb 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -132,9 +132,9 @@ impl IggyShard {
             .with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to get topic with ID: {topic_id} in stream with ID: {stream_id}")
             })?;
-            topic.persist().await.with_error_context(|error| {
-                format!("{COMPONENT} (error: {error}) - failed to persist topic: {topic}")
-            })?;
+        topic.persist().await.with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to persist topic: {topic}")
+        })?;
 
         // TODO: Figure out a way how to distribute the shards table among different shards,
         // without the need to do code from below, everytime we handle a `ShardEvent`.
@@ -204,9 +204,9 @@ impl IggyShard {
                 .with_error_context(|error| {
                     format!("{COMPONENT} (error: {error}) - failed to get topic with ID: {topic_id} in stream with ID: {stream_id}")
                 })?;
-            topic.persist().await.with_error_context(|error| {
-                format!("{COMPONENT} (error: {error}) - failed to persist topic: {topic}")
-            })?;
+        topic.persist().await.with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to persist topic: {topic}")
+        })?;
 
         let records = partition_ids.into_iter().map(|partition_id| {
             let namespace = IggyNamespace::new(stream_id, topic_id, partition_id);
diff --git a/core/server/src/state/file.rs b/core/server/src/state/file.rs
index 85a280cb..5716d7bb 100644
--- a/core/server/src/state/file.rs
+++ b/core/server/src/state/file.rs
@@ -23,13 +23,13 @@ use crate::streaming::utils::file;
 use crate::versioning::SemanticVersion;
 use bytes::{Buf, BufMut, Bytes, BytesMut};
 use compio::fs::File;
+use compio::io::{AsyncReadExt, AsyncWriteExt};
 use error_set::ErrContext;
 use iggy_common::BytesSerializable;
 use iggy_common::EncryptorKind;
 use iggy_common::IggyByteSize;
 use iggy_common::IggyError;
 use iggy_common::IggyTimestamp;
-use compio::io::{AsyncReadExt, AsyncWriteExt};
 use std::fmt::Debug;
 use std::path::Path;
 use std::sync::Arc;
diff --git a/core/server/src/streaming/partitions/storage.rs b/core/server/src/streaming/partitions/storage.rs
index 69d2cd32..67e3d3f4 100644
--- a/core/server/src/streaming/partitions/storage.rs
+++ b/core/server/src/streaming/partitions/storage.rs
@@ -462,7 +462,7 @@ impl PartitionStorage for FilePartitionStorage {
                 })
                 .map_err(|_| IggyError::CannotReadFile)?;
             // TODO: This is like awfull.
-            let mut cursor = std::io::Cursor::new(file);  
+            let mut cursor = std::io::Cursor::new(file);
             let offset = cursor
                 .read_u64_le()
                 .await
diff --git a/core/server/src/streaming/segments/indexes/index_reader.rs b/core/server/src/streaming/segments/indexes/index_reader.rs
index bdb64279..ce235ef4 100644
--- a/core/server/src/streaming/segments/indexes/index_reader.rs
+++ b/core/server/src/streaming/segments/indexes/index_reader.rs
@@ -17,7 +17,7 @@
  */
 
 use super::IggyIndexesMut;
-use crate::{streaming::utils::PooledBuffer};
+use crate::streaming::utils::PooledBuffer;
 use bytes::BytesMut;
 use compio::{
     BufResult,
diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs b/core/server/src/streaming/segments/messages/messages_writer.rs
index 97b8a253..f3d43d1e 100644
--- a/core/server/src/streaming/segments/messages/messages_writer.rs
+++ b/core/server/src/streaming/segments/messages/messages_writer.rs
@@ -16,9 +16,7 @@
  * under the License.
  */
 
-use crate::{
-    streaming::segments::{IggyMessagesBatchSet, messages::write_batch},
-};
+use crate::streaming::segments::{IggyMessagesBatchSet, messages::write_batch};
 use compio::fs::{File, OpenOptions};
 use error_set::ErrContext;
 use iggy_common::{IggyByteSize, IggyError};
diff --git a/core/server/src/streaming/segments/messages/mod.rs b/core/server/src/streaming/segments/messages/mod.rs
index cdf25abd..a3989a2e 100644
--- a/core/server/src/streaming/segments/messages/mod.rs
+++ b/core/server/src/streaming/segments/messages/mod.rs
@@ -19,7 +19,6 @@
 mod messages_reader;
 mod messages_writer;
 
-
 use super::IggyMessagesBatchSet;
 use compio::{fs::File, io::AsyncWriteAtExt};
 use iggy_common::IggyError;
diff --git a/core/server/src/tcp/sender.rs b/core/server/src/tcp/sender.rs
index 61a6ce2b..2c0ecfa9 100644
--- a/core/server/src/tcp/sender.rs
+++ b/core/server/src/tcp/sender.rs
@@ -18,7 +18,9 @@
 
 use bytes::{Bytes, BytesMut};
 use compio::{
-    buf::{IoBuf, IoBufMut}, io::{AsyncRead, AsyncReadAtExt, AsyncReadExt, AsyncWriteExt}, BufResult
+    BufResult,
+    buf::{IoBuf, IoBufMut},
+    io::{AsyncRead, AsyncReadAtExt, AsyncReadExt, AsyncWriteExt},
 };
 use iggy_common::IggyError;
 use nix::libc;
@@ -126,7 +128,7 @@ where
     );
     let status = PooledBuffer::from(status);
     let length = PooledBuffer::from(length);
-    slices.splice(0..0,  [status, length]);
+    slices.splice(0..0, [status, length]);
     stream
         .write_vectored_all(slices)
         .await

Reply via email to