This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 5e0b2d0554 fix(datafusion-proto): support serializing/deserializing 
ArrowFormat tables (#16875)
5e0b2d0554 is described below

commit 5e0b2d0554af3a38bb93ccd305a410c5df7905f2
Author: Colin Marc <h...@colinmarc.com>
AuthorDate: Sat Jul 26 08:56:33 2025 +0200

    fix(datafusion-proto): support serializing/deserializing ArrowFormat tables 
(#16875)
    
    Fixes #16874
---
 .../proto-common/proto/datafusion_common.proto     |   2 +
 datafusion/proto-common/src/generated/pbjson.rs    |  71 +++++++++++++++++++++
 datafusion/proto-common/src/generated/prost.rs     |   2 +
 datafusion/proto/proto/datafusion.proto            |   1 +
 .../proto/src/generated/datafusion_proto_common.rs |   2 +
 datafusion/proto/src/generated/pbjson.rs           |  13 ++++
 datafusion/proto/src/generated/prost.rs            |   7 +-
 datafusion/proto/src/lib.rs                        |   5 +-
 datafusion/proto/src/logical_plan/mod.rs           |  17 +++--
 .../proto/tests/cases/roundtrip_logical_plan.rs    |  14 ++++
 datafusion/proto/tests/testdata/test.arrow         | Bin 0 -> 1842 bytes
 11 files changed, 127 insertions(+), 7 deletions(-)

diff --git a/datafusion/proto-common/proto/datafusion_common.proto 
b/datafusion/proto-common/proto/datafusion_common.proto
index 81fc9cceb7..8cb2726058 100644
--- a/datafusion/proto-common/proto/datafusion_common.proto
+++ b/datafusion/proto-common/proto/datafusion_common.proto
@@ -55,6 +55,8 @@ message NdJsonFormat {
   JsonOptions options = 1;
 }
 
+message ArrowFormat {}
+
 
 message PrimaryKeyConstraint{
   repeated uint64 indices = 1;
diff --git a/datafusion/proto-common/src/generated/pbjson.rs 
b/datafusion/proto-common/src/generated/pbjson.rs
index c3b6686df0..f35fd15946 100644
--- a/datafusion/proto-common/src/generated/pbjson.rs
+++ b/datafusion/proto-common/src/generated/pbjson.rs
@@ -1,3 +1,74 @@
+impl serde::Serialize for ArrowFormat {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let len = 0;
+        let struct_ser = 
serializer.serialize_struct("datafusion_common.ArrowFormat", len)?;
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for ArrowFormat {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField {
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut 
std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> 
std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                            Err(serde::de::Error::unknown_field(value, FIELDS))
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = ArrowFormat;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result {
+                formatter.write_str("struct datafusion_common.ArrowFormat")
+            }
+
+            fn visit_map<V>(self, mut map_: V) -> 
std::result::Result<ArrowFormat, V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                while map_.next_key::<GeneratedField>()?.is_some() {
+                    let _ = map_.next_value::<serde::de::IgnoredAny>()?;
+                }
+                Ok(ArrowFormat {
+                })
+            }
+        }
+        deserializer.deserialize_struct("datafusion_common.ArrowFormat", 
FIELDS, GeneratedVisitor)
+    }
+}
 impl serde::Serialize for ArrowOptions {
     #[allow(deprecated)]
     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
diff --git a/datafusion/proto-common/src/generated/prost.rs 
b/datafusion/proto-common/src/generated/prost.rs
index 411d72af4c..ac4a9ea4be 100644
--- a/datafusion/proto-common/src/generated/prost.rs
+++ b/datafusion/proto-common/src/generated/prost.rs
@@ -45,6 +45,8 @@ pub struct NdJsonFormat {
     #[prost(message, optional, tag = "1")]
     pub options: ::core::option::Option<JsonOptions>,
 }
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
+pub struct ArrowFormat {}
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PrimaryKeyConstraint {
     #[prost(uint64, repeated, tag = "1")]
diff --git a/datafusion/proto/proto/datafusion.proto 
b/datafusion/proto/proto/datafusion.proto
index 85a565c0b2..666a8c7d1f 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -98,6 +98,7 @@ message ListingTableScanNode {
     datafusion_common.ParquetFormat parquet = 11;
     datafusion_common.AvroFormat avro = 12;
     datafusion_common.NdJsonFormat json = 15;
+    datafusion_common.ArrowFormat arrow = 16;
   }
   repeated SortExprNodeCollection file_sort_order = 13;
 }
diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs 
b/datafusion/proto/src/generated/datafusion_proto_common.rs
index 411d72af4c..ac4a9ea4be 100644
--- a/datafusion/proto/src/generated/datafusion_proto_common.rs
+++ b/datafusion/proto/src/generated/datafusion_proto_common.rs
@@ -45,6 +45,8 @@ pub struct NdJsonFormat {
     #[prost(message, optional, tag = "1")]
     pub options: ::core::option::Option<JsonOptions>,
 }
+#[derive(Clone, Copy, PartialEq, ::prost::Message)]
+pub struct ArrowFormat {}
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct PrimaryKeyConstraint {
     #[prost(uint64, repeated, tag = "1")]
diff --git a/datafusion/proto/src/generated/pbjson.rs 
b/datafusion/proto/src/generated/pbjson.rs
index 83f66ec22c..c2e6d8ef59 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -9907,6 +9907,9 @@ impl serde::Serialize for ListingTableScanNode {
                 listing_table_scan_node::FileFormatType::Json(v) => {
                     struct_ser.serialize_field("json", v)?;
                 }
+                listing_table_scan_node::FileFormatType::Arrow(v) => {
+                    struct_ser.serialize_field("arrow", v)?;
+                }
             }
         }
         struct_ser.end()
@@ -9939,6 +9942,7 @@ impl<'de> serde::Deserialize<'de> for 
ListingTableScanNode {
             "parquet",
             "avro",
             "json",
+            "arrow",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -9957,6 +9961,7 @@ impl<'de> serde::Deserialize<'de> for 
ListingTableScanNode {
             Parquet,
             Avro,
             Json,
+            Arrow,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
@@ -9992,6 +9997,7 @@ impl<'de> serde::Deserialize<'de> for 
ListingTableScanNode {
                             "parquet" => Ok(GeneratedField::Parquet),
                             "avro" => Ok(GeneratedField::Avro),
                             "json" => Ok(GeneratedField::Json),
+                            "arrow" => Ok(GeneratedField::Arrow),
                             _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
                         }
                     }
@@ -10112,6 +10118,13 @@ impl<'de> serde::Deserialize<'de> for 
ListingTableScanNode {
                                 return 
Err(serde::de::Error::duplicate_field("json"));
                             }
                             file_format_type__ = 
map_.next_value::<::std::option::Option<_>>()?.map(listing_table_scan_node::FileFormatType::Json)
+;
+                        }
+                        GeneratedField::Arrow => {
+                            if file_format_type__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("arrow"));
+                            }
+                            file_format_type__ = 
map_.next_value::<::std::option::Option<_>>()?.map(listing_table_scan_node::FileFormatType::Arrow)
 ;
                         }
                     }
