Copilot commented on code in PR #9937: URL: https://github.com/apache/arrow-rs/pull/9937#discussion_r3198907619
########## parquet/src/file/reverse_serialized_reader.rs: ########## @@ -0,0 +1,1034 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Reverse page-order [`PageReader`] backed by [`OffsetIndexMetaData`]. +//! +//! Tracking issue: <https://github.com/apache/arrow-rs/issues/9934>. +//! +//! This reader emits pages from a Parquet column chunk in **reverse page order**: +//! the dictionary page (if any) is emitted first because data pages depend on it, +//! followed by data pages from the last `PageLocation` to the first. +//! +//! Pages are still **decoded in their native forward direction** — only the page +//! traversal order is reversed. Reversing rows *within* a page is impossible +//! because Parquet's RLE / bit-packing / delta / dictionary encodings are +//! forward streams; that responsibility belongs to a higher layer. +//! +//! ## Limitations (Phase 1 POC) +//! +//! * Encryption is not supported. +//! * `peek_next_page` does not populate `num_rows`. +//! * Requires the column chunk to have an `OffsetIndex` (i.e. page index). + +use std::sync::Arc; + +use crate::basic::Type; +use crate::column::page::{Page, PageMetadata, PageReader}; +use crate::compression::{Codec, create_codec}; +use crate::errors::{ParquetError, Result}; +use crate::file::metadata::ColumnChunkMetaData; +use crate::file::metadata::thrift::PageHeader; +use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation}; +use crate::file::properties::{ReaderProperties, ReaderPropertiesPtr}; +use crate::file::reader::ChunkReader; +use crate::file::serialized_reader::decode_page; +use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol}; + +/// A [`PageReader`] that emits pages in reverse order using `OffsetIndex`. +/// +/// See the module-level documentation for details. +pub struct ReverseSerializedPageReader<R: ChunkReader> { + reader: Arc<R>, + decompressor: Option<Box<dyn Codec>>, + physical_type: Type, + state: ReverseState, + read_stats: bool, +} + +enum ReverseState { + /// Initial state. The dictionary page (if any) is emitted on the next call, + /// then the state transitions to [`ReverseState::Data`]. + NeedDict { + dictionary_page: Option<PageLocation>, + page_locations: Vec<PageLocation>, + }, + /// Iterate `page_locations[..cursor]` from the back: emit + /// `page_locations[cursor - 1]` and decrement `cursor`. + Data { + page_locations: Vec<PageLocation>, + cursor: usize, + }, + Exhausted, +} + +impl<R: ChunkReader> ReverseSerializedPageReader<R> { + /// Create a new [`ReverseSerializedPageReader`] with default + /// [`ReaderProperties`]. + pub fn new( + reader: Arc<R>, + meta: &ColumnChunkMetaData, + offset_index: &OffsetIndexMetaData, + ) -> Result<Self> { + let props = Arc::new(ReaderProperties::builder().build()); + Self::new_with_properties(reader, meta, offset_index, props) + } + + /// Create a new [`ReverseSerializedPageReader`] with explicit + /// [`ReaderProperties`]. + pub fn new_with_properties( + reader: Arc<R>, + meta: &ColumnChunkMetaData, + offset_index: &OffsetIndexMetaData, + props: ReaderPropertiesPtr, + ) -> Result<Self> { + let decompressor = create_codec(meta.compression(), props.codec_options())?; + let (chunk_start, _chunk_len) = meta.byte_range(); + let locations = offset_index.page_locations().clone(); + + // If the first data page does not start at the column chunk's start, a + // dictionary page sits in front. Synthesize a `PageLocation` for it + // (mirrors `SerializedPageReader::new_with_properties`). + let dictionary_page = match locations.first() { + Some(first) if (first.offset as u64) != chunk_start => Some(PageLocation { + offset: chunk_start as i64, + compressed_page_size: (first.offset as u64 - chunk_start) as i32, + first_row_index: 0, + }), + _ => None, + }; + + Ok(Self { + reader, + decompressor, + physical_type: meta.column_type(), + state: ReverseState::NeedDict { + dictionary_page, + page_locations: locations, + }, + read_stats: props.read_page_stats(), + }) + } + + fn read_page_at(&mut self, loc: &PageLocation) -> Result<Page> { + let page_len = usize::try_from(loc.compressed_page_size) + .map_err(|e| ParquetError::General(format!("invalid compressed_page_size: {e}")))?; + let buffer = self.reader.get_bytes(loc.offset as u64, page_len)?; + + let mut prot = ThriftSliceInputProtocol::new(buffer.as_ref()); + let header = if self.read_stats { + PageHeader::read_thrift(&mut prot)? + } else { + PageHeader::read_thrift_without_stats(&mut prot)? + }; + let header_len = buffer.len() - prot.as_slice().len(); + let payload = buffer.slice(header_len..); + + decode_page( + header, + payload, + self.physical_type, + self.decompressor.as_mut(), + ) + } +} + +impl<R: ChunkReader> Iterator for ReverseSerializedPageReader<R> { + type Item = Result<Page>; + + fn next(&mut self) -> Option<Self::Item> { + self.get_next_page().transpose() + } +} + +impl<R: ChunkReader> PageReader for ReverseSerializedPageReader<R> { + fn get_next_page(&mut self) -> Result<Option<Page>> { + loop { + match std::mem::replace(&mut self.state, ReverseState::Exhausted) { + ReverseState::NeedDict { + dictionary_page, + page_locations, + } => { + let cursor = page_locations.len(); + self.state = ReverseState::Data { + page_locations, + cursor, + }; + if let Some(loc) = dictionary_page { + return self.read_page_at(&loc).map(Some); + } + // No dictionary; fall through to read the first reverse data page. + } + ReverseState::Data { + page_locations, + cursor, + } => { + if cursor == 0 { + return Ok(None); + } + let loc = page_locations[cursor - 1].clone(); + self.state = ReverseState::Data { + page_locations, + cursor: cursor - 1, + }; + return self.read_page_at(&loc).map(Some); + } + ReverseState::Exhausted => return Ok(None), Review Comment: Using `std::mem::replace(..., Exhausted)` forces moving `page_locations` in and out of the enum on every call, and then re-assigning `self.state`. This makes the state machine harder to reason about (and contributed to the “advance state before read” bug). Consider switching to `match &mut self.state` and mutating the `cursor` in place (or storing `page_locations` once on the struct and tracking only indices in the state), which simplifies control flow and avoids repeated moves/clones. ########## parquet/src/file/reverse_serialized_reader.rs: ########## @@ -0,0 +1,1034 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Reverse page-order [`PageReader`] backed by [`OffsetIndexMetaData`]. +//! +//! Tracking issue: <https://github.com/apache/arrow-rs/issues/9934>. +//! +//! This reader emits pages from a Parquet column chunk in **reverse page order**: +//! the dictionary page (if any) is emitted first because data pages depend on it, +//! followed by data pages from the last `PageLocation` to the first. +//! +//! Pages are still **decoded in their native forward direction** — only the page +//! traversal order is reversed. Reversing rows *within* a page is impossible +//! because Parquet's RLE / bit-packing / delta / dictionary encodings are +//! forward streams; that responsibility belongs to a higher layer. +//! +//! ## Limitations (Phase 1 POC) +//! +//! * Encryption is not supported. +//! * `peek_next_page` does not populate `num_rows`. +//! * Requires the column chunk to have an `OffsetIndex` (i.e. page index). + +use std::sync::Arc; + +use crate::basic::Type; +use crate::column::page::{Page, PageMetadata, PageReader}; +use crate::compression::{Codec, create_codec}; +use crate::errors::{ParquetError, Result}; +use crate::file::metadata::ColumnChunkMetaData; +use crate::file::metadata::thrift::PageHeader; +use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation}; +use crate::file::properties::{ReaderProperties, ReaderPropertiesPtr}; +use crate::file::reader::ChunkReader; +use crate::file::serialized_reader::decode_page; +use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol}; + +/// A [`PageReader`] that emits pages in reverse order using `OffsetIndex`. +/// +/// See the module-level documentation for details. +pub struct ReverseSerializedPageReader<R: ChunkReader> { + reader: Arc<R>, + decompressor: Option<Box<dyn Codec>>, + physical_type: Type, + state: ReverseState, + read_stats: bool, +} + +enum ReverseState { + /// Initial state. The dictionary page (if any) is emitted on the next call, + /// then the state transitions to [`ReverseState::Data`]. + NeedDict { + dictionary_page: Option<PageLocation>, + page_locations: Vec<PageLocation>, + }, + /// Iterate `page_locations[..cursor]` from the back: emit + /// `page_locations[cursor - 1]` and decrement `cursor`. + Data { + page_locations: Vec<PageLocation>, + cursor: usize, + }, + Exhausted, +} + +impl<R: ChunkReader> ReverseSerializedPageReader<R> { + /// Create a new [`ReverseSerializedPageReader`] with default + /// [`ReaderProperties`]. + pub fn new( + reader: Arc<R>, + meta: &ColumnChunkMetaData, + offset_index: &OffsetIndexMetaData, + ) -> Result<Self> { + let props = Arc::new(ReaderProperties::builder().build()); + Self::new_with_properties(reader, meta, offset_index, props) + } + + /// Create a new [`ReverseSerializedPageReader`] with explicit + /// [`ReaderProperties`]. + pub fn new_with_properties( + reader: Arc<R>, + meta: &ColumnChunkMetaData, + offset_index: &OffsetIndexMetaData, + props: ReaderPropertiesPtr, + ) -> Result<Self> { + let decompressor = create_codec(meta.compression(), props.codec_options())?; + let (chunk_start, _chunk_len) = meta.byte_range(); + let locations = offset_index.page_locations().clone(); + + // If the first data page does not start at the column chunk's start, a + // dictionary page sits in front. Synthesize a `PageLocation` for it + // (mirrors `SerializedPageReader::new_with_properties`). + let dictionary_page = match locations.first() { + Some(first) if (first.offset as u64) != chunk_start => Some(PageLocation { + offset: chunk_start as i64, + compressed_page_size: (first.offset as u64 - chunk_start) as i32, + first_row_index: 0, + }), + _ => None, + }; + + Ok(Self { + reader, + decompressor, + physical_type: meta.column_type(), + state: ReverseState::NeedDict { + dictionary_page, + page_locations: locations, + }, + read_stats: props.read_page_stats(), + }) + } + + fn read_page_at(&mut self, loc: &PageLocation) -> Result<Page> { + let page_len = usize::try_from(loc.compressed_page_size) + .map_err(|e| ParquetError::General(format!("invalid compressed_page_size: {e}")))?; + let buffer = self.reader.get_bytes(loc.offset as u64, page_len)?; + + let mut prot = ThriftSliceInputProtocol::new(buffer.as_ref()); + let header = if self.read_stats { + PageHeader::read_thrift(&mut prot)? + } else { + PageHeader::read_thrift_without_stats(&mut prot)? + }; + let header_len = buffer.len() - prot.as_slice().len(); + let payload = buffer.slice(header_len..); + + decode_page( + header, + payload, + self.physical_type, + self.decompressor.as_mut(), + ) + } +} + +impl<R: ChunkReader> Iterator for ReverseSerializedPageReader<R> { + type Item = Result<Page>; + + fn next(&mut self) -> Option<Self::Item> { + self.get_next_page().transpose() + } +} + +impl<R: ChunkReader> PageReader for ReverseSerializedPageReader<R> { + fn get_next_page(&mut self) -> Result<Option<Page>> { + loop { + match std::mem::replace(&mut self.state, ReverseState::Exhausted) { + ReverseState::NeedDict { + dictionary_page, + page_locations, + } => { + let cursor = page_locations.len(); + self.state = ReverseState::Data { + page_locations, + cursor, + }; + if let Some(loc) = dictionary_page { + return self.read_page_at(&loc).map(Some); + } + // No dictionary; fall through to read the first reverse data page. + } + ReverseState::Data { + page_locations, + cursor, + } => { + if cursor == 0 { + return Ok(None); + } + let loc = page_locations[cursor - 1].clone(); + self.state = ReverseState::Data { + page_locations, + cursor: cursor - 1, + }; + return self.read_page_at(&loc).map(Some); + } + ReverseState::Exhausted => return Ok(None), + } + } + } + + fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> { + match &self.state { + ReverseState::NeedDict { + dictionary_page, .. + } => Ok(Some(PageMetadata { + num_rows: None, + num_levels: None, + is_dict: dictionary_page.is_some(), + })), Review Comment: `peek_next_page` returns `Some(PageMetadata { .. })` in `NeedDict` unconditionally, even when there is no dictionary page and the offset index contains zero data pages (empty `page_locations`). In that case, the next `get_next_page` will return `Ok(None)` but `peek_next_page` misleadingly claims a page exists. Consider matching `page_locations` as well and returning `Ok(None)` when `dictionary_page.is_none()` and `page_locations.is_empty()`. ########## parquet/src/file/reverse_serialized_reader.rs: ########## @@ -0,0 +1,1034 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Reverse page-order [`PageReader`] backed by [`OffsetIndexMetaData`]. +//! +//! Tracking issue: <https://github.com/apache/arrow-rs/issues/9934>. +//! +//! This reader emits pages from a Parquet column chunk in **reverse page order**: +//! the dictionary page (if any) is emitted first because data pages depend on it, +//! followed by data pages from the last `PageLocation` to the first. +//! +//! Pages are still **decoded in their native forward direction** — only the page +//! traversal order is reversed. Reversing rows *within* a page is impossible +//! because Parquet's RLE / bit-packing / delta / dictionary encodings are +//! forward streams; that responsibility belongs to a higher layer. +//! +//! ## Limitations (Phase 1 POC) +//! +//! * Encryption is not supported. +//! * `peek_next_page` does not populate `num_rows`. +//! * Requires the column chunk to have an `OffsetIndex` (i.e. page index). + +use std::sync::Arc; + +use crate::basic::Type; +use crate::column::page::{Page, PageMetadata, PageReader}; +use crate::compression::{Codec, create_codec}; +use crate::errors::{ParquetError, Result}; +use crate::file::metadata::ColumnChunkMetaData; +use crate::file::metadata::thrift::PageHeader; +use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation}; +use crate::file::properties::{ReaderProperties, ReaderPropertiesPtr}; +use crate::file::reader::ChunkReader; +use crate::file::serialized_reader::decode_page; +use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol}; + +/// A [`PageReader`] that emits pages in reverse order using `OffsetIndex`. +/// +/// See the module-level documentation for details. +pub struct ReverseSerializedPageReader<R: ChunkReader> { + reader: Arc<R>, + decompressor: Option<Box<dyn Codec>>, + physical_type: Type, + state: ReverseState, + read_stats: bool, +} + +enum ReverseState { + /// Initial state. The dictionary page (if any) is emitted on the next call, + /// then the state transitions to [`ReverseState::Data`]. + NeedDict { + dictionary_page: Option<PageLocation>, + page_locations: Vec<PageLocation>, + }, + /// Iterate `page_locations[..cursor]` from the back: emit + /// `page_locations[cursor - 1]` and decrement `cursor`. + Data { + page_locations: Vec<PageLocation>, + cursor: usize, + }, + Exhausted, +} + +impl<R: ChunkReader> ReverseSerializedPageReader<R> { + /// Create a new [`ReverseSerializedPageReader`] with default + /// [`ReaderProperties`]. + pub fn new( + reader: Arc<R>, + meta: &ColumnChunkMetaData, + offset_index: &OffsetIndexMetaData, + ) -> Result<Self> { + let props = Arc::new(ReaderProperties::builder().build()); + Self::new_with_properties(reader, meta, offset_index, props) + } + + /// Create a new [`ReverseSerializedPageReader`] with explicit + /// [`ReaderProperties`]. + pub fn new_with_properties( + reader: Arc<R>, + meta: &ColumnChunkMetaData, + offset_index: &OffsetIndexMetaData, + props: ReaderPropertiesPtr, + ) -> Result<Self> { + let decompressor = create_codec(meta.compression(), props.codec_options())?; + let (chunk_start, _chunk_len) = meta.byte_range(); + let locations = offset_index.page_locations().clone(); + + // If the first data page does not start at the column chunk's start, a + // dictionary page sits in front. Synthesize a `PageLocation` for it + // (mirrors `SerializedPageReader::new_with_properties`). + let dictionary_page = match locations.first() { + Some(first) if (first.offset as u64) != chunk_start => Some(PageLocation { + offset: chunk_start as i64, + compressed_page_size: (first.offset as u64 - chunk_start) as i32, + first_row_index: 0, + }), + _ => None, + }; + + Ok(Self { + reader, + decompressor, + physical_type: meta.column_type(), + state: ReverseState::NeedDict { + dictionary_page, + page_locations: locations, + }, + read_stats: props.read_page_stats(), + }) + } + + fn read_page_at(&mut self, loc: &PageLocation) -> Result<Page> { + let page_len = usize::try_from(loc.compressed_page_size) + .map_err(|e| ParquetError::General(format!("invalid compressed_page_size: {e}")))?; + let buffer = self.reader.get_bytes(loc.offset as u64, page_len)?; + + let mut prot = ThriftSliceInputProtocol::new(buffer.as_ref()); + let header = if self.read_stats { + PageHeader::read_thrift(&mut prot)? + } else { + PageHeader::read_thrift_without_stats(&mut prot)? + }; + let header_len = buffer.len() - prot.as_slice().len(); + let payload = buffer.slice(header_len..); + + decode_page( + header, + payload, + self.physical_type, + self.decompressor.as_mut(), + ) + } +} + +impl<R: ChunkReader> Iterator for ReverseSerializedPageReader<R> { + type Item = Result<Page>; + + fn next(&mut self) -> Option<Self::Item> { + self.get_next_page().transpose() + } +} + +impl<R: ChunkReader> PageReader for ReverseSerializedPageReader<R> { + fn get_next_page(&mut self) -> Result<Option<Page>> { + loop { + match std::mem::replace(&mut self.state, ReverseState::Exhausted) { + ReverseState::NeedDict { + dictionary_page, + page_locations, + } => { + let cursor = page_locations.len(); + self.state = ReverseState::Data { + page_locations, + cursor, + }; + if let Some(loc) = dictionary_page { + return self.read_page_at(&loc).map(Some); + } + // No dictionary; fall through to read the first reverse data page. + } + ReverseState::Data { + page_locations, + cursor, + } => { + if cursor == 0 { + return Ok(None); + } + let loc = page_locations[cursor - 1].clone(); + self.state = ReverseState::Data { + page_locations, + cursor: cursor - 1, + }; + return self.read_page_at(&loc).map(Some); + } Review Comment: The reader advances its internal state (transitions to `Data`, decrements `cursor`) *before* attempting `read_page_at`. If `read_page_at` returns an error, the state has already moved past that page (effectively “skipping” it on retry) and in the dict case also loses the dict slot. Update the state only after a successful read, or restore the prior state on error (e.g., read first, then commit the cursor/state mutation). ########## parquet/src/file/mod.rs: ########## @@ -103,6 +103,7 @@ pub mod metadata; pub mod page_index; pub mod properties; pub mod reader; +pub mod reverse_serialized_reader; Review Comment: This exposes a new public module/type (`ReverseSerializedPageReader`) as part of the crate’s public API. Given the PR is explicitly a Phase 1 POC “not intended for immediate merge” and mentions intentional limitations (e.g., no encryption and incomplete `peek_next_page` metadata), consider keeping it non-public for now (e.g., `pub(crate) mod ...`, feature-gating under an `experimental` flag, or `#[doc(hidden)]`) until the API shape is finalized to avoid locking in a surface that may need breaking changes later. ########## parquet/src/file/reverse_serialized_reader.rs: ########## @@ -0,0 +1,1034 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Reverse page-order [`PageReader`] backed by [`OffsetIndexMetaData`]. +//! +//! Tracking issue: <https://github.com/apache/arrow-rs/issues/9934>. +//! +//! This reader emits pages from a Parquet column chunk in **reverse page order**: +//! the dictionary page (if any) is emitted first because data pages depend on it, +//! followed by data pages from the last `PageLocation` to the first. +//! +//! Pages are still **decoded in their native forward direction** — only the page +//! traversal order is reversed. Reversing rows *within* a page is impossible +//! because Parquet's RLE / bit-packing / delta / dictionary encodings are +//! forward streams; that responsibility belongs to a higher layer. +//! +//! ## Limitations (Phase 1 POC) +//! +//! * Encryption is not supported. +//! * `peek_next_page` does not populate `num_rows`. +//! * Requires the column chunk to have an `OffsetIndex` (i.e. page index). + +use std::sync::Arc; + +use crate::basic::Type; +use crate::column::page::{Page, PageMetadata, PageReader}; +use crate::compression::{Codec, create_codec}; +use crate::errors::{ParquetError, Result}; +use crate::file::metadata::ColumnChunkMetaData; +use crate::file::metadata::thrift::PageHeader; +use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation}; +use crate::file::properties::{ReaderProperties, ReaderPropertiesPtr}; +use crate::file::reader::ChunkReader; +use crate::file::serialized_reader::decode_page; +use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol}; + +/// A [`PageReader`] that emits pages in reverse order using `OffsetIndex`. +/// +/// See the module-level documentation for details. +pub struct ReverseSerializedPageReader<R: ChunkReader> { + reader: Arc<R>, + decompressor: Option<Box<dyn Codec>>, + physical_type: Type, + state: ReverseState, + read_stats: bool, +} + +enum ReverseState { + /// Initial state. The dictionary page (if any) is emitted on the next call, + /// then the state transitions to [`ReverseState::Data`]. + NeedDict { + dictionary_page: Option<PageLocation>, + page_locations: Vec<PageLocation>, + }, + /// Iterate `page_locations[..cursor]` from the back: emit + /// `page_locations[cursor - 1]` and decrement `cursor`. + Data { + page_locations: Vec<PageLocation>, + cursor: usize, + }, + Exhausted, +} + +impl<R: ChunkReader> ReverseSerializedPageReader<R> { + /// Create a new [`ReverseSerializedPageReader`] with default + /// [`ReaderProperties`]. + pub fn new( + reader: Arc<R>, + meta: &ColumnChunkMetaData, + offset_index: &OffsetIndexMetaData, + ) -> Result<Self> { + let props = Arc::new(ReaderProperties::builder().build()); + Self::new_with_properties(reader, meta, offset_index, props) + } + + /// Create a new [`ReverseSerializedPageReader`] with explicit + /// [`ReaderProperties`]. + pub fn new_with_properties( + reader: Arc<R>, + meta: &ColumnChunkMetaData, + offset_index: &OffsetIndexMetaData, + props: ReaderPropertiesPtr, + ) -> Result<Self> { + let decompressor = create_codec(meta.compression(), props.codec_options())?; + let (chunk_start, _chunk_len) = meta.byte_range(); + let locations = offset_index.page_locations().clone(); + + // If the first data page does not start at the column chunk's start, a + // dictionary page sits in front. Synthesize a `PageLocation` for it + // (mirrors `SerializedPageReader::new_with_properties`). + let dictionary_page = match locations.first() { + Some(first) if (first.offset as u64) != chunk_start => Some(PageLocation { + offset: chunk_start as i64, + compressed_page_size: (first.offset as u64 - chunk_start) as i32, + first_row_index: 0, + }), + _ => None, + }; + + Ok(Self { + reader, + decompressor, + physical_type: meta.column_type(), + state: ReverseState::NeedDict { + dictionary_page, + page_locations: locations, + }, + read_stats: props.read_page_stats(), + }) + } + + fn read_page_at(&mut self, loc: &PageLocation) -> Result<Page> { + let page_len = usize::try_from(loc.compressed_page_size) + .map_err(|e| ParquetError::General(format!("invalid compressed_page_size: {e}")))?; Review Comment: The error message doesn’t include the actual `compressed_page_size` value (or location offset), which makes diagnosing malformed offset indexes harder. Including `loc.compressed_page_size` (and optionally `loc.offset`) in the message would make failures much more actionable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
