alamb commented on a change in pull request #901: URL: https://github.com/apache/arrow-datafusion/pull/901#discussion_r691527618
########## File path: datafusion/src/physical_plan/metrics.rs ########## @@ -0,0 +1,461 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Metrics for recording information about execution + +pub mod wrappers; + +use std::{ + fmt::{Debug, Display}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, + }, +}; + +use hashbrown::HashMap; + +use self::wrappers::{Count, Time}; + +/// Structure for constructing metrics, counters, timers, etc +pub struct MetricBuilder<'a> { + /// Location that the metric created by this builder will be added do + metrics: &'a SharedMetricsSet, + + /// optional partition number + partition: Option<usize>, + + /// arbitrary name=value pairs identifiying this metric + labels: Vec<Label>, +} + +impl<'a> MetricBuilder<'a> { + /// Create a new `MetricBuilder` that will register the result of `build()` with the `metrics` + pub fn new(metrics: &'a SharedMetricsSet) -> Self { + Self { + metrics, + partition: None, + labels: vec![], + } + } + + /// Add a label to the metric being constructed + pub fn with_label(mut self, label: Label) -> Self { + self.labels.push(label); + self + } + + /// Add a label to the metric being constructed + pub fn with_new_label( + self, + name: impl Into<Arc<str>>, + value: impl Into<Arc<str>>, + ) -> Self { + self.with_label(Label::new(name.into(), value.into())) + } + + /// Set the partition of the metric being constructed + pub fn with_partition(mut self, partition: usize) -> Self { + self.partition = Some(partition); + self + } + + /// Consume self and create a metric of the specified kind + /// registered with the MetricsSet + pub fn build(self, kind: MetricKind) -> Arc<SQLMetric> { + let Self { + labels, + partition, + metrics, + } = self; + let metric = Arc::new(SQLMetric::new_with_labels(kind, partition, labels)); + metrics.register(metric.clone()); + metric + } + + /// Consume self and create a new counter for recording output rows + pub fn output_rows(self, partition: usize) -> Count { + let metric = self.with_partition(partition).build(MetricKind::OutputRows); + Count::new(metric) + } + + /// Consumes self and creates a new Countr for recording + /// some metric of an operators + pub fn counter(self, counter_name: impl Into<Arc<str>>, partition: usize) -> Count { + let metric = self + .with_partition(partition) + .build(MetricKind::Custom(counter_name.into())); + Count::new(metric) + } + + /// Consumes self and creates a new Counter for recording + /// some metric of an overall operator (not per partition + pub fn global_counter(self, counter_name: impl Into<Arc<str>>) -> Count { + let metric = self.build(MetricKind::Custom(counter_name.into())); + Count::new(metric) + } + + /// Consume self and create a new Timer for recording the overall cpu time + /// spent by an operator + pub fn cpu_time(self, partition: usize) -> Time { + let metric = self.with_partition(partition).build(MetricKind::CPUTime); + Time::new(metric) + } + + /// Consumes self and creates a new Timer for recording some + /// subset of of an operators execution time + pub fn subset_time(self, subset_name: impl Into<Arc<str>>, partition: usize) -> Time { + let metric = self + .with_partition(partition) + .build(MetricKind::Custom(subset_name.into())); + Time::new(metric) + } +} + +/// Something that tracks the metrics of an execution using an atomic +/// usize +#[derive(Debug)] +pub struct SQLMetric { + /// value of the metric + value: AtomicUsize, + + /// arbitrary name=value pairs identifiying this metric + labels: Vec<Label>, + + /// To which partition of an operators output did this metric + /// apply? If None means all partitions. + partition: Option<usize>, + + /// The kind of metric (how to logically interpret the value) + kind: MetricKind, +} + +impl Display for SQLMetric { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.kind)?; + + let mut iter = self + .partition + .iter() + .map(|partition| { + Label::new( + Arc::from("partition"), + Arc::from(partition.to_string().as_str()), + ) + }) + .chain(self.labels().iter().cloned()) + .peekable(); + + // print out the labels specially + if iter.peek().is_some() { + write!(f, "{{")?; + + let mut is_first = true; + for i in iter { + if !is_first { + write!(f, ", ")?; + } else { + is_first = false; + } + + write!(f, "{}", i)?; + } + + write!(f, "}}")?; + } + + // and now the value + let format_as_duration = match &self.kind { + MetricKind::OutputRows => false, + MetricKind::CPUTime => true, + MetricKind::Custom(name) => name.contains("Time") || name.contains("time"), + }; + + if format_as_duration { + let duration = std::time::Duration::from_nanos(self.value() as u64); + write!(f, "={:?}", duration) + } else { + write!(f, "={}", self.value()) + } + } +} + +impl SQLMetric { + /// Create a new SQLMetric + pub fn new(kind: MetricKind, partition: Option<usize>) -> Self { + Self { + value: 0.into(), + labels: vec![], + partition, + kind, + } + } + + /// Add a new label to this metric + pub fn new_with_labels( + kind: MetricKind, + partition: Option<usize>, + labels: Vec<Label>, + ) -> Self { + Self { + value: 0.into(), + labels, + partition, + kind, + } + } + + /// Add a new label to this metric + pub fn with(mut self, label: Label) -> Self { + self.labels.push(label); + self + } + + /// Add the standard name for output rows + + /// Get the current value + pub fn value(&self) -> usize { + self.value.load(Ordering::Relaxed) + } + + /// get the kind of this metric + pub fn kind(&self) -> &MetricKind { + &self.kind + } + + /// Add `n` to the metric's value + pub fn add(&self, n: usize) { + // relaxed ordering for operations on `value` poses no issues + // we're purely using atomic ops with no associated memory ops + self.value.fetch_add(n, Ordering::Relaxed); + } + + /// Set the metric's value to `n` + pub fn set(&self, n: usize) { + self.value.store(n, Ordering::Relaxed); + } + + /// What labels are present for this metric? + fn labels(&self) -> &[Label] { + &self.labels + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +/// How should the value of the metric be interpreted? +pub enum MetricKind { + /// Number of output rows produced + OutputRows, + /// CPU time + CPUTime, + // TODO timestamp, etc + // https://github.com/apache/arrow-datafusion/issues/866 + /// Arbitarary user defined type + Custom(Arc<str>), Review comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org