This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch rebase_master
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit f44adee78872dca0656c39d3805081db3a40c572
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Tue Jul 1 09:11:12 2025 +0200

    feat(io_uring): fix metadata crud operations (#1950)
    
    This PR fixes the metadata CRUD operations, preventing double `borrow_mut`
    calls on collections across awaits.
---
 core/integration/tests/streaming/stream.rs     |   1 -
 core/server/src/shard/mod.rs                   |   1 +
 core/server/src/shard/system/messages.rs       |  15 +--
 core/server/src/shard/system/partitions.rs     | 115 ++++++++++++++++------
 core/server/src/shard/system/streams.rs        |   7 ++
 core/server/src/shard/system/topics.rs         | 130 ++++++++++++++++---------
 core/server/src/streaming/streams/topics.rs    |  29 +-----
 core/server/src/streaming/topics/partitions.rs |  37 ++-----
 8 files changed, 196 insertions(+), 139 deletions(-)

diff --git a/core/integration/tests/streaming/stream.rs 
b/core/integration/tests/streaming/stream.rs
index 677e2d32..4cfa88ba 100644
--- a/core/integration/tests/streaming/stream.rs
+++ b/core/integration/tests/streaming/stream.rs
@@ -132,7 +132,6 @@ async fn should_purge_existing_stream_on_disk() {
                 MaxTopicSize::ServerDefault,
                 1,
             )
-            .await
             .unwrap();
 
         let messages = create_messages();
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 6c3718f8..8ffc6560 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -400,6 +400,7 @@ impl IggyShard {
             }
         }
 
+        //TODO: Refactor...
         let mut streams_states = streams
             .into_iter()
             .filter(|s| !missing_ids.contains(&s.id))
diff --git a/core/server/src/shard/system/messages.rs 
b/core/server/src/shard/system/messages.rs
index b27b034c..c5e527d0 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -53,12 +53,15 @@ impl IggyShard {
             )
         })?;
         let stream_id = stream.stream_id;
-        let numeric_topic_id = stream.get_topic(topic_id).map(|topic| 
topic.topic_id).with_error_context(|error| {
-            format!(
-                "Failed to get topic with ID: {} (error: {})",
-                topic_id, error
-            )
-        })?;
+        let numeric_topic_id = stream
+            .get_topic(topic_id)
+            .map(|topic| topic.topic_id)
+            .with_error_context(|error| {
+                format!(
+                    "Failed to get topic with ID: {} (error: {})",
+                    topic_id, error
+                )
+            })?;
         // TODO: We should look into refactoring those permissioners, so they 
can accept `Identifier` instead of numeric IDs.
         // Validate permissions for given user on stream and topic.
         self.permissioner.borrow().append_messages(
diff --git a/core/server/src/shard/system/partitions.rs 
b/core/server/src/shard/system/partitions.rs
index 78ef2144..a5b59516 100644
--- a/core/server/src/shard/system/partitions.rs
+++ b/core/server/src/shard/system/partitions.rs
@@ -24,7 +24,9 @@ use crate::streaming::session::Session;
 use error_set::ErrContext;
 use iggy_common::Identifier;
 use iggy_common::IggyError;
+use iggy_common::locking::IggySharedMutFn;
 
+// TODO: MAJOR REFACTOR!!!!!!!!!!!!!!!!!
 impl IggyShard {
     pub async fn create_partitions(
         &self,
@@ -33,6 +35,9 @@ impl IggyShard {
         topic_id: &Identifier,
         partitions_count: u32,
     ) -> Result<(), IggyError> {
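+        // NOTE: The stream is deliberately re-borrowed in separate scopes below so that
+        // the exclusive borrow is released before partitions are persisted (awaited).
+        // This method is a known candidate for refactoring.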
         self.ensure_authenticated(session)?;
         {
             let stream = self.get_stream(stream_id).with_error_context(|error| 
{
@@ -53,35 +58,61 @@ impl IggyShard {
             ))?;
         }
 
-        let mut stream = 
self.get_stream_mut(stream_id).with_error_context(|error| {
-            format!("{COMPONENT} (error: {error}) - failed to get stream with 
ID: {stream_id}")
-        })?;
-        let stream_id = stream.stream_id;
-        let topic = stream
+        let partition_ids = {
+            let mut stream = 
self.get_stream_mut(stream_id).with_error_context(|error| {
+                format!("{COMPONENT} (error: {error}) - failed to get stream 
with ID: {stream_id}")
+            })?;
+            let stream_id = stream.stream_id;
+            let topic = stream
             .get_topic_mut(topic_id)
             .with_error_context(|error| {
                 format!(
                     "{COMPONENT} (error: {error}) - failed to get mutable 
reference to stream with id: {stream_id}"
                 )
             })?;
-        let topic_id = topic.topic_id;
-
-        // TODO: Make add persisted partitions to topic sync, and extract the 
storage persister out of it
-        // perform disk i/o outside of the borrow_mut of the stream.
-        let partition_ids = topic
+            let partition_ids = topic
             .add_persisted_partitions(partitions_count)
-            .await
             .with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to add 
persisted partitions, topic: {topic}")
             })?;
-        let records = partition_ids.into_iter().map(|partition_id| {
-            let namespace = IggyNamespace::new(stream_id, topic_id, 
partition_id);
-            let hash = namespace.generate_hash();
-            let shard_id = hash % self.get_available_shards_count();
-            let shard_info = ShardInfo::new(shard_id as u16);
-            (namespace, shard_info)
-        });
-        self.insert_shard_table_records(records);
+            partition_ids
+        };
+
+        {
+            let stream = self.get_stream(stream_id).with_error_context(|error| 
{
+                format!("{COMPONENT} (error: {error}) - failed to get stream 
with ID: {stream_id}")
+            })?;
+            let stream_id = stream.stream_id;
+            let topic = stream.get_topic(topic_id).with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to get topic with 
ID: {topic_id} in stream with ID: {stream_id}")
+        })?;
+            let topic_id = topic.topic_id;
+            for partition_id in &partition_ids {
+                let partition = topic.partitions.get(partition_id).unwrap();
+                let mut partition = partition.write().await;
+                partition.persist().await.with_error_context(|error| {
+                    format!(
+                        "{COMPONENT} (error: {error}) - failed to persist 
partition with id: {}",
+                        partition.partition_id
+                    )
+                })?;
+            }
+            let records = partition_ids.into_iter().map(|partition_id| {
+                let namespace = IggyNamespace::new(stream_id, topic_id, 
partition_id);
+                let hash = namespace.generate_hash();
+                let shard_id = hash % self.get_available_shards_count();
+                let shard_info = ShardInfo::new(shard_id as u16);
+                (namespace, shard_info)
+            });
+            self.insert_shard_table_records(records);
+        }
+
+        let mut stream = 
self.get_stream_mut(stream_id).with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to get stream with 
ID: {stream_id}")
+        })?;
+        let topic = stream.get_topic_mut(topic_id).with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to get topic with 
ID: {topic_id} in stream with ID: {stream_id}")
+        })?;
 
         topic.reassign_consumer_groups();
         self.metrics.increment_partitions(partitions_count);
@@ -116,10 +147,11 @@ impl IggyShard {
             ))?;
         }
 
-        let mut stream = 
self.get_stream_mut(stream_id).with_error_context(|error| {
-            format!("{COMPONENT} (error: {error}) - failed to get stream with 
ID: {stream_id}")
-        })?;
-        let topic = stream
+        let (numeric_topic_id, partitions) = {
+            let mut stream = 
self.get_stream_mut(stream_id).with_error_context(|error| {
+                format!("{COMPONENT} (error: {error}) - failed to get stream 
with ID: {stream_id}")
+            })?;
+            let topic = stream
             .get_topic_mut(topic_id)
             .with_error_context(|error| {
                 format!(
@@ -127,19 +159,42 @@ impl IggyShard {
                 )
             })?;
 
-        // TODO: Make delete persisted partitions from topic sync, and extract 
the storage persister out of it
-        // perform disk i/o outside of the borrow_mut of the stream.
-        let partitions = topic
+            let partitions = topic
             .delete_persisted_partitions(partitions_count)
-            .await
             .with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to delete 
persisted partitions for topic: {topic}")
             })?;
+            (topic.topic_id, partitions)
+        };
+
+        let mut segments_count = 0;
+        let mut messages_count = 0;
+        for partition in &partitions {
+            let mut partition = partition.write().await;
+            let partition_id = partition.partition_id;
+            let partition_messages_count = partition.get_messages_count();
+            segments_count += partition.get_segments_count();
+            messages_count += partition_messages_count;
+            partition.delete().await.with_error_context(|error| {
+                format!(
+                    "{COMPONENT} (error: {error}) - failed to delete partition 
with ID: {} in topic with ID: {}",
+                    partition_id,
+                    numeric_topic_id
+                )
+            })?;
+        }
+
+        let mut stream = 
self.get_stream_mut(stream_id).with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to get stream with 
ID: {stream_id}")
+        })?;
+        let topic = stream.get_topic_mut(topic_id).with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to get topic with 
ID: {topic_id}")
+        })?;
         topic.reassign_consumer_groups();
-        if let Some(partitions) = partitions {
+        if partitions.len() > 0 {
             self.metrics.decrement_partitions(partitions_count);
-            self.metrics.decrement_segments(partitions.segments_count);
-            self.metrics.decrement_messages(partitions.messages_count);
+            self.metrics.decrement_segments(segments_count);
+            self.metrics.decrement_messages(messages_count);
         }
         Ok(())
     }
diff --git a/core/server/src/shard/system/streams.rs 
b/core/server/src/shard/system/streams.rs
index 47068f11..b4921066 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -298,6 +298,13 @@ impl IggyShard {
             })?;
             old_name = stream.name.clone();
             stream.name = name.to_owned();
+            // Drop the exclusive borrow.
+            drop(stream);
+            // Get it again using a shared (immutable) borrow.
+            let stream = self.get_stream(id).with_error_context(|error| {
+                format!("{COMPONENT} (error: {error}) - failed to get mutable 
reference to stream with id: {id}")
+            })?;
+            // Persist it.
             stream.persist().await?;
         }
 
diff --git a/core/server/src/shard/system/topics.rs 
b/core/server/src/shard/system/topics.rs
index c247cf74..0f8155fb 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -26,6 +26,7 @@ use error_set::ErrContext;
 use iggy_common::locking::IggySharedMutFn;
 use iggy_common::{CompressionAlgorithm, Identifier, IggyError, IggyExpiry, 
MaxTopicSize};
 use tokio_util::io::StreamReader;
+use tracing::info;
 
 impl IggyShard {
     pub fn find_topic<'topic, 'stream>(
@@ -112,28 +113,35 @@ impl IggyShard {
         max_topic_size: MaxTopicSize,
         replication_factor: Option<u8>,
     ) -> Result<(), IggyError> {
-        let (stream_id, topic_id, partition_ids) = self
-            .create_topic_base(
-                stream_id,
-                topic_id,
-                name,
-                partitions_count,
-                message_expiry,
-                compression_algorithm,
-                max_topic_size,
-                replication_factor,
-            )
-            .await?;
+        let (topic_id, partition_ids) = self.create_topic_base(
+            stream_id,
+            topic_id,
+            name,
+            partitions_count,
+            message_expiry,
+            compression_algorithm,
+            max_topic_size,
+            replication_factor,
+        )?;
+        let stream = self.get_stream(stream_id).with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to get stream with 
ID: {stream_id}")
+        })?;
+        let stream_id = stream.stream_id;
+        let topic = stream
+            .get_topic(&Identifier::numeric(topic_id)?)
+            .with_error_context(|error| {
+                format!("{COMPONENT} (error: {error}) - failed to get topic 
with ID: {topic_id} in stream with ID: {stream_id}")
+            })?;
+        topic.persist().await.with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to persist topic: 
{topic}")
+        })?;
+
         // TODO: Figure out a way how to distribute the shards table among 
different shards,
         // without the need to do code from below, everytime we handle a 
`ShardEvent`.
-        // I think we shouldn't be sharing it, maintain a single shard table 
per shard,
-        // but figure out a way how to distribute it smarter (maybe move the 
broadcast inside of the `insert_shard_table_records`) method.
+        // I think we shouldn't share it, though, and should still maintain a single 
shard table per shard,
+        // but find a smarter way to distribute it (maybe move the 
broadcast inside of the `insert_shard_table_records` method).
         let records = partition_ids.into_iter().map(|partition_id| {
             let namespace = IggyNamespace::new(stream_id, topic_id, 
partition_id);
-            // TODO: This setup isn't deterministic.
-            // Imagine a scenario where client creates partition using 
`String` identifiers,
-            // but then for poll_messages requests uses numeric ones.
-            // the namespace wouldn't match, therefore we would get miss in 
the shard table.
             let hash = namespace.generate_hash();
             let shard_id = hash % self.get_available_shards_count();
             let shard_info = ShardInfo::new(shard_id as u16);
@@ -176,24 +184,32 @@ impl IggyShard {
                 })?;
         }
 
-        let (stream_id, topic_id, partition_ids) = self
-            .create_topic_base(
-                stream_id,
-                topic_id,
-                name,
-                partitions_count,
-                message_expiry,
-                compression_algorithm,
-                max_topic_size,
-                replication_factor,
-            )
-            .await?;
+        let (topic_id, partition_ids) = self.create_topic_base(
+            stream_id,
+            topic_id,
+            name,
+            partitions_count,
+            message_expiry,
+            compression_algorithm,
+            max_topic_size,
+            replication_factor,
+        )?;
+
+        let stream = self.get_stream(stream_id).with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to get stream with 
ID: {stream_id}")
+        })?;
+        let stream_id = stream.stream_id;
+        let topic = stream
+                .get_topic(&Identifier::numeric(topic_id)?)
+                .with_error_context(|error| {
+                    format!("{COMPONENT} (error: {error}) - failed to get 
topic with ID: {topic_id} in stream with ID: {stream_id}")
+                })?;
+        topic.persist().await.with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to persist topic: 
{topic}")
+        })?;
+
         let records = partition_ids.into_iter().map(|partition_id| {
             let namespace = IggyNamespace::new(stream_id, topic_id, 
partition_id);
-            // TODO: This setup isn't deterministic.
-            // Imagine a scenario where client creates partition using 
`String` identifiers,
-            // but then for poll_messages requests uses numeric ones.
-            // the namespace wouldn't match, therefore we would get miss in 
the shard table.
             let hash = namespace.generate_hash();
             let shard_id = hash % self.get_available_shards_count();
             let shard_info = ShardInfo::new(shard_id as u16);
@@ -208,7 +224,7 @@ impl IggyShard {
         Ok(Identifier::numeric(topic_id)?)
     }
 
-    async fn create_topic_base(
+    fn create_topic_base(
         &self,
         stream_id: &Identifier,
         topic_id: Option<u32>,
@@ -218,9 +234,7 @@ impl IggyShard {
         compression_algorithm: CompressionAlgorithm,
         max_topic_size: MaxTopicSize,
         replication_factor: Option<u8>,
-    ) -> Result<(u32, u32, Vec<u32>), IggyError> {
-        // TODO: Make create topic sync, and extract the storage persister out 
of it
-        // perform disk i/o outside of the borrow_mut of the stream.
+    ) -> Result<(u32, Vec<u32>), IggyError> {
         let mut stream = 
self.get_stream_mut(stream_id).with_error_context(|error| {
             format!("{COMPONENT} (error: {error}) - failed to get mutable 
reference to stream with ID: {stream_id}")
         })?;
@@ -235,11 +249,10 @@ impl IggyShard {
                 max_topic_size,
                 replication_factor.unwrap_or(1),
             )
-            .await
             .with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to create topic 
with name: {name} in stream ID: {stream_id}")
             })?;
-        Ok((stream_id, topic_id, partition_ids))
+        Ok((topic_id, partition_ids))
     }
 
     #[allow(clippy::too_many_arguments)]
@@ -280,8 +293,6 @@ impl IggyShard {
             })?;
         }
 
-        // TODO: Make update topic sync, and extract the storage persister out 
of it
-        // perform disk i/o outside of the borrow_mut of the stream.
         self.get_stream_mut(stream_id)?
             .update_topic(
                 topic_id,
@@ -291,12 +302,32 @@ impl IggyShard {
                 max_topic_size,
                 replication_factor.unwrap_or(1),
             )
-            .await
             .with_error_context(|error| {
                 format!(
                     "{COMPONENT} (error: {error}) - failed to update topic 
with ID: {topic_id} in stream with ID: {stream_id}",
                 )
             })?;
+        let stream = self.get_stream(stream_id).with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to get stream with 
ID: {stream_id}")
+        })?;
+        let topic = stream
+            .get_topic(topic_id)
+            .with_error_context(|error| {
+                format!(
+                    "{COMPONENT} (error: {error}) - failed to get topic with 
ID: {topic_id} in stream with ID: {stream_id}",
+                )
+            })?;
+        for partition in topic.partitions.values() {
+            let mut partition = partition.write().await;
+            partition.message_expiry = message_expiry;
+            for segment in partition.segments.iter_mut() {
+                segment.update_message_expiry(message_expiry);
+            }
+        }
+        topic.persist().await.with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to persist topic: 
{topic}")
+        })?;
+        info!("Updated topic: {topic}");
 
         // TODO: if message_expiry is changed, we need to check if we need to 
purge messages based on the new expiry
         // TODO: if max_size_bytes is changed, we need to check if we need to 
purge messages based on the new size
@@ -334,13 +365,20 @@ impl IggyShard {
             stream_id_value = topic.stream_id;
         }
 
-        // TODO: Make delete topic sync, and extract the storage persister out 
of it
-        // perform disk i/o outside of the borrow_mut of the stream.
         let topic = self
             .get_stream_mut(stream_id)?
             .delete_topic(topic_id)
-            .await
             .with_error_context(|error| format!("{COMPONENT} (error: {error}) 
- failed to delete topic with ID: {topic_id} in stream with ID: {stream_id}"))?;
+        let stream = self.get_stream(stream_id).with_error_context(|error| {
+            format!("{COMPONENT} (error: {error}) - failed to get mutable 
reference to stream with ID: {stream_id}")
+        })?;
+        topic
+            .delete()
+            .await
+            .with_error_context(|error| {
+                format!("{COMPONENT} (error: {error}) - failed to delete 
topic: {topic}")
+            })
+            .map_err(|_| IggyError::CannotDeleteTopic(topic.topic_id, 
stream.stream_id))?;
 
         self.metrics.decrement_topics(1);
         self.metrics
diff --git a/core/server/src/streaming/streams/topics.rs 
b/core/server/src/streaming/streams/topics.rs
index 6a201c7a..7833efa2 100644
--- a/core/server/src/streaming/streams/topics.rs
+++ b/core/server/src/streaming/streams/topics.rs
@@ -36,7 +36,7 @@ impl Stream {
     }
 
     #[allow(clippy::too_many_arguments)]
-    pub async fn create_topic(
+    pub fn create_topic(
         &mut self,
         topic_id: Option<u32>,
         name: &str,
@@ -94,16 +94,13 @@ impl Stream {
             replication_factor,
         )?;
 
-        topic.persist().await.with_error_context(|error| {
-            format!("{COMPONENT} (error: {error}) - failed to persist topic: 
{topic}")
-        })?;
         info!("Created topic {}", topic);
         self.topics_ids.insert(name.to_owned(), id);
         self.topics.insert(id, topic);
         Ok((id, partition_ids))
     }
 
-    pub async fn update_topic(
+    pub fn update_topic(
         &mut self,
         id: &Identifier,
         name: &str,
@@ -150,21 +147,9 @@ impl Stream {
             topic.name = name.to_owned();
             topic.message_expiry = message_expiry;
             topic.compression_algorithm = compression_algorithm;
-            for partition in topic.partitions.values_mut() {
-                let mut partition = partition.write().await;
-                partition.message_expiry = message_expiry;
-                for segment in partition.segments.iter_mut() {
-                    segment.update_message_expiry(message_expiry);
-                }
-            }
             topic.max_topic_size = max_topic_size;
             topic.replication_factor = replication_factor;
-            topic.persist().await.with_error_context(|error| {
-                format!("{COMPONENT} (error: {error}) - failed to persist 
topic: {topic}")
-            })?;
-            info!("Updated topic: {topic}");
         }
-
         Ok(())
     }
 
@@ -253,7 +238,7 @@ impl Stream {
             .ok_or(IggyError::TopicIdNotFound(topic_id, self.stream_id))
     }
 
-    pub async fn delete_topic(&mut self, id: &Identifier) -> Result<Topic, 
IggyError> {
+    pub fn delete_topic(&mut self, id: &Identifier) -> Result<Topic, 
IggyError> {
         let topic = self.remove_topic(id).with_error_context(|error| {
             format!("{COMPONENT} (error: {error}) - failed to remove topic 
with id: {id}")
         })?;
@@ -263,13 +248,6 @@ impl Stream {
             self.current_topic_id.store(topic_id, Ordering::SeqCst);
         }
 
-        topic
-            .delete()
-            .await
-            .with_error_context(|error| {
-                format!("{COMPONENT} (error: {error}) - failed to delete 
topic: {topic}")
-            })
-            .map_err(|_| IggyError::CannotDeleteTopic(topic.topic_id, 
self.stream_id))?;
         Ok(topic)
     }
 }
@@ -319,7 +297,6 @@ mod tests {
                 max_topic_size,
                 1,
             )
-            .await
             .unwrap();
 
         let topic = stream.get_topic(&Identifier::numeric(topic_id).unwrap());
diff --git a/core/server/src/streaming/topics/partitions.rs 
b/core/server/src/streaming/topics/partitions.rs
index b9e56331..cbbe15e5 100644
--- a/core/server/src/streaming/topics/partitions.rs
+++ b/core/server/src/streaming/topics/partitions.rs
@@ -69,29 +69,19 @@ impl Topic {
         Ok(partition_ids)
     }
 
-    pub async fn add_persisted_partitions(&mut self, count: u32) -> 
Result<Vec<u32>, IggyError> {
+    pub fn add_persisted_partitions(&mut self, count: u32) -> Result<Vec<u32>, 
IggyError> {
         let partition_ids = 
self.add_partitions(count).with_error_context(|error| {
             format!("{COMPONENT} (error: {error}) - failed to add partitions, 
count: {count}")
         })?;
-        for partition_id in &partition_ids {
-            let partition = self.partitions.get(partition_id).unwrap();
-            let mut partition = partition.write().await;
-            partition.persist().await.with_error_context(|error| {
-                format!(
-                    "{COMPONENT} (error: {error}) - failed to persist 
partition with id: {}",
-                    partition.partition_id
-                )
-            })?;
-        }
         Ok(partition_ids)
     }
 
-    pub async fn delete_persisted_partitions(
+    pub fn delete_persisted_partitions(
         &mut self,
         mut count: u32,
-    ) -> Result<Option<DeletedPartitions>, IggyError> {
+    ) -> Result<Vec<IggyRwLock<Partition>>, IggyError> {
         if count == 0 {
-            return Ok(None);
+            return Ok(vec![]);
         }
 
         let current_partitions_count = self.partitions.len() as u32;
@@ -99,25 +89,12 @@ impl Topic {
             count = current_partitions_count;
         }
 
-        let mut segments_count = 0;
-        let mut messages_count = 0;
+        let mut partitions = Vec::with_capacity(count as usize);
         for partition_id in current_partitions_count - count + 
1..=current_partitions_count {
             let partition = self.partitions.remove(&partition_id).unwrap();
-            let mut partition = partition.write().await;
-            let partition_messages_count = partition.get_messages_count();
-            segments_count += partition.get_segments_count();
-            messages_count += partition_messages_count;
-            partition.delete().await.with_error_context(|error| {
-                format!(
-                    "{COMPONENT} (error: {error}) - failed to delete partition 
with ID: {partition_id} in topic with ID: {}",
-                    self.topic_id
-                )
-            })?;
+            partitions.push(partition);
         }
-        Ok(Some(DeletedPartitions {
-            segments_count,
-            messages_count,
-        }))
+        Ok(partitions)
     }
 }
 

Reply via email to