This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch io_uring_tpc_cli_fixes
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc_cli_fixes by this push:
     new 17c9a9ca fix count
17c9a9ca is described below

commit 17c9a9ca7fd5bcb4e6623a425c9a7c42ca2763b1
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Oct 3 15:55:00 2025 +0200

    fix count
---
 .../src/binary/handlers/topics/create_topic_handler.rs   | 13 -------------
 core/server/src/shard/mod.rs                             |  2 +-
 core/server/src/slab/streams.rs                          | 16 ++++++++--------
 3 files changed, 9 insertions(+), 22 deletions(-)

diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs b/core/server/src/binary/handlers/topics/create_topic_handler.rs
index bb17cd4e..2f1ee6cd 100644
--- a/core/server/src/binary/handlers/topics/create_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs
@@ -72,13 +72,6 @@ impl ServerCommandHandler for CreateTopic {
             topic,
         };
         let responses = shard.broadcast_event_to_all_shards(event).await;
-        // Await all shard responses to ensure topic is created on all shards
-        for response in responses {
-            if let crate::shard::transmission::frame::ShardResponse::ErrorResponse(err) = response {
-                return Err(err);
-            }
-        }
-
         let partitions = shard
             .create_partitions2(
                 session,
@@ -93,12 +86,6 @@ impl ServerCommandHandler for CreateTopic {
             partitions,
         };
         let responses = shard.broadcast_event_to_all_shards(event).await;
-        // Await all shard responses to ensure partitions are initialized on all shards
-        for response in responses {
-            if let crate::shard::transmission::frame::ShardResponse::ErrorResponse(err) = response {
-                return Err(err);
-            }
-        }
         let response = shard.streams2.with_topic_by_id(
             &self.stream_id,
             &Identifier::numeric(topic_id as u32).unwrap(),
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 97bc52ed..c490da49 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -548,7 +548,7 @@ impl IggyShard {
                             partition_id,
                             |(_, _, _, offset, _, _, _)| {
                                 let current_offset = offset.load(Ordering::Relaxed);
-                                let mut requested_count = 0;
+                                let mut requested_count = count as u64;
                                 if requested_count > current_offset + 1 {
                                     requested_count = current_offset + 1
                                 }
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 6167987c..141bd303 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -607,6 +607,10 @@ impl Streams {
         let mut current_offset = offset;
 
         for idx in range {
+            if remaining_count == 0 {
+                break;
+            }
+
             let (segment_start_offset, segment_end_offset) = self.with_partition_by_id(
                 stream_id,
                 topic_id,
@@ -658,10 +662,6 @@ impl Streams {
             }
 
             batches.add_batch_set(messages);
-
-            if remaining_count == 0 {
-                break;
-            }
         }
 
         Ok(batches)
@@ -890,6 +890,10 @@ impl Streams {
         let mut batches = IggyMessagesBatchSet::empty();
 
         for idx in range {
+            if remaining_count == 0 {
+                break;
+            }
+
             let segment_end_timestamp = self.with_partition_by_id(
                 stream_id,
                 topic_id,
@@ -922,10 +926,6 @@ impl Streams {
 
             remaining_count = remaining_count.saturating_sub(messages_count);
             batches.add_batch_set(messages);
-
-            if remaining_count == 0 {
-                break;
-            }
         }
 
         Ok(batches)

Reply via email to