This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 153cb378c feat(metadata): impl Snapshot interface for Mux state 
machine (#2675)
153cb378c is described below

commit 153cb378c49eaca0af03a1040fe2b921dfc54427
Author: Krishna Vishal <[email protected]>
AuthorDate: Fri Feb 13 16:52:20 2026 +0530

    feat(metadata): impl Snapshot interface for Mux state machine (#2675)
    
    The snapshot system has three layers:
    `Snapshotable`: a per-state-machine trait. Each of `StreamsInner`,
    `UsersInner`, and `ConsumerGroupsInner` implements `to_snapshot()` /
    `from_snapshot()` to convert between its in-memory state and a
    serializable snapshot representation.
    `SnapshotContributor`: a visitor trait using the same recursive variadic
    tuple pattern as `StateMachine::update`. It walks
    `(Users, (Streams, (ConsumerGroups, ())))` at compile time, collecting
    each state machine's serialized section into a `Vec<SnapshotSection>`.
    `MetadataSnapshot`: a top-level interface for the generic snapshot type
    on `IggyMetadata`. It defines the full lifecycle: create (snapshot from
    mux) → encode (to bytes) → decode (from bytes) → restore (back to mux).
    
    ---------
    
    Co-authored-by: Grzegorz Koszyk 
<[email protected]>
---
 Cargo.lock                              |   2 +
 core/metadata/Cargo.toml                |   2 +
 core/metadata/src/impls/metadata.rs     |  55 +++++
 core/metadata/src/stats/mod.rs          |  28 +++
 core/metadata/src/stm/consumer_group.rs | 146 +++++++++++-
 core/metadata/src/stm/mod.rs            |  14 ++
 core/metadata/src/stm/mux.rs            | 126 ++++++++++
 core/metadata/src/stm/snapshot.rs       | 402 ++++++++++++++++++++++++++++++++
 core/metadata/src/stm/stream.rs         | 190 ++++++++++++++-
 core/metadata/src/stm/user.rs           | 225 +++++++++++++++++-
 10 files changed, 1184 insertions(+), 6 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index f16507888..d7ffd97b4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5712,6 +5712,8 @@ dependencies = [
  "left-right",
  "message_bus",
  "paste",
+ "rmp-serde",
+ "serde",
  "slab",
  "tracing",
 ]
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
index 193bf5788..f5d81d5f8 100644
--- a/core/metadata/Cargo.toml
+++ b/core/metadata/Cargo.toml
@@ -35,5 +35,7 @@ journal = { workspace = true }
 left-right = { workspace = true }
 message_bus = { workspace = true }
 paste = { workspace = true }
+rmp-serde = { workspace = true }
+serde = { workspace = true, features = ["derive"] }
 slab = { workspace = true }
 tracing = { workspace = true }
diff --git a/core/metadata/src/impls/metadata.rs 
b/core/metadata/src/impls/metadata.rs
index c4e4b3da3..b3d9769b5 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 use crate::stm::StateMachine;
+use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot, Snapshot, 
SnapshotError};
 use consensus::{Consensus, Project, Sequencer, Status, VsrConsensus};
 use iggy_common::{
     header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader, 
ReplyHeader},
@@ -24,6 +25,60 @@ use journal::{Journal, JournalHandle};
 use message_bus::MessageBus;
 use tracing::{debug, warn};
 
+#[derive(Debug, Clone)]
+#[allow(unused)]
+pub struct IggySnapshot {
+    snapshot: MetadataSnapshot,
+}
+
+#[allow(unused)]
+impl IggySnapshot {
+    pub fn new(sequence_number: u64) -> Self {
+        Self {
+            snapshot: MetadataSnapshot::new(sequence_number),
+        }
+    }
+
+    pub fn snapshot(&self) -> &MetadataSnapshot {
+        &self.snapshot
+    }
+}
+
+impl Snapshot for IggySnapshot {
+    type Error = SnapshotError;
+    type SequenceNumber = u64;
+    type Timestamp = u64;
+    type Inner = MetadataSnapshot;
+
+    fn create<T>(stm: &T, sequence_number: u64) -> Result<Self, SnapshotError>
+    where
+        T: FillSnapshot<MetadataSnapshot>,
+    {
+        let mut snapshot = MetadataSnapshot::new(sequence_number);
+
+        stm.fill_snapshot(&mut snapshot)?;
+
+        Ok(Self { snapshot })
+    }
+
+    fn encode(&self) -> Result<Vec<u8>, SnapshotError> {
+        self.snapshot.encode()
+    }
+
+    fn decode(bytes: &[u8]) -> Result<Self, SnapshotError> {
+        let snapshot = MetadataSnapshot::decode(bytes)?;
+        Ok(Self { snapshot })
+    }
+
+    fn sequence_number(&self) -> u64 {
+        self.snapshot.sequence_number
+    }
+
+    fn created_at(&self) -> u64 {
+        self.snapshot.created_at
+    }
+}
+
 pub trait Metadata<C>
 where
     C: Consensus,
diff --git a/core/metadata/src/stats/mod.rs b/core/metadata/src/stats/mod.rs
index 35b926176..f4cc3aedc 100644
--- a/core/metadata/src/stats/mod.rs
+++ b/core/metadata/src/stats/mod.rs
@@ -89,6 +89,20 @@ impl StreamStats {
         self.zero_out_messages_count();
         self.zero_out_segments_count();
     }
+
+    pub fn load_for_snapshot(&self) -> (u64, u64, u32) {
+        (
+            self.size_bytes.load(Ordering::Relaxed),
+            self.messages_count.load(Ordering::Relaxed),
+            self.segments_count.load(Ordering::Relaxed),
+        )
+    }
+
+    pub fn store_from_snapshot(&self, size_bytes: u64, messages_count: u64, 
segments_count: u32) {
+        self.size_bytes.store(size_bytes, Ordering::Relaxed);
+        self.messages_count.store(messages_count, Ordering::Relaxed);
+        self.segments_count.store(segments_count, Ordering::Relaxed);
+    }
 }
 
 #[derive(Default, Debug)]
@@ -219,6 +233,20 @@ impl TopicStats {
         self.zero_out_messages_count();
         self.zero_out_segments_count();
     }
+
+    pub fn load_for_snapshot(&self) -> (u64, u64, u32) {
+        (
+            self.size_bytes.load(Ordering::Relaxed),
+            self.messages_count.load(Ordering::Relaxed),
+            self.segments_count.load(Ordering::Relaxed),
+        )
+    }
+
+    pub fn store_from_snapshot(&self, size_bytes: u64, messages_count: u64, 
segments_count: u32) {
+        self.size_bytes.store(size_bytes, Ordering::Relaxed);
+        self.messages_count.store(messages_count, Ordering::Relaxed);
+        self.segments_count.store(segments_count, Ordering::Relaxed);
+    }
 }
 
 #[derive(Default, Debug)]
diff --git a/core/metadata/src/stm/consumer_group.rs 
b/core/metadata/src/stm/consumer_group.rs
index 71babb412..5c039e0dc 100644
--- a/core/metadata/src/stm/consumer_group.rs
+++ b/core/metadata/src/stm/consumer_group.rs
@@ -16,14 +16,17 @@
 // under the License.
 
 use crate::stm::StateHandler;
