numinnex commented on code in PR #2944:
URL: https://github.com/apache/iggy/pull/2944#discussion_r2965435784


##########
core/buf/src/lib.rs:
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::mem::ManuallyDrop;
+use std::ptr::NonNull;
+use std::slice;
+use std::sync::atomic::{AtomicUsize, Ordering, fence};
+
+use aligned_vec::{AVec, ConstAlign};
+
+#[derive(Debug)]
+pub struct Owned<const ALIGN: usize = 4096> {
+    inner: AVec<u8, ConstAlign<ALIGN>>,
+}
+
+impl<const ALIGN: usize> From<AVec<u8, ConstAlign<ALIGN>>> for Owned<ALIGN> {
+    fn from(vec: AVec<u8, ConstAlign<ALIGN>>) -> Self {
+        Self { inner: vec }
+    }
+}
+
+impl<const ALIGN: usize> From<Owned<ALIGN>> for AVec<u8, ConstAlign<ALIGN>> {
+    fn from(value: Owned<ALIGN>) -> Self {
+        value.inner
+    }
+}
+
+impl<const ALIGN: usize> Owned<ALIGN> {
+    pub fn as_slice(&self) -> &[u8] {
+        &self.inner
+    }
+
+    pub fn as_mut_slice(&mut self) -> &mut [u8] {
+        &mut self.inner
+    }
+
+    /// Split `Owned` buffer into two halves
+    ///
+    /// # Panics
+    /// Panics if `split_at > self.len()` or if `split_at` is not a multiple 
of `ALIGN` bytes.

Review Comment:
   Yeah, it's an artifact from a previous iteration; I'll remove it.



##########
core/buf/Cargo.toml:
##########
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "buf"

Review Comment:
   I was thinking about `iobuf` as the name; maybe that's better.



##########
core/buf/src/lib.rs:
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::mem::ManuallyDrop;
+use std::ptr::NonNull;
+use std::slice;
+use std::sync::atomic::{AtomicUsize, Ordering, fence};
+
+use aligned_vec::{AVec, ConstAlign};
+
+#[derive(Debug)]
+pub struct Owned<const ALIGN: usize = 4096> {
+    inner: AVec<u8, ConstAlign<ALIGN>>,
+}
+
+impl<const ALIGN: usize> From<AVec<u8, ConstAlign<ALIGN>>> for Owned<ALIGN> {
+    fn from(vec: AVec<u8, ConstAlign<ALIGN>>) -> Self {
+        Self { inner: vec }
+    }
+}
+
+impl<const ALIGN: usize> From<Owned<ALIGN>> for AVec<u8, ConstAlign<ALIGN>> {
+    fn from(value: Owned<ALIGN>) -> Self {
+        value.inner
+    }
+}
+
+impl<const ALIGN: usize> Owned<ALIGN> {
+    pub fn as_slice(&self) -> &[u8] {
+        &self.inner
+    }
+
+    pub fn as_mut_slice(&mut self) -> &mut [u8] {
+        &mut self.inner
+    }
+
+    /// Split `Owned` buffer into two halves
+    ///
+    /// # Panics
+    /// Panics if `split_at > self.len()` or if `split_at` is not a multiple 
of `ALIGN` bytes.
+    pub fn split_at(self, split_at: usize) -> TwoHalves<ALIGN> {
+        assert!(split_at <= self.inner.len());
+
+        // Take ownership of the AVec's allocation. After this, we are 
responsible
+        // for deallocating via `AVec::from_raw_parts` or equivalent.
+        let (ptr, _, len, capacity) = self.inner.into_raw_parts();
+
+        // SAFETY: both pointers are constructed from the same `Inner` 
allocation, the split_at bounds are validated.
+        // The control block captures original `Inner` metadata to allow 
reconstructing the original frame for merging/dropping.
+        // The ptr provenance rules are maintained by the use of `NonNull` 
apis.
+        let base: NonNull<u8> = unsafe { NonNull::new_unchecked(ptr) };
+        let tail = unsafe { NonNull::new_unchecked(ptr.add(split_at)) };
+        let ctrlb = ControlBlock::new(base, len, capacity);
+
+        TwoHalves {
+            inner: (
+                Extent {
+                    ptr: base,
+                    len: split_at,
+                    ctrlb,
+                    _pad: 0,
+                },
+                Extent {
+                    ptr: tail,
+                    len: len - split_at,
+                    ctrlb,
+                    _pad: 0,
+                },
+            ),
+        }
+    }
+}
+
+pub struct TwoHalves<const ALIGN: usize> {
+    inner: (Extent, Extent),
+}
+
+impl<const ALIGN: usize> TwoHalves<ALIGN> {
+    pub fn head(&self) -> &[u8] {
+        self.inner.0.as_slice()
+    }
+
+    pub fn head_mut(&mut self) -> &mut [u8] {
+        // SAFETY: We are accessing the head half mutably, this is the only 
correct operation, as the head is not shared between clones,
+        // instead it gets copied.
+        unsafe { self.inner.0.as_mut_slice() }
+    }
+
+    pub fn tail(&self) -> &[u8] {
+        self.inner.1.as_slice()
+    }
+
+    pub fn split_at(&self) -> usize {
+        self.inner.0.len
+    }
+
+    pub fn total_len(&self) -> usize {
+        self.inner.0.len + self.inner.1.len
+    }
+
+    pub fn is_unique(&self) -> bool {
+        // `inner.1` is the authoritative owner of the original frame 
allocation.
+        // SAFETY: `inner.1.ctrlb` points to a live control block while `self` 
is alive.
+        unsafe {
+            self.inner
+                .1
+                .ctrlb
+                .as_ref()
+                .ref_count
+                .load(Ordering::Acquire)
+                == 1
+        }
+    }
+
+    pub fn try_merge(self) -> Result<Owned<ALIGN>, Self> {
+        if !self.is_unique() {
+            return Err(self);
+        }
+
+        // Transfer ownership to prevent double-free.
+        // SAFETY: We read the inner tuple out of ManuallyDrop, which won't 
run TwoHalves::drop.
+        let this = ManuallyDrop::new(self);
+        let (head, tail) = unsafe { std::ptr::read(&this.inner) };
+        let split_at = head.len;
+
+        // SAFETY: `tail.ctrlb` is unique at this point,
+        // If `head.ctrlb != tail.ctrlb`, the head owns a standalone allocation
+        // that must be released after copying.
+        unsafe {
+            let ctrlb_eq = std::ptr::addr_eq(head.ctrlb.as_ptr(), 
tail.ctrlb.as_ptr());
+
+            if !ctrlb_eq {
+                let tail_ctrlb = tail.ctrlb.as_ref();
+
+                // We are patching up the original allocation, with the 
current head data, so that the resulting `Owned` has correct content.
+                let dst = slice::from_raw_parts_mut(tail_ctrlb.base.as_ptr(), 
split_at);
+                dst.copy_from_slice(head.as_slice());
+                release_control_block_w_allocation::<ALIGN>(head.ctrlb);
+            }
+
+            let ctrlb = reclaim_unique_control_block(tail.ctrlb);
+            // SAFETY: `ctrlb.base,capacity` were captured from an `AVec<u8>` 
allocation and
+            // are now exclusively owned by this path.
+            let inner = AVec::from_raw_parts(ctrlb.base.as_ptr(), ALIGN, 
ctrlb.len, ctrlb.capacity);
+            Ok(Owned { inner })
+        }
+    }
+}
+
+impl<const ALIGN: usize> Clone for TwoHalves<ALIGN> {
+    fn clone(&self) -> Self {
+        Self {
+            inner: (
+                Extent::copy_from_slice::<ALIGN>(self.head()),
+                self.inner.1.clone(),
+            ),
+        }
+    }
+}
+
+impl<const ALIGN: usize> Drop for TwoHalves<ALIGN> {
+    fn drop(&mut self) {
+        // SAFETY: `inner.0.ctrlb` / `inner.1.ctrlb` point to live control 
blocks while `self` is alive.
+        let ctrlb_eq = std::ptr::addr_eq(self.inner.0.ctrlb.as_ptr(), 
self.inner.1.ctrlb.as_ptr());
+        unsafe {
+            if ctrlb_eq {
+                
release_control_block_w_allocation::<ALIGN>(self.inner.1.ctrlb);
+            } else {
+                // Different control blocks, release both
+                
release_control_block_w_allocation::<ALIGN>(self.inner.0.ctrlb);
+                
release_control_block_w_allocation::<ALIGN>(self.inner.1.ctrlb);
+            }
+        }
+    }
+}
+
+impl<const ALIGN: usize> std::fmt::Debug for TwoHalves<ALIGN> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("TwoHalves")
+            .field("split_at", &self.split_at())
+            .field("head_len", &self.inner.0.len)
+            .field("tail_len", &self.inner.1.len)
+            .field("halves_alias", &(self.inner.0.ctrlb == self.inner.1.ctrlb))
+            .finish()
+    }
+}
+
+#[derive(Clone)]
+pub struct Frozen<const ALIGN: usize> {
+    inner: Extent,
+}
+
+impl<const ALIGN: usize> Frozen<ALIGN> {
+    pub fn as_slice(&self) -> &[u8] {
+        self.inner.as_slice()
+    }
+}
+
+#[repr(C, align(64))]
+struct ControlBlock {
+    ref_count: AtomicUsize,
+    base: NonNull<u8>,
+    len: usize,
+    capacity: usize,
+    _pad: [u8; 32],

Review Comment:
   Indeed, I will remove that field.



##########
core/buf/src/lib.rs:
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::mem::ManuallyDrop;
+use std::ptr::NonNull;
+use std::slice;
+use std::sync::atomic::{AtomicUsize, Ordering, fence};
+
+use aligned_vec::{AVec, ConstAlign};
+
+#[derive(Debug)]
+pub struct Owned<const ALIGN: usize = 4096> {
+    inner: AVec<u8, ConstAlign<ALIGN>>,
+}
+
+impl<const ALIGN: usize> From<AVec<u8, ConstAlign<ALIGN>>> for Owned<ALIGN> {
+    fn from(vec: AVec<u8, ConstAlign<ALIGN>>) -> Self {
+        Self { inner: vec }
+    }
+}
+
+impl<const ALIGN: usize> From<Owned<ALIGN>> for AVec<u8, ConstAlign<ALIGN>> {
+    fn from(value: Owned<ALIGN>) -> Self {
+        value.inner
+    }
+}
+
+impl<const ALIGN: usize> Owned<ALIGN> {
+    pub fn as_slice(&self) -> &[u8] {
+        &self.inner
+    }
+
+    pub fn as_mut_slice(&mut self) -> &mut [u8] {
+        &mut self.inner
+    }
+
+    /// Split `Owned` buffer into two halves
+    ///
+    /// # Panics
+    /// Panics if `split_at > self.len()` or if `split_at` is not a multiple 
of `ALIGN` bytes.
+    pub fn split_at(self, split_at: usize) -> TwoHalves<ALIGN> {
+        assert!(split_at <= self.inner.len());
+
+        // Take ownership of the AVec's allocation. After this, we are 
responsible
+        // for deallocating via `AVec::from_raw_parts` or equivalent.
+        let (ptr, _, len, capacity) = self.inner.into_raw_parts();
+
+        // SAFETY: both pointers are constructed from the same `Inner` 
allocation, the split_at bounds are validated.
+        // The control block captures original `Inner` metadata to allow 
reconstructing the original frame for merging/dropping.
+        // The ptr provenance rules are maintained by the use of `NonNull` 
apis.
+        let base: NonNull<u8> = unsafe { NonNull::new_unchecked(ptr) };
+        let tail = unsafe { NonNull::new_unchecked(ptr.add(split_at)) };
+        let ctrlb = ControlBlock::new(base, len, capacity);
+
+        TwoHalves {
+            inner: (
+                Extent {
+                    ptr: base,
+                    len: split_at,
+                    ctrlb,
+                    _pad: 0,
+                },
+                Extent {
+                    ptr: tail,
+                    len: len - split_at,
+                    ctrlb,
+                    _pad: 0,
+                },
+            ),
+        }
+    }
+}
+
+pub struct TwoHalves<const ALIGN: usize> {
+    inner: (Extent, Extent),
+}
+
+impl<const ALIGN: usize> TwoHalves<ALIGN> {
+    pub fn head(&self) -> &[u8] {
+        self.inner.0.as_slice()
+    }
+
+    pub fn head_mut(&mut self) -> &mut [u8] {
+        // SAFETY: We are accessing the head half mutably, this is the only 
correct operation, as the head is not shared between clones,
+        // instead it gets copied.
+        unsafe { self.inner.0.as_mut_slice() }
+    }
+
+    pub fn tail(&self) -> &[u8] {
+        self.inner.1.as_slice()
+    }
+
+    pub fn split_at(&self) -> usize {
+        self.inner.0.len
+    }
+
+    pub fn total_len(&self) -> usize {
+        self.inner.0.len + self.inner.1.len
+    }
+
+    pub fn is_unique(&self) -> bool {
+        // `inner.1` is the authoritative owner of the original frame 
allocation.
+        // SAFETY: `inner.1.ctrlb` points to a live control block while `self` 
is alive.
+        unsafe {
+            self.inner
+                .1
+                .ctrlb
+                .as_ref()
+                .ref_count
+                .load(Ordering::Acquire)
+                == 1
+        }
+    }
+
+    pub fn try_merge(self) -> Result<Owned<ALIGN>, Self> {
+        if !self.is_unique() {
+            return Err(self);
+        }
+
+        // Transfer ownership to prevent double-free.
+        // SAFETY: We read the inner tuple out of ManuallyDrop, which won't 
run TwoHalves::drop.
+        let this = ManuallyDrop::new(self);
+        let (head, tail) = unsafe { std::ptr::read(&this.inner) };
+        let split_at = head.len;
+
+        // SAFETY: `tail.ctrlb` is unique at this point,
+        // If `head.ctrlb != tail.ctrlb`, the head owns a standalone allocation
+        // that must be released after copying.
+        unsafe {
+            let ctrlb_eq = std::ptr::addr_eq(head.ctrlb.as_ptr(), 
tail.ctrlb.as_ptr());
+
+            if !ctrlb_eq {
+                let tail_ctrlb = tail.ctrlb.as_ref();
+
+                // We are patching up the original allocation, with the 
current head data, so that the resulting `Owned` has correct content.
+                let dst = slice::from_raw_parts_mut(tail_ctrlb.base.as_ptr(), 
split_at);
+                dst.copy_from_slice(head.as_slice());
+                release_control_block_w_allocation::<ALIGN>(head.ctrlb);
+            }
+
+            let ctrlb = reclaim_unique_control_block(tail.ctrlb);
+            // SAFETY: `ctrlb.base,capacity` were captured from an `AVec<u8>` 
allocation and
+            // are now exclusively owned by this path.
+            let inner = AVec::from_raw_parts(ctrlb.base.as_ptr(), ALIGN, 
ctrlb.len, ctrlb.capacity);
+            Ok(Owned { inner })
+        }
+    }
+}
+
+impl<const ALIGN: usize> Clone for TwoHalves<ALIGN> {
+    fn clone(&self) -> Self {
+        Self {
+            inner: (
+                Extent::copy_from_slice::<ALIGN>(self.head()),
+                self.inner.1.clone(),
+            ),
+        }
+    }
+}
+
+impl<const ALIGN: usize> Drop for TwoHalves<ALIGN> {
+    fn drop(&mut self) {
+        // SAFETY: `inner.0.ctrlb` / `inner.1.ctrlb` point to live control 
blocks while `self` is alive.
+        let ctrlb_eq = std::ptr::addr_eq(self.inner.0.ctrlb.as_ptr(), 
self.inner.1.ctrlb.as_ptr());
+        unsafe {
+            if ctrlb_eq {
+                
release_control_block_w_allocation::<ALIGN>(self.inner.1.ctrlb);
+            } else {
+                // Different control blocks, release both
+                
release_control_block_w_allocation::<ALIGN>(self.inner.0.ctrlb);
+                
release_control_block_w_allocation::<ALIGN>(self.inner.1.ctrlb);
+            }
+        }
+    }
+}
+
+impl<const ALIGN: usize> std::fmt::Debug for TwoHalves<ALIGN> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("TwoHalves")
+            .field("split_at", &self.split_at())
+            .field("head_len", &self.inner.0.len)
+            .field("tail_len", &self.inner.1.len)
+            .field("halves_alias", &(self.inner.0.ctrlb == self.inner.1.ctrlb))
+            .finish()
+    }
+}
+
+#[derive(Clone)]

