This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch fix_mux_plane_generics in repository https://gitbox.apache.org/repos/asf/iggy.git
commit b0af52de04038356cb6bfa5f5ac44cb62d5402d3 Author: numinex <[email protected]> AuthorDate: Tue Mar 3 15:53:10 2026 +0100 fix(cluster): fix type mismatch for variadic plane impl --- core/common/src/macros.rs | 8 ++++---- core/metadata/src/impls/metadata.rs | 12 ++++-------- core/partitions/src/iggy_partitions.rs | 18 ++++-------------- core/shard/src/lib.rs | 13 ++++++------- 4 files changed, 18 insertions(+), 33 deletions(-) diff --git a/core/common/src/macros.rs b/core/common/src/macros.rs index c7c1ce3f6..09ec4c496 100644 --- a/core/common/src/macros.rs +++ b/core/common/src/macros.rs @@ -19,9 +19,9 @@ macro_rules! variadic { () => (()); (...$a:ident $(,)?) => ($a); - (...$a:expr $(,)?) => ($a); - ($a:ident $(,)?) => (($a, ())); - ($a:expr $(,)?) => (($a, ())); + (...$a:ty $(,)?) => ($a); ($a:ident, $($b:tt)+) => (($a, $crate::variadic!($($b)+))); - ($a:expr, $($b:tt)+) => (($a, $crate::variadic!($($b)+))); + ($a:ty, $($b:tt)+) => (($a, $crate::variadic!($($b)+))); + ($a:ident $(,)?) => (($a, ())); + ($a:ty $(,)?) => (($a, ())); } diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs index 64f9fe959..e7ce25e83 100644 --- a/core/metadata/src/impls/metadata.rs +++ b/core/metadata/src/impls/metadata.rs @@ -100,15 +100,14 @@ pub struct IggyMetadata<C, J, S, M> { pub mux_stm: M, } -impl<B, P, J, S, M> Plane<VsrConsensus<B, P>> for IggyMetadata<VsrConsensus<B, P>, J, S, M> +impl<B, J, S, M> Plane<VsrConsensus<B>> for IggyMetadata<VsrConsensus<B>, J, S, M> where B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, - P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>, J: JournalHandle, J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header = PrepareHeader>, M: StateMachine<Input = Message<PrepareHeader>>, { - async fn on_request(&self, message: <VsrConsensus<B, P> as Consensus>::Message<RequestHeader>) { + async fn on_request(&self, message: <VsrConsensus<B> as Consensus>::Message<RequestHeader>) { let consensus = self.consensus.as_ref().unwrap(); // TODO: Bunch of asserts. @@ -117,10 +116,7 @@ where pipeline_prepare_common(consensus, prepare, |prepare| self.on_replicate(prepare)).await; } - async fn on_replicate( - &self, - message: <VsrConsensus<B, P> as Consensus>::Message<PrepareHeader>, - ) { + async fn on_replicate(&self, message: <VsrConsensus<B> as Consensus>::Message<PrepareHeader>) { let consensus = self.consensus.as_ref().unwrap(); let journal = self.journal.as_ref().unwrap(); @@ -174,7 +170,7 @@ where } } - async fn on_ack(&self, message: <VsrConsensus<B, P> as Consensus>::Message<PrepareOkHeader>) { + async fn on_ack(&self, message: <VsrConsensus<B> as Consensus>::Message<PrepareOkHeader>) { let consensus = self.consensus.as_ref().unwrap(); let header = message.header(); diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs index f05b294db..6c1ae865c 100644 --- a/core/partitions/src/iggy_partitions.rs +++ b/core/partitions/src/iggy_partitions.rs @@ -333,15 +333,11 @@ impl<C> IggyPartitions<C> { } } -impl<B> Plane<VsrConsensus<B, NamespacedPipeline>> - for IggyPartitions<VsrConsensus<B, NamespacedPipeline>> +impl<B> Plane<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B, NamespacedPipeline>> where B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, { - async fn on_request( - &self, - message: <VsrConsensus<B, NamespacedPipeline> as Consensus>::Message<RequestHeader>, - ) { + async fn on_request(&self, message: <VsrConsensus<B> as Consensus>::Message<RequestHeader>) { let namespace = IggyNamespace::from_raw(message.header().namespace); let consensus = self .consensus() @@ -352,10 +348,7 @@ where pipeline_prepare_common(consensus, prepare, |prepare| self.on_replicate(prepare)).await; } - async fn on_replicate( - &self, - message: <VsrConsensus<B, NamespacedPipeline> as Consensus>::Message<PrepareHeader>, - ) { + async fn on_replicate(&self, message: <VsrConsensus<B> as Consensus>::Message<PrepareHeader>) { let header = message.header(); let namespace = IggyNamespace::from_raw(header.namespace); let consensus = self @@ -397,10 +390,7 @@ where } } - async fn on_ack( - &self, - message: <VsrConsensus<B, NamespacedPipeline> as Consensus>::Message<PrepareOkHeader>, - ) { + async fn on_ack(&self, message: <VsrConsensus<B> as Consensus>::Message<PrepareOkHeader>) { let header = message.header(); let consensus = self.consensus().expect("on_ack: consensus not initialized"); diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs index 0b41b1d84..40161b9cc 100644 --- a/core/shard/src/lib.rs +++ b/core/shard/src/lib.rs @@ -26,13 +26,12 @@ use metadata::IggyMetadata; use metadata::stm::StateMachine; use partitions::IggyPartitions; -// variadic!(Metadata, Partitions) = (Metadata, (Partitions, ())) -type PlaneInner<B, J, S, M> = ( - IggyMetadata<VsrConsensus<B>, J, S, M>, - (IggyPartitions<VsrConsensus<B, NamespacedPipeline>>, ()), -); - -pub type ShardPlane<B, J, S, M> = MuxPlane<PlaneInner<B, J, S, M>>; +pub type ShardPlane<B, J, S, M> = MuxPlane< + variadic!( + IggyMetadata<VsrConsensus<B>, J, S, M>, + IggyPartitions<VsrConsensus<B, NamespacedPipeline>> + ), +>; pub struct IggyShard<B, J, S, M> where