-use crate::{collect_handlers, define_state};
+use crate::stm::snapshot::Snapshotable;
+use crate::{collect_handlers, define_state, impl_fill_restore};
+
 use ahash::AHashMap;
 use iggy_common::create_consumer_group::CreateConsumerGroup;
 use iggy_common::delete_consumer_group::DeleteConsumerGroup;
 use iggy_common::{IdKind, Identifier};
+use serde::{Deserialize, Serialize};
 use slab::Slab;
 use std::sync::Arc;
-use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::{AtomicUsize, Ordering};
 
 #[derive(Debug, Clone)]
 pub struct ConsumerGroupMember {
@@ -231,3 +234,142 @@ impl StateHandler for DeleteConsumerGroup {
         }
     }
 }
+
+/// Consumer group member snapshot representation for serialization.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ConsumerGroupMemberSnapshot {
+    pub id: usize,
+    pub client_id: u32,
+    pub partitions: Vec<usize>,
+    pub partition_index: usize,
+}
+
+/// Consumer group snapshot representation for serialization.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ConsumerGroupSnapshot {
+    pub id: usize,
+    pub name: String,
+    pub partitions: Vec<usize>,
+    pub members: Vec<(usize, ConsumerGroupMemberSnapshot)>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ConsumerGroupsSnapshot {
+    pub items: Vec<(usize, ConsumerGroupSnapshot)>,
+    pub topic_index: Vec<((usize, usize), Vec<usize>)>,
+    pub topic_name_index: Vec<((String, String), Vec<usize>)>,
+}
+
+impl Snapshotable for ConsumerGroups {
+    type Snapshot = ConsumerGroupsSnapshot;
+
+    fn to_snapshot(&self) -> Self::Snapshot {
+        self.inner.read(|inner| {
+            let items: Vec<(usize, ConsumerGroupSnapshot)> = inner
+                .items
+                .iter()
+                .map(|(group_id, group)| {
+                    let members: Vec<(usize, ConsumerGroupMemberSnapshot)> = 
group
+                        .members
+                        .iter()
+                        .map(|(member_id, member)| {
+                            (
+                                member_id,
+                                ConsumerGroupMemberSnapshot {
+                                    id: member.id,
+                                    client_id: member.client_id,
+                                    partitions: member.partitions.clone(),
+                                    partition_index: 
member.partition_index.load(Ordering::Relaxed),
+                                },
+                            )
+                        })
+                        .collect();
+
+                    (
+                        group_id,
+                        ConsumerGroupSnapshot {
+                            id: group.id,
+                            name: group.name.to_string(),
+                            partitions: group.partitions.clone(),
+                            members,
+                        },
+                    )
+                })
+                .collect();
+
+            let topic_index: Vec<((usize, usize), Vec<usize>)> = inner
+                .topic_index
+                .iter()
+                .map(|(&k, v)| (k, v.clone()))
+                .collect();
+
+            let topic_name_index: Vec<((String, String), Vec<usize>)> = inner
+                .topic_name_index
+                .iter()
+                .map(|((s, t), v)| ((s.to_string(), t.to_string()), v.clone()))
+                .collect();
+
+            ConsumerGroupsSnapshot {
+                items,
+                topic_index,
+                topic_name_index,
+            }
+        })
+    }
+
+    fn from_snapshot(
+        snapshot: Self::Snapshot,
+    ) -> Result<Self, crate::stm::snapshot::SnapshotError> {
+        let mut name_index: AHashMap<Arc<str>, usize> = AHashMap::new();
+        let mut group_entries: Vec<(usize, ConsumerGroup)> = Vec::new();
+
+        for (slab_key, group_snap) in snapshot.items {
+            let member_entries: Vec<(usize, ConsumerGroupMember)> = group_snap
+                .members
+                .into_iter()
+                .map(|(member_key, member_snap)| {
+                    let member = ConsumerGroupMember {
+                        id: member_snap.id,
+                        client_id: member_snap.client_id,
+                        partitions: member_snap.partitions,
+                        partition_index: 
Arc::new(AtomicUsize::new(member_snap.partition_index)),
+                    };
+                    (member_key, member)
+                })
+                .collect();
+            let members: Slab<ConsumerGroupMember> = 
member_entries.into_iter().collect();
+
+            let group_name: Arc<str> = Arc::from(group_snap.name.as_str());
+            let group = ConsumerGroup {
+                id: group_snap.id,
+                name: group_name.clone(),
+                partitions: group_snap.partitions,
+                members,
+            };
+
+            name_index.insert(group_name, slab_key);
+            group_entries.push((slab_key, group));
+        }
+
+        let items = group_entries.into_iter().collect();
+
+        let topic_index: AHashMap<(usize, usize), Vec<usize>> =
+            snapshot.topic_index.into_iter().collect();
+
+        let topic_name_index: AHashMap<(Arc<str>, Arc<str>), Vec<usize>> = 
snapshot
+            .topic_name_index
+            .into_iter()
+            .map(|((s, t), v)| ((Arc::from(s.as_str()), 
Arc::from(t.as_str())), v))
+            .collect();
+
+        let inner = ConsumerGroupsInner {
+            name_index,
+            topic_index,
+            topic_name_index,
+            items,
+        };
+        Ok(inner.into())
+    }
+}
+
+impl_fill_restore!(ConsumerGroups, consumer_groups);
diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index 08bef35dd..d5f3d573e 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -17,6 +17,7 @@
 
 pub mod consumer_group;
 pub mod mux;
+pub mod snapshot;
 pub mod stream;
 pub mod user;
 
@@ -87,6 +88,19 @@ where
     read: Arc<ReadHandle<T>>,
 }
 
+impl<T, C> LeftRight<T, C>
+where
+    T: Absorb<C>,
+{
+    pub fn read<F, R>(&self, f: F) -> R
+    where
+        F: FnOnce(&T) -> R,
+    {
+        let guard = self.read.enter().expect("read handle should be 
accessible");
+        f(&*guard)
+    }
+}
+
 impl<T> From<T> for LeftRight<T, <T as Command>::Cmd>
 where
     T: Absorb<<T as Command>::Cmd> + Clone + Command,
diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs
index f68d99bf7..ae514e5c4 100644
--- a/core/metadata/src/stm/mux.rs
+++ b/core/metadata/src/stm/mux.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::stm::snapshot::{FillSnapshot, RestoreSnapshot, SnapshotError};
 use iggy_common::{header::PrepareHeader, message::Message};
 
 use crate::stm::{State, StateMachine};
@@ -91,7 +92,58 @@ where
     }
 }
 
