This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 6c0670d1c4 Add spilling related metrics for aggregation (#12888)
6c0670d1c4 is described below
commit 6c0670d1c42bf13b74c5edf6880f044f8ca3b818
Author: Yongting You <[email protected]>
AuthorDate: Mon Oct 14 19:49:40 2024 +0800
Add spilling related metrics for aggregation (#12888)
* External aggregation metrics
* clippy
---
datafusion/physical-plan/src/aggregates/mod.rs | 12 +++++++
.../physical-plan/src/aggregates/row_hash.rs | 41 ++++++++++++++++++++--
2 files changed, 51 insertions(+), 2 deletions(-)
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index d6f16fb0fd..296c5811e5 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -1686,12 +1686,24 @@ mod tests {
let metrics = merged_aggregate.metrics().unwrap();
let output_rows = metrics.output_rows().unwrap();
+ let spill_count = metrics.spill_count().unwrap();
+ let spilled_bytes = metrics.spilled_bytes().unwrap();
+ let spilled_rows = metrics.spilled_rows().unwrap();
+
if spill {
// When spilling, the output rows metrics become partial output size + final output size
// This is because final aggregation starts while partial aggregation is still emitting
assert_eq!(8, output_rows);
+
+ assert!(spill_count > 0);
+ assert!(spilled_bytes > 0);
+ assert!(spilled_rows > 0);
} else {
assert_eq!(3, output_rows);
+
+ assert_eq!(0, spill_count);
+ assert_eq!(0, spilled_bytes);
+ assert_eq!(0, spilled_rows);
}
Ok(())
diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs
index 5121e6cc3b..624844b6b9 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -102,6 +102,19 @@ struct SpillState {
/// true when streaming merge is in progress
is_stream_merging: bool,
+
+ // ========================================================================
+ // METRICS:
+ // ========================================================================
+ /// Peak memory used for buffered data.
+ /// Calculated as sum of peak memory values across partitions
+ peak_mem_used: metrics::Gauge,
+ /// count of spill files during the execution of the operator
+ spill_count: metrics::Count,
+ /// total spilled bytes during the execution of the operator
+ spilled_bytes: metrics::Count,
+ /// total spilled rows during the execution of the operator
+ spilled_rows: metrics::Count,
}
/// Tracks if the aggregate should skip partial aggregations
@@ -138,6 +151,9 @@ struct SkipAggregationProbe {
/// make any effect (set either while probing or on probing completion)
is_locked: bool,
+ // ========================================================================
+ // METRICS:
+ // ========================================================================
/// Number of rows where state was output without aggregation.
///
/// * If 0, all input rows were aggregated (should_skip was always false)
@@ -510,6 +526,11 @@ impl GroupedHashAggregateStream {
is_stream_merging: false,
merging_aggregate_arguments,
merging_group_by: PhysicalGroupBy::new_single(agg_group_by.expr.clone()),
+ peak_mem_used: MetricBuilder::new(&agg.metrics)
+ .gauge("peak_mem_used", partition),
+ spill_count: MetricBuilder::new(&agg.metrics).spill_count(partition),
+ spilled_bytes: MetricBuilder::new(&agg.metrics).spilled_bytes(partition),
+ spilled_rows: MetricBuilder::new(&agg.metrics).spilled_rows(partition),
};
// Skip aggregation is supported if:
@@ -865,11 +886,19 @@ impl GroupedHashAggregateStream {
fn update_memory_reservation(&mut self) -> Result<()> {
let acc = self.accumulators.iter().map(|x| x.size()).sum::<usize>();
- self.reservation.try_resize(
+ let reservation_result = self.reservation.try_resize(
acc + self.group_values.size()
+ self.group_ordering.size()
+ self.current_group_indices.allocated_size(),
- )
+ );
+
+ if reservation_result.is_ok() {
+ self.spill_state
+ .peak_mem_used
+ .set_max(self.reservation.size());
+ }
+
+ reservation_result
}
/// Create an output RecordBatch with the group keys and
@@ -946,6 +975,14 @@ impl GroupedHashAggregateStream {
self.batch_size,
)?;
self.spill_state.spills.push(spillfile);
+
+ // Update metrics
+ self.spill_state.spill_count.add(1);
+ self.spill_state
+ .spilled_bytes
+ .add(sorted.get_array_memory_size());
+ self.spill_state.spilled_rows.add(sorted.num_rows());
+
Ok(())
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]