This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 6c0670d1c4 Add spilling related metrics for aggregation (#12888)
6c0670d1c4 is described below

commit 6c0670d1c42bf13b74c5edf6880f044f8ca3b818
Author: Yongting You <[email protected]>
AuthorDate: Mon Oct 14 19:49:40 2024 +0800

    Add spilling related metrics for aggregation (#12888)
    
    * External aggregation metrics
    
    * clippy
---
 datafusion/physical-plan/src/aggregates/mod.rs     | 12 +++++++
 .../physical-plan/src/aggregates/row_hash.rs       | 41 ++++++++++++++++++++--
 2 files changed, 51 insertions(+), 2 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index d6f16fb0fd..296c5811e5 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -1686,12 +1686,24 @@ mod tests {
 
         let metrics = merged_aggregate.metrics().unwrap();
         let output_rows = metrics.output_rows().unwrap();
+        let spill_count = metrics.spill_count().unwrap();
+        let spilled_bytes = metrics.spilled_bytes().unwrap();
+        let spilled_rows = metrics.spilled_rows().unwrap();
+
         if spill {
             // When spilling, the output rows metrics become partial output size + final output size
             // This is because final aggregation starts while partial aggregation is still emitting
             assert_eq!(8, output_rows);
+
+            assert!(spill_count > 0);
+            assert!(spilled_bytes > 0);
+            assert!(spilled_rows > 0);
         } else {
             assert_eq!(3, output_rows);
+
+            assert_eq!(0, spill_count);
+            assert_eq!(0, spilled_bytes);
+            assert_eq!(0, spilled_rows);
         }
 
         Ok(())
diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index 5121e6cc3b..624844b6b9 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -102,6 +102,19 @@ struct SpillState {
 
     /// true when streaming merge is in progress
     is_stream_merging: bool,
+
+    // ========================================================================
+    // METRICS:
+    // ========================================================================
+    /// Peak memory used for buffered data.
+    /// Calculated as sum of peak memory values across partitions
+    peak_mem_used: metrics::Gauge,
+    /// count of spill files during the execution of the operator
+    spill_count: metrics::Count,
+    /// total spilled bytes during the execution of the operator
+    spilled_bytes: metrics::Count,
+    /// total spilled rows during the execution of the operator
+    spilled_rows: metrics::Count,
 }
 
 /// Tracks if the aggregate should skip partial aggregations
@@ -138,6 +151,9 @@ struct SkipAggregationProbe {
     /// make any effect (set either while probing or on probing completion)
     is_locked: bool,
 
+    // ========================================================================
+    // METRICS:
+    // ========================================================================
     /// Number of rows where state was output without aggregation.
     ///
     /// * If 0, all input rows were aggregated (should_skip was always false)
@@ -510,6 +526,11 @@ impl GroupedHashAggregateStream {
             is_stream_merging: false,
             merging_aggregate_arguments,
             merging_group_by: PhysicalGroupBy::new_single(agg_group_by.expr.clone()),
+            peak_mem_used: MetricBuilder::new(&agg.metrics)
+                .gauge("peak_mem_used", partition),
+            spill_count: MetricBuilder::new(&agg.metrics).spill_count(partition),
+            spilled_bytes: MetricBuilder::new(&agg.metrics).spilled_bytes(partition),
+            spilled_rows: MetricBuilder::new(&agg.metrics).spilled_rows(partition),
         };
 
         // Skip aggregation is supported if:
@@ -865,11 +886,19 @@ impl GroupedHashAggregateStream {
 
     fn update_memory_reservation(&mut self) -> Result<()> {
         let acc = self.accumulators.iter().map(|x| x.size()).sum::<usize>();
-        self.reservation.try_resize(
+        let reservation_result = self.reservation.try_resize(
             acc + self.group_values.size()
                 + self.group_ordering.size()
                 + self.current_group_indices.allocated_size(),
-        )
+        );
+
+        if reservation_result.is_ok() {
+            self.spill_state
+                .peak_mem_used
+                .set_max(self.reservation.size());
+        }
+
+        reservation_result
     }
 
     /// Create an output RecordBatch with the group keys and
@@ -946,6 +975,14 @@ impl GroupedHashAggregateStream {
             self.batch_size,
         )?;
         self.spill_state.spills.push(spillfile);
+
+        // Update metrics
+        self.spill_state.spill_count.add(1);
+        self.spill_state
+            .spilled_bytes
+            .add(sorted.get_array_memory_size());
+        self.spill_state.spilled_rows.add(sorted.num_rows());
+
         Ok(())
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to