This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c63759  Add spill_count and spilled_bytes to baseline metrics, test 
sort with spill of metrics (#1641)
1c63759 is described below

commit 1c63759b15dec34c7211e19b7a45eed055b08199
Author: Yijie Shen <[email protected]>
AuthorDate: Sat Jan 22 20:51:35 2022 +0800

    Add spill_count and spilled_bytes to baseline metrics, test sort with spill 
of metrics (#1641)
---
 datafusion/src/physical_plan/metrics/baseline.rs |  24 +++++
 datafusion/src/physical_plan/metrics/builder.rs  |  18 ++++
 datafusion/src/physical_plan/metrics/mod.rs      |  14 +++
 datafusion/src/physical_plan/metrics/value.rs    |  27 ++++-
 datafusion/src/physical_plan/sorts/sort.rs       | 130 +++++++++++++++++++----
 5 files changed, 186 insertions(+), 27 deletions(-)

diff --git a/datafusion/src/physical_plan/metrics/baseline.rs 
b/datafusion/src/physical_plan/metrics/baseline.rs
index b007d07..4c3ab6f 100644
--- a/datafusion/src/physical_plan/metrics/baseline.rs
+++ b/datafusion/src/physical_plan/metrics/baseline.rs
@@ -50,6 +50,12 @@ pub struct BaselineMetrics {
     /// amount of time the operator was actively trying to use the CPU
     elapsed_compute: Time,
 
+    /// count of spills during the execution of the operator
+    spill_count: Count,
+
+    /// total spilled bytes during the execution of the operator
+    spilled_bytes: Count,
+
     /// output rows: the total output rows
     output_rows: Count,
 }
@@ -63,6 +69,8 @@ impl BaselineMetrics {
         Self {
             end_time: MetricBuilder::new(metrics).end_timestamp(partition),
             elapsed_compute: 
MetricBuilder::new(metrics).elapsed_compute(partition),
+            spill_count: MetricBuilder::new(metrics).spill_count(partition),
+            spilled_bytes: 
MetricBuilder::new(metrics).spilled_bytes(partition),
             output_rows: MetricBuilder::new(metrics).output_rows(partition),
         }
     }
@@ -72,6 +80,22 @@ impl BaselineMetrics {
         &self.elapsed_compute
     }
 
+    /// return the metric for the total number of spills triggered during 
execution
+    pub fn spill_count(&self) -> &Count {
+        &self.spill_count
+    }
+
+    /// return the metric for the total spilled bytes during execution
+    pub fn spilled_bytes(&self) -> &Count {
+        &self.spilled_bytes
+    }
+
+    /// Record a spill of `spilled_bytes` size.
+    pub fn record_spill(&self, spilled_bytes: usize) {
+        self.spill_count.add(1);
+        self.spilled_bytes.add(spilled_bytes);
+    }
+
     /// return the metric for the total number of output rows produced
     pub fn output_rows(&self) -> &Count {
         &self.output_rows
diff --git a/datafusion/src/physical_plan/metrics/builder.rs 
b/datafusion/src/physical_plan/metrics/builder.rs
index 510366b..13ffede 100644
--- a/datafusion/src/physical_plan/metrics/builder.rs
+++ b/datafusion/src/physical_plan/metrics/builder.rs
@@ -105,6 +105,24 @@ impl<'a> MetricBuilder<'a> {
         count
     }
 
+    /// Consume self and create a new counter for recording the number of 
spills
+    /// triggered by an operator
+    pub fn spill_count(self, partition: usize) -> Count {
+        let count = Count::new();
+        self.with_partition(partition)
+            .build(MetricValue::SpillCount(count.clone()));
+        count
+    }
+
+    /// Consume self and create a new counter for recording the total spilled 
bytes
+    /// triggered by an operator
+    pub fn spilled_bytes(self, partition: usize) -> Count {
+        let count = Count::new();
+        self.with_partition(partition)
+            .build(MetricValue::SpilledBytes(count.clone()));
+        count
+    }
+
     /// Consumes self and creates a new [`Count`] for recording some
     /// arbitrary metric of an operator.
     pub fn counter(
diff --git a/datafusion/src/physical_plan/metrics/mod.rs 
b/datafusion/src/physical_plan/metrics/mod.rs
index 089550c..232583f 100644
--- a/datafusion/src/physical_plan/metrics/mod.rs
+++ b/datafusion/src/physical_plan/metrics/mod.rs
@@ -191,6 +191,20 @@ impl MetricsSet {
             .map(|v| v.as_usize())
     }
 
+    /// convenience: return the count of spills, aggregated
+    /// across partitions or None if no metric is present
+    pub fn spill_count(&self) -> Option<usize> {
+        self.sum(|metric| matches!(metric.value(), MetricValue::SpillCount(_)))
+            .map(|v| v.as_usize())
+    }
+
+    /// convenience: return the total byte size of spills, aggregated
+    /// across partitions or None if no metric is present
+    pub fn spilled_bytes(&self) -> Option<usize> {
+        self.sum(|metric| matches!(metric.value(), 
MetricValue::SpilledBytes(_)))
+            .map(|v| v.as_usize())
+    }
+
     /// convenience: return the amount of elapsed CPU time spent,
     /// aggregated across partitions or None if no metric is present
     pub fn elapsed_compute(&self) -> Option<usize> {
diff --git a/datafusion/src/physical_plan/metrics/value.rs 
b/datafusion/src/physical_plan/metrics/value.rs
index 6944aab..8c1097f 100644
--- a/datafusion/src/physical_plan/metrics/value.rs
+++ b/datafusion/src/physical_plan/metrics/value.rs
@@ -283,6 +283,10 @@ pub enum MetricValue {
     /// classical defintion of "cpu_time", which is the time reported
     /// from `clock_gettime(CLOCK_THREAD_CPUTIME_ID, ..)`.
     ElapsedCompute(Time),
+    /// Number of spills produced: "spill_count" metric
+    SpillCount(Count),
+    /// Total size of spilled bytes produced: "spilled_bytes" metric
+    SpilledBytes(Count),
     /// Operator defined count.
     Count {
         /// The provided name of this metric
@@ -308,6 +312,8 @@ impl MetricValue {
     pub fn name(&self) -> &str {
         match self {
             Self::OutputRows(_) => "output_rows",
+            Self::SpillCount(_) => "spill_count",
+            Self::SpilledBytes(_) => "spilled_bytes",
             Self::ElapsedCompute(_) => "elapsed_compute",
             Self::Count { name, .. } => name.borrow(),
             Self::Time { name, .. } => name.borrow(),
@@ -320,6 +326,8 @@ impl MetricValue {
     pub fn as_usize(&self) -> usize {
         match self {
             Self::OutputRows(count) => count.value(),
+            Self::SpillCount(count) => count.value(),
+            Self::SpilledBytes(bytes) => bytes.value(),
             Self::ElapsedCompute(time) => time.value(),
             Self::Count { count, .. } => count.value(),
             Self::Time { time, .. } => time.value(),
@@ -339,6 +347,8 @@ impl MetricValue {
     pub fn new_empty(&self) -> Self {
         match self {
             Self::OutputRows(_) => Self::OutputRows(Count::new()),
+            Self::SpillCount(_) => Self::SpillCount(Count::new()),
+            Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()),
             Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()),
             Self::Count { name, .. } => Self::Count {
                 name: name.clone(),
@@ -365,6 +375,8 @@ impl MetricValue {
     pub fn aggregate(&mut self, other: &Self) {
         match (self, other) {
             (Self::OutputRows(count), Self::OutputRows(other_count))
+            | (Self::SpillCount(count), Self::SpillCount(other_count))
+            | (Self::SpilledBytes(count), Self::SpilledBytes(other_count))
             | (
                 Self::Count { count, .. },
                 Self::Count {
@@ -401,10 +413,12 @@ impl MetricValue {
         match self {
             Self::OutputRows(_) => 0,     // show first
             Self::ElapsedCompute(_) => 1, // show second
-            Self::Count { .. } => 2,
-            Self::Time { .. } => 3,
-            Self::StartTimestamp(_) => 4, // show timestamps last
-            Self::EndTimestamp(_) => 5,
+            Self::SpillCount(_) => 2,
+            Self::SpilledBytes(_) => 3,
+            Self::Count { .. } => 4,
+            Self::Time { .. } => 5,
+            Self::StartTimestamp(_) => 6, // show timestamps last
+            Self::EndTimestamp(_) => 7,
         }
     }
 
@@ -418,7 +432,10 @@ impl std::fmt::Display for MetricValue {
     /// Prints the value of this metric
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         match self {
-            Self::OutputRows(count) | Self::Count { count, .. } => {
+            Self::OutputRows(count)
+            | Self::SpillCount(count)
+            | Self::SpilledBytes(count)
+            | Self::Count { count, .. } => {
                 write!(f, "{}", count)
             }
             Self::ElapsedCompute(time) | Self::Time { time, .. } => {
diff --git a/datafusion/src/physical_plan/sorts/sort.rs 
b/datafusion/src/physical_plan/sorts/sort.rs
index c3a138e..5ea5a72 100644
--- a/datafusion/src/physical_plan/sorts/sort.rs
+++ b/datafusion/src/physical_plan/sorts/sort.rs
@@ -76,9 +76,8 @@ struct ExternalSorter {
     expr: Vec<PhysicalSortExpr>,
     runtime: Arc<RuntimeEnv>,
     metrics: AggregatedMetricsSet,
+    inner_metrics: BaselineMetrics,
     used: AtomicUsize,
-    spilled_bytes: AtomicUsize,
-    spilled_count: AtomicUsize,
 }
 
 impl ExternalSorter {
@@ -89,6 +88,7 @@ impl ExternalSorter {
         metrics: AggregatedMetricsSet,
         runtime: Arc<RuntimeEnv>,
     ) -> Self {
+        let inner_metrics = metrics.new_intermediate_baseline(partition_id);
         Self {
             id: MemoryConsumerId::new(partition_id),
             schema,
@@ -97,9 +97,8 @@ impl ExternalSorter {
             expr,
             runtime,
             metrics,
+            inner_metrics,
             used: AtomicUsize::new(0),
-            spilled_bytes: AtomicUsize::new(0),
-            spilled_count: AtomicUsize::new(0),
         }
     }
 
@@ -170,11 +169,11 @@ impl ExternalSorter {
     }
 
     fn spilled_bytes(&self) -> usize {
-        self.spilled_bytes.load(Ordering::SeqCst)
+        self.inner_metrics.spilled_bytes().value()
     }
 
-    fn spilled_count(&self) -> usize {
-        self.spilled_count.load(Ordering::SeqCst)
+    fn spill_count(&self) -> usize {
+        self.inner_metrics.spill_count().value()
     }
 }
 
@@ -184,7 +183,7 @@ impl Debug for ExternalSorter {
             .field("id", &self.id())
             .field("memory_used", &self.used())
             .field("spilled_bytes", &self.spilled_bytes())
-            .field("spilled_count", &self.spilled_count())
+            .field("spill_count", &self.spill_count())
             .finish()
     }
 }
@@ -213,7 +212,7 @@ impl MemoryConsumer for ExternalSorter {
             self.name(),
             self.id(),
             self.used(),
-            self.spilled_count()
+            self.spill_count()
         );
 
         let partition = self.partition_id();
@@ -239,8 +238,7 @@ impl MemoryConsumer for ExternalSorter {
 
         let mut spills = self.spills.lock().await;
         let used = self.used.swap(0, Ordering::SeqCst);
-        self.spilled_count.fetch_add(1, Ordering::SeqCst);
-        self.spilled_bytes.fetch_add(total_size, Ordering::SeqCst);
+        self.inner_metrics.record_spill(total_size);
         spills.push(path);
         Ok(used)
     }
@@ -368,7 +366,7 @@ pub struct SortExec {
 }
 
 #[derive(Debug, Clone)]
-/// Aggregates all metrics during a complex operation, which is composed of 
multiple stages and
+/// Aggregates all metrics during a complex operation, which is composed of 
multiple steps and
 /// each stage reports its statistics separately.
 /// Give sort as an example, when the dataset is more significant than 
available memory, it will report
 /// multiple in-mem sort metrics and final merge-sort  metrics from 
`SortPreservingMergeStream`.
@@ -401,7 +399,7 @@ impl AggregatedMetricsSet {
         result
     }
 
-    /// We should accumulate all times from all stages' reports for the total 
time consumption.
+    /// We should accumulate all times from all steps' reports for the total 
time consumption.
     fn merge_compute_time(&self, dest: &Time) {
         let time1 = self
             .intermediate
@@ -429,6 +427,46 @@ impl AggregatedMetricsSet {
         dest.add_duration(Duration::from_nanos(time2));
     }
 
+    /// We should accumulate all counts from all steps' reports for the total 
spill count.
+    fn merge_spill_count(&self, dest: &Count) {
+        let count1 = self
+            .intermediate
+            .lock()
+            .unwrap()
+            .iter()
+            .map(|es| es.clone_inner().spill_count().map_or(0, |v| v))
+            .sum();
+        let count2 = self
+            .final_
+            .lock()
+            .unwrap()
+            .iter()
+            .map(|es| es.clone_inner().spill_count().map_or(0, |v| v))
+            .sum();
+        dest.add(count1);
+        dest.add(count2);
+    }
+
+    /// We should accumulate all spilled bytes from all steps' reports for the 
total spilled bytes.
+    fn merge_spilled_bytes(&self, dest: &Count) {
+        let count1 = self
+            .intermediate
+            .lock()
+            .unwrap()
+            .iter()
+            .map(|es| es.clone_inner().spilled_bytes().map_or(0, |v| v))
+            .sum();
+        let count2 = self
+            .final_
+            .lock()
+            .unwrap()
+            .iter()
+            .map(|es| es.clone_inner().spilled_bytes().map_or(0, |v| v))
+            .sum();
+        dest.add(count1);
+        dest.add(count2);
+    }
+
     /// We should only care about output from the final stage metrics.
     fn merge_output_count(&self, dest: &Count) {
         let count = self
@@ -561,6 +599,9 @@ impl ExecutionPlan for SortExec {
         let baseline = BaselineMetrics::new(&metrics, 0);
         self.all_metrics
             .merge_compute_time(baseline.elapsed_compute());
+        self.all_metrics.merge_spill_count(baseline.spill_count());
+        self.all_metrics
+            .merge_spilled_bytes(baseline.spilled_bytes());
         self.all_metrics.merge_output_count(baseline.output_rows());
         Some(metrics.clone_inner())
     }
@@ -668,7 +709,9 @@ mod tests {
     use futures::FutureExt;
     use std::collections::{BTreeMap, HashMap};
 
-    async fn sort_with_runtime(runtime: Arc<RuntimeEnv>) -> 
Result<Vec<RecordBatch>> {
+    #[tokio::test]
+    async fn test_in_mem_sort() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
         let schema = test_util::aggr_test_schema();
         let partitions = 4;
         let (_, files) =
@@ -709,13 +752,7 @@ mod tests {
             Arc::new(CoalescePartitionsExec::new(Arc::new(csv))),
         )?);
 
-        collect(sort_exec, runtime).await
-    }
-
-    #[tokio::test]
-    async fn test_in_mem_sort() -> Result<()> {
-        let runtime = Arc::new(RuntimeEnv::default());
-        let result = sort_with_runtime(runtime).await?;
+        let result = collect(sort_exec, runtime).await?;
 
         assert_eq!(result.len(), 1);
 
@@ -743,10 +780,59 @@ mod tests {
             // trigger spill there will be 4 batches with 5.5KB for each
             .with_max_execution_memory(12288);
         let runtime = Arc::new(RuntimeEnv::new(config)?);
-        let result = sort_with_runtime(runtime).await?;
+
+        let schema = test_util::aggr_test_schema();
+        let partitions = 4;
+        let (_, files) =
+            test::create_partitioned_csv("aggregate_test_100.csv", 
partitions)?;
+
+        let csv = CsvExec::new(
+            FileScanConfig {
+                object_store: Arc::new(LocalFileSystem {}),
+                file_schema: Arc::clone(&schema),
+                file_groups: files,
+                statistics: Statistics::default(),
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+            },
+            true,
+            b',',
+        );
+
+        let sort_exec = Arc::new(SortExec::try_new(
+            vec![
+                // c1 string column
+                PhysicalSortExpr {
+                    expr: col("c1", &schema)?,
+                    options: SortOptions::default(),
+                },
+                // c2 uint32 column
+                PhysicalSortExpr {
+                    expr: col("c2", &schema)?,
+                    options: SortOptions::default(),
+                },
+                // c7 uint8 column
+                PhysicalSortExpr {
+                    expr: col("c7", &schema)?,
+                    options: SortOptions::default(),
+                },
+            ],
+            Arc::new(CoalescePartitionsExec::new(Arc::new(csv))),
+        )?);
+
+        let result = collect(sort_exec.clone(), runtime).await?;
 
         assert_eq!(result.len(), 1);
 
+        // Now, validate metrics
+        let metrics = sort_exec.metrics().unwrap();
+
+        assert_eq!(metrics.output_rows().unwrap(), 100);
+        assert!(metrics.elapsed_compute().unwrap() > 0);
+        assert!(metrics.spill_count().unwrap() > 0);
+        assert!(metrics.spilled_bytes().unwrap() > 0);
+
         let columns = result[0].columns();
 
         let c1 = as_string_array(&columns[0]);

Reply via email to