This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new ca5dc8c066 Change `FileScanConfig.table_partition_cols` from `(String, 
DataType)` to `Field`s (#7890)
ca5dc8c066 is described below

commit ca5dc8c066bb6da34a1f8522b6127138358a3159
Author: Nga Tran <[email protected]>
AuthorDate: Sun Oct 22 13:00:49 2023 -0400

    Change `FileScanConfig.table_partition_cols` from `(String, DataType)` to 
`Field`s (#7890)
    
    * feat: make data type of FileScanConfig.table_partition_cols a vector of 
Fields
    
    * fix: avro test
    
    * chore: Apply suggestions from code review
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * chore: address review comments
    
    * chore: remove unnecessary to_owned
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/core/src/datasource/listing/table.rs    | 10 +---
 .../core/src/datasource/physical_plan/avro.rs      |  2 +-
 .../core/src/datasource/physical_plan/csv.rs       |  2 +-
 .../datasource/physical_plan/file_scan_config.rs   | 54 ++++++++++++++++++----
 .../src/datasource/physical_plan/file_stream.rs    |  2 +-
 .../core/src/datasource/physical_plan/parquet.rs   |  9 ++--
 datafusion/proto/src/physical_plan/from_proto.rs   | 10 +---
 datafusion/proto/src/physical_plan/to_proto.rs     |  2 +-
 8 files changed, 57 insertions(+), 34 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/table.rs 
b/datafusion/core/src/datasource/listing/table.rs
index 485ab0a902..bd878932d8 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -746,15 +746,7 @@ impl TableProvider for ListingTable {
             .options
             .table_partition_cols
             .iter()
-            .map(|col| {
-                Ok((
-                    col.0.to_owned(),
-                    self.table_schema
-                        .field_with_name(&col.0)?
-                        .data_type()
-                        .clone(),
-                ))
-            })
+            .map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
             .collect::<Result<Vec<_>>>()?;
 
         let filters = if let Some(expr) = conjunction(filters.to_vec()) {
diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs 
b/datafusion/core/src/datasource/physical_plan/avro.rs
index f08bc9b8df..237772eb83 100644
--- a/datafusion/core/src/datasource/physical_plan/avro.rs
+++ b/datafusion/core/src/datasource/physical_plan/avro.rs
@@ -420,7 +420,7 @@ mod tests {
             statistics: Statistics::new_unknown(&file_schema),
             file_schema,
             limit: None,
-            table_partition_cols: vec![("date".to_owned(), DataType::Utf8)],
+            table_partition_cols: vec![Field::new("date", DataType::Utf8, 
false)],
             output_ordering: vec![],
             infinite_source: false,
         });
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs 
b/datafusion/core/src/datasource/physical_plan/csv.rs
index f3b2fa9de7..e60a249b0b 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -871,7 +871,7 @@ mod tests {
         let mut config = partitioned_csv_config(file_schema, file_groups)?;
 
         // Add partition columns
-        config.table_partition_cols = vec![("date".to_owned(), 
DataType::Utf8)];
+        config.table_partition_cols = vec![Field::new("date", DataType::Utf8, 
false)];
         config.file_groups[0][0].partition_values =
             vec![ScalarValue::Utf8(Some("2021-10-26".to_owned()))];
 
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs 
b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index c1a19b745b..d8a9697b2b 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -101,7 +101,7 @@ pub struct FileScanConfig {
     /// all records after filtering are returned.
     pub limit: Option<usize>,
     /// The partitioning columns
-    pub table_partition_cols: Vec<(String, DataType)>,
+    pub table_partition_cols: Vec<Field>,
     /// All equivalent lexicographical orderings that describe the schema.
     pub output_ordering: Vec<LexOrdering>,
     /// Indicates whether this plan may produce an infinite stream of records.
@@ -135,8 +135,7 @@ impl FileScanConfig {
                 
table_cols_stats.push(self.statistics.column_statistics[idx].clone())
             } else {
                 let partition_idx = idx - self.file_schema.fields().len();
-                let (name, dtype) = &self.table_partition_cols[partition_idx];
-                table_fields.push(Field::new(name, dtype.to_owned(), false));
+                
table_fields.push(self.table_partition_cols[partition_idx].to_owned());
                 // TODO provide accurate stat for partition column (#1186)
                 table_cols_stats.push(ColumnStatistics::new_unknown())
             }
@@ -501,10 +500,10 @@ mod tests {
             Arc::clone(&file_schema),
             None,
             Statistics::new_unknown(&file_schema),
-            vec![(
+            to_partition_cols(vec![(
                 "date".to_owned(),
                 wrap_partition_type_in_dict(DataType::Utf8),
-            )],
+            )]),
         );
 
         let (proj_schema, proj_statistics, _) = conf.project();
@@ -527,6 +526,35 @@ mod tests {
         assert_eq!(col_indices, None);
     }
 
+    #[test]
+    fn physical_plan_config_no_projection_tab_cols_as_field() {
+        let file_schema = aggr_test_schema();
+
+        // make a table_partition_col as a field
+        let table_partition_col =
+            Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), 
true)
+                .with_metadata(HashMap::from_iter(vec![(
+                    "key_whatever".to_owned(),
+                    "value_whatever".to_owned(),
+                )]));
+
+        let conf = config_for_projection(
+            Arc::clone(&file_schema),
+            None,
+            Statistics::new_unknown(&file_schema),
+            vec![table_partition_col.clone()],
+        );
+
+        // verify the proj_schema includes the last column and exactly 
matches the field as it was defined
+        let (proj_schema, _proj_statistics, _) = conf.project();
+        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
+        assert_eq!(
+            *proj_schema.field(file_schema.fields().len()),
+            table_partition_col,
            "partition columns are the last columns and must have all values 
defined in created field"
+        );
+    }
+
     #[test]
     fn physical_plan_config_with_projection() {
         let file_schema = aggr_test_schema();
@@ -545,10 +573,10 @@ mod tests {
                     .collect(),
                 total_byte_size: Precision::Absent,
             },
-            vec![(
+            to_partition_cols(vec![(
                 "date".to_owned(),
                 wrap_partition_type_in_dict(DataType::Utf8),
-            )],
+            )]),
         );
 
         let (proj_schema, proj_statistics, _) = conf.project();
@@ -602,7 +630,7 @@ mod tests {
                 file_batch.schema().fields().len() + 2,
             ]),
             Statistics::new_unknown(&file_batch.schema()),
-            partition_cols.clone(),
+            to_partition_cols(partition_cols.clone()),
         );
         let (proj_schema, ..) = conf.project();
         // created a projector for that projected schema
@@ -747,7 +775,7 @@ mod tests {
         file_schema: SchemaRef,
         projection: Option<Vec<usize>>,
         statistics: Statistics,
-        table_partition_cols: Vec<(String, DataType)>,
+        table_partition_cols: Vec<Field>,
     ) -> FileScanConfig {
         FileScanConfig {
             file_schema,
@@ -762,6 +790,14 @@ mod tests {
         }
     }
 
+    /// Convert partition columns from Vec<(String, DataType)> to Vec<Field>
+    fn to_partition_cols(table_partition_cols: Vec<(String, DataType)>) -> 
Vec<Field> {
+        table_partition_cols
+            .iter()
+            .map(|(name, dtype)| Field::new(name, dtype.clone(), false))
+            .collect::<Vec<_>>()
+    }
+
     /// returns record batch with 3 columns of i32 in memory
     pub fn build_table_i32(
         a: (&str, &Vec<i32>),
diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs 
b/datafusion/core/src/datasource/physical_plan/file_stream.rs
index 22ff3f42eb..a715f6e8e3 100644
--- a/datafusion/core/src/datasource/physical_plan/file_stream.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs
@@ -259,7 +259,7 @@ impl<F: FileOpener> FileStream<F> {
             &config
                 .table_partition_cols
                 .iter()
-                .map(|x| x.0.clone())
+                .map(|x| x.name().clone())
                 .collect::<Vec<_>>(),
         );
 
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs 
b/datafusion/core/src/datasource/physical_plan/parquet.rs
index e59686453f..6cab27b084 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -1624,14 +1624,15 @@ mod tests {
                 projection: Some(vec![0, 1, 2, 12, 13]),
                 limit: None,
                 table_partition_cols: vec![
-                    ("year".to_owned(), DataType::Utf8),
-                    ("month".to_owned(), DataType::UInt8),
-                    (
-                        "day".to_owned(),
+                    Field::new("year", DataType::Utf8, false),
+                    Field::new("month", DataType::UInt8, false),
+                    Field::new(
+                        "day",
                         DataType::Dictionary(
                             Box::new(DataType::UInt16),
                             Box::new(DataType::Utf8),
                         ),
+                        false,
                     ),
                 ],
                 output_ordering: vec![],
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs 
b/datafusion/proto/src/physical_plan/from_proto.rs
index ff02b80521..cdc772f71d 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -22,7 +22,6 @@ use std::ops::Deref;
 use std::sync::Arc;
 
 use arrow::compute::SortOptions;
-use arrow::datatypes::DataType;
 use datafusion::arrow::datatypes::Schema;
 use datafusion::datasource::listing::{FileRange, PartitionedFile};
 use datafusion::datasource::object_store::ObjectStoreUrl;
@@ -489,13 +488,8 @@ pub fn parse_protobuf_file_scan_config(
     let table_partition_cols = proto
         .table_partition_cols
         .iter()
-        .map(|col| {
-            Ok((
-                col.to_owned(),
-                schema.field_with_name(col)?.data_type().clone(),
-            ))
-        })
-        .collect::<Result<Vec<(String, DataType)>>>()?;
+        .map(|col| Ok(schema.field_with_name(col)?.clone()))
+        .collect::<Result<Vec<_>>>()?;
 
     let mut output_ordering = vec![];
     for node_collection in &proto.output_ordering {
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs 
b/datafusion/proto/src/physical_plan/to_proto.rs
index db97c39325..466b99b684 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -729,7 +729,7 @@ impl TryFrom<&FileScanConfig> for 
protobuf::FileScanExecConf {
             table_partition_cols: conf
                 .table_partition_cols
                 .iter()
-                .map(|x| x.0.clone())
+                .map(|x| x.name().clone())
                 .collect::<Vec<_>>(),
             object_store_url: conf.object_store_url.to_string(),
             output_ordering: output_orderings

Reply via email to