This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch plane_refactor
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 0a9cac77e7cc643ef7ff3749a87139fba4742f7a
Author: numinex <[email protected]>
AuthorDate: Mon Feb 16 18:43:08 2026 +0100

    x
---
 core/consensus/src/plane_helpers.rs    | 29 ++++++++++++++++++++++++++
 core/metadata/src/impls/metadata.rs    | 32 ++--------------------------
 core/partitions/src/iggy_partitions.rs | 38 +++-------------------------------
 3 files changed, 34 insertions(+), 65 deletions(-)

diff --git a/core/consensus/src/plane_helpers.rs 
b/core/consensus/src/plane_helpers.rs
index d64af8fee..60ea1dedf 100644
--- a/core/consensus/src/plane_helpers.rs
+++ b/core/consensus/src/plane_helpers.rs
@@ -55,6 +55,35 @@ where
     header.op <= consensus.commit()
 }
 
+/// Shared chain-replication forwarding to the next replica.
+pub async fn replicate_to_next_in_chain<B, P>(
+    consensus: &VsrConsensus<B, P>,
+    message: Message<PrepareHeader>,
+) where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+    let header = message.header();
+
+    assert_eq!(header.command, Command2::Prepare);
+    assert!(header.op > consensus.commit());
+
+    let next = (consensus.replica() + 1) % consensus.replica_count();
+    let primary = consensus.primary_index(header.view);
+
+    if next == primary {
+        return;
+    }
+
+    assert_ne!(next, consensus.replica());
+
+    consensus
+        .message_bus()
+        .send_to_replica(next, message.into_generic())
+        .await
+        .unwrap();
+}
+
 /// Shared preflight checks for `on_replicate`.
 ///
 /// Returns current op on success.
diff --git a/core/metadata/src/impls/metadata.rs 
b/core/metadata/src/impls/metadata.rs
index 9d1a38799..9a81461af 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -20,7 +20,7 @@ use consensus::{
     Consensus, Pipeline, PipelineEntry, Plane, Project, Sequencer, 
VsrConsensus, ack_preflight,
     ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit,
     panic_if_hash_chain_would_break_in_same_view, pipeline_prepare_common, 
replicate_preflight,
-    send_prepare_ok as send_prepare_ok_common,
+    replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common,
 };
 use iggy_common::{
     header::{Command2, GenericHeader, PrepareHeader},
@@ -271,35 +271,7 @@ where
             journal.handle().header(idx).is_none(),
             "replicate: must not already have prepare"
         );
-        assert!(header.op > consensus.commit());
-
-        let next = (consensus.replica() + 1) % consensus.replica_count();
-
-        let primary = consensus.primary_index(header.view);
-        if next == primary {
-            debug!(
-                replica = consensus.replica(),
-                op = header.op,
-                "replicate: not replicating (ring complete)"
-            );
-            return;
-        }
-
-        assert_ne!(next, consensus.replica());
-
-        debug!(
-            replica = consensus.replica(),
-            to = next,
-            op = header.op,
-            "replicate: forwarding"
-        );
-
-        let message = message.into_generic();
-        consensus
-            .message_bus()
-            .send_to_replica(next, message)
-            .await
-            .unwrap();
+        replicate_to_next_in_chain(consensus, message).await;
     }
 
     // TODO: Implement jump_to_newer_op
diff --git a/core/partitions/src/iggy_partitions.rs 
b/core/partitions/src/iggy_partitions.rs
index 0a3bb0394..54891dfdd 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -23,12 +23,12 @@ use crate::types::PartitionsConfig;
 use consensus::{
     Consensus, PipelineEntry, Plane, Project, Sequencer, VsrConsensus, 
ack_preflight,
     ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit, 
pipeline_prepare_common,
-    replicate_preflight, send_prepare_ok as send_prepare_ok_common,
+    replicate_preflight, replicate_to_next_in_chain, send_prepare_ok as 
send_prepare_ok_common,
 };
 use iggy_common::{
     INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, 
PartitionStats, PooledBuffer,
     Segment, SegmentStorage,
-    header::{Command2, GenericHeader, Operation, PrepareHeader},
+    header::{GenericHeader, Operation, PrepareHeader},
     message::Message,
     sharding::{IggyNamespace, LocalIdx, ShardId},
 };
@@ -558,39 +558,7 @@ where
     /// - Stops when we would forward back to primary
     async fn replicate(&self, message: Message<PrepareHeader>) {
         let consensus = self.consensus.as_ref().unwrap();
-
-        let header = message.header();
-
-        assert_eq!(header.command, Command2::Prepare);
-        assert!(header.op > consensus.commit());
-
-        let next = (consensus.replica() + 1) % consensus.replica_count();
-
-        let primary = consensus.primary_index(header.view);
-        if next == primary {
-            debug!(
-                replica = consensus.replica(),
-                op = header.op,
-                "replicate: not replicating (ring complete)"
-            );
-            return;
-        }
-
-        assert_ne!(next, consensus.replica());
-
-        debug!(
-            replica = consensus.replica(),
-            to = next,
-            op = header.op,
-            "replicate: forwarding"
-        );
-
-        let message = message.into_generic();
-        consensus
-            .message_bus()
-            .send_to_replica(next, message)
-            .await
-            .unwrap();
+        replicate_to_next_in_chain(consensus, message).await;
     }
 
     fn commit_journal(&self) {

Reply via email to