This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 1c63759 Add spill_count and spilled_bytes to baseline metrics, test
sort with spill of metrics (#1641)
1c63759 is described below
commit 1c63759b15dec34c7211e19b7a45eed055b08199
Author: Yijie Shen <[email protected]>
AuthorDate: Sat Jan 22 20:51:35 2022 +0800
Add spill_count and spilled_bytes to baseline metrics, test sort with spill
of metrics (#1641)
---
datafusion/src/physical_plan/metrics/baseline.rs | 24 +++++
datafusion/src/physical_plan/metrics/builder.rs | 18 ++++
datafusion/src/physical_plan/metrics/mod.rs | 14 +++
datafusion/src/physical_plan/metrics/value.rs | 27 ++++-
datafusion/src/physical_plan/sorts/sort.rs | 130 +++++++++++++++++++----
5 files changed, 186 insertions(+), 27 deletions(-)
diff --git a/datafusion/src/physical_plan/metrics/baseline.rs
b/datafusion/src/physical_plan/metrics/baseline.rs
index b007d07..4c3ab6f 100644
--- a/datafusion/src/physical_plan/metrics/baseline.rs
+++ b/datafusion/src/physical_plan/metrics/baseline.rs
@@ -50,6 +50,12 @@ pub struct BaselineMetrics {
/// amount of time the operator was actively trying to use the CPU
elapsed_compute: Time,
+ /// count of spills during the execution of the operator
+ spill_count: Count,
+
+ /// total spilled bytes during the execution of the operator
+ spilled_bytes: Count,
+
/// output rows: the total output rows
output_rows: Count,
}
@@ -63,6 +69,8 @@ impl BaselineMetrics {
Self {
end_time: MetricBuilder::new(metrics).end_timestamp(partition),
elapsed_compute:
MetricBuilder::new(metrics).elapsed_compute(partition),
+ spill_count: MetricBuilder::new(metrics).spill_count(partition),
+ spilled_bytes:
MetricBuilder::new(metrics).spilled_bytes(partition),
output_rows: MetricBuilder::new(metrics).output_rows(partition),
}
}
@@ -72,6 +80,22 @@ impl BaselineMetrics {
&self.elapsed_compute
}
+ /// return the metric for the total number of spills triggered during
execution
+ pub fn spill_count(&self) -> &Count {
+ &self.spill_count
+ }
+
+ /// return the metric for the total spilled bytes during execution
+ pub fn spilled_bytes(&self) -> &Count {
+ &self.spilled_bytes
+ }
+
+ /// Record a spill of `spilled_bytes` size.
+ pub fn record_spill(&self, spilled_bytes: usize) {
+ self.spill_count.add(1);
+ self.spilled_bytes.add(spilled_bytes);
+ }
+
/// return the metric for the total number of output rows produced
pub fn output_rows(&self) -> &Count {
&self.output_rows
diff --git a/datafusion/src/physical_plan/metrics/builder.rs
b/datafusion/src/physical_plan/metrics/builder.rs
index 510366b..13ffede 100644
--- a/datafusion/src/physical_plan/metrics/builder.rs
+++ b/datafusion/src/physical_plan/metrics/builder.rs
@@ -105,6 +105,24 @@ impl<'a> MetricBuilder<'a> {
count
}
+ /// Consume self and create a new counter for recording the number of
spills
+ /// triggered by an operator
+ pub fn spill_count(self, partition: usize) -> Count {
+ let count = Count::new();
+ self.with_partition(partition)
+ .build(MetricValue::SpillCount(count.clone()));
+ count
+ }
+
+ /// Consume self and create a new counter for recording the total spilled
bytes
+ /// triggered by an operator
+ pub fn spilled_bytes(self, partition: usize) -> Count {
+ let count = Count::new();
+ self.with_partition(partition)
+ .build(MetricValue::SpilledBytes(count.clone()));
+ count
+ }
+
/// Consumes self and creates a new [`Count`] for recording some
/// arbitrary metric of an operator.
pub fn counter(
diff --git a/datafusion/src/physical_plan/metrics/mod.rs
b/datafusion/src/physical_plan/metrics/mod.rs
index 089550c..232583f 100644
--- a/datafusion/src/physical_plan/metrics/mod.rs
+++ b/datafusion/src/physical_plan/metrics/mod.rs
@@ -191,6 +191,20 @@ impl MetricsSet {
.map(|v| v.as_usize())
}
+ /// convenience: return the count of spills, aggregated
+ /// across partitions or None if no metric is present
+ pub fn spill_count(&self) -> Option<usize> {
+ self.sum(|metric| matches!(metric.value(), MetricValue::SpillCount(_)))
+ .map(|v| v.as_usize())
+ }
+
+ /// convenience: return the total byte size of spills, aggregated
+ /// across partitions or None if no metric is present
+ pub fn spilled_bytes(&self) -> Option<usize> {
+ self.sum(|metric| matches!(metric.value(),
MetricValue::SpilledBytes(_)))
+ .map(|v| v.as_usize())
+ }
+
/// convenience: return the amount of elapsed CPU time spent,
/// aggregated across partitions or None if no metric is present
pub fn elapsed_compute(&self) -> Option<usize> {
diff --git a/datafusion/src/physical_plan/metrics/value.rs
b/datafusion/src/physical_plan/metrics/value.rs
index 6944aab..8c1097f 100644
--- a/datafusion/src/physical_plan/metrics/value.rs
+++ b/datafusion/src/physical_plan/metrics/value.rs
@@ -283,6 +283,10 @@ pub enum MetricValue {
/// classical definition of "cpu_time", which is the time reported
/// from `clock_gettime(CLOCK_THREAD_CPUTIME_ID, ..)`.
ElapsedCompute(Time),
+ /// Number of spills produced: "spill_count" metric
+ SpillCount(Count),
+ /// Total size of spilled bytes produced: "spilled_bytes" metric
+ SpilledBytes(Count),
/// Operator defined count.
Count {
/// The provided name of this metric
@@ -308,6 +312,8 @@ impl MetricValue {
pub fn name(&self) -> &str {
match self {
Self::OutputRows(_) => "output_rows",
+ Self::SpillCount(_) => "spill_count",
+ Self::SpilledBytes(_) => "spilled_bytes",
Self::ElapsedCompute(_) => "elapsed_compute",
Self::Count { name, .. } => name.borrow(),
Self::Time { name, .. } => name.borrow(),
@@ -320,6 +326,8 @@ impl MetricValue {
pub fn as_usize(&self) -> usize {
match self {
Self::OutputRows(count) => count.value(),
+ Self::SpillCount(count) => count.value(),
+ Self::SpilledBytes(bytes) => bytes.value(),
Self::ElapsedCompute(time) => time.value(),
Self::Count { count, .. } => count.value(),
Self::Time { time, .. } => time.value(),
@@ -339,6 +347,8 @@ impl MetricValue {
pub fn new_empty(&self) -> Self {
match self {
Self::OutputRows(_) => Self::OutputRows(Count::new()),
+ Self::SpillCount(_) => Self::SpillCount(Count::new()),
+ Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()),
Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()),
Self::Count { name, .. } => Self::Count {
name: name.clone(),
@@ -365,6 +375,8 @@ impl MetricValue {
pub fn aggregate(&mut self, other: &Self) {
match (self, other) {
(Self::OutputRows(count), Self::OutputRows(other_count))
+ | (Self::SpillCount(count), Self::SpillCount(other_count))
+ | (Self::SpilledBytes(count), Self::SpilledBytes(other_count))
| (
Self::Count { count, .. },
Self::Count {
@@ -401,10 +413,12 @@ impl MetricValue {
match self {
Self::OutputRows(_) => 0, // show first
Self::ElapsedCompute(_) => 1, // show second
- Self::Count { .. } => 2,
- Self::Time { .. } => 3,
- Self::StartTimestamp(_) => 4, // show timestamps last
- Self::EndTimestamp(_) => 5,
+ Self::SpillCount(_) => 2,
+ Self::SpilledBytes(_) => 3,
+ Self::Count { .. } => 4,
+ Self::Time { .. } => 5,
+ Self::StartTimestamp(_) => 6, // show timestamps last
+ Self::EndTimestamp(_) => 7,
}
}
@@ -418,7 +432,10 @@ impl std::fmt::Display for MetricValue {
/// Prints the value of this metric
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
- Self::OutputRows(count) | Self::Count { count, .. } => {
+ Self::OutputRows(count)
+ | Self::SpillCount(count)
+ | Self::SpilledBytes(count)
+ | Self::Count { count, .. } => {
write!(f, "{}", count)
}
Self::ElapsedCompute(time) | Self::Time { time, .. } => {
diff --git a/datafusion/src/physical_plan/sorts/sort.rs
b/datafusion/src/physical_plan/sorts/sort.rs
index c3a138e..5ea5a72 100644
--- a/datafusion/src/physical_plan/sorts/sort.rs
+++ b/datafusion/src/physical_plan/sorts/sort.rs
@@ -76,9 +76,8 @@ struct ExternalSorter {
expr: Vec<PhysicalSortExpr>,
runtime: Arc<RuntimeEnv>,
metrics: AggregatedMetricsSet,
+ inner_metrics: BaselineMetrics,
used: AtomicUsize,
- spilled_bytes: AtomicUsize,
- spilled_count: AtomicUsize,
}
impl ExternalSorter {
@@ -89,6 +88,7 @@ impl ExternalSorter {
metrics: AggregatedMetricsSet,
runtime: Arc<RuntimeEnv>,
) -> Self {
+ let inner_metrics = metrics.new_intermediate_baseline(partition_id);
Self {
id: MemoryConsumerId::new(partition_id),
schema,
@@ -97,9 +97,8 @@ impl ExternalSorter {
expr,
runtime,
metrics,
+ inner_metrics,
used: AtomicUsize::new(0),
- spilled_bytes: AtomicUsize::new(0),
- spilled_count: AtomicUsize::new(0),
}
}
@@ -170,11 +169,11 @@ impl ExternalSorter {
}
fn spilled_bytes(&self) -> usize {
- self.spilled_bytes.load(Ordering::SeqCst)
+ self.inner_metrics.spilled_bytes().value()
}
- fn spilled_count(&self) -> usize {
- self.spilled_count.load(Ordering::SeqCst)
+ fn spill_count(&self) -> usize {
+ self.inner_metrics.spill_count().value()
}
}
@@ -184,7 +183,7 @@ impl Debug for ExternalSorter {
.field("id", &self.id())
.field("memory_used", &self.used())
.field("spilled_bytes", &self.spilled_bytes())
- .field("spilled_count", &self.spilled_count())
+ .field("spill_count", &self.spill_count())
.finish()
}
}
@@ -213,7 +212,7 @@ impl MemoryConsumer for ExternalSorter {
self.name(),
self.id(),
self.used(),
- self.spilled_count()
+ self.spill_count()
);
let partition = self.partition_id();
@@ -239,8 +238,7 @@ impl MemoryConsumer for ExternalSorter {
let mut spills = self.spills.lock().await;
let used = self.used.swap(0, Ordering::SeqCst);
- self.spilled_count.fetch_add(1, Ordering::SeqCst);
- self.spilled_bytes.fetch_add(total_size, Ordering::SeqCst);
+ self.inner_metrics.record_spill(total_size);
spills.push(path);
Ok(used)
}
@@ -368,7 +366,7 @@ pub struct SortExec {
}
#[derive(Debug, Clone)]
-/// Aggregates all metrics during a complex operation, which is composed of
multiple stages and
+/// Aggregates all metrics during a complex operation, which is composed of
multiple steps and
/// each stage reports its statistics separately.
/// Give sort as an example, when the dataset is larger than
available memory, it will report
/// multiple in-mem sort metrics and final merge-sort metrics from
`SortPreservingMergeStream`.
@@ -401,7 +399,7 @@ impl AggregatedMetricsSet {
result
}
- /// We should accumulate all times from all stages' reports for the total
time consumption.
+ /// We should accumulate all times from all steps' reports for the total
time consumption.
fn merge_compute_time(&self, dest: &Time) {
let time1 = self
.intermediate
@@ -429,6 +427,46 @@ impl AggregatedMetricsSet {
dest.add_duration(Duration::from_nanos(time2));
}
+ /// We should accumulate all counts from all steps' reports for the total
spill count.
+ fn merge_spill_count(&self, dest: &Count) {
+ let count1 = self
+ .intermediate
+ .lock()
+ .unwrap()
+ .iter()
+ .map(|es| es.clone_inner().spill_count().map_or(0, |v| v))
+ .sum();
+ let count2 = self
+ .final_
+ .lock()
+ .unwrap()
+ .iter()
+ .map(|es| es.clone_inner().spill_count().map_or(0, |v| v))
+ .sum();
+ dest.add(count1);
+ dest.add(count2);
+ }
+
+ /// We should accumulate all spilled bytes from all steps' reports for the
total spilled bytes.
+ fn merge_spilled_bytes(&self, dest: &Count) {
+ let count1 = self
+ .intermediate
+ .lock()
+ .unwrap()
+ .iter()
+ .map(|es| es.clone_inner().spilled_bytes().map_or(0, |v| v))
+ .sum();
+ let count2 = self
+ .final_
+ .lock()
+ .unwrap()
+ .iter()
+ .map(|es| es.clone_inner().spilled_bytes().map_or(0, |v| v))
+ .sum();
+ dest.add(count1);
+ dest.add(count2);
+ }
+
/// We should only care about output from the final stage metrics.
fn merge_output_count(&self, dest: &Count) {
let count = self
@@ -561,6 +599,9 @@ impl ExecutionPlan for SortExec {
let baseline = BaselineMetrics::new(&metrics, 0);
self.all_metrics
.merge_compute_time(baseline.elapsed_compute());
+ self.all_metrics.merge_spill_count(baseline.spill_count());
+ self.all_metrics
+ .merge_spilled_bytes(baseline.spilled_bytes());
self.all_metrics.merge_output_count(baseline.output_rows());
Some(metrics.clone_inner())
}
@@ -668,7 +709,9 @@ mod tests {
use futures::FutureExt;
use std::collections::{BTreeMap, HashMap};
- async fn sort_with_runtime(runtime: Arc<RuntimeEnv>) ->
Result<Vec<RecordBatch>> {
+ #[tokio::test]
+ async fn test_in_mem_sort() -> Result<()> {
+ let runtime = Arc::new(RuntimeEnv::default());
let schema = test_util::aggr_test_schema();
let partitions = 4;
let (_, files) =
@@ -709,13 +752,7 @@ mod tests {
Arc::new(CoalescePartitionsExec::new(Arc::new(csv))),
)?);
- collect(sort_exec, runtime).await
- }
-
- #[tokio::test]
- async fn test_in_mem_sort() -> Result<()> {
- let runtime = Arc::new(RuntimeEnv::default());
- let result = sort_with_runtime(runtime).await?;
+ let result = collect(sort_exec, runtime).await?;
assert_eq!(result.len(), 1);
@@ -743,10 +780,59 @@ mod tests {
// trigger spill there will be 4 batches with 5.5KB for each
.with_max_execution_memory(12288);
let runtime = Arc::new(RuntimeEnv::new(config)?);
- let result = sort_with_runtime(runtime).await?;
+
+ let schema = test_util::aggr_test_schema();
+ let partitions = 4;
+ let (_, files) =
+ test::create_partitioned_csv("aggregate_test_100.csv",
partitions)?;
+
+ let csv = CsvExec::new(
+ FileScanConfig {
+ object_store: Arc::new(LocalFileSystem {}),
+ file_schema: Arc::clone(&schema),
+ file_groups: files,
+ statistics: Statistics::default(),
+ projection: None,
+ limit: None,
+ table_partition_cols: vec![],
+ },
+ true,
+ b',',
+ );
+
+ let sort_exec = Arc::new(SortExec::try_new(
+ vec![
+ // c1 string column
+ PhysicalSortExpr {
+ expr: col("c1", &schema)?,
+ options: SortOptions::default(),
+ },
+ // c2 uint32 column
+ PhysicalSortExpr {
+ expr: col("c2", &schema)?,
+ options: SortOptions::default(),
+ },
+ // c7 uint8 column
+ PhysicalSortExpr {
+ expr: col("c7", &schema)?,
+ options: SortOptions::default(),
+ },
+ ],
+ Arc::new(CoalescePartitionsExec::new(Arc::new(csv))),
+ )?);
+
+ let result = collect(sort_exec.clone(), runtime).await?;
assert_eq!(result.len(), 1);
+ // Now, validate metrics
+ let metrics = sort_exec.metrics().unwrap();
+
+ assert_eq!(metrics.output_rows().unwrap(), 100);
+ assert!(metrics.elapsed_compute().unwrap() > 0);
+ assert!(metrics.spill_count().unwrap() > 0);
+ assert!(metrics.spilled_bytes().unwrap() > 0);
+
let columns = result[0].columns();
let c1 = as_string_array(&columns[0]);