Review Comment:
   Yeah, I need to add an explicit `Drop` impl.



##########
core/buf/src/lib.rs:
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::mem::ManuallyDrop;
+use std::ptr::NonNull;
+use std::slice;
+use std::sync::atomic::{AtomicUsize, Ordering, fence};
+
+use aligned_vec::{AVec, ConstAlign};
+
+#[derive(Debug)]
+pub struct Owned<const ALIGN: usize = 4096> {
+    inner: AVec<u8, ConstAlign<ALIGN>>,
+}
+
+impl<const ALIGN: usize> From<AVec<u8, ConstAlign<ALIGN>>> for Owned<ALIGN> {
+    fn from(vec: AVec<u8, ConstAlign<ALIGN>>) -> Self {
+        Self { inner: vec }
+    }
+}
+
+impl<const ALIGN: usize> From<Owned<ALIGN>> for AVec<u8, ConstAlign<ALIGN>> {
+    fn from(value: Owned<ALIGN>) -> Self {
+        value.inner
+    }
+}
+
+impl<const ALIGN: usize> Owned<ALIGN> {
+    pub fn as_slice(&self) -> &[u8] {
+        &self.inner
+    }
+
+    pub fn as_mut_slice(&mut self) -> &mut [u8] {
+        &mut self.inner
+    }
+
+    /// Split `Owned` buffer into two halves
+    ///
+    /// # Panics
+    /// Panics if `split_at > self.len()` or if `split_at` is not a multiple 
of `ALIGN` bytes.
+    pub fn split_at(self, split_at: usize) -> TwoHalves<ALIGN> {
+        assert!(split_at <= self.inner.len());
+
+        // Take ownership of the AVec's allocation. After this, we are 
responsible
+        // for deallocating via `AVec::from_raw_parts` or equivalent.
+        let (ptr, _, len, capacity) = self.inner.into_raw_parts();
+
+        // SAFETY: both pointers are constructed from the same `Inner` 
allocation, the split_at bounds are validated.
+        // The control block captures original `Inner` metadata to allow 
reconstructing the original frame for merging/dropping.
+        // The ptr provenance rules are maintained by the use of `NonNull` 
apis.
+        let base: NonNull<u8> = unsafe { NonNull::new_unchecked(ptr) };
+        let tail = unsafe { NonNull::new_unchecked(ptr.add(split_at)) };
+        let ctrlb = ControlBlock::new(base, len, capacity);
+
+        TwoHalves {
+            inner: (
+                Extent {
+                    ptr: base,
+                    len: split_at,
+                    ctrlb,
+                    _pad: 0,
+                },
+                Extent {
+                    ptr: tail,
+                    len: len - split_at,
+                    ctrlb,
+                    _pad: 0,
+                },
+            ),
+        }
+    }
+}
+
+pub struct TwoHalves<const ALIGN: usize> {
+    inner: (Extent, Extent),
+}
+
+impl<const ALIGN: usize> TwoHalves<ALIGN> {
+    pub fn head(&self) -> &[u8] {
+        self.inner.0.as_slice()
+    }
+
+    pub fn head_mut(&mut self) -> &mut [u8] {

Review Comment:
   And in the non-clone case, we cannot have both a mutable and a non-mutable
reference to the `TwoHalves`; the only way is to clone, and then the `head`
becomes a distinct allocation.



##########
core/buf/src/lib.rs:
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::mem::ManuallyDrop;
+use std::ptr::NonNull;
+use std::slice;
+use std::sync::atomic::{AtomicUsize, Ordering, fence};
+
+use aligned_vec::{AVec, ConstAlign};
+
+#[derive(Debug)]
+pub struct Owned<const ALIGN: usize = 4096> {
+    inner: AVec<u8, ConstAlign<ALIGN>>,
+}
+
+impl<const ALIGN: usize> From<AVec<u8, ConstAlign<ALIGN>>> for Owned<ALIGN> {
+    fn from(vec: AVec<u8, ConstAlign<ALIGN>>) -> Self {
+        Self { inner: vec }
+    }
+}
+
+impl<const ALIGN: usize> From<Owned<ALIGN>> for AVec<u8, ConstAlign<ALIGN>> {
+    fn from(value: Owned<ALIGN>) -> Self {
+        value.inner
+    }
+}
+
+impl<const ALIGN: usize> Owned<ALIGN> {
+    pub fn as_slice(&self) -> &[u8] {
+        &self.inner
+    }
+
+    pub fn as_mut_slice(&mut self) -> &mut [u8] {
+        &mut self.inner
+    }
+
+    /// Split `Owned` buffer into two halves
+    ///
+    /// # Panics
+    /// Panics if `split_at > self.len()` or if `split_at` is not a multiple 
of `ALIGN` bytes.
+    pub fn split_at(self, split_at: usize) -> TwoHalves<ALIGN> {
+        assert!(split_at <= self.inner.len());
+
+        // Take ownership of the AVec's allocation. After this, we are 
responsible
+        // for deallocating via `AVec::from_raw_parts` or equivalent.
+        let (ptr, _, len, capacity) = self.inner.into_raw_parts();
+
+        // SAFETY: both pointers are constructed from the same `Inner` 
allocation, the split_at bounds are validated.
+        // The control block captures original `Inner` metadata to allow 
reconstructing the original frame for merging/dropping.
+        // The ptr provenance rules are maintained by the use of `NonNull` 
apis.
+        let base: NonNull<u8> = unsafe { NonNull::new_unchecked(ptr) };
+        let tail = unsafe { NonNull::new_unchecked(ptr.add(split_at)) };
+        let ctrlb = ControlBlock::new(base, len, capacity);
+
+        TwoHalves {
+            inner: (
+                Extent {
+                    ptr: base,
+                    len: split_at,
+                    ctrlb,
+                    _pad: 0,
+                },
+                Extent {
+                    ptr: tail,
+                    len: len - split_at,
+                    ctrlb,
+                    _pad: 0,
+                },
+            ),
+        }
+    }
+}
+
+pub struct TwoHalves<const ALIGN: usize> {

Review Comment:
   Both of them are `Send`, so I will add impls for those.



##########
core/buf/src/lib.rs:
##########
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::mem::ManuallyDrop;
+use std::ptr::NonNull;
+use std::slice;
+use std::sync::atomic::{AtomicUsize, Ordering, fence};
+
+use aligned_vec::{AVec, ConstAlign};
+
+#[derive(Debug)]
+pub struct Owned<const ALIGN: usize = 4096> {
+    inner: AVec<u8, ConstAlign<ALIGN>>,
+}
+
+impl<const ALIGN: usize> From<AVec<u8, ConstAlign<ALIGN>>> for Owned<ALIGN> {
+    fn from(vec: AVec<u8, ConstAlign<ALIGN>>) -> Self {
+        Self { inner: vec }
+    }
+}
+
+impl<const ALIGN: usize> From<Owned<ALIGN>> for AVec<u8, ConstAlign<ALIGN>> {
+    fn from(value: Owned<ALIGN>) -> Self {
+        value.inner
+    }
+}
+
+impl<const ALIGN: usize> Owned<ALIGN> {
+    pub fn as_slice(&self) -> &[u8] {
+        &self.inner
+    }
+
+    pub fn as_mut_slice(&mut self) -> &mut [u8] {
+        &mut self.inner
+    }
+
+    /// Split `Owned` buffer into two halves
+    ///
+    /// # Panics
+    /// Panics if `split_at > self.len()` or if `split_at` is not a multiple 
of `ALIGN` bytes.
+    pub fn split_at(self, split_at: usize) -> TwoHalves<ALIGN> {
+        assert!(split_at <= self.inner.len());
+
+        // Take ownership of the AVec's allocation. After this, we are 
responsible
+        // for deallocating via `AVec::from_raw_parts` or equivalent.
+        let (ptr, _, len, capacity) = self.inner.into_raw_parts();
+
+        // SAFETY: both pointers are constructed from the same `Inner` 
allocation, the split_at bounds are validated.
+        // The control block captures original `Inner` metadata to allow 
reconstructing the original frame for merging/dropping.
+        // The ptr provenance rules are maintained by the use of `NonNull` 
apis.
+        let base: NonNull<u8> = unsafe { NonNull::new_unchecked(ptr) };
+        let tail = unsafe { NonNull::new_unchecked(ptr.add(split_at)) };
+        let ctrlb = ControlBlock::new(base, len, capacity);
+
+        TwoHalves {
+            inner: (
+                Extent {
+                    ptr: base,
+                    len: split_at,
+                    ctrlb,
+                    _pad: 0,
+                },
+                Extent {
+                    ptr: tail,
+                    len: len - split_at,
+                    ctrlb,
+                    _pad: 0,
+                },
+            ),
+        }
+    }
+}
+
+pub struct TwoHalves<const ALIGN: usize> {
+    inner: (Extent, Extent),
+}
+
+impl<const ALIGN: usize> TwoHalves<ALIGN> {
+    pub fn head(&self) -> &[u8] {
+        self.inner.0.as_slice()
+    }
+
+    pub fn head_mut(&mut self) -> &mut [u8] {
+        // SAFETY: We are accessing the head half mutably, this is the only 
correct operation, as the head is not shared between clones,
+        // instead it gets copied.
+        unsafe { self.inner.0.as_mut_slice() }
+    }
+
+    pub fn tail(&self) -> &[u8] {
+        self.inner.1.as_slice()
+    }
+
+    pub fn split_at(&self) -> usize {
+        self.inner.0.len
+    }
+
+    pub fn total_len(&self) -> usize {
+        self.inner.0.len + self.inner.1.len
+    }
+
+    pub fn is_unique(&self) -> bool {
+        // `inner.1` is the authoritative owner of the original frame 
allocation.
+        // SAFETY: `inner.1.ctrlb` points to a live control block while `self` 
is alive.
+        unsafe {
+            self.inner
+                .1
+                .ctrlb
+                .as_ref()
+                .ref_count
+                .load(Ordering::Acquire)
+                == 1
+        }
+    }
+
+    pub fn try_merge(self) -> Result<Owned<ALIGN>, Self> {
+        if !self.is_unique() {
+            return Err(self);
+        }
+
+        // Transfer ownership to prevent double-free.
+        // SAFETY: We read the inner tuple out of ManuallyDrop, which won't 
run TwoHalves::drop.
+        let this = ManuallyDrop::new(self);
+        let (head, tail) = unsafe { std::ptr::read(&this.inner) };
+        let split_at = head.len;
+
+        // SAFETY: `tail.ctrlb` is unique at this point,
+        // If `head.ctrlb != tail.ctrlb`, the head owns a standalone allocation
+        // that must be released after copying.
+        unsafe {
+            let ctrlb_eq = std::ptr::addr_eq(head.ctrlb.as_ptr(), 
tail.ctrlb.as_ptr());
+
+            if !ctrlb_eq {
+                let tail_ctrlb = tail.ctrlb.as_ref();
+
+                // We are patching up the original allocation, with the 
current head data, so that the resulting `Owned` has correct content.
+                let dst = slice::from_raw_parts_mut(tail_ctrlb.base.as_ptr(), 
split_at);
+                dst.copy_from_slice(head.as_slice());
+                release_control_block_w_allocation::<ALIGN>(head.ctrlb);
+            }
+
+            let ctrlb = reclaim_unique_control_block(tail.ctrlb);
+            // SAFETY: `ctrlb.base,capacity` were captured from an `AVec<u8>` 
allocation and
+            // are now exclusively owned by this path.
+            let inner = AVec::from_raw_parts(ctrlb.base.as_ptr(), ALIGN, 
ctrlb.len, ctrlb.capacity);
+            Ok(Owned { inner })
+        }
+    }
+}
+
+impl<const ALIGN: usize> Clone for TwoHalves<ALIGN> {
+    fn clone(&self) -> Self {
+        Self {
+            inner: (
+                Extent::copy_from_slice::<ALIGN>(self.head()),
+                self.inner.1.clone(),
+            ),
+        }
+    }
+}
+
+impl<const ALIGN: usize> Drop for TwoHalves<ALIGN> {
+    fn drop(&mut self) {
+        // SAFETY: `inner.0.ctrlb` / `inner.1.ctrlb` point to live control 
blocks while `self` is alive.
+        let ctrlb_eq = std::ptr::addr_eq(self.inner.0.ctrlb.as_ptr(), 
self.inner.1.ctrlb.as_ptr());
+        unsafe {
+            if ctrlb_eq {
+                
release_control_block_w_allocation::<ALIGN>(self.inner.1.ctrlb);
+            } else {
+                // Different control blocks, release both
+                
release_control_block_w_allocation::<ALIGN>(self.inner.0.ctrlb);
+                
release_control_block_w_allocation::<ALIGN>(self.inner.1.ctrlb);
+            }
+        }
+    }
+}
+
+impl<const ALIGN: usize> std::fmt::Debug for TwoHalves<ALIGN> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("TwoHalves")
+            .field("split_at", &self.split_at())
+            .field("head_len", &self.inner.0.len)
+            .field("tail_len", &self.inner.1.len)
+            .field("halves_alias", &(self.inner.0.ctrlb == self.inner.1.ctrlb))
+            .finish()
+    }
+}
+
+#[derive(Clone)]
+pub struct Frozen<const ALIGN: usize> {
+    inner: Extent,
+}
+
+impl<const ALIGN: usize> Frozen<ALIGN> {
+    pub fn as_slice(&self) -> &[u8] {
+        self.inner.as_slice()
+    }
+}
+
+#[repr(C, align(64))]
+struct ControlBlock {
+    ref_count: AtomicUsize,
+    base: NonNull<u8>,
+    len: usize,
+    capacity: usize,
+    _pad: [u8; 32],
+}
+
+impl ControlBlock {
+    fn new(base: NonNull<u8>, len: usize, capacity: usize) -> NonNull<Self> {
+        let ctrl = Box::new(ControlBlock {
+            ref_count: AtomicUsize::new(1),
+            base,
+            len,
+            capacity,
+            _pad: [0; 32],
+        });
+        // SAFETY: Box::into_raw returns a valid pointer
+        unsafe { NonNull::new_unchecked(Box::into_raw(ctrl)) }
+    }
+}
+
+struct Extent {
+    ptr: NonNull<u8>,
+    len: usize,
+    ctrlb: NonNull<ControlBlock>,
+    _pad: usize,

Review Comment:
   I will document it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to