This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new fbafea4366 feat: Support defining custom MetricValues in PhysicalPlans (#16195)
fbafea4366 is described below

commit fbafea4366a959f0f696641a6c877dc04c240381
Author: Sami Tabet <24794538+sfl...@users.noreply.github.com>
AuthorDate: Fri Jun 6 22:18:25 2025 +0200

    feat: Support defining custom MetricValues in PhysicalPlans (#16195)
    
    See this issue: https://github.com/apache/datafusion/issues/16044
    
    The MetricValue enum currently exposes only single-value statistics:
    counts, gauges, timers, timestamps, and a few hard-coded variants
    such as SpillCount or OutputRows.
    
    However, there is often a need for custom metrics when using custom
    PhysicalPlans. At Datadog, for instance, we needed to track the
    distribution of latencies of the sub-queries issued by a given physical
    plan in order to pinpoint outliers.
    
    Similarly, tracking the top-N slowest sub-queries has been quite useful
    in helping us debug slow queries.
    
    This PR allows each user to define their own MetricValue types as long
    as they are aggregatable. A very basic example is included in the PR
    using a custom counter.
---
 datafusion/physical-plan/src/metrics/custom.rs | 113 +++++++++++++++
 datafusion/physical-plan/src/metrics/mod.rs    |   3 +
 datafusion/physical-plan/src/metrics/value.rs  | 187 ++++++++++++++++++++++++-
 3 files changed, 296 insertions(+), 7 deletions(-)

diff --git a/datafusion/physical-plan/src/metrics/custom.rs b/datafusion/physical-plan/src/metrics/custom.rs
new file mode 100644
index 0000000000..546af6f333
--- /dev/null
+++ b/datafusion/physical-plan/src/metrics/custom.rs
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Custom metric value type.
+
+use std::{any::Any, fmt::Debug, fmt::Display, sync::Arc};
+
+/// A trait for implementing custom metric values.
+///
+/// This trait enables defining application- or operator-specific metric types
+/// that can be aggregated and displayed alongside standard metrics. These
+/// custom metrics integrate with [`MetricValue::Custom`] and support
+/// aggregation logic, introspection, and optional numeric representation.
+///
+/// # Requirements
+/// Implementations of `CustomMetricValue` must satisfy the following:
+///
+/// 1. [`Self::aggregate`]: Defines how two metric values are combined
+/// 2. [`Self::new_empty`]: Returns a new, zero-value instance for accumulation
+/// 3. [`Self::as_any`]: Enables dynamic downcasting for type-specific operations
+/// 4. [`Self::as_usize`]: Optionally maps the value to a `usize` (for sorting, display, etc.)
+/// 5. [`Self::is_eq`]: Implements comparison between two values, this isn't reusing the std
+///    PartialEq trait because this trait is used dynamically in the context of
+///    [`MetricValue::Custom`]
+///
+/// # Examples
+/// ```
+/// # use std::sync::Arc;
+/// # use std::fmt::{Debug, Display};
+/// # use std::any::Any;
+/// # use std::sync::atomic::{AtomicUsize, Ordering};
+///
+/// # use datafusion_physical_plan::metrics::CustomMetricValue;
+///
+/// #[derive(Debug, Default)]
+/// struct MyCounter {
+///     count: AtomicUsize,
+/// }
+///
+/// impl Display for MyCounter {
+///     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+///         write!(f, "count: {}", self.count.load(Ordering::Relaxed))
+///     }
+/// }
+///
+/// impl CustomMetricValue for MyCounter {
+///     fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
+///         Arc::new(Self::default())
+///     }
+///
+///     fn aggregate(&self, other: Arc<dyn CustomMetricValue>) {
+///         let other = other.as_any().downcast_ref::<Self>().unwrap();
+///         self.count.fetch_add(other.count.load(Ordering::Relaxed), Ordering::Relaxed);
+///     }
+///
+///     fn as_any(&self) -> &dyn Any {
+///         self
+///     }
+///
+///     fn as_usize(&self) -> usize {
+///         self.count.load(Ordering::Relaxed)
+///     }
+///
+///     fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
+///         let Some(other) = other.as_any().downcast_ref::<Self>() else {
+///             return false;
+///         };
+///
+///         self.count.load(Ordering::Relaxed) == other.count.load(Ordering::Relaxed)
+///     }
+/// }
+/// ```
+///
+/// [`MetricValue::Custom`]: super::MetricValue::Custom
+pub trait CustomMetricValue: Display + Debug + Send + Sync {
+    /// Returns a new, zero-initialized version of this metric value.
+    ///
+    /// This value is used during metric aggregation to accumulate results.
+    fn new_empty(&self) -> Arc<dyn CustomMetricValue>;
+
+    /// Merges another metric value into this one.
+    ///
+    /// The type of `other` could be of a different custom type as long as it's aggregatable into self.
+    fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>);
+
+    /// Returns this value as a [`Any`] to support dynamic downcasting.
+    fn as_any(&self) -> &dyn Any;
+
+    /// Optionally returns a numeric representation of the value, if meaningful.
+    /// Otherwise will default to zero.
+    ///
+    /// This is used for sorting and summarizing metrics.
+    fn as_usize(&self) -> usize {
+        0
+    }
+
+    /// Compares this value with another custom value.
+    fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool;
+}
diff --git a/datafusion/physical-plan/src/metrics/mod.rs b/datafusion/physical-plan/src/metrics/mod.rs
index 2ac7ac1299..87783eada8 100644
--- a/datafusion/physical-plan/src/metrics/mod.rs
+++ b/datafusion/physical-plan/src/metrics/mod.rs
@@ -19,6 +19,7 @@
 
 mod baseline;
 mod builder;
+mod custom;
 mod value;
 
 use parking_lot::Mutex;
@@ -33,6 +34,7 @@ use datafusion_common::HashMap;
 // public exports
 pub use baseline::{BaselineMetrics, RecordOutput, SpillMetrics};
 pub use builder::MetricBuilder;
+pub use custom::CustomMetricValue;
 pub use value::{Count, Gauge, MetricValue, ScopedTimerGuard, Time, Timestamp};
 
 /// Something that tracks a value of interest (metric) of a DataFusion
@@ -263,6 +265,7 @@ impl MetricsSet {
             MetricValue::Gauge { name, .. } => name == metric_name,
             MetricValue::StartTimestamp(_) => false,
             MetricValue::EndTimestamp(_) => false,
+            MetricValue::Custom { .. } => false,
         })
     }
 
