This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new e1e881405 feat(parquet)!: coerce_types flag for date64 (#6313)
e1e881405 is described below
commit e1e881405ce9b180858d5ba278803b4795b63e74
Author: dsgibbons <[email protected]>
AuthorDate: Tue Nov 26 20:59:13 2024 +1030
feat(parquet)!: coerce_types flag for date64 (#6313)
* feat(parquet): coerce_types flag for date64
* fix: use ARROW:schema instead of LogicalType to embed Date64 type
* chore: lint
* chore: lint
* chore: lint
* chore: add physical_type to StatisticsConverter to account for
coerce_types
* chore: blank line changes
* chore: revert minor test changes
* chore: update to latest parquet-testing
* chore: add physical_type fix for get_data_page_statistics macro
* docs: add docs for coerce_types
* chore: cargo fmt --all
* docs: coerce_types lossless round trip
Co-authored-by: Ed Seidl <[email protected]>
---------
Co-authored-by: Ed Seidl <[email protected]>
---
parquet/src/arrow/array_reader/primitive_array.rs | 60 ++++++++++-
parquet/src/arrow/arrow_reader/mod.rs | 115 +++++++++++++++++++++-
parquet/src/arrow/arrow_reader/statistics.rs | 56 +++++++----
parquet/src/arrow/arrow_writer/mod.rs | 14 ++-
parquet/src/arrow/schema/mod.rs | 50 ++++++----
parquet/src/file/properties.rs | 26 +++++
6 files changed, 272 insertions(+), 49 deletions(-)
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs
b/parquet/src/arrow/array_reader/primitive_array.rs
index 010e9c2ee..a952e00e1 100644
--- a/parquet/src/arrow/array_reader/primitive_array.rs
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -208,10 +208,10 @@ where
// As there is not always a 1:1 mapping between Arrow and Parquet,
there
// are datatypes which we must convert explicitly.
// These are:
- // - date64: we should cast int32 to date32, then date32 to date64.
- // - decimal: cast in32 to decimal, int64 to decimal
+ // - date64: cast int32 to date32, then date32 to date64.
+ // - decimal: cast int32 to decimal, int64 to decimal
let array = match target_type {
- ArrowType::Date64 => {
+ ArrowType::Date64 if *(array.data_type()) == ArrowType::Int32 => {
// this is cheap as it internally reinterprets the data
let a = arrow_cast::cast(&array, &ArrowType::Date32)?;
arrow_cast::cast(&a, target_type)?
@@ -305,9 +305,9 @@ mod tests {
use crate::util::test_common::rand_gen::make_pages;
use crate::util::InMemoryPageIterator;
use arrow::datatypes::ArrowPrimitiveType;
- use arrow_array::{Array, PrimitiveArray};
+ use arrow_array::{Array, Date32Array, PrimitiveArray};
- use arrow::datatypes::DataType::Decimal128;
+ use arrow::datatypes::DataType::{Date32, Decimal128};
use rand::distributions::uniform::SampleUniform;
use std::collections::VecDeque;
@@ -783,4 +783,54 @@ mod tests {
assert_ne!(array, &data_decimal_array)
}
}
+
+ #[test]
+ fn test_primitive_array_reader_date32_type() {
+ // parquet `INT32` to date
+ let message_type = "
+ message test_schema {
+ REQUIRED INT32 date1 (DATE);
+ }
+ ";
+ let schema = parse_message_type(message_type)
+ .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+ .unwrap();
+ let column_desc = schema.column(0);
+
+ // create the array reader
+ {
+ let mut data = Vec::new();
+ let mut page_lists = Vec::new();
+ make_column_chunks::<Int32Type>(
+ column_desc.clone(),
+ Encoding::PLAIN,
+ 100,
+ -99999999,
+ 99999999,
+ &mut Vec::new(),
+ &mut Vec::new(),
+ &mut data,
+ &mut page_lists,
+ true,
+ 2,
+ );
+ let page_iterator = InMemoryPageIterator::new(page_lists);
+
+ let mut array_reader =
+
PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc,
None)
+ .unwrap();
+
+ // read data from the reader
+ // the data type is date
+ let array = array_reader.next_batch(50).unwrap();
+ assert_eq!(array.data_type(), &Date32);
+ let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
+ let data_date_array = data[0..50]
+ .iter()
+ .copied()
+ .map(Some)
+ .collect::<Date32Array>();
+ assert_eq!(array, &data_date_array);
+ }
+ }
}
diff --git a/parquet/src/arrow/arrow_reader/mod.rs
b/parquet/src/arrow/arrow_reader/mod.rs
index f351c25bd..a3d011346 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -932,8 +932,8 @@ mod tests {
use arrow_array::builder::*;
use arrow_array::cast::AsArray;
use arrow_array::types::{
- Decimal128Type, Decimal256Type, DecimalType, Float16Type, Float32Type,
Float64Type,
- Time32MillisecondType, Time64MicrosecondType,
+ Date32Type, Date64Type, Decimal128Type, Decimal256Type, DecimalType,
Float16Type,
+ Float32Type, Float64Type, Time32MillisecondType, Time64MicrosecondType,
};
use arrow_array::*;
use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
@@ -1272,6 +1272,117 @@ mod tests {
Ok(())
}
+ #[test]
+ fn test_date32_roundtrip() -> Result<()> {
+ use arrow_array::Date32Array;
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "date32",
+ ArrowDataType::Date32,
+ false,
+ )]));
+
+ let mut buf = Vec::with_capacity(1024);
+
+ let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
+
+ let original = RecordBatch::try_new(
+ schema,
+ vec![Arc::new(Date32Array::from(vec![
+ -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000,
100_000, 1_000_000,
+ ]))],
+ )?;
+
+ writer.write(&original)?;
+ writer.close()?;
+
+ let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf),
1024)?;
+ let ret = reader.next().unwrap()?;
+ assert_eq!(ret, original);
+
+ // Ensure can be downcast to the correct type
+ ret.column(0).as_primitive::<Date32Type>();
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_date64_roundtrip() -> Result<()> {
+ use arrow_array::Date64Array;
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("small-date64", ArrowDataType::Date64, false),
+ Field::new("big-date64", ArrowDataType::Date64, false),
+ Field::new("invalid-date64", ArrowDataType::Date64, false),
+ ]));
+
+ let mut default_buf = Vec::with_capacity(1024);
+ let mut coerce_buf = Vec::with_capacity(1024);
+
+ let coerce_props =
WriterProperties::builder().set_coerce_types(true).build();
+
+ let mut default_writer = ArrowWriter::try_new(&mut default_buf,
schema.clone(), None)?;
+ let mut coerce_writer =
+ ArrowWriter::try_new(&mut coerce_buf, schema.clone(),
Some(coerce_props))?;
+
+ static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;
+
+ let original = RecordBatch::try_new(
+ schema,
+ vec![
+ // small-date64
+ Arc::new(Date64Array::from(vec![
+ -1_000_000 * NUM_MILLISECONDS_IN_DAY,
+ -1_000 * NUM_MILLISECONDS_IN_DAY,
+ 0,
+ 1_000 * NUM_MILLISECONDS_IN_DAY,
+ 1_000_000 * NUM_MILLISECONDS_IN_DAY,
+ ])),
+ // big-date64
+ Arc::new(Date64Array::from(vec![
+ -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
+ -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
+ 0,
+ 1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
+ 10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
+ ])),
+ // invalid-date64
+ Arc::new(Date64Array::from(vec![
+ -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
+ -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
+ 1,
+ 1_000 * NUM_MILLISECONDS_IN_DAY + 1,
+ 1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
+ ])),
+ ],
+ )?;
+
+ default_writer.write(&original)?;
+ coerce_writer.write(&original)?;
+
+ default_writer.close()?;
+ coerce_writer.close()?;
+
+ let mut default_reader =
ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
+ let mut coerce_reader =
ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
+
+ let default_ret = default_reader.next().unwrap()?;
+ let coerce_ret = coerce_reader.next().unwrap()?;
+
+ // Roundtrip should be successful when default writer used
+ assert_eq!(default_ret, original);
+
+ // Only small-date64 should roundtrip successfully when coerce_types
writer is used
+ assert_eq!(coerce_ret.column(0), original.column(0));
+ assert_ne!(coerce_ret.column(1), original.column(1));
+ assert_ne!(coerce_ret.column(2), original.column(2));
+
+ // Ensure both can be downcast to the correct type
+ default_ret.column(0).as_primitive::<Date64Type>();
+ coerce_ret.column(0).as_primitive::<Date64Type>();
+
+ Ok(())
+ }
struct RandFixedLenGen {}
impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
diff --git a/parquet/src/arrow/arrow_reader/statistics.rs
b/parquet/src/arrow/arrow_reader/statistics.rs
index 8a7511be2..09f8ec7cc 100644
--- a/parquet/src/arrow/arrow_reader/statistics.rs
+++ b/parquet/src/arrow/arrow_reader/statistics.rs
@@ -21,6 +21,7 @@
/// `arrow-rs/parquet/tests/arrow_reader/statistics.rs`.
use crate::arrow::buffer::bit_util::sign_extend_be;
use crate::arrow::parquet_column;
+use crate::basic::Type as PhysicalType;
use crate::data_type::{ByteArray, FixedLenByteArray};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex,
RowGroupMetaData};
@@ -318,7 +319,7 @@ make_decimal_stats_iterator!(
/// data_type: The data type of the statistics (e.g. `DataType::Int32`)
/// iterator: The iterator of [`ParquetStatistics`] to extract the statistics
from.
macro_rules! get_statistics {
- ($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
+ ($stat_type_prefix: ident, $data_type: ident, $iterator: ident,
$physical_type: ident) => {
paste! {
match $data_type {
DataType::Boolean => Ok(Arc::new(BooleanArray::from_iter(
@@ -370,10 +371,11 @@ macro_rules! get_statistics {
DataType::Date32 => Ok(Arc::new(Date32Array::from_iter(
[<$stat_type_prefix
Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
))),
- DataType::Date64 => Ok(Arc::new(Date64Array::from_iter(
+ DataType::Date64 if $physical_type == Some(PhysicalType::INT32) =>
Ok(Arc::new(Date64Array::from_iter(
[<$stat_type_prefix Int32StatsIterator>]::new($iterator)
- .map(|x| x.map(|x| i64::from(*x) * 24 * 60 * 60 * 1000)),
- ))),
+ .map(|x| x.map(|x| i64::from(*x) * 24 * 60 * 60 *
1000))))),
+ DataType::Date64 if $physical_type == Some(PhysicalType::INT64) =>
Ok(Arc::new(Date64Array::from_iter(
+ [<$stat_type_prefix
Int64StatsIterator>]::new($iterator).map(|x| x.copied()),))),
DataType::Timestamp(unit, timezone) =>{
let iter = [<$stat_type_prefix
Int64StatsIterator>]::new($iterator).map(|x| x.copied());
Ok(match unit {
@@ -487,7 +489,7 @@ macro_rules! get_statistics {
Ok(Arc::new(arr))
},
DataType::Dictionary(_, value_type) => {
- [<$stat_type_prefix:lower _ statistics>](value_type, $iterator)
+ [<$stat_type_prefix:lower _ statistics>](value_type,
$iterator, $physical_type)
},
DataType::Utf8View => {
let iterator = [<$stat_type_prefix
ByteArrayStatsIterator>]::new($iterator);
@@ -524,6 +526,7 @@ macro_rules! get_statistics {
DataType::Map(_,_) |
DataType::Duration(_) |
DataType::Interval(_) |
+ DataType::Date64 | // required to cover $physical_type match guard
DataType::Null |
DataType::List(_) |
DataType::ListView(_) |
@@ -790,7 +793,7 @@ get_decimal_page_stats_iterator!(
);
macro_rules! get_data_page_statistics {
- ($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
+ ($stat_type_prefix: ident, $data_type: ident, $iterator: ident,
$physical_type: ident) => {
paste! {
match $data_type {
DataType::Boolean => {
@@ -929,7 +932,7 @@ macro_rules! get_data_page_statistics {
Ok(Arc::new(builder.finish()))
},
DataType::Dictionary(_, value_type) => {
- [<$stat_type_prefix:lower _ page_statistics>](value_type,
$iterator)
+ [<$stat_type_prefix:lower _ page_statistics>](value_type,
$iterator, $physical_type)
},
DataType::Timestamp(unit, timezone) => {
let iter = [<$stat_type_prefix
Int64DataPageStatsIterator>]::new($iterator).flatten();
@@ -941,7 +944,7 @@ macro_rules! get_data_page_statistics {
})
},
DataType::Date32 =>
Ok(Arc::new(Date32Array::from_iter([<$stat_type_prefix
Int32DataPageStatsIterator>]::new($iterator).flatten()))),
- DataType::Date64 => Ok(
+ DataType::Date64 if $physical_type ==
Some(PhysicalType::INT32)=> Ok(
Arc::new(
Date64Array::from_iter([<$stat_type_prefix
Int32DataPageStatsIterator>]::new($iterator)
.map(|x| {
@@ -954,6 +957,7 @@ macro_rules! get_data_page_statistics {
)
)
),
+ DataType::Date64 if $physical_type ==
Some(PhysicalType::INT64) =>
Ok(Arc::new(Date64Array::from_iter([<$stat_type_prefix
Int64DataPageStatsIterator>]::new($iterator).flatten()))),
DataType::Decimal128(precision, scale) => Ok(Arc::new(
Decimal128Array::from_iter([<$stat_type_prefix
Decimal128DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision,
*scale)?)),
DataType::Decimal256(precision, scale) => Ok(Arc::new(
@@ -1040,6 +1044,7 @@ macro_rules! get_data_page_statistics {
}
Ok(Arc::new(builder.finish()))
},
+ DataType::Date64 | // required to cover $physical_type match
guard
DataType::Null |
DataType::Duration(_) |
DataType::Interval(_) |
@@ -1067,8 +1072,9 @@ macro_rules! get_data_page_statistics {
fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
data_type: &DataType,
iterator: I,
+ physical_type: Option<PhysicalType>,
) -> Result<ArrayRef> {
- get_statistics!(Min, data_type, iterator)
+ get_statistics!(Min, data_type, iterator, physical_type)
}
/// Extracts the max statistics from an iterator of [`ParquetStatistics`] to
an [`ArrayRef`]
@@ -1077,26 +1083,35 @@ fn min_statistics<'a, I: Iterator<Item = Option<&'a
ParquetStatistics>>>(
fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
data_type: &DataType,
iterator: I,
+ physical_type: Option<PhysicalType>,
) -> Result<ArrayRef> {
- get_statistics!(Max, data_type, iterator)
+ get_statistics!(Max, data_type, iterator, physical_type)
}
/// Extracts the min statistics from an iterator
/// of parquet page [`Index`]'es to an [`ArrayRef`]
-pub(crate) fn min_page_statistics<'a, I>(data_type: &DataType, iterator: I) ->
Result<ArrayRef>
+pub(crate) fn min_page_statistics<'a, I>(
+ data_type: &DataType,
+ iterator: I,
+ physical_type: Option<PhysicalType>,
+) -> Result<ArrayRef>
where
I: Iterator<Item = (usize, &'a Index)>,
{
- get_data_page_statistics!(Min, data_type, iterator)
+ get_data_page_statistics!(Min, data_type, iterator, physical_type)
}
/// Extracts the max statistics from an iterator
/// of parquet page [`Index`]'es to an [`ArrayRef`]
-pub(crate) fn max_page_statistics<'a, I>(data_type: &DataType, iterator: I) ->
Result<ArrayRef>
+pub(crate) fn max_page_statistics<'a, I>(
+ data_type: &DataType,
+ iterator: I,
+ physical_type: Option<PhysicalType>,
+) -> Result<ArrayRef>
where
I: Iterator<Item = (usize, &'a Index)>,
{
- get_data_page_statistics!(Max, data_type, iterator)
+ get_data_page_statistics!(Max, data_type, iterator, physical_type)
}
/// Extracts the null count statistics from an iterator
@@ -1177,6 +1192,8 @@ pub struct StatisticsConverter<'a> {
arrow_field: &'a Field,
/// treat missing null_counts as 0 nulls
missing_null_counts_as_zero: bool,
+ /// The physical type of the matched column in the Parquet schema
+ physical_type: Option<PhysicalType>,
}
impl<'a> StatisticsConverter<'a> {
@@ -1304,6 +1321,7 @@ impl<'a> StatisticsConverter<'a> {
parquet_column_index: parquet_index,
arrow_field,
missing_null_counts_as_zero: true,
+ physical_type: parquet_index.map(|idx|
parquet_schema.column(idx).physical_type()),
})
}
@@ -1346,7 +1364,7 @@ impl<'a> StatisticsConverter<'a> {
/// // get the minimum value for the column "foo" in the parquet file
/// let min_values: ArrayRef = converter
/// .row_group_mins(metadata.row_groups().iter())
- /// .unwrap();
+ /// .unwrap();
/// // if "foo" is a Float64 value, the returned array will contain
Float64 values
/// assert_eq!(min_values, Arc::new(Float64Array::from(vec![Some(1.0),
Some(2.0)])) as _);
/// ```
@@ -1363,7 +1381,7 @@ impl<'a> StatisticsConverter<'a> {
let iter = metadatas
.into_iter()
.map(|x| x.column(parquet_index).statistics());
- min_statistics(data_type, iter)
+ min_statistics(data_type, iter, self.physical_type)
}
/// Extract the maximum values from row group statistics in
[`RowGroupMetaData`]
@@ -1382,7 +1400,7 @@ impl<'a> StatisticsConverter<'a> {
let iter = metadatas
.into_iter()
.map(|x| x.column(parquet_index).statistics());
- max_statistics(data_type, iter)
+ max_statistics(data_type, iter, self.physical_type)
}
/// Extract the null counts from row group statistics in
[`RowGroupMetaData`]
@@ -1490,7 +1508,7 @@ impl<'a> StatisticsConverter<'a> {
(*num_data_pages, column_page_index_per_row_group_per_column)
});
- min_page_statistics(data_type, iter)
+ min_page_statistics(data_type, iter, self.physical_type)
}
/// Extract the maximum values from Data Page statistics.
@@ -1521,7 +1539,7 @@ impl<'a> StatisticsConverter<'a> {
(*num_data_pages, column_page_index_per_row_group_per_column)
});
- max_page_statistics(data_type, iter)
+ max_page_statistics(data_type, iter, self.physical_type)
}
/// Returns a [`UInt64Array`] with null counts for each data page.
diff --git a/parquet/src/arrow/arrow_writer/mod.rs
b/parquet/src/arrow/arrow_writer/mod.rs
index c9f911448..222d86131 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -180,11 +180,11 @@ impl<W: Write + Send> ArrowWriter<W> {
arrow_schema: SchemaRef,
options: ArrowWriterOptions,
) -> Result<Self> {
+ let mut props = options.properties;
let schema = match options.schema_root {
- Some(s) => arrow_to_parquet_schema_with_root(&arrow_schema, &s)?,
- None => arrow_to_parquet_schema(&arrow_schema)?,
+ Some(s) => arrow_to_parquet_schema_with_root(&arrow_schema, &s,
props.coerce_types())?,
+ None => arrow_to_parquet_schema(&arrow_schema,
props.coerce_types())?,
};
- let mut props = options.properties;
if !options.skip_arrow_metadata {
// add serialized arrow schema
add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
@@ -549,8 +549,8 @@ impl ArrowColumnChunk {
/// ]));
///
/// // Compute the parquet schema
-/// let parquet_schema = arrow_to_parquet_schema(schema.as_ref()).unwrap();
/// let props = Arc::new(WriterProperties::default());
+/// let parquet_schema = arrow_to_parquet_schema(schema.as_ref(),
props.coerce_types()).unwrap();
///
/// // Create writers for each of the leaf columns
/// let col_writers = get_column_writers(&parquet_schema, &props,
&schema).unwrap();
@@ -858,6 +858,12 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels:
&ArrayLevels) -> Result<usi
}
ColumnWriter::Int64ColumnWriter(ref mut typed) => {
match column.data_type() {
+ ArrowDataType::Date64 => {
+ let array = arrow_cast::cast(column,
&ArrowDataType::Int64)?;
+
+ let array = array.as_primitive::<Int64Type>();
+ write_primitive(typed, array.values(), levels)
+ }
ArrowDataType::Int64 => {
let array = column.as_primitive::<Int64Type>();
write_primitive(typed, array.values(), levels)
diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index bf1fb6332..ec34840d8 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -229,16 +229,20 @@ pub(crate) fn
add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut
///
/// The name of the root schema element defaults to `"arrow_schema"`, this can
be
/// overridden with [`arrow_to_parquet_schema_with_root`]
-pub fn arrow_to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {
- arrow_to_parquet_schema_with_root(schema, "arrow_schema")
+pub fn arrow_to_parquet_schema(schema: &Schema, coerce_types: bool) ->
Result<SchemaDescriptor> {
+ arrow_to_parquet_schema_with_root(schema, "arrow_schema", coerce_types)
}
/// Convert arrow schema to parquet schema specifying the name of the root
schema element
-pub fn arrow_to_parquet_schema_with_root(schema: &Schema, root: &str) ->
Result<SchemaDescriptor> {
+pub fn arrow_to_parquet_schema_with_root(
+ schema: &Schema,
+ root: &str,
+ coerce_types: bool,
+) -> Result<SchemaDescriptor> {
let fields = schema
.fields()
.iter()
- .map(|field| arrow_to_parquet_type(field).map(Arc::new))
+ .map(|field| arrow_to_parquet_type(field, coerce_types).map(Arc::new))
.collect::<Result<_>>()?;
let group = Type::group_type_builder(root).with_fields(fields).build()?;
Ok(SchemaDescriptor::new(Arc::new(group)))
@@ -298,7 +302,7 @@ pub fn decimal_length_from_precision(precision: u8) ->
usize {
}
/// Convert an arrow field to a parquet `Type`
-fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
+fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
let name = field.name().as_str();
let repetition = if field.is_nullable() {
Repetition::OPTIONAL
@@ -415,12 +419,20 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
.with_repetition(repetition)
.with_id(id)
.build(),
- // date64 is cast to date32 (#1666)
- DataType::Date64 => Type::primitive_type_builder(name,
PhysicalType::INT32)
- .with_logical_type(Some(LogicalType::Date))
- .with_repetition(repetition)
- .with_id(id)
- .build(),
+ DataType::Date64 => {
+ if coerce_types {
+ Type::primitive_type_builder(name, PhysicalType::INT32)
+ .with_logical_type(Some(LogicalType::Date))
+ .with_repetition(repetition)
+ .with_id(id)
+ .build()
+ } else {
+ Type::primitive_type_builder(name, PhysicalType::INT64)
+ .with_repetition(repetition)
+ .with_id(id)
+ .build()
+ }
+ }
DataType::Time32(TimeUnit::Second) => {
// Cannot represent seconds in LogicalType
Type::primitive_type_builder(name, PhysicalType::INT32)
@@ -518,7 +530,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
Type::group_type_builder(name)
.with_fields(vec![Arc::new(
Type::group_type_builder("list")
- .with_fields(vec![Arc::new(arrow_to_parquet_type(f)?)])
+ .with_fields(vec![Arc::new(arrow_to_parquet_type(f,
coerce_types)?)])
.with_repetition(Repetition::REPEATED)
.build()?,
)])
@@ -537,7 +549,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
// recursively convert children to types/nodes
let fields = fields
.iter()
- .map(|f| arrow_to_parquet_type(f).map(Arc::new))
+ .map(|f| arrow_to_parquet_type(f, coerce_types).map(Arc::new))
.collect::<Result<_>>()?;
Type::group_type_builder(name)
.with_fields(fields)
@@ -551,8 +563,8 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
.with_fields(vec![Arc::new(
Type::group_type_builder(field.name())
.with_fields(vec![
-
Arc::new(arrow_to_parquet_type(&struct_fields[0])?),
-
Arc::new(arrow_to_parquet_type(&struct_fields[1])?),
+
Arc::new(arrow_to_parquet_type(&struct_fields[0], coerce_types)?),
+
Arc::new(arrow_to_parquet_type(&struct_fields[1], coerce_types)?),
])
.with_repetition(Repetition::REPEATED)
.build()?,
@@ -571,7 +583,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
DataType::Dictionary(_, ref value) => {
// Dictionary encoding not handled at the schema level
let dict_field =
field.clone().with_data_type(value.as_ref().clone());
- arrow_to_parquet_type(&dict_field)
+ arrow_to_parquet_type(&dict_field, coerce_types)
}
DataType::RunEndEncoded(_, _) => Err(arrow_err!(
"Converting RunEndEncodedType to parquet not supported",
@@ -1557,7 +1569,7 @@ mod tests {
Field::new("decimal256", DataType::Decimal256(39, 2), false),
];
let arrow_schema = Schema::new(arrow_fields);
- let converted_arrow_schema =
arrow_to_parquet_schema(&arrow_schema).unwrap();
+ let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema,
false).unwrap();
assert_eq!(
parquet_schema.columns().len(),
@@ -1594,7 +1606,7 @@ mod tests {
false,
)];
let arrow_schema = Schema::new(arrow_fields);
- let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema);
+ let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema,
true);
assert!(converted_arrow_schema.is_err());
converted_arrow_schema.unwrap();
@@ -1866,7 +1878,7 @@ mod tests {
// don't pass metadata so field ids are read from Parquet and not from
serialized Arrow schema
let arrow_schema =
crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;
- let parq_schema_descr =
crate::arrow::arrow_to_parquet_schema(&arrow_schema)?;
+ let parq_schema_descr =
crate::arrow::arrow_to_parquet_schema(&arrow_schema, true)?;
let parq_fields = parq_schema_descr.root_schema().get_fields();
assert_eq!(parq_fields.len(), 2);
assert_eq!(parq_fields[0].get_basic_info().id(), 1);
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 49980f525..cb07c1f49 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -57,6 +57,8 @@ pub const DEFAULT_BLOOM_FILTER_FPP: f64 = 0.05;
pub const DEFAULT_BLOOM_FILTER_NDV: u64 = 1_000_000_u64;
/// Default values for [`WriterProperties::statistics_truncate_length`]
pub const DEFAULT_STATISTICS_TRUNCATE_LENGTH: Option<usize> = None;
+/// Default values for [`WriterProperties::coerce_types`]
+pub const DEFAULT_COERCE_TYPES: bool = false;
/// Parquet writer version.
///
@@ -163,6 +165,7 @@ pub struct WriterProperties {
sorting_columns: Option<Vec<SortingColumn>>,
column_index_truncate_length: Option<usize>,
statistics_truncate_length: Option<usize>,
+ coerce_types: bool,
}
impl Default for WriterProperties {
@@ -265,6 +268,19 @@ impl WriterProperties {
self.statistics_truncate_length
}
+ /// Returns `coerce_types` boolean
+ ///
+ /// Some Arrow types do not have a corresponding Parquet logical type.
+ /// Affected Arrow data types include `Date64`, `Timestamp` and `Interval`.
+ /// Writers have the option to coerce these into native Parquet types. Type
+ /// coercion allows for meaningful representations that do not require
+ /// downstream readers to consider the embedded Arrow schema. However, type
+ /// coercion also prevents the data from being losslessly round-tripped.
This method
+ /// returns `true` if type coercion is enabled.
+ pub fn coerce_types(&self) -> bool {
+ self.coerce_types
+ }
+
/// Returns encoding for a data page, when dictionary encoding is enabled.
/// This is not configurable.
#[inline]
@@ -361,6 +377,7 @@ pub struct WriterPropertiesBuilder {
sorting_columns: Option<Vec<SortingColumn>>,
column_index_truncate_length: Option<usize>,
statistics_truncate_length: Option<usize>,
+ coerce_types: bool,
}
impl WriterPropertiesBuilder {
@@ -381,6 +398,7 @@ impl WriterPropertiesBuilder {
sorting_columns: None,
column_index_truncate_length: DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
statistics_truncate_length: DEFAULT_STATISTICS_TRUNCATE_LENGTH,
+ coerce_types: DEFAULT_COERCE_TYPES,
}
}
@@ -401,6 +419,7 @@ impl WriterPropertiesBuilder {
sorting_columns: self.sorting_columns,
column_index_truncate_length: self.column_index_truncate_length,
statistics_truncate_length: self.statistics_truncate_length,
+ coerce_types: self.coerce_types,
}
}
@@ -731,6 +750,13 @@ impl WriterPropertiesBuilder {
self.statistics_truncate_length = max_length;
self
}
+
+ /// Sets flag to enable/disable type coercion.
+ /// Takes precedence over globally defined settings.
+ pub fn set_coerce_types(mut self, coerce_types: bool) -> Self {
+ self.coerce_types = coerce_types;
+ self
+ }
}
/// Controls the level of statistics to be computed by the writer and stored in