hubcio commented on code in PR #2944:
URL: https://github.com/apache/iggy/pull/2944#discussion_r2964691890


##########
core/buf/src/lib.rs:
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::mem::ManuallyDrop;
+use std::ptr::NonNull;
+use std::slice;
+use std::sync::atomic::{AtomicUsize, Ordering, fence};
+
+use aligned_vec::{AVec, ConstAlign};
+
+#[derive(Debug)]
+pub struct Owned<const ALIGN: usize = 4096> {
+    inner: AVec<u8, ConstAlign<ALIGN>>,
+}
+
+impl<const ALIGN: usize> From<AVec<u8, ConstAlign<ALIGN>>> for Owned<ALIGN> {
+    fn from(vec: AVec<u8, ConstAlign<ALIGN>>) -> Self {
+        Self { inner: vec }
+    }
+}
+
+impl<const ALIGN: usize> From<Owned<ALIGN>> for AVec<u8, ConstAlign<ALIGN>> {
+    fn from(value: Owned<ALIGN>) -> Self {
+        value.inner
+    }
+}
+
+impl<const ALIGN: usize> Owned<ALIGN> {
+    pub fn as_slice(&self) -> &[u8] {
+        &self.inner
+    }
+
+    pub fn as_mut_slice(&mut self) -> &mut [u8] {
+        &mut self.inner
+    }
+
+    /// Split `Owned` buffer into two halves
+    ///
+    /// # Panics
+    /// Panics if `split_at > self.len()` or if `split_at` is not a multiple of `ALIGN` bytes.

Review Comment:
   The doc says "Panics if `split_at` is not a multiple of `ALIGN` bytes", but
   the only assertion (line 56 of the new file) is
   `split_at <= self.inner.len()`; no alignment check exists. Either add
   `assert!(split_at % ALIGN == 0)` or fix the doc.



##########
core/buf/src/lib.rs:
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::mem::ManuallyDrop;
+use std::ptr::NonNull;
+use std::slice;
+use std::sync::atomic::{AtomicUsize, Ordering, fence};
+
+use aligned_vec::{AVec, ConstAlign};
+
+#[derive(Debug)]
+pub struct Owned<const ALIGN: usize = 4096> {
+    inner: AVec<u8, ConstAlign<ALIGN>>,
+}
+
+impl<const ALIGN: usize> From<AVec<u8, ConstAlign<ALIGN>>> for Owned<ALIGN> {
+    fn from(vec: AVec<u8, ConstAlign<ALIGN>>) -> Self {
+        Self { inner: vec }
+    }
+}
+
+impl<const ALIGN: usize> From<Owned<ALIGN>> for AVec<u8, ConstAlign<ALIGN>> {
+    fn from(value: Owned<ALIGN>) -> Self {
+        value.inner
+    }
+}
+
+impl<const ALIGN: usize> Owned<ALIGN> {
+    pub fn as_slice(&self) -> &[u8] {
+        &self.inner
+    }
+
+    pub fn as_mut_slice(&mut self) -> &mut [u8] {
+        &mut self.inner
+    }
+
+    /// Split `Owned` buffer into two halves
+    ///
+    /// # Panics
+    /// Panics if `split_at > self.len()` or if `split_at` is not a multiple 
of `ALIGN` bytes.
+    pub fn split_at(self, split_at: usize) -> TwoHalves<ALIGN> {
+        assert!(split_at <= self.inner.len());
+
+        // Take ownership of the AVec's allocation. After this, we are 
responsible
+        // for deallocating via `AVec::from_raw_parts` or equivalent.
+        let (ptr, _, len, capacity) = self.inner.into_raw_parts();
+
+        // SAFETY: both pointers are constructed from the same `Inner` 
allocation, the split_at bounds are validated.
+        // The control block captures original `Inner` metadata to allow 
reconstructing the original frame for merging/dropping.
+        // The ptr provenance rules are maintained by the use of `NonNull` 
apis.
+        let base: NonNull<u8> = unsafe { NonNull::new_unchecked(ptr) };
+        let tail = unsafe { NonNull::new_unchecked(ptr.add(split_at)) };
+        let ctrlb = ControlBlock::new(base, len, capacity);
+
+        TwoHalves {
+            inner: (
+                Extent {
+                    ptr: base,
+                    len: split_at,
+                    ctrlb,
+                    _pad: 0,
+                },
+                Extent {
+                    ptr: tail,
+                    len: len - split_at,
+                    ctrlb,
+                    _pad: 0,
+                },
+            ),
+        }
+    }
+}
+
+pub struct TwoHalves<const ALIGN: usize> {
+    inner: (Extent, Extent),
+}
+
+impl<const ALIGN: usize> TwoHalves<ALIGN> {
+    pub fn head(&self) -> &[u8] {
+        self.inner.0.as_slice()
+    }
+
+    pub fn head_mut(&mut self) -> &mut [u8] {

Review Comment:
   The safety comment says "head is not shared between clones, instead it gets
   copied", but that is only true for the clone's head. In the non-cloned
   (original) value, head points into the same allocation as tail. Mutating
   head is safe because the two regions don't overlap, but holding `&mut` and
   `&` into the same allocation is a Stacked Borrows grey area. If `Send` is
   ever added, two threads could hold the original and a clone, creating
   aliasing across threads. Miri would detect that.



##########
core/buf/src/lib.rs:
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::mem::ManuallyDrop;
+use std::ptr::NonNull;
+use std::slice;
+use std::sync::atomic::{AtomicUsize, Ordering, fence};
+
+use aligned_vec::{AVec, ConstAlign};
+
+#[derive(Debug)]
+pub struct Owned<const ALIGN: usize = 4096> {
+    inner: AVec<u8, ConstAlign<ALIGN>>,
+}
+
+impl<const ALIGN: usize> From<AVec<u8, ConstAlign<ALIGN>>> for Owned<ALIGN> {
+    fn from(vec: AVec<u8, ConstAlign<ALIGN>>) -> Self {
+        Self { inner: vec }
+    }
+}
+
+impl<const ALIGN: usize> From<Owned<ALIGN>> for AVec<u8, ConstAlign<ALIGN>> {
+    fn from(value: Owned<ALIGN>) -> Self {
+        value.inner
+    }
+}
+
+impl<const ALIGN: usize> Owned<ALIGN> {
+    pub fn as_slice(&self) -> &[u8] {
+        &self.inner
+    }
+
+    pub fn as_mut_slice(&mut self) -> &mut [u8] {
+        &mut self.inner
+    }
+
+    /// Split `Owned` buffer into two halves
+    ///
+    /// # Panics
+    /// Panics if `split_at > self.len()` or if `split_at` is not a multiple 
of `ALIGN` bytes.
+    pub fn split_at(self, split_at: usize) -> TwoHalves<ALIGN> {
+        assert!(split_at <= self.inner.len());
+
+        // Take ownership of the AVec's allocation. After this, we are 
responsible
+        // for deallocating via `AVec::from_raw_parts` or equivalent.
+        let (ptr, _, len, capacity) = self.inner.into_raw_parts();
+
+        // SAFETY: both pointers are constructed from the same `Inner` 
allocation, the split_at bounds are validated.
+        // The control block captures original `Inner` metadata to allow 
reconstructing the original frame for merging/dropping.
+        // The ptr provenance rules are maintained by the use of `NonNull` 
apis.
+        let base: NonNull<u8> = unsafe { NonNull::new_unchecked(ptr) };
+        let tail = unsafe { NonNull::new_unchecked(ptr.add(split_at)) };
+        let ctrlb = ControlBlock::new(base, len, capacity);
+
+        TwoHalves {
+            inner: (
+                Extent {
+                    ptr: base,
+                    len: split_at,
+                    ctrlb,
+                    _pad: 0,
+                },
+                Extent {
+                    ptr: tail,
+                    len: len - split_at,
+                    ctrlb,
+                    _pad: 0,
+                },
+            ),
+        }
+    }
+}
+
+pub struct TwoHalves<const ALIGN: usize> {
+    inner: (Extent, Extent),
+}
+
+impl<const ALIGN: usize> TwoHalves<ALIGN> {
+    pub fn head(&self) -> &[u8] {
+        self.inner.0.as_slice()
+    }
+
+    pub fn head_mut(&mut self) -> &mut [u8] {
+        // SAFETY: We are accessing the head half mutably, this is the only 
correct operation, as the head is not shared between clones,
+        // instead it gets copied.
+        unsafe { self.inner.0.as_mut_slice() }
+    }
+
+    pub fn tail(&self) -> &[u8] {
+        self.inner.1.as_slice()
+    }
+
+    pub fn split_at(&self) -> usize {
+        self.inner.0.len
+    }
+
+    pub fn total_len(&self) -> usize {
+        self.inner.0.len + self.inner.1.len
+    }
+
+    pub fn is_unique(&self) -> bool {
+        // `inner.1` is the authoritative owner of the original frame 
allocation.
+        // SAFETY: `inner.1.ctrlb` points to a live control block while `self` 
is alive.
+        unsafe {
+            self.inner
+                .1
+                .ctrlb
+                .as_ref()
+                .ref_count
+                .load(Ordering::Acquire)
+                == 1
+        }
+    }
+
+    pub fn try_merge(self) -> Result<Owned<ALIGN>, Self> {
+        if !self.is_unique() {
+            return Err(self);
+        }
+
+        // Transfer ownership to prevent double-free.
+        // SAFETY: We read the inner tuple out of ManuallyDrop, which won't 
run TwoHalves::drop.
+        let this = ManuallyDrop::new(self);
+        let (head, tail) = unsafe { std::ptr::read(&this.inner) };
+        let split_at = head.len;
+
+        // SAFETY: `tail.ctrlb` is unique at this point,
+        // If `head.ctrlb != tail.ctrlb`, the head owns a standalone allocation
+        // that must be released after copying.
+        unsafe {
+            let ctrlb_eq = std::ptr::addr_eq(head.ctrlb.as_ptr(), 
tail.ctrlb.as_ptr());
+
+            if !ctrlb_eq {
+                let tail_ctrlb = tail.ctrlb.as_ref();
+
+                // We are patching up the original allocation, with the 
current head data, so that the resulting `Owned` has correct content.
+                let dst = slice::from_raw_parts_mut(tail_ctrlb.base.as_ptr(), 
split_at);
+                dst.copy_from_slice(head.as_slice());
+                release_control_block_w_allocation::<ALIGN>(head.ctrlb);
+            }
+
+            let ctrlb = reclaim_unique_control_block(tail.ctrlb);
+            // SAFETY: `ctrlb.base,capacity` were captured from an `AVec<u8>` 
allocation and
+            // are now exclusively owned by this path.
+            let inner = AVec::from_raw_parts(ctrlb.base.as_ptr(), ALIGN, 
ctrlb.len, ctrlb.capacity);
+            Ok(Owned { inner })
+        }
+    }
+}
+
+impl<const ALIGN: usize> Clone for TwoHalves<ALIGN> {
+    fn clone(&self) -> Self {
+        Self {
+            inner: (
+                Extent::copy_from_slice::<ALIGN>(self.head()),
+                self.inner.1.clone(),
+            ),
+        }
+    }
+}
+
+impl<const ALIGN: usize> Drop for TwoHalves<ALIGN> {
+    fn drop(&mut self) {
+        // SAFETY: `inner.0.ctrlb` / `inner.1.ctrlb` point to live control 
blocks while `self` is alive.
+        let ctrlb_eq = std::ptr::addr_eq(self.inner.0.ctrlb.as_ptr(), 
self.inner.1.ctrlb.as_ptr());
+        unsafe {
+            if ctrlb_eq {
+                
release_control_block_w_allocation::<ALIGN>(self.inner.1.ctrlb);
+            } else {
+                // Different control blocks, release both
+                
release_control_block_w_allocation::<ALIGN>(self.inner.0.ctrlb);
+                
release_control_block_w_allocation::<ALIGN>(self.inner.1.ctrlb);
+            }
+        }
+    }
+}
+
+impl<const ALIGN: usize> std::fmt::Debug for TwoHalves<ALIGN> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("TwoHalves")
+            .field("split_at", &self.split_at())
+            .field("head_len", &self.inner.0.len)
+            .field("tail_len", &self.inner.1.len)
+            .field("halves_alias", &(self.inner.0.ctrlb == self.inner.1.ctrlb))
+            .finish()
+    }
+}
+
+#[derive(Clone)]
+pub struct Frozen<const ALIGN: usize> {
+    inner: Extent,
+}
+
+impl<const ALIGN: usize> Frozen<ALIGN> {
+    pub fn as_slice(&self) -> &[u8] {
+        self.inner.as_slice()
+    }
+}
+
+#[repr(C, align(64))]
+struct ControlBlock {
+    ref_count: AtomicUsize,
+    base: NonNull<u8>,
+    len: usize,
+    capacity: usize,
+    _pad: [u8; 32],
+}
+
+impl ControlBlock {
+    fn new(base: NonNull<u8>, len: usize, capacity: usize) -> NonNull<Self> {
+        let ctrl = Box::new(ControlBlock {
+            ref_count: AtomicUsize::new(1),
+            base,
+            len,
+            capacity,
+            _pad: [0; 32],
+        });
+        // SAFETY: Box::into_raw returns a valid pointer
+        unsafe { NonNull::new_unchecked(Box::into_raw(ctrl)) }
+    }
+}
+
+struct Extent {
+    ptr: NonNull<u8>,
+    len: usize,
+    ctrlb: NonNull<ControlBlock>,
+    _pad: usize,
+}
+
+impl Extent {
+    fn as_slice(&self) -> &[u8] {
+        // SAFETY: ptr and len describe a valid allocation
+        unsafe { slice::from_raw_parts(self.ptr.as_ptr(), self.len) }
+    }
+
+    unsafe fn as_mut_slice(&mut self) -> &mut [u8] {
+        // SAFETY: caller guarantees exclusive access
+        unsafe { slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len) }
+    }
+
+    fn copy_from_slice<const ALIGN: usize>(src: &[u8]) -> Self {
+        let mut v: AVec<u8, ConstAlign<ALIGN>> = AVec::new(ALIGN);
+        v.extend_from_slice(src);
+
+        let (ptr, _, len, capacity) = v.into_raw_parts();
+        let data = unsafe { NonNull::new_unchecked(ptr) };
+
+        let ctrlb = ControlBlock::new(data, len, capacity);
+
+        Extent {
+            ptr: data,
+            len,
+            ctrlb,
+            _pad: 0,
+        }
+    }
+}
+
+impl Clone for Extent {
+    fn clone(&self) -> Self {
+        // SAFETY: `self.ctrlb` points to a live control block while `self` is 
alive.
+        unsafe {
+            self.ctrlb
+                .as_ref()
+                .ref_count
+                .fetch_add(1, Ordering::Relaxed);
+        }
+        Self {
+            ptr: self.ptr,
+            len: self.len,
+            ctrlb: self.ctrlb,
+            _pad: 0,
+        }
+    }
+}
+
+unsafe fn release_control_block_w_allocation<const ALIGN: usize>(ctrlb: 
NonNull<ControlBlock>) {
+    // SAFETY: ctrlb is valid per function preconditions
+    let old = unsafe { ctrlb.as_ref() }
+        .ref_count
+        .fetch_sub(1, Ordering::Release);
+    debug_assert!(old > 0, "control block refcount underflow");
+
+    if old != 1 {
+        return;
+    }
+
+    // This fence is needed to prevent reordering of use of the data and
+    // deletion of the data. Because it is marked `Release`, the decreasing
+    // of the reference count synchronizes with this `Acquire` fence. This
+    // means that use of the data happens before decreasing the reference
+    // count, which happens before this fence, which happens before the
+    // deletion of the data.
+    //
+    // As explained in the [Boost documentation][1],
+    //
+    // > It is important to enforce any possible access to the object in one
+    // > thread (through an existing reference) to *happen before* deleting
+    // > the object in a different thread. This is achieved by a "release"
+    // > operation after dropping a reference (any access to the object
+    // > through this reference must obviously happened before), and an
+    // > "acquire" operation before deleting the object.
+    //
+    // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
+    //
+    fence(Ordering::Acquire);
+
+    // SAFETY: refcount is zero, we have exclusive ownership
+    let ctrlb = unsafe { Box::from_raw(ctrlb.as_ptr()) };
+
+    // SAFETY: `ctrlb.base`, `ctrlb.len` and `ctrlb.capacity` were captured 
from an `AVec`
+    // allocation. We reconstruct the AVec and let it deallocate properly.
+    let _ = unsafe {
+        AVec::<u8, ConstAlign<ALIGN>>::from_raw_parts(
+            ctrlb.base.as_ptr(),
+            ALIGN,
+            ctrlb.len,
+            ctrlb.capacity,
+        )
+    };
+}
+
+unsafe fn reclaim_unique_control_block(ctrlb: NonNull<ControlBlock>) -> 
ControlBlock {
+    debug_assert_eq!(

Review Comment:
   `reclaim_unique_control_block` uses `debug_assert_eq!` for a safety
   precondition. In release builds the check is compiled out, so if the
   refcount is not 1 due to a bug, `Box::from_raw` reclaims the control block
   while other references still exist, which is a use-after-free. Promote it
   to `assert_eq!` for defense in depth.



##########
core/buf/src/lib.rs:
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::mem::ManuallyDrop;
+use std::ptr::NonNull;
+use std::slice;
+use std::sync::atomic::{AtomicUsize, Ordering, fence};
+
+use aligned_vec::{AVec, ConstAlign};
+
+#[derive(Debug)]
+pub struct Owned<const ALIGN: usize = 4096> {
+    inner: AVec<u8, ConstAlign<ALIGN>>,
+}
+
+impl<const ALIGN: usize> From<AVec<u8, ConstAlign<ALIGN>>> for Owned<ALIGN> {
+    fn from(vec: AVec<u8, ConstAlign<ALIGN>>) -> Self {
+        Self { inner: vec }
+    }
+}
+
+impl<const ALIGN: usize> From<Owned<ALIGN>> for AVec<u8, ConstAlign<ALIGN>> {
+    fn from(value: Owned<ALIGN>) -> Self {
+        value.inner
+    }
+}
+
+impl<const ALIGN: usize> Owned<ALIGN> {
+    pub fn as_slice(&self) -> &[u8] {
+        &self.inner
+    }
+
+    pub fn as_mut_slice(&mut self) -> &mut [u8] {
+        &mut self.inner
+    }
+
+    /// Split `Owned` buffer into two halves
+    ///
+    /// # Panics
+    /// Panics if `split_at > self.len()` or if `split_at` is not a multiple 
of `ALIGN` bytes.
+    pub fn split_at(self, split_at: usize) -> TwoHalves<ALIGN> {
+        assert!(split_at <= self.inner.len());
+
+        // Take ownership of the AVec's allocation. After this, we are 
responsible
+        // for deallocating via `AVec::from_raw_parts` or equivalent.
+        let (ptr, _, len, capacity) = self.inner.into_raw_parts();
+
+        // SAFETY: both pointers are constructed from the same `Inner` 
allocation, the split_at bounds are validated.
+        // The control block captures original `Inner` metadata to allow 
reconstructing the original frame for merging/dropping.
+        // The ptr provenance rules are maintained by the use of `NonNull` 
apis.
+        let base: NonNull<u8> = unsafe { NonNull::new_unchecked(ptr) };
+        let tail = unsafe { NonNull::new_unchecked(ptr.add(split_at)) };
+        let ctrlb = ControlBlock::new(base, len, capacity);

Review Comment:
   every `Owned::split_at()` does `Box::new(ControlBlock{...})` - a heap 
allocation. at 1M ops/sec this is 1M mallocs/sec just for control blocks. 
consider embedding the control block in the aligned allocation or using a slab 
allocator.



##########
core/buf/src/lib.rs:
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::mem::ManuallyDrop;
+use std::ptr::NonNull;
+use std::slice;
+use std::sync::atomic::{AtomicUsize, Ordering, fence};
+
+use aligned_vec::{AVec, ConstAlign};
+
+#[derive(Debug)]
+pub struct Owned<const ALIGN: usize = 4096> {
+    inner: AVec<u8, ConstAlign<ALIGN>>,
+}
+
+impl<const ALIGN: usize> From<AVec<u8, ConstAlign<ALIGN>>> for Owned<ALIGN> {
+    fn from(vec: AVec<u8, ConstAlign<ALIGN>>) -> Self {
+        Self { inner: vec }
+    }
+}
+
+impl<const ALIGN: usize> From<Owned<ALIGN>> for AVec<u8, ConstAlign<ALIGN>> {
+    fn from(value: Owned<ALIGN>) -> Self {
+        value.inner
+    }
+}
+
+impl<const ALIGN: usize> Owned<ALIGN> {
+    pub fn as_slice(&self) -> &[u8] {
+        &self.inner
+    }
+
+    pub fn as_mut_slice(&mut self) -> &mut [u8] {
+        &mut self.inner
+    }
+
+    /// Split `Owned` buffer into two halves
+    ///
+    /// # Panics
+    /// Panics if `split_at > self.len()` or if `split_at` is not a multiple 
of `ALIGN` bytes.
+    pub fn split_at(self, split_at: usize) -> TwoHalves<ALIGN> {
+        assert!(split_at <= self.inner.len());
+
+        // Take ownership of the AVec's allocation. After this, we are 
responsible
+        // for deallocating via `AVec::from_raw_parts` or equivalent.
+        let (ptr, _, len, capacity) = self.inner.into_raw_parts();
+
+        // SAFETY: both pointers are constructed from the same `Inner` 
allocation, the split_at bounds are validated.
+        // The control block captures original `Inner` metadata to allow 
reconstructing the original frame for merging/dropping.
+        // The ptr provenance rules are maintained by the use of `NonNull` 
apis.
+        let base: NonNull<u8> = unsafe { NonNull::new_unchecked(ptr) };
+        let tail = unsafe { NonNull::new_unchecked(ptr.add(split_at)) };
+        let ctrlb = ControlBlock::new(base, len, capacity);
+
+        TwoHalves {
+            inner: (
+                Extent {
+                    ptr: base,
+                    len: split_at,
+                    ctrlb,
+                    _pad: 0,
+                },
+                Extent {
+                    ptr: tail,
+                    len: len - split_at,
+                    ctrlb,
+                    _pad: 0,
+                },
+            ),
+        }
+    }
+}
+
+pub struct TwoHalves<const ALIGN: usize> {

Review Comment:
   `TwoHalves`, `Owned`-after-split, and `Frozen` are all `!Send + !Sync` 
because `NonNull` is `!Send + !Sync`. the `AtomicUsize` refcounting strongly 
implies cross-thread use was intended. either add `unsafe impl Send/Sync` with 
a safety argument, or document the `!Send` constraint as intentional for the 
shard-local model.



##########
core/buf/src/lib.rs:
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::mem::ManuallyDrop;
+use std::ptr::NonNull;
+use std::slice;
+use std::sync::atomic::{AtomicUsize, Ordering, fence};
+
+use aligned_vec::{AVec, ConstAlign};
+
+#[derive(Debug)]
+pub struct Owned<const ALIGN: usize = 4096> {
+    inner: AVec<u8, ConstAlign<ALIGN>>,
+}
+
+impl<const ALIGN: usize> From<AVec<u8, ConstAlign<ALIGN>>> for Owned<ALIGN> {
+    fn from(vec: AVec<u8, ConstAlign<ALIGN>>) -> Self {
+        Self { inner: vec }
+    }
+}
+
+impl<const ALIGN: usize> From<Owned<ALIGN>> for AVec<u8, ConstAlign<ALIGN>> {
+    fn from(value: Owned<ALIGN>) -> Self {
+        value.inner
+    }
+}
+
+impl<const ALIGN: usize> Owned<ALIGN> {
+    pub fn as_slice(&self) -> &[u8] {
+        &self.inner
+    }
+
+    pub fn as_mut_slice(&mut self) -> &mut [u8] {
+        &mut self.inner
+    }
+
+    /// Split `Owned` buffer into two halves
+    ///
+    /// # Panics
+    /// Panics if `split_at > self.len()` or if `split_at` is not a multiple 
of `ALIGN` bytes.
+    pub fn split_at(self, split_at: usize) -> TwoHalves<ALIGN> {
+        assert!(split_at <= self.inner.len());
+
+        // Take ownership of the AVec's allocation. After this, we are 
responsible
+        // for deallocating via `AVec::from_raw_parts` or equivalent.
+        let (ptr, _, len, capacity) = self.inner.into_raw_parts();
+
+        // SAFETY: both pointers are constructed from the same `Inner` 
allocation, the split_at bounds are validated.
+        // The control block captures original `Inner` metadata to allow 
reconstructing the original frame for merging/dropping.
+        // The ptr provenance rules are maintained by the use of `NonNull` 
apis.
+        let base: NonNull<u8> = unsafe { NonNull::new_unchecked(ptr) };
+        let tail = unsafe { NonNull::new_unchecked(ptr.add(split_at)) };
+        let ctrlb = ControlBlock::new(base, len, capacity);
+
+        TwoHalves {
+            inner: (
+                Extent {
+                    ptr: base,
+                    len: split_at,
+                    ctrlb,
+                    _pad: 0,
+                },
+                Extent {
+                    ptr: tail,
+                    len: len - split_at,
+                    ctrlb,
+                    _pad: 0,
+                },
+            ),
+        }
+    }
+}
+
+pub struct TwoHalves<const ALIGN: usize> {
+    inner: (Extent, Extent),
+}
+
+impl<const ALIGN: usize> TwoHalves<ALIGN> {
+    pub fn head(&self) -> &[u8] {
+        self.inner.0.as_slice()
+    }
+
+    pub fn head_mut(&mut self) -> &mut [u8] {
+        // SAFETY: We are accessing the head half mutably, this is the only 
correct operation, as the head is not shared between clones,
+        // instead it gets copied.
+        unsafe { self.inner.0.as_mut_slice() }
+    }
+
+    pub fn tail(&self) -> &[u8] {
+        self.inner.1.as_slice()
+    }
+
+    pub fn split_at(&self) -> usize {
+        self.inner.0.len
+    }
+
+    pub fn total_len(&self) -> usize {
+        self.inner.0.len + self.inner.1.len
+    }
+
+    pub fn is_unique(&self) -> bool {
+        // `inner.1` is the authoritative owner of the original frame 
allocation.
+        // SAFETY: `inner.1.ctrlb` points to a live control block while `self` 
is alive.
+        unsafe {
+            self.inner
+                .1
+                .ctrlb
+                .as_ref()
+                .ref_count
+                .load(Ordering::Acquire)
+                == 1
+        }
+    }
+
+    pub fn try_merge(self) -> Result<Owned<ALIGN>, Self> {
+        if !self.is_unique() {
+            return Err(self);
+        }
+
+        // Transfer ownership to prevent double-free.
+        // SAFETY: We read the inner tuple out of ManuallyDrop, which won't 
run TwoHalves::drop.
+        let this = ManuallyDrop::new(self);
+        let (head, tail) = unsafe { std::ptr::read(&this.inner) };
+        let split_at = head.len;
+
+        // SAFETY: `tail.ctrlb` is unique at this point,
+        // If `head.ctrlb != tail.ctrlb`, the head owns a standalone allocation
+        // that must be released after copying.
+        unsafe {
+            let ctrlb_eq = std::ptr::addr_eq(head.ctrlb.as_ptr(), 
tail.ctrlb.as_ptr());
+
+            if !ctrlb_eq {
+                let tail_ctrlb = tail.ctrlb.as_ref();
+
+                // We are patching up the original allocation, with the 
current head data, so that the resulting `Owned` has correct content.
+                let dst = slice::from_raw_parts_mut(tail_ctrlb.base.as_ptr(), 
split_at);
+                dst.copy_from_slice(head.as_slice());
+                release_control_block_w_allocation::<ALIGN>(head.ctrlb);
+            }
+
+            let ctrlb = reclaim_unique_control_block(tail.ctrlb);
+            // SAFETY: `ctrlb.base,capacity` were captured from an `AVec<u8>` 
allocation and
+            // are now exclusively owned by this path.
+            let inner = AVec::from_raw_parts(ctrlb.base.as_ptr(), ALIGN, 
ctrlb.len, ctrlb.capacity);
+            Ok(Owned { inner })
+        }
+    }
+}
+
+impl<const ALIGN: usize> Clone for TwoHalves<ALIGN> {
+    fn clone(&self) -> Self {
+        Self {
+            inner: (
+                Extent::copy_from_slice::<ALIGN>(self.head()),
+                self.inner.1.clone(),
+            ),
+        }
+    }
+}
+
+impl<const ALIGN: usize> Drop for TwoHalves<ALIGN> {
+    fn drop(&mut self) {
+        // SAFETY: `inner.0.ctrlb` / `inner.1.ctrlb` point to live control 
blocks while `self` is alive.
+        let ctrlb_eq = std::ptr::addr_eq(self.inner.0.ctrlb.as_ptr(), 
self.inner.1.ctrlb.as_ptr());
+        unsafe {
+            if ctrlb_eq {
+                
release_control_block_w_allocation::<ALIGN>(self.inner.1.ctrlb);
+            } else {
+                // Different control blocks, release both
+                
release_control_block_w_allocation::<ALIGN>(self.inner.0.ctrlb);
+                
release_control_block_w_allocation::<ALIGN>(self.inner.1.ctrlb);
+            }
+        }
+    }
+}
+
+impl<const ALIGN: usize> std::fmt::Debug for TwoHalves<ALIGN> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("TwoHalves")
+            .field("split_at", &self.split_at())
+            .field("head_len", &self.inner.0.len)
+            .field("tail_len", &self.inner.1.len)
+            .field("halves_alias", &(self.inner.0.ctrlb == self.inner.1.ctrlb))
+            .finish()
+    }
+}
+
+#[derive(Clone)]

Review Comment:
   `Frozen` is dead code, and it would leak memory if it were ever used. It
   derives `Clone` (which increments the refcount via `Extent::clone`) but has
   no `Drop` impl and no constructor, so every drop would leak the
   `ControlBlock` and its backing allocation. Either add a `Drop` impl or
   delete the type entirely, per project rules.



##########
core/buf/src/lib.rs:
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::mem::ManuallyDrop;
+use std::ptr::NonNull;
+use std::slice;
+use std::sync::atomic::{AtomicUsize, Ordering, fence};
+
+use aligned_vec::{AVec, ConstAlign};
+
+#[derive(Debug)]
+pub struct Owned<const ALIGN: usize = 4096> {
+    inner: AVec<u8, ConstAlign<ALIGN>>,
+}
+
+impl<const ALIGN: usize> From<AVec<u8, ConstAlign<ALIGN>>> for Owned<ALIGN> {
+    fn from(vec: AVec<u8, ConstAlign<ALIGN>>) -> Self {
+        Self { inner: vec }
+    }
+}
+
+impl<const ALIGN: usize> From<Owned<ALIGN>> for AVec<u8, ConstAlign<ALIGN>> {
+    fn from(value: Owned<ALIGN>) -> Self {
+        value.inner
+    }
+}
+
+impl<const ALIGN: usize> Owned<ALIGN> {
+    pub fn as_slice(&self) -> &[u8] {
+        &self.inner
+    }
+
+    pub fn as_mut_slice(&mut self) -> &mut [u8] {
+        &mut self.inner
+    }
+
+    /// Split `Owned` buffer into two halves
+    ///
+    /// # Panics
+    /// Panics if `split_at > self.len()` or if `split_at` is not a multiple 
of `ALIGN` bytes.
+    pub fn split_at(self, split_at: usize) -> TwoHalves<ALIGN> {
+        assert!(split_at <= self.inner.len());
+
+        // Take ownership of the AVec's allocation. After this, we are 
responsible
+        // for deallocating via `AVec::from_raw_parts` or equivalent.
+        let (ptr, _, len, capacity) = self.inner.into_raw_parts();
+
+        // SAFETY: both pointers are constructed from the same `Inner` 
allocation, the split_at bounds are validated.
+        // The control block captures original `Inner` metadata to allow 
reconstructing the original frame for merging/dropping.
+        // The ptr provenance rules are maintained by the use of `NonNull` 
apis.
+        let base: NonNull<u8> = unsafe { NonNull::new_unchecked(ptr) };
+        let tail = unsafe { NonNull::new_unchecked(ptr.add(split_at)) };
+        let ctrlb = ControlBlock::new(base, len, capacity);
+
+        TwoHalves {
+            inner: (
+                Extent {
+                    ptr: base,
+                    len: split_at,
+                    ctrlb,
+                    _pad: 0,
+                },
+                Extent {
+                    ptr: tail,
+                    len: len - split_at,
+                    ctrlb,
+                    _pad: 0,
+                },
+            ),
+        }
+    }
+}
+
+pub struct TwoHalves<const ALIGN: usize> {
+    inner: (Extent, Extent),
+}
+
+impl<const ALIGN: usize> TwoHalves<ALIGN> {
+    pub fn head(&self) -> &[u8] {
+        self.inner.0.as_slice()
+    }
+
+    pub fn head_mut(&mut self) -> &mut [u8] {
+        // SAFETY: We are accessing the head half mutably, this is the only 
correct operation, as the head is not shared between clones,
+        // instead it gets copied.
+        unsafe { self.inner.0.as_mut_slice() }
+    }
+
+    pub fn tail(&self) -> &[u8] {
+        self.inner.1.as_slice()
+    }
+
+    pub fn split_at(&self) -> usize {
+        self.inner.0.len
+    }
+
+    pub fn total_len(&self) -> usize {
+        self.inner.0.len + self.inner.1.len
+    }
+
+    pub fn is_unique(&self) -> bool {
+        // `inner.1` is the authoritative owner of the original frame 
allocation.
+        // SAFETY: `inner.1.ctrlb` points to a live control block while `self` 
is alive.
+        unsafe {
+            self.inner
+                .1
+                .ctrlb
+                .as_ref()
+                .ref_count
+                .load(Ordering::Acquire)
+                == 1
+        }
+    }
+
+    pub fn try_merge(self) -> Result<Owned<ALIGN>, Self> {
+        if !self.is_unique() {
+            return Err(self);
+        }
+
+        // Transfer ownership to prevent double-free.
+        // SAFETY: We read the inner tuple out of ManuallyDrop, which won't 
run TwoHalves::drop.
+        let this = ManuallyDrop::new(self);
+        let (head, tail) = unsafe { std::ptr::read(&this.inner) };
+        let split_at = head.len;
+
+        // SAFETY: `tail.ctrlb` is unique at this point,
+        // If `head.ctrlb != tail.ctrlb`, the head owns a standalone allocation
+        // that must be released after copying.
+        unsafe {
+            let ctrlb_eq = std::ptr::addr_eq(head.ctrlb.as_ptr(), 
tail.ctrlb.as_ptr());
+
+            if !ctrlb_eq {
+                let tail_ctrlb = tail.ctrlb.as_ref();
+
+                // We are patching up the original allocation, with the 
current head data, so that the resulting `Owned` has correct content.
+                let dst = slice::from_raw_parts_mut(tail_ctrlb.base.as_ptr(), 
split_at);
+                dst.copy_from_slice(head.as_slice());
+                release_control_block_w_allocation::<ALIGN>(head.ctrlb);
+            }
+
+            let ctrlb = reclaim_unique_control_block(tail.ctrlb);
+            // SAFETY: `ctrlb.base,capacity` were captured from an `AVec<u8>` 
allocation and
+            // are now exclusively owned by this path.
+            let inner = AVec::from_raw_parts(ctrlb.base.as_ptr(), ALIGN, 
ctrlb.len, ctrlb.capacity);
+            Ok(Owned { inner })
+        }
+    }
+}
+
+impl<const ALIGN: usize> Clone for TwoHalves<ALIGN> {
+    fn clone(&self) -> Self {
+        Self {
+            inner: (
+                Extent::copy_from_slice::<ALIGN>(self.head()),
+                self.inner.1.clone(),
+            ),
+        }
+    }
+}
+
+impl<const ALIGN: usize> Drop for TwoHalves<ALIGN> {
+    fn drop(&mut self) {
+        // SAFETY: `inner.0.ctrlb` / `inner.1.ctrlb` point to live control 
blocks while `self` is alive.
+        let ctrlb_eq = std::ptr::addr_eq(self.inner.0.ctrlb.as_ptr(), 
self.inner.1.ctrlb.as_ptr());
+        unsafe {
+            if ctrlb_eq {
+                
release_control_block_w_allocation::<ALIGN>(self.inner.1.ctrlb);
+            } else {
+                // Different control blocks, release both
+                
release_control_block_w_allocation::<ALIGN>(self.inner.0.ctrlb);
+                
release_control_block_w_allocation::<ALIGN>(self.inner.1.ctrlb);
+            }
+        }
+    }
+}
+
+impl<const ALIGN: usize> std::fmt::Debug for TwoHalves<ALIGN> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("TwoHalves")
+            .field("split_at", &self.split_at())
+            .field("head_len", &self.inner.0.len)
+            .field("tail_len", &self.inner.1.len)
+            .field("halves_alias", &(self.inner.0.ctrlb == self.inner.1.ctrlb))
+            .finish()
+    }
+}
+
+#[derive(Clone)]
+pub struct Frozen<const ALIGN: usize> {
+    inner: Extent,
+}
+
+impl<const ALIGN: usize> Frozen<ALIGN> {
+    pub fn as_slice(&self) -> &[u8] {
+        self.inner.as_slice()
+    }
+}
+
+#[repr(C, align(64))]
+struct ControlBlock {
+    ref_count: AtomicUsize,
+    base: NonNull<u8>,
+    len: usize,
+    capacity: usize,
+    _pad: [u8; 32],
+}
+
+impl ControlBlock {
+    fn new(base: NonNull<u8>, len: usize, capacity: usize) -> NonNull<Self> {
+        let ctrl = Box::new(ControlBlock {
+            ref_count: AtomicUsize::new(1),
+            base,
+            len,
+            capacity,
+            _pad: [0; 32],
+        });
+        // SAFETY: Box::into_raw returns a valid pointer
+        unsafe { NonNull::new_unchecked(Box::into_raw(ctrl)) }
+    }
+}
+
+struct Extent {
+    ptr: NonNull<u8>,
+    len: usize,
+    ctrlb: NonNull<ControlBlock>,
+    _pad: usize,
+}
+
+impl Extent {
+    fn as_slice(&self) -> &[u8] {
+        // SAFETY: ptr and len describe a valid allocation
+        unsafe { slice::from_raw_parts(self.ptr.as_ptr(), self.len) }
+    }
+
+    unsafe fn as_mut_slice(&mut self) -> &mut [u8] {
+        // SAFETY: caller guarantees exclusive access
+        unsafe { slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len) }
+    }
+
+    fn copy_from_slice<const ALIGN: usize>(src: &[u8]) -> Self {
+        let mut v: AVec<u8, ConstAlign<ALIGN>> = AVec::new(ALIGN);
+        v.extend_from_slice(src);
+
+        let (ptr, _, len, capacity) = v.into_raw_parts();
+        let data = unsafe { NonNull::new_unchecked(ptr) };
+
+        let ctrlb = ControlBlock::new(data, len, capacity);
+
+        Extent {
+            ptr: data,
+            len,
+            ctrlb,
+            _pad: 0,
+        }
+    }
+}
+
+impl Clone for Extent {

Review Comment:
   `Extent::clone()` increments the refcount, but `Extent` has no `Drop` impl,
so refcounting is a one-way ratchet. The design relies entirely on container
types (`TwoHalves::drop`) to manage the lifecycle. This is the root cause of
the `Frozen` leak: `Frozen` wraps an `Extent` but never releases it. Consider
implementing `Drop` for `Extent` to make refcounting self-contained.



##########
core/buf/src/lib.rs:
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::mem::ManuallyDrop;
+use std::ptr::NonNull;
+use std::slice;
+use std::sync::atomic::{AtomicUsize, Ordering, fence};
+
+use aligned_vec::{AVec, ConstAlign};
+
+#[derive(Debug)]
+pub struct Owned<const ALIGN: usize = 4096> {
+    inner: AVec<u8, ConstAlign<ALIGN>>,
+}
+
+impl<const ALIGN: usize> From<AVec<u8, ConstAlign<ALIGN>>> for Owned<ALIGN> {
+    fn from(vec: AVec<u8, ConstAlign<ALIGN>>) -> Self {
+        Self { inner: vec }
+    }
+}
+
+impl<const ALIGN: usize> From<Owned<ALIGN>> for AVec<u8, ConstAlign<ALIGN>> {
+    fn from(value: Owned<ALIGN>) -> Self {
+        value.inner
+    }
+}
+
+impl<const ALIGN: usize> Owned<ALIGN> {
+    pub fn as_slice(&self) -> &[u8] {
+        &self.inner
+    }
+
+    pub fn as_mut_slice(&mut self) -> &mut [u8] {
+        &mut self.inner
+    }
+
+    /// Split `Owned` buffer into two halves
+    ///
+    /// # Panics
+    /// Panics if `split_at > self.len()` or if `split_at` is not a multiple 
of `ALIGN` bytes.
+    pub fn split_at(self, split_at: usize) -> TwoHalves<ALIGN> {
+        assert!(split_at <= self.inner.len());
+
+        // Take ownership of the AVec's allocation. After this, we are 
responsible
+        // for deallocating via `AVec::from_raw_parts` or equivalent.
+        let (ptr, _, len, capacity) = self.inner.into_raw_parts();
+
+        // SAFETY: both pointers are constructed from the same `Inner` 
allocation, the split_at bounds are validated.
+        // The control block captures original `Inner` metadata to allow 
reconstructing the original frame for merging/dropping.
+        // The ptr provenance rules are maintained by the use of `NonNull` 
apis.
+        let base: NonNull<u8> = unsafe { NonNull::new_unchecked(ptr) };
+        let tail = unsafe { NonNull::new_unchecked(ptr.add(split_at)) };
+        let ctrlb = ControlBlock::new(base, len, capacity);
+
+        TwoHalves {
+            inner: (
+                Extent {
+                    ptr: base,
+                    len: split_at,
+                    ctrlb,
+                    _pad: 0,
+                },
+                Extent {
+                    ptr: tail,
+                    len: len - split_at,
+                    ctrlb,
+                    _pad: 0,
+                },
+            ),
+        }
+    }
+}
+
+pub struct TwoHalves<const ALIGN: usize> {
+    inner: (Extent, Extent),
+}
+
+impl<const ALIGN: usize> TwoHalves<ALIGN> {
+    pub fn head(&self) -> &[u8] {
+        self.inner.0.as_slice()
+    }
+
+    pub fn head_mut(&mut self) -> &mut [u8] {
+        // SAFETY: We are accessing the head half mutably, this is the only 
correct operation, as the head is not shared between clones,
+        // instead it gets copied.
+        unsafe { self.inner.0.as_mut_slice() }
+    }
+
+    pub fn tail(&self) -> &[u8] {
+        self.inner.1.as_slice()
+    }
+
+    pub fn split_at(&self) -> usize {

Review Comment:
   `TwoHalves::split_at()` returns the split position, whereas
`Owned::split_at()` performs the split: same name, very different semantics.
Rename this accessor to `split_point()` or `head_len()`.



##########
core/buf/Cargo.toml:
##########
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "buf"

Review Comment:
   `buf` is a very generic name for a workspace of 37+ crates. Consider
`iggy_buf`, so that the crate is namespaced consistently with `iggy_common`,
`iggy_binary_protocol`, etc.



##########
core/buf/src/lib.rs:
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::mem::ManuallyDrop;
+use std::ptr::NonNull;
+use std::slice;
+use std::sync::atomic::{AtomicUsize, Ordering, fence};
+
+use aligned_vec::{AVec, ConstAlign};
+
+#[derive(Debug)]
+pub struct Owned<const ALIGN: usize = 4096> {
+    inner: AVec<u8, ConstAlign<ALIGN>>,
+}
+
+impl<const ALIGN: usize> From<AVec<u8, ConstAlign<ALIGN>>> for Owned<ALIGN> {
+    fn from(vec: AVec<u8, ConstAlign<ALIGN>>) -> Self {
+        Self { inner: vec }
+    }
+}
+
+impl<const ALIGN: usize> From<Owned<ALIGN>> for AVec<u8, ConstAlign<ALIGN>> {
+    fn from(value: Owned<ALIGN>) -> Self {
+        value.inner
+    }
+}
+
+impl<const ALIGN: usize> Owned<ALIGN> {
+    pub fn as_slice(&self) -> &[u8] {
+        &self.inner
+    }
+
+    pub fn as_mut_slice(&mut self) -> &mut [u8] {
+        &mut self.inner
+    }
+
+    /// Split `Owned` buffer into two halves
+    ///
+    /// # Panics
+    /// Panics if `split_at > self.len()` or if `split_at` is not a multiple 
of `ALIGN` bytes.
+    pub fn split_at(self, split_at: usize) -> TwoHalves<ALIGN> {
+        assert!(split_at <= self.inner.len());
+
+        // Take ownership of the AVec's allocation. After this, we are 
responsible
+        // for deallocating via `AVec::from_raw_parts` or equivalent.
+        let (ptr, _, len, capacity) = self.inner.into_raw_parts();
+
+        // SAFETY: both pointers are constructed from the same `Inner` 
allocation, the split_at bounds are validated.
+        // The control block captures original `Inner` metadata to allow 
reconstructing the original frame for merging/dropping.
+        // The ptr provenance rules are maintained by the use of `NonNull` 
apis.
+        let base: NonNull<u8> = unsafe { NonNull::new_unchecked(ptr) };
+        let tail = unsafe { NonNull::new_unchecked(ptr.add(split_at)) };
+        let ctrlb = ControlBlock::new(base, len, capacity);
+
+        TwoHalves {
+            inner: (
+                Extent {
+                    ptr: base,
+                    len: split_at,
+                    ctrlb,
+                    _pad: 0,
+                },
+                Extent {
+                    ptr: tail,
+                    len: len - split_at,
+                    ctrlb,
+                    _pad: 0,
+                },
+            ),
+        }
+    }
+}
+
+pub struct TwoHalves<const ALIGN: usize> {
+    inner: (Extent, Extent),
+}
+
+impl<const ALIGN: usize> TwoHalves<ALIGN> {
+    pub fn head(&self) -> &[u8] {
+        self.inner.0.as_slice()
+    }
+
+    pub fn head_mut(&mut self) -> &mut [u8] {
+        // SAFETY: We are accessing the head half mutably, this is the only 
correct operation, as the head is not shared between clones,
+        // instead it gets copied.
+        unsafe { self.inner.0.as_mut_slice() }
+    }
+
+    pub fn tail(&self) -> &[u8] {
+        self.inner.1.as_slice()
+    }
+
+    pub fn split_at(&self) -> usize {
+        self.inner.0.len
+    }
+
+    pub fn total_len(&self) -> usize {
+        self.inner.0.len + self.inner.1.len
+    }
+
+    pub fn is_unique(&self) -> bool {
+        // `inner.1` is the authoritative owner of the original frame 
allocation.
+        // SAFETY: `inner.1.ctrlb` points to a live control block while `self` 
is alive.
+        unsafe {
+            self.inner
+                .1
+                .ctrlb
+                .as_ref()
+                .ref_count
+                .load(Ordering::Acquire)
+                == 1
+        }
+    }
+
+    pub fn try_merge(self) -> Result<Owned<ALIGN>, Self> {
+        if !self.is_unique() {
+            return Err(self);
+        }
+
+        // Transfer ownership to prevent double-free.
+        // SAFETY: We read the inner tuple out of ManuallyDrop, which won't 
run TwoHalves::drop.
+        let this = ManuallyDrop::new(self);
+        let (head, tail) = unsafe { std::ptr::read(&this.inner) };
+        let split_at = head.len;
+
+        // SAFETY: `tail.ctrlb` is unique at this point,
+        // If `head.ctrlb != tail.ctrlb`, the head owns a standalone allocation
+        // that must be released after copying.
+        unsafe {
+            let ctrlb_eq = std::ptr::addr_eq(head.ctrlb.as_ptr(), 
tail.ctrlb.as_ptr());
+
+            if !ctrlb_eq {
+                let tail_ctrlb = tail.ctrlb.as_ref();
+
+                // We are patching up the original allocation, with the 
current head data, so that the resulting `Owned` has correct content.
+                let dst = slice::from_raw_parts_mut(tail_ctrlb.base.as_ptr(), 
split_at);
+                dst.copy_from_slice(head.as_slice());
+                release_control_block_w_allocation::<ALIGN>(head.ctrlb);
+            }
+
+            let ctrlb = reclaim_unique_control_block(tail.ctrlb);
+            // SAFETY: `ctrlb.base,capacity` were captured from an `AVec<u8>` 
allocation and
+            // are now exclusively owned by this path.
+            let inner = AVec::from_raw_parts(ctrlb.base.as_ptr(), ALIGN, 
ctrlb.len, ctrlb.capacity);
+            Ok(Owned { inner })
+        }
+    }
+}
+
+impl<const ALIGN: usize> Clone for TwoHalves<ALIGN> {
+    fn clone(&self) -> Self {
+        Self {
+            inner: (
+                Extent::copy_from_slice::<ALIGN>(self.head()),
+                self.inner.1.clone(),
+            ),
+        }
+    }
+}
+
+impl<const ALIGN: usize> Drop for TwoHalves<ALIGN> {
+    fn drop(&mut self) {
+        // SAFETY: `inner.0.ctrlb` / `inner.1.ctrlb` point to live control 
blocks while `self` is alive.
+        let ctrlb_eq = std::ptr::addr_eq(self.inner.0.ctrlb.as_ptr(), 
self.inner.1.ctrlb.as_ptr());
+        unsafe {
+            if ctrlb_eq {
+                
release_control_block_w_allocation::<ALIGN>(self.inner.1.ctrlb);
+            } else {
+                // Different control blocks, release both
+                
release_control_block_w_allocation::<ALIGN>(self.inner.0.ctrlb);
+                
release_control_block_w_allocation::<ALIGN>(self.inner.1.ctrlb);
+            }
+        }
+    }
+}
+
+impl<const ALIGN: usize> std::fmt::Debug for TwoHalves<ALIGN> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("TwoHalves")
+            .field("split_at", &self.split_at())
+            .field("head_len", &self.inner.0.len)
+            .field("tail_len", &self.inner.1.len)
+            .field("halves_alias", &(self.inner.0.ctrlb == self.inner.1.ctrlb))
+            .finish()
+    }
+}
+
+#[derive(Clone)]
+pub struct Frozen<const ALIGN: usize> {
+    inner: Extent,
+}
+
+impl<const ALIGN: usize> Frozen<ALIGN> {
+    pub fn as_slice(&self) -> &[u8] {
+        self.inner.as_slice()
+    }
+}
+
+#[repr(C, align(64))]
+struct ControlBlock {
+    ref_count: AtomicUsize,
+    base: NonNull<u8>,
+    len: usize,
+    capacity: usize,
+    _pad: [u8; 32],
+}
+
+impl ControlBlock {
+    fn new(base: NonNull<u8>, len: usize, capacity: usize) -> NonNull<Self> {
+        let ctrl = Box::new(ControlBlock {
+            ref_count: AtomicUsize::new(1),
+            base,
+            len,
+            capacity,
+            _pad: [0; 32],
+        });
+        // SAFETY: Box::into_raw returns a valid pointer
+        unsafe { NonNull::new_unchecked(Box::into_raw(ctrl)) }
+    }
+}
+
+struct Extent {
+    ptr: NonNull<u8>,
+    len: usize,
+    ctrlb: NonNull<ControlBlock>,
+    _pad: usize,

Review Comment:
   `_pad: usize` brings `Extent` to 32 bytes (on 64-bit targets), but its
purpose is undocumented. If it is there for cache-line alignment, say so in a
comment; if it is not needed, remove it.



##########
core/buf/src/lib.rs:
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::mem::ManuallyDrop;
+use std::ptr::NonNull;
+use std::slice;
+use std::sync::atomic::{AtomicUsize, Ordering, fence};
+
+use aligned_vec::{AVec, ConstAlign};
+
+#[derive(Debug)]
+pub struct Owned<const ALIGN: usize = 4096> {
+    inner: AVec<u8, ConstAlign<ALIGN>>,
+}
+
+impl<const ALIGN: usize> From<AVec<u8, ConstAlign<ALIGN>>> for Owned<ALIGN> {
+    fn from(vec: AVec<u8, ConstAlign<ALIGN>>) -> Self {
+        Self { inner: vec }
+    }
+}
+
+impl<const ALIGN: usize> From<Owned<ALIGN>> for AVec<u8, ConstAlign<ALIGN>> {
+    fn from(value: Owned<ALIGN>) -> Self {
+        value.inner
+    }
+}
+
+impl<const ALIGN: usize> Owned<ALIGN> {
+    pub fn as_slice(&self) -> &[u8] {
+        &self.inner
+    }
+
+    pub fn as_mut_slice(&mut self) -> &mut [u8] {
+        &mut self.inner
+    }
+
+    /// Split `Owned` buffer into two halves
+    ///
+    /// # Panics
+    /// Panics if `split_at > self.len()` or if `split_at` is not a multiple 
of `ALIGN` bytes.
+    pub fn split_at(self, split_at: usize) -> TwoHalves<ALIGN> {
+        assert!(split_at <= self.inner.len());
+
+        // Take ownership of the AVec's allocation. After this, we are 
responsible
+        // for deallocating via `AVec::from_raw_parts` or equivalent.
+        let (ptr, _, len, capacity) = self.inner.into_raw_parts();
+
+        // SAFETY: both pointers are constructed from the same `Inner` 
allocation, the split_at bounds are validated.
+        // The control block captures original `Inner` metadata to allow 
reconstructing the original frame for merging/dropping.
+        // The ptr provenance rules are maintained by the use of `NonNull` 
apis.
+        let base: NonNull<u8> = unsafe { NonNull::new_unchecked(ptr) };
+        let tail = unsafe { NonNull::new_unchecked(ptr.add(split_at)) };
+        let ctrlb = ControlBlock::new(base, len, capacity);
+
+        TwoHalves {
+            inner: (
+                Extent {
+                    ptr: base,
+                    len: split_at,
+                    ctrlb,
+                    _pad: 0,
+                },
+                Extent {
+                    ptr: tail,
+                    len: len - split_at,
+                    ctrlb,
+                    _pad: 0,
+                },
+            ),
+        }
+    }
+}
+
+pub struct TwoHalves<const ALIGN: usize> {
+    inner: (Extent, Extent),
+}
+
+impl<const ALIGN: usize> TwoHalves<ALIGN> {
+    pub fn head(&self) -> &[u8] {
+        self.inner.0.as_slice()
+    }
+
+    pub fn head_mut(&mut self) -> &mut [u8] {
+        // SAFETY: We are accessing the head half mutably, this is the only 
correct operation, as the head is not shared between clones,
+        // instead it gets copied.
+        unsafe { self.inner.0.as_mut_slice() }
+    }
+
+    pub fn tail(&self) -> &[u8] {
+        self.inner.1.as_slice()
+    }
+
+    pub fn split_at(&self) -> usize {
+        self.inner.0.len
+    }
+
+    pub fn total_len(&self) -> usize {
+        self.inner.0.len + self.inner.1.len
+    }
+
+    pub fn is_unique(&self) -> bool {
+        // `inner.1` is the authoritative owner of the original frame 
allocation.
+        // SAFETY: `inner.1.ctrlb` points to a live control block while `self` 
is alive.
+        unsafe {
+            self.inner
+                .1
+                .ctrlb
+                .as_ref()
+                .ref_count
+                .load(Ordering::Acquire)
+                == 1
+        }
+    }
+
+    pub fn try_merge(self) -> Result<Owned<ALIGN>, Self> {
+        if !self.is_unique() {
+            return Err(self);
+        }
+
+        // Transfer ownership to prevent double-free.
+        // SAFETY: We read the inner tuple out of ManuallyDrop, which won't 
run TwoHalves::drop.
+        let this = ManuallyDrop::new(self);
+        let (head, tail) = unsafe { std::ptr::read(&this.inner) };
+        let split_at = head.len;
+
+        // SAFETY: `tail.ctrlb` is unique at this point,
+        // If `head.ctrlb != tail.ctrlb`, the head owns a standalone allocation
+        // that must be released after copying.
+        unsafe {
+            let ctrlb_eq = std::ptr::addr_eq(head.ctrlb.as_ptr(), 
tail.ctrlb.as_ptr());
+
+            if !ctrlb_eq {
+                let tail_ctrlb = tail.ctrlb.as_ref();
+
+                // We are patching up the original allocation, with the 
current head data, so that the resulting `Owned` has correct content.
+                let dst = slice::from_raw_parts_mut(tail_ctrlb.base.as_ptr(), 
split_at);
+                dst.copy_from_slice(head.as_slice());
+                release_control_block_w_allocation::<ALIGN>(head.ctrlb);
+            }
+
+            let ctrlb = reclaim_unique_control_block(tail.ctrlb);
+            // SAFETY: `ctrlb.base,capacity` were captured from an `AVec<u8>` 
allocation and
+            // are now exclusively owned by this path.
+            let inner = AVec::from_raw_parts(ctrlb.base.as_ptr(), ALIGN, 
ctrlb.len, ctrlb.capacity);
+            Ok(Owned { inner })
+        }
+    }
+}
+
+impl<const ALIGN: usize> Clone for TwoHalves<ALIGN> {
+    fn clone(&self) -> Self {
+        Self {
+            inner: (
+                Extent::copy_from_slice::<ALIGN>(self.head()),
+                self.inner.1.clone(),
+            ),
+        }
+    }
+}
+
+impl<const ALIGN: usize> Drop for TwoHalves<ALIGN> {
+    fn drop(&mut self) {
+        // SAFETY: `inner.0.ctrlb` / `inner.1.ctrlb` point to live control 
blocks while `self` is alive.
+        let ctrlb_eq = std::ptr::addr_eq(self.inner.0.ctrlb.as_ptr(), 
self.inner.1.ctrlb.as_ptr());
+        unsafe {
+            if ctrlb_eq {
+                
release_control_block_w_allocation::<ALIGN>(self.inner.1.ctrlb);
+            } else {
+                // Different control blocks, release both
+                
release_control_block_w_allocation::<ALIGN>(self.inner.0.ctrlb);
+                
release_control_block_w_allocation::<ALIGN>(self.inner.1.ctrlb);
+            }
+        }
+    }
+}
+
+impl<const ALIGN: usize> std::fmt::Debug for TwoHalves<ALIGN> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("TwoHalves")
+            .field("split_at", &self.split_at())
+            .field("head_len", &self.inner.0.len)
+            .field("tail_len", &self.inner.1.len)
+            .field("halves_alias", &(self.inner.0.ctrlb == self.inner.1.ctrlb))
+            .finish()
+    }
+}
+
+#[derive(Clone)]
+pub struct Frozen<const ALIGN: usize> {
+    inner: Extent,
+}
+
+impl<const ALIGN: usize> Frozen<ALIGN> {
+    pub fn as_slice(&self) -> &[u8] {
+        self.inner.as_slice()
+    }
+}
+
+#[repr(C, align(64))]
+struct ControlBlock {
+    ref_count: AtomicUsize,
+    base: NonNull<u8>,
+    len: usize,
+    capacity: usize,
+    _pad: [u8; 32],

Review Comment:
   `_pad: [u8; 32]` is manually computed assuming 8-byte pointers
(4 x 8 + 32 = 64), so the arithmetic breaks on 32-bit targets. Let
`#[repr(C, align(64))]` handle the padding implicitly and drop the manual
field.



##########
core/buf/Cargo.toml:
##########
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "buf"
+version = "0.1.0"
+edition = "2024"

Review Comment:
   The `license` field is missing; DEPENDENCIES.md shows "N/A" for this crate.
Add `license = "Apache-2.0"` to match all other internal crates.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to