This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch metrics_serialize
in repository https://gitbox.apache.org/repos/asf/datafusion.git

commit b0cb20a27b8ebd32eff1ed351a064f8638e10f6e
Author: xudong963 <wxd963996...@gmail.com>
AuthorDate: Mon Aug 11 17:24:48 2025 +0800

    Support serialize for MetricValue
---
 Cargo.lock                                     | 50 ++++++++++++++++++++++++++
 datafusion/physical-plan/Cargo.toml            |  3 +-
 datafusion/physical-plan/src/metrics/custom.rs |  1 +
 datafusion/physical-plan/src/metrics/value.rs  | 22 ++++++++++--
 4 files changed, 73 insertions(+), 3 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index a49c7b898a..de7f6b2e38 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2521,6 +2521,7 @@ dependencies = [
  "serde",
  "tempfile",
  "tokio",
+ "typetag",
 ]
 
 [[package]]
@@ -2907,6 +2908,16 @@ version = "1.0.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f"
 
+[[package]]
+name = "erased-serde"
+version = "0.4.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e004d887f51fcb9fef17317a2f3525c887d8aa3f4f50fed920816a688284a5b7"
+dependencies = [
+ "serde",
+ "typeid",
+]
+
 [[package]]
 name = "errno"
 version = "0.3.13"
@@ -3759,6 +3770,15 @@ version = "3.0.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02"
 
+[[package]]
+name = "inventory"
+version = "0.3.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ab08d7cd2c5897f2c949e5383ea7c7db03fb19130ffcfbf7eda795137ae3cb83"
+dependencies = [
+ "rustversion",
+]
+
 [[package]]
 name = "io-uring"
 version = "0.7.9"
@@ -6840,12 +6860,42 @@ dependencies = [
  "syn 2.0.104",
 ]
 
+[[package]]
+name = "typeid"
+version = "1.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c"
+
 [[package]]
 name = "typenum"
 version = "1.18.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f"
 
+[[package]]
+name = "typetag"
+version = "0.2.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "73f22b40dd7bfe8c14230cf9702081366421890435b2d625fa92b4acc4c3de6f"
+dependencies = [
+ "erased-serde",
+ "inventory",
+ "once_cell",
+ "serde",
+ "typetag-impl",
+]
+
+[[package]]
+name = "typetag-impl"
+version = "0.2.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "35f5380909ffc31b4de4f4bdf96b877175a016aa2ca98cee39fcfd8c4d53d952"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.104",
+]
+
 [[package]]
 name = "typify"
 version = "0.4.2"
diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml
index ee9ef61204..85feab92f3 100644
--- a/datafusion/physical-plan/Cargo.toml
+++ b/datafusion/physical-plan/Cargo.toml
@@ -64,8 +64,9 @@ itertools = { workspace = true, features = ["use_std"] }
 log = { workspace = true }
 parking_lot = { workspace = true }
 pin-project-lite = "^0.2.7"
-tokio = { workspace = true }
 serde = { version = "1.0", features = ["derive"] }
+tokio = { workspace = true }
+typetag = "0.2"
 
 [dev-dependencies]
 criterion = { workspace = true, features = ["async_futures"] }
diff --git a/datafusion/physical-plan/src/metrics/custom.rs b/datafusion/physical-plan/src/metrics/custom.rs
index 546af6f333..df3a58f307 100644
--- a/datafusion/physical-plan/src/metrics/custom.rs
+++ b/datafusion/physical-plan/src/metrics/custom.rs
@@ -86,6 +86,7 @@ use std::{any::Any, fmt::Debug, fmt::Display, sync::Arc};
 /// ```
 ///
 /// [`MetricValue::Custom`]: super::MetricValue::Custom
+#[typetag::serde(tag = "type")]
 pub trait CustomMetricValue: Display + Debug + Send + Sync {
     /// Returns a new, zero-initialized version of this metric value.
     ///
diff --git a/datafusion/physical-plan/src/metrics/value.rs b/datafusion/physical-plan/src/metrics/value.rs
index 8252e81d8c..e1c67b4752 100644
--- a/datafusion/physical-plan/src/metrics/value.rs
+++ b/datafusion/physical-plan/src/metrics/value.rs
@@ -236,12 +236,28 @@ impl Time {
 
 /// Stores a single timestamp, stored as the number of nanoseconds
 /// elapsed from Jan 1, 1970 UTC
-#[derive(Debug, Clone, Serialize)]
+#[derive(Debug, Clone)]
 pub struct Timestamp {
     /// Time thing started
     timestamp: Arc<Mutex<Option<DateTime<Utc>>>>,
 }
 
+#[derive(Serialize, Debug)]
+struct SerializableTimestamp {
+    timestamp: Option<DateTime<Utc>>,
+}
+
+impl Serialize for Timestamp {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        let data = self.timestamp.lock();
+        let serializable = SerializableTimestamp { timestamp: *data };
+        serializable.serialize(serializer)
+    }
+}
+
 impl Default for Timestamp {
     fn default() -> Self {
         Self::new()
@@ -702,10 +718,11 @@ mod tests {
 
     use chrono::TimeZone;
     use datafusion_execution::memory_pool::units::MB;
+    use serde::Deserialize;
 
     use super::*;
 
-    #[derive(Debug, Default)]
+    #[derive(Debug, Default, Serialize, Deserialize)]
     pub struct CustomCounter {
         count: AtomicUsize,
     }
@@ -716,6 +733,7 @@ mod tests {
         }
     }
 
+    #[typetag::serde]
     impl CustomMetricValue for CustomCounter {
         fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
             Arc::new(CustomCounter::default())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to