+/// Recursive case for variadic tuple pattern: (Head, Tail)
+/// Fills snapshot from head and tail, and restores both on restore.
+impl<SnapshotData, Head, Tail> FillSnapshot<SnapshotData> for variadic!(Head, 
...Tail)
+where
+    Head: FillSnapshot<SnapshotData>,
+    Tail: FillSnapshot<SnapshotData>,
+{
+    fn fill_snapshot(&self, snapshot: &mut SnapshotData) -> Result<(), 
SnapshotError> {
+        self.0.fill_snapshot(snapshot)?;
+        self.1.fill_snapshot(snapshot)?;
+        Ok(())
+    }
+}
+
+impl<SnapshotData, Head, Tail> RestoreSnapshot<SnapshotData> for 
variadic!(Head, ...Tail)
+where
+    Head: RestoreSnapshot<SnapshotData>,
+    Tail: RestoreSnapshot<SnapshotData>,
+{
+    fn restore_snapshot(snapshot: &SnapshotData) -> Result<Self, 
SnapshotError> {
+        let head = Head::restore_snapshot(snapshot)?;
+        let tail = Tail::restore_snapshot(snapshot)?;
+        Ok((head, tail))
+    }
+}
+
+impl<SnapshotData, T> FillSnapshot<SnapshotData> for MuxStateMachine<T>
+where
+    T: StateMachine + FillSnapshot<SnapshotData>,
+{
+    fn fill_snapshot(&self, snapshot: &mut SnapshotData) -> Result<(), 
SnapshotError> {
+        self.inner.fill_snapshot(snapshot)
+    }
+}
+
+impl<SnapshotData, T> RestoreSnapshot<SnapshotData> for MuxStateMachine<T>
+where
+    T: StateMachine + RestoreSnapshot<SnapshotData>,
+{
+    fn restore_snapshot(snapshot: &SnapshotData) -> Result<Self, 
SnapshotError> {
+        let inner = T::restore_snapshot(snapshot)?;
+        Ok(MuxStateMachine::new(inner))
+    }
+}
+
+#[allow(unused_imports)]
 mod tests {
+    use super::*;
+    use crate::stm::consumer_group::{ConsumerGroups, ConsumerGroupsInner};
+    use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot, 
RestoreSnapshot, Snapshotable};
+    use crate::stm::stream::{Streams, StreamsInner};
+    use crate::stm::user::{Users, UsersInner};
 
     #[test]
     fn construct_mux_state_machine_from_states_with_same_output() {
@@ -110,4 +162,78 @@ mod tests {
 
         mux.update(input);
     }
+
+    #[test]
+    fn mux_state_machine_snapshot_roundtrip() {
+        let users: Users = UsersInner::new().into();
+        let streams: Streams = StreamsInner::new().into();
+        let consumer_groups: ConsumerGroups = 
ConsumerGroupsInner::new().into();
+
+        let mux = MuxStateMachine::new(variadic!(users, streams, 
consumer_groups));
+
+        // Fill the typed snapshot
+        let mut snapshot = MetadataSnapshot::new(12345);
+        mux.fill_snapshot(&mut snapshot).unwrap();
+
+        // Verify all fields are filled
+        assert!(snapshot.users.is_some());
+        assert!(snapshot.streams.is_some());
+        assert!(snapshot.consumer_groups.is_some());
+
+        // Restore and verify
+        type MuxTuple = (Users, (Streams, (ConsumerGroups, ())));
+        let restored: MuxStateMachine<MuxTuple> =
+            MuxStateMachine::restore_snapshot(&snapshot).unwrap();
+
+        // Verify the restored mux produces the same snapshot
+        let mut verify_snapshot = MetadataSnapshot::new(0);
+        restored.fill_snapshot(&mut verify_snapshot).unwrap();
+        assert!(verify_snapshot.users.is_some());
+        assert!(verify_snapshot.streams.is_some());
+        assert!(verify_snapshot.consumer_groups.is_some());
+    }
+
+    #[test]
+    fn mux_state_machine_full_envelope_roundtrip() {
+        use crate::impls::metadata::IggySnapshot;
+        use crate::stm::snapshot::Snapshot;
+
+        let users: Users = UsersInner::new().into();
+        let streams: Streams = StreamsInner::new().into();
+        let consumer_groups: ConsumerGroups = 
ConsumerGroupsInner::new().into();
+
+        type MuxTuple = (Users, (Streams, (ConsumerGroups, ())));
+        let mux: MuxStateMachine<MuxTuple> =
+            MuxStateMachine::new(variadic!(users, streams, consumer_groups));
+
+        let sequence_number = 12345u64;
+        let snapshot = IggySnapshot::create(&mux, sequence_number).unwrap();
+
+        assert_eq!(snapshot.sequence_number(), sequence_number);
+        assert!(snapshot.created_at() > 0);
+
+        // Encode to bytes
+        let encoded = snapshot.encode().unwrap();
+        assert!(!encoded.is_empty());
+
+        // Decode from bytes
+        let decoded = IggySnapshot::decode(&encoded).unwrap();
+        assert_eq!(decoded.sequence_number(), sequence_number);
+
+        // Verify snapshot fields are present
+        assert!(decoded.snapshot().users.is_some());
+        assert!(decoded.snapshot().streams.is_some());
+        assert!(decoded.snapshot().consumer_groups.is_some());
+
+        // Restore MuxStateMachine from the state side (symmetric with 
fill_snapshot)
+        let restored: MuxStateMachine<MuxTuple> =
+            MuxStateMachine::restore_snapshot(decoded.snapshot()).unwrap();
+
+        // Verify restored state
+        let mut verify_snapshot = MetadataSnapshot::new(0);
+        restored.fill_snapshot(&mut verify_snapshot).unwrap();
+        assert!(verify_snapshot.users.is_some());
+        assert!(verify_snapshot.streams.is_some());
+        assert!(verify_snapshot.consumer_groups.is_some());
+    }
 }
