This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 8294ccef feat(bench): add high-level consumer API for benchmarking 
(#1855)
8294ccef is described below

commit 8294ccef396f682c04d6013964afc01ec9228212
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Sun Jun 8 14:18:26 2025 +0200

    feat(bench): add high-level consumer API for benchmarking (#1855)
    
    This commit implements usage of high-level consumer API in
    the iggy-bench. The changes include:
    
    - Addition of `futures-util` dependency for stream processing.
    - Refactoring of the consumer module to support both low-level
      and high-level APIs.
    - Implementation of backend logic for high-level consumer
      operations.
    - Updates to the benchmarking configuration to allow selection
      between high-level and low-level APIs.
    - Enhancements to the consumer setup and warmup processes to
      accommodate the new API.
---
 Cargo.lock                                         |   1 +
 core/bench/Cargo.toml                              |   1 +
 core/bench/src/actors/consumer.rs                  | 445 ---------------------
 core/bench/src/actors/consumer/backend.rs          |  84 ++++
 .../benchmark_consumer.rs}                         | 211 +++++-----
 .../src/actors/consumer/high_level_backend.rs      | 179 +++++++++
 .../bench/src/actors/consumer/low_level_backend.rs | 201 ++++++++++
 core/bench/src/actors/consumer/mod.rs              |  24 ++
 .../benchmark_producer.rs}                         |   0
 core/bench/src/actors/producer/mod.rs              |  21 +
 .../benchmark_producing_consumer.rs}               |   0
 core/bench/src/actors/producing_consumer/mod.rs    |  21 +
 core/bench/src/args/common.rs                      |  18 +
 core/bench/src/benchmarks/common.rs                |   2 +
 core/sdk/src/clients/consumer.rs                   |  28 +-
 15 files changed, 683 insertions(+), 553 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 1f410423..6be9aa34 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -870,6 +870,7 @@ dependencies = [
  "chrono",
  "clap",
  "figlet-rs",
+ "futures-util",
  "governor",
  "hostname",
  "human-repr",
diff --git a/core/bench/Cargo.toml b/core/bench/Cargo.toml
index ab5ec18d..fa87b2d7 100644
--- a/core/bench/Cargo.toml
+++ b/core/bench/Cargo.toml
@@ -31,6 +31,7 @@ charming = { workspace = true }
 chrono = { workspace = true }
 clap = { workspace = true }
 figlet-rs = { workspace = true }
+futures-util = { workspace = true }
 governor = "0.10.0"
 hostname = "0.4.1"
 human-repr = { workspace = true }
diff --git a/core/bench/src/actors/consumer.rs 
b/core/bench/src/actors/consumer.rs
deleted file mode 100644
index 8ea71282..00000000
--- a/core/bench/src/actors/consumer.rs
+++ /dev/null
@@ -1,445 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use crate::analytics::metrics::individual::from_records;
-use crate::analytics::record::BenchmarkRecord;
-use crate::benchmarks::common::create_consumer;
-use crate::utils::finish_condition::BenchmarkFinishCondition;
-use crate::utils::rate_limiter::BenchmarkRateLimiter;
-use crate::utils::{batch_total_size_bytes, batch_user_size_bytes};
-use bench_report::actor_kind::ActorKind;
-use bench_report::benchmark_kind::BenchmarkKind;
-use bench_report::individual_metrics::BenchmarkIndividualMetrics;
-use bench_report::numeric_parameter::BenchmarkNumericParameter;
-use human_repr::HumanCount;
-use iggy::prelude::*;
-use integration::test_server::{ClientFactory, login_root};
-use std::sync::Arc;
-use std::time::Duration;
-use tokio::time::Instant;
-use tracing::{error, info, warn};
-
-pub struct BenchmarkConsumer {
-    client_factory: Arc<dyn ClientFactory>,
-    benchmark_kind: BenchmarkKind,
-    consumer_id: u32,
-    consumer_group_id: Option<u32>,
-    stream_id: u32,
-    messages_per_batch: BenchmarkNumericParameter,
-    finish_condition: Arc<BenchmarkFinishCondition>,
-    warmup_time: IggyDuration,
-    sampling_time: IggyDuration,
-    moving_average_window: u32,
-    polling_kind: PollingKind,
-    limit_bytes_per_second: Option<IggyByteSize>,
-    origin_timestamp_latency_calculation: bool,
-}
-
-impl BenchmarkConsumer {
-    #[allow(clippy::too_many_arguments)]
-    pub fn new(
-        client_factory: Arc<dyn ClientFactory>,
-        benchmark_kind: BenchmarkKind,
-        consumer_id: u32,
-        consumer_group_id: Option<u32>,
-        stream_id: u32,
-        messages_per_batch: BenchmarkNumericParameter,
-        finish_condition: Arc<BenchmarkFinishCondition>,
-        warmup_time: IggyDuration,
-        sampling_time: IggyDuration,
-        moving_average_window: u32,
-        polling_kind: PollingKind,
-        limit_bytes_per_second: Option<IggyByteSize>,
-        origin_timestamp_latency_calculation: bool,
-    ) -> Self {
-        Self {
-            client_factory,
-            benchmark_kind,
-            consumer_id,
-            consumer_group_id,
-            stream_id,
-            messages_per_batch,
-            finish_condition,
-            warmup_time,
-            sampling_time,
-            moving_average_window,
-            polling_kind,
-            limit_bytes_per_second,
-            origin_timestamp_latency_calculation,
-        }
-    }
-
-    pub async fn run(self) -> Result<BenchmarkIndividualMetrics, IggyError> {
-        let (client, stream_id, topic_id, partition_id, consumer, 
rate_limiter) =
-            self.setup_client().await;
-
-        if self.warmup_time.get_duration() != Duration::from_millis(0) {
-            self.run_warmup(
-                &client,
-                &stream_id,
-                &topic_id,
-                partition_id,
-                &consumer,
-                rate_limiter.as_ref(),
-            )
-            .await?;
-        }
-
-        let metrics = self
-            .run_benchmark(
-                &client,
-                &stream_id,
-                &topic_id,
-                partition_id,
-                &consumer,
-                rate_limiter.as_ref(),
-            )
-            .await?;
-        Ok(metrics)
-    }
-
-    async fn setup_client(
-        &self,
-    ) -> (
-        IggyClient,
-        Identifier,
-        Identifier,
-        Option<u32>,
-        Consumer,
-        Option<BenchmarkRateLimiter>,
-    ) {
-        let topic_id: u32 = 1;
-        let default_partition_id: u32 = 1;
-        let client = self.client_factory.create_client().await;
-        let client = IggyClient::create(client, None, None);
-        login_root(&client).await;
-
-        let stream_id = self.stream_id.try_into().unwrap();
-        let topic_id = topic_id.try_into().unwrap();
-        let partition_id = if self.consumer_group_id.is_some() {
-            None
-        } else {
-            Some(default_partition_id)
-        };
-        let cg_id = self.consumer_group_id;
-        let consumer = create_consumer(
-            &client,
-            cg_id.as_ref(),
-            &stream_id,
-            &topic_id,
-            self.consumer_id,
-        )
-        .await;
-        let rate_limiter = 
self.limit_bytes_per_second.map(BenchmarkRateLimiter::new);
-
-        (
-            client,
-            stream_id,
-            topic_id,
-            partition_id,
-            consumer,
-            rate_limiter,
-        )
-    }
-
-    async fn run_warmup(
-        &self,
-        client: &IggyClient,
-        stream_id: &Identifier,
-        topic_id: &Identifier,
-        partition_id: Option<u32>,
-        consumer: &Consumer,
-        rate_limiter: Option<&BenchmarkRateLimiter>,
-    ) -> Result<(), IggyError> {
-        if let Some(cg_id) = self.consumer_group_id {
-            info!(
-                "Consumer #{}, part of consumer group #{}, → warming up for 
{}...",
-                self.consumer_id, cg_id, self.warmup_time
-            );
-        } else {
-            info!(
-                "Consumer #{} → warming up for {}...",
-                self.consumer_id, self.warmup_time
-            );
-        }
-
-        let warmup_end = Instant::now() + self.warmup_time.get_duration();
-        let mut messages_processed = 0;
-        let mut last_batch_user_size_bytes = 0;
-        while Instant::now() < warmup_end {
-            if let Some(rate_limiter) = rate_limiter {
-                if last_batch_user_size_bytes > 0 {
-                    rate_limiter
-                        .wait_until_necessary(last_batch_user_size_bytes)
-                        .await;
-                }
-            }
-            let messages_to_receive = self.messages_per_batch.get();
-            let offset = messages_processed;
-            let (strategy, auto_commit) = match self.polling_kind {
-                PollingKind::Offset => (PollingStrategy::offset(offset), 
false),
-                PollingKind::Next => (PollingStrategy::next(), true),
-                _ => panic!(
-                    "Unsupported polling kind for benchmark: {:?}",
-                    self.polling_kind
-                ),
-            };
-            let polled_messages = client
-                .poll_messages(
-                    stream_id,
-                    topic_id,
-                    partition_id,
-                    consumer,
-                    &strategy,
-                    messages_to_receive,
-                    auto_commit,
-                )
-                .await?;
-
-            if polled_messages.messages.is_empty() {
-                warn!(
-                    "Consumer #{} → Messages are empty for offset: {}, 
retrying...",
-                    self.consumer_id, offset
-                );
-                continue;
-            }
-            messages_processed = polled_messages.messages.len() as u64;
-            last_batch_user_size_bytes = 
batch_user_size_bytes(&polled_messages);
-        }
-        Ok(())
-    }
-
-    #[allow(clippy::too_many_lines)]
-    async fn run_benchmark(
-        &self,
-        client: &IggyClient,
-        stream_id: &Identifier,
-        topic_id: &Identifier,
-        partition_id: Option<u32>,
-        consumer: &Consumer,
-        rate_limiter: Option<&BenchmarkRateLimiter>,
-    ) -> Result<BenchmarkIndividualMetrics, IggyError> {
-        if let Some(cg_id) = self.consumer_group_id {
-            info!(
-                "Consumer #{}, part of consumer group #{} → polling {} in {} 
messages per batch from stream {}, rate limit: {:?}...",
-                self.consumer_id,
-                cg_id,
-                self.finish_condition.total_str(),
-                self.messages_per_batch,
-                stream_id,
-                self.limit_bytes_per_second
-            );
-        } else {
-            info!(
-                "Consumer #{} → polling {} in {} messages per batch from 
stream {}, rate limit: {:?}...",
-                self.consumer_id,
-                self.finish_condition.total_str(),
-                self.messages_per_batch,
-                stream_id,
-                self.limit_bytes_per_second
-            );
-        }
-
-        let max_capacity = self.finish_condition.max_capacity();
-        let mut records = Vec::with_capacity(max_capacity);
-        let mut skipped_warnings_count: u32 = 0;
-        let mut topic_not_found_counter = 0;
-        let mut initial_poll_timestamp: Option<Instant> = None;
-        let mut last_warning_time: Option<Instant> = None;
-        let mut messages_processed = 0;
-        let mut batches_processed = 0;
-        let mut bytes_processed = 0;
-        let mut user_data_bytes_processed = 0;
-        let start_timestamp = Instant::now();
-
-        loop {
-            if self.finish_condition.is_done() {
-                break;
-            }
-
-            let messages_to_receive = self.messages_per_batch.get();
-            let offset = messages_processed;
-            let (strategy, auto_commit) = match self.polling_kind {
-                PollingKind::Offset => (PollingStrategy::offset(offset), 
false),
-                PollingKind::Next => (PollingStrategy::next(), true),
-                _ => panic!(
-                    "Unsupported polling kind for benchmark: {:?}",
-                    self.polling_kind
-                ),
-            };
-            let before_poll = Instant::now();
-            let polled_messages = client
-                .poll_messages(
-                    stream_id,
-                    topic_id,
-                    partition_id,
-                    consumer,
-                    &strategy,
-                    messages_to_receive,
-                    auto_commit,
-                )
-                .await;
-            if let Err(e) = polled_messages {
-                if matches!(e, IggyError::TopicIdNotFound(_, _)) {
-                    topic_not_found_counter += 1;
-                    if topic_not_found_counter > 1000 {
-                        return Err(e);
-                    }
-                } else {
-                    return Err(e);
-                }
-                error!("Unexpected error: {:?}", e);
-                continue;
-            }
-
-            let polled_messages = polled_messages.unwrap();
-            if polled_messages.messages.is_empty() {
-                if initial_poll_timestamp.is_none() {
-                    initial_poll_timestamp = Some(before_poll);
-                }
-                let should_warn =
-                    last_warning_time.is_none_or(|t| t.elapsed() >= 
Duration::from_secs(1));
-
-                if should_warn {
-                    warn!(
-                        "Consumer #{} → Messages are empty for offset: {}, 
received {}, retrying... ({} warnings skipped)",
-                        self.consumer_id,
-                        offset,
-                        self.finish_condition.status(),
-                        skipped_warnings_count
-                    );
-                    last_warning_time = Some(Instant::now());
-                    skipped_warnings_count = 0;
-                } else {
-                    skipped_warnings_count += 1;
-                }
-                continue;
-            }
-
-            if polled_messages.messages.len() != messages_to_receive as usize {
-                let should_warn =
-                    last_warning_time.is_none_or(|t| t.elapsed() >= 
Duration::from_secs(1));
-
-                if should_warn {
-                    warn!(
-                        "Consumer #{} → expected {} messages, but got {} 
messages (received {}), retrying... ({} warnings skipped)",
-                        self.consumer_id,
-                        messages_to_receive,
-                        polled_messages.messages.len(),
-                        self.finish_condition.status(),
-                        skipped_warnings_count
-                    );
-                    last_warning_time = Some(Instant::now());
-                    skipped_warnings_count = 0;
-                } else {
-                    skipped_warnings_count += 1;
-                }
-            }
-            let latency = if self.origin_timestamp_latency_calculation {
-                let now = IggyTimestamp::now().as_micros();
-                Duration::from_micros(now - 
polled_messages.messages[0].header.origin_timestamp)
-            } else {
-                initial_poll_timestamp.unwrap_or(before_poll).elapsed()
-            };
-
-            let batch_user_size_bytes = 
batch_user_size_bytes(&polled_messages);
-            let batch_total_size_bytes = 
batch_total_size_bytes(&polled_messages);
-
-            initial_poll_timestamp = None;
-
-            messages_processed += polled_messages.messages.len() as u64;
-            batches_processed += 1;
-            bytes_processed += batch_total_size_bytes;
-            user_data_bytes_processed += batch_user_size_bytes;
-
-            records.push(BenchmarkRecord {
-                elapsed_time_us: 
u64::try_from(start_timestamp.elapsed().as_micros())
-                    .unwrap_or(u64::MAX),
-                latency_us: 
u64::try_from(latency.as_micros()).unwrap_or(u64::MAX),
-                messages: messages_processed,
-                message_batches: batches_processed,
-                user_data_bytes: user_data_bytes_processed,
-                total_bytes: bytes_processed,
-            });
-
-            if let Some(rate_limiter) = rate_limiter {
-                rate_limiter
-                    .wait_until_necessary(batch_user_size_bytes)
-                    .await;
-            }
-
-            if self
-                .finish_condition
-                .account_and_check(batch_user_size_bytes)
-            {
-                break;
-            }
-        }
-
-        let metrics = from_records(
-            &records,
-            self.benchmark_kind,
-            ActorKind::Consumer,
-            self.consumer_id,
-            self.sampling_time,
-            self.moving_average_window,
-        );
-
-        Self::log_statistics(
-            self.consumer_id,
-            messages_processed,
-            u32::try_from(batches_processed).unwrap_or(u32::MAX),
-            &self.messages_per_batch,
-            &metrics,
-        );
-
-        Ok(metrics)
-    }
-
-    pub fn log_statistics(
-        consumer_id: u32,
-        total_messages: u64,
-        message_batches: u32,
-        messages_per_batch: &BenchmarkNumericParameter,
-        metrics: &BenchmarkIndividualMetrics,
-    ) {
-        info!(
-            "Consumer #{} → polled {} messages, {} batches of {} messages in 
{:.2} s, total size: {}, average throughput: {:.2} MB/s, \
-    p50 latency: {:.2} ms, p90 latency: {:.2} ms, p95 latency: {:.2} ms, p99 
latency: {:.2} ms, p999 latency: {:.2} ms, \
-    p9999 latency: {:.2} ms, average latency: {:.2} ms, median latency: {:.2} 
ms, min latency: {:.2} ms, max latency: {:.2} ms, std dev latency: {:.2} ms",
-            consumer_id,
-            total_messages.human_count_bare(),
-            message_batches.human_count_bare(),
-            messages_per_batch,
-            metrics.summary.total_time_secs,
-            IggyByteSize::from(metrics.summary.total_user_data_bytes),
-            metrics.summary.throughput_megabytes_per_second,
-            metrics.summary.p50_latency_ms,
-            metrics.summary.p90_latency_ms,
-            metrics.summary.p95_latency_ms,
-            metrics.summary.p99_latency_ms,
-            metrics.summary.p999_latency_ms,
-            metrics.summary.p9999_latency_ms,
-            metrics.summary.avg_latency_ms,
-            metrics.summary.median_latency_ms,
-            metrics.summary.min_latency_ms,
-            metrics.summary.max_latency_ms,
-            metrics.summary.std_dev_latency_ms,
-        );
-    }
-}
diff --git a/core/bench/src/actors/consumer/backend.rs 
b/core/bench/src/actors/consumer/backend.rs
new file mode 100644
index 00000000..b61aaa41
--- /dev/null
+++ b/core/bench/src/actors/consumer/backend.rs
@@ -0,0 +1,84 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use bench_report::numeric_parameter::BenchmarkNumericParameter;
+use iggy::prelude::*;
+use integration::test_server::ClientFactory;
+use std::marker::PhantomData;
+use std::sync::Arc;
+use std::time::Duration;
+
+#[derive(Clone)]
+pub struct ConsumerBackendImpl<T> {
+    pub client_factory: Arc<dyn ClientFactory>,
+    pub config: BenchmarkConsumerConfig,
+    _phantom: PhantomData<T>,
+}
+
+impl<T> ConsumerBackendImpl<T> {
+    pub fn new(client_factory: Arc<dyn ClientFactory>, config: 
BenchmarkConsumerConfig) -> Self {
+        Self {
+            client_factory,
+            config,
+            _phantom: PhantomData,
+        }
+    }
+}
+
+pub struct LowLevelApiMarker;
+pub struct HighLevelApiMarker;
+
+pub type LowLevelBackend = ConsumerBackendImpl<LowLevelApiMarker>;
+pub type HighLevelBackend = ConsumerBackendImpl<HighLevelApiMarker>;
+
+pub enum ConsumerBackend {
+    LowLevel(LowLevelBackend),
+    HighLevel(HighLevelBackend),
+}
+
+#[derive(Debug, Clone)]
+pub struct ConsumedBatch {
+    pub messages: u32,
+    pub user_data_bytes: u64,
+    pub total_bytes: u64,
+    pub latency: Duration,
+}
+
+#[derive(Debug, Clone)]
+pub struct BenchmarkConsumerConfig {
+    pub consumer_id: u32,
+    pub consumer_group_id: Option<u32>,
+    pub stream_id: u32,
+    pub messages_per_batch: BenchmarkNumericParameter,
+    pub warmup_time: IggyDuration,
+    pub polling_kind: PollingKind,
+    pub origin_timestamp_latency_calculation: bool,
+}
+
+pub trait BenchmarkConsumerBackend {
+    type Consumer;
+
+    async fn setup(&self) -> Result<Self::Consumer, IggyError>;
+    async fn warmup(&self, consumer: &mut Self::Consumer) -> Result<(), 
IggyError>;
+    async fn consume_batch(
+        &self,
+        consumer: &mut Self::Consumer,
+    ) -> Result<Option<ConsumedBatch>, IggyError>;
+    fn log_setup_info(&self);
+    fn log_warmup_info(&self);
+}
diff --git a/core/bench/src/actors/producer.rs 
b/core/bench/src/actors/consumer/benchmark_consumer.rs
similarity index 52%
copy from core/bench/src/actors/producer.rs
copy to core/bench/src/actors/consumer/benchmark_consumer.rs
index c32191ab..b739a00d 100644
--- a/core/bench/src/actors/producer.rs
+++ b/core/bench/src/actors/consumer/benchmark_consumer.rs
@@ -16,9 +16,12 @@
  * under the License.
  */
 
+use super::backend::{
+    BenchmarkConsumerBackend, BenchmarkConsumerConfig, ConsumerBackend, 
HighLevelBackend,
+    LowLevelBackend,
+};
 use crate::analytics::metrics::individual::from_records;
 use crate::analytics::record::BenchmarkRecord;
-use crate::utils::batch_generator::BenchmarkBatchGenerator;
 use crate::utils::finish_condition::BenchmarkFinishCondition;
 use crate::utils::rate_limiter::BenchmarkRateLimiter;
 use bench_report::actor_kind::ActorKind;
@@ -27,144 +30,143 @@ use 
bench_report::individual_metrics::BenchmarkIndividualMetrics;
 use bench_report::numeric_parameter::BenchmarkNumericParameter;
 use human_repr::HumanCount;
 use iggy::prelude::*;
-use integration::test_server::{ClientFactory, login_root};
+use integration::test_server::ClientFactory;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::time::Instant;
 use tracing::info;
 
-pub struct BenchmarkProducer {
-    client_factory: Arc<dyn ClientFactory>,
+pub struct BenchmarkConsumer {
+    backend: ConsumerBackend,
     benchmark_kind: BenchmarkKind,
-    producer_id: u32,
-    stream_id: u32,
-    partitions: u32,
-    messages_per_batch: BenchmarkNumericParameter,
-    message_size: BenchmarkNumericParameter,
     finish_condition: Arc<BenchmarkFinishCondition>,
-    warmup_time: IggyDuration,
     sampling_time: IggyDuration,
     moving_average_window: u32,
     limit_bytes_per_second: Option<IggyByteSize>,
+    config: BenchmarkConsumerConfig,
 }
 
-impl BenchmarkProducer {
+impl BenchmarkConsumer {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
         client_factory: Arc<dyn ClientFactory>,
         benchmark_kind: BenchmarkKind,
-        producer_id: u32,
+        consumer_id: u32,
+        consumer_group_id: Option<u32>,
         stream_id: u32,
-        partitions: u32,
         messages_per_batch: BenchmarkNumericParameter,
-        message_size: BenchmarkNumericParameter,
         finish_condition: Arc<BenchmarkFinishCondition>,
         warmup_time: IggyDuration,
         sampling_time: IggyDuration,
         moving_average_window: u32,
+        polling_kind: PollingKind,
         limit_bytes_per_second: Option<IggyByteSize>,
+        origin_timestamp_latency_calculation: bool,
+        use_high_level_api: bool,
     ) -> Self {
-        Self {
-            client_factory,
-            benchmark_kind,
-            producer_id,
+        let config = BenchmarkConsumerConfig {
+            consumer_id,
+            consumer_group_id,
             stream_id,
-            partitions,
             messages_per_batch,
-            message_size,
-            finish_condition,
             warmup_time,
+            polling_kind,
+            origin_timestamp_latency_calculation,
+        };
+
+        let backend = if use_high_level_api {
+            ConsumerBackend::HighLevel(HighLevelBackend::new(client_factory, 
config.clone()))
+        } else {
+            ConsumerBackend::LowLevel(LowLevelBackend::new(client_factory, 
config.clone()))
+        };
+
+        Self {
+            backend,
+            benchmark_kind,
+            finish_condition,
             sampling_time,
             moving_average_window,
             limit_bytes_per_second,
+            config,
         }
     }
 
     pub async fn run(self) -> Result<BenchmarkIndividualMetrics, IggyError> {
-        let mut batch_generator =
-            BenchmarkBatchGenerator::new(self.message_size, 
self.messages_per_batch);
-        let rate_limiter = 
self.limit_bytes_per_second.map(BenchmarkRateLimiter::new);
-
-        let topic_id: u32 = 1;
-        let default_partition_id: u32 = 1;
-        let partitions = self.partitions;
-        let client = self.client_factory.create_client().await;
-        let client = IggyClient::create(client, None, None);
-        login_root(&client).await;
-
-        let stream_id = self.stream_id.try_into()?;
-        let topic_id = topic_id.try_into()?;
-        let partitioning = match partitions {
-            0 => panic!("Partition count must be greater than 0"),
-            1 => Partitioning::partition_id(default_partition_id),
-            2.. => Partitioning::balanced(),
-        };
-
-        // -----------------------
-        // WARM-UP
-        // -----------------------
-
-        if self.warmup_time.get_duration() != Duration::from_millis(0) {
-            info!(
-                "Producer #{} → warming up for {}...",
-                self.producer_id, self.warmup_time
-            );
-            let warmup_end = Instant::now() + self.warmup_time.get_duration();
-
-            while Instant::now() < warmup_end {
-                let batch = batch_generator.generate_batch();
-                client
-                    .send_messages(&stream_id, &topic_id, &partitioning, &mut 
batch.messages)
-                    .await?;
+        match self.backend {
+            ConsumerBackend::LowLevel(backend) => {
+                Self::run_with_backend(
+                    self.benchmark_kind,
+                    self.finish_condition,
+                    self.sampling_time,
+                    self.moving_average_window,
+                    self.limit_bytes_per_second,
+                    self.config,
+                    backend,
+                )
+                .await
+            }
+            ConsumerBackend::HighLevel(backend) => {
+                Self::run_with_backend(
+                    self.benchmark_kind,
+                    self.finish_condition,
+                    self.sampling_time,
+                    self.moving_average_window,
+                    self.limit_bytes_per_second,
+                    self.config,
+                    backend,
+                )
+                .await
             }
         }
-        // -----------------------
-        // MAIN BENCHMARK
-        // -----------------------
+    }
 
-        info!(
-            "Producer #{} → sending {} in batches of {} messages to stream {} 
with {} partitions, partitioning: {}, rate limit: {:?}...",
-            self.producer_id,
-            self.finish_condition.total_str(),
-            self.messages_per_batch,
-            stream_id,
-            partitions,
-            partitioning,
-            self.limit_bytes_per_second
-        );
+    async fn run_with_backend<B: BenchmarkConsumerBackend>(
+        benchmark_kind: BenchmarkKind,
+        finish_condition: Arc<BenchmarkFinishCondition>,
+        sampling_time: IggyDuration,
+        moving_average_window: u32,
+        limit_bytes_per_second: Option<IggyByteSize>,
+        config: BenchmarkConsumerConfig,
+        backend: B,
+    ) -> Result<BenchmarkIndividualMetrics, IggyError> {
+        let mut consumer = backend.setup().await?;
+
+        if config.warmup_time.get_duration() != Duration::from_millis(0) {
+            backend.log_warmup_info();
+            backend.warmup(&mut consumer).await?;
+        }
+
+        backend.log_setup_info();
 
-        let max_capacity = self.finish_condition.max_capacity();
-        let mut records: Vec<BenchmarkRecord> = 
Vec::with_capacity(max_capacity);
+        let max_capacity = finish_condition.max_capacity();
+        let mut records = Vec::with_capacity(max_capacity);
         let mut messages_processed = 0;
         let mut batches_processed = 0;
-        let mut total_bytes_processed = 0;
+        let mut bytes_processed = 0;
         let mut user_data_bytes_processed = 0;
         let start_timestamp = Instant::now();
+        let rate_limiter = 
limit_bytes_per_second.map(BenchmarkRateLimiter::new);
 
-        loop {
-            if self.finish_condition.is_done() {
-                break;
-            }
-            let batch = batch_generator.generate_batch();
-            let before_send = Instant::now();
-            client
-                .send_messages(&stream_id, &topic_id, &partitioning, &mut 
batch.messages)
-                .await?;
-            let latency = before_send.elapsed();
-
-            messages_processed += batch.messages.len() as u64;
+        while !finish_condition.is_done() {
+            let batch_opt = backend.consume_batch(&mut consumer).await?;
+
+            let Some(batch) = batch_opt else {
+                continue;
+            };
+
+            messages_processed += u64::from(batch.messages);
             batches_processed += 1;
             user_data_bytes_processed += batch.user_data_bytes;
-            total_bytes_processed += batch.total_bytes;
+            bytes_processed += batch.total_bytes;
 
             records.push(BenchmarkRecord {
                 elapsed_time_us: 
u64::try_from(start_timestamp.elapsed().as_micros())
                     .unwrap_or(u64::MAX),
-                latency_us: 
u64::try_from(latency.as_micros()).unwrap_or(u64::MAX),
+                latency_us: 
u64::try_from(batch.latency.as_micros()).unwrap_or(u64::MAX),
                 messages: messages_processed,
                 message_batches: batches_processed,
                 user_data_bytes: user_data_bytes_processed,
-                total_bytes: total_bytes_processed,
+                total_bytes: bytes_processed,
             });
 
             if let Some(rate_limiter) = &rate_limiter {
@@ -173,46 +175,43 @@ impl BenchmarkProducer {
                     .await;
             }
 
-            if self
-                .finish_condition
-                .account_and_check(batch.user_data_bytes)
-            {
+            if finish_condition.account_and_check(batch.user_data_bytes) {
                 break;
             }
         }
 
         let metrics = from_records(
             &records,
-            self.benchmark_kind,
-            ActorKind::Producer,
-            self.producer_id,
-            self.sampling_time,
-            self.moving_average_window,
+            benchmark_kind,
+            ActorKind::Consumer,
+            config.consumer_id,
+            sampling_time,
+            moving_average_window,
         );
 
         Self::log_statistics(
-            self.producer_id,
+            config.consumer_id,
             messages_processed,
-            batches_processed,
-            &self.messages_per_batch,
+            u32::try_from(batches_processed).unwrap_or(u32::MAX),
+            &config.messages_per_batch,
             &metrics,
         );
 
         Ok(metrics)
     }
 
-    fn log_statistics(
-        producer_id: u32,
+    pub fn log_statistics(
+        consumer_id: u32,
         total_messages: u64,
-        message_batches: u64,
+        message_batches: u32,
         messages_per_batch: &BenchmarkNumericParameter,
         metrics: &BenchmarkIndividualMetrics,
     ) {
         info!(
-            "Producer #{} → sent {} messages in {} batches of {} messages in 
{:.2} s, total size: {}, average throughput: {:.2} MB/s, \
-    p50 latency: {:.2} ms, p90 latency: {:.2} ms, p95 latency: {:.2} ms, p99 
latency: {:.2} ms, p999 latency: {:.2} ms, p9999 latency: {:.2} ms, \
-    average latency: {:.2} ms, median latency: {:.2} ms, min latency: {:.2} 
ms, max latency: {:.2} ms, std dev latency: {:.2} ms",
-            producer_id,
+            "Consumer #{} → polled {} messages, {} batches of {} messages in 
{:.2} s, total size: {}, average throughput: {:.2} MB/s, \
+    p50 latency: {:.2} ms, p90 latency: {:.2} ms, p95 latency: {:.2} ms, p99 
latency: {:.2} ms, p999 latency: {:.2} ms, \
+    p9999 latency: {:.2} ms, average latency: {:.2} ms, median latency: {:.2} 
ms, min latency: {:.2} ms, max latency: {:.2} ms, std dev latency: {:.2} ms",
+            consumer_id,
             total_messages.human_count_bare(),
             message_batches.human_count_bare(),
             messages_per_batch,
diff --git a/core/bench/src/actors/consumer/high_level_backend.rs 
b/core/bench/src/actors/consumer/high_level_backend.rs
new file mode 100644
index 00000000..b20b319c
--- /dev/null
+++ b/core/bench/src/actors/consumer/high_level_backend.rs
@@ -0,0 +1,179 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::backend::{BenchmarkConsumerBackend, ConsumedBatch, 
HighLevelBackend};
+use futures_util::StreamExt;
+use iggy::prelude::*;
+use integration::test_server::login_root;
+use std::time::Duration;
+use tokio::time::{Instant, timeout};
+use tracing::{debug, error, info, warn};
+
+impl BenchmarkConsumerBackend for HighLevelBackend {
+    type Consumer = IggyConsumer;
+
+    async fn setup(&self) -> Result<Self::Consumer, IggyError> {
+        let topic_id: u32 = 1;
+        let client = self.client_factory.create_client().await;
+        let client = IggyClient::create(client, None, None);
+        login_root(&client).await;
+
+        let stream_id_str = self.config.stream_id.to_string();
+        let topic_id_str = topic_id.to_string();
+
+        let mut iggy_consumer = if let Some(cg_id) = 
self.config.consumer_group_id {
+            let consumer_group_name = format!("cg_{cg_id}");
+            // Consumer groups use auto-commit (matching PollingKind::Next 
behavior from low-level API)
+            client
+                .consumer_group(&consumer_group_name, &stream_id_str, 
&topic_id_str)?
+                .batch_size(self.config.messages_per_batch.get())
+                .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages))
+                .create_consumer_group_if_not_exists()
+                .auto_join_consumer_group()
+                .build()
+        } else {
+            // TODO(hubcio): as of now, there is no way to mimic the behavior 
of
+            // PollingKind::Offset, because the high-level API doesn't provide 
a
+            // method to commit the local offset manually, only auto-commit on 
the server.
+            client
+                .consumer(
+                    &format!("hl_consumer_{}", self.config.consumer_id),
+                    &stream_id_str,
+                    &topic_id_str,
+                    1,
+                )?
+                .batch_size(self.config.messages_per_batch.get())
+                .auto_commit(AutoCommit::Disabled)
+                .build()
+        };
+
+        iggy_consumer.init().await?;
+        Ok(iggy_consumer)
+    }
+
+    async fn warmup(&self, consumer: &mut Self::Consumer) -> Result<(), 
IggyError> {
+        let warmup_end = Instant::now() + 
self.config.warmup_time.get_duration();
+        while Instant::now() < warmup_end {
+            if let Some(message) = consumer.next().await {
+                if message.is_err() {
+                    break;
+                }
+            }
+        }
+        Ok(())
+    }
+
+    async fn consume_batch(
+        &self,
+        consumer: &mut Self::Consumer,
+    ) -> Result<Option<ConsumedBatch>, IggyError> {
+        let batch_start = Instant::now();
+        let mut batch_messages = 0;
+        let mut batch_user_bytes = 0;
+        let mut batch_total_bytes = 0;
+
+        while batch_messages < self.config.messages_per_batch.get() {
+            // Use timeout to avoid getting stuck waiting for messages
+            let timeout_result = timeout(Duration::from_secs(1), 
consumer.next()).await;
+
+            match timeout_result {
+                Ok(Some(message_result)) => match message_result {
+                    Ok(received_message) => {
+                        batch_messages += 1;
+                        batch_user_bytes += 
received_message.message.payload.len() as u64;
+                        batch_total_bytes +=
+                            
received_message.message.get_size_bytes().as_bytes_u64();
+
+                        let offset = received_message.message.header.offset;
+                        if batch_messages >= 
self.config.messages_per_batch.get() {
+                            info!(
+                                "Batch of {} messages consumed, last_offset: 
{}, current_offset: {}",
+                                batch_messages,
+                                received_message.message.header.offset,
+                                received_message.current_offset
+                            );
+
+                            if let Err(error) = consumer.store_offset(offset, 
None).await {
+                                error!("Failed to store offset: {offset}. 
{error}");
+                                continue;
+                            }
+                            debug!("Offset: {offset} stored successfully");
+                            break;
+                        }
+                    }
+                    Err(err) => {
+                        warn!("Error receiving message: {}", err);
+                    }
+                },
+                Ok(None) => {
+                    debug!("Consumer stream ended during batching");
+                    break;
+                }
+                Err(_) => {
+                    debug!(
+                        "Timeout waiting for messages, stopping batch at {} 
messages",
+                        batch_messages
+                    );
+                    break;
+                }
+            }
+        }
+
+        if batch_messages == 0 {
+            Ok(None)
+        } else {
+            Ok(Some(ConsumedBatch {
+                messages: batch_messages,
+                user_data_bytes: batch_user_bytes,
+                total_bytes: batch_total_bytes,
+                latency: batch_start.elapsed(),
+            }))
+        }
+    }
+
+    fn log_setup_info(&self) {
+        if let Some(cg_id) = self.config.consumer_group_id {
+            info!(
+                "Consumer #{}, part of consumer group #{} → polling in {} 
messages per batch from stream {}, using high-level API...",
+                self.config.consumer_id,
+                cg_id,
+                self.config.messages_per_batch,
+                self.config.stream_id,
+            );
+        } else {
+            info!(
+                "Consumer #{} → polling in {} messages per batch from stream 
{}, using high-level API...",
+                self.config.consumer_id, self.config.messages_per_batch, 
self.config.stream_id,
+            );
+        }
+    }
+
+    fn log_warmup_info(&self) {
+        if let Some(cg_id) = self.config.consumer_group_id {
+            info!(
+                "Consumer #{}, part of consumer group #{}, → warming up for 
{}...",
+                self.config.consumer_id, cg_id, self.config.warmup_time
+            );
+        } else {
+            info!(
+                "Consumer #{} → warming up for {}...",
+                self.config.consumer_id, self.config.warmup_time
+            );
+        }
+    }
+}
diff --git a/core/bench/src/actors/consumer/low_level_backend.rs 
b/core/bench/src/actors/consumer/low_level_backend.rs
new file mode 100644
index 00000000..bcdb3f3e
--- /dev/null
+++ b/core/bench/src/actors/consumer/low_level_backend.rs
@@ -0,0 +1,201 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::backend::{BenchmarkConsumerBackend, ConsumedBatch, LowLevelBackend};
+use crate::benchmarks::common::create_consumer;
+use crate::utils::{batch_total_size_bytes, batch_user_size_bytes};
+use iggy::prelude::*;
+use integration::test_server::login_root;
+use std::time::Duration;
+use tokio::time::Instant;
+use tracing::{info, warn};
+
+impl BenchmarkConsumerBackend for LowLevelBackend {
+    type Consumer = (
+        IggyClient,
+        Identifier,
+        Identifier,
+        Option<u32>,
+        Consumer,
+        u64,
+    );
+
+    async fn setup(&self) -> Result<Self::Consumer, IggyError> {
+        let topic_id: u32 = 1;
+        let default_partition_id: u32 = 1;
+        let client = self.client_factory.create_client().await;
+        let client = IggyClient::create(client, None, None);
+        login_root(&client).await;
+
+        let stream_id = self.config.stream_id.try_into().unwrap();
+        let topic_id = topic_id.try_into().unwrap();
+        let partition_id = if self.config.consumer_group_id.is_some() {
+            None
+        } else {
+            Some(default_partition_id)
+        };
+        let cg_id = self.config.consumer_group_id;
+        let consumer = create_consumer(
+            &client,
+            cg_id.as_ref(),
+            &stream_id,
+            &topic_id,
+            self.config.consumer_id,
+        )
+        .await;
+
+        Ok((client, stream_id, topic_id, partition_id, consumer, 0))
+    }
+
+    async fn warmup(&self, consumer: &mut Self::Consumer) -> Result<(), 
IggyError> {
+        let (client, stream_id, topic_id, partition_id, consumer_obj, 
messages_processed) =
+            consumer;
+        let warmup_end = Instant::now() + 
self.config.warmup_time.get_duration();
+        let mut warmup_messages_processed = 0;
+        let mut _last_batch_user_size_bytes = 0;
+
+        while Instant::now() < warmup_end {
+            let messages_to_receive = self.config.messages_per_batch.get();
+            let offset = *messages_processed + warmup_messages_processed;
+            let (strategy, auto_commit) = match self.config.polling_kind {
+                PollingKind::Offset => (PollingStrategy::offset(offset), 
false),
+                PollingKind::Next => (PollingStrategy::next(), true),
+                _ => panic!(
+                    "Unsupported polling kind for benchmark: {:?}",
+                    self.config.polling_kind
+                ),
+            };
+            let polled_messages = client
+                .poll_messages(
+                    stream_id,
+                    topic_id,
+                    *partition_id,
+                    consumer_obj,
+                    &strategy,
+                    messages_to_receive,
+                    auto_commit,
+                )
+                .await?;
+
+            if polled_messages.messages.is_empty() {
+                warn!(
+                    "Consumer #{} → Messages are empty for offset: {}, 
retrying...",
+                    self.config.consumer_id, offset
+                );
+                continue;
+            }
+            warmup_messages_processed += polled_messages.messages.len() as u64;
+            _last_batch_user_size_bytes = 
batch_user_size_bytes(&polled_messages);
+        }
+        Ok(())
+    }
+
+    async fn consume_batch(
+        &self,
+        consumer: &mut Self::Consumer,
+    ) -> Result<Option<ConsumedBatch>, IggyError> {
+        let (client, stream_id, topic_id, partition_id, consumer_obj, 
messages_processed) =
+            consumer;
+        let messages_to_receive = self.config.messages_per_batch.get();
+        let offset = *messages_processed;
+        let (strategy, auto_commit) = match self.config.polling_kind {
+            PollingKind::Offset => (PollingStrategy::offset(offset), false),
+            PollingKind::Next => (PollingStrategy::next(), true),
+            _ => panic!(
+                "Unsupported polling kind for benchmark: {:?}",
+                self.config.polling_kind
+            ),
+        };
+
+        let before_poll = Instant::now();
+        let polled_messages = client
+            .poll_messages(
+                stream_id,
+                topic_id,
+                *partition_id,
+                consumer_obj,
+                &strategy,
+                messages_to_receive,
+                auto_commit,
+            )
+            .await;
+
+        let polled_messages = match polled_messages {
+            Ok(messages) => messages,
+            Err(e) => {
+                if matches!(e, IggyError::TopicIdNotFound(_, _)) {
+                    return Ok(None);
+                }
+                return Err(e);
+            }
+        };
+
+        if polled_messages.messages.is_empty() {
+            return Ok(None);
+        }
+        let latency = if self.config.origin_timestamp_latency_calculation {
+            let now = IggyTimestamp::now().as_micros();
+            Duration::from_micros(now - 
polled_messages.messages[0].header.origin_timestamp)
+        } else {
+            before_poll.elapsed()
+        };
+
+        let batch_user_size_bytes = batch_user_size_bytes(&polled_messages);
+        let batch_total_size_bytes = batch_total_size_bytes(&polled_messages);
+
+        *messages_processed += polled_messages.messages.len() as u64;
+
+        Ok(Some(ConsumedBatch {
+            messages: u32::try_from(polled_messages.messages.len()).unwrap(),
+            user_data_bytes: batch_user_size_bytes,
+            total_bytes: batch_total_size_bytes,
+            latency,
+        }))
+    }
+
+    fn log_setup_info(&self) {
+        if let Some(cg_id) = self.config.consumer_group_id {
+            info!(
+                "Consumer #{}, part of consumer group #{} → polling in {} 
messages per batch from stream {}, using low-level API...",
+                self.config.consumer_id,
+                cg_id,
+                self.config.messages_per_batch,
+                self.config.stream_id,
+            );
+        } else {
+            info!(
+                "Consumer #{} → polling in {} messages per batch from stream 
{}, using low-level API...",
+                self.config.consumer_id, self.config.messages_per_batch, 
self.config.stream_id,
+            );
+        }
+    }
+
+    fn log_warmup_info(&self) {
+        if let Some(cg_id) = self.config.consumer_group_id {
+            info!(
+                "Consumer #{}, part of consumer group #{}, → warming up for 
{}...",
+                self.config.consumer_id, cg_id, self.config.warmup_time
+            );
+        } else {
+            info!(
+                "Consumer #{} → warming up for {}...",
+                self.config.consumer_id, self.config.warmup_time
+            );
+        }
+    }
+}
diff --git a/core/bench/src/actors/consumer/mod.rs 
b/core/bench/src/actors/consumer/mod.rs
new file mode 100644
index 00000000..2912806b
--- /dev/null
+++ b/core/bench/src/actors/consumer/mod.rs
@@ -0,0 +1,24 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+pub mod backend;
+pub mod benchmark_consumer;
+pub mod high_level_backend;
+pub mod low_level_backend;
+
+pub use benchmark_consumer::BenchmarkConsumer;
diff --git a/core/bench/src/actors/producer.rs 
b/core/bench/src/actors/producer/benchmark_producer.rs
similarity index 100%
rename from core/bench/src/actors/producer.rs
rename to core/bench/src/actors/producer/benchmark_producer.rs
diff --git a/core/bench/src/actors/producer/mod.rs 
b/core/bench/src/actors/producer/mod.rs
new file mode 100644
index 00000000..d4f6209f
--- /dev/null
+++ b/core/bench/src/actors/producer/mod.rs
@@ -0,0 +1,21 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+pub mod benchmark_producer;
+
+pub use benchmark_producer::BenchmarkProducer;
diff --git a/core/bench/src/actors/producing_consumer.rs 
b/core/bench/src/actors/producing_consumer/benchmark_producing_consumer.rs
similarity index 100%
rename from core/bench/src/actors/producing_consumer.rs
rename to 
core/bench/src/actors/producing_consumer/benchmark_producing_consumer.rs
diff --git a/core/bench/src/actors/producing_consumer/mod.rs 
b/core/bench/src/actors/producing_consumer/mod.rs
new file mode 100644
index 00000000..2927c8d9
--- /dev/null
+++ b/core/bench/src/actors/producing_consumer/mod.rs
@@ -0,0 +1,21 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+pub mod benchmark_producing_consumer;
+
+pub use benchmark_producing_consumer::BenchmarkProducingConsumer;
diff --git a/core/bench/src/args/common.rs b/core/bench/src/args/common.rs
index 1ca93f0c..7d475aa0 100644
--- a/core/bench/src/args/common.rs
+++ b/core/bench/src/args/common.rs
@@ -41,6 +41,7 @@ use std::str::FromStr;
 
 #[derive(Parser, Debug)]
 #[command(author, version, about, long_about = None)]
+#[allow(clippy::struct_excessive_bools)]
 pub struct IggyBenchArgs {
     /// Benchmark kind
     #[command(subcommand)]
@@ -103,6 +104,10 @@ pub struct IggyBenchArgs {
     /// Only applicable to local benchmarks.
     #[arg(long, short = 'k', default_value_t = DEFAULT_SKIP_SERVER_START, 
verbatim_doc_comment)]
     pub skip_server_start: bool,
+
+    /// Use high-level API for actors
+    #[arg(long, short = 'H', default_value_t = false)]
+    pub high_level_api: bool,
 }
 
 fn validate_server_executable_path(v: &str) -> Result<String, String> {
@@ -182,6 +187,15 @@ impl IggyBenchArgs {
             }
         }
 
+        if self.high_level_api && !self.messages_per_batch.is_fixed() {
+            Self::command()
+                .error(
+                    ErrorKind::ArgumentConflict,
+                    "High-level consumer API (--high-level-api) requires fixed 
batch size, but random batch size was specified. Use a single value instead of 
a range for --messages-per-batch.",
+                )
+                .exit();
+        }
+
         self.benchmark_kind.inner().validate();
     }
 
@@ -330,6 +344,10 @@ impl IggyBenchArgs {
         self.benchmark_kind.inner().max_topic_size()
     }
 
+    pub const fn high_level_api(&self) -> bool {
+        self.high_level_api
+    }
+
     /// Generates the output directory name based on benchmark parameters.
     pub fn generate_dir_name(&self) -> String {
         let benchmark_kind = match &self.benchmark_kind {
diff --git a/core/bench/src/benchmarks/common.rs 
b/core/bench/src/benchmarks/common.rs
index 22c9ad22..d030841c 100644
--- a/core/bench/src/benchmarks/common.rs
+++ b/core/bench/src/benchmarks/common.rs
@@ -190,6 +190,7 @@ pub fn build_consumer_futures(
     let global_finish_condition =
         BenchmarkFinishCondition::new(args, 
BenchmarkFinishConditionMode::Shared);
     let rate_limit = rate_limit_per_actor(args.rate_limit(), actors);
+    let use_high_level_api = args.high_level_api();
 
     (1..=consumers)
         .map(|consumer_id| {
@@ -225,6 +226,7 @@ pub fn build_consumer_futures(
                     polling_kind,
                     rate_limit,
                     origin_timestamp_latency_calculation,
+                    use_high_level_api,
                 );
                 consumer.run().await
             }
diff --git a/core/sdk/src/clients/consumer.rs b/core/sdk/src/clients/consumer.rs
index 795f4edc..b2dd5d20 100644
--- a/core/sdk/src/clients/consumer.rs
+++ b/core/sdk/src/clients/consumer.rs
@@ -251,6 +251,13 @@ impl IggyConsumer {
         .await
     }
 
+    /// Retrieves the last consumed offset for the specified partition ID.
+    /// To get the current partition ID use `partition_id()`
+    pub fn get_last_consumed_offset(&self, partition_id: u32) -> Option<u64> {
+        let offset = self.last_consumed_offsets.get(&partition_id)?;
+        Some(offset.load(ORDERING))
+    }
+
     /// Deletes the consumer offset on the server either for the current 
partition or the provided partition ID.
     pub async fn delete_offset(&self, partition_id: Option<u32>) -> Result<(), 
IggyError> {
         let client = self.client.read().await;
@@ -264,6 +271,13 @@ impl IggyConsumer {
             .await
     }
 
+    /// Retrieves the last stored offset (on the server) for the specified 
partition ID.
+    /// To get the current partition ID use `partition_id()`
+    pub fn get_last_stored_offset(&self, partition_id: u32) -> Option<u64> {
+        let offset = self.last_stored_offsets.get(&partition_id)?;
+        Some(offset.load(ORDERING))
+    }
+
     /// Initializes the consumer by subscribing to diagnostic events, 
initializing the consumer group if needed, storing the offsets in the 
background etc.
     ///
     /// Note: This method must be called before polling messages.
@@ -805,9 +819,19 @@ impl IggyConsumer {
             info!(
                 "Creating consumer group: {consumer_group_id} for topic: 
{topic_id}, stream: {stream_id}"
             );
-            client
+            match client
                 .create_consumer_group(&stream_id, &topic_id, &name, id)
-                .await?;
+                .await
+            {
+                Ok(_) => {}
+                Err(IggyError::ConsumerGroupNameAlreadyExists(_, _)) => {}
+                Err(error) => {
+                    error!(
+                        "Failed to create consumer group {consumer_group_id} 
for topic: {topic_id}, stream: {stream_id}: {error}"
+                    );
+                    return Err(error);
+                }
+            }
         }
 
         info!(

Reply via email to