This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 871b18397 feat(metadata): impl states for streams,users,
consumer_groups in metadata module (#2582)
871b18397 is described below
commit 871b1839723e028b163556847d33eb6531b45ef1
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Fri Jan 23 13:04:54 2026 +0100
feat(metadata): impl states for streams,users, consumer_groups in metadata
module (#2582)
---
Cargo.lock | 3 +-
Cargo.toml | 1 +
.../consumer_groups/create_consumer_group.rs | 2 +-
.../consumer_groups/delete_consumer_group.rs | 2 +-
.../src/commands/partitions/create_partitions.rs | 2 +-
.../src/commands/partitions/delete_partitions.rs | 2 +-
.../create_personal_access_token.rs | 2 +-
.../delete_personal_access_token.rs | 2 +-
.../src/commands/segments/delete_segments.rs | 2 +-
core/common/src/commands/streams/create_stream.rs | 2 +-
core/common/src/commands/streams/delete_stream.rs | 2 +-
core/common/src/commands/streams/purge_stream.rs | 2 +-
core/common/src/commands/streams/update_stream.rs | 2 +-
core/common/src/commands/topics/create_topic.rs | 2 +-
core/common/src/commands/topics/delete_topic.rs | 2 +-
core/common/src/commands/topics/purge_topic.rs | 2 +-
core/common/src/commands/topics/update_topic.rs | 2 +-
core/common/src/commands/users/change_password.rs | 2 +-
core/common/src/commands/users/create_user.rs | 2 +-
core/common/src/commands/users/delete_user.rs | 2 +-
.../src/commands/users/update_permissions.rs | 2 +-
core/common/src/commands/users/update_user.rs | 2 +-
core/common/src/types/consensus/message.rs | 16 +
core/metadata/Cargo.toml | 3 +-
core/metadata/src/impls/metadata.rs | 220 ++++---
core/metadata/src/stm/consumer_group.rs | 321 +++++-----
core/metadata/src/stm/mod.rs | 304 +++++++---
core/metadata/src/stm/mux.rs | 12 +-
core/metadata/src/stm/stream.rs | 672 +++++++++------------
core/metadata/src/stm/user.rs | 401 +++++-------
core/server/Cargo.toml | 2 +-
31 files changed, 1020 insertions(+), 975 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index f88aaf4a6..5f7fceb1a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5676,11 +5676,12 @@ name = "metadata"
version = "0.1.0"
dependencies = [
"ahash 0.8.12",
- "bytes",
"consensus",
"iggy_common",
"journal",
+ "left-right",
"message_bus",
+ "paste",
"slab",
"tracing",
]
diff --git a/Cargo.toml b/Cargo.toml
index dbe9f6a98..802dde2d8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -142,6 +142,7 @@ iggy_connector_sdk = { path = "core/connectors/sdk",
version = "0.1.1-edge.1" }
integration = { path = "core/integration" }
keyring = { version = "3.6.3", features = ["sync-secret-service", "vendored"] }
lazy_static = "1.5.0"
+left-right = "0.11"
log = "0.4.29"
metadata = { path = "core/metadata" }
mimalloc = "0.1"
diff --git a/core/common/src/commands/consumer_groups/create_consumer_group.rs
b/core/common/src/commands/consumer_groups/create_consumer_group.rs
index c5bd3e912..c03d39964 100644
--- a/core/common/src/commands/consumer_groups/create_consumer_group.rs
+++ b/core/common/src/commands/consumer_groups/create_consumer_group.rs
@@ -33,7 +33,7 @@ use std::str::from_utf8;
/// - `stream_id` - unique stream ID (numeric or name).
/// - `topic_id` - unique topic ID (numeric or name).
/// - `name` - unique consumer group name, max length is 255 characters.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct CreateConsumerGroup {
/// Unique stream ID (numeric or name).
#[serde(skip)]
diff --git a/core/common/src/commands/consumer_groups/delete_consumer_group.rs
b/core/common/src/commands/consumer_groups/delete_consumer_group.rs
index 3eee5cc1c..859ded6f9 100644
--- a/core/common/src/commands/consumer_groups/delete_consumer_group.rs
+++ b/core/common/src/commands/consumer_groups/delete_consumer_group.rs
@@ -31,7 +31,7 @@ use std::fmt::Display;
/// - `stream_id` - unique stream ID (numeric or name).
/// - `topic_id` - unique topic ID (numeric or name).
/// - `group_id` - unique consumer group ID (numeric or name).
-#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
pub struct DeleteConsumerGroup {
/// Unique stream ID (numeric or name).
#[serde(skip)]
diff --git a/core/common/src/commands/partitions/create_partitions.rs
b/core/common/src/commands/partitions/create_partitions.rs
index 9aa862d5c..a01d3b26d 100644
--- a/core/common/src/commands/partitions/create_partitions.rs
+++ b/core/common/src/commands/partitions/create_partitions.rs
@@ -32,7 +32,7 @@ use std::fmt::Display;
/// - `stream_id` - unique stream ID (numeric or name).
/// - `topic_id` - unique topic ID (numeric or name).
/// - `partitions_count` - number of partitions in the topic to create, max
value is 1000.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct CreatePartitions {
/// Unique stream ID (numeric or name).
#[serde(skip)]
diff --git a/core/common/src/commands/partitions/delete_partitions.rs
b/core/common/src/commands/partitions/delete_partitions.rs
index 068f502ce..12be4448a 100644
--- a/core/common/src/commands/partitions/delete_partitions.rs
+++ b/core/common/src/commands/partitions/delete_partitions.rs
@@ -32,7 +32,7 @@ use std::fmt::Display;
/// - `stream_id` - unique stream ID (numeric or name).
/// - `topic_id` - unique topic ID (numeric or name).
/// - `partitions_count` - number of partitions in the topic to delete, max
value is 1000.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct DeletePartitions {
/// Unique stream ID (numeric or name).
#[serde(skip)]
diff --git
a/core/common/src/commands/personal_access_tokens/create_personal_access_token.rs
b/core/common/src/commands/personal_access_tokens/create_personal_access_token.rs
index 6c0fbc54d..fa457a33e 100644
---
a/core/common/src/commands/personal_access_tokens/create_personal_access_token.rs
+++
b/core/common/src/commands/personal_access_tokens/create_personal_access_token.rs
@@ -31,7 +31,7 @@ use std::str::from_utf8;
/// It has additional payload:
/// - `name` - unique name of the token, must be between 3 and 30 characters
long.
/// - `expiry` - expiry of the token.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct CreatePersonalAccessToken {
/// Unique name of the token, must be between 3 and 30 characters long.
pub name: String,
diff --git
a/core/common/src/commands/personal_access_tokens/delete_personal_access_token.rs
b/core/common/src/commands/personal_access_tokens/delete_personal_access_token.rs
index f54803fa9..064f78c48 100644
---
a/core/common/src/commands/personal_access_tokens/delete_personal_access_token.rs
+++
b/core/common/src/commands/personal_access_tokens/delete_personal_access_token.rs
@@ -29,7 +29,7 @@ use std::str::from_utf8;
/// `DeletePersonalAccessToken` command is used to delete a personal access
token for the authenticated user.
/// It has additional payload:
/// - `name` - unique name of the token, must be between 3 and 30 characters
long.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct DeletePersonalAccessToken {
/// Unique name of the token, must be between 3 and 30 characters long.
pub name: String,
diff --git a/core/common/src/commands/segments/delete_segments.rs
b/core/common/src/commands/segments/delete_segments.rs
index 221e5c667..841a84ac7 100644
--- a/core/common/src/commands/segments/delete_segments.rs
+++ b/core/common/src/commands/segments/delete_segments.rs
@@ -31,7 +31,7 @@ use std::fmt::Display;
/// - `topic_id` - unique topic ID (numeric or name).
/// - `partition_id` - unique partition ID (numeric or name).
/// - `segments_count` - number of segments in the partition to delete.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct DeleteSegments {
/// Unique stream ID (numeric or name).
#[serde(skip)]
diff --git a/core/common/src/commands/streams/create_stream.rs
b/core/common/src/commands/streams/create_stream.rs
index 292cfcfc2..9ff2fec5a 100644
--- a/core/common/src/commands/streams/create_stream.rs
+++ b/core/common/src/commands/streams/create_stream.rs
@@ -29,7 +29,7 @@ use std::str::from_utf8;
/// `CreateStream` command is used to create a new stream.
/// It has additional payload:
/// - `name` - unique stream name (string), max length is 255 characters.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct CreateStream {
/// Unique stream name (string), max length is 255 characters.
pub name: String,
diff --git a/core/common/src/commands/streams/delete_stream.rs
b/core/common/src/commands/streams/delete_stream.rs
index ac8aa57f9..f44110340 100644
--- a/core/common/src/commands/streams/delete_stream.rs
+++ b/core/common/src/commands/streams/delete_stream.rs
@@ -28,7 +28,7 @@ use std::fmt::Display;
/// `DeleteStream` command is used to delete an existing stream.
/// It has additional payload:
/// - `stream_id` - unique stream ID (numeric or name).
-#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
pub struct DeleteStream {
/// Unique stream ID (numeric or name).
#[serde(skip)]
diff --git a/core/common/src/commands/streams/purge_stream.rs
b/core/common/src/commands/streams/purge_stream.rs
index 805e7c944..036930ff8 100644
--- a/core/common/src/commands/streams/purge_stream.rs
+++ b/core/common/src/commands/streams/purge_stream.rs
@@ -28,7 +28,7 @@ use std::fmt::Display;
/// `PurgeStream` command is used to purge stream data (all the messages from
its topics).
/// It has additional payload:
/// - `stream_id` - unique stream ID (numeric or name).
-#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
pub struct PurgeStream {
/// Unique stream ID (numeric or name).
#[serde(skip)]
diff --git a/core/common/src/commands/streams/update_stream.rs
b/core/common/src/commands/streams/update_stream.rs
index ced247e71..ce3600f88 100644
--- a/core/common/src/commands/streams/update_stream.rs
+++ b/core/common/src/commands/streams/update_stream.rs
@@ -32,7 +32,7 @@ use std::str::from_utf8;
/// It has additional payload:
/// - `stream_id` - unique stream ID (numeric or name).
/// - `name` - unique stream name (string), max length is 255 characters.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct UpdateStream {
/// Unique stream ID (numeric or name).
#[serde(skip)]
diff --git a/core/common/src/commands/topics/create_topic.rs
b/core/common/src/commands/topics/create_topic.rs
index ba90084e7..48cd3d525 100644
--- a/core/common/src/commands/topics/create_topic.rs
+++ b/core/common/src/commands/topics/create_topic.rs
@@ -40,7 +40,7 @@ use std::str::from_utf8;
/// Can't be lower than segment size in the config.
/// - `replication_factor` - replication factor for the topic.
/// - `name` - unique topic name, max length is 255 characters.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct CreateTopic {
/// Unique stream ID (numeric or name).
#[serde(skip)]
diff --git a/core/common/src/commands/topics/delete_topic.rs
b/core/common/src/commands/topics/delete_topic.rs
index a41c0de02..45680311d 100644
--- a/core/common/src/commands/topics/delete_topic.rs
+++ b/core/common/src/commands/topics/delete_topic.rs
@@ -30,7 +30,7 @@ use std::fmt::Display;
/// It has additional payload:
/// - `stream_id` - unique stream ID (numeric or name).
/// - `topic_id` - unique topic ID (numeric or name).
-#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
pub struct DeleteTopic {
/// Unique stream ID (numeric or name).
#[serde(skip)]
diff --git a/core/common/src/commands/topics/purge_topic.rs
b/core/common/src/commands/topics/purge_topic.rs
index 061e374fa..9473b38c7 100644
--- a/core/common/src/commands/topics/purge_topic.rs
+++ b/core/common/src/commands/topics/purge_topic.rs
@@ -30,7 +30,7 @@ use std::fmt::Display;
/// It has additional payload:
/// - `stream_id` - unique stream ID (numeric or name).
/// - `topic_id` - unique topic ID (numeric or name).
-#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
pub struct PurgeTopic {
/// Unique stream ID (numeric or name).
#[serde(skip)]
diff --git a/core/common/src/commands/topics/update_topic.rs
b/core/common/src/commands/topics/update_topic.rs
index 32a2ba7a5..83dadb69c 100644
--- a/core/common/src/commands/topics/update_topic.rs
+++ b/core/common/src/commands/topics/update_topic.rs
@@ -40,7 +40,7 @@ use std::str::from_utf8;
/// Can't be lower than segment size in the config.
/// - `replication_factor` - replication factor for the topic.
/// - `name` - unique topic name, max length is 255 characters.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct UpdateTopic {
/// Unique stream ID (numeric or name).
#[serde(skip)]
diff --git a/core/common/src/commands/users/change_password.rs
b/core/common/src/commands/users/change_password.rs
index 27f1ea6b8..4a5663778 100644
--- a/core/common/src/commands/users/change_password.rs
+++ b/core/common/src/commands/users/change_password.rs
@@ -33,7 +33,7 @@ use std::str::from_utf8;
/// - `user_id` - unique user ID (numeric or name).
/// - `current_password` - current password, must be between 3 and 100
characters long.
/// - `new_password` - new password, must be between 3 and 100 characters long.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct ChangePassword {
/// Unique user ID (numeric or name).
#[serde(skip)]
diff --git a/core/common/src/commands/users/create_user.rs
b/core/common/src/commands/users/create_user.rs
index 03e0cb167..7d23c54ee 100644
--- a/core/common/src/commands/users/create_user.rs
+++ b/core/common/src/commands/users/create_user.rs
@@ -34,7 +34,7 @@ use std::str::from_utf8;
/// - `password` - password of the user, must be between 3 and 100 characters
long.
/// - `status` - status of the user, can be either `active` or `inactive`.
/// - `permissions` - optional permissions of the user. If not provided, user
will have no permissions.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
pub struct CreateUser {
/// Unique name of the user, must be between 3 and 50 characters long.
pub username: String,
diff --git a/core/common/src/commands/users/delete_user.rs
b/core/common/src/commands/users/delete_user.rs
index c41a967ff..3cc5682a6 100644
--- a/core/common/src/commands/users/delete_user.rs
+++ b/core/common/src/commands/users/delete_user.rs
@@ -28,7 +28,7 @@ use std::fmt::Display;
/// `DeleteUser` command is used to delete a user by unique ID.
/// It has additional payload:
/// - `user_id` - unique user ID (numeric or name).
-#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
pub struct DeleteUser {
/// Unique user ID (numeric or name).
#[serde(skip)]
diff --git a/core/common/src/commands/users/update_permissions.rs
b/core/common/src/commands/users/update_permissions.rs
index ab8e525f0..ff96927dd 100644
--- a/core/common/src/commands/users/update_permissions.rs
+++ b/core/common/src/commands/users/update_permissions.rs
@@ -31,7 +31,7 @@ use std::fmt::Display;
/// It has additional payload:
/// - `user_id` - unique user ID (numeric or name).
/// - `permissions` - new permissions (optional)
-#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
pub struct UpdatePermissions {
/// Unique user ID (numeric or name).
#[serde(skip)]
diff --git a/core/common/src/commands/users/update_user.rs
b/core/common/src/commands/users/update_user.rs
index 28e2504d2..3e2811915 100644
--- a/core/common/src/commands/users/update_user.rs
+++ b/core/common/src/commands/users/update_user.rs
@@ -34,7 +34,7 @@ use std::str::from_utf8;
/// - `user_id` - unique user ID (numeric or name).
/// - `username` - new username (optional), if provided, must be between 3 and
50 characters long.
/// - `status` - new status (optional)
-#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
pub struct UpdateUser {
#[serde(skip)]
pub user_id: Identifier,
diff --git a/core/common/src/types/consensus/message.rs
b/core/common/src/types/consensus/message.rs
index 0e94820f8..ac32cc3c8 100644
--- a/core/common/src/types/consensus/message.rs
+++ b/core/common/src/types/consensus/message.rs
@@ -141,6 +141,22 @@ where
}
}
+ /// Get the message body as zero-copy `Bytes`.
+ ///
+ /// Returns an empty `Bytes` if there is no body.
+ #[inline]
+ #[allow(unused)]
+ pub fn body_bytes(&self) -> Bytes {
+ let header_size = size_of::<H>();
+ let total_size = self.header().size() as usize;
+
+ if total_size > header_size {
+ self.buffer.slice(header_size..total_size)
+ } else {
+ Bytes::new()
+ }
+ }
+
/// Get the complete message as bytes (header + body).
#[inline]
#[allow(unused)]
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
index ee886e7c8..76a2f64ea 100644
--- a/core/metadata/Cargo.toml
+++ b/core/metadata/Cargo.toml
@@ -29,10 +29,11 @@ readme = "../../../README.md"
[dependencies]
ahash = { workspace = true }
-bytes = { workspace = true }
consensus = { path = "../consensus" }
iggy_common = { path = "../common" }
journal = { path = "../journal" }
+left-right = { workspace = true }
message_bus = { path = "../message_bus" }
+paste = "1.0"
slab = "0.4.11"
tracing = { workspace = true }
diff --git a/core/metadata/src/impls/metadata.rs
b/core/metadata/src/impls/metadata.rs
index 33ae1da34..e1bee5f09 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -14,6 +14,8 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
+use std::marker::PhantomData;
+
use consensus::{Consensus, Project, Sequencer, Status, VsrConsensus};
use iggy_common::{
header::{Command2, PrepareHeader, PrepareOkHeader},
@@ -26,42 +28,74 @@ use tracing::{debug, warn};
// TODO: Define a trait (probably in some external crate)
#[expect(unused)]
trait Metadata {
- type Consensus: Consensus;
- type Journal: Journal<Entry = <Self::Consensus as
Consensus>::ReplicateMessage>;
-
/// Handle a replicate message (Prepare in VSR).
- fn on_request(&self, message: <Self::Consensus as
Consensus>::RequestMessage);
+ fn on_request(&self, message: <VsrConsensus as Consensus>::RequestMessage);
/// Handle an ack message (PrepareOk in VSR).
fn on_replicate(
&self,
- message: <Self::Consensus as Consensus>::ReplicateMessage,
+ message: <VsrConsensus as Consensus>::ReplicateMessage,
) -> impl Future<Output = ()>;
- fn on_ack(&self, message: <Self::Consensus as Consensus>::AckMessage);
+ fn on_ack(&self, message: <VsrConsensus as Consensus>::AckMessage);
}
-#[expect(unused)]
-struct IggyMetadata<M, J, S> {
- consensus: VsrConsensus,
- mux_stm: M,
- journal: J,
- snapshot: S,
+pub trait MetadataHandle {
+ type Consensus;
+ type Journal;
+ type Snapshot;
+ type StateMachine;
}
-impl<M, J, S> Metadata for IggyMetadata<M, J, S>
+/// Concrete implementation of `MetadataHandle` for Iggy.
+/// This is a marker struct that only holds type information.
+pub struct IggyMetadataHandle<J, S, M> {
+ _marker: PhantomData<(J, S, M)>,
+}
+
+impl<J, S, M> MetadataHandle for IggyMetadataHandle<J, S, M>
where
J: Journal<Entry = <VsrConsensus as Consensus>::ReplicateMessage, Header =
PrepareHeader>,
{
type Consensus = VsrConsensus;
type Journal = J;
- fn on_request(&self, message: <Self::Consensus as
Consensus>::RequestMessage) {
+ type Snapshot = S;
+ type StateMachine = M;
+}
+
+//
=============================================================================
+// IggyMetadata
+//
=============================================================================
+
+#[expect(unused)]
+struct IggyMetadata<H: MetadataHandle> {
+ /// Some on shard0, None on other shards
+ consensus: Option<H::Consensus>,
+ /// Some on shard0, None on other shards
+ journal: Option<H::Journal>,
+ /// Some on shard0, None on other shards
+ snapshot: Option<H::Snapshot>,
+ /// State machine - lives on all shards
+ mux_stm: H::StateMachine,
+}
+
+// TODO: Handle the `routing` of messages to shard0, on the callsite.
+impl<J, S, M> Metadata for IggyMetadata<IggyMetadataHandle<J, S, M>>
+where
+ J: Journal<Entry = <VsrConsensus as Consensus>::ReplicateMessage, Header =
PrepareHeader>,
+{
+ fn on_request(&self, message: <VsrConsensus as Consensus>::RequestMessage)
{
+ let consensus = self.consensus.as_ref().unwrap();
+
// TODO: Bunch of asserts.
debug!("handling metadata request");
- let prepare = message.project(&self.consensus);
+ let prepare = message.project(consensus);
self.pipeline_prepare(prepare);
}
- async fn on_replicate(&self, message: <Self::Consensus as
Consensus>::ReplicateMessage) {
+ async fn on_replicate(&self, message: <VsrConsensus as
Consensus>::ReplicateMessage) {
+ let consensus = self.consensus.as_ref().unwrap();
+ let journal = self.journal.as_ref().unwrap();
+
let header = message.header();
assert_eq!(header.command, Command2::Prepare);
@@ -73,24 +107,24 @@ where
}
// If syncing, ignore the replicate message.
- if self.consensus.is_syncing() {
+ if consensus.is_syncing() {
warn!(
- replica = self.consensus.replica(),
+ replica = consensus.replica(),
"on_replicate: ignoring (sync)"
);
return;
}
- let current_op = self.consensus.sequencer().current_sequence();
+ let current_op = consensus.sequencer().current_sequence();
// Old message (handle as repair). Not replicating.
- if header.view < self.consensus.view()
- || (self.consensus.status() == Status::Normal
- && header.view == self.consensus.view()
+ if header.view < consensus.view()
+ || (consensus.status() == Status::Normal
+ && header.view == consensus.view()
&& header.op <= current_op)
{
debug!(
- replica = self.consensus.replica(),
+ replica = consensus.replica(),
"on_replicate: ignoring (repair)"
);
self.on_repair(message);
@@ -98,18 +132,18 @@ where
}
// If status is not normal, ignore the replicate.
- if self.consensus.status() != Status::Normal {
+ if consensus.status() != Status::Normal {
warn!(
- replica = self.consensus.replica(),
+ replica = consensus.replica(),
"on_replicate: ignoring (not normal state)"
);
return;
}
//if message from future view, we ignore the replicate.
- if header.view > self.consensus.view() {
+ if header.view > consensus.view() {
warn!(
- replica = self.consensus.replica(),
+ replica = consensus.replica(),
"on_replicate: ignoring (newer view)"
);
return;
@@ -118,9 +152,8 @@ where
// TODO add assertions for valid state here.
// If we are a follower, we advance the commit number.
- if self.consensus.is_follower() {
- self.consensus
- .advance_commit_number(message.header().commit);
+ if consensus.is_follower() {
+ consensus.advance_commit_number(message.header().commit);
}
// TODO verify that the current prepare fits in the WAL.
@@ -128,46 +161,43 @@ where
// TODO handle gap in ops.
// Verify hash chain integrity.
- if let Some(previous) = self.journal.previous_entry(header) {
+ if let Some(previous) = journal.previous_entry(header) {
self.panic_if_hash_chain_would_break_in_same_view(&previous,
header);
}
assert_eq!(header.op, current_op + 1);
- self.consensus.sequencer().set_sequence(header.op);
- self.journal.set_header_as_dirty(header);
+ consensus.sequencer().set_sequence(header.op);
+ journal.set_header_as_dirty(header);
// Append to journal.
- self.journal.append(message.clone()).await;
+ journal.append(message.clone()).await;
// After successful journal write, send prepare_ok to primary.
self.send_prepare_ok(header).await;
// If follower, commit any newly committable entries.
- if self.consensus.is_follower() {
+ if consensus.is_follower() {
self.commit_journal();
}
}
- fn on_ack(&self, message: <Self::Consensus as Consensus>::AckMessage) {
+ fn on_ack(&self, message: <VsrConsensus as Consensus>::AckMessage) {
+ let consensus = self.consensus.as_ref().unwrap();
let header = message.header();
- if !self.consensus.is_primary() {
+ if !consensus.is_primary() {
warn!("on_ack: ignoring (not primary)");
return;
}
- if self.consensus.status() != Status::Normal {
+ if consensus.status() != Status::Normal {
warn!("on_ack: ignoring (not normal)");
return;
}
// Find the prepare in pipeline
- let Some(mut pipeline) =
self.consensus.pipeline().try_borrow_mut().ok() else {
- warn!("on_ack: could not borrow pipeline (already mutably
borrowed)");
- return;
- };
-
+ let mut pipeline = consensus.pipeline().borrow_mut();
let Some(entry) = pipeline.message_by_op_and_checksum(header.op,
header.prepare_checksum)
else {
debug!("on_ack: prepare not in pipeline op={}", header.op);
@@ -184,34 +214,40 @@ where
let count = entry.add_ack(header.replica);
// Check quorum
- if count >= self.consensus.quorum() && !entry.ok_quorum_received {
+ if count >= consensus.quorum() && !entry.ok_quorum_received {
entry.ok_quorum_received = true;
debug!("on_ack: quorum received for op={}", header.op);
// Advance commit number and trigger commit journal
- self.consensus.advance_commit_number(header.op);
+ consensus.advance_commit_number(header.op);
self.commit_journal();
}
}
}
-impl<M, J, S> IggyMetadata<M, J, S>
+impl<J, S, M> IggyMetadata<IggyMetadataHandle<J, S, M>>
where
J: Journal<Entry = <VsrConsensus as Consensus>::ReplicateMessage, Header =
PrepareHeader>,
{
#[expect(unused)]
fn pipeline_prepare(&self, prepare: Message<PrepareHeader>) {
+ let consensus = self.consensus.as_ref().unwrap();
+
debug!("inserting prepare into metadata pipeline");
- self.consensus.verify_pipeline();
- self.consensus.pipeline_message(prepare.clone());
+ consensus.verify_pipeline();
+ consensus.pipeline_message(prepare.clone());
self.on_replicate(prepare.clone());
- self.consensus.post_replicate_verify(&prepare);
+ consensus.post_replicate_verify(&prepare);
}
fn fence_old_prepare(&self, prepare: &Message<PrepareHeader>) -> bool {
+ let (Some(consensus), Some(journal)) = (&self.consensus,
&self.journal) else {
+ todo!("dispatch fence_old_prepare to shard0");
+ };
+
let header = prepare.header();
- header.op <= self.consensus.commit() ||
self.journal.has_prepare(header)
+ header.op <= consensus.commit() || journal.has_prepare(header)
}
/// Replicate a prepare message to the next replica in the chain.
@@ -221,38 +257,41 @@ where
/// - Each backup forwards to the next
/// - Stops when we would forward back to primary
async fn replicate(&self, message: Message<PrepareHeader>) {
+ let consensus = self.consensus.as_ref().unwrap();
+ let journal = self.journal.as_ref().unwrap();
+
let header = message.header();
assert_eq!(header.command, Command2::Prepare);
assert!(
- !self.journal.has_prepare(header),
+ !journal.has_prepare(header),
"replicate: must not already have prepare"
);
- assert!(header.op > self.consensus.commit());
+ assert!(header.op > consensus.commit());
- let next = (self.consensus.replica() + 1) %
self.consensus.replica_count();
+ let next = (consensus.replica() + 1) % consensus.replica_count();
- let primary = self.consensus.primary_index(header.view);
+ let primary = consensus.primary_index(header.view);
if next == primary {
debug!(
- replica = self.consensus.replica(),
+ replica = consensus.replica(),
op = header.op,
"replicate: not replicating (ring complete)"
);
return;
}
- assert_ne!(next, self.consensus.replica());
+ assert_ne!(next, consensus.replica());
debug!(
- replica = self.consensus.replica(),
+ replica = consensus.replica(),
to = next,
op = header.op,
"replicate: forwarding"
);
let message = message.into_generic();
- self.consensus
+ consensus
.message_bus()
.send_to_replica(next, message)
.await
@@ -292,29 +331,32 @@ where
/// Send a prepare_ok message to the primary.
/// Called after successfully writing a prepare to the journal.
async fn send_prepare_ok(&self, header: &PrepareHeader) {
+ let consensus = self.consensus.as_ref().unwrap();
+ let journal = self.journal.as_ref().unwrap();
+
assert_eq!(header.command, Command2::Prepare);
- if self.consensus.status() != Status::Normal {
+ if consensus.status() != Status::Normal {
debug!(
- replica = self.consensus.replica(),
- status = ?self.consensus.status(),
+ replica = consensus.replica(),
+ status = ?consensus.status(),
"send_prepare_ok: not sending (not normal)"
);
return;
}
- if self.consensus.is_syncing() {
+ if consensus.is_syncing() {
debug!(
- replica = self.consensus.replica(),
+ replica = consensus.replica(),
"send_prepare_ok: not sending (syncing)"
);
return;
}
// Verify we have the prepare and it's persisted (not dirty).
- if !self.journal.has_prepare(header) {
+ if !journal.has_prepare(header) {
debug!(
- replica = self.consensus.replica(),
+ replica = consensus.replica(),
op = header.op,
"send_prepare_ok: not sending (not persisted or missing)"
);
@@ -322,24 +364,24 @@ where
}
assert!(
- header.view <= self.consensus.view(),
+ header.view <= consensus.view(),
"send_prepare_ok: prepare view {} > our view {}",
header.view,
- self.consensus.view()
+ consensus.view()
);
- if header.op > self.consensus.sequencer().current_sequence() {
+ if header.op > consensus.sequencer().current_sequence() {
debug!(
- replica = self.consensus.replica(),
+ replica = consensus.replica(),
op = header.op,
- our_op = self.consensus.sequencer().current_sequence(),
+ our_op = consensus.sequencer().current_sequence(),
"send_prepare_ok: not sending (op ahead)"
);
return;
}
debug!(
- replica = self.consensus.replica(),
+ replica = consensus.replica(),
op = header.op,
checksum = header.checksum,
"send_prepare_ok: sending"
@@ -348,12 +390,12 @@ where
// Use current view, not the prepare's view.
let prepare_ok_header = PrepareOkHeader {
command: Command2::PrepareOk,
- cluster: self.consensus.cluster(),
- replica: self.consensus.replica(),
- view: self.consensus.view(),
+ cluster: consensus.cluster(),
+ replica: consensus.replica(),
+ view: consensus.view(),
epoch: header.epoch,
op: header.op,
- commit: self.consensus.commit(),
+ commit: consensus.commit(),
timestamp: header.timestamp,
parent: header.parent,
prepare_checksum: header.checksum,
@@ -367,23 +409,23 @@ where
Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>())
.transmute_header(|_, new| *new = prepare_ok_header);
let generic_message = message.into_generic();
- let primary = self.consensus.primary_index(self.consensus.view());
+ let primary = consensus.primary_index(consensus.view());
- if primary == self.consensus.replica() {
+ if primary == consensus.replica() {
debug!(
- replica = self.consensus.replica(),
+ replica = consensus.replica(),
"send_prepare_ok: loopback to self"
);
// TODO: Queue for self-processing or call handle_prepare_ok
directly
} else {
debug!(
- replica = self.consensus.replica(),
+ replica = consensus.replica(),
to = primary,
op = header.op,
"send_prepare_ok: sending to primary"
);
- self.consensus
+ consensus
.message_bus()
.send_to_replica(primary, generic_message)
.await
@@ -391,21 +433,3 @@ where
}
}
}
-
-// TODO: Hide with associated types all of those generics, so they are not
leaking to the upper layer, or maybe even make of the `Metadata` trait itself.
-// Something like this:
-// pub trait MetadataHandle {
-// type Consensus: Consensus<Self::Clock>;
-// type Clock: Clock;
-// type MuxStm;
-// type Journal;
-// type Snapshot;
-// }
-
-// pub trait Metadata<H: MetadataHandle> {
-// fn on_request(&self, message: <H::Consensus as
Consensus<H::Clock>>::RequestMessage); // Create type aliases for those long
associated types
-// fn on_replicate(&self, message: <H::Consensus as
Consensus<H::Clock>>::ReplicateMessage);
-// fn on_ack(&self, message: <H::Consensus as
Consensus<H::Clock>>::AckMessage);
-// }
-
-// The error messages can get ugly from those associated types, but I think
it's worth the fact that it hides a lot of the generics and their bounds.
diff --git a/core/metadata/src/stm/consumer_group.rs
b/core/metadata/src/stm/consumer_group.rs
index 944627ad7..3a3e947fc 100644
--- a/core/metadata/src/stm/consumer_group.rs
+++ b/core/metadata/src/stm/consumer_group.rs
@@ -15,215 +15,212 @@
// specific language governing permissions and limitations
// under the License.
-use crate::stm::{ApplyState, StateCommand};
+use crate::stm::Handler;
+use crate::{define_state, impl_absorb};
use ahash::AHashMap;
-use bytes::Bytes;
use iggy_common::create_consumer_group::CreateConsumerGroup;
use iggy_common::delete_consumer_group::DeleteConsumerGroup;
-use iggy_common::{
- BytesSerializable, IggyTimestamp,
- header::{Operation, PrepareHeader},
- message::Message,
-};
+use iggy_common::{IdKind, Identifier};
use slab::Slab;
-use std::cell::RefCell;
-
-// ============================================================================
-// ConsumerGroupMember - Individual member of a consumer group
-// ============================================================================
+use std::sync::Arc;
+use std::sync::atomic::AtomicUsize;
#[derive(Debug, Clone)]
pub struct ConsumerGroupMember {
- pub id: u32,
- pub joined_at: IggyTimestamp,
+ pub id: usize,
+ pub client_id: u32,
+ pub partitions: Vec<usize>,
+ pub partition_index: Arc<AtomicUsize>,
}
impl ConsumerGroupMember {
- pub fn new(id: u32, joined_at: IggyTimestamp) -> Self {
- Self { id, joined_at }
+ pub fn new(id: usize, client_id: u32) -> Self {
+ Self {
+ id,
+ client_id,
+ partitions: Vec::new(),
+ partition_index: Arc::new(AtomicUsize::new(0)),
+ }
}
}
-// ============================================================================
-// ConsumerGroup - A group of consumers
-// ============================================================================
-
#[derive(Debug, Clone)]
pub struct ConsumerGroup {
pub id: usize,
- pub stream_id: usize,
- pub topic_id: usize,
- pub name: String,
- pub created_at: IggyTimestamp,
- pub members: Vec<ConsumerGroupMember>,
+ pub name: Arc<str>,
+ pub partitions: Vec<usize>,
+ pub members: Slab<ConsumerGroupMember>,
}
impl ConsumerGroup {
- pub fn new(stream_id: usize, topic_id: usize, name: String, created_at:
IggyTimestamp) -> Self {
+ pub fn new(name: Arc<str>) -> Self {
Self {
id: 0,
- stream_id,
- topic_id,
name,
- created_at,
- members: Vec::new(),
+ partitions: Vec::new(),
+ members: Slab::new(),
}
}
- pub fn add_member(&mut self, member: ConsumerGroupMember) {
- self.members.push(member);
- }
+ pub fn rebalance_members(&mut self) {
+ let partition_count = self.partitions.len();
+ let member_count = self.members.len();
- pub fn remove_member(&mut self, member_id: u32) ->
Option<ConsumerGroupMember> {
- if let Some(pos) = self.members.iter().position(|m| m.id == member_id)
{
- Some(self.members.remove(pos))
- } else {
- None
+ if member_count == 0 || partition_count == 0 {
+ return;
}
- }
-
- pub fn members_count(&self) -> usize {
- self.members.len()
- }
-}
-
-// ============================================================================
-// ConsumerGroups Collection
-// ============================================================================
-#[derive(Debug, Clone, Default)]
-pub struct ConsumerGroups {
- // Global index for all consumer groups across all streams/topics
- index: RefCell<AHashMap<(usize, usize, String), usize>>, // (stream_id,
topic_id, name) -> id
- items: RefCell<Slab<ConsumerGroup>>,
-}
-
-impl ConsumerGroups {
- pub fn new() -> Self {
- Self {
- index: RefCell::new(AHashMap::with_capacity(256)),
- items: RefCell::new(Slab::with_capacity(256)),
+ let member_ids: Vec<usize> = self.members.iter().map(|(id, _)|
id).collect();
+ for &member_id in &member_ids {
+ if let Some(member) = self.members.get_mut(member_id) {
+ member.partitions.clear();
+ }
}
- }
-
- /// Insert a consumer group and return the assigned ID
- pub fn insert(&self, group: ConsumerGroup) -> usize {
- let mut items = self.items.borrow_mut();
- let mut index = self.index.borrow_mut();
- let key = (group.stream_id, group.topic_id, group.name.clone());
- let id = items.insert(group);
- items[id].id = id;
- index.insert(key, id);
- id
+ for (i, &partition_id) in self.partitions.iter().enumerate() {
+ let member_idx = i % member_count;
+ if let Some(&member_id) = member_ids.get(member_idx)
+ && let Some(member) = self.members.get_mut(member_id)
+ {
+ member.partitions.push(partition_id);
+ }
+ }
}
+}
- /// Get consumer group by ID
- pub fn get(&self, id: usize) -> Option<ConsumerGroup> {
- self.items.borrow().get(id).cloned()
- }
+define_state! {
+ ConsumerGroups {
+ name_index: AHashMap<Arc<str>, usize>,
+ topic_index: AHashMap<(usize, usize), Vec<usize>>,
+ topic_name_index: AHashMap<(Arc<str>, Arc<str>), Vec<usize>>,
+ items: Slab<ConsumerGroup>,
+ },
+ [CreateConsumerGroup, DeleteConsumerGroup]
+}
+impl_absorb!(ConsumerGroupsInner, ConsumerGroupsCommand);
- /// Get consumer group by stream_id, topic_id, and name
- pub fn get_by_location(
+impl ConsumerGroupsInner {
+ fn resolve_consumer_group_id_by_identifiers(
&self,
- stream_id: usize,
- topic_id: usize,
- name: &str,
- ) -> Option<ConsumerGroup> {
- let index = self.index.borrow();
- let key = (stream_id, topic_id, name.to_string());
- if let Some(&id) = index.get(&key) {
- self.items.borrow().get(id).cloned()
- } else {
- None
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ group_id: &Identifier,
+ ) -> Option<usize> {
+ // Resolve by numeric IDs
+ if let (Ok(s), Ok(t)) = (stream_id.get_u32_value(),
topic_id.get_u32_value()) {
+ let groups_in_topic = self.topic_index.get(&(s as usize, t as
usize))?;
+
+ return match group_id.kind {
+ IdKind::Numeric => {
+ let g_id = group_id.get_u32_value().ok()? as usize;
+ groups_in_topic.contains(&g_id).then_some(g_id)
+ }
+ IdKind::String => {
+ let g_name = group_id.get_string_value().ok()?;
+ groups_in_topic
+ .iter()
+ .find(|&&id| {
+ self.items
+ .get(id)
+ .is_some_and(|g| g.name.as_ref() == g_name)
+ })
+ .copied()
+ }
+ };
}
- }
- /// Remove consumer group by ID
- pub fn remove(&self, id: usize) -> Option<ConsumerGroup> {
- let mut items = self.items.borrow_mut();
- let mut index = self.index.borrow_mut();
+ // Resolve by string names
+ if let (Ok(s), Ok(t)) = (stream_id.get_string_value(),
topic_id.get_string_value()) {
+ let key = (Arc::from(s.as_str()), Arc::from(t.as_str()));
+ let groups_in_topic = self.topic_name_index.get(&key)?;
- if !items.contains(id) {
- return None;
+ return match group_id.kind {
+ IdKind::Numeric => {
+ let g_id = group_id.get_u32_value().ok()? as usize;
+ groups_in_topic.contains(&g_id).then_some(g_id)
+ }
+ IdKind::String => {
+ let g_name = group_id.get_string_value().ok()?;
+ groups_in_topic
+ .iter()
+ .find(|&&id| {
+ self.items
+ .get(id)
+ .is_some_and(|g| g.name.as_ref() == g_name)
+ })
+ .copied()
+ }
+ };
}
- let group = items.remove(id);
- let key = (group.stream_id, group.topic_id, group.name.clone());
- index.remove(&key);
- Some(group)
+ None
}
+}
- /// Get all consumer groups for a specific topic
- pub fn get_by_topic(&self, stream_id: usize, topic_id: usize) ->
Vec<ConsumerGroup> {
- self.items
- .borrow()
- .iter()
- .filter_map(|(_, g)| {
- if g.stream_id == stream_id && g.topic_id == topic_id {
- Some(g.clone())
- } else {
- None
+impl Handler for ConsumerGroupsInner {
+ fn handle(&mut self, cmd: &Self::Cmd) {
+ // TODO: This is all an hack, we need to figure out how to do this, in
a way where `Identifier` does not reach
+ // this stage of execution.
+ match cmd {
+ ConsumerGroupsCommand::CreateConsumerGroup(payload) => {
+ let name: Arc<str> = Arc::from(payload.name.as_str());
+ if self.name_index.contains_key(&name) {
+ return;
}
- })
- .collect()
- }
-
- /// Get number of consumer groups
- pub fn len(&self) -> usize {
- self.items.borrow().len()
- }
- /// Check if empty
- pub fn is_empty(&self) -> bool {
- self.items.borrow().is_empty()
- }
+ let group = ConsumerGroup::new(name.clone());
+ let id = self.items.insert(group);
+ self.items[id].id = id;
- /// Get all consumer groups
- pub fn values(&self) -> Vec<ConsumerGroup> {
- self.items
- .borrow()
- .iter()
- .map(|(_, g): (usize, &ConsumerGroup)| g.clone())
- .collect()
- }
-}
+ self.name_index.insert(name.clone(), id);
-#[derive(Debug)]
-pub enum ConsumerGroupsCommand {
- Create(CreateConsumerGroup),
- Delete(DeleteConsumerGroup),
-}
-
-impl StateCommand for ConsumerGroups {
- type Command = ConsumerGroupsCommand;
- type Input = Message<PrepareHeader>;
-
- fn into_command(input: &Self::Input) -> Option<Self::Command> {
- // TODO: rework this thing, so we don't copy the bytes on each request
- let body = Bytes::copy_from_slice(input.body());
- match input.header().operation {
- Operation::CreateConsumerGroup =>
Some(ConsumerGroupsCommand::Create(
- CreateConsumerGroup::from_bytes(body.clone()).unwrap(),
- )),
- Operation::DeleteConsumerGroup =>
Some(ConsumerGroupsCommand::Delete(
- DeleteConsumerGroup::from_bytes(body.clone()).unwrap(),
- )),
- _ => None,
- }
- }
-}
-
-impl ApplyState for ConsumerGroups {
- type Output = ();
+ if let (Ok(s), Ok(t)) = (
+ payload.stream_id.get_u32_value(),
+ payload.topic_id.get_u32_value(),
+ ) {
+ self.topic_index
+ .entry((s as usize, t as usize))
+ .or_default()
+ .push(id);
+ }
- fn do_apply(&self, cmd: Self::Command) -> Self::Output {
- match cmd {
- ConsumerGroupsCommand::Create(payload) => {
- todo!("Handle Create consumer group with {:?}", payload)
+ if let (Ok(s), Ok(t)) = (
+ payload.stream_id.get_string_value(),
+ payload.topic_id.get_string_value(),
+ ) {
+ let key = (Arc::from(s.as_str()), Arc::from(t.as_str()));
+ self.topic_name_index.entry(key).or_default().push(id);
+ }
}
- ConsumerGroupsCommand::Delete(payload) => {
- todo!("Handle Delete consumer group with {:?}", payload)
+
+ ConsumerGroupsCommand::DeleteConsumerGroup(payload) => {
+ if let Some(id) =
self.resolve_consumer_group_id_by_identifiers(
+ &payload.stream_id,
+ &payload.topic_id,
+ &payload.group_id,
+ ) {
+ let group = self.items.remove(id);
+
+ self.name_index.remove(&group.name);
+
+ if let (Ok(s), Ok(t)) = (
+ payload.stream_id.get_u32_value(),
+ payload.topic_id.get_u32_value(),
+ ) && let Some(vec) = self.topic_index.get_mut(&(s as
usize, t as usize))
+ {
+ vec.retain(|&x| x != id);
+ }
+
+ if let (Ok(s), Ok(t)) = (
+ payload.stream_id.get_string_value(),
+ payload.topic_id.get_string_value(),
+ ) {
+ let key = (Arc::from(s.as_str()),
Arc::from(t.as_str()));
+ if let Some(vec) = self.topic_name_index.get_mut(&key)
{
+ vec.retain(|&x| x != id);
+ }
+ }
+ }
}
}
}
diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index e6c13684b..7f3baa0c8 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -1,4 +1,3 @@
-// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
@@ -20,109 +19,254 @@ pub mod mux;
pub mod stream;
pub mod user;
-/// Macro to generate a `{State}Command` enum and implement `StateCommand`
trait.
-///
-/// # Arguments
-/// * `$state_type` - The type that implements `ApplyState` trait
-/// * `$command_enum` - The name of the command enum to generate (e.g.,
StreamsCommand)
-/// * `$operations` - Array of Operation enum variants (also used as payload
type names)
-///
-/// # Example
-/// ```ignore
-/// define_state_command! {
-/// Streams,
-/// StreamsCommand,
-/// [CreateStream, UpdateStream, DeleteStream, PurgeStream]
-/// }
-/// ```
-#[macro_export]
-macro_rules! define_state_command {
- (
- $state_type:ty,
- $command_enum:ident,
- [$($operation:ident),* $(,)?]
- ) => {
- #[derive(Debug)]
- pub enum $command_enum {
- $(
- $operation($operation),
- )*
- }
+use left_right::*;
+use std::cell::UnsafeCell;
+use std::sync::Arc;
- impl $crate::stm::StateCommand for $state_type {
- type Command = $command_enum;
- type Input =
::iggy_common::message::Message<::iggy_common::header::PrepareHeader>;
-
- fn into_command(input: &Self::Input) -> Option<Self::Command> {
- use ::iggy_common::BytesSerializable;
- use ::bytes::Bytes;
- use ::iggy_common::header::Operation;
-
- // TODO: rework this thing, so we don't copy the bytes on each
request
- let body = Bytes::copy_from_slice(input.body());
- match input.header().operation {
- $(
- Operation::$operation => {
- Some($command_enum::$operation(
- $operation::from_bytes(body.clone()).unwrap()
- ))
- },
- )*
- _ => None,
- }
- }
+pub struct WriteCell<T, O>
+where
+ T: Absorb<O>,
+{
+ inner: UnsafeCell<WriteHandle<T, O>>,
+}
+
+impl<T, O> WriteCell<T, O>
+where
+ T: Absorb<O>,
+{
+ pub fn new(write: WriteHandle<T, O>) -> Self {
+ Self {
+ inner: UnsafeCell::new(write),
}
+ }
- // Compile-time check that the type implements ApplyState
- const _: () = {
- const fn assert_impl_apply_state<T: $crate::stm::ApplyState>() {}
- assert_impl_apply_state::<$state_type>();
+ pub fn apply(&self, cmd: O) {
+ let hdl = unsafe {
+ self.inner
+ .get()
+ .as_mut()
+ .expect("[apply]: called on uninit writer, for cmd: {cmd}")
};
- };
+ hdl.append(cmd).publish();
+ }
}
-// This is public interface to state, therefore it will be imported from
different crate, for now during development I am leaving it there.
-pub trait State
+/// Parses type-erased input into a command. Macro-generated.
+pub trait Command {
+ type Cmd;
+ type Input;
+
+ fn parse(input: &Self::Input) -> Option<Self::Cmd>;
+}
+
+/// Handles commands. User-implemented business logic.
+pub trait Handler: Command {
+ fn handle(&mut self, cmd: &Self::Cmd);
+}
+
+pub struct LeftRight<T, C>
+where
+ T: Absorb<C>,
+{
+ write: Option<WriteCell<T, C>>,
+ #[allow(unused)]
+ read: Arc<ReadHandle<T>>,
+}
+
+impl<T> From<T> for LeftRight<T, <T as Command>::Cmd>
where
- Self: Sized,
+ T: Absorb<<T as Command>::Cmd> + Clone + Command,
{
+ fn from(inner: T) -> Self {
+ let (write, read) = {
+ let (w, r) = left_right::new_from_empty(inner);
+ (WriteCell::new(w).into(), r.into())
+ };
+ Self { write, read }
+ }
+}
+
+impl<T> LeftRight<T, <T as Command>::Cmd>
+where
+ T: Absorb<<T as Command>::Cmd> + Clone + Handler,
+{
+ pub fn do_apply(&self, cmd: <T as Command>::Cmd) {
+ self.write
+ .as_ref()
+ .expect("no write handle - not the owner shard")
+ .apply(cmd);
+ }
+}
+
+/// Public interface for state machines.
+pub trait State {
type Output;
type Input;
- // Apply the state machine logic and return an optional output.
- // The output is optional, as we model the `StateMachine`, as an variadic
list,
- // where not all state machines will produce an output for every input
event.
fn apply(&self, input: &Self::Input) -> Option<Self::Output>;
}
-// TODO: This interface should be private to the stm module.
pub trait StateMachine {
type Input;
type Output;
fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>);
}
-pub trait StateCommand {
- type Command;
- type Input;
+/// Generates a state machine with convention-based storage.
+///
+/// # Generated items
+/// - `{$state}Inner` struct with the specified fields (the data)
+/// - `{$state}Command` enum with variants for each operation
+/// - `$state` wrapper struct (non-generic, contains LeftRight storage)
+/// - `Command` impl for `{$state}Inner` (parsing)
+/// - `State` impl for `$state`
+/// - `From<LeftRight<...>>` impl for `$state`
+///
+/// # User must implement
+/// - `Handler` for `{$state}Inner` (business logic)
+/// - `impl_absorb!` for `{$state}Inner` and `{$state}Command`
+///
+/// # Example
+/// ```ignore
+/// define_state! {
+/// Streams {
+/// index: AHashMap<String, usize>,
+/// items: Slab<Stream>,
+/// },
+/// [CreateStream, UpdateStream, DeleteStream]
+/// }
+///
+/// // User implements Handler manually:
+/// impl Handler for StreamsInner {
+/// fn handle(&mut self, cmd: &StreamsCommand) {
+/// match cmd {
+/// StreamsCommand::CreateStream(payload) => { /* ... */ }
+/// // ...
+/// }
+/// }
+/// }
+///
+/// // User implements Absorb via macro:
+/// impl_absorb!(StreamsInner, StreamsCommand);
+/// ```
+// TODO: The `operation` argument can be removed, once we create an trait for
mapping.
+#[macro_export]
+macro_rules! define_state {
+ (
+ $state:ident {
+ $($field_name:ident : $field_type:ty),* $(,)?
+ },
+ [$($operation:ident),* $(,)?]
+ ) => {
+ paste::paste! {
+ #[derive(Debug, Clone, Default)]
+ pub struct [<$state Inner>] {
+ $(
+ pub $field_name: $field_type,
+ )*
+ }
- fn into_command(input: &Self::Input) -> Option<Self::Command>;
-}
+ impl [<$state Inner>] {
+ pub fn new() -> Self {
+ Self::default()
+ }
+ }
-pub trait ApplyState: StateCommand {
- type Output;
+ #[derive(Debug, Clone)]
+ pub enum [<$state Command>] {
+ $(
+ $operation($operation),
+ )*
+ }
+
+ pub struct $state {
+ inner: $crate::stm::LeftRight<[<$state Inner>], [<$state
Command>]>,
+ }
+
+ impl From<$crate::stm::LeftRight<[<$state Inner>], [<$state
Command>]>> for $state {
+ fn from(inner: $crate::stm::LeftRight<[<$state Inner>],
[<$state Command>]>) -> Self {
+ Self { inner }
+ }
+ }
+
+ impl From<[<$state Inner>]> for $state {
+ fn from(inner: [<$state Inner>]) -> Self {
+ let left_right: $crate::stm::LeftRight<[<$state Inner>],
[<$state Command>]> = inner.into();
+ left_right.into()
+ }
+ }
+
+ impl $crate::stm::State for $state {
+ type Input = <[<$state Inner>] as $crate::stm::Command>::Input;
+ type Output = ();
- fn do_apply(&self, cmd: Self::Command) -> Self::Output;
+ fn apply(&self, input: &Self::Input) -> Option<Self::Output> {
+ <[<$state Inner>] as $crate::stm::Command>::parse(input)
+ .map(|cmd| self.inner.do_apply(cmd))
+ }
+ }
+
+ impl $crate::stm::Command for [<$state Inner>] {
+ type Cmd = [<$state Command>];
+ type Input =
::iggy_common::message::Message<::iggy_common::header::PrepareHeader>;
+
+ fn parse(input: &Self::Input) -> Option<Self::Cmd> {
+ use ::iggy_common::BytesSerializable;
+ use ::iggy_common::header::Operation;
+
+ let body = input.body_bytes();
+ match input.header().operation {
+ $(
+ Operation::$operation => {
+ Some([<$state Command>]::$operation(
+ $operation::from_bytes(body).unwrap()
+ ))
+ },
+ )*
+ _ => None,
+ }
+ }
+ }
+ }
+ };
}
-impl<T> State for T
-where
- T: ApplyState,
-{
- type Output = T::Output;
- type Input = T::Input;
+// This macro is really sad, but we can't do blanket impl from below, due to
orphan rule.
+// impl<T> Absorb<T::Cmd> for T
+// where
+// T: Handler + Clone,
+// {
+// fn absorb_first(&mut self, cmd: &mut T::Cmd, _other: &Self) {
+// self.handle(cmd);
- fn apply(&self, input: &Self::Input) -> Option<Self::Output> {
- T::into_command(input).map(|cmd| self.do_apply(cmd))
- }
+// }
+
+// fn absorb_second(&mut self, cmd: T::Cmd, _other: &Self) {
+// self.handle(&cmd);
+// }
+
+// fn sync_with(&mut self, first: &Self) {
+// *self = first.clone();
+// }
+
+// fn drop_first(self: Box<Self>) {}
+// fn drop_second(self: Box<Self>) {}
+// }
+#[macro_export]
+macro_rules! impl_absorb {
+ ($inner:ident, $cmd:ident) => {
+ impl left_right::Absorb<$cmd> for $inner {
+ fn absorb_first(&mut self, cmd: &mut $cmd, _other: &Self) {
+ self.handle(cmd);
+ }
+
+ fn absorb_second(&mut self, cmd: $cmd, _other: &Self) {
+ self.handle(&cmd);
+ }
+
+ fn sync_with(&mut self, first: &Self) {
+ *self = first.clone();
+ }
+ }
+ };
}
diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs
index 3fc80d88b..979ff3ccf 100644
--- a/core/metadata/src/stm/mux.rs
+++ b/core/metadata/src/stm/mux.rs
@@ -92,14 +92,16 @@ mod tests {
#[test]
fn construct_mux_state_machine_from_states_with_same_output() {
- use crate::stm::*;
+ use crate::stm::StateMachine;
+ use crate::stm::mux::MuxStateMachine;
+ use crate::stm::stream::{Streams, StreamsInner};
+ use crate::stm::user::{Users, UsersInner};
use iggy_common::header::PrepareHeader;
use iggy_common::message::Message;
- let users = user::Users::new();
- let streams = stream::Streams::new();
- let cgs = consumer_group::ConsumerGroups::new();
- let mux = mux::MuxStateMachine::new(variadic!(users, streams, cgs));
+ let users: Users = UsersInner::new().into();
+ let streams: Streams = StreamsInner::new().into();
+ let mux = MuxStateMachine::new(variadic!(users, streams));
let input = Message::new(std::mem::size_of::<PrepareHeader>());
let mut output = Vec::new();
diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs
index 6f05e5aa4..5106d6122 100644
--- a/core/metadata/src/stm/stream.rs
+++ b/core/metadata/src/stm/stream.rs
@@ -15,169 +15,41 @@
// specific language governing permissions and limitations
// under the License.
-use crate::define_state_command;
use crate::stats::{StreamStats, TopicStats};
-use crate::stm::ApplyState;
+use crate::stm::Handler;
+use crate::{define_state, impl_absorb};
use ahash::AHashMap;
use iggy_common::create_partitions::CreatePartitions;
use iggy_common::create_stream::CreateStream;
use iggy_common::create_topic::CreateTopic;
use iggy_common::delete_partitions::DeletePartitions;
-use iggy_common::delete_segments::DeleteSegments;
use iggy_common::delete_stream::DeleteStream;
use iggy_common::delete_topic::DeleteTopic;
use iggy_common::purge_stream::PurgeStream;
use iggy_common::purge_topic::PurgeTopic;
use iggy_common::update_stream::UpdateStream;
use iggy_common::update_topic::UpdateTopic;
-use iggy_common::{
- CompressionAlgorithm, Identifier, IggyError, IggyExpiry, IggyTimestamp,
MaxTopicSize,
-};
+use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp,
MaxTopicSize};
use slab::Slab;
-use std::cell::RefCell;
use std::sync::Arc;
+use std::sync::atomic::AtomicUsize;
#[derive(Debug, Clone)]
pub struct Partition {
pub id: usize,
-}
-
-impl Partition {
- pub fn new(id: usize) -> Self {
- Self { id }
- }
-}
-
-#[derive(Debug, Clone, Default)]
-pub struct Partitions {
- items: RefCell<Slab<Partition>>,
-}
-
-impl Partitions {
- pub fn new() -> Self {
- Self {
- items: RefCell::new(Slab::with_capacity(1024)),
- }
- }
-
- pub fn insert(&self, partition: Partition) -> usize {
- let mut items = self.items.borrow_mut();
- let id = items.insert(partition);
- items[id].id = id;
- id
- }
-
- pub fn get(&self, id: usize) -> Option<Partition> {
- self.items.borrow().get(id).cloned()
- }
-
- pub fn remove(&self, id: usize) -> Option<Partition> {
- let mut items = self.items.borrow_mut();
- if items.contains(id) {
- Some(items.remove(id))
- } else {
- None
- }
- }
-
- pub fn len(&self) -> usize {
- self.items.borrow().len()
- }
-
- pub fn is_empty(&self) -> bool {
- self.items.borrow().is_empty()
- }
-
- pub fn iter(&self) -> Vec<Partition> {
- self.items
- .borrow()
- .iter()
- .map(|(_, p): (usize, &Partition)| p.clone())
- .collect()
- }
-}
-
-#[derive(Debug, Clone)]
-pub struct ConsumerGroup {
- pub id: usize,
- pub name: String,
pub created_at: IggyTimestamp,
}
-impl ConsumerGroup {
- pub fn new(name: String, created_at: IggyTimestamp) -> Self {
- Self {
- id: 0,
- name,
- created_at,
- }
- }
-}
-
-#[derive(Debug, Clone, Default)]
-pub struct ConsumerGroups {
- index: RefCell<AHashMap<String, usize>>,
- items: RefCell<Slab<ConsumerGroup>>,
-}
-
-impl ConsumerGroups {
- pub fn new() -> Self {
- Self {
- index: RefCell::new(AHashMap::with_capacity(256)),
- items: RefCell::new(Slab::with_capacity(256)),
- }
- }
-
- pub fn insert(&self, group: ConsumerGroup) -> usize {
- let mut items = self.items.borrow_mut();
- let mut index = self.index.borrow_mut();
-
- let name = group.name.clone();
- let id = items.insert(group);
- items[id].id = id;
- index.insert(name, id);
- id
- }
-
- pub fn get(&self, id: usize) -> Option<ConsumerGroup> {
- self.items.borrow().get(id).cloned()
- }
-
- pub fn get_by_name(&self, name: &str) -> Option<ConsumerGroup> {
- let index = self.index.borrow();
- if let Some(&id) = index.get(name) {
- self.items.borrow().get(id).cloned()
- } else {
- None
- }
- }
-
- pub fn remove(&self, id: usize) -> Option<ConsumerGroup> {
- let mut items = self.items.borrow_mut();
- let mut index = self.index.borrow_mut();
-
- if !items.contains(id) {
- return None;
- }
-
- let group = items.remove(id);
- index.remove(&group.name);
- Some(group)
- }
-
- pub fn len(&self) -> usize {
- self.items.borrow().len()
- }
-
- pub fn is_empty(&self) -> bool {
- self.items.borrow().is_empty()
+impl Partition {
+ pub fn new(id: usize, created_at: IggyTimestamp) -> Self {
+ Self { id, created_at }
}
}
#[derive(Debug, Clone)]
pub struct Topic {
pub id: usize,
- pub name: String,
+ pub name: Arc<str>,
pub created_at: IggyTimestamp,
pub replication_factor: u8,
pub message_expiry: IggyExpiry,
@@ -185,13 +57,30 @@ pub struct Topic {
pub max_topic_size: MaxTopicSize,
pub stats: Arc<TopicStats>,
- pub partitions: Partitions,
- pub consumer_groups: ConsumerGroups,
+ pub partitions: Vec<Partition>,
+ pub round_robin_counter: Arc<AtomicUsize>,
+}
+
+impl Default for Topic {
+ fn default() -> Self {
+ Self {
+ id: 0,
+ name: Arc::from(""),
+ created_at: IggyTimestamp::default(),
+ replication_factor: 1,
+ message_expiry: IggyExpiry::default(),
+ compression_algorithm: CompressionAlgorithm::default(),
+ max_topic_size: MaxTopicSize::default(),
+ stats: Arc::new(TopicStats::default()),
+ partitions: Vec::new(),
+ round_robin_counter: Arc::new(AtomicUsize::new(0)),
+ }
+ }
}
impl Topic {
pub fn new(
- name: String,
+ name: Arc<str>,
created_at: IggyTimestamp,
replication_factor: u8,
message_expiry: IggyExpiry,
@@ -208,293 +97,336 @@ impl Topic {
compression_algorithm,
max_topic_size,
stats: Arc::new(TopicStats::new(stream_stats)),
- partitions: Partitions::new(),
- consumer_groups: ConsumerGroups::new(),
+ partitions: Vec::new(),
+ round_robin_counter: Arc::new(AtomicUsize::new(0)),
}
}
}
-#[derive(Debug, Clone, Default)]
-pub struct Topics {
- index: RefCell<AHashMap<String, usize>>,
- items: RefCell<Slab<Topic>>,
+#[derive(Debug)]
+pub struct Stream {
+ pub id: usize,
+ pub name: Arc<str>,
+ pub created_at: IggyTimestamp,
+
+ pub stats: Arc<StreamStats>,
+ pub topics: Slab<Topic>,
+ pub topic_index: AHashMap<Arc<str>, usize>,
}
-impl Topics {
- pub fn new() -> Self {
+impl Default for Stream {
+ fn default() -> Self {
Self {
- index: RefCell::new(AHashMap::with_capacity(1024)),
- items: RefCell::new(Slab::with_capacity(1024)),
- }
- }
-
- pub fn insert(&self, topic: Topic) -> usize {
- let mut items = self.items.borrow_mut();
- let mut index = self.index.borrow_mut();
-
- let name = topic.name.clone();
- let id = items.insert(topic);
- items[id].id = id;
- index.insert(name, id);
- id
- }
-
- pub fn get(&self, id: usize) -> Option<Topic> {
- self.items.borrow().get(id).cloned()
- }
-
- pub fn get_by_name(&self, name: &str) -> Option<Topic> {
- let index = self.index.borrow();
- if let Some(&id) = index.get(name) {
- self.items.borrow().get(id).cloned()
- } else {
- None
- }
- }
-
- pub fn get_by_identifier(&self, identifier: &Identifier) -> Option<Topic> {
- match identifier.kind {
- iggy_common::IdKind::Numeric => {
- if let Ok(id) = identifier.get_u32_value() {
- self.get(id as usize)
- } else {
- None
- }
- }
- iggy_common::IdKind::String => {
- if let Ok(name) = identifier.get_string_value() {
- self.get_by_name(&name)
- } else {
- None
- }
- }
+ id: 0,
+ name: Arc::from(""),
+ created_at: IggyTimestamp::default(),
+ stats: Arc::new(StreamStats::default()),
+ topics: Slab::new(),
+ topic_index: AHashMap::default(),
}
}
+}
- pub fn remove(&self, id: usize) -> Option<Topic> {
- let mut items = self.items.borrow_mut();
- let mut index = self.index.borrow_mut();
-
- if !items.contains(id) {
- return None;
+impl Clone for Stream {
+ fn clone(&self) -> Self {
+ Self {
+ id: self.id,
+ name: self.name.clone(),
+ created_at: self.created_at,
+ stats: self.stats.clone(),
+ topics: self.topics.clone(),
+ topic_index: self.topic_index.clone(),
}
-
- let topic = items.remove(id);
- index.remove(&topic.name);
- Some(topic)
}
-
- pub fn len(&self) -> usize {
- self.items.borrow().len()
- }
-
- pub fn is_empty(&self) -> bool {
- self.items.borrow().is_empty()
- }
-}
-
-#[derive(Debug, Clone)]
-pub struct Stream {
- pub id: usize,
- pub name: String,
- pub created_at: IggyTimestamp,
-
- pub stats: Arc<StreamStats>,
- pub topics: Topics,
}
impl Stream {
- pub fn new(name: String, created_at: IggyTimestamp) -> Self {
+ pub fn new(name: Arc<str>, created_at: IggyTimestamp) -> Self {
Self {
id: 0,
name,
created_at,
stats: Arc::new(StreamStats::default()),
- topics: Topics::new(),
+ topics: Slab::new(),
+ topic_index: AHashMap::default(),
}
}
-}
-#[derive(Debug, Clone, Default)]
-pub struct Streams {
- index: RefCell<AHashMap<String, usize>>,
- items: RefCell<Slab<Stream>>,
-}
-
-impl Streams {
- pub fn new() -> Self {
+ pub fn with_stats(name: Arc<str>, created_at: IggyTimestamp, stats:
Arc<StreamStats>) -> Self {
Self {
- index: RefCell::new(AHashMap::with_capacity(256)),
- items: RefCell::new(Slab::with_capacity(256)),
+ id: 0,
+ name,
+ created_at,
+ stats,
+ topics: Slab::new(),
+ topic_index: AHashMap::default(),
}
}
+}
- pub fn insert(&self, stream: Stream) -> usize {
- let mut items = self.items.borrow_mut();
- let mut index = self.index.borrow_mut();
-
- let name = stream.name.clone();
- let id = items.insert(stream);
- items[id].id = id;
- index.insert(name, id);
- id
- }
-
- pub fn get(&self, id: usize) -> Option<Stream> {
- self.items.borrow().get(id).cloned()
- }
-
- pub fn get_by_name(&self, name: &str) -> Option<Stream> {
- let index = self.index.borrow();
- if let Some(&id) = index.get(name) {
- self.items.borrow().get(id).cloned()
- } else {
- None
- }
- }
+define_state! {
+ Streams {
+ index: AHashMap<Arc<str>, usize>,
+ items: Slab<Stream>,
+ },
+ [
+ CreateStream,
+ UpdateStream,
+ DeleteStream,
+ PurgeStream,
+ CreateTopic,
+ UpdateTopic,
+ DeleteTopic,
+ PurgeTopic,
+ CreatePartitions,
+ DeletePartitions
+ ]
+}
+impl_absorb!(StreamsInner, StreamsCommand);
- pub fn get_by_identifier(&self, identifier: &Identifier) -> Option<Stream>
{
+impl StreamsInner {
+ fn resolve_stream_id(&self, identifier: &iggy_common::Identifier) ->
Option<usize> {
+ use iggy_common::IdKind;
match identifier.kind {
- iggy_common::IdKind::Numeric => {
- if let Ok(id) = identifier.get_u32_value() {
- self.get(id as usize)
+ IdKind::Numeric => {
+ let id = identifier.get_u32_value().ok()? as usize;
+ if self.items.contains(id) {
+ Some(id)
} else {
None
}
}
- iggy_common::IdKind::String => {
- if let Ok(name) = identifier.get_string_value() {
- self.get_by_name(&name)
- } else {
- None
- }
+ IdKind::String => {
+ let name = identifier.get_string_value().ok()?;
+ self.index.get(name.as_str()).copied()
}
}
}
- pub fn remove(&self, id: usize) -> Option<Stream> {
- let mut items = self.items.borrow_mut();
- let mut index = self.index.borrow_mut();
+ fn resolve_topic_id(
+ &self,
+ stream_id: usize,
+ identifier: &iggy_common::Identifier,
+ ) -> Option<usize> {
+ use iggy_common::IdKind;
+ let stream = self.items.get(stream_id)?;
- if !items.contains(id) {
- return None;
- }
-
- let stream = items.remove(id);
- index.remove(&stream.name);
- Some(stream)
- }
-
- pub fn update_name(&self, identifier: &Identifier, new_name: String) ->
Result<(), IggyError> {
- let stream = self.get_by_identifier(identifier);
- if let Some(stream) = stream {
- let mut items = self.items.borrow_mut();
- let mut index = self.index.borrow_mut();
-
- index.remove(&stream.name);
- if let Some(s) = items.get_mut(stream.id) {
- s.name = new_name.clone();
+ match identifier.kind {
+ IdKind::Numeric => {
+ let id = identifier.get_u32_value().ok()? as usize;
+ if stream.topics.contains(id) {
+ Some(id)
+ } else {
+ None
+ }
+ }
+ IdKind::String => {
+ let name = identifier.get_string_value().ok()?;
+ stream.topic_index.get(name.as_str()).copied()
}
- index.insert(new_name, stream.id);
- Ok(())
- } else {
- Err(IggyError::ResourceNotFound("Stream".to_string()))
- }
- }
-
- pub fn purge(&self, id: usize) -> Result<(), IggyError> {
- let items = self.items.borrow();
- if let Some(_stream) = items.get(id) {
- // TODO: Purge all topics in the stream
- Ok(())
- } else {
- Err(IggyError::ResourceNotFound("Stream".to_string()))
}
}
-
- pub fn len(&self) -> usize {
- self.items.borrow().len()
- }
-
- pub fn is_empty(&self) -> bool {
- self.items.borrow().is_empty()
- }
-
- pub fn iter(&self) -> Vec<Stream> {
- self.items
- .borrow()
- .iter()
- .map(|(_, s): (usize, &Stream)| s.clone())
- .collect()
- }
}
-// Define StreamsCommand enum and StateCommand implementation using the macro
-define_state_command! {
- Streams,
- StreamsCommand,
- [CreateStream, UpdateStream, DeleteStream, PurgeStream]
-}
-
-impl ApplyState for Streams {
- type Output = ();
-
- fn do_apply(&self, cmd: Self::Command) -> Self::Output {
+impl Handler for StreamsInner {
+ fn handle(&mut self, cmd: &StreamsCommand) {
match cmd {
StreamsCommand::CreateStream(payload) => {
- todo!("Handle Create stream with {:?}", payload)
+ let name_arc: Arc<str> = Arc::from(payload.name.as_str());
+ if self.index.contains_key(&name_arc) {
+ return;
+ }
+
+ let stream = Stream {
+ id: 0,
+ name: name_arc.clone(),
+ created_at: iggy_common::IggyTimestamp::now(),
+ stats: Arc::new(StreamStats::default()),
+ topics: Slab::new(),
+ topic_index: AHashMap::default(),
+ };
+
+ let id = self.items.insert(stream);
+ if let Some(stream) = self.items.get_mut(id) {
+ stream.id = id;
+ }
+ self.index.insert(name_arc, id);
}
StreamsCommand::UpdateStream(payload) => {
- todo!("Handle Update stream with {:?}", payload)
+ let Some(stream_id) =
self.resolve_stream_id(&payload.stream_id) else {
+ return;
+ };
+ let Some(stream) = self.items.get_mut(stream_id) else {
+ return;
+ };
+
+ let new_name_arc: Arc<str> = Arc::from(payload.name.as_str());
+ if let Some(&existing_id) = self.index.get(&new_name_arc)
+ && existing_id != stream_id
+ {
+ return;
+ }
+
+ self.index.remove(&stream.name);
+ stream.name = new_name_arc.clone();
+ self.index.insert(new_name_arc, stream_id);
}
StreamsCommand::DeleteStream(payload) => {
- todo!("Handle Delete stream with {:?}", payload)
- }
- StreamsCommand::PurgeStream(payload) => todo!("Handle Purge stream
with {:?}", payload),
- }
- }
-}
-
-// Define TopicsCommand enum and StateCommand implementation using the macro
-define_state_command! {
- Topics,
- TopicsCommand,
- [CreateTopic, UpdateTopic, DeleteTopic, PurgeTopic]
-}
+ let Some(stream_id) =
self.resolve_stream_id(&payload.stream_id) else {
+ return;
+ };
-impl ApplyState for Topics {
- type Output = ();
+ if let Some(stream) = self.items.get(stream_id) {
+ let name = stream.name.clone();
- fn do_apply(&self, cmd: Self::Command) -> Self::Output {
- match cmd {
- TopicsCommand::CreateTopic(payload) => todo!("Handle Create topic
with {:?}", payload),
- TopicsCommand::UpdateTopic(payload) => todo!("Handle Update topic
with {:?}", payload),
- TopicsCommand::DeleteTopic(payload) => todo!("Handle Delete topic
with {:?}", payload),
- TopicsCommand::PurgeTopic(payload) => todo!("Handle Purge topic
with {:?}", payload),
- }
- }
-}
+ self.items.remove(stream_id);
+ self.index.remove(&name);
+ }
+ }
+ StreamsCommand::PurgeStream(_payload) => {
+ // TODO
+ todo!();
+ }
+ StreamsCommand::CreateTopic(payload) => {
+ let Some(stream_id) =
self.resolve_stream_id(&payload.stream_id) else {
+ return;
+ };
+ let Some(stream) = self.items.get_mut(stream_id) else {
+ return;
+ };
+
+ let name_arc: Arc<str> = Arc::from(payload.name.as_str());
+ if stream.topic_index.contains_key(&name_arc) {
+ return;
+ }
-// Define PartitionsCommand enum and StateCommand implementation using the
macro
-define_state_command! {
- Partitions,
- PartitionsCommand,
- [CreatePartitions, DeletePartitions, DeleteSegments]
-}
+ let topic = Topic {
+ id: 0, // Will be assigned by slab
+ name: name_arc.clone(),
+ created_at: iggy_common::IggyTimestamp::now(),
+ replication_factor:
payload.replication_factor.unwrap_or(1),
+ message_expiry: payload.message_expiry,
+ compression_algorithm: payload.compression_algorithm,
+ max_topic_size: payload.max_topic_size,
+ stats: Arc::new(TopicStats::new(stream.stats.clone())),
+ partitions: Vec::new(),
+ round_robin_counter: Arc::new(AtomicUsize::new(0)),
+ };
+
+ let topic_id = stream.topics.insert(topic);
+ if let Some(topic) = stream.topics.get_mut(topic_id) {
+ topic.id = topic_id;
+
+ for partition_id in 0..payload.partitions_count as usize {
+ let partition = Partition {
+ id: partition_id,
+ created_at: iggy_common::IggyTimestamp::now(),
+ };
+ topic.partitions.push(partition);
+ }
+ }
-impl ApplyState for Partitions {
- type Output = ();
+ stream.topic_index.insert(name_arc, topic_id);
+ }
+ StreamsCommand::UpdateTopic(payload) => {
+ let Some(stream_id) =
self.resolve_stream_id(&payload.stream_id) else {
+ return;
+ };
+ let Some(topic_id) = self.resolve_topic_id(stream_id,
&payload.topic_id) else {
+ return;
+ };
+
+ let Some(stream) = self.items.get_mut(stream_id) else {
+ return;
+ };
+ let Some(topic) = stream.topics.get_mut(topic_id) else {
+ return;
+ };
+
+ let new_name_arc: Arc<str> = Arc::from(payload.name.as_str());
+ if let Some(&existing_id) =
stream.topic_index.get(&new_name_arc)
+ && existing_id != topic_id
+ {
+ return;
+ }
- fn do_apply(&self, cmd: Self::Command) -> Self::Output {
- match cmd {
- PartitionsCommand::CreatePartitions(payload) => {
- todo!("Handle Create partitions with {:?}", payload)
+ stream.topic_index.remove(&topic.name);
+ topic.name = new_name_arc.clone();
+ topic.compression_algorithm = payload.compression_algorithm;
+ topic.message_expiry = payload.message_expiry;
+ topic.max_topic_size = payload.max_topic_size;
+ if let Some(rf) = payload.replication_factor {
+ topic.replication_factor = rf;
+ }
+ stream.topic_index.insert(new_name_arc, topic_id);
}
- PartitionsCommand::DeletePartitions(payload) => {
- todo!("Handle Delete partitions with {:?}", payload)
+ StreamsCommand::DeleteTopic(payload) => {
+ let Some(stream_id) =
self.resolve_stream_id(&payload.stream_id) else {
+ return;
+ };
+ let Some(topic_id) = self.resolve_topic_id(stream_id,
&payload.topic_id) else {
+ return;
+ };
+ let Some(stream) = self.items.get_mut(stream_id) else {
+ return;
+ };
+
+ if let Some(topic) = stream.topics.get(topic_id) {
+ let name = topic.name.clone();
+ stream.topics.remove(topic_id);
+ stream.topic_index.remove(&name);
+ }
+ }
+ StreamsCommand::PurgeTopic(_payload) => {
+ // TODO:
+ todo!();
}
- PartitionsCommand::DeleteSegments(payload) => {
- todo!("Handle Delete segments with {:?}", payload)
+ StreamsCommand::CreatePartitions(payload) => {
+ let Some(stream_id) =
self.resolve_stream_id(&payload.stream_id) else {
+ return;
+ };
+ let Some(topic_id) = self.resolve_topic_id(stream_id,
&payload.topic_id) else {
+ return;
+ };
+
+ let Some(stream) = self.items.get_mut(stream_id) else {
+ return;
+ };
+ let Some(topic) = stream.topics.get_mut(topic_id) else {
+ return;
+ };
+
+ let current_partition_count = topic.partitions.len();
+ for i in 0..payload.partitions_count as usize {
+ let partition_id = current_partition_count + i;
+ let partition = Partition {
+ id: partition_id,
+ created_at: iggy_common::IggyTimestamp::now(),
+ };
+ topic.partitions.push(partition);
+ }
+ }
+ StreamsCommand::DeletePartitions(payload) => {
+ let Some(stream_id) =
self.resolve_stream_id(&payload.stream_id) else {
+ return;
+ };
+ let Some(topic_id) = self.resolve_topic_id(stream_id,
&payload.topic_id) else {
+ return;
+ };
+
+ let Some(stream) = self.items.get_mut(stream_id) else {
+ return;
+ };
+ let Some(topic) = stream.topics.get_mut(topic_id) else {
+ return;
+ };
+
+ let count_to_delete = payload.partitions_count as usize;
+ if count_to_delete > 0 && count_to_delete <=
topic.partitions.len() {
+ topic
+ .partitions
+ .truncate(topic.partitions.len() - count_to_delete);
+ }
}
}
}
diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs
index 635277965..81ef0b018 100644
--- a/core/metadata/src/stm/user.rs
+++ b/core/metadata/src/stm/user.rs
@@ -15,295 +15,222 @@
// specific language governing permissions and limitations
// under the License.
-use crate::{
- permissioner::Permissioner,
- stm::{ApplyState, StateCommand},
-};
+use crate::permissioner::Permissioner;
+use crate::stm::Handler;
+use crate::{define_state, impl_absorb};
use ahash::AHashMap;
-use bytes::Bytes;
use iggy_common::change_password::ChangePassword;
+use iggy_common::create_personal_access_token::CreatePersonalAccessToken;
use iggy_common::create_user::CreateUser;
+use iggy_common::delete_personal_access_token::DeletePersonalAccessToken;
use iggy_common::delete_user::DeleteUser;
use iggy_common::update_permissions::UpdatePermissions;
use iggy_common::update_user::UpdateUser;
-use iggy_common::{
- BytesSerializable, Identifier, IggyError, IggyTimestamp, Permissions,
PersonalAccessToken,
- UserId, UserStatus,
- header::{Operation, PrepareHeader},
- message::Message,
-};
+use iggy_common::{IggyTimestamp, Permissions, PersonalAccessToken, UserId,
UserStatus};
use slab::Slab;
-use std::cell::RefCell;
+use std::sync::Arc;
+
+// ============================================================================
+// User Entity
+// ============================================================================
#[derive(Debug, Clone)]
pub struct User {
pub id: UserId,
- pub username: String,
- pub password: String,
+ pub username: Arc<str>,
+ pub password_hash: Arc<str>,
pub status: UserStatus,
pub created_at: IggyTimestamp,
- pub permissions: Option<Permissions>,
- pub personal_access_tokens: AHashMap<String, PersonalAccessToken>,
+ pub permissions: Option<Arc<Permissions>>,
+}
+
+impl Default for User {
+ fn default() -> Self {
+ Self {
+ id: 0,
+ username: Arc::from(""),
+ password_hash: Arc::from(""),
+ status: UserStatus::default(),
+ created_at: IggyTimestamp::default(),
+ permissions: None,
+ }
+ }
}
impl User {
pub fn new(
- username: String,
- password: String,
+ username: Arc<str>,
+ password_hash: Arc<str>,
status: UserStatus,
created_at: IggyTimestamp,
- permissions: Option<Permissions>,
+ permissions: Option<Arc<Permissions>>,
) -> Self {
Self {
id: 0,
username,
- password,
+ password_hash,
status,
created_at,
permissions,
- personal_access_tokens: AHashMap::new(),
}
}
}
-#[derive(Debug, Clone, Default)]
-pub struct Users {
- index: RefCell<AHashMap<String, usize>>,
- items: RefCell<Slab<User>>,
- permissioner: RefCell<Permissioner>,
+define_state! {
+ Users {
+ index: AHashMap<Arc<str>, UserId>,
+ items: Slab<User>,
+ personal_access_tokens: AHashMap<UserId, AHashMap<Arc<str>,
PersonalAccessToken>>,
+ permissioner: Permissioner,
+ },
+ [
+ CreateUser,
+ UpdateUser,
+ DeleteUser,
+ ChangePassword,
+ UpdatePermissions,
+ CreatePersonalAccessToken,
+ DeletePersonalAccessToken
+ ]
}
+impl_absorb!(UsersInner, UsersCommand);
-impl Users {
- pub fn new() -> Self {
- Self {
- index: RefCell::new(AHashMap::with_capacity(1024)),
- items: RefCell::new(Slab::with_capacity(1024)),
- permissioner: RefCell::new(Permissioner::new()),
- }
- }
-
- /// Insert a user and return the assigned ID
- pub fn insert(&self, user: User) -> usize {
- let mut items = self.items.borrow_mut();
- let mut index = self.index.borrow_mut();
-
- let username = user.username.clone();
- let id = items.insert(user);
- items[id].id = id as u32;
- index.insert(username, id);
- id
- }
-
- /// Get user by ID
- pub fn get(&self, id: usize) -> Option<User> {
- self.items.borrow().get(id).cloned()
- }
-
- /// Get user by username or ID (via Identifier enum)
- pub fn get_by_identifier(&self, identifier: &Identifier) ->
Result<Option<User>, IggyError> {
+impl UsersInner {
+ fn resolve_user_id(&self, identifier: &iggy_common::Identifier) ->
Option<usize> {
+ use iggy_common::IdKind;
match identifier.kind {
- iggy_common::IdKind::Numeric => {
- let id = identifier.get_u32_value()? as usize;
- Ok(self.items.borrow().get(id).cloned())
- }
- iggy_common::IdKind::String => {
- let username = identifier.get_string_value()?;
- let index = self.index.borrow();
- if let Some(&id) = index.get(&username) {
- Ok(self.items.borrow().get(id).cloned())
+ IdKind::Numeric => {
+ let id = identifier.get_u32_value().ok()? as usize;
+ if self.items.contains(id) {
+ Some(id)
} else {
- Ok(None)
+ None
}
}
+ IdKind::String => {
+ let username = identifier.get_string_value().ok()?;
+ self.index.get(username.as_str()).map(|&id| id as usize)
+ }
}
}
+}
- /// Remove user by ID
- pub fn remove(&self, id: usize) -> Option<User> {
- let mut items = self.items.borrow_mut();
- let mut index = self.index.borrow_mut();
+impl Handler for UsersInner {
+ fn handle(&mut self, cmd: &UsersCommand) {
+ match cmd {
+ UsersCommand::CreateUser(payload) => {
+ let username_arc: Arc<str> =
Arc::from(payload.username.as_str());
+ if self.index.contains_key(&username_arc) {
+ return;
+ }
- if !items.contains(id) {
- return None;
- }
+ let user = User {
+ id: 0,
+ username: username_arc.clone(),
+ password_hash: Arc::from(payload.password.as_str()),
+ status: payload.status,
+ created_at: iggy_common::IggyTimestamp::now(),
+ permissions: payload.permissions.as_ref().map(|p|
Arc::new(p.clone())),
+ };
+
+ let id = self.items.insert(user);
+ if let Some(user) = self.items.get_mut(id) {
+ user.id = id as UserId;
+ }
- let user = items.remove(id);
- index.remove(&user.username);
- Some(user)
- }
+ self.index.insert(username_arc, id as UserId);
+ self.personal_access_tokens
+ .insert(id as UserId, AHashMap::default());
+ }
+ UsersCommand::UpdateUser(payload) => {
+ let Some(user_id) = self.resolve_user_id(&payload.user_id)
else {
+ return;
+ };
+
+ let Some(user) = self.items.get_mut(user_id) else {
+ return;
+ };
+
+ if let Some(new_username) = &payload.username {
+ let new_username_arc: Arc<str> =
Arc::from(new_username.as_str());
+ if let Some(&existing_id) =
self.index.get(&new_username_arc)
+ && existing_id != user_id as UserId
+ {
+ return;
+ }
+
+ self.index.remove(&user.username);
+ user.username = new_username_arc.clone();
+ self.index.insert(new_username_arc, user_id as UserId);
+ }
- /// Check if user exists
- pub fn contains(&self, identifier: &Identifier) -> bool {
- match identifier.kind {
- iggy_common::IdKind::Numeric => {
- if let Ok(id) = identifier.get_u32_value() {
- self.items.borrow().contains(id as usize)
- } else {
- false
+ if let Some(new_status) = payload.status {
+ user.status = new_status;
}
}
- iggy_common::IdKind::String => {
- if let Ok(username) = identifier.get_string_value() {
- self.index.borrow().contains_key(&username)
- } else {
- false
+ UsersCommand::DeleteUser(payload) => {
+ let Some(user_id) = self.resolve_user_id(&payload.user_id)
else {
+ return;
+ };
+
+ if let Some(user) = self.items.get(user_id) {
+ let username = user.username.clone();
+ self.items.remove(user_id);
+ self.index.remove(&username);
+ self.personal_access_tokens.remove(&(user_id as UserId));
}
}
- }
- }
-
- /// Get all users as a Vec
- pub fn values(&self) -> Vec<User> {
- self.items
- .borrow()
- .iter()
- .map(|(_, u): (usize, &User)| u.clone())
- .collect()
- }
-
- /// Get number of users
- pub fn len(&self) -> usize {
- self.items.borrow().len()
- }
-
- /// Check if empty
- pub fn is_empty(&self) -> bool {
- self.items.borrow().is_empty()
- }
-
- /// Check if username already exists
- pub fn username_exists(&self, username: &str) -> bool {
- self.index.borrow().contains_key(username)
- }
-
- /// Get ID by username
- pub fn get_id_by_username(&self, username: &str) -> Option<usize> {
- self.index.borrow().get(username).copied()
- }
-
- /// Initialize permissions for a user
- pub fn init_permissions(&self, user_id: UserId, permissions:
Option<Permissions>) {
- self.permissioner
- .borrow_mut()
- .init_permissions_for_user(user_id, permissions);
- }
-
- /// Update permissions for a user
- pub fn update_permissions(&self, user_id: UserId, permissions:
Option<Permissions>) {
- self.permissioner
- .borrow_mut()
- .update_permissions_for_user(user_id, permissions);
- }
-
- /// Delete permissions for a user
- pub fn delete_permissions(&self, user_id: UserId) {
- self.permissioner
- .borrow_mut()
- .delete_permissions_for_user(user_id);
- }
+ UsersCommand::ChangePassword(payload) => {
+ let Some(user_id) = self.resolve_user_id(&payload.user_id)
else {
+ return;
+ };
- /// Update username
- pub fn update_username(
- &self,
- identifier: &Identifier,
- new_username: String,
- ) -> Result<(), IggyError> {
- let id = match identifier.kind {
- iggy_common::IdKind::Numeric => identifier.get_u32_value()? as
usize,
- iggy_common::IdKind::String => {
- let username = identifier.get_string_value()?;
- let index = self.index.borrow();
- *index
- .get(&username)
- .ok_or_else(||
IggyError::ResourceNotFound(username.to_string()))?
+ if let Some(user) = self.items.get_mut(user_id) {
+ user.password_hash =
Arc::from(payload.new_password.as_str());
+ }
}
- };
-
- let old_username = {
- let items = self.items.borrow();
- let user = items
- .get(id)
- .ok_or_else(||
IggyError::ResourceNotFound(identifier.to_string()))?;
- user.username.clone()
- };
-
- if old_username == new_username {
- return Ok(());
- }
-
- tracing::trace!(
- "Updating username: '{}' → '{}' for user ID: {}",
- old_username,
- new_username,
- id
- );
-
- {
- let mut items = self.items.borrow_mut();
- let user = items
- .get_mut(id)
- .ok_or_else(||
IggyError::ResourceNotFound(identifier.to_string()))?;
- user.username = new_username.clone();
- }
-
- let mut index = self.index.borrow_mut();
- index.remove(&old_username);
- index.insert(new_username, id);
-
- Ok(())
- }
-}
-
-#[derive(Debug)]
-pub enum UsersCommand {
- Create(CreateUser),
- Update(UpdateUser),
- Delete(DeleteUser),
- ChangePassword(ChangePassword),
- UpdatePermissions(UpdatePermissions),
-}
-
-impl StateCommand for Users {
- type Command = UsersCommand;
- type Input = Message<PrepareHeader>;
+ UsersCommand::UpdatePermissions(payload) => {
+ let Some(user_id) = self.resolve_user_id(&payload.user_id)
else {
+ return;
+ };
- fn into_command(input: &Self::Input) -> Option<Self::Command> {
- // TODO: rework this thing, so we don't copy the bytes on each request
- let body = Bytes::copy_from_slice(input.body());
- match input.header().operation {
- Operation::CreateUser => Some(UsersCommand::Create(
- CreateUser::from_bytes(body.clone()).unwrap(),
- )),
- Operation::UpdateUser => Some(UsersCommand::Update(
- UpdateUser::from_bytes(body.clone()).unwrap(),
- )),
- Operation::DeleteUser => Some(UsersCommand::Delete(
- DeleteUser::from_bytes(body.clone()).unwrap(),
- )),
- Operation::ChangePassword => Some(UsersCommand::ChangePassword(
- ChangePassword::from_bytes(body.clone()).unwrap(),
- )),
- Operation::UpdatePermissions =>
Some(UsersCommand::UpdatePermissions(
- UpdatePermissions::from_bytes(body.clone()).unwrap(),
- )),
- _ => None,
- }
- }
-}
+ if let Some(user) = self.items.get_mut(user_id) {
+ user.permissions = payload.permissions.as_ref().map(|p|
Arc::new(p.clone()));
+ }
+ }
+ UsersCommand::CreatePersonalAccessToken(payload) => {
+ // TODO: Stub untill protocol gets adjusted.
+ let user_id = 0;
+ let user_tokens =
self.personal_access_tokens.entry(user_id).or_default();
+ let name_arc: Arc<str> = Arc::from(payload.name.as_str());
+ if user_tokens.contains_key(&name_arc) {
+ return;
+ }
-impl ApplyState for Users {
- type Output = ();
+ let expiry_at =
+
PersonalAccessToken::calculate_expiry_at(IggyTimestamp::now(), payload.expiry);
+ if let Some(expiry_at) = expiry_at
+ && expiry_at.as_micros() <=
IggyTimestamp::now().as_micros()
+ {
+ return;
+ }
- fn do_apply(&self, cmd: Self::Command) -> Self::Output {
- match cmd {
- UsersCommand::Create(payload) => todo!("Handle Create user with
{:?}", payload),
- UsersCommand::Update(payload) => todo!("Handle Update user with
{:?}", payload),
- UsersCommand::Delete(payload) => todo!("Handle Delete user with
{:?}", payload),
- UsersCommand::ChangePassword(payload) => {
- todo!("Handle Change password with {:?}", payload)
+ let (pat, _) = PersonalAccessToken::new(
+ user_id,
+ payload.name.as_ref(),
+ IggyTimestamp::now(),
+ payload.expiry,
+ );
+ user_tokens.insert(name_arc, pat);
}
- UsersCommand::UpdatePermissions(payload) => {
- todo!("Handle Update permissions with {:?}", payload)
+ UsersCommand::DeletePersonalAccessToken(payload) => {
+ // TODO: Stub untill protocol gets adjusted.
+ let user_id = 0;
+
+ if let Some(user_tokens) =
self.personal_access_tokens.get_mut(&user_id) {
+ let name_arc: Arc<str> = Arc::from(payload.name.as_str());
+ user_tokens.remove(&name_arc);
+ }
}
}
}
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 743f14f18..3ba2cecb5 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -71,7 +71,7 @@ hash32 = "1.0.0"
human-repr = { workspace = true }
iggy_common = { workspace = true }
jsonwebtoken = { version = "10.2.0", features = ["rust_crypto"] }
-left-right = "0.11"
+left-right = { workspace = true }
lending-iterator = "0.1.7"
metadata = { workspace = true }
mimalloc = { workspace = true, optional = true }