This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ca5dc8c066 Change `FileScanConfig.table_partition_cols` from `(String,
DataType)` to `Field`s (#7890)
ca5dc8c066 is described below
commit ca5dc8c066bb6da34a1f8522b6127138358a3159
Author: Nga Tran <[email protected]>
AuthorDate: Sun Oct 22 13:00:49 2023 -0400
Change `FileScanConfig.table_partition_cols` from `(String, DataType)` to
`Field`s (#7890)
* feat: make data type of FileScanConfig.table_partition_cols a vector of
Fields
* fix: avro test
* chore: Apply suggestions from code review
Co-authored-by: Andrew Lamb <[email protected]>
* chore: address review comments
* chore: remove unnecessary to_owned
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/core/src/datasource/listing/table.rs | 10 +---
.../core/src/datasource/physical_plan/avro.rs | 2 +-
.../core/src/datasource/physical_plan/csv.rs | 2 +-
.../datasource/physical_plan/file_scan_config.rs | 54 ++++++++++++++++++----
.../src/datasource/physical_plan/file_stream.rs | 2 +-
.../core/src/datasource/physical_plan/parquet.rs | 9 ++--
datafusion/proto/src/physical_plan/from_proto.rs | 10 +---
datafusion/proto/src/physical_plan/to_proto.rs | 2 +-
8 files changed, 57 insertions(+), 34 deletions(-)
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index 485ab0a902..bd878932d8 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -746,15 +746,7 @@ impl TableProvider for ListingTable {
.options
.table_partition_cols
.iter()
- .map(|col| {
- Ok((
- col.0.to_owned(),
- self.table_schema
- .field_with_name(&col.0)?
- .data_type()
- .clone(),
- ))
- })
+ .map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
.collect::<Result<Vec<_>>>()?;
let filters = if let Some(expr) = conjunction(filters.to_vec()) {
diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs
b/datafusion/core/src/datasource/physical_plan/avro.rs
index f08bc9b8df..237772eb83 100644
--- a/datafusion/core/src/datasource/physical_plan/avro.rs
+++ b/datafusion/core/src/datasource/physical_plan/avro.rs
@@ -420,7 +420,7 @@ mod tests {
statistics: Statistics::new_unknown(&file_schema),
file_schema,
limit: None,
- table_partition_cols: vec![("date".to_owned(), DataType::Utf8)],
+ table_partition_cols: vec![Field::new("date", DataType::Utf8,
false)],
output_ordering: vec![],
infinite_source: false,
});
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs
b/datafusion/core/src/datasource/physical_plan/csv.rs
index f3b2fa9de7..e60a249b0b 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -871,7 +871,7 @@ mod tests {
let mut config = partitioned_csv_config(file_schema, file_groups)?;
// Add partition columns
- config.table_partition_cols = vec![("date".to_owned(),
DataType::Utf8)];
+ config.table_partition_cols = vec![Field::new("date", DataType::Utf8,
false)];
config.file_groups[0][0].partition_values =
vec![ScalarValue::Utf8(Some("2021-10-26".to_owned()))];
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index c1a19b745b..d8a9697b2b 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -101,7 +101,7 @@ pub struct FileScanConfig {
/// all records after filtering are returned.
pub limit: Option<usize>,
/// The partitioning columns
- pub table_partition_cols: Vec<(String, DataType)>,
+ pub table_partition_cols: Vec<Field>,
/// All equivalent lexicographical orderings that describe the schema.
pub output_ordering: Vec<LexOrdering>,
/// Indicates whether this plan may produce an infinite stream of records.
@@ -135,8 +135,7 @@ impl FileScanConfig {
table_cols_stats.push(self.statistics.column_statistics[idx].clone())
} else {
let partition_idx = idx - self.file_schema.fields().len();
- let (name, dtype) = &self.table_partition_cols[partition_idx];
- table_fields.push(Field::new(name, dtype.to_owned(), false));
+
table_fields.push(self.table_partition_cols[partition_idx].to_owned());
// TODO provide accurate stat for partition column (#1186)
table_cols_stats.push(ColumnStatistics::new_unknown())
}
@@ -501,10 +500,10 @@ mod tests {
Arc::clone(&file_schema),
None,
Statistics::new_unknown(&file_schema),
- vec![(
+ to_partition_cols(vec![(
"date".to_owned(),
wrap_partition_type_in_dict(DataType::Utf8),
- )],
+ )]),
);
let (proj_schema, proj_statistics, _) = conf.project();
@@ -527,6 +526,35 @@ mod tests {
assert_eq!(col_indices, None);
}
+ #[test]
+ fn physical_plan_config_no_projection_tab_cols_as_field() {
+ let file_schema = aggr_test_schema();
+
+ // make a table_partition_col as a field
+ let table_partition_col =
+ Field::new("date", wrap_partition_type_in_dict(DataType::Utf8),
true)
+ .with_metadata(HashMap::from_iter(vec![(
+ "key_whatever".to_owned(),
+ "value_whatever".to_owned(),
+ )]));
+
+ let conf = config_for_projection(
+ Arc::clone(&file_schema),
+ None,
+ Statistics::new_unknown(&file_schema),
+ vec![table_partition_col.clone()],
+ );
+
+ // verify that proj_schema includes the last column and that it is exactly
the same as the field it was defined from
+ let (proj_schema, _proj_statistics, _) = conf.project();
+ assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
+ assert_eq!(
+ *proj_schema.field(file_schema.fields().len()),
+ table_partition_col,
+ "partition columns are the last columns and ust have all values
defined in created field"
+ );
+ }
+
#[test]
fn physical_plan_config_with_projection() {
let file_schema = aggr_test_schema();
@@ -545,10 +573,10 @@ mod tests {
.collect(),
total_byte_size: Precision::Absent,
},
- vec![(
+ to_partition_cols(vec![(
"date".to_owned(),
wrap_partition_type_in_dict(DataType::Utf8),
- )],
+ )]),
);
let (proj_schema, proj_statistics, _) = conf.project();
@@ -602,7 +630,7 @@ mod tests {
file_batch.schema().fields().len() + 2,
]),
Statistics::new_unknown(&file_batch.schema()),
- partition_cols.clone(),
+ to_partition_cols(partition_cols.clone()),
);
let (proj_schema, ..) = conf.project();
// created a projector for that projected schema
@@ -747,7 +775,7 @@ mod tests {
file_schema: SchemaRef,
projection: Option<Vec<usize>>,
statistics: Statistics,
- table_partition_cols: Vec<(String, DataType)>,
+ table_partition_cols: Vec<Field>,
) -> FileScanConfig {
FileScanConfig {
file_schema,
@@ -762,6 +790,14 @@ mod tests {
}
}
+ /// Convert partition columns from Vec<(String, DataType)> to Vec<Field>
+ fn to_partition_cols(table_partition_cols: Vec<(String, DataType)>) ->
Vec<Field> {
+ table_partition_cols
+ .iter()
+ .map(|(name, dtype)| Field::new(name, dtype.clone(), false))
+ .collect::<Vec<_>>()
+ }
+
/// returns record batch with 3 columns of i32 in memory
pub fn build_table_i32(
a: (&str, &Vec<i32>),
diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs
b/datafusion/core/src/datasource/physical_plan/file_stream.rs
index 22ff3f42eb..a715f6e8e3 100644
--- a/datafusion/core/src/datasource/physical_plan/file_stream.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs
@@ -259,7 +259,7 @@ impl<F: FileOpener> FileStream<F> {
&config
.table_partition_cols
.iter()
- .map(|x| x.0.clone())
+ .map(|x| x.name().clone())
.collect::<Vec<_>>(),
);
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs
b/datafusion/core/src/datasource/physical_plan/parquet.rs
index e59686453f..6cab27b084 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -1624,14 +1624,15 @@ mod tests {
projection: Some(vec![0, 1, 2, 12, 13]),
limit: None,
table_partition_cols: vec![
- ("year".to_owned(), DataType::Utf8),
- ("month".to_owned(), DataType::UInt8),
- (
- "day".to_owned(),
+ Field::new("year", DataType::Utf8, false),
+ Field::new("month", DataType::UInt8, false),
+ Field::new(
+ "day",
DataType::Dictionary(
Box::new(DataType::UInt16),
Box::new(DataType::Utf8),
),
+ false,
),
],
output_ordering: vec![],
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs
b/datafusion/proto/src/physical_plan/from_proto.rs
index ff02b80521..cdc772f71d 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -22,7 +22,6 @@ use std::ops::Deref;
use std::sync::Arc;
use arrow::compute::SortOptions;
-use arrow::datatypes::DataType;
use datafusion::arrow::datatypes::Schema;
use datafusion::datasource::listing::{FileRange, PartitionedFile};
use datafusion::datasource::object_store::ObjectStoreUrl;
@@ -489,13 +488,8 @@ pub fn parse_protobuf_file_scan_config(
let table_partition_cols = proto
.table_partition_cols
.iter()
- .map(|col| {
- Ok((
- col.to_owned(),
- schema.field_with_name(col)?.data_type().clone(),
- ))
- })
- .collect::<Result<Vec<(String, DataType)>>>()?;
+ .map(|col| Ok(schema.field_with_name(col)?.clone()))
+ .collect::<Result<Vec<_>>>()?;
let mut output_ordering = vec![];
for node_collection in &proto.output_ordering {
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs
b/datafusion/proto/src/physical_plan/to_proto.rs
index db97c39325..466b99b684 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -729,7 +729,7 @@ impl TryFrom<&FileScanConfig> for
protobuf::FileScanExecConf {
table_partition_cols: conf
.table_partition_cols
.iter()
- .map(|x| x.0.clone())
+ .map(|x| x.name().clone())
.collect::<Vec<_>>(),
object_store_url: conf.object_store_url.to_string(),
output_ordering: output_orderings