This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 50423db99 feat(consensus): Add metadata and partition handles for 
MuxPlane (#2855)
50423db99 is described below

commit 50423db99d79130719505153f497e87973efcc87
Author: Krishna Vishal <[email protected]>
AuthorDate: Wed Mar 4 17:52:27 2026 +0530

    feat(consensus): Add metadata and partition handles for MuxPlane (#2855)
    
    Co-authored-by: Grzegorz Koszyk 
<[email protected]>
---
 core/consensus/src/plane_mux.rs | 52 +++++++++++++++++++++++++++++++++++++++++
 core/shard/src/lib.rs           | 28 +++++++++++-----------
 2 files changed, 66 insertions(+), 14 deletions(-)

diff --git a/core/consensus/src/plane_mux.rs b/core/consensus/src/plane_mux.rs
index 1f32cb810..d904bf98d 100644
--- a/core/consensus/src/plane_mux.rs
+++ b/core/consensus/src/plane_mux.rs
@@ -82,6 +82,58 @@ where
     async fn on_ack(&self, _message: AckMessage<C>) {}
 }
 
+pub trait MetadataHandle {
+    type Metadata;
+    fn metadata(&self) -> &Self::Metadata;
+    fn metadata_mut(&mut self) -> &mut Self::Metadata;
+}
+
+pub trait PartitionsHandle {
+    type Partitions;
+    fn partitions(&self) -> &Self::Partitions;
+    fn partitions_mut(&mut self) -> &mut Self::Partitions;
+}
+
+impl<M, Tail> MetadataHandle for (M, Tail) {
+    type Metadata = M;
+    fn metadata(&self) -> &Self::Metadata {
+        &self.0
+    }
+    fn metadata_mut(&mut self) -> &mut Self::Metadata {
+        &mut self.0
+    }
+}
+
+impl<M, P> PartitionsHandle for (M, (P, ())) {
+    type Partitions = P;
+    fn partitions(&self) -> &Self::Partitions {
+        &self.1.0
+    }
+    fn partitions_mut(&mut self) -> &mut Self::Partitions {
+        &mut self.1.0
+    }
+}
+
+impl<T: MetadataHandle> MetadataHandle for MuxPlane<T> {
+    type Metadata = T::Metadata;
+    fn metadata(&self) -> &Self::Metadata {
+        self.inner.metadata()
+    }
+    fn metadata_mut(&mut self) -> &mut Self::Metadata {
+        self.inner.metadata_mut()
+    }
+}
+
+impl<T: PartitionsHandle> PartitionsHandle for MuxPlane<T> {
+    type Partitions = T::Partitions;
+    fn partitions(&self) -> &Self::Partitions {
+        self.inner.partitions()
+    }
+    fn partitions_mut(&mut self) -> &mut Self::Partitions {
+        self.inner.partitions_mut()
+    }
+}
+
 impl<C, Head, Tail> Plane<C> for variadic!(Head, ...Tail)
 where
     C: Consensus,
diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs
index ff36a97e1..35048fc00 100644
--- a/core/shard/src/lib.rs
+++ b/core/shard/src/lib.rs
@@ -15,7 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use consensus::{MuxPlane, NamespacedPipeline, Plane, PlaneIdentity, 
VsrConsensus};
+use consensus::{
+    MetadataHandle, MuxPlane, NamespacedPipeline, PartitionsHandle, Plane, 
PlaneIdentity,
+    VsrConsensus,
+};
 use iggy_common::header::{GenericHeader, PrepareHeader, PrepareOkHeader, 
RequestHeader};
 use iggy_common::message::{Message, MessageBag};
 use iggy_common::sharding::IggyNamespace;
@@ -90,11 +93,10 @@ where
             >,
         M: StateMachine<Input = Message<PrepareHeader>>,
     {
-        let planes = self.plane.inner();
-        if planes.0.is_applicable(&request) {
-            planes.0.on_request(request).await;
+        if self.plane.metadata().is_applicable(&request) {
+            self.plane.metadata().on_request(request).await;
         } else {
-            planes.1.0.on_request(request).await;
+            self.plane.partitions().on_request(request).await;
         }
     }
 
@@ -109,11 +111,10 @@ where
             >,
         M: StateMachine<Input = Message<PrepareHeader>>,
     {
-        let planes = self.plane.inner();
-        if planes.0.is_applicable(&prepare) {
-            planes.0.on_replicate(prepare).await;
+        if self.plane.metadata().is_applicable(&prepare) {
+            self.plane.metadata().on_replicate(prepare).await;
         } else {
-            planes.1.0.on_replicate(prepare).await;
+            self.plane.partitions().on_replicate(prepare).await;
         }
     }
 
@@ -128,11 +129,10 @@ where
             >,
         M: StateMachine<Input = Message<PrepareHeader>>,
     {
-        let planes = self.plane.inner();
-        if planes.0.is_applicable(&prepare_ok) {
-            planes.0.on_ack(prepare_ok).await;
+        if self.plane.metadata().is_applicable(&prepare_ok) {
+            self.plane.metadata().on_ack(prepare_ok).await;
         } else {
-            planes.1.0.on_ack(prepare_ok).await;
+            self.plane.partitions().on_ack(prepare_ok).await;
         }
     }
 
@@ -195,7 +195,7 @@ where
                 Client = u128,
             >,
     {
-        let partitions = &mut self.plane.inner_mut().1.0;
+        let partitions = self.plane.partitions_mut();
         partitions.init_partition_in_memory(namespace);
         partitions.register_namespace_in_pipeline(namespace.inner());
     }

Reply via email to