This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 722ccb9fd4 feat: Support serde for JsonSource PhysicalPlan (#15311)
722ccb9fd4 is described below

commit 722ccb9fd4df0afc1447dbf1622f694b5a0fc367
Author: westhide <westhide....@gmail.com>
AuthorDate: Thu Mar 20 23:53:47 2025 +0800

    feat: Support serde for JsonSource PhysicalPlan (#15311)
---
 datafusion/proto/proto/datafusion.proto            |   6 +-
 datafusion/proto/src/generated/pbjson.rs           | 106 +++++++++++++++++++++
 datafusion/proto/src/generated/prost.rs            |   9 +-
 datafusion/proto/src/physical_plan/mod.rs          |  31 +++++-
 .../proto/tests/cases/roundtrip_physical_plan.rs   |   9 ++
 5 files changed, 158 insertions(+), 3 deletions(-)

diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 81e1a5c418..7c34afe7ff 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -273,7 +273,6 @@ message DmlNode{
     INSERT_APPEND = 3;
     INSERT_OVERWRITE = 4;
     INSERT_REPLACE = 5;
-    
   }
   Type dml_type = 1;
   LogicalPlanNode input = 2;
@@ -726,6 +725,7 @@ message PhysicalPlanNode {
     CsvSinkExecNode csv_sink = 28;
     ParquetSinkExecNode parquet_sink = 29;
     UnnestExecNode unnest = 30;
+    JsonScanExecNode json_scan = 31;
   }
 }
 
@@ -1024,6 +1024,10 @@ message CsvScanExecNode {
   bool newlines_in_values = 7;
 }
 
+message JsonScanExecNode {
+  FileScanExecConf base_conf = 1;
+}
+
 message AvroScanExecNode {
   FileScanExecConf base_conf = 1;
 }
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 664fb706f8..75e8ef55b7 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -8735,6 +8735,98 @@ impl<'de> serde::Deserialize<'de> for JoinOn {
         deserializer.deserialize_struct("datafusion.JoinOn", FIELDS, 
GeneratedVisitor)
     }
 }
+impl serde::Serialize for JsonScanExecNode {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0;
+        if self.base_conf.is_some() {
+            len += 1;
+        }
+        let mut struct_ser = 
serializer.serialize_struct("datafusion.JsonScanExecNode", len)?;
+        if let Some(v) = self.base_conf.as_ref() {
+            struct_ser.serialize_field("baseConf", v)?;
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for JsonScanExecNode {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "base_conf",
+            "baseConf",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField {
+            BaseConf,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut 
std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> 
std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "baseConf" | "base_conf" => 
Ok(GeneratedField::BaseConf),
+                            _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = JsonScanExecNode;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result {
+                formatter.write_str("struct datafusion.JsonScanExecNode")
+            }
+
+            fn visit_map<V>(self, mut map_: V) -> 
std::result::Result<JsonScanExecNode, V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                let mut base_conf__ = None;
+                while let Some(k) = map_.next_key()? {
+                    match k {
+                        GeneratedField::BaseConf => {
+                            if base_conf__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("baseConf"));
+                            }
+                            base_conf__ = map_.next_value()?;
+                        }
+                    }
+                }
+                Ok(JsonScanExecNode {
+                    base_conf: base_conf__,
+                })
+            }
+        }
+        deserializer.deserialize_struct("datafusion.JsonScanExecNode", FIELDS, 
GeneratedVisitor)
+    }
+}
 impl serde::Serialize for JsonSink {
     #[allow(deprecated)]
     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
@@ -15660,6 +15752,9 @@ impl serde::Serialize for PhysicalPlanNode {
                 physical_plan_node::PhysicalPlanType::Unnest(v) => {
                     struct_ser.serialize_field("unnest", v)?;
                 }
+                physical_plan_node::PhysicalPlanType::JsonScan(v) => {
+                    struct_ser.serialize_field("jsonScan", v)?;
+                }
             }
         }
         struct_ser.end()
