This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch io_uring_tpc_direct_io
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc_direct_io by this push:
new b888e603 4096 alignment
b888e603 is described below
commit b888e603bd99ff3132420738975f01a23371384a
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Jul 9 19:56:20 2025 +0200
4096 alignment
---
.../handlers/messages/send_messages_handler.rs | 6 +++---
core/server/src/streaming/segments/direct_file.rs | 22 +++++-----------------
core/server/src/streaming/utils/memory_pool.rs | 2 +-
3 files changed, 9 insertions(+), 21 deletions(-)
diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs
b/core/server/src/binary/handlers/messages/send_messages_handler.rs
index 6d155bca..6e9438fb 100644
--- a/core/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -25,7 +25,7 @@ use crate::shard::transmission::frame::ShardResponse;
use crate::shard::transmission::message::{ShardMessage, ShardRequest,
ShardRequestPayload};
use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut};
use crate::streaming::session::Session;
-use crate::streaming::utils::PooledBuffer;
+use crate::streaming::utils::{ALIGNMENT, PooledBuffer};
use anyhow::Result;
use compio::buf::{IntoInner as _, IoBuf};
use iggy_common::Identifier;
@@ -92,7 +92,7 @@ impl ServerCommandHandler for SendMessages {
);
let indexes_size = messages_count as usize * INDEX_SIZE;
- let mut indexes_buffer = PooledBuffer::with_capacity(indexes_size +
512); // extra space for possible padding to not cause reallocations
+ let mut indexes_buffer = PooledBuffer::with_capacity(indexes_size +
ALIGNMENT); // extra space for possible padding to not cause reallocations
let indexes_buffer = sender
.read(indexes_buffer.slice(0..indexes_size))
.await?
@@ -101,7 +101,7 @@ impl ServerCommandHandler for SendMessages {
let messages_size =
total_payload_size - metadata_size as usize - indexes_size -
metadata_len_field_size;
- let mut messages_buffer = PooledBuffer::with_capacity(messages_size +
512); // extra space for possible padding to not cause reallocations
+ let mut messages_buffer = PooledBuffer::with_capacity(messages_size +
ALIGNMENT); // extra space for possible padding to not cause reallocations
let messages_buffer = sender
.read(messages_buffer.slice(0..messages_size))
.await?
diff --git a/core/server/src/streaming/segments/direct_file.rs
b/core/server/src/streaming/segments/direct_file.rs
index 7de7c4b3..76981c0b 100644
--- a/core/server/src/streaming/segments/direct_file.rs
+++ b/core/server/src/streaming/segments/direct_file.rs
@@ -17,7 +17,7 @@
*/
use crate::streaming::utils::{ALIGNMENT, PooledBuffer};
-use compio::buf::{IntoInner, IoBuf};
+use compio::buf::IoBuf;
use compio::fs::{File, OpenOptions};
use compio::io::AsyncWriteAtExt;
use error_set::ErrContext;
@@ -25,7 +25,7 @@ use iggy_common::IggyError;
const O_DIRECT: i32 = 0x4000;
const O_DSYNC: i32 = 0x1000;
-
+const SCRATCH_SIZE: usize = ALIGNMENT * 8;
/// Cache line padding to prevent false sharing
/// Most modern CPUs have 64-byte cache lines
#[repr(align(64))]
@@ -34,9 +34,9 @@ struct Padded<T>(T);
/// Stack-allocated scratch buffer for small writes
/// Aligned to 4KiB for optimal performance
-#[repr(align(512))]
+#[repr(align(4096))]
#[derive(Debug)]
-struct ScratchBuffer([u8; 4096]);
+struct ScratchBuffer([u8; SCRATCH_SIZE]);
/// A wrapper that allows us to pass a slice of ScratchBuffer as IoBuf
/// This is safe because DirectFile owns the ScratchBuffer for its entire lifetime
@@ -124,7 +124,7 @@ impl DirectFile {
tail: PooledBuffer::with_capacity(ALIGNMENT),
tail_len: Padded(0),
spare: PooledBuffer::with_capacity(ALIGNMENT),
- scratch: ScratchBuffer([0u8; 4096]),
+ scratch: ScratchBuffer([0u8; SCRATCH_SIZE]),
})
}
@@ -142,18 +142,6 @@ impl DirectFile {
.map(|metadata| metadata.len())
}
- fn new(file: File, file_path: String, initial_position: u64) -> Self {
- Self {
- file_path,
- file,
- file_position: initial_position,
- tail: PooledBuffer::with_capacity(ALIGNMENT),
- tail_len: Padded(0),
- spare: PooledBuffer::with_capacity(ALIGNMENT),
- scratch: ScratchBuffer([0u8; 4096]),
- }
- }
-
/// Write data from an owned PooledBuffer with zero-copy optimization
///
/// This method takes ownership of the buffer to satisfy the 'static lifetime
diff --git a/core/server/src/streaming/utils/memory_pool.rs
b/core/server/src/streaming/utils/memory_pool.rs
index df0783b1..5d24a040 100644
--- a/core/server/src/streaming/utils/memory_pool.rs
+++ b/core/server/src/streaming/utils/memory_pool.rs
@@ -26,7 +26,7 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use tracing::{error, info, trace, warn};
-pub const ALIGNMENT: usize = 512;
+pub const ALIGNMENT: usize = 4096;
pub type Align512 = ConstAlign<ALIGNMENT>;
pub type AlignedBuffer = AVec<u8, Align512>;