This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch fix_integration_error in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 1e02350d2e26774a76201d43fa61016030267d31 Author: numminex <[email protected]> AuthorDate: Wed Jul 16 10:44:54 2025 +0200 feat(io_uring): fix descriptor creation for all partitions on origin shard --- core/integration/tests/streaming/get_by_offset.rs | 1 + core/integration/tests/streaming/get_by_timestamp.rs | 1 + core/integration/tests/streaming/messages.rs | 2 ++ core/integration/tests/streaming/partition.rs | 4 ++++ core/integration/tests/streaming/segment.rs | 6 ++++++ core/integration/tests/streaming/topic_messages.rs | 6 ++++++ .../src/binary/handlers/partitions/create_partitions_handler.rs | 5 +++-- core/server/src/streaming/partitions/storage.rs | 6 ------ 8 files changed, 23 insertions(+), 8 deletions(-) diff --git a/core/integration/tests/streaming/get_by_offset.rs b/core/integration/tests/streaming/get_by_offset.rs index a2a1ee8a..7f104858 100644 --- a/core/integration/tests/streaming/get_by_offset.rs +++ b/core/integration/tests/streaming/get_by_offset.rs @@ -131,6 +131,7 @@ async fn test_get_messages_by_offset( setup.create_partitions_directory(stream_id, topic_id).await; partition.persist().await.unwrap(); + partition.open().await.unwrap(); let mut all_messages = Vec::with_capacity(total_messages_count as usize); diff --git a/core/integration/tests/streaming/get_by_timestamp.rs b/core/integration/tests/streaming/get_by_timestamp.rs index 4013e37b..f0a952ec 100644 --- a/core/integration/tests/streaming/get_by_timestamp.rs +++ b/core/integration/tests/streaming/get_by_timestamp.rs @@ -132,6 +132,7 @@ async fn test_get_messages_by_timestamp( setup.create_partitions_directory(stream_id, topic_id).await; partition.persist().await.unwrap(); + partition.open().await.unwrap(); let mut all_messages = Vec::with_capacity(total_messages_count as usize); diff --git a/core/integration/tests/streaming/messages.rs b/core/integration/tests/streaming/messages.rs index b6feecec..3561dd7b 100644 --- a/core/integration/tests/streaming/messages.rs +++ b/core/integration/tests/streaming/messages.rs @@ -59,6 +59,8 @@ async fn should_persist_messages_and_then_load_them_from_disk() { Arc::new(AtomicU32::new(0)), IggyTimestamp::now(), ); + partition.persist().await.unwrap(); + partition.open().await.unwrap(); let mut messages = Vec::with_capacity(messages_count as usize); let mut appended_messages = Vec::with_capacity(messages_count as usize); diff --git a/core/integration/tests/streaming/partition.rs b/core/integration/tests/streaming/partition.rs index 850e51db..828532f9 100644 --- a/core/integration/tests/streaming/partition.rs +++ b/core/integration/tests/streaming/partition.rs @@ -52,6 +52,7 @@ async fn should_persist_partition_with_segment() { ); partition.persist().await.unwrap(); + partition.open().await.unwrap(); assert_persisted_partition(&partition.partition_path, with_segment).await; } @@ -82,6 +83,7 @@ async fn should_load_existing_partition_from_disk() { IggyTimestamp::now(), ); partition.persist().await.unwrap(); + partition.open().await.unwrap(); assert_persisted_partition(&partition.partition_path, with_segment).await; let now = IggyTimestamp::now(); @@ -150,6 +152,7 @@ async fn should_delete_existing_partition_from_disk() { IggyTimestamp::now(), ); partition.persist().await.unwrap(); + partition.open().await.unwrap(); assert_persisted_partition(&partition.partition_path, with_segment).await; partition.delete().await.unwrap(); @@ -183,6 +186,7 @@ async fn should_purge_existing_partition_on_disk() { IggyTimestamp::now(), ); partition.persist().await.unwrap(); + partition.open().await.unwrap(); assert_persisted_partition(&partition.partition_path, with_segment).await; let messages = create_messages(); let messages_count = messages.len() as u32; diff --git a/core/integration/tests/streaming/segment.rs b/core/integration/tests/streaming/segment.rs index aa73d392..0ee68893 100644 --- a/core/integration/tests/streaming/segment.rs +++ b/core/integration/tests/streaming/segment.rs @@ -373,6 +373,12 @@ async fn should_delete_persisted_segments() -> Result<(), Box<dyn std::error::Er let stream = shard.find_stream(&session, &stream_id)?; let topic = shard.find_topic(&session, &stream, &topic_id)?; let partitions = topic.get_partitions(); + for partition in &partitions { + let mut partition_lock = partition.write().await; + partition_lock.persist().await.unwrap(); + partition_lock.open().await.unwrap(); + drop(partition_lock); + } let partition = partitions .first() .ok_or(IggyError::Error) diff --git a/core/integration/tests/streaming/topic_messages.rs b/core/integration/tests/streaming/topic_messages.rs index a1dad0f3..2e46280a 100644 --- a/core/integration/tests/streaming/topic_messages.rs +++ b/core/integration/tests/streaming/topic_messages.rs @@ -19,6 +19,7 @@ use crate::streaming::common::test_setup::TestSetup; use bytes::Bytes; use iggy::prelude::*; +use iggy_common::locking::IggyRwLockFn; use server::configs::system::SystemConfig; use server::streaming::polling_consumer::PollingConsumer; use server::streaming::segments::IggyMessagesBatchMut; @@ -225,6 +226,11 @@ async fn init_topic(setup: &TestSetup, partitions_count: u32) -> Topic { .unwrap(); let topic = created_topic_info.topic; topic.persist().await.unwrap(); + for partition in topic.get_partitions() { + let mut partition = partition.write().await; + partition.persist().await.unwrap(); + partition.open().await.unwrap(); + } topic } diff --git a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs index 40c21c64..c25944d4 100644 --- a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs +++ b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs @@ -80,10 +80,11 @@ impl ServerCommandHandler for CreatePartitions { .collect::<Vec<_>>(); // Open partition and segments for that particular shard. for (ns, shard_info) in records.iter() { + let partition = topic.get_partition(ns.partition_id).unwrap(); + let mut partition = partition.write().await; + partition.persist().await.unwrap(); if shard_info.id() == shard.id { let partition_id = ns.partition_id; - let partition = topic.get_partition(partition_id)?; - let mut partition = partition.write().await; partition.open().await.with_error_context(|error| { format!( "{COMPONENT} (error: {error}) - failed to open partition with ID: {partition_id} in topic with ID: {topic_id} for stream with ID: {stream_id}" diff --git a/core/server/src/streaming/partitions/storage.rs b/core/server/src/streaming/partitions/storage.rs index 0b3a65d3..6b5205e5 100644 --- a/core/server/src/streaming/partitions/storage.rs +++ b/core/server/src/streaming/partitions/storage.rs @@ -323,12 +323,6 @@ impl PartitionStorage for FilePartitionStorage { )); } - for segment in partition.get_segments_mut() { - segment.open().await.with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to persist segment: {segment}",) - })?; - } - info!( "Saved partition with start ID: {} for stream with ID: {} and topic with ID: {}, path: {}.", partition.partition_id,
