This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 373ac8199 Add page index reader test for all types and support empty 
index. (#2012)
373ac8199 is described below

commit 373ac819937633ea035e303fce7127c0f06fd6c4
Author: Yang Jiang <[email protected]>
AuthorDate: Sat Jul 9 00:05:19 2022 +0800

    Add page index reader test for all types and support empty index. (#2012)
    
    * Add page index reader test and support empty index.
    
    * update parquet-testing commit id
    
    * refine code
    
    * Update parquet/src/file/page_index/index.rs
    
    Co-authored-by: Raphael Taylor-Davies 
<[email protected]>
    
    * add comment and fix
    
    * Update parquet/src/file/page_index/index.rs
    
    Co-authored-by: Raphael Taylor-Davies 
<[email protected]>
    
    * use `None` represent lack of index
    
    Co-authored-by: Raphael Taylor-Davies 
<[email protected]>
---
 parquet-testing                             |   2 +-
 parquet/src/file/metadata.rs                |  14 +-
 parquet/src/file/page_index/index.rs        |   4 +
 parquet/src/file/page_index/index_reader.rs |  11 +-
 parquet/src/file/serialized_reader.rs       | 236 ++++++++++++++++++++++++++--
 5 files changed, 240 insertions(+), 27 deletions(-)

diff --git a/parquet-testing b/parquet-testing
index 7175a4713..aafd3fc9d 160000
--- a/parquet-testing
+++ b/parquet-testing
@@ -1 +1 @@
-Subproject commit 7175a471339704c7645af0fe66c68305e2e6759c
+Subproject commit aafd3fc9df431c2625a514fb46626e5614f1d199
diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs
index 7ec29de01..ad8fe16ad 100644
--- a/parquet/src/file/metadata.rs
+++ b/parquet/src/file/metadata.rs
@@ -55,8 +55,10 @@ use crate::schema::types::{
 pub struct ParquetMetaData {
     file_metadata: FileMetaData,
     row_groups: Vec<RowGroupMetaData>,
-    page_indexes: Option<Vec<Index>>,
-    offset_indexes: Option<Vec<Vec<PageLocation>>>,
+    /// Page index for all pages in each column chunk
+    page_indexes: Option<Vec<Vec<Index>>>,
+    /// Offset index for all pages in each column chunk
+    offset_indexes: Option<Vec<Vec<Vec<PageLocation>>>>,
 }
 
 impl ParquetMetaData {
@@ -74,8 +76,8 @@ impl ParquetMetaData {
     pub fn new_with_page_index(
         file_metadata: FileMetaData,
         row_groups: Vec<RowGroupMetaData>,
-        page_indexes: Option<Vec<Index>>,
-        offset_indexes: Option<Vec<Vec<PageLocation>>>,
+        page_indexes: Option<Vec<Vec<Index>>>,
+        offset_indexes: Option<Vec<Vec<Vec<PageLocation>>>>,
     ) -> Self {
         ParquetMetaData {
             file_metadata,
@@ -107,12 +109,12 @@ impl ParquetMetaData {
     }
 
     /// Returns page indexes in this file.
-    pub fn page_indexes(&self) -> Option<&Vec<Index>> {
+    pub fn page_indexes(&self) -> Option<&Vec<Vec<Index>>> {
         self.page_indexes.as_ref()
     }
 
     /// Returns offset indexes in this file.
-    pub fn offset_indexes(&self) -> Option<&Vec<Vec<PageLocation>>> {
+    pub fn offset_indexes(&self) -> Option<&Vec<Vec<Vec<PageLocation>>>> {
         self.offset_indexes.as_ref()
     }
 }
diff --git a/parquet/src/file/page_index/index.rs 
b/parquet/src/file/page_index/index.rs
index e97826c63..5c0a7df84 100644
--- a/parquet/src/file/page_index/index.rs
+++ b/parquet/src/file/page_index/index.rs
@@ -56,6 +56,10 @@ pub enum Index {
     DOUBLE(NativeIndex<f64>),
     BYTE_ARRAY(ByteArrayIndex),
     FIXED_LEN_BYTE_ARRAY(ByteArrayIndex),
+    /// Sometimes reading page index from parquet file
+    /// will only return pageLocations without min_max index,
+    /// `None` represents this lack of index information
+    None,
 }
 
 /// An index of a column of [`Type`] physical representation
diff --git a/parquet/src/file/page_index/index_reader.rs 
b/parquet/src/file/page_index/index_reader.rs
index 841448090..616502139 100644
--- a/parquet/src/file/page_index/index_reader.rs
+++ b/parquet/src/file/page_index/index_reader.rs
@@ -101,13 +101,7 @@ fn get_index_offset_and_lengths(
         .iter()
         .map(|x| x.column_index_length())
         .map(|maybe_length| {
-            let index_length = maybe_length.ok_or_else(|| {
-                ParquetError::General(
-                    "The column_index_length must exist if offset_index_offset 
exists"
-                        .to_string(),
-                )
-            })?;
-
+            let index_length = maybe_length.unwrap_or(0);
             Ok(index_length.try_into().unwrap())
         })
         .collect::<Result<Vec<_>, ParquetError>>()?;
@@ -143,6 +137,9 @@ fn deserialize_column_index(
     data: &[u8],
     column_type: Type,
 ) -> Result<Index, ParquetError> {
+    if data.is_empty() {
+        return Ok(Index::None);
+    }
     let mut d = Cursor::new(data);
     let mut prot = TCompactInputProtocol::new(&mut d);
 
diff --git a/parquet/src/file/serialized_reader.rs 
b/parquet/src/file/serialized_reader.rs
index 1dfd1eb45..d3f5d7127 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -248,13 +248,17 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
         }
 
         if options.enable_page_index {
-            //Todo for now test data `data_index_bloom_encoding_stats.parquet` 
only have one rowgroup
-            //support multi after create multi-RG test data.
-            let cols = metadata.row_group(0);
-            let columns_indexes =
-                index_reader::read_columns_indexes(&chunk_reader, 
cols.columns())?;
-            let pages_locations =
-                index_reader::read_pages_locations(&chunk_reader, 
cols.columns())?;
+            let mut columns_indexes = vec![];
+            let mut offset_indexes = vec![];
+
+            for rg in &filtered_row_groups {
+                let column_index =
+                    index_reader::read_columns_indexes(&chunk_reader, 
rg.columns())?;
+                let offset_index =
+                    index_reader::read_pages_locations(&chunk_reader, 
rg.columns())?;
+                columns_indexes.push(column_index);
+                offset_indexes.push(offset_index);
+            }
 
             Ok(Self {
                 chunk_reader: Arc::new(chunk_reader),
@@ -262,7 +266,7 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
                     metadata.file_metadata().clone(),
                     filtered_row_groups,
                     Some(columns_indexes),
-                    Some(pages_locations),
+                    Some(offset_indexes),
                 ),
             })
         } else {
@@ -569,9 +573,11 @@ impl<T: Read + Send> PageReader for 
SerializedPageReader<T> {
 mod tests {
     use super::*;
     use crate::basic::{self, ColumnOrder};
-    use crate::file::page_index::index::Index;
+    use crate::data_type::private::ParquetValueType;
+    use crate::file::page_index::index::{ByteArrayIndex, Index, NativeIndex};
     use crate::record::RowAccessor;
     use crate::schema::parser::parse_message_type;
+    use crate::util::bit_util::from_le_slice;
     use crate::util::test_common::{get_test_file, get_test_path};
     use parquet_format::BoundaryOrder;
     use std::sync::Arc;
@@ -1085,7 +1091,7 @@ mod tests {
 
         // only one row group
         assert_eq!(page_indexes.len(), 1);
-        let index = if let Index::BYTE_ARRAY(index) = 
page_indexes.get(0).unwrap() {
+        let index = if let Index::BYTE_ARRAY(index) = &page_indexes[0][0] {
             index
         } else {
             unreachable!()
@@ -1097,7 +1103,7 @@ mod tests {
         //only one page group
         assert_eq!(index_in_pages.len(), 1);
 
-        let page0 = index_in_pages.get(0).unwrap();
+        let page0 = &index_in_pages[0];
         let min = page0.min.as_ref().unwrap();
         let max = page0.max.as_ref().unwrap();
         assert_eq!("Hello", std::str::from_utf8(min.as_slice()).unwrap());
@@ -1106,11 +1112,215 @@ mod tests {
         let offset_indexes = metadata.offset_indexes().unwrap();
         // only one row group
         assert_eq!(offset_indexes.len(), 1);
-        let offset_index = offset_indexes.get(0).unwrap();
-        let page_offset = offset_index.get(0).unwrap();
+        let offset_index = &offset_indexes[0];
+        let page_offset = &offset_index[0][0];
 
         assert_eq!(4, page_offset.offset);
         assert_eq!(152, page_offset.compressed_page_size);
         assert_eq!(0, page_offset.first_row_index);
     }
+
+    #[test]
+    fn test_page_index_reader_all_type() {
+        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
+        let builder = ReadOptionsBuilder::new();
+        //enable read page index
+        let options = builder.with_page_index().build();
+        let reader_result = SerializedFileReader::new_with_options(test_file, 
options);
+        let reader = reader_result.unwrap();
+
+        // Test contents in Parquet metadata
+        let metadata = reader.metadata();
+        assert_eq!(metadata.num_row_groups(), 1);
+
+        let page_indexes = metadata.page_indexes().unwrap();
+        let row_group_offset_indexes = &metadata.offset_indexes().unwrap()[0];
+
+        // only one row group
+        assert_eq!(page_indexes.len(), 1);
+        let row_group_metadata = metadata.row_group(0);
+
+        //col0->id: INT32 UNCOMPRESSED DO:0 FPO:4 SZ:37325/37325/1.00 VC:7300 
ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 7299, num_nulls: 0]
+        if let Index::INT32(index) = &page_indexes[0][0] {
+            check_native_page_index(
+                index,
+                325,
+                get_row_group_min_max_bytes(row_group_metadata, 0),
+                BoundaryOrder::Unordered,
+            );
+            assert_eq!(row_group_offset_indexes[0].len(), 325);
+        } else {
+            unreachable!()
+        };
+        //col1->bool_col:BOOLEAN UNCOMPRESSED DO:0 FPO:37329 SZ:3022/3022/1.00 
VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: false, max: true, num_nulls: 0]
+        if let Index::BOOLEAN(index) = &page_indexes[0][1] {
+            assert_eq!(index.indexes.len(), 82);
+            assert_eq!(row_group_offset_indexes[1].len(), 82);
+        } else {
+            unreachable!()
+        };
+        //col2->tinyint_col: INT32 UNCOMPRESSED DO:0 FPO:40351 
SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, 
num_nulls: 0]
+        if let Index::INT32(index) = &page_indexes[0][2] {
+            check_native_page_index(
+                index,
+                325,
+                get_row_group_min_max_bytes(row_group_metadata, 2),
+                BoundaryOrder::Ascending,
+            );
+            assert_eq!(row_group_offset_indexes[2].len(), 325);
+        } else {
+            unreachable!()
+        };
+        //col4->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 
SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, 
num_nulls: 0]
+        if let Index::INT32(index) = &page_indexes[0][3] {
+            check_native_page_index(
+                index,
+                325,
+                get_row_group_min_max_bytes(row_group_metadata, 3),
+                BoundaryOrder::Ascending,
+            );
+            assert_eq!(row_group_offset_indexes[3].len(), 325);
+        } else {
+            unreachable!()
+        };
+        //col5->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 
SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, 
num_nulls: 0]
+        if let Index::INT32(index) = &page_indexes[0][4] {
+            check_native_page_index(
+                index,
+                325,
+                get_row_group_min_max_bytes(row_group_metadata, 4),
+                BoundaryOrder::Ascending,
+            );
+            assert_eq!(row_group_offset_indexes[4].len(), 325);
+        } else {
+            unreachable!()
+        };
+        //col6->bigint_col: INT64 UNCOMPRESSED DO:0 FPO:152326 
SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 90, 
num_nulls: 0]
+        if let Index::INT64(index) = &page_indexes[0][5] {
+            check_native_page_index(
+                index,
+                528,
+                get_row_group_min_max_bytes(row_group_metadata, 5),
+                BoundaryOrder::Unordered,
+            );
+            assert_eq!(row_group_offset_indexes[5].len(), 528);
+        } else {
+            unreachable!()
+        };
+        //col7->float_col: FLOAT UNCOMPRESSED DO:0 FPO:223924 
SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 9.9, 
num_nulls: 0]
+        if let Index::FLOAT(index) = &page_indexes[0][6] {
+            check_native_page_index(
+                index,
+                325,
+                get_row_group_min_max_bytes(row_group_metadata, 6),
+                BoundaryOrder::Ascending,
+            );
+            assert_eq!(row_group_offset_indexes[6].len(), 325);
+        } else {
+            unreachable!()
+        };
+        //col8->double_col: DOUBLE UNCOMPRESSED DO:0 FPO:261249 
SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 
90.89999999999999, num_nulls: 0]
+        if let Index::DOUBLE(index) = &page_indexes[0][7] {
+            check_native_page_index(
+                index,
+                528,
+                get_row_group_min_max_bytes(row_group_metadata, 7),
+                BoundaryOrder::Unordered,
+            );
+            assert_eq!(row_group_offset_indexes[7].len(), 528);
+        } else {
+            unreachable!()
+        };
+        //col9->date_string_col: BINARY UNCOMPRESSED DO:0 FPO:332847 
SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 01/01/09, max: 
12/31/10, num_nulls: 0]
+        if let Index::BYTE_ARRAY(index) = &page_indexes[0][8] {
+            check_bytes_page_index(
+                index,
+                974,
+                get_row_group_min_max_bytes(row_group_metadata, 8),
+                BoundaryOrder::Unordered,
+            );
+            assert_eq!(row_group_offset_indexes[8].len(), 974);
+        } else {
+            unreachable!()
+        };
+        //col10->string_col: BINARY UNCOMPRESSED DO:0 FPO:444795 
SZ:45298/45298/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, 
num_nulls: 0]
+        if let Index::BYTE_ARRAY(index) = &page_indexes[0][9] {
+            check_bytes_page_index(
+                index,
+                352,
+                get_row_group_min_max_bytes(row_group_metadata, 9),
+                BoundaryOrder::Ascending,
+            );
+            assert_eq!(row_group_offset_indexes[9].len(), 352);
+        } else {
+            unreachable!()
+        };
+        //col11->timestamp_col: INT96 UNCOMPRESSED DO:0 FPO:490093 
SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[num_nulls: 0, 
min/max not defined]
+        //Notice: min_max values for each page for this col not exits.
+        if let Index::None = &page_indexes[0][10] {
+            assert_eq!(row_group_offset_indexes[10].len(), 974);
+        } else {
+            unreachable!()
+        };
+        //col12->year: INT32 UNCOMPRESSED DO:0 FPO:602041 SZ:37325/37325/1.00 
VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 2009, max: 2010, num_nulls: 0]
+        if let Index::INT32(index) = &page_indexes[0][11] {
+            check_native_page_index(
+                index,
+                325,
+                get_row_group_min_max_bytes(row_group_metadata, 11),
+                BoundaryOrder::Ascending,
+            );
+            assert_eq!(row_group_offset_indexes[11].len(), 325);
+        } else {
+            unreachable!()
+        };
+        //col13->month: INT32 UNCOMPRESSED DO:0 FPO:639366 SZ:37325/37325/1.00 
VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 1, max: 12, num_nulls: 0]
+        if let Index::INT32(index) = &page_indexes[0][12] {
+            check_native_page_index(
+                index,
+                325,
+                get_row_group_min_max_bytes(row_group_metadata, 12),
+                BoundaryOrder::Unordered,
+            );
+            assert_eq!(row_group_offset_indexes[12].len(), 325);
+        } else {
+            unreachable!()
+        };
+    }
+
+    fn check_native_page_index<T: ParquetValueType>(
+        row_group_index: &NativeIndex<T>,
+        page_size: usize,
+        min_max: (&[u8], &[u8]),
+        boundary_order: BoundaryOrder,
+    ) {
+        assert_eq!(row_group_index.indexes.len(), page_size);
+        assert_eq!(row_group_index.boundary_order, boundary_order);
+        row_group_index.indexes.iter().all(|x| {
+            x.min.as_ref().unwrap() >= &from_le_slice::<T>(min_max.0)
+                && x.max.as_ref().unwrap() <= &from_le_slice::<T>(min_max.1)
+        });
+    }
+
+    fn check_bytes_page_index(
+        row_group_index: &ByteArrayIndex,
+        page_size: usize,
+        min_max: (&[u8], &[u8]),
+        boundary_order: BoundaryOrder,
+    ) {
+        assert_eq!(row_group_index.indexes.len(), page_size);
+        assert_eq!(row_group_index.boundary_order, boundary_order);
+        row_group_index.indexes.iter().all(|x| {
+            x.min.as_ref().unwrap().as_slice() >= min_max.0
+                && x.max.as_ref().unwrap().as_slice() <= min_max.1
+        });
+    }
+
+    fn get_row_group_min_max_bytes(
+        r: &RowGroupMetaData,
+        col_num: usize,
+    ) -> (&[u8], &[u8]) {
+        let statistics = r.column(col_num).statistics().unwrap();
+        (statistics.min_bytes(), statistics.max_bytes())
+    }
 }

Reply via email to