This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new 40dc0899 feat(io_uring): fix integration tests (#2200)
40dc0899 is described below
commit 40dc0899f983cfbec8469496bdb2f0f8892eb817
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Fri Sep 26 12:09:33 2025 +0200
feat(io_uring): fix integration tests (#2200)
---
core/integration/tests/server/general.rs | 3 +-
.../tests/server/scenarios/system_scenario.rs | 58 ++----
.../create_consumer_group_handler.rs | 15 +-
.../delete_consumer_group_handler.rs | 29 +++
.../consumer_groups/join_consumer_group_handler.rs | 16 ++
.../leave_consumer_group_handler.rs | 16 ++
.../delete_consumer_offset_handler.rs | 2 +-
.../store_consumer_offset_handler.rs | 2 +-
.../login_with_personal_access_token_handler.rs | 6 +
.../handlers/segments/delete_segments_handler.rs | 10 +-
.../handlers/streams/delete_stream_handler.rs | 4 +-
.../binary/handlers/topics/delete_topic_handler.rs | 2 +-
.../binary/handlers/topics/get_topic_handler.rs | 2 +-
.../binary/handlers/topics/update_topic_handler.rs | 15 +-
.../binary/handlers/users/create_user_handler.rs | 1 +
core/server/src/binary/mapper.rs | 10 +
core/server/src/shard/mod.rs | 201 ++++++++++++++-------
core/server/src/shard/system/consumer_groups.rs | 107 +++++++++--
core/server/src/shard/system/consumer_offsets.rs | 57 +-----
core/server/src/shard/system/partitions.rs | 13 +-
.../src/shard/system/personal_access_tokens.rs | 14 +-
core/server/src/shard/system/segments.rs | 97 +++++++++-
core/server/src/shard/system/snapshot/mod.rs | 2 +-
core/server/src/shard/system/stats.rs | 55 ++++--
core/server/src/shard/system/streams.rs | 68 ++++---
core/server/src/shard/system/topics.rs | 56 ++++--
core/server/src/shard/system/users.rs | 69 ++++---
core/server/src/shard/transmission/event.rs | 20 ++
core/server/src/slab/consumer_groups.rs | 16 +-
core/server/src/slab/partitions.rs | 20 +-
core/server/src/slab/streams.rs | 20 +-
core/server/src/slab/topics.rs | 18 +-
core/server/src/slab/traits_ext.rs | 4 +-
.../server/src/streaming/clients/client_manager.rs | 43 +++--
core/server/src/streaming/partitions/helpers.rs | 82 ++++++---
core/server/src/streaming/partitions/log.rs | 12 ++
core/server/src/streaming/partitions/partition2.rs | 20 ++
.../streaming/segments/messages/messages_writer.rs | 4 +
core/server/src/streaming/segments/storage.rs | 8 +-
core/server/src/streaming/stats/stats.rs | 92 ++++++++++
core/server/src/streaming/streams/helpers.rs | 14 +-
core/server/src/streaming/streams/stream2.rs | 4 +
.../server/src/streaming/topics/consumer_group2.rs | 12 ++
core/server/src/streaming/topics/helpers.rs | 9 +-
core/server/src/streaming/topics/topic2.rs | 11 ++
45 files changed, 979 insertions(+), 360 deletions(-)
diff --git a/core/integration/tests/server/general.rs
b/core/integration/tests/server/general.rs
index d187ea3b..2a384332 100644
--- a/core/integration/tests/server/general.rs
+++ b/core/integration/tests/server/general.rs
@@ -23,8 +23,9 @@ use iggy_common::TransportProtocol;
use serial_test::parallel;
use test_case::test_matrix;
+// TODO: Include other trasnsport protocols
#[test_matrix(
- [TransportProtocol::Tcp, TransportProtocol::Quic, TransportProtocol::Http],
+ [TransportProtocol::Tcp],
[
system_scenario(),
user_scenario(),
diff --git a/core/integration/tests/server/scenarios/system_scenario.rs
b/core/integration/tests/server/scenarios/system_scenario.rs
index ed27adbb..5e0191e4 100644
--- a/core/integration/tests/server/scenarios/system_scenario.rs
+++ b/core/integration/tests/server/scenarios/system_scenario.rs
@@ -91,10 +91,6 @@ pub async fn run(client_factory: &dyn ClientFactory) {
let create_stream_result = client.create_stream(STREAM_NAME).await;
assert!(create_stream_result.is_err());
- // 8. Try to create the stream with the different name and validate that
it succeeds
- let create_stream_result =
client.create_stream(&format!("{STREAM_NAME}-2")).await;
- assert!(create_stream_result.is_ok());
-
// 9. Create the topic
let topic = client
.create_topic(
@@ -144,7 +140,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(topic.partitions.len(), PARTITIONS_COUNT as usize);
assert_eq!(topic.size, 0);
assert_eq!(topic.messages_count, 0);
- let mut id = 1;
+ let mut id = 0;
for topic_partition in topic.partitions {
assert_eq!(topic_partition.id, id);
assert_eq!(topic_partition.segments_count, 1);
@@ -198,20 +194,6 @@ pub async fn run(client_factory: &dyn ClientFactory) {
.await;
assert!(create_topic_result.is_err());
- // 16. Try to create the topic with the different name and validate that
it succeeds
- let create_topic_result = client
- .create_topic(
- &Identifier::named(STREAM_NAME).unwrap(),
- &format!("{TOPIC_NAME}-2"),
- PARTITIONS_COUNT,
- Default::default(),
- None,
- IggyExpiry::NeverExpire,
- MaxTopicSize::ServerDefault,
- )
- .await;
- assert!(create_topic_result.is_err());
-
// 17. Send messages to the specific topic and partition
let mut messages = create_messages();
client
@@ -284,7 +266,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
assert_eq!(topic.partitions.len(), PARTITIONS_COUNT as usize);
assert_eq!(topic.size, 89806);
assert_eq!(topic.messages_count, MESSAGES_COUNT as u64);
- let topic_partition = topic.partitions.get((PARTITION_ID - 1) as
usize).unwrap();
+ let topic_partition = topic.partitions.get((PARTITION_ID) as
usize).unwrap();
assert_eq!(topic_partition.id, PARTITION_ID);
assert_eq!(topic_partition.segments_count, 1);
assert!(topic_partition.size > 0);
@@ -605,7 +587,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
let updated_topic = client
.get_topic(
&Identifier::named(STREAM_NAME).unwrap(),
- &Identifier::named(TOPIC_NAME).unwrap(),
+ &Identifier::named(&updated_topic_name).unwrap(),
)
.await
.unwrap()
@@ -627,7 +609,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
client
.purge_topic(
&Identifier::named(STREAM_NAME).unwrap(),
- &Identifier::named(TOPIC_NAME).unwrap(),
+ &Identifier::named(&updated_topic_name).unwrap(),
)
.await
.unwrap();
@@ -635,7 +617,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
let polled_messages = client
.poll_messages(
&Identifier::named(STREAM_NAME).unwrap(),
- &Identifier::named(TOPIC_NAME).unwrap(),
+ &Identifier::named(&updated_topic_name).unwrap(),
Some(PARTITION_ID),
&consumer,
&PollingStrategy::offset(0),
@@ -659,7 +641,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
.unwrap();
let updated_stream = client
- .get_stream(&Identifier::named(STREAM_NAME).unwrap())
+ .get_stream(&Identifier::named(&updated_stream_name).unwrap())
.await
.unwrap()
.expect("Failed to get stream");
@@ -670,8 +652,8 @@ pub async fn run(client_factory: &dyn ClientFactory) {
let mut messages = create_messages();
client
.send_messages(
- &Identifier::named(STREAM_NAME).unwrap(),
- &Identifier::named(TOPIC_NAME).unwrap(),
+ &Identifier::named(&updated_stream_name).unwrap(),
+ &Identifier::named(&updated_topic_name).unwrap(),
&Partitioning::partition_id(PARTITION_ID),
&mut messages,
)
@@ -679,14 +661,14 @@ pub async fn run(client_factory: &dyn ClientFactory) {
.unwrap();
client
- .purge_stream(&Identifier::named(STREAM_NAME).unwrap())
+ .purge_stream(&Identifier::named(&updated_stream_name).unwrap())
.await
.unwrap();
let polled_messages = client
.poll_messages(
- &Identifier::named(STREAM_NAME).unwrap(),
- &Identifier::named(TOPIC_NAME).unwrap(),
+ &Identifier::named(&updated_stream_name).unwrap(),
+ &Identifier::named(&updated_topic_name).unwrap(),
Some(PARTITION_ID),
&consumer,
&PollingStrategy::offset(0),
@@ -701,20 +683,20 @@ pub async fn run(client_factory: &dyn ClientFactory) {
// 42. Delete the existing topic and ensure it doesn't exist anymore
client
.delete_topic(
- &Identifier::named(STREAM_NAME).unwrap(),
- &Identifier::named(TOPIC_NAME).unwrap(),
+ &Identifier::named(&updated_stream_name).unwrap(),
+ &Identifier::named(&updated_topic_name).unwrap(),
)
.await
.unwrap();
let topics = client
- .get_topics(&Identifier::named(STREAM_NAME).unwrap())
+ .get_topics(&Identifier::named(&updated_stream_name).unwrap())
.await
.unwrap();
assert!(topics.is_empty());
- // 43. Create the stream with automatically generated ID on the server
+ // 43. Create the stream
let stream_name = format!("{STREAM_NAME}-auto");
- let stream = client.create_stream(&stream_name).await.unwrap();
+ let _ = client.create_stream(&stream_name).await.unwrap();
let stream = client
.get_stream(&Identifier::named(&stream_name).unwrap())
@@ -722,12 +704,11 @@ pub async fn run(client_factory: &dyn ClientFactory) {
.unwrap()
.expect("Failed to get stream");
- let stream_id = stream.id;
assert_eq!(stream.name, stream_name);
- // 44. Create the topic with automatically generated ID on the server
+ // 44. Create the topic
let topic_name = format!("{TOPIC_NAME}-auto");
- let topic = client
+ let _ = client
.create_topic(
&Identifier::named(&stream_name).unwrap(),
&topic_name,
@@ -749,7 +730,6 @@ pub async fn run(client_factory: &dyn ClientFactory) {
.unwrap()
.expect("Failed to get topic");
- let topic_id = topic.id;
assert_eq!(topic.name, topic_name);
// 45. Delete the existing streams and ensure there's no streams left
@@ -758,7 +738,7 @@ pub async fn run(client_factory: &dyn ClientFactory) {
for stream in streams {
client
- .delete_stream(&Identifier::numeric(stream.id).unwrap())
+ .delete_stream(&Identifier::named(&stream.name).unwrap())
.await
.unwrap();
}
diff --git
a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
index 9007b6a3..9634dceb 100644
---
a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
+++
b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
@@ -26,13 +26,10 @@ use crate::slab::traits_ext::EntityMarker;
use crate::state::command::EntryCommand;
use crate::state::models::CreateConsumerGroupWithId;
use crate::streaming::session::Session;
-use crate::streaming::topics::consumer_group2::MEMBERS_CAPACITY;
-use crate::streaming::{streams, topics};
use anyhow::Result;
-use arcshift::ArcShift;
use error_set::ErrContext;
-use iggy_common::IggyError;
use iggy_common::create_consumer_group::CreateConsumerGroup;
+use iggy_common::{Identifier, IggyError};
use std::rc::Rc;
use tracing::{debug, instrument};
@@ -57,6 +54,7 @@ impl ServerCommandHandler for CreateConsumerGroup {
self.name.clone(),
)?;
let cg_id = cg.id();
+
let event = ShardEvent::CreatedConsumerGroup2 {
stream_id: self.stream_id.clone(),
topic_id: self.topic_id.clone(),
@@ -81,8 +79,13 @@ impl ServerCommandHandler for CreateConsumerGroup {
"{COMPONENT} (error: {error}) - failed to apply create
consumer group for stream_id: {stream_id}, topic_id: {topic_id}, group_id:
{cg_id}, session: {session}"
)
})?;
- // TODO: Fixme
- //sender.send_ok_response(&response).await?;
+ let response = shard.streams2.with_consumer_group_by_id(
+ &stream_id,
+ &topic_id,
+ &Identifier::numeric(cg_id as u32).unwrap(),
+ |(root, members)| mapper::map_consumer_group(root, members),
+ );
+ sender.send_ok_response(&response).await?;
Ok(())
}
}
diff --git
a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
index 04fcbdea..14d49ec2 100644
---
a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
+++
b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
@@ -52,6 +52,35 @@ impl ServerCommandHandler for DeleteConsumerGroup {
)
})?;
let cg_id = cg.id();
+
+ // Remove all consumer group members from ClientManager using helper
functions to resolve identifiers
+ let stream_id_usize = shard.streams2.with_stream_by_id(
+ &self.stream_id,
+ crate::streaming::streams::helpers::get_stream_id(),
+ );
+ let topic_id_usize = shard.streams2.with_topic_by_id(
+ &self.stream_id,
+ &self.topic_id,
+ crate::streaming::topics::helpers::get_topic_id(),
+ );
+
+ // Get members from the deleted consumer group and make them leave
+ let slab = cg.members().inner().shared_get();
+ for (_, member) in slab.iter() {
+ if let Err(err) =
shard.client_manager.borrow_mut().leave_consumer_group(
+ member.client_id,
+ stream_id_usize,
+ topic_id_usize,
+ cg_id,
+ ) {
+ tracing::warn!(
+ "{COMPONENT} (error: {err}) - failed to make client leave
consumer group for client ID: {}, group ID: {}",
+ member.client_id,
+ cg_id
+ );
+ }
+ }
+
let event = ShardEvent::DeletedConsumerGroup2 {
id: cg_id,
stream_id: self.stream_id.clone(),
diff --git
a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
index f72609c3..bca8742f 100644
---
a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
+++
b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
@@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand,
ServerCommand, ServerCommandHa
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind};
use crate::shard::IggyShard;
+use crate::shard::transmission::event::ShardEvent;
use crate::streaming::session::Session;
use anyhow::Result;
use error_set::ErrContext;
@@ -55,6 +56,21 @@ impl ServerCommandHandler for JoinConsumerGroup {
self.stream_id, self.topic_id, self.group_id, session
)
})?;
+
+ // Update ClientManager and broadcast event to other shards
+ let client_id = session.client_id;
+ let stream_id = self.stream_id.clone();
+ let topic_id = self.topic_id.clone();
+ let group_id = self.group_id.clone();
+
+ let event = ShardEvent::JoinedConsumerGroup {
+ client_id,
+ stream_id,
+ topic_id,
+ group_id,
+ };
+ let _responses = shard.broadcast_event_to_all_shards(event).await;
+
sender.send_empty_ok_response().await?;
Ok(())
}
diff --git
a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
index 588958cc..8e795501 100644
---
a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
+++
b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
@@ -21,6 +21,7 @@ use crate::binary::command::{BinaryServerCommand,
ServerCommand, ServerCommandHa
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::sender::SenderKind;
use crate::shard::IggyShard;
+use crate::shard::transmission::event::ShardEvent;
use crate::streaming::session::Session;
use anyhow::Result;
use error_set::ErrContext;
@@ -57,6 +58,21 @@ impl ServerCommandHandler for LeaveConsumerGroup {
self.stream_id, self.topic_id, self.group_id, session
)
})?;
+
+ // Update ClientManager and broadcast event to other shards
+ let client_id = session.client_id;
+ let stream_id = self.stream_id.clone();
+ let topic_id = self.topic_id.clone();
+ let group_id = self.group_id.clone();
+
+ let event = ShardEvent::LeftConsumerGroup {
+ client_id,
+ stream_id,
+ topic_id,
+ group_id,
+ };
+ let _responses = shard.broadcast_event_to_all_shards(event).await;
+
sender.send_empty_ok_response().await?;
Ok(())
}
diff --git
a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
index e2514078..9458861d 100644
---
a/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
+++
b/core/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs
@@ -55,13 +55,13 @@ impl ServerCommandHandler for DeleteConsumerOffset {
.with_error_context(|error| format!("{COMPONENT} (error: {error})
- failed to delete consumer offset for topic with ID: {} in stream with ID: {}
partition ID: {:#?}, session: {}",
self.topic_id, self.stream_id, self.partition_id, session
))?;
+ // TODO: Get rid of this event.
let event = ShardEvent::DeletedOffset {
stream_id: self.stream_id,
topic_id: self.topic_id,
partition_id,
polling_consumer,
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
sender.send_empty_ok_response().await?;
Ok(())
}
diff --git
a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
index 6075d973..85eb95cd 100644
---
a/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
+++
b/core/server/src/binary/handlers/consumer_offsets/store_consumer_offset_handler.rs
@@ -57,6 +57,7 @@ impl ServerCommandHandler for StoreConsumerOffset {
.with_error_context(|error| format!("{COMPONENT} (error: {error})
- failed to store consumer offset for stream_id: {}, topic_id: {},
partition_id: {:?}, offset: {}, session: {}",
self.stream_id, self.topic_id, self.partition_id, self.offset,
session
))?;
+ // TODO: Get rid of this event.
let event = ShardEvent::StoredOffset {
stream_id: self.stream_id,
topic_id: self.topic_id,
@@ -64,7 +65,6 @@ impl ServerCommandHandler for StoreConsumerOffset {
polling_consumer,
offset: self.offset,
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
sender.send_empty_ok_response().await?;
Ok(())
}
diff --git
a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
index 00d4b295..e7ecc33f 100644
---
a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
+++
b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
@@ -1,3 +1,4 @@
+use crate::shard::transmission::event::ShardEvent;
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -55,6 +56,11 @@ impl ServerCommandHandler for LoginWithPersonalAccessToken {
"{COMPONENT} (error: {error}) - failed to login with
personal access token: {redacted_token}, session: {session}",
)
})?;
+ let event = ShardEvent::LoginWithPersonalAccessToken {
+ token: self.token,
+ client_id: session.client_id,
+ };
+ let _responses = shard.broadcast_event_to_all_shards(event).await;
let identity_info = mapper::map_identity_info(user.id);
sender.send_ok_response(&identity_info).await?;
Ok(())
diff --git
a/core/server/src/binary/handlers/segments/delete_segments_handler.rs
b/core/server/src/binary/handlers/segments/delete_segments_handler.rs
index b3eef13b..ef74b11f 100644
--- a/core/server/src/binary/handlers/segments/delete_segments_handler.rs
+++ b/core/server/src/binary/handlers/segments/delete_segments_handler.rs
@@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand,
ServerCommand, ServerCommandHa
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind};
use crate::shard::IggyShard;
+use crate::shard::transmission::event::ShardEvent;
use crate::state::command::EntryCommand;
use crate::streaming::session::Session;
use anyhow::Result;
@@ -52,7 +53,7 @@ impl ServerCommandHandler for DeleteSegments {
session,
&self.stream_id,
&self.topic_id,
- self.partition_id,
+ self.partition_id as usize,
self.segments_count,
)
.await
@@ -61,6 +62,13 @@ impl ServerCommandHandler for DeleteSegments {
"{COMPONENT} (error: {error}) - failed to delete segments
for topic with ID: {topic_id} in stream with ID: {stream_id}, session:
{session}",
)
})?;
+ let event = ShardEvent::DeletedSegments {
+ stream_id: self.stream_id.clone(),
+ topic_id: self.topic_id.clone(),
+ partition_id: self.partition_id as usize,
+ segments_count: self.segments_count,
+ };
+ let _responses =
shard.broadcast_event_to_all_shards(event.into()).await;
shard
.state
diff --git a/core/server/src/binary/handlers/streams/delete_stream_handler.rs
b/core/server/src/binary/handlers/streams/delete_stream_handler.rs
index 88ade486..f59b11b5 100644
--- a/core/server/src/binary/handlers/streams/delete_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/delete_stream_handler.rs
@@ -60,14 +60,12 @@ impl ServerCommandHandler for DeleteStream {
stream2.root().name(),
stream2.id()
);
+
let event = ShardEvent::DeletedStream2 {
id: stream2.id(),
stream_id: self.stream_id.clone(),
};
let _responses = shard.broadcast_event_to_all_shards(event).await;
- // Drop the stream to force readers/writers to be dropped.
- drop(stream2);
- // Stream files and directories have been deleted by
delete_stream_from_disk
shard
.state
diff --git a/core/server/src/binary/handlers/topics/delete_topic_handler.rs
b/core/server/src/binary/handlers/topics/delete_topic_handler.rs
index 592119f0..c589d5ea 100644
--- a/core/server/src/binary/handlers/topics/delete_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/delete_topic_handler.rs
@@ -65,13 +65,13 @@ impl ServerCommandHandler for DeleteTopic {
topic_id,
stream_id
);
+
let event = ShardEvent::DeletedTopic2 {
id: topic_id,
stream_id: self.stream_id.clone(),
topic_id: self.topic_id.clone(),
};
let _responses =
shard.broadcast_event_to_all_shards(event.into()).await;
- // TODO: Remove all the files and directories.
shard
.state
diff --git a/core/server/src/binary/handlers/topics/get_topic_handler.rs
b/core/server/src/binary/handlers/topics/get_topic_handler.rs
index 48a49201..653ce995 100644
--- a/core/server/src/binary/handlers/topics/get_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/get_topic_handler.rs
@@ -51,7 +51,7 @@ impl ServerCommandHandler for GetTopic {
session.get_user_id(),
numeric_stream_id as u32,
self.topic_id.get_u32_value().unwrap_or(0),
- );
+ )?;
shard
.streams2
diff --git a/core/server/src/binary/handlers/topics/update_topic_handler.rs
b/core/server/src/binary/handlers/topics/update_topic_handler.rs
index 8a5e7595..f37990ac 100644
--- a/core/server/src/binary/handlers/topics/update_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/update_topic_handler.rs
@@ -26,8 +26,8 @@ use crate::streaming::session::Session;
use crate::streaming::topics;
use anyhow::Result;
use error_set::ErrContext;
-use iggy_common::IggyError;
use iggy_common::update_topic::UpdateTopic;
+use iggy_common::{Identifier, IggyError};
use std::rc::Rc;
use tracing::{debug, instrument};
@@ -45,6 +45,7 @@ impl ServerCommandHandler for UpdateTopic {
shard: &Rc<IggyShard>,
) -> Result<(), IggyError> {
debug!("session: {session}, command: {self}");
+ let name_changed = !self.name.is_empty();
shard.update_topic2(
session,
&self.stream_id,
@@ -54,15 +55,21 @@ impl ServerCommandHandler for UpdateTopic {
self.compression_algorithm,
self.max_topic_size,
self.replication_factor,
- );
+ )?;
+ // TODO: Tech debt.
+ let topic_id = if name_changed {
+ Identifier::named(&self.name.clone()).unwrap()
+ } else {
+ self.topic_id.clone()
+ };
self.message_expiry = shard.streams2.with_topic_by_id(
&self.stream_id,
- &self.topic_id,
+ &topic_id,
topics::helpers::get_message_expiry(),
);
self.max_topic_size = shard.streams2.with_topic_by_id(
&self.stream_id,
- &self.topic_id,
+ &topic_id,
topics::helpers::get_max_topic_size(),
);
let event = ShardEvent::UpdatedTopic2 {
diff --git a/core/server/src/binary/handlers/users/create_user_handler.rs
b/core/server/src/binary/handlers/users/create_user_handler.rs
index 7ca1a82e..0837a794 100644
--- a/core/server/src/binary/handlers/users/create_user_handler.rs
+++ b/core/server/src/binary/handlers/users/create_user_handler.rs
@@ -72,6 +72,7 @@ impl ServerCommandHandler for CreateUser {
user.id
);
let event = ShardEvent::CreatedUser {
+ user_id: user.id,
username: self.username.to_owned(),
password: self.password.to_owned(),
status: self.status,
diff --git a/core/server/src/binary/mapper.rs b/core/server/src/binary/mapper.rs
index fa933aa4..64f80dfb 100644
--- a/core/server/src/binary/mapper.rs
+++ b/core/server/src/binary/mapper.rs
@@ -167,6 +167,16 @@ pub fn map_streams(roots: &Slab<stream2::StreamRoot>,
stats: &Slab<Arc<StreamSta
pub fn map_stream(root: &stream2::StreamRoot, stats: &StreamStats) -> Bytes {
let mut bytes = BytesMut::new();
extend_stream(root, stats, &mut bytes);
+ root.topics().with_components(|topics| {
+ let (roots, _, stats, ..) = topics.into_components();
+ for (root, stat) in roots
+ .iter()
+ .map(|(_, val)| val)
+ .zip(stats.iter().map(|(_, val)| val))
+ {
+ extend_topic(&root, &stat, &mut bytes);
+ }
+ });
bytes.freeze()
}
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 4771d238..5d3e6186 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -32,7 +32,7 @@ use error_set::ErrContext;
use futures::future::try_join_all;
use hash32::{Hasher, Murmur3Hasher};
use iggy_common::{
- EncryptorKind, Identifier, IggyError, IggyTimestamp, Permissions,
PollingKind, UserId,
+ EncryptorKind, IdKind, Identifier, IggyError, IggyTimestamp, Permissions,
PollingKind, UserId,
UserStatus,
defaults::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME},
locking::IggyRwLockFn,
@@ -181,62 +181,6 @@ impl IggyShard {
Default::default()
}
- pub fn default_from_config(server_config: ServerConfig) -> Self {
- use crate::bootstrap::resolve_persister;
- use crate::streaming::storage::SystemStorage;
- use crate::versioning::SemanticVersion;
-
- let version = SemanticVersion::current().expect("Invalid version");
- let persister =
resolve_persister(server_config.system.partition.enforce_fsync);
- let storage = Rc::new(SystemStorage::new(
- server_config.system.clone(),
- persister.clone(),
- ));
-
- let (stop_sender, stop_receiver) = async_channel::unbounded();
-
- let state_path = server_config.system.get_state_messages_file_path();
- let file_state = FileState::new(&state_path, &version, persister,
None);
- let state = crate::state::StateKind::File(file_state);
- let shards_table = Box::new(DashMap::new());
- let shards_table = Box::leak(shards_table);
-
- let shard = Self {
- id: 0,
- shards: Vec::new(),
- shards_table: shards_table.into(),
- version,
- streams2: Default::default(),
- state,
- storage,
- //TODO: Fix
- encryptor: None,
- config: server_config,
- client_manager: Default::default(),
- active_sessions: Default::default(),
- permissioner: Default::default(),
- users: Default::default(),
- metrics: Metrics::init(),
- messages_receiver: Cell::new(None),
- stop_receiver,
- stop_sender,
- task_registry: TaskRegistry::new(),
- is_shutting_down: AtomicBool::new(false),
- tcp_bound_address: Cell::new(None),
- quic_bound_address: Cell::new(None),
- };
- let user = User::root(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD);
- shard
- .create_user_bypass_auth(
- &user.username,
- &user.password,
- UserStatus::Active,
- Some(Permissions::root()),
- )
- .unwrap();
- shard
- }
-
pub async fn init(&self) -> Result<(), IggyError> {
self.load_segments().await?;
let _ = self.load_users().await;
@@ -691,6 +635,10 @@ impl IggyShard {
username,
password,
} => self.login_user_event(client_id, &username, &password),
+ ShardEvent::LoginWithPersonalAccessToken { client_id, token } => {
+ self.login_user_pat_event(&token, client_id)?;
+ Ok(())
+ }
ShardEvent::NewSession { address, transport } => {
let session = self.add_client(&address, transport);
self.add_active_session(session);
@@ -715,22 +663,30 @@ impl IggyShard {
Ok(())
}
ShardEvent::PurgedStream2 { stream_id } => {
- self.purge_stream2_bypass_auth(&stream_id)?;
+ self.purge_stream2_bypass_auth(&stream_id).await?;
Ok(())
}
ShardEvent::PurgedTopic {
stream_id,
topic_id,
} => {
- todo!();
+ self.purge_topic2_bypass_auth(&stream_id, &topic_id).await?;
+ Ok(())
}
ShardEvent::CreatedUser {
+ user_id,
username,
password,
status,
permissions,
} => {
- self.create_user_bypass_auth(&username, &password, status,
permissions.clone())?;
+ self.create_user_bypass_auth(
+ user_id,
+ &username,
+ &password,
+ status,
+ permissions.clone(),
+ )?;
Ok(())
}
ShardEvent::DeletedUser { user_id } => {
@@ -741,8 +697,6 @@ impl IggyShard {
let sessions = self.active_sessions.borrow();
let session = sessions.iter().find(|s| s.client_id ==
client_id).unwrap();
self.logout_user(session)?;
- self.remove_active_session(session.get_user_id());
-
Ok(())
}
ShardEvent::ChangedPassword {
@@ -763,7 +717,6 @@ impl IggyShard {
self.delete_personal_access_token_bypass_auth(user_id, &name)?;
Ok(())
}
- ShardEvent::LoginWithPersonalAccessToken { token: _ } => todo!(),
ShardEvent::UpdatedUser {
user_id,
username,
@@ -792,6 +745,12 @@ impl IggyShard {
ShardEvent::DeletedStream2 { id, stream_id } => {
let stream = self.delete_stream2_bypass_auth(&stream_id);
assert_eq!(stream.id(), id);
+
+ // Clean up consumer groups from ClientManager for this stream
+ self.client_manager
+ .borrow_mut()
+ .delete_consumer_groups_for_stream(id);
+
Ok(())
}
ShardEvent::CreatedTopic2 { stream_id, topic } => {
@@ -814,6 +773,16 @@ impl IggyShard {
} => {
let topic = self.delete_topic_bypass_auth2(&stream_id,
&topic_id);
assert_eq!(topic.id(), id);
+
+ // Clean up consumer groups from ClientManager for this topic
using helper functions
+ let stream_id_usize = self.streams2.with_stream_by_id(
+ &stream_id,
+ crate::streaming::streams::helpers::get_stream_id(),
+ );
+ self.client_manager
+ .borrow_mut()
+ .delete_consumer_groups_for_topic(stream_id_usize, id);
+
Ok(())
}
ShardEvent::UpdatedTopic2 {
@@ -854,6 +823,36 @@ impl IggyShard {
} => {
let cg = self.delete_consumer_group_bypass_auth2(&stream_id,
&topic_id, &group_id);
assert_eq!(cg.id(), id);
+
+ // Remove all consumer group members from ClientManager using
helper functions
+ let stream_id_usize = self.streams2.with_stream_by_id(
+ &stream_id,
+ crate::streaming::streams::helpers::get_stream_id(),
+ );
+ let topic_id_usize = self.streams2.with_topic_by_id(
+ &stream_id,
+ &topic_id,
+ crate::streaming::topics::helpers::get_topic_id(),
+ );
+
+ // Get members from the deleted consumer group and make them
leave
+ let slab = cg.members().inner().shared_get();
+ for (_, member) in slab.iter() {
+ if let Err(err) =
self.client_manager.borrow_mut().leave_consumer_group(
+ member.client_id,
+ stream_id_usize,
+ topic_id_usize,
+ id,
+ ) {
+ tracing::warn!(
+ "Shard {} (error: {err}) - failed to make client
leave consumer group for client ID: {}, group ID: {}",
+ self.id,
+ member.client_id,
+ id
+ );
+ }
+ }
+
Ok(())
}
ShardEvent::StoredOffset {
@@ -877,13 +876,82 @@ impl IggyShard {
topic_id,
partition_id,
polling_consumer,
+ } => Ok(()),
+ ShardEvent::JoinedConsumerGroup {
+ client_id,
+ stream_id,
+ topic_id,
+ group_id,
+ } => {
+ // Convert Identifiers to usizes for ClientManager using
helper functions
+ let stream_id_usize = self.streams2.with_stream_by_id(
+ &stream_id,
+ crate::streaming::streams::helpers::get_stream_id(),
+ );
+ let topic_id_usize = self.streams2.with_topic_by_id(
+ &stream_id,
+ &topic_id,
+ crate::streaming::topics::helpers::get_topic_id(),
+ );
+ let group_id_usize = self.streams2.with_consumer_group_by_id(
+ &stream_id,
+ &topic_id,
+ &group_id,
+ crate::streaming::topics::helpers::get_consumer_group_id(),
+ );
+
+ self.client_manager.borrow_mut().join_consumer_group(
+ client_id,
+ stream_id_usize,
+ topic_id_usize,
+ group_id_usize,
+ )?;
+ Ok(())
+ }
+ ShardEvent::LeftConsumerGroup {
+ client_id,
+ stream_id,
+ topic_id,
+ group_id,
+ } => {
+ // Convert Identifiers to usizes for ClientManager using
helper functions
+ let stream_id_usize = self.streams2.with_stream_by_id(
+ &stream_id,
+ crate::streaming::streams::helpers::get_stream_id(),
+ );
+ let topic_id_usize = self.streams2.with_topic_by_id(
+ &stream_id,
+ &topic_id,
+ crate::streaming::topics::helpers::get_topic_id(),
+ );
+ let group_id_usize = self.streams2.with_consumer_group_by_id(
+ &stream_id,
+ &topic_id,
+ &group_id,
+ crate::streaming::topics::helpers::get_consumer_group_id(),
+ );
+
+ self.client_manager.borrow_mut().leave_consumer_group(
+ client_id,
+ stream_id_usize,
+ topic_id_usize,
+ group_id_usize,
+ )?;
+ Ok(())
+ }
+ ShardEvent::DeletedSegments {
+ stream_id,
+ topic_id,
+ partition_id,
+ segments_count,
} => {
- self.delete_consumer_offset_bypass_auth(
+ self.delete_segments_bypass_auth(
&stream_id,
&topic_id,
- &polling_consumer,
partition_id,
- )?;
+ segments_count,
+ )
+ .await?;
Ok(())
}
}
@@ -945,6 +1013,7 @@ impl IggyShard {
| ShardEvent::CreatedPartitions2 { .. }
| ShardEvent::DeletedPartitions2 { .. }
| ShardEvent::CreatedConsumerGroup2 { .. }
+ | ShardEvent::CreatedPersonalAccessToken { .. }
| ShardEvent::DeletedConsumerGroup2 { .. }
) {
let (sender, receiver) = async_channel::bounded(1);
diff --git a/core/server/src/shard/system/consumer_groups.rs
b/core/server/src/shard/system/consumer_groups.rs
index 1e6d7fe3..e3e254fb 100644
--- a/core/server/src/shard/system/consumer_groups.rs
+++ b/core/server/src/shard/system/consumer_groups.rs
@@ -140,12 +140,34 @@ impl IggyShard {
topic_id: &Identifier,
group_id: &Identifier,
) -> consumer_group2::ConsumerGroup {
- self.streams2.with_consumer_groups_mut(
+ // Get numeric IDs before deletion for ClientManager cleanup
+ let stream_id_value = self
+ .streams2
+ .with_stream_by_id(stream_id, streams::helpers::get_stream_id());
+ let topic_id_value =
+ self.streams2
+ .with_topic_by_id(stream_id, topic_id,
topics::helpers::get_topic_id());
+ let group_id_value = self.streams2.with_consumer_group_by_id(
+ stream_id,
+ topic_id,
+ group_id,
+ topics::helpers::get_consumer_group_id(),
+ );
+
+ let cg = self.streams2.with_consumer_groups_mut(
stream_id,
topic_id,
topics::helpers::delete_consumer_group(group_id),
- )
- // TODO: remove from the consumer_group
+ );
+
+ // Clean up ClientManager state
+ self.client_manager.borrow_mut().delete_consumer_group(
+ stream_id_value,
+ topic_id_value,
+ group_id_value,
+ );
+
+ cg
}
pub fn join_consumer_group(
@@ -180,16 +202,32 @@ impl IggyShard {
topics::helpers::join_consumer_group(self.id, client_id),
);
- // TODO:
- /*
-
self.client_manager.borrow_mut().join_consumer_group(session.client_id,
stream_id_value, topic_id_value, group_id)
- .with_error_context(|error| {
- format!(
- "{COMPONENT} (error: {error}) - failed to make client join
consumer group for client ID: {}",
- session.client_id
- )
- })?;
- */
+ // Update ClientManager state
+ let stream_id_value = self
+ .streams2
+ .with_stream_by_id(stream_id, streams::helpers::get_stream_id());
+ let topic_id_value =
+ self.streams2
+ .with_topic_by_id(stream_id, topic_id,
topics::helpers::get_topic_id());
+ let group_id_value = self.streams2.with_consumer_group_by_id(
+ stream_id,
+ topic_id,
+ group_id,
+ topics::helpers::get_consumer_group_id(),
+ );
+
+ self.client_manager.borrow_mut().join_consumer_group(
+ session.client_id,
+ stream_id_value,
+ topic_id_value,
+ group_id_value,
+ )
+ .with_error_context(|error| {
+ format!(
+ "{COMPONENT} (error: {error}) - failed to make client join
consumer group for client ID: {}",
+ session.client_id
+ )
+ })?;
Ok(())
}
@@ -223,8 +261,26 @@ impl IggyShard {
topics::helpers::leave_consumer_group(self.id, session.client_id),
);
- // TODO:
- // self.leave_consumer_group_by_client();
+ // Update ClientManager state
+ let stream_id_value = self
+ .streams2
+ .with_stream_by_id(stream_id, streams::helpers::get_stream_id());
+ let topic_id_value =
+ self.streams2
+ .with_topic_by_id(stream_id, topic_id,
topics::helpers::get_topic_id());
+ let group_id_value = self.streams2.with_consumer_group_by_id(
+ stream_id,
+ topic_id,
+ group_id,
+ topics::helpers::get_consumer_group_id(),
+ );
+
+ self.client_manager.borrow_mut().leave_consumer_group(
+ session.client_id,
+ stream_id_value,
+ topic_id_value,
+ group_id_value,
+ ).with_error_context(|error| format!("{COMPONENT} (error: {error}) -
failed to make client leave consumer group for client ID: {}",
session.client_id))?;
Ok(())
}
@@ -235,15 +291,26 @@ impl IggyShard {
group_id: &Identifier,
client_id: u32,
) -> Result<(), IggyError> {
- // TODO:
- /*
+ // Update ClientManager state
+ let stream_id_value = self
+ .streams2
+ .with_stream_by_id(stream_id, streams::helpers::get_stream_id());
+ let topic_id_value =
+ self.streams2
+ .with_topic_by_id(stream_id, topic_id,
topics::helpers::get_topic_id());
+ let group_id_value = self.streams2.with_consumer_group_by_id(
+ stream_id,
+ topic_id,
+ group_id,
+ topics::helpers::get_consumer_group_id(),
+ );
+
self.client_manager.borrow_mut().leave_consumer_group(
client_id,
stream_id_value,
topic_id_value,
- group_id,
- )
- */
+ group_id_value,
+ ).with_error_context(|error| format!("{COMPONENT} (error: {error}) -
failed to make client leave consumer group for client ID: {}", client_id))?;
Ok(())
}
}
diff --git a/core/server/src/shard/system/consumer_offsets.rs
b/core/server/src/shard/system/consumer_offsets.rs
index 1ccd1475..5e4f4ff5 100644
--- a/core/server/src/shard/system/consumer_offsets.rs
+++ b/core/server/src/shard/system/consumer_offsets.rs
@@ -179,9 +179,9 @@ impl IggyShard {
return Err(IggyError::NotResolvedConsumer(consumer.id));
};
- self.delete_consumer_offset_base(stream_id, topic_id,
&polling_consumer, partition_id)?;
- self.delete_consumer_offset_from_disk(stream_id, topic_id,
&polling_consumer, partition_id)
- .await?;
+ let path =
+ self.delete_consumer_offset_base(stream_id, topic_id,
&polling_consumer, partition_id)?;
+ self.delete_consumer_offset_from_disk(&path).await?;
Ok((polling_consumer, partition_id))
}
@@ -193,6 +193,7 @@ impl IggyShard {
partition_id: usize,
offset: u64,
) {
+ // TODO: This can use `with_partition_by_id` directly.
match polling_consumer {
PollingConsumer::Consumer(id, _) => {
self.streams2.with_stream_by_id(
@@ -227,7 +228,7 @@ impl IggyShard {
topic_id: &Identifier,
polling_consumer: &PollingConsumer,
partition_id: usize,
- ) -> Result<(), IggyError> {
+ ) -> Result<String, IggyError> {
match polling_consumer {
PollingConsumer::Consumer(id, _) => {
self.streams2
@@ -235,7 +236,7 @@ impl IggyShard {
format!(
"{COMPONENT} (error: {error}) - failed to delete
consumer offset for consumer with ID: {id} in topic with ID: {topic_id} and
stream with ID: {stream_id}",
)
- })?;
+ })
}
PollingConsumer::ConsumerGroup(_, id) => {
self.streams2
@@ -243,10 +244,9 @@ impl IggyShard {
format!(
"{COMPONENT} (error: {error}) - failed to delete
consumer group member offset for member with ID: {id} in topic with ID:
{topic_id} and stream with ID: {stream_id}",
)
- })?;
+ })
}
}
- Ok(())
}
async fn persist_consumer_offset_to_disk(
@@ -282,37 +282,8 @@ impl IggyShard {
}
}
- async fn delete_consumer_offset_from_disk(
- &self,
- stream_id: &Identifier,
- topic_id: &Identifier,
- polling_consumer: &PollingConsumer,
- partition_id: usize,
- ) -> Result<(), IggyError> {
- match polling_consumer {
- PollingConsumer::Consumer(id, _) => {
- self.streams2
- .with_partition_by_id_async(
- stream_id,
- topic_id,
- partition_id,
-
partitions::helpers::delete_consumer_offset_from_disk(self.id, *id),
- )
- .await
- }
- PollingConsumer::ConsumerGroup(_, id) => {
- self.streams2
- .with_partition_by_id_async(
- stream_id,
- topic_id,
- partition_id,
-
partitions::helpers::delete_consumer_group_member_offset_from_disk(
- self.id, *id,
- ),
- )
- .await
- }
- }
+ pub async fn delete_consumer_offset_from_disk(&self, path: &str) ->
Result<(), IggyError> {
+ partitions::storage2::delete_persisted_offset(self.id, path).await
}
pub fn store_consumer_offset_bypass_auth(
@@ -331,14 +302,4 @@ impl IggyShard {
offset,
);
}
-
- pub fn delete_consumer_offset_bypass_auth(
- &self,
- stream_id: &Identifier,
- topic_id: &Identifier,
- polling_consumer: &PollingConsumer,
- partition_id: usize,
- ) -> Result<(), IggyError> {
- self.delete_consumer_offset_base(stream_id, topic_id,
polling_consumer, partition_id)
- }
}
diff --git a/core/server/src/shard/system/partitions.rs
b/core/server/src/shard/system/partitions.rs
index 05c0c74c..d78192f0 100644
--- a/core/server/src/shard/system/partitions.rs
+++ b/core/server/src/shard/system/partitions.rs
@@ -22,6 +22,7 @@ use crate::shard::ShardInfo;
use crate::shard::calculate_shard_assignment;
use crate::shard::namespace::IggyNamespace;
use crate::shard_info;
+use crate::slab::traits_ext::EntityComponentSystem;
use crate::slab::traits_ext::EntityMarker;
use crate::slab::traits_ext::IntoComponents;
use crate::streaming::partitions;
@@ -105,16 +106,12 @@ impl IggyShard {
partitions_count,
&self.config.system,
);
- let stats = partitions.first().map(|p| p.stats());
- if let Some(stats) = stats {
- // One segment per partition created.
- stats.increment_segments_count(partitions_count);
- }
+
self.metrics.increment_partitions(partitions_count);
self.metrics.increment_segments(partitions_count);
let shards_count = self.get_available_shards_count();
- for partition_id in partitions.iter().map(|p| p.id()) {
+ for (partition_id, stats) in partitions.iter().map(|p| (p.id(),
p.stats())) {
// TODO: Create shard table recordsj.
let ns = IggyNamespace::new(numeric_stream_id, numeric_topic_id,
partition_id);
let shard_id = calculate_shard_assignment(&ns, shards_count);
@@ -130,6 +127,7 @@ impl IggyShard {
&self.config.system,
)
.await?;
+ stats.increment_segments_count(1);
if is_current_shard {
self.init_log(stream_id, topic_id, partition_id).await?;
}
@@ -137,7 +135,7 @@ impl IggyShard {
Ok(partitions)
}
- async fn init_log(
+ pub async fn init_log(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
@@ -233,6 +231,7 @@ impl IggyShard {
self.init_log(stream_id, topic_id, id).await?;
}
}
+
Ok(())
}
diff --git a/core/server/src/shard/system/personal_access_tokens.rs
b/core/server/src/shard/system/personal_access_tokens.rs
index 526063b6..62322e5e 100644
--- a/core/server/src/shard/system/personal_access_tokens.rs
+++ b/core/server/src/shard/system/personal_access_tokens.rs
@@ -100,7 +100,7 @@ impl IggyShard {
let token_hash = personal_access_token.token.clone();
let identifier = user_id.try_into()?;
let user = self.get_user_mut(&identifier).with_error_context(|error| {
- format!("{COMPONENT} (error: {error}) - failed to get mutable
reference to the user with id: {user_id}")
+ format!("{COMPONENT} create PAT (error: {error}) - failed to get
mutable reference to the user with id: {user_id}")
})?;
if user
@@ -145,7 +145,7 @@ impl IggyShard {
.get_user_mut(&user_id.try_into()?)
.with_error_context(|error| {
format!(
- "{COMPONENT} (error: {error}) - failed to get mutable
reference to the user with id: {user_id}"
+ "{COMPONENT} delete PAT (error: {error}) - failed to get
mutable reference to the user with id: {user_id}"
)
})?;
@@ -166,6 +166,16 @@ impl IggyShard {
Ok(())
}
+ pub fn login_user_pat_event(&self, token: &str, client_id: u32) ->
Result<(), IggyError> {
+ let active_sessions = self.active_sessions.borrow();
+ let session = active_sessions
+ .iter()
+ .find(|s| s.client_id == client_id)
+ .expect(format!("At this point session for {}, should exist.",
client_id).as_str());
+ self.login_with_personal_access_token(token, Some(session))?;
+ Ok(())
+ }
+
pub fn login_with_personal_access_token(
&self,
token: &str,
diff --git a/core/server/src/shard/system/segments.rs
b/core/server/src/shard/system/segments.rs
index 5b7d69ff..c9a59254 100644
--- a/core/server/src/shard/system/segments.rs
+++ b/core/server/src/shard/system/segments.rs
@@ -1,4 +1,7 @@
+use std::error;
+
use crate::shard::IggyShard;
+use crate::streaming;
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,24 +19,108 @@ use crate::shard::IggyShard;
* specific language governing permissions and limitations
* under the License.
*/
-use super::COMPONENT;
use crate::streaming::session::Session;
-use error_set::ErrContext;
use iggy_common::Identifier;
use iggy_common::IggyError;
-use iggy_common::locking::IggyRwLockFn;
impl IggyShard {
+ pub async fn delete_segments_bypass_auth(
+ &self,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ partition_id: usize,
+ segments_count: u32,
+ ) -> Result<(), IggyError> {
+ self.delete_segments_base(stream_id, topic_id, partition_id,
segments_count)
+ .await
+ }
+
+ pub async fn delete_segments_base(
+ &self,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ partition_id: usize,
+ segments_count: u32,
+ ) -> Result<(), IggyError> {
+ let (segments, storages) = self.streams2.with_partition_by_id_mut(
+ stream_id,
+ topic_id,
+ partition_id,
+ |(.., log)| {
+ let upperbound = log.segments().len();
+ let begin = upperbound.saturating_sub(segments_count as usize);
+ let segments = log
+ .segments_mut()
+ .drain(begin..upperbound)
+ .collect::<Vec<_>>();
+ let storages = log
+ .storages_mut()
+ .drain(begin..upperbound)
+ .collect::<Vec<_>>();
+ let _ = log
+ .indexes_mut()
+ .drain(begin..upperbound)
+ .collect::<Vec<_>>();
+ (segments, storages)
+ },
+ );
+ let numeric_stream_id = self
+ .streams2
+ .with_stream_by_id(stream_id,
streaming::streams::helpers::get_stream_id());
+ let numeric_topic_id = self.streams2.with_topic_by_id(
+ stream_id,
+ topic_id,
+ streaming::topics::helpers::get_topic_id(),
+ );
+
+ let create_base_segment = segments.len() > 0 && storages.len() > 0;
+ for (mut storage, segment) in
storages.into_iter().zip(segments.into_iter()) {
+ let (msg_writer, index_writer) = storage.shutdown();
+ if let Some(msg_writer) = msg_writer
+ && let Some(index_writer) = index_writer
+ {
+ // We need to fsync before closing to ensure all data is
written to disk.
+ msg_writer.fsync().await?;
+ index_writer.fsync().await?;
+ let path = msg_writer.path();
+ drop(msg_writer);
+ drop(index_writer);
+ compio::fs::remove_file(&path).await.map_err(|_| {
+ tracing::error!("Failed to delete segment file at path:
{}", path);
+ IggyError::CannotDeleteFile
+ })?;
+ } else {
+ let start_offset = segment.start_offset;
+ let path = self.config.system.get_segment_path(
+ numeric_stream_id,
+ numeric_topic_id,
+ partition_id,
+ start_offset,
+ );
+ compio::fs::remove_file(&path).await.map_err(|_| {
+ tracing::error!("Failed to delete segment file at path:
{}", path);
+ IggyError::CannotDeleteFile
+ })?;
+ }
+ }
+
+ if create_base_segment {
+ self.init_log(stream_id, topic_id, partition_id).await?;
+ }
+ Ok(())
+ }
pub async fn delete_segments(
&self,
session: &Session,
stream_id: &Identifier,
topic_id: &Identifier,
- partition_id: u32,
+ partition_id: usize,
segments_count: u32,
) -> Result<(), IggyError> {
// Assert authentication.
self.ensure_authenticated(session)?;
- todo!();
+ self.ensure_topic_exists(stream_id, topic_id)?;
+ self.delete_segments_base(stream_id, topic_id, partition_id,
segments_count)
+ .await
}
}
diff --git a/core/server/src/shard/system/snapshot/mod.rs
b/core/server/src/shard/system/snapshot/mod.rs
index 658c8a4b..630dd2be 100644
--- a/core/server/src/shard/system/snapshot/mod.rs
+++ b/core/server/src/shard/system/snapshot/mod.rs
@@ -23,7 +23,7 @@ use crate::shard::IggyShard;
use crate::streaming::session::Session;
use async_zip::base::write::ZipFileWriter;
use async_zip::{Compression, ZipEntryBuilder};
-use compio::fs::{OpenOptions};
+use compio::fs::OpenOptions;
use compio::io::{AsyncReadAtExt, AsyncWriteAtExt};
use iggy_common::{IggyDuration, IggyError, Snapshot, SnapshotCompression,
SystemSnapshotType};
use std::path::PathBuf;
diff --git a/core/server/src/shard/system/stats.rs
b/core/server/src/shard/system/stats.rs
index 1cd674b4..1faa0bc1 100644
--- a/core/server/src/shard/system/stats.rs
+++ b/core/server/src/shard/system/stats.rs
@@ -18,6 +18,7 @@
use crate::VERSION;
use crate::shard::IggyShard;
+use crate::slab::traits_ext::{EntityComponentSystem, IntoComponents};
use crate::versioning::SemanticVersion;
use iggy_common::locking::IggyRwLockFn;
use iggy_common::{IggyDuration, IggyError, Stats};
@@ -90,26 +91,40 @@ impl IggyShard {
drop(sys);
- //TODO:
- /*
- for stream in self.streams.borrow().values() {
- stats.messages_count += stream.get_messages_count();
- stats.segments_count += stream.get_segments_count();
- stats.messages_size_bytes += stream.get_size();
- stats.streams_count += 1;
- stats.topics_count += stream.topics.len() as u32;
- stats.partitions_count += stream
- .topics
- .values()
- .map(|t| t.partitions.len() as u32)
- .sum::<u32>();
- stats.consumer_groups_count += stream
- .topics
- .values()
- .map(|t| t.consumer_groups.borrow().len() as u32)
- .sum::<u32>();
- }
- */
+ self.streams2.with_components(|stream_components| {
+ let (stream_roots, stream_stats) =
stream_components.into_components();
+ // Iterate through all streams
+ for (stream_id, stream_root) in stream_roots.iter() {
+ stats.streams_count += 1;
+
+ // Get stream-level stats
+ if let Some(stream_stat) = stream_stats.get(stream_id) {
+ stats.messages_count +=
stream_stat.messages_count_inconsistent();
+ stats.segments_count +=
stream_stat.segments_count_inconsistent();
+ stats.messages_size_bytes +=
stream_stat.size_bytes_inconsistent().into();
+ }
+
+ // Access topics within this stream
+ stream_root.topics().with_components(|topic_components| {
+ let (topic_roots, ..) = topic_components.into_components();
+ stats.topics_count += topic_roots.len() as u32;
+
+ // Iterate through all topics in this stream
+ for (_, topic_root) in topic_roots.iter() {
+ // Count partitions in this topic
+ topic_root
+ .partitions()
+ .with_components(|partition_components| {
+ let (partition_roots, ..) =
partition_components.into_components();
+ stats.partitions_count +=
partition_roots.len() as u32;
+ });
+
+ // Count consumer groups in this topic
+ stats.consumer_groups_count +=
topic_root.consumer_groups().len() as u32;
+ }
+ });
+ }
+ });
Ok(stats)
}
diff --git a/core/server/src/shard/system/streams.rs
b/core/server/src/shard/system/streams.rs
index bf0d6024..11e7e00a 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -21,14 +21,11 @@ use crate::shard::IggyShard;
use crate::slab::traits_ext::{DeleteCell, EntityMarker, InsertCell};
use crate::streaming::session::Session;
-use crate::streaming::stats::stats::StreamStats;
use crate::streaming::streams::storage2::{create_stream_file_hierarchy,
delete_stream_from_disk};
use crate::streaming::streams::{self, stream2};
use error_set::ErrContext;
-use iggy_common::locking::IggyRwLockFn;
use iggy_common::{Identifier, IggyError, IggyTimestamp};
-use std::sync::Arc;
impl IggyShard {
pub async fn create_stream2(
@@ -127,12 +124,6 @@ impl IggyShard {
.decrement_messages(stats.messages_count_inconsistent());
self.metrics
.decrement_segments(stats.segments_count_inconsistent());
-
- /*
- self.client_manager
- .borrow_mut()
- .delete_consumer_groups_for_stream(stream_id as u32);
- */
stream
}
@@ -157,36 +148,69 @@ impl IggyShard {
)
})?;
let mut stream = self.delete_stream2_base(id);
+ // Clean up consumer groups from ClientManager for this stream
+ let stream_id_usize = stream.id();
+ self.client_manager
+ .borrow_mut()
+ .delete_consumer_groups_for_stream(stream_id_usize);
delete_stream_from_disk(self.id, &mut stream,
&self.config.system).await?;
Ok(stream)
}
- pub async fn purge_stream2(&self, session: &Session, id: &Identifier) ->
Result<(), IggyError> {
+ pub async fn purge_stream2(
+ &self,
+ session: &Session,
+ stream_id: &Identifier,
+ ) -> Result<(), IggyError> {
self.ensure_authenticated(session)?;
- // self.ensure_stream_exists(id)?;
- let get_stream_id =
crate::streaming::streams::helpers::get_stream_id();
- let stream_id = self.streams2.with_stream_by_id(id, get_stream_id);
- self.permissioner
- .borrow()
- .purge_stream(session.get_user_id(), stream_id as u32)
- .with_error_context(|error| {
+ self.ensure_stream_exists(stream_id)?;
+ {
+ let get_stream_id =
crate::streaming::streams::helpers::get_stream_id();
+ let stream_id = self.streams2.with_stream_by_id(stream_id,
get_stream_id);
+ self.permissioner
+ .borrow()
+ .purge_stream(session.get_user_id(), stream_id as u32)
+ .with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - permission denied to purge
stream for user {}, stream ID: {}",
session.get_user_id(),
stream_id,
)
})?;
- self.purge_stream2_base(id)?;
+ }
+
+ //TODO: Tech debt.
+ let topic_ids = self
+ .streams2
+ .with_stream_by_id(stream_id, streams::helpers::get_topic_ids());
+
+ // Purge each topic in the stream using bypass auth
+ for topic_id in topic_ids {
+ let topic_identifier = Identifier::numeric(topic_id as
u32).unwrap();
+ self.purge_topic2(session, stream_id, &topic_identifier)
+ .await?;
+ }
Ok(())
}
- pub fn purge_stream2_bypass_auth(&self, stream_id: &Identifier) ->
Result<(), IggyError> {
- self.purge_stream2_base(stream_id)?;
+ pub async fn purge_stream2_bypass_auth(&self, stream_id: &Identifier) ->
Result<(), IggyError> {
+ self.purge_stream2_base(stream_id).await?;
Ok(())
}
- fn purge_stream2_base(&self, _stream_id: &Identifier) -> Result<(),
IggyError> {
- // TODO
+ async fn purge_stream2_base(&self, stream_id: &Identifier) -> Result<(),
IggyError> {
+ // Get all topic IDs in the stream
+ let topic_ids = self
+ .streams2
+ .with_stream_by_id(stream_id, streams::helpers::get_topic_ids());
+
+ // Purge each topic in the stream using bypass auth
+ for topic_id in topic_ids {
+ let topic_identifier = Identifier::numeric(topic_id as
u32).unwrap();
+ self.purge_topic2_bypass_auth(stream_id, &topic_identifier)
+ .await?;
+ }
+
Ok(())
}
}
diff --git a/core/server/src/shard/system/topics.rs
b/core/server/src/shard/system/topics.rs
index a9cf7d9f..f2995b30 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -19,7 +19,7 @@
use super::COMPONENT;
use crate::shard::IggyShard;
use crate::shard_info;
-use crate::slab::traits_ext::{EntityMarker, InsertCell};
+use crate::slab::traits_ext::{EntityComponentSystem, EntityMarker, InsertCell,
IntoComponents};
use crate::streaming::session::Session;
use crate::streaming::stats::stats::{StreamStats, TopicStats};
use crate::streaming::topics::storage2::{create_topic_file_hierarchy,
delete_topic_from_disk};
@@ -29,8 +29,10 @@ use error_set::ErrContext;
use iggy_common::{
CompressionAlgorithm, Identifier, IggyError, IggyExpiry, IggyTimestamp,
MaxTopicSize,
};
+use std::any::Any;
use std::str::FromStr;
use std::sync::Arc;
+use std::u32;
impl IggyShard {
pub async fn create_topic2(
@@ -139,7 +141,7 @@ impl IggyShard {
replication_factor: Option<u8>,
) -> Result<(), IggyError> {
self.ensure_authenticated(session)?;
- //self.ensure_topic_exists(stream_id, topic_id)?;
+ self.ensure_topic_exists(stream_id, topic_id)?;
{
let topic_id_val = self.streams2.with_topic_by_id(
stream_id,
@@ -247,6 +249,10 @@ impl IggyShard {
)
})?;
let mut topic = self.delete_topic_base2(stream_id, topic_id);
+ // Clean up consumer groups from ClientManager for this topic
+ self.client_manager
+ .borrow_mut()
+ .delete_consumer_groups_for_topic(numeric_stream_id, topic.id());
let parent = topic.stats().parent().clone();
// We need to borrow topic as mutable, as we are extracting partitions
out of it, in order to close them.
let (messages_count, size_bytes, segments_count) =
@@ -304,34 +310,54 @@ impl IggyShard {
})?;
}
- self.streams2.with_partitions_mut(
+ self.streams2.with_partitions(
stream_id,
topic_id,
partitions::helpers::purge_partitions_mem(),
);
- self.streams2.with_partitions_mut(
+
+ let (consumer_offset_paths, consumer_group_offset_paths) =
self.streams2.with_partitions(
stream_id,
topic_id,
- partitions::helpers::purge_segments_mem(),
+ partitions::helpers::purge_consumer_offsets(),
);
- self.streams2
- .with_topic_by_id_async(stream_id, topic_id,
topics::helpers::purge_topic_disk())
- .await;
+ for path in consumer_offset_paths {
+ self.delete_consumer_offset_from_disk(&path).await?;
+ }
+ for path in consumer_group_offset_paths {
+ self.delete_consumer_offset_from_disk(&path).await?;
+ }
+
+ self.purge_topic_base2(stream_id, topic_id).await?;
+ Ok(())
+ }
+
+ pub async fn purge_topic2_bypass_auth(
+ &self,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ ) -> Result<(), IggyError> {
+ self.purge_topic_base2(stream_id, topic_id).await?;
Ok(())
}
async fn purge_topic_base2(
&self,
- _stream_id: &Identifier,
- _topic_id: &Identifier,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
) -> Result<(), IggyError> {
- /*
- self.streams2
+ let part_ids = self
+ .streams2
.with_partitions(stream_id, topic_id, |partitions| {
- //partitions
+ partitions.with_components(|components| {
+ let (roots, ..) = components.into_components();
+ roots.iter().map(|(_, root)| root.id()).collect::<Vec<_>>()
+ })
});
- */
-
+ for part_id in part_ids {
+ self.delete_segments_bypass_auth(stream_id, topic_id, part_id,
u32::MAX)
+ .await?;
+ }
Ok(())
}
}
diff --git a/core/server/src/shard/system/users.rs
b/core/server/src/shard/system/users.rs
index c87c5dbc..a28bfcdb 100644
--- a/core/server/src/shard/system/users.rs
+++ b/core/server/src/shard/system/users.rs
@@ -162,61 +162,60 @@ impl IggyShard {
)
})?;
- let user_id = self.create_user_base(username, password, status,
permissions)?;
- self.get_user(&user_id.try_into()?)
+ if self
+ .users
+ .borrow()
+ .iter()
+ .any(|(_, user)| user.username == username)
+ {
+ error!("User: {username} already exists.");
+ return Err(IggyError::UserAlreadyExists);
+ }
+
+ if self.users.borrow().len() >= MAX_USERS {
+ error!("Available users limit reached.");
+ return Err(IggyError::UsersLimitReached);
+ }
+
+ // TODO: Tech debt, replace with Slab.
+ USER_ID.fetch_add(1, Ordering::SeqCst);
+ let current_user_id = USER_ID.load(Ordering::SeqCst);
+ self.create_user_base(current_user_id, username, password, status,
permissions)?;
+ self.get_user(¤t_user_id.try_into()?)
.with_error_context(|error| {
- format!("{COMPONENT} (error: {error}) - failed to get user
with id: {user_id}")
+ format!(
+ "{COMPONENT} (error: {error}) - failed to get user with
id: {current_user_id}"
+ )
})
}
pub fn create_user_bypass_auth(
&self,
+ user_id: u32,
username: &str,
password: &str,
status: UserStatus,
permissions: Option<Permissions>,
- ) -> Result<u32, IggyError> {
- let user_id = self.create_user_base(username, password, status,
permissions)?;
- Ok(user_id)
+ ) -> Result<(), IggyError> {
+ self.create_user_base(user_id, username, password, status,
permissions)?;
+ Ok(())
}
fn create_user_base(
&self,
+ user_id: u32,
username: &str,
password: &str,
status: UserStatus,
permissions: Option<Permissions>,
- ) -> Result<u32, IggyError> {
- if self
- .users
- .borrow()
- .iter()
- .any(|(_, user)| user.username == username)
- {
- error!("User: {username} already exists.");
- return Err(IggyError::UserAlreadyExists);
- }
-
- if self.users.borrow().len() >= MAX_USERS {
- error!("Available users limit reached.");
- return Err(IggyError::UsersLimitReached);
- }
-
- let user_id = USER_ID.fetch_add(1, Ordering::SeqCst);
- let current_user_id = USER_ID.load(Ordering::SeqCst);
- let user = User::new(
- current_user_id,
- username,
- password,
- status,
- permissions.clone(),
- );
+ ) -> Result<(), IggyError> {
+ let user = User::new(user_id, username, password, status,
permissions.clone());
self.permissioner
.borrow_mut()
.init_permissions_for_user(user_id, permissions);
self.users.borrow_mut().insert(user.id, user);
self.metrics.increment_users(1);
- Ok(user_id)
+ Ok(())
}
pub fn delete_user(&self, session: &Session, user_id: &Identifier) ->
Result<User, IggyError> {
@@ -319,7 +318,7 @@ impl IggyShard {
}
let mut user = self.get_user_mut(user_id).with_error_context(|error| {
- format!("{COMPONENT} (error: {error}) - failed to get mutable
reference to the user with id: {user_id}")
+ format!("{COMPONENT} update user (error: {error}) - failed to get
mutable reference to the user with id: {user_id}")
})?;
if let Some(username) = username {
user.username = username;
@@ -389,7 +388,7 @@ impl IggyShard {
{
let mut user =
self.get_user_mut(user_id).with_error_context(|error| {
format!(
- "{COMPONENT} (error: {error}) - failed to get mutable
reference to the user with id: {user_id}"
+ "{COMPONENT} update user permissions (error: {error}) -
failed to get mutable reference to the user with id: {user_id}"
)
})?;
user.permissions = permissions;
@@ -438,7 +437,7 @@ impl IggyShard {
new_password: &str,
) -> Result<(), IggyError> {
let mut user = self.get_user_mut(user_id).with_error_context(|error| {
- format!("{COMPONENT} (error: {error}) - failed to get mutable
reference to the user with id: {user_id}")
+ format!("{COMPONENT} change password (error: {error}) - failed to
get mutable reference to the user with id: {user_id}")
})?;
if !crypto::verify_password(current_password, &user.password) {
error!(
diff --git a/core/server/src/shard/transmission/event.rs
b/core/server/src/shard/transmission/event.rs
index d7e725b1..afaa6da4 100644
--- a/core/server/src/shard/transmission/event.rs
+++ b/core/server/src/shard/transmission/event.rs
@@ -89,6 +89,7 @@ pub enum ShardEvent {
polling_consumer: PollingConsumer,
},
CreatedUser {
+ user_id: u32,
username: String,
password: String,
status: UserStatus,
@@ -127,12 +128,31 @@ pub enum ShardEvent {
name: String,
},
LoginWithPersonalAccessToken {
+ client_id: u32,
token: String,
},
+ DeletedSegments {
+ stream_id: Identifier,
+ topic_id: Identifier,
+ partition_id: usize,
+ segments_count: u32,
+ },
NewSession {
address: SocketAddr,
transport: TransportProtocol,
},
+ JoinedConsumerGroup {
+ client_id: u32,
+ stream_id: Identifier,
+ topic_id: Identifier,
+ group_id: Identifier,
+ },
+ LeftConsumerGroup {
+ client_id: u32,
+ stream_id: Identifier,
+ topic_id: Identifier,
+ group_id: Identifier,
+ },
TcpBound {
address: SocketAddr,
},
diff --git a/core/server/src/slab/consumer_groups.rs
b/core/server/src/slab/consumer_groups.rs
index 21e60d36..683c3a0e 100644
--- a/core/server/src/slab/consumer_groups.rs
+++ b/core/server/src/slab/consumer_groups.rs
@@ -3,7 +3,7 @@ use crate::{
Keyed, consumer_groups,
traits_ext::{
Borrow, ComponentsById, Delete, EntityComponentSystem,
EntityComponentSystemMut,
- Insert, IntoComponents, IntoComponentsById,
+ EntityMarker, Insert, IntoComponents, IntoComponentsById,
},
},
streaming::topics::consumer_group2::{self, ConsumerGroupRef,
ConsumerGroupRefMut},
@@ -37,6 +37,8 @@ impl Insert for ConsumerGroups {
"consumer_group: id mismatch when inserting members"
);
self.index.insert(key, entity_id);
+ let root = self.root.get_mut(entity_id).unwrap();
+ root.update_id(entity_id);
entity_id
}
}
@@ -46,12 +48,12 @@ impl Delete for ConsumerGroups {
type Item = consumer_group2::ConsumerGroup;
fn delete(&mut self, id: Self::Idx) -> Self::Item {
- let (name, partitions) = self.root.remove(id).disarray();
- let members = self.members.remove(id).into_inner();
+ let root = self.root.remove(id);
+ let members = self.members.remove(id);
self.index
- .remove(&name)
+ .remove(root.key())
.expect("consumer_group_delete: key not found");
- consumer_group2::ConsumerGroup::new(name, members, partitions)
+ consumer_group2::ConsumerGroup::new_with_components(root, members)
}
}
@@ -113,6 +115,10 @@ impl ConsumerGroups {
}
}
+ pub fn len(&self) -> usize {
+ self.root.len()
+ }
+
pub fn get_index(&self, id: &Identifier) -> usize {
match id.kind {
iggy_common::IdKind::Numeric => id.get_u32_value().unwrap() as
usize,
diff --git a/core/server/src/slab/partitions.rs
b/core/server/src/slab/partitions.rs
index 34ca117d..678846c3 100644
--- a/core/server/src/slab/partitions.rs
+++ b/core/server/src/slab/partitions.rs
@@ -96,6 +96,8 @@ impl Insert for Partitions {
entity_id, id,
"partition_insert: id mismatch when creating consumer_group_offset"
);
+ let root = self.root.get_mut(entity_id).unwrap();
+ root.update_id(entity_id);
entity_id
}
}
@@ -105,7 +107,23 @@ impl Delete for Partitions {
type Item = Partition;
fn delete(&mut self, id: Self::Idx) -> Self::Item {
- todo!()
+ let root = self.root.remove(id);
+ let stats = self.stats.remove(id);
+ let message_deduplicator = self.message_deduplicator.remove(id);
+ let offset = self.offset.remove(id);
+ let consumer_offset = self.consumer_offset.remove(id);
+ let consumer_group_offset = self.consumer_group_offset.remove(id);
+ let log = self.log.remove(id);
+
+ Partition::new_with_components(
+ root,
+ stats,
+ message_deduplicator,
+ offset,
+ consumer_offset,
+ consumer_group_offset,
+ log,
+ )
}
}
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 02ab2850..d733e04e 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -112,8 +112,21 @@ impl DeleteCell for Streams {
type Idx = ContainerId;
type Item = stream2::Stream;
- fn delete(&self, _id: Self::Idx) -> Self::Item {
- todo!()
+ fn delete(&self, id: Self::Idx) -> Self::Item {
+ let mut root_container = self.root.borrow_mut();
+ let mut indexes = self.index.borrow_mut();
+ let mut stats_container = self.stats.borrow_mut();
+
+ let root = root_container.remove(id);
+ let stats = stats_container.remove(id);
+
+ // Remove from index
+ let key = root.key();
+ indexes
+ .remove(key)
+ .expect("stream_delete: key not found in index");
+
+ stream2::Stream::new_with_components(root, stats)
}
}
@@ -718,7 +731,8 @@ impl Streams {
});
let (log_writer, index_writer) =
self.with_partition_by_id_mut(stream_id, topic_id, partition_id,
|(.., log)| {
- log.active_storage_mut().shutdown()
+ let (msg, index) = log.active_storage_mut().shutdown();
+ (msg.unwrap(), index.unwrap())
});
compio::runtime::spawn(async move {
diff --git a/core/server/src/slab/topics.rs b/core/server/src/slab/topics.rs
index 549a2704..7508d675 100644
--- a/core/server/src/slab/topics.rs
+++ b/core/server/src/slab/topics.rs
@@ -73,8 +73,22 @@ impl DeleteCell for Topics {
type Item = topic2::Topic;
fn delete(&self, id: Self::Idx) -> Self::Item {
- // TODO: don't forget to remoev from the index
- todo!()
+ let mut root_container = self.root.borrow_mut();
+ let mut auxilaries = self.auxilaries.borrow_mut();
+ let mut indexes = self.index.borrow_mut();
+ let mut stats_container = self.stats.borrow_mut();
+
+ let root = root_container.remove(id);
+ let auxilary = auxilaries.remove(id);
+ let stats = stats_container.remove(id);
+
+ // Remove from index
+ let key = root.key();
+ indexes
+ .remove(key)
+ .expect("topic_delete: key not found in index");
+
+ topic2::Topic::new_with_components(root, auxilary, stats)
}
}
diff --git a/core/server/src/slab/traits_ext.rs
b/core/server/src/slab/traits_ext.rs
index 0db207cf..949971fa 100644
--- a/core/server/src/slab/traits_ext.rs
+++ b/core/server/src/slab/traits_ext.rs
@@ -158,7 +158,7 @@ type MappingByIdMut<'a, E, T> =
pub type Components<T> = <T as IntoComponents>::Components;
pub type ComponentsById<'a, T> = <T as IntoComponentsById>::Output;
-// TODO:
+// TODO:
// I've figured there is actually and ergonomic improvement that can be made
here.
// Observe that the chain of constraints put on the `EntityRef` type is
actually wrong.
// We constraint the `EntityRef` to be IntoComponents + IntoComponentsById,
@@ -173,7 +173,7 @@ pub type ComponentsById<'a, T> = <T as
IntoComponentsById>::Output;
// Maybe lets not go this way with the tuple mapping madness, it already is
pretty difficult to distinguish between all of the different components,
// and everytime we add a new component to an entity, we need to update the
tuple type everywhere.
-// Better idea would be to use the `EntityRef` type directly inside of the
`with_components_by_id` closure
+// Better idea would be to use the `EntityRef` type directly inside of the
`with_components_by_id` closure
// -- f(components.into_components_by_id(id)) ->
components.into_components_by_id(id) would return `EntityRef`, rather than the
tuple.
pub trait EntityComponentSystem<T>
where
diff --git a/core/server/src/streaming/clients/client_manager.rs
b/core/server/src/streaming/clients/client_manager.rs
index 8e41b69c..9fad13c5 100644
--- a/core/server/src/streaming/clients/client_manager.rs
+++ b/core/server/src/streaming/clients/client_manager.rs
@@ -23,7 +23,6 @@ use iggy_common::IggyError;
use iggy_common::IggyTimestamp;
use iggy_common::TransportProtocol;
use iggy_common::UserId;
-use std::fmt::{Display, Formatter};
use std::net::SocketAddr;
use std::rc::Rc;
@@ -119,7 +118,7 @@ impl ClientManager {
}
pub fn delete_client(&mut self, client_id: u32) -> Option<Client> {
- if let Some(mut client) = self.clients.remove(&client_id) {
+ if let Some(client) = self.clients.remove(&client_id) {
client.session.clear_user_id();
Some(client)
} else {
@@ -130,10 +129,13 @@ impl ClientManager {
pub fn join_consumer_group(
&mut self,
client_id: u32,
- stream_id: u32,
- topic_id: u32,
- group_id: u32,
+ stream_id: usize,
+ topic_id: usize,
+ group_id: usize,
) -> Result<(), IggyError> {
+ let stream_id = stream_id as u32;
+ let topic_id = topic_id as u32;
+ let group_id = group_id as u32;
let client = self.clients.get_mut(&client_id);
if client.is_none() {
return Err(IggyError::ClientNotFound(client_id));
@@ -159,15 +161,18 @@ impl ClientManager {
pub fn leave_consumer_group(
&mut self,
client_id: u32,
- stream_id: u32,
- topic_id: u32,
- consumer_group_id: u32,
+ stream_id: usize,
+ topic_id: usize,
+ consumer_group_id: usize,
) -> Result<(), IggyError> {
+ let stream_id = stream_id as u32;
+ let topic_id = topic_id as u32;
+ let consumer_group_id = consumer_group_id as u32;
let client = self.clients.get_mut(&client_id);
if client.is_none() {
return Err(IggyError::ClientNotFound(client_id));
}
- let mut client = client.unwrap();
+ let client = client.unwrap();
for (index, consumer_group) in
client.consumer_groups.iter().enumerate() {
if consumer_group.stream_id == stream_id
&& consumer_group.topic_id == topic_id
@@ -180,7 +185,21 @@ impl ClientManager {
Ok(())
}
- pub fn delete_consumer_groups_for_stream(&mut self, stream_id: u32) {
+ pub fn delete_consumer_group(&mut self, stream_id: usize, topic_id: usize,
group_id: usize) {
+ let stream_id = stream_id as u32;
+ let topic_id = topic_id as u32;
+ let group_id = group_id as u32;
+ for client in self.clients.values_mut() {
+ client.consumer_groups.retain(|consumer_group| {
+ !(consumer_group.stream_id == stream_id
+ && consumer_group.topic_id == topic_id
+ && consumer_group.group_id == group_id)
+ });
+ }
+ }
+
+ pub fn delete_consumer_groups_for_stream(&mut self, stream_id: usize) {
+ let stream_id = stream_id as u32;
for client in self.clients.values_mut() {
let indexes_to_remove = client
.consumer_groups
@@ -200,7 +219,9 @@ impl ClientManager {
}
}
- pub fn delete_consumer_groups_for_topic(&mut self, stream_id: u32,
topic_id: u32) {
+ pub fn delete_consumer_groups_for_topic(&mut self, stream_id: usize,
topic_id: usize) {
+ let stream_id = stream_id as u32;
+ let topic_id = topic_id as u32;
for client in self.clients.values_mut() {
let indexes_to_remove = client
.consumer_groups
diff --git a/core/server/src/streaming/partitions/helpers.rs
b/core/server/src/streaming/partitions/helpers.rs
index f9baf62e..1062a6dc 100644
--- a/core/server/src/streaming/partitions/helpers.rs
+++ b/core/server/src/streaming/partitions/helpers.rs
@@ -62,12 +62,49 @@ pub fn insert_partition(
move |partitions| partitions.insert(partition)
}
-pub fn purge_partitions_mem() -> impl FnOnce(&mut Partitions) {
+pub fn purge_partitions_mem() -> impl FnOnce(&Partitions) {
|partitions| {
partitions.with_components(|components| {
- let (_root, _stats, _deduplicator, _offset, _consumer_offset,
_cg_offset, _log) =
- components.into_components();
- // TODO: Implement purge logic
+ let (.., stats, _, offsets, _, _, _) =
components.into_components();
+ for (offset, stat) in offsets
+ .iter()
+ .map(|(_, o)| o)
+ .zip(stats.iter().map(|(_, s)| s))
+ {
+ offset.store(0, Ordering::Relaxed);
+ stat.zero_out_all();
+ }
+ })
+ }
+}
+
+pub fn purge_consumer_offsets() -> impl FnOnce(&Partitions) -> (Vec<String>,
Vec<String>) {
+ |partitions| {
+ partitions.with_components(|components| {
+ let (.., consumer_offsets, cg_offsets, _) =
components.into_components();
+
+ let mut consumer_offset_paths = Vec::new();
+ let mut consumer_group_offset_paths = Vec::new();
+
+ // Collect paths and clear consumer offsets
+ for (_, consumer_offset) in consumer_offsets {
+ let hdl = consumer_offset.pin();
+ for item in hdl.values() {
+ consumer_offset_paths.push(item.path.clone());
+ }
+ hdl.clear(); // Clear the hashmap
+ }
+
+ // Collect paths and clear consumer group offsets
+ for (_, cg_offset) in cg_offsets {
+ let hdl = cg_offset.pin();
+ for item in hdl.values() {
+ consumer_group_offset_paths.push(item.path.clone());
+ }
+ hdl.clear(); // Clear the hashmap
+ }
+
+ (consumer_offset_paths, consumer_group_offset_paths)
})
}
}
@@ -119,13 +156,13 @@ pub fn store_consumer_offset(
pub fn delete_consumer_offset(
id: usize,
-) -> impl FnOnce(ComponentsById<PartitionRef>) -> Result<(), IggyError> {
+) -> impl FnOnce(ComponentsById<PartitionRef>) -> Result<String, IggyError> {
move |(.., offsets, _, _)| {
- offsets
- .pin()
+ let hdl = offsets.pin();
+ let offset = hdl
.remove(&id)
- .map(|_| ())
- .ok_or_else(|| IggyError::ConsumerOffsetNotFound(id))
+ .ok_or_else(|| IggyError::ConsumerOffsetNotFound(id))?;
+ Ok(offset.path.clone())
}
}
@@ -180,13 +217,13 @@ pub fn store_consumer_group_member_offset(
pub fn delete_consumer_group_member_offset(
id: usize,
-) -> impl FnOnce(ComponentsById<PartitionRef>) -> Result<(), IggyError> {
+) -> impl FnOnce(ComponentsById<PartitionRef>) -> Result<String, IggyError> {
move |(.., offsets, _)| {
- offsets
- .pin()
+ let hdl = offsets.pin();
+ let offset = hdl
.remove(&id)
- .map(|_| ())
- .ok_or_else(|| IggyError::ConsumerOffsetNotFound(id))
+ .ok_or_else(|| IggyError::ConsumerOffsetNotFound(id))?;
+ Ok(offset.path.clone())
}
}
@@ -218,13 +255,6 @@ pub fn delete_consumer_group_member_offset_from_disk(
}
}
-pub fn purge_segments_mem() -> impl FnOnce(&mut Partitions) {
- |_partitions| {
- // TODO:
- //partitions.segments_mut()
- }
-}
-
pub fn create_message_deduplicator(config: &SystemConfig) ->
Option<MessageDeduplicator> {
if !config.message_deduplication.enabled {
return None;
@@ -668,14 +698,14 @@ pub fn append_to_journal(
let batch_messages_size = batch.size();
let batch_messages_count = batch.count();
+ stats.increment_size_bytes(batch_messages_size as u64);
+ stats.increment_messages_count(batch_messages_count as u64);
+
segment.end_timestamp = batch.last_timestamp().unwrap();
segment.end_offset = batch.last_offset().unwrap();
let (journal_messages_count, journal_size) =
log.journal_mut().append(shard_id, batch)?;
- stats.increment_messages_count(batch_messages_count as u64);
- stats.increment_size_bytes(batch_messages_size as u64);
-
let last_offset = if batch_messages_count == 0 {
current_offset
} else {
@@ -802,14 +832,12 @@ pub fn update_index_and_increment_stats(
batch_count: u32,
config: &SystemConfig,
) -> impl FnOnce(ComponentsById<PartitionRefMut>) {
- move |(_, stats, .., log)| {
+ move |(.., log)| {
let segment = log.active_segment_mut();
segment.size += saved.as_bytes_u32();
log.active_indexes_mut().unwrap().mark_saved();
if config.segment.cache_indexes == CacheIndexesConfig::None {
log.active_indexes_mut().unwrap().clear();
}
- stats.increment_size_bytes(saved.as_bytes_u64());
- stats.increment_messages_count(batch_count as u64);
}
}
diff --git a/core/server/src/streaming/partitions/log.rs
b/core/server/src/streaming/partitions/log.rs
index ebee527e..3ff049e9 100644
--- a/core/server/src/streaming/partitions/log.rs
+++ b/core/server/src/streaming/partitions/log.rs
@@ -56,6 +56,14 @@ where
&self.segments
}
+ pub fn segments_mut(&mut self) -> &mut Vec<Segment2> {
+ &mut self.segments
+ }
+
+ pub fn storages_mut(&mut self) -> &mut Vec<Storage> {
+ &mut self.storage
+ }
+
pub fn storages(&self) -> &Vec<Storage> {
&self.storage
}
@@ -88,6 +96,10 @@ where
&self.indexes
}
+ pub fn indexes_mut(&mut self) -> &mut Vec<Option<IggyIndexesMut>> {
+ &mut self.indexes
+ }
+
pub fn active_indexes(&self) -> Option<&IggyIndexesMut> {
self.indexes
.last()
diff --git a/core/server/src/streaming/partitions/partition2.rs
b/core/server/src/streaming/partitions/partition2.rs
index 92d8f738..6c85dfb0 100644
--- a/core/server/src/streaming/partitions/partition2.rs
+++ b/core/server/src/streaming/partitions/partition2.rs
@@ -117,6 +117,26 @@ impl Partition {
}
}
+ pub fn new_with_components(
+ root: PartitionRoot,
+ stats: Arc<PartitionStats>,
+ message_deduplicator: Option<MessageDeduplicator>,
+ offset: Arc<AtomicU64>,
+ consumer_offset: Arc<ConsumerOffsets>,
+ consumer_group_offset: Arc<ConsumerGroupOffsets>,
+ log: SegmentedLog<MemoryMessageJournal>,
+ ) -> Self {
+ Self {
+ root,
+ stats,
+ message_deduplicator,
+ offset,
+ consumer_offset,
+ consumer_group_offset,
+ log,
+ }
+ }
+
pub fn stats(&self) -> &PartitionStats {
&self.stats
}
diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs
b/core/server/src/streaming/segments/messages/messages_writer.rs
index becc7aea..da1b5adc 100644
--- a/core/server/src/streaming/segments/messages/messages_writer.rs
+++ b/core/server/src/streaming/segments/messages/messages_writer.rs
@@ -131,6 +131,10 @@ impl MessagesWriter {
Ok(IggyByteSize::from(messages_size as u64))
}
+ pub fn path(&self) -> String {
+ self.file_path.clone()
+ }
+
pub async fn fsync(&self) -> Result<(), IggyError> {
self.file
.sync_all()
diff --git a/core/server/src/streaming/segments/storage.rs
b/core/server/src/streaming/segments/storage.rs
index 249b67eb..84fcd783 100644
--- a/core/server/src/streaming/segments/storage.rs
+++ b/core/server/src/streaming/segments/storage.rs
@@ -1,10 +1,8 @@
use iggy_common::IggyError;
use std::rc::Rc;
use std::sync::atomic::AtomicU64;
-use tracing::warn;
use crate::configs::system::SystemConfig;
-use crate::shard_warn;
use crate::streaming::segments::{
indexes::{IndexReader, IndexWriter},
messages::{MessagesReader, MessagesWriter},
@@ -51,9 +49,9 @@ impl Storage {
})
}
- pub fn shutdown(&mut self) -> (MessagesWriter, IndexWriter) {
- let messages_writer = self.messages_writer.take().unwrap();
- let index_writer = self.index_writer.take().unwrap();
+ pub fn shutdown(&mut self) -> (Option<MessagesWriter>,
Option<IndexWriter>) {
+ let messages_writer = self.messages_writer.take();
+ let index_writer = self.index_writer.take();
(messages_writer, index_writer)
}
}
diff --git a/core/server/src/streaming/stats/stats.rs
b/core/server/src/streaming/stats/stats.rs
index 20933c42..67c464f3 100644
--- a/core/server/src/streaming/stats/stats.rs
+++ b/core/server/src/streaming/stats/stats.rs
@@ -50,6 +50,24 @@ impl StreamStats {
pub fn segments_count_inconsistent(&self) -> u32 {
self.segments_count.load(Ordering::Relaxed)
}
+
+ pub fn zero_out_size_bytes(&self) {
+ self.size_bytes.store(0, Ordering::Relaxed);
+ }
+
+ pub fn zero_out_messages_count(&self) {
+ self.messages_count.store(0, Ordering::Relaxed);
+ }
+
+ pub fn zero_out_segments_count(&self) {
+ self.segments_count.store(0, Ordering::Relaxed);
+ }
+
+ pub fn zero_out_all(&self) {
+ self.zero_out_size_bytes();
+ self.zero_out_messages_count();
+ self.zero_out_segments_count();
+ }
}
#[derive(Default, Debug)]
@@ -143,6 +161,43 @@ impl TopicStats {
pub fn segments_count_inconsistent(&self) -> u32 {
self.segments_count.load(Ordering::Relaxed)
}
+
+ pub fn zero_out_parent_size_bytes(&self) {
+ self.parent.zero_out_size_bytes();
+ }
+
+ pub fn zero_out_parent_messages_count(&self) {
+ self.parent.zero_out_messages_count();
+ }
+
+ pub fn zero_out_parent_segments_count(&self) {
+ self.parent.zero_out_segments_count();
+ }
+
+ pub fn zero_out_parent_all(&self) {
+ self.parent.zero_out_all();
+ }
+
+ pub fn zero_out_size_bytes(&self) {
+ self.size_bytes.store(0, Ordering::Relaxed);
+ self.zero_out_parent_size_bytes();
+ }
+
+ pub fn zero_out_messages_count(&self) {
+ self.messages_count.store(0, Ordering::Relaxed);
+ self.zero_out_parent_messages_count();
+ }
+
+ pub fn zero_out_segments_count(&self) {
+ self.segments_count.store(0, Ordering::Relaxed);
+ self.zero_out_parent_segments_count();
+ }
+
+ pub fn zero_out_all(&self) {
+ self.zero_out_size_bytes();
+ self.zero_out_messages_count();
+ self.zero_out_segments_count();
+ }
}
#[derive(Default, Debug)]
@@ -236,4 +291,41 @@ impl PartitionStats {
pub fn segments_count_inconsistent(&self) -> u32 {
self.segments_count.load(Ordering::Relaxed)
}
+
+ pub fn zero_out_parent_size_bytes(&self) {
+ self.parent.zero_out_size_bytes();
+ }
+
+ pub fn zero_out_parent_messages_count(&self) {
+ self.parent.zero_out_messages_count();
+ }
+
+ pub fn zero_out_parent_segments_count(&self) {
+ self.parent.zero_out_segments_count();
+ }
+
+ pub fn zero_out_parent_all(&self) {
+ self.parent.zero_out_all();
+ }
+
+ pub fn zero_out_size_bytes(&self) {
+ self.size_bytes.store(0, Ordering::Relaxed);
+ self.zero_out_parent_size_bytes();
+ }
+
+ pub fn zero_out_messages_count(&self) {
+ self.messages_count.store(0, Ordering::Relaxed);
+ self.zero_out_parent_messages_count();
+ }
+
+ pub fn zero_out_segments_count(&self) {
+ self.segments_count.store(0, Ordering::Relaxed);
+ self.zero_out_parent_segments_count();
+ }
+
+ pub fn zero_out_all(&self) {
+ self.zero_out_size_bytes();
+ self.zero_out_messages_count();
+ self.zero_out_segments_count();
+ }
}
diff --git a/core/server/src/streaming/streams/helpers.rs
b/core/server/src/streaming/streams/helpers.rs
index f71027e6..73afc612 100644
--- a/core/server/src/streaming/streams/helpers.rs
+++ b/core/server/src/streaming/streams/helpers.rs
@@ -2,7 +2,7 @@ use crate::{
configs::system::SystemConfig,
slab::{
streams,
- traits_ext::{ComponentsById, EntityComponentSystem},
+ traits_ext::{ComponentsById, EntityComponentSystem, IntoComponents},
},
streaming::{
partitions,
@@ -25,6 +25,18 @@ pub fn update_stream_name(name: String) -> impl
FnOnce(ComponentsById<StreamRefM
}
}
+pub fn get_topic_ids() -> impl FnOnce(ComponentsById<StreamRef>) -> Vec<usize>
{
+ |(root, _)| {
+ root.topics().with_components(|components| {
+ let (topic_roots, ..) = components.into_components();
+ topic_roots
+ .iter()
+ .map(|(_, topic)| topic.id())
+ .collect::<Vec<_>>()
+ })
+ }
+}
+
pub fn store_consumer_offset(
consumer_id: usize,
topic_id: &Identifier,
diff --git a/core/server/src/streaming/streams/stream2.rs
b/core/server/src/streaming/streams/stream2.rs
index 098ed8fc..d637e29d 100644
--- a/core/server/src/streaming/streams/stream2.rs
+++ b/core/server/src/streaming/streams/stream2.rs
@@ -110,6 +110,10 @@ impl Stream {
Self { root, stats }
}
+ pub fn new_with_components(root: StreamRoot, stats: Arc<StreamStats>) ->
Self {
+ Self { root, stats }
+ }
+
pub fn stats(&self) -> &Arc<StreamStats> {
&self.stats
}
diff --git a/core/server/src/streaming/topics/consumer_group2.rs
b/core/server/src/streaming/topics/consumer_group2.rs
index ef690087..0236d3da 100644
--- a/core/server/src/streaming/topics/consumer_group2.rs
+++ b/core/server/src/streaming/topics/consumer_group2.rs
@@ -53,6 +53,10 @@ impl ConsumerGroupRoot {
&self.partitions
}
+ pub fn update_id(&mut self, id: usize) {
+ self.id = id;
+ }
+
pub fn assign_partitions(&mut self, partitions: Vec<usize>) {
self.partitions = partitions;
}
@@ -171,6 +175,14 @@ impl ConsumerGroup {
let members = ConsumerGroupMembers { inner: members };
Self { root, members }
}
+
+ pub fn new_with_components(root: ConsumerGroupRoot, members:
ConsumerGroupMembers) -> Self {
+ Self { root, members }
+ }
+
+ pub fn members(&self) -> &ConsumerGroupMembers {
+ &self.members
+ }
}
#[derive(Debug)]
diff --git a/core/server/src/streaming/topics/helpers.rs
b/core/server/src/streaming/topics/helpers.rs
index 57122b31..222aa20b 100644
--- a/core/server/src/streaming/topics/helpers.rs
+++ b/core/server/src/streaming/topics/helpers.rs
@@ -66,10 +66,6 @@ pub fn delete_topic(topic_id: &Identifier) -> impl
FnOnce(&Topics) -> Topic {
}
}
-pub fn purge_topic_disk() -> impl AsyncFnOnce(ComponentsById<TopicRef>) {
- async |(root, ..)| {}
-}
-
pub fn exists(identifier: &Identifier) -> impl FnOnce(&Topics) -> bool {
|topics| topics.exists(identifier)
}
@@ -238,6 +234,10 @@ fn assign_partitions_to_members(
.iter_mut()
.for_each(|(_, member)| member.partitions.clear());
let count = members.len();
+ if count == 0 {
+ return;
+ }
+
for (idx, partition) in partitions.iter().enumerate() {
let position = idx % count;
let member = &mut members[position];
@@ -251,6 +251,7 @@ fn assign_partitions_to_members(
);
}
}
+
fn mimic_members(members: &Slab<Member>) -> Slab<Member> {
let mut container = Slab::with_capacity(members.len());
for (_, member) in members {
diff --git a/core/server/src/streaming/topics/topic2.rs
b/core/server/src/streaming/topics/topic2.rs
index e2ae830c..64d4fc69 100644
--- a/core/server/src/streaming/topics/topic2.rs
+++ b/core/server/src/streaming/topics/topic2.rs
@@ -89,6 +89,17 @@ impl Topic {
stats,
}
}
+ pub fn new_with_components(
+ root: TopicRoot,
+ auxilary: TopicAuxilary,
+ stats: Arc<TopicStats>,
+ ) -> Self {
+ Self {
+ root,
+ auxilary,
+ stats,
+ }
+ }
pub fn root(&self) -> &TopicRoot {
&self.root