This is an automated email from the ASF dual-hosted git repository. xudong963 pushed a commit to branch metrics_serialize in repository https://gitbox.apache.org/repos/asf/datafusion.git
commit b0cb20a27b8ebd32eff1ed351a064f8638e10f6e Author: xudong963 <wxd963996...@gmail.com> AuthorDate: Mon Aug 11 17:24:48 2025 +0800 Support serialize for MetricValue --- Cargo.lock | 50 ++++++++++++++++++++++++++ datafusion/physical-plan/Cargo.toml | 3 +- datafusion/physical-plan/src/metrics/custom.rs | 1 + datafusion/physical-plan/src/metrics/value.rs | 22 ++++++++++-- 4 files changed, 73 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a49c7b898a..de7f6b2e38 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2521,6 +2521,7 @@ dependencies = [ "serde", "tempfile", "tokio", + "typetag", ] [[package]] @@ -2907,6 +2908,16 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" +[[package]] +name = "erased-serde" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e004d887f51fcb9fef17317a2f3525c887d8aa3f4f50fed920816a688284a5b7" +dependencies = [ + "serde", + "typeid", +] + [[package]] name = "errno" version = "0.3.13" @@ -3759,6 +3770,15 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "inventory" +version = "0.3.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab08d7cd2c5897f2c949e5383ea7c7db03fb19130ffcfbf7eda795137ae3cb83" +dependencies = [ + "rustversion", +] + [[package]] name = "io-uring" version = "0.7.9" @@ -6840,12 +6860,42 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "typeid" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c" + [[package]] name = "typenum" version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f" +[[package]] +name = "typetag" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73f22b40dd7bfe8c14230cf9702081366421890435b2d625fa92b4acc4c3de6f" +dependencies = [ + "erased-serde", + "inventory", + "once_cell", + "serde", + "typetag-impl", +] + +[[package]] +name = "typetag-impl" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35f5380909ffc31b4de4f4bdf96b877175a016aa2ca98cee39fcfd8c4d53d952" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "typify" version = "0.4.2" diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index ee9ef61204..85feab92f3 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -64,8 +64,9 @@ itertools = { workspace = true, features = ["use_std"] } log = { workspace = true } parking_lot = { workspace = true } pin-project-lite = "^0.2.7" -tokio = { workspace = true } serde = { version = "1.0", features = ["derive"] } +tokio = { workspace = true } +typetag = "0.2" [dev-dependencies] criterion = { workspace = true, features = ["async_futures"] } diff --git a/datafusion/physical-plan/src/metrics/custom.rs b/datafusion/physical-plan/src/metrics/custom.rs index 546af6f333..df3a58f307 100644 --- a/datafusion/physical-plan/src/metrics/custom.rs +++ b/datafusion/physical-plan/src/metrics/custom.rs @@ -86,6 +86,7 @@ use std::{any::Any, fmt::Debug, fmt::Display, sync::Arc}; /// ``` /// /// [`MetricValue::Custom`]: super::MetricValue::Custom +#[typetag::serde(tag = "type")] pub trait CustomMetricValue: Display + Debug + Send + Sync { /// Returns a new, zero-initialized version of this metric value. /// diff --git a/datafusion/physical-plan/src/metrics/value.rs b/datafusion/physical-plan/src/metrics/value.rs index 8252e81d8c..e1c67b4752 100644 --- a/datafusion/physical-plan/src/metrics/value.rs +++ b/datafusion/physical-plan/src/metrics/value.rs @@ -236,12 +236,28 @@ impl Time { /// Stores a single timestamp, stored as the number of nanoseconds /// elapsed from Jan 1, 1970 UTC -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone)] pub struct Timestamp { /// Time thing started timestamp: Arc<Mutex<Option<DateTime<Utc>>>>, } +#[derive(Serialize, Debug)] +struct SerializableTimestamp { + timestamp: Option<DateTime<Utc>>, +} + +impl Serialize for Timestamp { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let data = self.timestamp.lock(); + let serializable = SerializableTimestamp { timestamp: *data }; + serializable.serialize(serializer) + } +} + impl Default for Timestamp { fn default() -> Self { Self::new() @@ -702,10 +718,11 @@ mod tests { use chrono::TimeZone; use datafusion_execution::memory_pool::units::MB; + use serde::Deserialize; use super::*; - #[derive(Debug, Default)] + #[derive(Debug, Default, Serialize, Deserialize)] pub struct CustomCounter { count: AtomicUsize, } @@ -716,6 +733,7 @@ mod tests { } } + #[typetag::serde] impl CustomMetricValue for CustomCounter { fn new_empty(&self) -> Arc<dyn CustomMetricValue> { Arc::new(CustomCounter::default()) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org