diff --git a/datafusion/physical-plan/src/metrics/value.rs b/datafusion/physical-plan/src/metrics/value.rs
index 249cd5edb1..1cc4a4fbcb 100644
--- a/datafusion/physical-plan/src/metrics/value.rs
+++ b/datafusion/physical-plan/src/metrics/value.rs
@@ -17,9 +17,14 @@
 
 //! Value representation of metrics
 
+use super::CustomMetricValue;
+use chrono::{DateTime, Utc};
+use datafusion_common::instant::Instant;
+use datafusion_execution::memory_pool::human_readable_size;
+use parking_lot::Mutex;
 use std::{
     borrow::{Borrow, Cow},
-    fmt::Display,
+    fmt::{Debug, Display},
     sync::{
         atomic::{AtomicUsize, Ordering},
         Arc,
@@ -27,11 +32,6 @@ use std::{
     time::Duration,
 };
 
-use chrono::{DateTime, Utc};
-use datafusion_common::instant::Instant;
-use datafusion_execution::memory_pool::human_readable_size;
-use parking_lot::Mutex;
-
 /// A counter to record things such as number of input or output rows
 ///
 /// Note `clone`ing counters update the same underlying metrics
@@ -344,7 +344,7 @@ impl Drop for ScopedTimerGuard<'_> {
 /// Among other differences, the metric types have different ways to
 /// logically interpret their underlying values and some metrics are
 /// so common they are given special treatment.
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone)]
 pub enum MetricValue {
     /// Number of output rows produced: "output_rows" metric
     OutputRows(Count),
@@ -401,6 +401,78 @@ pub enum MetricValue {
     StartTimestamp(Timestamp),
     /// The time at which execution ended
     EndTimestamp(Timestamp),
+    Custom {
+        /// The provided name of this metric
+        name: Cow<'static, str>,
+        /// A custom implementation of the metric value.
+        value: Arc<dyn CustomMetricValue>,
+    },
+}
+
+// Manually implement PartialEq for `MetricValue` because it contains CustomMetricValue in its
+// definition which is a dyn trait. This wouldn't allow us to just derive PartialEq.
+impl PartialEq for MetricValue {
+    fn eq(&self, other: &Self) -> bool {
+        match (self, other) {
+            (MetricValue::OutputRows(count), MetricValue::OutputRows(other)) 
=> {
+                count == other
+            }
+            (MetricValue::ElapsedCompute(time), 
MetricValue::ElapsedCompute(other)) => {
+                time == other
+            }
+            (MetricValue::SpillCount(count), MetricValue::SpillCount(other)) 
=> {
+                count == other
+            }
+            (MetricValue::SpilledBytes(count), 
MetricValue::SpilledBytes(other)) => {
+                count == other
+            }
+            (MetricValue::SpilledRows(count), MetricValue::SpilledRows(other)) 
=> {
+                count == other
+            }
+            (
+                MetricValue::CurrentMemoryUsage(gauge),
+                MetricValue::CurrentMemoryUsage(other),
+            ) => gauge == other,
+            (
+                MetricValue::Count { name, count },
+                MetricValue::Count {
+                    name: other_name,
+                    count: other_count,
+                },
+            ) => name == other_name && count == other_count,
+            (
+                MetricValue::Gauge { name, gauge },
+                MetricValue::Gauge {
+                    name: other_name,
+                    gauge: other_gauge,
+                },
+            ) => name == other_name && gauge == other_gauge,
+            (
+                MetricValue::Time { name, time },
+                MetricValue::Time {
+                    name: other_name,
+                    time: other_time,
+                },
+            ) => name == other_name && time == other_time,
+
+            (
+                MetricValue::StartTimestamp(timestamp),
+                MetricValue::StartTimestamp(other),
+            ) => timestamp == other,
+            (MetricValue::EndTimestamp(timestamp), 
MetricValue::EndTimestamp(other)) => {
+                timestamp == other
+            }
+            (
+                MetricValue::Custom { name, value },
+                MetricValue::Custom {
+                    name: other_name,
+                    value: other_value,
+                },
+            ) => name == other_name && value.is_eq(other_value),
+            // Default case when the two sides do not have the same type.
+            _ => false,
+        }
+    }
 }
 
 impl MetricValue {
@@ -418,6 +490,7 @@ impl MetricValue {
             Self::Time { name, .. } => name.borrow(),
             Self::StartTimestamp(_) => "start_timestamp",
             Self::EndTimestamp(_) => "end_timestamp",
+            Self::Custom { name, .. } => name.borrow(),
         }
     }
 
@@ -443,6 +516,7 @@ impl MetricValue {
                 .and_then(|ts| ts.timestamp_nanos_opt())
                 .map(|nanos| nanos as usize)
                 .unwrap_or(0),
+            Self::Custom { value, .. } => value.as_usize(),
         }
     }
 
