This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch plane_refactor in repository https://gitbox.apache.org/repos/asf/iggy.git
commit a7274f1c7b53304fade06d5d8f3a63c91f15de80 Author: numinex <[email protected]> AuthorDate: Mon Feb 16 17:26:04 2026 +0100 m --- core/consensus/src/lib.rs | 15 ++++ core/metadata/src/impls/metadata.rs | 20 +---- core/metadata/src/lib.rs | 4 +- core/partitions/src/iggy_partition.rs | 76 +++++++++++++++- core/partitions/src/iggy_partitions.rs | 153 ++++++++++++--------------------- core/partitions/src/lib.rs | 52 +++++++---- core/simulator/src/lib.rs | 3 +- 7 files changed, 188 insertions(+), 135 deletions(-) diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs index 5c3e152e6..9a7fd5bae 100644 --- a/core/consensus/src/lib.rs +++ b/core/consensus/src/lib.rs @@ -66,6 +66,21 @@ pub trait Consensus: Sized { fn is_syncing(&self) -> bool; } +/// Shared consensus lifecycle interface for control/data planes. +/// +/// This abstracts the VSR message flow: +/// - request -> prepare +/// - replicate (prepare) +/// - ack (prepare_ok) +pub trait Plane<C> +where + C: Consensus, +{ + fn on_request(&self, message: C::RequestMessage) -> impl Future<Output = ()>; + fn on_replicate(&self, message: C::ReplicateMessage) -> impl Future<Output = ()>; + fn on_ack(&self, message: C::AckMessage) -> impl Future<Output = ()>; +} + mod impls; pub use impls::*; diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs index 77a174e39..c4a55fc54 100644 --- a/core/metadata/src/impls/metadata.rs +++ b/core/metadata/src/impls/metadata.rs @@ -16,7 +16,9 @@ // under the License. use crate::stm::StateMachine; use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot, Snapshot, SnapshotError}; -use consensus::{Consensus, Pipeline, PipelineEntry, Project, Sequencer, Status, VsrConsensus}; +use consensus::{ + Consensus, Pipeline, PipelineEntry, Plane, Project, Sequencer, Status, VsrConsensus, +}; use iggy_common::{ header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader, ReplyHeader}, message::Message, @@ -79,20 +81,6 @@ impl Snapshot for IggySnapshot { } } -pub trait Metadata<C> -where - C: Consensus, -{ - /// Handle a request message. - fn on_request(&self, message: C::RequestMessage) -> impl Future<Output = ()>; - - /// Handle a replicate message (Prepare in VSR). - fn on_replicate(&self, message: C::ReplicateMessage) -> impl Future<Output = ()>; - - /// Handle an ack message (PrepareOk in VSR). - fn on_ack(&self, message: C::AckMessage) -> impl Future<Output = ()>; -} - #[derive(Debug)] pub struct IggyMetadata<C, J, S, M> { /// Some on shard0, None on other shards @@ -105,7 +93,7 @@ pub struct IggyMetadata<C, J, S, M> { pub mux_stm: M, } -impl<B, P, J, S, M> Metadata<VsrConsensus<B, P>> for IggyMetadata<VsrConsensus<B, P>, J, S, M> +impl<B, P, J, S, M> Plane<VsrConsensus<B, P>> for IggyMetadata<VsrConsensus<B, P>, J, S, M> where B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>, diff --git a/core/metadata/src/lib.rs b/core/metadata/src/lib.rs index 08c137fff..7ae61f217 100644 --- a/core/metadata/src/lib.rs +++ b/core/metadata/src/lib.rs @@ -23,8 +23,8 @@ pub mod stm; mod stats; -// Re-export IggyMetadata and Metadata trait for use in other modules -pub use impls::metadata::{IggyMetadata, Metadata}; +// Re-export IggyMetadata for use in other modules +pub use impls::metadata::IggyMetadata; // Re-export MuxStateMachine for use in other modules pub use stm::mux::MuxStateMachine; diff --git a/core/partitions/src/iggy_partition.rs b/core/partitions/src/iggy_partition.rs index 13f55b0c5..47e71dea2 100644 --- a/core/partitions/src/iggy_partition.rs +++ b/core/partitions/src/iggy_partition.rs @@ -17,9 +17,14 @@ use crate::journal::{Noop, PartitionJournal}; use crate::log::SegmentedLog; -use iggy_common::{ConsumerGroupOffsets, ConsumerOffsets, IggyTimestamp, PartitionStats}; +use crate::{AppendResult, Partition}; +use iggy_common::{ + ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, IggyError, IggyMessagesBatchMut, + IggyTimestamp, PartitionStats, +}; +use journal::Journal as _; use std::sync::Arc; -use std::sync::atomic::AtomicU64; +use std::sync::atomic::{AtomicU64, Ordering}; use tokio::sync::Mutex as TokioMutex; // This struct aliases in terms of the code contained the `LocalPartition from `core/server/src/streaming/partitions/local_partition.rs`. @@ -56,3 +61,70 @@ impl IggyPartition { } } } + +impl Partition for IggyPartition { + async fn append_messages( + &mut self, + mut batch: IggyMessagesBatchMut, + ) -> Result<AppendResult, IggyError> { + if batch.count() == 0 { + return Ok(AppendResult::new(0, 0, 0)); + } + + let dirty_offset = if self.should_increment_offset { + self.dirty_offset.load(Ordering::Relaxed) + 1 + } else { + 0 + }; + + let segment = self.log.active_segment(); + let segment_start_offset = segment.start_offset; + let current_position = segment.current_position; + + batch + .prepare_for_persistence(segment_start_offset, dirty_offset, current_position, None) + .await; + + let batch_messages_count = batch.count(); + let batch_messages_size = batch.size(); + + let last_dirty_offset = if batch_messages_count == 0 { + dirty_offset + } else { + dirty_offset + batch_messages_count as u64 - 1 + }; + + if self.should_increment_offset { + self.dirty_offset + .store(last_dirty_offset, Ordering::Relaxed); + } else { + self.should_increment_offset = true; + self.dirty_offset + .store(last_dirty_offset, Ordering::Relaxed); + } + + let segment_index = self.log.segments().len() - 1; + self.log.segments_mut()[segment_index].current_position += batch_messages_size; + + let journal = self.log.journal_mut(); + journal.info.messages_count += batch_messages_count; + journal.info.size += IggyByteSize::from(batch_messages_size as u64); + journal.info.current_offset = last_dirty_offset; + if let Some(ts) = batch.first_timestamp() + && journal.info.first_timestamp == 0 + { + journal.info.first_timestamp = ts; + } + if let Some(ts) = batch.last_timestamp() { + journal.info.end_timestamp = ts; + } + + journal.inner.append(batch).await; + + Ok(AppendResult::new( + dirty_offset, + last_dirty_offset, + batch_messages_count, + )) + } +} diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs index b3cdf414c..305f2dfb2 100644 --- a/core/partitions/src/iggy_partitions.rs +++ b/core/partitions/src/iggy_partitions.rs @@ -18,9 +18,9 @@ #![allow(dead_code)] use crate::IggyPartition; -use crate::Partitions; +use crate::Partition; use crate::types::PartitionsConfig; -use consensus::{Consensus, Project, Sequencer, Status, VsrConsensus}; +use consensus::{Consensus, Plane, Project, Sequencer, Status, VsrConsensus}; use iggy_common::{ INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, PartitionStats, PooledBuffer, Segment, SegmentStorage, @@ -28,7 +28,6 @@ use iggy_common::{ message::Message, sharding::{IggyNamespace, LocalIdx, ShardId}, }; -use journal::Journal as _; use message_bus::MessageBus; use std::cell::UnsafeCell; use std::collections::HashMap; @@ -326,7 +325,7 @@ impl<C> IggyPartitions<C> { } } -impl<B> Partitions<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B>> +impl<B> Plane<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B>> where B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, { @@ -393,36 +392,7 @@ where // In metadata layer we assume that when an `on_request` or `on_replicate` is called, it's called from correct shard. // I think we need to do the same here, which means that the code from below is unfallable, the partition should always exist by now! let namespace = IggyNamespace::from_raw(header.namespace); - match header.operation { - Operation::SendMessages => { - let body = message.body_bytes(); - let batch = Self::batch_from_body(&body); - self.append_batch(&namespace, batch).await; - debug!( - replica = consensus.replica(), - op = header.op, - ?namespace, - "on_replicate: batch appended to partition journal" - ); - } - Operation::StoreConsumerOffset => { - // TODO: Deserialize consumer offset from prepare body - // and store in partition's consumer_offsets. - debug!( - replica = consensus.replica(), - op = header.op, - "on_replicate: consumer offset stored" - ); - } - _ => { - warn!( - replica = consensus.replica(), - op = header.op, - "on_replicate: unexpected operation {:?}", - header.operation - ); - } - } + self.apply_replicated_operation(&message, &namespace).await; // After successful journal write, send prepare_ok to primary. self.send_prepare_ok(header).await; @@ -566,6 +536,51 @@ where IggyMessagesBatchMut::from_indexes_and_messages(indexes, messages) } + async fn apply_replicated_operation( + &self, + message: &Message<PrepareHeader>, + namespace: &IggyNamespace, + ) { + let consensus = self.consensus.as_ref().unwrap(); + let header = message.header(); + + match header.operation { + Operation::SendMessages => { + let body = message.body_bytes(); + self.append_send_messages_to_journal(namespace, body.as_ref()) + .await; + debug!( + replica = consensus.replica(), + op = header.op, + ?namespace, + "on_replicate: send_messages appended to partition journal" + ); + } + Operation::StoreConsumerOffset => { + // TODO: Deserialize consumer offset from prepare body + // and store in partition's consumer_offsets. + debug!( + replica = consensus.replica(), + op = header.op, + "on_replicate: consumer offset stored" + ); + } + _ => { + warn!( + replica = consensus.replica(), + op = header.op, + "on_replicate: unexpected operation {:?}", + header.operation + ); + } + } + } + + async fn append_send_messages_to_journal(&self, namespace: &IggyNamespace, body: &[u8]) { + let batch = Self::batch_from_body(body); + self.append_messages_to_journal(namespace, batch).await; + } + /// Append a batch to a partition's journal with offset assignment. /// /// Updates `segment.current_position` (logical position for indexing) but @@ -574,71 +589,15 @@ where /// /// Uses `dirty_offset` for offset assignment so that multiple prepares /// can be pipelined before any commit. - async fn append_batch(&self, namespace: &IggyNamespace, mut batch: IggyMessagesBatchMut) { + async fn append_messages_to_journal( + &self, + namespace: &IggyNamespace, + batch: IggyMessagesBatchMut, + ) { let partition = self .get_mut_by_ns(namespace) - .expect("append_batch: partition not found for namespace"); - - if batch.count() == 0 { - return; - } - - let dirty_offset = if partition.should_increment_offset { - partition.dirty_offset.load(Ordering::Relaxed) + 1 - } else { - 0 - }; - - let segment = partition.log.active_segment(); - let segment_start_offset = segment.start_offset; - let current_position = segment.current_position; - - batch - .prepare_for_persistence(segment_start_offset, dirty_offset, current_position, None) - .await; - - let batch_messages_count = batch.count(); - let batch_messages_size = batch.size(); - - // Advance dirty offset (committed offset is advanced in on_ack). - let last_dirty_offset = if batch_messages_count == 0 { - dirty_offset - } else { - dirty_offset + batch_messages_count as u64 - 1 - }; - - if partition.should_increment_offset { - partition - .dirty_offset - .store(last_dirty_offset, Ordering::Relaxed); - } else { - partition.should_increment_offset = true; - partition - .dirty_offset - .store(last_dirty_offset, Ordering::Relaxed); - } - - // Update segment.current_position for next prepare_for_persistence call. - // This is the logical position (includes unflushed journal data). - // segment.size is only updated after actual persist (in persist_frozen_batches_to_disk). - let segment_index = partition.log.segments().len() - 1; - partition.log.segments_mut()[segment_index].current_position += batch_messages_size; - - // Update journal tracking metadata. - let journal = partition.log.journal_mut(); - journal.info.messages_count += batch_messages_count; - journal.info.size += IggyByteSize::from(batch_messages_size as u64); - journal.info.current_offset = last_dirty_offset; - if let Some(ts) = batch.first_timestamp() - && journal.info.first_timestamp == 0 - { - journal.info.first_timestamp = ts; - } - if let Some(ts) = batch.last_timestamp() { - journal.info.end_timestamp = ts; - } - - journal.inner.append(batch).await; + .expect("append_messages_to_journal: partition not found for namespace"); + let _ = partition.append_messages(batch).await; } async fn pipeline_prepare(&self, prepare: Message<PrepareHeader>) { diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs index 01ad8da1d..d92b078c4 100644 --- a/core/partitions/src/lib.rs +++ b/core/partitions/src/lib.rs @@ -21,7 +21,7 @@ mod journal; mod log; mod types; -use consensus::Consensus; +use iggy_common::{IggyError, IggyMessagesBatchMut, IggyMessagesBatchSet}; pub use iggy_partition::IggyPartition; pub use iggy_partitions::IggyPartitions; pub use types::{ @@ -29,20 +29,40 @@ pub use types::{ SendMessagesResult, }; -// TODO: Figure out how this can be somehow merged with `Metadata` trait, in a sense, where the `Metadata` trait would be gone -// and something more general purpose is put in the place. - -/// Consensus lifecycle for partition operations (mirrors `Metadata<C>`). +/// Partition-level data plane operations. /// -/// Handles the VSR replication flow for partition writes: -/// - `on_request`: Primary receives a client write, projects to Prepare, pipelines it -/// - `on_replicate`: Replica receives Prepare, appends to journal, sends PrepareOk -/// - `on_ack`: Primary receives PrepareOk, checks quorum, commits -pub trait Partitions<C> -where - C: Consensus, -{ - fn on_request(&self, message: C::RequestMessage) -> impl Future<Output = ()>; - fn on_replicate(&self, message: C::ReplicateMessage) -> impl Future<Output = ()>; - fn on_ack(&self, message: C::AckMessage) -> impl Future<Output = ()>; +/// `send_messages` MUST only append to the partition journal (prepare phase), +/// without committing/persisting to disk. +pub trait Partition { + fn append_messages( + &mut self, + batch: IggyMessagesBatchMut, + ) -> impl Future<Output = Result<AppendResult, IggyError>>; + + fn poll_messages( + &self, + consumer: PollingConsumer, + args: PollingArgs, + ) -> impl Future<Output = Result<IggyMessagesBatchSet, IggyError>> { + let _ = (consumer, args); + async { Err(IggyError::FeatureUnavailable) } + } + + fn store_consumer_offset( + &self, + consumer: PollingConsumer, + offset: u64, + ) -> Result<(), IggyError> { + let _ = (consumer, offset); + Err(IggyError::FeatureUnavailable) + } + + fn get_consumer_offset(&self, consumer: PollingConsumer) -> Option<u64> { + let _ = consumer; + None + } + + fn offsets(&self) -> PartitionOffsets { + PartitionOffsets::default() + } } diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs index c808481a2..44e063f78 100644 --- a/core/simulator/src/lib.rs +++ b/core/simulator/src/lib.rs @@ -21,11 +21,10 @@ pub mod deps; pub mod replica; use bus::MemBus; +use consensus::Plane; use iggy_common::header::{GenericHeader, ReplyHeader}; use iggy_common::message::{Message, MessageBag}; use message_bus::MessageBus; -use metadata::Metadata; -use partitions::Partitions; use replica::Replica; use std::sync::Arc;
