This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new a13c37d1d0 fix: `RepartitionExec` metrics (#10025)
a13c37d1d0 is described below

commit a13c37d1d0e3cd0a1383d1685e1efdc015bb4bc8
Author: Marco Neumann <[email protected]>
AuthorDate: Wed Apr 10 19:34:53 2024 +0200

    fix: `RepartitionExec` metrics (#10025)
    
    `RepartitionExec` is somewhat special. While most execs operate on
    "input partition = output partition", `RepartitionExec` drives all of
    its work using input-bound tasks. The metrics "fetch time" and
    "repartition time" therefore have to be attributed to the input
    partition, not to the output partition. The only metric that carries
    both an input and an output partition label is the "send time".
    
    Fixes #10015.
---
 datafusion/physical-plan/src/repartition/mod.rs | 37 +++++++++++++------------
 1 file changed, 20 insertions(+), 17 deletions(-)

diff --git a/datafusion/physical-plan/src/repartition/mod.rs 
b/datafusion/physical-plan/src/repartition/mod.rs
index 2ed5da7ced..59c71dbf89 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -130,8 +130,7 @@ impl RepartitionExecState {
                 })
                 .collect();
 
-            // TODO: metric input-output mapping is broken
-            let r_metrics = RepartitionMetrics::new(i, 0, &metrics);
+            let r_metrics = RepartitionMetrics::new(i, num_output_partitions, 
&metrics);
 
             let input_task = 
SpawnedTask::spawn(RepartitionExec::pull_from_input(
                 input.clone(),
@@ -411,32 +410,36 @@ struct RepartitionMetrics {
     fetch_time: metrics::Time,
     /// Time in nanos to perform repartitioning
     repartition_time: metrics::Time,
-    /// Time in nanos for sending resulting batches to channels
-    send_time: metrics::Time,
+    /// Time in nanos for sending resulting batches to channels.
+    ///
+    /// One metric per output partition.
+    send_time: Vec<metrics::Time>,
 }
 
 impl RepartitionMetrics {
     pub fn new(
-        output_partition: usize,
         input_partition: usize,
+        num_output_partitions: usize,
         metrics: &ExecutionPlanMetricsSet,
     ) -> Self {
-        let label = metrics::Label::new("inputPartition", 
input_partition.to_string());
-
         // Time in nanos to execute child operator and fetch batches
-        let fetch_time = MetricBuilder::new(metrics)
-            .with_label(label.clone())
-            .subset_time("fetch_time", output_partition);
+        let fetch_time =
+            MetricBuilder::new(metrics).subset_time("fetch_time", 
input_partition);
 
         // Time in nanos to perform repartitioning
-        let repart_time = MetricBuilder::new(metrics)
-            .with_label(label.clone())
-            .subset_time("repart_time", output_partition);
+        let repart_time =
+            MetricBuilder::new(metrics).subset_time("repart_time", 
input_partition);
 
         // Time in nanos for sending resulting batches to channels
-        let send_time = MetricBuilder::new(metrics)
-            .with_label(label)
-            .subset_time("send_time", output_partition);
+        let send_time = (0..num_output_partitions)
+            .map(|output_partition| {
+                let label =
+                    metrics::Label::new("outputPartition", 
output_partition.to_string());
+                MetricBuilder::new(metrics)
+                    .with_label(label)
+                    .subset_time("send_time", input_partition)
+            })
+            .collect();
 
         Self {
             fetch_time,
@@ -786,7 +789,7 @@ impl RepartitionExec {
                 let (partition, batch) = res?;
                 let size = batch.get_array_memory_size();
 
-                let timer = metrics.send_time.timer();
+                let timer = metrics.send_time[partition].timer();
                 // if there is still a receiver, send to it
                 if let Some((tx, reservation)) = 
output_channels.get_mut(&partition) {
                     reservation.lock().try_grow(size)?;

Reply via email to