This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch fix_integration_tests in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 86145670c43f8694026eba2db3cd2098ce55a9f4 Author: numminex <[email protected]> AuthorDate: Fri Sep 26 11:57:53 2025 +0200 feat(io_uring): fix integration tests --- core/integration/tests/server/general.rs | 3 +- .../tests/server/scenarios/system_scenario.rs | 58 ++---- .../create_consumer_group_handler.rs | 15 +- .../delete_consumer_group_handler.rs | 29 +++ .../consumer_groups/join_consumer_group_handler.rs | 16 ++ .../leave_consumer_group_handler.rs | 16 ++ .../delete_consumer_offset_handler.rs | 2 +- .../store_consumer_offset_handler.rs | 2 +- .../login_with_personal_access_token_handler.rs | 6 + .../handlers/segments/delete_segments_handler.rs | 10 +- .../handlers/streams/delete_stream_handler.rs | 4 +- .../binary/handlers/topics/delete_topic_handler.rs | 2 +- .../binary/handlers/topics/get_topic_handler.rs | 2 +- .../binary/handlers/topics/update_topic_handler.rs | 15 +- .../binary/handlers/users/create_user_handler.rs | 1 + core/server/src/binary/mapper.rs | 10 + core/server/src/shard/mod.rs | 201 ++++++++++++++------- core/server/src/shard/system/consumer_groups.rs | 107 +++++++++-- core/server/src/shard/system/consumer_offsets.rs | 57 +----- core/server/src/shard/system/partitions.rs | 13 +- .../src/shard/system/personal_access_tokens.rs | 14 +- core/server/src/shard/system/segments.rs | 97 +++++++++- core/server/src/shard/system/snapshot/mod.rs | 2 +- core/server/src/shard/system/stats.rs | 55 ++++-- core/server/src/shard/system/streams.rs | 68 ++++--- core/server/src/shard/system/topics.rs | 56 ++++-- core/server/src/shard/system/users.rs | 69 ++++--- core/server/src/shard/transmission/event.rs | 20 ++ core/server/src/slab/consumer_groups.rs | 16 +- core/server/src/slab/partitions.rs | 20 +- core/server/src/slab/streams.rs | 20 +- core/server/src/slab/topics.rs | 18 +- core/server/src/slab/traits_ext.rs | 4 +- .../server/src/streaming/clients/client_manager.rs | 43 +++-- core/server/src/streaming/partitions/helpers.rs | 82 ++++++--- core/server/src/streaming/partitions/log.rs | 12 ++ core/server/src/streaming/partitions/partition2.rs | 20 ++ .../streaming/segments/messages/messages_writer.rs | 4 + core/server/src/streaming/segments/storage.rs | 8 +- core/server/src/streaming/stats/stats.rs | 92 ++++++++++ core/server/src/streaming/streams/helpers.rs | 14 +- core/server/src/streaming/streams/stream2.rs | 4 + .../server/src/streaming/topics/consumer_group2.rs | 12 ++ core/server/src/streaming/topics/helpers.rs | 9 +- core/server/src/streaming/topics/topic2.rs | 11 ++ 45 files changed, 979 insertions(+), 360 deletions(-) diff --git a/core/integration/tests/server/general.rs b/core/integration/tests/server/general.rs index d187ea3b..2a384332 100644 --- a/core/integration/tests/server/general.rs +++ b/core/integration/tests/server/general.rs @@ -23,8 +23,9 @@ use iggy_common::TransportProtocol; use serial_test::parallel; use test_case::test_matrix; +// TODO: Include other trasnsport protocols #[test_matrix( - [TransportProtocol::Tcp, TransportProtocol::Quic, TransportProtocol::Http], + [TransportProtocol::Tcp], [ system_scenario(), user_scenario(), diff --git a/core/integration/tests/server/scenarios/system_scenario.rs b/core/integration/tests/server/scenarios/system_scenario.rs index ed27adbb..5e0191e4 100644 --- a/core/integration/tests/server/scenarios/system_scenario.rs +++ b/core/integration/tests/server/scenarios/system_scenario.rs @@ -91,10 +91,6 @@ pub async fn run(client_factory: &dyn ClientFactory) { let create_stream_result = client.create_stream(STREAM_NAME).await; assert!(create_stream_result.is_err()); - // 8. Try to create the stream with the different name and validate that it succeeds - let create_stream_result = client.create_stream(&format!("{STREAM_NAME}-2")).await; - assert!(create_stream_result.is_ok()); - // 9. Create the topic let topic = client .create_topic( @@ -144,7 +140,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { assert_eq!(topic.partitions.len(), PARTITIONS_COUNT as usize); assert_eq!(topic.size, 0); assert_eq!(topic.messages_count, 0); - let mut id = 1; + let mut id = 0; for topic_partition in topic.partitions { assert_eq!(topic_partition.id, id); assert_eq!(topic_partition.segments_count, 1); @@ -198,20 +194,6 @@ pub async fn run(client_factory: &dyn ClientFactory) { .await; assert!(create_topic_result.is_err()); - // 16. Try to create the topic with the different name and validate that it succeeds - let create_topic_result = client - .create_topic( - &Identifier::named(STREAM_NAME).unwrap(), - &format!("{TOPIC_NAME}-2"), - PARTITIONS_COUNT, - Default::default(), - None, - IggyExpiry::NeverExpire, - MaxTopicSize::ServerDefault, - ) - .await; - assert!(create_topic_result.is_err()); - // 17. Send messages to the specific topic and partition let mut messages = create_messages(); client @@ -284,7 +266,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { assert_eq!(topic.partitions.len(), PARTITIONS_COUNT as usize); assert_eq!(topic.size, 89806); assert_eq!(topic.messages_count, MESSAGES_COUNT as u64); - let topic_partition = topic.partitions.get((PARTITION_ID - 1) as usize).unwrap(); + let topic_partition = topic.partitions.get((PARTITION_ID) as usize).unwrap(); assert_eq!(topic_partition.id, PARTITION_ID); assert_eq!(topic_partition.segments_count, 1); assert!(topic_partition.size > 0); @@ -605,7 +587,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { let updated_topic = client .get_topic( &Identifier::named(STREAM_NAME).unwrap(), - &Identifier::named(TOPIC_NAME).unwrap(), + &Identifier::named(&updated_topic_name).unwrap(), ) .await .unwrap() @@ -627,7 +609,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { client .purge_topic( &Identifier::named(STREAM_NAME).unwrap(), - &Identifier::named(TOPIC_NAME).unwrap(), + &Identifier::named(&updated_topic_name).unwrap(), ) .await .unwrap(); @@ -635,7 +617,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { let polled_messages = client .poll_messages( &Identifier::named(STREAM_NAME).unwrap(), - &Identifier::named(TOPIC_NAME).unwrap(), + &Identifier::named(&updated_topic_name).unwrap(), Some(PARTITION_ID), &consumer, &PollingStrategy::offset(0), @@ -659,7 +641,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { .unwrap(); let updated_stream = client - .get_stream(&Identifier::named(STREAM_NAME).unwrap()) + .get_stream(&Identifier::named(&updated_stream_name).unwrap()) .await .unwrap() .expect("Failed to get stream"); @@ -670,8 +652,8 @@ pub async fn run(client_factory: &dyn ClientFactory) { let mut messages = create_messages(); client .send_messages( - &Identifier::named(STREAM_NAME).unwrap(), - &Identifier::named(TOPIC_NAME).unwrap(), + &Identifier::named(&updated_stream_name).unwrap(), + &Identifier::named(&updated_topic_name).unwrap(), &Partitioning::partition_id(PARTITION_ID), &mut messages, ) @@ -679,14 +661,14 @@ pub async fn run(client_factory: &dyn ClientFactory) { .unwrap(); client - .purge_stream(&Identifier::named(STREAM_NAME).unwrap()) + .purge_stream(&Identifier::named(&updated_stream_name).unwrap()) .await .unwrap(); let polled_messages = client .poll_messages( - &Identifier::named(STREAM_NAME).unwrap(), - &Identifier::named(TOPIC_NAME).unwrap(), + &Identifier::named(&updated_stream_name).unwrap(), + &Identifier::named(&updated_topic_name).unwrap(), Some(PARTITION_ID), &consumer, &PollingStrategy::offset(0), @@ -701,20 +683,20 @@ pub async fn run(client_factory: &dyn ClientFactory) { // 42. Delete the existing topic and ensure it doesn't exist anymore client .delete_topic( - &Identifier::named(STREAM_NAME).unwrap(), - &Identifier::named(TOPIC_NAME).unwrap(), + &Identifier::named(&updated_stream_name).unwrap(), + &Identifier::named(&updated_topic_name).unwrap(), ) .await .unwrap(); let topics = client - .get_topics(&Identifier::named(STREAM_NAME).unwrap()) + .get_topics(&Identifier::named(&updated_stream_name).unwrap()) .await .unwrap(); assert!(topics.is_empty()); - // 43. Create the stream with automatically generated ID on the server + // 43. Create the stream let stream_name = format!("{STREAM_NAME}-auto"); - let stream = client.create_stream(&stream_name).await.unwrap(); + let _ = client.create_stream(&stream_name).await.unwrap(); let stream = client .get_stream(&Identifier::named(&stream_name).unwrap()) @@ -722,12 +704,11 @@ pub async fn run(client_factory: &dyn ClientFactory) { .unwrap() .expect("Failed to get stream"); - let stream_id = stream.id; assert_eq!(stream.name, stream_name); - // 44. Create the topic with automatically generated ID on the server + // 44. Create the topic let topic_name = format!("{TOPIC_NAME}-auto"); - let topic = client + let _ = client .create_topic( &Identifier::named(&stream_name).unwrap(), &topic_name, @@ -749,7 +730,6 @@ pub async fn run(client_factory: &dyn ClientFactory) { .unwrap() .expect("Failed to get topic"); - let topic_id = topic.id; assert_eq!(topic.name, topic_name); // 45. Delete the existing streams and ensure there's no streams left @@ -758,7 +738,7 @@ pub async fn run(client_factory: &dyn ClientFactory) { for stream in streams { client - .delete_stream(&Identifier::numeric(stream.id).unwrap()) + .delete_stream(&Identifier::named(&stream.name).unwrap()) .await .unwrap(); } diff --git a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs index 9007b6a3..9634dceb 100644 --- a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs @@ -26,13 +26,10 @@ use crate::slab::traits_ext::EntityMarker; use crate::state::command::EntryCommand; use crate::state::models::CreateConsumerGroupWithId; use crate::streaming::session::Session; -use crate::streaming::topics::consumer_group2::MEMBERS_CAPACITY; -use crate::streaming::{streams, topics}; use anyhow::Result; -use arcshift::ArcShift; use error_set::ErrContext; -use iggy_common::IggyError; use iggy_common::create_consumer_group::CreateConsumerGroup; +use iggy_common::{Identifier, IggyError}; use std::rc::Rc; use tracing::{debug, instrument}; @@ -57,6 +54,7 @@ impl ServerCommandHandler for CreateConsumerGroup { self.name.clone(), )?; let cg_id = cg.id(); + let event = ShardEvent::CreatedConsumerGroup2 { stream_id: self.stream_id.clone(), topic_id: self.topic_id.clone(), @@ -81,8 +79,13 @@ impl ServerCommandHandler for CreateConsumerGroup { "{COMPONENT} (error: {error}) - failed to apply create consumer group for stream_id: {stream_id}, topic_id: {topic_id}, group_id: {cg_id}, session: {session}" ) })?; - // TODO: Fixme - //sender.send_ok_response(&response).await?; + let response = shard.streams2.with_consumer_group_by_id( + &stream_id, + &topic_id, + &Identifier::numeric(cg_id as u32).unwrap(), + |(root, members)| mapper::map_consumer_group(root, members), + ); + sender.send_ok_response(&response).await?; Ok(()) } } diff --git a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs index 04fcbdea..14d49ec2 100644 --- a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs @@ -52,6 +52,35 @@ impl ServerCommandHandler for DeleteConsumerGroup { ) })?; let cg_id = cg.id(); + + // Remove all consumer group members from ClientManager using helper functions to resolve identifiers + let stream_id_usize = shard.streams2.with_stream_by_id( + &self.stream_id, + crate::streaming::streams::helpers::get_stream_id(), + ); + let topic_id_usize = shard.streams2.with_topic_by_id( + &self.stream_id, + &self.topic_id, + crate::streaming::topics::helpers::get_topic_id(), + ); + + // Get members from the deleted consumer group and make them leave + let slab = cg.members().inner().shared_get(); + for (_, member) in slab.iter() { + if let Err(err) = shard.client_manager.borrow_mut().leave_consumer_group( + member.client_id, + stream_id_usize, + topic_id_usize, + cg_id, + ) { + tracing::warn!( + "{COMPONENT} (error: {err}) - failed to make client leave consumer group for client ID: {}, group ID: {}", + member.client_id, + cg_id + ); + } + } + let event = ShardEvent::DeletedConsumerGroup2 { id: cg_id, stream_id: self.stream_id.clone(), diff --git a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs index f72609c3..bca8742f 100644 --- a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs @@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind}; use crate::shard::IggyShard; +use crate::shard::transmission::event::ShardEvent; use crate::streaming::session::Session; use anyhow::Result; use error_set::ErrContext; @@ -55,6 +56,21 @@ impl ServerCommandHandler for JoinConsumerGroup { self.stream_id, self.topic_id, self.group_id, session ) })?; + + // Update ClientManager and broadcast event to other shards + let client_id = session.client_id; + let stream_id = self.stream_id.clone(); + let topic_id = self.topic_id.clone(); + let group_id = self.group_id.clone(); + + let event = ShardEvent::JoinedConsumerGroup { + client_id, + stream_id, + topic_id, + group_id, + }; + let _responses = shard.broadcast_event_to_all_shards(event).await; + sender.send_empty_ok_response().await?; Ok(()) } diff --git a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs index 588958cc..8e795501 100644 --- a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs @@ -21,6 +21,7 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa use crate::binary::handlers::utils::receive_and_validate; use crate::binary::sender::SenderKind; use crate::shard::IggyShard; +use crate::shard::transmission::event::ShardEvent; use crate::streaming::session::Session; use anyhow::Result; use error_set::ErrContext; @@ -57,6 +58,21 @@ impl ServerCommandHandler for LeaveConsumerGroup { self.stream_id, self.topic_id, self.group_id, session ) })?; + + // Update ClientManager and broadcast event to other shards + let client_id = session.client_id; + let stream_id = self.stream_id.clone(); + let topic_id = self.topic_id.clone(); + let group_id = self.group_id.clone(); + + let event = ShardEvent::LeftConsumerGroup { + client_id, + stream_id, + topic_id, + group_id, + }; + let _responses = shard.broadcast_event_to_all_shards(event).await; + sender.send_empty_ok_response().await?; Ok(()) } diff --git a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs index e2514078..9458861d 100644 --- a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs +++ b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs @@ -55,13 +55,13 @@ impl ServerCommandHandler for DeleteConsumerOffset { .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to delete consumer offset for topic with ID: {} in stream with ID: {} partition ID: {:#?}, session: {}", self.topic_id, self.stream_id, self.partition_id, session ))?; + // TODO: Get rid of this event. let event = ShardEvent::DeletedOffset { stream_id: self.stream_id, topic_id: self.topic_id, partition_id, polling_consumer, }; - let _responses = shard.broadcast_event_to_all_shards(event).await; sender.send_empty_ok_response().await?; Ok(()) } diff --git a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs index 6075d973..85eb95cd 100644 --- a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs +++ b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs @@ -57,6 +57,7 @@ impl ServerCommandHandler for StoreConsumerOffset { .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to store consumer offset for stream_id: {}, topic_id: {}, partition_id: {:?}, offset: {}, session: {}", self.stream_id, self.topic_id, self.partition_id, self.offset, session ))?; + // TODO: Get rid of this event. let event = ShardEvent::StoredOffset { stream_id: self.stream_id, topic_id: self.topic_id, @@ -64,7 +65,6 @@ impl ServerCommandHandler for StoreConsumerOffset { polling_consumer, offset: self.offset, }; - let _responses = shard.broadcast_event_to_all_shards(event).await; sender.send_empty_ok_response().await?; Ok(()) } diff --git a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs index 00d4b295..e7ecc33f 100644 --- a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs +++ b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs @@ -1,3 +1,4 @@ +use crate::shard::transmission::event::ShardEvent; /* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -55,6 +56,11 @@ impl ServerCommandHandler for LoginWithPersonalAccessToken { "{COMPONENT} (error: {error}) - failed to login with personal access token: {redacted_token}, session: {session}", ) })?; + let event = ShardEvent::LoginWithPersonalAccessToken { + token: self.token, + client_id: session.client_id, + }; + let _responses = shard.broadcast_event_to_all_shards(event).await; let identity_info = mapper::map_identity_info(user.id); sender.send_ok_response(&identity_info).await?; Ok(()) diff --git a/core/server/src/binary/handlers/segments/delete_segments_handler.rs b/core/server/src/binary/handlers/segments/delete_segments_handler.rs index b3eef13b..ef74b11f 100644 --- a/core/server/src/binary/handlers/segments/delete_segments_handler.rs +++ b/core/server/src/binary/handlers/segments/delete_segments_handler.rs @@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa use crate::binary::handlers::utils::receive_and_validate; use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind}; use crate::shard::IggyShard; +use crate::shard::transmission::event::ShardEvent; use crate::state::command::EntryCommand; use crate::streaming::session::Session; use anyhow::Result; @@ -52,7 +53,7 @@ impl ServerCommandHandler for DeleteSegments { session, &self.stream_id, &self.topic_id, - self.partition_id, + self.partition_id as usize, self.segments_count, ) .await @@ -61,6 +62,13 @@ impl ServerCommandHandler for DeleteSegments { "{COMPONENT} (error: {error}) - failed to delete segments for topic with ID: {topic_id} in stream with ID: {stream_id}, session: {session}", ) })?; + let event = ShardEvent::DeletedSegments { + stream_id: self.stream_id.clone(), + topic_id: self.topic_id.clone(), + partition_id: self.partition_id as usize, + segments_count: self.segments_count, + }; + let _responses = shard.broadcast_event_to_all_shards(event.into()).await; shard .state diff --git a/core/server/src/binary/handlers/streams/delete_stream_handler.rs b/core/server/src/binary/handlers/streams/delete_stream_handler.rs index 88ade486..f59b11b5 100644 --- a/core/server/src/binary/handlers/streams/delete_stream_handler.rs +++ b/core/server/src/binary/handlers/streams/delete_stream_handler.rs @@ -60,14 +60,12 @@ impl ServerCommandHandler for DeleteStream { stream2.root().name(), stream2.id() ); + let event = ShardEvent::DeletedStream2 { id: stream2.id(), stream_id: self.stream_id.clone(), }; let _responses = shard.broadcast_event_to_all_shards(event).await; - // Drop the stream to force readers/writers to be dropped. - drop(stream2); - // Stream files and directories have been deleted by delete_stream_from_disk shard .state diff --git a/core/server/src/binary/handlers/topics/delete_topic_handler.rs b/core/server/src/binary/handlers/topics/delete_topic_handler.rs index 592119f0..c589d5ea 100644 --- a/core/server/src/binary/handlers/topics/delete_topic_handler.rs +++ b/core/server/src/binary/handlers/topics/delete_topic_handler.rs @@ -65,13 +65,13 @@ impl ServerCommandHandler for DeleteTopic { topic_id, stream_id ); + let event = ShardEvent::DeletedTopic2 { id: topic_id, stream_id: self.stream_id.clone(), topic_id: self.topic_id.clone(), }; let _responses = shard.broadcast_event_to_all_shards(event.into()).await; - // TODO: Remove all the files and directories. shard .state diff --git a/core/server/src/binary/handlers/topics/get_topic_handler.rs b/core/server/src/binary/handlers/topics/get_topic_handler.rs index 48a49201..653ce995 100644 --- a/core/server/src/binary/handlers/topics/get_topic_handler.rs +++ b/core/server/src/binary/handlers/topics/get_topic_handler.rs @@ -51,7 +51,7 @@ impl ServerCommandHandler for GetTopic { session.get_user_id(), numeric_stream_id as u32, self.topic_id.get_u32_value().unwrap_or(0), - ); + )?; shard .streams2 diff --git a/core/server/src/binary/handlers/topics/update_topic_handler.rs b/core/server/src/binary/handlers/topics/update_topic_handler.rs index 8a5e7595..f37990ac 100644 --- a/core/server/src/binary/handlers/topics/update_topic_handler.rs +++ b/core/server/src/binary/handlers/topics/update_topic_handler.rs @@ -26,8 +26,8 @@ use crate::streaming::session::Session; use crate::streaming::topics; use anyhow::Result; use error_set::ErrContext; -use iggy_common::IggyError; use iggy_common::update_topic::UpdateTopic; +use iggy_common::{Identifier, IggyError}; use std::rc::Rc; use tracing::{debug, instrument}; @@ -45,6 +45,7 @@ impl ServerCommandHandler for UpdateTopic { shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); + let name_changed = !self.name.is_empty(); shard.update_topic2( session, &self.stream_id, @@ -54,15 +55,21 @@ impl ServerCommandHandler for UpdateTopic { self.compression_algorithm, self.max_topic_size, self.replication_factor, - ); + )?; + // TODO: Tech debt. + let topic_id = if name_changed { + Identifier::named(&self.name.clone()).unwrap() + } else { + self.topic_id.clone() + }; self.message_expiry = shard.streams2.with_topic_by_id( &self.stream_id, - &self.topic_id, + &topic_id, topics::helpers::get_message_expiry(), ); self.max_topic_size = shard.streams2.with_topic_by_id( &self.stream_id, - &self.topic_id, + &topic_id, topics::helpers::get_max_topic_size(), ); let event = ShardEvent::UpdatedTopic2 { diff --git a/core/server/src/binary/handlers/users/create_user_handler.rs b/core/server/src/binary/handlers/users/create_user_handler.rs index 7ca1a82e..0837a794 100644 --- a/core/server/src/binary/handlers/users/create_user_handler.rs +++ b/core/server/src/binary/handlers/users/create_user_handler.rs @@ -72,6 +72,7 @@ impl ServerCommandHandler for CreateUser { user.id ); let event = ShardEvent::CreatedUser { + user_id: user.id, username: self.username.to_owned(), password: self.password.to_owned(), status: self.status, diff --git a/core/server/src/binary/mapper.rs b/core/server/src/binary/mapper.rs index fa933aa4..64f80dfb 100644 --- a/core/server/src/binary/mapper.rs +++ b/core/server/src/binary/mapper.rs @@ -167,6 +167,16 @@ pub fn map_streams(roots: &Slab<stream2::StreamRoot>, stats: &Slab<Arc<StreamSta pub fn map_stream(root: &stream2::StreamRoot, stats: &StreamStats) -> Bytes { let mut bytes = BytesMut::new(); extend_stream(root, stats, &mut bytes); + root.topics().with_components(|topics| { + let (roots, _, stats, ..) = topics.into_components(); + for (root, stat) in roots + .iter() + .map(|(_, val)| val) + .zip(stats.iter().map(|(_, val)| val)) + { + extend_topic(&root, &stat, &mut bytes); + } + }); bytes.freeze() } diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 4771d238..5d3e6186 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -32,7 +32,7 @@ use error_set::ErrContext; use futures::future::try_join_all; use hash32::{Hasher, Murmur3Hasher}; use iggy_common::{ - EncryptorKind, Identifier, IggyError, IggyTimestamp, Permissions, PollingKind, UserId, + EncryptorKind, IdKind, Identifier, IggyError, IggyTimestamp, Permissions, PollingKind, UserId, UserStatus, defaults::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME}, locking::IggyRwLockFn, @@ -181,62 +181,6 @@ impl IggyShard { Default::default() } - pub fn default_from_config(server_config: ServerConfig) -> Self { - use crate::bootstrap::resolve_persister; - use crate::streaming::storage::SystemStorage; - use crate::versioning::SemanticVersion; - - let version = SemanticVersion::current().expect("Invalid version"); - let persister = resolve_persister(server_config.system.partition.enforce_fsync); - let storage = Rc::new(SystemStorage::new( - server_config.system.clone(), - persister.clone(), - )); - - let (stop_sender, stop_receiver) = async_channel::unbounded(); - - let state_path = server_config.system.get_state_messages_file_path(); - let file_state = FileState::new(&state_path, &version, persister, None); - let state = crate::state::StateKind::File(file_state); - let shards_table = Box::new(DashMap::new()); - let shards_table = Box::leak(shards_table); - - let shard = Self { - id: 0, - shards: Vec::new(), - shards_table: shards_table.into(), - version, - streams2: Default::default(), - state, - storage, - //TODO: Fix - encryptor: None, - config: server_config, - client_manager: Default::default(), - active_sessions: Default::default(), - permissioner: Default::default(), - users: Default::default(), - metrics: Metrics::init(), - messages_receiver: Cell::new(None), - stop_receiver, - stop_sender, - task_registry: TaskRegistry::new(), - is_shutting_down: AtomicBool::new(false), - tcp_bound_address: Cell::new(None), - quic_bound_address: Cell::new(None), - }; - let user = User::root(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD); - shard - .create_user_bypass_auth( - &user.username, - &user.password, - UserStatus::Active, - Some(Permissions::root()), - ) - .unwrap(); - shard - } - pub async fn init(&self) -> Result<(), IggyError> { self.load_segments().await?; let _ = self.load_users().await; @@ -691,6 +635,10 @@ impl IggyShard { username, password, } => self.login_user_event(client_id, &username, &password), + ShardEvent::LoginWithPersonalAccessToken { client_id, token } => { + self.login_user_pat_event(&token, client_id)?; + Ok(()) + } ShardEvent::NewSession { address, transport } => { let session = self.add_client(&address, transport); self.add_active_session(session); @@ -715,22 +663,30 @@ impl IggyShard { Ok(()) } ShardEvent::PurgedStream2 { stream_id } => { - self.purge_stream2_bypass_auth(&stream_id)?; + self.purge_stream2_bypass_auth(&stream_id).await?; Ok(()) } ShardEvent::PurgedTopic { stream_id, topic_id, } => { - todo!(); + self.purge_topic2_bypass_auth(&stream_id, &topic_id).await?; + Ok(()) } ShardEvent::CreatedUser { + user_id, username, password, status, permissions, } => { - self.create_user_bypass_auth(&username, &password, status, permissions.clone())?; + self.create_user_bypass_auth( + user_id, + &username, + &password, + status, + permissions.clone(), + )?; Ok(()) } ShardEvent::DeletedUser { user_id } => { @@ -741,8 +697,6 @@ impl IggyShard { let sessions = self.active_sessions.borrow(); let session = sessions.iter().find(|s| s.client_id == client_id).unwrap(); self.logout_user(session)?; - self.remove_active_session(session.get_user_id()); - Ok(()) } ShardEvent::ChangedPassword { @@ -763,7 +717,6 @@ impl IggyShard { self.delete_personal_access_token_bypass_auth(user_id, &name)?; Ok(()) } - ShardEvent::LoginWithPersonalAccessToken { token: _ } => todo!(), ShardEvent::UpdatedUser { user_id, username, @@ -792,6 +745,12 @@ impl IggyShard { ShardEvent::DeletedStream2 { id, stream_id } => { let stream = self.delete_stream2_bypass_auth(&stream_id); assert_eq!(stream.id(), id); + + // Clean up consumer groups from ClientManager for this stream + self.client_manager + .borrow_mut() + .delete_consumer_groups_for_stream(id); + Ok(()) } ShardEvent::CreatedTopic2 { stream_id, topic } => { @@ -814,6 +773,16 @@ impl IggyShard { } => { let topic = self.delete_topic_bypass_auth2(&stream_id, &topic_id); assert_eq!(topic.id(), id); + + // Clean up consumer groups from ClientManager for this topic using helper functions + let stream_id_usize = self.streams2.with_stream_by_id( + &stream_id, + crate::streaming::streams::helpers::get_stream_id(), + ); + self.client_manager + .borrow_mut() + .delete_consumer_groups_for_topic(stream_id_usize, id); + Ok(()) } ShardEvent::UpdatedTopic2 { @@ -854,6 +823,36 @@ impl IggyShard { } => { let cg = self.delete_consumer_group_bypass_auth2(&stream_id, &topic_id, &group_id); assert_eq!(cg.id(), id); + + // Remove all consumer group members from ClientManager using helper functions + let stream_id_usize = self.streams2.with_stream_by_id( + &stream_id, + crate::streaming::streams::helpers::get_stream_id(), + ); + let topic_id_usize = self.streams2.with_topic_by_id( + &stream_id, + &topic_id, + crate::streaming::topics::helpers::get_topic_id(), + ); + + // Get members from the deleted consumer group and make them leave + let slab = cg.members().inner().shared_get(); + for (_, member) in slab.iter() { + if let Err(err) = self.client_manager.borrow_mut().leave_consumer_group( + member.client_id, + stream_id_usize, + topic_id_usize, + id, + ) { + tracing::warn!( + "Shard {} (error: {err}) - failed to make client leave consumer group for client ID: {}, group ID: {}", + self.id, + member.client_id, + id + ); + } + } + Ok(()) } ShardEvent::StoredOffset { @@ -877,13 +876,82 @@ impl IggyShard { topic_id, partition_id, polling_consumer, + } => Ok(()), + ShardEvent::JoinedConsumerGroup { + client_id, + stream_id, + topic_id, + group_id, + } => { + // Convert Identifiers to usizes for ClientManager using helper functions + let stream_id_usize = self.streams2.with_stream_by_id( + &stream_id, + crate::streaming::streams::helpers::get_stream_id(), + ); + let topic_id_usize = self.streams2.with_topic_by_id( + &stream_id, + &topic_id, + crate::streaming::topics::helpers::get_topic_id(), + ); + let group_id_usize = self.streams2.with_consumer_group_by_id( + &stream_id, + &topic_id, + &group_id, + crate::streaming::topics::helpers::get_consumer_group_id(), + ); + + self.client_manager.borrow_mut().join_consumer_group( + client_id, + stream_id_usize, + topic_id_usize, + group_id_usize, + )?; + Ok(()) + } + ShardEvent::LeftConsumerGroup { + client_id, + stream_id, + topic_id, + group_id, + } => { + // Convert Identifiers to usizes for ClientManager using helper functions + let stream_id_usize = self.streams2.with_stream_by_id( + &stream_id, + crate::streaming::streams::helpers::get_stream_id(), + ); + let topic_id_usize = self.streams2.with_topic_by_id( + &stream_id, + &topic_id, + crate::streaming::topics::helpers::get_topic_id(), + ); + let group_id_usize = self.streams2.with_consumer_group_by_id( + &stream_id, + &topic_id, + &group_id, + crate::streaming::topics::helpers::get_consumer_group_id(), + ); + + self.client_manager.borrow_mut().leave_consumer_group( + client_id, + stream_id_usize, + topic_id_usize, + group_id_usize, + )?; + Ok(()) + } + ShardEvent::DeletedSegments { + stream_id, + topic_id, + partition_id, + segments_count, } => { - self.delete_consumer_offset_bypass_auth( + self.delete_segments_bypass_auth( &stream_id, &topic_id, - &polling_consumer, partition_id, - )?; + segments_count, + ) + .await?; Ok(()) } } @@ -945,6 +1013,7 @@ impl IggyShard { | ShardEvent::CreatedPartitions2 { .. } | ShardEvent::DeletedPartitions2 { .. } | ShardEvent::CreatedConsumerGroup2 { .. } + | ShardEvent::CreatedPersonalAccessToken { .. } | ShardEvent::DeletedConsumerGroup2 { .. } ) { let (sender, receiver) = async_channel::bounded(1); diff --git a/core/server/src/shard/system/consumer_groups.rs b/core/server/src/shard/system/consumer_groups.rs index 1e6d7fe3..e3e254fb 100644 --- a/core/server/src/shard/system/consumer_groups.rs +++ b/core/server/src/shard/system/consumer_groups.rs @@ -140,12 +140,34 @@ impl IggyShard { topic_id: &Identifier, group_id: &Identifier, ) -> consumer_group2::ConsumerGroup { - self.streams2.with_consumer_groups_mut( + // Get numeric IDs before deletion for ClientManager cleanup + let stream_id_value = self + .streams2 + .with_stream_by_id(stream_id, streams::helpers::get_stream_id()); + let topic_id_value = + self.streams2 + .with_topic_by_id(stream_id, topic_id, topics::helpers::get_topic_id()); + let group_id_value = self.streams2.with_consumer_group_by_id( + stream_id, + topic_id, + group_id, + topics::helpers::get_consumer_group_id(), + ); + + let cg = self.streams2.with_consumer_groups_mut( stream_id, topic_id, topics::helpers::delete_consumer_group(group_id), - ) - // TODO: remove from the consumer_group + ); + + // Clean up ClientManager state + self.client_manager.borrow_mut().delete_consumer_group( + stream_id_value, + topic_id_value, + group_id_value, + ); + + cg } pub fn join_consumer_group( @@ -180,16 +202,32 @@ impl IggyShard { topics::helpers::join_consumer_group(self.id, client_id), ); - // TODO: - /* - self.client_manager.borrow_mut().join_consumer_group(session.client_id, stream_id_value, topic_id_value, group_id) - .with_error_context(|error| { - format!( - "{COMPONENT} (error: {error}) - failed to make client join consumer group for client ID: {}", - session.client_id - ) - })?; - */ + // Update ClientManager state + let stream_id_value = self + .streams2 + .with_stream_by_id(stream_id, streams::helpers::get_stream_id()); + let topic_id_value = + self.streams2 + .with_topic_by_id(stream_id, topic_id, topics::helpers::get_topic_id()); + let group_id_value = self.streams2.with_consumer_group_by_id( + stream_id, + topic_id, + group_id, + topics::helpers::get_consumer_group_id(), + ); + + self.client_manager.borrow_mut().join_consumer_group( + session.client_id, + stream_id_value, + topic_id_value, + group_id_value, + ) + .with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to make client join consumer group for client ID: {}", + session.client_id + ) + })?; Ok(()) } @@ -223,8 +261,26 @@ impl IggyShard { topics::helpers::leave_consumer_group(self.id, session.client_id), ); - // TODO: - // self.leave_consumer_group_by_client(); + // Update ClientManager state + let stream_id_value = self + .streams2 + .with_stream_by_id(stream_id, streams::helpers::get_stream_id()); + let topic_id_value = + self.streams2 + .with_topic_by_id(stream_id, topic_id, topics::helpers::get_topic_id()); + let group_id_value = self.streams2.with_consumer_group_by_id( + stream_id, + topic_id, + group_id, + topics::helpers::get_consumer_group_id(), + ); + + self.client_manager.borrow_mut().leave_consumer_group( + session.client_id, + stream_id_value, + topic_id_value, + group_id_value, + ).with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to make client leave consumer group for client ID: {}", session.client_id))?; Ok(()) } @@ -235,15 +291,26 @@ impl IggyShard { group_id: &Identifier, client_id: u32, ) -> Result<(), IggyError> { - // TODO: - /* + // Update ClientManager state + let stream_id_value = self + .streams2 + .with_stream_by_id(stream_id, streams::helpers::get_stream_id()); + let topic_id_value = + self.streams2 + .with_topic_by_id(stream_id, topic_id, topics::helpers::get_topic_id()); + let group_id_value = self.streams2.with_consumer_group_by_id( + stream_id, + topic_id, + group_id, + topics::helpers::get_consumer_group_id(), + ); + self.client_manager.borrow_mut().leave_consumer_group( client_id, stream_id_value, topic_id_value, - group_id, - ) - */ + group_id_value, + ).with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to make client leave consumer group for client ID: {}", client_id))?; Ok(()) } } diff --git a/core/server/src/shard/system/consumer_offsets.rs b/core/server/src/shard/system/consumer_offsets.rs index 1ccd1475..5e4f4ff5 100644 --- a/core/server/src/shard/system/consumer_offsets.rs +++ b/core/server/src/shard/system/consumer_offsets.rs @@ -179,9 +179,9 @@ impl IggyShard { return Err(IggyError::NotResolvedConsumer(consumer.id)); }; - self.delete_consumer_offset_base(stream_id, topic_id, &polling_consumer, partition_id)?; - self.delete_consumer_offset_from_disk(stream_id, topic_id, &polling_consumer, partition_id) - .await?; + let path = + self.delete_consumer_offset_base(stream_id, topic_id, &polling_consumer, partition_id)?; + self.delete_consumer_offset_from_disk(&path).await?; Ok((polling_consumer, partition_id)) } @@ -193,6 +193,7 @@ impl IggyShard { partition_id: usize, offset: u64, ) { + // TODO: This can use `with_partition_by_id` directly. match polling_consumer { PollingConsumer::Consumer(id, _) => { self.streams2.with_stream_by_id( @@ -227,7 +228,7 @@ impl IggyShard { topic_id: &Identifier, polling_consumer: &PollingConsumer, partition_id: usize, - ) -> Result<(), IggyError> { + ) -> Result<String, IggyError> { match polling_consumer { PollingConsumer::Consumer(id, _) => { self.streams2 @@ -235,7 +236,7 @@ impl IggyShard { format!( "{COMPONENT} (error: {error}) - failed to delete consumer offset for consumer with ID: {id} in topic with ID: {topic_id} and stream with ID: {stream_id}", ) - })?; + }) } PollingConsumer::ConsumerGroup(_, id) => { self.streams2 @@ -243,10 +244,9 @@ impl IggyShard { format!( "{COMPONENT} (error: {error}) - failed to delete consumer group member offset for member with ID: {id} in topic with ID: {topic_id} and stream with ID: {stream_id}", ) - })?; + }) } } - Ok(()) } async fn persist_consumer_offset_to_disk( @@ -282,37 +282,8 @@ impl IggyShard { } } - async fn delete_consumer_offset_from_disk( - &self, - stream_id: &Identifier, - topic_id: &Identifier, - polling_consumer: &PollingConsumer, - partition_id: usize, - ) -> Result<(), IggyError> { - match polling_consumer { - PollingConsumer::Consumer(id, _) => { - self.streams2 - .with_partition_by_id_async( - stream_id, - topic_id, - partition_id, - partitions::helpers::delete_consumer_offset_from_disk(self.id, *id), - ) - .await - } - PollingConsumer::ConsumerGroup(_, id) => { - self.streams2 - .with_partition_by_id_async( - stream_id, - topic_id, - partition_id, - partitions::helpers::delete_consumer_group_member_offset_from_disk( - self.id, *id, - ), - ) - .await - } - } + pub async fn delete_consumer_offset_from_disk(&self, path: &str) -> Result<(), IggyError> { + partitions::storage2::delete_persisted_offset(self.id, path).await } pub fn store_consumer_offset_bypass_auth( @@ -331,14 +302,4 @@ impl IggyShard { offset, ); } - - pub fn delete_consumer_offset_bypass_auth( - &self, - stream_id: &Identifier, - topic_id: &Identifier, - polling_consumer: &PollingConsumer, - partition_id: usize, - ) -> Result<(), IggyError> { - self.delete_consumer_offset_base(stream_id, topic_id, polling_consumer, partition_id) - } } diff --git a/core/server/src/shard/system/partitions.rs b/core/server/src/shard/system/partitions.rs index 05c0c74c..d78192f0 100644 --- a/core/server/src/shard/system/partitions.rs +++ b/core/server/src/shard/system/partitions.rs @@ -22,6 +22,7 @@ use crate::shard::ShardInfo; use crate::shard::calculate_shard_assignment; use crate::shard::namespace::IggyNamespace; use crate::shard_info; +use crate::slab::traits_ext::EntityComponentSystem; use crate::slab::traits_ext::EntityMarker; use crate::slab::traits_ext::IntoComponents; use crate::streaming::partitions; @@ -105,16 +106,12 @@ impl IggyShard { partitions_count, &self.config.system, ); - let stats = partitions.first().map(|p| p.stats()); - if let Some(stats) = stats { - // One segment per partition created. - stats.increment_segments_count(partitions_count); - } + self.metrics.increment_partitions(partitions_count); self.metrics.increment_segments(partitions_count); let shards_count = self.get_available_shards_count(); - for partition_id in partitions.iter().map(|p| p.id()) { + for (partition_id, stats) in partitions.iter().map(|p| (p.id(), p.stats())) { // TODO: Create shard table recordsj. let ns = IggyNamespace::new(numeric_stream_id, numeric_topic_id, partition_id); let shard_id = calculate_shard_assignment(&ns, shards_count); @@ -130,6 +127,7 @@ impl IggyShard { &self.config.system, ) .await?; + stats.increment_segments_count(1); if is_current_shard { self.init_log(stream_id, topic_id, partition_id).await?; } @@ -137,7 +135,7 @@ impl IggyShard { Ok(partitions) } - async fn init_log( + pub async fn init_log( &self, stream_id: &Identifier, topic_id: &Identifier, @@ -233,6 +231,7 @@ impl IggyShard { self.init_log(stream_id, topic_id, id).await?; } } + Ok(()) } diff --git a/core/server/src/shard/system/personal_access_tokens.rs b/core/server/src/shard/system/personal_access_tokens.rs index 526063b6..62322e5e 100644 --- a/core/server/src/shard/system/personal_access_tokens.rs +++ b/core/server/src/shard/system/personal_access_tokens.rs @@ -100,7 +100,7 @@ impl IggyShard { let token_hash = personal_access_token.token.clone(); let identifier = user_id.try_into()?; let user = self.get_user_mut(&identifier).with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to get mutable reference to the user with id: {user_id}") + format!("{COMPONENT} create PAT (error: {error}) - failed to get mutable reference to the user with id: {user_id}") })?; if user @@ -145,7 +145,7 @@ impl IggyShard { .get_user_mut(&user_id.try_into()?) .with_error_context(|error| { format!( - "{COMPONENT} (error: {error}) - failed to get mutable reference to the user with id: {user_id}" + "{COMPONENT} delete PAT (error: {error}) - failed to get mutable reference to the user with id: {user_id}" ) })?; @@ -166,6 +166,16 @@ impl IggyShard { Ok(()) } + pub fn login_user_pat_event(&self, token: &str, client_id: u32) -> Result<(), IggyError> { + let active_sessions = self.active_sessions.borrow(); + let session = active_sessions + .iter() + .find(|s| s.client_id == client_id) + .expect(format!("At this point session for {}, should exist.", client_id).as_str()); + self.login_with_personal_access_token(token, Some(session))?; + Ok(()) + } + pub fn login_with_personal_access_token( &self, token: &str, diff --git a/core/server/src/shard/system/segments.rs b/core/server/src/shard/system/segments.rs index 5b7d69ff..c9a59254 100644 --- a/core/server/src/shard/system/segments.rs +++ b/core/server/src/shard/system/segments.rs @@ -1,4 +1,7 @@ +use std::error; + use crate::shard::IggyShard; +use crate::streaming; /* Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,24 +19,108 @@ use crate::shard::IggyShard; * specific language governing permissions and limitations * under the License. */ -use super::COMPONENT; use crate::streaming::session::Session; -use error_set::ErrContext; use iggy_common::Identifier; use iggy_common::IggyError; -use iggy_common::locking::IggyRwLockFn; impl IggyShard { + pub async fn delete_segments_bypass_auth( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: usize, + segments_count: u32, + ) -> Result<(), IggyError> { + self.delete_segments_base(stream_id, topic_id, partition_id, segments_count) + .await + } + + pub async fn delete_segments_base( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: usize, + segments_count: u32, + ) -> Result<(), IggyError> { + let (segments, storages) = self.streams2.with_partition_by_id_mut( + stream_id, + topic_id, + partition_id, + |(.., log)| { + let upperbound = log.segments().len(); + let begin = upperbound.saturating_sub(segments_count as usize); + let segments = log + .segments_mut() + .drain(begin..upperbound) + .collect::<Vec<_>>(); + let storages = log + .storages_mut() + .drain(begin..upperbound) + .collect::<Vec<_>>(); + let _ = log + .indexes_mut() + .drain(begin..upperbound) + .collect::<Vec<_>>(); + (segments, storages) + }, + ); + let numeric_stream_id = self + .streams2 + .with_stream_by_id(stream_id, streaming::streams::helpers::get_stream_id()); + let numeric_topic_id = self.streams2.with_topic_by_id( + stream_id, + topic_id, + streaming::topics::helpers::get_topic_id(), + ); + + let create_base_segment = segments.len() > 0 && storages.len() > 0; + for (mut storage, segment) in storages.into_iter().zip(segments.into_iter()) { + let (msg_writer, index_writer) = storage.shutdown(); + if let Some(msg_writer) = msg_writer + && let Some(index_writer) = index_writer + { + // We need to fsync before closing to ensure all data is written to disk. + msg_writer.fsync().await?; + index_writer.fsync().await?; + let path = msg_writer.path(); + drop(msg_writer); + drop(index_writer); + compio::fs::remove_file(&path).await.map_err(|_| { + tracing::error!("Failed to delete segment file at path: {}", path); + IggyError::CannotDeleteFile + })?; + } else { + let start_offset = segment.start_offset; + let path = self.config.system.get_segment_path( + numeric_stream_id, + numeric_topic_id, + partition_id, + start_offset, + ); + compio::fs::remove_file(&path).await.map_err(|_| { + tracing::error!("Failed to delete segment file at path: {}", path); + IggyError::CannotDeleteFile + })?; + } + } + + if create_base_segment { + self.init_log(stream_id, topic_id, partition_id).await?; + } + Ok(()) + } pub async fn delete_segments( &self, session: &Session, stream_id: &Identifier, topic_id: &Identifier, - partition_id: u32, + partition_id: usize, segments_count: u32, ) -> Result<(), IggyError> { // Assert authentication. self.ensure_authenticated(session)?; - todo!(); + self.ensure_topic_exists(stream_id, topic_id)?; + self.delete_segments_base(stream_id, topic_id, partition_id, segments_count) + .await } } diff --git a/core/server/src/shard/system/snapshot/mod.rs b/core/server/src/shard/system/snapshot/mod.rs index 658c8a4b..630dd2be 100644 --- a/core/server/src/shard/system/snapshot/mod.rs +++ b/core/server/src/shard/system/snapshot/mod.rs @@ -23,7 +23,7 @@ use crate::shard::IggyShard; use crate::streaming::session::Session; use async_zip::base::write::ZipFileWriter; use async_zip::{Compression, ZipEntryBuilder}; -use compio::fs::{OpenOptions}; +use compio::fs::OpenOptions; use compio::io::{AsyncReadAtExt, AsyncWriteAtExt}; use iggy_common::{IggyDuration, IggyError, Snapshot, SnapshotCompression, SystemSnapshotType}; use std::path::PathBuf; diff --git a/core/server/src/shard/system/stats.rs b/core/server/src/shard/system/stats.rs index 1cd674b4..1faa0bc1 100644 --- a/core/server/src/shard/system/stats.rs +++ b/core/server/src/shard/system/stats.rs @@ -18,6 +18,7 @@ use crate::VERSION; use crate::shard::IggyShard; +use crate::slab::traits_ext::{EntityComponentSystem, IntoComponents}; use crate::versioning::SemanticVersion; use iggy_common::locking::IggyRwLockFn; use iggy_common::{IggyDuration, IggyError, Stats}; @@ -90,26 +91,40 @@ impl IggyShard { drop(sys); - //TODO: - /* - for stream in self.streams.borrow().values() { - stats.messages_count += stream.get_messages_count(); - stats.segments_count += stream.get_segments_count(); - stats.messages_size_bytes += stream.get_size(); - stats.streams_count += 1; - stats.topics_count += stream.topics.len() as u32; - stats.partitions_count += stream - .topics - .values() - .map(|t| t.partitions.len() as u32) - .sum::<u32>(); - stats.consumer_groups_count += stream - .topics - .values() - .map(|t| t.consumer_groups.borrow().len() as u32) - .sum::<u32>(); - } - */ + self.streams2.with_components(|stream_components| { + let (stream_roots, stream_stats) = stream_components.into_components(); + // Iterate through all streams + for (stream_id, stream_root) in stream_roots.iter() { + stats.streams_count += 1; + + // Get stream-level stats + if let Some(stream_stat) = stream_stats.get(stream_id) { + stats.messages_count += stream_stat.messages_count_inconsistent(); + stats.segments_count += stream_stat.segments_count_inconsistent(); + stats.messages_size_bytes += stream_stat.size_bytes_inconsistent().into(); + } + + // Access topics within this stream + stream_root.topics().with_components(|topic_components| { + let (topic_roots, ..) = topic_components.into_components(); + stats.topics_count += topic_roots.len() as u32; + + // Iterate through all topics in this stream + for (_, topic_root) in topic_roots.iter() { + // Count partitions in this topic + topic_root + .partitions() + .with_components(|partition_components| { + let (partition_roots, ..) = partition_components.into_components(); + stats.partitions_count += partition_roots.len() as u32; + }); + + // Count consumer groups in this topic + stats.consumer_groups_count += topic_root.consumer_groups().len() as u32; + } + }); + } + }); Ok(stats) } diff --git a/core/server/src/shard/system/streams.rs b/core/server/src/shard/system/streams.rs index bf0d6024..11e7e00a 100644 --- a/core/server/src/shard/system/streams.rs +++ b/core/server/src/shard/system/streams.rs @@ -21,14 +21,11 @@ use crate::shard::IggyShard; use crate::slab::traits_ext::{DeleteCell, EntityMarker, InsertCell}; use crate::streaming::session::Session; -use crate::streaming::stats::stats::StreamStats; use crate::streaming::streams::storage2::{create_stream_file_hierarchy, delete_stream_from_disk}; use crate::streaming::streams::{self, stream2}; use error_set::ErrContext; -use iggy_common::locking::IggyRwLockFn; use iggy_common::{Identifier, IggyError, IggyTimestamp}; -use std::sync::Arc; impl IggyShard { pub async fn create_stream2( @@ -127,12 +124,6 @@ impl IggyShard { .decrement_messages(stats.messages_count_inconsistent()); self.metrics .decrement_segments(stats.segments_count_inconsistent()); - - /* - self.client_manager - .borrow_mut() - .delete_consumer_groups_for_stream(stream_id as u32); - */ stream } @@ -157,36 +148,69 @@ impl IggyShard { ) })?; let mut stream = self.delete_stream2_base(id); + // Clean up consumer groups from ClientManager for this stream + let stream_id_usize = stream.id(); + self.client_manager + .borrow_mut() + .delete_consumer_groups_for_stream(stream_id_usize); delete_stream_from_disk(self.id, &mut stream, &self.config.system).await?; Ok(stream) } - pub async fn purge_stream2(&self, session: &Session, id: &Identifier) -> Result<(), IggyError> { + pub async fn purge_stream2( + &self, + session: &Session, + stream_id: &Identifier, + ) -> Result<(), IggyError> { self.ensure_authenticated(session)?; - // self.ensure_stream_exists(id)?; - let get_stream_id = crate::streaming::streams::helpers::get_stream_id(); - let stream_id = self.streams2.with_stream_by_id(id, get_stream_id); - self.permissioner - .borrow() - .purge_stream(session.get_user_id(), stream_id as u32) - .with_error_context(|error| { + self.ensure_stream_exists(stream_id)?; + { + let get_stream_id = crate::streaming::streams::helpers::get_stream_id(); + let stream_id = self.streams2.with_stream_by_id(stream_id, get_stream_id); + self.permissioner + .borrow() + .purge_stream(session.get_user_id(), stream_id as u32) + .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - permission denied to purge stream for user {}, stream ID: {}", session.get_user_id(), stream_id, ) })?; - self.purge_stream2_base(id)?; + } + + //TODO: Tech debt. + let topic_ids = self + .streams2 + .with_stream_by_id(stream_id, streams::helpers::get_topic_ids()); + + // Purge each topic in the stream using bypass auth + for topic_id in topic_ids { + let topic_identifier = Identifier::numeric(topic_id as u32).unwrap(); + self.purge_topic2(session, stream_id, &topic_identifier) + .await?; + } Ok(()) } - pub fn purge_stream2_bypass_auth(&self, stream_id: &Identifier) -> Result<(), IggyError> { - self.purge_stream2_base(stream_id)?; + pub async fn purge_stream2_bypass_auth(&self, stream_id: &Identifier) -> Result<(), IggyError> { + self.purge_stream2_base(stream_id).await?; Ok(()) } - fn purge_stream2_base(&self, _stream_id: &Identifier) -> Result<(), IggyError> { - // TODO + async fn purge_stream2_base(&self, stream_id: &Identifier) -> Result<(), IggyError> { + // Get all topic IDs in the stream + let topic_ids = self + .streams2 + .with_stream_by_id(stream_id, streams::helpers::get_topic_ids()); + + // Purge each topic in the stream using bypass auth + for topic_id in topic_ids { + let topic_identifier = Identifier::numeric(topic_id as u32).unwrap(); + self.purge_topic2_bypass_auth(stream_id, &topic_identifier) + .await?; + } + Ok(()) } } diff --git a/core/server/src/shard/system/topics.rs b/core/server/src/shard/system/topics.rs index a9cf7d9f..f2995b30 100644 --- a/core/server/src/shard/system/topics.rs +++ b/core/server/src/shard/system/topics.rs @@ -19,7 +19,7 @@ use super::COMPONENT; use crate::shard::IggyShard; use crate::shard_info; -use crate::slab::traits_ext::{EntityMarker, InsertCell}; +use crate::slab::traits_ext::{EntityComponentSystem, EntityMarker, InsertCell, IntoComponents}; use crate::streaming::session::Session; use crate::streaming::stats::stats::{StreamStats, TopicStats}; use crate::streaming::topics::storage2::{create_topic_file_hierarchy, delete_topic_from_disk}; @@ -29,8 +29,10 @@ use error_set::ErrContext; use iggy_common::{ CompressionAlgorithm, Identifier, IggyError, IggyExpiry, IggyTimestamp, MaxTopicSize, }; +use std::any::Any; use std::str::FromStr; use std::sync::Arc; +use std::u32; impl IggyShard { pub async fn create_topic2( @@ -139,7 +141,7 @@ impl IggyShard { replication_factor: Option<u8>, ) -> Result<(), IggyError> { self.ensure_authenticated(session)?; - //self.ensure_topic_exists(stream_id, topic_id)?; + self.ensure_topic_exists(stream_id, topic_id)?; { let topic_id_val = self.streams2.with_topic_by_id( stream_id, @@ -247,6 +249,10 @@ impl IggyShard { ) })?; let mut topic = self.delete_topic_base2(stream_id, topic_id); + // Clean up consumer groups from ClientManager for this topic + self.client_manager + .borrow_mut() + .delete_consumer_groups_for_topic(numeric_stream_id, topic.id()); let parent = topic.stats().parent().clone(); // We need to borrow topic as mutable, as we are extracting partitions out of it, in order to close them. let (messages_count, size_bytes, segments_count) = @@ -304,34 +310,54 @@ impl IggyShard { })?; } - self.streams2.with_partitions_mut( + self.streams2.with_partitions( stream_id, topic_id, partitions::helpers::purge_partitions_mem(), ); - self.streams2.with_partitions_mut( + + let (consumer_offset_paths, consumer_group_offset_paths) = self.streams2.with_partitions( stream_id, topic_id, - partitions::helpers::purge_segments_mem(), + partitions::helpers::purge_consumer_offsets(), ); - self.streams2 - .with_topic_by_id_async(stream_id, topic_id, topics::helpers::purge_topic_disk()) - .await; + for path in consumer_offset_paths { + self.delete_consumer_offset_from_disk(&path).await?; + } + for path in consumer_group_offset_paths { + self.delete_consumer_offset_from_disk(&path).await?; + } + + self.purge_topic_base2(stream_id, topic_id).await?; + Ok(()) + } + + pub async fn purge_topic2_bypass_auth( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + ) -> Result<(), IggyError> { + self.purge_topic_base2(stream_id, topic_id).await?; Ok(()) } async fn purge_topic_base2( &self, - _stream_id: &Identifier, - _topic_id: &Identifier, + stream_id: &Identifier, + topic_id: &Identifier, ) -> Result<(), IggyError> { - /* - self.streams2 + let part_ids = self + .streams2 .with_partitions(stream_id, topic_id, |partitions| { - //partitions + partitions.with_components(|components| { + let (roots, ..) = components.into_components(); + roots.iter().map(|(_, root)| root.id()).collect::<Vec<_>>() + }) }); - */ - + for part_id in part_ids { + self.delete_segments_bypass_auth(stream_id, topic_id, part_id, u32::MAX) + .await?; + } Ok(()) } } diff --git a/core/server/src/shard/system/users.rs b/core/server/src/shard/system/users.rs index c87c5dbc..a28bfcdb 100644 --- a/core/server/src/shard/system/users.rs +++ b/core/server/src/shard/system/users.rs @@ -162,61 +162,60 @@ impl IggyShard { ) })?; - let user_id = self.create_user_base(username, password, status, permissions)?; - self.get_user(&user_id.try_into()?) + if self + .users + .borrow() + .iter() + .any(|(_, user)| user.username == username) + { + error!("User: {username} already exists."); + return Err(IggyError::UserAlreadyExists); + } + + if self.users.borrow().len() >= MAX_USERS { + error!("Available users limit reached."); + return Err(IggyError::UsersLimitReached); + } + + // TODO: Tech debt, replace with Slab. + USER_ID.fetch_add(1, Ordering::SeqCst); + let current_user_id = USER_ID.load(Ordering::SeqCst); + self.create_user_base(current_user_id, username, password, status, permissions)?; + self.get_user(¤t_user_id.try_into()?) .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to get user with id: {user_id}") + format!( + "{COMPONENT} (error: {error}) - failed to get user with id: {current_user_id}" + ) }) } pub fn create_user_bypass_auth( &self, + user_id: u32, username: &str, password: &str, status: UserStatus, permissions: Option<Permissions>, - ) -> Result<u32, IggyError> { - let user_id = self.create_user_base(username, password, status, permissions)?; - Ok(user_id) + ) -> Result<(), IggyError> { + self.create_user_base(user_id, username, password, status, permissions)?; + Ok(()) } fn create_user_base( &self, + user_id: u32, username: &str, password: &str, status: UserStatus, permissions: Option<Permissions>, - ) -> Result<u32, IggyError> { - if self - .users - .borrow() - .iter() - .any(|(_, user)| user.username == username) - { - error!("User: {username} already exists."); - return Err(IggyError::UserAlreadyExists); - } - - if self.users.borrow().len() >= MAX_USERS { - error!("Available users limit reached."); - return Err(IggyError::UsersLimitReached); - } - - let user_id = USER_ID.fetch_add(1, Ordering::SeqCst); - let current_user_id = USER_ID.load(Ordering::SeqCst); - let user = User::new( - current_user_id, - username, - password, - status, - permissions.clone(), - ); + ) -> Result<(), IggyError> { + let user = User::new(user_id, username, password, status, permissions.clone()); self.permissioner .borrow_mut() .init_permissions_for_user(user_id, permissions); self.users.borrow_mut().insert(user.id, user); self.metrics.increment_users(1); - Ok(user_id) + Ok(()) } pub fn delete_user(&self, session: &Session, user_id: &Identifier) -> Result<User, IggyError> { @@ -319,7 +318,7 @@ impl IggyShard { } let mut user = self.get_user_mut(user_id).with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to get mutable reference to the user with id: {user_id}") + format!("{COMPONENT} update user (error: {error}) - failed to get mutable reference to the user with id: {user_id}") })?; if let Some(username) = username { user.username = username; @@ -389,7 +388,7 @@ impl IggyShard { { let mut user = self.get_user_mut(user_id).with_error_context(|error| { format!( - "{COMPONENT} (error: {error}) - failed to get mutable reference to the user with id: {user_id}" + "{COMPONENT} update user permissions (error: {error}) - failed to get mutable reference to the user with id: {user_id}" ) })?; user.permissions = permissions; @@ -438,7 +437,7 @@ impl IggyShard { new_password: &str, ) -> Result<(), IggyError> { let mut user = self.get_user_mut(user_id).with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to get mutable reference to the user with id: {user_id}") + format!("{COMPONENT} change password (error: {error}) - failed to get mutable reference to the user with id: {user_id}") })?; if !crypto::verify_password(current_password, &user.password) { error!( diff --git a/core/server/src/shard/transmission/event.rs b/core/server/src/shard/transmission/event.rs index d7e725b1..afaa6da4 100644 --- a/core/server/src/shard/transmission/event.rs +++ b/core/server/src/shard/transmission/event.rs @@ -89,6 +89,7 @@ pub enum ShardEvent { polling_consumer: PollingConsumer, }, CreatedUser { + user_id: u32, username: String, password: String, status: UserStatus, @@ -127,12 +128,31 @@ pub enum ShardEvent { name: String, }, LoginWithPersonalAccessToken { + client_id: u32, token: String, }, + DeletedSegments { + stream_id: Identifier, + topic_id: Identifier, + partition_id: usize, + segments_count: u32, + }, NewSession { address: SocketAddr, transport: TransportProtocol, }, + JoinedConsumerGroup { + client_id: u32, + stream_id: Identifier, + topic_id: Identifier, + group_id: Identifier, + }, + LeftConsumerGroup { + client_id: u32, + stream_id: Identifier, + topic_id: Identifier, + group_id: Identifier, + }, TcpBound { address: SocketAddr, }, diff --git a/core/server/src/slab/consumer_groups.rs b/core/server/src/slab/consumer_groups.rs index 21e60d36..683c3a0e 100644 --- a/core/server/src/slab/consumer_groups.rs +++ b/core/server/src/slab/consumer_groups.rs @@ -3,7 +3,7 @@ use crate::{ Keyed, consumer_groups, traits_ext::{ Borrow, ComponentsById, Delete, EntityComponentSystem, EntityComponentSystemMut, - Insert, IntoComponents, IntoComponentsById, + EntityMarker, Insert, IntoComponents, IntoComponentsById, }, }, streaming::topics::consumer_group2::{self, ConsumerGroupRef, ConsumerGroupRefMut}, @@ -37,6 +37,8 @@ impl Insert for ConsumerGroups { "consumer_group: id mismatch when inserting members" ); self.index.insert(key, entity_id); + let root = self.root.get_mut(entity_id).unwrap(); + root.update_id(entity_id); entity_id } } @@ -46,12 +48,12 @@ impl Delete for ConsumerGroups { type Item = consumer_group2::ConsumerGroup; fn delete(&mut self, id: Self::Idx) -> Self::Item { - let (name, partitions) = self.root.remove(id).disarray(); - let members = self.members.remove(id).into_inner(); + let root = self.root.remove(id); + let members = self.members.remove(id); self.index - .remove(&name) + .remove(root.key()) .expect("consumer_group_delete: key not found"); - consumer_group2::ConsumerGroup::new(name, members, partitions) + consumer_group2::ConsumerGroup::new_with_components(root, members) } } @@ -113,6 +115,10 @@ impl ConsumerGroups { } } + pub fn len(&self) -> usize { + self.root.len() + } + pub fn get_index(&self, id: &Identifier) -> usize { match id.kind { iggy_common::IdKind::Numeric => id.get_u32_value().unwrap() as usize, diff --git a/core/server/src/slab/partitions.rs b/core/server/src/slab/partitions.rs index 34ca117d..678846c3 100644 --- a/core/server/src/slab/partitions.rs +++ b/core/server/src/slab/partitions.rs @@ -96,6 +96,8 @@ impl Insert for Partitions { entity_id, id, "partition_insert: id mismatch when creating consumer_group_offset" ); + let root = self.root.get_mut(entity_id).unwrap(); + root.update_id(entity_id); entity_id } } @@ -105,7 +107,23 @@ impl Delete for Partitions { type Item = Partition; fn delete(&mut self, id: Self::Idx) -> Self::Item { - todo!() + let root = self.root.remove(id); + let stats = self.stats.remove(id); + let message_deduplicator = self.message_deduplicator.remove(id); + let offset = self.offset.remove(id); + let consumer_offset = self.consumer_offset.remove(id); + let consumer_group_offset = self.consumer_group_offset.remove(id); + let log = self.log.remove(id); + + Partition::new_with_components( + root, + stats, + message_deduplicator, + offset, + consumer_offset, + consumer_group_offset, + log, + ) } } diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs index 02ab2850..d733e04e 100644 --- a/core/server/src/slab/streams.rs +++ b/core/server/src/slab/streams.rs @@ -112,8 +112,21 @@ impl DeleteCell for Streams { type Idx = ContainerId; type Item = stream2::Stream; - fn delete(&self, _id: Self::Idx) -> Self::Item { - todo!() + fn delete(&self, id: Self::Idx) -> Self::Item { + let mut root_container = self.root.borrow_mut(); + let mut indexes = self.index.borrow_mut(); + let mut stats_container = self.stats.borrow_mut(); + + let root = root_container.remove(id); + let stats = stats_container.remove(id); + + // Remove from index + let key = root.key(); + indexes + .remove(key) + .expect("stream_delete: key not found in index"); + + stream2::Stream::new_with_components(root, stats) } } @@ -718,7 +731,8 @@ impl Streams { }); let (log_writer, index_writer) = self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| { - log.active_storage_mut().shutdown() + let (msg, index) = log.active_storage_mut().shutdown(); + (msg.unwrap(), index.unwrap()) }); compio::runtime::spawn(async move { diff --git a/core/server/src/slab/topics.rs b/core/server/src/slab/topics.rs index 549a2704..7508d675 100644 --- a/core/server/src/slab/topics.rs +++ b/core/server/src/slab/topics.rs @@ -73,8 +73,22 @@ impl DeleteCell for Topics { type Item = topic2::Topic; fn delete(&self, id: Self::Idx) -> Self::Item { - // TODO: don't forget to remoev from the index - todo!() + let mut root_container = self.root.borrow_mut(); + let mut auxilaries = self.auxilaries.borrow_mut(); + let mut indexes = self.index.borrow_mut(); + let mut stats_container = self.stats.borrow_mut(); + + let root = root_container.remove(id); + let auxilary = auxilaries.remove(id); + let stats = stats_container.remove(id); + + // Remove from index + let key = root.key(); + indexes + .remove(key) + .expect("topic_delete: key not found in index"); + + topic2::Topic::new_with_components(root, auxilary, stats) } } diff --git a/core/server/src/slab/traits_ext.rs b/core/server/src/slab/traits_ext.rs index 0db207cf..949971fa 100644 --- a/core/server/src/slab/traits_ext.rs +++ b/core/server/src/slab/traits_ext.rs @@ -158,7 +158,7 @@ type MappingByIdMut<'a, E, T> = pub type Components<T> = <T as IntoComponents>::Components; pub type ComponentsById<'a, T> = <T as IntoComponentsById>::Output; -// TODO: +// TODO: // I've figured there is actually and ergonomic improvement that can be made here. // Observe that the chain of constraints put on the `EntityRef` type is actually wrong. // We constraint the `EntityRef` to be IntoComponents + IntoComponentsById, @@ -173,7 +173,7 @@ pub type ComponentsById<'a, T> = <T as IntoComponentsById>::Output; // Maybe lets not go this way with the tuple mapping madness, it already is pretty difficult to distinguish between all of the different components, // and everytime we add a new component to an entity, we need to update the tuple type everywhere. -// Better idea would be to use the `EntityRef` type directly inside of the `with_components_by_id` closure +// Better idea would be to use the `EntityRef` type directly inside of the `with_components_by_id` closure // -- f(components.into_components_by_id(id)) -> components.into_components_by_id(id) would return `EntityRef`, rather than the tuple. pub trait EntityComponentSystem<T> where diff --git a/core/server/src/streaming/clients/client_manager.rs b/core/server/src/streaming/clients/client_manager.rs index 8e41b69c..9fad13c5 100644 --- a/core/server/src/streaming/clients/client_manager.rs +++ b/core/server/src/streaming/clients/client_manager.rs @@ -23,7 +23,6 @@ use iggy_common::IggyError; use iggy_common::IggyTimestamp; use iggy_common::TransportProtocol; use iggy_common::UserId; -use std::fmt::{Display, Formatter}; use std::net::SocketAddr; use std::rc::Rc; @@ -119,7 +118,7 @@ impl ClientManager { } pub fn delete_client(&mut self, client_id: u32) -> Option<Client> { - if let Some(mut client) = self.clients.remove(&client_id) { + if let Some(client) = self.clients.remove(&client_id) { client.session.clear_user_id(); Some(client) } else { @@ -130,10 +129,13 @@ impl ClientManager { pub fn join_consumer_group( &mut self, client_id: u32, - stream_id: u32, - topic_id: u32, - group_id: u32, + stream_id: usize, + topic_id: usize, + group_id: usize, ) -> Result<(), IggyError> { + let stream_id = stream_id as u32; + let topic_id = topic_id as u32; + let group_id = group_id as u32; let client = self.clients.get_mut(&client_id); if client.is_none() { return Err(IggyError::ClientNotFound(client_id)); @@ -159,15 +161,18 @@ impl ClientManager { pub fn leave_consumer_group( &mut self, client_id: u32, - stream_id: u32, - topic_id: u32, - consumer_group_id: u32, + stream_id: usize, + topic_id: usize, + consumer_group_id: usize, ) -> Result<(), IggyError> { + let stream_id = stream_id as u32; + let topic_id = topic_id as u32; + let consumer_group_id = consumer_group_id as u32; let client = self.clients.get_mut(&client_id); if client.is_none() { return Err(IggyError::ClientNotFound(client_id)); } - let mut client = client.unwrap(); + let client = client.unwrap(); for (index, consumer_group) in client.consumer_groups.iter().enumerate() { if consumer_group.stream_id == stream_id && consumer_group.topic_id == topic_id @@ -180,7 +185,21 @@ impl ClientManager { Ok(()) } - pub fn delete_consumer_groups_for_stream(&mut self, stream_id: u32) { + pub fn delete_consumer_group(&mut self, stream_id: usize, topic_id: usize, group_id: usize) { + let stream_id = stream_id as u32; + let topic_id = topic_id as u32; + let group_id = group_id as u32; + for client in self.clients.values_mut() { + client.consumer_groups.retain(|consumer_group| { + !(consumer_group.stream_id == stream_id + && consumer_group.topic_id == topic_id + && consumer_group.group_id == group_id) + }); + } + } + + pub fn delete_consumer_groups_for_stream(&mut self, stream_id: usize) { + let stream_id = stream_id as u32; for client in self.clients.values_mut() { let indexes_to_remove = client .consumer_groups @@ -200,7 +219,9 @@ impl ClientManager { } } - pub fn delete_consumer_groups_for_topic(&mut self, stream_id: u32, topic_id: u32) { + pub fn delete_consumer_groups_for_topic(&mut self, stream_id: usize, topic_id: usize) { + let stream_id = stream_id as u32; + let topic_id = topic_id as u32; for client in self.clients.values_mut() { let indexes_to_remove = client .consumer_groups diff --git a/core/server/src/streaming/partitions/helpers.rs b/core/server/src/streaming/partitions/helpers.rs index f9baf62e..1062a6dc 100644 --- a/core/server/src/streaming/partitions/helpers.rs +++ b/core/server/src/streaming/partitions/helpers.rs @@ -62,12 +62,49 @@ pub fn insert_partition( move |partitions| partitions.insert(partition) } -pub fn purge_partitions_mem() -> impl FnOnce(&mut Partitions) { +pub fn purge_partitions_mem() -> impl FnOnce(&Partitions) { |partitions| { partitions.with_components(|components| { - let (_root, _stats, _deduplicator, _offset, _consumer_offset, _cg_offset, _log) = - components.into_components(); - // TODO: Implement purge logic + let (.., stats, _, offsets, _, _, _) = components.into_components(); + for (offset, stat) in offsets + .iter() + .map(|(_, o)| o) + .zip(stats.iter().map(|(_, s)| s)) + { + offset.store(0, Ordering::Relaxed); + stat.zero_out_all(); + } + }) + } +} + +pub fn purge_consumer_offsets() -> impl FnOnce(&Partitions) -> (Vec<String>, Vec<String>) { + |partitions| { + partitions.with_components(|components| { + let (.., consumer_offsets, cg_offsets, _) = components.into_components(); + + let mut consumer_offset_paths = Vec::new(); + let mut consumer_group_offset_paths = Vec::new(); + + // Collect paths and clear consumer offsets + for (_, consumer_offset) in consumer_offsets { + let hdl = consumer_offset.pin(); + for item in hdl.values() { + consumer_offset_paths.push(item.path.clone()); + } + hdl.clear(); // Clear the hashmap + } + + // Collect paths and clear consumer group offsets + for (_, cg_offset) in cg_offsets { + let hdl = cg_offset.pin(); + for item in hdl.values() { + consumer_group_offset_paths.push(item.path.clone()); + } + hdl.clear(); // Clear the hashmap + } + + (consumer_offset_paths, consumer_group_offset_paths) }) } } @@ -119,13 +156,13 @@ pub fn store_consumer_offset( pub fn delete_consumer_offset( id: usize, -) -> impl FnOnce(ComponentsById<PartitionRef>) -> Result<(), IggyError> { +) -> impl FnOnce(ComponentsById<PartitionRef>) -> Result<String, IggyError> { move |(.., offsets, _, _)| { - offsets - .pin() + let hdl = offsets.pin(); + let offset = hdl .remove(&id) - .map(|_| ()) - .ok_or_else(|| IggyError::ConsumerOffsetNotFound(id)) + .ok_or_else(|| IggyError::ConsumerOffsetNotFound(id))?; + Ok(offset.path.clone()) } } @@ -180,13 +217,13 @@ pub fn store_consumer_group_member_offset( pub fn delete_consumer_group_member_offset( id: usize, -) -> impl FnOnce(ComponentsById<PartitionRef>) -> Result<(), IggyError> { +) -> impl FnOnce(ComponentsById<PartitionRef>) -> Result<String, IggyError> { move |(.., offsets, _)| { - offsets - .pin() + let hdl = offsets.pin(); + let offset = hdl .remove(&id) - .map(|_| ()) - .ok_or_else(|| IggyError::ConsumerOffsetNotFound(id)) + .ok_or_else(|| IggyError::ConsumerOffsetNotFound(id))?; + Ok(offset.path.clone()) } } @@ -218,13 +255,6 @@ pub fn delete_consumer_group_member_offset_from_disk( } } -pub fn purge_segments_mem() -> impl FnOnce(&mut Partitions) { - |_partitions| { - // TODO: - //partitions.segments_mut() - } -} - pub fn create_message_deduplicator(config: &SystemConfig) -> Option<MessageDeduplicator> { if !config.message_deduplication.enabled { return None; @@ -668,14 +698,14 @@ pub fn append_to_journal( let batch_messages_size = batch.size(); let batch_messages_count = batch.count(); + stats.increment_size_bytes(batch_messages_size as u64); + stats.increment_messages_count(batch_messages_count as u64); + segment.end_timestamp = batch.last_timestamp().unwrap(); segment.end_offset = batch.last_offset().unwrap(); let (journal_messages_count, journal_size) = log.journal_mut().append(shard_id, batch)?; - stats.increment_messages_count(batch_messages_count as u64); - stats.increment_size_bytes(batch_messages_size as u64); - let last_offset = if batch_messages_count == 0 { current_offset } else { @@ -802,14 +832,12 @@ pub fn update_index_and_increment_stats( batch_count: u32, config: &SystemConfig, ) -> impl FnOnce(ComponentsById<PartitionRefMut>) { - move |(_, stats, .., log)| { + move |(.., log)| { let segment = log.active_segment_mut(); segment.size += saved.as_bytes_u32(); log.active_indexes_mut().unwrap().mark_saved(); if config.segment.cache_indexes == CacheIndexesConfig::None { log.active_indexes_mut().unwrap().clear(); } - stats.increment_size_bytes(saved.as_bytes_u64()); - stats.increment_messages_count(batch_count as u64); } } diff --git a/core/server/src/streaming/partitions/log.rs b/core/server/src/streaming/partitions/log.rs index ebee527e..3ff049e9 100644 --- a/core/server/src/streaming/partitions/log.rs +++ b/core/server/src/streaming/partitions/log.rs @@ -56,6 +56,14 @@ where &self.segments } + pub fn segments_mut(&mut self) -> &mut Vec<Segment2> { + &mut self.segments + } + + pub fn storages_mut(&mut self) -> &mut Vec<Storage> { + &mut self.storage + } + pub fn storages(&self) -> &Vec<Storage> { &self.storage } @@ -88,6 +96,10 @@ where &self.indexes } + pub fn indexes_mut(&mut self) -> &mut Vec<Option<IggyIndexesMut>> { + &mut self.indexes + } + pub fn active_indexes(&self) -> Option<&IggyIndexesMut> { self.indexes .last() diff --git a/core/server/src/streaming/partitions/partition2.rs b/core/server/src/streaming/partitions/partition2.rs index 92d8f738..6c85dfb0 100644 --- a/core/server/src/streaming/partitions/partition2.rs +++ b/core/server/src/streaming/partitions/partition2.rs @@ -117,6 +117,26 @@ impl Partition { } } + pub fn new_with_components( + root: PartitionRoot, + stats: Arc<PartitionStats>, + message_deduplicator: Option<MessageDeduplicator>, + offset: Arc<AtomicU64>, + consumer_offset: Arc<ConsumerOffsets>, + consumer_group_offset: Arc<ConsumerGroupOffsets>, + log: SegmentedLog<MemoryMessageJournal>, + ) -> Self { + Self { + root, + stats, + message_deduplicator, + offset, + consumer_offset, + consumer_group_offset, + log, + } + } + pub fn stats(&self) -> &PartitionStats { &self.stats } diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs b/core/server/src/streaming/segments/messages/messages_writer.rs index becc7aea..da1b5adc 100644 --- a/core/server/src/streaming/segments/messages/messages_writer.rs +++ b/core/server/src/streaming/segments/messages/messages_writer.rs @@ -131,6 +131,10 @@ impl MessagesWriter { Ok(IggyByteSize::from(messages_size as u64)) } + pub fn path(&self) -> String { + self.file_path.clone() + } + pub async fn fsync(&self) -> Result<(), IggyError> { self.file .sync_all() diff --git a/core/server/src/streaming/segments/storage.rs b/core/server/src/streaming/segments/storage.rs index 249b67eb..84fcd783 100644 --- a/core/server/src/streaming/segments/storage.rs +++ b/core/server/src/streaming/segments/storage.rs @@ -1,10 +1,8 @@ use iggy_common::IggyError; use std::rc::Rc; use std::sync::atomic::AtomicU64; -use tracing::warn; use crate::configs::system::SystemConfig; -use crate::shard_warn; use crate::streaming::segments::{ indexes::{IndexReader, IndexWriter}, messages::{MessagesReader, MessagesWriter}, @@ -51,9 +49,9 @@ impl Storage { }) } - pub fn shutdown(&mut self) -> (MessagesWriter, IndexWriter) { - let messages_writer = self.messages_writer.take().unwrap(); - let index_writer = self.index_writer.take().unwrap(); + pub fn shutdown(&mut self) -> (Option<MessagesWriter>, Option<IndexWriter>) { + let messages_writer = self.messages_writer.take(); + let index_writer = self.index_writer.take(); (messages_writer, index_writer) } } diff --git a/core/server/src/streaming/stats/stats.rs b/core/server/src/streaming/stats/stats.rs index 20933c42..67c464f3 100644 --- a/core/server/src/streaming/stats/stats.rs +++ b/core/server/src/streaming/stats/stats.rs @@ -50,6 +50,24 @@ impl StreamStats { pub fn segments_count_inconsistent(&self) -> u32 { self.segments_count.load(Ordering::Relaxed) } + + pub fn zero_out_size_bytes(&self) { + self.size_bytes.store(0, Ordering::Relaxed); + } + + pub fn zero_out_messages_count(&self) { + self.messages_count.store(0, Ordering::Relaxed); + } + + pub fn zero_out_segments_count(&self) { + self.segments_count.store(0, Ordering::Relaxed); + } + + pub fn zero_out_all(&self) { + self.zero_out_size_bytes(); + self.zero_out_messages_count(); + self.zero_out_segments_count(); + } } #[derive(Default, Debug)] @@ -143,6 +161,43 @@ impl TopicStats { pub fn segments_count_inconsistent(&self) -> u32 { self.segments_count.load(Ordering::Relaxed) } + + pub fn zero_out_parent_size_bytes(&self) { + self.parent.zero_out_size_bytes(); + } + + pub fn zero_out_parent_messages_count(&self) { + self.parent.zero_out_messages_count(); + } + + pub fn zero_out_parent_segments_count(&self) { + self.parent.zero_out_segments_count(); + } + + pub fn zero_out_parent_all(&self) { + self.parent.zero_out_all(); + } + + pub fn zero_out_size_bytes(&self) { + self.size_bytes.store(0, Ordering::Relaxed); + self.zero_out_parent_size_bytes(); + } + + pub fn zero_out_messages_count(&self) { + self.messages_count.store(0, Ordering::Relaxed); + self.zero_out_parent_messages_count(); + } + + pub fn zero_out_segments_count(&self) { + self.segments_count.store(0, Ordering::Relaxed); + self.zero_out_parent_segments_count(); + } + + pub fn zero_out_all(&self) { + self.zero_out_size_bytes(); + self.zero_out_messages_count(); + self.zero_out_segments_count(); + } } #[derive(Default, Debug)] @@ -236,4 +291,41 @@ impl PartitionStats { pub fn segments_count_inconsistent(&self) -> u32 { self.segments_count.load(Ordering::Relaxed) } + + pub fn zero_out_parent_size_bytes(&self) { + self.parent.zero_out_size_bytes(); + } + + pub fn zero_out_parent_messages_count(&self) { + self.parent.zero_out_messages_count(); + } + + pub fn zero_out_parent_segments_count(&self) { + self.parent.zero_out_segments_count(); + } + + pub fn zero_out_parent_all(&self) { + self.parent.zero_out_all(); + } + + pub fn zero_out_size_bytes(&self) { + self.size_bytes.store(0, Ordering::Relaxed); + self.zero_out_parent_size_bytes(); + } + + pub fn zero_out_messages_count(&self) { + self.messages_count.store(0, Ordering::Relaxed); + self.zero_out_parent_messages_count(); + } + + pub fn zero_out_segments_count(&self) { + self.segments_count.store(0, Ordering::Relaxed); + self.zero_out_parent_segments_count(); + } + + pub fn zero_out_all(&self) { + self.zero_out_size_bytes(); + self.zero_out_messages_count(); + self.zero_out_segments_count(); + } } diff --git a/core/server/src/streaming/streams/helpers.rs b/core/server/src/streaming/streams/helpers.rs index f71027e6..73afc612 100644 --- a/core/server/src/streaming/streams/helpers.rs +++ b/core/server/src/streaming/streams/helpers.rs @@ -2,7 +2,7 @@ use crate::{ configs::system::SystemConfig, slab::{ streams, - traits_ext::{ComponentsById, EntityComponentSystem}, + traits_ext::{ComponentsById, EntityComponentSystem, IntoComponents}, }, streaming::{ partitions, @@ -25,6 +25,18 @@ pub fn update_stream_name(name: String) -> impl FnOnce(ComponentsById<StreamRefM } } +pub fn get_topic_ids() -> impl FnOnce(ComponentsById<StreamRef>) -> Vec<usize> { + |(root, _)| { + root.topics().with_components(|components| { + let (topic_roots, ..) = components.into_components(); + topic_roots + .iter() + .map(|(_, topic)| topic.id()) + .collect::<Vec<_>>() + }) + } +} + pub fn store_consumer_offset( consumer_id: usize, topic_id: &Identifier, diff --git a/core/server/src/streaming/streams/stream2.rs b/core/server/src/streaming/streams/stream2.rs index 098ed8fc..d637e29d 100644 --- a/core/server/src/streaming/streams/stream2.rs +++ b/core/server/src/streaming/streams/stream2.rs @@ -110,6 +110,10 @@ impl Stream { Self { root, stats } } + pub fn new_with_components(root: StreamRoot, stats: Arc<StreamStats>) -> Self { + Self { root, stats } + } + pub fn stats(&self) -> &Arc<StreamStats> { &self.stats } diff --git a/core/server/src/streaming/topics/consumer_group2.rs b/core/server/src/streaming/topics/consumer_group2.rs index ef690087..0236d3da 100644 --- a/core/server/src/streaming/topics/consumer_group2.rs +++ b/core/server/src/streaming/topics/consumer_group2.rs @@ -53,6 +53,10 @@ impl ConsumerGroupRoot { &self.partitions } + pub fn update_id(&mut self, id: usize) { + self.id = id; + } + pub fn assign_partitions(&mut self, partitions: Vec<usize>) { self.partitions = partitions; } @@ -171,6 +175,14 @@ impl ConsumerGroup { let members = ConsumerGroupMembers { inner: members }; Self { root, members } } + + pub fn new_with_components(root: ConsumerGroupRoot, members: ConsumerGroupMembers) -> Self { + Self { root, members } + } + + pub fn members(&self) -> &ConsumerGroupMembers { + &self.members + } } #[derive(Debug)] diff --git a/core/server/src/streaming/topics/helpers.rs b/core/server/src/streaming/topics/helpers.rs index 57122b31..222aa20b 100644 --- a/core/server/src/streaming/topics/helpers.rs +++ b/core/server/src/streaming/topics/helpers.rs @@ -66,10 +66,6 @@ pub fn delete_topic(topic_id: &Identifier) -> impl FnOnce(&Topics) -> Topic { } } -pub fn purge_topic_disk() -> impl AsyncFnOnce(ComponentsById<TopicRef>) { - async |(root, ..)| {} -} - pub fn exists(identifier: &Identifier) -> impl FnOnce(&Topics) -> bool { |topics| topics.exists(identifier) } @@ -238,6 +234,10 @@ fn assign_partitions_to_members( .iter_mut() .for_each(|(_, member)| member.partitions.clear()); let count = members.len(); + if count == 0 { + return; + } + for (idx, partition) in partitions.iter().enumerate() { let position = idx % count; let member = &mut members[position]; @@ -251,6 +251,7 @@ fn assign_partitions_to_members( ); } } + fn mimic_members(members: &Slab<Member>) -> Slab<Member> { let mut container = Slab::with_capacity(members.len()); for (_, member) in members { diff --git a/core/server/src/streaming/topics/topic2.rs b/core/server/src/streaming/topics/topic2.rs index e2ae830c..64d4fc69 100644 --- a/core/server/src/streaming/topics/topic2.rs +++ b/core/server/src/streaming/topics/topic2.rs @@ -89,6 +89,17 @@ impl Topic { stats, } } + pub fn new_with_components( + root: TopicRoot, + auxilary: TopicAuxilary, + stats: Arc<TopicStats>, + ) -> Self { + Self { + root, + auxilary, + stats, + } + } pub fn root(&self) -> &TopicRoot { &self.root