diff --git a/core/metadata/src/stm/snapshot.rs 
b/core/metadata/src/stm/snapshot.rs
new file mode 100644
index 000000000..76210ceb6
--- /dev/null
+++ b/core/metadata/src/stm/snapshot.rs
@@ -0,0 +1,402 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use serde::{Deserialize, Serialize, de::DeserializeOwned};
+use std::fmt;
+
+use crate::stm::consumer_group::ConsumerGroupsSnapshot;
+use crate::stm::stream::StreamsSnapshot;
+use crate::stm::user::UsersSnapshot;
+
+#[derive(Debug)]
+pub enum SnapshotError {
+    /// Serialization failed.
+    Serialize(rmp_serde::encode::Error),
+    /// Deserialization failed.
+    Deserialize(rmp_serde::decode::Error),
+}
+
+impl fmt::Display for SnapshotError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            SnapshotError::Serialize(e) => write!(f, "snapshot serialization 
failed: {}", e),
+            SnapshotError::Deserialize(e) => write!(f, "snapshot 
deserialization failed: {}", e),
+        }
+    }
+}
+
+impl std::error::Error for SnapshotError {
+    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
+        match self {
+            SnapshotError::Serialize(e) => Some(e),
+            SnapshotError::Deserialize(e) => Some(e),
+        }
+    }
+}
+
+/// The snapshot container for all metadata state machines.
+/// Each field corresponds to one state machine's serialized state.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct MetadataSnapshot {
+    /// Snapshot format version for forward/backward compatibility.
+    /// TODO(krishvishal): Properly handle versioning for snapshot. This is a 
placeholder for now.
+    pub version: u32,
+    /// Timestamp when the snapshot was created (microseconds since epoch).
+    pub created_at: u64,
+    /// Monotonically increasing snapshot sequence number.
+    pub sequence_number: u64,
+    /// Users state machine snapshot data.
+    pub users: Option<UsersSnapshot>,
+    /// Streams state machine snapshot data.
+    pub streams: Option<StreamsSnapshot>,
+    /// Consumer groups state machine snapshot data.
+    pub consumer_groups: Option<ConsumerGroupsSnapshot>,
+}
+
+impl Default for MetadataSnapshot {
+    fn default() -> Self {
+        Self::new(0)
+    }
+}
+
+impl MetadataSnapshot {
+    /// Create a new snapshot with the given sequence number.
+    pub fn new(sequence_number: u64) -> Self {
+        Self {
+            version: 1,
+            created_at: iggy_common::IggyTimestamp::now().as_micros(),
+            sequence_number,
+            users: None,
+            streams: None,
+            consumer_groups: None,
+        }
+    }
+
+    /// Encode the snapshot to msgpack bytes.
+    pub fn encode(&self) -> Result<Vec<u8>, SnapshotError> {
+        rmp_serde::to_vec(self).map_err(SnapshotError::Serialize)
+    }
+
+    /// Decode a snapshot from msgpack bytes.
+    pub fn decode(bytes: &[u8]) -> Result<Self, SnapshotError> {
+        rmp_serde::from_slice(bytes).map_err(SnapshotError::Deserialize)
+    }
+}
+
+/// Trait for metadata snapshot implementations.
+///
+/// This is the high-level interface that concrete snapshot types (e.g. 
`IggySnapshot`)
+/// must satisfy. It provides methods for creating, encoding, and decoding 
snapshots.
+pub trait Snapshot: Sized {
+    /// The error type for snapshot operations.
+    type Error: std::error::Error;
+
+    /// The type used for snapshot sequence numbers.
+    type SequenceNumber;
+
+    /// The type used for snapshot timestamps.
+    type Timestamp;
+
+    /// The inner snapshot data structure that state machines fill and restore 
from.
+    type Inner;
+
+    /// Create a snapshot from the current state of a state machine.
+    ///
+    /// # Arguments
+    /// * `stm` - The state machine to snapshot
+    /// * `sequence_number` - Monotonically increasing snapshot sequence number
+    fn create<T>(stm: &T, sequence_number: Self::SequenceNumber) -> 
Result<Self, Self::Error>
+    where
+        T: FillSnapshot<Self::Inner>;
+
+    /// Encode the snapshot to msgpack bytes.
+    fn encode(&self) -> Result<Vec<u8>, Self::Error>;
+
+    /// Decode a snapshot from msgpack bytes.
+    fn decode(bytes: &[u8]) -> Result<Self, Self::Error>;
+
+    /// Get the snapshot sequence number.
+    fn sequence_number(&self) -> Self::SequenceNumber;
+
+    /// Get the timestamp when this snapshot was created.
+    fn created_at(&self) -> Self::Timestamp;
+}
+
+/// Trait implemented by each `{Name}Inner` state machine to support 
snapshotting.
+/// Each state machine defines its own snapshot
+/// type for serialization and provides conversion methods.
+pub trait Snapshotable {
+    /// The serde-serializable snapshot representation of this state.
+    /// This should be a plain struct with only serializable types and no 
wrappers
+    /// like `Arc`, `AtomicUsize`, or other non-serializable wrappers.
+    type Snapshot: Serialize + DeserializeOwned;
+
+    /// Convert the current in-memory state into a serializable snapshot.
+    fn to_snapshot(&self) -> Self::Snapshot;
+
+    /// Restore in-memory state from a snapshot representation.
+    fn from_snapshot(snapshot: Self::Snapshot) -> Result<Self, SnapshotError>
+    where
+        Self: Sized;
+}
+
+/// Trait for filling a typed snapshot with state machine data.
+///
+/// Each state machine implements this to write its serialized state
+pub trait FillSnapshot<S> {
+    /// Fill the snapshot with this state machine's data.
+    fn fill_snapshot(&self, snapshot: &mut S) -> Result<(), SnapshotError>;
+}
+
+/// Trait for restoring state machine data from a typed snapshot.
+///
+/// Each state machine implements this to read its state.
+pub trait RestoreSnapshot<S>: Sized {
+    /// Restore this state machine from the snapshot.
+    fn restore_snapshot(snapshot: &S) -> Result<Self, SnapshotError>;
+}
+
+/// Base case for the recursive tuple pattern - unit type terminates the 
recursion.
+impl<S> FillSnapshot<S> for () {
+    fn fill_snapshot(&self, _snapshot: &mut S) -> Result<(), SnapshotError> {
+        Ok(())
+    }
+}
+
+impl<S> RestoreSnapshot<S> for () {
+    fn restore_snapshot(_snapshot: &S) -> Result<Self, SnapshotError> {
+        Ok(())
+    }
+}
+
+/// Generates `FillSnapshot` and `RestoreSnapshot` implementations for a 
wrapper type.
+///
+/// The wrapper type (e.g. `Streams`) must implement `Snapshotable`.
+///
+/// # Example
+///
+/// ```ignore
+/// impl_fill_restore!(Users, users);
+/// ```
+#[macro_export]
+macro_rules! impl_fill_restore {
+    ($wrapper:ident, $field:ident) => {
+        impl 
$crate::stm::snapshot::FillSnapshot<$crate::stm::snapshot::MetadataSnapshot>
+            for $wrapper
+        {
+            fn fill_snapshot(
+                &self,
+                snapshot: &mut $crate::stm::snapshot::MetadataSnapshot,
+            ) -> Result<(), $crate::stm::snapshot::SnapshotError> {
+                use $crate::stm::snapshot::Snapshotable;
+                snapshot.$field = Some(self.to_snapshot());
+                Ok(())
+            }
+        }
+
+        impl 
$crate::stm::snapshot::RestoreSnapshot<$crate::stm::snapshot::MetadataSnapshot>
+            for $wrapper
+        {
+            fn restore_snapshot(
+                snapshot: &$crate::stm::snapshot::MetadataSnapshot,
+            ) -> Result<Self, $crate::stm::snapshot::SnapshotError> {
+                use serde::de::Error as _;
+                use $crate::stm::snapshot::{SnapshotError, Snapshotable};
+                let snap = snapshot.$field.clone().ok_or_else(|| {
+                    
SnapshotError::Deserialize(rmp_serde::decode::Error::custom(format_args!(
+                        "Snapshot Restore Error: {}",
+                        stringify!($field)
+                    )))
+                })?;
+                Self::from_snapshot(snap)
+            }
+        }
+    };
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::stm::stream::{StatsSnapshot, StreamSnapshot};
+    use iggy_common::IggyTimestamp;
+
+    #[test]
+    fn test_metadata_snapshot_roundtrip() {
+        let snapshot = MetadataSnapshot::new(42);
+
+        let encoded = snapshot.encode().unwrap();
+        let decoded = MetadataSnapshot::decode(&encoded).unwrap();
+
+        assert_eq!(decoded.sequence_number, 42);
+        assert!(decoded.users.is_none());
+        assert!(decoded.streams.is_none());
+        assert!(decoded.consumer_groups.is_none());
+    }
+
+    #[test]
+    fn roundtrip_with_data() {
+        let ts = IggyTimestamp::from(1694968446131680u64);
+
+        let mut snapshot = MetadataSnapshot::new(100);
+        snapshot.streams = Some(StreamsSnapshot {
+            items: vec![(
+                0,
+                StreamSnapshot {
+                    id: 0,
+                    name: "events".to_string(),
+                    created_at: ts,
+                    stats: StatsSnapshot {
+                        size_bytes: 1024,
+                        messages_count: 50,
+                        segments_count: 2,
+                    },
+                    topics: vec![],
+                },
+            )],
+        });
+
+        let encoded = snapshot.encode().unwrap();
+        let decoded = MetadataSnapshot::decode(&encoded).unwrap();
+
+        assert_eq!(decoded.sequence_number, 100);
+        assert!(decoded.users.is_none());
+        assert!(decoded.consumer_groups.is_none());
+
+        let streams = decoded.streams.as_ref().unwrap();
+        assert_eq!(streams.items.len(), 1);
+
+        let (slab_id, stream) = &streams.items[0];
+        assert_eq!(*slab_id, 0);
+        assert_eq!(stream.name, "events");
+        assert_eq!(stream.created_at.as_micros(), ts.as_micros());
+        assert_eq!(stream.stats.size_bytes, 1024);
+        assert_eq!(stream.stats.messages_count, 50);
+        assert_eq!(stream.stats.segments_count, 2);
+        assert_eq!(stream.topics.len(), 0);
+    }
+
+    #[test]
+    fn roundtrip_with_slab_gaps() {
+        use crate::stm::stream::StreamsSnapshot;
+        use crate::stm::user::{PermissionerSnapshot, UserSnapshot, 
UsersSnapshot};
+        use iggy_common::UserStatus;
+
+        let ts = IggyTimestamp::from(1694968446131680u64);
+
+        let users_snap = UsersSnapshot {
+            items: vec![
+                (
+                    0,
+                    UserSnapshot {
+                        id: 0,
+                        username: "alice".to_string(),
+                        password_hash: "hash_a".to_string(),
+                        status: UserStatus::Active,
+                        created_at: ts,
+                        permissions: None,
+                    },
+                ),
+                (
+                    2,
+                    UserSnapshot {
+                        id: 2,
+                        username: "charlie".to_string(),
+                        password_hash: "hash_c".to_string(),
+                        status: UserStatus::Active,
+                        created_at: ts,
+                        permissions: None,
+                    },
+                ),
+            ],
+            personal_access_tokens: vec![],
+            permissioner: PermissionerSnapshot {
+                users_permissions: vec![],
+                users_streams_permissions: vec![],
+                users_that_can_poll_messages_from_all_streams: vec![],
+                users_that_can_send_messages_to_all_streams: vec![],
+                users_that_can_poll_messages_from_specific_streams: vec![],
+                users_that_can_send_messages_to_specific_streams: vec![],
+            },
+        };
+
+        let streams_snap = StreamsSnapshot {
+            items: vec![
+                (
+                    0,
+                    StreamSnapshot {
+                        id: 0,
+                        name: "stream-0".to_string(),
+                        created_at: ts,
+                        stats: StatsSnapshot {
+                            size_bytes: 100,
+                            messages_count: 10,
+                            segments_count: 1,
+                        },
+                        topics: vec![],
+                    },
+                ),
+                (
+                    3,
+                    StreamSnapshot {
+                        id: 3,
+                        name: "stream-3".to_string(),
+                        created_at: ts,
+                        stats: StatsSnapshot {
+                            size_bytes: 200,
+                            messages_count: 20,
+                            segments_count: 2,
+                        },
+                        topics: vec![],
+                    },
+                ),
+            ],
+        };
+
+        let mut snapshot = MetadataSnapshot::new(99);
+        snapshot.users = Some(users_snap);
+        snapshot.streams = Some(streams_snap);
+
+        let encoded = snapshot.encode().unwrap();
+        let decoded = MetadataSnapshot::decode(&encoded).unwrap();
+
+        use crate::stm::user::Users;
+        let restored_users: Users = 
RestoreSnapshot::restore_snapshot(&decoded).unwrap();
+
+        let mut verify = MetadataSnapshot::new(0);
+        restored_users.fill_snapshot(&mut verify).unwrap();
+        let users_snap = verify.users.unwrap();
+        assert_eq!(users_snap.items.len(), 2);
+        assert_eq!(users_snap.items[0].0, 0);
+        assert_eq!(users_snap.items[0].1.username, "alice");
+        assert_eq!(users_snap.items[0].1.id, 0);
+        assert_eq!(users_snap.items[1].0, 2);
+        assert_eq!(users_snap.items[1].1.username, "charlie");
+        assert_eq!(users_snap.items[1].1.id, 2);
+
+        use crate::stm::stream::Streams;
+        let restored_streams: Streams = 
RestoreSnapshot::restore_snapshot(&decoded).unwrap();
+
+        let mut verify = MetadataSnapshot::new(0);
+        restored_streams.fill_snapshot(&mut verify).unwrap();
+        let streams_snap = verify.streams.unwrap();
+        assert_eq!(streams_snap.items.len(), 2);
+        assert_eq!(streams_snap.items[0].0, 0);
+        assert_eq!(streams_snap.items[0].1.name, "stream-0");
+        assert_eq!(streams_snap.items[1].0, 3);
+        assert_eq!(streams_snap.items[1].1.name, "stream-3");
+    }
+}
diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs
index 4f212cd20..7e4c95104 100644
--- a/core/metadata/src/stm/stream.rs
+++ b/core/metadata/src/stm/stream.rs
@@ -17,7 +17,8 @@
 
 use crate::stats::{StreamStats, TopicStats};
 use crate::stm::StateHandler;
