This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc by this push:
     new 1154a06f feat(io_uring): migrate to new architecture stream/topic 
create/delete/update using new interfaces. (#2103)
1154a06f is described below

commit 1154a06f8d2c112e1bd2a9e8ee58a4ac2747df07
Author: Grzegorz Koszyk <112548209+numin...@users.noreply.github.com>
AuthorDate: Tue Aug 12 21:40:57 2025 +0200

    feat(io_uring): migrate to new architecture stream/topic 
create/delete/update using new interfaces. (#2103)
---
 .../handlers/streams/create_stream_handler.rs      |  19 +-
 .../handlers/streams/delete_stream_handler.rs      |   5 +-
 .../binary/handlers/topics/create_topic_handler.rs |  30 +--
 .../binary/handlers/topics/delete_topic_handler.rs |   1 +
 core/server/src/shard/mod.rs                       |  56 ++---
 core/server/src/shard/system/consumer_groups.rs    |  19 +-
 core/server/src/shard/system/partitions.rs         |  32 +--
 core/server/src/shard/system/streams.rs            | 118 ++++------
 core/server/src/shard/system/topics.rs             | 178 +++++++--------
 core/server/src/shard/system/utils.rs              |   9 +-
 core/server/src/shard/transmission/event.rs        |  14 +-
 core/server/src/slab/consumer_groups.rs            |   2 +-
 core/server/src/slab/mod.rs                        |   2 +-
 core/server/src/slab/partitions.rs                 | 119 +++++-----
 core/server/src/slab/streams.rs                    | 241 ++++++++++++++-------
 core/server/src/slab/topics.rs                     | 215 +++++++++++++-----
 core/server/src/slab/traits_ext.rs                 | 141 ++++++------
 core/server/src/streaming/partitions/partition2.rs |  61 +++---
 core/server/src/streaming/segments/segment2.rs     |   2 +-
 core/server/src/streaming/streams/stream2.rs       | 170 +++++++++++----
 .../server/src/streaming/topics/consumer_group2.rs |  13 +-
 core/server/src/streaming/topics/topic2.rs         | 153 +++++++++++--
 22 files changed, 983 insertions(+), 617 deletions(-)

diff --git a/core/server/src/binary/handlers/streams/create_stream_handler.rs 
b/core/server/src/binary/handlers/streams/create_stream_handler.rs
index 4461745e..95d960f5 100644
--- a/core/server/src/binary/handlers/streams/create_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/create_stream_handler.rs
@@ -23,6 +23,7 @@ use crate::binary::{handlers::streams::COMPONENT, 
sender::SenderKind};
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::shard_info;
+use crate::slab::traits_ext::EntityMarker;
 use crate::state::command::EntryCommand;
 use crate::state::models::CreateStreamWithId;
 use crate::streaming::session::Session;
@@ -51,30 +52,22 @@ impl ServerCommandHandler for CreateStream {
         debug!("session: {session}, command: {self}");
         let stream_id = self.stream_id;
         let name = self.name.clone();
-        let stats = Arc::new(StreamStats::new());
 
-        let new_stream_id = shard
-            .create_stream2(session, stream_id, self.name.clone(), 
stats.clone())
+        let stream = shard
+            .create_stream2(session, stream_id, self.name.clone())
             .await?;
         shard_info!(
             shard.id,
             "Created stream with new API, Stream ID: {}, name: '{}'.",
-            new_stream_id,
+            stream.id(),
             name
         );
         let event = ShardEvent::CreatedStream2 {
-            id: new_stream_id,
-            name: self.name.clone(),
-            stats,
+            id: stream.id(),
+            stream,
         };
         let _responses = 
shard.broadcast_event_to_all_shards(event.into()).await;
 
-        //TODO: Replace the mapping from line 89 with this once the Stream 
layer is finished.
-        let _ = shard.streams2.with_stream_by_id(
-            &Identifier::numeric(new_stream_id as u32).unwrap(),
-            |stream| mapper::map_stream2(stream),
-        );
-
         let created_stream_id = shard
                 .create_stream(session, stream_id, &name)
                 .await
diff --git a/core/server/src/binary/handlers/streams/delete_stream_handler.rs 
b/core/server/src/binary/handlers/streams/delete_stream_handler.rs
index af12beda..c7b9ad2c 100644
--- a/core/server/src/binary/handlers/streams/delete_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/delete_stream_handler.rs
@@ -23,6 +23,7 @@ use crate::shard::IggyShard;
 use crate::shard::namespace::IggyNamespace;
 use crate::shard::transmission::event::ShardEvent;
 use crate::shard_info;
+use crate::slab::traits_ext::EntityMarker;
 use crate::state::command::EntryCommand;
 use crate::streaming::partitions::partition;
 use crate::streaming::session::Session;
@@ -58,8 +59,8 @@ impl ServerCommandHandler for DeleteStream {
                                 })?;
         shard_info!(
             shard.id,
-            "Deleted stream2 with name: {}, ID: {}",
-            stream2.name(),
+            "Deleted stream with name: {}, ID: {}",
+            stream2.root().name(),
             stream2.id()
         );
         let event = ShardEvent::DeletedStream2 {
diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs 
b/core/server/src/binary/handlers/topics/create_topic_handler.rs
index 848a4c90..3f913685 100644
--- a/core/server/src/binary/handlers/topics/create_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs
@@ -24,6 +24,7 @@ use crate::shard::namespace::IggyNamespace;
 use crate::shard::transmission::event::ShardEvent;
 use crate::shard::{IggyShard, ShardInfo};
 use crate::shard_info;
+use crate::slab::traits_ext::EntityMarker;
 use crate::state::command::EntryCommand;
 use crate::state::models::CreateTopicWithId;
 use crate::state::system::TopicState;
@@ -55,12 +56,7 @@ impl ServerCommandHandler for CreateTopic {
         debug!("session: {session}, command: {self}");
         let stream_id = self.stream_id.clone();
         let maybe_topic_id = self.topic_id;
-        //shard.ensure_stream_exists(&stream_id)?;
-        let parent = shard
-            .streams2
-            .with_stats_by_id(&stream_id, |stats| stats.clone());
-        let stats = Arc::new(TopicStats::new(parent));
-        let new_topic_id = shard
+        let topic = shard
             .create_topic2(
                 session,
                 &stream_id,
@@ -70,30 +66,24 @@ impl ServerCommandHandler for CreateTopic {
                 self.compression_algorithm,
                 self.max_topic_size,
                 self.replication_factor,
-                stats.clone(),
             )
             .await?;
+        let topic_id = topic.id();
+        // Send events for topic creation.
+        let event = ShardEvent::CreatedTopic2 {
+            stream_id: self.stream_id.clone(),
+            topic,
+        };
+        let _responses = shard.broadcast_event_to_all_shards(event).await;
 
         shard
             .create_partitions2(
                 session,
                 &stream_id,
-                &Identifier::numeric(new_topic_id as u32).unwrap(),
+                &Identifier::numeric(topic_id as u32).unwrap(),
                 self.partitions_count,
             )
             .await?;
-
-        let event = ShardEvent::CreatedTopic2 {
-            stream_id: self.stream_id.clone(),
-            id: new_topic_id,
-            name: self.name.clone(),
-            message_expiry: self.message_expiry,
-            compression_algorithm: self.compression_algorithm,
-            max_topic_size: self.max_topic_size,
-            replication_factor: self.replication_factor,
-            stats,
-        };
-        let _responses = 
shard.broadcast_event_to_all_shards(event.into()).await;
         let (topic_id, partition_ids) = shard
                 .create_topic(
                     session,
diff --git a/core/server/src/binary/handlers/topics/delete_topic_handler.rs 
b/core/server/src/binary/handlers/topics/delete_topic_handler.rs
index af16cf8c..70fccd8c 100644
--- a/core/server/src/binary/handlers/topics/delete_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/delete_topic_handler.rs
@@ -24,6 +24,7 @@ use crate::shard::IggyShard;
 use crate::shard::namespace::IggyNamespace;
 use crate::shard::transmission::event::ShardEvent;
 use crate::shard_info;
+use crate::slab::traits_ext::EntityMarker;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
 use anyhow::Result;
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index c7994355..e7cb032a 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -68,7 +68,7 @@ use crate::{
         },
     },
     shard_error, shard_info, shard_warn,
-    slab::streams::Streams,
+    slab::{streams::Streams, traits_ext::EntityMarker},
     state::{
         StateKind,
         system::{StreamState, SystemState, UserState},
@@ -631,7 +631,7 @@ impl IggyShard {
                 partition_ids,
             } => {
                 self.delete_partitions2_bypass_auth(
-                   &stream_id,
+                    &stream_id,
                     &topic_id,
                     partitions_count,
                     partition_ids,
@@ -667,11 +667,13 @@ impl IggyShard {
                         "{COMPONENT} (error: {error}) - failed to get stream 
with ID: {stream_id}"
                     )
                 })?;
-                let topic = 
stream.get_topic_mut(&topic_id).with_error_context(|error| {
-                    format!(
-                        "{COMPONENT} (error: {error}) - failed to get topic 
with ID: {topic_id}"
-                    )
-                })?;
+                let topic = stream
+                    .get_topic_mut(&topic_id)
+                    .with_error_context(|error| {
+                        format!(
+                            "{COMPONENT} (error: {error}) - failed to get 
topic with ID: {topic_id}"
+                        )
+                    })?;
                 topic.reassign_consumer_groups();
                 if partitions.len() > 0 {
                     self.metrics.decrement_partitions(partitions_count as u32);
@@ -848,10 +850,8 @@ impl IggyShard {
                 self.tcp_bound_address.set(Some(address));
                 Ok(())
             }
-            ShardEvent::CreatedStream2 { id, name, stats } => {
-                let stream_id = self
-                    .create_stream2_bypass_auth(name.to_owned(), stats.clone())
-                    .await?;
+            ShardEvent::CreatedStream2 { id, stream } => {
+                let stream_id = self.create_stream2_bypass_auth(stream);
                 assert_eq!(stream_id, id);
                 Ok(())
             }
@@ -860,26 +860,8 @@ impl IggyShard {
                 assert_eq!(stream.id(), id);
                 Ok(())
             }
-            ShardEvent::CreatedTopic2 {
-                id,
-                stream_id,
-                name,
-                message_expiry,
-                compression_algorithm,
-                max_topic_size,
-                replication_factor,
-                stats,
-            } => {
-                let topic_id = self.create_topic2_bypass_auth(
-                    &stream_id,
-                    name.to_owned(),
-                    replication_factor,
-                    message_expiry,
-                    compression_algorithm,
-                    max_topic_size,
-                    stats.clone(),
-                )?;
-                assert_eq!(topic_id, id);
+            ShardEvent::CreatedTopic2 { stream_id, topic } => {
+                let _topic_id = self.create_topic2_bypass_auth(&stream_id, 
topic)?;
                 Ok(())
             }
             ShardEvent::CreatedPartitions2 {
@@ -895,7 +877,9 @@ impl IggyShard {
                 stream_id,
                 topic_id,
             } => {
-                let topic = self.delete_topic_bypass_auth2(&stream_id, 
&topic_id).await?;
+                let topic = self
+                    .delete_topic_bypass_auth2(&stream_id, &topic_id)
+                    .await?;
                 assert_eq!(topic.id(), id);
                 Ok(())
             }
@@ -916,7 +900,7 @@ impl IggyShard {
                     compression_algorithm,
                     max_topic_size,
                     replication_factor,
-                );
+                )?;
                 Ok(())
             }
             ShardEvent::CreatedConsumerGroup2 {
@@ -929,7 +913,8 @@ impl IggyShard {
                 let id = self.create_consumer_group_bypass_auth2(
                     &stream_id,
                     &topic_id,
-                    members.clone(), name.clone(),
+                    members.clone(),
+                    name.clone(),
                 )?;
                 assert_eq!(id, cg_id);
                 Ok(())
@@ -940,7 +925,8 @@ impl IggyShard {
                 topic_id,
                 group_id,
             } => {
-                let cg = self.delete_consumer_group_bypass_auth2(&stream_id, 
&topic_id, &group_id)?;
+                let cg =
+                    self.delete_consumer_group_bypass_auth2(&stream_id, 
&topic_id, &group_id)?;
                 assert_eq!(cg.id(), id);
                 Ok(())
             }
diff --git a/core/server/src/shard/system/consumer_groups.rs 
b/core/server/src/shard/system/consumer_groups.rs
index e58f36df..2283fbb4 100644
--- a/core/server/src/shard/system/consumer_groups.rs
+++ b/core/server/src/shard/system/consumer_groups.rs
@@ -119,10 +119,10 @@ impl IggyShard {
         {
             let topic_id = self
                 .streams2
-                .with_topic_by_id(stream_id, topic_id, |topic| topic.id());
+                .with_topic_root_by_id(stream_id, topic_id, |topic| 
topic.id());
             let stream_id = self
                 .streams2
-                .with_stream_by_id(stream_id, |stream| stream.id());
+                .with_root_by_id(stream_id, |stream| stream.id());
             self.permissioner.borrow().create_consumer_group(
                 session.get_user_id(),
                 stream_id as u32,
@@ -151,7 +151,7 @@ impl IggyShard {
     ) -> Result<usize, IggyError> {
         let id = self
             .streams2
-            .with_topic_by_id_mut(stream_id, topic_id, |topic| {
+            .with_topic_root_by_id_mut(stream_id, topic_id, |topic| {
                 let partitions = topic.partitions().with(|partitions| {
                     let (info, _, _, _, _, _) = partitions.into_components();
                     info.iter()
@@ -233,10 +233,10 @@ impl IggyShard {
         {
             let topic_id = self
                 .streams2
-                .with_topic_by_id(stream_id, topic_id, |topic| topic.id());
+                .with_topic_root_by_id(stream_id, topic_id, |topic| 
topic.id());
             let stream_id = self
                 .streams2
-                .with_stream_by_id(stream_id, |stream| stream.id());
+                .with_root_by_id(stream_id, |stream| stream.id());
             self.permissioner.borrow().delete_consumer_group(
                 session.get_user_id(),
                 stream_id as u32,
@@ -263,20 +263,19 @@ impl IggyShard {
     ) -> Result<consumer_group2::ConsumerGroup, IggyError> {
         let cg = self
             .streams2
-            .with_topic_by_id_mut(stream_id, topic_id, |topic| {
+            .with_topic_root_by_id_mut(stream_id, topic_id, |root| {
                 match group_id.kind {
                     iggy_common::IdKind::Numeric => {
-                        topic.consumer_groups_mut().with_mut(|container| {
+                        root.consumer_groups_mut().with_mut(|container| {
                             
container.try_remove(group_id.get_u32_value().unwrap() as usize)
                         })
                     }
                     iggy_common::IdKind::String => {
                         let key = group_id.get_string_value().unwrap();
-                        let id = topic
+                        let id = root
                             .consumer_groups()
                             .with_index(|index| *(index.get(&key).unwrap()));
-                        topic
-                            .consumer_groups_mut()
+                        root.consumer_groups_mut()
                             .with_mut(|container| container.try_remove(id))
                     }
                 }
diff --git a/core/server/src/shard/system/partitions.rs 
b/core/server/src/shard/system/partitions.rs
index 71e9c8e3..3cc1ed61 100644
--- a/core/server/src/shard/system/partitions.rs
+++ b/core/server/src/shard/system/partitions.rs
@@ -24,6 +24,7 @@ use crate::configs::system::SystemConfig;
 use crate::shard::IggyShard;
 use crate::slab::traits_ext::Delete;
 use crate::slab::traits_ext::EntityComponentSystem;
+use crate::slab::traits_ext::EntityMarker;
 use crate::slab::traits_ext::Insert;
 use crate::streaming::deduplication::message_deduplicator::MessageDeduplicator;
 use crate::streaming::partitions::partition::Partition;
@@ -74,12 +75,16 @@ impl IggyShard {
         partitions_count: u32,
     ) -> Result<Vec<partition2::Partition>, IggyError> {
         self.ensure_authenticated(session)?;
+        /*
+        self.ensure_stream_exists(stream_id)?;
+        self.ensure_topic_exists(stream_id, topic_id)?;
+        */
         let numeric_stream_id =
             self.streams2
-                .with_stream_by_id(stream_id, |stream| stream.id()) as u32;
+                .with_root_by_id(stream_id, |stream| stream.id()) as u32;
         let numeric_topic_id =
             self.streams2
-                .with_topic_by_id(stream_id, topic_id, |topic| topic.id()) as 
u32;
+                .with_topic_root_by_id(stream_id, topic_id, |topic| 
topic.id()) as u32;
 
         self.validate_partition_permissions(
             session,
@@ -88,10 +93,9 @@ impl IggyShard {
             "create",
         )?;
 
-        let parent_stats = self.streams2.with_stream_by_id(stream_id, |stream| 
{
-            stream
-                .topics()
-                .with_topic_stats_by_id(topic_id, |stats| stats)
+        let parent_stats = self.streams2.with_root_by_id(stream_id, |root| {
+            root.topics()
+                .with_stats_by_id(topic_id, |stats| stats.clone())
         });
         let partitions = self.create_and_insert_partitions_mem(
             stream_id,
@@ -144,7 +148,7 @@ impl IggyShard {
             .map(|_| {
                 // Areczkuuuu.
                 let stats = 
Arc::new(PartitionStats::new(parent_stats.clone()));
-                let info = partition2::PartitionInfo::new(created_at, false);
+                let info = partition2::PartitionRoot::new(created_at, false);
                 let deduplicator = 
create_message_deduplicator(&self.config.system);
                 let offset = Arc::new(AtomicU64::new(0));
                 let consumer_offset = 
Arc::new(papaya::HashMap::with_capacity(2137));
@@ -172,8 +176,8 @@ impl IggyShard {
         partition: partition2::Partition,
     ) -> usize {
         self.streams2
-            .with_topic_by_id_mut(stream_id, topic_id, |topic| {
-                topic.partitions_mut().insert(partition)
+            .with_partitions_mut(stream_id, topic_id, |partitions| {
+                partitions.insert(partition)
             })
     }
 
@@ -265,10 +269,10 @@ impl IggyShard {
 
         let numeric_stream_id =
             self.streams2
-                .with_stream_by_id(stream_id, |stream| stream.id()) as u32;
+                .with_root_by_id(stream_id, |stream| stream.id()) as u32;
         let numeric_topic_id =
             self.streams2
-                .with_topic_by_id(stream_id, topic_id, |topic| topic.id()) as 
u32;
+                .with_topic_root_by_id(stream_id, topic_id, |topic| 
topic.id()) as u32;
 
         self.validate_partition_permissions(
             session,
@@ -290,13 +294,13 @@ impl IggyShard {
     ) -> Vec<u32> {
         let deleted_partition_ids =
             self.streams2
-                .with_topic_by_id_mut(stream_id, topic_id, |topic| {
+                .with_topic_root_by_id_mut(stream_id, topic_id, |topic| {
                     let partitions = topic.partitions_mut();
-                    let current_count = partitions.count() as u32;
+                    let current_count = partitions.len() as u32;
                     let partitions_to_delete = 
partitions_count.min(current_count);
                     let start_idx = (current_count - partitions_to_delete) as 
usize;
                     let mut deleted_ids = 
Vec::with_capacity(partitions_to_delete as usize);
-                    for idx in (start_idx..current_count as usize) {
+                    for idx in start_idx..current_count as usize {
                         let partition = topic.partitions_mut().delete(idx);
                         assert_eq!(partition.id(), idx);
                         deleted_ids.push(partition.id() as u32);
diff --git a/core/server/src/shard/system/streams.rs 
b/core/server/src/shard/system/streams.rs
index 958cab48..54e639fd 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -19,6 +19,9 @@
 use super::COMPONENT;
 use crate::shard::IggyShard;
 use crate::shard::namespace::IggyNamespace;
+use crate::slab::traits_ext::{
+    DeleteCell, EntityComponentSystem, EntityMarker, Insert, InsertCell, 
IntoComponents,
+};
 use crate::streaming::partitions::partition;
 use crate::streaming::session::Session;
 use crate::streaming::stats::stats::StreamStats;
@@ -28,7 +31,7 @@ use crate::streaming::streams::stream2;
 use error_set::ErrContext;
 use futures::future::try_join_all;
 use iggy_common::locking::IggyRwLockFn;
-use iggy_common::{IdKind, Identifier, IggyError};
+use iggy_common::{IdKind, Identifier, IggyError, IggyTimestamp};
 use std::cell::{Ref, RefCell, RefMut};
 use std::sync::Arc;
 use std::sync::atomic::{AtomicU32, Ordering};
@@ -213,45 +216,38 @@ impl IggyShard {
         session: &Session,
         stream_id: Option<u32>,
         name: String,
-        stats: Arc<StreamStats>,
-    ) -> Result<usize, IggyError> {
+    ) -> Result<stream2::Stream, IggyError> {
         self.ensure_authenticated(session)?;
         self.permissioner
             .borrow()
             .create_stream(session.get_user_id())?;
-        let stream_id = self.create_stream2_base(name, stats).await?;
-        create_stream_file_hierarchy(self.id, stream_id, 
&self.config.system).await?;
-        Ok(stream_id)
-    }
-
-    pub async fn create_stream2_bypass_auth(
-        &self,
-        name: String,
-        stats: Arc<StreamStats>,
-    ) -> Result<usize, IggyError> {
-        self.create_stream2_base(name, stats).await
-    }
+        let exists = self
+            .streams2
+            .exists(&Identifier::from_str_value(&name).unwrap());
 
-    async fn create_stream2_base(
-        &self,
-        name: String,
-        stats: Arc<StreamStats>,
-    ) -> Result<usize, IggyError> {
-        let exists = self.streams2.exists(&Identifier::named(&name).unwrap());
         if exists {
             return Err(IggyError::StreamNameAlreadyExists(name));
         }
-        let key = name.clone();
-        let stream = stream2::Stream::new(name);
-        let stream_id = self
-            .streams2
-            .with_mut(|streams| stream.insert_into(streams));
-        self.streams2.with_index_mut(|index| {
-            index.insert(key, stream_id);
-        });
-        self.streams2
-            .with_stats_mut(|container| container.insert(stats));
-        Ok(stream_id)
+        let stream = self.create_and_insert_stream_mem(name);
+        create_stream_file_hierarchy(self.id, stream.id(), 
&self.config.system).await?;
+        Ok(stream)
+    }
+
+    fn create_and_insert_stream_mem(&self, name: String) -> stream2::Stream {
+        let now = IggyTimestamp::now();
+        let stats = Arc::new(StreamStats::new());
+        let mut stream = stream2::Stream::new(name, stats, now);
+        let id = self.insert_stream_mem(stream.clone());
+        stream.update_id(id);
+        stream
+    }
+
+    fn insert_stream_mem(&self, stream: stream2::Stream) -> usize {
+        self.streams2.insert(stream)
+    }
+
+    pub fn create_stream2_bypass_auth(&self, stream: stream2::Stream) -> usize 
{
+        self.insert_stream_mem(stream)
     }
 
     pub async fn create_stream(
@@ -405,9 +401,7 @@ impl IggyShard {
         name: &str,
     ) -> Result<(), IggyError> {
         self.ensure_authenticated(session)?;
-        let stream_id = self
-            .streams2
-            .with_stream_by_id(id, |stream| stream.id() as u32);
+        let stream_id = self.streams2.with_root_by_id(id, |root| root.id() as 
u32);
 
         self.permissioner
             .borrow()
@@ -426,7 +420,7 @@ impl IggyShard {
     fn update_stream2_base(&self, id: &Identifier, name: String) -> 
Result<String, IggyError> {
         let old_name = self
             .streams2
-            .with_stream_by_id(id, |stream| stream.name().clone());
+            .with_root_by_id(id, |root| root.name().clone());
 
         if old_name == name {
             return Ok(old_name);
@@ -437,10 +431,10 @@ impl IggyShard {
             return Err(IggyError::StreamNameAlreadyExists(name.to_string()));
         }
 
-        self.streams2.with_stream_by_id_mut(id, |stream| {
-            stream.set_name(name.clone());
+        self.streams2.with_root_by_id_mut(id, |root| {
+            root.set_name(name.clone());
         });
-      
+
         self.streams2.with_index_mut(|index| {
             // Rename the key inside of hashmap
             let idx = index.remove(&old_name).expect("Rename key: key not 
found");
@@ -519,39 +513,21 @@ impl IggyShard {
     }
 
     fn delete_stream2_base(&self, id: &Identifier) -> Result<stream2::Stream, 
IggyError> {
-        let (stream_id, stream_name) = self
-            .streams2
-            .with_stream_by_id(id, |stream| (stream.id() as u32, 
stream.name().clone()));
-
-        let stream = self.streams2.with_mut(|streams| {
-            streams
-                .try_remove(stream_id as usize)
-                .ok_or_else(|| match id.kind {
-                    iggy_common::IdKind::Numeric => 
IggyError::StreamIdNotFound(stream_id as u32),
-                    iggy_common::IdKind::String => {
-                        IggyError::StreamNameNotFound(stream_name.clone())
-                    }
-                })
-        })?;
+        let id = self.streams2.get_index(id);
+        let stream = self.streams2.delete(id);
+        let stats = stream.stats();
 
-        self.streams2.with_stats_mut(|stats| {
-            if let Some(stream_stats) = stats.try_remove(stream_id as usize) {
-                self.metrics.decrement_streams(1);
-                self.metrics.decrement_topics(0); // TODO: stats doesn't have 
topic count
-                self.metrics.decrement_partitions(0); // TODO: stats doesn't 
have partition count
-                self.metrics
-                    .decrement_messages(stream_stats.messages_count());
-                self.metrics
-                    .decrement_segments(stream_stats.segments_count());
-            } else {
-                self.metrics.decrement_streams(1);
-            }
-        });
+        self.metrics.decrement_streams(1);
+        self.metrics.decrement_topics(0); // TODO: stats doesn't have topic 
count
+        self.metrics.decrement_partitions(0); // TODO: stats doesn't have 
partition count
+        self.metrics.decrement_messages(stats.messages_count());
+        self.metrics.decrement_segments(stats.segments_count());
 
+        /*
         self.client_manager
             .borrow_mut()
             .delete_consumer_groups_for_stream(stream_id as u32);
-
+        */
         Ok(stream)
     }
 
@@ -562,9 +538,7 @@ impl IggyShard {
     ) -> Result<stream2::Stream, IggyError> {
         self.ensure_authenticated(session)?;
         // self.ensure_stream_exists(id)?;
-        let stream_id = self
-            .streams2
-            .with_stream_by_id(id, |stream| stream.id() as u32);
+        let stream_id = self.streams2.with_root_by_id(id, |root| root.id() as 
u32);
         self.permissioner
             .borrow()
             .delete_stream(session.get_user_id(), stream_id as u32)
@@ -629,9 +603,7 @@ impl IggyShard {
     pub fn purge_stream2(&self, session: &Session, id: &Identifier) -> 
Result<(), IggyError> {
         self.ensure_authenticated(session)?;
         // self.ensure_stream_exists(id)?;
-        let stream_id = self
-            .streams2
-            .with_stream_by_id(id, |stream| stream.id() as u32);
+        let stream_id = self.streams2.with_root_by_id(id, |root| root.id() as 
u32);
         self.permissioner
             .borrow()
             .purge_stream(session.get_user_id(), stream_id)
diff --git a/core/server/src/shard/system/topics.rs 
b/core/server/src/shard/system/topics.rs
index ff327975..d0beaa31 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -24,10 +24,14 @@ use crate::shard::namespace::IggyNamespace;
 use crate::shard::transmission::event::ShardEvent;
 use crate::shard::{IggyShard, ShardInfo};
 use crate::shard_info;
+use crate::slab::traits_ext::{
+    Delete, DeleteCell, EntityComponentSystem, EntityMarker, Insert, 
InsertCell,
+};
+use crate::state::system::StreamState;
 use crate::streaming::partitions::partition2;
 use crate::streaming::partitions::storage2::create_partition_file_hierarchy;
 use crate::streaming::session::Session;
-use crate::streaming::stats::stats::TopicStats;
+use crate::streaming::stats::stats::{StreamStats, TopicStats};
 use crate::streaming::streams::stream::Stream;
 use crate::streaming::topics::storage2::create_topic_file_hierarchy;
 use crate::streaming::topics::topic::Topic;
@@ -35,7 +39,9 @@ use crate::streaming::topics::topic2;
 use clap::Id;
 use error_set::ErrContext;
 use iggy_common::locking::IggyRwLockFn;
-use iggy_common::{CompressionAlgorithm, Identifier, IggyError, IggyExpiry, 
MaxTopicSize};
+use iggy_common::{
+    CompressionAlgorithm, Identifier, IggyError, IggyExpiry, IggyTimestamp, 
MaxTopicSize,
+};
 use tokio_util::io::StreamReader;
 use tracing::info;
 
@@ -152,13 +158,10 @@ impl IggyShard {
         compression: CompressionAlgorithm,
         max_topic_size: MaxTopicSize,
         replication_factor: Option<u8>,
-        stats: Arc<TopicStats>,
-    ) -> Result<usize, IggyError> {
+    ) -> Result<topic2::Topic, IggyError> {
         self.ensure_authenticated(session)?;
         //self.ensure_stream_exists(stream_id)?;
-        let numeric_stream_id = self
-            .streams2
-            .with_stream_by_id(stream_id, |stream| stream.id());
+        let numeric_stream_id = self.streams2.with_root_by_id(stream_id, 
|root| root.id());
         {
             self.permissioner
             .borrow()
@@ -170,94 +173,78 @@ impl IggyShard {
                     )
                 })?;
         }
-        let topic_id = self.create_topic2_base(
+        let exists = self.streams2.with_topics(stream_id, |topics| {
+            topics.exists(&Identifier::from_str_value(&name).unwrap())
+        });
+        if exists {
+            return Err(IggyError::TopicNameAlreadyExists(
+                name,
+                numeric_stream_id as u32,
+            ));
+        }
+
+        let parent_stats = self
+            .streams2
+            .with_stats_by_id(stream_id, |stats| stats.clone());
+        let topic = self.create_and_insert_topics_mem(
             stream_id,
             name,
             replication_factor.unwrap_or(1),
             message_expiry,
             compression,
             max_topic_size,
-            stats,
-        )?;
-
-        self.streams2.with_topic_by_id(
-            stream_id,
-            &Identifier::numeric(topic_id as u32).unwrap(),
-            |topic| {
-                let message_expiry = match topic.message_expiry() {
-                    IggyExpiry::ServerDefault => 
self.config.system.segment.message_expiry,
-                    _ => message_expiry,
-                };
-                shard_info!(self.id, "Topic message expiry: {}", 
message_expiry);
-            },
+            parent_stats,
         );
+        let message_expiry = match topic.root().message_expiry() {
+            IggyExpiry::ServerDefault => 
self.config.system.segment.message_expiry,
+            _ => message_expiry,
+        };
+        shard_info!(self.id, "Topic message expiry: {}", message_expiry);
 
         // Create file hierarchy for the topic.
-        create_topic_file_hierarchy(self.id, numeric_stream_id, topic_id, 
&self.config.system)
+        create_topic_file_hierarchy(self.id, numeric_stream_id, topic.id(), 
&self.config.system)
             .await?;
-        Ok(topic_id)
+        Ok(topic)
     }
 
-    pub fn create_topic2_bypass_auth(
+    fn create_and_insert_topics_mem(
         &self,
         stream_id: &Identifier,
         name: String,
-        replication_factor: Option<u8>,
+        replication_factor: u8,
         message_expiry: IggyExpiry,
         compression: CompressionAlgorithm,
         max_topic_size: MaxTopicSize,
-        stats: Arc<TopicStats>,
-    ) -> Result<usize, IggyError> {
-        let topic_id = self.create_topic2_base(
-            stream_id,
+        parent_stats: Arc<StreamStats>,
+    ) -> topic2::Topic {
+        let stats = Arc::new(TopicStats::new(parent_stats));
+        let now = IggyTimestamp::now();
+        let mut topic = topic2::Topic::new(
             name,
-            replication_factor.unwrap_or(1),
+            stats,
+            now,
+            replication_factor,
             message_expiry,
             compression,
             max_topic_size,
-            stats,
-        )?;
-        Ok(topic_id)
+        );
+
+        let id = self.insert_topic_mem(stream_id, topic.clone());
+        topic.update_id(id);
+        topic
+    }
+
+    fn insert_topic_mem(&self, stream_id: &Identifier, topic: topic2::Topic) 
-> usize {
+        self.streams2
+            .with_root_by_id(stream_id, |root| root.topics().insert(topic))
     }
 
-    fn create_topic2_base(
+    pub fn create_topic2_bypass_auth(
         &self,
         stream_id: &Identifier,
-        name: String,
-        replication_factor: u8,
-        message_expiry: IggyExpiry,
-        compression: CompressionAlgorithm,
-        max_topic_size: MaxTopicSize,
-        stats: Arc<TopicStats>,
+        topic: topic2::Topic,
     ) -> Result<usize, IggyError> {
-        let topic_id = self.streams2.with_stream_by_id(stream_id, |stream| {
-            let exists = 
stream.topics().exists(&Identifier::named(&name).unwrap());
-            if exists {
-                // TODO: Fixme, replace the second argument with identifier, 
rather than numeric ID.
-                return Err(IggyError::TopicNameAlreadyExists(name.to_owned(), 
0));
-            }
-            let topic = topic2::Topic::new(
-                name,
-                replication_factor,
-                message_expiry,
-                compression,
-                max_topic_size,
-            );
-            let name = topic.name().clone();
-            let topic_id = stream.topics().with_mut(|topics| {
-                let topic_id = topic.insert_into(topics);
-                topic_id
-            });
-            stream.topics().with_mut_index(|index| {
-                index.insert(name, topic_id);
-            });
-
-            stream.topics().with_stats_mut(|container| {
-                container.insert(stats);
-            });
-            Ok(topic_id)
-        })?;
-
+        let topic_id = self.insert_topic_mem(stream_id, topic);
         Ok(topic_id)
     }
 
@@ -390,10 +377,10 @@ impl IggyShard {
         {
             let topic_id = self
                 .streams2
-                .with_topic_by_id(stream_id, topic_id, |topic| topic.id());
+                .with_topic_root_by_id(stream_id, topic_id, |topic| 
topic.id());
             let stream_id = self
                 .streams2
-                .with_stream_by_id(stream_id, |stream| stream.id());
+                .with_root_by_id(stream_id, |stream| stream.id());
             self.permissioner.borrow().update_topic(
                 session.get_user_id(),
                 stream_id as u32,
@@ -407,6 +394,17 @@ impl IggyShard {
                 )
             })?;
         }
+        let exists = self
+            .streams2
+            .with_topic_root_by_id(stream_id, topic_id, |topic| {
+                let old_name = topic.name();
+                old_name == &name
+            });
+        if exists {
+            // TODO: Fix the errors to accept Identifier instead of u32.
+            return Err(IggyError::TopicNameAlreadyExists(name, 0));
+        }
+
         self.update_topic_base2(
             stream_id,
             topic_id,
@@ -453,7 +451,7 @@ impl IggyShard {
     ) {
         let (old_name, new_name) =
             self.streams2
-                .with_topic_by_id_mut(stream_id, topic_id, |topic| {
+                .with_topic_root_by_id_mut(stream_id, topic_id, |topic| {
                     let old_name = topic.name().clone();
                     topic.set_name(name.clone());
                     topic.set_message_expiry(message_expiry);
@@ -465,7 +463,7 @@ impl IggyShard {
                 });
         if old_name != new_name {
             self.streams2.with_topics(stream_id, |topics| {
-                topics.with_mut_index(|index| {
+                topics.with_index_mut(|index| {
                     // Rename the key inside of hashmap
                     let idx = index.remove(&old_name).expect("Rename key: key 
not found");
                     index.insert(new_name, idx);
@@ -600,10 +598,10 @@ impl IggyShard {
         {
             let topic_id = self
                 .streams2
-                .with_topic_by_id(stream_id, topic_id, |topic| topic.id());
+                .with_topic_root_by_id(stream_id, topic_id, |topic| 
topic.id());
             let stream_id = self
                 .streams2
-                .with_stream_by_id(stream_id, |stream| stream.id());
+                .with_root_by_id(stream_id, |stream| stream.id());
             self.permissioner
             .borrow()
                 .delete_topic(session.get_user_id(), stream_id as u32, 
topic_id as u32)
@@ -617,7 +615,7 @@ impl IggyShard {
         let topic = self.delete_topic_base2(stream_id, 
topic_id).with_error_context(|error| {
             format!("{COMPONENT} (error: {error}) - failed to delete topic 
with ID: {topic_id} in stream with ID: {stream_id}")
         })?;
-        // TODO: Remove partitions.
+        // TODO: Decrease the stats
         Ok(topic)
     }
 
@@ -635,26 +633,12 @@ impl IggyShard {
         stream_id: &Identifier,
         topic_id: &Identifier,
     ) -> Result<topic2::Topic, IggyError> {
-        let topic = self.streams2.with_stream_by_id(stream_id, |stream| {
-            let id = stream
-                .topics()
-                .with_topic_by_id(topic_id, |topic| topic.id());
-            stream.topics().with_mut(|container| {
-                container.try_remove(id).ok_or_else(|| {
-                    let topic_name = stream
-                        .topics()
-                        .with_topic_by_id(topic_id, |topic| 
topic.name().clone());
-                    IggyError::TopicNameNotFound(topic_name, 
stream.name().clone())
-                })
-            })
-        })?;
-        let id = topic.id();
-        self.streams2.with_topics(stream_id, |topics| {
-            topics.with_stats_mut(|container| {
-                container
-                    .try_remove(id)
-                    .expect("Topic delete: topic stats not found");
-            });
+        let topic = self.streams2.with_root_by_id_mut(stream_id, |stream| {
+            let topics = stream.topics();
+            let id = topics.with_root_by_id(topic_id, |topic| topic.id());
+            let topic = topics.delete(id);
+            assert_eq!(topic.id(), id, "delete_topic: topic ID mismatch");
+            topic
         });
         Ok(topic)
     }
@@ -727,10 +711,10 @@ impl IggyShard {
         {
             let topic_id = self
                 .streams2
-                .with_topic_by_id(stream_id, topic_id, |topic| topic.id());
+                .with_topic_root_by_id(stream_id, topic_id, |topic| 
topic.id());
             let stream_id = self
                 .streams2
-                .with_stream_by_id(stream_id, |stream| stream.id());
+                .with_root_by_id(stream_id, |stream| stream.id());
             self.permissioner.borrow().purge_topic(
                 session.get_user_id(),
                 stream_id as u32,
diff --git a/core/server/src/shard/system/utils.rs 
b/core/server/src/shard/system/utils.rs
index 6ad3b224..28db67dc 100644
--- a/core/server/src/shard/system/utils.rs
+++ b/core/server/src/shard/system/utils.rs
@@ -1,6 +1,6 @@
 use iggy_common::{Identifier, IggyError};
 
-use crate::shard::IggyShard;
+use crate::{shard::IggyShard, slab::traits_ext::EntityComponentSystem};
 
 impl IggyShard {
     pub fn ensure_stream_exists(&self, stream_id: &Identifier) -> Result<(), 
IggyError> {
@@ -16,9 +16,10 @@ impl IggyShard {
         topic_id: &Identifier,
     ) -> Result<(), IggyError> {
         //self.ensure_stream_exists(stream_id)?;
+        let stream_id = self.streams2.get_index(stream_id);
         let exists = self
             .streams2
-            .with_stream_by_id(stream_id, |stream| 
stream.topics().exists(topic_id));
+            .with_by_id(stream_id, |(root, _)| root.topics().exists(topic_id));
         if !exists {
             return Err(IggyError::TopicIdNotFound(0, 0));
         }
@@ -35,8 +36,8 @@ impl IggyShard {
         //self.ensure_topic_exists(stream_id, topic_id)?;
         let exists = self
             .streams2
-            .with_topic_by_id(stream_id, topic_id, |topic| {
-                topic.consumer_groups().exists(group_id)
+            .with_topic_root_by_id(stream_id, topic_id, |root| {
+                root.consumer_groups().exists(group_id)
             });
         if !exists {
             return Err(IggyError::ConsumerGroupIdNotFound(0, 0));
diff --git a/core/server/src/shard/transmission/event.rs 
b/core/server/src/shard/transmission/event.rs
index 9f5dd19c..666e3272 100644
--- a/core/server/src/shard/transmission/event.rs
+++ b/core/server/src/shard/transmission/event.rs
@@ -18,7 +18,8 @@ use crate::{
         personal_access_tokens::personal_access_token::PersonalAccessToken,
         polling_consumer::PollingConsumer,
         stats::stats::{PartitionStats, StreamStats, TopicStats},
-        topics::consumer_group2::Member,
+        streams::stream2,
+        topics::{consumer_group2::Member, topic2},
     },
 };
 
@@ -34,8 +35,7 @@ pub enum ShardEvent {
     },
     CreatedStream2 {
         id: usize,
-        name: String,
-        stats: Arc<StreamStats>,
+        stream: stream2::Stream,
     },
     CreatedStream {
         stream_id: Option<u32>,
@@ -94,14 +94,8 @@ pub enum ShardEvent {
         replication_factor: Option<u8>,
     },
     CreatedTopic2 {
-        id: usize,
         stream_id: Identifier,
-        name: String,
-        message_expiry: IggyExpiry,
-        compression_algorithm: CompressionAlgorithm,
-        max_topic_size: MaxTopicSize,
-        replication_factor: Option<u8>,
-        stats: Arc<TopicStats>,
+        topic: topic2::Topic,
     },
     CreatedConsumerGroup {
         stream_id: Identifier,
diff --git a/core/server/src/slab/consumer_groups.rs 
b/core/server/src/slab/consumer_groups.rs
index fe352696..c52fda6d 100644
--- a/core/server/src/slab/consumer_groups.rs
+++ b/core/server/src/slab/consumer_groups.rs
@@ -7,7 +7,7 @@ use std::sync::{Arc, atomic::AtomicUsize};
 
 const CAPACITY: usize = 1024;
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct ConsumerGroups {
     index: AHashMap<<consumer_group2::ConsumerGroup as Keyed>::Key, usize>,
     container: Slab<consumer_group2::ConsumerGroup>,
diff --git a/core/server/src/slab/mod.rs b/core/server/src/slab/mod.rs
index a8185700..840de3d3 100644
--- a/core/server/src/slab/mod.rs
+++ b/core/server/src/slab/mod.rs
@@ -16,4 +16,4 @@ pub trait Keyed {
     fn key(&self) -> &Self::Key;
 }
 
-//index: AHashMap<T::Key, usize>,
\ No newline at end of file
+//index: AHashMap<T::Key, usize>,
diff --git a/core/server/src/slab/partitions.rs 
b/core/server/src/slab/partitions.rs
index 23b2f966..63f174d0 100644
--- a/core/server/src/slab/partitions.rs
+++ b/core/server/src/slab/partitions.rs
@@ -1,34 +1,27 @@
 use crate::{
-    slab::traits_ext::{
-        Borrow, Components, Delete, EntityComponentSystem, EntityMarker, 
IndexComponents, Insert,
-        IntoComponents,
-    },
+    slab::traits_ext::{Borrow, Delete, EntityComponentSystem, Insert, 
IntoComponents},
     streaming::{
         deduplication::message_deduplicator::MessageDeduplicator,
         partitions::{
             consumer_offset,
-            partition::ConsumerOffset,
             partition2::{self, Partition, PartitionRef},
         },
         segments,
         stats::stats::PartitionStats,
+        topics::consumer_group,
     },
 };
-use ahash::AHashMap;
 use slab::Slab;
 use std::sync::{Arc, atomic::AtomicU64};
 
 // TODO: This could be upper limit of partitions per topic, use that value to 
validate instead of whathever this thing is in `common` crate.
 pub const PARTITIONS_CAPACITY: usize = 16384;
-pub type SlabId = usize;
-
-struct PartitionOffset {
-    offset: u64,
-}
+const SEGMENTS_CAPACITY: usize = 1024;
+pub type ContainerId = usize;
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct Partitions {
-    info: Slab<partition2::PartitionInfo>,
+    root: Slab<partition2::PartitionRoot>,
     stats: Slab<Arc<PartitionStats>>,
     segments: Slab<Vec<segments::Segment2>>,
     message_deduplicator: Slab<Option<MessageDeduplicator>>,
@@ -38,10 +31,63 @@ pub struct Partitions {
     consumer_group_offset: Slab<Arc<papaya::HashMap<usize, 
consumer_offset::ConsumerOffset>>>,
 }
 
+impl Insert for Partitions {
+    type Idx = ContainerId;
+    type Item = Partition;
+
+    fn insert(&mut self, item: Self::Item) -> Self::Idx {
+        let (root, stats, deduplicator, offset, consumer_offset, 
consumer_group_offset) =
+            item.into_components();
+
+        let entity_id = self.root.insert(root);
+        let id = self.stats.insert(stats);
+        assert_eq!(
+            entity_id, id,
+            "partition_insert: id mismatch when inserting stats"
+        );
+        let id = self.segments.insert(Vec::with_capacity(SEGMENTS_CAPACITY));
+        assert_eq!(
+            entity_id, id,
+            "partition_insert: id mismatch when inserting segments"
+        );
+        let id = self.message_deduplicator.insert(deduplicator);
+        assert_eq!(
+            entity_id, id,
+            "partition_insert: id mismatch when inserting message_deduplicator"
+        );
+        let id = self.offset.insert(offset);
+        assert_eq!(
+            entity_id, id,
+            "partition_insert: id mismatch when inserting offset"
+        );
+        let id = self.consumer_offset.insert(consumer_offset);
+        assert_eq!(
+            entity_id, id,
+            "partition_insert: id mismatch when inserting consumer_offset"
+        );
+        let id = self.consumer_group_offset.insert(consumer_group_offset);
+        assert_eq!(
+            entity_id, id,
+            "partition_insert: id mismatch when inserting 
consumer_group_offset"
+        );
+        entity_id
+    }
+}
+
+impl Delete for Partitions {
+    type Idx = ContainerId;
+    type Item = Partition;
+
+    fn delete(&mut self, id: Self::Idx) -> Self::Item {
+        todo!()
+    }
+}
+
+//TODO: those from impls could use a macro aswell.
 impl<'a> From<&'a Partitions> for PartitionRef<'a> {
     fn from(value: &'a Partitions) -> Self {
         PartitionRef::new(
-            &value.info,
+            &value.root,
             &value.stats,
             &value.message_deduplicator,
             &value.offset,
@@ -51,40 +97,8 @@ impl<'a> From<&'a Partitions> for PartitionRef<'a> {
     }
 }
 
-impl Insert<SlabId> for Partitions {
-    type Item = Partition;
-
-    fn insert(&mut self, item: Self::Item) -> SlabId {
-        let (
-            info,
-            stats,
-            message_deduplicator,
-            offset,
-            consumer_offset,
-            consumer_group_offset,
-        ) = item.into_components();
-
-        let id = self.info.insert(info);
-        let info = &mut self.info[id];
-        info.update_id(id);
-        self.stats.insert(stats);
-        self.message_deduplicator.insert(message_deduplicator);
-        self.offset.insert(offset);
-        self.consumer_offset.insert(consumer_offset);
-        self.consumer_group_offset.insert(consumer_group_offset);
-        id
-    }
-}
-
-impl Delete<SlabId> for Partitions {
-    type Item = Partition;
-
-    fn delete(&mut self, id: SlabId) -> Self::Item {
-        todo!()
-    }
-}
-
-impl EntityComponentSystem<SlabId, Borrow> for Partitions {
+impl EntityComponentSystem<Borrow> for Partitions {
+    type Idx = ContainerId;
     type Entity = Partition;
     type EntityRef<'a> = PartitionRef<'a>;
 
@@ -106,7 +120,7 @@ impl EntityComponentSystem<SlabId, Borrow> for Partitions {
 impl Default for Partitions {
     fn default() -> Self {
         Self {
-            info: Slab::with_capacity(PARTITIONS_CAPACITY),
+            root: Slab::with_capacity(PARTITIONS_CAPACITY),
             stats: Slab::with_capacity(PARTITIONS_CAPACITY),
             segments: Slab::with_capacity(PARTITIONS_CAPACITY),
             message_deduplicator: Slab::with_capacity(PARTITIONS_CAPACITY),
@@ -118,8 +132,8 @@ impl Default for Partitions {
 }
 
 impl Partitions {
-    pub fn count(&self) -> usize {
-        self.info.len()
+    pub fn len(&self) -> usize {
+        self.root.len()
     }
 
     pub fn with_stats<T>(&self, f: impl FnOnce(&Slab<Arc<PartitionStats>>) -> 
T) -> T {
@@ -128,8 +142,7 @@ impl Partitions {
     }
 
     pub fn with_stats_mut<T>(&mut self, f: impl FnOnce(&mut 
Slab<Arc<PartitionStats>>) -> T) -> T {
-        let mut stats = &mut self.stats;
-        f(&mut stats)
+        f(&mut self.stats)
     }
 
     pub fn with_segments(&self, partition_id: usize, f: impl 
FnOnce(&Vec<segments::Segment2>)) {
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 56bce63d..15ed08ac 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -2,27 +2,111 @@ use ahash::AHashMap;
 use iggy_common::Identifier;
 use slab::Slab;
 use std::{cell::RefCell, sync::Arc};
+use tokio_rustls::StartHandshake;
 
 use crate::{
-    slab::{Keyed, partitions::Partitions, topics::Topics},
-    streaming::{
-        partitions::partition2, stats::stats::StreamStats, streams::stream2, 
topics::topic2,
+    shard::stats,
+    slab::{
+        Keyed,
+        partitions::Partitions,
+        topics::Topics,
+        traits_ext::{
+            Delete, DeleteCell, EntityComponentSystem, 
EntityComponentSystemMutCell, Insert,
+            InsertCell, InteriorMutability, IntoComponents,
+        },
     },
+    streaming::{stats::stats::StreamStats, streams::stream2, topics::topic2},
 };
 
 const CAPACITY: usize = 1024;
+pub type ContainerId = usize;
 
 pub struct Streams {
-    index: RefCell<AHashMap<<stream2::Stream as Keyed>::Key, usize>>,
-    container: RefCell<Slab<stream2::Stream>>,
+    index: RefCell<AHashMap<<stream2::StreamRoot as Keyed>::Key, ContainerId>>,
+    root: RefCell<Slab<stream2::StreamRoot>>,
     stats: RefCell<Slab<Arc<StreamStats>>>,
 }
 
+impl<'a> From<&'a Streams> for stream2::StreamRef<'a> {
+    fn from(value: &'a Streams) -> Self {
+        let root = value.root.borrow();
+        let stats = value.stats.borrow();
+        stream2::StreamRef::new(root, stats)
+    }
+}
+
+impl<'a> From<&'a Streams> for stream2::StreamRefMut<'a> {
+    fn from(value: &'a Streams) -> Self {
+        let root = value.root.borrow_mut();
+        let stats = value.stats.borrow_mut();
+        stream2::StreamRefMut::new(root, stats)
+    }
+}
+
+impl InsertCell for Streams {
+    type Idx = ContainerId;
+    type Item = stream2::Stream;
+
+    fn insert(&self, item: Self::Item) -> Self::Idx {
+        let (root, stats) = item.into_components();
+        let key = root.key().clone();
+
+        let entity_id = self.root.borrow_mut().insert(root);
+        let id = self.stats.borrow_mut().insert(stats);
+        assert_eq!(
+            entity_id, id,
+            "stream_insert: id mismatch when inserting stats"
+        );
+        self.index.borrow_mut().insert(key, entity_id);
+        entity_id
+    }
+}
+
+impl DeleteCell for Streams {
+    type Idx = ContainerId;
+    type Item = stream2::Stream;
+
+    fn delete(&self, id: Self::Idx) -> Self::Item {
+        todo!()
+    }
+}
+
+impl EntityComponentSystem<InteriorMutability> for Streams {
+    type Idx = ContainerId;
+    type Entity = stream2::Stream;
+    type EntityRef<'a> = stream2::StreamRef<'a>;
+
+    fn with<O, F>(&self, f: F) -> O
+    where
+        F: for<'a> FnOnce(Self::EntityRef<'a>) -> O,
+    {
+        f(self.into())
+    }
+
+    async fn with_async<O, F>(&self, f: F) -> O
+    where
+        F: for<'a> AsyncFnOnce(Self::EntityRef<'a>) -> O,
+    {
+        f(self.into()).await
+    }
+}
+
+impl EntityComponentSystemMutCell for Streams {
+    type EntityRefMut<'a> = stream2::StreamRefMut<'a>;
+
+    fn with_mut<O, F>(&self, f: F) -> O
+    where
+        F: for<'a> FnOnce(Self::EntityRefMut<'a>) -> O,
+    {
+        f(self.into())
+    }
+}
+
 impl Streams {
     pub fn init() -> Self {
         Self {
             index: RefCell::new(AHashMap::with_capacity(CAPACITY)),
-            container: RefCell::new(Slab::with_capacity(CAPACITY)),
+            root: RefCell::new(Slab::with_capacity(CAPACITY)),
             stats: RefCell::new(Slab::with_capacity(CAPACITY)),
         }
     }
@@ -31,7 +115,7 @@ impl Streams {
         match id.kind {
             iggy_common::IdKind::Numeric => {
                 let id = id.get_u32_value().unwrap() as usize;
-                self.container.borrow().contains(id)
+                self.root.borrow().contains(id)
             }
             iggy_common::IdKind::String => {
                 let key = id.get_string_value().unwrap();
@@ -40,7 +124,7 @@ impl Streams {
         }
     }
 
-    fn get_index(&self, id: &Identifier) -> usize {
+    pub fn get_index(&self, id: &Identifier) -> usize {
         match id.kind {
             iggy_common::IdKind::Numeric => id.get_u32_value().unwrap() as 
usize,
             iggy_common::IdKind::String => {
@@ -50,89 +134,79 @@ impl Streams {
         }
     }
 
-    pub fn with_stats<T>(&self, f: impl FnOnce(&Slab<Arc<StreamStats>>) -> T) 
-> T {
-        let stats = self.stats.borrow();
-        f(&stats)
-    }
-
-    pub fn with_stats_by_id<T>(
+    pub fn with_root_by_id<T>(
         &self,
         id: &Identifier,
-        f: impl FnOnce(&Arc<StreamStats>) -> T,
+        f: impl FnOnce(&stream2::StreamRoot) -> T,
     ) -> T {
-        let stream_id = self.with_stream_by_id(id, |stream| stream.id());
-        self.with_stats(|stats| {
-            let stats = &stats[stream_id];
-            f(stats)
-        })
-    }
-
-    pub fn with_stats_mut<T>(&self, f: impl FnOnce(&mut 
Slab<Arc<StreamStats>>) -> T) -> T {
-        let mut stats = self.stats.borrow_mut();
-        f(&mut stats)
-    }
-
-    pub async fn with_async<T>(&self, f: impl 
AsyncFnOnce(&Slab<stream2::Stream>) -> T) -> T {
-        let container = self.container.borrow();
-        f(&container).await
+        let id = self.get_index(id);
+        self.with_by_id(id, |(root, _)| f(&root))
     }
 
-    pub fn with_index<T>(
+    pub async fn with_root_by_id_async<T>(
         &self,
-        f: impl FnOnce(&AHashMap<<stream2::Stream as Keyed>::Key, usize>) -> T,
+        id: &Identifier,
+        f: impl AsyncFnOnce(&stream2::StreamRoot) -> T,
     ) -> T {
-        let index = self.index.borrow();
-        f(&index)
+        let id = self.get_index(id);
+        self.with_by_id_async(id, async |(root, _)| f(&root).await)
+            .await
     }
 
-    pub fn with_index_mut<T>(
+    pub fn with_root_by_id_mut<T>(
         &self,
-        f: impl FnOnce(&mut AHashMap<<stream2::Stream as Keyed>::Key, usize>) 
-> T,
+        id: &Identifier,
+        f: impl FnOnce(&mut stream2::StreamRoot) -> T,
     ) -> T {
-        let mut index = self.index.borrow_mut();
-        f(&mut index)
-    }
-
-    pub fn with<T>(&self, f: impl FnOnce(&Slab<stream2::Stream>) -> T) -> T {
-        let container = self.container.borrow();
-        f(&container)
+        let id = self.get_index(id);
+        self.with_by_id_mut(id, |(mut root, _)| f(&mut root))
     }
 
-    pub fn with_mut<T>(&self, f: impl FnOnce(&mut Slab<stream2::Stream>) -> T) 
-> T {
-        let mut container = self.container.borrow_mut();
-        f(&mut container)
+    pub fn with_stats<T>(&self, f: impl FnOnce(&Slab<Arc<StreamStats>>) -> T) 
-> T {
+        self.with(|components| {
+            let (_, stats) = components.into_components();
+            f(&stats)
+        })
     }
 
-    pub fn with_stream_by_id<T>(
+    pub fn with_stats_by_id<T>(
         &self,
         id: &Identifier,
-        f: impl FnOnce(&stream2::Stream) -> T,
+        f: impl FnOnce(&Arc<StreamStats>) -> T,
     ) -> T {
         let id = self.get_index(id);
-        self.with(|streams| streams[id].invoke(f))
+        self.with_by_id(id, |(_, stats)| {
+            let stats = stats;
+            f(&stats)
+        })
     }
 
-    pub async fn with_stream_by_id_async<T>(
+    pub fn with_stats_mut<T>(&self, f: impl FnOnce(&mut 
Slab<Arc<StreamStats>>) -> T) -> T {
+        self.with_mut(|components| {
+            let (_, mut stats) = components.into_components();
+            f(&mut stats)
+        })
+    }
+
+    pub fn with_index<T>(
         &self,
-        id: &Identifier,
-        f: impl AsyncFnOnce(&stream2::Stream) -> T,
+        f: impl FnOnce(&AHashMap<<stream2::StreamRoot as Keyed>::Key, usize>) 
-> T,
     ) -> T {
-        let id = self.get_index(id);
-        self.with_async(async |streams| streams[id].invoke_async(f).await)
-            .await
+        let index = self.index.borrow();
+        f(&index)
     }
 
-    pub fn with_stream_by_id_mut<T>(
+    pub fn with_index_mut<T>(
         &self,
-        id: &Identifier,
-        f: impl FnOnce(&mut stream2::Stream) -> T,
+        f: impl FnOnce(&mut AHashMap<<stream2::StreamRoot as Keyed>::Key, 
usize>) -> T,
     ) -> T {
-        let id = self.get_index(id);
-        self.with_mut(|streams| streams[id].invoke_mut(f))
+        let mut index = self.index.borrow_mut();
+        f(&mut index)
     }
 
     pub fn with_topics<T>(&self, stream_id: &Identifier, f: impl 
FnOnce(&Topics) -> T) -> T {
-        self.with_stream_by_id(stream_id, |stream| f(stream.topics()))
+        let id = self.get_index(stream_id);
+        self.with_by_id(id, |(root, _)| f(root.topics()))
     }
 
     pub async fn with_topics_async<T>(
@@ -140,7 +214,8 @@ impl Streams {
         stream_id: &Identifier,
         f: impl AsyncFnOnce(&Topics) -> T,
     ) -> T {
-        self.with_stream_by_id_async(stream_id, async |stream| 
f(stream.topics()).await)
+        let id = self.get_index(stream_id);
+        self.with_by_id_async(id, async |(root, _)| f(root.topics()).await)
             .await
     }
 
@@ -149,37 +224,48 @@ impl Streams {
         stream_id: &Identifier,
         f: impl FnOnce(&mut Topics) -> T,
     ) -> T {
-        self.with_stream_by_id_mut(stream_id, |stream| f(stream.topics_mut()))
+        let id = self.get_index(stream_id);
+        self.with_by_id_mut(id, |(mut root, _)| {
+            let topics = root.topics_mut();
+            f(topics)
+        })
     }
 
-    pub fn with_topic_by_id<T>(
+    pub fn with_topic_root_by_id<T>(
         &self,
         stream_id: &Identifier,
         id: &Identifier,
-        f: impl FnOnce(&topic2::Topic) -> T,
+        f: impl FnOnce(&topic2::TopicRoot) -> T,
     ) -> T {
-        self.with_topics(stream_id, |topics| topics.with_topic_by_id(id, f))
+        self.with_topics(stream_id, |topics| {
+            topics.with_root_by_id(id, |root| f(root))
+        })
     }
 
-    pub async fn with_topic_by_id_async<T>(
+    pub async fn with_topic_root_by_id_async<T>(
         &self,
         stream_id: &Identifier,
         id: &Identifier,
-        f: impl AsyncFnOnce(&topic2::Topic) -> T,
+        f: impl AsyncFnOnce(&topic2::TopicRoot) -> T,
     ) -> T {
         self.with_topics_async(stream_id, async |topics| {
-            topics.with_topic_by_id_async(id, f).await
+            let id = topics.get_index(id);
+            topics
+                .with_by_id_async(id, async |(root, _)| f(&root).await)
+                .await
         })
         .await
     }
 
-    pub fn with_topic_by_id_mut<T>(
+    pub fn with_topic_root_by_id_mut<T>(
         &self,
         stream_id: &Identifier,
         topic_id: &Identifier,
-        f: impl FnOnce(&mut topic2::Topic) -> T,
+        f: impl FnOnce(&mut topic2::TopicRoot) -> T,
     ) -> T {
-        self.with_topics_mut(stream_id, |topics| 
topics.with_topic_by_id_mut(topic_id, f))
+        self.with_topics_mut(stream_id, |topics| {
+            topics.with_root_by_id_mut(topic_id, |root| f(root))
+        })
     }
 
     pub fn with_partitions(
@@ -193,6 +279,15 @@ impl Streams {
         });
     }
 
+    pub fn with_partitions_mut<T>(
+        &self,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        f: impl FnOnce(&mut Partitions) -> T,
+    ) -> T {
+        self.with_topics_mut(stream_id, |topics| 
topics.with_partitions_mut(topic_id, f))
+    }
+
     pub async fn with_partitions_async<T>(
         &self,
         stream_id: &Identifier,
@@ -206,6 +301,6 @@ impl Streams {
     }
 
     pub fn len(&self) -> usize {
-        self.container.borrow().len()
+        self.root.borrow().len()
     }
 }
diff --git a/core/server/src/slab/topics.rs b/core/server/src/slab/topics.rs
index 2ccdec88..87eb360d 100644
--- a/core/server/src/slab/topics.rs
+++ b/core/server/src/slab/topics.rs
@@ -4,33 +4,126 @@ use slab::Slab;
 use std::{cell::RefCell, sync::Arc};
 
 use crate::{
-    slab::{Keyed, partitions::Partitions},
-    streaming::{partitions::partition2, stats::stats::TopicStats, 
topics::topic2},
+    slab::{
+        Keyed,
+        partitions::Partitions,
+        traits_ext::{
+            Delete, DeleteCell, EntityComponentSystem, 
EntityComponentSystemMutCell, Insert,
+            InsertCell, InteriorMutability, IntoComponents,
+        },
+    },
+    streaming::{
+        partitions::partition2,
+        stats::stats::TopicStats,
+        topics::topic2::{self, TopicRef},
+    },
 };
 
 const CAPACITY: usize = 1024;
+pub type ContainerId = usize;
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct Topics {
-    index: RefCell<AHashMap<<topic2::Topic as Keyed>::Key, usize>>,
-    container: RefCell<Slab<topic2::Topic>>,
+    index: RefCell<AHashMap<<topic2::TopicRoot as Keyed>::Key, ContainerId>>,
+    root: RefCell<Slab<topic2::TopicRoot>>,
     stats: RefCell<Slab<Arc<TopicStats>>>,
 }
 
-impl Topics {
-    pub fn init() -> Self {
+impl InsertCell for Topics {
+    type Idx = ContainerId;
+    type Item = topic2::Topic;
+
+    fn insert(&self, item: Self::Item) -> Self::Idx {
+        let (root, stats) = item.into_components();
+        let key = root.key().clone();
+
+        let entity_id = self.root.borrow_mut().insert(root);
+        let id = self.stats.borrow_mut().insert(stats);
+        assert_eq!(
+            entity_id, id,
+            "topic_insert: id mismatch when inserting stats"
+        );
+        self.index.borrow_mut().insert(key, entity_id);
+        entity_id
+    }
+}
+
+impl DeleteCell for Topics {
+    type Idx = ContainerId;
+    type Item = topic2::Topic;
+
+    fn delete(&self, id: Self::Idx) -> Self::Item {
+        todo!()
+    }
+}
+
+//TODO: those from impls could use a macro aswell.
+impl<'a> From<&'a Topics> for topic2::TopicRef<'a> {
+    fn from(value: &'a Topics) -> Self {
+        let root = value.root.borrow();
+        let stats = value.stats.borrow();
+        topic2::TopicRef::new(root, stats)
+    }
+}
+impl Default for Topics {
+    fn default() -> Self {
         Self {
             index: RefCell::new(AHashMap::with_capacity(CAPACITY)),
-            container: RefCell::new(Slab::with_capacity(CAPACITY)),
+            root: RefCell::new(Slab::with_capacity(CAPACITY)),
             stats: RefCell::new(Slab::with_capacity(CAPACITY)),
         }
     }
+}
+
+impl<'a> From<&'a Topics> for topic2::TopicRefMut<'a> {
+    fn from(value: &'a Topics) -> Self {
+        let root = value.root.borrow_mut();
+        let stats = value.stats.borrow_mut();
+        topic2::TopicRefMut::new(root, stats)
+    }
+}
+
+impl EntityComponentSystem<InteriorMutability> for Topics {
+    type Idx = ContainerId;
+    type Entity = topic2::Topic;
+    type EntityRef<'a> = topic2::TopicRef<'a>;
+
+    fn with<O, F>(&self, f: F) -> O
+    where
+        F: for<'a> FnOnce(Self::EntityRef<'a>) -> O,
+    {
+        f(self.into())
+    }
+
+    async fn with_async<O, F>(&self, f: F) -> O
+    where
+        F: for<'a> AsyncFnOnce(Self::EntityRef<'a>) -> O,
+    {
+        f(self.into()).await
+    }
+}
+
+impl EntityComponentSystemMutCell for Topics {
+    type EntityRefMut<'a> = topic2::TopicRefMut<'a>;
+
+    fn with_mut<O, F>(&self, f: F) -> O
+    where
+        F: for<'a> FnOnce(Self::EntityRefMut<'a>) -> O,
+    {
+        f(self.into())
+    }
+}
+
+impl Topics {
+    pub fn len(&self) -> usize {
+        self.root.borrow().len()
+    }
 
     pub fn exists(&self, id: &Identifier) -> bool {
         match id.kind {
             iggy_common::IdKind::Numeric => {
                 let id = id.get_u32_value().unwrap() as usize;
-                self.container.borrow().contains(id)
+                self.root.borrow().contains(id)
             }
             iggy_common::IdKind::String => {
                 let key = id.get_string_value().unwrap();
@@ -39,83 +132,77 @@ impl Topics {
         }
     }
 
-    fn get_index(&self, id: &Identifier) -> usize {
+    pub fn get_index(&self, id: &Identifier) -> usize {
         match id.kind {
             iggy_common::IdKind::Numeric => id.get_u32_value().unwrap() as 
usize,
             iggy_common::IdKind::String => {
                 let key = id.get_string_value().unwrap();
-                tracing::error!("Getting index for topic: {key}, index: {:?}", 
self.index.borrow());
                 *self.index.borrow().get(&key).expect("Topic not found")
             }
         }
     }
 
-    pub fn len(&self) -> usize {
-        self.container.borrow().len()
-    }
-
-    pub async fn with_async<T>(&self, f: impl 
AsyncFnOnce(&Slab<topic2::Topic>) -> T) -> T {
-        let container = self.container.borrow();
-        f(&container).await
-    }
-
-    pub fn with<T>(&self, f: impl FnOnce(&Slab<topic2::Topic>) -> T) -> T {
-        let container = self.container.borrow();
-        f(&container)
-    }
-
-    pub fn with_mut<T>(&self, f: impl FnOnce(&mut Slab<topic2::Topic>) -> T) 
-> T {
-        let mut container = self.container.borrow_mut();
-        f(&mut container)
+    pub fn with_index<T>(
+        &self,
+        f: impl FnOnce(&AHashMap<<topic2::TopicRoot as Keyed>::Key, usize>) -> 
T,
+    ) -> T {
+        let index = self.index.borrow();
+        f(&index)
     }
 
-    pub fn with_mut_index<T>(
+    pub fn with_index_mut<T>(
         &self,
-        f: impl FnOnce(&mut AHashMap<<topic2::Topic as Keyed>::Key, usize>) -> 
T,
+        f: impl FnOnce(&mut AHashMap<<topic2::TopicRoot as Keyed>::Key, 
usize>) -> T,
     ) -> T {
         let mut index = self.index.borrow_mut();
         f(&mut index)
     }
 
-    pub async fn with_topic_by_id_async<T>(
+    pub fn with_root_by_id<T>(
         &self,
         id: &Identifier,
-        f: impl AsyncFnOnce(&topic2::Topic) -> T,
+        f: impl FnOnce(&topic2::TopicRoot) -> T,
     ) -> T {
         let id = self.get_index(id);
-        self.with_async(async |topics| topics[id].invoke_async(f).await)
-            .await
+        self.with_by_id(id, |(root, _)| f(&root))
     }
 
-    pub fn with_topic_stats_by_id<T>(
+    pub async fn with_root_by_id_async<T>(
         &self,
         id: &Identifier,
-        f: impl FnOnce(Arc<TopicStats>) -> T,
+        f: impl AsyncFnOnce(&topic2::TopicRoot) -> T,
     ) -> T {
-        let topic_id = self.with_topic_by_id(id, |topic| topic.id());
-        self.with_stats(|stats| f(stats[topic_id].clone()))
-    }
-
-    pub fn with_topic_by_id<T>(&self, id: &Identifier, f: impl 
FnOnce(&topic2::Topic) -> T) -> T {
         let id = self.get_index(id);
-        self.with(|topics| topics[id].invoke(f))
+        self.with_by_id_async(id, async |(root, _)| f(&root).await)
+            .await
     }
 
-    pub fn with_topic_by_id_mut<T>(
+    pub fn with_root_by_id_mut<T>(
         &self,
         id: &Identifier,
-        f: impl FnOnce(&mut topic2::Topic) -> T,
+        f: impl FnOnce(&mut topic2::TopicRoot) -> T,
     ) -> T {
         let id = self.get_index(id);
-        self.with_mut(|topics| topics[id].invoke_mut(f))
+        self.with_by_id_mut(id, |(mut root, _)| f(&mut root))
     }
 
-    pub fn with_partitions(&self, topic_id: &Identifier, f: impl 
FnOnce(&Partitions)) {
-        self.with_topic_by_id(topic_id, |topic| f(topic.partitions()));
+    pub fn with_stats<T>(&self, f: impl FnOnce(&Slab<Arc<TopicStats>>) -> T) 
-> T {
+        self.with(|components| {
+            let (_, stats) = components.into_components();
+            f(&stats)
+        })
     }
 
-    pub fn with_partitions_mut(&self, topic_id: &Identifier, f: impl 
FnOnce(&mut Partitions)) {
-        self.with_topic_by_id_mut(topic_id, |topic| f(topic.partitions_mut()));
+    pub fn with_stats_mut<T>(&self, f: impl FnOnce(&mut Slab<Arc<TopicStats>>) 
-> T) -> T {
+        self.with_mut(|components| {
+            let (_, mut stats) = components.into_components();
+            f(&mut stats)
+        })
+    }
+
+    pub fn with_stats_by_id<T>(&self, id: &Identifier, f: impl 
FnOnce(&Arc<TopicStats>) -> T) -> T {
+        let id = self.get_index(id);
+        self.with_by_id(id, |(_, stats)| f(&stats))
     }
 
     pub async fn with_partitions_async<T>(
@@ -123,17 +210,31 @@ impl Topics {
         topic_id: &Identifier,
         f: impl AsyncFnOnce(&Partitions) -> T,
     ) -> T {
-        self.with_topic_by_id_async(topic_id, async |topic| 
f(topic.partitions()).await)
-            .await
+        let id = self.get_index(topic_id);
+        self.with_by_id_async(id, async |(root, _)| {
+            let partitions = root.partitions();
+            f(partitions).await
+        })
+        .await
     }
 
-    pub fn with_stats<T>(&self, f: impl FnOnce(&Slab<Arc<TopicStats>>) -> T) 
-> T {
-        let stats = self.stats.borrow();
-        f(&stats)
+    pub fn with_partitions(&self, topic_id: &Identifier, f: impl 
FnOnce(&Partitions)) {
+        let id = self.get_index(topic_id);
+        self.with_by_id(id, |(root, _)| {
+            let partitions = root.partitions();
+            f(partitions)
+        })
     }
 
-    pub fn with_stats_mut<T>(&self, f: impl FnOnce(&mut Slab<Arc<TopicStats>>) 
-> T) -> T {
-        let mut stats = self.stats.borrow_mut();
-        f(&mut stats)
+    pub fn with_partitions_mut<T>(
+        &self,
+        topic_id: &Identifier,
+        f: impl FnOnce(&mut Partitions) -> T,
+    ) -> T {
+        let id = self.get_index(topic_id);
+        self.with_by_id_mut(id, |(mut root, _)| {
+            let partitions = root.partitions_mut();
+            f(partitions)
+        })
     }
 }
diff --git a/core/server/src/slab/traits_ext.rs 
b/core/server/src/slab/traits_ext.rs
index 9abee696..4b860f05 100644
--- a/core/server/src/slab/traits_ext.rs
+++ b/core/server/src/slab/traits_ext.rs
@@ -3,31 +3,51 @@ pub trait IntoComponents {
     fn into_components(self) -> Self::Components;
 }
 
-// Marker trait for the entity type.
-pub trait EntityMarker {}
+/// Marker trait for the `Entity`.
+pub trait EntityMarker {
+    type Idx;
+    fn id(&self) -> Self::Idx;
+    fn update_id(&mut self, id: Self::Idx);
+}
+
+/// Insert trait for inserting an `Entity`` into container.
+pub trait Insert {
+    type Idx;
+    type Item: IntoComponents + EntityMarker;
+    fn insert(&mut self, item: Self::Item) -> Self::Idx;
+}
 
-pub trait Insert<Idx> {
+pub trait InsertCell {
+    type Idx;
     type Item: IntoComponents + EntityMarker;
-    fn insert(&mut self, item: Self::Item) -> Idx;
+    fn insert(&self, item: Self::Item) -> Self::Idx;
 }
 
-pub trait Delete<Idx> {
+/// Delete trait for deleting an `Entity` from container.
+pub trait Delete {
+    type Idx;
     type Item: IntoComponents + EntityMarker;
-    fn delete(&mut self, id: Idx) -> Self::Item;
+    fn delete(&mut self, id: Self::Idx) -> Self::Item;
 }
 
-pub trait DeleteCell<Idx> {
+/// Delete trait for deleting an `Entity` from container for container types 
that use interior mutability.
+pub trait DeleteCell {
+    type Idx;
     type Item: IntoComponents + EntityMarker;
-    fn delete(&self, id: Idx) -> Self::Item;
+    fn delete(&self, id: Self::Idx) -> Self::Item;
 }
 
-pub trait IndexComponents<Idx: ?Sized> {
+/// Trait for getting components by EntityId.
+pub trait IntoComponentsById {
+    type Idx;
     type Output;
-    fn index(&self, index: Idx) -> Self::Output;
+    fn into_components_by_id(self, index: Self::Idx) -> Self::Output;
 }
 
+/// Marker type for borrow component containers.
 pub struct Borrow;
-pub struct RefCell;
+/// Marker type for component containers that use interior mutability.
+pub struct InteriorMutability;
 
 mod private {
     pub trait Sealed {}
@@ -40,39 +60,37 @@ pub trait ComponentsMapping<T>: private::Sealed {
 }
 
 pub trait ComponentsByIdMapping<T>: private::Sealed {
-    // TODO: We will need this to contrain the `EntityRef` and `EntityRefMut` 
types, so after decomposing they have proper mapping.
-    // Similar mechanism to trait from above, but for (T1, T2) -> (&T1, &T2) 
mapping rather than (T1, T2) -> (&Slab<T1>, &Slab<T2>).
     type Ref<'a>;
     type RefMut<'a>;
 }
 
-macro_rules! impl_components_for_slab_as_refs {
+macro_rules! impl_components_mapping_for_slab {
     ($T:ident) => {
         impl<$T> private::Sealed for ($T,) {}
 
         impl<$T> ComponentsMapping<Borrow> for ($T,)
-        where for<'a> $T: 'a
+            where for<'a> $T :'a
         {
             type Ref<'a> = (&'a ::slab::Slab<$T>,);
             type RefMut<'a> = (&'a mut ::slab::Slab<$T>,);
         }
 
-        impl<$T> ComponentsMapping<RefCell> for ($T,)
-        where for<'a> $T: 'a
+        impl<$T> ComponentsMapping<InteriorMutability> for ($T,)
+            where for<'a> $T :'a
         {
             type Ref<'a> = (::std::cell::Ref<'a, ::slab::Slab<$T>>,);
             type RefMut<'a> = (::std::cell::RefMut<'a, ::slab::Slab<$T>>,);
         }
 
         impl<$T> ComponentsByIdMapping<Borrow> for ($T,)
-        where for<'a> $T: 'a
+            where for<'a> $T :'a
         {
             type Ref<'a> = (&'a $T,);
             type RefMut<'a> = (&'a mut $T,);
         }
 
-        impl<$T> ComponentsByIdMapping<RefCell> for ($T,)
-        where for<'a> $T: 'a
+        impl<$T> ComponentsByIdMapping<InteriorMutability> for ($T,)
+            where for<'a> $T :'a
         {
             type Ref<'a> = (::std::cell::Ref<'a, $T>,);
             type RefMut<'a> = (::std::cell::RefMut<'a, $T>,);
@@ -83,44 +101,44 @@ macro_rules! impl_components_for_slab_as_refs {
         impl<$T, $($rest),+> private::Sealed for ($T, $($rest),+) {}
 
         impl<$T, $($rest),+> ComponentsMapping<Borrow> for ($T, $($rest),+)
-        where
-            for<'a> $T: 'a,
-            $(for<'a> $rest: 'a),+
+            where
+                for<'a> $T :'a,
+                $(for<'a> $rest: 'a),+
         {
             type Ref<'a> = (&'a ::slab::Slab<$T>, $(&'a 
::slab::Slab<$rest>),+);
             type RefMut<'a> = (&'a mut ::slab::Slab<$T>, $(&'a mut 
::slab::Slab<$rest>),+);
         }
 
-        impl<$T, $($rest),+> ComponentsMapping<RefCell> for ($T, $($rest),+)
-        where
-            for<'a> $T: 'a,
-            $(for<'a> $rest: 'a),+
+        impl<$T, $($rest),+> ComponentsMapping<InteriorMutability> for ($T, 
$($rest),+)
+            where
+                for<'a> $T :'a,
+                $(for<'a> $rest: 'a),+
         {
             type Ref<'a> = (std::cell::Ref<'a, ::slab::Slab<$T>>, 
$(::std::cell::Ref<'a, ::slab::Slab<$rest>>),+);
             type RefMut<'a> = (std::cell::RefMut<'a, ::slab::Slab<$T>>, 
$(::std::cell::RefMut<'a, ::slab::Slab<$rest>>),+);
         }
 
         impl<$T, $($rest),+> ComponentsByIdMapping<Borrow> for ($T, $($rest),+)
-        where
-            for<'a> $T: 'a,
-            $(for<'a> $rest: 'a),+
+            where
+                for<'a> $T :'a,
+                $(for<'a> $rest: 'a),+
         {
             type Ref<'a> = (&'a $T, $(&'a $rest),+);
             type RefMut<'a> = (&'a mut $T, $(&'a mut $rest),+);
         }
 
-        impl<$T, $($rest),+> ComponentsByIdMapping<RefCell> for ($T, 
$($rest),+)
-        where
-            for<'a> $T: 'a,
-            $(for<'a> $rest: 'a),+
+        impl<$T, $($rest),+> ComponentsByIdMapping<InteriorMutability> for 
($T, $($rest),+)
+            where
+                for<'a> $T :'a,
+                $(for<'a> $rest: 'a),+
         {
             type Ref<'a> = (std::cell::Ref<'a, $T>, $(::std::cell::Ref<'a, 
$rest>),+);
             type RefMut<'a> = (std::cell::RefMut<'a, $T>, 
$(::std::cell::RefMut<'a, $rest>),+);
         }
-        impl_components_for_slab_as_refs!($($rest),+);
+        impl_components_mapping_for_slab!($($rest),+);
     };
 }
-impl_components_for_slab_as_refs!(T1, T2, T3, T4, T5, T6, T7, T8);
+impl_components_mapping_for_slab!(T1, T2, T3, T4, T5, T6, T7, T8);
 
 type Mapping<'a, E, T> = <<E as IntoComponents>::Components as 
ComponentsMapping<T>>::Ref<'a>;
 type MappingMut<'a, E, T> = <<E as IntoComponents>::Components as 
ComponentsMapping<T>>::RefMut<'a>;
@@ -137,17 +155,17 @@ type MappingByIdMut<'a, E, T> =
 // So we lack the ability to immediately discard unnecessary components, which 
leads to less ergonomic API.
 // Damn tradeoffs.
 pub type Components<T> = <T as IntoComponents>::Components;
-pub type ComponentsById<Idx, T> = <T as IndexComponents<Idx>>::Output;
+pub type ComponentsById<'a, T> = <T as IntoComponentsById>::Output;
 
-pub trait EntityComponentSystem<Idx, T>
+pub trait EntityComponentSystem<T>
 where
     <Self::Entity as IntoComponents>::Components: ComponentsMapping<T> + 
ComponentsByIdMapping<T>,
 {
+    type Idx;
     type Entity: IntoComponents + EntityMarker;
     type EntityRef<'a>: IntoComponents<Components = Mapping<'a, Self::Entity, 
T>>
-        + IndexComponents<Idx, Output = MappingById<'a, Self::Entity, T>>
-    where
-        Self: 'a;
+        + IntoComponentsById<Idx = Self::Idx, Output = MappingById<'a, 
Self::Entity, T>>;
+
     fn with<O, F>(&self, f: F) -> O
     where
         F: for<'a> FnOnce(Self::EntityRef<'a>) -> O;
@@ -156,53 +174,52 @@ where
     where
         F: for<'a> AsyncFnOnce(Self::EntityRef<'a>) -> O;
 
-    fn with_by_id<O, F>(&self, id: Idx, f: F) -> O
+    fn with_by_id<O, F>(&self, id: Self::Idx, f: F) -> O
     where
-        F: for<'a> FnOnce(ComponentsById<Idx, Self::EntityRef<'a>>) -> O,
+        F: for<'a> FnOnce(ComponentsById<'a, Self::EntityRef<'a>>) -> O,
     {
-        self.with(|components| f(components.index(id)))
+        self.with(|components| f(components.into_components_by_id(id)))
     }
 
-    fn with_by_id_async<O, F>(&self, id: Idx, f: F) -> impl Future<Output = O>
+    fn with_by_id_async<O, F>(&self, id: Self::Idx, f: F) -> impl 
Future<Output = O>
     where
-        F: for<'a> AsyncFnOnce(ComponentsById<Idx, Self::EntityRef<'a>>) -> O,
+        F: for<'a> AsyncFnOnce(ComponentsById<'a, Self::EntityRef<'a>>) -> O,
     {
-        self.with_async(async |components| f(components.index(id)).await)
+        self.with_async(async |components| 
f(components.into_components_by_id(id)).await)
     }
 }
 
-pub trait EntityComponentSystemMut<Idx>: EntityComponentSystem<Idx, Borrow> {
+pub trait EntityComponentSystemMut: EntityComponentSystem<Borrow> {
     type EntityRefMut<'a>: IntoComponents<Components = MappingMut<'a, 
Self::Entity, Borrow>>
-        + IndexComponents<Idx, Output = MappingByIdMut<'a, Self::Entity, 
Borrow>>
-    where
-        Self: 'a;
+        + IntoComponentsById<Idx = Self::Idx, Output = MappingByIdMut<'a, 
Self::Entity, Borrow>>;
 
     fn with_mut<O, F>(&mut self, f: F) -> O
     where
         F: for<'a> FnOnce(Self::EntityRefMut<'a>) -> O;
 
-    fn with_by_id_mut<O, F>(&mut self, id: Idx, f: F) -> O
+    fn with_by_id_mut<O, F>(&mut self, id: Self::Idx, f: F) -> O
     where
-        F: for<'a> FnOnce(ComponentsById<Idx, Self::EntityRefMut<'a>>) -> O,
+        F: for<'a> FnOnce(ComponentsById<'a, Self::EntityRefMut<'a>>) -> O,
     {
-        self.with_mut(|components| f(components.index(id)))
+        self.with_mut(|components| f(components.into_components_by_id(id)))
     }
 }
 
-pub trait EntityComponentSystemMutCell<Idx>: EntityComponentSystem<Idx, 
RefCell> {
-    type EntityRefMut<'a>: IntoComponents<Components = MappingMut<'a, 
Self::Entity, RefCell>>
-        + IndexComponents<Idx, Output = MappingByIdMut<'a, Self::Entity, 
RefCell>>
-    where
-        Self: 'a;
+pub trait EntityComponentSystemMutCell: 
EntityComponentSystem<InteriorMutability> {
+    type EntityRefMut<'a>: IntoComponents<Components = MappingMut<'a, 
Self::Entity, InteriorMutability>>
+        + IntoComponentsById<
+            Idx = Self::Idx,
+            Output = MappingByIdMut<'a, Self::Entity, InteriorMutability>,
+        >;
 
     fn with_mut<O, F>(&self, f: F) -> O
     where
         F: for<'a> FnOnce(Self::EntityRefMut<'a>) -> O;
 
-    fn with_by_id_mut<O, F>(&self, id: Idx, f: F) -> O
+    fn with_by_id_mut<O, F>(&self, id: Self::Idx, f: F) -> O
     where
-        F: for<'a> FnOnce(ComponentsById<Idx, Self::EntityRefMut<'a>>) -> O,
+        F: for<'a> FnOnce(ComponentsById<'a, Self::EntityRefMut<'a>>) -> O,
     {
-        self.with_mut(|components| f(components.index(id)))
+        self.with_mut(|components| f(components.into_components_by_id(id)))
     }
 }
diff --git a/core/server/src/streaming/partitions/partition2.rs 
b/core/server/src/streaming/partitions/partition2.rs
index 3c5cab3a..c5633048 100644
--- a/core/server/src/streaming/partitions/partition2.rs
+++ b/core/server/src/streaming/partitions/partition2.rs
@@ -1,7 +1,7 @@
 use crate::{
     slab::{
-        partitions::{Partitions, SlabId},
-        traits_ext::{EntityMarker, IndexComponents, IntoComponents},
+        partitions::{self, Partitions},
+        traits_ext::{EntityMarker, IntoComponents, IntoComponentsById},
     },
     streaming::{
         deduplication::message_deduplicator::{self, MessageDeduplicator},
@@ -15,7 +15,7 @@ use std::sync::{Arc, atomic::AtomicU64};
 
 #[derive(Debug)]
 pub struct Partition {
-    info: PartitionInfo,
+    root: PartitionRoot,
     stats: Arc<PartitionStats>,
     message_deduplicator: Option<MessageDeduplicator>,
     offset: Arc<AtomicU64>,
@@ -25,7 +25,7 @@ pub struct Partition {
 
 impl Partition {
     pub fn new(
-        info: PartitionInfo,
+        root: PartitionRoot,
         stats: Arc<PartitionStats>,
         message_deduplicator: Option<MessageDeduplicator>,
         offset: Arc<AtomicU64>,
@@ -33,7 +33,7 @@ impl Partition {
         consumer_group_offset: Arc<papaya::HashMap<usize, 
consumer_offset::ConsumerOffset>>,
     ) -> Self {
         Self {
-            info,
+            root,
             stats,
             message_deduplicator,
             offset,
@@ -41,20 +41,12 @@ impl Partition {
             consumer_group_offset,
         }
     }
-
-    pub fn update_id(&mut self, id: usize) {
-        self.info.id = id;
-    }
-
-    pub fn id(&self) -> usize {
-        self.info.id
-    }
 }
 
 impl Clone for Partition {
     fn clone(&self) -> Self {
         Self {
-            info: self.info.clone(),
+            root: self.root.clone(),
             stats: Arc::clone(&self.stats),
             message_deduplicator: self.message_deduplicator.clone(),
             offset: Arc::clone(&self.offset),
@@ -64,11 +56,21 @@ impl Clone for Partition {
     }
 }
 
-impl EntityMarker for Partition {}
+impl EntityMarker for Partition {
+    type Idx = partitions::ContainerId;
+
+    fn id(&self) -> Self::Idx {
+        self.root.id
+    }
+
+    fn update_id(&mut self, id: Self::Idx) {
+        self.root.id = id;
+    }
+}
 
 impl IntoComponents for Partition {
     type Components = (
-        PartitionInfo,
+        PartitionRoot,
         Arc<PartitionStats>,
         Option<MessageDeduplicator>,
         Arc<AtomicU64>,
@@ -78,7 +80,7 @@ impl IntoComponents for Partition {
 
     fn into_components(self) -> Self::Components {
         (
-            self.info,
+            self.root,
             self.stats,
             self.message_deduplicator,
             self.offset,
@@ -89,13 +91,13 @@ impl IntoComponents for Partition {
 }
 
 #[derive(Default, Debug, Clone)]
-pub struct PartitionInfo {
+pub struct PartitionRoot {
     id: usize,
     created_at: IggyTimestamp,
     should_increment_offset: bool,
 }
 
-impl PartitionInfo {
+impl PartitionRoot {
     pub fn new(created_at: IggyTimestamp, should_increment_offset: bool) -> 
Self {
         Self {
             id: 0,
@@ -113,9 +115,9 @@ impl PartitionInfo {
     }
 }
 
-// TODO: Probably move this to the `slab` module
+// TODO: Create a macro to impl those PartitionRef/PartitionRefMut structs and 
it's traits.
 pub struct PartitionRef<'a> {
-    info: &'a Slab<PartitionInfo>,
+    root: &'a Slab<PartitionRoot>,
     stats: &'a Slab<Arc<PartitionStats>>,
     message_deduplicator: &'a Slab<Option<MessageDeduplicator>>,
     offset: &'a Slab<Arc<AtomicU64>>,
@@ -125,7 +127,7 @@ pub struct PartitionRef<'a> {
 
 impl<'a> PartitionRef<'a> {
     pub fn new(
-        info: &'a Slab<PartitionInfo>,
+        root: &'a Slab<PartitionRoot>,
         stats: &'a Slab<Arc<PartitionStats>>,
         message_deduplicator: &'a Slab<Option<MessageDeduplicator>>,
         offset: &'a Slab<Arc<AtomicU64>>,
@@ -135,7 +137,7 @@ impl<'a> PartitionRef<'a> {
         >,
     ) -> Self {
         Self {
-            info,
+            root,
             stats,
             message_deduplicator,
             offset,
@@ -147,7 +149,7 @@ impl<'a> PartitionRef<'a> {
 
 impl<'a> IntoComponents for PartitionRef<'a> {
     type Components = (
-        &'a Slab<PartitionInfo>,
+        &'a Slab<PartitionRoot>,
         &'a Slab<Arc<PartitionStats>>,
         &'a Slab<Option<MessageDeduplicator>>,
         &'a Slab<Arc<AtomicU64>>,
@@ -157,7 +159,7 @@ impl<'a> IntoComponents for PartitionRef<'a> {
 
     fn into_components(self) -> Self::Components {
         (
-            self.info,
+            self.root,
             self.stats,
             self.message_deduplicator,
             self.offset,
@@ -167,9 +169,10 @@ impl<'a> IntoComponents for PartitionRef<'a> {
     }
 }
 
-impl<'a> IndexComponents<SlabId> for PartitionRef<'a> {
+impl<'a> IntoComponentsById for PartitionRef<'a> {
+    type Idx = partitions::ContainerId;
     type Output = (
-        &'a PartitionInfo,
+        &'a PartitionRoot,
         &'a Arc<PartitionStats>,
         &'a Option<MessageDeduplicator>,
         &'a Arc<AtomicU64>,
@@ -177,9 +180,9 @@ impl<'a> IndexComponents<SlabId> for PartitionRef<'a> {
         &'a Arc<papaya::HashMap<usize, consumer_offset::ConsumerOffset>>,
     );
 
-    fn index(&self, index: SlabId) -> Self::Output {
+    fn into_components_by_id(self, index: Self::Idx) -> Self::Output {
         (
-            &self.info[index],
+            &self.root[index],
             &self.stats[index],
             &self.message_deduplicator[index],
             &self.offset[index],
diff --git a/core/server/src/streaming/segments/segment2.rs 
b/core/server/src/streaming/segments/segment2.rs
index 9d2f6e70..237c9a3a 100644
--- a/core/server/src/streaming/segments/segment2.rs
+++ b/core/server/src/streaming/segments/segment2.rs
@@ -1,4 +1,4 @@
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct Segment2 {
     pub id: usize,
 }
diff --git a/core/server/src/streaming/streams/stream2.rs 
b/core/server/src/streaming/streams/stream2.rs
index 6e4eaafb..309e4b95 100644
--- a/core/server/src/streaming/streams/stream2.rs
+++ b/core/server/src/streaming/streams/stream2.rs
@@ -1,29 +1,28 @@
-use bytes::{BufMut, BytesMut};
 use iggy_common::IggyTimestamp;
 use slab::Slab;
-
-use crate::slab::{Keyed, topics::Topics};
-
-#[derive(Debug)]
-pub struct Stream {
+use std::{
+    cell::{Ref, RefMut},
+    sync::Arc,
+};
+
+use crate::{
+    slab::{
+        Keyed, streams,
+        topics::Topics,
+        traits_ext::{EntityMarker, IntoComponents, IntoComponentsById},
+    },
+    streaming::stats::stats::StreamStats,
+};
+
+#[derive(Debug, Clone)]
+pub struct StreamRoot {
     id: usize,
     name: String,
     created_at: IggyTimestamp,
     topics: Topics,
 }
 
-impl Default for Stream {
-    fn default() -> Self {
-        Self {
-            id: 0,
-            name: String::new(),
-            created_at: IggyTimestamp::now(),
-            topics: Topics::init(),
-        }
-    }
-}
-
-impl Keyed for Stream {
+impl Keyed for StreamRoot {
     type Key = String;
 
     fn key(&self) -> &Self::Key {
@@ -31,29 +30,16 @@ impl Keyed for Stream {
     }
 }
 
-impl Stream {
-    pub fn new(name: String) -> Self {
-        let now = IggyTimestamp::now();
+impl StreamRoot {
+    pub fn new(name: String, created_at: IggyTimestamp) -> Self {
         Self {
             id: 0,
             name,
-            created_at: now,
-            topics: Topics::init(),
+            created_at,
+            topics: Topics::default(),
         }
     }
 
-    pub fn invoke<T>(&self, f: impl FnOnce(&Self) -> T) -> T {
-        f(self)
-    }
-
-    pub fn invoke_mut<T>(&mut self, f: impl FnOnce(&mut Self) -> T) -> T {
-        f(self)
-    }
-
-    pub async fn invoke_async<T>(&self, f: impl AsyncFnOnce(&Self) -> T) -> T {
-        f(self).await
-    }
-
     pub fn id(&self) -> usize {
         self.id
     }
@@ -70,13 +56,6 @@ impl Stream {
         self.topics.len()
     }
 
-    pub fn insert_into(self, container: &mut Slab<Self>) -> usize {
-        let idx = container.insert(self);
-        let stream = &mut container[idx];
-        stream.id = idx;
-        idx
-    }
-
     pub fn topics(&self) -> &Topics {
         &self.topics
     }
@@ -85,3 +64,110 @@ impl Stream {
         &mut self.topics
     }
 }
+
+#[derive(Debug, Clone)]
+pub struct Stream {
+    root: StreamRoot,
+    stats: Arc<StreamStats>,
+}
+
+impl IntoComponents for Stream {
+    type Components = (StreamRoot, Arc<StreamStats>);
+
+    fn into_components(self) -> Self::Components {
+        (self.root, self.stats)
+    }
+}
+
+impl EntityMarker for Stream {
+    type Idx = streams::ContainerId;
+
+    fn id(&self) -> Self::Idx {
+        self.root.id
+    }
+
+    fn update_id(&mut self, id: Self::Idx) {
+        self.root.id = id;
+    }
+}
+
+impl Stream {
+    pub fn new(name: String, stats: Arc<StreamStats>, created_at: 
IggyTimestamp) -> Self {
+        let root = StreamRoot::new(name, created_at);
+        Self { root, stats }
+    }
+
+    pub fn stats(&self) -> &Arc<StreamStats> {
+        &self.stats
+    }
+
+    pub fn root(&self) -> &StreamRoot {
+        &self.root
+    }
+}
+
+pub struct StreamRef<'a> {
+    root: Ref<'a, Slab<StreamRoot>>,
+    stats: Ref<'a, Slab<Arc<StreamStats>>>,
+}
+
+impl<'a> StreamRef<'a> {
+    pub fn new(root: Ref<'a, Slab<StreamRoot>>, stats: Ref<'a, 
Slab<Arc<StreamStats>>>) -> Self {
+        Self { root, stats }
+    }
+}
+
+impl<'a> IntoComponents for StreamRef<'a> {
+    type Components = (Ref<'a, Slab<StreamRoot>>, Ref<'a, 
Slab<Arc<StreamStats>>>);
+
+    fn into_components(self) -> Self::Components {
+        (self.root, self.stats)
+    }
+}
+
+impl<'a> IntoComponentsById for StreamRef<'a> {
+    type Idx = streams::ContainerId;
+    type Output = (Ref<'a, StreamRoot>, Ref<'a, Arc<StreamStats>>);
+
+    fn into_components_by_id(self, index: Self::Idx) -> Self::Output {
+        let root = Ref::map(self.root, |r| &r[index]);
+        let stats = Ref::map(self.stats, |s| &s[index]);
+        (root, stats)
+    }
+}
+
+pub struct StreamRefMut<'a> {
+    root: RefMut<'a, Slab<StreamRoot>>,
+    stats: RefMut<'a, Slab<Arc<StreamStats>>>,
+}
+
+impl<'a> StreamRefMut<'a> {
+    pub fn new(
+        root: RefMut<'a, Slab<StreamRoot>>,
+        stats: RefMut<'a, Slab<Arc<StreamStats>>>,
+    ) -> Self {
+        Self { root, stats }
+    }
+}
+
+impl<'a> IntoComponents for StreamRefMut<'a> {
+    type Components = (
+        RefMut<'a, Slab<StreamRoot>>,
+        RefMut<'a, Slab<Arc<StreamStats>>>,
+    );
+
+    fn into_components(self) -> Self::Components {
+        (self.root, self.stats)
+    }
+}
+
+impl<'a> IntoComponentsById for StreamRefMut<'a> {
+    type Idx = streams::ContainerId;
+    type Output = (RefMut<'a, StreamRoot>, RefMut<'a, Arc<StreamStats>>);
+
+    fn into_components_by_id(self, index: Self::Idx) -> Self::Output {
+        let root = RefMut::map(self.root, |r| &mut r[index]);
+        let stats = RefMut::map(self.stats, |s| &mut s[index]);
+        (root, stats)
+    }
+}
diff --git a/core/server/src/streaming/topics/consumer_group2.rs 
b/core/server/src/streaming/topics/consumer_group2.rs
index f1da2f85..d3785052 100644
--- a/core/server/src/streaming/topics/consumer_group2.rs
+++ b/core/server/src/streaming/topics/consumer_group2.rs
@@ -17,7 +17,7 @@ use tracing::trace;
 
 pub const MEMBERS_CAPACITY: usize = 128;
 
-#[derive(Default, Debug)]
+#[derive(Default, Debug, Clone)]
 pub struct ConsumerGroup {
     id: usize,
     name: String,
@@ -156,6 +156,17 @@ pub struct Member {
     current_partition_idx: AtomicUsize,
 }
 
+impl Clone for Member {
+    fn clone(&self) -> Self {
+        Self {
+            id: self.id.clone(),
+            client_id: self.client_id.clone(),
+            partitions: self.partitions.clone(),
+            current_partition_idx: AtomicUsize::new(0),
+        }
+    }
+}
+
 impl Member {
     pub fn new(client_id: u32) -> Self {
         Member {
diff --git a/core/server/src/streaming/topics/topic2.rs 
b/core/server/src/streaming/topics/topic2.rs
index f66cfda5..01586a87 100644
--- a/core/server/src/streaming/topics/topic2.rs
+++ b/core/server/src/streaming/topics/topic2.rs
@@ -1,18 +1,14 @@
-use crate::streaming::stats::stats::PartitionStats;
-use crate::{
-    slab::{
-        Keyed,
-        consumer_groups::ConsumerGroups,
-        partitions::{PARTITIONS_CAPACITY, Partitions},
-    },
-    streaming::partitions::consumer_offset,
-};
+use crate::slab::topics;
+use crate::slab::traits_ext::{EntityMarker, IntoComponents, 
IntoComponentsById};
+use crate::slab::{Keyed, consumer_groups::ConsumerGroups, 
partitions::Partitions};
+use crate::streaming::stats::stats::{PartitionStats, TopicStats};
 use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp, 
MaxTopicSize};
 use slab::Slab;
+use std::cell::{Ref, RefMut};
 use std::sync::Arc;
 
-#[derive(Default, Debug)]
-pub struct Topic {
+#[derive(Default, Debug, Clone)]
+pub struct TopicRoot {
     id: usize,
     // TODO: This property should be removed, we won't use it in our 
clustering impl.
     replication_factor: u8,
@@ -26,9 +22,136 @@ pub struct Topic {
     consumer_groups: ConsumerGroups,
 }
 
+impl Keyed for TopicRoot {
+    type Key = String;
+
+    fn key(&self) -> &Self::Key {
+        &self.name
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct Topic {
+    root: TopicRoot,
+    stats: Arc<TopicStats>,
+}
+
 impl Topic {
     pub fn new(
         name: String,
+        stats: Arc<TopicStats>,
+        created_at: IggyTimestamp,
+        replication_factor: u8,
+        message_expiry: IggyExpiry,
+        compression: CompressionAlgorithm,
+        max_topic_size: MaxTopicSize,
+    ) -> Self {
+        let root = TopicRoot::new(
+            name,
+            created_at,
+            replication_factor,
+            message_expiry,
+            compression,
+            max_topic_size,
+        );
+        Self { root, stats }
+    }
+
+    pub fn root(&self) -> &TopicRoot {
+        &self.root
+    }
+}
+
+impl IntoComponents for Topic {
+    type Components = (TopicRoot, Arc<TopicStats>);
+
+    fn into_components(self) -> Self::Components {
+        (self.root, self.stats)
+    }
+}
+
+impl EntityMarker for Topic {
+    type Idx = topics::ContainerId;
+    fn id(&self) -> Self::Idx {
+        self.root.id
+    }
+
+    fn update_id(&mut self, id: Self::Idx) {
+        self.root.id = id;
+    }
+}
+
+// TODO: Create a macro to impl those TopicRef/TopicRefMut structs and it's 
traits.
+pub struct TopicRef<'a> {
+    root: Ref<'a, Slab<TopicRoot>>,
+    stats: Ref<'a, Slab<Arc<TopicStats>>>,
+}
+
+impl<'a> TopicRef<'a> {
+    pub fn new(root: Ref<'a, Slab<TopicRoot>>, stats: Ref<'a, 
Slab<Arc<TopicStats>>>) -> Self {
+        Self { root, stats }
+    }
+}
+
+impl<'a> IntoComponents for TopicRef<'a> {
+    type Components = (Ref<'a, Slab<TopicRoot>>, Ref<'a, 
Slab<Arc<TopicStats>>>);
+
+    fn into_components(self) -> Self::Components {
+        (self.root, self.stats)
+    }
+}
+
+impl<'a> IntoComponentsById for TopicRef<'a> {
+    type Idx = topics::ContainerId;
+    type Output = (Ref<'a, TopicRoot>, Ref<'a, Arc<TopicStats>>);
+
+    fn into_components_by_id(self, index: Self::Idx) -> Self::Output {
+        let root = Ref::map(self.root, |r| &r[index]);
+        let stats = Ref::map(self.stats, |s| &s[index]);
+        (root, stats)
+    }
+}
+
+pub struct TopicRefMut<'a> {
+    root: RefMut<'a, Slab<TopicRoot>>,
+    stats: RefMut<'a, Slab<Arc<TopicStats>>>,
+}
+
+impl<'a> TopicRefMut<'a> {
+    pub fn new(
+        root: RefMut<'a, Slab<TopicRoot>>,
+        stats: RefMut<'a, Slab<Arc<TopicStats>>>,
+    ) -> Self {
+        Self { root, stats }
+    }
+}
+
+impl<'a> IntoComponents for TopicRefMut<'a> {
+    type Components = (
+        RefMut<'a, Slab<TopicRoot>>,
+        RefMut<'a, Slab<Arc<TopicStats>>>,
+    );
+
+    fn into_components(self) -> Self::Components {
+        (self.root, self.stats)
+    }
+}
+
+impl<'a> IntoComponentsById for TopicRefMut<'a> {
+    type Idx = topics::ContainerId;
+    type Output = (RefMut<'a, TopicRoot>, RefMut<'a, Arc<TopicStats>>);
+
+    fn into_components_by_id(self, index: Self::Idx) -> Self::Output {
+        let root = RefMut::map(self.root, |r| &mut r[index]);
+        let stats = RefMut::map(self.stats, |s| &mut s[index]);
+        (root, stats)
+    }
+}
+
+impl TopicRoot {
+    pub fn new(
+        name: String,
+        created_at: IggyTimestamp,
         replication_factor: u8,
         message_expiry: IggyExpiry,
         compression: CompressionAlgorithm,
@@ -121,11 +244,3 @@ impl Topic {
         idx
     }
 }
-
-impl Keyed for Topic {
-    type Key = String;
-
-    fn key(&self) -> &Self::Key {
-        &self.name
-    }
-}

Reply via email to