This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch high-level-bench-consumer
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 2fac22fb3ab7e7f34a1e321bed4680c074572bc3
Author: spetz <[email protected]>
AuthorDate: Sat Jun 7 21:03:55 2025 +0200

    store consumer offset in high level bench test, add get last consumed 
offset to iggy consumer
---
 core/bench/src/actors/consumer/backend.rs          |  2 +-
 .../src/actors/consumer/benchmark_consumer.rs      |  6 +-
 .../src/actors/consumer/high_level_backend.rs      | 66 +++++++++++++++-------
 .../bench/src/actors/consumer/low_level_backend.rs | 20 ++-----
 core/sdk/src/clients/consumer.rs                   | 28 ++++++++-
 5 files changed, 81 insertions(+), 41 deletions(-)

diff --git a/core/bench/src/actors/consumer/backend.rs 
b/core/bench/src/actors/consumer/backend.rs
index 4932f7f9..b61aaa41 100644
--- a/core/bench/src/actors/consumer/backend.rs
+++ b/core/bench/src/actors/consumer/backend.rs
@@ -78,7 +78,7 @@ pub trait BenchmarkConsumerBackend {
     async fn consume_batch(
         &self,
         consumer: &mut Self::Consumer,
-    ) -> Result<ConsumedBatch, IggyError>;
+    ) -> Result<Option<ConsumedBatch>, IggyError>;
     fn log_setup_info(&self);
     fn log_warmup_info(&self);
 }
