This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch plane_refactor in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 0a9cac77e7cc643ef7ff3749a87139fba4742f7a Author: numinex <[email protected]> AuthorDate: Mon Feb 16 18:43:08 2026 +0100 x --- core/consensus/src/plane_helpers.rs | 29 ++++++++++++++++++++++++++ core/metadata/src/impls/metadata.rs | 32 ++-------------------------- core/partitions/src/iggy_partitions.rs | 38 +++------------------------------- 3 files changed, 34 insertions(+), 65 deletions(-) diff --git a/core/consensus/src/plane_helpers.rs b/core/consensus/src/plane_helpers.rs index d64af8fee..60ea1dedf 100644 --- a/core/consensus/src/plane_helpers.rs +++ b/core/consensus/src/plane_helpers.rs @@ -55,6 +55,35 @@ where header.op <= consensus.commit() } +/// Shared chain-replication forwarding to the next replica. +pub async fn replicate_to_next_in_chain<B, P>( + consensus: &VsrConsensus<B, P>, + message: Message<PrepareHeader>, +) where + B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, + P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>, +{ + let header = message.header(); + + assert_eq!(header.command, Command2::Prepare); + assert!(header.op > consensus.commit()); + + let next = (consensus.replica() + 1) % consensus.replica_count(); + let primary = consensus.primary_index(header.view); + + if next == primary { + return; + } + + assert_ne!(next, consensus.replica()); + + consensus + .message_bus() + .send_to_replica(next, message.into_generic()) + .await + .unwrap(); +} + /// Shared preflight checks for `on_replicate`. /// /// Returns current op on success. diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs index 9d1a38799..9a81461af 100644 --- a/core/metadata/src/impls/metadata.rs +++ b/core/metadata/src/impls/metadata.rs @@ -20,7 +20,7 @@ use consensus::{ Consensus, Pipeline, PipelineEntry, Plane, Project, Sequencer, VsrConsensus, ack_preflight, ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit, panic_if_hash_chain_would_break_in_same_view, pipeline_prepare_common, replicate_preflight, - send_prepare_ok as send_prepare_ok_common, + replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common, }; use iggy_common::{ header::{Command2, GenericHeader, PrepareHeader}, @@ -271,35 +271,7 @@ where journal.handle().header(idx).is_none(), "replicate: must not already have prepare" ); - assert!(header.op > consensus.commit()); - - let next = (consensus.replica() + 1) % consensus.replica_count(); - - let primary = consensus.primary_index(header.view); - if next == primary { - debug!( - replica = consensus.replica(), - op = header.op, - "replicate: not replicating (ring complete)" - ); - return; - } - - assert_ne!(next, consensus.replica()); - - debug!( - replica = consensus.replica(), - to = next, - op = header.op, - "replicate: forwarding" - ); - - let message = message.into_generic(); - consensus - .message_bus() - .send_to_replica(next, message) - .await - .unwrap(); + replicate_to_next_in_chain(consensus, message).await; } // TODO: Implement jump_to_newer_op diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs index 0a3bb0394..54891dfdd 100644 --- a/core/partitions/src/iggy_partitions.rs +++ b/core/partitions/src/iggy_partitions.rs @@ -23,12 +23,12 @@ use crate::types::PartitionsConfig; use consensus::{ Consensus, PipelineEntry, Plane, Project, Sequencer, VsrConsensus, ack_preflight, ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit, pipeline_prepare_common, - replicate_preflight, send_prepare_ok as send_prepare_ok_common, + replicate_preflight, replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common, }; use iggy_common::{ INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, PartitionStats, PooledBuffer, Segment, SegmentStorage, - header::{Command2, GenericHeader, Operation, PrepareHeader}, + header::{GenericHeader, Operation, PrepareHeader}, message::Message, sharding::{IggyNamespace, LocalIdx, ShardId}, }; @@ -558,39 +558,7 @@ where /// - Stops when we would forward back to primary async fn replicate(&self, message: Message<PrepareHeader>) { let consensus = self.consensus.as_ref().unwrap(); - - let header = message.header(); - - assert_eq!(header.command, Command2::Prepare); - assert!(header.op > consensus.commit()); - - let next = (consensus.replica() + 1) % consensus.replica_count(); - - let primary = consensus.primary_index(header.view); - if next == primary { - debug!( - replica = consensus.replica(), - op = header.op, - "replicate: not replicating (ring complete)" - ); - return; - } - - assert_ne!(next, consensus.replica()); - - debug!( - replica = consensus.replica(), - to = next, - op = header.op, - "replicate: forwarding" - ); - - let message = message.into_generic(); - consensus - .message_bus() - .send_to_replica(next, message) - .await - .unwrap(); + replicate_to_next_in_chain(consensus, message).await; } fn commit_journal(&self) {
