This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new e1e881405 feat(parquet)!: coerce_types flag for date64 (#6313)
e1e881405 is described below

commit e1e881405ce9b180858d5ba278803b4795b63e74
Author: dsgibbons <[email protected]>
AuthorDate: Tue Nov 26 20:59:13 2024 +1030

    feat(parquet)!: coerce_types flag for date64 (#6313)
    
    * feat(parquet): coerce_types flag for date64
    
    * fix: use ARROW:schema instead of LogicalType to embed Date64 type
    
    * chore: lint
    
    * chore: lint
    
    * chore: lint
    
    * chore: add physical_type to StatisticsConverter to account for 
coerce_types
    
    * chore: blank line changes
    
    * chore: revert minor test changes
    
    * chore: update to latest parquet-testing
    
    * chore: add physical_type fix for get_data_page_statistics macro
    
    * docs: add docs for coerce_types
    
    * chore: cargo fmt --all
    
    * docs: coerce_types lossless round trip
    
    Co-authored-by: Ed Seidl <[email protected]>
    
    ---------
    
    Co-authored-by: Ed Seidl <[email protected]>
---
 parquet/src/arrow/array_reader/primitive_array.rs |  60 ++++++++++-
 parquet/src/arrow/arrow_reader/mod.rs             | 115 +++++++++++++++++++++-
 parquet/src/arrow/arrow_reader/statistics.rs      |  56 +++++++----
 parquet/src/arrow/arrow_writer/mod.rs             |  14 ++-
 parquet/src/arrow/schema/mod.rs                   |  50 ++++++----
 parquet/src/file/properties.rs                    |  26 +++++
 6 files changed, 272 insertions(+), 49 deletions(-)

diff --git a/parquet/src/arrow/array_reader/primitive_array.rs 
b/parquet/src/arrow/array_reader/primitive_array.rs
index 010e9c2ee..a952e00e1 100644
--- a/parquet/src/arrow/array_reader/primitive_array.rs
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -208,10 +208,10 @@ where
         // As there is not always a 1:1 mapping between Arrow and Parquet, 
there
         // are datatypes which we must convert explicitly.
         // These are:
-        // - date64: we should cast int32 to date32, then date32 to date64.
-        // - decimal: cast in32 to decimal, int64 to decimal
+        // - date64: cast int32 to date32, then date32 to date64.
+        // - decimal: cast int32 to decimal, int64 to decimal
         let array = match target_type {
-            ArrowType::Date64 => {
+            ArrowType::Date64 if *(array.data_type()) == ArrowType::Int32 => {
                 // this is cheap as it internally reinterprets the data
                 let a = arrow_cast::cast(&array, &ArrowType::Date32)?;
                 arrow_cast::cast(&a, target_type)?
@@ -305,9 +305,9 @@ mod tests {
     use crate::util::test_common::rand_gen::make_pages;
     use crate::util::InMemoryPageIterator;
     use arrow::datatypes::ArrowPrimitiveType;
-    use arrow_array::{Array, PrimitiveArray};
+    use arrow_array::{Array, Date32Array, PrimitiveArray};
 
-    use arrow::datatypes::DataType::Decimal128;
+    use arrow::datatypes::DataType::{Date32, Decimal128};
     use rand::distributions::uniform::SampleUniform;
     use std::collections::VecDeque;
 
@@ -783,4 +783,54 @@ mod tests {
             assert_ne!(array, &data_decimal_array)
         }
     }
+
+    #[test]
+    fn test_primitive_array_reader_date32_type() {
+        // parquet `INT32` to date
+        let message_type = "
+            message test_schema {
+                REQUIRED INT32 date1 (DATE);
+        }
+        ";
+        let schema = parse_message_type(message_type)
+            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+            .unwrap();
+        let column_desc = schema.column(0);
+
+        // create the array reader
+        {
+            let mut data = Vec::new();
+            let mut page_lists = Vec::new();
+            make_column_chunks::<Int32Type>(
+                column_desc.clone(),
+                Encoding::PLAIN,
+                100,
+                -99999999,
+                99999999,
+                &mut Vec::new(),
+                &mut Vec::new(),
+                &mut data,
+                &mut page_lists,
+                true,
+                2,
+            );
+            let page_iterator = InMemoryPageIterator::new(page_lists);
+
+            let mut array_reader =
+                
PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, 
None)
+                    .unwrap();
+
+            // read data from the reader
+            // the data type is date
+            let array = array_reader.next_batch(50).unwrap();
+            assert_eq!(array.data_type(), &Date32);
+            let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
+            let data_date_array = data[0..50]
+                .iter()
+                .copied()
+                .map(Some)
+                .collect::<Date32Array>();
+            assert_eq!(array, &data_date_array);
+        }
+    }
 }
diff --git a/parquet/src/arrow/arrow_reader/mod.rs 
b/parquet/src/arrow/arrow_reader/mod.rs
index f351c25bd..a3d011346 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -932,8 +932,8 @@ mod tests {
     use arrow_array::builder::*;
     use arrow_array::cast::AsArray;
     use arrow_array::types::{
-        Decimal128Type, Decimal256Type, DecimalType, Float16Type, Float32Type, 
Float64Type,
-        Time32MillisecondType, Time64MicrosecondType,
+        Date32Type, Date64Type, Decimal128Type, Decimal256Type, DecimalType, 
Float16Type,
+        Float32Type, Float64Type, Time32MillisecondType, Time64MicrosecondType,
     };
     use arrow_array::*;
     use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
@@ -1272,6 +1272,117 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_date32_roundtrip() -> Result<()> {
+        use arrow_array::Date32Array;
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "date32",
+            ArrowDataType::Date32,
+            false,
+        )]));
+
+        let mut buf = Vec::with_capacity(1024);
+
+        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
+
+        let original = RecordBatch::try_new(
+            schema,
+            vec![Arc::new(Date32Array::from(vec![
+                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 
100_000, 1_000_000,
+            ]))],
+        )?;
+
+        writer.write(&original)?;
+        writer.close()?;
+
+        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 
1024)?;
+        let ret = reader.next().unwrap()?;
+        assert_eq!(ret, original);
+
+        // Ensure can be downcast to the correct type
+        ret.column(0).as_primitive::<Date32Type>();
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_date64_roundtrip() -> Result<()> {
+        use arrow_array::Date64Array;
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("small-date64", ArrowDataType::Date64, false),
+            Field::new("big-date64", ArrowDataType::Date64, false),
+            Field::new("invalid-date64", ArrowDataType::Date64, false),
+        ]));
+
+        let mut default_buf = Vec::with_capacity(1024);
+        let mut coerce_buf = Vec::with_capacity(1024);
+
+        let coerce_props = 
WriterProperties::builder().set_coerce_types(true).build();
+
+        let mut default_writer = ArrowWriter::try_new(&mut default_buf, 
schema.clone(), None)?;
+        let mut coerce_writer =
+            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), 
Some(coerce_props))?;
+
+        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;
+
+        let original = RecordBatch::try_new(
+            schema,
+            vec![
+                // small-date64
+                Arc::new(Date64Array::from(vec![
+                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
+                    -1_000 * NUM_MILLISECONDS_IN_DAY,
+                    0,
+                    1_000 * NUM_MILLISECONDS_IN_DAY,
+                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
+                ])),
+                // big-date64
+                Arc::new(Date64Array::from(vec![
+                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
+                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
+                    0,
+                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
+                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
+                ])),
+                // invalid-date64
+                Arc::new(Date64Array::from(vec![
+                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
+                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
+                    1,
+                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
+                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
+                ])),
+            ],
+        )?;
+
+        default_writer.write(&original)?;
+        coerce_writer.write(&original)?;
+
+        default_writer.close()?;
+        coerce_writer.close()?;
+
+        let mut default_reader = 
ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
+        let mut coerce_reader = 
ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
+
+        let default_ret = default_reader.next().unwrap()?;
+        let coerce_ret = coerce_reader.next().unwrap()?;
+
+        // Roundtrip should be successful when default writer used
+        assert_eq!(default_ret, original);
+
+        // Only small-date64 should roundtrip successfully when coerce_types 
writer is used
+        assert_eq!(coerce_ret.column(0), original.column(0));
+        assert_ne!(coerce_ret.column(1), original.column(1));
+        assert_ne!(coerce_ret.column(2), original.column(2));
+
+        // Ensure both can be downcast to the correct type
+        default_ret.column(0).as_primitive::<Date64Type>();
+        coerce_ret.column(0).as_primitive::<Date64Type>();
+
+        Ok(())
+    }
     struct RandFixedLenGen {}
 
     impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
