This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new a1f51fdb3 feat(partitions): add poll_messages support to simulator (#2960)
a1f51fdb3 is described below

commit a1f51fdb350f559ea524b32a7f192a7b58561115
Author: Krishna Vishal <[email protected]>
AuthorDate: Fri Mar 20 15:03:10 2026 +0530

    feat(partitions): add poll_messages support to simulator (#2960)
---
 core/partitions/src/iggy_partition.rs  | 156 ++++++++++++++++++++++++++++++++-
 core/partitions/src/iggy_partitions.rs |  30 ++++++-
 core/simulator/src/client.rs           |  46 +++++++---
 core/simulator/src/lib.rs              |  42 +++++++++
 core/simulator/src/main.rs             |  58 ++++++++++++
 5 files changed, 316 insertions(+), 16 deletions(-)

diff --git a/core/partitions/src/iggy_partition.rs b/core/partitions/src/iggy_partition.rs
index 65fb470ae..5addf1516 100644
--- a/core/partitions/src/iggy_partition.rs
+++ b/core/partitions/src/iggy_partition.rs
@@ -15,12 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::journal::{PartitionJournal, PartitionJournalMemStorage};
+use crate::journal::{
+    MessageLookup, PartitionJournal, PartitionJournalMemStorage, QueryableJournal,
+};
 use crate::log::SegmentedLog;
-use crate::{AppendResult, Partition, decode_send_messages_batch};
+use crate::{
+    AppendResult, Partition, PartitionOffsets, PollingArgs, PollingConsumer,
+    decode_send_messages_batch,
+};
 use iggy_common::{
-    ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, IggyError, IggyMessagesBatchMut,
-    IggyTimestamp, PartitionStats,
+    ConsumerGroupId, ConsumerGroupOffsets, ConsumerKind, ConsumerOffset, ConsumerOffsets,
+    IggyByteSize, IggyError, IggyMessagesBatchMut, IggyMessagesBatchSet, IggyTimestamp,
+    PartitionStats, PollingKind,
     header::{Operation, PrepareHeader},
     message::Message,
 };
@@ -28,6 +34,7 @@ use journal::Journal as _;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicU64, Ordering};
 use tokio::sync::Mutex as TokioMutex;
+use tracing::warn;
 
 // This struct aliases in terms of the code contained the `LocalPartition from `core/server/src/streaming/partitions/local_partition.rs`.
 #[derive(Debug)]
@@ -165,4 +172,145 @@ impl Partition for IggyPartition {
             batch_messages_count,
         ))
     }
