This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 51cb96951 refactor(partitions): add plane trait and refactor
partitions module (#2744)
51cb96951 is described below
commit 51cb96951f87991c1a652ffdaddfb41bc2f39301
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Mon Feb 16 19:31:38 2026 +0100
refactor(partitions): add plane trait and refactor partitions module (#2744)
Adds a `Plane` trait as a replacement for the `Metadata` and `Partitions`
traits, refactoring the logic by moving the shared parts into the
`plane_helpers` module. Refactors the `partitions` module by creating a
`Partition` trait and moving the `append_batch` logic into the
`append_messages` method of the `Partition` trait.
---
core/consensus/src/impls.rs | 4 +
core/consensus/src/lib.rs | 18 ++
core/consensus/src/plane_helpers.rs | 271 +++++++++++++++++++++
core/metadata/src/impls/metadata.rs | 296 +++--------------------
core/metadata/src/lib.rs | 4 +-
core/partitions/src/iggy_partition.rs | 76 +++++-
core/partitions/src/iggy_partitions.rs | 425 +++++++--------------------------
core/partitions/src/lib.rs | 52 ++--
core/simulator/src/lib.rs | 3 +-
9 files changed, 522 insertions(+), 627 deletions(-)
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 3fd146cd4..4b1e620c8 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -1213,6 +1213,10 @@ where
!self.is_primary()
}
+ fn is_normal(&self) -> bool {
+ self.status() == Status::Normal
+ }
+
fn is_syncing(&self) -> bool {
self.is_syncing()
}
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index 5c3e152e6..17f2fb9ea 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -63,11 +63,29 @@ pub trait Consensus: Sized {
fn post_replicate_verify(&self, message: &Self::ReplicateMessage);
fn is_follower(&self) -> bool;
+ fn is_normal(&self) -> bool;
fn is_syncing(&self) -> bool;
}
+/// Shared consensus lifecycle interface for control/data planes.
+///
+/// This abstracts the VSR message flow:
+/// - request -> prepare
+/// - replicate (prepare)
+/// - ack (prepare_ok)
+pub trait Plane<C>
+where
+ C: Consensus,
+{
+ fn on_request(&self, message: C::RequestMessage) -> impl Future<Output =
()>;
+ fn on_replicate(&self, message: C::ReplicateMessage) -> impl Future<Output
= ()>;
+ fn on_ack(&self, message: C::AckMessage) -> impl Future<Output = ()>;
+}
+
mod impls;
pub use impls::*;
+mod plane_helpers;
+pub use plane_helpers::*;
mod view_change_quorum;
pub use view_change_quorum::*;
diff --git a/core/consensus/src/plane_helpers.rs
b/core/consensus/src/plane_helpers.rs
new file mode 100644
index 000000000..60ea1dedf
--- /dev/null
+++ b/core/consensus/src/plane_helpers.rs
@@ -0,0 +1,271 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{Consensus, Pipeline, PipelineEntry, Sequencer, Status,
VsrConsensus};
+use iggy_common::header::{Command2, GenericHeader, PrepareHeader,
PrepareOkHeader, ReplyHeader};
+use iggy_common::message::Message;
+use message_bus::MessageBus;
+use std::ops::AsyncFnOnce;
+
+// TODO: Rework all of these helpers once the boundaries are clearer and we
have a better picture of the commonalities between all of the planes.
+
+/// Shared pipeline-first request flow used by metadata and partitions.
+pub async fn pipeline_prepare_common<C, F>(
+ consensus: &C,
+ prepare: C::ReplicateMessage,
+ on_replicate: F,
+) where
+ C: Consensus,
+ C::ReplicateMessage: Clone,
+ F: AsyncFnOnce(C::ReplicateMessage) -> (),
+{
+ assert!(!consensus.is_follower(), "on_request: primary only");
+ assert!(consensus.is_normal(), "on_request: status must be normal");
+ assert!(!consensus.is_syncing(), "on_request: must not be syncing");
+
+ consensus.verify_pipeline();
+ consensus.pipeline_message(prepare.clone());
+ on_replicate(prepare.clone()).await;
+ consensus.post_replicate_verify(&prepare);
+}
+
+/// Shared commit-based old-prepare fence.
+pub fn fence_old_prepare_by_commit<B, P>(
+ consensus: &VsrConsensus<B, P>,
+ header: &PrepareHeader,
+) -> bool
+where
+ B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+ P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+ header.op <= consensus.commit()
+}
+
+/// Shared chain-replication forwarding to the next replica.
+pub async fn replicate_to_next_in_chain<B, P>(
+ consensus: &VsrConsensus<B, P>,
+ message: Message<PrepareHeader>,
+) where
+ B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+ P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+ let header = message.header();
+
+ assert_eq!(header.command, Command2::Prepare);
+ assert!(header.op > consensus.commit());
+
+ let next = (consensus.replica() + 1) % consensus.replica_count();
+ let primary = consensus.primary_index(header.view);
+
+ if next == primary {
+ return;
+ }
+
+ assert_ne!(next, consensus.replica());
+
+ consensus
+ .message_bus()
+ .send_to_replica(next, message.into_generic())
+ .await
+ .unwrap();
+}
+
+/// Shared preflight checks for `on_replicate`.
+///
+/// Returns current op on success.
+pub fn replicate_preflight<B, P>(
+ consensus: &VsrConsensus<B, P>,
+ header: &PrepareHeader,
+) -> Result<u64, &'static str>
+where
+ B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+ P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+ assert_eq!(header.command, Command2::Prepare);
+
+ if consensus.is_syncing() {
+ return Err("sync");
+ }
+
+ let current_op = consensus.sequencer().current_sequence();
+
+ if consensus.status() != Status::Normal {
+ return Err("not normal state");
+ }
+
+ if header.view > consensus.view() {
+ return Err("newer view");
+ }
+
+ if consensus.is_follower() {
+ consensus.advance_commit_number(header.commit);
+ }
+
+ Ok(current_op)
+}
+
+/// Shared preflight checks for `on_ack`.
+pub fn ack_preflight<B, P>(consensus: &VsrConsensus<B, P>) -> Result<(),
&'static str>
+where
+ B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+ P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+ if !consensus.is_primary() {
+ return Err("not primary");
+ }
+
+ if consensus.status() != Status::Normal {
+ return Err("not normal");
+ }
+
+ Ok(())
+}
+
+/// Shared quorum + extraction flow for ack handling.
+pub fn ack_quorum_reached<B, P>(consensus: &VsrConsensus<B, P>, ack:
&PrepareOkHeader) -> bool
+where
+ B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+ P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+ if !consensus.handle_prepare_ok(ack) {
+ return false;
+ }
+
+ consensus.advance_commit_number(ack.op);
+ true
+}
+
+/// Shared reply-message construction for committed prepare.
+pub fn build_reply_message<B, P>(
+ consensus: &VsrConsensus<B, P>,
+ prepare_header: &PrepareHeader,
+) -> Message<ReplyHeader>
+where
+ B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+ P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+
Message::<ReplyHeader>::new(std::mem::size_of::<ReplyHeader>()).transmute_header(|_,
new| {
+ *new = ReplyHeader {
+ checksum: 0,
+ checksum_body: 0,
+ cluster: consensus.cluster(),
+ size: std::mem::size_of::<ReplyHeader>() as u32,
+ view: consensus.view(),
+ release: 0,
+ command: Command2::Reply,
+ replica: consensus.replica(),
+ reserved_frame: [0; 66],
+ request_checksum: prepare_header.request_checksum,
+ context: 0,
+ op: prepare_header.op,
+ commit: consensus.commit(),
+ timestamp: prepare_header.timestamp,
+ request: prepare_header.request,
+ operation: prepare_header.operation,
+ ..Default::default()
+ };
+ })
+}
+
+/// Verify hash chain would not break if we add this header.
+pub fn panic_if_hash_chain_would_break_in_same_view(
+ previous: &PrepareHeader,
+ current: &PrepareHeader,
+) {
+ // If both headers are in the same view, parent must chain correctly.
+ if previous.view == current.view {
+ assert_eq!(
+ current.parent, previous.checksum,
+ "hash chain broken in same view: op={} parent={} expected={}",
+ current.op, current.parent, previous.checksum
+ );
+ }
+}
+
+// TODO: Figure out how to make this check whether the journal contains the
prepare.
+pub async fn send_prepare_ok<B, P>(
+ consensus: &VsrConsensus<B, P>,
+ header: &PrepareHeader,
+ is_persisted: Option<bool>,
+) where
+ B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+ P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+ assert_eq!(header.command, Command2::Prepare);
+
+ if consensus.status() != Status::Normal {
+ return;
+ }
+
+ if consensus.is_syncing() {
+ return;
+ }
+
+ if let Some(false) = is_persisted {
+ return;
+ }
+
+ assert!(
+ header.view <= consensus.view(),
+ "send_prepare_ok: prepare view {} > our view {}",
+ header.view,
+ consensus.view()
+ );
+
+ if header.op > consensus.sequencer().current_sequence() {
+ return;
+ }
+
+ let prepare_ok_header = PrepareOkHeader {
+ command: Command2::PrepareOk,
+ cluster: consensus.cluster(),
+ replica: consensus.replica(),
+ view: consensus.view(),
+ op: header.op,
+ commit: consensus.commit(),
+ timestamp: header.timestamp,
+ parent: header.parent,
+ prepare_checksum: header.checksum,
+ request: header.request,
+ operation: header.operation,
+ namespace: header.namespace,
+ size: std::mem::size_of::<PrepareOkHeader>() as u32,
+ ..Default::default()
+ };
+
+ let message: Message<PrepareOkHeader> =
+ Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>())
+ .transmute_header(|_, new| *new = prepare_ok_header);
+ let generic_message = message.into_generic();
+ let primary = consensus.primary_index(consensus.view());
+
+ if primary == consensus.replica() {
+ // TODO: Queue for self-processing or call handle_prepare_ok directly.
+ // TODO: This is temporary, to test the simulator, but we should send the
message to ourselves properly.
+ consensus
+ .message_bus()
+ .send_to_replica(primary, generic_message)
+ .await
+ .unwrap();
+ } else {
+ consensus
+ .message_bus()
+ .send_to_replica(primary, generic_message)
+ .await
+ .unwrap();
+ }
+}
diff --git a/core/metadata/src/impls/metadata.rs
b/core/metadata/src/impls/metadata.rs
index 77a174e39..9a81461af 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -16,9 +16,14 @@
// under the License.
use crate::stm::StateMachine;
use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot, Snapshot,
SnapshotError};
-use consensus::{Consensus, Pipeline, PipelineEntry, Project, Sequencer,
Status, VsrConsensus};
+use consensus::{
+ Consensus, Pipeline, PipelineEntry, Plane, Project, Sequencer,
VsrConsensus, ack_preflight,
+ ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit,
+ panic_if_hash_chain_would_break_in_same_view, pipeline_prepare_common,
replicate_preflight,
+ replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common,
+};
use iggy_common::{
- header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader,
ReplyHeader},
+ header::{Command2, GenericHeader, PrepareHeader},
message::Message,
};
use journal::{Journal, JournalHandle};
@@ -79,20 +84,6 @@ impl Snapshot for IggySnapshot {
}
}
-pub trait Metadata<C>
-where
- C: Consensus,
-{
- /// Handle a request message.
- fn on_request(&self, message: C::RequestMessage) -> impl Future<Output =
()>;
-
- /// Handle a replicate message (Prepare in VSR).
- fn on_replicate(&self, message: C::ReplicateMessage) -> impl Future<Output
= ()>;
-
- /// Handle an ack message (PrepareOk in VSR).
- fn on_ack(&self, message: C::AckMessage) -> impl Future<Output = ()>;
-}
-
#[derive(Debug)]
pub struct IggyMetadata<C, J, S, M> {
/// Some on shard0, None on other shards
@@ -105,7 +96,7 @@ pub struct IggyMetadata<C, J, S, M> {
pub mux_stm: M,
}
-impl<B, P, J, S, M> Metadata<VsrConsensus<B, P>> for
IggyMetadata<VsrConsensus<B, P>, J, S, M>
+impl<B, P, J, S, M> Plane<VsrConsensus<B, P>> for IggyMetadata<VsrConsensus<B,
P>, J, S, M>
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
@@ -123,7 +114,7 @@ where
// TODO: Bunch of asserts.
debug!("handling metadata request");
let prepare = message.project(consensus);
- self.pipeline_prepare(prepare).await;
+ pipeline_prepare_common(consensus, prepare, |prepare|
self.on_replicate(prepare)).await;
}
async fn on_replicate(&self, message: <VsrConsensus<B, P> as
Consensus>::ReplicateMessage) {
@@ -132,57 +123,35 @@ where
let header = message.header();
- assert_eq!(header.command, Command2::Prepare);
+ let current_op = match replicate_preflight(consensus, header) {
+ Ok(current_op) => current_op,
+ Err(reason) => {
+ warn!(
+ replica = consensus.replica(),
+ "on_replicate: ignoring ({reason})"
+ );
+ return;
+ }
+ };
- if !self.fence_old_prepare(&message) {
+ // TODO: Handle idx calculation, for now using header.op, but since
the journal may get compacted, this may not be correct.
+ let is_old_prepare = fence_old_prepare_by_commit(consensus, header)
+ || journal.handle().header(header.op as usize).is_some();
+ if !is_old_prepare {
self.replicate(message.clone()).await;
} else {
warn!("received old prepare, not replicating");
}
- // If syncing, ignore the replicate message.
- if consensus.is_syncing() {
- warn!(
- replica = consensus.replica(),
- "on_replicate: ignoring (sync)"
- );
- return;
- }
-
- let current_op = consensus.sequencer().current_sequence();
-
- // If status is not normal, ignore the replicate.
- if consensus.status() != Status::Normal {
- warn!(
- replica = consensus.replica(),
- "on_replicate: ignoring (not normal state)"
- );
- return;
- }
-
- //if message from future view, we ignore the replicate.
- if header.view > consensus.view() {
- warn!(
- replica = consensus.replica(),
- "on_replicate: ignoring (newer view)"
- );
- return;
- }
-
// TODO add assertions for valid state here.
- // If we are a follower, we advance the commit number.
- if consensus.is_follower() {
- consensus.advance_commit_number(message.header().commit);
- }
-
// TODO verify that the current prepare fits in the WAL.
// TODO handle gap in ops.
// Verify hash chain integrity.
if let Some(previous) = journal.handle().previous_header(header) {
- self.panic_if_hash_chain_would_break_in_same_view(&previous,
header);
+ panic_if_hash_chain_would_break_in_same_view(&previous, header);
}
assert_eq!(header.op, current_op + 1);
@@ -205,13 +174,8 @@ where
let consensus = self.consensus.as_ref().unwrap();
let header = message.header();
- if !consensus.is_primary() {
- warn!("on_ack: ignoring (not primary)");
- return;
- }
-
- if consensus.status() != Status::Normal {
- warn!("on_ack: ignoring (not normal)");
+ if let Err(reason) = ack_preflight(consensus) {
+ warn!("on_ack: ignoring ({reason})");
return;
}
@@ -226,12 +190,10 @@ where
}
}
- // Let consensus handle the ack increment and quorum check
- if consensus.handle_prepare_ok(header) {
+ if ack_quorum_reached(consensus, header) {
let journal = self.journal.as_ref().unwrap();
debug!("on_ack: quorum received for op={}", header.op);
- consensus.advance_commit_number(header.op);
// Extract the header from the pipeline, fetch the full message
from journal
// TODO: Commit from the head. ALWAYS
@@ -261,32 +223,8 @@ where
let _result = self.mux_stm.update(prepare);
debug!("on_ack: state applied for op={}", prepare_header.op);
- // TODO: Figure out better infra for this, its messy.
- let reply =
Message::<ReplyHeader>::new(std::mem::size_of::<ReplyHeader>())
- .transmute_header(|_, new| {
- *new = ReplyHeader {
- checksum: 0,
- checksum_body: 0,
- cluster: consensus.cluster(),
- size: std::mem::size_of::<ReplyHeader>() as u32,
- view: consensus.view(),
- release: 0,
- command: Command2::Reply,
- replica: consensus.replica(),
- reserved_frame: [0; 66],
- request_checksum: prepare_header.request_checksum,
- context: 0,
- op: prepare_header.op,
- commit: consensus.commit(),
- timestamp: prepare_header.timestamp,
- request: prepare_header.request,
- operation: prepare_header.operation,
- ..Default::default()
- };
- });
-
// Send reply to client
- let generic_reply = reply.into_generic();
+ let generic_reply = build_reply_message(consensus,
&prepare_header).into_generic();
debug!(
"on_ack: sending reply to client={} for op={}",
prepare_header.client, prepare_header.op
@@ -314,30 +252,6 @@ where
>,
M: StateMachine<Input = Message<PrepareHeader>>,
{
- async fn pipeline_prepare(&self, prepare: Message<PrepareHeader>) {
- let consensus = self.consensus.as_ref().unwrap();
-
- debug!("inserting prepare into metadata pipeline");
- consensus.verify_pipeline();
- // Pipeline-first ordering is safe only because message
- // processing is cooperative (single-task, RefCell-based).
- // If on_replicate ever early-returns (syncing, status change)
- // the entry would be in the pipeline without journal backing.
- consensus.pipeline_message(prepare.clone());
- self.on_replicate(prepare.clone()).await;
-
- consensus.post_replicate_verify(&prepare);
- }
-
- fn fence_old_prepare(&self, prepare: &Message<PrepareHeader>) -> bool {
- let consensus = self.consensus.as_ref().unwrap();
- let journal = self.journal.as_ref().unwrap();
-
- let header = prepare.header();
- // TODO: Handle idx calculation, for now using header.op, but since
the journal may get compacted, this may not be correct.
- header.op <= consensus.commit() || journal.handle().header(header.op
as usize).is_some()
- }
-
/// Replicate a prepare message to the next replica in the chain.
///
/// Chain replication pattern:
@@ -357,51 +271,7 @@ where
journal.handle().header(idx).is_none(),
"replicate: must not already have prepare"
);
- assert!(header.op > consensus.commit());
-
- let next = (consensus.replica() + 1) % consensus.replica_count();
-
- let primary = consensus.primary_index(header.view);
- if next == primary {
- debug!(
- replica = consensus.replica(),
- op = header.op,
- "replicate: not replicating (ring complete)"
- );
- return;
- }
-
- assert_ne!(next, consensus.replica());
-
- debug!(
- replica = consensus.replica(),
- to = next,
- op = header.op,
- "replicate: forwarding"
- );
-
- let message = message.into_generic();
- consensus
- .message_bus()
- .send_to_replica(next, message)
- .await
- .unwrap();
- }
-
- /// Verify hash chain would not break if we add this header.
- fn panic_if_hash_chain_would_break_in_same_view(
- &self,
- previous: &PrepareHeader,
- current: &PrepareHeader,
- ) {
- // If both headers are in the same view, parent must chain correctly
- if previous.view == current.view {
- assert_eq!(
- current.parent, previous.checksum,
- "hash chain broken in same view: op={} parent={} expected={}",
- current.op, current.parent, previous.checksum
- );
- }
+ replicate_to_next_in_chain(consensus, message).await;
}
// TODO: Implement jump_to_newer_op
@@ -413,114 +283,10 @@ where
// Apply each entry to the state machine
}
- /// Send a prepare_ok message to the primary.
- /// Called after successfully writing a prepare to the journal.
async fn send_prepare_ok(&self, header: &PrepareHeader) {
let consensus = self.consensus.as_ref().unwrap();
let journal = self.journal.as_ref().unwrap();
-
- assert_eq!(header.command, Command2::Prepare);
-
- if consensus.status() != Status::Normal {
- debug!(
- replica = consensus.replica(),
- status = ?consensus.status(),
- "send_prepare_ok: not sending (not normal)"
- );
- return;
- }
-
- if consensus.is_syncing() {
- debug!(
- replica = consensus.replica(),
- "send_prepare_ok: not sending (syncing)"
- );
- return;
- }
-
- // Verify we have the prepare and it's persisted (not dirty).
- if journal.handle().header(header.op as usize).is_none() {
- debug!(
- replica = consensus.replica(),
- op = header.op,
- "send_prepare_ok: not sending (not persisted or missing)"
- );
- return;
- }
-
- assert!(
- header.view <= consensus.view(),
- "send_prepare_ok: prepare view {} > our view {}",
- header.view,
- consensus.view()
- );
-
- if header.op > consensus.sequencer().current_sequence() {
- debug!(
- replica = consensus.replica(),
- op = header.op,
- our_op = consensus.sequencer().current_sequence(),
- "send_prepare_ok: not sending (op ahead)"
- );
- return;
- }
-
- debug!(
- replica = consensus.replica(),
- op = header.op,
- checksum = header.checksum,
- "send_prepare_ok: sending"
- );
-
- // Use current view, not the prepare's view.
- let prepare_ok_header = PrepareOkHeader {
- command: Command2::PrepareOk,
- cluster: consensus.cluster(),
- replica: consensus.replica(),
- view: consensus.view(),
- op: header.op,
- commit: consensus.commit(),
- timestamp: header.timestamp,
- parent: header.parent,
- prepare_checksum: header.checksum,
- request: header.request,
- operation: header.operation,
- namespace: header.namespace,
- size: std::mem::size_of::<PrepareOkHeader>() as u32,
- ..Default::default()
- };
-
- let message: Message<PrepareOkHeader> =
-
Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>())
- .transmute_header(|_, new| *new = prepare_ok_header);
- let generic_message = message.into_generic();
- let primary = consensus.primary_index(consensus.view());
-
- if primary == consensus.replica() {
- debug!(
- replica = consensus.replica(),
- "send_prepare_ok: loopback to self"
- );
- // TODO: Queue for self-processing or call handle_prepare_ok
directly
- // TODO: This is temporal, to test simulator, but we should send
message to ourselves properly.
- consensus
- .message_bus()
- .send_to_replica(primary, generic_message)
- .await
- .unwrap();
- } else {
- debug!(
- replica = consensus.replica(),
- to = primary,
- op = header.op,
- "send_prepare_ok: sending to primary"
- );
-
- consensus
- .message_bus()
- .send_to_replica(primary, generic_message)
- .await
- .unwrap();
- }
+ let persisted = journal.handle().header(header.op as usize).is_some();
+ send_prepare_ok_common(consensus, header, Some(persisted)).await;
}
}
diff --git a/core/metadata/src/lib.rs b/core/metadata/src/lib.rs
index 08c137fff..7ae61f217 100644
--- a/core/metadata/src/lib.rs
+++ b/core/metadata/src/lib.rs
@@ -23,8 +23,8 @@ pub mod stm;
mod stats;
-// Re-export IggyMetadata and Metadata trait for use in other modules
-pub use impls::metadata::{IggyMetadata, Metadata};
+// Re-export IggyMetadata for use in other modules
+pub use impls::metadata::IggyMetadata;
// Re-export MuxStateMachine for use in other modules
pub use stm::mux::MuxStateMachine;
diff --git a/core/partitions/src/iggy_partition.rs
b/core/partitions/src/iggy_partition.rs
index 13f55b0c5..47e71dea2 100644
--- a/core/partitions/src/iggy_partition.rs
+++ b/core/partitions/src/iggy_partition.rs
@@ -17,9 +17,14 @@
use crate::journal::{Noop, PartitionJournal};
use crate::log::SegmentedLog;
-use iggy_common::{ConsumerGroupOffsets, ConsumerOffsets, IggyTimestamp,
PartitionStats};
+use crate::{AppendResult, Partition};
+use iggy_common::{
+ ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, IggyError,
IggyMessagesBatchMut,
+ IggyTimestamp, PartitionStats,
+};
+use journal::Journal as _;
use std::sync::Arc;
-use std::sync::atomic::AtomicU64;
+use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::Mutex as TokioMutex;
// This struct aliases in terms of the code contained the `LocalPartition from
`core/server/src/streaming/partitions/local_partition.rs`.
@@ -56,3 +61,70 @@ impl IggyPartition {
}
}
}
+
+impl Partition for IggyPartition {
+ async fn append_messages(
+ &mut self,
+ mut batch: IggyMessagesBatchMut,
+ ) -> Result<AppendResult, IggyError> {
+ if batch.count() == 0 {
+ return Ok(AppendResult::new(0, 0, 0));
+ }
+
+ let dirty_offset = if self.should_increment_offset {
+ self.dirty_offset.load(Ordering::Relaxed) + 1
+ } else {
+ 0
+ };
+
+ let segment = self.log.active_segment();
+ let segment_start_offset = segment.start_offset;
+ let current_position = segment.current_position;
+
+ batch
+ .prepare_for_persistence(segment_start_offset, dirty_offset,
current_position, None)
+ .await;
+
+ let batch_messages_count = batch.count();
+ let batch_messages_size = batch.size();
+
+ let last_dirty_offset = if batch_messages_count == 0 {
+ dirty_offset
+ } else {
+ dirty_offset + batch_messages_count as u64 - 1
+ };
+
+ if self.should_increment_offset {
+ self.dirty_offset
+ .store(last_dirty_offset, Ordering::Relaxed);
+ } else {
+ self.should_increment_offset = true;
+ self.dirty_offset
+ .store(last_dirty_offset, Ordering::Relaxed);
+ }
+
+ let segment_index = self.log.segments().len() - 1;
+ self.log.segments_mut()[segment_index].current_position +=
batch_messages_size;
+
+ let journal = self.log.journal_mut();
+ journal.info.messages_count += batch_messages_count;
+ journal.info.size += IggyByteSize::from(batch_messages_size as u64);
+ journal.info.current_offset = last_dirty_offset;
+ if let Some(ts) = batch.first_timestamp()
+ && journal.info.first_timestamp == 0
+ {
+ journal.info.first_timestamp = ts;
+ }
+ if let Some(ts) = batch.last_timestamp() {
+ journal.info.end_timestamp = ts;
+ }
+
+ journal.inner.append(batch).await;
+
+ Ok(AppendResult::new(
+ dirty_offset,
+ last_dirty_offset,
+ batch_messages_count,
+ ))
+ }
+}
diff --git a/core/partitions/src/iggy_partitions.rs
b/core/partitions/src/iggy_partitions.rs
index b3cdf414c..54891dfdd 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -18,17 +18,20 @@
#![allow(dead_code)]
use crate::IggyPartition;
-use crate::Partitions;
+use crate::Partition;
use crate::types::PartitionsConfig;
-use consensus::{Consensus, Project, Sequencer, Status, VsrConsensus};
+use consensus::{
+ Consensus, PipelineEntry, Plane, Project, Sequencer, VsrConsensus,
ack_preflight,
+ ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit,
pipeline_prepare_common,
+ replicate_preflight, replicate_to_next_in_chain, send_prepare_ok as
send_prepare_ok_common,
+};
use iggy_common::{
INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut,
PartitionStats, PooledBuffer,
Segment, SegmentStorage,
- header::{Command2, GenericHeader, Operation, PrepareHeader,
PrepareOkHeader, ReplyHeader},
+ header::{GenericHeader, Operation, PrepareHeader},
message::Message,
sharding::{IggyNamespace, LocalIdx, ShardId},
};
-use journal::Journal as _;
use message_bus::MessageBus;
use std::cell::UnsafeCell;
use std::collections::HashMap;
@@ -326,7 +329,7 @@ impl<C> IggyPartitions<C> {
}
}
-impl<B> Partitions<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B>>
+impl<B> Plane<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B>>
where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
{
@@ -335,7 +338,7 @@ where
debug!("handling partition request");
let prepare = message.project(consensus);
- self.pipeline_prepare(prepare).await;
+ pipeline_prepare_common(consensus, prepare, |prepare|
self.on_replicate(prepare)).await;
}
async fn on_replicate(&self, message: <VsrConsensus<B> as
Consensus>::ReplicateMessage) {
@@ -343,48 +346,24 @@ where
let header = message.header();
- assert_eq!(header.command, Command2::Prepare);
+ let current_op = match replicate_preflight(consensus, header) {
+ Ok(current_op) => current_op,
+ Err(reason) => {
+ warn!(
+ replica = consensus.replica(),
+ "on_replicate: ignoring ({reason})"
+ );
+ return;
+ }
+ };
- if !self.fence_old_prepare(&message) {
+ let is_old_prepare = fence_old_prepare_by_commit(consensus, header);
+ if !is_old_prepare {
self.replicate(message.clone()).await;
} else {
warn!("received old prepare, not replicating");
}
- // If syncing, ignore the replicate message.
- if consensus.is_syncing() {
- warn!(
- replica = consensus.replica(),
- "on_replicate: ignoring (sync)"
- );
- return;
- }
-
- let current_op = consensus.sequencer().current_sequence();
-
- // If status is not normal, ignore the replicate.
- if consensus.status() != Status::Normal {
- warn!(
- replica = consensus.replica(),
- "on_replicate: ignoring (not normal state)"
- );
- return;
- }
-
- // If message from future view, we ignore the replicate.
- if header.view > consensus.view() {
- warn!(
- replica = consensus.replica(),
- "on_replicate: ignoring (newer view)"
- );
- return;
- }
-
- // If we are a follower, we advance the commit number.
- if consensus.is_follower() {
- consensus.advance_commit_number(message.header().commit);
- }
-
// TODO: Make those assertions toggleable via a feature flag,
so they can be used only by the simulator/tests.
debug_assert_eq!(header.op, current_op + 1);
consensus.sequencer().set_sequence(header.op);
@@ -393,36 +372,7 @@ where
// In the metadata layer we assume that when an `on_request` or
`on_replicate` is called, it's called from the correct shard.
// I think we need to do the same here, which means that the code
below is infallible; the partition should always exist by now!
let namespace = IggyNamespace::from_raw(header.namespace);
- match header.operation {
- Operation::SendMessages => {
- let body = message.body_bytes();
- let batch = Self::batch_from_body(&body);
- self.append_batch(&namespace, batch).await;
- debug!(
- replica = consensus.replica(),
- op = header.op,
- ?namespace,
- "on_replicate: batch appended to partition journal"
- );
- }
- Operation::StoreConsumerOffset => {
- // TODO: Deserialize consumer offset from prepare body
- // and store in partition's consumer_offsets.
- debug!(
- replica = consensus.replica(),
- op = header.op,
- "on_replicate: consumer offset stored"
- );
- }
- _ => {
- warn!(
- replica = consensus.replica(),
- op = header.op,
- "on_replicate: unexpected operation {:?}",
- header.operation
- );
- }
- }
+ self.apply_replicated_operation(&message, &namespace).await;
// After successful journal write, send prepare_ok to primary.
self.send_prepare_ok(header).await;
@@ -437,17 +387,11 @@ where
let consensus = self.consensus.as_ref().unwrap();
let header = message.header();
- if !consensus.is_primary() {
- warn!("on_ack: ignoring (not primary)");
- return;
- }
-
- if consensus.status() != Status::Normal {
- warn!("on_ack: ignoring (not normal)");
+ if let Err(reason) = ack_preflight(consensus) {
+ warn!("on_ack: ignoring ({reason})");
return;
}
- // Verify checksum by checking pipeline entry exists
{
let pipeline = consensus.pipeline().borrow();
let Some(entry) =
@@ -463,21 +407,21 @@ where
}
}
- // Let consensus handle the ack increment and quorum check
- if consensus.handle_prepare_ok(header) {
+ if ack_quorum_reached(consensus, header) {
debug!("on_ack: quorum received for op={}", header.op);
- consensus.advance_commit_number(header.op);
// Extract the prepare message from the pipeline by op
// TODO: Commit from the head. ALWAYS
let entry =
consensus.pipeline().borrow_mut().extract_by_op(header.op);
- let Some(entry) = entry else {
+ let Some(PipelineEntry {
+ header: prepare_header,
+ ..
+ }) = entry
+ else {
warn!("on_ack: prepare not found in pipeline for op={}",
header.op);
return;
};
- let prepare_header = entry.header;
-
// Data was already appended to the partition journal during
// on_replicate. Now that quorum is reached, update the partition's
// current offset and check whether the journal needs flushing.
@@ -503,32 +447,8 @@ where
}
}
- // TODO: Figure out better infra for this, its messy.
- let reply =
Message::<ReplyHeader>::new(std::mem::size_of::<ReplyHeader>())
- .transmute_header(|_, new| {
- *new = ReplyHeader {
- checksum: 0,
- checksum_body: 0,
- cluster: consensus.cluster(),
- size: std::mem::size_of::<ReplyHeader>() as u32,
- view: consensus.view(),
- release: 0,
- command: Command2::Reply,
- replica: consensus.replica(),
- reserved_frame: [0; 66],
- request_checksum: prepare_header.request_checksum,
- context: 0,
- op: prepare_header.op,
- commit: consensus.commit(),
- timestamp: prepare_header.timestamp,
- request: prepare_header.request,
- operation: prepare_header.operation,
- ..Default::default()
- };
- });
-
// Send reply to client
- let generic_reply = reply.into_generic();
+ let generic_reply = build_reply_message(consensus,
&prepare_header).into_generic();
debug!(
"on_ack: sending reply to client={} for op={}",
prepare_header.client, prepare_header.op
@@ -566,6 +486,51 @@ where
IggyMessagesBatchMut::from_indexes_and_messages(indexes, messages)
}
+ async fn apply_replicated_operation(
+ &self,
+ message: &Message<PrepareHeader>,
+ namespace: &IggyNamespace,
+ ) {
+ let consensus = self.consensus.as_ref().unwrap();
+ let header = message.header();
+
+ match header.operation {
+ Operation::SendMessages => {
+ let body = message.body_bytes();
+ self.append_send_messages_to_journal(namespace, body.as_ref())
+ .await;
+ debug!(
+ replica = consensus.replica(),
+ op = header.op,
+ ?namespace,
+ "on_replicate: send_messages appended to partition journal"
+ );
+ }
+ Operation::StoreConsumerOffset => {
+ // TODO: Deserialize consumer offset from prepare body
+ // and store in partition's consumer_offsets.
+ debug!(
+ replica = consensus.replica(),
+ op = header.op,
+ "on_replicate: consumer offset stored"
+ );
+ }
+ _ => {
+ warn!(
+ replica = consensus.replica(),
+ op = header.op,
+ "on_replicate: unexpected operation {:?}",
+ header.operation
+ );
+ }
+ }
+ }
+
+ async fn append_send_messages_to_journal(&self, namespace: &IggyNamespace,
body: &[u8]) {
+ let batch = Self::batch_from_body(body);
+ self.append_messages_to_journal(namespace, batch).await;
+ }
+
/// Append a batch to a partition's journal with offset assignment.
///
/// Updates `segment.current_position` (logical position for indexing) but
@@ -574,91 +539,15 @@ where
///
/// Uses `dirty_offset` for offset assignment so that multiple prepares
/// can be pipelined before any commit.
- async fn append_batch(&self, namespace: &IggyNamespace, mut batch:
IggyMessagesBatchMut) {
+ async fn append_messages_to_journal(
+ &self,
+ namespace: &IggyNamespace,
+ batch: IggyMessagesBatchMut,
+ ) {
let partition = self
.get_mut_by_ns(namespace)
- .expect("append_batch: partition not found for namespace");
-
- if batch.count() == 0 {
- return;
- }
-
- let dirty_offset = if partition.should_increment_offset {
- partition.dirty_offset.load(Ordering::Relaxed) + 1
- } else {
- 0
- };
-
- let segment = partition.log.active_segment();
- let segment_start_offset = segment.start_offset;
- let current_position = segment.current_position;
-
- batch
- .prepare_for_persistence(segment_start_offset, dirty_offset,
current_position, None)
- .await;
-
- let batch_messages_count = batch.count();
- let batch_messages_size = batch.size();
-
- // Advance dirty offset (committed offset is advanced in on_ack).
- let last_dirty_offset = if batch_messages_count == 0 {
- dirty_offset
- } else {
- dirty_offset + batch_messages_count as u64 - 1
- };
-
- if partition.should_increment_offset {
- partition
- .dirty_offset
- .store(last_dirty_offset, Ordering::Relaxed);
- } else {
- partition.should_increment_offset = true;
- partition
- .dirty_offset
- .store(last_dirty_offset, Ordering::Relaxed);
- }
-
- // Update segment.current_position for next prepare_for_persistence
call.
- // This is the logical position (includes unflushed journal data).
- // segment.size is only updated after actual persist (in
persist_frozen_batches_to_disk).
- let segment_index = partition.log.segments().len() - 1;
- partition.log.segments_mut()[segment_index].current_position +=
batch_messages_size;
-
- // Update journal tracking metadata.
- let journal = partition.log.journal_mut();
- journal.info.messages_count += batch_messages_count;
- journal.info.size += IggyByteSize::from(batch_messages_size as u64);
- journal.info.current_offset = last_dirty_offset;
- if let Some(ts) = batch.first_timestamp()
- && journal.info.first_timestamp == 0
- {
- journal.info.first_timestamp = ts;
- }
- if let Some(ts) = batch.last_timestamp() {
- journal.info.end_timestamp = ts;
- }
-
- journal.inner.append(batch).await;
- }
-
- async fn pipeline_prepare(&self, prepare: Message<PrepareHeader>) {
- let consensus = self.consensus.as_ref().unwrap();
-
- debug!("inserting prepare into partition pipeline");
- consensus.verify_pipeline();
- consensus.pipeline_message(prepare.clone());
-
- self.on_replicate(prepare.clone()).await;
- consensus.post_replicate_verify(&prepare);
- }
-
- fn fence_old_prepare(&self, prepare: &Message<PrepareHeader>) -> bool {
- let consensus = self.consensus.as_ref().unwrap();
-
- let header = prepare.header();
- // TODO: Check per-partition journal once namespace extraction is
possible.
- // For now, only check if the op is already committed.
- header.op <= consensus.commit()
+ .expect("append_messages_to_journal: partition not found for
namespace");
+ let _ = partition.append_messages(batch).await;
}
/// Replicate a prepare message to the next replica in the chain.
@@ -669,55 +558,7 @@ where
/// - Stops when we would forward back to primary
async fn replicate(&self, message: Message<PrepareHeader>) {
let consensus = self.consensus.as_ref().unwrap();
-
- let header = message.header();
-
- assert_eq!(header.command, Command2::Prepare);
- assert!(header.op > consensus.commit());
-
- let next = (consensus.replica() + 1) % consensus.replica_count();
-
- let primary = consensus.primary_index(header.view);
- if next == primary {
- debug!(
- replica = consensus.replica(),
- op = header.op,
- "replicate: not replicating (ring complete)"
- );
- return;
- }
-
- assert_ne!(next, consensus.replica());
-
- debug!(
- replica = consensus.replica(),
- to = next,
- op = header.op,
- "replicate: forwarding"
- );
-
- let message = message.into_generic();
- consensus
- .message_bus()
- .send_to_replica(next, message)
- .await
- .unwrap();
- }
-
- /// Verify hash chain would not break if we add this header.
- fn panic_if_hash_chain_would_break_in_same_view(
- &self,
- previous: &PrepareHeader,
- current: &PrepareHeader,
- ) {
- // If both headers are in the same view, parent must chain correctly
- if previous.view == current.view {
- assert_eq!(
- current.parent, previous.checksum,
- "hash chain broken in same view: op={} parent={} expected={}",
- current.op, current.parent, previous.checksum
- );
- }
+ replicate_to_next_in_chain(consensus, message).await;
}
fn commit_journal(&self) {
@@ -945,107 +786,11 @@ where
debug!(?namespace, start_offset, "rotated to new segment");
}
- /// Send a prepare_ok message to the primary.
- /// Called after successfully writing a prepare to the journal.
async fn send_prepare_ok(&self, header: &PrepareHeader) {
let consensus = self.consensus.as_ref().unwrap();
-
- assert_eq!(header.command, Command2::Prepare);
-
- if consensus.status() != Status::Normal {
- debug!(
- replica = consensus.replica(),
- status = ?consensus.status(),
- "send_prepare_ok: not sending (not normal)"
- );
- return;
- }
-
- if consensus.is_syncing() {
- debug!(
- replica = consensus.replica(),
- "send_prepare_ok: not sending (syncing)"
- );
- return;
- }
-
// TODO: Verify the prepare is persisted in the partition journal.
// The partition journal uses MessageLookup headers, so we cannot
// check by PrepareHeader.op directly. For now, skip this check.
-
- assert!(
- header.view <= consensus.view(),
- "send_prepare_ok: prepare view {} > our view {}",
- header.view,
- consensus.view()
- );
-
- if header.op > consensus.sequencer().current_sequence() {
- debug!(
- replica = consensus.replica(),
- op = header.op,
- our_op = consensus.sequencer().current_sequence(),
- "send_prepare_ok: not sending (op ahead)"
- );
- return;
- }
-
- debug!(
- replica = consensus.replica(),
- op = header.op,
- checksum = header.checksum,
- "send_prepare_ok: sending"
- );
-
- // Use current view, not the prepare's view.
- let prepare_ok_header = PrepareOkHeader {
- command: Command2::PrepareOk,
- cluster: consensus.cluster(),
- replica: consensus.replica(),
- view: consensus.view(),
- op: header.op,
- commit: consensus.commit(),
- timestamp: header.timestamp,
- parent: header.parent,
- prepare_checksum: header.checksum,
- request: header.request,
- operation: header.operation,
- namespace: header.namespace,
- size: std::mem::size_of::<PrepareOkHeader>() as u32,
- ..Default::default()
- };
-
- let message: Message<PrepareOkHeader> =
-
Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>())
- .transmute_header(|_, new| *new = prepare_ok_header);
- let generic_message = message.into_generic();
- let primary = consensus.primary_index(consensus.view());
-
- if primary == consensus.replica() {
- debug!(
- replica = consensus.replica(),
- "send_prepare_ok: loopback to self"
- );
- // TODO: Queue for self-processing or call handle_prepare_ok
directly
- // TODO: This is temporal, to test simulator, but we should send
message to ourselves properly.
- consensus
- .message_bus()
- .send_to_replica(primary, generic_message)
- .await
- .unwrap();
- } else {
- debug!(
- replica = consensus.replica(),
- to = primary,
- op = header.op,
- "send_prepare_ok: sending to primary"
- );
-
- consensus
- .message_bus()
- .send_to_replica(primary, generic_message)
- .await
- .unwrap();
- }
+ send_prepare_ok_common(consensus, header, None).await;
}
}
diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs
index 01ad8da1d..d92b078c4 100644
--- a/core/partitions/src/lib.rs
+++ b/core/partitions/src/lib.rs
@@ -21,7 +21,7 @@ mod journal;
mod log;
mod types;
-use consensus::Consensus;
+use iggy_common::{IggyError, IggyMessagesBatchMut, IggyMessagesBatchSet};
pub use iggy_partition::IggyPartition;
pub use iggy_partitions::IggyPartitions;
pub use types::{
@@ -29,20 +29,40 @@ pub use types::{
SendMessagesResult,
};
-// TODO: Figure out how this can be somehow merged with `Metadata` trait, in a
sense, where the `Metadata` trait would be gone
-// and something more general purpose is put in the place.
-
-/// Consensus lifecycle for partition operations (mirrors `Metadata<C>`).
+/// Partition-level data plane operations.
///
-/// Handles the VSR replication flow for partition writes:
-/// - `on_request`: Primary receives a client write, projects to Prepare,
pipelines it
-/// - `on_replicate`: Replica receives Prepare, appends to journal, sends
PrepareOk
-/// - `on_ack`: Primary receives PrepareOk, checks quorum, commits
-pub trait Partitions<C>
-where
- C: Consensus,
-{
- fn on_request(&self, message: C::RequestMessage) -> impl Future<Output =
()>;
- fn on_replicate(&self, message: C::ReplicateMessage) -> impl Future<Output
= ()>;
- fn on_ack(&self, message: C::AckMessage) -> impl Future<Output = ()>;
+/// `send_messages` MUST only append to the partition journal (prepare phase),
+/// without committing/persisting to disk.
+pub trait Partition {
+ fn append_messages(
+ &mut self,
+ batch: IggyMessagesBatchMut,
+ ) -> impl Future<Output = Result<AppendResult, IggyError>>;
+
+ fn poll_messages(
+ &self,
+ consumer: PollingConsumer,
+ args: PollingArgs,
+ ) -> impl Future<Output = Result<IggyMessagesBatchSet, IggyError>> {
+ let _ = (consumer, args);
+ async { Err(IggyError::FeatureUnavailable) }
+ }
+
+ fn store_consumer_offset(
+ &self,
+ consumer: PollingConsumer,
+ offset: u64,
+ ) -> Result<(), IggyError> {
+ let _ = (consumer, offset);
+ Err(IggyError::FeatureUnavailable)
+ }
+
+ fn get_consumer_offset(&self, consumer: PollingConsumer) -> Option<u64> {
+ let _ = consumer;
+ None
+ }
+
+ fn offsets(&self) -> PartitionOffsets {
+ PartitionOffsets::default()
+ }
}
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index c808481a2..44e063f78 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -21,11 +21,10 @@ pub mod deps;
pub mod replica;
use bus::MemBus;
+use consensus::Plane;
use iggy_common::header::{GenericHeader, ReplyHeader};
use iggy_common::message::{Message, MessageBag};
use message_bus::MessageBus;
-use metadata::Metadata;
-use partitions::Partitions;
use replica::Replica;
use std::sync::Arc;