This is an automated email from the ASF dual-hosted git repository. xudong963 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 5e0b2d0554 fix(datafusion-proto): support serializing/deserilizing ArrowFormat tables (#16875) 5e0b2d0554 is described below commit 5e0b2d0554af3a38bb93ccd305a410c5df7905f2 Author: Colin Marc <h...@colinmarc.com> AuthorDate: Sat Jul 26 08:56:33 2025 +0200 fix(datafusion-proto): support serializing/deserilizing ArrowFormat tables (#16875) Fixes #16874 --- .../proto-common/proto/datafusion_common.proto | 2 + datafusion/proto-common/src/generated/pbjson.rs | 71 +++++++++++++++++++++ datafusion/proto-common/src/generated/prost.rs | 2 + datafusion/proto/proto/datafusion.proto | 1 + .../proto/src/generated/datafusion_proto_common.rs | 2 + datafusion/proto/src/generated/pbjson.rs | 13 ++++ datafusion/proto/src/generated/prost.rs | 7 +- datafusion/proto/src/lib.rs | 5 +- datafusion/proto/src/logical_plan/mod.rs | 17 +++-- .../proto/tests/cases/roundtrip_logical_plan.rs | 14 ++++ datafusion/proto/tests/testdata/test.arrow | Bin 0 -> 1842 bytes 11 files changed, 127 insertions(+), 7 deletions(-) diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 81fc9cceb7..8cb2726058 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -55,6 +55,8 @@ message NdJsonFormat { JsonOptions options = 1; } +message ArrowFormat {} + message PrimaryKeyConstraint{ repeated uint64 indices = 1; diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index c3b6686df0..f35fd15946 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -1,3 +1,74 @@ +impl serde::Serialize for ArrowFormat { + #[allow(deprecated)] + fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let len = 0; + let struct_ser = serializer.serialize_struct("datafusion_common.ArrowFormat", len)?; + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for ArrowFormat { + #[allow(deprecated)] + fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error> + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E> + where + E: serde::de::Error, + { + Err(serde::de::Error::unknown_field(value, FIELDS)) + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = ArrowFormat; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion_common.ArrowFormat") + } + + fn visit_map<V>(self, mut map_: V) -> std::result::Result<ArrowFormat, V::Error> + where + V: serde::de::MapAccess<'de>, + { + while map_.next_key::<GeneratedField>()?.is_some() { + let _ = map_.next_value::<serde::de::IgnoredAny>()?; + } + Ok(ArrowFormat { + }) + } + } + deserializer.deserialize_struct("datafusion_common.ArrowFormat", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for ArrowOptions { #[allow(deprecated)] fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index 411d72af4c..ac4a9ea4be 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -45,6 +45,8 @@ pub struct NdJsonFormat { #[prost(message, optional, tag = "1")] pub options: ::core::option::Option<JsonOptions>, } +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct ArrowFormat {} #[derive(Clone, PartialEq, ::prost::Message)] pub struct PrimaryKeyConstraint { #[prost(uint64, repeated, tag = "1")] diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 85a565c0b2..666a8c7d1f 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -98,6 +98,7 @@ message ListingTableScanNode { datafusion_common.ParquetFormat parquet = 11; datafusion_common.AvroFormat avro = 12; datafusion_common.NdJsonFormat json = 15; + datafusion_common.ArrowFormat arrow = 16; } repeated SortExprNodeCollection file_sort_order = 13; } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index 411d72af4c..ac4a9ea4be 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -45,6 +45,8 @@ pub struct NdJsonFormat { #[prost(message, optional, tag = "1")] pub options: ::core::option::Option<JsonOptions>, } +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct ArrowFormat {} #[derive(Clone, PartialEq, ::prost::Message)] pub struct PrimaryKeyConstraint { #[prost(uint64, repeated, tag = "1")] diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 83f66ec22c..c2e6d8ef59 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -9907,6 +9907,9 @@ impl serde::Serialize for ListingTableScanNode { listing_table_scan_node::FileFormatType::Json(v) => { struct_ser.serialize_field("json", v)?; } + listing_table_scan_node::FileFormatType::Arrow(v) => { + struct_ser.serialize_field("arrow", v)?; + } } } struct_ser.end() @@ -9939,6 +9942,7 @@ impl<'de> serde::Deserialize<'de> for ListingTableScanNode { "parquet", "avro", "json", + "arrow", ]; #[allow(clippy::enum_variant_names)] @@ -9957,6 +9961,7 @@ impl<'de> serde::Deserialize<'de> for ListingTableScanNode { Parquet, Avro, Json, + Arrow, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error> @@ -9992,6 +9997,7 @@ impl<'de> serde::Deserialize<'de> for ListingTableScanNode { "parquet" => Ok(GeneratedField::Parquet), "avro" => Ok(GeneratedField::Avro), "json" => Ok(GeneratedField::Json), + "arrow" => Ok(GeneratedField::Arrow), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -10112,6 +10118,13 @@ impl<'de> serde::Deserialize<'de> for ListingTableScanNode { return Err(serde::de::Error::duplicate_field("json")); } file_format_type__ = map_.next_value::<::std::option::Option<_>>()?.map(listing_table_scan_node::FileFormatType::Json) +; + } + GeneratedField::Arrow => { + if file_format_type__.is_some() { + return Err(serde::de::Error::duplicate_field("arrow")); + } + file_format_type__ = map_.next_value::<::std::option::Option<_>>()?.map(listing_table_scan_node::FileFormatType::Arrow) ; } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index cbf6b3b2d4..35491366dc 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -123,7 +123,10 @@ pub struct ListingTableScanNode { pub target_partitions: u32, #[prost(message, repeated, tag = "13")] pub file_sort_order: ::prost::alloc::vec::Vec<SortExprNodeCollection>, - #[prost(oneof = "listing_table_scan_node::FileFormatType", tags = "10, 11, 12, 15")] + #[prost( + oneof = "listing_table_scan_node::FileFormatType", + tags = "10, 11, 12, 15, 16" + )] pub file_format_type: ::core::option::Option< listing_table_scan_node::FileFormatType, >, @@ -140,6 +143,8 @@ pub mod listing_table_scan_node { Avro(super::super::datafusion_common::AvroFormat), #[prost(message, tag = "15")] Json(super::super::datafusion_common::NdJsonFormat), + #[prost(message, tag = "16")] + Arrow(super::super::datafusion_common::ArrowFormat), } } #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs index 2df162f21e..b4d72aa1b6 100644 --- a/datafusion/proto/src/lib.rs +++ b/datafusion/proto/src/lib.rs @@ -130,8 +130,9 @@ pub mod protobuf { pub use crate::generated::datafusion::*; pub use datafusion_proto_common::common::proto_error; pub use datafusion_proto_common::protobuf_common::{ - ArrowOptions, ArrowType, AvroFormat, AvroOptions, CsvFormat, DfSchema, - EmptyMessage, Field, JoinSide, NdJsonFormat, ParquetFormat, ScalarValue, Schema, + ArrowFormat, ArrowOptions, ArrowType, AvroFormat, AvroOptions, CsvFormat, + DfSchema, EmptyMessage, Field, JoinSide, NdJsonFormat, ParquetFormat, + ScalarValue, Schema, }; pub use datafusion_proto_common::{FromProtoError, ToProtoError}; } diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 9915d3617f..576a51707c 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -35,6 +35,7 @@ use crate::{ use crate::protobuf::{proto_error, ToProtoError}; use arrow::datatypes::{DataType, Schema, SchemaBuilder, SchemaRef}; use datafusion::datasource::cte_worktable::CteWorkTable; +use datafusion::datasource::file_format::arrow::ArrowFormat; #[cfg(feature = "avro")] use datafusion::datasource::file_format::avro::AvroFormat; #[cfg(feature = "parquet")] @@ -439,13 +440,16 @@ impl AsLogicalPlan for LogicalPlanNode { } #[cfg_attr(not(feature = "avro"), allow(unused_variables))] FileFormatType::Avro(..) => { - #[cfg(feature = "avro")] + #[cfg(feature = "avro")] { Arc::new(AvroFormat) } #[cfg(not(feature = "avro"))] panic!("Unable to process avro file since `avro` feature is not enabled"); } + FileFormatType::Arrow(..) => { + Arc::new(ArrowFormat) + } }; let table_paths = &scan @@ -1057,13 +1061,18 @@ impl AsLogicalPlan for LogicalPlanNode { Some(FileFormatType::Avro(protobuf::AvroFormat {})) } + if any.is::<ArrowFormat>() { + maybe_some_type = + Some(FileFormatType::Arrow(protobuf::ArrowFormat {})) + } + if let Some(file_format_type) = maybe_some_type { file_format_type } else { return Err(proto_error(format!( - "Error converting file format, {:?} is invalid as a datafusion format.", - listing_table.options().format - ))); + "Error deserializing unknown file format: {:?}", + listing_table.options().format + ))); } }; diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index 6c51d553fe..170c2675f7 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -28,6 +28,7 @@ use datafusion::datasource::file_format::json::{JsonFormat, JsonFormatFactory}; use datafusion::datasource::listing::{ ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, }; +use datafusion::execution::options::ArrowReadOptions; use datafusion::optimizer::eliminate_nested_union::EliminateNestedUnion; use datafusion::optimizer::Optimizer; use datafusion_common::parsers::CompressionTypeVariant; @@ -2656,3 +2657,16 @@ async fn roundtrip_custom_listing_tables_schema() -> Result<()> { assert_eq!(plan, new_plan); Ok(()) } + +#[tokio::test] +async fn roundtrip_arrow_scan() -> Result<()> { + let ctx = SessionContext::new(); + let plan = ctx + .read_arrow("tests/testdata/test.arrow", ArrowReadOptions::default()) + .await? + .into_optimized_plan()?; + let bytes = logical_plan_to_bytes(&plan)?; + let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?; + assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}")); + Ok(()) +} diff --git a/datafusion/proto/tests/testdata/test.arrow b/datafusion/proto/tests/testdata/test.arrow new file mode 100644 index 0000000000..5314d9eea1 Binary files /dev/null and b/datafusion/proto/tests/testdata/test.arrow differ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org