This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch 53.0.0-dev
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/53.0.0-dev by this push:
new e40b311518 Add `unencoded_byte_array_data_bytes` to `ParquetMetaData` (#6068)
e40b311518 is described below
commit e40b3115183c4e31132377839cc03a76244e10a7
Author: Ed Seidl <[email protected]>
AuthorDate: Fri Jul 19 13:29:52 2024 -0700
Add `unencoded_byte_array_data_bytes` to `ParquetMetaData` (#6068)
* update to latest thrift (as of 11 Jul 2024) from parquet-format
* pass None for optional size statistics
* escape HTML tags
* don't need to escape brackets in arrays
* add support for unencoded_byte_array_data_bytes
* add comments
* change sig of ColumnMetrics::update_variable_length_bytes()
* rename ParquetOffsetIndex to OffsetSizeIndex
* rename some functions
* suggestion from review
Co-authored-by: Andrew Lamb <[email protected]>
* add Default trait to ColumnMetrics as suggested in review
* rename OffsetSizeIndex to OffsetIndexMetaData
---------
Co-authored-by: Andrew Lamb <[email protected]>
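
For a sense of how the new field surfaces to users, here is a minimal reader-side
sketch built only from APIs touched by this commit ("data.parquet" is a placeholder
path, and the main() wrapper is for illustration):

    use std::fs::File;

    use parquet::file::reader::FileReader;
    use parquet::file::serialized_reader::{ReadOptionsBuilder, SerializedFileReader};

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Placeholder path; unencoded_byte_array_data_bytes is only written
        // for BYTE_ARRAY columns.
        let file = File::open("data.parquet")?;

        // The per-page values live in the offset index, so the page index
        // must be requested explicitly.
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(file, options)?;

        // Per-chunk totals from ColumnChunkMetaData
        for rg in reader.metadata().row_groups() {
            for col in rg.columns() {
                if let Some(bytes) = col.unencoded_byte_array_data_bytes() {
                    println!("{}: {bytes} unencoded bytes", col.column_path());
                }
            }
        }

        // Per-page values from ParquetMetaData, indexed [row_group][column]
        if let Some(sizes) = reader.metadata().unencoded_byte_array_data_bytes() {
            for (rg, cols) in sizes.iter().enumerate() {
                for (col, pages) in cols.iter().enumerate() {
                    println!("row group {rg}, column {col}: {pages:?}");
                }
            }
        }
        Ok(())
    }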
---
parquet/src/arrow/arrow_writer/byte_array.rs | 19 ++++++-
parquet/src/arrow/async_reader/metadata.rs | 8 ++-
parquet/src/arrow/async_reader/mod.rs | 1 +
parquet/src/column/writer/encoder.rs | 8 +++
parquet/src/column/writer/mod.rs | 54 ++++++++++++------
parquet/src/data_type.rs | 11 ++++
parquet/src/file/metadata/memory.rs | 1 +
parquet/src/file/metadata/mod.rs | 77 +++++++++++++++++++++++--
parquet/src/file/page_index/index_reader.rs | 53 +++++++++++++++--
parquet/src/file/page_index/mod.rs | 1 +
parquet/src/file/page_index/offset_index.rs | 50 ++++++++++++++++
parquet/src/file/serialized_reader.rs | 15 ++++-
parquet/src/file/writer.rs | 85 +++++++++++++++++++++++++++-
13 files changed, 349 insertions(+), 34 deletions(-)
diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs
index fc37ebfb45..2d23ad8510 100644
--- a/parquet/src/arrow/arrow_writer/byte_array.rs
+++ b/parquet/src/arrow/arrow_writer/byte_array.rs
@@ -96,6 +96,7 @@ macro_rules! downcast_op {
struct FallbackEncoder {
encoder: FallbackEncoderImpl,
num_values: usize,
+ variable_length_bytes: i64,
}
/// The fallback encoder in use
@@ -152,6 +153,7 @@ impl FallbackEncoder {
Ok(Self {
encoder,
num_values: 0,
+ variable_length_bytes: 0,
})
}
@@ -168,7 +170,8 @@ impl FallbackEncoder {
let value = values.value(*idx);
let value = value.as_ref();
buffer.extend_from_slice((value.len() as u32).as_bytes());
- buffer.extend_from_slice(value)
+ buffer.extend_from_slice(value);
+ self.variable_length_bytes += value.len() as i64;
}
}
FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
@@ -177,6 +180,7 @@ impl FallbackEncoder {
let value = value.as_ref();
lengths.put(&[value.len() as i32]).unwrap();
buffer.extend_from_slice(value);
+ self.variable_length_bytes += value.len() as i64;
}
}
FallbackEncoderImpl::Delta {
@@ -205,6 +209,7 @@ impl FallbackEncoder {
buffer.extend_from_slice(&value[prefix_length..]);
prefix_lengths.put(&[prefix_length as i32]).unwrap();
suffix_lengths.put(&[suffix_length as i32]).unwrap();
+ self.variable_length_bytes += value.len() as i64;
}
}
}
@@ -269,12 +274,17 @@ impl FallbackEncoder {
}
};
+ // Capture value of variable_length_bytes and reset for next page
+ let variable_length_bytes = Some(self.variable_length_bytes);
+ self.variable_length_bytes = 0;
+
Ok(DataPageValues {
buf: buf.into(),
num_values: std::mem::take(&mut self.num_values),
encoding,
min_value,
max_value,
+ variable_length_bytes,
})
}
}
@@ -321,6 +331,7 @@ impl Storage for ByteArrayStorage {
struct DictEncoder {
interner: Interner<ByteArrayStorage>,
indices: Vec<u64>,
+ variable_length_bytes: i64,
}
impl DictEncoder {
@@ -336,6 +347,7 @@ impl DictEncoder {
let value = values.value(*idx);
let interned = self.interner.intern(value.as_ref());
self.indices.push(interned);
+ self.variable_length_bytes += value.as_ref().len() as i64;
}
}
@@ -384,12 +396,17 @@ impl DictEncoder {
self.indices.clear();
+ // Capture value of variable_length_bytes and reset for next page
+ let variable_length_bytes = Some(self.variable_length_bytes);
+ self.variable_length_bytes = 0;
+
DataPageValues {
buf: encoder.consume().into(),
num_values,
encoding: Encoding::RLE_DICTIONARY,
min_value,
max_value,
+ variable_length_bytes,
}
}
}
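
Both encoders above follow the same accumulate-and-reset shape; a stripped-down
illustration (the type and method names here are invented for exposition, not part
of the crate):

    // Invented illustration: mirrors how FallbackEncoder and DictEncoder
    // track unencoded value bytes for the page being built.
    struct VarBytesTracker {
        variable_length_bytes: i64,
    }

    impl VarBytesTracker {
        /// Called once per value with the raw (unencoded) bytes.
        fn record(&mut self, value: &[u8]) {
            self.variable_length_bytes += value.len() as i64;
        }

        /// Called when a page is flushed: capture the running total and
        /// reset it so the next page starts counting from zero.
        fn flush_page(&mut self) -> Option<i64> {
            let bytes = self.variable_length_bytes;
            self.variable_length_bytes = 0;
            Some(bytes)
        }
    }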
diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs
index 9224ea3f68..4a3489a208 100644
--- a/parquet/src/arrow/async_reader/metadata.rs
+++ b/parquet/src/arrow/async_reader/metadata.rs
@@ -20,7 +20,9 @@ use crate::errors::{ParquetError, Result};
use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::index::Index;
-use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
+use crate::file::page_index::index_reader::{
+ acc_range, decode_column_index, decode_page_locations,
+};
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
@@ -177,7 +179,9 @@ impl<F: MetadataFetch> MetadataLoader<F> {
x.columns()
.iter()
.map(|c| match c.offset_index_range() {
- Some(r) => decode_offset_index(&data[r.start - offset..r.end - offset]),
+ Some(r) => {
+ decode_page_locations(&data[r.start - offset..r.end - offset])
+ }
None => Err(general_err!("missing offset index")),
})
.collect::<Result<Vec<_>>>()
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index e4205b7ef2..5a790fa6af 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -1538,6 +1538,7 @@ mod tests {
vec![row_group_meta],
None,
Some(vec![offset_index.clone()]),
+ None,
);
let metadata = Arc::new(metadata);
diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs
index b6c8212608..9d01c09040 100644
--- a/parquet/src/column/writer/encoder.rs
+++ b/parquet/src/column/writer/encoder.rs
@@ -63,6 +63,7 @@ pub struct DataPageValues<T> {
pub encoding: Encoding,
pub min_value: Option<T>,
pub max_value: Option<T>,
+ pub variable_length_bytes: Option<i64>,
}
/// A generic encoder of [`ColumnValues`] to data and dictionary pages used by
@@ -131,6 +132,7 @@ pub struct ColumnValueEncoderImpl<T: DataType> {
min_value: Option<T::T>,
max_value: Option<T::T>,
bloom_filter: Option<Sbbf>,
+ variable_length_bytes: Option<i64>,
}
impl<T: DataType> ColumnValueEncoderImpl<T> {
@@ -150,6 +152,10 @@ impl<T: DataType> ColumnValueEncoderImpl<T> {
update_min(&self.descr, &min, &mut self.min_value);
update_max(&self.descr, &max, &mut self.max_value);
}
+
+ if let Some(var_bytes) = T::T::variable_length_bytes(slice) {
+ *self.variable_length_bytes.get_or_insert(0) += var_bytes;
+ }
}
// encode the values into bloom filter if enabled
@@ -203,6 +209,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
bloom_filter,
min_value: None,
max_value: None,
+ variable_length_bytes: None,
})
}
@@ -296,6 +303,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
num_values: std::mem::take(&mut self.num_values),
min_value: self.min_value.take(),
max_value: self.max_value.take(),
+ variable_length_bytes: self.variable_length_bytes.take(),
})
}
}
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 8594e59714..0e227a157d 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -192,7 +192,8 @@ struct PageMetrics {
}
// Metrics per column writer
-struct ColumnMetrics<T> {
+#[derive(Default)]
+struct ColumnMetrics<T: Default> {
total_bytes_written: u64,
total_rows_written: u64,
total_uncompressed_size: u64,
@@ -204,6 +205,20 @@ struct ColumnMetrics<T> {
max_column_value: Option<T>,
num_column_nulls: u64,
column_distinct_count: Option<u64>,
+ variable_length_bytes: Option<i64>,
+}
+
+impl<T: Default> ColumnMetrics<T> {
+ fn new() -> Self {
+ Default::default()
+ }
+
+ /// Sum the provided page variable_length_bytes into the chunk variable_length_bytes
+ fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
+ if let Some(var_bytes) = variable_length_bytes {
+ *self.variable_length_bytes.get_or_insert(0) += var_bytes;
+ }
+ }
}
/// Typed column writer for a primitive column.
@@ -276,19 +291,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
num_buffered_rows: 0,
num_page_nulls: 0,
},
- column_metrics: ColumnMetrics {
- total_bytes_written: 0,
- total_rows_written: 0,
- total_uncompressed_size: 0,
- total_compressed_size: 0,
- total_num_values: 0,
- dictionary_page_offset: None,
- data_page_offset: None,
- min_column_value: None,
- max_column_value: None,
- num_column_nulls: 0,
- column_distinct_count: None,
- },
+ column_metrics: ColumnMetrics::<E::T>::new(),
column_index_builder: ColumnIndexBuilder::new(),
offset_index_builder: OffsetIndexBuilder::new(),
encodings,
@@ -634,7 +637,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
}
/// Update the column index and offset index when adding the data page
- fn update_column_offset_index(&mut self, page_statistics: Option<&ValueStatistics<E::T>>) {
+ fn update_column_offset_index(
+ &mut self,
+ page_statistics: Option<&ValueStatistics<E::T>>,
+ page_variable_length_bytes: Option<i64>,
+ ) {
// update the column index
let null_page =
(self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
@@ -708,6 +715,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
// update the offset index
self.offset_index_builder
.append_row_count(self.page_metrics.num_buffered_rows as i64);
+
+ self.offset_index_builder
+ .append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
}
/// Determine if we should allow truncating min/max values for this column's statistics
@@ -783,7 +793,15 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
};
// update column and offset index
- self.update_column_offset_index(page_statistics.as_ref());
+ self.update_column_offset_index(
+ page_statistics.as_ref(),
+ values_data.variable_length_bytes,
+ );
+
+ // Update variable_length_bytes in column_metrics
+ self.column_metrics
+ .update_variable_length_bytes(values_data.variable_length_bytes);
+
let page_statistics = page_statistics.map(Statistics::from);
let compressed_page = match self.props.writer_version() {
@@ -993,7 +1011,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
stats => stats,
};
- builder = builder.set_statistics(statistics);
+ builder = builder
+ .set_statistics(statistics)
+ .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes);
}
let metadata = builder.build()?;
diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs
index b85a75cfd4..01e92115c4 100644
--- a/parquet/src/data_type.rs
+++ b/parquet/src/data_type.rs
@@ -644,6 +644,13 @@ pub(crate) mod private {
(std::mem::size_of::<Self>(), 1)
}
+ /// Return the number of variable length bytes in a given slice of data
+ ///
+ /// Returns the sum of lengths for BYTE_ARRAY data, and None for all other data types
+ fn variable_length_bytes(_: &[Self]) -> Option<i64> {
+ None
+ }
+
/// Return the value as i64 if possible
///
/// This is essentially the same as `std::convert::TryInto<i64>` but can't be
@@ -956,6 +963,10 @@ pub(crate) mod private {
Ok(num_values)
}
+ fn variable_length_bytes(values: &[Self]) -> Option<i64> {
+ Some(values.iter().map(|x| x.len() as i64).sum())
+ }
+
fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
let data = decoder
.data
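
The default-with-override shape used for variable_length_bytes can be shown
standalone (trait and type names invented for exposition):

    // Only byte-array-like values report a size; every other type inherits
    // the default and reports None, matching the data_type.rs change above.
    trait Value: Sized {
        fn variable_length_bytes(_values: &[Self]) -> Option<i64> {
            None
        }
    }

    struct FixedInt(i32);
    struct VarBytes(Vec<u8>);

    impl Value for FixedInt {}

    impl Value for VarBytes {
        fn variable_length_bytes(values: &[Self]) -> Option<i64> {
            Some(values.iter().map(|v| v.0.len() as i64).sum())
        }
    }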
diff --git a/parquet/src/file/metadata/memory.rs b/parquet/src/file/metadata/memory.rs
index 57b2f7eec0..a1b40a8de9 100644
--- a/parquet/src/file/metadata/memory.rs
+++ b/parquet/src/file/metadata/memory.rs
@@ -97,6 +97,7 @@ impl HeapSize for ColumnChunkMetaData {
+ self.compression.heap_size()
+ self.statistics.heap_size()
+ self.encoding_stats.heap_size()
+ + self.unencoded_byte_array_data_bytes.heap_size()
}
}
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 39b6de41fa..d86b1ce572 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -36,7 +36,7 @@ use std::sync::Arc;
use crate::format::{
BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation, RowGroup,
- SortingColumn,
+ SizeStatistics, SortingColumn,
};
use crate::basic::{ColumnOrder, Compression, Encoding, Type};
@@ -96,6 +96,8 @@ pub struct ParquetMetaData {
column_index: Option<ParquetColumnIndex>,
/// Offset index for all each page in each column chunk
offset_index: Option<ParquetOffsetIndex>,
+ /// `unencoded_byte_array_data_bytes` from the offset index
+ unencoded_byte_array_data_bytes: Option<Vec<Vec<Option<Vec<i64>>>>>,
}
impl ParquetMetaData {
@@ -107,6 +109,7 @@ impl ParquetMetaData {
row_groups,
column_index: None,
offset_index: None,
+ unencoded_byte_array_data_bytes: None,
}
}
@@ -117,12 +120,14 @@ impl ParquetMetaData {
row_groups: Vec<RowGroupMetaData>,
column_index: Option<ParquetColumnIndex>,
offset_index: Option<ParquetOffsetIndex>,
+ unencoded_byte_array_data_bytes: Option<Vec<Vec<Option<Vec<i64>>>>>,
) -> Self {
ParquetMetaData {
file_metadata,
row_groups,
column_index,
offset_index,
+ unencoded_byte_array_data_bytes,
}
}
@@ -179,6 +184,19 @@ impl ParquetMetaData {
self.offset_index.as_ref()
}
+ /// Returns `unencoded_byte_array_data_bytes` from the offset indexes in this file, if loaded
+ ///
+ /// This value represents the output size of the total bytes in this file, which can be useful for
+ /// allocating an appropriately sized output buffer.
+ ///
+ /// Returns `None` if the parquet file does not have a `OffsetIndex` or
+ /// [ArrowReaderOptions::with_page_index] was set to false.
+ ///
+ /// [ArrowReaderOptions::with_page_index]: https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index
+ pub fn unencoded_byte_array_data_bytes(&self) -> Option<&Vec<Vec<Option<Vec<i64>>>>> {
+ self.unencoded_byte_array_data_bytes.as_ref()
+ }
+
/// Estimate of the bytes allocated to store `ParquetMetadata`
///
/// # Notes:
@@ -199,6 +217,7 @@ impl ParquetMetaData {
+ self.row_groups.heap_size()
+ self.column_index.heap_size()
+ self.offset_index.heap_size()
+ + self.unencoded_byte_array_data_bytes.heap_size()
}
/// Override the column index
@@ -543,6 +562,7 @@ pub struct ColumnChunkMetaData {
offset_index_length: Option<i32>,
column_index_offset: Option<i64>,
column_index_length: Option<i32>,
+ unencoded_byte_array_data_bytes: Option<i64>,
}
/// Represents common operations for a column chunk.
@@ -695,6 +715,14 @@ impl ColumnChunkMetaData {
Some(offset..(offset + length))
}
+ /// Returns the number of bytes of variable length data after decoding.
+ ///
+ /// Only set for BYTE_ARRAY columns. This field may not be set by older
+ /// writers.
+ pub fn unencoded_byte_array_data_bytes(&self) -> Option<i64> {
+ self.unencoded_byte_array_data_bytes
+ }
+
/// Method to convert from Thrift.
pub fn from_thrift(column_descr: ColumnDescPtr, cc: ColumnChunk) -> Result<Self> {
if cc.meta_data.is_none() {
@@ -732,6 +760,12 @@ impl ColumnChunkMetaData {
let offset_index_length = cc.offset_index_length;
let column_index_offset = cc.column_index_offset;
let column_index_length = cc.column_index_length;
+ let unencoded_byte_array_data_bytes = if let Some(size_stats) = col_metadata.size_statistics
+ {
+ size_stats.unencoded_byte_array_data_bytes
+ } else {
+ None
+ };
let result = ColumnChunkMetaData {
column_descr,
@@ -753,6 +787,7 @@ impl ColumnChunkMetaData {
offset_index_length,
column_index_offset,
column_index_length,
+ unencoded_byte_array_data_bytes,
};
Ok(result)
}
@@ -776,6 +811,16 @@ impl ColumnChunkMetaData {
/// Method to convert to Thrift `ColumnMetaData`
pub fn to_column_metadata_thrift(&self) -> ColumnMetaData {
+ let size_statistics = if self.unencoded_byte_array_data_bytes.is_some() {
+ Some(SizeStatistics {
+ unencoded_byte_array_data_bytes: self.unencoded_byte_array_data_bytes,
+ repetition_level_histogram: None,
+ definition_level_histogram: None,
+ })
+ } else {
+ None
+ };
+
ColumnMetaData {
type_: self.column_type().into(),
encodings: self.encodings().iter().map(|&v| v.into()).collect(),
@@ -795,7 +840,7 @@ impl ColumnChunkMetaData {
.map(|vec| vec.iter().map(page_encoding_stats::to_thrift).collect()),
bloom_filter_offset: self.bloom_filter_offset,
bloom_filter_length: self.bloom_filter_length,
- size_statistics: None,
+ size_statistics,
}
}
@@ -831,6 +876,7 @@ impl ColumnChunkMetaDataBuilder {
offset_index_length: None,
column_index_offset: None,
column_index_length: None,
+ unencoded_byte_array_data_bytes: None,
})
}
@@ -942,6 +988,12 @@ impl ColumnChunkMetaDataBuilder {
self
}
+ /// Sets optional length of variable length data in bytes.
+ pub fn set_unencoded_byte_array_data_bytes(mut self, value: Option<i64>) -> Self {
+ self.0.unencoded_byte_array_data_bytes = value;
+ self
+ }
+
/// Builds column chunk metadata.
pub fn build(self) -> Result<ColumnChunkMetaData> {
Ok(self.0)
@@ -1021,6 +1073,7 @@ pub struct OffsetIndexBuilder {
offset_array: Vec<i64>,
compressed_page_size_array: Vec<i32>,
first_row_index_array: Vec<i64>,
+ unencoded_byte_array_data_bytes_array: Option<Vec<i64>>,
current_first_row_index: i64,
}
@@ -1036,6 +1089,7 @@ impl OffsetIndexBuilder {
offset_array: Vec::new(),
compressed_page_size_array: Vec::new(),
first_row_index_array: Vec::new(),
+ unencoded_byte_array_data_bytes_array: None,
current_first_row_index: 0,
}
}
@@ -1051,6 +1105,17 @@ impl OffsetIndexBuilder {
self.compressed_page_size_array.push(compressed_page_size);
}
+ pub fn append_unencoded_byte_array_data_bytes(
+ &mut self,
+ unencoded_byte_array_data_bytes: Option<i64>,
+ ) {
+ if let Some(val) = unencoded_byte_array_data_bytes {
+ self.unencoded_byte_array_data_bytes_array
+ .get_or_insert(Vec::new())
+ .push(val);
+ }
+ }
+
/// Build and get the thrift metadata of offset index
pub fn build_to_thrift(self) -> OffsetIndex {
let locations = self
@@ -1060,7 +1125,7 @@ impl OffsetIndexBuilder {
.zip(self.first_row_index_array.iter())
.map(|((offset, size), row_index)| PageLocation::new(*offset, *size, *row_index))
.collect::<Vec<_>>();
- OffsetIndex::new(locations, None)
+ OffsetIndex::new(locations, self.unencoded_byte_array_data_bytes_array)
}
}
@@ -1211,6 +1276,7 @@ mod tests {
.set_offset_index_length(Some(25))
.set_column_index_offset(Some(8000))
.set_column_index_length(Some(25))
+ .set_unencoded_byte_array_data_bytes(Some(2000))
.build()
.unwrap();
@@ -1298,7 +1364,7 @@ mod tests {
column_orders,
);
let parquet_meta = ParquetMetaData::new(file_metadata.clone(), row_group_meta.clone());
- let base_expected_size = 1320;
+ let base_expected_size = 1376;
assert_eq!(parquet_meta.memory_size(), base_expected_size);
let mut column_index = ColumnIndexBuilder::new();
@@ -1315,9 +1381,10 @@ mod tests {
vec![PageLocation::new(1, 2, 3)],
vec![PageLocation::new(1, 2, 3)],
]]),
+ Some(vec![vec![Some(vec![10, 20, 30])]]),
);
- let bigger_expected_size = 2304;
+ let bigger_expected_size = 2464;
// more set fields means more memory usage
assert!(bigger_expected_size > base_expected_size);
assert_eq!(parquet_meta.memory_size(), bigger_expected_size);
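
The builder-side API can be exercised in isolation; a hedged sketch (the schema
string and the value 2000 are arbitrary, echoing the test above):

    use std::sync::Arc;

    use parquet::file::metadata::ColumnChunkMetaData;
    use parquet::schema::parser::parse_message_type;
    use parquet::schema::types::SchemaDescriptor;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let schema = Arc::new(parse_message_type(
            "message test { OPTIONAL BYTE_ARRAY a (UTF8); }",
        )?);
        let descr = SchemaDescriptor::new(schema);

        // 2000 stands in for a chunk's accumulated variable_length_bytes
        let meta = ColumnChunkMetaData::builder(descr.column(0))
            .set_unencoded_byte_array_data_bytes(Some(2000))
            .build()?;
        assert_eq!(meta.unencoded_byte_array_data_bytes(), Some(2000));
        Ok(())
    }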
diff --git a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs
index 2ddf826fb0..7959cb95c0 100644
--- a/parquet/src/file/page_index/index_reader.rs
+++ b/parquet/src/file/page_index/index_reader.rs
@@ -22,6 +22,7 @@ use crate::data_type::Int96;
use crate::errors::ParquetError;
use crate::file::metadata::ColumnChunkMetaData;
use crate::file::page_index::index::{Index, NativeIndex};
+use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::reader::ChunkReader;
use crate::format::{ColumnIndex, OffsetIndex, PageLocation};
use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
@@ -45,9 +46,9 @@ pub(crate) fn acc_range(a: Option<Range<usize>>, b: Option<Range<usize>>) -> Opt
/// Returns an empty vector if this row group does not contain a
/// [`ColumnIndex`].
///
-/// See [Column Index Documentation] for more details.
+/// See [Page Index Documentation] for more details.
///
-/// [Column Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
+/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
pub fn read_columns_indexes<R: ChunkReader>(
reader: &R,
chunks: &[ColumnChunkMetaData],
@@ -81,9 +82,9 @@ pub fn read_columns_indexes<R: ChunkReader>(
/// Return an empty vector if this row group does not contain an
/// [`OffsetIndex]`.
///
-/// See [Column Index Documentation] for more details.
+/// See [Page Index Documentation] for more details.
///
-/// [Column Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
+/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
pub fn read_pages_locations<R: ChunkReader>(
reader: &R,
chunks: &[ColumnChunkMetaData],
@@ -100,6 +101,42 @@ pub fn read_pages_locations<R: ChunkReader>(
let bytes = reader.get_bytes(fetch.start as _, fetch.end - fetch.start)?;
let get = |r: Range<usize>| &bytes[(r.start - fetch.start)..(r.end - fetch.start)];
+ chunks
+ .iter()
+ .map(|c| match c.offset_index_range() {
+ Some(r) => decode_page_locations(get(r)),
+ None => Err(general_err!("missing offset index")),
+ })
+ .collect()
+}
+
+/// Reads per-column [`OffsetIndexMetaData`] for all columns of a row group by
+/// decoding [`OffsetIndex`].
+///
+/// Returns a vector of `offset_index[column_number]`.
+///
+/// Returns an empty vector if this row group does not contain an
+/// [`OffsetIndex`].
+///
+/// See [Page Index Documentation] for more details.
+///
+/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
+pub fn read_offset_indexes<R: ChunkReader>(
+ reader: &R,
+ chunks: &[ColumnChunkMetaData],
+) -> Result<Vec<OffsetIndexMetaData>, ParquetError> {
+ let fetch = chunks
+ .iter()
+ .fold(None, |range, c| acc_range(range, c.offset_index_range()));
+
+ let fetch = match fetch {
+ Some(r) => r,
+ None => return Ok(vec![]),
+ };
+
+ let bytes = reader.get_bytes(fetch.start as _, fetch.end - fetch.start)?;
+ let get = |r: Range<usize>| &bytes[(r.start - fetch.start)..(r.end - fetch.start)];
+
chunks
.iter()
.map(|c| match c.offset_index_range() {
@@ -109,7 +146,13 @@ pub fn read_pages_locations<R: ChunkReader>(
.collect()
}
-pub(crate) fn decode_offset_index(data: &[u8]) -> Result<Vec<PageLocation>, ParquetError> {
+pub(crate) fn decode_offset_index(data: &[u8]) -> Result<OffsetIndexMetaData, ParquetError> {
+ let mut prot = TCompactSliceInputProtocol::new(data);
+ let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
+ OffsetIndexMetaData::try_new(offset)
+}
+
+pub(crate) fn decode_page_locations(data: &[u8]) -> Result<Vec<PageLocation>, ParquetError> {
let mut prot = TCompactSliceInputProtocol::new(data);
let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
Ok(offset.page_locations)
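
For callers that want the richer per-column view directly, the new
read_offset_indexes can be driven from any ChunkReader; a sketch assuming a
placeholder file path:

    use std::fs::File;

    use parquet::file::footer::parse_metadata;
    use parquet::file::page_index::index_reader::read_offset_indexes;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Placeholder path; per-page sizes are only present for BYTE_ARRAY
        // columns written with size statistics.
        let file = File::open("data.parquet")?;
        let metadata = parse_metadata(&file)?;

        for rg in metadata.row_groups() {
            // One OffsetIndexMetaData per column chunk; an empty Vec means
            // the file carries no offset index at all.
            for index in read_offset_indexes(&file, rg.columns())? {
                println!(
                    "{} pages, unencoded sizes: {:?}",
                    index.page_locations().len(),
                    index.unencoded_byte_array_data_bytes()
                );
            }
        }
        Ok(())
    }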
diff --git a/parquet/src/file/page_index/mod.rs b/parquet/src/file/page_index/mod.rs
index 9372645d76..a8077896db 100644
--- a/parquet/src/file/page_index/mod.rs
+++ b/parquet/src/file/page_index/mod.rs
@@ -21,3 +21,4 @@
pub mod index;
pub mod index_reader;
+pub mod offset_index;
diff --git a/parquet/src/file/page_index/offset_index.rs b/parquet/src/file/page_index/offset_index.rs
new file mode 100644
index 0000000000..9138620e3f
--- /dev/null
+++ b/parquet/src/file/page_index/offset_index.rs
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`OffsetIndexMetaData`] structure holding decoded [`OffsetIndex`] information
+
+use crate::errors::ParquetError;
+use crate::format::{OffsetIndex, PageLocation};
+
+/// [`OffsetIndex`] information for a column chunk. Contains offsets and sizes for each page
+/// in the chunk. Optionally stores fully decoded page sizes for BYTE_ARRAY columns.
+#[derive(Debug, Clone, PartialEq)]
+pub struct OffsetIndexMetaData {
+ pub page_locations: Vec<PageLocation>,
+ pub unencoded_byte_array_data_bytes: Option<Vec<i64>>,
+}
+
+impl OffsetIndexMetaData {
+ /// Creates a new [`OffsetIndexMetaData`] from an [`OffsetIndex`].
+ pub(crate) fn try_new(index: OffsetIndex) -> Result<Self, ParquetError> {
+ Ok(Self {
+ page_locations: index.page_locations,
+ unencoded_byte_array_data_bytes: index.unencoded_byte_array_data_bytes,
+ })
+ }
+
+ /// Vector of [`PageLocation`] objects, one per page in the chunk.
+ pub fn page_locations(&self) -> &Vec<PageLocation> {
+ &self.page_locations
+ }
+
+ /// Optional vector of unencoded page sizes, one per page in the chunk. Only defined
+ /// for BYTE_ARRAY columns.
+ pub fn unencoded_byte_array_data_bytes(&self) -> Option<&Vec<i64>> {
+ self.unencoded_byte_array_data_bytes.as_ref()
+ }
+}
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index ac7d2d2874..65b6ebf2ec 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -211,12 +211,22 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
if options.enable_page_index {
let mut columns_indexes = vec![];
let mut offset_indexes = vec![];
+ let mut unenc_byte_sizes = vec![];
for rg in &mut filtered_row_groups {
let column_index = index_reader::read_columns_indexes(&chunk_reader, rg.columns())?;
- let offset_index = index_reader::read_pages_locations(&chunk_reader, rg.columns())?;
+ let offset_index = index_reader::read_offset_indexes(&chunk_reader, rg.columns())?;
+
+ // split offset_index into two vectors to not break API
+ let mut page_locations = vec![];
+ let mut unenc_bytes = vec![];
+ offset_index.into_iter().for_each(|index| {
+ page_locations.push(index.page_locations);
+ unenc_bytes.push(index.unencoded_byte_array_data_bytes);
+ });
columns_indexes.push(column_index);
- offset_indexes.push(offset_index);
+ offset_indexes.push(page_locations);
+ unenc_byte_sizes.push(unenc_bytes);
}
Ok(Self {
@@ -226,6 +236,7 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
filtered_row_groups,
Some(columns_indexes),
Some(offset_indexes),
+ Some(unenc_byte_sizes),
)),
props: Arc::new(options.props),
})
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index eb633f31c4..b109a2da8e 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -659,7 +659,8 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
.set_total_uncompressed_size(metadata.uncompressed_size())
.set_num_values(metadata.num_values())
.set_data_page_offset(map_offset(src_data_offset))
- .set_dictionary_page_offset(src_dictionary_offset.map(map_offset));
+ .set_dictionary_page_offset(src_dictionary_offset.map(map_offset))
+ .set_unencoded_byte_array_data_bytes(metadata.unencoded_byte_array_data_bytes());
if let Some(statistics) = metadata.statistics() {
builder = builder.set_statistics(statistics.clone())
@@ -827,7 +828,7 @@ mod tests {
use crate::column::page::{Page, PageReader};
use crate::column::reader::get_typed_column_reader;
use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
- use crate::data_type::{BoolType, Int32Type};
+ use crate::data_type::{BoolType, ByteArrayType, Int32Type};
use crate::file::page_index::index::Index;
use crate::file::properties::EnabledStatistics;
use crate::file::serialized_reader::ReadOptionsBuilder;
@@ -840,6 +841,7 @@ mod tests {
use crate::record::{Row, RowAccessor};
use crate::schema::parser::parse_message_type;
use crate::schema::types::{ColumnDescriptor, ColumnPath};
+ use crate::util::test_common::rand_gen::RandGen;
#[test]
fn test_row_group_writer_error_not_all_columns_written() {
@@ -1851,4 +1853,83 @@ mod tests {
let b_idx = &column_index[0][1];
assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
}
+
+ #[test]
+ fn test_byte_array_size_statistics() {
+ let message_type = "
+ message test_schema {
+ OPTIONAL BYTE_ARRAY a (UTF8);
+ }
+ ";
+ let schema = Arc::new(parse_message_type(message_type).unwrap());
+ let data = ByteArrayType::gen_vec(32, 7);
+ let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
+ let unenc_size: i64 = data.iter().map(|x| x.len() as i64).sum();
+ let file: File = tempfile::tempfile().unwrap();
+ let props = Arc::new(
+ WriterProperties::builder()
+ .set_statistics_enabled(EnabledStatistics::Page)
+ .build(),
+ );
+
+ let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
+ let mut row_group_writer = writer.next_row_group().unwrap();
+
+ let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
+ col_writer
+ .typed::<ByteArrayType>()
+ .write_batch(&data, Some(&def_levels), None)
+ .unwrap();
+ col_writer.close().unwrap();
+ row_group_writer.close().unwrap();
+ let file_metadata = writer.close().unwrap();
+
+ assert_eq!(file_metadata.row_groups.len(), 1);
+ assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
+ assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
+
+ assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
+ let meta_data = file_metadata.row_groups[0].columns[0]
+ .meta_data
+ .as_ref()
+ .unwrap();
+ assert!(meta_data.size_statistics.is_some());
+ let size_stats = meta_data.size_statistics.as_ref().unwrap();
+
+ assert!(size_stats.repetition_level_histogram.is_none());
+ assert!(size_stats.definition_level_histogram.is_none());
+ assert!(size_stats.unencoded_byte_array_data_bytes.is_some());
+ assert_eq!(
+ unenc_size,
+ size_stats.unencoded_byte_array_data_bytes.unwrap()
+ );
+
+ // check that the read metadata is also correct
+ let options = ReadOptionsBuilder::new().with_page_index().build();
+ let reader = SerializedFileReader::new_with_options(file, options).unwrap();
+
+ let rfile_metadata = reader.metadata().file_metadata();
+ assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
+ assert_eq!(reader.num_row_groups(), 1);
+ let rowgroup = reader.get_row_group(0).unwrap();
+ assert_eq!(rowgroup.num_columns(), 1);
+ let column = rowgroup.metadata().column(0);
+ assert!(column.unencoded_byte_array_data_bytes().is_some());
+ assert_eq!(
+ unenc_size,
+ column.unencoded_byte_array_data_bytes().unwrap()
+ );
+
+ assert!(reader
+ .metadata()
+ .unencoded_byte_array_data_bytes()
+ .is_some());
+ let unenc_sizes = reader.metadata().unencoded_byte_array_data_bytes().unwrap();
+ assert_eq!(unenc_sizes.len(), 1);
+ assert_eq!(unenc_sizes[0].len(), 1);
+ assert!(unenc_sizes[0][0].is_some());
+ let page_sizes = unenc_sizes[0][0].as_ref().unwrap();
+ assert_eq!(page_sizes.len(), 1);
+ assert_eq!(page_sizes[0], unenc_size);
+ }
}