zhuqi-lucas commented on code in PR #7537: URL: https://github.com/apache/arrow-rs/pull/7537#discussion_r2103735833
########## parquet/src/arrow/arrow_reader/mod.rs: ########## @@ -808,54 +808,45 @@ impl ParquetRecordBatchReader { /// Returns `Result<Option<..>>` rather than `Option<Result<..>>` to /// simplify error handling with `?` fn next_inner(&mut self) -> Result<Option<RecordBatch>> { + let mut end_of_stream = false; let mut read_records = 0; let batch_size = self.batch_size(); - match self.read_plan.selection_mut() { - Some(selection) => { - while read_records < batch_size && !selection.is_empty() { - let front = selection.pop_front().unwrap(); - if front.skip { - let skipped = self.array_reader.skip_records(front.row_count)?; - - if skipped != front.row_count { - return Err(general_err!( - "failed to skip rows, expected {}, got {}", - front.row_count, - skipped - )); - } - continue; - } + while read_records < batch_size { + let Some(front) = self.read_plan.next() else { + end_of_stream = true; + break; + }; - //Currently, when RowSelectors with row_count = 0 are included then its interpreted as end of reader. - //Fix is to skip such entries. See https://github.com/apache/arrow-rs/issues/2669 - if front.row_count == 0 { - continue; - } + if front.skip { + let skipped = self.array_reader.skip_records(front.row_count)?; - // try to read record - let need_read = batch_size - read_records; - let to_read = match front.row_count.checked_sub(need_read) { - Some(remaining) if remaining != 0 => { - // if page row count less than batch_size we must set batch size to page row count. - // add check avoid dead loop - selection.push_front(RowSelector::select(remaining)); - need_read - } - _ => front.row_count, - }; - match self.array_reader.read_records(to_read)? { - 0 => break, - rec => read_records += rec, - }; + if skipped != front.row_count { + return Err(general_err!( + "Internal Error: failed to skip rows, expected {}, got {}", + front.row_count, + skipped + )); + } + } else { + let read = self.array_reader.read_records(front.row_count)?; Review Comment: Minor, do we have fast path when read < front.row_count? ########## parquet/src/arrow/arrow_reader/read_plan.rs: ########## @@ -131,13 +129,101 @@ impl ReadPlanBuilder { selection, } = self; - let selection = selection.map(|s| s.trim().into()); + // If the batch size is 0, read "all rows" + if batch_size == 0 { + return ReadPlan::All { batch_size: 0 }; + } + + // If no selection is provided, read all rows + let Some(selection) = selection else { + return ReadPlan::All { batch_size }; + }; + + let iterator = SelectionIterator::new(batch_size, selection.into()); + ReadPlan::Subset { iterator } + } +} + +/// Incrementally returns [`RowSelector`]s that describe reading from a Parquet file. +/// +/// The returned stream of [`RowSelector`]s is guaranteed to have: +/// 1. No empty selections (that select no rows) +/// 2. No selections that span batch_size boundaries +/// 3. No trailing skip selections +/// +/// For example, if the `batch_size` is 100 and we are selecting all 200 rows +/// from a Parquet file, the selectors will be: +/// - `RowSelector::select(100) <-- forced break at batch_size boundary` +/// - `RowSelector::select(100)` Review Comment: Great work ! ########## parquet/src/arrow/arrow_reader/mod.rs: ########## @@ -808,54 +809,45 @@ impl ParquetRecordBatchReader { /// Returns `Result<Option<..>>` rather than `Option<Result<..>>` to /// simplify error handling with `?` fn next_inner(&mut self) -> Result<Option<RecordBatch>> { + let mut end_of_stream = false; let mut read_records = 0; let batch_size = self.batch_size(); - match self.read_plan.selection_mut() { - Some(selection) => { - while read_records < batch_size && !selection.is_empty() { - let front = selection.pop_front().unwrap(); - if front.skip { - let skipped = self.array_reader.skip_records(front.row_count)?; - - if skipped != front.row_count { - return Err(general_err!( - "failed to skip rows, expected {}, got {}", - front.row_count, - skipped - )); - } - continue; - } + while read_records < batch_size { Review Comment: It's clear! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org