This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch io_uring_tpc in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push: new 1154a06f feat(io_uring): migrate to new architecture stream/topic create/delete/update using new interfaces. (#2103) 1154a06f is described below commit 1154a06f8d2c112e1bd2a9e8ee58a4ac2747df07 Author: Grzegorz Koszyk <112548209+numin...@users.noreply.github.com> AuthorDate: Tue Aug 12 21:40:57 2025 +0200 feat(io_uring): migrate to new architecture stream/topic create/delete/update using new interfaces. (#2103) --- .../handlers/streams/create_stream_handler.rs | 19 +- .../handlers/streams/delete_stream_handler.rs | 5 +- .../binary/handlers/topics/create_topic_handler.rs | 30 +-- .../binary/handlers/topics/delete_topic_handler.rs | 1 + core/server/src/shard/mod.rs | 56 ++--- core/server/src/shard/system/consumer_groups.rs | 19 +- core/server/src/shard/system/partitions.rs | 32 +-- core/server/src/shard/system/streams.rs | 118 ++++------ core/server/src/shard/system/topics.rs | 178 +++++++-------- core/server/src/shard/system/utils.rs | 9 +- core/server/src/shard/transmission/event.rs | 14 +- core/server/src/slab/consumer_groups.rs | 2 +- core/server/src/slab/mod.rs | 2 +- core/server/src/slab/partitions.rs | 119 +++++----- core/server/src/slab/streams.rs | 241 ++++++++++++++------- core/server/src/slab/topics.rs | 215 +++++++++++++----- core/server/src/slab/traits_ext.rs | 141 ++++++------ core/server/src/streaming/partitions/partition2.rs | 61 +++--- core/server/src/streaming/segments/segment2.rs | 2 +- core/server/src/streaming/streams/stream2.rs | 170 +++++++++++---- .../server/src/streaming/topics/consumer_group2.rs | 13 +- core/server/src/streaming/topics/topic2.rs | 153 +++++++++++-- 22 files changed, 983 insertions(+), 617 deletions(-) diff --git a/core/server/src/binary/handlers/streams/create_stream_handler.rs b/core/server/src/binary/handlers/streams/create_stream_handler.rs index 4461745e..95d960f5 100644 --- a/core/server/src/binary/handlers/streams/create_stream_handler.rs +++ b/core/server/src/binary/handlers/streams/create_stream_handler.rs @@ -23,6 +23,7 @@ use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind}; use crate::shard::IggyShard; use crate::shard::transmission::event::ShardEvent; use crate::shard_info; +use crate::slab::traits_ext::EntityMarker; use crate::state::command::EntryCommand; use crate::state::models::CreateStreamWithId; use crate::streaming::session::Session; @@ -51,30 +52,22 @@ impl ServerCommandHandler for CreateStream { debug!("session: {session}, command: {self}"); let stream_id = self.stream_id; let name = self.name.clone(); - let stats = Arc::new(StreamStats::new()); - let new_stream_id = shard - .create_stream2(session, stream_id, self.name.clone(), stats.clone()) + let stream = shard + .create_stream2(session, stream_id, self.name.clone()) .await?; shard_info!( shard.id, "Created stream with new API, Stream ID: {}, name: '{}'.", - new_stream_id, + stream.id(), name ); let event = ShardEvent::CreatedStream2 { - id: new_stream_id, - name: self.name.clone(), - stats, + id: stream.id(), + stream, }; let _responses = shard.broadcast_event_to_all_shards(event.into()).await; - //TODO: Replace the mapping from line 89 with this once the Stream layer is finished. - let _ = shard.streams2.with_stream_by_id( - &Identifier::numeric(new_stream_id as u32).unwrap(), - |stream| mapper::map_stream2(stream), - ); - let created_stream_id = shard .create_stream(session, stream_id, &name) .await diff --git a/core/server/src/binary/handlers/streams/delete_stream_handler.rs b/core/server/src/binary/handlers/streams/delete_stream_handler.rs index af12beda..c7b9ad2c 100644 --- a/core/server/src/binary/handlers/streams/delete_stream_handler.rs +++ b/core/server/src/binary/handlers/streams/delete_stream_handler.rs @@ -23,6 +23,7 @@ use crate::shard::IggyShard; use crate::shard::namespace::IggyNamespace; use crate::shard::transmission::event::ShardEvent; use crate::shard_info; +use crate::slab::traits_ext::EntityMarker; use crate::state::command::EntryCommand; use crate::streaming::partitions::partition; use crate::streaming::session::Session; @@ -58,8 +59,8 @@ impl ServerCommandHandler for DeleteStream { })?; shard_info!( shard.id, - "Deleted stream2 with name: {}, ID: {}", - stream2.name(), + "Deleted stream with name: {}, ID: {}", + stream2.root().name(), stream2.id() ); let event = ShardEvent::DeletedStream2 { diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs b/core/server/src/binary/handlers/topics/create_topic_handler.rs index 848a4c90..3f913685 100644 --- a/core/server/src/binary/handlers/topics/create_topic_handler.rs +++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs @@ -24,6 +24,7 @@ use crate::shard::namespace::IggyNamespace; use crate::shard::transmission::event::ShardEvent; use crate::shard::{IggyShard, ShardInfo}; use crate::shard_info; +use crate::slab::traits_ext::EntityMarker; use crate::state::command::EntryCommand; use crate::state::models::CreateTopicWithId; use crate::state::system::TopicState; @@ -55,12 +56,7 @@ impl ServerCommandHandler for CreateTopic { debug!("session: {session}, command: {self}"); let stream_id = self.stream_id.clone(); let maybe_topic_id = self.topic_id; - //shard.ensure_stream_exists(&stream_id)?; - let parent = shard - .streams2 - .with_stats_by_id(&stream_id, |stats| stats.clone()); - let stats = Arc::new(TopicStats::new(parent)); - let new_topic_id = shard + let topic = shard .create_topic2( session, &stream_id, @@ -70,30 +66,24 @@ impl ServerCommandHandler for CreateTopic { self.compression_algorithm, self.max_topic_size, self.replication_factor, - stats.clone(), ) .await?; + let topic_id = topic.id(); + // Send events for topic creation. + let event = ShardEvent::CreatedTopic2 { + stream_id: self.stream_id.clone(), + topic, + }; + let _responses = shard.broadcast_event_to_all_shards(event).await; shard .create_partitions2( session, &stream_id, - &Identifier::numeric(new_topic_id as u32).unwrap(), + &Identifier::numeric(topic_id as u32).unwrap(), self.partitions_count, ) .await?; - - let event = ShardEvent::CreatedTopic2 { - stream_id: self.stream_id.clone(), - id: new_topic_id, - name: self.name.clone(), - message_expiry: self.message_expiry, - compression_algorithm: self.compression_algorithm, - max_topic_size: self.max_topic_size, - replication_factor: self.replication_factor, - stats, - }; - let _responses = shard.broadcast_event_to_all_shards(event.into()).await; let (topic_id, partition_ids) = shard .create_topic( session, diff --git a/core/server/src/binary/handlers/topics/delete_topic_handler.rs b/core/server/src/binary/handlers/topics/delete_topic_handler.rs index af16cf8c..70fccd8c 100644 --- a/core/server/src/binary/handlers/topics/delete_topic_handler.rs +++ b/core/server/src/binary/handlers/topics/delete_topic_handler.rs @@ -24,6 +24,7 @@ use crate::shard::IggyShard; use crate::shard::namespace::IggyNamespace; use crate::shard::transmission::event::ShardEvent; use crate::shard_info; +use crate::slab::traits_ext::EntityMarker; use crate::state::command::EntryCommand; use crate::streaming::session::Session; use anyhow::Result; diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index c7994355..e7cb032a 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -68,7 +68,7 @@ use crate::{ }, }, shard_error, shard_info, shard_warn, - slab::streams::Streams, + slab::{streams::Streams, traits_ext::EntityMarker}, state::{ StateKind, system::{StreamState, SystemState, UserState}, @@ -631,7 +631,7 @@ impl IggyShard { partition_ids, } => { self.delete_partitions2_bypass_auth( - &stream_id, + &stream_id, &topic_id, partitions_count, partition_ids, @@ -667,11 +667,13 @@ impl IggyShard { "{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}" ) })?; - let topic = stream.get_topic_mut(&topic_id).with_error_context(|error| { - format!( - "{COMPONENT} (error: {error}) - failed to get topic with ID: {topic_id}" - ) - })?; + let topic = stream + .get_topic_mut(&topic_id) + .with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to get topic with ID: {topic_id}" + ) + })?; topic.reassign_consumer_groups(); if partitions.len() > 0 { self.metrics.decrement_partitions(partitions_count as u32); @@ -848,10 +850,8 @@ impl IggyShard { self.tcp_bound_address.set(Some(address)); Ok(()) } - ShardEvent::CreatedStream2 { id, name, stats } => { - let stream_id = self - .create_stream2_bypass_auth(name.to_owned(), stats.clone()) - .await?; + ShardEvent::CreatedStream2 { id, stream } => { + let stream_id = self.create_stream2_bypass_auth(stream); assert_eq!(stream_id, id); Ok(()) } @@ -860,26 +860,8 @@ impl IggyShard { assert_eq!(stream.id(), id); Ok(()) } - ShardEvent::CreatedTopic2 { - id, - stream_id, - name, - message_expiry, - compression_algorithm, - max_topic_size, - replication_factor, - stats, - } => { - let topic_id = self.create_topic2_bypass_auth( - &stream_id, - name.to_owned(), - replication_factor, - message_expiry, - compression_algorithm, - max_topic_size, - stats.clone(), - )?; - assert_eq!(topic_id, id); + ShardEvent::CreatedTopic2 { stream_id, topic } => { + let _topic_id = self.create_topic2_bypass_auth(&stream_id, topic)?; Ok(()) } ShardEvent::CreatedPartitions2 { @@ -895,7 +877,9 @@ impl IggyShard { stream_id, topic_id, } => { - let topic = self.delete_topic_bypass_auth2(&stream_id, &topic_id).await?; + let topic = self + .delete_topic_bypass_auth2(&stream_id, &topic_id) + .await?; assert_eq!(topic.id(), id); Ok(()) } @@ -916,7 +900,7 @@ impl IggyShard { compression_algorithm, max_topic_size, replication_factor, - ); + )?; Ok(()) } ShardEvent::CreatedConsumerGroup2 { @@ -929,7 +913,8 @@ impl IggyShard { let id = self.create_consumer_group_bypass_auth2( &stream_id, &topic_id, - members.clone(), name.clone(), + members.clone(), + name.clone(), )?; assert_eq!(id, cg_id); Ok(()) @@ -940,7 +925,8 @@ impl IggyShard { topic_id, group_id, } => { - let cg = self.delete_consumer_group_bypass_auth2(&stream_id, &topic_id, &group_id)?; + let cg = + self.delete_consumer_group_bypass_auth2(&stream_id, &topic_id, &group_id)?; assert_eq!(cg.id(), id); Ok(()) } diff --git a/core/server/src/shard/system/consumer_groups.rs b/core/server/src/shard/system/consumer_groups.rs index e58f36df..2283fbb4 100644 --- a/core/server/src/shard/system/consumer_groups.rs +++ b/core/server/src/shard/system/consumer_groups.rs @@ -119,10 +119,10 @@ impl IggyShard { { let topic_id = self .streams2 - .with_topic_by_id(stream_id, topic_id, |topic| topic.id()); + .with_topic_root_by_id(stream_id, topic_id, |topic| topic.id()); let stream_id = self .streams2 - .with_stream_by_id(stream_id, |stream| stream.id()); + .with_root_by_id(stream_id, |stream| stream.id()); self.permissioner.borrow().create_consumer_group( session.get_user_id(), stream_id as u32, @@ -151,7 +151,7 @@ impl IggyShard { ) -> Result<usize, IggyError> { let id = self .streams2 - .with_topic_by_id_mut(stream_id, topic_id, |topic| { + .with_topic_root_by_id_mut(stream_id, topic_id, |topic| { let partitions = topic.partitions().with(|partitions| { let (info, _, _, _, _, _) = partitions.into_components(); info.iter() @@ -233,10 +233,10 @@ impl IggyShard { { let topic_id = self .streams2 - .with_topic_by_id(stream_id, topic_id, |topic| topic.id()); + .with_topic_root_by_id(stream_id, topic_id, |topic| topic.id()); let stream_id = self .streams2 - .with_stream_by_id(stream_id, |stream| stream.id()); + .with_root_by_id(stream_id, |stream| stream.id()); self.permissioner.borrow().delete_consumer_group( session.get_user_id(), stream_id as u32, @@ -263,20 +263,19 @@ impl IggyShard { ) -> Result<consumer_group2::ConsumerGroup, IggyError> { let cg = self .streams2 - .with_topic_by_id_mut(stream_id, topic_id, |topic| { + .with_topic_root_by_id_mut(stream_id, topic_id, |root| { match group_id.kind { iggy_common::IdKind::Numeric => { - topic.consumer_groups_mut().with_mut(|container| { + root.consumer_groups_mut().with_mut(|container| { container.try_remove(group_id.get_u32_value().unwrap() as usize) }) } iggy_common::IdKind::String => { let key = group_id.get_string_value().unwrap(); - let id = topic + let id = root .consumer_groups() .with_index(|index| *(index.get(&key).unwrap())); - topic - .consumer_groups_mut() + root.consumer_groups_mut() .with_mut(|container| container.try_remove(id)) } } diff --git a/core/server/src/shard/system/partitions.rs b/core/server/src/shard/system/partitions.rs index 71e9c8e3..3cc1ed61 100644 --- a/core/server/src/shard/system/partitions.rs +++ b/core/server/src/shard/system/partitions.rs @@ -24,6 +24,7 @@ use crate::configs::system::SystemConfig; use crate::shard::IggyShard; use crate::slab::traits_ext::Delete; use crate::slab::traits_ext::EntityComponentSystem; +use crate::slab::traits_ext::EntityMarker; use crate::slab::traits_ext::Insert; use crate::streaming::deduplication::message_deduplicator::MessageDeduplicator; use crate::streaming::partitions::partition::Partition; @@ -74,12 +75,16 @@ impl IggyShard { partitions_count: u32, ) -> Result<Vec<partition2::Partition>, IggyError> { self.ensure_authenticated(session)?; + /* + self.ensure_stream_exists(stream_id)?; + self.ensure_topic_exists(stream_id, topic_id)?; + */ let numeric_stream_id = self.streams2 - .with_stream_by_id(stream_id, |stream| stream.id()) as u32; + .with_root_by_id(stream_id, |stream| stream.id()) as u32; let numeric_topic_id = self.streams2 - .with_topic_by_id(stream_id, topic_id, |topic| topic.id()) as u32; + .with_topic_root_by_id(stream_id, topic_id, |topic| topic.id()) as u32; self.validate_partition_permissions( session, @@ -88,10 +93,9 @@ impl IggyShard { "create", )?; - let parent_stats = self.streams2.with_stream_by_id(stream_id, |stream| { - stream - .topics() - .with_topic_stats_by_id(topic_id, |stats| stats) + let parent_stats = self.streams2.with_root_by_id(stream_id, |root| { + root.topics() + .with_stats_by_id(topic_id, |stats| stats.clone()) }); let partitions = self.create_and_insert_partitions_mem( stream_id, @@ -144,7 +148,7 @@ impl IggyShard { .map(|_| { // Areczkuuuu. let stats = Arc::new(PartitionStats::new(parent_stats.clone())); - let info = partition2::PartitionInfo::new(created_at, false); + let info = partition2::PartitionRoot::new(created_at, false); let deduplicator = create_message_deduplicator(&self.config.system); let offset = Arc::new(AtomicU64::new(0)); let consumer_offset = Arc::new(papaya::HashMap::with_capacity(2137)); @@ -172,8 +176,8 @@ impl IggyShard { partition: partition2::Partition, ) -> usize { self.streams2 - .with_topic_by_id_mut(stream_id, topic_id, |topic| { - topic.partitions_mut().insert(partition) + .with_partitions_mut(stream_id, topic_id, |partitions| { + partitions.insert(partition) }) } @@ -265,10 +269,10 @@ impl IggyShard { let numeric_stream_id = self.streams2 - .with_stream_by_id(stream_id, |stream| stream.id()) as u32; + .with_root_by_id(stream_id, |stream| stream.id()) as u32; let numeric_topic_id = self.streams2 - .with_topic_by_id(stream_id, topic_id, |topic| topic.id()) as u32; + .with_topic_root_by_id(stream_id, topic_id, |topic| topic.id()) as u32; self.validate_partition_permissions( session, @@ -290,13 +294,13 @@ impl IggyShard { ) -> Vec<u32> { let deleted_partition_ids = self.streams2 - .with_topic_by_id_mut(stream_id, topic_id, |topic| { + .with_topic_root_by_id_mut(stream_id, topic_id, |topic| { let partitions = topic.partitions_mut(); - let current_count = partitions.count() as u32; + let current_count = partitions.len() as u32; let partitions_to_delete = partitions_count.min(current_count); let start_idx = (current_count - partitions_to_delete) as usize; let mut deleted_ids = Vec::with_capacity(partitions_to_delete as usize); - for idx in (start_idx..current_count as usize) { + for idx in start_idx..current_count as usize { let partition = topic.partitions_mut().delete(idx); assert_eq!(partition.id(), idx); deleted_ids.push(partition.id() as u32); diff --git a/core/server/src/shard/system/streams.rs b/core/server/src/shard/system/streams.rs index 958cab48..54e639fd 100644 --- a/core/server/src/shard/system/streams.rs +++ b/core/server/src/shard/system/streams.rs @@ -19,6 +19,9 @@ use super::COMPONENT; use crate::shard::IggyShard; use crate::shard::namespace::IggyNamespace; +use crate::slab::traits_ext::{ + DeleteCell, EntityComponentSystem, EntityMarker, Insert, InsertCell, IntoComponents, +}; use crate::streaming::partitions::partition; use crate::streaming::session::Session; use crate::streaming::stats::stats::StreamStats; @@ -28,7 +31,7 @@ use crate::streaming::streams::stream2; use error_set::ErrContext; use futures::future::try_join_all; use iggy_common::locking::IggyRwLockFn; -use iggy_common::{IdKind, Identifier, IggyError}; +use iggy_common::{IdKind, Identifier, IggyError, IggyTimestamp}; use std::cell::{Ref, RefCell, RefMut}; use std::sync::Arc; use std::sync::atomic::{AtomicU32, Ordering}; @@ -213,45 +216,38 @@ impl IggyShard { session: &Session, stream_id: Option<u32>, name: String, - stats: Arc<StreamStats>, - ) -> Result<usize, IggyError> { + ) -> Result<stream2::Stream, IggyError> { self.ensure_authenticated(session)?; self.permissioner .borrow() .create_stream(session.get_user_id())?; - let stream_id = self.create_stream2_base(name, stats).await?; - create_stream_file_hierarchy(self.id, stream_id, &self.config.system).await?; - Ok(stream_id) - } - - pub async fn create_stream2_bypass_auth( - &self, - name: String, - stats: Arc<StreamStats>, - ) -> Result<usize, IggyError> { - self.create_stream2_base(name, stats).await - } + let exists = self + .streams2 + .exists(&Identifier::from_str_value(&name).unwrap()); - async fn create_stream2_base( - &self, - name: String, - stats: Arc<StreamStats>, - ) -> Result<usize, IggyError> { - let exists = self.streams2.exists(&Identifier::named(&name).unwrap()); if exists { return Err(IggyError::StreamNameAlreadyExists(name)); } - let key = name.clone(); - let stream = stream2::Stream::new(name); - let stream_id = self - .streams2 - .with_mut(|streams| stream.insert_into(streams)); - self.streams2.with_index_mut(|index| { - index.insert(key, stream_id); - }); - self.streams2 - .with_stats_mut(|container| container.insert(stats)); - Ok(stream_id) + let stream = self.create_and_insert_stream_mem(name); + create_stream_file_hierarchy(self.id, stream.id(), &self.config.system).await?; + Ok(stream) + } + + fn create_and_insert_stream_mem(&self, name: String) -> stream2::Stream { + let now = IggyTimestamp::now(); + let stats = Arc::new(StreamStats::new()); + let mut stream = stream2::Stream::new(name, stats, now); + let id = self.insert_stream_mem(stream.clone()); + stream.update_id(id); + stream + } + + fn insert_stream_mem(&self, stream: stream2::Stream) -> usize { + self.streams2.insert(stream) + } + + pub fn create_stream2_bypass_auth(&self, stream: stream2::Stream) -> usize { + self.insert_stream_mem(stream) } pub async fn create_stream( @@ -405,9 +401,7 @@ impl IggyShard { name: &str, ) -> Result<(), IggyError> { self.ensure_authenticated(session)?; - let stream_id = self - .streams2 - .with_stream_by_id(id, |stream| stream.id() as u32); + let stream_id = self.streams2.with_root_by_id(id, |root| root.id() as u32); self.permissioner .borrow() @@ -426,7 +420,7 @@ impl IggyShard { fn update_stream2_base(&self, id: &Identifier, name: String) -> Result<String, IggyError> { let old_name = self .streams2 - .with_stream_by_id(id, |stream| stream.name().clone()); + .with_root_by_id(id, |root| root.name().clone()); if old_name == name { return Ok(old_name); @@ -437,10 +431,10 @@ impl IggyShard { return Err(IggyError::StreamNameAlreadyExists(name.to_string())); } - self.streams2.with_stream_by_id_mut(id, |stream| { - stream.set_name(name.clone()); + self.streams2.with_root_by_id_mut(id, |root| { + root.set_name(name.clone()); }); - + self.streams2.with_index_mut(|index| { // Rename the key inside of hashmap let idx = index.remove(&old_name).expect("Rename key: key not found"); @@ -519,39 +513,21 @@ impl IggyShard { } fn delete_stream2_base(&self, id: &Identifier) -> Result<stream2::Stream, IggyError> { - let (stream_id, stream_name) = self - .streams2 - .with_stream_by_id(id, |stream| (stream.id() as u32, stream.name().clone())); - - let stream = self.streams2.with_mut(|streams| { - streams - .try_remove(stream_id as usize) - .ok_or_else(|| match id.kind { - iggy_common::IdKind::Numeric => IggyError::StreamIdNotFound(stream_id as u32), - iggy_common::IdKind::String => { - IggyError::StreamNameNotFound(stream_name.clone()) - } - }) - })?; + let id = self.streams2.get_index(id); + let stream = self.streams2.delete(id); + let stats = stream.stats(); - self.streams2.with_stats_mut(|stats| { - if let Some(stream_stats) = stats.try_remove(stream_id as usize) { - self.metrics.decrement_streams(1); - self.metrics.decrement_topics(0); // TODO: stats doesn't have topic count - self.metrics.decrement_partitions(0); // TODO: stats doesn't have partition count - self.metrics - .decrement_messages(stream_stats.messages_count()); - self.metrics - .decrement_segments(stream_stats.segments_count()); - } else { - self.metrics.decrement_streams(1); - } - }); + self.metrics.decrement_streams(1); + self.metrics.decrement_topics(0); // TODO: stats doesn't have topic count + self.metrics.decrement_partitions(0); // TODO: stats doesn't have partition count + self.metrics.decrement_messages(stats.messages_count()); + self.metrics.decrement_segments(stats.segments_count()); + /* self.client_manager .borrow_mut() .delete_consumer_groups_for_stream(stream_id as u32); - + */ Ok(stream) } @@ -562,9 +538,7 @@ impl IggyShard { ) -> Result<stream2::Stream, IggyError> { self.ensure_authenticated(session)?; // self.ensure_stream_exists(id)?; - let stream_id = self - .streams2 - .with_stream_by_id(id, |stream| stream.id() as u32); + let stream_id = self.streams2.with_root_by_id(id, |root| root.id() as u32); self.permissioner .borrow() .delete_stream(session.get_user_id(), stream_id as u32) @@ -629,9 +603,7 @@ impl IggyShard { pub fn purge_stream2(&self, session: &Session, id: &Identifier) -> Result<(), IggyError> { self.ensure_authenticated(session)?; // self.ensure_stream_exists(id)?; - let stream_id = self - .streams2 - .with_stream_by_id(id, |stream| stream.id() as u32); + let stream_id = self.streams2.with_root_by_id(id, |root| root.id() as u32); self.permissioner .borrow() .purge_stream(session.get_user_id(), stream_id) diff --git a/core/server/src/shard/system/topics.rs b/core/server/src/shard/system/topics.rs index ff327975..d0beaa31 100644 --- a/core/server/src/shard/system/topics.rs +++ b/core/server/src/shard/system/topics.rs @@ -24,10 +24,14 @@ use crate::shard::namespace::IggyNamespace; use crate::shard::transmission::event::ShardEvent; use crate::shard::{IggyShard, ShardInfo}; use crate::shard_info; +use crate::slab::traits_ext::{ + Delete, DeleteCell, EntityComponentSystem, EntityMarker, Insert, InsertCell, +}; +use crate::state::system::StreamState; use crate::streaming::partitions::partition2; use crate::streaming::partitions::storage2::create_partition_file_hierarchy; use crate::streaming::session::Session; -use crate::streaming::stats::stats::TopicStats; +use crate::streaming::stats::stats::{StreamStats, TopicStats}; use crate::streaming::streams::stream::Stream; use crate::streaming::topics::storage2::create_topic_file_hierarchy; use crate::streaming::topics::topic::Topic; @@ -35,7 +39,9 @@ use crate::streaming::topics::topic2; use clap::Id; use error_set::ErrContext; use iggy_common::locking::IggyRwLockFn; -use iggy_common::{CompressionAlgorithm, Identifier, IggyError, IggyExpiry, MaxTopicSize}; +use iggy_common::{ + CompressionAlgorithm, Identifier, IggyError, IggyExpiry, IggyTimestamp, MaxTopicSize, +}; use tokio_util::io::StreamReader; use tracing::info; @@ -152,13 +158,10 @@ impl IggyShard { compression: CompressionAlgorithm, max_topic_size: MaxTopicSize, replication_factor: Option<u8>, - stats: Arc<TopicStats>, - ) -> Result<usize, IggyError> { + ) -> Result<topic2::Topic, IggyError> { self.ensure_authenticated(session)?; //self.ensure_stream_exists(stream_id)?; - let numeric_stream_id = self - .streams2 - .with_stream_by_id(stream_id, |stream| stream.id()); + let numeric_stream_id = self.streams2.with_root_by_id(stream_id, |root| root.id()); { self.permissioner .borrow() @@ -170,94 +173,78 @@ impl IggyShard { ) })?; } - let topic_id = self.create_topic2_base( + let exists = self.streams2.with_topics(stream_id, |topics| { + topics.exists(&Identifier::from_str_value(&name).unwrap()) + }); + if exists { + return Err(IggyError::TopicNameAlreadyExists( + name, + numeric_stream_id as u32, + )); + } + + let parent_stats = self + .streams2 + .with_stats_by_id(stream_id, |stats| stats.clone()); + let topic = self.create_and_insert_topics_mem( stream_id, name, replication_factor.unwrap_or(1), message_expiry, compression, max_topic_size, - stats, - )?; - - self.streams2.with_topic_by_id( - stream_id, - &Identifier::numeric(topic_id as u32).unwrap(), - |topic| { - let message_expiry = match topic.message_expiry() { - IggyExpiry::ServerDefault => self.config.system.segment.message_expiry, - _ => message_expiry, - }; - shard_info!(self.id, "Topic message expiry: {}", message_expiry); - }, + parent_stats, ); + let message_expiry = match topic.root().message_expiry() { + IggyExpiry::ServerDefault => self.config.system.segment.message_expiry, + _ => message_expiry, + }; + shard_info!(self.id, "Topic message expiry: {}", message_expiry); // Create file hierarchy for the topic. - create_topic_file_hierarchy(self.id, numeric_stream_id, topic_id, &self.config.system) + create_topic_file_hierarchy(self.id, numeric_stream_id, topic.id(), &self.config.system) .await?; - Ok(topic_id) + Ok(topic) } - pub fn create_topic2_bypass_auth( + fn create_and_insert_topics_mem( &self, stream_id: &Identifier, name: String, - replication_factor: Option<u8>, + replication_factor: u8, message_expiry: IggyExpiry, compression: CompressionAlgorithm, max_topic_size: MaxTopicSize, - stats: Arc<TopicStats>, - ) -> Result<usize, IggyError> { - let topic_id = self.create_topic2_base( - stream_id, + parent_stats: Arc<StreamStats>, + ) -> topic2::Topic { + let stats = Arc::new(TopicStats::new(parent_stats)); + let now = IggyTimestamp::now(); + let mut topic = topic2::Topic::new( name, - replication_factor.unwrap_or(1), + stats, + now, + replication_factor, message_expiry, compression, max_topic_size, - stats, - )?; - Ok(topic_id) + ); + + let id = self.insert_topic_mem(stream_id, topic.clone()); + topic.update_id(id); + topic + } + + fn insert_topic_mem(&self, stream_id: &Identifier, topic: topic2::Topic) -> usize { + self.streams2 + .with_root_by_id(stream_id, |root| root.topics().insert(topic)) } - fn create_topic2_base( + pub fn create_topic2_bypass_auth( &self, stream_id: &Identifier, - name: String, - replication_factor: u8, - message_expiry: IggyExpiry, - compression: CompressionAlgorithm, - max_topic_size: MaxTopicSize, - stats: Arc<TopicStats>, + topic: topic2::Topic, ) -> Result<usize, IggyError> { - let topic_id = self.streams2.with_stream_by_id(stream_id, |stream| { - let exists = stream.topics().exists(&Identifier::named(&name).unwrap()); - if exists { - // TODO: Fixme, replace the second argument with identifier, rather than numeric ID. - return Err(IggyError::TopicNameAlreadyExists(name.to_owned(), 0)); - } - let topic = topic2::Topic::new( - name, - replication_factor, - message_expiry, - compression, - max_topic_size, - ); - let name = topic.name().clone(); - let topic_id = stream.topics().with_mut(|topics| { - let topic_id = topic.insert_into(topics); - topic_id - }); - stream.topics().with_mut_index(|index| { - index.insert(name, topic_id); - }); - - stream.topics().with_stats_mut(|container| { - container.insert(stats); - }); - Ok(topic_id) - })?; - + let topic_id = self.insert_topic_mem(stream_id, topic); Ok(topic_id) } @@ -390,10 +377,10 @@ impl IggyShard { { let topic_id = self .streams2 - .with_topic_by_id(stream_id, topic_id, |topic| topic.id()); + .with_topic_root_by_id(stream_id, topic_id, |topic| topic.id()); let stream_id = self .streams2 - .with_stream_by_id(stream_id, |stream| stream.id()); + .with_root_by_id(stream_id, |stream| stream.id()); self.permissioner.borrow().update_topic( session.get_user_id(), stream_id as u32, @@ -407,6 +394,17 @@ impl IggyShard { ) })?; } + let exists = self + .streams2 + .with_topic_root_by_id(stream_id, topic_id, |topic| { + let old_name = topic.name(); + old_name == &name + }); + if exists { + // TODO: Fix the errors to accept Identifier instead of u32. + return Err(IggyError::TopicNameAlreadyExists(name, 0)); + } + self.update_topic_base2( stream_id, topic_id, @@ -453,7 +451,7 @@ impl IggyShard { ) { let (old_name, new_name) = self.streams2 - .with_topic_by_id_mut(stream_id, topic_id, |topic| { + .with_topic_root_by_id_mut(stream_id, topic_id, |topic| { let old_name = topic.name().clone(); topic.set_name(name.clone()); topic.set_message_expiry(message_expiry); @@ -465,7 +463,7 @@ impl IggyShard { }); if old_name != new_name { self.streams2.with_topics(stream_id, |topics| { - topics.with_mut_index(|index| { + topics.with_index_mut(|index| { // Rename the key inside of hashmap let idx = index.remove(&old_name).expect("Rename key: key not found"); index.insert(new_name, idx); @@ -600,10 +598,10 @@ impl IggyShard { { let topic_id = self .streams2 - .with_topic_by_id(stream_id, topic_id, |topic| topic.id()); + .with_topic_root_by_id(stream_id, topic_id, |topic| topic.id()); let stream_id = self .streams2 - .with_stream_by_id(stream_id, |stream| stream.id()); + .with_root_by_id(stream_id, |stream| stream.id()); self.permissioner .borrow() .delete_topic(session.get_user_id(), stream_id as u32, topic_id as u32) @@ -617,7 +615,7 @@ impl IggyShard { let topic = self.delete_topic_base2(stream_id, topic_id).with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to delete topic with ID: {topic_id} in stream with ID: {stream_id}") })?; - // TODO: Remove partitions. + // TODO: Decrease the stats Ok(topic) } @@ -635,26 +633,12 @@ impl IggyShard { stream_id: &Identifier, topic_id: &Identifier, ) -> Result<topic2::Topic, IggyError> { - let topic = self.streams2.with_stream_by_id(stream_id, |stream| { - let id = stream - .topics() - .with_topic_by_id(topic_id, |topic| topic.id()); - stream.topics().with_mut(|container| { - container.try_remove(id).ok_or_else(|| { - let topic_name = stream - .topics() - .with_topic_by_id(topic_id, |topic| topic.name().clone()); - IggyError::TopicNameNotFound(topic_name, stream.name().clone()) - }) - }) - })?; - let id = topic.id(); - self.streams2.with_topics(stream_id, |topics| { - topics.with_stats_mut(|container| { - container - .try_remove(id) - .expect("Topic delete: topic stats not found"); - }); + let topic = self.streams2.with_root_by_id_mut(stream_id, |stream| { + let topics = stream.topics(); + let id = topics.with_root_by_id(topic_id, |topic| topic.id()); + let topic = topics.delete(id); + assert_eq!(topic.id(), id, "delete_topic: topic ID mismatch"); + topic }); Ok(topic) } @@ -727,10 +711,10 @@ impl IggyShard { { let topic_id = self .streams2 - .with_topic_by_id(stream_id, topic_id, |topic| topic.id()); + .with_topic_root_by_id(stream_id, topic_id, |topic| topic.id()); let stream_id = self .streams2 - .with_stream_by_id(stream_id, |stream| stream.id()); + .with_root_by_id(stream_id, |stream| stream.id()); self.permissioner.borrow().purge_topic( session.get_user_id(), stream_id as u32, diff --git a/core/server/src/shard/system/utils.rs b/core/server/src/shard/system/utils.rs index 6ad3b224..28db67dc 100644 --- a/core/server/src/shard/system/utils.rs +++ b/core/server/src/shard/system/utils.rs @@ -1,6 +1,6 @@ use iggy_common::{Identifier, IggyError}; -use crate::shard::IggyShard; +use crate::{shard::IggyShard, slab::traits_ext::EntityComponentSystem}; impl IggyShard { pub fn ensure_stream_exists(&self, stream_id: &Identifier) -> Result<(), IggyError> { @@ -16,9 +16,10 @@ impl IggyShard { topic_id: &Identifier, ) -> Result<(), IggyError> { //self.ensure_stream_exists(stream_id)?; + let stream_id = self.streams2.get_index(stream_id); let exists = self .streams2 - .with_stream_by_id(stream_id, |stream| stream.topics().exists(topic_id)); + .with_by_id(stream_id, |(root, _)| root.topics().exists(topic_id)); if !exists { return Err(IggyError::TopicIdNotFound(0, 0)); } @@ -35,8 +36,8 @@ impl IggyShard { //self.ensure_topic_exists(stream_id, topic_id)?; let exists = self .streams2 - .with_topic_by_id(stream_id, topic_id, |topic| { - topic.consumer_groups().exists(group_id) + .with_topic_root_by_id(stream_id, topic_id, |root| { + root.consumer_groups().exists(group_id) }); if !exists { return Err(IggyError::ConsumerGroupIdNotFound(0, 0)); diff --git a/core/server/src/shard/transmission/event.rs b/core/server/src/shard/transmission/event.rs index 9f5dd19c..666e3272 100644 --- a/core/server/src/shard/transmission/event.rs +++ b/core/server/src/shard/transmission/event.rs @@ -18,7 +18,8 @@ use crate::{ personal_access_tokens::personal_access_token::PersonalAccessToken, polling_consumer::PollingConsumer, stats::stats::{PartitionStats, StreamStats, TopicStats}, - topics::consumer_group2::Member, + streams::stream2, + topics::{consumer_group2::Member, topic2}, }, }; @@ -34,8 +35,7 @@ pub enum ShardEvent { }, CreatedStream2 { id: usize, - name: String, - stats: Arc<StreamStats>, + stream: stream2::Stream, }, CreatedStream { stream_id: Option<u32>, @@ -94,14 +94,8 @@ pub enum ShardEvent { replication_factor: Option<u8>, }, CreatedTopic2 { - id: usize, stream_id: Identifier, - name: String, - message_expiry: IggyExpiry, - compression_algorithm: CompressionAlgorithm, - max_topic_size: MaxTopicSize, - replication_factor: Option<u8>, - stats: Arc<TopicStats>, + topic: topic2::Topic, }, CreatedConsumerGroup { stream_id: Identifier, diff --git a/core/server/src/slab/consumer_groups.rs b/core/server/src/slab/consumer_groups.rs index fe352696..c52fda6d 100644 --- a/core/server/src/slab/consumer_groups.rs +++ b/core/server/src/slab/consumer_groups.rs @@ -7,7 +7,7 @@ use std::sync::{Arc, atomic::AtomicUsize}; const CAPACITY: usize = 1024; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ConsumerGroups { index: AHashMap<<consumer_group2::ConsumerGroup as Keyed>::Key, usize>, container: Slab<consumer_group2::ConsumerGroup>, diff --git a/core/server/src/slab/mod.rs b/core/server/src/slab/mod.rs index a8185700..840de3d3 100644 --- a/core/server/src/slab/mod.rs +++ b/core/server/src/slab/mod.rs @@ -16,4 +16,4 @@ pub trait Keyed { fn key(&self) -> &Self::Key; } -//index: AHashMap<T::Key, usize>, \ No newline at end of file +//index: AHashMap<T::Key, usize>, diff --git a/core/server/src/slab/partitions.rs b/core/server/src/slab/partitions.rs index 23b2f966..63f174d0 100644 --- a/core/server/src/slab/partitions.rs +++ b/core/server/src/slab/partitions.rs @@ -1,34 +1,27 @@ use crate::{ - slab::traits_ext::{ - Borrow, Components, Delete, EntityComponentSystem, EntityMarker, IndexComponents, Insert, - IntoComponents, - }, + slab::traits_ext::{Borrow, Delete, EntityComponentSystem, Insert, IntoComponents}, streaming::{ deduplication::message_deduplicator::MessageDeduplicator, partitions::{ consumer_offset, - partition::ConsumerOffset, partition2::{self, Partition, PartitionRef}, }, segments, stats::stats::PartitionStats, + topics::consumer_group, }, }; -use ahash::AHashMap; use slab::Slab; use std::sync::{Arc, atomic::AtomicU64}; // TODO: This could be upper limit of partitions per topic, use that value to validate instead of whathever this thing is in `common` crate. pub const PARTITIONS_CAPACITY: usize = 16384; -pub type SlabId = usize; - -struct PartitionOffset { - offset: u64, -} +const SEGMENTS_CAPACITY: usize = 1024; +pub type ContainerId = usize; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Partitions { - info: Slab<partition2::PartitionInfo>, + root: Slab<partition2::PartitionRoot>, stats: Slab<Arc<PartitionStats>>, segments: Slab<Vec<segments::Segment2>>, message_deduplicator: Slab<Option<MessageDeduplicator>>, @@ -38,10 +31,63 @@ pub struct Partitions { consumer_group_offset: Slab<Arc<papaya::HashMap<usize, consumer_offset::ConsumerOffset>>>, } +impl Insert for Partitions { + type Idx = ContainerId; + type Item = Partition; + + fn insert(&mut self, item: Self::Item) -> Self::Idx { + let (root, stats, deduplicator, offset, consumer_offset, consumer_group_offset) = + item.into_components(); + + let entity_id = self.root.insert(root); + let id = self.stats.insert(stats); + assert_eq!( + entity_id, id, + "partition_insert: id mismatch when inserting stats" + ); + let id = self.segments.insert(Vec::with_capacity(SEGMENTS_CAPACITY)); + assert_eq!( + entity_id, id, + "partition_insert: id mismatch when inserting segments" + ); + let id = self.message_deduplicator.insert(deduplicator); + assert_eq!( + entity_id, id, + "partition_insert: id mismatch when inserting message_deduplicator" + ); + let id = self.offset.insert(offset); + assert_eq!( + entity_id, id, + "partition_insert: id mismatch when inserting offset" + ); + let id = self.consumer_offset.insert(consumer_offset); + assert_eq!( + entity_id, id, + "partition_insert: id mismatch when inserting consumer_offset" + ); + let id = self.consumer_group_offset.insert(consumer_group_offset); + assert_eq!( + entity_id, id, + "partition_insert: id mismatch when inserting consumer_group_offset" + ); + entity_id + } +} + +impl Delete for Partitions { + type Idx = ContainerId; + type Item = Partition; + + fn delete(&mut self, id: Self::Idx) -> Self::Item { + todo!() + } +} + +//TODO: those from impls could use a macro aswell. impl<'a> From<&'a Partitions> for PartitionRef<'a> { fn from(value: &'a Partitions) -> Self { PartitionRef::new( - &value.info, + &value.root, &value.stats, &value.message_deduplicator, &value.offset, @@ -51,40 +97,8 @@ impl<'a> From<&'a Partitions> for PartitionRef<'a> { } } -impl Insert<SlabId> for Partitions { - type Item = Partition; - - fn insert(&mut self, item: Self::Item) -> SlabId { - let ( - info, - stats, - message_deduplicator, - offset, - consumer_offset, - consumer_group_offset, - ) = item.into_components(); - - let id = self.info.insert(info); - let info = &mut self.info[id]; - info.update_id(id); - self.stats.insert(stats); - self.message_deduplicator.insert(message_deduplicator); - self.offset.insert(offset); - self.consumer_offset.insert(consumer_offset); - self.consumer_group_offset.insert(consumer_group_offset); - id - } -} - -impl Delete<SlabId> for Partitions { - type Item = Partition; - - fn delete(&mut self, id: SlabId) -> Self::Item { - todo!() - } -} - -impl EntityComponentSystem<SlabId, Borrow> for Partitions { +impl EntityComponentSystem<Borrow> for Partitions { + type Idx = ContainerId; type Entity = Partition; type EntityRef<'a> = PartitionRef<'a>; @@ -106,7 +120,7 @@ impl EntityComponentSystem<SlabId, Borrow> for Partitions { impl Default for Partitions { fn default() -> Self { Self { - info: Slab::with_capacity(PARTITIONS_CAPACITY), + root: Slab::with_capacity(PARTITIONS_CAPACITY), stats: Slab::with_capacity(PARTITIONS_CAPACITY), segments: Slab::with_capacity(PARTITIONS_CAPACITY), message_deduplicator: Slab::with_capacity(PARTITIONS_CAPACITY), @@ -118,8 +132,8 @@ impl Default for Partitions { } impl Partitions { - pub fn count(&self) -> usize { - self.info.len() + pub fn len(&self) -> usize { + self.root.len() } pub fn with_stats<T>(&self, f: impl FnOnce(&Slab<Arc<PartitionStats>>) -> T) -> T { @@ -128,8 +142,7 @@ impl Partitions { } pub fn with_stats_mut<T>(&mut self, f: impl FnOnce(&mut Slab<Arc<PartitionStats>>) -> T) -> T { - let mut stats = &mut self.stats; - f(&mut stats) + f(&mut self.stats) } pub fn with_segments(&self, partition_id: usize, f: impl FnOnce(&Vec<segments::Segment2>)) { diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs index 56bce63d..15ed08ac 100644 --- a/core/server/src/slab/streams.rs +++ b/core/server/src/slab/streams.rs @@ -2,27 +2,111 @@ use ahash::AHashMap; use iggy_common::Identifier; use slab::Slab; use std::{cell::RefCell, sync::Arc}; +use tokio_rustls::StartHandshake; use crate::{ - slab::{Keyed, partitions::Partitions, topics::Topics}, - streaming::{ - partitions::partition2, stats::stats::StreamStats, streams::stream2, topics::topic2, + shard::stats, + slab::{ + Keyed, + partitions::Partitions, + topics::Topics, + traits_ext::{ + Delete, DeleteCell, EntityComponentSystem, EntityComponentSystemMutCell, Insert, + InsertCell, InteriorMutability, IntoComponents, + }, }, + streaming::{stats::stats::StreamStats, streams::stream2, topics::topic2}, }; const CAPACITY: usize = 1024; +pub type ContainerId = usize; pub struct Streams { - index: RefCell<AHashMap<<stream2::Stream as Keyed>::Key, usize>>, - container: RefCell<Slab<stream2::Stream>>, + index: RefCell<AHashMap<<stream2::StreamRoot as Keyed>::Key, ContainerId>>, + root: RefCell<Slab<stream2::StreamRoot>>, stats: RefCell<Slab<Arc<StreamStats>>>, } +impl<'a> From<&'a Streams> for stream2::StreamRef<'a> { + fn from(value: &'a Streams) -> Self { + let root = value.root.borrow(); + let stats = value.stats.borrow(); + stream2::StreamRef::new(root, stats) + } +} + +impl<'a> From<&'a Streams> for stream2::StreamRefMut<'a> { + fn from(value: &'a Streams) -> Self { + let root = value.root.borrow_mut(); + let stats = value.stats.borrow_mut(); + stream2::StreamRefMut::new(root, stats) + } +} + +impl InsertCell for Streams { + type Idx = ContainerId; + type Item = stream2::Stream; + + fn insert(&self, item: Self::Item) -> Self::Idx { + let (root, stats) = item.into_components(); + let key = root.key().clone(); + + let entity_id = self.root.borrow_mut().insert(root); + let id = self.stats.borrow_mut().insert(stats); + assert_eq!( + entity_id, id, + "stream_insert: id mismatch when inserting stats" + ); + self.index.borrow_mut().insert(key, entity_id); + entity_id + } +} + +impl DeleteCell for Streams { + type Idx = ContainerId; + type Item = stream2::Stream; + + fn delete(&self, id: Self::Idx) -> Self::Item { + todo!() + } +} + +impl EntityComponentSystem<InteriorMutability> for Streams { + type Idx = ContainerId; + type Entity = stream2::Stream; + type EntityRef<'a> = stream2::StreamRef<'a>; + + fn with<O, F>(&self, f: F) -> O + where + F: for<'a> FnOnce(Self::EntityRef<'a>) -> O, + { + f(self.into()) + } + + async fn with_async<O, F>(&self, f: F) -> O + where + F: for<'a> AsyncFnOnce(Self::EntityRef<'a>) -> O, + { + f(self.into()).await + } +} + +impl EntityComponentSystemMutCell for Streams { + type EntityRefMut<'a> = stream2::StreamRefMut<'a>; + + fn with_mut<O, F>(&self, f: F) -> O + where + F: for<'a> FnOnce(Self::EntityRefMut<'a>) -> O, + { + f(self.into()) + } +} + impl Streams { pub fn init() -> Self { Self { index: RefCell::new(AHashMap::with_capacity(CAPACITY)), - container: RefCell::new(Slab::with_capacity(CAPACITY)), + root: RefCell::new(Slab::with_capacity(CAPACITY)), stats: RefCell::new(Slab::with_capacity(CAPACITY)), } } @@ -31,7 +115,7 @@ impl Streams { match id.kind { iggy_common::IdKind::Numeric => { let id = id.get_u32_value().unwrap() as usize; - self.container.borrow().contains(id) + self.root.borrow().contains(id) } iggy_common::IdKind::String => { let key = id.get_string_value().unwrap(); @@ -40,7 +124,7 @@ impl Streams { } } - fn get_index(&self, id: &Identifier) -> usize { + pub fn get_index(&self, id: &Identifier) -> usize { match id.kind { iggy_common::IdKind::Numeric => id.get_u32_value().unwrap() as usize, iggy_common::IdKind::String => { @@ -50,89 +134,79 @@ impl Streams { } } - pub fn with_stats<T>(&self, f: impl FnOnce(&Slab<Arc<StreamStats>>) -> T) -> T { - let stats = self.stats.borrow(); - f(&stats) - } - - pub fn with_stats_by_id<T>( + pub fn with_root_by_id<T>( &self, id: &Identifier, - f: impl FnOnce(&Arc<StreamStats>) -> T, + f: impl FnOnce(&stream2::StreamRoot) -> T, ) -> T { - let stream_id = self.with_stream_by_id(id, |stream| stream.id()); - self.with_stats(|stats| { - let stats = &stats[stream_id]; - f(stats) - }) - } - - pub fn with_stats_mut<T>(&self, f: impl FnOnce(&mut Slab<Arc<StreamStats>>) -> T) -> T { - let mut stats = self.stats.borrow_mut(); - f(&mut stats) - } - - pub async fn with_async<T>(&self, f: impl AsyncFnOnce(&Slab<stream2::Stream>) -> T) -> T { - let container = self.container.borrow(); - f(&container).await + let id = self.get_index(id); + self.with_by_id(id, |(root, _)| f(&root)) } - pub fn with_index<T>( + pub async fn with_root_by_id_async<T>( &self, - f: impl FnOnce(&AHashMap<<stream2::Stream as Keyed>::Key, usize>) -> T, + id: &Identifier, + f: impl AsyncFnOnce(&stream2::StreamRoot) -> T, ) -> T { - let index = self.index.borrow(); - f(&index) + let id = self.get_index(id); + self.with_by_id_async(id, async |(root, _)| f(&root).await) + .await } - pub fn with_index_mut<T>( + pub fn with_root_by_id_mut<T>( &self, - f: impl FnOnce(&mut AHashMap<<stream2::Stream as Keyed>::Key, usize>) -> T, + id: &Identifier, + f: impl FnOnce(&mut stream2::StreamRoot) -> T, ) -> T { - let mut index = self.index.borrow_mut(); - f(&mut index) - } - - pub fn with<T>(&self, f: impl FnOnce(&Slab<stream2::Stream>) -> T) -> T { - let container = self.container.borrow(); - f(&container) + let id = self.get_index(id); + self.with_by_id_mut(id, |(mut root, _)| f(&mut root)) } - pub fn with_mut<T>(&self, f: impl FnOnce(&mut Slab<stream2::Stream>) -> T) -> T { - let mut container = self.container.borrow_mut(); - f(&mut container) + pub fn with_stats<T>(&self, f: impl FnOnce(&Slab<Arc<StreamStats>>) -> T) -> T { + self.with(|components| { + let (_, stats) = components.into_components(); + f(&stats) + }) } - pub fn with_stream_by_id<T>( + pub fn with_stats_by_id<T>( &self, id: &Identifier, - f: impl FnOnce(&stream2::Stream) -> T, + f: impl FnOnce(&Arc<StreamStats>) -> T, ) -> T { let id = self.get_index(id); - self.with(|streams| streams[id].invoke(f)) + self.with_by_id(id, |(_, stats)| { + let stats = stats; + f(&stats) + }) } - pub async fn with_stream_by_id_async<T>( + pub fn with_stats_mut<T>(&self, f: impl FnOnce(&mut Slab<Arc<StreamStats>>) -> T) -> T { + self.with_mut(|components| { + let (_, mut stats) = components.into_components(); + f(&mut stats) + }) + } + + pub fn with_index<T>( &self, - id: &Identifier, - f: impl AsyncFnOnce(&stream2::Stream) -> T, + f: impl FnOnce(&AHashMap<<stream2::StreamRoot as Keyed>::Key, usize>) -> T, ) -> T { - let id = self.get_index(id); - self.with_async(async |streams| streams[id].invoke_async(f).await) - .await + let index = self.index.borrow(); + f(&index) } - pub fn with_stream_by_id_mut<T>( + pub fn with_index_mut<T>( &self, - id: &Identifier, - f: impl FnOnce(&mut stream2::Stream) -> T, + f: impl FnOnce(&mut AHashMap<<stream2::StreamRoot as Keyed>::Key, usize>) -> T, ) -> T { - let id = self.get_index(id); - self.with_mut(|streams| streams[id].invoke_mut(f)) + let mut index = self.index.borrow_mut(); + f(&mut index) } pub fn with_topics<T>(&self, stream_id: &Identifier, f: impl FnOnce(&Topics) -> T) -> T { - self.with_stream_by_id(stream_id, |stream| f(stream.topics())) + let id = self.get_index(stream_id); + self.with_by_id(id, |(root, _)| f(root.topics())) } pub async fn with_topics_async<T>( @@ -140,7 +214,8 @@ impl Streams { stream_id: &Identifier, f: impl AsyncFnOnce(&Topics) -> T, ) -> T { - self.with_stream_by_id_async(stream_id, async |stream| f(stream.topics()).await) + let id = self.get_index(stream_id); + self.with_by_id_async(id, async |(root, _)| f(root.topics()).await) .await } @@ -149,37 +224,48 @@ impl Streams { stream_id: &Identifier, f: impl FnOnce(&mut Topics) -> T, ) -> T { - self.with_stream_by_id_mut(stream_id, |stream| f(stream.topics_mut())) + let id = self.get_index(stream_id); + self.with_by_id_mut(id, |(mut root, _)| { + let topics = root.topics_mut(); + f(topics) + }) } - pub fn with_topic_by_id<T>( + pub fn with_topic_root_by_id<T>( &self, stream_id: &Identifier, id: &Identifier, - f: impl FnOnce(&topic2::Topic) -> T, + f: impl FnOnce(&topic2::TopicRoot) -> T, ) -> T { - self.with_topics(stream_id, |topics| topics.with_topic_by_id(id, f)) + self.with_topics(stream_id, |topics| { + topics.with_root_by_id(id, |root| f(root)) + }) } - pub async fn with_topic_by_id_async<T>( + pub async fn with_topic_root_by_id_async<T>( &self, stream_id: &Identifier, id: &Identifier, - f: impl AsyncFnOnce(&topic2::Topic) -> T, + f: impl AsyncFnOnce(&topic2::TopicRoot) -> T, ) -> T { self.with_topics_async(stream_id, async |topics| { - topics.with_topic_by_id_async(id, f).await + let id = topics.get_index(id); + topics + .with_by_id_async(id, async |(root, _)| f(&root).await) + .await }) .await } - pub fn with_topic_by_id_mut<T>( + pub fn with_topic_root_by_id_mut<T>( &self, stream_id: &Identifier, topic_id: &Identifier, - f: impl FnOnce(&mut topic2::Topic) -> T, + f: impl FnOnce(&mut topic2::TopicRoot) -> T, ) -> T { - self.with_topics_mut(stream_id, |topics| topics.with_topic_by_id_mut(topic_id, f)) + self.with_topics_mut(stream_id, |topics| { + topics.with_root_by_id_mut(topic_id, |root| f(root)) + }) } pub fn with_partitions( @@ -193,6 +279,15 @@ impl Streams { }); } + pub fn with_partitions_mut<T>( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + f: impl FnOnce(&mut Partitions) -> T, + ) -> T { + self.with_topics_mut(stream_id, |topics| topics.with_partitions_mut(topic_id, f)) + } + pub async fn with_partitions_async<T>( &self, stream_id: &Identifier, @@ -206,6 +301,6 @@ impl Streams { } pub fn len(&self) -> usize { - self.container.borrow().len() + self.root.borrow().len() } } diff --git a/core/server/src/slab/topics.rs b/core/server/src/slab/topics.rs index 2ccdec88..87eb360d 100644 --- a/core/server/src/slab/topics.rs +++ b/core/server/src/slab/topics.rs @@ -4,33 +4,126 @@ use slab::Slab; use std::{cell::RefCell, sync::Arc}; use crate::{ - slab::{Keyed, partitions::Partitions}, - streaming::{partitions::partition2, stats::stats::TopicStats, topics::topic2}, + slab::{ + Keyed, + partitions::Partitions, + traits_ext::{ + Delete, DeleteCell, EntityComponentSystem, EntityComponentSystemMutCell, Insert, + InsertCell, InteriorMutability, IntoComponents, + }, + }, + streaming::{ + partitions::partition2, + stats::stats::TopicStats, + topics::topic2::{self, TopicRef}, + }, }; const CAPACITY: usize = 1024; +pub type ContainerId = usize; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Topics { - index: RefCell<AHashMap<<topic2::Topic as Keyed>::Key, usize>>, - container: RefCell<Slab<topic2::Topic>>, + index: RefCell<AHashMap<<topic2::TopicRoot as Keyed>::Key, ContainerId>>, + root: RefCell<Slab<topic2::TopicRoot>>, stats: RefCell<Slab<Arc<TopicStats>>>, } -impl Topics { - pub fn init() -> Self { +impl InsertCell for Topics { + type Idx = ContainerId; + type Item = topic2::Topic; + + fn insert(&self, item: Self::Item) -> Self::Idx { + let (root, stats) = item.into_components(); + let key = root.key().clone(); + + let entity_id = self.root.borrow_mut().insert(root); + let id = self.stats.borrow_mut().insert(stats); + assert_eq!( + entity_id, id, + "topic_insert: id mismatch when inserting stats" + ); + self.index.borrow_mut().insert(key, entity_id); + entity_id + } +} + +impl DeleteCell for Topics { + type Idx = ContainerId; + type Item = topic2::Topic; + + fn delete(&self, id: Self::Idx) -> Self::Item { + todo!() + } +} + +//TODO: those from impls could use a macro aswell. +impl<'a> From<&'a Topics> for topic2::TopicRef<'a> { + fn from(value: &'a Topics) -> Self { + let root = value.root.borrow(); + let stats = value.stats.borrow(); + topic2::TopicRef::new(root, stats) + } +} +impl Default for Topics { + fn default() -> Self { Self { index: RefCell::new(AHashMap::with_capacity(CAPACITY)), - container: RefCell::new(Slab::with_capacity(CAPACITY)), + root: RefCell::new(Slab::with_capacity(CAPACITY)), stats: RefCell::new(Slab::with_capacity(CAPACITY)), } } +} + +impl<'a> From<&'a Topics> for topic2::TopicRefMut<'a> { + fn from(value: &'a Topics) -> Self { + let root = value.root.borrow_mut(); + let stats = value.stats.borrow_mut(); + topic2::TopicRefMut::new(root, stats) + } +} + +impl EntityComponentSystem<InteriorMutability> for Topics { + type Idx = ContainerId; + type Entity = topic2::Topic; + type EntityRef<'a> = topic2::TopicRef<'a>; + + fn with<O, F>(&self, f: F) -> O + where + F: for<'a> FnOnce(Self::EntityRef<'a>) -> O, + { + f(self.into()) + } + + async fn with_async<O, F>(&self, f: F) -> O + where + F: for<'a> AsyncFnOnce(Self::EntityRef<'a>) -> O, + { + f(self.into()).await + } +} + +impl EntityComponentSystemMutCell for Topics { + type EntityRefMut<'a> = topic2::TopicRefMut<'a>; + + fn with_mut<O, F>(&self, f: F) -> O + where + F: for<'a> FnOnce(Self::EntityRefMut<'a>) -> O, + { + f(self.into()) + } +} + +impl Topics { + pub fn len(&self) -> usize { + self.root.borrow().len() + } pub fn exists(&self, id: &Identifier) -> bool { match id.kind { iggy_common::IdKind::Numeric => { let id = id.get_u32_value().unwrap() as usize; - self.container.borrow().contains(id) + self.root.borrow().contains(id) } iggy_common::IdKind::String => { let key = id.get_string_value().unwrap(); @@ -39,83 +132,77 @@ impl Topics { } } - fn get_index(&self, id: &Identifier) -> usize { + pub fn get_index(&self, id: &Identifier) -> usize { match id.kind { iggy_common::IdKind::Numeric => id.get_u32_value().unwrap() as usize, iggy_common::IdKind::String => { let key = id.get_string_value().unwrap(); - tracing::error!("Getting index for topic: {key}, index: {:?}", self.index.borrow()); *self.index.borrow().get(&key).expect("Topic not found") } } } - pub fn len(&self) -> usize { - self.container.borrow().len() - } - - pub async fn with_async<T>(&self, f: impl AsyncFnOnce(&Slab<topic2::Topic>) -> T) -> T { - let container = self.container.borrow(); - f(&container).await - } - - pub fn with<T>(&self, f: impl FnOnce(&Slab<topic2::Topic>) -> T) -> T { - let container = self.container.borrow(); - f(&container) - } - - pub fn with_mut<T>(&self, f: impl FnOnce(&mut Slab<topic2::Topic>) -> T) -> T { - let mut container = self.container.borrow_mut(); - f(&mut container) + pub fn with_index<T>( + &self, + f: impl FnOnce(&AHashMap<<topic2::TopicRoot as Keyed>::Key, usize>) -> T, + ) -> T { + let index = self.index.borrow(); + f(&index) } - pub fn with_mut_index<T>( + pub fn with_index_mut<T>( &self, - f: impl FnOnce(&mut AHashMap<<topic2::Topic as Keyed>::Key, usize>) -> T, + f: impl FnOnce(&mut AHashMap<<topic2::TopicRoot as Keyed>::Key, usize>) -> T, ) -> T { let mut index = self.index.borrow_mut(); f(&mut index) } - pub async fn with_topic_by_id_async<T>( + pub fn with_root_by_id<T>( &self, id: &Identifier, - f: impl AsyncFnOnce(&topic2::Topic) -> T, + f: impl FnOnce(&topic2::TopicRoot) -> T, ) -> T { let id = self.get_index(id); - self.with_async(async |topics| topics[id].invoke_async(f).await) - .await + self.with_by_id(id, |(root, _)| f(&root)) } - pub fn with_topic_stats_by_id<T>( + pub async fn with_root_by_id_async<T>( &self, id: &Identifier, - f: impl FnOnce(Arc<TopicStats>) -> T, + f: impl AsyncFnOnce(&topic2::TopicRoot) -> T, ) -> T { - let topic_id = self.with_topic_by_id(id, |topic| topic.id()); - self.with_stats(|stats| f(stats[topic_id].clone())) - } - - pub fn with_topic_by_id<T>(&self, id: &Identifier, f: impl FnOnce(&topic2::Topic) -> T) -> T { let id = self.get_index(id); - self.with(|topics| topics[id].invoke(f)) + self.with_by_id_async(id, async |(root, _)| f(&root).await) + .await } - pub fn with_topic_by_id_mut<T>( + pub fn with_root_by_id_mut<T>( &self, id: &Identifier, - f: impl FnOnce(&mut topic2::Topic) -> T, + f: impl FnOnce(&mut topic2::TopicRoot) -> T, ) -> T { let id = self.get_index(id); - self.with_mut(|topics| topics[id].invoke_mut(f)) + self.with_by_id_mut(id, |(mut root, _)| f(&mut root)) } - pub fn with_partitions(&self, topic_id: &Identifier, f: impl FnOnce(&Partitions)) { - self.with_topic_by_id(topic_id, |topic| f(topic.partitions())); + pub fn with_stats<T>(&self, f: impl FnOnce(&Slab<Arc<TopicStats>>) -> T) -> T { + self.with(|components| { + let (_, stats) = components.into_components(); + f(&stats) + }) } - pub fn with_partitions_mut(&self, topic_id: &Identifier, f: impl FnOnce(&mut Partitions)) { - self.with_topic_by_id_mut(topic_id, |topic| f(topic.partitions_mut())); + pub fn with_stats_mut<T>(&self, f: impl FnOnce(&mut Slab<Arc<TopicStats>>) -> T) -> T { + self.with_mut(|components| { + let (_, mut stats) = components.into_components(); + f(&mut stats) + }) + } + + pub fn with_stats_by_id<T>(&self, id: &Identifier, f: impl FnOnce(&Arc<TopicStats>) -> T) -> T { + let id = self.get_index(id); + self.with_by_id(id, |(_, stats)| f(&stats)) } pub async fn with_partitions_async<T>( @@ -123,17 +210,31 @@ impl Topics { topic_id: &Identifier, f: impl AsyncFnOnce(&Partitions) -> T, ) -> T { - self.with_topic_by_id_async(topic_id, async |topic| f(topic.partitions()).await) - .await + let id = self.get_index(topic_id); + self.with_by_id_async(id, async |(root, _)| { + let partitions = root.partitions(); + f(partitions).await + }) + .await } - pub fn with_stats<T>(&self, f: impl FnOnce(&Slab<Arc<TopicStats>>) -> T) -> T { - let stats = self.stats.borrow(); - f(&stats) + pub fn with_partitions(&self, topic_id: &Identifier, f: impl FnOnce(&Partitions)) { + let id = self.get_index(topic_id); + self.with_by_id(id, |(root, _)| { + let partitions = root.partitions(); + f(partitions) + }) } - pub fn with_stats_mut<T>(&self, f: impl FnOnce(&mut Slab<Arc<TopicStats>>) -> T) -> T { - let mut stats = self.stats.borrow_mut(); - f(&mut stats) + pub fn with_partitions_mut<T>( + &self, + topic_id: &Identifier, + f: impl FnOnce(&mut Partitions) -> T, + ) -> T { + let id = self.get_index(topic_id); + self.with_by_id_mut(id, |(mut root, _)| { + let partitions = root.partitions_mut(); + f(partitions) + }) } } diff --git a/core/server/src/slab/traits_ext.rs b/core/server/src/slab/traits_ext.rs index 9abee696..4b860f05 100644 --- a/core/server/src/slab/traits_ext.rs +++ b/core/server/src/slab/traits_ext.rs @@ -3,31 +3,51 @@ pub trait IntoComponents { fn into_components(self) -> Self::Components; } -// Marker trait for the entity type. -pub trait EntityMarker {} +/// Marker trait for the `Entity`. +pub trait EntityMarker { + type Idx; + fn id(&self) -> Self::Idx; + fn update_id(&mut self, id: Self::Idx); +} + +/// Insert trait for inserting an `Entity`` into container. +pub trait Insert { + type Idx; + type Item: IntoComponents + EntityMarker; + fn insert(&mut self, item: Self::Item) -> Self::Idx; +} -pub trait Insert<Idx> { +pub trait InsertCell { + type Idx; type Item: IntoComponents + EntityMarker; - fn insert(&mut self, item: Self::Item) -> Idx; + fn insert(&self, item: Self::Item) -> Self::Idx; } -pub trait Delete<Idx> { +/// Delete trait for deleting an `Entity` from container. +pub trait Delete { + type Idx; type Item: IntoComponents + EntityMarker; - fn delete(&mut self, id: Idx) -> Self::Item; + fn delete(&mut self, id: Self::Idx) -> Self::Item; } -pub trait DeleteCell<Idx> { +/// Delete trait for deleting an `Entity` from container for container types that use interior mutability. +pub trait DeleteCell { + type Idx; type Item: IntoComponents + EntityMarker; - fn delete(&self, id: Idx) -> Self::Item; + fn delete(&self, id: Self::Idx) -> Self::Item; } -pub trait IndexComponents<Idx: ?Sized> { +/// Trait for getting components by EntityId. +pub trait IntoComponentsById { + type Idx; type Output; - fn index(&self, index: Idx) -> Self::Output; + fn into_components_by_id(self, index: Self::Idx) -> Self::Output; } +/// Marker type for borrow component containers. pub struct Borrow; -pub struct RefCell; +/// Marker type for component containers that use interior mutability. +pub struct InteriorMutability; mod private { pub trait Sealed {} @@ -40,39 +60,37 @@ pub trait ComponentsMapping<T>: private::Sealed { } pub trait ComponentsByIdMapping<T>: private::Sealed { - // TODO: We will need this to contrain the `EntityRef` and `EntityRefMut` types, so after decomposing they have proper mapping. - // Similar mechanism to trait from above, but for (T1, T2) -> (&T1, &T2) mapping rather than (T1, T2) -> (&Slab<T1>, &Slab<T2>). type Ref<'a>; type RefMut<'a>; } -macro_rules! impl_components_for_slab_as_refs { +macro_rules! impl_components_mapping_for_slab { ($T:ident) => { impl<$T> private::Sealed for ($T,) {} impl<$T> ComponentsMapping<Borrow> for ($T,) - where for<'a> $T: 'a + where for<'a> $T :'a { type Ref<'a> = (&'a ::slab::Slab<$T>,); type RefMut<'a> = (&'a mut ::slab::Slab<$T>,); } - impl<$T> ComponentsMapping<RefCell> for ($T,) - where for<'a> $T: 'a + impl<$T> ComponentsMapping<InteriorMutability> for ($T,) + where for<'a> $T :'a { type Ref<'a> = (::std::cell::Ref<'a, ::slab::Slab<$T>>,); type RefMut<'a> = (::std::cell::RefMut<'a, ::slab::Slab<$T>>,); } impl<$T> ComponentsByIdMapping<Borrow> for ($T,) - where for<'a> $T: 'a + where for<'a> $T :'a { type Ref<'a> = (&'a $T,); type RefMut<'a> = (&'a mut $T,); } - impl<$T> ComponentsByIdMapping<RefCell> for ($T,) - where for<'a> $T: 'a + impl<$T> ComponentsByIdMapping<InteriorMutability> for ($T,) + where for<'a> $T :'a { type Ref<'a> = (::std::cell::Ref<'a, $T>,); type RefMut<'a> = (::std::cell::RefMut<'a, $T>,); @@ -83,44 +101,44 @@ macro_rules! impl_components_for_slab_as_refs { impl<$T, $($rest),+> private::Sealed for ($T, $($rest),+) {} impl<$T, $($rest),+> ComponentsMapping<Borrow> for ($T, $($rest),+) - where - for<'a> $T: 'a, - $(for<'a> $rest: 'a),+ + where + for<'a> $T :'a, + $(for<'a> $rest: 'a),+ { type Ref<'a> = (&'a ::slab::Slab<$T>, $(&'a ::slab::Slab<$rest>),+); type RefMut<'a> = (&'a mut ::slab::Slab<$T>, $(&'a mut ::slab::Slab<$rest>),+); } - impl<$T, $($rest),+> ComponentsMapping<RefCell> for ($T, $($rest),+) - where - for<'a> $T: 'a, - $(for<'a> $rest: 'a),+ + impl<$T, $($rest),+> ComponentsMapping<InteriorMutability> for ($T, $($rest),+) + where + for<'a> $T :'a, + $(for<'a> $rest: 'a),+ { type Ref<'a> = (std::cell::Ref<'a, ::slab::Slab<$T>>, $(::std::cell::Ref<'a, ::slab::Slab<$rest>>),+); type RefMut<'a> = (std::cell::RefMut<'a, ::slab::Slab<$T>>, $(::std::cell::RefMut<'a, ::slab::Slab<$rest>>),+); } impl<$T, $($rest),+> ComponentsByIdMapping<Borrow> for ($T, $($rest),+) - where - for<'a> $T: 'a, - $(for<'a> $rest: 'a),+ + where + for<'a> $T :'a, + $(for<'a> $rest: 'a),+ { type Ref<'a> = (&'a $T, $(&'a $rest),+); type RefMut<'a> = (&'a mut $T, $(&'a mut $rest),+); } - impl<$T, $($rest),+> ComponentsByIdMapping<RefCell> for ($T, $($rest),+) - where - for<'a> $T: 'a, - $(for<'a> $rest: 'a),+ + impl<$T, $($rest),+> ComponentsByIdMapping<InteriorMutability> for ($T, $($rest),+) + where + for<'a> $T :'a, + $(for<'a> $rest: 'a),+ { type Ref<'a> = (std::cell::Ref<'a, $T>, $(::std::cell::Ref<'a, $rest>),+); type RefMut<'a> = (std::cell::RefMut<'a, $T>, $(::std::cell::RefMut<'a, $rest>),+); } - impl_components_for_slab_as_refs!($($rest),+); + impl_components_mapping_for_slab!($($rest),+); }; } -impl_components_for_slab_as_refs!(T1, T2, T3, T4, T5, T6, T7, T8); +impl_components_mapping_for_slab!(T1, T2, T3, T4, T5, T6, T7, T8); type Mapping<'a, E, T> = <<E as IntoComponents>::Components as ComponentsMapping<T>>::Ref<'a>; type MappingMut<'a, E, T> = <<E as IntoComponents>::Components as ComponentsMapping<T>>::RefMut<'a>; @@ -137,17 +155,17 @@ type MappingByIdMut<'a, E, T> = // So we lack the ability to immediately discard unnecessary components, which leads to less ergonomic API. // Damn tradeoffs. pub type Components<T> = <T as IntoComponents>::Components; -pub type ComponentsById<Idx, T> = <T as IndexComponents<Idx>>::Output; +pub type ComponentsById<'a, T> = <T as IntoComponentsById>::Output; -pub trait EntityComponentSystem<Idx, T> +pub trait EntityComponentSystem<T> where <Self::Entity as IntoComponents>::Components: ComponentsMapping<T> + ComponentsByIdMapping<T>, { + type Idx; type Entity: IntoComponents + EntityMarker; type EntityRef<'a>: IntoComponents<Components = Mapping<'a, Self::Entity, T>> - + IndexComponents<Idx, Output = MappingById<'a, Self::Entity, T>> - where - Self: 'a; + + IntoComponentsById<Idx = Self::Idx, Output = MappingById<'a, Self::Entity, T>>; + fn with<O, F>(&self, f: F) -> O where F: for<'a> FnOnce(Self::EntityRef<'a>) -> O; @@ -156,53 +174,52 @@ where where F: for<'a> AsyncFnOnce(Self::EntityRef<'a>) -> O; - fn with_by_id<O, F>(&self, id: Idx, f: F) -> O + fn with_by_id<O, F>(&self, id: Self::Idx, f: F) -> O where - F: for<'a> FnOnce(ComponentsById<Idx, Self::EntityRef<'a>>) -> O, + F: for<'a> FnOnce(ComponentsById<'a, Self::EntityRef<'a>>) -> O, { - self.with(|components| f(components.index(id))) + self.with(|components| f(components.into_components_by_id(id))) } - fn with_by_id_async<O, F>(&self, id: Idx, f: F) -> impl Future<Output = O> + fn with_by_id_async<O, F>(&self, id: Self::Idx, f: F) -> impl Future<Output = O> where - F: for<'a> AsyncFnOnce(ComponentsById<Idx, Self::EntityRef<'a>>) -> O, + F: for<'a> AsyncFnOnce(ComponentsById<'a, Self::EntityRef<'a>>) -> O, { - self.with_async(async |components| f(components.index(id)).await) + self.with_async(async |components| f(components.into_components_by_id(id)).await) } } -pub trait EntityComponentSystemMut<Idx>: EntityComponentSystem<Idx, Borrow> { +pub trait EntityComponentSystemMut: EntityComponentSystem<Borrow> { type EntityRefMut<'a>: IntoComponents<Components = MappingMut<'a, Self::Entity, Borrow>> - + IndexComponents<Idx, Output = MappingByIdMut<'a, Self::Entity, Borrow>> - where - Self: 'a; + + IntoComponentsById<Idx = Self::Idx, Output = MappingByIdMut<'a, Self::Entity, Borrow>>; fn with_mut<O, F>(&mut self, f: F) -> O where F: for<'a> FnOnce(Self::EntityRefMut<'a>) -> O; - fn with_by_id_mut<O, F>(&mut self, id: Idx, f: F) -> O + fn with_by_id_mut<O, F>(&mut self, id: Self::Idx, f: F) -> O where - F: for<'a> FnOnce(ComponentsById<Idx, Self::EntityRefMut<'a>>) -> O, + F: for<'a> FnOnce(ComponentsById<'a, Self::EntityRefMut<'a>>) -> O, { - self.with_mut(|components| f(components.index(id))) + self.with_mut(|components| f(components.into_components_by_id(id))) } } -pub trait EntityComponentSystemMutCell<Idx>: EntityComponentSystem<Idx, RefCell> { - type EntityRefMut<'a>: IntoComponents<Components = MappingMut<'a, Self::Entity, RefCell>> - + IndexComponents<Idx, Output = MappingByIdMut<'a, Self::Entity, RefCell>> - where - Self: 'a; +pub trait EntityComponentSystemMutCell: EntityComponentSystem<InteriorMutability> { + type EntityRefMut<'a>: IntoComponents<Components = MappingMut<'a, Self::Entity, InteriorMutability>> + + IntoComponentsById< + Idx = Self::Idx, + Output = MappingByIdMut<'a, Self::Entity, InteriorMutability>, + >; fn with_mut<O, F>(&self, f: F) -> O where F: for<'a> FnOnce(Self::EntityRefMut<'a>) -> O; - fn with_by_id_mut<O, F>(&self, id: Idx, f: F) -> O + fn with_by_id_mut<O, F>(&self, id: Self::Idx, f: F) -> O where - F: for<'a> FnOnce(ComponentsById<Idx, Self::EntityRefMut<'a>>) -> O, + F: for<'a> FnOnce(ComponentsById<'a, Self::EntityRefMut<'a>>) -> O, { - self.with_mut(|components| f(components.index(id))) + self.with_mut(|components| f(components.into_components_by_id(id))) } } diff --git a/core/server/src/streaming/partitions/partition2.rs b/core/server/src/streaming/partitions/partition2.rs index 3c5cab3a..c5633048 100644 --- a/core/server/src/streaming/partitions/partition2.rs +++ b/core/server/src/streaming/partitions/partition2.rs @@ -1,7 +1,7 @@ use crate::{ slab::{ - partitions::{Partitions, SlabId}, - traits_ext::{EntityMarker, IndexComponents, IntoComponents}, + partitions::{self, Partitions}, + traits_ext::{EntityMarker, IntoComponents, IntoComponentsById}, }, streaming::{ deduplication::message_deduplicator::{self, MessageDeduplicator}, @@ -15,7 +15,7 @@ use std::sync::{Arc, atomic::AtomicU64}; #[derive(Debug)] pub struct Partition { - info: PartitionInfo, + root: PartitionRoot, stats: Arc<PartitionStats>, message_deduplicator: Option<MessageDeduplicator>, offset: Arc<AtomicU64>, @@ -25,7 +25,7 @@ pub struct Partition { impl Partition { pub fn new( - info: PartitionInfo, + root: PartitionRoot, stats: Arc<PartitionStats>, message_deduplicator: Option<MessageDeduplicator>, offset: Arc<AtomicU64>, @@ -33,7 +33,7 @@ impl Partition { consumer_group_offset: Arc<papaya::HashMap<usize, consumer_offset::ConsumerOffset>>, ) -> Self { Self { - info, + root, stats, message_deduplicator, offset, @@ -41,20 +41,12 @@ impl Partition { consumer_group_offset, } } - - pub fn update_id(&mut self, id: usize) { - self.info.id = id; - } - - pub fn id(&self) -> usize { - self.info.id - } } impl Clone for Partition { fn clone(&self) -> Self { Self { - info: self.info.clone(), + root: self.root.clone(), stats: Arc::clone(&self.stats), message_deduplicator: self.message_deduplicator.clone(), offset: Arc::clone(&self.offset), @@ -64,11 +56,21 @@ impl Clone for Partition { } } -impl EntityMarker for Partition {} +impl EntityMarker for Partition { + type Idx = partitions::ContainerId; + + fn id(&self) -> Self::Idx { + self.root.id + } + + fn update_id(&mut self, id: Self::Idx) { + self.root.id = id; + } +} impl IntoComponents for Partition { type Components = ( - PartitionInfo, + PartitionRoot, Arc<PartitionStats>, Option<MessageDeduplicator>, Arc<AtomicU64>, @@ -78,7 +80,7 @@ impl IntoComponents for Partition { fn into_components(self) -> Self::Components { ( - self.info, + self.root, self.stats, self.message_deduplicator, self.offset, @@ -89,13 +91,13 @@ impl IntoComponents for Partition { } #[derive(Default, Debug, Clone)] -pub struct PartitionInfo { +pub struct PartitionRoot { id: usize, created_at: IggyTimestamp, should_increment_offset: bool, } -impl PartitionInfo { +impl PartitionRoot { pub fn new(created_at: IggyTimestamp, should_increment_offset: bool) -> Self { Self { id: 0, @@ -113,9 +115,9 @@ impl PartitionInfo { } } -// TODO: Probably move this to the `slab` module +// TODO: Create a macro to impl those PartitionRef/PartitionRefMut structs and it's traits. pub struct PartitionRef<'a> { - info: &'a Slab<PartitionInfo>, + root: &'a Slab<PartitionRoot>, stats: &'a Slab<Arc<PartitionStats>>, message_deduplicator: &'a Slab<Option<MessageDeduplicator>>, offset: &'a Slab<Arc<AtomicU64>>, @@ -125,7 +127,7 @@ pub struct PartitionRef<'a> { impl<'a> PartitionRef<'a> { pub fn new( - info: &'a Slab<PartitionInfo>, + root: &'a Slab<PartitionRoot>, stats: &'a Slab<Arc<PartitionStats>>, message_deduplicator: &'a Slab<Option<MessageDeduplicator>>, offset: &'a Slab<Arc<AtomicU64>>, @@ -135,7 +137,7 @@ impl<'a> PartitionRef<'a> { >, ) -> Self { Self { - info, + root, stats, message_deduplicator, offset, @@ -147,7 +149,7 @@ impl<'a> PartitionRef<'a> { impl<'a> IntoComponents for PartitionRef<'a> { type Components = ( - &'a Slab<PartitionInfo>, + &'a Slab<PartitionRoot>, &'a Slab<Arc<PartitionStats>>, &'a Slab<Option<MessageDeduplicator>>, &'a Slab<Arc<AtomicU64>>, @@ -157,7 +159,7 @@ impl<'a> IntoComponents for PartitionRef<'a> { fn into_components(self) -> Self::Components { ( - self.info, + self.root, self.stats, self.message_deduplicator, self.offset, @@ -167,9 +169,10 @@ impl<'a> IntoComponents for PartitionRef<'a> { } } -impl<'a> IndexComponents<SlabId> for PartitionRef<'a> { +impl<'a> IntoComponentsById for PartitionRef<'a> { + type Idx = partitions::ContainerId; type Output = ( - &'a PartitionInfo, + &'a PartitionRoot, &'a Arc<PartitionStats>, &'a Option<MessageDeduplicator>, &'a Arc<AtomicU64>, @@ -177,9 +180,9 @@ impl<'a> IndexComponents<SlabId> for PartitionRef<'a> { &'a Arc<papaya::HashMap<usize, consumer_offset::ConsumerOffset>>, ); - fn index(&self, index: SlabId) -> Self::Output { + fn into_components_by_id(self, index: Self::Idx) -> Self::Output { ( - &self.info[index], + &self.root[index], &self.stats[index], &self.message_deduplicator[index], &self.offset[index], diff --git a/core/server/src/streaming/segments/segment2.rs b/core/server/src/streaming/segments/segment2.rs index 9d2f6e70..237c9a3a 100644 --- a/core/server/src/streaming/segments/segment2.rs +++ b/core/server/src/streaming/segments/segment2.rs @@ -1,4 +1,4 @@ -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Segment2 { pub id: usize, } diff --git a/core/server/src/streaming/streams/stream2.rs b/core/server/src/streaming/streams/stream2.rs index 6e4eaafb..309e4b95 100644 --- a/core/server/src/streaming/streams/stream2.rs +++ b/core/server/src/streaming/streams/stream2.rs @@ -1,29 +1,28 @@ -use bytes::{BufMut, BytesMut}; use iggy_common::IggyTimestamp; use slab::Slab; - -use crate::slab::{Keyed, topics::Topics}; - -#[derive(Debug)] -pub struct Stream { +use std::{ + cell::{Ref, RefMut}, + sync::Arc, +}; + +use crate::{ + slab::{ + Keyed, streams, + topics::Topics, + traits_ext::{EntityMarker, IntoComponents, IntoComponentsById}, + }, + streaming::stats::stats::StreamStats, +}; + +#[derive(Debug, Clone)] +pub struct StreamRoot { id: usize, name: String, created_at: IggyTimestamp, topics: Topics, } -impl Default for Stream { - fn default() -> Self { - Self { - id: 0, - name: String::new(), - created_at: IggyTimestamp::now(), - topics: Topics::init(), - } - } -} - -impl Keyed for Stream { +impl Keyed for StreamRoot { type Key = String; fn key(&self) -> &Self::Key { @@ -31,29 +30,16 @@ impl Keyed for Stream { } } -impl Stream { - pub fn new(name: String) -> Self { - let now = IggyTimestamp::now(); +impl StreamRoot { + pub fn new(name: String, created_at: IggyTimestamp) -> Self { Self { id: 0, name, - created_at: now, - topics: Topics::init(), + created_at, + topics: Topics::default(), } } - pub fn invoke<T>(&self, f: impl FnOnce(&Self) -> T) -> T { - f(self) - } - - pub fn invoke_mut<T>(&mut self, f: impl FnOnce(&mut Self) -> T) -> T { - f(self) - } - - pub async fn invoke_async<T>(&self, f: impl AsyncFnOnce(&Self) -> T) -> T { - f(self).await - } - pub fn id(&self) -> usize { self.id } @@ -70,13 +56,6 @@ impl Stream { self.topics.len() } - pub fn insert_into(self, container: &mut Slab<Self>) -> usize { - let idx = container.insert(self); - let stream = &mut container[idx]; - stream.id = idx; - idx - } - pub fn topics(&self) -> &Topics { &self.topics } @@ -85,3 +64,110 @@ impl Stream { &mut self.topics } } + +#[derive(Debug, Clone)] +pub struct Stream { + root: StreamRoot, + stats: Arc<StreamStats>, +} + +impl IntoComponents for Stream { + type Components = (StreamRoot, Arc<StreamStats>); + + fn into_components(self) -> Self::Components { + (self.root, self.stats) + } +} + +impl EntityMarker for Stream { + type Idx = streams::ContainerId; + + fn id(&self) -> Self::Idx { + self.root.id + } + + fn update_id(&mut self, id: Self::Idx) { + self.root.id = id; + } +} + +impl Stream { + pub fn new(name: String, stats: Arc<StreamStats>, created_at: IggyTimestamp) -> Self { + let root = StreamRoot::new(name, created_at); + Self { root, stats } + } + + pub fn stats(&self) -> &Arc<StreamStats> { + &self.stats + } + + pub fn root(&self) -> &StreamRoot { + &self.root + } +} + +pub struct StreamRef<'a> { + root: Ref<'a, Slab<StreamRoot>>, + stats: Ref<'a, Slab<Arc<StreamStats>>>, +} + +impl<'a> StreamRef<'a> { + pub fn new(root: Ref<'a, Slab<StreamRoot>>, stats: Ref<'a, Slab<Arc<StreamStats>>>) -> Self { + Self { root, stats } + } +} + +impl<'a> IntoComponents for StreamRef<'a> { + type Components = (Ref<'a, Slab<StreamRoot>>, Ref<'a, Slab<Arc<StreamStats>>>); + + fn into_components(self) -> Self::Components { + (self.root, self.stats) + } +} + +impl<'a> IntoComponentsById for StreamRef<'a> { + type Idx = streams::ContainerId; + type Output = (Ref<'a, StreamRoot>, Ref<'a, Arc<StreamStats>>); + + fn into_components_by_id(self, index: Self::Idx) -> Self::Output { + let root = Ref::map(self.root, |r| &r[index]); + let stats = Ref::map(self.stats, |s| &s[index]); + (root, stats) + } +} + +pub struct StreamRefMut<'a> { + root: RefMut<'a, Slab<StreamRoot>>, + stats: RefMut<'a, Slab<Arc<StreamStats>>>, +} + +impl<'a> StreamRefMut<'a> { + pub fn new( + root: RefMut<'a, Slab<StreamRoot>>, + stats: RefMut<'a, Slab<Arc<StreamStats>>>, + ) -> Self { + Self { root, stats } + } +} + +impl<'a> IntoComponents for StreamRefMut<'a> { + type Components = ( + RefMut<'a, Slab<StreamRoot>>, + RefMut<'a, Slab<Arc<StreamStats>>>, + ); + + fn into_components(self) -> Self::Components { + (self.root, self.stats) + } +} + +impl<'a> IntoComponentsById for StreamRefMut<'a> { + type Idx = streams::ContainerId; + type Output = (RefMut<'a, StreamRoot>, RefMut<'a, Arc<StreamStats>>); + + fn into_components_by_id(self, index: Self::Idx) -> Self::Output { + let root = RefMut::map(self.root, |r| &mut r[index]); + let stats = RefMut::map(self.stats, |s| &mut s[index]); + (root, stats) + } +} diff --git a/core/server/src/streaming/topics/consumer_group2.rs b/core/server/src/streaming/topics/consumer_group2.rs index f1da2f85..d3785052 100644 --- a/core/server/src/streaming/topics/consumer_group2.rs +++ b/core/server/src/streaming/topics/consumer_group2.rs @@ -17,7 +17,7 @@ use tracing::trace; pub const MEMBERS_CAPACITY: usize = 128; -#[derive(Default, Debug)] +#[derive(Default, Debug, Clone)] pub struct ConsumerGroup { id: usize, name: String, @@ -156,6 +156,17 @@ pub struct Member { current_partition_idx: AtomicUsize, } +impl Clone for Member { + fn clone(&self) -> Self { + Self { + id: self.id.clone(), + client_id: self.client_id.clone(), + partitions: self.partitions.clone(), + current_partition_idx: AtomicUsize::new(0), + } + } +} + impl Member { pub fn new(client_id: u32) -> Self { Member { diff --git a/core/server/src/streaming/topics/topic2.rs b/core/server/src/streaming/topics/topic2.rs index f66cfda5..01586a87 100644 --- a/core/server/src/streaming/topics/topic2.rs +++ b/core/server/src/streaming/topics/topic2.rs @@ -1,18 +1,14 @@ -use crate::streaming::stats::stats::PartitionStats; -use crate::{ - slab::{ - Keyed, - consumer_groups::ConsumerGroups, - partitions::{PARTITIONS_CAPACITY, Partitions}, - }, - streaming::partitions::consumer_offset, -}; +use crate::slab::topics; +use crate::slab::traits_ext::{EntityMarker, IntoComponents, IntoComponentsById}; +use crate::slab::{Keyed, consumer_groups::ConsumerGroups, partitions::Partitions}; +use crate::streaming::stats::stats::{PartitionStats, TopicStats}; use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp, MaxTopicSize}; use slab::Slab; +use std::cell::{Ref, RefMut}; use std::sync::Arc; -#[derive(Default, Debug)] -pub struct Topic { +#[derive(Default, Debug, Clone)] +pub struct TopicRoot { id: usize, // TODO: This property should be removed, we won't use it in our clustering impl. replication_factor: u8, @@ -26,9 +22,136 @@ pub struct Topic { consumer_groups: ConsumerGroups, } +impl Keyed for TopicRoot { + type Key = String; + + fn key(&self) -> &Self::Key { + &self.name + } +} + +#[derive(Debug, Clone)] +pub struct Topic { + root: TopicRoot, + stats: Arc<TopicStats>, +} + impl Topic { pub fn new( name: String, + stats: Arc<TopicStats>, + created_at: IggyTimestamp, + replication_factor: u8, + message_expiry: IggyExpiry, + compression: CompressionAlgorithm, + max_topic_size: MaxTopicSize, + ) -> Self { + let root = TopicRoot::new( + name, + created_at, + replication_factor, + message_expiry, + compression, + max_topic_size, + ); + Self { root, stats } + } + + pub fn root(&self) -> &TopicRoot { + &self.root + } +} + +impl IntoComponents for Topic { + type Components = (TopicRoot, Arc<TopicStats>); + + fn into_components(self) -> Self::Components { + (self.root, self.stats) + } +} + +impl EntityMarker for Topic { + type Idx = topics::ContainerId; + fn id(&self) -> Self::Idx { + self.root.id + } + + fn update_id(&mut self, id: Self::Idx) { + self.root.id = id; + } +} + +// TODO: Create a macro to impl those TopicRef/TopicRefMut structs and it's traits. +pub struct TopicRef<'a> { + root: Ref<'a, Slab<TopicRoot>>, + stats: Ref<'a, Slab<Arc<TopicStats>>>, +} + +impl<'a> TopicRef<'a> { + pub fn new(root: Ref<'a, Slab<TopicRoot>>, stats: Ref<'a, Slab<Arc<TopicStats>>>) -> Self { + Self { root, stats } + } +} + +impl<'a> IntoComponents for TopicRef<'a> { + type Components = (Ref<'a, Slab<TopicRoot>>, Ref<'a, Slab<Arc<TopicStats>>>); + + fn into_components(self) -> Self::Components { + (self.root, self.stats) + } +} + +impl<'a> IntoComponentsById for TopicRef<'a> { + type Idx = topics::ContainerId; + type Output = (Ref<'a, TopicRoot>, Ref<'a, Arc<TopicStats>>); + + fn into_components_by_id(self, index: Self::Idx) -> Self::Output { + let root = Ref::map(self.root, |r| &r[index]); + let stats = Ref::map(self.stats, |s| &s[index]); + (root, stats) + } +} + +pub struct TopicRefMut<'a> { + root: RefMut<'a, Slab<TopicRoot>>, + stats: RefMut<'a, Slab<Arc<TopicStats>>>, +} + +impl<'a> TopicRefMut<'a> { + pub fn new( + root: RefMut<'a, Slab<TopicRoot>>, + stats: RefMut<'a, Slab<Arc<TopicStats>>>, + ) -> Self { + Self { root, stats } + } +} + +impl<'a> IntoComponents for TopicRefMut<'a> { + type Components = ( + RefMut<'a, Slab<TopicRoot>>, + RefMut<'a, Slab<Arc<TopicStats>>>, + ); + + fn into_components(self) -> Self::Components { + (self.root, self.stats) + } +} + +impl<'a> IntoComponentsById for TopicRefMut<'a> { + type Idx = topics::ContainerId; + type Output = (RefMut<'a, TopicRoot>, RefMut<'a, Arc<TopicStats>>); + + fn into_components_by_id(self, index: Self::Idx) -> Self::Output { + let root = RefMut::map(self.root, |r| &mut r[index]); + let stats = RefMut::map(self.stats, |s| &mut s[index]); + (root, stats) + } +} + +impl TopicRoot { + pub fn new( + name: String, + created_at: IggyTimestamp, replication_factor: u8, message_expiry: IggyExpiry, compression: CompressionAlgorithm, @@ -121,11 +244,3 @@ impl Topic { idx } } - -impl Keyed for Topic { - type Key = String; - - fn key(&self) -> &Self::Key { - &self.name - } -}