hubcio commented on code in PR #2549:
URL: https://github.com/apache/iggy/pull/2549#discussion_r2675380578


##########
core/common/src/collections/segmented_slab.rs:
##########
@@ -0,0 +1,601 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Concurrent segmented slab with structural sharing for RCU patterns.
+//!
+//! # Design Goals
+//!
+//! - O(1) access time
+//! - Structural sharing via Arc-wrapped segments (cheap clones)
+//! - Lock-free reads with copy-on-write modifications
+//! - Slab-assigned keys with slot reuse via free list
+//!
+//! # Architecture
+//!
+//! Data is divided into fixed-size segments, each backed by a `Slab<T>`.
+//! Keys are encoded as: `global_key = (segment_idx << SEGMENT_BITS) | local_key`
+//!
+//! Slab handles ID assignment internally:
+//! - `insert(value)` returns the assigned key
+//! - Removed slots are reused via Slab's free list
+//! - No external ID generation needed
+//!
+//! # Structural Sharing
+//!
+//! Each segment is wrapped in `Arc`. On modification:
+//! - `Arc::make_mut` clones only if segment is shared
+//! - Unmodified segments remain shared across snapshots
+//! - Clone cost is O(num_segments), not O(num_entries)
+
+use slab::Slab;
+use std::sync::Arc;
+
+/// Number of low-order key bits holding the local index within a segment (2^10 = 1024 entries per segment).
+const SEGMENT_BITS: usize = 10;
+
+/// Mask that extracts the local index from a global key: `key & SEGMENT_MASK == local_key`.
+const SEGMENT_MASK: usize = (1 << SEGMENT_BITS) - 1;
+
+/// Maximum entries per segment; a local key at or above this value would overlap the segment-index bits.
+const SEGMENT_CAPACITY: usize = 1 << SEGMENT_BITS;
+
+/// Concurrent segmented slab with structural sharing.
+///
+/// # Performance Characteristics
+///
+/// | Operation | Time Complexity | Notes |
+/// |-----------|-----------------|-------|
+/// | get       | O(1)            | Direct segment + slab indexing |
+/// | insert    | O(num_segments) | Scans for the first segment with a vacancy; may clone segment if shared |
+/// | remove    | O(1) amortized  | May clone segment if shared |
+/// | clone     | O(num_segments) | Just Arc clones, not data copies |
+///
+/// # Key Encoding
+///
+/// Keys are `usize` where:
+/// - High bits `(key >> SEGMENT_BITS)` = segment index
+/// - Low bits `(key & SEGMENT_MASK)` = local slab key
+#[derive(Clone)]
+pub struct SegmentedSlab<T> {
+    segments: Vec<Arc<Slab<T>>>, // One `Arc`-wrapped slab per segment, addressed by segment index.
+    len: usize, // Total number of stored entries across all segments.
+}
+
+// Compact `Debug` output: prints the entry count and the number of segments
+// instead of dumping every stored value.
+impl<T: std::fmt::Debug> std::fmt::Debug for SegmentedSlab<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let segment_count = self.segments.len();
+        let mut out = f.debug_struct("SegmentedSlab");
+        out.field("len", &self.len);
+        out.field("segments", &segment_count);
+        out.finish()
+    }
+}
+
+// The default value is the empty slab produced by `new`.
+impl<T> Default for SegmentedSlab<T> {
+    fn default() -> Self {
+        SegmentedSlab::new()
+    }
+}
+
+impl<T> SegmentedSlab<T> {
+    pub fn new() -> Self {
+        Self {
+            segments: Vec::new(),
+            len: 0,
+        }
+    }
+
+    /// Create from key-value pairs (for bootstrap/recovery).
+    ///
+    /// Keys determine segment placement. Uses slab's `FromIterator<(usize, 
T)>`
+    /// to place entries at specific keys within each segment.
+    ///
+    /// # Panics
+    ///
+    /// Panics if duplicate keys are provided.
+    pub fn from_entries(entries: impl IntoIterator<Item = (usize, T)>) -> Self 
{
+        use std::collections::HashSet;
+
+        let mut segment_entries: Vec<Vec<(usize, T)>> = Vec::new();
+        let mut seen_keys: HashSet<usize> = HashSet::new();
+        let mut len = 0;
+
+        for (key, value) in entries {
+            assert!(seen_keys.insert(key), "duplicate key {key} in 
from_entries");
+
+            let (seg_idx, local_key) = Self::decode_key(key);
+
+            while segment_entries.len() <= seg_idx {
+                segment_entries.push(Vec::new());
+            }
+
+            segment_entries[seg_idx].push((local_key, value));
+            len += 1;
+        }
+
+        let segments = segment_entries
+            .into_iter()
+            .map(|entries| Arc::new(entries.into_iter().collect::<Slab<T>>()))
+            .collect();
+
+        Self { segments, len }
+    }
+
+    /// Decodes global key into (segment_index, local_key).
+    #[inline]
+    const fn decode_key(key: usize) -> (usize, usize) {
+        (key >> SEGMENT_BITS, key & SEGMENT_MASK)
+    }
+
+    /// Encodes segment and local indices into global key.
+    #[inline]
+    const fn encode_key(segment_idx: usize, local_key: usize) -> usize {
+        (segment_idx << SEGMENT_BITS) | local_key
+    }
+
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+
+    /// Returns reference to value at key, or None if not present.
+    #[inline]
+    pub fn get(&self, key: usize) -> Option<&T> {
+        let (seg_idx, local_key) = Self::decode_key(key);
+        self.segments.get(seg_idx)?.get(local_key)
+    }
+
+    /// Returns true if key is present.
+    #[inline]
+    pub fn contains_key(&self, key: usize) -> bool {
+        let (seg_idx, local_key) = Self::decode_key(key);
+        self.segments
+            .get(seg_idx)
+            .is_some_and(|slab| slab.contains(local_key))
+    }
+
+    /// Returns iterator over (key, &value) pairs.
+    pub fn iter(&self) -> impl Iterator<Item = (usize, &T)> + '_ {
+        self.segments
+            .iter()
+            .enumerate()
+            .flat_map(|(seg_idx, slab)| {
+                slab.iter()
+                    .map(move |(local_key, value)| (Self::encode_key(seg_idx, 
local_key), value))
+            })
+    }
+
+    /// Returns iterator over all keys.
+    pub fn keys(&self) -> impl Iterator<Item = usize> + '_ {
+        self.iter().map(|(k, _)| k)
+    }
+
+    /// Returns iterator over all values.
+    pub fn values(&self) -> impl Iterator<Item = &T> + '_ {
+        self.iter().map(|(_, v)| v)
+    }
+}
+
+impl<T: Clone> SegmentedSlab<T> {
+    /// Inserts `value` at an explicit `key` (for bootstrap/recovery).
+    ///
+    /// Missing segments up to the key's segment index are created on demand.
+    /// Returns `(slab, true)` when the value was stored, and `(slab, false)`
+    /// when `key` is already occupied; in that case the stored value is left
+    /// untouched.
+    pub fn insert_at(mut self, key: usize, value: T) -> (Self, bool) {
+        let (seg_idx, local_key) = Self::decode_key(key);
+
+        while self.segments.len() <= seg_idx {
+            self.segments.push(Arc::new(Slab::new()));
+        }
+
+        // Check occupancy through the shared reference first: `Arc::make_mut`
+        // clones a shared segment, which is wasted work when the insert is
+        // rejected. This is the same ordering `update` uses.
+        if self.segments[seg_idx].contains(local_key) {
+            return (self, false);
+        }
+
+        let slab = Arc::make_mut(&mut self.segments[seg_idx]);
+
+        // Slab only hands out its next vacant key, so occupy vacant slots with
+        // placeholder clones until the target slot comes up.
+        let mut temps = Vec::new();
+        loop {
+            if slab.vacant_key() == local_key {
+                slab.insert(value);
+                break;
+            }
+            temps.push(slab.insert(value.clone()));
+        }
+
+        // Release the placeholder slots so they become vacant again.
+        for temp_key in temps {
+            slab.remove(temp_key);
+        }
+
+        self.len += 1;
+        (self, true)
+    }
+
+    /// Stores `value` under a slab-assigned key and returns `(new_slab, key)`.
+    ///
+    /// The first segment whose next vacant key is still below
+    /// `SEGMENT_CAPACITY` receives the value; when no segment qualifies, a
+    /// fresh one is appended. Only the chosen segment goes through
+    /// `Arc::make_mut`.
+    pub fn insert(mut self, value: T) -> (Self, usize) {
+        let found = self
+            .segments
+            .iter()
+            .position(|segment| segment.vacant_key() < SEGMENT_CAPACITY);
+
+        let seg_idx = match found {
+            Some(idx) => idx,
+            None => {
+                // No segment has room: append an empty one and use it.
+                self.segments.push(Arc::new(Slab::new()));
+                self.segments.len() - 1
+            }
+        };
+
+        let local_key = Arc::make_mut(&mut self.segments[seg_idx]).insert(value);
+        self.len += 1;
+
+        (self, Self::encode_key(seg_idx, local_key))
+    }
+
+    /// Overwrites the value stored at `key`, returning `(new_slab, true)`.
+    ///
+    /// When the segment or the slot does not exist, nothing is modified and
+    /// `(self, false)` is returned.
+    pub fn update(mut self, key: usize, value: T) -> (Self, bool) {
+        let (seg_idx, local_key) = Self::decode_key(key);
+
+        let updated = match self.segments.get_mut(seg_idx) {
+            // Occupancy is checked before `Arc::make_mut`, so a miss never
+            // goes through the mutable path.
+            Some(segment) if segment.contains(local_key) => {
+                Arc::make_mut(segment)[local_key] = value;
+                true
+            }
+            _ => false,
+        };
+
+        (self, updated)
+    }
+
+    /// Convenience wrapper around `update` that discards the success flag and
+    /// hands back only the resulting slab.
+    #[inline]
+    pub fn set(self, key: usize, value: T) -> Self {
+        let (slab, _updated) = self.update(key, value);
+        slab
+    }
+
+    /// Remove entry at key, returning (new_slab, removed_value).
+    ///
+    /// Freed slot will be reused by future inserts.
+    pub fn remove(mut self, key: usize) -> (Self, Option<T>) {

Review Comment:
   It's needed for the RCU pattern.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to