alamb commented on code in PR #8159:
URL: https://github.com/apache/arrow-rs/pull/8159#discussion_r2481055830
##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -483,300 +476,114 @@ impl<T: AsyncFileReader + Send + 'static>
ParquetRecordBatchStreamBuilder<T> {
///
/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
- let num_row_groups = self.metadata.row_groups().len();
-
- let row_groups = match self.row_groups {
- Some(row_groups) => {
- if let Some(col) = row_groups.iter().find(|x| **x >=
num_row_groups) {
- return Err(general_err!(
- "row group {} out of bounds 0..{}",
- col,
- num_row_groups
- ));
- }
- row_groups.into()
- }
- None => (0..self.metadata.row_groups().len()).collect(),
- };
-
- // Try to avoid allocate large buffer
- let batch_size = self
- .batch_size
- .min(self.metadata.file_metadata().num_rows() as usize);
- let reader_factory = ReaderFactory {
- input: self.input.0,
- filter: self.filter,
- metadata: self.metadata.clone(),
- fields: self.fields,
- limit: self.limit,
- offset: self.offset,
- metrics: self.metrics,
- max_predicate_cache_size: self.max_predicate_cache_size,
- };
+ let Self {
+ input,
+ metadata,
+ schema,
+ fields,
+ batch_size,
+ row_groups,
+ projection,
+ filter,
+ selection,
+ limit,
+ offset,
+ metrics,
+ max_predicate_cache_size,
+ } = self;
// Ensure schema of ParquetRecordBatchStream respects projection, and
does
// not store metadata (same as for ParquetRecordBatchReader and
emitted RecordBatches)
- let projected_fields = match reader_factory.fields.as_deref().map(|pf|
&pf.arrow_type) {
- Some(DataType::Struct(fields)) => {
- fields.filter_leaves(|idx, _|
self.projection.leaf_included(idx))
- }
- None => Fields::empty(),
- _ => unreachable!("Must be Struct for root type"),
- };
- let schema = Arc::new(Schema::new(projected_fields));
-
- Ok(ParquetRecordBatchStream {
- metadata: self.metadata,
+ let projected_fields = schema
+ .fields
+ .filter_leaves(|idx, _| projection.leaf_included(idx));
+ let projected_schema = Arc::new(Schema::new(projected_fields));
+
+ let decoder = ParquetPushDecoderBuilder {
+ // Async reader doesn't know the overall size of the input, but it
+ // is not required for decoding as we will already have the
metadata
+ input: 0,
+ metadata,
+ schema,
+ fields,
+ projection,
+ filter,
+ selection,
batch_size,
row_groups,
- projection: self.projection,
- selection: self.selection,
- schema,
- reader_factory: Some(reader_factory),
- state: StreamState::Init,
+ limit,
+ offset,
+ metrics,
+ max_predicate_cache_size,
+ }
+ .build()?;
+
+ let request_state = RequestState::None { input: input.0 };
+
+ Ok(ParquetRecordBatchStream {
Review Comment:
You can see the Stream is much simpler now -- only the decoder and an object
to track the current I/O state
##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -483,300 +476,114 @@ impl<T: AsyncFileReader + Send + 'static>
ParquetRecordBatchStreamBuilder<T> {
///
/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
- let num_row_groups = self.metadata.row_groups().len();
Review Comment:
The whole point of this PR is to remove all this code (and instead use the
copy in the push decoder)
##########
parquet/src/arrow/push_decoder/mod.rs:
##########
@@ -332,16 +379,106 @@ enum ParquetDecoderState {
}
impl ParquetDecoderState {
+ /// If actively reading a RowGroup, return the currently active
+ /// ParquetRecordBatchReader and advance to the next group.
+ fn try_next_reader(
Review Comment:
This is a newly added "batched" API that makes it possible to obtain the next
reader (one that is ready to produce record batches).
##########
parquet/src/arrow/push_decoder/mod.rs:
##########
@@ -332,16 +379,106 @@ enum ParquetDecoderState {
}
impl ParquetDecoderState {
+ /// If actively reading a RowGroup, return the currently active
+ /// ParquetRecordBatchReader and advance to the next group.
+ fn try_next_reader(
+ self,
+ ) -> Result<(Self, DecodeResult<ParquetRecordBatchReader>), ParquetError> {
+ let mut current_state = self;
+ loop {
+ let (next_state, decode_result) = current_state.transition()?;
+ // if more data is needed to transition, can't proceed further
without it
+ match decode_result {
+ DecodeResult::NeedsData(ranges) => {
+ return Ok((next_state, DecodeResult::NeedsData(ranges)));
+ }
+ // act next based on state
+ DecodeResult::Data(()) | DecodeResult::Finished => {}
+ }
+ match next_state {
+ // not ready to read yet, continue transitioning
+ Self::ReadingRowGroup { .. } => current_state = next_state,
+ // have a reader ready, so return it and set ourself to
ReadingRowGroup
+ Self::DecodingRowGroup {
+ record_batch_reader,
+ remaining_row_groups,
+ } => {
+ let result = DecodeResult::Data(*record_batch_reader);
+ let next_state = Self::ReadingRowGroup {
+ remaining_row_groups,
+ };
+ return Ok((next_state, result));
+ }
+ Self::Finished => {
+ return Ok((Self::Finished, DecodeResult::Finished));
+ }
+ }
+ }
+ }
+
/// Current state --> next state + output
///
- /// This function is called to check if the decoder has any RecordBatches
- /// and [`Self::push_data`] is called when new data is available.
- ///
- /// # Notes
+ /// This function is called to get the next RecordBatch
///
/// This structure is used to reduce the indentation level of the main loop
/// in try_build
- fn try_transition(self) -> Result<(Self, DecodeResult<RecordBatch>),
ParquetError> {
+ fn try_next_batch(self) -> Result<(Self, DecodeResult<RecordBatch>),
ParquetError> {
+ let mut current_state = self;
+ loop {
+ let (new_state, decode_result) = current_state.transition()?;
+ // if more data is needed to transition, can't proceed further
without it
+ match decode_result {
+ DecodeResult::NeedsData(ranges) => {
+ return Ok((new_state, DecodeResult::NeedsData(ranges)));
+ }
+ // act next based on state
+ DecodeResult::Data(()) | DecodeResult::Finished => {}
+ }
+ match new_state {
+ // not ready to read yet, continue transitioning
+ Self::ReadingRowGroup { .. } => current_state = new_state,
+ // have a reader ready, so decode the next batch
+ Self::DecodingRowGroup {
+ mut record_batch_reader,
+ remaining_row_groups,
+ } => {
+ match record_batch_reader.next() {
+ // Successfully decoded a batch, return it
+ Some(Ok(batch)) => {
+ let result = DecodeResult::Data(batch);
+ let next_state = Self::DecodingRowGroup {
+ record_batch_reader,
+ remaining_row_groups,
+ };
+ return Ok((next_state, result));
+ }
+ // No more batches in this row group, move to the next
row group
+ None => {
+ current_state = Self::ReadingRowGroup {
+ remaining_row_groups,
+ }
+ }
+ // some error occurred while decoding, so return that
+ Some(Err(e)) => {
+ // TODO: preserve ArrowError in ParquetError
(rather than convert to a string)
+ return
Err(ParquetError::ArrowError(e.to_string()));
+ }
+ }
+ }
+ Self::Finished => {
+ return Ok((Self::Finished, DecodeResult::Finished));
+ }
+ }
+ }
+ }
+
+ /// Transition to the next state with a reader (data can be produced), if
not end of stream
+ ///
+ /// This function is called in a loop until the decoder is ready to return
+ /// data (has the required pages buffered) or is finished.
+ fn transition(self) -> Result<(Self, DecodeResult<()>), ParquetError> {
Review Comment:
Reworked so it can be shared between `try_next_batch` and `try_next_reader`.
It also now avoids a self-recursive call, which I think is a (minor)
improvement.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]