This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 8b5dbd79e Support arbitrary user defined partition column in
`ListingTable` (rather than assuming they are always Dictionary encoded) (#5545)
8b5dbd79e is described below
commit 8b5dbd79ee9cb19e006fdc8c893e2a4cbc76914e
Author: Marco Neumann <[email protected]>
AuthorDate: Wed Mar 15 13:24:46 2023 +0100
Support arbitrary user defined partition column in `ListingTable` (rather
than assuming they are always Dictionary encoded) (#5545)
* refactor: user may choose to dict-encode partition values
Let the user decide whether they want to encode partition values for
file-based data sources. Dictionary encoding makes sense for string
values but is probably pointless or even counterproductive for integer
types.
* refactor: improve transition
* fix: remove leftover dbg
* refactor: `partition_{type,value}_wrap` ->
`wrap_partition_{type,value}_in_dict`
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/core/src/datasource/listing/mod.rs | 11 +-
datafusion/core/src/datasource/listing/table.rs | 17 +-
.../core/src/datasource/listing_table_factory.rs | 10 +-
.../core/src/physical_plan/file_format/avro.rs | 6 +-
.../core/src/physical_plan/file_format/csv.rs | 4 +-
.../core/src/physical_plan/file_format/mod.rs | 290 ++++++++++++++++++---
.../core/src/physical_plan/file_format/parquet.rs | 65 +++--
datafusion/core/tests/path_partition.rs | 8 +-
.../core/tests/sqllogictests/test_files/ddl.slt | 2 +-
9 files changed, 329 insertions(+), 84 deletions(-)
diff --git a/datafusion/core/src/datasource/listing/mod.rs
b/datafusion/core/src/datasource/listing/mod.rs
index d5374d1ed..1e6b40a34 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -55,7 +55,16 @@ pub struct FileRange {
pub struct PartitionedFile {
/// Path for the file (e.g. URL, filesystem path, etc)
pub object_meta: ObjectMeta,
- /// Values of partition columns to be appended to each row
+ /// Values of partition columns to be appended to each row.
+ ///
+ /// These MUST have the same count, order, and type as the
[`table_partition_cols`].
+ ///
+ /// You may use [`wrap_partition_value_in_dict`] to wrap them if you have
used [`wrap_partition_type_in_dict`] to wrap the column type.
+ ///
+ ///
+ /// [`wrap_partition_type_in_dict`]:
crate::physical_plan::file_format::wrap_partition_type_in_dict
+ /// [`wrap_partition_value_in_dict`]:
crate::physical_plan::file_format::wrap_partition_value_in_dict
+ /// [`table_partition_cols`]: table::ListingOptions::table_partition_cols
pub partition_values: Vec<ScalarValue>,
/// An optional file range for a more fine-grained parallel execution
pub range: Option<FileRange>,
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index f6d9c959e..8858413bf 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -44,7 +44,6 @@ use crate::datasource::{
};
use crate::logical_expr::TableProviderFilterPushDown;
use crate::physical_plan;
-use crate::physical_plan::file_format::partition_type_wrap;
use crate::{
error::{DataFusionError, Result},
execution::context::SessionState,
@@ -300,6 +299,8 @@ impl ListingOptions {
/// Set table partition column names on [`ListingOptions`] and returns
self.
///
+ /// You may use [`wrap_partition_type_in_dict`] to request a
dictionary-encoded type.
+ ///
/// ```
/// # use std::sync::Arc;
/// # use arrow::datatypes::DataType;
@@ -315,6 +316,9 @@ impl ListingOptions {
/// assert_eq!(listing_options.table_partition_cols,
vec![("col_a".to_string(), DataType::Utf8),
/// ("col_b".to_string(), DataType::Utf8)]);
/// ```
+ ///
+ ///
+ /// [`wrap_partition_type_in_dict`]:
crate::physical_plan::file_format::wrap_partition_type_in_dict
pub fn with_table_partition_cols(
mut self,
table_partition_cols: Vec<(String, DataType)>,
@@ -538,11 +542,7 @@ impl ListingTable {
// Add the partition columns to the file schema
let mut table_fields = file_schema.fields().clone();
for (part_col_name, part_col_type) in &options.table_partition_cols {
- table_fields.push(Field::new(
- part_col_name,
- partition_type_wrap(part_col_type.clone()),
- false,
- ));
+ table_fields.push(Field::new(part_col_name, part_col_type.clone(),
false));
}
let infinite_source = options.infinite_source;
@@ -1012,10 +1012,7 @@ mod tests {
let opt = ListingOptions::new(Arc::new(AvroFormat {}))
.with_file_extension(FileType::AVRO.get_ext())
- .with_table_partition_cols(vec![(
- String::from("p1"),
- partition_type_wrap(DataType::Utf8),
- )])
+ .with_table_partition_cols(vec![(String::from("p1"),
DataType::Utf8)])
.with_target_partitions(4);
let table_path = ListingTableUrl::parse("test:///table/").unwrap();
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs
b/datafusion/core/src/datasource/listing_table_factory.rs
index fe4393cb2..8b7dd46a8 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -86,7 +86,15 @@ impl TableProviderFactory for ListingTableFactory {
None,
cmd.table_partition_cols
.iter()
- .map(|x| (x.clone(), DataType::Utf8))
+ .map(|x| {
+ (
+ x.clone(),
+ DataType::Dictionary(
+ Box::new(DataType::UInt16),
+ Box::new(DataType::Utf8),
+ ),
+ )
+ })
.collect::<Vec<_>>(),
)
} else {
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs
b/datafusion/core/src/physical_plan/file_format/avro.rs
index 9ac1d23ba..e4fc570fe 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -213,7 +213,6 @@ mod tests {
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::physical_plan::file_format::chunked_store::ChunkedStore;
- use crate::physical_plan::file_format::partition_type_wrap;
use crate::prelude::SessionContext;
use crate::scalar::ScalarValue;
use crate::test::object_store::local_unpartitioned_file;
@@ -409,10 +408,7 @@ mod tests {
file_schema,
statistics: Statistics::default(),
limit: None,
- table_partition_cols: vec![(
- "date".to_owned(),
- partition_type_wrap(DataType::Utf8),
- )],
+ table_partition_cols: vec![("date".to_owned(), DataType::Utf8)],
output_ordering: None,
infinite_source: false,
});
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs
b/datafusion/core/src/physical_plan/file_format/csv.rs
index 346c99158..3fc7df4f1 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -326,7 +326,6 @@ mod tests {
use super::*;
use crate::datasource::file_format::file_type::FileType;
use crate::physical_plan::file_format::chunked_store::ChunkedStore;
- use crate::physical_plan::file_format::partition_type_wrap;
use crate::prelude::*;
use crate::test::{partitioned_csv_config, partitioned_file_groups};
use crate::test_util::{aggr_test_schema_with_missing_col, arrow_test_data};
@@ -580,8 +579,7 @@ mod tests {
let mut config = partitioned_csv_config(file_schema, file_groups)?;
// Add partition columns
- config.table_partition_cols =
- vec![("date".to_owned(), partition_type_wrap(DataType::Utf8))];
+ config.table_partition_cols = vec![("date".to_owned(),
DataType::Utf8)];
config.file_groups[0][0].partition_values =
vec![ScalarValue::Utf8(Some("2021-10-26".to_owned()))];
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs
b/datafusion/core/src/physical_plan/file_format/mod.rs
index d7616a3c2..eb70d18ef 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -30,9 +30,9 @@ pub use self::csv::CsvExec;
pub(crate) use self::parquet::plan_to_parquet;
pub use self::parquet::{ParquetExec, ParquetFileMetrics,
ParquetFileReaderFactory};
use arrow::{
- array::{ArrayData, ArrayRef, DictionaryArray},
+ array::{ArrayData, ArrayRef, BufferBuilder, DictionaryArray},
buffer::Buffer,
- datatypes::{DataType, Field, Schema, SchemaRef, UInt16Type},
+ datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef,
UInt16Type},
record_batch::RecordBatch,
};
pub use avro::AvroExec;
@@ -53,25 +53,42 @@ use crate::{
error::{DataFusionError, Result},
scalar::ScalarValue,
};
-use arrow::array::{new_null_array, UInt16BufferBuilder};
+use arrow::array::new_null_array;
use arrow::record_batch::RecordBatchOptions;
-use log::{debug, info};
+use log::{debug, info, warn};
use object_store::path::Path;
use object_store::ObjectMeta;
use std::{
+ borrow::Cow,
collections::HashMap,
fmt::{Display, Formatter, Result as FmtResult},
+ marker::PhantomData,
sync::Arc,
vec,
};
use super::{ColumnStatistics, Statistics};
-/// Convert logical type of partition column to physical type:
`Dictionary(UInt16, val_type)`
-pub fn partition_type_wrap(val_type: DataType) -> DataType {
+/// Convert logical type of partition column to physical type:
`Dictionary(UInt16, val_type)`.
+///
+/// You CAN use this to specify types for partition columns. However you MAY
also choose not to dictionary-encode the
+/// data or to use a different dictionary type.
+///
+/// Use [`wrap_partition_value_in_dict`] to wrap the values.
+pub fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
}
+/// Convert scalar value of partition columns to physical type:
`Dictionary(UInt16, val_type)`.
+///
+/// You CAN use this to specify types for partition columns. However you MAY
also choose not to dictionary-encode the
+/// data or to use a different dictionary type.
+///
+/// Use [`wrap_partition_type_in_dict`] to wrap the types.
+pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
+ ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
+}
+
/// Get all of the [`PartitionedFile`] to be scanned for an [`ExecutionPlan`]
pub fn get_scan_files(
plan: Arc<dyn ExecutionPlan>,
@@ -394,7 +411,7 @@ struct PartitionColumnProjector {
/// An Arrow buffer initialized to zeros that represents the key array of
all partition
/// columns (partition columns are materialized by dictionary arrays with
only one
/// value in the dictionary, thus all the keys are equal to zero).
- key_buffer_cache: Option<Buffer>,
+ key_buffer_cache: ZeroBufferGenerators,
/// Mapping between the indexes in the list of partition columns and the
target
/// schema. Sorted by index in the target schema so that we can iterate on
it to
/// insert the partition columns in the target record batch.
@@ -420,7 +437,7 @@ impl PartitionColumnProjector {
Self {
projected_partition_indexes,
- key_buffer_cache: None,
+ key_buffer_cache: Default::default(),
projected_schema,
}
}
@@ -446,11 +463,27 @@ impl PartitionColumnProjector {
}
let mut cols = file_batch.columns().to_vec();
for &(pidx, sidx) in &self.projected_partition_indexes {
+ let mut partition_value = Cow::Borrowed(&partition_values[pidx]);
+
+ // check if user forgot to dict-encode the partition value
+ let field = self.projected_schema.field(sidx);
+ let expected_data_type = field.data_type();
+ let actual_data_type = partition_value.get_datatype();
+ if let DataType::Dictionary(key_type, _) = expected_data_type {
+ if !matches!(actual_data_type, DataType::Dictionary(_, _)) {
+ warn!("Partition value for column {} was not
dictionary-encoded, applied auto-fix.", field.name());
+ partition_value = Cow::Owned(ScalarValue::Dictionary(
+ key_type.clone(),
+ Box::new(partition_value.as_ref().clone()),
+ ));
+ }
+ }
+
cols.insert(
sidx,
- create_dict_array(
+ create_output_array(
&mut self.key_buffer_cache,
- &partition_values[pidx],
+ partition_value.as_ref(),
file_batch.num_rows(),
),
)
@@ -459,26 +492,60 @@ impl PartitionColumnProjector {
}
}
-fn create_dict_array(
- key_buffer_cache: &mut Option<Buffer>,
- val: &ScalarValue,
- len: usize,
-) -> ArrayRef {
- // build value dictionary
- let dict_vals = val.to_array();
-
- // build keys array
- let sliced_key_buffer = match key_buffer_cache {
- Some(buf) if buf.len() >= len * 2 => buf.slice(buf.len() - len * 2),
- _ => {
- let mut key_buffer_builder = UInt16BufferBuilder::new(len * 2);
- key_buffer_builder.advance(len * 2); // keys are all 0
- key_buffer_cache.insert(key_buffer_builder.finish()).clone()
+#[derive(Debug, Default)]
+struct ZeroBufferGenerators {
+ gen_i8: ZeroBufferGenerator<i8>,
+ gen_i16: ZeroBufferGenerator<i16>,
+ gen_i32: ZeroBufferGenerator<i32>,
+ gen_i64: ZeroBufferGenerator<i64>,
+ gen_u8: ZeroBufferGenerator<u8>,
+ gen_u16: ZeroBufferGenerator<u16>,
+ gen_u32: ZeroBufferGenerator<u32>,
+ gen_u64: ZeroBufferGenerator<u64>,
+}
+
+/// Generate an arrow [`Buffer`] that contains zero values.
+#[derive(Debug, Default)]
+struct ZeroBufferGenerator<T>
+where
+ T: ArrowNativeType,
+{
+ cache: Option<Buffer>,
+ _t: PhantomData<T>,
+}
+
+impl<T> ZeroBufferGenerator<T>
+where
+ T: ArrowNativeType,
+{
+ const SIZE: usize = std::mem::size_of::<T>();
+
+ fn get_buffer(&mut self, n_vals: usize) -> Buffer {
+ match &mut self.cache {
+ Some(buf) if buf.len() >= n_vals * Self::SIZE => {
+ buf.slice_with_length(0, n_vals * Self::SIZE)
+ }
+ _ => {
+ let mut key_buffer_builder = BufferBuilder::<T>::new(n_vals);
+ key_buffer_builder.advance(n_vals); // keys are all 0
+ self.cache.insert(key_buffer_builder.finish()).clone()
+ }
}
- };
+ }
+}
- // create data type
- let data_type = partition_type_wrap(val.get_datatype());
+fn create_dict_array<T>(
+ buffer_gen: &mut ZeroBufferGenerator<T>,
+ dict_val: &ScalarValue,
+ len: usize,
+ data_type: DataType,
+) -> ArrayRef
+where
+ T: ArrowNativeType,
+{
+ let dict_vals = dict_val.to_array();
+
+ let sliced_key_buffer = buffer_gen.get_buffer(len);
// assemble pieces together
let mut builder = ArrayData::builder(data_type)
@@ -490,6 +557,84 @@ fn create_dict_array(
))
}
+fn create_output_array(
+ key_buffer_cache: &mut ZeroBufferGenerators,
+ val: &ScalarValue,
+ len: usize,
+) -> ArrayRef {
+ if let ScalarValue::Dictionary(key_type, dict_val) = &val {
+ match key_type.as_ref() {
+ DataType::Int8 => {
+ return create_dict_array(
+ &mut key_buffer_cache.gen_i8,
+ dict_val,
+ len,
+ val.get_datatype(),
+ );
+ }
+ DataType::Int16 => {
+ return create_dict_array(
+ &mut key_buffer_cache.gen_i16,
+ dict_val,
+ len,
+ val.get_datatype(),
+ );
+ }
+ DataType::Int32 => {
+ return create_dict_array(
+ &mut key_buffer_cache.gen_i32,
+ dict_val,
+ len,
+ val.get_datatype(),
+ );
+ }
+ DataType::Int64 => {
+ return create_dict_array(
+ &mut key_buffer_cache.gen_i64,
+ dict_val,
+ len,
+ val.get_datatype(),
+ );
+ }
+ DataType::UInt8 => {
+ return create_dict_array(
+ &mut key_buffer_cache.gen_u8,
+ dict_val,
+ len,
+ val.get_datatype(),
+ );
+ }
+ DataType::UInt16 => {
+ return create_dict_array(
+ &mut key_buffer_cache.gen_u16,
+ dict_val,
+ len,
+ val.get_datatype(),
+ );
+ }
+ DataType::UInt32 => {
+ return create_dict_array(
+ &mut key_buffer_cache.gen_u32,
+ dict_val,
+ len,
+ val.get_datatype(),
+ );
+ }
+ DataType::UInt64 => {
+ return create_dict_array(
+ &mut key_buffer_cache.gen_u64,
+ dict_val,
+ len,
+ val.get_datatype(),
+ );
+ }
+ _ => {}
+ }
+ }
+
+ val.to_array_of_size(len)
+}
+
/// A single file or part of a file that should be read, along with its
schema, statistics
pub struct FileMeta {
/// Path for the file (e.g. URL, filesystem path, etc)
@@ -607,7 +752,10 @@ mod tests {
Arc::clone(&file_schema),
None,
Statistics::default(),
- vec![("date".to_owned(), partition_type_wrap(DataType::Utf8))],
+ vec![(
+ "date".to_owned(),
+ wrap_partition_type_in_dict(DataType::Utf8),
+ )],
);
let (proj_schema, proj_statistics) = conf.project();
@@ -653,7 +801,10 @@ mod tests {
),
..Default::default()
},
- vec![("date".to_owned(), partition_type_wrap(DataType::Utf8))],
+ vec![(
+ "date".to_owned(),
+ wrap_partition_type_in_dict(DataType::Utf8),
+ )],
);
let (proj_schema, proj_statistics) = conf.project();
@@ -684,9 +835,18 @@ mod tests {
("c", &vec![10, 11, 12]),
);
let partition_cols = vec![
- ("year".to_owned(), partition_type_wrap(DataType::Utf8)),
- ("month".to_owned(), partition_type_wrap(DataType::Utf8)),
- ("day".to_owned(), partition_type_wrap(DataType::Utf8)),
+ (
+ "year".to_owned(),
+ wrap_partition_type_in_dict(DataType::Utf8),
+ ),
+ (
+ "month".to_owned(),
+ wrap_partition_type_in_dict(DataType::Utf8),
+ ),
+ (
+ "day".to_owned(),
+ wrap_partition_type_in_dict(DataType::Utf8),
+ ),
];
// create a projected schema
let conf = config_for_projection(
@@ -718,9 +878,15 @@ mod tests {
// file_batch is ok here because we kept all the file cols in
the projection
file_batch,
&[
- ScalarValue::Utf8(Some("2021".to_owned())),
- ScalarValue::Utf8(Some("10".to_owned())),
- ScalarValue::Utf8(Some("26".to_owned())),
+ wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+ "2021".to_owned(),
+ ))),
+ wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+ "10".to_owned(),
+ ))),
+ wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+ "26".to_owned(),
+ ))),
],
)
.expect("Projection of partition columns into record batch
failed");
@@ -746,9 +912,15 @@ mod tests {
// file_batch is ok here because we kept all the file cols in
the projection
file_batch,
&[
- ScalarValue::Utf8(Some("2021".to_owned())),
- ScalarValue::Utf8(Some("10".to_owned())),
- ScalarValue::Utf8(Some("27".to_owned())),
+ wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+ "2021".to_owned(),
+ ))),
+ wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+ "10".to_owned(),
+ ))),
+ wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+ "27".to_owned(),
+ ))),
],
)
.expect("Projection of partition columns into record batch
failed");
@@ -776,9 +948,15 @@ mod tests {
// file_batch is ok here because we kept all the file cols in
the projection
file_batch,
&[
- ScalarValue::Utf8(Some("2021".to_owned())),
- ScalarValue::Utf8(Some("10".to_owned())),
- ScalarValue::Utf8(Some("28".to_owned())),
+ wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+ "2021".to_owned(),
+ ))),
+ wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+ "10".to_owned(),
+ ))),
+ wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+ "28".to_owned(),
+ ))),
],
)
.expect("Projection of partition columns into record batch
failed");
@@ -792,6 +970,34 @@ mod tests {
"+---+---+---+------+-----+",
];
crate::assert_batches_eq!(expected, &[projected_batch]);
+
+ // forgot to dictionary-wrap the scalar value
+ let file_batch = build_table_i32(
+ ("a", &vec![0, 1, 2]),
+ ("b", &vec![-2, -1, 0]),
+ ("c", &vec![10, 11, 12]),
+ );
+ let projected_batch = proj
+ .project(
+ // file_batch is ok here because we kept all the file cols in
the projection
+ file_batch,
+ &[
+ ScalarValue::Utf8(Some("2021".to_owned())),
+ ScalarValue::Utf8(Some("10".to_owned())),
+ ScalarValue::Utf8(Some("26".to_owned())),
+ ],
+ )
+ .expect("Projection of partition columns into record batch
failed");
+ let expected = vec![
+ "+---+----+----+------+-----+",
+ "| a | b | c | year | day |",
+ "+---+----+----+------+-----+",
+ "| 0 | -2 | 10 | 2021 | 26 |",
+ "| 1 | -1 | 11 | 2021 | 26 |",
+ "| 2 | 0 | 12 | 2021 | 26 |",
+ "+---+----+----+------+-----+",
+ ];
+ crate::assert_batches_eq!(expected, &[projected_batch]);
}
#[test]
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs
b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 3f3b0bb74..5ad642970 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -814,7 +814,6 @@ mod tests {
use crate::execution::context::SessionState;
use crate::execution::options::CsvReadOptions;
use crate::physical_plan::displayable;
- use crate::physical_plan::file_format::partition_type_wrap;
use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use crate::test::object_store::local_unpartitioned_file;
use crate::{
@@ -1660,26 +1659,50 @@ mod tests {
object_meta: meta,
partition_values: vec![
ScalarValue::Utf8(Some("2021".to_owned())),
- ScalarValue::Utf8(Some("10".to_owned())),
- ScalarValue::Utf8(Some("26".to_owned())),
+ ScalarValue::UInt8(Some(10)),
+ ScalarValue::Dictionary(
+ Box::new(DataType::UInt16),
+ Box::new(ScalarValue::Utf8(Some("26".to_owned()))),
+ ),
],
range: None,
extensions: None,
};
+ let expected_schema = Schema::new(vec![
+ Field::new("id", DataType::Int32, true),
+ Field::new("bool_col", DataType::Boolean, true),
+ Field::new("tinyint_col", DataType::Int32, true),
+ Field::new("month", DataType::UInt8, false),
+ Field::new(
+ "day",
+ DataType::Dictionary(
+ Box::new(DataType::UInt16),
+ Box::new(DataType::Utf8),
+ ),
+ false,
+ ),
+ ]);
+
let parquet_exec = ParquetExec::new(
FileScanConfig {
object_store_url,
file_groups: vec![vec![partitioned_file]],
file_schema: schema,
statistics: Statistics::default(),
- // file has 10 cols so index 12 should be month
- projection: Some(vec![0, 1, 2, 12]),
+ // file has 10 cols so index 12 should be month and 13 should
be day
+ projection: Some(vec![0, 1, 2, 12, 13]),
limit: None,
table_partition_cols: vec![
- ("year".to_owned(), partition_type_wrap(DataType::Utf8)),
- ("month".to_owned(), partition_type_wrap(DataType::Utf8)),
- ("day".to_owned(), partition_type_wrap(DataType::Utf8)),
+ ("year".to_owned(), DataType::Utf8),
+ ("month".to_owned(), DataType::UInt8),
+ (
+ "day".to_owned(),
+ DataType::Dictionary(
+ Box::new(DataType::UInt16),
+ Box::new(DataType::Utf8),
+ ),
+ ),
],
output_ordering: None,
infinite_source: false,
@@ -1688,22 +1711,24 @@ mod tests {
None,
);
assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
+ assert_eq!(parquet_exec.schema().as_ref(), &expected_schema);
let mut results = parquet_exec.execute(0, task_ctx)?;
let batch = results.next().await.unwrap()?;
+ assert_eq!(batch.schema().as_ref(), &expected_schema);
let expected = vec![
- "+----+----------+-------------+-------+",
- "| id | bool_col | tinyint_col | month |",
- "+----+----------+-------------+-------+",
- "| 4 | true | 0 | 10 |",
- "| 5 | false | 1 | 10 |",
- "| 6 | true | 0 | 10 |",
- "| 7 | false | 1 | 10 |",
- "| 2 | true | 0 | 10 |",
- "| 3 | false | 1 | 10 |",
- "| 0 | true | 0 | 10 |",
- "| 1 | false | 1 | 10 |",
- "+----+----------+-------------+-------+",
+ "+----+----------+-------------+-------+-----+",
+ "| id | bool_col | tinyint_col | month | day |",
+ "+----+----------+-------------+-------+-----+",
+ "| 4 | true | 0 | 10 | 26 |",
+ "| 5 | false | 1 | 10 | 26 |",
+ "| 6 | true | 0 | 10 | 26 |",
+ "| 7 | false | 1 | 10 | 26 |",
+ "| 2 | true | 0 | 10 | 26 |",
+ "| 3 | false | 1 | 10 | 26 |",
+ "| 0 | true | 0 | 10 | 26 |",
+ "| 1 | false | 1 | 10 | 26 |",
+ "+----+----------+-------------+-------+-----+",
];
crate::assert_batches_eq!(expected, &[batch]);
diff --git a/datafusion/core/tests/path_partition.rs
b/datafusion/core/tests/path_partition.rs
index 670b508c3..6c7f1431a 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -59,7 +59,13 @@ async fn parquet_distinct_partition_col() -> Result<()> {
],
&[
("year", DataType::Int32),
- ("month", DataType::Utf8),
+ (
+ "month",
+ DataType::Dictionary(
+ Box::new(DataType::UInt16),
+ Box::new(DataType::Utf8),
+ ),
+ ),
("day", DataType::Utf8),
],
"mirror:///",
diff --git a/datafusion/core/tests/sqllogictests/test_files/ddl.slt
b/datafusion/core/tests/sqllogictests/test_files/ddl.slt
index 59bfc91b5..a4c90df07 100644
--- a/datafusion/core/tests/sqllogictests/test_files/ddl.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/ddl.slt
@@ -366,7 +366,7 @@ STORED AS CSV
PARTITIONED BY (c_date)
LOCATION 'tests/data/partitioned_table';
-query TP?
+query TPD
SELECT * from csv_with_timestamps where c_date='2018-11-13'
----
Jorge 2018-12-13T12:12:10.011 2018-11-13