This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new c2424bb84e Rewrite `ParquetRecordBatchStream` in terms of the
PushDecoder (#8159)
c2424bb84e is described below
commit c2424bb84ed428cddb105eaecd24c971cdf131a9
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Nov 10 07:03:44 2025 -0500
Rewrite `ParquetRecordBatchStream` in terms of the PushDecoder (#8159)
# Which issue does this PR close?
- Part of https://github.com/apache/arrow-rs/issues/7983
- Part of https://github.com/apache/arrow-rs/issues/8000
- closes https://github.com/apache/arrow-rs/issues/8677
I am also working on a blog post about this:
- https://github.com/apache/arrow-rs/issues/8035
# TODOs
- [x] Rewrite `test_cache_projection_excludes_nested_columns` in terms
of higher level APIs (https://github.com/apache/arrow-rs/pull/8754)
- [x] Benchmarks
- [x] Benchmarks with DataFusion:
https://github.com/apache/datafusion/pull/18385
# Rationale for this change
A new ParquetPushDecoder was implemented here
- https://github.com/apache/arrow-rs/pull/7997
I need to refactor the async and sync readers to use the new push
decoder in order to:
1. Avoid the [xkcd standards effect](https://xkcd.com/927/) (there are
currently three control loops)
2. Prove that the push decoder works (by passing all the tests of the
other two)
3. Set the stage for further filter pushdown improvements with a single
control loop
<img width="400" alt="image"
src="https://github.com/user-attachments/assets/e6886ee9-58b3-4a1e-8e88-9d2d03132b19"
/>
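As an illustration of that single control loop, here is a minimal sketch of
how a caller drives the push decoder directly. This is a hedged example, not
code from this PR: the `drive_decoder` helper and its `fetch` closure are
stand-ins for whatever IO the caller performs, while the `DecodeResult`
handling mirrors the doc examples added in the diff below.

```rust
use bytes::Bytes;
use parquet::DecodeResult;
use parquet::arrow::push_decoder::ParquetPushDecoder;
use parquet::errors::Result;
use std::ops::Range;

/// Minimal sketch: drive a configured `ParquetPushDecoder` to completion.
/// `fetch` is a caller-supplied stand-in for the actual IO.
fn drive_decoder(
    mut decoder: ParquetPushDecoder,
    mut fetch: impl FnMut(&[Range<u64>]) -> Result<Vec<Bytes>>,
) -> Result<()> {
    loop {
        match decoder.try_decode()? {
            // The decoder reports which byte ranges it needs next
            DecodeResult::NeedsData(ranges) => {
                let data = fetch(&ranges)?;
                decoder.push_ranges(ranges, data)?;
            }
            // A RecordBatch was decoded
            DecodeResult::Data(batch) => {
                println!("decoded {} rows", batch.num_rows());
            }
            // All selected row groups have been decoded
            DecodeResult::Finished => return Ok(()),
        }
    }
}
```

The rewritten `ParquetRecordBatchStream` wraps essentially this loop with
async IO.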
# What changes are included in this PR?
1. Refactor the `ParquetRecordBatchStream` to use `ParquetPushDecoder`
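The core of the change is that the old `ReaderFactory`/`StreamState`
machinery is replaced by a small `RequestState` IO state machine wrapped
around the push decoder. Below is a simplified, hedged sketch of the new
control flow (the hypothetical `next_reader` helper ignores the detail that
the real `poll`-based code must move the input into the request future, which
is what `RequestState` handles):

```rust
use bytes::Bytes;
use parquet::DecodeResult;
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::push_decoder::ParquetPushDecoder;
use parquet::errors::Result;

/// Simplified sketch of `ParquetRecordBatchStream::next_row_group`:
/// IO goes through the `AsyncFileReader`, while all decoding decisions
/// go through the push decoder.
async fn next_reader<T: AsyncFileReader>(
    input: &mut T,
    decoder: &mut ParquetPushDecoder,
) -> Result<Option<ParquetRecordBatchReader>> {
    loop {
        match decoder.try_next_reader()? {
            // The decoder reports which byte ranges it needs next
            DecodeResult::NeedsData(ranges) => {
                let data: Vec<Bytes> = input.get_byte_ranges(ranges.clone()).await?;
                decoder.push_ranges(ranges, data)?;
            }
            // A reader over the next row group is ready
            DecodeResult::Data(reader) => return Ok(Some(reader)),
            // No more row groups
            DecodeResult::Finished => return Ok(None),
        }
    }
}
```

This keeps all decoding decisions in one place (the push decoder) while the
stream itself only performs IO.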
# Are these changes tested?
Yes, by the existing CI tests
I also ran several benchmarks, both in arrow-rs and in DataFusion, and I
do not see any substantial performance difference (as expected):
- https://github.com/apache/datafusion/pull/18385
# Are there any user-facing changes?
No
---------
Co-authored-by: Vukasin Stefanovic <[email protected]>
---
parquet/src/arrow/arrow_reader/mod.rs | 7 +
parquet/src/arrow/async_reader/mod.rs | 714 ++++++---------------
parquet/src/arrow/push_decoder/mod.rs | 358 +++++++----
.../src/arrow/push_decoder/reader_builder/mod.rs | 4 +
parquet/src/util/push_buffers.rs | 7 -
parquet/tests/encryption/encryption_agnostic.rs | 4 +-
6 files changed, 428 insertions(+), 666 deletions(-)
diff --git a/parquet/src/arrow/arrow_reader/mod.rs
b/parquet/src/arrow/arrow_reader/mod.rs
index e718486895..4e12c55c9f 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -99,6 +99,13 @@ pub mod statistics;
/// [`StatisticsConverter`]: statistics::StatisticsConverter
/// [Querying Parquet with Millisecond Latency]:
https://arrow.apache.org/blog/2022/12/26/querying-parquet-with-millisecond-latency/
pub struct ArrowReaderBuilder<T> {
+ /// The "input" to read parquet data from.
+ ///
+ /// Note in the case of the [`ParquetPushDecoderBuilder`], there
+ /// is no underlying input, which is indicated by a type parameter of
[`NoInput`]
+ ///
+ /// [`ParquetPushDecoderBuilder`]:
crate::arrow::push_decoder::ParquetPushDecoderBuilder
+ /// [`NoInput`]: crate::arrow::push_decoder::NoInput
pub(crate) input: T,
pub(crate) metadata: Arc<ParquetMetaData>,
diff --git a/parquet/src/arrow/async_reader/mod.rs
b/parquet/src/arrow/async_reader/mod.rs
index c5badea7f3..44c5465202 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -21,26 +21,23 @@
//!
//! See example on [`ParquetRecordBatchStreamBuilder::new`]
-use std::collections::VecDeque;
use std::fmt::Formatter;
use std::io::SeekFrom;
use std::ops::Range;
use std::pin::Pin;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
use std::task::{Context, Poll};
use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
-use futures::ready;
use futures::stream::Stream;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
use arrow_array::RecordBatch;
-use arrow_schema::{DataType, Fields, Schema, SchemaRef};
+use arrow_schema::{Schema, SchemaRef};
use crate::arrow::arrow_reader::{
ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions,
ParquetRecordBatchReader,
- RowFilter, RowSelection,
};
use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression,
BloomFilterHash};
@@ -56,12 +53,8 @@ pub use metadata::*;
#[cfg(feature = "object_store")]
mod store;
-use crate::arrow::ProjectionMask;
-use crate::arrow::array_reader::{ArrayReaderBuilder, CacheOptionsBuilder,
RowGroupCache};
-use crate::arrow::arrow_reader::ReadPlanBuilder;
-use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
-use crate::arrow::in_memory_row_group::{FetchRanges, InMemoryRowGroup};
-use crate::arrow::schema::ParquetField;
+use crate::DecodeResult;
+use crate::arrow::push_decoder::{NoInput, ParquetPushDecoder,
ParquetPushDecoderBuilder};
#[cfg(feature = "object_store")]
pub use store::*;
@@ -201,10 +194,10 @@ impl ArrowReaderMetadata {
}
#[doc(hidden)]
-// A newtype used within `ReaderOptionsBuilder` to distinguish sync readers
from async
-//
-// Allows sharing the same builder for both the sync and async versions,
whilst also not
-// breaking the pre-existing ParquetRecordBatchStreamBuilder API
+/// Newtype (wrapper) used within [`ArrowReaderBuilder`] to distinguish sync
readers from async
+///
+/// Allows sharing the same builder for different readers while keeping the
same
+/// ParquetRecordBatchStreamBuilder API
pub struct AsyncReader<T>(T);
/// A builder for reading parquet files from an `async` source as
[`ParquetRecordBatchStream`]
@@ -483,300 +476,112 @@ impl<T: AsyncFileReader + Send + 'static>
ParquetRecordBatchStreamBuilder<T> {
///
/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
- let num_row_groups = self.metadata.row_groups().len();
-
- let row_groups = match self.row_groups {
- Some(row_groups) => {
- if let Some(col) = row_groups.iter().find(|x| **x >=
num_row_groups) {
- return Err(general_err!(
- "row group {} out of bounds 0..{}",
- col,
- num_row_groups
- ));
- }
- row_groups.into()
- }
- None => (0..self.metadata.row_groups().len()).collect(),
- };
-
- // Try to avoid allocate large buffer
- let batch_size = self
- .batch_size
- .min(self.metadata.file_metadata().num_rows() as usize);
- let reader_factory = ReaderFactory {
- input: self.input.0,
- filter: self.filter,
- metadata: self.metadata.clone(),
- fields: self.fields,
- limit: self.limit,
- offset: self.offset,
- metrics: self.metrics,
- max_predicate_cache_size: self.max_predicate_cache_size,
- };
+ let Self {
+ input,
+ metadata,
+ schema,
+ fields,
+ batch_size,
+ row_groups,
+ projection,
+ filter,
+ selection,
+ limit,
+ offset,
+ metrics,
+ max_predicate_cache_size,
+ } = self;
// Ensure schema of ParquetRecordBatchStream respects projection, and
does
// not store metadata (same as for ParquetRecordBatchReader and
emitted RecordBatches)
- let projected_fields = match reader_factory.fields.as_deref().map(|pf|
&pf.arrow_type) {
- Some(DataType::Struct(fields)) => {
- fields.filter_leaves(|idx, _|
self.projection.leaf_included(idx))
- }
- None => Fields::empty(),
- _ => unreachable!("Must be Struct for root type"),
- };
- let schema = Arc::new(Schema::new(projected_fields));
+ let projected_fields = schema
+ .fields
+ .filter_leaves(|idx, _| projection.leaf_included(idx));
+ let projected_schema = Arc::new(Schema::new(projected_fields));
- Ok(ParquetRecordBatchStream {
- metadata: self.metadata,
+ let decoder = ParquetPushDecoderBuilder {
+ input: NoInput,
+ metadata,
+ schema,
+ fields,
+ projection,
+ filter,
+ selection,
batch_size,
row_groups,
- projection: self.projection,
- selection: self.selection,
- schema,
- reader_factory: Some(reader_factory),
- state: StreamState::Init,
+ limit,
+ offset,
+ metrics,
+ max_predicate_cache_size,
+ }
+ .build()?;
+
+ let request_state = RequestState::None { input: input.0 };
+
+ Ok(ParquetRecordBatchStream {
+ schema: projected_schema,
+ decoder,
+ request_state,
})
}
}
-/// Returns a [`ReaderFactory`] and an optional [`ParquetRecordBatchReader`]
for the next row group
+/// State machine that tracks outstanding requests to fetch data
///
-/// Note: If all rows are filtered out in the row group (e.g by filters, limit
or
-/// offset), returns `None` for the reader.
-type ReadResult<T> = Result<(ReaderFactory<T>,
Option<ParquetRecordBatchReader>)>;
-
-/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
-/// [`ParquetRecordBatchReader`]
-struct ReaderFactory<T> {
- metadata: Arc<ParquetMetaData>,
-
- /// Top level parquet schema
- fields: Option<Arc<ParquetField>>,
-
- input: T,
-
- /// Optional filter
- filter: Option<RowFilter>,
-
- /// Limit to apply to remaining row groups.
- limit: Option<usize>,
-
- /// Offset to apply to the next
- offset: Option<usize>,
-
- /// Metrics
- metrics: ArrowReaderMetrics,
-
- /// Maximum size of the predicate cache
- ///
- /// See [`RowGroupCache`] for details.
- max_predicate_cache_size: usize,
+/// The parameter `T` is the input, typically an `AsyncFileReader`
+enum RequestState<T> {
+ /// No outstanding requests
+ None {
+ input: T,
+ },
+ /// There is an outstanding request for data
+ Outstanding {
+ /// Ranges that have been requested
+ ranges: Vec<Range<u64>>,
+        /// Future that will resolve to `(input, fetched data)`
+ ///
+ /// Note the future owns the reader while the request is outstanding
+ /// and returns it upon completion
+ future: BoxFuture<'static, Result<(T, Vec<Bytes>)>>,
+ },
+ Done,
}
-impl<T> ReaderFactory<T>
+impl<T> RequestState<T>
where
- T: AsyncFileReader + Send,
+ T: AsyncFileReader + Unpin + Send + 'static,
{
- /// Reads the next row group with the provided `selection`, `projection`
and `batch_size`
- ///
- /// Updates the `limit` and `offset` of the reader factory
- ///
- /// Note: this captures self so that the resulting future has a static
lifetime
- async fn read_row_group(
- mut self,
- row_group_idx: usize,
- selection: Option<RowSelection>,
- projection: ProjectionMask,
- batch_size: usize,
- ) -> ReadResult<T> {
- // TODO: calling build_array multiple times is wasteful
-
- let meta = self.metadata.row_group(row_group_idx);
- let offset_index = self
- .metadata
- .offset_index()
- // filter out empty offset indexes (old versions specified
Some(vec![]) when no present)
- .filter(|index| !index.is_empty())
- .map(|x| x[row_group_idx].as_slice());
-
- // Reuse columns that are selected and used by the filters
- let cache_projection = match
self.compute_cache_projection(&projection) {
- Some(projection) => projection,
- None => ProjectionMask::none(meta.columns().len()),
- };
- let row_group_cache = Arc::new(Mutex::new(RowGroupCache::new(
- batch_size,
- self.max_predicate_cache_size,
- )));
-
- let mut row_group = InMemoryRowGroup {
- // schema: meta.schema_descr_ptr(),
- row_count: meta.num_rows() as usize,
- column_chunks: vec![None; meta.columns().len()],
- offset_index,
- row_group_idx,
- metadata: self.metadata.as_ref(),
- };
-
- let cache_options_builder =
CacheOptionsBuilder::new(&cache_projection, &row_group_cache);
-
- let filter = self.filter.as_mut();
- let mut plan_builder =
ReadPlanBuilder::new(batch_size).with_selection(selection);
-
- // Update selection based on any filters
- if let Some(filter) = filter {
- let cache_options = cache_options_builder.clone().producer();
-
- for predicate in filter.predicates.iter_mut() {
- if !plan_builder.selects_any() {
- return Ok((self, None)); // ruled out entire row group
- }
-
- // (pre) Fetch only the columns that are selected by the
predicate
- let selection = plan_builder.selection();
- // Fetch predicate columns; expand selection only for cached
predicate columns
- let cache_mask = Some(&cache_projection);
- row_group
- .fetch(
- &mut self.input,
- predicate.projection(),
- selection,
- batch_size,
- cache_mask,
- )
- .await?;
-
- let array_reader = ArrayReaderBuilder::new(&row_group,
&self.metrics)
- .with_cache_options(Some(&cache_options))
- .build_array_reader(self.fields.as_deref(),
predicate.projection())?;
-
- plan_builder = plan_builder.with_predicate(array_reader,
predicate.as_mut())?;
- }
- }
-
- // Compute the number of rows in the selection before applying limit
and offset
- let rows_before = plan_builder
- .num_rows_selected()
- .unwrap_or(row_group.row_count);
-
- if rows_before == 0 {
- return Ok((self, None)); // ruled out entire row group
- }
-
- // Apply any limit and offset
- let plan_builder = plan_builder
- .limited(row_group.row_count)
- .with_offset(self.offset)
- .with_limit(self.limit)
- .build_limited();
-
- let rows_after = plan_builder
- .num_rows_selected()
- .unwrap_or(row_group.row_count);
-
- // Update running offset and limit for after the current row group is
read
- if let Some(offset) = &mut self.offset {
- // Reduction is either because of offset or limit, as limit is
applied
- // after offset has been "exhausted" can just use saturating sub
here
- *offset = offset.saturating_sub(rows_before - rows_after)
- }
-
- if rows_after == 0 {
- return Ok((self, None)); // ruled out entire row group
- }
-
- if let Some(limit) = &mut self.limit {
- *limit -= rows_after;
- }
- // fetch the pages needed for decoding
- row_group
- // Final projection fetch shouldn't expand selection for cache;
pass None
- .fetch(
- &mut self.input,
- &projection,
- plan_builder.selection(),
- batch_size,
- None,
- )
- .await?;
-
- let plan = plan_builder.build();
-
- let cache_options = cache_options_builder.consumer();
- let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
- .with_cache_options(Some(&cache_options))
- .build_array_reader(self.fields.as_deref(), &projection)?;
-
- let reader = ParquetRecordBatchReader::new(array_reader, plan);
-
- Ok((self, Some(reader)))
- }
-
- /// Compute which columns are used in filters and the final (output)
projection
- fn compute_cache_projection(&self, projection: &ProjectionMask) ->
Option<ProjectionMask> {
- // Do not compute the projection mask if the predicate cache is
disabled
- if self.max_predicate_cache_size == 0 {
- return None;
- }
-
- let filters = self.filter.as_ref()?;
- let mut cache_projection =
filters.predicates.first()?.projection().clone();
- for predicate in filters.predicates.iter() {
- cache_projection.union(predicate.projection());
- }
- cache_projection.intersect(projection);
- self.exclude_nested_columns_from_cache(&cache_projection)
- }
-
- /// Exclude leaves belonging to roots that span multiple parquet leaves
(i.e. nested columns)
- fn exclude_nested_columns_from_cache(&self, mask: &ProjectionMask) ->
Option<ProjectionMask> {
- let schema = self.metadata.file_metadata().schema_descr();
- let num_leaves = schema.num_columns();
-
- // Count how many leaves each root column has
- let num_roots = schema.root_schema().get_fields().len();
- let mut root_leaf_counts = vec![0usize; num_roots];
- for leaf_idx in 0..num_leaves {
- let root_idx = schema.get_column_root_idx(leaf_idx);
- root_leaf_counts[root_idx] += 1;
- }
-
- // Keep only leaves whose root has exactly one leaf (non-nested)
- let mut included_leaves = Vec::new();
- for leaf_idx in 0..num_leaves {
- if mask.leaf_included(leaf_idx) {
- let root_idx = schema.get_column_root_idx(leaf_idx);
- if root_leaf_counts[root_idx] == 1 {
- included_leaves.push(leaf_idx);
- }
- }
- }
-
- if included_leaves.is_empty() {
- None
- } else {
- Some(ProjectionMask::leaves(schema, included_leaves))
+ /// Issue a request to fetch `ranges`, returning the Outstanding state
+ fn begin_request(mut input: T, ranges: Vec<Range<u64>>) -> Self {
+ let ranges_captured = ranges.clone();
+
+ // Note this must move the input *into* the future
+ // because the get_byte_ranges future has a lifetime
+ // (aka can have references internally) and thus must
+ // own the input while the request is outstanding.
+ let future = async move {
+ let data = input.get_byte_ranges(ranges_captured).await?;
+ Ok((input, data))
}
+ .boxed();
+ RequestState::Outstanding { ranges, future }
}
}
-enum StreamState<T> {
- /// At the start of a new row group, or the end of the parquet stream
- Init,
- /// Decoding a batch
- Decoding(ParquetRecordBatchReader),
- /// Reading data from input
- Reading(BoxFuture<'static, ReadResult<T>>),
- /// Error
- Error,
-}
-
-impl<T> std::fmt::Debug for StreamState<T> {
+impl<T> std::fmt::Debug for RequestState<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
- StreamState::Init => write!(f, "StreamState::Init"),
- StreamState::Decoding(_) => write!(f, "StreamState::Decoding"),
- StreamState::Reading(_) => write!(f, "StreamState::Reading"),
- StreamState::Error => write!(f, "StreamState::Error"),
+ RequestState::None { input: _ } => f
+ .debug_struct("RequestState::None")
+ .field("input", &"...")
+ .finish(),
+ RequestState::Outstanding { ranges, .. } => f
+ .debug_struct("RequestState::Outstanding")
+ .field("ranges", &ranges)
+ .finish(),
+ RequestState::Done => {
+ write!(f, "RequestState::Done")
+ }
}
}
}
@@ -796,35 +601,23 @@ impl<T> std::fmt::Debug for StreamState<T> {
/// required, which is especially important for object stores, where IO
operations
/// have latencies in the hundreds of milliseconds
///
+/// See [`ParquetPushDecoderBuilder`] for an API with lower level control over
+/// buffering.
///
/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
pub struct ParquetRecordBatchStream<T> {
- metadata: Arc<ParquetMetaData>,
-
+ /// Output schema of the stream
schema: SchemaRef,
-
- row_groups: VecDeque<usize>,
-
- projection: ProjectionMask,
-
- batch_size: usize,
-
- selection: Option<RowSelection>,
-
- /// This is an option so it can be moved into a future
- reader_factory: Option<ReaderFactory<T>>,
-
- state: StreamState<T>,
+ /// Input and Outstanding IO request, if any
+ request_state: RequestState<T>,
+ /// Decoding state machine (no IO)
+ decoder: ParquetPushDecoder,
}
impl<T> std::fmt::Debug for ParquetRecordBatchStream<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ParquetRecordBatchStream")
- .field("metadata", &self.metadata)
- .field("schema", &self.schema)
- .field("batch_size", &self.batch_size)
- .field("projection", &self.projection)
- .field("state", &self.state)
+ .field("request_state", &self.request_state)
.finish()
}
}
@@ -858,45 +651,35 @@ where
/// - `Ok(Some(reader))` which holds all the data for the row group.
pub async fn next_row_group(&mut self) ->
Result<Option<ParquetRecordBatchReader>> {
loop {
- match &mut self.state {
- StreamState::Decoding(_) | StreamState::Reading(_) => {
- return Err(ParquetError::General(
- "Cannot combine the use of next_row_group with the
Stream API".to_string(),
- ));
- }
- StreamState::Init => {
- let row_group_idx = match self.row_groups.pop_front() {
- Some(idx) => idx,
- None => return Ok(None),
- };
-
- let row_count =
self.metadata.row_group(row_group_idx).num_rows() as usize;
-
- let selection = self.selection.as_mut().map(|s|
s.split_off(row_count));
-
- let reader_factory =
self.reader_factory.take().expect("lost reader factory");
-
- let (reader_factory, maybe_reader) = reader_factory
- .read_row_group(
- row_group_idx,
- selection,
- self.projection.clone(),
- self.batch_size,
- )
- .await
- .inspect_err(|_| {
- self.state = StreamState::Error;
- })?;
- self.reader_factory = Some(reader_factory);
-
- if let Some(reader) = maybe_reader {
- return Ok(Some(reader));
- } else {
- // All rows skipped, read next row group
- continue;
+ // Take ownership of request state to process, leaving self in a
+ // valid state
+ let request_state = std::mem::replace(&mut self.request_state,
RequestState::Done);
+ match request_state {
+                // No outstanding requests, proceed to set up the next row group
+ RequestState::None { input } => {
+ match self.decoder.try_next_reader()? {
+ DecodeResult::NeedsData(ranges) => {
+ self.request_state =
RequestState::begin_request(input, ranges);
+ continue; // poll again (as the input might be
ready immediately)
+ }
+ DecodeResult::Data(reader) => {
+ self.request_state = RequestState::None { input };
+ return Ok(Some(reader));
+ }
+ DecodeResult::Finished => return Ok(None),
}
}
- StreamState::Error => return Ok(None), // Ends the stream as
error happens.
+ RequestState::Outstanding { ranges, future } => {
+ let (input, data) = future.await?;
+ // Push the requested data to the decoder and try again
+ self.decoder.push_ranges(ranges, data)?;
+ self.request_state = RequestState::None { input };
+ continue; // try and decode on next iteration
+ }
+ RequestState::Done => {
+ self.request_state = RequestState::Done;
+ return Ok(None);
+ }
}
}
}
@@ -907,102 +690,82 @@ where
T: AsyncFileReader + Unpin + Send + 'static,
{
type Item = Result<RecordBatch>;
-
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Option<Self::Item>> {
+ match self.poll_next_inner(cx) {
+ Ok(res) => {
+ // Successfully decoded a batch, or reached end of stream.
+ // convert Option<RecordBatch> to Option<Result<RecordBatch>>
+ res.map(|res| Ok(res).transpose())
+ }
+ Err(e) => {
+ self.request_state = RequestState::Done;
+ Poll::Ready(Some(Err(e)))
+ }
+ }
+ }
+}
+
+impl<T> ParquetRecordBatchStream<T>
+where
+ T: AsyncFileReader + Unpin + Send + 'static,
+{
+ /// Inner state machine
+ ///
+ /// Note this is separate from poll_next so we can use ? operator to check
for errors
+ /// as it returns `Result<Poll<Option<RecordBatch>>>`
+ fn poll_next_inner(&mut self, cx: &mut Context<'_>) ->
Result<Poll<Option<RecordBatch>>> {
loop {
- match &mut self.state {
- StreamState::Decoding(batch_reader) => match
batch_reader.next() {
- Some(Ok(batch)) => {
- return Poll::Ready(Some(Ok(batch)));
- }
- Some(Err(e)) => {
- self.state = StreamState::Error;
- return
Poll::Ready(Some(Err(ParquetError::ArrowError(e.to_string()))));
+ let request_state = std::mem::replace(&mut self.request_state,
RequestState::Done);
+ match request_state {
+ RequestState::None { input } => {
+ // No outstanding requests, proceed to decode the next
batch
+ match self.decoder.try_decode()? {
+ DecodeResult::NeedsData(ranges) => {
+ self.request_state =
RequestState::begin_request(input, ranges);
+ continue; // poll again (as the input might be
ready immediately)
+ }
+ DecodeResult::Data(batch) => {
+ self.request_state = RequestState::None { input };
+ return Ok(Poll::Ready(Some(batch)));
+ }
+ DecodeResult::Finished => {
+ self.request_state = RequestState::Done;
+ return Ok(Poll::Ready(None));
+ }
}
- None => self.state = StreamState::Init,
- },
- StreamState::Init => {
- let row_group_idx = match self.row_groups.pop_front() {
- Some(idx) => idx,
- None => return Poll::Ready(None),
- };
-
- let reader = self.reader_factory.take().expect("lost
reader factory");
-
- let row_count =
self.metadata.row_group(row_group_idx).num_rows() as usize;
-
- let selection = self.selection.as_mut().map(|s|
s.split_off(row_count));
-
- let fut = reader
- .read_row_group(
- row_group_idx,
- selection,
- self.projection.clone(),
- self.batch_size,
- )
- .boxed();
-
- self.state = StreamState::Reading(fut)
}
- StreamState::Reading(f) => match ready!(f.poll_unpin(cx)) {
- Ok((reader_factory, maybe_reader)) => {
- self.reader_factory = Some(reader_factory);
- match maybe_reader {
- // Read records from [`ParquetRecordBatchReader`]
- Some(reader) => self.state =
StreamState::Decoding(reader),
- // All rows skipped, read next row group
- None => self.state = StreamState::Init,
- }
+ RequestState::Outstanding { ranges, mut future } => match
future.poll_unpin(cx) {
+ // Data was ready, push it to the decoder and continue
+ Poll::Ready(result) => {
+ let (input, data) = result?;
+ // Push the requested data to the decoder
+ self.decoder.push_ranges(ranges, data)?;
+ self.request_state = RequestState::None { input };
+ continue; // next iteration will try to decode the
next batch
}
- Err(e) => {
- self.state = StreamState::Error;
- return Poll::Ready(Some(Err(e)));
+ Poll::Pending => {
+ self.request_state = RequestState::Outstanding {
ranges, future };
+ return Ok(Poll::Pending);
}
},
- StreamState::Error => return Poll::Ready(None), // Ends the
stream as error happens.
+ RequestState::Done => {
+ // Stream is done (error or end), return None
+ self.request_state = RequestState::Done;
+ return Ok(Poll::Ready(None));
+ }
}
}
}
}
-// Note this implementation is not with the rest of the InMemoryRowGroup
-// implementation because it relies on several async traits and types
-// that are only available when the "async" feature is enabled.
-impl InMemoryRowGroup<'_> {
- /// Fetches any additional column data specified in `projection` that is
not already
- /// present in `self.column_chunks`.
- ///
- /// If `selection` is provided, only the pages required for the selection
- /// are fetched. Otherwise, all pages are fetched.
- pub(crate) async fn fetch<T: AsyncFileReader + Send>(
- &mut self,
- input: &mut T,
- projection: &ProjectionMask,
- selection: Option<&RowSelection>,
- batch_size: usize,
- cache_mask: Option<&ProjectionMask>,
- ) -> Result<()> {
- // Figure out what ranges to fetch
- let FetchRanges {
- ranges,
- page_start_offsets,
- } = self.fetch_ranges(projection, selection, batch_size, cache_mask);
- // do the actual fetch
- let chunk_data = input.get_byte_ranges(ranges).await?.into_iter();
- // update our in memory buffers (self.column_chunks) with the fetched
data
- self.fill_column_chunks(projection, page_start_offsets, chunk_data);
- Ok(())
- }
-}
#[cfg(test)]
mod tests {
use super::*;
- use crate::arrow::ArrowWriter;
use crate::arrow::arrow_reader::{
- ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
+ ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter,
RowSelection, RowSelector,
};
use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
- use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
+ use crate::arrow::{ArrowWriter, ProjectionMask};
use crate::file::metadata::ParquetMetaDataReader;
use crate::file::properties::WriterProperties;
use arrow::compute::kernels::cmp::eq;
@@ -1724,98 +1487,6 @@ mod tests {
assert_eq!(total_rows, 730);
}
- #[tokio::test]
- #[allow(deprecated)]
- async fn test_in_memory_row_group_sparse() {
- let testdata = arrow::util::test_util::parquet_test_data();
- let path = format!("{testdata}/alltypes_tiny_pages.parquet");
- let data = Bytes::from(std::fs::read(path).unwrap());
-
- let metadata = ParquetMetaDataReader::new()
- .with_page_indexes(true)
- .parse_and_finish(&data)
- .unwrap();
-
- let offset_index = metadata.offset_index().expect("reading offset
index")[0].clone();
-
- let mut metadata_builder = metadata.into_builder();
- let mut row_groups = metadata_builder.take_row_groups();
- row_groups.truncate(1);
- let row_group_meta = row_groups.pop().unwrap();
-
- let metadata = metadata_builder
- .add_row_group(row_group_meta)
- .set_column_index(None)
- .set_offset_index(Some(vec![offset_index.clone()]))
- .build();
-
- let metadata = Arc::new(metadata);
-
- let num_rows = metadata.row_group(0).num_rows();
-
- assert_eq!(metadata.num_row_groups(), 1);
-
- let async_reader = TestReader::new(data.clone());
-
- let requests = async_reader.requests.clone();
- let (_, fields) = parquet_to_arrow_schema_and_fields(
- metadata.file_metadata().schema_descr(),
- ProjectionMask::all(),
- None,
- )
- .unwrap();
-
- let _schema_desc = metadata.file_metadata().schema_descr();
-
- let projection =
ProjectionMask::leaves(metadata.file_metadata().schema_descr(), vec![0]);
-
- let reader_factory = ReaderFactory {
- metadata,
- fields: fields.map(Arc::new),
- input: async_reader,
- filter: None,
- limit: None,
- offset: None,
- metrics: ArrowReaderMetrics::disabled(),
- max_predicate_cache_size: 0,
- };
-
- let mut skip = true;
- let mut pages = offset_index[0].page_locations.iter().peekable();
-
- // Setup `RowSelection` so that we can skip every other page,
selecting the last page
- let mut selectors = vec![];
- let mut expected_page_requests: Vec<Range<usize>> = vec![];
- while let Some(page) = pages.next() {
- let num_rows = if let Some(next_page) = pages.peek() {
- next_page.first_row_index - page.first_row_index
- } else {
- num_rows - page.first_row_index
- };
-
- if skip {
- selectors.push(RowSelector::skip(num_rows as usize));
- } else {
- selectors.push(RowSelector::select(num_rows as usize));
- let start = page.offset as usize;
- let end = start + page.compressed_page_size as usize;
- expected_page_requests.push(start..end);
- }
- skip = !skip;
- }
-
- let selection = RowSelection::from(selectors);
-
- let (_factory, _reader) = reader_factory
- .read_row_group(0, Some(selection), projection.clone(), 48)
- .await
- .expect("reading row group");
-
- let requests = requests.lock().unwrap();
-
- assert_eq!(&requests[..], &expected_page_requests)
- }
-
#[tokio::test]
async fn test_batch_size_overallocate() {
let testdata = arrow::util::test_util::parquet_test_data();
@@ -1831,13 +1502,16 @@ mod tests {
let file_rows = builder.metadata().file_metadata().num_rows() as usize;
- let stream = builder
+ let builder = builder
.with_projection(ProjectionMask::all())
- .with_batch_size(1024)
- .build()
- .unwrap();
+ .with_batch_size(1024);
+
+ // even though the batch size is set to 1024, it should adjust to the
max
+ // number of rows in the file (8)
assert_ne!(1024, file_rows);
- assert_eq!(stream.batch_size, file_rows);
+ assert_eq!(builder.batch_size, file_rows);
+
+ let _stream = builder.build().unwrap();
}
#[tokio::test]
diff --git a/parquet/src/arrow/push_decoder/mod.rs
b/parquet/src/arrow/push_decoder/mod.rs
index 4b932fb080..b26a21132c 100644
--- a/parquet/src/arrow/push_decoder/mod.rs
+++ b/parquet/src/arrow/push_decoder/mod.rs
@@ -81,7 +81,7 @@ use std::sync::Arc;
/// # let parquet_metadata = Arc::new(parquet_metadata);
/// // The file length and metadata are required to create the decoder
/// let mut decoder =
-/// ParquetPushDecoderBuilder::try_new_decoder(file_length,
parquet_metadata)
+/// ParquetPushDecoderBuilder::try_new_decoder(parquet_metadata)
/// .unwrap()
/// // Optionally configure the decoder, e.g. batch size
/// .with_batch_size(1024)
@@ -110,7 +110,19 @@ use std::sync::Arc;
/// }
/// }
/// ```
-pub type ParquetPushDecoderBuilder = ArrowReaderBuilder<u64>;
+pub type ParquetPushDecoderBuilder = ArrowReaderBuilder<NoInput>;
+
+/// Type that represents "No input" for the [`ParquetPushDecoderBuilder`]
+///
+/// There is no "input" for the push decoder by design (the idea is that
+/// the caller pushes data to the decoder as needed).
+///
+/// However, [`ArrowReaderBuilder`] is shared with the sync and async readers,
+/// which DO have an `input`. To support reusing the same builder code for
+/// all three types of decoders, we define this `NoInput` for the push decoder
to
+/// denote in the type system that there is no input.
+#[derive(Debug, Clone, Copy)]
+pub struct NoInput;
/// Methods for building a ParquetDecoder. See the base [`ArrowReaderBuilder`]
for
/// more options that can be configured.
@@ -122,15 +134,8 @@ impl ParquetPushDecoderBuilder {
/// [`ParquetMetadataDecoder`]:
crate::file::metadata::ParquetMetaDataPushDecoder
///
/// See example on [`ParquetPushDecoderBuilder`]
- pub fn try_new_decoder(
- file_len: u64,
- parquet_metadata: Arc<ParquetMetaData>,
- ) -> Result<Self, ParquetError> {
- Self::try_new_decoder_with_options(
- file_len,
- parquet_metadata,
- ArrowReaderOptions::default(),
- )
+ pub fn try_new_decoder(parquet_metadata: Arc<ParquetMetaData>) ->
Result<Self, ParquetError> {
+ Self::try_new_decoder_with_options(parquet_metadata,
ArrowReaderOptions::default())
}
/// Create a new `ParquetDecoderBuilder` for configuring a Parquet decoder
for the given file
@@ -139,27 +144,26 @@ impl ParquetPushDecoderBuilder {
/// This is similar to [`Self::try_new_decoder`] but allows configuring
/// options such as Arrow schema
pub fn try_new_decoder_with_options(
- file_len: u64,
parquet_metadata: Arc<ParquetMetaData>,
arrow_reader_options: ArrowReaderOptions,
) -> Result<Self, ParquetError> {
let arrow_reader_metadata =
ArrowReaderMetadata::try_new(parquet_metadata,
arrow_reader_options)?;
- Ok(Self::new_with_metadata(file_len, arrow_reader_metadata))
+ Ok(Self::new_with_metadata(arrow_reader_metadata))
}
/// Create a new `ParquetDecoderBuilder` given [`ArrowReaderMetadata`].
///
/// See [`ArrowReaderMetadata::try_new`] for how to create the metadata
from
/// the Parquet metadata and reader options.
- pub fn new_with_metadata(file_len: u64, arrow_reader_metadata:
ArrowReaderMetadata) -> Self {
- Self::new_builder(file_len, arrow_reader_metadata)
+ pub fn new_with_metadata(arrow_reader_metadata: ArrowReaderMetadata) ->
Self {
+ Self::new_builder(NoInput, arrow_reader_metadata)
}
/// Create a [`ParquetPushDecoder`] with the configured options
pub fn build(self) -> Result<ParquetPushDecoder, ParquetError> {
let Self {
- input: file_len,
+ input: NoInput,
metadata: parquet_metadata,
schema: _,
fields,
@@ -179,6 +183,7 @@ impl ParquetPushDecoderBuilder {
row_groups.unwrap_or_else(||
(0..parquet_metadata.num_row_groups()).collect());
// Prepare to build RowGroup readers
+ let file_len = 0; // not used in push decoder
let buffers = PushBuffers::new(file_len);
let row_group_reader_builder = RowGroupReaderBuilder::new(
batch_size,
@@ -268,7 +273,54 @@ impl ParquetPushDecoder {
///```
pub fn try_decode(&mut self) -> Result<DecodeResult<RecordBatch>,
ParquetError> {
let current_state = std::mem::replace(&mut self.state,
ParquetDecoderState::Finished);
- let (new_state, decode_result) = current_state.try_transition()?;
+ let (new_state, decode_result) = current_state.try_next_batch()?;
+ self.state = new_state;
+ Ok(decode_result)
+ }
+
+ /// Return a [`ParquetRecordBatchReader`] that reads the next set of rows,
or
+ /// return what data is needed to produce it.
+ ///
+ /// This API can be used to get a reader for decoding the next set of
+    /// RecordBatches while proceeding to fetch data for the following set
+    /// (e.g. the next row group)
+ ///
+ /// Example
+ /// ```no_run
+ /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
+ /// use parquet::DecodeResult;
+ /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
+ /// # fn push_data(decoder: &mut ParquetPushDecoder, ranges:
Vec<std::ops::Range<u64>>) { unimplemented!() }
+ /// let mut decoder = get_decoder();
+ /// loop {
+ /// match decoder.try_next_reader().unwrap() {
+ /// DecodeResult::NeedsData(ranges) => {
+ /// // The decoder needs more data. Fetch the data for the given
ranges
+ /// // call decoder.push_ranges(ranges, data) and call again
+ /// push_data(&mut decoder, ranges);
+ /// }
+ /// DecodeResult::Data(reader) => {
+ /// // spawn a thread to read the batches in parallel
+ /// // with fetching the next row group / data
+ /// std::thread::spawn(move || {
+ /// for batch in reader {
+ /// let batch = batch.unwrap();
+ /// println!("Got batch with {} rows", batch.num_rows());
+ /// }
+ /// });
+ /// }
+ /// DecodeResult::Finished => {
+ /// // The decoder has finished decoding all data
+ /// break;
+ /// }
+ /// }
+ /// }
+ ///```
+ pub fn try_next_reader(
+ &mut self,
+ ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
+ let current_state = std::mem::replace(&mut self.state,
ParquetDecoderState::Finished);
+ let (new_state, decode_result) = current_state.try_next_reader()?;
self.state = new_state;
Ok(decode_result)
}
@@ -332,16 +384,106 @@ enum ParquetDecoderState {
}
impl ParquetDecoderState {
+ /// If actively reading a RowGroup, return the currently active
+ /// ParquetRecordBatchReader and advance to the next group.
+ fn try_next_reader(
+ self,
+ ) -> Result<(Self, DecodeResult<ParquetRecordBatchReader>), ParquetError> {
+ let mut current_state = self;
+ loop {
+ let (next_state, decode_result) = current_state.transition()?;
+ // if more data is needed to transition, can't proceed further
without it
+ match decode_result {
+ DecodeResult::NeedsData(ranges) => {
+ return Ok((next_state, DecodeResult::NeedsData(ranges)));
+ }
+ // act next based on state
+ DecodeResult::Data(()) | DecodeResult::Finished => {}
+ }
+ match next_state {
+ // not ready to read yet, continue transitioning
+ Self::ReadingRowGroup { .. } => current_state = next_state,
+ // have a reader ready, so return it and set ourself to
ReadingRowGroup
+ Self::DecodingRowGroup {
+ record_batch_reader,
+ remaining_row_groups,
+ } => {
+ let result = DecodeResult::Data(*record_batch_reader);
+ let next_state = Self::ReadingRowGroup {
+ remaining_row_groups,
+ };
+ return Ok((next_state, result));
+ }
+ Self::Finished => {
+ return Ok((Self::Finished, DecodeResult::Finished));
+ }
+ }
+ }
+ }
+
/// Current state --> next state + output
///
- /// This function is called to check if the decoder has any RecordBatches
- /// and [`Self::push_data`] is called when new data is available.
- ///
- /// # Notes
+ /// This function is called to get the next RecordBatch
///
/// This structure is used to reduce the indentation level of the main loop
/// in try_build
- fn try_transition(self) -> Result<(Self, DecodeResult<RecordBatch>),
ParquetError> {
+ fn try_next_batch(self) -> Result<(Self, DecodeResult<RecordBatch>),
ParquetError> {
+ let mut current_state = self;
+ loop {
+ let (new_state, decode_result) = current_state.transition()?;
+ // if more data is needed to transition, can't proceed further
without it
+ match decode_result {
+ DecodeResult::NeedsData(ranges) => {
+ return Ok((new_state, DecodeResult::NeedsData(ranges)));
+ }
+ // act next based on state
+ DecodeResult::Data(()) | DecodeResult::Finished => {}
+ }
+ match new_state {
+ // not ready to read yet, continue transitioning
+ Self::ReadingRowGroup { .. } => current_state = new_state,
+ // have a reader ready, so decode the next batch
+ Self::DecodingRowGroup {
+ mut record_batch_reader,
+ remaining_row_groups,
+ } => {
+ match record_batch_reader.next() {
+ // Successfully decoded a batch, return it
+ Some(Ok(batch)) => {
+ let result = DecodeResult::Data(batch);
+ let next_state = Self::DecodingRowGroup {
+ record_batch_reader,
+ remaining_row_groups,
+ };
+ return Ok((next_state, result));
+ }
+ // No more batches in this row group, move to the next
row group
+ None => {
+ current_state = Self::ReadingRowGroup {
+ remaining_row_groups,
+ }
+ }
+ // some error occurred while decoding, so return that
+ Some(Err(e)) => {
+ // TODO: preserve ArrowError in ParquetError
(rather than convert to a string)
+ return
Err(ParquetError::ArrowError(e.to_string()));
+ }
+ }
+ }
+ Self::Finished => {
+ return Ok((Self::Finished, DecodeResult::Finished));
+ }
+ }
+ }
+ }
+
+ /// Transition to the next state with a reader (data can be produced), if
not end of stream
+ ///
+ /// This function is called in a loop until the decoder is ready to return
+ /// data (has the required pages buffered) or is finished.
+ fn transition(self) -> Result<(Self, DecodeResult<()>), ParquetError> {
+ // result returned when there is data ready
+ let data_ready = DecodeResult::Data(());
match self {
Self::ReadingRowGroup {
mut remaining_row_groups,
@@ -350,13 +492,14 @@ impl ParquetDecoderState {
// If we have a next reader, we can transition to decoding
it
DecodeResult::Data(record_batch_reader) => {
// Transition to decoding the row group
- Self::DecodingRowGroup {
- record_batch_reader: Box::new(record_batch_reader),
- remaining_row_groups,
- }
- .try_transition()
+ Ok((
+ Self::DecodingRowGroup {
+ record_batch_reader:
Box::new(record_batch_reader),
+ remaining_row_groups,
+ },
+ data_ready,
+ ))
}
- // If there are no more readers, we are finished
DecodeResult::NeedsData(ranges) => {
// If we need more data, we return the ranges needed
and stay in Reading
// RowGroup state
@@ -367,40 +510,17 @@ impl ParquetDecoderState {
DecodeResult::NeedsData(ranges),
))
}
+ // If there are no more readers, we are finished
DecodeResult::Finished => {
// No more row groups to read, we are finished
Ok((Self::Finished, DecodeResult::Finished))
}
}
}
- Self::DecodingRowGroup {
- mut record_batch_reader,
- remaining_row_groups,
- } => {
- // Decide the next record batch
- match record_batch_reader.next() {
- Some(Ok(batch)) => {
- // Successfully decoded a batch, return it
- Ok((
- Self::DecodingRowGroup {
- record_batch_reader,
- remaining_row_groups,
- },
- DecodeResult::Data(batch),
- ))
- }
- None => {
- // No more batches in this row group, move to the next
row group
- // or finish if there are no more row groups
- Self::ReadingRowGroup {
- remaining_row_groups,
- }
- .try_transition()
- }
- Some(Err(e)) => Err(ParquetError::from(e)), // some error
occurred while decoding
- }
- }
- Self::Finished => Ok((Self::Finished, DecodeResult::Finished)),
+ // if we are already in DecodingRowGroup, just return data ready
+ Self::DecodingRowGroup { .. } => Ok((self, data_ready)),
+ // if finished, just return finished
+ Self::Finished => Ok((self, DecodeResult::Finished)),
}
}
@@ -485,13 +605,10 @@ mod test {
/// available in memory
#[test]
fn test_decoder_all_data() {
- let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
- test_file_len(),
- test_file_parquet_metadata(),
- )
- .unwrap()
- .build()
- .unwrap();
+ let mut decoder =
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+ .unwrap()
+ .build()
+ .unwrap();
decoder
.push_range(test_file_range(), TEST_FILE_DATA.clone())
@@ -514,13 +631,10 @@ mod test {
/// fetched as needed
#[test]
fn test_decoder_incremental() {
- let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
- test_file_len(),
- test_file_parquet_metadata(),
- )
- .unwrap()
- .build()
- .unwrap();
+ let mut decoder =
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+ .unwrap()
+ .build()
+ .unwrap();
let mut results = vec![];
@@ -553,13 +667,10 @@ mod test {
/// Decode the entire file incrementally, simulating partial reads
#[test]
fn test_decoder_partial() {
- let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
- test_file_len(),
- test_file_parquet_metadata(),
- )
- .unwrap()
- .build()
- .unwrap();
+ let mut decoder =
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+ .unwrap()
+ .build()
+ .unwrap();
// First row group, expect a single request for all data needed to
read "a" and "b"
let ranges = expect_needs_data(decoder.try_decode());
@@ -597,11 +708,8 @@ mod test {
/// only a single request per row group
#[test]
fn test_decoder_selection_does_one_request() {
- let builder = ParquetPushDecoderBuilder::try_new_decoder(
- test_file_len(),
- test_file_parquet_metadata(),
- )
- .unwrap();
+ let builder =
+
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
let schema_descr =
builder.metadata().file_metadata().schema_descr_ptr();
@@ -635,11 +743,8 @@ mod test {
/// of the data needed for the filter at a time simulating partial reads.
#[test]
fn test_decoder_single_filter_partial() {
- let builder = ParquetPushDecoderBuilder::try_new_decoder(
- test_file_len(),
- test_file_parquet_metadata(),
- )
- .unwrap();
+ let builder =
+
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
// Values in column "a" range 0..399
// First filter: "a" > 250 (nothing in Row Group 0, both data pages
in Row Group 1)
@@ -696,11 +801,8 @@ mod test {
/// Decode with a filter where we also skip one of the RowGroups via a
RowSelection
#[test]
fn test_decoder_single_filter_and_row_selection() {
- let builder = ParquetPushDecoderBuilder::try_new_decoder(
- test_file_len(),
- test_file_parquet_metadata(),
- )
- .unwrap();
+ let builder =
+
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
// Values in column "a" range 0..399
// First filter: "a" > 250 (nothing in Row Group 0, last data page in
Row Group 1)
@@ -751,11 +853,8 @@ mod test {
#[test]
fn test_decoder_multi_filters() {
// Create a decoder for decoding parquet data (note it does not have
any IO / readers)
- let builder = ParquetPushDecoderBuilder::try_new_decoder(
- test_file_len(),
- test_file_parquet_metadata(),
- )
- .unwrap();
+ let builder =
+
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
// Values in column "a" range 0..399
// Values in column "b" range 400..799
@@ -836,11 +935,8 @@ mod test {
#[test]
fn test_decoder_reuses_filter_pages() {
// Create a decoder for decoding parquet data (note it does not have
any IO / readers)
- let builder = ParquetPushDecoderBuilder::try_new_decoder(
- test_file_len(),
- test_file_parquet_metadata(),
- )
- .unwrap();
+ let builder =
+
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
// Values in column "a" range 0..399
// First filter: "a" > 250 (nothing in Row Group 0, last data page in
Row Group 1)
@@ -887,11 +983,8 @@ mod test {
#[test]
fn test_decoder_empty_filters() {
- let builder = ParquetPushDecoderBuilder::try_new_decoder(
- test_file_len(),
- test_file_parquet_metadata(),
- )
- .unwrap();
+ let builder =
+
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
let schema_descr =
builder.metadata().file_metadata().schema_descr_ptr();
// only read column "c", but with empty filters
@@ -929,17 +1022,14 @@ mod test {
#[test]
fn test_decoder_offset_limit() {
- let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
- test_file_len(),
- test_file_parquet_metadata(),
- )
- .unwrap()
- // skip entire first row group (200 rows) and first 25 rows of second
row group
- .with_offset(225)
- // and limit to 20 rows
- .with_limit(20)
- .build()
- .unwrap();
+ let mut decoder =
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+ .unwrap()
+ // skip entire first row group (200 rows) and first 25 rows of
second row group
+ .with_offset(225)
+ // and limit to 20 rows
+ .with_limit(20)
+ .build()
+ .unwrap();
// First row group should be skipped,
@@ -958,14 +1048,11 @@ mod test {
#[test]
fn test_decoder_row_group_selection() {
// take only the second row group
- let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
- test_file_len(),
- test_file_parquet_metadata(),
- )
- .unwrap()
- .with_row_groups(vec![1])
- .build()
- .unwrap();
+ let mut decoder =
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+ .unwrap()
+ .with_row_groups(vec![1])
+ .build()
+ .unwrap();
// First row group should be skipped,
@@ -984,17 +1071,14 @@ mod test {
#[test]
fn test_decoder_row_selection() {
// take only the second row group
- let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
- test_file_len(),
- test_file_parquet_metadata(),
- )
- .unwrap()
- .with_row_selection(RowSelection::from(vec![
- RowSelector::skip(225), // skip first row group and 25 rows of
second])
- RowSelector::select(20), // take 20 rows
- ]))
- .build()
- .unwrap();
+ let mut decoder =
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+ .unwrap()
+ .with_row_selection(RowSelection::from(vec![
+ RowSelector::skip(225), // skip first row group and 25 rows
of second])
+ RowSelector::select(20), // take 20 rows
+ ]))
+ .build()
+ .unwrap();
// First row group should be skipped,
diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs
b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
index be9070ae8b..a0ced8aa85 100644
--- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs
+++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
@@ -608,6 +608,10 @@ impl RowGroupReaderBuilder {
}
fn compute_cache_projection_inner(&self, filter: &RowFilter) ->
Option<ProjectionMask> {
+ // Do not compute the projection mask if the predicate cache is
disabled
+ if self.max_predicate_cache_size == 0 {
+ return None;
+ }
let mut cache_projection =
filter.predicates.first()?.projection().clone();
for predicate in filter.predicates.iter() {
cache_projection.union(predicate.projection());
diff --git a/parquet/src/util/push_buffers.rs b/parquet/src/util/push_buffers.rs
index 0475d48768..0c00cf9bd5 100644
--- a/parquet/src/util/push_buffers.rs
+++ b/parquet/src/util/push_buffers.rs
@@ -200,13 +200,6 @@ impl ChunkReader for PushBuffers {
}
fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes,
ParquetError> {
- if start > self.file_len {
- return Err(ParquetError::General(format!(
- "Requested start {start} is beyond the end of the file (file
length: {})",
- self.file_len
- )));
- }
-
// find the range that contains the start offset
for (range, data) in self.iter() {
if range.start <= start && range.end >= start + length as u64 {
diff --git a/parquet/tests/encryption/encryption_agnostic.rs
b/parquet/tests/encryption/encryption_agnostic.rs
index 06c3b743ae..604155c81a 100644
--- a/parquet/tests/encryption/encryption_agnostic.rs
+++ b/parquet/tests/encryption/encryption_agnostic.rs
@@ -139,8 +139,8 @@ pub async fn
read_plaintext_footer_file_without_decryption_properties_async() {
Some(Err(ParquetError::ArrowError(s))) => {
assert!(s.contains("Parquet error"));
}
- _ => {
- panic!("Expected ArrowError::ParquetError");
+ err => {
+ panic!("Expected ArrowError::ParquetError, got {err:?}");
}
};
}