This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch metrics_serialize
in repository https://gitbox.apache.org/repos/asf/datafusion.git

commit ffb1178b12431c23a0a66f3d4f33aad4a2bcee70
Author: xudong963 <wxd963996...@gmail.com>
AuthorDate: Tue Aug 12 10:45:59 2025 +0800

    save, need to address the serde for custom
---
 Cargo.lock                                    |   1 +
 Cargo.toml                                    |   2 +-
 datafusion/physical-plan/Cargo.toml           |   1 +
 datafusion/physical-plan/src/metrics/mod.rs   |   2 +-
 datafusion/physical-plan/src/metrics/value.rs | 210 +++++++++++++++++++++++++-
 5 files changed, 208 insertions(+), 8 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index de7f6b2e38..e1f1c37e7b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2519,6 +2519,7 @@ dependencies = [
  "rstest",
  "rstest_reuse",
  "serde",
+ "serde_json",
  "tempfile",
  "tokio",
  "typetag",
diff --git a/Cargo.toml b/Cargo.toml
index 1335361708..32d58b89d5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -105,7 +105,7 @@ arrow-schema = { version = "56.0.0", default-features = false }
 async-trait = "0.1.88"
 bigdecimal = "0.4.8"
 bytes = "1.10"
-chrono = { version = "0.4.41", default-features = false }
+chrono = { version = "0.4.41", default-features = false, features = ["serde"] }
 criterion = "0.5.1"
 ctor = "0.4.3"
 dashmap = "6.0.1"
diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml
index 85feab92f3..8bf5d9af4b 100644
--- a/datafusion/physical-plan/Cargo.toml
+++ b/datafusion/physical-plan/Cargo.toml
@@ -65,6 +65,7 @@ log = { workspace = true }
 parking_lot = { workspace = true }
 pin-project-lite = "^0.2.7"
 serde = { version = "1.0", features = ["derive"] }
+serde_json = { workspace = true }
 tokio = { workspace = true }
 typetag = "0.2"
 
diff --git a/datafusion/physical-plan/src/metrics/mod.rs b/datafusion/physical-plan/src/metrics/mod.rs
index 74e85c882f..80e4236568 100644
--- a/datafusion/physical-plan/src/metrics/mod.rs
+++ b/datafusion/physical-plan/src/metrics/mod.rs
@@ -171,7 +171,7 @@ impl Metric {
 /// A snapshot of the metrics for a particular ([`ExecutionPlan`]).
 ///
 /// [`ExecutionPlan`]: super::ExecutionPlan
-#[derive(Default, Debug, Clone, Serialize)]
+#[derive(Default, Debug, Clone)]
 pub struct MetricsSet {
     metrics: Vec<Arc<Metric>>,
 }
diff --git a/datafusion/physical-plan/src/metrics/value.rs b/datafusion/physical-plan/src/metrics/value.rs
index e1c67b4752..99a4177149 100644
--- a/datafusion/physical-plan/src/metrics/value.rs
+++ b/datafusion/physical-plan/src/metrics/value.rs
@@ -22,7 +22,7 @@ use chrono::{DateTime, Utc};
 use datafusion_common::instant::Instant;
 use datafusion_execution::memory_pool::human_readable_size;
 use parking_lot::Mutex;
-use serde::Serialize;
+use serde::{Deserialize, Serialize};
 use std::{
     borrow::{Borrow, Cow},
     fmt::{Debug, Display},
@@ -36,7 +36,7 @@ use std::{
 /// A counter to record things such as number of input or output rows
 ///
 /// Note `clone`ing counters update the same underlying metrics
-#[derive(Debug, Clone, Serialize)]
+#[derive(Debug, Clone)]
 pub struct Count {
     /// value of the metric counter
     value: Arc<AtomicUsize>,
@@ -54,6 +54,27 @@ impl Display for Count {
     }
 }
 
+impl Serialize for Count {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        serializer.serialize_u64(self.value.load(Ordering::Relaxed) as u64)
+    }
+}
+
+impl<'de> Deserialize<'de> for Count {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        let value = u64::deserialize(deserializer)? as usize;
+        Ok(Count {
+            value: Arc::new(AtomicUsize::new(value)),
+        })
+    }
+}
+
 impl Default for Count {
     fn default() -> Self {
         Self::new()
@@ -85,7 +106,7 @@ impl Count {
 /// For example, you can easily expose current memory consumption with a gauge.
 ///
 /// Note `clone`ing gauge update the same underlying metrics
-#[derive(Debug, Clone, Serialize)]
+#[derive(Debug, Clone)]
 pub struct Gauge {
     /// value of the metric gauge
     value: Arc<AtomicUsize>,
@@ -103,6 +124,27 @@ impl Display for Gauge {
     }
 }
 
+impl Serialize for Gauge {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        serializer.serialize_u64(self.value.load(Ordering::Relaxed) as u64)
+    }
+}
+
+impl<'de> Deserialize<'de> for Gauge {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        let value = u64::deserialize(deserializer)? as usize;
+        Ok(Gauge {
+            value: Arc::new(AtomicUsize::new(value)),
+        })
+    }
+}
+
 impl Default for Gauge {
     fn default() -> Self {
         Self::new()
@@ -150,7 +192,7 @@ impl Gauge {
 }
 
 /// Measure a potentially non contiguous duration of time
-#[derive(Debug, Clone, Serialize)]
+#[derive(Debug, Clone)]
 pub struct Time {
     /// elapsed time, in nanoseconds
     nanos: Arc<AtomicUsize>,
@@ -175,6 +217,27 @@ impl Display for Time {
     }
 }
 
+impl Serialize for Time {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        serializer.serialize_u64(self.nanos.load(Ordering::Relaxed) as u64)
+    }
+}
+
+impl<'de> Deserialize<'de> for Time {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        let nanos = u64::deserialize(deserializer)? as usize;
+        Ok(Time {
+            nanos: Arc::new(AtomicUsize::new(nanos)),
+        })
+    }
+}
+
 impl Time {
     /// Create a new [`Time`] wrapper suitable for recording elapsed
     /// times for operations.
@@ -242,7 +305,7 @@ pub struct Timestamp {
     timestamp: Arc<Mutex<Option<DateTime<Utc>>>>,
 }
 
-#[derive(Serialize, Debug)]
+#[derive(Serialize, Debug, Deserialize)]
 struct SerializableTimestamp {
     timestamp: Option<DateTime<Utc>>,
 }
@@ -258,6 +321,19 @@ impl Serialize for Timestamp {
     }
 }
 
+impl<'de> Deserialize<'de> for Timestamp {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        let SerializableTimestamp { timestamp } =
+            SerializableTimestamp::deserialize(deserializer)?;
+        Ok(Timestamp {
+            timestamp: Arc::new(Mutex::new(timestamp)),
+        })
+    }
+}
+
 impl Default for Timestamp {
     fn default() -> Self {
         Self::new()
@@ -384,7 +460,7 @@ impl Drop for ScopedTimerGuard<'_> {
 /// Among other differences, the metric types have different ways to
 /// logically interpret their underlying values and some metrics are
 /// so common they are given special treatment.
-#[derive(Debug, Clone, Serialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub enum MetricValue {
     /// Number of output rows produced: "output_rows" metric
     OutputRows(Count),
@@ -715,6 +791,7 @@ impl Display for MetricValue {
 #[cfg(test)]
 mod tests {
     use std::any::Any;
+    use std::time::Duration;
 
     use chrono::TimeZone;
     use datafusion_execution::memory_pool::units::MB;
@@ -978,4 +1055,125 @@ mod tests {
             "Expected ~10ms total, got {new_recorded} ns",
         );
     }
+
+    #[test]
+    fn test_count_serde() {
+        // Test serialization and deserialization of Count
+        let count = Count::new();
+        count.add(42);
+
+        // Serialize to JSON
+        let serialized = serde_json::to_string(&count).unwrap();
+        assert_eq!(serialized, "42");
+
+        // Deserialize from JSON
+        let deserialized: Count = serde_json::from_str(&serialized).unwrap();
+        assert_eq!(deserialized.value(), 42);
+
+        // Test zero value
+        let zero_count = Count::new();
+        let serialized_zero = serde_json::to_string(&zero_count).unwrap();
+        assert_eq!(serialized_zero, "0");
+        let deserialized_zero: Count = serde_json::from_str(&serialized_zero).unwrap();
+        assert_eq!(deserialized_zero.value(), 0);
+    }
+
+    #[test]
+    fn test_gauge_serde() {
+        // Test serialization and deserialization of Gauge
+        let gauge = Gauge::new();
+        gauge.add(100);
+        gauge.sub(30);
+
+        // Serialize to JSON
+        let serialized = serde_json::to_string(&gauge).unwrap();
+        assert_eq!(serialized, "70");
+
+        // Deserialize from JSON
+        let deserialized: Gauge = serde_json::from_str(&serialized).unwrap();
+        assert_eq!(deserialized.value(), 70);
+
+        // Test zero value
+        let zero_gauge = Gauge::new();
+        let serialized_zero = serde_json::to_string(&zero_gauge).unwrap();
+        assert_eq!(serialized_zero, "0");
+        let deserialized_zero: Gauge = serde_json::from_str(&serialized_zero).unwrap();
+        assert_eq!(deserialized_zero.value(), 0);
+
+        // Test large value
+        let large_gauge = Gauge::new();
+        large_gauge.set(1_000_000);
+        let serialized_large = serde_json::to_string(&large_gauge).unwrap();
+        assert_eq!(serialized_large, "1000000");
+        let deserialized_large: Gauge = serde_json::from_str(&serialized_large).unwrap();
+        assert_eq!(deserialized_large.value(), 1_000_000);
+    }
+
+    #[test]
+    fn test_time_serde() {
+        // Test serialization and deserialization of Time
+        let time = Time::new();
+        time.add_duration(Duration::from_secs(1)); // 1 second in nanoseconds
+
+        // Serialize to JSON
+        let serialized = serde_json::to_string(&time).unwrap();
+        assert_eq!(serialized, "1000000000");
+
+        // Deserialize from JSON
+        let deserialized: Time = serde_json::from_str(&serialized).unwrap();
+        assert_eq!(deserialized.value(), 1_000_000_000);
+
+        // Test zero time
+        let zero_time = Time::new();
+        let serialized_zero = serde_json::to_string(&zero_time).unwrap();
+        assert_eq!(serialized_zero, "0");
+        let deserialized_zero: Time = serde_json::from_str(&serialized_zero).unwrap();
+        assert_eq!(deserialized_zero.value(), 0);
+
+        // Test with duration
+        let duration_time = Time::new();
+        duration_time.add_duration(Duration::from_millis(500));
+        let serialized_duration = serde_json::to_string(&duration_time).unwrap();
+        let deserialized_duration: Time =
+            serde_json::from_str(&serialized_duration).unwrap();
+        // Should be approximately 500ms in nanoseconds (500_000_000)
+        assert!(deserialized_duration.value() >= 500_000_000);
+    }
+
+    #[test]
+    fn test_timestamp_serde() {
+        // Test serialization and deserialization of Timestamp with a specific time
+        let timestamp = Timestamp::new();
+        let test_time = Utc.with_ymd_and_hms(2023, 12, 25, 10, 30, 45).unwrap();
+        timestamp.set(test_time);
+
+        // Serialize to JSON
+        let serialized = serde_json::to_string(&timestamp).unwrap();
+
+        // Deserialize from JSON
+        let deserialized: Timestamp = serde_json::from_str(&serialized).unwrap();
+        assert_eq!(deserialized.value(), Some(test_time));
+
+        // Test uninitialized timestamp (None)
+        let empty_timestamp = Timestamp::new();
+        let serialized_empty = serde_json::to_string(&empty_timestamp).unwrap();
+        assert_eq!(serialized_empty, r#"{"timestamp":null}"#);
+
+        let deserialized_empty: Timestamp =
+            serde_json::from_str(&serialized_empty).unwrap();
+        assert_eq!(deserialized_empty.value(), None);
+
+        // Test current time
+        let now_timestamp = Timestamp::new();
+        now_timestamp.record(); // Sets to current time
+        let serialized_now = serde_json::to_string(&now_timestamp).unwrap();
+        let deserialized_now: Timestamp = serde_json::from_str(&serialized_now).unwrap();
+
+        // Should have a value and it should be recent
+        assert!(deserialized_now.value().is_some());
+        let deserialized_time = deserialized_now.value().unwrap();
+        let now = Utc::now();
+        // Should be within 1 second of now
+        assert!((now - deserialized_time).num_seconds().abs() < 1);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to