This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new d31c157  Add BaselineMetrics, Timestamp metrics, add for 
CoalscePartitionExec (#909)
d31c157 is described below

commit d31c1579fdc2ad060bc46c4fcfef14cc7676da6b
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Aug 26 08:23:39 2021 -0400

    Add BaselineMetrics, Timestamp metrics, add for CoalscePartitionExec (#909)
---
 .../src/physical_plan/coalesce_partitions.rs       |  22 +-
 datafusion/src/physical_plan/display.rs            |   7 +-
 datafusion/src/physical_plan/metrics/baseline.rs   | 183 ++++++++++++++
 datafusion/src/physical_plan/metrics/builder.rs    |  30 ++-
 datafusion/src/physical_plan/metrics/mod.rs        | 161 ++++++++++---
 datafusion/src/physical_plan/metrics/value.rs      | 267 +++++++++++++++++++--
 datafusion/src/physical_plan/sort.rs               |   7 +-
 datafusion/tests/sql.rs                            |   3 +-
 8 files changed, 623 insertions(+), 57 deletions(-)

diff --git a/datafusion/src/physical_plan/coalesce_partitions.rs 
b/datafusion/src/physical_plan/coalesce_partitions.rs
index 4c04065..8781a3d 100644
--- a/datafusion/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/src/physical_plan/coalesce_partitions.rs
@@ -20,6 +20,7 @@
 
 use std::any::Any;
 use std::sync::Arc;
+use std::task::Poll;
 
 use futures::channel::mpsc;
 use futures::Stream;
@@ -29,6 +30,7 @@ use async_trait::async_trait;
 use arrow::record_batch::RecordBatch;
 use arrow::{datatypes::SchemaRef, error::Result as ArrowResult};
 
+use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use super::RecordBatchStream;
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
@@ -43,12 +45,17 @@ use pin_project_lite::pin_project;
 pub struct CoalescePartitionsExec {
     /// Input execution plan
     input: Arc<dyn ExecutionPlan>,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl CoalescePartitionsExec {
     /// Create a new CoalescePartitionsExec
     pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
-        CoalescePartitionsExec { input }
+        CoalescePartitionsExec {
+            input,
+            metrics: ExecutionPlanMetricsSet::new(),
+        }
     }
 
     /// Input execution plan
@@ -90,6 +97,8 @@ impl ExecutionPlan for CoalescePartitionsExec {
     }
 
     async fn execute(&self, partition: usize) -> 
Result<SendableRecordBatchStream> {
+        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
+
         // CoalescePartitionsExec produces a single partition
         if 0 != partition {
             return Err(DataFusionError::Internal(format!(
@@ -123,6 +132,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
                 Ok(Box::pin(MergeStream {
                     input: receiver,
                     schema: self.schema(),
+                    baseline_metrics,
                 }))
             }
         }
@@ -139,6 +149,10 @@ impl ExecutionPlan for CoalescePartitionsExec {
             }
         }
     }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
 }
 
 pin_project! {
@@ -146,6 +160,7 @@ pin_project! {
         schema: SchemaRef,
         #[pin]
         input: mpsc::Receiver<ArrowResult<RecordBatch>>,
+        baseline_metrics: BaselineMetrics
     }
 }
 
@@ -155,9 +170,10 @@ impl Stream for MergeStream {
     fn poll_next(
         self: std::pin::Pin<&mut Self>,
         cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Option<Self::Item>> {
+    ) -> Poll<Option<Self::Item>> {
         let this = self.project();
-        this.input.poll_next(cx)
+        let poll = this.input.poll_next(cx);
+        this.baseline_metrics.record_poll(poll)
     }
 }
 
diff --git a/datafusion/src/physical_plan/display.rs 
b/datafusion/src/physical_plan/display.rs
index 5ff99e5..19a859a 100644
--- a/datafusion/src/physical_plan/display.rs
+++ b/datafusion/src/physical_plan/display.rs
@@ -139,7 +139,12 @@ impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 
'b> {
             ShowMetrics::None => {}
             ShowMetrics::Aggregated => {
                 if let Some(metrics) = plan.metrics() {
-                    write!(self.f, ", metrics=[{}]", 
metrics.aggregate_by_partition())?;
+                    let metrics = metrics
+                        .aggregate_by_partition()
+                        .sorted_for_display()
+                        .timestamps_removed();
+
+                    write!(self.f, ", metrics=[{}]", metrics)?;
                 } else {
                     write!(self.f, ", metrics=[]")?;
                 }
diff --git a/datafusion/src/physical_plan/metrics/baseline.rs 
b/datafusion/src/physical_plan/metrics/baseline.rs
new file mode 100644
index 0000000..b007d07
--- /dev/null
+++ b/datafusion/src/physical_plan/metrics/baseline.rs
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics common for almost all operators
+
+use std::task::Poll;
+
+use arrow::{error::ArrowError, record_batch::RecordBatch};
+
+use super::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time, Timestamp};
+
+/// Helper for creating and tracking common "baseline" metrics for
+/// each operator
+///
+/// Example:
+/// ```
+/// use datafusion::physical_plan::metrics::{BaselineMetrics, 
ExecutionPlanMetricsSet};
+/// let metrics = ExecutionPlanMetricsSet::new();
+///
+/// let partition = 2;
+/// let baseline_metrics = BaselineMetrics::new(&metrics, partition);
+///
+/// // during execution, in CPU intensive operation:
+/// let timer = baseline_metrics.elapsed_compute().timer();
+/// // .. do CPU intensive work
+/// timer.done();
+///
+/// // when operator is finished:
+/// baseline_metrics.done();
+/// ```
+#[derive(Debug)]
+pub struct BaselineMetrics {
+    /// end_time is set when `BaselineMetrics::done()` is called
+    end_time: Timestamp,
+
+    /// amount of time the operator was actively trying to use the CPU
+    elapsed_compute: Time,
+
+    /// output rows: the total output rows
+    output_rows: Count,
+}
+
+impl BaselineMetrics {
+    /// Create a new BaselineMetrics structure, and set `start_time` to now
+    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+        let start_time = 
MetricBuilder::new(metrics).start_timestamp(partition);
+        start_time.record();
+
+        Self {
+            end_time: MetricBuilder::new(metrics).end_timestamp(partition),
+            elapsed_compute: 
MetricBuilder::new(metrics).elapsed_compute(partition),
+            output_rows: MetricBuilder::new(metrics).output_rows(partition),
+        }
+    }
+
+    /// return the metric for cpu time spent in this operator
+    pub fn elapsed_compute(&self) -> &Time {
+        &self.elapsed_compute
+    }
+
+    /// return the metric for the total number of output rows produced
+    pub fn output_rows(&self) -> &Count {
+        &self.output_rows
+    }
+
+    /// Records the fact that this operator's execution is complete
+    /// (recording the `end_time` metric).
+    ///
+    /// Note care should be taken to call `done()` manually if
+    /// `BaselineMetrics` is not `drop`ped immediately upon operator
+    /// completion, as async streams may not be dropped immediately
+    /// depending on the consumer.
+    pub fn done(&self) {
+        self.end_time.record()
+    }
+
+    /// Record that some number of rows have been produced as output
+    ///
+    /// See the [`RecordOutput`] trait for conveniently recording
+    /// output rows from record batches and other types
+    pub fn record_output(&self, num_rows: usize) {
+        self.output_rows.add(num_rows);
+    }
+
+    /// Process a poll result of a stream producing output for an
+    /// operator, recording the output rows and stream done time and
+    /// returning the same poll result
+    pub fn record_poll(
+        &self,
+        poll: Poll<Option<Result<RecordBatch, ArrowError>>>,
+    ) -> Poll<Option<Result<RecordBatch, ArrowError>>> {
+        if let Poll::Ready(maybe_batch) = &poll {
+            match maybe_batch {
+                Some(Ok(batch)) => {
+                    batch.record_output(self);
+                }
+                Some(Err(_)) => self.done(),
+                None => self.done(),
+            }
+        }
+        poll
+    }
+}
+
+impl Drop for BaselineMetrics {
+    fn drop(&mut self) {
+        // if not previously recorded, record
+        if self.end_time.value().is_none() {
+            self.end_time.record()
+        }
+    }
+}
+
+/// Trait for things that produce output rows as a result of execution.
+pub trait RecordOutput {
+    /// Record that some number of output rows have been produced
+    ///
+    /// Meant to be composable so that instead of returning `batch`
+    /// the operator can return `batch.record_output(baseline_metrics)`
+    fn record_output(self, bm: &BaselineMetrics) -> Self;
+}
+
+impl RecordOutput for usize {
+    fn record_output(self, bm: &BaselineMetrics) -> Self {
+        bm.record_output(self);
+        self
+    }
+}
+
+impl RecordOutput for RecordBatch {
+    fn record_output(self, bm: &BaselineMetrics) -> Self {
+        bm.record_output(self.num_rows());
+        self
+    }
+}
+
+impl RecordOutput for &RecordBatch {
+    fn record_output(self, bm: &BaselineMetrics) -> Self {
+        bm.record_output(self.num_rows());
+        self
+    }
+}
+
+impl RecordOutput for Option<&RecordBatch> {
+    fn record_output(self, bm: &BaselineMetrics) -> Self {
+        if let Some(record_batch) = &self {
+            record_batch.record_output(bm);
+        }
+        self
+    }
+}
+
+impl RecordOutput for Option<RecordBatch> {
+    fn record_output(self, bm: &BaselineMetrics) -> Self {
+        if let Some(record_batch) = &self {
+            record_batch.record_output(bm);
+        }
+        self
+    }
+}
+
+impl RecordOutput for arrow::error::Result<RecordBatch> {
+    fn record_output(self, bm: &BaselineMetrics) -> Self {
+        if let Ok(record_batch) = &self {
+            record_batch.record_output(bm);
+        }
+        self
+    }
+}
diff --git a/datafusion/src/physical_plan/metrics/builder.rs 
b/datafusion/src/physical_plan/metrics/builder.rs
index 34392c7..510366b 100644
--- a/datafusion/src/physical_plan/metrics/builder.rs
+++ b/datafusion/src/physical_plan/metrics/builder.rs
@@ -19,7 +19,9 @@
 
 use std::{borrow::Cow, sync::Arc};
 
-use super::{Count, ExecutionPlanMetricsSet, Label, Metric, MetricValue, Time};
+use super::{
+    Count, ExecutionPlanMetricsSet, Label, Metric, MetricValue, Time, 
Timestamp,
+};
 
 /// Structure for constructing metrics, counters, timers, etc.
 ///
@@ -124,12 +126,12 @@ impl<'a> MetricBuilder<'a> {
         count
     }
 
-    /// Consume self and create a new Timer for recording the overall cpu time
-    /// spent by an operator
-    pub fn cpu_time(self, partition: usize) -> Time {
+    /// Consume self and create a new Timer for recording the elapsed
+    /// CPU time spent by an operator
+    pub fn elapsed_compute(self, partition: usize) -> Time {
         let time = Time::new();
         self.with_partition(partition)
-            .build(MetricValue::CPUTime(time.clone()));
+            .build(MetricValue::ElapsedCompute(time.clone()));
         time
     }
 
@@ -147,4 +149,22 @@ impl<'a> MetricBuilder<'a> {
         });
         time
     }
+
+    /// Consumes self and creates a new Timestamp for recording the
+    /// starting time of execution for a partition
+    pub fn start_timestamp(self, partition: usize) -> Timestamp {
+        let timestamp = Timestamp::new();
+        self.with_partition(partition)
+            .build(MetricValue::StartTimestamp(timestamp.clone()));
+        timestamp
+    }
+
+    /// Consumes self and creates a new Timestamp for recording the
+    /// ending time of execution for a partition
+    pub fn end_timestamp(self, partition: usize) -> Timestamp {
+        let timestamp = Timestamp::new();
+        self.with_partition(partition)
+            .build(MetricValue::EndTimestamp(timestamp.clone()));
+        timestamp
+    }
 }