+
+    async fn poll_messages(
+        &self,
+        consumer: PollingConsumer,
+        args: PollingArgs,
+    ) -> Result<IggyMessagesBatchSet, IggyError> {
+        if !self.should_increment_offset || args.count == 0 {
+            return Ok(IggyMessagesBatchSet::empty());
+        }
+
+        let committed_offset = self.offset.load(Ordering::Relaxed);
+
+        let start_offset = match args.strategy.kind {
+            PollingKind::Offset => args.strategy.value,
+            PollingKind::First => 0,
+            PollingKind::Last => committed_offset.saturating_sub(u64::from(args.count) - 1),
+            PollingKind::Timestamp => {
+                let result = self
+                    .log
+                    .journal()
+                    .inner
+                    .get(&MessageLookup::Timestamp {
+                        timestamp: args.strategy.value,
+                        count: args.count,
+                    })
+                    .await;
+                let batch_set = result.unwrap_or_else(IggyMessagesBatchSet::empty);
+                if let Some(first) = batch_set.first_offset() {
+                    if first > committed_offset {
+                        return Ok(IggyMessagesBatchSet::empty());
+                    }
+                    let max_count = u32::try_from(committed_offset - first + 1).unwrap_or(u32::MAX);
+                    return Ok(batch_set.get_by_offset(first, batch_set.count().min(max_count)));
+                }
+                return Ok(batch_set);
+            }
+            PollingKind::Next => self
+                .get_consumer_offset(consumer)
+                .map_or(0, |offset| offset + 1),
+        };
+
+        if start_offset > committed_offset {
+            return Ok(IggyMessagesBatchSet::empty());
+        }
+
+        let max_count = u32::try_from(committed_offset - start_offset + 1).unwrap_or(u32::MAX);
+        let count = args.count.min(max_count);
+
+        let result = self
+            .log
+            .journal()
+            .inner
+            .get(&MessageLookup::Offset {
+                offset: start_offset,
+                count,
+            })
+            .await;
+
+        let batch_set = result.unwrap_or_else(IggyMessagesBatchSet::empty);
+
+        if args.auto_commit && !batch_set.is_empty() {
+            let last_offset = start_offset + u64::from(batch_set.count()) - 1;
+            if let Err(err) = self.store_consumer_offset(consumer, last_offset) {
+                // warning for now.
+                warn!(
+                    consumer = ?consumer,
+                    last_offset,
+                    %err,
+                    "poll_messages: failed to store consumer offset"
+                );
+            }
+        }
+
+        Ok(batch_set)
+    }
+
+    #[allow(clippy::cast_possible_truncation)]
+    fn store_consumer_offset(
+        &self,
+        consumer: PollingConsumer,
+        offset: u64,
+    ) -> Result<(), IggyError> {
+        match consumer {
+            PollingConsumer::Consumer(id, _) => {
+                let guard = self.consumer_offsets.pin();
+                if let Some(existing) = guard.get(&id) {
+                    existing.offset.store(offset, Ordering::Relaxed);
+                } else {
+                    guard.insert(
+                        id,
+                        ConsumerOffset::new(
+                            ConsumerKind::Consumer,
+                            id as u32,
+                            offset,
+                            String::new(),
+                        ),
+                    );
+                }
+            }
+            PollingConsumer::ConsumerGroup(group_id, _) => {
+                let guard = self.consumer_group_offsets.pin();
+                let key = ConsumerGroupId(group_id);
+                if let Some(existing) = guard.get(&key) {
+                    existing.offset.store(offset, Ordering::Relaxed);
+                } else {
+                    guard.insert(
+                        key,
+                        ConsumerOffset::new(
+                            ConsumerKind::ConsumerGroup,
+                            group_id as u32,
+                            offset,
+                            String::new(),
+                        ),
+                    );
+                }
+            }
+        }
+        Ok(())
+    }
+
+    fn get_consumer_offset(&self, consumer: PollingConsumer) -> Option<u64> {
+        match consumer {
+            PollingConsumer::Consumer(id, _) => self
+                .consumer_offsets
+                .pin()
+                .get(&id)
+                .map(|co| co.offset.load(Ordering::Relaxed)),
+            PollingConsumer::ConsumerGroup(group_id, _) => self
+                .consumer_group_offsets
+                .pin()
+                .get(&ConsumerGroupId(group_id))
+                .map(|co| co.offset.load(Ordering::Relaxed)),
+        }
+    }
+
+    fn offsets(&self) -> PartitionOffsets {
+        PartitionOffsets::new(
+            self.offset.load(Ordering::Relaxed),
+            self.dirty_offset.load(Ordering::Relaxed),
+        )
+    }
 }
diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs
index 931e804a7..d219c905e 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -19,6 +19,7 @@
 
 use crate::IggyPartition;
 use crate::Partition;
+use crate::PollingConsumer;
 use crate::log::JournalInfo;
 use crate::types::PartitionsConfig;
 use consensus::PlaneIdentity;
@@ -562,11 +563,36 @@ where
                 );
             }
             Operation::StoreConsumerOffset => {
-                // TODO: Deserialize consumer offset from prepare body
-                // and store in partition's consumer_offsets.
+                let body = message.body_bytes();
+                let body = body.as_ref();
+                let consumer_kind = body[0];
+                let consumer_id = u32::from_le_bytes(body[1..5].try_into().unwrap()) as usize;
+                let offset = u64::from_le_bytes(body[5..13].try_into().unwrap());
+                let consumer = match consumer_kind {
+                    1 => PollingConsumer::Consumer(consumer_id, 0),
+                    2 => PollingConsumer::ConsumerGroup(consumer_id, 0),
+                    _ => {
+                        warn!(
+                            replica = consensus.replica(),
+                            op = header.op,
+                            consumer_kind,
+                            "on_replicate: unknown consumer kind"
+                        );
+                        return;
+                    }
+                };
+
+                let partition = self
+                    .get_by_ns(namespace)
+                    .expect("store_consumer_offset: partition not found for namespace");
+                let _ = partition.store_consumer_offset(consumer, offset);
+
                 debug!(
                     replica = consensus.replica(),
                     op = header.op,
+                    consumer_kind,
+                    consumer_id,
+                    offset,
                     "on_replicate: consumer offset stored"
                 );
             }
