This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new a1f51fdb3 feat(partitions): add poll_messages support to simulator
(#2960)
a1f51fdb3 is described below
commit a1f51fdb350f559ea524b32a7f192a7b58561115
Author: Krishna Vishal <[email protected]>
AuthorDate: Fri Mar 20 15:03:10 2026 +0530
feat(partitions): add poll_messages support to simulator (#2960)
---
core/partitions/src/iggy_partition.rs | 156 ++++++++++++++++++++++++++++++++-
core/partitions/src/iggy_partitions.rs | 30 ++++++-
core/simulator/src/client.rs | 46 +++++++---
core/simulator/src/lib.rs | 42 +++++++++
core/simulator/src/main.rs | 58 ++++++++++++
5 files changed, 316 insertions(+), 16 deletions(-)
diff --git a/core/partitions/src/iggy_partition.rs
b/core/partitions/src/iggy_partition.rs
index 65fb470ae..5addf1516 100644
--- a/core/partitions/src/iggy_partition.rs
+++ b/core/partitions/src/iggy_partition.rs
@@ -15,12 +15,18 @@
// specific language governing permissions and limitations
// under the License.
-use crate::journal::{PartitionJournal, PartitionJournalMemStorage};
+use crate::journal::{
+ MessageLookup, PartitionJournal, PartitionJournalMemStorage,
QueryableJournal,
+};
use crate::log::SegmentedLog;
-use crate::{AppendResult, Partition, decode_send_messages_batch};
+use crate::{
+ AppendResult, Partition, PartitionOffsets, PollingArgs, PollingConsumer,
+ decode_send_messages_batch,
+};
use iggy_common::{
- ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, IggyError,
IggyMessagesBatchMut,
- IggyTimestamp, PartitionStats,
+ ConsumerGroupId, ConsumerGroupOffsets, ConsumerKind, ConsumerOffset,
ConsumerOffsets,
+ IggyByteSize, IggyError, IggyMessagesBatchMut, IggyMessagesBatchSet,
IggyTimestamp,
+ PartitionStats, PollingKind,
header::{Operation, PrepareHeader},
message::Message,
};
@@ -28,6 +34,7 @@ use journal::Journal as _;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::Mutex as TokioMutex;
+use tracing::warn;
// This struct mirrors the `LocalPartition` code from
// `core/server/src/streaming/partitions/local_partition.rs`.
#[derive(Debug)]
@@ -165,4 +172,145 @@ impl Partition for IggyPartition {
batch_messages_count,
))
}
+
    /// Read up to `args.count` messages for `consumer` according to
    /// `args.strategy`, never returning messages past the committed offset.
    ///
    /// Returns an empty batch set when the partition has no committed data
    /// yet (`should_increment_offset` is false), when `args.count` is zero,
    /// or when the resolved start offset lies beyond the committed offset.
    ///
    /// When `args.auto_commit` is set and messages were returned, the last
    /// returned offset is stored for `consumer`; a store failure is logged
    /// rather than propagated.
    async fn poll_messages(
        &self,
        consumer: PollingConsumer,
        args: PollingArgs,
    ) -> Result<IggyMessagesBatchSet, IggyError> {
        // Nothing committed yet, or the caller asked for zero messages.
        if !self.should_increment_offset || args.count == 0 {
            return Ok(IggyMessagesBatchSet::empty());
        }

        let committed_offset = self.offset.load(Ordering::Relaxed);

        // Resolve the first offset to read, per polling strategy.
        let start_offset = match args.strategy.kind {
            PollingKind::Offset => args.strategy.value,
            PollingKind::First => 0,
            // Last N messages ending at the committed offset; saturates at 0.
            // `args.count` is non-zero here, so the `- 1` cannot underflow.
            PollingKind::Last => committed_offset.saturating_sub(u64::from(args.count) - 1),
            PollingKind::Timestamp => {
                // Timestamp lookups are answered directly by the journal and
                // returned early, clamped to the committed offset below.
                let result = self
                    .log
                    .journal()
                    .inner
                    .get(&MessageLookup::Timestamp {
                        timestamp: args.strategy.value,
                        count: args.count,
                    })
                    .await;
                let batch_set = result.unwrap_or_else(IggyMessagesBatchSet::empty);
                if let Some(first) = batch_set.first_offset() {
                    // The whole batch is past the commit point: expose nothing.
                    if first > committed_offset {
                        return Ok(IggyMessagesBatchSet::empty());
                    }
                    // Trim so no message beyond the committed offset leaks out.
                    let max_count = u32::try_from(committed_offset - first + 1).unwrap_or(u32::MAX);
                    return Ok(batch_set.get_by_offset(first, batch_set.count().min(max_count)));
                }
                return Ok(batch_set);
            }
            // Resume one past the consumer's stored offset (0 when unknown).
            PollingKind::Next => self
                .get_consumer_offset(consumer)
                .map_or(0, |offset| offset + 1),
        };

        if start_offset > committed_offset {
            return Ok(IggyMessagesBatchSet::empty());
        }

        // Clamp the requested count so the read never passes the committed offset.
        let max_count = u32::try_from(committed_offset - start_offset + 1).unwrap_or(u32::MAX);
        let count = args.count.min(max_count);

        let result = self
            .log
            .journal()
            .inner
            .get(&MessageLookup::Offset {
                offset: start_offset,
                count,
            })
            .await;

        let batch_set = result.unwrap_or_else(IggyMessagesBatchSet::empty);

        if args.auto_commit && !batch_set.is_empty() {
            let last_offset = start_offset + u64::from(batch_set.count()) - 1;
            if let Err(err) = self.store_consumer_offset(consumer, last_offset) {
                // Best-effort: a failed offset store is logged, not returned,
                // so the polled messages are still delivered to the caller.
                warn!(
                    consumer = ?consumer,
                    last_offset,
                    %err,
                    "poll_messages: failed to store consumer offset"
                );
            }
        }

        Ok(batch_set)
    }
