This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch io_uring_tpc_cli_fixes
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc_cli_fixes by this
push:
new 17c9a9ca fix count
17c9a9ca is described below
commit 17c9a9ca7fd5bcb4e6623a425c9a7c42ca2763b1
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Oct 3 15:55:00 2025 +0200
fix count
---
.../src/binary/handlers/topics/create_topic_handler.rs | 13 -------------
core/server/src/shard/mod.rs | 2 +-
core/server/src/slab/streams.rs | 16 ++++++++--------
3 files changed, 9 insertions(+), 22 deletions(-)
diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs
b/core/server/src/binary/handlers/topics/create_topic_handler.rs
index bb17cd4e..2f1ee6cd 100644
--- a/core/server/src/binary/handlers/topics/create_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs
@@ -72,13 +72,6 @@ impl ServerCommandHandler for CreateTopic {
topic,
};
let responses = shard.broadcast_event_to_all_shards(event).await;
- // Await all shard responses to ensure topic is created on all shards
- for response in responses {
- if let
crate::shard::transmission::frame::ShardResponse::ErrorResponse(err) = response
{
- return Err(err);
- }
- }
-
let partitions = shard
.create_partitions2(
session,
@@ -93,12 +86,6 @@ impl ServerCommandHandler for CreateTopic {
partitions,
};
let responses = shard.broadcast_event_to_all_shards(event).await;
- // Await all shard responses to ensure partitions are initialized on
all shards
- for response in responses {
- if let
crate::shard::transmission::frame::ShardResponse::ErrorResponse(err) = response
{
- return Err(err);
- }
- }
let response = shard.streams2.with_topic_by_id(
&self.stream_id,
&Identifier::numeric(topic_id as u32).unwrap(),
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 97bc52ed..c490da49 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -548,7 +548,7 @@ impl IggyShard {
partition_id,
|(_, _, _, offset, _, _, _)| {
let current_offset =
offset.load(Ordering::Relaxed);
- let mut requested_count = 0;
+ let mut requested_count = count as u64;
if requested_count > current_offset + 1 {
requested_count = current_offset + 1
}
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 6167987c..141bd303 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -607,6 +607,10 @@ impl Streams {
let mut current_offset = offset;
for idx in range {
+ if remaining_count == 0 {
+ break;
+ }
+
let (segment_start_offset, segment_end_offset) =
self.with_partition_by_id(
stream_id,
topic_id,
@@ -658,10 +662,6 @@ impl Streams {
}
batches.add_batch_set(messages);
-
- if remaining_count == 0 {
- break;
- }
}
Ok(batches)
@@ -890,6 +890,10 @@ impl Streams {
let mut batches = IggyMessagesBatchSet::empty();
for idx in range {
+ if remaining_count == 0 {
+ break;
+ }
+
let segment_end_timestamp = self.with_partition_by_id(
stream_id,
topic_id,
@@ -922,10 +926,6 @@ impl Streams {
remaining_count = remaining_count.saturating_sub(messages_count);
batches.add_batch_set(messages);
-
- if remaining_count == 0 {
- break;
- }
}
Ok(batches)