alamb commented on a change in pull request #908:
URL: https://github.com/apache/arrow-datafusion/pull/908#discussion_r694167112



##########
File path: datafusion/src/physical_plan/metrics/mod.rs
##########
@@ -0,0 +1,528 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metrics for recording information about execution
+
+mod builder;
+mod value;
+
+use std::{
+    borrow::Cow,
+    fmt::{Debug, Display},
+    sync::{Arc, Mutex},
+};
+
+use hashbrown::HashMap;
+
+// public exports
+pub use builder::MetricBuilder;
+pub use value::{Count, MetricValue, ScopedTimerGuard, Time};
+
+/// Something that tracks a value of interest (metric) of a DataFusion
+/// [`ExecutionPlan`] execution.
+///
+/// Typically [`Metric`]s are not created directly, but instead
+/// are created using [`MetricBuilder`] or methods on
+/// [`ExecutionPlanMetricsSet`].
+///
+/// ```
+///  use datafusion::physical_plan::metrics::*;
+///
+///  let metrics = ExecutionPlanMetricsSet::new();
+///  assert!(metrics.clone_inner().output_rows().is_none());
+///
+///  // Create a counter to increment using the MetricBuilder
+///  let partition = 1;
+///  let output_rows = MetricBuilder::new(&metrics)
+///      .output_rows(partition);
+///
+///  // Counter can be incremented
+///  output_rows.add(13);
+///
+///  // The value can be retrieved directly:
+///  assert_eq!(output_rows.value(), 13);
+///
+///  // As well as from the metrics set
+///  assert_eq!(metrics.clone_inner().output_rows(), Some(13));
+/// ```
+
+#[derive(Debug)]
+pub struct Metric {
+    /// The value of the metric
+    value: MetricValue,
+
+    /// Arbitrary name=value pairs identifying this metric
+    labels: Vec<Label>,
+
+    /// To which partition of an operator's output did this metric
+    /// apply? `None` means all partitions.
+    partition: Option<usize>,
+}
+
+impl Display for Metric {
+    /// Writes the metric as its name, then an optional brace-enclosed,
+    /// comma-separated list of labels, then `=` and the value.
+    ///
+    /// When a partition is set it is rendered as the first label,
+    /// named `partition`. The braces are omitted entirely when there
+    /// is neither a partition nor any label.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.value.name())?;
+
+        // The partition (if any) is presented as a synthetic leading label
+        let partition_label = self
+            .partition
+            .iter()
+            .map(|partition| Label::new("partition", partition.to_string()));
+
+        let mut numbered_labels = partition_label
+            .chain(self.labels().iter().cloned())
+            .enumerate()
+            .peekable();
+
+        // Nothing to put between braces: emit just the value
+        if numbered_labels.peek().is_none() {
+            return write!(f, "={}", self.value);
+        }
+
+        write!(f, "{{")?;
+        for (position, label) in numbered_labels {
+            // every label except the first is preceded by a separator
+            if position > 0 {
+                write!(f, ", ")?;
+            }
+            write!(f, "{}", label)?;
+        }
+        write!(f, "}}")?;
+
+        // and finally the value
+        write!(f, "={}", self.value)
+    }
+}
+
+impl Metric {
+    /// Build a [`Metric`] with no labels for the given value and
+    /// (optional) partition. Prefer [`MetricBuilder`] over calling
+    /// this directly.
+    pub fn new(value: MetricValue, partition: Option<usize>) -> Self {
+        Self::new_with_labels(value, partition, Vec::new())
+    }
+
+    /// Build a [`Metric`] for the given value and (optional)
+    /// partition, starting with the supplied labels. Prefer
+    /// [`MetricBuilder`] over calling this directly.
+    pub fn new_with_labels(
+        value: MetricValue,
+        partition: Option<usize>,
+        labels: Vec<Label>,
+    ) -> Self {
+        Self {
+            partition,
+            labels,
+            value,
+        }
+    }
+
+    /// Consume this metric and return it with `label` appended to
+    /// its labels
+    pub fn with(mut self, label: Label) -> Self {
+        let labels = &mut self.labels;
+        labels.push(label);
+        self
+    }
+
+    /// The labels attached to this metric, in insertion order
+    fn labels(&self) -> &[Label] {
+        self.labels.as_slice()
+    }
+
+    /// Shared access to this metric's value
+    pub fn value(&self) -> &MetricValue {
+        let Self { value, .. } = self;
+        value
+    }
+
+    /// Exclusive access to this metric's value
+    pub fn value_mut(&mut self) -> &mut MetricValue {
+        let Self { value, .. } = self;
+        value
+    }
+
+    /// The partition this metric applies to (`None` means all
+    /// partitions)
+    pub fn partition(&self) -> &Option<usize> {
+        let Self { partition, .. } = self;
+        partition
+    }
+}
+
+/// A snapshot of the metrics for a particular operator (`dyn
+/// ExecutionPlan`).
+#[derive(Default, Debug, Clone)]
+pub struct MetricsSet {
+    /// The metrics in this set, each behind a shared [`Arc`]
+    metrics: Vec<Arc<Metric>>,
+}
+
+impl MetricsSet {
+    /// Create an empty container of metrics
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Append `metric` to the end of this set
+    pub fn push(&mut self, metric: Arc<Metric>) {
+        self.metrics.push(metric);
+    }
+
+    /// Returns an iterator over all metrics in this set
+    pub fn iter(&self) -> impl Iterator<Item = &Arc<Metric>> {
+        let all_metrics = &self.metrics;
+        all_metrics.iter()
+    }
+
+    /// Convenience: the sum of every `MetricValue::OutputRows`
+    /// metric in this set, or `None` if the set contains no such
+    /// metric
+    pub fn output_rows(&self) -> Option<usize> {
+        let total =
+            self.sum(|metric| matches!(metric.value(), MetricValue::OutputRows(_)))?;
+        Some(total.as_usize())
+    }
+
+    /// Convenience: the sum of every `MetricValue::CPUTime` metric
+    /// in this set, or `None` if the set contains no such metric
+    pub fn cpu_time(&self) -> Option<usize> {
+        let total =
+            self.sum(|metric| matches!(metric.value(), MetricValue::CPUTime(_)))?;
+        Some(total.as_usize())
+    }
+
+    /// Adds together the values of every metric accepted by the
+    /// predicate `f` and returns the total. Returns `None` when `f`
+    /// accepts no metric.
+    ///
+    /// The accumulator is created by calling `new_empty()` on the
+    /// value of the first accepted metric.
+    pub fn sum<F>(&self, mut f: F) -> Option<MetricValue>
+    where
+        F: FnMut(&Metric) -> bool,
+    {
+        let mut selected = self
+            .metrics
+            .iter()
+            .filter(|metric| f(metric.as_ref()));
+
+        // No match at all means there is nothing to report
+        let first = selected.next()?;
+
+        // Seed the accumulator from the first match, then fold that
+        // match and all remaining ones into it
+        let mut total = first.value().new_empty();
+        total.add(first.value());
+        for metric in selected {
+            total.add(metric.value());
+        }
+
+        Some(total)
+    }
+
+    /// Returns a new derived `MetricsSet` where all metrics
+    /// that had the same name and partition=`Some(..)` have been
+    /// aggregated together. The resulting `MetricsSet` has all
+    /// metrics with `Partition=None`
+    pub fn aggregate_by_partition(&self) -> Self {

Review comment:
       Here is the code that can aggregate metrics by partition (and `sum` 
above is also useful)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to