This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch fix-e2e-latency-calc in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 553453c10f0837c2688338e9e26d65911b876528 Author: Hubert Gruszecki <[email protected]> AuthorDate: Wed Dec 10 15:39:48 2025 +0100 fix(bench): add configurable read_amplification and fix E2E latency - Add --read-amplification (-R) CLI argument for ppc and bpcg benchmarks to configure consumer rate limit multiplier (default: 1.05 = 5%) - Enable origin_timestamp latency calculation for PinnedProducerAndConsumer, EndToEndProducingConsumer, and EndToEndProducingConsumerGroup benchmarks - Fix consumer group ID in e2ecg benchmark (use start_consumer_group_id instead of calculated ID that didn't match server-assigned IDs) - Remove stale TODO comments and simplify match statements to hardcoded values where only one benchmark type calls the function --- core/bench/src/args/common.rs | 4 ++ core/bench/src/args/kind.rs | 4 ++ .../kinds/balanced/producer_and_consumer_group.rs | 10 +++++ .../src/args/kinds/pinned/producer_and_consumer.rs | 10 +++++ core/bench/src/args/props.rs | 7 ++++ core/bench/src/benchmarks/common.rs | 45 +++++++++++----------- 6 files changed, 58 insertions(+), 22 deletions(-) diff --git a/core/bench/src/args/common.rs b/core/bench/src/args/common.rs index 26491232b..c3a6f2bd4 100644 --- a/core/bench/src/args/common.rs +++ b/core/bench/src/args/common.rs @@ -334,6 +334,10 @@ impl IggyBenchArgs { self.benchmark_kind.inner().max_topic_size() } + pub fn read_amplification(&self) -> Option<f32> { + self.benchmark_kind.inner().read_amplification() + } + pub const fn high_level_api(&self) -> bool { self.high_level_api } diff --git a/core/bench/src/args/kind.rs b/core/bench/src/args/kind.rs index 9ffa1acf2..7cbdfb7b8 100644 --- a/core/bench/src/args/kind.rs +++ b/core/bench/src/args/kind.rs @@ -149,6 +149,10 @@ impl BenchmarkKindProps for BenchmarkKindCommand { self.inner().max_topic_size() } + fn read_amplification(&self) -> Option<f32> { + self.inner().read_amplification() + } + fn inner(&self) -> &dyn BenchmarkKindProps { match self { Self::PinnedProducer(args) => args, diff --git a/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs b/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs index 126336159..a18c869e8 100644 --- a/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs +++ b/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs @@ -59,6 +59,12 @@ pub struct BalancedProducerAndConsumerGroupArgs { /// Max topic size in human readable format, e.g. "1GiB", "2MiB", "1GiB". If not provided then the server default will be used. #[arg(long, short = 'T')] pub max_topic_size: Option<IggyByteSize>, + + /// Consumer rate limit multiplier relative to producer rate. + /// When measuring E2E latency, consumers may need higher throughput to prevent queue buildup. + /// Default is 1.05 (5% higher than producer rate). Set to 1.0 to disable. + #[arg(long, short = 'R', default_value_t = 1.05)] + pub read_amplification: f32, } impl BenchmarkKindProps for BalancedProducerAndConsumerGroupArgs { @@ -86,6 +92,10 @@ impl BenchmarkKindProps for BalancedProducerAndConsumerGroupArgs { self.consumer_groups.get() } + fn read_amplification(&self) -> Option<f32> { + Some(self.read_amplification) + } + fn validate(&self) { let cg_number = self.number_of_consumer_groups(); let streams = self.streams(); diff --git a/core/bench/src/args/kinds/pinned/producer_and_consumer.rs b/core/bench/src/args/kinds/pinned/producer_and_consumer.rs index 3d4e8f641..6777e253c 100644 --- a/core/bench/src/args/kinds/pinned/producer_and_consumer.rs +++ b/core/bench/src/args/kinds/pinned/producer_and_consumer.rs @@ -54,6 +54,12 @@ pub struct PinnedProducerAndConsumerArgs { /// Max topic size in human readable format, e.g. "1GiB", "2MB", "1GiB". If not provided then the server default will be used. #[arg(long, short = 'T')] pub max_topic_size: Option<IggyByteSize>, + + /// Consumer rate limit multiplier relative to producer rate. + /// When measuring E2E latency, consumers may need higher throughput to prevent queue buildup. + /// Default is 1.05 (5% higher than producer rate). Set to 1.0 to disable. + #[arg(long, short = 'R', default_value_t = 1.05)] + pub read_amplification: f32, } impl BenchmarkKindProps for PinnedProducerAndConsumerArgs { @@ -85,6 +91,10 @@ impl BenchmarkKindProps for PinnedProducerAndConsumerArgs { self.max_topic_size } + fn read_amplification(&self) -> Option<f32> { + Some(self.read_amplification) + } + fn validate(&self) { let partitions = self.partitions(); let mut cmd = IggyBenchArgs::command(); diff --git a/core/bench/src/args/props.rs b/core/bench/src/args/props.rs index 7c8017cb7..2230163e8 100644 --- a/core/bench/src/args/props.rs +++ b/core/bench/src/args/props.rs @@ -28,6 +28,13 @@ pub trait BenchmarkKindProps { fn transport_command(&self) -> &BenchmarkTransportCommand; fn max_topic_size(&self) -> Option<IggyByteSize>; fn validate(&self); + + /// Consumer rate limit multiplier relative to producer rate. + /// Only applicable to benchmarks with separate producer/consumer actors. + fn read_amplification(&self) -> Option<f32> { + None + } + fn inner(&self) -> &dyn BenchmarkKindProps where Self: std::marker::Sized, diff --git a/core/bench/src/benchmarks/common.rs b/core/bench/src/benchmarks/common.rs index 844f83a27..97c66fb7b 100644 --- a/core/bench/src/benchmarks/common.rs +++ b/core/bench/src/benchmarks/common.rs @@ -169,16 +169,30 @@ pub fn build_consumer_futures( PollingKind::Offset }; let origin_timestamp_latency_calculation = match args.kind() { - BenchmarkKind::PinnedConsumer - | BenchmarkKind::PinnedProducerAndConsumer - | BenchmarkKind::BalancedConsumerGroup => false, // TODO(hubcio): in future, PinnedProducerAndConsumer can also be true - BenchmarkKind::BalancedProducerAndConsumerGroup => true, + BenchmarkKind::PinnedConsumer | BenchmarkKind::BalancedConsumerGroup => false, + BenchmarkKind::PinnedProducerAndConsumer + | BenchmarkKind::BalancedProducerAndConsumerGroup => true, _ => unreachable!(), }; let global_finish_condition = BenchmarkFinishCondition::new(args, BenchmarkFinishConditionMode::Shared); - let rate_limit = rate_limit_per_actor(args.rate_limit(), actors); + // When measuring true E2E latency, apply read_amplification multiplier to consumer rate + // to ensure they can keep up with producers and not inflate latency due to queue buildup + let read_amplification = args.read_amplification().unwrap_or(1.0); + let rate_limit = rate_limit_per_actor(args.rate_limit(), actors).map(|rl| { + if origin_timestamp_latency_calculation { + #[allow( + clippy::cast_sign_loss, + clippy::cast_possible_truncation, + clippy::cast_precision_loss + )] + let amplified = (rl.as_bytes_u64() as f64 * f64::from(read_amplification)) as u64; + IggyByteSize::from(amplified) + } else { + rl + } + }); let use_high_level_api = args.high_level_api(); (1..=consumers) @@ -252,14 +266,7 @@ pub fn build_producing_consumers_futures( &args, BenchmarkFinishConditionMode::PerProducingConsumer, ); - let origin_timestamp_latency_calculation = match args.kind() { - BenchmarkKind::PinnedConsumer - | BenchmarkKind::PinnedProducerAndConsumer - | BenchmarkKind::BalancedConsumerGroup // TODO(hubcio): in future, PinnedProducerAndConsumer can also be true - | BenchmarkKind::EndToEndProducingConsumer => false, - BenchmarkKind::BalancedProducerAndConsumerGroup => true, - _ => unreachable!(), - }; + let origin_timestamp_latency_calculation = true; let use_high_level_api = args.high_level_api(); let rate_limit = rate_limit_per_actor(args.rate_limit(), producing_consumers); @@ -344,18 +351,12 @@ pub fn build_producing_consumer_groups_futures( }; let consumer_group_id = if should_consume { - Some(start_consumer_group_id + 1 + (actor_id % cg_count)) + // Each stream has exactly one CG, server assigns IDs starting from 0 + Some(start_consumer_group_id) } else { None }; - let origin_timestamp_latency_calculation = match args.kind() { - BenchmarkKind::PinnedConsumer - | BenchmarkKind::PinnedProducerAndConsumer - | BenchmarkKind::BalancedConsumerGroup // TODO(hubcio): in future, PinnedProducerAndConsumer can also be true - | BenchmarkKind::EndToEndProducingConsumer => false, - BenchmarkKind::BalancedProducerAndConsumerGroup => true, - _ => unreachable!(), - }; + let origin_timestamp_latency_calculation = true; async move { let actor_type = match (should_produce, should_consume) {