diff --git a/parquet/src/arrow/arrow_reader/statistics.rs 
b/parquet/src/arrow/arrow_reader/statistics.rs
index 8a7511be2..09f8ec7cc 100644
--- a/parquet/src/arrow/arrow_reader/statistics.rs
+++ b/parquet/src/arrow/arrow_reader/statistics.rs
@@ -21,6 +21,7 @@
 /// `arrow-rs/parquet/tests/arrow_reader/statistics.rs`.
 use crate::arrow::buffer::bit_util::sign_extend_be;
 use crate::arrow::parquet_column;
+use crate::basic::Type as PhysicalType;
 use crate::data_type::{ByteArray, FixedLenByteArray};
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, 
RowGroupMetaData};
@@ -318,7 +319,7 @@ make_decimal_stats_iterator!(
 /// data_type: The data type of the statistics (e.g. `DataType::Int32`)
 /// iterator: The iterator of [`ParquetStatistics`] to extract the statistics 
from.
 macro_rules! get_statistics {
-    ($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
+    ($stat_type_prefix: ident, $data_type: ident, $iterator: ident, 
$physical_type: ident) => {
         paste! {
         match $data_type {
             DataType::Boolean => Ok(Arc::new(BooleanArray::from_iter(
@@ -370,10 +371,11 @@ macro_rules! get_statistics {
             DataType::Date32 => Ok(Arc::new(Date32Array::from_iter(
                 [<$stat_type_prefix 
Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
             ))),
-            DataType::Date64 => Ok(Arc::new(Date64Array::from_iter(
+            DataType::Date64 if $physical_type == Some(PhysicalType::INT32) => 
Ok(Arc::new(Date64Array::from_iter(
                 [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
-                    .map(|x| x.map(|x| i64::from(*x) * 24 * 60 * 60 * 1000)),
-            ))),
+                    .map(|x| x.map(|x| i64::from(*x) * 24 * 60 * 60 * 
1000))))),
+            DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => 
Ok(Arc::new(Date64Array::from_iter(
+                [<$stat_type_prefix 
Int64StatsIterator>]::new($iterator).map(|x| x.copied()),))),
             DataType::Timestamp(unit, timezone) =>{
                 let iter = [<$stat_type_prefix 
Int64StatsIterator>]::new($iterator).map(|x| x.copied());
                 Ok(match unit {
@@ -487,7 +489,7 @@ macro_rules! get_statistics {
                 Ok(Arc::new(arr))
             },
             DataType::Dictionary(_, value_type) => {
-                [<$stat_type_prefix:lower _ statistics>](value_type, $iterator)
+                [<$stat_type_prefix:lower _ statistics>](value_type, 
$iterator, $physical_type)
             },
             DataType::Utf8View => {
                 let iterator = [<$stat_type_prefix 
ByteArrayStatsIterator>]::new($iterator);
@@ -524,6 +526,7 @@ macro_rules! get_statistics {
             DataType::Map(_,_) |
             DataType::Duration(_) |
             DataType::Interval(_) |
+            DataType::Date64 |  // required to cover $physical_type match guard
             DataType::Null |
             DataType::List(_) |
             DataType::ListView(_) |
@@ -790,7 +793,7 @@ get_decimal_page_stats_iterator!(
 );
 
 macro_rules! get_data_page_statistics {
-    ($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
+    ($stat_type_prefix: ident, $data_type: ident, $iterator: ident, 
$physical_type: ident) => {
         paste! {
             match $data_type {
                 DataType::Boolean => {
@@ -929,7 +932,7 @@ macro_rules! get_data_page_statistics {
                     Ok(Arc::new(builder.finish()))
                 },
                 DataType::Dictionary(_, value_type) => {
-                    [<$stat_type_prefix:lower _ page_statistics>](value_type, 
$iterator)
+                    [<$stat_type_prefix:lower _ page_statistics>](value_type, 
$iterator, $physical_type)
                 },
                 DataType::Timestamp(unit, timezone) => {
                     let iter = [<$stat_type_prefix 
Int64DataPageStatsIterator>]::new($iterator).flatten();
@@ -941,7 +944,7 @@ macro_rules! get_data_page_statistics {
                     })
                 },
                 DataType::Date32 => 
Ok(Arc::new(Date32Array::from_iter([<$stat_type_prefix 
Int32DataPageStatsIterator>]::new($iterator).flatten()))),
-                DataType::Date64 => Ok(
+                DataType::Date64 if $physical_type == 
Some(PhysicalType::INT32)=> Ok(
                     Arc::new(
                         Date64Array::from_iter([<$stat_type_prefix 
Int32DataPageStatsIterator>]::new($iterator)
                             .map(|x| {
@@ -954,6 +957,7 @@ macro_rules! get_data_page_statistics {
                         )
                     )
                 ),
+                DataType::Date64 if $physical_type == 
Some(PhysicalType::INT64) => 
Ok(Arc::new(Date64Array::from_iter([<$stat_type_prefix 
Int64DataPageStatsIterator>]::new($iterator).flatten()))),
                 DataType::Decimal128(precision, scale) => Ok(Arc::new(
                     Decimal128Array::from_iter([<$stat_type_prefix 
Decimal128DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision,
 *scale)?)),
                 DataType::Decimal256(precision, scale) => Ok(Arc::new(
@@ -1040,6 +1044,7 @@ macro_rules! get_data_page_statistics {
                     }
                     Ok(Arc::new(builder.finish()))
                 },
+                DataType::Date64 |  // required to cover $physical_type match 
guard
                 DataType::Null |
                 DataType::Duration(_) |
                 DataType::Interval(_) |
@@ -1067,8 +1072,9 @@ macro_rules! get_data_page_statistics {
 fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
     data_type: &DataType,
     iterator: I,
+    physical_type: Option<PhysicalType>,
 ) -> Result<ArrayRef> {
-    get_statistics!(Min, data_type, iterator)
+    get_statistics!(Min, data_type, iterator, physical_type)
 }
 
 /// Extracts the max statistics from an iterator of [`ParquetStatistics`] to 
an [`ArrayRef`]
@@ -1077,26 +1083,35 @@ fn min_statistics<'a, I: Iterator<Item = Option<&'a 
ParquetStatistics>>>(
 fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
     data_type: &DataType,
     iterator: I,
+    physical_type: Option<PhysicalType>,
 ) -> Result<ArrayRef> {
-    get_statistics!(Max, data_type, iterator)
+    get_statistics!(Max, data_type, iterator, physical_type)
 }
 
 /// Extracts the min statistics from an iterator
 /// of parquet page [`Index`]'es to an [`ArrayRef`]
-pub(crate) fn min_page_statistics<'a, I>(data_type: &DataType, iterator: I) -> 
Result<ArrayRef>
+pub(crate) fn min_page_statistics<'a, I>(
+    data_type: &DataType,
+    iterator: I,
+    physical_type: Option<PhysicalType>,
+) -> Result<ArrayRef>
 where
     I: Iterator<Item = (usize, &'a Index)>,
 {
-    get_data_page_statistics!(Min, data_type, iterator)
+    get_data_page_statistics!(Min, data_type, iterator, physical_type)
 }
 
 /// Extracts the max statistics from an iterator
 /// of parquet page [`Index`]'es to an [`ArrayRef`]
-pub(crate) fn max_page_statistics<'a, I>(data_type: &DataType, iterator: I) -> 
Result<ArrayRef>
+pub(crate) fn max_page_statistics<'a, I>(
+    data_type: &DataType,
+    iterator: I,
+    physical_type: Option<PhysicalType>,
+) -> Result<ArrayRef>
 where
     I: Iterator<Item = (usize, &'a Index)>,
 {
-    get_data_page_statistics!(Max, data_type, iterator)
+    get_data_page_statistics!(Max, data_type, iterator, physical_type)
 }
 
 /// Extracts the null count statistics from an iterator
@@ -1177,6 +1192,8 @@ pub struct StatisticsConverter<'a> {
     arrow_field: &'a Field,
     /// treat missing null_counts as 0 nulls
     missing_null_counts_as_zero: bool,
+    /// The physical type of the matched column in the Parquet schema
+    physical_type: Option<PhysicalType>,
 }
 
 impl<'a> StatisticsConverter<'a> {
@@ -1304,6 +1321,7 @@ impl<'a> StatisticsConverter<'a> {
             parquet_column_index: parquet_index,
             arrow_field,
             missing_null_counts_as_zero: true,
+            physical_type: parquet_index.map(|idx| 
parquet_schema.column(idx).physical_type()),
         })
     }
 
@@ -1346,7 +1364,7 @@ impl<'a> StatisticsConverter<'a> {
     /// // get the minimum value for the column "foo" in the parquet file
     /// let min_values: ArrayRef = converter
     ///   .row_group_mins(metadata.row_groups().iter())
-    ///  .unwrap();
+    ///   .unwrap();
     /// // if "foo" is a Float64 value, the returned array will contain 
Float64 values
     /// assert_eq!(min_values, Arc::new(Float64Array::from(vec![Some(1.0), 
Some(2.0)])) as _);
     /// ```
@@ -1363,7 +1381,7 @@ impl<'a> StatisticsConverter<'a> {
         let iter = metadatas
             .into_iter()
             .map(|x| x.column(parquet_index).statistics());
-        min_statistics(data_type, iter)
+        min_statistics(data_type, iter, self.physical_type)
     }
 
     /// Extract the maximum values from row group statistics in 
[`RowGroupMetaData`]
@@ -1382,7 +1400,7 @@ impl<'a> StatisticsConverter<'a> {
         let iter = metadatas
             .into_iter()
             .map(|x| x.column(parquet_index).statistics());
-        max_statistics(data_type, iter)
+        max_statistics(data_type, iter, self.physical_type)
     }
 
     /// Extract the null counts from row group statistics in 
[`RowGroupMetaData`]
@@ -1490,7 +1508,7 @@ impl<'a> StatisticsConverter<'a> {
             (*num_data_pages, column_page_index_per_row_group_per_column)
         });
 
-        min_page_statistics(data_type, iter)
+        min_page_statistics(data_type, iter, self.physical_type)
     }
 
     /// Extract the maximum values from Data Page statistics.
@@ -1521,7 +1539,7 @@ impl<'a> StatisticsConverter<'a> {
             (*num_data_pages, column_page_index_per_row_group_per_column)
         });
 
-        max_page_statistics(data_type, iter)
+        max_page_statistics(data_type, iter, self.physical_type)
     }
 
     /// Returns a [`UInt64Array`] with null counts for each data page.
diff --git a/parquet/src/arrow/arrow_writer/mod.rs 
b/parquet/src/arrow/arrow_writer/mod.rs
index c9f911448..222d86131 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -180,11 +180,11 @@ impl<W: Write + Send> ArrowWriter<W> {
         arrow_schema: SchemaRef,
         options: ArrowWriterOptions,
     ) -> Result<Self> {
+        let mut props = options.properties;
         let schema = match options.schema_root {
-            Some(s) => arrow_to_parquet_schema_with_root(&arrow_schema, &s)?,
-            None => arrow_to_parquet_schema(&arrow_schema)?,
+            Some(s) => arrow_to_parquet_schema_with_root(&arrow_schema, &s, 
props.coerce_types())?,
+            None => arrow_to_parquet_schema(&arrow_schema, 
props.coerce_types())?,
         };
-        let mut props = options.properties;
         if !options.skip_arrow_metadata {
             // add serialized arrow schema
             add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
@@ -549,8 +549,8 @@ impl ArrowColumnChunk {
 /// ]));
 ///
 /// // Compute the parquet schema
-/// let parquet_schema = arrow_to_parquet_schema(schema.as_ref()).unwrap();
 /// let props = Arc::new(WriterProperties::default());
+/// let parquet_schema = arrow_to_parquet_schema(schema.as_ref(), 
props.coerce_types()).unwrap();
 ///
 /// // Create writers for each of the leaf columns
 /// let col_writers = get_column_writers(&parquet_schema, &props, 
&schema).unwrap();
@@ -858,6 +858,12 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: 
&ArrayLevels) -> Result<usi
         }
         ColumnWriter::Int64ColumnWriter(ref mut typed) => {
             match column.data_type() {
+                ArrowDataType::Date64 => {
+                    let array = arrow_cast::cast(column, 
&ArrowDataType::Int64)?;
+
+                    let array = array.as_primitive::<Int64Type>();
+                    write_primitive(typed, array.values(), levels)
+                }
                 ArrowDataType::Int64 => {
                     let array = column.as_primitive::<Int64Type>();
                     write_primitive(typed, array.values(), levels)
diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index bf1fb6332..ec34840d8 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -229,16 +229,20 @@ pub(crate) fn 
add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut
 ///
 /// The name of the root schema element defaults to `"arrow_schema"`, this can 
be
 /// overridden with [`arrow_to_parquet_schema_with_root`]
-pub fn arrow_to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {
-    arrow_to_parquet_schema_with_root(schema, "arrow_schema")
+pub fn arrow_to_parquet_schema(schema: &Schema, coerce_types: bool) -> 
Result<SchemaDescriptor> {
+    arrow_to_parquet_schema_with_root(schema, "arrow_schema", coerce_types)
 }
 
 /// Convert arrow schema to parquet schema specifying the name of the root 
schema element
-pub fn arrow_to_parquet_schema_with_root(schema: &Schema, root: &str) -> 
Result<SchemaDescriptor> {
+pub fn arrow_to_parquet_schema_with_root(
+    schema: &Schema,
+    root: &str,
+    coerce_types: bool,
+) -> Result<SchemaDescriptor> {
     let fields = schema
         .fields()
         .iter()
-        .map(|field| arrow_to_parquet_type(field).map(Arc::new))
+        .map(|field| arrow_to_parquet_type(field, coerce_types).map(Arc::new))
         .collect::<Result<_>>()?;
     let group = Type::group_type_builder(root).with_fields(fields).build()?;
     Ok(SchemaDescriptor::new(Arc::new(group)))
@@ -298,7 +302,7 @@ pub fn decimal_length_from_precision(precision: u8) -> 
usize {
 }
 
 /// Convert an arrow field to a parquet `Type`
-fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
+fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
     let name = field.name().as_str();
     let repetition = if field.is_nullable() {
         Repetition::OPTIONAL
@@ -415,12 +419,20 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
             .with_repetition(repetition)
             .with_id(id)
             .build(),
-        // date64 is cast to date32 (#1666)
-        DataType::Date64 => Type::primitive_type_builder(name, 
PhysicalType::INT32)
-            .with_logical_type(Some(LogicalType::Date))
-            .with_repetition(repetition)
-            .with_id(id)
-            .build(),
+        DataType::Date64 => {
+            if coerce_types {
+                Type::primitive_type_builder(name, PhysicalType::INT32)
+                    .with_logical_type(Some(LogicalType::Date))
+                    .with_repetition(repetition)
+                    .with_id(id)
+                    .build()
+            } else {
+                Type::primitive_type_builder(name, PhysicalType::INT64)
+                    .with_repetition(repetition)
+                    .with_id(id)
+                    .build()
+            }
+        }
         DataType::Time32(TimeUnit::Second) => {
             // Cannot represent seconds in LogicalType
             Type::primitive_type_builder(name, PhysicalType::INT32)
@@ -518,7 +530,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
             Type::group_type_builder(name)
                 .with_fields(vec![Arc::new(
                     Type::group_type_builder("list")
-                        .with_fields(vec![Arc::new(arrow_to_parquet_type(f)?)])
+                        .with_fields(vec![Arc::new(arrow_to_parquet_type(f, 
coerce_types)?)])
                         .with_repetition(Repetition::REPEATED)
                         .build()?,
                 )])
@@ -537,7 +549,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
             // recursively convert children to types/nodes
             let fields = fields
                 .iter()
-                .map(|f| arrow_to_parquet_type(f).map(Arc::new))
+                .map(|f| arrow_to_parquet_type(f, coerce_types).map(Arc::new))
                 .collect::<Result<_>>()?;
             Type::group_type_builder(name)
                 .with_fields(fields)
@@ -551,8 +563,8 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
                     .with_fields(vec![Arc::new(
                         Type::group_type_builder(field.name())
                             .with_fields(vec![
-                                
Arc::new(arrow_to_parquet_type(&struct_fields[0])?),
-                                
Arc::new(arrow_to_parquet_type(&struct_fields[1])?),
+                                
Arc::new(arrow_to_parquet_type(&struct_fields[0], coerce_types)?),
+                                
Arc::new(arrow_to_parquet_type(&struct_fields[1], coerce_types)?),
                             ])
                             .with_repetition(Repetition::REPEATED)
                             .build()?,
@@ -571,7 +583,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
         DataType::Dictionary(_, ref value) => {
             // Dictionary encoding not handled at the schema level
             let dict_field = 
field.clone().with_data_type(value.as_ref().clone());
-            arrow_to_parquet_type(&dict_field)
+            arrow_to_parquet_type(&dict_field, coerce_types)
         }
         DataType::RunEndEncoded(_, _) => Err(arrow_err!(
             "Converting RunEndEncodedType to parquet not supported",
@@ -1557,7 +1569,7 @@ mod tests {
             Field::new("decimal256", DataType::Decimal256(39, 2), false),
         ];
         let arrow_schema = Schema::new(arrow_fields);
-        let converted_arrow_schema = 
arrow_to_parquet_schema(&arrow_schema).unwrap();
+        let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, 
false).unwrap();
 
         assert_eq!(
             parquet_schema.columns().len(),
@@ -1594,7 +1606,7 @@ mod tests {
             false,
         )];
         let arrow_schema = Schema::new(arrow_fields);
-        let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema);
+        let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema, 
true);
 
         assert!(converted_arrow_schema.is_err());
         converted_arrow_schema.unwrap();
@@ -1866,7 +1878,7 @@ mod tests {
         // don't pass metadata so field ids are read from Parquet and not from 
serialized Arrow schema
         let arrow_schema = 
crate::arrow::parquet_to_arrow_schema(&schema_descriptor, None)?;
 
-        let parq_schema_descr = 
crate::arrow::arrow_to_parquet_schema(&arrow_schema)?;
+        let parq_schema_descr = 
crate::arrow::arrow_to_parquet_schema(&arrow_schema, true)?;
         let parq_fields = parq_schema_descr.root_schema().get_fields();
         assert_eq!(parq_fields.len(), 2);
         assert_eq!(parq_fields[0].get_basic_info().id(), 1);
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 49980f525..cb07c1f49 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -57,6 +57,8 @@ pub const DEFAULT_BLOOM_FILTER_FPP: f64 = 0.05;
 pub const DEFAULT_BLOOM_FILTER_NDV: u64 = 1_000_000_u64;
 /// Default values for [`WriterProperties::statistics_truncate_length`]
 pub const DEFAULT_STATISTICS_TRUNCATE_LENGTH: Option<usize> = None;
+/// Default values for [`WriterProperties::coerce_types`]
+pub const DEFAULT_COERCE_TYPES: bool = false;
 
 /// Parquet writer version.
 ///
@@ -163,6 +165,7 @@ pub struct WriterProperties {
     sorting_columns: Option<Vec<SortingColumn>>,
     column_index_truncate_length: Option<usize>,
     statistics_truncate_length: Option<usize>,
+    coerce_types: bool,
 }
 
 impl Default for WriterProperties {
@@ -265,6 +268,19 @@ impl WriterProperties {
         self.statistics_truncate_length
     }
 
+    /// Returns `coerce_types` boolean
+    ///
+    /// Some Arrow types do not have a corresponding Parquet logical type.
+    /// Affected Arrow data types include `Date64`, `Timestamp` and `Interval`.
+    /// Writers have the option to coerce these into native Parquet types. Type
+    /// coercion allows for meaningful representations that do not require
+    /// downstream readers to consider the embedded Arrow schema. However, type
+    /// coercion also prevents the data from being losslessly round-tripped. 
This method
+    /// returns `true` if type coercion enabled.
+    pub fn coerce_types(&self) -> bool {
+        self.coerce_types
+    }
+
     /// Returns encoding for a data page, when dictionary encoding is enabled.
     /// This is not configurable.
     #[inline]
@@ -361,6 +377,7 @@ pub struct WriterPropertiesBuilder {
     sorting_columns: Option<Vec<SortingColumn>>,
     column_index_truncate_length: Option<usize>,
     statistics_truncate_length: Option<usize>,
+    coerce_types: bool,
 }
 
 impl WriterPropertiesBuilder {
@@ -381,6 +398,7 @@ impl WriterPropertiesBuilder {
             sorting_columns: None,
             column_index_truncate_length: DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
             statistics_truncate_length: DEFAULT_STATISTICS_TRUNCATE_LENGTH,
+            coerce_types: DEFAULT_COERCE_TYPES,
         }
     }
 
@@ -401,6 +419,7 @@ impl WriterPropertiesBuilder {
             sorting_columns: self.sorting_columns,
             column_index_truncate_length: self.column_index_truncate_length,
             statistics_truncate_length: self.statistics_truncate_length,
+            coerce_types: self.coerce_types,
         }
     }
 
@@ -731,6 +750,13 @@ impl WriterPropertiesBuilder {
         self.statistics_truncate_length = max_length;
         self
     }
+
+    /// Sets flag to enable/disable type coercion.
+    /// Takes precedence over globally defined settings.
+    pub fn set_coerce_types(mut self, coerce_types: bool) -> Self {
+        self.coerce_types = coerce_types;
+        self
+    }
 }
 
 /// Controls the level of statistics to be computed by the writer and stored in

Reply via email to