This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new d297540 Add a new metric type: `Gauge` + `CurrentMemoryUsage` to
metrics (#1682)
d297540 is described below
commit d2975408c98c3483bc41d3cdd6bb6894ec3c992f
Author: Yijie Shen <[email protected]>
AuthorDate: Thu Jan 27 02:55:41 2022 +0800
Add a new metric type: `Gauge` + `CurrentMemoryUsage` to metrics (#1682)
* Add gauge
* fix doc
* fix fmt
---
datafusion/src/physical_plan/metrics/aggregated.rs | 14 +++-
datafusion/src/physical_plan/metrics/baseline.rs | 11 ++-
datafusion/src/physical_plan/metrics/builder.rs | 31 ++++++-
datafusion/src/physical_plan/metrics/mod.rs | 2 +-
datafusion/src/physical_plan/metrics/value.rs | 94 +++++++++++++++++++++-
datafusion/src/physical_plan/sorts/sort.rs | 48 +++++------
6 files changed, 163 insertions(+), 37 deletions(-)
diff --git a/datafusion/src/physical_plan/metrics/aggregated.rs
b/datafusion/src/physical_plan/metrics/aggregated.rs
index cfcdcb7..c55cc16 100644
--- a/datafusion/src/physical_plan/metrics/aggregated.rs
+++ b/datafusion/src/physical_plan/metrics/aggregated.rs
@@ -35,9 +35,15 @@ pub struct AggregatedMetricsSet {
final_: Arc<std::sync::Mutex<Vec<ExecutionPlanMetricsSet>>>,
}
+impl Default for AggregatedMetricsSet {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
impl AggregatedMetricsSet {
/// Create a new aggregated set
- pub(crate) fn new() -> Self {
+ pub fn new() -> Self {
Self {
intermediate: Arc::new(std::sync::Mutex::new(vec![])),
final_: Arc::new(std::sync::Mutex::new(vec![])),
@@ -45,7 +51,7 @@ impl AggregatedMetricsSet {
}
/// create a new intermediate baseline
- pub(crate) fn new_intermediate_baseline(&self, partition: usize) ->
BaselineMetrics {
+ pub fn new_intermediate_baseline(&self, partition: usize) ->
BaselineMetrics {
let ms = ExecutionPlanMetricsSet::new();
let result = BaselineMetrics::new(&ms, partition);
self.intermediate.lock().unwrap().push(ms);
@@ -53,7 +59,7 @@ impl AggregatedMetricsSet {
}
/// create a new final baseline
- pub(crate) fn new_final_baseline(&self, partition: usize) ->
BaselineMetrics {
+ pub fn new_final_baseline(&self, partition: usize) -> BaselineMetrics {
let ms = ExecutionPlanMetricsSet::new();
let result = BaselineMetrics::new(&ms, partition);
self.final_.lock().unwrap().push(ms);
@@ -137,7 +143,7 @@ impl AggregatedMetricsSet {
}
/// Aggregate all metrics into a one
- pub(crate) fn aggregate_all(&self) -> MetricsSet {
+ pub fn aggregate_all(&self) -> MetricsSet {
let metrics = ExecutionPlanMetricsSet::new();
let baseline = BaselineMetrics::new(&metrics, 0);
self.merge_compute_time(baseline.elapsed_compute());
diff --git a/datafusion/src/physical_plan/metrics/baseline.rs
b/datafusion/src/physical_plan/metrics/baseline.rs
index 4c3ab6f..50c49ec 100644
--- a/datafusion/src/physical_plan/metrics/baseline.rs
+++ b/datafusion/src/physical_plan/metrics/baseline.rs
@@ -21,7 +21,7 @@ use std::task::Poll;
use arrow::{error::ArrowError, record_batch::RecordBatch};
-use super::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time, Timestamp};
+use super::{Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, Time,
Timestamp};
/// Helper for creating and tracking common "baseline" metrics for
/// each operator
@@ -56,6 +56,9 @@ pub struct BaselineMetrics {
/// total spilled bytes during the execution of the operator
spilled_bytes: Count,
+ /// current memory usage for the operator
+ mem_used: Gauge,
+
/// output rows: the total output rows
output_rows: Count,
}
@@ -71,6 +74,7 @@ impl BaselineMetrics {
elapsed_compute:
MetricBuilder::new(metrics).elapsed_compute(partition),
spill_count: MetricBuilder::new(metrics).spill_count(partition),
spilled_bytes:
MetricBuilder::new(metrics).spilled_bytes(partition),
+ mem_used: MetricBuilder::new(metrics).mem_used(partition),
output_rows: MetricBuilder::new(metrics).output_rows(partition),
}
}
@@ -90,6 +94,11 @@ impl BaselineMetrics {
&self.spilled_bytes
}
+ /// return the metric for current memory usage
+ pub fn mem_used(&self) -> &Gauge {
+ &self.mem_used
+ }
+
/// Record a spill of `spilled_bytes` size.
pub fn record_spill(&self, spilled_bytes: usize) {
self.spill_count.add(1);
diff --git a/datafusion/src/physical_plan/metrics/builder.rs
b/datafusion/src/physical_plan/metrics/builder.rs
index 13ffede..30e9764 100644
--- a/datafusion/src/physical_plan/metrics/builder.rs
+++ b/datafusion/src/physical_plan/metrics/builder.rs
@@ -20,7 +20,7 @@
use std::{borrow::Cow, sync::Arc};
use super::{
- Count, ExecutionPlanMetricsSet, Label, Metric, MetricValue, Time,
Timestamp,
+ Count, ExecutionPlanMetricsSet, Gauge, Label, Metric, MetricValue, Time,
Timestamp,
};
/// Structure for constructing metrics, counters, timers, etc.
@@ -123,6 +123,14 @@ impl<'a> MetricBuilder<'a> {
count
}
+ /// Consume self and create a new gauge for reporting current memory usage
+ pub fn mem_used(self, partition: usize) -> Gauge {
+ let gauge = Gauge::new();
+ self.with_partition(partition)
+ .build(MetricValue::CurrentMemoryUsage(gauge.clone()));
+ gauge
+ }
+
/// Consumes self and creates a new [`Count`] for recording some
/// arbitrary metric of an operator.
pub fn counter(
@@ -133,6 +141,16 @@ impl<'a> MetricBuilder<'a> {
self.with_partition(partition).global_counter(counter_name)
}
+ /// Consumes self and creates a new [`Gauge`] for reporting some
+ /// arbitrary metric of an operator.
+ pub fn gauge(
+ self,
+ gauge_name: impl Into<Cow<'static, str>>,
+ partition: usize,
+ ) -> Gauge {
+ self.with_partition(partition).global_gauge(gauge_name)
+ }
+
/// Consumes self and creates a new [`Count`] for recording a
/// metric of an overall operator (not per partition)
pub fn global_counter(self, counter_name: impl Into<Cow<'static, str>>) ->
Count {
@@ -144,6 +162,17 @@ impl<'a> MetricBuilder<'a> {
count
}
+ /// Consumes self and creates a new [`Gauge`] for reporting a
+ /// metric of an overall operator (not per partition)
+ pub fn global_gauge(self, gauge_name: impl Into<Cow<'static, str>>) ->
Gauge {
+ let gauge = Gauge::new();
+ self.build(MetricValue::Gauge {
+ name: gauge_name.into(),
+ gauge: gauge.clone(),
+ });
+ gauge
+ }
+
/// Consume self and create a new Timer for recording the elapsed
/// CPU time spent by an operator
pub fn elapsed_compute(self, partition: usize) -> Time {
diff --git a/datafusion/src/physical_plan/metrics/mod.rs
b/datafusion/src/physical_plan/metrics/mod.rs
index 9174fa3..d489599 100644
--- a/datafusion/src/physical_plan/metrics/mod.rs
+++ b/datafusion/src/physical_plan/metrics/mod.rs
@@ -34,7 +34,7 @@ use hashbrown::HashMap;
pub use aggregated::AggregatedMetricsSet;
pub use baseline::{BaselineMetrics, RecordOutput};
pub use builder::MetricBuilder;
-pub use value::{Count, MetricValue, ScopedTimerGuard, Time, Timestamp};
+pub use value::{Count, Gauge, MetricValue, ScopedTimerGuard, Time, Timestamp};
/// Something that tracks a value of interest (metric) of a DataFusion
/// [`ExecutionPlan`] execution.
diff --git a/datafusion/src/physical_plan/metrics/value.rs
b/datafusion/src/physical_plan/metrics/value.rs
index 8c1097f..6ac282a 100644
--- a/datafusion/src/physical_plan/metrics/value.rs
+++ b/datafusion/src/physical_plan/metrics/value.rs
@@ -77,6 +77,62 @@ impl Count {
}
}
+/// A gauge is the simplest metrics type. It just returns a value.
+/// For example, you can easily expose current memory consumption with a gauge.
+///
+/// Note: `clone`ing a gauge updates the same underlying metric
+#[derive(Debug, Clone)]
+pub struct Gauge {
+ /// value of the metric gauge
+ value: std::sync::Arc<AtomicUsize>,
+}
+
+impl PartialEq for Gauge {
+ fn eq(&self, other: &Self) -> bool {
+ self.value().eq(&other.value())
+ }
+}
+
+impl Display for Gauge {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ write!(f, "{}", self.value())
+ }
+}
+
+impl Default for Gauge {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl Gauge {
+ /// create a new gauge
+ pub fn new() -> Self {
+ Self {
+ value: Arc::new(AtomicUsize::new(0)),
+ }
+ }
+
+ /// Add `n` to the metric's value
+ pub fn add(&self, n: usize) {
+ // relaxed ordering for operations on `value` poses no issues
+ // we're purely using atomic ops with no associated memory ops
+ self.value.fetch_add(n, Ordering::Relaxed);
+ }
+
+ /// Set the metric's value to `n` and return the previous value
+ pub fn set(&self, n: usize) -> usize {
+ // relaxed ordering for operations on `value` poses no issues
+ // we're purely using atomic ops with no associated memory ops
+ self.value.swap(n, Ordering::Relaxed)
+ }
+
+ /// Get the current value
+ pub fn value(&self) -> usize {
+ self.value.load(Ordering::Relaxed)
+ }
+}
+
/// Measure a potentially non contiguous duration of time
#[derive(Debug, Clone)]
pub struct Time {
@@ -287,6 +343,8 @@ pub enum MetricValue {
SpillCount(Count),
/// Total size of spilled bytes produced: "spilled_bytes" metric
SpilledBytes(Count),
+ /// Current memory used
+ CurrentMemoryUsage(Gauge),
/// Operator defined count.
Count {
/// The provided name of this metric
@@ -294,6 +352,13 @@ pub enum MetricValue {
/// The value of the metric
count: Count,
},
+ /// Operator defined gauge.
+ Gauge {
+ /// The provided name of this metric
+ name: Cow<'static, str>,
+ /// The value of the metric
+ gauge: Gauge,
+ },
/// Operator defined time
Time {
/// The provided name of this metric
@@ -314,8 +379,10 @@ impl MetricValue {
Self::OutputRows(_) => "output_rows",
Self::SpillCount(_) => "spill_count",
Self::SpilledBytes(_) => "spilled_bytes",
+ Self::CurrentMemoryUsage(_) => "mem_used",
Self::ElapsedCompute(_) => "elapsed_compute",
Self::Count { name, .. } => name.borrow(),
+ Self::Gauge { name, .. } => name.borrow(),
Self::Time { name, .. } => name.borrow(),
Self::StartTimestamp(_) => "start_timestamp",
Self::EndTimestamp(_) => "end_timestamp",
@@ -328,8 +395,10 @@ impl MetricValue {
Self::OutputRows(count) => count.value(),
Self::SpillCount(count) => count.value(),
Self::SpilledBytes(bytes) => bytes.value(),
+ Self::CurrentMemoryUsage(used) => used.value(),
Self::ElapsedCompute(time) => time.value(),
Self::Count { count, .. } => count.value(),
+ Self::Gauge { gauge, .. } => gauge.value(),
Self::Time { time, .. } => time.value(),
Self::StartTimestamp(timestamp) => timestamp
.value()
@@ -349,11 +418,16 @@ impl MetricValue {
Self::OutputRows(_) => Self::OutputRows(Count::new()),
Self::SpillCount(_) => Self::SpillCount(Count::new()),
Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()),
+ Self::CurrentMemoryUsage(_) =>
Self::CurrentMemoryUsage(Gauge::new()),
Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()),
Self::Count { name, .. } => Self::Count {
name: name.clone(),
count: Count::new(),
},
+ Self::Gauge { name, .. } => Self::Gauge {
+ name: name.clone(),
+ gauge: Gauge::new(),
+ },
Self::Time { name, .. } => Self::Time {
name: name.clone(),
time: Time::new(),
@@ -383,6 +457,13 @@ impl MetricValue {
count: other_count, ..
},
) => count.add(other_count.value()),
+ (Self::CurrentMemoryUsage(gauge),
Self::CurrentMemoryUsage(other_gauge))
+ | (
+ Self::Gauge { gauge, .. },
+ Self::Gauge {
+ gauge: other_gauge, ..
+ },
+ ) => gauge.add(other_gauge.value()),
(Self::ElapsedCompute(time), Self::ElapsedCompute(other_time))
| (
Self::Time { time, .. },
@@ -415,10 +496,12 @@ impl MetricValue {
Self::ElapsedCompute(_) => 1, // show second
Self::SpillCount(_) => 2,
Self::SpilledBytes(_) => 3,
- Self::Count { .. } => 4,
- Self::Time { .. } => 5,
- Self::StartTimestamp(_) => 6, // show timestamps last
- Self::EndTimestamp(_) => 7,
+ Self::CurrentMemoryUsage(_) => 4,
+ Self::Count { .. } => 5,
+ Self::Gauge { .. } => 6,
+ Self::Time { .. } => 7,
+ Self::StartTimestamp(_) => 8, // show timestamps last
+ Self::EndTimestamp(_) => 9,
}
}
@@ -438,6 +521,9 @@ impl std::fmt::Display for MetricValue {
| Self::Count { count, .. } => {
write!(f, "{}", count)
}
+ Self::CurrentMemoryUsage(gauge) | Self::Gauge { gauge, .. } => {
+ write!(f, "{}", gauge)
+ }
Self::ElapsedCompute(time) | Self::Time { time, .. } => {
// distinguish between no time recorded and very small
// amount of time recorded
diff --git a/datafusion/src/physical_plan/sorts/sort.rs
b/datafusion/src/physical_plan/sorts/sort.rs
index 0f5c3bd..a2df645 100644
--- a/datafusion/src/physical_plan/sorts/sort.rs
+++ b/datafusion/src/physical_plan/sorts/sort.rs
@@ -51,10 +51,9 @@ use std::fmt::{Debug, Formatter};
use std::fs::File;
use std::io::BufReader;
use std::path::{Path, PathBuf};
-use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tempfile::NamedTempFile;
-use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender};
+use tokio::sync::mpsc::{Receiver, Sender};
use tokio::task;
/// Sort arbitrary size of data to get a total order (may spill several times
during sorting based on free memory available).
@@ -76,7 +75,6 @@ struct ExternalSorter {
runtime: Arc<RuntimeEnv>,
metrics: AggregatedMetricsSet,
inner_metrics: BaselineMetrics,
- used: AtomicUsize,
}
impl ExternalSorter {
@@ -97,7 +95,6 @@ impl ExternalSorter {
runtime,
metrics,
inner_metrics,
- used: AtomicUsize::new(0),
}
}
@@ -105,7 +102,7 @@ impl ExternalSorter {
if input.num_rows() > 0 {
let size = batch_byte_size(&input);
self.try_grow(size).await?;
- self.used.fetch_add(size, Ordering::SeqCst);
+ self.inner_metrics.mem_used().add(size);
let mut in_mem_batches = self.in_mem_batches.lock().await;
in_mem_batches.push(input);
}
@@ -132,7 +129,8 @@ impl ExternalSorter {
&self.expr,
baseline_metrics,
)?;
- streams.push(SortedStream::new(in_mem_stream, self.used()));
+ let prev_used = self.inner_metrics.mem_used().set(0);
+ streams.push(SortedStream::new(in_mem_stream, prev_used));
}
let mut spills = self.spills.lock().await;
@@ -152,19 +150,22 @@ impl ExternalSorter {
)))
} else if in_mem_batches.len() > 0 {
let baseline_metrics = self.metrics.new_final_baseline(partition);
- in_mem_partial_sort(
+ let result = in_mem_partial_sort(
&mut *in_mem_batches,
self.schema.clone(),
&self.expr,
baseline_metrics,
- )
+ );
+ self.inner_metrics.mem_used().set(0);
+ // TODO: the result size is not tracked
+ result
} else {
Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone())))
}
}
fn used(&self) -> usize {
- self.used.load(Ordering::SeqCst)
+ self.inner_metrics.mem_used().value()
}
fn spilled_bytes(&self) -> usize {
@@ -231,22 +232,17 @@ impl MemoryConsumer for ExternalSorter {
baseline_metrics,
);
- let total_size = spill_partial_sorted_stream(
- &mut stream?,
- spillfile.path(),
- self.schema.clone(),
- )
- .await?;
-
+ spill_partial_sorted_stream(&mut stream?, spillfile.path(),
self.schema.clone())
+ .await?;
let mut spills = self.spills.lock().await;
- let used = self.used.swap(0, Ordering::SeqCst);
- self.inner_metrics.record_spill(total_size);
+ let used = self.inner_metrics.mem_used().set(0);
+ self.inner_metrics.record_spill(used);
spills.push(spillfile);
Ok(used)
}
fn mem_used(&self) -> usize {
- self.used.load(Ordering::SeqCst)
+ self.inner_metrics.mem_used().value()
}
}
@@ -288,7 +284,7 @@ async fn spill_partial_sorted_stream(
in_mem_stream: &mut SendableRecordBatchStream,
path: &Path,
schema: SchemaRef,
-) -> Result<usize> {
+) -> Result<()> {
let (sender, receiver) = tokio::sync::mpsc::channel(2);
let path: PathBuf = path.into();
let handle = task::spawn_blocking(move || write_sorted(receiver, path,
schema));
@@ -310,8 +306,8 @@ fn read_spill_as_stream(
schema: SchemaRef,
) -> Result<SendableRecordBatchStream> {
let (sender, receiver): (
- TKSender<ArrowResult<RecordBatch>>,
- TKReceiver<ArrowResult<RecordBatch>>,
+ Sender<ArrowResult<RecordBatch>>,
+ Receiver<ArrowResult<RecordBatch>>,
) = tokio::sync::mpsc::channel(2);
let join_handle = task::spawn_blocking(move || {
if let Err(e) = read_spill(sender, path.path()) {
@@ -326,10 +322,10 @@ fn read_spill_as_stream(
}
fn write_sorted(
- mut receiver: TKReceiver<ArrowResult<RecordBatch>>,
+ mut receiver: Receiver<ArrowResult<RecordBatch>>,
path: PathBuf,
schema: SchemaRef,
-) -> Result<usize> {
+) -> Result<()> {
let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
while let Some(batch) = receiver.blocking_recv() {
writer.write(&batch?)?;
@@ -339,10 +335,10 @@ fn write_sorted(
"Spilled {} batches of total {} rows to disk, memory released {}",
writer.num_batches, writer.num_rows, writer.num_bytes
);
- Ok(writer.num_bytes as usize)
+ Ok(())
}
-fn read_spill(sender: TKSender<ArrowResult<RecordBatch>>, path: &Path) ->
Result<()> {
+fn read_spill(sender: Sender<ArrowResult<RecordBatch>>, path: &Path) ->
Result<()> {
let file = BufReader::new(File::open(&path)?);
let reader = FileReader::try_new(file)?;
for batch in reader {