Ted-Jiang commented on code in PR #2199:
URL: https://github.com/apache/arrow-rs/pull/2199#discussion_r931734404
##########
parquet/src/arrow/arrow_reader.rs:
##########
@@ -1605,154 +1606,105 @@ mod tests {
test_row_group_batch(MIN_BATCH_SIZE - 1, MIN_BATCH_SIZE);
}
- #[test]
- fn test_scan_row_with_selection() {
- let testdata = arrow::util::test_util::parquet_test_data();
- let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
- let test_file = File::open(&path).unwrap();
+ /// Given a RecordBatch containing all the column data, return the expected batches given
+ /// a `batch_size` and `selection`
+ fn get_expected_batches(
+ column: &RecordBatch,
+ selection: &[RowSelection],
+ batch_size: usize,
+ ) -> Vec<RecordBatch> {
+ let mut expected_batches = vec![];
+
+ let mut selection: VecDeque<_> = selection.iter().cloned().collect();
+ let mut row_offset = 0;
+ let mut last_start = None;
+ while row_offset < column.num_rows() && !selection.is_empty() {
+ let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
+ while batch_remaining > 0 && !selection.is_empty() {
+ let (to_read, skip) = match selection.front_mut() {
+ Some(selection) if selection.row_count > batch_remaining => {
+ selection.row_count -= batch_remaining;
+ (batch_remaining, selection.skip)
+ }
+ Some(_) => {
+ let select = selection.pop_front().unwrap();
+ (select.row_count, select.skip)
+ }
+ None => break,
+ };
- // total row count 7300
- // 1. test selection len more than one page row count
- let batch_size = 1000;
- let expected_data = create_expect_batch(&test_file, batch_size);
-
- let selections = create_test_selection(batch_size, 7300, false);
- let skip_reader = create_skip_reader(&test_file, batch_size, selections);
- let mut total_row_count = 0;
- let mut index = 0;
- for batch in skip_reader {
- let batch = batch.unwrap();
- assert_eq!(batch, expected_data.get(index).unwrap().clone());
- index += 2;
- let num = batch.num_rows();
- assert!(num == batch_size || num == 300);
- total_row_count += num;
- }
- assert_eq!(total_row_count, 4000);
+ batch_remaining -= to_read;
- let selections = create_test_selection(batch_size, 7300, true);
- let skip_reader = create_skip_reader(&test_file, batch_size, selections);
- let mut total_row_count = 0;
- let mut index = 1;
- for batch in skip_reader {
- let batch = batch.unwrap();
- assert_eq!(batch, expected_data.get(index).unwrap().clone());
- index += 2;
- let num = batch.num_rows();
- // the last batch will be 300
- assert!(num == batch_size || num == 300);
- total_row_count += num;
+ match skip {
+ true => {
+ if let Some(last_start) = last_start.take() {
+ expected_batches
+ .push(column.slice(last_start, row_offset - last_start))
+ }
+ row_offset += to_read
+ }
+ false => {
+ last_start.get_or_insert(row_offset);
+ row_offset += to_read
+ }
+ }
+ }
}
- assert_eq!(total_row_count, 3300);
- // 2. test selection len less than one page row count
- let batch_size = 20;
- let expected_data = create_expect_batch(&test_file, batch_size);
- let selections = create_test_selection(batch_size, 7300, false);
-
- let skip_reader = create_skip_reader(&test_file, batch_size, selections);
- let mut total_row_count = 0;
- let mut index = 0;
- for batch in skip_reader {
- let batch = batch.unwrap();
- assert_eq!(batch, expected_data.get(index).unwrap().clone());
- index += 2;
- let num = batch.num_rows();
- assert_eq!(num, batch_size);
- total_row_count += num;
+ if let Some(last_start) = last_start.take() {
+ expected_batches.push(column.slice(last_start, row_offset - last_start))
}
- assert_eq!(total_row_count, 3660);
- let selections = create_test_selection(batch_size, 7300, true);
- let skip_reader = create_skip_reader(&test_file, batch_size, selections);
- let mut total_row_count = 0;
- let mut index = 1;
- for batch in skip_reader {
- let batch = batch.unwrap();
- assert_eq!(batch, expected_data.get(index).unwrap().clone());
- index += 2;
- let num = batch.num_rows();
- assert_eq!(num, batch_size);
- total_row_count += num;
+ // Sanity check, all batches except the final should be the batch size
+ for batch in &expected_batches[..expected_batches.len() - 1] {
+ assert_eq!(batch.num_rows(), batch_size);
}
- assert_eq!(total_row_count, 3640);
- // 3. test selection_len less than batch_size
- let batch_size = 20;
- let selection_len = 5;
- let expected_data_batch = create_expect_batch(&test_file, batch_size);
- let expected_data_selection = create_expect_batch(&test_file, selection_len);
- let selections = create_test_selection(selection_len, 7300, false);
- let skip_reader = create_skip_reader(&test_file, batch_size, selections);
+ expected_batches
+ }
+
+ #[test]
+ fn test_scan_row_with_selection() {
+ let testdata = arrow::util::test_util::parquet_test_data();
+ let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
+ let test_file = File::open(&path).unwrap();
- let mut total_row_count = 0;
+ let mut serial_arrow_reader =
+ ParquetFileArrowReader::try_new(File::open(path).unwrap()).unwrap();
+ let mut serial_reader = serial_arrow_reader.get_record_reader(7300).unwrap();
+ let data = serial_reader.next().unwrap().unwrap();
- for batch in skip_reader {
- let batch = batch.unwrap();
- let num = batch.num_rows();
- assert!(num == batch_size || num == selection_len);
- if num == batch_size {
- assert_eq!(
- batch,
- expected_data_batch
- .get(total_row_count / batch_size)
- .unwrap()
- .clone()
- );
- total_row_count += batch_size;
- } else if num == selection_len {
+ let do_test = |batch_size: usize, selection_len: usize| {
Review Comment:
👍
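
To make the behaviour of the new `get_expected_batches` helper concrete, here is a self-contained sketch of the same walk over plain row counts. `Run` is a hypothetical stand-in for the crate's `RowSelection` (only the `row_count`/`skip` fields the diff touches are modelled), and `expected_batch_lengths` is an illustrative name rather than anything in the PR; the numbers mirror the 7300-row `alltypes_tiny_pages_plain.parquet` cases from the removed test.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for the `RowSelection` runs used in the test:
/// `row_count` consecutive rows that are either skipped or read.
#[derive(Clone, Copy, Debug)]
struct Run {
    row_count: usize,
    skip: bool,
}

/// Length of each batch a selection-aware reader is expected to emit,
/// following the same walk as `get_expected_batches` above, but over
/// plain row counts instead of `RecordBatch` slices.
fn expected_batch_lengths(total_rows: usize, runs: &[Run], batch_size: usize) -> Vec<usize> {
    let mut runs: VecDeque<Run> = runs.iter().copied().collect();
    let mut lengths = vec![];
    let mut row_offset = 0;
    let mut last_start: Option<usize> = None;

    while row_offset < total_rows && !runs.is_empty() {
        let mut batch_remaining = batch_size.min(total_rows - row_offset);
        while batch_remaining > 0 && !runs.is_empty() {
            // Consume at most `batch_remaining` rows from the front run,
            // splitting the run if it spans a batch boundary
            let (to_read, skip) = match runs.front_mut() {
                Some(run) if run.row_count > batch_remaining => {
                    run.row_count -= batch_remaining;
                    (batch_remaining, run.skip)
                }
                Some(_) => {
                    let run = runs.pop_front().unwrap();
                    (run.row_count, run.skip)
                }
                None => break,
            };
            batch_remaining -= to_read;

            if skip {
                // A skip run flushes the contiguous selected range accumulated so far
                if let Some(start) = last_start.take() {
                    lengths.push(row_offset - start);
                }
            } else {
                last_start.get_or_insert(row_offset);
            }
            row_offset += to_read;
        }
    }
    // Flush the trailing selected range, if any
    if let Some(start) = last_start {
        lengths.push(row_offset - start);
    }
    lengths
}

fn main() {
    // 7300 rows split into seven 1000-row runs plus a final 300-row run,
    // alternating read/skip, as in the removed test_scan_row_with_selection
    let select_first: Vec<Run> = (0..8)
        .map(|i| Run {
            row_count: if i == 7 { 300 } else { 1000 },
            skip: i % 2 == 1,
        })
        .collect();
    let skip_first: Vec<Run> = select_first
        .iter()
        .map(|r| Run { skip: !r.skip, ..*r })
        .collect();

    // Reading first selects 4 x 1000 rows (total 4000)
    assert_eq!(expected_batch_lengths(7300, &select_first, 1000), vec![1000; 4]);
    // Skipping first selects 3 x 1000 plus a final 300 (total 3300)
    assert_eq!(
        expected_batch_lengths(7300, &skip_first, 1000),
        vec![1000, 1000, 1000, 300]
    );
}
```

The key point the helper captures is that a skip run flushes whatever contiguous selected range has been accumulated so far, so every expected batch corresponds to a contiguous slice of the source `RecordBatch`; the totals of 4000 and 3300 rows match the assertions in the removed test.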