@@ -15716,6 +15811,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode {
             "parquet_sink",
             "parquetSink",
             "unnest",
+            "json_scan",
+            "jsonScan",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -15749,6 +15846,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode {
             CsvSink,
             ParquetSink,
             Unnest,
+            JsonScan,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
@@ -15799,6 +15897,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode {
                             "csvSink" | "csv_sink" => 
Ok(GeneratedField::CsvSink),
                             "parquetSink" | "parquet_sink" => 
Ok(GeneratedField::ParquetSink),
                             "unnest" => Ok(GeneratedField::Unnest),
+                            "jsonScan" | "json_scan" => 
Ok(GeneratedField::JsonScan),
                             _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
                         }
                     }
@@ -16022,6 +16121,13 @@ impl<'de> serde::Deserialize<'de> for PhysicalPlanNode {
                                 return 
Err(serde::de::Error::duplicate_field("unnest"));
                             }
                             physical_plan_type__ = 
map_.next_value::<::std::option::Option<_>>()?.map(physical_plan_node::PhysicalPlanType::Unnest)
+;
+                        }
+                        GeneratedField::JsonScan => {
+                            if physical_plan_type__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("jsonScan"));
+                            }
+                            physical_plan_type__ = 
map_.next_value::<::std::option::Option<_>>()?.map(physical_plan_node::PhysicalPlanType::JsonScan)
 ;
                         }
                     }
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 8ab175cdf0..81c821c0d2 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1048,7 +1048,7 @@ pub mod table_reference {
 pub struct PhysicalPlanNode {
     #[prost(
         oneof = "physical_plan_node::PhysicalPlanType",
-        tags = "1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 
19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30"
+        tags = "1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 
19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31"
     )]
     pub physical_plan_type: 
::core::option::Option<physical_plan_node::PhysicalPlanType>,
 }
@@ -1116,6 +1116,8 @@ pub mod physical_plan_node {
         ParquetSink(::prost::alloc::boxed::Box<super::ParquetSinkExecNode>),
         #[prost(message, tag = "30")]
         Unnest(::prost::alloc::boxed::Box<super::UnnestExecNode>),
+        #[prost(message, tag = "31")]
+        JsonScan(super::JsonScanExecNode),
     }
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
@@ -1558,6 +1560,11 @@ pub mod csv_scan_exec_node {
     }
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
+pub struct JsonScanExecNode {
+    #[prost(message, optional, tag = "1")]
+    pub base_conf: ::core::option::Option<FileScanExecConf>,
+}
+#[derive(Clone, PartialEq, ::prost::Message)]
 pub struct AvroScanExecNode {
     #[prost(message, optional, tag = "1")]
     pub base_conf: ::core::option::Option<FileScanExecConf>,
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 60972ac54b..6562a9be45 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -33,7 +33,7 @@ use datafusion::datasource::file_format::parquet::ParquetSink;
 use datafusion::datasource::physical_plan::AvroSource;
 #[cfg(feature = "parquet")]
 use datafusion::datasource::physical_plan::ParquetSource;
-use datafusion::datasource::physical_plan::{CsvSource, FileScanConfig};
+use datafusion::datasource::physical_plan::{CsvSource, FileScanConfig, 
JsonSource};
 use datafusion::datasource::source::DataSourceExec;
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::execution::FunctionRegistry;
@@ -247,6 +247,15 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                 .with_file_compression_type(FileCompressionType::UNCOMPRESSED);
                 Ok(conf.build())
             }
+            PhysicalPlanType::JsonScan(scan) => {
+                let scan_conf = parse_protobuf_file_scan_config(
+                    scan.base_conf.as_ref().unwrap(),
+                    registry,
+                    extension_codec,
+                    Arc::new(JsonSource::new()),
+                )?;
+                Ok(scan_conf.build())
+            }
             #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
             PhysicalPlanType::ParquetScan(scan) => {
                 #[cfg(feature = "parquet")]
@@ -1684,6 +1693,26 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
             }
         }
 
+        if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>() {
+            let data_source = data_source_exec.data_source();
+            if let Some(scan_conf) = 
data_source.as_any().downcast_ref::<FileScanConfig>()
+            {
+                let source = scan_conf.file_source();
+                if let Some(_json_source) = 
source.as_any().downcast_ref::<JsonSource>() {
+                    return Ok(protobuf::PhysicalPlanNode {
+                        physical_plan_type: Some(PhysicalPlanType::JsonScan(
+                            protobuf::JsonScanExecNode {
+                                base_conf: Some(serialize_file_scan_config(
+                                    scan_conf,
+                                    extension_codec,
+                                )?),
+                            },
+                        )),
+                    });
+                }
+            }
+        }
+
         #[cfg(feature = "parquet")]
         if let Some(exec) = plan.downcast_ref::<DataSourceExec>() {
             let data_source_exec = exec.data_source();
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 3361bec664..aeae39c4d0 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -1277,6 +1277,15 @@ fn roundtrip_analyze() -> Result<()> {
     )))
 }
 
+#[tokio::test]
+async fn roundtrip_json_source() -> Result<()> {
+    let ctx = SessionContext::new();
+    ctx.register_json("t1", "../core/tests/data/1.json", Default::default())
+        .await?;
+    let plan = ctx.table("t1").await?.create_physical_plan().await?;
+    roundtrip_test(plan)
+}
+
 #[test]
 fn roundtrip_json_sink() -> Result<()> {
     let field_a = Field::new("plan_type", DataType::Utf8, false);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to