This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new db89d7d63 feat(cluster): define metadata module and add consensus,
messages modules (#2349)
db89d7d63 is described below
commit db89d7d635b1febea58a2f3dbff232cb397f94d4
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Tue Nov 18 08:26:07 2025 +0100
feat(cluster): define metadata module and add consensus, messages modules
(#2349)
---
Cargo.lock | 11 ++
Cargo.toml | 2 +
DEPENDENCIES.md | 2 +
core/common/src/types/consensus/mod.rs | 162 +++++++++++++++++++++
core/common/src/types/mod.rs | 1 +
core/consensus/Cargo.toml | 31 ++++
.../src/types/mod.rs => consensus/src/lib.rs} | 23 +--
core/metadata/Cargo.toml | 33 +++++
.../mod.rs => metadata/src/impls/metadata.rs} | 54 ++++---
.../src/types => metadata/src/impls}/mod.rs | 20 +--
.../src/types/mod.rs => metadata/src/lib.rs} | 22 +--
core/{common/src/types => metadata/src/stm}/mod.rs | 21 +--
.../src/types/mod.rs => metadata/src/stm/mux.rs} | 20 +--
.../types/mod.rs => metadata/src/stm/stream.rs} | 19 ---
14 files changed, 293 insertions(+), 128 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 0a9124b8e..4e90c1b92 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1693,6 +1693,10 @@ dependencies = [
"crossbeam-utils",
]
+[[package]]
+name = "consensus"
+version = "0.1.0"
+
[[package]]
name = "console"
version = "0.15.11"
@@ -5213,6 +5217,13 @@ version = "2.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273"
+[[package]]
+name = "metadata"
+version = "0.1.0"
+dependencies = [
+ "consensus",
+]
+
[[package]]
name = "miette"
version = "7.6.0"
diff --git a/Cargo.toml b/Cargo.toml
index 3849fb16d..53e3ae956 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -35,7 +35,9 @@ members = [
"core/connectors/sinks/stdout_sink",
"core/connectors/sources/postgres_source",
"core/connectors/sources/random_source",
+ "core/consensus",
"core/integration",
+ "core/metadata",
"core/sdk",
"core/server",
"core/tools",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index f725bd564..ca7720310 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -143,6 +143,7 @@ compio-ws: 0.1.0, "MIT",
compression-codecs: 0.4.32, "Apache-2.0 OR MIT",
compression-core: 0.4.30, "Apache-2.0 OR MIT",
concurrent-queue: 2.5.0, "Apache-2.0 OR MIT",
+consensus: 0.1.0, "Apache-2.0",
console: 0.15.11, "MIT",
console_error_panic_hook: 0.1.7, "Apache-2.0 OR MIT",
const-oid: 0.9.6, "Apache-2.0 OR MIT",
@@ -455,6 +456,7 @@ matchers: 0.2.0, "MIT",
matchit: 0.8.4, "BSD-3-Clause AND MIT",
md-5: 0.10.6, "Apache-2.0 OR MIT",
memchr: 2.7.6, "MIT OR Unlicense",
+metadata: 0.1.0, "Apache-2.0",
miette: 7.6.0, "Apache-2.0",
miette-derive: 7.6.0, "Apache-2.0",
mimalloc: 0.1.48, "MIT",
diff --git a/core/common/src/types/consensus/mod.rs
b/core/common/src/types/consensus/mod.rs
new file mode 100644
index 000000000..6ecac7726
--- /dev/null
+++ b/core/common/src/types/consensus/mod.rs
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::marker::PhantomData;
+
+// TODO: Header generic
+// TODO: We will have to implement something like this (not one-to-one, just a
sketch), as we will use `bytemuck`:
+/*
+// Generic Message type
+#[repr(C)]
+pub struct Message<H: Header = GenericHeader> {
+ buffer: AlignedBuffer<ALIGNED_TO_HEADER_SIZE>,
+ _phantom: PhantomData<H>,
+}
+
+// Trait that all headers must implement
+pub trait Header: Sized {
+ const COMMAND: Command;
+
+ fn size(&self) -> u32;
+ fn command(&self) -> Command;
+ fn checksum(&self) -> u128;
+}
+
+// Command enum (simplified)
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[repr(u8)]
+pub enum Command {
+ Reserved = 0,
+ Ping = 1,
+ Pong = 2,
+ Request = 3,
+ Prepare = 4,
+ PrepareOk = 5,
+ Reply = 6,
+ Commit = 7,
+ // ... etc
+}
+
+// Generic header for base Message
+#[repr(C)]
+pub struct GenericHeader {
+ checksum: u128,
+ command: Command,
+ size: u32,
+ // ... other fields
+}
+
+impl Header for GenericHeader {
+ const COMMAND: Command = Command::Reserved;
+
+ fn size(&self) -> u32 { self.size }
+ fn command(&self) -> Command { self.command }
+ fn checksum(&self) -> u128 { self.checksum }
+}
+
+// Specific header types
+#[repr(C)]
+pub struct PrepareHeader {
+ checksum: u128,
+ command: Command,
+ size: u32,
+ // ... prepare-specific fields
+ view: u32,
+ op: u64,
+ commit: u64,
+}
+
+impl Header for PrepareHeader {
+ const COMMAND: Command = Command::Prepare;
+
+ fn size(&self) -> u32 { self.size }
+ fn command(&self) -> Command { self.command }
+ fn checksum(&self) -> u128 { self.checksum }
+*/
+
+// And then for Message impl
+/*
+impl<H: Header> Message<H> {
+ // Access header (no stored pointer needed!)
+ pub fn header(&self) -> &H {
+ assert!(size_of::<H>() <= ALIGNED_TO_HEADER_SIZE);
+ unsafe { &*(self.buffer.as_ptr() as *const H) }
+ }
+
+ pub fn header_mut(&mut self) -> &mut H {
+ unsafe { &mut *(self.buffer.as_mut_ptr() as *mut H) }
+ }
+
+ pub fn body(&self) -> &[u8] {
+ let header_size = size_of::<H>();
+ let total_size = self.header().size() as usize;
+ &self.buffer[header_size..total_size]
+ }
+
+ pub fn body_mut(&mut self) -> &mut [u8] {
+ let header_size = size_of::<H>();
+ let total_size = self.header().size() as usize;
+ &mut self.buffer.as_mut()[header_size..total_size]
+ }
+
+ pub fn as_bytes(&self) -> &[u8] {
+ &self.buffer[..self.header().size() as usize]
+ }
+
+ pub fn base(&self) -> &Message<GenericHeader> {
+ unsafe { &*(self as *const Self as *const Message<GenericHeader>) }
+ }
+
+ pub fn base_mut(&mut self) -> &mut Message<GenericHeader> {
+ unsafe { &mut *(self as *mut Self as *mut Message<GenericHeader>) }
+ }
+
+ pub fn try_into<T: Header>(self) -> Option<Message<T>> {
+ if self.header().command() == T::COMMAND {
+ Some(unsafe { std::mem::transmute(self) })
+ } else {
+ None
+ }
+ }
+}
+*/
+
+// Then for the `Request` header we will have to store an `Operation` enum
+/*
+ Define your operation enum
+#[repr(u8)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum Operation {
+ // Metadata operations
+ CreateTopic = 1,
+ DeleteTopic = 2,
+ ListTopics = 3,
+ CreatePartition = 4,
+
+ // Partition operations
+ Produce = 100,
+ Consume = 101,
+ Fetch = 102,
+}
+*/
+
+// Which will have a method that returns a discriminator between Metadata and
Partition requests
+
+#[expect(unused)]
+pub struct Message<H> {
+ _phantom_header: PhantomData<H>,
+}
diff --git a/core/common/src/types/mod.rs b/core/common/src/types/mod.rs
index 96b41233d..f0d70cf9c 100644
--- a/core/common/src/types/mod.rs
+++ b/core/common/src/types/mod.rs
@@ -22,6 +22,7 @@ pub(crate) mod cluster;
pub(crate) mod command;
pub(crate) mod compression;
pub(crate) mod configuration;
+pub(crate) mod consensus;
pub(crate) mod consumer;
pub(crate) mod diagnostic;
pub(crate) mod identifier;
diff --git a/core/consensus/Cargo.toml b/core/consensus/Cargo.toml
new file mode 100644
index 000000000..9bf2b9ad8
--- /dev/null
+++ b/core/consensus/Cargo.toml
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "consensus"
+version = "0.1.0"
+description = "Iggy consensus module"
+edition = "2024"
+license = "Apache-2.0"
+keywords = ["iggy", "messaging", "streaming"]
+categories = ["command-line-utilities", "database", "network-programming"]
+homepage = "https://iggy.apache.org"
+documentation = "https://iggy.apache.org/docs"
+repository = "https://github.com/apache/iggy"
+readme = "../../../README.md"
+
+[dependencies]
diff --git a/core/common/src/types/mod.rs b/core/consensus/src/lib.rs
similarity index 64%
copy from core/common/src/types/mod.rs
copy to core/consensus/src/lib.rs
index 96b41233d..c3241d3c7 100644
--- a/core/common/src/types/mod.rs
+++ b/core/consensus/src/lib.rs
@@ -15,21 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-pub(crate) mod args;
-pub(crate) mod client;
-pub(crate) mod client_state;
-pub(crate) mod cluster;
-pub(crate) mod command;
-pub(crate) mod compression;
-pub(crate) mod configuration;
-pub(crate) mod consumer;
-pub(crate) mod diagnostic;
-pub(crate) mod identifier;
-pub(crate) mod message;
-pub(crate) mod partition;
-pub(crate) mod permissions;
-pub(crate) mod snapshot;
-pub(crate) mod stats;
-pub(crate) mod stream;
-pub(crate) mod topic;
-pub(crate) mod user;
+pub trait Consensus {
+ type RequestMessage;
+ type ReplicateMessage;
+ type AckMessage;
+}
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
new file mode 100644
index 000000000..87e578250
--- /dev/null
+++ b/core/metadata/Cargo.toml
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "metadata"
+version = "0.1.0"
+description = "Iggy metadata module"
+edition = "2024"
+license = "Apache-2.0"
+keywords = ["iggy", "messaging", "streaming"]
+categories = ["database", "network-programming"]
+homepage = "https://iggy.apache.org"
+documentation = "https://iggy.apache.org/docs"
+repository = "https://github.com/apache/iggy"
+readme = "../../../README.md"
+
+[dependencies]
+consensus = { path = "../consensus" }
+
diff --git a/core/common/src/types/mod.rs b/core/metadata/src/impls/metadata.rs
similarity index 51%
copy from core/common/src/types/mod.rs
copy to core/metadata/src/impls/metadata.rs
index 96b41233d..659112f1e 100644
--- a/core/common/src/types/mod.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -14,22 +14,40 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
+use consensus::Consensus;
-pub(crate) mod args;
-pub(crate) mod client;
-pub(crate) mod client_state;
-pub(crate) mod cluster;
-pub(crate) mod command;
-pub(crate) mod compression;
-pub(crate) mod configuration;
-pub(crate) mod consumer;
-pub(crate) mod diagnostic;
-pub(crate) mod identifier;
-pub(crate) mod message;
-pub(crate) mod partition;
-pub(crate) mod permissions;
-pub(crate) mod snapshot;
-pub(crate) mod stats;
-pub(crate) mod stream;
-pub(crate) mod topic;
-pub(crate) mod user;
+// TODO: Define a trait (probably in some external crate)
+#[expect(unused)]
+trait Metadata<C>
+where
+ C: Consensus,
+{
+ fn on_request(&self, message: C::RequestMessage);
+ fn on_replicate(&self, message: C::ReplicateMessage);
+ fn on_ack(&self, message: C::AckMessage);
+}
+
+#[expect(unused)]
+struct IggyMetadata<C, M, J, S> {
+ consensus: C,
+ mux_stm: M,
+ journal: J,
+ snapshot: S,
+}
+
+impl<C, M, J, S> Metadata<C> for IggyMetadata<C, M, J, S>
+where
+ C: Consensus,
+{
+ fn on_request(&self, _message: C::RequestMessage) {
+ todo!()
+ }
+
+ fn on_replicate(&self, _message: C::ReplicateMessage) {
+ todo!()
+ }
+
+ fn on_ack(&self, _message: C::AckMessage) {
+ todo!()
+ }
+}
diff --git a/core/common/src/types/mod.rs b/core/metadata/src/impls/mod.rs
similarity index 64%
copy from core/common/src/types/mod.rs
copy to core/metadata/src/impls/mod.rs
index 96b41233d..1fb71a3cf 100644
--- a/core/common/src/types/mod.rs
+++ b/core/metadata/src/impls/mod.rs
@@ -14,22 +14,4 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-
-pub(crate) mod args;
-pub(crate) mod client;
-pub(crate) mod client_state;
-pub(crate) mod cluster;
-pub(crate) mod command;
-pub(crate) mod compression;
-pub(crate) mod configuration;
-pub(crate) mod consumer;
-pub(crate) mod diagnostic;
-pub(crate) mod identifier;
-pub(crate) mod message;
-pub(crate) mod partition;
-pub(crate) mod permissions;
-pub(crate) mod snapshot;
-pub(crate) mod stats;
-pub(crate) mod stream;
-pub(crate) mod topic;
-pub(crate) mod user;
+pub mod metadata;
diff --git a/core/common/src/types/mod.rs b/core/metadata/src/lib.rs
similarity index 64%
copy from core/common/src/types/mod.rs
copy to core/metadata/src/lib.rs
index 96b41233d..c86b13993 100644
--- a/core/common/src/types/mod.rs
+++ b/core/metadata/src/lib.rs
@@ -15,21 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-pub(crate) mod args;
-pub(crate) mod client;
-pub(crate) mod client_state;
-pub(crate) mod cluster;
-pub(crate) mod command;
-pub(crate) mod compression;
-pub(crate) mod configuration;
-pub(crate) mod consumer;
-pub(crate) mod diagnostic;
-pub(crate) mod identifier;
-pub(crate) mod message;
-pub(crate) mod partition;
-pub(crate) mod permissions;
-pub(crate) mod snapshot;
-pub(crate) mod stats;
-pub(crate) mod stream;
-pub(crate) mod topic;
-pub(crate) mod user;
+//! Iggy metadata module
+
+mod impls;
+pub mod stm;
diff --git a/core/common/src/types/mod.rs b/core/metadata/src/stm/mod.rs
similarity index 64%
copy from core/common/src/types/mod.rs
copy to core/metadata/src/stm/mod.rs
index 96b41233d..853a1c43e 100644
--- a/core/common/src/types/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -15,21 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-pub(crate) mod args;
-pub(crate) mod client;
-pub(crate) mod client_state;
-pub(crate) mod cluster;
-pub(crate) mod command;
-pub(crate) mod compression;
-pub(crate) mod configuration;
-pub(crate) mod consumer;
-pub(crate) mod diagnostic;
-pub(crate) mod identifier;
-pub(crate) mod message;
-pub(crate) mod partition;
-pub(crate) mod permissions;
-pub(crate) mod snapshot;
-pub(crate) mod stats;
-pub(crate) mod stream;
-pub(crate) mod topic;
-pub(crate) mod user;
+mod mux;
+mod stream;
+// TODO: Add more state machines.
diff --git a/core/common/src/types/mod.rs b/core/metadata/src/stm/mux.rs
similarity index 64%
copy from core/common/src/types/mod.rs
copy to core/metadata/src/stm/mux.rs
index 96b41233d..7907c8db8 100644
--- a/core/common/src/types/mod.rs
+++ b/core/metadata/src/stm/mux.rs
@@ -15,21 +15,5 @@
// specific language governing permissions and limitations
// under the License.
-pub(crate) mod args;
-pub(crate) mod client;
-pub(crate) mod client_state;
-pub(crate) mod cluster;
-pub(crate) mod command;
-pub(crate) mod compression;
-pub(crate) mod configuration;
-pub(crate) mod consumer;
-pub(crate) mod diagnostic;
-pub(crate) mod identifier;
-pub(crate) mod message;
-pub(crate) mod partition;
-pub(crate) mod permissions;
-pub(crate) mod snapshot;
-pub(crate) mod stats;
-pub(crate) mod stream;
-pub(crate) mod topic;
-pub(crate) mod user;
+#[expect(unused)]
+struct MuxStateMachine {}
diff --git a/core/common/src/types/mod.rs b/core/metadata/src/stm/stream.rs
similarity index 64%
copy from core/common/src/types/mod.rs
copy to core/metadata/src/stm/stream.rs
index 96b41233d..b248758bc 100644
--- a/core/common/src/types/mod.rs
+++ b/core/metadata/src/stm/stream.rs
@@ -14,22 +14,3 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-
-pub(crate) mod args;
-pub(crate) mod client;
-pub(crate) mod client_state;
-pub(crate) mod cluster;
-pub(crate) mod command;
-pub(crate) mod compression;
-pub(crate) mod configuration;
-pub(crate) mod consumer;
-pub(crate) mod diagnostic;
-pub(crate) mod identifier;
-pub(crate) mod message;
-pub(crate) mod partition;
-pub(crate) mod permissions;
-pub(crate) mod snapshot;
-pub(crate) mod stats;
-pub(crate) mod stream;
-pub(crate) mod topic;
-pub(crate) mod user;