-use crate::{collect_handlers, define_state};
+use crate::stm::snapshot::Snapshotable;
+use crate::{collect_handlers, define_state, impl_fill_restore};
 use ahash::AHashMap;
 use iggy_common::create_partitions::CreatePartitions;
 use iggy_common::create_stream::CreateStream;
@@ -30,9 +31,17 @@ use iggy_common::purge_topic::PurgeTopic;
 use iggy_common::update_stream::UpdateStream;
 use iggy_common::update_topic::UpdateTopic;
 use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp, 
MaxTopicSize};
+use serde::{Deserialize, Serialize};
 use slab::Slab;
 use std::sync::Arc;
-use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+/// Partition snapshot representation for serialization.
+///
+/// Plain-data mirror of [`Partition`]; both fields are copied verbatim when the
+/// `Streams` state machine is converted to and from a snapshot.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct PartitionSnapshot {
+    /// Partition identifier, copied from `Partition::id`.
+    pub id: usize,
+    /// Creation timestamp, copied from `Partition::created_at`.
+    pub created_at: IggyTimestamp,
+}
 
 #[derive(Debug, Clone)]
 pub struct Partition {
@@ -46,6 +55,29 @@ impl Partition {
     }
 }
 
+/// Stats snapshot representation for serialization.
+///
+/// Holds the three values produced by `load_for_snapshot()` on a stream's or
+/// topic's stats, in the same order; they are written back through
+/// `store_from_snapshot()` when the state machine is restored.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct StatsSnapshot {
+    /// First value of the `load_for_snapshot()` triple.
+    pub size_bytes: u64,
+    /// Second value of the `load_for_snapshot()` triple.
+    pub messages_count: u64,
+    /// Third value of the `load_for_snapshot()` triple.
+    pub segments_count: u32,
+}
+
+/// Topic snapshot representation for serialization.
+///
+/// Plain-data mirror of [`Topic`]: shared (`Arc`) and atomic fields of the live
+/// topic are flattened into owned values here.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct TopicSnapshot {
+    /// Topic identifier, copied from `Topic::id`.
+    pub id: usize,
+    /// Topic name as an owned string (the live topic stores it as `Arc<str>`).
+    pub name: String,
+    /// Creation timestamp, copied from `Topic::created_at`.
+    pub created_at: IggyTimestamp,
+    /// Copied from `Topic::replication_factor`.
+    pub replication_factor: u8,
+    /// Copied from `Topic::message_expiry`.
+    pub message_expiry: IggyExpiry,
+    /// Copied from `Topic::compression_algorithm`.
+    pub compression_algorithm: CompressionAlgorithm,
+    /// Copied from `Topic::max_topic_size`.
+    pub max_topic_size: MaxTopicSize,
+    /// Topic-level stats captured via `load_for_snapshot()`.
+    pub stats: StatsSnapshot,
+    /// One entry per element of `Topic::partitions`, in iteration order.
+    pub partitions: Vec<PartitionSnapshot>,
+    /// Value of the topic's atomic round-robin counter at snapshot time.
+    pub round_robin_counter: usize,
+}
+
 #[derive(Debug, Clone)]
 pub struct Topic {
     pub id: usize,
@@ -103,6 +135,16 @@ impl Topic {
     }
 }
 
