This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new f940fc82 feat(io_uring): fix nasty concurrency bug (#2223)
f940fc82 is described below
commit f940fc821508b9d1aff9db6bce251c47d7a8a1e8
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Thu Oct 2 09:29:29 2025 +0200
feat(io_uring): fix nasty concurrency bug (#2223)
---
.../consumer_groups/get_consumer_group_handler.rs | 19 +-
.../consumer_groups/get_consumer_groups_handler.rs | 21 +-
.../binary/handlers/topics/get_topic_handler.rs | 14 +-
.../binary/handlers/topics/get_topics_handler.rs | 17 +-
core/server/src/http/consumer_groups.rs | 66 +-
core/server/src/shard/mod.rs | 81 +--
core/server/src/shard/system/consumer_offsets.rs | 48 +-
core/server/src/shard/system/messages.rs | 48 +-
core/server/src/slab/consumer_groups.rs | 18 +-
core/server/src/slab/helpers.rs | 16 +-
core/server/src/slab/partitions.rs | 22 +-
core/server/src/slab/streams.rs | 721 ++++++++++++++++-----
core/server/src/slab/topics.rs | 49 +-
core/server/src/slab/traits_ext.rs | 11 -
core/server/src/streaming/partitions/helpers.rs | 325 +---------
core/server/src/streaming/partitions/partition2.rs | 28 +-
core/server/src/streaming/segments/storage.rs | 28 +-
.../streaming/segments/types/messages_batch_mut.rs | 3 +-
core/server/src/streaming/topics/storage2.rs | 188 ------
19 files changed, 766 insertions(+), 957 deletions(-)
diff --git
a/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs
b/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs
index 5330e748..a056659c 100644
---
a/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs
+++
b/core/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs
@@ -59,18 +59,13 @@ impl ServerCommandHandler for GetConsumerGroup {
numeric_topic_id as u32,
);
- shard
- .streams2
- .with_consumer_group_by_id_async(
- &self.stream_id,
- &self.topic_id,
- &self.group_id,
- async |(root, members)| {
- let consumer_group = mapper::map_consumer_group(root,
members);
- sender.send_ok_response(&consumer_group).await
- },
- )
- .await?;
+ let consumer_group = shard.streams2.with_consumer_group_by_id(
+ &self.stream_id,
+ &self.topic_id,
+ &self.group_id,
+ |(root, members)| mapper::map_consumer_group(root, members),
+ );
+ sender.send_ok_response(&consumer_group).await?;
Ok(())
}
}
diff --git
a/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs
b/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs
index 1c9ecd95..3740d44a 100644
---
a/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs
+++
b/core/server/src/binary/handlers/consumer_groups/get_consumer_groups_handler.rs
@@ -61,17 +61,16 @@ impl ServerCommandHandler for GetConsumerGroups {
numeric_topic_id as u32,
);
- shard
- .streams2
- .with_consumer_groups_async(&self.stream_id, &self.topic_id, async
|cgs| {
- cgs.with_components_async(async |cgs| {
- let (roots, members) = cgs.into_components();
- let consumer_groups = mapper::map_consumer_groups(roots,
members);
- sender.send_ok_response(&consumer_groups).await
- })
- .await
- })
- .await?;
+ let consumer_groups =
+ shard
+ .streams2
+ .with_consumer_groups(&self.stream_id, &self.topic_id, |cgs| {
+ cgs.with_components(|cgs| {
+ let (roots, members) = cgs.into_components();
+ mapper::map_consumer_groups(roots, members)
+ })
+ });
+ sender.send_ok_response(&consumer_groups).await?;
Ok(())
}
}
diff --git a/core/server/src/binary/handlers/topics/get_topic_handler.rs
b/core/server/src/binary/handlers/topics/get_topic_handler.rs
index 653ce995..e06a07a7 100644
--- a/core/server/src/binary/handlers/topics/get_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/get_topic_handler.rs
@@ -53,13 +53,13 @@ impl ServerCommandHandler for GetTopic {
self.topic_id.get_u32_value().unwrap_or(0),
)?;
- shard
- .streams2
- .with_topic_by_id_async(&self.stream_id, &self.topic_id, async
|(root, _, stats)| {
- let response = mapper::map_topic(&root, &stats);
- sender.send_ok_response(&response).await
- })
- .await?;
+ let response =
+ shard
+ .streams2
+ .with_topic_by_id(&self.stream_id, &self.topic_id, |(root, _,
stats)| {
+ mapper::map_topic(&root, &stats)
+ });
+ sender.send_ok_response(&response).await?;
Ok(())
}
}
diff --git a/core/server/src/binary/handlers/topics/get_topics_handler.rs
b/core/server/src/binary/handlers/topics/get_topics_handler.rs
index 561c0adc..0c78e90e 100644
--- a/core/server/src/binary/handlers/topics/get_topics_handler.rs
+++ b/core/server/src/binary/handlers/topics/get_topics_handler.rs
@@ -53,18 +53,13 @@ impl ServerCommandHandler for GetTopics {
.borrow()
.get_topics(session.get_user_id(), numeric_stream_id as u32);
- shard
- .streams2
- .with_topics_async(&self.stream_id, async |topics| {
- topics
- .with_components_async(async |topics| {
- let (roots, _, stats) = topics.into_components();
- let response = mapper::map_topics(&roots, &stats);
- sender.send_ok_response(&response).await
- })
- .await
+ let response = shard.streams2.with_topics(&self.stream_id, |topics| {
+ topics.with_components(|topics| {
+ let (roots, _, stats) = topics.into_components();
+ mapper::map_topics(&roots, &stats)
})
- .await?;
+ });
+ sender.send_ok_response(&response).await?;
Ok(())
}
}
diff --git a/core/server/src/http/consumer_groups.rs
b/core/server/src/http/consumer_groups.rs
index 56f4ecf4..d4883470 100644
--- a/core/server/src/http/consumer_groups.rs
+++ b/core/server/src/http/consumer_groups.rs
@@ -93,21 +93,12 @@ async fn get_consumer_group(
numeric_topic_id as u32,
)?;
- let consumer_group = {
- let future = SendWrapper::new(
- state
- .shard
- .shard()
- .streams2
- .with_consumer_group_by_id_async(
- &identifier_stream_id,
- &identifier_topic_id,
- &identifier_group_id,
- async |(root, members)| mapper::map_consumer_group(root,
members),
- ),
- );
- future.await
- };
+ let consumer_group =
state.shard.shard().streams2.with_consumer_group_by_id(
+ &identifier_stream_id,
+ &identifier_topic_id,
+ &identifier_group_id,
+ |(root, members)| mapper::map_consumer_group(root, members),
+ );
Ok(Json(consumer_group))
}
@@ -150,20 +141,16 @@ async fn get_consumer_groups(
numeric_topic_id as u32,
)?;
- let consumer_groups = {
- let future =
SendWrapper::new(state.shard.shard().streams2.with_consumer_groups_async(
- &identifier_stream_id,
- &identifier_topic_id,
- async |cgs| {
- cgs.with_components_async(async |cgs| {
- let (roots, members) = cgs.into_components();
- mapper::map_consumer_groups(roots, members)
- })
- .await
- },
- ));
- future.await
- };
+ let consumer_groups = state.shard.shard().streams2.with_consumer_groups(
+ &identifier_stream_id,
+ &identifier_topic_id,
+ |cgs| {
+ cgs.with_components(|cgs| {
+ let (roots, members) = cgs.into_components();
+ mapper::map_consumer_groups(roots, members)
+ })
+ },
+ );
Ok(Json(consumer_groups))
}
@@ -213,21 +200,12 @@ async fn create_consumer_group(
// Get the created consumer group details
let group_id_identifier = Identifier::numeric(group_id as u32).unwrap();
- let consumer_group_details = {
- let future = SendWrapper::new(
- state
- .shard
- .shard()
- .streams2
- .with_consumer_group_by_id_async(
- &command.stream_id,
- &command.topic_id,
- &group_id_identifier,
- async |(root, members)| mapper::map_consumer_group(root,
members),
- ),
- );
- future.await
- };
+ let consumer_group_details =
state.shard.shard().streams2.with_consumer_group_by_id(
+ &command.stream_id,
+ &command.topic_id,
+ &group_id_identifier,
+ |(root, members)| mapper::map_consumer_group(root, members),
+ );
// Apply state change
let entry_command =
EntryCommand::CreateConsumerGroup(CreateConsumerGroupWithId {
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 035cdaac..8a5b7d8b 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -572,55 +572,58 @@ impl IggyShard {
);
match consumer {
PollingConsumer::Consumer(consumer_id, _) => {
- self.streams2.with_partition_by_id(
+ let (offset_value, path) =
self.streams2.with_partition_by_id(
&stream_id,
&topic_id,
partition_id,
- partitions::helpers::store_consumer_offset(
- consumer_id,
- numeric_stream_id,
- numeric_topic_id,
- partition_id,
- offset,
- &self.config.system,
- ),
- );
- self.streams2
- .with_partition_by_id_async(
- &stream_id,
- &topic_id,
- partition_id,
-
partitions::helpers::persist_consumer_offset_to_disk(
- self.id,
+ |(.., offsets, _, _)| {
+ let hdl = offsets.pin();
+ let item = hdl.get_or_insert(
consumer_id,
- ),
- )
- .await?;
+
crate::streaming::partitions::consumer_offset::ConsumerOffset::default_for_consumer(
+ consumer_id as u32,
+
&self.config.system.get_consumer_offsets_path(numeric_stream_id,
numeric_topic_id, partition_id),
+ ),
+ );
+ item.offset.store(offset,
std::sync::atomic::Ordering::Relaxed);
+ let offset_value =
item.offset.load(std::sync::atomic::Ordering::Relaxed);
+ let path = item.path.clone();
+ (offset_value, path)
+ },
+ );
+
crate::streaming::partitions::storage2::persist_offset(
+ self.id,
+ &path,
+ offset_value,
+ )
+ .await?;
}
PollingConsumer::ConsumerGroup(cg_id, _) => {
- self.streams2.with_partition_by_id(
+ let (offset_value, path) =
self.streams2.with_partition_by_id(
&stream_id,
&topic_id,
partition_id,
-
partitions::helpers::store_consumer_group_member_offset(
- cg_id,
- numeric_stream_id,
- numeric_topic_id,
- partition_id,
- offset,
- &self.config.system,
- ),
- );
- self.streams2.with_partition_by_id_async(
- &stream_id,
- &topic_id,
- partition_id,
-
partitions::helpers::persist_consumer_group_member_offset_to_disk(
- self.id,
+ |(.., offsets, _)| {
+ let hdl = offsets.pin();
+ let item = hdl.get_or_insert(
cg_id,
- ),
- )
- .await?;
+
crate::streaming::partitions::consumer_offset::ConsumerOffset::default_for_consumer_group(
+ cg_id as u32,
+
&self.config.system.get_consumer_group_offsets_path(numeric_stream_id,
numeric_topic_id, partition_id),
+ ),
+ );
+ item.offset.store(offset,
std::sync::atomic::Ordering::Relaxed);
+ let offset_value =
item.offset.load(std::sync::atomic::Ordering::Relaxed);
+ let path = item.path.clone();
+ (offset_value, path)
+ },
+ );
+
crate::streaming::partitions::storage2::persist_offset(
+ self.id,
+ &path,
+ offset_value,
+ )
+ .await?;
}
}
}
diff --git a/core/server/src/shard/system/consumer_offsets.rs
b/core/server/src/shard/system/consumer_offsets.rs
index 5e4f4ff5..af5f71bb 100644
--- a/core/server/src/shard/system/consumer_offsets.rs
+++ b/core/server/src/shard/system/consumer_offsets.rs
@@ -258,26 +258,38 @@ impl IggyShard {
) -> Result<(), IggyError> {
match polling_consumer {
PollingConsumer::Consumer(id, _) => {
- self.streams2
- .with_partition_by_id_async(
- stream_id,
- topic_id,
- partition_id,
-
partitions::helpers::persist_consumer_offset_to_disk(self.id, *id),
- )
- .await
+ let (offset_value, path) = self.streams2.with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+ |(.., offsets, _, _)| {
+ let hdl = offsets.pin();
+ let item = hdl
+ .get(id)
+ .expect("persist_consumer_offset_to_disk: offset
not found");
+ let offset =
item.offset.load(std::sync::atomic::Ordering::Relaxed);
+ let path = item.path.clone();
+ (offset, path)
+ },
+ );
+ partitions::storage2::persist_offset(self.id, &path,
offset_value).await
}
PollingConsumer::ConsumerGroup(_, id) => {
- self.streams2
- .with_partition_by_id_async(
- stream_id,
- topic_id,
- partition_id,
-
partitions::helpers::persist_consumer_group_member_offset_to_disk(
- self.id, *id,
- ),
- )
- .await
+ let (offset_value, path) = self.streams2.with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+ |(.., offsets, _)| {
+ let hdl = offsets.pin();
+ let item = hdl.get(id).expect(
+ "persist_consumer_group_member_offset_to_disk:
offset not found",
+ );
+ let offset =
item.offset.load(std::sync::atomic::Ordering::Relaxed);
+ let path = item.path.clone();
+ (offset, path)
+ },
+ );
+ partitions::storage2::persist_offset(self.id, &path,
offset_value).await
}
}
}
diff --git a/core/server/src/shard/system/messages.rs
b/core/server/src/shard/system/messages.rs
index 320ce902..211d4a54 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -281,16 +281,23 @@ impl IggyShard {
&self.config.system,
),
);
- self.streams2
- .with_partition_by_id_async(
- &stream_id,
- &topic_id,
- partition_id,
-
partitions::helpers::persist_consumer_offset_to_disk(
- self.id,
- consumer_id,
- ),
- )
+
+ let (offset_value, path) =
self.streams2.with_partition_by_id(
+ &stream_id,
+ &topic_id,
+ partition_id,
+ |(.., offsets, _, _)| {
+ let hdl = offsets.pin();
+ let item =
hdl.get(&consumer_id).expect(
+ "persist_consumer_offset_to_disk:
offset not found",
+ );
+ let offset =
+
item.offset.load(std::sync::atomic::Ordering::Relaxed);
+ let path = item.path.clone();
+ (offset, path)
+ },
+ );
+ partitions::storage2::persist_offset(self.id,
&path, offset_value)
.await?;
}
PollingConsumer::ConsumerGroup(cg_id, _) => {
@@ -307,16 +314,23 @@ impl IggyShard {
&self.config.system,
),
);
- self.streams2.with_partition_by_id_async(
+
+ let (offset_value, path) =
self.streams2.with_partition_by_id(
&stream_id,
&topic_id,
partition_id,
-
partitions::helpers::persist_consumer_group_member_offset_to_disk(
- self.id,
- cg_id,
- ),
- )
- .await?;
+ |(.., offsets, _)| {
+ let hdl = offsets.pin();
+ let item = hdl
+ .get(&cg_id)
+
.expect("persist_consumer_group_member_offset_to_disk: offset not found");
+ let offset =
item.offset.load(std::sync::atomic::Ordering::Relaxed);
+ let path = item.path.clone();
+ (offset, path)
+ },
+ );
+ partitions::storage2::persist_offset(self.id,
&path, offset_value)
+ .await?;
}
}
}
diff --git a/core/server/src/slab/consumer_groups.rs
b/core/server/src/slab/consumer_groups.rs
index 683c3a0e..5f98ac5d 100644
--- a/core/server/src/slab/consumer_groups.rs
+++ b/core/server/src/slab/consumer_groups.rs
@@ -3,7 +3,7 @@ use crate::{
Keyed, consumer_groups,
traits_ext::{
Borrow, ComponentsById, Delete, EntityComponentSystem,
EntityComponentSystemMut,
- EntityMarker, Insert, IntoComponents, IntoComponentsById,
+ Insert, IntoComponents,
},
},
streaming::topics::consumer_group2::{self, ConsumerGroupRef,
ConsumerGroupRefMut},
@@ -81,13 +81,6 @@ impl EntityComponentSystem<Borrow> for ConsumerGroups {
{
f(self.into())
}
-
- fn with_components_async<O, F>(&self, f: F) -> impl Future<Output = O>
- where
- F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O,
- {
- f(self.into())
- }
}
impl EntityComponentSystemMut for ConsumerGroups {
@@ -146,15 +139,6 @@ impl ConsumerGroups {
let id = self.get_index(identifier);
self.with_components_by_id_mut(id, |components| f(components))
}
-
- pub fn with_consumer_group_by_id_async<T>(
- &self,
- identifier: &Identifier,
- f: impl AsyncFnOnce(ComponentsById<ConsumerGroupRef>) -> T,
- ) -> impl Future<Output = T> {
- let id = self.get_index(identifier);
- self.with_components_by_id_async(id, async |components|
f(components).await)
- }
}
impl Default for ConsumerGroups {
diff --git a/core/server/src/slab/helpers.rs b/core/server/src/slab/helpers.rs
index 31a5a8b7..d2fe7487 100644
--- a/core/server/src/slab/helpers.rs
+++ b/core/server/src/slab/helpers.rs
@@ -1,13 +1,10 @@
use crate::{
slab::{
- consumer_groups::ConsumerGroups,
- partitions::{self, ContainerId, Partitions},
- topics::Topics,
+ consumer_groups::ConsumerGroups, partitions::Partitions,
topics::Topics,
traits_ext::ComponentsById,
},
streaming::{
- partitions::log::Log,
- streams::stream2::{StreamRef, StreamRefMut},
+ streams::stream2::StreamRef,
topics::topic2::{TopicRef, TopicRefMut},
},
};
@@ -26,7 +23,6 @@ where
{
async |(root, ..)| f(root.topics()).await
}
-
pub fn topics_mut<O, F>(f: F) -> impl FnOnce(ComponentsById<StreamRef>) -> O
where
F: for<'a> FnOnce(&'a Topics) -> O,
@@ -47,7 +43,6 @@ where
{
async |(root, ..)| f(root.partitions()).await
}
-
pub fn partitions_mut<O, F>(f: F) -> impl FnOnce(ComponentsById<TopicRefMut>)
-> O
where
F: for<'a> FnOnce(&'a mut Partitions) -> O,
@@ -68,10 +63,3 @@ where
{
|(mut root, ..)| f(root.consumer_groups_mut())
}
-
-pub fn consumer_groups_async<O, F>(f: F) -> impl
AsyncFnOnce(ComponentsById<TopicRef>) -> O
-where
- F: for<'a> AsyncFnOnce(&'a ConsumerGroups) -> O,
-{
- async |(root, ..)| f(root.consumer_groups()).await
-}
diff --git a/core/server/src/slab/partitions.rs
b/core/server/src/slab/partitions.rs
index 678846c3..b3d3fda9 100644
--- a/core/server/src/slab/partitions.rs
+++ b/core/server/src/slab/partitions.rs
@@ -1,7 +1,7 @@
use crate::{
slab::traits_ext::{
Borrow, ComponentsById, Delete, EntityComponentSystem,
EntityComponentSystemMut, Insert,
- IntoComponents, IntoComponentsById,
+ IntoComponents,
},
streaming::{
deduplication::message_deduplicator::MessageDeduplicator,
@@ -17,10 +17,7 @@ use crate::{
},
};
use slab::Slab;
-use std::{
- future::Future,
- sync::{Arc, atomic::AtomicU64},
-};
+use std::sync::{Arc, atomic::AtomicU64};
// TODO: This could be upper limit of partitions per topic, use that value to
validate instead of whathever this thing is in `common` crate.
pub const PARTITIONS_CAPACITY: usize = 16384;
@@ -30,7 +27,7 @@ pub type ContainerId = usize;
pub struct Partitions {
root: Slab<partition2::PartitionRoot>,
stats: Slab<Arc<PartitionStats>>,
- message_deduplicator: Slab<Option<MessageDeduplicator>>,
+ message_deduplicator: Slab<Option<Arc<MessageDeduplicator>>>,
offset: Slab<Arc<AtomicU64>>,
consumer_offset: Slab<Arc<ConsumerOffsets>>,
@@ -168,12 +165,6 @@ impl EntityComponentSystem<Borrow> for Partitions {
f(self.into())
}
- fn with_components_async<O, F>(&self, f: F) -> impl Future<Output = O>
- where
- F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O,
- {
- f(self.into())
- }
}
impl EntityComponentSystemMut for Partitions {
@@ -230,11 +221,4 @@ impl Partitions {
self.with_components_by_id_mut(id, |components| f(components))
}
- pub fn with_partition_by_id_async<T>(
- &self,
- id: ContainerId,
- f: impl AsyncFnOnce(ComponentsById<PartitionRef>) -> T,
- ) -> impl Future<Output = T> {
- self.with_components_by_id_async(id, async move |components|
f(components).await)
- }
}
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 7fadce50..decc6b63 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -1,5 +1,8 @@
use crate::shard::task_registry::TaskRegistry;
+use crate::shard_trace;
use crate::streaming::partitions as streaming_partitions;
+use crate::streaming::segments::IggyIndexesMut;
+use crate::streaming::segments::storage::Storage;
use crate::{
binary::handlers::messages::poll_messages_handler::IggyPollMetadata,
configs::{cache_indexes::CacheIndexesConfig, system::SystemConfig},
@@ -39,6 +42,7 @@ use crate::{
},
};
use ahash::AHashMap;
+use error_set::ErrContext;
use iggy_common::{Identifier, IggyError, IggyTimestamp, PollingKind};
use slab::Slab;
use std::{
@@ -141,13 +145,6 @@ impl EntityComponentSystem<InteriorMutability> for Streams
{
{
f(self.into())
}
-
- fn with_components_async<O, F>(&self, f: F) -> impl Future<Output = O>
- where
- F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O,
- {
- f(self.into())
- }
}
impl EntityComponentSystemMutCell for Streams {
@@ -192,17 +189,21 @@ impl MainOps for Streams {
self.with_partition_by_id(stream_id, topic_id, partition_id, |(..,
log)| {
log.journal().inner().size + log.active_segment().size
});
- self.with_partition_by_id_async(
+ let (segment_start_offset, message_deduplicator) =
self.with_partition_by_id(
stream_id,
topic_id,
partition_id,
- streaming_partitions::helpers::deduplicate_messages(
+
streaming_partitions::helpers::get_segment_start_offset_and_deduplicator(),
+ );
+
+ input
+ .prepare_for_persistence(
+ segment_start_offset,
current_offset,
current_position,
- &mut input,
- ),
- )
- .await;
+ message_deduplicator.as_ref(),
+ )
+ .await;
let (journal_messages_count, journal_size) =
self.with_partition_by_id_mut(
stream_id,
@@ -452,15 +453,6 @@ impl Streams {
self.with_components_by_id(id, |stream| f(stream))
}
- pub fn with_stream_by_id_async<T>(
- &self,
- id: &Identifier,
- f: impl AsyncFnOnce(ComponentsById<StreamRef>) -> T,
- ) -> impl Future<Output = T> {
- let id = self.get_index(id);
- self.with_components_by_id_async(id, async |stream| f(stream).await)
- }
-
pub fn with_stream_by_id_mut<T>(
&self,
id: &Identifier,
@@ -474,14 +466,6 @@ impl Streams {
self.with_stream_by_id(stream_id, helpers::topics(f))
}
- pub fn with_topics_async<T>(
- &self,
- stream_id: &Identifier,
- f: impl AsyncFnOnce(&Topics) -> T,
- ) -> impl Future<Output = T> {
- self.with_stream_by_id_async(stream_id, helpers::topics_async(f))
- }
-
pub fn with_topics_mut<T>(&self, stream_id: &Identifier, f: impl
FnOnce(&Topics) -> T) -> T {
self.with_stream_by_id(stream_id, helpers::topics_mut(f))
}
@@ -497,17 +481,6 @@ impl Streams {
})
}
- pub fn with_topic_by_id_async<T>(
- &self,
- stream_id: &Identifier,
- topic_id: &Identifier,
- f: impl AsyncFnOnce(ComponentsById<TopicRef>) -> T,
- ) -> impl Future<Output = T> {
- self.with_topics_async(stream_id, async |container| {
- container.with_topic_by_id_async(topic_id, f).await
- })
- }
-
pub fn with_topic_by_id_mut<T>(
&self,
stream_id: &Identifier,
@@ -530,17 +503,6 @@ impl Streams {
})
}
- pub fn with_consumer_groups_async<T>(
- &self,
- stream_id: &Identifier,
- topic_id: &Identifier,
- f: impl AsyncFnOnce(&ConsumerGroups) -> T,
- ) -> impl Future<Output = T> {
- self.with_topics_async(stream_id, async |container| {
- container.with_consumer_groups_async(topic_id, f).await
- })
- }
-
pub fn with_consumer_group_by_id<T>(
&self,
stream_id: &Identifier,
@@ -565,18 +527,6 @@ impl Streams {
})
}
- pub fn with_consumer_group_by_id_async<T>(
- &self,
- stream_id: &Identifier,
- topic_id: &Identifier,
- group_id: &Identifier,
- f: impl AsyncFnOnce(ComponentsById<ConsumerGroupRef>) -> T,
- ) -> impl Future<Output = T> {
- self.with_consumer_groups_async(stream_id, topic_id, async move
|container| {
- container.with_consumer_group_by_id_async(group_id, f).await
- })
- }
-
pub fn with_consumer_groups_mut<T>(
&self,
stream_id: &Identifier,
@@ -599,17 +549,6 @@ impl Streams {
})
}
- pub fn with_partitions_async<T>(
- &self,
- stream_id: &Identifier,
- topic_id: &Identifier,
- f: impl AsyncFnOnce(&Partitions) -> T,
- ) -> impl Future<Output = T> {
- self.with_topics_async(stream_id, async |container| {
- container.with_partitions_async(topic_id, f).await
- })
- }
-
pub fn with_partitions_mut<T>(
&self,
stream_id: &Identifier,
@@ -633,18 +572,6 @@ impl Streams {
})
}
- pub fn with_partition_by_id_async<T>(
- &self,
- stream_id: &Identifier,
- topic_id: &Identifier,
- id: partitions::ContainerId,
- f: impl AsyncFnOnce(ComponentsById<PartitionRef>) -> T,
- ) -> impl Future<Output = T> {
- self.with_partitions_async(stream_id, topic_id, async move |container|
{
- container.with_partition_by_id_async(id, f).await
- })
- }
-
pub fn with_partition_by_id_mut<T>(
&self,
stream_id: &Identifier,
@@ -677,13 +604,270 @@ impl Streams {
helpers::get_segment_range_by_offset(offset),
);
- self.with_partition_by_id_async(
+ let mut remaining_count = count;
+ let mut batches = IggyMessagesBatchSet::empty();
+ let mut current_offset = offset;
+
+ for idx in range {
+ let (segment_start_offset, segment_end_offset) =
self.with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+ |(_, _, _, _, _, _, log)| {
+ let segment = &log.segments()[idx];
+ (segment.start_offset, segment.end_offset)
+ },
+ );
+
+ let start_offset = if current_offset < segment_start_offset {
+ segment_start_offset
+ } else {
+ current_offset
+ };
+
+ let mut end_offset = start_offset + (remaining_count - 1) as u64;
+ if end_offset > segment_end_offset {
+ end_offset = segment_end_offset;
+ }
+
+ let count: u32 = ((end_offset - start_offset + 1) as
u32).min(remaining_count);
+
+ let messages = self
+ .get_messages_by_offset_base(
+ stream_id,
+ topic_id,
+ partition_id,
+ idx,
+ start_offset,
+ end_offset,
+ count,
+ segment_start_offset,
+ )
+ .await?;
+
+ let messages_count = messages.count();
+ if messages_count == 0 {
+ current_offset = segment_end_offset + 1;
+ continue;
+ }
+
+ remaining_count = remaining_count.saturating_sub(messages_count);
+
+ if let Some(last_offset) = messages.last_offset() {
+ current_offset = last_offset + 1;
+ } else if messages_count > 0 {
+ current_offset += messages_count as u64;
+ }
+
+ batches.add_batch_set(messages);
+
+ if remaining_count == 0 {
+ break;
+ }
+ }
+
+ Ok(batches)
+ }
+
+ async fn get_messages_by_offset_base(
+ &self,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ partition_id: partitions::ContainerId,
+ idx: usize,
+ offset: u64,
+ end_offset: u64,
+ count: u32,
+ segment_start_offset: u64,
+ ) -> Result<IggyMessagesBatchSet, IggyError> {
+ if count == 0 {
+ return Ok(IggyMessagesBatchSet::default());
+ }
+
+ let (is_journal_empty, journal_first_offset, journal_last_offset) =
self
+ .with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+ |(_, _, _, _, _, _, log)| {
+ let journal = log.journal();
+ (
+ journal.is_empty(),
+ journal.inner().base_offset,
+ journal.inner().current_offset,
+ )
+ },
+ );
+
+ // Case 0: Accumulator is empty, so all messages have to be on disk
+ if is_journal_empty {
+ return self
+ .load_messages_from_disk_by_offset(
+ stream_id,
+ topic_id,
+ partition_id,
+ idx,
+ offset,
+ count,
+ segment_start_offset,
+ )
+ .await;
+ }
+
+ // Case 1: All messages are in accumulator buffer
+ if offset >= journal_first_offset && end_offset <= journal_last_offset
{
+ let batches = self.with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+ |(_, _, _, _, _, _, log)| {
+ log.journal()
+ .get(|batches| batches.get_by_offset(offset, count))
+ },
+ );
+ return Ok(batches);
+ }
+
+ // Case 2: All messages are on disk
+ if end_offset < journal_first_offset {
+ return self
+ .load_messages_from_disk_by_offset(
+ stream_id,
+ topic_id,
+ partition_id,
+ idx,
+ offset,
+ count,
+ segment_start_offset,
+ )
+ .await;
+ }
+
+ // Case 3: Messages span disk and accumulator buffer boundary
+ // Calculate how many messages we need from disk
+ let disk_count = if offset < journal_first_offset {
+ ((journal_first_offset - offset) as u32).min(count)
+ } else {
+ 0
+ };
+ let mut combined_batch_set = IggyMessagesBatchSet::empty();
+
+ // Load messages from disk if needed
+ if disk_count > 0 {
+ let disk_messages = self
+ .load_messages_from_disk_by_offset(
+ stream_id,
+ topic_id,
+ partition_id,
+ idx,
+ offset,
+ disk_count,
+ segment_start_offset,
+ )
+ .await
+ .with_error_context(|error| {
+ format!("Failed to load messages from disk, start offset:
{offset}, count: {disk_count}, error: {error}")
+ })?;
+
+ if !disk_messages.is_empty() {
+ combined_batch_set.add_batch_set(disk_messages);
+ }
+ }
+
+ // Calculate how many more messages we need from the accumulator
+ let remaining_count = count - combined_batch_set.count();
+
+ if remaining_count > 0 {
+ let accumulator_start_offset = std::cmp::max(offset,
journal_first_offset);
+ let journal_messages = self.with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+ |(_, _, _, _, _, _, log)| {
+ log.journal().get(|batches| {
+ batches.get_by_offset(accumulator_start_offset,
remaining_count)
+ })
+ },
+ );
+
+ if !journal_messages.is_empty() {
+ combined_batch_set.add_batch_set(journal_messages);
+ }
+ }
+
+ Ok(combined_batch_set)
+ }
+
+ async fn load_messages_from_disk_by_offset(
+ &self,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ partition_id: partitions::ContainerId,
+ idx: usize,
+ start_offset: u64,
+ count: u32,
+ segment_start_offset: u64,
+ ) -> Result<IggyMessagesBatchSet, IggyError> {
+ let relative_start_offset = (start_offset - segment_start_offset) as
u32;
+
+ let (index_reader, messages_reader, indexes) =
self.with_partition_by_id(
stream_id,
topic_id,
partition_id,
- helpers::get_messages_by_offset_range(offset, count, range),
- )
- .await
+ |(_, _, _, _, _, _, log)| {
+ let index_reader = log.storages()[idx]
+ .index_reader
+ .as_ref()
+ .expect("Index reader not initialized")
+ .clone();
+ let message_reader = log.storages()[idx]
+ .messages_reader
+ .as_ref()
+ .expect("Messages reader not initialized")
+ .clone();
+ let indexes = log.indexes()[idx].as_ref().map(|indexes| {
+ indexes
+ .slice_by_offset(relative_start_offset, count)
+ .unwrap_or_default()
+ });
+ (index_reader, message_reader, indexes)
+ },
+ );
+
+ let indexes_to_read = if let Some(indexes) = indexes {
+ if !indexes.is_empty() {
+ Some(indexes)
+ } else {
+ index_reader
+ .as_ref()
+ .load_from_disk_by_offset(relative_start_offset, count)
+ .await?
+ }
+ } else {
+ index_reader
+ .as_ref()
+ .load_from_disk_by_offset(relative_start_offset, count)
+ .await?
+ };
+
+ if indexes_to_read.is_none() {
+ return Ok(IggyMessagesBatchSet::empty());
+ }
+
+ let indexes_to_read = indexes_to_read.unwrap();
+ let batch = messages_reader
+ .as_ref()
+ .load_messages_from_disk(indexes_to_read)
+ .await
+ .with_error_context(|error| format!("Failed to load messages from
disk: {error}"))?;
+
+ batch
+ .validate_checksums_and_offsets(start_offset)
+ .with_error_context(|error| {
+ format!("Failed to validate messages read from disk! error:
{error}")
+ })?;
+
+ Ok(IggyMessagesBatchSet::from(batch))
}
pub async fn get_messages_by_timestamp(
@@ -704,13 +888,211 @@ impl Streams {
return Ok(IggyMessagesBatchSet::default());
};
- self.with_partition_by_id_async(
+ let mut remaining_count = count;
+ let mut batches = IggyMessagesBatchSet::empty();
+
+ for idx in range {
+ let segment_end_timestamp = self.with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+ |(_, _, _, _, _, _, log)| {
+ let segment = &log.segments()[idx];
+ segment.end_timestamp
+ },
+ );
+
+ if segment_end_timestamp < timestamp {
+ continue;
+ }
+
+ let messages = self
+ .get_messages_by_timestamp_base(
+ stream_id,
+ topic_id,
+ partition_id,
+ idx,
+ timestamp,
+ remaining_count,
+ )
+ .await?;
+
+ let messages_count = messages.count();
+ if messages_count == 0 {
+ continue;
+ }
+
+ remaining_count = remaining_count.saturating_sub(messages_count);
+ batches.add_batch_set(messages);
+
+ if remaining_count == 0 {
+ break;
+ }
+ }
+
+ Ok(batches)
+ }
+
+ async fn get_messages_by_timestamp_base(
+ &self,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ partition_id: partitions::ContainerId,
+ idx: usize,
+ timestamp: u64,
+ count: u32,
+ ) -> Result<IggyMessagesBatchSet, IggyError> {
+ if count == 0 {
+ return Ok(IggyMessagesBatchSet::default());
+ }
+
+ let (is_journal_empty, journal_first_timestamp,
journal_last_timestamp) = self
+ .with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+ |(_, _, _, _, _, _, log)| {
+ let journal = log.journal();
+ (
+ journal.is_empty(),
+ journal.inner().first_timestamp,
+ journal.inner().end_timestamp,
+ )
+ },
+ );
+
+ // Case 0: Accumulator is empty, so all messages have to be on disk
+ if is_journal_empty {
+ return self
+ .load_messages_from_disk_by_timestamp(
+ stream_id,
+ topic_id,
+ partition_id,
+ idx,
+ timestamp,
+ count,
+ )
+ .await;
+ }
+
+ // Case 1: Timestamp is after the journal's last timestamp, so there is
nothing to return
+ if timestamp > journal_last_timestamp {
+ return Ok(IggyMessagesBatchSet::empty());
+ }
+
+ // Case 1b: Timestamp is within journal range
+ if timestamp >= journal_first_timestamp {
+ let batches = self.with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+ |(_, _, _, _, _, _, log)| {
+ log.journal()
+ .get(|batches| batches.get_by_timestamp(timestamp,
count))
+ },
+ );
+ return Ok(batches);
+ }
+
+ // Case 2: All messages are on disk (timestamp is before journal's
first timestamp)
+ let disk_messages = self
+ .load_messages_from_disk_by_timestamp(
+ stream_id,
+ topic_id,
+ partition_id,
+ idx,
+ timestamp,
+ count,
+ )
+ .await?;
+
+ if disk_messages.count() >= count {
+ return Ok(disk_messages);
+ }
+
+ // Case 3: Messages span disk and accumulator buffer boundary
+ let remaining_count = count - disk_messages.count();
+ let journal_messages = self.with_partition_by_id(
stream_id,
topic_id,
partition_id,
- helpers::get_messages_by_timestamp_range(timestamp, count, range),
- )
- .await
+ |(_, _, _, _, _, _, log)| {
+ log.journal()
+ .get(|batches| batches.get_by_timestamp(timestamp,
remaining_count))
+ },
+ );
+
+ let mut combined_batch_set = disk_messages;
+ if !journal_messages.is_empty() {
+ combined_batch_set.add_batch_set(journal_messages);
+ }
+ Ok(combined_batch_set)
+ }
+
+ async fn load_messages_from_disk_by_timestamp(
+ &self,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ partition_id: partitions::ContainerId,
+ idx: usize,
+ timestamp: u64,
+ count: u32,
+ ) -> Result<IggyMessagesBatchSet, IggyError> {
+ let (index_reader, messages_reader, indexes) =
self.with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+ |(_, _, _, _, _, _, log)| {
+ let index_reader = log.storages()[idx]
+ .index_reader
+ .as_ref()
+ .expect("Index reader not initialized")
+ .clone();
+ let messages_reader = log.storages()[idx]
+ .messages_reader
+ .as_ref()
+ .expect("Messages reader not initialized")
+ .clone();
+ let indexes = log.indexes()[idx].as_ref().map(|indexes| {
+ indexes
+ .slice_by_timestamp(timestamp, count)
+ .unwrap_or_default()
+ });
+ (index_reader, messages_reader, indexes)
+ },
+ );
+
+ let indexes_to_read = if let Some(indexes) = indexes {
+ if !indexes.is_empty() {
+ Some(indexes)
+ } else {
+ index_reader
+ .as_ref()
+ .load_from_disk_by_timestamp(timestamp, count)
+ .await?
+ }
+ } else {
+ index_reader
+ .as_ref()
+ .load_from_disk_by_timestamp(timestamp, count)
+ .await?
+ };
+
+ if indexes_to_read.is_none() {
+ return Ok(IggyMessagesBatchSet::empty());
+ }
+
+ let indexes_to_read = indexes_to_read.unwrap();
+
+ let batch = messages_reader
+ .as_ref()
+ .load_messages_from_disk(indexes_to_read)
+ .await
+ .with_error_context(|error| {
+ format!("Failed to load messages from disk by timestamp:
{error}")
+ })?;
+
+ Ok(IggyMessagesBatchSet::from(batch))
}
pub async fn handle_full_segment(
@@ -844,21 +1226,70 @@ impl Streams {
streaming_partitions::helpers::commit_journal(),
);
- let (saved, batch_count) = self
- .with_partition_by_id_async(
- stream_id,
- topic_id,
- partition_id,
- streaming_partitions::helpers::persist_batch(
- shard_id,
- stream_id,
- topic_id,
- partition_id,
- batches,
- reason.to_string(),
- ),
- )
- .await?;
+ shard_trace!(
+ shard_id,
+ "Persisting messages on disk for stream ID: {}, topic ID: {},
partition ID: {} because {}...",
+ stream_id,
+ topic_id,
+ partition_id,
+ reason
+ );
+
+ let batch_count = batches.count();
+ let batch_size = batches.size();
+
+ // Extract storage before async operations
+ let (messages_writer, index_writer) =
+ self.with_partition_by_id(stream_id, topic_id, partition_id, |(..,
log)| {
+ (
+ log.active_storage()
+ .messages_writer
+ .as_ref()
+ .expect("Messages writer not initialized")
+ .clone(),
+ log.active_storage()
+ .index_writer
+ .as_ref()
+ .expect("Index writer not initialized")
+ .clone(),
+ )
+ });
+
+ let saved = messages_writer
+ .as_ref()
+ .save_batch_set(batches)
+ .await
+ .with_error_context(|error| {
+ format!(
+ "Failed to save batch of {batch_count} messages \
+ ({batch_size} bytes) to stream ID: {stream_id}, topic ID:
{topic_id}, partition ID: {partition_id}. {error}",
+ )
+ })?;
+
+ // Extract unsaved indexes before async operation
+ let unsaved_indexes_slice =
+ self.with_partition_by_id(stream_id, topic_id, partition_id, |(..,
log)| {
+ log.active_indexes().unwrap().unsaved_slice()
+ });
+
+ let indexes_len = unsaved_indexes_slice.len();
+ index_writer
+ .as_ref()
+ .save_indexes(unsaved_indexes_slice)
+ .await
+ .with_error_context(|error| {
+ format!("Failed to save index of {indexes_len} indexes to
stream ID: {stream_id}, topic ID: {topic_id} {partition_id}. {error}",)
+ })?;
+
+ shard_trace!(
+ shard_id,
+ "Persisted {} messages on disk for stream ID: {}, topic ID: {},
for partition with ID: {}, total bytes written: {}.",
+ batch_count,
+ stream_id,
+ topic_id,
+ partition_id,
+ saved
+ );
self.with_partition_by_id_mut(
stream_id,
@@ -880,57 +1311,35 @@ impl Streams {
topic_id: &Identifier,
partition_id: usize,
) -> Result<(), IggyError> {
- let has_storage =
- self.with_partition_by_id(stream_id, topic_id, partition_id, |(..,
log)| {
- let storage = log.active_storage();
- storage.messages_writer.is_some() &&
storage.index_writer.is_some()
- });
+ let storage = self.with_partition_by_id(stream_id, topic_id,
partition_id, |(.., log)| {
+ log.active_storage().clone()
+ });
- if !has_storage {
+ if storage.messages_writer.is_none() || storage.index_writer.is_none()
{
return Ok(());
}
- self.with_partition_by_id_async(
- stream_id,
- topic_id,
- partition_id,
- async move |(.., log)| {
- let storage = log.active_storage();
-
- if let Some(ref messages_writer) = storage.messages_writer {
- if let Err(e) = messages_writer.fsync().await {
- tracing::error!(
- "Failed to fsync messages writer for partition {}:
{}",
- partition_id,
- e
- );
- return Err(e);
- }
- }
- Ok(())
- },
- )
- .await?;
+ if let Some(ref messages_writer) = storage.messages_writer {
+ if let Err(e) = messages_writer.fsync().await {
+ tracing::error!(
+ "Failed to fsync messages writer for partition {}: {}",
+ partition_id,
+ e
+ );
+ return Err(e);
+ }
+ }
- self.with_partition_by_id_async(
- stream_id,
- topic_id,
- partition_id,
- async move |(.., log)| {
- if let Some(ref index_writer) =
log.active_storage().index_writer {
- if let Err(e) = index_writer.fsync().await {
- tracing::error!(
- "Failed to fsync index writer for partition {}:
{}",
- partition_id,
- e
- );
- return Err(e);
- }
- }
- Ok(())
- },
- )
- .await?;
+ if let Some(ref index_writer) = storage.index_writer {
+ if let Err(e) = index_writer.fsync().await {
+ tracing::error!(
+ "Failed to fsync index writer for partition {}: {}",
+ partition_id,
+ e
+ );
+ return Err(e);
+ }
+ }
Ok(())
}
diff --git a/core/server/src/slab/topics.rs b/core/server/src/slab/topics.rs
index 7508d675..100b4e1f 100644
--- a/core/server/src/slab/topics.rs
+++ b/core/server/src/slab/topics.rs
@@ -11,14 +11,10 @@ use crate::{
partitions::Partitions,
traits_ext::{
ComponentsById, DeleteCell, EntityComponentSystem,
EntityComponentSystemMutCell,
- Insert, InsertCell, InteriorMutability, IntoComponents,
+ InsertCell, InteriorMutability, IntoComponents,
},
},
streaming::{
- partitions::{
- journal::MemoryMessageJournal,
- log::{Log, SegmentedLog},
- },
stats::stats::TopicStats,
topics::{
consumer_group2::{ConsumerGroupRef, ConsumerGroupRefMut},
@@ -133,13 +129,6 @@ impl EntityComponentSystem<InteriorMutability> for Topics {
{
f(self.into())
}
-
- fn with_components_async<O, F>(&self, f: F) -> impl Future<Output = O>
- where
- F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O,
- {
- f(self.into())
- }
}
impl EntityComponentSystemMutCell for Topics {
@@ -206,15 +195,6 @@ impl Topics {
self.with_components_by_id(id, |components| f(components))
}
- pub fn with_topic_by_id_async<T>(
- &self,
- topic_id: &Identifier,
- f: impl AsyncFnOnce(ComponentsById<TopicRef>) -> T,
- ) -> impl Future<Output = T> {
- let id = self.get_index(topic_id);
- self.with_components_by_id_async(id, async |components|
f(components).await)
- }
-
pub fn with_topic_by_id_mut<T>(
&self,
topic_id: &Identifier,
@@ -232,14 +212,6 @@ impl Topics {
self.with_topic_by_id(topic_id, helpers::consumer_groups(f))
}
- pub fn with_consumer_groups_async<T>(
- &self,
- topic_id: &Identifier,
- f: impl AsyncFnOnce(&ConsumerGroups) -> T,
- ) -> impl Future<Output = T> {
- self.with_topic_by_id_async(topic_id,
helpers::consumer_groups_async(f))
- }
-
pub fn with_consumer_groups_mut<T>(
&self,
topic_id: &Identifier,
@@ -259,17 +231,6 @@ impl Topics {
})
}
- pub fn with_consumer_group_by_id_async<T>(
- &self,
- topic_id: &Identifier,
- group_id: &Identifier,
- f: impl AsyncFnOnce(ComponentsById<ConsumerGroupRef>) -> T,
- ) -> impl Future<Output = T> {
- self.with_consumer_groups_async(topic_id, async |container| {
- container.with_consumer_group_by_id_async(group_id, f).await
- })
- }
-
pub fn with_consumer_group_by_id_mut<T>(
&self,
topic_id: &Identifier,
@@ -285,14 +246,6 @@ impl Topics {
self.with_topic_by_id(topic_id, helpers::partitions(f))
}
- pub fn with_partitions_async<T>(
- &self,
- topic_id: &Identifier,
- f: impl AsyncFnOnce(&Partitions) -> T,
- ) -> impl Future<Output = T> {
- self.with_topic_by_id_async(topic_id, helpers::partitions_async(f))
- }
-
pub fn with_partitions_mut<T>(
&self,
topic_id: &Identifier,
diff --git a/core/server/src/slab/traits_ext.rs
b/core/server/src/slab/traits_ext.rs
index 949971fa..d69dfd85 100644
--- a/core/server/src/slab/traits_ext.rs
+++ b/core/server/src/slab/traits_ext.rs
@@ -188,23 +188,12 @@ where
where
F: for<'a> FnOnce(Self::EntityComponents<'a>) -> O;
- fn with_components_async<O, F>(&self, f: F) -> impl Future<Output = O>
- where
- F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O;
-
fn with_components_by_id<O, F>(&self, id: Self::Idx, f: F) -> O
where
F: for<'a> FnOnce(ComponentsById<'a, Self::EntityComponents<'a>>) -> O,
{
self.with_components(|components|
f(components.into_components_by_id(id)))
}
-
- fn with_components_by_id_async<O, F>(&self, id: Self::Idx, f: F) -> impl
Future<Output = O>
- where
- F: for<'a> AsyncFnOnce(ComponentsById<'a, Self::EntityComponents<'a>>)
-> O,
- {
- self.with_components_async(async |components|
f(components.into_components_by_id(id)).await)
- }
}
pub trait EntityComponentSystemMut: EntityComponentSystem<Borrow> {
diff --git a/core/server/src/streaming/partitions/helpers.rs
b/core/server/src/streaming/partitions/helpers.rs
index 1062a6dc..312152c6 100644
--- a/core/server/src/streaming/partitions/helpers.rs
+++ b/core/server/src/streaming/partitions/helpers.rs
@@ -2,7 +2,7 @@ use error_set::ErrContext;
use iggy_common::{ConsumerOffsetInfo, Identifier, IggyByteSize, IggyError};
use std::{
ops::{AsyncFnOnce, Index},
- sync::atomic::Ordering,
+ sync::{Arc, atomic::Ordering},
};
use sysinfo::Component;
@@ -273,212 +273,6 @@ pub fn create_message_deduplicator(config: &SystemConfig)
-> Option<MessageDedup
Some(MessageDeduplicator::new(max_entries, expiry))
}
-pub async fn get_messages_by_offset(
- storage: &Storage,
- journal: &MemoryMessageJournal,
- index: &Option<IggyIndexesMut>,
- offset: u64,
- end_offset: u64,
- count: u32,
- segment_start_offset: u64,
-) -> Result<IggyMessagesBatchSet, IggyError> {
- if count == 0 {
- return Ok(IggyMessagesBatchSet::default());
- }
-
- // Case 0: Accumulator is empty, so all messages have to be on disk
- if journal.is_empty() {
- return load_messages_from_disk_by_offset(
- storage,
- index,
- offset,
- count,
- segment_start_offset,
- )
- .await;
- }
-
- let journal_first_offset = journal.inner().base_offset;
- let journal_last_offset = journal.inner().current_offset;
-
- // Case 1: All messages are in accumulator buffer
- if offset >= journal_first_offset && end_offset <= journal_last_offset {
- return Ok(journal.get(|batches| batches.get_by_offset(offset, count)));
- }
-
- // Case 2: All messages are on disk
- if end_offset < journal_first_offset {
- return load_messages_from_disk_by_offset(
- storage,
- index,
- offset,
- count,
- segment_start_offset,
- )
- .await;
- }
-
- // Case 3: Messages span disk and accumulator buffer boundary
- // Calculate how many messages we need from disk
- let disk_count = if offset < journal_first_offset {
- ((journal_first_offset - offset) as u32).min(count)
- } else {
- 0
- };
- let mut combined_batch_set = IggyMessagesBatchSet::empty();
-
- // Load messages from disk if needed
- if disk_count > 0 {
- let disk_messages = load_messages_from_disk_by_offset(storage, index,
offset, disk_count, segment_start_offset)
- .await
- .with_error_context(|error| {
- format!("Failed to load messages from disk, start offset:
{offset}, count: {disk_count}, error: {error}")
- })?;
-
- if !disk_messages.is_empty() {
- combined_batch_set.add_batch_set(disk_messages);
- }
- }
-
- // Calculate how many more messages we need from the accumulator
- let remaining_count = count - combined_batch_set.count();
-
- if remaining_count > 0 {
- let accumulator_start_offset = std::cmp::max(offset,
journal_first_offset);
- let accumulator_messages =
- journal.get(|batches|
batches.get_by_offset(accumulator_start_offset, remaining_count));
- if !accumulator_messages.is_empty() {
- combined_batch_set.add_batch_set(accumulator_messages);
- }
- }
-
- Ok(combined_batch_set)
-}
-
-async fn load_messages_from_disk_by_offset(
- storage: &Storage,
- index: &Option<IggyIndexesMut>,
- start_offset: u64,
- count: u32,
- segment_start_offset: u64,
-) -> Result<IggyMessagesBatchSet, IggyError> {
- // Convert start_offset to relative offset within the segment
- let relative_start_offset = (start_offset - segment_start_offset) as u32;
-
- // Load indexes first
- let indexes_to_read = if let Some(indexes) = index {
- if !indexes.is_empty() {
- indexes.slice_by_offset(relative_start_offset, count)
- } else {
- storage
- .index_reader
- .as_ref()
- .expect("Index reader not initialized")
- .load_from_disk_by_offset(relative_start_offset, count)
- .await?
- }
- } else {
- storage
- .index_reader
- .as_ref()
- .expect("Index reader not initialized")
- .load_from_disk_by_offset(relative_start_offset, count)
- .await?
- };
-
- if indexes_to_read.is_none() {
- return Ok(IggyMessagesBatchSet::empty());
- }
-
- let indexes_to_read = indexes_to_read.unwrap();
- let batch = storage
- .messages_reader
- .as_ref()
- .expect("Messages reader not initialized")
- .load_messages_from_disk(indexes_to_read)
- .await
- .with_error_context(|error| format!("Failed to load messages from
disk: {error}"))?;
-
- batch
- .validate_checksums_and_offsets(start_offset)
- .with_error_context(|error| {
- format!("Failed to validate messages read from disk! error:
{error}")
- })?;
-
- Ok(IggyMessagesBatchSet::from(batch))
-}
-
-pub fn get_messages_by_offset_range(
- offset: u64,
- count: u32,
- range: std::ops::Range<usize>,
-) -> impl AsyncFnOnce(ComponentsById<PartitionRef>) ->
Result<IggyMessagesBatchSet, IggyError> {
- async move |(.., log)| -> Result<IggyMessagesBatchSet, IggyError> {
- let segments = log.segments().iter();
- let storages = log.storages().iter();
- let journal = log.journal();
- let indexes = log.indexes().iter();
-
- let mut remaining_count = count;
- let mut batches = IggyMessagesBatchSet::empty();
- let mut current_offset = offset;
-
- for (segment, storage, index) in segments
- .zip(storages)
- .zip(indexes)
- .map(|((a, b), c)| (a, b, c))
- .skip(range.start)
- .take(range.end - range.start)
- {
- let start_offset = if current_offset < segment.start_offset {
- segment.start_offset
- } else {
- current_offset
- };
-
- let mut end_offset = start_offset + (remaining_count - 1) as u64;
- if end_offset > segment.end_offset {
- end_offset = segment.end_offset;
- }
-
- // Calculate the actual count to request from this segment
- let count: u32 = ((end_offset - start_offset + 1) as
u32).min(remaining_count);
-
- let messages = get_messages_by_offset(
- storage,
- journal,
- index,
- start_offset,
- end_offset,
- count,
- segment.start_offset,
- )
- .await?;
-
- let messages_count = messages.count();
- if messages_count == 0 {
- current_offset = segment.end_offset + 1;
- continue;
- }
-
- remaining_count = remaining_count.saturating_sub(messages_count);
-
- if let Some(last_offset) = messages.last_offset() {
- current_offset = last_offset + 1;
- } else if messages_count > 0 {
- current_offset += messages_count as u64;
- }
-
- batches.add_batch_set(messages);
-
- if remaining_count == 0 {
- break;
- }
- }
- Ok(batches)
- }
-}
-
pub fn get_segment_range_by_offset(
offset: u64,
) -> impl FnOnce(ComponentsById<PartitionRef>) -> std::ops::Range<usize> {
@@ -556,105 +350,6 @@ pub async fn load_messages_from_disk_by_timestamp(
Ok(IggyMessagesBatchSet::from(batch))
}
-pub async fn get_messages_by_timestamp(
- storage: &Storage,
- journal: &MemoryMessageJournal,
- index: &Option<IggyIndexesMut>,
- timestamp: u64,
- count: u32,
-) -> Result<IggyMessagesBatchSet, IggyError> {
- if count == 0 {
- return Ok(IggyMessagesBatchSet::default());
- }
-
- // Case 0: Accumulator is empty, so all messages have to be on disk
- if journal.is_empty() {
- return load_messages_from_disk_by_timestamp(storage, index, timestamp,
count).await;
- }
-
- let journal_first_timestamp = journal.inner().first_timestamp;
- let journal_last_timestamp = journal.inner().end_timestamp;
-
- // Case 1: All messages are in accumulator buffer
- if timestamp > journal_last_timestamp {
- return Ok(IggyMessagesBatchSet::empty());
- }
-
- if timestamp >= journal_first_timestamp {
- return Ok(journal.get(|batches| batches.get_by_timestamp(timestamp,
count)));
- }
-
- // Case 2: All messages are on disk (timestamp is before journal's first
timestamp)
- let disk_messages =
- load_messages_from_disk_by_timestamp(storage, index, timestamp,
count).await?;
-
- if disk_messages.count() >= count {
- return Ok(disk_messages);
- }
-
- // Case 3: Messages span disk and accumulator buffer boundary
- let remaining_count = count - disk_messages.count();
- let journal_messages =
- journal.get(|batches| batches.get_by_timestamp(timestamp,
remaining_count));
-
- let mut combined_batch_set = disk_messages;
- if !journal_messages.is_empty() {
- combined_batch_set.add_batch_set(journal_messages);
- }
- return Ok(combined_batch_set);
-}
-
-pub fn get_messages_by_timestamp_range(
- timestamp: u64,
- count: u32,
- range: std::ops::Range<usize>,
-) -> impl AsyncFnOnce(ComponentsById<PartitionRef>) ->
Result<IggyMessagesBatchSet, IggyError> {
- async move |(.., log)| -> Result<IggyMessagesBatchSet, IggyError> {
- let segments = log.segments().iter();
- let storages = log.storages().iter();
- let journal = log.journal();
- let indexes = log.indexes().iter();
-
- let mut remaining_count = count;
- let mut batches = IggyMessagesBatchSet::empty();
-
- for (segment, storage, index) in segments
- .zip(storages)
- .zip(indexes)
- .map(|((a, b), c)| (a, b, c))
- .skip(range.start)
- .take(range.end - range.start)
- {
- if remaining_count == 0 {
- break;
- }
-
- // Skip segments that end before our timestamp
- if segment.end_timestamp < timestamp {
- continue;
- }
-
- let messages =
- get_messages_by_timestamp(storage, journal, index, timestamp,
remaining_count)
- .await?;
-
- let messages_count = messages.count();
- if messages_count == 0 {
- continue;
- }
-
- remaining_count = remaining_count.saturating_sub(messages_count);
- batches.add_batch_set(messages);
-
- if remaining_count == 0 {
- break;
- }
- }
-
- Ok(batches)
- }
-}
-
pub fn calculate_current_offset() -> impl FnOnce(ComponentsById<PartitionRef>)
-> u64 {
|(root, _, _, offset, ..)| {
if !root.should_increment_offset() {
@@ -665,21 +360,11 @@ pub fn calculate_current_offset() -> impl
FnOnce(ComponentsById<PartitionRef>) -
}
}
-pub fn deduplicate_messages(
- current_offset: u64,
- current_position: u32,
- batch: &mut IggyMessagesBatchMut,
-) -> impl AsyncFnOnce(ComponentsById<PartitionRef>) {
- async move |(.., deduplicator, _, _, _, log)| {
+/// Builds a closure that reads, from a partition's components, the start
+/// offset of the log's active segment together with a clone of the
+/// partition's optional message deduplicator.
+///
+/// The deduplicator is held in an `Arc`, so the returned clone refers to the
+/// same instance as the one stored in the partition.
+pub fn get_segment_start_offset_and_deduplicator()
+-> impl FnOnce(ComponentsById<PartitionRef>) -> (u64,
Option<Arc<MessageDeduplicator>>) {
+ move |(.., deduplicator, _, _, _, log)| {
let segment = log.active_segment();
- batch
- .prepare_for_persistence(
- segment.start_offset,
- current_offset,
- current_position,
- deduplicator.as_ref(),
- )
- .await;
+ (segment.start_offset, deduplicator.clone())
}
}
diff --git a/core/server/src/streaming/partitions/partition2.rs
b/core/server/src/streaming/partitions/partition2.rs
index 6c85dfb0..e965a270 100644
--- a/core/server/src/streaming/partitions/partition2.rs
+++ b/core/server/src/streaming/partitions/partition2.rs
@@ -17,7 +17,10 @@ use crate::{
};
use iggy_common::{Identifier, IggyTimestamp};
use slab::Slab;
-use std::sync::{Arc, atomic::AtomicU64};
+use std::{
+ rc::Rc,
+ sync::{Arc, atomic::AtomicU64},
+};
#[derive(Debug, Clone)]
pub struct ConsumerOffsets(papaya::HashMap<usize,
consumer_offset::ConsumerOffset>);
@@ -87,7 +90,7 @@ impl std::ops::DerefMut for ConsumerGroupOffsets {
pub struct Partition {
root: PartitionRoot,
stats: Arc<PartitionStats>,
- message_deduplicator: Option<MessageDeduplicator>,
+ message_deduplicator: Option<Arc<MessageDeduplicator>>,
offset: Arc<AtomicU64>,
consumer_offset: Arc<ConsumerOffsets>,
consumer_group_offset: Arc<ConsumerGroupOffsets>,
@@ -106,6 +109,7 @@ impl Partition {
log: SegmentedLog<MemoryMessageJournal>,
) -> Self {
let root = PartitionRoot::new(created_at, should_increment_offset);
+ let message_deduplicator = message_deduplicator.map(Arc::new);
Self {
root,
stats,
@@ -120,7 +124,7 @@ impl Partition {
pub fn new_with_components(
root: PartitionRoot,
stats: Arc<PartitionStats>,
- message_deduplicator: Option<MessageDeduplicator>,
+ message_deduplicator: Option<Arc<MessageDeduplicator>>,
offset: Arc<AtomicU64>,
consumer_offset: Arc<ConsumerOffsets>,
consumer_group_offset: Arc<ConsumerGroupOffsets>,
@@ -172,7 +176,7 @@ impl IntoComponents for Partition {
type Components = (
PartitionRoot,
Arc<PartitionStats>,
- Option<MessageDeduplicator>,
+ Option<Arc<MessageDeduplicator>>,
Arc<AtomicU64>,
Arc<ConsumerOffsets>,
Arc<ConsumerGroupOffsets>,
@@ -233,7 +237,7 @@ impl PartitionRoot {
pub struct PartitionRef<'a> {
root: &'a Slab<PartitionRoot>,
stats: &'a Slab<Arc<PartitionStats>>,
- message_deduplicator: &'a Slab<Option<MessageDeduplicator>>,
+ message_deduplicator: &'a Slab<Option<Arc<MessageDeduplicator>>>,
offset: &'a Slab<Arc<AtomicU64>>,
consumer_offset: &'a Slab<Arc<ConsumerOffsets>>,
consumer_group_offset: &'a Slab<Arc<ConsumerGroupOffsets>>,
@@ -244,7 +248,7 @@ impl<'a> PartitionRef<'a> {
pub fn new(
root: &'a Slab<PartitionRoot>,
stats: &'a Slab<Arc<PartitionStats>>,
- message_deduplicator: &'a Slab<Option<MessageDeduplicator>>,
+ message_deduplicator: &'a Slab<Option<Arc<MessageDeduplicator>>>,
offset: &'a Slab<Arc<AtomicU64>>,
consumer_offset: &'a Slab<Arc<ConsumerOffsets>>,
consumer_group_offset: &'a Slab<Arc<ConsumerGroupOffsets>>,
@@ -266,7 +270,7 @@ impl<'a> IntoComponents for PartitionRef<'a> {
type Components = (
&'a Slab<PartitionRoot>,
&'a Slab<Arc<PartitionStats>>,
- &'a Slab<Option<MessageDeduplicator>>,
+ &'a Slab<Option<Arc<MessageDeduplicator>>>,
&'a Slab<Arc<AtomicU64>>,
&'a Slab<Arc<ConsumerOffsets>>,
&'a Slab<Arc<ConsumerGroupOffsets>>,
@@ -291,7 +295,7 @@ impl<'a> IntoComponentsById for PartitionRef<'a> {
type Output = (
&'a PartitionRoot,
&'a Arc<PartitionStats>,
- &'a Option<MessageDeduplicator>,
+ &'a Option<Arc<MessageDeduplicator>>,
&'a Arc<AtomicU64>,
&'a Arc<ConsumerOffsets>,
&'a Arc<ConsumerGroupOffsets>,
@@ -314,7 +318,7 @@ impl<'a> IntoComponentsById for PartitionRef<'a> {
pub struct PartitionRefMut<'a> {
root: &'a mut Slab<PartitionRoot>,
stats: &'a mut Slab<Arc<PartitionStats>>,
- message_deduplicator: &'a mut Slab<Option<MessageDeduplicator>>,
+ message_deduplicator: &'a mut Slab<Option<Arc<MessageDeduplicator>>>,
offset: &'a mut Slab<Arc<AtomicU64>>,
consumer_offset: &'a mut Slab<Arc<ConsumerOffsets>>,
consumer_group_offset: &'a mut Slab<Arc<ConsumerGroupOffsets>>,
@@ -325,7 +329,7 @@ impl<'a> PartitionRefMut<'a> {
pub fn new(
root: &'a mut Slab<PartitionRoot>,
stats: &'a mut Slab<Arc<PartitionStats>>,
- message_deduplicator: &'a mut Slab<Option<MessageDeduplicator>>,
+ message_deduplicator: &'a mut Slab<Option<Arc<MessageDeduplicator>>>,
offset: &'a mut Slab<Arc<AtomicU64>>,
consumer_offset: &'a mut Slab<Arc<ConsumerOffsets>>,
consumer_group_offset: &'a mut Slab<Arc<ConsumerGroupOffsets>>,
@@ -347,7 +351,7 @@ impl<'a> IntoComponents for PartitionRefMut<'a> {
type Components = (
&'a mut Slab<PartitionRoot>,
&'a mut Slab<Arc<PartitionStats>>,
- &'a mut Slab<Option<MessageDeduplicator>>,
+ &'a mut Slab<Option<Arc<MessageDeduplicator>>>,
&'a mut Slab<Arc<AtomicU64>>,
&'a mut Slab<Arc<ConsumerOffsets>>,
&'a mut Slab<Arc<ConsumerGroupOffsets>>,
@@ -372,7 +376,7 @@ impl<'a> IntoComponentsById for PartitionRefMut<'a> {
type Output = (
&'a mut PartitionRoot,
&'a mut Arc<PartitionStats>,
- &'a mut Option<MessageDeduplicator>,
+ &'a mut Option<Arc<MessageDeduplicator>>,
&'a mut Arc<AtomicU64>,
&'a mut Arc<ConsumerOffsets>,
&'a mut Arc<ConsumerGroupOffsets>,
diff --git a/core/server/src/streaming/segments/storage.rs
b/core/server/src/streaming/segments/storage.rs
index 84fcd783..53313282 100644
--- a/core/server/src/streaming/segments/storage.rs
+++ b/core/server/src/streaming/segments/storage.rs
@@ -8,12 +8,14 @@ use crate::streaming::segments::{
messages::{MessagesReader, MessagesWriter},
};
-#[derive(Debug)]
+// NOTE(review): `Storage` holds `Rc` handles, and `Rc` is `!Send` because its
+// reference count is not atomic. This impl therefore relies on an invariant
+// the compiler cannot check — presumably that a `Storage` and every clone of
+// its `Rc`s are only ever used from one thread at a time. TODO: confirm and
+// record that invariant here as a `// SAFETY:` comment.
+unsafe impl Send for Storage {}
+
+/// Reader and writer handles for a segment's messages file and index file.
+///
+/// Each handle is wrapped in `Rc`, so cloning a `Storage` shares the
+/// underlying readers and writers instead of duplicating them. The writer
+/// fields are `Option` because `shutdown` takes them out and leaves `None`.
+#[derive(Debug, Clone)]
pub struct Storage {
- pub messages_writer: Option<MessagesWriter>,
- pub messages_reader: Option<MessagesReader>,
- pub index_writer: Option<IndexWriter>,
- pub index_reader: Option<IndexReader>,
+ pub messages_writer: Option<Rc<MessagesWriter>>,
+ pub messages_reader: Option<Rc<MessagesReader>>,
+ pub index_writer: Option<Rc<IndexWriter>>,
+ pub index_reader: Option<Rc<IndexReader>>,
}
impl Storage {
@@ -28,19 +30,21 @@ impl Storage {
) -> Result<Self, IggyError> {
let size = Rc::new(AtomicU64::new(messages_size));
let indexes_size = Rc::new(AtomicU64::new(indexes_size));
- let messages_writer =
- MessagesWriter::new(messages_path, size.clone(), log_fsync,
file_exists).await?;
+ let messages_writer = Rc::new(
+ MessagesWriter::new(messages_path, size.clone(), log_fsync,
file_exists).await?,
+ );
- let index_writer =
- IndexWriter::new(index_path, indexes_size.clone(), index_fsync,
file_exists).await?;
+ let index_writer = Rc::new(
+ IndexWriter::new(index_path, indexes_size.clone(), index_fsync,
file_exists).await?,
+ );
if file_exists {
messages_writer.fsync().await?;
index_writer.fsync().await?;
}
- let messages_reader = MessagesReader::new(messages_path, size).await?;
- let index_reader = IndexReader::new(index_path, indexes_size).await?;
+ let messages_reader = Rc::new(MessagesReader::new(messages_path,
size).await?);
+ let index_reader = Rc::new(IndexReader::new(index_path,
indexes_size).await?);
Ok(Self {
messages_writer: Some(messages_writer),
messages_reader: Some(messages_reader),
@@ -49,7 +53,7 @@ impl Storage {
})
}
- pub fn shutdown(&mut self) -> (Option<MessagesWriter>,
Option<IndexWriter>) {
+ pub fn shutdown(&mut self) -> (Option<Rc<MessagesWriter>>,
Option<Rc<IndexWriter>>) {
let messages_writer = self.messages_writer.take();
let index_writer = self.index_writer.take();
(messages_writer, index_writer)
diff --git a/core/server/src/streaming/segments/types/messages_batch_mut.rs
b/core/server/src/streaming/segments/types/messages_batch_mut.rs
index 8e4895da..29e5c4d2 100644
--- a/core/server/src/streaming/segments/types/messages_batch_mut.rs
+++ b/core/server/src/streaming/segments/types/messages_batch_mut.rs
@@ -29,6 +29,7 @@ use iggy_common::{
};
use lending_iterator::prelude::*;
use std::ops::{Deref, Index};
+use std::sync::Arc;
use tracing::{error, warn};
/// A container for mutable messages that are being prepared for persistence.
@@ -144,7 +145,7 @@ impl IggyMessagesBatchMut {
start_offset: u64,
base_offset: u64,
current_position: u32,
- deduplicator: Option<&MessageDeduplicator>,
+ deduplicator: Option<&Arc<MessageDeduplicator>>,
) {
let messages_count = self.count();
if messages_count == 0 {
diff --git a/core/server/src/streaming/topics/storage2.rs
b/core/server/src/streaming/topics/storage2.rs
index 35e854a7..86d783b1 100644
--- a/core/server/src/streaming/topics/storage2.rs
+++ b/core/server/src/streaming/topics/storage2.rs
@@ -94,191 +94,3 @@ pub async fn delete_topic_from_disk(
);
Ok((messages_count, size_bytes, segments_count))
}
-
-// Old implementation kept for reference
-/*
-async fn load(&self, topic: &mut Topic, mut state: TopicState) -> Result<(),
IggyError> {
- info!("Loading topic {} from disk...", topic);
- if !Path::new(&topic.path).exists() {
- return Err(IggyError::TopicIdNotFound(topic.topic_id,
topic.stream_id));
- }
-
- let message_expiry = Topic::get_message_expiry(state.message_expiry,
&topic.config);
- let max_topic_size = Topic::get_max_topic_size(state.max_topic_size,
&topic.config)?;
- topic.created_at = state.created_at;
- topic.message_expiry = message_expiry;
- topic.max_topic_size = max_topic_size;
- topic.compression_algorithm = state.compression_algorithm;
- topic.replication_factor = state.replication_factor.unwrap_or(1);
-
- let mut dir_entries = fs::read_dir(&topic.partitions_path).await
- .with_context(|| format!("Failed to read partition with ID: {} for
stream with ID: {} for topic with ID: {} and path: {}",
- topic.topic_id, topic.stream_id,
topic.topic_id, &topic.partitions_path))
- .map_err(|_| IggyError::CannotReadPartitions)?;
-
- let mut unloaded_partitions = Vec::new();
- while let Some(dir_entry) =
dir_entries.next_entry().await.unwrap_or(None) {
- let metadata = dir_entry.metadata().await;
- if metadata.is_err() || metadata.unwrap().is_file() {
- continue;
- }
-
- let name = dir_entry.file_name().into_string().unwrap();
- let partition_id = name.parse::<u32>();
- if partition_id.is_err() {
- error!("Invalid partition ID file with name: '{}'.", name);
- continue;
- }
-
- let partition_id = partition_id.unwrap();
- let partition_state = state.partitions.get(&partition_id);
- if partition_state.is_none() {
- let stream_id = topic.stream_id;
- let topic_id = topic.topic_id;
- error!(
- "Partition with ID: '{partition_id}' for stream with ID:
'{stream_id}' and topic with ID: '{topic_id}' was not found in state, but
exists on disk and will be removed."
- );
- if let Err(error) =
fs::remove_dir_all(&dir_entry.path()).await {
- error!("Cannot remove partition directory: {error}");
- } else {
- warn!(
- "Partition with ID: '{partition_id}' for stream with
ID: '{stream_id}' and topic with ID: '{topic_id}' was removed."
- );
- }
- continue;
- }
-
- let partition_state = partition_state.unwrap();
- let partition = Partition::create(
- topic.stream_id,
- topic.topic_id,
- partition_id,
- false,
- topic.config.clone(),
- topic.storage.clone(),
- message_expiry,
- topic.messages_count_of_parent_stream.clone(),
- topic.messages_count.clone(),
- topic.size_of_parent_stream.clone(),
- topic.size_bytes.clone(),
- topic.segments_count_of_parent_stream.clone(),
- partition_state.created_at,
- )
- .await;
- unloaded_partitions.push(partition);
- }
-
- let state_partition_ids =
state.partitions.keys().copied().collect::<AHashSet<u32>>();
- let unloaded_partition_ids = unloaded_partitions
- .iter()
- .map(|partition| partition.partition_id)
- .collect::<AHashSet<u32>>();
- let missing_ids = state_partition_ids
- .difference(&unloaded_partition_ids)
- .copied()
- .collect::<AHashSet<u32>>();
- if missing_ids.is_empty() {
- info!(
- "All partitions for topic with ID: '{}' for stream with ID:
'{}' found on disk were found in state.",
- topic.topic_id, topic.stream_id
- );
- } else {
- warn!(
- "Partitions with IDs: '{missing_ids:?}' for topic with ID:
'{topic_id}' for stream with ID: '{stream_id}' were not found on disk.",
- topic_id = topic.topic_id,
- stream_id = topic.stream_id
- );
- if topic.config.recovery.recreate_missing_state {
- info!(
- "Recreating missing state in recovery config is enabled,
missing partitions will be created for topic with ID: '{}' for stream with ID:
'{}'.",
- topic.topic_id, topic.stream_id
- );
-
- for partition_id in missing_ids {
- let partition_state =
state.partitions.get(&partition_id).unwrap();
- let mut partition = Partition::create(
- topic.stream_id,
- topic.topic_id,
- partition_id,
- true,
- topic.config.clone(),
- topic.storage.clone(),
- message_expiry,
- topic.messages_count_of_parent_stream.clone(),
- topic.messages_count.clone(),
- topic.size_of_parent_stream.clone(),
- topic.size_bytes.clone(),
- topic.segments_count_of_parent_stream.clone(),
- partition_state.created_at,
- )
- .await;
- partition.persist().await.with_error_context(|error| {
- format!(
- "{COMPONENT} (error: {error}) - failed to persist
partition: {partition}"
- )
- })?;
- partition.segments.clear();
- unloaded_partitions.push(partition);
- info!(
- "Created missing partition with ID: '{partition_id}',
for topic with ID: '{}' for stream with ID: '{}'.",
- topic.topic_id, topic.stream_id
- );
- }
- } else {
- warn!(
- "Recreating missing state in recovery config is disabled,
missing partitions will not be created for topic with ID: '{}' for stream with
ID: '{}'.",
- topic.topic_id, topic.stream_id
- );
- }
- }
-
- let stream_id = topic.stream_id;
- let topic_id = topic.topic_id;
- let loaded_partitions = Arc::new(Mutex::new(Vec::new()));
- let mut load_partitions = Vec::new();
- for mut partition in unloaded_partitions {
- let loaded_partitions = loaded_partitions.clone();
- let partition_state =
state.partitions.remove(&partition.partition_id).unwrap();
- let load_partition = tokio::spawn(async move {
- match partition.load(partition_state).await {
- Ok(_) => {
- loaded_partitions.lock().await.push(partition);
- }
- Err(error) => {
- error!(
- "Failed to load partition with ID: {} for stream
with ID: {stream_id} and topic with ID: {topic_id}. Error: {error}",
- partition.partition_id
- );
- }
- }
- });
- load_partitions.push(load_partition);
- }
-
- join_all(load_partitions).await;
- for partition in loaded_partitions.lock().await.drain(..) {
- topic
- .partitions
- .insert(partition.partition_id, IggySharedMut::new(partition));
- }
-
- for consumer_group in state.consumer_groups.into_values() {
- let consumer_group = ConsumerGroup::new(
- topic.topic_id,
- consumer_group.id,
- &consumer_group.name,
- topic.get_partitions_count(),
- );
- topic
- .consumer_groups_ids
- .insert(consumer_group.name.to_owned(),
consumer_group.group_id);
- topic
- .consumer_groups
- .insert(consumer_group.group_id, RwLock::new(consumer_group));
- }
-
- info!("Loaded topic {topic}");
-
- Ok(())
- }
- */