This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch 53.0.0-dev
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/53.0.0-dev by this push:
new 3bc998792 Deprecate read_page_locations() and simplify offset index in `ParquetMetaData` (#6095)
3bc998792 is described below
commit 3bc998792b19ba20285ca82765991f03c7fa845a
Author: Ed Seidl <[email protected]>
AuthorDate: Tue Jul 23 12:54:10 2024 -0700
Deprecate read_page_locations() and simplify offset index in `ParquetMetaData` (#6095)
* deprecate read_page_locations
* add to_thrift() to OffsetIndexMetaData
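
For reference, a minimal caller-side sketch of the migration (the helper
`page_counts` is hypothetical; paths and signatures follow the usage in
this diff):

    use parquet::errors::Result;
    use parquet::file::metadata::RowGroupMetaData;
    use parquet::file::page_index::index_reader;
    use parquet::file::reader::ChunkReader;

    // Old (deprecated): read_pages_locations returned Vec<Vec<PageLocation>>,
    // so page counts were locations[col].len().
    // New: read_offset_indexes returns one OffsetIndexMetaData per column
    // chunk, bundling page locations with unencoded_byte_array_data_bytes.
    fn page_counts<R: ChunkReader>(reader: &R, rg: &RowGroupMetaData) -> Result<Vec<usize>> {
        let offset_indexes = index_reader::read_offset_indexes(reader, rg.columns())?;
        Ok(offset_indexes
            .iter()
            .map(|oi| oi.page_locations().len())
            .collect())
    }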
---
parquet/src/arrow/arrow_reader/mod.rs | 4 +-
parquet/src/arrow/arrow_reader/statistics.rs | 14 +++++--
parquet/src/arrow/arrow_writer/mod.rs | 14 +++----
parquet/src/arrow/async_reader/metadata.rs | 8 +---
parquet/src/arrow/async_reader/mod.rs | 24 ++++++------
parquet/src/bin/parquet-index.rs | 12 +++---
parquet/src/file/metadata/memory.rs | 7 ++++
parquet/src/file/metadata/mod.rs | 54 +++++++++++----------------
parquet/src/file/page_index/index_reader.rs | 1 +
parquet/src/file/page_index/offset_index.rs | 9 +++++
parquet/src/file/serialized_reader.rs | 56 ++++++++++++----------------
parquet/src/file/writer.rs | 18 ++++-----
parquet/tests/arrow_writer_layout.rs | 10 +++--
13 files changed, 118 insertions(+), 113 deletions(-)
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index e9edf7cb7..22a4e5a90 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -394,7 +394,7 @@ impl ArrowReaderMetadata {
let offset_index = metadata
.row_groups()
.iter()
- .map(|rg| index_reader::read_pages_locations(reader, rg.columns()))
+ .map(|rg| index_reader::read_offset_indexes(reader, rg.columns()))
.collect::<Result<Vec<_>>>()?;
metadata.set_offset_index(Some(offset_index))
@@ -689,7 +689,7 @@ impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
// To avoid `i[rg_idx][self.column_idx]` panic, we need to filter out empty `i[rg_idx]`.
let page_locations = offset_index
.filter(|i| !i[rg_idx].is_empty())
- .map(|i| i[rg_idx][self.column_idx].clone());
+ .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
let total_rows = rg.num_rows() as usize;
let reader = self.reader.clone();
diff --git a/parquet/src/arrow/arrow_reader/statistics.rs b/parquet/src/arrow/arrow_reader/statistics.rs
index d536792b8..6e2a6fafc 100644
--- a/parquet/src/arrow/arrow_reader/statistics.rs
+++ b/parquet/src/arrow/arrow_reader/statistics.rs
@@ -1349,7 +1349,9 @@ impl<'a> StatisticsConverter<'a> {
let iter = row_group_indices.into_iter().map(|rg_index| {
let column_page_index_per_row_group_per_column =
&column_page_index[*rg_index][parquet_index];
- let num_data_pages = &column_offset_index[*rg_index][parquet_index].len();
+ let num_data_pages = &column_offset_index[*rg_index][parquet_index]
+ .page_locations()
+ .len();
(*num_data_pages, column_page_index_per_row_group_per_column)
});
@@ -1378,7 +1380,9 @@ impl<'a> StatisticsConverter<'a> {
let iter = row_group_indices.into_iter().map(|rg_index| {
let column_page_index_per_row_group_per_column =
&column_page_index[*rg_index][parquet_index];
- let num_data_pages = &column_offset_index[*rg_index][parquet_index].len();
+ let num_data_pages = &column_offset_index[*rg_index][parquet_index]
+ .page_locations()
+ .len();
(*num_data_pages, column_page_index_per_row_group_per_column)
});
@@ -1408,7 +1412,9 @@ impl<'a> StatisticsConverter<'a> {
let iter = row_group_indices.into_iter().map(|rg_index| {
let column_page_index_per_row_group_per_column =
&column_page_index[*rg_index][parquet_index];
- let num_data_pages = &column_offset_index[*rg_index][parquet_index].len();
+ let num_data_pages = &column_offset_index[*rg_index][parquet_index]
+ .page_locations()
+ .len();
(*num_data_pages, column_page_index_per_row_group_per_column)
});
@@ -1450,7 +1456,7 @@ impl<'a> StatisticsConverter<'a> {
let mut row_count_total = Vec::new();
for rg_idx in row_group_indices {
- let page_locations = &column_offset_index[*rg_idx][parquet_index];
+ let page_locations = &column_offset_index[*rg_idx][parquet_index].page_locations();
let row_count_per_page = page_locations
.windows(2)
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 070d74009..8f7b514cc 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -1096,7 +1096,7 @@ mod tests {
use crate::data_type::AsBytes;
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::index::Index;
- use crate::file::page_index::index_reader::read_pages_locations;
+ use crate::file::page_index::index_reader::read_offset_indexes;
use crate::file::properties::{
BloomFilterPosition, EnabledStatistics, ReaderProperties, WriterVersion,
};
@@ -1669,16 +1669,16 @@ mod tests {
"Expected a dictionary page"
);
- let page_locations = read_pages_locations(&file, column).unwrap();
+ let offset_indexes = read_offset_indexes(&file, column).unwrap();
- let offset_index = page_locations[0].clone();
+ let page_locations = offset_indexes[0].page_locations.clone();
// We should fallback to PLAIN encoding after the first row and our max page size is 1 bytes
// so we expect one dictionary encoded page and then a page per row thereafter.
assert_eq!(
- offset_index.len(),
+ page_locations.len(),
10,
- "Expected 9 pages but got {offset_index:#?}"
+ "Expected 9 pages but got {page_locations:#?}"
);
}
@@ -3020,8 +3020,8 @@ mod tests {
assert_eq!(index.len(), 1);
assert_eq!(index[0].len(), 2); // 2 columns
- assert_eq!(index[0][0].len(), 1); // 1 page
- assert_eq!(index[0][1].len(), 1); // 1 page
+ assert_eq!(index[0][0].page_locations().len(), 1); // 1 page
+ assert_eq!(index[0][1].page_locations().len(), 1); // 1 page
}
#[test]
diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs
index 4a3489a20..9224ea3f6 100644
--- a/parquet/src/arrow/async_reader/metadata.rs
+++ b/parquet/src/arrow/async_reader/metadata.rs
@@ -20,9 +20,7 @@ use crate::errors::{ParquetError, Result};
use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::index::Index;
-use crate::file::page_index::index_reader::{
- acc_range, decode_column_index, decode_page_locations,
-};
+use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
@@ -179,9 +177,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
x.columns()
.iter()
.map(|c| match c.offset_index_range() {
- Some(r) => {
- decode_page_locations(&data[r.start - offset..r.end - offset])
- }
+ Some(r) => decode_offset_index(&data[r.start - offset..r.end - offset]),
None => Err(general_err!("missing offset index")),
})
.collect::<Result<Vec<_>>>()
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 5a790fa6a..5695dbc10 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -106,9 +106,10 @@ use crate::column::page::{PageIterator, PageReader};
use crate::errors::{ParquetError, Result};
use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
use crate::file::FOOTER_SIZE;
-use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, PageLocation};
+use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
mod metadata;
pub use metadata::*;
@@ -489,7 +490,7 @@ where
// TODO: calling build_array multiple times is wasteful
let meta = self.metadata.row_group(row_group_idx);
- let page_locations = self
+ let offset_index = self
.metadata
.offset_index()
.map(|x| x[row_group_idx].as_slice());
@@ -499,7 +500,7 @@ where
// schema: meta.schema_descr_ptr(),
row_count: meta.num_rows() as usize,
column_chunks: vec![None; meta.columns().len()],
- page_locations,
+ offset_index,
};
if let Some(filter) = self.filter.as_mut() {
@@ -703,7 +704,7 @@ where
/// An in-memory collection of column chunks
struct InMemoryRowGroup<'a> {
metadata: &'a RowGroupMetaData,
- page_locations: Option<&'a [Vec<PageLocation>]>,
+ offset_index: Option<&'a [OffsetIndexMetaData]>,
column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
row_count: usize,
}
@@ -716,7 +717,7 @@ impl<'a> InMemoryRowGroup<'a> {
projection: &ProjectionMask,
selection: Option<&RowSelection>,
) -> Result<()> {
- if let Some((selection, page_locations)) = selection.zip(self.page_locations) {
+ if let Some((selection, offset_index)) = selection.zip(self.offset_index) {
// If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
// `RowSelection`
let mut page_start_offsets: Vec<Vec<usize>> = vec![];
@@ -734,14 +735,14 @@ impl<'a> InMemoryRowGroup<'a> {
// then we need to also fetch a dictionary page.
let mut ranges = vec![];
let (start, _len) = chunk_meta.byte_range();
- match page_locations[idx].first() {
+ match offset_index[idx].page_locations.first() {
Some(first) if first.offset as u64 != start => {
ranges.push(start as usize..first.offset as usize);
}
_ => (),
}
- ranges.extend(selection.scan_ranges(&page_locations[idx]));
+ ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
page_start_offsets.push(ranges.iter().map(|range| range.start).collect());
ranges
@@ -812,7 +813,9 @@ impl<'a> RowGroups for InMemoryRowGroup<'a> {
"Invalid column index {i}, column was not fetched"
))),
Some(data) => {
- let page_locations = self.page_locations.map(|index| index[i].clone());
+ let page_locations = self
+ .offset_index
+ .map(|index| index[i].page_locations.clone());
let page_reader: Box<dyn PageReader> = Box::new(SerializedPageReader::new(
data.clone(),
self.metadata.column(i),
@@ -1529,7 +1532,7 @@ mod tests {
let metadata = parse_metadata(&data).unwrap();
let offset_index =
- index_reader::read_pages_locations(&data, metadata.row_group(0).columns())
+ index_reader::read_offset_indexes(&data, metadata.row_group(0).columns())
.expect("reading offset index");
let row_group_meta = metadata.row_group(0).clone();
@@ -1538,7 +1541,6 @@ mod tests {
vec![row_group_meta],
None,
Some(vec![offset_index.clone()]),
- None,
);
let metadata = Arc::new(metadata);
@@ -1575,7 +1577,7 @@ mod tests {
};
let mut skip = true;
- let mut pages = offset_index[0].iter().peekable();
+ let mut pages = offset_index[0].page_locations.iter().peekable();
// Setup `RowSelection` so that we can skip every other page, selecting the last page
let mut selectors = vec![];
diff --git a/parquet/src/bin/parquet-index.rs b/parquet/src/bin/parquet-index.rs
index 86e08b6da..1a9b74dd7 100644
--- a/parquet/src/bin/parquet-index.rs
+++ b/parquet/src/bin/parquet-index.rs
@@ -37,6 +37,7 @@
use clap::Parser;
use parquet::errors::{ParquetError, Result};
use parquet::file::page_index::index::{Index, PageIndex};
+use parquet::file::page_index::offset_index::OffsetIndexMetaData;
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::file::serialized_reader::ReadOptionsBuilder;
use parquet::format::PageLocation;
@@ -93,7 +94,8 @@ impl Args {
))
})?;
- let row_counts = compute_row_counts(offset_index, row_group.num_rows());
+ let row_counts =
+ compute_row_counts(offset_index.page_locations.as_slice(), row_group.num_rows());
match &column_indices[column_idx] {
Index::NONE => println!("NO INDEX"),
Index::BOOLEAN(v) => print_index(&v.indexes, offset_index, &row_counts)?,
@@ -131,20 +133,20 @@ fn compute_row_counts(offset_index: &[PageLocation], rows: i64) -> Vec<i64> {
/// Prints index information for a single column chunk
fn print_index<T: std::fmt::Display>(
column_index: &[PageIndex<T>],
- offset_index: &[PageLocation],
+ offset_index: &OffsetIndexMetaData,
row_counts: &[i64],
) -> Result<()> {
- if column_index.len() != offset_index.len() {
+ if column_index.len() != offset_index.page_locations.len() {
return Err(ParquetError::General(format!(
"Index length mismatch, got {} and {}",
column_index.len(),
- offset_index.len()
+ offset_index.page_locations.len()
)));
}
for (idx, ((c, o), row_count)) in column_index
.iter()
- .zip(offset_index)
+ .zip(offset_index.page_locations())
.zip(row_counts)
.enumerate()
{
diff --git a/parquet/src/file/metadata/memory.rs b/parquet/src/file/metadata/memory.rs
index a1b40a8de..a7d3d4ab8 100644
--- a/parquet/src/file/metadata/memory.rs
+++ b/parquet/src/file/metadata/memory.rs
@@ -23,6 +23,7 @@ use crate::data_type::private::ParquetValueType;
use crate::file::metadata::{ColumnChunkMetaData, FileMetaData, KeyValue, RowGroupMetaData};
use crate::file::page_encoding_stats::PageEncodingStats;
use crate::file::page_index::index::{Index, NativeIndex, PageIndex};
+use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::statistics::{Statistics, ValueStatistics};
use crate::format::{BoundaryOrder, PageLocation, SortingColumn};
use std::sync::Arc;
@@ -144,6 +145,12 @@ impl HeapSize for Statistics {
}
}
+impl HeapSize for OffsetIndexMetaData {
+ fn heap_size(&self) -> usize {
+ self.page_locations.heap_size() + self.unencoded_byte_array_data_bytes.heap_size()
+ }
+}
+
impl HeapSize for Index {
fn heap_size(&self) -> usize {
match self {
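
The new impl lets `ParquetMetaData::memory_size()` account for the offset
index in one place; a usage sketch (the printed figure is only an estimate
and varies by platform):

    use parquet::file::metadata::ParquetMetaData;

    // Sketch: report the estimated memory retained by parsed metadata,
    // which now includes the offset index's page locations and the
    // unencoded byte sizes via HeapSize for OffsetIndexMetaData.
    fn report_size(meta: &ParquetMetaData) {
        println!("metadata memory estimate: {} bytes", meta.memory_size());
    }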
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index d86b1ce57..9a3ca6058 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -44,6 +44,7 @@ use crate::errors::{ParquetError, Result};
pub(crate) use crate::file::metadata::memory::HeapSize;
use crate::file::page_encoding_stats::{self, PageEncodingStats};
use crate::file::page_index::index::Index;
+use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::statistics::{self, Statistics};
use crate::schema::types::{
ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, SchemaDescriptor,
@@ -56,20 +57,19 @@ use crate::schema::types::{
/// [`Index`] corresponding to column `column_number` of row group
/// `row_group_number`.
///
-/// For example `column_index[2][3]` holds the [`Index`] for the forth
+/// For example `column_index[2][3]` holds the [`Index`] for the fourth
/// column in the third row group of the parquet file.
pub type ParquetColumnIndex = Vec<Vec<Index>>;
-/// [`PageLocation`] for each data page of each row group of each column.
+/// [`OffsetIndexMetaData`] for each row group of each column.
///
-/// `offset_index[row_group_number][column_number][page_number]` holds
-/// the [`PageLocation`] corresponding to page `page_number` of column
+/// `offset_index[row_group_number][column_number]` holds
+/// the [`OffsetIndexMetaData`] corresponding to column
/// `column_number`of row group `row_group_number`.
///
-/// For example `offset_index[2][3][4]` holds the [`PageLocation`] for
-/// the fifth page of the forth column in the third row group of the
-/// parquet file.
-pub type ParquetOffsetIndex = Vec<Vec<Vec<PageLocation>>>;
+/// For example `offset_index[2][3]` holds the [`OffsetIndexMetaData`] for
+/// the fourth column in the third row group of the parquet file.
+pub type ParquetOffsetIndex = Vec<Vec<OffsetIndexMetaData>>;
/// Parsed metadata for a single Parquet file
///
@@ -94,10 +94,8 @@ pub struct ParquetMetaData {
row_groups: Vec<RowGroupMetaData>,
/// Page level index for each page in each column chunk
column_index: Option<ParquetColumnIndex>,
- /// Offset index for all each page in each column chunk
+ /// Offset index for each page in each column chunk
offset_index: Option<ParquetOffsetIndex>,
- /// `unencoded_byte_array_data_bytes` from the offset index
- unencoded_byte_array_data_bytes: Option<Vec<Vec<Option<Vec<i64>>>>>,
}
impl ParquetMetaData {
@@ -109,7 +107,6 @@ impl ParquetMetaData {
row_groups,
column_index: None,
offset_index: None,
- unencoded_byte_array_data_bytes: None,
}
}
@@ -120,14 +117,12 @@ impl ParquetMetaData {
row_groups: Vec<RowGroupMetaData>,
column_index: Option<ParquetColumnIndex>,
offset_index: Option<ParquetOffsetIndex>,
- unencoded_byte_array_data_bytes: Option<Vec<Vec<Option<Vec<i64>>>>>,
) -> Self {
ParquetMetaData {
file_metadata,
row_groups,
column_index,
offset_index,
- unencoded_byte_array_data_bytes,
}
}
@@ -184,19 +179,6 @@ impl ParquetMetaData {
self.offset_index.as_ref()
}
- /// Returns `unencoded_byte_array_data_bytes` from the offset indexes in this file, if loaded
- ///
- /// This value represents the output size of the total bytes in this file, which can be useful for
- /// allocating an appropriately sized output buffer.
- ///
- /// Returns `None` if the parquet file does not have a `OffsetIndex` or
- /// [ArrowReaderOptions::with_page_index] was set to false.
- ///
- /// [ArrowReaderOptions::with_page_index]: https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index
- pub fn unencoded_byte_array_data_bytes(&self) -> Option<&Vec<Vec<Option<Vec<i64>>>>> {
- self.unencoded_byte_array_data_bytes.as_ref()
- }
-
/// Estimate of the bytes allocated to store `ParquetMetadata`
///
/// # Notes:
@@ -217,7 +199,6 @@ impl ParquetMetaData {
+ self.row_groups.heap_size()
+ self.column_index.heap_size()
+ self.offset_index.heap_size()
- + self.unencoded_byte_array_data_bytes.heap_size()
}
/// Override the column index
@@ -1364,7 +1345,7 @@ mod tests {
column_orders,
);
let parquet_meta = ParquetMetaData::new(file_metadata.clone(), row_group_meta.clone());
- let base_expected_size = 1376;
+ let base_expected_size = 1352;
assert_eq!(parquet_meta.memory_size(), base_expected_size);
let mut column_index = ColumnIndexBuilder::new();
@@ -1373,18 +1354,25 @@ mod tests {
let native_index = NativeIndex::<bool>::try_new(column_index).unwrap();
// Now, add in OffsetIndex
+ let mut offset_index = OffsetIndexBuilder::new();
+ offset_index.append_row_count(1);
+ offset_index.append_offset_and_size(2, 3);
+ offset_index.append_unencoded_byte_array_data_bytes(Some(10));
+ offset_index.append_row_count(1);
+ offset_index.append_offset_and_size(2, 3);
+ offset_index.append_unencoded_byte_array_data_bytes(Some(10));
+ let offset_index = offset_index.build_to_thrift();
+
let parquet_meta = ParquetMetaData::new_with_page_index(
file_metadata,
row_group_meta,
Some(vec![vec![Index::BOOLEAN(native_index)]]),
Some(vec![vec![
- vec![PageLocation::new(1, 2, 3)],
- vec![PageLocation::new(1, 2, 3)],
+ OffsetIndexMetaData::try_new(offset_index).unwrap()
]]),
- Some(vec![vec![Some(vec![10, 20, 30])]]),
);
- let bigger_expected_size = 2464;
+ let bigger_expected_size = 2400;
// more set fields means more memory usage
assert!(bigger_expected_size > base_expected_size);
assert_eq!(parquet_meta.memory_size(), bigger_expected_size);
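
With the flattened type, consuming code indexes the offset index by row
group and column only; a small sketch of reading it back (illustrative,
assuming `metadata` was loaded with the page index enabled):

    use parquet::file::metadata::ParquetMetaData;

    // offset_index[rg][col] is now one OffsetIndexMetaData per column chunk
    // rather than a bare Vec<PageLocation>.
    fn first_page_offset(metadata: &ParquetMetaData, rg: usize, col: usize) -> Option<i64> {
        let offset_index = metadata.offset_index()?; // None if no page index was read
        let column = &offset_index[rg][col];
        // Page locations and unencoded byte sizes now travel together.
        let _unenc_sizes = column.unencoded_byte_array_data_bytes();
        column.page_locations().first().map(|loc| loc.offset)
    }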
diff --git a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs
index 7959cb95c..395e9afe1 100644
--- a/parquet/src/file/page_index/index_reader.rs
+++ b/parquet/src/file/page_index/index_reader.rs
@@ -85,6 +85,7 @@ pub fn read_columns_indexes<R: ChunkReader>(
/// See [Page Index Documentation] for more details.
///
/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
+#[deprecated(since = "53.0.0", note = "Use read_offset_indexes")]
pub fn read_pages_locations<R: ChunkReader>(
reader: &R,
chunks: &[ColumnChunkMetaData],
diff --git a/parquet/src/file/page_index/offset_index.rs b/parquet/src/file/page_index/offset_index.rs
index 9138620e3..2ae346414 100644
--- a/parquet/src/file/page_index/offset_index.rs
+++ b/parquet/src/file/page_index/offset_index.rs
@@ -47,4 +47,13 @@ impl OffsetIndexMetaData {
pub fn unencoded_byte_array_data_bytes(&self) -> Option<&Vec<i64>> {
self.unencoded_byte_array_data_bytes.as_ref()
}
+
+ // TODO: remove annotation after merge
+ #[allow(dead_code)]
+ pub(crate) fn to_thrift(&self) -> OffsetIndex {
+ OffsetIndex::new(
+ self.page_locations.clone(),
+ self.unencoded_byte_array_data_bytes.clone(),
+ )
+ }
}
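
For crate-internal context, `to_thrift()` is the inverse of `try_new`; a
round-trip sketch (usable only inside the parquet crate while to_thrift
remains pub(crate)):

    use crate::file::page_index::offset_index::OffsetIndexMetaData;
    use crate::format::{OffsetIndex, PageLocation};

    fn round_trip() -> crate::errors::Result<()> {
        // PageLocation::new(offset, compressed_page_size, first_row_index)
        let thrift = OffsetIndex::new(vec![PageLocation::new(4, 152, 0)], Some(vec![10]));
        let meta = OffsetIndexMetaData::try_new(thrift.clone())?;
        // to_thrift() rebuilds the thrift struct from the parsed metadata.
        assert_eq!(meta.to_thrift(), thrift);
        Ok(())
    }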
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 65b6ebf2e..70aea6fd5 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -28,6 +28,7 @@ use crate::column::page::{Page, PageMetadata, PageReader};
use crate::compression::{create_codec, Codec};
use crate::errors::{ParquetError, Result};
use crate::file::page_index::index_reader;
+use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::{
footer,
metadata::*,
@@ -211,22 +212,12 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
if options.enable_page_index {
let mut columns_indexes = vec![];
let mut offset_indexes = vec![];
- let mut unenc_byte_sizes = vec![];
for rg in &mut filtered_row_groups {
let column_index =
index_reader::read_columns_indexes(&chunk_reader, rg.columns())?;
let offset_index =
index_reader::read_offset_indexes(&chunk_reader, rg.columns())?;
-
- // split offset_index into two vectors to not break API
- let mut page_locations = vec![];
- let mut unenc_bytes = vec![];
- offset_index.into_iter().for_each(|index| {
- page_locations.push(index.page_locations);
- unenc_bytes.push(index.unencoded_byte_array_data_bytes);
- });
columns_indexes.push(column_index);
- offset_indexes.push(page_locations);
- unenc_byte_sizes.push(unenc_bytes);
+ offset_indexes.push(offset_index);
}
Ok(Self {
@@ -236,7 +227,6 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
filtered_row_groups,
Some(columns_indexes),
Some(offset_indexes),
- Some(unenc_byte_sizes),
)),
props: Arc::new(options.props),
})
@@ -296,7 +286,7 @@ impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
chunk_reader: Arc<R>,
metadata: &'a RowGroupMetaData,
- page_locations: Option<&'a [Vec<PageLocation>]>,
+ offset_index: Option<&'a [OffsetIndexMetaData]>,
props: ReaderPropertiesPtr,
bloom_filters: Vec<Option<Sbbf>>,
}
@@ -306,7 +296,7 @@ impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
pub fn new(
chunk_reader: Arc<R>,
metadata: &'a RowGroupMetaData,
- page_locations: Option<&'a [Vec<PageLocation>]>,
+ offset_index: Option<&'a [OffsetIndexMetaData]>,
props: ReaderPropertiesPtr,
) -> Result<Self> {
let bloom_filters = if props.read_bloom_filter() {
@@ -321,7 +311,7 @@ impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
Ok(Self {
chunk_reader,
metadata,
- page_locations,
+ offset_index,
props,
bloom_filters,
})
@@ -341,7 +331,7 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'
fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
let col = self.metadata.column(i);
- let page_locations = self.page_locations.map(|x| x[i].clone());
+ let page_locations = self.offset_index.map(|x| x[i].page_locations.clone());
let props = Arc::clone(&self.props);
Ok(Box::new(SerializedPageReader::new_with_properties(
@@ -787,7 +777,7 @@ mod tests {
use crate::data_type::private::ParquetValueType;
use crate::data_type::{AsBytes, FixedLenByteArrayType};
use crate::file::page_index::index::{Index, NativeIndex};
- use crate::file::page_index::index_reader::{read_columns_indexes, read_pages_locations};
+ use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
use crate::file::writer::SerializedFileWriter;
use crate::record::RowAccessor;
use crate::schema::parser::parse_message_type;
@@ -1325,7 +1315,7 @@ mod tests {
// only one row group
assert_eq!(offset_indexes.len(), 1);
let offset_index = &offset_indexes[0];
- let page_offset = &offset_index[0][0];
+ let page_offset = &offset_index[0].page_locations()[0];
assert_eq!(4, page_offset.offset);
assert_eq!(152, page_offset.compressed_page_size);
@@ -1348,8 +1338,8 @@ mod tests {
b.reverse();
assert_eq!(a, b);
- let a = read_pages_locations(&test_file, columns).unwrap();
- let mut b = read_pages_locations(&test_file, &reversed).unwrap();
+ let a = read_offset_indexes(&test_file, columns).unwrap();
+ let mut b = read_offset_indexes(&test_file, &reversed).unwrap();
b.reverse();
assert_eq!(a, b);
}
@@ -1386,7 +1376,7 @@ mod tests {
get_row_group_min_max_bytes(row_group_metadata, 0),
BoundaryOrder::UNORDERED,
);
- assert_eq!(row_group_offset_indexes[0].len(), 325);
+ assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325);
} else {
unreachable!()
};
@@ -1394,7 +1384,7 @@ mod tests {
assert!(&column_index[0][1].is_sorted());
if let Index::BOOLEAN(index) = &column_index[0][1] {
assert_eq!(index.indexes.len(), 82);
- assert_eq!(row_group_offset_indexes[1].len(), 82);
+ assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82);
} else {
unreachable!()
};
@@ -1407,7 +1397,7 @@ mod tests {
get_row_group_min_max_bytes(row_group_metadata, 2),
BoundaryOrder::ASCENDING,
);
- assert_eq!(row_group_offset_indexes[2].len(), 325);
+ assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325);
} else {
unreachable!()
};
@@ -1420,7 +1410,7 @@ mod tests {
get_row_group_min_max_bytes(row_group_metadata, 3),
BoundaryOrder::ASCENDING,
);
- assert_eq!(row_group_offset_indexes[3].len(), 325);
+ assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325);
} else {
unreachable!()
};
@@ -1433,7 +1423,7 @@ mod tests {
get_row_group_min_max_bytes(row_group_metadata, 4),
BoundaryOrder::ASCENDING,
);
- assert_eq!(row_group_offset_indexes[4].len(), 325);
+ assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325);
} else {
unreachable!()
};
@@ -1446,7 +1436,7 @@ mod tests {
get_row_group_min_max_bytes(row_group_metadata, 5),
BoundaryOrder::UNORDERED,
);
- assert_eq!(row_group_offset_indexes[5].len(), 528);
+ assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528);
} else {
unreachable!()
};
@@ -1459,7 +1449,7 @@ mod tests {
get_row_group_min_max_bytes(row_group_metadata, 6),
BoundaryOrder::ASCENDING,
);
- assert_eq!(row_group_offset_indexes[6].len(), 325);
+ assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325);
} else {
unreachable!()
};
@@ -1472,7 +1462,7 @@ mod tests {
get_row_group_min_max_bytes(row_group_metadata, 7),
BoundaryOrder::UNORDERED,
);
- assert_eq!(row_group_offset_indexes[7].len(), 528);
+ assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528);
} else {
unreachable!()
};
@@ -1485,7 +1475,7 @@ mod tests {
get_row_group_min_max_bytes(row_group_metadata, 8),
BoundaryOrder::UNORDERED,
);
- assert_eq!(row_group_offset_indexes[8].len(), 974);
+ assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974);
} else {
unreachable!()
};
@@ -1498,7 +1488,7 @@ mod tests {
get_row_group_min_max_bytes(row_group_metadata, 9),
BoundaryOrder::ASCENDING,
);
- assert_eq!(row_group_offset_indexes[9].len(), 352);
+ assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352);
} else {
unreachable!()
};
@@ -1506,7 +1496,7 @@ mod tests {
//Notice: min_max values for each page for this col not exits.
assert!(!&column_index[0][10].is_sorted());
if let Index::NONE = &column_index[0][10] {
- assert_eq!(row_group_offset_indexes[10].len(), 974);
+ assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974);
} else {
unreachable!()
};
@@ -1519,7 +1509,7 @@ mod tests {
get_row_group_min_max_bytes(row_group_metadata, 11),
BoundaryOrder::ASCENDING,
);
- assert_eq!(row_group_offset_indexes[11].len(), 325);
+ assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325);
} else {
unreachable!()
};
@@ -1532,7 +1522,7 @@ mod tests {
get_row_group_min_max_bytes(row_group_metadata, 12),
BoundaryOrder::UNORDERED,
);
- assert_eq!(row_group_offset_indexes[12].len(), 325);
+ assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325);
} else {
unreachable!()
};
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index b109a2da8..f7dde59f4 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -1920,15 +1920,15 @@ mod tests {
column.unencoded_byte_array_data_bytes().unwrap()
);
- assert!(reader
- .metadata()
- .unencoded_byte_array_data_bytes()
- .is_some());
- let unenc_sizes = reader.metadata().unencoded_byte_array_data_bytes().unwrap();
- assert_eq!(unenc_sizes.len(), 1);
- assert_eq!(unenc_sizes[0].len(), 1);
- assert!(unenc_sizes[0][0].is_some());
- let page_sizes = unenc_sizes[0][0].as_ref().unwrap();
+ assert!(reader.metadata().offset_index().is_some());
+ let offset_index = reader.metadata().offset_index().unwrap();
+ assert_eq!(offset_index.len(), 1);
+ assert_eq!(offset_index[0].len(), 1);
+ assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_some());
+ let page_sizes = offset_index[0][0]
+ .unencoded_byte_array_data_bytes
+ .as_ref()
+ .unwrap();
assert_eq!(page_sizes.len(), 1);
assert_eq!(page_sizes[0], unenc_size);
}
diff --git a/parquet/tests/arrow_writer_layout.rs b/parquet/tests/arrow_writer_layout.rs
index cd124031c..3e0f6ce3a 100644
--- a/parquet/tests/arrow_writer_layout.rs
+++ b/parquet/tests/arrow_writer_layout.rs
@@ -89,12 +89,15 @@ fn assert_layout(file_reader: &Bytes, meta: &ParquetMetaData, layout: &Layout) {
for (column_index, column_layout) in offset_index.iter().zip(&row_group_layout.columns) {
assert_eq!(
- column_index.len(),
+ column_index.page_locations.len(),
column_layout.pages.len(),
"index page count mismatch"
);
- for (idx, (page, page_layout)) in
- column_index.iter().zip(&column_layout.pages).enumerate()
+ for (idx, (page, page_layout)) in column_index
+ .page_locations
+ .iter()
+ .zip(&column_layout.pages)
+ .enumerate()
{
assert_eq!(
page.compressed_page_size as usize,
@@ -102,6 +105,7 @@ fn assert_layout(file_reader: &Bytes, meta: &ParquetMetaData, layout: &Layout) {
"index page {idx} size mismatch"
);
let next_first_row_index = column_index
+ .page_locations
.get(idx + 1)
.map(|x| x.first_row_index)
.unwrap_or_else(|| row_group.num_rows());