diff --git a/datafusion/src/physical_plan/metrics/mod.rs 
b/datafusion/src/physical_plan/metrics/mod.rs
index 7dd92a4..f85a0f1 100644
--- a/datafusion/src/physical_plan/metrics/mod.rs
+++ b/datafusion/src/physical_plan/metrics/mod.rs
@@ -17,6 +17,7 @@
 
 //! Metrics for recording information about execution
 
+mod baseline;
 mod builder;
 mod value;
 
@@ -29,8 +30,9 @@ use std::{
 use hashbrown::HashMap;
 
 // public exports
+pub use baseline::{BaselineMetrics, RecordOutput};
 pub use builder::MetricBuilder;
-pub use value::{Count, MetricValue, ScopedTimerGuard, Time};
+pub use value::{Count, MetricValue, ScopedTimerGuard, Time, Timestamp};
 
 /// Something that tracks a value of interest (metric) of a DataFusion
 /// [`ExecutionPlan`] execution.
@@ -189,10 +191,10 @@ impl MetricsSet {
             .map(|v| v.as_usize())
     }
 
-    /// convenience: return the amount of CPU time spent, aggregated
-    /// across partitions or None if no metric is present
-    pub fn cpu_time(&self) -> Option<usize> {
-        self.sum(|metric| matches!(metric.value(), MetricValue::CPUTime(_)))
+    /// convenience: return the amount of elapsed CPU time spent,
+    /// aggregated across partitions or None if no metric is present
+    pub fn elapsed_compute(&self) -> Option<usize> {
+        self.sum(|metric| matches!(metric.value(), 
MetricValue::ElapsedCompute(_)))
             .map(|v| v.as_usize())
     }
 
@@ -216,7 +218,7 @@ impl MetricsSet {
             Some(metric) => metric.value().new_empty(),
         };
 
-        iter.for_each(|metric| accum.add(metric.value()));
+        iter.for_each(|metric| accum.aggregate(metric.value()));
 
         Some(accum)
     }
@@ -233,7 +235,7 @@ impl MetricsSet {
             let key = (metric.value.name(), metric.labels.clone());
             map.entry(key)
                 .and_modify(|accum: &mut Metric| {
-                    accum.value_mut().add(metric.value());
+                    accum.value_mut().aggregate(metric.value());
                 })
                 .or_insert_with(|| {
                     // accumulate with no partition
@@ -243,7 +245,7 @@ impl MetricsSet {
                         partition,
                         metric.labels().to_vec(),
                     );
-                    accum.value_mut().add(metric.value());
+                    accum.value_mut().aggregate(metric.value());
                     accum
                 });
         }
@@ -257,6 +259,25 @@ impl MetricsSet {
             metrics: new_metrics,
         }
     }
