This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch rebase_master in repository https://gitbox.apache.org/repos/asf/iggy.git
commit f44adee78872dca0656c39d3805081db3a40c572 Author: Grzegorz Koszyk <[email protected]> AuthorDate: Tue Jul 1 09:11:12 2025 +0200 feat(io_uring): fix metadata crud operations (#1950) This PR fixes the metadata crud operations, preventing double borrow_mut call on collections across awaits --- core/integration/tests/streaming/stream.rs | 1 - core/server/src/shard/mod.rs | 1 + core/server/src/shard/system/messages.rs | 15 +-- core/server/src/shard/system/partitions.rs | 115 ++++++++++++++++------ core/server/src/shard/system/streams.rs | 7 ++ core/server/src/shard/system/topics.rs | 130 ++++++++++++++++--------- core/server/src/streaming/streams/topics.rs | 29 +----- core/server/src/streaming/topics/partitions.rs | 37 ++----- 8 files changed, 196 insertions(+), 139 deletions(-) diff --git a/core/integration/tests/streaming/stream.rs b/core/integration/tests/streaming/stream.rs index 677e2d32..4cfa88ba 100644 --- a/core/integration/tests/streaming/stream.rs +++ b/core/integration/tests/streaming/stream.rs @@ -132,7 +132,6 @@ async fn should_purge_existing_stream_on_disk() { MaxTopicSize::ServerDefault, 1, ) - .await .unwrap(); let messages = create_messages(); diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 6c3718f8..8ffc6560 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -400,6 +400,7 @@ impl IggyShard { } } + //TODO: Refactor... let mut streams_states = streams .into_iter() .filter(|s| !missing_ids.contains(&s.id)) diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs index b27b034c..c5e527d0 100644 --- a/core/server/src/shard/system/messages.rs +++ b/core/server/src/shard/system/messages.rs @@ -53,12 +53,15 @@ impl IggyShard { ) })?; let stream_id = stream.stream_id; - let numeric_topic_id = stream.get_topic(topic_id).map(|topic| topic.topic_id).with_error_context(|error| { - format!( - "Failed to get topic with ID: {} (error: {})", - topic_id, error - ) - })?; + let numeric_topic_id = stream + .get_topic(topic_id) + .map(|topic| topic.topic_id) + .with_error_context(|error| { + format!( + "Failed to get topic with ID: {} (error: {})", + topic_id, error + ) + })?; // TODO: We should look into refactoring those permissioners, so they can accept `Identifier` instead of numeric IDs. // Validate permissions for given user on stream and topic. self.permissioner.borrow().append_messages( diff --git a/core/server/src/shard/system/partitions.rs b/core/server/src/shard/system/partitions.rs index 78ef2144..a5b59516 100644 --- a/core/server/src/shard/system/partitions.rs +++ b/core/server/src/shard/system/partitions.rs @@ -24,7 +24,9 @@ use crate::streaming::session::Session; use error_set::ErrContext; use iggy_common::Identifier; use iggy_common::IggyError; +use iggy_common::locking::IggySharedMutFn; +// TODO: MAJOR REFACTOR!!!!!!!!!!!!!!!!! impl IggyShard { pub async fn create_partitions( &self, @@ -33,6 +35,9 @@ impl IggyShard { topic_id: &Identifier, partitions_count: u32, ) -> Result<(), IggyError> { + // This whole method is yeah.... + // I don't admit to writing it. + // Sorry, not sorry. self.ensure_authenticated(session)?; { let stream = self.get_stream(stream_id).with_error_context(|error| { @@ -53,35 +58,61 @@ impl IggyShard { ))?; } - let mut stream = self.get_stream_mut(stream_id).with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}") - })?; - let stream_id = stream.stream_id; - let topic = stream + let partition_ids = { + let mut stream = self.get_stream_mut(stream_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}") + })?; + let stream_id = stream.stream_id; + let topic = stream .get_topic_mut(topic_id) .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to get mutable reference to stream with id: {stream_id}" ) })?; - let topic_id = topic.topic_id; - - // TODO: Make add persisted partitions to topic sync, and extract the storage persister out of it - // perform disk i/o outside of the borrow_mut of the stream. - let partition_ids = topic + let partition_ids = topic .add_persisted_partitions(partitions_count) - .await .with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to add persisted partitions, topic: {topic}") })?; - let records = partition_ids.into_iter().map(|partition_id| { - let namespace = IggyNamespace::new(stream_id, topic_id, partition_id); - let hash = namespace.generate_hash(); - let shard_id = hash % self.get_available_shards_count(); - let shard_info = ShardInfo::new(shard_id as u16); - (namespace, shard_info) - }); - self.insert_shard_table_records(records); + partition_ids + }; + + { + let stream = self.get_stream(stream_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}") + })?; + let stream_id = stream.stream_id; + let topic = stream.get_topic(topic_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get topic with ID: {topic_id} in stream with ID: {stream_id}") + })?; + let topic_id = topic.topic_id; + for partition_id in &partition_ids { + let partition = topic.partitions.get(partition_id).unwrap(); + let mut partition = partition.write().await; + partition.persist().await.with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to persist partition with id: {}", + partition.partition_id + ) + })?; + } + let records = partition_ids.into_iter().map(|partition_id| { + let namespace = IggyNamespace::new(stream_id, topic_id, partition_id); + let hash = namespace.generate_hash(); + let shard_id = hash % self.get_available_shards_count(); + let shard_info = ShardInfo::new(shard_id as u16); + (namespace, shard_info) + }); + self.insert_shard_table_records(records); + } + + let mut stream = self.get_stream_mut(stream_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}") + })?; + let topic = stream.get_topic_mut(topic_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get topic with ID: {topic_id} in stream with ID: {stream_id}") + })?; topic.reassign_consumer_groups(); self.metrics.increment_partitions(partitions_count); @@ -116,10 +147,11 @@ impl IggyShard { ))?; } - let mut stream = self.get_stream_mut(stream_id).with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}") - })?; - let topic = stream + let (numeric_topic_id, partitions) = { + let mut stream = self.get_stream_mut(stream_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}") + })?; + let topic = stream .get_topic_mut(topic_id) .with_error_context(|error| { format!( @@ -127,19 +159,42 @@ impl IggyShard { ) })?; - // TODO: Make delete persisted partitions from topic sync, and extract the storage persister out of it - // perform disk i/o outside of the borrow_mut of the stream. - let partitions = topic + let partitions = topic .delete_persisted_partitions(partitions_count) - .await .with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to delete persisted partitions for topic: {topic}") })?; + (topic.topic_id, partitions) + }; + + let mut segments_count = 0; + let mut messages_count = 0; + for partition in &partitions { + let mut partition = partition.write().await; + let partition_id = partition.partition_id; + let partition_messages_count = partition.get_messages_count(); + segments_count += partition.get_segments_count(); + messages_count += partition_messages_count; + partition.delete().await.with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to delete partition with ID: {} in topic with ID: {}", + partition_id, + numeric_topic_id + ) + })?; + } + + let mut stream = self.get_stream_mut(stream_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}") + })?; + let topic = stream.get_topic_mut(topic_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get topic with ID: {topic_id}") + })?; topic.reassign_consumer_groups(); - if let Some(partitions) = partitions { + if partitions.len() > 0 { self.metrics.decrement_partitions(partitions_count); - self.metrics.decrement_segments(partitions.segments_count); - self.metrics.decrement_messages(partitions.messages_count); + self.metrics.decrement_segments(segments_count); + self.metrics.decrement_messages(messages_count); } Ok(()) } diff --git a/core/server/src/shard/system/streams.rs b/core/server/src/shard/system/streams.rs index 47068f11..b4921066 100644 --- a/core/server/src/shard/system/streams.rs +++ b/core/server/src/shard/system/streams.rs @@ -298,6 +298,13 @@ impl IggyShard { })?; old_name = stream.name.clone(); stream.name = name.to_owned(); + // Drop the exclusive borrow. + drop(stream); + // Get it again using inclusive borrow + let stream = self.get_stream(id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get mutable reference to stream with id: {id}") + })?; + // Persist it. stream.persist().await?; } diff --git a/core/server/src/shard/system/topics.rs b/core/server/src/shard/system/topics.rs index c247cf74..0f8155fb 100644 --- a/core/server/src/shard/system/topics.rs +++ b/core/server/src/shard/system/topics.rs @@ -26,6 +26,7 @@ use error_set::ErrContext; use iggy_common::locking::IggySharedMutFn; use iggy_common::{CompressionAlgorithm, Identifier, IggyError, IggyExpiry, MaxTopicSize}; use tokio_util::io::StreamReader; +use tracing::info; impl IggyShard { pub fn find_topic<'topic, 'stream>( @@ -112,28 +113,35 @@ impl IggyShard { max_topic_size: MaxTopicSize, replication_factor: Option<u8>, ) -> Result<(), IggyError> { - let (stream_id, topic_id, partition_ids) = self - .create_topic_base( - stream_id, - topic_id, - name, - partitions_count, - message_expiry, - compression_algorithm, - max_topic_size, - replication_factor, - ) - .await?; + let (topic_id, partition_ids) = self.create_topic_base( + stream_id, + topic_id, + name, + partitions_count, + message_expiry, + compression_algorithm, + max_topic_size, + replication_factor, + )?; + let stream = self.get_stream(stream_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}") + })?; + let stream_id = stream.stream_id; + let topic = stream + .get_topic(&Identifier::numeric(topic_id)?) + .with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get topic with ID: {topic_id} in stream with ID: {stream_id}") + })?; + topic.persist().await.with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to persist topic: {topic}") + })?; + // TODO: Figure out a way how to distribute the shards table among different shards, // without the need to do code from below, everytime we handle a `ShardEvent`. - // I think we shouldn't be sharing it, maintain a single shard table per shard, - // but figure out a way how to distribute it smarter (maybe move the broadcast inside of the `insert_shard_table_records`) method. + // I think we shouldn't be sharing it tho and still maintain a single shard table per shard, + // but find a way how to distribute it smarter (maybe move the broadcast inside of the `insert_shard_table_records`) method. let records = partition_ids.into_iter().map(|partition_id| { let namespace = IggyNamespace::new(stream_id, topic_id, partition_id); - // TODO: This setup isn't deterministic. - // Imagine a scenario where client creates partition using `String` identifiers, - // but then for poll_messages requests uses numeric ones. - // the namespace wouldn't match, therefore we would get miss in the shard table. let hash = namespace.generate_hash(); let shard_id = hash % self.get_available_shards_count(); let shard_info = ShardInfo::new(shard_id as u16); @@ -176,24 +184,32 @@ impl IggyShard { })?; } - let (stream_id, topic_id, partition_ids) = self - .create_topic_base( - stream_id, - topic_id, - name, - partitions_count, - message_expiry, - compression_algorithm, - max_topic_size, - replication_factor, - ) - .await?; + let (topic_id, partition_ids) = self.create_topic_base( + stream_id, + topic_id, + name, + partitions_count, + message_expiry, + compression_algorithm, + max_topic_size, + replication_factor, + )?; + + let stream = self.get_stream(stream_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}") + })?; + let stream_id = stream.stream_id; + let topic = stream + .get_topic(&Identifier::numeric(topic_id)?) + .with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get topic with ID: {topic_id} in stream with ID: {stream_id}") + })?; + topic.persist().await.with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to persist topic: {topic}") + })?; + let records = partition_ids.into_iter().map(|partition_id| { let namespace = IggyNamespace::new(stream_id, topic_id, partition_id); - // TODO: This setup isn't deterministic. - // Imagine a scenario where client creates partition using `String` identifiers, - // but then for poll_messages requests uses numeric ones. - // the namespace wouldn't match, therefore we would get miss in the shard table. let hash = namespace.generate_hash(); let shard_id = hash % self.get_available_shards_count(); let shard_info = ShardInfo::new(shard_id as u16); @@ -208,7 +224,7 @@ impl IggyShard { Ok(Identifier::numeric(topic_id)?) } - async fn create_topic_base( + fn create_topic_base( &self, stream_id: &Identifier, topic_id: Option<u32>, @@ -218,9 +234,7 @@ impl IggyShard { compression_algorithm: CompressionAlgorithm, max_topic_size: MaxTopicSize, replication_factor: Option<u8>, - ) -> Result<(u32, u32, Vec<u32>), IggyError> { - // TODO: Make create topic sync, and extract the storage persister out of it - // perform disk i/o outside of the borrow_mut of the stream. + ) -> Result<(u32, Vec<u32>), IggyError> { let mut stream = self.get_stream_mut(stream_id).with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to get mutable reference to stream with ID: {stream_id}") })?; @@ -235,11 +249,10 @@ impl IggyShard { max_topic_size, replication_factor.unwrap_or(1), ) - .await .with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to create topic with name: {name} in stream ID: {stream_id}") })?; - Ok((stream_id, topic_id, partition_ids)) + Ok((topic_id, partition_ids)) } #[allow(clippy::too_many_arguments)] @@ -280,8 +293,6 @@ impl IggyShard { })?; } - // TODO: Make update topic sync, and extract the storage persister out of it - // perform disk i/o outside of the borrow_mut of the stream. self.get_stream_mut(stream_id)? .update_topic( topic_id, @@ -291,12 +302,32 @@ impl IggyShard { max_topic_size, replication_factor.unwrap_or(1), ) - .await .with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to update topic with ID: {topic_id} in stream with ID: {stream_id}", ) })?; + let stream = self.get_stream(stream_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}") + })?; + let topic = stream + .get_topic(topic_id) + .with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to get topic with ID: {topic_id} in stream with ID: {stream_id}", + ) + })?; + for partition in topic.partitions.values() { + let mut partition = partition.write().await; + partition.message_expiry = message_expiry; + for segment in partition.segments.iter_mut() { + segment.update_message_expiry(message_expiry); + } + } + topic.persist().await.with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to persist topic: {topic}") + })?; + info!("Updated topic: {topic}"); // TODO: if message_expiry is changed, we need to check if we need to purge messages based on the new expiry // TODO: if max_size_bytes is changed, we need to check if we need to purge messages based on the new size @@ -334,13 +365,20 @@ impl IggyShard { stream_id_value = topic.stream_id; } - // TODO: Make delete topic sync, and extract the storage persister out of it - // perform disk i/o outside of the borrow_mut of the stream. let topic = self .get_stream_mut(stream_id)? .delete_topic(topic_id) - .await .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to delete topic with ID: {topic_id} in stream with ID: {stream_id}"))?; + let stream = self.get_stream(stream_id).with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to get mutable reference to stream with ID: {stream_id}") + })?; + topic + .delete() + .await + .with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - failed to delete topic: {topic}") + }) + .map_err(|_| IggyError::CannotDeleteTopic(topic.topic_id, stream.stream_id))?; self.metrics.decrement_topics(1); self.metrics diff --git a/core/server/src/streaming/streams/topics.rs b/core/server/src/streaming/streams/topics.rs index 6a201c7a..7833efa2 100644 --- a/core/server/src/streaming/streams/topics.rs +++ b/core/server/src/streaming/streams/topics.rs @@ -36,7 +36,7 @@ impl Stream { } #[allow(clippy::too_many_arguments)] - pub async fn create_topic( + pub fn create_topic( &mut self, topic_id: Option<u32>, name: &str, @@ -94,16 +94,13 @@ impl Stream { replication_factor, )?; - topic.persist().await.with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to persist topic: {topic}") - })?; info!("Created topic {}", topic); self.topics_ids.insert(name.to_owned(), id); self.topics.insert(id, topic); Ok((id, partition_ids)) } - pub async fn update_topic( + pub fn update_topic( &mut self, id: &Identifier, name: &str, @@ -150,21 +147,9 @@ impl Stream { topic.name = name.to_owned(); topic.message_expiry = message_expiry; topic.compression_algorithm = compression_algorithm; - for partition in topic.partitions.values_mut() { - let mut partition = partition.write().await; - partition.message_expiry = message_expiry; - for segment in partition.segments.iter_mut() { - segment.update_message_expiry(message_expiry); - } - } topic.max_topic_size = max_topic_size; topic.replication_factor = replication_factor; - topic.persist().await.with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to persist topic: {topic}") - })?; - info!("Updated topic: {topic}"); } - Ok(()) } @@ -253,7 +238,7 @@ impl Stream { .ok_or(IggyError::TopicIdNotFound(topic_id, self.stream_id)) } - pub async fn delete_topic(&mut self, id: &Identifier) -> Result<Topic, IggyError> { + pub fn delete_topic(&mut self, id: &Identifier) -> Result<Topic, IggyError> { let topic = self.remove_topic(id).with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to remove topic with id: {id}") })?; @@ -263,13 +248,6 @@ impl Stream { self.current_topic_id.store(topic_id, Ordering::SeqCst); } - topic - .delete() - .await - .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to delete topic: {topic}") - }) - .map_err(|_| IggyError::CannotDeleteTopic(topic.topic_id, self.stream_id))?; Ok(topic) } } @@ -319,7 +297,6 @@ mod tests { max_topic_size, 1, ) - .await .unwrap(); let topic = stream.get_topic(&Identifier::numeric(topic_id).unwrap()); diff --git a/core/server/src/streaming/topics/partitions.rs b/core/server/src/streaming/topics/partitions.rs index b9e56331..cbbe15e5 100644 --- a/core/server/src/streaming/topics/partitions.rs +++ b/core/server/src/streaming/topics/partitions.rs @@ -69,29 +69,19 @@ impl Topic { Ok(partition_ids) } - pub async fn add_persisted_partitions(&mut self, count: u32) -> Result<Vec<u32>, IggyError> { + pub fn add_persisted_partitions(&mut self, count: u32) -> Result<Vec<u32>, IggyError> { let partition_ids = self.add_partitions(count).with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to add partitions, count: {count}") })?; - for partition_id in &partition_ids { - let partition = self.partitions.get(partition_id).unwrap(); - let mut partition = partition.write().await; - partition.persist().await.with_error_context(|error| { - format!( - "{COMPONENT} (error: {error}) - failed to persist partition with id: {}", - partition.partition_id - ) - })?; - } Ok(partition_ids) } - pub async fn delete_persisted_partitions( + pub fn delete_persisted_partitions( &mut self, mut count: u32, - ) -> Result<Option<DeletedPartitions>, IggyError> { + ) -> Result<Vec<IggyRwLock<Partition>>, IggyError> { if count == 0 { - return Ok(None); + return Ok(vec![]); } let current_partitions_count = self.partitions.len() as u32; @@ -99,25 +89,12 @@ impl Topic { count = current_partitions_count; } - let mut segments_count = 0; - let mut messages_count = 0; + let mut partitions = Vec::with_capacity(count as usize); for partition_id in current_partitions_count - count + 1..=current_partitions_count { let partition = self.partitions.remove(&partition_id).unwrap(); - let mut partition = partition.write().await; - let partition_messages_count = partition.get_messages_count(); - segments_count += partition.get_segments_count(); - messages_count += partition_messages_count; - partition.delete().await.with_error_context(|error| { - format!( - "{COMPONENT} (error: {error}) - failed to delete partition with ID: {partition_id} in topic with ID: {}", - self.topic_id - ) - })?; + partitions.push(partition); } - Ok(Some(DeletedPartitions { - segments_count, - messages_count, - })) + Ok(partitions) } }
