This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc_cli_fixes
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc_cli_fixes by this
push:
new 582e54c8 return first messages when no consumer offset found for
PollingKind::Next
582e54c8 is described below
commit 582e54c83f9d5048ec21e52f3db64bad293ffc9a
Author: numminex <[email protected]>
AuthorDate: Fri Oct 3 16:25:08 2025 +0200
return first messages when no consumer offset found for PollingKind::Next
---
core/server/src/shard/mod.rs | 10 +++++++---
core/server/src/slab/streams.rs | 3 ++-
2 files changed, 9 insertions(+), 4 deletions(-)
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index c490da49..00cdaa88 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -597,9 +597,11 @@ impl IggyShard {
),
};
- let Some(consumer_offset) = consumer_offset else {
- return
Err(IggyError::ConsumerOffsetNotFound(consumer_id));
- };
+ let batches = if consumer_offset.is_none() {
+ let batches =
self.streams2.get_messages_by_offset(&stream_id, &topic_id, partition_id, 0,
count).await?;
+ Ok(batches)
+ } else {
+ let consumer_offset = consumer_offset.unwrap();
let offset = consumer_offset + 1;
trace!(
"Getting next messages for consumer id: {} for
partition: {} from offset: {}...",
@@ -616,6 +618,8 @@ impl IggyShard {
)
.await?;
Ok(batches)
+ };
+ batches
}
}?;
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 141bd303..729e85a0 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -381,7 +381,8 @@ impl MainOps for Streams {
};
let Some(consumer_offset) = consumer_offset else {
- return Err(IggyError::ConsumerOffsetNotFound(consumer_id));
+ let batches = self.get_messages_by_offset(stream_id,
topic_id, partition_id, 0, count).await?;
+ return Ok((metadata, batches));
};
let offset = consumer_offset + 1;
trace!(