This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new d31c157 Add BaselineMetrics, Timestamp metrics, add for
CoalscePartitionExec (#909)
d31c157 is described below
commit d31c1579fdc2ad060bc46c4fcfef14cc7676da6b
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Aug 26 08:23:39 2021 -0400
Add BaselineMetrics, Timestamp metrics, add for CoalscePartitionExec (#909)
---
.../src/physical_plan/coalesce_partitions.rs | 22 +-
datafusion/src/physical_plan/display.rs | 7 +-
datafusion/src/physical_plan/metrics/baseline.rs | 183 ++++++++++++++
datafusion/src/physical_plan/metrics/builder.rs | 30 ++-
datafusion/src/physical_plan/metrics/mod.rs | 161 ++++++++++---
datafusion/src/physical_plan/metrics/value.rs | 267 +++++++++++++++++++--
datafusion/src/physical_plan/sort.rs | 7 +-
datafusion/tests/sql.rs | 3 +-
8 files changed, 623 insertions(+), 57 deletions(-)
diff --git a/datafusion/src/physical_plan/coalesce_partitions.rs
b/datafusion/src/physical_plan/coalesce_partitions.rs
index 4c04065..8781a3d 100644
--- a/datafusion/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/src/physical_plan/coalesce_partitions.rs
@@ -20,6 +20,7 @@
use std::any::Any;
use std::sync::Arc;
+use std::task::Poll;
use futures::channel::mpsc;
use futures::Stream;
@@ -29,6 +30,7 @@ use async_trait::async_trait;
use arrow::record_batch::RecordBatch;
use arrow::{datatypes::SchemaRef, error::Result as ArrowResult};
+use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::RecordBatchStream;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
@@ -43,12 +45,17 @@ use pin_project_lite::pin_project;
pub struct CoalescePartitionsExec {
/// Input execution plan
input: Arc<dyn ExecutionPlan>,
+ /// Execution metrics
+ metrics: ExecutionPlanMetricsSet,
}
impl CoalescePartitionsExec {
/// Create a new CoalescePartitionsExec
pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
- CoalescePartitionsExec { input }
+ CoalescePartitionsExec {
+ input,
+ metrics: ExecutionPlanMetricsSet::new(),
+ }
}
/// Input execution plan
@@ -90,6 +97,8 @@ impl ExecutionPlan for CoalescePartitionsExec {
}
async fn execute(&self, partition: usize) ->
Result<SendableRecordBatchStream> {
+ let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
+
// CoalescePartitionsExec produces a single partition
if 0 != partition {
return Err(DataFusionError::Internal(format!(
@@ -123,6 +132,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
Ok(Box::pin(MergeStream {
input: receiver,
schema: self.schema(),
+ baseline_metrics,
}))
}
}
@@ -139,6 +149,10 @@ impl ExecutionPlan for CoalescePartitionsExec {
}
}
}
+
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics.clone_inner())
+ }
}
pin_project! {
@@ -146,6 +160,7 @@ pin_project! {
schema: SchemaRef,
#[pin]
input: mpsc::Receiver<ArrowResult<RecordBatch>>,
+ baseline_metrics: BaselineMetrics
}
}
@@ -155,9 +170,10 @@ impl Stream for MergeStream {
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Option<Self::Item>> {
+ ) -> Poll<Option<Self::Item>> {
let this = self.project();
- this.input.poll_next(cx)
+ let poll = this.input.poll_next(cx);
+ this.baseline_metrics.record_poll(poll)
}
}
diff --git a/datafusion/src/physical_plan/display.rs
b/datafusion/src/physical_plan/display.rs
index 5ff99e5..19a859a 100644
--- a/datafusion/src/physical_plan/display.rs
+++ b/datafusion/src/physical_plan/display.rs
@@ -139,7 +139,12 @@ impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a,
'b> {
ShowMetrics::None => {}
ShowMetrics::Aggregated => {
if let Some(metrics) = plan.metrics() {
- write!(self.f, ", metrics=[{}]",
metrics.aggregate_by_partition())?;
+ let metrics = metrics
+ .aggregate_by_partition()
+ .sorted_for_display()
+ .timestamps_removed();
+
+ write!(self.f, ", metrics=[{}]", metrics)?;
} else {
write!(self.f, ", metrics=[]")?;
}
diff --git a/datafusion/src/physical_plan/metrics/baseline.rs
b/datafusion/src/physical_plan/metrics/baseline.rs
new file mode 100644
index 0000000..b007d07
--- /dev/null
+++ b/datafusion/src/physical_plan/metrics/baseline.rs
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics common for almost all operators
+
+use std::task::Poll;
+
+use arrow::{error::ArrowError, record_batch::RecordBatch};
+
+use super::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time, Timestamp};
+
+/// Helper for creating and tracking common "baseline" metrics for
+/// each operator
+///
+/// Example:
+/// ```
+/// use datafusion::physical_plan::metrics::{BaselineMetrics,
ExecutionPlanMetricsSet};
+/// let metrics = ExecutionPlanMetricsSet::new();
+///
+/// let partition = 2;
+/// let baseline_metrics = BaselineMetrics::new(&metrics, partition);
+///
+/// // during execution, in CPU intensive operation:
+/// let timer = baseline_metrics.elapsed_compute().timer();
+/// // .. do CPU intensive work
+/// timer.done();
+///
+/// // when operator is finished:
+/// baseline_metrics.done();
+/// ```
+#[derive(Debug)]
+pub struct BaselineMetrics {
+ /// end_time is set when `BaselineMetrics::done()` is called
+ end_time: Timestamp,
+
+ /// amount of time the operator was actively trying to use the CPU
+ elapsed_compute: Time,
+
+ /// output rows: the total output rows
+ output_rows: Count,
+}
+
+impl BaselineMetrics {
+ /// Create a new BaselineMetrics structure, and set `start_time` to now
+ pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+ let start_time =
MetricBuilder::new(metrics).start_timestamp(partition);
+ start_time.record();
+
+ Self {
+ end_time: MetricBuilder::new(metrics).end_timestamp(partition),
+ elapsed_compute:
MetricBuilder::new(metrics).elapsed_compute(partition),
+ output_rows: MetricBuilder::new(metrics).output_rows(partition),
+ }
+ }
+
+ /// return the metric for cpu time spent in this operator
+ pub fn elapsed_compute(&self) -> &Time {
+ &self.elapsed_compute
+ }
+
+ /// return the metric for the total number of output rows produced
+ pub fn output_rows(&self) -> &Count {
+ &self.output_rows
+ }
+
+ /// Records the fact that this operator's execution is complete
+ /// (recording the `end_time` metric).
+ ///
+ /// Note care should be taken to call `done()` manually if
+ /// `BaselineMetrics` is not `drop`ped immediately upon operator
+ /// completion, as async streams may not be dropped immediately
+ /// depending on the consumer.
+ pub fn done(&self) {
+ self.end_time.record()
+ }
+
+ /// Record that some number of rows have been produced as output
+ ///
+ /// See the [`RecordOutput`] trait for conveniently recording the
+ /// output of record batches and other types
+ pub fn record_output(&self, num_rows: usize) {
+ self.output_rows.add(num_rows);
+ }
+
+ /// Process a poll result of a stream producing output for an
+ /// operator, recording the output rows and stream done time and
+ /// returning the same poll result
+ pub fn record_poll(
+ &self,
+ poll: Poll<Option<Result<RecordBatch, ArrowError>>>,
+ ) -> Poll<Option<Result<RecordBatch, ArrowError>>> {
+ if let Poll::Ready(maybe_batch) = &poll {
+ match maybe_batch {
+ Some(Ok(batch)) => {
+ batch.record_output(self);
+ }
+ Some(Err(_)) => self.done(),
+ None => self.done(),
+ }
+ }
+ poll
+ }
+}
+
+impl Drop for BaselineMetrics {
+ fn drop(&mut self) {
+ // if not previously recorded, record
+ if self.end_time.value().is_none() {
+ self.end_time.record()
+ }
+ }
+}
+
+/// Trait for things that produce output rows as a result of execution.
+pub trait RecordOutput {
+ /// Record that some number of output rows have been produced
+ ///
+ /// Meant to be composable so that instead of returning `batch`
+ /// the operator can return `batch.record_output(baseline_metrics)`
+ fn record_output(self, bm: &BaselineMetrics) -> Self;
+}
+
+impl RecordOutput for usize {
+ fn record_output(self, bm: &BaselineMetrics) -> Self {
+ bm.record_output(self);
+ self
+ }
+}
+
+impl RecordOutput for RecordBatch {
+ fn record_output(self, bm: &BaselineMetrics) -> Self {
+ bm.record_output(self.num_rows());
+ self
+ }
+}
+
+impl RecordOutput for &RecordBatch {
+ fn record_output(self, bm: &BaselineMetrics) -> Self {
+ bm.record_output(self.num_rows());
+ self
+ }
+}
+
+impl RecordOutput for Option<&RecordBatch> {
+ fn record_output(self, bm: &BaselineMetrics) -> Self {
+ if let Some(record_batch) = &self {
+ record_batch.record_output(bm);
+ }
+ self
+ }
+}
+
+impl RecordOutput for Option<RecordBatch> {
+ fn record_output(self, bm: &BaselineMetrics) -> Self {
+ if let Some(record_batch) = &self {
+ record_batch.record_output(bm);
+ }
+ self
+ }
+}
+
+impl RecordOutput for arrow::error::Result<RecordBatch> {
+ fn record_output(self, bm: &BaselineMetrics) -> Self {
+ if let Ok(record_batch) = &self {
+ record_batch.record_output(bm);
+ }
+ self
+ }
+}
diff --git a/datafusion/src/physical_plan/metrics/builder.rs
b/datafusion/src/physical_plan/metrics/builder.rs
index 34392c7..510366b 100644
--- a/datafusion/src/physical_plan/metrics/builder.rs
+++ b/datafusion/src/physical_plan/metrics/builder.rs
@@ -19,7 +19,9 @@
use std::{borrow::Cow, sync::Arc};
-use super::{Count, ExecutionPlanMetricsSet, Label, Metric, MetricValue, Time};
+use super::{
+ Count, ExecutionPlanMetricsSet, Label, Metric, MetricValue, Time,
Timestamp,
+};
/// Structure for constructing metrics, counters, timers, etc.
///
@@ -124,12 +126,12 @@ impl<'a> MetricBuilder<'a> {
count
}
- /// Consume self and create a new Timer for recording the overall cpu time
- /// spent by an operator
- pub fn cpu_time(self, partition: usize) -> Time {
+ /// Consume self and create a new Timer for recording the elapsed
+ /// CPU time spent by an operator
+ pub fn elapsed_compute(self, partition: usize) -> Time {
let time = Time::new();
self.with_partition(partition)
- .build(MetricValue::CPUTime(time.clone()));
+ .build(MetricValue::ElapsedCompute(time.clone()));
time
}
@@ -147,4 +149,22 @@ impl<'a> MetricBuilder<'a> {
});
time
}
+
+ /// Consumes self and creates a new Timestamp for recording the
+ /// starting time of execution for a partition
+ pub fn start_timestamp(self, partition: usize) -> Timestamp {
+ let timestamp = Timestamp::new();
+ self.with_partition(partition)
+ .build(MetricValue::StartTimestamp(timestamp.clone()));
+ timestamp
+ }
+
+ /// Consumes self and creates a new Timestamp for recording the
+ /// ending time of execution for a partition
+ pub fn end_timestamp(self, partition: usize) -> Timestamp {
+ let timestamp = Timestamp::new();
+ self.with_partition(partition)
+ .build(MetricValue::EndTimestamp(timestamp.clone()));
+ timestamp
+ }
}
diff --git a/datafusion/src/physical_plan/metrics/mod.rs
b/datafusion/src/physical_plan/metrics/mod.rs
index 7dd92a4..f85a0f1 100644
--- a/datafusion/src/physical_plan/metrics/mod.rs
+++ b/datafusion/src/physical_plan/metrics/mod.rs
@@ -17,6 +17,7 @@
//! Metrics for recording information about execution
+mod baseline;
mod builder;
mod value;
@@ -29,8 +30,9 @@ use std::{
use hashbrown::HashMap;
// public exports
+pub use baseline::{BaselineMetrics, RecordOutput};
pub use builder::MetricBuilder;
-pub use value::{Count, MetricValue, ScopedTimerGuard, Time};
+pub use value::{Count, MetricValue, ScopedTimerGuard, Time, Timestamp};
/// Something that tracks a value of interest (metric) of a DataFusion
/// [`ExecutionPlan`] execution.
@@ -189,10 +191,10 @@ impl MetricsSet {
.map(|v| v.as_usize())
}
- /// convenience: return the amount of CPU time spent, aggregated
- /// across partitions or None if no metric is present
- pub fn cpu_time(&self) -> Option<usize> {
- self.sum(|metric| matches!(metric.value(), MetricValue::CPUTime(_)))
+ /// convenience: return the amount of elapsed CPU time spent,
+ /// aggregated across partitions or None if no metric is present
+ pub fn elapsed_compute(&self) -> Option<usize> {
+ self.sum(|metric| matches!(metric.value(),
MetricValue::ElapsedCompute(_)))
.map(|v| v.as_usize())
}
@@ -216,7 +218,7 @@ impl MetricsSet {
Some(metric) => metric.value().new_empty(),
};
- iter.for_each(|metric| accum.add(metric.value()));
+ iter.for_each(|metric| accum.aggregate(metric.value()));
Some(accum)
}
@@ -233,7 +235,7 @@ impl MetricsSet {
let key = (metric.value.name(), metric.labels.clone());
map.entry(key)
.and_modify(|accum: &mut Metric| {
- accum.value_mut().add(metric.value());
+ accum.value_mut().aggregate(metric.value());
})
.or_insert_with(|| {
// accumulate with no partition
@@ -243,7 +245,7 @@ impl MetricsSet {
partition,
metric.labels().to_vec(),
);
- accum.value_mut().add(metric.value());
+ accum.value_mut().aggregate(metric.value());
accum
});
}
@@ -257,6 +259,25 @@ impl MetricsSet {
metrics: new_metrics,
}
}
+
+ /// Sort the order of metrics so the "most useful" show up first
+ pub fn sorted_for_display(mut self) -> Self {
+ self.metrics
+ .sort_unstable_by_key(|metric| metric.value().display_sort_key());
+ self
+ }
+
+ /// remove all timestamp metrics (for more compact display)
+ pub fn timestamps_removed(self) -> Self {
+ let Self { metrics } = self;
+
+ let metrics = metrics
+ .into_iter()
+ .filter(|m| !m.value.is_timestamp())
+ .collect::<Vec<_>>();
+
+ Self { metrics }
+ }
}
impl Display for MetricsSet {
@@ -351,6 +372,8 @@ impl Display for Label {
mod tests {
use std::time::Duration;
+ use chrono::{TimeZone, Utc};
+
use super::*;
#[test]
@@ -414,17 +437,17 @@ mod tests {
}
#[test]
- fn test_cpu_time() {
+ fn test_elapsed_compute() {
let metrics = ExecutionPlanMetricsSet::new();
- assert!(metrics.clone_inner().cpu_time().is_none());
+ assert!(metrics.clone_inner().elapsed_compute().is_none());
let partition = 1;
- let cpu_time = MetricBuilder::new(&metrics).cpu_time(partition);
- cpu_time.add_duration(Duration::from_nanos(1234));
+ let elapsed_compute =
MetricBuilder::new(&metrics).elapsed_compute(partition);
+ elapsed_compute.add_duration(Duration::from_nanos(1234));
- let cpu_time = MetricBuilder::new(&metrics).cpu_time(partition + 1);
- cpu_time.add_duration(Duration::from_nanos(6));
- assert_eq!(metrics.clone_inner().cpu_time().unwrap(), 1240);
+ let elapsed_compute =
MetricBuilder::new(&metrics).elapsed_compute(partition + 1);
+ elapsed_compute.add_duration(Duration::from_nanos(6));
+ assert_eq!(metrics.clone_inner().elapsed_compute().unwrap(), 1240);
}
#[test]
@@ -473,16 +496,16 @@ mod tests {
let metrics = ExecutionPlanMetricsSet::new();
// Note cpu_time1 has labels so it is not aggregated with 2 and 3
- let cpu_time1 = MetricBuilder::new(&metrics)
+ let elapsed_compute1 = MetricBuilder::new(&metrics)
.with_new_label("foo", "bar")
- .cpu_time(1);
- cpu_time1.add_duration(Duration::from_nanos(12));
+ .elapsed_compute(1);
+ elapsed_compute1.add_duration(Duration::from_nanos(12));
- let cpu_time2 = MetricBuilder::new(&metrics).cpu_time(2);
- cpu_time2.add_duration(Duration::from_nanos(34));
+ let elapsed_compute2 = MetricBuilder::new(&metrics).elapsed_compute(2);
+ elapsed_compute2.add_duration(Duration::from_nanos(34));
- let cpu_time3 = MetricBuilder::new(&metrics).cpu_time(4);
- cpu_time3.add_duration(Duration::from_nanos(56));
+ let elapsed_compute3 = MetricBuilder::new(&metrics).elapsed_compute(4);
+ elapsed_compute3.add_duration(Duration::from_nanos(56));
let output_rows = MetricBuilder::new(&metrics).output_rows(1); //
output rows
output_rows.add(56);
@@ -490,16 +513,16 @@ mod tests {
let aggregated = metrics.clone_inner().aggregate_by_partition();
// cpu time should be aggregated:
- let cpu_times = aggregated
+ let elapsed_computes = aggregated
.iter()
.filter(|metric| {
- matches!(metric.value(), MetricValue::CPUTime(_))
+ matches!(metric.value(), MetricValue::ElapsedCompute(_))
&& metric.labels().is_empty()
})
.collect::<Vec<_>>();
- assert_eq!(cpu_times.len(), 1);
- assert_eq!(cpu_times[0].value().as_usize(), 34 + 56);
- assert!(cpu_times[0].partition().is_none());
+ assert_eq!(elapsed_computes.len(), 1);
+ assert_eq!(elapsed_computes[0].value().as_usize(), 34 + 56);
+ assert!(elapsed_computes[0].partition().is_none());
// output rows should
let output_rows = aggregated
@@ -525,4 +548,88 @@ mod tests {
// can't aggregate time and count -- expect a panic
metrics.clone_inner().aggregate_by_partition();
}
+
+ #[test]
+ fn test_aggregate_partition_timestamps() {
+ let metrics = ExecutionPlanMetricsSet::new();
+
+ // 1431648000000000 == 1970-01-17 13:40:48 UTC
+ let t1 = Utc.timestamp_nanos(1431648000000000);
+ // 1531648000000000 == 1970-01-18 17:27:28 UTC
+ let t2 = Utc.timestamp_nanos(1531648000000000);
+ // 1631648000000000 == 1970-01-19 21:14:08 UTC
+ let t3 = Utc.timestamp_nanos(1631648000000000);
+ // 1731648000000000 == 1970-01-21 01:00:48 UTC
+ let t4 = Utc.timestamp_nanos(1731648000000000);
+
+ let start_timestamp0 = MetricBuilder::new(&metrics).start_timestamp(0);
+ start_timestamp0.set(t1);
+ let end_timestamp0 = MetricBuilder::new(&metrics).end_timestamp(0);
+ end_timestamp0.set(t2);
+ let start_timestamp1 = MetricBuilder::new(&metrics).start_timestamp(0);
+ start_timestamp1.set(t3);
+ let end_timestamp1 = MetricBuilder::new(&metrics).end_timestamp(0);
+ end_timestamp1.set(t4);
+
+ // aggregate
+ let aggregated = metrics.clone_inner().aggregate_by_partition();
+
+ let mut ts = aggregated
+ .iter()
+ .filter(|metric| {
+ matches!(metric.value(), MetricValue::StartTimestamp(_))
+ && metric.labels().is_empty()
+ })
+ .collect::<Vec<_>>();
+ assert_eq!(ts.len(), 1);
+ match ts.remove(0).value() {
+ MetricValue::StartTimestamp(ts) => {
+ // expect earliest of t1, t3
+ assert_eq!(ts.value(), Some(t1));
+ }
+ _ => {
+ panic!("Not a timestamp");
+ }
+ };
+
+ let mut ts = aggregated
+ .iter()
+ .filter(|metric| {
+ matches!(metric.value(), MetricValue::EndTimestamp(_))
+ && metric.labels().is_empty()
+ })
+ .collect::<Vec<_>>();
+ assert_eq!(ts.len(), 1);
+ match ts.remove(0).value() {
+ MetricValue::EndTimestamp(ts) => {
+ // expect latest of t2, t4
+ assert_eq!(ts.value(), Some(t4));
+ }
+ _ => {
+ panic!("Not a timestamp");
+ }
+ };
+ }
+
+ #[test]
+ fn test_sorted_for_display() {
+ let metrics = ExecutionPlanMetricsSet::new();
+ MetricBuilder::new(&metrics).end_timestamp(0);
+ MetricBuilder::new(&metrics).start_timestamp(0);
+ MetricBuilder::new(&metrics).elapsed_compute(0);
+ MetricBuilder::new(&metrics).counter("the_counter", 0);
+ MetricBuilder::new(&metrics).subset_time("the_time", 0);
+ MetricBuilder::new(&metrics).output_rows(0);
+ let metrics = metrics.clone_inner();
+
+ fn metric_names(metrics: &MetricsSet) -> String {
+ let n = metrics.iter().map(|m|
m.value().name()).collect::<Vec<_>>();
+ n.join(", ")
+ }
+
+ assert_eq!("end_timestamp, start_timestamp, elapsed_compute,
the_counter, the_time, output_rows", metric_names(&metrics));
+
+ let metrics = metrics.sorted_for_display();
+ assert_eq!("output_rows, elapsed_compute, the_counter, the_time,
start_timestamp, end_timestamp", metric_names(&metrics));
+ }
}
diff --git a/datafusion/src/physical_plan/metrics/value.rs
b/datafusion/src/physical_plan/metrics/value.rs
index 61a7c2b..6f63583 100644
--- a/datafusion/src/physical_plan/metrics/value.rs
+++ b/datafusion/src/physical_plan/metrics/value.rs
@@ -19,13 +19,16 @@
use std::{
borrow::{Borrow, Cow},
+ fmt::Display,
sync::{
atomic::{AtomicUsize, Ordering},
- Arc,
+ Arc, Mutex,
},
time::{Duration, Instant},
};
+use chrono::{DateTime, Utc};
+
/// A counter to record things such as number of input or output rows
///
/// Note `clone`ing counters update the same underlying metrics
@@ -41,6 +44,12 @@ impl PartialEq for Count {
}
}
+impl Display for Count {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{}", self.value())
+ }
+}
+
impl Count {
/// create a new counter
pub fn new() -> Self {
@@ -75,6 +84,13 @@ impl PartialEq for Time {
}
}
+impl Display for Time {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let duration = std::time::Duration::from_nanos(self.value() as u64);
+ write!(f, "{:?}", duration)
+ }
+}
+
impl Time {
/// Create a new [`Time`] wrapper suitable for recording elapsed
/// times for operations.
@@ -116,6 +132,82 @@ impl Time {
}
}
+/// Stores a single timestamp, stored as the number of nanoseconds
+/// elapsed from Jan 1, 1970 UTC
+#[derive(Debug, Clone)]
+pub struct Timestamp {
+ /// Time thing started
+ timestamp: Arc<Mutex<Option<DateTime<Utc>>>>,
+}
+
+impl Timestamp {
+ /// Create a new timestamp with no value recorded (`None`)
+ pub fn new() -> Self {
+ Self {
+ timestamp: Arc::new(Mutex::new(None)),
+ }
+ }
+
+ /// Sets the timestamp's value to the current time
+ pub fn record(&self) {
+ self.set(Utc::now())
+ }
+
+ /// Sets the timestamp's value to a specified time
+ pub fn set(&self, now: DateTime<Utc>) {
+ *self.timestamp.lock().unwrap() = Some(now);
+ }
+
+ /// return the timestamp's value at the last time `record()` was
+ /// called.
+ ///
+ /// Returns `None` if `record()` has not been called
+ pub fn value(&self) -> Option<DateTime<Utc>> {
+ *self.timestamp.lock().unwrap()
+ }
+
+ /// sets the value of this timestamp to the minimum of this and other
+ pub fn update_to_min(&self, other: &Timestamp) {
+ let min = match (self.value(), other.value()) {
+ (None, None) => None,
+ (Some(v), None) => Some(v),
+ (None, Some(v)) => Some(v),
+ (Some(v1), Some(v2)) => Some(if v1 < v2 { v1 } else { v2 }),
+ };
+
+ *self.timestamp.lock().unwrap() = min;
+ }
+
+ /// sets the value of this timestamp to the maximum of this and other
+ pub fn update_to_max(&self, other: &Timestamp) {
+ let max = match (self.value(), other.value()) {
+ (None, None) => None,
+ (Some(v), None) => Some(v),
+ (None, Some(v)) => Some(v),
+ (Some(v1), Some(v2)) => Some(if v1 < v2 { v2 } else { v1 }),
+ };
+
+ *self.timestamp.lock().unwrap() = max;
+ }
+}
+
+impl PartialEq for Timestamp {
+ fn eq(&self, other: &Self) -> bool {
+ self.value().eq(&other.value())
+ }
+}
+
+impl Display for Timestamp {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self.value() {
+ None => write!(f, "NONE"),
+ Some(v) => {
+ write!(f, "{}", v)
+ }
+ }
+ }
+}
+
/// RAAI structure that adds all time between its construction and
/// destruction to the CPU time or the first call to `stop` whichever
/// comes first
@@ -144,7 +236,7 @@ impl<'a> Drop for ScopedTimerGuard<'a> {
}
}
-/// Possible values for a metric.
+/// Possible values for a [`Metric`].
///
/// Among other differences, the metric types have different ways to
/// logically interpret their underlying values and some metrics are
@@ -153,8 +245,26 @@ impl<'a> Drop for ScopedTimerGuard<'a> {
pub enum MetricValue {
/// Number of output rows produced: "output_rows" metric
OutputRows(Count),
- /// CPU time: the "cpu_time" metric
- CPUTime(Time),
+ /// Elapsed Compute Time: the wall clock time spent in "cpu
+ /// intensive" work.
+ ///
+ /// This measurement represents, roughly:
+ /// ```
+ /// use std::time::Instant;
+ /// let start = Instant::now();
+ /// // ...CPU intensive work here...
+ /// let elapsed_compute = (Instant::now() - start).as_nanos();
+ /// ```
+ ///
+ /// Note 1: Does *not* include time other operators spend
+ /// computing input.
+ ///
+ /// Note 2: *Does* include time when the thread could have made
+ /// progress but the OS did not schedule it (e.g. due to CPU
+ /// contention), thus making this value different than the
+ /// classical definition of "cpu_time", which is the time reported
+ /// from `clock_gettime(CLOCK_THREAD_CPUTIME_ID, ..)`.
+ ElapsedCompute(Time),
/// Operator defined count.
Count {
/// The provided name of this metric
@@ -169,8 +279,10 @@ pub enum MetricValue {
/// The value of the metric
time: Time,
},
- // TODO timestamp, etc
- // https://github.com/apache/arrow-datafusion/issues/866
+ /// The time at which execution started
+ StartTimestamp(Timestamp),
+ /// The time at which execution ended
+ EndTimestamp(Timestamp),
}
impl MetricValue {
@@ -178,9 +290,11 @@ impl MetricValue {
pub fn name(&self) -> &str {
match self {
Self::OutputRows(_) => "output_rows",
- Self::CPUTime(_) => "cpu_time",
+ Self::ElapsedCompute(_) => "elapsed_compute",
Self::Count { name, .. } => name.borrow(),
Self::Time { name, .. } => name.borrow(),
+ Self::StartTimestamp(_) => "start_timestamp",
+ Self::EndTimestamp(_) => "end_timestamp",
}
}
@@ -188,9 +302,17 @@ impl MetricValue {
pub fn as_usize(&self) -> usize {
match self {
Self::OutputRows(count) => count.value(),
- Self::CPUTime(time) => time.value(),
+ Self::ElapsedCompute(time) => time.value(),
Self::Count { count, .. } => count.value(),
Self::Time { time, .. } => time.value(),
+ Self::StartTimestamp(timestamp) => timestamp
+ .value()
+ .map(|ts| ts.timestamp_nanos() as usize)
+ .unwrap_or(0),
+ Self::EndTimestamp(timestamp) => timestamp
+ .value()
+ .map(|ts| ts.timestamp_nanos() as usize)
+ .unwrap_or(0),
}
}
@@ -199,7 +321,7 @@ impl MetricValue {
pub fn new_empty(&self) -> Self {
match self {
Self::OutputRows(_) => Self::OutputRows(Count::new()),
- Self::CPUTime(_) => Self::CPUTime(Time::new()),
+ Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()),
Self::Count { name, .. } => Self::Count {
name: name.clone(),
count: Count::new(),
@@ -208,18 +330,21 @@ impl MetricValue {
name: name.clone(),
time: Time::new(),
},
+ Self::StartTimestamp(_) => Self::StartTimestamp(Timestamp::new()),
+ Self::EndTimestamp(_) => Self::EndTimestamp(Timestamp::new()),
}
}
- /// Add the value of other to `self`. panic's if the type is mismatched or
- /// aggregating does not make sense for this value
+ /// Aggregates the value of other to `self`. panic's if the types
+ /// are mismatched or aggregating does not make sense for this
+ /// value
///
/// Note this is purposely marked `mut` (even though atomics are
/// used) so Rust's type system can be used to ensure the
/// appropriate API access. `MetricValues` should be modified
/// using the original [`Count`] or [`Time`] they were created
/// from.
- pub fn add(&mut self, other: &Self) {
+ pub fn aggregate(&mut self, other: &Self) {
match (self, other) {
(Self::OutputRows(count), Self::OutputRows(other_count))
| (
@@ -228,13 +353,21 @@ impl MetricValue {
count: other_count, ..
},
) => count.add(other_count.value()),
- (Self::CPUTime(time), Self::CPUTime(other_time))
+ (Self::ElapsedCompute(time), Self::ElapsedCompute(other_time))
| (
Self::Time { time, .. },
Self::Time {
time: other_time, ..
},
) => time.add(other_time),
+ // timestamps are aggregated by min/max
+ (Self::StartTimestamp(timestamp),
Self::StartTimestamp(other_timestamp)) => {
+ timestamp.update_to_min(other_timestamp);
+ }
+ // timestamps are aggregated by min/max
+ (Self::EndTimestamp(timestamp),
Self::EndTimestamp(other_timestamp)) => {
+ timestamp.update_to_max(other_timestamp);
+ }
m @ (_, _) => {
panic!(
"Mismatched metric types. Can not aggregate {:?} with
value {:?}",
@@ -243,6 +376,24 @@ impl MetricValue {
}
}
}
+
+ /// Returns a number by which to sort metrics by display. Lower
+ /// numbers are "more useful" (and displayed first)
+ pub fn display_sort_key(&self) -> u8 {
+ match self {
+ Self::OutputRows(_) => 0, // show first
+ Self::ElapsedCompute(_) => 1, // show second
+ Self::Count { .. } => 2,
+ Self::Time { .. } => 3,
+ Self::StartTimestamp(_) => 4, // show timestamps last
+ Self::EndTimestamp(_) => 5,
+ }
+ }
+
+ /// returns true if this metric has a timestamp value
+ pub fn is_timestamp(&self) -> bool {
+ matches!(self, Self::StartTimestamp(_) | Self::EndTimestamp(_))
+ }
}
impl std::fmt::Display for MetricValue {
@@ -250,12 +401,94 @@ impl std::fmt::Display for MetricValue {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::OutputRows(count) | Self::Count { count, .. } => {
- write!(f, "{}", count.value())
+ write!(f, "{}", count)
+ }
+ Self::ElapsedCompute(time) | Self::Time { time, .. } => {
+ // distinguish between no time recorded and very small
+ // amount of time recorded
+ if time.value() > 0 {
+ write!(f, "{}", time)
+ } else {
+ write!(f, "NOT RECORDED")
+ }
}
- Self::CPUTime(time) | Self::Time { time, .. } => {
- let duration = std::time::Duration::from_nanos(time.value() as
u64);
- write!(f, "{:?}", duration)
+ Self::StartTimestamp(timestamp) | Self::EndTimestamp(timestamp) =>
{
+ write!(f, "{}", timestamp)
}
}
}
}
+
+#[cfg(test)]
+mod tests {
+ use chrono::TimeZone;
+
+ use super::*;
+
+ #[test]
+ fn test_display_output_rows() {
+ let count = Count::new();
+ let values = vec![
+ MetricValue::OutputRows(count.clone()),
+ MetricValue::Count {
+ name: "my_counter".into(),
+ count: count.clone(),
+ },
+ ];
+
+ for value in &values {
+ assert_eq!("0", value.to_string(), "value {:?}", value);
+ }
+
+ count.add(42);
+ for value in &values {
+ assert_eq!("42", value.to_string(), "value {:?}", value);
+ }
+ }
+
+ #[test]
+ fn test_display_time() {
+ let time = Time::new();
+ let values = vec![
+ MetricValue::ElapsedCompute(time.clone()),
+ MetricValue::Time {
+ name: "my_time".into(),
+ time: time.clone(),
+ },
+ ];
+
+ // if time is not set, it should not be reported as zero
+ for value in &values {
+ assert_eq!("NOT RECORDED", value.to_string(), "value {:?}", value);
+ }
+
+ time.add_duration(Duration::from_nanos(1042));
+ for value in &values {
+ assert_eq!("1.042µs", value.to_string(), "value {:?}", value);
+ }
+ }
+
+ #[test]
+ fn test_display_timestamp() {
+ let timestamp = Timestamp::new();
+ let values = vec![
+ MetricValue::StartTimestamp(timestamp.clone()),
+ MetricValue::EndTimestamp(timestamp.clone()),
+ ];
+
+ // if time is not set, it should not be reported as zero
+ for value in &values {
+ assert_eq!("NONE", value.to_string(), "value {:?}", value);
+ }
+
+ timestamp.set(Utc.timestamp_nanos(1431648000000000));
+ for value in &values {
+ assert_eq!(
+ "1970-01-17 13:40:48 UTC",
+ value.to_string(),
+ "value {:?}",
+ value
+ );
+ }
+ }
+}
diff --git a/datafusion/src/physical_plan/sort.rs
b/datafusion/src/physical_plan/sort.rs
index f1346b5..df77a16 100644
--- a/datafusion/src/physical_plan/sort.rs
+++ b/datafusion/src/physical_plan/sort.rs
@@ -155,13 +155,14 @@ impl ExecutionPlan for SortExec {
let output_rows =
MetricBuilder::new(&self.metrics).output_rows(partition);
- let cpu_time = MetricBuilder::new(&self.metrics).cpu_time(partition);
+ let elapsed_compute =
+ MetricBuilder::new(&self.metrics).elapsed_compute(partition);
Ok(Box::pin(SortStream::new(
input,
self.expr.clone(),
output_rows,
- cpu_time,
+ elapsed_compute,
)))
}
@@ -436,7 +437,7 @@ mod tests {
let result: Vec<RecordBatch> = collect(sort_exec.clone()).await?;
let metrics = sort_exec.metrics().unwrap();
- assert!(metrics.cpu_time().unwrap() > 0);
+ assert!(metrics.elapsed_compute().unwrap() > 0);
assert_eq!(metrics.output_rows().unwrap(), 8);
assert_eq!(result.len(), 1);
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index a2e84e1..8aae3d9 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -2198,7 +2198,8 @@ async fn csv_explain_analyze() {
// Only test basic plumbing and try to avoid having to change too
// many things
- let needle = "RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES),
metrics=[";
+ let needle =
+ "CoalescePartitionsExec, metrics=[output_rows=5, elapsed_compute=NOT
RECORDED";
assert_contains!(&formatted, needle);
let verbose_needle = "Output Rows";