This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 177daea6 refactor(bench): parallelize benchmark report builder (#1990)
177daea6 is described below
commit 177daea6546850dcae854d62a46e810162f5dc0b
Author: Blake <[email protected]>
AuthorDate: Thu Jul 10 03:17:42 2025 -0400
refactor(bench): parallelize benchmark report builder (#1990)
## Description
Currently, the build() method of BenchmarkReportBuilder is
single-threaded. This PR parallelizes it in 3 places:
1. In build(), spawn a new thread for every call to
`from_individual_metrics` or `from_producers_and_consumers_statistics`.
2. In calculating aggregated time series, spawn one thread for each time
series (MB, msg, latency).
3. In TimeSeriesCalculator, use rayon parallel iterators.
(I also tried implementing multithreading and `par_iter`s in other
places, but those led to slower performance, likely due to threading
overhead. Those are not included in the PR.)
## Measuring speedup
I measured the average runtime of build() with 200 producers and 200
consumers on a Ryzen 9 7950X with 124 GB RAM. build() shows a speedup of
3.7x to 11x, with larger speedups at larger message loads.
| Message count | Single-threaded runtime (ms) | Multi-threaded runtime (ms) | Speedup |
| ------------- | ---------------------------- | --------------------------- | ------- |
| 8M messages | 155 | 42 | 3.7x |
| 16M messages | 497 | 74 | 6.7x |
| 32M messages | 1791 | 162 | 11.0x |
Closes #1976
---
Cargo.lock | 1 +
core/bench/Cargo.toml | 1 +
core/bench/src/analytics/metrics/group.rs | 84 +++++++++++++++-------
core/bench/src/analytics/report_builder.rs | 60 +++++++++-------
core/bench/src/analytics/time_series/calculator.rs | 5 +-
5 files changed, 98 insertions(+), 53 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index ef4c02c9..a89b2ecf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3747,6 +3747,7 @@ dependencies = [
"integration",
"nonzero_lit",
"rand 0.9.1",
+ "rayon",
"serde",
"sysinfo 0.35.2",
"tokio",
diff --git a/core/bench/Cargo.toml b/core/bench/Cargo.toml
index b48ba893..40937ced 100644
--- a/core/bench/Cargo.toml
+++ b/core/bench/Cargo.toml
@@ -44,6 +44,7 @@ iggy = { workspace = true }
integration = { workspace = true }
nonzero_lit = { workspace = true }
rand = { workspace = true }
+rayon = "1.10.0"
serde = { workspace = true }
sysinfo = { workspace = true }
tokio = { workspace = true }
diff --git a/core/bench/src/analytics/metrics/group.rs
b/core/bench/src/analytics/metrics/group.rs
index dd585e37..9e0bd4cf 100644
--- a/core/bench/src/analytics/metrics/group.rs
+++ b/core/bench/src/analytics/metrics/group.rs
@@ -29,8 +29,10 @@ use bench_report::{
group_metrics_kind::GroupMetricsKind,
group_metrics_summary::BenchmarkGroupMetricsSummary,
individual_metrics::BenchmarkIndividualMetrics,
+ time_series::{TimeSeries, TimeSeriesKind},
utils::{max, min, std_dev},
};
+use std::thread;
pub fn from_producers_and_consumers_statistics(
producers_stats: &[BenchmarkIndividualMetrics],
@@ -155,37 +157,69 @@ fn determine_group_kind(stats:
&[BenchmarkIndividualMetrics]) -> GroupMetricsKin
}
}
-use bench_report::time_series::TimeSeries;
-
fn calculate_group_time_series(
stats: &[BenchmarkIndividualMetrics],
moving_average_window: u32,
) -> (TimeSeries, TimeSeries, TimeSeries) {
- let mut avg_throughput_mb_ts = TimeSeriesCalculator::aggregate_sum(
- &stats
- .iter()
- .map(|r| r.throughput_mb_ts.clone())
- .collect::<Vec<_>>(),
- );
- let mut avg_throughput_msg_ts = TimeSeriesCalculator::aggregate_sum(
- &stats
- .iter()
- .map(|r| r.throughput_msg_ts.clone())
- .collect::<Vec<_>>(),
- );
- let mut avg_latency_ts = TimeSeriesCalculator::aggregate_avg(
- &stats
- .iter()
- .map(|r| r.latency_ts.clone())
- .collect::<Vec<_>>(),
- );
-
let sma = MovingAverageProcessor::new(moving_average_window as usize);
- avg_throughput_mb_ts = sma.process(&avg_throughput_mb_ts);
- avg_throughput_msg_ts = sma.process(&avg_throughput_msg_ts);
- avg_latency_ts = sma.process(&avg_latency_ts);
- (avg_throughput_mb_ts, avg_throughput_msg_ts, avg_latency_ts)
+ thread::scope(|scope| {
+ let mut join_handles = Vec::new();
+
+ join_handles.push(scope.spawn(|| {
+ let avg_throughput_mb_ts = TimeSeriesCalculator::aggregate_sum(
+ &stats
+ .iter()
+ .map(|r| r.throughput_mb_ts.clone())
+ .collect::<Vec<_>>(),
+ );
+ sma.process(&avg_throughput_mb_ts)
+ }));
+ join_handles.push(scope.spawn(|| {
+ let avg_throughput_msg_ts = TimeSeriesCalculator::aggregate_sum(
+ &stats
+ .iter()
+ .map(|r| r.throughput_msg_ts.clone())
+ .collect::<Vec<_>>(),
+ );
+ sma.process(&avg_throughput_msg_ts)
+ }));
+ join_handles.push(scope.spawn(|| {
+ let avg_latency_ts = TimeSeriesCalculator::aggregate_avg(
+ &stats
+ .iter()
+ .map(|r| r.latency_ts.clone())
+ .collect::<Vec<_>>(),
+ );
+ sma.process(&avg_latency_ts)
+ }));
+
+ let mut time_series: Vec<_> = join_handles
+ .into_iter()
+ .map(|handle| {
+ handle
+ .join()
+ .expect("Should not fail to compute aggregated time
series")
+ })
+ .collect();
+
+ (
+ extract_time_series_of_kind(&mut time_series,
TimeSeriesKind::ThroughputMB),
+ extract_time_series_of_kind(&mut time_series,
TimeSeriesKind::ThroughputMsg),
+ extract_time_series_of_kind(&mut time_series,
TimeSeriesKind::Latency),
+ )
+ })
+}
+
+fn extract_time_series_of_kind(
+ ts_vec: &mut Vec<TimeSeries>,
+ target_kind: TimeSeriesKind,
+) -> TimeSeries {
+ let position = ts_vec
+ .iter()
+ .position(|ts| ts.kind == target_kind)
+ .expect("Should be able to find TimeSeries of target kind");
+ ts_vec.swap_remove(position)
}
fn calculate_min_max_latencies(
diff --git a/core/bench/src/analytics/report_builder.rs
b/core/bench/src/analytics/report_builder.rs
index 8e778919..8a00c8cc 100644
--- a/core/bench/src/analytics/report_builder.rs
+++ b/core/bench/src/analytics/report_builder.rs
@@ -16,7 +16,7 @@
* under the License.
*/
-use std::collections::HashMap;
+use std::{collections::HashMap, thread};
use super::metrics::group::{from_individual_metrics,
from_producers_and_consumers_statistics};
use crate::utils::get_server_stats;
@@ -83,25 +83,24 @@ impl BenchmarkReportBuilder {
.cloned()
.collect();
- if !producer_metrics.is_empty() {
- if let Some(metrics) = from_individual_metrics(&producer_metrics,
moving_average_window)
- {
- group_metrics.push(metrics);
- }
- }
-
- if !consumer_metrics.is_empty() {
- if let Some(metrics) = from_individual_metrics(&consumer_metrics,
moving_average_window)
- {
- group_metrics.push(metrics);
- }
- }
-
- if !producing_consumers_metrics.is_empty() {
- if let Some(metrics) =
- from_individual_metrics(&producing_consumers_metrics,
moving_average_window)
- {
- group_metrics.push(metrics);
+ let mut join_handles = Vec::new();
+
+ for individual_metric in [
+ &producer_metrics,
+ &consumer_metrics,
+ &producing_consumers_metrics,
+ ] {
+ if !individual_metric.is_empty() {
+ let individual_metric_copy = individual_metric.clone();
+
+ join_handles.push(thread::spawn(move || {
+ if let Some(metric) =
+ from_individual_metrics(&individual_metric_copy,
moving_average_window)
+ {
+ return Some(metric);
+ }
+ None
+ }));
}
}
@@ -112,12 +111,21 @@ impl BenchmarkReportBuilder {
) && !producer_metrics.is_empty()
&& !consumer_metrics.is_empty()
{
- if let Some(metrics) = from_producers_and_consumers_statistics(
- &producer_metrics,
- &consumer_metrics,
- moving_average_window,
- ) {
- group_metrics.push(metrics);
+ join_handles.push(thread::spawn(move || {
+ if let Some(metric) = from_producers_and_consumers_statistics(
+ &producer_metrics,
+ &consumer_metrics,
+ moving_average_window,
+ ) {
+ return Some(metric);
+ }
+ None
+ }));
+ }
+
+ for handle in join_handles {
+ if let Some(metric) = handle.join().expect("Should have computed
group metric") {
+ group_metrics.push(metric);
}
}
diff --git a/core/bench/src/analytics/time_series/calculator.rs
b/core/bench/src/analytics/time_series/calculator.rs
index 42cf211d..b88c64c4 100644
--- a/core/bench/src/analytics/time_series/calculator.rs
+++ b/core/bench/src/analytics/time_series/calculator.rs
@@ -25,6 +25,7 @@ use super::calculators::{
use crate::analytics::record::BenchmarkRecord;
use bench_report::time_series::{TimePoint, TimeSeries, TimeSeriesKind};
use iggy::prelude::IggyDuration;
+use rayon::prelude::*;
use tracing::warn;
/// Calculate time series data from benchmark records
@@ -64,7 +65,7 @@ impl TimeSeriesCalculator {
all_times.dedup();
let points = all_times
- .into_iter()
+ .into_par_iter()
.map(|time| {
let sum: f64 = series
.iter()
@@ -101,7 +102,7 @@ impl TimeSeriesCalculator {
all_times.dedup();
let points = all_times
- .into_iter()
+ .into_par_iter()
.map(|time| {
let matching_values: Vec<f64> = series
.iter()