This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch 53.0.0-dev
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/53.0.0-dev by this push:
     new e40b311518 Add `unencoded_byte_array_data_bytes` to `ParquetMetaData` (#6068)
e40b311518 is described below

commit e40b3115183c4e31132377839cc03a76244e10a7
Author: Ed Seidl <[email protected]>
AuthorDate: Fri Jul 19 13:29:52 2024 -0700

    Add `unencoded_byte_array_data_bytes` to `ParquetMetaData` (#6068)
    
    * update to latest thrift (as of 11 Jul 2024) from parquet-format
    
    * pass None for optional size statistics
    
    * escape HTML tags
    
    * don't need to escape brackets in arrays
    
    * add support for unencoded_byte_array_data_bytes
    
    * add comments
    
    * change sig of ColumnMetrics::update_variable_length_bytes()
    
    * rename ParquetOffsetIndex to OffsetSizeIndex
    
    * rename some functions
    
    * suggestion from review
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * add Default trait to ColumnMetrics as suggested in review
    
    * rename OffsetSizeIndex to OffsetIndexMetaData
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 parquet/src/arrow/arrow_writer/byte_array.rs | 19 ++++++-
 parquet/src/arrow/async_reader/metadata.rs   |  8 ++-
 parquet/src/arrow/async_reader/mod.rs        |  1 +
 parquet/src/column/writer/encoder.rs         |  8 +++
 parquet/src/column/writer/mod.rs             | 54 ++++++++++++------
 parquet/src/data_type.rs                     | 11 ++++
 parquet/src/file/metadata/memory.rs          |  1 +
 parquet/src/file/metadata/mod.rs             | 77 +++++++++++++++++++++++--
 parquet/src/file/page_index/index_reader.rs  | 53 +++++++++++++++--
 parquet/src/file/page_index/mod.rs           |  1 +
 parquet/src/file/page_index/offset_index.rs  | 50 ++++++++++++++++
 parquet/src/file/serialized_reader.rs        | 15 ++++-
 parquet/src/file/writer.rs                   | 85 +++++++++++++++++++++++++++-
 13 files changed, 349 insertions(+), 34 deletions(-)
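
As a usage sketch for this change: once a file is opened with the page index
enabled, the new accessor can be used to size an output buffer. The file name
below is hypothetical; the APIs are the ones added in the diff that follows.

    use std::fs::File;
    use parquet::file::reader::FileReader;
    use parquet::file::serialized_reader::{ReadOptionsBuilder, SerializedFileReader};

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // The offset index (and with it the new size statistics) is only
        // decoded when the page index is requested.
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let file = File::open("data.parquet")?; // hypothetical input file
        let reader = SerializedFileReader::new_with_options(file, options)?;

        // Indexed as [row_group][column][page]; inner entries are None for
        // columns without BYTE_ARRAY size statistics.
        if let Some(sizes) = reader.metadata().unencoded_byte_array_data_bytes() {
            let total: i64 = sizes.iter().flatten().flatten().flatten().sum();
            println!("unencoded BYTE_ARRAY bytes in file: {total}");
        }
        Ok(())
    }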

diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs
index fc37ebfb45..2d23ad8510 100644
--- a/parquet/src/arrow/arrow_writer/byte_array.rs
+++ b/parquet/src/arrow/arrow_writer/byte_array.rs
@@ -96,6 +96,7 @@ macro_rules! downcast_op {
 struct FallbackEncoder {
     encoder: FallbackEncoderImpl,
     num_values: usize,
+    variable_length_bytes: i64,
 }
 
 /// The fallback encoder in use
@@ -152,6 +153,7 @@ impl FallbackEncoder {
         Ok(Self {
             encoder,
             num_values: 0,
+            variable_length_bytes: 0,
         })
     }
 
@@ -168,7 +170,8 @@ impl FallbackEncoder {
                     let value = values.value(*idx);
                     let value = value.as_ref();
                     buffer.extend_from_slice((value.len() as u32).as_bytes());
-                    buffer.extend_from_slice(value)
+                    buffer.extend_from_slice(value);
+                    self.variable_length_bytes += value.len() as i64;
                 }
             }
             FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
@@ -177,6 +180,7 @@ impl FallbackEncoder {
                     let value = value.as_ref();
                     lengths.put(&[value.len() as i32]).unwrap();
                     buffer.extend_from_slice(value);
+                    self.variable_length_bytes += value.len() as i64;
                 }
             }
             FallbackEncoderImpl::Delta {
@@ -205,6 +209,7 @@ impl FallbackEncoder {
                     buffer.extend_from_slice(&value[prefix_length..]);
                     prefix_lengths.put(&[prefix_length as i32]).unwrap();
                     suffix_lengths.put(&[suffix_length as i32]).unwrap();
+                    self.variable_length_bytes += value.len() as i64;
                 }
             }
         }
@@ -269,12 +274,17 @@ impl FallbackEncoder {
             }
         };
 
+        // Capture value of variable_length_bytes and reset for next page
+        let variable_length_bytes = Some(self.variable_length_bytes);
+        self.variable_length_bytes = 0;
+
         Ok(DataPageValues {
             buf: buf.into(),
             num_values: std::mem::take(&mut self.num_values),
             encoding,
             min_value,
             max_value,
+            variable_length_bytes,
         })
     }
 }
@@ -321,6 +331,7 @@ impl Storage for ByteArrayStorage {
 struct DictEncoder {
     interner: Interner<ByteArrayStorage>,
     indices: Vec<u64>,
+    variable_length_bytes: i64,
 }
 
 impl DictEncoder {
@@ -336,6 +347,7 @@ impl DictEncoder {
             let value = values.value(*idx);
             let interned = self.interner.intern(value.as_ref());
             self.indices.push(interned);
+            self.variable_length_bytes += value.as_ref().len() as i64;
         }
     }
 
@@ -384,12 +396,17 @@ impl DictEncoder {
 
         self.indices.clear();
 
+        // Capture value of variable_length_bytes and reset for next page
+        let variable_length_bytes = Some(self.variable_length_bytes);
+        self.variable_length_bytes = 0;
+
         DataPageValues {
             buf: encoder.consume().into(),
             num_values,
             encoding: Encoding::RLE_DICTIONARY,
             min_value,
             max_value,
+            variable_length_bytes,
         }
     }
 }
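
The flush paths above capture the running byte count and reset it for the
next page. A self-contained sketch of that idiom in plain Rust (illustrative
only, not the parquet API):

    struct PageStats {
        variable_length_bytes: i64,
    }

    struct Encoder {
        variable_length_bytes: i64,
    }

    impl Encoder {
        fn write(&mut self, value: &[u8]) {
            self.variable_length_bytes += value.len() as i64;
        }

        fn flush(&mut self) -> PageStats {
            // `std::mem::take` hands back the current count and leaves the
            // default (0) behind, matching the manual reset in the diff.
            PageStats {
                variable_length_bytes: std::mem::take(&mut self.variable_length_bytes),
            }
        }
    }

    fn main() {
        let mut enc = Encoder { variable_length_bytes: 0 };
        enc.write(b"hello");
        enc.write(b"parquet");
        assert_eq!(enc.flush().variable_length_bytes, 12);
        assert_eq!(enc.flush().variable_length_bytes, 0); // reset between pages
    }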
diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs
index 9224ea3f68..4a3489a208 100644
--- a/parquet/src/arrow/async_reader/metadata.rs
+++ b/parquet/src/arrow/async_reader/metadata.rs
@@ -20,7 +20,9 @@ use crate::errors::{ParquetError, Result};
 use crate::file::footer::{decode_footer, decode_metadata};
 use crate::file::metadata::ParquetMetaData;
 use crate::file::page_index::index::Index;
-use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
+use crate::file::page_index::index_reader::{
+    acc_range, decode_column_index, decode_page_locations,
+};
 use bytes::Bytes;
 use futures::future::BoxFuture;
 use futures::FutureExt;
@@ -177,7 +179,9 @@ impl<F: MetadataFetch> MetadataLoader<F> {
                     x.columns()
                         .iter()
                         .map(|c| match c.offset_index_range() {
-                            Some(r) => decode_offset_index(&data[r.start - offset..r.end - offset]),
+                            Some(r) => {
+                                decode_page_locations(&data[r.start - offset..r.end - offset])
+                            }
                             None => Err(general_err!("missing offset index")),
                         })
                         .collect::<Result<Vec<_>>>()
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index e4205b7ef2..5a790fa6af 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -1538,6 +1538,7 @@ mod tests {
             vec![row_group_meta],
             None,
             Some(vec![offset_index.clone()]),
+            None,
         );
 
         let metadata = Arc::new(metadata);
diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs
index b6c8212608..9d01c09040 100644
--- a/parquet/src/column/writer/encoder.rs
+++ b/parquet/src/column/writer/encoder.rs
@@ -63,6 +63,7 @@ pub struct DataPageValues<T> {
     pub encoding: Encoding,
     pub min_value: Option<T>,
     pub max_value: Option<T>,
+    pub variable_length_bytes: Option<i64>,
 }
 
 /// A generic encoder of [`ColumnValues`] to data and dictionary pages used by
@@ -131,6 +132,7 @@ pub struct ColumnValueEncoderImpl<T: DataType> {
     min_value: Option<T::T>,
     max_value: Option<T::T>,
     bloom_filter: Option<Sbbf>,
+    variable_length_bytes: Option<i64>,
 }
 
 impl<T: DataType> ColumnValueEncoderImpl<T> {
@@ -150,6 +152,10 @@ impl<T: DataType> ColumnValueEncoderImpl<T> {
                 update_min(&self.descr, &min, &mut self.min_value);
                 update_max(&self.descr, &max, &mut self.max_value);
             }
+
+            if let Some(var_bytes) = T::T::variable_length_bytes(slice) {
+                *self.variable_length_bytes.get_or_insert(0) += var_bytes;
+            }
         }
 
         // encode the values into bloom filter if enabled
@@ -203,6 +209,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
             bloom_filter,
             min_value: None,
             max_value: None,
+            variable_length_bytes: None,
         })
     }
 
@@ -296,6 +303,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
             num_values: std::mem::take(&mut self.num_values),
             min_value: self.min_value.take(),
             max_value: self.max_value.take(),
+            variable_length_bytes: self.variable_length_bytes.take(),
         })
     }
 }
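
The `get_or_insert(0)` accumulation above keeps the statistic `None` until a
batch actually reports variable length data, so fixed-length columns never
gain a spurious zero. A standalone sketch of that behavior:

    fn update(total: &mut Option<i64>, batch: Option<i64>) {
        if let Some(var_bytes) = batch {
            *total.get_or_insert(0) += var_bytes;
        }
    }

    fn main() {
        let mut total = None;
        update(&mut total, None); // fixed-length batch: stays None
        assert_eq!(total, None);
        update(&mut total, Some(10)); // first BYTE_ARRAY batch initializes to 0
        update(&mut total, Some(5));
        assert_eq!(total, Some(15));
    }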
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 8594e59714..0e227a157d 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -192,7 +192,8 @@ struct PageMetrics {
 }
 
 // Metrics per column writer
-struct ColumnMetrics<T> {
+#[derive(Default)]
+struct ColumnMetrics<T: Default> {
     total_bytes_written: u64,
     total_rows_written: u64,
     total_uncompressed_size: u64,
@@ -204,6 +205,20 @@ struct ColumnMetrics<T> {
     max_column_value: Option<T>,
     num_column_nulls: u64,
     column_distinct_count: Option<u64>,
+    variable_length_bytes: Option<i64>,
+}
+
+impl<T: Default> ColumnMetrics<T> {
+    fn new() -> Self {
+        Default::default()
+    }
+
+    /// Sum the provided page variable_length_bytes into the chunk variable_length_bytes
+    fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
+        if let Some(var_bytes) = variable_length_bytes {
+            *self.variable_length_bytes.get_or_insert(0) += var_bytes;
+        }
+    }
 }
 
 /// Typed column writer for a primitive column.
@@ -276,19 +291,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                 num_buffered_rows: 0,
                 num_page_nulls: 0,
             },
-            column_metrics: ColumnMetrics {
-                total_bytes_written: 0,
-                total_rows_written: 0,
-                total_uncompressed_size: 0,
-                total_compressed_size: 0,
-                total_num_values: 0,
-                dictionary_page_offset: None,
-                data_page_offset: None,
-                min_column_value: None,
-                max_column_value: None,
-                num_column_nulls: 0,
-                column_distinct_count: None,
-            },
+            column_metrics: ColumnMetrics::<E::T>::new(),
             column_index_builder: ColumnIndexBuilder::new(),
             offset_index_builder: OffsetIndexBuilder::new(),
             encodings,
@@ -634,7 +637,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
     }
 
     /// Update the column index and offset index when adding the data page
-    fn update_column_offset_index(&mut self, page_statistics: Option<&ValueStatistics<E::T>>) {
+    fn update_column_offset_index(
+        &mut self,
+        page_statistics: Option<&ValueStatistics<E::T>>,
+        page_variable_length_bytes: Option<i64>,
+    ) {
         // update the column index
         let null_page =
             (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
@@ -708,6 +715,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         // update the offset index
         self.offset_index_builder
             .append_row_count(self.page_metrics.num_buffered_rows as i64);
+
+        self.offset_index_builder
+            .append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
     }
 
     /// Determine if we should allow truncating min/max values for this column's statistics
@@ -783,7 +793,15 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         };
 
         // update column and offset index
-        self.update_column_offset_index(page_statistics.as_ref());
+        self.update_column_offset_index(
+            page_statistics.as_ref(),
+            values_data.variable_length_bytes,
+        );
+
+        // Update variable_length_bytes in column_metrics
+        self.column_metrics
+            .update_variable_length_bytes(values_data.variable_length_bytes);
+
         let page_statistics = page_statistics.map(Statistics::from);
 
         let compressed_page = match self.props.writer_version() {
@@ -993,7 +1011,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                 stats => stats,
             };
 
-            builder = builder.set_statistics(statistics);
+            builder = builder
+                .set_statistics(statistics)
+                .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes);
         }
 
         let metadata = builder.build()?;
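
The `ColumnMetrics` change above swaps an eleven-field struct literal for
`#[derive(Default)]` plus a `new()` that delegates to it. A reduced,
illustrative standalone version of the same pattern:

    #[derive(Default)]
    struct Metrics<T: Default> {
        total_rows_written: u64,
        min_value: Option<T>,
        variable_length_bytes: Option<i64>,
    }

    impl<T: Default> Metrics<T> {
        fn new() -> Self {
            // Every field takes its Default value: 0 for integers, None for Options
            Default::default()
        }
    }

    fn main() {
        let m = Metrics::<i64>::new();
        assert_eq!(m.total_rows_written, 0);
        assert_eq!(m.min_value, None);
        assert_eq!(m.variable_length_bytes, None);
    }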
diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs
index b85a75cfd4..01e92115c4 100644
--- a/parquet/src/data_type.rs
+++ b/parquet/src/data_type.rs
@@ -644,6 +644,13 @@ pub(crate) mod private {
             (std::mem::size_of::<Self>(), 1)
         }
 
+        /// Return the number of variable length bytes in a given slice of data
+        ///
+        /// Returns the sum of lengths for BYTE_ARRAY data, and None for all other data types
+        fn variable_length_bytes(_: &[Self]) -> Option<i64> {
+            None
+        }
+
         /// Return the value as i64 if possible
         ///
        /// This is essentially the same as `std::convert::TryInto<i64>` but can't be
@@ -956,6 +963,10 @@ pub(crate) mod private {
             Ok(num_values)
         }
 
+        fn variable_length_bytes(values: &[Self]) -> Option<i64> {
+            Some(values.iter().map(|x| x.len() as i64).sum())
+        }
+
        fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
             let data = decoder
                 .data
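
The trait change above adds a defaulted method so only BYTE_ARRAY needs an
override; every other physical type inherits the `None`. A standalone sketch
of the pattern (the types here are illustrative, not the crate's):

    trait ParquetValue: Sized {
        /// Default: no variable length data for fixed-size types.
        fn variable_length_bytes(_: &[Self]) -> Option<i64> {
            None
        }
    }

    struct FixedI32;
    impl ParquetValue for FixedI32 {}

    struct VarBytes(Vec<u8>);
    impl ParquetValue for VarBytes {
        // Override: sum the payload lengths, as the diff does for ByteArray.
        fn variable_length_bytes(values: &[Self]) -> Option<i64> {
            Some(values.iter().map(|v| v.0.len() as i64).sum())
        }
    }

    fn main() {
        assert_eq!(FixedI32::variable_length_bytes(&[FixedI32]), None);
        let vals = [VarBytes(b"ab".to_vec()), VarBytes(b"cde".to_vec())];
        assert_eq!(VarBytes::variable_length_bytes(&vals), Some(5));
    }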
diff --git a/parquet/src/file/metadata/memory.rs b/parquet/src/file/metadata/memory.rs
index 57b2f7eec0..a1b40a8de9 100644
--- a/parquet/src/file/metadata/memory.rs
+++ b/parquet/src/file/metadata/memory.rs
@@ -97,6 +97,7 @@ impl HeapSize for ColumnChunkMetaData {
             + self.compression.heap_size()
             + self.statistics.heap_size()
             + self.encoding_stats.heap_size()
+            + self.unencoded_byte_array_data_bytes.heap_size()
     }
 }
 
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 39b6de41fa..d86b1ce572 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -36,7 +36,7 @@ use std::sync::Arc;
 
 use crate::format::{
     BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation, RowGroup,
-    SortingColumn,
+    SizeStatistics, SortingColumn,
 };
 
 use crate::basic::{ColumnOrder, Compression, Encoding, Type};
@@ -96,6 +96,8 @@ pub struct ParquetMetaData {
     column_index: Option<ParquetColumnIndex>,
     /// Offset index for each page in each column chunk
     offset_index: Option<ParquetOffsetIndex>,
+    /// `unencoded_byte_array_data_bytes` from the offset index
+    unencoded_byte_array_data_bytes: Option<Vec<Vec<Option<Vec<i64>>>>>,
 }
 
 impl ParquetMetaData {
@@ -107,6 +109,7 @@ impl ParquetMetaData {
             row_groups,
             column_index: None,
             offset_index: None,
+            unencoded_byte_array_data_bytes: None,
         }
     }
 
@@ -117,12 +120,14 @@ impl ParquetMetaData {
         row_groups: Vec<RowGroupMetaData>,
         column_index: Option<ParquetColumnIndex>,
         offset_index: Option<ParquetOffsetIndex>,
+        unencoded_byte_array_data_bytes: Option<Vec<Vec<Option<Vec<i64>>>>>,
     ) -> Self {
         ParquetMetaData {
             file_metadata,
             row_groups,
             column_index,
             offset_index,
+            unencoded_byte_array_data_bytes,
         }
     }
 
@@ -179,6 +184,19 @@ impl ParquetMetaData {
         self.offset_index.as_ref()
     }
 
+    /// Returns `unencoded_byte_array_data_bytes` from the offset indexes in this file, if loaded
+    ///
+    /// This value is the total size of the BYTE_ARRAY data in this file after decoding, which can be useful for
+    /// allocating an appropriately sized output buffer.
+    ///
+    /// Returns `None` if the parquet file does not have an `OffsetIndex` or
+    /// [ArrowReaderOptions::with_page_index] was set to false.
+    ///
+    /// [ArrowReaderOptions::with_page_index]: https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index
+    pub fn unencoded_byte_array_data_bytes(&self) -> Option<&Vec<Vec<Option<Vec<i64>>>>> {
+        self.unencoded_byte_array_data_bytes.as_ref()
+    }
+
     /// Estimate of the bytes allocated to store `ParquetMetadata`
     ///
     /// # Notes:
@@ -199,6 +217,7 @@ impl ParquetMetaData {
             + self.row_groups.heap_size()
             + self.column_index.heap_size()
             + self.offset_index.heap_size()
+            + self.unencoded_byte_array_data_bytes.heap_size()
     }
 
     /// Override the column index
@@ -543,6 +562,7 @@ pub struct ColumnChunkMetaData {
     offset_index_length: Option<i32>,
     column_index_offset: Option<i64>,
     column_index_length: Option<i32>,
+    unencoded_byte_array_data_bytes: Option<i64>,
 }
 
 /// Represents common operations for a column chunk.
@@ -695,6 +715,14 @@ impl ColumnChunkMetaData {
         Some(offset..(offset + length))
     }
 
+    /// Returns the number of bytes of variable length data after decoding.
+    ///
+    /// Only set for BYTE_ARRAY columns. This field may not be set by older
+    /// writers.
+    pub fn unencoded_byte_array_data_bytes(&self) -> Option<i64> {
+        self.unencoded_byte_array_data_bytes
+    }
+
     /// Method to convert from Thrift.
     pub fn from_thrift(column_descr: ColumnDescPtr, cc: ColumnChunk) -> Result<Self> {
         if cc.meta_data.is_none() {
@@ -732,6 +760,12 @@ impl ColumnChunkMetaData {
         let offset_index_length = cc.offset_index_length;
         let column_index_offset = cc.column_index_offset;
         let column_index_length = cc.column_index_length;
+        let unencoded_byte_array_data_bytes = if let Some(size_stats) = col_metadata.size_statistics
+        {
+            size_stats.unencoded_byte_array_data_bytes
+        } else {
+            None
+        };
 
         let result = ColumnChunkMetaData {
             column_descr,
@@ -753,6 +787,7 @@ impl ColumnChunkMetaData {
             offset_index_length,
             column_index_offset,
             column_index_length,
+            unencoded_byte_array_data_bytes,
         };
         Ok(result)
     }
@@ -776,6 +811,16 @@ impl ColumnChunkMetaData {
 
     /// Method to convert to Thrift `ColumnMetaData`
     pub fn to_column_metadata_thrift(&self) -> ColumnMetaData {
+        let size_statistics = if self.unencoded_byte_array_data_bytes.is_some() {
+            Some(SizeStatistics {
+                unencoded_byte_array_data_bytes: self.unencoded_byte_array_data_bytes,
+                repetition_level_histogram: None,
+                definition_level_histogram: None,
+            })
+        } else {
+            None
+        };
+
         ColumnMetaData {
             type_: self.column_type().into(),
             encodings: self.encodings().iter().map(|&v| v.into()).collect(),
@@ -795,7 +840,7 @@ impl ColumnChunkMetaData {
                 .map(|vec| vec.iter().map(page_encoding_stats::to_thrift).collect()),
             bloom_filter_offset: self.bloom_filter_offset,
             bloom_filter_length: self.bloom_filter_length,
-            size_statistics: None,
+            size_statistics,
         }
     }
 
@@ -831,6 +876,7 @@ impl ColumnChunkMetaDataBuilder {
             offset_index_length: None,
             column_index_offset: None,
             column_index_length: None,
+            unencoded_byte_array_data_bytes: None,
         })
     }
 
@@ -942,6 +988,12 @@ impl ColumnChunkMetaDataBuilder {
         self
     }
 
+    /// Sets the optional length of variable length data in bytes.
+    pub fn set_unencoded_byte_array_data_bytes(mut self, value: Option<i64>) -> Self {
+        self.0.unencoded_byte_array_data_bytes = value;
+        self
+    }
+
     /// Builds column chunk metadata.
     pub fn build(self) -> Result<ColumnChunkMetaData> {
         Ok(self.0)
@@ -1021,6 +1073,7 @@ pub struct OffsetIndexBuilder {
     offset_array: Vec<i64>,
     compressed_page_size_array: Vec<i32>,
     first_row_index_array: Vec<i64>,
+    unencoded_byte_array_data_bytes_array: Option<Vec<i64>>,
     current_first_row_index: i64,
 }
 
@@ -1036,6 +1089,7 @@ impl OffsetIndexBuilder {
             offset_array: Vec::new(),
             compressed_page_size_array: Vec::new(),
             first_row_index_array: Vec::new(),
+            unencoded_byte_array_data_bytes_array: None,
             current_first_row_index: 0,
         }
     }
@@ -1051,6 +1105,17 @@ impl OffsetIndexBuilder {
         self.compressed_page_size_array.push(compressed_page_size);
     }
 
+    pub fn append_unencoded_byte_array_data_bytes(
+        &mut self,
+        unencoded_byte_array_data_bytes: Option<i64>,
+    ) {
+        if let Some(val) = unencoded_byte_array_data_bytes {
+            self.unencoded_byte_array_data_bytes_array
+                .get_or_insert(Vec::new())
+                .push(val);
+        }
+    }
+
     /// Build and get the thrift metadata of offset index
     pub fn build_to_thrift(self) -> OffsetIndex {
         let locations = self
@@ -1060,7 +1125,7 @@ impl OffsetIndexBuilder {
             .zip(self.first_row_index_array.iter())
             .map(|((offset, size), row_index)| PageLocation::new(*offset, *size, *row_index))
             .collect::<Vec<_>>();
-        OffsetIndex::new(locations, None)
+        OffsetIndex::new(locations, self.unencoded_byte_array_data_bytes_array)
     }
 }
 
@@ -1211,6 +1276,7 @@ mod tests {
             .set_offset_index_length(Some(25))
             .set_column_index_offset(Some(8000))
             .set_column_index_length(Some(25))
+            .set_unencoded_byte_array_data_bytes(Some(2000))
             .build()
             .unwrap();
 
@@ -1298,7 +1364,7 @@ mod tests {
             column_orders,
         );
         let parquet_meta = ParquetMetaData::new(file_metadata.clone(), row_group_meta.clone());
-        let base_expected_size = 1320;
+        let base_expected_size = 1376;
         assert_eq!(parquet_meta.memory_size(), base_expected_size);
 
         let mut column_index = ColumnIndexBuilder::new();
@@ -1315,9 +1381,10 @@ mod tests {
                 vec![PageLocation::new(1, 2, 3)],
                 vec![PageLocation::new(1, 2, 3)],
             ]]),
+            Some(vec![vec![Some(vec![10, 20, 30])]]),
         );
 
-        let bigger_expected_size = 2304;
+        let bigger_expected_size = 2464;
         // more set fields means more memory usage
         assert!(bigger_expected_size > base_expected_size);
         assert_eq!(parquet_meta.memory_size(), bigger_expected_size);
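
Given the new per-chunk accessor above, a row-group level total can be
computed directly from the metadata. A sketch (the helper name is ours, not
the crate's):

    use parquet::file::metadata::ParquetMetaData;

    /// Sum `unencoded_byte_array_data_bytes` across all column chunks,
    /// returning None when no chunk carries the statistic.
    fn total_unencoded_bytes(metadata: &ParquetMetaData) -> Option<i64> {
        let mut total: Option<i64> = None;
        for rg in metadata.row_groups() {
            for col in rg.columns() {
                if let Some(sz) = col.unencoded_byte_array_data_bytes() {
                    *total.get_or_insert(0) += sz;
                }
            }
        }
        total
    }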
diff --git a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs
index 2ddf826fb0..7959cb95c0 100644
--- a/parquet/src/file/page_index/index_reader.rs
+++ b/parquet/src/file/page_index/index_reader.rs
@@ -22,6 +22,7 @@ use crate::data_type::Int96;
 use crate::errors::ParquetError;
 use crate::file::metadata::ColumnChunkMetaData;
 use crate::file::page_index::index::{Index, NativeIndex};
+use crate::file::page_index::offset_index::OffsetIndexMetaData;
 use crate::file::reader::ChunkReader;
 use crate::format::{ColumnIndex, OffsetIndex, PageLocation};
 use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
@@ -45,9 +46,9 @@ pub(crate) fn acc_range(a: Option<Range<usize>>, b: Option<Range<usize>>) -> Opt
 /// Returns an empty vector if this row group does not contain a
 /// [`ColumnIndex`].
 ///
-/// See [Column Index Documentation] for more details.
+/// See [Page Index Documentation] for more details.
 ///
-/// [Column Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
+/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
 pub fn read_columns_indexes<R: ChunkReader>(
     reader: &R,
     chunks: &[ColumnChunkMetaData],
@@ -81,9 +82,9 @@ pub fn read_columns_indexes<R: ChunkReader>(
 /// Return an empty vector if this row group does not contain an
 /// [`OffsetIndex`].
 ///
-/// See [Column Index Documentation] for more details.
+/// See [Page Index Documentation] for more details.
 ///
-/// [Column Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
+/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
 pub fn read_pages_locations<R: ChunkReader>(
     reader: &R,
     chunks: &[ColumnChunkMetaData],
@@ -100,6 +101,42 @@ pub fn read_pages_locations<R: ChunkReader>(
     let bytes = reader.get_bytes(fetch.start as _, fetch.end - fetch.start)?;
     let get = |r: Range<usize>| &bytes[(r.start - fetch.start)..(r.end - fetch.start)];
 
+    chunks
+        .iter()
+        .map(|c| match c.offset_index_range() {
+            Some(r) => decode_page_locations(get(r)),
+            None => Err(general_err!("missing offset index")),
+        })
+        .collect()
+}
+
+/// Reads per-column [`OffsetIndexMetaData`] for all columns of a row group by
+/// decoding the [`OffsetIndex`].
+///
+/// Returns a vector of `offset_index[column_number]`.
+///
+/// Returns an empty vector if this row group does not contain an
+/// [`OffsetIndex`].
+///
+/// See [Page Index Documentation] for more details.
+///
+/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
+pub fn read_offset_indexes<R: ChunkReader>(
+    reader: &R,
+    chunks: &[ColumnChunkMetaData],
+) -> Result<Vec<OffsetIndexMetaData>, ParquetError> {
+    let fetch = chunks
+        .iter()
+        .fold(None, |range, c| acc_range(range, c.offset_index_range()));
+
+    let fetch = match fetch {
+        Some(r) => r,
+        None => return Ok(vec![]),
+    };
+
+    let bytes = reader.get_bytes(fetch.start as _, fetch.end - fetch.start)?;
+    let get = |r: Range<usize>| &bytes[(r.start - fetch.start)..(r.end - fetch.start)];
+
     chunks
         .iter()
         .map(|c| match c.offset_index_range() {
@@ -109,7 +146,13 @@ pub fn read_pages_locations<R: ChunkReader>(
         .collect()
 }
 
-pub(crate) fn decode_offset_index(data: &[u8]) -> Result<Vec<PageLocation>, ParquetError> {
+pub(crate) fn decode_offset_index(data: &[u8]) -> Result<OffsetIndexMetaData, ParquetError> {
+    let mut prot = TCompactSliceInputProtocol::new(data);
+    let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
+    OffsetIndexMetaData::try_new(offset)
+}
+
+pub(crate) fn decode_page_locations(data: &[u8]) -> Result<Vec<PageLocation>, ParquetError> {
     let mut prot = TCompactSliceInputProtocol::new(data);
     let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
     Ok(offset.page_locations)
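
A usage sketch for the new `read_offset_indexes` entry point (the file name
is hypothetical): it returns one `OffsetIndexMetaData` per column chunk,
pairing the page locations with the optional per-page unencoded sizes.

    use std::fs::File;
    use parquet::errors::Result;
    use parquet::file::page_index::index_reader::read_offset_indexes;
    use parquet::file::reader::FileReader;
    use parquet::file::serialized_reader::SerializedFileReader;

    fn main() -> Result<()> {
        let file = File::open("data.parquet")?;
        let reader = SerializedFileReader::new(file.try_clone()?)?;
        let rg = reader.metadata().row_group(0);
        // Decode the full offset index for every column of the row group.
        let offset_indexes = read_offset_indexes(&file, rg.columns())?;
        for idx in &offset_indexes {
            println!(
                "{} pages, unencoded sizes: {:?}",
                idx.page_locations().len(),
                idx.unencoded_byte_array_data_bytes()
            );
        }
        Ok(())
    }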
diff --git a/parquet/src/file/page_index/mod.rs b/parquet/src/file/page_index/mod.rs
index 9372645d76..a8077896db 100644
--- a/parquet/src/file/page_index/mod.rs
+++ b/parquet/src/file/page_index/mod.rs
@@ -21,3 +21,4 @@
 
 pub mod index;
 pub mod index_reader;
+pub mod offset_index;
diff --git a/parquet/src/file/page_index/offset_index.rs b/parquet/src/file/page_index/offset_index.rs
new file mode 100644
index 0000000000..9138620e3f
--- /dev/null
+++ b/parquet/src/file/page_index/offset_index.rs
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`OffsetIndexMetaData`] structure holding decoded [`OffsetIndex`] information
+
+use crate::errors::ParquetError;
+use crate::format::{OffsetIndex, PageLocation};
+
+/// [`OffsetIndex`] information for a column chunk. Contains offsets and sizes for each page
+/// in the chunk. Optionally stores fully decoded page sizes for BYTE_ARRAY columns.
+#[derive(Debug, Clone, PartialEq)]
+pub struct OffsetIndexMetaData {
+    pub page_locations: Vec<PageLocation>,
+    pub unencoded_byte_array_data_bytes: Option<Vec<i64>>,
+}
+
+impl OffsetIndexMetaData {
+    /// Creates a new [`OffsetIndexMetaData`] from an [`OffsetIndex`].
+    pub(crate) fn try_new(index: OffsetIndex) -> Result<Self, ParquetError> {
+        Ok(Self {
+            page_locations: index.page_locations,
+            unencoded_byte_array_data_bytes: index.unencoded_byte_array_data_bytes,
+        })
+    }
+
+    /// Vector of [`PageLocation`] objects, one per page in the chunk.
+    pub fn page_locations(&self) -> &Vec<PageLocation> {
+        &self.page_locations
+    }
+
+    /// Optional vector of unencoded page sizes, one per page in the chunk. Only defined
+    /// for BYTE_ARRAY columns.
+    pub fn unencoded_byte_array_data_bytes(&self) -> Option<&Vec<i64>> {
+        self.unencoded_byte_array_data_bytes.as_ref()
+    }
+}
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index ac7d2d2874..65b6ebf2ec 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -211,12 +211,22 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
         if options.enable_page_index {
             let mut columns_indexes = vec![];
             let mut offset_indexes = vec![];
+            let mut unenc_byte_sizes = vec![];
 
             for rg in &mut filtered_row_groups {
                 let column_index = index_reader::read_columns_indexes(&chunk_reader, rg.columns())?;
-                let offset_index = index_reader::read_pages_locations(&chunk_reader, rg.columns())?;
+                let offset_index = index_reader::read_offset_indexes(&chunk_reader, rg.columns())?;
+
+                // split offset_index into two vectors to not break API
+                let mut page_locations = vec![];
+                let mut unenc_bytes = vec![];
+                offset_index.into_iter().for_each(|index| {
+                    page_locations.push(index.page_locations);
+                    unenc_bytes.push(index.unencoded_byte_array_data_bytes);
+                });
                 columns_indexes.push(column_index);
-                offset_indexes.push(offset_index);
+                offset_indexes.push(page_locations);
+                unenc_byte_sizes.push(unenc_bytes);
             }
 
             Ok(Self {
@@ -226,6 +236,7 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
                     filtered_row_groups,
                     Some(columns_indexes),
                     Some(offset_indexes),
+                    Some(unenc_byte_sizes),
                 )),
                 props: Arc::new(options.props),
             })
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index eb633f31c4..b109a2da8e 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -659,7 +659,8 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
             .set_total_uncompressed_size(metadata.uncompressed_size())
             .set_num_values(metadata.num_values())
             .set_data_page_offset(map_offset(src_data_offset))
-            .set_dictionary_page_offset(src_dictionary_offset.map(map_offset));
+            .set_dictionary_page_offset(src_dictionary_offset.map(map_offset))
+            .set_unencoded_byte_array_data_bytes(metadata.unencoded_byte_array_data_bytes());
 
         if let Some(statistics) = metadata.statistics() {
             builder = builder.set_statistics(statistics.clone())
@@ -827,7 +828,7 @@ mod tests {
     use crate::column::page::{Page, PageReader};
     use crate::column::reader::get_typed_column_reader;
     use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
-    use crate::data_type::{BoolType, Int32Type};
+    use crate::data_type::{BoolType, ByteArrayType, Int32Type};
     use crate::file::page_index::index::Index;
     use crate::file::properties::EnabledStatistics;
     use crate::file::serialized_reader::ReadOptionsBuilder;
@@ -840,6 +841,7 @@ mod tests {
     use crate::record::{Row, RowAccessor};
     use crate::schema::parser::parse_message_type;
     use crate::schema::types::{ColumnDescriptor, ColumnPath};
+    use crate::util::test_common::rand_gen::RandGen;
 
     #[test]
     fn test_row_group_writer_error_not_all_columns_written() {
@@ -1851,4 +1853,83 @@ mod tests {
         let b_idx = &column_index[0][1];
         assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
     }
+
+    #[test]
+    fn test_byte_array_size_statistics() {
+        let message_type = "
+            message test_schema {
+                OPTIONAL BYTE_ARRAY a (UTF8);
+            }
+        ";
+        let schema = Arc::new(parse_message_type(message_type).unwrap());
+        let data = ByteArrayType::gen_vec(32, 7);
+        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
+        let unenc_size: i64 = data.iter().map(|x| x.len() as i64).sum();
+        let file: File = tempfile::tempfile().unwrap();
+        let props = Arc::new(
+            WriterProperties::builder()
+                .set_statistics_enabled(EnabledStatistics::Page)
+                .build(),
+        );
+
+        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
+        let mut row_group_writer = writer.next_row_group().unwrap();
+
+        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
+        col_writer
+            .typed::<ByteArrayType>()
+            .write_batch(&data, Some(&def_levels), None)
+            .unwrap();
+        col_writer.close().unwrap();
+        row_group_writer.close().unwrap();
+        let file_metadata = writer.close().unwrap();
+
+        assert_eq!(file_metadata.row_groups.len(), 1);
+        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
+        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
+
+        let meta_data = file_metadata.row_groups[0].columns[0]
+            .meta_data
+            .as_ref()
+            .unwrap();
+        assert!(meta_data.size_statistics.is_some());
+        let size_stats = meta_data.size_statistics.as_ref().unwrap();
+
+        assert!(size_stats.repetition_level_histogram.is_none());
+        assert!(size_stats.definition_level_histogram.is_none());
+        assert!(size_stats.unencoded_byte_array_data_bytes.is_some());
+        assert_eq!(
+            unenc_size,
+            size_stats.unencoded_byte_array_data_bytes.unwrap()
+        );
+
+        // check that the read metadata is also correct
+        let options = ReadOptionsBuilder::new().with_page_index().build();
+        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
+
+        let rfile_metadata = reader.metadata().file_metadata();
+        assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
+        assert_eq!(reader.num_row_groups(), 1);
+        let rowgroup = reader.get_row_group(0).unwrap();
+        assert_eq!(rowgroup.num_columns(), 1);
+        let column = rowgroup.metadata().column(0);
+        assert!(column.unencoded_byte_array_data_bytes().is_some());
+        assert_eq!(
+            unenc_size,
+            column.unencoded_byte_array_data_bytes().unwrap()
+        );
+
+        assert!(reader
+            .metadata()
+            .unencoded_byte_array_data_bytes()
+            .is_some());
+        let unenc_sizes = reader.metadata().unencoded_byte_array_data_bytes().unwrap();
+        assert_eq!(unenc_sizes.len(), 1);
+        assert_eq!(unenc_sizes[0].len(), 1);
+        assert!(unenc_sizes[0][0].is_some());
+        let page_sizes = unenc_sizes[0][0].as_ref().unwrap();
+        assert_eq!(page_sizes.len(), 1);
+        assert_eq!(page_sizes[0], unenc_size);
+    }
 }