@@ -470,6 +544,10 @@ impl MetricValue {
             },
             Self::StartTimestamp(_) => Self::StartTimestamp(Timestamp::new()),
             Self::EndTimestamp(_) => Self::EndTimestamp(Timestamp::new()),
+            Self::Custom { name, value } => Self::Custom {
+                name: name.clone(),
+                value: value.new_empty(),
+            },
         }
     }
 
@@ -516,6 +594,14 @@ impl MetricValue {
             (Self::EndTimestamp(timestamp), 
Self::EndTimestamp(other_timestamp)) => {
                 timestamp.update_to_max(other_timestamp);
             }
+            (
+                Self::Custom { value, .. },
+                Self::Custom {
+                    value: other_value, ..
+                },
+            ) => {
+                value.aggregate(Arc::clone(other_value));
+            }
             m @ (_, _) => {
                 panic!(
                     "Mismatched metric types. Can not aggregate {:?} with 
value {:?}",
@@ -540,6 +626,7 @@ impl MetricValue {
             Self::Time { .. } => 8,
             Self::StartTimestamp(_) => 9, // show timestamps last
             Self::EndTimestamp(_) => 10,
+            Self::Custom { .. } => 11,
         }
     }
 
@@ -578,17 +665,103 @@ impl Display for MetricValue {
             Self::StartTimestamp(timestamp) | Self::EndTimestamp(timestamp) => 
{
                 write!(f, "{timestamp}")
             }
+            Self::Custom { name, value } => {
+                write!(f, "name:{name} {value}")
+            }
         }
     }
 }
 
 #[cfg(test)]
 mod tests {
+    use std::any::Any;
+
     use chrono::TimeZone;
     use datafusion_execution::memory_pool::units::MB;
 
     use super::*;
 
+    #[derive(Debug, Default)]
+    pub struct CustomCounter {
+        count: AtomicUsize,
+    }
+
+    impl Display for CustomCounter {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            write!(f, "count: {}", self.count.load(Ordering::Relaxed))
+        }
+    }
+
+    impl CustomMetricValue for CustomCounter {
+        fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
+            Arc::new(CustomCounter::default())
+        }
+
+        fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
+            let other = other.as_any().downcast_ref::<Self>().unwrap();
+            self.count
+                .fetch_add(other.count.load(Ordering::Relaxed), 
Ordering::Relaxed);
+        }
+
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+
+        fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
+            let Some(other) = other.as_any().downcast_ref::<Self>() else {
+                return false;
+            };
+
+            self.count.load(Ordering::Relaxed) == 
other.count.load(Ordering::Relaxed)
+        }
+    }
+
+    fn new_custom_counter(name: &'static str, value: usize) -> MetricValue {
+        let custom_counter = CustomCounter::default();
+        custom_counter.count.fetch_add(value, Ordering::Relaxed);
+        let custom_val = MetricValue::Custom {
+            name: Cow::Borrowed(name),
+            value: Arc::new(custom_counter),
+        };
+
+        custom_val
+    }
+
+    #[test]
+    fn test_custom_metric_with_mismatching_names() {
+        let mut custom_val = new_custom_counter("Hi", 1);
+        let other_custom_val = new_custom_counter("Hello", 1);
+
+        // Not equal since the name differs.
+        assert!(other_custom_val != custom_val);
+
+        // Should work even though the name differs
+        custom_val.aggregate(&other_custom_val);
+
+        let expected_val = new_custom_counter("Hi", 2);
+        assert!(expected_val == custom_val);
+    }
+
+    #[test]
+    fn test_custom_metric() {
+        let mut custom_val = new_custom_counter("hi", 11);
+        let other_custom_val = new_custom_counter("hi", 20);
+
+        custom_val.aggregate(&other_custom_val);
+
+        assert!(custom_val != other_custom_val);
+
+        if let MetricValue::Custom { value, .. } = custom_val {
+            let counter = value
+                .as_any()
+                .downcast_ref::<CustomCounter>()
+                .expect("Expected CustomCounter");
+            assert_eq!(counter.count.load(Ordering::Relaxed), 31);
+        } else {
+            panic!("Unexpected value");
+        }
+    }
+
     #[test]
     fn test_display_output_rows() {
         let count = Count::new();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to