This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new f7e99178 fixes
f7e99178 is described below
commit f7e991780c045df9388b72241ac33332098a90e1
Author: numinex <[email protected]>
AuthorDate: Wed Jun 25 14:32:26 2025 +0200
fixes
---
core/common/src/locking/mod.rs | 10 +-
core/common/src/locking/tokio_lock.rs | 2 -
core/server/src/shard/system/mod.rs | 2 +-
.../src/streaming/partitions/consumer_offsets.rs | 2 +-
core/server/src/streaming/storage.rs | 2 +-
core/server/src/streaming/streams/topics.rs | 3 +-
core/server/src/streaming/topics/consumer_group.rs | 31 +++--
.../server/src/streaming/topics/consumer_groups.rs | 136 +++++++++++++--------
.../src/streaming/topics/consumer_offsets.rs | 4 -
core/server/src/streaming/topics/messages.rs | 5 +-
core/server/src/streaming/topics/partitions.rs | 4 +-
core/server/src/streaming/topics/storage.rs | 9 +-
core/server/src/streaming/topics/topic.rs | 25 ++--
13 files changed, 135 insertions(+), 100 deletions(-)
diff --git a/core/common/src/locking/mod.rs b/core/common/src/locking/mod.rs
index 42d72ebb..c2e542c7 100644
--- a/core/common/src/locking/mod.rs
+++ b/core/common/src/locking/mod.rs
@@ -29,19 +29,19 @@ mod fast_async_lock;
#[cfg(feature = "tokio_lock")]
#[cfg(not(any(feature = "fast_async_lock")))]
-pub type IggySharedMut<T> = tokio_lock::IggyTokioRwLock<T>;
+pub type IggyRwLock<T> = tokio_lock::IggyTokioRwLock<T>;
//this can be used in the future to provide different locking mechanisms
#[cfg(feature = "fast_async_lock")]
-pub type IggySharedMut<T> = fast_async_lock::IggyFastAsyncRwLock<T>;
+pub type IggyRwLock<T> = fast_async_lock::IggyFastAsyncRwLock<T>;
#[allow(async_fn_in_trait)]
-pub trait IggySharedMutFn<T>: Send + Sync {
- type ReadGuard<'a>: Deref<Target = T> + Send
+pub trait IggySharedMutFn<T> {
+ type ReadGuard<'a>: Deref<Target = T>
where
T: 'a,
Self: 'a;
- type WriteGuard<'a>: DerefMut<Target = T> + Send
+ type WriteGuard<'a>: DerefMut<Target = T>
where
T: 'a,
Self: 'a;
diff --git a/core/common/src/locking/tokio_lock.rs
b/core/common/src/locking/tokio_lock.rs
index 2e26b367..ef5baaad 100644
--- a/core/common/src/locking/tokio_lock.rs
+++ b/core/common/src/locking/tokio_lock.rs
@@ -25,8 +25,6 @@ use tokio::sync::{RwLock as TokioRwLock, RwLockReadGuard,
RwLockWriteGuard};
pub struct IggyTokioRwLock<T>(Arc<TokioRwLock<T>>);
impl<T> IggySharedMutFn<T> for IggyTokioRwLock<T>
-where
- T: Send + Sync,
{
type ReadGuard<'a>
= RwLockReadGuard<'a, T>
diff --git a/core/server/src/shard/system/mod.rs
b/core/server/src/shard/system/mod.rs
index 1419abb9..7a00fcf8 100644
--- a/core/server/src/shard/system/mod.rs
+++ b/core/server/src/shard/system/mod.rs
@@ -32,4 +32,4 @@ pub mod system;
pub mod topics;
pub mod users;
-pub const COMPONENT: &str = "SYSTEM";
+pub const COMPONENT: &str = "SHARD_SYSTEM";
diff --git a/core/server/src/streaming/partitions/consumer_offsets.rs
b/core/server/src/streaming/partitions/consumer_offsets.rs
index a9f82df7..cf2bed12 100644
--- a/core/server/src/streaming/partitions/consumer_offsets.rs
+++ b/core/server/src/streaming/partitions/consumer_offsets.rs
@@ -26,7 +26,7 @@ use iggy_common::IggyError;
use tracing::trace;
impl Partition {
- pub async fn get_consumer_offset(
+ pub fn get_consumer_offset(
&self,
consumer: PollingConsumer,
) -> Result<Option<u64>, IggyError> {
diff --git a/core/server/src/streaming/storage.rs
b/core/server/src/streaming/storage.rs
index 51e2565e..978b97aa 100644
--- a/core/server/src/streaming/storage.rs
+++ b/core/server/src/streaming/storage.rs
@@ -154,7 +154,7 @@ pub struct SystemStorage {
impl SystemStorage {
pub fn new(config: Rc<SystemConfig>, persister: Arc<PersisterKind>) ->
Self {
Self {
- info:
Arc::new(SystemInfoStorageKind::File(FileSystemInfoStorage::new(
+ info:
Rc::new(SystemInfoStorageKind::File(FileSystemInfoStorage::new(
config.get_state_info_path(),
persister.clone(),
))),
diff --git a/core/server/src/streaming/streams/topics.rs
b/core/server/src/streaming/streams/topics.rs
index b975ee83..ed350bee 100644
--- a/core/server/src/streaming/streams/topics.rs
+++ b/core/server/src/streaming/streams/topics.rs
@@ -146,7 +146,8 @@ impl Stream {
topic.name = name.to_owned();
topic.message_expiry = message_expiry;
topic.compression_algorithm = compression_algorithm;
- for partition in topic.partitions.borrow_mut().values_mut() {
+ for partition in topic.partitions.values_mut() {
+ let mut partition = partition.write().await;
partition.message_expiry = message_expiry;
for segment in partition.segments.iter_mut() {
segment.update_message_expiry(message_expiry);
diff --git a/core/server/src/streaming/topics/consumer_group.rs
b/core/server/src/streaming/topics/consumer_group.rs
index cc28ef6f..f000ce79 100644
--- a/core/server/src/streaming/topics/consumer_group.rs
+++ b/core/server/src/streaming/topics/consumer_group.rs
@@ -52,12 +52,12 @@ impl ConsumerGroup {
self.members.values().cloned().collect()
}
- pub async fn reassign_partitions(&mut self, partitions_count: u32) {
+ pub fn reassign_partitions(&mut self, partitions_count: u32) {
self.partitions_count = partitions_count;
- self.assign_partitions().await;
+ self.assign_partitions();
}
- pub async fn calculate_partition_id(
+ pub fn calculate_partition_id(
&mut self,
member_id: u32,
) -> Result<Option<u32>, IggyError> {
@@ -72,7 +72,7 @@ impl ConsumerGroup {
))
}
- pub async fn get_current_partition_id(&self, member_id: u32) ->
Result<Option<u32>, IggyError> {
+ pub fn get_current_partition_id(&mut self, member_id: u32) ->
Result<Option<u32>, IggyError> {
let member = self.members.get(&member_id);
if let Some(member) = member {
return Ok(member.current_partition_id);
@@ -84,7 +84,7 @@ impl ConsumerGroup {
))
}
- pub async fn add_member(&mut self, member_id: u32) {
+ pub fn add_member(&mut self, member_id: u32) {
self.members.insert(
member_id,
ConsumerGroupMember {
@@ -98,20 +98,20 @@ impl ConsumerGroup {
"Added member with ID: {} to consumer group: {} for topic with ID:
{}",
member_id, self.group_id, self.topic_id
);
- self.assign_partitions().await;
+ self.assign_partitions();
}
- pub async fn delete_member(&mut self, member_id: u32) {
+ pub fn delete_member(&mut self, member_id: u32) {
if self.members.remove(&member_id).is_some() {
trace!(
"Deleted member with ID: {} in consumer group: {} for topic
with ID: {}",
member_id, self.group_id, self.topic_id
);
- self.assign_partitions().await;
+ self.assign_partitions();
}
}
- async fn assign_partitions(&mut self) {
+ fn assign_partitions(&mut self) {
let mut members = self.members.values_mut().collect::<Vec<_>>();
if members.is_empty() {
return;
@@ -189,11 +189,10 @@ mod tests {
members: AHashMap::new(),
};
- consumer_group.add_member(member_id).await;
+ consumer_group.add_member(member_id);
for i in 0..1000 {
let partition_id = consumer_group
.calculate_partition_id(member_id)
- .await
.unwrap()
.expect("Partition ID not found");
assert_eq!(partition_id, (i % consumer_group.partitions_count) +
1);
@@ -211,7 +210,7 @@ mod tests {
members: AHashMap::new(),
};
- consumer_group.add_member(member_id).await;
+ consumer_group.add_member(member_id);
let member = consumer_group.members.get(&member_id).unwrap();
assert_eq!(
member.partitions.len() as u32,
@@ -235,8 +234,8 @@ mod tests {
members: AHashMap::new(),
};
- consumer_group.add_member(member1_id).await;
- consumer_group.add_member(member2_id).await;
+ consumer_group.add_member(member1_id);
+ consumer_group.add_member(member2_id);
let member1 = consumer_group.members.get(&member1_id).unwrap();
let member2 = consumer_group.members.get(&member2_id).unwrap();
assert_eq!(
@@ -270,8 +269,8 @@ mod tests {
members: AHashMap::new(),
};
- consumer_group.add_member(member1_id).await;
- consumer_group.add_member(member2_id).await;
+ consumer_group.add_member(member1_id);
+ consumer_group.add_member(member2_id);
let member1 = consumer_group.members.get(&member1_id).unwrap();
let member2 = consumer_group.members.get(&member2_id).unwrap();
if member1.partitions.len() == 1 {
diff --git a/core/server/src/streaming/topics/consumer_groups.rs
b/core/server/src/streaming/topics/consumer_groups.rs
index 1f50a1bb..c1371096 100644
--- a/core/server/src/streaming/topics/consumer_groups.rs
+++ b/core/server/src/streaming/topics/consumer_groups.rs
@@ -16,36 +16,39 @@
* under the License.
*/
-use crate::streaming::topics::COMPONENT;
+use crate::binary::handlers::topics::get_topic_handler;
+use crate::streaming::topics::{consumer_group, COMPONENT};
use crate::streaming::topics::consumer_group::ConsumerGroup;
use crate::streaming::topics::topic::Topic;
use error_set::ErrContext;
+use iggy_common::locking::IggySharedMutFn;
use iggy_common::IggyError;
use iggy_common::{IdKind, Identifier};
+use std::cell::{Ref, RefMut};
use std::sync::atomic::Ordering;
use tracing::info;
impl Topic {
pub fn reassign_consumer_groups(&mut self) {
- if self.consumer_groups.is_empty() {
+ if self.consumer_groups.borrow().is_empty() {
return;
}
- let partitions_count = self.partitions.borrow().len() as u32;
+ let partitions_count = self.partitions.len() as u32;
info!(
"Reassigning consumer groups for topic with ID: {} for stream with
ID with {}, partitions count: {}",
self.topic_id, self.stream_id, partitions_count
);
- for (_, consumer_group) in self.consumer_groups.iter_mut() {
+ for (_, consumer_group) in
self.consumer_groups.borrow_mut().iter_mut() {
consumer_group.reassign_partitions(partitions_count);
}
}
- pub fn get_consumer_groups(&self) -> Vec<&ConsumerGroup> {
- self.consumer_groups.values().collect()
+ pub fn get_consumer_groups(&self) -> Vec<ConsumerGroup> {
+ self.consumer_groups.borrow().values().cloned().collect()
}
- pub fn get_consumer_group(&self, identifier: &Identifier) ->
Result<&ConsumerGroup, IggyError> {
+ pub fn get_consumer_group(&self, identifier: &Identifier) ->
Result<Ref<'_, ConsumerGroup>, IggyError> {
match identifier.kind {
IdKind::Numeric =>
self.get_consumer_group_by_id(identifier.get_u32_value().unwrap()),
IdKind::String =>
self.get_consumer_group_by_name(&identifier.get_cow_str_value()?),
@@ -55,22 +58,42 @@ impl Topic {
pub fn try_get_consumer_group(
&self,
identifier: &Identifier,
- ) -> Result<Option<&ConsumerGroup>, IggyError> {
+ ) -> Result<Option<Ref<'_, ConsumerGroup>>, IggyError> {
match identifier.kind {
- IdKind::Numeric =>
Ok(self.consumer_groups.get(&identifier.get_u32_value()?)),
+ IdKind::Numeric =>
Ok(self.try_get_consumer_group_by_id(&identifier.get_u32_value()?)),
IdKind::String => {
Ok(self.try_get_consumer_group_by_name(&identifier.get_cow_str_value()?))
}
}
}
+ fn try_get_consumer_group_by_id(&self, id: &u32) -> Option<Ref<'_,
ConsumerGroup>> {
+ let consumer_groups = self.consumer_groups.borrow();
+ let exists = consumer_groups.contains_key(id);
+ if !exists {
+ return None;
+ }
+
+ Some(Ref::map(consumer_groups, |cg| {
+ let consumer_group = cg.get(id);
+ consumer_group.unwrap()
+ }))
+ }
+
+ fn try_get_consumer_group_by_name(&self, name: &str) -> Option<Ref<'_,
ConsumerGroup>> {
+ let id = self.consumer_groups_ids.get(name)?; // unknown name => None, never a panic
+ let consumer_groups = self.consumer_groups.borrow();
+ let exists = consumer_groups.contains_key(id); // guards the unwrap inside Ref::map below
+ if !exists {
+ return None;
+ }
- fn try_get_consumer_group_by_name(&self, name: &str) ->
Option<&ConsumerGroup> {
- self.consumer_groups_ids
- .get(name)
- .and_then(|id| self.consumer_groups.get(id))
+ Some(Ref::map(consumer_groups, |cg| {
+ let consumer_group = cg.get(id);
+ consumer_group.unwrap()
+ }))
}
- pub fn get_consumer_group_by_name(&self, name: &str) ->
Result<&ConsumerGroup, IggyError> {
+ pub fn get_consumer_group_by_name(&self, name: &str) -> Result<Ref<'_,
ConsumerGroup>, IggyError> {
let group_id = self.consumer_groups_ids.get(name);
if group_id.is_none() {
return Err(IggyError::ConsumerGroupNameNotFound(
@@ -82,19 +105,24 @@ impl Topic {
self.get_consumer_group_by_id(*group_id.unwrap())
}
- pub fn get_consumer_group_by_id(&self, id: u32) -> Result<&ConsumerGroup,
IggyError> {
- let consumer_group = self.consumer_groups.get(&id);
- if consumer_group.is_none() {
+ pub fn get_consumer_group_by_id(&self, id: u32) -> Result<Ref<'_,
ConsumerGroup>, IggyError> {
+ let consumer_groups = self.consumer_groups.borrow();
+ let exists = consumer_groups.contains_key(&id);
+ if !exists {
return Err(IggyError::ConsumerGroupIdNotFound(id, self.topic_id));
}
+ let consumer_group = Ref::map(consumer_groups, |cg| {
+ let consumer_group = cg.get(&id);
+ consumer_group.unwrap()
+ });
- Ok(consumer_group.unwrap())
+ Ok(consumer_group)
}
pub fn get_consumer_group_mut(
- &mut self,
+ &self,
identifier: &Identifier,
- ) -> Result<&mut ConsumerGroup, IggyError> {
+ ) -> Result<RefMut<'_, ConsumerGroup>, IggyError> {
match identifier.kind {
IdKind::Numeric => {
self.get_consumer_group_by_id_mut(identifier.get_u32_value().unwrap())
@@ -104,21 +132,25 @@ impl Topic {
}
pub fn get_consumer_group_by_id_mut(
- &mut self,
+ &self,
id: u32,
- ) -> Result<&mut ConsumerGroup, IggyError> {
- let consumer_group = self.consumer_groups.get_mut(&id);
- if consumer_group.is_none() {
+ ) -> Result<RefMut<'_, ConsumerGroup>, IggyError> {
+ let consumer_groups = self.consumer_groups.borrow_mut();
+ let exists = consumer_groups.contains_key(&id);
+ if !exists {
return Err(IggyError::ConsumerGroupIdNotFound(id, self.topic_id));
}
-
- Ok(consumer_group.unwrap())
+ let consumer_group = RefMut::map(consumer_groups, |cg| {
+ let consumer_group = cg.get_mut(&id);
+ consumer_group.unwrap()
+ });
+ Ok(consumer_group)
}
pub fn get_consumer_group_by_name_mut(
- &mut self,
+ &self,
name: &str,
- ) -> Result<&mut ConsumerGroup, IggyError> {
+ ) -> Result<RefMut<'_, ConsumerGroup>, IggyError> {
let group_id = self.consumer_groups_ids.get(name).copied();
if group_id.is_none() {
return Err(IggyError::ConsumerGroupNameNotFound(
@@ -148,7 +180,7 @@ impl Topic {
.current_consumer_group_id
.fetch_add(1, Ordering::SeqCst);
loop {
- if self.consumer_groups.contains_key(&id) {
+ if self.consumer_groups.borrow().contains_key(&id) {
if id == u32::MAX {
return Err(IggyError::ConsumerGroupIdAlreadyExists(id,
self.topic_id));
}
@@ -163,7 +195,7 @@ impl Topic {
id = group_id.unwrap();
}
- if self.consumer_groups.contains_key(&id) {
+ if self.consumer_groups.borrow().contains_key(&id) {
return Err(IggyError::ConsumerGroupIdAlreadyExists(id,
self.topic_id));
}
@@ -171,11 +203,11 @@ impl Topic {
self.topic_id,
id,
name,
- self.partitions.borrow().len() as u32,
+ self.partitions.len() as u32,
);
self.consumer_groups_ids.insert(name.to_owned(), id);
let cloned_group = consumer_group.clone();
- self.consumer_groups.insert(id, consumer_group);
+ self.consumer_groups.borrow_mut().insert(id, consumer_group);
info!(
"Created consumer group with ID: {} for topic with ID: {} and
stream with ID: {}.",
id, self.topic_id, self.stream_id
@@ -195,11 +227,14 @@ impl Topic {
group_id = consumer_group.group_id;
}
- let consumer_group = self.consumer_groups.remove(&group_id);
+ let mut consumer_groups = self.consumer_groups.borrow_mut();
+ let consumer_group = consumer_groups.remove(&group_id);
if consumer_group.is_none() {
return Err(IggyError::ConsumerGroupIdNotFound(group_id,
self.topic_id));
}
let consumer_group = consumer_group.unwrap();
+ let consumer_group = consumer_group.clone();
+ drop(consumer_groups);
{
self.consumer_groups_ids.remove(&consumer_group.name);
let current_group_id =
self.current_consumer_group_id.load(Ordering::SeqCst);
@@ -208,7 +243,8 @@ impl Topic {
.store(group_id, Ordering::SeqCst);
}
- for (_, partition) in self.partitions.borrow().iter() {
+ for (_, partition) in self.partitions.iter() {
+ let partition = partition.read().await;
if let Some((_, offset)) =
partition.consumer_group_offsets.remove(&group_id) {
self.storage
.partition
@@ -231,13 +267,15 @@ impl Topic {
group_id: &Identifier,
member_id: u32,
) -> Result<(), IggyError> {
- let consumer_group =
self.get_consumer_group_mut(group_id).with_error_context(|error| {
+ let topic_id = self.topic_id;
+ let stream_id = self.stream_id;
+ let mut consumer_group =
self.get_consumer_group_mut(group_id).with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to get consumer
group with id: {group_id}")
})?;
consumer_group.add_member(member_id);
info!(
"Member with ID: {} has joined consumer group with ID: {} for
topic with ID: {} and stream with ID: {}.",
- member_id, group_id, self.topic_id, self.stream_id
+ member_id, group_id, topic_id, stream_id
);
Ok(())
}
@@ -247,13 +285,15 @@ impl Topic {
group_id: &Identifier,
member_id: u32,
) -> Result<(), IggyError> {
- let consumer_group =
self.get_consumer_group_mut(group_id).with_error_context(|error| {
+ let topic_id = self.topic_id;
+ let stream_id = self.stream_id;
+ let mut consumer_group =
self.get_consumer_group_mut(group_id).with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to get consumer
group with id: {group_id}")
})?;
consumer_group.delete_member(member_id);
info!(
"Member with ID: {} has left consumer group with ID: {} for topic
with ID: {} and stream with ID: {}.",
- member_id, group_id, self.topic_id, self.stream_id
+ member_id, group_id, topic_id, stream_id
);
Ok(())
}
@@ -286,7 +326,7 @@ mod tests {
assert_eq!(created_consumer_group.topic_id, topic_id);
}
- assert_eq!(topic.consumer_groups.len(), 1);
+ assert_eq!(topic.consumer_groups.borrow().len(), 1);
let consumer_group = topic
.get_consumer_group(&Identifier::numeric(group_id).unwrap())
.unwrap();
@@ -295,7 +335,7 @@ mod tests {
assert_eq!(consumer_group.topic_id, topic_id);
assert_eq!(
consumer_group.partitions_count,
- topic.partitions.borrow().len() as u32
+ topic.partitions.len() as u32
);
}
@@ -306,12 +346,12 @@ mod tests {
let mut topic = get_topic().await;
let result = topic.create_consumer_group(Some(group_id), name);
assert!(result.is_ok());
- assert_eq!(topic.consumer_groups.len(), 1);
+ assert_eq!(topic.consumer_groups.borrow().len(), 1);
let result = topic.create_consumer_group(Some(group_id), "test2");
assert!(result.is_err());
let err = result.unwrap_err();
assert!(matches!(err, IggyError::ConsumerGroupIdAlreadyExists(_, _)));
- assert_eq!(topic.consumer_groups.len(), 1);
+ assert_eq!(topic.consumer_groups.borrow().len(), 1);
}
#[tokio::test]
@@ -321,7 +361,7 @@ mod tests {
let mut topic = get_topic().await;
let result = topic.create_consumer_group(Some(group_id), name);
assert!(result.is_ok());
- assert_eq!(topic.consumer_groups.len(), 1);
+ assert_eq!(topic.consumer_groups.borrow().len(), 1);
let group_id = group_id + 1;
let result = topic.create_consumer_group(Some(group_id), name);
assert!(result.is_err());
@@ -330,7 +370,7 @@ mod tests {
err,
IggyError::ConsumerGroupNameAlreadyExists(_, _)
));
- assert_eq!(topic.consumer_groups.len(), 1);
+ assert_eq!(topic.consumer_groups.borrow().len(), 1);
}
#[tokio::test]
@@ -340,12 +380,12 @@ mod tests {
let mut topic = get_topic().await;
let result = topic.create_consumer_group(Some(group_id), name);
assert!(result.is_ok());
- assert_eq!(topic.consumer_groups.len(), 1);
+ assert_eq!(topic.consumer_groups.borrow().len(), 1);
let result = topic
.delete_consumer_group(&Identifier::numeric(group_id).unwrap())
.await;
assert!(result.is_ok());
- assert!(topic.consumer_groups.is_empty());
+ assert!(topic.consumer_groups.borrow().is_empty());
}
#[tokio::test]
@@ -355,13 +395,13 @@ mod tests {
let mut topic = get_topic().await;
let result = topic.create_consumer_group(Some(group_id), name);
assert!(result.is_ok());
- assert_eq!(topic.consumer_groups.len(), 1);
+ assert_eq!(topic.consumer_groups.borrow().len(), 1);
let group_id = group_id + 1;
let result = topic
.delete_consumer_group(&Identifier::numeric(group_id).unwrap())
.await;
assert!(result.is_err());
- assert_eq!(topic.consumer_groups.len(), 1);
+ assert_eq!(topic.consumer_groups.borrow().len(), 1);
}
#[tokio::test]
diff --git a/core/server/src/streaming/topics/consumer_offsets.rs
b/core/server/src/streaming/topics/consumer_offsets.rs
index 8558acbc..8748576b 100644
--- a/core/server/src/streaming/topics/consumer_offsets.rs
+++ b/core/server/src/streaming/topics/consumer_offsets.rs
@@ -34,7 +34,6 @@ impl Topic {
) -> Result<(), IggyError> {
let Some((polling_consumer, partition_id)) = self
.resolve_consumer_with_partition_id(&consumer, client_id,
partition_id, false)
- .await
.with_error_context(|error| format!("{COMPONENT} (error: {error})
- failed to resolve consumer with partition id, consumer ID: {}, client ID: {},
partition ID: {:?}", consumer.id, client_id, partition_id))? else {
return Err(IggyError::ConsumerOffsetNotFound(client_id));
};
@@ -78,7 +77,6 @@ impl Topic {
) -> Result<Option<ConsumerOffsetInfo>, IggyError> {
let Some((polling_consumer, partition_id)) = self
.resolve_consumer_with_partition_id(consumer, client_id,
partition_id, false)
- .await
.with_error_context(|error| format!("{COMPONENT} (error: {error})
- failed to resolve consumer offset for consumer: {consumer}, client ID:
{client_id}, partition ID: {:#?}", partition_id))? else {
return Ok(None);
};
@@ -93,7 +91,6 @@ impl Topic {
let partition = partition.read().await;
let offset = partition
.get_consumer_offset(polling_consumer)
- .await
.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - failed to get consumer
offset for consumer: {polling_consumer}"
@@ -118,7 +115,6 @@ impl Topic {
) -> Result<(), IggyError> {
let Some((polling_consumer, partition_id)) = self
.resolve_consumer_with_partition_id(&consumer, client_id,
partition_id, false)
- .await
.with_error_context(|error| format!("{COMPONENT} (error: {error})
- failed to resolve consumer with partition id, consumer ID: {}, client ID: {},
partition ID: {:?}", consumer.id, client_id, partition_id))? else {
return Err(IggyError::ConsumerOffsetNotFound(client_id));
};
diff --git a/core/server/src/streaming/topics/messages.rs
b/core/server/src/streaming/topics/messages.rs
index 56d7c1c7..e8c52385 100644
--- a/core/server/src/streaming/topics/messages.rs
+++ b/core/server/src/streaming/topics/messages.rs
@@ -209,6 +209,7 @@ mod tests {
use bytes::Bytes;
use iggy_common::CompressionAlgorithm;
use iggy_common::{IggyMessage, MaxTopicSize};
+ use std::rc::Rc;
use std::sync::Arc;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::AtomicU64;
@@ -321,11 +322,11 @@ mod tests {
async fn init_topic(partitions_count: u32) -> Topic {
let tempdir = tempfile::TempDir::new().unwrap();
- let config = Arc::new(SystemConfig {
+ let config = Rc::new(SystemConfig {
path: tempdir.path().to_str().unwrap().to_string(),
..Default::default()
});
- let storage = Arc::new(SystemStorage::new(
+ let storage = Rc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));
diff --git a/core/server/src/streaming/topics/partitions.rs
b/core/server/src/streaming/topics/partitions.rs
index eb6b4280..dfedf0a9 100644
--- a/core/server/src/streaming/topics/partitions.rs
+++ b/core/server/src/streaming/topics/partitions.rs
@@ -20,7 +20,7 @@ use crate::streaming::partitions::partition::Partition;
use crate::streaming::topics::COMPONENT;
use crate::streaming::topics::topic::Topic;
use error_set::ErrContext;
-use iggy_common::locking::{IggySharedMut, IggySharedMutFn};
+use iggy_common::locking::{IggyRwLock, IggySharedMutFn};
use iggy_common::{IggyError, IggyTimestamp};
const MAX_PARTITIONS_COUNT: u32 = 100_000;
@@ -63,7 +63,7 @@ impl Topic {
)
.await;
self.partitions
- .insert(partition_id, IggySharedMut::new(partition));
+ .insert(partition_id, IggyRwLock::new(partition));
partition_ids.push(partition_id)
}
diff --git a/core/server/src/streaming/topics/storage.rs
b/core/server/src/streaming/topics/storage.rs
index 1cc33310..ae9435e5 100644
--- a/core/server/src/streaming/topics/storage.rs
+++ b/core/server/src/streaming/topics/storage.rs
@@ -27,7 +27,7 @@ use anyhow::Context;
use error_set::ErrContext;
use futures::future::join_all;
use iggy_common::IggyError;
-use iggy_common::locking::IggySharedMut;
+use iggy_common::locking::IggyRwLock;
use iggy_common::locking::IggySharedMutFn;
use serde::{Deserialize, Serialize};
use std::path::Path;
@@ -189,7 +189,7 @@ impl TopicStorage for FileTopicStorage {
for mut partition in unloaded_partitions {
let loaded_partitions = loaded_partitions.clone();
let partition_state =
state.partitions.remove(&partition.partition_id).unwrap();
- let load_partition = tokio::spawn(async move {
+ let load_partition = monoio::spawn(async move {
match partition.load(partition_state).await {
Ok(_) => {
loaded_partitions.lock().await.push(partition);
@@ -209,7 +209,7 @@ impl TopicStorage for FileTopicStorage {
for partition in loaded_partitions.lock().await.drain(..) {
topic
.partitions
- .insert(partition.partition_id, IggySharedMut::new(partition));
+ .insert(partition.partition_id, IggyRwLock::new(partition));
}
for consumer_group in state.consumer_groups.into_values() {
@@ -224,7 +224,8 @@ impl TopicStorage for FileTopicStorage {
.insert(consumer_group.name.to_owned(),
consumer_group.group_id);
topic
.consumer_groups
- .insert(consumer_group.group_id, RwLock::new(consumer_group));
+ .borrow_mut()
+ .insert(consumer_group.group_id, consumer_group);
}
info!("Loaded topic {topic}");
diff --git a/core/server/src/streaming/topics/topic.rs
b/core/server/src/streaming/topics/topic.rs
index ad3e25ef..16aa324c 100644
--- a/core/server/src/streaming/topics/topic.rs
+++ b/core/server/src/streaming/topics/topic.rs
@@ -25,7 +25,7 @@ use ahash::AHashMap;
use core::fmt;
use std::cell::RefCell;
use std::rc::Rc;
-use iggy_common::locking::IggySharedMut;
+use iggy_common::locking::IggyRwLock;
use iggy_common::{
CompressionAlgorithm, Consumer, ConsumerKind, IggyByteSize, IggyError,
IggyExpiry,
IggyTimestamp, MaxTopicSize, Sizeable,
@@ -33,7 +33,6 @@ use iggy_common::{
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
-use tokio::sync::RwLock;
use tracing::info;
const ALMOST_FULL_THRESHOLD: f64 = 0.9;
@@ -51,9 +50,9 @@ pub struct Topic {
pub(crate) messages_count: Arc<AtomicU64>,
pub(crate) segments_count_of_parent_stream: Arc<AtomicU32>,
pub(crate) config: Rc<SystemConfig>,
- pub(crate) partitions: RefCell<AHashMap<u32, Partition>>,
+ pub(crate) partitions: AHashMap<u32, IggyRwLock<Partition>>,
pub(crate) storage: Rc<SystemStorage>,
- pub(crate) consumer_groups: AHashMap<u32, ConsumerGroup>,
+ pub(crate) consumer_groups: RefCell<AHashMap<u32, ConsumerGroup>>,
pub(crate) consumer_groups_ids: AHashMap<String, u32>,
pub(crate) current_consumer_group_id: AtomicU32,
pub(crate) current_partition_id: AtomicU32,
@@ -126,7 +125,7 @@ impl Topic {
messages_count_of_parent_stream,
messages_count: Arc::new(AtomicU64::new(0)),
segments_count_of_parent_stream,
- consumer_groups: AHashMap::new(),
+ consumer_groups: RefCell::new(AHashMap::new()),
consumer_groups_ids: AHashMap::new(),
current_consumer_group_id: AtomicU32::new(1),
current_partition_id: AtomicU32::new(1),
@@ -172,11 +171,11 @@ impl Topic {
matches!(self.max_topic_size, MaxTopicSize::Unlimited)
}
- pub fn get_partitions(&self) -> Vec<IggySharedMut<Partition>> {
+ pub fn get_partitions(&self) -> Vec<IggyRwLock<Partition>> {
self.partitions.values().cloned().collect()
}
- pub fn get_partition(&self, partition_id: u32) ->
Result<IggySharedMut<Partition>, IggyError> {
+ pub fn get_partition(&self, partition_id: u32) ->
Result<IggyRwLock<Partition>, IggyError> {
match self.partitions.get(&partition_id) {
Some(partition_arc) => Ok(partition_arc.clone()),
None => Err(IggyError::PartitionNotFound(
@@ -187,7 +186,7 @@ impl Topic {
}
}
- pub async fn resolve_consumer_with_partition_id(
+ pub fn resolve_consumer_with_partition_id(
&self,
consumer: &Consumer,
client_id: u32,
@@ -203,7 +202,7 @@ impl Topic {
)))
}
ConsumerKind::ConsumerGroup => {
- let consumer_group =
self.get_consumer_group(&consumer.id)?.read().await;
+ let mut consumer_group =
self.get_consumer_group_mut(&consumer.id)?;
if let Some(partition_id) = partition_id {
return Ok(Some((
PollingConsumer::consumer_group(consumer_group.group_id, client_id),
@@ -212,9 +211,9 @@ impl Topic {
}
let partition_id = if calculate_partition_id {
- consumer_group.calculate_partition_id(client_id).await?
+ consumer_group.calculate_partition_id(client_id)?
} else {
- consumer_group.get_current_partition_id(client_id).await?
+ consumer_group.get_current_partition_id(client_id)?
};
let Some(partition_id) = partition_id else {
return Ok(None);
@@ -291,11 +290,11 @@ mod tests {
#[tokio::test]
async fn should_be_created_given_valid_parameters() {
let tempdir = tempfile::TempDir::new().unwrap();
- let config = Arc::new(SystemConfig {
+ let config = Rc::new(SystemConfig {
path: tempdir.path().to_str().unwrap().to_string(),
..Default::default()
});
- let storage = Arc::new(SystemStorage::new(
+ let storage = Rc::new(SystemStorage::new(
config.clone(),
Arc::new(PersisterKind::FileWithSync(FileWithSyncPersister {})),
));