+/// Stream snapshot representation for serialization.
+///
+/// Plain-data mirror of [`Stream`], including all of its topics.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct StreamSnapshot {
+    /// Stream identifier, copied from `Stream::id`.
+    pub id: usize,
+    /// Stream name as an owned string (the live stream stores it as `Arc<str>`).
+    pub name: String,
+    /// Creation timestamp, copied from `Stream::created_at`.
+    pub created_at: IggyTimestamp,
+    /// Stream-level stats captured via `load_for_snapshot()`.
+    pub stats: StatsSnapshot,
+    /// `(slab key, topic)` pairs; the key is the topic's position in the
+    /// stream's `Slab<Topic>` and is reused when the slab is rebuilt.
+    pub topics: Vec<(usize, TopicSnapshot)>,
+}
+
 #[derive(Debug)]
 pub struct Stream {
     pub id: usize,
@@ -466,3 +508,147 @@ impl StateHandler for DeletePartitions {
         }
     }
 }
+
+/// Snapshot representation for the Streams state machine.
+///
+/// This is the `Snapshotable::Snapshot` type of `Streams`.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct StreamsSnapshot {
+    /// `(slab key, stream)` pairs; the key is the stream's position in the
+    /// state machine's `Slab<Stream>` and is reused when the slab is rebuilt.
+    pub items: Vec<(usize, StreamSnapshot)>,
+}
+
+impl Snapshotable for Streams {
+    type Snapshot = StreamsSnapshot;
+
+    /// Captures every stream, its topics and their partitions as plain,
+    /// serializable data.
+    ///
+    /// The slab key of each stream and topic is stored next to its snapshot so
+    /// that [`Snapshotable::from_snapshot`] can put the entry back at the same
+    /// slab position. Stats are read with `load_for_snapshot()` and the
+    /// round-robin counter with a relaxed atomic load, all inside a single
+    /// `inner.read` call.
+    fn to_snapshot(&self) -> Self::Snapshot {
+        // Packs the triple returned by `load_for_snapshot()` into a `StatsSnapshot`.
+        fn stats_of(
+            (size_bytes, messages_count, segments_count): (u64, u64, u32),
+        ) -> StatsSnapshot {
+            StatsSnapshot {
+                size_bytes,
+                messages_count,
+                segments_count,
+            }
+        }
+
+        // Flattens a live topic (Arc / atomic fields) into owned snapshot data.
+        fn topic_of(topic: &Topic) -> TopicSnapshot {
+            // Stats are read first, matching the order of the atomic loads
+            // in the previous implementation.
+            let stats = stats_of(topic.stats.load_for_snapshot());
+            TopicSnapshot {
+                id: topic.id,
+                name: topic.name.to_string(),
+                created_at: topic.created_at,
+                replication_factor: topic.replication_factor,
+                message_expiry: topic.message_expiry,
+                compression_algorithm: topic.compression_algorithm,
+                max_topic_size: topic.max_topic_size,
+                stats,
+                partitions: topic
+                    .partitions
+                    .iter()
+                    .map(|partition| PartitionSnapshot {
+                        id: partition.id,
+                        created_at: partition.created_at,
+                    })
+                    .collect(),
+                round_robin_counter: topic.round_robin_counter.load(Ordering::Relaxed),
+            }
+        }
+
+        self.inner.read(|inner| {
+            let items: Vec<(usize, StreamSnapshot)> = inner
+                .items
+                .iter()
+                .map(|(stream_key, stream)| {
+                    // Stream-level stats are loaded before walking the topics.
+                    let stats = stats_of(stream.stats.load_for_snapshot());
+                    let topics: Vec<(usize, TopicSnapshot)> = stream
+                        .topics
+                        .iter()
+                        .map(|(topic_key, topic)| (topic_key, topic_of(topic)))
+                        .collect();
+                    let stream_snap = StreamSnapshot {
+                        id: stream.id,
+                        name: stream.name.to_string(),
+                        created_at: stream.created_at,
+                        stats,
+                        topics,
+                    };
+                    (stream_key, stream_snap)
+                })
+                .collect();
+            StreamsSnapshot { items }
+        })
+    }
+
+    /// Rebuilds the `Streams` state machine from a snapshot.
+    ///
+    /// Streams and topics are inserted into their slabs under the keys recorded
+    /// in the snapshot, and the name -> slab-key indexes are rebuilt alongside.
+    /// Each topic's stats object is created with the restored stream stats as
+    /// its parent and then overwritten with the topic's own snapshot values.
+    ///
+    /// # Errors
+    ///
+    /// This implementation has no failure path and always returns `Ok`.
+    fn from_snapshot(
+        snapshot: Self::Snapshot,
+    ) -> Result<Self, crate::stm::snapshot::SnapshotError> {
+        let mut index: AHashMap<Arc<str>, usize> =
+            AHashMap::with_capacity(snapshot.items.len());
+
+        // Collect straight into the slab; `Slab` accepts `(key, value)` pairs,
+        // so no intermediate `Vec` is needed.
+        let items: Slab<Stream> = snapshot
+            .items
+            .into_iter()
+            .map(|(slab_key, stream_snap)| {
+                let stream_stats = Arc::new(StreamStats::default());
+                stream_stats.store_from_snapshot(
+                    stream_snap.stats.size_bytes,
+                    stream_snap.stats.messages_count,
+                    stream_snap.stats.segments_count,
+                );
+
+                let mut topic_index: AHashMap<Arc<str>, usize> =
+                    AHashMap::with_capacity(stream_snap.topics.len());
+                let topics: Slab<Topic> = stream_snap
+                    .topics
+                    .into_iter()
+                    .map(|(topic_key, topic_snap)| {
+                        let topic_stats =
+                            Arc::new(TopicStats::new(Arc::clone(&stream_stats)));
+                        topic_stats.store_from_snapshot(
+                            topic_snap.stats.size_bytes,
+                            topic_snap.stats.messages_count,
+                            topic_snap.stats.segments_count,
+                        );
+
+                        let topic_name: Arc<str> = Arc::from(topic_snap.name.as_str());
+                        topic_index.insert(Arc::clone(&topic_name), topic_key);
+
+                        let topic = Topic {
+                            id: topic_snap.id,
+                            name: topic_name,
+                            created_at: topic_snap.created_at,
+                            replication_factor: topic_snap.replication_factor,
+                            message_expiry: topic_snap.message_expiry,
+                            compression_algorithm: topic_snap.compression_algorithm,
+                            max_topic_size: topic_snap.max_topic_size,
+                            stats: topic_stats,
+                            partitions: topic_snap
+                                .partitions
+                                .into_iter()
+                                .map(|partition| Partition {
+                                    id: partition.id,
+                                    created_at: partition.created_at,
+                                })
+                                .collect(),
+                            round_robin_counter: Arc::new(AtomicUsize::new(
+                                topic_snap.round_robin_counter,
+                            )),
+                        };
+                        (topic_key, topic)
+                    })
+                    .collect();
+
+                let stream_name: Arc<str> = Arc::from(stream_snap.name.as_str());
+                index.insert(Arc::clone(&stream_name), slab_key);
+
+                let stream = Stream {
+                    id: stream_snap.id,
+                    name: stream_name,
+                    created_at: stream_snap.created_at,
+                    stats: stream_stats,
+                    topics,
+                    topic_index,
+                };
+                (slab_key, stream)
+            })
+            .collect();
+
+        let inner = StreamsInner { index, items };
+        Ok(inner.into())
+    }
+}
+
+// NOTE(review): the macro body is not visible here; presumably it generates the
+// fill/restore snapshot impls that bind `Streams` to the `streams` field of
+// `MetadataSnapshot` - confirm against `crate::impl_fill_restore`.
+impl_fill_restore!(Streams, streams);
diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs
index 88bc178ea..14f71f3af 100644
--- a/core/metadata/src/stm/user.rs
+++ b/core/metadata/src/stm/user.rs
@@ -17,7 +17,8 @@
 
 use crate::permissioner::Permissioner;
 use crate::stm::StateHandler;
