This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new b06ffceaa Add support for level histograms added in PARQUET-2261 to 
`ParquetMetaData` (#6105)
b06ffceaa is described below

commit b06ffceaab2b59edc098d86f75b2a5125a8352ee
Author: Ed Seidl <[email protected]>
AuthorDate: Fri Jul 26 10:37:30 2024 -0700

    Add support for level histograms added in PARQUET-2261 to `ParquetMetaData` 
(#6105)
    
    * bump `tonic` to 0.12 and `prost` to 0.13 for `arrow-flight` (#6041)
    
    * bump `tonic` to 0.12 and `prost` to 0.13 for `arrow-flight`
    
    Signed-off-by: Bugen Zhao <[email protected]>
    
    * fix example tests
    
    Signed-off-by: Bugen Zhao <[email protected]>
    
    ---------
    
    Signed-off-by: Bugen Zhao <[email protected]>
    
    * Remove `impl<T: AsRef<[u8]>> From<T> for Buffer`  that easily 
accidentally copies data (#6043)
    
    * deprecate auto copy, ask explicit reference
    
    * update comments
    
    * make cargo doc happy
    
    * Make display of interval types more pretty (#6006)
    
    * improve dispaly for interval.
    
    * update test in pretty, and fix display problem.
    
    * tmp
    
    * fix tests in arrow-cast.
    
    * fix tests in pretty.
    
    * fix style.
    
    * Update snafu (#5930)
    
    * Update Parquet thrift generated structures (#6045)
    
    * update to latest thrift (as of 11 Jul 2024) from parquet-format
    
    * pass None for optional size statistics
    
    * escape HTML tags
    
    * don't need to escape brackets in arrays
    
    * Revert "Revert "Write Bloom filters between row groups instead of the end 
 (#…" (#5933)
    
    This reverts commit 22e0b4432c9838f2536284015271d3de9a165135.
    
    * Revert "Update snafu (#5930)" (#6069)
    
    This reverts commit 756b1fb26d1702f36f446faf9bb40a4869c3e840.
    
    * Update pyo3 requirement from 0.21.1 to 0.22.1 (fixed) (#6075)
    
    * Update pyo3 requirement from 0.21.1 to 0.22.1
    
    Updates the requirements on [pyo3](https://github.com/pyo3/pyo3) to permit 
the latest version.
    - [Release notes](https://github.com/pyo3/pyo3/releases)
    - [Changelog](https://github.com/PyO3/pyo3/blob/main/CHANGELOG.md)
    - [Commits](https://github.com/pyo3/pyo3/compare/v0.21.1...v0.22.1)
    
    ---
    updated-dependencies:
    - dependency-name: pyo3
      dependency-type: direct:production
    ...
    
    Signed-off-by: dependabot[bot] <[email protected]>
    
    * refactor: remove deprecated `FromPyArrow::from_pyarrow`
    
    "GIL Refs" are being phased out.
    
    * chore: update `pyo3` in integration tests
    
    ---------
    
    Signed-off-by: dependabot[bot] <[email protected]>
    Co-authored-by: dependabot[bot] 
<49699333+dependabot[bot]@users.noreply.github.com>
    
    * remove repeated codes to make the codes more concise. (#6080)
    
    * Add `unencoded_byte_array_data_bytes` to `ParquetMetaData` (#6068)
    
    * update to latest thrift (as of 11 Jul 2024) from parquet-format
    
    * pass None for optional size statistics
    
    * escape HTML tags
    
    * don't need to escape brackets in arrays
    
    * add support for unencoded_byte_array_data_bytes
    
    * add comments
    
    * change sig of ColumnMetrics::update_variable_length_bytes()
    
    * rename ParquetOffsetIndex to OffsetSizeIndex
    
    * rename some functions
    
    * suggestion from review
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * add Default trait to ColumnMetrics as suggested in review
    
    * rename OffsetSizeIndex to OffsetIndexMetaData
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * deprecate read_page_locations
    
    * add level histograms to metadata
    
    * add to_thrift() to OffsetIndexMetaData
    
    * Update pyo3 requirement from 0.21.1 to 0.22.2 (#6085)
    
    Updates the requirements on [pyo3](https://github.com/pyo3/pyo3) to permit 
the latest version.
    - [Release notes](https://github.com/pyo3/pyo3/releases)
    - [Changelog](https://github.com/PyO3/pyo3/blob/v0.22.2/CHANGELOG.md)
    - [Commits](https://github.com/pyo3/pyo3/compare/v0.21.1...v0.22.2)
    
    ---
    updated-dependencies:
    - dependency-name: pyo3
      dependency-type: direct:production
    ...
    
    Signed-off-by: dependabot[bot] <[email protected]>
    Co-authored-by: dependabot[bot] 
<49699333+dependabot[bot]@users.noreply.github.com>
    
    * Deprecate read_page_locations() and simplify offset index in 
`ParquetMetaData` (#6095)
    
    * deprecate read_page_locations
    
    * add to_thrift() to OffsetIndexMetaData
    
    * move valid test into ColumnIndexBuilder::append_histograms
    
    * move update_histogram() inside ColumnMetrics
    
    * Update parquet/src/column/writer/mod.rs
    
    Co-authored-by: Ed Seidl <[email protected]>
    
    * Implement LevelHistograms as a struct
    
    * formatting
    
    * fix error in docs
    
    ---------
    
    Signed-off-by: Bugen Zhao <[email protected]>
    Signed-off-by: dependabot[bot] <[email protected]>
    Co-authored-by: Bugen Zhao <[email protected]>
    Co-authored-by: Xiangpeng Hao <[email protected]>
    Co-authored-by: kamille <[email protected]>
    Co-authored-by: Jesse <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
    Co-authored-by: Marco Neumann <[email protected]>
    Co-authored-by: dependabot[bot] 
<49699333+dependabot[bot]@users.noreply.github.com>
---
 parquet/src/column/writer/mod.rs     | 137 ++++++++++++++++++++--
 parquet/src/file/metadata/memory.rs  |   2 +
 parquet/src/file/metadata/mod.rs     | 217 +++++++++++++++++++++++++++++++++--
 parquet/src/file/page_index/index.rs |  87 +++++++++++---
 parquet/src/file/writer.rs           | 144 ++++++++++++++++++++++-
 5 files changed, 550 insertions(+), 37 deletions(-)

diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 2c0c957d8..54d8fd3cc 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -33,7 +33,7 @@ use crate::data_type::private::ParquetValueType;
 use crate::data_type::*;
 use crate::encodings::levels::LevelEncoder;
 use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};
+use crate::file::metadata::{ColumnIndexBuilder, LevelHistogram, 
OffsetIndexBuilder};
 use crate::file::properties::EnabledStatistics;
 use crate::file::statistics::{Statistics, ValueStatistics};
 use crate::file::{
@@ -189,6 +189,54 @@ struct PageMetrics {
     num_buffered_values: u32,
     num_buffered_rows: u32,
     num_page_nulls: u64,
+    repetition_level_histogram: Option<LevelHistogram>,
+    definition_level_histogram: Option<LevelHistogram>,
+}
+
+impl PageMetrics {
+    fn new() -> Self {
+        Default::default()
+    }
+
+    /// Initialize the repetition level histogram
+    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
+        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
+        self
+    }
+
+    /// Initialize the definition level histogram
+    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
+        self.definition_level_histogram = LevelHistogram::try_new(max_level);
+        self
+    }
+
+    /// Resets the state of this `PageMetrics` to the initial state.
+    /// If histograms have been initialized their contents will be reset to 
zero.
+    fn new_page(&mut self) {
+        self.num_buffered_values = 0;
+        self.num_buffered_rows = 0;
+        self.num_page_nulls = 0;
+        self.repetition_level_histogram
+            .as_mut()
+            .map(LevelHistogram::reset);
+        self.definition_level_histogram
+            .as_mut()
+            .map(LevelHistogram::reset);
+    }
+
+    /// Updates histogram values using provided repetition levels
+    fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
+        if let Some(ref mut rep_hist) = self.repetition_level_histogram {
+            rep_hist.update_from_levels(levels);
+        }
+    }
+
+    /// Updates histogram values using provided definition levels
+    fn update_definition_level_histogram(&mut self, levels: &[i16]) {
+        if let Some(ref mut def_hist) = self.definition_level_histogram {
+            def_hist.update_from_levels(levels);
+        }
+    }
 }
 
 // Metrics per column writer
@@ -206,6 +254,8 @@ struct ColumnMetrics<T: Default> {
     num_column_nulls: u64,
     column_distinct_count: Option<u64>,
     variable_length_bytes: Option<i64>,
+    repetition_level_histogram: Option<LevelHistogram>,
+    definition_level_histogram: Option<LevelHistogram>,
 }
 
 impl<T: Default> ColumnMetrics<T> {
@@ -213,6 +263,41 @@ impl<T: Default> ColumnMetrics<T> {
         Default::default()
     }
 
+    /// Initialize the repetition level histogram
+    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
+        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
+        self
+    }
+
+    /// Initialize the definition level histogram
+    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
+        self.definition_level_histogram = LevelHistogram::try_new(max_level);
+        self
+    }
+
+    /// Sum `page_histogram` into `chunk_histogram`
+    fn update_histogram(
+        chunk_histogram: &mut Option<LevelHistogram>,
+        page_histogram: &Option<LevelHistogram>,
+    ) {
+        if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, 
chunk_histogram) {
+            chunk_hist.add(page_hist);
+        }
+    }
+
+    /// Sum the provided PageMetrics histograms into the chunk histograms. 
Does nothing if
+    /// page histograms are not initialized.
+    fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
+        ColumnMetrics::<T>::update_histogram(
+            &mut self.definition_level_histogram,
+            &page_metrics.definition_level_histogram,
+        );
+        ColumnMetrics::<T>::update_histogram(
+            &mut self.repetition_level_histogram,
+            &page_metrics.repetition_level_histogram,
+        );
+    }
+
     /// Sum the provided page variable_length_bytes into the chunk 
variable_length_bytes
     fn update_variable_length_bytes(&mut self, variable_length_bytes: 
Option<i64>) {
         if let Some(var_bytes) = variable_length_bytes {
@@ -275,6 +360,19 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> 
{
         // Used for level information
         encodings.insert(Encoding::RLE);
 
+        let mut page_metrics = PageMetrics::new();
+        let mut column_metrics = ColumnMetrics::<E::T>::new();
+
+        // Initialize level histograms if collecting page or chunk statistics
+        if statistics_enabled != EnabledStatistics::None {
+            page_metrics = page_metrics
+                .with_repetition_level_histogram(descr.max_rep_level())
+                .with_definition_level_histogram(descr.max_def_level());
+            column_metrics = column_metrics
+                .with_repetition_level_histogram(descr.max_rep_level())
+                .with_definition_level_histogram(descr.max_def_level())
+        }
+
         // Disable column_index_builder if not collecting page statistics.
         let mut column_index_builder = ColumnIndexBuilder::new();
         if statistics_enabled != EnabledStatistics::Page {
@@ -292,12 +390,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> 
{
             def_levels_sink: vec![],
             rep_levels_sink: vec![],
             data_pages: VecDeque::new(),
-            page_metrics: PageMetrics {
-                num_buffered_values: 0,
-                num_buffered_rows: 0,
-                num_page_nulls: 0,
-            },
-            column_metrics: ColumnMetrics::<E::T>::new(),
+            page_metrics,
+            column_metrics,
             column_index_builder,
             offset_index_builder: OffsetIndexBuilder::new(),
             encodings,
@@ -547,6 +641,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                 }
             }
 
+            // Update histogram
+            self.page_metrics.update_definition_level_histogram(levels);
+
             self.def_levels_sink.extend_from_slice(levels);
             values_to_write
         } else {
@@ -575,6 +672,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                 self.page_metrics.num_buffered_rows += (level == 0) as u32
             }
 
+            // Update histogram
+            self.page_metrics.update_repetition_level_histogram(levels);
+
             self.rep_levels_sink.extend_from_slice(levels);
         } else {
             // Each value is exactly one row.
@@ -718,7 +818,14 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> 
{
                 }
             }
         }
-        // update the offset index
+
+        // Append page histograms to the `ColumnIndex` histograms
+        self.column_index_builder.append_histograms(
+            &self.page_metrics.repetition_level_histogram,
+            &self.page_metrics.definition_level_histogram,
+        );
+
+        // Update the offset index
         self.offset_index_builder
             .append_row_count(self.page_metrics.num_buffered_rows as i64);
 
@@ -804,7 +911,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             values_data.variable_length_bytes,
         );
 
-        // Update variable_length_bytes in column_metrics
+        // Update histograms and variable_length_bytes in column_metrics
+        self.column_metrics
+            .update_from_page_metrics(&self.page_metrics);
         self.column_metrics
             .update_variable_length_bytes(values_data.variable_length_bytes);
 
@@ -911,7 +1020,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> 
{
         // Reset state.
         self.rep_levels_sink.clear();
         self.def_levels_sink.clear();
-        self.page_metrics = PageMetrics::default();
+        self.page_metrics.new_page();
 
         Ok(())
     }
@@ -1019,7 +1128,13 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, 
E> {
 
             builder = builder
                 .set_statistics(statistics)
-                
.set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes);
+                
.set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
+                .set_repetition_level_histogram(
+                    self.column_metrics.repetition_level_histogram.take(),
+                )
+                .set_definition_level_histogram(
+                    self.column_metrics.definition_level_histogram.take(),
+                );
         }
 
         let metadata = builder.build()?;
diff --git a/parquet/src/file/metadata/memory.rs 
b/parquet/src/file/metadata/memory.rs
index 0b6d1f0d1..bb822b4cc 100644
--- a/parquet/src/file/metadata/memory.rs
+++ b/parquet/src/file/metadata/memory.rs
@@ -99,6 +99,8 @@ impl HeapSize for ColumnChunkMetaData {
             + self.statistics.heap_size()
             + self.encoding_stats.heap_size()
             + self.unencoded_byte_array_data_bytes.heap_size()
+            + self.repetition_level_histogram.heap_size()
+            + self.definition_level_histogram.heap_size()
     }
 }
 
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 52206e66a..cd3555de8 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -557,6 +557,114 @@ pub struct ColumnChunkMetaData {
     column_index_offset: Option<i64>,
     column_index_length: Option<i32>,
     unencoded_byte_array_data_bytes: Option<i64>,
+    repetition_level_histogram: Option<LevelHistogram>,
+    definition_level_histogram: Option<LevelHistogram>,
+}
+
+/// Histograms for repetition and definition levels.
+///
+/// Each histogram is a vector of length `max_level + 1`. The value at index 
`i` is the number of
+/// values at level `i`.
+///
+/// For example, `vec[0]` is the number of values with level 0, `vec[1]` is the
+/// number of values with level 1, and so on.
+///
+#[derive(Debug, Clone, PartialEq)]
+pub struct LevelHistogram {
+    inner: Vec<i64>,
+}
+
+impl LevelHistogram {
+    /// Creates a new, zero-initialized level histogram.
+    ///
+    /// Length will be `max_level + 1`.
+    ///
+    /// Returns `None` when `max_level == 0` (because histograms are not 
necessary in this case)
+    pub fn try_new(max_level: i16) -> Option<Self> {
+        if max_level > 0 {
+            Some(Self {
+                inner: vec![0; max_level as usize + 1],
+            })
+        } else {
+            None
+        }
+    }
+    /// Returns a reference to the histogram's values.
+    pub fn values(&self) -> &[i64] {
+        &self.inner
+    }
+
+    /// Return the inner vector, consuming self
+    pub fn into_inner(self) -> Vec<i64> {
+        self.inner
+    }
+
+    /// Returns the histogram value at the given index.
+    ///
+    /// The value of `i` is the number of values with level `i`. For example,
+    /// `get(1)` returns the number of values with level 1.
+    ///
+    /// Returns `None` if the index is out of bounds.
+    pub fn get(&self, index: usize) -> Option<i64> {
+        self.inner.get(index).copied()
+    }
+
+    /// Adds the values from the other histogram to this histogram
+    ///
+    /// # Panics
+    /// If the histograms have different lengths
+    pub fn add(&mut self, other: &Self) {
+        assert_eq!(self.len(), other.len());
+        for (dst, src) in self.inner.iter_mut().zip(other.inner.iter()) {
+            *dst += src;
+        }
+    }
+
+    /// return the length of the histogram
+    pub fn len(&self) -> usize {
+        self.inner.len()
+    }
+
+    /// returns if the histogram is empty
+    pub fn is_empty(&self) -> bool {
+        self.inner.is_empty()
+    }
+
+    /// Sets the values of all histogram levels to 0.
+    pub fn reset(&mut self) {
+        for value in self.inner.iter_mut() {
+            *value = 0;
+        }
+    }
+
+    /// Updates histogram values using the provided levels
+    ///
+    /// # Panics
+    /// if any of the levels is greater than or equal to the length of the
+    /// histogram (i.e. greater than the `max_level` supplied to [`Self::try_new`])
+    pub fn update_from_levels(&mut self, levels: &[i16]) {
+        for &level in levels {
+            self.inner[level as usize] += 1;
+        }
+    }
+}
+
+impl From<Vec<i64>> for LevelHistogram {
+    fn from(inner: Vec<i64>) -> Self {
+        Self { inner }
+    }
+}
+
+impl From<LevelHistogram> for Vec<i64> {
+    fn from(value: LevelHistogram) -> Self {
+        value.into_inner()
+    }
+}
+
+impl HeapSize for LevelHistogram {
+    fn heap_size(&self) -> usize {
+        self.inner.heap_size()
+    }
 }
 
 /// Represents common operations for a column chunk.
@@ -717,6 +825,24 @@ impl ColumnChunkMetaData {
         self.unencoded_byte_array_data_bytes
     }
 
+    /// Returns the repetition level histogram.
+    ///
+    /// The returned value `vec[i]` is how many values are at repetition level 
`i`. For example,
+    /// `vec[0]` indicates how many rows the column chunk contains.
+    /// This field may not be set by older writers.
+    pub fn repetition_level_histogram(&self) -> Option<&LevelHistogram> {
+        self.repetition_level_histogram.as_ref()
+    }
+
+    /// Returns the definition level histogram.
+    ///
+    /// The returned value `vec[i]` is how many values are at definition level 
`i`. For example,
+    /// `vec[max_definition_level]` indicates how many non-null values are 
present in the column chunk.
+    /// This field may not be set by older writers.
+    pub fn definition_level_histogram(&self) -> Option<&LevelHistogram> {
+        self.definition_level_histogram.as_ref()
+    }
+
     /// Method to convert from Thrift.
     pub fn from_thrift(column_descr: ColumnDescPtr, cc: ColumnChunk) -> 
Result<Self> {
         if cc.meta_data.is_none() {
@@ -754,13 +880,23 @@ impl ColumnChunkMetaData {
         let offset_index_length = cc.offset_index_length;
         let column_index_offset = cc.column_index_offset;
         let column_index_length = cc.column_index_length;
-        let unencoded_byte_array_data_bytes = if let Some(size_stats) = 
col_metadata.size_statistics
-        {
-            size_stats.unencoded_byte_array_data_bytes
+        let (
+            unencoded_byte_array_data_bytes,
+            repetition_level_histogram,
+            definition_level_histogram,
+        ) = if let Some(size_stats) = col_metadata.size_statistics {
+            (
+                size_stats.unencoded_byte_array_data_bytes,
+                size_stats.repetition_level_histogram,
+                size_stats.definition_level_histogram,
+            )
         } else {
-            None
+            (None, None, None)
         };
 
+        let repetition_level_histogram = 
repetition_level_histogram.map(LevelHistogram::from);
+        let definition_level_histogram = 
definition_level_histogram.map(LevelHistogram::from);
+
         let result = ColumnChunkMetaData {
             column_descr,
             encodings,
@@ -782,6 +918,8 @@ impl ColumnChunkMetaData {
             column_index_offset,
             column_index_length,
             unencoded_byte_array_data_bytes,
+            repetition_level_histogram,
+            definition_level_histogram,
         };
         Ok(result)
     }
@@ -805,11 +943,24 @@ impl ColumnChunkMetaData {
 
     /// Method to convert to Thrift `ColumnMetaData`
     pub fn to_column_metadata_thrift(&self) -> ColumnMetaData {
-        let size_statistics = if 
self.unencoded_byte_array_data_bytes.is_some() {
+        let size_statistics = if self.unencoded_byte_array_data_bytes.is_some()
+            || self.repetition_level_histogram.is_some()
+            || self.definition_level_histogram.is_some()
+        {
+            let repetition_level_histogram = self
+                .repetition_level_histogram
+                .as_ref()
+                .map(|hist| hist.clone().into_inner());
+
+            let definition_level_histogram = self
+                .definition_level_histogram
+                .as_ref()
+                .map(|hist| hist.clone().into_inner());
+
             Some(SizeStatistics {
                 unencoded_byte_array_data_bytes: 
self.unencoded_byte_array_data_bytes,
-                repetition_level_histogram: None,
-                definition_level_histogram: None,
+                repetition_level_histogram,
+                definition_level_histogram,
             })
         } else {
             None
@@ -871,6 +1022,8 @@ impl ColumnChunkMetaDataBuilder {
             column_index_offset: None,
             column_index_length: None,
             unencoded_byte_array_data_bytes: None,
+            repetition_level_histogram: None,
+            definition_level_histogram: None,
         })
     }
 
@@ -988,6 +1141,18 @@ impl ColumnChunkMetaDataBuilder {
         self
     }
 
+    /// Sets optional repetition level histogram
+    pub fn set_repetition_level_histogram(mut self, value: 
Option<LevelHistogram>) -> Self {
+        self.0.repetition_level_histogram = value;
+        self
+    }
+
+    /// Sets optional definition level histogram
+    pub fn set_definition_level_histogram(mut self, value: 
Option<LevelHistogram>) -> Self {
+        self.0.definition_level_histogram = value;
+        self
+    }
+
     /// Builds column chunk metadata.
     pub fn build(self) -> Result<ColumnChunkMetaData> {
         Ok(self.0)
@@ -1003,6 +1168,10 @@ pub struct ColumnIndexBuilder {
     max_values: Vec<Vec<u8>>,
     null_counts: Vec<i64>,
     boundary_order: BoundaryOrder,
+    /// contains the concatenation of the histograms of all pages
+    repetition_level_histograms: Option<Vec<i64>>,
+    /// contains the concatenation of the histograms of all pages
+    definition_level_histograms: Option<Vec<i64>>,
     /// Is the information in the builder valid?
     ///
     /// Set to `false` if any entry in the page doesn't have statistics for
@@ -1027,6 +1196,8 @@ impl ColumnIndexBuilder {
             max_values: Vec::new(),
             null_counts: Vec::new(),
             boundary_order: BoundaryOrder::UNORDERED,
+            repetition_level_histograms: None,
+            definition_level_histograms: None,
             valid: true,
         }
     }
@@ -1045,6 +1216,28 @@ impl ColumnIndexBuilder {
         self.null_counts.push(null_count);
     }
 
+    /// Append the given page-level histograms to the [`ColumnIndex`] 
histograms.
+    /// Does nothing if the `ColumnIndexBuilder` is not in the `valid` state.
+    pub fn append_histograms(
+        &mut self,
+        repetition_level_histogram: &Option<LevelHistogram>,
+        definition_level_histogram: &Option<LevelHistogram>,
+    ) {
+        if !self.valid {
+            return;
+        }
+        if let Some(ref rep_lvl_hist) = repetition_level_histogram {
+            let hist = 
self.repetition_level_histograms.get_or_insert(Vec::new());
+            hist.reserve(rep_lvl_hist.len());
+            hist.extend(rep_lvl_hist.values());
+        }
+        if let Some(ref def_lvl_hist) = definition_level_histogram {
+            let hist = 
self.definition_level_histograms.get_or_insert(Vec::new());
+            hist.reserve(def_lvl_hist.len());
+            hist.extend(def_lvl_hist.values());
+        }
+    }
+
     pub fn set_boundary_order(&mut self, boundary_order: BoundaryOrder) {
         self.boundary_order = boundary_order;
     }
@@ -1069,8 +1262,8 @@ impl ColumnIndexBuilder {
             self.max_values,
             self.boundary_order,
             self.null_counts,
-            None,
-            None,
+            self.repetition_level_histograms,
+            self.definition_level_histograms,
         )
     }
 }
@@ -1286,6 +1479,8 @@ mod tests {
             .set_column_index_offset(Some(8000))
             .set_column_index_length(Some(25))
             .set_unencoded_byte_array_data_bytes(Some(2000))
+            
.set_repetition_level_histogram(Some(LevelHistogram::from(vec![100, 100])))
+            .set_definition_level_histogram(Some(LevelHistogram::from(vec![0, 
200])))
             .build()
             .unwrap();
 
@@ -1397,7 +1592,7 @@ mod tests {
         let row_group_meta_with_stats = vec![row_group_meta_with_stats];
 
         let parquet_meta = ParquetMetaData::new(file_metadata.clone(), 
row_group_meta_with_stats);
-        let base_expected_size = 2088;
+        let base_expected_size = 2280;
 
         assert_eq!(parquet_meta.memory_size(), base_expected_size);
 
@@ -1425,7 +1620,7 @@ mod tests {
             ]]),
         );
 
-        let bigger_expected_size = 2400;
+        let bigger_expected_size = 2784;
         // more set fields means more memory usage
         assert!(bigger_expected_size > base_expected_size);
         assert_eq!(parquet_meta.memory_size(), bigger_expected_size);
diff --git a/parquet/src/file/page_index/index.rs 
b/parquet/src/file/page_index/index.rs
index 7eba94904..68412572b 100644
--- a/parquet/src/file/page_index/index.rs
+++ b/parquet/src/file/page_index/index.rs
@@ -36,6 +36,17 @@ pub struct PageIndex<T> {
     pub max: Option<T>,
     /// Null values in the page
     pub null_count: Option<i64>,
+    /// Repetition level histogram for the page
+    ///
+    /// `repetition_level_histogram[i]` is a count of how many values are at 
repetition level `i`.
+    /// For example, `repetition_level_histogram[0]` indicates how many rows 
the page contains.
+    pub repetition_level_histogram: Option<Vec<i64>>,
+    /// Definition level histogram for the page
+    ///
+    /// `definition_level_histogram[i]` is a count of how many values are at 
definition level `i`.
+    /// For example, `definition_level_histogram[max_definition_level]` 
indicates how many
+    /// non-null values are present in the page.
+    pub definition_level_histogram: Option<Vec<i64>>,
 }
 
 impl<T> PageIndex<T> {
@@ -48,6 +59,12 @@ impl<T> PageIndex<T> {
     pub fn null_count(&self) -> Option<i64> {
         self.null_count
     }
+    pub fn repetition_level_histogram(&self) -> Option<&Vec<i64>> {
+        self.repetition_level_histogram.as_ref()
+    }
+    pub fn definition_level_histogram(&self) -> Option<&Vec<i64>> {
+        self.definition_level_histogram.as_ref()
+    }
 }
 
 impl<T> PageIndex<T>
@@ -149,26 +166,57 @@ impl<T: ParquetValueType> NativeIndex<T> {
             .map(|x| x.into_iter().map(Some).collect::<Vec<_>>())
             .unwrap_or_else(|| vec![None; len]);
 
+        // histograms are a 1D array encoding a 2D num_pages X num_levels 
matrix.
+        let to_page_histograms = |opt_hist: Option<Vec<i64>>| {
+            if let Some(hist) = opt_hist {
+                // TODO: should we assert (hist.len() % len) == 0?
+                let num_levels = hist.len() / len;
+                let mut res = Vec::with_capacity(len);
+                for i in 0..len {
+                    let page_idx = i * num_levels;
+                    let page_hist = hist[page_idx..page_idx + 
num_levels].to_vec();
+                    res.push(Some(page_hist));
+                }
+                res
+            } else {
+                vec![None; len]
+            }
+        };
+
+        let rep_hists: Vec<Option<Vec<i64>>> =
+            to_page_histograms(index.repetition_level_histograms);
+        let def_hists: Vec<Option<Vec<i64>>> =
+            to_page_histograms(index.definition_level_histograms);
+
         let indexes = index
             .min_values
             .iter()
             .zip(index.max_values.into_iter())
             .zip(index.null_pages.into_iter())
             .zip(null_counts.into_iter())
-            .map(|(((min, max), is_null), null_count)| {
-                let (min, max) = if is_null {
-                    (None, None)
-                } else {
-                    let min = min.as_slice();
-                    let max = max.as_slice();
-                    (Some(from_le_slice::<T>(min)), 
Some(from_le_slice::<T>(max)))
-                };
-                Ok(PageIndex {
-                    min,
-                    max,
-                    null_count,
-                })
-            })
+            .zip(rep_hists.into_iter())
+            .zip(def_hists.into_iter())
+            .map(
+                |(
+                    ((((min, max), is_null), null_count), 
repetition_level_histogram),
+                    definition_level_histogram,
+                )| {
+                    let (min, max) = if is_null {
+                        (None, None)
+                    } else {
+                        let min = min.as_slice();
+                        let max = max.as_slice();
+                        (Some(from_le_slice::<T>(min)), 
Some(from_le_slice::<T>(max)))
+                    };
+                    Ok(PageIndex {
+                        min,
+                        max,
+                        null_count,
+                        repetition_level_histogram,
+                        definition_level_histogram,
+                    })
+                },
+            )
             .collect::<Result<Vec<_>, ParquetError>>()?;
 
         Ok(Self {
@@ -188,6 +236,8 @@ mod tests {
             min: Some(-123),
             max: Some(234),
             null_count: Some(0),
+            repetition_level_histogram: Some(vec![1, 2]),
+            definition_level_histogram: Some(vec![1, 2, 3]),
         };
 
         assert_eq!(page_index.min().unwrap(), &-123);
@@ -195,6 +245,11 @@ mod tests {
         assert_eq!(page_index.min_bytes().unwrap(), (-123).as_bytes());
         assert_eq!(page_index.max_bytes().unwrap(), 234.as_bytes());
         assert_eq!(page_index.null_count().unwrap(), 0);
+        assert_eq!(page_index.repetition_level_histogram(), Some(&vec![1, 2]));
+        assert_eq!(
+            page_index.definition_level_histogram(),
+            Some(&vec![1, 2, 3])
+        );
     }
 
     #[test]
@@ -203,6 +258,8 @@ mod tests {
             min: None,
             max: None,
             null_count: None,
+            repetition_level_histogram: None,
+            definition_level_histogram: None,
         };
 
         assert_eq!(page_index.min(), None);
@@ -210,5 +267,7 @@ mod tests {
         assert_eq!(page_index.min_bytes(), None);
         assert_eq!(page_index.max_bytes(), None);
         assert_eq!(page_index.null_count(), None);
+        assert_eq!(page_index.repetition_level_histogram(), None);
+        assert_eq!(page_index.definition_level_histogram(), None);
     }
 }
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index c44a7e669..f2e8f74a3 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -663,6 +663,12 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
             .set_dictionary_page_offset(src_dictionary_offset.map(map_offset))
             .set_unencoded_byte_array_data_bytes(metadata.unencoded_byte_array_data_bytes());
 
+        if let Some(rep_hist) = metadata.repetition_level_histogram() {
+            builder = builder.set_repetition_level_histogram(Some(rep_hist.clone()))
+        }
+        if let Some(def_hist) = metadata.definition_level_histogram() {
+            builder = builder.set_definition_level_histogram(Some(def_hist.clone()))
+        }
         if let Some(statistics) = metadata.statistics() {
             builder = builder.set_statistics(statistics.clone())
         }
@@ -1889,6 +1895,12 @@ mod tests {
         assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
         assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
 
+        let check_def_hist = |def_hist: &[i64]| {
+            assert_eq!(def_hist.len(), 2);
+            assert_eq!(def_hist[0], 3);
+            assert_eq!(def_hist[1], 7);
+        };
+
         assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
         let meta_data = file_metadata.row_groups[0].columns[0]
             .meta_data
@@ -1898,12 +1910,13 @@ mod tests {
         let size_stats = meta_data.size_statistics.as_ref().unwrap();
 
         assert!(size_stats.repetition_level_histogram.is_none());
-        assert!(size_stats.definition_level_histogram.is_none());
+        assert!(size_stats.definition_level_histogram.is_some());
         assert!(size_stats.unencoded_byte_array_data_bytes.is_some());
         assert_eq!(
             unenc_size,
             size_stats.unencoded_byte_array_data_bytes.unwrap()
         );
+        check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap());
 
         // check that the read metadata is also correct
         let options = ReadOptionsBuilder::new().with_page_index().build();
@@ -1915,12 +1928,31 @@ mod tests {
         let rowgroup = reader.get_row_group(0).unwrap();
         assert_eq!(rowgroup.num_columns(), 1);
         let column = rowgroup.metadata().column(0);
+        assert!(column.definition_level_histogram().is_some());
+        assert!(column.repetition_level_histogram().is_none());
         assert!(column.unencoded_byte_array_data_bytes().is_some());
+        check_def_hist(column.definition_level_histogram().unwrap().values());
         assert_eq!(
             unenc_size,
             column.unencoded_byte_array_data_bytes().unwrap()
         );
 
+        // check histogram in column index as well
+        assert!(reader.metadata().column_index().is_some());
+        let column_index = reader.metadata().column_index().unwrap();
+        assert_eq!(column_index.len(), 1);
+        assert_eq!(column_index[0].len(), 1);
+        let col_idx = if let Index::BYTE_ARRAY(index) = &column_index[0][0] {
+            assert_eq!(index.indexes.len(), 1);
+            &index.indexes[0]
+        } else {
+            unreachable!()
+        };
+
+        assert!(col_idx.repetition_level_histogram().is_none());
+        assert!(col_idx.definition_level_histogram().is_some());
+        check_def_hist(col_idx.definition_level_histogram().unwrap());
+
         assert!(reader.metadata().offset_index().is_some());
         let offset_index = reader.metadata().offset_index().unwrap();
         assert_eq!(offset_index.len(), 1);
@@ -1933,4 +1965,114 @@ mod tests {
         assert_eq!(page_sizes.len(), 1);
         assert_eq!(page_sizes[0], unenc_size);
     }
+
+    #[test]
+    fn test_size_statistics_with_repetition_and_nulls() {
+        let message_type = "
+            message test_schema {
+                OPTIONAL group i32_list (LIST) {
+                    REPEATED group list {
+                        OPTIONAL INT32 element;
+                    }
+                }
+            }
+        ";
+        // column is:
+        // row 0: [1, 2]
+        // row 1: NULL
+        // row 2: [4, NULL]
+        // row 3: []
+        // row 4: [7, 8, 9, 10]
+        let schema = Arc::new(parse_message_type(message_type).unwrap());
+        let data = [1, 2, 4, 7, 8, 9, 10];
+        let def_levels = [3, 3, 0, 3, 2, 1, 3, 3, 3, 3];
+        let rep_levels = [0, 1, 0, 0, 1, 0, 0, 1, 1, 1];
+        let file = tempfile::tempfile().unwrap();
+        let props = Arc::new(
+            WriterProperties::builder()
+                .set_statistics_enabled(EnabledStatistics::Page)
+                .build(),
+        );
+        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
+        let mut row_group_writer = writer.next_row_group().unwrap();
+
+        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
+        col_writer
+            .typed::<Int32Type>()
+            .write_batch(&data, Some(&def_levels), Some(&rep_levels))
+            .unwrap();
+        col_writer.close().unwrap();
+        row_group_writer.close().unwrap();
+        let file_metadata = writer.close().unwrap();
+
+        assert_eq!(file_metadata.row_groups.len(), 1);
+        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
+        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
+
+        let check_def_hist = |def_hist: &[i64]| {
+            assert_eq!(def_hist.len(), 4);
+            assert_eq!(def_hist[0], 1);
+            assert_eq!(def_hist[1], 1);
+            assert_eq!(def_hist[2], 1);
+            assert_eq!(def_hist[3], 7);
+        };
+
+        let check_rep_hist = |rep_hist: &[i64]| {
+            assert_eq!(rep_hist.len(), 2);
+            assert_eq!(rep_hist[0], 5);
+            assert_eq!(rep_hist[1], 5);
+        };
+
+        // check that histograms are set properly in the write and read metadata
+        // also check that unencoded_byte_array_data_bytes is not set
+        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
+        let meta_data = file_metadata.row_groups[0].columns[0]
+            .meta_data
+            .as_ref()
+            .unwrap();
+        assert!(meta_data.size_statistics.is_some());
+        let size_stats = meta_data.size_statistics.as_ref().unwrap();
+        assert!(size_stats.repetition_level_histogram.is_some());
+        assert!(size_stats.definition_level_histogram.is_some());
+        assert!(size_stats.unencoded_byte_array_data_bytes.is_none());
+        check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap());
+        check_rep_hist(size_stats.repetition_level_histogram.as_ref().unwrap());
+
+        // check that the read metadata is also correct
+        let options = ReadOptionsBuilder::new().with_page_index().build();
+        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
+
+        let rfile_metadata = reader.metadata().file_metadata();
+        assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
+        assert_eq!(reader.num_row_groups(), 1);
+        let rowgroup = reader.get_row_group(0).unwrap();
+        assert_eq!(rowgroup.num_columns(), 1);
+        let column = rowgroup.metadata().column(0);
+        assert!(column.definition_level_histogram().is_some());
+        assert!(column.repetition_level_histogram().is_some());
+        assert!(column.unencoded_byte_array_data_bytes().is_none());
+        check_def_hist(column.definition_level_histogram().unwrap().values());
+        check_rep_hist(column.repetition_level_histogram().unwrap().values());
+
+        // check histogram in column index as well
+        assert!(reader.metadata().column_index().is_some());
+        let column_index = reader.metadata().column_index().unwrap();
+        assert_eq!(column_index.len(), 1);
+        assert_eq!(column_index[0].len(), 1);
+        let col_idx = if let Index::INT32(index) = &column_index[0][0] {
+            assert_eq!(index.indexes.len(), 1);
+            &index.indexes[0]
+        } else {
+            unreachable!()
+        };
+
+        check_def_hist(col_idx.definition_level_histogram().unwrap());
+        check_rep_hist(col_idx.repetition_level_histogram().unwrap());
+
+        assert!(reader.metadata().offset_index().is_some());
+        let offset_index = reader.metadata().offset_index().unwrap();
+        assert_eq!(offset_index.len(), 1);
+        assert_eq!(offset_index[0].len(), 1);
+        assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_none());
+    }
 }


Reply via email to