This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch plane_refactor
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit eb9ac2b7283bd4f17b7de2e5aefae41cedb0ec54
Author: numinex <[email protected]>
AuthorDate: Mon Feb 16 18:36:45 2026 +0100

    n
---
 core/consensus/src/impls.rs            |   4 +
 core/consensus/src/lib.rs              |   3 +
 core/consensus/src/plane_helpers.rs    | 242 ++++++++++++++++++++++++++++++++
 core/metadata/src/impls/metadata.rs    | 248 ++++-----------------------------
 core/partitions/src/iggy_partitions.rs | 240 ++++---------------------------
 5 files changed, 305 insertions(+), 432 deletions(-)

diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 3fd146cd4..4b1e620c8 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -1213,6 +1213,10 @@ where
         !self.is_primary()
     }
 
+    fn is_normal(&self) -> bool {
+        self.status() == Status::Normal
+    }
+
     fn is_syncing(&self) -> bool {
         self.is_syncing()
     }
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index 9a7fd5bae..17f2fb9ea 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -63,6 +63,7 @@ pub trait Consensus: Sized {
     fn post_replicate_verify(&self, message: &Self::ReplicateMessage);
 
     fn is_follower(&self) -> bool;
+    fn is_normal(&self) -> bool;
     fn is_syncing(&self) -> bool;
 }
 
@@ -83,6 +84,8 @@ where
 
 mod impls;
 pub use impls::*;
+mod plane_helpers;
+pub use plane_helpers::*;
 
 mod view_change_quorum;
 pub use view_change_quorum::*;
