This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch metadata_dev in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 2bb03d17db6e5107996cd28740a760962c98d3aa Author: numinex <[email protected]> AuthorDate: Mon Jan 5 15:33:48 2026 +0100 squash --- Cargo.lock | 4 + core/common/Cargo.toml | 2 + core/common/src/lib.rs | 2 + core/common/src/types/consensus/message.rs | 24 ++ core/common/src/types/mod.rs | 1 + .../src/types/personal_access_tokens/mod.rs} | 14 +- .../src/streaming => common/src}/utils/hash.rs | 0 core/common/src/utils/mod.rs | 1 + core/metadata/Cargo.toml | 2 + core/metadata/src/lib.rs | 3 + core/metadata/src/permissioner/mod.rs | 2 + core/metadata/src/permissioner/permissioner.rs | 96 +++++ .../permissioner_rules/consumer_groups.rs | 76 ++++ .../permissioner_rules/consumer_offsets.rs} | 41 +- .../permissioner/permissioner_rules/messages.rs | 135 +++++++ .../src/permissioner/permissioner_rules}/mod.rs | 10 +- .../permissioner/permissioner_rules/partitions.rs} | 34 +- .../permissioner/permissioner_rules/segments.rs} | 14 +- .../src/permissioner/permissioner_rules/streams.rs | 86 +++++ .../src/permissioner/permissioner_rules/system.rs} | 36 +- .../src/permissioner/permissioner_rules/topics.rs | 153 ++++++++ .../src/permissioner/permissioner_rules/users.rs | 70 ++++ core/metadata/src/stats/mod.rs | 349 +++++++++++++++++ core/metadata/src/stm/consumer_group.rs | 185 ++++++++- core/metadata/src/stm/mod.rs | 37 +- core/metadata/src/stm/mux.rs | 25 +- core/metadata/src/stm/stream.rs | 429 ++++++++++++++++++++- core/metadata/src/stm/user.rs | 245 +++++++++++- core/server/src/binary/mapper.rs | 5 +- core/server/src/bootstrap.rs | 3 +- core/server/src/http/http_shard_wrapper.rs | 2 +- core/server/src/http/mapper.rs | 2 +- core/server/src/http/personal_access_tokens.rs | 2 +- .../src/shard/system/personal_access_tokens.rs | 2 +- core/server/src/shard/transmission/event.rs | 5 +- core/server/src/state/system.rs | 2 +- .../server/src/streaming/clients/client_manager.rs | 6 +- core/server/src/streaming/mod.rs | 1 - core/server/src/streaming/polling_consumer.rs | 5 +- core/server/src/streaming/topics/helpers.rs | 5 +- core/server/src/streaming/users/user.rs | 2 +- core/server/src/streaming/utils/mod.rs | 1 - 42 files changed, 2012 insertions(+), 107 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b37ab1d22..5704427ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4656,6 +4656,7 @@ dependencies = [ "aes-gcm", "ahash 0.8.12", "base64 0.22.1", + "blake3", "bon", "byte-unit", "bytemuck", @@ -4677,6 +4678,7 @@ dependencies = [ "nix", "once_cell", "rcgen", + "ring", "rustls", "serde", "serde_json", @@ -5656,10 +5658,12 @@ dependencies = [ name = "metadata" version = "0.1.0" dependencies = [ + "ahash 0.8.12", "consensus", "iggy_common", "journal", "message_bus", + "slab", "tracing", ] diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml index e935435ef..a529c1ab3 100644 --- a/core/common/Cargo.toml +++ b/core/common/Cargo.toml @@ -36,6 +36,7 @@ fast_async_lock = ["dep:fast-async-mutex"] aes-gcm = { workspace = true } ahash = { workspace = true } base64 = { workspace = true } +blake3 = { workspace = true } bon = { workspace = true } byte-unit = { workspace = true } bytemuck = { workspace = true } @@ -57,6 +58,7 @@ humantime = { workspace = true } nix = { workspace = true } once_cell = { workspace = true } rcgen = "0.14.6" +ring = "0.17.14" rustls = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs index af04602ce..9171c4097 100644 --- a/core/common/src/lib.rs +++ b/core/common/src/lib.rs @@ -88,6 +88,7 @@ pub use types::message::*; pub use types::partition::*; pub use types::permissions::permissions_global::*; pub use types::permissions::personal_access_token::*; +pub use types::personal_access_tokens::*; pub use types::snapshot::*; pub use types::stats::*; pub use types::stream::*; @@ -100,6 +101,7 @@ pub use utils::checksum::*; pub use utils::crypto::*; pub use utils::duration::{IggyDuration, SEC_IN_MICRO}; pub use utils::expiry::IggyExpiry; +pub use utils::hash::*; pub use utils::personal_access_token_expiry::PersonalAccessTokenExpiry; pub use utils::text; pub use utils::timestamp::*; diff --git a/core/common/src/types/consensus/message.rs b/core/common/src/types/consensus/message.rs index 977b06e32..0f9f78f3a 100644 --- a/core/common/src/types/consensus/message.rs +++ b/core/common/src/types/consensus/message.rs @@ -89,6 +89,30 @@ where } } + pub fn transmute_header<T: ConsensusHeader>(self, f: impl FnOnce(H, &mut T)) -> Message<T> { + assert_eq!(size_of::<H>(), size_of::<T>()); + + // Copy old header to stack to avoid UB. + let old_header = *self.header(); + + // Safety: We ensured that size_of::<H>() == size_of::<T>() + // On top of that, there is going to be only one reference to buffer during this function call + // so no other references can observe the mutation. + // In the future we can replace the `Bytes` buffer with something that does not allow sharing between different threads. + let buffer = self.into_inner(); + unsafe { + let ptr = buffer.as_ptr() as *mut u8; + let slice = std::slice::from_raw_parts_mut(ptr, size_of::<T>()); + let new_header: &mut T = bytemuck::from_bytes_mut(slice); + f(old_header, new_header); + } + + Message { + buffer, + _marker: PhantomData, + } + } + /// Replace the header with a new one, using the provided function to generate it. pub fn replace_header<T: ConsensusHeader>(self, f: impl FnOnce(&H) -> T) -> Message<T> { assert_eq!(size_of::<H>(), size_of::<T>()); diff --git a/core/common/src/types/mod.rs b/core/common/src/types/mod.rs index f0d70cf9c..065a48c1e 100644 --- a/core/common/src/types/mod.rs +++ b/core/common/src/types/mod.rs @@ -29,6 +29,7 @@ pub(crate) mod identifier; pub(crate) mod message; pub(crate) mod partition; pub(crate) mod permissions; +pub(crate) mod personal_access_tokens; pub(crate) mod snapshot; pub(crate) mod stats; pub(crate) mod stream; diff --git a/core/server/src/streaming/personal_access_tokens/personal_access_token.rs b/core/common/src/types/personal_access_tokens/mod.rs similarity index 94% rename from core/server/src/streaming/personal_access_tokens/personal_access_token.rs rename to core/common/src/types/personal_access_tokens/mod.rs index 8fc0c8535..e28a585de 100644 --- a/core/server/src/streaming/personal_access_tokens/personal_access_token.rs +++ b/core/common/src/types/personal_access_tokens/mod.rs @@ -15,11 +15,11 @@ * specific language governing permissions and limitations * under the License. */ -use crate::streaming::utils::hash; -use iggy_common::IggyExpiry; -use iggy_common::IggyTimestamp; -use iggy_common::UserId; -use iggy_common::text::as_base64; +use crate::IggyExpiry; +use crate::IggyTimestamp; +use crate::UserId; +use crate::text::as_base64; +use crate::utils::hash; use ring::rand::SecureRandom; use std::sync::Arc; @@ -96,8 +96,8 @@ impl PersonalAccessToken { #[cfg(test)] mod tests { use super::*; - use iggy_common::IggyDuration; - use iggy_common::IggyTimestamp; + use crate::IggyDuration; + use crate::IggyTimestamp; #[test] fn personal_access_token_should_be_created_with_random_secure_value_and_hashed_successfully() { diff --git a/core/server/src/streaming/utils/hash.rs b/core/common/src/utils/hash.rs similarity index 100% copy from core/server/src/streaming/utils/hash.rs copy to core/common/src/utils/hash.rs diff --git a/core/common/src/utils/mod.rs b/core/common/src/utils/mod.rs index 35656a15c..aa8714daa 100644 --- a/core/common/src/utils/mod.rs +++ b/core/common/src/utils/mod.rs @@ -20,6 +20,7 @@ pub(crate) mod checksum; pub(crate) mod crypto; pub(crate) mod duration; pub(crate) mod expiry; +pub(crate) mod hash; pub(crate) mod personal_access_token_expiry; pub mod text; pub(crate) mod timestamp; diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml index a6ba9a43f..335808e6b 100644 --- a/core/metadata/Cargo.toml +++ b/core/metadata/Cargo.toml @@ -33,3 +33,5 @@ iggy_common = { path = "../common" } journal = { path = "../journal" } message_bus = { path = "../message_bus" } tracing = { workspace = true } +ahash = { workspace = true } +slab = "0.4.11" diff --git a/core/metadata/src/lib.rs b/core/metadata/src/lib.rs index c86b13993..9b22a6211 100644 --- a/core/metadata/src/lib.rs +++ b/core/metadata/src/lib.rs @@ -18,4 +18,7 @@ //! Iggy metadata module mod impls; +mod permissioner; pub mod stm; + +mod stats; diff --git a/core/metadata/src/permissioner/mod.rs b/core/metadata/src/permissioner/mod.rs new file mode 100644 index 000000000..da31e18bf --- /dev/null +++ b/core/metadata/src/permissioner/mod.rs @@ -0,0 +1,2 @@ +pub mod permissioner; +pub mod permissioner_rules; diff --git a/core/metadata/src/permissioner/permissioner.rs b/core/metadata/src/permissioner/permissioner.rs new file mode 100644 index 000000000..79adca03f --- /dev/null +++ b/core/metadata/src/permissioner/permissioner.rs @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use ahash::{AHashMap, AHashSet}; +use iggy_common::{GlobalPermissions, Permissions, StreamPermissions, UserId}; + +#[derive(Debug, Default, Clone)] +pub struct Permissioner { + pub users_permissions: AHashMap<UserId, GlobalPermissions>, + pub users_streams_permissions: AHashMap<(UserId, usize), StreamPermissions>, + pub users_that_can_poll_messages_from_all_streams: AHashSet<UserId>, + pub users_that_can_send_messages_to_all_streams: AHashSet<UserId>, + pub users_that_can_poll_messages_from_specific_streams: AHashSet<(UserId, usize)>, + pub users_that_can_send_messages_to_specific_streams: AHashSet<(UserId, usize)>, +} + +impl Permissioner { + pub fn new() -> Self { + Self::default() + } + + pub fn init_permissions(&mut self, user_id: UserId, permissions: Option<Permissions>) { + if permissions.is_none() { + return; + } + + let permissions = permissions.unwrap(); + if permissions.global.poll_messages { + self.users_that_can_poll_messages_from_all_streams + .insert(user_id); + } + + if permissions.global.send_messages { + self.users_that_can_send_messages_to_all_streams + .insert(user_id); + } + + self.users_permissions.insert(user_id, permissions.global); + if permissions.streams.is_none() { + return; + } + + let streams = permissions.streams.unwrap(); + for (stream_id, stream) in streams { + if stream.poll_messages { + self.users_that_can_poll_messages_from_specific_streams + .insert((user_id, stream_id)); + } + + if stream.send_messages { + self.users_that_can_send_messages_to_specific_streams + .insert((user_id, stream_id)); + } + + self.users_streams_permissions + .insert((user_id, stream_id), stream); + } + } + + pub fn update_permissions_for_user( + &mut self, + user_id: UserId, + permissions: Option<Permissions>, + ) { + self.delete_permissions(user_id); + self.init_permissions(user_id, permissions); + } + + pub fn delete_permissions(&mut self, user_id: UserId) { + self.users_permissions.remove(&user_id); + self.users_that_can_poll_messages_from_all_streams + .remove(&user_id); + self.users_that_can_send_messages_to_all_streams + .remove(&user_id); + self.users_streams_permissions + .retain(|(id, _), _| *id != user_id); + self.users_that_can_poll_messages_from_specific_streams + .retain(|(id, _)| *id != user_id); + self.users_that_can_send_messages_to_specific_streams + .retain(|(id, _)| *id != user_id); + } +} diff --git a/core/metadata/src/permissioner/permissioner_rules/consumer_groups.rs b/core/metadata/src/permissioner/permissioner_rules/consumer_groups.rs new file mode 100644 index 000000000..85111fdd0 --- /dev/null +++ b/core/metadata/src/permissioner/permissioner_rules/consumer_groups.rs @@ -0,0 +1,76 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::permissioner::permissioner::Permissioner; +use iggy_common::IggyError; + +impl Permissioner { + pub fn create_consumer_group( + &self, + user_id: u32, + stream_id: usize, + topic_id: usize, + ) -> Result<(), IggyError> { + self.get_topic(user_id, stream_id, topic_id) + } + + pub fn delete_consumer_group( + &self, + user_id: u32, + stream_id: usize, + topic_id: usize, + ) -> Result<(), IggyError> { + self.get_topic(user_id, stream_id, topic_id) + } + + pub fn get_consumer_group( + &self, + user_id: u32, + stream_id: usize, + topic_id: usize, + ) -> Result<(), IggyError> { + self.get_topic(user_id, stream_id, topic_id) + } + + pub fn get_consumer_groups( + &self, + user_id: u32, + stream_id: usize, + topic_id: usize, + ) -> Result<(), IggyError> { + self.get_topic(user_id, stream_id, topic_id) + } + + pub fn join_consumer_group( + &self, + user_id: u32, + stream_id: usize, + topic_id: usize, + ) -> Result<(), IggyError> { + self.get_topic(user_id, stream_id, topic_id) + } + + pub fn leave_consumer_group( + &self, + user_id: u32, + stream_id: usize, + topic_id: usize, + ) -> Result<(), IggyError> { + self.get_topic(user_id, stream_id, topic_id) + } +} diff --git a/core/server/src/streaming/utils/hash.rs b/core/metadata/src/permissioner/permissioner_rules/consumer_offsets.rs similarity index 52% copy from core/server/src/streaming/utils/hash.rs copy to core/metadata/src/permissioner/permissioner_rules/consumer_offsets.rs index f2829a6be..7d9a7340a 100644 --- a/core/server/src/streaming/utils/hash.rs +++ b/core/metadata/src/permissioner/permissioner_rules/consumer_offsets.rs @@ -16,23 +16,34 @@ * under the License. */ -use twox_hash::XxHash32; +use crate::permissioner::permissioner::Permissioner; +use iggy_common::IggyError; -pub fn calculate_32(data: &[u8]) -> u32 { - XxHash32::oneshot(0, data) -} +impl Permissioner { + pub fn get_consumer_offset( + &self, + user_id: u32, + stream_id: usize, + topic_id: usize, + ) -> Result<(), IggyError> { + self.poll_messages(user_id, stream_id, topic_id) + } -pub fn calculate_256(data: &[u8]) -> String { - blake3::hash(data).to_hex().to_string() -} + pub fn store_consumer_offset( + &self, + user_id: u32, + stream_id: usize, + topic_id: usize, + ) -> Result<(), IggyError> { + self.poll_messages(user_id, stream_id, topic_id) + } -#[cfg(test)] -mod tests { - #[test] - fn given_same_input_calculate_should_produce_same_output() { - let input = "hello world".as_bytes(); - let output1 = super::calculate_32(input); - let output2 = super::calculate_32(input); - assert_eq!(output1, output2); + pub fn delete_consumer_offset( + &self, + user_id: u32, + stream_id: usize, + topic_id: usize, + ) -> Result<(), IggyError> { + self.poll_messages(user_id, stream_id, topic_id) } } diff --git a/core/metadata/src/permissioner/permissioner_rules/messages.rs b/core/metadata/src/permissioner/permissioner_rules/messages.rs new file mode 100644 index 000000000..29982faf6 --- /dev/null +++ b/core/metadata/src/permissioner/permissioner_rules/messages.rs @@ -0,0 +1,135 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::permissioner::permissioner::Permissioner; +use iggy_common::IggyError; + +impl Permissioner { + pub fn poll_messages( + &self, + user_id: u32, + stream_id: usize, + topic_id: usize, + ) -> Result<(), IggyError> { + if self + .users_that_can_poll_messages_from_all_streams + .contains(&user_id) + { + return Ok(()); + } + + if self + .users_that_can_poll_messages_from_specific_streams + .contains(&(user_id, stream_id)) + { + return Ok(()); + } + + let stream_permissions = self.users_streams_permissions.get(&(user_id, stream_id)); + if stream_permissions.is_none() { + return Err(IggyError::Unauthorized); + } + + let stream_permissions = stream_permissions.unwrap(); + if stream_permissions.read_stream { + return Ok(()); + } + + if stream_permissions.manage_topics { + return Ok(()); + } + + if stream_permissions.read_topics { + return Ok(()); + } + + if stream_permissions.poll_messages { + return Ok(()); + } + + if stream_permissions.topics.is_none() { + return Err(IggyError::Unauthorized); + } + + let topic_permissions = stream_permissions.topics.as_ref().unwrap(); + if let Some(topic_permissions) = topic_permissions.get(&topic_id) { + return match topic_permissions.poll_messages + | topic_permissions.read_topic + | topic_permissions.manage_topic + { + true => Ok(()), + false => Err(IggyError::Unauthorized), + }; + } + + Err(IggyError::Unauthorized) + } + + pub fn append_messages( + &self, + user_id: u32, + stream_id: usize, + topic_id: usize, + ) -> Result<(), IggyError> { + if self + .users_that_can_send_messages_to_all_streams + .contains(&user_id) + { + return Ok(()); + } + + if self + .users_that_can_send_messages_to_specific_streams + .contains(&(user_id, stream_id)) + { + return Ok(()); + } + + let stream_permissions = self.users_streams_permissions.get(&(user_id, stream_id)); + if stream_permissions.is_none() { + return Err(IggyError::Unauthorized); + } + + let stream_permissions = stream_permissions.unwrap(); + if stream_permissions.manage_stream { + return Ok(()); + } + + if stream_permissions.manage_topics { + return Ok(()); + } + + if stream_permissions.send_messages { + return Ok(()); + } + + if stream_permissions.topics.is_none() { + return Err(IggyError::Unauthorized); + } + + let topic_permissions = stream_permissions.topics.as_ref().unwrap(); + if let Some(topic_permissions) = topic_permissions.get(&topic_id) { + return match topic_permissions.send_messages | topic_permissions.manage_topic { + true => Ok(()), + false => Err(IggyError::Unauthorized), + }; + } + + Err(IggyError::Unauthorized) + } +} diff --git a/core/server/src/streaming/personal_access_tokens/mod.rs b/core/metadata/src/permissioner/permissioner_rules/mod.rs similarity index 85% copy from core/server/src/streaming/personal_access_tokens/mod.rs copy to core/metadata/src/permissioner/permissioner_rules/mod.rs index d404dbe5b..0d08a7a1c 100644 --- a/core/server/src/streaming/personal_access_tokens/mod.rs +++ b/core/metadata/src/permissioner/permissioner_rules/mod.rs @@ -16,4 +16,12 @@ * under the License. */ -pub mod personal_access_token; +mod consumer_groups; +pub mod consumer_offsets; +mod messages; +mod partitions; +mod segments; +mod streams; +mod system; +mod topics; +mod users; diff --git a/core/server/src/streaming/utils/hash.rs b/core/metadata/src/permissioner/permissioner_rules/partitions.rs similarity index 61% copy from core/server/src/streaming/utils/hash.rs copy to core/metadata/src/permissioner/permissioner_rules/partitions.rs index f2829a6be..cd480ccd0 100644 --- a/core/server/src/streaming/utils/hash.rs +++ b/core/metadata/src/permissioner/permissioner_rules/partitions.rs @@ -16,23 +16,25 @@ * under the License. */ -use twox_hash::XxHash32; +use crate::permissioner::permissioner::Permissioner; +use iggy_common::IggyError; -pub fn calculate_32(data: &[u8]) -> u32 { - XxHash32::oneshot(0, data) -} - -pub fn calculate_256(data: &[u8]) -> String { - blake3::hash(data).to_hex().to_string() -} +impl Permissioner { + pub fn create_partitions( + &self, + user_id: u32, + stream_id: usize, + topic_id: usize, + ) -> Result<(), IggyError> { + self.update_topic(user_id, stream_id, topic_id) + } -#[cfg(test)] -mod tests { - #[test] - fn given_same_input_calculate_should_produce_same_output() { - let input = "hello world".as_bytes(); - let output1 = super::calculate_32(input); - let output2 = super::calculate_32(input); - assert_eq!(output1, output2); + pub fn delete_partitions( + &self, + user_id: u32, + stream_id: usize, + topic_id: usize, + ) -> Result<(), IggyError> { + self.update_topic(user_id, stream_id, topic_id) } } diff --git a/core/server/src/streaming/personal_access_tokens/mod.rs b/core/metadata/src/permissioner/permissioner_rules/segments.rs similarity index 71% rename from core/server/src/streaming/personal_access_tokens/mod.rs rename to core/metadata/src/permissioner/permissioner_rules/segments.rs index d404dbe5b..64493a9b8 100644 --- a/core/server/src/streaming/personal_access_tokens/mod.rs +++ b/core/metadata/src/permissioner/permissioner_rules/segments.rs @@ -16,4 +16,16 @@ * under the License. */ -pub mod personal_access_token; +use crate::permissioner::permissioner::Permissioner; +use iggy_common::IggyError; + +impl Permissioner { + pub fn delete_segments( + &self, + user_id: u32, + stream_id: usize, + topic_id: usize, + ) -> Result<(), IggyError> { + self.update_topic(user_id, stream_id, topic_id) + } +} diff --git a/core/metadata/src/permissioner/permissioner_rules/streams.rs b/core/metadata/src/permissioner/permissioner_rules/streams.rs new file mode 100644 index 000000000..50b7fd44a --- /dev/null +++ b/core/metadata/src/permissioner/permissioner_rules/streams.rs @@ -0,0 +1,86 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::permissioner::permissioner::Permissioner; +use iggy_common::IggyError; + +impl Permissioner { + pub fn get_stream(&self, user_id: u32, stream_id: usize) -> Result<(), IggyError> { + if let Some(global_permissions) = self.users_permissions.get(&user_id) + && (global_permissions.manage_streams || global_permissions.read_streams) + { + return Ok(()); + } + + if let Some(stream_permissions) = self.users_streams_permissions.get(&(user_id, stream_id)) + && (stream_permissions.manage_stream || stream_permissions.read_stream) + { + return Ok(()); + } + + Err(IggyError::Unauthorized) + } + + pub fn get_streams(&self, user_id: u32) -> Result<(), IggyError> { + if let Some(global_permissions) = self.users_permissions.get(&user_id) + && (global_permissions.manage_streams || global_permissions.read_streams) + { + return Ok(()); + } + + Err(IggyError::Unauthorized) + } + + pub fn create_stream(&self, user_id: u32) -> Result<(), IggyError> { + if let Some(global_permissions) = self.users_permissions.get(&user_id) + && global_permissions.manage_streams + { + return Ok(()); + } + + Err(IggyError::Unauthorized) + } + + pub fn update_stream(&self, user_id: u32, stream_id: usize) -> Result<(), IggyError> { + self.manage_stream(user_id, stream_id) + } + + pub fn delete_stream(&self, user_id: u32, stream_id: usize) -> Result<(), IggyError> { + self.manage_stream(user_id, stream_id) + } + + pub fn purge_stream(&self, user_id: u32, stream_id: usize) -> Result<(), IggyError> { + self.manage_stream(user_id, stream_id) + } + + fn manage_stream(&self, user_id: u32, stream_id: usize) -> Result<(), IggyError> { + if let Some(global_permissions) = self.users_permissions.get(&user_id) + && global_permissions.manage_streams + { + return Ok(()); + } + + if let Some(stream_permissions) = self.users_streams_permissions.get(&(user_id, stream_id)) + && stream_permissions.manage_stream + { + return Ok(()); + } + + Err(IggyError::Unauthorized) + } +} diff --git a/core/server/src/streaming/utils/hash.rs b/core/metadata/src/permissioner/permissioner_rules/system.rs similarity index 51% rename from core/server/src/streaming/utils/hash.rs rename to core/metadata/src/permissioner/permissioner_rules/system.rs index f2829a6be..2ce1fdb29 100644 --- a/core/server/src/streaming/utils/hash.rs +++ b/core/metadata/src/permissioner/permissioner_rules/system.rs @@ -16,23 +16,29 @@ * under the License. */ -use twox_hash::XxHash32; +use crate::permissioner::permissioner::Permissioner; +use iggy_common::IggyError; -pub fn calculate_32(data: &[u8]) -> u32 { - XxHash32::oneshot(0, data) -} +impl Permissioner { + pub fn get_stats(&self, user_id: u32) -> Result<(), IggyError> { + self.get_server_info(user_id) + } -pub fn calculate_256(data: &[u8]) -> String { - blake3::hash(data).to_hex().to_string() -} + pub fn get_clients(&self, user_id: u32) -> Result<(), IggyError> { + self.get_server_info(user_id) + } + + pub fn get_client(&self, user_id: u32) -> Result<(), IggyError> { + self.get_server_info(user_id) + } + + fn get_server_info(&self, user_id: u32) -> Result<(), IggyError> { + if let Some(global_permissions) = self.users_permissions.get(&user_id) + && (global_permissions.manage_servers || global_permissions.read_servers) + { + return Ok(()); + } -#[cfg(test)] -mod tests { - #[test] - fn given_same_input_calculate_should_produce_same_output() { - let input = "hello world".as_bytes(); - let output1 = super::calculate_32(input); - let output2 = super::calculate_32(input); - assert_eq!(output1, output2); + Err(IggyError::Unauthorized) } } diff --git a/core/metadata/src/permissioner/permissioner_rules/topics.rs b/core/metadata/src/permissioner/permissioner_rules/topics.rs new file mode 100644 index 000000000..fcf7ae5c1 --- /dev/null +++ b/core/metadata/src/permissioner/permissioner_rules/topics.rs @@ -0,0 +1,153 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::permissioner::permissioner::Permissioner; +use iggy_common::IggyError; + +impl Permissioner { + pub fn get_topic( + &self, + user_id: u32, + stream_id: usize, + topic_id: usize, + ) -> Result<(), IggyError> { + if let Some(global_permissions) = self.users_permissions.get(&user_id) + && (global_permissions.read_streams + || global_permissions.manage_streams + || global_permissions.manage_topics + || global_permissions.read_topics) + { + return Ok(()); + } + + if let Some(stream_permissions) = self.users_streams_permissions.get(&(user_id, stream_id)) + { + if stream_permissions.manage_topics || stream_permissions.read_topics { + return Ok(()); + } + + if let Some(topic_permissions) = + stream_permissions.topics.as_ref().unwrap().get(&topic_id) + && (topic_permissions.manage_topic || topic_permissions.read_topic) + { + return Ok(()); + } + } + + Err(IggyError::Unauthorized) + } + + pub fn get_topics(&self, user_id: u32, stream_id: usize) -> Result<(), IggyError> { + if let Some(global_permissions) = self.users_permissions.get(&user_id) + && (global_permissions.read_streams + || global_permissions.manage_streams + || global_permissions.manage_topics + || global_permissions.read_topics) + { + return Ok(()); + } + + if let Some(stream_permissions) = self.users_streams_permissions.get(&(user_id, stream_id)) + { + if stream_permissions.manage_topics || stream_permissions.read_topics { + return Ok(()); + } + + if let Some(topic_permissions) = + stream_permissions.topics.as_ref().unwrap().get(&stream_id) + && (topic_permissions.manage_topic || topic_permissions.read_topic) + { + return Ok(()); + } + } + + Err(IggyError::Unauthorized) + } + + pub fn create_topic(&self, user_id: u32, stream_id: usize) -> Result<(), IggyError> { + if let Some(global_permissions) = self.users_permissions.get(&user_id) + && (global_permissions.manage_streams || global_permissions.manage_topics) + { + return Ok(()); + } + + if let Some(stream_permissions) = self.users_streams_permissions.get(&(user_id, stream_id)) + && stream_permissions.manage_topics + { + return Ok(()); + } + + Err(IggyError::Unauthorized) + } + + pub fn update_topic( + &self, + user_id: u32, + stream_id: usize, + topic_id: usize, + ) -> Result<(), IggyError> { + self.manage_topic(user_id, stream_id, topic_id) + } + + pub fn delete_topic( + &self, + user_id: u32, + stream_id: usize, + topic_id: usize, + ) -> Result<(), IggyError> { + self.manage_topic(user_id, stream_id, topic_id) + } + + pub fn purge_topic( + &self, + user_id: u32, + stream_id: usize, + topic_id: usize, + ) -> Result<(), IggyError> { + self.manage_topic(user_id, stream_id, topic_id) + } + + fn manage_topic( + &self, + user_id: u32, + stream_id: usize, + topic_id: usize, + ) -> Result<(), IggyError> { + if let Some(global_permissions) = self.users_permissions.get(&user_id) + && (global_permissions.manage_streams || global_permissions.manage_topics) + { + return Ok(()); + } + + if let Some(stream_permissions) = self.users_streams_permissions.get(&(user_id, stream_id)) + { + if stream_permissions.manage_topics { + return Ok(()); + } + + if let Some(topic_permissions) = + stream_permissions.topics.as_ref().unwrap().get(&topic_id) + && topic_permissions.manage_topic + { + return Ok(()); + } + } + + Err(IggyError::Unauthorized) + } +} diff --git a/core/metadata/src/permissioner/permissioner_rules/users.rs b/core/metadata/src/permissioner/permissioner_rules/users.rs new file mode 100644 index 000000000..b5c016a33 --- /dev/null +++ b/core/metadata/src/permissioner/permissioner_rules/users.rs @@ -0,0 +1,70 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::permissioner::permissioner::Permissioner; +use iggy_common::IggyError; + +impl Permissioner { + pub fn get_user(&self, user_id: u32) -> Result<(), IggyError> { + self.read_users(user_id) + } + + pub fn get_users(&self, user_id: u32) -> Result<(), IggyError> { + self.read_users(user_id) + } + + pub fn create_user(&self, user_id: u32) -> Result<(), IggyError> { + self.manager_users(user_id) + } + + pub fn delete_user(&self, user_id: u32) -> Result<(), IggyError> { + self.manager_users(user_id) + } + + pub fn update_user(&self, user_id: u32) -> Result<(), IggyError> { + self.manager_users(user_id) + } + + pub fn update_permissions(&self, user_id: u32) -> Result<(), IggyError> { + self.manager_users(user_id) + } + + pub fn change_password(&self, user_id: u32) -> Result<(), IggyError> { + self.manager_users(user_id) + } + + fn manager_users(&self, user_id: u32) -> Result<(), IggyError> { + if let Some(global_permissions) = self.users_permissions.get(&user_id) + && global_permissions.manage_users + { + return Ok(()); + } + + Err(IggyError::Unauthorized) + } + + fn read_users(&self, user_id: u32) -> Result<(), IggyError> { + if let Some(global_permissions) = self.users_permissions.get(&user_id) + && (global_permissions.manage_users || global_permissions.read_users) + { + return Ok(()); + } + + Err(IggyError::Unauthorized) + } +} diff --git a/core/metadata/src/stats/mod.rs b/core/metadata/src/stats/mod.rs new file mode 100644 index 000000000..89d594665 --- /dev/null +++ b/core/metadata/src/stats/mod.rs @@ -0,0 +1,349 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::{ + Arc, + atomic::{AtomicU32, AtomicU64, Ordering}, +}; + +#[derive(Default, Debug)] +pub struct StreamStats { + size_bytes: AtomicU64, + messages_count: AtomicU64, + segments_count: AtomicU32, +} + +impl StreamStats { + pub fn increment_size_bytes(&self, size_bytes: u64) { + self.size_bytes.fetch_add(size_bytes, Ordering::AcqRel); + } + + pub fn increment_messages_count(&self, messages_count: u64) { + self.messages_count + .fetch_add(messages_count, Ordering::AcqRel); + } + + pub fn increment_segments_count(&self, segments_count: u32) { + self.segments_count + .fetch_add(segments_count, Ordering::AcqRel); + } + + pub fn decrement_size_bytes(&self, size_bytes: u64) { + self.size_bytes.fetch_sub(size_bytes, Ordering::AcqRel); + } + + pub fn decrement_messages_count(&self, messages_count: u64) { + self.messages_count + .fetch_sub(messages_count, Ordering::AcqRel); + } + + pub fn decrement_segments_count(&self, segments_count: u32) { + self.segments_count + .fetch_sub(segments_count, Ordering::AcqRel); + } + + pub fn size_bytes_inconsistent(&self) -> u64 { + self.size_bytes.load(Ordering::Relaxed) + } + + pub fn messages_count_inconsistent(&self) -> u64 { + self.messages_count.load(Ordering::Relaxed) + } + + pub fn segments_count_inconsistent(&self) -> u32 { + self.segments_count.load(Ordering::Relaxed) + } + + pub fn zero_out_size_bytes(&self) { + self.size_bytes.store(0, Ordering::Relaxed); + } + + pub fn zero_out_messages_count(&self) { + self.messages_count.store(0, Ordering::Relaxed); + } + + pub fn zero_out_segments_count(&self) { + self.segments_count.store(0, Ordering::Relaxed); + } + + pub fn zero_out_all(&self) { + self.zero_out_size_bytes(); + self.zero_out_messages_count(); + self.zero_out_segments_count(); + } +} + +#[derive(Default, Debug)] +pub struct TopicStats { + parent: Arc<StreamStats>, + size_bytes: AtomicU64, + messages_count: AtomicU64, + segments_count: AtomicU32, +} + +impl TopicStats { + pub fn new(parent: Arc<StreamStats>) -> Self { + Self { + parent, + size_bytes: AtomicU64::new(0), + messages_count: AtomicU64::new(0), + segments_count: AtomicU32::new(0), + } + } + + pub fn parent(&self) -> Arc<StreamStats> { + self.parent.clone() + } + + pub fn increment_parent_size_bytes(&self, size_bytes: u64) { + self.parent.increment_size_bytes(size_bytes); + } + + pub fn increment_parent_messages_count(&self, messages_count: u64) { + self.parent.increment_messages_count(messages_count); + } + + pub fn increment_parent_segments_count(&self, segments_count: u32) { + self.parent.increment_segments_count(segments_count); + } + + pub fn increment_size_bytes(&self, size_bytes: u64) { + self.size_bytes.fetch_add(size_bytes, Ordering::AcqRel); + self.increment_parent_size_bytes(size_bytes); + } + + pub fn increment_messages_count(&self, messages_count: u64) { + self.messages_count + .fetch_add(messages_count, Ordering::AcqRel); + self.increment_parent_messages_count(messages_count); + } + + pub fn increment_segments_count(&self, segments_count: u32) { + self.segments_count + .fetch_add(segments_count, Ordering::AcqRel); + self.increment_parent_segments_count(segments_count); + } + + pub fn decrement_parent_size_bytes(&self, size_bytes: u64) { + self.parent.decrement_size_bytes(size_bytes); + } + + pub fn decrement_parent_messages_count(&self, messages_count: u64) { + self.parent.decrement_messages_count(messages_count); + } + + pub fn decrement_parent_segments_count(&self, segments_count: u32) { + self.parent.decrement_segments_count(segments_count); + } + + pub fn decrement_size_bytes(&self, size_bytes: u64) { + self.size_bytes.fetch_sub(size_bytes, Ordering::AcqRel); + self.decrement_parent_size_bytes(size_bytes); + } + + pub fn decrement_messages_count(&self, messages_count: u64) { + self.messages_count + .fetch_sub(messages_count, Ordering::AcqRel); + self.decrement_parent_messages_count(messages_count); + } + + pub fn decrement_segments_count(&self, segments_count: u32) { + self.segments_count + .fetch_sub(segments_count, Ordering::AcqRel); + self.decrement_parent_segments_count(segments_count); + } + + pub fn size_bytes_inconsistent(&self) -> u64 { + self.size_bytes.load(Ordering::Relaxed) + } + + pub fn messages_count_inconsistent(&self) -> u64 { + self.messages_count.load(Ordering::Relaxed) + } + + pub fn segments_count_inconsistent(&self) -> u32 { + self.segments_count.load(Ordering::Relaxed) + } + + pub fn zero_out_parent_size_bytes(&self) { + self.parent.zero_out_size_bytes(); + } + + pub fn zero_out_parent_messages_count(&self) { + self.parent.zero_out_messages_count(); + } + + pub fn zero_out_parent_segments_count(&self) { + self.parent.zero_out_segments_count(); + } + + pub fn zero_out_parent_all(&self) { + self.parent.zero_out_all(); + } + + pub fn zero_out_size_bytes(&self) { + self.size_bytes.store(0, Ordering::Relaxed); + self.zero_out_parent_size_bytes(); + } + + pub fn zero_out_messages_count(&self) { + self.messages_count.store(0, Ordering::Relaxed); + self.zero_out_parent_messages_count(); + } + + pub fn zero_out_segments_count(&self) { + self.segments_count.store(0, Ordering::Relaxed); + self.zero_out_parent_segments_count(); + } + + pub fn zero_out_all(&self) { + self.zero_out_size_bytes(); + self.zero_out_messages_count(); + self.zero_out_segments_count(); + } +} + +#[derive(Default, Debug)] +pub struct PartitionStats { + parent: Arc<TopicStats>, + messages_count: AtomicU64, + size_bytes: AtomicU64, + segments_count: AtomicU32, +} + +impl PartitionStats { + pub fn new(parent_stats: Arc<TopicStats>) -> Self { + Self { + parent: parent_stats, + messages_count: AtomicU64::new(0), + size_bytes: AtomicU64::new(0), + segments_count: AtomicU32::new(0), + } + } + + pub fn parent(&self) -> Arc<TopicStats> { + self.parent.clone() + } + + pub fn increment_size_bytes(&self, size_bytes: u64) { + self.size_bytes.fetch_add(size_bytes, Ordering::AcqRel); + self.increment_parent_size_bytes(size_bytes); + } + + pub fn increment_messages_count(&self, messages_count: u64) { + self.messages_count + .fetch_add(messages_count, Ordering::AcqRel); + self.increment_parent_messages_count(messages_count); + } + + pub fn increment_segments_count(&self, segments_count: u32) { + self.segments_count + .fetch_add(segments_count, Ordering::AcqRel); + self.increment_parent_segments_count(segments_count); + } + + pub fn increment_parent_size_bytes(&self, size_bytes: u64) { + self.parent.increment_size_bytes(size_bytes); + } + + pub fn increment_parent_messages_count(&self, messages_count: u64) { + self.parent.increment_messages_count(messages_count); + } + + pub fn increment_parent_segments_count(&self, segments_count: u32) { + self.parent.increment_segments_count(segments_count); + } + + pub fn decrement_size_bytes(&self, size_bytes: u64) { + self.size_bytes.fetch_sub(size_bytes, Ordering::AcqRel); + self.decrement_parent_size_bytes(size_bytes); + } + + pub fn decrement_messages_count(&self, messages_count: u64) { + self.messages_count + .fetch_sub(messages_count, Ordering::AcqRel); + self.decrement_parent_messages_count(messages_count); + } + + pub fn decrement_segments_count(&self, segments_count: u32) { + self.segments_count + .fetch_sub(segments_count, Ordering::AcqRel); + self.decrement_parent_segments_count(segments_count); + } + + pub fn decrement_parent_size_bytes(&self, size_bytes: u64) { + self.parent.decrement_size_bytes(size_bytes); + } + + pub fn decrement_parent_messages_count(&self, messages_count: u64) { + self.parent.decrement_messages_count(messages_count); + } + + pub fn decrement_parent_segments_count(&self, segments_count: u32) { + self.parent.decrement_segments_count(segments_count); + } + + pub fn size_bytes_inconsistent(&self) -> u64 { + self.size_bytes.load(Ordering::Relaxed) + } + + pub fn messages_count_inconsistent(&self) -> u64 { + self.messages_count.load(Ordering::Relaxed) + } + + pub fn segments_count_inconsistent(&self) -> u32 { + self.segments_count.load(Ordering::Relaxed) + } + + pub fn zero_out_parent_size_bytes(&self) { + self.parent.zero_out_size_bytes(); + } + + pub fn zero_out_parent_messages_count(&self) { + self.parent.zero_out_messages_count(); + } + + pub fn zero_out_parent_segments_count(&self) { + self.parent.zero_out_segments_count(); + } + + pub fn zero_out_parent_all(&self) { + self.parent.zero_out_all(); + } + + pub fn zero_out_size_bytes(&self) { + self.size_bytes.store(0, Ordering::Relaxed); + self.zero_out_parent_size_bytes(); + } + + pub fn zero_out_messages_count(&self) { + self.messages_count.store(0, Ordering::Relaxed); + self.zero_out_parent_messages_count(); + } + + pub fn zero_out_segments_count(&self) { + self.segments_count.store(0, Ordering::Relaxed); + self.zero_out_parent_segments_count(); + } + + pub fn zero_out_all(&self) { + self.zero_out_size_bytes(); + self.zero_out_messages_count(); + self.zero_out_segments_count(); + self.zero_out_parent_all(); + } +} diff --git a/core/metadata/src/stm/consumer_group.rs b/core/metadata/src/stm/consumer_group.rs index 4aed76aed..a78da082b 100644 --- a/core/metadata/src/stm/consumer_group.rs +++ b/core/metadata/src/stm/consumer_group.rs @@ -15,14 +15,189 @@ // specific language governing permissions and limitations // under the License. -use crate::stm::State; +use crate::stm::ApplyState; +use ahash::AHashMap; +use iggy_common::{ + IggyTimestamp, + header::{Operation, PrepareHeader}, + message::Message, +}; +use slab::Slab; +use std::cell::RefCell; -pub struct ConsumerGroups {} +// ============================================================================ +// ConsumerGroupMember - Individual member of a consumer group +// ============================================================================ -impl State for ConsumerGroups { +#[derive(Debug, Clone)] +pub struct ConsumerGroupMember { + pub id: u32, + pub joined_at: IggyTimestamp, +} + +impl ConsumerGroupMember { + pub fn new(id: u32, joined_at: IggyTimestamp) -> Self { + Self { id, joined_at } + } +} + +// ============================================================================ +// ConsumerGroup - A group of consumers +// ============================================================================ + +#[derive(Debug, Clone)] +pub struct ConsumerGroup { + pub id: usize, + pub stream_id: usize, + pub topic_id: usize, + pub name: String, + pub created_at: IggyTimestamp, + pub members: Vec<ConsumerGroupMember>, +} + +impl ConsumerGroup { + pub fn new(stream_id: usize, topic_id: usize, name: String, created_at: IggyTimestamp) -> Self { + Self { + id: 0, + stream_id, + topic_id, + name, + created_at, + members: Vec::new(), + } + } + + pub fn add_member(&mut self, member: ConsumerGroupMember) { + self.members.push(member); + } + + pub fn remove_member(&mut self, member_id: u32) -> Option<ConsumerGroupMember> { + if let Some(pos) = self.members.iter().position(|m| m.id == member_id) { + Some(self.members.remove(pos)) + } else { + None + } + } + + pub fn members_count(&self) -> usize { + self.members.len() + } +} + +// ============================================================================ +// ConsumerGroups Collection +// ============================================================================ + +#[derive(Debug, Clone, Default)] +pub struct ConsumerGroups { + // Global index for all consumer groups across all streams/topics + index: RefCell<AHashMap<(usize, usize, String), usize>>, // (stream_id, topic_id, name) -> id + items: RefCell<Slab<ConsumerGroup>>, +} + +impl ConsumerGroups { + pub fn new() -> Self { + Self { + index: RefCell::new(AHashMap::with_capacity(256)), + items: RefCell::new(Slab::with_capacity(256)), + } + } + + /// Insert a consumer group and return the assigned ID + pub fn insert(&self, group: ConsumerGroup) -> usize { + let mut items = self.items.borrow_mut(); + let mut index = self.index.borrow_mut(); + + let key = (group.stream_id, group.topic_id, group.name.clone()); + let id = items.insert(group); + items[id].id = id; + index.insert(key, id); + id + } + + /// Get consumer group by ID + pub fn get(&self, id: usize) -> Option<ConsumerGroup> { + self.items.borrow().get(id).cloned() + } + + /// Get consumer group by stream_id, topic_id, and name + pub fn get_by_location( + &self, + stream_id: usize, + topic_id: usize, + name: &str, + ) -> Option<ConsumerGroup> { + let index = self.index.borrow(); + let key = (stream_id, topic_id, name.to_string()); + if let Some(&id) = index.get(&key) { + self.items.borrow().get(id).cloned() + } else { + None + } + } + + /// Remove consumer group by ID + pub fn remove(&self, id: usize) -> Option<ConsumerGroup> { + let mut items = self.items.borrow_mut(); + let mut index = self.index.borrow_mut(); + + if !items.contains(id) { + return None; + } + + let group = items.remove(id); + let key = (group.stream_id, group.topic_id, group.name.clone()); + index.remove(&key); + Some(group) + } + + /// Get all consumer groups for a specific topic + pub fn get_by_topic(&self, stream_id: usize, topic_id: usize) -> Vec<ConsumerGroup> { + self.items + .borrow() + .iter() + .filter_map(|(_, g)| { + if g.stream_id == stream_id && g.topic_id == topic_id { + Some(g.clone()) + } else { + None + } + }) + .collect() + } + + /// Get number of consumer groups + pub fn len(&self) -> usize { + self.items.borrow().len() + } + + /// Check if empty + pub fn is_empty(&self) -> bool { + self.items.borrow().is_empty() + } + + /// Get all consumer groups + pub fn values(&self) -> Vec<ConsumerGroup> { + self.items + .borrow() + .iter() + .map(|(_, g): (usize, &ConsumerGroup)| g.clone()) + .collect() + } +} + +impl ApplyState for ConsumerGroups { type Output = (); + type Input = Message<PrepareHeader>; + + fn do_apply(&self, _input: &Self::Input) -> Self::Output { + todo!() + } - fn apply(&self) -> Option<Self::Output> { - Some(()) + fn is_applicable(input: &Self::Input) -> bool { + matches!( + input.header().operation, + Operation::CreateConsumerGroup | Operation::DeleteConsumerGroup + ) } } diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs index 80646a559..09abe12d4 100644 --- a/core/metadata/src/stm/mod.rs +++ b/core/metadata/src/stm/mod.rs @@ -16,22 +16,49 @@ // under the License. pub mod consumer_group; +pub mod mux; pub mod stream; pub mod user; -pub mod mux; -// TODO: Add more state machines. - -pub trait State { +// This is public interface to state, therefore it will be imported from different crate, for now during development I am leaving it there. +pub trait State +where + Self: Sized, +{ type Output; + type Input; + // Apply the state machine logic and return an optional output. // The output is optional, as we model the `StateMachine`, as an variadic list, // where not all state machines will produce an output for every input event. - fn apply(&self) -> Option<Self::Output>; + fn apply(&self, input: &Self::Input) -> Option<Self::Output>; } +// TODO: This interface should be private to the stm module. pub trait StateMachine { type Input; type Output; fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>); } + +// This will be an internal trait used to define the application logic of a state machine. +pub trait ApplyState { + type Output; + type Input; + + fn do_apply(&self, input: &Self::Input) -> Self::Output; + fn is_applicable(input: &Self::Input) -> bool; +} + +// Blanket implementation of State for all types that implement ApplyState +impl<T> State for T +where + T: ApplyState, +{ + type Output = T::Output; + type Input = T::Input; + + fn apply(&self, input: &Self::Input) -> Option<Self::Output> { + T::is_applicable(input).then(|| self.do_apply(input)) + } +} diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs index 22601bdc0..ca5e5ea57 100644 --- a/core/metadata/src/stm/mux.rs +++ b/core/metadata/src/stm/mux.rs @@ -15,9 +15,10 @@ // specific language governing permissions and limitations // under the License. -use crate::stm::{State, StateMachine}; use iggy_common::{header::PrepareHeader, message::Message}; +use crate::stm::{State, StateMachine}; + // MuxStateMachine that proxies to an tuple of variadic state machines pub struct MuxStateMachine<T> where @@ -59,6 +60,7 @@ macro_rules! variadic { ($a:expr, $( $b:tt )+) => ( ($a, variadic!( $( $b )* )) ); } +// TODO: Figure out how to get around the fact that we need to hardcode the Input/Output type for base case. // Base case of the recursive resolution. impl StateMachine for () { type Input = Message<PrepareHeader>; @@ -72,13 +74,13 @@ impl StateMachine for () { impl<O, S, Rest> StateMachine for variadic!(S, ...Rest) where S: State<Output = O>, - Rest: StateMachine<Input = Message<PrepareHeader>, Output = O>, + Rest: StateMachine<Input = S::Input, Output = O>, { - type Input = Message<PrepareHeader>; + type Input = Rest::Input; type Output = O; fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>) { - if let Some(result) = self.0.apply() { + if let Some(result) = self.0.apply(input) { output.push(result); } self.1.update(input, output) @@ -90,10 +92,17 @@ mod tests { #[test] fn construct_mux_state_machine_from_states_with_same_output() { use crate::stm::*; + use iggy_common::header::PrepareHeader; + use iggy_common::message::Message; + + let users = user::Users::new(); + let streams = stream::Streams::new(); + let cgs = consumer_group::ConsumerGroups::new(); + let mux = mux::MuxStateMachine::new(variadic!(users, streams, cgs)); + + let input = Message::new(std::mem::size_of::<PrepareHeader>()); + let mut output = Vec::new(); - let users = user::Users {}; - let streams = stream::Streams {}; - let cgs = consumer_group::ConsumerGroups {}; - let _mux = mux::MuxStateMachine::new(variadic!(users, streams, cgs)); + mux.update(&input, &mut output); } } diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs index 737e21e94..7b341eb22 100644 --- a/core/metadata/src/stm/stream.rs +++ b/core/metadata/src/stm/stream.rs @@ -15,14 +15,433 @@ // specific language governing permissions and limitations // under the License. -use crate::stm::State; +use crate::stats::{StreamStats, TopicStats}; +use crate::stm::ApplyState; +use ahash::AHashMap; +use iggy_common::header::Operation; +use iggy_common::{ + CompressionAlgorithm, Identifier, IggyExpiry, IggyTimestamp, MaxTopicSize, + header::PrepareHeader, message::Message, +}; +use slab::Slab; +use std::cell::RefCell; +use std::sync::Arc; -pub struct Streams {} +#[derive(Debug, Clone)] +pub struct Partition { + pub id: usize, +} + +impl Partition { + pub fn new(id: usize) -> Self { + Self { id } + } +} + +#[derive(Debug, Clone, Default)] +pub struct Partitions { + items: RefCell<Slab<Partition>>, +} + +impl Partitions { + pub fn new() -> Self { + Self { + items: RefCell::new(Slab::with_capacity(1024)), + } + } + + pub fn insert(&self, partition: Partition) -> usize { + let mut items = self.items.borrow_mut(); + let id = items.insert(partition); + items[id].id = id; + id + } + + pub fn get(&self, id: usize) -> Option<Partition> { + self.items.borrow().get(id).cloned() + } + + pub fn remove(&self, id: usize) -> Option<Partition> { + let mut items = self.items.borrow_mut(); + if items.contains(id) { + Some(items.remove(id)) + } else { + None + } + } + + pub fn len(&self) -> usize { + self.items.borrow().len() + } + + pub fn is_empty(&self) -> bool { + self.items.borrow().is_empty() + } + + pub fn iter(&self) -> Vec<Partition> { + self.items + .borrow() + .iter() + .map(|(_, p): (usize, &Partition)| p.clone()) + .collect() + } +} + +#[derive(Debug, Clone)] +pub struct ConsumerGroup { + pub id: usize, + pub name: String, + pub created_at: IggyTimestamp, +} + +impl ConsumerGroup { + pub fn new(name: String, created_at: IggyTimestamp) -> Self { + Self { + id: 0, + name, + created_at, + } + } +} + +#[derive(Debug, Clone, Default)] +pub struct ConsumerGroups { + index: RefCell<AHashMap<String, usize>>, + items: RefCell<Slab<ConsumerGroup>>, +} + +impl ConsumerGroups { + pub fn new() -> Self { + Self { + index: RefCell::new(AHashMap::with_capacity(256)), + items: RefCell::new(Slab::with_capacity(256)), + } + } + + pub fn insert(&self, group: ConsumerGroup) -> usize { + let mut items = self.items.borrow_mut(); + let mut index = self.index.borrow_mut(); + + let name = group.name.clone(); + let id = items.insert(group); + items[id].id = id; + index.insert(name, id); + id + } + + pub fn get(&self, id: usize) -> Option<ConsumerGroup> { + self.items.borrow().get(id).cloned() + } + + pub fn get_by_name(&self, name: &str) -> Option<ConsumerGroup> { + let index = self.index.borrow(); + if let Some(&id) = index.get(name) { + self.items.borrow().get(id).cloned() + } else { + None + } + } + + pub fn remove(&self, id: usize) -> Option<ConsumerGroup> { + let mut items = self.items.borrow_mut(); + let mut index = self.index.borrow_mut(); + + if !items.contains(id) { + return None; + } + + let group = items.remove(id); + index.remove(&group.name); + Some(group) + } + + pub fn len(&self) -> usize { + self.items.borrow().len() + } + + pub fn is_empty(&self) -> bool { + self.items.borrow().is_empty() + } +} + +#[derive(Debug, Clone)] +pub struct Topic { + pub id: usize, + pub name: String, + pub created_at: IggyTimestamp, + pub replication_factor: u8, + pub message_expiry: IggyExpiry, + pub compression_algorithm: CompressionAlgorithm, + pub max_topic_size: MaxTopicSize, + + pub stats: Arc<TopicStats>, + pub partitions: Partitions, + pub consumer_groups: ConsumerGroups, +} + +impl Topic { + pub fn new( + name: String, + created_at: IggyTimestamp, + replication_factor: u8, + message_expiry: IggyExpiry, + compression_algorithm: CompressionAlgorithm, + max_topic_size: MaxTopicSize, + stream_stats: Arc<StreamStats>, + ) -> Self { + Self { + id: 0, + name, + created_at, + replication_factor, + message_expiry, + compression_algorithm, + max_topic_size, + stats: Arc::new(TopicStats::new(stream_stats)), + partitions: Partitions::new(), + consumer_groups: ConsumerGroups::new(), + } + } +} + +#[derive(Debug, Clone, Default)] +pub struct Topics { + index: RefCell<AHashMap<String, usize>>, + items: RefCell<Slab<Topic>>, +} + +impl Topics { + pub fn new() -> Self { + Self { + index: RefCell::new(AHashMap::with_capacity(1024)), + items: RefCell::new(Slab::with_capacity(1024)), + } + } + + pub fn insert(&self, topic: Topic) -> usize { + let mut items = self.items.borrow_mut(); + let mut index = self.index.borrow_mut(); + + let name = topic.name.clone(); + let id = items.insert(topic); + items[id].id = id; + index.insert(name, id); + id + } + + pub fn get(&self, id: usize) -> Option<Topic> { + self.items.borrow().get(id).cloned() + } + + pub fn get_by_name(&self, name: &str) -> Option<Topic> { + let index = self.index.borrow(); + if let Some(&id) = index.get(name) { + self.items.borrow().get(id).cloned() + } else { + None + } + } + + pub fn get_by_identifier(&self, identifier: &Identifier) -> Option<Topic> { + match identifier.kind { + iggy_common::IdKind::Numeric => { + if let Ok(id) = identifier.get_u32_value() { + self.get(id as usize) + } else { + None + } + } + iggy_common::IdKind::String => { + if let Ok(name) = identifier.get_string_value() { + self.get_by_name(&name) + } else { + None + } + } + } + } + + pub fn remove(&self, id: usize) -> Option<Topic> { + let mut items = self.items.borrow_mut(); + let mut index = self.index.borrow_mut(); + + if !items.contains(id) { + return None; + } + + let topic = items.remove(id); + index.remove(&topic.name); + Some(topic) + } + + pub fn len(&self) -> usize { + self.items.borrow().len() + } + + pub fn is_empty(&self) -> bool { + self.items.borrow().is_empty() + } +} + +#[derive(Debug, Clone)] +pub struct Stream { + pub id: usize, + pub name: String, + pub created_at: IggyTimestamp, + + pub stats: Arc<StreamStats>, + pub topics: Topics, +} + +impl Stream { + pub fn new(name: String, created_at: IggyTimestamp) -> Self { + Self { + id: 0, + name, + created_at, + stats: Arc::new(StreamStats::default()), + topics: Topics::new(), + } + } +} + +#[derive(Debug, Clone, Default)] +pub struct Streams { + index: RefCell<AHashMap<String, usize>>, + items: RefCell<Slab<Stream>>, +} + +impl Streams { + pub fn new() -> Self { + Self { + index: RefCell::new(AHashMap::with_capacity(256)), + items: RefCell::new(Slab::with_capacity(256)), + } + } + + pub fn insert(&self, stream: Stream) -> usize { + let mut items = self.items.borrow_mut(); + let mut index = self.index.borrow_mut(); + + let name = stream.name.clone(); + let id = items.insert(stream); + items[id].id = id; + index.insert(name, id); + id + } + + pub fn get(&self, id: usize) -> Option<Stream> { + self.items.borrow().get(id).cloned() + } + + pub fn get_by_name(&self, name: &str) -> Option<Stream> { + let index = self.index.borrow(); + if let Some(&id) = index.get(name) { + self.items.borrow().get(id).cloned() + } else { + None + } + } + + pub fn get_by_identifier(&self, identifier: &Identifier) -> Option<Stream> { + match identifier.kind { + iggy_common::IdKind::Numeric => { + if let Ok(id) = identifier.get_u32_value() { + self.get(id as usize) + } else { + None + } + } + iggy_common::IdKind::String => { + if let Ok(name) = identifier.get_string_value() { + self.get_by_name(&name) + } else { + None + } + } + } + } + + pub fn remove(&self, id: usize) -> Option<Stream> { + let mut items = self.items.borrow_mut(); + let mut index = self.index.borrow_mut(); + + if !items.contains(id) { + return None; + } + + let stream = items.remove(id); + index.remove(&stream.name); + Some(stream) + } + + pub fn len(&self) -> usize { + self.items.borrow().len() + } + + pub fn is_empty(&self) -> bool { + self.items.borrow().is_empty() + } + + pub fn iter(&self) -> Vec<Stream> { + self.items + .borrow() + .iter() + .map(|(_, s): (usize, &Stream)| s.clone()) + .collect() + } +} + +impl ApplyState for Streams { + type Output = (); + type Input = Message<PrepareHeader>; + + fn do_apply(&self, _input: &Self::Input) -> Self::Output { + () + } + + fn is_applicable(input: &Self::Input) -> bool { + matches!( + input.header().operation, + Operation::CreateStream + | Operation::UpdateStream + | Operation::DeleteStream + | Operation::PurgeStream + ) + } +} -impl State for Streams { +impl ApplyState for Topics { type Output = (); + type Input = Message<PrepareHeader>; + + fn do_apply(&self, _input: &Self::Input) -> Self::Output { + () + } + + fn is_applicable(input: &Self::Input) -> bool { + matches!( + input.header().operation, + Operation::CreateTopic + | Operation::UpdateTopic + | Operation::DeleteTopic + | Operation::PurgeTopic + ) + } +} + +impl ApplyState for Partitions { + type Output = (); + type Input = Message<PrepareHeader>; + + fn do_apply(&self, _input: &Self::Input) -> Self::Output { + () + } - fn apply(&self) -> Option<Self::Output> { - Some(()) + fn is_applicable(input: &Self::Input) -> bool { + matches!( + input.header().operation, + Operation::CreatePartitions | Operation::DeletePartitions | Operation::DeleteSegments + ) } } diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs index d5f0fac96..409c33030 100644 --- a/core/metadata/src/stm/user.rs +++ b/core/metadata/src/stm/user.rs @@ -15,14 +15,249 @@ // specific language governing permissions and limitations // under the License. -use crate::stm::State; +use crate::{permissioner::permissioner::Permissioner, stm::ApplyState}; +use ahash::AHashMap; +use iggy_common::{ + Identifier, IggyError, IggyTimestamp, Permissions, PersonalAccessToken, UserId, UserStatus, + header::{Operation, PrepareHeader}, + message::Message, +}; +use slab::Slab; +use std::cell::RefCell; -pub struct Users {} +#[derive(Debug, Clone)] +pub struct User { + pub id: UserId, + pub username: String, + pub password: String, + pub status: UserStatus, + pub created_at: IggyTimestamp, + pub permissions: Option<Permissions>, + pub personal_access_tokens: AHashMap<String, PersonalAccessToken>, +} + +impl User { + pub fn new( + username: String, + password: String, + status: UserStatus, + created_at: IggyTimestamp, + permissions: Option<Permissions>, + ) -> Self { + Self { + id: 0, + username, + password, + status, + created_at, + permissions, + personal_access_tokens: AHashMap::new(), + } + } +} + +#[derive(Debug, Clone, Default)] +pub struct Users { + index: RefCell<AHashMap<String, usize>>, + items: RefCell<Slab<User>>, + permissioner: RefCell<Permissioner>, +} + +impl Users { + pub fn new() -> Self { + Self { + index: RefCell::new(AHashMap::with_capacity(1024)), + items: RefCell::new(Slab::with_capacity(1024)), + permissioner: RefCell::new(Permissioner::new()), + } + } + + /// Insert a user and return the assigned ID + pub fn insert(&self, user: User) -> usize { + let mut items = self.items.borrow_mut(); + let mut index = self.index.borrow_mut(); + + let username = user.username.clone(); + let id = items.insert(user); + items[id].id = id as u32; + index.insert(username, id); + id + } + + /// Get user by ID + pub fn get(&self, id: usize) -> Option<User> { + self.items.borrow().get(id).cloned() + } + + /// Get user by username or ID (via Identifier enum) + pub fn get_by_identifier(&self, identifier: &Identifier) -> Result<Option<User>, IggyError> { + match identifier.kind { + iggy_common::IdKind::Numeric => { + let id = identifier.get_u32_value()? as usize; + Ok(self.items.borrow().get(id).cloned()) + } + iggy_common::IdKind::String => { + let username = identifier.get_string_value()?; + let index = self.index.borrow(); + if let Some(&id) = index.get(&username) { + Ok(self.items.borrow().get(id).cloned()) + } else { + Ok(None) + } + } + } + } + + /// Remove user by ID + pub fn remove(&self, id: usize) -> Option<User> { + let mut items = self.items.borrow_mut(); + let mut index = self.index.borrow_mut(); + + if !items.contains(id) { + return None; + } + + let user = items.remove(id); + index.remove(&user.username); + Some(user) + } + + /// Check if user exists + pub fn contains(&self, identifier: &Identifier) -> bool { + match identifier.kind { + iggy_common::IdKind::Numeric => { + if let Ok(id) = identifier.get_u32_value() { + self.items.borrow().contains(id as usize) + } else { + false + } + } + iggy_common::IdKind::String => { + if let Ok(username) = identifier.get_string_value() { + self.index.borrow().contains_key(&username) + } else { + false + } + } + } + } + + /// Get all users as a Vec + pub fn values(&self) -> Vec<User> { + self.items + .borrow() + .iter() + .map(|(_, u): (usize, &User)| u.clone()) + .collect() + } + + /// Get number of users + pub fn len(&self) -> usize { + self.items.borrow().len() + } + + /// Check if empty + pub fn is_empty(&self) -> bool { + self.items.borrow().is_empty() + } + + /// Check if username already exists + pub fn username_exists(&self, username: &str) -> bool { + self.index.borrow().contains_key(username) + } + + /// Get ID by username + pub fn get_id_by_username(&self, username: &str) -> Option<usize> { + self.index.borrow().get(username).copied() + } + + /// Initialize permissions for a user + pub fn init_permissions(&self, user_id: UserId, permissions: Option<Permissions>) { + self.permissioner + .borrow_mut() + .init_permissions(user_id, permissions); + } + + /// Update permissions for a user + pub fn update_permissions(&self, user_id: UserId, permissions: Option<Permissions>) { + self.permissioner + .borrow_mut() + .update_permissions_for_user(user_id, permissions); + } + + /// Delete permissions for a user + pub fn delete_permissions(&self, user_id: UserId) { + self.permissioner.borrow_mut().delete_permissions(user_id); + } + + /// Update username + pub fn update_username( + &self, + identifier: &Identifier, + new_username: String, + ) -> Result<(), IggyError> { + let id = match identifier.kind { + iggy_common::IdKind::Numeric => identifier.get_u32_value()? as usize, + iggy_common::IdKind::String => { + let username = identifier.get_string_value()?; + let index = self.index.borrow(); + *index + .get(&username) + .ok_or_else(|| IggyError::ResourceNotFound(username.to_string()))? + } + }; + + let old_username = { + let items = self.items.borrow(); + let user = items + .get(id) + .ok_or_else(|| IggyError::ResourceNotFound(identifier.to_string()))?; + user.username.clone() + }; + + if old_username == new_username { + return Ok(()); + } + + tracing::trace!( + "Updating username: '{}' → '{}' for user ID: {}", + old_username, + new_username, + id + ); + + { + let mut items = self.items.borrow_mut(); + let user = items + .get_mut(id) + .ok_or_else(|| IggyError::ResourceNotFound(identifier.to_string()))?; + user.username = new_username.clone(); + } -impl State for Users { + let mut index = self.index.borrow_mut(); + index.remove(&old_username); + index.insert(new_username, id); + + Ok(()) + } +} + +impl ApplyState for Users { type Output = (); + type Input = Message<PrepareHeader>; + + fn do_apply(&self, _input: &Self::Input) -> Self::Output { + todo!() + } - fn apply(&self) -> Option<Self::Output> { - Some(()) + fn is_applicable(input: &Self::Input) -> bool { + matches!( + input.header().operation, + Operation::CreateUser + | Operation::UpdateUser + | Operation::DeleteUser + | Operation::ChangePassword + | Operation::UpdatePermissions + ) } } diff --git a/core/server/src/binary/mapper.rs b/core/server/src/binary/mapper.rs index 7b17c038e..6e98081d6 100644 --- a/core/server/src/binary/mapper.rs +++ b/core/server/src/binary/mapper.rs @@ -22,7 +22,6 @@ use crate::slab::Keyed; use crate::slab::traits_ext::{EntityComponentSystem, IntoComponents}; use crate::streaming::clients::client_manager::Client; use crate::streaming::partitions::partition::PartitionRoot; -use crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken; use crate::streaming::stats::{PartitionStats, StreamStats, TopicStats}; use crate::streaming::streams::stream; use crate::streaming::topics::consumer_group::{ConsumerGroupMembers, ConsumerGroupRoot, Member}; @@ -30,7 +29,9 @@ use crate::streaming::topics::topic::{self, TopicRoot}; use crate::streaming::users::user::User; use arcshift::SharedGetGuard; use bytes::{BufMut, Bytes, BytesMut}; -use iggy_common::{BytesSerializable, ConsumerOffsetInfo, Stats, TransportProtocol, UserId}; +use iggy_common::{ + BytesSerializable, ConsumerOffsetInfo, PersonalAccessToken, Stats, TransportProtocol, UserId, +}; use slab::Slab; pub fn map_stats(stats: &Stats) -> Bytes { diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs index 67c832231..f461cd472 100644 --- a/core/server/src/bootstrap.rs +++ b/core/server/src/bootstrap.rs @@ -53,7 +53,6 @@ use crate::{ storage::{load_consumer_group_offsets, load_consumer_offsets}, }, persistence::persister::{FilePersister, FileWithSyncPersister, PersisterKind}, - personal_access_tokens::personal_access_token::PersonalAccessToken, polling_consumer::ConsumerGroupId, segments::{Segment, storage::Storage}, stats::{PartitionStats, StreamStats, TopicStats}, @@ -69,7 +68,7 @@ use ahash::HashMap; use compio::{fs::create_dir_all, runtime::Runtime}; use err_trail::ErrContext; use iggy_common::{ - IggyByteSize, IggyError, + IggyByteSize, IggyError, PersonalAccessToken, defaults::{ DEFAULT_ROOT_USERNAME, MAX_PASSWORD_LENGTH, MAX_USERNAME_LENGTH, MIN_PASSWORD_LENGTH, MIN_USERNAME_LENGTH, diff --git a/core/server/src/http/http_shard_wrapper.rs b/core/server/src/http/http_shard_wrapper.rs index f279b51be..7bcfdb65b 100644 --- a/core/server/src/http/http_shard_wrapper.rs +++ b/core/server/src/http/http_shard_wrapper.rs @@ -26,11 +26,11 @@ use send_wrapper::SendWrapper; use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata; use crate::shard::system::messages::PollingArgs; use crate::state::command::EntryCommand; -use crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken; use crate::streaming::segments::{IggyMessagesBatchMut, IggyMessagesBatchSet}; use crate::streaming::topics; use crate::streaming::users::user::User; use crate::{shard::IggyShard, streaming::session::Session}; +use iggy_common::PersonalAccessToken; /// A wrapper around IggyShard that is safe to use in HTTP handlers. /// diff --git a/core/server/src/http/mapper.rs b/core/server/src/http/mapper.rs index d2910ae1e..afe243529 100644 --- a/core/server/src/http/mapper.rs +++ b/core/server/src/http/mapper.rs @@ -20,11 +20,11 @@ use crate::http::jwt::json_web_token::GeneratedToken; use crate::slab::Keyed; use crate::slab::traits_ext::{EntityComponentSystem, IntoComponents}; use crate::streaming::clients::client_manager::Client; -use crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken; use crate::streaming::stats::TopicStats; use crate::streaming::topics::consumer_group::{ConsumerGroupMembers, ConsumerGroupRoot}; use crate::streaming::topics::topic::TopicRoot; use crate::streaming::users::user::User; +use iggy_common::PersonalAccessToken; use iggy_common::{ConsumerGroupDetails, ConsumerGroupInfo, ConsumerGroupMember, IggyByteSize}; use iggy_common::{IdentityInfo, PersonalAccessTokenInfo, TokenInfo, TopicDetails}; use iggy_common::{UserInfo, UserInfoDetails}; diff --git a/core/server/src/http/personal_access_tokens.rs b/core/server/src/http/personal_access_tokens.rs index 66cb50882..75382fb76 100644 --- a/core/server/src/http/personal_access_tokens.rs +++ b/core/server/src/http/personal_access_tokens.rs @@ -24,7 +24,6 @@ use crate::http::mapper::map_generated_access_token_to_identity_info; use crate::http::shared::AppState; use crate::state::command::EntryCommand; use crate::state::models::CreatePersonalAccessTokenWithHash; -use crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken; use crate::streaming::session::Session; use axum::extract::{Path, State}; use axum::http::StatusCode; @@ -32,6 +31,7 @@ use axum::routing::{delete, get, post}; use axum::{Extension, Json, Router, debug_handler}; use err_trail::ErrContext; use iggy_common::IdentityInfo; +use iggy_common::PersonalAccessToken; use iggy_common::Validatable; use iggy_common::create_personal_access_token::CreatePersonalAccessToken; use iggy_common::delete_personal_access_token::DeletePersonalAccessToken; diff --git a/core/server/src/shard/system/personal_access_tokens.rs b/core/server/src/shard/system/personal_access_tokens.rs index 189abf507..7b26aff1e 100644 --- a/core/server/src/shard/system/personal_access_tokens.rs +++ b/core/server/src/shard/system/personal_access_tokens.rs @@ -18,13 +18,13 @@ use super::COMPONENT; use crate::shard::IggyShard; -use crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken; use crate::streaming::session::Session; use crate::streaming::users::user::User; use err_trail::ErrContext; use iggy_common::IggyError; use iggy_common::IggyExpiry; use iggy_common::IggyTimestamp; +use iggy_common::PersonalAccessToken; use tracing::{error, info}; impl IggyShard { diff --git a/core/server/src/shard/transmission/event.rs b/core/server/src/shard/transmission/event.rs index eef75d958..1180d06f9 100644 --- a/core/server/src/shard/transmission/event.rs +++ b/core/server/src/shard/transmission/event.rs @@ -17,7 +17,6 @@ use crate::streaming::{ partitions::partition, - personal_access_tokens::personal_access_token::PersonalAccessToken, streams::stream, topics::{ consumer_group::{self}, @@ -25,8 +24,8 @@ use crate::streaming::{ }, }; use iggy_common::{ - CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize, Permissions, TransportProtocol, - UserStatus, + CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize, Permissions, PersonalAccessToken, + TransportProtocol, UserStatus, }; use std::net::SocketAddr; use strum::Display; diff --git a/core/server/src/state/system.rs b/core/server/src/state/system.rs index 454439a3d..35a560164 100644 --- a/core/server/src/state/system.rs +++ b/core/server/src/state/system.rs @@ -20,7 +20,6 @@ use crate::bootstrap::create_root_user; use crate::state::file::FileState; use crate::state::models::CreateUserWithId; use crate::state::{COMPONENT, EntryCommand, StateEntry}; -use crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken; use ahash::AHashMap; use err_trail::ErrContext; use iggy_common::CompressionAlgorithm; @@ -28,6 +27,7 @@ use iggy_common::IggyError; use iggy_common::IggyExpiry; use iggy_common::IggyTimestamp; use iggy_common::MaxTopicSize; +use iggy_common::PersonalAccessToken; use iggy_common::create_user::CreateUser; use iggy_common::defaults::DEFAULT_ROOT_USER_ID; use iggy_common::{IdKind, Identifier, Permissions, UserStatus}; diff --git a/core/server/src/streaming/clients/client_manager.rs b/core/server/src/streaming/clients/client_manager.rs index 349980b55..d1cfe0887 100644 --- a/core/server/src/streaming/clients/client_manager.rs +++ b/core/server/src/streaming/clients/client_manager.rs @@ -17,12 +17,12 @@ */ use crate::streaming::session::Session; -use crate::streaming::utils::{hash, ptr::EternalPtr}; +use crate::streaming::utils::ptr::EternalPtr; use dashmap::DashMap; -use iggy_common::IggyError; use iggy_common::IggyTimestamp; use iggy_common::TransportProtocol; use iggy_common::UserId; +use iggy_common::{IggyError, calculate_32}; use std::net::SocketAddr; pub struct ClientManager { @@ -61,7 +61,7 @@ pub struct ConsumerGroup { impl ClientManager { pub fn add_client(&self, address: &SocketAddr, transport: TransportProtocol) -> Session { - let client_id = hash::calculate_32(address.to_string().as_bytes()); + let client_id = calculate_32(address.to_string().as_bytes()); let session = Session::from_client_id(client_id, *address); let client = Client { user_id: None, diff --git a/core/server/src/streaming/mod.rs b/core/server/src/streaming/mod.rs index fe1eb0611..a6141bd8b 100644 --- a/core/server/src/streaming/mod.rs +++ b/core/server/src/streaming/mod.rs @@ -21,7 +21,6 @@ pub mod deduplication; pub mod diagnostics; pub mod partitions; pub mod persistence; -pub mod personal_access_tokens; pub mod polling_consumer; pub mod segments; pub mod session; diff --git a/core/server/src/streaming/polling_consumer.rs b/core/server/src/streaming/polling_consumer.rs index dcc5f0f4a..42a2ceb23 100644 --- a/core/server/src/streaming/polling_consumer.rs +++ b/core/server/src/streaming/polling_consumer.rs @@ -16,8 +16,7 @@ * under the License. */ -use crate::streaming::utils::hash; -use iggy_common::{IdKind, Identifier}; +use iggy_common::{IdKind, Identifier, calculate_32}; use std::fmt::{Display, Formatter}; #[derive(Debug, PartialEq, Eq, Hash, Copy, Clone)] @@ -56,7 +55,7 @@ impl PollingConsumer { pub fn resolve_consumer_id(identifier: &Identifier) -> usize { match identifier.kind { IdKind::Numeric => identifier.get_u32_value().unwrap() as usize, - IdKind::String => hash::calculate_32(&identifier.value) as usize, + IdKind::String => calculate_32(&identifier.value) as usize, } } } diff --git a/core/server/src/streaming/topics/helpers.rs b/core/server/src/streaming/topics/helpers.rs index 0972596b8..d5f77de9d 100644 --- a/core/server/src/streaming/topics/helpers.rs +++ b/core/server/src/streaming/topics/helpers.rs @@ -35,10 +35,9 @@ use crate::{ }, topic::{Topic, TopicRef, TopicRefMut, TopicRoot}, }, - utils::hash, }, }; -use iggy_common::{CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize}; +use iggy_common::{CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize, calculate_32}; use slab::Slab; pub fn rename_index( @@ -100,7 +99,7 @@ pub fn calculate_partition_id_by_messages_key_hash( upperbound: usize, messages_key: &[u8], ) -> usize { - let messages_key_hash = hash::calculate_32(messages_key) as usize; + let messages_key_hash = calculate_32(messages_key) as usize; let partition_id = messages_key_hash % upperbound; tracing::trace!( "Calculated partition ID: {} for messages key: {:?}, hash: {}", diff --git a/core/server/src/streaming/users/user.rs b/core/server/src/streaming/users/user.rs index 437ce955b..672662d3e 100644 --- a/core/server/src/streaming/users/user.rs +++ b/core/server/src/streaming/users/user.rs @@ -15,10 +15,10 @@ * specific language governing permissions and limitations * under the License. */ -use crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken; use crate::streaming::utils::crypto; use dashmap::DashMap; use iggy_common::IggyTimestamp; +use iggy_common::PersonalAccessToken; use iggy_common::UserStatus; use iggy_common::defaults::*; use iggy_common::{Permissions, UserId}; diff --git a/core/server/src/streaming/utils/mod.rs b/core/server/src/streaming/utils/mod.rs index 6acd0abc0..00e9f65f3 100644 --- a/core/server/src/streaming/utils/mod.rs +++ b/core/server/src/streaming/utils/mod.rs @@ -19,6 +19,5 @@ pub mod address; pub mod crypto; pub mod file; -pub mod hash; pub mod ptr; pub mod random_id;
