This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new fe5c0ae98 feat(io_uring): fix integration tests for http transport
(#2249)
fe5c0ae98 is described below
commit fe5c0ae986ca9e66983aabe7f4dd1ee9f020da1a
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Tue Oct 7 17:41:13 2025 +0200
feat(io_uring): fix integration tests for http transport (#2249)
---
core/integration/tests/server/general.rs | 2 +-
.../scenarios/stream_size_validation_scenario.rs | 2 +-
.../handlers/streams/update_stream_handler.rs | 1 -
core/server/src/http/consumer_groups.rs | 121 +++++++++++++--------
core/server/src/http/partitions.rs | 44 ++++++--
core/server/src/http/streams.rs | 74 +++++++++++--
core/server/src/http/topics.rs | 71 +++++++++++-
core/server/src/http/users.rs | 89 +++++++++++++++
core/server/src/shard/system/segments.rs | 15 ++-
core/server/src/shard/system/streams.rs | 21 +++-
core/server/src/shard/system/topics.rs | 34 ++++--
core/server/src/streaming/stats/mod.rs | 1 +
12 files changed, 388 insertions(+), 87 deletions(-)
diff --git a/core/integration/tests/server/general.rs
b/core/integration/tests/server/general.rs
index 2a384332f..e8a0b5ffa 100644
--- a/core/integration/tests/server/general.rs
+++ b/core/integration/tests/server/general.rs
@@ -25,7 +25,7 @@ use test_case::test_matrix;
// TODO: Include other trasnsport protocols
#[test_matrix(
- [TransportProtocol::Tcp],
+ [TransportProtocol::Http, TransportProtocol::Tcp],
[
system_scenario(),
user_scenario(),
diff --git
a/core/integration/tests/server/scenarios/stream_size_validation_scenario.rs
b/core/integration/tests/server/scenarios/stream_size_validation_scenario.rs
index c816b2dac..17e5bc870 100644
--- a/core/integration/tests/server/scenarios/stream_size_validation_scenario.rs
+++ b/core/integration/tests/server/scenarios/stream_size_validation_scenario.rs
@@ -20,7 +20,7 @@ use crate::server::scenarios::{PARTITION_ID,
PARTITIONS_COUNT, create_client};
use bytes::Bytes;
use iggy::prelude::*;
use integration::test_server::{ClientFactory, assert_clean_system, login_root};
-use std::str::FromStr;
+use std::{str::FromStr, time::Duration};
const S1_NAME: &str = "test-stream-1";
const T1_NAME: &str = "test-topic-1";
diff --git a/core/server/src/binary/handlers/streams/update_stream_handler.rs
b/core/server/src/binary/handlers/streams/update_stream_handler.rs
index 2e81915a1..996db7f4b 100644
--- a/core/server/src/binary/handlers/streams/update_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/update_stream_handler.rs
@@ -45,7 +45,6 @@ impl ServerCommandHandler for UpdateStream {
) -> Result<(), IggyError> {
debug!("session: {session}, command: {self}");
let stream_id = self.stream_id.clone();
-
shard
.update_stream2(session, &self.stream_id, self.name.clone())
.with_error_context(|error| {
diff --git a/core/server/src/http/consumer_groups.rs
b/core/server/src/http/consumer_groups.rs
index d4883470e..18e7cfa94 100644
--- a/core/server/src/http/consumer_groups.rs
+++ b/core/server/src/http/consumer_groups.rs
@@ -236,53 +236,86 @@ async fn delete_consumer_group(
let identifier_topic_id = Identifier::from_str_value(&topic_id)?;
let identifier_group_id = Identifier::from_str_value(&group_id)?;
- let session = Session::stateless(identity.user_id, identity.ip_address);
-
- // Delete using the new API
- let consumer_group = state.shard.shard().delete_consumer_group2(
- &session,
- &identifier_stream_id,
- &identifier_topic_id,
- &identifier_group_id
- )
- .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed
to delete consumer group with ID: {group_id} for topic with ID: {topic_id} in
stream with ID: {stream_id}"))?;
-
- let cg_id = consumer_group.id();
-
- // Send event for consumer group deletion
- {
- let broadcast_future = SendWrapper::new(async {
- use crate::shard::transmission::event::ShardEvent;
- let event = ShardEvent::DeletedConsumerGroup2 {
- id: cg_id,
- stream_id: identifier_stream_id.clone(),
- topic_id: identifier_topic_id.clone(),
- group_id: identifier_group_id.clone(),
- };
- let _responses = state
+ let result = SendWrapper::new(async move {
+ let session = Session::stateless(identity.user_id,
identity.ip_address);
+
+ // Delete using the new API
+ let consumer_group = state.shard.shard().delete_consumer_group2(
+ &session,
+ &identifier_stream_id,
+ &identifier_topic_id,
+ &identifier_group_id
+ )
+ .with_error_context(|error| format!("{COMPONENT} (error: {error}) -
failed to delete consumer group with ID: {group_id} for topic with ID:
{topic_id} in stream with ID: {stream_id}"))?;
+
+ let cg_id = consumer_group.id();
+
+ // Remove all consumer group members from ClientManager
+ let stream_id_usize = state.shard.shard().streams2.with_stream_by_id(
+ &identifier_stream_id,
+ crate::streaming::streams::helpers::get_stream_id(),
+ );
+ let topic_id_usize = state.shard.shard().streams2.with_topic_by_id(
+ &identifier_stream_id,
+ &identifier_topic_id,
+ crate::streaming::topics::helpers::get_topic_id(),
+ );
+
+ // TODO: Tech debt, repeated code from
`delete_consumer_group_handler.rs`
+ // Get members from the deleted consumer group and make them leave
+ let slab = consumer_group.members().inner().shared_get();
+ for (_, member) in slab.iter() {
+ if let Err(err) =
state.shard.shard().client_manager.borrow_mut().leave_consumer_group(
+ member.client_id,
+ stream_id_usize,
+ topic_id_usize,
+ cg_id,
+ ) {
+ tracing::warn!(
+ "{COMPONENT} (error: {err}) - failed to make client leave
consumer group for client ID: {}, group ID: {}",
+ member.client_id,
+ cg_id
+ );
+ }
+ }
+
+ // Send event for consumer group deletion
+ {
+ let broadcast_future = SendWrapper::new(async {
+ use crate::shard::transmission::event::ShardEvent;
+ let event = ShardEvent::DeletedConsumerGroup2 {
+ id: cg_id,
+ stream_id: identifier_stream_id.clone(),
+ topic_id: identifier_topic_id.clone(),
+ group_id: identifier_group_id.clone(),
+ };
+ let _responses = state
+ .shard
+ .shard()
+ .broadcast_event_to_all_shards(event)
+ .await;
+ });
+ broadcast_future.await;
+ }
+
+ // Apply state change
+ let entry_command =
EntryCommand::DeleteConsumerGroup(DeleteConsumerGroup {
+ stream_id: identifier_stream_id,
+ topic_id: identifier_topic_id,
+ group_id: identifier_group_id,
+ });
+ let state_future = SendWrapper::new(
+ state
.shard
.shard()
- .broadcast_event_to_all_shards(event)
- .await;
- });
- broadcast_future.await;
- }
+ .state
+ .apply(identity.user_id, &entry_command),
+ );
- // Apply state change
- let entry_command = EntryCommand::DeleteConsumerGroup(DeleteConsumerGroup {
- stream_id: identifier_stream_id,
- topic_id: identifier_topic_id,
- group_id: identifier_group_id,
- });
- let state_future = SendWrapper::new(
- state
- .shard
- .shard()
- .state
- .apply(identity.user_id, &entry_command),
- );
+ state_future.await?;
- state_future.await?;
+ Ok::<StatusCode, CustomError>(StatusCode::NO_CONTENT)
+ });
- Ok(StatusCode::NO_CONTENT)
+ result.await
}
diff --git a/core/server/src/http/partitions.rs
b/core/server/src/http/partitions.rs
index 831925194..308e9d30f 100644
--- a/core/server/src/http/partitions.rs
+++ b/core/server/src/http/partitions.rs
@@ -111,18 +111,38 @@ async fn delete_partitions(
query.validate()?;
let session = Session::stateless(identity.user_id, identity.ip_address);
- let delete_future =
SendWrapper::new(state.shard.shard().delete_partitions2(
- &session,
- &query.stream_id,
- &query.topic_id,
- query.partitions_count,
- ));
-
- delete_future.await.with_error_context(|error| {
- format!(
- "{COMPONENT} (error: {error}) - failed to delete partitions for
topic with ID: {topic_id} in stream with ID: {stream_id}"
- )
- })?;
+ let deleted_partition_ids = {
+ let delete_future =
SendWrapper::new(state.shard.shard().delete_partitions2(
+ &session,
+ &query.stream_id,
+ &query.topic_id,
+ query.partitions_count,
+ ));
+
+ delete_future.await.with_error_context(|error| {
+ format!(
+ "{COMPONENT} (error: {error}) - failed to delete partitions
for topic with ID: {topic_id} in stream with ID: {stream_id}"
+ )
+ })?
+ };
+
+ // Send event for partition deletion
+ {
+ let broadcast_future = SendWrapper::new(async {
+ let event = ShardEvent::DeletedPartitions2 {
+ stream_id: query.stream_id.clone(),
+ topic_id: query.topic_id.clone(),
+ partitions_count: query.partitions_count,
+ partition_ids: deleted_partition_ids,
+ };
+ let _responses = state
+ .shard
+ .shard()
+ .broadcast_event_to_all_shards(event)
+ .await;
+ });
+ broadcast_future.await;
+ }
let command = EntryCommand::DeletePartitions(DeletePartitions {
stream_id: query.stream_id.clone(),
diff --git a/core/server/src/http/streams.rs b/core/server/src/http/streams.rs
index ab71c5517..51aad3ee5 100644
--- a/core/server/src/http/streams.rs
+++ b/core/server/src/http/streams.rs
@@ -184,6 +184,23 @@ async fn update_stream(
)
})?;
+ // Send event for stream update
+ {
+ let broadcast_future = SendWrapper::new(async {
+ use crate::shard::transmission::event::ShardEvent;
+ let event = ShardEvent::UpdatedStream2 {
+ stream_id: command.stream_id.clone(),
+ name: command.name.clone(),
+ };
+ let _responses = state
+ .shard
+ .shard()
+ .broadcast_event_to_all_shards(event)
+ .await;
+ });
+ broadcast_future.await;
+ }
+
// Apply state change using wrapper method
let entry_command = EntryCommand::UpdateStream(command);
state.shard.apply_state(identity.user_id,
&entry_command).await.with_error_context(|error| {
@@ -209,16 +226,37 @@ async fn delete_stream(
let result = SendWrapper::new(async move {
let session = Session::stateless(identity.user_id,
identity.ip_address);
- // Delete stream using wrapper method
- state
- .shard
- .delete_stream(&session, &identifier_stream_id)
- .await
- .with_error_context(|error| {
- format!(
- "{COMPONENT} (error: {error}) - failed to delete stream
with ID: {stream_id}",
- )
- })?;
+ // Delete stream and get the stream entity
+ let stream = {
+ let future = SendWrapper::new(state.shard.shard().delete_stream2(
+ &session,
+ &identifier_stream_id,
+ ));
+ future.await
+ }.with_error_context(|error| {
+ format!(
+ "{COMPONENT} (error: {error}) - failed to delete stream with
ID: {stream_id}",
+ )
+ })?;
+
+ let stream_id_numeric = stream.root().id();
+
+ // Send event for stream deletion
+ {
+ let broadcast_future = SendWrapper::new(async {
+ use crate::shard::transmission::event::ShardEvent;
+ let event = ShardEvent::DeletedStream2 {
+ id: stream_id_numeric,
+ stream_id: identifier_stream_id.clone(),
+ };
+ let _responses = state
+ .shard
+ .shard()
+ .broadcast_event_to_all_shards(event)
+ .await;
+ });
+ broadcast_future.await;
+ }
// Apply state change using wrapper method
let entry_command = EntryCommand::DeleteStream(DeleteStream {
@@ -259,6 +297,22 @@ async fn purge_stream(
)
})?;
+ // Send event for stream purge
+ {
+ let broadcast_future = SendWrapper::new(async {
+ use crate::shard::transmission::event::ShardEvent;
+ let event = ShardEvent::PurgedStream2 {
+ stream_id: identifier_stream_id.clone(),
+ };
+ let _responses = state
+ .shard
+ .shard()
+ .broadcast_event_to_all_shards(event)
+ .await;
+ });
+ broadcast_future.await;
+ }
+
// Apply state change using wrapper method
let entry_command = EntryCommand::PurgeStream(PurgeStream {
stream_id: identifier_stream_id,
diff --git a/core/server/src/http/topics.rs b/core/server/src/http/topics.rs
index b57a0bb47..59599eebe 100644
--- a/core/server/src/http/topics.rs
+++ b/core/server/src/http/topics.rs
@@ -282,6 +282,7 @@ async fn update_topic(
let session = Session::stateless(identity.user_id, identity.ip_address);
+ let name_changed = !command.name.is_empty();
state.shard.shard().update_topic2(
&session,
&command.stream_id,
@@ -297,13 +298,42 @@ async fn update_topic(
)
})?;
+ // TODO: Tech debt.
+ let topic_id = if name_changed {
+ Identifier::named(&command.name.clone()).unwrap()
+ } else {
+ command.topic_id.clone()
+ };
+
// Get the updated values from the topic
let (message_expiry, max_topic_size) =
state.shard.shard().streams2.with_topic_by_id(
&command.stream_id,
- &command.topic_id,
+ &topic_id,
|(root, _, _)| (root.message_expiry(), root.max_topic_size()),
);
+ // Send event for topic update
+ {
+ let broadcast_future = SendWrapper::new(async {
+ use crate::shard::transmission::event::ShardEvent;
+ let event = ShardEvent::UpdatedTopic2 {
+ stream_id: command.stream_id.clone(),
+ topic_id: command.topic_id.clone(),
+ name: command.name.clone(),
+ message_expiry: command.message_expiry,
+ compression_algorithm: command.compression_algorithm,
+ max_topic_size: command.max_topic_size,
+ replication_factor: command.replication_factor,
+ };
+ let _responses = state
+ .shard
+ .shard()
+ .broadcast_event_to_all_shards(event)
+ .await;
+ });
+ broadcast_future.await;
+ }
+
command.message_expiry = message_expiry;
command.max_topic_size = max_topic_size;
@@ -333,7 +363,7 @@ async fn delete_topic(
let session = Session::stateless(identity.user_id, identity.ip_address);
- {
+ let topic = {
let future = SendWrapper::new(state.shard.shard().delete_topic2(
&session,
&identifier_stream_id,
@@ -346,6 +376,26 @@ async fn delete_topic(
)
})?;
+ let topic_id_numeric = topic.root().id();
+
+ // Send event for topic deletion
+ {
+ let broadcast_future = SendWrapper::new(async {
+ use crate::shard::transmission::event::ShardEvent;
+ let event = ShardEvent::DeletedTopic2 {
+ id: topic_id_numeric,
+ stream_id: identifier_stream_id.clone(),
+ topic_id: identifier_topic_id.clone(),
+ };
+ let _responses = state
+ .shard
+ .shard()
+ .broadcast_event_to_all_shards(event)
+ .await;
+ });
+ broadcast_future.await;
+ }
+
{
let entry_command = EntryCommand::DeleteTopic(DeleteTopic {
stream_id: identifier_stream_id,
@@ -391,6 +441,23 @@ async fn purge_topic(
)
})?;
+ // Send event for topic purge
+ {
+ let broadcast_future = SendWrapper::new(async {
+ use crate::shard::transmission::event::ShardEvent;
+ let event = ShardEvent::PurgedTopic {
+ stream_id: identifier_stream_id.clone(),
+ topic_id: identifier_topic_id.clone(),
+ };
+ let _responses = state
+ .shard
+ .shard()
+ .broadcast_event_to_all_shards(event)
+ .await;
+ });
+ broadcast_future.await;
+ }
+
{
let entry_command = EntryCommand::PurgeTopic(PurgeTopic {
stream_id: identifier_stream_id,
diff --git a/core/server/src/http/users.rs b/core/server/src/http/users.rs
index a1de84bf2..56c48bdaf 100644
--- a/core/server/src/http/users.rs
+++ b/core/server/src/http/users.rs
@@ -137,6 +137,26 @@ async fn create_user(
let user_id = user.id;
let response = Json(mapper::map_user(&user));
+ // Send event for user creation
+ {
+ let broadcast_future = SendWrapper::new(async {
+ use crate::shard::transmission::event::ShardEvent;
+ let event = ShardEvent::CreatedUser {
+ user_id,
+ username: command.username.to_owned(),
+ password: command.password.to_owned(),
+ status: command.status,
+ permissions: command.permissions.clone(),
+ };
+ let _responses = state
+ .shard
+ .shard()
+ .broadcast_event_to_all_shards(event)
+ .await;
+ });
+ broadcast_future.await;
+ }
+
{
let username = command.username.clone();
let entry_command = EntryCommand::CreateUser(CreateUserWithId {
@@ -192,6 +212,24 @@ async fn update_user(
format!("{COMPONENT} (error: {error}) - failed to update user,
user ID: {user_id}")
})?;
+ // Send event for user update
+ {
+ let broadcast_future = SendWrapper::new(async {
+ use crate::shard::transmission::event::ShardEvent;
+ let event = ShardEvent::UpdatedUser {
+ user_id: command.user_id.clone(),
+ username: command.username.clone(),
+ status: command.status,
+ };
+ let _responses = state
+ .shard
+ .shard()
+ .broadcast_event_to_all_shards(event)
+ .await;
+ });
+ broadcast_future.await;
+ }
+
{
let username = command.username.clone();
let entry_command = EntryCommand::UpdateUser(command);
@@ -235,6 +273,23 @@ async fn update_permissions(
)
})?;
+ // Send event for permissions update
+ {
+ let broadcast_future = SendWrapper::new(async {
+ use crate::shard::transmission::event::ShardEvent;
+ let event = ShardEvent::UpdatedPermissions {
+ user_id: command.user_id.clone(),
+ permissions: command.permissions.clone(),
+ };
+ let _responses = state
+ .shard
+ .shard()
+ .broadcast_event_to_all_shards(event)
+ .await;
+ });
+ broadcast_future.await;
+ }
+
{
let entry_command = EntryCommand::UpdatePermissions(command);
let future = SendWrapper::new(
@@ -279,6 +334,24 @@ async fn change_password(
format!("{COMPONENT} (error: {error}) - failed to change password,
user ID: {user_id}")
})?;
+ // Send event for password change
+ {
+ let broadcast_future = SendWrapper::new(async {
+ use crate::shard::transmission::event::ShardEvent;
+ let event = ShardEvent::ChangedPassword {
+ user_id: command.user_id.clone(),
+ current_password: command.current_password.clone(),
+ new_password: command.new_password.clone(),
+ };
+ let _responses = state
+ .shard
+ .shard()
+ .broadcast_event_to_all_shards(event)
+ .await;
+ });
+ broadcast_future.await;
+ }
+
{
let entry_command = EntryCommand::ChangePassword(command);
let future = SendWrapper::new(
@@ -316,6 +389,22 @@ async fn delete_user(
format!("{COMPONENT} (error: {error}) - failed to delete user with
ID: {user_id}")
})?;
+ // Send event for user deletion
+ {
+ let broadcast_future = SendWrapper::new(async {
+ use crate::shard::transmission::event::ShardEvent;
+ let event = ShardEvent::DeletedUser {
+ user_id: identifier_user_id.clone(),
+ };
+ let _responses = state
+ .shard
+ .shard()
+ .broadcast_event_to_all_shards(event)
+ .await;
+ });
+ broadcast_future.await;
+ }
+
{
let entry_command = EntryCommand::DeleteUser(DeleteUser {
user_id: identifier_user_id,
diff --git a/core/server/src/shard/system/segments.rs
b/core/server/src/shard/system/segments.rs
index 8b96c610c..055e0d76f 100644
--- a/core/server/src/shard/system/segments.rs
+++ b/core/server/src/shard/system/segments.rs
@@ -40,11 +40,11 @@ impl IggyShard {
partition_id: usize,
segments_count: u32,
) -> Result<(), IggyError> {
- let (segments, storages) = self.streams2.with_partition_by_id_mut(
+ let (segments, storages, stats) =
self.streams2.with_partition_by_id_mut(
stream_id,
topic_id,
partition_id,
- |(.., log)| {
+ |(_, stats,.., log)| {
let upperbound = log.segments().len();
let begin = upperbound.saturating_sub(segments_count as usize);
let segments = log
@@ -59,7 +59,7 @@ impl IggyShard {
.indexes_mut()
.drain(begin..upperbound)
.collect::<Vec<_>>();
- (segments, storages)
+ (segments, storages, stats.clone())
},
);
let numeric_stream_id = self
@@ -71,7 +71,6 @@ impl IggyShard {
streaming::topics::helpers::get_topic_id(),
);
- let create_base_segment = !segments.is_empty() && !storages.is_empty();
for (mut storage, segment) in
storages.into_iter().zip(segments.into_iter()) {
let (msg_writer, index_writer) = storage.shutdown();
if let Some(msg_writer) = msg_writer
@@ -101,12 +100,12 @@ impl IggyShard {
})?;
}
}
-
- if create_base_segment {
- self.init_log(stream_id, topic_id, partition_id).await?;
- }
+ self.init_log(stream_id, topic_id, partition_id).await?;
+ // TODO: Tech debt. make the increment seg count be part of init_log.
+ stats.increment_segments_count(1);
Ok(())
}
+
pub async fn delete_segments(
&self,
session: &Session,
diff --git a/core/server/src/shard/system/streams.rs
b/core/server/src/shard/system/streams.rs
index 6e76b2ca5..c8677e93f 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -147,11 +147,30 @@ impl IggyShard {
)
})?;
let mut stream = self.delete_stream2_base(id);
- // Clean up consumer groups from ClientManager for this stream
let stream_id_usize = stream.id();
+
+ // Clean up consumer groups from ClientManager for this stream
self.client_manager
.borrow_mut()
.delete_consumer_groups_for_stream(stream_id_usize);
+
+ // Remove all entries from shards_table for this stream (all topics
and partitions)
+ let namespaces_to_remove: Vec<_> = self.shards_table
+ .iter()
+ .filter_map(|entry| {
+ let (ns, _) = entry.pair();
+ if ns.stream_id() == stream_id_usize {
+ Some(*ns)
+ } else {
+ None
+ }
+ })
+ .collect();
+
+ for ns in namespaces_to_remove {
+ self.remove_shard_table_record(&ns);
+ }
+
delete_stream_from_disk(self.id, &mut stream,
&self.config.system).await?;
Ok(stream)
}
diff --git a/core/server/src/shard/system/topics.rs
b/core/server/src/shard/system/topics.rs
index ed8c348d0..48218474e 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -218,10 +218,30 @@ impl IggyShard {
)
})?;
let mut topic = self.delete_topic_base2(stream_id, topic_id);
+ let topic_id_numeric = topic.id();
+
// Clean up consumer groups from ClientManager for this topic
self.client_manager
.borrow_mut()
- .delete_consumer_groups_for_topic(numeric_stream_id, topic.id());
+ .delete_consumer_groups_for_topic(numeric_stream_id,
topic_id_numeric);
+
+ // Remove all partition entries from shards_table for this topic
+ let namespaces_to_remove: Vec<_> = self.shards_table
+ .iter()
+ .filter_map(|entry| {
+ let (ns, _) = entry.pair();
+ if ns.stream_id() == numeric_stream_id && ns.topic_id() ==
topic_id_numeric {
+ Some(*ns)
+ } else {
+ None
+ }
+ })
+ .collect();
+
+ for ns in namespaces_to_remove {
+ self.remove_shard_table_record(&ns);
+ }
+
let parent = topic.stats().parent().clone();
// We need to borrow topic as mutable, as we are extracting partitions
out of it, in order to close them.
let (messages_count, size_bytes, segments_count) =
@@ -279,12 +299,6 @@ impl IggyShard {
})?;
}
- self.streams2.with_partitions(
- stream_id,
- topic_id,
- partitions::helpers::purge_partitions_mem(),
- );
-
let (consumer_offset_paths, consumer_group_offset_paths) =
self.streams2.with_partitions(
stream_id,
topic_id,
@@ -323,6 +337,12 @@ impl IggyShard {
roots.iter().map(|(_, root)| root.id()).collect::<Vec<_>>()
})
});
+
+ self.streams2.with_partitions(
+ stream_id,
+ topic_id,
+ partitions::helpers::purge_partitions_mem(),
+ );
for part_id in part_ids {
self.delete_segments_bypass_auth(stream_id, topic_id, part_id,
u32::MAX)
.await?;
diff --git a/core/server/src/streaming/stats/mod.rs
b/core/server/src/streaming/stats/mod.rs
index 67c464f30..2c89e832d 100644
--- a/core/server/src/streaming/stats/mod.rs
+++ b/core/server/src/streaming/stats/mod.rs
@@ -327,5 +327,6 @@ impl PartitionStats {
self.zero_out_size_bytes();
self.zero_out_messages_count();
self.zero_out_segments_count();
+ self.zero_out_parent_all();
}
}