This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 177daea6 refactor(bench): parallelize benchmark report builder (#1990)
177daea6 is described below

commit 177daea6546850dcae854d62a46e810162f5dc0b
Author: Blake <[email protected]>
AuthorDate: Thu Jul 10 03:17:42 2025 -0400

    refactor(bench): parallelize benchmark report builder (#1990)
    
    ## Description
    
    Currently, the build() method from BenchmarkReportBuilder is
    single-threaded. This PR parallelizes this in 3 places:
    
    1. In build(), spawn a new thread for every call to
    `from_individual_metrics` or `from_producers_and_consumers_statistics`.
    2. In calculating aggregated time series, spawn one thread for each time
    series (MB, msg, latency).
    3. In TimeSeriesCalculator, use rayon parallel iterators.
    
    (I also tried implementing multithreading and `par_iter`s in other
    places, but those led to slower performance, likely due to threading
    overhead. Those are not included in the PR.)
    
    ## Measuring speedup
    
    I measured the average runtime of build() with 200 producers and 200
    consumers on a Ryzen 9 7950X with 124 GB RAM. build() shows between
    3.7-11x speedup, with greater speedup for greater message loads.
    
    | Message count | Single-threaded runtime (ms) | Multi-threaded runtime
    (ms) | Speedup |
    | ------------- | ---------------------------- |
    --------------------------- | ------- |
    | 8M messages | 155 | 42 | 3.7x |
    | 16M messages | 497 | 74 | 6.7x |
    | 32M messages | 1791 | 162 | 11.0x |
    
    Closes #1976
---
 Cargo.lock                                         |  1 +
 core/bench/Cargo.toml                              |  1 +
 core/bench/src/analytics/metrics/group.rs          | 84 +++++++++++++++-------
 core/bench/src/analytics/report_builder.rs         | 60 +++++++++-------
 core/bench/src/analytics/time_series/calculator.rs |  5 +-
 5 files changed, 98 insertions(+), 53 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index ef4c02c9..a89b2ecf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3747,6 +3747,7 @@ dependencies = [
  "integration",
  "nonzero_lit",
  "rand 0.9.1",
+ "rayon",
  "serde",
  "sysinfo 0.35.2",
  "tokio",
diff --git a/core/bench/Cargo.toml b/core/bench/Cargo.toml
index b48ba893..40937ced 100644
--- a/core/bench/Cargo.toml
+++ b/core/bench/Cargo.toml
@@ -44,6 +44,7 @@ iggy = { workspace = true }
 integration = { workspace = true }
 nonzero_lit = { workspace = true }
 rand = { workspace = true }
+rayon = "1.10.0"
 serde = { workspace = true }
 sysinfo = { workspace = true }
 tokio = { workspace = true }
diff --git a/core/bench/src/analytics/metrics/group.rs 
b/core/bench/src/analytics/metrics/group.rs
index dd585e37..9e0bd4cf 100644
--- a/core/bench/src/analytics/metrics/group.rs
+++ b/core/bench/src/analytics/metrics/group.rs
@@ -29,8 +29,10 @@ use bench_report::{
     group_metrics_kind::GroupMetricsKind,
     group_metrics_summary::BenchmarkGroupMetricsSummary,
     individual_metrics::BenchmarkIndividualMetrics,
+    time_series::{TimeSeries, TimeSeriesKind},
     utils::{max, min, std_dev},
 };
+use std::thread;
 
 pub fn from_producers_and_consumers_statistics(
     producers_stats: &[BenchmarkIndividualMetrics],
@@ -155,37 +157,69 @@ fn determine_group_kind(stats: 
&[BenchmarkIndividualMetrics]) -> GroupMetricsKin
     }
 }
 
-use bench_report::time_series::TimeSeries;
-
 fn calculate_group_time_series(
     stats: &[BenchmarkIndividualMetrics],
     moving_average_window: u32,
 ) -> (TimeSeries, TimeSeries, TimeSeries) {
-    let mut avg_throughput_mb_ts = TimeSeriesCalculator::aggregate_sum(
-        &stats
-            .iter()
-            .map(|r| r.throughput_mb_ts.clone())
-            .collect::<Vec<_>>(),
-    );
-    let mut avg_throughput_msg_ts = TimeSeriesCalculator::aggregate_sum(
-        &stats
-            .iter()
-            .map(|r| r.throughput_msg_ts.clone())
-            .collect::<Vec<_>>(),
-    );
-    let mut avg_latency_ts = TimeSeriesCalculator::aggregate_avg(
-        &stats
-            .iter()
-            .map(|r| r.latency_ts.clone())
-            .collect::<Vec<_>>(),
-    );
-
     let sma = MovingAverageProcessor::new(moving_average_window as usize);
-    avg_throughput_mb_ts = sma.process(&avg_throughput_mb_ts);
-    avg_throughput_msg_ts = sma.process(&avg_throughput_msg_ts);
-    avg_latency_ts = sma.process(&avg_latency_ts);
 
-    (avg_throughput_mb_ts, avg_throughput_msg_ts, avg_latency_ts)
+    thread::scope(|scope| {
+        let mut join_handles = Vec::new();
+
+        join_handles.push(scope.spawn(|| {
+            let avg_throughput_mb_ts = TimeSeriesCalculator::aggregate_sum(
+                &stats
+                    .iter()
+                    .map(|r| r.throughput_mb_ts.clone())
+                    .collect::<Vec<_>>(),
+            );
+            sma.process(&avg_throughput_mb_ts)
+        }));
+        join_handles.push(scope.spawn(|| {
+            let avg_throughput_msg_ts = TimeSeriesCalculator::aggregate_sum(
+                &stats
+                    .iter()
+                    .map(|r| r.throughput_msg_ts.clone())
+                    .collect::<Vec<_>>(),
+            );
+            sma.process(&avg_throughput_msg_ts)
+        }));
+        join_handles.push(scope.spawn(|| {
+            let avg_latency_ts = TimeSeriesCalculator::aggregate_avg(
+                &stats
+                    .iter()
+                    .map(|r| r.latency_ts.clone())
+                    .collect::<Vec<_>>(),
+            );
+            sma.process(&avg_latency_ts)
+        }));
+
+        let mut time_series: Vec<_> = join_handles
+            .into_iter()
+            .map(|handle| {
+                handle
+                    .join()
+                    .expect("Should not fail to compute aggregated time 
series")
+            })
+            .collect();
+
+        (
+            extract_time_series_of_kind(&mut time_series, 
TimeSeriesKind::ThroughputMB),
+            extract_time_series_of_kind(&mut time_series, 
TimeSeriesKind::ThroughputMsg),
+            extract_time_series_of_kind(&mut time_series, 
TimeSeriesKind::Latency),
+        )
+    })
+}
+
+fn extract_time_series_of_kind(
+    ts_vec: &mut Vec<TimeSeries>,
+    target_kind: TimeSeriesKind,
+) -> TimeSeries {
+    let position = ts_vec
+        .iter()
+        .position(|ts| ts.kind == target_kind)
+        .expect("Should be able to find TimeSeries of target kind");
+    ts_vec.swap_remove(position)
 }
 
 fn calculate_min_max_latencies(
diff --git a/core/bench/src/analytics/report_builder.rs 
b/core/bench/src/analytics/report_builder.rs
index 8e778919..8a00c8cc 100644
--- a/core/bench/src/analytics/report_builder.rs
+++ b/core/bench/src/analytics/report_builder.rs
@@ -16,7 +16,7 @@
  * under the License.
  */
 
-use std::collections::HashMap;
+use std::{collections::HashMap, thread};
 
 use super::metrics::group::{from_individual_metrics, 
from_producers_and_consumers_statistics};
 use crate::utils::get_server_stats;
@@ -83,25 +83,24 @@ impl BenchmarkReportBuilder {
             .cloned()
             .collect();
 
-        if !producer_metrics.is_empty() {
-            if let Some(metrics) = from_individual_metrics(&producer_metrics, 
moving_average_window)
-            {
-                group_metrics.push(metrics);
-            }
-        }
-
-        if !consumer_metrics.is_empty() {
-            if let Some(metrics) = from_individual_metrics(&consumer_metrics, 
moving_average_window)
-            {
-                group_metrics.push(metrics);
-            }
-        }
-
-        if !producing_consumers_metrics.is_empty() {
-            if let Some(metrics) =
-                from_individual_metrics(&producing_consumers_metrics, 
moving_average_window)
-            {
-                group_metrics.push(metrics);
+        let mut join_handles = Vec::new();
+
+        for individual_metric in [
+            &producer_metrics,
+            &consumer_metrics,
+            &producing_consumers_metrics,
+        ] {
+            if !individual_metric.is_empty() {
+                let individual_metric_copy = individual_metric.clone();
+
+                join_handles.push(thread::spawn(move || {
+                    if let Some(metric) =
+                        from_individual_metrics(&individual_metric_copy, 
moving_average_window)
+                    {
+                        return Some(metric);
+                    }
+                    None
+                }));
             }
         }
 
@@ -112,12 +111,21 @@ impl BenchmarkReportBuilder {
         ) && !producer_metrics.is_empty()
             && !consumer_metrics.is_empty()
         {
-            if let Some(metrics) = from_producers_and_consumers_statistics(
-                &producer_metrics,
-                &consumer_metrics,
-                moving_average_window,
-            ) {
-                group_metrics.push(metrics);
+            join_handles.push(thread::spawn(move || {
+                if let Some(metric) = from_producers_and_consumers_statistics(
+                    &producer_metrics,
+                    &consumer_metrics,
+                    moving_average_window,
+                ) {
+                    return Some(metric);
+                }
+                None
+            }));
+        }
+
+        for handle in join_handles {
+            if let Some(metric) = handle.join().expect("Should have computed 
group metric") {
+                group_metrics.push(metric);
             }
         }
 
diff --git a/core/bench/src/analytics/time_series/calculator.rs 
b/core/bench/src/analytics/time_series/calculator.rs
index 42cf211d..b88c64c4 100644
--- a/core/bench/src/analytics/time_series/calculator.rs
+++ b/core/bench/src/analytics/time_series/calculator.rs
@@ -25,6 +25,7 @@ use super::calculators::{
 use crate::analytics::record::BenchmarkRecord;
 use bench_report::time_series::{TimePoint, TimeSeries, TimeSeriesKind};
 use iggy::prelude::IggyDuration;
+use rayon::prelude::*;
 use tracing::warn;
 
 /// Calculate time series data from benchmark records
@@ -64,7 +65,7 @@ impl TimeSeriesCalculator {
         all_times.dedup();
 
         let points = all_times
-            .into_iter()
+            .into_par_iter()
             .map(|time| {
                 let sum: f64 = series
                     .iter()
@@ -101,7 +102,7 @@ impl TimeSeriesCalculator {
         all_times.dedup();
 
         let points = all_times
-            .into_iter()
+            .into_par_iter()
             .map(|time| {
                 let matching_values: Vec<f64> = series
                     .iter()

Reply via email to