This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 8a0df74cb feat(metadata): introduce `mux_state_machine` with state
impls (#2544)
8a0df74cb is described below
commit 8a0df74cb0b414e5b65010d89ac34c880f425603
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Fri Jan 9 09:21:31 2026 +0100
feat(metadata): introduce `mux_state_machine` with state impls (#2544)
Implement the `users`, `streams`, and `consumer_groups` states for
`MuxStateMachine`.
---
Cargo.lock | 8 +-
core/common/Cargo.toml | 2 +
core/common/src/lib.rs | 2 +
core/common/src/types/consensus/message.rs | 19 +-
core/common/src/types/mod.rs | 1 +
.../src/types/personal_access_tokens/mod.rs} | 14 +-
.../src/streaming => common/src}/utils/hash.rs | 0
core/common/src/utils/mod.rs | 1 +
core/consensus/src/impls.rs | 33 +-
core/metadata/Cargo.toml | 3 +
core/metadata/src/impls/metadata.rs | 2 +-
core/metadata/src/lib.rs | 3 +
core/metadata/src/permissioner/mod.rs | 98 +++++
.../permissioner_rules/consumer_groups.rs | 80 ++++
.../permissioner_rules/consumer_offsets.rs | 53 +++
.../permissioner/permissioner_rules/messages.rs | 139 ++++++
.../src/permissioner/permissioner_rules}/mod.rs | 10 +-
.../permissioner/permissioner_rules/partitions.rs} | 38 +-
.../permissioner/permissioner_rules/segments.rs} | 18 +-
.../src/permissioner/permissioner_rules/streams.rs | 90 ++++
.../src/permissioner/permissioner_rules/system.rs | 48 ++
.../src/permissioner/permissioner_rules/topics.rs | 157 +++++++
.../src/permissioner/permissioner_rules/users.rs | 74 ++++
core/metadata/src/stats/mod.rs | 353 +++++++++++++++
core/metadata/src/stm/consumer_group.rs | 212 ++++++++-
core/metadata/src/stm/mod.rs | 99 ++++-
core/metadata/src/stm/mux.rs | 26 +-
core/metadata/src/stm/stream.rs | 483 ++++++++++++++++++++-
core/metadata/src/stm/user.rs | 290 ++++++++++++-
core/server/Cargo.toml | 3 -
core/server/src/binary/mapper.rs | 5 +-
core/server/src/bootstrap.rs | 3 +-
core/server/src/http/http_shard_wrapper.rs | 2 +-
core/server/src/http/mapper.rs | 2 +-
core/server/src/http/personal_access_tokens.rs | 2 +-
.../src/shard/system/personal_access_tokens.rs | 2 +-
core/server/src/shard/transmission/event.rs | 5 +-
core/server/src/state/system.rs | 2 +-
.../server/src/streaming/clients/client_manager.rs | 6 +-
core/server/src/streaming/mod.rs | 1 -
core/server/src/streaming/polling_consumer.rs | 5 +-
core/server/src/streaming/topics/helpers.rs | 5 +-
core/server/src/streaming/users/user.rs | 2 +-
core/server/src/streaming/utils/mod.rs | 1 -
44 files changed, 2294 insertions(+), 108 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index b8e260dc6..4c330ed8d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4656,6 +4656,7 @@ dependencies = [
"aes-gcm",
"ahash 0.8.12",
"base64 0.22.1",
+ "blake3",
"bon",
"byte-unit",
"bytemuck",
@@ -4677,6 +4678,7 @@ dependencies = [
"nix",
"once_cell",
"rcgen",
+ "ring",
"rustls",
"serde",
"serde_json",
@@ -5656,10 +5658,13 @@ dependencies = [
name = "metadata"
version = "0.1.0"
dependencies = [
+ "ahash 0.8.12",
+ "bytes",
"consensus",
"iggy_common",
"journal",
"message_bus",
+ "slab",
"tracing",
]
@@ -8256,7 +8261,6 @@ dependencies = [
"async_zip",
"axum",
"axum-server",
- "blake3",
"bytes",
"chrono",
"clap",
@@ -8297,7 +8301,6 @@ dependencies = [
"prometheus-client",
"rand 0.9.2",
"reqwest",
- "ring",
"ringbuffer",
"rmp-serde",
"rust-embed",
@@ -8321,7 +8324,6 @@ dependencies = [
"tracing-opentelemetry",
"tracing-subscriber",
"tungstenite",
- "twox-hash",
"ulid",
"uuid",
"vergen-git2",
diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml
index e935435ef..a529c1ab3 100644
--- a/core/common/Cargo.toml
+++ b/core/common/Cargo.toml
@@ -36,6 +36,7 @@ fast_async_lock = ["dep:fast-async-mutex"]
aes-gcm = { workspace = true }
ahash = { workspace = true }
base64 = { workspace = true }
+blake3 = { workspace = true }
bon = { workspace = true }
byte-unit = { workspace = true }
bytemuck = { workspace = true }
@@ -57,6 +58,7 @@ humantime = { workspace = true }
nix = { workspace = true }
once_cell = { workspace = true }
rcgen = "0.14.6"
+ring = "0.17.14"
rustls = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index af04602ce..9171c4097 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -88,6 +88,7 @@ pub use types::message::*;
pub use types::partition::*;
pub use types::permissions::permissions_global::*;
pub use types::permissions::personal_access_token::*;
+pub use types::personal_access_tokens::*;
pub use types::snapshot::*;
pub use types::stats::*;
pub use types::stream::*;
@@ -100,6 +101,7 @@ pub use utils::checksum::*;
pub use utils::crypto::*;
pub use utils::duration::{IggyDuration, SEC_IN_MICRO};
pub use utils::expiry::IggyExpiry;
+pub use utils::hash::*;
pub use utils::personal_access_token_expiry::PersonalAccessTokenExpiry;
pub use utils::text;
pub use utils::timestamp::*;
diff --git a/core/common/src/types/consensus/message.rs
b/core/common/src/types/consensus/message.rs
index 977b06e32..0e94820f8 100644
--- a/core/common/src/types/consensus/message.rs
+++ b/core/common/src/types/consensus/message.rs
@@ -89,24 +89,25 @@ where
}
}
- /// Replace the header with a new one, using the provided function to
generate it.
- pub fn replace_header<T: ConsensusHeader>(self, f: impl FnOnce(&H) -> T)
-> Message<T> {
+ /// Transmute the header to a different type, using the provided function
to modify the new header.
+ pub fn transmute_header<T: ConsensusHeader>(self, f: impl FnOnce(H, &mut
T)) -> Message<T> {
assert_eq!(size_of::<H>(), size_of::<T>());
- let prev = self.header();
- let header = f(prev);
- let header_bytes = bytemuck::bytes_of(&header);
- let buffer = self.into_inner();
+ // Copy old header to stack to avoid UB.
+ let old_header = *self.header();
+
// Safety: We ensured that size_of::<H>() == size_of::<T>()
// On top of that, there is going to be only one reference to buffer
during this function call
// so no other references can observe the mutation.
// In the future we can replace the `Bytes` buffer with something that
does not allow sharing between different threads.
+ let buffer = self.into_inner();
unsafe {
let ptr = buffer.as_ptr() as *mut u8;
- let slice = std::slice::from_raw_parts_mut(ptr, buffer.len());
- slice[..size_of::<H>()].copy_from_slice(header_bytes);
+ let slice = std::slice::from_raw_parts_mut(ptr, size_of::<T>());
+ let new_header = bytemuck::from_bytes_mut(slice);
+ f(old_header, new_header);
}
- // TODO: Recalculate checksums
+
Message {
buffer,
_marker: PhantomData,
diff --git a/core/common/src/types/mod.rs b/core/common/src/types/mod.rs
index f0d70cf9c..065a48c1e 100644
--- a/core/common/src/types/mod.rs
+++ b/core/common/src/types/mod.rs
@@ -29,6 +29,7 @@ pub(crate) mod identifier;
pub(crate) mod message;
pub(crate) mod partition;
pub(crate) mod permissions;
+pub(crate) mod personal_access_tokens;
pub(crate) mod snapshot;
pub(crate) mod stats;
pub(crate) mod stream;
diff --git
a/core/server/src/streaming/personal_access_tokens/personal_access_token.rs
b/core/common/src/types/personal_access_tokens/mod.rs
similarity index 94%
rename from
core/server/src/streaming/personal_access_tokens/personal_access_token.rs
rename to core/common/src/types/personal_access_tokens/mod.rs
index 8fc0c8535..e28a585de 100644
--- a/core/server/src/streaming/personal_access_tokens/personal_access_token.rs
+++ b/core/common/src/types/personal_access_tokens/mod.rs
@@ -15,11 +15,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-use crate::streaming::utils::hash;
-use iggy_common::IggyExpiry;
-use iggy_common::IggyTimestamp;
-use iggy_common::UserId;
-use iggy_common::text::as_base64;
+use crate::IggyExpiry;
+use crate::IggyTimestamp;
+use crate::UserId;
+use crate::text::as_base64;
+use crate::utils::hash;
use ring::rand::SecureRandom;
use std::sync::Arc;
@@ -96,8 +96,8 @@ impl PersonalAccessToken {
#[cfg(test)]
mod tests {
use super::*;
- use iggy_common::IggyDuration;
- use iggy_common::IggyTimestamp;
+ use crate::IggyDuration;
+ use crate::IggyTimestamp;
#[test]
fn
personal_access_token_should_be_created_with_random_secure_value_and_hashed_successfully()
{
diff --git a/core/server/src/streaming/utils/hash.rs
b/core/common/src/utils/hash.rs
similarity index 100%
copy from core/server/src/streaming/utils/hash.rs
copy to core/common/src/utils/hash.rs
diff --git a/core/common/src/utils/mod.rs b/core/common/src/utils/mod.rs
index 35656a15c..aa8714daa 100644
--- a/core/common/src/utils/mod.rs
+++ b/core/common/src/utils/mod.rs
@@ -20,6 +20,7 @@ pub(crate) mod checksum;
pub(crate) mod crypto;
pub(crate) mod duration;
pub(crate) mod expiry;
+pub(crate) mod hash;
pub(crate) mod personal_access_token_expiry;
pub mod text;
pub(crate) mod timestamp;
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 48248b13d..98d78e737 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -513,22 +513,22 @@ impl Project<Message<PrepareHeader>> for
Message<RequestHeader> {
fn project(self, consensus: &Self::Consensus) -> Message<PrepareHeader> {
let op = consensus.sequencer.current_sequence() + 1;
- self.replace_header(|prev| {
- PrepareHeader {
+ self.transmute_header(|old, new| {
+ *new = PrepareHeader {
cluster: consensus.cluster,
- size: prev.size,
+ size: old.size,
epoch: 0,
view: consensus.view.get(),
- release: prev.release,
+ release: old.release,
command: Command2::Prepare,
replica: consensus.replica,
parent: 0, // TODO: Get parent checksum from the previous
entry in the journal (figure out how to pass that ctx here)
- request_checksum: prev.request_checksum,
- request: prev.request,
+ request_checksum: old.request_checksum,
+ request: old.request,
commit: consensus.commit.get(),
op,
timestamp: 0, // 0 for now. Implement correct way to get
timestamp later
- operation: prev.operation,
+ operation: old.operation,
..Default::default()
}
})
@@ -537,25 +537,26 @@ impl Project<Message<PrepareHeader>> for
Message<RequestHeader> {
impl Project<Message<PrepareOkHeader>> for Message<PrepareHeader> {
type Consensus = VsrConsensus;
+
fn project(self, consensus: &Self::Consensus) -> Message<PrepareOkHeader> {
- self.replace_header(|prev| {
- PrepareOkHeader {
+ self.transmute_header(|old, new| {
+ *new = PrepareOkHeader {
command: Command2::PrepareOk,
- parent: prev.parent,
- prepare_checksum: prev.checksum,
- request: prev.request,
+ parent: old.parent,
+ prepare_checksum: old.checksum,
+ request: old.request,
cluster: consensus.cluster,
replica: consensus.replica,
epoch: 0, // TODO: consensus.epoch
// It's important to use the view of the replica, not the
received prepare!
view: consensus.view.get(),
- op: prev.op,
+ op: old.op,
commit: consensus.commit.get(),
- timestamp: prev.timestamp,
- operation: prev.operation,
+ timestamp: old.timestamp,
+ operation: old.operation,
// PrepareOks are only header no body
..Default::default()
- }
+ };
})
}
}
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
index a6ba9a43f..ee886e7c8 100644
--- a/core/metadata/Cargo.toml
+++ b/core/metadata/Cargo.toml
@@ -28,8 +28,11 @@ repository = "https://github.com/apache/iggy"
readme = "../../../README.md"
[dependencies]
+ahash = { workspace = true }
+bytes = { workspace = true }
consensus = { path = "../consensus" }
iggy_common = { path = "../common" }
journal = { path = "../journal" }
message_bus = { path = "../message_bus" }
+slab = "0.4.11"
tracing = { workspace = true }
diff --git a/core/metadata/src/impls/metadata.rs
b/core/metadata/src/impls/metadata.rs
index cbf50cbe5..c09d86d34 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -365,7 +365,7 @@ where
let message: Message<PrepareOkHeader> =
Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>())
- .replace_header(|_prev: &PrepareOkHeader| prepare_ok_header);
+ .transmute_header(|_, new| *new = prepare_ok_header);
let generic_message = message.into_generic();
let primary = self.consensus.primary_index(self.consensus.view());
diff --git a/core/metadata/src/lib.rs b/core/metadata/src/lib.rs
index c86b13993..9b22a6211 100644
--- a/core/metadata/src/lib.rs
+++ b/core/metadata/src/lib.rs
@@ -18,4 +18,7 @@
//! Iggy metadata module
mod impls;
+mod permissioner;
pub mod stm;
+
+mod stats;
diff --git a/core/metadata/src/permissioner/mod.rs
b/core/metadata/src/permissioner/mod.rs
new file mode 100644
index 000000000..36809a9d9
--- /dev/null
+++ b/core/metadata/src/permissioner/mod.rs
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod permissioner_rules;
+
+use ahash::{AHashMap, AHashSet};
+use iggy_common::{GlobalPermissions, Permissions, StreamPermissions, UserId};
+
+#[derive(Debug, Default, Clone)]
+pub struct Permissioner {
+ pub users_permissions: AHashMap<UserId, GlobalPermissions>,
+ pub users_streams_permissions: AHashMap<(UserId, usize),
StreamPermissions>,
+ pub users_that_can_poll_messages_from_all_streams: AHashSet<UserId>,
+ pub users_that_can_send_messages_to_all_streams: AHashSet<UserId>,
+ pub users_that_can_poll_messages_from_specific_streams: AHashSet<(UserId,
usize)>,
+ pub users_that_can_send_messages_to_specific_streams: AHashSet<(UserId,
usize)>,
+}
+
+impl Permissioner {
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ pub fn init_permissions(&mut self, user_id: UserId, permissions:
Option<Permissions>) {
+ if permissions.is_none() {
+ return;
+ }
+
+ let permissions = permissions.unwrap();
+ if permissions.global.poll_messages {
+ self.users_that_can_poll_messages_from_all_streams
+ .insert(user_id);
+ }
+
+ if permissions.global.send_messages {
+ self.users_that_can_send_messages_to_all_streams
+ .insert(user_id);
+ }
+
+ self.users_permissions.insert(user_id, permissions.global);
+ if permissions.streams.is_none() {
+ return;
+ }
+
+ let streams = permissions.streams.unwrap();
+ for (stream_id, stream) in streams {
+ if stream.poll_messages {
+ self.users_that_can_poll_messages_from_specific_streams
+ .insert((user_id, stream_id));
+ }
+
+ if stream.send_messages {
+ self.users_that_can_send_messages_to_specific_streams
+ .insert((user_id, stream_id));
+ }
+
+ self.users_streams_permissions
+ .insert((user_id, stream_id), stream);
+ }
+ }
+
+ pub fn update_permissions_for_user(
+ &mut self,
+ user_id: UserId,
+ permissions: Option<Permissions>,
+ ) {
+ self.delete_permissions(user_id);
+ self.init_permissions(user_id, permissions);
+ }
+
+ pub fn delete_permissions(&mut self, user_id: UserId) {
+ self.users_permissions.remove(&user_id);
+ self.users_that_can_poll_messages_from_all_streams
+ .remove(&user_id);
+ self.users_that_can_send_messages_to_all_streams
+ .remove(&user_id);
+ self.users_streams_permissions
+ .retain(|(id, _), _| *id != user_id);
+ self.users_that_can_poll_messages_from_specific_streams
+ .retain(|(id, _)| *id != user_id);
+ self.users_that_can_send_messages_to_specific_streams
+ .retain(|(id, _)| *id != user_id);
+ }
+}
diff --git
a/core/metadata/src/permissioner/permissioner_rules/consumer_groups.rs
b/core/metadata/src/permissioner/permissioner_rules/consumer_groups.rs
new file mode 100644
index 000000000..e73794d7c
--- /dev/null
+++ b/core/metadata/src/permissioner/permissioner_rules/consumer_groups.rs
@@ -0,0 +1,80 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#![expect(
+ unused,
+ reason = "Methods are part of the state API and will be used once the
implementation is complete"
+)]
+use crate::permissioner::Permissioner;
+use iggy_common::IggyError;
+
+impl Permissioner {
+ pub fn create_consumer_group(
+ &self,
+ user_id: u32,
+ stream_id: usize,
+ topic_id: usize,
+ ) -> Result<(), IggyError> {
+ self.get_topic(user_id, stream_id, topic_id)
+ }
+
+ pub fn delete_consumer_group(
+ &self,
+ user_id: u32,
+ stream_id: usize,
+ topic_id: usize,
+ ) -> Result<(), IggyError> {
+ self.get_topic(user_id, stream_id, topic_id)
+ }
+
+ pub fn get_consumer_group(
+ &self,
+ user_id: u32,
+ stream_id: usize,
+ topic_id: usize,
+ ) -> Result<(), IggyError> {
+ self.get_topic(user_id, stream_id, topic_id)
+ }
+
+ pub fn get_consumer_groups(
+ &self,
+ user_id: u32,
+ stream_id: usize,
+ topic_id: usize,
+ ) -> Result<(), IggyError> {
+ self.get_topic(user_id, stream_id, topic_id)
+ }
+
+ pub fn join_consumer_group(
+ &self,
+ user_id: u32,
+ stream_id: usize,
+ topic_id: usize,
+ ) -> Result<(), IggyError> {
+ self.get_topic(user_id, stream_id, topic_id)
+ }
+
+ pub fn leave_consumer_group(
+ &self,
+ user_id: u32,
+ stream_id: usize,
+ topic_id: usize,
+ ) -> Result<(), IggyError> {
+ self.get_topic(user_id, stream_id, topic_id)
+ }
+}
diff --git
a/core/metadata/src/permissioner/permissioner_rules/consumer_offsets.rs
b/core/metadata/src/permissioner/permissioner_rules/consumer_offsets.rs
new file mode 100644
index 000000000..bd8bcaa53
--- /dev/null
+++ b/core/metadata/src/permissioner/permissioner_rules/consumer_offsets.rs
@@ -0,0 +1,53 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#![expect(
+ unused,
+ reason = "Methods are part of the state API and will be used once the
implementation is complete"
+)]
+use crate::permissioner::Permissioner;
+use iggy_common::IggyError;
+
+impl Permissioner {
+ pub fn get_consumer_offset(
+ &self,
+ user_id: u32,
+ stream_id: usize,
+ topic_id: usize,
+ ) -> Result<(), IggyError> {
+ self.poll_messages(user_id, stream_id, topic_id)
+ }
+
+ pub fn store_consumer_offset(
+ &self,
+ user_id: u32,
+ stream_id: usize,
+ topic_id: usize,
+ ) -> Result<(), IggyError> {
+ self.poll_messages(user_id, stream_id, topic_id)
+ }
+
+ pub fn delete_consumer_offset(
+ &self,
+ user_id: u32,
+ stream_id: usize,
+ topic_id: usize,
+ ) -> Result<(), IggyError> {
+ self.poll_messages(user_id, stream_id, topic_id)
+ }
+}
diff --git a/core/metadata/src/permissioner/permissioner_rules/messages.rs
b/core/metadata/src/permissioner/permissioner_rules/messages.rs
new file mode 100644
index 000000000..520de0385
--- /dev/null
+++ b/core/metadata/src/permissioner/permissioner_rules/messages.rs
@@ -0,0 +1,139 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#![expect(
+ unused,
+ reason = "Methods are part of the state API and will be used once the
implementation is complete"
+)]
+use crate::permissioner::Permissioner;
+use iggy_common::IggyError;
+
+impl Permissioner {
+ pub fn poll_messages(
+ &self,
+ user_id: u32,
+ stream_id: usize,
+ topic_id: usize,
+ ) -> Result<(), IggyError> {
+ if self
+ .users_that_can_poll_messages_from_all_streams
+ .contains(&user_id)
+ {
+ return Ok(());
+ }
+
+ if self
+ .users_that_can_poll_messages_from_specific_streams
+ .contains(&(user_id, stream_id))
+ {
+ return Ok(());
+ }
+
+ let stream_permissions = self.users_streams_permissions.get(&(user_id,
stream_id));
+ if stream_permissions.is_none() {
+ return Err(IggyError::Unauthorized);
+ }
+
+ let stream_permissions = stream_permissions.unwrap();
+ if stream_permissions.read_stream {
+ return Ok(());
+ }
+
+ if stream_permissions.manage_topics {
+ return Ok(());
+ }
+
+ if stream_permissions.read_topics {
+ return Ok(());
+ }
+
+ if stream_permissions.poll_messages {
+ return Ok(());
+ }
+
+ if stream_permissions.topics.is_none() {
+ return Err(IggyError::Unauthorized);
+ }
+
+ let topic_permissions = stream_permissions.topics.as_ref().unwrap();
+ if let Some(topic_permissions) = topic_permissions.get(&topic_id) {
+ return match topic_permissions.poll_messages
+ | topic_permissions.read_topic
+ | topic_permissions.manage_topic
+ {
+ true => Ok(()),
+ false => Err(IggyError::Unauthorized),
+ };
+ }
+
+ Err(IggyError::Unauthorized)
+ }
+
+ pub fn append_messages(
+ &self,
+ user_id: u32,
+ stream_id: usize,
+ topic_id: usize,
+ ) -> Result<(), IggyError> {
+ if self
+ .users_that_can_send_messages_to_all_streams
+ .contains(&user_id)
+ {
+ return Ok(());
+ }
+
+ if self
+ .users_that_can_send_messages_to_specific_streams
+ .contains(&(user_id, stream_id))
+ {
+ return Ok(());
+ }
+
+ let stream_permissions = self.users_streams_permissions.get(&(user_id,
stream_id));
+ if stream_permissions.is_none() {
+ return Err(IggyError::Unauthorized);
+ }
+
+ let stream_permissions = stream_permissions.unwrap();
+ if stream_permissions.manage_stream {
+ return Ok(());
+ }
+
+ if stream_permissions.manage_topics {
+ return Ok(());
+ }
+
+ if stream_permissions.send_messages {
+ return Ok(());
+ }
+
+ if stream_permissions.topics.is_none() {
+ return Err(IggyError::Unauthorized);
+ }
+
+ let topic_permissions = stream_permissions.topics.as_ref().unwrap();
+ if let Some(topic_permissions) = topic_permissions.get(&topic_id) {
+ return match topic_permissions.send_messages |
topic_permissions.manage_topic {
+ true => Ok(()),
+ false => Err(IggyError::Unauthorized),
+ };
+ }
+
+ Err(IggyError::Unauthorized)
+ }
+}
diff --git a/core/server/src/streaming/personal_access_tokens/mod.rs
b/core/metadata/src/permissioner/permissioner_rules/mod.rs
similarity index 85%
copy from core/server/src/streaming/personal_access_tokens/mod.rs
copy to core/metadata/src/permissioner/permissioner_rules/mod.rs
index d404dbe5b..0d08a7a1c 100644
--- a/core/server/src/streaming/personal_access_tokens/mod.rs
+++ b/core/metadata/src/permissioner/permissioner_rules/mod.rs
@@ -16,4 +16,12 @@
* under the License.
*/
-pub mod personal_access_token;
+mod consumer_groups;
+pub mod consumer_offsets;
+mod messages;
+mod partitions;
+mod segments;
+mod streams;
+mod system;
+mod topics;
+mod users;
diff --git a/core/server/src/streaming/utils/hash.rs
b/core/metadata/src/permissioner/permissioner_rules/partitions.rs
similarity index 56%
rename from core/server/src/streaming/utils/hash.rs
rename to core/metadata/src/permissioner/permissioner_rules/partitions.rs
index f2829a6be..ad1829387 100644
--- a/core/server/src/streaming/utils/hash.rs
+++ b/core/metadata/src/permissioner/permissioner_rules/partitions.rs
@@ -16,23 +16,29 @@
* under the License.
*/
-use twox_hash::XxHash32;
+#![expect(
+ unused,
+ reason = "Methods are part of the state API and will be used once the
implementation is complete"
+)]
+use crate::permissioner::Permissioner;
+use iggy_common::IggyError;
-pub fn calculate_32(data: &[u8]) -> u32 {
- XxHash32::oneshot(0, data)
-}
-
-pub fn calculate_256(data: &[u8]) -> String {
- blake3::hash(data).to_hex().to_string()
-}
+impl Permissioner {
+ pub fn create_partitions(
+ &self,
+ user_id: u32,
+ stream_id: usize,
+ topic_id: usize,
+ ) -> Result<(), IggyError> {
+ self.update_topic(user_id, stream_id, topic_id)
+ }
-#[cfg(test)]
-mod tests {
- #[test]
- fn given_same_input_calculate_should_produce_same_output() {
- let input = "hello world".as_bytes();
- let output1 = super::calculate_32(input);
- let output2 = super::calculate_32(input);
- assert_eq!(output1, output2);
+ pub fn delete_partitions(
+ &self,
+ user_id: u32,
+ stream_id: usize,
+ topic_id: usize,
+ ) -> Result<(), IggyError> {
+ self.update_topic(user_id, stream_id, topic_id)
}
}
diff --git a/core/server/src/streaming/personal_access_tokens/mod.rs
b/core/metadata/src/permissioner/permissioner_rules/segments.rs
similarity index 65%
rename from core/server/src/streaming/personal_access_tokens/mod.rs
rename to core/metadata/src/permissioner/permissioner_rules/segments.rs
index d404dbe5b..484a2c158 100644
--- a/core/server/src/streaming/personal_access_tokens/mod.rs
+++ b/core/metadata/src/permissioner/permissioner_rules/segments.rs
@@ -16,4 +16,20 @@
* under the License.
*/
-pub mod personal_access_token;
+#![expect(
+ unused,
+ reason = "Methods are part of the state API and will be used once the
implementation is complete"
+)]
+use crate::permissioner::Permissioner;
+use iggy_common::IggyError;
+
+impl Permissioner {
+ pub fn delete_segments(
+ &self,
+ user_id: u32,
+ stream_id: usize,
+ topic_id: usize,
+ ) -> Result<(), IggyError> {
+ self.update_topic(user_id, stream_id, topic_id)
+ }
+}
diff --git a/core/metadata/src/permissioner/permissioner_rules/streams.rs
b/core/metadata/src/permissioner/permissioner_rules/streams.rs
new file mode 100644
index 000000000..70829c2fb
--- /dev/null
+++ b/core/metadata/src/permissioner/permissioner_rules/streams.rs
@@ -0,0 +1,90 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#![expect(
+ unused,
+ reason = "Methods are part of the state API and will be used once the
implementation is complete"
+)]
+use crate::permissioner::Permissioner;
+use iggy_common::IggyError;
+
+impl Permissioner {
+ pub fn get_stream(&self, user_id: u32, stream_id: usize) -> Result<(),
IggyError> {
+ if let Some(global_permissions) = self.users_permissions.get(&user_id)
+ && (global_permissions.manage_streams ||
global_permissions.read_streams)
+ {
+ return Ok(());
+ }
+
+ if let Some(stream_permissions) =
self.users_streams_permissions.get(&(user_id, stream_id))
+ && (stream_permissions.manage_stream ||
stream_permissions.read_stream)
+ {
+ return Ok(());
+ }
+
+ Err(IggyError::Unauthorized)
+ }
+
+ pub fn get_streams(&self, user_id: u32) -> Result<(), IggyError> {
+ if let Some(global_permissions) = self.users_permissions.get(&user_id)
+ && (global_permissions.manage_streams ||
global_permissions.read_streams)
+ {
+ return Ok(());
+ }
+
+ Err(IggyError::Unauthorized)
+ }
+
+ pub fn create_stream(&self, user_id: u32) -> Result<(), IggyError> {
+ if let Some(global_permissions) = self.users_permissions.get(&user_id)
+ && global_permissions.manage_streams
+ {
+ return Ok(());
+ }
+
+ Err(IggyError::Unauthorized)
+ }
+
+ pub fn update_stream(&self, user_id: u32, stream_id: usize) -> Result<(),
IggyError> {
+ self.manage_stream(user_id, stream_id)
+ }
+
+ pub fn delete_stream(&self, user_id: u32, stream_id: usize) -> Result<(),
IggyError> {
+ self.manage_stream(user_id, stream_id)
+ }
+
+ pub fn purge_stream(&self, user_id: u32, stream_id: usize) -> Result<(),
IggyError> {
+ self.manage_stream(user_id, stream_id)
+ }
+
+ fn manage_stream(&self, user_id: u32, stream_id: usize) -> Result<(),
IggyError> {
+ if let Some(global_permissions) = self.users_permissions.get(&user_id)
+ && global_permissions.manage_streams
+ {
+ return Ok(());
+ }
+
+ if let Some(stream_permissions) =
self.users_streams_permissions.get(&(user_id, stream_id))
+ && stream_permissions.manage_stream
+ {
+ return Ok(());
+ }
+
+ Err(IggyError::Unauthorized)
+ }
+}
diff --git a/core/metadata/src/permissioner/permissioner_rules/system.rs
b/core/metadata/src/permissioner/permissioner_rules/system.rs
new file mode 100644
index 000000000..ce03c2dae
--- /dev/null
+++ b/core/metadata/src/permissioner/permissioner_rules/system.rs
@@ -0,0 +1,48 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#![expect(
+ unused,
+ reason = "Methods are part of the state API and will be used once the
implementation is complete"
+)]
+use crate::permissioner::Permissioner;
+use iggy_common::IggyError;
+
+impl Permissioner {
+ pub fn get_stats(&self, user_id: u32) -> Result<(), IggyError> {
+ self.get_server_info(user_id)
+ }
+
+ pub fn get_clients(&self, user_id: u32) -> Result<(), IggyError> {
+ self.get_server_info(user_id)
+ }
+
+ pub fn get_client(&self, user_id: u32) -> Result<(), IggyError> {
+ self.get_server_info(user_id)
+ }
+
+ fn get_server_info(&self, user_id: u32) -> Result<(), IggyError> {
+ if let Some(global_permissions) = self.users_permissions.get(&user_id)
+ && (global_permissions.manage_servers ||
global_permissions.read_servers)
+ {
+ return Ok(());
+ }
+
+ Err(IggyError::Unauthorized)
+ }
+}
diff --git a/core/metadata/src/permissioner/permissioner_rules/topics.rs
b/core/metadata/src/permissioner/permissioner_rules/topics.rs
new file mode 100644
index 000000000..ca226a8d0
--- /dev/null
+++ b/core/metadata/src/permissioner/permissioner_rules/topics.rs
@@ -0,0 +1,157 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#![expect(
+ unused,
+ reason = "Methods are part of the state API and will be used once the
implementation is complete"
+)]
+use crate::permissioner::Permissioner;
+use iggy_common::IggyError;
+
+impl Permissioner {
+    /// Checks whether `user_id` may read topic `topic_id` of stream `stream_id`.
+    ///
+    /// Access is granted when any of the following holds:
+    /// - the user's global permissions have `read_streams`, `manage_streams`,
+    ///   `manage_topics` or `read_topics` set;
+    /// - the user's permissions for the stream have `manage_topics` or
+    ///   `read_topics` set;
+    /// - the user's permissions for the topic have `manage_topic` or
+    ///   `read_topic` set.
+    ///
+    /// # Errors
+    /// Returns [`IggyError::Unauthorized`] when none of the rules match.
+    pub fn get_topic(
+        &self,
+        user_id: u32,
+        stream_id: usize,
+        topic_id: usize,
+    ) -> Result<(), IggyError> {
+        if let Some(global_permissions) = self.users_permissions.get(&user_id)
+            && (global_permissions.read_streams
+                || global_permissions.manage_streams
+                || global_permissions.manage_topics
+                || global_permissions.read_topics)
+        {
+            return Ok(());
+        }
+
+        if let Some(stream_permissions) = self.users_streams_permissions.get(&(user_id, stream_id))
+        {
+            if stream_permissions.manage_topics || stream_permissions.read_topics {
+                return Ok(());
+            }
+
+            // `topics` is optional: a stream entry without topic-level rules
+            // grants nothing here (previously this path panicked on `unwrap`).
+            if let Some(topic_permissions) = stream_permissions
+                .topics
+                .as_ref()
+                .and_then(|topics| topics.get(&topic_id))
+                && (topic_permissions.manage_topic || topic_permissions.read_topic)
+            {
+                return Ok(());
+            }
+        }
+
+        Err(IggyError::Unauthorized)
+    }
+
+    /// Checks whether `user_id` may list the topics of stream `stream_id`.
+    ///
+    /// Access is granted when any of the following holds:
+    /// - the user's global permissions have `read_streams`, `manage_streams`,
+    ///   `manage_topics` or `read_topics` set;
+    /// - the user's permissions for the stream have `manage_topics` or
+    ///   `read_topics` set;
+    /// - at least one topic-level entry for the stream has `manage_topic` or
+    ///   `read_topic` set.
+    ///
+    /// # Errors
+    /// Returns [`IggyError::Unauthorized`] when none of the rules match. This
+    /// includes users that have no permission entries at all.
+    pub fn get_topics(&self, user_id: u32, stream_id: usize) -> Result<(), IggyError> {
+        if let Some(global_permissions) = self.users_permissions.get(&user_id)
+            && (global_permissions.read_streams
+                || global_permissions.manage_streams
+                || global_permissions.manage_topics
+                || global_permissions.read_topics)
+        {
+            return Ok(());
+        }
+
+        if let Some(stream_permissions) = self.users_streams_permissions.get(&(user_id, stream_id))
+        {
+            if stream_permissions.manage_topics || stream_permissions.read_topics {
+                return Ok(());
+            }
+
+            // Same per-topic predicate as `get_topic` (manage OR read).
+            // NOTE(review): "any topic grants access" is the chosen reading of
+            // the topic-level rule; confirm against the intended semantics.
+            if stream_permissions.topics.as_ref().is_some_and(|topics| {
+                topics
+                    .iter()
+                    .any(|(_, permissions)| permissions.manage_topic || permissions.read_topic)
+            }) {
+                return Ok(());
+            }
+        }
+
+        // Deny by default, like every other rule in this file.
+        Err(IggyError::Unauthorized)
+    }
+
+    /// Checks whether `user_id` may create a topic in stream `stream_id`.
+    ///
+    /// Requires global `manage_streams`/`manage_topics`, or stream-level
+    /// `manage_topics`.
+    ///
+    /// # Errors
+    /// Returns [`IggyError::Unauthorized`] otherwise.
+    pub fn create_topic(&self, user_id: u32, stream_id: usize) -> Result<(), IggyError> {
+        if let Some(global_permissions) = self.users_permissions.get(&user_id)
+            && (global_permissions.manage_streams || global_permissions.manage_topics)
+        {
+            return Ok(());
+        }
+
+        if let Some(stream_permissions) = self.users_streams_permissions.get(&(user_id, stream_id))
+            && stream_permissions.manage_topics
+        {
+            return Ok(());
+        }
+
+        Err(IggyError::Unauthorized)
+    }
+
+    /// Permission check for `update_topic`; see [`Self::manage_topic`].
+    pub fn update_topic(
+        &self,
+        user_id: u32,
+        stream_id: usize,
+        topic_id: usize,
+    ) -> Result<(), IggyError> {
+        self.manage_topic(user_id, stream_id, topic_id)
+    }
+
+    /// Permission check for `delete_topic`; see [`Self::manage_topic`].
+    pub fn delete_topic(
+        &self,
+        user_id: u32,
+        stream_id: usize,
+        topic_id: usize,
+    ) -> Result<(), IggyError> {
+        self.manage_topic(user_id, stream_id, topic_id)
+    }
+
+    /// Permission check for `purge_topic`; see [`Self::manage_topic`].
+    pub fn purge_topic(
+        &self,
+        user_id: u32,
+        stream_id: usize,
+        topic_id: usize,
+    ) -> Result<(), IggyError> {
+        self.manage_topic(user_id, stream_id, topic_id)
+    }
+
+    /// Shared rule for operations that modify an existing topic.
+    ///
+    /// Requires global `manage_streams`/`manage_topics`, stream-level
+    /// `manage_topics`, or topic-level `manage_topic`.
+    ///
+    /// # Errors
+    /// Returns [`IggyError::Unauthorized`] otherwise.
+    fn manage_topic(
+        &self,
+        user_id: u32,
+        stream_id: usize,
+        topic_id: usize,
+    ) -> Result<(), IggyError> {
+        if let Some(global_permissions) = self.users_permissions.get(&user_id)
+            && (global_permissions.manage_streams || global_permissions.manage_topics)
+        {
+            return Ok(());
+        }
+
+        if let Some(stream_permissions) = self.users_streams_permissions.get(&(user_id, stream_id))
+        {
+            if stream_permissions.manage_topics {
+                return Ok(());
+            }
+
+            // A stream entry without topic-level rules must not panic; it
+            // simply grants nothing at topic level.
+            if let Some(topic_permissions) = stream_permissions
+                .topics
+                .as_ref()
+                .and_then(|topics| topics.get(&topic_id))
+                && topic_permissions.manage_topic
+            {
+                return Ok(());
+            }
+        }
+
+        Err(IggyError::Unauthorized)
+    }
+}
diff --git a/core/metadata/src/permissioner/permissioner_rules/users.rs
b/core/metadata/src/permissioner/permissioner_rules/users.rs
new file mode 100644
index 000000000..c084effcb
--- /dev/null
+++ b/core/metadata/src/permissioner/permissioner_rules/users.rs
@@ -0,0 +1,74 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#![expect(
+ unused,
+ reason = "Methods are part of the state API and will be used once the
implementation is complete"
+)]
+use crate::permissioner::Permissioner;
+use iggy_common::IggyError;
+
+impl Permissioner {
+    /// Permission check for `get_user`; requires read access to users.
+    pub fn get_user(&self, user_id: u32) -> Result<(), IggyError> {
+        self.read_users(user_id)
+    }
+
+    /// Permission check for `get_users`; requires read access to users.
+    pub fn get_users(&self, user_id: u32) -> Result<(), IggyError> {
+        self.read_users(user_id)
+    }
+
+    /// Permission check for `create_user`; requires `manage_users`.
+    pub fn create_user(&self, user_id: u32) -> Result<(), IggyError> {
+        self.manager_users(user_id)
+    }
+
+    /// Permission check for `delete_user`; requires `manage_users`.
+    pub fn delete_user(&self, user_id: u32) -> Result<(), IggyError> {
+        self.manager_users(user_id)
+    }
+
+    /// Permission check for `update_user`; requires `manage_users`.
+    pub fn update_user(&self, user_id: u32) -> Result<(), IggyError> {
+        self.manager_users(user_id)
+    }
+
+    /// Permission check for `update_permissions`; requires `manage_users`.
+    pub fn update_permissions(&self, user_id: u32) -> Result<(), IggyError> {
+        self.manager_users(user_id)
+    }
+
+    /// Permission check for `change_password`; requires `manage_users`.
+    pub fn change_password(&self, user_id: u32) -> Result<(), IggyError> {
+        self.manager_users(user_id)
+    }
+
+    /// Write rule: the user must have a global permissions entry with
+    /// `manage_users` set, otherwise [`IggyError::Unauthorized`] is returned.
+    fn manager_users(&self, user_id: u32) -> Result<(), IggyError> {
+        match self.users_permissions.get(&user_id) {
+            Some(global) if global.manage_users => Ok(()),
+            _ => Err(IggyError::Unauthorized),
+        }
+    }
+
+    /// Read rule: the user must have a global permissions entry with
+    /// `manage_users` or `read_users` set, otherwise
+    /// [`IggyError::Unauthorized`] is returned.
+    fn read_users(&self, user_id: u32) -> Result<(), IggyError> {
+        match self.users_permissions.get(&user_id) {
+            Some(global) if global.manage_users || global.read_users => Ok(()),
+            _ => Err(IggyError::Unauthorized),
+        }
+    }
+}
diff --git a/core/metadata/src/stats/mod.rs b/core/metadata/src/stats/mod.rs
new file mode 100644
index 000000000..35b926176
--- /dev/null
+++ b/core/metadata/src/stats/mod.rs
@@ -0,0 +1,353 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#![expect(
+ unused,
+ reason = "Methods are part of the state API and will be used once the
implementation is complete"
+)]
+use std::sync::{
+ Arc,
+ atomic::{AtomicU32, AtomicU64, Ordering},
+};
+
+/// Atomic counters for one stream: size in bytes, message count and segment count.
+///
+/// Updates use `Ordering::AcqRel`; reads and resets use `Ordering::Relaxed`.
+#[derive(Default, Debug)]
+pub struct StreamStats {
+    size_bytes: AtomicU64,
+    messages_count: AtomicU64,
+    segments_count: AtomicU32,
+}
+
+impl StreamStats {
+    /// Adds `size_bytes` to the size counter.
+    pub fn increment_size_bytes(&self, size_bytes: u64) {
+        let _ = self.size_bytes.fetch_add(size_bytes, Ordering::AcqRel);
+    }
+
+    /// Adds `messages_count` to the message counter.
+    pub fn increment_messages_count(&self, messages_count: u64) {
+        let _ = self.messages_count.fetch_add(messages_count, Ordering::AcqRel);
+    }
+
+    /// Adds `segments_count` to the segment counter.
+    pub fn increment_segments_count(&self, segments_count: u32) {
+        let _ = self.segments_count.fetch_add(segments_count, Ordering::AcqRel);
+    }
+
+    /// Subtracts `size_bytes` from the size counter.
+    ///
+    /// Uses `fetch_sub`, which wraps around on underflow.
+    pub fn decrement_size_bytes(&self, size_bytes: u64) {
+        let _ = self.size_bytes.fetch_sub(size_bytes, Ordering::AcqRel);
+    }
+
+    /// Subtracts `messages_count` from the message counter (wraps on underflow).
+    pub fn decrement_messages_count(&self, messages_count: u64) {
+        let _ = self.messages_count.fetch_sub(messages_count, Ordering::AcqRel);
+    }
+
+    /// Subtracts `segments_count` from the segment counter (wraps on underflow).
+    pub fn decrement_segments_count(&self, segments_count: u32) {
+        let _ = self.segments_count.fetch_sub(segments_count, Ordering::AcqRel);
+    }
+
+    /// Current size in bytes, read with a relaxed load.
+    pub fn size_bytes_inconsistent(&self) -> u64 {
+        self.size_bytes.load(Ordering::Relaxed)
+    }
+
+    /// Current message count, read with a relaxed load.
+    pub fn messages_count_inconsistent(&self) -> u64 {
+        self.messages_count.load(Ordering::Relaxed)
+    }
+
+    /// Current segment count, read with a relaxed load.
+    pub fn segments_count_inconsistent(&self) -> u32 {
+        self.segments_count.load(Ordering::Relaxed)
+    }
+
+    /// Resets the size counter to zero.
+    pub fn zero_out_size_bytes(&self) {
+        self.size_bytes.store(0, Ordering::Relaxed);
+    }
+
+    /// Resets the message counter to zero.
+    pub fn zero_out_messages_count(&self) {
+        self.messages_count.store(0, Ordering::Relaxed);
+    }
+
+    /// Resets the segment counter to zero.
+    pub fn zero_out_segments_count(&self) {
+        self.segments_count.store(0, Ordering::Relaxed);
+    }
+
+    /// Resets all three counters, in the order size, messages, segments.
+    pub fn zero_out_all(&self) {
+        self.size_bytes.store(0, Ordering::Relaxed);
+        self.messages_count.store(0, Ordering::Relaxed);
+        self.segments_count.store(0, Ordering::Relaxed);
+    }
+}
+
+/// Atomic counters for one topic, linked to the counters of its parent stream.
+///
+/// The non-`parent` mutators update the topic's own counter first and then
+/// forward the same operation to the parent [`StreamStats`]. The `*_parent_*`
+/// methods touch the parent only.
+#[derive(Default, Debug)]
+pub struct TopicStats {
+    parent: Arc<StreamStats>,
+    size_bytes: AtomicU64,
+    messages_count: AtomicU64,
+    segments_count: AtomicU32,
+}
+
+impl TopicStats {
+    /// Creates zeroed topic counters attached to `parent`.
+    pub fn new(parent: Arc<StreamStats>) -> Self {
+        TopicStats {
+            parent,
+            size_bytes: AtomicU64::default(),
+            messages_count: AtomicU64::default(),
+            segments_count: AtomicU32::default(),
+        }
+    }
+
+    /// Returns a new handle to the parent stream counters.
+    pub fn parent(&self) -> Arc<StreamStats> {
+        Arc::clone(&self.parent)
+    }
+
+    /// Adds `size_bytes` to the parent's size counter only.
+    pub fn increment_parent_size_bytes(&self, size_bytes: u64) {
+        self.parent.increment_size_bytes(size_bytes);
+    }
+
+    /// Adds `messages_count` to the parent's message counter only.
+    pub fn increment_parent_messages_count(&self, messages_count: u64) {
+        self.parent.increment_messages_count(messages_count);
+    }
+
+    /// Adds `segments_count` to the parent's segment counter only.
+    pub fn increment_parent_segments_count(&self, segments_count: u32) {
+        self.parent.increment_segments_count(segments_count);
+    }
+
+    /// Adds `size_bytes` to this topic and then to its parent.
+    pub fn increment_size_bytes(&self, size_bytes: u64) {
+        let _ = self.size_bytes.fetch_add(size_bytes, Ordering::AcqRel);
+        self.parent.increment_size_bytes(size_bytes);
+    }
+
+    /// Adds `messages_count` to this topic and then to its parent.
+    pub fn increment_messages_count(&self, messages_count: u64) {
+        let _ = self.messages_count.fetch_add(messages_count, Ordering::AcqRel);
+        self.parent.increment_messages_count(messages_count);
+    }
+
+    /// Adds `segments_count` to this topic and then to its parent.
+    pub fn increment_segments_count(&self, segments_count: u32) {
+        let _ = self.segments_count.fetch_add(segments_count, Ordering::AcqRel);
+        self.parent.increment_segments_count(segments_count);
+    }
+
+    /// Subtracts `size_bytes` from the parent's size counter only.
+    pub fn decrement_parent_size_bytes(&self, size_bytes: u64) {
+        self.parent.decrement_size_bytes(size_bytes);
+    }
+
+    /// Subtracts `messages_count` from the parent's message counter only.
+    pub fn decrement_parent_messages_count(&self, messages_count: u64) {
+        self.parent.decrement_messages_count(messages_count);
+    }
+
+    /// Subtracts `segments_count` from the parent's segment counter only.
+    pub fn decrement_parent_segments_count(&self, segments_count: u32) {
+        self.parent.decrement_segments_count(segments_count);
+    }
+
+    /// Subtracts `size_bytes` from this topic and then from its parent
+    /// (`fetch_sub` wraps on underflow).
+    pub fn decrement_size_bytes(&self, size_bytes: u64) {
+        let _ = self.size_bytes.fetch_sub(size_bytes, Ordering::AcqRel);
+        self.parent.decrement_size_bytes(size_bytes);
+    }
+
+    /// Subtracts `messages_count` from this topic and then from its parent.
+    pub fn decrement_messages_count(&self, messages_count: u64) {
+        let _ = self.messages_count.fetch_sub(messages_count, Ordering::AcqRel);
+        self.parent.decrement_messages_count(messages_count);
+    }
+
+    /// Subtracts `segments_count` from this topic and then from its parent.
+    pub fn decrement_segments_count(&self, segments_count: u32) {
+        let _ = self.segments_count.fetch_sub(segments_count, Ordering::AcqRel);
+        self.parent.decrement_segments_count(segments_count);
+    }
+
+    /// This topic's size in bytes, read with a relaxed load.
+    pub fn size_bytes_inconsistent(&self) -> u64 {
+        self.size_bytes.load(Ordering::Relaxed)
+    }
+
+    /// This topic's message count, read with a relaxed load.
+    pub fn messages_count_inconsistent(&self) -> u64 {
+        self.messages_count.load(Ordering::Relaxed)
+    }
+
+    /// This topic's segment count, read with a relaxed load.
+    pub fn segments_count_inconsistent(&self) -> u32 {
+        self.segments_count.load(Ordering::Relaxed)
+    }
+
+    /// Resets the parent's size counter to zero.
+    pub fn zero_out_parent_size_bytes(&self) {
+        self.parent.zero_out_size_bytes();
+    }
+
+    /// Resets the parent's message counter to zero.
+    pub fn zero_out_parent_messages_count(&self) {
+        self.parent.zero_out_messages_count();
+    }
+
+    /// Resets the parent's segment counter to zero.
+    pub fn zero_out_parent_segments_count(&self) {
+        self.parent.zero_out_segments_count();
+    }
+
+    /// Resets every counter of the parent to zero.
+    pub fn zero_out_parent_all(&self) {
+        self.parent.zero_out_all();
+    }
+
+    /// Resets this topic's size counter and the parent's size counter.
+    ///
+    /// NOTE(review): the parent counter is set to zero outright, not reduced
+    /// by this topic's share. Confirm that is intended when a stream holds
+    /// several topics.
+    pub fn zero_out_size_bytes(&self) {
+        self.size_bytes.store(0, Ordering::Relaxed);
+        self.parent.zero_out_size_bytes();
+    }
+
+    /// Resets this topic's message counter and the parent's message counter.
+    pub fn zero_out_messages_count(&self) {
+        self.messages_count.store(0, Ordering::Relaxed);
+        self.parent.zero_out_messages_count();
+    }
+
+    /// Resets this topic's segment counter and the parent's segment counter.
+    pub fn zero_out_segments_count(&self) {
+        self.segments_count.store(0, Ordering::Relaxed);
+        self.parent.zero_out_segments_count();
+    }
+
+    /// Resets size, messages and segments (in that order), each together with
+    /// the matching parent counter.
+    pub fn zero_out_all(&self) {
+        self.zero_out_size_bytes();
+        self.zero_out_messages_count();
+        self.zero_out_segments_count();
+    }
+}
+
+/// Atomic counters for one partition, linked to the counters of its parent topic.
+///
+/// The non-`parent` mutators update the partition's own counter first and then
+/// forward the same operation to the parent [`TopicStats`], which forwards it
+/// to its own parent in turn.
+#[derive(Default, Debug)]
+pub struct PartitionStats {
+    parent: Arc<TopicStats>,
+    messages_count: AtomicU64,
+    size_bytes: AtomicU64,
+    segments_count: AtomicU32,
+}
+
+impl PartitionStats {
+    /// Creates zeroed partition counters attached to `parent_stats`.
+    pub fn new(parent_stats: Arc<TopicStats>) -> Self {
+        PartitionStats {
+            parent: parent_stats,
+            messages_count: AtomicU64::default(),
+            size_bytes: AtomicU64::default(),
+            segments_count: AtomicU32::default(),
+        }
+    }
+
+    /// Returns a new handle to the parent topic counters.
+    pub fn parent(&self) -> Arc<TopicStats> {
+        Arc::clone(&self.parent)
+    }
+
+    /// Adds `size_bytes` to this partition and then to its parent.
+    pub fn increment_size_bytes(&self, size_bytes: u64) {
+        let _ = self.size_bytes.fetch_add(size_bytes, Ordering::AcqRel);
+        self.parent.increment_size_bytes(size_bytes);
+    }
+
+    /// Adds `messages_count` to this partition and then to its parent.
+    pub fn increment_messages_count(&self, messages_count: u64) {
+        let _ = self.messages_count.fetch_add(messages_count, Ordering::AcqRel);
+        self.parent.increment_messages_count(messages_count);
+    }
+
+    /// Adds `segments_count` to this partition and then to its parent.
+    pub fn increment_segments_count(&self, segments_count: u32) {
+        let _ = self.segments_count.fetch_add(segments_count, Ordering::AcqRel);
+        self.parent.increment_segments_count(segments_count);
+    }
+
+    /// Adds `size_bytes` to the parent only.
+    pub fn increment_parent_size_bytes(&self, size_bytes: u64) {
+        self.parent.increment_size_bytes(size_bytes);
+    }
+
+    /// Adds `messages_count` to the parent only.
+    pub fn increment_parent_messages_count(&self, messages_count: u64) {
+        self.parent.increment_messages_count(messages_count);
+    }
+
+    /// Adds `segments_count` to the parent only.
+    pub fn increment_parent_segments_count(&self, segments_count: u32) {
+        self.parent.increment_segments_count(segments_count);
+    }
+
+    /// Subtracts `size_bytes` from this partition and then from its parent
+    /// (`fetch_sub` wraps on underflow).
+    pub fn decrement_size_bytes(&self, size_bytes: u64) {
+        let _ = self.size_bytes.fetch_sub(size_bytes, Ordering::AcqRel);
+        self.parent.decrement_size_bytes(size_bytes);
+    }
+
+    /// Subtracts `messages_count` from this partition and then from its parent.
+    pub fn decrement_messages_count(&self, messages_count: u64) {
+        let _ = self.messages_count.fetch_sub(messages_count, Ordering::AcqRel);
+        self.parent.decrement_messages_count(messages_count);
+    }
+
+    /// Subtracts `segments_count` from this partition and then from its parent.
+    pub fn decrement_segments_count(&self, segments_count: u32) {
+        let _ = self.segments_count.fetch_sub(segments_count, Ordering::AcqRel);
+        self.parent.decrement_segments_count(segments_count);
+    }
+
+    /// Subtracts `size_bytes` from the parent only.
+    pub fn decrement_parent_size_bytes(&self, size_bytes: u64) {
+        self.parent.decrement_size_bytes(size_bytes);
+    }
+
+    /// Subtracts `messages_count` from the parent only.
+    pub fn decrement_parent_messages_count(&self, messages_count: u64) {
+        self.parent.decrement_messages_count(messages_count);
+    }
+
+    /// Subtracts `segments_count` from the parent only.
+    pub fn decrement_parent_segments_count(&self, segments_count: u32) {
+        self.parent.decrement_segments_count(segments_count);
+    }
+
+    /// This partition's size in bytes, read with a relaxed load.
+    pub fn size_bytes_inconsistent(&self) -> u64 {
+        self.size_bytes.load(Ordering::Relaxed)
+    }
+
+    /// This partition's message count, read with a relaxed load.
+    pub fn messages_count_inconsistent(&self) -> u64 {
+        self.messages_count.load(Ordering::Relaxed)
+    }
+
+    /// This partition's segment count, read with a relaxed load.
+    pub fn segments_count_inconsistent(&self) -> u32 {
+        self.segments_count.load(Ordering::Relaxed)
+    }
+
+    /// Resets the parent's size counter (and, through it, the grandparent's).
+    pub fn zero_out_parent_size_bytes(&self) {
+        self.parent.zero_out_size_bytes();
+    }
+
+    /// Resets the parent's message counter (and, through it, the grandparent's).
+    pub fn zero_out_parent_messages_count(&self) {
+        self.parent.zero_out_messages_count();
+    }
+
+    /// Resets the parent's segment counter (and, through it, the grandparent's).
+    pub fn zero_out_parent_segments_count(&self) {
+        self.parent.zero_out_segments_count();
+    }
+
+    /// Resets every counter of the parent.
+    pub fn zero_out_parent_all(&self) {
+        self.parent.zero_out_all();
+    }
+
+    /// Resets this partition's size counter, then the parent's.
+    ///
+    /// NOTE(review): the parent counter is zeroed outright, not reduced by
+    /// this partition's share. Confirm that is intended.
+    pub fn zero_out_size_bytes(&self) {
+        self.size_bytes.store(0, Ordering::Relaxed);
+        self.parent.zero_out_size_bytes();
+    }
+
+    /// Resets this partition's message counter, then the parent's.
+    pub fn zero_out_messages_count(&self) {
+        self.messages_count.store(0, Ordering::Relaxed);
+        self.parent.zero_out_messages_count();
+    }
+
+    /// Resets this partition's segment counter, then the parent's.
+    pub fn zero_out_segments_count(&self) {
+        self.segments_count.store(0, Ordering::Relaxed);
+        self.parent.zero_out_segments_count();
+    }
+
+    /// Resets all counters of this partition and of its parent chain.
+    ///
+    /// The per-counter resets already cascade to the parent; the final
+    /// `zero_out_parent_all` repeats the parent reset.
+    pub fn zero_out_all(&self) {
+        self.zero_out_size_bytes();
+        self.zero_out_messages_count();
+        self.zero_out_segments_count();
+        self.zero_out_parent_all();
+    }
+}
diff --git a/core/metadata/src/stm/consumer_group.rs
b/core/metadata/src/stm/consumer_group.rs
index 4aed76aed..944627ad7 100644
--- a/core/metadata/src/stm/consumer_group.rs
+++ b/core/metadata/src/stm/consumer_group.rs
@@ -15,14 +15,216 @@
// specific language governing permissions and limitations
// under the License.
-use crate::stm::State;
+use crate::stm::{ApplyState, StateCommand};
+use ahash::AHashMap;
+use bytes::Bytes;
+use iggy_common::create_consumer_group::CreateConsumerGroup;
+use iggy_common::delete_consumer_group::DeleteConsumerGroup;
+use iggy_common::{
+ BytesSerializable, IggyTimestamp,
+ header::{Operation, PrepareHeader},
+ message::Message,
+};
+use slab::Slab;
+use std::cell::RefCell;
-pub struct ConsumerGroups {}
+// ============================================================================
+// ConsumerGroupMember - Individual member of a consumer group
+// ============================================================================
-impl State for ConsumerGroups {
+/// One member of a consumer group: its id and the time it joined.
+#[derive(Debug, Clone)]
+pub struct ConsumerGroupMember {
+    pub id: u32,
+    pub joined_at: IggyTimestamp,
+}
+
+impl ConsumerGroupMember {
+    /// Builds a member from its id and join timestamp.
+    pub fn new(id: u32, joined_at: IggyTimestamp) -> Self {
+        ConsumerGroupMember { id, joined_at }
+    }
+}
+
+// ============================================================================
+// ConsumerGroup - A group of consumers
+// ============================================================================
+
+/// A named consumer group belonging to a topic of a stream, with its members.
+#[derive(Debug, Clone)]
+pub struct ConsumerGroup {
+    pub id: usize,
+    pub stream_id: usize,
+    pub topic_id: usize,
+    pub name: String,
+    pub created_at: IggyTimestamp,
+    pub members: Vec<ConsumerGroupMember>,
+}
+
+impl ConsumerGroup {
+    /// Creates a group without members.
+    ///
+    /// `id` starts at `0`; `ConsumerGroups::insert` overwrites it with the
+    /// slab key assigned on insertion.
+    pub fn new(stream_id: usize, topic_id: usize, name: String, created_at: IggyTimestamp) -> Self {
+        let members = Vec::new();
+        ConsumerGroup {
+            id: 0,
+            stream_id,
+            topic_id,
+            name,
+            created_at,
+            members,
+        }
+    }
+
+    /// Appends `member` to the group; no duplicate check is performed.
+    pub fn add_member(&mut self, member: ConsumerGroupMember) {
+        self.members.push(member);
+    }
+
+    /// Removes and returns the first member whose id equals `member_id`,
+    /// or `None` when no member matches.
+    pub fn remove_member(&mut self, member_id: u32) -> Option<ConsumerGroupMember> {
+        let position = self
+            .members
+            .iter()
+            .position(|member| member.id == member_id)?;
+        Some(self.members.remove(position))
+    }
+
+    /// Number of members currently in the group.
+    pub fn members_count(&self) -> usize {
+        self.members.len()
+    }
+}
+
+// ============================================================================
+// ConsumerGroups Collection
+// ============================================================================
+
+/// Collection of all consumer groups across streams and topics.
+///
+/// Groups live in a slab (the slab key is the group id); `index` maps
+/// `(stream_id, topic_id, name)` to that id for lookups by location.
+#[derive(Debug, Clone, Default)]
+pub struct ConsumerGroups {
+    // (stream_id, topic_id, name) -> id
+    index: RefCell<AHashMap<(usize, usize, String), usize>>,
+    items: RefCell<Slab<ConsumerGroup>>,
+}
+
+impl ConsumerGroups {
+    /// Creates an empty collection with room for 256 groups pre-allocated.
+    pub fn new() -> Self {
+        const INITIAL_CAPACITY: usize = 256;
+        ConsumerGroups {
+            index: RefCell::new(AHashMap::with_capacity(INITIAL_CAPACITY)),
+            items: RefCell::new(Slab::with_capacity(INITIAL_CAPACITY)),
+        }
+    }
+
+    /// Stores `group`, sets its `id` to the assigned slab key and returns it.
+    ///
+    /// NOTE(review): if a group with the same `(stream_id, topic_id, name)`
+    /// already exists, the index entry is overwritten to point at the new
+    /// group while the earlier one stays in the slab.
+    pub fn insert(&self, group: ConsumerGroup) -> usize {
+        let mut slab = self.items.borrow_mut();
+        let mut by_location = self.index.borrow_mut();
+
+        let location = (group.stream_id, group.topic_id, group.name.clone());
+        let assigned = slab.insert(group);
+        slab[assigned].id = assigned;
+        by_location.insert(location, assigned);
+        assigned
+    }
+
+    /// Returns a clone of the group with the given id, if present.
+    pub fn get(&self, id: usize) -> Option<ConsumerGroup> {
+        let slab = self.items.borrow();
+        slab.get(id).cloned()
+    }
+
+    /// Returns a clone of the group registered under
+    /// `(stream_id, topic_id, name)`, if present.
+    pub fn get_by_location(
+        &self,
+        stream_id: usize,
+        topic_id: usize,
+        name: &str,
+    ) -> Option<ConsumerGroup> {
+        let by_location = self.index.borrow();
+        // The index is keyed by an owned `String`, so the lookup key is built
+        // as an owned tuple.
+        let location = (stream_id, topic_id, name.to_string());
+        by_location
+            .get(&location)
+            .and_then(|&id| self.items.borrow().get(id).cloned())
+    }
+
+    /// Removes the group with the given id together with its index entry.
+    ///
+    /// Returns the removed group, or `None` when the id is not occupied.
+    pub fn remove(&self, id: usize) -> Option<ConsumerGroup> {
+        let mut slab = self.items.borrow_mut();
+        let mut by_location = self.index.borrow_mut();
+
+        // `Slab::remove` panics on a vacant key, so check occupancy first.
+        let removed = slab.contains(id).then(|| slab.remove(id))?;
+        by_location.remove(&(removed.stream_id, removed.topic_id, removed.name.clone()));
+        Some(removed)
+    }
+
+    /// Returns clones of all groups that belong to the given stream and topic.
+    pub fn get_by_topic(&self, stream_id: usize, topic_id: usize) -> Vec<ConsumerGroup> {
+        self.items
+            .borrow()
+            .iter()
+            .filter(|(_, group)| group.stream_id == stream_id && group.topic_id == topic_id)
+            .map(|(_, group)| group.clone())
+            .collect()
+    }
+
+    /// Number of stored groups.
+    pub fn len(&self) -> usize {
+        self.items.borrow().len()
+    }
+
+    /// `true` when no group is stored.
+    pub fn is_empty(&self) -> bool {
+        self.items.borrow().is_empty()
+    }
+
+    /// Returns clones of all stored groups.
+    pub fn values(&self) -> Vec<ConsumerGroup> {
+        let slab = self.items.borrow();
+        let mut groups = Vec::with_capacity(slab.len());
+        for (_, group) in slab.iter() {
+            groups.push(group.clone());
+        }
+        groups
+    }
+}
+
+/// Commands handled by the [`ConsumerGroups`] state, one per supported operation.
+#[derive(Debug)]
+pub enum ConsumerGroupsCommand {
+    Create(CreateConsumerGroup),
+    Delete(DeleteConsumerGroup),
+}
+
+impl StateCommand for ConsumerGroups {
+    type Command = ConsumerGroupsCommand;
+    type Input = Message<PrepareHeader>;
+
+    /// Decodes `input` into a consumer-group command.
+    ///
+    /// Returns `None` when the header's operation is not
+    /// `CreateConsumerGroup` or `DeleteConsumerGroup`.
+    ///
+    /// # Panics
+    /// Panics when the body of a matching operation fails to deserialize.
+    fn into_command(input: &Self::Input) -> Option<Self::Command> {
+        // TODO: rework this thing, so we don't copy the bytes on each request
+        // The copy is deferred to the matching arms so that messages carrying
+        // other operations are rejected without touching the body.
+        let body = || Bytes::copy_from_slice(input.body());
+        match input.header().operation {
+            Operation::CreateConsumerGroup => Some(ConsumerGroupsCommand::Create(
+                CreateConsumerGroup::from_bytes(body()).unwrap(),
+            )),
+            Operation::DeleteConsumerGroup => Some(ConsumerGroupsCommand::Delete(
+                DeleteConsumerGroup::from_bytes(body()).unwrap(),
+            )),
+            _ => None,
+        }
+    }
+}
+
+// Applies decoded consumer-group commands to the state.
+// Both arms are still `todo!()` placeholders, so applying either command panics.
+impl ApplyState for ConsumerGroups {
type Output = ();
- fn apply(&self) -> Option<Self::Output> {
- Some(())
+ fn do_apply(&self, cmd: Self::Command) -> Self::Output {
+ match cmd {
+ ConsumerGroupsCommand::Create(payload) => {
+ todo!("Handle Create consumer group with {:?}", payload)
+ }
+ ConsumerGroupsCommand::Delete(payload) => {
+ todo!("Handle Delete consumer group with {:?}", payload)
+ }
+ }
}
}
diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index 80646a559..e6c13684b 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -16,22 +16,113 @@
// under the License.
pub mod consumer_group;
+pub mod mux;
pub mod stream;
pub mod user;
-pub mod mux;
-// TODO: Add more state machines.
+/// Macro to generate a `{State}Command` enum and implement the `StateCommand` trait.
+///
+/// # Arguments
+/// * `$state_type` - The type that implements the `ApplyState` trait
+/// * `$command_enum` - The name of the command enum to generate (e.g., `StreamsCommand`)
+/// * `$operations` - List of `Operation` enum variants (also used as payload type names)
+///
+/// # Example
+/// ```ignore
+/// define_state_command! {
+/// Streams,
+/// StreamsCommand,
+/// [CreateStream, UpdateStream, DeleteStream, PurgeStream]
+/// }
+/// ```
+#[macro_export]
+macro_rules! define_state_command {
+ (
+ $state_type:ty,
+ $command_enum:ident,
+ [$($operation:ident),* $(,)?]
+ ) => {
+ // One variant per listed operation; each variant wraps the payload type of the same name.
+ #[derive(Debug)]
+ pub enum $command_enum {
+ $(
+ $operation($operation),
+ )*
+ }
+
+ impl $crate::stm::StateCommand for $state_type {
+ type Command = $command_enum;
+ type Input =
::iggy_common::message::Message<::iggy_common::header::PrepareHeader>;
+
+ fn into_command(input: &Self::Input) -> Option<Self::Command> {
+ use ::iggy_common::BytesSerializable;
+ use ::bytes::Bytes;
+ use ::iggy_common::header::Operation;
+
+ // TODO: rework this thing, so we don't copy the bytes on each request
+ let body = Bytes::copy_from_slice(input.body());
+ match input.header().operation {
+ $(
+ Operation::$operation => {
+ // NOTE(review): `unwrap` panics if the payload fails to deserialize.
+ Some($command_enum::$operation(
+ $operation::from_bytes(body.clone()).unwrap()
+ ))
+ },
+ )*
+ // Operations not listed for this state produce no command.
+ _ => None,
+ }
+ }
+ }
-pub trait State {
+ // Compile-time check that the type implements ApplyState
+ const _: () = {
+ const fn assert_impl_apply_state<T: $crate::stm::ApplyState>() {}
+ assert_impl_apply_state::<$state_type>();
+ };
+ };
+}
+
+// This is the public interface to the state, so it will eventually be imported from a different crate; for now, during development, it lives here.
+pub trait State
+where
+ Self: Sized,
+{
type Output;
+ // Type of the event that is handed to `apply`.
+ type Input;
+
// Apply the state machine logic and return an optional output.
// The output is optional, as we model the `StateMachine`, as an variadic
list,
// where not all state machines will produce an output for every input
event.
- fn apply(&self) -> Option<Self::Output>;
+ fn apply(&self, input: &Self::Input) -> Option<Self::Output>;
}
+// TODO: This interface should be private to the stm module.
pub trait StateMachine {
type Input;
type Output;
+ // Feeds `input` to the state machine. The tuple implementation in mux.rs
+ // pushes each state's result onto `output`.
fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>);
}
+
+/// Decoding half of a state: turns a raw input into a typed command.
+pub trait StateCommand {
+    /// Typed command understood by the state.
+    type Command;
+    /// Raw input the command is decoded from.
+    type Input;
+
+    /// Decodes `input`; `None` means the input does not map to a command of
+    /// this state.
+    fn into_command(input: &Self::Input) -> Option<Self::Command>;
+}
+
+/// Applying half of a state: executes an already decoded command.
+pub trait ApplyState: StateCommand {
+    /// Result of applying a command.
+    type Output;
+
+    /// Applies `cmd` to the state and returns the result.
+    fn do_apply(&self, cmd: Self::Command) -> Self::Output;
+}
+
+/// Every [`ApplyState`] is a [`State`]: decode the input, then apply the command.
+impl<T> State for T
+where
+    T: ApplyState,
+{
+    type Output = T::Output;
+    type Input = T::Input;
+
+    /// Returns `None` when the input does not decode into a command of this
+    /// state; otherwise applies the command and returns its output.
+    fn apply(&self, input: &Self::Input) -> Option<Self::Output> {
+        let cmd = T::into_command(input)?;
+        Some(self.do_apply(cmd))
+    }
+}
diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs
index 22601bdc0..3fc80d88b 100644
--- a/core/metadata/src/stm/mux.rs
+++ b/core/metadata/src/stm/mux.rs
@@ -15,9 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-use crate::stm::{State, StateMachine};
use iggy_common::{header::PrepareHeader, message::Message};
+use crate::stm::{State, StateMachine};
+
// MuxStateMachine that proxies to an tuple of variadic state machines
pub struct MuxStateMachine<T>
where
@@ -59,6 +60,8 @@ macro_rules! variadic {
($a:expr, $( $b:tt )+) => ( ($a, variadic!( $( $b )* )) );
}
+// TODO: Figure out how to get around the fact that we need to hardcode the Input/Output types for the base case.
+// TODO: I think we could move the base case to the impl site of `State`, so that we know the `Input` and `Output` types.
// Base case of the recursive resolution.
impl StateMachine for () {
type Input = Message<PrepareHeader>;
@@ -72,13 +75,13 @@ impl StateMachine for () {
impl<O, S, Rest> StateMachine for variadic!(S, ...Rest)
where
S: State<Output = O>,
- Rest: StateMachine<Input = Message<PrepareHeader>, Output = O>,
+ Rest: StateMachine<Input = S::Input, Output = O>,
{
- type Input = Message<PrepareHeader>;
+ type Input = Rest::Input;
type Output = O;
fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>) {
- if let Some(result) = self.0.apply() {
+ if let Some(result) = self.0.apply(input) {
output.push(result);
}
self.1.update(input, output)
@@ -90,10 +93,17 @@ mod tests {
#[test]
fn construct_mux_state_machine_from_states_with_same_output() {
use crate::stm::*;
+ use iggy_common::header::PrepareHeader;
+ use iggy_common::message::Message;
+
+ // Smoke test: builds the mux from the three states and pushes one
+ // header-sized message through it. The contents of `output` are not asserted.
+ let users = user::Users::new();
+ let streams = stream::Streams::new();
+ let cgs = consumer_group::ConsumerGroups::new();
+ let mux = mux::MuxStateMachine::new(variadic!(users, streams, cgs));
+
+ let input = Message::new(std::mem::size_of::<PrepareHeader>());
+ let mut output = Vec::new();
- let users = user::Users {};
- let streams = stream::Streams {};
- let cgs = consumer_group::ConsumerGroups {};
- let _mux = mux::MuxStateMachine::new(variadic!(users, streams, cgs));
+ mux.update(&input, &mut output);
}
}
diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs
index 737e21e94..6f05e5aa4 100644
--- a/core/metadata/src/stm/stream.rs
+++ b/core/metadata/src/stm/stream.rs
@@ -15,14 +15,487 @@
// specific language governing permissions and limitations
// under the License.
-use crate::stm::State;
+use crate::define_state_command;
+use crate::stats::{StreamStats, TopicStats};
+use crate::stm::ApplyState;
+use ahash::AHashMap;
+use iggy_common::create_partitions::CreatePartitions;
+use iggy_common::create_stream::CreateStream;
+use iggy_common::create_topic::CreateTopic;
+use iggy_common::delete_partitions::DeletePartitions;
+use iggy_common::delete_segments::DeleteSegments;
+use iggy_common::delete_stream::DeleteStream;
+use iggy_common::delete_topic::DeleteTopic;
+use iggy_common::purge_stream::PurgeStream;
+use iggy_common::purge_topic::PurgeTopic;
+use iggy_common::update_stream::UpdateStream;
+use iggy_common::update_topic::UpdateTopic;
+use iggy_common::{
+ CompressionAlgorithm, Identifier, IggyError, IggyExpiry, IggyTimestamp,
MaxTopicSize,
+};
+use slab::Slab;
+use std::cell::RefCell;
+use std::sync::Arc;
-pub struct Streams {}
+/// A single partition entry, identified by a numeric `id`.
+#[derive(Debug, Clone)]
+pub struct Partition {
+    /// Partition identifier; `Partitions::insert` overwrites it with the slab key.
+    pub id: usize,
+}
+
+impl Partition {
+    /// Creates a partition carrying the given `id`.
+    pub fn new(id: usize) -> Self {
+        Partition { id }
+    }
+}
+
+/// Slab-backed collection of [`Partition`]s with interior mutability.
+///
+/// All methods take `&self`; reads hand out clones of the stored entries.
+#[derive(Debug, Clone, Default)]
+pub struct Partitions {
+    items: RefCell<Slab<Partition>>,
+}
+
+impl Partitions {
+    /// Creates an empty collection with capacity for 1024 partitions pre-allocated.
+    pub fn new() -> Self {
+        let items = RefCell::new(Slab::with_capacity(1024));
+        Self { items }
+    }
+
+    /// Stores `partition`, overwrites its `id` with the slab key it received and returns
+    /// that key.
+    pub fn insert(&self, partition: Partition) -> usize {
+        let mut slab = self.items.borrow_mut();
+        let key = slab.insert(partition);
+        slab[key].id = key;
+        key
+    }
+
+    /// Returns a clone of the partition stored under `id`, if any.
+    pub fn get(&self, id: usize) -> Option<Partition> {
+        let slab = self.items.borrow();
+        slab.get(id).cloned()
+    }
+
+    /// Removes and returns the partition stored under `id`; `None` when the key is vacant.
+    pub fn remove(&self, id: usize) -> Option<Partition> {
+        self.items.borrow_mut().try_remove(id)
+    }
+
+    /// Number of stored partitions.
+    pub fn len(&self) -> usize {
+        let slab = self.items.borrow();
+        slab.len()
+    }
+
+    /// `true` when no partition is stored.
+    pub fn is_empty(&self) -> bool {
+        let slab = self.items.borrow();
+        slab.is_empty()
+    }
+
+    /// Returns clones of all stored partitions in slab iteration order.
+    pub fn iter(&self) -> Vec<Partition> {
+        let slab = self.items.borrow();
+        slab.iter().map(|(_, partition)| partition.clone()).collect()
+    }
+}
+
+/// Consumer group metadata entry.
+#[derive(Debug, Clone)]
+pub struct ConsumerGroup {
+    /// Group identifier; starts at `0` and is overwritten by `ConsumerGroups::insert`.
+    pub id: usize,
+    /// Group name; `ConsumerGroups` indexes entries by it.
+    pub name: String,
+    /// Creation timestamp supplied by the caller.
+    pub created_at: IggyTimestamp,
+}
+
+impl ConsumerGroup {
+    /// Builds a group with a placeholder `id` of `0`.
+    pub fn new(name: String, created_at: IggyTimestamp) -> Self {
+        let id = 0;
+        Self {
+            id,
+            name,
+            created_at,
+        }
+    }
+}
+
+/// Name-indexed, slab-backed collection of [`ConsumerGroup`]s with interior mutability.
+///
+/// `items` owns the groups keyed by slab index; `index` maps each group name to that key.
+#[derive(Debug, Clone, Default)]
+pub struct ConsumerGroups {
+    index: RefCell<AHashMap<String, usize>>,
+    items: RefCell<Slab<ConsumerGroup>>,
+}
+
+impl ConsumerGroups {
+    /// Creates an empty collection with capacity for 256 groups pre-allocated.
+    pub fn new() -> Self {
+        let index = RefCell::new(AHashMap::with_capacity(256));
+        let items = RefCell::new(Slab::with_capacity(256));
+        Self { index, items }
+    }
+
+    /// Stores `group`, sets its `id` to the slab key it received, records the name in the
+    /// index and returns the key.
+    pub fn insert(&self, group: ConsumerGroup) -> usize {
+        let mut slab = self.items.borrow_mut();
+        let mut names = self.index.borrow_mut();
+
+        let group_name = group.name.clone();
+        let key = slab.insert(group);
+        slab[key].id = key;
+        names.insert(group_name, key);
+        key
+    }
+
+    /// Returns a clone of the group stored under `id`, if any.
+    pub fn get(&self, id: usize) -> Option<ConsumerGroup> {
+        let slab = self.items.borrow();
+        slab.get(id).cloned()
+    }
+
+    /// Returns a clone of the group registered under `name`, if any.
+    pub fn get_by_name(&self, name: &str) -> Option<ConsumerGroup> {
+        let names = self.index.borrow();
+        let key = names.get(name).copied()?;
+        self.items.borrow().get(key).cloned()
+    }
+
+    /// Removes the group stored under `id` together with its name index entry.
+    ///
+    /// Returns `None` when the key is vacant.
+    pub fn remove(&self, id: usize) -> Option<ConsumerGroup> {
+        let mut slab = self.items.borrow_mut();
+        let mut names = self.index.borrow_mut();
+
+        let removed = slab.try_remove(id)?;
+        names.remove(&removed.name);
+        Some(removed)
+    }
+
+    /// Number of stored groups.
+    pub fn len(&self) -> usize {
+        let slab = self.items.borrow();
+        slab.len()
+    }
+
+    /// `true` when no group is stored.
+    pub fn is_empty(&self) -> bool {
+        let slab = self.items.borrow();
+        slab.is_empty()
+    }
+}
+
+/// Topic metadata together with its statistics handle, partitions and consumer groups.
+#[derive(Debug, Clone)]
+pub struct Topic {
+    /// Topic identifier; starts at `0` and is overwritten by `Topics::insert`.
+    pub id: usize,
+    /// Topic name; `Topics` indexes entries by it.
+    pub name: String,
+    /// Creation timestamp supplied by the caller.
+    pub created_at: IggyTimestamp,
+    /// Replication factor supplied by the caller.
+    pub replication_factor: u8,
+    /// Message expiry setting supplied by the caller.
+    pub message_expiry: IggyExpiry,
+    /// Compression algorithm supplied by the caller.
+    pub compression_algorithm: CompressionAlgorithm,
+    /// Maximum topic size setting supplied by the caller.
+    pub max_topic_size: MaxTopicSize,
+
+    /// Shared statistics for this topic, built from the parent stream's statistics handle.
+    pub stats: Arc<TopicStats>,
+    /// Partitions of this topic; empty on construction.
+    pub partitions: Partitions,
+    /// Consumer groups of this topic; empty on construction.
+    pub consumer_groups: ConsumerGroups,
+}
+
+impl Topic {
+    /// Builds a topic with a placeholder `id` of `0`, fresh statistics derived from
+    /// `stream_stats`, and empty partition / consumer group collections.
+    pub fn new(
+        name: String,
+        created_at: IggyTimestamp,
+        replication_factor: u8,
+        message_expiry: IggyExpiry,
+        compression_algorithm: CompressionAlgorithm,
+        max_topic_size: MaxTopicSize,
+        stream_stats: Arc<StreamStats>,
+    ) -> Self {
+        let stats = Arc::new(TopicStats::new(stream_stats));
+        let partitions = Partitions::new();
+        let consumer_groups = ConsumerGroups::new();
+        Self {
+            id: 0,
+            name,
+            created_at,
+            replication_factor,
+            message_expiry,
+            compression_algorithm,
+            max_topic_size,
+            stats,
+            partitions,
+            consumer_groups,
+        }
+    }
+}
+
+/// Name-indexed, slab-backed collection of [`Topic`]s with interior mutability.
+///
+/// `items` owns the topics keyed by slab index; `index` maps each topic name to that key.
+#[derive(Debug, Clone, Default)]
+pub struct Topics {
+    index: RefCell<AHashMap<String, usize>>,
+    items: RefCell<Slab<Topic>>,
+}
+
+impl Topics {
+    /// Creates an empty collection with capacity for 1024 topics pre-allocated.
+    pub fn new() -> Self {
+        let index = RefCell::new(AHashMap::with_capacity(1024));
+        let items = RefCell::new(Slab::with_capacity(1024));
+        Self { index, items }
+    }
+
+    /// Stores `topic`, sets its `id` to the slab key it received, records the name in the
+    /// index and returns the key.
+    pub fn insert(&self, topic: Topic) -> usize {
+        let mut slab = self.items.borrow_mut();
+        let mut names = self.index.borrow_mut();
+
+        let topic_name = topic.name.clone();
+        let key = slab.insert(topic);
+        slab[key].id = key;
+        names.insert(topic_name, key);
+        key
+    }
+
+    /// Returns a clone of the topic stored under `id`, if any.
+    pub fn get(&self, id: usize) -> Option<Topic> {
+        let slab = self.items.borrow();
+        slab.get(id).cloned()
+    }
+
+    /// Returns a clone of the topic registered under `name`, if any.
+    pub fn get_by_name(&self, name: &str) -> Option<Topic> {
+        let names = self.index.borrow();
+        let key = names.get(name).copied()?;
+        self.items.borrow().get(key).cloned()
+    }
+
+    /// Resolves a numeric or string [`Identifier`] to a clone of the matching topic.
+    ///
+    /// An identifier whose value cannot be read as its declared kind yields `None`.
+    pub fn get_by_identifier(&self, identifier: &Identifier) -> Option<Topic> {
+        match identifier.kind {
+            iggy_common::IdKind::Numeric => {
+                let id = identifier.get_u32_value().ok()?;
+                self.get(id as usize)
+            }
+            iggy_common::IdKind::String => {
+                let name = identifier.get_string_value().ok()?;
+                self.get_by_name(&name)
+            }
+        }
+    }
+
+    /// Removes the topic stored under `id` together with its name index entry.
+    ///
+    /// Returns `None` when the key is vacant.
+    pub fn remove(&self, id: usize) -> Option<Topic> {
+        let mut slab = self.items.borrow_mut();
+        let mut names = self.index.borrow_mut();
+
+        let removed = slab.try_remove(id)?;
+        names.remove(&removed.name);
+        Some(removed)
+    }
+
+    /// Number of stored topics.
+    pub fn len(&self) -> usize {
+        let slab = self.items.borrow();
+        slab.len()
+    }
+
+    /// `true` when no topic is stored.
+    pub fn is_empty(&self) -> bool {
+        let slab = self.items.borrow();
+        slab.is_empty()
+    }
+}
+
+/// Stream metadata together with its statistics handle and topics.
+#[derive(Debug, Clone)]
+pub struct Stream {
+    /// Stream identifier; starts at `0` and is overwritten by `Streams::insert`.
+    pub id: usize,
+    /// Stream name; `Streams` indexes entries by it.
+    pub name: String,
+    /// Creation timestamp supplied by the caller.
+    pub created_at: IggyTimestamp,
+
+    /// Shared statistics for this stream; default-initialised on construction.
+    pub stats: Arc<StreamStats>,
+    /// Topics of this stream; empty on construction.
+    pub topics: Topics,
+}
+
+impl Stream {
+    /// Builds a stream with a placeholder `id` of `0`, default statistics and no topics.
+    pub fn new(name: String, created_at: IggyTimestamp) -> Self {
+        let stats = Arc::new(StreamStats::default());
+        let topics = Topics::new();
+        Self {
+            id: 0,
+            name,
+            created_at,
+            stats,
+            topics,
+        }
+    }
+}
+
+/// Name-indexed, slab-backed collection of [`Stream`]s with interior mutability.
+///
+/// `items` owns the streams keyed by slab index; `index` maps each stream name to that key.
+/// All methods take `&self`; lookups hand out clones of the stored entries.
+#[derive(Debug, Clone, Default)]
+pub struct Streams {
+    index: RefCell<AHashMap<String, usize>>,
+    items: RefCell<Slab<Stream>>,
+}
+
+impl Streams {
+    /// Creates an empty collection with capacity for 256 streams pre-allocated.
+    pub fn new() -> Self {
+        Self {
+            index: RefCell::new(AHashMap::with_capacity(256)),
+            items: RefCell::new(Slab::with_capacity(256)),
+        }
+    }
+
+    /// Stores `stream`, sets its `id` to the slab key it received, records the name in the
+    /// index and returns the key.
+    ///
+    /// NOTE(review): name uniqueness is not checked here; callers are presumably expected to
+    /// validate it before inserting.
+    pub fn insert(&self, stream: Stream) -> usize {
+        let mut items = self.items.borrow_mut();
+        let mut index = self.index.borrow_mut();
+
+        let name = stream.name.clone();
+        let id = items.insert(stream);
+        items[id].id = id;
+        index.insert(name, id);
+        id
+    }
+
+    /// Returns a clone of the stream stored under `id`, if any.
+    pub fn get(&self, id: usize) -> Option<Stream> {
+        self.items.borrow().get(id).cloned()
+    }
+
+    /// Returns a clone of the stream registered under `name`, if any.
+    pub fn get_by_name(&self, name: &str) -> Option<Stream> {
+        let id = self.index.borrow().get(name).copied()?;
+        self.items.borrow().get(id).cloned()
+    }
+
+    /// Resolves a numeric or string [`Identifier`] to a clone of the matching stream.
+    ///
+    /// An identifier whose value cannot be read as its declared kind yields `None`.
+    pub fn get_by_identifier(&self, identifier: &Identifier) -> Option<Stream> {
+        match identifier.kind {
+            iggy_common::IdKind::Numeric => {
+                let id = identifier.get_u32_value().ok()?;
+                self.get(id as usize)
+            }
+            iggy_common::IdKind::String => {
+                let name = identifier.get_string_value().ok()?;
+                self.get_by_name(&name)
+            }
+        }
+    }
+
+    /// Removes the stream stored under `id` together with its name index entry.
+    ///
+    /// Returns `None` when the key is vacant.
+    pub fn remove(&self, id: usize) -> Option<Stream> {
+        let mut items = self.items.borrow_mut();
+        let mut index = self.index.borrow_mut();
+
+        let stream = items.try_remove(id)?;
+        index.remove(&stream.name);
+        Some(stream)
+    }
+
+    /// Renames the stream selected by `identifier` and re-keys its name index entry.
+    ///
+    /// # Errors
+    ///
+    /// * `IggyError::ResourceNotFound` when `identifier` does not resolve to a stored stream.
+    /// * `IggyError::StreamNameAlreadyExists` when `new_name` is already registered for a
+    ///   different stream.
+    pub fn update_name(&self, identifier: &Identifier, new_name: String) -> Result<(), IggyError> {
+        let mut items = self.items.borrow_mut();
+        let mut index = self.index.borrow_mut();
+        let not_found = || IggyError::ResourceNotFound("Stream".to_string());
+
+        // Resolve the slab key directly instead of cloning the whole stream (topics included)
+        // just to learn its id and current name.
+        let key = match identifier.kind {
+            iggy_common::IdKind::Numeric => identifier.get_u32_value().ok().map(|id| id as usize),
+            iggy_common::IdKind::String => identifier
+                .get_string_value()
+                .ok()
+                .and_then(|name| index.get(&name).copied()),
+        };
+        let key = key.ok_or_else(not_found)?;
+        let stream = items.get_mut(key).ok_or_else(not_found)?;
+
+        // Renaming onto a name owned by a different stream would overwrite that stream's
+        // index entry and leave it unreachable by name.
+        if let Some(&owner) = index.get(&new_name) {
+            if owner != key {
+                return Err(IggyError::StreamNameAlreadyExists(new_name));
+            }
+        }
+
+        let old_name = std::mem::replace(&mut stream.name, new_name.clone());
+        index.remove(&old_name);
+        index.insert(new_name, key);
+        Ok(())
+    }
+
+    /// Placeholder purge: only verifies that a stream is stored under `id`.
+    ///
+    /// # Errors
+    ///
+    /// Returns `IggyError::ResourceNotFound` when no stream is stored under `id`.
+    pub fn purge(&self, id: usize) -> Result<(), IggyError> {
+        if self.items.borrow().contains(id) {
+            // TODO: Purge all topics in the stream
+            Ok(())
+        } else {
+            Err(IggyError::ResourceNotFound("Stream".to_string()))
+        }
+    }
+
+    /// Number of stored streams.
+    pub fn len(&self) -> usize {
+        self.items.borrow().len()
+    }
+
+    /// `true` when no stream is stored.
+    pub fn is_empty(&self) -> bool {
+        self.items.borrow().is_empty()
+    }
+
+    /// Returns clones of all stored streams in slab iteration order.
+    pub fn iter(&self) -> Vec<Stream> {
+        self.items
+            .borrow()
+            .iter()
+            .map(|(_, stream)| stream.clone())
+            .collect()
+    }
+}
+
+// Define the `StreamsCommand` enum and the `StateCommand` implementation for `Streams`
+// using the macro; `do_apply` below matches one `StreamsCommand` variant per listed type.
+define_state_command! {
+ Streams,
+ StreamsCommand,
+ [CreateStream, UpdateStream, DeleteStream, PurgeStream]
+}
+
+/// Applies decoded [`StreamsCommand`]s to the stream collection.
+///
+/// Every arm is still a `todo!()` placeholder, so applying any command currently panics.
+impl ApplyState for Streams {
+    type Output = ();
+
+    fn do_apply(&self, cmd: Self::Command) -> Self::Output {
+        match cmd {
+            StreamsCommand::CreateStream(payload) => {
+                todo!("Handle Create stream with {:?}", payload)
+            }
+            StreamsCommand::UpdateStream(payload) => {
+                todo!("Handle Update stream with {:?}", payload)
+            }
+            StreamsCommand::DeleteStream(payload) => {
+                todo!("Handle Delete stream with {:?}", payload)
+            }
+            StreamsCommand::PurgeStream(payload) => todo!("Handle Purge stream with {:?}", payload),
+        }
+    }
+}
+
+// Define the `TopicsCommand` enum and the `StateCommand` implementation for `Topics`
+// using the macro; `do_apply` below matches one `TopicsCommand` variant per listed type.
+define_state_command! {
+ Topics,
+ TopicsCommand,
+ [CreateTopic, UpdateTopic, DeleteTopic, PurgeTopic]
+}
+
+/// Applies decoded [`TopicsCommand`]s to the topic collection.
+///
+/// Every arm is still a `todo!()` placeholder, so applying any command currently panics.
+impl ApplyState for Topics {
+    type Output = ();
+
+    fn do_apply(&self, cmd: Self::Command) -> Self::Output {
+        match cmd {
+            TopicsCommand::CreateTopic(payload) => todo!("Handle Create topic with {:?}", payload),
+            TopicsCommand::UpdateTopic(payload) => todo!("Handle Update topic with {:?}", payload),
+            TopicsCommand::DeleteTopic(payload) => todo!("Handle Delete topic with {:?}", payload),
+            TopicsCommand::PurgeTopic(payload) => todo!("Handle Purge topic with {:?}", payload),
+        }
+    }
+}
+
+// Define the `PartitionsCommand` enum and the `StateCommand` implementation for `Partitions`
+// using the macro; `do_apply` below matches one `PartitionsCommand` variant per listed type.
+define_state_command! {
+ Partitions,
+ PartitionsCommand,
+ [CreatePartitions, DeletePartitions, DeleteSegments]
+}
-impl State for Streams {
+/// Applies decoded [`PartitionsCommand`]s to the partition collection.
+///
+/// Every arm is still a `todo!()` placeholder, so applying any command currently panics.
+impl ApplyState for Partitions {
type Output = ();
- fn apply(&self) -> Option<Self::Output> {
- Some(())
+ fn do_apply(&self, cmd: Self::Command) -> Self::Output {
+ match cmd {
+ PartitionsCommand::CreatePartitions(payload) => {
+ todo!("Handle Create partitions with {:?}", payload)
+ }
+ PartitionsCommand::DeletePartitions(payload) => {
+ todo!("Handle Delete partitions with {:?}", payload)
+ }
+ PartitionsCommand::DeleteSegments(payload) => {
+ todo!("Handle Delete segments with {:?}", payload)
+ }
+ }
}
}
diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs
index d5f0fac96..b31bdc764 100644
--- a/core/metadata/src/stm/user.rs
+++ b/core/metadata/src/stm/user.rs
@@ -15,14 +15,294 @@
// specific language governing permissions and limitations
// under the License.
-use crate::stm::State;
+use crate::{
+ permissioner::Permissioner,
+ stm::{ApplyState, StateCommand},
+};
+use ahash::AHashMap;
+use bytes::Bytes;
+use iggy_common::change_password::ChangePassword;
+use iggy_common::create_user::CreateUser;
+use iggy_common::delete_user::DeleteUser;
+use iggy_common::update_permissions::UpdatePermissions;
+use iggy_common::update_user::UpdateUser;
+use iggy_common::{
+ BytesSerializable, Identifier, IggyError, IggyTimestamp, Permissions,
PersonalAccessToken,
+ UserId, UserStatus,
+ header::{Operation, PrepareHeader},
+ message::Message,
+};
+use slab::Slab;
+use std::cell::RefCell;
-pub struct Users {}
+/// User metadata entry, including the personal access tokens attached to the user.
+#[derive(Debug, Clone)]
+pub struct User {
+    /// User identifier; starts at `0` and is overwritten by `Users::insert`.
+    pub id: UserId,
+    /// Username; `Users` indexes entries by it.
+    pub username: String,
+    /// Password value exactly as supplied by the caller.
+    pub password: String,
+    /// Account status supplied by the caller.
+    pub status: UserStatus,
+    /// Creation timestamp supplied by the caller.
+    pub created_at: IggyTimestamp,
+    /// Optional permissions supplied by the caller.
+    pub permissions: Option<Permissions>,
+    /// Personal access tokens keyed by `String`; empty on construction.
+    pub personal_access_tokens: AHashMap<String, PersonalAccessToken>,
+}
+
+impl User {
+    /// Builds a user with a placeholder `id` of `0` and no personal access tokens.
+    pub fn new(
+        username: String,
+        password: String,
+        status: UserStatus,
+        created_at: IggyTimestamp,
+        permissions: Option<Permissions>,
+    ) -> Self {
+        let personal_access_tokens = AHashMap::new();
+        Self {
+            id: 0,
+            username,
+            password,
+            status,
+            created_at,
+            permissions,
+            personal_access_tokens,
+        }
+    }
+}
+
+/// Username-indexed, slab-backed collection of [`User`]s plus their permission bookkeeping.
+///
+/// `items` owns the users keyed by slab index, `index` maps usernames to those keys, and
+/// `permissioner` receives the updates forwarded by the `*_permissions` methods.
+/// All state sits behind `RefCell`s, so methods take `&self`; lookups return clones.
+#[derive(Debug, Clone, Default)]
+pub struct Users {
+    index: RefCell<AHashMap<String, usize>>,
+    items: RefCell<Slab<User>>,
+    permissioner: RefCell<Permissioner>,
+}
+
+impl Users {
+    /// Creates an empty collection with capacity for 1024 users pre-allocated.
+    pub fn new() -> Self {
+        Self {
+            index: RefCell::new(AHashMap::with_capacity(1024)),
+            items: RefCell::new(Slab::with_capacity(1024)),
+            permissioner: RefCell::new(Permissioner::new()),
+        }
+    }
+
+    /// Insert a user and return the assigned ID
+    ///
+    /// The stored user's `id` is overwritten with the slab key, cast to `u32`.
+    /// NOTE(review): username uniqueness is not checked here; callers are presumably
+    /// expected to validate it (see `username_exists`) before inserting.
+    pub fn insert(&self, user: User) -> usize {
+        let mut items = self.items.borrow_mut();
+        let mut index = self.index.borrow_mut();
+
+        let username = user.username.clone();
+        let id = items.insert(user);
+        items[id].id = id as u32;
+        index.insert(username, id);
+        id
+    }
+
+    /// Get user by ID
+    pub fn get(&self, id: usize) -> Option<User> {
+        self.items.borrow().get(id).cloned()
+    }
+
+    /// Get user by username or ID (via Identifier enum)
+    ///
+    /// # Errors
+    ///
+    /// Propagates the error from reading the identifier's value as its declared kind.
+    pub fn get_by_identifier(&self, identifier: &Identifier) -> Result<Option<User>, IggyError> {
+        match identifier.kind {
+            iggy_common::IdKind::Numeric => {
+                let id = identifier.get_u32_value()? as usize;
+                Ok(self.items.borrow().get(id).cloned())
+            }
+            iggy_common::IdKind::String => {
+                let username = identifier.get_string_value()?;
+                let index = self.index.borrow();
+                if let Some(&id) = index.get(&username) {
+                    Ok(self.items.borrow().get(id).cloned())
+                } else {
+                    Ok(None)
+                }
+            }
+        }
+    }
+
+    /// Remove user by ID
+    ///
+    /// Also drops the username index entry; returns `None` when the key is vacant.
+    pub fn remove(&self, id: usize) -> Option<User> {
+        let mut items = self.items.borrow_mut();
+        let mut index = self.index.borrow_mut();
+
+        let user = items.try_remove(id)?;
+        index.remove(&user.username);
+        Some(user)
+    }
+
+    /// Check if user exists
+    ///
+    /// An identifier whose value cannot be read as its declared kind counts as absent.
+    pub fn contains(&self, identifier: &Identifier) -> bool {
+        match identifier.kind {
+            iggy_common::IdKind::Numeric => {
+                if let Ok(id) = identifier.get_u32_value() {
+                    self.items.borrow().contains(id as usize)
+                } else {
+                    false
+                }
+            }
+            iggy_common::IdKind::String => {
+                if let Ok(username) = identifier.get_string_value() {
+                    self.index.borrow().contains_key(&username)
+                } else {
+                    false
+                }
+            }
+        }
+    }
+
+    /// Get all users as a Vec
+    pub fn values(&self) -> Vec<User> {
+        self.items
+            .borrow()
+            .iter()
+            .map(|(_, user)| user.clone())
+            .collect()
+    }
+
+    /// Get number of users
+    pub fn len(&self) -> usize {
+        self.items.borrow().len()
+    }
+
+    /// Check if empty
+    pub fn is_empty(&self) -> bool {
+        self.items.borrow().is_empty()
+    }
+
+    /// Check if username already exists
+    pub fn username_exists(&self, username: &str) -> bool {
+        self.index.borrow().contains_key(username)
+    }
+
+    /// Get ID by username
+    pub fn get_id_by_username(&self, username: &str) -> Option<usize> {
+        self.index.borrow().get(username).copied()
+    }
+
+    /// Initialize permissions for a user
+    pub fn init_permissions(&self, user_id: UserId, permissions: Option<Permissions>) {
+        self.permissioner
+            .borrow_mut()
+            .init_permissions(user_id, permissions);
+    }
+
+    /// Update permissions for a user
+    pub fn update_permissions(&self, user_id: UserId, permissions: Option<Permissions>) {
+        self.permissioner
+            .borrow_mut()
+            .update_permissions_for_user(user_id, permissions);
+    }
+
+    /// Delete permissions for a user
+    pub fn delete_permissions(&self, user_id: UserId) {
+        self.permissioner.borrow_mut().delete_permissions(user_id);
+    }
+
+    /// Update username
+    ///
+    /// Renames the user selected by `identifier` and re-keys the username index entry.
+    /// Renaming a user to the name it already has is a no-op.
+    ///
+    /// # Errors
+    ///
+    /// * Propagates the error from reading the identifier's value as its declared kind.
+    /// * `IggyError::ResourceNotFound` when `identifier` does not resolve to a stored user.
+    /// * `IggyError::UserAlreadyExists` when `new_username` is already registered for a
+    ///   different user.
+    pub fn update_username(
+        &self,
+        identifier: &Identifier,
+        new_username: String,
+    ) -> Result<(), IggyError> {
+        let mut items = self.items.borrow_mut();
+        let mut index = self.index.borrow_mut();
+
+        let id = match identifier.kind {
+            iggy_common::IdKind::Numeric => identifier.get_u32_value()? as usize,
+            iggy_common::IdKind::String => {
+                let username = identifier.get_string_value()?;
+                *index
+                    .get(&username)
+                    .ok_or_else(|| IggyError::ResourceNotFound(username.to_string()))?
+            }
+        };
+
+        let user = items
+            .get_mut(id)
+            .ok_or_else(|| IggyError::ResourceNotFound(identifier.to_string()))?;
+
+        if user.username == new_username {
+            return Ok(());
+        }
+
+        // The names differ, so any existing entry for `new_username` belongs to another
+        // user; overwriting it would leave that user unreachable by name.
+        if index.contains_key(&new_username) {
+            return Err(IggyError::UserAlreadyExists);
+        }
+
+        tracing::trace!(
+            "Updating username: '{}' → '{}' for user ID: {}",
+            user.username,
+            new_username,
+            id
+        );
+
+        let old_username = std::mem::replace(&mut user.username, new_username.clone());
+        index.remove(&old_username);
+        index.insert(new_username, id);
+
+        Ok(())
+    }
+}
+
+/// Commands handled by the `Users` state, each wrapping its decoded request payload.
+#[derive(Debug)]
+pub enum UsersCommand {
+ /// Decoded from an `Operation::CreateUser` message.
+ Create(CreateUser),
+ /// Decoded from an `Operation::UpdateUser` message.
+ Update(UpdateUser),
+ /// Decoded from an `Operation::DeleteUser` message.
+ Delete(DeleteUser),
+ /// Decoded from an `Operation::ChangePassword` message.
+ ChangePassword(ChangePassword),
+ /// Decoded from an `Operation::UpdatePermissions` message.
+ UpdatePermissions(UpdatePermissions),
+}
+
+impl StateCommand for Users {
+    type Command = UsersCommand;
+    type Input = Message<PrepareHeader>;
+
+    /// Decodes a prepare message into the [`UsersCommand`] matching its header operation.
+    ///
+    /// Returns `None` for every operation that is not one of the five user operations.
+    ///
+    /// # Panics
+    ///
+    /// Panics when the body of a user operation fails to deserialize into its request type.
+    fn into_command(input: &Self::Input) -> Option<Self::Command> {
+        // TODO: rework this thing, so we don't copy the bytes on each request
+        // The copy is deferred into the matching arm so that messages carrying an operation
+        // this state does not handle are rejected without copying their body.
+        let body = || Bytes::copy_from_slice(input.body());
+        match input.header().operation {
+            Operation::CreateUser => Some(UsersCommand::Create(
+                CreateUser::from_bytes(body()).unwrap(),
+            )),
+            Operation::UpdateUser => Some(UsersCommand::Update(
+                UpdateUser::from_bytes(body()).unwrap(),
+            )),
+            Operation::DeleteUser => Some(UsersCommand::Delete(
+                DeleteUser::from_bytes(body()).unwrap(),
+            )),
+            Operation::ChangePassword => Some(UsersCommand::ChangePassword(
+                ChangePassword::from_bytes(body()).unwrap(),
+            )),
+            Operation::UpdatePermissions => Some(UsersCommand::UpdatePermissions(
+                UpdatePermissions::from_bytes(body()).unwrap(),
+            )),
+            _ => None,
+        }
+    }
+}
-impl State for Users {
+/// Applies decoded [`UsersCommand`]s to the user collection.
+///
+/// Every arm is still a `todo!()` placeholder, so applying any command currently panics.
+impl ApplyState for Users {
type Output = ();
- fn apply(&self) -> Option<Self::Output> {
- Some(())
+ fn do_apply(&self, cmd: Self::Command) -> Self::Output {
+ match cmd {
+ UsersCommand::Create(payload) => todo!("Handle Create user with {:?}", payload),
+ UsersCommand::Update(payload) => todo!("Handle Update user with {:?}", payload),
+ UsersCommand::Delete(payload) => todo!("Handle Delete user with {:?}", payload),
+ UsersCommand::ChangePassword(payload) => {
+ todo!("Handle Change password with {:?}", payload)
+ }
+ UsersCommand::UpdatePermissions(payload) => {
+ todo!("Handle Update permissions with {:?}", payload)
+ }
+ }
}
}
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index cf5fad08c..ff38a114b 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -46,7 +46,6 @@ async-channel = { workspace = true }
async_zip = { workspace = true }
axum = { workspace = true }
axum-server = { workspace = true }
-blake3 = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true }
@@ -100,7 +99,6 @@ papaya = "0.2.3"
prometheus-client = "0.24.0"
rand = { workspace = true }
reqwest = { workspace = true, features = ["rustls-tls-no-provider"] }
-ring = "0.17.14"
ringbuffer = "0.16.0"
rmp-serde = { workspace = true }
rust-embed = { version = "8.9.0", optional = true }
@@ -124,7 +122,6 @@ tracing-appender = { workspace = true }
tracing-opentelemetry = "0.32.0"
tracing-subscriber = { workspace = true }
tungstenite = { workspace = true }
-twox-hash = { workspace = true }
ulid = "1.2.1"
uuid = { workspace = true }
diff --git a/core/server/src/binary/mapper.rs b/core/server/src/binary/mapper.rs
index 7b17c038e..6e98081d6 100644
--- a/core/server/src/binary/mapper.rs
+++ b/core/server/src/binary/mapper.rs
@@ -22,7 +22,6 @@ use crate::slab::Keyed;
use crate::slab::traits_ext::{EntityComponentSystem, IntoComponents};
use crate::streaming::clients::client_manager::Client;
use crate::streaming::partitions::partition::PartitionRoot;
-use
crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
use crate::streaming::stats::{PartitionStats, StreamStats, TopicStats};
use crate::streaming::streams::stream;
use crate::streaming::topics::consumer_group::{ConsumerGroupMembers,
ConsumerGroupRoot, Member};
@@ -30,7 +29,9 @@ use crate::streaming::topics::topic::{self, TopicRoot};
use crate::streaming::users::user::User;
use arcshift::SharedGetGuard;
use bytes::{BufMut, Bytes, BytesMut};
-use iggy_common::{BytesSerializable, ConsumerOffsetInfo, Stats,
TransportProtocol, UserId};
+use iggy_common::{
+ BytesSerializable, ConsumerOffsetInfo, PersonalAccessToken, Stats,
TransportProtocol, UserId,
+};
use slab::Slab;
pub fn map_stats(stats: &Stats) -> Bytes {
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 361c224e5..216e7c93a 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -50,7 +50,6 @@ use crate::{
storage::{load_consumer_group_offsets, load_consumer_offsets},
},
persistence::persister::{FilePersister, FileWithSyncPersister,
PersisterKind},
- personal_access_tokens::personal_access_token::PersonalAccessToken,
polling_consumer::ConsumerGroupId,
segments::{Segment, storage::Storage},
stats::{PartitionStats, StreamStats, TopicStats},
@@ -66,7 +65,7 @@ use ahash::HashMap;
use compio::{fs::create_dir_all, runtime::Runtime};
use err_trail::ErrContext;
use iggy_common::{
- IggyByteSize, IggyError,
+ IggyByteSize, IggyError, PersonalAccessToken,
defaults::{
DEFAULT_ROOT_USERNAME, MAX_PASSWORD_LENGTH, MAX_USERNAME_LENGTH,
MIN_PASSWORD_LENGTH,
MIN_USERNAME_LENGTH,
diff --git a/core/server/src/http/http_shard_wrapper.rs
b/core/server/src/http/http_shard_wrapper.rs
index f279b51be..7bcfdb65b 100644
--- a/core/server/src/http/http_shard_wrapper.rs
+++ b/core/server/src/http/http_shard_wrapper.rs
@@ -26,11 +26,11 @@ use send_wrapper::SendWrapper;
use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata;
use crate::shard::system::messages::PollingArgs;
use crate::state::command::EntryCommand;
-use
crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
use crate::streaming::segments::{IggyMessagesBatchMut, IggyMessagesBatchSet};
use crate::streaming::topics;
use crate::streaming::users::user::User;
use crate::{shard::IggyShard, streaming::session::Session};
+use iggy_common::PersonalAccessToken;
/// A wrapper around IggyShard that is safe to use in HTTP handlers.
///
diff --git a/core/server/src/http/mapper.rs b/core/server/src/http/mapper.rs
index d2910ae1e..afe243529 100644
--- a/core/server/src/http/mapper.rs
+++ b/core/server/src/http/mapper.rs
@@ -20,11 +20,11 @@ use crate::http::jwt::json_web_token::GeneratedToken;
use crate::slab::Keyed;
use crate::slab::traits_ext::{EntityComponentSystem, IntoComponents};
use crate::streaming::clients::client_manager::Client;
-use
crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
use crate::streaming::stats::TopicStats;
use crate::streaming::topics::consumer_group::{ConsumerGroupMembers,
ConsumerGroupRoot};
use crate::streaming::topics::topic::TopicRoot;
use crate::streaming::users::user::User;
+use iggy_common::PersonalAccessToken;
use iggy_common::{ConsumerGroupDetails, ConsumerGroupInfo,
ConsumerGroupMember, IggyByteSize};
use iggy_common::{IdentityInfo, PersonalAccessTokenInfo, TokenInfo,
TopicDetails};
use iggy_common::{UserInfo, UserInfoDetails};
diff --git a/core/server/src/http/personal_access_tokens.rs
b/core/server/src/http/personal_access_tokens.rs
index 66cb50882..75382fb76 100644
--- a/core/server/src/http/personal_access_tokens.rs
+++ b/core/server/src/http/personal_access_tokens.rs
@@ -24,7 +24,6 @@ use
crate::http::mapper::map_generated_access_token_to_identity_info;
use crate::http::shared::AppState;
use crate::state::command::EntryCommand;
use crate::state::models::CreatePersonalAccessTokenWithHash;
-use
crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
use crate::streaming::session::Session;
use axum::extract::{Path, State};
use axum::http::StatusCode;
@@ -32,6 +31,7 @@ use axum::routing::{delete, get, post};
use axum::{Extension, Json, Router, debug_handler};
use err_trail::ErrContext;
use iggy_common::IdentityInfo;
+use iggy_common::PersonalAccessToken;
use iggy_common::Validatable;
use iggy_common::create_personal_access_token::CreatePersonalAccessToken;
use iggy_common::delete_personal_access_token::DeletePersonalAccessToken;
diff --git a/core/server/src/shard/system/personal_access_tokens.rs
b/core/server/src/shard/system/personal_access_tokens.rs
index 189abf507..7b26aff1e 100644
--- a/core/server/src/shard/system/personal_access_tokens.rs
+++ b/core/server/src/shard/system/personal_access_tokens.rs
@@ -18,13 +18,13 @@
use super::COMPONENT;
use crate::shard::IggyShard;
-use
crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
use crate::streaming::session::Session;
use crate::streaming::users::user::User;
use err_trail::ErrContext;
use iggy_common::IggyError;
use iggy_common::IggyExpiry;
use iggy_common::IggyTimestamp;
+use iggy_common::PersonalAccessToken;
use tracing::{error, info};
impl IggyShard {
diff --git a/core/server/src/shard/transmission/event.rs
b/core/server/src/shard/transmission/event.rs
index eef75d958..1180d06f9 100644
--- a/core/server/src/shard/transmission/event.rs
+++ b/core/server/src/shard/transmission/event.rs
@@ -17,7 +17,6 @@
use crate::streaming::{
partitions::partition,
- personal_access_tokens::personal_access_token::PersonalAccessToken,
streams::stream,
topics::{
consumer_group::{self},
@@ -25,8 +24,8 @@ use crate::streaming::{
},
};
use iggy_common::{
- CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize, Permissions,
TransportProtocol,
- UserStatus,
+ CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize, Permissions,
PersonalAccessToken,
+ TransportProtocol, UserStatus,
};
use std::net::SocketAddr;
use strum::Display;
diff --git a/core/server/src/state/system.rs b/core/server/src/state/system.rs
index 454439a3d..35a560164 100644
--- a/core/server/src/state/system.rs
+++ b/core/server/src/state/system.rs
@@ -20,7 +20,6 @@ use crate::bootstrap::create_root_user;
use crate::state::file::FileState;
use crate::state::models::CreateUserWithId;
use crate::state::{COMPONENT, EntryCommand, StateEntry};
-use
crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
use ahash::AHashMap;
use err_trail::ErrContext;
use iggy_common::CompressionAlgorithm;
@@ -28,6 +27,7 @@ use iggy_common::IggyError;
use iggy_common::IggyExpiry;
use iggy_common::IggyTimestamp;
use iggy_common::MaxTopicSize;
+use iggy_common::PersonalAccessToken;
use iggy_common::create_user::CreateUser;
use iggy_common::defaults::DEFAULT_ROOT_USER_ID;
use iggy_common::{IdKind, Identifier, Permissions, UserStatus};
diff --git a/core/server/src/streaming/clients/client_manager.rs
b/core/server/src/streaming/clients/client_manager.rs
index 3378fbee1..cb2139845 100644
--- a/core/server/src/streaming/clients/client_manager.rs
+++ b/core/server/src/streaming/clients/client_manager.rs
@@ -17,12 +17,12 @@
*/
use crate::streaming::session::Session;
-use crate::streaming::utils::{hash, ptr::EternalPtr};
+use crate::streaming::utils::ptr::EternalPtr;
use dashmap::DashMap;
-use iggy_common::IggyError;
use iggy_common::IggyTimestamp;
use iggy_common::TransportProtocol;
use iggy_common::UserId;
+use iggy_common::{IggyError, calculate_32};
use std::net::SocketAddr;
pub struct ClientManager {
@@ -61,7 +61,7 @@ pub struct ConsumerGroup {
impl ClientManager {
pub fn add_client(&self, address: &SocketAddr, transport:
TransportProtocol) -> Session {
- let client_id = hash::calculate_32(address.to_string().as_bytes());
+ let client_id = calculate_32(address.to_string().as_bytes());
let session = Session::from_client_id(client_id, *address);
let client = Client {
user_id: None,
diff --git a/core/server/src/streaming/mod.rs b/core/server/src/streaming/mod.rs
index fe1eb0611..a6141bd8b 100644
--- a/core/server/src/streaming/mod.rs
+++ b/core/server/src/streaming/mod.rs
@@ -21,7 +21,6 @@ pub mod deduplication;
pub mod diagnostics;
pub mod partitions;
pub mod persistence;
-pub mod personal_access_tokens;
pub mod polling_consumer;
pub mod segments;
pub mod session;
diff --git a/core/server/src/streaming/polling_consumer.rs
b/core/server/src/streaming/polling_consumer.rs
index dcc5f0f4a..42a2ceb23 100644
--- a/core/server/src/streaming/polling_consumer.rs
+++ b/core/server/src/streaming/polling_consumer.rs
@@ -16,8 +16,7 @@
* under the License.
*/
-use crate::streaming::utils::hash;
-use iggy_common::{IdKind, Identifier};
+use iggy_common::{IdKind, Identifier, calculate_32};
use std::fmt::{Display, Formatter};
#[derive(Debug, PartialEq, Eq, Hash, Copy, Clone)]
@@ -56,7 +55,7 @@ impl PollingConsumer {
pub fn resolve_consumer_id(identifier: &Identifier) -> usize {
match identifier.kind {
IdKind::Numeric => identifier.get_u32_value().unwrap() as usize,
- IdKind::String => hash::calculate_32(&identifier.value) as usize,
+ IdKind::String => calculate_32(&identifier.value) as usize,
}
}
}
diff --git a/core/server/src/streaming/topics/helpers.rs
b/core/server/src/streaming/topics/helpers.rs
index 0972596b8..d5f77de9d 100644
--- a/core/server/src/streaming/topics/helpers.rs
+++ b/core/server/src/streaming/topics/helpers.rs
@@ -35,10 +35,9 @@ use crate::{
},
topic::{Topic, TopicRef, TopicRefMut, TopicRoot},
},
- utils::hash,
},
};
-use iggy_common::{CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize};
+use iggy_common::{CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize,
calculate_32};
use slab::Slab;
pub fn rename_index(
@@ -100,7 +99,7 @@ pub fn calculate_partition_id_by_messages_key_hash(
upperbound: usize,
messages_key: &[u8],
) -> usize {
- let messages_key_hash = hash::calculate_32(messages_key) as usize;
+ let messages_key_hash = calculate_32(messages_key) as usize;
let partition_id = messages_key_hash % upperbound;
tracing::trace!(
"Calculated partition ID: {} for messages key: {:?}, hash: {}",
diff --git a/core/server/src/streaming/users/user.rs
b/core/server/src/streaming/users/user.rs
index 437ce955b..672662d3e 100644
--- a/core/server/src/streaming/users/user.rs
+++ b/core/server/src/streaming/users/user.rs
@@ -15,10 +15,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-use
crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
use crate::streaming::utils::crypto;
use dashmap::DashMap;
use iggy_common::IggyTimestamp;
+use iggy_common::PersonalAccessToken;
use iggy_common::UserStatus;
use iggy_common::defaults::*;
use iggy_common::{Permissions, UserId};
diff --git a/core/server/src/streaming/utils/mod.rs
b/core/server/src/streaming/utils/mod.rs
index 6acd0abc0..00e9f65f3 100644
--- a/core/server/src/streaming/utils/mod.rs
+++ b/core/server/src/streaming/utils/mod.rs
@@ -19,6 +19,5 @@
pub mod address;
pub mod crypto;
pub mod file;
-pub mod hash;
pub mod ptr;
pub mod random_id;