This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new 6e0fdf7f feat(io_uring): fix pat/user/cg events (#1989)
6e0fdf7f is described below
commit 6e0fdf7f1d56cfb99b11618dc9c746f88d55c94a
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Thu Jul 10 08:42:57 2025 +0200
feat(io_uring): fix pat/user/cg events (#1989)
---
.../create_consumer_group_handler.rs | 11 +-
.../create_personal_access_token_handler.rs | 11 +-
.../delete_personal_access_token_handler.rs | 7 +
.../handlers/users/change_password_handler.rs | 10 +-
.../binary/handlers/users/create_user_handler.rs | 10 +-
.../binary/handlers/users/delete_user_handler.rs | 8 +-
.../binary/handlers/users/logout_user_handler.rs | 7 +-
.../handlers/users/update_permissions_handler.rs | 8 +-
.../binary/handlers/users/update_user_handler.rs | 9 +-
core/server/src/main.rs | 5 +-
core/server/src/shard/mod.rs | 505 +++++++++++----------
core/server/src/shard/system/consumer_groups.rs | 67 ++-
.../src/shard/system/personal_access_tokens.rs | 49 +-
core/server/src/shard/system/users.rs | 137 +++++-
core/server/src/shard/transmission/event.rs | 16 +-
.../server/src/streaming/topics/consumer_groups.rs | 23 +-
16 files changed, 574 insertions(+), 309 deletions(-)
diff --git a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
index e6e1f239..51d60b0c 100644
--- a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
+++ b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
@@ -21,6 +21,7 @@ use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind};
use crate::shard::IggyShard;
+use crate::shard::transmission::event::ShardEvent;
use crate::state::command::EntryCommand;
use crate::state::models::CreateConsumerGroupWithId;
use crate::streaming::session::Session;
@@ -40,12 +41,11 @@ impl ServerCommandHandler for CreateConsumerGroup {
async fn handle(
self,
sender: &mut SenderKind,
- length: u32,
+ _length: u32,
session: &Rc<Session>,
shard: &Rc<IggyShard>,
) -> Result<(), IggyError> {
debug!("session: {session}, command: {self}");
-
let consumer_group_id = shard
.create_consumer_group(
session,
@@ -60,6 +60,13 @@ impl ServerCommandHandler for CreateConsumerGroup {
self.stream_id, self.topic_id, self.group_id
)
})?;
+ let event = ShardEvent::CreatedConsumerGroup {
+ stream_id: self.stream_id.clone(),
+ topic_id: self.topic_id.clone(),
+ consumer_group_id: self.group_id,
+ name: self.name.clone(),
+ };
+ let _responses =
shard.broadcast_event_to_all_shards(event.into()).await;
let stream = shard.find_stream(session, &self.stream_id)
.with_error_context(|error| {
diff --git a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
index c28ac77b..bfeda164 100644
--- a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
+++ b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
@@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand,
ServerCommand, ServerCommandHa
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
use crate::binary::{handlers::personal_access_tokens::COMPONENT,
sender::SenderKind};
+use crate::shard::transmission::event::ShardEvent;
use crate::shard::IggyShard;
use crate::state::command::EntryCommand;
use crate::state::models::CreatePersonalAccessTokenWithHash;
@@ -47,7 +48,7 @@ impl ServerCommandHandler for CreatePersonalAccessToken {
) -> Result<(), IggyError> {
debug!("session: {session}, command: {self}");
- let token = shard
+ let (personal_access_token, token) = shard
.create_personal_access_token(session, &self.name, self.expiry)
.with_error_context(|error| {
format!(
@@ -56,7 +57,11 @@ impl ServerCommandHandler for CreatePersonalAccessToken {
)
})?;
let bytes = mapper::map_raw_pat(&token);
- let token_hash = PersonalAccessToken::hash_token(&token);
+ let hash = personal_access_token.token.to_string();
+ let event = ShardEvent::CreatedPersonalAccessToken {
+ personal_access_token: personal_access_token.clone(),
+ };
+ let _responses =
shard.broadcast_event_to_all_shards(event.into()).await;
shard
.state
@@ -67,7 +72,7 @@ impl ServerCommandHandler for CreatePersonalAccessToken {
name: self.name.to_owned(),
expiry: self.expiry,
},
- hash: token_hash,
+ hash,
}),
)
.await
diff --git a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
index bb8dd463..13a06e55 100644
--- a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
+++ b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
@@ -51,6 +51,13 @@ impl ServerCommandHandler for DeletePersonalAccessToken {
"{COMPONENT} (error: {error}) - failed to delete personal
access token with name: {token_name}, session: {session}"
)})?;
+ // Broadcast the event to other shards
+ let event =
crate::shard::transmission::event::ShardEvent::DeletedPersonalAccessToken {
+ user_id: session.get_user_id(),
+ name: self.name.clone(),
+ };
+ let _responses =
shard.broadcast_event_to_all_shards(event.into()).await;
+
shard
.state
.apply(
diff --git a/core/server/src/binary/handlers/users/change_password_handler.rs b/core/server/src/binary/handlers/users/change_password_handler.rs
index 32b376cd..1e89aeb0 100644
--- a/core/server/src/binary/handlers/users/change_password_handler.rs
+++ b/core/server/src/binary/handlers/users/change_password_handler.rs
@@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand,
ServerCommand, ServerCommandHa
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
use crate::shard::IggyShard;
+use crate::shard::transmission::event::ShardEvent;
use crate::state::command::EntryCommand;
use crate::streaming::session::Session;
use crate::streaming::utils::crypto;
@@ -39,7 +40,7 @@ impl ServerCommandHandler for ChangePassword {
async fn handle(
self,
sender: &mut SenderKind,
- length: u32,
+ _length: u32,
session: &Rc<Session>,
shard: &Rc<IggyShard>,
) -> Result<(), IggyError> {
@@ -59,6 +60,13 @@ impl ServerCommandHandler for ChangePassword {
)
})?;
+ let event = ShardEvent::ChangedPassword {
+ user_id: self.user_id.clone(),
+ current_password: self.current_password.clone(),
+ new_password: self.new_password.clone(),
+ };
+ let _responses =
shard.broadcast_event_to_all_shards(event.into()).await;
+
// For the security of the system, we hash the password before storing
it in metadata.
shard
.state
diff --git a/core/server/src/binary/handlers/users/create_user_handler.rs b/core/server/src/binary/handlers/users/create_user_handler.rs
index e51b147b..77ce6249 100644
--- a/core/server/src/binary/handlers/users/create_user_handler.rs
+++ b/core/server/src/binary/handlers/users/create_user_handler.rs
@@ -17,6 +17,7 @@
*/
use crate::shard::IggyShard;
+use crate::shard::transmission::event::ShardEvent;
use std::rc::Rc;
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
@@ -31,6 +32,7 @@ use anyhow::Result;
use error_set::ErrContext;
use iggy_common::IggyError;
use iggy_common::create_user::CreateUser;
+use tower_http::map_response_body;
use tracing::{debug, instrument};
impl ServerCommandHandler for CreateUser {
@@ -56,13 +58,19 @@ impl ServerCommandHandler for CreateUser {
self.status,
self.permissions.clone(),
)
- .await
.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - failed to create user
with name: {}, session: {session}",
self.username
)
})?;
+ let event = ShardEvent::CreatedUser {
+ username: self.username.to_owned(),
+ password: self.password.to_owned(),
+ status: self.status,
+ permissions: self.permissions.clone(),
+ };
+ let _responses =
shard.broadcast_event_to_all_shards(event.into()).await;
let user_id = user.id;
let response = mapper::map_user(&user);
diff --git a/core/server/src/binary/handlers/users/delete_user_handler.rs b/core/server/src/binary/handlers/users/delete_user_handler.rs
index 10de56a4..cd2f082c 100644
--- a/core/server/src/binary/handlers/users/delete_user_handler.rs
+++ b/core/server/src/binary/handlers/users/delete_user_handler.rs
@@ -22,6 +22,7 @@ use crate::binary::command::{BinaryServerCommand,
ServerCommand, ServerCommandHa
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
use crate::shard::IggyShard;
+use crate::shard::transmission::event::ShardEvent;
use crate::state::command::EntryCommand;
use crate::streaming::session::Session;
use anyhow::Result;
@@ -39,7 +40,7 @@ impl ServerCommandHandler for DeleteUser {
async fn handle(
self,
sender: &mut SenderKind,
- length: u32,
+ _length: u32,
session: &Rc<Session>,
shard: &Rc<IggyShard>,
) -> Result<(), IggyError> {
@@ -54,6 +55,11 @@ impl ServerCommandHandler for DeleteUser {
)
})?;
+ let event = ShardEvent::DeletedUser {
+ user_id: self.user_id.clone(),
+ };
+ let _responses =
shard.broadcast_event_to_all_shards(event.into()).await;
+
let user_id = self.user_id.clone();
shard
.state
diff --git a/core/server/src/binary/handlers/users/logout_user_handler.rs b/core/server/src/binary/handlers/users/logout_user_handler.rs
index d1ef95bf..cfa37cd5 100644
--- a/core/server/src/binary/handlers/users/logout_user_handler.rs
+++ b/core/server/src/binary/handlers/users/logout_user_handler.rs
@@ -22,6 +22,7 @@ use crate::binary::command::{BinaryServerCommand,
ServerCommand, ServerCommandHa
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
use crate::shard::IggyShard;
+use crate::shard::transmission::event::ShardEvent;
use crate::streaming::session::Session;
use anyhow::Result;
use error_set::ErrContext;
@@ -38,7 +39,7 @@ impl ServerCommandHandler for LogoutUser {
async fn handle(
self,
sender: &mut SenderKind,
- length: u32,
+ _length: u32,
session: &Rc<Session>,
shard: &Rc<IggyShard>,
) -> Result<(), IggyError> {
@@ -46,6 +47,10 @@ impl ServerCommandHandler for LogoutUser {
shard.logout_user(session).with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to logout user,
session: {session}")
})?;
+ let event = ShardEvent::LogoutUser {
+ client_id: session.client_id,
+ };
+ let _responses =
shard.broadcast_event_to_all_shards(event.into()).await;
session.clear_user_id();
sender.send_empty_ok_response().await?;
Ok(())
diff --git a/core/server/src/binary/handlers/users/update_permissions_handler.rs b/core/server/src/binary/handlers/users/update_permissions_handler.rs
index dea49a1c..d28d3812 100644
--- a/core/server/src/binary/handlers/users/update_permissions_handler.rs
+++ b/core/server/src/binary/handlers/users/update_permissions_handler.rs
@@ -21,6 +21,7 @@ use std::rc::Rc;
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+use crate::shard::transmission::event::ShardEvent;
use crate::shard::IggyShard;
use crate::state::command::EntryCommand;
use crate::streaming::session::Session;
@@ -50,7 +51,12 @@ impl ServerCommandHandler for UpdatePermissions {
.with_error_context(|error| format!("{COMPONENT} (error:
{error}) - failed to update permissions for user_id: {}, session: {session}",
self.user_id
))?;
-
+ let event = ShardEvent::UpdatedPermissions {
+ user_id: self.user_id.clone(),
+ permissions: self.permissions.clone(),
+ };
+ let _responses =
shard.broadcast_event_to_all_shards(event.into()).await;
+
shard
.state
.apply(
diff --git a/core/server/src/binary/handlers/users/update_user_handler.rs b/core/server/src/binary/handlers/users/update_user_handler.rs
index 982c4ae1..b90cb442 100644
--- a/core/server/src/binary/handlers/users/update_user_handler.rs
+++ b/core/server/src/binary/handlers/users/update_user_handler.rs
@@ -22,6 +22,7 @@ use crate::binary::command::{BinaryServerCommand,
ServerCommand, ServerCommandHa
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
use crate::shard::IggyShard;
+use crate::shard::transmission::event::ShardEvent;
use crate::state::command::EntryCommand;
use crate::streaming::session::Session;
use anyhow::Result;
@@ -59,8 +60,14 @@ impl ServerCommandHandler for UpdateUser {
)
})?;
- let user_id = self.user_id.clone();
+ let event = ShardEvent::UpdatedUser {
+ user_id: self.user_id.clone(),
+ username: self.username.clone(),
+ status: self.status,
+ };
+ let _responses =
shard.broadcast_event_to_all_shards(event.into()).await;
+ let user_id = self.user_id.clone();
shard
.state
.apply(session.get_user_id(), &EntryCommand::UpdateUser(self))
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index e01f03ec..2971347e 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -48,8 +48,7 @@ use server::state::StateKind;
use server::state::command::EntryCommand;
use server::state::file::FileState;
use server::state::models::CreateUserWithId;
-use server::state::system::SystemState;
-use server::streaming::utils::{MemoryPool, crypto};
+use server::streaming::utils::MemoryPool;
use server::versioning::SemanticVersion;
use server::{IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV, map_toggle_str};
use tokio::time::Instant;
@@ -183,7 +182,7 @@ fn main() -> Result<(), ServerError> {
// We can't use std::sync::Once because it doesn't support
async.
// Trait bound on the closure is FnOnce.
- // Peak into the state to check if the root user exists.
+ // Peek into the state to check if the root user exists.
// If it does not exist, create it.
barrier.with_async::<Result<(), IggyError>>(async
|barrier_state| {
// A thread already initialized state
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index a8158bc1..ecbe2be1 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -64,7 +64,6 @@ use crate::{
streaming::{
clients::client_manager::ClientManager,
diagnostics::metrics::Metrics,
- partitions::partition,
personal_access_tokens::personal_access_token::PersonalAccessToken,
session::Session,
storage::SystemStorage,
@@ -512,240 +511,271 @@ impl IggyShard {
async fn handle_event(&self, event: Arc<ShardEvent>) -> Result<(),
IggyError> {
match &*event {
ShardEvent::CreatedStream { stream_id, name } => {
- self.create_stream_bypass_auth(*stream_id, name)
- }
+ self.create_stream_bypass_auth(*stream_id, name)
+ }
ShardEvent::CreatedTopic {
- stream_id,
- topic_id,
- name,
- partitions_count,
- message_expiry,
- compression_algorithm,
- max_topic_size,
- replication_factor,
- } => {
- let topic_id = topic_id.get_u32_value().ok();
- self.create_topic_bypass_auth(
- stream_id,
- topic_id,
- name,
- *partitions_count,
- *message_expiry,
- *compression_algorithm,
- *max_topic_size,
- *replication_factor,
- )
- .await
- }
+ stream_id,
+ topic_id,
+ name,
+ partitions_count,
+ message_expiry,
+ compression_algorithm,
+ max_topic_size,
+ replication_factor,
+ } => {
+ let topic_id = topic_id.get_u32_value().ok();
+ self.create_topic_bypass_auth(
+ stream_id,
+ topic_id,
+ name,
+ *partitions_count,
+ *message_expiry,
+ *compression_algorithm,
+ *max_topic_size,
+ *replication_factor,
+ )
+ .await
+ }
ShardEvent::LoginUser {
- client_id,
- username,
- password,
- } => self.login_user_event(*client_id, username, password),
+ client_id,
+ username,
+ password,
+ } => self.login_user_event(*client_id, username, password),
ShardEvent::NewSession { address, transport } => {
- let session = self.add_client(address, *transport);
- self.add_active_session(session);
- Ok(())
- }
+ let session = self.add_client(address, *transport);
+ self.add_active_session(session);
+ Ok(())
+ }
ShardEvent::CreatedShardTableRecords {
- stream_id,
- topic_id,
- partition_ids,
- } => {
- let records = self
- .create_shard_table_records(&partition_ids, *stream_id,
*topic_id)
- .collect::<Vec<_>>();
- let stream =
self.get_stream(&Identifier::numeric(*stream_id)?)?;
- let topic =
stream.get_topic(&Identifier::numeric(*topic_id)?)?;
- // Open partition and segments for that particular shard.
- for (ns, shard_info) in records.iter() {
- if shard_info.id() == self.id {
- let partition_id = ns.partition_id;
- let partition = topic.get_partition(partition_id)?;
- let mut partition = partition.write().await;
- partition.open().await.with_error_context(|error| {
+ stream_id,
+ topic_id,
+ partition_ids,
+ } => {
+ let records = self
+ .create_shard_table_records(&partition_ids,
*stream_id, *topic_id)
+ .collect::<Vec<_>>();
+ let stream =
self.get_stream(&Identifier::numeric(*stream_id)?)?;
+ let topic =
stream.get_topic(&Identifier::numeric(*topic_id)?)?;
+ // Open partition and segments for that particular
shard.
+ for (ns, shard_info) in records.iter() {
+ if shard_info.id() == self.id {
+ let partition_id = ns.partition_id;
+ let partition =
topic.get_partition(partition_id)?;
+ let mut partition = partition.write().await;
+
partition.open().await.with_error_context(|error| {
+ format!(
+ "{COMPONENT} (error:
{error}) - failed to open partition with ID: {partition_id} in topic with ID:
{topic_id} for stream with ID: {stream_id}"
+ )
+ })?;
+ }
+ }
+ self.insert_shard_table_records(records);
+ Ok(())
+ }
+ ShardEvent::CreatedPartitions {
+ stream_id,
+ topic_id,
+ partitions_count,
+ } => {
+ let mut stream = self.get_stream_mut(stream_id)?;
+ let topic = stream.get_topic_mut(topic_id)?;
+ topic.add_persisted_partitions(*partitions_count)
+ .with_error_context(|error| {
+ format!(
+ "{COMPONENT} (error:
{error}) - failed to create partitions for topic with ID: {topic_id} in stream
with ID: {stream_id}"
+ )
+ })?;
+ topic.reassign_consumer_groups();
+ self.metrics.increment_partitions(*partitions_count);
+ self.metrics.increment_segments(*partitions_count);
+ Ok(())
+ }
+ ShardEvent::DeletedPartitions {
+ stream_id,
+ topic_id,
+ partition_ids,
+ } => {
+ let mut stream = self.get_stream_mut(stream_id)?;
+ let topic = stream.get_topic_mut(topic_id)?;
+ let partitions = topic
+ .
delete_persisted_partitions_by_ids(partition_ids)
+ .with_error_context(|error| {
+ format!("{COMPONENT} (error:
{error}) - failed to delete persisted partitions for topic: {topic}")
+ })?;
+ drop(stream);
+
+ let mut segments_count = 0;
+ let mut messages_count = 0;
+ let partitions_count = partitions.len();
+ for partition in &partitions {
+ let partition = partition.read().await;
+ let partition_messages_count =
partition.get_messages_count();
+ segments_count += partition.get_segments_count();
+ messages_count += partition_messages_count;
+ }
+
+ let mut stream =
self.get_stream_mut(stream_id).with_error_context(|error| {
+ format!(
+ "{COMPONENT} (error: {error}) - failed to get
stream with ID: {stream_id}"
+ )
+ })?;
+ let topic =
stream.get_topic_mut(topic_id).with_error_context(|error| {
format!(
- "{COMPONENT} (error: {error}) - failed to open
partition with ID: {partition_id} in topic with ID: {topic_id} for stream with
ID: {stream_id}"
+ "{COMPONENT} (error: {error}) - failed to get
topic with ID: {topic_id}"
)
})?;
+ topic.reassign_consumer_groups();
+ if partitions.len() > 0 {
+ self.metrics.decrement_partitions(partitions_count
as u32);
+ self.metrics.decrement_segments(segments_count);
+ self.metrics.decrement_messages(messages_count);
+ }
+ Ok(())
}
- }
- self.insert_shard_table_records(records);
- Ok(())
- }
- ShardEvent::CreatedPartitions {
- stream_id,
- topic_id,
- partitions_count,
- } => {
- let mut stream = self.get_stream_mut(stream_id)?;
- let topic = stream.get_topic_mut(topic_id)?;
- topic.add_persisted_partitions(*partitions_count)
- .with_error_context(|error| {
- format!(
- "{COMPONENT} (error: {error}) - failed to create
partitions for topic with ID: {topic_id} in stream with ID: {stream_id}"
- )
- })?;
- topic.reassign_consumer_groups();
- self.metrics.increment_partitions(*partitions_count);
- self.metrics.increment_segments(*partitions_count);
- Ok(())
- }
- ShardEvent::DeletedPartitions {
- stream_id,
- topic_id,
- partition_ids,
- } => {
- let mut stream = self.get_stream_mut(stream_id)?;
- let topic = stream.get_topic_mut(topic_id)?;
- let partitions = topic
- . delete_persisted_partitions_by_ids(partition_ids)
- .with_error_context(|error| {
- format!("{COMPONENT} (error: {error}) - failed to
delete persisted partitions for topic: {topic}")
- })?;
- drop(stream);
-
- let mut segments_count = 0;
- let mut messages_count = 0;
- let partitions_count = partitions.len();
- for partition in &partitions {
- let partition = partition.read().await;
- let partition_messages_count =
partition.get_messages_count();
- segments_count += partition.get_segments_count();
- messages_count += partition_messages_count;
- }
-
- let mut stream =
self.get_stream_mut(stream_id).with_error_context(|error| {
- format!(
- "{COMPONENT} (error: {error}) - failed to get stream
with ID: {stream_id}"
- )
- })?;
- let topic =
stream.get_topic_mut(topic_id).with_error_context(|error| {
- format!(
- "{COMPONENT} (error: {error}) - failed to get topic
with ID: {topic_id}"
- )
- })?;
- topic.reassign_consumer_groups();
- if partitions.len() > 0 {
- self.metrics.decrement_partitions(partitions_count as u32);
- self.metrics.decrement_segments(segments_count);
- self.metrics.decrement_messages(messages_count);
- }
- Ok(())
- }
ShardEvent::DeletedShardTableRecords { namespaces } => {
- let (stream_id, topic_id) = namespaces
- .first()
- .map(|ns| (ns.stream_id, ns.topic_id))
- .unwrap();
- let stream =
self.get_stream(&Identifier::numeric(stream_id).unwrap())?;
- let topic =
stream.get_topic(&Identifier::numeric(topic_id).unwrap())?;
- let records = self.remove_shard_table_records(&namespaces);
- for (ns, shard_info) in records.iter() {
- if shard_info.id() == self.id {
- let partition = topic.get_partition(ns.partition_id)?;
- let mut partition = partition.write().await;
- partition.delete().await.with_error_context(|error| {
- format!(
- "{COMPONENT} (error: {error}) - failed to delete
partition with ID: {} in topic with ID: {}",
- ns.partition_id,
- topic_id
- )
- })?;
+ let (stream_id, topic_id) = namespaces
+ .first()
+ .map(|ns| (ns.stream_id, ns.topic_id))
+ .unwrap();
+ let stream =
self.get_stream(&Identifier::numeric(stream_id).unwrap())?;
+ let topic =
stream.get_topic(&Identifier::numeric(topic_id).unwrap())?;
+ let records =
self.remove_shard_table_records(&namespaces);
+ for (ns, shard_info) in records.iter() {
+ if shard_info.id() == self.id {
+ let partition =
topic.get_partition(ns.partition_id)?;
+ let mut partition = partition.write().await;
+
partition.delete().await.with_error_context(|error| {
+ format!(
+ "{COMPONENT} (error:
{error}) - failed to delete partition with ID: {} in topic with ID: {}",
+ ns.partition_id,
+ topic_id
+ )
+ })?;
+ }
+ }
+ Ok(())
}
- }
- Ok(())
- }
ShardEvent::DeletedStream { stream_id } => {
- let shard_id = self.id;
-
self.delete_stream_bypass_auth(stream_id).with_error_context(|err| {
- format!(
- "{COMPONENT} (error: {err}) - failed to delete, when
handling event on shard: {shard_id} stream with ID: {stream_id}",
- )
- })?;
- Ok(())
- }
+ let shard_id = self.id;
+
self.delete_stream_bypass_auth(stream_id).with_error_context(|err| {
+ format!(
+ "{COMPONENT} (error: {err}) -
failed to delete, when handling event on shard: {shard_id} stream with ID:
{stream_id}",
+ )
+ })?;
+ Ok(())
+ }
ShardEvent::UpdatedStream { stream_id, name } => {
- self.update_stream_bypass_auth(stream_id, name)?;
- Ok(())
- }
+ self.update_stream_bypass_auth(stream_id, name)?;
+ Ok(())
+ }
ShardEvent::UpdatedTopic {
- stream_id,
- topic_id,
- name,
- message_expiry,
- compression_algorithm,
- max_topic_size,
- replication_factor,
- } => {
- self.update_topic_bypass_auth(
- stream_id,
- topic_id,
- name,
- *message_expiry,
- *compression_algorithm,
- *max_topic_size,
- *replication_factor,
- )
- .await?;
- Ok(())
- }
+ stream_id,
+ topic_id,
+ name,
+ message_expiry,
+ compression_algorithm,
+ max_topic_size,
+ replication_factor,
+ } => {
+ self.update_topic_bypass_auth(
+ stream_id,
+ topic_id,
+ name,
+ *message_expiry,
+ *compression_algorithm,
+ *max_topic_size,
+ *replication_factor,
+ )
+ .await?;
+ Ok(())
+ }
ShardEvent::PurgedStream { stream_id: _ } => todo!(),
- ShardEvent::CreatedConsumerGroup {
- stream_id: _,
- topic_id: _,
- consumer_group_id: _,
- name: _,
- } => todo!(),
- ShardEvent::DeletedConsumerGroup {
- stream_id: _,
- topic_id: _,
- consumer_group_id: _,
- } => todo!(),
ShardEvent::PurgedTopic {
- stream_id: _,
- topic_id: _,
- } => todo!(),
+ stream_id: _,
+ topic_id: _,
+ } => todo!(),
ShardEvent::DeletedTopic {
- stream_id,
- topic_id,
- } => {
- self.delete_topic_bypass_auth(stream_id, topic_id)
- .await
- .with_error_context(|err| {
- format!(
- "{COMPONENT} (error: {err}) - failed to delete
topic with ID: {topic_id} in stream with ID: {stream_id}"
- )
- })?;
- Ok(())
- }
+ stream_id,
+ topic_id,
+ } => {
+ self.delete_topic_bypass_auth(stream_id, topic_id)
+ .await
+ .with_error_context(|err| {
+ format!(
+ "{COMPONENT} (error:
{err}) - failed to delete topic with ID: {topic_id} in stream with ID:
{stream_id}"
+ )
+ })?;
+ Ok(())
+ }
+ ShardEvent::CreatedConsumerGroup {
+ stream_id,
+ topic_id,
+ consumer_group_id,
+ name,
+ } => self.create_consumer_group_bypass_auth(
+ stream_id,
+ topic_id,
+ *consumer_group_id,
+ name,
+ ),
+ ShardEvent::DeletedConsumerGroup {
+ stream_id,
+ topic_id,
+ consumer_group_id,
+ } => {
+ self.delete_consumer_group_bypass_auth(stream_id,
topic_id, consumer_group_id)?;
+ Ok(())
+ }
ShardEvent::CreatedUser {
- username: _,
- password: _,
- status: _,
- permissions: _,
- } => todo!(),
- ShardEvent::DeletedUser { user_id: _ } => todo!(),
- ShardEvent::LogoutUser { client_id: _ } => todo!(),
- ShardEvent::UpdatedUser {
- user_id: _,
- username: _,
- status: _,
- } => todo!(),
+ username,
+ password,
+ status,
+ permissions,
+ } => {
+ self.create_user_bypass_auth(username, password,
*status, permissions.clone())?;
+ Ok(())
+ }
+ ShardEvent::DeletedUser { user_id } => {
+ self.delete_user_bypass_auth(user_id)?;
+ Ok(())
+ }
+ ShardEvent::LogoutUser { client_id } => {
+ let sessions = self.active_sessions.borrow();
+ let session = sessions.iter().find(|s| s.client_id ==
*client_id).unwrap();
+ self.logout_user(session)?;
+ self.remove_active_session(session.get_user_id());
+
+ Ok(())
+ }
ShardEvent::ChangedPassword {
- user_id: _,
- current_password: _,
- new_password: _,
- } => todo!(),
- ShardEvent::CreatedPersonalAccessToken { name: _, expiry: _ } =>
todo!(),
- ShardEvent::DeletedPersonalAccessToken { name: _ } => todo!(),
+ user_id,
+ current_password,
+ new_password,
+ } => {
+ self.change_password_bypass_auth(user_id,
current_password, new_password)?;
+ Ok(())
+ }
+ ShardEvent::CreatedPersonalAccessToken {personal_access_token} => {
+
self.create_personal_access_token_bypass_auth(personal_access_token.to_owned())?;
+ Ok(())
+ },
+ ShardEvent::DeletedPersonalAccessToken { user_id, name } => {
+ self.delete_personal_access_token_bypass_auth(*user_id, name)?;
+ Ok(())
+ },
ShardEvent::LoginWithPersonalAccessToken { token: _ } => todo!(),
- ShardEvent::StoredConsumerOffset {
- stream_id: _,
- topic_id: _,
- consumer: _,
- offset: _,
- } => todo!(),
+ ShardEvent::UpdatedUser {
+ user_id,
+ username,
+ status,
+ } => {
+ self.update_user_bypass_auth(user_id,
username.to_owned(), *status)?;
+ Ok(())
+ },
+ ShardEvent::UpdatedPermissions { user_id, permissions } => {
+ self.update_permissions_bypass_auth(user_id,
permissions.to_owned())?;
+ Ok(())
+ }
}
}
@@ -779,9 +809,13 @@ impl IggyShard {
}
}
- pub async fn broadcast_event_to_all_shards(&self, event: Arc<ShardEvent>)
-> Vec<ShardResponse> {
+ pub async fn broadcast_event_to_all_shards(
+ &self,
+ event: Arc<ShardEvent>,
+ ) -> Vec<ShardResponse> {
let mut responses =
Vec::with_capacity(self.get_available_shards_count() as usize);
- for maybe_receiver in self.shards
+ for maybe_receiver in self
+ .shards
.iter()
.filter_map(|shard| {
if shard.id != self.id {
@@ -801,18 +835,19 @@ impl IggyShard {
conn.send(ShardFrame::new(event.clone().into(), None));
None
}
- }) {
- match maybe_receiver {
- Some(receiver) => {
- let response = receiver.recv().await.unwrap();
- responses.push(response);
- }
- None => {
- responses.push(ShardResponse::Event);
- }
+ })
+ {
+ match maybe_receiver {
+ Some(receiver) => {
+ let response = receiver.recv().await.unwrap();
+ responses.push(response);
+ }
+ None => {
+ responses.push(ShardResponse::Event);
}
}
- responses
+ }
+ responses
}
fn find_shard(&self, namespace: &IggyNamespace) -> Option<&Shard> {
@@ -866,6 +901,20 @@ impl IggyShard {
self.active_sessions.borrow_mut().push(session);
}
+ pub fn remove_active_session(&self, user_id: u32) {
+ let mut active_sessions = self.active_sessions.borrow_mut();
+ let pos = active_sessions
+ .iter()
+ .position(|s| s.get_user_id() == user_id);
+ if let Some(pos) = pos {
+ active_sessions.remove(pos);
+ } else {
+ error!(
+ "{COMPONENT} - failed to remove active session for user ID:
{user_id}, session not found."
+ );
+ }
+ }
+
pub fn ensure_authenticated(&self, session: &Session) -> Result<u32,
IggyError> {
let active_sessions = self.active_sessions.borrow();
let user_id = active_sessions
diff --git a/core/server/src/shard/system/consumer_groups.rs b/core/server/src/shard/system/consumer_groups.rs
index 7b98205f..96109b85 100644
--- a/core/server/src/shard/system/consumer_groups.rs
+++ b/core/server/src/shard/system/consumer_groups.rs
@@ -26,6 +26,7 @@ use crate::streaming::topics::consumer_group::ConsumerGroup;
use error_set::ErrContext;
use iggy_common::Identifier;
use iggy_common::IggyError;
+use iggy_common::locking::IggySharedMutFn;
impl IggyShard {
pub fn get_consumer_group<'cg, 'stream>(
@@ -85,6 +86,17 @@ impl IggyShard {
Ok(topic.get_consumer_groups())
}
+ pub fn create_consumer_group_bypass_auth(
+ &self,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ group_id: Option<u32>,
+ name: &str,
+ ) -> Result<(), IggyError> {
+ self.create_consumer_group_base(stream_id, topic_id, group_id, name)?;
+ Ok(())
+ }
+
pub fn create_consumer_group(
&self,
session: &Session,
@@ -109,7 +121,16 @@ impl IggyShard {
topic.topic_id,
).with_error_context(|error| format!("{COMPONENT} (error: {error})
- permission denied to create consumer group for user {} on stream ID: {},
topic ID: {}", session.get_user_id(), topic.stream_id, topic.topic_id))?;
}
+ self.create_consumer_group_base(stream_id, topic_id, group_id, name)
+ }
+ fn create_consumer_group_base(
+ &self,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ group_id: Option<u32>,
+ name: &str,
+ ) -> Result<Identifier, IggyError> {
let mut stream = self.get_stream_mut(stream_id)
.with_error_context(|error| format!("{COMPONENT} (error: {error})
- failed to get mutable reference to stream with ID: {stream_id}"))?;
let topic = stream.get_topic_mut(topic_id)
@@ -122,6 +143,15 @@ impl IggyShard {
})
}
+ pub fn delete_consumer_group_bypass_auth(
+ &self,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ consumer_group_id: &Identifier,
+ ) -> Result<ConsumerGroup, IggyError> {
+ self.delete_consumer_group_base(stream_id, topic_id, consumer_group_id)
+ }
+
pub async fn delete_consumer_group(
&self,
session: &Session,
@@ -130,8 +160,6 @@ impl IggyShard {
consumer_group_id: &Identifier,
) -> Result<(), IggyError> {
self.ensure_authenticated(session)?;
- let stream_id_value;
- let topic_id_value;
{
let stream = self.get_stream(stream_id).with_error_context(|error|
{
format!(
@@ -146,11 +174,32 @@ impl IggyShard {
topic.stream_id,
topic.topic_id,
).with_error_context(|error| format!("{COMPONENT} (error: {error})
- permission denied to delete consumer group for user {} on stream ID: {},
topic ID: {}", session.get_user_id(), topic.stream_id, topic.topic_id))?;
-
- stream_id_value = topic.stream_id;
- topic_id_value = topic.topic_id;
}
+ let cg = self.delete_consumer_group_base(stream_id, topic_id,
consumer_group_id)?;
+ let stream = self.get_stream(stream_id)?;
+ let topic = stream.get_topic(topic_id)?;
+
+ for (_, partition) in topic.partitions.iter() {
+ let partition = partition.read().await;
+ if let Some((_, offset)) =
partition.consumer_group_offsets.remove(&cg.group_id) {
+ self.storage
+ .partition
+ .delete_consumer_offset(&offset.path)
+ .await?;
+ }
+ }
+
+ Ok(())
+ }
+ fn delete_consumer_group_base(
+ &self,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ consumer_group_id: &Identifier,
+ ) -> Result<ConsumerGroup, IggyError> {
+ let stream_id_value;
+ let topic_id_value;
let consumer_group;
{
let mut stream =
self.get_stream_mut(stream_id).with_error_context(|error| {
@@ -161,8 +210,9 @@ impl IggyShard {
let topic =
stream.get_topic_mut(topic_id).with_error_context(|error| format!("{COMPONENT}
(error: {error}) - topic not found for stream ID: {stream_id}, topic_id:
{topic_id}"))?;
consumer_group = topic.delete_consumer_group(consumer_group_id)
- .await
- .with_error_context(|error| format!("{COMPONENT} (error:
{error}) - failed to delete consumer group with ID: {consumer_group_id}"))?
+ .with_error_context(|error| format!("{COMPONENT} (error:
{error}) - failed to delete consumer group with ID: {consumer_group_id}"))?;
+ stream_id_value = topic.stream_id;
+ topic_id_value = topic.topic_id;
}
for member in consumer_group.get_members() {
@@ -174,8 +224,7 @@ impl IggyShard {
)
.with_error_context(|error| format!("{COMPONENT} (error:
{error}) - failed to make client leave consumer group for client ID: {}, group
ID: {}", member.id, consumer_group.group_id))?;
}
-
- Ok(())
+ Ok(consumer_group)
}
pub fn join_consumer_group(
diff --git a/core/server/src/shard/system/personal_access_tokens.rs
b/core/server/src/shard/system/personal_access_tokens.rs
index 8cc89cdc..8448cc9d 100644
--- a/core/server/src/shard/system/personal_access_tokens.rs
+++ b/core/server/src/shard/system/personal_access_tokens.rs
@@ -58,7 +58,7 @@ impl IggyShard {
session: &Session,
name: &str,
expiry: IggyExpiry,
- ) -> Result<String, IggyError> {
+ ) -> Result<(PersonalAccessToken, String), IggyError> {
self.ensure_authenticated(session)?;
let user_id = session.get_user_id();
let identifier = user_id.try_into()?;
@@ -78,6 +78,27 @@ impl IggyShard {
}
}
+ let (personal_access_token, token) =
+ PersonalAccessToken::new(user_id, name, IggyTimestamp::now(),
expiry);
+ self.create_personal_access_token_base(personal_access_token.clone())?;
+ Ok((personal_access_token, token))
+ }
+
+ pub fn create_personal_access_token_bypass_auth(
+ &self,
+ personal_access_token: PersonalAccessToken,
+ ) -> Result<(), IggyError> {
+ self.create_personal_access_token_base(personal_access_token)
+ }
+
+ fn create_personal_access_token_base(
+ &self,
+ personal_access_token: PersonalAccessToken,
+ ) -> Result<(), IggyError> {
+ let user_id = personal_access_token.user_id;
+ let name = personal_access_token.name.clone();
+ let token_hash = personal_access_token.token.clone();
+ let identifier = user_id.try_into()?;
let user = self.get_user_mut(&identifier).with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to get mutable
reference to the user with id: {user_id}")
})?;
@@ -85,22 +106,20 @@ impl IggyShard {
if user
.personal_access_tokens
.iter()
- .any(|pat| pat.name.as_str() == name)
+ .any(|pat| pat.name.as_str() == name.as_str())
{
error!("Personal access token: {name} for user with ID: {user_id}
already exists.");
return Err(IggyError::PersonalAccessTokenAlreadyExists(
- name.to_owned(),
+ name.to_string(),
user_id,
));
}
info!("Creating personal access token: {name} for user with ID:
{user_id}...");
- let (personal_access_token, token) =
- PersonalAccessToken::new(user_id, name, IggyTimestamp::now(),
expiry);
user.personal_access_tokens
- .insert(personal_access_token.token.clone(),
personal_access_token);
+ .insert(token_hash, personal_access_token);
info!("Created personal access token: {name} for user with ID:
{user_id}.");
- Ok(token)
+ Ok(())
}
pub fn delete_personal_access_token(
@@ -110,6 +129,22 @@ impl IggyShard {
) -> Result<(), IggyError> {
self.ensure_authenticated(session)?;
let user_id = session.get_user_id();
+ self.delete_personal_access_token_base(user_id, name)
+ }
+
+ pub fn delete_personal_access_token_bypass_auth(
+ &self,
+ user_id: u32,
+ name: &str,
+ ) -> Result<(), IggyError> {
+ self.delete_personal_access_token_base(user_id, name)
+ }
+
+ fn delete_personal_access_token_base(
+ &self,
+ user_id: u32,
+ name: &str,
+ ) -> Result<(), IggyError> {
let user = self
.get_user_mut(&user_id.try_into()?)
.with_error_context(|error| {
diff --git a/core/server/src/shard/system/users.rs
b/core/server/src/shard/system/users.rs
index f0e87fc0..0f92a912 100644
--- a/core/server/src/shard/system/users.rs
+++ b/core/server/src/shard/system/users.rs
@@ -143,7 +143,7 @@ impl IggyShard {
Ok(self.users.borrow().values().cloned().collect())
}
- pub async fn create_user(
+ pub fn create_user(
&self,
session: &Session,
username: &str,
@@ -162,6 +162,31 @@ impl IggyShard {
)
})?;
+ let user_id = self.create_user_base(username, password, status,
permissions)?;
+ self.get_user(&user_id.try_into()?)
+ .with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to get user
with id: {user_id}")
+ })
+ }
+
+ pub fn create_user_bypass_auth(
+ &self,
+ username: &str,
+ password: &str,
+ status: UserStatus,
+ permissions: Option<Permissions>,
+ ) -> Result<u32, IggyError> {
+ let user_id = self.create_user_base(username, password, status,
permissions)?;
+ Ok(user_id)
+ }
+
+ fn create_user_base(
+ &self,
+ username: &str,
+ password: &str,
+ status: UserStatus,
+ permissions: Option<Permissions>,
+ ) -> Result<u32, IggyError> {
if self
.users
.borrow()
@@ -186,26 +211,32 @@ impl IggyShard {
self.users.borrow_mut().insert(user.id, user);
info!("Created user: {username} with ID: {user_id}.");
self.metrics.increment_users(1);
- self.get_user(&user_id.try_into()?)
- .with_error_context(|error| {
- format!("{COMPONENT} (error: {error}) - failed to get user
with id: {user_id}")
- })
+ Ok(user_id)
}
pub fn delete_user(&self, session: &Session, user_id: &Identifier) ->
Result<User, IggyError> {
self.ensure_authenticated(session)?;
+ self.permissioner
+ .borrow()
+ .delete_user(session.get_user_id())
+ .with_error_context(|error| {
+ format!(
+ "{COMPONENT} (error: {error}) - permission denied to
delete user for user with id: {}",
+ session.get_user_id()
+ )
+ })?;
+
+ self.delete_user_base(user_id)
+ }
+
+ pub fn delete_user_bypass_auth(&self, user_id: &Identifier) ->
Result<User, IggyError> {
+ self.delete_user_base(user_id)
+ }
+
+ fn delete_user_base(&self, user_id: &Identifier) -> Result<User,
IggyError> {
let existing_user_id;
let existing_username;
{
- self.permissioner
- .borrow()
- .delete_user(session.get_user_id())
- .with_error_context(|error| {
- format!(
- "{COMPONENT} (error: {error}) - permission denied to
delete user for user with id: {}",
- session.get_user_id()
- )
- })?;
let user = self.get_user(user_id).with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to get user
with id: {user_id}")
})?;
@@ -257,6 +288,24 @@ impl IggyShard {
)
})?;
+ self.update_user_base(user_id, username, status)
+ }
+
+ pub fn update_user_bypass_auth(
+ &self,
+ user_id: &Identifier,
+ username: Option<String>,
+ status: Option<UserStatus>,
+ ) -> Result<User, IggyError> {
+ self.update_user_base(user_id, username, status)
+ }
+
+ fn update_user_base(
+ &self,
+ user_id: &Identifier,
+ username: Option<String>,
+ status: Option<UserStatus>,
+ ) -> Result<User, IggyError> {
if let Some(username) = username.to_owned() {
let user = self.get_user(user_id)?;
let existing_user =
self.get_user(&username.to_owned().try_into()?);
@@ -310,7 +359,29 @@ impl IggyShard {
error!("Cannot change the root user permissions.");
return Err(IggyError::CannotChangePermissions(user.id));
}
+ }
+ self.update_permissions_base(user_id, permissions)
+ }
+
+ pub fn update_permissions_bypass_auth(
+ &self,
+ user_id: &Identifier,
+ permissions: Option<Permissions>,
+ ) -> Result<(), IggyError> {
+ self.update_permissions_base(user_id, permissions)
+ }
+
+ fn update_permissions_base(
+ &self,
+ user_id: &Identifier,
+ permissions: Option<Permissions>,
+ ) -> Result<(), IggyError> {
+ {
+ let user: User = self.get_user(user_id).with_error_context(|error|
{
+ format!("{COMPONENT} (error: {error}) - failed to get user
with id: {user_id}")
+ })?;
+
self.permissioner
.borrow_mut()
.update_permissions_for_user(user.id, permissions.clone());
@@ -353,6 +424,24 @@ impl IggyShard {
}
}
+ self.change_password_base(user_id, current_password, new_password)
+ }
+
+ pub fn change_password_bypass_auth(
+ &self,
+ user_id: &Identifier,
+ current_password: &str,
+ new_password: &str,
+ ) -> Result<(), IggyError> {
+ self.change_password_base(user_id, current_password, new_password)
+ }
+
+ fn change_password_base(
+ &self,
+ user_id: &Identifier,
+ current_password: &str,
+ new_password: &str,
+ ) -> Result<(), IggyError> {
let mut user = self.get_user_mut(user_id).with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to get mutable
reference to the user with id: {user_id}")
})?;
@@ -456,25 +545,29 @@ impl IggyShard {
pub fn logout_user(&self, session: &Session) -> Result<(), IggyError> {
self.ensure_authenticated(session)?;
+ let client_id = session.client_id;
+ let user_id = session.get_user_id();
+ self.logout_user_base(user_id, client_id)?;
+ Ok(())
+ }
+
+ fn logout_user_base(&self, user_id: u32, client_id: u32) -> Result<(),
IggyError> {
let user = self
- .get_user(&Identifier::numeric(session.get_user_id())?)
+ .get_user(&Identifier::numeric(user_id)?)
.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - failed to get user with
id: {}",
- session.get_user_id()
+ user_id,
)
})?;
info!(
"Logging out user: {} with ID: {}...",
user.username, user.id
);
- if session.client_id > 0 {
+ if client_id > 0 {
let mut client_manager = self.client_manager.borrow_mut();
- client_manager.clear_user_id(session.client_id)?;
- info!(
- "Cleared user ID: {} for client: {}.",
- user.id, session.client_id
- );
+ client_manager.clear_user_id(client_id)?;
+ info!("Cleared user ID: {} for client: {}.", user.id, client_id);
}
info!("Logged out user: {} with ID: {}.", user.username, user.id);
Ok(())
diff --git a/core/server/src/shard/transmission/event.rs
b/core/server/src/shard/transmission/event.rs
index 02daba5e..3b62d3d2 100644
--- a/core/server/src/shard/transmission/event.rs
+++ b/core/server/src/shard/transmission/event.rs
@@ -6,7 +6,7 @@ use iggy_common::{
use crate::{
shard::namespace::IggyNamespace,
- streaming::{clients::client_manager::Transport,
polling_consumer::PollingConsumer},
+ streaming::{clients::client_manager::Transport,
polling_consumer::PollingConsumer,
personal_access_tokens::personal_access_token::PersonalAccessToken},
};
#[derive(Debug)]
@@ -87,6 +87,10 @@ pub enum ShardEvent {
status: UserStatus,
permissions: Option<Permissions>,
},
+ UpdatedPermissions {
+ user_id: Identifier,
+ permissions: Option<Permissions>,
+ },
DeletedUser {
user_id: Identifier,
},
@@ -109,21 +113,15 @@ pub enum ShardEvent {
new_password: String,
},
CreatedPersonalAccessToken {
- name: String,
- expiry: IggyExpiry,
+ personal_access_token: PersonalAccessToken,
},
DeletedPersonalAccessToken {
+ user_id: u32,
name: String,
},
LoginWithPersonalAccessToken {
token: String,
},
- StoredConsumerOffset {
- stream_id: Identifier,
- topic_id: Identifier,
- consumer: PollingConsumer,
- offset: u64,
- },
NewSession {
address: SocketAddr,
transport: Transport,
diff --git a/core/server/src/streaming/topics/consumer_groups.rs
b/core/server/src/streaming/topics/consumer_groups.rs
index 79c7f9f3..ccb8b69f 100644
--- a/core/server/src/streaming/topics/consumer_groups.rs
+++ b/core/server/src/streaming/topics/consumer_groups.rs
@@ -216,10 +216,7 @@ impl Topic {
Ok(Identifier::numeric(id)?)
}
- pub async fn delete_consumer_group(
- &mut self,
- id: &Identifier,
- ) -> Result<ConsumerGroup, IggyError> {
+ pub fn delete_consumer_group(&mut self, id: &Identifier) ->
Result<ConsumerGroup, IggyError> {
let group_id;
{
let consumer_group =
self.get_consumer_group(id).with_error_context(|error| {
@@ -244,16 +241,6 @@ impl Topic {
.store(group_id, Ordering::SeqCst);
}
- for (_, partition) in self.partitions.iter() {
- let partition = partition.read().await;
- if let Some((_, offset)) =
partition.consumer_group_offsets.remove(&group_id) {
- self.storage
- .partition
- .delete_consumer_offset(&offset.path)
- .await?;
- }
- }
-
info!(
"Deleted consumer group with ID: {} from topic with ID: {} and
stream with ID: {}.",
id, self.topic_id, self.stream_id
@@ -380,9 +367,7 @@ mod tests {
let result = topic.create_consumer_group(Some(group_id), name);
assert!(result.is_ok());
assert_eq!(topic.consumer_groups.borrow().len(), 1);
- let result = topic
- .delete_consumer_group(&Identifier::numeric(group_id).unwrap())
- .await;
+ let result =
topic.delete_consumer_group(&Identifier::numeric(group_id).unwrap());
assert!(result.is_ok());
assert!(topic.consumer_groups.borrow().is_empty());
}
@@ -396,9 +381,7 @@ mod tests {
assert!(result.is_ok());
assert_eq!(topic.consumer_groups.borrow().len(), 1);
let group_id = group_id + 1;
- let result = topic
- .delete_consumer_group(&Identifier::numeric(group_id).unwrap())
- .await;
+ let result =
topic.delete_consumer_group(&Identifier::numeric(group_id).unwrap());
assert!(result.is_err());
assert_eq!(topic.consumer_groups.borrow().len(), 1);
}