This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch high-level-bench-consumer in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 922bae95aa1295e3ac3a7fed9bab6c4aad6b6441 Author: Hubert Gruszecki <[email protected]> AuthorDate: Sat Jun 7 16:30:14 2025 +0200 high level consumer api in bench --- Cargo.lock | 1 + core/bench/Cargo.toml | 1 + core/bench/src/actors/consumer.rs | 445 --------------------- core/bench/src/actors/consumer/backend.rs | 84 ++++ .../benchmark_consumer.rs} | 209 +++++----- .../src/actors/consumer/high_level_backend.rs | 153 +++++++ .../bench/src/actors/consumer/low_level_backend.rs | 211 ++++++++++ core/bench/src/actors/consumer/mod.rs | 24 ++ .../benchmark_producer.rs} | 0 core/bench/src/actors/producer/mod.rs | 21 + .../benchmark_producing_consumer.rs} | 0 core/bench/src/actors/producing_consumer/mod.rs | 21 + core/bench/src/args/common.rs | 18 + core/bench/src/benchmarks/common.rs | 2 + 14 files changed, 640 insertions(+), 550 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1f410423..6be9aa34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -870,6 +870,7 @@ dependencies = [ "chrono", "clap", "figlet-rs", + "futures-util", "governor", "hostname", "human-repr", diff --git a/core/bench/Cargo.toml b/core/bench/Cargo.toml index ab5ec18d..fa87b2d7 100644 --- a/core/bench/Cargo.toml +++ b/core/bench/Cargo.toml @@ -31,6 +31,7 @@ charming = { workspace = true } chrono = { workspace = true } clap = { workspace = true } figlet-rs = { workspace = true } +futures-util = { workspace = true } governor = "0.10.0" hostname = "0.4.1" human-repr = { workspace = true } diff --git a/core/bench/src/actors/consumer.rs b/core/bench/src/actors/consumer.rs deleted file mode 100644 index 8ea71282..00000000 --- a/core/bench/src/actors/consumer.rs +++ /dev/null @@ -1,445 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -use crate::analytics::metrics::individual::from_records; -use crate::analytics::record::BenchmarkRecord; -use crate::benchmarks::common::create_consumer; -use crate::utils::finish_condition::BenchmarkFinishCondition; -use crate::utils::rate_limiter::BenchmarkRateLimiter; -use crate::utils::{batch_total_size_bytes, batch_user_size_bytes}; -use bench_report::actor_kind::ActorKind; -use bench_report::benchmark_kind::BenchmarkKind; -use bench_report::individual_metrics::BenchmarkIndividualMetrics; -use bench_report::numeric_parameter::BenchmarkNumericParameter; -use human_repr::HumanCount; -use iggy::prelude::*; -use integration::test_server::{ClientFactory, login_root}; -use std::sync::Arc; -use std::time::Duration; -use tokio::time::Instant; -use tracing::{error, info, warn}; - -pub struct BenchmarkConsumer { - client_factory: Arc<dyn ClientFactory>, - benchmark_kind: BenchmarkKind, - consumer_id: u32, - consumer_group_id: Option<u32>, - stream_id: u32, - messages_per_batch: BenchmarkNumericParameter, - finish_condition: Arc<BenchmarkFinishCondition>, - warmup_time: IggyDuration, - sampling_time: IggyDuration, - moving_average_window: u32, - polling_kind: PollingKind, - limit_bytes_per_second: Option<IggyByteSize>, - origin_timestamp_latency_calculation: bool, -} - -impl BenchmarkConsumer { - #[allow(clippy::too_many_arguments)] - pub fn new( - client_factory: Arc<dyn 
ClientFactory>, - benchmark_kind: BenchmarkKind, - consumer_id: u32, - consumer_group_id: Option<u32>, - stream_id: u32, - messages_per_batch: BenchmarkNumericParameter, - finish_condition: Arc<BenchmarkFinishCondition>, - warmup_time: IggyDuration, - sampling_time: IggyDuration, - moving_average_window: u32, - polling_kind: PollingKind, - limit_bytes_per_second: Option<IggyByteSize>, - origin_timestamp_latency_calculation: bool, - ) -> Self { - Self { - client_factory, - benchmark_kind, - consumer_id, - consumer_group_id, - stream_id, - messages_per_batch, - finish_condition, - warmup_time, - sampling_time, - moving_average_window, - polling_kind, - limit_bytes_per_second, - origin_timestamp_latency_calculation, - } - } - - pub async fn run(self) -> Result<BenchmarkIndividualMetrics, IggyError> { - let (client, stream_id, topic_id, partition_id, consumer, rate_limiter) = - self.setup_client().await; - - if self.warmup_time.get_duration() != Duration::from_millis(0) { - self.run_warmup( - &client, - &stream_id, - &topic_id, - partition_id, - &consumer, - rate_limiter.as_ref(), - ) - .await?; - } - - let metrics = self - .run_benchmark( - &client, - &stream_id, - &topic_id, - partition_id, - &consumer, - rate_limiter.as_ref(), - ) - .await?; - Ok(metrics) - } - - async fn setup_client( - &self, - ) -> ( - IggyClient, - Identifier, - Identifier, - Option<u32>, - Consumer, - Option<BenchmarkRateLimiter>, - ) { - let topic_id: u32 = 1; - let default_partition_id: u32 = 1; - let client = self.client_factory.create_client().await; - let client = IggyClient::create(client, None, None); - login_root(&client).await; - - let stream_id = self.stream_id.try_into().unwrap(); - let topic_id = topic_id.try_into().unwrap(); - let partition_id = if self.consumer_group_id.is_some() { - None - } else { - Some(default_partition_id) - }; - let cg_id = self.consumer_group_id; - let consumer = create_consumer( - &client, - cg_id.as_ref(), - &stream_id, - &topic_id, - self.consumer_id, - 
) - .await; - let rate_limiter = self.limit_bytes_per_second.map(BenchmarkRateLimiter::new); - - ( - client, - stream_id, - topic_id, - partition_id, - consumer, - rate_limiter, - ) - } - - async fn run_warmup( - &self, - client: &IggyClient, - stream_id: &Identifier, - topic_id: &Identifier, - partition_id: Option<u32>, - consumer: &Consumer, - rate_limiter: Option<&BenchmarkRateLimiter>, - ) -> Result<(), IggyError> { - if let Some(cg_id) = self.consumer_group_id { - info!( - "Consumer #{}, part of consumer group #{}, → warming up for {}...", - self.consumer_id, cg_id, self.warmup_time - ); - } else { - info!( - "Consumer #{} → warming up for {}...", - self.consumer_id, self.warmup_time - ); - } - - let warmup_end = Instant::now() + self.warmup_time.get_duration(); - let mut messages_processed = 0; - let mut last_batch_user_size_bytes = 0; - while Instant::now() < warmup_end { - if let Some(rate_limiter) = rate_limiter { - if last_batch_user_size_bytes > 0 { - rate_limiter - .wait_until_necessary(last_batch_user_size_bytes) - .await; - } - } - let messages_to_receive = self.messages_per_batch.get(); - let offset = messages_processed; - let (strategy, auto_commit) = match self.polling_kind { - PollingKind::Offset => (PollingStrategy::offset(offset), false), - PollingKind::Next => (PollingStrategy::next(), true), - _ => panic!( - "Unsupported polling kind for benchmark: {:?}", - self.polling_kind - ), - }; - let polled_messages = client - .poll_messages( - stream_id, - topic_id, - partition_id, - consumer, - &strategy, - messages_to_receive, - auto_commit, - ) - .await?; - - if polled_messages.messages.is_empty() { - warn!( - "Consumer #{} → Messages are empty for offset: {}, retrying...", - self.consumer_id, offset - ); - continue; - } - messages_processed = polled_messages.messages.len() as u64; - last_batch_user_size_bytes = batch_user_size_bytes(&polled_messages); - } - Ok(()) - } - - #[allow(clippy::too_many_lines)] - async fn run_benchmark( - &self, - client: 
&IggyClient, - stream_id: &Identifier, - topic_id: &Identifier, - partition_id: Option<u32>, - consumer: &Consumer, - rate_limiter: Option<&BenchmarkRateLimiter>, - ) -> Result<BenchmarkIndividualMetrics, IggyError> { - if let Some(cg_id) = self.consumer_group_id { - info!( - "Consumer #{}, part of consumer group #{} → polling {} in {} messages per batch from stream {}, rate limit: {:?}...", - self.consumer_id, - cg_id, - self.finish_condition.total_str(), - self.messages_per_batch, - stream_id, - self.limit_bytes_per_second - ); - } else { - info!( - "Consumer #{} → polling {} in {} messages per batch from stream {}, rate limit: {:?}...", - self.consumer_id, - self.finish_condition.total_str(), - self.messages_per_batch, - stream_id, - self.limit_bytes_per_second - ); - } - - let max_capacity = self.finish_condition.max_capacity(); - let mut records = Vec::with_capacity(max_capacity); - let mut skipped_warnings_count: u32 = 0; - let mut topic_not_found_counter = 0; - let mut initial_poll_timestamp: Option<Instant> = None; - let mut last_warning_time: Option<Instant> = None; - let mut messages_processed = 0; - let mut batches_processed = 0; - let mut bytes_processed = 0; - let mut user_data_bytes_processed = 0; - let start_timestamp = Instant::now(); - - loop { - if self.finish_condition.is_done() { - break; - } - - let messages_to_receive = self.messages_per_batch.get(); - let offset = messages_processed; - let (strategy, auto_commit) = match self.polling_kind { - PollingKind::Offset => (PollingStrategy::offset(offset), false), - PollingKind::Next => (PollingStrategy::next(), true), - _ => panic!( - "Unsupported polling kind for benchmark: {:?}", - self.polling_kind - ), - }; - let before_poll = Instant::now(); - let polled_messages = client - .poll_messages( - stream_id, - topic_id, - partition_id, - consumer, - &strategy, - messages_to_receive, - auto_commit, - ) - .await; - if let Err(e) = polled_messages { - if matches!(e, IggyError::TopicIdNotFound(_, _)) { - 
topic_not_found_counter += 1; - if topic_not_found_counter > 1000 { - return Err(e); - } - } else { - return Err(e); - } - error!("Unexpected error: {:?}", e); - continue; - } - - let polled_messages = polled_messages.unwrap(); - if polled_messages.messages.is_empty() { - if initial_poll_timestamp.is_none() { - initial_poll_timestamp = Some(before_poll); - } - let should_warn = - last_warning_time.is_none_or(|t| t.elapsed() >= Duration::from_secs(1)); - - if should_warn { - warn!( - "Consumer #{} → Messages are empty for offset: {}, received {}, retrying... ({} warnings skipped)", - self.consumer_id, - offset, - self.finish_condition.status(), - skipped_warnings_count - ); - last_warning_time = Some(Instant::now()); - skipped_warnings_count = 0; - } else { - skipped_warnings_count += 1; - } - continue; - } - - if polled_messages.messages.len() != messages_to_receive as usize { - let should_warn = - last_warning_time.is_none_or(|t| t.elapsed() >= Duration::from_secs(1)); - - if should_warn { - warn!( - "Consumer #{} → expected {} messages, but got {} messages (received {}), retrying... 
({} warnings skipped)", - self.consumer_id, - messages_to_receive, - polled_messages.messages.len(), - self.finish_condition.status(), - skipped_warnings_count - ); - last_warning_time = Some(Instant::now()); - skipped_warnings_count = 0; - } else { - skipped_warnings_count += 1; - } - } - let latency = if self.origin_timestamp_latency_calculation { - let now = IggyTimestamp::now().as_micros(); - Duration::from_micros(now - polled_messages.messages[0].header.origin_timestamp) - } else { - initial_poll_timestamp.unwrap_or(before_poll).elapsed() - }; - - let batch_user_size_bytes = batch_user_size_bytes(&polled_messages); - let batch_total_size_bytes = batch_total_size_bytes(&polled_messages); - - initial_poll_timestamp = None; - - messages_processed += polled_messages.messages.len() as u64; - batches_processed += 1; - bytes_processed += batch_total_size_bytes; - user_data_bytes_processed += batch_user_size_bytes; - - records.push(BenchmarkRecord { - elapsed_time_us: u64::try_from(start_timestamp.elapsed().as_micros()) - .unwrap_or(u64::MAX), - latency_us: u64::try_from(latency.as_micros()).unwrap_or(u64::MAX), - messages: messages_processed, - message_batches: batches_processed, - user_data_bytes: user_data_bytes_processed, - total_bytes: bytes_processed, - }); - - if let Some(rate_limiter) = rate_limiter { - rate_limiter - .wait_until_necessary(batch_user_size_bytes) - .await; - } - - if self - .finish_condition - .account_and_check(batch_user_size_bytes) - { - break; - } - } - - let metrics = from_records( - &records, - self.benchmark_kind, - ActorKind::Consumer, - self.consumer_id, - self.sampling_time, - self.moving_average_window, - ); - - Self::log_statistics( - self.consumer_id, - messages_processed, - u32::try_from(batches_processed).unwrap_or(u32::MAX), - &self.messages_per_batch, - &metrics, - ); - - Ok(metrics) - } - - pub fn log_statistics( - consumer_id: u32, - total_messages: u64, - message_batches: u32, - messages_per_batch: 
&BenchmarkNumericParameter, - metrics: &BenchmarkIndividualMetrics, - ) { - info!( - "Consumer #{} → polled {} messages, {} batches of {} messages in {:.2} s, total size: {}, average throughput: {:.2} MB/s, \ - p50 latency: {:.2} ms, p90 latency: {:.2} ms, p95 latency: {:.2} ms, p99 latency: {:.2} ms, p999 latency: {:.2} ms, \ - p9999 latency: {:.2} ms, average latency: {:.2} ms, median latency: {:.2} ms, min latency: {:.2} ms, max latency: {:.2} ms, std dev latency: {:.2} ms", - consumer_id, - total_messages.human_count_bare(), - message_batches.human_count_bare(), - messages_per_batch, - metrics.summary.total_time_secs, - IggyByteSize::from(metrics.summary.total_user_data_bytes), - metrics.summary.throughput_megabytes_per_second, - metrics.summary.p50_latency_ms, - metrics.summary.p90_latency_ms, - metrics.summary.p95_latency_ms, - metrics.summary.p99_latency_ms, - metrics.summary.p999_latency_ms, - metrics.summary.p9999_latency_ms, - metrics.summary.avg_latency_ms, - metrics.summary.median_latency_ms, - metrics.summary.min_latency_ms, - metrics.summary.max_latency_ms, - metrics.summary.std_dev_latency_ms, - ); - } -} diff --git a/core/bench/src/actors/consumer/backend.rs b/core/bench/src/actors/consumer/backend.rs new file mode 100644 index 00000000..4932f7f9 --- /dev/null +++ b/core/bench/src/actors/consumer/backend.rs @@ -0,0 +1,84 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use bench_report::numeric_parameter::BenchmarkNumericParameter; +use iggy::prelude::*; +use integration::test_server::ClientFactory; +use std::marker::PhantomData; +use std::sync::Arc; +use std::time::Duration; + +#[derive(Clone)] +pub struct ConsumerBackendImpl<T> { + pub client_factory: Arc<dyn ClientFactory>, + pub config: BenchmarkConsumerConfig, + _phantom: PhantomData<T>, +} + +impl<T> ConsumerBackendImpl<T> { + pub fn new(client_factory: Arc<dyn ClientFactory>, config: BenchmarkConsumerConfig) -> Self { + Self { + client_factory, + config, + _phantom: PhantomData, + } + } +} + +pub struct LowLevelApiMarker; +pub struct HighLevelApiMarker; + +pub type LowLevelBackend = ConsumerBackendImpl<LowLevelApiMarker>; +pub type HighLevelBackend = ConsumerBackendImpl<HighLevelApiMarker>; + +pub enum ConsumerBackend { + LowLevel(LowLevelBackend), + HighLevel(HighLevelBackend), +} + +#[derive(Debug, Clone)] +pub struct ConsumedBatch { + pub messages: u32, + pub user_data_bytes: u64, + pub total_bytes: u64, + pub latency: Duration, +} + +#[derive(Debug, Clone)] +pub struct BenchmarkConsumerConfig { + pub consumer_id: u32, + pub consumer_group_id: Option<u32>, + pub stream_id: u32, + pub messages_per_batch: BenchmarkNumericParameter, + pub warmup_time: IggyDuration, + pub polling_kind: PollingKind, + pub origin_timestamp_latency_calculation: bool, +} + +pub trait BenchmarkConsumerBackend { + type Consumer; + + async fn setup(&self) -> Result<Self::Consumer, IggyError>; + async fn warmup(&self, consumer: &mut Self::Consumer) -> Result<(), IggyError>; 
+ async fn consume_batch( + &self, + consumer: &mut Self::Consumer, + ) -> Result<ConsumedBatch, IggyError>; + fn log_setup_info(&self); + fn log_warmup_info(&self); +} diff --git a/core/bench/src/actors/producer.rs b/core/bench/src/actors/consumer/benchmark_consumer.rs similarity index 52% copy from core/bench/src/actors/producer.rs copy to core/bench/src/actors/consumer/benchmark_consumer.rs index c32191ab..ba1d9e1e 100644 --- a/core/bench/src/actors/producer.rs +++ b/core/bench/src/actors/consumer/benchmark_consumer.rs @@ -16,9 +16,12 @@ * under the License. */ +use super::backend::{ + BenchmarkConsumerBackend, BenchmarkConsumerConfig, ConsumerBackend, HighLevelBackend, + LowLevelBackend, +}; use crate::analytics::metrics::individual::from_records; use crate::analytics::record::BenchmarkRecord; -use crate::utils::batch_generator::BenchmarkBatchGenerator; use crate::utils::finish_condition::BenchmarkFinishCondition; use crate::utils::rate_limiter::BenchmarkRateLimiter; use bench_report::actor_kind::ActorKind; @@ -27,144 +30,143 @@ use bench_report::individual_metrics::BenchmarkIndividualMetrics; use bench_report::numeric_parameter::BenchmarkNumericParameter; use human_repr::HumanCount; use iggy::prelude::*; -use integration::test_server::{ClientFactory, login_root}; +use integration::test_server::ClientFactory; use std::sync::Arc; use std::time::Duration; use tokio::time::Instant; use tracing::info; -pub struct BenchmarkProducer { - client_factory: Arc<dyn ClientFactory>, +pub struct BenchmarkConsumer { + backend: ConsumerBackend, benchmark_kind: BenchmarkKind, - producer_id: u32, - stream_id: u32, - partitions: u32, - messages_per_batch: BenchmarkNumericParameter, - message_size: BenchmarkNumericParameter, finish_condition: Arc<BenchmarkFinishCondition>, - warmup_time: IggyDuration, sampling_time: IggyDuration, moving_average_window: u32, limit_bytes_per_second: Option<IggyByteSize>, + config: BenchmarkConsumerConfig, } -impl BenchmarkProducer { +impl 
BenchmarkConsumer { #[allow(clippy::too_many_arguments)] pub fn new( client_factory: Arc<dyn ClientFactory>, benchmark_kind: BenchmarkKind, - producer_id: u32, + consumer_id: u32, + consumer_group_id: Option<u32>, stream_id: u32, - partitions: u32, messages_per_batch: BenchmarkNumericParameter, - message_size: BenchmarkNumericParameter, finish_condition: Arc<BenchmarkFinishCondition>, warmup_time: IggyDuration, sampling_time: IggyDuration, moving_average_window: u32, + polling_kind: PollingKind, limit_bytes_per_second: Option<IggyByteSize>, + origin_timestamp_latency_calculation: bool, + use_high_level_api: bool, ) -> Self { - Self { - client_factory, - benchmark_kind, - producer_id, + let config = BenchmarkConsumerConfig { + consumer_id, + consumer_group_id, stream_id, - partitions, messages_per_batch, - message_size, - finish_condition, warmup_time, + polling_kind, + origin_timestamp_latency_calculation, + }; + + let backend = if use_high_level_api { + ConsumerBackend::HighLevel(HighLevelBackend::new(client_factory, config.clone())) + } else { + ConsumerBackend::LowLevel(LowLevelBackend::new(client_factory, config.clone())) + }; + + Self { + backend, + benchmark_kind, + finish_condition, sampling_time, moving_average_window, limit_bytes_per_second, + config, } } pub async fn run(self) -> Result<BenchmarkIndividualMetrics, IggyError> { - let mut batch_generator = - BenchmarkBatchGenerator::new(self.message_size, self.messages_per_batch); - let rate_limiter = self.limit_bytes_per_second.map(BenchmarkRateLimiter::new); - - let topic_id: u32 = 1; - let default_partition_id: u32 = 1; - let partitions = self.partitions; - let client = self.client_factory.create_client().await; - let client = IggyClient::create(client, None, None); - login_root(&client).await; - - let stream_id = self.stream_id.try_into()?; - let topic_id = topic_id.try_into()?; - let partitioning = match partitions { - 0 => panic!("Partition count must be greater than 0"), - 1 => 
Partitioning::partition_id(default_partition_id), - 2.. => Partitioning::balanced(), - }; - - // ----------------------- - // WARM-UP - // ----------------------- - - if self.warmup_time.get_duration() != Duration::from_millis(0) { - info!( - "Producer #{} → warming up for {}...", - self.producer_id, self.warmup_time - ); - let warmup_end = Instant::now() + self.warmup_time.get_duration(); - - while Instant::now() < warmup_end { - let batch = batch_generator.generate_batch(); - client - .send_messages(&stream_id, &topic_id, &partitioning, &mut batch.messages) - .await?; + match self.backend { + ConsumerBackend::LowLevel(backend) => { + Self::run_with_backend( + self.benchmark_kind, + self.finish_condition, + self.sampling_time, + self.moving_average_window, + self.limit_bytes_per_second, + self.config, + backend, + ) + .await + } + ConsumerBackend::HighLevel(backend) => { + Self::run_with_backend( + self.benchmark_kind, + self.finish_condition, + self.sampling_time, + self.moving_average_window, + self.limit_bytes_per_second, + self.config, + backend, + ) + .await } } - // ----------------------- - // MAIN BENCHMARK - // ----------------------- + } - info!( - "Producer #{} → sending {} in batches of {} messages to stream {} with {} partitions, partitioning: {}, rate limit: {:?}...", - self.producer_id, - self.finish_condition.total_str(), - self.messages_per_batch, - stream_id, - partitions, - partitioning, - self.limit_bytes_per_second - ); + async fn run_with_backend<B: BenchmarkConsumerBackend>( + benchmark_kind: BenchmarkKind, + finish_condition: Arc<BenchmarkFinishCondition>, + sampling_time: IggyDuration, + moving_average_window: u32, + limit_bytes_per_second: Option<IggyByteSize>, + config: BenchmarkConsumerConfig, + backend: B, + ) -> Result<BenchmarkIndividualMetrics, IggyError> { + let mut consumer = backend.setup().await?; + + if config.warmup_time.get_duration() != Duration::from_millis(0) { + backend.log_warmup_info(); + backend.warmup(&mut 
consumer).await?; + } - let max_capacity = self.finish_condition.max_capacity(); - let mut records: Vec<BenchmarkRecord> = Vec::with_capacity(max_capacity); + backend.log_setup_info(); + + let max_capacity = finish_condition.max_capacity(); + let mut records = Vec::with_capacity(max_capacity); let mut messages_processed = 0; let mut batches_processed = 0; - let mut total_bytes_processed = 0; + let mut bytes_processed = 0; let mut user_data_bytes_processed = 0; let start_timestamp = Instant::now(); + let rate_limiter = limit_bytes_per_second.map(BenchmarkRateLimiter::new); - loop { - if self.finish_condition.is_done() { - break; + while !finish_condition.is_done() { + let batch = backend.consume_batch(&mut consumer).await?; + + if batch.messages == 0 { + continue; } - let batch = batch_generator.generate_batch(); - let before_send = Instant::now(); - client - .send_messages(&stream_id, &topic_id, &partitioning, &mut batch.messages) - .await?; - let latency = before_send.elapsed(); - - messages_processed += batch.messages.len() as u64; + + messages_processed += u64::from(batch.messages); batches_processed += 1; user_data_bytes_processed += batch.user_data_bytes; - total_bytes_processed += batch.total_bytes; + bytes_processed += batch.total_bytes; records.push(BenchmarkRecord { elapsed_time_us: u64::try_from(start_timestamp.elapsed().as_micros()) .unwrap_or(u64::MAX), - latency_us: u64::try_from(latency.as_micros()).unwrap_or(u64::MAX), + latency_us: u64::try_from(batch.latency.as_micros()).unwrap_or(u64::MAX), messages: messages_processed, message_batches: batches_processed, user_data_bytes: user_data_bytes_processed, - total_bytes: total_bytes_processed, + total_bytes: bytes_processed, }); if let Some(rate_limiter) = &rate_limiter { @@ -173,46 +175,43 @@ impl BenchmarkProducer { .await; } - if self - .finish_condition - .account_and_check(batch.user_data_bytes) - { + if finish_condition.account_and_check(batch.user_data_bytes) { break; } } let metrics = 
from_records( &records, - self.benchmark_kind, - ActorKind::Producer, - self.producer_id, - self.sampling_time, - self.moving_average_window, + benchmark_kind, + ActorKind::Consumer, + config.consumer_id, + sampling_time, + moving_average_window, ); Self::log_statistics( - self.producer_id, + config.consumer_id, messages_processed, - batches_processed, - &self.messages_per_batch, + u32::try_from(batches_processed).unwrap_or(u32::MAX), + &config.messages_per_batch, &metrics, ); Ok(metrics) } - fn log_statistics( - producer_id: u32, + pub fn log_statistics( + consumer_id: u32, total_messages: u64, - message_batches: u64, + message_batches: u32, messages_per_batch: &BenchmarkNumericParameter, metrics: &BenchmarkIndividualMetrics, ) { info!( - "Producer #{} → sent {} messages in {} batches of {} messages in {:.2} s, total size: {}, average throughput: {:.2} MB/s, \ - p50 latency: {:.2} ms, p90 latency: {:.2} ms, p95 latency: {:.2} ms, p99 latency: {:.2} ms, p999 latency: {:.2} ms, p9999 latency: {:.2} ms, \ - average latency: {:.2} ms, median latency: {:.2} ms, min latency: {:.2} ms, max latency: {:.2} ms, std dev latency: {:.2} ms", - producer_id, + "Consumer #{} → polled {} messages, {} batches of {} messages in {:.2} s, total size: {}, average throughput: {:.2} MB/s, \ + p50 latency: {:.2} ms, p90 latency: {:.2} ms, p95 latency: {:.2} ms, p99 latency: {:.2} ms, p999 latency: {:.2} ms, \ + p9999 latency: {:.2} ms, average latency: {:.2} ms, median latency: {:.2} ms, min latency: {:.2} ms, max latency: {:.2} ms, std dev latency: {:.2} ms", + consumer_id, total_messages.human_count_bare(), message_batches.human_count_bare(), messages_per_batch, diff --git a/core/bench/src/actors/consumer/high_level_backend.rs b/core/bench/src/actors/consumer/high_level_backend.rs new file mode 100644 index 00000000..824caf1b --- /dev/null +++ b/core/bench/src/actors/consumer/high_level_backend.rs @@ -0,0 +1,153 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use super::backend::{BenchmarkConsumerBackend, ConsumedBatch, HighLevelBackend}; +use futures_util::StreamExt; +use iggy::prelude::*; +use integration::test_server::login_root; +use tokio::time::Instant; +use tracing::{info, warn}; + +impl BenchmarkConsumerBackend for HighLevelBackend { + type Consumer = IggyConsumer; + + async fn setup(&self) -> Result<Self::Consumer, IggyError> { + let topic_id: u32 = 1; + let client = self.client_factory.create_client().await; + let client = IggyClient::create(client, None, None); + login_root(&client).await; + + let stream_id_str = self.config.stream_id.to_string(); + let topic_id_str = topic_id.to_string(); + + let mut iggy_consumer = if let Some(cg_id) = self.config.consumer_group_id { + let consumer_group_name = format!("bench_cg_{cg_id}"); + // Consumer groups use auto-commit (matching PollingKind::Next behavior from low-level API) + client + .consumer_group(&consumer_group_name, &stream_id_str, &topic_id_str)? 
+ .batch_size(self.config.messages_per_batch.get()) + .auto_commit(AutoCommit::When(AutoCommitWhen::PollingMessages)) + .auto_join_consumer_group() + .build() + } else { + // TODO(hubcio): as of now, there is no way to mimic the behavior of + // PollingKind::Offset, because high level API doesn't provide method + // to commit local offset manually, only auto-commit on server. + client + .consumer( + &format!("bench_consumer_{}", self.config.consumer_id), + &stream_id_str, + &topic_id_str, + 1, + )? + .batch_size(self.config.messages_per_batch.get()) + .auto_commit(AutoCommit::Disabled) // TODO@spetz + .build() + }; + + iggy_consumer.init().await?; + Ok(iggy_consumer) + } + + async fn warmup(&self, consumer: &mut Self::Consumer) -> Result<(), IggyError> { + let warmup_end = Instant::now() + self.config.warmup_time.get_duration(); + while Instant::now() < warmup_end { + if let Some(message) = consumer.next().await { + if message.is_err() { + break; + } + } + } + Ok(()) + } + + async fn consume_batch( + &self, + consumer: &mut Self::Consumer, + ) -> Result<ConsumedBatch, IggyError> { + let batch_start = Instant::now(); + let mut batch_messages = 0; + let mut batch_user_bytes = 0; + let mut batch_total_bytes = 0; + + while batch_messages <= self.config.messages_per_batch.get() { + tracing::warn!("Consuming batch of {} messages", batch_messages); + if let Some(message_result) = consumer.next().await { + match message_result { + Ok(received_message) => { + batch_messages += 1; + batch_user_bytes += received_message.message.payload.len() as u64; + batch_total_bytes += + received_message.message.get_size_bytes().as_bytes_u64(); + + if batch_messages >= self.config.messages_per_batch.get() { + info!( + "Batch of {} messages consumed, last_offset: {}", + batch_messages, received_message.message.header.offset + ); + break; + } + } + Err(err) => { + warn!("Error receiving message: {}", err); + } + } + } else { + break; + } + } + + Ok(ConsumedBatch { + messages: batch_messages, 
+ user_data_bytes: batch_user_bytes, + total_bytes: batch_total_bytes, + latency: batch_start.elapsed(), + }) + } + + fn log_setup_info(&self) { + if let Some(cg_id) = self.config.consumer_group_id { + info!( + "Consumer #{}, part of consumer group #{} → polling in {} messages per batch from stream {}, using high-level API...", + self.config.consumer_id, + cg_id, + self.config.messages_per_batch, + self.config.stream_id, + ); + } else { + info!( + "Consumer #{} → polling in {} messages per batch from stream {}, using high-level API...", + self.config.consumer_id, self.config.messages_per_batch, self.config.stream_id, + ); + } + } + + fn log_warmup_info(&self) { + if let Some(cg_id) = self.config.consumer_group_id { + info!( + "Consumer #{}, part of consumer group #{}, → warming up for {}...", + self.config.consumer_id, cg_id, self.config.warmup_time + ); + } else { + info!( + "Consumer #{} → warming up for {}...", + self.config.consumer_id, self.config.warmup_time + ); + } + } +} diff --git a/core/bench/src/actors/consumer/low_level_backend.rs b/core/bench/src/actors/consumer/low_level_backend.rs new file mode 100644 index 00000000..71f11b5a --- /dev/null +++ b/core/bench/src/actors/consumer/low_level_backend.rs @@ -0,0 +1,211 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::backend::{BenchmarkConsumerBackend, ConsumedBatch, LowLevelBackend};
+use crate::benchmarks::common::create_consumer;
+use crate::utils::{batch_total_size_bytes, batch_user_size_bytes};
+use iggy::prelude::*;
+use integration::test_server::login_root;
+use std::time::Duration;
+use tokio::time::Instant;
+use tracing::{info, warn};
+
+/// Benchmark consumer backend that polls through `IggyClient::poll_messages`
+/// directly (the "low-level" API).
+impl BenchmarkConsumerBackend for LowLevelBackend {
+    /// Per-consumer polling state, in order: connected client, stream id,
+    /// topic id, partition id (`None` when a consumer group is configured),
+    /// consumer identity, and the number of messages consumed so far in the
+    /// measured phase (used as the next offset for `PollingKind::Offset`).
+    type Consumer = (
+        IggyClient,
+        Identifier,
+        Identifier,
+        Option<u32>,
+        Consumer,
+        u64,
+    );
+
+    /// Creates a client, logs in as root and builds the consumer identity.
+    ///
+    /// Topic 1 is always used. Partition 1 is used unless a consumer group is
+    /// configured, in which case no partition is passed to `poll_messages`
+    /// (presumably the group assignment decides — confirm).
+    async fn setup(&self) -> Result<Self::Consumer, IggyError> {
+        let topic_id: u32 = 1;
+        let default_partition_id: u32 = 1;
+        let client = self.client_factory.create_client().await;
+        let client = IggyClient::create(client, None, None);
+        login_root(&client).await;
+
+        let stream_id = self.config.stream_id.try_into().unwrap();
+        let topic_id = topic_id.try_into().unwrap();
+        let partition_id = if self.config.consumer_group_id.is_some() {
+            None
+        } else {
+            Some(default_partition_id)
+        };
+        let cg_id = self.config.consumer_group_id;
+        let consumer = create_consumer(
+            &client,
+            cg_id.as_ref(),
+            &stream_id,
+            &topic_id,
+            self.config.consumer_id,
+        )
+        .await;
+
+        Ok((client, stream_id, topic_id, partition_id, consumer, 0))
+    }
+
+    /// Polls batches until `warmup_time` has elapsed, discarding the results.
+    ///
+    /// Warmup progress is tracked in a local counter and does NOT advance
+    /// `messages_processed`. With `PollingKind::Offset`, the measured phase
+    /// therefore starts from the same offset the warmup started from.
+    async fn warmup(&self, consumer: &mut Self::Consumer) -> Result<(), IggyError> {
+        let (client, stream_id, topic_id, partition_id, consumer_obj, messages_processed) =
+            consumer;
+        let warmup_end = Instant::now() + self.config.warmup_time.get_duration();
+        let mut warmup_messages_processed = 0;
+
+        while Instant::now() < warmup_end {
+            let messages_to_receive = self.config.messages_per_batch.get();
+            let offset = *messages_processed + warmup_messages_processed;
+            let (strategy, auto_commit) = polling_strategy_for(&self.config.polling_kind, offset);
+            let polled_messages = client
+                .poll_messages(
+                    stream_id,
+                    topic_id,
+                    *partition_id,
+                    consumer_obj,
+                    &strategy,
+                    messages_to_receive,
+                    auto_commit,
+                )
+                .await?;
+
+            if polled_messages.messages.is_empty() {
+                warn!(
+                    "Consumer #{} → Messages are empty for offset: {}, retrying...",
+                    self.config.consumer_id, offset
+                );
+                continue;
+            }
+            warmup_messages_processed += polled_messages.messages.len() as u64;
+        }
+        Ok(())
+    }
+
+    /// Performs one poll of up to `messages_per_batch` messages and reports
+    /// its size and latency.
+    ///
+    /// Latency handling:
+    /// - When `origin_timestamp_latency_calculation` is set, latency is
+    ///   measured from the first message's origin timestamp.
+    /// - Otherwise it is the time spent in the `poll_messages` call.
+    ///
+    /// An empty poll, or a `TopicIdNotFound` error, yields an empty batch
+    /// instead of an error. Any other error is propagated.
+    async fn consume_batch(
+        &self,
+        consumer: &mut Self::Consumer,
+    ) -> Result<ConsumedBatch, IggyError> {
+        let (client, stream_id, topic_id, partition_id, consumer_obj, messages_processed) =
+            consumer;
+        let messages_to_receive = self.config.messages_per_batch.get();
+        let offset = *messages_processed;
+        let (strategy, auto_commit) = polling_strategy_for(&self.config.polling_kind, offset);
+
+        let before_poll = Instant::now();
+        let polled_messages = match client
+            .poll_messages(
+                stream_id,
+                topic_id,
+                *partition_id,
+                consumer_obj,
+                &strategy,
+                messages_to_receive,
+                auto_commit,
+            )
+            .await
+        {
+            Ok(messages) => messages,
+            // NOTE(review): presumably the topic may not exist yet while the
+            // benchmark is still setting up — treated as "nothing to read".
+            Err(IggyError::TopicIdNotFound(_, _)) => {
+                return Ok(empty_batch(before_poll.elapsed()));
+            }
+            Err(e) => return Err(e),
+        };
+
+        if polled_messages.messages.is_empty() {
+            return Ok(empty_batch(before_poll.elapsed()));
+        }
+
+        let latency = if self.config.origin_timestamp_latency_calculation {
+            let now = IggyTimestamp::now().as_micros();
+            let origin = polled_messages.messages[0].header.origin_timestamp;
+            // The origin timestamp was taken on the producer's clock, which may
+            // be ahead of ours. Plain subtraction would underflow: it panics
+            // with overflow checks enabled and wraps to an absurd latency
+            // otherwise. Saturate to zero instead.
+            Duration::from_micros(now.saturating_sub(origin))
+        } else {
+            before_poll.elapsed()
+        };
+
+        let user_data_bytes = batch_user_size_bytes(&polled_messages);
+        let total_bytes = batch_total_size_bytes(&polled_messages);
+        let batch_len = polled_messages.messages.len();
+
+        *messages_processed += batch_len as u64;
+
+        Ok(ConsumedBatch {
+            messages: u32::try_from(batch_len).expect("polled batch length does not fit in u32"),
+            user_data_bytes,
+            total_bytes,
+            latency,
+        })
+    }
+
+    fn log_setup_info(&self) {
+        if let Some(cg_id) = self.config.consumer_group_id {
+            info!(
+                "Consumer #{}, part of consumer group #{} → polling in {} messages per batch from stream {}, using low-level API...",
+                self.config.consumer_id,
+                cg_id,
+                self.config.messages_per_batch,
+                self.config.stream_id,
+            );
+        } else {
+            info!(
+                "Consumer #{} → polling in {} messages per batch from stream {}, using low-level API...",
+                self.config.consumer_id, self.config.messages_per_batch, self.config.stream_id,
+            );
+        }
+    }
+
+    fn log_warmup_info(&self) {
+        if let Some(cg_id) = self.config.consumer_group_id {
+            info!(
+                "Consumer #{}, part of consumer group #{}, → warming up for {}...",
+                self.config.consumer_id, cg_id, self.config.warmup_time
+            );
+        } else {
+            info!(
+                "Consumer #{} → warming up for {}...",
+                self.config.consumer_id, self.config.warmup_time
+            );
+        }
+    }
+}
+
+/// Maps the configured polling kind to a `PollingStrategy` for `offset`, plus
+/// the `auto_commit` flag passed to `poll_messages`.
+///
+/// - `Offset` polls an explicit offset with `auto_commit = false`.
+/// - `Next` uses `PollingStrategy::next()` with `auto_commit = true`.
+/// - Any other kind is unsupported by the benchmark and panics.
+fn polling_strategy_for(kind: &PollingKind, offset: u64) -> (PollingStrategy, bool) {
+    match kind {
+        PollingKind::Offset => (PollingStrategy::offset(offset), false),
+        PollingKind::Next => (PollingStrategy::next(), true),
+        _ => panic!("Unsupported polling kind for benchmark: {kind:?}"),
+    }
+}
+
+/// Batch result with no messages, carrying only the measured latency.
+fn empty_batch(latency: Duration) -> ConsumedBatch {
+    ConsumedBatch {
+        messages: 0,
+        user_data_bytes: 0,
+        total_bytes: 0,
+        latency,
+    }
+}
diff --git a/core/bench/src/actors/consumer/mod.rs b/core/bench/src/actors/consumer/mod.rs
new file mode 100644
index 00000000..2912806b
--- /dev/null
+++ b/core/bench/src/actors/consumer/mod.rs
@@ -0,0 +1,24 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +pub mod backend; +pub mod benchmark_consumer; +pub mod high_level_backend; +pub mod low_level_backend; + +pub use benchmark_consumer::BenchmarkConsumer; diff --git a/core/bench/src/actors/producer.rs b/core/bench/src/actors/producer/benchmark_producer.rs similarity index 100% rename from core/bench/src/actors/producer.rs rename to core/bench/src/actors/producer/benchmark_producer.rs diff --git a/core/bench/src/actors/producer/mod.rs b/core/bench/src/actors/producer/mod.rs new file mode 100644 index 00000000..d4f6209f --- /dev/null +++ b/core/bench/src/actors/producer/mod.rs @@ -0,0 +1,21 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +pub mod benchmark_producer; + +pub use benchmark_producer::BenchmarkProducer; diff --git a/core/bench/src/actors/producing_consumer.rs b/core/bench/src/actors/producing_consumer/benchmark_producing_consumer.rs similarity index 100% rename from core/bench/src/actors/producing_consumer.rs rename to core/bench/src/actors/producing_consumer/benchmark_producing_consumer.rs diff --git a/core/bench/src/actors/producing_consumer/mod.rs b/core/bench/src/actors/producing_consumer/mod.rs new file mode 100644 index 00000000..2927c8d9 --- /dev/null +++ b/core/bench/src/actors/producing_consumer/mod.rs @@ -0,0 +1,21 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +pub mod benchmark_producing_consumer; + +pub use benchmark_producing_consumer::BenchmarkProducingConsumer; diff --git a/core/bench/src/args/common.rs b/core/bench/src/args/common.rs index 1ca93f0c..7d475aa0 100644 --- a/core/bench/src/args/common.rs +++ b/core/bench/src/args/common.rs @@ -41,6 +41,7 @@ use std::str::FromStr; #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] +#[allow(clippy::struct_excessive_bools)] pub struct IggyBenchArgs { /// Benchmark kind #[command(subcommand)] @@ -103,6 +104,10 @@ pub struct IggyBenchArgs { /// Only applicable to local benchmarks. #[arg(long, short = 'k', default_value_t = DEFAULT_SKIP_SERVER_START, verbatim_doc_comment)] pub skip_server_start: bool, + + /// Use high-level API for actors + #[arg(long, short = 'H', default_value_t = false)] + pub high_level_api: bool, } fn validate_server_executable_path(v: &str) -> Result<String, String> { @@ -182,6 +187,15 @@ impl IggyBenchArgs { } } + if self.high_level_api && !self.messages_per_batch.is_fixed() { + Self::command() + .error( + ErrorKind::ArgumentConflict, + "High-level consumer API (--high-level-api) requires fixed batch size, but random batch size was specified. Use a single value instead of a range for --messages-per-batch.", + ) + .exit(); + } + self.benchmark_kind.inner().validate(); } @@ -330,6 +344,10 @@ impl IggyBenchArgs { self.benchmark_kind.inner().max_topic_size() } + pub const fn high_level_api(&self) -> bool { + self.high_level_api + } + /// Generates the output directory name based on benchmark parameters. 
pub fn generate_dir_name(&self) -> String { let benchmark_kind = match &self.benchmark_kind { diff --git a/core/bench/src/benchmarks/common.rs b/core/bench/src/benchmarks/common.rs index 22c9ad22..d030841c 100644 --- a/core/bench/src/benchmarks/common.rs +++ b/core/bench/src/benchmarks/common.rs @@ -190,6 +190,7 @@ pub fn build_consumer_futures( let global_finish_condition = BenchmarkFinishCondition::new(args, BenchmarkFinishConditionMode::Shared); let rate_limit = rate_limit_per_actor(args.rate_limit(), actors); + let use_high_level_api = args.high_level_api(); (1..=consumers) .map(|consumer_id| { @@ -225,6 +226,7 @@ pub fn build_consumer_futures( polling_kind, rate_limit, origin_timestamp_latency_calculation, + use_high_level_api, ); consumer.run().await }
