This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 373ac8199 Add page index reader test for all types and support empty
index. (#2012)
373ac8199 is described below
commit 373ac819937633ea035e303fce7127c0f06fd6c4
Author: Yang Jiang <[email protected]>
AuthorDate: Sat Jul 9 00:05:19 2022 +0800
Add page index reader test for all types and support empty index. (#2012)
* Add page index reader test and support empty index.
* update parquet-testing commit id
* refine code
* Update parquet/src/file/page_index/index.rs
Co-authored-by: Raphael Taylor-Davies
<[email protected]>
* add comment and fix
* Update parquet/src/file/page_index/index.rs
Co-authored-by: Raphael Taylor-Davies
<[email protected]>
* use `None` represent lack of index
Co-authored-by: Raphael Taylor-Davies
<[email protected]>
---
parquet-testing | 2 +-
parquet/src/file/metadata.rs | 14 +-
parquet/src/file/page_index/index.rs | 4 +
parquet/src/file/page_index/index_reader.rs | 11 +-
parquet/src/file/serialized_reader.rs | 236 ++++++++++++++++++++++++++--
5 files changed, 240 insertions(+), 27 deletions(-)
diff --git a/parquet-testing b/parquet-testing
index 7175a4713..aafd3fc9d 160000
--- a/parquet-testing
+++ b/parquet-testing
@@ -1 +1 @@
-Subproject commit 7175a471339704c7645af0fe66c68305e2e6759c
+Subproject commit aafd3fc9df431c2625a514fb46626e5614f1d199
diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs
index 7ec29de01..ad8fe16ad 100644
--- a/parquet/src/file/metadata.rs
+++ b/parquet/src/file/metadata.rs
@@ -55,8 +55,10 @@ use crate::schema::types::{
pub struct ParquetMetaData {
file_metadata: FileMetaData,
row_groups: Vec<RowGroupMetaData>,
- page_indexes: Option<Vec<Index>>,
- offset_indexes: Option<Vec<Vec<PageLocation>>>,
+ /// Page index for all pages in each column chunk
+ page_indexes: Option<Vec<Vec<Index>>>,
+ /// Offset index for all pages in each column chunk
+ offset_indexes: Option<Vec<Vec<Vec<PageLocation>>>>,
}
impl ParquetMetaData {
@@ -74,8 +76,8 @@ impl ParquetMetaData {
pub fn new_with_page_index(
file_metadata: FileMetaData,
row_groups: Vec<RowGroupMetaData>,
- page_indexes: Option<Vec<Index>>,
- offset_indexes: Option<Vec<Vec<PageLocation>>>,
+ page_indexes: Option<Vec<Vec<Index>>>,
+ offset_indexes: Option<Vec<Vec<Vec<PageLocation>>>>,
) -> Self {
ParquetMetaData {
file_metadata,
@@ -107,12 +109,12 @@ impl ParquetMetaData {
}
/// Returns page indexes in this file.
- pub fn page_indexes(&self) -> Option<&Vec<Index>> {
+ pub fn page_indexes(&self) -> Option<&Vec<Vec<Index>>> {
self.page_indexes.as_ref()
}
/// Returns offset indexes in this file.
- pub fn offset_indexes(&self) -> Option<&Vec<Vec<PageLocation>>> {
+ pub fn offset_indexes(&self) -> Option<&Vec<Vec<Vec<PageLocation>>>> {
self.offset_indexes.as_ref()
}
}
diff --git a/parquet/src/file/page_index/index.rs
b/parquet/src/file/page_index/index.rs
index e97826c63..5c0a7df84 100644
--- a/parquet/src/file/page_index/index.rs
+++ b/parquet/src/file/page_index/index.rs
@@ -56,6 +56,10 @@ pub enum Index {
DOUBLE(NativeIndex<f64>),
BYTE_ARRAY(ByteArrayIndex),
FIXED_LEN_BYTE_ARRAY(ByteArrayIndex),
+ /// Sometimes reading the page index from a parquet file
+ /// will only return page locations without a min/max index;
+ /// `None` represents this lack of index information.
+ None,
}
/// An index of a column of [`Type`] physical representation
diff --git a/parquet/src/file/page_index/index_reader.rs
b/parquet/src/file/page_index/index_reader.rs
index 841448090..616502139 100644
--- a/parquet/src/file/page_index/index_reader.rs
+++ b/parquet/src/file/page_index/index_reader.rs
@@ -101,13 +101,7 @@ fn get_index_offset_and_lengths(
.iter()
.map(|x| x.column_index_length())
.map(|maybe_length| {
- let index_length = maybe_length.ok_or_else(|| {
- ParquetError::General(
- "The column_index_length must exist if offset_index_offset
exists"
- .to_string(),
- )
- })?;
-
+ let index_length = maybe_length.unwrap_or(0);
Ok(index_length.try_into().unwrap())
})
.collect::<Result<Vec<_>, ParquetError>>()?;
@@ -143,6 +137,9 @@ fn deserialize_column_index(
data: &[u8],
column_type: Type,
) -> Result<Index, ParquetError> {
+ if data.is_empty() {
+ return Ok(Index::None);
+ }
let mut d = Cursor::new(data);
let mut prot = TCompactInputProtocol::new(&mut d);
diff --git a/parquet/src/file/serialized_reader.rs
b/parquet/src/file/serialized_reader.rs
index 1dfd1eb45..d3f5d7127 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -248,13 +248,17 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
}
if options.enable_page_index {
- //Todo for now test data `data_index_bloom_encoding_stats.parquet`
only have one rowgroup
- //support multi after create multi-RG test data.
- let cols = metadata.row_group(0);
- let columns_indexes =
- index_reader::read_columns_indexes(&chunk_reader,
cols.columns())?;
- let pages_locations =
- index_reader::read_pages_locations(&chunk_reader,
cols.columns())?;
+ let mut columns_indexes = vec![];
+ let mut offset_indexes = vec![];
+
+ for rg in &filtered_row_groups {
+ let column_index =
+ index_reader::read_columns_indexes(&chunk_reader,
rg.columns())?;
+ let offset_index =
+ index_reader::read_pages_locations(&chunk_reader,
rg.columns())?;
+ columns_indexes.push(column_index);
+ offset_indexes.push(offset_index);
+ }
Ok(Self {
chunk_reader: Arc::new(chunk_reader),
@@ -262,7 +266,7 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
metadata.file_metadata().clone(),
filtered_row_groups,
Some(columns_indexes),
- Some(pages_locations),
+ Some(offset_indexes),
),
})
} else {
@@ -569,9 +573,11 @@ impl<T: Read + Send> PageReader for
SerializedPageReader<T> {
mod tests {
use super::*;
use crate::basic::{self, ColumnOrder};
- use crate::file::page_index::index::Index;
+ use crate::data_type::private::ParquetValueType;
+ use crate::file::page_index::index::{ByteArrayIndex, Index, NativeIndex};
use crate::record::RowAccessor;
use crate::schema::parser::parse_message_type;
+ use crate::util::bit_util::from_le_slice;
use crate::util::test_common::{get_test_file, get_test_path};
use parquet_format::BoundaryOrder;
use std::sync::Arc;
@@ -1085,7 +1091,7 @@ mod tests {
// only one row group
assert_eq!(page_indexes.len(), 1);
- let index = if let Index::BYTE_ARRAY(index) =
page_indexes.get(0).unwrap() {
+ let index = if let Index::BYTE_ARRAY(index) = &page_indexes[0][0] {
index
} else {
unreachable!()
@@ -1097,7 +1103,7 @@ mod tests {
//only one page group
assert_eq!(index_in_pages.len(), 1);
- let page0 = index_in_pages.get(0).unwrap();
+ let page0 = &index_in_pages[0];
let min = page0.min.as_ref().unwrap();
let max = page0.max.as_ref().unwrap();
assert_eq!("Hello", std::str::from_utf8(min.as_slice()).unwrap());
@@ -1106,11 +1112,215 @@ mod tests {
let offset_indexes = metadata.offset_indexes().unwrap();
// only one row group
assert_eq!(offset_indexes.len(), 1);
- let offset_index = offset_indexes.get(0).unwrap();
- let page_offset = offset_index.get(0).unwrap();
+ let offset_index = &offset_indexes[0];
+ let page_offset = &offset_index[0][0];
assert_eq!(4, page_offset.offset);
assert_eq!(152, page_offset.compressed_page_size);
assert_eq!(0, page_offset.first_row_index);
}
+
+ #[test]
+ fn test_page_index_reader_all_type() {
+ let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
+ let builder = ReadOptionsBuilder::new();
+ //enable read page index
+ let options = builder.with_page_index().build();
+ let reader_result = SerializedFileReader::new_with_options(test_file,
options);
+ let reader = reader_result.unwrap();
+
+ // Test contents in Parquet metadata
+ let metadata = reader.metadata();
+ assert_eq!(metadata.num_row_groups(), 1);
+
+ let page_indexes = metadata.page_indexes().unwrap();
+ let row_group_offset_indexes = &metadata.offset_indexes().unwrap()[0];
+
+ // only one row group
+ assert_eq!(page_indexes.len(), 1);
+ let row_group_metadata = metadata.row_group(0);
+
+ //col0->id: INT32 UNCOMPRESSED DO:0 FPO:4 SZ:37325/37325/1.00 VC:7300
ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 7299, num_nulls: 0]
+ if let Index::INT32(index) = &page_indexes[0][0] {
+ check_native_page_index(
+ index,
+ 325,
+ get_row_group_min_max_bytes(row_group_metadata, 0),
+ BoundaryOrder::Unordered,
+ );
+ assert_eq!(row_group_offset_indexes[0].len(), 325);
+ } else {
+ unreachable!()
+ };
+ //col1->bool_col:BOOLEAN UNCOMPRESSED DO:0 FPO:37329 SZ:3022/3022/1.00
VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: false, max: true, num_nulls: 0]
+ if let Index::BOOLEAN(index) = &page_indexes[0][1] {
+ assert_eq!(index.indexes.len(), 82);
+ assert_eq!(row_group_offset_indexes[1].len(), 82);
+ } else {
+ unreachable!()
+ };
+ //col2->tinyint_col: INT32 UNCOMPRESSED DO:0 FPO:40351
SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9,
num_nulls: 0]
+ if let Index::INT32(index) = &page_indexes[0][2] {
+ check_native_page_index(
+ index,
+ 325,
+ get_row_group_min_max_bytes(row_group_metadata, 2),
+ BoundaryOrder::Ascending,
+ );
+ assert_eq!(row_group_offset_indexes[2].len(), 325);
+ } else {
+ unreachable!()
+ };
+ //col4->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676
SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9,
num_nulls: 0]
+ if let Index::INT32(index) = &page_indexes[0][3] {
+ check_native_page_index(
+ index,
+ 325,
+ get_row_group_min_max_bytes(row_group_metadata, 3),
+ BoundaryOrder::Ascending,
+ );
+ assert_eq!(row_group_offset_indexes[3].len(), 325);
+ } else {
+ unreachable!()
+ };
+ //col5->int_col: INT32 UNCOMPRESSED DO:0 FPO:115001 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
+ if let Index::INT32(index) = &page_indexes[0][4] {
+ check_native_page_index(
+ index,
+ 325,
+ get_row_group_min_max_bytes(row_group_metadata, 4),
+ BoundaryOrder::Ascending,
+ );
+ assert_eq!(row_group_offset_indexes[4].len(), 325);
+ } else {
+ unreachable!()
+ };
+ //col6->bigint_col: INT64 UNCOMPRESSED DO:0 FPO:152326
SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 90,
num_nulls: 0]
+ if let Index::INT64(index) = &page_indexes[0][5] {
+ check_native_page_index(
+ index,
+ 528,
+ get_row_group_min_max_bytes(row_group_metadata, 5),
+ BoundaryOrder::Unordered,
+ );
+ assert_eq!(row_group_offset_indexes[5].len(), 528);
+ } else {
+ unreachable!()
+ };
+ //col7->float_col: FLOAT UNCOMPRESSED DO:0 FPO:223924
SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 9.9,
num_nulls: 0]
+ if let Index::FLOAT(index) = &page_indexes[0][6] {
+ check_native_page_index(
+ index,
+ 325,
+ get_row_group_min_max_bytes(row_group_metadata, 6),
+ BoundaryOrder::Ascending,
+ );
+ assert_eq!(row_group_offset_indexes[6].len(), 325);
+ } else {
+ unreachable!()
+ };
+ //col8->double_col: DOUBLE UNCOMPRESSED DO:0 FPO:261249
SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max:
90.89999999999999, num_nulls: 0]
+ if let Index::DOUBLE(index) = &page_indexes[0][7] {
+ check_native_page_index(
+ index,
+ 528,
+ get_row_group_min_max_bytes(row_group_metadata, 7),
+ BoundaryOrder::Unordered,
+ );
+ assert_eq!(row_group_offset_indexes[7].len(), 528);
+ } else {
+ unreachable!()
+ };
+ //col9->date_string_col: BINARY UNCOMPRESSED DO:0 FPO:332847
SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 01/01/09, max:
12/31/10, num_nulls: 0]
+ if let Index::BYTE_ARRAY(index) = &page_indexes[0][8] {
+ check_bytes_page_index(
+ index,
+ 974,
+ get_row_group_min_max_bytes(row_group_metadata, 8),
+ BoundaryOrder::Unordered,
+ );
+ assert_eq!(row_group_offset_indexes[8].len(), 974);
+ } else {
+ unreachable!()
+ };
+ //col10->string_col: BINARY UNCOMPRESSED DO:0 FPO:444795
SZ:45298/45298/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9,
num_nulls: 0]
+ if let Index::BYTE_ARRAY(index) = &page_indexes[0][9] {
+ check_bytes_page_index(
+ index,
+ 352,
+ get_row_group_min_max_bytes(row_group_metadata, 9),
+ BoundaryOrder::Ascending,
+ );
+ assert_eq!(row_group_offset_indexes[9].len(), 352);
+ } else {
+ unreachable!()
+ };
+ //col11->timestamp_col: INT96 UNCOMPRESSED DO:0 FPO:490093
SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[num_nulls: 0,
min/max not defined]
+ //Notice: min/max values for the pages of this column do not exist.
+ if let Index::None = &page_indexes[0][10] {
+ assert_eq!(row_group_offset_indexes[10].len(), 974);
+ } else {
+ unreachable!()
+ };
+ //col12->year: INT32 UNCOMPRESSED DO:0 FPO:602041 SZ:37325/37325/1.00
VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 2009, max: 2010, num_nulls: 0]
+ if let Index::INT32(index) = &page_indexes[0][11] {
+ check_native_page_index(
+ index,
+ 325,
+ get_row_group_min_max_bytes(row_group_metadata, 11),
+ BoundaryOrder::Ascending,
+ );
+ assert_eq!(row_group_offset_indexes[11].len(), 325);
+ } else {
+ unreachable!()
+ };
+ //col13->month: INT32 UNCOMPRESSED DO:0 FPO:639366 SZ:37325/37325/1.00
VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 1, max: 12, num_nulls: 0]
+ if let Index::INT32(index) = &page_indexes[0][12] {
+ check_native_page_index(
+ index,
+ 325,
+ get_row_group_min_max_bytes(row_group_metadata, 12),
+ BoundaryOrder::Unordered,
+ );
+ assert_eq!(row_group_offset_indexes[12].len(), 325);
+ } else {
+ unreachable!()
+ };
+ }
+
+    /// Asserts that a native-typed page index matches expectations.
+    ///
+    /// * `row_group_index` - the column's page index to check
+    /// * `page_size` - expected number of entries in `row_group_index.indexes`
+    /// * `min_max` - `(min, max)` bounds as raw bytes, decoded with `from_le_slice`
+    /// * `boundary_order` - expected `boundary_order` of the index
+    ///
+    /// Panics if the entry count or boundary order differ, if any entry lacks a
+    /// min or max, or if any entry's min/max falls outside the `min_max` bounds.
+    fn check_native_page_index<T: ParquetValueType>(
+        row_group_index: &NativeIndex<T>,
+        page_size: usize,
+        min_max: (&[u8], &[u8]),
+        boundary_order: BoundaryOrder,
+    ) {
+        assert_eq!(row_group_index.indexes.len(), page_size);
+        assert_eq!(row_group_index.boundary_order, boundary_order);
+        // `Iterator::all` only returns a bool; without `assert!` the result is
+        // dropped and an out-of-range page would never fail the test.
+        assert!(
+            row_group_index.indexes.iter().all(|x| {
+                x.min.as_ref().unwrap() >= &from_le_slice::<T>(min_max.0)
+                    && x.max.as_ref().unwrap() <= &from_le_slice::<T>(min_max.1)
+            }),
+            "page min/max must lie within the row group min/max"
+        );
+    }
+
+    /// Asserts that a byte-array page index matches expectations.
+    ///
+    /// * `row_group_index` - the column's page index to check
+    /// * `page_size` - expected number of entries in `row_group_index.indexes`
+    /// * `min_max` - `(min, max)` bounds as raw bytes, compared as byte slices
+    /// * `boundary_order` - expected `boundary_order` of the index
+    ///
+    /// Panics if the entry count or boundary order differ, if any entry lacks a
+    /// min or max, or if any entry's min/max falls outside the `min_max` bounds.
+    fn check_bytes_page_index(
+        row_group_index: &ByteArrayIndex,
+        page_size: usize,
+        min_max: (&[u8], &[u8]),
+        boundary_order: BoundaryOrder,
+    ) {
+        assert_eq!(row_group_index.indexes.len(), page_size);
+        assert_eq!(row_group_index.boundary_order, boundary_order);
+        // `Iterator::all` only returns a bool; without `assert!` the result is
+        // dropped and an out-of-range page would never fail the test.
+        assert!(
+            row_group_index.indexes.iter().all(|x| {
+                x.min.as_ref().unwrap().as_slice() >= min_max.0
+                    && x.max.as_ref().unwrap().as_slice() <= min_max.1
+            }),
+            "page min/max must lie within the row group min/max"
+        );
+    }
+
+    /// Returns the `(min_bytes, max_bytes)` pair taken from the statistics of
+    /// column `col_num` in row group `r`.
+    ///
+    /// Panics when `statistics()` for that column has nothing to unwrap.
+    fn get_row_group_min_max_bytes(
+        r: &RowGroupMetaData,
+        col_num: usize,
+    ) -> (&[u8], &[u8]) {
+        let column_stats = r
+            .column(col_num)
+            .statistics()
+            .unwrap();
+        let lower = column_stats.min_bytes();
+        let upper = column_stats.max_bytes();
+        (lower, upper)
+    }
}