+
+ #[allow(clippy::cast_possible_truncation)]
+ fn store_consumer_offset(
+ &self,
+ consumer: PollingConsumer,
+ offset: u64,
+ ) -> Result<(), IggyError> {
+ match consumer {
+ PollingConsumer::Consumer(id, _) => {
+ let guard = self.consumer_offsets.pin();
+ if let Some(existing) = guard.get(&id) {
+ existing.offset.store(offset, Ordering::Relaxed);
+ } else {
+ guard.insert(
+ id,
+ ConsumerOffset::new(
+ ConsumerKind::Consumer,
+ id as u32,
+ offset,
+ String::new(),
+ ),
+ );
+ }
+ }
+ PollingConsumer::ConsumerGroup(group_id, _) => {
+ let guard = self.consumer_group_offsets.pin();
+ let key = ConsumerGroupId(group_id);
+ if let Some(existing) = guard.get(&key) {
+ existing.offset.store(offset, Ordering::Relaxed);
+ } else {
+ guard.insert(
+ key,
+ ConsumerOffset::new(
+ ConsumerKind::ConsumerGroup,
+ group_id as u32,
+ offset,
+ String::new(),
+ ),
+ );
+ }
+ }
+ }
+ Ok(())
+ }
+
+ fn get_consumer_offset(&self, consumer: PollingConsumer) -> Option<u64> {
+ match consumer {
+ PollingConsumer::Consumer(id, _) => self
+ .consumer_offsets
+ .pin()
+ .get(&id)
+ .map(|co| co.offset.load(Ordering::Relaxed)),
+ PollingConsumer::ConsumerGroup(group_id, _) => self
+ .consumer_group_offsets
+ .pin()
+ .get(&ConsumerGroupId(group_id))
+ .map(|co| co.offset.load(Ordering::Relaxed)),
+ }
+ }
+
+ fn offsets(&self) -> PartitionOffsets {
+ PartitionOffsets::new(
+ self.offset.load(Ordering::Relaxed),
+ self.dirty_offset.load(Ordering::Relaxed),
+ )
+ }
}
diff --git a/core/partitions/src/iggy_partitions.rs
b/core/partitions/src/iggy_partitions.rs
index 931e804a7..d219c905e 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -19,6 +19,7 @@
use crate::IggyPartition;
use crate::Partition;
+use crate::PollingConsumer;
use crate::log::JournalInfo;
use crate::types::PartitionsConfig;
use consensus::PlaneIdentity;
@@ -562,11 +563,36 @@ where
);
}
Operation::StoreConsumerOffset => {
- // TODO: Deserialize consumer offset from prepare body
- // and store in partition's consumer_offsets.
+ let body = message.body_bytes();
+ let body = body.as_ref();
+ let consumer_kind = body[0];
+ let consumer_id =
u32::from_le_bytes(body[1..5].try_into().unwrap()) as usize;
+ let offset =
u64::from_le_bytes(body[5..13].try_into().unwrap());
+ let consumer = match consumer_kind {
+ 1 => PollingConsumer::Consumer(consumer_id, 0),
+ 2 => PollingConsumer::ConsumerGroup(consumer_id, 0),
+ _ => {
+ warn!(
+ replica = consensus.replica(),
+ op = header.op,
+ consumer_kind,
+ "on_replicate: unknown consumer kind"
+ );
+ return;
+ }
+ };
+
+ let partition = self
+ .get_by_ns(namespace)
+ .expect("store_consumer_offset: partition not found for
namespace");
+ let _ = partition.store_consumer_offset(consumer, offset);
+
debug!(
replica = consensus.replica(),
op = header.op,
+ consumer_kind,
+ consumer_id,
+ offset,
"on_replicate: consumer offset stored"
);
}
diff --git a/core/simulator/src/client.rs b/core/simulator/src/client.rs
index 7b55af548..6dc9e1eab 100644
--- a/core/simulator/src/client.rs
+++ b/core/simulator/src/client.rs
@@ -16,7 +16,7 @@
// under the License.
use iggy_common::{
- BytesSerializable, INDEX_SIZE, Identifier,
+ BytesSerializable, IGGY_MESSAGE_HEADER_SIZE, INDEX_SIZE, Identifier,
create_stream::CreateStream,
delete_stream::DeleteStream,
header::{Operation, RequestHeader},
@@ -72,23 +72,34 @@ impl SimClient {
namespace: IggyNamespace,
messages: &[&[u8]],
) -> Message<RequestHeader> {
- // Build batch: count | indexes | messages
let count = messages.len() as u32;
let mut indexes = Vec::with_capacity(count as usize * INDEX_SIZE);
let mut messages_buf = Vec::new();
let mut current_position = 0u32;
- for msg in messages {
- // Write index: position (u32) + length (u32)
- indexes.extend_from_slice(¤t_position.to_le_bytes());
- indexes.extend_from_slice(&(msg.len() as u32).to_le_bytes());
-
- // Append message
+ for (i, msg) in messages.iter().enumerate() {
+ let msg_total_len = (IGGY_MESSAGE_HEADER_SIZE + msg.len()) as u32;
+
+ // Index: offset(u32) + position(u32) + timestamp(u64)
+ indexes.extend_from_slice(&(i as u32).to_le_bytes()); // offset
(relative)
+ indexes.extend_from_slice(¤t_position.to_le_bytes()); //
position
+ indexes.extend_from_slice(&0u64.to_le_bytes()); // timestamp (set
in prepare)
+
+ // Message header (64 bytes)
+ messages_buf.extend_from_slice(&0u64.to_le_bytes()); // checksum
+ messages_buf.extend_from_slice(&0u128.to_le_bytes()); // id
+ messages_buf.extend_from_slice(&0u64.to_le_bytes()); // offset
+ messages_buf.extend_from_slice(&0u64.to_le_bytes()); // timestamp
+ messages_buf.extend_from_slice(&0u64.to_le_bytes()); //
origin_timestamp
+ messages_buf.extend_from_slice(&0u32.to_le_bytes()); //
user_headers_length
+ messages_buf.extend_from_slice(&(msg.len() as u32).to_le_bytes());
// payload_length
+ messages_buf.extend_from_slice(&0u64.to_le_bytes()); // reserved
+
+ // Payload
messages_buf.extend_from_slice(msg);
- current_position += msg.len() as u32;
+ current_position += msg_total_len;
}
- // Build payload: count | indexes | messages
let mut payload = Vec::with_capacity(4 + indexes.len() +
messages_buf.len());
payload.extend_from_slice(&count.to_le_bytes());
payload.extend_from_slice(&indexes);
@@ -97,6 +108,21 @@ impl SimClient {
self.build_request_with_namespace(Operation::SendMessages, &payload,
namespace)
}
+ pub fn store_consumer_offset(
+ &self,
+ namespace: IggyNamespace,
+ consumer_kind: u8,
+ consumer_id: u32,
+ offset: u64,
+ ) -> Message<RequestHeader> {
+ let mut payload = Vec::with_capacity(13);
+ payload.push(consumer_kind);
+ payload.extend_from_slice(&consumer_id.to_le_bytes());
+ payload.extend_from_slice(&offset.to_le_bytes());
+
+ self.build_request_with_namespace(Operation::StoreConsumerOffset,
&payload, namespace)
+ }
+
#[allow(clippy::cast_possible_truncation)]
fn build_request_with_namespace(
&self,
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index f3cf63ea3..5e64aca81 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -24,9 +24,13 @@ pub mod ready_queue;
pub mod replica;
use bus::MemBus;
+use consensus::PartitionsHandle;
use iggy_common::header::ReplyHeader;
use iggy_common::message::Message;
+use iggy_common::sharding::IggyNamespace;
+use iggy_common::{IggyError, IggyMessagesBatchSet};
use message_bus::MessageBus;
+use partitions::{Partition, PartitionOffsets, PollingArgs, PollingConsumer};
use replica::{Replica, new_replica};
use std::sync::Arc;
@@ -139,6 +143,44 @@ impl Simulator {
}
}
+impl Simulator {
+ /// Poll messages directly from a replica's partition.
+ ///
+ /// # Errors
+ /// Returns `IggyError::ResourceNotFound` if the namespace does not exist
on this replica.
+ #[allow(clippy::future_not_send)]
+ pub async fn poll_messages(
+ &self,
+ replica_idx: usize,
+ namespace: IggyNamespace,
+ consumer: PollingConsumer,
+ args: PollingArgs,
+ ) -> Result<IggyMessagesBatchSet, IggyError> {
+ let replica = &self.replicas[replica_idx];
+ let partition =
+ replica
+ .plane
+ .partitions()
+ .get_by_ns(&namespace)
+ .ok_or(IggyError::ResourceNotFound(format!(
+ "partition not found for namespace {namespace:?} on
replica {replica_idx}"
+ )))?;
+ partition.poll_messages(consumer, args).await
+ }
+
+ /// Get partition offsets from a replica.
+ #[must_use]
+ pub fn offsets(
+ &self,
+ replica_idx: usize,
+ namespace: IggyNamespace,
+ ) -> Option<PartitionOffsets> {
+ let replica = &self.replicas[replica_idx];
+ let partition = replica.plane.partitions().get_by_ns(&namespace)?;
+ Some(partition.offsets())
+ }
+}
+
// TODO(IGGY-66): Add acceptance test for per-partition consensus independence.
// Setup: 3-replica simulator, two partitions (ns_a, ns_b).
// 1. Fill ns_a's pipeline to PIPELINE_PREPARE_QUEUE_MAX without delivering
acks.
diff --git a/core/simulator/src/main.rs b/core/simulator/src/main.rs
index de2288273..1ff966253 100644
--- a/core/simulator/src/main.rs
+++ b/core/simulator/src/main.rs
@@ -15,10 +15,13 @@
// specific language governing permissions and limitations
// under the License.
+use iggy_common::PollingStrategy;
use iggy_common::header::ReplyHeader;
use iggy_common::message::Message;
use iggy_common::sharding::IggyNamespace;
+use iggy_common::{IggyByteSize, MemoryPool, MemoryPoolConfigOther};
use message_bus::MessageBus;
+use partitions::{PollingArgs, PollingConsumer};
use simulator::{Simulator, client::SimClient};
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
@@ -39,7 +42,16 @@ impl Responses {
}
}
+#[allow(clippy::too_many_lines)]
fn main() {
+ // PooledBuffer::from (used by poll_messages) panics if the global pool is
uninitialized.
+ // Disabled pooling just falls through to the system allocator.
+ MemoryPool::init_pool(&MemoryPoolConfigOther {
+ enabled: false,
+ size: IggyByteSize::from(0u64),
+ bucket_capacity: 1,
+ });
+
let client_id: u128 = 1;
let leader: u8 = 0;
let mut sim = Simulator::new(3, std::iter::once(client_id));
@@ -126,6 +138,52 @@ fn main() {
break;
}
}
+
+ // Poll messages directly from the leader's partition (bypassing
consensus)
+ let consumer = PollingConsumer::Consumer(1, 0);
+ let args = PollingArgs::new(PollingStrategy::first(), 10, false);
+ match sim
+ .poll_messages(leader as usize, test_namespace, consumer, args)
+ .await
+ {
+ Ok(batch_set) => {
+ println!(
+ "[sim] Poll returned {} messages (expected 3)",
+ batch_set.count()
+ );
+ }
+ Err(e) => {
+ println!("[sim] Poll failed: {e}");
+ }
+ }
+
+ let args_auto = PollingArgs::new(PollingStrategy::first(), 2, true);
+ if let Ok(batch) = sim
+ .poll_messages(leader as usize, test_namespace, consumer,
args_auto)
+ .await
+ {
+ println!("[sim] Auto-commit poll returned {} messages",
batch.count());
+ }
+
+ // Next poll should start from offset 2 (after auto-commit of 0,1)
+ let args_next = PollingArgs::new(PollingStrategy::next(), 10, false);
+ if let Ok(batch) = sim
+ .poll_messages(leader as usize, test_namespace, consumer,
args_next)
+ .await
+ {
+ println!(
+ "[sim] Next poll returned {} messages (expected 1)",
+ batch.count()
+ );
+ }
+
+ // Check offsets
+ if let Some(offsets) = sim.offsets(leader as usize, test_namespace) {
+ println!(
+ "[sim] Partition offsets: commit={}, write={}",
+ offsets.commit_offset, offsets.write_offset
+ );
+ }
});
client_handle.join().expect("client thread panicked");