This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new a13c37d1d0 fix: `RepartitionExec` metrics (#10025)
a13c37d1d0 is described below
commit a13c37d1d0e3cd0a1383d1685e1efdc015bb4bc8
Author: Marco Neumann <[email protected]>
AuthorDate: Wed Apr 10 19:34:53 2024 +0200
fix: `RepartitionExec` metrics (#10025)
`RepartitionExec` is somewhat special. While most execs work
partition-by-partition ("input partition = output partition"), `RepartitionExec`
drives all of its work using tasks bound to input partitions. The "fetch time"
and "repartition time" metrics therefore have to be attributed to the input
partition, not to the output partition. The only metric that carries both an
input and an output partition label is "send time".
Fixes #10015.
---
datafusion/physical-plan/src/repartition/mod.rs | 37 +++++++++++++------------
1 file changed, 20 insertions(+), 17 deletions(-)
diff --git a/datafusion/physical-plan/src/repartition/mod.rs
b/datafusion/physical-plan/src/repartition/mod.rs
index 2ed5da7ced..59c71dbf89 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -130,8 +130,7 @@ impl RepartitionExecState {
})
.collect();
- // TODO: metric input-output mapping is broken
- let r_metrics = RepartitionMetrics::new(i, 0, &metrics);
+ let r_metrics = RepartitionMetrics::new(i, num_output_partitions,
&metrics);
let input_task =
SpawnedTask::spawn(RepartitionExec::pull_from_input(
input.clone(),
@@ -411,32 +410,36 @@ struct RepartitionMetrics {
fetch_time: metrics::Time,
/// Time in nanos to perform repartitioning
repartition_time: metrics::Time,
- /// Time in nanos for sending resulting batches to channels
- send_time: metrics::Time,
+ /// Time in nanos for sending resulting batches to channels.
+ ///
+ /// One metric per output partition.
+ send_time: Vec<metrics::Time>,
}
impl RepartitionMetrics {
pub fn new(
- output_partition: usize,
input_partition: usize,
+ num_output_partitions: usize,
metrics: &ExecutionPlanMetricsSet,
) -> Self {
- let label = metrics::Label::new("inputPartition",
input_partition.to_string());
-
// Time in nanos to execute child operator and fetch batches
- let fetch_time = MetricBuilder::new(metrics)
- .with_label(label.clone())
- .subset_time("fetch_time", output_partition);
+ let fetch_time =
+ MetricBuilder::new(metrics).subset_time("fetch_time",
input_partition);
// Time in nanos to perform repartitioning
- let repart_time = MetricBuilder::new(metrics)
- .with_label(label.clone())
- .subset_time("repart_time", output_partition);
+ let repart_time =
+ MetricBuilder::new(metrics).subset_time("repart_time",
input_partition);
// Time in nanos for sending resulting batches to channels
- let send_time = MetricBuilder::new(metrics)
- .with_label(label)
- .subset_time("send_time", output_partition);
+ let send_time = (0..num_output_partitions)
+ .map(|output_partition| {
+ let label =
+ metrics::Label::new("outputPartition",
output_partition.to_string());
+ MetricBuilder::new(metrics)
+ .with_label(label)
+ .subset_time("send_time", input_partition)
+ })
+ .collect();
Self {
fetch_time,
@@ -786,7 +789,7 @@ impl RepartitionExec {
let (partition, batch) = res?;
let size = batch.get_array_memory_size();
- let timer = metrics.send_time.timer();
+ let timer = metrics.send_time[partition].timer();
// if there is still a receiver, send to it
if let Some((tx, reservation)) =
output_channels.get_mut(&partition) {
reservation.lock().try_grow(size)?;