This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 8294ccef feat(bench): add high-level consumer API for benchmarking
(#1855)
8294ccef is described below
commit 8294ccef396f682c04d6013964afc01ec9228212
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Sun Jun 8 14:18:26 2025 +0200
feat(bench): add high-level consumer API for benchmarking (#1855)
This commit implements use of the high-level consumer API in
iggy-bench. The changes include:
- Addition of `futures-util` dependency for stream processing.
- Refactoring of the consumer module to support both low-level
and high-level APIs.
- Implementation of backend logic for high-level consumer
operations.
- Updates to the benchmarking configuration to allow selection
between high-level and low-level APIs.
- Enhancements to the consumer setup and warmup processes to
accommodate the new API.
---
Cargo.lock | 1 +
core/bench/Cargo.toml | 1 +
core/bench/src/actors/consumer.rs | 445 ---------------------
core/bench/src/actors/consumer/backend.rs | 84 ++++
.../benchmark_consumer.rs} | 211 +++++-----
.../src/actors/consumer/high_level_backend.rs | 179 +++++++++
.../bench/src/actors/consumer/low_level_backend.rs | 201 ++++++++++
core/bench/src/actors/consumer/mod.rs | 24 ++
.../benchmark_producer.rs} | 0
core/bench/src/actors/producer/mod.rs | 21 +
.../benchmark_producing_consumer.rs} | 0
core/bench/src/actors/producing_consumer/mod.rs | 21 +
core/bench/src/args/common.rs | 18 +
core/bench/src/benchmarks/common.rs | 2 +
core/sdk/src/clients/consumer.rs | 28 +-
15 files changed, 683 insertions(+), 553 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 1f410423..6be9aa34 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -870,6 +870,7 @@ dependencies = [
"chrono",
"clap",
"figlet-rs",
+ "futures-util",
"governor",
"hostname",
"human-repr",
diff --git a/core/bench/Cargo.toml b/core/bench/Cargo.toml
index ab5ec18d..fa87b2d7 100644
--- a/core/bench/Cargo.toml
+++ b/core/bench/Cargo.toml
@@ -31,6 +31,7 @@ charming = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true }
figlet-rs = { workspace = true }
+futures-util = { workspace = true }
governor = "0.10.0"
hostname = "0.4.1"
human-repr = { workspace = true }
diff --git a/core/bench/src/actors/consumer.rs
b/core/bench/src/actors/consumer.rs
deleted file mode 100644
index 8ea71282..00000000
--- a/core/bench/src/actors/consumer.rs
+++ /dev/null
@@ -1,445 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use crate::analytics::metrics::individual::from_records;
-use crate::analytics::record::BenchmarkRecord;
-use crate::benchmarks::common::create_consumer;
-use crate::utils::finish_condition::BenchmarkFinishCondition;
-use crate::utils::rate_limiter::BenchmarkRateLimiter;
-use crate::utils::{batch_total_size_bytes, batch_user_size_bytes};
-use bench_report::actor_kind::ActorKind;
-use bench_report::benchmark_kind::BenchmarkKind;
-use bench_report::individual_metrics::BenchmarkIndividualMetrics;
-use bench_report::numeric_parameter::BenchmarkNumericParameter;
-use human_repr::HumanCount;
-use iggy::prelude::*;
-use integration::test_server::{ClientFactory, login_root};
-use std::sync::Arc;
-use std::time::Duration;
-use tokio::time::Instant;
-use tracing::{error, info, warn};
-
-pub struct BenchmarkConsumer {
- client_factory: Arc<dyn ClientFactory>,
- benchmark_kind: BenchmarkKind,
- consumer_id: u32,
- consumer_group_id: Option<u32>,
- stream_id: u32,
- messages_per_batch: BenchmarkNumericParameter,
- finish_condition: Arc<BenchmarkFinishCondition>,
- warmup_time: IggyDuration,
- sampling_time: IggyDuration,
- moving_average_window: u32,
- polling_kind: PollingKind,
- limit_bytes_per_second: Option<IggyByteSize>,
- origin_timestamp_latency_calculation: bool,
-}
-
-impl BenchmarkConsumer {
- #[allow(clippy::too_many_arguments)]
- pub fn new(
- client_factory: Arc<dyn ClientFactory>,
- benchmark_kind: BenchmarkKind,
- consumer_id: u32,
- consumer_group_id: Option<u32>,
- stream_id: u32,
- messages_per_batch: BenchmarkNumericParameter,
- finish_condition: Arc<BenchmarkFinishCondition>,
- warmup_time: IggyDuration,
- sampling_time: IggyDuration,
- moving_average_window: u32,
- polling_kind: PollingKind,
- limit_bytes_per_second: Option<IggyByteSize>,
- origin_timestamp_latency_calculation: bool,
- ) -> Self {
- Self {
- client_factory,
- benchmark_kind,
- consumer_id,
- consumer_group_id,
- stream_id,
- messages_per_batch,
- finish_condition,
- warmup_time,
- sampling_time,
- moving_average_window,
- polling_kind,
- limit_bytes_per_second,
- origin_timestamp_latency_calculation,
- }
- }
-
- pub async fn run(self) -> Result<BenchmarkIndividualMetrics, IggyError> {
- let (client, stream_id, topic_id, partition_id, consumer,
rate_limiter) =
- self.setup_client().await;
-
- if self.warmup_time.get_duration() != Duration::from_millis(0) {
- self.run_warmup(
- &client,
- &stream_id,
- &topic_id,
- partition_id,
- &consumer,
- rate_limiter.as_ref(),
- )
- .await?;
- }
-
- let metrics = self
- .run_benchmark(
- &client,
- &stream_id,
- &topic_id,
- partition_id,
- &consumer,
- rate_limiter.as_ref(),
- )
- .await?;
- Ok(metrics)
- }
-
- async fn setup_client(
- &self,
- ) -> (
- IggyClient,
- Identifier,
- Identifier,
- Option<u32>,
- Consumer,
- Option<BenchmarkRateLimiter>,
- ) {
- let topic_id: u32 = 1;
- let default_partition_id: u32 = 1;
- let client = self.client_factory.create_client().await;
- let client = IggyClient::create(client, None, None);
- login_root(&client).await;
-
- let stream_id = self.stream_id.try_into().unwrap();
- let topic_id = topic_id.try_into().unwrap();
- let partition_id = if self.consumer_group_id.is_some() {
- None
- } else {
- Some(default_partition_id)
- };
- let cg_id = self.consumer_group_id;
- let consumer = create_consumer(
- &client,
- cg_id.as_ref(),
- &stream_id,
- &topic_id,
- self.consumer_id,
- )
- .await;
- let rate_limiter =
self.limit_bytes_per_second.map(BenchmarkRateLimiter::new);
-
- (
- client,
- stream_id,
- topic_id,
- partition_id,
- consumer,
- rate_limiter,
- )
- }
-
- async fn run_warmup(
- &self,
- client: &IggyClient,
- stream_id: &Identifier,
- topic_id: &Identifier,
- partition_id: Option<u32>,
- consumer: &Consumer,
- rate_limiter: Option<&BenchmarkRateLimiter>,
- ) -> Result<(), IggyError> {
- if let Some(cg_id) = self.consumer_group_id {
- info!(
- "Consumer #{}, part of consumer group #{}, → warming up for
{}...",
- self.consumer_id, cg_id, self.warmup_time
- );
- } else {
- info!(
- "Consumer #{} → warming up for {}...",
- self.consumer_id, self.warmup_time
- );
- }
-
- let warmup_end = Instant::now() + self.warmup_time.get_duration();
- let mut messages_processed = 0;
- let mut last_batch_user_size_bytes = 0;
- while Instant::now() < warmup_end {
- if let Some(rate_limiter) = rate_limiter {
- if last_batch_user_size_bytes > 0 {
- rate_limiter
- .wait_until_necessary(last_batch_user_size_bytes)
- .await;
- }
- }
- let messages_to_receive = self.messages_per_batch.get();
- let offset = messages_processed;
- let (strategy, auto_commit) = match self.polling_kind {
- PollingKind::Offset => (PollingStrategy::offset(offset),
false),
- PollingKind::Next => (PollingStrategy::next(), true),
- _ => panic!(
- "Unsupported polling kind for benchmark: {:?}",
- self.polling_kind
- ),
- };
- let polled_messages = client
- .poll_messages(
- stream_id,
- topic_id,
- partition_id,
- consumer,
- &strategy,
- messages_to_receive,
- auto_commit,
- )
- .await?;
-
- if polled_messages.messages.is_empty() {
- warn!(
- "Consumer #{} → Messages are empty for offset: {},
retrying...",
- self.consumer_id, offset
- );
- continue;
- }
- messages_processed = polled_messages.messages.len() as u64;
- last_batch_user_size_bytes =
batch_user_size_bytes(&polled_messages);
- }
- Ok(())
- }
-
- #[allow(clippy::too_many_lines)]
- async fn run_benchmark(
- &self,
- client: &IggyClient,
- stream_id: &Identifier,
- topic_id: &Identifier,
- partition_id: Option<u32>,
- consumer: &Consumer,
- rate_limiter: Option<&BenchmarkRateLimiter>,
- ) -> Result<BenchmarkIndividualMetrics, IggyError> {
- if let Some(cg_id) = self.consumer_group_id {
- info!(
- "Consumer #{}, part of consumer group #{} → polling {} in {}
messages per batch from stream {}, rate limit: {:?}...",
- self.consumer_id,
- cg_id,
- self.finish_condition.total_str(),
- self.messages_per_batch,
- stream_id,
- self.limit_bytes_per_second
- );
- } else {
- info!(
- "Consumer #{} → polling {} in {} messages per batch from
stream {}, rate limit: {:?}...",
- self.consumer_id,
- self.finish_condition.total_str(),
- self.messages_per_batch,
- stream_id,
- self.limit_bytes_per_second
- );
- }
-
- let max_capacity = self.finish_condition.max_capacity();
- let mut records = Vec::with_capacity(max_capacity);
- let mut skipped_warnings_count: u32 = 0;
- let mut topic_not_found_counter = 0;
- let mut initial_poll_timestamp: Option<Instant> = None;
- let mut last_warning_time: Option<Instant> = None;
- let mut messages_processed = 0;
- let mut batches_processed = 0;
- let mut bytes_processed = 0;
- let mut user_data_bytes_processed = 0;
- let start_timestamp = Instant::now();
-
- loop {
- if self.finish_condition.is_done() {
- break;
- }
-
- let messages_to_receive = self.messages_per_batch.get();
- let offset = messages_processed;
- let (strategy, auto_commit) = match self.polling_kind {
- PollingKind::Offset => (PollingStrategy::offset(offset),
false),
- PollingKind::Next => (PollingStrategy::next(), true),
- _ => panic!(
- "Unsupported polling kind for benchmark: {:?}",
- self.polling_kind
- ),
- };
- let before_poll = Instant::now();
- let polled_messages = client
- .poll_messages(
- stream_id,
- topic_id,
- partition_id,
- consumer,
- &strategy,
- messages_to_receive,
- auto_commit,
- )
- .await;
- if let Err(e) = polled_messages {
- if matches!(e, IggyError::TopicIdNotFound(_, _)) {
- topic_not_found_counter += 1;
- if topic_not_found_counter > 1000 {
- return Err(e);
- }
- } else {
- return Err(e);
- }
- error!("Unexpected error: {:?}", e);
- continue;
- }
-
- let polled_messages = polled_messages.unwrap();
- if polled_messages.messages.is_empty() {
- if initial_poll_timestamp.is_none() {
- initial_poll_timestamp = Some(before_poll);
- }
- let should_warn =
- last_warning_time.is_none_or(|t| t.elapsed() >=
Duration::from_secs(1));
-
- if should_warn {
- warn!(
- "Consumer #{} → Messages are empty for offset: {},
received {}, retrying... ({} warnings skipped)",
- self.consumer_id,
- offset,
- self.finish_condition.status(),
- skipped_warnings_count
- );
- last_warning_time = Some(Instant::now());
- skipped_warnings_count = 0;
- } else {
- skipped_warnings_count += 1;
- }
- continue;
- }
-
- if polled_messages.messages.len() != messages_to_receive as usize {
- let should_warn =
- last_warning_time.is_none_or(|t| t.elapsed() >=
Duration::from_secs(1));
-
- if should_warn {
- warn!(
- "Consumer #{} → expected {} messages, but got {}
messages (received {}), retrying... ({} warnings skipped)",
- self.consumer_id,
- messages_to_receive,
- polled_messages.messages.len(),
- self.finish_condition.status(),
- skipped_warnings_count
- );
- last_warning_time = Some(Instant::now());
- skipped_warnings_count = 0;
- } else {
- skipped_warnings_count += 1;
- }
- }
- let latency = if self.origin_timestamp_latency_calculation {
- let now = IggyTimestamp::now().as_micros();
- Duration::from_micros(now -
polled_messages.messages[0].header.origin_timestamp)
- } else {
- initial_poll_timestamp.unwrap_or(before_poll).elapsed()
- };
-
- let batch_user_size_bytes =
batch_user_size_bytes(&polled_messages);
- let batch_total_size_bytes =
batch_total_size_bytes(&polled_messages);
-
- initial_poll_timestamp = None;
-
- messages_processed += polled_messages.messages.len() as u64;
- batches_processed += 1;
- bytes_processed += batch_total_size_bytes;
- user_data_bytes_processed += batch_user_size_bytes;
-
- records.push(BenchmarkRecord {
- elapsed_time_us:
u64::try_from(start_timestamp.elapsed().as_micros())
- .unwrap_or(u64::MAX),
- latency_us:
u64::try_from(latency.as_micros()).unwrap_or(u64::MAX),
- messages: messages_processed,
- message_batches: batches_processed,
- user_data_bytes: user_data_bytes_processed,
- total_bytes: bytes_processed,
- });
-
- if let Some(rate_limiter) = rate_limiter {
- rate_limiter
- .wait_until_necessary(batch_user_size_bytes)
- .await;
- }
-
- if self
- .finish_condition
- .account_and_check(batch_user_size_bytes)
- {
- break;
- }
- }
-
- let metrics = from_records(
- &records,
- self.benchmark_kind,
- ActorKind::Consumer,
- self.consumer_id,
- self.sampling_time,
- self.moving_average_window,
- );
-
- Self::log_statistics(
- self.consumer_id,
- messages_processed,
- u32::try_from(batches_processed).unwrap_or(u32::MAX),
- &self.messages_per_batch,
- &metrics,
- );
-
- Ok(metrics)
- }
-
- pub fn log_statistics(
- consumer_id: u32,
- total_messages: u64,
- message_batches: u32,
- messages_per_batch: &BenchmarkNumericParameter,
- metrics: &BenchmarkIndividualMetrics,
- ) {
- info!(
- "Consumer #{} → polled {} messages, {} batches of {} messages in
{:.2} s, total size: {}, average throughput: {:.2} MB/s, \
- p50 latency: {:.2} ms, p90 latency: {:.2} ms, p95 latency: {:.2} ms, p99
latency: {:.2} ms, p999 latency: {:.2} ms, \
- p9999 latency: {:.2} ms, average latency: {:.2} ms, median latency: {:.2}
ms, min latency: {:.2} ms, max latency: {:.2} ms, std dev latency: {:.2} ms",
- consumer_id,
- total_messages.human_count_bare(),
- message_batches.human_count_bare(),
- messages_per_batch,
- metrics.summary.total_time_secs,
- IggyByteSize::from(metrics.summary.total_user_data_bytes),
- metrics.summary.throughput_megabytes_per_second,
- metrics.summary.p50_latency_ms,
- metrics.summary.p90_latency_ms,
- metrics.summary.p95_latency_ms,
- metrics.summary.p99_latency_ms,
- metrics.summary.p999_latency_ms,
- metrics.summary.p9999_latency_ms,
- metrics.summary.avg_latency_ms,
- metrics.summary.median_latency_ms,
- metrics.summary.min_latency_ms,
- metrics.summary.max_latency_ms,
- metrics.summary.std_dev_latency_ms,
- );
- }
-}
diff --git a/core/bench/src/actors/consumer/backend.rs
b/core/bench/src/actors/consumer/backend.rs
new file mode 100644
index 00000000..b61aaa41
--- /dev/null
+++ b/core/bench/src/actors/consumer/backend.rs
@@ -0,0 +1,84 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use bench_report::numeric_parameter::BenchmarkNumericParameter;
+use iggy::prelude::*;
+use integration::test_server::ClientFactory;
+use std::marker::PhantomData;
+use std::sync::Arc;
+use std::time::Duration;
+
+#[derive(Clone)]
+pub struct ConsumerBackendImpl<T> {
+ pub client_factory: Arc<dyn ClientFactory>,
+ pub config: BenchmarkConsumerConfig,
+ _phantom: PhantomData<T>,
+}
+
+impl<T> ConsumerBackendImpl<T> {
+ pub fn new(client_factory: Arc<dyn ClientFactory>, config:
BenchmarkConsumerConfig) -> Self {
+ Self {
+ client_factory,
+ config,
+ _phantom: PhantomData,
+ }
+ }
+}
+
+pub struct LowLevelApiMarker;
+pub struct HighLevelApiMarker;
+
+pub type LowLevelBackend = ConsumerBackendImpl<LowLevelApiMarker>;
+pub type HighLevelBackend = ConsumerBackendImpl<HighLevelApiMarker>;
+
+pub enum ConsumerBackend {
+ LowLevel(LowLevelBackend),
+ HighLevel(HighLevelBackend),
+}
+
+#[derive(Debug, Clone)]
+pub struct ConsumedBatch {
+ pub messages: u32,
+ pub user_data_bytes: u64,
+ pub total_bytes: u64,
+ pub latency: Duration,
+}
+
+#[derive(Debug, Clone)]
+pub struct BenchmarkConsumerConfig {
+ pub consumer_id: u32,
+ pub consumer_group_id: Option<u32>,
+ pub stream_id: u32,
+ pub messages_per_batch: BenchmarkNumericParameter,
+ pub warmup_time: IggyDuration,
+ pub polling_kind: PollingKind,
+ pub origin_timestamp_latency_calculation: bool,
+}
+
+pub trait BenchmarkConsumerBackend {
+ type Consumer;
+
+ async fn setup(&self) -> Result<Self::Consumer, IggyError>;
+ async fn warmup(&self, consumer: &mut Self::Consumer) -> Result<(),
IggyError>;
+ async fn consume_batch(
+ &self,
+ consumer: &mut Self::Consumer,
+ ) -> Result<Option<ConsumedBatch>, IggyError>;
+ fn log_setup_info(&self);
+ fn log_warmup_info(&self);
+}
diff --git a/core/bench/src/actors/producer.rs
b/core/bench/src/actors/consumer/benchmark_consumer.rs
similarity index 52%
copy from core/bench/src/actors/producer.rs
copy to core/bench/src/actors/consumer/benchmark_consumer.rs
index c32191ab..b739a00d 100644
--- a/core/bench/src/actors/producer.rs
+++ b/core/bench/src/actors/consumer/benchmark_consumer.rs
@@ -16,9 +16,12 @@
* under the License.
*/
+use super::backend::{
+ BenchmarkConsumerBackend, BenchmarkConsumerConfig, ConsumerBackend,
HighLevelBackend,
+ LowLevelBackend,
+};
use crate::analytics::metrics::individual::from_records;
use crate::analytics::record::BenchmarkRecord;
-use crate::utils::batch_generator::BenchmarkBatchGenerator;
use crate::utils::finish_condition::BenchmarkFinishCondition;
use crate::utils::rate_limiter::BenchmarkRateLimiter;
use bench_report::actor_kind::ActorKind;
@@ -27,144 +30,143 @@ use
bench_report::individual_metrics::BenchmarkIndividualMetrics;
use bench_report::numeric_parameter::BenchmarkNumericParameter;
use human_repr::HumanCount;
use iggy::prelude::*;
-use integration::test_server::{ClientFactory, login_root};
+use integration::test_server::ClientFactory;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
use tracing::info;
-pub struct BenchmarkProducer {
- client_factory: Arc<dyn ClientFactory>,
+pub struct BenchmarkConsumer {
+ backend: ConsumerBackend,
benchmark_kind: BenchmarkKind,
- producer_id: u32,
- stream_id: u32,
- partitions: u32,
- messages_per_batch: BenchmarkNumericParameter,
- message_size: BenchmarkNumericParameter,
finish_condition: Arc<BenchmarkFinishCondition>,
- warmup_time: IggyDuration,
sampling_time: IggyDuration,
moving_average_window: u32,
limit_bytes_per_second: Option<IggyByteSize>,
+ config: BenchmarkConsumerConfig,
}
-impl BenchmarkProducer {
+impl BenchmarkConsumer {
#[allow(clippy::too_many_arguments)]
pub fn new(
client_factory: Arc<dyn ClientFactory>,
benchmark_kind: BenchmarkKind,
- producer_id: u32,
+ consumer_id: u32,
+ consumer_group_id: Option<u32>,
stream_id: u32,
- partitions: u32,
messages_per_batch: BenchmarkNumericParameter,
- message_size: BenchmarkNumericParameter,
finish_condition: Arc<BenchmarkFinishCondition>,
warmup_time: IggyDuration,
sampling_time: IggyDuration,
moving_average_window: u32,
+ polling_kind: PollingKind,
limit_bytes_per_second: Option<IggyByteSize>,
+ origin_timestamp_latency_calculation: bool,
+ use_high_level_api: bool,
) -> Self {
- Self {
- client_factory,
- benchmark_kind,
- producer_id,
+ let config = BenchmarkConsumerConfig {
+ consumer_id,
+ consumer_group_id,
stream_id,
- partitions,
messages_per_batch,
- message_size,
- finish_condition,
warmup_time,
+ polling_kind,
+ origin_timestamp_latency_calculation,
+ };
+
+ let backend = if use_high_level_api {
+ ConsumerBackend::HighLevel(HighLevelBackend::new(client_factory,
config.clone()))
+ } else {
+ ConsumerBackend::LowLevel(LowLevelBackend::new(client_factory,
config.clone()))
+ };
+
+ Self {
+ backend,
+ benchmark_kind,
+ finish_condition,
sampling_time,
moving_average_window,
limit_bytes_per_second,
+ config,
}
}
pub async fn run(self) -> Result<BenchmarkIndividualMetrics, IggyError> {
- let mut batch_generator =
- BenchmarkBatchGenerator::new(self.message_size,
self.messages_per_batch);
- let rate_limiter =
self.limit_bytes_per_second.map(BenchmarkRateLimiter::new);
-
- let topic_id: u32 = 1;
- let default_partition_id: u32 = 1;
- let partitions = self.partitions;
- let client = self.client_factory.create_client().await;
- let client = IggyClient::create(client, None, None);
- login_root(&client).await;
-
- let stream_id = self.stream_id.try_into()?;
- let topic_id = topic_id.try_into()?;
- let partitioning = match partitions {
- 0 => panic!("Partition count must be greater than 0"),
- 1 => Partitioning::partition_id(default_partition_id),
- 2.. => Partitioning::balanced(),
- };
-
- // -----------------------
- // WARM-UP
- // -----------------------
-
- if self.warmup_time.get_duration() != Duration::from_millis(0) {
- info!(
- "Producer #{} → warming up for {}...",
- self.producer_id, self.warmup_time
- );
- let warmup_end = Instant::now() + self.warmup_time.get_duration();
-
- while Instant::now() < warmup_end {
- let batch = batch_generator.generate_batch();
- client
- .send_messages(&stream_id, &topic_id, &partitioning, &mut
batch.messages)
- .await?;
+ match self.backend {
+ ConsumerBackend::LowLevel(backend) => {
+ Self::run_with_backend(
+ self.benchmark_kind,
+ self.finish_condition,
+ self.sampling_time,
+ self.moving_average_window,
+ self.limit_bytes_per_second,
+ self.config,
+ backend,
+ )
+ .await
+ }
+ ConsumerBackend::HighLevel(backend) => {
+ Self::run_with_backend(
+ self.benchmark_kind,
+ self.finish_condition,
+ self.sampling_time,
+ self.moving_average_window,
+ self.limit_bytes_per_second,
+ self.config,
+ backend,
+ )
+ .await
}
}
- // -----------------------
- // MAIN BENCHMARK
- // -----------------------
+ }
- info!(
- "Producer #{} → sending {} in batches of {} messages to stream {}
with {} partitions, partitioning: {}, rate limit: {:?}...",
- self.producer_id,
- self.finish_condition.total_str(),
- self.messages_per_batch,
- stream_id,
- partitions,
- partitioning,
- self.limit_bytes_per_second
- );
+ async fn run_with_backend<B: BenchmarkConsumerBackend>(
+ benchmark_kind: BenchmarkKind,
+ finish_condition: Arc<BenchmarkFinishCondition>,
+ sampling_time: IggyDuration,
+ moving_average_window: u32,
+ limit_bytes_per_second: Option<IggyByteSize>,
+ config: BenchmarkConsumerConfig,
+ backend: B,
+ ) -> Result<BenchmarkIndividualMetrics, IggyError> {
+ let mut consumer = backend.setup().await?;
+
+ if config.warmup_time.get_duration() != Duration::from_millis(0) {
+ backend.log_warmup_info();
+ backend.warmup(&mut consumer).await?;
+ }
+
+ backend.log_setup_info();
- let max_capacity = self.finish_condition.max_capacity();
- let mut records: Vec<BenchmarkRecord> =
Vec::with_capacity(max_capacity);
+ let max_capacity = finish_condition.max_capacity();
+ let mut records = Vec::with_capacity(max_capacity);
let mut messages_processed = 0;
let mut batches_processed = 0;
- let mut total_bytes_processed = 0;
+ let mut bytes_processed = 0;
let mut user_data_bytes_processed = 0;
let start_timestamp = Instant::now();
+ let rate_limiter =
limit_bytes_per_second.map(BenchmarkRateLimiter::new);
- loop {
- if self.finish_condition.is_done() {
- break;
- }
- let batch = batch_generator.generate_batch();
- let before_send = Instant::now();
- client
- .send_messages(&stream_id, &topic_id, &partitioning, &mut
batch.messages)
- .await?;
- let latency = before_send.elapsed();
-
- messages_processed += batch.messages.len() as u64;
+ while !finish_condition.is_done() {
+ let batch_opt = backend.consume_batch(&mut consumer).await?;
+
+ let Some(batch) = batch_opt else {
+ continue;
+ };
+
+ messages_processed += u64::from(batch.messages);
batches_processed += 1;
user_data_bytes_processed += batch.user_data_bytes;
- total_bytes_processed += batch.total_bytes;
+ bytes_processed += batch.total_bytes;
records.push(BenchmarkRecord {
elapsed_time_us:
u64::try_from(start_timestamp.elapsed().as_micros())
.unwrap_or(u64::MAX),
- latency_us:
u64::try_from(latency.as_micros()).unwrap_or(u64::MAX),
+ latency_us:
u64::try_from(batch.latency.as_micros()).unwrap_or(u64::MAX),
messages: messages_processed,
message_batches: batches_processed,
user_data_bytes: user_data_bytes_processed,
- total_bytes: total_bytes_processed,
+ total_bytes: bytes_processed,
});
if let Some(rate_limiter) = &rate_limiter {
@@ -173,46 +175,43 @@ impl BenchmarkProducer {
.await;
}
- if self
- .finish_condition
- .account_and_check(batch.user_data_bytes)
- {
+ if finish_condition.account_and_check(batch.user_data_bytes) {
break;
}
}
let metrics = from_records(
&records,
- self.benchmark_kind,
- ActorKind::Producer,
- self.producer_id,
- self.sampling_time,
- self.moving_average_window,
+ benchmark_kind,
+ ActorKind::Consumer,
+ config.consumer_id,
+ sampling_time,
+ moving_average_window,
);
Self::log_statistics(
- self.producer_id,
+ config.consumer_id,
messages_processed,
- batches_processed,
- &self.messages_per_batch,
+ u32::try_from(batches_processed).unwrap_or(u32::MAX),
+ &config.messages_per_batch,
&metrics,
);
Ok(metrics)
}
- fn log_statistics(
- producer_id: u32,
+ pub fn log_statistics(
+ consumer_id: u32,
total_messages: u64,
- message_batches: u64,
+ message_batches: u32,
messages_per_batch: &BenchmarkNumericParameter,
metrics: &BenchmarkIndividualMetrics,
) {
info!(
- "Producer #{} → sent {} messages in {} batches of {} messages in
{:.2} s, total size: {}, average throughput: {:.2} MB/s, \
- p50 latency: {:.2} ms, p90 latency: {:.2} ms, p95 latency: {:.2} ms, p99
latency: {:.2} ms, p999 latency: {:.2} ms, p9999 latency: {:.2} ms, \
- average latency: {:.2} ms, median latency: {:.2} ms, min latency: {:.2}
ms, max latency: {:.2} ms, std dev latency: {:.2} ms",
- producer_id,
+ "Consumer #{} → polled {} messages, {} batches of {} messages in
{:.2} s, total size: {}, average throughput: {:.2} MB/s, \
+ p50 latency: {:.2} ms, p90 latency: {:.2} ms, p95 latency: {:.2} ms, p99
latency: {:.2} ms, p999 latency: {:.2} ms, \
+ p9999 latency: {:.2} ms, average latency: {:.2} ms, median latency: {:.2}
ms, min latency: {:.2} ms, max latency: {:.2} ms, std dev latency: {:.2} ms",
+ consumer_id,
total_messages.human_count_bare(),
message_batches.human_count_bare(),
messages_per_batch,
diff --git a/core/bench/src/actors/consumer/high_level_backend.rs
b/core/bench/src/actors/consumer/high_level_backend.rs
new file mode 100644
index 00000000..b20b319c
--- /dev/null
+++ b/core/bench/src/actors/consumer/high_level_backend.rs
@@ -0,0 +1,179 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::backend::{BenchmarkConsumerBackend, ConsumedBatch,
HighLevelBackend};
+use futures_util::StreamExt;
+use iggy::prelude::*;
+use integration::test_server::login_root;
+use std::time::Duration;
+use tokio::time::{Instant, timeout};
+use tracing::{debug, error, info, warn};
+
+impl BenchmarkConsumerBackend for HighLevelBackend {
+ type Consumer = IggyConsumer;
+
+ async fn setup(&self) -> Result<Self::Consumer, IggyError> {
+ let topic_id: u32 = 1;
+ let client = self.client_factory.create_client().await;
+ let client = IggyClient::create(client, None, None);
+ login_root(&client).await;
+
+ let stream_id_str = self.config.stream_id.to_string();
+ let topic_id_str = topic_id.to_string();
+
+ let mut iggy_consumer = if let Some(cg_id) =
self.config.consumer_group_id {
+ let consumer_group_name = format!("cg_{cg_id}");
+ // Consumer groups use auto-commit (matching PollingKind::Next behavior from low-level API)
+ client
+ .consumer_group(&consumer_group_name, &stream_id_str,
&topic_id_str)?
+ .batch_size(self.config.messages_per_batch.get())
+ .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages))
+ .create_consumer_group_if_not_exists()
+ .auto_join_consumer_group()
+ .build()
+ } else {
+ // TODO(hubcio): as of now, there is no way to mimic the behavior of
+ // PollingKind::Offset, because high level API doesn't provide method
+ // to commit local offset manually, only auto-commit on server.
+ client
+ .consumer(
+ &format!("hl_consumer_{}", self.config.consumer_id),
+ &stream_id_str,
+ &topic_id_str,
+ 1,
+ )?
+ .batch_size(self.config.messages_per_batch.get())
+ .auto_commit(AutoCommit::Disabled)
+ .build()
+ };
+
+ iggy_consumer.init().await?;
+ Ok(iggy_consumer)
+ }
+
+ async fn warmup(&self, consumer: &mut Self::Consumer) -> Result<(),
IggyError> {
+ let warmup_end = Instant::now() +
self.config.warmup_time.get_duration();
+ while Instant::now() < warmup_end {
+ if let Some(message) = consumer.next().await {
+ if message.is_err() {
+ break;
+ }
+ }
+ }
+ Ok(())
+ }
+
+ async fn consume_batch(
+ &self,
+ consumer: &mut Self::Consumer,
+ ) -> Result<Option<ConsumedBatch>, IggyError> {
+ let batch_start = Instant::now();
+ let mut batch_messages = 0;
+ let mut batch_user_bytes = 0;
+ let mut batch_total_bytes = 0;
+
+ while batch_messages < self.config.messages_per_batch.get() {
+ // Use timeout to avoid getting stuck waiting for messages
+ let timeout_result = timeout(Duration::from_secs(1),
consumer.next()).await;
+
+ match timeout_result {
+ Ok(Some(message_result)) => match message_result {
+ Ok(received_message) => {
+ batch_messages += 1;
+ batch_user_bytes +=
received_message.message.payload.len() as u64;
+ batch_total_bytes +=
+
received_message.message.get_size_bytes().as_bytes_u64();
+
+ let offset = received_message.message.header.offset;
+ if batch_messages >=
self.config.messages_per_batch.get() {
+ info!(
+ "Batch of {} messages consumed, last_offset:
{}, current_offset: {}",
+ batch_messages,
+ received_message.message.header.offset,
+ received_message.current_offset
+ );
+
+ if let Err(error) = consumer.store_offset(offset,
None).await {
+ error!("Failed to store offset: {offset}.
{error}");
+ continue;
+ }
+ debug!("Offset: {offset} stored successfully");
+ break;
+ }
+ }
+ Err(err) => {
+ warn!("Error receiving message: {}", err);
+ }
+ },
+ Ok(None) => {
+ debug!("Consumer stream ended during batching");
+ break;
+ }
+ Err(_) => {
+ debug!(
+ "Timeout waiting for messages, stopping batch at {}
messages",
+ batch_messages
+ );
+ break;
+ }
+ }
+ }
+
+ if batch_messages == 0 {
+ Ok(None)
+ } else {
+ Ok(Some(ConsumedBatch {
+ messages: batch_messages,
+ user_data_bytes: batch_user_bytes,
+ total_bytes: batch_total_bytes,
+ latency: batch_start.elapsed(),
+ }))
+ }
+ }
+
+ fn log_setup_info(&self) {
+ if let Some(cg_id) = self.config.consumer_group_id {
+ info!(
+ "Consumer #{}, part of consumer group #{} → polling in {}
messages per batch from stream {}, using high-level API...",
+ self.config.consumer_id,
+ cg_id,
+ self.config.messages_per_batch,
+ self.config.stream_id,
+ );
+ } else {
+ info!(
+ "Consumer #{} → polling in {} messages per batch from stream
{}, using high-level API...",
+ self.config.consumer_id, self.config.messages_per_batch,
self.config.stream_id,
+ );
+ }
+ }
+
+ fn log_warmup_info(&self) {
+ if let Some(cg_id) = self.config.consumer_group_id {
+ info!(
+ "Consumer #{}, part of consumer group #{}, → warming up for
{}...",
+ self.config.consumer_id, cg_id, self.config.warmup_time
+ );
+ } else {
+ info!(
+ "Consumer #{} → warming up for {}...",
+ self.config.consumer_id, self.config.warmup_time
+ );
+ }
+ }
+}
diff --git a/core/bench/src/actors/consumer/low_level_backend.rs
b/core/bench/src/actors/consumer/low_level_backend.rs
new file mode 100644
index 00000000..bcdb3f3e
--- /dev/null
+++ b/core/bench/src/actors/consumer/low_level_backend.rs
@@ -0,0 +1,201 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::backend::{BenchmarkConsumerBackend, ConsumedBatch, LowLevelBackend};
+use crate::benchmarks::common::create_consumer;
+use crate::utils::{batch_total_size_bytes, batch_user_size_bytes};
+use iggy::prelude::*;
+use integration::test_server::login_root;
+use std::time::Duration;
+use tokio::time::Instant;
+use tracing::{info, warn};
+
+/// Low-level benchmark consumer backend: every batch is fetched with an
+/// explicit `poll_messages` call on the client.
+impl BenchmarkConsumerBackend for LowLevelBackend {
+    /// State carried between calls, in order: the client, the stream
+    /// identifier, the topic identifier, the partition to poll (`None` when a
+    /// consumer group is configured), the consumer returned by
+    /// `create_consumer`, and the number of messages consumed so far.
+    type Consumer = (
+        IggyClient,
+        Identifier,
+        Identifier,
+        Option<u32>,
+        Consumer,
+        u64,
+    );
+
+    /// Creates a client, calls `login_root` on it and builds the consumer state.
+    ///
+    /// The topic ID is fixed to 1. Partition 1 is polled unless a consumer
+    /// group is configured, in which case no partition is specified. The
+    /// consumed-message counter starts at 0.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the stream ID or topic ID cannot be converted into an
+    /// identifier.
+    async fn setup(&self) -> Result<Self::Consumer, IggyError> {
+        let topic_id: u32 = 1;
+        let default_partition_id: u32 = 1;
+        let client = self.client_factory.create_client().await;
+        let client = IggyClient::create(client, None, None);
+        login_root(&client).await;
+
+        let stream_id = self.config.stream_id.try_into().unwrap();
+        let topic_id = topic_id.try_into().unwrap();
+        let partition_id = if self.config.consumer_group_id.is_some() {
+            None
+        } else {
+            Some(default_partition_id)
+        };
+        let cg_id = self.config.consumer_group_id;
+        let consumer = create_consumer(
+            &client,
+            cg_id.as_ref(),
+            &stream_id,
+            &topic_id,
+            self.config.consumer_id,
+        )
+        .await;
+
+        Ok((client, stream_id, topic_id, partition_id, consumer, 0))
+    }
+
+    /// Polls batches until `config.warmup_time` has elapsed.
+    ///
+    /// Nothing is measured here and the consumed-message counter stored in the
+    /// state is only read, never updated; progress is tracked in a local
+    /// counter that is discarded on return.
+    ///
+    /// # Errors
+    ///
+    /// Returns the first error reported by `poll_messages`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the configured polling kind is neither `Offset` nor `Next`.
+    async fn warmup(&self, consumer: &mut Self::Consumer) -> Result<(), IggyError> {
+        let (client, stream_id, topic_id, partition_id, consumer_obj, messages_processed) =
+            consumer;
+        let warmup_end = Instant::now() + self.config.warmup_time.get_duration();
+        let mut warmup_messages_processed = 0;
+
+        while Instant::now() < warmup_end {
+            let messages_to_receive = self.config.messages_per_batch.get();
+            let offset = *messages_processed + warmup_messages_processed;
+            let (strategy, auto_commit) = match self.config.polling_kind {
+                PollingKind::Offset => (PollingStrategy::offset(offset), false),
+                PollingKind::Next => (PollingStrategy::next(), true),
+                _ => panic!(
+                    "Unsupported polling kind for benchmark: {:?}",
+                    self.config.polling_kind
+                ),
+            };
+            let polled_messages = client
+                .poll_messages(
+                    stream_id,
+                    topic_id,
+                    *partition_id,
+                    consumer_obj,
+                    &strategy,
+                    messages_to_receive,
+                    auto_commit,
+                )
+                .await?;
+
+            if polled_messages.messages.is_empty() {
+                warn!(
+                    "Consumer #{} → Messages are empty for offset: {}, retrying...",
+                    self.config.consumer_id, offset
+                );
+                continue;
+            }
+            warmup_messages_processed += polled_messages.messages.len() as u64;
+        }
+        Ok(())
+    }
+
+    /// Polls one batch and reports its size and latency.
+    ///
+    /// Returns `Ok(None)` when the poll fails with `TopicIdNotFound` or when
+    /// the polled batch is empty. Otherwise the consumed-message counter in
+    /// the state is advanced by the batch length and a `ConsumedBatch` is
+    /// returned.
+    ///
+    /// Latency is either the time elapsed since just before the poll, or, when
+    /// `origin_timestamp_latency_calculation` is set, the difference between
+    /// the current timestamp and the origin timestamp of the first message.
+    ///
+    /// # Errors
+    ///
+    /// Returns any `poll_messages` error other than `TopicIdNotFound`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the configured polling kind is neither `Offset` nor `Next`,
+    /// or if the batch length does not fit in a `u32`.
+    async fn consume_batch(
+        &self,
+        consumer: &mut Self::Consumer,
+    ) -> Result<Option<ConsumedBatch>, IggyError> {
+        let (client, stream_id, topic_id, partition_id, consumer_obj, messages_processed) =
+            consumer;
+        let messages_to_receive = self.config.messages_per_batch.get();
+        let offset = *messages_processed;
+        let (strategy, auto_commit) = match self.config.polling_kind {
+            PollingKind::Offset => (PollingStrategy::offset(offset), false),
+            PollingKind::Next => (PollingStrategy::next(), true),
+            _ => panic!(
+                "Unsupported polling kind for benchmark: {:?}",
+                self.config.polling_kind
+            ),
+        };
+
+        let before_poll = Instant::now();
+        let polled_messages = match client
+            .poll_messages(
+                stream_id,
+                topic_id,
+                *partition_id,
+                consumer_obj,
+                &strategy,
+                messages_to_receive,
+                auto_commit,
+            )
+            .await
+        {
+            Ok(messages) => messages,
+            // A missing topic is reported as "nothing to consume", not as a failure.
+            Err(IggyError::TopicIdNotFound(_, _)) => return Ok(None),
+            Err(error) => return Err(error),
+        };
+
+        if polled_messages.messages.is_empty() {
+            return Ok(None);
+        }
+        let latency = if self.config.origin_timestamp_latency_calculation {
+            let now = IggyTimestamp::now().as_micros();
+            // The origin timestamp may be ahead of the local clock (clock
+            // skew between machines); clamp to zero instead of underflowing.
+            Duration::from_micros(
+                now.saturating_sub(polled_messages.messages[0].header.origin_timestamp),
+            )
+        } else {
+            before_poll.elapsed()
+        };
+
+        let batch_user_size_bytes = batch_user_size_bytes(&polled_messages);
+        let batch_total_size_bytes = batch_total_size_bytes(&polled_messages);
+
+        *messages_processed += polled_messages.messages.len() as u64;
+
+        Ok(Some(ConsumedBatch {
+            messages: u32::try_from(polled_messages.messages.len()).unwrap(),
+            user_data_bytes: batch_user_size_bytes,
+            total_bytes: batch_total_size_bytes,
+            latency,
+        }))
+    }
+
+    /// Logs a one-line summary of this consumer's configuration at `info`
+    /// level, including the consumer group ID when one is configured.
+    fn log_setup_info(&self) {
+        // Format strings stay on a single line so that no line break ends up
+        // inside the emitted log message.
+        if let Some(cg_id) = self.config.consumer_group_id {
+            info!(
+                "Consumer #{}, part of consumer group #{} → polling in {} messages per batch from stream {}, using low-level API...",
+                self.config.consumer_id,
+                cg_id,
+                self.config.messages_per_batch,
+                self.config.stream_id,
+            );
+        } else {
+            info!(
+                "Consumer #{} → polling in {} messages per batch from stream {}, using low-level API...",
+                self.config.consumer_id, self.config.messages_per_batch, self.config.stream_id,
+            );
+        }
+    }
+
+    /// Logs, at `info` level, that this consumer is entering its warm-up phase
+    /// and for how long (`config.warmup_time`).
+    fn log_warmup_info(&self) {
+        if let Some(cg_id) = self.config.consumer_group_id {
+            info!(
+                "Consumer #{}, part of consumer group #{}, → warming up for {}...",
+                self.config.consumer_id, cg_id, self.config.warmup_time
+            );
+        } else {
+            info!(
+                "Consumer #{} → warming up for {}...",
+                self.config.consumer_id, self.config.warmup_time
+            );
+        }
+    }
+}
diff --git a/core/bench/src/actors/consumer/mod.rs
b/core/bench/src/actors/consumer/mod.rs
new file mode 100644
index 00000000..2912806b
--- /dev/null
+++ b/core/bench/src/actors/consumer/mod.rs
@@ -0,0 +1,24 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+pub mod backend;
+pub mod benchmark_consumer;
+pub mod high_level_backend;
+pub mod low_level_backend;
+
+pub use benchmark_consumer::BenchmarkConsumer;
diff --git a/core/bench/src/actors/producer.rs
b/core/bench/src/actors/producer/benchmark_producer.rs
similarity index 100%
rename from core/bench/src/actors/producer.rs
rename to core/bench/src/actors/producer/benchmark_producer.rs
diff --git a/core/bench/src/actors/producer/mod.rs
b/core/bench/src/actors/producer/mod.rs
new file mode 100644
index 00000000..d4f6209f
--- /dev/null
+++ b/core/bench/src/actors/producer/mod.rs
@@ -0,0 +1,21 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+pub mod benchmark_producer;
+
+pub use benchmark_producer::BenchmarkProducer;
diff --git a/core/bench/src/actors/producing_consumer.rs
b/core/bench/src/actors/producing_consumer/benchmark_producing_consumer.rs
similarity index 100%
rename from core/bench/src/actors/producing_consumer.rs
rename to
core/bench/src/actors/producing_consumer/benchmark_producing_consumer.rs
diff --git a/core/bench/src/actors/producing_consumer/mod.rs
b/core/bench/src/actors/producing_consumer/mod.rs
new file mode 100644
index 00000000..2927c8d9
--- /dev/null
+++ b/core/bench/src/actors/producing_consumer/mod.rs
@@ -0,0 +1,21 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+pub mod benchmark_producing_consumer;
+
+pub use benchmark_producing_consumer::BenchmarkProducingConsumer;
diff --git a/core/bench/src/args/common.rs b/core/bench/src/args/common.rs
index 1ca93f0c..7d475aa0 100644
--- a/core/bench/src/args/common.rs
+++ b/core/bench/src/args/common.rs
@@ -41,6 +41,7 @@ use std::str::FromStr;
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
+#[allow(clippy::struct_excessive_bools)]
pub struct IggyBenchArgs {
/// Benchmark kind
#[command(subcommand)]
@@ -103,6 +104,10 @@ pub struct IggyBenchArgs {
/// Only applicable to local benchmarks.
#[arg(long, short = 'k', default_value_t = DEFAULT_SKIP_SERVER_START,
verbatim_doc_comment)]
pub skip_server_start: bool,
+
+ /// Use high-level API for actors
+ #[arg(long, short = 'H', default_value_t = false)]
+ pub high_level_api: bool,
}
fn validate_server_executable_path(v: &str) -> Result<String, String> {
@@ -182,6 +187,15 @@ impl IggyBenchArgs {
}
}
+ if self.high_level_api && !self.messages_per_batch.is_fixed() {
+ Self::command()
+ .error(
+ ErrorKind::ArgumentConflict,
+ "High-level consumer API (--high-level-api) requires fixed
batch size, but random batch size was specified. Use a single value instead of
a range for --messages-per-batch.",
+ )
+ .exit();
+ }
+
self.benchmark_kind.inner().validate();
}
@@ -330,6 +344,10 @@ impl IggyBenchArgs {
self.benchmark_kind.inner().max_topic_size()
}
+ /// Returns the value of the `high_level_api` flag of these arguments.
+ pub const fn high_level_api(&self) -> bool {
+ self.high_level_api
+ }
+
/// Generates the output directory name based on benchmark parameters.
pub fn generate_dir_name(&self) -> String {
let benchmark_kind = match &self.benchmark_kind {
diff --git a/core/bench/src/benchmarks/common.rs
b/core/bench/src/benchmarks/common.rs
index 22c9ad22..d030841c 100644
--- a/core/bench/src/benchmarks/common.rs
+++ b/core/bench/src/benchmarks/common.rs
@@ -190,6 +190,7 @@ pub fn build_consumer_futures(
let global_finish_condition =
BenchmarkFinishCondition::new(args,
BenchmarkFinishConditionMode::Shared);
let rate_limit = rate_limit_per_actor(args.rate_limit(), actors);
+ let use_high_level_api = args.high_level_api();
(1..=consumers)
.map(|consumer_id| {
@@ -225,6 +226,7 @@ pub fn build_consumer_futures(
polling_kind,
rate_limit,
origin_timestamp_latency_calculation,
+ use_high_level_api,
);
consumer.run().await
}
diff --git a/core/sdk/src/clients/consumer.rs b/core/sdk/src/clients/consumer.rs
index 795f4edc..b2dd5d20 100644
--- a/core/sdk/src/clients/consumer.rs
+++ b/core/sdk/src/clients/consumer.rs
@@ -251,6 +251,13 @@ impl IggyConsumer {
.await
}
+    /// Returns the last consumed offset tracked for `partition_id`, or `None`
+    /// when no offset is tracked for that partition.
+    ///
+    /// Use `partition_id()` to obtain the current partition ID.
+    pub fn get_last_consumed_offset(&self, partition_id: u32) -> Option<u64> {
+        self.last_consumed_offsets
+            .get(&partition_id)
+            .map(|entry| entry.load(ORDERING))
+    }
+
/// Deletes the consumer offset on the server either for the current
partition or the provided partition ID.
pub async fn delete_offset(&self, partition_id: Option<u32>) -> Result<(),
IggyError> {
let client = self.client.read().await;
@@ -264,6 +271,13 @@ impl IggyConsumer {
.await
}
+    /// Returns the last stored offset (on the server) tracked for
+    /// `partition_id`, or `None` when no offset is tracked for that partition.
+    ///
+    /// Use `partition_id()` to obtain the current partition ID.
+    pub fn get_last_stored_offset(&self, partition_id: u32) -> Option<u64> {
+        self.last_stored_offsets
+            .get(&partition_id)
+            .map(|entry| entry.load(ORDERING))
+    }
+
/// Initializes the consumer by subscribing to diagnostic events,
initializing the consumer group if needed, storing the offsets in the
background etc.
///
/// Note: This method must be called before polling messages.
@@ -805,9 +819,19 @@ impl IggyConsumer {
info!(
"Creating consumer group: {consumer_group_id} for topic:
{topic_id}, stream: {stream_id}"
);
- client
+ match client
.create_consumer_group(&stream_id, &topic_id, &name, id)
- .await?;
+ .await
+ {
+ Ok(_) => {}
+ Err(IggyError::ConsumerGroupNameAlreadyExists(_, _)) => {}
+ Err(error) => {
+ error!(
+ "Failed to create consumer group {consumer_group_id}
for topic: {topic_id}, stream: {stream_id}: {error}"
+ );
+ return Err(error);
+ }
+ }
}
info!(