This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new a4d494c54f fix: serialize listing table without partition column
(#15737)
a4d494c54f is described below
commit a4d494c54f2c69e87ec14ed4a123839d8733c0d9
Author: Chen Chongchen <[email protected]>
AuthorDate: Fri Apr 18 03:05:34 2025 +0800
fix: serialize listing table without partition column (#15737)
* fix: serialize listing table without partition column
* remove unwrap
* format
* clippy
---
datafusion/proto/proto/datafusion.proto | 2 +-
datafusion/proto/src/generated/prost.rs | 4 +-
datafusion/proto/src/logical_plan/mod.rs | 76 +++++++++++++++-------
.../proto/tests/cases/roundtrip_logical_plan.rs | 35 +++++++++-
4 files changed, 91 insertions(+), 26 deletions(-)
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index 908b95ab56..39236da3b9 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -90,7 +90,7 @@ message ListingTableScanNode {
ProjectionColumns projection = 4;
datafusion_common.Schema schema = 5;
repeated LogicalExprNode filters = 6;
- repeated string table_partition_cols = 7;
+ repeated PartitionColumn table_partition_cols = 7;
bool collect_stat = 8;
uint32 target_partitions = 9;
oneof FileFormatType {
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index d2165dad48..41c60b22e3 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -115,8 +115,8 @@ pub struct ListingTableScanNode {
pub schema: ::core::option::Option<super::datafusion_common::Schema>,
#[prost(message, repeated, tag = "6")]
pub filters: ::prost::alloc::vec::Vec<LogicalExprNode>,
- #[prost(string, repeated, tag = "7")]
- pub table_partition_cols:
::prost::alloc::vec::Vec<::prost::alloc::string::String>,
+ #[prost(message, repeated, tag = "7")]
+ pub table_partition_cols: ::prost::alloc::vec::Vec<PartitionColumn>,
#[prost(bool, tag = "8")]
pub collect_stat: bool,
#[prost(uint32, tag = "9")]
diff --git a/datafusion/proto/src/logical_plan/mod.rs
b/datafusion/proto/src/logical_plan/mod.rs
index c65569ef1c..806f604ccc 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -33,7 +33,7 @@ use crate::{
};
use crate::protobuf::{proto_error, ToProtoError};
-use arrow::datatypes::{DataType, Schema, SchemaRef};
+use arrow::datatypes::{DataType, Schema, SchemaBuilder, SchemaRef};
use datafusion::datasource::cte_worktable::CteWorkTable;
#[cfg(feature = "avro")]
use datafusion::datasource::file_format::avro::AvroFormat;
@@ -458,23 +458,25 @@ impl AsLogicalPlan for LogicalPlanNode {
.map(ListingTableUrl::parse)
.collect::<Result<Vec<_>, _>>()?;
+ let partition_columns = scan
+ .table_partition_cols
+ .iter()
+ .map(|col| {
+ let Some(arrow_type) = col.arrow_type.as_ref() else {
+ return Err(proto_error(
+ "Missing Arrow type in partition
columns".to_string(),
+ ));
+ };
+ let arrow_type =
DataType::try_from(arrow_type).map_err(|e| {
+ proto_error(format!("Received an unknown
ArrowType: {}", e))
+ })?;
+ Ok((col.name.clone(), arrow_type))
+ })
+ .collect::<Result<Vec<_>>>()?;
+
let options = ListingOptions::new(file_format)
.with_file_extension(&scan.file_extension)
- .with_table_partition_cols(
- scan.table_partition_cols
- .iter()
- .map(|col| {
- (
- col.clone(),
- schema
- .field_with_name(col)
- .unwrap()
- .data_type()
- .clone(),
- )
- })
- .collect(),
- )
+ .with_table_partition_cols(partition_columns)
.with_collect_stat(scan.collect_stat)
.with_target_partitions(scan.target_partitions as usize)
.with_file_sort_order(all_sort_orders);
@@ -1046,7 +1048,6 @@ impl AsLogicalPlan for LogicalPlanNode {
})
}
};
- let schema: protobuf::Schema = schema.as_ref().try_into()?;
let filters: Vec<protobuf::LogicalExprNode> =
serialize_exprs(filters, extension_codec)?;
@@ -1099,6 +1100,21 @@ impl AsLogicalPlan for LogicalPlanNode {
let options = listing_table.options();
+ let mut builder = SchemaBuilder::from(schema.as_ref());
+ for (idx, field) in
schema.fields().iter().enumerate().rev() {
+ if options
+ .table_partition_cols
+ .iter()
+ .any(|(name, _)| name == field.name())
+ {
+ builder.remove(idx);
+ }
+ }
+
+ let schema = builder.finish();
+
+ let schema: protobuf::Schema = (&schema).try_into()?;
+
let mut exprs_vec: Vec<SortExprNodeCollection> = vec![];
for order in &options.file_sort_order {
let expr_vec = SortExprNodeCollection {
@@ -1107,6 +1123,24 @@ impl AsLogicalPlan for LogicalPlanNode {
exprs_vec.push(expr_vec);
}
+ let partition_columns = options
+ .table_partition_cols
+ .iter()
+ .map(|(name, arrow_type)| {
+ let arrow_type =
protobuf::ArrowType::try_from(arrow_type)
+ .map_err(|e| {
+ proto_error(format!(
+ "Received an unknown ArrowType: {}",
+ e
+ ))
+ })?;
+ Ok(protobuf::PartitionColumn {
+ name: name.clone(),
+ arrow_type: Some(arrow_type),
+ })
+ })
+ .collect::<Result<Vec<_>>>()?;
+
Ok(LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::ListingScan(
protobuf::ListingTableScanNode {
@@ -1114,11 +1148,7 @@ impl AsLogicalPlan for LogicalPlanNode {
table_name: Some(table_name.clone().into()),
collect_stat: options.collect_stat,
file_extension: options.file_extension.clone(),
- table_partition_cols: options
- .table_partition_cols
- .iter()
- .map(|x| x.0.clone())
- .collect::<Vec<_>>(),
+ table_partition_cols: partition_columns,
paths: listing_table
.table_paths()
.iter()
@@ -1133,6 +1163,7 @@ impl AsLogicalPlan for LogicalPlanNode {
)),
})
} else if let Some(view_table) =
source.downcast_ref::<ViewTable>() {
+ let schema: protobuf::Schema = schema.as_ref().try_into()?;
Ok(LogicalPlanNode {
logical_plan_type:
Some(LogicalPlanType::ViewScan(Box::new(
protobuf::ViewTableScanNode {
@@ -1167,6 +1198,7 @@ impl AsLogicalPlan for LogicalPlanNode {
)),
})
} else {
+ let schema: protobuf::Schema = schema.as_ref().try_into()?;
let mut bytes = vec![];
extension_codec
.try_encode_table_provider(table_name, provider, &mut
bytes)
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 9fa1f74ae1..bc57ac7c4d 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -24,7 +24,10 @@ use arrow::datatypes::{
DECIMAL256_MAX_PRECISION,
};
use arrow::util::pretty::pretty_format_batches;
-use datafusion::datasource::file_format::json::JsonFormatFactory;
+use datafusion::datasource::file_format::json::{JsonFormat, JsonFormatFactory};
+use datafusion::datasource::listing::{
+ ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
+};
use datafusion::optimizer::eliminate_nested_union::EliminateNestedUnion;
use datafusion::optimizer::Optimizer;
use datafusion_common::parsers::CompressionTypeVariant;
@@ -2559,3 +2562,33 @@ async fn roundtrip_union_query() -> Result<()> {
);
Ok(())
}
+
+#[tokio::test]
+async fn roundtrip_custom_listing_tables_schema() -> Result<()> {
+ let ctx = SessionContext::new();
+ // Make sure partition-column information in a listing table's schema
+ // is preserved during a serialization round-trip
+ let file_format = JsonFormat::default();
+ let table_partition_cols = vec![("part".to_owned(), DataType::Int64)];
+ let data = "../core/tests/data/partitioned_table_json";
+ let listing_table_url = ListingTableUrl::parse(data)?;
+ let listing_options = ListingOptions::new(Arc::new(file_format))
+ .with_table_partition_cols(table_partition_cols);
+
+ let config = ListingTableConfig::new(listing_table_url)
+ .with_listing_options(listing_options)
+ .infer_schema(&ctx.state())
+ .await?;
+
+ ctx.register_table("hive_style",
Arc::new(ListingTable::try_new(config)?))?;
+
+ let plan = ctx
+ .sql("SELECT part, value FROM hive_style LIMIT 1")
+ .await?
+ .logical_plan()
+ .clone();
+
+ let bytes = logical_plan_to_bytes(&plan)?;
+ let new_plan = logical_plan_from_bytes(&bytes, &ctx)?;
+ assert_eq!(plan, new_plan);
+ Ok(())
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]