diff --git a/core/consensus/src/plane_helpers.rs 
b/core/consensus/src/plane_helpers.rs
new file mode 100644
index 000000000..d64af8fee
--- /dev/null
+++ b/core/consensus/src/plane_helpers.rs
@@ -0,0 +1,242 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{Consensus, Pipeline, PipelineEntry, Sequencer, Status, 
VsrConsensus};
+use iggy_common::header::{Command2, GenericHeader, PrepareHeader, 
PrepareOkHeader, ReplyHeader};
+use iggy_common::message::Message;
+use message_bus::MessageBus;
+use std::ops::AsyncFnOnce;
+
+// TODO: Rework all of those helpers, once the boundaries are more clear and we 
have a better picture of the commonalities between all of the planes.
+
+/// Shared pipeline-first request flow used by metadata and partitions.
+pub async fn pipeline_prepare_common<C, F>(
+    consensus: &C,
+    prepare: C::ReplicateMessage,
+    on_replicate: F,
+) where
+    C: Consensus,
+    C::ReplicateMessage: Clone,
+    F: AsyncFnOnce(C::ReplicateMessage) -> (),
+{
+    assert!(!consensus.is_follower(), "on_request: primary only");
+    assert!(consensus.is_normal(), "on_request: status must be normal");
+    assert!(!consensus.is_syncing(), "on_request: must not be syncing");
+
+    consensus.verify_pipeline();
+    consensus.pipeline_message(prepare.clone());
+    on_replicate(prepare.clone()).await;
+    consensus.post_replicate_verify(&prepare);
+}
+
+/// Shared commit-based old-prepare fence.
+pub fn fence_old_prepare_by_commit<B, P>(
+    consensus: &VsrConsensus<B, P>,
+    header: &PrepareHeader,
+) -> bool
+where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+    header.op <= consensus.commit()
+}
+
+/// Shared preflight checks for `on_replicate`.
+///
+/// Returns current op on success.
+pub fn replicate_preflight<B, P>(
+    consensus: &VsrConsensus<B, P>,
+    header: &PrepareHeader,
+) -> Result<u64, &'static str>
+where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+    assert_eq!(header.command, Command2::Prepare);
+
+    if consensus.is_syncing() {
+        return Err("sync");
+    }
+
+    let current_op = consensus.sequencer().current_sequence();
+
+    if consensus.status() != Status::Normal {
+        return Err("not normal state");
+    }
+
+    if header.view > consensus.view() {
+        return Err("newer view");
+    }
+
+    if consensus.is_follower() {
+        consensus.advance_commit_number(header.commit);
+    }
+
+    Ok(current_op)
+}
+
+/// Shared preflight checks for `on_ack`.
+pub fn ack_preflight<B, P>(consensus: &VsrConsensus<B, P>) -> Result<(), 
&'static str>
+where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+    if !consensus.is_primary() {
+        return Err("not primary");
+    }
+
+    if consensus.status() != Status::Normal {
+        return Err("not normal");
+    }
+
+    Ok(())
+}
+
+/// Shared quorum + extraction flow for ack handling.
+pub fn ack_quorum_reached<B, P>(consensus: &VsrConsensus<B, P>, ack: 
&PrepareOkHeader) -> bool
+where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+    if !consensus.handle_prepare_ok(ack) {
+        return false;
+    }
+
+    consensus.advance_commit_number(ack.op);
+    true
+}
+
+/// Shared reply-message construction for committed prepare.
+pub fn build_reply_message<B, P>(
+    consensus: &VsrConsensus<B, P>,
+    prepare_header: &PrepareHeader,
+) -> Message<ReplyHeader>
+where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+    
Message::<ReplyHeader>::new(std::mem::size_of::<ReplyHeader>()).transmute_header(|_,
 new| {
+        *new = ReplyHeader {
+            checksum: 0,
+            checksum_body: 0,
+            cluster: consensus.cluster(),
+            size: std::mem::size_of::<ReplyHeader>() as u32,
+            view: consensus.view(),
+            release: 0,
+            command: Command2::Reply,
+            replica: consensus.replica(),
+            reserved_frame: [0; 66],
+            request_checksum: prepare_header.request_checksum,
+            context: 0,
+            op: prepare_header.op,
+            commit: consensus.commit(),
+            timestamp: prepare_header.timestamp,
+            request: prepare_header.request,
+            operation: prepare_header.operation,
+            ..Default::default()
+        };
+    })
+}
+
+/// Verify hash chain would not break if we add this header.
+pub fn panic_if_hash_chain_would_break_in_same_view(
+    previous: &PrepareHeader,
+    current: &PrepareHeader,
+) {
+    // If both headers are in the same view, parent must chain correctly.
+    if previous.view == current.view {
+        assert_eq!(
+            current.parent, previous.checksum,
+            "hash chain broken in same view: op={} parent={} expected={}",
+            current.op, current.parent, previous.checksum
+        );
+    }
+}
+
+// TODO: Figure out how to make this check the journal if it contains the 
prepare.
+pub async fn send_prepare_ok<B, P>(
+    consensus: &VsrConsensus<B, P>,
+    header: &PrepareHeader,
+    is_persisted: Option<bool>,
+) where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+    assert_eq!(header.command, Command2::Prepare);
+
+    if consensus.status() != Status::Normal {
+        return;
+    }
+
+    if consensus.is_syncing() {
+        return;
+    }
+
+    if let Some(false) = is_persisted {
+        return;
+    }
+
+    assert!(
+        header.view <= consensus.view(),
+        "send_prepare_ok: prepare view {} > our view {}",
+        header.view,
+        consensus.view()
+    );
+
+    if header.op > consensus.sequencer().current_sequence() {
+        return;
+    }
+
+    let prepare_ok_header = PrepareOkHeader {
+        command: Command2::PrepareOk,
+        cluster: consensus.cluster(),
+        replica: consensus.replica(),
+        view: consensus.view(),
+        op: header.op,
+        commit: consensus.commit(),
+        timestamp: header.timestamp,
+        parent: header.parent,
+        prepare_checksum: header.checksum,
+        request: header.request,
+        operation: header.operation,
+        namespace: header.namespace,
+        size: std::mem::size_of::<PrepareOkHeader>() as u32,
+        ..Default::default()
+    };
+
+    let message: Message<PrepareOkHeader> =
+        Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>())
+            .transmute_header(|_, new| *new = prepare_ok_header);
+    let generic_message = message.into_generic();
+    let primary = consensus.primary_index(consensus.view());
+
+    if primary == consensus.replica() {
+        // TODO: Queue for self-processing or call handle_prepare_ok directly.
+            // TODO: This is temporary, to test the simulator, but we should send 
message to ourselves properly.
+        consensus
+            .message_bus()
+            .send_to_replica(primary, generic_message)
+            .await
+            .unwrap();
+    } else {
+        consensus
+            .message_bus()
+            .send_to_replica(primary, generic_message)
+            .await
+            .unwrap();
+    }
+}
diff --git a/core/metadata/src/impls/metadata.rs 
b/core/metadata/src/impls/metadata.rs
index c4a55fc54..9d1a38799 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -17,10 +17,13 @@
 use crate::stm::StateMachine;
 use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot, Snapshot, 
