alamb commented on code in PR #8159:
URL: https://github.com/apache/arrow-rs/pull/8159#discussion_r2478358139
##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -858,45 +653,33 @@ where
/// - `Ok(Some(reader))` which holds all the data for the row group.
pub async fn next_row_group(&mut self) ->
Result<Option<ParquetRecordBatchReader>> {
loop {
- match &mut self.state {
- StreamState::Decoding(_) | StreamState::Reading(_) => {
- return Err(ParquetError::General(
- "Cannot combine the use of next_row_group with the
Stream API".to_string(),
- ));
- }
- StreamState::Init => {
- let row_group_idx = match self.row_groups.pop_front() {
- Some(idx) => idx,
- None => return Ok(None),
- };
-
- let row_count =
self.metadata.row_group(row_group_idx).num_rows() as usize;
-
- let selection = self.selection.as_mut().map(|s|
s.split_off(row_count));
-
- let reader_factory =
self.reader_factory.take().expect("lost reader factory");
-
- let (reader_factory, maybe_reader) = reader_factory
- .read_row_group(
- row_group_idx,
- selection,
- self.projection.clone(),
- self.batch_size,
- )
- .await
- .inspect_err(|_| {
- self.state = StreamState::Error;
- })?;
- self.reader_factory = Some(reader_factory);
-
- if let Some(reader) = maybe_reader {
- return Ok(Some(reader));
- } else {
- // All rows skipped, read next row group
- continue;
+ let request_state = std::mem::replace(&mut self.request_state,
RequestState::Done);
+ match request_state {
+ // No outstanding requests, proceed to setup next row group
+ RequestState::None { input } => {
Review Comment:
This is now the core state machine of `ParquetRecordBatchStream`, and I am
pleased it represents what is going on in a straightforward way: it alternates
between decode and I/O
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]