-use crate::{collect_handlers, define_state};
+use crate::stm::snapshot::Snapshotable;
+use crate::{collect_handlers, define_state, impl_fill_restore};
 use ahash::AHashMap;
 use iggy_common::change_password::ChangePassword;
 use iggy_common::create_personal_access_token::CreatePersonalAccessToken;
@@ -26,7 +27,11 @@ use iggy_common::delete_personal_access_token::DeletePersonalAccessToken;
 use iggy_common::delete_user::DeleteUser;
 use iggy_common::update_permissions::UpdatePermissions;
 use iggy_common::update_user::UpdateUser;
-use iggy_common::{IggyTimestamp, Permissions, PersonalAccessToken, UserId, 
UserStatus};
+use iggy_common::{
+    GlobalPermissions, IggyTimestamp, Permissions, PersonalAccessToken, 
StreamPermissions, UserId,
+    UserStatus,
+};
+use serde::{Deserialize, Serialize};
 use slab::Slab;
 use std::sync::Arc;
 
@@ -258,3 +263,219 @@ impl StateHandler for DeletePersonalAccessToken {
         }
     }
 }
+
+/// User snapshot representation for serialization.
+///
+/// Plain-data mirror of [`User`]: `Arc`-backed fields of the live user are
+/// flattened into owned values here.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct UserSnapshot {
+    /// User identifier, copied from `User::id`.
+    pub id: UserId,
+    /// Username as an owned string (the live user stores it as `Arc<str>`).
+    pub username: String,
+    /// Owned copy of `User::password_hash`.
+    pub password_hash: String,
+    /// Copied from `User::status`.
+    pub status: UserStatus,
+    /// Creation timestamp, copied from `User::created_at`.
+    pub created_at: IggyTimestamp,
+    /// Clone of the user's permissions; `None` when the user has none set.
+    pub permissions: Option<Permissions>,
+}
+
+/// Personal access token snapshot representation for serialization.
+///
+/// The four fields are exactly the arguments passed to
+/// `PersonalAccessToken::raw` when the token is restored.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct PersonalAccessTokenSnapshot {
+    /// Copied from `PersonalAccessToken::user_id`.
+    pub user_id: UserId,
+    /// String form of `PersonalAccessToken::name`.
+    pub name: String,
+    /// String form of `PersonalAccessToken::token`.
+    pub token: String,
+    /// Copied from `PersonalAccessToken::expiry_at`; `None` when unset.
+    pub expiry_at: Option<IggyTimestamp>,
+}
+
+/// Permissioner snapshot representation for serialization.
+///
+/// Each field is the same-named collection of [`Permissioner`] flattened into a
+/// `Vec`, so element order follows the source collection's iteration order.
+/// Restoring collects every `Vec` back into the corresponding `Permissioner`
+/// field.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct PermissionerSnapshot {
+    /// `(user id, global permissions)` entries.
+    pub users_permissions: Vec<(UserId, GlobalPermissions)>,
+    /// `((user id, usize key), stream permissions)` entries.
+    /// NOTE(review): the `usize` is presumably the stream id - confirm against
+    /// `Permissioner`.
+    pub users_streams_permissions: Vec<((UserId, usize), StreamPermissions)>,
+    /// Entries of `Permissioner::users_that_can_poll_messages_from_all_streams`.
+    pub users_that_can_poll_messages_from_all_streams: Vec<UserId>,
+    /// Entries of `Permissioner::users_that_can_send_messages_to_all_streams`.
+    pub users_that_can_send_messages_to_all_streams: Vec<UserId>,
+    /// Entries of `Permissioner::users_that_can_poll_messages_from_specific_streams`.
+    pub users_that_can_poll_messages_from_specific_streams: Vec<(UserId, usize)>,
+    /// Entries of `Permissioner::users_that_can_send_messages_to_specific_streams`.
+    pub users_that_can_send_messages_to_specific_streams: Vec<(UserId, usize)>,
+}
+
+/// Snapshot representation for the Users state machine.
+///
+/// This is the `Snapshotable::Snapshot` type of `Users`.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct UsersSnapshot {
+    /// `(slab key, user)` pairs; the key is the user's position in the state
+    /// machine's `Slab<User>` and is reused when the slab is rebuilt.
+    pub items: Vec<(usize, UserSnapshot)>,
+    /// Per-user token lists: for every user id, the `(map key, token)` entries
+    /// of that user's personal-access-token map.
+    pub personal_access_tokens: Vec<(UserId, Vec<(String, PersonalAccessTokenSnapshot)>)>,
+    /// Flattened copy of the state machine's `Permissioner`.
+    pub permissioner: PermissionerSnapshot,
+}
+
+impl Snapshotable for Users {
+    type Snapshot = UsersSnapshot;
+
+    /// Captures all users, their personal access tokens and the permissioner
+    /// tables as plain, serializable data.
+    ///
+    /// The slab key of each user is stored next to its snapshot so that
+    /// [`Snapshotable::from_snapshot`] can put the user back at the same slab
+    /// position. Everything is read inside a single `inner.read` call.
+    fn to_snapshot(&self) -> Self::Snapshot {
+        self.inner.read(|inner| {
+            let items: Vec<(usize, UserSnapshot)> = inner
+                .items
+                .iter()
+                .map(|(slab_key, user)| {
+                    let user_snap = UserSnapshot {
+                        id: user.id,
+                        username: user.username.to_string(),
+                        password_hash: user.password_hash.to_string(),
+                        status: user.status,
+                        created_at: user.created_at,
+                        // Option<Arc<Permissions>> -> Option<Permissions>
+                        permissions: user.permissions.as_deref().cloned(),
+                    };
+                    (slab_key, user_snap)
+                })
+                .collect();
+
+            let personal_access_tokens: Vec<(UserId, Vec<(String, PersonalAccessTokenSnapshot)>)> =
+                inner
+                    .personal_access_tokens
+                    .iter()
+                    .map(|(user_id, tokens)| {
+                        let token_list: Vec<(String, PersonalAccessTokenSnapshot)> = tokens
+                            .iter()
+                            .map(|(key, pat)| {
+                                let pat_snap = PersonalAccessTokenSnapshot {
+                                    user_id: pat.user_id,
+                                    name: pat.name.to_string(),
+                                    token: pat.token.to_string(),
+                                    expiry_at: pat.expiry_at,
+                                };
+                                (key.to_string(), pat_snap)
+                            })
+                            .collect();
+                        (*user_id, token_list)
+                    })
+                    .collect();
+
+            let source = &inner.permissioner;
+            let permissioner = PermissionerSnapshot {
+                users_permissions: source
+                    .users_permissions
+                    .iter()
+                    .map(|(key, perms)| (*key, perms.clone()))
+                    .collect(),
+                users_streams_permissions: source
+                    .users_streams_permissions
+                    .iter()
+                    .map(|(key, perms)| (*key, perms.clone()))
+                    .collect(),
+                users_that_can_poll_messages_from_all_streams: source
+                    .users_that_can_poll_messages_from_all_streams
+                    .iter()
+                    .copied()
+                    .collect(),
+                users_that_can_send_messages_to_all_streams: source
+                    .users_that_can_send_messages_to_all_streams
+                    .iter()
+                    .copied()
+                    .collect(),
+                users_that_can_poll_messages_from_specific_streams: source
+                    .users_that_can_poll_messages_from_specific_streams
+                    .iter()
+                    .copied()
+                    .collect(),
+                users_that_can_send_messages_to_specific_streams: source
+                    .users_that_can_send_messages_to_specific_streams
+                    .iter()
+                    .copied()
+                    .collect(),
+            };
+
+            UsersSnapshot {
+                items,
+                personal_access_tokens,
+                permissioner,
+            }
+        })
+    }
+
+    /// Rebuilds the `Users` state machine from a snapshot.
+    ///
+    /// Users are inserted into the slab under the keys recorded in the
+    /// snapshot, the username index is rebuilt from those keys, tokens are
+    /// recreated with `PersonalAccessToken::raw`, and every permissioner table
+    /// is collected back from its flattened `Vec`.
+    ///
+    /// # Errors
+    ///
+    /// This implementation has no failure path and always returns `Ok`.
+    fn from_snapshot(
+        snapshot: Self::Snapshot,
+    ) -> Result<Self, crate::stm::snapshot::SnapshotError> {
+        let UsersSnapshot {
+            items: user_snaps,
+            personal_access_tokens: token_snaps,
+            permissioner: perm_snap,
+        } = snapshot;
+
+        let mut index: AHashMap<Arc<str>, UserId> = AHashMap::with_capacity(user_snaps.len());
+
+        // Collect straight into the slab; `Slab` accepts `(key, value)` pairs,
+        // so no intermediate `Vec` is needed.
+        let items: Slab<User> = user_snaps
+            .into_iter()
+            .map(|(slab_key, user_snap)| {
+                let username: Arc<str> = Arc::from(user_snap.username.as_str());
+                // NOTE(review): `as` silently truncates if the slab key does not
+                // fit in `UserId`; kept as-is because no suitable `SnapshotError`
+                // variant is visible from here - consider a checked conversion.
+                index.insert(Arc::clone(&username), slab_key as UserId);
+
+                let user = User {
+                    id: user_snap.id,
+                    username,
+                    password_hash: Arc::from(user_snap.password_hash.as_str()),
+                    status: user_snap.status,
+                    created_at: user_snap.created_at,
+                    permissions: user_snap.permissions.map(Arc::new),
+                };
+                (slab_key, user)
+            })
+            .collect();
+
+        let mut personal_access_tokens: AHashMap<UserId, AHashMap<Arc<str>, PersonalAccessToken>> =
+            AHashMap::with_capacity(token_snaps.len());
+        for (user_id, tokens) in token_snaps {
+            let mut token_map: AHashMap<Arc<str>, PersonalAccessToken> =
+                AHashMap::with_capacity(tokens.len());
+            for (key, pat_snap) in tokens {
+                let pat = PersonalAccessToken::raw(
+                    pat_snap.user_id,
+                    &pat_snap.name,
+                    &pat_snap.token,
+                    pat_snap.expiry_at,
+                );
+                token_map.insert(Arc::from(key.as_str()), pat);
+            }
+            personal_access_tokens.insert(user_id, token_map);
+        }
+
+        let permissioner = Permissioner {
+            users_permissions: perm_snap.users_permissions.into_iter().collect(),
+            users_streams_permissions: perm_snap
+                .users_streams_permissions
+                .into_iter()
+                .collect(),
+            users_that_can_poll_messages_from_all_streams: perm_snap
+                .users_that_can_poll_messages_from_all_streams
+                .into_iter()
+                .collect(),
+            users_that_can_send_messages_to_all_streams: perm_snap
+                .users_that_can_send_messages_to_all_streams
+                .into_iter()
+                .collect(),
+            users_that_can_poll_messages_from_specific_streams: perm_snap
+                .users_that_can_poll_messages_from_specific_streams
+                .into_iter()
+                .collect(),
+            users_that_can_send_messages_to_specific_streams: perm_snap
+                .users_that_can_send_messages_to_specific_streams
+                .into_iter()
+                .collect(),
+        };
+
+        let inner = UsersInner {
+            index,
+            items,
+            personal_access_tokens,
+            permissioner,
+        };
+        Ok(inner.into())
+    }
+}
+
+// NOTE(review): the macro body is not visible here; presumably it generates the
+// fill/restore snapshot impls that bind `Users` to the `users` field of
+// `MetadataSnapshot` - confirm against `crate::impl_fill_restore`.
+impl_fill_restore!(Users, users);

Reply via email to