SnapshotError};
 use consensus::{
-    Consensus, Pipeline, PipelineEntry, Plane, Project, Sequencer, Status, 
VsrConsensus,
+    Consensus, Pipeline, PipelineEntry, Plane, Project, Sequencer, 
VsrConsensus, ack_preflight,
+    ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit,
+    panic_if_hash_chain_would_break_in_same_view, pipeline_prepare_common, 
replicate_preflight,
+    send_prepare_ok as send_prepare_ok_common,
 };
 use iggy_common::{
-    header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader, 
ReplyHeader},
+    header::{Command2, GenericHeader, PrepareHeader},
     message::Message,
 };
 use journal::{Journal, JournalHandle};
@@ -111,7 +114,7 @@ where
         // TODO: Bunch of asserts.
         debug!("handling metadata request");
         let prepare = message.project(consensus);
-        self.pipeline_prepare(prepare).await;
+        pipeline_prepare_common(consensus, prepare, |prepare| 
self.on_replicate(prepare)).await;
     }
 
     async fn on_replicate(&self, message: <VsrConsensus<B, P> as 
Consensus>::ReplicateMessage) {
@@ -120,57 +123,35 @@ where
 
         let header = message.header();
 
-        assert_eq!(header.command, Command2::Prepare);
+        let current_op = match replicate_preflight(consensus, header) {
+            Ok(current_op) => current_op,
+            Err(reason) => {
+                warn!(
+                    replica = consensus.replica(),
+                    "on_replicate: ignoring ({reason})"
+                );
+                return;
+            }
+        };
 
-        if !self.fence_old_prepare(&message) {
+        // TODO: Handle idx calculation, for now using header.op, but since 
the journal may get compacted, this may not be correct.
+        let is_old_prepare = fence_old_prepare_by_commit(consensus, header)
+            || journal.handle().header(header.op as usize).is_some();
+        if !is_old_prepare {
             self.replicate(message.clone()).await;
         } else {
             warn!("received old prepare, not replicating");
         }
 
-        // If syncing, ignore the replicate message.
-        if consensus.is_syncing() {
-            warn!(
-                replica = consensus.replica(),
-                "on_replicate: ignoring (sync)"
-            );
-            return;
-        }
-
-        let current_op = consensus.sequencer().current_sequence();
-
-        // If status is not normal, ignore the replicate.
-        if consensus.status() != Status::Normal {
-            warn!(
-                replica = consensus.replica(),
-                "on_replicate: ignoring (not normal state)"
-            );
-            return;
-        }
-
-        //if message from future view, we ignore the replicate.
-        if header.view > consensus.view() {
-            warn!(
-                replica = consensus.replica(),
-                "on_replicate: ignoring (newer view)"
-            );
-            return;
-        }
-
         // TODO add assertions for valid state here.
 
-        // If we are a follower, we advance the commit number.
-        if consensus.is_follower() {
-            consensus.advance_commit_number(message.header().commit);
-        }
-
         // TODO verify that the current prepare fits in the WAL.
 
         // TODO handle gap in ops.
 
         // Verify hash chain integrity.
         if let Some(previous) = journal.handle().previous_header(header) {
-            self.panic_if_hash_chain_would_break_in_same_view(&previous, 
header);
+            panic_if_hash_chain_would_break_in_same_view(&previous, header);
         }
 
         assert_eq!(header.op, current_op + 1);
@@ -193,13 +174,8 @@ where
         let consensus = self.consensus.as_ref().unwrap();
         let header = message.header();
 
-        if !consensus.is_primary() {
-            warn!("on_ack: ignoring (not primary)");
-            return;
-        }
-
-        if consensus.status() != Status::Normal {
-            warn!("on_ack: ignoring (not normal)");
+        if let Err(reason) = ack_preflight(consensus) {
+            warn!("on_ack: ignoring ({reason})");
             return;
         }
 
@@ -214,12 +190,10 @@ where
             }
         }
 
-        // Let consensus handle the ack increment and quorum check
-        if consensus.handle_prepare_ok(header) {
+        if ack_quorum_reached(consensus, header) {
             let journal = self.journal.as_ref().unwrap();
 
             debug!("on_ack: quorum received for op={}", header.op);
-            consensus.advance_commit_number(header.op);
 
             // Extract the header from the pipeline, fetch the full message 
from journal
             // TODO: Commit from the head. ALWAYS
@@ -249,32 +223,8 @@ where
             let _result = self.mux_stm.update(prepare);
             debug!("on_ack: state applied for op={}", prepare_header.op);
 
-            // TODO: Figure out better infra for this, its messy.
-            let reply = 
Message::<ReplyHeader>::new(std::mem::size_of::<ReplyHeader>())
-                .transmute_header(|_, new| {
-                    *new = ReplyHeader {
-                        checksum: 0,
-                        checksum_body: 0,
-                        cluster: consensus.cluster(),
-                        size: std::mem::size_of::<ReplyHeader>() as u32,
-                        view: consensus.view(),
-                        release: 0,
-                        command: Command2::Reply,
-                        replica: consensus.replica(),
-                        reserved_frame: [0; 66],
-                        request_checksum: prepare_header.request_checksum,
-                        context: 0,
-                        op: prepare_header.op,
-                        commit: consensus.commit(),
-                        timestamp: prepare_header.timestamp,
-                        request: prepare_header.request,
-                        operation: prepare_header.operation,
-                        ..Default::default()
-                    };
-                });
-
             // Send reply to client
-            let generic_reply = reply.into_generic();
+            let generic_reply = build_reply_message(consensus, 
&prepare_header).into_generic();
             debug!(
                 "on_ack: sending reply to client={} for op={}",
                 prepare_header.client, prepare_header.op
@@ -302,30 +252,6 @@ where
         >,
     M: StateMachine<Input = Message<PrepareHeader>>,
 {
-    async fn pipeline_prepare(&self, prepare: Message<PrepareHeader>) {
-        let consensus = self.consensus.as_ref().unwrap();
-
-        debug!("inserting prepare into metadata pipeline");
-        consensus.verify_pipeline();
-        // Pipeline-first ordering is safe only because message
-        // processing is cooperative (single-task, RefCell-based).
-        // If on_replicate ever early-returns (syncing, status change)
-        // the entry would be in the pipeline without journal backing.
-        consensus.pipeline_message(prepare.clone());
-        self.on_replicate(prepare.clone()).await;
-
-        consensus.post_replicate_verify(&prepare);
-    }
-
-    fn fence_old_prepare(&self, prepare: &Message<PrepareHeader>) -> bool {
-        let consensus = self.consensus.as_ref().unwrap();
-        let journal = self.journal.as_ref().unwrap();
-
-        let header = prepare.header();
-        // TODO: Handle idx calculation, for now using header.op, but since 
the journal may get compacted, this may not be correct.
-        header.op <= consensus.commit() || journal.handle().header(header.op 
as usize).is_some()
-    }
-
     /// Replicate a prepare message to the next replica in the chain.
     ///
     /// Chain replication pattern:
@@ -376,22 +302,6 @@ where
             .unwrap();
     }
 
-    /// Verify hash chain would not break if we add this header.
-    fn panic_if_hash_chain_would_break_in_same_view(
-        &self,
-        previous: &PrepareHeader,
-        current: &PrepareHeader,
-    ) {
-        // If both headers are in the same view, parent must chain correctly
-        if previous.view == current.view {
-            assert_eq!(
-                current.parent, previous.checksum,
-                "hash chain broken in same view: op={} parent={} expected={}",
-                current.op, current.parent, previous.checksum
-            );
-        }
-    }
-
     // TODO: Implement jump_to_newer_op
     // fn jump_to_newer_op(&self, header: &PrepareHeader) {}
 
@@ -401,114 +311,10 @@ where
         // Apply each entry to the state machine
     }
 
-    /// Send a prepare_ok message to the primary.
-    /// Called after successfully writing a prepare to the journal.
     async fn send_prepare_ok(&self, header: &PrepareHeader) {
         let consensus = self.consensus.as_ref().unwrap();
         let journal = self.journal.as_ref().unwrap();
-
-        assert_eq!(header.command, Command2::Prepare);
-
-        if consensus.status() != Status::Normal {
-            debug!(
-                replica = consensus.replica(),
-                status = ?consensus.status(),
-                "send_prepare_ok: not sending (not normal)"
-            );
-            return;
-        }
-
-        if consensus.is_syncing() {
-            debug!(
-                replica = consensus.replica(),
-                "send_prepare_ok: not sending (syncing)"
-            );
-            return;
-        }
-
-        // Verify we have the prepare and it's persisted (not dirty).
-        if journal.handle().header(header.op as usize).is_none() {
-            debug!(
-                replica = consensus.replica(),
-                op = header.op,
-                "send_prepare_ok: not sending (not persisted or missing)"
-            );
-            return;
-        }
-
-        assert!(
-            header.view <= consensus.view(),
-            "send_prepare_ok: prepare view {} > our view {}",
-            header.view,
-            consensus.view()
-        );
-
-        if header.op > consensus.sequencer().current_sequence() {
-            debug!(
-                replica = consensus.replica(),
-                op = header.op,
-                our_op = consensus.sequencer().current_sequence(),
-                "send_prepare_ok: not sending (op ahead)"
-            );
-            return;
-        }
-
-        debug!(
-            replica = consensus.replica(),
-            op = header.op,
-            checksum = header.checksum,
-            "send_prepare_ok: sending"
-        );
-
-        // Use current view, not the prepare's view.
-        let prepare_ok_header = PrepareOkHeader {
-            command: Command2::PrepareOk,
-            cluster: consensus.cluster(),
-            replica: consensus.replica(),
-            view: consensus.view(),
-            op: header.op,
-            commit: consensus.commit(),
-            timestamp: header.timestamp,
-            parent: header.parent,
-            prepare_checksum: header.checksum,
-            request: header.request,
-            operation: header.operation,
-            namespace: header.namespace,
-            size: std::mem::size_of::<PrepareOkHeader>() as u32,
-            ..Default::default()
-        };
-
-        let message: Message<PrepareOkHeader> =
-            
Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>())
-                .transmute_header(|_, new| *new = prepare_ok_header);
-        let generic_message = message.into_generic();
-        let primary = consensus.primary_index(consensus.view());
-
-        if primary == consensus.replica() {
-            debug!(
-                replica = consensus.replica(),
-                "send_prepare_ok: loopback to self"
-            );
-            // TODO: Queue for self-processing or call handle_prepare_ok 
directly
-            // TODO: This is temporal, to test simulator, but we should send 
message to ourselves properly.
-            consensus
-                .message_bus()
-                .send_to_replica(primary, generic_message)
-                .await
-                .unwrap();
-        } else {
-            debug!(
-                replica = consensus.replica(),
-                to = primary,
-                op = header.op,
-                "send_prepare_ok: sending to primary"
-            );
-
-            consensus
-                .message_bus()
-                .send_to_replica(primary, generic_message)
-                .await
-                .unwrap();
-        }
+        let persisted = journal.handle().header(header.op as usize).is_some();
+        send_prepare_ok_common(consensus, header, Some(persisted)).await;
     }
 }
diff --git a/core/partitions/src/iggy_partitions.rs 
b/core/partitions/src/iggy_partitions.rs
index 305f2dfb2..0a3bb0394 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -20,11 +20,15 @@
 use crate::IggyPartition;
 use crate::Partition;
 use crate::types::PartitionsConfig;
-use consensus::{Consensus, Plane, Project, Sequencer, Status, VsrConsensus};
+use consensus::{
+    Consensus, PipelineEntry, Plane, Project, Sequencer, VsrConsensus, 
ack_preflight,
+    ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit, 
pipeline_prepare_common,
+    replicate_preflight, send_prepare_ok as send_prepare_ok_common,
+};
 use iggy_common::{
     INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, 
PartitionStats, PooledBuffer,
     Segment, SegmentStorage,
-    header::{Command2, GenericHeader, Operation, PrepareHeader, 
PrepareOkHeader, ReplyHeader},
+    header::{Command2, GenericHeader, Operation, PrepareHeader},
     message::Message,
     sharding::{IggyNamespace, LocalIdx, ShardId},
 };
@@ -334,7 +338,7 @@ where
 
         debug!("handling partition request");
         let prepare = message.project(consensus);
-        self.pipeline_prepare(prepare).await;
+        pipeline_prepare_common(consensus, prepare, |prepare| 
self.on_replicate(prepare)).await;
     }
 
     async fn on_replicate(&self, message: <VsrConsensus<B> as 
Consensus>::ReplicateMessage) {
@@ -342,48 +346,24 @@ where
 
         let header = message.header();
 
-        assert_eq!(header.command, Command2::Prepare);
+        let current_op = match replicate_preflight(consensus, header) {
+            Ok(current_op) => current_op,
+            Err(reason) => {
+                warn!(
+                    replica = consensus.replica(),
+                    "on_replicate: ignoring ({reason})"
+                );
+                return;
+            }
+        };
 
-        if !self.fence_old_prepare(&message) {
+        let is_old_prepare = fence_old_prepare_by_commit(consensus, header);
+        if !is_old_prepare {
             self.replicate(message.clone()).await;
         } else {
             warn!("received old prepare, not replicating");
         }
 
-        // If syncing, ignore the replicate message.
-        if consensus.is_syncing() {
-            warn!(
-                replica = consensus.replica(),
-                "on_replicate: ignoring (sync)"
-            );
-            return;
-        }
-
-        let current_op = consensus.sequencer().current_sequence();
-
-        // If status is not normal, ignore the replicate.
-        if consensus.status() != Status::Normal {
-            warn!(
-                replica = consensus.replica(),
-                "on_replicate: ignoring (not normal state)"
-            );
-            return;
-        }
-
-        // If message from future view, we ignore the replicate.
-        if header.view > consensus.view() {
-            warn!(
-                replica = consensus.replica(),
-                "on_replicate: ignoring (newer view)"
-            );
-            return;
-        }
-
-        // If we are a follower, we advance the commit number.
-        if consensus.is_follower() {
-            consensus.advance_commit_number(message.header().commit);
-        }
-
         // TODO: Make those assertions be toggleable through an feature flag, 
so they can be used only by simulator/tests.
         debug_assert_eq!(header.op, current_op + 1);
         consensus.sequencer().set_sequence(header.op);
@@ -407,17 +387,11 @@ where
         let consensus = self.consensus.as_ref().unwrap();
         let header = message.header();
 
-        if !consensus.is_primary() {
-            warn!("on_ack: ignoring (not primary)");
-            return;
-        }
-
-        if consensus.status() != Status::Normal {
-            warn!("on_ack: ignoring (not normal)");
+        if let Err(reason) = ack_preflight(consensus) {
+            warn!("on_ack: ignoring ({reason})");
             return;
         }
 
-        // Verify checksum by checking pipeline entry exists
         {
             let pipeline = consensus.pipeline().borrow();
             let Some(entry) =
@@ -433,21 +407,21 @@ where
             }
         }
 
-        // Let consensus handle the ack increment and quorum check
-        if consensus.handle_prepare_ok(header) {
+        if ack_quorum_reached(consensus, header) {
             debug!("on_ack: quorum received for op={}", header.op);
-            consensus.advance_commit_number(header.op);
 
             // Extract the prepare message from the pipeline by op
             // TODO: Commit from the head. ALWAYS
             let entry = 
consensus.pipeline().borrow_mut().extract_by_op(header.op);
-            let Some(entry) = entry else {
+            let Some(PipelineEntry {
+                header: prepare_header,
+                ..
+            }) = entry
+            else {
                 warn!("on_ack: prepare not found in pipeline for op={}", 
header.op);
                 return;
             };
 
-            let prepare_header = entry.header;
-
             // Data was already appended to the partition journal during
             // on_replicate. Now that quorum is reached, update the partition's
             // current offset and check whether the journal needs flushing.
@@ -473,32 +447,8 @@ where
                 }
             }
 
-            // TODO: Figure out better infra for this, its messy.
-            let reply = 
Message::<ReplyHeader>::new(std::mem::size_of::<ReplyHeader>())
-                .transmute_header(|_, new| {
-                    *new = ReplyHeader {
-                        checksum: 0,
-                        checksum_body: 0,
-                        cluster: consensus.cluster(),
-                        size: std::mem::size_of::<ReplyHeader>() as u32,
-                        view: consensus.view(),
-                        release: 0,
-                        command: Command2::Reply,
-                        replica: consensus.replica(),
-                        reserved_frame: [0; 66],
-                        request_checksum: prepare_header.request_checksum,
-                        context: 0,
-                        op: prepare_header.op,
-                        commit: consensus.commit(),
-                        timestamp: prepare_header.timestamp,
-                        request: prepare_header.request,
-                        operation: prepare_header.operation,
-                        ..Default::default()
-                    };
-                });
-
             // Send reply to client
-            let generic_reply = reply.into_generic();
+            let generic_reply = build_reply_message(consensus, 
&prepare_header).into_generic();
             debug!(
                 "on_ack: sending reply to client={} for op={}",
                 prepare_header.client, prepare_header.op
@@ -600,26 +550,6 @@ where
         let _ = partition.append_messages(batch).await;
     }
 
-    async fn pipeline_prepare(&self, prepare: Message<PrepareHeader>) {
-        let consensus = self.consensus.as_ref().unwrap();
-
-        debug!("inserting prepare into partition pipeline");
-        consensus.verify_pipeline();
-        consensus.pipeline_message(prepare.clone());
-
-        self.on_replicate(prepare.clone()).await;
-        consensus.post_replicate_verify(&prepare);
-    }
-
-    fn fence_old_prepare(&self, prepare: &Message<PrepareHeader>) -> bool {
-        let consensus = self.consensus.as_ref().unwrap();
-
-        let header = prepare.header();
-        // TODO: Check per-partition journal once namespace extraction is 
possible.
-        // For now, only check if the op is already committed.
-        header.op <= consensus.commit()
-    }
-
     /// Replicate a prepare message to the next replica in the chain.
     ///
     /// Chain replication pattern:
@@ -663,22 +593,6 @@ where
             .unwrap();
     }
 
-    /// Verify hash chain would not break if we add this header.
-    fn panic_if_hash_chain_would_break_in_same_view(
-        &self,
-        previous: &PrepareHeader,
-        current: &PrepareHeader,
-    ) {
-        // If both headers are in the same view, parent must chain correctly
-        if previous.view == current.view {
-            assert_eq!(
-                current.parent, previous.checksum,
-                "hash chain broken in same view: op={} parent={} expected={}",
-                current.op, current.parent, previous.checksum
-            );
-        }
-    }
-
     fn commit_journal(&self) {
         // TODO: Implement commit logic for followers.
         // Walk through journal from last committed to current commit number
@@ -904,107 +818,11 @@ where
         debug!(?namespace, start_offset, "rotated to new segment");
     }
 
-    /// Send a prepare_ok message to the primary.
-    /// Called after successfully writing a prepare to the journal.
     async fn send_prepare_ok(&self, header: &PrepareHeader) {
         let consensus = self.consensus.as_ref().unwrap();
-
-        assert_eq!(header.command, Command2::Prepare);
-
-        if consensus.status() != Status::Normal {
-            debug!(
-                replica = consensus.replica(),
-                status = ?consensus.status(),
-                "send_prepare_ok: not sending (not normal)"
-            );
-            return;
-        }
-
-        if consensus.is_syncing() {
-            debug!(
-                replica = consensus.replica(),
-                "send_prepare_ok: not sending (syncing)"
-            );
-            return;
-        }
-
         // TODO: Verify the prepare is persisted in the partition journal.
         // The partition journal uses MessageLookup headers, so we cannot
         // check by PrepareHeader.op directly. For now, skip this check.
-
-        assert!(
-            header.view <= consensus.view(),
-            "send_prepare_ok: prepare view {} > our view {}",
-            header.view,
-            consensus.view()
-        );
-
-        if header.op > consensus.sequencer().current_sequence() {
-            debug!(
-                replica = consensus.replica(),
-                op = header.op,
-                our_op = consensus.sequencer().current_sequence(),
-                "send_prepare_ok: not sending (op ahead)"
-            );
-            return;
-        }
-
-        debug!(
-            replica = consensus.replica(),
-            op = header.op,
-            checksum = header.checksum,
-            "send_prepare_ok: sending"
-        );
-
-        // Use current view, not the prepare's view.
-        let prepare_ok_header = PrepareOkHeader {
-            command: Command2::PrepareOk,
-            cluster: consensus.cluster(),
-            replica: consensus.replica(),
-            view: consensus.view(),
-            op: header.op,
-            commit: consensus.commit(),
-            timestamp: header.timestamp,
-            parent: header.parent,
-            prepare_checksum: header.checksum,
-            request: header.request,
-            operation: header.operation,
-            namespace: header.namespace,
-            size: std::mem::size_of::<PrepareOkHeader>() as u32,
-            ..Default::default()
-        };
-
-        let message: Message<PrepareOkHeader> =
-            
Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>())
-                .transmute_header(|_, new| *new = prepare_ok_header);
-        let generic_message = message.into_generic();
-        let primary = consensus.primary_index(consensus.view());
-
-        if primary == consensus.replica() {
-            debug!(
-                replica = consensus.replica(),
-                "send_prepare_ok: loopback to self"
-            );
-            // TODO: Queue for self-processing or call handle_prepare_ok 
directly
-            // TODO: This is temporal, to test simulator, but we should send 
message to ourselves properly.
-            consensus
-                .message_bus()
-                .send_to_replica(primary, generic_message)
-                .await
-                .unwrap();
-        } else {
-            debug!(
-                replica = consensus.replica(),
-                to = primary,
-                op = header.op,
-                "send_prepare_ok: sending to primary"
-            );
-
-            consensus
-                .message_bus()
-                .send_to_replica(primary, generic_message)
-                .await
-                .unwrap();
-        }
+        send_prepare_ok_common(consensus, header, None).await;
     }
 }


Reply via email to