This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch fix_concurrency_bug in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 6b0fc4be1006a31208e77ae75cfeabf9817c3e60 Author: numminex <[email protected]> AuthorDate: Wed Oct 1 18:19:16 2025 +0200 temp --- .../consumer_groups/get_consumer_group_handler.rs | 19 +- .../consumer_groups/get_consumer_groups_handler.rs | 21 +- .../binary/handlers/topics/get_topic_handler.rs | 14 +- .../binary/handlers/topics/get_topics_handler.rs | 17 +- core/server/src/http/consumer_groups.rs | 66 +-- core/server/src/shard/mod.rs | 81 +-- core/server/src/shard/system/consumer_offsets.rs | 48 +- core/server/src/shard/system/messages.rs | 48 +- core/server/src/slab/consumer_groups.rs | 18 +- core/server/src/slab/helpers.rs | 16 +- core/server/src/slab/partitions.rs | 14 +- core/server/src/slab/streams.rs | 652 +++++++++++++++++---- core/server/src/slab/topics.rs | 34 +- core/server/src/streaming/partitions/helpers.rs | 325 +--------- core/server/src/streaming/partitions/partition2.rs | 28 +- core/server/src/streaming/segments/storage.rs | 28 +- .../streaming/segments/types/messages_batch_mut.rs | 3 +- core/server/src/streaming/topics/storage2.rs | 188 ------ 18 files changed, 753 insertions(+), 867 deletions(-) diff --git a/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs index 5330e748..a056659c 100644 --- a/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs @@ -59,18 +59,13 @@ impl ServerCommandHandler for GetConsumerGroup { numeric_topic_id as u32, ); - shard - .streams2 - .with_consumer_group_by_id_async( - &self.stream_id, - &self.topic_id, - &self.group_id, - async |(root, members)| { - let consumer_group = mapper::map_consumer_group(root, members); - sender.send_ok_response(&consumer_group).await - }, - ) - .await?; + let consumer_group = shard.streams2.with_consumer_group_by_id( + &self.stream_id, + &self.topic_id, + &self.group_id, + |(root, members)| mapper::map_consumer_group(root, members), + ); + sender.send_ok_response(&consumer_group).await?; Ok(()) } } diff --git a/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs b/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs index 1c9ecd95..3740d44a 100644 --- a/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs +++ b/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs @@ -61,17 +61,16 @@ impl ServerCommandHandler for GetConsumerGroups { numeric_topic_id as u32, ); - shard - .streams2 - .with_consumer_groups_async(&self.stream_id, &self.topic_id, async |cgs| { - cgs.with_components_async(async |cgs| { - let (roots, members) = cgs.into_components(); - let consumer_groups = mapper::map_consumer_groups(roots, members); - sender.send_ok_response(&consumer_groups).await - }) - .await - }) - .await?; + let consumer_groups = + shard + .streams2 + .with_consumer_groups(&self.stream_id, &self.topic_id, |cgs| { + cgs.with_components(|cgs| { + let (roots, members) = cgs.into_components(); + mapper::map_consumer_groups(roots, members) + }) + }); + sender.send_ok_response(&consumer_groups).await?; Ok(()) } } diff --git a/core/server/src/binary/handlers/topics/get_topic_handler.rs b/core/server/src/binary/handlers/topics/get_topic_handler.rs index 653ce995..e06a07a7 100644 --- a/core/server/src/binary/handlers/topics/get_topic_handler.rs +++ b/core/server/src/binary/handlers/topics/get_topic_handler.rs @@ -53,13 +53,13 @@ impl ServerCommandHandler for GetTopic { self.topic_id.get_u32_value().unwrap_or(0), )?; - shard - .streams2 - .with_topic_by_id_async(&self.stream_id, &self.topic_id, async |(root, _, stats)| { - let response = mapper::map_topic(&root, &stats); - sender.send_ok_response(&response).await - }) - .await?; + let response = + shard + .streams2 + .with_topic_by_id(&self.stream_id, &self.topic_id, |(root, _, stats)| { + mapper::map_topic(&root, &stats) + }); + sender.send_ok_response(&response).await?; Ok(()) } } diff --git a/core/server/src/binary/handlers/topics/get_topics_handler.rs b/core/server/src/binary/handlers/topics/get_topics_handler.rs index 561c0adc..0c78e90e 100644 --- a/core/server/src/binary/handlers/topics/get_topics_handler.rs +++ b/core/server/src/binary/handlers/topics/get_topics_handler.rs @@ -53,18 +53,13 @@ impl ServerCommandHandler for GetTopics { .borrow() .get_topics(session.get_user_id(), numeric_stream_id as u32); - shard - .streams2 - .with_topics_async(&self.stream_id, async |topics| { - topics - .with_components_async(async |topics| { - let (roots, _, stats) = topics.into_components(); - let response = mapper::map_topics(&roots, &stats); - sender.send_ok_response(&response).await - }) - .await + let response = shard.streams2.with_topics(&self.stream_id, |topics| { + topics.with_components(|topics| { + let (roots, _, stats) = topics.into_components(); + mapper::map_topics(&roots, &stats) }) - .await?; + }); + sender.send_ok_response(&response).await?; Ok(()) } } diff --git a/core/server/src/http/consumer_groups.rs b/core/server/src/http/consumer_groups.rs index 56f4ecf4..d4883470 100644 --- a/core/server/src/http/consumer_groups.rs +++ b/core/server/src/http/consumer_groups.rs @@ -93,21 +93,12 @@ async fn get_consumer_group( numeric_topic_id as u32, )?; - let consumer_group = { - let future = SendWrapper::new( - state - .shard - .shard() - .streams2 - .with_consumer_group_by_id_async( - &identifier_stream_id, - &identifier_topic_id, - &identifier_group_id, - async |(root, members)| mapper::map_consumer_group(root, members), - ), - ); - future.await - }; + let consumer_group = state.shard.shard().streams2.with_consumer_group_by_id( + &identifier_stream_id, + &identifier_topic_id, + &identifier_group_id, + |(root, members)| mapper::map_consumer_group(root, members), + ); Ok(Json(consumer_group)) } @@ -150,20 +141,16 @@ async fn get_consumer_groups( numeric_topic_id as u32, )?; - let consumer_groups = { - let future = SendWrapper::new(state.shard.shard().streams2.with_consumer_groups_async( - &identifier_stream_id, - &identifier_topic_id, - async |cgs| { - cgs.with_components_async(async |cgs| { - let (roots, members) = cgs.into_components(); - mapper::map_consumer_groups(roots, members) - }) - .await - }, - )); - future.await - }; + let consumer_groups = state.shard.shard().streams2.with_consumer_groups( + &identifier_stream_id, + &identifier_topic_id, + |cgs| { + cgs.with_components(|cgs| { + let (roots, members) = cgs.into_components(); + mapper::map_consumer_groups(roots, members) + }) + }, + ); Ok(Json(consumer_groups)) } @@ -213,21 +200,12 @@ async fn create_consumer_group( // Get the created consumer group details let group_id_identifier = Identifier::numeric(group_id as u32).unwrap(); - let consumer_group_details = { - let future = SendWrapper::new( - state - .shard - .shard() - .streams2 - .with_consumer_group_by_id_async( - &command.stream_id, - &command.topic_id, - &group_id_identifier, - async |(root, members)| mapper::map_consumer_group(root, members), - ), - ); - future.await - }; + let consumer_group_details = state.shard.shard().streams2.with_consumer_group_by_id( + &command.stream_id, + &command.topic_id, + &group_id_identifier, + |(root, members)| mapper::map_consumer_group(root, members), + ); // Apply state change let entry_command = EntryCommand::CreateConsumerGroup(CreateConsumerGroupWithId { diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 035cdaac..8a5b7d8b 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -572,55 +572,58 @@ impl IggyShard { ); match consumer { PollingConsumer::Consumer(consumer_id, _) => { - self.streams2.with_partition_by_id( + let (offset_value, path) = self.streams2.with_partition_by_id( &stream_id, &topic_id, partition_id, - partitions::helpers::store_consumer_offset( - consumer_id, - numeric_stream_id, - numeric_topic_id, - partition_id, - offset, - &self.config.system, - ), - ); - self.streams2 - .with_partition_by_id_async( - &stream_id, - &topic_id, - partition_id, - partitions::helpers::persist_consumer_offset_to_disk( - self.id, + |(.., offsets, _, _)| { + let hdl = offsets.pin(); + let item = hdl.get_or_insert( consumer_id, - ), - ) - .await?; + crate::streaming::partitions::consumer_offset::ConsumerOffset::default_for_consumer( + consumer_id as u32, + &self.config.system.get_consumer_offsets_path(numeric_stream_id, numeric_topic_id, partition_id), + ), + ); + item.offset.store(offset, std::sync::atomic::Ordering::Relaxed); + let offset_value = item.offset.load(std::sync::atomic::Ordering::Relaxed); + let path = item.path.clone(); + (offset_value, path) + }, + ); + crate::streaming::partitions::storage2::persist_offset( + self.id, + &path, + offset_value, + ) + .await?; } PollingConsumer::ConsumerGroup(cg_id, _) => { - self.streams2.with_partition_by_id( + let (offset_value, path) = self.streams2.with_partition_by_id( &stream_id, &topic_id, partition_id, - partitions::helpers::store_consumer_group_member_offset( - cg_id, - numeric_stream_id, - numeric_topic_id, - partition_id, - offset, - &self.config.system, - ), - ); - self.streams2.with_partition_by_id_async( - &stream_id, - &topic_id, - partition_id, - partitions::helpers::persist_consumer_group_member_offset_to_disk( - self.id, + |(.., offsets, _)| { + let hdl = offsets.pin(); + let item = hdl.get_or_insert( cg_id, - ), - ) - .await?; + crate::streaming::partitions::consumer_offset::ConsumerOffset::default_for_consumer_group( + cg_id as u32, + &self.config.system.get_consumer_group_offsets_path(numeric_stream_id, numeric_topic_id, partition_id), + ), + ); + item.offset.store(offset, std::sync::atomic::Ordering::Relaxed); + let offset_value = item.offset.load(std::sync::atomic::Ordering::Relaxed); + let path = item.path.clone(); + (offset_value, path) + }, + ); + crate::streaming::partitions::storage2::persist_offset( + self.id, + &path, + offset_value, + ) + .await?; } } } diff --git a/core/server/src/shard/system/consumer_offsets.rs b/core/server/src/shard/system/consumer_offsets.rs index 5e4f4ff5..af5f71bb 100644 --- a/core/server/src/shard/system/consumer_offsets.rs +++ b/core/server/src/shard/system/consumer_offsets.rs @@ -258,26 +258,38 @@ impl IggyShard { ) -> Result<(), IggyError> { match polling_consumer { PollingConsumer::Consumer(id, _) => { - self.streams2 - .with_partition_by_id_async( - stream_id, - topic_id, - partition_id, - partitions::helpers::persist_consumer_offset_to_disk(self.id, *id), - ) - .await + let (offset_value, path) = self.streams2.with_partition_by_id( + stream_id, + topic_id, + partition_id, + |(.., offsets, _, _)| { + let hdl = offsets.pin(); + let item = hdl + .get(id) + .expect("persist_consumer_offset_to_disk: offset not found"); + let offset = item.offset.load(std::sync::atomic::Ordering::Relaxed); + let path = item.path.clone(); + (offset, path) + }, + ); + partitions::storage2::persist_offset(self.id, &path, offset_value).await } PollingConsumer::ConsumerGroup(_, id) => { - self.streams2 - .with_partition_by_id_async( - stream_id, - topic_id, - partition_id, - partitions::helpers::persist_consumer_group_member_offset_to_disk( - self.id, *id, - ), - ) - .await + let (offset_value, path) = self.streams2.with_partition_by_id( + stream_id, + topic_id, + partition_id, + |(.., offsets, _)| { + let hdl = offsets.pin(); + let item = hdl.get(id).expect( + "persist_consumer_group_member_offset_to_disk: offset not found", + ); + let offset = item.offset.load(std::sync::atomic::Ordering::Relaxed); + let path = item.path.clone(); + (offset, path) + }, + ); + partitions::storage2::persist_offset(self.id, &path, offset_value).await } } } diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs index 320ce902..211d4a54 100644 --- a/core/server/src/shard/system/messages.rs +++ b/core/server/src/shard/system/messages.rs @@ -281,16 +281,23 @@ impl IggyShard { &self.config.system, ), ); - self.streams2 - .with_partition_by_id_async( - &stream_id, - &topic_id, - partition_id, - partitions::helpers::persist_consumer_offset_to_disk( - self.id, - consumer_id, - ), - ) + + let (offset_value, path) = self.streams2.with_partition_by_id( + &stream_id, + &topic_id, + partition_id, + |(.., offsets, _, _)| { + let hdl = offsets.pin(); + let item = hdl.get(&consumer_id).expect( + "persist_consumer_offset_to_disk: offset not found", + ); + let offset = + item.offset.load(std::sync::atomic::Ordering::Relaxed); + let path = item.path.clone(); + (offset, path) + }, + ); + partitions::storage2::persist_offset(self.id, &path, offset_value) .await?; } PollingConsumer::ConsumerGroup(cg_id, _) => { @@ -307,16 +314,23 @@ impl IggyShard { &self.config.system, ), ); - self.streams2.with_partition_by_id_async( + + let (offset_value, path) = self.streams2.with_partition_by_id( &stream_id, &topic_id, partition_id, - partitions::helpers::persist_consumer_group_member_offset_to_disk( - self.id, - cg_id, - ), - ) - .await?; + |(.., offsets, _)| { + let hdl = offsets.pin(); + let item = hdl + .get(&cg_id) + .expect("persist_consumer_group_member_offset_to_disk: offset not found"); + let offset = item.offset.load(std::sync::atomic::Ordering::Relaxed); + let path = item.path.clone(); + (offset, path) + }, + ); + partitions::storage2::persist_offset(self.id, &path, offset_value) + .await?; } } } diff --git a/core/server/src/slab/consumer_groups.rs b/core/server/src/slab/consumer_groups.rs index 683c3a0e..101364fe 100644 --- a/core/server/src/slab/consumer_groups.rs +++ b/core/server/src/slab/consumer_groups.rs @@ -3,7 +3,7 @@ use crate::{ Keyed, consumer_groups, traits_ext::{ Borrow, ComponentsById, Delete, EntityComponentSystem, EntityComponentSystemMut, - EntityMarker, Insert, IntoComponents, IntoComponentsById, + Insert, IntoComponents, }, }, streaming::topics::consumer_group2::{self, ConsumerGroupRef, ConsumerGroupRefMut}, @@ -81,12 +81,11 @@ impl EntityComponentSystem<Borrow> for ConsumerGroups { { f(self.into()) } - + fn with_components_async<O, F>(&self, f: F) -> impl Future<Output = O> where - F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O, - { - f(self.into()) + F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O { + f(self.into()) } } @@ -146,15 +145,6 @@ impl ConsumerGroups { let id = self.get_index(identifier); self.with_components_by_id_mut(id, |components| f(components)) } - - pub fn with_consumer_group_by_id_async<T>( - &self, - identifier: &Identifier, - f: impl AsyncFnOnce(ComponentsById<ConsumerGroupRef>) -> T, - ) -> impl Future<Output = T> { - let id = self.get_index(identifier); - self.with_components_by_id_async(id, async |components| f(components).await) - } } impl Default for ConsumerGroups { diff --git a/core/server/src/slab/helpers.rs b/core/server/src/slab/helpers.rs index 31a5a8b7..d2fe7487 100644 --- a/core/server/src/slab/helpers.rs +++ b/core/server/src/slab/helpers.rs @@ -1,13 +1,10 @@ use crate::{ slab::{ - consumer_groups::ConsumerGroups, - partitions::{self, ContainerId, Partitions}, - topics::Topics, + consumer_groups::ConsumerGroups, partitions::Partitions, topics::Topics, traits_ext::ComponentsById, }, streaming::{ - partitions::log::Log, - streams::stream2::{StreamRef, StreamRefMut}, + streams::stream2::StreamRef, topics::topic2::{TopicRef, TopicRefMut}, }, }; @@ -26,7 +23,6 @@ where { async |(root, ..)| f(root.topics()).await } - pub fn topics_mut<O, F>(f: F) -> impl FnOnce(ComponentsById<StreamRef>) -> O where F: for<'a> FnOnce(&'a Topics) -> O, @@ -47,7 +43,6 @@ where { async |(root, ..)| f(root.partitions()).await } - pub fn partitions_mut<O, F>(f: F) -> impl FnOnce(ComponentsById<TopicRefMut>) -> O where F: for<'a> FnOnce(&'a mut Partitions) -> O, @@ -68,10 +63,3 @@ where { |(mut root, ..)| f(root.consumer_groups_mut()) } - -pub fn consumer_groups_async<O, F>(f: F) -> impl AsyncFnOnce(ComponentsById<TopicRef>) -> O -where - F: for<'a> AsyncFnOnce(&'a ConsumerGroups) -> O, -{ - async |(root, ..)| f(root.consumer_groups()).await -} diff --git a/core/server/src/slab/partitions.rs b/core/server/src/slab/partitions.rs index 678846c3..b678bc6c 100644 --- a/core/server/src/slab/partitions.rs +++ b/core/server/src/slab/partitions.rs @@ -1,7 +1,7 @@ use crate::{ slab::traits_ext::{ Borrow, ComponentsById, Delete, EntityComponentSystem, EntityComponentSystemMut, Insert, - IntoComponents, IntoComponentsById, + IntoComponents, }, streaming::{ deduplication::message_deduplicator::MessageDeduplicator, @@ -17,10 +17,7 @@ use crate::{ }, }; use slab::Slab; -use std::{ - future::Future, - sync::{Arc, atomic::AtomicU64}, -}; +use std::sync::{Arc, atomic::AtomicU64}; // TODO: This could be upper limit of partitions per topic, use that value to validate instead of whathever this thing is in `common` crate. pub const PARTITIONS_CAPACITY: usize = 16384; @@ -30,7 +27,7 @@ pub type ContainerId = usize; pub struct Partitions { root: Slab<partition2::PartitionRoot>, stats: Slab<Arc<PartitionStats>>, - message_deduplicator: Slab<Option<MessageDeduplicator>>, + message_deduplicator: Slab<Option<Arc<MessageDeduplicator>>>, offset: Slab<Arc<AtomicU64>>, consumer_offset: Slab<Arc<ConsumerOffsets>>, @@ -167,11 +164,10 @@ impl EntityComponentSystem<Borrow> for Partitions { { f(self.into()) } - + fn with_components_async<O, F>(&self, f: F) -> impl Future<Output = O> where - F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O, - { + F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O { f(self.into()) } } diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs index 7fadce50..79947bb5 100644 --- a/core/server/src/slab/streams.rs +++ b/core/server/src/slab/streams.rs @@ -1,5 +1,8 @@ use crate::shard::task_registry::TaskRegistry; +use crate::shard_trace; use crate::streaming::partitions as streaming_partitions; +use crate::streaming::segments::IggyIndexesMut; +use crate::streaming::segments::storage::Storage; use crate::{ binary::handlers::messages::poll_messages_handler::IggyPollMetadata, configs::{cache_indexes::CacheIndexesConfig, system::SystemConfig}, @@ -39,6 +42,7 @@ use crate::{ }, }; use ahash::AHashMap; +use error_set::ErrContext; use iggy_common::{Identifier, IggyError, IggyTimestamp, PollingKind}; use slab::Slab; use std::{ @@ -141,11 +145,10 @@ impl EntityComponentSystem<InteriorMutability> for Streams { { f(self.into()) } - + fn with_components_async<O, F>(&self, f: F) -> impl Future<Output = O> where - F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O, - { + F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O { f(self.into()) } } @@ -192,17 +195,21 @@ impl MainOps for Streams { self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| { log.journal().inner().size + log.active_segment().size }); - self.with_partition_by_id_async( + let (segment_start_offset, message_deduplicator) = self.with_partition_by_id( stream_id, topic_id, partition_id, - streaming_partitions::helpers::deduplicate_messages( + streaming_partitions::helpers::get_segment_start_offset_and_deduplicator(), + ); + + input + .prepare_for_persistence( + segment_start_offset, current_offset, current_position, - &mut input, - ), - ) - .await; + message_deduplicator.as_ref(), + ) + .await; let (journal_messages_count, journal_size) = self.with_partition_by_id_mut( stream_id, @@ -460,7 +467,6 @@ impl Streams { let id = self.get_index(id); self.with_components_by_id_async(id, async |stream| f(stream).await) } - pub fn with_stream_by_id_mut<T>( &self, id: &Identifier, @@ -481,7 +487,6 @@ impl Streams { ) -> impl Future<Output = T> { self.with_stream_by_id_async(stream_id, helpers::topics_async(f)) } - pub fn with_topics_mut<T>(&self, stream_id: &Identifier, f: impl FnOnce(&Topics) -> T) -> T { self.with_stream_by_id(stream_id, helpers::topics_mut(f)) } @@ -507,7 +512,6 @@ impl Streams { container.with_topic_by_id_async(topic_id, f).await }) } - pub fn with_topic_by_id_mut<T>( &self, stream_id: &Identifier, @@ -530,17 +534,6 @@ impl Streams { }) } - pub fn with_consumer_groups_async<T>( - &self, - stream_id: &Identifier, - topic_id: &Identifier, - f: impl AsyncFnOnce(&ConsumerGroups) -> T, - ) -> impl Future<Output = T> { - self.with_topics_async(stream_id, async |container| { - container.with_consumer_groups_async(topic_id, f).await - }) - } - pub fn with_consumer_group_by_id<T>( &self, stream_id: &Identifier, @@ -565,18 +558,6 @@ impl Streams { }) } - pub fn with_consumer_group_by_id_async<T>( - &self, - stream_id: &Identifier, - topic_id: &Identifier, - group_id: &Identifier, - f: impl AsyncFnOnce(ComponentsById<ConsumerGroupRef>) -> T, - ) -> impl Future<Output = T> { - self.with_consumer_groups_async(stream_id, topic_id, async move |container| { - container.with_consumer_group_by_id_async(group_id, f).await - }) - } - pub fn with_consumer_groups_mut<T>( &self, stream_id: &Identifier, @@ -599,6 +580,7 @@ impl Streams { }) } + pub fn with_partitions_async<T>( &self, stream_id: &Identifier, @@ -609,7 +591,6 @@ impl Streams { container.with_partitions_async(topic_id, f).await }) } - pub fn with_partitions_mut<T>( &self, stream_id: &Identifier, @@ -644,7 +625,6 @@ impl Streams { container.with_partition_by_id_async(id, f).await }) } - pub fn with_partition_by_id_mut<T>( &self, stream_id: &Identifier, @@ -677,12 +657,261 @@ impl Streams { helpers::get_segment_range_by_offset(offset), ); - self.with_partition_by_id_async( - stream_id, - topic_id, - partition_id, - helpers::get_messages_by_offset_range(offset, count, range), - ) + let mut remaining_count = count; + let mut batches = IggyMessagesBatchSet::empty(); + let mut current_offset = offset; + + for idx in range { + let (segment_start_offset, segment_end_offset) = self.with_partition_by_id( + stream_id, + topic_id, + partition_id, + |(_, _, _, _, _, _, log)| { + let segment = &log.segments()[idx]; + (segment.start_offset, segment.end_offset) + }, + ); + + let start_offset = if current_offset < segment_start_offset { + segment_start_offset + } else { + current_offset + }; + + let mut end_offset = start_offset + (remaining_count - 1) as u64; + if end_offset > segment_end_offset { + end_offset = segment_end_offset; + } + + // Calculate the actual count to request from this segment + let count: u32 = ((end_offset - start_offset + 1) as u32).min(remaining_count); + + let messages = self + .get_messages_by_offset_base( + stream_id, + topic_id, + partition_id, + idx, + start_offset, + end_offset, + count, + segment_start_offset, + ) + .await?; + + let messages_count = messages.count(); + if messages_count == 0 { + current_offset = segment_end_offset + 1; + continue; + } + + remaining_count = remaining_count.saturating_sub(messages_count); + + if let Some(last_offset) = messages.last_offset() { + current_offset = last_offset + 1; + } else if messages_count > 0 { + current_offset += messages_count as u64; + } + + batches.add_batch_set(messages); + + if remaining_count == 0 { + break; + } + } + + Ok(batches) + } + + async fn get_messages_by_offset_base( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: partitions::ContainerId, + idx: usize, + offset: u64, + end_offset: u64, + count: u32, + segment_start_offset: u64, + ) -> Result<IggyMessagesBatchSet, IggyError> { + if count == 0 { + return Ok(IggyMessagesBatchSet::default()); + } + + let (is_journal_empty, journal_first_offset, journal_last_offset) = self + .with_partition_by_id( + stream_id, + topic_id, + partition_id, + |(_, _, _, _, _, _, log)| { + let journal = log.journal(); + ( + journal.is_empty(), + journal.inner().base_offset, + journal.inner().current_offset, + ) + }, + ); + + // Case 0: Accumulator is empty, so all messages have to be on disk + if is_journal_empty { + return self + .load_messages_from_disk_by_offset( + stream_id, + topic_id, + partition_id, + idx, + offset, + count, + segment_start_offset, + ) + .await; + } + + // Case 1: All messages are in accumulator buffer + if offset >= journal_first_offset && end_offset <= journal_last_offset { + let batches = self.with_partition_by_id( + stream_id, + topic_id, + partition_id, + |(_, _, _, _, _, _, log)| { + log.journal() + .get(|batches| batches.get_by_offset(offset, count)) + }, + ); + return Ok(batches); + } + + // Case 2: All messages are on disk + if end_offset < journal_first_offset { + return self + .load_messages_from_disk_by_offset( + stream_id, + topic_id, + partition_id, + idx, + offset, + count, + segment_start_offset, + ) + .await; + } + + // Case 3: Messages span disk and accumulator buffer boundary + // Calculate how many messages we need from disk + let disk_count = if offset < journal_first_offset { + ((journal_first_offset - offset) as u32).min(count) + } else { + 0 + }; + let mut combined_batch_set = IggyMessagesBatchSet::empty(); + + // Load messages from disk if needed + if disk_count > 0 { + let disk_messages = self + .load_messages_from_disk_by_offset( + stream_id, + topic_id, + partition_id, + idx, + offset, + disk_count, + segment_start_offset, + ) + .await + .with_error_context(|error| { + format!("Failed to load messages from disk, start offset: {offset}, count: {disk_count}, error: {error}") + })?; + + if !disk_messages.is_empty() { + combined_batch_set.add_batch_set(disk_messages); + } + } + + // Calculate how many more messages we need from the accumulator + let remaining_count = count - combined_batch_set.count(); + + if remaining_count > 0 { + let accumulator_start_offset = std::cmp::max(offset, journal_first_offset); + let journal_messages = self.with_partition_by_id( + stream_id, + topic_id, + partition_id, + |(_, _, _, _, _, _, log)| { + log.journal().get(|batches| { + batches.get_by_offset(accumulator_start_offset, remaining_count) + }) + }, + ); + + if !journal_messages.is_empty() { + combined_batch_set.add_batch_set(journal_messages); + } + } + + Ok(combined_batch_set) + } + + async fn load_messages_from_disk_by_offset( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: partitions::ContainerId, + idx: usize, + start_offset: u64, + count: u32, + segment_start_offset: u64, + ) -> Result<IggyMessagesBatchSet, IggyError> { + // Convert start_offset to relative offset within the segment + let relative_start_offset = (start_offset - segment_start_offset) as u32; + + // Use with_partition_by_id_async to perform disk I/O inside the async closure + self.with_partition_by_id_async(stream_id, topic_id, partition_id, async move |(.., log)| { + let storage = log.storages()[idx].clone(); + let indexes = log.indexes()[idx].as_ref(); + + // Load indexes first + let indexes_to_read = if let Some(indexes) = indexes { + if !indexes.is_empty() { + indexes.slice_by_offset(relative_start_offset, count) + } else { + storage + .index_reader + .as_ref() + .expect("Index reader not initialized") + .load_from_disk_by_offset(relative_start_offset, count) + .await? + } + } else { + storage + .index_reader + .as_ref() + .expect("Index reader not initialized") + .load_from_disk_by_offset(relative_start_offset, count) + .await? + }; + + if indexes_to_read.is_none() { + return Ok(IggyMessagesBatchSet::empty()); + } + + let indexes_to_read = indexes_to_read.unwrap(); + let batch = storage + .messages_reader + .as_ref() + .expect("Messages reader not initialized") + .load_messages_from_disk(indexes_to_read) + .await + .with_error_context(|error| format!("Failed to load messages from disk: {error}"))?; + + batch + .validate_checksums_and_offsets(start_offset) + .with_error_context(|error| { + format!("Failed to validate messages read from disk! error: {error}") + })?; + + Ok(IggyMessagesBatchSet::from(batch)) + }) .await } @@ -704,13 +933,200 @@ impl Streams { return Ok(IggyMessagesBatchSet::default()); }; - self.with_partition_by_id_async( + let mut remaining_count = count; + let mut batches = IggyMessagesBatchSet::empty(); + + for idx in range { + let segment_end_timestamp = self.with_partition_by_id( + stream_id, + topic_id, + partition_id, + |(_, _, _, _, _, _, log)| { + let segment = &log.segments()[idx]; + segment.end_timestamp + }, + ); + + // Skip segments that end before our timestamp + if segment_end_timestamp < timestamp { + continue; + } + + let messages = self + .get_messages_by_timestamp_base( + stream_id, + topic_id, + partition_id, + idx, + timestamp, + remaining_count, + ) + .await?; + + let messages_count = messages.count(); + if messages_count == 0 { + continue; + } + + remaining_count = remaining_count.saturating_sub(messages_count); + batches.add_batch_set(messages); + + if remaining_count == 0 { + break; + } + } + + Ok(batches) + } + + async fn get_messages_by_timestamp_base( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: partitions::ContainerId, + idx: usize, + timestamp: u64, + count: u32, + ) -> Result<IggyMessagesBatchSet, IggyError> { + if count == 0 { + return Ok(IggyMessagesBatchSet::default()); + } + + let (is_journal_empty, journal_first_timestamp, journal_last_timestamp) = self + .with_partition_by_id( + stream_id, + topic_id, + partition_id, + |(_, _, _, _, _, _, log)| { + let journal = log.journal(); + ( + journal.is_empty(), + journal.inner().first_timestamp, + journal.inner().end_timestamp, + ) + }, + ); + + // Case 0: Accumulator is empty, so all messages have to be on disk + if is_journal_empty { + let (storage, index) = self.with_partition_by_id( + stream_id, + topic_id, + partition_id, + |(_, _, _, _, _, _, log)| { + let storage = log.storages()[idx].clone(); + let indexes = log.indexes()[idx] + .as_ref() + .map(|indexes| indexes.slice_by_offset(0, u32::MAX).unwrap()); + (storage, indexes) + }, + ); + return Self::load_messages_from_disk_by_timestamp(&storage, &index, timestamp, count) + .await; + } + + // Case 1: All messages are in accumulator buffer (timestamp is after journal ends) + if timestamp > journal_last_timestamp { + return Ok(IggyMessagesBatchSet::empty()); + } + + // Case 1b: Timestamp is within journal range + if timestamp >= journal_first_timestamp { + let batches = self.with_partition_by_id( + stream_id, + topic_id, + partition_id, + |(_, _, _, _, _, _, log)| { + log.journal() + .get(|batches| batches.get_by_timestamp(timestamp, count)) + }, + ); + return Ok(batches); + } + + // Case 2: All messages are on disk (timestamp is before journal's first timestamp) + let (storage, index) = self.with_partition_by_id( stream_id, topic_id, partition_id, - helpers::get_messages_by_timestamp_range(timestamp, count, range), - ) - .await + |(_, _, _, _, _, _, log)| { + let storage = log.storages()[idx].clone(); + let indexes = log.indexes()[idx] + .as_ref() + .map(|indexes| indexes.slice_by_timestamp(0, u32::MAX).unwrap()); + (storage, indexes) + }, + ); + + let disk_messages = + Self::load_messages_from_disk_by_timestamp(&storage, &index, timestamp, count).await?; + + if disk_messages.count() >= count { + return Ok(disk_messages); + } + + // Case 3: Messages span disk and accumulator buffer boundary + let remaining_count = count - disk_messages.count(); + let journal_messages = self.with_partition_by_id( + stream_id, + topic_id, + partition_id, + |(_, _, _, _, _, _, log)| { + log.journal() + .get(|batches| batches.get_by_timestamp(timestamp, remaining_count)) + }, + ); + + let mut combined_batch_set = disk_messages; + if !journal_messages.is_empty() { + combined_batch_set.add_batch_set(journal_messages); + } + Ok(combined_batch_set) + } + + async fn load_messages_from_disk_by_timestamp( + storage: &Storage, + index: &Option<IggyIndexesMut>, + timestamp: u64, + count: u32, + ) -> Result<IggyMessagesBatchSet, IggyError> { + let indexes_to_read = if let Some(indexes) = index { + if !indexes.is_empty() { + indexes.slice_by_timestamp(timestamp, count) + } else { + storage + .index_reader + .as_ref() + .expect("Index reader not initialized") + .load_from_disk_by_timestamp(timestamp, count) + .await? + } + } else { + storage + .index_reader + .as_ref() + .expect("Index reader not initialized") + .load_from_disk_by_timestamp(timestamp, count) + .await? + }; + + if indexes_to_read.is_none() { + return Ok(IggyMessagesBatchSet::empty()); + } + + let indexes_to_read = indexes_to_read.unwrap(); + + let batch = storage + .messages_reader + .as_ref() + .expect("Messages reader not initialized") + .load_messages_from_disk(indexes_to_read) + .await + .with_error_context(|error| { + format!("Failed to load messages from disk by timestamp: {error}") + })?; + + Ok(IggyMessagesBatchSet::from(batch)) } pub async fn handle_full_segment( @@ -844,22 +1260,63 @@ impl Streams { streaming_partitions::helpers::commit_journal(), ); - let (saved, batch_count) = self - .with_partition_by_id_async( - stream_id, - topic_id, - partition_id, - streaming_partitions::helpers::persist_batch( - shard_id, - stream_id, - topic_id, - partition_id, - batches, - reason.to_string(), - ), - ) + shard_trace!( + shard_id, + "Persisting messages on disk for stream ID: {}, topic ID: {}, partition ID: {} because {}...", + stream_id, + topic_id, + partition_id, + reason + ); + + let batch_count = batches.count(); + let batch_size = batches.size(); + + // Perform disk I/O inside the async closure + let saved = self + .with_partition_by_id_async(stream_id, topic_id, partition_id, async move |(.., log)| { + let storage = log.active_storage().clone(); + + let saved = storage + .messages_writer + .as_ref() + .expect("Messages writer not initialized") + .save_batch_set(batches) + .await + .with_error_context(|error| { + format!( + "Failed to save batch of {batch_count} messages \ + ({batch_size} bytes) to stream ID: {stream_id}, topic ID: {topic_id}, partition ID: {partition_id}. {error}", + ) + })?; + + let unsaved_indexes_slice = log.active_indexes().unwrap().unsaved_slice(); + let indexes_len = unsaved_indexes_slice.len(); + + storage + .index_writer + .as_ref() + .expect("Index writer not initialized") + .save_indexes(unsaved_indexes_slice) + .await + .with_error_context(|error| { + format!("Failed to save index of {indexes_len} indexes to stream ID: {stream_id}, topic ID: {topic_id} {partition_id}. {error}",) + })?; + + Ok::<_, IggyError>(saved) + }) .await?; + shard_trace!( + shard_id, + "Persisted {} messages on disk for stream ID: {}, topic ID: {}, for partition with ID: {}, total bytes written: {}.", + batch_count, + stream_id, + topic_id, + partition_id, + saved + ); + self.with_partition_by_id_mut( stream_id, topic_id, @@ -880,58 +1337,37 @@ impl Streams { topic_id: &Identifier, partition_id: usize, ) -> Result<(), IggyError> { - let has_storage = - self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| { - let storage = log.active_storage(); - storage.messages_writer.is_some() && storage.index_writer.is_some() - }); - - if !has_storage { - return Ok(()); - } + self.with_partition_by_id_async(stream_id, topic_id, partition_id, async move |(.., log)| { + let storage = log.active_storage(); - self.with_partition_by_id_async( - stream_id, - topic_id, - partition_id, - async move |(.., log)| { - let storage = log.active_storage(); + if storage.messages_writer.is_none() || storage.index_writer.is_none() { + return Ok(()); + } - if let Some(ref messages_writer) = storage.messages_writer { - if let Err(e) = messages_writer.fsync().await { - tracing::error!( - "Failed to fsync messages writer for partition {}: {}", - partition_id, - e - ); - return Err(e); - } + if let Some(ref messages_writer) = storage.messages_writer { + if let Err(e) = messages_writer.fsync().await { + tracing::error!( + "Failed to fsync messages writer for partition {}: {}", + partition_id, + e + ); + return Err(e); } - Ok(()) - }, - ) - .await?; + } - self.with_partition_by_id_async( - stream_id, - topic_id, - partition_id, - async move |(.., log)| { - if let Some(ref index_writer) = log.active_storage().index_writer { - if let Err(e) = index_writer.fsync().await { - tracing::error!( - "Failed to fsync index writer for partition {}: {}", - partition_id, - e - ); - return Err(e); - } + if let Some(ref index_writer) = storage.index_writer { + if let Err(e) = index_writer.fsync().await { + tracing::error!( + "Failed to fsync index writer for partition {}: {}", + partition_id, + e + ); + return Err(e); } - Ok(()) - }, - ) - .await?; + } - Ok(()) + Ok(()) + }) + .await } } diff --git a/core/server/src/slab/topics.rs b/core/server/src/slab/topics.rs index 7508d675..cfc09bea 100644 --- a/core/server/src/slab/topics.rs +++ b/core/server/src/slab/topics.rs @@ -11,14 +11,10 @@ use crate::{ partitions::Partitions, traits_ext::{ ComponentsById, DeleteCell, EntityComponentSystem, EntityComponentSystemMutCell, - Insert, InsertCell, InteriorMutability, IntoComponents, + InsertCell, InteriorMutability, IntoComponents, }, }, streaming::{ - partitions::{ - journal::MemoryMessageJournal, - log::{Log, SegmentedLog}, - }, stats::stats::TopicStats, topics::{ consumer_group2::{ConsumerGroupRef, ConsumerGroupRefMut}, @@ -133,12 +129,11 @@ impl EntityComponentSystem<InteriorMutability> for Topics { { f(self.into()) } - + fn with_components_async<O, F>(&self, f: F) -> impl Future<Output = O> where - F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O, - { - f(self.into()) + F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O { + f(self.into()) } } @@ -214,7 +209,6 @@ impl Topics { let id = self.get_index(topic_id); self.with_components_by_id_async(id, async |components| f(components).await) } - pub fn with_topic_by_id_mut<T>( &self, topic_id: &Identifier, @@ -232,14 +226,6 @@ impl Topics { self.with_topic_by_id(topic_id, helpers::consumer_groups(f)) } - pub fn with_consumer_groups_async<T>( - &self, - topic_id: &Identifier, - f: impl AsyncFnOnce(&ConsumerGroups) -> T, - ) -> impl Future<Output = T> { - self.with_topic_by_id_async(topic_id, helpers::consumer_groups_async(f)) - } - pub fn with_consumer_groups_mut<T>( &self, topic_id: &Identifier, @@ -259,17 +245,6 @@ impl Topics { }) } - pub fn with_consumer_group_by_id_async<T>( - &self, - topic_id: &Identifier, - group_id: &Identifier, - f: impl AsyncFnOnce(ComponentsById<ConsumerGroupRef>) -> T, - ) -> impl Future<Output = T> { - self.with_consumer_groups_async(topic_id, async |container| { - container.with_consumer_group_by_id_async(group_id, f).await - }) - } - pub fn with_consumer_group_by_id_mut<T>( &self, topic_id: &Identifier, @@ -292,7 +267,6 @@ impl Topics { ) -> impl Future<Output = T> { self.with_topic_by_id_async(topic_id, helpers::partitions_async(f)) } - pub fn with_partitions_mut<T>( &self, topic_id: &Identifier, diff --git a/core/server/src/streaming/partitions/helpers.rs b/core/server/src/streaming/partitions/helpers.rs index 1062a6dc..312152c6 100644 --- a/core/server/src/streaming/partitions/helpers.rs +++ b/core/server/src/streaming/partitions/helpers.rs @@ -2,7 +2,7 @@ use error_set::ErrContext; use iggy_common::{ConsumerOffsetInfo, Identifier, IggyByteSize, IggyError}; use std::{ ops::{AsyncFnOnce, Index}, - sync::atomic::Ordering, + sync::{Arc, atomic::Ordering}, }; use sysinfo::Component; @@ -273,212 +273,6 @@ pub fn create_message_deduplicator(config: &SystemConfig) -> Option<MessageDedup Some(MessageDeduplicator::new(max_entries, expiry)) } -pub async fn get_messages_by_offset( - storage: &Storage, - journal: &MemoryMessageJournal, - index: &Option<IggyIndexesMut>, - offset: u64, - end_offset: u64, - count: u32, - segment_start_offset: u64, -) -> Result<IggyMessagesBatchSet, IggyError> { - if count == 0 { - return Ok(IggyMessagesBatchSet::default()); - } - - // Case 0: Accumulator is empty, so all messages have to be on disk - if journal.is_empty() { - return load_messages_from_disk_by_offset( - storage, - index, - offset, - count, - segment_start_offset, - ) - .await; - } - - let journal_first_offset = journal.inner().base_offset; - let journal_last_offset = journal.inner().current_offset; - - // Case 1: All messages are in accumulator buffer - if offset >= journal_first_offset && end_offset <= journal_last_offset { - return Ok(journal.get(|batches| batches.get_by_offset(offset, count))); - } - - // Case 2: All messages are on disk - if end_offset < journal_first_offset { - return load_messages_from_disk_by_offset( - storage, - index, - offset, - count, - segment_start_offset, - ) - .await; - } - - // Case 3: Messages span disk and accumulator buffer boundary - // Calculate how many messages we need from disk - let disk_count = if offset < journal_first_offset { - ((journal_first_offset - offset) as u32).min(count) - } else { - 0 - }; - let mut combined_batch_set = IggyMessagesBatchSet::empty(); - - // Load messages from disk if needed - if disk_count > 0 { - let disk_messages = load_messages_from_disk_by_offset(storage, index, offset, disk_count, segment_start_offset) - .await - .with_error_context(|error| { - format!("Failed to load messages from disk, start offset: {offset}, count: {disk_count}, error: {error}") - })?; - - if !disk_messages.is_empty() { - combined_batch_set.add_batch_set(disk_messages); - } - } - - // Calculate how many more messages we need from the accumulator - let remaining_count = count - combined_batch_set.count(); - - if remaining_count > 0 { - let accumulator_start_offset = std::cmp::max(offset, journal_first_offset); - let accumulator_messages = - journal.get(|batches| batches.get_by_offset(accumulator_start_offset, remaining_count)); - if !accumulator_messages.is_empty() { - combined_batch_set.add_batch_set(accumulator_messages); - } - } - - Ok(combined_batch_set) -} - -async fn load_messages_from_disk_by_offset( - storage: &Storage, - index: &Option<IggyIndexesMut>, - start_offset: u64, - count: u32, - segment_start_offset: u64, -) -> Result<IggyMessagesBatchSet, IggyError> { - // Convert start_offset to relative offset within the segment - let relative_start_offset = (start_offset - segment_start_offset) as u32; - - // Load indexes first - let indexes_to_read = if let Some(indexes) = index { - if !indexes.is_empty() { - indexes.slice_by_offset(relative_start_offset, count) - } else { - storage - .index_reader - .as_ref() - .expect("Index reader not initialized") - .load_from_disk_by_offset(relative_start_offset, count) - .await? - } - } else { - storage - .index_reader - .as_ref() - .expect("Index reader not initialized") - .load_from_disk_by_offset(relative_start_offset, count) - .await? - }; - - if indexes_to_read.is_none() { - return Ok(IggyMessagesBatchSet::empty()); - } - - let indexes_to_read = indexes_to_read.unwrap(); - let batch = storage - .messages_reader - .as_ref() - .expect("Messages reader not initialized") - .load_messages_from_disk(indexes_to_read) - .await - .with_error_context(|error| format!("Failed to load messages from disk: {error}"))?; - - batch - .validate_checksums_and_offsets(start_offset) - .with_error_context(|error| { - format!("Failed to validate messages read from disk! error: {error}") - })?; - - Ok(IggyMessagesBatchSet::from(batch)) -} - -pub fn get_messages_by_offset_range( - offset: u64, - count: u32, - range: std::ops::Range<usize>, -) -> impl AsyncFnOnce(ComponentsById<PartitionRef>) -> Result<IggyMessagesBatchSet, IggyError> { - async move |(.., log)| -> Result<IggyMessagesBatchSet, IggyError> { - let segments = log.segments().iter(); - let storages = log.storages().iter(); - let journal = log.journal(); - let indexes = log.indexes().iter(); - - let mut remaining_count = count; - let mut batches = IggyMessagesBatchSet::empty(); - let mut current_offset = offset; - - for (segment, storage, index) in segments - .zip(storages) - .zip(indexes) - .map(|((a, b), c)| (a, b, c)) - .skip(range.start) - .take(range.end - range.start) - { - let start_offset = if current_offset < segment.start_offset { - segment.start_offset - } else { - current_offset - }; - - let mut end_offset = start_offset + (remaining_count - 1) as u64; - if end_offset > segment.end_offset { - end_offset = segment.end_offset; - } - - // Calculate the actual count to request from this segment - let count: u32 = ((end_offset - start_offset + 1) as u32).min(remaining_count); - - let messages = get_messages_by_offset( - storage, - journal, - index, - start_offset, - end_offset, - count, - segment.start_offset, - ) - .await?; - - let messages_count = messages.count(); - if messages_count == 0 { - current_offset = segment.end_offset + 1; - continue; - } - - remaining_count = remaining_count.saturating_sub(messages_count); - - if let Some(last_offset) = messages.last_offset() { - current_offset = last_offset + 1; - } else if messages_count > 0 { - current_offset += messages_count as u64; - } - - batches.add_batch_set(messages); - - if remaining_count == 0 { - break; - } - } - Ok(batches) - } -} - pub fn get_segment_range_by_offset( offset: u64, ) -> impl FnOnce(ComponentsById<PartitionRef>) -> std::ops::Range<usize> { @@ -556,105 +350,6 @@ pub async fn load_messages_from_disk_by_timestamp( Ok(IggyMessagesBatchSet::from(batch)) } -pub async fn get_messages_by_timestamp( - storage: &Storage, - journal: &MemoryMessageJournal, - index: &Option<IggyIndexesMut>, - timestamp: u64, - count: u32, -) -> Result<IggyMessagesBatchSet, IggyError> { - if count == 0 { - return Ok(IggyMessagesBatchSet::default()); - } - - // Case 0: Accumulator is empty, so all messages have to be on disk - if journal.is_empty() { - return load_messages_from_disk_by_timestamp(storage, index, timestamp, count).await; - } - - let journal_first_timestamp = journal.inner().first_timestamp; - let journal_last_timestamp = journal.inner().end_timestamp; - - // Case 1: All messages are in accumulator buffer - if timestamp > journal_last_timestamp { - return Ok(IggyMessagesBatchSet::empty()); - } - - if timestamp >= journal_first_timestamp { - return Ok(journal.get(|batches| batches.get_by_timestamp(timestamp, count))); - } - - // Case 2: All messages are on disk (timestamp is before journal's first timestamp) - let disk_messages = - load_messages_from_disk_by_timestamp(storage, index, timestamp, count).await?; - - if disk_messages.count() >= count { - return Ok(disk_messages); - } - - // Case 3: Messages span disk and accumulator buffer boundary - let remaining_count = count - disk_messages.count(); - let journal_messages = - journal.get(|batches| batches.get_by_timestamp(timestamp, remaining_count)); - - let mut combined_batch_set = disk_messages; - if !journal_messages.is_empty() { - combined_batch_set.add_batch_set(journal_messages); - } - return Ok(combined_batch_set); -} - -pub fn get_messages_by_timestamp_range( - timestamp: u64, - count: u32, - range: std::ops::Range<usize>, -) -> impl AsyncFnOnce(ComponentsById<PartitionRef>) -> Result<IggyMessagesBatchSet, IggyError> { - async move |(.., log)| -> Result<IggyMessagesBatchSet, IggyError> { - let segments = log.segments().iter(); - let storages = log.storages().iter(); - let journal = log.journal(); - let indexes = log.indexes().iter(); - - let mut remaining_count = count; - let mut batches = IggyMessagesBatchSet::empty(); - - for (segment, storage, index) in segments - .zip(storages) - .zip(indexes) - .map(|((a, b), c)| (a, b, c)) - .skip(range.start) - .take(range.end - range.start) - { - if remaining_count == 0 { - break; - } - - // Skip segments that end before our timestamp - if segment.end_timestamp < timestamp { - continue; - } - - let messages = - get_messages_by_timestamp(storage, journal, index, timestamp, remaining_count) - .await?; - - let messages_count = messages.count(); - if messages_count == 0 { - continue; - } - - remaining_count = remaining_count.saturating_sub(messages_count); - batches.add_batch_set(messages); - - if remaining_count == 0 { - break; - } - } - - Ok(batches) - } -} - pub fn calculate_current_offset() -> impl FnOnce(ComponentsById<PartitionRef>) -> u64 { |(root, _, _, offset, ..)| { if !root.should_increment_offset() { @@ -665,21 +360,11 @@ pub fn calculate_current_offset() -> impl FnOnce(ComponentsById<PartitionRef>) - } } -pub fn deduplicate_messages( - current_offset: u64, - current_position: u32, - batch: &mut IggyMessagesBatchMut, -) -> impl AsyncFnOnce(ComponentsById<PartitionRef>) { - async move |(.., deduplicator, _, _, _, log)| { +pub fn get_segment_start_offset_and_deduplicator() +-> impl FnOnce(ComponentsById<PartitionRef>) -> (u64, Option<Arc<MessageDeduplicator>>) { + move |(.., deduplicator, _, _, _, log)| { let segment = log.active_segment(); - batch - .prepare_for_persistence( - segment.start_offset, - current_offset, - current_position, - deduplicator.as_ref(), - ) - .await; + (segment.start_offset, deduplicator.clone()) } } diff --git a/core/server/src/streaming/partitions/partition2.rs b/core/server/src/streaming/partitions/partition2.rs index 6c85dfb0..e965a270 100644 --- a/core/server/src/streaming/partitions/partition2.rs +++ b/core/server/src/streaming/partitions/partition2.rs @@ -17,7 +17,10 @@ use crate::{ }; use iggy_common::{Identifier, IggyTimestamp}; use slab::Slab; -use std::sync::{Arc, atomic::AtomicU64}; +use std::{ + rc::Rc, + sync::{Arc, atomic::AtomicU64}, +}; #[derive(Debug, Clone)] pub struct ConsumerOffsets(papaya::HashMap<usize, consumer_offset::ConsumerOffset>); @@ -87,7 +90,7 @@ impl std::ops::DerefMut for ConsumerGroupOffsets { pub struct Partition { root: PartitionRoot, stats: Arc<PartitionStats>, - message_deduplicator: Option<MessageDeduplicator>, + message_deduplicator: Option<Arc<MessageDeduplicator>>, offset: Arc<AtomicU64>, consumer_offset: Arc<ConsumerOffsets>, consumer_group_offset: Arc<ConsumerGroupOffsets>, @@ -106,6 +109,7 @@ impl Partition { log: SegmentedLog<MemoryMessageJournal>, ) -> Self { let root = PartitionRoot::new(created_at, should_increment_offset); + let message_deduplicator = message_deduplicator.map(Arc::new); Self { root, stats, @@ -120,7 +124,7 @@ impl Partition { pub fn new_with_components( root: PartitionRoot, stats: Arc<PartitionStats>, - message_deduplicator: Option<MessageDeduplicator>, + message_deduplicator: Option<Arc<MessageDeduplicator>>, offset: Arc<AtomicU64>, consumer_offset: Arc<ConsumerOffsets>, consumer_group_offset: Arc<ConsumerGroupOffsets>, @@ -172,7 +176,7 @@ impl IntoComponents for Partition { type Components = ( PartitionRoot, Arc<PartitionStats>, - Option<MessageDeduplicator>, + Option<Arc<MessageDeduplicator>>, Arc<AtomicU64>, Arc<ConsumerOffsets>, Arc<ConsumerGroupOffsets>, @@ -233,7 +237,7 @@ impl PartitionRoot { pub struct PartitionRef<'a> { root: &'a Slab<PartitionRoot>, stats: &'a Slab<Arc<PartitionStats>>, - message_deduplicator: &'a Slab<Option<MessageDeduplicator>>, + message_deduplicator: &'a Slab<Option<Arc<MessageDeduplicator>>>, offset: &'a Slab<Arc<AtomicU64>>, consumer_offset: &'a Slab<Arc<ConsumerOffsets>>, consumer_group_offset: &'a Slab<Arc<ConsumerGroupOffsets>>, @@ -244,7 +248,7 @@ impl<'a> PartitionRef<'a> { pub fn new( root: &'a Slab<PartitionRoot>, stats: &'a Slab<Arc<PartitionStats>>, - message_deduplicator: &'a Slab<Option<MessageDeduplicator>>, + message_deduplicator: &'a Slab<Option<Arc<MessageDeduplicator>>>, offset: &'a Slab<Arc<AtomicU64>>, consumer_offset: &'a Slab<Arc<ConsumerOffsets>>, consumer_group_offset: &'a Slab<Arc<ConsumerGroupOffsets>>, @@ -266,7 +270,7 @@ impl<'a> IntoComponents for PartitionRef<'a> { type Components = ( &'a Slab<PartitionRoot>, &'a Slab<Arc<PartitionStats>>, - &'a Slab<Option<MessageDeduplicator>>, + &'a Slab<Option<Arc<MessageDeduplicator>>>, &'a Slab<Arc<AtomicU64>>, &'a Slab<Arc<ConsumerOffsets>>, &'a Slab<Arc<ConsumerGroupOffsets>>, @@ -291,7 +295,7 @@ impl<'a> IntoComponentsById for PartitionRef<'a> { type Output = ( &'a PartitionRoot, &'a Arc<PartitionStats>, - &'a Option<MessageDeduplicator>, + &'a Option<Arc<MessageDeduplicator>>, &'a Arc<AtomicU64>, &'a Arc<ConsumerOffsets>, &'a Arc<ConsumerGroupOffsets>, @@ -314,7 +318,7 @@ impl<'a> IntoComponentsById for PartitionRef<'a> { pub struct PartitionRefMut<'a> { root: &'a mut Slab<PartitionRoot>, stats: &'a mut Slab<Arc<PartitionStats>>, - message_deduplicator: &'a mut Slab<Option<MessageDeduplicator>>, + message_deduplicator: &'a mut Slab<Option<Arc<MessageDeduplicator>>>, offset: &'a mut Slab<Arc<AtomicU64>>, consumer_offset: &'a mut Slab<Arc<ConsumerOffsets>>, consumer_group_offset: &'a mut Slab<Arc<ConsumerGroupOffsets>>, @@ -325,7 +329,7 @@ impl<'a> PartitionRefMut<'a> { pub fn new( root: &'a mut Slab<PartitionRoot>, stats: &'a mut Slab<Arc<PartitionStats>>, - message_deduplicator: &'a mut Slab<Option<MessageDeduplicator>>, + message_deduplicator: &'a mut Slab<Option<Arc<MessageDeduplicator>>>, offset: &'a mut Slab<Arc<AtomicU64>>, consumer_offset: &'a mut Slab<Arc<ConsumerOffsets>>, consumer_group_offset: &'a mut Slab<Arc<ConsumerGroupOffsets>>, @@ -347,7 +351,7 @@ impl<'a> IntoComponents for PartitionRefMut<'a> { type Components = ( &'a mut Slab<PartitionRoot>, &'a mut Slab<Arc<PartitionStats>>, - &'a mut Slab<Option<MessageDeduplicator>>, + &'a mut Slab<Option<Arc<MessageDeduplicator>>>, &'a mut Slab<Arc<AtomicU64>>, &'a mut Slab<Arc<ConsumerOffsets>>, &'a mut Slab<Arc<ConsumerGroupOffsets>>, @@ -372,7 +376,7 @@ impl<'a> IntoComponentsById for PartitionRefMut<'a> { type Output = ( &'a mut PartitionRoot, &'a mut Arc<PartitionStats>, - &'a mut Option<MessageDeduplicator>, + &'a mut Option<Arc<MessageDeduplicator>>, &'a mut Arc<AtomicU64>, &'a mut Arc<ConsumerOffsets>, &'a mut Arc<ConsumerGroupOffsets>, diff --git a/core/server/src/streaming/segments/storage.rs b/core/server/src/streaming/segments/storage.rs index 84fcd783..53313282 100644 --- a/core/server/src/streaming/segments/storage.rs +++ b/core/server/src/streaming/segments/storage.rs @@ -8,12 +8,14 @@ use crate::streaming::segments::{ messages::{MessagesReader, MessagesWriter}, }; -#[derive(Debug)] +unsafe impl Send for Storage {} + +#[derive(Debug, Clone)] pub struct Storage { - pub messages_writer: Option<MessagesWriter>, - pub messages_reader: Option<MessagesReader>, - pub index_writer: Option<IndexWriter>, - pub index_reader: Option<IndexReader>, + pub messages_writer: Option<Rc<MessagesWriter>>, + pub messages_reader: Option<Rc<MessagesReader>>, + pub index_writer: Option<Rc<IndexWriter>>, + pub index_reader: Option<Rc<IndexReader>>, } impl Storage { @@ -28,19 +30,21 @@ impl Storage { ) -> Result<Self, IggyError> { let size = Rc::new(AtomicU64::new(messages_size)); let indexes_size = Rc::new(AtomicU64::new(indexes_size)); - let messages_writer = - MessagesWriter::new(messages_path, size.clone(), log_fsync, file_exists).await?; + let messages_writer = Rc::new( + MessagesWriter::new(messages_path, size.clone(), log_fsync, file_exists).await?, + ); - let index_writer = - IndexWriter::new(index_path, indexes_size.clone(), index_fsync, file_exists).await?; + let index_writer = Rc::new( + IndexWriter::new(index_path, indexes_size.clone(), index_fsync, file_exists).await?, + ); if file_exists { messages_writer.fsync().await?; index_writer.fsync().await?; } - let messages_reader = MessagesReader::new(messages_path, size).await?; - let index_reader = IndexReader::new(index_path, indexes_size).await?; + let messages_reader = Rc::new(MessagesReader::new(messages_path, size).await?); + let index_reader = Rc::new(IndexReader::new(index_path, indexes_size).await?); Ok(Self { messages_writer: Some(messages_writer), messages_reader: Some(messages_reader), @@ -49,7 +53,7 @@ impl Storage { }) } - pub fn shutdown(&mut self) -> (Option<MessagesWriter>, Option<IndexWriter>) { + pub fn shutdown(&mut self) -> (Option<Rc<MessagesWriter>>, Option<Rc<IndexWriter>>) { let messages_writer = self.messages_writer.take(); let index_writer = self.index_writer.take(); (messages_writer, index_writer) diff --git a/core/server/src/streaming/segments/types/messages_batch_mut.rs b/core/server/src/streaming/segments/types/messages_batch_mut.rs index 8e4895da..29e5c4d2 100644 --- a/core/server/src/streaming/segments/types/messages_batch_mut.rs +++ b/core/server/src/streaming/segments/types/messages_batch_mut.rs @@ -29,6 +29,7 @@ use iggy_common::{ }; use lending_iterator::prelude::*; use std::ops::{Deref, Index}; +use std::sync::Arc; use tracing::{error, warn}; /// A container for mutable messages that are being prepared for persistence. @@ -144,7 +145,7 @@ impl IggyMessagesBatchMut { start_offset: u64, base_offset: u64, current_position: u32, - deduplicator: Option<&MessageDeduplicator>, + deduplicator: Option<&Arc<MessageDeduplicator>>, ) { let messages_count = self.count(); if messages_count == 0 { diff --git a/core/server/src/streaming/topics/storage2.rs b/core/server/src/streaming/topics/storage2.rs index 35e854a7..86d783b1 100644 --- a/core/server/src/streaming/topics/storage2.rs +++ b/core/server/src/streaming/topics/storage2.rs @@ -94,191 +94,3 @@ pub async fn delete_topic_from_disk( ); Ok((messages_count, size_bytes, segments_count)) } - -// Old implementation kept for reference -/* -async fn load(&self, topic: &mut Topic, mut state: TopicState) -> Result<(), IggyError> { - info!("Loading topic {} from disk...", topic); - if !Path::new(&topic.path).exists() { - return Err(IggyError::TopicIdNotFound(topic.topic_id, topic.stream_id)); - } - - let message_expiry = Topic::get_message_expiry(state.message_expiry, &topic.config); - let max_topic_size = Topic::get_max_topic_size(state.max_topic_size, &topic.config)?; - topic.created_at = state.created_at; - topic.message_expiry = message_expiry; - topic.max_topic_size = max_topic_size; - topic.compression_algorithm = state.compression_algorithm; - topic.replication_factor = state.replication_factor.unwrap_or(1); - - let mut dir_entries = fs::read_dir(&topic.partitions_path).await - .with_context(|| format!("Failed to read partition with ID: {} for stream with ID: {} for topic with ID: {} and path: {}", - topic.topic_id, topic.stream_id, topic.topic_id, &topic.partitions_path)) - .map_err(|_| IggyError::CannotReadPartitions)?; - - let mut unloaded_partitions = Vec::new(); - while let Some(dir_entry) = dir_entries.next_entry().await.unwrap_or(None) { - let metadata = dir_entry.metadata().await; - if metadata.is_err() || metadata.unwrap().is_file() { - continue; - } - - let name = dir_entry.file_name().into_string().unwrap(); - let partition_id = name.parse::<u32>(); - if partition_id.is_err() { - error!("Invalid partition ID file with name: '{}'.", name); - continue; - } - - let partition_id = partition_id.unwrap(); - let partition_state = state.partitions.get(&partition_id); - if partition_state.is_none() { - let stream_id = topic.stream_id; - let topic_id = topic.topic_id; - error!( - "Partition with ID: '{partition_id}' for stream with ID: '{stream_id}' and topic with ID: '{topic_id}' was not found in state, but exists on disk and will be removed." - ); - if let Err(error) = fs::remove_dir_all(&dir_entry.path()).await { - error!("Cannot remove partition directory: {error}"); - } else { - warn!( - "Partition with ID: '{partition_id}' for stream with ID: '{stream_id}' and topic with ID: '{topic_id}' was removed." - ); - } - continue; - } - - let partition_state = partition_state.unwrap(); - let partition = Partition::create( - topic.stream_id, - topic.topic_id, - partition_id, - false, - topic.config.clone(), - topic.storage.clone(), - message_expiry, - topic.messages_count_of_parent_stream.clone(), - topic.messages_count.clone(), - topic.size_of_parent_stream.clone(), - topic.size_bytes.clone(), - topic.segments_count_of_parent_stream.clone(), - partition_state.created_at, - ) - .await; - unloaded_partitions.push(partition); - } - - let state_partition_ids = state.partitions.keys().copied().collect::<AHashSet<u32>>(); - let unloaded_partition_ids = unloaded_partitions - .iter() - .map(|partition| partition.partition_id) - .collect::<AHashSet<u32>>(); - let missing_ids = state_partition_ids - .difference(&unloaded_partition_ids) - .copied() - .collect::<AHashSet<u32>>(); - if missing_ids.is_empty() { - info!( - "All partitions for topic with ID: '{}' for stream with ID: '{}' found on disk were found in state.", - topic.topic_id, topic.stream_id - ); - } else { - warn!( - "Partitions with IDs: '{missing_ids:?}' for topic with ID: '{topic_id}' for stream with ID: '{stream_id}' were not found on disk.", - topic_id = topic.topic_id, - stream_id = topic.stream_id - ); - if topic.config.recovery.recreate_missing_state { - info!( - "Recreating missing state in recovery config is enabled, missing partitions will be created for topic with ID: '{}' for stream with ID: '{}'.", - topic.topic_id, topic.stream_id - ); - - for partition_id in missing_ids { - let partition_state = state.partitions.get(&partition_id).unwrap(); - let mut partition = Partition::create( - topic.stream_id, - topic.topic_id, - partition_id, - true, - topic.config.clone(), - topic.storage.clone(), - message_expiry, - topic.messages_count_of_parent_stream.clone(), - topic.messages_count.clone(), - topic.size_of_parent_stream.clone(), - topic.size_bytes.clone(), - topic.segments_count_of_parent_stream.clone(), - partition_state.created_at, - ) - .await; - partition.persist().await.with_error_context(|error| { - format!( - "{COMPONENT} (error: {error}) - failed to persist partition: {partition}" - ) - })?; - partition.segments.clear(); - unloaded_partitions.push(partition); - info!( - "Created missing partition with ID: '{partition_id}', for topic with ID: '{}' for stream with ID: '{}'.", - topic.topic_id, topic.stream_id - ); - } - } else { - warn!( - "Recreating missing state in recovery config is disabled, missing partitions will not be created for topic with ID: '{}' for stream with ID: '{}'.", - topic.topic_id, topic.stream_id - ); - } - } - - let stream_id = topic.stream_id; - let topic_id = topic.topic_id; - let loaded_partitions = Arc::new(Mutex::new(Vec::new())); - let mut load_partitions = Vec::new(); - for mut partition in unloaded_partitions { - let loaded_partitions = loaded_partitions.clone(); - let partition_state = state.partitions.remove(&partition.partition_id).unwrap(); - let load_partition = tokio::spawn(async move { - match partition.load(partition_state).await { - Ok(_) => { - loaded_partitions.lock().await.push(partition); - } - Err(error) => { - error!( - "Failed to load partition with ID: {} for stream with ID: {stream_id} and topic with ID: {topic_id}. Error: {error}", - partition.partition_id - ); - } - } - }); - load_partitions.push(load_partition); - } - - join_all(load_partitions).await; - for partition in loaded_partitions.lock().await.drain(..) { - topic - .partitions - .insert(partition.partition_id, IggySharedMut::new(partition)); - } - - for consumer_group in state.consumer_groups.into_values() { - let consumer_group = ConsumerGroup::new( - topic.topic_id, - consumer_group.id, - &consumer_group.name, - topic.get_partitions_count(), - ); - topic - .consumer_groups_ids - .insert(consumer_group.name.to_owned(), consumer_group.group_id); - topic - .consumer_groups - .insert(consumer_group.group_id, RwLock::new(consumer_group)); - } - - info!("Loaded topic {topic}"); - - Ok(()) - } - */