diff --git a/datafusion/proto/src/generated/prost.rs 
b/datafusion/proto/src/generated/prost.rs
index cbf6b3b2d4..35491366dc 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -123,7 +123,10 @@ pub struct ListingTableScanNode {
     pub target_partitions: u32,
     #[prost(message, repeated, tag = "13")]
     pub file_sort_order: ::prost::alloc::vec::Vec<SortExprNodeCollection>,
-    #[prost(oneof = "listing_table_scan_node::FileFormatType", tags = "10, 11, 
12, 15")]
+    #[prost(
+        oneof = "listing_table_scan_node::FileFormatType",
+        tags = "10, 11, 12, 15, 16"
+    )]
     pub file_format_type: ::core::option::Option<
         listing_table_scan_node::FileFormatType,
     >,
@@ -140,6 +143,8 @@ pub mod listing_table_scan_node {
         Avro(super::super::datafusion_common::AvroFormat),
         #[prost(message, tag = "15")]
         Json(super::super::datafusion_common::NdJsonFormat),
+        #[prost(message, tag = "16")]
+        Arrow(super::super::datafusion_common::ArrowFormat),
     }
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs
index 2df162f21e..b4d72aa1b6 100644
--- a/datafusion/proto/src/lib.rs
+++ b/datafusion/proto/src/lib.rs
@@ -130,8 +130,9 @@ pub mod protobuf {
     pub use crate::generated::datafusion::*;
     pub use datafusion_proto_common::common::proto_error;
     pub use datafusion_proto_common::protobuf_common::{
-        ArrowOptions, ArrowType, AvroFormat, AvroOptions, CsvFormat, DfSchema,
-        EmptyMessage, Field, JoinSide, NdJsonFormat, ParquetFormat, 
ScalarValue, Schema,
+        ArrowFormat, ArrowOptions, ArrowType, AvroFormat, AvroOptions, 
CsvFormat,
+        DfSchema, EmptyMessage, Field, JoinSide, NdJsonFormat, ParquetFormat,
+        ScalarValue, Schema,
     };
     pub use datafusion_proto_common::{FromProtoError, ToProtoError};
 }
