This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 8b5dbd79e Support arbitrary user defined partition column in 
`ListingTable` (rather than assuming they are always Dictionary encoded) (#5545)
8b5dbd79e is described below

commit 8b5dbd79ee9cb19e006fdc8c893e2a4cbc76914e
Author: Marco Neumann <[email protected]>
AuthorDate: Wed Mar 15 13:24:46 2023 +0100

    Support arbitrary user defined partition column in `ListingTable` (rather 
than assuming they are always Dictionary encoded) (#5545)
    
    * refactor: user may choose to dict-encode partition values
    
    Let the user decide if they may want to encode partition values for
    file-based data sources. Dictionary encoding makes sense for string
    values but is probably pointless or even counterproductive for integer
    types.
    
    * refactor: improve transition
    
    * fix: remove leftover dbg
    
    * refactor: `partition_{type,value}_wrap` -> 
`wrap_partition_{type,value}_in_dict`
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/core/src/datasource/listing/mod.rs      |  11 +-
 datafusion/core/src/datasource/listing/table.rs    |  17 +-
 .../core/src/datasource/listing_table_factory.rs   |  10 +-
 .../core/src/physical_plan/file_format/avro.rs     |   6 +-
 .../core/src/physical_plan/file_format/csv.rs      |   4 +-
 .../core/src/physical_plan/file_format/mod.rs      | 290 ++++++++++++++++++---
 .../core/src/physical_plan/file_format/parquet.rs  |  65 +++--
 datafusion/core/tests/path_partition.rs            |   8 +-
 .../core/tests/sqllogictests/test_files/ddl.slt    |   2 +-
 9 files changed, 329 insertions(+), 84 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/mod.rs 
b/datafusion/core/src/datasource/listing/mod.rs
index d5374d1ed..1e6b40a34 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -55,7 +55,16 @@ pub struct FileRange {
 pub struct PartitionedFile {
     /// Path for the file (e.g. URL, filesystem path, etc)
     pub object_meta: ObjectMeta,
-    /// Values of partition columns to be appended to each row
+    /// Values of partition columns to be appended to each row.
+    ///
+    /// These MUST have the same count, order, and type as the 
[`table_partition_cols`].
+    ///
+    /// You may use [`wrap_partition_value_in_dict`] to wrap them if you have 
used [`wrap_partition_type_in_dict`] to wrap the column type.
+    ///
+    ///
+    /// [`wrap_partition_type_in_dict`]: 
crate::physical_plan::file_format::wrap_partition_type_in_dict
+    /// [`wrap_partition_value_in_dict`]: 
crate::physical_plan::file_format::wrap_partition_value_in_dict
+    /// [`table_partition_cols`]: table::ListingOptions::table_partition_cols
     pub partition_values: Vec<ScalarValue>,
     /// An optional file range for a more fine-grained parallel execution
     pub range: Option<FileRange>,
diff --git a/datafusion/core/src/datasource/listing/table.rs 
b/datafusion/core/src/datasource/listing/table.rs
index f6d9c959e..8858413bf 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -44,7 +44,6 @@ use crate::datasource::{
 };
 use crate::logical_expr::TableProviderFilterPushDown;
 use crate::physical_plan;
-use crate::physical_plan::file_format::partition_type_wrap;
 use crate::{
     error::{DataFusionError, Result},
     execution::context::SessionState,
@@ -300,6 +299,8 @@ impl ListingOptions {
 
     /// Set table partition column names on [`ListingOptions`] and returns 
self.
     ///
+    /// You may use [`wrap_partition_type_in_dict`] to request a 
dictionary-encoded type.
+    ///
     /// ```
     /// # use std::sync::Arc;
     /// # use arrow::datatypes::DataType;
@@ -315,6 +316,9 @@ impl ListingOptions {
     /// assert_eq!(listing_options.table_partition_cols, 
vec![("col_a".to_string(), DataType::Utf8),
     ///     ("col_b".to_string(), DataType::Utf8)]);
     /// ```
+    ///
+    ///
+    /// [`wrap_partition_type_in_dict`]: 
crate::physical_plan::file_format::wrap_partition_type_in_dict
     pub fn with_table_partition_cols(
         mut self,
         table_partition_cols: Vec<(String, DataType)>,
@@ -538,11 +542,7 @@ impl ListingTable {
         // Add the partition columns to the file schema
         let mut table_fields = file_schema.fields().clone();
         for (part_col_name, part_col_type) in &options.table_partition_cols {
-            table_fields.push(Field::new(
-                part_col_name,
-                partition_type_wrap(part_col_type.clone()),
-                false,
-            ));
+            table_fields.push(Field::new(part_col_name, part_col_type.clone(), 
false));
         }
         let infinite_source = options.infinite_source;
 
@@ -1012,10 +1012,7 @@ mod tests {
 
         let opt = ListingOptions::new(Arc::new(AvroFormat {}))
             .with_file_extension(FileType::AVRO.get_ext())
-            .with_table_partition_cols(vec![(
-                String::from("p1"),
-                partition_type_wrap(DataType::Utf8),
-            )])
+            .with_table_partition_cols(vec![(String::from("p1"), 
DataType::Utf8)])
             .with_target_partitions(4);
 
         let table_path = ListingTableUrl::parse("test:///table/").unwrap();
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs 
b/datafusion/core/src/datasource/listing_table_factory.rs
index fe4393cb2..8b7dd46a8 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -86,7 +86,15 @@ impl TableProviderFactory for ListingTableFactory {
                 None,
                 cmd.table_partition_cols
                     .iter()
-                    .map(|x| (x.clone(), DataType::Utf8))
+                    .map(|x| {
+                        (
+                            x.clone(),
+                            DataType::Dictionary(
+                                Box::new(DataType::UInt16),
+                                Box::new(DataType::Utf8),
+                            ),
+                        )
+                    })
                     .collect::<Vec<_>>(),
             )
         } else {
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs 
b/datafusion/core/src/physical_plan/file_format/avro.rs
index 9ac1d23ba..e4fc570fe 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -213,7 +213,6 @@ mod tests {
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::physical_plan::file_format::chunked_store::ChunkedStore;
-    use crate::physical_plan::file_format::partition_type_wrap;
     use crate::prelude::SessionContext;
     use crate::scalar::ScalarValue;
     use crate::test::object_store::local_unpartitioned_file;
@@ -409,10 +408,7 @@ mod tests {
             file_schema,
             statistics: Statistics::default(),
             limit: None,
-            table_partition_cols: vec![(
-                "date".to_owned(),
-                partition_type_wrap(DataType::Utf8),
-            )],
+            table_partition_cols: vec![("date".to_owned(), DataType::Utf8)],
             output_ordering: None,
             infinite_source: false,
         });
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs 
b/datafusion/core/src/physical_plan/file_format/csv.rs
index 346c99158..3fc7df4f1 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -326,7 +326,6 @@ mod tests {
     use super::*;
     use crate::datasource::file_format::file_type::FileType;
     use crate::physical_plan::file_format::chunked_store::ChunkedStore;
-    use crate::physical_plan::file_format::partition_type_wrap;
     use crate::prelude::*;
     use crate::test::{partitioned_csv_config, partitioned_file_groups};
     use crate::test_util::{aggr_test_schema_with_missing_col, arrow_test_data};
@@ -580,8 +579,7 @@ mod tests {
         let mut config = partitioned_csv_config(file_schema, file_groups)?;
 
         // Add partition columns
-        config.table_partition_cols =
-            vec![("date".to_owned(), partition_type_wrap(DataType::Utf8))];
+        config.table_partition_cols = vec![("date".to_owned(), 
DataType::Utf8)];
         config.file_groups[0][0].partition_values =
             vec![ScalarValue::Utf8(Some("2021-10-26".to_owned()))];
 
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs 
b/datafusion/core/src/physical_plan/file_format/mod.rs
index d7616a3c2..eb70d18ef 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -30,9 +30,9 @@ pub use self::csv::CsvExec;
 pub(crate) use self::parquet::plan_to_parquet;
 pub use self::parquet::{ParquetExec, ParquetFileMetrics, 
ParquetFileReaderFactory};
 use arrow::{
-    array::{ArrayData, ArrayRef, DictionaryArray},
+    array::{ArrayData, ArrayRef, BufferBuilder, DictionaryArray},
     buffer::Buffer,
-    datatypes::{DataType, Field, Schema, SchemaRef, UInt16Type},
+    datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, 
UInt16Type},
     record_batch::RecordBatch,
 };
 pub use avro::AvroExec;
@@ -53,25 +53,42 @@ use crate::{
     error::{DataFusionError, Result},
     scalar::ScalarValue,
 };
-use arrow::array::{new_null_array, UInt16BufferBuilder};
+use arrow::array::new_null_array;
 use arrow::record_batch::RecordBatchOptions;
-use log::{debug, info};
+use log::{debug, info, warn};
 use object_store::path::Path;
 use object_store::ObjectMeta;
 use std::{
+    borrow::Cow,
     collections::HashMap,
     fmt::{Display, Formatter, Result as FmtResult},
+    marker::PhantomData,
     sync::Arc,
     vec,
 };
 
 use super::{ColumnStatistics, Statistics};
 
-/// Convert logical type of partition column to physical type: 
`Dictionary(UInt16, val_type)`
-pub fn partition_type_wrap(val_type: DataType) -> DataType {
+/// Convert logical type of partition column to physical type: 
`Dictionary(UInt16, val_type)`.
+///
+/// You CAN use this to specify types for partition columns. However you MAY 
also choose not to dictionary-encode the
+/// data or to use a different dictionary type.
+///
+/// Use [`wrap_partition_value_in_dict`] to wrap the values.
+pub fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
     DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
 }
 
+/// Convert scalar value of partition columns to physical type: 
`Dictionary(UInt16, val_type)`.
+///
+/// You CAN use this to wrap values for partition columns. However you MAY 
also choose not to dictionary-encode the
+/// data or to use a different dictionary type.
+///
+/// Use [`wrap_partition_type_in_dict`] to wrap the types.
+pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
+    ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
+}
+
 /// Get all of the [`PartitionedFile`] to be scanned for an [`ExecutionPlan`]
 pub fn get_scan_files(
     plan: Arc<dyn ExecutionPlan>,
@@ -394,7 +411,7 @@ struct PartitionColumnProjector {
     /// An Arrow buffer initialized to zeros that represents the key array of 
all partition
     /// columns (partition columns are materialized by dictionary arrays with 
only one
     /// value in the dictionary, thus all the keys are equal to zero).
-    key_buffer_cache: Option<Buffer>,
+    key_buffer_cache: ZeroBufferGenerators,
     /// Mapping between the indexes in the list of partition columns and the 
target
     /// schema. Sorted by index in the target schema so that we can iterate on 
it to
     /// insert the partition columns in the target record batch.
@@ -420,7 +437,7 @@ impl PartitionColumnProjector {
 
         Self {
             projected_partition_indexes,
-            key_buffer_cache: None,
+            key_buffer_cache: Default::default(),
             projected_schema,
         }
     }
@@ -446,11 +463,27 @@ impl PartitionColumnProjector {
         }
         let mut cols = file_batch.columns().to_vec();
         for &(pidx, sidx) in &self.projected_partition_indexes {
+            let mut partition_value = Cow::Borrowed(&partition_values[pidx]);
+
+            // check if user forgot to dict-encode the partition value
+            let field = self.projected_schema.field(sidx);
+            let expected_data_type = field.data_type();
+            let actual_data_type = partition_value.get_datatype();
+            if let DataType::Dictionary(key_type, _) = expected_data_type {
+                if !matches!(actual_data_type, DataType::Dictionary(_, _)) {
+                    warn!("Partition value for column {} was not 
dictionary-encoded, applied auto-fix.", field.name());
+                    partition_value = Cow::Owned(ScalarValue::Dictionary(
+                        key_type.clone(),
+                        Box::new(partition_value.as_ref().clone()),
+                    ));
+                }
+            }
+
             cols.insert(
                 sidx,
-                create_dict_array(
+                create_output_array(
                     &mut self.key_buffer_cache,
-                    &partition_values[pidx],
+                    partition_value.as_ref(),
                     file_batch.num_rows(),
                 ),
             )
@@ -459,26 +492,60 @@ impl PartitionColumnProjector {
     }
 }
 
-fn create_dict_array(
-    key_buffer_cache: &mut Option<Buffer>,
-    val: &ScalarValue,
-    len: usize,
-) -> ArrayRef {
-    // build value dictionary
-    let dict_vals = val.to_array();
-
-    // build keys array
-    let sliced_key_buffer = match key_buffer_cache {
-        Some(buf) if buf.len() >= len * 2 => buf.slice(buf.len() - len * 2),
-        _ => {
-            let mut key_buffer_builder = UInt16BufferBuilder::new(len * 2);
-            key_buffer_builder.advance(len * 2); // keys are all 0
-            key_buffer_cache.insert(key_buffer_builder.finish()).clone()
+#[derive(Debug, Default)]
+struct ZeroBufferGenerators {
+    gen_i8: ZeroBufferGenerator<i8>,
+    gen_i16: ZeroBufferGenerator<i16>,
+    gen_i32: ZeroBufferGenerator<i32>,
+    gen_i64: ZeroBufferGenerator<i64>,
+    gen_u8: ZeroBufferGenerator<u8>,
+    gen_u16: ZeroBufferGenerator<u16>,
+    gen_u32: ZeroBufferGenerator<u32>,
+    gen_u64: ZeroBufferGenerator<u64>,
+}
+
+/// Generate an Arrow [`Buffer`] that contains zero values.
+#[derive(Debug, Default)]
+struct ZeroBufferGenerator<T>
+where
+    T: ArrowNativeType,
+{
+    cache: Option<Buffer>,
+    _t: PhantomData<T>,
+}
+
+impl<T> ZeroBufferGenerator<T>
+where
+    T: ArrowNativeType,
+{
+    const SIZE: usize = std::mem::size_of::<T>();
+
+    fn get_buffer(&mut self, n_vals: usize) -> Buffer {
+        match &mut self.cache {
+            Some(buf) if buf.len() >= n_vals * Self::SIZE => {
+                buf.slice_with_length(0, n_vals * Self::SIZE)
+            }
+            _ => {
+                let mut key_buffer_builder = BufferBuilder::<T>::new(n_vals);
+                key_buffer_builder.advance(n_vals); // keys are all 0
+                self.cache.insert(key_buffer_builder.finish()).clone()
+            }
         }
-    };
+    }
+}
 
-    // create data type
-    let data_type = partition_type_wrap(val.get_datatype());
+fn create_dict_array<T>(
+    buffer_gen: &mut ZeroBufferGenerator<T>,
+    dict_val: &ScalarValue,
+    len: usize,
+    data_type: DataType,
+) -> ArrayRef
+where
+    T: ArrowNativeType,
+{
+    let dict_vals = dict_val.to_array();
+
+    let sliced_key_buffer = buffer_gen.get_buffer(len);
 
     // assemble pieces together
     let mut builder = ArrayData::builder(data_type)
@@ -490,6 +557,84 @@ fn create_dict_array(
     ))
 }
 
+fn create_output_array(
+    key_buffer_cache: &mut ZeroBufferGenerators,
+    val: &ScalarValue,
+    len: usize,
+) -> ArrayRef {
+    if let ScalarValue::Dictionary(key_type, dict_val) = &val {
+        match key_type.as_ref() {
+            DataType::Int8 => {
+                return create_dict_array(
+                    &mut key_buffer_cache.gen_i8,
+                    dict_val,
+                    len,
+                    val.get_datatype(),
+                );
+            }
+            DataType::Int16 => {
+                return create_dict_array(
+                    &mut key_buffer_cache.gen_i16,
+                    dict_val,
+                    len,
+                    val.get_datatype(),
+                );
+            }
+            DataType::Int32 => {
+                return create_dict_array(
+                    &mut key_buffer_cache.gen_i32,
+                    dict_val,
+                    len,
+                    val.get_datatype(),
+                );
+            }
+            DataType::Int64 => {
+                return create_dict_array(
+                    &mut key_buffer_cache.gen_i64,
+                    dict_val,
+                    len,
+                    val.get_datatype(),
+                );
+            }
+            DataType::UInt8 => {
+                return create_dict_array(
+                    &mut key_buffer_cache.gen_u8,
+                    dict_val,
+                    len,
+                    val.get_datatype(),
+                );
+            }
+            DataType::UInt16 => {
+                return create_dict_array(
+                    &mut key_buffer_cache.gen_u16,
+                    dict_val,
+                    len,
+                    val.get_datatype(),
+                );
+            }
+            DataType::UInt32 => {
+                return create_dict_array(
+                    &mut key_buffer_cache.gen_u32,
+                    dict_val,
+                    len,
+                    val.get_datatype(),
+                );
+            }
+            DataType::UInt64 => {
+                return create_dict_array(
+                    &mut key_buffer_cache.gen_u64,
+                    dict_val,
+                    len,
+                    val.get_datatype(),
+                );
+            }
+            _ => {}
+        }
+    }
+
+    val.to_array_of_size(len)
+}
+
 /// A single file or part of a file that should be read, along with its 
schema, statistics
 pub struct FileMeta {
     /// Path for the file (e.g. URL, filesystem path, etc)
@@ -607,7 +752,10 @@ mod tests {
             Arc::clone(&file_schema),
             None,
             Statistics::default(),
-            vec![("date".to_owned(), partition_type_wrap(DataType::Utf8))],
+            vec![(
+                "date".to_owned(),
+                wrap_partition_type_in_dict(DataType::Utf8),
+            )],
         );
 
         let (proj_schema, proj_statistics) = conf.project();
@@ -653,7 +801,10 @@ mod tests {
                 ),
                 ..Default::default()
             },
-            vec![("date".to_owned(), partition_type_wrap(DataType::Utf8))],
+            vec![(
+                "date".to_owned(),
+                wrap_partition_type_in_dict(DataType::Utf8),
+            )],
         );
 
         let (proj_schema, proj_statistics) = conf.project();
@@ -684,9 +835,18 @@ mod tests {
             ("c", &vec![10, 11, 12]),
         );
         let partition_cols = vec![
-            ("year".to_owned(), partition_type_wrap(DataType::Utf8)),
-            ("month".to_owned(), partition_type_wrap(DataType::Utf8)),
-            ("day".to_owned(), partition_type_wrap(DataType::Utf8)),
+            (
+                "year".to_owned(),
+                wrap_partition_type_in_dict(DataType::Utf8),
+            ),
+            (
+                "month".to_owned(),
+                wrap_partition_type_in_dict(DataType::Utf8),
+            ),
+            (
+                "day".to_owned(),
+                wrap_partition_type_in_dict(DataType::Utf8),
+            ),
         ];
         // create a projected schema
         let conf = config_for_projection(
@@ -718,9 +878,15 @@ mod tests {
                 // file_batch is ok here because we kept all the file cols in 
the projection
                 file_batch,
                 &[
-                    ScalarValue::Utf8(Some("2021".to_owned())),
-                    ScalarValue::Utf8(Some("10".to_owned())),
-                    ScalarValue::Utf8(Some("26".to_owned())),
+                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+                        "2021".to_owned(),
+                    ))),
+                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+                        "10".to_owned(),
+                    ))),
+                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+                        "26".to_owned(),
+                    ))),
                 ],
             )
             .expect("Projection of partition columns into record batch 
failed");
@@ -746,9 +912,15 @@ mod tests {
                 // file_batch is ok here because we kept all the file cols in 
the projection
                 file_batch,
                 &[
-                    ScalarValue::Utf8(Some("2021".to_owned())),
-                    ScalarValue::Utf8(Some("10".to_owned())),
-                    ScalarValue::Utf8(Some("27".to_owned())),
+                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+                        "2021".to_owned(),
+                    ))),
+                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+                        "10".to_owned(),
+                    ))),
+                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+                        "27".to_owned(),
+                    ))),
                 ],
             )
             .expect("Projection of partition columns into record batch 
failed");
@@ -776,9 +948,15 @@ mod tests {
                 // file_batch is ok here because we kept all the file cols in 
the projection
                 file_batch,
                 &[
-                    ScalarValue::Utf8(Some("2021".to_owned())),
-                    ScalarValue::Utf8(Some("10".to_owned())),
-                    ScalarValue::Utf8(Some("28".to_owned())),
+                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+                        "2021".to_owned(),
+                    ))),
+                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+                        "10".to_owned(),
+                    ))),
+                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+                        "28".to_owned(),
+                    ))),
                 ],
             )
             .expect("Projection of partition columns into record batch 
failed");
@@ -792,6 +970,34 @@ mod tests {
             "+---+---+---+------+-----+",
         ];
         crate::assert_batches_eq!(expected, &[projected_batch]);
+
+        // forgot to dictionary-wrap the scalar value
+        let file_batch = build_table_i32(
+            ("a", &vec![0, 1, 2]),
+            ("b", &vec![-2, -1, 0]),
+            ("c", &vec![10, 11, 12]),
+        );
+        let projected_batch = proj
+            .project(
+                // file_batch is ok here because we kept all the file cols in 
the projection
+                file_batch,
+                &[
+                    ScalarValue::Utf8(Some("2021".to_owned())),
+                    ScalarValue::Utf8(Some("10".to_owned())),
+                    ScalarValue::Utf8(Some("26".to_owned())),
+                ],
+            )
+            .expect("Projection of partition columns into record batch 
failed");
+        let expected = vec![
+            "+---+----+----+------+-----+",
+            "| a | b  | c  | year | day |",
+            "+---+----+----+------+-----+",
+            "| 0 | -2 | 10 | 2021 | 26  |",
+            "| 1 | -1 | 11 | 2021 | 26  |",
+            "| 2 | 0  | 12 | 2021 | 26  |",
+            "+---+----+----+------+-----+",
+        ];
+        crate::assert_batches_eq!(expected, &[projected_batch]);
     }
 
     #[test]
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs 
b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 3f3b0bb74..5ad642970 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -814,7 +814,6 @@ mod tests {
     use crate::execution::context::SessionState;
     use crate::execution::options::CsvReadOptions;
     use crate::physical_plan::displayable;
-    use crate::physical_plan::file_format::partition_type_wrap;
     use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
     use crate::test::object_store::local_unpartitioned_file;
     use crate::{
@@ -1660,26 +1659,50 @@ mod tests {
             object_meta: meta,
             partition_values: vec![
                 ScalarValue::Utf8(Some("2021".to_owned())),
-                ScalarValue::Utf8(Some("10".to_owned())),
-                ScalarValue::Utf8(Some("26".to_owned())),
+                ScalarValue::UInt8(Some(10)),
+                ScalarValue::Dictionary(
+                    Box::new(DataType::UInt16),
+                    Box::new(ScalarValue::Utf8(Some("26".to_owned()))),
+                ),
             ],
             range: None,
             extensions: None,
         };
 
+        let expected_schema = Schema::new(vec![
+            Field::new("id", DataType::Int32, true),
+            Field::new("bool_col", DataType::Boolean, true),
+            Field::new("tinyint_col", DataType::Int32, true),
+            Field::new("month", DataType::UInt8, false),
+            Field::new(
+                "day",
+                DataType::Dictionary(
+                    Box::new(DataType::UInt16),
+                    Box::new(DataType::Utf8),
+                ),
+                false,
+            ),
+        ]);
+
         let parquet_exec = ParquetExec::new(
             FileScanConfig {
                 object_store_url,
                 file_groups: vec![vec![partitioned_file]],
                 file_schema: schema,
                 statistics: Statistics::default(),
-                // file has 10 cols so index 12 should be month
-                projection: Some(vec![0, 1, 2, 12]),
+                // file has 10 cols so index 12 should be month and 13 should 
be day
+                projection: Some(vec![0, 1, 2, 12, 13]),
                 limit: None,
                 table_partition_cols: vec![
-                    ("year".to_owned(), partition_type_wrap(DataType::Utf8)),
-                    ("month".to_owned(), partition_type_wrap(DataType::Utf8)),
-                    ("day".to_owned(), partition_type_wrap(DataType::Utf8)),
+                    ("year".to_owned(), DataType::Utf8),
+                    ("month".to_owned(), DataType::UInt8),
+                    (
+                        "day".to_owned(),
+                        DataType::Dictionary(
+                            Box::new(DataType::UInt16),
+                            Box::new(DataType::Utf8),
+                        ),
+                    ),
                 ],
                 output_ordering: None,
                 infinite_source: false,
@@ -1688,22 +1711,24 @@ mod tests {
             None,
         );
         assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
+        assert_eq!(parquet_exec.schema().as_ref(), &expected_schema);
 
         let mut results = parquet_exec.execute(0, task_ctx)?;
         let batch = results.next().await.unwrap()?;
+        assert_eq!(batch.schema().as_ref(), &expected_schema);
         let expected = vec![
-            "+----+----------+-------------+-------+",
-            "| id | bool_col | tinyint_col | month |",
-            "+----+----------+-------------+-------+",
-            "| 4  | true     | 0           | 10    |",
-            "| 5  | false    | 1           | 10    |",
-            "| 6  | true     | 0           | 10    |",
-            "| 7  | false    | 1           | 10    |",
-            "| 2  | true     | 0           | 10    |",
-            "| 3  | false    | 1           | 10    |",
-            "| 0  | true     | 0           | 10    |",
-            "| 1  | false    | 1           | 10    |",
-            "+----+----------+-------------+-------+",
+            "+----+----------+-------------+-------+-----+",
+            "| id | bool_col | tinyint_col | month | day |",
+            "+----+----------+-------------+-------+-----+",
+            "| 4  | true     | 0           | 10    | 26  |",
+            "| 5  | false    | 1           | 10    | 26  |",
+            "| 6  | true     | 0           | 10    | 26  |",
+            "| 7  | false    | 1           | 10    | 26  |",
+            "| 2  | true     | 0           | 10    | 26  |",
+            "| 3  | false    | 1           | 10    | 26  |",
+            "| 0  | true     | 0           | 10    | 26  |",
+            "| 1  | false    | 1           | 10    | 26  |",
+            "+----+----------+-------------+-------+-----+",
         ];
         crate::assert_batches_eq!(expected, &[batch]);
 
diff --git a/datafusion/core/tests/path_partition.rs 
b/datafusion/core/tests/path_partition.rs
index 670b508c3..6c7f1431a 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -59,7 +59,13 @@ async fn parquet_distinct_partition_col() -> Result<()> {
         ],
         &[
             ("year", DataType::Int32),
-            ("month", DataType::Utf8),
+            (
+                "month",
+                DataType::Dictionary(
+                    Box::new(DataType::UInt16),
+                    Box::new(DataType::Utf8),
+                ),
+            ),
             ("day", DataType::Utf8),
         ],
         "mirror:///",
diff --git a/datafusion/core/tests/sqllogictests/test_files/ddl.slt 
b/datafusion/core/tests/sqllogictests/test_files/ddl.slt
index 59bfc91b5..a4c90df07 100644
--- a/datafusion/core/tests/sqllogictests/test_files/ddl.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/ddl.slt
@@ -366,7 +366,7 @@ STORED AS CSV
 PARTITIONED BY (c_date)
 LOCATION 'tests/data/partitioned_table';
 
-query TP?
+query TPD
 SELECT * from csv_with_timestamps where c_date='2018-11-13'
 ----
 Jorge 2018-12-13T12:12:10.011 2018-11-13

Reply via email to