This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch journal_handle in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 78bca6a0ef5c8e6920475da54ac88d85840e6598 Author: numinex <[email protected]> AuthorDate: Tue Jan 27 13:23:11 2026 +0100 feat(metadata): expose metadata generics and create journal handle --- core/journal/src/lib.rs | 24 ++++++++-- core/metadata/src/impls/metadata.rs | 89 +++++++++++++++---------------------- 2 files changed, 56 insertions(+), 57 deletions(-) diff --git a/core/journal/src/lib.rs b/core/journal/src/lib.rs index 2aa693c68..b091bcd3d 100644 --- a/core/journal/src/lib.rs +++ b/core/journal/src/lib.rs @@ -17,11 +17,14 @@ // TODO: We already have a `Journal` trait inside of the `storage` module `journal.rs` file. // But the interface was designed for partition log, not an generic journal. -pub trait Journal { - type Entry; +pub trait Journal<S> +where + S: Storage, +{ type Header; + type Entry; - fn has_prepare(&self, header: &Self::Header) -> bool; + fn entry(&self, header: &Self::Header) -> Option<&Self::Entry>; fn previous_entry(&self, header: &Self::Header) -> Option<Self::Header>; @@ -29,3 +32,18 @@ pub trait Journal { fn append(&self, entry: Self::Entry) -> impl Future<Output = ()>; } + +// TODO: Move to other crate. +pub trait Storage { + type Buffer; + + fn write(&self, buf: Self::Buffer) -> usize; + fn read(&self, offset: u64, buffer: Self::Buffer) -> Self::Buffer; +} + +pub trait JournalHandle { + type Storage: Storage; + type Target: Journal<Self::Storage>; + + fn handle(&self) -> &Self::Target; +} diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs index e1bee5f09..b55780e1e 100644 --- a/core/metadata/src/impls/metadata.rs +++ b/core/metadata/src/impls/metadata.rs @@ -14,74 +14,50 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -use std::marker::PhantomData; - use consensus::{Consensus, Project, Sequencer, Status, VsrConsensus}; use iggy_common::{ header::{Command2, PrepareHeader, PrepareOkHeader}, message::Message, }; -use journal::Journal; +use journal::{Journal, JournalHandle}; use message_bus::MessageBus; use tracing::{debug, warn}; -// TODO: Define a trait (probably in some external crate) #[expect(unused)] -trait Metadata { +pub trait Metadata<C> +where + C: Consensus, +{ + /// Handle a request message. + fn on_request(&self, message: C::RequestMessage); + /// Handle a replicate message (Prepare in VSR). - fn on_request(&self, message: <VsrConsensus as Consensus>::RequestMessage); + fn on_replicate(&self, message: C::ReplicateMessage) -> impl Future<Output = ()>; /// Handle an ack message (PrepareOk in VSR). - fn on_replicate( - &self, - message: <VsrConsensus as Consensus>::ReplicateMessage, - ) -> impl Future<Output = ()>; - fn on_ack(&self, message: <VsrConsensus as Consensus>::AckMessage); -} - -pub trait MetadataHandle { - type Consensus; - type Journal; - type Snapshot; - type StateMachine; + fn on_ack(&self, message: C::AckMessage); } -/// Concrete implementation of `MetadataHandle` for Iggy. -/// This is a marker struct that only holds type information. -pub struct IggyMetadataHandle<J, S, M> { - _marker: PhantomData<(J, S, M)>, -} - -impl<J, S, M> MetadataHandle for IggyMetadataHandle<J, S, M> -where - J: Journal<Entry = <VsrConsensus as Consensus>::ReplicateMessage, Header = PrepareHeader>, -{ - type Consensus = VsrConsensus; - type Journal = J; - type Snapshot = S; - type StateMachine = M; -} - -// ============================================================================= -// IggyMetadata -// ============================================================================= - #[expect(unused)] -struct IggyMetadata<H: MetadataHandle> { +struct IggyMetadata<C, J, S, M> { /// Some on shard0, None on other shards - consensus: Option<H::Consensus>, + consensus: Option<C>, /// Some on shard0, None on other shards - journal: Option<H::Journal>, + journal: Option<J>, /// Some on shard0, None on other shards - snapshot: Option<H::Snapshot>, + snapshot: Option<S>, /// State machine - lives on all shards - mux_stm: H::StateMachine, + mux_stm: M, } -// TODO: Handle the `routing` of messages to shard0, on the callsite. -impl<J, S, M> Metadata for IggyMetadata<IggyMetadataHandle<J, S, M>> +impl<J, S, M> Metadata<VsrConsensus> for IggyMetadata<VsrConsensus, J, S, M> where - J: Journal<Entry = <VsrConsensus as Consensus>::ReplicateMessage, Header = PrepareHeader>, + J: JournalHandle, + J::Target: Journal< + J::Storage, + Entry = <VsrConsensus as Consensus>::ReplicateMessage, + Header = PrepareHeader, + >, { fn on_request(&self, message: <VsrConsensus as Consensus>::RequestMessage) { let consensus = self.consensus.as_ref().unwrap(); @@ -161,17 +137,17 @@ where // TODO handle gap in ops. // Verify hash chain integrity. - if let Some(previous) = journal.previous_entry(header) { + if let Some(previous) = journal.handle().previous_entry(header) { self.panic_if_hash_chain_would_break_in_same_view(&previous, header); } assert_eq!(header.op, current_op + 1); consensus.sequencer().set_sequence(header.op); - journal.set_header_as_dirty(header); + journal.handle().set_header_as_dirty(header); // Append to journal. - journal.append(message.clone()).await; + journal.handle().append(message.clone()).await; // After successful journal write, send prepare_ok to primary. self.send_prepare_ok(header).await; @@ -225,9 +201,14 @@ where } } -impl<J, S, M> IggyMetadata<IggyMetadataHandle<J, S, M>> +impl<J, SS, M> IggyMetadata<VsrConsensus, J, SS, M> where - J: Journal<Entry = <VsrConsensus as Consensus>::ReplicateMessage, Header = PrepareHeader>, + J: JournalHandle, + J::Target: Journal< + J::Storage, + Entry = <VsrConsensus as Consensus>::ReplicateMessage, + Header = PrepareHeader, + >, { #[expect(unused)] fn pipeline_prepare(&self, prepare: Message<PrepareHeader>) { @@ -247,7 +228,7 @@ where }; let header = prepare.header(); - header.op <= consensus.commit() || journal.has_prepare(header) + header.op <= consensus.commit() || journal.handle().entry(header).is_some() } /// Replicate a prepare message to the next replica in the chain. @@ -264,7 +245,7 @@ where assert_eq!(header.command, Command2::Prepare); assert!( - !journal.has_prepare(header), + !journal.handle().entry(header).is_some(), "replicate: must not already have prepare" ); assert!(header.op > consensus.commit()); @@ -354,7 +335,7 @@ where } // Verify we have the prepare and it's persisted (not dirty). - if !journal.has_prepare(header) { + if journal.handle().entry(header).is_none() { debug!( replica = consensus.replica(), op = header.op,
