This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 5676c6e37 Integrate Record Skipping into Column Reader Fuzz Test (#2315)
5676c6e37 is described below
commit 5676c6e378ab638efffc1789116d1e535ea88d36
Author: Yang Jiang <[email protected]>
AuthorDate: Sun Aug 7 23:51:24 2022 +0800
Integrate Record Skipping into Column Reader Fuzz Test (#2315)
* fix conflict
* add log info
* Integrate Record Skipping into Column Reader Fuzz Test
---
parquet/src/arrow/arrow_reader.rs | 193 ++++++++++++++++++++++++++++----------
1 file changed, 142 insertions(+), 51 deletions(-)
diff --git a/parquet/src/arrow/arrow_reader.rs
b/parquet/src/arrow/arrow_reader.rs
index 67bd2a619..1f53c2abc 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -391,7 +391,7 @@ mod tests {
use std::path::PathBuf;
use std::sync::Arc;
- use rand::{thread_rng, RngCore};
+ use rand::{thread_rng, Rng, RngCore};
use tempfile::tempfile;
use arrow::array::*;
@@ -889,6 +889,8 @@ mod tests {
enabled_statistics: EnabledStatistics,
/// Encoding
encoding: Encoding,
+ //row selections and total selected row count
+ row_selections: Option<(Vec<RowSelection>, usize)>,
}
impl Default for TestOptions {
@@ -904,6 +906,7 @@ mod tests {
writer_version: WriterVersion::PARQUET_1_0,
enabled_statistics: EnabledStatistics::Page,
encoding: Encoding::PLAIN,
+ row_selections: None,
}
}
}
@@ -946,6 +949,20 @@ mod tests {
}
}
+        /// Attaches a randomly generated row selection to these options.
+        ///
+        /// The selection covers all `num_row_groups * num_rows` rows as alternating
+        /// skip/select runs; whether the first run is skipped is chosen at random.
+        /// The run length is drawn from `record_batch_size..num_rows`, i.e. at least
+        /// one record batch and less than the rows of a single row group.
+        ///
+        /// When that range is empty (batch size not smaller than `num_rows`), the
+        /// lower bound is used directly instead of letting `gen_range` panic. The
+        /// lower bound is clamped to 1 so the run length is never zero, which
+        /// would otherwise make `create_test_selection` loop forever.
+        fn with_row_selections(self) -> Self {
+            let mut rng = thread_rng();
+            let min_step = self.record_batch_size.max(1);
+            let step = if min_step < self.num_rows {
+                rng.gen_range(min_step..self.num_rows)
+            } else {
+                min_step
+            };
+            let row_selections = create_test_selection(
+                step,
+                self.num_row_groups * self.num_rows,
+                rng.gen::<bool>(),
+            );
+            Self {
+                row_selections: Some(row_selections),
+                ..self
+            }
+        }
+
fn writer_props(&self) -> WriterProperties {
let builder = WriterProperties::builder()
.set_data_pagesize_limit(self.max_data_page_size)
@@ -984,7 +1001,7 @@ mod tests {
A: Array + 'static,
C: Converter<Vec<Option<T::T>>, A> + 'static,
{
- let all_options = vec![
+ let mut all_options = vec![
// choose record_batch_batch (15) so batches cross row
// group boundaries (50 rows in 2 row groups) cases.
TestOptions::new(2, 100, 15),
@@ -1015,6 +1032,39 @@ mod tests {
.with_enabled_statistics(EnabledStatistics::None),
];
+ let skip_options = vec![
+ // choose record_batch_batch (15) so batches cross row
+ // group boundaries (50 rows in 2 row groups) cases.
+ TestOptions::new(2, 100, 15).with_row_selections(),
+ // choose record_batch_batch (5) so batches sometime fall
+ // on row group boundaries and (25 rows in 3 row groups
+ // --> row groups of 10, 10, and 5). Tests buffer
+ // refilling edge cases.
+ TestOptions::new(3, 25, 5).with_row_selections(),
+ // Choose record_batch_size (25) so all batches fall
+ // exactly on row group boundary (25). Tests buffer
+ // refilling edge cases.
+ TestOptions::new(4, 100, 25).with_row_selections(),
+ // Set maximum page size so row groups have multiple pages
+ TestOptions::new(3, 256, 73)
+ .with_max_data_page_size(128)
+ .with_row_selections(),
+ // Set small dictionary page size to test dictionary fallback
+ TestOptions::new(3, 256, 57)
+ .with_max_dict_page_size(128)
+ .with_row_selections(),
+ // Test optional but with no nulls
+ TestOptions::new(2, 256, 127)
+ .with_null_percent(0)
+ .with_row_selections(),
+ // Test optional with nulls
+ TestOptions::new(2, 256, 93)
+ .with_null_percent(25)
+ .with_row_selections(),
+ ];
+
+ all_options.extend(skip_options);
+
all_options.into_iter().for_each(|opts| {
for writer_version in [WriterVersion::PARQUET_1_0,
WriterVersion::PARQUET_2_0]
{
@@ -1022,7 +1072,7 @@ mod tests {
let opts = TestOptions {
writer_version,
encoding: *encoding,
- ..opts
+ ..opts.clone()
};
single_column_reader_test::<T, A, C, G>(
@@ -1054,10 +1104,11 @@ mod tests {
{
// Print out options to facilitate debugging failures on CI
println!(
- "Running single_column_reader_test
ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
- converted_type, arrow_type, opts
+ "Running type {:?} single_column_reader_test
ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
+ T::get_physical_type(), converted_type, arrow_type, opts
);
+ //according to null_percent generate def_levels
let (repetition, def_levels) = match opts.null_percent.as_ref() {
Some(null_percent) => {
let mut rng = thread_rng();
@@ -1076,6 +1127,7 @@ mod tests {
None => (Repetition::REQUIRED, None),
};
+ //generate random table data
let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
.map(|idx| {
let null_count = match def_levels.as_ref() {
@@ -1126,36 +1178,49 @@ mod tests {
file.rewind().unwrap();
- let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
+ let mut arrow_reader;
+ let expected_data: Vec<Option<T::T>>;
+ if let Some((selections, row_count)) = opts.row_selections.clone() {
+ let options =
+
ArrowReaderOptions::new().with_row_selection(selections.clone());
+ arrow_reader =
+ ParquetFileArrowReader::try_new_with_options(file,
options).unwrap();
+ let mut without_skip_data = gen_expected_data::<T>(&def_levels,
&values);
+
+ let mut skip_data: Vec<Option<T::T>> = vec![];
+ for select in selections {
+ if select.skip {
+ without_skip_data.drain(0..select.row_count);
+ } else {
+
skip_data.extend(without_skip_data.drain(0..select.row_count));
+ }
+ }
+ expected_data = skip_data;
+ assert_eq!(expected_data.len(), row_count);
+ } else {
+ arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
+ //get flatten table data
+ expected_data = gen_expected_data::<T>(&def_levels, &values);
+ assert_eq!(expected_data.len(), opts.num_rows *
opts.num_row_groups);
+ }
+
let mut record_reader = arrow_reader
.get_record_reader(opts.record_batch_size)
.unwrap();
- let expected_data: Vec<Option<T::T>> = match def_levels {
- Some(levels) => {
- let mut values_iter = values.iter().flatten();
- levels
- .iter()
- .flatten()
- .map(|d| match d {
- 1 => Some(values_iter.next().cloned().unwrap()),
- 0 => None,
- _ => unreachable!(),
- })
- .collect()
- }
- None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
- };
-
- assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
-
let mut total_read = 0;
loop {
let maybe_batch = record_reader.next();
if total_read < expected_data.len() {
- let end = min(total_read + opts.record_batch_size,
expected_data.len());
+ let mut end =
+ min(total_read + opts.record_batch_size,
expected_data.len());
let batch = maybe_batch.unwrap().unwrap();
- assert_eq!(end - total_read, batch.num_rows());
+ //TODO remove this after implement
https://github.com/apache/arrow-rs/issues/2197
+ if opts.row_selections.is_none() {
+ assert_eq!(end - total_read, batch.num_rows());
+ } else {
+ end = end.min(total_read + batch.num_rows())
+ }
let mut data = vec![];
data.extend_from_slice(&expected_data[total_read..end]);
@@ -1181,6 +1246,28 @@ mod tests {
}
}
+    /// Flattens the per-row-group test data into the values a reader is
+    /// expected to return, one `Option` per row.
+    ///
+    /// With `def_levels`, a level of `1` takes the next value from `values`
+    /// and a level of `0` yields `None`. Without `def_levels`, every value is
+    /// present (`Some`).
+    ///
+    /// # Panics
+    ///
+    /// If a definition level is neither `0` nor `1`, or if there are more
+    /// `1` levels than values.
+    fn gen_expected_data<T: DataType>(
+        def_levels: &Option<Vec<Vec<i16>>>,
+        values: &[Vec<T::T>],
+    ) -> Vec<Option<T::T>> {
+        match def_levels {
+            Some(levels) => {
+                let mut values_iter = values.iter().flatten();
+                levels
+                    .iter()
+                    .flatten()
+                    .map(|&level| match level {
+                        1 => Some(
+                            values_iter
+                                .next()
+                                .cloned()
+                                .expect("more non-null definition levels than values"),
+                        ),
+                        0 => None,
+                        other => unreachable!("unexpected definition level {}", other),
+                    })
+                    .collect()
+            }
+            None => values.iter().flatten().cloned().map(Some).collect(),
+        }
+    }
+
fn generate_single_column_file_with_data<T: DataType>(
values: &[Vec<T::T>],
def_levels: Option<&Vec<Vec<i16>>>,
@@ -1746,6 +1833,34 @@ mod tests {
expected_batches
}
+    /// Builds a row selection over `total_len` rows from alternating
+    /// skip/select runs of `step_len` rows each. The final run may be shorter.
+    ///
+    /// `skip_first` decides whether the first run is skipped. Returns the
+    /// selection together with the total number of rows it selects.
+    ///
+    /// # Panics
+    ///
+    /// If `step_len` is zero while `total_len` is non-zero; the run loop
+    /// would otherwise never make progress.
+    fn create_test_selection(
+        step_len: usize,
+        total_len: usize,
+        skip_first: bool,
+    ) -> (Vec<RowSelection>, usize) {
+        assert!(
+            step_len > 0 || total_len == 0,
+            "step_len must be non-zero to cover {} rows",
+            total_len
+        );
+        let mut selections = vec![];
+        let mut selected_count = 0;
+        let mut remaining = total_len;
+        let mut skip = skip_first;
+        while remaining > 0 {
+            let row_count = remaining.min(step_len);
+            selections.push(RowSelection { row_count, skip });
+            if !skip {
+                selected_count += row_count;
+            }
+            remaining -= row_count;
+            skip = !skip;
+        }
+        (selections, selected_count)
+    }
+
#[test]
fn test_scan_row_with_selection() {
let testdata = arrow::util::test_util::parquet_test_data();
@@ -1760,7 +1875,7 @@ mod tests {
let do_test = |batch_size: usize, selection_len: usize| {
for skip_first in [false, true] {
let selections =
- create_test_selection(batch_size, data.num_rows(),
skip_first);
+ create_test_selection(batch_size, data.num_rows(),
skip_first).0;
let expected = get_expected_batches(&data, &selections,
batch_size);
let skip_reader = create_skip_reader(&test_file, batch_size,
selections);
@@ -1804,29 +1919,5 @@ mod tests {
.unwrap();
skip_arrow_reader.get_record_reader(batch_size).unwrap()
}
-
- fn create_test_selection(
- step_len: usize,
- total_len: usize,
- skip_first: bool,
- ) -> Vec<RowSelection> {
- let mut remaining = total_len;
- let mut skip = skip_first;
- let mut vec = vec![];
- while remaining != 0 {
- let step = if remaining > step_len {
- step_len
- } else {
- remaining
- };
- vec.push(RowSelection {
- row_count: step,
- skip,
- });
- remaining -= step;
- skip = !skip;
- }
- vec
- }
}
}