This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new d297540  Add a new metric type: `Gauge` + `CurrentMemoryUsage` to 
metrics (#1682)
d297540 is described below

commit d2975408c98c3483bc41d3cdd6bb6894ec3c992f
Author: Yijie Shen <[email protected]>
AuthorDate: Thu Jan 27 02:55:41 2022 +0800

    Add a new metric type: `Gauge` + `CurrentMemoryUsage` to metrics (#1682)
    
    * Add gauge
    
    * fix doc
    
    * fix fmt
---
 datafusion/src/physical_plan/metrics/aggregated.rs | 14 +++-
 datafusion/src/physical_plan/metrics/baseline.rs   | 11 ++-
 datafusion/src/physical_plan/metrics/builder.rs    | 31 ++++++-
 datafusion/src/physical_plan/metrics/mod.rs        |  2 +-
 datafusion/src/physical_plan/metrics/value.rs      | 94 +++++++++++++++++++++-
 datafusion/src/physical_plan/sorts/sort.rs         | 48 +++++------
 6 files changed, 163 insertions(+), 37 deletions(-)

diff --git a/datafusion/src/physical_plan/metrics/aggregated.rs 
b/datafusion/src/physical_plan/metrics/aggregated.rs
index cfcdcb7..c55cc16 100644
--- a/datafusion/src/physical_plan/metrics/aggregated.rs
+++ b/datafusion/src/physical_plan/metrics/aggregated.rs
@@ -35,9 +35,15 @@ pub struct AggregatedMetricsSet {
     final_: Arc<std::sync::Mutex<Vec<ExecutionPlanMetricsSet>>>,
 }
 
+impl Default for AggregatedMetricsSet {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 impl AggregatedMetricsSet {
     /// Create a new aggregated set
-    pub(crate) fn new() -> Self {
+    pub fn new() -> Self {
         Self {
             intermediate: Arc::new(std::sync::Mutex::new(vec![])),
             final_: Arc::new(std::sync::Mutex::new(vec![])),
@@ -45,7 +51,7 @@ impl AggregatedMetricsSet {
     }
 
     /// create a new intermediate baseline
-    pub(crate) fn new_intermediate_baseline(&self, partition: usize) -> 
BaselineMetrics {
+    pub fn new_intermediate_baseline(&self, partition: usize) -> 
BaselineMetrics {
         let ms = ExecutionPlanMetricsSet::new();
         let result = BaselineMetrics::new(&ms, partition);
         self.intermediate.lock().unwrap().push(ms);
@@ -53,7 +59,7 @@ impl AggregatedMetricsSet {
     }
 
     /// create a new final baseline
-    pub(crate) fn new_final_baseline(&self, partition: usize) -> 
BaselineMetrics {
+    pub fn new_final_baseline(&self, partition: usize) -> BaselineMetrics {
         let ms = ExecutionPlanMetricsSet::new();
         let result = BaselineMetrics::new(&ms, partition);
         self.final_.lock().unwrap().push(ms);
@@ -137,7 +143,7 @@ impl AggregatedMetricsSet {
     }
 
     /// Aggregate all metrics into a one
-    pub(crate) fn aggregate_all(&self) -> MetricsSet {
+    pub fn aggregate_all(&self) -> MetricsSet {
         let metrics = ExecutionPlanMetricsSet::new();
         let baseline = BaselineMetrics::new(&metrics, 0);
         self.merge_compute_time(baseline.elapsed_compute());
diff --git a/datafusion/src/physical_plan/metrics/baseline.rs 
b/datafusion/src/physical_plan/metrics/baseline.rs
index 4c3ab6f..50c49ec 100644
--- a/datafusion/src/physical_plan/metrics/baseline.rs
+++ b/datafusion/src/physical_plan/metrics/baseline.rs
@@ -21,7 +21,7 @@ use std::task::Poll;
 
 use arrow::{error::ArrowError, record_batch::RecordBatch};
 
-use super::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time, Timestamp};
+use super::{Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, Time, 
Timestamp};
 
 /// Helper for creating and tracking common "baseline" metrics for
 /// each operator
@@ -56,6 +56,9 @@ pub struct BaselineMetrics {
     /// total spilled bytes during the execution of the operator
     spilled_bytes: Count,
 
+    /// current memory usage for the operator
+    mem_used: Gauge,
+
     /// output rows: the total output rows
     output_rows: Count,
 }
@@ -71,6 +74,7 @@ impl BaselineMetrics {
             elapsed_compute: 
MetricBuilder::new(metrics).elapsed_compute(partition),
             spill_count: MetricBuilder::new(metrics).spill_count(partition),
             spilled_bytes: 
MetricBuilder::new(metrics).spilled_bytes(partition),
+            mem_used: MetricBuilder::new(metrics).mem_used(partition),
             output_rows: MetricBuilder::new(metrics).output_rows(partition),
         }
     }
@@ -90,6 +94,11 @@ impl BaselineMetrics {
         &self.spilled_bytes
     }
 
+    /// return the metric for current memory usage
+    pub fn mem_used(&self) -> &Gauge {
+        &self.mem_used
+    }
+
     /// Record a spill of `spilled_bytes` size.
     pub fn record_spill(&self, spilled_bytes: usize) {
         self.spill_count.add(1);
diff --git a/datafusion/src/physical_plan/metrics/builder.rs 
b/datafusion/src/physical_plan/metrics/builder.rs
index 13ffede..30e9764 100644
--- a/datafusion/src/physical_plan/metrics/builder.rs
+++ b/datafusion/src/physical_plan/metrics/builder.rs
@@ -20,7 +20,7 @@
 use std::{borrow::Cow, sync::Arc};
 
 use super::{
-    Count, ExecutionPlanMetricsSet, Label, Metric, MetricValue, Time, 
Timestamp,
+    Count, ExecutionPlanMetricsSet, Gauge, Label, Metric, MetricValue, Time, 
Timestamp,
 };
 
 /// Structure for constructing metrics, counters, timers, etc.
@@ -123,6 +123,14 @@ impl<'a> MetricBuilder<'a> {
         count
     }
 
+    /// Consume self and create a new gauge for reporting current memory usage
+    pub fn mem_used(self, partition: usize) -> Gauge {
+        let gauge = Gauge::new();
+        self.with_partition(partition)
+            .build(MetricValue::CurrentMemoryUsage(gauge.clone()));
+        gauge
+    }
+
     /// Consumes self and creates a new [`Count`] for recording some
     /// arbitrary metric of an operator.
     pub fn counter(
@@ -133,6 +141,16 @@ impl<'a> MetricBuilder<'a> {
         self.with_partition(partition).global_counter(counter_name)
     }
 
+    /// Consumes self and creates a new [`Gauge`] for reporting some
+    /// arbitrary metric of an operator.
+    pub fn gauge(
+        self,
+        gauge_name: impl Into<Cow<'static, str>>,
+        partition: usize,
+    ) -> Gauge {
+        self.with_partition(partition).global_gauge(gauge_name)
+    }
+
     /// Consumes self and creates a new [`Count`] for recording a
     /// metric of an overall operator (not per partition)
     pub fn global_counter(self, counter_name: impl Into<Cow<'static, str>>) -> 
Count {
@@ -144,6 +162,17 @@ impl<'a> MetricBuilder<'a> {
         count
     }
 
+    /// Consumes self and creates a new [`Gauge`] for reporting a
+    /// metric of an overall operator (not per partition)
+    pub fn global_gauge(self, gauge_name: impl Into<Cow<'static, str>>) -> 
Gauge {
+        let gauge = Gauge::new();
+        self.build(MetricValue::Gauge {
+            name: gauge_name.into(),
+            gauge: gauge.clone(),
+        });
+        gauge
+    }
+
     /// Consume self and create a new Timer for recording the elapsed
     /// CPU time spent by an operator
     pub fn elapsed_compute(self, partition: usize) -> Time {
diff --git a/datafusion/src/physical_plan/metrics/mod.rs 
b/datafusion/src/physical_plan/metrics/mod.rs
index 9174fa3..d489599 100644
--- a/datafusion/src/physical_plan/metrics/mod.rs
+++ b/datafusion/src/physical_plan/metrics/mod.rs
@@ -34,7 +34,7 @@ use hashbrown::HashMap;
 pub use aggregated::AggregatedMetricsSet;
 pub use baseline::{BaselineMetrics, RecordOutput};
 pub use builder::MetricBuilder;
-pub use value::{Count, MetricValue, ScopedTimerGuard, Time, Timestamp};
+pub use value::{Count, Gauge, MetricValue, ScopedTimerGuard, Time, Timestamp};
 
 /// Something that tracks a value of interest (metric) of a DataFusion
 /// [`ExecutionPlan`] execution.
diff --git a/datafusion/src/physical_plan/metrics/value.rs 
b/datafusion/src/physical_plan/metrics/value.rs
index 8c1097f..6ac282a 100644
--- a/datafusion/src/physical_plan/metrics/value.rs
+++ b/datafusion/src/physical_plan/metrics/value.rs
@@ -77,6 +77,62 @@ impl Count {
     }
 }
 
+/// A gauge is the simplest metric type. It just returns a value.
+/// For example, you can easily expose current memory consumption with a gauge.
+///
+/// Note: `clone`-ing a gauge updates the same underlying metric
+#[derive(Debug, Clone)]
+pub struct Gauge {
+    /// value of the metric gauge
+    value: std::sync::Arc<AtomicUsize>,
+}
+
+impl PartialEq for Gauge {
+    fn eq(&self, other: &Self) -> bool {
+        self.value().eq(&other.value())
+    }
+}
+
+impl Display for Gauge {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{}", self.value())
+    }
+}
+
+impl Default for Gauge {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl Gauge {
+    /// create a new gauge
+    pub fn new() -> Self {
+        Self {
+            value: Arc::new(AtomicUsize::new(0)),
+        }
+    }
+
+    /// Add `n` to the metric's value
+    pub fn add(&self, n: usize) {
+        // relaxed ordering for operations on `value` poses no issues
+        // we're purely using atomic ops with no associated memory ops
+        self.value.fetch_add(n, Ordering::Relaxed);
+    }
+
+    /// Set the metric's value to `n` and return the previous value
+    pub fn set(&self, n: usize) -> usize {
+        // relaxed ordering for operations on `value` poses no issues
+        // we're purely using atomic ops with no associated memory ops
+        self.value.swap(n, Ordering::Relaxed)
+    }
+
+    /// Get the current value
+    pub fn value(&self) -> usize {
+        self.value.load(Ordering::Relaxed)
+    }
+}
+
 /// Measure a potentially non contiguous duration of time
 #[derive(Debug, Clone)]
 pub struct Time {
@@ -287,6 +343,8 @@ pub enum MetricValue {
     SpillCount(Count),
     /// Total size of spilled bytes produced: "spilled_bytes" metric
     SpilledBytes(Count),
+    /// Current memory used
+    CurrentMemoryUsage(Gauge),
     /// Operator defined count.
     Count {
         /// The provided name of this metric
@@ -294,6 +352,13 @@ pub enum MetricValue {
         /// The value of the metric
         count: Count,
     },
+    /// Operator defined gauge.
+    Gauge {
+        /// The provided name of this metric
+        name: Cow<'static, str>,
+        /// The value of the metric
+        gauge: Gauge,
+    },
     /// Operator defined time
     Time {
         /// The provided name of this metric
@@ -314,8 +379,10 @@ impl MetricValue {
             Self::OutputRows(_) => "output_rows",
             Self::SpillCount(_) => "spill_count",
             Self::SpilledBytes(_) => "spilled_bytes",
+            Self::CurrentMemoryUsage(_) => "mem_used",
             Self::ElapsedCompute(_) => "elapsed_compute",
             Self::Count { name, .. } => name.borrow(),
+            Self::Gauge { name, .. } => name.borrow(),
             Self::Time { name, .. } => name.borrow(),
             Self::StartTimestamp(_) => "start_timestamp",
             Self::EndTimestamp(_) => "end_timestamp",
@@ -328,8 +395,10 @@ impl MetricValue {
             Self::OutputRows(count) => count.value(),
             Self::SpillCount(count) => count.value(),
             Self::SpilledBytes(bytes) => bytes.value(),
+            Self::CurrentMemoryUsage(used) => used.value(),
             Self::ElapsedCompute(time) => time.value(),
             Self::Count { count, .. } => count.value(),
+            Self::Gauge { gauge, .. } => gauge.value(),
             Self::Time { time, .. } => time.value(),
             Self::StartTimestamp(timestamp) => timestamp
                 .value()
@@ -349,11 +418,16 @@ impl MetricValue {
             Self::OutputRows(_) => Self::OutputRows(Count::new()),
             Self::SpillCount(_) => Self::SpillCount(Count::new()),
             Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()),
+            Self::CurrentMemoryUsage(_) => 
Self::CurrentMemoryUsage(Gauge::new()),
             Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()),
             Self::Count { name, .. } => Self::Count {
                 name: name.clone(),
                 count: Count::new(),
             },
+            Self::Gauge { name, .. } => Self::Gauge {
+                name: name.clone(),
+                gauge: Gauge::new(),
+            },
             Self::Time { name, .. } => Self::Time {
                 name: name.clone(),
                 time: Time::new(),
@@ -383,6 +457,13 @@ impl MetricValue {
                     count: other_count, ..
                 },
             ) => count.add(other_count.value()),
+            (Self::CurrentMemoryUsage(gauge), 
Self::CurrentMemoryUsage(other_gauge))
+            | (
+                Self::Gauge { gauge, .. },
+                Self::Gauge {
+                    gauge: other_gauge, ..
+                },
+            ) => gauge.add(other_gauge.value()),
             (Self::ElapsedCompute(time), Self::ElapsedCompute(other_time))
             | (
                 Self::Time { time, .. },
@@ -415,10 +496,12 @@ impl MetricValue {
             Self::ElapsedCompute(_) => 1, // show second
             Self::SpillCount(_) => 2,
             Self::SpilledBytes(_) => 3,
-            Self::Count { .. } => 4,
-            Self::Time { .. } => 5,
-            Self::StartTimestamp(_) => 6, // show timestamps last
-            Self::EndTimestamp(_) => 7,
+            Self::CurrentMemoryUsage(_) => 4,
+            Self::Count { .. } => 5,
+            Self::Gauge { .. } => 6,
+            Self::Time { .. } => 7,
+            Self::StartTimestamp(_) => 8, // show timestamps last
+            Self::EndTimestamp(_) => 9,
         }
     }
 
@@ -438,6 +521,9 @@ impl std::fmt::Display for MetricValue {
             | Self::Count { count, .. } => {
                 write!(f, "{}", count)
             }
+            Self::CurrentMemoryUsage(gauge) | Self::Gauge { gauge, .. } => {
+                write!(f, "{}", gauge)
+            }
             Self::ElapsedCompute(time) | Self::Time { time, .. } => {
                 // distinguish between no time recorded and very small
                 // amount of time recorded
diff --git a/datafusion/src/physical_plan/sorts/sort.rs 
b/datafusion/src/physical_plan/sorts/sort.rs
index 0f5c3bd..a2df645 100644
--- a/datafusion/src/physical_plan/sorts/sort.rs
+++ b/datafusion/src/physical_plan/sorts/sort.rs
@@ -51,10 +51,9 @@ use std::fmt::{Debug, Formatter};
 use std::fs::File;
 use std::io::BufReader;
 use std::path::{Path, PathBuf};
-use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use tempfile::NamedTempFile;
-use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender};
+use tokio::sync::mpsc::{Receiver, Sender};
 use tokio::task;
 
 /// Sort arbitrary size of data to get a total order (may spill several times 
during sorting based on free memory available).
@@ -76,7 +75,6 @@ struct ExternalSorter {
     runtime: Arc<RuntimeEnv>,
     metrics: AggregatedMetricsSet,
     inner_metrics: BaselineMetrics,
-    used: AtomicUsize,
 }
 
 impl ExternalSorter {
@@ -97,7 +95,6 @@ impl ExternalSorter {
             runtime,
             metrics,
             inner_metrics,
-            used: AtomicUsize::new(0),
         }
     }
 
@@ -105,7 +102,7 @@ impl ExternalSorter {
         if input.num_rows() > 0 {
             let size = batch_byte_size(&input);
             self.try_grow(size).await?;
-            self.used.fetch_add(size, Ordering::SeqCst);
+            self.inner_metrics.mem_used().add(size);
             let mut in_mem_batches = self.in_mem_batches.lock().await;
             in_mem_batches.push(input);
         }
@@ -132,7 +129,8 @@ impl ExternalSorter {
                     &self.expr,
                     baseline_metrics,
                 )?;
-                streams.push(SortedStream::new(in_mem_stream, self.used()));
+                let prev_used = self.inner_metrics.mem_used().set(0);
+                streams.push(SortedStream::new(in_mem_stream, prev_used));
             }
 
             let mut spills = self.spills.lock().await;
@@ -152,19 +150,22 @@ impl ExternalSorter {
             )))
         } else if in_mem_batches.len() > 0 {
             let baseline_metrics = self.metrics.new_final_baseline(partition);
-            in_mem_partial_sort(
+            let result = in_mem_partial_sort(
                 &mut *in_mem_batches,
                 self.schema.clone(),
                 &self.expr,
                 baseline_metrics,
-            )
+            );
+            self.inner_metrics.mem_used().set(0);
+            // TODO: the result size is not tracked
+            result
         } else {
             Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone())))
         }
     }
 
     fn used(&self) -> usize {
-        self.used.load(Ordering::SeqCst)
+        self.inner_metrics.mem_used().value()
     }
 
     fn spilled_bytes(&self) -> usize {
@@ -231,22 +232,17 @@ impl MemoryConsumer for ExternalSorter {
             baseline_metrics,
         );
 
-        let total_size = spill_partial_sorted_stream(
-            &mut stream?,
-            spillfile.path(),
-            self.schema.clone(),
-        )
-        .await?;
-
+        spill_partial_sorted_stream(&mut stream?, spillfile.path(), 
self.schema.clone())
+            .await?;
         let mut spills = self.spills.lock().await;
-        let used = self.used.swap(0, Ordering::SeqCst);
-        self.inner_metrics.record_spill(total_size);
+        let used = self.inner_metrics.mem_used().set(0);
+        self.inner_metrics.record_spill(used);
         spills.push(spillfile);
         Ok(used)
     }
 
     fn mem_used(&self) -> usize {
-        self.used.load(Ordering::SeqCst)
+        self.inner_metrics.mem_used().value()
     }
 }
 
@@ -288,7 +284,7 @@ async fn spill_partial_sorted_stream(
     in_mem_stream: &mut SendableRecordBatchStream,
     path: &Path,
     schema: SchemaRef,
-) -> Result<usize> {
+) -> Result<()> {
     let (sender, receiver) = tokio::sync::mpsc::channel(2);
     let path: PathBuf = path.into();
     let handle = task::spawn_blocking(move || write_sorted(receiver, path, 
schema));
@@ -310,8 +306,8 @@ fn read_spill_as_stream(
     schema: SchemaRef,
 ) -> Result<SendableRecordBatchStream> {
     let (sender, receiver): (
-        TKSender<ArrowResult<RecordBatch>>,
-        TKReceiver<ArrowResult<RecordBatch>>,
+        Sender<ArrowResult<RecordBatch>>,
+        Receiver<ArrowResult<RecordBatch>>,
     ) = tokio::sync::mpsc::channel(2);
     let join_handle = task::spawn_blocking(move || {
         if let Err(e) = read_spill(sender, path.path()) {
@@ -326,10 +322,10 @@ fn read_spill_as_stream(
 }
 
 fn write_sorted(
-    mut receiver: TKReceiver<ArrowResult<RecordBatch>>,
+    mut receiver: Receiver<ArrowResult<RecordBatch>>,
     path: PathBuf,
     schema: SchemaRef,
-) -> Result<usize> {
+) -> Result<()> {
     let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
     while let Some(batch) = receiver.blocking_recv() {
         writer.write(&batch?)?;
@@ -339,10 +335,10 @@ fn write_sorted(
         "Spilled {} batches of total {} rows to disk, memory released {}",
         writer.num_batches, writer.num_rows, writer.num_bytes
     );
-    Ok(writer.num_bytes as usize)
+    Ok(())
 }
 
-fn read_spill(sender: TKSender<ArrowResult<RecordBatch>>, path: &Path) -> 
Result<()> {
+fn read_spill(sender: Sender<ArrowResult<RecordBatch>>, path: &Path) -> 
Result<()> {
     let file = BufReader::new(File::open(&path)?);
     let reader = FileReader::try_new(file)?;
     for batch in reader {

Reply via email to