This is an automated email from the ASF dual-hosted git repository.
etseidl pushed a commit to branch gh5854_thrift_remodel
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/gh5854_thrift_remodel by this push:
new 3c353e28bf [thrift-remodel] Decoding of page indexes (#8160)
3c353e28bf is described below
commit 3c353e28bf0c4cd62a6aa814f034ca817f1a8cdb
Author: Ed Seidl <[email protected]>
AuthorDate: Wed Aug 20 12:19:28 2025 -0700
[thrift-remodel] Decoding of page indexes (#8160)
# Which issue does this PR close?
**Note: this targets a feature branch, not main**
- Part of #5854.
# Rationale for this change
Speed. Decoding the page indexes directly with the new thrift decoder avoids round-tripping through the generated `crate::format` structs.
# What changes are included in this PR?
Still a work in progress, but this PR begins converting page index parsing to the new thrift decoder.
# Are these changes tested?
This PR actually uses the new decoder when parsing the page indexes via the existing machinery. As such, all existing tests involving the page indexes should apply to this code.
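For context, the "existing machinery" above is the public page-index read path. A minimal sketch of exercising it, assuming the current `ArrowReaderOptions` / `ParquetRecordBatchReaderBuilder` API and a hypothetical `data.parquet` file written with page indexes:

```rust
use std::fs::File;

use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical input file; any parquet file written with page indexes will do.
    let file = File::open("data.parquet")?;

    // Requesting the page index forces the column and offset indexes to be
    // read and decoded when the reader is built.
    let options = ArrowReaderOptions::new().with_page_index(true);
    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?;

    // The decoded indexes are exposed on the file metadata.
    let metadata = builder.metadata();
    if let Some(column_index) = metadata.column_index() {
        println!("column indexes for {} row group(s)", column_index.len());
    }
    if let Some(offset_index) = metadata.offset_index() {
        println!("offset indexes for {} row group(s)", offset_index.len());
    }
    Ok(())
}
```

Any test that requests the page indexes through this path ends up in the `decode_column_index` / `decode_offset_index` functions changed below.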
# Are there any user-facing changes?
Yes
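The fiddliest part of the conversion is reshaping the level histograms, which the thrift `ColumnIndex` stores as a single flat `num_pages * num_levels` list. Below is a standalone sketch of the same reshaping that `try_new_local` performs; plain `Vec<i64>` stands in for `LevelHistogram` and the data is made up:

```rust
/// Split a flat `num_pages * num_levels` histogram (as stored in the thrift
/// `ColumnIndex`) into one histogram per page. Mirrors the `to_page_histograms`
/// closure in `NativeIndex::try_new_local`; names and data are illustrative only.
fn to_page_histograms(hist: Option<Vec<i64>>, num_pages: usize) -> Vec<Option<Vec<i64>>> {
    match hist {
        Some(hist) => {
            // Each page owns a contiguous run of `num_levels` counts.
            let num_levels = hist.len() / num_pages;
            (0..num_pages)
                .map(|page| {
                    let start = page * num_levels;
                    Some(hist[start..start + num_levels].to_vec())
                })
                .collect()
        }
        // An absent histogram becomes one `None` per page.
        None => vec![None; num_pages],
    }
}

fn main() {
    // Two pages with three levels each, flattened row-major.
    let flat = Some(vec![0, 1, 9, 2, 0, 8]);
    let expected: Vec<Option<Vec<i64>>> = vec![Some(vec![0, 1, 9]), Some(vec![2, 0, 8])];
    assert_eq!(to_page_histograms(flat, 2), expected);
    assert!(to_page_histograms(None, 2).iter().all(|h| h.is_none()));
}
```

The per-page slices are then paired with the per-page min/max values and null counts to build each `PageIndex`.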
---
parquet/src/basic.rs | 53 +++++++++++++++++++++
parquet/src/file/page_index/index.rs | 71 +++++++++++++++++++++++++++++
parquet/src/file/page_index/index_reader.rs | 47 ++++++++++++-------
parquet/src/file/page_index/offset_index.rs | 44 ++++++++++--------
parquet/src/parquet_macros.rs | 5 +-
5 files changed, 182 insertions(+), 38 deletions(-)
diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs
index 78d294acd5..c325cf5dbf 100644
--- a/parquet/src/basic.rs
+++ b/parquet/src/basic.rs
@@ -501,15 +501,68 @@ thrift_enum!(
///
/// [WriterVersion]: crate::file::properties::WriterVersion
enum Encoding {
+ /// Default encoding.
+ /// - BOOLEAN - 1 bit per value. 0 is false; 1 is true.
+ /// - INT32 - 4 bytes per value. Stored as little-endian.
+ /// - INT64 - 8 bytes per value. Stored as little-endian.
+ /// - FLOAT - 4 bytes per value. IEEE. Stored as little-endian.
+ /// - DOUBLE - 8 bytes per value. IEEE. Stored as little-endian.
+ /// - BYTE_ARRAY - 4 byte length stored as little endian, followed by bytes.
+ /// - FIXED_LEN_BYTE_ARRAY - Just the bytes.
PLAIN = 0;
// GROUP_VAR_INT = 1;
+ /// **Deprecated** dictionary encoding.
+ ///
+ /// The values in the dictionary are encoded using PLAIN encoding.
+ /// Since it is deprecated, RLE_DICTIONARY encoding is used for a data page, and
+ /// PLAIN encoding is used for dictionary page.
PLAIN_DICTIONARY = 2;
+ /// Group packed run length encoding.
+ ///
+ /// Usable for definition/repetition levels encoding and boolean values.
RLE = 3;
+ /// **Deprecated** Bit-packed encoding.
+ ///
+ /// This can only be used if the data has a known max width.
+ /// Usable for definition/repetition levels encoding.
+ ///
+ /// There are compatibility issues with files using this encoding.
+ /// The parquet standard specifies the bits to be packed starting from the
+ /// most-significant bit, several implementations do not follow this bit order.
+ /// Several other implementations also have issues reading this encoding
+ /// because of incorrect assumptions about the length of the encoded data.
+ ///
+ /// The RLE/bit-packing hybrid is more cpu and memory efficient and should be used instead.
+ #[deprecated(
+ since = "51.0.0",
+ note = "Please see documentation for compatibility issues and use the RLE/bit-packing hybrid encoding instead"
+ )]
BIT_PACKED = 4;
+ /// Delta encoding for integers, either INT32 or INT64.
+ ///
+ /// Works best on sorted data.
DELTA_BINARY_PACKED = 5;
+ /// Encoding for byte arrays to separate the length values and the data.
+ ///
+ /// The lengths are encoded using DELTA_BINARY_PACKED encoding.
DELTA_LENGTH_BYTE_ARRAY = 6;
+ /// Incremental encoding for byte arrays.
+ ///
+ /// Prefix lengths are encoded using DELTA_BINARY_PACKED encoding.
+ /// Suffixes are stored using DELTA_LENGTH_BYTE_ARRAY encoding.
DELTA_BYTE_ARRAY = 7;
+ /// Dictionary encoding.
+ ///
+ /// The ids are encoded using the RLE encoding.
RLE_DICTIONARY = 8;
+ /// Encoding for fixed-width data.
+ ///
+ /// K byte-streams are created where K is the size in bytes of the data type.
+ /// The individual bytes of a value are scattered to the corresponding stream and
+ /// the streams are concatenated.
+ /// This itself does not reduce the size of the data but can lead to better compression
+ /// afterwards. Note that the use of this encoding with FIXED_LEN_BYTE_ARRAY(N) data may
+ /// perform poorly for large values of N.
BYTE_STREAM_SPLIT = 9;
}
);
diff --git a/parquet/src/file/page_index/index.rs b/parquet/src/file/page_index/index.rs
index 2c9aa00908..ed586bcd33 100644
--- a/parquet/src/file/page_index/index.rs
+++ b/parquet/src/file/page_index/index.rs
@@ -24,6 +24,7 @@ use crate::data_type::private::ParquetValueType;
use crate::data_type::{AsBytes, ByteArray, FixedLenByteArray, Int96};
use crate::errors::ParquetError;
use crate::file::metadata::LevelHistogram;
+use crate::file::page_index::index_reader::ColumnIndex;
use std::fmt::Debug;
/// Typed statistics for one data page
@@ -193,6 +194,7 @@ impl<T: ParquetValueType> NativeIndex<T> {
pub const PHYSICAL_TYPE: Type = T::PHYSICAL_TYPE;
/// Creates a new [`NativeIndex`]
+ #[allow(dead_code)]
pub(crate) fn try_new(index: crate::format::ColumnIndex) -> Result<Self, ParquetError> {
let len = index.min_values.len();
@@ -306,6 +308,75 @@ impl<T: ParquetValueType> NativeIndex<T> {
definition_level_histograms,
)
}
+
+ /// Creates a new [`NativeIndex`]
+ pub(crate) fn try_new_local(index: ColumnIndex) -> Result<Self, ParquetError> {
+ let len = index.min_values.len();
+
+ // turn Option<Vec<i64>> into Vec<Option<i64>>
+ let null_counts = index
+ .null_counts
+ .map(|x| x.into_iter().map(Some).collect::<Vec<_>>())
+ .unwrap_or_else(|| vec![None; len]);
+
+ // histograms are a 1D array encoding a 2D num_pages X num_levels matrix.
+ let to_page_histograms = |opt_hist: Option<Vec<i64>>| {
+ if let Some(hist) = opt_hist {
+ // TODO: should we assert (hist.len() % len) == 0?
+ let num_levels = hist.len() / len;
+ let mut res = Vec::with_capacity(len);
+ for i in 0..len {
+ let page_idx = i * num_levels;
+ let page_hist = hist[page_idx..page_idx + num_levels].to_vec();
+ res.push(Some(LevelHistogram::from(page_hist)));
+ }
+ res
+ } else {
+ vec![None; len]
+ }
+ };
+
+ // turn Option<Vec<i64>> into Vec<Option<i64>>
+ let rep_hists: Vec<Option<LevelHistogram>> =
+ to_page_histograms(index.repetition_level_histograms);
+ let def_hists: Vec<Option<LevelHistogram>> =
+ to_page_histograms(index.definition_level_histograms);
+
+ // start assembling Vec<PageIndex>
+ let mut indexes: Vec<PageIndex<T>> = Vec::with_capacity(len);
+ let mut rep_iter = rep_hists.into_iter();
+ let mut def_iter = def_hists.into_iter();
+
+ // this used to zip together the other iters, but that was quite a bit
+ // slower than this approach.
+ for (i, null_count) in null_counts.into_iter().enumerate().take(len) {
+ let is_null = index.null_pages[i];
+ let min = if is_null {
+ None
+ } else {
+ Some(T::try_from_le_slice(index.min_values[i])?)
+ };
+ let max = if is_null {
+ None
+ } else {
+ Some(T::try_from_le_slice(index.max_values[i])?)
+ };
+
+ indexes.push(PageIndex {
+ min,
+ max,
+ null_count,
+ repetition_level_histogram: rep_iter.next().unwrap_or(None),
+ definition_level_histogram: def_iter.next().unwrap_or(None),
+ })
+ }
+
+ let boundary_order = index.boundary_order;
+ Ok(Self {
+ indexes,
+ boundary_order,
+ })
+ }
}
#[cfg(test)]
diff --git a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs
index d4d405d68f..fbe6d39845 100644
--- a/parquet/src/file/page_index/index_reader.rs
+++ b/parquet/src/file/page_index/index_reader.rs
@@ -17,14 +17,15 @@
//! Support for reading [`Index`] and [`OffsetIndexMetaData`] from parquet metadata.
-use crate::basic::Type;
+use crate::basic::{BoundaryOrder, Type};
use crate::data_type::Int96;
-use crate::errors::ParquetError;
+use crate::errors::{ParquetError, Result};
use crate::file::metadata::ColumnChunkMetaData;
use crate::file::page_index::index::{Index, NativeIndex};
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::reader::ChunkReader;
-use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
+use crate::parquet_thrift::{FieldType, ThriftCompactInputProtocol};
+use crate::thrift_struct;
use std::ops::Range;
/// Computes the covering range of two optional ranges
@@ -129,25 +130,37 @@ pub fn read_offset_indexes<R: ChunkReader>(
}
pub(crate) fn decode_offset_index(data: &[u8]) -> Result<OffsetIndexMetaData, ParquetError> {
- let mut prot = TCompactSliceInputProtocol::new(data);
- let offset = crate::format::OffsetIndex::read_from_in_protocol(&mut prot)?;
- OffsetIndexMetaData::try_new(offset)
+ let mut prot = ThriftCompactInputProtocol::new(data);
+ OffsetIndexMetaData::try_from(&mut prot)
}
-pub(crate) fn decode_column_index(data: &[u8], column_type: Type) -> Result<Index, ParquetError> {
- let mut prot = TCompactSliceInputProtocol::new(data);
+thrift_struct!(
+pub(crate) struct ColumnIndex<'a> {
+ 1: required list<bool> null_pages
+ 2: required list<'a><binary> min_values
+ 3: required list<'a><binary> max_values
+ 4: required BoundaryOrder boundary_order
+ 5: optional list<i64> null_counts
+ 6: optional list<i64> repetition_level_histograms;
+ 7: optional list<i64> definition_level_histograms;
+}
+);
- let index = crate::format::ColumnIndex::read_from_in_protocol(&mut prot)?;
+pub(crate) fn decode_column_index(data: &[u8], column_type: Type) -> Result<Index, ParquetError> {
+ let mut prot = ThriftCompactInputProtocol::new(data);
+ let index = ColumnIndex::try_from(&mut prot)?;
let index = match column_type {
- Type::BOOLEAN => Index::BOOLEAN(NativeIndex::<bool>::try_new(index)?),
- Type::INT32 => Index::INT32(NativeIndex::<i32>::try_new(index)?),
- Type::INT64 => Index::INT64(NativeIndex::<i64>::try_new(index)?),
- Type::INT96 => Index::INT96(NativeIndex::<Int96>::try_new(index)?),
- Type::FLOAT => Index::FLOAT(NativeIndex::<f32>::try_new(index)?),
- Type::DOUBLE => Index::DOUBLE(NativeIndex::<f64>::try_new(index)?),
- Type::BYTE_ARRAY => Index::BYTE_ARRAY(NativeIndex::try_new(index)?),
- Type::FIXED_LEN_BYTE_ARRAY => Index::FIXED_LEN_BYTE_ARRAY(NativeIndex::try_new(index)?),
+ Type::BOOLEAN => Index::BOOLEAN(NativeIndex::<bool>::try_new_local(index)?),
+ Type::INT32 => Index::INT32(NativeIndex::<i32>::try_new_local(index)?),
+ Type::INT64 => Index::INT64(NativeIndex::<i64>::try_new_local(index)?),
+ Type::INT96 => Index::INT96(NativeIndex::<Int96>::try_new_local(index)?),
+ Type::FLOAT => Index::FLOAT(NativeIndex::<f32>::try_new_local(index)?),
+ Type::DOUBLE => Index::DOUBLE(NativeIndex::<f64>::try_new_local(index)?),
+ Type::BYTE_ARRAY => Index::BYTE_ARRAY(NativeIndex::try_new_local(index)?),
+ Type::FIXED_LEN_BYTE_ARRAY => {
+ Index::FIXED_LEN_BYTE_ARRAY(NativeIndex::try_new_local(index)?)
+ }
};
Ok(index)
diff --git a/parquet/src/file/page_index/offset_index.rs b/parquet/src/file/page_index/offset_index.rs
index 5614b1750a..d4c196a3ae 100644
--- a/parquet/src/file/page_index/offset_index.rs
+++ b/parquet/src/file/page_index/offset_index.rs
@@ -19,21 +19,25 @@
//!
//! [`OffsetIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
-use crate::errors::ParquetError;
+use crate::parquet_thrift::{FieldType, ThriftCompactInputProtocol};
+use crate::{
+ errors::{ParquetError, Result},
+ thrift_struct,
+};
+thrift_struct!(
/// Page location information for [`OffsetIndexMetaData`]
-#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PageLocation {
- /// Offset of the page in the file *
- pub offset: i64,
- /// Size of the page, including header. Sum of compressed_page_size and header
- /// length
- pub compressed_page_size: i32,
- /// Index within the RowGroup of the first row of the page. When an
- /// OffsetIndex is present, pages must begin on row boundaries
- /// (repetition_level = 0).
- pub first_row_index: i64,
+ /// Offset of the page in the file
+ 1: required i64 offset
+ /// Size of the page, including header. Sum of compressed_page_size and header
+ 2: required i32 compressed_page_size
+ /// Index within the RowGroup of the first row of the page. When an
+ /// OffsetIndex is present, pages must begin on row boundaries
+ /// (repetition_level = 0).
+ 3: required i64 first_row_index
}
+);
impl From<&crate::format::PageLocation> for PageLocation {
fn from(value: &crate::format::PageLocation) -> Self {
@@ -55,24 +59,26 @@ impl From<&PageLocation> for crate::format::PageLocation {
}
}
+thrift_struct!(
/// [`OffsetIndex`] information for a column chunk. Contains offsets and sizes for each page
/// in the chunk. Optionally stores fully decoded page sizes for BYTE_ARRAY columns.
///
/// [`OffsetIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
-#[derive(Debug, Clone, PartialEq)]
pub struct OffsetIndexMetaData {
- /// Vector of [`PageLocation`] objects, one per page in the chunk.
- pub page_locations: Vec<PageLocation>,
- /// Optional vector of unencoded page sizes, one per page in the chunk.
- /// Only defined for BYTE_ARRAY columns.
- pub unencoded_byte_array_data_bytes: Option<Vec<i64>>,
+ /// Vector of [`PageLocation`] objects, one per page in the chunk.
+ 1: required list<PageLocation> page_locations
+ /// Optional vector of unencoded page sizes, one per page in the chunk.
+ /// Only defined for BYTE_ARRAY columns.
+ 2: optional list<i64> unencoded_byte_array_data_bytes
}
+);
impl OffsetIndexMetaData {
/// Creates a new [`OffsetIndexMetaData`] from an [`OffsetIndex`].
///
/// [`OffsetIndex`]: crate::format::OffsetIndex
- pub(crate) fn try_new(index: crate::format::OffsetIndex) -> Result<Self, ParquetError> {
+ #[allow(dead_code)]
+ pub(crate) fn try_new(index: crate::format::OffsetIndex) -> Result<Self> {
let page_locations = index.page_locations.iter().map(|loc| loc.into()).collect();
Ok(Self {
page_locations,
@@ -91,8 +97,6 @@ impl OffsetIndexMetaData {
self.unencoded_byte_array_data_bytes.as_ref()
}
- // TODO: remove annotation after merge
- #[allow(dead_code)]
pub(crate) fn to_thrift(&self) -> crate::format::OffsetIndex {
let page_locations = self.page_locations.iter().map(|loc| loc.into()).collect();
crate::format::OffsetIndex::new(
diff --git a/parquet/src/parquet_macros.rs b/parquet/src/parquet_macros.rs
index ebd86d8615..2d1ccd819b 100644
--- a/parquet/src/parquet_macros.rs
+++ b/parquet/src/parquet_macros.rs
@@ -30,11 +30,12 @@ macro_rules! thrift_enum {
#[allow(non_camel_case_types)]
#[allow(missing_docs)]
pub enum $identifier {
- $($field_name = $field_value,)*
+ $($(#[cfg_attr(not(doctest), $($field_attrs)*)])* $field_name = $field_value,)*
}
impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for $identifier {
type Error = ParquetError;
+ #[allow(deprecated)]
fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> {
let val = prot.read_i32()?;
match val {
@@ -54,6 +55,7 @@ macro_rules! thrift_enum {
impl TryFrom<crate::format::$identifier> for $identifier {
type Error = ParquetError;
+ #[allow(deprecated)]
fn try_from(value: crate::format::$identifier) -> Result<Self> {
Ok(match value {
$(crate::format::$identifier::$field_name => Self::$field_name,)*
@@ -63,6 +65,7 @@ macro_rules! thrift_enum {
}
impl From<$identifier> for crate::format::$identifier {
+ #[allow(deprecated)]
fn from(value: $identifier) -> Self {
match value {
$($identifier::$field_name => Self::$field_name,)*