vustef commented on code in PR #8159:
URL: https://github.com/apache/arrow-rs/pull/8159#discussion_r2485866611
##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -483,300 +476,114 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
///
/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
- let num_row_groups = self.metadata.row_groups().len();
-
- let row_groups = match self.row_groups {
- Some(row_groups) => {
- if let Some(col) = row_groups.iter().find(|x| **x >=
num_row_groups) {
- return Err(general_err!(
- "row group {} out of bounds 0..{}",
- col,
- num_row_groups
- ));
- }
- row_groups.into()
- }
- None => (0..self.metadata.row_groups().len()).collect(),
- };
-
- // Try to avoid allocate large buffer
- let batch_size = self
- .batch_size
- .min(self.metadata.file_metadata().num_rows() as usize);
- let reader_factory = ReaderFactory {
- input: self.input.0,
- filter: self.filter,
- metadata: self.metadata.clone(),
- fields: self.fields,
- limit: self.limit,
- offset: self.offset,
- metrics: self.metrics,
- max_predicate_cache_size: self.max_predicate_cache_size,
- };
+ let Self {
+ input,
+ metadata,
+ schema,
+ fields,
+ batch_size,
+ row_groups,
+ projection,
+ filter,
+ selection,
+ limit,
+ offset,
+ metrics,
+ max_predicate_cache_size,
+ } = self;
// Ensure schema of ParquetRecordBatchStream respects projection, and
does
// not store metadata (same as for ParquetRecordBatchReader and
emitted RecordBatches)
- let projected_fields = match reader_factory.fields.as_deref().map(|pf|
&pf.arrow_type) {
- Some(DataType::Struct(fields)) => {
- fields.filter_leaves(|idx, _|
self.projection.leaf_included(idx))
- }
- None => Fields::empty(),
- _ => unreachable!("Must be Struct for root type"),
- };
- let schema = Arc::new(Schema::new(projected_fields));
-
- Ok(ParquetRecordBatchStream {
- metadata: self.metadata,
+ let projected_fields = schema
+ .fields
+ .filter_leaves(|idx, _| projection.leaf_included(idx));
+ let projected_schema = Arc::new(Schema::new(projected_fields));
+
+ let decoder = ParquetPushDecoderBuilder {
+ // Async reader doesn't know the overall size of the input, but it
+ // is not required for decoding as we will already have the
metadata
+ input: 0,
+ metadata,
+ schema,
+ fields,
+ projection,
+ filter,
+ selection,
batch_size,
row_groups,
- projection: self.projection,
- selection: self.selection,
- schema,
- reader_factory: Some(reader_factory),
- state: StreamState::Init,
+ limit,
+ offset,
+ metrics,
+ max_predicate_cache_size,
+ }
+ .build()?;
+
+ let request_state = RequestState::None { input: input.0 };
+
+ Ok(ParquetRecordBatchStream {
+ schema: projected_schema,
+ decoder,
+ request_state,
})
}
}
-/// Returns a [`ReaderFactory`] and an optional [`ParquetRecordBatchReader`]
for the next row group
+/// State machine that tracks outstanding requests to fetch data
///
-/// Note: If all rows are filtered out in the row group (e.g by filters, limit
or
-/// offset), returns `None` for the reader.
-type ReadResult<T> = Result<(ReaderFactory<T>,
Option<ParquetRecordBatchReader>)>;
-
-/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
-/// [`ParquetRecordBatchReader`]
-struct ReaderFactory<T> {
- metadata: Arc<ParquetMetaData>,
-
- /// Top level parquet schema
- fields: Option<Arc<ParquetField>>,
-
- input: T,
-
- /// Optional filter
- filter: Option<RowFilter>,
-
- /// Limit to apply to remaining row groups.
- limit: Option<usize>,
-
- /// Offset to apply to the next
- offset: Option<usize>,
-
- /// Metrics
- metrics: ArrowReaderMetrics,
-
- /// Maximum size of the predicate cache
- ///
- /// See [`RowGroupCache`] for details.
- max_predicate_cache_size: usize,
+/// The parameter `T` is the input, typically an `AsyncFileReader`
+enum RequestState<T> {
+ /// No outstanding requests
+ None {
+ input: T,
+ },
+ /// There is an outstanding request for data
+ Outstanding {
+ /// Ranges that have been requested
+ ranges: Vec<Range<u64>>,
+ /// Future that will resolve (input, requested_ranges)
+ ///
+ /// Note the future owns the reader while the request it outstanding
Review Comment:
```suggestion
/// Note the future owns the reader while the request is outstanding
```
##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -483,300 +476,114 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
///
/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
- let num_row_groups = self.metadata.row_groups().len();
-
- let row_groups = match self.row_groups {
- Some(row_groups) => {
- if let Some(col) = row_groups.iter().find(|x| **x >=
num_row_groups) {
- return Err(general_err!(
- "row group {} out of bounds 0..{}",
- col,
- num_row_groups
- ));
- }
- row_groups.into()
- }
- None => (0..self.metadata.row_groups().len()).collect(),
- };
-
- // Try to avoid allocate large buffer
- let batch_size = self
- .batch_size
- .min(self.metadata.file_metadata().num_rows() as usize);
- let reader_factory = ReaderFactory {
- input: self.input.0,
- filter: self.filter,
- metadata: self.metadata.clone(),
- fields: self.fields,
- limit: self.limit,
- offset: self.offset,
- metrics: self.metrics,
- max_predicate_cache_size: self.max_predicate_cache_size,
- };
+ let Self {
+ input,
+ metadata,
+ schema,
+ fields,
+ batch_size,
+ row_groups,
+ projection,
+ filter,
+ selection,
+ limit,
+ offset,
+ metrics,
+ max_predicate_cache_size,
+ } = self;
// Ensure schema of ParquetRecordBatchStream respects projection, and
does
// not store metadata (same as for ParquetRecordBatchReader and
emitted RecordBatches)
- let projected_fields = match reader_factory.fields.as_deref().map(|pf|
&pf.arrow_type) {
- Some(DataType::Struct(fields)) => {
- fields.filter_leaves(|idx, _|
self.projection.leaf_included(idx))
- }
- None => Fields::empty(),
- _ => unreachable!("Must be Struct for root type"),
- };
- let schema = Arc::new(Schema::new(projected_fields));
-
- Ok(ParquetRecordBatchStream {
- metadata: self.metadata,
+ let projected_fields = schema
+ .fields
+ .filter_leaves(|idx, _| projection.leaf_included(idx));
+ let projected_schema = Arc::new(Schema::new(projected_fields));
+
+ let decoder = ParquetPushDecoderBuilder {
+ // Async reader doesn't know the overall size of the input, but it
+ // is not required for decoding as we will already have the
metadata
+ input: 0,
+ metadata,
+ schema,
+ fields,
+ projection,
+ filter,
+ selection,
batch_size,
row_groups,
- projection: self.projection,
- selection: self.selection,
- schema,
- reader_factory: Some(reader_factory),
- state: StreamState::Init,
+ limit,
+ offset,
+ metrics,
+ max_predicate_cache_size,
+ }
+ .build()?;
+
+ let request_state = RequestState::None { input: input.0 };
+
+ Ok(ParquetRecordBatchStream {
+ schema: projected_schema,
+ decoder,
+ request_state,
})
}
}
-/// Returns a [`ReaderFactory`] and an optional [`ParquetRecordBatchReader`]
for the next row group
+/// State machine that tracks outstanding requests to fetch data
///
-/// Note: If all rows are filtered out in the row group (e.g by filters, limit
or
-/// offset), returns `None` for the reader.
-type ReadResult<T> = Result<(ReaderFactory<T>,
Option<ParquetRecordBatchReader>)>;
-
-/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
-/// [`ParquetRecordBatchReader`]
-struct ReaderFactory<T> {
- metadata: Arc<ParquetMetaData>,
-
- /// Top level parquet schema
- fields: Option<Arc<ParquetField>>,
-
- input: T,
-
- /// Optional filter
- filter: Option<RowFilter>,
-
- /// Limit to apply to remaining row groups.
- limit: Option<usize>,
-
- /// Offset to apply to the next
- offset: Option<usize>,
-
- /// Metrics
- metrics: ArrowReaderMetrics,
-
- /// Maximum size of the predicate cache
- ///
- /// See [`RowGroupCache`] for details.
- max_predicate_cache_size: usize,
+/// The parameter `T` is the input, typically an `AsyncFileReader`
+enum RequestState<T> {
+ /// No outstanding requests
+ None {
+ input: T,
+ },
+ /// There is an outstanding request for data
+ Outstanding {
+ /// Ranges that have been requested
+ ranges: Vec<Range<u64>>,
+ /// Future that will resolve (input, requested_ranges)
+ ///
+ /// Note the future owns the reader while the request it outstanding
+ /// and returns it upon completion
+ future: BoxFuture<'static, Result<(T, Vec<Bytes>)>>,
+ },
+ Done,
}
-impl<T> ReaderFactory<T>
+impl<T> RequestState<T>
where
- T: AsyncFileReader + Send,
+ T: AsyncFileReader + Unpin + Send + 'static,
{
- /// Reads the next row group with the provided `selection`, `projection`
and `batch_size`
- ///
- /// Updates the `limit` and `offset` of the reader factory
- ///
- /// Note: this captures self so that the resulting future has a static
lifetime
- async fn read_row_group(
- mut self,
- row_group_idx: usize,
- selection: Option<RowSelection>,
- projection: ProjectionMask,
- batch_size: usize,
- ) -> ReadResult<T> {
- // TODO: calling build_array multiple times is wasteful
-
- let meta = self.metadata.row_group(row_group_idx);
- let offset_index = self
- .metadata
- .offset_index()
- // filter out empty offset indexes (old versions specified
Some(vec![]) when no present)
- .filter(|index| !index.is_empty())
- .map(|x| x[row_group_idx].as_slice());
-
- // Reuse columns that are selected and used by the filters
- let cache_projection = match
self.compute_cache_projection(&projection) {
- Some(projection) => projection,
- None => ProjectionMask::none(meta.columns().len()),
- };
- let row_group_cache = Arc::new(Mutex::new(RowGroupCache::new(
- batch_size,
- self.max_predicate_cache_size,
- )));
-
- let mut row_group = InMemoryRowGroup {
- // schema: meta.schema_descr_ptr(),
- row_count: meta.num_rows() as usize,
- column_chunks: vec![None; meta.columns().len()],
- offset_index,
- row_group_idx,
- metadata: self.metadata.as_ref(),
- };
-
- let cache_options_builder =
CacheOptionsBuilder::new(&cache_projection, &row_group_cache);
-
- let filter = self.filter.as_mut();
- let mut plan_builder =
ReadPlanBuilder::new(batch_size).with_selection(selection);
-
- // Update selection based on any filters
- if let Some(filter) = filter {
- let cache_options = cache_options_builder.clone().producer();
-
- for predicate in filter.predicates.iter_mut() {
- if !plan_builder.selects_any() {
- return Ok((self, None)); // ruled out entire row group
- }
-
- // (pre) Fetch only the columns that are selected by the
predicate
- let selection = plan_builder.selection();
- // Fetch predicate columns; expand selection only for cached
predicate columns
- let cache_mask = Some(&cache_projection);
- row_group
- .fetch(
- &mut self.input,
- predicate.projection(),
- selection,
- batch_size,
- cache_mask,
- )
- .await?;
-
- let array_reader = ArrayReaderBuilder::new(&row_group,
&self.metrics)
- .with_cache_options(Some(&cache_options))
- .build_array_reader(self.fields.as_deref(),
predicate.projection())?;
-
- plan_builder = plan_builder.with_predicate(array_reader,
predicate.as_mut())?;
- }
- }
-
- // Compute the number of rows in the selection before applying limit
and offset
- let rows_before = plan_builder
- .num_rows_selected()
- .unwrap_or(row_group.row_count);
-
- if rows_before == 0 {
- return Ok((self, None)); // ruled out entire row group
- }
-
- // Apply any limit and offset
- let plan_builder = plan_builder
- .limited(row_group.row_count)
- .with_offset(self.offset)
- .with_limit(self.limit)
- .build_limited();
-
- let rows_after = plan_builder
- .num_rows_selected()
- .unwrap_or(row_group.row_count);
-
- // Update running offset and limit for after the current row group is
read
- if let Some(offset) = &mut self.offset {
- // Reduction is either because of offset or limit, as limit is
applied
- // after offset has been "exhausted" can just use saturating sub
here
- *offset = offset.saturating_sub(rows_before - rows_after)
- }
-
- if rows_after == 0 {
- return Ok((self, None)); // ruled out entire row group
- }
-
- if let Some(limit) = &mut self.limit {
- *limit -= rows_after;
+ // Issue a request to fetch a single range, returining the Outstanding
state
+ fn begin_request(mut input: T, ranges: Vec<Range<u64>>) -> Self {
+ let ranges_captured = ranges.clone();
+
+ // Note this must move the input *into* the future
+ // because the get_byte_ranges future has a lifetime
+ // (aka can have references internally) and thus must
+ // own the input while the request is outstanding.
+ let future = async move {
+ let data = input.get_byte_ranges(ranges_captured).await?;
Review Comment:
An aside: I don't understand why the default `AsyncFileReader` implementation fetches range by range sequentially instead of utilizing the concurrency of the underlying runtime:
```
/// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
```
Please let me know if this seems off to you as well, and I can open an issue for it.
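To make the aside concrete, here is a rough sketch of what a concurrent override could look like when the underlying handle can serve requests in parallel (everything here, including `StorageHandle` and `get_byte_ranges_concurrent`, is a hypothetical stand-in rather than the crate's actual trait):
```rust
use std::ops::Range;

use bytes::Bytes;
use futures::future::try_join_all;

/// Hypothetical cloneable handle standing in for whatever the concrete
/// `AsyncFileReader` wraps (e.g. an object store client).
#[derive(Clone)]
struct StorageHandle;

impl StorageHandle {
    async fn get_bytes(&self, _range: Range<u64>) -> Result<Bytes, std::io::Error> {
        Ok(Bytes::new()) // placeholder fetch
    }
}

/// Fetch all ranges concurrently instead of awaiting them one after another.
async fn get_byte_ranges_concurrent(
    handle: &StorageHandle,
    ranges: Vec<Range<u64>>,
) -> Result<Vec<Bytes>, std::io::Error> {
    let fetches = ranges.into_iter().map(|range| {
        let handle = handle.clone();
        async move { handle.get_bytes(range).await }
    });
    // try_join_all drives all fetches at once and preserves the input order
    try_join_all(fetches).await
}
```
I presume the default is sequential because `get_bytes` takes `&mut self`, so concurrent calls need a handle that can be cloned or otherwise shared, but implementations that have one could still override `get_byte_ranges`.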
##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -201,10 +194,10 @@ impl ArrowReaderMetadata {
}
#[doc(hidden)]
-// A newtype used within `ReaderOptionsBuilder` to distinguish sync readers
from async
-//
-// Allows sharing the same builder for both the sync and async versions,
whilst also not
-// breaking the pre-existing ParquetRecordBatchStreamBuilder API
+/// Newtype (wrapper) used within [`ArrowReaderBuilder`] to distinguish sync
readers from async
+///
+/// Allows sharing the same builder different readers while keeping the same
Review Comment:
```suggestion
/// Allows sharing the same builder for different readers while keeping the same
```
The sentence doesn't read right; perhaps this is what is missing?
##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -483,300 +476,114 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
///
/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
- let num_row_groups = self.metadata.row_groups().len();
-
- let row_groups = match self.row_groups {
- Some(row_groups) => {
- if let Some(col) = row_groups.iter().find(|x| **x >=
num_row_groups) {
- return Err(general_err!(
- "row group {} out of bounds 0..{}",
- col,
- num_row_groups
- ));
- }
- row_groups.into()
- }
- None => (0..self.metadata.row_groups().len()).collect(),
- };
-
- // Try to avoid allocate large buffer
- let batch_size = self
- .batch_size
- .min(self.metadata.file_metadata().num_rows() as usize);
- let reader_factory = ReaderFactory {
- input: self.input.0,
- filter: self.filter,
- metadata: self.metadata.clone(),
- fields: self.fields,
- limit: self.limit,
- offset: self.offset,
- metrics: self.metrics,
- max_predicate_cache_size: self.max_predicate_cache_size,
- };
+ let Self {
+ input,
+ metadata,
+ schema,
+ fields,
+ batch_size,
+ row_groups,
+ projection,
+ filter,
+ selection,
+ limit,
+ offset,
+ metrics,
+ max_predicate_cache_size,
+ } = self;
// Ensure schema of ParquetRecordBatchStream respects projection, and
does
// not store metadata (same as for ParquetRecordBatchReader and
emitted RecordBatches)
- let projected_fields = match reader_factory.fields.as_deref().map(|pf|
&pf.arrow_type) {
- Some(DataType::Struct(fields)) => {
- fields.filter_leaves(|idx, _|
self.projection.leaf_included(idx))
- }
- None => Fields::empty(),
- _ => unreachable!("Must be Struct for root type"),
- };
- let schema = Arc::new(Schema::new(projected_fields));
-
- Ok(ParquetRecordBatchStream {
- metadata: self.metadata,
+ let projected_fields = schema
+ .fields
+ .filter_leaves(|idx, _| projection.leaf_included(idx));
+ let projected_schema = Arc::new(Schema::new(projected_fields));
+
+ let decoder = ParquetPushDecoderBuilder {
+ // Async reader doesn't know the overall size of the input, but it
+ // is not required for decoding as we will already have the
metadata
+ input: 0,
Review Comment:
I missed the previous PR, but am a bit confused by the `input` field of `ArrowReaderBuilder`. Is it meant to represent arbitrary input, specific to a specialized `type` (in this case `file_length` for `ParquetPushDecoderBuilder`)? I wonder if it would've been better if we had something like this:
```rust
struct ParquetPushDecoderBuilder {
    reader_builder: ArrowReaderBuilder,
    file_len: u64,
}
```
Just a thought, not intended to be addressed here.
##########
parquet/src/arrow/push_decoder/mod.rs:
##########
@@ -332,16 +379,106 @@ enum ParquetDecoderState {
}
impl ParquetDecoderState {
+ /// If actively reading a RowGroup, return the currently active
+ /// ParquetRecordBatchReader and advance to the next group.
+ fn try_next_reader(
Review Comment:
Is this so that we can preserve the `pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>>` API on the async reader?
##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -483,300 +476,114 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
///
/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
- let num_row_groups = self.metadata.row_groups().len();
-
- let row_groups = match self.row_groups {
- Some(row_groups) => {
- if let Some(col) = row_groups.iter().find(|x| **x >=
num_row_groups) {
- return Err(general_err!(
- "row group {} out of bounds 0..{}",
- col,
- num_row_groups
- ));
- }
- row_groups.into()
- }
- None => (0..self.metadata.row_groups().len()).collect(),
- };
-
- // Try to avoid allocate large buffer
- let batch_size = self
- .batch_size
- .min(self.metadata.file_metadata().num_rows() as usize);
- let reader_factory = ReaderFactory {
- input: self.input.0,
- filter: self.filter,
- metadata: self.metadata.clone(),
- fields: self.fields,
- limit: self.limit,
- offset: self.offset,
- metrics: self.metrics,
- max_predicate_cache_size: self.max_predicate_cache_size,
- };
+ let Self {
+ input,
+ metadata,
+ schema,
+ fields,
+ batch_size,
+ row_groups,
+ projection,
+ filter,
+ selection,
+ limit,
+ offset,
+ metrics,
+ max_predicate_cache_size,
+ } = self;
// Ensure schema of ParquetRecordBatchStream respects projection, and
does
// not store metadata (same as for ParquetRecordBatchReader and
emitted RecordBatches)
- let projected_fields = match reader_factory.fields.as_deref().map(|pf|
&pf.arrow_type) {
- Some(DataType::Struct(fields)) => {
- fields.filter_leaves(|idx, _|
self.projection.leaf_included(idx))
- }
- None => Fields::empty(),
- _ => unreachable!("Must be Struct for root type"),
- };
- let schema = Arc::new(Schema::new(projected_fields));
-
- Ok(ParquetRecordBatchStream {
- metadata: self.metadata,
+ let projected_fields = schema
+ .fields
+ .filter_leaves(|idx, _| projection.leaf_included(idx));
+ let projected_schema = Arc::new(Schema::new(projected_fields));
+
+ let decoder = ParquetPushDecoderBuilder {
+ // Async reader doesn't know the overall size of the input, but it
+ // is not required for decoding as we will already have the
metadata
+ input: 0,
+ metadata,
+ schema,
+ fields,
+ projection,
+ filter,
+ selection,
batch_size,
row_groups,
- projection: self.projection,
- selection: self.selection,
- schema,
- reader_factory: Some(reader_factory),
- state: StreamState::Init,
+ limit,
+ offset,
+ metrics,
+ max_predicate_cache_size,
+ }
+ .build()?;
+
+ let request_state = RequestState::None { input: input.0 };
+
+ Ok(ParquetRecordBatchStream {
+ schema: projected_schema,
+ decoder,
+ request_state,
})
}
}
-/// Returns a [`ReaderFactory`] and an optional [`ParquetRecordBatchReader`]
for the next row group
+/// State machine that tracks outstanding requests to fetch data
///
-/// Note: If all rows are filtered out in the row group (e.g by filters, limit
or
-/// offset), returns `None` for the reader.
-type ReadResult<T> = Result<(ReaderFactory<T>,
Option<ParquetRecordBatchReader>)>;
-
-/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
-/// [`ParquetRecordBatchReader`]
-struct ReaderFactory<T> {
- metadata: Arc<ParquetMetaData>,
-
- /// Top level parquet schema
- fields: Option<Arc<ParquetField>>,
-
- input: T,
-
- /// Optional filter
- filter: Option<RowFilter>,
-
- /// Limit to apply to remaining row groups.
- limit: Option<usize>,
-
- /// Offset to apply to the next
- offset: Option<usize>,
-
- /// Metrics
- metrics: ArrowReaderMetrics,
-
- /// Maximum size of the predicate cache
- ///
- /// See [`RowGroupCache`] for details.
- max_predicate_cache_size: usize,
+/// The parameter `T` is the input, typically an `AsyncFileReader`
+enum RequestState<T> {
+ /// No outstanding requests
+ None {
+ input: T,
+ },
+ /// There is an outstanding request for data
+ Outstanding {
+ /// Ranges that have been requested
+ ranges: Vec<Range<u64>>,
+ /// Future that will resolve (input, requested_ranges)
+ ///
+ /// Note the future owns the reader while the request it outstanding
+ /// and returns it upon completion
+ future: BoxFuture<'static, Result<(T, Vec<Bytes>)>>,
+ },
+ Done,
}
-impl<T> ReaderFactory<T>
+impl<T> RequestState<T>
where
- T: AsyncFileReader + Send,
+ T: AsyncFileReader + Unpin + Send + 'static,
{
- /// Reads the next row group with the provided `selection`, `projection`
and `batch_size`
- ///
- /// Updates the `limit` and `offset` of the reader factory
- ///
- /// Note: this captures self so that the resulting future has a static
lifetime
- async fn read_row_group(
- mut self,
- row_group_idx: usize,
- selection: Option<RowSelection>,
- projection: ProjectionMask,
- batch_size: usize,
- ) -> ReadResult<T> {
- // TODO: calling build_array multiple times is wasteful
-
- let meta = self.metadata.row_group(row_group_idx);
- let offset_index = self
- .metadata
- .offset_index()
- // filter out empty offset indexes (old versions specified
Some(vec![]) when no present)
- .filter(|index| !index.is_empty())
- .map(|x| x[row_group_idx].as_slice());
-
- // Reuse columns that are selected and used by the filters
- let cache_projection = match
self.compute_cache_projection(&projection) {
- Some(projection) => projection,
- None => ProjectionMask::none(meta.columns().len()),
- };
- let row_group_cache = Arc::new(Mutex::new(RowGroupCache::new(
- batch_size,
- self.max_predicate_cache_size,
- )));
-
- let mut row_group = InMemoryRowGroup {
- // schema: meta.schema_descr_ptr(),
- row_count: meta.num_rows() as usize,
- column_chunks: vec![None; meta.columns().len()],
- offset_index,
- row_group_idx,
- metadata: self.metadata.as_ref(),
- };
-
- let cache_options_builder =
CacheOptionsBuilder::new(&cache_projection, &row_group_cache);
-
- let filter = self.filter.as_mut();
- let mut plan_builder =
ReadPlanBuilder::new(batch_size).with_selection(selection);
-
- // Update selection based on any filters
- if let Some(filter) = filter {
- let cache_options = cache_options_builder.clone().producer();
-
- for predicate in filter.predicates.iter_mut() {
- if !plan_builder.selects_any() {
- return Ok((self, None)); // ruled out entire row group
- }
-
- // (pre) Fetch only the columns that are selected by the
predicate
- let selection = plan_builder.selection();
- // Fetch predicate columns; expand selection only for cached
predicate columns
- let cache_mask = Some(&cache_projection);
- row_group
- .fetch(
- &mut self.input,
- predicate.projection(),
- selection,
- batch_size,
- cache_mask,
- )
- .await?;
-
- let array_reader = ArrayReaderBuilder::new(&row_group,
&self.metrics)
- .with_cache_options(Some(&cache_options))
- .build_array_reader(self.fields.as_deref(),
predicate.projection())?;
-
- plan_builder = plan_builder.with_predicate(array_reader,
predicate.as_mut())?;
- }
- }
-
- // Compute the number of rows in the selection before applying limit
and offset
- let rows_before = plan_builder
- .num_rows_selected()
- .unwrap_or(row_group.row_count);
-
- if rows_before == 0 {
- return Ok((self, None)); // ruled out entire row group
- }
-
- // Apply any limit and offset
- let plan_builder = plan_builder
- .limited(row_group.row_count)
- .with_offset(self.offset)
- .with_limit(self.limit)
- .build_limited();
-
- let rows_after = plan_builder
- .num_rows_selected()
- .unwrap_or(row_group.row_count);
-
- // Update running offset and limit for after the current row group is
read
- if let Some(offset) = &mut self.offset {
- // Reduction is either because of offset or limit, as limit is
applied
- // after offset has been "exhausted" can just use saturating sub
here
- *offset = offset.saturating_sub(rows_before - rows_after)
- }
-
- if rows_after == 0 {
- return Ok((self, None)); // ruled out entire row group
- }
-
- if let Some(limit) = &mut self.limit {
- *limit -= rows_after;
+ // Issue a request to fetch a single range, returining the Outstanding
state
Review Comment:
```suggestion
// Issue a request to fetch a single range, returning the Outstanding state
```
##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -858,45 +653,33 @@ where
/// - `Ok(Some(reader))` which holds all the data for the row group.
pub async fn next_row_group(&mut self) -> Result<Option<ParquetRecordBatchReader>> {
loop {
- match &mut self.state {
- StreamState::Decoding(_) | StreamState::Reading(_) => {
- return Err(ParquetError::General(
- "Cannot combine the use of next_row_group with the
Stream API".to_string(),
- ));
- }
- StreamState::Init => {
- let row_group_idx = match self.row_groups.pop_front() {
- Some(idx) => idx,
- None => return Ok(None),
- };
-
- let row_count =
self.metadata.row_group(row_group_idx).num_rows() as usize;
-
- let selection = self.selection.as_mut().map(|s|
s.split_off(row_count));
-
- let reader_factory =
self.reader_factory.take().expect("lost reader factory");
-
- let (reader_factory, maybe_reader) = reader_factory
- .read_row_group(
- row_group_idx,
- selection,
- self.projection.clone(),
- self.batch_size,
- )
- .await
- .inspect_err(|_| {
- self.state = StreamState::Error;
- })?;
- self.reader_factory = Some(reader_factory);
-
- if let Some(reader) = maybe_reader {
- return Ok(Some(reader));
- } else {
- // All rows skipped, read next row group
- continue;
+ let request_state = std::mem::replace(&mut self.request_state, RequestState::Done);
Review Comment:
Why was this ownership trick needed? Perhaps you could add a comment in the code?
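My guess is that it's the usual move-out-of-`&mut self` pattern: the `Outstanding` future needs to own the reader, so the old state has to be taken out by value and something left in its place. A minimal illustration with made-up types (not the PR's actual ones):
```rust
// Made-up sketch: a field cannot be moved out of a struct through `&mut self`,
// so a placeholder is swapped in before the old value is consumed by value.
enum RequestStateSketch<T> {
    None { input: T },
    Done,
}

struct StreamSketch<T> {
    request_state: RequestStateSketch<T>,
}

impl<T> StreamSketch<T> {
    fn take_input(&mut self) -> Option<T> {
        // Matching on `&mut self.request_state` would only yield `&mut T`;
        // to get ownership of `input`, leave `Done` behind and consume the rest.
        match std::mem::replace(&mut self.request_state, RequestStateSketch::Done) {
            RequestStateSketch::None { input } => Some(input),
            RequestStateSketch::Done => None,
        }
    }
}
```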
##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -483,300 +476,114 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
///
/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
- let num_row_groups = self.metadata.row_groups().len();
-
- let row_groups = match self.row_groups {
- Some(row_groups) => {
- if let Some(col) = row_groups.iter().find(|x| **x >=
num_row_groups) {
- return Err(general_err!(
- "row group {} out of bounds 0..{}",
- col,
- num_row_groups
- ));
- }
- row_groups.into()
- }
- None => (0..self.metadata.row_groups().len()).collect(),
- };
-
- // Try to avoid allocate large buffer
- let batch_size = self
- .batch_size
- .min(self.metadata.file_metadata().num_rows() as usize);
- let reader_factory = ReaderFactory {
- input: self.input.0,
- filter: self.filter,
- metadata: self.metadata.clone(),
- fields: self.fields,
- limit: self.limit,
- offset: self.offset,
- metrics: self.metrics,
- max_predicate_cache_size: self.max_predicate_cache_size,
- };
+ let Self {
+ input,
+ metadata,
+ schema,
+ fields,
+ batch_size,
+ row_groups,
+ projection,
+ filter,
+ selection,
+ limit,
+ offset,
+ metrics,
+ max_predicate_cache_size,
+ } = self;
// Ensure schema of ParquetRecordBatchStream respects projection, and
does
// not store metadata (same as for ParquetRecordBatchReader and
emitted RecordBatches)
- let projected_fields = match reader_factory.fields.as_deref().map(|pf|
&pf.arrow_type) {
- Some(DataType::Struct(fields)) => {
- fields.filter_leaves(|idx, _|
self.projection.leaf_included(idx))
- }
- None => Fields::empty(),
- _ => unreachable!("Must be Struct for root type"),
- };
- let schema = Arc::new(Schema::new(projected_fields));
-
- Ok(ParquetRecordBatchStream {
- metadata: self.metadata,
+ let projected_fields = schema
+ .fields
+ .filter_leaves(|idx, _| projection.leaf_included(idx));
+ let projected_schema = Arc::new(Schema::new(projected_fields));
+
+ let decoder = ParquetPushDecoderBuilder {
+ // Async reader doesn't know the overall size of the input, but it
+ // is not required for decoding as we will already have the
metadata
+ input: 0,
+ metadata,
+ schema,
+ fields,
+ projection,
+ filter,
+ selection,
batch_size,
row_groups,
- projection: self.projection,
- selection: self.selection,
- schema,
- reader_factory: Some(reader_factory),
- state: StreamState::Init,
+ limit,
+ offset,
+ metrics,
+ max_predicate_cache_size,
+ }
+ .build()?;
+
+ let request_state = RequestState::None { input: input.0 };
+
+ Ok(ParquetRecordBatchStream {
+ schema: projected_schema,
+ decoder,
+ request_state,
})
}
}
-/// Returns a [`ReaderFactory`] and an optional [`ParquetRecordBatchReader`]
for the next row group
+/// State machine that tracks outstanding requests to fetch data
///
-/// Note: If all rows are filtered out in the row group (e.g by filters, limit
or
-/// offset), returns `None` for the reader.
-type ReadResult<T> = Result<(ReaderFactory<T>,
Option<ParquetRecordBatchReader>)>;
-
-/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
-/// [`ParquetRecordBatchReader`]
-struct ReaderFactory<T> {
- metadata: Arc<ParquetMetaData>,
-
- /// Top level parquet schema
- fields: Option<Arc<ParquetField>>,
-
- input: T,
-
- /// Optional filter
- filter: Option<RowFilter>,
-
- /// Limit to apply to remaining row groups.
- limit: Option<usize>,
-
- /// Offset to apply to the next
- offset: Option<usize>,
-
- /// Metrics
- metrics: ArrowReaderMetrics,
-
- /// Maximum size of the predicate cache
- ///
- /// See [`RowGroupCache`] for details.
- max_predicate_cache_size: usize,
+/// The parameter `T` is the input, typically an `AsyncFileReader`
+enum RequestState<T> {
+ /// No outstanding requests
+ None {
+ input: T,
+ },
+ /// There is an outstanding request for data
+ Outstanding {
+ /// Ranges that have been requested
+ ranges: Vec<Range<u64>>,
+ /// Future that will resolve (input, requested_ranges)
+ ///
+ /// Note the future owns the reader while the request it outstanding
+ /// and returns it upon completion
+ future: BoxFuture<'static, Result<(T, Vec<Bytes>)>>,
+ },
+ Done,
}
-impl<T> ReaderFactory<T>
+impl<T> RequestState<T>
where
- T: AsyncFileReader + Send,
+ T: AsyncFileReader + Unpin + Send + 'static,
{
- /// Reads the next row group with the provided `selection`, `projection`
and `batch_size`
- ///
- /// Updates the `limit` and `offset` of the reader factory
- ///
- /// Note: this captures self so that the resulting future has a static
lifetime
- async fn read_row_group(
- mut self,
- row_group_idx: usize,
- selection: Option<RowSelection>,
- projection: ProjectionMask,
- batch_size: usize,
- ) -> ReadResult<T> {
- // TODO: calling build_array multiple times is wasteful
-
- let meta = self.metadata.row_group(row_group_idx);
- let offset_index = self
- .metadata
- .offset_index()
- // filter out empty offset indexes (old versions specified
Some(vec![]) when no present)
- .filter(|index| !index.is_empty())
- .map(|x| x[row_group_idx].as_slice());
-
- // Reuse columns that are selected and used by the filters
- let cache_projection = match
self.compute_cache_projection(&projection) {
- Some(projection) => projection,
- None => ProjectionMask::none(meta.columns().len()),
- };
- let row_group_cache = Arc::new(Mutex::new(RowGroupCache::new(
- batch_size,
- self.max_predicate_cache_size,
- )));
-
- let mut row_group = InMemoryRowGroup {
- // schema: meta.schema_descr_ptr(),
- row_count: meta.num_rows() as usize,
- column_chunks: vec![None; meta.columns().len()],
- offset_index,
- row_group_idx,
- metadata: self.metadata.as_ref(),
- };
-
- let cache_options_builder =
CacheOptionsBuilder::new(&cache_projection, &row_group_cache);
-
- let filter = self.filter.as_mut();
- let mut plan_builder =
ReadPlanBuilder::new(batch_size).with_selection(selection);
-
- // Update selection based on any filters
- if let Some(filter) = filter {
- let cache_options = cache_options_builder.clone().producer();
-
- for predicate in filter.predicates.iter_mut() {
- if !plan_builder.selects_any() {
- return Ok((self, None)); // ruled out entire row group
- }
-
- // (pre) Fetch only the columns that are selected by the
predicate
- let selection = plan_builder.selection();
- // Fetch predicate columns; expand selection only for cached
predicate columns
- let cache_mask = Some(&cache_projection);
- row_group
- .fetch(
- &mut self.input,
- predicate.projection(),
- selection,
- batch_size,
- cache_mask,
- )
- .await?;
-
- let array_reader = ArrayReaderBuilder::new(&row_group,
&self.metrics)
- .with_cache_options(Some(&cache_options))
- .build_array_reader(self.fields.as_deref(),
predicate.projection())?;
-
- plan_builder = plan_builder.with_predicate(array_reader,
predicate.as_mut())?;
- }
- }
-
- // Compute the number of rows in the selection before applying limit
and offset
- let rows_before = plan_builder
- .num_rows_selected()
- .unwrap_or(row_group.row_count);
-
- if rows_before == 0 {
- return Ok((self, None)); // ruled out entire row group
- }
-
- // Apply any limit and offset
- let plan_builder = plan_builder
- .limited(row_group.row_count)
- .with_offset(self.offset)
- .with_limit(self.limit)
- .build_limited();
-
- let rows_after = plan_builder
- .num_rows_selected()
- .unwrap_or(row_group.row_count);
-
- // Update running offset and limit for after the current row group is
read
- if let Some(offset) = &mut self.offset {
- // Reduction is either because of offset or limit, as limit is
applied
- // after offset has been "exhausted" can just use saturating sub
here
- *offset = offset.saturating_sub(rows_before - rows_after)
- }
-
- if rows_after == 0 {
- return Ok((self, None)); // ruled out entire row group
- }
-
- if let Some(limit) = &mut self.limit {
- *limit -= rows_after;
+ // Issue a request to fetch a single range, returining the Outstanding
state
Review Comment:
If it's to fetch a single range, why does it take `Vec<Range<u64>>` as a parameter?
I suppose the comment is wrong, not the parameter.
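If it is indeed the comment that's wrong, maybe something along these lines (just an option):
```suggestion
// Issue a request to fetch the given ranges, returning the Outstanding state
```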
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]