This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new ed4415458 feat(metadata): impl base mux_state_machine (#2474)
ed4415458 is described below

commit ed4415458649ccdbfdbc3304e97f210a40a77d6b
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Fri Dec 12 10:30:26 2025 +0100

    feat(metadata): impl base mux_state_machine (#2474)
    
    Base for MuxStateMachine
---
 .../metadata/src/stm/{mod.rs => consumer_group.rs} | 14 +++-
 core/metadata/src/stm/mod.rs                       | 21 +++++-
 core/metadata/src/stm/mux.rs                       | 84 +++++++++++++++++++++-
 core/metadata/src/stm/stream.rs                    | 12 ++++
 core/metadata/src/stm/{mod.rs => user.rs}          | 14 +++-
 5 files changed, 135 insertions(+), 10 deletions(-)

diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/consumer_group.rs
similarity index 81%
copy from core/metadata/src/stm/mod.rs
copy to core/metadata/src/stm/consumer_group.rs
index 853a1c43e..4aed76aed 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/consumer_group.rs
@@ -15,6 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod mux;
-mod stream;
-// TODO: Add more state machines.
+use crate::stm::State;
+
+pub struct ConsumerGroups {}
+
+impl State for ConsumerGroups {
+    type Output = ();
+
+    fn apply(&self) -> Option<Self::Output> {
+        Some(())
+    }
+}
diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index 853a1c43e..80646a559 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -15,6 +15,23 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod mux;
-mod stream;
+pub mod consumer_group;
+pub mod stream;
+pub mod user;
+
+pub mod mux;
 // TODO: Add more state machines.
+
+pub trait State {
+    type Output;
+    // Apply the state machine logic and return an optional output.
+    // The output is optional, as we model the `StateMachine` as a variadic list,
+    // where not all state machines will produce an output for every input event.
+    fn apply(&self) -> Option<Self::Output>;
+}
+
+pub trait StateMachine {
+    type Input;
+    type Output;
+    fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>);
+}
diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs
index 7907c8db8..22601bdc0 100644
--- a/core/metadata/src/stm/mux.rs
+++ b/core/metadata/src/stm/mux.rs
@@ -15,5 +15,85 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#[expect(unused)]
-struct MuxStateMachine {}
+use crate::stm::{State, StateMachine};
+use iggy_common::{header::PrepareHeader, message::Message};
+
+// MuxStateMachine that proxies to a variadic tuple of state machines
+pub struct MuxStateMachine<T>
+where
+    T: StateMachine,
+{
+    inner: T,
+}
+
+impl<T> MuxStateMachine<T>
+where
+    T: StateMachine,
+{
+    pub fn new(inner: T) -> Self {
+        Self { inner }
+    }
+}
+
+impl<T> StateMachine for MuxStateMachine<T>
+where
+    T: StateMachine,
+{
+    type Input = T::Input;
+    type Output = T::Output;
+
+    fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>) {
+        self.inner.update(input, output);
+    }
+}
+
+//TODO: Move to common
+#[macro_export]
+macro_rules! variadic {
+    () => ( () );
+    (...$a:ident  $(,)? ) => ( $a );
+    (...$a:expr  $(,)? ) => ( $a );
+    ($a:ident  $(,)? ) => ( ($a, ()) );
+    ($a:expr  $(,)? ) => ( ($a, ()) );
+    ($a:ident,  $( $b:tt )+) => ( ($a, variadic!( $( $b )* )) );
+    ($a:expr,  $( $b:tt )+) => ( ($a, variadic!( $( $b )* )) );
+}
+
+// Base case of the recursive resolution.
+impl StateMachine for () {
+    type Input = Message<PrepareHeader>;
+    // TODO: Make sure that the `Output` matches the output type of the rest of the list.
+    type Output = ();
+
+    fn update(&self, _input: &Self::Input, _output: &mut Vec<Self::Output>) {}
+}
+
+// Recursive case: process head and recurse on tail
+impl<O, S, Rest> StateMachine for variadic!(S, ...Rest)
+where
+    S: State<Output = O>,
+    Rest: StateMachine<Input = Message<PrepareHeader>, Output = O>,
+{
+    type Input = Message<PrepareHeader>;
+    type Output = O;
+
+    fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>) {
+        if let Some(result) = self.0.apply() {
+            output.push(result);
+        }
+        self.1.update(input, output)
+    }
+}
+
+mod tests {
+
+    #[test]
+    fn construct_mux_state_machine_from_states_with_same_output() {
+        use crate::stm::*;
+
+        let users = user::Users {};
+        let streams = stream::Streams {};
+        let cgs = consumer_group::ConsumerGroups {};
+        let _mux = mux::MuxStateMachine::new(variadic!(users, streams, cgs));
+    }
+}
diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs
index b248758bc..737e21e94 100644
--- a/core/metadata/src/stm/stream.rs
+++ b/core/metadata/src/stm/stream.rs
@@ -14,3 +14,15 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+
+use crate::stm::State;
+
+pub struct Streams {}
+
+impl State for Streams {
+    type Output = ();
+
+    fn apply(&self) -> Option<Self::Output> {
+        Some(())
+    }
+}
diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/user.rs
similarity index 83%
copy from core/metadata/src/stm/mod.rs
copy to core/metadata/src/stm/user.rs
index 853a1c43e..d5f0fac96 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/user.rs
@@ -15,6 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod mux;
-mod stream;
-// TODO: Add more state machines.
+use crate::stm::State;
+
+pub struct Users {}
+
+impl State for Users {
+    type Output = ();
+
+    fn apply(&self) -> Option<Self::Output> {
+        Some(())
+    }
+}

Reply via email to