diff --git a/core/simulator/src/client.rs b/core/simulator/src/client.rs
index 7b55af548..6dc9e1eab 100644
--- a/core/simulator/src/client.rs
+++ b/core/simulator/src/client.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use iggy_common::{
-    BytesSerializable, INDEX_SIZE, Identifier,
+    BytesSerializable, IGGY_MESSAGE_HEADER_SIZE, INDEX_SIZE, Identifier,
     create_stream::CreateStream,
     delete_stream::DeleteStream,
     header::{Operation, RequestHeader},
@@ -72,23 +72,34 @@ impl SimClient {
         namespace: IggyNamespace,
         messages: &[&[u8]],
     ) -> Message<RequestHeader> {
-        // Build batch: count | indexes | messages
         let count = messages.len() as u32;
         let mut indexes = Vec::with_capacity(count as usize * INDEX_SIZE);
         let mut messages_buf = Vec::new();
 
         let mut current_position = 0u32;
-        for msg in messages {
-            // Write index: position (u32) + length (u32)
-            indexes.extend_from_slice(&current_position.to_le_bytes());
-            indexes.extend_from_slice(&(msg.len() as u32).to_le_bytes());
-
-            // Append message
+        for (i, msg) in messages.iter().enumerate() {
+            let msg_total_len = (IGGY_MESSAGE_HEADER_SIZE + msg.len()) as u32;
+
+            // Index: offset(u32) + position(u32) + timestamp(u64)
+            indexes.extend_from_slice(&(i as u32).to_le_bytes()); // offset (relative)
+            indexes.extend_from_slice(&current_position.to_le_bytes()); // position
+            indexes.extend_from_slice(&0u64.to_le_bytes()); // timestamp (set in prepare)
+
+            // Message header (64 bytes)
+            messages_buf.extend_from_slice(&0u64.to_le_bytes()); // checksum
+            messages_buf.extend_from_slice(&0u128.to_le_bytes()); // id
+            messages_buf.extend_from_slice(&0u64.to_le_bytes()); // offset
+            messages_buf.extend_from_slice(&0u64.to_le_bytes()); // timestamp
+            messages_buf.extend_from_slice(&0u64.to_le_bytes()); // origin_timestamp
+            messages_buf.extend_from_slice(&0u32.to_le_bytes()); // user_headers_length
+            messages_buf.extend_from_slice(&(msg.len() as u32).to_le_bytes()); // payload_length
+            messages_buf.extend_from_slice(&0u64.to_le_bytes()); // reserved
+
+            // Payload
             messages_buf.extend_from_slice(msg);
-            current_position += msg.len() as u32;
+            current_position += msg_total_len;
         }
 
-        // Build payload: count | indexes | messages
         let mut payload = Vec::with_capacity(4 + indexes.len() + messages_buf.len());
         payload.extend_from_slice(&count.to_le_bytes());
         payload.extend_from_slice(&indexes);
@@ -97,6 +108,21 @@ impl SimClient {
         self.build_request_with_namespace(Operation::SendMessages, &payload, namespace)
     }
 
+    pub fn store_consumer_offset(
+        &self,
+        namespace: IggyNamespace,
+        consumer_kind: u8,
+        consumer_id: u32,
+        offset: u64,
+    ) -> Message<RequestHeader> {
+        let mut payload = Vec::with_capacity(13);
+        payload.push(consumer_kind);
+        payload.extend_from_slice(&consumer_id.to_le_bytes());
+        payload.extend_from_slice(&offset.to_le_bytes());
+
+        self.build_request_with_namespace(Operation::StoreConsumerOffset, &payload, namespace)
+    }
+
     #[allow(clippy::cast_possible_truncation)]
     fn build_request_with_namespace(
         &self,
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index f3cf63ea3..5e64aca81 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -24,9 +24,13 @@ pub mod ready_queue;
 pub mod replica;
 
 use bus::MemBus;
+use consensus::PartitionsHandle;
 use iggy_common::header::ReplyHeader;
 use iggy_common::message::Message;
+use iggy_common::sharding::IggyNamespace;
+use iggy_common::{IggyError, IggyMessagesBatchSet};
 use message_bus::MessageBus;
+use partitions::{Partition, PartitionOffsets, PollingArgs, PollingConsumer};
 use replica::{Replica, new_replica};
 use std::sync::Arc;
 
@@ -139,6 +143,44 @@ impl Simulator {
     }
 }
 
+impl Simulator {
+    /// Poll messages directly from a replica's partition.
+    ///
+    /// # Errors
+    /// Returns `IggyError::ResourceNotFound` if the namespace does not exist on this replica.
+    #[allow(clippy::future_not_send)]
+    pub async fn poll_messages(
+        &self,
+        replica_idx: usize,
+        namespace: IggyNamespace,
+        consumer: PollingConsumer,
+        args: PollingArgs,
+    ) -> Result<IggyMessagesBatchSet, IggyError> {
+        let replica = &self.replicas[replica_idx];
+        let partition =
+            replica
+                .plane
+                .partitions()
+                .get_by_ns(&namespace)
+                .ok_or(IggyError::ResourceNotFound(format!(
+                    "partition not found for namespace {namespace:?} on replica {replica_idx}"
+                )))?;
+        partition.poll_messages(consumer, args).await
+    }
+
+    /// Get partition offsets from a replica.
+    #[must_use]
+    pub fn offsets(
+        &self,
+        replica_idx: usize,
+        namespace: IggyNamespace,
+    ) -> Option<PartitionOffsets> {
+        let replica = &self.replicas[replica_idx];
+        let partition = replica.plane.partitions().get_by_ns(&namespace)?;
+        Some(partition.offsets())
+    }
+}
+
 // TODO(IGGY-66): Add acceptance test for per-partition consensus independence.
 // Setup: 3-replica simulator, two partitions (ns_a, ns_b).
 // 1. Fill ns_a's pipeline to PIPELINE_PREPARE_QUEUE_MAX without delivering acks.
diff --git a/core/simulator/src/main.rs b/core/simulator/src/main.rs
index de2288273..1ff966253 100644
--- a/core/simulator/src/main.rs
+++ b/core/simulator/src/main.rs
@@ -15,10 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use iggy_common::PollingStrategy;
 use iggy_common::header::ReplyHeader;
 use iggy_common::message::Message;
 use iggy_common::sharding::IggyNamespace;
+use iggy_common::{IggyByteSize, MemoryPool, MemoryPoolConfigOther};
 use message_bus::MessageBus;
+use partitions::{PollingArgs, PollingConsumer};
 use simulator::{Simulator, client::SimClient};
 use std::collections::VecDeque;
 use std::sync::{Arc, Mutex};
@@ -39,7 +42,16 @@ impl Responses {
     }
 }
 
+#[allow(clippy::too_many_lines)]
 fn main() {
+    // PooledBuffer::from (used by poll_messages) panics if the global pool is uninitialized.
+    // Disabled pooling just falls through to the system allocator.
+    MemoryPool::init_pool(&MemoryPoolConfigOther {
+        enabled: false,
+        size: IggyByteSize::from(0u64),
+        bucket_capacity: 1,
+    });
+
     let client_id: u128 = 1;
     let leader: u8 = 0;
     let mut sim = Simulator::new(3, std::iter::once(client_id));
@@ -126,6 +138,52 @@ fn main() {
                 break;
             }
         }
+
+        // Poll messages directly from the leader's partition (bypassing consensus)
+        let consumer = PollingConsumer::Consumer(1, 0);
+        let args = PollingArgs::new(PollingStrategy::first(), 10, false);
+        match sim
+            .poll_messages(leader as usize, test_namespace, consumer, args)
+            .await
+        {
+            Ok(batch_set) => {
+                println!(
+                    "[sim] Poll returned {} messages (expected 3)",
+                    batch_set.count()
+                );
+            }
+            Err(e) => {
+                println!("[sim] Poll failed: {e}");
+            }
+        }
+
+        let args_auto = PollingArgs::new(PollingStrategy::first(), 2, true);
+        if let Ok(batch) = sim
+            .poll_messages(leader as usize, test_namespace, consumer, args_auto)
+            .await
+        {
+            println!("[sim] Auto-commit poll returned {} messages", batch.count());
+        }
+
+        // Next poll should start from offset 2 (after auto-commit of 0,1)
+        let args_next = PollingArgs::new(PollingStrategy::next(), 10, false);
+        if let Ok(batch) = sim
+            .poll_messages(leader as usize, test_namespace, consumer, args_next)
+            .await
+            .await
+        {
+            println!(
+                "[sim] Next poll returned {} messages (expected 1)",
+                batch.count()
+            );
+        }
+
+        // Check offsets
+        if let Some(offsets) = sim.offsets(leader as usize, test_namespace) {
+            println!(
+                "[sim] Partition offsets: commit={}, write={}",
+                offsets.commit_offset, offsets.write_offset
+            );
+        }
     });
 
     client_handle.join().expect("client thread panicked");

Reply via email to