This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new aff29a544 feat(cluster): create a unified abstraction for subsystems 
(#2780)
aff29a544 is described below

commit aff29a544372fe5487598339830dafecb39aa14e
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Fri Feb 20 17:53:25 2026 +0100

    feat(cluster): create a unified abstraction for subsystems (#2780)
    
    Adds a `Plane` trait that is used to dispatch requests to different
    subsystems based on the plane demux logic (similarly to how
    `StateMachine` works).
---
 core/common/src/lib.rs                     |   1 +
 core/common/src/macros.rs                  |  27 ++++
 core/common/src/types/consensus/header.rs  | 199 ++++++++++++++++++++++++++++-
 core/common/src/types/consensus/message.rs |  37 ++++--
 core/consensus/src/lib.rs                  |  30 +++--
 core/consensus/src/plane_mux.rs            | 120 +++++++++++++++++
 core/metadata/src/impls/metadata.rs        |  57 ++++++++-
 core/metadata/src/stm/mux.rs               |  13 +-
 core/partitions/src/iggy_partitions.rs     |  27 +++-
 core/simulator/src/deps.rs                 |   8 +-
 core/simulator/src/lib.rs                  |  58 +++------
 core/simulator/src/replica.rs              |  29 ++---
 12 files changed, 508 insertions(+), 98 deletions(-)

diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index a87c07b5c..6e8b4dcf7 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -22,6 +22,7 @@ mod certificates;
 mod commands;
 mod deduplication;
 mod error;
+mod macros;
 mod sender;
 pub mod sharding;
 mod traits;
diff --git a/core/common/src/macros.rs b/core/common/src/macros.rs
new file mode 100644
index 000000000..c7c1ce3f6
--- /dev/null
+++ b/core/common/src/macros.rs
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[macro_export]
+macro_rules! variadic {
+    () => (());
+    (...$a:ident $(,)?) => ($a);
+    (...$a:expr $(,)?) => ($a);
+    ($a:ident $(,)?) => (($a, ()));
+    ($a:expr $(,)?) => (($a, ()));
+    ($a:ident, $($b:tt)+) => (($a, $crate::variadic!($($b)+)));
+    ($a:expr, $($b:tt)+) => (($a, $crate::variadic!($($b)+)));
+}
diff --git a/core/common/src/types/consensus/header.rs 
b/core/common/src/types/consensus/header.rs
index 2991f89c3..2f1a83d62 100644
--- a/core/common/src/types/consensus/header.rs
+++ b/core/common/src/types/consensus/header.rs
@@ -21,11 +21,10 @@ use thiserror::Error;
 const HEADER_SIZE: usize = 256;
 pub trait ConsensusHeader: Sized + Pod + Zeroable {
     const COMMAND: Command2;
-    // TODO: Trait consts are never evaluated unless explicitly accessed (e.g. 
`<T as ConsensusHeader>::_SIZE_CHECK`).
-    // The size invariant is enforced by repr(C) layout + bytemuck Pod derive; 
consider adding a static_assert in each impl.
-    const _SIZE_CHECK: () = assert!(std::mem::size_of::<Self>() == 
HEADER_SIZE);
 
     fn validate(&self) -> Result<(), ConsensusError>;
+    fn operation(&self) -> Operation;
+    fn command(&self) -> Command2;
     fn size(&self) -> u32;
 }
 
@@ -139,6 +138,20 @@ pub struct GenericHeader {
 
     pub reserved_command: [u8; 128],
 }
+const _: () = {
+    assert!(core::mem::size_of::<GenericHeader>() == HEADER_SIZE);
+    // Ensure no implicit padding is inserted between reserved_frame and the 
body fields.
+    assert!(
+        core::mem::offset_of!(GenericHeader, reserved_command)
+            == core::mem::offset_of!(GenericHeader, reserved_frame)
+                + core::mem::size_of::<[u8; 66]>()
+    );
+    // Ensure no implicit tail padding is inserted after the explicit trailing 
bytes.
+    assert!(
+        core::mem::offset_of!(GenericHeader, reserved_command) + 
core::mem::size_of::<[u8; 128]>()
+            == HEADER_SIZE
+    );
+};
 
 unsafe impl Pod for GenericHeader {}
 unsafe impl Zeroable for GenericHeader {}
@@ -146,6 +159,14 @@ unsafe impl Zeroable for GenericHeader {}
 impl ConsensusHeader for GenericHeader {
     const COMMAND: Command2 = Command2::Reserved;
 
+    fn operation(&self) -> Operation {
+        Operation::Default
+    }
+
+    fn command(&self) -> Command2 {
+        self.command
+    }
+
     fn validate(&self) -> Result<(), ConsensusError> {
         Ok(())
     }
@@ -177,6 +198,20 @@ pub struct RequestHeader {
     pub namespace: u64,
     pub reserved: [u8; 64],
 }
+const _: () = {
+    assert!(core::mem::size_of::<RequestHeader>() == HEADER_SIZE);
+    // Ensure no implicit padding is inserted between reserved_frame and the 
body fields.
+    assert!(
+        core::mem::offset_of!(RequestHeader, client)
+            == core::mem::offset_of!(RequestHeader, reserved_frame)
+                + core::mem::size_of::<[u8; 66]>()
+    );
+    // Ensure no implicit tail padding is inserted after the explicit trailing 
bytes.
+    assert!(
+        core::mem::offset_of!(RequestHeader, reserved) + 
core::mem::size_of::<[u8; 64]>()
+            == HEADER_SIZE
+    );
+};
 
 impl Default for RequestHeader {
     fn default() -> Self {
@@ -208,6 +243,10 @@ unsafe impl Zeroable for RequestHeader {}
 impl ConsensusHeader for RequestHeader {
     const COMMAND: Command2 = Command2::Request;
 
+    fn operation(&self) -> Operation {
+        self.operation
+    }
+
     fn validate(&self) -> Result<(), ConsensusError> {
         if self.command != Command2::Request {
             return Err(ConsensusError::InvalidCommand {
@@ -217,6 +256,9 @@ impl ConsensusHeader for RequestHeader {
         }
         Ok(())
     }
+    fn command(&self) -> Command2 {
+        self.command
+    }
 
     fn size(&self) -> u32 {
         self.size
@@ -249,6 +291,20 @@ pub struct PrepareHeader {
     pub namespace: u64,
     pub reserved: [u8; 32],
 }
+const _: () = {
+    assert!(core::mem::size_of::<PrepareHeader>() == HEADER_SIZE);
+    // Ensure no implicit padding is inserted between reserved_frame and the 
body fields.
+    assert!(
+        core::mem::offset_of!(PrepareHeader, client)
+            == core::mem::offset_of!(PrepareHeader, reserved_frame)
+                + core::mem::size_of::<[u8; 66]>()
+    );
+    // Ensure no implicit tail padding is inserted after the explicit trailing 
bytes.
+    assert!(
+        core::mem::offset_of!(PrepareHeader, reserved) + 
core::mem::size_of::<[u8; 32]>()
+            == HEADER_SIZE
+    );
+};
 
 unsafe impl Pod for PrepareHeader {}
 unsafe impl Zeroable for PrepareHeader {}
@@ -256,6 +312,10 @@ unsafe impl Zeroable for PrepareHeader {}
 impl ConsensusHeader for PrepareHeader {
     const COMMAND: Command2 = Command2::Prepare;
 
+    fn operation(&self) -> Operation {
+        self.operation
+    }
+
     fn validate(&self) -> Result<(), ConsensusError> {
         if self.command != Command2::Prepare {
             return Err(ConsensusError::InvalidCommand {
@@ -265,6 +325,9 @@ impl ConsensusHeader for PrepareHeader {
         }
         Ok(())
     }
+    fn command(&self) -> Command2 {
+        self.command
+    }
 
     fn size(&self) -> u32 {
         self.size
@@ -323,6 +386,20 @@ pub struct PrepareOkHeader {
     pub namespace: u64,
     pub reserved: [u8; 48],
 }
+const _: () = {
+    assert!(core::mem::size_of::<PrepareOkHeader>() == HEADER_SIZE);
+    // Ensure no implicit padding is inserted between reserved_frame and the 
body fields.
+    assert!(
+        core::mem::offset_of!(PrepareOkHeader, parent)
+            == core::mem::offset_of!(PrepareOkHeader, reserved_frame)
+                + core::mem::size_of::<[u8; 66]>()
+    );
+    // Ensure no implicit tail padding is inserted after the explicit trailing 
bytes.
+    assert!(
+        core::mem::offset_of!(PrepareOkHeader, reserved) + 
core::mem::size_of::<[u8; 48]>()
+            == HEADER_SIZE
+    );
+};
 
 unsafe impl Pod for PrepareOkHeader {}
 unsafe impl Zeroable for PrepareOkHeader {}
@@ -330,6 +407,13 @@ unsafe impl Zeroable for PrepareOkHeader {}
 impl ConsensusHeader for PrepareOkHeader {
     const COMMAND: Command2 = Command2::PrepareOk;
 
+    fn operation(&self) -> Operation {
+        self.operation
+    }
+    fn command(&self) -> Command2 {
+        self.command
+    }
+
     fn validate(&self) -> Result<(), ConsensusError> {
         if self.command != Command2::PrepareOk {
             return Err(ConsensusError::InvalidCommand {
@@ -391,6 +475,20 @@ pub struct CommitHeader {
     pub namespace: u64,
     pub reserved: [u8; 80],
 }
+const _: () = {
+    assert!(core::mem::size_of::<CommitHeader>() == HEADER_SIZE);
+    // Ensure no implicit padding is inserted between reserved_frame and the 
body fields.
+    assert!(
+        core::mem::offset_of!(CommitHeader, commit_checksum)
+            == core::mem::offset_of!(CommitHeader, reserved_frame)
+                + core::mem::size_of::<[u8; 66]>()
+    );
+    // Ensure no implicit tail padding is inserted after the explicit trailing 
bytes.
+    assert!(
+        core::mem::offset_of!(CommitHeader, reserved) + 
core::mem::size_of::<[u8; 80]>()
+            == HEADER_SIZE
+    );
+};
 
 unsafe impl Pod for CommitHeader {}
 unsafe impl Zeroable for CommitHeader {}
@@ -398,6 +496,13 @@ unsafe impl Zeroable for CommitHeader {}
 impl ConsensusHeader for CommitHeader {
     const COMMAND: Command2 = Command2::Commit;
 
+    fn operation(&self) -> Operation {
+        Operation::Default
+    }
+    fn command(&self) -> Command2 {
+        self.command
+    }
+
     fn validate(&self) -> Result<(), ConsensusError> {
         if self.command != Command2::Commit {
             return Err(ConsensusError::CommitInvalidCommand2);
@@ -435,8 +540,22 @@ pub struct ReplyHeader {
     pub operation: Operation,
     pub operation_padding: [u8; 7],
     pub namespace: u64,
-    pub reserved: [u8; 41],
+    pub reserved: [u8; 48],
 }
+const _: () = {
+    assert!(core::mem::size_of::<ReplyHeader>() == HEADER_SIZE);
+    // Ensure no implicit padding is inserted between reserved_frame and the 
body fields.
+    assert!(
+        core::mem::offset_of!(ReplyHeader, request_checksum)
+            == core::mem::offset_of!(ReplyHeader, reserved_frame)
+                + core::mem::size_of::<[u8; 66]>()
+    );
+    // Ensure no implicit tail padding is inserted after the explicit trailing 
bytes.
+    assert!(
+        core::mem::offset_of!(ReplyHeader, reserved) + 
core::mem::size_of::<[u8; 48]>()
+            == HEADER_SIZE
+    );
+};
 
 unsafe impl Pod for ReplyHeader {}
 unsafe impl Zeroable for ReplyHeader {}
@@ -444,6 +563,13 @@ unsafe impl Zeroable for ReplyHeader {}
 impl ConsensusHeader for ReplyHeader {
     const COMMAND: Command2 = Command2::Reply;
 
+    fn operation(&self) -> Operation {
+        self.operation
+    }
+    fn command(&self) -> Command2 {
+        self.command
+    }
+
     fn validate(&self) -> Result<(), ConsensusError> {
         if self.command != Command2::Reply {
             return Err(ConsensusError::ReplyInvalidCommand2);
@@ -477,7 +603,7 @@ impl Default for ReplyHeader {
             operation: Default::default(),
             operation_padding: [0; 7],
             namespace: 0,
-            reserved: [0; 41],
+            reserved: [0; 48],
         }
     }
 }
@@ -502,6 +628,20 @@ pub struct StartViewChangeHeader {
     pub namespace: u64,
     pub reserved: [u8; 120],
 }
+const _: () = {
+    assert!(core::mem::size_of::<StartViewChangeHeader>() == HEADER_SIZE);
+    // Ensure no implicit padding is inserted between reserved_frame and the 
body fields.
+    assert!(
+        core::mem::offset_of!(StartViewChangeHeader, namespace)
+            == core::mem::offset_of!(StartViewChangeHeader, reserved_frame)
+                + core::mem::size_of::<[u8; 66]>()
+    );
+    // Ensure no implicit tail padding is inserted after the explicit trailing 
bytes.
+    assert!(
+        core::mem::offset_of!(StartViewChangeHeader, reserved) + 
core::mem::size_of::<[u8; 120]>()
+            == HEADER_SIZE
+    );
+};
 
 unsafe impl Pod for StartViewChangeHeader {}
 unsafe impl Zeroable for StartViewChangeHeader {}
@@ -509,6 +649,13 @@ unsafe impl Zeroable for StartViewChangeHeader {}
 impl ConsensusHeader for StartViewChangeHeader {
     const COMMAND: Command2 = Command2::StartViewChange;
 
+    fn operation(&self) -> Operation {
+        Operation::Default
+    }
+    fn command(&self) -> Command2 {
+        self.command
+    }
+
     fn validate(&self) -> Result<(), ConsensusError> {
         if self.command != Command2::StartViewChange {
             return Err(ConsensusError::InvalidCommand {
@@ -558,6 +705,20 @@ pub struct DoViewChangeHeader {
     pub log_view: u32,
     pub reserved: [u8; 100],
 }
+const _: () = {
+    assert!(core::mem::size_of::<DoViewChangeHeader>() == HEADER_SIZE);
+    // Ensure no implicit padding is inserted between reserved_frame and the 
body fields.
+    assert!(
+        core::mem::offset_of!(DoViewChangeHeader, op)
+            == core::mem::offset_of!(DoViewChangeHeader, reserved_frame)
+                + core::mem::size_of::<[u8; 66]>()
+    );
+    // Ensure no implicit tail padding is inserted after the explicit trailing 
bytes.
+    assert!(
+        core::mem::offset_of!(DoViewChangeHeader, reserved) + 
core::mem::size_of::<[u8; 100]>()
+            == HEADER_SIZE
+    );
+};
 
 unsafe impl Pod for DoViewChangeHeader {}
 unsafe impl Zeroable for DoViewChangeHeader {}
@@ -565,6 +726,13 @@ unsafe impl Zeroable for DoViewChangeHeader {}
 impl ConsensusHeader for DoViewChangeHeader {
     const COMMAND: Command2 = Command2::DoViewChange;
 
+    fn operation(&self) -> Operation {
+        Operation::Default
+    }
+    fn command(&self) -> Command2 {
+        self.command
+    }
+
     fn validate(&self) -> Result<(), ConsensusError> {
         if self.command != Command2::DoViewChange {
             return Err(ConsensusError::InvalidCommand {
@@ -627,6 +795,20 @@ pub struct StartViewHeader {
     pub namespace: u64,
     pub reserved: [u8; 104],
 }
+const _: () = {
+    assert!(core::mem::size_of::<StartViewHeader>() == HEADER_SIZE);
+    // Ensure no implicit padding is inserted between reserved_frame and the 
body fields.
+    assert!(
+        core::mem::offset_of!(StartViewHeader, op)
+            == core::mem::offset_of!(StartViewHeader, reserved_frame)
+                + core::mem::size_of::<[u8; 66]>()
+    );
+    // Ensure no implicit tail padding is inserted after the explicit trailing 
bytes.
+    assert!(
+        core::mem::offset_of!(StartViewHeader, reserved) + 
core::mem::size_of::<[u8; 104]>()
+            == HEADER_SIZE
+    );
+};
 
 unsafe impl Pod for StartViewHeader {}
 unsafe impl Zeroable for StartViewHeader {}
@@ -634,6 +816,13 @@ unsafe impl Zeroable for StartViewHeader {}
 impl ConsensusHeader for StartViewHeader {
     const COMMAND: Command2 = Command2::StartView;
 
+    fn operation(&self) -> Operation {
+        Operation::Default
+    }
+    fn command(&self) -> Command2 {
+        self.command
+    }
+
     fn validate(&self) -> Result<(), ConsensusError> {
         if self.command != Command2::StartView {
             return Err(ConsensusError::InvalidCommand {
diff --git a/core/common/src/types/consensus/message.rs 
b/core/common/src/types/consensus/message.rs
index 5fe7e0843..7ef39cdca 100644
--- a/core/common/src/types/consensus/message.rs
+++ b/core/common/src/types/consensus/message.rs
@@ -22,6 +22,25 @@ use crate::{
 use bytes::Bytes;
 use std::marker::PhantomData;
 
+// TODO: Rename this to Message and ConsensusHeader to Header.
+pub trait ConsensusMessage<H>
+where
+    H: ConsensusHeader,
+{
+    // TODO: fn body(&self) -> Something;
+    fn header(&self) -> &H;
+}
+
+impl<H> ConsensusMessage<H> for Message<H>
+where
+    H: ConsensusHeader,
+{
+    fn header(&self) -> &H {
+        let header_bytes = &self.buffer[..size_of::<H>()];
+        bytemuck::from_bytes(header_bytes)
+    }
+}
+
 #[derive(Debug, Clone)]
 pub struct Message<H: ConsensusHeader> {
     buffer: Bytes,
@@ -32,6 +51,13 @@ impl<H> Message<H>
 where
     H: ConsensusHeader,
 {
+    #[inline]
+    #[allow(unused)]
+    pub fn header(&self) -> &H {
+        let header_bytes = &self.buffer[..size_of::<H>()];
+        bytemuck::from_bytes(header_bytes)
+    }
+
     /// Create a new message from a buffer.
     ///
     /// # Safety
@@ -115,17 +141,6 @@ where
         }
     }
 
-    /// Get a reference to the header using zero-copy access.
-    ///
-    /// This uses `bytemuck::from_bytes` to cast the buffer to the header type
-    /// without any copying or allocation.
-    #[inline]
-    #[allow(unused)]
-    pub fn header(&self) -> &H {
-        let header_bytes = &self.buffer[..size_of::<H>()];
-        bytemuck::from_bytes(header_bytes)
-    }
-
     /// Get a reference to the message body (everything after the header).
     ///
     /// Returns an empty slice if there is no body.
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index 0ee87eab6..b1f7460e2 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use iggy_common::header::ConsensusHeader;
+use iggy_common::message::ConsensusMessage;
 use message_bus::MessageBus;
 
 pub trait Project<T, C: Consensus> {
@@ -48,11 +49,14 @@ pub trait Pipeline {
     fn verify(&self);
 }
 
-// TODO: Create type aliases for the Message types, both here and on the 
`Plane` trait.
+pub type RequestMessage<C> = <C as Consensus>::Message<<C as 
Consensus>::RequestHeader>;
+pub type ReplicateMessage<C> = <C as Consensus>::Message<<C as 
Consensus>::ReplicateHeader>;
+pub type AckMessage<C> = <C as Consensus>::Message<<C as 
Consensus>::AckHeader>;
+
 pub trait Consensus: Sized {
     type MessageBus: MessageBus;
     #[rustfmt::skip] // Scuffed formatter.
-    type Message<H> where H: ConsensusHeader;
+    type Message<H>: ConsensusMessage<H> where H: ConsensusHeader;
 
     type RequestHeader: ConsensusHeader;
     type ReplicateHeader: ConsensusHeader;
@@ -79,20 +83,30 @@ pub trait Plane<C>
 where
     C: Consensus,
 {
-    fn on_request(&self, message: C::Message<C::RequestHeader>) -> impl 
Future<Output = ()>
+    fn on_request(&self, message: RequestMessage<C>) -> impl Future<Output = 
()>
     where
-        C::Message<C::RequestHeader>:
-            Project<C::Message<C::ReplicateHeader>, C, Consensus = C> + Clone;
+        RequestMessage<C>: Project<ReplicateMessage<C>, C, Consensus = C> + 
Clone;
 
-    fn on_replicate(&self, message: C::Message<C::ReplicateHeader>) -> impl 
Future<Output = ()>
+    fn on_replicate(&self, message: ReplicateMessage<C>) -> impl Future<Output 
= ()>
     where
-        C::Message<C::ReplicateHeader>: Project<C::Message<C::AckHeader>, C, 
Consensus = C> + Clone;
+        ReplicateMessage<C>: Project<AckMessage<C>, C, Consensus = C> + Clone;
+
+    fn on_ack(&self, message: AckMessage<C>) -> impl Future<Output = ()>;
+}
 
-    fn on_ack(&self, message: C::Message<C::AckHeader>) -> impl Future<Output 
= ()>;
+pub trait PlaneIdentity<C>
+where
+    C: Consensus,
+{
+    fn is_applicable<H>(&self, message: &C::Message<H>) -> bool
+    where
+        H: ConsensusHeader;
 }
 
 mod impls;
 pub use impls::*;
+mod plane_mux;
+pub use plane_mux::*;
 mod namespaced_pipeline;
 pub use namespaced_pipeline::*;
 mod plane_helpers;
diff --git a/core/consensus/src/plane_mux.rs b/core/consensus/src/plane_mux.rs
new file mode 100644
index 000000000..1f32cb810
--- /dev/null
+++ b/core/consensus/src/plane_mux.rs
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{
+    AckMessage, Consensus, Plane, PlaneIdentity, Project, ReplicateMessage, 
RequestMessage,
+};
+use iggy_common::variadic;
+
+#[derive(Debug)]
+pub struct MuxPlane<T> {
+    inner: T,
+}
+
+impl<T> MuxPlane<T> {
+    pub fn new(inner: T) -> Self {
+        Self { inner }
+    }
+
+    pub fn inner(&self) -> &T {
+        &self.inner
+    }
+
+    pub fn inner_mut(&mut self) -> &mut T {
+        &mut self.inner
+    }
+}
+
+impl<C, T> Plane<C> for MuxPlane<T>
+where
+    C: Consensus,
+    T: Plane<C>,
+{
+    async fn on_request(&self, message: RequestMessage<C>)
+    where
+        RequestMessage<C>: Project<ReplicateMessage<C>, C, Consensus = C> + 
Clone,
+    {
+        self.inner.on_request(message).await;
+    }
+
+    async fn on_replicate(&self, message: ReplicateMessage<C>)
+    where
+        ReplicateMessage<C>: Project<AckMessage<C>, C, Consensus = C> + Clone,
+    {
+        self.inner.on_replicate(message).await;
+    }
+
+    async fn on_ack(&self, message: AckMessage<C>) {
+        self.inner.on_ack(message).await;
+    }
+}
+
+impl<C> Plane<C> for ()
+where
+    C: Consensus,
+{
+    async fn on_request(&self, _message: RequestMessage<C>)
+    where
+        RequestMessage<C>: Project<ReplicateMessage<C>, C, Consensus = C> + 
Clone,
+    {
+    }
+
+    async fn on_replicate(&self, _message: ReplicateMessage<C>)
+    where
+        ReplicateMessage<C>: Project<AckMessage<C>, C, Consensus = C> + Clone,
+    {
+    }
+
+    async fn on_ack(&self, _message: AckMessage<C>) {}
+}
+
+impl<C, Head, Tail> Plane<C> for variadic!(Head, ...Tail)
+where
+    C: Consensus,
+    Head: Plane<C> + PlaneIdentity<C>,
+    Tail: Plane<C>,
+{
+    async fn on_request(&self, message: RequestMessage<C>)
+    where
+        RequestMessage<C>: Project<ReplicateMessage<C>, C, Consensus = C> + 
Clone,
+    {
+        if self.0.is_applicable(&message) {
+            self.0.on_request(message).await;
+        } else {
+            self.1.on_request(message).await;
+        }
+    }
+
+    async fn on_replicate(&self, message: ReplicateMessage<C>)
+    where
+        ReplicateMessage<C>: Project<AckMessage<C>, C, Consensus = C> + Clone,
+    {
+        if self.0.is_applicable(&message) {
+            self.0.on_replicate(message).await;
+        } else {
+            self.1.on_replicate(message).await;
+        }
+    }
+
+    async fn on_ack(&self, message: AckMessage<C>) {
+        if self.0.is_applicable(&message) {
+            self.0.on_ack(message).await;
+        } else {
+            self.1.on_ack(message).await;
+        }
+    }
+}
diff --git a/core/metadata/src/impls/metadata.rs 
b/core/metadata/src/impls/metadata.rs
index 35386b182..64f9fe959 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -17,13 +17,17 @@
 use crate::stm::StateMachine;
 use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot, Snapshot, 
SnapshotError};
 use consensus::{
-    Consensus, Pipeline, PipelineEntry, Plane, Project, Sequencer, 
VsrConsensus, ack_preflight,
-    ack_quorum_reached, build_reply_message, drain_committable_prefix, 
fence_old_prepare_by_commit,
-    panic_if_hash_chain_would_break_in_same_view, pipeline_prepare_common, 
replicate_preflight,
-    replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common,
+    Consensus, Pipeline, PipelineEntry, Plane, PlaneIdentity, Project, 
Sequencer, VsrConsensus,
+    ack_preflight, ack_quorum_reached, build_reply_message, 
drain_committable_prefix,
+    fence_old_prepare_by_commit, panic_if_hash_chain_would_break_in_same_view,
+    pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain,
+    send_prepare_ok as send_prepare_ok_common,
 };
 use iggy_common::{
-    header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader, 
RequestHeader},
+    header::{
+        Command2, ConsensusHeader, GenericHeader, Operation, PrepareHeader, 
PrepareOkHeader,
+        RequestHeader,
+    },
     message::Message,
 };
 use journal::{Journal, JournalHandle};
@@ -244,6 +248,49 @@ where
     }
 }
 
+impl<B, P, J, S, M> PlaneIdentity<VsrConsensus<B, P>> for 
IggyMetadata<VsrConsensus<B, P>, J, S, M>
+where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+    J: JournalHandle,
+    J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header = 
PrepareHeader>,
+    M: StateMachine<Input = Message<PrepareHeader>>,
+{
+    fn is_applicable<H>(&self, message: &<VsrConsensus<B, P> as 
Consensus>::Message<H>) -> bool
+    where
+        H: ConsensusHeader,
+    {
+        assert!(matches!(
+            message.header().command(),
+            Command2::Request | Command2::Prepare | Command2::PrepareOk
+        ));
+        let operation = message.header().operation();
+        // TODO: Use better selection, smth like greater or equal based on op 
number.
+        matches!(
+            operation,
+            Operation::CreateStream
+                | Operation::UpdateStream
+                | Operation::DeleteStream
+                | Operation::PurgeStream
+                | Operation::CreateTopic
+                | Operation::UpdateTopic
+                | Operation::DeleteTopic
+                | Operation::PurgeTopic
+                | Operation::CreatePartitions
+                | Operation::DeletePartitions
+                | Operation::CreateConsumerGroup
+                | Operation::DeleteConsumerGroup
+                | Operation::CreateUser
+                | Operation::UpdateUser
+                | Operation::DeleteUser
+                | Operation::ChangePassword
+                | Operation::UpdatePermissions
+                | Operation::CreatePersonalAccessToken
+                | Operation::DeletePersonalAccessToken
+        )
+    }
+}
+
 impl<B, P, J, S, M> IggyMetadata<VsrConsensus<B, P>, J, S, M>
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs
index 70830355c..648d8893e 100644
--- a/core/metadata/src/stm/mux.rs
+++ b/core/metadata/src/stm/mux.rs
@@ -17,6 +17,7 @@
 
 use crate::stm::snapshot::{FillSnapshot, RestoreSnapshot, SnapshotError};
 use iggy_common::Either;
+use iggy_common::variadic;
 use iggy_common::{header::PrepareHeader, message::Message};
 
 use crate::stm::{State, StateMachine};
@@ -52,18 +53,6 @@ where
     }
 }
 
-//TODO: Move to common
-#[macro_export]
-macro_rules! variadic {
-    () => ( () );
-    (...$a:ident  $(,)? ) => ( $a );
-    (...$a:expr  $(,)? ) => ( $a );
-    ($a:ident  $(,)? ) => ( ($a, ()) );
-    ($a:expr  $(,)? ) => ( ($a, ()) );
-    ($a:ident,  $( $b:tt )+) => ( ($a, variadic!( $( $b )* )) );
-    ($a:expr,  $( $b:tt )+) => ( ($a, variadic!( $( $b )* )) );
-}
-
 // TODO: Figure out how to get around the fact that we need to hardcode the 
Input/Output type for base case.
 // TODO: I think we could move the base case to the impl site of `State`, so 
this way we know the `Input` and `Output` types.
 // Base case of the recursive resolution.
diff --git a/core/partitions/src/iggy_partitions.rs 
b/core/partitions/src/iggy_partitions.rs
index 2c646072d..f05b294db 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -20,16 +20,20 @@
 use crate::IggyPartition;
 use crate::Partition;
 use crate::types::PartitionsConfig;
+use consensus::PlaneIdentity;
 use consensus::{
     Consensus, NamespacedPipeline, Pipeline, PipelineEntry, Plane, Project, 
Sequencer,
     VsrConsensus, ack_preflight, build_reply_message, 
fence_old_prepare_by_commit,
     pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain,
     send_prepare_ok as send_prepare_ok_common,
 };
+use iggy_common::header::Command2;
 use iggy_common::{
     INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, 
PartitionStats, PooledBuffer,
     Segment, SegmentStorage,
-    header::{GenericHeader, Operation, PrepareHeader, PrepareOkHeader, 
RequestHeader},
+    header::{
+        ConsensusHeader, GenericHeader, Operation, PrepareHeader, 
PrepareOkHeader, RequestHeader,
+    },
     message::Message,
     sharding::{IggyNamespace, LocalIdx, ShardId},
 };
@@ -508,6 +512,27 @@ where
     }
 }
 
+impl<B> PlaneIdentity<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B, 
NamespacedPipeline>>
+where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+{
+    fn is_applicable<H>(&self, message: &<VsrConsensus<B> as 
Consensus>::Message<H>) -> bool
+    where
+        H: ConsensusHeader,
+    {
+        assert!(matches!(
+            message.header().command(),
+            Command2::Request | Command2::Prepare | Command2::PrepareOk
+        ));
+        let operation = message.header().operation();
+        // TODO: Use better selection, smth like greater or equal based on op 
number.
+        matches!(
+            operation,
+            Operation::DeleteSegments | Operation::SendMessages | 
Operation::StoreConsumerOffset
+        )
+    }
+}
+
 impl<B> IggyPartitions<VsrConsensus<B, NamespacedPipeline>>
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs
index c4afc1bcf..9630c3aa3 100644
--- a/core/simulator/src/deps.rs
+++ b/core/simulator/src/deps.rs
@@ -17,14 +17,17 @@
 
 use crate::bus::SharedMemBus;
 use bytes::Bytes;
-use consensus::{NamespacedPipeline, VsrConsensus};
+use consensus::{
+    MuxPlane, {NamespacedPipeline, VsrConsensus},
+};
 use iggy_common::header::PrepareHeader;
 use iggy_common::message::Message;
+use iggy_common::variadic;
 use journal::{Journal, JournalHandle, Storage};
 use metadata::stm::consumer_group::ConsumerGroups;
 use metadata::stm::stream::Streams;
 use metadata::stm::user::Users;
-use metadata::{IggyMetadata, MuxStateMachine, variadic};
+use metadata::{IggyMetadata, MuxStateMachine};
 use std::cell::{Cell, RefCell, UnsafeCell};
 use std::collections::HashMap;
 
@@ -160,3 +163,4 @@ pub type SimMetadata = IggyMetadata<
 /// Type alias for simulator partitions
 pub type ReplicaPartitions =
     partitions::IggyPartitions<VsrConsensus<SharedMemBus, NamespacedPipeline>>;
+pub type SimPlane = MuxPlane<variadic!(SimMetadata, ReplicaPartitions)>;
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index 332b7fc31..13858b62c 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -21,8 +21,8 @@ pub mod deps;
 pub mod replica;
 
 use bus::MemBus;
-use consensus::Plane;
-use iggy_common::header::{GenericHeader, Operation, ReplyHeader};
+use consensus::{Plane, PlaneIdentity};
+use iggy_common::header::{GenericHeader, ReplyHeader};
 use iggy_common::message::{Message, MessageBag};
 use message_bus::MessageBus;
 use replica::Replica;
@@ -115,48 +115,28 @@ impl Simulator {
     }
 
     async fn dispatch_to_replica(&self, replica: &Replica, message: Message<GenericHeader>) {
-        let message: MessageBag = message.into();
-        let operation = match &message {
-            MessageBag::Request(message) => message.header().operation,
-            MessageBag::Prepare(message) => message.header().operation,
-            MessageBag::PrepareOk(message) => message.header().operation,
-        };
-
-        match operation {
-            Operation::SendMessages | Operation::StoreConsumerOffset => {
-                self.dispatch_to_partition_on_replica(replica, message)
-                    .await;
-            }
-            _ => {
-                self.dispatch_to_metadata_on_replica(replica, message).await;
-            }
-        }
-    }
-
-    async fn dispatch_to_metadata_on_replica(&self, replica: &Replica, message: MessageBag) {
-        match message {
-            MessageBag::Request(request) => {
-                replica.metadata.on_request(request).await;
-            }
-            MessageBag::Prepare(prepare) => {
-                replica.metadata.on_replicate(prepare).await;
-            }
-            MessageBag::PrepareOk(prepare_ok) => {
-                replica.metadata.on_ack(prepare_ok).await;
-            }
-        }
-    }
-
-    async fn dispatch_to_partition_on_replica(&self, replica: &Replica, message: MessageBag) {
-        match message {
+        let planes = replica.plane.inner();
+        match MessageBag::from(message) {
             MessageBag::Request(request) => {
-                replica.partitions.on_request(request).await;
+                if planes.0.is_applicable(&request) {
+                    planes.0.on_request(request).await;
+                } else {
+                    planes.1.0.on_request(request).await;
+                }
             }
             MessageBag::Prepare(prepare) => {
-                replica.partitions.on_replicate(prepare).await;
+                if planes.0.is_applicable(&prepare) {
+                    planes.0.on_replicate(prepare).await;
+                } else {
+                    planes.1.0.on_replicate(prepare).await;
+                }
             }
             MessageBag::PrepareOk(prepare_ok) => {
-                replica.partitions.on_ack(prepare_ok).await;
+                if planes.0.is_applicable(&prepare_ok) {
+                    planes.0.on_ack(prepare_ok).await;
+                } else {
+                    planes.1.0.on_ack(prepare_ok).await;
+                }
             }
         }
     }
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
index 9f8081656..b7728f622 100644
--- a/core/simulator/src/replica.rs
+++ b/core/simulator/src/replica.rs
@@ -17,15 +17,14 @@
 
 use crate::bus::{MemBus, SharedMemBus};
 use crate::deps::{
-    MemStorage, ReplicaPartitions, SimJournal, SimMetadata, SimMuxStateMachine, SimSnapshot,
+    ReplicaPartitions, SimJournal, SimMetadata, SimMuxStateMachine, SimPlane, SimSnapshot,
 };
 use consensus::{LocalPipeline, NamespacedPipeline, VsrConsensus};
-use iggy_common::IggyByteSize;
 use iggy_common::sharding::{IggyNamespace, ShardId};
+use iggy_common::{IggyByteSize, variadic};
 use metadata::stm::consumer_group::{ConsumerGroups, ConsumerGroupsInner};
 use metadata::stm::stream::{Streams, StreamsInner};
 use metadata::stm::user::{Users, UsersInner};
-use metadata::{IggyMetadata, variadic};
 use partitions::PartitionsConfig;
 use std::sync::Arc;
 
@@ -36,8 +35,7 @@ pub struct Replica {
     pub id: u8,
     pub name: String,
     pub replica_count: u8,
-    pub metadata: SimMetadata,
-    pub partitions: ReplicaPartitions,
+    pub plane: SimPlane,
     pub bus: Arc<MemBus>,
 }
 
@@ -58,6 +56,12 @@ impl Replica {
             LocalPipeline::new(),
         );
         metadata_consensus.init();
+        let metadata = SimMetadata {
+            consensus: Some(metadata_consensus),
+            journal: Some(SimJournal::default()),
+            snapshot: Some(SimSnapshot::default()),
+            mux_stm: mux,
+        };
 
         let partitions_config = PartitionsConfig {
             messages_required_to_save: 1000,
@@ -80,25 +84,20 @@ impl Replica {
         );
         partition_consensus.init();
         partitions.set_consensus(partition_consensus);
+        let plane = SimPlane::new(variadic!(metadata, partitions));
 
         Self {
             id,
             name,
+            plane,
             replica_count,
-            metadata: IggyMetadata {
-                consensus: Some(metadata_consensus),
-                journal: Some(SimJournal::<MemStorage>::default()),
-                snapshot: Some(SimSnapshot::default()),
-                mux_stm: mux,
-            },
-            partitions,
             bus,
         }
     }
 
     pub fn init_partition(&mut self, namespace: IggyNamespace) {
-        self.partitions.init_partition_in_memory(namespace);
-        self.partitions
-            .register_namespace_in_pipeline(namespace.inner());
+        let partitions = &mut self.plane.inner_mut().1.0;
+        partitions.init_partition_in_memory(namespace);
+        partitions.register_namespace_in_pipeline(namespace.inner());
     }
 }


Reply via email to