This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new e58bee328 fix(server): handle entity ID gaps on bootstrap after 
deletions (#2548)
e58bee328 is described below

commit e58bee328fdd3b66cc87115d3733941a25c13fa0
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Thu Jan 8 12:36:58 2026 +0100

    fix(server): handle entity ID gaps on bootstrap after deletions (#2548)
    
    Server panicked on restart when streams, topics, partitions, or consumer
    groups had been deleted, creating gaps in ID sequences.
    
    The issue: `slab.insert()` auto-assigns sequential IDs (0, 1, 2...),
    but after deletions the state file contains non-sequential IDs (e.g.,
    {0, 2} after deleting ID 1). On restart, bootstrap called `insert()`
    which returned sequential IDs, causing assertion failures when the
    assigned ID didn't match the expected ID from state.
    
    The fix uses `Slab::from_iter()` via new `from_entries()` constructors
    on Streams, Topics, Partitions, and ConsumerGroups. This correctly
    places entities at their original keys regardless of gaps.
---
 .../data_integrity/verify_after_server_restart.rs  | 210 ++++++++++++++++++++-
 core/server/src/bootstrap.rs                       | 203 +++++++++-----------
 core/server/src/shard/system/consumer_offsets.rs   |  10 +-
 core/server/src/slab/consumer_groups.rs            |  27 +++
 core/server/src/slab/partitions.rs                 |  38 ++++
 core/server/src/slab/streams.rs                    |  25 +++
 core/server/src/slab/topics.rs                     |  28 +++
 core/server/src/streaming/streams/stream.rs        |   4 +
 core/server/src/streaming/topics/topic.rs          |   8 +
 9 files changed, 434 insertions(+), 119 deletions(-)

diff --git 
a/core/integration/tests/data_integrity/verify_after_server_restart.rs 
b/core/integration/tests/data_integrity/verify_after_server_restart.rs
index 3a6059ee1..f632abd92 100644
--- a/core/integration/tests/data_integrity/verify_after_server_restart.rs
+++ b/core/integration/tests/data_integrity/verify_after_server_restart.rs
@@ -16,9 +16,7 @@
  * under the License.
  */
 
-use iggy::clients::client::IggyClient;
-use iggy::prelude::{ConsumerGroupClient, Identifier, IggyByteSize, 
MessageClient, SystemClient};
-use iggy_common::TransportProtocol;
+use iggy::prelude::*;
 use integration::bench_utils::run_bench_and_wait_for_finish;
 use integration::{
     tcp_client::TcpClientFactory,
@@ -319,3 +317,209 @@ async fn 
should_fill_data_and_verify_after_restart(cache_setting: &'static str)
     // 16. Manual cleanup
     std::fs::remove_dir_all(local_data_path).unwrap();
 }
+
+/// Test that verifies server correctly handles ID gaps after deletions at all 
levels:
+/// streams, topics, partitions, and consumer groups.
+///
+/// Creates a hierarchy with gaps from deletions:
+/// - Streams: 0, 2 (1 deleted)
+/// - Topics per stream: 0, 2 (1 deleted)
+/// - Partitions per topic: 0, 2 (1 deleted)
+/// - Consumer groups per topic: 0, 2 (1 deleted)
+#[tokio::test]
+#[parallel]
+async fn should_handle_resource_deletion_and_restart() {
+    let env_vars = HashMap::from([(
+        SYSTEM_PATH_ENV_VAR.to_owned(),
+        TestServer::get_random_path(),
+    )]);
+
+    let mut test_server = TestServer::new(Some(env_vars.clone()), false, None, 
IpAddrKind::V4);
+    test_server.start();
+    let server_addr = test_server.get_raw_tcp_addr().unwrap();
+    let local_data_path = test_server.get_local_data_path().to_owned();
+
+    let client = IggyClient::create(
+        TcpClientFactory {
+            server_addr: server_addr.clone(),
+            ..Default::default()
+        }
+        .create_client()
+        .await,
+        None,
+        None,
+    );
+    login_root(&client).await;
+
+    // Create 3 streams
+    let stream_0 = client.create_stream("stream-0").await.unwrap();
+    let stream_1 = client.create_stream("stream-1").await.unwrap();
+    let stream_2 = client.create_stream("stream-2").await.unwrap();
+
+    assert_eq!(stream_0.id, 0);
+    assert_eq!(stream_1.id, 1);
+    assert_eq!(stream_2.id, 2);
+
+    // For streams 0 and 2, create topics, partitions, and consumer groups 
with gaps
+    for stream_id in [0u32, 2u32] {
+        let stream_ident = Identifier::numeric(stream_id).unwrap();
+
+        // Create 3 topics per stream (each with 3 partitions initially)
+        for topic_idx in 0..3 {
+            let topic = client
+                .create_topic(
+                    &stream_ident,
+                    &format!("topic-{}", topic_idx),
+                    3,
+                    CompressionAlgorithm::None,
+                    None,
+                    IggyExpiry::NeverExpire,
+                    MaxTopicSize::Unlimited,
+                )
+                .await
+                .unwrap();
+            assert_eq!(topic.id, topic_idx);
+
+            let topic_ident = Identifier::numeric(topic_idx).unwrap();
+
+            // Create 3 consumer groups per topic
+            for cg_idx in 0..3 {
+                client
+                    .create_consumer_group(&stream_ident, &topic_ident, 
&format!("cg-{}", cg_idx))
+                    .await
+                    .unwrap();
+            }
+        }
+
+        // Delete middle topic (topic 1)
+        client
+            .delete_topic(&stream_ident, &Identifier::numeric(1).unwrap())
+            .await
+            .unwrap();
+
+        // For remaining topics (0 and 2), delete middle partition then middle 
consumer group
+        // This order tests that the server handles partition deletion before 
consumer group deletion
+        for topic_id in [0u32, 2u32] {
+            let topic_ident = Identifier::numeric(topic_id).unwrap();
+
+            client
+                .delete_partitions(&stream_ident, &topic_ident, 1)
+                .await
+                .unwrap();
+
+            client
+                .delete_consumer_group(
+                    &stream_ident,
+                    &topic_ident,
+                    &Identifier::numeric(1).unwrap(),
+                )
+                .await
+                .unwrap();
+        }
+    }
+
+    // Delete middle stream (stream 1)
+    client
+        .delete_stream(&Identifier::numeric(1).unwrap())
+        .await
+        .unwrap();
+
+    // Verify state before restart
+    let streams = client.get_streams().await.unwrap();
+    assert_eq!(streams.len(), 2);
+    let stream_ids: Vec<u32> = streams.iter().map(|s| s.id).collect();
+    assert!(stream_ids.contains(&0) && stream_ids.contains(&2));
+
+    drop(client);
+    test_server.stop();
+    drop(test_server);
+
+    // Restart server
+    let mut test_server = TestServer::new(Some(env_vars.clone()), false, None, 
IpAddrKind::V4);
+    test_server.start();
+    let server_addr = test_server.get_raw_tcp_addr().unwrap();
+
+    let client = IggyClient::create(
+        TcpClientFactory {
+            server_addr,
+            ..Default::default()
+        }
+        .create_client()
+        .await,
+        None,
+        None,
+    );
+    login_root(&client).await;
+
+    // Verify streams after restart
+    let streams = client.get_streams().await.unwrap();
+    assert_eq!(streams.len(), 2, "Expected 2 streams after restart");
+    let stream_ids: Vec<u32> = streams.iter().map(|s| s.id).collect();
+    assert!(
+        stream_ids.contains(&0) && stream_ids.contains(&2),
+        "Expected streams 0 and 2, got: {:?}",
+        stream_ids
+    );
+
+    // Verify topics, partitions, and consumer groups for each stream
+    for stream_id in [0u32, 2u32] {
+        let stream_ident = Identifier::numeric(stream_id).unwrap();
+        let stream = client.get_stream(&stream_ident).await.unwrap().unwrap();
+
+        assert_eq!(
+            stream.topics_count, 2,
+            "Stream {} should have 2 topics",
+            stream_id
+        );
+
+        // Verify topics have correct IDs (0 and 2, not 0 and 1)
+        let topics = client.get_topics(&stream_ident).await.unwrap();
+        let topic_ids: Vec<u32> = topics.iter().map(|t| t.id).collect();
+        assert!(
+            topic_ids.contains(&0) && topic_ids.contains(&2),
+            "Stream {} should have topics 0 and 2, got: {:?}",
+            stream_id,
+            topic_ids
+        );
+
+        for topic_id in [0u32, 2u32] {
+            let topic_ident = Identifier::numeric(topic_id).unwrap();
+            let topic = client
+                .get_topic(&stream_ident, &topic_ident)
+                .await
+                .unwrap()
+                .unwrap();
+
+            assert_eq!(
+                topic.partitions_count, 2,
+                "Topic {} in stream {} should have 2 partitions",
+                topic_id, stream_id
+            );
+
+            // Verify consumer groups have correct IDs (0 and 2)
+            let cgs = client
+                .get_consumer_groups(&stream_ident, &topic_ident)
+                .await
+                .unwrap();
+            assert_eq!(
+                cgs.len(),
+                2,
+                "Topic {} in stream {} should have 2 consumer groups",
+                topic_id,
+                stream_id
+            );
+            let cg_ids: Vec<u32> = cgs.iter().map(|c| c.id).collect();
+            assert!(
+                cg_ids.contains(&0) && cg_ids.contains(&2),
+                "Topic {} in stream {} should have consumer groups 0 and 2, 
got: {:?}",
+                topic_id,
+                stream_id,
+                cg_ids
+            );
+        }
+    }
+
+    drop(client);
+    test_server.stop();
+    std::fs::remove_dir_all(local_data_path).unwrap();
+}
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 67c832231..361c224e5 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -36,13 +36,10 @@ use crate::{
         },
     },
     slab::{
-        streams::Streams,
-        traits_ext::{
-            EntityComponentSystem, EntityComponentSystemMutCell, Insert, 
InsertCell, IntoComponents,
-        },
-        users::Users,
+        consumer_groups::ConsumerGroups, partitions::Partitions, 
streams::Streams, topics::Topics,
+        traits_ext::IntoComponents, users::Users,
     },
-    state::system::{StreamState, TopicState, UserState},
+    state::system::{StreamState, UserState},
     streaming::{
         partitions::{
             consumer_offset::ConsumerOffset,
@@ -82,144 +79,120 @@ pub async fn load_streams(
     state: impl IntoIterator<Item = StreamState>,
     config: &SystemConfig,
 ) -> Result<Streams, IggyError> {
-    let streams = Streams::default();
-    for StreamState {
-        name,
-        created_at,
-        id,
-        topics,
-    } in state
-    {
+    let state: Vec<StreamState> = state.into_iter().collect();
+    let mut stream_entries = Vec::with_capacity(state.len());
+
+    for stream_state in state {
+        let stream_id = stream_state.id as usize;
         info!(
             "Loading stream with ID: {}, name: {} from state...",
-            id, name
-        );
-        let stream_id = id;
-        let stats = Arc::new(StreamStats::default());
-        let stream = stream::Stream::new(name.clone(), stats.clone(), 
created_at);
-        let new_id = streams.insert(stream);
-        assert_eq!(
-            new_id, stream_id as usize,
-            "load_streams: id mismatch when inserting stream, mismatch for 
stream with ID: {}, name: {}",
-            stream_id, name
-        );
-        info!(
-            "Loaded stream with ID: {}, name: {} from state...",
-            id, name
+            stream_state.id, stream_state.name
         );
 
-        let topics = topics.into_values();
-        for TopicState {
-            id,
-            name,
-            created_at,
-            compression_algorithm,
-            message_expiry,
-            max_topic_size,
-            replication_factor,
-            consumer_groups,
-            partitions,
-        } in topics
-        {
+        let stream_stats = Arc::new(StreamStats::default());
+        let mut topic_entries = Vec::new();
+
+        for topic_state in stream_state.topics.into_values() {
+            let topic_id = topic_state.id as usize;
             info!(
                 "Loading topic with ID: {}, name: {} from state...",
-                id, name
+                topic_state.id, topic_state.name
             );
-            let topic_id = id;
-            let parent_stats = stats.clone();
-            let stats = Arc::new(TopicStats::new(parent_stats));
-            let topic_id = streams.with_components_by_id_mut(stream_id as 
usize, |(mut root, ..)| {
-                let topic = topic::Topic::new(
-                    name.clone(),
-                    stats.clone(),
-                    created_at,
-                    replication_factor.unwrap_or(1),
-                    message_expiry,
-                    compression_algorithm,
-                    max_topic_size,
-                );
-                let new_id = root.topics_mut().insert(topic);
-                assert_eq!(
-                    new_id, topic_id as usize,
-                    "load_streams: topic id mismatch when inserting topic, 
mismatch for topic with ID: {}, name: {}",
-                    topic_id, &name
-                );
-                new_id
-            });
-            info!("Loaded topic with ID: {}, name: {} from state...", id, 
name);
 
-            let parent_stats = stats.clone();
-            let cgs = consumer_groups.into_values();
-            let partitions = partitions.into_values();
+            let topic_stats = Arc::new(TopicStats::new(stream_stats.clone()));
 
-            // Load each partition asynchronously and insert immediately
-            for partition_state in partitions {
+            // Build partitions
+            let mut partition_entries = Vec::new();
+            for partition_state in topic_state.partitions.into_values() {
+                let partition_id = partition_state.id as usize;
                 info!(
                     "Loading partition with ID: {}, for topic with ID: {} from 
state...",
-                    partition_state.id, topic_id
+                    partition_id, topic_id
                 );
 
-                let partition_id = partition_state.id;
                 let partition = load_partition(
                     config,
-                    stream_id as usize,
+                    stream_id,
                     topic_id,
                     partition_state,
-                    parent_stats.clone(),
+                    topic_stats.clone(),
                 )
                 .await?;
-                streams.with_components_by_id(stream_id as usize, |(root, ..)| 
{
-                    root.topics()
-                        .with_components_by_id_mut(topic_id, |(mut root, ..)| {
-                            let new_id = 
root.partitions_mut().insert(partition);
-                            assert_eq!(
-                                new_id, partition_id as usize,
-                                "load_streams: partition id mismatch when 
inserting partition, mismatch for partition with ID: {}, for topic with ID: {}, 
for stream with ID: {}",
-                                partition_id, topic_id, stream_id
-                            );
-                        });
-                });
+                partition_entries.push((partition_id, partition));
 
                 info!(
                     "Loaded partition with ID: {}, for topic with ID: {} from 
state...",
                     partition_id, topic_id
                 );
             }
-            let partition_ids = streams.with_components_by_id(stream_id as 
usize, |(root, ..)| {
-                root.topics().with_components_by_id(topic_id, |(root, ..)| {
-                    root.partitions().with_components(|components| {
-                        let (root, ..) = components.into_components();
-                        root.iter().map(|(_, root)| 
root.id()).collect::<Vec<_>>()
-                    })
+
+            // Build consumer groups
+            let partition_ids: Vec<_> = partition_entries.iter().map(|(id, _)| 
*id).collect();
+            let cg_entries: Vec<_> = topic_state
+                .consumer_groups
+                .into_values()
+                .map(|cg_state| {
+                    info!(
+                        "Loading consumer group with ID: {}, name: {} for 
topic with ID: {} from state...",
+                        cg_state.id, cg_state.name, topic_id
+                    );
+                    let cg = consumer_group::ConsumerGroup::new(
+                        cg_state.name.clone(),
+                        Default::default(),
+                        partition_ids.clone(),
+                    );
+                    info!(
+                        "Loaded consumer group with ID: {}, name: {} for topic 
with ID: {} from state...",
+                        cg_state.id, cg_state.name, topic_id
+                    );
+                    (cg_state.id as usize, cg)
                 })
-            });
+                .collect();
+
+            // Build topic with pre-built partitions and consumer groups
+            let mut topic = topic::Topic::new(
+                topic_state.name.clone(),
+                topic_stats,
+                topic_state.created_at,
+                topic_state.replication_factor.unwrap_or(1),
+                topic_state.message_expiry,
+                topic_state.compression_algorithm,
+                topic_state.max_topic_size,
+            );
 
-            for cg_state in cgs {
-                info!(
-                    "Loading consumer group with ID: {}, name: {} for topic 
with ID: {} from state...",
-                    cg_state.id, cg_state.name, topic_id
-                );
-                streams.with_components_by_id(stream_id as usize, |(root, ..)| 
{
-                    root.topics()
-                        .with_components_by_id_mut(topic_id, |(mut root, ..)| {
-                            let id = cg_state.id;
-                            let cg = 
consumer_group::ConsumerGroup::new(cg_state.name.clone(), Default::default(), 
partition_ids.clone());
-                            let new_id = root.consumer_groups_mut().insert(cg);
-                            assert_eq!(
-                                new_id, id as usize,
-                                "load_streams: consumer group id mismatch when 
inserting consumer group, mismatch for consumer group with ID: {}, name: {} for 
topic with ID: {}, for stream with ID: {}",
-                                id, cg_state.name, topic_id, stream_id
-                            );
-                        });
-                });
-                info!(
-                    "Loaded consumer group with ID: {}, name: {} for topic 
with ID: {} from state...",
-                    cg_state.id, cg_state.name, topic_id
-                );
-            }
+            // Decompose, set nested containers, recompose
+            let (mut root, auxilary, stats) = topic.into_components();
+            root.set_partitions(Partitions::from_entries(partition_entries));
+            root.set_consumer_groups(ConsumerGroups::from_entries(cg_entries));
+            topic = topic::Topic::new_with_components(root, auxilary, stats);
+
+            topic_entries.push((topic_id, topic));
+            info!(
+                "Loaded topic with ID: {}, name: {} from state...",
+                topic_state.id, topic_state.name
+            );
         }
+
+        // Build stream with pre-built topics
+        let mut stream = stream::Stream::new(
+            stream_state.name.clone(),
+            stream_stats,
+            stream_state.created_at,
+        );
+
+        // Decompose, set nested containers, recompose
+        let (mut root, stats) = stream.into_components();
+        root.set_topics(Topics::from_entries(topic_entries));
+        stream = stream::Stream::new_with_components(root, stats);
+
+        stream_entries.push((stream_id, stream));
+        info!(
+            "Loaded stream with ID: {}, name: {} from state...",
+            stream_state.id, stream_state.name
+        );
     }
-    Ok(streams)
+
+    Ok(Streams::from_entries(stream_entries))
 }
 
 pub fn load_users(state: impl IntoIterator<Item = UserState>) -> Users {
diff --git a/core/server/src/shard/system/consumer_offsets.rs 
b/core/server/src/shard/system/consumer_offsets.rs
index 74cdb82f1..1777675a1 100644
--- a/core/server/src/shard/system/consumer_offsets.rs
+++ b/core/server/src/shard/system/consumer_offsets.rs
@@ -200,7 +200,15 @@ impl IggyShard {
         partition_ids: &[usize],
     ) -> Result<(), IggyError> {
         for &partition_id in partition_ids {
-            // Skip if offset does not exist.
+            // Skip if partition was deleted
+            let partition_exists = self
+                .streams
+                .with_partitions(stream_id, topic_id, |p| 
p.exists(partition_id));
+            if !partition_exists {
+                continue;
+            }
+
+            // Skip if offset does not exist
             let has_offset = self
                 .streams
                 .with_partition_by_id(
diff --git a/core/server/src/slab/consumer_groups.rs 
b/core/server/src/slab/consumer_groups.rs
index cf5af3b19..618ecb156 100644
--- a/core/server/src/slab/consumer_groups.rs
+++ b/core/server/src/slab/consumer_groups.rs
@@ -171,3 +171,30 @@ impl Default for ConsumerGroups {
         }
     }
 }
+
+impl ConsumerGroups {
+    /// Construct from pre-built entries with specific IDs.
+    pub fn from_entries(
+        entries: impl IntoIterator<Item = (usize, 
consumer_group::ConsumerGroup)>,
+    ) -> Self {
+        let entries: Vec<_> = entries.into_iter().collect();
+
+        let mut index = AHashMap::with_capacity(entries.len());
+        let mut root_entries = Vec::with_capacity(entries.len());
+        let mut members_entries = Vec::with_capacity(entries.len());
+
+        for (id, cg) in entries {
+            let (mut root, members) = cg.into_components();
+            root.update_id(id);
+            index.insert(root.key().clone(), id);
+            root_entries.push((id, root));
+            members_entries.push((id, members));
+        }
+
+        Self {
+            index,
+            root: root_entries.into_iter().collect(),
+            members: members_entries.into_iter().collect(),
+        }
+    }
+}
diff --git a/core/server/src/slab/partitions.rs 
b/core/server/src/slab/partitions.rs
index 0ad5abbf0..214ffc6b1 100644
--- a/core/server/src/slab/partitions.rs
+++ b/core/server/src/slab/partitions.rs
@@ -208,6 +208,44 @@ impl Default for Partitions {
     }
 }
 
+impl Partitions {
+    /// Construct from pre-built entries with specific IDs.
+    pub fn from_entries(entries: impl IntoIterator<Item = (usize, 
partition::Partition)>) -> Self {
+        let entries: Vec<_> = entries.into_iter().collect();
+
+        let mut root_entries = Vec::with_capacity(entries.len());
+        let mut stats_entries = Vec::with_capacity(entries.len());
+        let mut dedup_entries = Vec::with_capacity(entries.len());
+        let mut offset_entries = Vec::with_capacity(entries.len());
+        let mut consumer_offset_entries = Vec::with_capacity(entries.len());
+        let mut consumer_group_offset_entries = 
Vec::with_capacity(entries.len());
+        let mut log_entries = Vec::with_capacity(entries.len());
+
+        for (id, partition) in entries {
+            let (mut root, stats, dedup, offset, consumer_offset, 
consumer_group_offset, log) =
+                partition.into_components();
+            root.update_id(id);
+            root_entries.push((id, root));
+            stats_entries.push((id, stats));
+            dedup_entries.push((id, dedup));
+            offset_entries.push((id, offset));
+            consumer_offset_entries.push((id, consumer_offset));
+            consumer_group_offset_entries.push((id, consumer_group_offset));
+            log_entries.push((id, log));
+        }
+
+        Self {
+            root: root_entries.into_iter().collect(),
+            stats: stats_entries.into_iter().collect(),
+            message_deduplicator: dedup_entries.into_iter().collect(),
+            offset: offset_entries.into_iter().collect(),
+            consumer_offset: consumer_offset_entries.into_iter().collect(),
+            consumer_group_offset: 
consumer_group_offset_entries.into_iter().collect(),
+            log: log_entries.into_iter().collect(),
+        }
+    }
+}
+
 impl Partitions {
     pub fn len(&self) -> usize {
         self.root.len()
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index eebfe9a1d..66e5c7cbb 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -86,6 +86,31 @@ impl Default for Streams {
     }
 }
 
+impl Streams {
+    /// Construct from pre-built entries with specific IDs.
+    pub fn from_entries(entries: impl IntoIterator<Item = (usize, 
stream::Stream)>) -> Self {
+        let entries: Vec<_> = entries.into_iter().collect();
+
+        let mut index = AHashMap::with_capacity(entries.len());
+        let mut root_entries = Vec::with_capacity(entries.len());
+        let mut stats_entries = Vec::with_capacity(entries.len());
+
+        for (id, stream) in entries {
+            let (mut root, stats) = stream.into_components();
+            root.update_id(id);
+            index.insert(root.key().clone(), id);
+            root_entries.push((id, root));
+            stats_entries.push((id, stats));
+        }
+
+        Self {
+            index: RefCell::new(index),
+            root: RefCell::new(root_entries.into_iter().collect()),
+            stats: RefCell::new(stats_entries.into_iter().collect()),
+        }
+    }
+}
+
 impl<'a> From<&'a Streams> for stream::StreamRef<'a> {
     fn from(value: &'a Streams) -> Self {
         let root = value.root.borrow();
diff --git a/core/server/src/slab/topics.rs b/core/server/src/slab/topics.rs
index 7bfed97e7..5270dbd84 100644
--- a/core/server/src/slab/topics.rs
+++ b/core/server/src/slab/topics.rs
@@ -137,6 +137,34 @@ impl Default for Topics {
     }
 }
 
+impl Topics {
+    /// Construct from pre-built entries with specific IDs.
+    pub fn from_entries(entries: impl IntoIterator<Item = (usize, 
topic::Topic)>) -> Self {
+        let entries: Vec<_> = entries.into_iter().collect();
+
+        let mut index = AHashMap::with_capacity(entries.len());
+        let mut root_entries = Vec::with_capacity(entries.len());
+        let mut auxilary_entries = Vec::with_capacity(entries.len());
+        let mut stats_entries = Vec::with_capacity(entries.len());
+
+        for (id, topic) in entries {
+            let (mut root, auxilary, stats) = topic.into_components();
+            root.update_id(id);
+            index.insert(root.key().clone(), id);
+            root_entries.push((id, root));
+            auxilary_entries.push((id, auxilary));
+            stats_entries.push((id, stats));
+        }
+
+        Self {
+            index: RefCell::new(index),
+            root: RefCell::new(root_entries.into_iter().collect()),
+            auxilaries: RefCell::new(auxilary_entries.into_iter().collect()),
+            stats: RefCell::new(stats_entries.into_iter().collect()),
+        }
+    }
+}
+
 impl EntityComponentSystem<InteriorMutability> for Topics {
     type Idx = ContainerId;
     type Entity = topic::Topic;
diff --git a/core/server/src/streaming/streams/stream.rs 
b/core/server/src/streaming/streams/stream.rs
index c596a11f8..f2e4c18fe 100644
--- a/core/server/src/streaming/streams/stream.rs
+++ b/core/server/src/streaming/streams/stream.rs
@@ -90,6 +90,10 @@ impl StreamRoot {
         &mut self.topics
     }
 
+    pub fn set_topics(&mut self, topics: Topics) {
+        self.topics = topics;
+    }
+
     pub fn created_at(&self) -> IggyTimestamp {
         self.created_at
     }
diff --git a/core/server/src/streaming/topics/topic.rs 
b/core/server/src/streaming/topics/topic.rs
index 348dc3c8a..c87958f68 100644
--- a/core/server/src/streaming/topics/topic.rs
+++ b/core/server/src/streaming/topics/topic.rs
@@ -328,6 +328,14 @@ impl TopicRoot {
         &mut self.consumer_groups
     }
 
+    pub fn set_partitions(&mut self, partitions: Partitions) {
+        self.partitions = partitions;
+    }
+
+    pub fn set_consumer_groups(&mut self, consumer_groups: ConsumerGroups) {
+        self.consumer_groups = consumer_groups;
+    }
+
     pub fn created_at(&self) -> IggyTimestamp {
         self.created_at
     }

Reply via email to