This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch improved_tpc_partition_ecs
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 80c0b01ea561e99be63726185f43b2d51a13d42f
Author: numminex <[email protected]>
AuthorDate: Mon Aug 11 11:17:26 2025 +0200

    changes
---
 core/server/src/slab/partitions.rs                 | 197 +++++++++++----------
 core/server/src/slab/traits_ext.rs                 |  62 +++++--
 core/server/src/streaming/partitions/partition2.rs |   4 +-
 core/server/src/streaming/topics/topic2.rs         |  21 ---
 4 files changed, 150 insertions(+), 134 deletions(-)

diff --git a/core/server/src/slab/partitions.rs 
b/core/server/src/slab/partitions.rs
index c82867f5..0cc1425c 100644
--- a/core/server/src/slab/partitions.rs
+++ b/core/server/src/slab/partitions.rs
@@ -1,12 +1,13 @@
 use crate::{
     slab::traits_ext::{
-        Borrow, Components, EntityComponentSystem, IndexComponents, 
IntoComponents,
+        Borrow, Components, Delete, EntityComponentSystem, EntityMarker, 
IndexComponents, Insert, IntoComponents, 
     },
     streaming::{
-        deduplication::message_deduplicator::MessageDeduplicator, 
partitions::partition2, segments,
+        deduplication::message_deduplicator::MessageDeduplicator, 
partitions::{partition::ConsumerOffset, partition2}, segments,
         stats::stats::PartitionStats,
     },
 };
