This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch fix_integration_error
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 1e02350d2e26774a76201d43fa61016030267d31
Author: numminex <[email protected]>
AuthorDate: Wed Jul 16 10:44:54 2025 +0200

    feat(io_uring): fix descriptor creation for all partitions on origin shard
---
 core/integration/tests/streaming/get_by_offset.rs                   | 1 +
 core/integration/tests/streaming/get_by_timestamp.rs                | 1 +
 core/integration/tests/streaming/messages.rs                        | 2 ++
 core/integration/tests/streaming/partition.rs                       | 4 ++++
 core/integration/tests/streaming/segment.rs                         | 6 ++++++
 core/integration/tests/streaming/topic_messages.rs                  | 6 ++++++
 .../src/binary/handlers/partitions/create_partitions_handler.rs     | 5 +++--
 core/server/src/streaming/partitions/storage.rs                     | 6 ------
 8 files changed, 23 insertions(+), 8 deletions(-)

diff --git a/core/integration/tests/streaming/get_by_offset.rs 
b/core/integration/tests/streaming/get_by_offset.rs
index a2a1ee8a..7f104858 100644
--- a/core/integration/tests/streaming/get_by_offset.rs
+++ b/core/integration/tests/streaming/get_by_offset.rs
@@ -131,6 +131,7 @@ async fn test_get_messages_by_offset(
 
     setup.create_partitions_directory(stream_id, topic_id).await;
     partition.persist().await.unwrap();
+    partition.open().await.unwrap();
 
     let mut all_messages = Vec::with_capacity(total_messages_count as usize);
 
diff --git a/core/integration/tests/streaming/get_by_timestamp.rs 
b/core/integration/tests/streaming/get_by_timestamp.rs
index 4013e37b..f0a952ec 100644
--- a/core/integration/tests/streaming/get_by_timestamp.rs
+++ b/core/integration/tests/streaming/get_by_timestamp.rs
@@ -132,6 +132,7 @@ async fn test_get_messages_by_timestamp(
 
     setup.create_partitions_directory(stream_id, topic_id).await;
     partition.persist().await.unwrap();
+    partition.open().await.unwrap();
 
     let mut all_messages = Vec::with_capacity(total_messages_count as usize);
 
diff --git a/core/integration/tests/streaming/messages.rs 
b/core/integration/tests/streaming/messages.rs
index b6feecec..3561dd7b 100644
--- a/core/integration/tests/streaming/messages.rs
+++ b/core/integration/tests/streaming/messages.rs
@@ -59,6 +59,8 @@ async fn 
should_persist_messages_and_then_load_them_from_disk() {
         Arc::new(AtomicU32::new(0)),
         IggyTimestamp::now(),
     );
+    partition.persist().await.unwrap();
+    partition.open().await.unwrap();
 
     let mut messages = Vec::with_capacity(messages_count as usize);
     let mut appended_messages = Vec::with_capacity(messages_count as usize);
diff --git a/core/integration/tests/streaming/partition.rs 
b/core/integration/tests/streaming/partition.rs
index 850e51db..828532f9 100644
--- a/core/integration/tests/streaming/partition.rs
+++ b/core/integration/tests/streaming/partition.rs
@@ -52,6 +52,7 @@ async fn should_persist_partition_with_segment() {
         );
 
         partition.persist().await.unwrap();
+        partition.open().await.unwrap();
 
         assert_persisted_partition(&partition.partition_path, 
with_segment).await;
     }
@@ -82,6 +83,7 @@ async fn should_load_existing_partition_from_disk() {
             IggyTimestamp::now(),
         );
         partition.persist().await.unwrap();
+        partition.open().await.unwrap();
         assert_persisted_partition(&partition.partition_path, 
with_segment).await;
 
         let now = IggyTimestamp::now();
@@ -150,6 +152,7 @@ async fn should_delete_existing_partition_from_disk() {
             IggyTimestamp::now(),
         );
         partition.persist().await.unwrap();
+        partition.open().await.unwrap();
         assert_persisted_partition(&partition.partition_path, 
with_segment).await;
 
         partition.delete().await.unwrap();
@@ -183,6 +186,7 @@ async fn should_purge_existing_partition_on_disk() {
             IggyTimestamp::now(),
         );
         partition.persist().await.unwrap();
+        partition.open().await.unwrap();
         assert_persisted_partition(&partition.partition_path, 
with_segment).await;
         let messages = create_messages();
         let messages_count = messages.len() as u32;
diff --git a/core/integration/tests/streaming/segment.rs 
b/core/integration/tests/streaming/segment.rs
index aa73d392..0ee68893 100644
--- a/core/integration/tests/streaming/segment.rs
+++ b/core/integration/tests/streaming/segment.rs
@@ -373,6 +373,12 @@ async fn should_delete_persisted_segments() -> Result<(), 
Box<dyn std::error::Er
     let stream = shard.find_stream(&session, &stream_id)?;
     let topic = shard.find_topic(&session, &stream, &topic_id)?;
     let partitions = topic.get_partitions();
+    for partition in &partitions {
+        let mut partition_lock = partition.write().await;
+        partition_lock.persist().await.unwrap();
+        partition_lock.open().await.unwrap();
+        drop(partition_lock);
+    }
     let partition = partitions
         .first()
         .ok_or(IggyError::Error)
diff --git a/core/integration/tests/streaming/topic_messages.rs 
b/core/integration/tests/streaming/topic_messages.rs
index a1dad0f3..2e46280a 100644
--- a/core/integration/tests/streaming/topic_messages.rs
+++ b/core/integration/tests/streaming/topic_messages.rs
@@ -19,6 +19,7 @@
 use crate::streaming::common::test_setup::TestSetup;
 use bytes::Bytes;
 use iggy::prelude::*;
+use iggy_common::locking::IggyRwLockFn;
 use server::configs::system::SystemConfig;
 use server::streaming::polling_consumer::PollingConsumer;
 use server::streaming::segments::IggyMessagesBatchMut;
@@ -225,6 +226,11 @@ async fn init_topic(setup: &TestSetup, partitions_count: 
u32) -> Topic {
     .unwrap();
     let topic = created_topic_info.topic;
     topic.persist().await.unwrap();
+    for partition in topic.get_partitions() {
+        let mut partition = partition.write().await;
+        partition.persist().await.unwrap();
+        partition.open().await.unwrap();
+    }
     topic
 }
 
diff --git 
a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs 
b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
index 40c21c64..c25944d4 100644
--- a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
@@ -80,10 +80,11 @@ impl ServerCommandHandler for CreatePartitions {
             .collect::<Vec<_>>();
         // Open partition and segments for that particular shard.
         for (ns, shard_info) in records.iter() {
+            let partition = topic.get_partition(ns.partition_id).unwrap();
+            let mut partition = partition.write().await;
+            partition.persist().await.unwrap();
             if shard_info.id() == shard.id {
                 let partition_id = ns.partition_id;
-                let partition = topic.get_partition(partition_id)?;
-                let mut partition = partition.write().await;
                 partition.open().await.with_error_context(|error| {
                     format!(
                         "{COMPONENT} (error: {error}) - failed to open 
partition with ID: {partition_id} in topic with ID: {topic_id} for stream with 
ID: {stream_id}"
diff --git a/core/server/src/streaming/partitions/storage.rs 
b/core/server/src/streaming/partitions/storage.rs
index 0b3a65d3..6b5205e5 100644
--- a/core/server/src/streaming/partitions/storage.rs
+++ b/core/server/src/streaming/partitions/storage.rs
@@ -323,12 +323,6 @@ impl PartitionStorage for FilePartitionStorage {
             ));
         }
 
-        for segment in partition.get_segments_mut() {
-            segment.open().await.with_error_context(|error| {
-                format!("{COMPONENT} (error: {error}) - failed to persist 
segment: {segment}",)
-            })?;
-        }
-
         info!(
             "Saved partition with start ID: {} for stream with ID: {} and 
topic with ID: {}, path: {}.",
             partition.partition_id,

Reply via email to