This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new ae78182b feat(io_uring): fix open descriptor when creating partition
on origin shard (#2016)
ae78182b is described below
commit ae78182bf10a9271f11d4e3fa7cc838c26658282
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Wed Jul 16 13:30:38 2025 +0200
feat(io_uring): fix open descriptor when creating partition on origin shard
(#2016)
---
core/integration/tests/streaming/get_by_offset.rs | 1 +
core/integration/tests/streaming/get_by_timestamp.rs | 1 +
core/integration/tests/streaming/messages.rs | 2 ++
core/integration/tests/streaming/partition.rs | 4 ++++
core/integration/tests/streaming/segment.rs | 6 ++++++
core/integration/tests/streaming/topic_messages.rs | 6 ++++++
.../src/binary/handlers/partitions/create_partitions_handler.rs | 5 +++--
core/server/src/streaming/partitions/storage.rs | 6 ------
8 files changed, 23 insertions(+), 8 deletions(-)
diff --git a/core/integration/tests/streaming/get_by_offset.rs
b/core/integration/tests/streaming/get_by_offset.rs
index a2a1ee8a..7f104858 100644
--- a/core/integration/tests/streaming/get_by_offset.rs
+++ b/core/integration/tests/streaming/get_by_offset.rs
@@ -131,6 +131,7 @@ async fn test_get_messages_by_offset(
setup.create_partitions_directory(stream_id, topic_id).await;
partition.persist().await.unwrap();
+ partition.open().await.unwrap();
let mut all_messages = Vec::with_capacity(total_messages_count as usize);
diff --git a/core/integration/tests/streaming/get_by_timestamp.rs
b/core/integration/tests/streaming/get_by_timestamp.rs
index 4013e37b..f0a952ec 100644
--- a/core/integration/tests/streaming/get_by_timestamp.rs
+++ b/core/integration/tests/streaming/get_by_timestamp.rs
@@ -132,6 +132,7 @@ async fn test_get_messages_by_timestamp(
setup.create_partitions_directory(stream_id, topic_id).await;
partition.persist().await.unwrap();
+ partition.open().await.unwrap();
let mut all_messages = Vec::with_capacity(total_messages_count as usize);
diff --git a/core/integration/tests/streaming/messages.rs
b/core/integration/tests/streaming/messages.rs
index b6feecec..3561dd7b 100644
--- a/core/integration/tests/streaming/messages.rs
+++ b/core/integration/tests/streaming/messages.rs
@@ -59,6 +59,8 @@ async fn
should_persist_messages_and_then_load_them_from_disk() {
Arc::new(AtomicU32::new(0)),
IggyTimestamp::now(),
);
+ partition.persist().await.unwrap();
+ partition.open().await.unwrap();
let mut messages = Vec::with_capacity(messages_count as usize);
let mut appended_messages = Vec::with_capacity(messages_count as usize);
diff --git a/core/integration/tests/streaming/partition.rs
b/core/integration/tests/streaming/partition.rs
index 850e51db..828532f9 100644
--- a/core/integration/tests/streaming/partition.rs
+++ b/core/integration/tests/streaming/partition.rs
@@ -52,6 +52,7 @@ async fn should_persist_partition_with_segment() {
);
partition.persist().await.unwrap();
+ partition.open().await.unwrap();
assert_persisted_partition(&partition.partition_path,
with_segment).await;
}
@@ -82,6 +83,7 @@ async fn should_load_existing_partition_from_disk() {
IggyTimestamp::now(),
);
partition.persist().await.unwrap();
+ partition.open().await.unwrap();
assert_persisted_partition(&partition.partition_path,
with_segment).await;
let now = IggyTimestamp::now();
@@ -150,6 +152,7 @@ async fn should_delete_existing_partition_from_disk() {
IggyTimestamp::now(),
);
partition.persist().await.unwrap();
+ partition.open().await.unwrap();
assert_persisted_partition(&partition.partition_path,
with_segment).await;
partition.delete().await.unwrap();
@@ -183,6 +186,7 @@ async fn should_purge_existing_partition_on_disk() {
IggyTimestamp::now(),
);
partition.persist().await.unwrap();
+ partition.open().await.unwrap();
assert_persisted_partition(&partition.partition_path,
with_segment).await;
let messages = create_messages();
let messages_count = messages.len() as u32;
diff --git a/core/integration/tests/streaming/segment.rs
b/core/integration/tests/streaming/segment.rs
index aa73d392..0ee68893 100644
--- a/core/integration/tests/streaming/segment.rs
+++ b/core/integration/tests/streaming/segment.rs
@@ -373,6 +373,12 @@ async fn should_delete_persisted_segments() -> Result<(),
Box<dyn std::error::Er
let stream = shard.find_stream(&session, &stream_id)?;
let topic = shard.find_topic(&session, &stream, &topic_id)?;
let partitions = topic.get_partitions();
+ for partition in &partitions {
+ let mut partition_lock = partition.write().await;
+ partition_lock.persist().await.unwrap();
+ partition_lock.open().await.unwrap();
+ drop(partition_lock);
+ }
let partition = partitions
.first()
.ok_or(IggyError::Error)
diff --git a/core/integration/tests/streaming/topic_messages.rs
b/core/integration/tests/streaming/topic_messages.rs
index a1dad0f3..2e46280a 100644
--- a/core/integration/tests/streaming/topic_messages.rs
+++ b/core/integration/tests/streaming/topic_messages.rs
@@ -19,6 +19,7 @@
use crate::streaming::common::test_setup::TestSetup;
use bytes::Bytes;
use iggy::prelude::*;
+use iggy_common::locking::IggyRwLockFn;
use server::configs::system::SystemConfig;
use server::streaming::polling_consumer::PollingConsumer;
use server::streaming::segments::IggyMessagesBatchMut;
@@ -225,6 +226,11 @@ async fn init_topic(setup: &TestSetup, partitions_count:
u32) -> Topic {
.unwrap();
let topic = created_topic_info.topic;
topic.persist().await.unwrap();
+ for partition in topic.get_partitions() {
+ let mut partition = partition.write().await;
+ partition.persist().await.unwrap();
+ partition.open().await.unwrap();
+ }
topic
}
diff --git
a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
index 40c21c64..c25944d4 100644
--- a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
@@ -80,10 +80,11 @@ impl ServerCommandHandler for CreatePartitions {
.collect::<Vec<_>>();
// Open partition and segments for that particular shard.
for (ns, shard_info) in records.iter() {
+ let partition = topic.get_partition(ns.partition_id).unwrap();
+ let mut partition = partition.write().await;
+ partition.persist().await.unwrap();
if shard_info.id() == shard.id {
let partition_id = ns.partition_id;
- let partition = topic.get_partition(partition_id)?;
- let mut partition = partition.write().await;
partition.open().await.with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - failed to open
partition with ID: {partition_id} in topic with ID: {topic_id} for stream with
ID: {stream_id}"
diff --git a/core/server/src/streaming/partitions/storage.rs
b/core/server/src/streaming/partitions/storage.rs
index 0b3a65d3..6b5205e5 100644
--- a/core/server/src/streaming/partitions/storage.rs
+++ b/core/server/src/streaming/partitions/storage.rs
@@ -323,12 +323,6 @@ impl PartitionStorage for FilePartitionStorage {
));
}
- for segment in partition.get_segments_mut() {
- segment.open().await.with_error_context(|error| {
- format!("{COMPONENT} (error: {error}) - failed to persist
segment: {segment}",)
- })?;
- }
-
info!(
"Saved partition with start ID: {} for stream with ID: {} and
topic with ID: {}, path: {}.",
partition.partition_id,