+
+    /// Sort the order of metrics so the "most useful" show up first
+    pub fn sorted_for_display(mut self) -> Self {
+        self.metrics
+            .sort_unstable_by_key(|metric| metric.value().display_sort_key());
+        self
+    }
+
+    /// remove all timestamp metrics (for more compact display)
+    pub fn timestamps_removed(self) -> Self {
+        let Self { metrics } = self;
+
+        let metrics = metrics
+            .into_iter()
+            .filter(|m| !m.value.is_timestamp())
+            .collect::<Vec<_>>();
+
+        Self { metrics }
+    }
 }
 
 impl Display for MetricsSet {
@@ -351,6 +372,8 @@ impl Display for Label {
 mod tests {
     use std::time::Duration;
 
+    use chrono::{TimeZone, Utc};
+
     use super::*;
 
     #[test]
@@ -414,17 +437,17 @@ mod tests {
     }
 
     #[test]
-    fn test_cpu_time() {
+    fn test_elapsed_compute() {
         let metrics = ExecutionPlanMetricsSet::new();
-        assert!(metrics.clone_inner().cpu_time().is_none());
+        assert!(metrics.clone_inner().elapsed_compute().is_none());
 
         let partition = 1;
-        let cpu_time = MetricBuilder::new(&metrics).cpu_time(partition);
-        cpu_time.add_duration(Duration::from_nanos(1234));
+        let elapsed_compute = 
MetricBuilder::new(&metrics).elapsed_compute(partition);
+        elapsed_compute.add_duration(Duration::from_nanos(1234));
 
-        let cpu_time = MetricBuilder::new(&metrics).cpu_time(partition + 1);
-        cpu_time.add_duration(Duration::from_nanos(6));
-        assert_eq!(metrics.clone_inner().cpu_time().unwrap(), 1240);
+        let elapsed_compute = 
MetricBuilder::new(&metrics).elapsed_compute(partition + 1);
+        elapsed_compute.add_duration(Duration::from_nanos(6));
+        assert_eq!(metrics.clone_inner().elapsed_compute().unwrap(), 1240);
     }
 
     #[test]
@@ -473,16 +496,16 @@ mod tests {
         let metrics = ExecutionPlanMetricsSet::new();
 
         // Note cpu_time1 has labels so it is not aggregated with 2 and 3
-        let cpu_time1 = MetricBuilder::new(&metrics)
+        let elapsed_compute1 = MetricBuilder::new(&metrics)
             .with_new_label("foo", "bar")
-            .cpu_time(1);
-        cpu_time1.add_duration(Duration::from_nanos(12));
+            .elapsed_compute(1);
+        elapsed_compute1.add_duration(Duration::from_nanos(12));
 
-        let cpu_time2 = MetricBuilder::new(&metrics).cpu_time(2);
-        cpu_time2.add_duration(Duration::from_nanos(34));
+        let elapsed_compute2 = MetricBuilder::new(&metrics).elapsed_compute(2);
+        elapsed_compute2.add_duration(Duration::from_nanos(34));
 
-        let cpu_time3 = MetricBuilder::new(&metrics).cpu_time(4);
-        cpu_time3.add_duration(Duration::from_nanos(56));
+        let elapsed_compute3 = MetricBuilder::new(&metrics).elapsed_compute(4);
+        elapsed_compute3.add_duration(Duration::from_nanos(56));
 
         let output_rows = MetricBuilder::new(&metrics).output_rows(1); // 
output rows
         output_rows.add(56);
@@ -490,16 +513,16 @@ mod tests {
         let aggregated = metrics.clone_inner().aggregate_by_partition();
 
         // cpu time should be aggregated:
-        let cpu_times = aggregated
+        let elapsed_computes = aggregated
             .iter()
             .filter(|metric| {
-                matches!(metric.value(), MetricValue::CPUTime(_))
+                matches!(metric.value(), MetricValue::ElapsedCompute(_))
                     && metric.labels().is_empty()
             })
             .collect::<Vec<_>>();
-        assert_eq!(cpu_times.len(), 1);
-        assert_eq!(cpu_times[0].value().as_usize(), 34 + 56);
-        assert!(cpu_times[0].partition().is_none());
+        assert_eq!(elapsed_computes.len(), 1);
+        assert_eq!(elapsed_computes[0].value().as_usize(), 34 + 56);
+        assert!(elapsed_computes[0].partition().is_none());
 
         // output rows should
         let output_rows = aggregated
@@ -525,4 +548,88 @@ mod tests {
         // can't aggregate time and count -- expect a panic
         metrics.clone_inner().aggregate_by_partition();
     }
+
+    #[test]
+    fn test_aggregate_partition_timestamps() {
+        let metrics = ExecutionPlanMetricsSet::new();
+
+        // 1431648000000000 == 1970-01-17 13:40:48 UTC
+        let t1 = Utc.timestamp_nanos(1431648000000000);
+        // 1531648000000000 == 1970-01-18 17:27:28 UTC
+        let t2 = Utc.timestamp_nanos(1531648000000000);
+        // 1631648000000000 == 1970-01-19 21:14:08 UTC
+        let t3 = Utc.timestamp_nanos(1631648000000000);
+        // 1731648000000000 == 1970-01-21 01:00:48 UTC
+        let t4 = Utc.timestamp_nanos(1731648000000000);
+
+        let start_timestamp0 = MetricBuilder::new(&metrics).start_timestamp(0);
+        start_timestamp0.set(t1);
+        let end_timestamp0 = MetricBuilder::new(&metrics).end_timestamp(0);
+        end_timestamp0.set(t2);
+        let start_timestamp1 = MetricBuilder::new(&metrics).start_timestamp(0);
+        start_timestamp1.set(t3);
+        let end_timestamp1 = MetricBuilder::new(&metrics).end_timestamp(0);
+        end_timestamp1.set(t4);
+
+        // aggregate
+        let aggregated = metrics.clone_inner().aggregate_by_partition();
+
+        let mut ts = aggregated
+            .iter()
+            .filter(|metric| {
+                matches!(metric.value(), MetricValue::StartTimestamp(_))
+                    && metric.labels().is_empty()
+            })
+            .collect::<Vec<_>>();
+        assert_eq!(ts.len(), 1);
+        match ts.remove(0).value() {
+            MetricValue::StartTimestamp(ts) => {
+                // expect earliest of t1, t3
+                assert_eq!(ts.value(), Some(t1));
+            }
+            _ => {
+                panic!("Not a timestamp");
+            }
+        };
+
+        let mut ts = aggregated
+            .iter()
+            .filter(|metric| {
+                matches!(metric.value(), MetricValue::EndTimestamp(_))
+                    && metric.labels().is_empty()
+            })
+            .collect::<Vec<_>>();
+        assert_eq!(ts.len(), 1);
+        match ts.remove(0).value() {
+            MetricValue::EndTimestamp(ts) => {
+                // expect latest of t2, t4
+                assert_eq!(ts.value(), Some(t4));
+            }
+            _ => {
+                panic!("Not a timestamp");
+            }
+        };
+    }
+
+    #[test]
+    fn test_sorted_for_display() {
+        let metrics = ExecutionPlanMetricsSet::new();
+        MetricBuilder::new(&metrics).end_timestamp(0);
+        MetricBuilder::new(&metrics).start_timestamp(0);
+        MetricBuilder::new(&metrics).elapsed_compute(0);
+        MetricBuilder::new(&metrics).counter("the_counter", 0);
+        MetricBuilder::new(&metrics).subset_time("the_time", 0);
+        MetricBuilder::new(&metrics).output_rows(0);
+        let metrics = metrics.clone_inner();
+
+        fn metric_names(metrics: &MetricsSet) -> String {
+            let n = metrics.iter().map(|m| 
m.value().name()).collect::<Vec<_>>();
+            n.join(", ")
+        }
+
+        assert_eq!("end_timestamp, start_timestamp, elapsed_compute, 
the_counter, the_time, output_rows", metric_names(&metrics));
+
+        let metrics = metrics.sorted_for_display();
+        assert_eq!("output_rows, elapsed_compute, the_counter, the_time, 
start_timestamp, end_timestamp", metric_names(&metrics));
+    }
 }
diff --git a/datafusion/src/physical_plan/metrics/value.rs 
b/datafusion/src/physical_plan/metrics/value.rs
index 61a7c2b..6f63583 100644
--- a/datafusion/src/physical_plan/metrics/value.rs
+++ b/datafusion/src/physical_plan/metrics/value.rs
@@ -19,13 +19,16 @@
 
 use std::{
     borrow::{Borrow, Cow},
+    fmt::Display,
     sync::{
         atomic::{AtomicUsize, Ordering},
-        Arc,
+        Arc, Mutex,
     },
     time::{Duration, Instant},
 };
 
+use chrono::{DateTime, Utc};
+
 /// A counter to record things such as number of input or output rows
 ///
 /// Note `clone`ing counters update the same underlying metrics
@@ -41,6 +44,12 @@ impl PartialEq for Count {
     }
 }
 
+impl Display for Count {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.value())
+    }
+}
+
 impl Count {
     /// create a new counter
     pub fn new() -> Self {
@@ -75,6 +84,13 @@ impl PartialEq for Time {
     }
 }
 
+impl Display for Time {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let duration = std::time::Duration::from_nanos(self.value() as u64);
+        write!(f, "{:?}", duration)
+    }
+}
+
 impl Time {
     /// Create a new [`Time`] wrapper suitable for recording elapsed
     /// times for operations.
@@ -116,6 +132,82 @@ impl Time {
     }
 }
 
+/// Stores a single timestamp as a UTC `DateTime`; the value is
+/// `None` until `record()` or `set()` is called
+#[derive(Debug, Clone)]
+pub struct Timestamp {
+    /// Time thing started
+    timestamp: Arc<Mutex<Option<DateTime<Utc>>>>,
+}
+
+impl Timestamp {
+    /// Create a new timestamp with no value recorded yet (`None`)
+    pub fn new() -> Self {
+        Self {
+            timestamp: Arc::new(Mutex::new(None)),
+        }
+    }
+
+    /// Sets the timestamp's value to the current time
+    pub fn record(&self) {
+        self.set(Utc::now())
+    }
+
+    /// Sets the timestamp's value to a specified time
+    pub fn set(&self, now: DateTime<Utc>) {
+        *self.timestamp.lock().unwrap() = Some(now);
+    }
+
+    /// return the timestamp's value as of the last time `record()` was
+    /// called.
+    ///
+    /// Returns `None` if `record()` has not been called
+    pub fn value(&self) -> Option<DateTime<Utc>> {
+        *self.timestamp.lock().unwrap()
+    }
+
+    /// sets the value of this timestamp to the minimum of this and other
+    pub fn update_to_min(&self, other: &Timestamp) {
+        let min = match (self.value(), other.value()) {
+            (None, None) => None,
+            (Some(v), None) => Some(v),
+            (None, Some(v)) => Some(v),
+            (Some(v1), Some(v2)) => Some(if v1 < v2 { v1 } else { v2 }),
+        };
+
+        *self.timestamp.lock().unwrap() = min;
+    }
+
+    /// sets the value of this timestamp to the maximum of this and other
+    pub fn update_to_max(&self, other: &Timestamp) {
+        let max = match (self.value(), other.value()) {
+            (None, None) => None,
+            (Some(v), None) => Some(v),
+            (None, Some(v)) => Some(v),
+            (Some(v1), Some(v2)) => Some(if v1 < v2 { v2 } else { v1 }),
+        };
+
+        *self.timestamp.lock().unwrap() = max;
+    }
+}
+
+impl PartialEq for Timestamp {
+    fn eq(&self, other: &Self) -> bool {
+        self.value().eq(&other.value())
+    }
+}
+
+impl Display for Timestamp {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self.value() {
+            None => write!(f, "NONE"),
+            Some(v) => {
+                write!(f, "{}", v)
+            }
+        }
+    }
+}
+
 /// RAAI structure that adds all time between its construction and
 /// destruction to the CPU time or the first call to `stop` whichever
 /// comes first
@@ -144,7 +236,7 @@ impl<'a> Drop for ScopedTimerGuard<'a> {
     }
 }
 
-/// Possible values for a metric.
+/// Possible values for a [`Metric`].
 ///
 /// Among other differences, the metric types have different ways to
 /// logically interpret their underlying values and some metrics are
@@ -153,8 +245,26 @@ impl<'a> Drop for ScopedTimerGuard<'a> {
 pub enum MetricValue {
     /// Number of output rows produced: "output_rows" metric
     OutputRows(Count),
-    /// CPU time: the "cpu_time" metric
-    CPUTime(Time),
+    /// Elapsed Compute Time: the wall clock time spent in "cpu
+    /// intensive" work.
+    ///
+    /// This measurement represents, roughly:
+    /// ```
+    /// use std::time::Instant;
+    /// let start = Instant::now();
+    /// // ...CPU intensive work here...
+    /// let elapsed_compute = (Instant::now() - start).as_nanos();
+    /// ```
+    ///
+    /// Note 1: Does *not* include time other operators spend
+    /// computing input.
+    ///
+    /// Note 2: *Does* include time when the thread could have made
+    /// progress but the OS did not schedule it (e.g. due to CPU
+    /// contention), thus making this value different than the
+    /// classical definition of "cpu_time", which is the time reported
+    /// from `clock_gettime(CLOCK_THREAD_CPUTIME_ID, ..)`.
+    ElapsedCompute(Time),
     /// Operator defined count.
     Count {
         /// The provided name of this metric
@@ -169,8 +279,10 @@ pub enum MetricValue {
         /// The value of the metric
         time: Time,
     },
-    // TODO timestamp, etc
-    // https://github.com/apache/arrow-datafusion/issues/866
+    /// The time at which execution started
+    StartTimestamp(Timestamp),
+    /// The time at which execution ended
+    EndTimestamp(Timestamp),
 }
 
 impl MetricValue {
@@ -178,9 +290,11 @@ impl MetricValue {
     pub fn name(&self) -> &str {
         match self {
             Self::OutputRows(_) => "output_rows",
-            Self::CPUTime(_) => "cpu_time",
+            Self::ElapsedCompute(_) => "elapsed_compute",
             Self::Count { name, .. } => name.borrow(),
             Self::Time { name, .. } => name.borrow(),
+            Self::StartTimestamp(_) => "start_timestamp",
+            Self::EndTimestamp(_) => "end_timestamp",
         }
     }
 
@@ -188,9 +302,17 @@ impl MetricValue {
     pub fn as_usize(&self) -> usize {
         match self {
             Self::OutputRows(count) => count.value(),
-            Self::CPUTime(time) => time.value(),
+            Self::ElapsedCompute(time) => time.value(),
             Self::Count { count, .. } => count.value(),
             Self::Time { time, .. } => time.value(),
+            Self::StartTimestamp(timestamp) => timestamp
+                .value()
+                .map(|ts| ts.timestamp_nanos() as usize)
+                .unwrap_or(0),
+            Self::EndTimestamp(timestamp) => timestamp
+                .value()
+                .map(|ts| ts.timestamp_nanos() as usize)
+                .unwrap_or(0),
         }
     }
 
@@ -199,7 +321,7 @@ impl MetricValue {
     pub fn new_empty(&self) -> Self {
         match self {
             Self::OutputRows(_) => Self::OutputRows(Count::new()),
-            Self::CPUTime(_) => Self::CPUTime(Time::new()),
+            Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()),
             Self::Count { name, .. } => Self::Count {
                 name: name.clone(),
                 count: Count::new(),
@@ -208,18 +330,21 @@ impl MetricValue {
                 name: name.clone(),
                 time: Time::new(),
             },
+            Self::StartTimestamp(_) => Self::StartTimestamp(Timestamp::new()),
+            Self::EndTimestamp(_) => Self::EndTimestamp(Timestamp::new()),
         }
     }
 
-    /// Add the value of other to `self`. panic's if the type is mismatched or
-    /// aggregating does not make sense for this value
+    /// Aggregates the value of other into `self`. Panics if the types
+    /// are mismatched or aggregating does not make sense for this
+    /// value
     ///
     /// Note this is purposely marked `mut` (even though atomics are
     /// used) so Rust's type system can be used to ensure the
     /// appropriate API access. `MetricValues` should be modified
     /// using the original [`Count`] or [`Time`] they were created
     /// from.
-    pub fn add(&mut self, other: &Self) {
+    pub fn aggregate(&mut self, other: &Self) {
         match (self, other) {
             (Self::OutputRows(count), Self::OutputRows(other_count))
             | (
@@ -228,13 +353,21 @@ impl MetricValue {
                     count: other_count, ..
                 },
             ) => count.add(other_count.value()),
-            (Self::CPUTime(time), Self::CPUTime(other_time))
+            (Self::ElapsedCompute(time), Self::ElapsedCompute(other_time))
             | (
                 Self::Time { time, .. },
                 Self::Time {
                     time: other_time, ..
                 },
             ) => time.add(other_time),
+            // timestamps are aggregated by min/max
+            (Self::StartTimestamp(timestamp), Self::StartTimestamp(other_timestamp)) => {
+                timestamp.update_to_min(other_timestamp);
+            }
+            // timestamps are aggregated by min/max
+            (Self::EndTimestamp(timestamp), Self::EndTimestamp(other_timestamp)) => {
+                timestamp.update_to_max(other_timestamp);
+            }
             m @ (_, _) => {
                 panic!(
                     "Mismatched metric types. Can not aggregate {:?} with value {:?}",
@@ -243,6 +376,24 @@ impl MetricValue {
             }
         }
     }
+
+    /// Returns a number by which to sort metrics for display. Lower
+    /// numbers are "more useful" (and displayed first)
+    pub fn display_sort_key(&self) -> u8 {
+        match self {
+            Self::OutputRows(_) => 0,     // show first
+            Self::ElapsedCompute(_) => 1, // show second
+            Self::Count { .. } => 2,
+            Self::Time { .. } => 3,
+            Self::StartTimestamp(_) => 4, // show timestamps last
+            Self::EndTimestamp(_) => 5,
+        }
+    }
+
+    /// returns true if this metric has a timestamp value
+    pub fn is_timestamp(&self) -> bool {
+        matches!(self, Self::StartTimestamp(_) | Self::EndTimestamp(_))
+    }
 }
 
 impl std::fmt::Display for MetricValue {
@@ -250,12 +401,94 @@ impl std::fmt::Display for MetricValue {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
             Self::OutputRows(count) | Self::Count { count, .. } => {
-                write!(f, "{}", count.value())
+                write!(f, "{}", count)
+            }
+            Self::ElapsedCompute(time) | Self::Time { time, .. } => {
+                // distinguish between no time recorded and very small
+                // amount of time recorded
+                if time.value() > 0 {
+                    write!(f, "{}", time)
+                } else {
+                    write!(f, "NOT RECORDED")
+                }
             }
-            Self::CPUTime(time) | Self::Time { time, .. } => {
-                let duration = std::time::Duration::from_nanos(time.value() as u64);
-                write!(f, "{:?}", duration)
+            Self::StartTimestamp(timestamp) | Self::EndTimestamp(timestamp) => {
+                write!(f, "{}", timestamp)
             }
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use chrono::TimeZone;
+
+    use super::*;
+
+    #[test]
+    fn test_display_output_rows() {
+        let count = Count::new();
+        let values = vec![
+            MetricValue::OutputRows(count.clone()),
+            MetricValue::Count {
+                name: "my_counter".into(),
+                count: count.clone(),
+            },
+        ];
+
+        for value in &values {
+            assert_eq!("0", value.to_string(), "value {:?}", value);
+        }
+
+        count.add(42);
+        for value in &values {
+            assert_eq!("42", value.to_string(), "value {:?}", value);
+        }
+    }
+
+    #[test]
+    fn test_display_time() {
+        let time = Time::new();
+        let values = vec![
+            MetricValue::ElapsedCompute(time.clone()),
+            MetricValue::Time {
+                name: "my_time".into(),
+                time: time.clone(),
+            },
+        ];
+
+        // if time is not set, it should not be reported as zero
+        for value in &values {
+            assert_eq!("NOT RECORDED", value.to_string(), "value {:?}", value);
+        }
+
+        time.add_duration(Duration::from_nanos(1042));
+        for value in &values {
+            assert_eq!("1.042µs", value.to_string(), "value {:?}", value);
+        }
+    }
+
+    #[test]
+    fn test_display_timestamp() {
+        let timestamp = Timestamp::new();
+        let values = vec![
+            MetricValue::StartTimestamp(timestamp.clone()),
+            MetricValue::EndTimestamp(timestamp.clone()),
+        ];
+
+        // if the timestamp is not set, it should be reported as NONE, not zero
+        for value in &values {
+            assert_eq!("NONE", value.to_string(), "value {:?}", value);
+        }
+
+        timestamp.set(Utc.timestamp_nanos(1431648000000000));
+        for value in &values {
+            assert_eq!(
+                "1970-01-17 13:40:48 UTC",
+                value.to_string(),
+                "value {:?}",
+                value
+            );
+        }
+    }
+}
diff --git a/datafusion/src/physical_plan/sort.rs b/datafusion/src/physical_plan/sort.rs
index f1346b5..df77a16 100644
--- a/datafusion/src/physical_plan/sort.rs
+++ b/datafusion/src/physical_plan/sort.rs
@@ -155,13 +155,14 @@ impl ExecutionPlan for SortExec {
 
         let output_rows = MetricBuilder::new(&self.metrics).output_rows(partition);
 
-        let cpu_time = MetricBuilder::new(&self.metrics).cpu_time(partition);
+        let elapsed_compute =
+            MetricBuilder::new(&self.metrics).elapsed_compute(partition);
 
         Ok(Box::pin(SortStream::new(
             input,
             self.expr.clone(),
             output_rows,
-            cpu_time,
+            elapsed_compute,
         )))
     }
 
@@ -436,7 +437,7 @@ mod tests {
 
         let result: Vec<RecordBatch> = collect(sort_exec.clone()).await?;
         let metrics = sort_exec.metrics().unwrap();
-        assert!(metrics.cpu_time().unwrap() > 0);
+        assert!(metrics.elapsed_compute().unwrap() > 0);
         assert_eq!(metrics.output_rows().unwrap(), 8);
         assert_eq!(result.len(), 1);
 
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index a2e84e1..8aae3d9 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -2198,7 +2198,8 @@ async fn csv_explain_analyze() {
 
     // Only test basic plumbing and try to avoid having to change too
     // many things
-    let needle = "RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), metrics=[";
+    let needle =
+        "CoalescePartitionsExec, metrics=[output_rows=5, elapsed_compute=NOT RECORDED";
     assert_contains!(&formatted, needle);
 
     let verbose_needle = "Output Rows";

Reply via email to