This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 722ccb9fd4 feat: Support serde for JsonSource PhysicalPlan (#15311) 722ccb9fd4 is described below commit 722ccb9fd4df0afc1447dbf1622f694b5a0fc367 Author: westhide <westhide....@gmail.com> AuthorDate: Thu Mar 20 23:53:47 2025 +0800 feat: Support serde for JsonSource PhysicalPlan (#15311) --- datafusion/proto/proto/datafusion.proto | 6 +- datafusion/proto/src/generated/pbjson.rs | 106 +++++++++++++++++++++ datafusion/proto/src/generated/prost.rs | 9 +- datafusion/proto/src/physical_plan/mod.rs | 31 +++++- .../proto/tests/cases/roundtrip_physical_plan.rs | 9 ++ 5 files changed, 158 insertions(+), 3 deletions(-) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 81e1a5c418..7c34afe7ff 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -273,7 +273,6 @@ message DmlNode{ INSERT_APPEND = 3; INSERT_OVERWRITE = 4; INSERT_REPLACE = 5; - } Type dml_type = 1; LogicalPlanNode input = 2; @@ -726,6 +725,7 @@ message PhysicalPlanNode { CsvSinkExecNode csv_sink = 28; ParquetSinkExecNode parquet_sink = 29; UnnestExecNode unnest = 30; + JsonScanExecNode json_scan = 31; } } @@ -1024,6 +1024,10 @@ message CsvScanExecNode { bool newlines_in_values = 7; } +message JsonScanExecNode { + FileScanExecConf base_conf = 1; +} + message AvroScanExecNode { FileScanExecConf base_conf = 1; } diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 664fb706f8..75e8ef55b7 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -8735,6 +8735,98 @@ impl<'de> serde::Deserialize<'de> for JoinOn { deserializer.deserialize_struct("datafusion.JoinOn", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for JsonScanExecNode { + #[allow(deprecated)] + fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.base_conf.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.JsonScanExecNode", len)?; + if let Some(v) = self.base_conf.as_ref() { + struct_ser.serialize_field("baseConf", v)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for JsonScanExecNode { + #[allow(deprecated)] + fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "base_conf", + "baseConf", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + BaseConf, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error> + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E> + where + E: serde::de::Error, + { + match value { + "baseConf" | "base_conf" => Ok(GeneratedField::BaseConf), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = JsonScanExecNode; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.JsonScanExecNode") + } + + fn visit_map<V>(self, mut map_: V) -> std::result::Result<JsonScanExecNode, V::Error> + where + V: serde::de::MapAccess<'de>, + { + let mut base_conf__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::BaseConf => { + if base_conf__.is_some() { + return Err(serde::de::Error::duplicate_field("baseConf")); + } + base_conf__ = map_.next_value()?; + } + } + } + Ok(JsonScanExecNode { + base_conf: base_conf__, + }) + } + } + deserializer.deserialize_struct("datafusion.JsonScanExecNode", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for JsonSink { #[allow(deprecated)] fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> @@ -15660,6 +15752,9 @@ impl serde::Serialize for PhysicalPlanNode { physical_plan_node::PhysicalPlanType::Unnest(v) => { struct_ser.serialize_field("unnest", v)?; } + physical_plan_node::PhysicalPlanType::JsonScan(v) => { + struct_ser.serialize_field("jsonScan", v)?; + } } } struct_ser.end() @@ -15716,6 +15811,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { "parquet_sink", "parquetSink", "unnest", + "json_scan", + "jsonScan", ]; #[allow(clippy::enum_variant_names)] @@ -15749,6 +15846,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { CsvSink, ParquetSink, Unnest, + JsonScan, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error> @@ -15799,6 +15897,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { "csvSink" | "csv_sink" => Ok(GeneratedField::CsvSink), "parquetSink" | "parquet_sink" => Ok(GeneratedField::ParquetSink), "unnest" => Ok(GeneratedField::Unnest), + "jsonScan" | "json_scan" => Ok(GeneratedField::JsonScan), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -16022,6 +16121,13 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode { return Err(serde::de::Error::duplicate_field("unnest")); } physical_plan_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_plan_node::PhysicalPlanType::Unnest) +; + } + GeneratedField::JsonScan => { + if physical_plan_type__.is_some() { + return Err(serde::de::Error::duplicate_field("jsonScan")); + } + physical_plan_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_plan_node::PhysicalPlanType::JsonScan) ; } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 8ab175cdf0..81c821c0d2 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1048,7 +1048,7 @@ pub mod table_reference { pub struct PhysicalPlanNode { #[prost( oneof = "physical_plan_node::PhysicalPlanType", - tags = "1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30" + tags = "1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31" )] pub physical_plan_type: ::core::option::Option<physical_plan_node::PhysicalPlanType>, } @@ -1116,6 +1116,8 @@ pub mod physical_plan_node { ParquetSink(::prost::alloc::boxed::Box<super::ParquetSinkExecNode>), #[prost(message, tag = "30")] Unnest(::prost::alloc::boxed::Box<super::UnnestExecNode>), + #[prost(message, tag = "31")] + JsonScan(super::JsonScanExecNode), } } #[derive(Clone, PartialEq, ::prost::Message)] @@ -1558,6 +1560,11 @@ pub mod csv_scan_exec_node { } } #[derive(Clone, PartialEq, ::prost::Message)] +pub struct JsonScanExecNode { + #[prost(message, optional, tag = "1")] + pub base_conf: ::core::option::Option<FileScanExecConf>, +} +#[derive(Clone, PartialEq, ::prost::Message)] pub struct AvroScanExecNode { #[prost(message, optional, tag = "1")] pub base_conf: ::core::option::Option<FileScanExecConf>, diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 60972ac54b..6562a9be45 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -33,7 +33,7 @@ use datafusion::datasource::file_format::parquet::ParquetSink; use datafusion::datasource::physical_plan::AvroSource; #[cfg(feature = "parquet")] use datafusion::datasource::physical_plan::ParquetSource; -use datafusion::datasource::physical_plan::{CsvSource, FileScanConfig}; +use datafusion::datasource::physical_plan::{CsvSource, FileScanConfig, JsonSource}; use datafusion::datasource::source::DataSourceExec; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::execution::FunctionRegistry; @@ -247,6 +247,15 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { .with_file_compression_type(FileCompressionType::UNCOMPRESSED); Ok(conf.build()) } + PhysicalPlanType::JsonScan(scan) => { + let scan_conf = parse_protobuf_file_scan_config( + scan.base_conf.as_ref().unwrap(), + registry, + extension_codec, + Arc::new(JsonSource::new()), + )?; + Ok(scan_conf.build()) + } #[cfg_attr(not(feature = "parquet"), allow(unused_variables))] PhysicalPlanType::ParquetScan(scan) => { #[cfg(feature = "parquet")] @@ -1684,6 +1693,26 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { } } + if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>() { + let data_source = data_source_exec.data_source(); + if let Some(scan_conf) = data_source.as_any().downcast_ref::<FileScanConfig>() + { + let source = scan_conf.file_source(); + if let Some(_json_source) = source.as_any().downcast_ref::<JsonSource>() { + return Ok(protobuf::PhysicalPlanNode { + physical_plan_type: Some(PhysicalPlanType::JsonScan( + protobuf::JsonScanExecNode { + base_conf: Some(serialize_file_scan_config( + scan_conf, + extension_codec, + )?), + }, + )), + }); + } + } + } + #[cfg(feature = "parquet")] if let Some(exec) = plan.downcast_ref::<DataSourceExec>() { let data_source_exec = exec.data_source(); diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 3361bec664..aeae39c4d0 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -1277,6 +1277,15 @@ fn roundtrip_analyze() -> Result<()> { ))) } +#[tokio::test] +async fn roundtrip_json_source() -> Result<()> { + let ctx = SessionContext::new(); + ctx.register_json("t1", "../core/tests/data/1.json", Default::default()) + .await?; + let plan = ctx.table("t1").await?.create_physical_plan().await?; + roundtrip_test(plan) +} + #[test] fn roundtrip_json_sink() -> Result<()> { let field_a = Field::new("plan_type", DataType::Utf8, false); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org