This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch two_halves in repository https://gitbox.apache.org/repos/asf/iggy.git
commit c4e71b57a7a80ce62252164b17b19808dba3601a Author: numinex <[email protected]> AuthorDate: Mon Mar 16 14:31:15 2026 +0100 feat(server): implement TwoHalves buffer --- Cargo.lock | 4 + Cargo.toml | 3 +- core/buf/Cargo.toml | 6 + core/buf/src/lib.rs | 381 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 393 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 6f4d7aa02..3d81c8fd1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1538,6 +1538,10 @@ dependencies = [ "serde", ] +[[package]] +name = "buf" +version = "0.1.0" + [[package]] name = "built" version = "0.8.0" diff --git a/Cargo.toml b/Cargo.toml index d91476771..785594a86 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,8 @@ members = [ "core/bench/dashboard/shared", "core/bench/report", "core/bench/runner", - "core/binary_protocol", + "core/binary_protocol", + "core/buf", "core/cli", "core/clock", "core/common", diff --git a/core/buf/Cargo.toml b/core/buf/Cargo.toml new file mode 100644 index 000000000..35387e669 --- /dev/null +++ b/core/buf/Cargo.toml @@ -0,0 +1,6 @@ +[package] +name = "buf" +version = "0.1.0" +edition = "2024" + +[dependencies] diff --git a/core/buf/src/lib.rs b/core/buf/src/lib.rs new file mode 100644 index 000000000..e6b4065d8 --- /dev/null +++ b/core/buf/src/lib.rs @@ -0,0 +1,381 @@ +use std::mem::ManuallyDrop; +use std::ptr::NonNull; +use std::slice; +use std::sync::atomic::{AtomicUsize, Ordering, fence}; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Owned { + inner: Vec<u8>, +} + +#[derive(Clone, Copy)] +struct Half { + ptr: NonNull<u8>, + len: usize, + ctrlb: NonNull<ControlBlock>, +} + +struct ControlBlock { + ref_count: AtomicUsize, + base: NonNull<u8>, + len: usize, + cap: usize, +} + +fn create_control_block(base: NonNull<u8>, len: usize, cap: usize) -> NonNull<ControlBlock> { + let ctrlb = Box::new(ControlBlock { + ref_count: AtomicUsize::new(1), + base, + len, + cap, + }); + // SAFETY: `Box::into_raw` never returns null; the leaked control block stays valid for the
lifetime of the returned halves. + unsafe { NonNull::new_unchecked(Box::into_raw(ctrlb)) } +} + +pub struct TwoHalves { + buf: (Half, Half), + split_at: usize, +} + +impl From<Vec<u8>> for Owned { + fn from(vec: Vec<u8>) -> Self { + Self { inner: vec } + } +} + +impl From<Owned> for Vec<u8> { + fn from(value: Owned) -> Self { + value.inner + } +} + +impl Owned { + pub fn from_vec(vec: Vec<u8>) -> Self { + Self { inner: vec } + } + + pub fn as_slice(&self) -> &[u8] { + &self.inner + } + + pub fn as_mut_slice(&mut self) -> &mut [u8] { + &mut self.inner + } + + pub fn split_at(self, split_at: usize) -> TwoHalves { + assert!(split_at <= self.inner.len()); + // Transferring ownership of the `Vec` allocation to `TwoHalves`, which is from now on responsible for dropping it (via the shared control block). + let mut inner = ManuallyDrop::new(self.inner); + let len = inner.len(); + let cap = inner.capacity(); + + // SAFETY: both pointers are derived from the same `Vec` allocation; `Vec::as_mut_ptr` never returns null, and `split_at <= len` was asserted above, so `ptr.add(split_at)` is in bounds (at most one past the end). + // The control block captures the original `Vec` metadata (base, len, cap) to allow reconstructing the original frame for merging/dropping. + // Pointer provenance is preserved because `base` and `other` are both derived from the same `as_mut_ptr` result. + let ptr = inner.as_mut_ptr(); + let base = unsafe { NonNull::new_unchecked(ptr) }; + let other = unsafe { NonNull::new_unchecked(ptr.add(split_at)) }; + + let ctrlb = create_control_block(base, len, cap); + TwoHalves { + buf: ( + Half { + ptr: base, + len: split_at, + ctrlb, + }, + Half { + ptr: other, + len: len - split_at, + ctrlb, + }, + ), + split_at, + } + } +} + +impl Half { + fn as_slice(&self) -> &[u8] { + // SAFETY: `ptr,len` always describe a live allocation owned by `ctrlb`. + unsafe { slice::from_raw_parts(self.ptr.as_ptr(), self.len) } + } + + unsafe fn as_mut_slice(&mut self) -> &mut [u8] { + // SAFETY: the caller guarantees that no other reference to `ptr..ptr + len` is alive while the returned slice is in use.
+ unsafe { slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len) } + } + + fn share(&self) -> Self { + // SAFETY: `ctrlb` points to a valid control block for the lifetime of this half. A `Relaxed` increment suffices because a new reference is only ever created from an existing one (`&self`). + unsafe { + self.ctrlb + .as_ref() + .ref_count + .fetch_add(1, Ordering::Relaxed); + } + *self + } + + fn copy_from_slice(src: &[u8]) -> Self { + let mut vec = Vec::with_capacity(src.len()); + vec.extend_from_slice(src); + + // Transferring ownership of the freshly allocated `Vec` to a new control block, which the owning `TwoHalves` releases on drop/merge; `Vec::as_mut_ptr` never returns null, so the `new_unchecked` below is sound. + let mut inner = ManuallyDrop::new(vec); + let ptr = inner.as_mut_ptr(); + let base = unsafe { NonNull::new_unchecked(ptr) }; + let len = inner.len(); + let cap = inner.capacity(); + + let ctrlb = create_control_block(base, len, cap); + Self { + ptr: base, + len, + ctrlb, + } + } +} + +/// Decrements the reference count of `ctrlb` and, if that was the last reference, frees the control block together with its `Vec` allocation. +/// Callers must release each held reference exactly once: when both halves share one control block (as after `Owned::split_at`) it is released only once. +unsafe fn release_control_block_w_allocation(ctrlb: NonNull<ControlBlock>) { + // SAFETY: caller guarantees `ctrlb` points to a live control block. + let old = unsafe { ctrlb.as_ref() } + .ref_count + .fetch_sub(1, Ordering::Release); + debug_assert!(old > 0, "control block refcount underflow"); + if old != 1 { + return; + } + + // This fence is needed to prevent reordering of use of the data and + // deletion of the data. Because it is marked `Release`, the decreasing + // of the reference count synchronizes with this `Acquire` fence. This + // means that use of the data happens before decreasing the reference + // count, which happens before this fence, which happens before the + // deletion of the data. + // + // As explained in the [Boost documentation][1], + // + // > It is important to enforce any possible access to the object in one + // > thread (through an existing reference) to *happen before* deleting + // > the object in a different thread.
This is achieved by a "release" + // > operation after dropping a reference (any access to the object + // > through this reference must obviously happened before), and an + // > "acquire" operation before deleting the object. + // + // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) + // + fence(Ordering::Acquire); + + // SAFETY: refcount reached zero, so this control block is uniquely owned here. + let ctrlb = unsafe { Box::from_raw(ctrlb.as_ptr()) }; + // SAFETY: `base,len,cap` were captured from a `Vec<u8>` whose ownership was handed to this control block, and no other reference remains. + let _vec = unsafe { Vec::from_raw_parts(ctrlb.base.as_ptr(), ctrlb.len, ctrlb.cap) }; +} + +unsafe fn reclaim_unique_control_block(ctrlb: NonNull<ControlBlock>) -> ControlBlock { + debug_assert_eq!( + // SAFETY: caller guarantees `ctrlb` points to a live control block. + unsafe { ctrlb.as_ref() }.ref_count.load(Ordering::Acquire), + 1 + ); + + // SAFETY: caller guarantees uniqueness (refcount == 1), so ownership of the control block can be reclaimed directly without touching the refcount. + unsafe { *Box::from_raw(ctrlb.as_ptr()) } +} + +impl TwoHalves { + pub fn head(&self) -> &[u8] { + self.buf.0.as_slice() + } + + pub fn head_mut(&mut self) -> &mut [u8] { + // SAFETY: `&mut self` gives exclusive access to this value's head, and the head is never shared between clones: + // `Clone` copies it into a fresh allocation instead of sharing it. + unsafe { self.buf.0.as_mut_slice() } + } + + pub fn tail(&self) -> &[u8] { + self.buf.1.as_slice() + } + + pub fn split_at(&self) -> usize { + self.split_at + } + + pub fn total_len(&self) -> usize { + self.buf.0.len + self.buf.1.len + } + + pub fn is_unique(&self) -> bool { + // `buf.1` is the authoritative owner of the original frame allocation. + // SAFETY: `buf.1.ctrlb` points to a live control block while `self` is alive.
+ unsafe { self.buf.1.ctrlb.as_ref().ref_count.load(Ordering::Acquire) == 1 } + } + + pub fn try_merge(self) -> Result<Owned, Self> { + if !self.is_unique() { + return Err(self); + } + + // Ownership moves into the returned `Owned`; `self` is wrapped in `ManuallyDrop` so its `Drop` does not release the control blocks a second time (double free). + let this = ManuallyDrop::new(self); + let head = this.buf.0; + let tail = this.buf.1; + let split_at = this.split_at; + + // SAFETY: `is_unique` returned true, so this value holds the only reference to `tail.ctrlb`. + // If `head.ctrlb != tail.ctrlb`, the head owns a standalone allocation + // that must be released after copying. + unsafe { + let ctrlb_eq = std::ptr::addr_eq(head.ctrlb.as_ptr(), tail.ctrlb.as_ptr()); + if !ctrlb_eq { + let tail_ctrlb = tail.ctrlb.as_ref(); + + // Patch the first `split_at` bytes of the original allocation with this value's (separately allocated) head, so that the resulting `Owned` has the current content. + let dst = slice::from_raw_parts_mut(tail_ctrlb.base.as_ptr(), split_at); + dst.copy_from_slice(head.as_slice()); + release_control_block_w_allocation(head.ctrlb); + } + + let ctrlb = reclaim_unique_control_block(tail.ctrlb); + // SAFETY: `ctrlb.base,len,cap` were captured from a `Vec<u8>` allocation and + // are now exclusively owned by this path. + let inner = Vec::from_raw_parts(ctrlb.base.as_ptr(), ctrlb.len, ctrlb.cap); + Ok(Owned { inner }) + } + } +} + +impl Clone for TwoHalves { + fn clone(&self) -> Self { + Self { + buf: (Half::copy_from_slice(self.head()), self.buf.1.share()), + split_at: self.split_at, + } + } +} + +impl Drop for TwoHalves { + fn drop(&mut self) { + // SAFETY: `buf.0.ctrlb` / `buf.1.ctrlb` point to live control blocks while `self` is alive. + unsafe { + if std::ptr::addr_eq(self.buf.0.ctrlb.as_ptr(), self.buf.1.ctrlb.as_ptr()) { + release_control_block_w_allocation(self.buf.1.ctrlb); + } else { + // Two separate control blocks (the head is a private copy made by `Clone`), so both references must be released.
+ release_control_block_w_allocation(self.buf.0.ctrlb); + release_control_block_w_allocation(self.buf.1.ctrlb); + } + } + } +} + +impl std::fmt::Debug for TwoHalves { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TwoHalves") + .field("split_at", &self.split_at) + .field("head_len", &self.buf.0.len) + .field("tail_len", &self.buf.1.len) + .field("halves_alias", &(self.buf.0.ctrlb == self.buf.1.ctrlb)) + .finish() + } +} + +#[cfg(test)] +mod tests { + use super::Owned; + + #[test] + fn split_exposes_head_and_tail() { + let mut buffer = Owned::from_vec(vec![1, 2, 3, 4, 5]).split_at(2); + + assert_eq!(buffer.head(), &[1, 2]); + assert_eq!(buffer.tail(), &[3, 4, 5]); + assert_eq!(buffer.split_at(), 2); + assert_eq!(buffer.total_len(), 5); + + buffer.head_mut().copy_from_slice(&[9, 8]); + assert_eq!(buffer.head(), &[9, 8]); + assert_eq!(buffer.tail(), &[3, 4, 5]); + } + + #[test] + fn clone_copies_head_and_shares_tail() { + let mut original = Owned::from_vec(vec![1, 2, 3, 4, 5]).split_at(2); + let mut cloned = original.clone(); + + assert!(!original.is_unique()); + assert!(!cloned.is_unique()); + + original.head_mut().copy_from_slice(&[9, 9]); + cloned.head_mut().copy_from_slice(&[7, 7]); + + assert_eq!(original.head(), &[9, 9]); + assert_eq!(cloned.head(), &[7, 7]); + assert_eq!(original.tail(), &[3, 4, 5]); + assert_eq!(cloned.tail(), &[3, 4, 5]); + } + + #[test] + fn try_merge_reuses_original_frame_when_unique() { + let mut buffer = Owned::from_vec(vec![1, 2, 3, 4, 5]).split_at(2); + buffer.head_mut().copy_from_slice(&[8, 9]); + + let merged: Vec<u8> = buffer.try_merge().unwrap().into(); + assert_eq!(merged, vec![8, 9, 3, 4, 5]); + } + + #[test] + fn try_merge_fails_while_tail_is_shared() { + let buffer = Owned::from_vec(vec![1, 2, 3, 4, 5]).split_at(2); + let clone = buffer.clone(); + + let buffer = buffer.try_merge().unwrap_err(); + assert!(!buffer.is_unique()); + + drop(clone); + + let merged: Vec<u8> =
buffer.try_merge().unwrap().into(); + assert_eq!(merged, vec![1, 2, 3, 4, 5]); + } + + #[test] + fn merge_after_cloned_head_mutation_writes_back_to_original_frame() { + let buffer = Owned::from_vec(vec![1, 2, 3, 4, 5]).split_at(2); + let mut clone = buffer.clone(); + + drop(buffer); + + clone.head_mut().copy_from_slice(&[4, 2]); + assert!(clone.is_unique()); + + let merged: Vec<u8> = clone.try_merge().unwrap().into(); + assert_eq!(merged, vec![4, 2, 3, 4, 5]); + } + + #[test] + fn zero_length_splits_work() { + let left_empty = Owned::from_vec(vec![1, 2, 3]).split_at(0); + assert_eq!(left_empty.head(), &[]); + assert_eq!(left_empty.tail(), &[1, 2, 3]); + + let right_empty = Owned::from_vec(vec![1, 2, 3]).split_at(3); + assert_eq!(right_empty.head(), &[1, 2, 3]); + assert_eq!(right_empty.tail(), &[]); + } + + #[test] + fn clone_of_clone_keeps_tail_sharing_semantics() { + let original = Owned::from_vec(vec![1, 2, 3, 4, 5]).split_at(2); + let clone1 = original.clone(); + let clone2 = clone1.clone(); + + assert!(!original.is_unique()); + assert!(!clone1.is_unique()); + assert!(!clone2.is_unique()); + } +}
