This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new db89d7d63 feat(cluster): define metadata module and add consensus, messages modules (#2349)
db89d7d63 is described below

commit db89d7d635b1febea58a2f3dbff232cb397f94d4
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Tue Nov 18 08:26:07 2025 +0100

    feat(cluster): define metadata module and add consensus, messages modules (#2349)
---
 Cargo.lock                                         |  11 ++
 Cargo.toml                                         |   2 +
 DEPENDENCIES.md                                    |   2 +
 core/common/src/types/consensus/mod.rs             | 162 +++++++++++++++++++++
 core/common/src/types/mod.rs                       |   1 +
 core/consensus/Cargo.toml                          |  31 ++++
 .../src/types/mod.rs => consensus/src/lib.rs}      |  23 +--
 core/metadata/Cargo.toml                           |  33 +++++
 .../mod.rs => metadata/src/impls/metadata.rs}      |  54 ++++---
 .../src/types => metadata/src/impls}/mod.rs        |  20 +--
 .../src/types/mod.rs => metadata/src/lib.rs}       |  22 +--
 core/{common/src/types => metadata/src/stm}/mod.rs |  21 +--
 .../src/types/mod.rs => metadata/src/stm/mux.rs}   |  20 +--
 .../types/mod.rs => metadata/src/stm/stream.rs}    |  19 ---
 14 files changed, 293 insertions(+), 128 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 0a9124b8e..4e90c1b92 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1693,6 +1693,10 @@ dependencies = [
  "crossbeam-utils",
 ]
 
+[[package]]
+name = "consensus"
+version = "0.1.0"
+
 [[package]]
 name = "console"
 version = "0.15.11"
@@ -5213,6 +5217,13 @@ version = "2.7.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273"
 
+[[package]]
+name = "metadata"
+version = "0.1.0"
+dependencies = [
+ "consensus",
+]
+
 [[package]]
 name = "miette"
 version = "7.6.0"
