This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new f2c56d8fa feat(metadata): expose metadata generics and create journal
handle (#2624)
f2c56d8fa is described below
commit f2c56d8facbb9d7cbf2359d611d386e0e6d0307d
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Fri Jan 30 14:58:43 2026 +0100
feat(metadata): expose metadata generics and create journal handle (#2624)
---
core/journal/src/lib.rs | 24 ++++++++--
core/metadata/src/impls/metadata.rs | 89 +++++++++++++++----------------------
2 files changed, 56 insertions(+), 57 deletions(-)
diff --git a/core/journal/src/lib.rs b/core/journal/src/lib.rs
index 2aa693c68..b091bcd3d 100644
--- a/core/journal/src/lib.rs
+++ b/core/journal/src/lib.rs
@@ -17,11 +17,14 @@
// TODO: We already have a `Journal` trait inside of the `storage` module
`journal.rs` file.
// But the interface was designed for partition log, not a generic journal.
-pub trait Journal {
- type Entry;
+pub trait Journal<S>
+where
+ S: Storage,
+{
type Header;
+ type Entry;
- fn has_prepare(&self, header: &Self::Header) -> bool;
+ fn entry(&self, header: &Self::Header) -> Option<&Self::Entry>;
fn previous_entry(&self, header: &Self::Header) -> Option<Self::Header>;
@@ -29,3 +32,18 @@ pub trait Journal {
fn append(&self, entry: Self::Entry) -> impl Future<Output = ()>;
}
+
+// TODO: Move to other crate.
+pub trait Storage {
+ type Buffer;
+
+ fn write(&self, buf: Self::Buffer) -> usize;
+ fn read(&self, offset: u64, buffer: Self::Buffer) -> Self::Buffer;
+}
+
+pub trait JournalHandle {
+ type Storage: Storage;
+ type Target: Journal<Self::Storage>;
+
+ fn handle(&self) -> &Self::Target;
+}
diff --git a/core/metadata/src/impls/metadata.rs
b/core/metadata/src/impls/metadata.rs
index e1bee5f09..c8c7fdf90 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -14,74 +14,50 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-use std::marker::PhantomData;
-
use consensus::{Consensus, Project, Sequencer, Status, VsrConsensus};
use iggy_common::{
header::{Command2, PrepareHeader, PrepareOkHeader},
message::Message,
};
-use journal::Journal;
+use journal::{Journal, JournalHandle};
use message_bus::MessageBus;
use tracing::{debug, warn};
-// TODO: Define a trait (probably in some external crate)
#[expect(unused)]
-trait Metadata {
+pub trait Metadata<C>
+where
+ C: Consensus,
+{
+ /// Handle a request message.
+ fn on_request(&self, message: C::RequestMessage);
+
/// Handle a replicate message (Prepare in VSR).
- fn on_request(&self, message: <VsrConsensus as Consensus>::RequestMessage);
+ fn on_replicate(&self, message: C::ReplicateMessage) -> impl Future<Output
= ()>;
/// Handle an ack message (PrepareOk in VSR).
- fn on_replicate(
- &self,
- message: <VsrConsensus as Consensus>::ReplicateMessage,
- ) -> impl Future<Output = ()>;
- fn on_ack(&self, message: <VsrConsensus as Consensus>::AckMessage);
-}
-
-pub trait MetadataHandle {
- type Consensus;
- type Journal;
- type Snapshot;
- type StateMachine;
+ fn on_ack(&self, message: C::AckMessage);
}
-/// Concrete implementation of `MetadataHandle` for Iggy.
-/// This is a marker struct that only holds type information.
-pub struct IggyMetadataHandle<J, S, M> {
- _marker: PhantomData<(J, S, M)>,
-}
-
-impl<J, S, M> MetadataHandle for IggyMetadataHandle<J, S, M>
-where
- J: Journal<Entry = <VsrConsensus as Consensus>::ReplicateMessage, Header =
PrepareHeader>,
-{
- type Consensus = VsrConsensus;
- type Journal = J;
- type Snapshot = S;
- type StateMachine = M;
-}
-
-//
=============================================================================
-// IggyMetadata
-//
=============================================================================
-
#[expect(unused)]
-struct IggyMetadata<H: MetadataHandle> {
+struct IggyMetadata<C, J, S, M> {
/// Some on shard0, None on other shards
- consensus: Option<H::Consensus>,
+ consensus: Option<C>,
/// Some on shard0, None on other shards
- journal: Option<H::Journal>,
+ journal: Option<J>,
/// Some on shard0, None on other shards
- snapshot: Option<H::Snapshot>,
+ snapshot: Option<S>,
/// State machine - lives on all shards
- mux_stm: H::StateMachine,
+ mux_stm: M,
}
-// TODO: Handle the `routing` of messages to shard0, on the callsite.
-impl<J, S, M> Metadata for IggyMetadata<IggyMetadataHandle<J, S, M>>
+impl<J, S, M> Metadata<VsrConsensus> for IggyMetadata<VsrConsensus, J, S, M>
where
- J: Journal<Entry = <VsrConsensus as Consensus>::ReplicateMessage, Header =
PrepareHeader>,
+ J: JournalHandle,
+ J::Target: Journal<
+ J::Storage,
+ Entry = <VsrConsensus as Consensus>::ReplicateMessage,
+ Header = PrepareHeader,
+ >,
{
fn on_request(&self, message: <VsrConsensus as Consensus>::RequestMessage)
{
let consensus = self.consensus.as_ref().unwrap();
@@ -161,17 +137,17 @@ where
// TODO handle gap in ops.
// Verify hash chain integrity.
- if let Some(previous) = journal.previous_entry(header) {
+ if let Some(previous) = journal.handle().previous_entry(header) {
self.panic_if_hash_chain_would_break_in_same_view(&previous,
header);
}
assert_eq!(header.op, current_op + 1);
consensus.sequencer().set_sequence(header.op);
- journal.set_header_as_dirty(header);
+ journal.handle().set_header_as_dirty(header);
// Append to journal.
- journal.append(message.clone()).await;
+ journal.handle().append(message.clone()).await;
// After successful journal write, send prepare_ok to primary.
self.send_prepare_ok(header).await;
@@ -225,9 +201,14 @@ where
}
}
-impl<J, S, M> IggyMetadata<IggyMetadataHandle<J, S, M>>
+impl<J, S, M> IggyMetadata<VsrConsensus, J, S, M>
where
- J: Journal<Entry = <VsrConsensus as Consensus>::ReplicateMessage, Header =
PrepareHeader>,
+ J: JournalHandle,
+ J::Target: Journal<
+ J::Storage,
+ Entry = <VsrConsensus as Consensus>::ReplicateMessage,
+ Header = PrepareHeader,
+ >,
{
#[expect(unused)]
fn pipeline_prepare(&self, prepare: Message<PrepareHeader>) {
@@ -247,7 +228,7 @@ where
};
let header = prepare.header();
- header.op <= consensus.commit() || journal.has_prepare(header)
+ header.op <= consensus.commit() ||
journal.handle().entry(header).is_some()
}
/// Replicate a prepare message to the next replica in the chain.
@@ -264,7 +245,7 @@ where
assert_eq!(header.command, Command2::Prepare);
assert!(
- !journal.has_prepare(header),
+ journal.handle().entry(header).is_none(),
"replicate: must not already have prepare"
);
assert!(header.op > consensus.commit());
@@ -354,7 +335,7 @@ where
}
// Verify we have the prepare and it's persisted (not dirty).
- if !journal.has_prepare(header) {
+ if journal.handle().entry(header).is_none() {
debug!(
replica = consensus.replica(),
op = header.op,