This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new e9ea12b138 Implement Push Parquet Decoder (#7997)
e9ea12b138 is described below
commit e9ea12b138f8fa11c6046e94cb2b55b2830a0d7c
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Oct 29 08:52:15 2025 -0400
Implement Push Parquet Decoder (#7997)
# Which issue does this PR close?
- Part of https://github.com/apache/arrow-rs/issues/8000
- closes https://github.com/apache/arrow-rs/issues/7983
# Rationale for this change
This PR is the first part of separating IO and decode operations in the
rust parquet decoder.
Decoupling IO and CPU enables several important use cases:
1. Different IO patterns (e.g. not buffer the entire row group at once)
2. Different IO APIs e.g. use io_uring, or OpenDAL, etc.
3. Deliberate prefetching within a file
4. Avoid code duplication between the `ParquetRecordBatchStreamBuilder`
and `ParquetRecordBatchReaderBuilder`
# What changes are included in this PR?
1. Add new `ParquetPushDecoderBuilder` and `ParquetPushDecoder`, plus tests
It is effectively an explicit version of the state machine that is used
in the existing async reader (where the state machine is encoded as Rust
`async` / `await` structures).
# Are these changes tested?
Yes -- there are extensive tests for the new code
Note that this PR actually adds a **3rd** path for control flow (when I
claim this will remove duplication!). In follow-on PRs I will convert the
existing readers to use this new pattern, similar to the sequence I
did for the metadata decoder:
- https://github.com/apache/arrow-rs/pull/8080
- https://github.com/apache/arrow-rs/pull/8340
Here is a preview of a PR that consolidates the async reader to use the
push decoder internally (and removes one duplicate):
- https://github.com/apache/arrow-rs/pull/8159
- closes https://github.com/apache/arrow-rs/pull/8022
# Are there any user-facing changes?
Yes, a new API, but no changes to the existing APIs
---------
Co-authored-by: Matthijs Brobbel <[email protected]>
Co-authored-by: Adrian Garcia Badaracco
<[email protected]>
---
parquet/src/arrow/arrow_reader/mod.rs | 16 +
parquet/src/arrow/arrow_reader/read_plan.rs | 5 +-
parquet/src/arrow/arrow_reader/selection.rs | 1 -
parquet/src/arrow/async_reader/mod.rs | 250 +----
parquet/src/arrow/in_memory_row_group.rs | 307 ++++++
parquet/src/arrow/mod.rs | 6 +
parquet/src/arrow/push_decoder/mod.rs | 1151 ++++++++++++++++++++
.../src/arrow/push_decoder/reader_builder/data.rs | 233 ++++
.../arrow/push_decoder/reader_builder/filter.rs | 143 +++
.../src/arrow/push_decoder/reader_builder/mod.rs | 659 +++++++++++
parquet/src/arrow/push_decoder/remaining.rs | 118 ++
parquet/src/file/metadata/mod.rs | 1 -
parquet/src/lib.rs | 16 +-
parquet/src/util/push_buffers.rs | 25 +
14 files changed, 2689 insertions(+), 242 deletions(-)
diff --git a/parquet/src/arrow/arrow_reader/mod.rs
b/parquet/src/arrow/arrow_reader/mod.rs
index 4e91685519..1cc7673a57 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -58,6 +58,7 @@ pub mod statistics;
///
/// * synchronous API: [`ParquetRecordBatchReaderBuilder::try_new`]
/// * `async` API: [`ParquetRecordBatchStreamBuilder::new`]
+/// * decoder API: [`ParquetDecoderBuilder::new`]
///
/// # Features
/// * Projection pushdown: [`Self::with_projection`]
@@ -93,6 +94,7 @@ pub mod statistics;
/// Millisecond Latency] Arrow blog post.
///
/// [`ParquetRecordBatchStreamBuilder::new`]:
crate::arrow::async_reader::ParquetRecordBatchStreamBuilder::new
+/// [`ParquetDecoderBuilder::new`]:
crate::arrow::push_decoder::ParquetPushDecoderBuilder::new
/// [Apache Arrow]: https://arrow.apache.org/
/// [`StatisticsConverter`]: statistics::StatisticsConverter
/// [Querying Parquet with Millisecond Latency]:
https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
@@ -992,12 +994,26 @@ impl<T: ChunkReader + 'static> PageIterator for
ReaderPageIterator<T> {}
/// An `Iterator<Item = ArrowResult<RecordBatch>>` that yields [`RecordBatch`]
/// read from a parquet data source
+///
+/// This reader is created by [`ParquetRecordBatchReaderBuilder`], and has all
+/// the buffered state (DataPages, etc) necessary to decode the parquet data
into
+/// Arrow arrays.
pub struct ParquetRecordBatchReader {
array_reader: Box<dyn ArrayReader>,
schema: SchemaRef,
read_plan: ReadPlan,
}
+impl Debug for ParquetRecordBatchReader {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("ParquetRecordBatchReader")
+ .field("array_reader", &"...")
+ .field("schema", &self.schema)
+ .field("read_plan", &self.read_plan)
+ .finish()
+ }
+}
+
impl Iterator for ParquetRecordBatchReader {
type Item = Result<RecordBatch, ArrowError>;
diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs
b/parquet/src/arrow/arrow_reader/read_plan.rs
index 754fcd339c..2210f47df2 100644
--- a/parquet/src/arrow/arrow_reader/read_plan.rs
+++ b/parquet/src/arrow/arrow_reader/read_plan.rs
@@ -28,7 +28,7 @@ use arrow_select::filter::prep_null_mask_filter;
use std::collections::VecDeque;
/// A builder for [`ReadPlan`]
-#[derive(Clone)]
+#[derive(Clone, Debug)]
pub struct ReadPlanBuilder {
batch_size: usize,
/// Current to apply, includes all filters
@@ -51,7 +51,6 @@ impl ReadPlanBuilder {
}
/// Returns the current selection, if any
- #[cfg(feature = "async")]
pub fn selection(&self) -> Option<&RowSelection> {
self.selection.as_ref()
}
@@ -76,7 +75,6 @@ impl ReadPlanBuilder {
}
/// Returns the number of rows selected, or `None` if all rows are
selected.
- #[cfg(feature = "async")]
pub fn num_rows_selected(&self) -> Option<usize> {
self.selection.as_ref().map(|s| s.row_count())
}
@@ -230,6 +228,7 @@ impl LimitedReadPlanBuilder {
/// A plan reading specific rows from a Parquet Row Group.
///
/// See [`ReadPlanBuilder`] to create `ReadPlan`s
+#[derive(Debug)]
pub struct ReadPlan {
/// The number of rows to read in each batch
batch_size: usize,
diff --git a/parquet/src/arrow/arrow_reader/selection.rs
b/parquet/src/arrow/arrow_reader/selection.rs
index adbbff1ca2..1eb7c85d1d 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -447,7 +447,6 @@ impl RowSelection {
/// Expands the selection to align with batch boundaries.
/// This is needed when using cached array readers to ensure that
/// the cached data covers full batches.
- #[cfg(feature = "async")]
pub(crate) fn expand_to_batch_boundaries(&self, batch_size: usize,
total_rows: usize) -> Self {
if batch_size == 0 {
return self.clone();
diff --git a/parquet/src/arrow/async_reader/mod.rs
b/parquet/src/arrow/async_reader/mod.rs
index 27f90e3d7b..9b81e8e569 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -29,7 +29,7 @@ use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
-use bytes::{Buf, Bytes};
+use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
use futures::stream::Stream;
@@ -38,10 +38,6 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek,
AsyncSeekExt};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Fields, Schema, SchemaRef};
-use crate::arrow::ProjectionMask;
-use crate::arrow::array_reader::{
- ArrayReaderBuilder, CacheOptionsBuilder, RowGroupCache, RowGroups,
-};
use crate::arrow::arrow_reader::{
ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions,
ParquetRecordBatchReader,
RowFilter, RowSelection,
@@ -51,11 +47,8 @@ use crate::basic::{BloomFilterAlgorithm,
BloomFilterCompression, BloomFilterHash
use crate::bloom_filter::{
SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
};
-use crate::column::page::{PageIterator, PageReader};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{PageIndexPolicy, ParquetMetaData,
ParquetMetaDataReader};
-use crate::file::page_index::offset_index::OffsetIndexMetaData;
-use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
mod metadata;
pub use metadata::*;
@@ -63,8 +56,11 @@ pub use metadata::*;
#[cfg(feature = "object_store")]
mod store;
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{ArrayReaderBuilder, CacheOptionsBuilder,
RowGroupCache};
use crate::arrow::arrow_reader::ReadPlanBuilder;
use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
+use crate::arrow::in_memory_row_group::{FetchRanges, InMemoryRowGroup};
use crate::arrow::schema::ParquetField;
#[cfg(feature = "object_store")]
pub use store::*;
@@ -571,6 +567,8 @@ struct ReaderFactory<T> {
metrics: ArrowReaderMetrics,
/// Maximum size of the predicate cache
+ ///
+ /// See [`RowGroupCache`] for details.
max_predicate_cache_size: usize,
}
@@ -967,23 +965,16 @@ where
}
}
-/// An in-memory collection of column chunks
-struct InMemoryRowGroup<'a> {
- offset_index: Option<&'a [OffsetIndexMetaData]>,
- /// Column chunks for this row group
- column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
- row_count: usize,
- row_group_idx: usize,
- metadata: &'a ParquetMetaData,
-}
-
+// Note this implementation is not with the rest of the InMemoryRowGroup
+// implementation because it relies on several async traits and types
+// that are only available when the "async" feature is enabled.
impl InMemoryRowGroup<'_> {
/// Fetches any additional column data specified in `projection` that is
not already
/// present in `self.column_chunks`.
///
/// If `selection` is provided, only the pages required for the selection
/// are fetched. Otherwise, all pages are fetched.
- async fn fetch<T: AsyncFileReader + Send>(
+ pub(crate) async fn fetch<T: AsyncFileReader + Send>(
&mut self,
input: &mut T,
projection: &ProjectionMask,
@@ -991,221 +982,18 @@ impl InMemoryRowGroup<'_> {
batch_size: usize,
cache_mask: Option<&ProjectionMask>,
) -> Result<()> {
- let metadata = self.metadata.row_group(self.row_group_idx);
- if let Some((selection, offset_index)) =
selection.zip(self.offset_index) {
- let expanded_selection =
- selection.expand_to_batch_boundaries(batch_size,
self.row_count);
- // If we have a `RowSelection` and an `OffsetIndex` then only
fetch pages required for the
- // `RowSelection`
- let mut page_start_offsets: Vec<Vec<u64>> = vec![];
-
- let fetch_ranges = self
- .column_chunks
- .iter()
- .zip(metadata.columns())
- .enumerate()
- .filter(|&(idx, (chunk, _chunk_meta))| {
- chunk.is_none() && projection.leaf_included(idx)
- })
- .flat_map(|(idx, (_chunk, chunk_meta))| {
- // If the first page does not start at the beginning of
the column,
- // then we need to also fetch a dictionary page.
- let mut ranges: Vec<Range<u64>> = vec![];
- let (start, _len) = chunk_meta.byte_range();
- match offset_index[idx].page_locations.first() {
- Some(first) if first.offset as u64 != start => {
- ranges.push(start..first.offset as u64);
- }
- _ => (),
- }
-
- // Expand selection to batch boundaries only for cached
columns
- let use_expanded = cache_mask.map(|m|
m.leaf_included(idx)).unwrap_or(false);
- if use_expanded {
- ranges.extend(
-
expanded_selection.scan_ranges(&offset_index[idx].page_locations),
- );
- } else {
-
ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
- }
- page_start_offsets.push(ranges.iter().map(|range|
range.start).collect());
-
- ranges
- })
- .collect();
-
- let mut chunk_data =
input.get_byte_ranges(fetch_ranges).await?.into_iter();
- let mut page_start_offsets = page_start_offsets.into_iter();
-
- for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
- if chunk.is_some() || !projection.leaf_included(idx) {
- continue;
- }
-
- if let Some(offsets) = page_start_offsets.next() {
- let mut chunks = Vec::with_capacity(offsets.len());
- for _ in 0..offsets.len() {
- chunks.push(chunk_data.next().unwrap());
- }
-
- *chunk = Some(Arc::new(ColumnChunkData::Sparse {
- length: metadata.column(idx).byte_range().1 as usize,
- data: offsets
- .into_iter()
- .map(|x| x as usize)
- .zip(chunks.into_iter())
- .collect(),
- }))
- }
- }
- } else {
- let fetch_ranges = self
- .column_chunks
- .iter()
- .enumerate()
- .filter(|&(idx, chunk)| chunk.is_none() &&
projection.leaf_included(idx))
- .map(|(idx, _chunk)| {
- let column = metadata.column(idx);
- let (start, length) = column.byte_range();
- start..(start + length)
- })
- .collect();
-
- let mut chunk_data =
input.get_byte_ranges(fetch_ranges).await?.into_iter();
-
- for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
- if chunk.is_some() || !projection.leaf_included(idx) {
- continue;
- }
-
- if let Some(data) = chunk_data.next() {
- *chunk = Some(Arc::new(ColumnChunkData::Dense {
- offset: metadata.column(idx).byte_range().0 as usize,
- data,
- }));
- }
- }
- }
-
+ // Figure out what ranges to fetch
+ let FetchRanges {
+ ranges,
+ page_start_offsets,
+ } = self.fetch_ranges(projection, selection, batch_size, cache_mask);
+ // do the actual fetch
+ let chunk_data = input.get_byte_ranges(ranges).await?.into_iter();
+ // update our in memory buffers (self.column_chunks) with the fetched
data
+ self.fill_column_chunks(projection, page_start_offsets, chunk_data);
Ok(())
}
}
-
-impl RowGroups for InMemoryRowGroup<'_> {
- fn num_rows(&self) -> usize {
- self.row_count
- }
-
- /// Return chunks for column i
- fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
- match &self.column_chunks[i] {
- None => Err(ParquetError::General(format!(
- "Invalid column index {i}, column was not fetched"
- ))),
- Some(data) => {
- let page_locations = self
- .offset_index
- // filter out empty offset indexes (old versions specified
Some(vec![]) when no present)
- .filter(|index| !index.is_empty())
- .map(|index| index[i].page_locations.clone());
- let column_chunk_metadata =
self.metadata.row_group(self.row_group_idx).column(i);
- let page_reader = SerializedPageReader::new(
- data.clone(),
- column_chunk_metadata,
- self.row_count,
- page_locations,
- )?;
- let page_reader = page_reader.add_crypto_context(
- self.row_group_idx,
- i,
- self.metadata,
- column_chunk_metadata,
- )?;
-
- let page_reader: Box<dyn PageReader> = Box::new(page_reader);
-
- Ok(Box::new(ColumnChunkIterator {
- reader: Some(Ok(page_reader)),
- }))
- }
- }
- }
-}
-
-/// An in-memory column chunk
-#[derive(Clone)]
-enum ColumnChunkData {
- /// Column chunk data representing only a subset of data pages
- Sparse {
- /// Length of the full column chunk
- length: usize,
- /// Subset of data pages included in this sparse chunk.
- ///
- /// Each element is a tuple of (page offset within file, page data).
- /// Each entry is a complete page and the list is ordered by offset.
- data: Vec<(usize, Bytes)>,
- },
- /// Full column chunk and the offset within the original file
- Dense { offset: usize, data: Bytes },
-}
-
-impl ColumnChunkData {
- /// Return the data for this column chunk at the given offset
- fn get(&self, start: u64) -> Result<Bytes> {
- match &self {
- ColumnChunkData::Sparse { data, .. } => data
- .binary_search_by_key(&start, |(offset, _)| *offset as u64)
- .map(|idx| data[idx].1.clone())
- .map_err(|_| {
- ParquetError::General(format!(
- "Invalid offset in sparse column chunk data: {start}"
- ))
- }),
- ColumnChunkData::Dense { offset, data } => {
- let start = start as usize - *offset;
- Ok(data.slice(start..))
- }
- }
- }
-}
-
-impl Length for ColumnChunkData {
- /// Return the total length of the full column chunk
- fn len(&self) -> u64 {
- match &self {
- ColumnChunkData::Sparse { length, .. } => *length as u64,
- ColumnChunkData::Dense { data, .. } => data.len() as u64,
- }
- }
-}
-
-impl ChunkReader for ColumnChunkData {
- type T = bytes::buf::Reader<Bytes>;
-
- fn get_read(&self, start: u64) -> Result<Self::T> {
- Ok(self.get(start)?.reader())
- }
-
- fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
- Ok(self.get(start)?.slice(..length))
- }
-}
-
-/// Implements [`PageIterator`] for a single column chunk, yielding a single
[`PageReader`]
-struct ColumnChunkIterator {
- reader: Option<Result<Box<dyn PageReader>>>,
-}
-
-impl Iterator for ColumnChunkIterator {
- type Item = Result<Box<dyn PageReader>>;
-
- fn next(&mut self) -> Option<Self::Item> {
- self.reader.take()
- }
-}
-
-impl PageIterator for ColumnChunkIterator {}
-
#[cfg(test)]
mod tests {
use super::*;
diff --git a/parquet/src/arrow/in_memory_row_group.rs
b/parquet/src/arrow/in_memory_row_group.rs
new file mode 100644
index 0000000000..34e46cd34e
--- /dev/null
+++ b/parquet/src/arrow/in_memory_row_group.rs
@@ -0,0 +1,307 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::RowGroups;
+use crate::arrow::arrow_reader::RowSelection;
+use crate::column::page::{PageIterator, PageReader};
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use crate::file::page_index::offset_index::OffsetIndexMetaData;
+use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
+use bytes::{Buf, Bytes};
+use std::ops::Range;
+use std::sync::Arc;
+
+/// An in-memory collection of column chunks
+#[derive(Debug)]
+pub(crate) struct InMemoryRowGroup<'a> {
+ pub(crate) offset_index: Option<&'a [OffsetIndexMetaData]>,
+ /// Column chunks for this row group
+ pub(crate) column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
+ pub(crate) row_count: usize,
+ pub(crate) row_group_idx: usize,
+ pub(crate) metadata: &'a ParquetMetaData,
+}
+
+/// What ranges to fetch for the columns in this row group
+#[derive(Debug)]
+pub(crate) struct FetchRanges {
+ /// The byte ranges to fetch
+ pub(crate) ranges: Vec<Range<u64>>,
+ /// If `Some`, the start offsets of each page for each column chunk
+ pub(crate) page_start_offsets: Option<Vec<Vec<u64>>>,
+}
+
+impl InMemoryRowGroup<'_> {
+ /// Returns the byte ranges to fetch for the columns specified in
+ /// `projection` and `selection`.
+ ///
+ /// `cache_mask` indicates which columns, if any, are being cached by
+ /// [`RowGroupCache`](crate::arrow::array_reader::RowGroupCache).
+ /// The `selection` for Cached columns is expanded to batch boundaries to
simplify
+ /// accounting for what data is cached.
+ pub(crate) fn fetch_ranges(
+ &self,
+ projection: &ProjectionMask,
+ selection: Option<&RowSelection>,
+ batch_size: usize,
+ cache_mask: Option<&ProjectionMask>,
+ ) -> FetchRanges {
+ let metadata = self.metadata.row_group(self.row_group_idx);
+ if let Some((selection, offset_index)) =
selection.zip(self.offset_index) {
+ let expanded_selection =
+ selection.expand_to_batch_boundaries(batch_size,
self.row_count);
+
+ // If we have a `RowSelection` and an `OffsetIndex` then only fetch
+ // pages required for the `RowSelection`
+ // Consider preallocating outer vec:
https://github.com/apache/arrow-rs/issues/8667
+ let mut page_start_offsets: Vec<Vec<u64>> = vec![];
+
+ let ranges = self
+ .column_chunks
+ .iter()
+ .zip(metadata.columns())
+ .enumerate()
+ .filter(|&(idx, (chunk, _chunk_meta))| {
+ chunk.is_none() && projection.leaf_included(idx)
+ })
+ .flat_map(|(idx, (_chunk, chunk_meta))| {
+ // If the first page does not start at the beginning of
the column,
+ // then we need to also fetch a dictionary page.
+ let mut ranges: Vec<Range<u64>> = vec![];
+ let (start, _len) = chunk_meta.byte_range();
+ match offset_index[idx].page_locations.first() {
+ Some(first) if first.offset as u64 != start => {
+ ranges.push(start..first.offset as u64);
+ }
+ _ => (),
+ }
+
+ // Expand selection to batch boundaries if needed for
caching
+ // (see doc comment for this function for details on
`cache_mask`)
+ let use_expanded = cache_mask.map(|m|
m.leaf_included(idx)).unwrap_or(false);
+ if use_expanded {
+ ranges.extend(
+
expanded_selection.scan_ranges(&offset_index[idx].page_locations),
+ );
+ } else {
+
ranges.extend(selection.scan_ranges(&offset_index[idx].page_locations));
+ }
+ page_start_offsets.push(ranges.iter().map(|range|
range.start).collect());
+
+ ranges
+ })
+ .collect();
+ FetchRanges {
+ ranges,
+ page_start_offsets: Some(page_start_offsets),
+ }
+ } else {
+ let ranges = self
+ .column_chunks
+ .iter()
+ .enumerate()
+ .filter(|&(idx, chunk)| chunk.is_none() &&
projection.leaf_included(idx))
+ .map(|(idx, _chunk)| {
+ let column = metadata.column(idx);
+ let (start, length) = column.byte_range();
+ start..(start + length)
+ })
+ .collect();
+ FetchRanges {
+ ranges,
+ page_start_offsets: None,
+ }
+ }
+ }
+
+ /// Fills in `self.column_chunks` with the data fetched from `chunk_data`.
+ ///
+ /// This function **must** be called with the data from the ranges
returned by
+ /// `fetch_ranges` and the corresponding page_start_offsets, with the
exact same and `selection`.
+ pub(crate) fn fill_column_chunks<I>(
+ &mut self,
+ projection: &ProjectionMask,
+ page_start_offsets: Option<Vec<Vec<u64>>>,
+ chunk_data: I,
+ ) where
+ I: IntoIterator<Item = Bytes>,
+ {
+ let mut chunk_data = chunk_data.into_iter();
+ let metadata = self.metadata.row_group(self.row_group_idx);
+ if let Some(page_start_offsets) = page_start_offsets {
+ // If we have a `RowSelection` and an `OffsetIndex` then only
fetch pages required for the
+ // `RowSelection`
+ let mut page_start_offsets = page_start_offsets.into_iter();
+
+ for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+ if chunk.is_some() || !projection.leaf_included(idx) {
+ continue;
+ }
+
+ if let Some(offsets) = page_start_offsets.next() {
+ let mut chunks = Vec::with_capacity(offsets.len());
+ for _ in 0..offsets.len() {
+ chunks.push(chunk_data.next().unwrap());
+ }
+
+ *chunk = Some(Arc::new(ColumnChunkData::Sparse {
+ length: metadata.column(idx).byte_range().1 as usize,
+ data: offsets
+ .into_iter()
+ .map(|x| x as usize)
+ .zip(chunks.into_iter())
+ .collect(),
+ }))
+ }
+ }
+ } else {
+ for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+ if chunk.is_some() || !projection.leaf_included(idx) {
+ continue;
+ }
+
+ if let Some(data) = chunk_data.next() {
+ *chunk = Some(Arc::new(ColumnChunkData::Dense {
+ offset: metadata.column(idx).byte_range().0 as usize,
+ data,
+ }));
+ }
+ }
+ }
+ }
+}
+
+impl RowGroups for InMemoryRowGroup<'_> {
+ fn num_rows(&self) -> usize {
+ self.row_count
+ }
+
+ /// Return chunks for column i
+ fn column_chunks(&self, i: usize) -> crate::errors::Result<Box<dyn
PageIterator>> {
+ match &self.column_chunks[i] {
+ None => Err(ParquetError::General(format!(
+ "Invalid column index {i}, column was not fetched"
+ ))),
+ Some(data) => {
+ let page_locations = self
+ .offset_index
+ // filter out empty offset indexes (old versions specified
Some(vec![]) when no present)
+ .filter(|index| !index.is_empty())
+ .map(|index| index[i].page_locations.clone());
+ let column_chunk_metadata =
self.metadata.row_group(self.row_group_idx).column(i);
+ let page_reader = SerializedPageReader::new(
+ data.clone(),
+ column_chunk_metadata,
+ self.row_count,
+ page_locations,
+ )?;
+ let page_reader = page_reader.add_crypto_context(
+ self.row_group_idx,
+ i,
+ self.metadata,
+ column_chunk_metadata,
+ )?;
+
+ let page_reader: Box<dyn PageReader> = Box::new(page_reader);
+
+ Ok(Box::new(ColumnChunkIterator {
+ reader: Some(Ok(page_reader)),
+ }))
+ }
+ }
+ }
+}
+
+/// An in-memory column chunk.
+/// This allows us to hold either dense column chunks or sparse column chunks
and easily
+/// access them by offset.
+#[derive(Clone, Debug)]
+pub(crate) enum ColumnChunkData {
+ /// Column chunk data representing only a subset of data pages.
+ /// For example if a row selection (possibly caused by a filter in a
query) causes us to read only
+ /// a subset of the rows in the column.
+ Sparse {
+ /// Length of the full column chunk
+ length: usize,
+ /// Subset of data pages included in this sparse chunk.
+ ///
+ /// Each element is a tuple of (page offset within file, page data).
+ /// Each entry is a complete page and the list is ordered by offset.
+ data: Vec<(usize, Bytes)>,
+ },
+ /// Full column chunk and the offset within the original file
+ Dense { offset: usize, data: Bytes },
+}
+
+impl ColumnChunkData {
+ /// Return the data for this column chunk at the given offset
+ fn get(&self, start: u64) -> crate::errors::Result<Bytes> {
+ match &self {
+ ColumnChunkData::Sparse { data, .. } => data
+ .binary_search_by_key(&start, |(offset, _)| *offset as u64)
+ .map(|idx| data[idx].1.clone())
+ .map_err(|_| {
+ ParquetError::General(format!(
+ "Invalid offset in sparse column chunk data: {start}"
+ ))
+ }),
+ ColumnChunkData::Dense { offset, data } => {
+ let start = start as usize - *offset;
+ Ok(data.slice(start..))
+ }
+ }
+ }
+}
+
+impl Length for ColumnChunkData {
+ /// Return the total length of the full column chunk
+ fn len(&self) -> u64 {
+ match &self {
+ ColumnChunkData::Sparse { length, .. } => *length as u64,
+ ColumnChunkData::Dense { data, .. } => data.len() as u64,
+ }
+ }
+}
+
+impl ChunkReader for ColumnChunkData {
+ type T = bytes::buf::Reader<Bytes>;
+
+ fn get_read(&self, start: u64) -> crate::errors::Result<Self::T> {
+ Ok(self.get(start)?.reader())
+ }
+
+ fn get_bytes(&self, start: u64, length: usize) ->
crate::errors::Result<Bytes> {
+ Ok(self.get(start)?.slice(..length))
+ }
+}
+
+/// Implements [`PageIterator`] for a single column chunk, yielding a single
[`PageReader`]
+struct ColumnChunkIterator {
+ reader: Option<crate::errors::Result<Box<dyn PageReader>>>,
+}
+
+impl Iterator for ColumnChunkIterator {
+ type Item = crate::errors::Result<Box<dyn PageReader>>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.reader.take()
+ }
+}
+
+impl PageIterator for ColumnChunkIterator {}
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index 94e3590651..34aac8b08a 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -190,9 +190,15 @@ pub mod async_reader;
#[cfg(feature = "async")]
pub mod async_writer;
+pub mod push_decoder;
+
+mod in_memory_row_group;
mod record_reader;
+
experimental!(mod schema);
+use std::fmt::Debug;
+
pub use self::arrow_writer::ArrowWriter;
#[cfg(feature = "async")]
pub use self::async_reader::ParquetRecordBatchStreamBuilder;
diff --git a/parquet/src/arrow/push_decoder/mod.rs
b/parquet/src/arrow/push_decoder/mod.rs
new file mode 100644
index 0000000000..4b932fb080
--- /dev/null
+++ b/parquet/src/arrow/push_decoder/mod.rs
@@ -0,0 +1,1151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`ParquetPushDecoder`]: decodes Parquet data with data provided by the
+//! caller (rather than from an underlying reader).
+
+mod reader_builder;
+mod remaining;
+
+use crate::DecodeResult;
+use crate::arrow::arrow_reader::{
+ ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions,
ParquetRecordBatchReader,
+};
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use crate::util::push_buffers::PushBuffers;
+use arrow_array::RecordBatch;
+use bytes::Bytes;
+use reader_builder::RowGroupReaderBuilder;
+use remaining::RemainingRowGroups;
+use std::ops::Range;
+use std::sync::Arc;
+
+/// A builder for [`ParquetPushDecoder`].
+///
+/// To create a new decoder, use
[`ParquetPushDecoderBuilder::try_new_decoder`] and pass
+/// the file length and metadata of the Parquet file to decode.
+///
+/// You can decode the metadata from a Parquet file using either
+/// [`ParquetMetaDataReader`] or [`ParquetMetaDataPushDecoder`].
+///
+/// [`ParquetMetaDataReader`]: crate::file::metadata::ParquetMetaDataReader
+/// [`ParquetMetaDataPushDecoder`]:
crate::file::metadata::ParquetMetaDataPushDecoder
+///
+/// Note the "input" type is `u64` which represents the length of the Parquet
file
+/// being decoded. This is needed to initialize the internal buffers that track
+/// what data has been provided to the decoder.
+///
+/// # Example
+/// ```
+/// # use std::ops::Range;
+/// # use std::sync::Arc;
+/// # use bytes::Bytes;
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::file::metadata::ParquetMetaDataPushDecoder;
+/// # let file_bytes = {
+/// # let mut buffer = vec![];
+/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
+/// # let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(),
None).unwrap();
+/// # writer.write(&batch).unwrap();
+/// # writer.close().unwrap();
+/// # Bytes::from(buffer)
+/// # };
+/// # // mimic IO by returning a function that returns the bytes for a given
range
+/// # let get_range = |range: &Range<u64>| -> Bytes {
+/// # let start = range.start as usize;
+/// # let end = range.end as usize;
+/// # file_bytes.slice(start..end)
+/// # };
+/// # let file_length = file_bytes.len() as u64;
+/// # let mut metadata_decoder =
ParquetMetaDataPushDecoder::try_new(file_length).unwrap();
+/// # metadata_decoder.push_ranges(vec![0..file_length],
vec![file_bytes.clone()]).unwrap();
+/// # let DecodeResult::Data(parquet_metadata) =
metadata_decoder.try_decode().unwrap() else { panic!("failed to decode
metadata") };
+/// # let parquet_metadata = Arc::new(parquet_metadata);
+/// // The file length and metadata are required to create the decoder
+/// let mut decoder =
+/// ParquetPushDecoderBuilder::try_new_decoder(file_length,
parquet_metadata)
+/// .unwrap()
+/// // Optionally configure the decoder, e.g. batch size
+/// .with_batch_size(1024)
+/// // Build the decoder
+/// .build()
+/// .unwrap();
+///
+/// // In a loop, ask the decoder what it needs next, and provide it with
the required data
+/// loop {
+/// match decoder.try_decode().unwrap() {
+/// DecodeResult::NeedsData(ranges) => {
+/// // The decoder needs more data. Fetch the data for the
given ranges
+/// let data = ranges.iter().map(|r|
get_range(r)).collect::<Vec<_>>();
+/// // Push the data to the decoder
+/// decoder.push_ranges(ranges, data).unwrap();
+/// // After pushing the data, we can try to decode again on
the next iteration
+/// }
+/// DecodeResult::Data(batch) => {
+/// // Successfully decoded a batch of data
+/// assert!(batch.num_rows() > 0);
+/// }
+/// DecodeResult::Finished => {
+/// // The decoder has finished decoding; exit the loop
+/// break;
+/// }
+/// }
+/// }
+/// ```
+pub type ParquetPushDecoderBuilder = ArrowReaderBuilder<u64>;
+
+/// Methods for building a `ParquetPushDecoder`. See the base [`ArrowReaderBuilder`]
for
+/// more options that can be configured.
+impl ParquetPushDecoderBuilder {
+ /// Create a new `ParquetPushDecoderBuilder` for configuring a Parquet decoder
for the given file.
+ ///
+ /// See [`ParquetMetaDataPushDecoder`] for a decoder that can read the
metadata from a Parquet file.
+ ///
+ /// [`ParquetMetaDataPushDecoder`]:
crate::file::metadata::ParquetMetaDataPushDecoder
+ ///
+ /// See example on [`ParquetPushDecoderBuilder`]
+ pub fn try_new_decoder(
+ file_len: u64,
+ parquet_metadata: Arc<ParquetMetaData>,
+ ) -> Result<Self, ParquetError> {
+ Self::try_new_decoder_with_options(
+ file_len,
+ parquet_metadata,
+ ArrowReaderOptions::default(),
+ )
+ }
+
+ /// Create a new `ParquetPushDecoderBuilder` for configuring a Parquet decoder
for the given file
+ /// with the given reader options.
+ ///
+ /// This is similar to [`Self::try_new_decoder`] but allows configuring
+ /// options such as Arrow schema
+ pub fn try_new_decoder_with_options(
+ file_len: u64,
+ parquet_metadata: Arc<ParquetMetaData>,
+ arrow_reader_options: ArrowReaderOptions,
+ ) -> Result<Self, ParquetError> {
+ let arrow_reader_metadata =
+ ArrowReaderMetadata::try_new(parquet_metadata,
arrow_reader_options)?;
+ Ok(Self::new_with_metadata(file_len, arrow_reader_metadata))
+ }
+
+ /// Create a new `ParquetPushDecoderBuilder` given [`ArrowReaderMetadata`].
+ ///
+ /// See [`ArrowReaderMetadata::try_new`] for how to create the metadata
from
+ /// the Parquet metadata and reader options.
+ pub fn new_with_metadata(file_len: u64, arrow_reader_metadata:
ArrowReaderMetadata) -> Self {
+ Self::new_builder(file_len, arrow_reader_metadata)
+ }
+
+ /// Create a [`ParquetPushDecoder`] with the configured options
+ pub fn build(self) -> Result<ParquetPushDecoder, ParquetError> {
+ let Self {
+ input: file_len,
+ metadata: parquet_metadata,
+ schema: _,
+ fields,
+ batch_size,
+ row_groups,
+ projection,
+ filter,
+ selection,
+ limit,
+ offset,
+ metrics,
+ max_predicate_cache_size,
+ } = self;
+
+ // If no row groups were specified, read all of them
+ let row_groups =
+ row_groups.unwrap_or_else(||
(0..parquet_metadata.num_row_groups()).collect());
+
+ // Prepare to build RowGroup readers
+ let buffers = PushBuffers::new(file_len);
+ let row_group_reader_builder = RowGroupReaderBuilder::new(
+ batch_size,
+ projection,
+ Arc::clone(&parquet_metadata),
+ fields,
+ filter,
+ limit,
+ offset,
+ metrics,
+ max_predicate_cache_size,
+ buffers,
+ );
+
+ // Initialize the decoder with the configured options
+ let remaining_row_groups = RemainingRowGroups::new(
+ parquet_metadata,
+ row_groups,
+ selection,
+ row_group_reader_builder,
+ );
+
+ Ok(ParquetPushDecoder {
+ state: ParquetDecoderState::ReadingRowGroup {
+ remaining_row_groups: Box::new(remaining_row_groups),
+ },
+ })
+ }
+}
+
+/// A push based Parquet Decoder
+///
+/// See [`ParquetPushDecoderBuilder`] for an example of how to build and use
the decoder.
+///
+/// [`ParquetPushDecoder`] is a low level API for decoding Parquet data
without an
+/// underlying reader for performing IO, and thus offers fine grained control
+/// over how data is fetched and decoded.
+///
+/// When more data is needed to make progress, instead of reading data directly
+/// from a reader, the decoder returns [`DecodeResult`] indicating what ranges
+/// are needed. Once the caller provides the requested ranges via
+/// [`Self::push_ranges`], they try to decode again by calling
+/// [`Self::try_decode`].
+///
+/// The decoder's internal state tracks what has been already decoded and what
+/// is needed next.
+#[derive(Debug)]
+pub struct ParquetPushDecoder {
+ /// The inner state.
+ ///
+ /// This state is consumed on every transition and a new state is produced
+ /// so the Rust compiler can ensure that the state is always valid and
+ /// transitions are not missed.
+ state: ParquetDecoderState,
+}
+
+impl ParquetPushDecoder {
+ /// Attempt to decode the next batch of data, or return what data is needed
+ ///
+ /// The decoder communicates the next state with a [`DecodeResult`]
+ ///
+ /// See full example in [`ParquetPushDecoderBuilder`]
+ ///
+ /// ```no_run
+ /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
+ /// use parquet::DecodeResult;
+ /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
+ /// # fn push_data(decoder: &mut ParquetPushDecoder, ranges:
Vec<std::ops::Range<u64>>) { unimplemented!() }
+ /// let mut decoder = get_decoder();
+ /// loop {
+ /// match decoder.try_decode().unwrap() {
+ /// DecodeResult::NeedsData(ranges) => {
+ /// // The decoder needs more data. Fetch the data for the given
ranges
+ /// // call decoder.push_ranges(ranges, data) and call again
+ /// push_data(&mut decoder, ranges);
+ /// }
+ /// DecodeResult::Data(batch) => {
+ /// // Successfully decoded the next batch of data
+ /// println!("Got batch with {} rows", batch.num_rows());
+ /// }
+ /// DecodeResult::Finished => {
+ /// // The decoder has finished decoding all data
+ /// break;
+ /// }
+ /// }
+ /// }
+ ///```
+ pub fn try_decode(&mut self) -> Result<DecodeResult<RecordBatch>,
ParquetError> {
+ let current_state = std::mem::replace(&mut self.state,
ParquetDecoderState::Finished);
+ let (new_state, decode_result) = current_state.try_transition()?;
+ self.state = new_state;
+ Ok(decode_result)
+ }
+
+ /// Push data into the decoder for processing
+ ///
+ /// This is a convenience wrapper around [`Self::push_ranges`] for pushing
a
+ /// single range of data.
+ ///
+ /// Note this can be the entire file or just a part of it. If it is part
of the file,
+ /// the ranges should correspond to the data ranges requested by the
decoder.
+ ///
+ /// See example in [`ParquetPushDecoderBuilder`]
+ pub fn push_range(&mut self, range: Range<u64>, data: Bytes) -> Result<(),
ParquetError> {
+ self.push_ranges(vec![range], vec![data])
+ }
+
+ /// Push data into the decoder for processing
+ ///
+ /// This should correspond to the data ranges requested by the decoder
+ pub fn push_ranges(
+ &mut self,
+ ranges: Vec<Range<u64>>,
+ data: Vec<Bytes>,
+ ) -> Result<(), ParquetError> {
+ let current_state = std::mem::replace(&mut self.state,
ParquetDecoderState::Finished);
+ self.state = current_state.push_data(ranges, data)?;
+ Ok(())
+ }
+
+ /// Returns the total number of buffered bytes in the decoder
+ ///
+ /// This is the sum of the size of all [`Bytes`] that have been pushed to
the
+ /// decoder but not yet consumed.
+ ///
+ /// Note that this does not include any overhead of the internal data
+ /// structures and that since [`Bytes`] are ref counted memory, this may
not
+ /// reflect additional memory usage.
+ ///
+ /// This can be used to monitor memory usage of the decoder.
+ pub fn buffered_bytes(&self) -> u64 {
+ self.state.buffered_bytes()
+ }
+}
+
+/// Internal state machine for the [`ParquetPushDecoder`]
+#[derive(Debug)]
+enum ParquetDecoderState {
+ /// Waiting for data needed to decode the next RowGroup
+ ReadingRowGroup {
+ remaining_row_groups: Box<RemainingRowGroups>,
+ },
+ /// The decoder is actively decoding a RowGroup
+ DecodingRowGroup {
+ /// Current active reader
+ record_batch_reader: Box<ParquetRecordBatchReader>,
+ remaining_row_groups: Box<RemainingRowGroups>,
+ },
+ /// The decoder has finished processing all data
+ Finished,
+}
+
+impl ParquetDecoderState {
+ /// Current state --> next state + output
+ ///
+ /// This function is called to check if the decoder has any RecordBatches
+ /// and [`Self::push_data`] is called when new data is available.
+ ///
+ /// # Notes
+ ///
+ /// This is a separate method to reduce the indentation level of
+ /// [`ParquetPushDecoder::try_decode`], which delegates to it.
+ fn try_transition(self) -> Result<(Self, DecodeResult<RecordBatch>),
ParquetError> {
+ match self {
+ Self::ReadingRowGroup {
+ mut remaining_row_groups,
+ } => {
+ match remaining_row_groups.try_next_reader()? {
+ // If we have a next reader, we can transition to decoding
it
+ DecodeResult::Data(record_batch_reader) => {
+ // Transition to decoding the row group
+ Self::DecodingRowGroup {
+ record_batch_reader: Box::new(record_batch_reader),
+ remaining_row_groups,
+ }
+ .try_transition()
+ }
+ // Not enough data has been pushed yet to build the next reader
+ DecodeResult::NeedsData(ranges) => {
+ // If we need more data, we return the ranges needed
and stay in Reading
+ // RowGroup state
+ Ok((
+ Self::ReadingRowGroup {
+ remaining_row_groups,
+ },
+ DecodeResult::NeedsData(ranges),
+ ))
+ }
+ DecodeResult::Finished => {
+ // No more row groups to read, we are finished
+ Ok((Self::Finished, DecodeResult::Finished))
+ }
+ }
+ }
+ Self::DecodingRowGroup {
+ mut record_batch_reader,
+ remaining_row_groups,
+ } => {
+ // Decode the next record batch
+ match record_batch_reader.next() {
+ Some(Ok(batch)) => {
+ // Successfully decoded a batch, return it
+ Ok((
+ Self::DecodingRowGroup {
+ record_batch_reader,
+ remaining_row_groups,
+ },
+ DecodeResult::Data(batch),
+ ))
+ }
+ None => {
+ // No more batches in this row group, move to the next
row group
+ // or finish if there are no more row groups
+ Self::ReadingRowGroup {
+ remaining_row_groups,
+ }
+ .try_transition()
+ }
+ Some(Err(e)) => Err(ParquetError::from(e)), // some error
occurred while decoding
+ }
+ }
+ Self::Finished => Ok((Self::Finished, DecodeResult::Finished)),
+ }
+ }
+
+ /// Push data, and transition state if needed
+ ///
+ /// This should correspond to the data ranges requested by the decoder
+ pub fn push_data(
+ self,
+ ranges: Vec<Range<u64>>,
+ data: Vec<Bytes>,
+ ) -> Result<Self, ParquetError> {
+ match self {
+ ParquetDecoderState::ReadingRowGroup {
+ mut remaining_row_groups,
+ } => {
+ // Push data to the RowGroupReaderBuilder
+ remaining_row_groups.push_data(ranges, data);
+ Ok(ParquetDecoderState::ReadingRowGroup {
+ remaining_row_groups,
+ })
+ }
+ // it is ok to get data before we asked for it
+ ParquetDecoderState::DecodingRowGroup {
+ record_batch_reader,
+ mut remaining_row_groups,
+ } => {
+ remaining_row_groups.push_data(ranges, data);
+ Ok(ParquetDecoderState::DecodingRowGroup {
+ record_batch_reader,
+ remaining_row_groups,
+ })
+ }
+ ParquetDecoderState::Finished => Err(ParquetError::General(
+ "Cannot push data to a finished decoder".to_string(),
+ )),
+ }
+ }
+
+ /// How many bytes are currently buffered in the decoder?
+ fn buffered_bytes(&self) -> u64 {
+ match self {
+ ParquetDecoderState::ReadingRowGroup {
+ remaining_row_groups,
+ } => remaining_row_groups.buffered_bytes(),
+ ParquetDecoderState::DecodingRowGroup {
+ record_batch_reader: _,
+ remaining_row_groups,
+ } => remaining_row_groups.buffered_bytes(),
+ ParquetDecoderState::Finished => 0,
+ }
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use crate::DecodeResult;
+ use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter,
RowSelection, RowSelector};
+ use crate::arrow::push_decoder::{ParquetPushDecoder,
ParquetPushDecoderBuilder};
+ use crate::arrow::{ArrowWriter, ProjectionMask};
+ use crate::errors::ParquetError;
+ use crate::file::metadata::ParquetMetaDataPushDecoder;
+ use crate::file::properties::WriterProperties;
+ use arrow::compute::kernels::cmp::{gt, lt};
+ use arrow_array::cast::AsArray;
+ use arrow_array::types::Int64Type;
+ use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray};
+ use arrow_select::concat::concat_batches;
+ use bytes::Bytes;
+ use std::fmt::Debug;
+ use std::ops::Range;
+ use std::sync::{Arc, LazyLock};
+
+ /// Test decoder struct size (as they are copied around on each
transition, they
+ /// should not grow too large)
+ #[test]
+ fn test_decoder_size() {
+ assert_eq!(std::mem::size_of::<ParquetDecoderState>(), 24);
+ }
+
+ /// Decode the entire file at once, simulating a scenario where all data is
+ /// available in memory
+ #[test]
+ fn test_decoder_all_data() {
+ let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
+ test_file_len(),
+ test_file_parquet_metadata(),
+ )
+ .unwrap()
+ .build()
+ .unwrap();
+
+ decoder
+ .push_range(test_file_range(), TEST_FILE_DATA.clone())
+ .unwrap();
+
+ let results = vec![
+ // first row group should be decoded without needing more data
+ expect_data(decoder.try_decode()),
+ // second row group should be decoded without needing more data
+ expect_data(decoder.try_decode()),
+ ];
+ expect_finished(decoder.try_decode());
+
+ let all_output = concat_batches(&TEST_BATCH.schema(),
&results).unwrap();
+ // Check that the output matches the input batch
+ assert_eq!(all_output, *TEST_BATCH);
+ }
+
+ /// Decode the entire file incrementally, simulating a scenario where data
is
+ /// fetched as needed
+ #[test]
+ fn test_decoder_incremental() {
+ let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
+ test_file_len(),
+ test_file_parquet_metadata(),
+ )
+ .unwrap()
+ .build()
+ .unwrap();
+
+ let mut results = vec![];
+
+ // First row group, expect a single request
+ let ranges = expect_needs_data(decoder.try_decode());
+ let num_bytes_requested: u64 = ranges.iter().map(|r| r.end -
r.start).sum();
+ push_ranges_to_decoder(&mut decoder, ranges);
+ // The decoder should currently only store the data it needs to decode
the first row group
+ assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
+ results.push(expect_data(decoder.try_decode()));
+ // the decoder should have consumed the data for the first row group
and freed it
+ assert_eq!(decoder.buffered_bytes(), 0);
+
+ // Second row group,
+ let ranges = expect_needs_data(decoder.try_decode());
+ let num_bytes_requested: u64 = ranges.iter().map(|r| r.end -
r.start).sum();
+ push_ranges_to_decoder(&mut decoder, ranges);
+ // The decoder should currently only store the data it needs to decode
the second row group
+ assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
+ results.push(expect_data(decoder.try_decode()));
+ // the decoder should have consumed the data for the second row group
and freed it
+ assert_eq!(decoder.buffered_bytes(), 0);
+ expect_finished(decoder.try_decode());
+
+ // Check that the output matches the input batch
+ let all_output = concat_batches(&TEST_BATCH.schema(),
&results).unwrap();
+ assert_eq!(all_output, *TEST_BATCH);
+ }
+
+ /// Decode the entire file incrementally, simulating partial reads
+ #[test]
+ fn test_decoder_partial() {
+ let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
+ test_file_len(),
+ test_file_parquet_metadata(),
+ )
+ .unwrap()
+ .build()
+ .unwrap();
+
+ // First row group, expect a single request for all data needed to
read every column
+ let ranges = expect_needs_data(decoder.try_decode());
+ push_ranges_to_decoder(&mut decoder, ranges);
+
+ let batch1 = expect_data(decoder.try_decode());
+ let expected1 = TEST_BATCH.slice(0, 200);
+ assert_eq!(batch1, expected1);
+
+ // Second row group, this time provide the data in two steps
+ let ranges = expect_needs_data(decoder.try_decode());
+ let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2);
+ assert!(!ranges1.is_empty());
+ assert!(!ranges2.is_empty());
+ // push first half to simulate partial read
+ push_ranges_to_decoder(&mut decoder, ranges1.to_vec());
+
+ // still expect more data
+ let ranges = expect_needs_data(decoder.try_decode());
+ assert_eq!(ranges, ranges2); // should be the remaining ranges
+ // pushing empty ranges should be a no-op
+ push_ranges_to_decoder(&mut decoder, vec![]);
+ let ranges = expect_needs_data(decoder.try_decode());
+ assert_eq!(ranges, ranges2); // should be the remaining ranges
+ push_ranges_to_decoder(&mut decoder, ranges);
+
+ let batch2 = expect_data(decoder.try_decode());
+ let expected2 = TEST_BATCH.slice(200, 200);
+ assert_eq!(batch2, expected2);
+
+ expect_finished(decoder.try_decode());
+ }
+
+ /// Decode multiple columns "a" and "b", expect that the decoder requests
+ /// only a single request per row group
+ #[test]
+ fn test_decoder_selection_does_one_request() {
+ let builder = ParquetPushDecoderBuilder::try_new_decoder(
+ test_file_len(),
+ test_file_parquet_metadata(),
+ )
+ .unwrap();
+
+ let schema_descr =
builder.metadata().file_metadata().schema_descr_ptr();
+
+ let mut decoder = builder
+ .with_projection(
+ ProjectionMask::columns(&schema_descr, ["a", "b"]), // read
"a", "b"
+ )
+ .build()
+ .unwrap();
+
+ // First row group, expect a single request for all data needed to
read "a" and "b"
+ let ranges = expect_needs_data(decoder.try_decode());
+ push_ranges_to_decoder(&mut decoder, ranges);
+
+ let batch1 = expect_data(decoder.try_decode());
+ let expected1 = TEST_BATCH.slice(0, 200).project(&[0, 1]).unwrap();
+ assert_eq!(batch1, expected1);
+
+ // Second row group, similarly expect a single request for all data
needed to read "a" and "b"
+ let ranges = expect_needs_data(decoder.try_decode());
+ push_ranges_to_decoder(&mut decoder, ranges);
+
+ let batch2 = expect_data(decoder.try_decode());
+ let expected2 = TEST_BATCH.slice(200, 200).project(&[0, 1]).unwrap();
+ assert_eq!(batch2, expected2);
+
+ expect_finished(decoder.try_decode());
+ }
+
+ /// Decode with a filter that requires multiple requests, but only provide
part
+ /// of the data needed for the filter at a time simulating partial reads.
+ #[test]
+ fn test_decoder_single_filter_partial() {
+ let builder = ParquetPushDecoderBuilder::try_new_decoder(
+ test_file_len(),
+ test_file_parquet_metadata(),
+ )
+ .unwrap();
+
+ // Values in column "a" range 0..399
+ // First filter: "a" > 250 (nothing in Row Group 0, both data pages
in Row Group 1)
+ let schema_descr =
builder.metadata().file_metadata().schema_descr_ptr();
+
+ // a > 250
+ let row_filter_a = ArrowPredicateFn::new(
+ // claim to use both a and b so we get two ranges requests for the
filter pages
+ ProjectionMask::columns(&schema_descr, ["a", "b"]),
+ |batch: RecordBatch| {
+ let scalar_250 = Int64Array::new_scalar(250);
+ let column = batch.column(0).as_primitive::<Int64Type>();
+ gt(column, &scalar_250)
+ },
+ );
+
+ let mut decoder = builder
+ .with_projection(
+ // read only column "a" to test that filter pages are reused
+ ProjectionMask::columns(&schema_descr, ["a"]), // read "a"
+ )
+ .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
+ .build()
+ .unwrap();
+
+ // First row group, evaluating filters
+ let ranges = expect_needs_data(decoder.try_decode());
+ // only provide half the ranges
+ let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2);
+ assert!(!ranges1.is_empty());
+ assert!(!ranges2.is_empty());
+ push_ranges_to_decoder(&mut decoder, ranges1.to_vec());
+ // still expect more data
+ let ranges = expect_needs_data(decoder.try_decode());
+ assert_eq!(ranges, ranges2); // should be the remaining ranges
+ let ranges = expect_needs_data(decoder.try_decode());
+ assert_eq!(ranges, ranges2); // should be the remaining ranges
+ push_ranges_to_decoder(&mut decoder, ranges2.to_vec());
+
+ // Since no rows in the first row group pass the filters, there is no
+ // additional requests to read data pages for "b" here
+
+ // Second row group
+ let ranges = expect_needs_data(decoder.try_decode());
+ push_ranges_to_decoder(&mut decoder, ranges);
+
+ let batch = expect_data(decoder.try_decode());
+ let expected = TEST_BATCH.slice(251, 149).project(&[0]).unwrap();
+ assert_eq!(batch, expected);
+
+ expect_finished(decoder.try_decode());
+ }
+
+ /// Decode with a filter where we also skip one of the RowGroups via a
RowSelection
+ #[test]
+ fn test_decoder_single_filter_and_row_selection() {
+ let builder = ParquetPushDecoderBuilder::try_new_decoder(
+ test_file_len(),
+ test_file_parquet_metadata(),
+ )
+ .unwrap();
+
+ // Values in column "a" range 0..399
+ // First filter: "a" > 250 (nothing in Row Group 0, last data page in
Row Group 1)
+ let schema_descr =
builder.metadata().file_metadata().schema_descr_ptr();
+
+ // a > 250
+ let row_filter_a = ArrowPredicateFn::new(
+ ProjectionMask::columns(&schema_descr, ["a"]),
+ |batch: RecordBatch| {
+ let scalar_250 = Int64Array::new_scalar(250);
+ let column = batch.column(0).as_primitive::<Int64Type>();
+ gt(column, &scalar_250)
+ },
+ );
+
+ let mut decoder = builder
+ .with_projection(
+ // read only column "b"; the filter column "a" is not part of the output
+ ProjectionMask::columns(&schema_descr, ["b"]), // read "b"
+ )
+ .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
+ .with_row_selection(RowSelection::from(vec![
+ RowSelector::skip(200), // skip first row group
+ RowSelector::select(100), // first 100 rows of second row group
+ RowSelector::skip(100),
+ ]))
+ .build()
+ .unwrap();
+
+ // expect the first row group to be filtered out (no filter is
evaluated due to row selection)
+
+ // Second row group, first filter (a > 250)
+ let ranges = expect_needs_data(decoder.try_decode());
+ push_ranges_to_decoder(&mut decoder, ranges);
+
+ // Second row group, data pages for "b"
+ let ranges = expect_needs_data(decoder.try_decode());
+ push_ranges_to_decoder(&mut decoder, ranges);
+
+ let batch = expect_data(decoder.try_decode());
+ let expected = TEST_BATCH.slice(251, 49).project(&[1]).unwrap();
+ assert_eq!(batch, expected);
+
+ expect_finished(decoder.try_decode());
+ }
+
+ /// Decode with multiple filters that require multiple requests
+ #[test]
+ fn test_decoder_multi_filters() {
+ // Create a decoder for decoding parquet data (note it does not have
any IO / readers)
+ let builder = ParquetPushDecoderBuilder::try_new_decoder(
+ test_file_len(),
+ test_file_parquet_metadata(),
+ )
+ .unwrap();
+
+ // Values in column "a" range 0..399
+ // Values in column "b" range 400..799
+ // First filter: "a" > 175 (last data page in Row Group 0)
+ // Second filter: "b" < 625 (last data page in Row Group 0 and first
DataPage in RowGroup 1)
+ let schema_descr =
builder.metadata().file_metadata().schema_descr_ptr();
+
+ // a > 175
+ let row_filter_a = ArrowPredicateFn::new(
+ ProjectionMask::columns(&schema_descr, ["a"]),
+ |batch: RecordBatch| {
+ let scalar_175 = Int64Array::new_scalar(175);
+ let column = batch.column(0).as_primitive::<Int64Type>();
+ gt(column, &scalar_175)
+ },
+ );
+
+ // b < 625
+ let row_filter_b = ArrowPredicateFn::new(
+ ProjectionMask::columns(&schema_descr, ["b"]),
+ |batch: RecordBatch| {
+ let scalar_625 = Int64Array::new_scalar(625);
+ let column = batch.column(0).as_primitive::<Int64Type>();
+ lt(column, &scalar_625)
+ },
+ );
+
+ let mut decoder = builder
+ .with_projection(
+ ProjectionMask::columns(&schema_descr, ["c"]), // read "c"
+ )
+ .with_row_filter(RowFilter::new(vec![
+ Box::new(row_filter_a),
+ Box::new(row_filter_b),
+ ]))
+ .build()
+ .unwrap();
+
+ // First row group, first filter (a > 175)
+ let ranges = expect_needs_data(decoder.try_decode());
+ push_ranges_to_decoder(&mut decoder, ranges);
+
+ // first row group, second filter (b < 625)
+ let ranges = expect_needs_data(decoder.try_decode());
+ push_ranges_to_decoder(&mut decoder, ranges);
+
+ // first row group, data pages for "c"
+ let ranges = expect_needs_data(decoder.try_decode());
+ push_ranges_to_decoder(&mut decoder, ranges);
+
+ // expect the first batch to be decoded: rows 176..199, column "c"
+ let batch1 = expect_data(decoder.try_decode());
+ let expected1 = TEST_BATCH.slice(176, 24).project(&[2]).unwrap();
+ assert_eq!(batch1, expected1);
+
+ // Second row group, first filter (a > 175)
+ let ranges = expect_needs_data(decoder.try_decode());
+ push_ranges_to_decoder(&mut decoder, ranges);
+
+ // Second row group, second filter (b < 625)
+ let ranges = expect_needs_data(decoder.try_decode());
+ push_ranges_to_decoder(&mut decoder, ranges);
+
+ // Second row group, data pages for "c"
+ let ranges = expect_needs_data(decoder.try_decode());
+ push_ranges_to_decoder(&mut decoder, ranges);
+
+ // expect the second batch to be decoded: rows 200..224, column "c"
+ let batch2 = expect_data(decoder.try_decode());
+ let expected2 = TEST_BATCH.slice(200, 25).project(&[2]).unwrap();
+ assert_eq!(batch2, expected2);
+
+ expect_finished(decoder.try_decode());
+ }
+
+ /// Decode with a filter that uses a column that is also projected, and
expect
+ /// that the filter pages are reused (don't refetch them)
+ #[test]
+ fn test_decoder_reuses_filter_pages() {
+ // Create a decoder for decoding parquet data (note it does not have
any IO / readers)
+ let builder = ParquetPushDecoderBuilder::try_new_decoder(
+ test_file_len(),
+ test_file_parquet_metadata(),
+ )
+ .unwrap();
+
+ // Values in column "a" range 0..399
+ // First filter: "a" > 250 (nothing in Row Group 0, last data page in
Row Group 1)
+ let schema_descr =
builder.metadata().file_metadata().schema_descr_ptr();
+
+ // a > 250
+ let row_filter_a = ArrowPredicateFn::new(
+ ProjectionMask::columns(&schema_descr, ["a"]),
+ |batch: RecordBatch| {
+ let scalar_250 = Int64Array::new_scalar(250);
+ let column = batch.column(0).as_primitive::<Int64Type>();
+ gt(column, &scalar_250)
+ },
+ );
+
+ let mut decoder = builder
+ .with_projection(
+ // read only column "a" to test that filter pages are reused
+ ProjectionMask::columns(&schema_descr, ["a"]), // read "a"
+ )
+ .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
+ .build()
+ .unwrap();
+
+ // First row group, first filter (a > 250)
+ let ranges = expect_needs_data(decoder.try_decode());
+ push_ranges_to_decoder(&mut decoder, ranges);
+
+ // expect the first row group to be filtered out (no rows match)
+
+ // Second row group, first filter (a > 250)
+ let ranges = expect_needs_data(decoder.try_decode());
+ push_ranges_to_decoder(&mut decoder, ranges);
+
+ // expect that the second row group is decoded: rows 251..399, column
"a"
+ // Note that the filter pages for "a" should be reused and no
additional data
+ // should be requested
+ let batch = expect_data(decoder.try_decode());
+ let expected = TEST_BATCH.slice(251, 149).project(&[0]).unwrap();
+ assert_eq!(batch, expected);
+
+ expect_finished(decoder.try_decode());
+ }
+
+ #[test]
+ fn test_decoder_empty_filters() {
+ let builder = ParquetPushDecoderBuilder::try_new_decoder(
+ test_file_len(),
+ test_file_parquet_metadata(),
+ )
+ .unwrap();
+ let schema_descr =
builder.metadata().file_metadata().schema_descr_ptr();
+
+ // only read column "c", but with empty filters
+ let mut decoder = builder
+ .with_projection(
+ ProjectionMask::columns(&schema_descr, ["c"]), // read "c"
+ )
+ .with_row_filter(RowFilter::new(vec![
+ // empty filters should be ignored
+ ]))
+ .build()
+ .unwrap();
+
+ // First row group
+ let ranges = expect_needs_data(decoder.try_decode());
+ push_ranges_to_decoder(&mut decoder, ranges);
+
+ // expect the first batch to be decoded: rows 0..199, column "c"
+ let batch1 = expect_data(decoder.try_decode());
+ let expected1 = TEST_BATCH.slice(0, 200).project(&[2]).unwrap();
+ assert_eq!(batch1, expected1);
+
+ // Second row group,
+ let ranges = expect_needs_data(decoder.try_decode());
+ push_ranges_to_decoder(&mut decoder, ranges);
+
+ // expect the second batch to be decoded: rows 200..399, column "c"
+ let batch2 = expect_data(decoder.try_decode());
+ let expected2 = TEST_BATCH.slice(200, 200).project(&[2]).unwrap();
+
+ assert_eq!(batch2, expected2);
+
+ expect_finished(decoder.try_decode());
+ }
+
+ #[test]
+ fn test_decoder_offset_limit() {
+ let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
+ test_file_len(),
+ test_file_parquet_metadata(),
+ )
+ .unwrap()
+ // skip entire first row group (200 rows) and first 25 rows of second
row group
+ .with_offset(225)
+ // and limit to 20 rows
+ .with_limit(20)
+ .build()
+ .unwrap();
+
+ // First row group should be skipped,
+
+ // Second row group
+ let ranges = expect_needs_data(decoder.try_decode());
+ push_ranges_to_decoder(&mut decoder, ranges);
+
+ // expect the first and only batch to be decoded
+ let batch1 = expect_data(decoder.try_decode());
+ let expected1 = TEST_BATCH.slice(225, 20);
+ assert_eq!(batch1, expected1);
+
+ expect_finished(decoder.try_decode());
+ }
+
+ #[test]
+ fn test_decoder_row_group_selection() {
+ // take only the second row group
+ let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
+ test_file_len(),
+ test_file_parquet_metadata(),
+ )
+ .unwrap()
+ .with_row_groups(vec![1])
+ .build()
+ .unwrap();
+
+ // First row group should be skipped,
+
+ // Second row group
+ let ranges = expect_needs_data(decoder.try_decode());
+ push_ranges_to_decoder(&mut decoder, ranges);
+
+ // expect the first and only batch to be decoded
+ let batch1 = expect_data(decoder.try_decode());
+ let expected1 = TEST_BATCH.slice(200, 200);
+ assert_eq!(batch1, expected1);
+
+ expect_finished(decoder.try_decode());
+ }
+
+ #[test]
+ fn test_decoder_row_selection() {
+ // select only 20 rows, starting 25 rows into the second row group
+ let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
+ test_file_len(),
+ test_file_parquet_metadata(),
+ )
+ .unwrap()
+ .with_row_selection(RowSelection::from(vec![
+ RowSelector::skip(225), // skip first row group and 25 rows of
second
+ RowSelector::select(20), // take 20 rows
+ ]))
+ .build()
+ .unwrap();
+
+ // First row group should be skipped,
+
+ // Second row group
+ let ranges = expect_needs_data(decoder.try_decode());
+ push_ranges_to_decoder(&mut decoder, ranges);
+
+ // expect the first and only batch to be decoded
+ let batch1 = expect_data(decoder.try_decode());
+ let expected1 = TEST_BATCH.slice(225, 20);
+ assert_eq!(batch1, expected1);
+
+ expect_finished(decoder.try_decode());
+ }
+
+ /// Returns a batch with 400 rows, with 3 columns: "a", "b", "c"
+ ///
+ /// Note "c" has a different type than "a" and "b" (so its data page sizes will be different)
+ static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
+ let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
+ let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
+ let c: ArrayRef =
Arc::new(StringViewArray::from_iter_values((0..400).map(|i| {
+ if i % 2 == 0 {
+ format!("string_{i}")
+ } else {
+ format!("A string larger than 12 bytes and thus not inlined
{i}")
+ }
+ })));
+
+ RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
+ });
+
+ /// Create a parquet file in memory for testing.
+ ///
+ /// See [`TEST_BATCH`] for the data in the file.
+ ///
+ /// Each column is written in 4 data pages, each with 100 rows, across 2
+ /// row groups. Each column in each row group has two data pages.
+ ///
+ /// The data is split across row groups like this
+ ///
+ /// Column | Values | Data Page | Row Group
+ /// -------|------------------------|-----------|-----------
+ /// a | 0..99 | 1 | 0
+ /// a | 100..199 | 2 | 0
+ /// a | 200..299 | 1 | 1
+ /// a | 300..399 | 2 | 1
+ ///
+ /// b | 400..499 | 1 | 0
+ /// b | 500..599 | 2 | 0
+ /// b | 600..699 | 1 | 1
+ /// b | 700..799 | 2 | 1
+ ///
+ /// c | "string_0".."string_99" | 1 | 0
+ /// c | "string_100".."string_199" | 2 | 0
+ /// c | "string_200".."string_299" | 1 | 1
+ /// c | "string_300".."string_399" | 2 | 1
+ static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
+ let input_batch = &TEST_BATCH;
+ let mut output = Vec::new();
+
+ let writer_options = WriterProperties::builder()
+ .set_max_row_group_size(200)
+ .set_data_page_row_count_limit(100)
+ .build();
+ let mut writer =
+ ArrowWriter::try_new(&mut output, input_batch.schema(),
Some(writer_options)).unwrap();
+
+ // since the limits are only enforced on batch boundaries, write the
input
+ // batch in chunks of 50
+ let mut row_remain = input_batch.num_rows();
+ while row_remain > 0 {
+ let chunk_size = row_remain.min(50);
+ let chunk = input_batch.slice(input_batch.num_rows() - row_remain,
chunk_size);
+ writer.write(&chunk).unwrap();
+ row_remain -= chunk_size;
+ }
+ writer.close().unwrap();
+ Bytes::from(output)
+ });
+
+ /// Return the length of [`TEST_FILE_DATA`], in bytes
+ fn test_file_len() -> u64 {
+ TEST_FILE_DATA.len() as u64
+ }
+
+ /// Return a range that covers the entire [`TEST_FILE_DATA`]
+ fn test_file_range() -> Range<u64> {
+ 0..test_file_len()
+ }
+
+ /// Return a slice of the test file data from the given range
+ pub fn test_file_slice(range: Range<u64>) -> Bytes {
+ let start: usize = range.start.try_into().unwrap();
+ let end: usize = range.end.try_into().unwrap();
+ TEST_FILE_DATA.slice(start..end)
+ }
+
+ /// return the metadata for the test file
+ pub fn test_file_parquet_metadata() ->
Arc<crate::file::metadata::ParquetMetaData> {
+ let mut metadata_decoder =
ParquetMetaDataPushDecoder::try_new(test_file_len()).unwrap();
+ push_ranges_to_metadata_decoder(&mut metadata_decoder,
vec![test_file_range()]);
+ let metadata = metadata_decoder.try_decode().unwrap();
+ let DecodeResult::Data(metadata) = metadata else {
+ panic!("Expected metadata to be decoded successfully");
+ };
+ Arc::new(metadata)
+ }
+
+ /// Push the given ranges to the metadata decoder, simulating reading from
a file
+ fn push_ranges_to_metadata_decoder(
+ metadata_decoder: &mut ParquetMetaDataPushDecoder,
+ ranges: Vec<Range<u64>>,
+ ) {
+ let data = ranges
+ .iter()
+ .map(|range| test_file_slice(range.clone()))
+ .collect::<Vec<_>>();
+ metadata_decoder.push_ranges(ranges, data).unwrap();
+ }
+
+ fn push_ranges_to_decoder(decoder: &mut ParquetPushDecoder, ranges:
Vec<Range<u64>>) {
+ let data = ranges
+ .iter()
+ .map(|range| test_file_slice(range.clone()))
+ .collect::<Vec<_>>();
+ decoder.push_ranges(ranges, data).unwrap();
+ }
+
+ /// Expect that the [`DecodeResult`] is a [`DecodeResult::Data`] and
return the corresponding element
+ fn expect_data<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) ->
T {
+ match result.expect("Expected Ok(DecodeResult::Data(T))") {
+ DecodeResult::Data(data) => data,
+ result => panic!("Expected DecodeResult::Data, got {result:?}"),
+ }
+ }
+
+ /// Expect that the [`DecodeResult`] is a [`DecodeResult::NeedsData`] and
return the corresponding ranges
+ fn expect_needs_data<T: Debug>(
+ result: Result<DecodeResult<T>, ParquetError>,
+ ) -> Vec<Range<u64>> {
+ match result.expect("Expected Ok(DecodeResult::NeedsData{ranges})") {
+ DecodeResult::NeedsData(ranges) => ranges,
+ result => panic!("Expected DecodeResult::NeedsData, got
{result:?}"),
+ }
+ }
+
+ fn expect_finished<T: Debug>(result: Result<DecodeResult<T>,
ParquetError>) {
+ match result.expect("Expected Ok(DecodeResult::Finished)") {
+ DecodeResult::Finished => {}
+ result => panic!("Expected DecodeResult::Finished, got
{result:?}"),
+ }
+ }
+}
diff --git a/parquet/src/arrow/push_decoder/reader_builder/data.rs
b/parquet/src/arrow/push_decoder/reader_builder/data.rs
new file mode 100644
index 0000000000..6fbc2090b0
--- /dev/null
+++ b/parquet/src/arrow/push_decoder/reader_builder/data.rs
@@ -0,0 +1,233 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`DataRequest`] tracks and holds data needed to construct InMemoryRowGroups
+
+use crate::arrow::ProjectionMask;
+use crate::arrow::arrow_reader::RowSelection;
+use crate::arrow::in_memory_row_group::{ColumnChunkData, FetchRanges,
InMemoryRowGroup};
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use crate::file::page_index::offset_index::OffsetIndexMetaData;
+use crate::file::reader::ChunkReader;
+use crate::util::push_buffers::PushBuffers;
+use bytes::Bytes;
+use std::ops::Range;
+use std::sync::Arc;
+
+/// Contains in-progress state to construct InMemoryRowGroups
+///
+/// See [`DataRequestBuilder`] for creating new requests
+#[derive(Debug)]
+pub(super) struct DataRequest {
+ /// Any previously read column chunk data
+ column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
+ /// The ranges of data that are needed next
+ ranges: Vec<Range<u64>>,
+ /// Optional page start offsets for each requested range. This is used
+ /// to create the relevant InMemoryRowGroup
+ page_start_offsets: Option<Vec<Vec<u64>>>,
+}
+
+impl DataRequest {
+ /// return what ranges are still needed to satisfy this request. Returns
an empty vec
+ /// if all ranges are satisfied
+ pub fn needed_ranges(&self, buffers: &PushBuffers) -> Vec<Range<u64>> {
+ self.ranges
+ .iter()
+ .filter(|&range| !buffers.has_range(range))
+ .cloned()
+ .collect()
+ }
+
+ /// Returns the chunks from the buffers that satisfy this request
+ fn get_chunks(&self, buffers: &PushBuffers) -> Result<Vec<Bytes>,
ParquetError> {
+ self.ranges
+ .iter()
+ .map(|range| {
+ let length: usize = (range.end - range.start)
+ .try_into()
+ .expect("overflow for offset");
+ // callers should already have verified (see `needed_ranges`) that all the data is present
+ buffers.get_bytes(range.start, length).map_err(|e| {
+ ParquetError::General(format!(
+ "Internal Error missing data for range {range:?} in
buffers: {e}",
+ ))
+ })
+ })
+ .collect()
+ }
+
+ /// Create a new InMemoryRowGroup, and fill it with provided data
+ ///
+ /// Assumes that all needed data is present in the buffers
+ /// and clears any explicitly requested ranges
+ pub fn try_into_in_memory_row_group<'a>(
+ self,
+ row_group_idx: usize,
+ row_count: usize,
+ parquet_metadata: &'a ParquetMetaData,
+ projection: &ProjectionMask,
+ buffers: &mut PushBuffers,
+ ) -> Result<InMemoryRowGroup<'a>, ParquetError> {
+ let chunks = self.get_chunks(buffers)?;
+
+ let Self {
+ column_chunks,
+ ranges,
+ page_start_offsets,
+ } = self;
+
+ // Create an InMemoryRowGroup to hold the column chunks, this is a
+ // temporary structure used to tell the ArrowReaders what pages are
+ // needed for decoding
+ let mut in_memory_row_group = InMemoryRowGroup {
+ row_count,
+ column_chunks,
+ offset_index: get_offset_index(parquet_metadata, row_group_idx),
+ row_group_idx,
+ metadata: parquet_metadata,
+ };
+
+ in_memory_row_group.fill_column_chunks(projection, page_start_offsets,
chunks);
+
+ // Clear the ranges that were explicitly requested
+ buffers.clear_ranges(&ranges);
+
+ Ok(in_memory_row_group)
+ }
+}
+
+/// Builder for [`DataRequest`]
+pub(super) struct DataRequestBuilder<'a> {
+ /// The row group index
+ row_group_idx: usize,
+ /// The number of rows in the row group
+ row_count: usize,
+ /// The batch size to read
+ batch_size: usize,
+ /// The parquet metadata
+ parquet_metadata: &'a ParquetMetaData,
+ /// The projection mask (which columns to read)
+ projection: &'a ProjectionMask,
+ /// Optional row selection to apply
+ selection: Option<&'a RowSelection>,
+ /// Optional projection mask if using
+ /// [`RowGroupCache`](crate::arrow::array_reader::RowGroupCache)
+ /// for caching decoded columns.
+ cache_projection: Option<&'a ProjectionMask>,
+ /// Any previously read column chunks
+ column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
+}
+
+impl<'a> DataRequestBuilder<'a> {
+ pub(super) fn new(
+ row_group_idx: usize,
+ row_count: usize,
+ batch_size: usize,
+ parquet_metadata: &'a ParquetMetaData,
+ projection: &'a ProjectionMask,
+ ) -> Self {
+ Self {
+ row_group_idx,
+ row_count,
+ batch_size,
+ parquet_metadata,
+ projection,
+ selection: None,
+ cache_projection: None,
+ column_chunks: None,
+ }
+ }
+
+ /// Set an optional row selection to apply
+ pub(super) fn with_selection(mut self, selection: Option<&'a
RowSelection>) -> Self {
+ self.selection = selection;
+ self
+ }
+
+ /// set columns to cache, if any
+ pub(super) fn with_cache_projection(
+ mut self,
+ cache_projection: Option<&'a ProjectionMask>,
+ ) -> Self {
+ self.cache_projection = cache_projection;
+ self
+ }
+
+ /// Provide any previously read column chunks
+ pub(super) fn with_column_chunks(
+ mut self,
+ column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
+ ) -> Self {
+ self.column_chunks = column_chunks;
+ self
+ }
+
+ pub(crate) fn build(self) -> DataRequest {
+ let Self {
+ row_group_idx,
+ row_count,
+ batch_size,
+ parquet_metadata,
+ projection,
+ selection,
+ cache_projection,
+ column_chunks,
+ } = self;
+
+ let row_group_meta_data = parquet_metadata.row_group(row_group_idx);
+
+ // If no previously read column chunks are provided, create a new
location to hold them
+ let column_chunks =
+ column_chunks.unwrap_or_else(|| vec![None;
row_group_meta_data.columns().len()]);
+
+ // Create an InMemoryRowGroup to hold the column chunks, this is a
+ // temporary structure used to tell the ArrowReaders what pages are
+ // needed for decoding
+ let row_group = InMemoryRowGroup {
+ row_count,
+ column_chunks,
+ offset_index: get_offset_index(parquet_metadata, row_group_idx),
+ row_group_idx,
+ metadata: parquet_metadata,
+ };
+
+ let FetchRanges {
+ ranges,
+ page_start_offsets,
+ } = row_group.fetch_ranges(projection, selection, batch_size,
cache_projection);
+
+ DataRequest {
+ // Save any previously read column chunks
+ column_chunks: row_group.column_chunks,
+ ranges,
+ page_start_offsets,
+ }
+ }
+}
+
+fn get_offset_index(
+ parquet_metadata: &ParquetMetaData,
+ row_group_idx: usize,
+) -> Option<&[OffsetIndexMetaData]> {
+ parquet_metadata
+ .offset_index()
+ // filter out empty offset indexes (old versions specified
Some(vec![]) when not present)
+ .filter(|index| !index.is_empty())
+ .map(|x| x[row_group_idx].as_slice())
+}
diff --git a/parquet/src/arrow/push_decoder/reader_builder/filter.rs
b/parquet/src/arrow/push_decoder/reader_builder/filter.rs
new file mode 100644
index 0000000000..4a3c38e959
--- /dev/null
+++ b/parquet/src/arrow/push_decoder/reader_builder/filter.rs
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`FilterInfo`] state machine for evaluating row filters
+
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{CacheOptionsBuilder, RowGroupCache};
+use crate::arrow::arrow_reader::{ArrowPredicate, RowFilter};
+use std::num::NonZeroUsize;
+use std::sync::{Arc, Mutex};
+
+/// State machine for evaluating a sequence of predicates.
+///
+/// The `FilterInfo` owns the [`RowFilter`] being evaluated and tracks the
current
+/// predicate to evaluate.
+#[derive(Debug)]
+pub(super) struct FilterInfo {
+ /// The predicates to evaluate, in order
+ ///
+ /// RowFilter is owned by `FilterInfo` because its predicates may be mutated as part
+ /// of evaluation. Specifically, [`ArrowPredicate`] requires &mut self for
+ /// evaluation.
+ filter: RowFilter,
+ /// The next filter to be evaluated
+ next_predicate: NonZeroUsize,
+ /// Previously computed filter results
+ cache_info: CacheInfo,
+}
+
+/// Predicate cache
+///
+/// Note this is basically the same as CacheOptionsBuilder
+/// but it owns the ProjectionMask and RowGroupCache
+#[derive(Debug)]
+pub(super) struct CacheInfo {
+ /// The columns to cache in the predicate cache.
+ /// Normally these are the columns that filters may look at such that
+ /// if we have a filter like `(a + 10 > 5) AND (a + b = 0)` we cache `a`
to avoid re-reading it between evaluating `a + 10 > 5` and `a + b = 0`.
+ cache_projection: ProjectionMask,
+ row_group_cache: Arc<Mutex<RowGroupCache>>,
+}
+
+impl CacheInfo {
+ pub(super) fn new(
+ cache_projection: ProjectionMask,
+ row_group_cache: Arc<Mutex<RowGroupCache>>,
+ ) -> Self {
+ Self {
+ cache_projection,
+ row_group_cache,
+ }
+ }
+
+ pub(super) fn builder(&self) -> CacheOptionsBuilder<'_> {
+ CacheOptionsBuilder::new(&self.cache_projection, &self.row_group_cache)
+ }
+}
+
+pub(super) enum AdvanceResult {
+ /// Advanced to the next predicate
+ Continue(FilterInfo),
+ /// No more predicates: returns the row filter and cache info
+ Done(RowFilter, CacheInfo),
+}
+
+impl FilterInfo {
+ /// Create a new FilterInfo
+ pub(super) fn new(filter: RowFilter, cache_info: CacheInfo) -> Self {
+ Self {
+ filter,
+ next_predicate: NonZeroUsize::new(1).expect("1 is always
non-zero"),
+ cache_info,
+ }
+ }
+
+ /// Advance to the next predicate
+ ///
+ /// Returns
+ /// * [`AdvanceResult::Continue`] returning the `FilterInfo` if there are
+ /// more predicates to evaluate.
+ /// * [`AdvanceResult::Done`] with the inner [`RowFilter`] and
[`CacheInfo`]
+ /// if there are no more predicates
+ pub(super) fn advance(mut self) -> AdvanceResult {
+ if self.next_predicate.get() >= self.filter.predicates.len() {
+ AdvanceResult::Done(self.filter, self.cache_info)
+ } else {
+ self.next_predicate = self
+ .next_predicate
+ .checked_add(1)
+ .expect("no usize overflow");
+ AdvanceResult::Continue(self)
+ }
+ }
+
+ /// Return a mutable reference to the current predicate
+ pub(super) fn current_mut(&mut self) -> &mut dyn ArrowPredicate {
+ self.filter
+ .predicates
+ .get_mut(self.next_predicate.get() - 1)
+ // advance ensures next_predicate is always in bounds
+ .unwrap()
+ .as_mut()
+ }
+
+ /// Return the current predicate to evaluate
+ pub(super) fn current(&self) -> &dyn ArrowPredicate {
+ self.filter
+ .predicates
+ .get(self.next_predicate.get() - 1)
+ // advance ensures next_predicate is always in bounds
+ .unwrap()
+ .as_ref()
+ }
+
+ /// Return a reference to the cache projection
+ pub(super) fn cache_projection(&self) -> &ProjectionMask {
+ &self.cache_info.cache_projection
+ }
+
+ /// Return a cache builder to save the results of predicate evaluation
+ pub(super) fn cache_builder(&self) -> CacheOptionsBuilder<'_> {
+ self.cache_info.builder()
+ }
+
+ /// Returns the inner filter, consuming this FilterInfo
+ pub(super) fn into_filter(self) -> RowFilter {
+ self.filter
+ }
+}
diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs
b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
new file mode 100644
index 0000000000..be9070ae8b
--- /dev/null
+++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
@@ -0,0 +1,659 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod data;
+mod filter;
+
+use crate::DecodeResult;
+use crate::arrow::ProjectionMask;
+use crate::arrow::array_reader::{ArrayReaderBuilder, RowGroupCache};
+use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
+use crate::arrow::arrow_reader::{
+ ParquetRecordBatchReader, ReadPlanBuilder, RowFilter, RowSelection,
+};
+use crate::arrow::in_memory_row_group::ColumnChunkData;
+use crate::arrow::push_decoder::reader_builder::data::DataRequestBuilder;
+use crate::arrow::push_decoder::reader_builder::filter::CacheInfo;
+use crate::arrow::schema::ParquetField;
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use crate::util::push_buffers::PushBuffers;
+use bytes::Bytes;
+use data::DataRequest;
+use filter::AdvanceResult;
+use filter::FilterInfo;
+use std::ops::Range;
+use std::sync::{Arc, Mutex};
+
+/// The current row group being read and the read plan
+#[derive(Debug)]
+struct RowGroupInfo {
+ row_group_idx: usize,
+ row_count: usize,
+ plan_builder: ReadPlanBuilder,
+}
+
+/// This is the inner state machine for reading a single row group.
+#[derive(Debug)]
+enum RowGroupDecoderState {
+ Start {
+ row_group_info: RowGroupInfo,
+ },
+ /// Planning filters, but haven't yet requested data to evaluate them
+ Filters {
+ row_group_info: RowGroupInfo,
+ /// Any previously read column chunk data from prior filters
+ column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
+ filter_info: FilterInfo,
+ },
+ /// Needs data to evaluate current filter
+ WaitingOnFilterData {
+ row_group_info: RowGroupInfo,
+ filter_info: FilterInfo,
+ data_request: DataRequest,
+ },
+ /// Know what data to actually read, after all predicates
+ StartData {
+ row_group_info: RowGroupInfo,
+ /// Any previously read column chunk data from the filtering phase
+ column_chunks: Option<Vec<Option<Arc<ColumnChunkData>>>>,
+ /// Any cached filter results
+ cache_info: Option<CacheInfo>,
+ },
+ /// Needs data to proceed with reading the output
+ WaitingOnData {
+ row_group_info: RowGroupInfo,
+ data_request: DataRequest,
+ /// Any cached filter results
+ cache_info: Option<CacheInfo>,
+ },
+ /// Finished (or not yet started) reading this group
+ Finished,
+}
+
+/// Result of a state transition
+#[derive(Debug)]
+struct NextState {
+ next_state: RowGroupDecoderState,
+ /// result to return, if any
+ ///
+ /// * `Some`: the processing should stop and return the result
+ /// * `None`: processing should continue
+ result: Option<DecodeResult<ParquetRecordBatchReader>>,
+}
+
+impl NextState {
+ /// The next state with no result.
+ ///
+ /// This indicates processing should continue
+ fn again(next_state: RowGroupDecoderState) -> Self {
+ Self {
+ next_state,
+ result: None,
+ }
+ }
+
+ /// Create a NextState with a result that should be returned
+ fn result(
+ next_state: RowGroupDecoderState,
+ result: DecodeResult<ParquetRecordBatchReader>,
+ ) -> Self {
+ Self {
+ next_state,
+ result: Some(result),
+ }
+ }
+}
+
+/// Builder for [`ParquetRecordBatchReader`] for a single row group
+///
+/// This struct drives the main state machine for decoding each row group -- it
+/// determines what data is needed, and then assembles the
+/// `ParquetRecordBatchReader` when all data is available.
+#[derive(Debug)]
+pub(crate) struct RowGroupReaderBuilder {
+ /// The output batch size
+ batch_size: usize,
+
+ /// What columns to project (produce in each output batch)
+ projection: ProjectionMask,
+
+ /// The Parquet file metadata
+ metadata: Arc<ParquetMetaData>,
+
+ /// Top level parquet schema and arrow schema mapping
+ fields: Option<Arc<ParquetField>>,
+
+ /// Optional filter
+ filter: Option<RowFilter>,
+
+ /// Limit to apply to remaining row groups (decremented as rows are read)
+ limit: Option<usize>,
+
+ /// Offset to apply to remaining row groups (decremented as rows are read)
+ offset: Option<usize>,
+
+ /// The size in bytes of the predicate cache to use
+ ///
+ /// See [`RowGroupCache`] for details.
+ max_predicate_cache_size: usize,
+
+ /// The metrics collector
+ metrics: ArrowReaderMetrics,
+
+ /// Current state of the decoder.
+ ///
+ /// It is taken when processing, and must be put back before returning;
+ /// it is a bug if it is not put back after transitioning states.
+ state: Option<RowGroupDecoderState>,
+
+ /// The underlying data store
+ buffers: PushBuffers,
+}
+
+impl RowGroupReaderBuilder {
+ /// Create a new RowGroupReaderBuilder
+ #[expect(clippy::too_many_arguments)]
+ pub(crate) fn new(
+ batch_size: usize,
+ projection: ProjectionMask,
+ metadata: Arc<ParquetMetaData>,
+ fields: Option<Arc<ParquetField>>,
+ filter: Option<RowFilter>,
+ limit: Option<usize>,
+ offset: Option<usize>,
+ metrics: ArrowReaderMetrics,
+ max_predicate_cache_size: usize,
+ buffers: PushBuffers,
+ ) -> Self {
+ Self {
+ batch_size,
+ projection,
+ metadata,
+ fields,
+ filter,
+ limit,
+ offset,
+ metrics,
+ max_predicate_cache_size,
+ state: Some(RowGroupDecoderState::Finished),
+ buffers,
+ }
+ }
+
+ /// Push new data buffers that can be used to satisfy pending requests
+ pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
+ self.buffers.push_ranges(ranges, buffers);
+ }
+
+ /// Returns the total number of buffered bytes available
+ pub fn buffered_bytes(&self) -> u64 {
+ self.buffers.buffered_bytes()
+ }
+
+ /// take the current state, leaving None in its place.
+ ///
+ /// Returns an error if the state wasn't put back after the previous
+ /// call to [`Self::take_state`].
+ ///
+ /// Any code that calls this method must ensure that the state is put back
+ /// before returning, otherwise the reader will error next time it is
called
+ fn take_state(&mut self) -> Result<RowGroupDecoderState, ParquetError> {
+ self.state.take().ok_or_else(|| {
+ ParquetError::General(String::from(
+ "Internal Error: RowGroupReader in invalid state",
+ ))
+ })
+ }
+
+ /// Setup this reader to read the next row group
+ pub(crate) fn next_row_group(
+ &mut self,
+ row_group_idx: usize,
+ row_count: usize,
+ selection: Option<RowSelection>,
+ ) -> Result<(), ParquetError> {
+ let state = self.take_state()?;
+ if !matches!(state, RowGroupDecoderState::Finished) {
+ return Err(ParquetError::General(format!(
+ "Internal Error: next_row_group called while still reading a
row group. Expected Finished state, got {state:?}"
+ )));
+ }
+ let plan_builder =
ReadPlanBuilder::new(self.batch_size).with_selection(selection);
+
+ let row_group_info = RowGroupInfo {
+ row_group_idx,
+ row_count,
+ plan_builder,
+ };
+
+ self.state = Some(RowGroupDecoderState::Start { row_group_info });
+ Ok(())
+ }
+
+ /// Try to build the next `ParquetRecordBatchReader` from this
RowGroupReader.
+ ///
+ /// If more data is needed, returns [`DecodeResult::NeedsData`] with the
+ /// ranges of data that are needed to proceed.
+ ///
+ /// If a [`ParquetRecordBatchReader`] is ready, it is returned in
+ /// `DecodeResult::Data`.
+ pub(crate) fn try_build(
+ &mut self,
+ ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
+ loop {
+ let current_state = self.take_state()?;
+ // Try to transition the decoder.
+ match self.try_transition(current_state)? {
+ // Either produced a batch reader, needed input, or finished
+ NextState {
+ next_state,
+ result: Some(result),
+ } => {
+ // put back the next state
+ self.state = Some(next_state);
+ return Ok(result);
+ }
+ // completed one internal state, maybe can proceed further
+ NextState {
+ next_state,
+ result: None,
+ } => {
+ // continue processing
+ self.state = Some(next_state);
+ }
+ }
+ }
+ }
+
+ /// Current state --> next state + optional output
+ ///
+ /// This is the main state transition function for the row group reader
+ /// and encodes the row group decoding state machine.
+ ///
+ /// # Notes
+ ///
+ /// This structure is used to reduce the indentation level of the main loop
+ /// in try_build
+ fn try_transition(
+ &mut self,
+ current_state: RowGroupDecoderState,
+ ) -> Result<NextState, ParquetError> {
+ let result = match current_state {
+ RowGroupDecoderState::Start { row_group_info } => {
+ let column_chunks = None; // no prior column chunks
+
+ let Some(filter) = self.filter.take() else {
+ // no filter, start trying to read data immediately
+ return Ok(NextState::again(RowGroupDecoderState::StartData
{
+ row_group_info,
+ column_chunks,
+ cache_info: None,
+ }));
+ };
+ // no predicates in filter, so start reading immediately
+ if filter.predicates.is_empty() {
+ return Ok(NextState::again(RowGroupDecoderState::StartData
{
+ row_group_info,
+ column_chunks,
+ cache_info: None,
+ }));
+ };
+
+ // we have predicates to evaluate
+ let cache_projection =
+
self.compute_cache_projection(row_group_info.row_group_idx, &filter);
+
+ let cache_info = CacheInfo::new(
+ cache_projection,
+ Arc::new(Mutex::new(RowGroupCache::new(
+ self.batch_size,
+ self.max_predicate_cache_size,
+ ))),
+ );
+
+ let filter_info = FilterInfo::new(filter, cache_info);
+ NextState::again(RowGroupDecoderState::Filters {
+ row_group_info,
+ filter_info,
+ column_chunks,
+ })
+ }
+ // need to evaluate filters
+ RowGroupDecoderState::Filters {
+ row_group_info,
+ column_chunks,
+ filter_info,
+ } => {
+ let RowGroupInfo {
+ row_group_idx,
+ row_count,
+ plan_builder,
+ } = row_group_info;
+
+ // If nothing is selected, we are done with this row group
+ if !plan_builder.selects_any() {
+ // ruled out entire row group
+ self.filter = Some(filter_info.into_filter());
+ return Ok(NextState::result(
+ RowGroupDecoderState::Finished,
+ DecodeResult::Finished,
+ ));
+ }
+
+ // Make a request for the data needed to evaluate the current
predicate
+ let predicate = filter_info.current();
+
+ // need to fetch pages the column needs for decoding, figure
+ // that out based on the current selection and projection
+ let data_request = DataRequestBuilder::new(
+ row_group_idx,
+ row_count,
+ self.batch_size,
+ &self.metadata,
+ predicate.projection(), // use the predicate's projection
+ )
+ .with_selection(plan_builder.selection())
+ // Fetch predicate columns; expand selection only for cached
predicate columns
+ .with_cache_projection(Some(filter_info.cache_projection()))
+ .with_column_chunks(column_chunks)
+ .build();
+
+ let row_group_info = RowGroupInfo {
+ row_group_idx,
+ row_count,
+ plan_builder,
+ };
+
+ NextState::again(RowGroupDecoderState::WaitingOnFilterData {
+ row_group_info,
+ filter_info,
+ data_request,
+ })
+ }
+ RowGroupDecoderState::WaitingOnFilterData {
+ row_group_info,
+ data_request,
+ mut filter_info,
+ } => {
+ // figure out what ranges we still need
+ let needed_ranges = data_request.needed_ranges(&self.buffers);
+ if !needed_ranges.is_empty() {
+ // still need data
+ return Ok(NextState::result(
+ RowGroupDecoderState::WaitingOnFilterData {
+ row_group_info,
+ filter_info,
+ data_request,
+ },
+ DecodeResult::NeedsData(needed_ranges),
+ ));
+ }
+
+ // otherwise we have all the data we need to evaluate the
predicate
+ let RowGroupInfo {
+ row_group_idx,
+ row_count,
+ mut plan_builder,
+ } = row_group_info;
+
+ let predicate = filter_info.current();
+
+ let row_group = data_request.try_into_in_memory_row_group(
+ row_group_idx,
+ row_count,
+ &self.metadata,
+ predicate.projection(),
+ &mut self.buffers,
+ )?;
+
+ let cache_options = filter_info.cache_builder().producer();
+
+ let array_reader = ArrayReaderBuilder::new(&row_group,
&self.metrics)
+ .with_cache_options(Some(&cache_options))
+ .build_array_reader(self.fields.as_deref(),
predicate.projection())?;
+
+ plan_builder =
+ plan_builder.with_predicate(array_reader,
filter_info.current_mut())?;
+
+ let row_group_info = RowGroupInfo {
+ row_group_idx,
+ row_count,
+ plan_builder,
+ };
+
+ // Take back the column chunks that were read
+ let column_chunks = Some(row_group.column_chunks);
+
+ // advance to the next predicate, if any
+ match filter_info.advance() {
+ AdvanceResult::Continue(filter_info) => {
+ NextState::again(RowGroupDecoderState::Filters {
+ row_group_info,
+ column_chunks,
+ filter_info,
+ })
+ }
+ // done with predicates, proceed to reading data
+ AdvanceResult::Done(filter, cache_info) => {
+ // remember we need to put back the filter
+ assert!(self.filter.is_none());
+ self.filter = Some(filter);
+ NextState::again(RowGroupDecoderState::StartData {
+ row_group_info,
+ column_chunks,
+ cache_info: Some(cache_info),
+ })
+ }
+ }
+ }
+ RowGroupDecoderState::StartData {
+ row_group_info,
+ column_chunks,
+ cache_info,
+ } => {
+ let RowGroupInfo {
+ row_group_idx,
+ row_count,
+ plan_builder,
+ } = row_group_info;
+
+ // Compute the number of rows in the selection before applying
limit and offset
+ let rows_before =
plan_builder.num_rows_selected().unwrap_or(row_count);
+
+ if rows_before == 0 {
+ // ruled out entire row group
+ return Ok(NextState::result(
+ RowGroupDecoderState::Finished,
+ DecodeResult::Finished,
+ ));
+ }
+
+ // Apply any limit and offset
+ let plan_builder = plan_builder
+ .limited(row_count)
+ .with_offset(self.offset)
+ .with_limit(self.limit)
+ .build_limited();
+
+ let rows_after =
plan_builder.num_rows_selected().unwrap_or(row_count);
+
+ // Update running offset and limit for after the current row
group is read
+ if let Some(offset) = &mut self.offset {
+ // Reduction is either because of offset or limit, as
limit is applied
+ // after offset has been "exhausted" can just use
saturating sub here
+ *offset = offset.saturating_sub(rows_before - rows_after)
+ }
+
+ if rows_after == 0 {
+ // no rows left after applying limit/offset
+ return Ok(NextState::result(
+ RowGroupDecoderState::Finished,
+ DecodeResult::Finished,
+ ));
+ }
+
+ if let Some(limit) = &mut self.limit {
+ *limit -= rows_after;
+ }
+
+ let data_request = DataRequestBuilder::new(
+ row_group_idx,
+ row_count,
+ self.batch_size,
+ &self.metadata,
+ &self.projection,
+ )
+ .with_selection(plan_builder.selection())
+ .with_column_chunks(column_chunks)
+ // Final projection fetch shouldn't expand selection for cache
+ // so don't call with_cache_projection here
+ .build();
+
+ let row_group_info = RowGroupInfo {
+ row_group_idx,
+ row_count,
+ plan_builder,
+ };
+
+ NextState::again(RowGroupDecoderState::WaitingOnData {
+ row_group_info,
+ data_request,
+ cache_info,
+ })
+ }
+ // Waiting on data to proceed with reading the output
+ RowGroupDecoderState::WaitingOnData {
+ row_group_info,
+ data_request,
+ cache_info,
+ } => {
+ let needed_ranges = data_request.needed_ranges(&self.buffers);
+ if !needed_ranges.is_empty() {
+ // still need data
+ return Ok(NextState::result(
+ RowGroupDecoderState::WaitingOnData {
+ row_group_info,
+ data_request,
+ cache_info,
+ },
+ DecodeResult::NeedsData(needed_ranges),
+ ));
+ }
+
+ // otherwise we have all the data we need to proceed
+ let RowGroupInfo {
+ row_group_idx,
+ row_count,
+ plan_builder,
+ } = row_group_info;
+
+ let row_group = data_request.try_into_in_memory_row_group(
+ row_group_idx,
+ row_count,
+ &self.metadata,
+ &self.projection,
+ &mut self.buffers,
+ )?;
+
+ let plan = plan_builder.build();
+
+ // if we have any cached results, connect them up
+ let array_reader_builder = ArrayReaderBuilder::new(&row_group,
&self.metrics);
+ let array_reader = if let Some(cache_info) =
cache_info.as_ref() {
+ let cache_options = cache_info.builder().consumer();
+ array_reader_builder
+ .with_cache_options(Some(&cache_options))
+ .build_array_reader(self.fields.as_deref(),
&self.projection)
+ } else {
+ array_reader_builder
+ .build_array_reader(self.fields.as_deref(),
&self.projection)
+ }?;
+
+ let reader = ParquetRecordBatchReader::new(array_reader, plan);
+ NextState::result(RowGroupDecoderState::Finished,
DecodeResult::Data(reader))
+ }
+ RowGroupDecoderState::Finished => {
+ // nothing left to read
+ NextState::result(RowGroupDecoderState::Finished,
DecodeResult::Finished)
+ }
+ };
+ Ok(result)
+ }
+
+ /// Which columns should be cached?
+ ///
+ /// Returns the columns that are used by the filters *and* then used in the
+ /// final projection, excluding any nested columns.
+ ///
+ /// When no column qualifies (`compute_cache_projection_inner` returns
+ /// `None`), the result is `ProjectionMask::none` sized to the number of
+ /// column chunks in row group `row_group_idx`.
+ fn compute_cache_projection(&self, row_group_idx: usize, filter:
&RowFilter) -> ProjectionMask {
+ let meta = self.metadata.row_group(row_group_idx);
+ match self.compute_cache_projection_inner(filter) {
+ Some(projection) => projection,
+ None => ProjectionMask::none(meta.columns().len()),
+ }
+ }
+
+ /// Builds the cache projection: the union of the projections of all
+ /// predicates in `filter`, intersected with `self.projection`, with
+ /// nested columns removed.
+ ///
+ /// Returns `None` if `filter` has no predicates or if no leaf column
+ /// remains after excluding nested columns.
+ fn compute_cache_projection_inner(&self, filter: &RowFilter) ->
Option<ProjectionMask> {
+ // Seed with the first predicate's projection (`?` returns `None` when
+ // there are no predicates)
+ let mut cache_projection =
filter.predicates.first()?.projection().clone();
+ // Note: the first predicate is visited again here
+ for predicate in filter.predicates.iter() {
+ cache_projection.union(predicate.projection());
+ }
+ // Only columns that are also part of `self.projection` are kept
+ cache_projection.intersect(&self.projection);
+ self.exclude_nested_columns_from_cache(&cache_projection)
+ }
+
+ /// Exclude leaves belonging to roots that span multiple parquet leaves
+ /// (i.e. nested columns)
+ ///
+ /// Returns a mask containing only the leaves of `mask` whose root column
+ /// has exactly one leaf, or `None` if no such leaf is included in `mask`.
+ fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) ->
Option<ProjectionMask> {
+ let schema = self.metadata.file_metadata().schema_descr();
+ let num_leaves = schema.num_columns();
+
+ // Count how many leaves each root column has
+ let num_roots = schema.root_schema().get_fields().len();
+ let mut root_leaf_counts = vec![0usize; num_roots];
+ for leaf_idx in 0..num_leaves {
+ let root_idx = schema.get_column_root_idx(leaf_idx);
+ root_leaf_counts[root_idx] += 1;
+ }
+
+ // Keep only leaves whose root has exactly one leaf (non-nested)
+ // NOTE(review): a root with a single leaf may still be a nested type
+ // (e.g. a list of primitives); such leaves pass this check -- confirm
+ // that this is intended
+ let mut included_leaves = Vec::new();
+ for leaf_idx in 0..num_leaves {
+ if mask.leaf_included(leaf_idx) {
+ let root_idx = schema.get_column_root_idx(leaf_idx);
+ if root_leaf_counts[root_idx] == 1 {
+ included_leaves.push(leaf_idx);
+ }
+ }
+ }
+
+ // Signal "nothing to cache" with `None` rather than an empty mask
+ if included_leaves.is_empty() {
+ None
+ } else {
+ Some(ProjectionMask::leaves(schema, included_leaves))
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ #[test]
+ // Pin the size (in bytes) of RowGroupDecoderState so that it does not grow
+ // unnoticed. The check is an exact match, so shrinking the enum also
+ // requires updating the expected value below.
+ fn test_structure_size() {
+ assert_eq!(std::mem::size_of::<RowGroupDecoderState>(), 184);
+ }
+}
diff --git a/parquet/src/arrow/push_decoder/remaining.rs
b/parquet/src/arrow/push_decoder/remaining.rs
new file mode 100644
index 0000000000..4613fda087
--- /dev/null
+++ b/parquet/src/arrow/push_decoder/remaining.rs
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::DecodeResult;
+use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
+use crate::arrow::push_decoder::reader_builder::RowGroupReaderBuilder;
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaData;
+use bytes::Bytes;
+use std::collections::VecDeque;
+use std::ops::Range;
+use std::sync::Arc;
+
+/// State machine that tracks which high level chunks (row groups) of
+/// Parquet data are left to read.
+///
+/// A chunk is currently a row group, but the author aspires to extend the
+/// pattern to data boundaries other than RowGroups in the future.
+#[derive(Debug)]
+pub(crate) struct RemainingRowGroups {
+ /// The underlying Parquet metadata
+ parquet_metadata: Arc<ParquetMetaData>,
+
+ /// Indexes of the row groups that have not yet been read (the front of
+ /// the queue is read next)
+ row_groups: VecDeque<usize>,
+
+ /// Remaining selection to apply to the next row groups
+ selection: Option<RowSelection>,
+
+ /// State for building the reader for the current row group
+ row_group_reader_builder: RowGroupReaderBuilder,
+}
+
+impl RemainingRowGroups {
+ /// Create a new `RemainingRowGroups`
+ ///
+ /// * `row_groups`: indexes of the row groups to read, in the order they
+ ///   will be read
+ /// * `selection`: optional selection; a `row_count` sized piece is split
+ ///   off for each row group when that row group is started
+ /// * `row_group_reader_builder`: builds the reader for each row group
+ pub fn new(
+ parquet_metadata: Arc<ParquetMetaData>,
+ row_groups: Vec<usize>,
+ selection: Option<RowSelection>,
+ row_group_reader_builder: RowGroupReaderBuilder,
+ ) -> Self {
+ Self {
+ parquet_metadata,
+ row_groups: VecDeque::from(row_groups),
+ selection,
+ row_group_reader_builder,
+ }
+ }
+
+ /// Push new data buffers that can be used to satisfy pending requests
+ pub fn push_data(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
+ self.row_group_reader_builder.push_data(ranges, buffers);
+ }
+
+ /// Return the total number of bytes buffered so far
+ pub fn buffered_bytes(&self) -> u64 {
+ self.row_group_reader_builder.buffered_bytes()
+ }
+
+ /// returns [`ParquetRecordBatchReader`] suitable for reading the next
+ /// group of rows from the Parquet data, or the list of data ranges still
+ /// needed to proceed
+ ///
+ /// Returns [`DecodeResult::Finished`] once no row groups are left.
+ ///
+ /// # Errors
+ /// Returns an error if building the reader fails, if a row group's row
+ /// count does not fit in `usize`, or if starting the next row group fails.
+ pub fn try_next_reader(
+ &mut self,
+ ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
+ loop {
+ // Are we ready yet to start reading?
+ let result: DecodeResult<ParquetRecordBatchReader> =
+ self.row_group_reader_builder.try_build()?;
+ match result {
+ DecodeResult::Finished => {
+ // The builder has nothing (more) to produce for the current
+ // row group, e.g. because the row group was completely
+ // filtered out: fall through to start the next row group
+ }
+ DecodeResult::NeedsData(ranges) => {
+ // need more data to proceed
+ return Ok(DecodeResult::NeedsData(ranges));
+ }
+ DecodeResult::Data(batch_reader) => {
+ // ready to read the row group
+ return Ok(DecodeResult::Data(batch_reader));
+ }
+ }
+
+ // No current reader, proceed to the next row group if any
+ let row_group_idx = match self.row_groups.pop_front() {
+ None => return Ok(DecodeResult::Finished),
+ Some(idx) => idx,
+ };
+
+ // The row count is converted to `usize`; a value that does not fit
+ // is reported as an error rather than truncated
+ let row_count: usize = self
+ .parquet_metadata
+ .row_group(row_group_idx)
+ .num_rows()
+ .try_into()
+ .map_err(|e| ParquetError::General(format!("Row count
overflow: {e}")))?;
+
+ // Split off this row group's piece of the selection (if any);
+ // `self.selection` keeps what is left for the following row groups
+ let selection = self.selection.as_mut().map(|s|
s.split_off(row_count));
+ self.row_group_reader_builder
+ .next_row_group(row_group_idx, row_count, selection)?;
+ // the next iteration will try to build the reader for the new
+ // row group
+ }
+ }
+}
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 7022bd61c4..1c8f3e9c69 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -42,7 +42,6 @@
//! * [`ParquetMetaDataPushDecoder`] for decoding from bytes without I/O
//! * [`ParquetMetaDataWriter`] for writing.
//!
-//!
//! # Examples
//!
//! Please see [`external_metadata.rs`]
diff --git a/parquet/src/lib.rs b/parquet/src/lib.rs
index f8457fac23..98106a2c10 100644
--- a/parquet/src/lib.rs
+++ b/parquet/src/lib.rs
@@ -55,25 +55,28 @@
//! ## Reading and Writing Arrow (`arrow` feature)
//!
//! The [`arrow`] module supports reading and writing Parquet data to/from
-//! Arrow `RecordBatch`es. Using Arrow is simple and performant, and allows
workloads
+//! Arrow [`RecordBatch`]es. Using Arrow is simple and performant, and allows
workloads
//! to leverage the wide range of data transforms provided by the [arrow]
crate, and by the
//! ecosystem of [Arrow] compatible systems.
//!
//! Most users will use [`ArrowWriter`] for writing and
[`ParquetRecordBatchReaderBuilder`] for
-//! reading.
+//! reading from synchronous IO sources such as files or in-memory buffers.
//!
-//! Lower level APIs include [`ArrowColumnWriter`] for writing using multiple
-//! threads, and [`RowFilter`] to apply filters during decode.
+//! Lower level APIs include:
+//! * [`ParquetPushDecoder`] for fine grained control over interleaving of IO
and CPU,
+//! * [`ArrowColumnWriter`] for writing using multiple threads,
+//! * [`RowFilter`] to apply filters during decode.
//!
//! [`ArrowWriter`]: arrow::arrow_writer::ArrowWriter
//! [`ParquetRecordBatchReaderBuilder`]:
arrow::arrow_reader::ParquetRecordBatchReaderBuilder
+//! [`ParquetPushDecoder`]: arrow::push_decoder::ParquetPushDecoder
//! [`ArrowColumnWriter`]: arrow::arrow_writer::ArrowColumnWriter
//! [`RowFilter`]: arrow::arrow_reader::RowFilter
//!
-//! ## `async` Reading and Writing Arrow (`async` feature)
+//! ## `async` Reading and Writing Arrow (`arrow` feature + `async` feature)
//!
//! The [`async_reader`] and [`async_writer`] modules provide async APIs to
-//! read and write `RecordBatch`es asynchronously.
+//! read and write [`RecordBatch`]es asynchronously.
//!
//! Most users will use [`AsyncArrowWriter`] for writing and
[`ParquetRecordBatchStreamBuilder`]
//! for reading. When the `object_store` feature is enabled,
[`ParquetObjectReader`]
@@ -104,6 +107,7 @@
//!
//! [arrow]: https://docs.rs/arrow/latest/arrow/index.html
//! [Arrow]: https://arrow.apache.org/
+//! [`RecordBatch`]:
https://docs.rs/arrow/latest/arrow/array/struct.RecordBatch.html
//! [CSV]: https://en.wikipedia.org/wiki/Comma-separated_values
//! [Dremel]: https://research.google/pubs/pub36632/
//! [Logical Types]:
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
diff --git a/parquet/src/util/push_buffers.rs b/parquet/src/util/push_buffers.rs
index b30f91a81b..0475d48768 100644
--- a/parquet/src/util/push_buffers.rs
+++ b/parquet/src/util/push_buffers.rs
@@ -129,6 +129,31 @@ impl PushBuffers {
self.offset = offset;
self
}
+
+ /// Return the total number of bytes covered by all buffered ranges
+ /// (the sum of `end - start` over every range)
+ #[cfg(feature = "arrow")]
+ pub fn buffered_bytes(&self) -> u64 {
+ self.ranges.iter().map(|r| r.end - r.start).sum()
+ }
+
+ /// Clear any range and corresponding buffer that is exactly in the
+ /// ranges_to_clear
+ ///
+ /// A buffered range is removed only if both its start and end equal those
+ /// of an entry in `ranges_to_clear`; ranges that merely overlap an entry
+ /// are kept.
+ #[cfg(feature = "arrow")]
+ pub fn clear_ranges(&mut self, ranges_to_clear: &[Range<u64>]) {
+ // Rebuild the range / buffer lists from the entries that are kept
+ let mut new_ranges = Vec::new();
+ let mut new_buffers = Vec::new();
+
+ for (range, buffer) in self.iter() {
+ // keep the pair unless it exactly matches a range to clear
+ if !ranges_to_clear
+ .iter()
+ .any(|r| r.start == range.start && r.end == range.end)
+ {
+ new_ranges.push(range.clone());
+ new_buffers.push(buffer.clone());
+ }
+ }
+ self.ranges = new_ranges;
+ self.buffers = new_buffers;
+ }
}
impl Length for PushBuffers {