This is an automated email from the ASF dual-hosted git repository. xudong963 pushed a commit to branch metrics_serialize in repository https://gitbox.apache.org/repos/asf/datafusion.git
commit ffb1178b12431c23a0a66f3d4f33aad4a2bcee70 Author: xudong963 <wxd963996...@gmail.com> AuthorDate: Tue Aug 12 10:45:59 2025 +0800 save, need to address the serde for custom --- Cargo.lock | 1 + Cargo.toml | 2 +- datafusion/physical-plan/Cargo.toml | 1 + datafusion/physical-plan/src/metrics/mod.rs | 2 +- datafusion/physical-plan/src/metrics/value.rs | 210 +++++++++++++++++++++++++- 5 files changed, 208 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index de7f6b2e38..e1f1c37e7b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2519,6 +2519,7 @@ dependencies = [ "rstest", "rstest_reuse", "serde", + "serde_json", "tempfile", "tokio", "typetag", diff --git a/Cargo.toml b/Cargo.toml index 1335361708..32d58b89d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -105,7 +105,7 @@ arrow-schema = { version = "56.0.0", default-features = false } async-trait = "0.1.88" bigdecimal = "0.4.8" bytes = "1.10" -chrono = { version = "0.4.41", default-features = false } +chrono = { version = "0.4.41", default-features = false, features = ["serde"] } criterion = "0.5.1" ctor = "0.4.3" dashmap = "6.0.1" diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 85feab92f3..8bf5d9af4b 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -65,6 +65,7 @@ log = { workspace = true } parking_lot = { workspace = true } pin-project-lite = "^0.2.7" serde = { version = "1.0", features = ["derive"] } +serde_json = { workspace = true } tokio = { workspace = true } typetag = "0.2" diff --git a/datafusion/physical-plan/src/metrics/mod.rs b/datafusion/physical-plan/src/metrics/mod.rs index 74e85c882f..80e4236568 100644 --- a/datafusion/physical-plan/src/metrics/mod.rs +++ b/datafusion/physical-plan/src/metrics/mod.rs @@ -171,7 +171,7 @@ impl Metric { /// A snapshot of the metrics for a particular ([`ExecutionPlan`]). 
/// /// [`ExecutionPlan`]: super::ExecutionPlan -#[derive(Default, Debug, Clone, Serialize)] +#[derive(Default, Debug, Clone)] pub struct MetricsSet { metrics: Vec<Arc<Metric>>, } diff --git a/datafusion/physical-plan/src/metrics/value.rs b/datafusion/physical-plan/src/metrics/value.rs index e1c67b4752..99a4177149 100644 --- a/datafusion/physical-plan/src/metrics/value.rs +++ b/datafusion/physical-plan/src/metrics/value.rs @@ -22,7 +22,7 @@ use chrono::{DateTime, Utc}; use datafusion_common::instant::Instant; use datafusion_execution::memory_pool::human_readable_size; use parking_lot::Mutex; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use std::{ borrow::{Borrow, Cow}, fmt::{Debug, Display}, @@ -36,7 +36,7 @@ use std::{ /// A counter to record things such as number of input or output rows /// /// Note `clone`ing counters update the same underlying metrics -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone)] pub struct Count { /// value of the metric counter value: Arc<AtomicUsize>, @@ -54,6 +54,27 @@ impl Display for Count { } } +impl Serialize for Count { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + serializer.serialize_u64(self.value.load(Ordering::Relaxed) as u64) + } +} + +impl<'de> Deserialize<'de> for Count { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + let value = u64::deserialize(deserializer)? as usize; + Ok(Count { + value: Arc::new(AtomicUsize::new(value)), + }) + } +} + impl Default for Count { fn default() -> Self { Self::new() @@ -85,7 +106,7 @@ impl Count { /// For example, you can easily expose current memory consumption with a gauge. 
/// /// Note `clone`ing gauge update the same underlying metrics -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone)] pub struct Gauge { /// value of the metric gauge value: Arc<AtomicUsize>, @@ -103,6 +124,27 @@ impl Display for Gauge { } } +impl Serialize for Gauge { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + serializer.serialize_u64(self.value.load(Ordering::Relaxed) as u64) + } +} + +impl<'de> Deserialize<'de> for Gauge { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + let value = u64::deserialize(deserializer)? as usize; + Ok(Gauge { + value: Arc::new(AtomicUsize::new(value)), + }) + } +} + impl Default for Gauge { fn default() -> Self { Self::new() @@ -150,7 +192,7 @@ impl Gauge { } /// Measure a potentially non contiguous duration of time -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone)] pub struct Time { /// elapsed time, in nanoseconds nanos: Arc<AtomicUsize>, @@ -175,6 +217,27 @@ impl Display for Time { } } +impl Serialize for Time { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + serializer.serialize_u64(self.nanos.load(Ordering::Relaxed) as u64) + } +} + +impl<'de> Deserialize<'de> for Time { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + let nanos = u64::deserialize(deserializer)? as usize; + Ok(Time { + nanos: Arc::new(AtomicUsize::new(nanos)), + }) + } +} + impl Time { /// Create a new [`Time`] wrapper suitable for recording elapsed /// times for operations. 
@@ -242,7 +305,7 @@ pub struct Timestamp { timestamp: Arc<Mutex<Option<DateTime<Utc>>>>, } -#[derive(Serialize, Debug)] +#[derive(Serialize, Debug, Deserialize)] struct SerializableTimestamp { timestamp: Option<DateTime<Utc>>, } @@ -258,6 +321,19 @@ impl Serialize for Timestamp { } } +impl<'de> Deserialize<'de> for Timestamp { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + let SerializableTimestamp { timestamp } = + SerializableTimestamp::deserialize(deserializer)?; + Ok(Timestamp { + timestamp: Arc::new(Mutex::new(timestamp)), + }) + } +} + impl Default for Timestamp { fn default() -> Self { Self::new() @@ -384,7 +460,7 @@ impl Drop for ScopedTimerGuard<'_> { /// Among other differences, the metric types have different ways to /// logically interpret their underlying values and some metrics are /// so common they are given special treatment. -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub enum MetricValue { /// Number of output rows produced: "output_rows" metric OutputRows(Count), @@ -715,6 +791,7 @@ impl Display for MetricValue { #[cfg(test)] mod tests { use std::any::Any; + use std::time::Duration; use chrono::TimeZone; use datafusion_execution::memory_pool::units::MB; @@ -978,4 +1055,125 @@ mod tests { "Expected ~10ms total, got {new_recorded} ns", ); } + + #[test] + fn test_count_serde() { + // Test serialization and deserialization of Count + let count = Count::new(); + count.add(42); + + // Serialize to JSON + let serialized = serde_json::to_string(&count).unwrap(); + assert_eq!(serialized, "42"); + + // Deserialize from JSON + let deserialized: Count = serde_json::from_str(&serialized).unwrap(); + assert_eq!(deserialized.value(), 42); + + // Test zero value + let zero_count = Count::new(); + let serialized_zero = serde_json::to_string(&zero_count).unwrap(); + assert_eq!(serialized_zero, "0"); + let deserialized_zero: Count = 
serde_json::from_str(&serialized_zero).unwrap(); + assert_eq!(deserialized_zero.value(), 0); + } + + #[test] + fn test_gauge_serde() { + // Test serialization and deserialization of Gauge + let gauge = Gauge::new(); + gauge.add(100); + gauge.sub(30); + + // Serialize to JSON + let serialized = serde_json::to_string(&gauge).unwrap(); + assert_eq!(serialized, "70"); + + // Deserialize from JSON + let deserialized: Gauge = serde_json::from_str(&serialized).unwrap(); + assert_eq!(deserialized.value(), 70); + + // Test zero value + let zero_gauge = Gauge::new(); + let serialized_zero = serde_json::to_string(&zero_gauge).unwrap(); + assert_eq!(serialized_zero, "0"); + let deserialized_zero: Gauge = serde_json::from_str(&serialized_zero).unwrap(); + assert_eq!(deserialized_zero.value(), 0); + + // Test large value + let large_gauge = Gauge::new(); + large_gauge.set(1_000_000); + let serialized_large = serde_json::to_string(&large_gauge).unwrap(); + assert_eq!(serialized_large, "1000000"); + let deserialized_large: Gauge = serde_json::from_str(&serialized_large).unwrap(); + assert_eq!(deserialized_large.value(), 1_000_000); + } + + #[test] + fn test_time_serde() { + // Test serialization and deserialization of Time + let time = Time::new(); + time.add_duration(Duration::from_secs(1)); // 1 second in nanoseconds + + // Serialize to JSON + let serialized = serde_json::to_string(&time).unwrap(); + assert_eq!(serialized, "1000000000"); + + // Deserialize from JSON + let deserialized: Time = serde_json::from_str(&serialized).unwrap(); + assert_eq!(deserialized.value(), 1_000_000_000); + + // Test zero time + let zero_time = Time::new(); + let serialized_zero = serde_json::to_string(&zero_time).unwrap(); + assert_eq!(serialized_zero, "0"); + let deserialized_zero: Time = serde_json::from_str(&serialized_zero).unwrap(); + assert_eq!(deserialized_zero.value(), 0); + + // Test with duration + let duration_time = Time::new(); + duration_time.add_duration(Duration::from_millis(500)); 
+ let serialized_duration = serde_json::to_string(&duration_time).unwrap(); + let deserialized_duration: Time = + serde_json::from_str(&serialized_duration).unwrap(); + // Should be approximately 500ms in nanoseconds (500_000_000) + assert!(deserialized_duration.value() >= 500_000_000); + } + + #[test] + fn test_timestamp_serde() { + // Test serialization and deserialization of Timestamp with a specific time + let timestamp = Timestamp::new(); + let test_time = Utc.with_ymd_and_hms(2023, 12, 25, 10, 30, 45).unwrap(); + timestamp.set(test_time); + + // Serialize to JSON + let serialized = serde_json::to_string(&timestamp).unwrap(); + + // Deserialize from JSON + let deserialized: Timestamp = serde_json::from_str(&serialized).unwrap(); + assert_eq!(deserialized.value(), Some(test_time)); + + // Test uninitialized timestamp (None) + let empty_timestamp = Timestamp::new(); + let serialized_empty = serde_json::to_string(&empty_timestamp).unwrap(); + assert_eq!(serialized_empty, r#"{"timestamp":null}"#); + + let deserialized_empty: Timestamp = + serde_json::from_str(&serialized_empty).unwrap(); + assert_eq!(deserialized_empty.value(), None); + + // Test current time + let now_timestamp = Timestamp::new(); + now_timestamp.record(); // Sets to current time + let serialized_now = serde_json::to_string(&now_timestamp).unwrap(); + let deserialized_now: Timestamp = serde_json::from_str(&serialized_now).unwrap(); + + // Should have a value and it should be recent + assert!(deserialized_now.value().is_some()); + let deserialized_time = deserialized_now.value().unwrap(); + let now = Utc::now(); + // Should be within 1 second of now + assert!((now - deserialized_time).num_seconds().abs() < 1); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org