This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc_cli_fixes
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc_cli_fixes by this 
push:
     new 582e54c8 return first messages when no consumer offset found for 
PollingKind::Next
582e54c8 is described below

commit 582e54c83f9d5048ec21e52f3db64bad293ffc9a
Author: numminex <[email protected]>
AuthorDate: Fri Oct 3 16:25:08 2025 +0200

    return first messages when no consumer offset found for PollingKind::Next
---
 core/server/src/shard/mod.rs    | 10 +++++++---
 core/server/src/slab/streams.rs |  3 ++-
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index c490da49..00cdaa88 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -597,9 +597,11 @@ impl IggyShard {
                             ),
                         };
 
-                        let Some(consumer_offset) = consumer_offset else {
-                            return 
Err(IggyError::ConsumerOffsetNotFound(consumer_id));
-                        };
+                        let batches = if consumer_offset.is_none() {
+                            let batches = 
self.streams2.get_messages_by_offset(&stream_id, &topic_id, partition_id, 0, 
count).await?;
+                            Ok(batches)
+                        } else {
+                            let consumer_offset = consumer_offset.unwrap();
                         let offset = consumer_offset + 1;
                         trace!(
                             "Getting next messages for consumer id: {} for 
partition: {} from offset: {}...",
@@ -616,6 +618,8 @@ impl IggyShard {
                             )
                             .await?;
                         Ok(batches)
+                        };
+                        batches
                     }
                 }?;
 
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 141bd303..729e85a0 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -381,7 +381,8 @@ impl MainOps for Streams {
                 };
 
                 let Some(consumer_offset) = consumer_offset else {
-                    return Err(IggyError::ConsumerOffsetNotFound(consumer_id));
+                    let batches = self.get_messages_by_offset(stream_id, 
topic_id, partition_id, 0, count).await?;
+                    return Ok((metadata, batches));
                 };
                 let offset = consumer_offset + 1;
                 trace!(

Reply via email to