diff --git a/datafusion/proto/src/logical_plan/mod.rs 
b/datafusion/proto/src/logical_plan/mod.rs
index 9915d3617f..576a51707c 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -35,6 +35,7 @@ use crate::{
 use crate::protobuf::{proto_error, ToProtoError};
 use arrow::datatypes::{DataType, Schema, SchemaBuilder, SchemaRef};
 use datafusion::datasource::cte_worktable::CteWorkTable;
+use datafusion::datasource::file_format::arrow::ArrowFormat;
 #[cfg(feature = "avro")]
 use datafusion::datasource::file_format::avro::AvroFormat;
 #[cfg(feature = "parquet")]
@@ -439,13 +440,16 @@ impl AsLogicalPlan for LogicalPlanNode {
                         }
                         #[cfg_attr(not(feature = "avro"), 
allow(unused_variables))]
                         FileFormatType::Avro(..) => {
-                            #[cfg(feature = "avro")] 
+                            #[cfg(feature = "avro")]
                             {
                                 Arc::new(AvroFormat)
                             }
                             #[cfg(not(feature = "avro"))]
                             panic!("Unable to process avro file since `avro` 
feature is not enabled");
                         }
+                        FileFormatType::Arrow(..) => {
+                            Arc::new(ArrowFormat)
+                        }
                     };
 
                 let table_paths = &scan
@@ -1057,13 +1061,18 @@ impl AsLogicalPlan for LogicalPlanNode {
                                 Some(FileFormatType::Avro(protobuf::AvroFormat 
{}))
                         }
 
+                        if any.is::<ArrowFormat>() {
+                            maybe_some_type =
+                                
Some(FileFormatType::Arrow(protobuf::ArrowFormat {}))
+                        }
+
                         if let Some(file_format_type) = maybe_some_type {
                             file_format_type
                         } else {
                             return Err(proto_error(format!(
-                            "Error converting file format, {:?} is invalid as 
a datafusion format.",
-                            listing_table.options().format
-                        )));
+                                "Error deserializing unknown file format: 
{:?}",
+                                listing_table.options().format
+                            )));
                         }
                     };
 
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 6c51d553fe..170c2675f7 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -28,6 +28,7 @@ use datafusion::datasource::file_format::json::{JsonFormat, 
JsonFormatFactory};
 use datafusion::datasource::listing::{
     ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
 };
+use datafusion::execution::options::ArrowReadOptions;
 use datafusion::optimizer::eliminate_nested_union::EliminateNestedUnion;
 use datafusion::optimizer::Optimizer;
 use datafusion_common::parsers::CompressionTypeVariant;
@@ -2656,3 +2657,16 @@ async fn roundtrip_custom_listing_tables_schema() -> 
Result<()> {
     assert_eq!(plan, new_plan);
     Ok(())
 }
+
+#[tokio::test]
+async fn roundtrip_arrow_scan() -> Result<()> {
+    let ctx = SessionContext::new();
+    let plan = ctx
+        .read_arrow("tests/testdata/test.arrow", ArrowReadOptions::default())
+        .await?
+        .into_optimized_plan()?;
+    let bytes = logical_plan_to_bytes(&plan)?;
+    let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
+    assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}"));
+    Ok(())
+}
diff --git a/datafusion/proto/tests/testdata/test.arrow 
b/datafusion/proto/tests/testdata/test.arrow
new file mode 100644
index 0000000000..5314d9eea1
Binary files /dev/null and b/datafusion/proto/tests/testdata/test.arrow differ


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to