This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new f9b6ad773 fix(bench): add configurable read_amplification and fix E2E latency (#2468)
f9b6ad773 is described below

commit f9b6ad7733da32b2b408b5426e801f98fddb2c7e
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Sun Dec 21 14:00:32 2025 +0100

    fix(bench): add configurable read_amplification and fix E2E latency (#2468)
    
    - Add --read-amplification (-R) CLI argument for ppc and bpcg benchmarks
    to configure the consumer rate-limit multiplier (default: 1.05, i.e. 5%
    above the producer rate)
    - Enable origin_timestamp latency calculation for
    PinnedProducerAndConsumer, EndToEndProducingConsumer, and
    EndToEndProducingConsumerGroup benchmarks
    - Fix consumer group ID in e2ecg benchmark (use start_consumer_group_id
    instead of calculated ID that didn't match server-assigned IDs)
    - Remove stale TODO comments and replace match statements with a hardcoded
    `true` in functions that only one benchmark type calls
---
 core/bench/src/args/common.rs                      |  4 ++
 core/bench/src/args/kind.rs                        |  4 ++
 .../kinds/balanced/producer_and_consumer_group.rs  | 10 +++++
 .../src/args/kinds/pinned/producer_and_consumer.rs | 10 +++++
 core/bench/src/args/props.rs                       |  7 ++++
 core/bench/src/benchmarks/common.rs                | 45 +++++++++++-----------
 6 files changed, 58 insertions(+), 22 deletions(-)

diff --git a/core/bench/src/args/common.rs b/core/bench/src/args/common.rs
index 26491232b..c3a6f2bd4 100644
--- a/core/bench/src/args/common.rs
+++ b/core/bench/src/args/common.rs
@@ -334,6 +334,10 @@ impl IggyBenchArgs {
         self.benchmark_kind.inner().max_topic_size()
     }
 
+    pub fn read_amplification(&self) -> Option<f32> {
+        self.benchmark_kind.inner().read_amplification()
+    }
+
     pub const fn high_level_api(&self) -> bool {
         self.high_level_api
     }
diff --git a/core/bench/src/args/kind.rs b/core/bench/src/args/kind.rs
index 9ffa1acf2..7cbdfb7b8 100644
--- a/core/bench/src/args/kind.rs
+++ b/core/bench/src/args/kind.rs
@@ -149,6 +149,10 @@ impl BenchmarkKindProps for BenchmarkKindCommand {
         self.inner().max_topic_size()
     }
 
+    fn read_amplification(&self) -> Option<f32> {
+        self.inner().read_amplification()
+    }
+
     fn inner(&self) -> &dyn BenchmarkKindProps {
         match self {
             Self::PinnedProducer(args) => args,
diff --git a/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs b/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs
index 126336159..a18c869e8 100644
--- a/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs
+++ b/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs
@@ -59,6 +59,12 @@ pub struct BalancedProducerAndConsumerGroupArgs {
     /// Max topic size in human readable format, e.g. "1GiB", "2MiB", "1GiB". 
If not provided then the server default will be used.
     #[arg(long, short = 'T')]
     pub max_topic_size: Option<IggyByteSize>,
+
+    /// Consumer rate limit multiplier relative to producer rate.
+    /// When measuring E2E latency, consumers may need higher throughput to 
prevent queue buildup.
+    /// Default is 1.05 (5% higher than producer rate). Set to 1.0 to disable.
+    #[arg(long, short = 'R', default_value_t = 1.05)]
+    pub read_amplification: f32,
 }
 
 impl BenchmarkKindProps for BalancedProducerAndConsumerGroupArgs {
@@ -86,6 +92,10 @@ impl BenchmarkKindProps for BalancedProducerAndConsumerGroupArgs {
         self.consumer_groups.get()
     }
 
+    fn read_amplification(&self) -> Option<f32> {
+        Some(self.read_amplification)
+    }
+
     fn validate(&self) {
         let cg_number = self.number_of_consumer_groups();
         let streams = self.streams();
diff --git a/core/bench/src/args/kinds/pinned/producer_and_consumer.rs b/core/bench/src/args/kinds/pinned/producer_and_consumer.rs
index 3d4e8f641..6777e253c 100644
--- a/core/bench/src/args/kinds/pinned/producer_and_consumer.rs
+++ b/core/bench/src/args/kinds/pinned/producer_and_consumer.rs
@@ -54,6 +54,12 @@ pub struct PinnedProducerAndConsumerArgs {
     /// Max topic size in human readable format, e.g. "1GiB", "2MB", "1GiB". 
If not provided then the server default will be used.
     #[arg(long, short = 'T')]
     pub max_topic_size: Option<IggyByteSize>,
+
+    /// Consumer rate limit multiplier relative to producer rate.
+    /// When measuring E2E latency, consumers may need higher throughput to 
prevent queue buildup.
+    /// Default is 1.05 (5% higher than producer rate). Set to 1.0 to disable.
+    #[arg(long, short = 'R', default_value_t = 1.05)]
+    pub read_amplification: f32,
 }
 
 impl BenchmarkKindProps for PinnedProducerAndConsumerArgs {
@@ -85,6 +91,10 @@ impl BenchmarkKindProps for PinnedProducerAndConsumerArgs {
         self.max_topic_size
     }
 
+    fn read_amplification(&self) -> Option<f32> {
+        Some(self.read_amplification)
+    }
+
     fn validate(&self) {
         let partitions = self.partitions();
         let mut cmd = IggyBenchArgs::command();
diff --git a/core/bench/src/args/props.rs b/core/bench/src/args/props.rs
index 7c8017cb7..2230163e8 100644
--- a/core/bench/src/args/props.rs
+++ b/core/bench/src/args/props.rs
@@ -28,6 +28,13 @@ pub trait BenchmarkKindProps {
     fn transport_command(&self) -> &BenchmarkTransportCommand;
     fn max_topic_size(&self) -> Option<IggyByteSize>;
     fn validate(&self);
+
+    /// Consumer rate limit multiplier relative to producer rate.
+    /// Only applicable to benchmarks with separate producer/consumer actors.
+    fn read_amplification(&self) -> Option<f32> {
+        None
+    }
+
     fn inner(&self) -> &dyn BenchmarkKindProps
     where
         Self: std::marker::Sized,
diff --git a/core/bench/src/benchmarks/common.rs b/core/bench/src/benchmarks/common.rs
index 844f83a27..97c66fb7b 100644
--- a/core/bench/src/benchmarks/common.rs
+++ b/core/bench/src/benchmarks/common.rs
@@ -169,16 +169,30 @@ pub fn build_consumer_futures(
         PollingKind::Offset
     };
     let origin_timestamp_latency_calculation = match args.kind() {
-        BenchmarkKind::PinnedConsumer
-        | BenchmarkKind::PinnedProducerAndConsumer
-        | BenchmarkKind::BalancedConsumerGroup => false, // TODO(hubcio): in 
future, PinnedProducerAndConsumer can also be true
-        BenchmarkKind::BalancedProducerAndConsumerGroup => true,
+        BenchmarkKind::PinnedConsumer | BenchmarkKind::BalancedConsumerGroup 
=> false,
+        BenchmarkKind::PinnedProducerAndConsumer
+        | BenchmarkKind::BalancedProducerAndConsumerGroup => true,
         _ => unreachable!(),
     };
 
     let global_finish_condition =
         BenchmarkFinishCondition::new(args, 
BenchmarkFinishConditionMode::Shared);
-    let rate_limit = rate_limit_per_actor(args.rate_limit(), actors);
+    // When measuring true E2E latency, apply read_amplification multiplier to 
consumer rate
+    // to ensure they can keep up with producers and not inflate latency due 
to queue buildup
+    let read_amplification = args.read_amplification().unwrap_or(1.0);
+    let rate_limit = rate_limit_per_actor(args.rate_limit(), actors).map(|rl| {
+        if origin_timestamp_latency_calculation {
+            #[allow(
+                clippy::cast_sign_loss,
+                clippy::cast_possible_truncation,
+                clippy::cast_precision_loss
+            )]
+            let amplified = (rl.as_bytes_u64() as f64 * 
f64::from(read_amplification)) as u64;
+            IggyByteSize::from(amplified)
+        } else {
+            rl
+        }
+    });
     let use_high_level_api = args.high_level_api();
 
     (1..=consumers)
@@ -252,14 +266,7 @@ pub fn build_producing_consumers_futures(
                 &args,
                 BenchmarkFinishConditionMode::PerProducingConsumer,
             );
-            let origin_timestamp_latency_calculation = match args.kind() {
-                BenchmarkKind::PinnedConsumer
-                | BenchmarkKind::PinnedProducerAndConsumer
-                | BenchmarkKind::BalancedConsumerGroup  // TODO(hubcio): in 
future, PinnedProducerAndConsumer can also be true
-                | BenchmarkKind::EndToEndProducingConsumer => false,
-                BenchmarkKind::BalancedProducerAndConsumerGroup => true,
-                _ => unreachable!(),
-            };
+            let origin_timestamp_latency_calculation = true;
             let use_high_level_api = args.high_level_api();
             let rate_limit = rate_limit_per_actor(args.rate_limit(), 
producing_consumers);
 
@@ -344,18 +351,12 @@ pub fn build_producing_consumer_groups_futures(
             };
 
             let consumer_group_id = if should_consume {
-                Some(start_consumer_group_id + 1 + (actor_id % cg_count))
+                // Each stream has exactly one CG, server assigns IDs starting 
from 0
+                Some(start_consumer_group_id)
             } else {
                 None
             };
-            let origin_timestamp_latency_calculation = match args.kind() {
-                BenchmarkKind::PinnedConsumer
-                | BenchmarkKind::PinnedProducerAndConsumer
-                | BenchmarkKind::BalancedConsumerGroup  // TODO(hubcio): in 
future, PinnedProducerAndConsumer can also be true
-                | BenchmarkKind::EndToEndProducingConsumer => false,
-                BenchmarkKind::BalancedProducerAndConsumerGroup => true,
-                _ => unreachable!(),
-            };
+            let origin_timestamp_latency_calculation = true;
 
             async move {
                 let actor_type = match (should_produce, should_consume) {

Reply via email to