This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc by this push:
     new fe5c0ae98 feat(io_uring): fix integration tests for http transport 
(#2249)
fe5c0ae98 is described below

commit fe5c0ae986ca9e66983aabe7f4dd1ee9f020da1a
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Tue Oct 7 17:41:13 2025 +0200

    feat(io_uring): fix integration tests for http transport (#2249)
---
 core/integration/tests/server/general.rs           |   2 +-
 .../scenarios/stream_size_validation_scenario.rs   |   2 +-
 .../handlers/streams/update_stream_handler.rs      |   1 -
 core/server/src/http/consumer_groups.rs            | 121 +++++++++++++--------
 core/server/src/http/partitions.rs                 |  44 ++++++--
 core/server/src/http/streams.rs                    |  74 +++++++++++--
 core/server/src/http/topics.rs                     |  71 +++++++++++-
 core/server/src/http/users.rs                      |  89 +++++++++++++++
 core/server/src/shard/system/segments.rs           |  15 ++-
 core/server/src/shard/system/streams.rs            |  21 +++-
 core/server/src/shard/system/topics.rs             |  34 ++++--
 core/server/src/streaming/stats/mod.rs             |   1 +
 12 files changed, 388 insertions(+), 87 deletions(-)

diff --git a/core/integration/tests/server/general.rs 
b/core/integration/tests/server/general.rs
index 2a384332f..e8a0b5ffa 100644
--- a/core/integration/tests/server/general.rs
+++ b/core/integration/tests/server/general.rs
@@ -25,7 +25,7 @@ use test_case::test_matrix;
 
 // TODO: Include other transport protocols
 #[test_matrix(
-    [TransportProtocol::Tcp],
+    [TransportProtocol::Http, TransportProtocol::Tcp],
     [
         system_scenario(),
         user_scenario(),
diff --git 
a/core/integration/tests/server/scenarios/stream_size_validation_scenario.rs 
b/core/integration/tests/server/scenarios/stream_size_validation_scenario.rs
index c816b2dac..17e5bc870 100644
--- a/core/integration/tests/server/scenarios/stream_size_validation_scenario.rs
+++ b/core/integration/tests/server/scenarios/stream_size_validation_scenario.rs
@@ -20,7 +20,7 @@ use crate::server::scenarios::{PARTITION_ID, 
PARTITIONS_COUNT, create_client};
 use bytes::Bytes;
 use iggy::prelude::*;
 use integration::test_server::{ClientFactory, assert_clean_system, login_root};
-use std::str::FromStr;
+use std::{str::FromStr, time::Duration};
 
 const S1_NAME: &str = "test-stream-1";
 const T1_NAME: &str = "test-topic-1";
diff --git a/core/server/src/binary/handlers/streams/update_stream_handler.rs 
b/core/server/src/binary/handlers/streams/update_stream_handler.rs
index 2e81915a1..996db7f4b 100644
--- a/core/server/src/binary/handlers/streams/update_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/update_stream_handler.rs
@@ -45,7 +45,6 @@ impl ServerCommandHandler for UpdateStream {
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
         let stream_id = self.stream_id.clone();
-
         shard
         .update_stream2(session, &self.stream_id, self.name.clone())
         .with_error_context(|error| {
diff --git a/core/server/src/http/consumer_groups.rs 
b/core/server/src/http/consumer_groups.rs
index d4883470e..18e7cfa94 100644
--- a/core/server/src/http/consumer_groups.rs
+++ b/core/server/src/http/consumer_groups.rs
@@ -236,53 +236,86 @@ async fn delete_consumer_group(
     let identifier_topic_id = Identifier::from_str_value(&topic_id)?;
     let identifier_group_id = Identifier::from_str_value(&group_id)?;
 
-    let session = Session::stateless(identity.user_id, identity.ip_address);
-
-    // Delete using the new API
-    let consumer_group = state.shard.shard().delete_consumer_group2(
-        &session,
-        &identifier_stream_id,
-        &identifier_topic_id,
-        &identifier_group_id
-    )
-    .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed 
to delete consumer group with ID: {group_id} for topic with ID: {topic_id} in 
stream with ID: {stream_id}"))?;
-
-    let cg_id = consumer_group.id();
-
-    // Send event for consumer group deletion
-    {
-        let broadcast_future = SendWrapper::new(async {
-            use crate::shard::transmission::event::ShardEvent;
-            let event = ShardEvent::DeletedConsumerGroup2 {
-                id: cg_id,
-                stream_id: identifier_stream_id.clone(),
-                topic_id: identifier_topic_id.clone(),
-                group_id: identifier_group_id.clone(),
-            };
-            let _responses = state
+    let result = SendWrapper::new(async move {
+        let session = Session::stateless(identity.user_id, 
identity.ip_address);
+
+        // Delete using the new API
+        let consumer_group = state.shard.shard().delete_consumer_group2(
+            &session,
+            &identifier_stream_id,
+            &identifier_topic_id,
+            &identifier_group_id
+        )
+        .with_error_context(|error| format!("{COMPONENT} (error: {error}) - 
failed to delete consumer group with ID: {group_id} for topic with ID: 
{topic_id} in stream with ID: {stream_id}"))?;
+
+        let cg_id = consumer_group.id();
+
+        // Remove all consumer group members from ClientManager
+        let stream_id_usize = state.shard.shard().streams2.with_stream_by_id(
+            &identifier_stream_id,
+            crate::streaming::streams::helpers::get_stream_id(),
+        );
+        let topic_id_usize = state.shard.shard().streams2.with_topic_by_id(
+            &identifier_stream_id,
+            &identifier_topic_id,
+            crate::streaming::topics::helpers::get_topic_id(),
+        );
+
+        // TODO: Tech debt, repeated code from 
`delete_consumer_group_handler.rs`
+        // Get members from the deleted consumer group and make them leave
+        let slab = consumer_group.members().inner().shared_get();
+        for (_, member) in slab.iter() {
+            if let Err(err) = 
state.shard.shard().client_manager.borrow_mut().leave_consumer_group(
+                member.client_id,
+                stream_id_usize,
+                topic_id_usize,
+                cg_id,
+            ) {
+                tracing::warn!(
+                    "{COMPONENT} (error: {err}) - failed to make client leave 
consumer group for client ID: {}, group ID: {}",
+                    member.client_id,
+                    cg_id
+                );
+            }
+        }
+
+        // Send event for consumer group deletion
+        {
+            let broadcast_future = SendWrapper::new(async {
+                use crate::shard::transmission::event::ShardEvent;
+                let event = ShardEvent::DeletedConsumerGroup2 {
+                    id: cg_id,
+                    stream_id: identifier_stream_id.clone(),
+                    topic_id: identifier_topic_id.clone(),
+                    group_id: identifier_group_id.clone(),
+                };
+                let _responses = state
+                    .shard
+                    .shard()
+                    .broadcast_event_to_all_shards(event)
+                    .await;
+            });
+            broadcast_future.await;
+        }
+
+        // Apply state change
+        let entry_command = 
EntryCommand::DeleteConsumerGroup(DeleteConsumerGroup {
+            stream_id: identifier_stream_id,
+            topic_id: identifier_topic_id,
+            group_id: identifier_group_id,
+        });
+        let state_future = SendWrapper::new(
+            state
                 .shard
                 .shard()
-                .broadcast_event_to_all_shards(event)
-                .await;
-        });
-        broadcast_future.await;
-    }
+                .state
+                .apply(identity.user_id, &entry_command),
+        );
 
-    // Apply state change
-    let entry_command = EntryCommand::DeleteConsumerGroup(DeleteConsumerGroup {
-        stream_id: identifier_stream_id,
-        topic_id: identifier_topic_id,
-        group_id: identifier_group_id,
-    });
-    let state_future = SendWrapper::new(
-        state
-            .shard
-            .shard()
-            .state
-            .apply(identity.user_id, &entry_command),
-    );
+        state_future.await?;
 
-    state_future.await?;
+        Ok::<StatusCode, CustomError>(StatusCode::NO_CONTENT)
+    });
 
-    Ok(StatusCode::NO_CONTENT)
+    result.await
 }
diff --git a/core/server/src/http/partitions.rs 
b/core/server/src/http/partitions.rs
index 831925194..308e9d30f 100644
--- a/core/server/src/http/partitions.rs
+++ b/core/server/src/http/partitions.rs
@@ -111,18 +111,38 @@ async fn delete_partitions(
     query.validate()?;
 
     let session = Session::stateless(identity.user_id, identity.ip_address);
-    let delete_future = 
SendWrapper::new(state.shard.shard().delete_partitions2(
-        &session,
-        &query.stream_id,
-        &query.topic_id,
-        query.partitions_count,
-    ));
-
-    delete_future.await.with_error_context(|error| {
-        format!(
-            "{COMPONENT} (error: {error}) - failed to delete partitions for 
topic with ID: {topic_id} in stream with ID: {stream_id}"
-        )
-    })?;
+    let deleted_partition_ids = {
+        let delete_future = 
SendWrapper::new(state.shard.shard().delete_partitions2(
+            &session,
+            &query.stream_id,
+            &query.topic_id,
+            query.partitions_count,
+        ));
+
+        delete_future.await.with_error_context(|error| {
+            format!(
+                "{COMPONENT} (error: {error}) - failed to delete partitions 
for topic with ID: {topic_id} in stream with ID: {stream_id}"
+            )
+        })?
+    };
+
+    // Send event for partition deletion
+    {
+        let broadcast_future = SendWrapper::new(async {
+            let event = ShardEvent::DeletedPartitions2 {
+                stream_id: query.stream_id.clone(),
+                topic_id: query.topic_id.clone(),
+                partitions_count: query.partitions_count,
+                partition_ids: deleted_partition_ids,
+            };
+            let _responses = state
+                .shard
+                .shard()
+                .broadcast_event_to_all_shards(event)
+                .await;
+        });
+        broadcast_future.await;
+    }
 
     let command = EntryCommand::DeletePartitions(DeletePartitions {
         stream_id: query.stream_id.clone(),
diff --git a/core/server/src/http/streams.rs b/core/server/src/http/streams.rs
index ab71c5517..51aad3ee5 100644
--- a/core/server/src/http/streams.rs
+++ b/core/server/src/http/streams.rs
@@ -184,6 +184,23 @@ async fn update_stream(
                 )
             })?;
 
+        // Send event for stream update
+        {
+            let broadcast_future = SendWrapper::new(async {
+                use crate::shard::transmission::event::ShardEvent;
+                let event = ShardEvent::UpdatedStream2 {
+                    stream_id: command.stream_id.clone(),
+                    name: command.name.clone(),
+                };
+                let _responses = state
+                    .shard
+                    .shard()
+                    .broadcast_event_to_all_shards(event)
+                    .await;
+            });
+            broadcast_future.await;
+        }
+
         // Apply state change using wrapper method
         let entry_command = EntryCommand::UpdateStream(command);
         state.shard.apply_state(identity.user_id, 
&entry_command).await.with_error_context(|error| {
@@ -209,16 +226,37 @@ async fn delete_stream(
     let result = SendWrapper::new(async move {
         let session = Session::stateless(identity.user_id, 
identity.ip_address);
 
-        // Delete stream using wrapper method
-        state
-            .shard
-            .delete_stream(&session, &identifier_stream_id)
-            .await
-            .with_error_context(|error| {
-                format!(
-                    "{COMPONENT} (error: {error}) - failed to delete stream 
with ID: {stream_id}",
-                )
-            })?;
+        // Delete stream and get the stream entity
+        let stream = {
+            let future = SendWrapper::new(state.shard.shard().delete_stream2(
+                &session,
+                &identifier_stream_id,
+            ));
+            future.await
+        }.with_error_context(|error| {
+            format!(
+                "{COMPONENT} (error: {error}) - failed to delete stream with 
ID: {stream_id}",
+            )
+        })?;
+
+        let stream_id_numeric = stream.root().id();
+
+        // Send event for stream deletion
+        {
+            let broadcast_future = SendWrapper::new(async {
+                use crate::shard::transmission::event::ShardEvent;
+                let event = ShardEvent::DeletedStream2 {
+                    id: stream_id_numeric,
+                    stream_id: identifier_stream_id.clone(),
+                };
+                let _responses = state
+                    .shard
+                    .shard()
+                    .broadcast_event_to_all_shards(event)
+                    .await;
+            });
+            broadcast_future.await;
+        }
 
         // Apply state change using wrapper method
         let entry_command = EntryCommand::DeleteStream(DeleteStream {
@@ -259,6 +297,22 @@ async fn purge_stream(
                 )
             })?;
 
+        // Send event for stream purge
+        {
+            let broadcast_future = SendWrapper::new(async {
+                use crate::shard::transmission::event::ShardEvent;
+                let event = ShardEvent::PurgedStream2 {
+                    stream_id: identifier_stream_id.clone(),
+                };
+                let _responses = state
+                    .shard
+                    .shard()
+                    .broadcast_event_to_all_shards(event)
+                    .await;
+            });
+            broadcast_future.await;
+        }
+
         // Apply state change using wrapper method
         let entry_command = EntryCommand::PurgeStream(PurgeStream {
             stream_id: identifier_stream_id,
diff --git a/core/server/src/http/topics.rs b/core/server/src/http/topics.rs
index b57a0bb47..59599eebe 100644
--- a/core/server/src/http/topics.rs
+++ b/core/server/src/http/topics.rs
@@ -282,6 +282,7 @@ async fn update_topic(
 
     let session = Session::stateless(identity.user_id, identity.ip_address);
 
+    let name_changed = !command.name.is_empty();
     state.shard.shard().update_topic2(
         &session,
         &command.stream_id,
@@ -297,13 +298,42 @@ async fn update_topic(
         )
     })?;
 
+    // TODO: Tech debt.
+    let topic_id = if name_changed {
+        Identifier::named(&command.name.clone()).unwrap()
+    } else {
+        command.topic_id.clone()
+    };
+
     // Get the updated values from the topic
     let (message_expiry, max_topic_size) = 
state.shard.shard().streams2.with_topic_by_id(
         &command.stream_id,
-        &command.topic_id,
+        &topic_id,
         |(root, _, _)| (root.message_expiry(), root.max_topic_size()),
     );
 
+    // Send event for topic update
+    {
+        let broadcast_future = SendWrapper::new(async {
+            use crate::shard::transmission::event::ShardEvent;
+            let event = ShardEvent::UpdatedTopic2 {
+                stream_id: command.stream_id.clone(),
+                topic_id: command.topic_id.clone(),
+                name: command.name.clone(),
+                message_expiry: command.message_expiry,
+                compression_algorithm: command.compression_algorithm,
+                max_topic_size: command.max_topic_size,
+                replication_factor: command.replication_factor,
+            };
+            let _responses = state
+                .shard
+                .shard()
+                .broadcast_event_to_all_shards(event)
+                .await;
+        });
+        broadcast_future.await;
+    }
+
     command.message_expiry = message_expiry;
     command.max_topic_size = max_topic_size;
 
@@ -333,7 +363,7 @@ async fn delete_topic(
 
     let session = Session::stateless(identity.user_id, identity.ip_address);
 
-    {
+    let topic = {
         let future = SendWrapper::new(state.shard.shard().delete_topic2(
             &session,
             &identifier_stream_id,
@@ -346,6 +376,26 @@ async fn delete_topic(
         )
     })?;
 
+    let topic_id_numeric = topic.root().id();
+
+    // Send event for topic deletion
+    {
+        let broadcast_future = SendWrapper::new(async {
+            use crate::shard::transmission::event::ShardEvent;
+            let event = ShardEvent::DeletedTopic2 {
+                id: topic_id_numeric,
+                stream_id: identifier_stream_id.clone(),
+                topic_id: identifier_topic_id.clone(),
+            };
+            let _responses = state
+                .shard
+                .shard()
+                .broadcast_event_to_all_shards(event)
+                .await;
+        });
+        broadcast_future.await;
+    }
+
     {
         let entry_command = EntryCommand::DeleteTopic(DeleteTopic {
             stream_id: identifier_stream_id,
@@ -391,6 +441,23 @@ async fn purge_topic(
         )
     })?;
 
+    // Send event for topic purge
+    {
+        let broadcast_future = SendWrapper::new(async {
+            use crate::shard::transmission::event::ShardEvent;
+            let event = ShardEvent::PurgedTopic {
+                stream_id: identifier_stream_id.clone(),
+                topic_id: identifier_topic_id.clone(),
+            };
+            let _responses = state
+                .shard
+                .shard()
+                .broadcast_event_to_all_shards(event)
+                .await;
+        });
+        broadcast_future.await;
+    }
+
     {
         let entry_command = EntryCommand::PurgeTopic(PurgeTopic {
             stream_id: identifier_stream_id,
diff --git a/core/server/src/http/users.rs b/core/server/src/http/users.rs
index a1de84bf2..56c48bdaf 100644
--- a/core/server/src/http/users.rs
+++ b/core/server/src/http/users.rs
@@ -137,6 +137,26 @@ async fn create_user(
     let user_id = user.id;
     let response = Json(mapper::map_user(&user));
 
+    // Send event for user creation
+    {
+        let broadcast_future = SendWrapper::new(async {
+            use crate::shard::transmission::event::ShardEvent;
+            let event = ShardEvent::CreatedUser {
+                user_id,
+                username: command.username.to_owned(),
+                password: command.password.to_owned(),
+                status: command.status,
+                permissions: command.permissions.clone(),
+            };
+            let _responses = state
+                .shard
+                .shard()
+                .broadcast_event_to_all_shards(event)
+                .await;
+        });
+        broadcast_future.await;
+    }
+
     {
         let username = command.username.clone();
         let entry_command = EntryCommand::CreateUser(CreateUserWithId {
@@ -192,6 +212,24 @@ async fn update_user(
             format!("{COMPONENT} (error: {error}) - failed to update user, 
user ID: {user_id}")
         })?;
 
+    // Send event for user update
+    {
+        let broadcast_future = SendWrapper::new(async {
+            use crate::shard::transmission::event::ShardEvent;
+            let event = ShardEvent::UpdatedUser {
+                user_id: command.user_id.clone(),
+                username: command.username.clone(),
+                status: command.status,
+            };
+            let _responses = state
+                .shard
+                .shard()
+                .broadcast_event_to_all_shards(event)
+                .await;
+        });
+        broadcast_future.await;
+    }
+
     {
         let username = command.username.clone();
         let entry_command = EntryCommand::UpdateUser(command);
@@ -235,6 +273,23 @@ async fn update_permissions(
             )
         })?;
 
+    // Send event for permissions update
+    {
+        let broadcast_future = SendWrapper::new(async {
+            use crate::shard::transmission::event::ShardEvent;
+            let event = ShardEvent::UpdatedPermissions {
+                user_id: command.user_id.clone(),
+                permissions: command.permissions.clone(),
+            };
+            let _responses = state
+                .shard
+                .shard()
+                .broadcast_event_to_all_shards(event)
+                .await;
+        });
+        broadcast_future.await;
+    }
+
     {
         let entry_command = EntryCommand::UpdatePermissions(command);
         let future = SendWrapper::new(
@@ -279,6 +334,24 @@ async fn change_password(
             format!("{COMPONENT} (error: {error}) - failed to change password, 
user ID: {user_id}")
         })?;
 
+    // Send event for password change
+    {
+        let broadcast_future = SendWrapper::new(async {
+            use crate::shard::transmission::event::ShardEvent;
+            let event = ShardEvent::ChangedPassword {
+                user_id: command.user_id.clone(),
+                current_password: command.current_password.clone(),
+                new_password: command.new_password.clone(),
+            };
+            let _responses = state
+                .shard
+                .shard()
+                .broadcast_event_to_all_shards(event)
+                .await;
+        });
+        broadcast_future.await;
+    }
+
     {
         let entry_command = EntryCommand::ChangePassword(command);
         let future = SendWrapper::new(
@@ -316,6 +389,22 @@ async fn delete_user(
             format!("{COMPONENT} (error: {error}) - failed to delete user with 
ID: {user_id}")
         })?;
 
+    // Send event for user deletion
+    {
+        let broadcast_future = SendWrapper::new(async {
+            use crate::shard::transmission::event::ShardEvent;
+            let event = ShardEvent::DeletedUser {
+                user_id: identifier_user_id.clone(),
+            };
+            let _responses = state
+                .shard
+                .shard()
+                .broadcast_event_to_all_shards(event)
+                .await;
+        });
+        broadcast_future.await;
+    }
+
     {
         let entry_command = EntryCommand::DeleteUser(DeleteUser {
             user_id: identifier_user_id,
diff --git a/core/server/src/shard/system/segments.rs 
b/core/server/src/shard/system/segments.rs
index 8b96c610c..055e0d76f 100644
--- a/core/server/src/shard/system/segments.rs
+++ b/core/server/src/shard/system/segments.rs
@@ -40,11 +40,11 @@ impl IggyShard {
         partition_id: usize,
         segments_count: u32,
     ) -> Result<(), IggyError> {
-        let (segments, storages) = self.streams2.with_partition_by_id_mut(
+        let (segments, storages, stats) = 
self.streams2.with_partition_by_id_mut(
             stream_id,
             topic_id,
             partition_id,
-            |(.., log)| {
+            |(_, stats,.., log)| {
                 let upperbound = log.segments().len();
                 let begin = upperbound.saturating_sub(segments_count as usize);
                 let segments = log
@@ -59,7 +59,7 @@ impl IggyShard {
                     .indexes_mut()
                     .drain(begin..upperbound)
                     .collect::<Vec<_>>();
-                (segments, storages)
+                (segments, storages, stats.clone())
             },
         );
         let numeric_stream_id = self
@@ -71,7 +71,6 @@ impl IggyShard {
             streaming::topics::helpers::get_topic_id(),
         );
 
-        let create_base_segment = !segments.is_empty() && !storages.is_empty();
         for (mut storage, segment) in 
storages.into_iter().zip(segments.into_iter()) {
             let (msg_writer, index_writer) = storage.shutdown();
             if let Some(msg_writer) = msg_writer
@@ -101,12 +100,12 @@ impl IggyShard {
                 })?;
             }
         }
-
-        if create_base_segment {
-            self.init_log(stream_id, topic_id, partition_id).await?;
-        }
+        self.init_log(stream_id, topic_id, partition_id).await?;
+        // TODO: Tech debt. make the increment seg count be part of init_log.
+        stats.increment_segments_count(1);
         Ok(())
     }
+
     pub async fn delete_segments(
         &self,
         session: &Session,
diff --git a/core/server/src/shard/system/streams.rs 
b/core/server/src/shard/system/streams.rs
index 6e76b2ca5..c8677e93f 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -147,11 +147,30 @@ impl IggyShard {
                 )
             })?;
         let mut stream = self.delete_stream2_base(id);
-        // Clean up consumer groups from ClientManager for this stream
         let stream_id_usize = stream.id();
+        
+        // Clean up consumer groups from ClientManager for this stream
         self.client_manager
             .borrow_mut()
             .delete_consumer_groups_for_stream(stream_id_usize);
+        
+        // Remove all entries from shards_table for this stream (all topics 
and partitions)
+        let namespaces_to_remove: Vec<_> = self.shards_table
+            .iter()
+            .filter_map(|entry| {
+                let (ns, _) = entry.pair();
+                if ns.stream_id() == stream_id_usize {
+                    Some(*ns)
+                } else {
+                    None
+                }
+            })
+            .collect();
+        
+        for ns in namespaces_to_remove {
+            self.remove_shard_table_record(&ns);
+        }
+        
         delete_stream_from_disk(self.id, &mut stream, 
&self.config.system).await?;
         Ok(stream)
     }
diff --git a/core/server/src/shard/system/topics.rs 
b/core/server/src/shard/system/topics.rs
index ed8c348d0..48218474e 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -218,10 +218,30 @@ impl IggyShard {
                     )
                 })?;
         let mut topic = self.delete_topic_base2(stream_id, topic_id);
+        let topic_id_numeric = topic.id();
+        
         // Clean up consumer groups from ClientManager for this topic
         self.client_manager
             .borrow_mut()
-            .delete_consumer_groups_for_topic(numeric_stream_id, topic.id());
+            .delete_consumer_groups_for_topic(numeric_stream_id, 
topic_id_numeric);
+        
+        // Remove all partition entries from shards_table for this topic
+        let namespaces_to_remove: Vec<_> = self.shards_table
+            .iter()
+            .filter_map(|entry| {
+                let (ns, _) = entry.pair();
+                if ns.stream_id() == numeric_stream_id && ns.topic_id() == 
topic_id_numeric {
+                    Some(*ns)
+                } else {
+                    None
+                }
+            })
+            .collect();
+        
+        for ns in namespaces_to_remove {
+            self.remove_shard_table_record(&ns);
+        }
+        
         let parent = topic.stats().parent().clone();
         // We need to borrow topic as mutable, as we are extracting partitions 
out of it, in order to close them.
         let (messages_count, size_bytes, segments_count) =
@@ -279,12 +299,6 @@ impl IggyShard {
             })?;
         }
 
-        self.streams2.with_partitions(
-            stream_id,
-            topic_id,
-            partitions::helpers::purge_partitions_mem(),
-        );
-
         let (consumer_offset_paths, consumer_group_offset_paths) = 
self.streams2.with_partitions(
             stream_id,
             topic_id,
@@ -323,6 +337,12 @@ impl IggyShard {
                     roots.iter().map(|(_, root)| root.id()).collect::<Vec<_>>()
                 })
             });
+
+        self.streams2.with_partitions(
+            stream_id,
+            topic_id,
+            partitions::helpers::purge_partitions_mem(),
+        );
         for part_id in part_ids {
             self.delete_segments_bypass_auth(stream_id, topic_id, part_id, 
u32::MAX)
                 .await?;
diff --git a/core/server/src/streaming/stats/mod.rs 
b/core/server/src/streaming/stats/mod.rs
index 67c464f30..2c89e832d 100644
--- a/core/server/src/streaming/stats/mod.rs
+++ b/core/server/src/streaming/stats/mod.rs
@@ -327,5 +327,6 @@ impl PartitionStats {
         self.zero_out_size_bytes();
         self.zero_out_messages_count();
         self.zero_out_segments_count();
+        self.zero_out_parent_all();
     }
 }

Reply via email to