diff --git a/core/bench/src/actors/consumer/benchmark_consumer.rs 
b/core/bench/src/actors/consumer/benchmark_consumer.rs
index ba1d9e1e..b739a00d 100644
--- a/core/bench/src/actors/consumer/benchmark_consumer.rs
+++ b/core/bench/src/actors/consumer/benchmark_consumer.rs
@@ -148,11 +148,11 @@ impl BenchmarkConsumer {
         let rate_limiter = 
limit_bytes_per_second.map(BenchmarkRateLimiter::new);
 
         while !finish_condition.is_done() {
-            let batch = backend.consume_batch(&mut consumer).await?;
+            let batch_opt = backend.consume_batch(&mut consumer).await?;
 
-            if batch.messages == 0 {
+            let Some(batch) = batch_opt else {
                 continue;
-            }
+            };
 
             messages_processed += u64::from(batch.messages);
             batches_processed += 1;
diff --git a/core/bench/src/actors/consumer/high_level_backend.rs 
b/core/bench/src/actors/consumer/high_level_backend.rs
index 824caf1b..b20b319c 100644
--- a/core/bench/src/actors/consumer/high_level_backend.rs
+++ b/core/bench/src/actors/consumer/high_level_backend.rs
@@ -20,8 +20,9 @@ use super::backend::{BenchmarkConsumerBackend, ConsumedBatch, 
HighLevelBackend};
 use futures_util::StreamExt;
 use iggy::prelude::*;
 use integration::test_server::login_root;
-use tokio::time::Instant;
-use tracing::{info, warn};
+use std::time::Duration;
+use tokio::time::{Instant, timeout};
+use tracing::{debug, error, info, warn};
 
 impl BenchmarkConsumerBackend for HighLevelBackend {
     type Consumer = IggyConsumer;
@@ -36,12 +37,13 @@ impl BenchmarkConsumerBackend for HighLevelBackend {
         let topic_id_str = topic_id.to_string();
 
         let mut iggy_consumer = if let Some(cg_id) = 
self.config.consumer_group_id {
-            let consumer_group_name = format!("bench_cg_{cg_id}");
+            let consumer_group_name = format!("cg_{cg_id}");
             // Consumer groups use auto-commit (matching PollingKind::Next 
behavior from low-level API)
             client
                 .consumer_group(&consumer_group_name, &stream_id_str, 
&topic_id_str)?
                 .batch_size(self.config.messages_per_batch.get())
                 .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages))
+                .create_consumer_group_if_not_exists()
                 .auto_join_consumer_group()
                 .build()
         } else {
@@ -50,13 +52,13 @@ impl BenchmarkConsumerBackend for HighLevelBackend {
             // to commit local offset manually, only auto-commit on server.
             client
                 .consumer(
-                    &format!("bench_consumer_{}", self.config.consumer_id),
+                    &format!("hl_consumer_{}", self.config.consumer_id),
                     &stream_id_str,
                     &topic_id_str,
                     1,
                 )?
                 .batch_size(self.config.messages_per_batch.get())
-                .auto_commit(AutoCommit::Disabled) // TODO@spetz
+                .auto_commit(AutoCommit::Disabled)
                 .build()
         };
 
@@ -79,45 +81,69 @@ impl BenchmarkConsumerBackend for HighLevelBackend {
     async fn consume_batch(
         &self,
         consumer: &mut Self::Consumer,
-    ) -> Result<ConsumedBatch, IggyError> {
+    ) -> Result<Option<ConsumedBatch>, IggyError> {
         let batch_start = Instant::now();
         let mut batch_messages = 0;
         let mut batch_user_bytes = 0;
         let mut batch_total_bytes = 0;
 
-        while batch_messages <= self.config.messages_per_batch.get() {
-            tracing::warn!("Consuming batch of {} messages", batch_messages);
-            if let Some(message_result) = consumer.next().await {
-                match message_result {
+        while batch_messages < self.config.messages_per_batch.get() {
+            // Use timeout to avoid getting stuck waiting for messages
+            let timeout_result = timeout(Duration::from_secs(1), 
consumer.next()).await;
+
+            match timeout_result {
+                Ok(Some(message_result)) => match message_result {
                     Ok(received_message) => {
                         batch_messages += 1;
                         batch_user_bytes += 
received_message.message.payload.len() as u64;
                         batch_total_bytes +=
                             
received_message.message.get_size_bytes().as_bytes_u64();
 
+                        let offset = received_message.message.header.offset;
                         if batch_messages >= 
self.config.messages_per_batch.get() {
                             info!(
-                                "Batch of {} messages consumed, last_offset: 
{}",
-                                batch_messages, 
received_message.message.header.offset
+                                "Batch of {} messages consumed, last_offset: 
{}, current_offset: {}",
+                                batch_messages,
+                                received_message.message.header.offset,
+                                received_message.current_offset
                             );
+
+                            if let Err(error) = consumer.store_offset(offset, 
None).await {
+                                error!("Failed to store offset: {offset}. 
{error}");
+                                continue;
+                            }
+                            debug!("Offset: {offset} stored successfully");
                             break;
                         }
                     }
                     Err(err) => {
                         warn!("Error receiving message: {}", err);
                     }
+                },
+                Ok(None) => {
+                    debug!("Consumer stream ended during batching");
+                    break;
+                }
+                Err(_) => {
+                    debug!(
+                        "Timeout waiting for messages, stopping batch at {} 
messages",
+                        batch_messages
+                    );
+                    break;
                 }
-            } else {
-                break;
             }
         }
 
-        Ok(ConsumedBatch {
-            messages: batch_messages,
-            user_data_bytes: batch_user_bytes,
-            total_bytes: batch_total_bytes,
-            latency: batch_start.elapsed(),
-        })
+        if batch_messages == 0 {
+            Ok(None)
+        } else {
+            Ok(Some(ConsumedBatch {
+                messages: batch_messages,
+                user_data_bytes: batch_user_bytes,
+                total_bytes: batch_total_bytes,
+                latency: batch_start.elapsed(),
+            }))
+        }
     }
 
     fn log_setup_info(&self) {
diff --git a/core/bench/src/actors/consumer/low_level_backend.rs 
b/core/bench/src/actors/consumer/low_level_backend.rs
index 71f11b5a..bcdb3f3e 100644
--- a/core/bench/src/actors/consumer/low_level_backend.rs
+++ b/core/bench/src/actors/consumer/low_level_backend.rs
@@ -108,7 +108,7 @@ impl BenchmarkConsumerBackend for LowLevelBackend {
     async fn consume_batch(
         &self,
         consumer: &mut Self::Consumer,
-    ) -> Result<ConsumedBatch, IggyError> {
+    ) -> Result<Option<ConsumedBatch>, IggyError> {
         let (client, stream_id, topic_id, partition_id, consumer_obj, 
messages_processed) =
             consumer;
         let messages_to_receive = self.config.messages_per_batch.get();
@@ -139,24 +139,14 @@ impl BenchmarkConsumerBackend for LowLevelBackend {
             Ok(messages) => messages,
             Err(e) => {
                 if matches!(e, IggyError::TopicIdNotFound(_, _)) {
-                    return Ok(ConsumedBatch {
-                        messages: 0,
-                        user_data_bytes: 0,
-                        total_bytes: 0,
-                        latency: before_poll.elapsed(),
-                    });
+                    return Ok(None);
                 }
                 return Err(e);
             }
         };
 
         if polled_messages.messages.is_empty() {
-            return Ok(ConsumedBatch {
-                messages: 0,
-                user_data_bytes: 0,
-                total_bytes: 0,
-                latency: before_poll.elapsed(),
-            });
+            return Ok(None);
         }
         let latency = if self.config.origin_timestamp_latency_calculation {
             let now = IggyTimestamp::now().as_micros();
@@ -170,12 +160,12 @@ impl BenchmarkConsumerBackend for LowLevelBackend {
 
         *messages_processed += polled_messages.messages.len() as u64;
 
-        Ok(ConsumedBatch {
+        Ok(Some(ConsumedBatch {
             messages: u32::try_from(polled_messages.messages.len()).unwrap(),
             user_data_bytes: batch_user_size_bytes,
             total_bytes: batch_total_size_bytes,
             latency,
-        })
+        }))
     }
 
     fn log_setup_info(&self) {
diff --git a/core/sdk/src/clients/consumer.rs b/core/sdk/src/clients/consumer.rs
index 795f4edc..b2dd5d20 100644
--- a/core/sdk/src/clients/consumer.rs
+++ b/core/sdk/src/clients/consumer.rs
@@ -251,6 +251,13 @@ impl IggyConsumer {
         .await
     }
 
+    /// Retrieves the last consumed offset for the specified partition ID.
+    /// To get the current partition ID use `partition_id()`
+    pub fn get_last_consumed_offset(&self, partition_id: u32) -> Option<u64> {
+        let offset = self.last_consumed_offsets.get(&partition_id)?;
+        Some(offset.load(ORDERING))
+    }
+
     /// Deletes the consumer offset on the server either for the current 
partition or the provided partition ID.
     pub async fn delete_offset(&self, partition_id: Option<u32>) -> Result<(), 
IggyError> {
         let client = self.client.read().await;
@@ -264,6 +271,13 @@ impl IggyConsumer {
             .await
     }
 
+    /// Retrieves the last stored offset (on the server) for the specified 
partition ID.
+    /// To get the current partition ID use `partition_id()`
+    pub fn get_last_stored_offset(&self, partition_id: u32) -> Option<u64> {
+        let offset = self.last_stored_offsets.get(&partition_id)?;
+        Some(offset.load(ORDERING))
+    }
+
     /// Initializes the consumer by subscribing to diagnostic events, 
initializing the consumer group if needed, storing the offsets in the 
background etc.
     ///
     /// Note: This method must be called before polling messages.
@@ -805,9 +819,19 @@ impl IggyConsumer {
             info!(
                 "Creating consumer group: {consumer_group_id} for topic: 
{topic_id}, stream: {stream_id}"
             );
-            client
+            match client
                 .create_consumer_group(&stream_id, &topic_id, &name, id)
-                .await?;
+                .await
+            {
+                Ok(_) => {}
+                Err(IggyError::ConsumerGroupNameAlreadyExists(_, _)) => {}
+                Err(error) => {
+                    error!(
+                        "Failed to create consumer group {consumer_group_id} 
for topic: {topic_id}, stream: {stream_id}: {error}"
+                    );
+                    return Err(error);
+                }
+            }
         }
 
         info!(

Reply via email to