This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 86a10deec feat(server): add SegmentedSlab collection (#2549)
86a10deec is described below

commit 86a10deecb1187e53d8c8cea4065b6fb0bcd776c
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Jan 9 10:35:27 2026 +0100

    feat(server): add SegmentedSlab collection (#2549)
    
    Add a concurrent segmented slab with structural sharing,
    designed for Read-Copy-Update patterns with lock-free
    reads and copy-on-write modifications.
---
 Cargo.lock                                    |   1 +
 core/common/Cargo.toml                        |   1 +
 core/common/src/collections/mod.rs            |  20 +
 core/common/src/collections/segmented_slab.rs | 504 ++++++++++++++++++++++++++
 core/common/src/lib.rs                        |   1 +
 5 files changed, 527 insertions(+)

diff --git a/Cargo.lock b/Cargo.lock
index 4c330ed8d..b5761f4f3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4684,6 +4684,7 @@ dependencies = [
  "serde_json",
  "serde_with",
  "serial_test",
+ "slab",
  "strum 0.27.2",
  "thiserror 2.0.17",
  "tokio",
diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml
index a529c1ab3..65124b631 100644
--- a/core/common/Cargo.toml
+++ b/core/common/Cargo.toml
@@ -63,6 +63,7 @@ rustls = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
 serde_with = { workspace = true, features = ["base64"] }
+slab = "0.4.11"
 strum = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
diff --git a/core/common/src/collections/mod.rs b/core/common/src/collections/mod.rs
new file mode 100644
index 000000000..ce21f8584
--- /dev/null
+++ b/core/common/src/collections/mod.rs
@@ -0,0 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod segmented_slab;
+
+pub use segmented_slab::SegmentedSlab;
diff --git a/core/common/src/collections/segmented_slab.rs b/core/common/src/collections/segmented_slab.rs
new file mode 100644
index 000000000..fc75239a4
--- /dev/null
+++ b/core/common/src/collections/segmented_slab.rs
@@ -0,0 +1,504 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Concurrent segmented slab with structural sharing for RCU patterns.
+//!
+//! # Design Goals
+//!
+//! - O(1) access time
+//! - Structural sharing via Arc-wrapped segments (cheap clones)
+//! - Lock-free reads with copy-on-write modifications
+//! - Slab-assigned keys with slot reuse via free list
+//!
+//! # Architecture
+//!
+//! Data is divided into fixed-size segments, each backed by a `Slab<T>`.
+//! Keys are encoded as: `global_key = (segment_idx << SEGMENT_BITS) | local_key`
+//!
+//! Slab handles ID assignment internally:
+//! - `insert(value)` returns the assigned key
+//! - Removed slots are reused via Slab's free list
+//! - No external ID generation needed
+//!
+//! # Structural Sharing
+//!
+//! Each segment is wrapped in `Arc`. On modification:
+//! - `Arc::make_mut` clones only if segment is shared
+//! - Unmodified segments remain shared across snapshots
+//! - Clone cost is O(num_segments), not O(num_entries)
+
+use slab::Slab;
+use std::sync::Arc;
+
+/// Concurrent segmented slab with structural sharing.
+///
+/// # Performance Characteristics
+///
+/// | Operation | Time Complexity | Notes |
+/// |-----------|-----------------|-------|
+/// | get       | O(1)            | Direct segment + slab indexing |
+/// | insert    | O(1) amortized  | May clone segment if shared |
+/// | remove    | O(1) amortized  | May clone segment if shared |
+/// | clone     | O(num_segments) | Just Arc clones, not data copies |
+///
+/// # Key Encoding
+///
+/// Keys are `usize` where:
+/// - High bits `(key >> SEGMENT_BITS)` = segment index
+/// - Low bits `(key & SEGMENT_MASK)` = local slab key
+///
+/// # Type Parameter
+///
+/// `SEGMENT_CAPACITY` must be a power of 2. This is enforced at compile time.
+#[derive(Clone)]
+pub struct SegmentedSlab<T, const SEGMENT_CAPACITY: usize> {
+    segments: Vec<Arc<Slab<T>>>,
+    len: usize,
+}
+
+impl<T: std::fmt::Debug, const SEGMENT_CAPACITY: usize> std::fmt::Debug
+    for SegmentedSlab<T, SEGMENT_CAPACITY>
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("SegmentedSlab")
+            .field("len", &self.len)
+            .field("segments", &self.segments.len())
+            .field("segment_capacity", &SEGMENT_CAPACITY)
+            .finish()
+    }
+}
+
+impl<T, const SEGMENT_CAPACITY: usize> Default for SegmentedSlab<T, SEGMENT_CAPACITY> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<T, const SEGMENT_CAPACITY: usize> SegmentedSlab<T, SEGMENT_CAPACITY> {
+    const SEGMENT_BITS: usize = {
+        assert!(SEGMENT_CAPACITY > 0, "SEGMENT_CAPACITY must be positive");
+        assert!(
+            SEGMENT_CAPACITY.is_power_of_two(),
+            "SEGMENT_CAPACITY must be a power of 2"
+        );
+        SEGMENT_CAPACITY.trailing_zeros() as usize
+    };
+    const SEGMENT_MASK: usize = SEGMENT_CAPACITY - 1;
+
+    pub fn new() -> Self {
+        // Force evaluation to trigger compile-time assertions
+        let _ = Self::SEGMENT_BITS;
+        Self {
+            segments: Vec::new(),
+            len: 0,
+        }
+    }
+
+    /// Create from key-value pairs (for bootstrap/recovery).
+    ///
+    /// Keys determine segment placement. Uses slab's `FromIterator<(usize, T)>`
+    /// to place entries at specific keys within each segment.
+    ///
+    /// # Panics
+    ///
+    /// Panics if duplicate keys are provided.
+    pub fn from_entries(entries: impl IntoIterator<Item = (usize, T)>) -> Self {
+        use std::collections::HashSet;
+
+        let mut segment_entries: Vec<Vec<(usize, T)>> = Vec::new();
+        let mut seen_keys: HashSet<usize> = HashSet::new();
+        let mut len = 0;
+
+        for (key, value) in entries {
+            assert!(seen_keys.insert(key), "duplicate key {key} in from_entries");
+
+            let (seg_idx, local_key) = Self::decode_key(key);
+
+            while segment_entries.len() <= seg_idx {
+                segment_entries.push(Vec::new());
+            }
+
+            segment_entries[seg_idx].push((local_key, value));
+            len += 1;
+        }
+
+        let segments = segment_entries
+            .into_iter()
+            .map(|entries| Arc::new(entries.into_iter().collect::<Slab<T>>()))
+            .collect();
+
+        Self { segments, len }
+    }
+
+    /// Decodes global key into (segment_index, local_key).
+    #[inline]
+    const fn decode_key(key: usize) -> (usize, usize) {
+        (key >> Self::SEGMENT_BITS, key & Self::SEGMENT_MASK)
+    }
+
+    /// Encodes segment and local indices into global key.
+    #[inline]
+    const fn encode_key(segment_idx: usize, local_key: usize) -> usize {
+        (segment_idx << Self::SEGMENT_BITS) | local_key
+    }
+
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+
+    /// Returns reference to value at key, or None if not present.
+    #[inline]
+    pub fn get(&self, key: usize) -> Option<&T> {
+        let (seg_idx, local_key) = Self::decode_key(key);
+        self.segments.get(seg_idx)?.get(local_key)
+    }
+
+    /// Returns true if key is present.
+    #[inline]
+    pub fn contains_key(&self, key: usize) -> bool {
+        let (seg_idx, local_key) = Self::decode_key(key);
+        self.segments
+            .get(seg_idx)
+            .is_some_and(|slab| slab.contains(local_key))
+    }
+
+    /// Returns iterator over (key, &value) pairs.
+    pub fn iter(&self) -> impl Iterator<Item = (usize, &T)> + '_ {
+        self.segments
+            .iter()
+            .enumerate()
+            .flat_map(|(seg_idx, slab)| {
+                slab.iter()
+                    .map(move |(local_key, value)| (Self::encode_key(seg_idx, local_key), value))
+            })
+    }
+
+    /// Returns iterator over all keys.
+    pub fn keys(&self) -> impl Iterator<Item = usize> + '_ {
+        self.iter().map(|(k, _)| k)
+    }
+
+    /// Returns iterator over all values.
+    pub fn values(&self) -> impl Iterator<Item = &T> + '_ {
+        self.iter().map(|(_, v)| v)
+    }
+}
+
+impl<T: Clone, const SEGMENT_CAPACITY: usize> SegmentedSlab<T, SEGMENT_CAPACITY> {
+    /// Insert value, returning (new_slab, assigned_key).
+    ///
+    /// Slab assigns the key, reusing freed slots when available.
+    /// Only the affected segment is cloned if shared.
+    pub fn insert(mut self, value: T) -> (Self, usize) {
+        let seg_idx = self
+            .segments
+            .iter()
+            .position(|slab| slab.vacant_key() < SEGMENT_CAPACITY)
+            .unwrap_or(self.segments.len());
+
+        if seg_idx >= self.segments.len() {
+            self.segments.push(Arc::new(Slab::new()));
+        }
+
+        let slab = Arc::make_mut(&mut self.segments[seg_idx]);
+        let local_key = slab.insert(value);
+        let global_key = Self::encode_key(seg_idx, local_key);
+
+        self.len += 1;
+        (self, global_key)
+    }
+
+    /// Update value at existing key, returning (new_slab, success).
+    ///
+    /// Returns (self, false) if key doesn't exist (no-op).
+    pub fn update(mut self, key: usize, value: T) -> (Self, bool) {
+        let (seg_idx, local_key) = Self::decode_key(key);
+
+        let Some(slab_arc) = self.segments.get_mut(seg_idx) else {
+            return (self, false);
+        };
+
+        if !slab_arc.contains(local_key) {
+            return (self, false);
+        }
+
+        let slab = Arc::make_mut(slab_arc);
+        slab[local_key] = value;
+        (self, true)
+    }
+
+    /// Remove entry at key, returning (new_slab, removed_value).
+    ///
+    /// Freed slot will be reused by future inserts.
+    pub fn remove(mut self, key: usize) -> (Self, Option<T>) {
+        let (seg_idx, local_key) = Self::decode_key(key);
+
+        let Some(slab_arc) = self.segments.get_mut(seg_idx) else {
+            return (self, None);
+        };
+
+        if !slab_arc.contains(local_key) {
+            return (self, None);
+        }
+
+        let slab = Arc::make_mut(slab_arc);
+        let value = slab.remove(local_key);
+        self.len -= 1;
+
+        (self, Some(value))
+    }
+
+    /// Remove entry at key, returning new slab. Ignores removed value.
+    #[inline]
+    pub fn without(self, key: usize) -> Self {
+        self.remove(key).0
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    const TEST_CAPACITY: usize = 1024;
+    type TestSlab<T> = SegmentedSlab<T, TEST_CAPACITY>;
+
+    #[test]
+    fn test_key_encoding() {
+        assert_eq!(TestSlab::<()>::decode_key(0), (0, 0));
+        assert_eq!(TestSlab::<()>::decode_key(42), (0, 42));
+        assert_eq!(TestSlab::<()>::decode_key(1024), (1, 0));
+        assert_eq!(TestSlab::<()>::decode_key(1025), (1, 1));
+        assert_eq!(TestSlab::<()>::decode_key(2048), (2, 0));
+
+        assert_eq!(TestSlab::<()>::encode_key(0, 0), 0);
+        assert_eq!(TestSlab::<()>::encode_key(0, 42), 42);
+        assert_eq!(TestSlab::<()>::encode_key(1, 0), 1024);
+        assert_eq!(TestSlab::<()>::encode_key(1, 1), 1025);
+    }
+
+    #[test]
+    fn test_insert_assigns_sequential_keys() {
+        let slab = TestSlab::new();
+
+        let (slab, key0) = slab.insert("a");
+        let (slab, key1) = slab.insert("b");
+        let (slab, key2) = slab.insert("c");
+
+        assert_eq!(key0, 0);
+        assert_eq!(key1, 1);
+        assert_eq!(key2, 2);
+        assert_eq!(slab.len(), 3);
+    }
+
+    #[test]
+    fn test_get() {
+        let slab = TestSlab::new();
+        let (slab, key) = slab.insert("hello");
+
+        assert_eq!(slab.get(key), Some(&"hello"));
+        assert_eq!(slab.get(999), None);
+    }
+
+    #[test]
+    fn test_update() {
+        let slab = TestSlab::new();
+        let (slab, key) = slab.insert("original");
+
+        let (slab, success) = slab.update(key, "updated");
+        assert!(success);
+        assert_eq!(slab.get(key), Some(&"updated"));
+        assert_eq!(slab.len(), 1);
+
+        // Update non-existent key returns false
+        let (_, success) = slab.update(999, "nope");
+        assert!(!success);
+    }
+
+    #[test]
+    fn test_remove_and_reuse() {
+        let slab = TestSlab::new();
+        let (slab, key0) = slab.insert("a");
+        let (slab, key1) = slab.insert("b");
+        let (slab, key2) = slab.insert("c");
+
+        assert_eq!(key0, 0);
+        assert_eq!(key1, 1);
+        assert_eq!(key2, 2);
+
+        // Remove middle entry
+        let (slab, removed) = slab.remove(key1);
+        assert_eq!(removed, Some("b"));
+        assert_eq!(slab.len(), 2);
+        assert_eq!(slab.get(key1), None);
+
+        // New insert reuses freed slot
+        let (slab, key3) = slab.insert("d");
+        assert_eq!(key3, 1); // Reused!
+        assert_eq!(slab.get(key3), Some(&"d"));
+        assert_eq!(slab.len(), 3);
+    }
+
+    #[test]
+    fn test_structural_sharing() {
+        let slab = TestSlab::new();
+        let (slab, _) = slab.insert("value");
+
+        // Clone shares segment via Arc
+        let snapshot = slab.clone();
+
+        // Modify original
+        let (slab, key1) = slab.insert("new");
+
+        // Snapshot still sees original state
+        assert_eq!(snapshot.len(), 1);
+        assert_eq!(snapshot.get(key1), None);
+
+        // Modified slab has both
+        assert_eq!(slab.len(), 2);
+    }
+
+    #[test]
+    fn test_arc_make_mut_no_clone_when_unique() {
+        let slab = TestSlab::new();
+        let (slab, _) = slab.insert("a");
+
+        let ptr_before = Arc::as_ptr(&slab.segments[0]);
+        let (slab, _) = slab.insert("b");
+        let ptr_after = Arc::as_ptr(&slab.segments[0]);
+
+        // Same pointer - no clone occurred
+        assert_eq!(ptr_before, ptr_after);
+    }
+
+    #[test]
+    fn test_arc_make_mut_clones_when_shared() {
+        let slab = TestSlab::new();
+        let (slab, _) = slab.insert("a");
+
+        let _snapshot = slab.clone(); // Creates shared reference
+        let ptr_before = Arc::as_ptr(&slab.segments[0]);
+
+        let (slab, _) = slab.insert("b");
+        let ptr_after = Arc::as_ptr(&slab.segments[0]);
+
+        // Different pointer - clone occurred
+        assert_ne!(ptr_before, ptr_after);
+    }
+
+    #[test]
+    fn test_cross_segment() {
+        let mut slab = TestSlab::new();
+
+        // Fill first segment
+        for i in 0..TEST_CAPACITY {
+            let (new_slab, key) = slab.insert(i);
+            slab = new_slab;
+            assert_eq!(key, i);
+        }
+
+        // Next insert should go to second segment
+        let (slab, key) = slab.insert(TEST_CAPACITY);
+        assert_eq!(key, TEST_CAPACITY);
+        assert_eq!(slab.segments.len(), 2);
+    }
+
+    #[test]
+    fn test_from_entries() {
+        let entries = vec![(0, "zero"), (5, "five"), (1024, "segment2")];
+
+        let slab = TestSlab::from_entries(entries);
+
+        assert_eq!(slab.len(), 3);
+        assert_eq!(slab.get(0), Some(&"zero"));
+        assert_eq!(slab.get(5), Some(&"five"));
+        assert_eq!(slab.get(1024), Some(&"segment2"));
+        assert_eq!(slab.segments.len(), 2);
+    }
+
+    #[test]
+    fn test_iter() {
+        let slab = TestSlab::new();
+        let (slab, _) = slab.insert("a");
+        let (slab, _) = slab.insert("b");
+        let (slab, _) = slab.insert("c");
+
+        let items: Vec<_> = slab.iter().collect();
+        assert_eq!(items, vec![(0, &"a"), (1, &"b"), (2, &"c")]);
+    }
+
+    #[test]
+    fn test_keys_and_values() {
+        let slab = TestSlab::new();
+        let (slab, _) = slab.insert(10);
+        let (slab, _) = slab.insert(20);
+
+        let keys: Vec<_> = slab.keys().collect();
+        assert_eq!(keys, vec![0, 1]);
+
+        let values: Vec<_> = slab.values().collect();
+        assert_eq!(values, vec![&10, &20]);
+    }
+
+    #[test]
+    fn test_contains_key() {
+        let slab = TestSlab::new();
+        let (slab, key) = slab.insert("x");
+
+        assert!(slab.contains_key(key));
+        assert!(!slab.contains_key(999));
+        assert!(!slab.contains_key(2048)); // Non-existent segment
+    }
+
+    #[test]
+    fn test_without() {
+        let slab = TestSlab::new();
+        let (slab, key) = slab.insert("value");
+
+        // without() removes and ignores result
+        let slab = slab.without(key);
+        assert_eq!(slab.get(key), None);
+        assert_eq!(slab.len(), 0);
+
+        // without() on non-existent key is no-op
+        let slab = slab.without(999);
+        assert_eq!(slab.len(), 0);
+    }
+
+    #[test]
+    fn test_empty_slab() {
+        let slab: TestSlab<i32> = TestSlab::new();
+
+        assert!(slab.is_empty());
+        assert_eq!(slab.len(), 0);
+        assert_eq!(slab.get(0), None);
+        assert!(!slab.contains_key(0));
+        assert_eq!(slab.iter().count(), 0);
+    }
+
+    #[test]
+    #[should_panic(expected = "duplicate key 0 in from_entries")]
+    fn test_from_entries_panics_on_duplicate_keys() {
+        let entries = vec![(0, "first"), (0, "second")];
+        let _ = TestSlab::from_entries(entries);
+    }
+}
diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index 9171c4097..9eaf65531 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -17,6 +17,7 @@
 
 mod alloc;
 mod certificates;
+pub mod collections;
 mod commands;
 mod configs;
 mod error;

Reply via email to