This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 5676c6e37 Integrate Record Skipping into Column Reader Fuzz Test 
(#2315)
5676c6e37 is described below

commit 5676c6e378ab638efffc1789116d1e535ea88d36
Author: Yang Jiang <[email protected]>
AuthorDate: Sun Aug 7 23:51:24 2022 +0800

    Integrate Record Skipping into Column Reader Fuzz Test (#2315)
    
    * fix conflict
    
    * add log info
    
    * Integrate Record Skipping into Column Reader Fuzz Test
---
 parquet/src/arrow/arrow_reader.rs | 193 ++++++++++++++++++++++++++++----------
 1 file changed, 142 insertions(+), 51 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader.rs 
b/parquet/src/arrow/arrow_reader.rs
index 67bd2a619..1f53c2abc 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -391,7 +391,7 @@ mod tests {
     use std::path::PathBuf;
     use std::sync::Arc;
 
-    use rand::{thread_rng, RngCore};
+    use rand::{thread_rng, Rng, RngCore};
     use tempfile::tempfile;
 
     use arrow::array::*;
@@ -889,6 +889,8 @@ mod tests {
         enabled_statistics: EnabledStatistics,
         /// Encoding
         encoding: Encoding,
+        /// Row selections and the total number of selected rows
+        row_selections: Option<(Vec<RowSelection>, usize)>,
     }
 
     impl Default for TestOptions {
@@ -904,6 +906,7 @@ mod tests {
                 writer_version: WriterVersion::PARQUET_1_0,
                 enabled_statistics: EnabledStatistics::Page,
                 encoding: Encoding::PLAIN,
+                row_selections: None,
             }
         }
     }
@@ -946,6 +949,20 @@ mod tests {
             }
         }
 
+        fn with_row_selections(self) -> Self {
+            let mut rng = thread_rng();
+            let step = rng.gen_range(self.record_batch_size..self.num_rows);
+            let row_selections = create_test_selection(
+                step,
+                self.num_row_groups * self.num_rows,
+                rng.gen::<bool>(),
+            );
+            Self {
+                row_selections: Some(row_selections),
+                ..self
+            }
+        }
+
         fn writer_props(&self) -> WriterProperties {
             let builder = WriterProperties::builder()
                 .set_data_pagesize_limit(self.max_data_page_size)
@@ -984,7 +1001,7 @@ mod tests {
         A: Array + 'static,
         C: Converter<Vec<Option<T::T>>, A> + 'static,
     {
-        let all_options = vec![
+        let mut all_options = vec![
             // choose record_batch_batch (15) so batches cross row
             // group boundaries (50 rows in 2 row groups) cases.
             TestOptions::new(2, 100, 15),
@@ -1015,6 +1032,39 @@ mod tests {
                 .with_enabled_statistics(EnabledStatistics::None),
         ];
 
+        let skip_options = vec![
+            // choose record_batch_size (15) so batches cross row
+            // group boundaries (50 rows in 2 row groups).
+            TestOptions::new(2, 100, 15).with_row_selections(),
+            // choose record_batch_size (5) so batches sometimes fall
+            // on row group boundaries (25 rows in 3 row groups
+            // --> row groups of 10, 10, and 5). Tests buffer
+            // refilling edge cases.
+            TestOptions::new(3, 25, 5).with_row_selections(),
+            // Choose record_batch_size (25) so all batches fall
+            // exactly on row group boundary (25). Tests buffer
+            // refilling edge cases.
+            TestOptions::new(4, 100, 25).with_row_selections(),
+            // Set maximum page size so row groups have multiple pages
+            TestOptions::new(3, 256, 73)
+                .with_max_data_page_size(128)
+                .with_row_selections(),
+            // Set small dictionary page size to test dictionary fallback
+            TestOptions::new(3, 256, 57)
+                .with_max_dict_page_size(128)
+                .with_row_selections(),
+            // Test optional but with no nulls
+            TestOptions::new(2, 256, 127)
+                .with_null_percent(0)
+                .with_row_selections(),
+            // Test optional with nulls
+            TestOptions::new(2, 256, 93)
+                .with_null_percent(25)
+                .with_row_selections(),
+        ];
+
+        all_options.extend(skip_options);
+
         all_options.into_iter().for_each(|opts| {
             for writer_version in [WriterVersion::PARQUET_1_0, 
WriterVersion::PARQUET_2_0]
             {
@@ -1022,7 +1072,7 @@ mod tests {
                     let opts = TestOptions {
                         writer_version,
                         encoding: *encoding,
-                        ..opts
+                        ..opts.clone()
                     };
 
                     single_column_reader_test::<T, A, C, G>(
@@ -1054,10 +1104,11 @@ mod tests {
     {
         // Print out options to facilitate debugging failures on CI
         println!(
-            "Running single_column_reader_test 
ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
-            converted_type, arrow_type, opts
+            "Running type {:?} single_column_reader_test 
ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
+            T::get_physical_type(), converted_type, arrow_type, opts
         );
 
+        // Generate def_levels according to null_percent
         let (repetition, def_levels) = match opts.null_percent.as_ref() {
             Some(null_percent) => {
                 let mut rng = thread_rng();
@@ -1076,6 +1127,7 @@ mod tests {
             None => (Repetition::REQUIRED, None),
         };
 
+        //generate random table data
         let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
             .map(|idx| {
                 let null_count = match def_levels.as_ref() {
@@ -1126,36 +1178,49 @@ mod tests {
 
         file.rewind().unwrap();
 
-        let mut arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
+        let mut arrow_reader;
+        let expected_data: Vec<Option<T::T>>;
+        if let Some((selections, row_count)) = opts.row_selections.clone() {
+            let options =
+                
ArrowReaderOptions::new().with_row_selection(selections.clone());
+            arrow_reader =
+                ParquetFileArrowReader::try_new_with_options(file, 
options).unwrap();
+            let mut without_skip_data = gen_expected_data::<T>(&def_levels, 
&values);
+
+            let mut skip_data: Vec<Option<T::T>> = vec![];
+            for select in selections {
+                if select.skip {
+                    without_skip_data.drain(0..select.row_count);
+                } else {
+                    
skip_data.extend(without_skip_data.drain(0..select.row_count));
+                }
+            }
+            expected_data = skip_data;
+            assert_eq!(expected_data.len(), row_count);
+        } else {
+            arrow_reader = ParquetFileArrowReader::try_new(file).unwrap();
+            // Get the flattened table data
+            expected_data = gen_expected_data::<T>(&def_levels, &values);
+            assert_eq!(expected_data.len(), opts.num_rows * 
opts.num_row_groups);
+        }
+
         let mut record_reader = arrow_reader
             .get_record_reader(opts.record_batch_size)
             .unwrap();
 
-        let expected_data: Vec<Option<T::T>> = match def_levels {
-            Some(levels) => {
-                let mut values_iter = values.iter().flatten();
-                levels
-                    .iter()
-                    .flatten()
-                    .map(|d| match d {
-                        1 => Some(values_iter.next().cloned().unwrap()),
-                        0 => None,
-                        _ => unreachable!(),
-                    })
-                    .collect()
-            }
-            None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
-        };
-
-        assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
-
         let mut total_read = 0;
         loop {
             let maybe_batch = record_reader.next();
             if total_read < expected_data.len() {
-                let end = min(total_read + opts.record_batch_size, 
expected_data.len());
+                let mut end =
+                    min(total_read + opts.record_batch_size, 
expected_data.len());
                 let batch = maybe_batch.unwrap().unwrap();
-                assert_eq!(end - total_read, batch.num_rows());
+                // TODO: remove this once https://github.com/apache/arrow-rs/issues/2197 is implemented
+                if opts.row_selections.is_none() {
+                    assert_eq!(end - total_read, batch.num_rows());
+                } else {
+                    end = end.min(total_read + batch.num_rows())
+                }
 
                 let mut data = vec![];
                 data.extend_from_slice(&expected_data[total_read..end]);
@@ -1181,6 +1246,28 @@ mod tests {
         }
     }
 
+    fn gen_expected_data<T: DataType>(
+        def_levels: &Option<Vec<Vec<i16>>>,
+        values: &[Vec<T::T>],
+    ) -> Vec<Option<T::T>> {
+        let data: Vec<Option<T::T>> = match def_levels {
+            Some(levels) => {
+                let mut values_iter = values.iter().flatten();
+                levels
+                    .iter()
+                    .flatten()
+                    .map(|d| match d {
+                        1 => Some(values_iter.next().cloned().unwrap()),
+                        0 => None,
+                        _ => unreachable!(),
+                    })
+                    .collect()
+            }
+            None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
+        };
+        data
+    }
+
     fn generate_single_column_file_with_data<T: DataType>(
         values: &[Vec<T::T>],
         def_levels: Option<&Vec<Vec<i16>>>,
@@ -1746,6 +1833,34 @@ mod tests {
         expected_batches
     }
 
+    fn create_test_selection(
+        step_len: usize,
+        total_len: usize,
+        skip_first: bool,
+    ) -> (Vec<RowSelection>, usize) {
+        let mut remaining = total_len;
+        let mut skip = skip_first;
+        let mut vec = vec![];
+        let mut selected_count = 0;
+        while remaining != 0 {
+            let step = if remaining > step_len {
+                step_len
+            } else {
+                remaining
+            };
+            vec.push(RowSelection {
+                row_count: step,
+                skip,
+            });
+            remaining -= step;
+            if !skip {
+                selected_count += step;
+            }
+            skip = !skip;
+        }
+        (vec, selected_count)
+    }
+
     #[test]
     fn test_scan_row_with_selection() {
         let testdata = arrow::util::test_util::parquet_test_data();
@@ -1760,7 +1875,7 @@ mod tests {
         let do_test = |batch_size: usize, selection_len: usize| {
             for skip_first in [false, true] {
                 let selections =
-                    create_test_selection(batch_size, data.num_rows(), 
skip_first);
+                    create_test_selection(batch_size, data.num_rows(), 
skip_first).0;
 
                 let expected = get_expected_batches(&data, &selections, 
batch_size);
                 let skip_reader = create_skip_reader(&test_file, batch_size, 
selections);
@@ -1804,29 +1919,5 @@ mod tests {
             .unwrap();
             skip_arrow_reader.get_record_reader(batch_size).unwrap()
         }
-
-        fn create_test_selection(
-            step_len: usize,
-            total_len: usize,
-            skip_first: bool,
-        ) -> Vec<RowSelection> {
-            let mut remaining = total_len;
-            let mut skip = skip_first;
-            let mut vec = vec![];
-            while remaining != 0 {
-                let step = if remaining > step_len {
-                    step_len
-                } else {
-                    remaining
-                };
-                vec.push(RowSelection {
-                    row_count: step,
-                    skip,
-                });
-                remaining -= step;
-                skip = !skip;
-            }
-            vec
-        }
     }
 }

Reply via email to