+use ahash::AHashMap;
 use slab::Slab;
 use std::{
     ops::Index,
@@ -15,80 +16,146 @@ use std::{
 
 // TODO: This could be upper limit of partitions per topic, use that value to 
validate instead of whathever this thing is in `common` crate.
 pub const PARTITIONS_CAPACITY: usize = 16384;
-type Idx = usize;
+type Id = usize;
 
 #[derive(Debug)]
 pub struct Partitions {
-    container: Slab<partition2::Partition>,
+    info: Slab<partition2::PartitionInfo>,
     stats: Slab<Arc<PartitionStats>>,
     segments: Slab<Vec<segments::Segment2>>,
-    message_deduplicators: Slab<Option<MessageDeduplicator>>,
-    partition_offsets: Slab<Arc<AtomicU64>>,
+    message_deduplicator: Slab<Option<MessageDeduplicator>>,
+    offset: Slab<Arc<AtomicU64>>,
+
+    consumer_offset: Slab<AHashMap<usize, ConsumerOffset>>,
+    consumer_group_offset: Slab<AHashMap<usize, ConsumerOffset>>,
+}
+
+impl Insert<Id> for Partitions {
+    type Item = Partition;
+
+    fn insert(&mut self, item: Self::Item) -> Id {
+        todo!();
+    }
+}
+
+impl Delete<Id> for Partitions {
+    type Item = Partition;
+
+    fn delete(&mut self, id: Id) -> Self::Item {
+        todo!()
+    }
 }
 
-pub struct Part {
-    partition: partition2::Partition,
+pub struct Partition {
+    info: partition2::PartitionInfo,
+    stats: Arc<PartitionStats>,
+    message_deduplicator: Option<MessageDeduplicator>,
+    offset: Arc<AtomicU64>,
 }
 
-impl IntoComponents for Part {
-    type Components = (partition2::Partition,);
+impl EntityMarker for Partition {}
+
+impl IntoComponents for Partition {
+    type Components = (
+        partition2::PartitionInfo,
+        Arc<PartitionStats>,
+        Option<MessageDeduplicator>,
+        Arc<AtomicU64>,
+    );
 
     fn into_components(self) -> Self::Components {
-        (self.partition,)
+        (
+            self.info,
+            self.stats,
+            self.message_deduplicator,
+            self.offset,
+        )
     }
 }
 
-pub struct PartRef<'a> {
-    partition: &'a Slab<partition2::Partition>,
+pub struct PartitionRef<'a> {
+    info: &'a Slab<partition2::PartitionInfo>,
+    stats: &'a Slab<Arc<PartitionStats>>,
+    message_deduplicator: &'a Slab<Option<MessageDeduplicator>>,
+    offset: &'a Slab<Arc<AtomicU64>>,
 }
 
-pub struct PartItemRef<'a> {
-    partition: &'a partition2::Partition,
+impl<'a> From<&'a Partitions> for PartitionRef<'a> {
+    fn from(value: &'a Partitions) -> Self {
+        PartitionRef {
+            info: &value.info,
+            stats: &value.stats,
+            message_deduplicator: &value.message_deduplicator,
+            offset: &value.offset,
+        }
+    }
 }
 
-impl<'a> IntoComponents for PartRef<'a> {
-    type Components = (&'a Slab<partition2::Partition>,);
+impl<'a> IntoComponents for PartitionRef<'a> {
+    type Components = (
+        &'a Slab<partition2::PartitionInfo>,
+        &'a Slab<Arc<PartitionStats>>,
+        &'a Slab<Option<MessageDeduplicator>>,
+        &'a Slab<Arc<AtomicU64>>,
+    );
 
     fn into_components(self) -> Self::Components {
-        (self.partition,)
+        (
+            self.info,
+            self.stats,
+            self.message_deduplicator,
+            self.offset,
+        )
     }
 }
 
-impl<'a> IndexComponents<Idx> for PartRef<'a> {
-    type Output = (&'a partition2::Partition,);
-
-    fn index(&self, index: Idx) -> Self::Output {
-        (&self.partition[index],)
+impl<'a> IndexComponents<Id> for PartitionRef<'a> {
+    type Output = (
+        &'a partition2::PartitionInfo,
+        &'a Arc<PartitionStats>,
+        &'a Option<MessageDeduplicator>,
+        &'a Arc<AtomicU64>,
+    );
+
+    fn index(&self, index: Id) -> Self::Output {
+        (
+            &self.info[index],
+            &self.stats[index],
+            &self.message_deduplicator[index],
+            &self.offset[index],
+        )
     }
 }
 
-impl EntityComponentSystem<Idx, Borrow> for Partitions {
-    type Entity = Part;
-    type EntityRef<'a> = PartRef<'a>;
+impl EntityComponentSystem<Id, Borrow> for Partitions {
+    type Entity = Partition;
+    type EntityRef<'a> = PartitionRef<'a>;
 
     fn with<O, F>(&self, f: F) -> O
     where
         F: for<'a> FnOnce(Self::EntityRef<'a>) -> O,
     {
-        todo!()
+        f(self.into())
     }
 
     async fn with_async<O, F>(&self, f: F) -> O
     where
-        F: for<'a> FnOnce(Components<Self::EntityRef<'a>>) -> O,
+        F: for<'a> AsyncFnOnce(Self::EntityRef<'a>) -> O,
     {
-        todo!()
+        f(self.into()).await
     }
 }
 
 impl Default for Partitions {
     fn default() -> Self {
         Self {
-            container: Slab::with_capacity(PARTITIONS_CAPACITY),
+            info: Slab::with_capacity(PARTITIONS_CAPACITY),
             stats: Slab::with_capacity(PARTITIONS_CAPACITY),
             segments: Slab::with_capacity(PARTITIONS_CAPACITY),
-            message_deduplicators: Slab::with_capacity(PARTITIONS_CAPACITY),
-            partition_offsets: Slab::with_capacity(PARTITIONS_CAPACITY),
+            message_deduplicator: Slab::with_capacity(PARTITIONS_CAPACITY),
+            offset: Slab::with_capacity(PARTITIONS_CAPACITY),
+            consumer_offset: Slab::with_capacity(PARTITIONS_CAPACITY),
+            consumer_group_offset: Slab::with_capacity(PARTITIONS_CAPACITY),
         }
     }
 }
@@ -104,72 +171,6 @@ impl Partitions {
         f(&mut stats)
     }
 
-    pub fn with_message_deduplicators<T>(
-        &self,
-        f: impl FnOnce(&Slab<Option<MessageDeduplicator>>) -> T,
-    ) -> T {
-        let message_deduplicators = &self.message_deduplicators;
-        f(message_deduplicators)
-    }
-
-    pub fn with_message_deduplicators_mut<T>(
-        &mut self,
-        f: impl FnOnce(&mut Slab<Option<MessageDeduplicator>>) -> T,
-    ) -> T {
-        let mut message_deduplicators = &mut self.message_deduplicators;
-        f(&mut message_deduplicators)
-    }
-
-    pub fn with_partition_offsets<T>(&self, f: impl 
FnOnce(&Slab<Arc<AtomicU64>>) -> T) -> T {
-        let partition_offsets = &self.partition_offsets;
-        f(partition_offsets)
-    }
-
-    pub fn with_partition_offsets_mut<T>(
-        &mut self,
-        f: impl FnOnce(&mut Slab<Arc<AtomicU64>>) -> T,
-    ) -> T {
-        let mut partition_offsets = &mut self.partition_offsets;
-        f(&mut partition_offsets)
-    }
-
-    pub async fn with_async<T>(&self, f: impl 
AsyncFnOnce(&Slab<partition2::Partition>) -> T) -> T {
-        let container = &self.container;
-        f(&container).await
-    }
-
-    pub fn with<T>(&self, f: impl FnOnce(&Slab<partition2::Partition>) -> T) 
-> T {
-        let container = &self.container;
-        f(&container)
-    }
-
-    pub fn with_mut<T>(&mut self, f: impl FnOnce(&mut 
Slab<partition2::Partition>) -> T) -> T {
-        let mut container = &mut self.container;
-        f(&mut container)
-    }
-
-    pub fn with_partition_id<T>(
-        &self,
-        partition_id: usize,
-        f: impl FnOnce(&partition2::Partition) -> T,
-    ) -> T {
-        self.with(|partitions| {
-            let partition = &partitions[partition_id];
-            f(partition)
-        })
-    }
-
-    pub fn with_partition_by_id_mut<T>(
-        &mut self,
-        partition_id: usize,
-        f: impl FnOnce(&mut partition2::Partition) -> T,
-    ) -> T {
-        self.with_mut(|partitions| {
-            let partition = &mut partitions[partition_id];
-            f(partition)
-        })
-    }
-
     pub fn with_segments(&self, partition_id: usize, f: impl 
FnOnce(&Vec<segments::Segment2>)) {
         let segments = &self.segments[partition_id];
         f(segments);
diff --git a/core/server/src/slab/traits_ext.rs 
b/core/server/src/slab/traits_ext.rs
index fc4b207e..cbd7f533 100644
--- a/core/server/src/slab/traits_ext.rs
+++ b/core/server/src/slab/traits_ext.rs
@@ -1,10 +1,26 @@
-use std::ops::Index;
-
 pub trait IntoComponents {
     type Components;
     fn into_components(self) -> Self::Components;
 }
 
+// Marker trait for the entity type.
+pub trait EntityMarker {}
+
+pub trait Insert<Idx> {
+    type Item: IntoComponents + EntityMarker;
+    fn insert(&mut self, item: Self::Item) -> Idx;
+}
+
+pub trait Delete<Idx> {
+    type Item: IntoComponents + EntityMarker;
+    fn delete(&mut self, id: Idx) -> Self::Item;
+}
+
+pub trait DeleteCell<Idx> {
+    type Item: IntoComponents + EntityMarker;
+    fn delete(&self, id: Idx) -> Self::Item;
+}
+
 pub trait IndexComponents<Idx: ?Sized> {
     type Output;
     fn index(&self, index: Idx) -> Self::Output;
@@ -23,11 +39,6 @@ pub trait ComponentsMapping<T>: private::Sealed {
     type RefMut<'a>;
 }
 
-pub trait ComponentsMappingById<T>: private::Sealed {
-    type Ref<'a>;
-    type RefMut<'a>;
-}
-
 pub trait ComponentsByIdMapping<T>: private::Sealed {
     // TODO: We will need this to contrain the `EntityRef` and `EntityRefMut` 
types, so after decomposing they have proper mapping.
     // Similar mechanism to trait from above, but for (T1, T2) -> (&T1, &T2) 
mapping rather than (T1, T2) -> (&Slab<T>, &Slab<T2>).
@@ -116,12 +127,14 @@ type MappingMut<'a, E, T> = <<E as 
IntoComponents>::Components as ComponentsMapp
 
 type MappingById<'a, E, T> =
     <<E as IntoComponents>::Components as ComponentsByIdMapping<T>>::Ref<'a>;
+type MappingByIdMut<'a, E, T> =
+    <<E as IntoComponents>::Components as 
ComponentsByIdMapping<T>>::RefMut<'a>;
 
-// TODO: I think it's better to not use `Components` directly on the `with` 
methods.
+// I think it's better to *NOT* use `Components` directly on the `with` 
methods.
 // Instead use the `Self::EntityRef` type directly.
 // This way we can auto implement the `with_by_id` method.
 // But on the other hand, we need to call `into_components` on the value 
returned by the `with` method.
-// So we lack the abilit to immediately discard unnecessary components, which 
leads to less egonomic API.
+// So we lack the ability to immediately discard unnecessary components, which 
leads to less ergonomic API.
 // Damn tradeoffs.
 pub type Components<T> = <T as IntoComponents>::Components;
 pub type ComponentsById<Idx, T> = <T as IndexComponents<Idx>>::Output;
@@ -130,7 +143,7 @@ pub trait EntityComponentSystem<Idx, T>
 where
     <Self::Entity as IntoComponents>::Components: ComponentsMapping<T> + 
ComponentsByIdMapping<T>,
 {
-    type Entity: IntoComponents;
+    type Entity: IntoComponents + EntityMarker;
     type EntityRef<'a>: IntoComponents<Components = Mapping<'a, Self::Entity, 
T>>
         + IndexComponents<Idx, Output = MappingById<'a, Self::Entity, T>>
     where
@@ -141,7 +154,7 @@ where
 
     fn with_async<O, F>(&self, f: F) -> impl Future<Output = O>
     where
-        F: for<'a> FnOnce(Components<Self::EntityRef<'a>>) -> O;
+        F: for<'a> AsyncFnOnce(Self::EntityRef<'a>) -> O;
 
     fn with_by_id<O, F>(&self, id: Idx, f: F) -> O
     where
@@ -149,24 +162,47 @@ where
     {
         self.with(|components| f(components.index(id)))
     }
+
+    fn with_by_id_async<O, F>(&self, id: Idx, f: F) -> impl Future<Output = O>
+    where
+        F: for<'a> AsyncFnOnce(ComponentsById<Idx, Self::EntityRef<'a>>) -> O,
+    {
+        self.with_async(async |components| f(components.index(id)).await)
+    }
 }
 
 pub trait EntityComponentSystemMut<Idx>: EntityComponentSystem<Idx, Borrow> {
     type EntityRefMut<'a>: IntoComponents<Components = MappingMut<'a, 
Self::Entity, Borrow>>
+        + IndexComponents<Idx, Output = MappingByIdMut<'a, Self::Entity, 
Borrow>>
     where
         Self: 'a;
 
     fn with_mut<O, F>(&mut self, f: F) -> O
     where
-        F: for<'a> FnOnce(Components<Self::EntityRefMut<'a>>) -> O;
+        F: for<'a> FnOnce(Self::EntityRefMut<'a>) -> O;
+
+    fn with_by_id_mut<O, F>(&mut self, id: Idx, f: F) -> O
+    where
+        F: for<'a> FnOnce(ComponentsById<Idx, Self::EntityRefMut<'a>>) -> O,
+    {
+        self.with_mut(|components| f(components.index(id)))
+    }
 }
 
 pub trait EntityComponentSystemMutCell<Idx>: EntityComponentSystem<Idx, 
RefCell> {
     type EntityRefMut<'a>: IntoComponents<Components = MappingMut<'a, 
Self::Entity, RefCell>>
+        + IndexComponents<Idx, Output = MappingByIdMut<'a, Self::Entity, 
RefCell>>
     where
         Self: 'a;
 
     fn with_mut<O, F>(&mut self, f: F) -> O
     where
-        F: for<'a> FnOnce(Components<Self::EntityRefMut<'a>>) -> O;
+        F: for<'a> FnOnce(Self::EntityRefMut<'a>) -> O;
+
+    fn with_by_id_mut<O, F>(&mut self, id: Idx, f: F) -> O
+    where
+        F: for<'a> FnOnce(ComponentsById<Idx, Self::EntityRefMut<'a>>) -> O,
+    {
+        self.with_mut(|components| f(components.index(id)))
+    }
 }
