This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new 0e61c843 feat(io_uring): fix metadata crud operations (#1950)
0e61c843 is described below
commit 0e61c8433766381c7a85403f02e1caf7eba058f4
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Tue Jul 1 09:11:12 2025 +0200
feat(io_uring): fix metadata crud operations (#1950)
This PR fixes the metadata CRUD operations, preventing double `borrow_mut`
calls on collections across awaits.
---
core/integration/tests/streaming/stream.rs | 1 -
core/server/src/shard/mod.rs | 1 +
core/server/src/shard/system/messages.rs | 15 +--
core/server/src/shard/system/partitions.rs | 115 ++++++++++++++++------
core/server/src/shard/system/streams.rs | 7 ++
core/server/src/shard/system/topics.rs | 130 ++++++++++++++++---------
core/server/src/streaming/streams/topics.rs | 29 +-----
core/server/src/streaming/topics/partitions.rs | 37 ++-----
8 files changed, 196 insertions(+), 139 deletions(-)
diff --git a/core/integration/tests/streaming/stream.rs
b/core/integration/tests/streaming/stream.rs
index 677e2d32..4cfa88ba 100644
--- a/core/integration/tests/streaming/stream.rs
+++ b/core/integration/tests/streaming/stream.rs
@@ -132,7 +132,6 @@ async fn should_purge_existing_stream_on_disk() {
MaxTopicSize::ServerDefault,
1,
)
- .await
.unwrap();
let messages = create_messages();
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 6c3718f8..8ffc6560 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -400,6 +400,7 @@ impl IggyShard {
}
}
+ //TODO: Refactor...
let mut streams_states = streams
.into_iter()
.filter(|s| !missing_ids.contains(&s.id))
diff --git a/core/server/src/shard/system/messages.rs
b/core/server/src/shard/system/messages.rs
index b27b034c..c5e527d0 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -53,12 +53,15 @@ impl IggyShard {
)
})?;
let stream_id = stream.stream_id;
- let numeric_topic_id = stream.get_topic(topic_id).map(|topic|
topic.topic_id).with_error_context(|error| {
- format!(
- "Failed to get topic with ID: {} (error: {})",
- topic_id, error
- )
- })?;
+ let numeric_topic_id = stream
+ .get_topic(topic_id)
+ .map(|topic| topic.topic_id)
+ .with_error_context(|error| {
+ format!(
+ "Failed to get topic with ID: {} (error: {})",
+ topic_id, error
+ )
+ })?;
// TODO: We should look into refactoring those permissioners, so they
can accept `Identifier` instead of numeric IDs.
// Validate permissions for given user on stream and topic.
self.permissioner.borrow().append_messages(
diff --git a/core/server/src/shard/system/partitions.rs
b/core/server/src/shard/system/partitions.rs
index 78ef2144..a5b59516 100644
--- a/core/server/src/shard/system/partitions.rs
+++ b/core/server/src/shard/system/partitions.rs
@@ -24,7 +24,9 @@ use crate::streaming::session::Session;
use error_set::ErrContext;
use iggy_common::Identifier;
use iggy_common::IggyError;
+use iggy_common::locking::IggySharedMutFn;
+// TODO: MAJOR REFACTOR!!!!!!!!!!!!!!!!!
impl IggyShard {
pub async fn create_partitions(
&self,
@@ -33,6 +35,9 @@ impl IggyShard {
topic_id: &Identifier,
partitions_count: u32,
) -> Result<(), IggyError> {
+ // This whole method is yeah....
+ // I don't admit to writing it.
+ // Sorry, not sorry.
self.ensure_authenticated(session)?;
{
let stream = self.get_stream(stream_id).with_error_context(|error|
{
@@ -53,35 +58,61 @@ impl IggyShard {
))?;
}
- let mut stream =
self.get_stream_mut(stream_id).with_error_context(|error| {
- format!("{COMPONENT} (error: {error}) - failed to get stream with
ID: {stream_id}")
- })?;
- let stream_id = stream.stream_id;
- let topic = stream
+ let partition_ids = {
+ let mut stream =
self.get_stream_mut(stream_id).with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to get stream
with ID: {stream_id}")
+ })?;
+ let stream_id = stream.stream_id;
+ let topic = stream
.get_topic_mut(topic_id)
.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - failed to get mutable
reference to stream with id: {stream_id}"
)
})?;
- let topic_id = topic.topic_id;
-
- // TODO: Make add persisted partitions to topic sync, and extract the
storage persister out of it
- // perform disk i/o outside of the borrow_mut of the stream.
- let partition_ids = topic
+ let partition_ids = topic
.add_persisted_partitions(partitions_count)
- .await
.with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to add
persisted partitions, topic: {topic}")
})?;
- let records = partition_ids.into_iter().map(|partition_id| {
- let namespace = IggyNamespace::new(stream_id, topic_id,
partition_id);
- let hash = namespace.generate_hash();
- let shard_id = hash % self.get_available_shards_count();
- let shard_info = ShardInfo::new(shard_id as u16);
- (namespace, shard_info)
- });
- self.insert_shard_table_records(records);
+ partition_ids
+ };
+
+ {
+ let stream = self.get_stream(stream_id).with_error_context(|error|
{
+ format!("{COMPONENT} (error: {error}) - failed to get stream
with ID: {stream_id}")
+ })?;
+ let stream_id = stream.stream_id;
+ let topic = stream.get_topic(topic_id).with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to get topic with
ID: {topic_id} in stream with ID: {stream_id}")
+ })?;
+ let topic_id = topic.topic_id;
+ for partition_id in &partition_ids {
+ let partition = topic.partitions.get(partition_id).unwrap();
+ let mut partition = partition.write().await;
+ partition.persist().await.with_error_context(|error| {
+ format!(
+ "{COMPONENT} (error: {error}) - failed to persist
partition with id: {}",
+ partition.partition_id
+ )
+ })?;
+ }
+ let records = partition_ids.into_iter().map(|partition_id| {
+ let namespace = IggyNamespace::new(stream_id, topic_id,
partition_id);
+ let hash = namespace.generate_hash();
+ let shard_id = hash % self.get_available_shards_count();
+ let shard_info = ShardInfo::new(shard_id as u16);
+ (namespace, shard_info)
+ });
+ self.insert_shard_table_records(records);
+ }
+
+ let mut stream =
self.get_stream_mut(stream_id).with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to get stream with
ID: {stream_id}")
+ })?;
+ let topic = stream.get_topic_mut(topic_id).with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to get topic with
ID: {topic_id} in stream with ID: {stream_id}")
+ })?;
topic.reassign_consumer_groups();
self.metrics.increment_partitions(partitions_count);
@@ -116,10 +147,11 @@ impl IggyShard {
))?;
}
- let mut stream =
self.get_stream_mut(stream_id).with_error_context(|error| {
- format!("{COMPONENT} (error: {error}) - failed to get stream with
ID: {stream_id}")
- })?;
- let topic = stream
+ let (numeric_topic_id, partitions) = {
+ let mut stream =
self.get_stream_mut(stream_id).with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to get stream
with ID: {stream_id}")
+ })?;
+ let topic = stream
.get_topic_mut(topic_id)
.with_error_context(|error| {
format!(
@@ -127,19 +159,42 @@ impl IggyShard {
)
})?;
- // TODO: Make delete persisted partitions from topic sync, and extract
the storage persister out of it
- // perform disk i/o outside of the borrow_mut of the stream.
- let partitions = topic
+ let partitions = topic
.delete_persisted_partitions(partitions_count)
- .await
.with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to delete
persisted partitions for topic: {topic}")
})?;
+ (topic.topic_id, partitions)
+ };
+
+ let mut segments_count = 0;
+ let mut messages_count = 0;
+ for partition in &partitions {
+ let mut partition = partition.write().await;
+ let partition_id = partition.partition_id;
+ let partition_messages_count = partition.get_messages_count();
+ segments_count += partition.get_segments_count();
+ messages_count += partition_messages_count;
+ partition.delete().await.with_error_context(|error| {
+ format!(
+ "{COMPONENT} (error: {error}) - failed to delete partition
with ID: {} in topic with ID: {}",
+ partition_id,
+ numeric_topic_id
+ )
+ })?;
+ }
+
+ let mut stream =
self.get_stream_mut(stream_id).with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to get stream with
ID: {stream_id}")
+ })?;
+ let topic = stream.get_topic_mut(topic_id).with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to get topic with
ID: {topic_id}")
+ })?;
topic.reassign_consumer_groups();
- if let Some(partitions) = partitions {
+ if partitions.len() > 0 {
self.metrics.decrement_partitions(partitions_count);
- self.metrics.decrement_segments(partitions.segments_count);
- self.metrics.decrement_messages(partitions.messages_count);
+ self.metrics.decrement_segments(segments_count);
+ self.metrics.decrement_messages(messages_count);
}
Ok(())
}
diff --git a/core/server/src/shard/system/streams.rs
b/core/server/src/shard/system/streams.rs
index 47068f11..b4921066 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -298,6 +298,13 @@ impl IggyShard {
})?;
old_name = stream.name.clone();
stream.name = name.to_owned();
+ // Drop the exclusive borrow.
+ drop(stream);
+ // Get it again using inclusive borrow
+ let stream = self.get_stream(id).with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to get mutable
reference to stream with id: {id}")
+ })?;
+ // Persist it.
stream.persist().await?;
}
diff --git a/core/server/src/shard/system/topics.rs
b/core/server/src/shard/system/topics.rs
index c247cf74..0f8155fb 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -26,6 +26,7 @@ use error_set::ErrContext;
use iggy_common::locking::IggySharedMutFn;
use iggy_common::{CompressionAlgorithm, Identifier, IggyError, IggyExpiry,
MaxTopicSize};
use tokio_util::io::StreamReader;
+use tracing::info;
impl IggyShard {
pub fn find_topic<'topic, 'stream>(
@@ -112,28 +113,35 @@ impl IggyShard {
max_topic_size: MaxTopicSize,
replication_factor: Option<u8>,
) -> Result<(), IggyError> {
- let (stream_id, topic_id, partition_ids) = self
- .create_topic_base(
- stream_id,
- topic_id,
- name,
- partitions_count,
- message_expiry,
- compression_algorithm,
- max_topic_size,
- replication_factor,
- )
- .await?;
+ let (topic_id, partition_ids) = self.create_topic_base(
+ stream_id,
+ topic_id,
+ name,
+ partitions_count,
+ message_expiry,
+ compression_algorithm,
+ max_topic_size,
+ replication_factor,
+ )?;
+ let stream = self.get_stream(stream_id).with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to get stream with
ID: {stream_id}")
+ })?;
+ let stream_id = stream.stream_id;
+ let topic = stream
+ .get_topic(&Identifier::numeric(topic_id)?)
+ .with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to get topic
with ID: {topic_id} in stream with ID: {stream_id}")
+ })?;
+ topic.persist().await.with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to persist topic:
{topic}")
+ })?;
+
// TODO: Figure out a way how to distribute the shards table among
different shards,
// without the need to do code from below, everytime we handle a
`ShardEvent`.
- // I think we shouldn't be sharing it, maintain a single shard table
per shard,
- // but figure out a way how to distribute it smarter (maybe move the
broadcast inside of the `insert_shard_table_records`) method.
+ // I think we shouldn't be sharing it tho and still maintain a single
shard table per shard,
+ // but find a way how to distribute it smarter (maybe move the
broadcast inside of the `insert_shard_table_records`) method.
let records = partition_ids.into_iter().map(|partition_id| {
let namespace = IggyNamespace::new(stream_id, topic_id,
partition_id);
- // TODO: This setup isn't deterministic.
- // Imagine a scenario where client creates partition using
`String` identifiers,
- // but then for poll_messages requests uses numeric ones.
- // the namespace wouldn't match, therefore we would get miss in
the shard table.
let hash = namespace.generate_hash();
let shard_id = hash % self.get_available_shards_count();
let shard_info = ShardInfo::new(shard_id as u16);
@@ -176,24 +184,32 @@ impl IggyShard {
})?;
}
- let (stream_id, topic_id, partition_ids) = self
- .create_topic_base(
- stream_id,
- topic_id,
- name,
- partitions_count,
- message_expiry,
- compression_algorithm,
- max_topic_size,
- replication_factor,
- )
- .await?;
+ let (topic_id, partition_ids) = self.create_topic_base(
+ stream_id,
+ topic_id,
+ name,
+ partitions_count,
+ message_expiry,
+ compression_algorithm,
+ max_topic_size,
+ replication_factor,
+ )?;
+
+ let stream = self.get_stream(stream_id).with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to get stream with
ID: {stream_id}")
+ })?;
+ let stream_id = stream.stream_id;
+ let topic = stream
+ .get_topic(&Identifier::numeric(topic_id)?)
+ .with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to get
topic with ID: {topic_id} in stream with ID: {stream_id}")
+ })?;
+ topic.persist().await.with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to persist topic:
{topic}")
+ })?;
+
let records = partition_ids.into_iter().map(|partition_id| {
let namespace = IggyNamespace::new(stream_id, topic_id,
partition_id);
- // TODO: This setup isn't deterministic.
- // Imagine a scenario where client creates partition using
`String` identifiers,
- // but then for poll_messages requests uses numeric ones.
- // the namespace wouldn't match, therefore we would get miss in
the shard table.
let hash = namespace.generate_hash();
let shard_id = hash % self.get_available_shards_count();
let shard_info = ShardInfo::new(shard_id as u16);
@@ -208,7 +224,7 @@ impl IggyShard {
Ok(Identifier::numeric(topic_id)?)
}
- async fn create_topic_base(
+ fn create_topic_base(
&self,
stream_id: &Identifier,
topic_id: Option<u32>,
@@ -218,9 +234,7 @@ impl IggyShard {
compression_algorithm: CompressionAlgorithm,
max_topic_size: MaxTopicSize,
replication_factor: Option<u8>,
- ) -> Result<(u32, u32, Vec<u32>), IggyError> {
- // TODO: Make create topic sync, and extract the storage persister out
of it
- // perform disk i/o outside of the borrow_mut of the stream.
+ ) -> Result<(u32, Vec<u32>), IggyError> {
let mut stream =
self.get_stream_mut(stream_id).with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to get mutable
reference to stream with ID: {stream_id}")
})?;
@@ -235,11 +249,10 @@ impl IggyShard {
max_topic_size,
replication_factor.unwrap_or(1),
)
- .await
.with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to create topic
with name: {name} in stream ID: {stream_id}")
})?;
- Ok((stream_id, topic_id, partition_ids))
+ Ok((topic_id, partition_ids))
}
#[allow(clippy::too_many_arguments)]
@@ -280,8 +293,6 @@ impl IggyShard {
})?;
}
- // TODO: Make update topic sync, and extract the storage persister out
of it
- // perform disk i/o outside of the borrow_mut of the stream.
self.get_stream_mut(stream_id)?
.update_topic(
topic_id,
@@ -291,12 +302,32 @@ impl IggyShard {
max_topic_size,
replication_factor.unwrap_or(1),
)
- .await
.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - failed to update topic
with ID: {topic_id} in stream with ID: {stream_id}",
)
})?;
+ let stream = self.get_stream(stream_id).with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to get stream with
ID: {stream_id}")
+ })?;
+ let topic = stream
+ .get_topic(topic_id)
+ .with_error_context(|error| {
+ format!(
+ "{COMPONENT} (error: {error}) - failed to get topic with
ID: {topic_id} in stream with ID: {stream_id}",
+ )
+ })?;
+ for partition in topic.partitions.values() {
+ let mut partition = partition.write().await;
+ partition.message_expiry = message_expiry;
+ for segment in partition.segments.iter_mut() {
+ segment.update_message_expiry(message_expiry);
+ }
+ }
+ topic.persist().await.with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to persist topic:
{topic}")
+ })?;
+ info!("Updated topic: {topic}");
// TODO: if message_expiry is changed, we need to check if we need to
purge messages based on the new expiry
// TODO: if max_size_bytes is changed, we need to check if we need to
purge messages based on the new size
@@ -334,13 +365,20 @@ impl IggyShard {
stream_id_value = topic.stream_id;
}
- // TODO: Make delete topic sync, and extract the storage persister out
of it
- // perform disk i/o outside of the borrow_mut of the stream.
let topic = self
.get_stream_mut(stream_id)?
.delete_topic(topic_id)
- .await
.with_error_context(|error| format!("{COMPONENT} (error: {error})
- failed to delete topic with ID: {topic_id} in stream with ID: {stream_id}"))?;
+ let stream = self.get_stream(stream_id).with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to get mutable
reference to stream with ID: {stream_id}")
+ })?;
+ topic
+ .delete()
+ .await
+ .with_error_context(|error| {
+ format!("{COMPONENT} (error: {error}) - failed to delete
topic: {topic}")
+ })
+ .map_err(|_| IggyError::CannotDeleteTopic(topic.topic_id,
stream.stream_id))?;
self.metrics.decrement_topics(1);
self.metrics
diff --git a/core/server/src/streaming/streams/topics.rs
b/core/server/src/streaming/streams/topics.rs
index 6a201c7a..7833efa2 100644
--- a/core/server/src/streaming/streams/topics.rs
+++ b/core/server/src/streaming/streams/topics.rs
@@ -36,7 +36,7 @@ impl Stream {
}
#[allow(clippy::too_many_arguments)]
- pub async fn create_topic(
+ pub fn create_topic(
&mut self,
topic_id: Option<u32>,
name: &str,
@@ -94,16 +94,13 @@ impl Stream {
replication_factor,
)?;
- topic.persist().await.with_error_context(|error| {
- format!("{COMPONENT} (error: {error}) - failed to persist topic:
{topic}")
- })?;
info!("Created topic {}", topic);
self.topics_ids.insert(name.to_owned(), id);
self.topics.insert(id, topic);
Ok((id, partition_ids))
}
- pub async fn update_topic(
+ pub fn update_topic(
&mut self,
id: &Identifier,
name: &str,
@@ -150,21 +147,9 @@ impl Stream {
topic.name = name.to_owned();
topic.message_expiry = message_expiry;
topic.compression_algorithm = compression_algorithm;
- for partition in topic.partitions.values_mut() {
- let mut partition = partition.write().await;
- partition.message_expiry = message_expiry;
- for segment in partition.segments.iter_mut() {
- segment.update_message_expiry(message_expiry);
- }
- }
topic.max_topic_size = max_topic_size;
topic.replication_factor = replication_factor;
- topic.persist().await.with_error_context(|error| {
- format!("{COMPONENT} (error: {error}) - failed to persist
topic: {topic}")
- })?;
- info!("Updated topic: {topic}");
}
-
Ok(())
}
@@ -253,7 +238,7 @@ impl Stream {
.ok_or(IggyError::TopicIdNotFound(topic_id, self.stream_id))
}
- pub async fn delete_topic(&mut self, id: &Identifier) -> Result<Topic,
IggyError> {
+ pub fn delete_topic(&mut self, id: &Identifier) -> Result<Topic,
IggyError> {
let topic = self.remove_topic(id).with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to remove topic
with id: {id}")
})?;
@@ -263,13 +248,6 @@ impl Stream {
self.current_topic_id.store(topic_id, Ordering::SeqCst);
}
- topic
- .delete()
- .await
- .with_error_context(|error| {
- format!("{COMPONENT} (error: {error}) - failed to delete
topic: {topic}")
- })
- .map_err(|_| IggyError::CannotDeleteTopic(topic.topic_id,
self.stream_id))?;
Ok(topic)
}
}
@@ -319,7 +297,6 @@ mod tests {
max_topic_size,
1,
)
- .await
.unwrap();
let topic = stream.get_topic(&Identifier::numeric(topic_id).unwrap());
diff --git a/core/server/src/streaming/topics/partitions.rs
b/core/server/src/streaming/topics/partitions.rs
index b9e56331..cbbe15e5 100644
--- a/core/server/src/streaming/topics/partitions.rs
+++ b/core/server/src/streaming/topics/partitions.rs
@@ -69,29 +69,19 @@ impl Topic {
Ok(partition_ids)
}
- pub async fn add_persisted_partitions(&mut self, count: u32) ->
Result<Vec<u32>, IggyError> {
+ pub fn add_persisted_partitions(&mut self, count: u32) -> Result<Vec<u32>,
IggyError> {
let partition_ids =
self.add_partitions(count).with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to add partitions,
count: {count}")
})?;
- for partition_id in &partition_ids {
- let partition = self.partitions.get(partition_id).unwrap();
- let mut partition = partition.write().await;
- partition.persist().await.with_error_context(|error| {
- format!(
- "{COMPONENT} (error: {error}) - failed to persist
partition with id: {}",
- partition.partition_id
- )
- })?;
- }
Ok(partition_ids)
}
- pub async fn delete_persisted_partitions(
+ pub fn delete_persisted_partitions(
&mut self,
mut count: u32,
- ) -> Result<Option<DeletedPartitions>, IggyError> {
+ ) -> Result<Vec<IggyRwLock<Partition>>, IggyError> {
if count == 0 {
- return Ok(None);
+ return Ok(vec![]);
}
let current_partitions_count = self.partitions.len() as u32;
@@ -99,25 +89,12 @@ impl Topic {
count = current_partitions_count;
}
- let mut segments_count = 0;
- let mut messages_count = 0;
+ let mut partitions = Vec::with_capacity(count as usize);
for partition_id in current_partitions_count - count +
1..=current_partitions_count {
let partition = self.partitions.remove(&partition_id).unwrap();
- let mut partition = partition.write().await;
- let partition_messages_count = partition.get_messages_count();
- segments_count += partition.get_segments_count();
- messages_count += partition_messages_count;
- partition.delete().await.with_error_context(|error| {
- format!(
- "{COMPONENT} (error: {error}) - failed to delete partition
with ID: {partition_id} in topic with ID: {}",
- self.topic_id
- )
- })?;
+ partitions.push(partition);
}
- Ok(Some(DeletedPartitions {
- segments_count,
- messages_count,
- }))
+ Ok(partitions)
}
}