diff --git a/Cargo.toml b/Cargo.toml
index 3849fb16d..53e3ae956 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -35,7 +35,9 @@ members = [
     "core/connectors/sinks/stdout_sink",
     "core/connectors/sources/postgres_source",
     "core/connectors/sources/random_source",
+    "core/consensus",
     "core/integration",
+    "core/metadata",
     "core/sdk",
     "core/server",
     "core/tools",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index f725bd564..ca7720310 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -143,6 +143,7 @@ compio-ws: 0.1.0, "MIT",
 compression-codecs: 0.4.32, "Apache-2.0 OR MIT",
 compression-core: 0.4.30, "Apache-2.0 OR MIT",
 concurrent-queue: 2.5.0, "Apache-2.0 OR MIT",
+consensus: 0.1.0, "Apache-2.0",
 console: 0.15.11, "MIT",
 console_error_panic_hook: 0.1.7, "Apache-2.0 OR MIT",
 const-oid: 0.9.6, "Apache-2.0 OR MIT",
@@ -455,6 +456,7 @@ matchers: 0.2.0, "MIT",
 matchit: 0.8.4, "BSD-3-Clause AND MIT",
 md-5: 0.10.6, "Apache-2.0 OR MIT",
 memchr: 2.7.6, "MIT OR Unlicense",
+metadata: 0.1.0, "Apache-2.0",
 miette: 7.6.0, "Apache-2.0",
 miette-derive: 7.6.0, "Apache-2.0",
 mimalloc: 0.1.48, "MIT",
diff --git a/core/common/src/types/consensus/mod.rs b/core/common/src/types/consensus/mod.rs
new file mode 100644
index 000000000..6ecac7726
--- /dev/null
+++ b/core/common/src/types/consensus/mod.rs
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::marker::PhantomData;
+
+// TODO: Header generic
+// TODO: We will have to impl something like this (NOT ONE TO ONE JUST A SKETCH) as we will use `bytemuck`:
+/*
+// Generic Message type
+#[repr(C)]
+pub struct Message<H: Header = GenericHeader> {
+    buffer: AlignedBuffer<ALIGNED_TO_HEADER_SIZE>,
+    _phantom: PhantomData<H>,
+}
+
+// Trait that all headers must implement
+pub trait Header: Sized {
+    const COMMAND: Command;
+
+    fn size(&self) -> u32;
+    fn command(&self) -> Command;
+    fn checksum(&self) -> u128;
+}
+
+// Command enum (simplified)
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[repr(u8)]
+pub enum Command {
+    Reserved = 0,
+    Ping = 1,
+    Pong = 2,
+    Request = 3,
+    Prepare = 4,
+    PrepareOk = 5,
+    Reply = 6,
+    Commit = 7,
+    // ... etc
+}
+
+// Generic header for base Message
+#[repr(C)]
+pub struct GenericHeader {
+    checksum: u128,
+    command: Command,
+    size: u32,
+    // ... other fields
+}
+
+impl Header for GenericHeader {
+    const COMMAND: Command = Command::Reserved;
+
+    fn size(&self) -> u32 { self.size }
+    fn command(&self) -> Command { self.command }
+    fn checksum(&self) -> u128 { self.checksum }
+}
+
+// Specific header types
+#[repr(C)]
+pub struct PrepareHeader {
+    checksum: u128,
+    command: Command,
+    size: u32,
+    // ... prepare-specific fields
+    view: u32,
+    op: u64,
+    commit: u64,
+}
+
+impl Header for PrepareHeader {
+    const COMMAND: Command = Command::Prepare;
+
+    fn size(&self) -> u32 { self.size }
+    fn command(&self) -> Command { self.command }
+    fn checksum(&self) -> u128 { self.checksum }
+*/
+
+// And then for Message impl
+/*
+impl<H: Header> Message<H> {
+    // Access header (no stored pointer needed!)
+    pub fn header(&self) -> &H {
+        assert!(size_of::<H>() <= ALIGNED_TO_HEADER_SIZE);
+        unsafe { &*(self.buffer.as_ptr() as *const H) }
+    }
+
+    pub fn header_mut(&mut self) -> &mut H {
+        unsafe { &mut *(self.buffer.as_mut_ptr() as *mut H) }
+    }
+
+    pub fn body(&self) -> &[u8] {
+        let header_size = size_of::<H>();
+        let total_size = self.header().size() as usize;
+        &self.buffer[header_size..total_size]
+    }
+
+    pub fn body_mut(&mut self) -> &mut [u8] {
+        let header_size = size_of::<H>();
+        let total_size = self.header().size() as usize;
+        &mut self.buffer.as_mut()[header_size..total_size]
+    }
+
+    pub fn as_bytes(&self) -> &[u8] {
+        &self.buffer[..self.header().size() as usize]
+    }
+
+    pub fn base(&self) -> &Message<GenericHeader> {
+        unsafe { &*(self as *const Self as *const Message<GenericHeader>) }
+    }
+
+    pub fn base_mut(&mut self) -> &mut Message<GenericHeader> {
+        unsafe { &mut *(self as *mut Self as *mut Message<GenericHeader>) }
+    }
+
+    pub fn try_into<T: Header>(self) -> Option<Message<T>> {
+        if self.header().command() == T::COMMAND {
+            Some(unsafe { std::mem::transmute(self) })
+        } else {
+            None
+        }
+    }
+}
+*/
+
+// Then for the `Request` header we will have to store an `Operation` enum
+/*
+ Define your operation enum
+#[repr(u8)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum Operation {
+    // Metadata operations
+    CreateTopic = 1,
+    DeleteTopic = 2,
+    ListTopics = 3,
+    CreatePartition = 4,
+
+    // Partition operations
+    Produce = 100,
+    Consume = 101,
+    Fetch = 102,
+}
+*/
+
+// Which will have an method that returns discriminator between Metadata and Partition requests
+
+#[expect(unused)]
+pub struct Message<H> {
+    _phantom_header: PhantomData<H>,
+}
diff --git a/core/common/src/types/mod.rs b/core/common/src/types/mod.rs
index 96b41233d..f0d70cf9c 100644
--- a/core/common/src/types/mod.rs
+++ b/core/common/src/types/mod.rs
@@ -22,6 +22,7 @@ pub(crate) mod cluster;
 pub(crate) mod command;
 pub(crate) mod compression;
 pub(crate) mod configuration;
+pub(crate) mod consensus;
 pub(crate) mod consumer;
 pub(crate) mod diagnostic;
 pub(crate) mod identifier;
diff --git a/core/consensus/Cargo.toml b/core/consensus/Cargo.toml
new file mode 100644
index 000000000..9bf2b9ad8
--- /dev/null
+++ b/core/consensus/Cargo.toml
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "consensus"
+version = "0.1.0"
+description = "Iggy consensus module"
+edition = "2024"
+license = "Apache-2.0"
+keywords = ["iggy", "messaging", "streaming"]
+categories = ["command-line-utilities", "database", "network-programming"]
+homepage = "https://iggy.apache.org"
+documentation = "https://iggy.apache.org/docs"
+repository = "https://github.com/apache/iggy"
+readme = "../../../README.md"
+
+[dependencies]
diff --git a/core/common/src/types/mod.rs b/core/consensus/src/lib.rs
similarity index 64%
copy from core/common/src/types/mod.rs
copy to core/consensus/src/lib.rs
index 96b41233d..c3241d3c7 100644
--- a/core/common/src/types/mod.rs
+++ b/core/consensus/src/lib.rs
@@ -15,21 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-pub(crate) mod args;
-pub(crate) mod client;
-pub(crate) mod client_state;
-pub(crate) mod cluster;
-pub(crate) mod command;
-pub(crate) mod compression;
-pub(crate) mod configuration;
-pub(crate) mod consumer;
-pub(crate) mod diagnostic;
-pub(crate) mod identifier;
-pub(crate) mod message;
-pub(crate) mod partition;
-pub(crate) mod permissions;
-pub(crate) mod snapshot;
-pub(crate) mod stats;
-pub(crate) mod stream;
-pub(crate) mod topic;
-pub(crate) mod user;
+pub trait Consensus {
+    type RequestMessage;
+    type ReplicateMessage;
+    type AckMessage;
+}
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
new file mode 100644
index 000000000..87e578250
--- /dev/null
+++ b/core/metadata/Cargo.toml
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "metadata"
+version = "0.1.0"
+description = "Iggy metadata module"
+edition = "2024"
+license = "Apache-2.0"
+keywords = ["iggy", "messaging", "streaming"]
+categories = ["database", "network-programming"]
+homepage = "https://iggy.apache.org"
+documentation = "https://iggy.apache.org/docs"
+repository = "https://github.com/apache/iggy"
+readme = "../../../README.md"
+
+[dependencies]
+consensus = { path = "../consensus" }
+
diff --git a/core/common/src/types/mod.rs b/core/metadata/src/impls/metadata.rs
similarity index 51%
copy from core/common/src/types/mod.rs
copy to core/metadata/src/impls/metadata.rs
index 96b41233d..659112f1e 100644
--- a/core/common/src/types/mod.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -14,22 +14,40 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+use consensus::Consensus;
 
-pub(crate) mod args;
-pub(crate) mod client;
-pub(crate) mod client_state;
-pub(crate) mod cluster;
-pub(crate) mod command;
-pub(crate) mod compression;
-pub(crate) mod configuration;
-pub(crate) mod consumer;
-pub(crate) mod diagnostic;
-pub(crate) mod identifier;
-pub(crate) mod message;
-pub(crate) mod partition;
-pub(crate) mod permissions;
-pub(crate) mod snapshot;
-pub(crate) mod stats;
-pub(crate) mod stream;
-pub(crate) mod topic;
-pub(crate) mod user;
+// TODO: Define a trait (probably in some external crate)
+#[expect(unused)]
+trait Metadata<C>
+where
+    C: Consensus,
+{
+    fn on_request(&self, message: C::RequestMessage);
+    fn on_replicate(&self, message: C::ReplicateMessage);
+    fn on_ack(&self, message: C::AckMessage);
+}
+
+#[expect(unused)]
+struct IggyMetadata<C, M, J, S> {
+    consensus: C,
+    mux_stm: M,
+    journal: J,
+    snapshot: S,
+}
+
+impl<C, M, J, S> Metadata<C> for IggyMetadata<C, M, J, S>
+where
+    C: Consensus,
+{
+    fn on_request(&self, _message: C::RequestMessage) {
+        todo!()
+    }
+
+    fn on_replicate(&self, _message: C::ReplicateMessage) {
+        todo!()
+    }
+
+    fn on_ack(&self, _message: C::AckMessage) {
+        todo!()
+    }
+}
diff --git a/core/common/src/types/mod.rs b/core/metadata/src/impls/mod.rs
similarity index 64%
copy from core/common/src/types/mod.rs
copy to core/metadata/src/impls/mod.rs
index 96b41233d..1fb71a3cf 100644
--- a/core/common/src/types/mod.rs
+++ b/core/metadata/src/impls/mod.rs
@@ -14,22 +14,4 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-
-pub(crate) mod args;
-pub(crate) mod client;
-pub(crate) mod client_state;
-pub(crate) mod cluster;
-pub(crate) mod command;
-pub(crate) mod compression;
-pub(crate) mod configuration;
-pub(crate) mod consumer;
-pub(crate) mod diagnostic;
-pub(crate) mod identifier;
-pub(crate) mod message;
-pub(crate) mod partition;
-pub(crate) mod permissions;
-pub(crate) mod snapshot;
-pub(crate) mod stats;
-pub(crate) mod stream;
-pub(crate) mod topic;
-pub(crate) mod user;
+pub mod metadata;
diff --git a/core/common/src/types/mod.rs b/core/metadata/src/lib.rs
similarity index 64%
copy from core/common/src/types/mod.rs
copy to core/metadata/src/lib.rs
index 96b41233d..c86b13993 100644
--- a/core/common/src/types/mod.rs
+++ b/core/metadata/src/lib.rs
@@ -15,21 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-pub(crate) mod args;
-pub(crate) mod client;
-pub(crate) mod client_state;
-pub(crate) mod cluster;
-pub(crate) mod command;
-pub(crate) mod compression;
-pub(crate) mod configuration;
-pub(crate) mod consumer;
-pub(crate) mod diagnostic;
-pub(crate) mod identifier;
-pub(crate) mod message;
-pub(crate) mod partition;
-pub(crate) mod permissions;
-pub(crate) mod snapshot;
-pub(crate) mod stats;
-pub(crate) mod stream;
-pub(crate) mod topic;
-pub(crate) mod user;
+//! Iggy metadata module
+
+mod impls;
+pub mod stm;
diff --git a/core/common/src/types/mod.rs b/core/metadata/src/stm/mod.rs
similarity index 64%
copy from core/common/src/types/mod.rs
copy to core/metadata/src/stm/mod.rs
index 96b41233d..853a1c43e 100644
--- a/core/common/src/types/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -15,21 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-pub(crate) mod args;
-pub(crate) mod client;
-pub(crate) mod client_state;
-pub(crate) mod cluster;
-pub(crate) mod command;
-pub(crate) mod compression;
-pub(crate) mod configuration;
-pub(crate) mod consumer;
-pub(crate) mod diagnostic;
-pub(crate) mod identifier;
-pub(crate) mod message;
-pub(crate) mod partition;
-pub(crate) mod permissions;
-pub(crate) mod snapshot;
-pub(crate) mod stats;
-pub(crate) mod stream;
-pub(crate) mod topic;
-pub(crate) mod user;
+mod mux;
+mod stream;
+// TODO: Add more state machines.
diff --git a/core/common/src/types/mod.rs b/core/metadata/src/stm/mux.rs
similarity index 64%
copy from core/common/src/types/mod.rs
copy to core/metadata/src/stm/mux.rs
index 96b41233d..7907c8db8 100644
--- a/core/common/src/types/mod.rs
+++ b/core/metadata/src/stm/mux.rs
@@ -15,21 +15,5 @@
 // specific language governing permissions and limitations
 // under the License.
 
-pub(crate) mod args;
-pub(crate) mod client;
-pub(crate) mod client_state;
-pub(crate) mod cluster;
-pub(crate) mod command;
-pub(crate) mod compression;
-pub(crate) mod configuration;
-pub(crate) mod consumer;
-pub(crate) mod diagnostic;
-pub(crate) mod identifier;
-pub(crate) mod message;
-pub(crate) mod partition;
-pub(crate) mod permissions;
-pub(crate) mod snapshot;
-pub(crate) mod stats;
-pub(crate) mod stream;
-pub(crate) mod topic;
-pub(crate) mod user;
+#[expect(unused)]
+struct MuxStateMachine {}
diff --git a/core/common/src/types/mod.rs b/core/metadata/src/stm/stream.rs
similarity index 64%
copy from core/common/src/types/mod.rs
copy to core/metadata/src/stm/stream.rs
index 96b41233d..b248758bc 100644
--- a/core/common/src/types/mod.rs
+++ b/core/metadata/src/stm/stream.rs
@@ -14,22 +14,3 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-
-pub(crate) mod args;
-pub(crate) mod client;
-pub(crate) mod client_state;
-pub(crate) mod cluster;
-pub(crate) mod command;
-pub(crate) mod compression;
-pub(crate) mod configuration;
-pub(crate) mod consumer;
-pub(crate) mod diagnostic;
-pub(crate) mod identifier;
-pub(crate) mod message;
-pub(crate) mod partition;
-pub(crate) mod permissions;
-pub(crate) mod snapshot;
-pub(crate) mod stats;
-pub(crate) mod stream;
-pub(crate) mod topic;
-pub(crate) mod user;

Reply via email to