This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch improved_tpc_partition_ecs in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 80c0b01ea561e99be63726185f43b2d51a13d42f Author: numminex <[email protected]> AuthorDate: Mon Aug 11 11:17:26 2025 +0200 changes --- core/server/src/slab/partitions.rs | 197 +++++++++++---------- core/server/src/slab/traits_ext.rs | 62 +++++-- core/server/src/streaming/partitions/partition2.rs | 4 +- core/server/src/streaming/topics/topic2.rs | 21 --- 4 files changed, 150 insertions(+), 134 deletions(-) diff --git a/core/server/src/slab/partitions.rs b/core/server/src/slab/partitions.rs index c82867f5..0cc1425c 100644 --- a/core/server/src/slab/partitions.rs +++ b/core/server/src/slab/partitions.rs @@ -1,12 +1,13 @@ use crate::{ slab::traits_ext::{ - Borrow, Components, EntityComponentSystem, IndexComponents, IntoComponents, + Borrow, Components, Delete, EntityComponentSystem, EntityMarker, IndexComponents, Insert, IntoComponents, }, streaming::{ - deduplication::message_deduplicator::MessageDeduplicator, partitions::partition2, segments, + deduplication::message_deduplicator::MessageDeduplicator, partitions::{partition::ConsumerOffset, partition2}, segments, stats::stats::PartitionStats, }, }; +use ahash::AHashMap; use slab::Slab; use std::{ ops::Index, @@ -15,80 +16,146 @@ use std::{ // TODO: This could be upper limit of partitions per topic, use that value to validate instead of whathever this thing is in `common` crate. pub const PARTITIONS_CAPACITY: usize = 16384; -type Idx = usize; +type Id = usize; #[derive(Debug)] pub struct Partitions { - container: Slab<partition2::Partition>, + info: Slab<partition2::PartitionInfo>, stats: Slab<Arc<PartitionStats>>, segments: Slab<Vec<segments::Segment2>>, - message_deduplicators: Slab<Option<MessageDeduplicator>>, - partition_offsets: Slab<Arc<AtomicU64>>, + message_deduplicator: Slab<Option<MessageDeduplicator>>, + offset: Slab<Arc<AtomicU64>>, + + consumer_offset: Slab<AHashMap<usize, ConsumerOffset>>, + consumer_group_offset: Slab<AHashMap<usize, ConsumerOffset>>, +} + +impl Insert<Id> for Partitions { + type Item = Partition; + + fn insert(&mut self, item: Self::Item) -> Id { + todo!(); + } +} + +impl Delete<Id> for Partitions { + type Item = Partition; + + fn delete(&mut self, id: Id) -> Self::Item { + todo!() + } } -pub struct Part { - partition: partition2::Partition, +pub struct Partition { + info: partition2::PartitionInfo, + stats: Arc<PartitionStats>, + message_deduplicator: Option<MessageDeduplicator>, + offset: Arc<AtomicU64>, } -impl IntoComponents for Part { - type Components = (partition2::Partition,); +impl EntityMarker for Partition {} + +impl IntoComponents for Partition { + type Components = ( + partition2::PartitionInfo, + Arc<PartitionStats>, + Option<MessageDeduplicator>, + Arc<AtomicU64>, + ); fn into_components(self) -> Self::Components { - (self.partition,) + ( + self.info, + self.stats, + self.message_deduplicator, + self.offset, + ) } } -pub struct PartRef<'a> { - partition: &'a Slab<partition2::Partition>, +pub struct PartitionRef<'a> { + info: &'a Slab<partition2::PartitionInfo>, + stats: &'a Slab<Arc<PartitionStats>>, + message_deduplicator: &'a Slab<Option<MessageDeduplicator>>, + offset: &'a Slab<Arc<AtomicU64>>, } -pub struct PartItemRef<'a> { - partition: &'a partition2::Partition, +impl<'a> From<&'a Partitions> for PartitionRef<'a> { + fn from(value: &'a Partitions) -> Self { + PartitionRef { + info: &value.info, + stats: &value.stats, + message_deduplicator: &value.message_deduplicator, + offset: &value.offset, + } + } } -impl<'a> IntoComponents for PartRef<'a> { - type Components = (&'a Slab<partition2::Partition>,); +impl<'a> IntoComponents for PartitionRef<'a> { + type Components = ( + &'a Slab<partition2::PartitionInfo>, + &'a Slab<Arc<PartitionStats>>, + &'a Slab<Option<MessageDeduplicator>>, + &'a Slab<Arc<AtomicU64>>, + ); fn into_components(self) -> Self::Components { - (self.partition,) + ( + self.info, + self.stats, + self.message_deduplicator, + self.offset, + ) } } -impl<'a> IndexComponents<Idx> for PartRef<'a> { - type Output = (&'a partition2::Partition,); - - fn index(&self, index: Idx) -> Self::Output { - (&self.partition[index],) +impl<'a> IndexComponents<Id> for PartitionRef<'a> { + type Output = ( + &'a partition2::PartitionInfo, + &'a Arc<PartitionStats>, + &'a Option<MessageDeduplicator>, + &'a Arc<AtomicU64>, + ); + + fn index(&self, index: Id) -> Self::Output { + ( + &self.info[index], + &self.stats[index], + &self.message_deduplicator[index], + &self.offset[index], + ) } } -impl EntityComponentSystem<Idx, Borrow> for Partitions { - type Entity = Part; - type EntityRef<'a> = PartRef<'a>; +impl EntityComponentSystem<Id, Borrow> for Partitions { + type Entity = Partition; + type EntityRef<'a> = PartitionRef<'a>; fn with<O, F>(&self, f: F) -> O where F: for<'a> FnOnce(Self::EntityRef<'a>) -> O, { - todo!() + f(self.into()) } async fn with_async<O, F>(&self, f: F) -> O where - F: for<'a> FnOnce(Components<Self::EntityRef<'a>>) -> O, + F: for<'a> AsyncFnOnce(Self::EntityRef<'a>) -> O, { - todo!() + f(self.into()).await } } impl Default for Partitions { fn default() -> Self { Self { - container: Slab::with_capacity(PARTITIONS_CAPACITY), + info: Slab::with_capacity(PARTITIONS_CAPACITY), stats: Slab::with_capacity(PARTITIONS_CAPACITY), segments: Slab::with_capacity(PARTITIONS_CAPACITY), - message_deduplicators: Slab::with_capacity(PARTITIONS_CAPACITY), - partition_offsets: Slab::with_capacity(PARTITIONS_CAPACITY), + message_deduplicator: Slab::with_capacity(PARTITIONS_CAPACITY), + offset: Slab::with_capacity(PARTITIONS_CAPACITY), + consumer_offset: Slab::with_capacity(PARTITIONS_CAPACITY), + consumer_group_offset: Slab::with_capacity(PARTITIONS_CAPACITY), } } } @@ -104,72 +171,6 @@ impl Partitions { f(&mut stats) } - pub fn with_message_deduplicators<T>( - &self, - f: impl FnOnce(&Slab<Option<MessageDeduplicator>>) -> T, - ) -> T { - let message_deduplicators = &self.message_deduplicators; - f(message_deduplicators) - } - - pub fn with_message_deduplicators_mut<T>( - &mut self, - f: impl FnOnce(&mut Slab<Option<MessageDeduplicator>>) -> T, - ) -> T { - let mut message_deduplicators = &mut self.message_deduplicators; - f(&mut message_deduplicators) - } - - pub fn with_partition_offsets<T>(&self, f: impl FnOnce(&Slab<Arc<AtomicU64>>) -> T) -> T { - let partition_offsets = &self.partition_offsets; - f(partition_offsets) - } - - pub fn with_partition_offsets_mut<T>( - &mut self, - f: impl FnOnce(&mut Slab<Arc<AtomicU64>>) -> T, - ) -> T { - let mut partition_offsets = &mut self.partition_offsets; - f(&mut partition_offsets) - } - - pub async fn with_async<T>(&self, f: impl AsyncFnOnce(&Slab<partition2::Partition>) -> T) -> T { - let container = &self.container; - f(&container).await - } - - pub fn with<T>(&self, f: impl FnOnce(&Slab<partition2::Partition>) -> T) -> T { - let container = &self.container; - f(&container) - } - - pub fn with_mut<T>(&mut self, f: impl FnOnce(&mut Slab<partition2::Partition>) -> T) -> T { - let mut container = &mut self.container; - f(&mut container) - } - - pub fn with_partition_id<T>( - &self, - partition_id: usize, - f: impl FnOnce(&partition2::Partition) -> T, - ) -> T { - self.with(|partitions| { - let partition = &partitions[partition_id]; - f(partition) - }) - } - - pub fn with_partition_by_id_mut<T>( - &mut self, - partition_id: usize, - f: impl FnOnce(&mut partition2::Partition) -> T, - ) -> T { - self.with_mut(|partitions| { - let partition = &mut partitions[partition_id]; - f(partition) - }) - } - pub fn with_segments(&self, partition_id: usize, f: impl FnOnce(&Vec<segments::Segment2>)) { let segments = &self.segments[partition_id]; f(segments); diff --git a/core/server/src/slab/traits_ext.rs b/core/server/src/slab/traits_ext.rs index fc4b207e..cbd7f533 100644 --- a/core/server/src/slab/traits_ext.rs +++ b/core/server/src/slab/traits_ext.rs @@ -1,10 +1,26 @@ -use std::ops::Index; - pub trait IntoComponents { type Components; fn into_components(self) -> Self::Components; } +// Marker trait for the entity type. +pub trait EntityMarker {} + +pub trait Insert<Idx> { + type Item: IntoComponents + EntityMarker; + fn insert(&mut self, item: Self::Item) -> Idx; +} + +pub trait Delete<Idx> { + type Item: IntoComponents + EntityMarker; + fn delete(&mut self, id: Idx) -> Self::Item; +} + +pub trait DeleteCell<Idx> { + type Item: IntoComponents + EntityMarker; + fn delete(&self, id: Idx) -> Self::Item; +} + pub trait IndexComponents<Idx: ?Sized> { type Output; fn index(&self, index: Idx) -> Self::Output; @@ -23,11 +39,6 @@ pub trait ComponentsMapping<T>: private::Sealed { type RefMut<'a>; } -pub trait ComponentsMappingById<T>: private::Sealed { - type Ref<'a>; - type RefMut<'a>; -} - pub trait ComponentsByIdMapping<T>: private::Sealed { // TODO: We will need this to contrain the `EntityRef` and `EntityRefMut` types, so after decomposing they have proper mapping. // Similar mechanism to trait from above, but for (T1, T2) -> (&T1, &T2) mapping rather than (T1, T2) -> (&Slab<T>, &Slab<T2>). @@ -116,12 +127,14 @@ type MappingMut<'a, E, T> = <<E as IntoComponents>::Components as ComponentsMapp type MappingById<'a, E, T> = <<E as IntoComponents>::Components as ComponentsByIdMapping<T>>::Ref<'a>; +type MappingByIdMut<'a, E, T> = + <<E as IntoComponents>::Components as ComponentsByIdMapping<T>>::RefMut<'a>; -// TODO: I think it's better to not use `Components` directly on the `with` methods. +// I think it's better to *NOT* use `Components` directly on the `with` methods. // Instead use the `Self::EntityRef` type directly. // This way we can auto implement the `with_by_id` method. // But on the other hand, we need to call `into_components` on the value returned by the `with` method. -// So we lack the abilit to immediately discard unnecessary components, which leads to less egonomic API. +// So we lack the ability to immediately discard unnecessary components, which leads to less ergonomic API. // Damn tradeoffs. pub type Components<T> = <T as IntoComponents>::Components; pub type ComponentsById<Idx, T> = <T as IndexComponents<Idx>>::Output; @@ -130,7 +143,7 @@ pub trait EntityComponentSystem<Idx, T> where <Self::Entity as IntoComponents>::Components: ComponentsMapping<T> + ComponentsByIdMapping<T>, { - type Entity: IntoComponents; + type Entity: IntoComponents + EntityMarker; type EntityRef<'a>: IntoComponents<Components = Mapping<'a, Self::Entity, T>> + IndexComponents<Idx, Output = MappingById<'a, Self::Entity, T>> where @@ -141,7 +154,7 @@ where fn with_async<O, F>(&self, f: F) -> impl Future<Output = O> where - F: for<'a> FnOnce(Components<Self::EntityRef<'a>>) -> O; + F: for<'a> AsyncFnOnce(Self::EntityRef<'a>) -> O; fn with_by_id<O, F>(&self, id: Idx, f: F) -> O where @@ -149,24 +162,47 @@ where { self.with(|components| f(components.index(id))) } + + fn with_by_id_async<O, F>(&self, id: Idx, f: F) -> impl Future<Output = O> + where + F: for<'a> AsyncFnOnce(ComponentsById<Idx, Self::EntityRef<'a>>) -> O, + { + self.with_async(async |components| f(components.index(id)).await) + } } pub trait EntityComponentSystemMut<Idx>: EntityComponentSystem<Idx, Borrow> { type EntityRefMut<'a>: IntoComponents<Components = MappingMut<'a, Self::Entity, Borrow>> + + IndexComponents<Idx, Output = MappingByIdMut<'a, Self::Entity, Borrow>> where Self: 'a; fn with_mut<O, F>(&mut self, f: F) -> O where - F: for<'a> FnOnce(Components<Self::EntityRefMut<'a>>) -> O; + F: for<'a> FnOnce(Self::EntityRefMut<'a>) -> O; + + fn with_by_id_mut<O, F>(&mut self, id: Idx, f: F) -> O + where + F: for<'a> FnOnce(ComponentsById<Idx, Self::EntityRefMut<'a>>) -> O, + { + self.with_mut(|components| f(components.index(id))) + } } pub trait EntityComponentSystemMutCell<Idx>: EntityComponentSystem<Idx, RefCell> { type EntityRefMut<'a>: IntoComponents<Components = MappingMut<'a, Self::Entity, RefCell>> + + IndexComponents<Idx, Output = MappingByIdMut<'a, Self::Entity, RefCell>> where Self: 'a; fn with_mut<O, F>(&mut self, f: F) -> O where - F: for<'a> FnOnce(Components<Self::EntityRefMut<'a>>) -> O; + F: for<'a> FnOnce(Self::EntityRefMut<'a>) -> O; + + fn with_by_id_mut<O, F>(&mut self, id: Idx, f: F) -> O + where + F: for<'a> FnOnce(ComponentsById<Idx, Self::EntityRefMut<'a>>) -> O, + { + self.with_mut(|components| f(components.index(id))) + } } diff --git a/core/server/src/streaming/partitions/partition2.rs b/core/server/src/streaming/partitions/partition2.rs index 1d642ad9..e2baef51 100644 --- a/core/server/src/streaming/partitions/partition2.rs +++ b/core/server/src/streaming/partitions/partition2.rs @@ -13,13 +13,13 @@ pub struct SharedPartition { } #[derive(Default, Debug)] -pub struct Partition { +pub struct PartitionInfo { id: usize, created_at: IggyTimestamp, should_increment_offset: bool, } -impl Partition { +impl PartitionInfo { pub fn new(created_at: IggyTimestamp, should_increment_offset: bool) -> Self { Self { id: 0, diff --git a/core/server/src/streaming/topics/topic2.rs b/core/server/src/streaming/topics/topic2.rs index d13fada1..738d2d89 100644 --- a/core/server/src/streaming/topics/topic2.rs +++ b/core/server/src/streaming/topics/topic2.rs @@ -26,9 +26,6 @@ pub struct Topic { partitions: Partitions, consumer_groups: ConsumerGroups, - - consumer_offsets: Slab<AHashMap<usize, ConsumerOffset>>, - consumer_group_offsets: Slab<AHashMap<usize, ConsumerOffset>>, } impl Topic { @@ -49,8 +46,6 @@ impl Topic { max_topic_size, partitions: Partitions::default(), consumer_groups: ConsumerGroups::default(), - consumer_offsets: Slab::with_capacity(PARTITIONS_CAPACITY), - consumer_group_offsets: Slab::with_capacity(PARTITIONS_CAPACITY), } } @@ -121,22 +116,6 @@ impl Topic { &mut self.consumer_groups } - pub fn consumer_offsets(&self) -> &Slab<AHashMap<usize, ConsumerOffset>> { - &self.consumer_offsets - } - - pub fn consumer_offsets_mut(&mut self) -> &mut Slab<AHashMap<usize, ConsumerOffset>> { - &mut self.consumer_offsets - } - - pub fn consumer_group_offsets(&self) -> &Slab<AHashMap<usize, ConsumerOffset>> { - &self.consumer_group_offsets - } - - pub fn consumer_group_offsets_mut(&mut self) -> &mut Slab<AHashMap<usize, ConsumerOffset>> { - &mut self.consumer_group_offsets - } - pub fn insert_into(self, container: &mut Slab<Self>) -> usize { let idx = container.insert(self); let topic = &mut container[idx];
