This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new f2c56d8fa feat(metadata): expose metadata generics and create journal handle (#2624)
f2c56d8fa is described below

commit f2c56d8facbb9d7cbf2359d611d386e0e6d0307d
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Fri Jan 30 14:58:43 2026 +0100

    feat(metadata): expose metadata generics and create journal handle (#2624)
---
 core/journal/src/lib.rs             | 24 ++++++++--
 core/metadata/src/impls/metadata.rs | 89 +++++++++++++++----------------------
 2 files changed, 56 insertions(+), 57 deletions(-)

diff --git a/core/journal/src/lib.rs b/core/journal/src/lib.rs
index 2aa693c68..b091bcd3d 100644
--- a/core/journal/src/lib.rs
+++ b/core/journal/src/lib.rs
@@ -17,11 +17,14 @@
 
 // TODO: We already have a `Journal` trait inside of the `storage` module `journal.rs` file.
 // But the interface was designed for partition log, not an generic journal.
-pub trait Journal {
-    type Entry;
+pub trait Journal<S>
+where
+    S: Storage,
+{
     type Header;
+    type Entry;
 
-    fn has_prepare(&self, header: &Self::Header) -> bool;
+    fn entry(&self, header: &Self::Header) -> Option<&Self::Entry>;
 
     fn previous_entry(&self, header: &Self::Header) -> Option<Self::Header>;
 
@@ -29,3 +32,18 @@ pub trait Journal {
 
     fn append(&self, entry: Self::Entry) -> impl Future<Output = ()>;
 }
+
+// TODO: Move to other crate.
+pub trait Storage {
+    type Buffer;
+
+    fn write(&self, buf: Self::Buffer) -> usize;
+    fn read(&self, offset: u64, buffer: Self::Buffer) -> Self::Buffer;
+}
+
+pub trait JournalHandle {
+    type Storage: Storage;
+    type Target: Journal<Self::Storage>;
+
+    fn handle(&self) -> &Self::Target;
+}
diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs
index e1bee5f09..c8c7fdf90 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -14,74 +14,50 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-use std::marker::PhantomData;
-
 use consensus::{Consensus, Project, Sequencer, Status, VsrConsensus};
 use iggy_common::{
     header::{Command2, PrepareHeader, PrepareOkHeader},
     message::Message,
 };
-use journal::Journal;
+use journal::{Journal, JournalHandle};
 use message_bus::MessageBus;
 use tracing::{debug, warn};
 
-// TODO: Define a trait (probably in some external crate)
 #[expect(unused)]
-trait Metadata {
+pub trait Metadata<C>
+where
+    C: Consensus,
+{
+    /// Handle a request message.
+    fn on_request(&self, message: C::RequestMessage);
+
     /// Handle a replicate message (Prepare in VSR).
-    fn on_request(&self, message: <VsrConsensus as Consensus>::RequestMessage);
+    fn on_replicate(&self, message: C::ReplicateMessage) -> impl Future<Output = ()>;
 
     /// Handle an ack message (PrepareOk in VSR).
-    fn on_replicate(
-        &self,
-        message: <VsrConsensus as Consensus>::ReplicateMessage,
-    ) -> impl Future<Output = ()>;
-    fn on_ack(&self, message: <VsrConsensus as Consensus>::AckMessage);
-}
-
-pub trait MetadataHandle {
-    type Consensus;
-    type Journal;
-    type Snapshot;
-    type StateMachine;
+    fn on_ack(&self, message: C::AckMessage);
 }
 
-/// Concrete implementation of `MetadataHandle` for Iggy.
-/// This is a marker struct that only holds type information.
-pub struct IggyMetadataHandle<J, S, M> {
-    _marker: PhantomData<(J, S, M)>,
-}
-
-impl<J, S, M> MetadataHandle for IggyMetadataHandle<J, S, M>
-where
-    J: Journal<Entry = <VsrConsensus as Consensus>::ReplicateMessage, Header = PrepareHeader>,
-{
-    type Consensus = VsrConsensus;
-    type Journal = J;
-    type Snapshot = S;
-    type StateMachine = M;
-}
-
-// =============================================================================
-// IggyMetadata
-// =============================================================================
-
 #[expect(unused)]
-struct IggyMetadata<H: MetadataHandle> {
+struct IggyMetadata<C, J, S, M> {
     /// Some on shard0, None on other shards
-    consensus: Option<H::Consensus>,
+    consensus: Option<C>,
     /// Some on shard0, None on other shards
-    journal: Option<H::Journal>,
+    journal: Option<J>,
     /// Some on shard0, None on other shards
-    snapshot: Option<H::Snapshot>,
+    snapshot: Option<S>,
     /// State machine - lives on all shards
-    mux_stm: H::StateMachine,
+    mux_stm: M,
 }
 
-// TODO: Handle the `routing` of messages to shard0, on the callsite.
-impl<J, S, M> Metadata for IggyMetadata<IggyMetadataHandle<J, S, M>>
+impl<J, S, M> Metadata<VsrConsensus> for IggyMetadata<VsrConsensus, J, S, M>
 where
-    J: Journal<Entry = <VsrConsensus as Consensus>::ReplicateMessage, Header = PrepareHeader>,
+    J: JournalHandle,
+    J::Target: Journal<
+            J::Storage,
+            Entry = <VsrConsensus as Consensus>::ReplicateMessage,
+            Header = PrepareHeader,
+        >,
 {
     fn on_request(&self, message: <VsrConsensus as Consensus>::RequestMessage) {
         let consensus = self.consensus.as_ref().unwrap();
@@ -161,17 +137,17 @@ where
         // TODO handle gap in ops.
 
         // Verify hash chain integrity.
-        if let Some(previous) = journal.previous_entry(header) {
+        if let Some(previous) = journal.handle().previous_entry(header) {
             self.panic_if_hash_chain_would_break_in_same_view(&previous, header);
         }
 
         assert_eq!(header.op, current_op + 1);
 
         consensus.sequencer().set_sequence(header.op);
-        journal.set_header_as_dirty(header);
+        journal.handle().set_header_as_dirty(header);
 
         // Append to journal.
-        journal.append(message.clone()).await;
+        journal.handle().append(message.clone()).await;
 
         // After successful journal write, send prepare_ok to primary.
         self.send_prepare_ok(header).await;
@@ -225,9 +201,14 @@ where
     }
 }
 
-impl<J, S, M> IggyMetadata<IggyMetadataHandle<J, S, M>>
+impl<J, S, M> IggyMetadata<VsrConsensus, J, S, M>
 where
-    J: Journal<Entry = <VsrConsensus as Consensus>::ReplicateMessage, Header = PrepareHeader>,
+    J: JournalHandle,
+    J::Target: Journal<
+            J::Storage,
+            Entry = <VsrConsensus as Consensus>::ReplicateMessage,
+            Header = PrepareHeader,
+        >,
 {
     #[expect(unused)]
     fn pipeline_prepare(&self, prepare: Message<PrepareHeader>) {
@@ -247,7 +228,7 @@ where
         };
 
         let header = prepare.header();
-        header.op <= consensus.commit() || journal.has_prepare(header)
+        header.op <= consensus.commit() || journal.handle().entry(header).is_some()
     }
 
     /// Replicate a prepare message to the next replica in the chain.
@@ -264,7 +245,7 @@ where
 
         assert_eq!(header.command, Command2::Prepare);
         assert!(
-            !journal.has_prepare(header),
+            journal.handle().entry(header).is_none(),
             "replicate: must not already have prepare"
         );
         assert!(header.op > consensus.commit());
@@ -354,7 +335,7 @@ where
         }
 
         // Verify we have the prepare and it's persisted (not dirty).
-        if !journal.has_prepare(header) {
+        if journal.handle().entry(header).is_none() {
             debug!(
                 replica = consensus.replica(),
                 op = header.op,

Reply via email to