This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch high-level-bench-consumer in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 2fac22fb3ab7e7f34a1e321bed4680c074572bc3 Author: spetz <[email protected]> AuthorDate: Sat Jun 7 21:03:55 2025 +0200 store consumer offset in high level bench test, add get last consumed offset to iggy consumer --- core/bench/src/actors/consumer/backend.rs | 2 +- .../src/actors/consumer/benchmark_consumer.rs | 6 +- .../src/actors/consumer/high_level_backend.rs | 66 +++++++++++++++------- .../bench/src/actors/consumer/low_level_backend.rs | 20 ++----- core/sdk/src/clients/consumer.rs | 28 ++++++++- 5 files changed, 81 insertions(+), 41 deletions(-) diff --git a/core/bench/src/actors/consumer/backend.rs b/core/bench/src/actors/consumer/backend.rs index 4932f7f9..b61aaa41 100644 --- a/core/bench/src/actors/consumer/backend.rs +++ b/core/bench/src/actors/consumer/backend.rs @@ -78,7 +78,7 @@ pub trait BenchmarkConsumerBackend { async fn consume_batch( &self, consumer: &mut Self::Consumer, - ) -> Result<ConsumedBatch, IggyError>; + ) -> Result<Option<ConsumedBatch>, IggyError>; fn log_setup_info(&self); fn log_warmup_info(&self); } diff --git a/core/bench/src/actors/consumer/benchmark_consumer.rs b/core/bench/src/actors/consumer/benchmark_consumer.rs index ba1d9e1e..b739a00d 100644 --- a/core/bench/src/actors/consumer/benchmark_consumer.rs +++ b/core/bench/src/actors/consumer/benchmark_consumer.rs @@ -148,11 +148,11 @@ impl BenchmarkConsumer { let rate_limiter = limit_bytes_per_second.map(BenchmarkRateLimiter::new); while !finish_condition.is_done() { - let batch = backend.consume_batch(&mut consumer).await?; + let batch_opt = backend.consume_batch(&mut consumer).await?; - if batch.messages == 0 { + let Some(batch) = batch_opt else { continue; - } + }; messages_processed += u64::from(batch.messages); batches_processed += 1; diff --git a/core/bench/src/actors/consumer/high_level_backend.rs b/core/bench/src/actors/consumer/high_level_backend.rs index 824caf1b..b20b319c 100644 --- a/core/bench/src/actors/consumer/high_level_backend.rs +++ 
b/core/bench/src/actors/consumer/high_level_backend.rs @@ -20,8 +20,9 @@ use super::backend::{BenchmarkConsumerBackend, ConsumedBatch, HighLevelBackend}; use futures_util::StreamExt; use iggy::prelude::*; use integration::test_server::login_root; -use tokio::time::Instant; -use tracing::{info, warn}; +use std::time::Duration; +use tokio::time::{Instant, timeout}; +use tracing::{debug, error, info, warn}; impl BenchmarkConsumerBackend for HighLevelBackend { type Consumer = IggyConsumer; @@ -36,12 +37,13 @@ impl BenchmarkConsumerBackend for HighLevelBackend { let topic_id_str = topic_id.to_string(); let mut iggy_consumer = if let Some(cg_id) = self.config.consumer_group_id { - let consumer_group_name = format!("bench_cg_{cg_id}"); + let consumer_group_name = format!("cg_{cg_id}"); // Consumer groups use auto-commit (matching PollingKind::Next behavior from low-level API) client .consumer_group(&consumer_group_name, &stream_id_str, &topic_id_str)? .batch_size(self.config.messages_per_batch.get()) .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages)) + .create_consumer_group_if_not_exists() .auto_join_consumer_group() .build() } else { @@ -50,13 +52,13 @@ impl BenchmarkConsumerBackend for HighLevelBackend { // to commit local offset manually, only auto-commit on server. client .consumer( - &format!("bench_consumer_{}", self.config.consumer_id), + &format!("hl_consumer_{}", self.config.consumer_id), &stream_id_str, &topic_id_str, 1, )? 
.batch_size(self.config.messages_per_batch.get()) - .auto_commit(AutoCommit::Disabled) // TODO@spetz + .auto_commit(AutoCommit::Disabled) .build() }; @@ -79,45 +81,69 @@ impl BenchmarkConsumerBackend for HighLevelBackend { async fn consume_batch( &self, consumer: &mut Self::Consumer, - ) -> Result<ConsumedBatch, IggyError> { + ) -> Result<Option<ConsumedBatch>, IggyError> { let batch_start = Instant::now(); let mut batch_messages = 0; let mut batch_user_bytes = 0; let mut batch_total_bytes = 0; - while batch_messages <= self.config.messages_per_batch.get() { - tracing::warn!("Consuming batch of {} messages", batch_messages); - if let Some(message_result) = consumer.next().await { - match message_result { + while batch_messages < self.config.messages_per_batch.get() { + // Use timeout to avoid getting stuck waiting for messages + let timeout_result = timeout(Duration::from_secs(1), consumer.next()).await; + + match timeout_result { + Ok(Some(message_result)) => match message_result { Ok(received_message) => { batch_messages += 1; batch_user_bytes += received_message.message.payload.len() as u64; batch_total_bytes += received_message.message.get_size_bytes().as_bytes_u64(); + let offset = received_message.message.header.offset; if batch_messages >= self.config.messages_per_batch.get() { info!( - "Batch of {} messages consumed, last_offset: {}", - batch_messages, received_message.message.header.offset + "Batch of {} messages consumed, last_offset: {}, current_offset: {}", + batch_messages, + received_message.message.header.offset, + received_message.current_offset ); + + if let Err(error) = consumer.store_offset(offset, None).await { + error!("Failed to store offset: {offset}. 
{error}"); + continue; + } + debug!("Offset: {offset} stored successfully"); break; } } Err(err) => { warn!("Error receiving message: {}", err); } + }, + Ok(None) => { + debug!("Consumer stream ended during batching"); + break; + } + Err(_) => { + debug!( + "Timeout waiting for messages, stopping batch at {} messages", + batch_messages + ); + break; } - } else { - break; } } - Ok(ConsumedBatch { - messages: batch_messages, - user_data_bytes: batch_user_bytes, - total_bytes: batch_total_bytes, - latency: batch_start.elapsed(), - }) + if batch_messages == 0 { + Ok(None) + } else { + Ok(Some(ConsumedBatch { + messages: batch_messages, + user_data_bytes: batch_user_bytes, + total_bytes: batch_total_bytes, + latency: batch_start.elapsed(), + })) + } } fn log_setup_info(&self) { diff --git a/core/bench/src/actors/consumer/low_level_backend.rs b/core/bench/src/actors/consumer/low_level_backend.rs index 71f11b5a..bcdb3f3e 100644 --- a/core/bench/src/actors/consumer/low_level_backend.rs +++ b/core/bench/src/actors/consumer/low_level_backend.rs @@ -108,7 +108,7 @@ impl BenchmarkConsumerBackend for LowLevelBackend { async fn consume_batch( &self, consumer: &mut Self::Consumer, - ) -> Result<ConsumedBatch, IggyError> { + ) -> Result<Option<ConsumedBatch>, IggyError> { let (client, stream_id, topic_id, partition_id, consumer_obj, messages_processed) = consumer; let messages_to_receive = self.config.messages_per_batch.get(); @@ -139,24 +139,14 @@ impl BenchmarkConsumerBackend for LowLevelBackend { Ok(messages) => messages, Err(e) => { if matches!(e, IggyError::TopicIdNotFound(_, _)) { - return Ok(ConsumedBatch { - messages: 0, - user_data_bytes: 0, - total_bytes: 0, - latency: before_poll.elapsed(), - }); + return Ok(None); } return Err(e); } }; if polled_messages.messages.is_empty() { - return Ok(ConsumedBatch { - messages: 0, - user_data_bytes: 0, - total_bytes: 0, - latency: before_poll.elapsed(), - }); + return Ok(None); } let latency = if 
self.config.origin_timestamp_latency_calculation { let now = IggyTimestamp::now().as_micros(); @@ -170,12 +160,12 @@ impl BenchmarkConsumerBackend for LowLevelBackend { *messages_processed += polled_messages.messages.len() as u64; - Ok(ConsumedBatch { + Ok(Some(ConsumedBatch { messages: u32::try_from(polled_messages.messages.len()).unwrap(), user_data_bytes: batch_user_size_bytes, total_bytes: batch_total_size_bytes, latency, - }) + })) } fn log_setup_info(&self) { diff --git a/core/sdk/src/clients/consumer.rs b/core/sdk/src/clients/consumer.rs index 795f4edc..b2dd5d20 100644 --- a/core/sdk/src/clients/consumer.rs +++ b/core/sdk/src/clients/consumer.rs @@ -251,6 +251,13 @@ impl IggyConsumer { .await } + /// Retrieves the last consumed offset for the specified partition ID. + /// To get the current partition ID use `partition_id()` + pub fn get_last_consumed_offset(&self, partition_id: u32) -> Option<u64> { + let offset = self.last_consumed_offsets.get(&partition_id)?; + Some(offset.load(ORDERING)) + } + /// Deletes the consumer offset on the server either for the current partition or the provided partition ID. pub async fn delete_offset(&self, partition_id: Option<u32>) -> Result<(), IggyError> { let client = self.client.read().await; @@ -264,6 +271,13 @@ impl IggyConsumer { .await } + /// Retrieves the last stored offset (on the server) for the specified partition ID. + /// To get the current partition ID use `partition_id()` + pub fn get_last_stored_offset(&self, partition_id: u32) -> Option<u64> { + let offset = self.last_stored_offsets.get(&partition_id)?; + Some(offset.load(ORDERING)) + } + /// Initializes the consumer by subscribing to diagnostic events, initializing the consumer group if needed, storing the offsets in the background etc. /// /// Note: This method must be called before polling messages. 
@@ -805,9 +819,19 @@ impl IggyConsumer { info!( "Creating consumer group: {consumer_group_id} for topic: {topic_id}, stream: {stream_id}" ); - client + match client .create_consumer_group(&stream_id, &topic_id, &name, id) - .await?; + .await + { + Ok(_) => {} + Err(IggyError::ConsumerGroupNameAlreadyExists(_, _)) => {} + Err(error) => { + error!( + "Failed to create consumer group {consumer_group_id} for topic: {topic_id}, stream: {stream_id}: {error}" + ); + return Err(error); + } + } } info!(
