numinnex commented on code in PR #2675:
URL: https://github.com/apache/iggy/pull/2675#discussion_r2773183788
##########
core/metadata/src/impls/metadata.rs:
##########
@@ -23,6 +27,114 @@ use journal::{Journal, JournalHandle};
use message_bus::MessageBus;
use tracing::{debug, warn};
+/// Trait for metadata snapshot implementations.
+///
+/// This is the interface that `MetadataHandle::Snapshot` must satisfy.
+/// It provides methods for creating, encoding, decoding, and restoring snapshots.
+#[allow(unused)]
+pub trait MetadataSnapshot: Sized {
+ /// The error type for snapshot operations.
+ type Error: std::error::Error;
+
+ /// Create a snapshot from the current state of the mux state machine.
+ ///
+ /// # Arguments
+ /// * `mux` - The multiplexing state machine containing all sub-state machines
+ /// * `commit_number` - The VSR commit number this snapshot corresponds to
+ fn create<T>(mux: &MuxStateMachine<T>, commit_number: u64) -> Result<Self, Self::Error>
+ where
+ T: StateMachine + SnapshotContributor;
+
+ /// Encode the snapshot to msgpack bytes.
+ fn encode(&self) -> Result<Vec<u8>, Self::Error>;
+
+ /// Decode a snapshot from msgpack bytes.
+ fn decode(bytes: &[u8]) -> Result<Self, Self::Error>;
+
+ /// Restore a mux state machine from this snapshot.
+ fn restore<T>(&self) -> Result<MuxStateMachine<T>, Self::Error>
+ where
+ T: StateMachine + SnapshotContributor;
+
+ /// Get the VSR commit number this snapshot corresponds to.
Review Comment:
Maybe we could use a `sequence_number` instead of the commit number. The idea
is that the sequence number would grow monotonically with each snapshot,
independent of the `commit` number from consensus.
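A minimal sketch of this suggestion, assuming a hypothetical `SnapshotSequencer` owned by the metadata layer and a hypothetical `SnapshotHeader` (neither exists in the PR): the sequence number advances by one on every snapshot, while the consensus commit number is kept only as provenance and may repeat.

```rust
/// Hypothetical header stored alongside each snapshot (names are illustrative).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SnapshotHeader {
    /// Grows by one for every snapshot taken.
    pub sequence_number: u64,
    /// Consensus commit number at snapshot time; it can repeat if two
    /// snapshots are taken without new commits in between.
    pub commit_number: u64,
}

/// Hypothetical counter that hands out snapshot sequence numbers.
#[derive(Debug, Default)]
pub struct SnapshotSequencer {
    last: u64,
}

impl SnapshotSequencer {
    /// Resume from the sequence number of the last persisted snapshot.
    pub fn resume_from(last: u64) -> Self {
        Self { last }
    }

    /// Produce the header for the next snapshot. The sequence number grows
    /// on every call regardless of `commit_number`.
    pub fn next_header(&mut self, commit_number: u64) -> SnapshotHeader {
        self.last += 1;
        SnapshotHeader {
            sequence_number: self.last,
            commit_number,
        }
    }
}
```

Decoupling the two numbers means snapshot ordering no longer depends on whether consensus advanced between two snapshots.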
##########
core/metadata/src/impls/metadata.rs:
##########
@@ -23,6 +27,114 @@ use journal::{Journal, JournalHandle};
+pub trait MetadataSnapshot: Sized {
Review Comment:
Let's rename this to the `Snapshot` trait. If one already exists (I
created it as a stub), remove it.
##########
core/metadata/src/stm/mux.rs:
##########
@@ -88,7 +89,57 @@ where
}
}
+/// Recursive case for variadic tuple pattern: (Head, Tail)
Review Comment:
We could alternatively look into extending the `StateMachine` trait by
adding extra supertrait bounds on it:
```rs
trait StateMachine: Snapshotable /* + ... */
{
}
```
and then have a single impl block.
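A minimal sketch of the supertrait variant, assuming a simplified, hypothetical `Snapshotable` trait with a `snapshot_bytes` method standing in for the real fill/restore API, and a toy `Counter` state machine. Because `Snapshotable` is a supertrait, generic code needs only the `T: StateMachine` bound to reach the snapshot methods.

```rust
/// Hypothetical snapshot capability; the real trait would fill/restore a
/// snapshot rather than return raw bytes.
pub trait Snapshotable {
    fn snapshot_bytes(&self) -> Vec<u8>;
}

/// `StateMachine` requires `Snapshotable`, so every state machine is
/// snapshottable by construction.
pub trait StateMachine: Snapshotable {
    type Input;
    fn update(&mut self, input: Self::Input);
}

/// Toy state machine used only to exercise the bounds.
#[derive(Default)]
pub struct Counter {
    value: u64,
}

impl Snapshotable for Counter {
    fn snapshot_bytes(&self) -> Vec<u8> {
        self.value.to_le_bytes().to_vec()
    }
}

impl StateMachine for Counter {
    type Input = u64;
    fn update(&mut self, input: u64) {
        self.value += input;
    }
}

/// One bound is enough: `Snapshotable` methods are reachable through
/// the `StateMachine` supertrait.
pub fn apply_and_snapshot<T: StateMachine>(stm: &mut T, input: T::Input) -> Vec<u8> {
    stm.update(input);
    stm.snapshot_bytes()
}
```

The trade-off is that every `StateMachine` implementor, including test doubles, must now also implement `Snapshotable`.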
##########
core/metadata/src/stm/mux.rs:
##########
@@ -88,7 +89,57 @@ where
}
}
+/// Recursive case for variadic tuple pattern: (Head, Tail)
Review Comment:
Hmm, I am not sure if this is the simplest way to go about it.
I imagine the flow of abstraction to be identical to that of the
`StateMachine` trait: we would have a `Snapshotable` trait implemented for
everything that implements the `StateMachine` trait. Since the `StateMachine`
trait is implemented both for `MuxStateMachine` and for the `variadic!()` tuple
of state machines, the API would be called this way:
```rs
fn snapshot_metadata(&self, /* ... */) {
    // `MetadataSnapshot` would impl the `Snapshot` trait.
    let mut snapshot = MetadataSnapshot::default();
    // Here is the magic: the `fill_snapshot` impl for `MuxStateMachine` would
    // proxy to the `variadic!()` tuple impl:
    //
    // impl Snapshotable for variadic!(St, ... Rest)
    // where St: StateMachine + FillSnapshot,
    //       Rest: Snapshotable,
    // {
    //     fn fill_snapshot<S: Snapshot>(&self, snapshot: &mut S) {
    //         self.0.do_fill_snapshot(snapshot);
    //         self.1.fill_snapshot(snapshot);
    //     }
    //     fn restore_snapshot(...) {
    //         // ... similar code for restoring.
    //     }
    // }
    //
    // You can choose whatever name you like for the `FillSnapshot` trait,
    // something that covers filling/recovering.
    // You can have associated types on the `Snapshot` trait that represent the
    // currently filled snapshot (for example a binary blob) and the snapshot from
    // which we would restore state, and use those as arguments to the
    // `fill_snapshot` and `restore_snapshot` methods.
    self.mux.fill_snapshot(&mut snapshot);
}
```
The idea is to avoid using those `const SECTION_NAME: &'static str` constants and
instead rely on type checking to perform the walk for us.
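A minimal, self-contained sketch of this type-driven walk, under stated assumptions: `Snapshot`, `Snapshotable`, `FillSnapshot`, `MetadataSnapshot`, `Users`, `Streams`, and `Mux` are illustrative stand-ins, the variadic list is written as nested pairs ending in `()` (roughly what a `variadic!()` macro expands to), and the snapshot is a plain struct with one typed field per state machine instead of named, type-erased sections. A state machine in the list that cannot fill the snapshot becomes a compile error rather than a runtime `MissingSection`.

```rust
/// Marker for snapshot containers (hypothetical).
pub trait Snapshot: Default {}

/// Per-state-machine hook: copy state into, or rebuild it from, snapshot `S`.
pub trait FillSnapshot<S: Snapshot>: Sized {
    fn fill(&self, snapshot: &mut S);
    fn restore(snapshot: &S) -> Self;
}

/// Implemented for the whole variadic list; the compiler performs the walk.
pub trait Snapshotable<S: Snapshot>: Sized {
    fn fill_snapshot(&self, snapshot: &mut S);
    fn restore_snapshot(snapshot: &S) -> Self;
}

/// Base case: the empty tail contributes nothing.
impl<S: Snapshot> Snapshotable<S> for () {
    fn fill_snapshot(&self, _snapshot: &mut S) {}
    fn restore_snapshot(_snapshot: &S) -> Self {}
}

/// Recursive case: handle the head, then delegate to the tail.
impl<S, Head, Tail> Snapshotable<S> for (Head, Tail)
where
    S: Snapshot,
    Head: FillSnapshot<S>,
    Tail: Snapshotable<S>,
{
    fn fill_snapshot(&self, snapshot: &mut S) {
        self.0.fill(snapshot);
        self.1.fill_snapshot(snapshot);
    }
    fn restore_snapshot(snapshot: &S) -> Self {
        (Head::restore(snapshot), Tail::restore_snapshot(snapshot))
    }
}

/// Hypothetical snapshot with one typed slot per state machine.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct MetadataSnapshot {
    pub users: Vec<String>,
    pub streams: Vec<u32>,
}
impl Snapshot for MetadataSnapshot {}

#[derive(Debug, PartialEq)]
pub struct Users(pub Vec<String>);
#[derive(Debug, PartialEq)]
pub struct Streams(pub Vec<u32>);

impl FillSnapshot<MetadataSnapshot> for Users {
    fn fill(&self, s: &mut MetadataSnapshot) { s.users = self.0.clone(); }
    fn restore(s: &MetadataSnapshot) -> Self { Users(s.users.clone()) }
}

impl FillSnapshot<MetadataSnapshot> for Streams {
    fn fill(&self, s: &mut MetadataSnapshot) { s.streams = self.0.clone(); }
    fn restore(s: &MetadataSnapshot) -> Self { Streams(s.streams.clone()) }
}

/// Stand-in for `MuxStateMachine`: it simply proxies to the inner list.
pub struct Mux<T> {
    pub inner: T,
}

impl<S: Snapshot, T: Snapshotable<S>> Snapshotable<S> for Mux<T> {
    fn fill_snapshot(&self, s: &mut S) { self.inner.fill_snapshot(s); }
    fn restore_snapshot(s: &S) -> Self { Mux { inner: T::restore_snapshot(s) } }
}
```

Usage mirrors the review: `mux.fill_snapshot(&mut snapshot)` on a `Mux { inner: (Users(..), (Streams(..), ())) }` fills both slots, and `Snapshotable::restore_snapshot(&snapshot)` rebuilds the whole tuple.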
##########
core/metadata/src/stm/snapshot.rs:
##########
@@ -0,0 +1,336 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use serde::{Deserialize, Serialize, de::DeserializeOwned};
+use std::fmt;
+
+#[derive(Debug)]
+pub enum SnapshotError {
+ /// A required section is missing from the snapshot.
+ MissingSection(&'static str),
+ /// Serialization failed.
+ Serialize(rmp_serde::encode::Error),
+ /// Deserialization failed.
+ Deserialize(rmp_serde::decode::Error),
+ /// Slab ID mismatch during snapshot restore.
+ SlabIdMismatch {
+ section: &'static str,
+ expected: usize,
+ actual: usize,
+ },
+ /// Duplicate section name detected in snapshot.
+ DuplicateSection(String),
+}
+
+impl fmt::Display for SnapshotError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ SnapshotError::MissingSection(name) => {
+ write!(f, "missing snapshot section: {}", name)
+ }
+ SnapshotError::Serialize(e) => write!(f, "snapshot serialization failed: {}", e),
+ SnapshotError::Deserialize(e) => write!(f, "snapshot deserialization failed: {}", e),
+ SnapshotError::SlabIdMismatch {
+ section,
+ expected,
+ actual,
+ } => {
+ write!(
+ f,
+ "slab ID mismatch in section '{}': expected {}, got {}",
+ section, expected, actual
+ )
+ }
+ SnapshotError::DuplicateSection(name) => {
+ write!(f, "duplicate snapshot section name: {}", name)
+ }
+ }
+ }
+}
+
+impl std::error::Error for SnapshotError {
+ fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
+ match self {
+ SnapshotError::Serialize(e) => Some(e),
+ SnapshotError::Deserialize(e) => Some(e),
+ _ => None,
+ }
+ }
+}
+
+/// The outer envelope for a complete metadata snapshot.
+///
+/// Contains metadata about the snapshot and a collection of sections,
+/// where each section corresponds to one state machine's serialized state.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct SnapshotEnvelope {
Review Comment:
Storing type-erased bytes inside the snapshot is a valid approach, but an
alternative (the one Redpanda chose) is for the snapshot to store an
intermediate representation of all the `Snapshotable` states and serialize
that. So rather than serializing individual states, they serialize the
snapshot as a whole.
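A minimal sketch of the intermediate-representation approach, assuming two hypothetical live states (`UsersState`, `StreamsState`) and a hypothetical `MetadataRepr`; it is not Redpanda's code. Each state is copied into one plain, owned struct, and only that struct would then be serialized (for example by deriving `Serialize`/`Deserialize` and encoding it once with `rmp_serde`). Serialization is left out so the sketch has no crate dependencies.

```rust
use std::collections::BTreeMap;

/// Hypothetical live states (in the real code these sit behind `LeftRight`).
pub struct UsersState {
    pub by_id: BTreeMap<u32, String>,
}
pub struct StreamsState {
    pub names: Vec<String>,
}

/// Intermediate representation: a plain, owned copy of every state.
/// In practice this single struct would derive `Serialize`/`Deserialize`.
#[derive(Debug, Clone, PartialEq)]
pub struct MetadataRepr {
    pub users: Vec<(u32, String)>,
    pub streams: Vec<String>,
}

impl MetadataRepr {
    /// Copies data out of the live states; this copy is the extra
    /// allocation cost of the approach.
    pub fn capture(users: &UsersState, streams: &StreamsState) -> Self {
        Self {
            users: users.by_id.iter().map(|(id, name)| (*id, name.clone())).collect(),
            streams: streams.names.clone(),
        }
    }

    /// Rebuilds the live states from the representation.
    pub fn restore(self) -> (UsersState, StreamsState) {
        (
            UsersState { by_id: self.users.into_iter().collect() },
            StreamsState { names: self.streams },
        )
    }
}
```

One upside of this shape is that the on-disk format is defined by `MetadataRepr` alone, so internal state types can change without breaking old snapshots.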
##########
core/metadata/src/stm/consumer_group.rs:
##########
@@ -225,3 +227,150 @@ impl Handler for ConsumerGroupsInner {
}
}
}
+
+/// Consumer group member snapshot representation for serialization.
+#[derive(Debug, Clone, Serialize, Deserialize)]
Review Comment:
I don't think we need those intermediate structs; we can have the `State`
structs implement `Serialize` and `Deserialize` directly.
##########
core/metadata/src/stm/consumer_group.rs:
##########
@@ -225,3 +227,150 @@ impl Handler for ConsumerGroupsInner {
}
}
}
+
+/// Consumer group member snapshot representation for serialization.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ConsumerGroupMemberSnapshot {
+ pub id: usize,
+ pub client_id: u32,
+ pub partitions: Vec<usize>,
+ pub partition_index: usize,
+}
+
+/// Consumer group snapshot representation for serialization.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ConsumerGroupSnapshot {
+ pub id: usize,
+ pub name: String,
+ pub partitions: Vec<usize>,
+ pub members: Vec<(usize, ConsumerGroupMemberSnapshot)>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ConsumerGroupsSnapshot {
+ pub items: Vec<(usize, ConsumerGroupSnapshot)>,
+ pub topic_index: Vec<((usize, usize), Vec<usize>)>,
+ pub topic_name_index: Vec<((String, String), Vec<usize>)>,
+}
+
+impl Snapshotable for ConsumerGroupsInner {
Review Comment:
Implement these for `ConsumerGroup`, `Streams`, and `Users` rather than for
their inner types. You can create a method on `LeftRight` that exposes the
read handle.
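A minimal sketch of that shape, assuming a hypothetical `LeftRight<T>` stand-in whose new `read()` accessor returns a reference to the read side (a real left-right structure would more likely hand out a read guard), a simplified `Snapshotable` trait, and an outer type named `ConsumerGroups` wrapping `ConsumerGroupsInner`. The point is that the impl sits on the outer type and reaches the inner data through the read handle.

```rust
/// Hypothetical stand-in for the crate's `LeftRight` wrapper; only the
/// read side matters for snapshotting.
pub struct LeftRight<T> {
    read_side: T,
}

impl<T> LeftRight<T> {
    pub fn new(value: T) -> Self {
        Self { read_side: value }
    }

    /// Suggested accessor exposing the read handle (name is hypothetical).
    pub fn read(&self) -> &T {
        &self.read_side
    }
}

/// Simplified snapshot trait used only for this sketch.
pub trait Snapshotable {
    fn snapshot(&self) -> Vec<(usize, String)>;
}

/// Inner data (fields are illustrative).
pub struct ConsumerGroupsInner {
    pub groups: Vec<(usize, String)>,
}

/// Outer state machine type (name assumed).
pub struct ConsumerGroups {
    pub inner: LeftRight<ConsumerGroupsInner>,
}

/// Implemented on the outer type; inner data is reached via `read()`.
impl Snapshotable for ConsumerGroups {
    fn snapshot(&self) -> Vec<(usize, String)> {
        self.inner.read().groups.clone()
    }
}
```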
##########
core/metadata/src/stm/snapshot.rs:
##########
@@ -0,0 +1,336 @@
+pub struct SnapshotEnvelope {
Review Comment:
I haven't thought about it much and I'm not sure what the trade-off space
between those two approaches is (I guess their approach needs more
allocations, since you have to copy the data first and then serialize it),
whereas with our approach we just serialize the state and append it to the
snapshot envelope.
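A minimal sketch of the "serialize and append" side of this trade-off, assuming a hypothetical `EncodeInto` trait and a simple length-prefixed layout in place of msgpack. Each state encodes straight into the envelope's shared buffer, so no owned copy of the state is built first.

```rust
/// Hypothetical: a state writes its own encoding into a caller-provided buffer.
pub trait EncodeInto {
    fn encode_into(&self, buf: &mut Vec<u8>);
}

/// Envelope holding one length-prefixed section per state in a single buffer.
#[derive(Debug, Default)]
pub struct Envelope {
    pub buf: Vec<u8>,
}

impl Envelope {
    /// Appends `state` as `[u32 little-endian length][payload]`.
    pub fn append<T: EncodeInto>(&mut self, state: &T) {
        let len_pos = self.buf.len();
        self.buf.extend_from_slice(&0u32.to_le_bytes()); // placeholder length
        let start = self.buf.len();
        state.encode_into(&mut self.buf);
        let len = (self.buf.len() - start) as u32;
        // Patch the placeholder with the real payload length.
        self.buf[len_pos..start].copy_from_slice(&len.to_le_bytes());
    }

    /// Returns the raw section payloads in insertion order.
    /// Assumes a well-formed buffer; malformed input would panic.
    pub fn sections(&self) -> Vec<&[u8]> {
        let mut out = Vec::new();
        let mut pos = 0;
        while pos + 4 <= self.buf.len() {
            let mut len_bytes = [0u8; 4];
            len_bytes.copy_from_slice(&self.buf[pos..pos + 4]);
            let len = u32::from_le_bytes(len_bytes) as usize;
            pos += 4;
            out.push(&self.buf[pos..pos + len]);
            pos += len;
        }
        out
    }
}

/// Toy state: stream names encoded as newline-terminated UTF-8.
pub struct Streams(pub Vec<String>);

impl EncodeInto for Streams {
    fn encode_into(&self, buf: &mut Vec<u8>) {
        for name in &self.0 {
            buf.extend_from_slice(name.as_bytes());
            buf.push(b'\n');
        }
    }
}
```

The cost moves to restore time: each section is opaque bytes, so the reader must know which state owns which section.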
##########
core/metadata/src/impls/metadata.rs:
##########
@@ -23,6 +27,114 @@ use journal::{Journal, JournalHandle};
+ fn create<T>(mux: &MuxStateMachine<T>, commit_number: u64) -> Result<Self, Self::Error>
Review Comment:
We implement `StateMachine` for `MuxStateMachine`, therefore I think we could
accept `stm: &mut T` as the first argument.
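A minimal sketch of the suggested signature, assuming simplified stand-ins for `StateMachine`, `MuxStateMachine`, and the snapshot type (`collect_entries`, `Users`, `MetadataSnapshot`, and `sequence_number` are hypothetical). Since `MuxStateMachine<T>` itself implements `StateMachine`, `create` can be generic over any `T: StateMachine` instead of naming `MuxStateMachine<T>`. The sketch keeps the reviewer's `&mut T`; a shared `&T` would also compile here, because creating the snapshot only reads state.

```rust
/// Simplified `StateMachine`; `collect_entries` stands in for whatever
/// snapshot hook the real trait would expose.
pub trait StateMachine {
    fn collect_entries(&self, out: &mut Vec<String>);
}

/// Simplified mux: itself a `StateMachine` that forwards to its inner list.
pub struct MuxStateMachine<T> {
    pub inner: T,
}

impl<T: StateMachine> StateMachine for MuxStateMachine<T> {
    fn collect_entries(&self, out: &mut Vec<String>) {
        self.inner.collect_entries(out);
    }
}

/// Leaf state machine used for the example.
pub struct Users(pub Vec<String>);

impl StateMachine for Users {
    fn collect_entries(&self, out: &mut Vec<String>) {
        out.extend(self.0.iter().cloned());
    }
}

/// Hypothetical snapshot type.
#[derive(Debug, PartialEq)]
pub struct MetadataSnapshot {
    pub sequence_number: u64,
    pub entries: Vec<String>,
}

pub trait Snapshot: Sized {
    /// Generic over any state machine, not just `MuxStateMachine<T>`.
    fn create<T: StateMachine>(stm: &mut T, sequence_number: u64) -> Self;
}

impl Snapshot for MetadataSnapshot {
    fn create<T: StateMachine>(stm: &mut T, sequence_number: u64) -> Self {
        let mut entries = Vec::new();
        stm.collect_entries(&mut entries);
        Self { sequence_number, entries }
    }
}
```

The same `create` then works on the mux, on a single leaf state machine, or on a tuple of them, without a separate code path for each.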
--
This is an automated message from the Apache Git Service.