This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 50423db99 feat(consensus): Add metadata and partition handles for MuxPlane (#2855)
50423db99 is described below
commit 50423db99d79130719505153f497e87973efcc87
Author: Krishna Vishal <[email protected]>
AuthorDate: Wed Mar 4 17:52:27 2026 +0530
feat(consensus): Add metadata and partition handles for MuxPlane (#2855)
Co-authored-by: Grzegorz Koszyk <[email protected]>
---
core/consensus/src/plane_mux.rs | 52 +++++++++++++++++++++++++++++++++++++++++
core/shard/src/lib.rs | 28 +++++++++++-----------
2 files changed, 66 insertions(+), 14 deletions(-)
diff --git a/core/consensus/src/plane_mux.rs b/core/consensus/src/plane_mux.rs
index 1f32cb810..d904bf98d 100644
--- a/core/consensus/src/plane_mux.rs
+++ b/core/consensus/src/plane_mux.rs
@@ -82,6 +82,58 @@ where
async fn on_ack(&self, _message: AckMessage<C>) {}
}
+pub trait MetadataHandle {
+ type Metadata;
+ fn metadata(&self) -> &Self::Metadata;
+ fn metadata_mut(&mut self) -> &mut Self::Metadata;
+}
+
+pub trait PartitionsHandle {
+ type Partitions;
+ fn partitions(&self) -> &Self::Partitions;
+ fn partitions_mut(&mut self) -> &mut Self::Partitions;
+}
+
+impl<M, Tail> MetadataHandle for (M, Tail) {
+ type Metadata = M;
+ fn metadata(&self) -> &Self::Metadata {
+ &self.0
+ }
+ fn metadata_mut(&mut self) -> &mut Self::Metadata {
+ &mut self.0
+ }
+}
+
+impl<M, P> PartitionsHandle for (M, (P, ())) {
+ type Partitions = P;
+ fn partitions(&self) -> &Self::Partitions {
+ &self.1.0
+ }
+ fn partitions_mut(&mut self) -> &mut Self::Partitions {
+ &mut self.1.0
+ }
+}
+
+impl<T: MetadataHandle> MetadataHandle for MuxPlane<T> {
+ type Metadata = T::Metadata;
+ fn metadata(&self) -> &Self::Metadata {
+ self.inner.metadata()
+ }
+ fn metadata_mut(&mut self) -> &mut Self::Metadata {
+ self.inner.metadata_mut()
+ }
+}
+
+impl<T: PartitionsHandle> PartitionsHandle for MuxPlane<T> {
+ type Partitions = T::Partitions;
+ fn partitions(&self) -> &Self::Partitions {
+ self.inner.partitions()
+ }
+ fn partitions_mut(&mut self) -> &mut Self::Partitions {
+ self.inner.partitions_mut()
+ }
+}
+
impl<C, Head, Tail> Plane<C> for variadic!(Head, ...Tail)
where
C: Consensus,
diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs
index ff36a97e1..35048fc00 100644
--- a/core/shard/src/lib.rs
+++ b/core/shard/src/lib.rs
@@ -15,7 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-use consensus::{MuxPlane, NamespacedPipeline, Plane, PlaneIdentity, VsrConsensus};
+use consensus::{
+ MetadataHandle, MuxPlane, NamespacedPipeline, PartitionsHandle, Plane, PlaneIdentity,
+ VsrConsensus,
+};
use iggy_common::header::{GenericHeader, PrepareHeader, PrepareOkHeader, RequestHeader};
use iggy_common::message::{Message, MessageBag};
use iggy_common::sharding::IggyNamespace;
@@ -90,11 +93,10 @@ where
>,
M: StateMachine<Input = Message<PrepareHeader>>,
{
- let planes = self.plane.inner();
- if planes.0.is_applicable(&request) {
- planes.0.on_request(request).await;
+ if self.plane.metadata().is_applicable(&request) {
+ self.plane.metadata().on_request(request).await;
} else {
- planes.1.0.on_request(request).await;
+ self.plane.partitions().on_request(request).await;
}
}
@@ -109,11 +111,10 @@ where
>,
M: StateMachine<Input = Message<PrepareHeader>>,
{
- let planes = self.plane.inner();
- if planes.0.is_applicable(&prepare) {
- planes.0.on_replicate(prepare).await;
+ if self.plane.metadata().is_applicable(&prepare) {
+ self.plane.metadata().on_replicate(prepare).await;
} else {
- planes.1.0.on_replicate(prepare).await;
+ self.plane.partitions().on_replicate(prepare).await;
}
}
@@ -128,11 +129,10 @@ where
>,
M: StateMachine<Input = Message<PrepareHeader>>,
{
- let planes = self.plane.inner();
- if planes.0.is_applicable(&prepare_ok) {
- planes.0.on_ack(prepare_ok).await;
+ if self.plane.metadata().is_applicable(&prepare_ok) {
+ self.plane.metadata().on_ack(prepare_ok).await;
} else {
- planes.1.0.on_ack(prepare_ok).await;
+ self.plane.partitions().on_ack(prepare_ok).await;
}
}
@@ -195,7 +195,7 @@ where
Client = u128,
>,
{
- let partitions = &mut self.plane.inner_mut().1.0;
+ let partitions = self.plane.partitions_mut();
partitions.init_partition_in_memory(namespace);
partitions.register_namespace_in_pipeline(namespace.inner());
}