This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch plane_refactor in repository https://gitbox.apache.org/repos/asf/iggy.git
commit eb9ac2b7283bd4f17b7de2e5aefae41cedb0ec54 Author: numinex <[email protected]> AuthorDate: Mon Feb 16 18:36:45 2026 +0100 n --- core/consensus/src/impls.rs | 4 + core/consensus/src/lib.rs | 3 + core/consensus/src/plane_helpers.rs | 242 ++++++++++++++++++++++++++++++++ core/metadata/src/impls/metadata.rs | 248 ++++----------------------------- core/partitions/src/iggy_partitions.rs | 240 ++++--------------------------- 5 files changed, 305 insertions(+), 432 deletions(-) diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs index 3fd146cd4..4b1e620c8 100644 --- a/core/consensus/src/impls.rs +++ b/core/consensus/src/impls.rs @@ -1213,6 +1213,10 @@ where !self.is_primary() } + fn is_normal(&self) -> bool { + self.status() == Status::Normal + } + fn is_syncing(&self) -> bool { self.is_syncing() } diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs index 9a7fd5bae..17f2fb9ea 100644 --- a/core/consensus/src/lib.rs +++ b/core/consensus/src/lib.rs @@ -63,6 +63,7 @@ pub trait Consensus: Sized { fn post_replicate_verify(&self, message: &Self::ReplicateMessage); fn is_follower(&self) -> bool; + fn is_normal(&self) -> bool; fn is_syncing(&self) -> bool; } @@ -83,6 +84,8 @@ where mod impls; pub use impls::*; +mod plane_helpers; +pub use plane_helpers::*; mod view_change_quorum; pub use view_change_quorum::*; diff --git a/core/consensus/src/plane_helpers.rs b/core/consensus/src/plane_helpers.rs new file mode 100644 index 000000000..d64af8fee --- /dev/null +++ b/core/consensus/src/plane_helpers.rs @@ -0,0 +1,242 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{Consensus, Pipeline, PipelineEntry, Sequencer, Status, VsrConsensus}; +use iggy_common::header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader, ReplyHeader}; +use iggy_common::message::Message; +use message_bus::MessageBus; +use std::ops::AsyncFnOnce; + +// TODO: Rework all of those helpers, once the boundaries are more clear and we have a better picture of the commonalities between all of the planes. + +/// Shared pipeline-first request flow used by metadata and partitions. +pub async fn pipeline_prepare_common<C, F>( + consensus: &C, + prepare: C::ReplicateMessage, + on_replicate: F, +) where + C: Consensus, + C::ReplicateMessage: Clone, + F: AsyncFnOnce(C::ReplicateMessage) -> (), +{ + assert!(!consensus.is_follower(), "on_request: primary only"); + assert!(consensus.is_normal(), "on_request: status must be normal"); + assert!(!consensus.is_syncing(), "on_request: must not be syncing"); + + consensus.verify_pipeline(); + consensus.pipeline_message(prepare.clone()); + on_replicate(prepare.clone()).await; + consensus.post_replicate_verify(&prepare); +} + +/// Shared commit-based old-prepare fence. +pub fn fence_old_prepare_by_commit<B, P>( + consensus: &VsrConsensus<B, P>, + header: &PrepareHeader, +) -> bool +where + B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, + P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>, +{ + header.op <= consensus.commit() +} + +/// Shared preflight checks for `on_replicate`. +/// +/// Returns current op on success. 
+pub fn replicate_preflight<B, P>( + consensus: &VsrConsensus<B, P>, + header: &PrepareHeader, +) -> Result<u64, &'static str> +where + B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, + P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>, +{ + assert_eq!(header.command, Command2::Prepare); + + if consensus.is_syncing() { + return Err("sync"); + } + + let current_op = consensus.sequencer().current_sequence(); + + if consensus.status() != Status::Normal { + return Err("not normal state"); + } + + if header.view > consensus.view() { + return Err("newer view"); + } + + if consensus.is_follower() { + consensus.advance_commit_number(header.commit); + } + + Ok(current_op) +} + +/// Shared preflight checks for `on_ack`. +pub fn ack_preflight<B, P>(consensus: &VsrConsensus<B, P>) -> Result<(), &'static str> +where + B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, + P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>, +{ + if !consensus.is_primary() { + return Err("not primary"); + } + + if consensus.status() != Status::Normal { + return Err("not normal"); + } + + Ok(()) +} + +/// Shared quorum + extraction flow for ack handling. +pub fn ack_quorum_reached<B, P>(consensus: &VsrConsensus<B, P>, ack: &PrepareOkHeader) -> bool +where + B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, + P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>, +{ + if !consensus.handle_prepare_ok(ack) { + return false; + } + + consensus.advance_commit_number(ack.op); + true +} + +/// Shared reply-message construction for committed prepare. 
+pub fn build_reply_message<B, P>( + consensus: &VsrConsensus<B, P>, + prepare_header: &PrepareHeader, +) -> Message<ReplyHeader> +where + B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, + P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>, +{ + Message::<ReplyHeader>::new(std::mem::size_of::<ReplyHeader>()).transmute_header(|_, new| { + *new = ReplyHeader { + checksum: 0, + checksum_body: 0, + cluster: consensus.cluster(), + size: std::mem::size_of::<ReplyHeader>() as u32, + view: consensus.view(), + release: 0, + command: Command2::Reply, + replica: consensus.replica(), + reserved_frame: [0; 66], + request_checksum: prepare_header.request_checksum, + context: 0, + op: prepare_header.op, + commit: consensus.commit(), + timestamp: prepare_header.timestamp, + request: prepare_header.request, + operation: prepare_header.operation, + ..Default::default() + }; + }) +} + +/// Verify hash chain would not break if we add this header. +pub fn panic_if_hash_chain_would_break_in_same_view( + previous: &PrepareHeader, + current: &PrepareHeader, +) { + // If both headers are in the same view, parent must chain correctly. + if previous.view == current.view { + assert_eq!( + current.parent, previous.checksum, + "hash chain broken in same view: op={} parent={} expected={}", + current.op, current.parent, previous.checksum + ); + } +} + +// TODO: Figure out how to make this check the journal if it contains the prepare. 
+pub async fn send_prepare_ok<B, P>( + consensus: &VsrConsensus<B, P>, + header: &PrepareHeader, + is_persisted: Option<bool>, +) where + B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, + P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>, +{ + assert_eq!(header.command, Command2::Prepare); + + if consensus.status() != Status::Normal { + return; + } + + if consensus.is_syncing() { + return; + } + + if let Some(false) = is_persisted { + return; + } + + assert!( + header.view <= consensus.view(), + "send_prepare_ok: prepare view {} > our view {}", + header.view, + consensus.view() + ); + + if header.op > consensus.sequencer().current_sequence() { + return; + } + + let prepare_ok_header = PrepareOkHeader { + command: Command2::PrepareOk, + cluster: consensus.cluster(), + replica: consensus.replica(), + view: consensus.view(), + op: header.op, + commit: consensus.commit(), + timestamp: header.timestamp, + parent: header.parent, + prepare_checksum: header.checksum, + request: header.request, + operation: header.operation, + namespace: header.namespace, + size: std::mem::size_of::<PrepareOkHeader>() as u32, + ..Default::default() + }; + + let message: Message<PrepareOkHeader> = + Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>()) + .transmute_header(|_, new| *new = prepare_ok_header); + let generic_message = message.into_generic(); + let primary = consensus.primary_index(consensus.view()); + + if primary == consensus.replica() { + // TODO: Queue for self-processing or call handle_prepare_ok directly. + // TODO: This is temporary, to test the simulator, but we should send the message to ourselves properly. 
+ consensus + .message_bus() + .send_to_replica(primary, generic_message) + .await + .unwrap(); + } else { + consensus + .message_bus() + .send_to_replica(primary, generic_message) + .await + .unwrap(); + } +} diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs index c4a55fc54..9d1a38799 100644 --- a/core/metadata/src/impls/metadata.rs +++ b/core/metadata/src/impls/metadata.rs @@ -17,10 +17,13 @@ use crate::stm::StateMachine; use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot, Snapshot, SnapshotError}; use consensus::{ - Consensus, Pipeline, PipelineEntry, Plane, Project, Sequencer, Status, VsrConsensus, + Consensus, Pipeline, PipelineEntry, Plane, Project, Sequencer, VsrConsensus, ack_preflight, + ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit, + panic_if_hash_chain_would_break_in_same_view, pipeline_prepare_common, replicate_preflight, + send_prepare_ok as send_prepare_ok_common, }; use iggy_common::{ - header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader, ReplyHeader}, + header::{Command2, GenericHeader, PrepareHeader}, message::Message, }; use journal::{Journal, JournalHandle}; @@ -111,7 +114,7 @@ where // TODO: Bunch of asserts. 
debug!("handling metadata request"); let prepare = message.project(consensus); - self.pipeline_prepare(prepare).await; + pipeline_prepare_common(consensus, prepare, |prepare| self.on_replicate(prepare)).await; } async fn on_replicate(&self, message: <VsrConsensus<B, P> as Consensus>::ReplicateMessage) { @@ -120,57 +123,35 @@ where let header = message.header(); - assert_eq!(header.command, Command2::Prepare); + let current_op = match replicate_preflight(consensus, header) { + Ok(current_op) => current_op, + Err(reason) => { + warn!( + replica = consensus.replica(), + "on_replicate: ignoring ({reason})" + ); + return; + } + }; - if !self.fence_old_prepare(&message) { + // TODO: Handle idx calculation, for now using header.op, but since the journal may get compacted, this may not be correct. + let is_old_prepare = fence_old_prepare_by_commit(consensus, header) + || journal.handle().header(header.op as usize).is_some(); + if !is_old_prepare { self.replicate(message.clone()).await; } else { warn!("received old prepare, not replicating"); } - // If syncing, ignore the replicate message. - if consensus.is_syncing() { - warn!( - replica = consensus.replica(), - "on_replicate: ignoring (sync)" - ); - return; - } - - let current_op = consensus.sequencer().current_sequence(); - - // If status is not normal, ignore the replicate. - if consensus.status() != Status::Normal { - warn!( - replica = consensus.replica(), - "on_replicate: ignoring (not normal state)" - ); - return; - } - - //if message from future view, we ignore the replicate. - if header.view > consensus.view() { - warn!( - replica = consensus.replica(), - "on_replicate: ignoring (newer view)" - ); - return; - } - // TODO add assertions for valid state here. - // If we are a follower, we advance the commit number. - if consensus.is_follower() { - consensus.advance_commit_number(message.header().commit); - } - // TODO verify that the current prepare fits in the WAL. // TODO handle gap in ops. 
// Verify hash chain integrity. if let Some(previous) = journal.handle().previous_header(header) { - self.panic_if_hash_chain_would_break_in_same_view(&previous, header); + panic_if_hash_chain_would_break_in_same_view(&previous, header); } assert_eq!(header.op, current_op + 1); @@ -193,13 +174,8 @@ where let consensus = self.consensus.as_ref().unwrap(); let header = message.header(); - if !consensus.is_primary() { - warn!("on_ack: ignoring (not primary)"); - return; - } - - if consensus.status() != Status::Normal { - warn!("on_ack: ignoring (not normal)"); + if let Err(reason) = ack_preflight(consensus) { + warn!("on_ack: ignoring ({reason})"); return; } @@ -214,12 +190,10 @@ where } } - // Let consensus handle the ack increment and quorum check - if consensus.handle_prepare_ok(header) { + if ack_quorum_reached(consensus, header) { let journal = self.journal.as_ref().unwrap(); debug!("on_ack: quorum received for op={}", header.op); - consensus.advance_commit_number(header.op); // Extract the header from the pipeline, fetch the full message from journal // TODO: Commit from the head. ALWAYS @@ -249,32 +223,8 @@ where let _result = self.mux_stm.update(prepare); debug!("on_ack: state applied for op={}", prepare_header.op); - // TODO: Figure out better infra for this, its messy. 
- let reply = Message::<ReplyHeader>::new(std::mem::size_of::<ReplyHeader>()) - .transmute_header(|_, new| { - *new = ReplyHeader { - checksum: 0, - checksum_body: 0, - cluster: consensus.cluster(), - size: std::mem::size_of::<ReplyHeader>() as u32, - view: consensus.view(), - release: 0, - command: Command2::Reply, - replica: consensus.replica(), - reserved_frame: [0; 66], - request_checksum: prepare_header.request_checksum, - context: 0, - op: prepare_header.op, - commit: consensus.commit(), - timestamp: prepare_header.timestamp, - request: prepare_header.request, - operation: prepare_header.operation, - ..Default::default() - }; - }); - // Send reply to client - let generic_reply = reply.into_generic(); + let generic_reply = build_reply_message(consensus, &prepare_header).into_generic(); debug!( "on_ack: sending reply to client={} for op={}", prepare_header.client, prepare_header.op @@ -302,30 +252,6 @@ where >, M: StateMachine<Input = Message<PrepareHeader>>, { - async fn pipeline_prepare(&self, prepare: Message<PrepareHeader>) { - let consensus = self.consensus.as_ref().unwrap(); - - debug!("inserting prepare into metadata pipeline"); - consensus.verify_pipeline(); - // Pipeline-first ordering is safe only because message - // processing is cooperative (single-task, RefCell-based). - // If on_replicate ever early-returns (syncing, status change) - // the entry would be in the pipeline without journal backing. - consensus.pipeline_message(prepare.clone()); - self.on_replicate(prepare.clone()).await; - - consensus.post_replicate_verify(&prepare); - } - - fn fence_old_prepare(&self, prepare: &Message<PrepareHeader>) -> bool { - let consensus = self.consensus.as_ref().unwrap(); - let journal = self.journal.as_ref().unwrap(); - - let header = prepare.header(); - // TODO: Handle idx calculation, for now using header.op, but since the journal may get compacted, this may not be correct. 
- header.op <= consensus.commit() || journal.handle().header(header.op as usize).is_some() - } - /// Replicate a prepare message to the next replica in the chain. /// /// Chain replication pattern: @@ -376,22 +302,6 @@ where .unwrap(); } - /// Verify hash chain would not break if we add this header. - fn panic_if_hash_chain_would_break_in_same_view( - &self, - previous: &PrepareHeader, - current: &PrepareHeader, - ) { - // If both headers are in the same view, parent must chain correctly - if previous.view == current.view { - assert_eq!( - current.parent, previous.checksum, - "hash chain broken in same view: op={} parent={} expected={}", - current.op, current.parent, previous.checksum - ); - } - } - // TODO: Implement jump_to_newer_op // fn jump_to_newer_op(&self, header: &PrepareHeader) {} @@ -401,114 +311,10 @@ where // Apply each entry to the state machine } - /// Send a prepare_ok message to the primary. - /// Called after successfully writing a prepare to the journal. async fn send_prepare_ok(&self, header: &PrepareHeader) { let consensus = self.consensus.as_ref().unwrap(); let journal = self.journal.as_ref().unwrap(); - - assert_eq!(header.command, Command2::Prepare); - - if consensus.status() != Status::Normal { - debug!( - replica = consensus.replica(), - status = ?consensus.status(), - "send_prepare_ok: not sending (not normal)" - ); - return; - } - - if consensus.is_syncing() { - debug!( - replica = consensus.replica(), - "send_prepare_ok: not sending (syncing)" - ); - return; - } - - // Verify we have the prepare and it's persisted (not dirty). 
- if journal.handle().header(header.op as usize).is_none() { - debug!( - replica = consensus.replica(), - op = header.op, - "send_prepare_ok: not sending (not persisted or missing)" - ); - return; - } - - assert!( - header.view <= consensus.view(), - "send_prepare_ok: prepare view {} > our view {}", - header.view, - consensus.view() - ); - - if header.op > consensus.sequencer().current_sequence() { - debug!( - replica = consensus.replica(), - op = header.op, - our_op = consensus.sequencer().current_sequence(), - "send_prepare_ok: not sending (op ahead)" - ); - return; - } - - debug!( - replica = consensus.replica(), - op = header.op, - checksum = header.checksum, - "send_prepare_ok: sending" - ); - - // Use current view, not the prepare's view. - let prepare_ok_header = PrepareOkHeader { - command: Command2::PrepareOk, - cluster: consensus.cluster(), - replica: consensus.replica(), - view: consensus.view(), - op: header.op, - commit: consensus.commit(), - timestamp: header.timestamp, - parent: header.parent, - prepare_checksum: header.checksum, - request: header.request, - operation: header.operation, - namespace: header.namespace, - size: std::mem::size_of::<PrepareOkHeader>() as u32, - ..Default::default() - }; - - let message: Message<PrepareOkHeader> = - Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>()) - .transmute_header(|_, new| *new = prepare_ok_header); - let generic_message = message.into_generic(); - let primary = consensus.primary_index(consensus.view()); - - if primary == consensus.replica() { - debug!( - replica = consensus.replica(), - "send_prepare_ok: loopback to self" - ); - // TODO: Queue for self-processing or call handle_prepare_ok directly - // TODO: This is temporal, to test simulator, but we should send message to ourselves properly. 
- consensus - .message_bus() - .send_to_replica(primary, generic_message) - .await - .unwrap(); - } else { - debug!( - replica = consensus.replica(), - to = primary, - op = header.op, - "send_prepare_ok: sending to primary" - ); - - consensus - .message_bus() - .send_to_replica(primary, generic_message) - .await - .unwrap(); - } + let persisted = journal.handle().header(header.op as usize).is_some(); + send_prepare_ok_common(consensus, header, Some(persisted)).await; } } diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs index 305f2dfb2..0a3bb0394 100644 --- a/core/partitions/src/iggy_partitions.rs +++ b/core/partitions/src/iggy_partitions.rs @@ -20,11 +20,15 @@ use crate::IggyPartition; use crate::Partition; use crate::types::PartitionsConfig; -use consensus::{Consensus, Plane, Project, Sequencer, Status, VsrConsensus}; +use consensus::{ + Consensus, PipelineEntry, Plane, Project, Sequencer, VsrConsensus, ack_preflight, + ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit, pipeline_prepare_common, + replicate_preflight, send_prepare_ok as send_prepare_ok_common, +}; use iggy_common::{ INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, PartitionStats, PooledBuffer, Segment, SegmentStorage, - header::{Command2, GenericHeader, Operation, PrepareHeader, PrepareOkHeader, ReplyHeader}, + header::{Command2, GenericHeader, Operation, PrepareHeader}, message::Message, sharding::{IggyNamespace, LocalIdx, ShardId}, }; @@ -334,7 +338,7 @@ where debug!("handling partition request"); let prepare = message.project(consensus); - self.pipeline_prepare(prepare).await; + pipeline_prepare_common(consensus, prepare, |prepare| self.on_replicate(prepare)).await; } async fn on_replicate(&self, message: <VsrConsensus<B> as Consensus>::ReplicateMessage) { @@ -342,48 +346,24 @@ where let header = message.header(); - assert_eq!(header.command, Command2::Prepare); + let current_op = match replicate_preflight(consensus, 
header) { + Ok(current_op) => current_op, + Err(reason) => { + warn!( + replica = consensus.replica(), + "on_replicate: ignoring ({reason})" + ); + return; + } + }; - if !self.fence_old_prepare(&message) { + let is_old_prepare = fence_old_prepare_by_commit(consensus, header); + if !is_old_prepare { self.replicate(message.clone()).await; } else { warn!("received old prepare, not replicating"); } - // If syncing, ignore the replicate message. - if consensus.is_syncing() { - warn!( - replica = consensus.replica(), - "on_replicate: ignoring (sync)" - ); - return; - } - - let current_op = consensus.sequencer().current_sequence(); - - // If status is not normal, ignore the replicate. - if consensus.status() != Status::Normal { - warn!( - replica = consensus.replica(), - "on_replicate: ignoring (not normal state)" - ); - return; - } - - // If message from future view, we ignore the replicate. - if header.view > consensus.view() { - warn!( - replica = consensus.replica(), - "on_replicate: ignoring (newer view)" - ); - return; - } - - // If we are a follower, we advance the commit number. - if consensus.is_follower() { - consensus.advance_commit_number(message.header().commit); - } - // TODO: Make those assertions be toggleable through an feature flag, so they can be used only by simulator/tests. 
debug_assert_eq!(header.op, current_op + 1); consensus.sequencer().set_sequence(header.op); @@ -407,17 +387,11 @@ where let consensus = self.consensus.as_ref().unwrap(); let header = message.header(); - if !consensus.is_primary() { - warn!("on_ack: ignoring (not primary)"); - return; - } - - if consensus.status() != Status::Normal { - warn!("on_ack: ignoring (not normal)"); + if let Err(reason) = ack_preflight(consensus) { + warn!("on_ack: ignoring ({reason})"); return; } - // Verify checksum by checking pipeline entry exists { let pipeline = consensus.pipeline().borrow(); let Some(entry) = @@ -433,21 +407,21 @@ where } } - // Let consensus handle the ack increment and quorum check - if consensus.handle_prepare_ok(header) { + if ack_quorum_reached(consensus, header) { debug!("on_ack: quorum received for op={}", header.op); - consensus.advance_commit_number(header.op); // Extract the prepare message from the pipeline by op // TODO: Commit from the head. ALWAYS let entry = consensus.pipeline().borrow_mut().extract_by_op(header.op); - let Some(entry) = entry else { + let Some(PipelineEntry { + header: prepare_header, + .. + }) = entry + else { warn!("on_ack: prepare not found in pipeline for op={}", header.op); return; }; - let prepare_header = entry.header; - // Data was already appended to the partition journal during // on_replicate. Now that quorum is reached, update the partition's // current offset and check whether the journal needs flushing. @@ -473,32 +447,8 @@ where } } - // TODO: Figure out better infra for this, its messy. 
- let reply = Message::<ReplyHeader>::new(std::mem::size_of::<ReplyHeader>()) - .transmute_header(|_, new| { - *new = ReplyHeader { - checksum: 0, - checksum_body: 0, - cluster: consensus.cluster(), - size: std::mem::size_of::<ReplyHeader>() as u32, - view: consensus.view(), - release: 0, - command: Command2::Reply, - replica: consensus.replica(), - reserved_frame: [0; 66], - request_checksum: prepare_header.request_checksum, - context: 0, - op: prepare_header.op, - commit: consensus.commit(), - timestamp: prepare_header.timestamp, - request: prepare_header.request, - operation: prepare_header.operation, - ..Default::default() - }; - }); - // Send reply to client - let generic_reply = reply.into_generic(); + let generic_reply = build_reply_message(consensus, &prepare_header).into_generic(); debug!( "on_ack: sending reply to client={} for op={}", prepare_header.client, prepare_header.op @@ -600,26 +550,6 @@ where let _ = partition.append_messages(batch).await; } - async fn pipeline_prepare(&self, prepare: Message<PrepareHeader>) { - let consensus = self.consensus.as_ref().unwrap(); - - debug!("inserting prepare into partition pipeline"); - consensus.verify_pipeline(); - consensus.pipeline_message(prepare.clone()); - - self.on_replicate(prepare.clone()).await; - consensus.post_replicate_verify(&prepare); - } - - fn fence_old_prepare(&self, prepare: &Message<PrepareHeader>) -> bool { - let consensus = self.consensus.as_ref().unwrap(); - - let header = prepare.header(); - // TODO: Check per-partition journal once namespace extraction is possible. - // For now, only check if the op is already committed. - header.op <= consensus.commit() - } - /// Replicate a prepare message to the next replica in the chain. /// /// Chain replication pattern: @@ -663,22 +593,6 @@ where .unwrap(); } - /// Verify hash chain would not break if we add this header. 
- fn panic_if_hash_chain_would_break_in_same_view( - &self, - previous: &PrepareHeader, - current: &PrepareHeader, - ) { - // If both headers are in the same view, parent must chain correctly - if previous.view == current.view { - assert_eq!( - current.parent, previous.checksum, - "hash chain broken in same view: op={} parent={} expected={}", - current.op, current.parent, previous.checksum - ); - } - } - fn commit_journal(&self) { // TODO: Implement commit logic for followers. // Walk through journal from last committed to current commit number @@ -904,107 +818,11 @@ where debug!(?namespace, start_offset, "rotated to new segment"); } - /// Send a prepare_ok message to the primary. - /// Called after successfully writing a prepare to the journal. async fn send_prepare_ok(&self, header: &PrepareHeader) { let consensus = self.consensus.as_ref().unwrap(); - - assert_eq!(header.command, Command2::Prepare); - - if consensus.status() != Status::Normal { - debug!( - replica = consensus.replica(), - status = ?consensus.status(), - "send_prepare_ok: not sending (not normal)" - ); - return; - } - - if consensus.is_syncing() { - debug!( - replica = consensus.replica(), - "send_prepare_ok: not sending (syncing)" - ); - return; - } - // TODO: Verify the prepare is persisted in the partition journal. // The partition journal uses MessageLookup headers, so we cannot // check by PrepareHeader.op directly. For now, skip this check. - - assert!( - header.view <= consensus.view(), - "send_prepare_ok: prepare view {} > our view {}", - header.view, - consensus.view() - ); - - if header.op > consensus.sequencer().current_sequence() { - debug!( - replica = consensus.replica(), - op = header.op, - our_op = consensus.sequencer().current_sequence(), - "send_prepare_ok: not sending (op ahead)" - ); - return; - } - - debug!( - replica = consensus.replica(), - op = header.op, - checksum = header.checksum, - "send_prepare_ok: sending" - ); - - // Use current view, not the prepare's view. 
- let prepare_ok_header = PrepareOkHeader { - command: Command2::PrepareOk, - cluster: consensus.cluster(), - replica: consensus.replica(), - view: consensus.view(), - op: header.op, - commit: consensus.commit(), - timestamp: header.timestamp, - parent: header.parent, - prepare_checksum: header.checksum, - request: header.request, - operation: header.operation, - namespace: header.namespace, - size: std::mem::size_of::<PrepareOkHeader>() as u32, - ..Default::default() - }; - - let message: Message<PrepareOkHeader> = - Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>()) - .transmute_header(|_, new| *new = prepare_ok_header); - let generic_message = message.into_generic(); - let primary = consensus.primary_index(consensus.view()); - - if primary == consensus.replica() { - debug!( - replica = consensus.replica(), - "send_prepare_ok: loopback to self" - ); - // TODO: Queue for self-processing or call handle_prepare_ok directly - // TODO: This is temporal, to test simulator, but we should send message to ourselves properly. - consensus - .message_bus() - .send_to_replica(primary, generic_message) - .await - .unwrap(); - } else { - debug!( - replica = consensus.replica(), - to = primary, - op = header.op, - "send_prepare_ok: sending to primary" - ); - - consensus - .message_bus() - .send_to_replica(primary, generic_message) - .await - .unwrap(); - } + send_prepare_ok_common(consensus, header, None).await; } }
