This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 3e27ebc8d refactor(server): add metadata module foundation (#2551)
3e27ebc8d is described below
commit 3e27ebc8dd5dbf257b816993908dc0747c4f8849
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Jan 14 10:06:30 2026 +0100
refactor(server): add metadata module foundation (#2551)
Add new metadata module with LeftRight wrapping slab collections.
Refactor PersonalAccessToken to use Arc<str> instead of Arc<String>.
Module compiles but is not yet wired into the system.
---
Cargo.lock | 43 +-
DEPENDENCIES.md | 3 +
core/common/Cargo.toml | 1 -
core/common/src/collections/segmented_slab.rs | 504 -----------------
core/common/src/lib.rs | 1 -
.../common/src/types/personal_access_tokens/mod.rs | 18 +-
core/server/Cargo.toml | 2 +-
core/server/src/bootstrap.rs | 2 +-
core/server/src/http/mapper.rs | 2 +-
core/server/src/lib.rs | 1 +
core/server/src/metadata/absorb.rs | 348 ++++++++++++
core/server/src/metadata/consumer_group.rs | 59 ++
.../src/metadata/consumer_group_member.rs} | 23 +-
.../mod.rs => server/src/metadata/inner.rs} | 28 +-
core/server/src/metadata/mod.rs | 68 +++
core/server/src/metadata/ops.rs | 129 +++++
core/server/src/metadata/partition.rs | 36 ++
core/server/src/metadata/reader.rs | 485 +++++++++++++++++
core/server/src/metadata/stream.rs | 64 +++
core/server/src/metadata/topic.rs | 72 +++
.../mod.rs => server/src/metadata/user.rs} | 14 +-
core/server/src/metadata/writer.rs | 597 +++++++++++++++++++++
.../src/shard/system/personal_access_tokens.rs | 8 +-
.../periodic/personal_access_token_cleaner.rs | 2 +-
core/server/src/streaming/users/user.rs | 2 +-
25 files changed, 1981 insertions(+), 531 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index b5761f4f3..88953e444 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3505,6 +3505,21 @@ dependencies = [
"slab",
]
+[[package]]
+name = "generator"
+version = "0.8.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "52f04ae4152da20c76fe800fa48659201d5cf627c5149ca0b707b69d7eef6cf9"
+dependencies = [
+ "cc",
+ "cfg-if",
+ "libc",
+ "log",
+ "rustversion",
+ "windows-link 0.2.1",
+ "windows-result 0.4.1",
+]
+
[[package]]
name = "generic-array"
version = "0.14.7"
@@ -4684,7 +4699,6 @@ dependencies = [
"serde_json",
"serde_with",
"serial_test",
- "slab",
"strum 0.27.2",
"thiserror 2.0.17",
"tokio",
@@ -5267,6 +5281,17 @@ dependencies = [
"spin",
]
+[[package]]
+name = "left-right"
+version = "0.11.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0f0c21e4c8ff95f487fb34e6f9182875f42c84cef966d29216bf115d9bba835a"
+dependencies = [
+ "crossbeam-utils",
+ "loom",
+ "slab",
+]
+
[[package]]
name = "lending-iterator"
version = "0.1.7"
@@ -5575,6 +5600,19 @@ dependencies = [
"logos-codegen",
]
+[[package]]
+name = "loom"
+version = "0.7.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca"
+dependencies = [
+ "cfg-if",
+ "generator",
+ "scoped-tls",
+ "tracing",
+ "tracing-subscriber",
+]
+
[[package]]
name = "lru-slab"
version = "0.1.2"
@@ -8288,6 +8326,7 @@ dependencies = [
"hwlocality",
"iggy_common",
"jsonwebtoken",
+ "left-right",
"lending-iterator",
"mimalloc",
"mime_guess",
@@ -9633,9 +9672,11 @@ dependencies = [
"once_cell",
"regex-automata",
"sharded-slab",
+ "smallvec",
"thread_local",
"tracing",
"tracing-core",
+ "tracing-log",
]
[[package]]
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 8423e83d3..31143eead 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -317,6 +317,7 @@ futures-sink: 0.3.31, "Apache-2.0 OR MIT",
futures-task: 0.3.31, "Apache-2.0 OR MIT",
futures-timer: 3.0.3, "Apache-2.0 OR MIT",
futures-util: 0.3.31, "Apache-2.0 OR MIT",
+generator: 0.8.8, "Apache-2.0 OR MIT",
generic-array: 0.14.7, "MIT",
getrandom: 0.2.16, "Apache-2.0 OR MIT",
getrandom: 0.3.4, "Apache-2.0 OR MIT",
@@ -446,6 +447,7 @@ kqueue: 1.1.1, "MIT",
kqueue-sys: 1.0.4, "MIT",
language-tags: 0.3.2, "Apache-2.0 OR MIT",
lazy_static: 1.5.0, "Apache-2.0 OR MIT",
+left-right: 0.11.7, "Apache-2.0 OR MIT",
lending-iterator: 0.1.7, "Apache-2.0 OR MIT OR Zlib",
lending-iterator-proc_macros: 0.1.7, "Apache-2.0 OR MIT OR Zlib",
lexical-core: 1.0.6, "Apache-2.0 OR MIT",
@@ -479,6 +481,7 @@ log: 0.4.29, "Apache-2.0 OR MIT",
logos: 0.15.1, "Apache-2.0 OR MIT",
logos-codegen: 0.15.1, "Apache-2.0 OR MIT",
logos-derive: 0.15.1, "Apache-2.0 OR MIT",
+loom: 0.7.2, "MIT",
lru-slab: 0.1.2, "Apache-2.0 OR MIT OR Zlib",
lz4_flex: 0.11.5, "MIT",
lzma-rust2: 0.15.4, "Apache-2.0",
diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml
index 65124b631..a529c1ab3 100644
--- a/core/common/Cargo.toml
+++ b/core/common/Cargo.toml
@@ -63,7 +63,6 @@ rustls = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true, features = ["base64"] }
-slab = "0.4.11"
strum = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
diff --git a/core/common/src/collections/segmented_slab.rs
b/core/common/src/collections/segmented_slab.rs
deleted file mode 100644
index fc75239a4..000000000
--- a/core/common/src/collections/segmented_slab.rs
+++ /dev/null
@@ -1,504 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Concurrent segmented slab with structural sharing for RCU patterns.
-//!
-//! # Design Goals
-//!
-//! - O(1) access time
-//! - Structural sharing via Arc-wrapped segments (cheap clones)
-//! - Lock-free reads with copy-on-write modifications
-//! - Slab-assigned keys with slot reuse via free list
-//!
-//! # Architecture
-//!
-//! Data is divided into fixed-size segments, each backed by a `Slab<T>`.
-//! Keys are encoded as: `global_key = (segment_idx << SEGMENT_BITS) |
local_key`
-//!
-//! Slab handles ID assignment internally:
-//! - `insert(value)` returns the assigned key
-//! - Removed slots are reused via Slab's free list
-//! - No external ID generation needed
-//!
-//! # Structural Sharing
-//!
-//! Each segment is wrapped in `Arc`. On modification:
-//! - `Arc::make_mut` clones only if segment is shared
-//! - Unmodified segments remain shared across snapshots
-//! - Clone cost is O(num_segments), not O(num_entries)
-
-use slab::Slab;
-use std::sync::Arc;
-
-/// Concurrent segmented slab with structural sharing.
-///
-/// # Performance Characteristics
-///
-/// | Operation | Time Complexity | Notes |
-/// |-----------|-----------------|-------|
-/// | get | O(1) | Direct segment + slab indexing |
-/// | insert | O(1) amortized | May clone segment if shared |
-/// | remove | O(1) amortized | May clone segment if shared |
-/// | clone | O(num_segments) | Just Arc clones, not data copies |
-///
-/// # Key Encoding
-///
-/// Keys are `usize` where:
-/// - High bits `(key >> SEGMENT_BITS)` = segment index
-/// - Low bits `(key & SEGMENT_MASK)` = local slab key
-///
-/// # Type Parameter
-///
-/// `SEGMENT_CAPACITY` must be a power of 2. This is enforced at compile time.
-#[derive(Clone)]
-pub struct SegmentedSlab<T, const SEGMENT_CAPACITY: usize> {
- segments: Vec<Arc<Slab<T>>>,
- len: usize,
-}
-
-impl<T: std::fmt::Debug, const SEGMENT_CAPACITY: usize> std::fmt::Debug
- for SegmentedSlab<T, SEGMENT_CAPACITY>
-{
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("SegmentedSlab")
- .field("len", &self.len)
- .field("segments", &self.segments.len())
- .field("segment_capacity", &SEGMENT_CAPACITY)
- .finish()
- }
-}
-
-impl<T, const SEGMENT_CAPACITY: usize> Default for SegmentedSlab<T,
SEGMENT_CAPACITY> {
- fn default() -> Self {
- Self::new()
- }
-}
-
-impl<T, const SEGMENT_CAPACITY: usize> SegmentedSlab<T, SEGMENT_CAPACITY> {
- const SEGMENT_BITS: usize = {
- assert!(SEGMENT_CAPACITY > 0, "SEGMENT_CAPACITY must be positive");
- assert!(
- SEGMENT_CAPACITY.is_power_of_two(),
- "SEGMENT_CAPACITY must be a power of 2"
- );
- SEGMENT_CAPACITY.trailing_zeros() as usize
- };
- const SEGMENT_MASK: usize = SEGMENT_CAPACITY - 1;
-
- pub fn new() -> Self {
- // Force evaluation to trigger compile-time assertions
- let _ = Self::SEGMENT_BITS;
- Self {
- segments: Vec::new(),
- len: 0,
- }
- }
-
- /// Create from key-value pairs (for bootstrap/recovery).
- ///
- /// Keys determine segment placement. Uses slab's `FromIterator<(usize,
T)>`
- /// to place entries at specific keys within each segment.
- ///
- /// # Panics
- ///
- /// Panics if duplicate keys are provided.
- pub fn from_entries(entries: impl IntoIterator<Item = (usize, T)>) -> Self
{
- use std::collections::HashSet;
-
- let mut segment_entries: Vec<Vec<(usize, T)>> = Vec::new();
- let mut seen_keys: HashSet<usize> = HashSet::new();
- let mut len = 0;
-
- for (key, value) in entries {
- assert!(seen_keys.insert(key), "duplicate key {key} in
from_entries");
-
- let (seg_idx, local_key) = Self::decode_key(key);
-
- while segment_entries.len() <= seg_idx {
- segment_entries.push(Vec::new());
- }
-
- segment_entries[seg_idx].push((local_key, value));
- len += 1;
- }
-
- let segments = segment_entries
- .into_iter()
- .map(|entries| Arc::new(entries.into_iter().collect::<Slab<T>>()))
- .collect();
-
- Self { segments, len }
- }
-
- /// Decodes global key into (segment_index, local_key).
- #[inline]
- const fn decode_key(key: usize) -> (usize, usize) {
- (key >> Self::SEGMENT_BITS, key & Self::SEGMENT_MASK)
- }
-
- /// Encodes segment and local indices into global key.
- #[inline]
- const fn encode_key(segment_idx: usize, local_key: usize) -> usize {
- (segment_idx << Self::SEGMENT_BITS) | local_key
- }
-
- #[inline]
- pub fn len(&self) -> usize {
- self.len
- }
-
- #[inline]
- pub fn is_empty(&self) -> bool {
- self.len == 0
- }
-
- /// Returns reference to value at key, or None if not present.
- #[inline]
- pub fn get(&self, key: usize) -> Option<&T> {
- let (seg_idx, local_key) = Self::decode_key(key);
- self.segments.get(seg_idx)?.get(local_key)
- }
-
- /// Returns true if key is present.
- #[inline]
- pub fn contains_key(&self, key: usize) -> bool {
- let (seg_idx, local_key) = Self::decode_key(key);
- self.segments
- .get(seg_idx)
- .is_some_and(|slab| slab.contains(local_key))
- }
-
- /// Returns iterator over (key, &value) pairs.
- pub fn iter(&self) -> impl Iterator<Item = (usize, &T)> + '_ {
- self.segments
- .iter()
- .enumerate()
- .flat_map(|(seg_idx, slab)| {
- slab.iter()
- .map(move |(local_key, value)| (Self::encode_key(seg_idx,
local_key), value))
- })
- }
-
- /// Returns iterator over all keys.
- pub fn keys(&self) -> impl Iterator<Item = usize> + '_ {
- self.iter().map(|(k, _)| k)
- }
-
- /// Returns iterator over all values.
- pub fn values(&self) -> impl Iterator<Item = &T> + '_ {
- self.iter().map(|(_, v)| v)
- }
-}
-
-impl<T: Clone, const SEGMENT_CAPACITY: usize> SegmentedSlab<T,
SEGMENT_CAPACITY> {
- /// Insert value, returning (new_slab, assigned_key).
- ///
- /// Slab assigns the key, reusing freed slots when available.
- /// Only the affected segment is cloned if shared.
- pub fn insert(mut self, value: T) -> (Self, usize) {
- let seg_idx = self
- .segments
- .iter()
- .position(|slab| slab.vacant_key() < SEGMENT_CAPACITY)
- .unwrap_or(self.segments.len());
-
- if seg_idx >= self.segments.len() {
- self.segments.push(Arc::new(Slab::new()));
- }
-
- let slab = Arc::make_mut(&mut self.segments[seg_idx]);
- let local_key = slab.insert(value);
- let global_key = Self::encode_key(seg_idx, local_key);
-
- self.len += 1;
- (self, global_key)
- }
-
- /// Update value at existing key, returning (new_slab, success).
- ///
- /// Returns (self, false) if key doesn't exist (no-op).
- pub fn update(mut self, key: usize, value: T) -> (Self, bool) {
- let (seg_idx, local_key) = Self::decode_key(key);
-
- let Some(slab_arc) = self.segments.get_mut(seg_idx) else {
- return (self, false);
- };
-
- if !slab_arc.contains(local_key) {
- return (self, false);
- }
-
- let slab = Arc::make_mut(slab_arc);
- slab[local_key] = value;
- (self, true)
- }
-
- /// Remove entry at key, returning (new_slab, removed_value).
- ///
- /// Freed slot will be reused by future inserts.
- pub fn remove(mut self, key: usize) -> (Self, Option<T>) {
- let (seg_idx, local_key) = Self::decode_key(key);
-
- let Some(slab_arc) = self.segments.get_mut(seg_idx) else {
- return (self, None);
- };
-
- if !slab_arc.contains(local_key) {
- return (self, None);
- }
-
- let slab = Arc::make_mut(slab_arc);
- let value = slab.remove(local_key);
- self.len -= 1;
-
- (self, Some(value))
- }
-
- /// Remove entry at key, returning new slab. Ignores removed value.
- #[inline]
- pub fn without(self, key: usize) -> Self {
- self.remove(key).0
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- const TEST_CAPACITY: usize = 1024;
- type TestSlab<T> = SegmentedSlab<T, TEST_CAPACITY>;
-
- #[test]
- fn test_key_encoding() {
- assert_eq!(TestSlab::<()>::decode_key(0), (0, 0));
- assert_eq!(TestSlab::<()>::decode_key(42), (0, 42));
- assert_eq!(TestSlab::<()>::decode_key(1024), (1, 0));
- assert_eq!(TestSlab::<()>::decode_key(1025), (1, 1));
- assert_eq!(TestSlab::<()>::decode_key(2048), (2, 0));
-
- assert_eq!(TestSlab::<()>::encode_key(0, 0), 0);
- assert_eq!(TestSlab::<()>::encode_key(0, 42), 42);
- assert_eq!(TestSlab::<()>::encode_key(1, 0), 1024);
- assert_eq!(TestSlab::<()>::encode_key(1, 1), 1025);
- }
-
- #[test]
- fn test_insert_assigns_sequential_keys() {
- let slab = TestSlab::new();
-
- let (slab, key0) = slab.insert("a");
- let (slab, key1) = slab.insert("b");
- let (slab, key2) = slab.insert("c");
-
- assert_eq!(key0, 0);
- assert_eq!(key1, 1);
- assert_eq!(key2, 2);
- assert_eq!(slab.len(), 3);
- }
-
- #[test]
- fn test_get() {
- let slab = TestSlab::new();
- let (slab, key) = slab.insert("hello");
-
- assert_eq!(slab.get(key), Some(&"hello"));
- assert_eq!(slab.get(999), None);
- }
-
- #[test]
- fn test_update() {
- let slab = TestSlab::new();
- let (slab, key) = slab.insert("original");
-
- let (slab, success) = slab.update(key, "updated");
- assert!(success);
- assert_eq!(slab.get(key), Some(&"updated"));
- assert_eq!(slab.len(), 1);
-
- // Update non-existent key returns false
- let (_, success) = slab.update(999, "nope");
- assert!(!success);
- }
-
- #[test]
- fn test_remove_and_reuse() {
- let slab = TestSlab::new();
- let (slab, key0) = slab.insert("a");
- let (slab, key1) = slab.insert("b");
- let (slab, key2) = slab.insert("c");
-
- assert_eq!(key0, 0);
- assert_eq!(key1, 1);
- assert_eq!(key2, 2);
-
- // Remove middle entry
- let (slab, removed) = slab.remove(key1);
- assert_eq!(removed, Some("b"));
- assert_eq!(slab.len(), 2);
- assert_eq!(slab.get(key1), None);
-
- // New insert reuses freed slot
- let (slab, key3) = slab.insert("d");
- assert_eq!(key3, 1); // Reused!
- assert_eq!(slab.get(key3), Some(&"d"));
- assert_eq!(slab.len(), 3);
- }
-
- #[test]
- fn test_structural_sharing() {
- let slab = TestSlab::new();
- let (slab, _) = slab.insert("value");
-
- // Clone shares segment via Arc
- let snapshot = slab.clone();
-
- // Modify original
- let (slab, key1) = slab.insert("new");
-
- // Snapshot still sees original state
- assert_eq!(snapshot.len(), 1);
- assert_eq!(snapshot.get(key1), None);
-
- // Modified slab has both
- assert_eq!(slab.len(), 2);
- }
-
- #[test]
- fn test_arc_make_mut_no_clone_when_unique() {
- let slab = TestSlab::new();
- let (slab, _) = slab.insert("a");
-
- let ptr_before = Arc::as_ptr(&slab.segments[0]);
- let (slab, _) = slab.insert("b");
- let ptr_after = Arc::as_ptr(&slab.segments[0]);
-
- // Same pointer - no clone occurred
- assert_eq!(ptr_before, ptr_after);
- }
-
- #[test]
- fn test_arc_make_mut_clones_when_shared() {
- let slab = TestSlab::new();
- let (slab, _) = slab.insert("a");
-
- let _snapshot = slab.clone(); // Creates shared reference
- let ptr_before = Arc::as_ptr(&slab.segments[0]);
-
- let (slab, _) = slab.insert("b");
- let ptr_after = Arc::as_ptr(&slab.segments[0]);
-
- // Different pointer - clone occurred
- assert_ne!(ptr_before, ptr_after);
- }
-
- #[test]
- fn test_cross_segment() {
- let mut slab = TestSlab::new();
-
- // Fill first segment
- for i in 0..TEST_CAPACITY {
- let (new_slab, key) = slab.insert(i);
- slab = new_slab;
- assert_eq!(key, i);
- }
-
- // Next insert should go to second segment
- let (slab, key) = slab.insert(TEST_CAPACITY);
- assert_eq!(key, TEST_CAPACITY);
- assert_eq!(slab.segments.len(), 2);
- }
-
- #[test]
- fn test_from_entries() {
- let entries = vec![(0, "zero"), (5, "five"), (1024, "segment2")];
-
- let slab = TestSlab::from_entries(entries);
-
- assert_eq!(slab.len(), 3);
- assert_eq!(slab.get(0), Some(&"zero"));
- assert_eq!(slab.get(5), Some(&"five"));
- assert_eq!(slab.get(1024), Some(&"segment2"));
- assert_eq!(slab.segments.len(), 2);
- }
-
- #[test]
- fn test_iter() {
- let slab = TestSlab::new();
- let (slab, _) = slab.insert("a");
- let (slab, _) = slab.insert("b");
- let (slab, _) = slab.insert("c");
-
- let items: Vec<_> = slab.iter().collect();
- assert_eq!(items, vec![(0, &"a"), (1, &"b"), (2, &"c")]);
- }
-
- #[test]
- fn test_keys_and_values() {
- let slab = TestSlab::new();
- let (slab, _) = slab.insert(10);
- let (slab, _) = slab.insert(20);
-
- let keys: Vec<_> = slab.keys().collect();
- assert_eq!(keys, vec![0, 1]);
-
- let values: Vec<_> = slab.values().collect();
- assert_eq!(values, vec![&10, &20]);
- }
-
- #[test]
- fn test_contains_key() {
- let slab = TestSlab::new();
- let (slab, key) = slab.insert("x");
-
- assert!(slab.contains_key(key));
- assert!(!slab.contains_key(999));
- assert!(!slab.contains_key(2048)); // Non-existent segment
- }
-
- #[test]
- fn test_without() {
- let slab = TestSlab::new();
- let (slab, key) = slab.insert("value");
-
- // without() removes and ignores result
- let slab = slab.without(key);
- assert_eq!(slab.get(key), None);
- assert_eq!(slab.len(), 0);
-
- // without() on non-existent key is no-op
- let slab = slab.without(999);
- assert_eq!(slab.len(), 0);
- }
-
- #[test]
- fn test_empty_slab() {
- let slab: TestSlab<i32> = TestSlab::new();
-
- assert!(slab.is_empty());
- assert_eq!(slab.len(), 0);
- assert_eq!(slab.get(0), None);
- assert!(!slab.contains_key(0));
- assert_eq!(slab.iter().count(), 0);
- }
-
- #[test]
- #[should_panic(expected = "duplicate key 0 in from_entries")]
- fn test_from_entries_panics_on_duplicate_keys() {
- let entries = vec![(0, "first"), (0, "second")];
- let _ = TestSlab::from_entries(entries);
- }
-}
diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index 9eaf65531..9171c4097 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -17,7 +17,6 @@
mod alloc;
mod certificates;
-pub mod collections;
mod commands;
mod configs;
mod error;
diff --git a/core/common/src/types/personal_access_tokens/mod.rs
b/core/common/src/types/personal_access_tokens/mod.rs
index e28a585de..0e1e33391 100644
--- a/core/common/src/types/personal_access_tokens/mod.rs
+++ b/core/common/src/types/personal_access_tokens/mod.rs
@@ -28,8 +28,8 @@ const SIZE: usize = 50;
#[derive(Clone, Debug)]
pub struct PersonalAccessToken {
pub user_id: UserId,
- pub name: Arc<String>,
- pub token: Arc<String>,
+ pub name: Arc<str>,
+ pub token: Arc<str>,
pub expiry_at: Option<IggyTimestamp>,
}
@@ -49,8 +49,8 @@ impl PersonalAccessToken {
(
Self {
user_id,
- name: Arc::new(name.to_string()),
- token: Arc::new(token_hash),
+ name: Arc::from(name),
+ token: Arc::from(token_hash),
expiry_at: Self::calculate_expiry_at(now, expiry),
},
token,
@@ -65,8 +65,8 @@ impl PersonalAccessToken {
) -> Self {
Self {
user_id,
- name: Arc::new(name.into()),
- token: Arc::new(token_hash.into()),
+ name: Arc::from(name),
+ token: Arc::from(token_hash),
expiry_at,
}
}
@@ -106,12 +106,12 @@ mod tests {
let name = "test_token";
let (personal_access_token, raw_token) =
PersonalAccessToken::new(user_id, name, now,
IggyExpiry::NeverExpire);
- assert_eq!(personal_access_token.name.as_str(), name);
+ assert_eq!(&*personal_access_token.name, name);
assert!(!personal_access_token.token.is_empty());
assert!(!raw_token.is_empty());
- assert_ne!(personal_access_token.token.as_str(), raw_token);
+ assert_ne!(&*personal_access_token.token, raw_token);
assert_eq!(
- personal_access_token.token.as_str(),
+ &*personal_access_token.token,
PersonalAccessToken::hash_token(&raw_token)
);
}
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index ff38a114b..3e3bb5dc4 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -69,9 +69,9 @@ flume = { workspace = true }
futures = { workspace = true }
hash32 = "1.0.0"
human-repr = { workspace = true }
-
iggy_common = { workspace = true }
jsonwebtoken = { version = "10.2.0", features = ["rust_crypto"] }
+left-right = "0.11"
lending-iterator = "0.1.7"
mimalloc = { workspace = true, optional = true }
mime_guess = { version = "2.0", optional = true }
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 216e7c93a..13690a813 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -212,7 +212,7 @@ pub fn load_users(state: impl IntoIterator<Item =
UserState>) -> Users {
.into_values()
.map(|token| {
(
- Arc::new(token.token_hash.clone()),
+ Arc::from(token.token_hash.as_str()),
PersonalAccessToken::raw(id, &token.name,
&token.token_hash, token.expiry_at),
)
})
diff --git a/core/server/src/http/mapper.rs b/core/server/src/http/mapper.rs
index afe243529..d7ec542f2 100644
--- a/core/server/src/http/mapper.rs
+++ b/core/server/src/http/mapper.rs
@@ -138,7 +138,7 @@ pub fn map_personal_access_tokens(
let mut personal_access_tokens_data =
Vec::with_capacity(personal_access_tokens.len());
for personal_access_token in personal_access_tokens {
let personal_access_token = PersonalAccessTokenInfo {
- name: personal_access_token.name.as_str().to_owned(),
+ name: (*personal_access_token.name).to_owned(),
expiry_at: personal_access_token.expiry_at,
};
personal_access_tokens_data.push(personal_access_token);
diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs
index 2919214b6..41669486f 100644
--- a/core/server/src/lib.rs
+++ b/core/server/src/lib.rs
@@ -37,6 +37,7 @@ pub mod diagnostics;
pub mod http;
pub mod io;
pub mod log;
+pub mod metadata;
pub mod quic;
pub mod server_error;
pub mod shard;
diff --git a/core/server/src/metadata/absorb.rs
b/core/server/src/metadata/absorb.rs
new file mode 100644
index 000000000..d36186a64
--- /dev/null
+++ b/core/server/src/metadata/absorb.rs
@@ -0,0 +1,348 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::ConsumerGroupMemberMeta;
+use crate::metadata::inner::InnerMetadata;
+use crate::metadata::ops::MetadataOp;
+use left_right::Absorb;
+use std::sync::atomic::Ordering;
+
+impl Absorb<MetadataOp> for InnerMetadata {
+ fn absorb_first(&mut self, op: &mut MetadataOp, _other: &Self) {
+ apply_op(self, op, true);
+ }
+
+ fn absorb_second(&mut self, op: MetadataOp, _other: &Self) {
+ apply_op(self, &op, false);
+ }
+
+ fn sync_with(&mut self, first: &Self) {
+ *self = first.clone();
+ }
+}
+
+fn apply_op(metadata: &mut InnerMetadata, op: &MetadataOp, populate_ids: bool)
{
+ match op {
+ MetadataOp::Initialize(initial) => {
+ *metadata = (**initial).clone();
+ }
+
+ MetadataOp::AddStream { meta, assigned_id } => {
+ let entry = metadata.streams.vacant_entry();
+ let id = entry.key();
+ if populate_ids {
+ assigned_id.store(id, Ordering::Release);
+ }
+ let mut meta = meta.clone();
+ meta.id = id;
+ let name = meta.name.clone();
+ entry.insert(meta);
+ metadata.stream_index.insert(name, id);
+ }
+
+ MetadataOp::UpdateStream { id, new_name } => {
+ if let Some(stream) = metadata.streams.get_mut(*id) {
+ let old_name = stream.name.clone();
+ stream.name = new_name.clone();
+ metadata.stream_index.remove(&old_name);
+ metadata.stream_index.insert(new_name.clone(), *id);
+ }
+ }
+
+ MetadataOp::DeleteStream { id } => {
+ if metadata.streams.contains(*id) {
+ let stream = metadata.streams.remove(*id);
+ metadata.stream_index.remove(&stream.name);
+ }
+ }
+
+ MetadataOp::AddTopic {
+ stream_id,
+ meta,
+ assigned_id,
+ } => {
+ if let Some(stream) = metadata.streams.get_mut(*stream_id) {
+ let entry = stream.topics.vacant_entry();
+ let id = entry.key();
+ if populate_ids {
+ assigned_id.store(id, Ordering::Release);
+ }
+ let mut meta = meta.clone();
+ meta.id = id;
+ let name = meta.name.clone();
+ entry.insert(meta);
+ stream.topic_index.insert(name, id);
+ }
+ }
+
+ MetadataOp::UpdateTopic {
+ stream_id,
+ topic_id,
+ new_name,
+ message_expiry,
+ compression_algorithm,
+ max_topic_size,
+ replication_factor,
+ } => {
+ if let Some(stream) = metadata.streams.get_mut(*stream_id)
+ && let Some(topic) = stream.topics.get_mut(*topic_id)
+ {
+ let old_name = topic.name.clone();
+
+ topic.name = new_name.clone();
+ topic.message_expiry = *message_expiry;
+ topic.compression_algorithm = *compression_algorithm;
+ topic.max_topic_size = *max_topic_size;
+ topic.replication_factor = *replication_factor;
+
+ if old_name != *new_name {
+ stream.topic_index.remove(&old_name);
+ stream.topic_index.insert(new_name.clone(), *topic_id);
+ }
+ }
+ }
+
+ MetadataOp::DeleteTopic {
+ stream_id,
+ topic_id,
+ } => {
+ if let Some(stream) = metadata.streams.get_mut(*stream_id)
+ && stream.topics.contains(*topic_id)
+ {
+ let topic = stream.topics.remove(*topic_id);
+ stream.topic_index.remove(&topic.name);
+ }
+ }
+
+ MetadataOp::AddPartitions {
+ stream_id,
+ topic_id,
+ partitions,
+ revision_id,
+ } => {
+ if partitions.is_empty() {
+ return;
+ }
+ if let Some(stream) = metadata.streams.get_mut(*stream_id)
+ && let Some(topic) = stream.topics.get_mut(*topic_id)
+ {
+ for meta in partitions {
+ let mut meta = meta.clone();
+ meta.id = topic.partitions.len();
+ meta.revision_id = *revision_id;
+ topic.partitions.push(meta);
+ }
+ }
+ }
+
+ MetadataOp::DeletePartitions {
+ stream_id,
+ topic_id,
+ count,
+ } => {
+ if *count == 0 {
+ return;
+ }
+ if let Some(stream) = metadata.streams.get_mut(*stream_id)
+ && let Some(topic) = stream.topics.get_mut(*topic_id)
+ {
+ let new_len = topic.partitions.len().saturating_sub(*count as
usize);
+ topic.partitions.truncate(new_len);
+ }
+ }
+
+ MetadataOp::SetPartitionOffsets {
+ stream_id,
+ topic_id,
+ partition_id,
+ consumer_offsets,
+ consumer_group_offsets,
+ } => {
+ if let Some(stream) = metadata.streams.get_mut(*stream_id)
+ && let Some(topic) = stream.topics.get_mut(*topic_id)
+ && let Some(partition) =
topic.partitions.get_mut(*partition_id)
+ {
+ partition.consumer_offsets = Some(consumer_offsets.clone());
+ partition.consumer_group_offsets =
Some(consumer_group_offsets.clone());
+ }
+ }
+
+ MetadataOp::AddUser { meta, assigned_id } => {
+ let entry = metadata.users.vacant_entry();
+ let id = entry.key();
+ if populate_ids {
+ assigned_id.store(id, Ordering::Release);
+ }
+ let mut meta = meta.clone();
+ meta.id = id as u32;
+ let username = meta.username.clone();
+ entry.insert(meta);
+ metadata.user_index.insert(username, id as u32);
+ }
+
+ MetadataOp::UpdateUserMeta { id, meta } => {
+ let user_id = *id as usize;
+ if let Some(old_user) = metadata.users.get(user_id)
+ && old_user.username != meta.username
+ {
+ metadata.user_index.remove(&old_user.username);
+ metadata.user_index.insert(meta.username.clone(), *id);
+ }
+ if metadata.users.contains(user_id) {
+ metadata.users[user_id] = meta.clone();
+ }
+ }
+
+ MetadataOp::DeleteUser { id } => {
+ let user_id = *id as usize;
+ if metadata.users.contains(user_id) {
+ let user = metadata.users.remove(user_id);
+ metadata.user_index.remove(&user.username);
+ }
+ metadata.personal_access_tokens.remove(id);
+ }
+
+ MetadataOp::AddPersonalAccessToken { user_id, pat } => {
+ metadata
+ .personal_access_tokens
+ .entry(*user_id)
+ .or_default()
+ .insert(pat.token.clone(), pat.clone());
+ }
+
+ MetadataOp::DeletePersonalAccessToken {
+ user_id,
+ token_hash,
+ } => {
+ if let Some(user_pats) =
metadata.personal_access_tokens.get_mut(user_id) {
+ user_pats.remove(token_hash);
+ }
+ }
+
+ MetadataOp::AddConsumerGroup {
+ stream_id,
+ topic_id,
+ meta,
+ assigned_id,
+ } => {
+ if let Some(stream) = metadata.streams.get_mut(*stream_id)
+ && let Some(topic) = stream.topics.get_mut(*topic_id)
+ {
+ let entry = topic.consumer_groups.vacant_entry();
+ let id = entry.key();
+ if populate_ids {
+ assigned_id.store(id, Ordering::Release);
+ }
+ let mut meta = meta.clone();
+ meta.id = id;
+ let name = meta.name.clone();
+ entry.insert(meta);
+ topic.consumer_group_index.insert(name, id);
+ }
+ }
+
+ MetadataOp::DeleteConsumerGroup {
+ stream_id,
+ topic_id,
+ group_id,
+ } => {
+ if let Some(stream) = metadata.streams.get_mut(*stream_id)
+ && let Some(topic) = stream.topics.get_mut(*topic_id)
+ && topic.consumer_groups.contains(*group_id)
+ {
+ let group = topic.consumer_groups.remove(*group_id);
+ topic.consumer_group_index.remove(&group.name);
+ }
+ }
+
+ MetadataOp::JoinConsumerGroup {
+ stream_id,
+ topic_id,
+ group_id,
+ client_id,
+ member_id,
+ } => {
+ if let Some(stream) = metadata.streams.get_mut(*stream_id)
+ && let Some(topic) = stream.topics.get_mut(*topic_id)
+ && let Some(group) = topic.consumer_groups.get_mut(*group_id)
+ {
+ let next_id = group
+ .members
+ .iter()
+ .map(|(_, m)| m.id)
+ .max()
+ .map(|m| m + 1)
+ .unwrap_or(0);
+
+ if populate_ids {
+ member_id.store(next_id, Ordering::Release);
+ }
+
+ let new_member = ConsumerGroupMemberMeta::new(next_id,
*client_id);
+ group.members.insert(new_member);
+ group.rebalance_members();
+ }
+ }
+
+ MetadataOp::LeaveConsumerGroup {
+ stream_id,
+ topic_id,
+ group_id,
+ client_id,
+ removed_member_id,
+ } => {
+ if let Some(stream) = metadata.streams.get_mut(*stream_id)
+ && let Some(topic) = stream.topics.get_mut(*topic_id)
+ && let Some(group) = topic.consumer_groups.get_mut(*group_id)
+ {
+ let member_to_remove: Option<usize> = group
+ .members
+ .iter()
+ .find(|(_, m)| m.client_id == *client_id)
+ .map(|(id, _)| id);
+
+ if let Some(mid) = member_to_remove {
+ if populate_ids {
+ removed_member_id.store(mid, Ordering::Release);
+ }
+ group.members.remove(mid);
+ group.rebalance_members();
+ }
+ }
+ }
+
+ MetadataOp::RebalanceConsumerGroupsForTopic {
+ stream_id,
+ topic_id,
+ partitions_count,
+ } => {
+ if let Some(stream) = metadata.streams.get_mut(*stream_id)
+ && let Some(topic) = stream.topics.get_mut(*topic_id)
+ {
+ let partition_ids: Vec<usize> = (0..*partitions_count as
usize).collect();
+ let group_ids: Vec<_> = topic.consumer_groups.iter().map(|(id,
_)| id).collect();
+
+ for gid in group_ids {
+ if let Some(group) = topic.consumer_groups.get_mut(gid) {
+ group.partitions = partition_ids.clone();
+ group.rebalance_members();
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/core/server/src/metadata/consumer_group.rs
b/core/server/src/metadata/consumer_group.rs
new file mode 100644
index 000000000..d44dd5ecb
--- /dev/null
+++ b/core/server/src/metadata/consumer_group.rs
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::consumer_group_member::ConsumerGroupMemberMeta;
+use crate::metadata::{ConsumerGroupId, PartitionId};
+use slab::Slab;
+use std::sync::Arc;
+
/// Consumer group metadata stored in the shared snapshot.
#[derive(Clone, Debug)]
pub struct ConsumerGroupMeta {
    /// Slab-assigned identifier of this group within its topic.
    pub id: ConsumerGroupId,
    /// Group name; unique within the topic (see the topic's `consumer_group_index`).
    pub name: Arc<str>,
    /// Partition ids this group consumes; distributed among members on rebalance.
    pub partitions: Vec<PartitionId>,
    /// Members keyed by slab id; each member carries its assigned partitions.
    pub members: Slab<ConsumerGroupMemberMeta>,
}
+
+impl ConsumerGroupMeta {
+ /// Rebalance partition assignments among members (round-robin).
+ pub fn rebalance_members(&mut self) {
+ let partition_count = self.partitions.len();
+ let member_count = self.members.len();
+
+ if member_count == 0 || partition_count == 0 {
+ return;
+ }
+
+ // Clear all member partitions
+ let member_ids: Vec<usize> = self.members.iter().map(|(id, _)|
id).collect();
+ for &member_id in &member_ids {
+ if let Some(member) = self.members.get_mut(member_id) {
+ member.partitions.clear();
+ }
+ }
+
+ // Rebuild assignments (round-robin)
+ for (i, &partition_id) in self.partitions.iter().enumerate() {
+ let member_idx = i % member_count;
+ if let Some(&member_id) = member_ids.get(member_idx)
+ && let Some(member) = self.members.get_mut(member_id)
+ {
+ member.partitions.push(partition_id);
+ }
+ }
+ }
+}
diff --git a/core/common/src/collections/mod.rs
b/core/server/src/metadata/consumer_group_member.rs
similarity index 56%
copy from core/common/src/collections/mod.rs
copy to core/server/src/metadata/consumer_group_member.rs
index ce21f8584..8543351cb 100644
--- a/core/common/src/collections/mod.rs
+++ b/core/server/src/metadata/consumer_group_member.rs
@@ -15,6 +15,25 @@
// specific language governing permissions and limitations
// under the License.
-mod segmented_slab;
+use crate::metadata::{ClientId, ConsumerGroupMemberId, PartitionId};
+use std::sync::Arc;
+use std::sync::atomic::AtomicUsize;
-pub use segmented_slab::SegmentedSlab;
/// A single member of a consumer group.
#[derive(Clone, Debug)]
pub struct ConsumerGroupMemberMeta {
    /// Slab-assigned member id within the group.
    pub id: ConsumerGroupMemberId,
    /// Id of the client connection this member represents.
    pub client_id: ClientId,
    /// Partition ids currently assigned to this member (rewritten on rebalance).
    pub partitions: Vec<PartitionId>,
    /// Round-robin cursor into `partitions`; behind an `Arc` so snapshot
    /// clones advance the same shared counter.
    pub partition_index: Arc<AtomicUsize>,
}
+
+impl ConsumerGroupMemberMeta {
+ pub fn new(id: ConsumerGroupMemberId, client_id: ClientId) -> Self {
+ Self {
+ id,
+ client_id,
+ partitions: Vec::new(),
+ partition_index: Arc::new(AtomicUsize::new(0)),
+ }
+ }
+}
diff --git a/core/common/src/collections/mod.rs
b/core/server/src/metadata/inner.rs
similarity index 52%
copy from core/common/src/collections/mod.rs
copy to core/server/src/metadata/inner.rs
index ce21f8584..f826d3d8e 100644
--- a/core/common/src/collections/mod.rs
+++ b/core/server/src/metadata/inner.rs
@@ -15,6 +15,30 @@
// specific language governing permissions and limitations
// under the License.
-mod segmented_slab;
+use crate::metadata::{StreamId, StreamMeta, UserId, UserMeta};
+use ahash::AHashMap;
+use iggy_common::PersonalAccessToken;
+use slab::Slab;
+use std::sync::Arc;
-pub use segmented_slab::SegmentedSlab;
/// The full metadata snapshot shared across shards via left-right.
/// Cloning is how left-right maintains its two copies; all fields must
/// therefore be cheaply and correctly clonable.
#[derive(Clone, Default)]
pub struct InnerMetadata {
    /// Streams indexed by StreamId (slab-assigned)
    pub streams: Slab<StreamMeta>,

    /// Users indexed by UserId (slab-assigned)
    pub users: Slab<UserMeta>,

    /// Forward indexes (name → ID)
    pub stream_index: AHashMap<Arc<str>, StreamId>,
    pub user_index: AHashMap<Arc<str>, UserId>,

    /// user_id -> (token_hash -> PAT)
    pub personal_access_tokens: AHashMap<UserId, AHashMap<Arc<str>, PersonalAccessToken>>,
}
+
+impl InnerMetadata {
+ pub fn new() -> Self {
+ Self::default()
+ }
+}
diff --git a/core/server/src/metadata/mod.rs b/core/server/src/metadata/mod.rs
new file mode 100644
index 000000000..12804095b
--- /dev/null
+++ b/core/server/src/metadata/mod.rs
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Shared metadata module providing a single source of truth for all shards.
+//!
+//! This module provides an `ArcSwap`-based approach where all shards read from
+//! a shared snapshot, and only shard 0 can write (swap in new snapshots).
+//!
+//! # Architecture
+//!
+//! - `InnerMetadata` (inner.rs): Immutable snapshot with all metadata
+//! - `Metadata` (reader.rs): Thread-safe read handle for querying metadata
+//! - Entity types: `StreamMeta`, `TopicMeta`, `PartitionMeta`, `UserMeta`,
`ConsumerGroupMeta`
+//! - Consumer offsets are stored in `PartitionMeta` for cross-shard visibility
+
+mod absorb;
+mod consumer_group;
+mod consumer_group_member;
+mod inner;
+pub mod ops;
+mod partition;
+mod reader;
+mod stream;
+mod topic;
+mod user;
+mod writer;
+
+pub use consumer_group::ConsumerGroupMeta;
+pub use consumer_group_member::ConsumerGroupMemberMeta;
+pub use inner::InnerMetadata;
+pub use ops::MetadataOp;
+pub use partition::PartitionMeta;
+pub use reader::Metadata;
+pub use stream::StreamMeta;
+pub use topic::TopicMeta;
+pub use user::UserMeta;
+pub use writer::MetadataWriter;
+
/// Read handle over the shared metadata snapshot (one per shard).
pub type MetadataReadHandle = left_right::ReadHandle<InnerMetadata>;
/// Slab key of a stream in `InnerMetadata::streams`.
pub type StreamId = usize;
/// Slab key of a topic within a stream.
pub type TopicId = usize;
/// Index of a partition within a topic's partition list.
pub type PartitionId = usize;
/// Numeric user identifier.
pub type UserId = u32;
/// Numeric client (connection) identifier.
pub type ClientId = u32;
/// Slab key of a consumer group within a topic.
pub type ConsumerGroupId = usize;
/// Slab key of a member within a consumer group.
pub type ConsumerGroupMemberId = usize;
/// Fully-qualified consumer group address: (stream, topic, group).
pub type ConsumerGroupKey = (StreamId, TopicId, ConsumerGroupId);
+
+pub fn create_metadata_handles() -> (MetadataWriter, MetadataReadHandle) {
+ let (write_handle, read_handle) = left_right::new::<InnerMetadata,
MetadataOp>();
+ let mut writer = MetadataWriter::new(write_handle);
+ writer.publish();
+ (writer, read_handle)
+}
diff --git a/core/server/src/metadata/ops.rs b/core/server/src/metadata/ops.rs
new file mode 100644
index 000000000..4d21c0d90
--- /dev/null
+++ b/core/server/src/metadata/ops.rs
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::inner::InnerMetadata;
+use crate::metadata::{
+ ConsumerGroupId, ConsumerGroupMeta, PartitionId, PartitionMeta, StreamId,
StreamMeta, TopicId,
+ TopicMeta, UserId, UserMeta,
+};
+use crate::streaming::partitions::partition::{ConsumerGroupOffsets,
ConsumerOffsets};
+use iggy_common::{CompressionAlgorithm, IggyExpiry, MaxTopicSize,
PersonalAccessToken};
+use std::sync::Arc;
+use std::sync::atomic::AtomicUsize;
+
/// A single mutation of the shared metadata snapshot.
///
/// Ops are appended through `MetadataWriter` and applied by the left-right
/// `Absorb` implementation (see absorb.rs). Variants carrying an
/// `Arc<AtomicUsize>` use it as an out-parameter: the absorb step stores the
/// slab-assigned id there so the issuing caller can read it back.
#[derive(Clone)]
pub enum MetadataOp {
    /// Replaces the entire snapshot with a prebuilt one (bootstrap/restore).
    Initialize(Box<InnerMetadata>),

    /// Inserts a new stream; its slab key is reported via `assigned_id`.
    AddStream {
        meta: StreamMeta,
        assigned_id: Arc<AtomicUsize>,
    },
    /// Renames an existing stream.
    UpdateStream {
        id: StreamId,
        new_name: Arc<str>,
    },
    /// Removes a stream (and, structurally, all topics it contains).
    DeleteStream {
        id: StreamId,
    },
    /// Inserts a topic into a stream; slab key reported via `assigned_id`.
    AddTopic {
        stream_id: StreamId,
        meta: TopicMeta,
        assigned_id: Arc<AtomicUsize>,
    },
    /// Updates a topic's name and retention/compression settings.
    UpdateTopic {
        stream_id: StreamId,
        topic_id: TopicId,
        new_name: Arc<str>,
        message_expiry: IggyExpiry,
        compression_algorithm: CompressionAlgorithm,
        max_topic_size: MaxTopicSize,
        replication_factor: u8,
    },
    /// Removes a topic from a stream.
    DeleteTopic {
        stream_id: StreamId,
        topic_id: TopicId,
    },
    /// Appends partitions to a topic; `revision_id` stamps the new entries.
    AddPartitions {
        stream_id: StreamId,
        topic_id: TopicId,
        partitions: Vec<PartitionMeta>,
        revision_id: u64,
    },
    /// Removes `count` partitions from a topic.
    DeletePartitions {
        stream_id: StreamId,
        topic_id: TopicId,
        count: u32,
    },
    /// Attaches consumer / consumer-group offset maps to a partition.
    SetPartitionOffsets {
        stream_id: StreamId,
        topic_id: TopicId,
        partition_id: PartitionId,
        consumer_offsets: Arc<ConsumerOffsets>,
        consumer_group_offsets: Arc<ConsumerGroupOffsets>,
    },
    /// Inserts a new user; slab key reported via `assigned_id`.
    AddUser {
        meta: UserMeta,
        assigned_id: Arc<AtomicUsize>,
    },
    /// Replaces a user's metadata wholesale.
    UpdateUserMeta {
        id: UserId,
        meta: UserMeta,
    },
    /// Removes a user.
    DeleteUser {
        id: UserId,
    },

    /// Registers a personal access token for a user (keyed by token hash).
    AddPersonalAccessToken {
        user_id: UserId,
        pat: PersonalAccessToken,
    },
    /// Removes a user's personal access token by its hash.
    DeletePersonalAccessToken {
        user_id: UserId,
        token_hash: Arc<str>,
    },
    /// Inserts a consumer group into a topic; slab key via `assigned_id`.
    AddConsumerGroup {
        stream_id: StreamId,
        topic_id: TopicId,
        meta: ConsumerGroupMeta,
        assigned_id: Arc<AtomicUsize>,
    },
    /// Removes a consumer group from a topic.
    DeleteConsumerGroup {
        stream_id: StreamId,
        topic_id: TopicId,
        group_id: ConsumerGroupId,
    },
    /// Adds `client_id` as a group member and rebalances; the new member's
    /// id is reported via `member_id`.
    JoinConsumerGroup {
        stream_id: StreamId,
        topic_id: TopicId,
        group_id: ConsumerGroupId,
        client_id: u32,
        member_id: Arc<AtomicUsize>,
    },
    /// Removes the member owned by `client_id` from a group and rebalances;
    /// the removed member's id is reported via `removed_member_id`.
    LeaveConsumerGroup {
        stream_id: StreamId,
        topic_id: TopicId,
        group_id: ConsumerGroupId,
        client_id: u32,
        removed_member_id: Arc<AtomicUsize>,
    },
    /// Resets every group's partition list to `0..partitions_count` and
    /// rebalances (used after the topic's partition count changes).
    RebalanceConsumerGroupsForTopic {
        stream_id: StreamId,
        topic_id: TopicId,
        partitions_count: u32,
    },
}
diff --git a/core/server/src/metadata/partition.rs
b/core/server/src/metadata/partition.rs
new file mode 100644
index 000000000..b2490b96a
--- /dev/null
+++ b/core/server/src/metadata/partition.rs
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::PartitionId;
+use crate::streaming::partitions::partition::{ConsumerGroupOffsets,
ConsumerOffsets};
+use crate::streaming::stats::PartitionStats;
+use iggy_common::IggyTimestamp;
+use std::sync::Arc;
+
/// Partition metadata stored in the shared snapshot.
#[derive(Clone, Debug)]
pub struct PartitionMeta {
    /// Index of this partition within the topic's partition list.
    pub id: PartitionId,
    /// Creation timestamp.
    pub created_at: IggyTimestamp,
    /// Monotonically increasing version to detect stale partition_store entries.
    /// Set to the Metadata version when the partition was created.
    pub revision_id: u64,
    /// Shared statistics handle for this partition.
    pub stats: Arc<PartitionStats>,
    /// Consumer offsets for this partition // TODO: move to partition store
    pub consumer_offsets: Option<Arc<ConsumerOffsets>>,
    /// Consumer group offsets for this partition // TODO: move to partition store
    pub consumer_group_offsets: Option<Arc<ConsumerGroupOffsets>>,
}
diff --git a/core/server/src/metadata/reader.rs
b/core/server/src/metadata/reader.rs
new file mode 100644
index 000000000..ea85511a6
--- /dev/null
+++ b/core/server/src/metadata/reader.rs
@@ -0,0 +1,485 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::{
+ ConsumerGroupId, ConsumerGroupMeta, InnerMetadata, MetadataReadHandle,
PartitionId,
+ PartitionMeta, StreamId, StreamMeta, TopicId, TopicMeta, UserId, UserMeta,
+};
+use crate::streaming::partitions::partition::{ConsumerGroupOffsets,
ConsumerOffsets};
+use crate::streaming::stats::{PartitionStats, StreamStats, TopicStats};
+use iggy_common::sharding::IggyNamespace;
+use iggy_common::{IdKind, Identifier, PersonalAccessToken};
+use left_right::ReadGuard;
+use std::sync::Arc;
+use std::sync::atomic::Ordering;
+
+/// Thread-safe wrapper for GlobalMetadata using left-right for lock-free
reads.
+/// Uses hierarchical structure: streams contain topics, topics contain
partitions and consumer groups.
+/// All mutations go through MetadataWriter (shard 0 only).
+///
+/// Each shard should own its own `Metadata` instance (cloned from a common
source).
+/// The underlying data is shared via left-right's internal mechanism.
#[derive(Clone)]
pub struct Metadata {
    /// left-right read handle; `enter()` yields a consistent snapshot guard.
    inner: MetadataReadHandle,
}
+
+impl Metadata {
+ pub fn new(reader: MetadataReadHandle) -> Self {
+ Self { inner: reader }
+ }
+
+ #[inline]
+ pub fn load(&self) -> ReadGuard<'_, InnerMetadata> {
+ self.inner
+ .enter()
+ .expect("metadata not initialized - writer must publish before
reads")
+ }
+
+ pub fn get_stream_id(&self, identifier: &Identifier) -> Option<StreamId> {
+ let metadata = self.load();
+ match identifier.kind {
+ IdKind::Numeric => {
+ let stream_id = identifier.get_u32_value().ok()? as StreamId;
+ if metadata.streams.get(stream_id).is_some() {
+ Some(stream_id)
+ } else {
+ None
+ }
+ }
+ IdKind::String => {
+ let name = identifier.get_cow_str_value().ok()?;
+ metadata.stream_index.get(name.as_ref()).copied()
+ }
+ }
+ }
+
+ pub fn stream_name_exists(&self, name: &str) -> bool {
+ self.load().stream_index.contains_key(name)
+ }
+
+ pub fn get_topic_id(&self, stream_id: StreamId, identifier: &Identifier)
-> Option<TopicId> {
+ let metadata = self.load();
+ let stream = metadata.streams.get(stream_id)?;
+
+ match identifier.kind {
+ IdKind::Numeric => {
+ let topic_id = identifier.get_u32_value().ok()? as TopicId;
+ if stream.topics.get(topic_id).is_some() {
+ Some(topic_id)
+ } else {
+ None
+ }
+ }
+ IdKind::String => {
+ let name = identifier.get_cow_str_value().ok()?;
+ stream.topic_index.get(&Arc::from(name.as_ref())).copied()
+ }
+ }
+ }
+
+ pub fn get_user_id(&self, identifier: &Identifier) -> Option<UserId> {
+ let metadata = self.load();
+ match identifier.kind {
+ IdKind::Numeric => Some(identifier.get_u32_value().ok()? as
UserId),
+ IdKind::String => {
+ let name = identifier.get_cow_str_value().ok()?;
+ metadata.user_index.get(name.as_ref()).copied()
+ }
+ }
+ }
+
+ pub fn get_consumer_group_id(
+ &self,
+ stream_id: StreamId,
+ topic_id: TopicId,
+ identifier: &Identifier,
+ ) -> Option<ConsumerGroupId> {
+ let metadata = self.load();
+ let stream = metadata.streams.get(stream_id)?;
+ let topic = stream.topics.get(topic_id)?;
+
+ match identifier.kind {
+ IdKind::Numeric => {
+ let group_id = identifier.get_u32_value().ok()? as
ConsumerGroupId;
+ if topic.consumer_groups.get(group_id).is_some() {
+ Some(group_id)
+ } else {
+ None
+ }
+ }
+ IdKind::String => {
+ let name = identifier.get_cow_str_value().ok()?;
+ topic
+ .consumer_group_index
+ .get(&Arc::from(name.as_ref()))
+ .copied()
+ }
+ }
+ }
+
+ pub fn stream_exists(&self, id: StreamId) -> bool {
+ self.load().streams.get(id).is_some()
+ }
+
+ pub fn topic_exists(&self, stream_id: StreamId, topic_id: TopicId) -> bool
{
+ self.load()
+ .streams
+ .get(stream_id)
+ .and_then(|s| s.topics.get(topic_id))
+ .is_some()
+ }
+
+ pub fn partition_exists(
+ &self,
+ stream_id: StreamId,
+ topic_id: TopicId,
+ partition_id: PartitionId,
+ ) -> bool {
+ self.load()
+ .streams
+ .get(stream_id)
+ .and_then(|s| s.topics.get(topic_id))
+ .and_then(|t| t.partitions.get(partition_id))
+ .is_some()
+ }
+
+ pub fn user_exists(&self, id: UserId) -> bool {
+ self.load().users.get(id as usize).is_some()
+ }
+
+ pub fn consumer_group_exists(
+ &self,
+ stream_id: StreamId,
+ topic_id: TopicId,
+ group_id: ConsumerGroupId,
+ ) -> bool {
+ self.load()
+ .streams
+ .get(stream_id)
+ .and_then(|s| s.topics.get(topic_id))
+ .and_then(|t| t.consumer_groups.get(group_id))
+ .is_some()
+ }
+
+ pub fn consumer_group_exists_by_name(
+ &self,
+ stream_id: StreamId,
+ topic_id: TopicId,
+ name: &str,
+ ) -> bool {
+ self.load()
+ .streams
+ .get(stream_id)
+ .and_then(|s| s.topics.get(topic_id))
+ .map(|t| t.consumer_group_index.contains_key(name))
+ .unwrap_or(false)
+ }
+
+ pub fn streams_count(&self) -> usize {
+ self.load().streams.len()
+ }
+
+ pub fn next_stream_id(&self) -> usize {
+ self.load().streams.vacant_key()
+ }
+
+ pub fn topics_count(&self, stream_id: StreamId) -> usize {
+ self.load()
+ .streams
+ .get(stream_id)
+ .map(|s| s.topics.len())
+ .unwrap_or(0)
+ }
+
+ pub fn next_topic_id(&self, stream_id: StreamId) -> Option<usize> {
+ self.load()
+ .streams
+ .get(stream_id)
+ .map(|s| s.topics.vacant_key())
+ }
+
+ pub fn partitions_count(&self, stream_id: StreamId, topic_id: TopicId) ->
usize {
+ self.load()
+ .streams
+ .get(stream_id)
+ .and_then(|s| s.topics.get(topic_id))
+ .map(|t| t.partitions.len())
+ .unwrap_or(0)
+ }
+
+ pub fn get_partitions_count(&self, stream_id: StreamId, topic_id: TopicId)
-> Option<usize> {
+ self.load()
+ .streams
+ .get(stream_id)
+ .and_then(|s| s.topics.get(topic_id))
+ .map(|t| t.partitions.len())
+ }
+
+ pub fn get_next_partition_id(&self, stream_id: StreamId, topic_id:
TopicId) -> Option<usize> {
+ let metadata = self.load();
+ let topic = metadata.streams.get(stream_id)?.topics.get(topic_id)?;
+ let partitions_count = topic.partitions.len();
+
+ if partitions_count == 0 {
+ return None;
+ }
+
+ let counter = &topic.round_robin_counter;
+ let mut partition_id = counter.fetch_add(1, Ordering::AcqRel);
+ if partition_id >= partitions_count {
+ partition_id %= partitions_count;
+ counter.store(partition_id + 1, Ordering::Relaxed);
+ }
+ Some(partition_id)
+ }
+
+ pub fn get_next_member_partition_id(
+ &self,
+ stream_id: StreamId,
+ topic_id: TopicId,
+ group_id: ConsumerGroupId,
+ member_id: usize,
+ calculate: bool,
+ ) -> Option<PartitionId> {
+ let metadata = self.load();
+ let member = metadata
+ .streams
+ .get(stream_id)?
+ .topics
+ .get(topic_id)?
+ .consumer_groups
+ .get(group_id)?
+ .members
+ .get(member_id)?;
+
+ let assigned_partitions = &member.partitions;
+ if assigned_partitions.is_empty() {
+ return None;
+ }
+
+ let partitions_count = assigned_partitions.len();
+ let counter = &member.partition_index;
+
+ if calculate {
+ let current = counter
+ .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
+ Some((current + 1) % partitions_count)
+ })
+ .unwrap();
+ Some(assigned_partitions[current % partitions_count])
+ } else {
+ let current = counter.load(Ordering::Relaxed);
+ Some(assigned_partitions[current % partitions_count])
+ }
+ }
+
+ pub fn users_count(&self) -> usize {
+ self.load().users.len()
+ }
+
+ pub fn username_exists(&self, username: &str) -> bool {
+ self.load().user_index.contains_key(username)
+ }
+
+ pub fn consumer_groups_count(&self, stream_id: StreamId, topic_id:
TopicId) -> usize {
+ self.load()
+ .streams
+ .get(stream_id)
+ .and_then(|s| s.topics.get(topic_id))
+ .map(|t| t.consumer_groups.len())
+ .unwrap_or(0)
+ }
+
+ pub fn get_stream_stats(&self, id: StreamId) -> Option<Arc<StreamStats>> {
+ self.load().streams.get(id).map(|s| s.stats.clone())
+ }
+
+ pub fn get_topic_stats(
+ &self,
+ stream_id: StreamId,
+ topic_id: TopicId,
+ ) -> Option<Arc<TopicStats>> {
+ self.load()
+ .streams
+ .get(stream_id)
+ .and_then(|s| s.topics.get(topic_id))
+ .map(|t| t.stats.clone())
+ }
+
+ pub fn get_partition_stats(&self, ns: &IggyNamespace) ->
Option<Arc<PartitionStats>> {
+ self.load()
+ .streams
+ .get(ns.stream_id())
+ .and_then(|s| s.topics.get(ns.topic_id()))
+ .and_then(|t| t.partitions.get(ns.partition_id()))
+ .map(|p| p.stats.clone())
+ }
+
+ pub fn get_partition_stats_by_ids(
+ &self,
+ stream_id: StreamId,
+ topic_id: TopicId,
+ partition_id: PartitionId,
+ ) -> Option<Arc<PartitionStats>> {
+ self.load()
+ .streams
+ .get(stream_id)
+ .and_then(|s| s.topics.get(topic_id))
+ .and_then(|t| t.partitions.get(partition_id))
+ .map(|p| p.stats.clone())
+ }
+
+ pub fn get_partition_consumer_offsets(
+ &self,
+ stream_id: StreamId,
+ topic_id: TopicId,
+ partition_id: PartitionId,
+ ) -> Option<Arc<ConsumerOffsets>> {
+ self.load()
+ .streams
+ .get(stream_id)
+ .and_then(|s| s.topics.get(topic_id))
+ .and_then(|t| t.partitions.get(partition_id))
+ .and_then(|p| p.consumer_offsets.clone())
+ }
+
+ pub fn get_partition_consumer_group_offsets(
+ &self,
+ stream_id: StreamId,
+ topic_id: TopicId,
+ partition_id: PartitionId,
+ ) -> Option<Arc<ConsumerGroupOffsets>> {
+ self.load()
+ .streams
+ .get(stream_id)
+ .and_then(|s| s.topics.get(topic_id))
+ .and_then(|t| t.partitions.get(partition_id))
+ .and_then(|p| p.consumer_group_offsets.clone())
+ }
+
+ pub fn get_user(&self, id: UserId) -> Option<UserMeta> {
+ self.load().users.get(id as usize).cloned()
+ }
+
+ pub fn get_all_users(&self) -> Vec<UserMeta> {
+ self.load().users.iter().map(|(_, u)| u.clone()).collect()
+ }
+
+ pub fn get_stream(&self, id: StreamId) -> Option<StreamMeta> {
+ self.load().streams.get(id).cloned()
+ }
+
+ pub fn get_topic(&self, stream_id: StreamId, topic_id: TopicId) ->
Option<TopicMeta> {
+ self.load()
+ .streams
+ .get(stream_id)
+ .and_then(|s| s.topics.get(topic_id).cloned())
+ }
+
+ pub fn get_partition(
+ &self,
+ stream_id: StreamId,
+ topic_id: TopicId,
+ partition_id: PartitionId,
+ ) -> Option<PartitionMeta> {
+ self.load()
+ .streams
+ .get(stream_id)
+ .and_then(|s| s.topics.get(topic_id))
+ .and_then(|t| t.partitions.get(partition_id).cloned())
+ }
+
+ pub fn get_consumer_group(
+ &self,
+ stream_id: StreamId,
+ topic_id: TopicId,
+ group_id: ConsumerGroupId,
+ ) -> Option<ConsumerGroupMeta> {
+ self.load()
+ .streams
+ .get(stream_id)
+ .and_then(|s| s.topics.get(topic_id))
+ .and_then(|t| t.consumer_groups.get(group_id).cloned())
+ }
+
+ pub fn get_user_personal_access_tokens(&self, user_id: UserId) ->
Vec<PersonalAccessToken> {
+ self.load()
+ .personal_access_tokens
+ .get(&user_id)
+ .map(|pats| pats.values().cloned().collect())
+ .unwrap_or_default()
+ }
+
+ pub fn get_personal_access_token_by_hash(
+ &self,
+ token_hash: &str,
+ ) -> Option<PersonalAccessToken> {
+ let token_hash_arc: Arc<str> = Arc::from(token_hash);
+ let metadata = self.load();
+ for (_, user_pats) in metadata.personal_access_tokens.iter() {
+ if let Some(pat) = user_pats.get(&token_hash_arc) {
+ return Some(pat.clone());
+ }
+ }
+ None
+ }
+
+ pub fn user_pat_count(&self, user_id: UserId) -> usize {
+ self.load()
+ .personal_access_tokens
+ .get(&user_id)
+ .map(|pats| pats.len())
+ .unwrap_or(0)
+ }
+
+ pub fn user_has_pat_with_name(&self, user_id: UserId, name: &str) -> bool {
+ self.load()
+ .personal_access_tokens
+ .get(&user_id)
+ .map(|pats| pats.values().any(|pat| &*pat.name == name))
+ .unwrap_or(false)
+ }
+
+ pub fn find_pat_token_hash_by_name(&self, user_id: UserId, name: &str) ->
Option<Arc<str>> {
+ self.load()
+ .personal_access_tokens
+ .get(&user_id)
+ .and_then(|pats| {
+ pats.iter()
+ .find(|(_, pat)| &*pat.name == name)
+ .map(|(hash, _)| hash.clone())
+ })
+ }
+
+ pub fn is_consumer_group_member(
+ &self,
+ stream_id: StreamId,
+ topic_id: TopicId,
+ group_id: ConsumerGroupId,
+ client_id: u32,
+ ) -> bool {
+ let metadata = self.load();
+ metadata
+ .streams
+ .get(stream_id)
+ .and_then(|s| s.topics.get(topic_id))
+ .and_then(|t| t.consumer_groups.get(group_id))
+ .map(|g| g.members.iter().any(|(_, m)| m.client_id == client_id))
+ .unwrap_or(false)
+ }
+}
diff --git a/core/server/src/metadata/stream.rs
b/core/server/src/metadata/stream.rs
new file mode 100644
index 000000000..02aee459d
--- /dev/null
+++ b/core/server/src/metadata/stream.rs
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::topic::TopicMeta;
+use crate::metadata::{StreamId, TopicId};
+use crate::streaming::stats::StreamStats;
+use ahash::AHashMap;
+use iggy_common::IggyTimestamp;
+use slab::Slab;
+use std::sync::Arc;
+
/// Stream metadata stored in the shared snapshot.
#[derive(Clone, Debug)]
pub struct StreamMeta {
    /// Slab-assigned stream identifier.
    pub id: StreamId,
    /// Stream name; unique (see `InnerMetadata::stream_index`).
    pub name: Arc<str>,
    /// Creation timestamp.
    pub created_at: IggyTimestamp,
    /// Shared statistics handle for this stream.
    pub stats: Arc<StreamStats>,
    /// Topics keyed by slab id.
    pub topics: Slab<TopicMeta>,
    /// Name → topic slab key index for string lookups.
    pub topic_index: AHashMap<Arc<str>, TopicId>,
}
+
+impl StreamMeta {
+ pub fn new(id: StreamId, name: Arc<str>, created_at: IggyTimestamp) ->
Self {
+ Self {
+ id,
+ name,
+ created_at,
+ stats: Arc::new(StreamStats::default()),
+ topics: Slab::new(),
+ topic_index: AHashMap::default(),
+ }
+ }
+
+ pub fn with_stats(
+ id: StreamId,
+ name: Arc<str>,
+ created_at: IggyTimestamp,
+ stats: Arc<StreamStats>,
+ ) -> Self {
+ Self {
+ id,
+ name,
+ created_at,
+ stats,
+ topics: Slab::new(),
+ topic_index: AHashMap::default(),
+ }
+ }
+}
diff --git a/core/server/src/metadata/topic.rs
b/core/server/src/metadata/topic.rs
new file mode 100644
index 000000000..ba0a88b3d
--- /dev/null
+++ b/core/server/src/metadata/topic.rs
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::consumer_group::ConsumerGroupMeta;
+use crate::metadata::partition::PartitionMeta;
+use crate::metadata::{ConsumerGroupId, TopicId};
+use crate::streaming::stats::TopicStats;
+use ahash::AHashMap;
+use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp,
MaxTopicSize};
+use slab::Slab;
+use std::sync::Arc;
+use std::sync::atomic::AtomicUsize;
+
/// Topic metadata stored in the shared snapshot.
#[derive(Clone, Debug)]
pub struct TopicMeta {
    /// Slab-assigned topic identifier within its stream.
    pub id: TopicId,
    /// Topic name; unique within the stream (see `StreamMeta::topic_index`).
    pub name: Arc<str>,
    /// Creation timestamp.
    pub created_at: IggyTimestamp,
    /// Message retention policy for this topic.
    pub message_expiry: IggyExpiry,
    /// Compression applied to this topic's messages.
    pub compression_algorithm: CompressionAlgorithm,
    /// Size cap for the topic.
    pub max_topic_size: MaxTopicSize,
    /// Configured replication factor.
    pub replication_factor: u8,
    /// Shared statistics handle for this topic.
    pub stats: Arc<TopicStats>,
    /// Partitions ordered by partition id (index == PartitionId).
    pub partitions: Vec<PartitionMeta>,
    /// Consumer groups keyed by slab id.
    pub consumer_groups: Slab<ConsumerGroupMeta>,
    /// Name → consumer group slab key index for string lookups.
    pub consumer_group_index: AHashMap<Arc<str>, ConsumerGroupId>,
    /// Shared round-robin counter used to pick the next partition.
    pub round_robin_counter: Arc<AtomicUsize>,
}
+
+impl TopicMeta {
+ #[allow(clippy::too_many_arguments)]
+ pub fn with_stats(
+ id: TopicId,
+ name: Arc<str>,
+ created_at: IggyTimestamp,
+ message_expiry: IggyExpiry,
+ compression_algorithm: CompressionAlgorithm,
+ max_topic_size: MaxTopicSize,
+ replication_factor: u8,
+ stats: Arc<TopicStats>,
+ ) -> Self {
+ Self {
+ id,
+ name,
+ created_at,
+ message_expiry,
+ compression_algorithm,
+ max_topic_size,
+ replication_factor,
+ stats,
+ partitions: Vec::new(),
+ consumer_groups: Slab::new(),
+ consumer_group_index: AHashMap::default(),
+ round_robin_counter: Arc::new(AtomicUsize::new(0)),
+ }
+ }
+}
diff --git a/core/common/src/collections/mod.rs
b/core/server/src/metadata/user.rs
similarity index 69%
rename from core/common/src/collections/mod.rs
rename to core/server/src/metadata/user.rs
index ce21f8584..c82d49a44 100644
--- a/core/common/src/collections/mod.rs
+++ b/core/server/src/metadata/user.rs
@@ -15,6 +15,16 @@
// specific language governing permissions and limitations
// under the License.
-mod segmented_slab;
+use crate::metadata::UserId;
+use iggy_common::{IggyTimestamp, Permissions, UserStatus};
+use std::sync::Arc;
-pub use segmented_slab::SegmentedSlab;
/// User metadata stored in the shared snapshot.
#[derive(Clone, Debug)]
pub struct UserMeta {
    /// Numeric user identifier.
    pub id: UserId,
    /// Username; unique (see `InnerMetadata::user_index`).
    pub username: Arc<str>,
    /// Hashed password (never the plaintext).
    pub password_hash: Arc<str>,
    /// Account status (e.g. active/inactive).
    pub status: UserStatus,
    /// Optional permission set; None means no explicit permissions stored.
    pub permissions: Option<Arc<Permissions>>,
    /// Creation timestamp.
    pub created_at: IggyTimestamp,
}
diff --git a/core/server/src/metadata/writer.rs
b/core/server/src/metadata/writer.rs
new file mode 100644
index 000000000..daa1142bf
--- /dev/null
+++ b/core/server/src/metadata/writer.rs
@@ -0,0 +1,597 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::inner::InnerMetadata;
+use crate::metadata::ops::MetadataOp;
+use crate::metadata::reader::Metadata;
+use crate::metadata::{
+ ConsumerGroupId, ConsumerGroupMeta, PartitionId, PartitionMeta, StreamId,
StreamMeta, TopicId,
+ TopicMeta, UserId, UserMeta,
+};
+use crate::streaming::partitions::partition::{ConsumerGroupOffsets,
ConsumerOffsets};
+use crate::streaming::stats::{PartitionStats, StreamStats, TopicStats};
+use iggy_common::{
+ CompressionAlgorithm, Identifier, IggyError, IggyExpiry, IggyTimestamp,
MaxTopicSize,
+ Permissions, PersonalAccessToken, UserStatus,
+};
+use left_right::WriteHandle;
+use slab::Slab;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
/// Single-writer side of the left-right metadata map.
///
/// Owns the `left_right` write handle; all mutations are queued as
/// [`MetadataOp`] values and made visible to readers via `publish()`.
pub struct MetadataWriter {
    // Write handle over the two InnerMetadata copies maintained by left_right.
    inner: WriteHandle<InnerMetadata, MetadataOp>,
    // Monotonically increasing counter; currently only stamped onto newly
    // added partitions (see add_partitions).
    revision: u64,
}
+
+impl MetadataWriter {
+ pub fn new(handle: WriteHandle<InnerMetadata, MetadataOp>) -> Self {
+ Self {
+ inner: handle,
+ revision: 0,
+ }
+ }
+
+ fn next_revision(&mut self) -> u64 {
+ self.revision += 1;
+ self.revision
+ }
+
+ pub fn append(&mut self, op: MetadataOp) {
+ self.inner.append(op);
+ }
+
+ pub fn publish(&mut self) {
+ self.inner.publish();
+ }
+
+ pub fn initialize(&mut self, initial: InnerMetadata) {
+ self.append(MetadataOp::Initialize(Box::new(initial)));
+ self.publish();
+ }
+
+ pub fn add_stream(&mut self, meta: StreamMeta) -> StreamId {
+ let assigned_id = Arc::new(AtomicUsize::new(0));
+ self.append(MetadataOp::AddStream {
+ meta,
+ assigned_id: assigned_id.clone(),
+ });
+ self.publish();
+ assigned_id.load(Ordering::Acquire)
+ }
+
+ pub fn update_stream(&mut self, id: StreamId, new_name: Arc<str>) {
+ self.append(MetadataOp::UpdateStream { id, new_name });
+ self.publish();
+ }
+
+ pub fn delete_stream(&mut self, id: StreamId) {
+ self.append(MetadataOp::DeleteStream { id });
+ self.publish();
+ }
+
+ pub fn add_topic(&mut self, stream_id: StreamId, meta: TopicMeta) ->
Option<TopicId> {
+ let assigned_id = Arc::new(AtomicUsize::new(usize::MAX));
+ self.append(MetadataOp::AddTopic {
+ stream_id,
+ meta,
+ assigned_id: assigned_id.clone(),
+ });
+ self.publish();
+ let id = assigned_id.load(Ordering::Acquire);
+ if id == usize::MAX { None } else { Some(id) }
+ }
+
+ #[allow(clippy::too_many_arguments)]
+ pub fn update_topic(
+ &mut self,
+ stream_id: StreamId,
+ topic_id: TopicId,
+ new_name: Arc<str>,
+ message_expiry: IggyExpiry,
+ compression_algorithm: CompressionAlgorithm,
+ max_topic_size: MaxTopicSize,
+ replication_factor: u8,
+ ) {
+ self.append(MetadataOp::UpdateTopic {
+ stream_id,
+ topic_id,
+ new_name,
+ message_expiry,
+ compression_algorithm,
+ max_topic_size,
+ replication_factor,
+ });
+ self.publish();
+ }
+
+ pub fn delete_topic(&mut self, stream_id: StreamId, topic_id: TopicId) {
+ self.append(MetadataOp::DeleteTopic {
+ stream_id,
+ topic_id,
+ });
+ self.publish();
+ }
+
+ /// Add partitions to a topic. Returns the assigned partition IDs
(sequential from current count).
+ pub fn add_partitions(
+ &mut self,
+ reader: &Metadata,
+ stream_id: StreamId,
+ topic_id: TopicId,
+ partitions: Vec<PartitionMeta>,
+ ) -> Vec<PartitionId> {
+ if partitions.is_empty() {
+ return Vec::new();
+ }
+
+ let count_before = reader
+ .get_partitions_count(stream_id, topic_id)
+ .expect("stream and topic must exist when adding partitions");
+ let count = partitions.len();
+
+ let revision_id = self.next_revision();
+ self.append(MetadataOp::AddPartitions {
+ stream_id,
+ topic_id,
+ partitions,
+ revision_id,
+ });
+ self.publish();
+
+ (count_before..count_before + count).collect()
+ }
+
+ /// Delete partitions from the end of a topic.
+ pub fn delete_partitions(&mut self, stream_id: StreamId, topic_id:
TopicId, count: u32) {
+ if count == 0 {
+ return;
+ }
+ self.append(MetadataOp::DeletePartitions {
+ stream_id,
+ topic_id,
+ count,
+ });
+ self.publish();
+ }
+
+ pub fn set_partition_offsets(
+ &mut self,
+ stream_id: StreamId,
+ topic_id: TopicId,
+ partition_id: PartitionId,
+ consumer_offsets: Arc<ConsumerOffsets>,
+ consumer_group_offsets: Arc<ConsumerGroupOffsets>,
+ ) {
+ self.append(MetadataOp::SetPartitionOffsets {
+ stream_id,
+ topic_id,
+ partition_id,
+ consumer_offsets,
+ consumer_group_offsets,
+ });
+ self.publish();
+ }
+
+ pub fn add_user(&mut self, meta: UserMeta) -> UserId {
+ let assigned_id = Arc::new(AtomicUsize::new(0));
+ self.append(MetadataOp::AddUser {
+ meta,
+ assigned_id: assigned_id.clone(),
+ });
+ self.publish();
+ assigned_id.load(Ordering::Acquire) as UserId
+ }
+
+ pub fn update_user_meta(&mut self, id: UserId, meta: UserMeta) {
+ self.append(MetadataOp::UpdateUserMeta { id, meta });
+ self.publish();
+ }
+
+ pub fn delete_user(&mut self, id: UserId) {
+ self.append(MetadataOp::DeleteUser { id });
+ self.publish();
+ }
+
+ pub fn add_personal_access_token(&mut self, user_id: UserId, pat:
PersonalAccessToken) {
+ self.append(MetadataOp::AddPersonalAccessToken { user_id, pat });
+ self.publish();
+ }
+
+ pub fn delete_personal_access_token(&mut self, user_id: UserId,
token_hash: Arc<str>) {
+ self.append(MetadataOp::DeletePersonalAccessToken {
+ user_id,
+ token_hash,
+ });
+ self.publish();
+ }
+
+ pub fn add_consumer_group(
+ &mut self,
+ stream_id: StreamId,
+ topic_id: TopicId,
+ meta: ConsumerGroupMeta,
+ ) -> Option<ConsumerGroupId> {
+ let assigned_id = Arc::new(AtomicUsize::new(usize::MAX));
+ self.append(MetadataOp::AddConsumerGroup {
+ stream_id,
+ topic_id,
+ meta,
+ assigned_id: assigned_id.clone(),
+ });
+ self.publish();
+ let id = assigned_id.load(Ordering::Acquire);
+ if id == usize::MAX { None } else { Some(id) }
+ }
+
+ pub fn delete_consumer_group(
+ &mut self,
+ stream_id: StreamId,
+ topic_id: TopicId,
+ group_id: ConsumerGroupId,
+ ) {
+ self.append(MetadataOp::DeleteConsumerGroup {
+ stream_id,
+ topic_id,
+ group_id,
+ });
+ self.publish();
+ }
+
+ pub fn join_consumer_group(
+ &mut self,
+ stream_id: StreamId,
+ topic_id: TopicId,
+ group_id: ConsumerGroupId,
+ client_id: u32,
+ ) -> Option<usize> {
+ let member_id = Arc::new(AtomicUsize::new(usize::MAX));
+ self.append(MetadataOp::JoinConsumerGroup {
+ stream_id,
+ topic_id,
+ group_id,
+ client_id,
+ member_id: member_id.clone(),
+ });
+ self.publish();
+ let id = member_id.load(Ordering::Acquire);
+ if id == usize::MAX { None } else { Some(id) }
+ }
+
+ pub fn leave_consumer_group(
+ &mut self,
+ stream_id: StreamId,
+ topic_id: TopicId,
+ group_id: ConsumerGroupId,
+ client_id: u32,
+ ) -> Option<usize> {
+ let removed_member_id = Arc::new(AtomicUsize::new(usize::MAX));
+ self.append(MetadataOp::LeaveConsumerGroup {
+ stream_id,
+ topic_id,
+ group_id,
+ client_id,
+ removed_member_id: removed_member_id.clone(),
+ });
+ self.publish();
+ let id = removed_member_id.load(Ordering::Acquire);
+ if id == usize::MAX { None } else { Some(id) }
+ }
+
+ pub fn rebalance_consumer_groups_for_topic(
+ &mut self,
+ stream_id: StreamId,
+ topic_id: TopicId,
+ partitions_count: u32,
+ ) {
+ self.append(MetadataOp::RebalanceConsumerGroupsForTopic {
+ stream_id,
+ topic_id,
+ partitions_count,
+ });
+ self.publish();
+ }
+
+ // High-level registration methods with validation
+
+ pub fn create_stream(
+ &mut self,
+ reader: &Metadata,
+ name: Arc<str>,
+ created_at: IggyTimestamp,
+ ) -> Result<(StreamId, Arc<StreamStats>), IggyError> {
+ if reader.stream_name_exists(&name) {
+ return Err(IggyError::StreamNameAlreadyExists(name.to_string()));
+ }
+
+ let stats = Arc::new(StreamStats::default());
+ let meta = StreamMeta::with_stats(0, name, created_at, stats.clone());
+ let id = self.add_stream(meta);
+ Ok((id, stats))
+ }
+
+ pub fn try_update_stream(
+ &mut self,
+ reader: &Metadata,
+ id: StreamId,
+ new_name: Arc<str>,
+ ) -> Result<(), IggyError> {
+ let guard = reader.load();
+ let Some(stream) = guard.streams.get(id) else {
+ return Err(IggyError::StreamIdNotFound(
+ Identifier::numeric(id as u32).unwrap(),
+ ));
+ };
+
+ if stream.name == new_name {
+ return Ok(());
+ }
+
+ if let Some(&existing_id) = guard.stream_index.get(&new_name)
+ && existing_id != id
+ {
+ return
Err(IggyError::StreamNameAlreadyExists(new_name.to_string()));
+ }
+ drop(guard);
+
+ self.update_stream(id, new_name);
+ Ok(())
+ }
+
+ #[allow(clippy::too_many_arguments)]
+ pub fn create_topic(
+ &mut self,
+ reader: &Metadata,
+ stream_id: StreamId,
+ name: Arc<str>,
+ created_at: IggyTimestamp,
+ message_expiry: IggyExpiry,
+ compression_algorithm: CompressionAlgorithm,
+ max_topic_size: MaxTopicSize,
+ replication_factor: u8,
+ ) -> Result<(TopicId, Arc<TopicStats>), IggyError> {
+ let parent_stats = reader.get_stream_stats(stream_id).ok_or_else(|| {
+ IggyError::StreamIdNotFound(Identifier::numeric(stream_id as
u32).unwrap())
+ })?;
+
+ let guard = reader.load();
+ let Some(stream) = guard.streams.get(stream_id) else {
+ return Err(IggyError::StreamIdNotFound(
+ Identifier::numeric(stream_id as u32).unwrap(),
+ ));
+ };
+
+ if stream.topic_index.contains_key(&name) {
+ return Err(IggyError::TopicNameAlreadyExists(
+ name.to_string(),
+ Identifier::numeric(stream_id as u32).unwrap(),
+ ));
+ }
+ drop(guard);
+
+ let stats = Arc::new(TopicStats::new(parent_stats));
+ let meta = TopicMeta {
+ id: 0,
+ name,
+ created_at,
+ message_expiry,
+ compression_algorithm,
+ max_topic_size,
+ replication_factor,
+ stats: stats.clone(),
+ partitions: Vec::new(),
+ consumer_groups: Slab::new(),
+ consumer_group_index: ahash::AHashMap::default(),
+ round_robin_counter: Arc::new(AtomicUsize::new(0)),
+ };
+
+ // change to create_topic
+ let id = self.add_topic(stream_id, meta).ok_or_else(|| {
+ IggyError::StreamIdNotFound(Identifier::numeric(stream_id as
u32).unwrap())
+ })?;
+ Ok((id, stats))
+ }
+
+ #[allow(clippy::too_many_arguments)]
+ pub fn try_update_topic(
+ &mut self,
+ reader: &Metadata,
+ stream_id: StreamId,
+ topic_id: TopicId,
+ new_name: Arc<str>,
+ message_expiry: IggyExpiry,
+ compression_algorithm: CompressionAlgorithm,
+ max_topic_size: MaxTopicSize,
+ replication_factor: u8,
+ ) -> Result<(), IggyError> {
+ let guard = reader.load();
+ let Some(stream) = guard.streams.get(stream_id) else {
+ return Err(IggyError::StreamIdNotFound(
+ Identifier::numeric(stream_id as u32).unwrap(),
+ ));
+ };
+
+ let Some(topic) = stream.topics.get(topic_id) else {
+ return Err(IggyError::TopicIdNotFound(
+ Identifier::numeric(topic_id as u32).unwrap(),
+ Identifier::numeric(stream_id as u32).unwrap(),
+ ));
+ };
+
+ if topic.name != new_name
+ && let Some(&existing_id) = stream.topic_index.get(&new_name)
+ && existing_id != topic_id
+ {
+ return Err(IggyError::TopicNameAlreadyExists(
+ new_name.to_string(),
+ Identifier::numeric(stream_id as u32).unwrap(),
+ ));
+ }
+ drop(guard);
+
+ self.update_topic(
+ stream_id,
+ topic_id,
+ new_name,
+ message_expiry,
+ compression_algorithm,
+ max_topic_size,
+ replication_factor,
+ );
+ Ok(())
+ }
+
+ pub fn register_partitions(
+ &mut self,
+ reader: &Metadata,
+ stream_id: StreamId,
+ topic_id: TopicId,
+ count: usize,
+ created_at: IggyTimestamp,
+ ) -> Vec<(PartitionId, Arc<PartitionStats>)> {
+ if count == 0 {
+ return Vec::new();
+ }
+
+ let parent_stats = reader
+ .get_topic_stats(stream_id, topic_id)
+ .expect("Parent topic stats must exist before registering
partitions");
+
+ let mut metas = Vec::with_capacity(count);
+ let mut stats_list = Vec::with_capacity(count);
+
+ for _ in 0..count {
+ let stats = Arc::new(PartitionStats::new(parent_stats.clone()));
+ metas.push(PartitionMeta {
+ id: 0,
+ created_at,
+ revision_id: 0,
+ stats: stats.clone(),
+ consumer_offsets: None,
+ consumer_group_offsets: None,
+ });
+ stats_list.push(stats);
+ }
+
+ let ids = self.add_partitions(reader, stream_id, topic_id, metas);
+ ids.into_iter().zip(stats_list).collect()
+ }
+
+ pub fn create_user(
+ &mut self,
+ reader: &Metadata,
+ username: Arc<str>,
+ password_hash: Arc<str>,
+ status: UserStatus,
+ permissions: Option<Arc<Permissions>>,
+ max_users: usize,
+ ) -> Result<UserId, IggyError> {
+ if reader.username_exists(&username) {
+ return Err(IggyError::UserAlreadyExists);
+ }
+
+ if reader.users_count() >= max_users {
+ return Err(IggyError::UsersLimitReached);
+ }
+
+ let meta = UserMeta {
+ id: 0,
+ username,
+ password_hash,
+ status,
+ permissions,
+ created_at: IggyTimestamp::now(),
+ };
+ let id = self.add_user(meta);
+ Ok(id)
+ }
+
+ pub fn update_user(
+ &mut self,
+ reader: &Metadata,
+ id: UserId,
+ username: Option<Arc<str>>,
+ status: Option<UserStatus>,
+ ) -> Result<UserMeta, IggyError> {
+ let Some(mut meta) = reader.get_user(id) else {
+ return Err(IggyError::ResourceNotFound(format!("user:{id}")));
+ };
+
+ if let Some(new_username) = username {
+ if meta.username != new_username &&
reader.username_exists(&new_username) {
+ return Err(IggyError::UserAlreadyExists);
+ }
+ meta.username = new_username;
+ }
+
+ if let Some(new_status) = status {
+ meta.status = new_status;
+ }
+
+ let updated = meta.clone();
+ self.update_user_meta(id, meta);
+ Ok(updated)
+ }
+
+ pub fn create_consumer_group(
+ &mut self,
+ reader: &Metadata,
+ stream_id: StreamId,
+ topic_id: TopicId,
+ name: Arc<str>,
+ partitions_count: u32,
+ ) -> Result<ConsumerGroupId, IggyError> {
+ let guard = reader.load();
+ let Some(stream) = guard.streams.get(stream_id) else {
+ return Err(IggyError::StreamIdNotFound(
+ Identifier::numeric(stream_id as u32).unwrap(),
+ ));
+ };
+
+ let Some(topic) = stream.topics.get(topic_id) else {
+ return Err(IggyError::TopicIdNotFound(
+ Identifier::numeric(topic_id as u32).unwrap(),
+ Identifier::numeric(stream_id as u32).unwrap(),
+ ));
+ };
+
+ if topic.consumer_group_index.contains_key(&name) {
+ return Err(IggyError::ConsumerGroupNameAlreadyExists(
+ name.to_string(),
+ Identifier::numeric(topic_id as u32).unwrap(),
+ ));
+ }
+ drop(guard);
+
+ let meta = ConsumerGroupMeta {
+ id: 0,
+ name,
+ partitions: (0..partitions_count as usize).collect(),
+ members: Slab::new(),
+ };
+
+ let id = self
+ .add_consumer_group(stream_id, topic_id, meta)
+ .ok_or_else(|| {
+ IggyError::TopicIdNotFound(
+ Identifier::numeric(topic_id as u32).unwrap(),
+ Identifier::numeric(stream_id as u32).unwrap(),
+ )
+ })?;
+ Ok(id)
+ }
+}
diff --git a/core/server/src/shard/system/personal_access_tokens.rs
b/core/server/src/shard/system/personal_access_tokens.rs
index 7b26aff1e..7fe8dbbb3 100644
--- a/core/server/src/shard/system/personal_access_tokens.rs
+++ b/core/server/src/shard/system/personal_access_tokens.rs
@@ -102,7 +102,7 @@ impl IggyShard {
if user
.personal_access_tokens
.iter()
- .any(|pat| pat.name.as_str() == name.as_str())
+ .any(|pat| pat.name == name)
{
error!(
"Personal access token: {name} for user with ID:
{user_id} already exists."
@@ -150,7 +150,7 @@ impl IggyShard {
let token = if let Some(pat) = user
.personal_access_tokens
.iter()
- .find(|pat| pat.name.as_str() == name)
+ .find(|pat| &*pat.name == name)
{
pat.token.clone()
} else {
@@ -182,7 +182,7 @@ impl IggyShard {
let users = self.users.values();
let mut personal_access_token = None;
for user in &users {
- if let Some(pat) = user.personal_access_tokens.get(&token_hash) {
+ if let Some(pat) =
user.personal_access_tokens.get(token_hash.as_str()) {
personal_access_token = Some(pat);
break;
}
@@ -205,7 +205,7 @@ impl IggyShard {
personal_access_token.name, personal_access_token.user_id
);
return Err(IggyError::PersonalAccessTokenExpired(
- personal_access_token.name.as_str().to_owned(),
+ (*personal_access_token.name).to_owned(),
personal_access_token.user_id,
));
}
diff --git
a/core/server/src/shard/tasks/periodic/personal_access_token_cleaner.rs
b/core/server/src/shard/tasks/periodic/personal_access_token_cleaner.rs
index 3c1203a9a..83b2df434 100644
--- a/core/server/src/shard/tasks/periodic/personal_access_token_cleaner.rs
+++ b/core/server/src/shard/tasks/periodic/personal_access_token_cleaner.rs
@@ -50,7 +50,7 @@ async fn clear_personal_access_tokens(shard: Rc<IggyShard>)
-> Result<(), IggyEr
let users = shard.users.values();
for user in &users {
- let expired_tokens: Vec<Arc<String>> = user
+ let expired_tokens: Vec<Arc<str>> = user
.personal_access_tokens
.iter()
.filter(|entry| entry.value().is_expired(now))
diff --git a/core/server/src/streaming/users/user.rs
b/core/server/src/streaming/users/user.rs
index 672662d3e..4882d4542 100644
--- a/core/server/src/streaming/users/user.rs
+++ b/core/server/src/streaming/users/user.rs
@@ -32,7 +32,7 @@ pub struct User {
pub password: String,
pub created_at: IggyTimestamp,
pub permissions: Option<Permissions>,
- pub personal_access_tokens: DashMap<Arc<String>, PersonalAccessToken>,
+ pub personal_access_tokens: DashMap<Arc<str>, PersonalAccessToken>,
}
impl Default for User {