This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 2837e02b7e minor: split repartition time and send time metrics (#11440)
2837e02b7e is described below

commit 2837e02b7ec7dfbca576451e63db25b84ed2c97d
Author: Eduard Karacharov <[email protected]>
AuthorDate: Tue Jul 16 13:23:48 2024 +0300

    minor: split repartition time and send time metrics (#11440)
---
 datafusion/physical-plan/src/repartition/mod.rs | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 3d4d305839..e5c506403f 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -261,6 +261,7 @@ impl BatchPartitioner {
                     num_partitions: partitions,
                     hash_buffer,
                 } => {
+                    // Tracking time required for distributing indexes across output partitions
                     let timer = self.timer.timer();
 
                     let arrays = exprs
@@ -282,6 +283,11 @@ impl BatchPartitioner {
                             .append_value(index as u64);
                     }
 
+                    // Finished building index-arrays for output partitions
+                    timer.done();
+
+                    // Borrowing partitioner timer to prevent moving `self` to closure
+                    let partitioner_timer = &self.timer;
                     let it = indices
                         .into_iter()
                         .enumerate()
@@ -290,6 +296,9 @@ impl BatchPartitioner {
                             (!indices.is_empty()).then_some((partition, indices))
                         })
                         .map(move |(partition, indices)| {
+                            // Tracking time required for repartitioned batches construction
+                            let _timer = partitioner_timer.timer();
+
                             // Produce batches based on indices
                             let columns = batch
                                 .columns()
@@ -303,9 +312,6 @@ impl BatchPartitioner {
                             let batch =
                                 RecordBatch::try_new(batch.schema(), columns).unwrap();
 
-                            // bind timer so it drops w/ this iterator
-                            let _ = &timer;
-
                             Ok((partition, batch))
                         });
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to