This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new f9b6ad773 fix(bench): add configurable read_amplification and fix E2E
latency (#2468)
f9b6ad773 is described below
commit f9b6ad7733da32b2b408b5426e801f98fddb2c7e
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Sun Dec 21 14:00:32 2025 +0100
fix(bench): add configurable read_amplification and fix E2E latency (#2468)
- Add --read-amplification (-R) CLI argument for ppc and bpcg benchmarks
to configure the consumer rate limit multiplier (default: 1.05, i.e. 5% above the producer rate)
- Enable origin_timestamp latency calculation for
PinnedProducerAndConsumer, EndToEndProducingConsumer, and
EndToEndProducingConsumerGroup benchmarks
- Fix consumer group ID in e2ecg benchmark (use start_consumer_group_id
instead of a calculated ID that did not match the server-assigned IDs)
- Remove stale TODO comments and simplify match statements to hardcoded
values where only one benchmark type calls the function
---
core/bench/src/args/common.rs | 4 ++
core/bench/src/args/kind.rs | 4 ++
.../kinds/balanced/producer_and_consumer_group.rs | 10 +++++
.../src/args/kinds/pinned/producer_and_consumer.rs | 10 +++++
core/bench/src/args/props.rs | 7 ++++
core/bench/src/benchmarks/common.rs | 45 +++++++++++-----------
6 files changed, 58 insertions(+), 22 deletions(-)
diff --git a/core/bench/src/args/common.rs b/core/bench/src/args/common.rs
index 26491232b..c3a6f2bd4 100644
--- a/core/bench/src/args/common.rs
+++ b/core/bench/src/args/common.rs
@@ -334,6 +334,10 @@ impl IggyBenchArgs {
self.benchmark_kind.inner().max_topic_size()
}
+ pub fn read_amplification(&self) -> Option<f32> {
+ self.benchmark_kind.inner().read_amplification()
+ }
+
pub const fn high_level_api(&self) -> bool {
self.high_level_api
}
diff --git a/core/bench/src/args/kind.rs b/core/bench/src/args/kind.rs
index 9ffa1acf2..7cbdfb7b8 100644
--- a/core/bench/src/args/kind.rs
+++ b/core/bench/src/args/kind.rs
@@ -149,6 +149,10 @@ impl BenchmarkKindProps for BenchmarkKindCommand {
self.inner().max_topic_size()
}
+ fn read_amplification(&self) -> Option<f32> {
+ self.inner().read_amplification()
+ }
+
fn inner(&self) -> &dyn BenchmarkKindProps {
match self {
Self::PinnedProducer(args) => args,
diff --git a/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs
b/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs
index 126336159..a18c869e8 100644
--- a/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs
+++ b/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs
@@ -59,6 +59,12 @@ pub struct BalancedProducerAndConsumerGroupArgs {
/// Max topic size in human readable format, e.g. "1GiB", "2MiB", "1GiB".
If not provided then the server default will be used.
#[arg(long, short = 'T')]
pub max_topic_size: Option<IggyByteSize>,
+
+ /// Consumer rate limit multiplier relative to producer rate.
+ /// When measuring E2E latency, consumers may need higher throughput to
prevent queue buildup.
+ /// Default is 1.05 (5% higher than producer rate). Set to 1.0 to disable.
+ #[arg(long, short = 'R', default_value_t = 1.05)]
+ pub read_amplification: f32,
}
impl BenchmarkKindProps for BalancedProducerAndConsumerGroupArgs {
@@ -86,6 +92,10 @@ impl BenchmarkKindProps for
BalancedProducerAndConsumerGroupArgs {
self.consumer_groups.get()
}
+ fn read_amplification(&self) -> Option<f32> {
+ Some(self.read_amplification)
+ }
+
fn validate(&self) {
let cg_number = self.number_of_consumer_groups();
let streams = self.streams();
diff --git a/core/bench/src/args/kinds/pinned/producer_and_consumer.rs
b/core/bench/src/args/kinds/pinned/producer_and_consumer.rs
index 3d4e8f641..6777e253c 100644
--- a/core/bench/src/args/kinds/pinned/producer_and_consumer.rs
+++ b/core/bench/src/args/kinds/pinned/producer_and_consumer.rs
@@ -54,6 +54,12 @@ pub struct PinnedProducerAndConsumerArgs {
/// Max topic size in human readable format, e.g. "1GiB", "2MB", "1GiB".
If not provided then the server default will be used.
#[arg(long, short = 'T')]
pub max_topic_size: Option<IggyByteSize>,
+
+ /// Consumer rate limit multiplier relative to producer rate.
+ /// When measuring E2E latency, consumers may need higher throughput to
prevent queue buildup.
+ /// Default is 1.05 (5% higher than producer rate). Set to 1.0 to disable.
+ #[arg(long, short = 'R', default_value_t = 1.05)]
+ pub read_amplification: f32,
}
impl BenchmarkKindProps for PinnedProducerAndConsumerArgs {
@@ -85,6 +91,10 @@ impl BenchmarkKindProps for PinnedProducerAndConsumerArgs {
self.max_topic_size
}
+ fn read_amplification(&self) -> Option<f32> {
+ Some(self.read_amplification)
+ }
+
fn validate(&self) {
let partitions = self.partitions();
let mut cmd = IggyBenchArgs::command();
diff --git a/core/bench/src/args/props.rs b/core/bench/src/args/props.rs
index 7c8017cb7..2230163e8 100644
--- a/core/bench/src/args/props.rs
+++ b/core/bench/src/args/props.rs
@@ -28,6 +28,13 @@ pub trait BenchmarkKindProps {
fn transport_command(&self) -> &BenchmarkTransportCommand;
fn max_topic_size(&self) -> Option<IggyByteSize>;
fn validate(&self);
+
+ /// Consumer rate limit multiplier relative to producer rate.
+ /// Only applicable to benchmarks with separate producer/consumer actors.
+ fn read_amplification(&self) -> Option<f32> {
+ None
+ }
+
fn inner(&self) -> &dyn BenchmarkKindProps
where
Self: std::marker::Sized,
diff --git a/core/bench/src/benchmarks/common.rs
b/core/bench/src/benchmarks/common.rs
index 844f83a27..97c66fb7b 100644
--- a/core/bench/src/benchmarks/common.rs
+++ b/core/bench/src/benchmarks/common.rs
@@ -169,16 +169,30 @@ pub fn build_consumer_futures(
PollingKind::Offset
};
let origin_timestamp_latency_calculation = match args.kind() {
- BenchmarkKind::PinnedConsumer
- | BenchmarkKind::PinnedProducerAndConsumer
- | BenchmarkKind::BalancedConsumerGroup => false, // TODO(hubcio): in
future, PinnedProducerAndConsumer can also be true
- BenchmarkKind::BalancedProducerAndConsumerGroup => true,
+ BenchmarkKind::PinnedConsumer | BenchmarkKind::BalancedConsumerGroup
=> false,
+ BenchmarkKind::PinnedProducerAndConsumer
+ | BenchmarkKind::BalancedProducerAndConsumerGroup => true,
_ => unreachable!(),
};
let global_finish_condition =
BenchmarkFinishCondition::new(args,
BenchmarkFinishConditionMode::Shared);
- let rate_limit = rate_limit_per_actor(args.rate_limit(), actors);
+ // When measuring true E2E latency, apply read_amplification multiplier to
consumer rate
+ // to ensure they can keep up with producers and not inflate latency due
to queue buildup
+ let read_amplification = args.read_amplification().unwrap_or(1.0);
+ let rate_limit = rate_limit_per_actor(args.rate_limit(), actors).map(|rl| {
+ if origin_timestamp_latency_calculation {
+ #[allow(
+ clippy::cast_sign_loss,
+ clippy::cast_possible_truncation,
+ clippy::cast_precision_loss
+ )]
+ let amplified = (rl.as_bytes_u64() as f64 *
f64::from(read_amplification)) as u64;
+ IggyByteSize::from(amplified)
+ } else {
+ rl
+ }
+ });
let use_high_level_api = args.high_level_api();
(1..=consumers)
@@ -252,14 +266,7 @@ pub fn build_producing_consumers_futures(
&args,
BenchmarkFinishConditionMode::PerProducingConsumer,
);
- let origin_timestamp_latency_calculation = match args.kind() {
- BenchmarkKind::PinnedConsumer
- | BenchmarkKind::PinnedProducerAndConsumer
- | BenchmarkKind::BalancedConsumerGroup // TODO(hubcio): in
future, PinnedProducerAndConsumer can also be true
- | BenchmarkKind::EndToEndProducingConsumer => false,
- BenchmarkKind::BalancedProducerAndConsumerGroup => true,
- _ => unreachable!(),
- };
+ let origin_timestamp_latency_calculation = true;
let use_high_level_api = args.high_level_api();
let rate_limit = rate_limit_per_actor(args.rate_limit(),
producing_consumers);
@@ -344,18 +351,12 @@ pub fn build_producing_consumer_groups_futures(
};
let consumer_group_id = if should_consume {
- Some(start_consumer_group_id + 1 + (actor_id % cg_count))
+ // Each stream has exactly one CG, server assigns IDs starting
from 0
+ Some(start_consumer_group_id)
} else {
None
};
- let origin_timestamp_latency_calculation = match args.kind() {
- BenchmarkKind::PinnedConsumer
- | BenchmarkKind::PinnedProducerAndConsumer
- | BenchmarkKind::BalancedConsumerGroup // TODO(hubcio): in
future, PinnedProducerAndConsumer can also be true
- | BenchmarkKind::EndToEndProducingConsumer => false,
- BenchmarkKind::BalancedProducerAndConsumerGroup => true,
- _ => unreachable!(),
- };
+ let origin_timestamp_latency_calculation = true;
async move {
let actor_type = match (should_produce, should_consume) {