diff --git a/core/server/src/streaming/partitions/partition2.rs 
b/core/server/src/streaming/partitions/partition2.rs
index 1d642ad9..e2baef51 100644
--- a/core/server/src/streaming/partitions/partition2.rs
+++ b/core/server/src/streaming/partitions/partition2.rs
@@ -13,13 +13,13 @@ pub struct SharedPartition {
 }
 
 #[derive(Default, Debug)]
-pub struct Partition {
+pub struct PartitionInfo {
     id: usize,
     created_at: IggyTimestamp,
     should_increment_offset: bool,
 }
 
-impl Partition {
+impl PartitionInfo {
     pub fn new(created_at: IggyTimestamp, should_increment_offset: bool) -> 
Self {
         Self {
             id: 0,
diff --git a/core/server/src/streaming/topics/topic2.rs 
b/core/server/src/streaming/topics/topic2.rs
index d13fada1..738d2d89 100644
--- a/core/server/src/streaming/topics/topic2.rs
+++ b/core/server/src/streaming/topics/topic2.rs
@@ -26,9 +26,6 @@ pub struct Topic {
 
     partitions: Partitions,
     consumer_groups: ConsumerGroups,
-
-    consumer_offsets: Slab<AHashMap<usize, ConsumerOffset>>,
-    consumer_group_offsets: Slab<AHashMap<usize, ConsumerOffset>>,
 }
 
 impl Topic {
@@ -49,8 +46,6 @@ impl Topic {
             max_topic_size,
             partitions: Partitions::default(),
             consumer_groups: ConsumerGroups::default(),
-            consumer_offsets: Slab::with_capacity(PARTITIONS_CAPACITY),
-            consumer_group_offsets: Slab::with_capacity(PARTITIONS_CAPACITY),
         }
     }
 
@@ -121,22 +116,6 @@ impl Topic {
         &mut self.consumer_groups
     }
 
-    pub fn consumer_offsets(&self) -> &Slab<AHashMap<usize, ConsumerOffset>> {
-        &self.consumer_offsets
-    }
-
-    pub fn consumer_offsets_mut(&mut self) -> &mut Slab<AHashMap<usize, 
ConsumerOffset>> {
-        &mut self.consumer_offsets
-    }
-
-    pub fn consumer_group_offsets(&self) -> &Slab<AHashMap<usize, 
ConsumerOffset>> {
-        &self.consumer_group_offsets
-    }
-
-    pub fn consumer_group_offsets_mut(&mut self) -> &mut Slab<AHashMap<usize, 
ConsumerOffset>> {
-        &mut self.consumer_group_offsets
-    }
-
     pub fn insert_into(self, container: &mut Slab<Self>) -> usize {
         let idx = container.insert(self);
         let topic = &mut container[idx];

Reply via email to