This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new f8ff7fe  Extends parquet fuzz tests to also tests nulls, dictionaries 
and row groups with multiple pages  (#1053) (#1110)
f8ff7fe is described below

commit f8ff7feed62280c149dc144b5baace1331a6e954
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Tue Jan 11 14:00:50 2022 +0000

    Extends parquet fuzz tests to also tests nulls, dictionaries and row groups 
with multiple pages  (#1053) (#1110)
    
    * Parquet fuzz tests (#1053)
    
    * Test multiple WriterVersions
    
    * Revert array_reader change
---
 parquet/src/arrow/arrow_reader.rs        | 402 ++++++++++++++++++++++---------
 parquet/src/util/test_common/rand_gen.rs |   9 +-
 2 files changed, 295 insertions(+), 116 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader.rs 
b/parquet/src/arrow/arrow_reader.rs
index 761c5a6..e04287c 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -235,24 +235,26 @@ impl ParquetRecordBatchReader {
 mod tests {
     use crate::arrow::arrow_reader::{ArrowReader, ParquetFileArrowReader};
     use crate::arrow::converter::{
-        Converter, FixedSizeArrayConverter, FromConverter, 
IntervalDayTimeArrayConverter,
-        Utf8ArrayConverter,
+        BinaryArrayConverter, Converter, FixedSizeArrayConverter, 
FromConverter,
+        IntervalDayTimeArrayConverter, LargeUtf8ArrayConverter, 
Utf8ArrayConverter,
     };
+    use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
+    use crate::basic::{ConvertedType, Repetition};
     use crate::column::writer::get_typed_column_writer_mut;
     use crate::data_type::{
         BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray,
-        FixedLenByteArrayType, Int32Type,
+        FixedLenByteArrayType, Int32Type, Int64Type,
     };
     use crate::errors::Result;
-    use crate::file::properties::WriterProperties;
+    use crate::file::properties::{WriterProperties, WriterVersion};
     use crate::file::reader::{FileReader, SerializedFileReader};
     use crate::file::writer::{FileWriter, SerializedFileWriter};
-    use crate::schema::parser::parse_message_type;
-    use crate::schema::types::TypePtr;
+    use crate::schema::types::{Type, TypePtr};
     use crate::util::test_common::{get_temp_filename, RandGen};
     use arrow::array::*;
+    use arrow::datatypes::DataType as ArrowDataType;
     use arrow::record_batch::RecordBatchReader;
-    use rand::RngCore;
+    use rand::{thread_rng, RngCore};
     use serde_json::json;
     use serde_json::Value::{Array as JArray, Null as JNull, Object as JObject};
     use std::cmp::min;
@@ -317,20 +319,25 @@ mod tests {
     }
 
     #[test]
-    fn test_bool_single_column_reader_test() {
-        let message_type = "
-        message test_schema {
-          REQUIRED BOOLEAN leaf;
-        }
-        ";
-
-        let converter = FromConverter::new();
-        run_single_column_reader_tests::<
-            BoolType,
-            BooleanArray,
-            FromConverter<Vec<Option<bool>>, BooleanArray>,
-            BoolType,
-        >(2, message_type, &converter);
+    fn test_primitive_single_column_reader_test() {
+        run_single_column_reader_tests::<BoolType, BooleanArray, _, BoolType>(
+            2,
+            ConvertedType::NONE,
+            None,
+            &FromConverter::new(),
+        );
+        run_single_column_reader_tests::<Int32Type, Int32Array, _, Int32Type>(
+            2,
+            ConvertedType::NONE,
+            None,
+            &FromConverter::new(),
+        );
+        run_single_column_reader_tests::<Int64Type, Int64Array, _, Int64Type>(
+            2,
+            ConvertedType::NONE,
+            None,
+            &FromConverter::new(),
+        );
     }
 
     struct RandFixedLenGen {}
@@ -345,36 +352,24 @@ mod tests {
 
     #[test]
     fn test_fixed_length_binary_column_reader() {
-        let message_type = "
-        message test_schema {
-          REQUIRED FIXED_LEN_BYTE_ARRAY (20) leaf;
-        }
-        ";
-
         let converter = FixedSizeArrayConverter::new(20);
         run_single_column_reader_tests::<
             FixedLenByteArrayType,
             FixedSizeBinaryArray,
             FixedSizeArrayConverter,
             RandFixedLenGen,
-        >(20, message_type, &converter);
+        >(20, ConvertedType::NONE, None, &converter);
     }
 
     #[test]
     fn test_interval_day_time_column_reader() {
-        let message_type = "
-        message test_schema {
-          REQUIRED FIXED_LEN_BYTE_ARRAY (12) leaf (INTERVAL);
-        }
-        ";
-
         let converter = IntervalDayTimeArrayConverter {};
         run_single_column_reader_tests::<
             FixedLenByteArrayType,
             IntervalDayTimeArray,
             IntervalDayTimeArrayConverter,
             RandFixedLenGen,
-        >(12, message_type, &converter);
+        >(12, ConvertedType::INTERVAL, None, &converter);
     }
 
     struct RandUtf8Gen {}
@@ -387,11 +382,13 @@ mod tests {
 
     #[test]
     fn test_utf8_single_column_reader_test() {
-        let message_type = "
-        message test_schema {
-          REQUIRED BINARY leaf (UTF8);
-        }
-        ";
+        let converter = BinaryArrayConverter {};
+        run_single_column_reader_tests::<
+            ByteArrayType,
+            BinaryArray,
+            BinaryArrayConverter,
+            RandUtf8Gen,
+        >(2, ConvertedType::NONE, None, &converter);
 
         let converter = Utf8ArrayConverter {};
         run_single_column_reader_tests::<
@@ -399,7 +396,47 @@ mod tests {
             StringArray,
             Utf8ArrayConverter,
             RandUtf8Gen,
-        >(2, message_type, &converter);
+        >(2, ConvertedType::UTF8, None, &converter);
+
+        run_single_column_reader_tests::<
+            ByteArrayType,
+            StringArray,
+            Utf8ArrayConverter,
+            RandUtf8Gen,
+        >(
+            2,
+            ConvertedType::UTF8,
+            Some(ArrowDataType::Utf8),
+            &converter,
+        );
+
+        run_single_column_reader_tests::<
+            ByteArrayType,
+            StringArray,
+            Utf8ArrayConverter,
+            RandUtf8Gen,
+        >(
+            2,
+            ConvertedType::UTF8,
+            Some(ArrowDataType::Dictionary(
+                Box::new(ArrowDataType::Int32),
+                Box::new(ArrowDataType::Utf8),
+            )),
+            &converter,
+        );
+
+        let converter = LargeUtf8ArrayConverter {};
+        run_single_column_reader_tests::<
+            ByteArrayType,
+            LargeStringArray,
+            LargeUtf8ArrayConverter,
+            RandUtf8Gen,
+        >(
+            2,
+            ConvertedType::UTF8,
+            Some(ArrowDataType::LargeUtf8),
+            &converter,
+        );
     }
 
     #[test]
@@ -435,19 +472,72 @@ mod tests {
     }
 
     /// Parameters for single_column_reader_test
-    #[derive(Debug)]
+    #[derive(Debug, Clone)]
     struct TestOptions {
         /// Number of row group to write to parquet (row group size =
         /// num_row_groups / num_rows)
         num_row_groups: usize,
-        /// Total number of rows
+        /// Total number of rows per row group
         num_rows: usize,
         /// Size of batches to read back
         record_batch_size: usize,
-        /// Total number of batches to attempt to read.
-        /// `record_batch_size` * `num_iterations` should be greater
-        /// than `num_rows` to ensure the data can be read back completely
-        num_iterations: usize,
+        /// Percentage of nulls in column or None if required
+        null_percent: Option<usize>,
+        /// Maximum size of page in bytes
+        max_data_page_size: usize,
+        /// Maximum size of dictionary page in bytes
+        max_dict_page_size: usize,
+        /// Writer version
+        writer_version: WriterVersion,
+    }
+
+    impl Default for TestOptions {
+        fn default() -> Self {
+            Self {
+                num_row_groups: 2,
+                num_rows: 100,
+                record_batch_size: 15,
+                null_percent: None,
+                max_data_page_size: 1024 * 1024,
+                max_dict_page_size: 1024 * 1024,
+                writer_version: WriterVersion::PARQUET_1_0,
+            }
+        }
+    }
+
+    impl TestOptions {
+        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: 
usize) -> Self {
+            Self {
+                num_row_groups,
+                num_rows,
+                record_batch_size,
+                null_percent: None,
+                max_data_page_size: 1024 * 1024,
+                max_dict_page_size: 1024 * 1024,
+                writer_version: WriterVersion::PARQUET_1_0,
+            }
+        }
+
+        fn with_null_percent(self, null_percent: usize) -> Self {
+            Self {
+                null_percent: Some(null_percent),
+                ..self
+            }
+        }
+
+        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
+            Self {
+                max_data_page_size,
+                ..self
+            }
+        }
+
+        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
+            Self {
+                max_dict_page_size,
+                ..self
+            }
+        }
     }
 
     /// Create a parquet file and then read it using
@@ -458,53 +548,54 @@ mod tests {
     /// value generator
     fn run_single_column_reader_tests<T, A, C, G>(
         rand_max: i32,
-        message_type: &str,
+        converted_type: ConvertedType,
+        arrow_type: Option<ArrowDataType>,
         converter: &C,
     ) where
         T: DataType,
         G: RandGen<T>,
-        A: PartialEq + Array + 'static,
+        A: Array + 'static,
         C: Converter<Vec<Option<T::T>>, A> + 'static,
     {
         let all_options = vec![
             // choose record_batch_batch (15) so batches cross row
             // group boundaries (50 rows in 2 row groups) cases.
-            TestOptions {
-                num_row_groups: 2,
-                num_rows: 100,
-                record_batch_size: 15,
-                num_iterations: 50,
-            },
+            TestOptions::new(2, 100, 15),
             // choose record_batch_batch (5) so batches sometime fall
             // on row group boundaries and (25 rows in 3 row groups
             // --> row groups of 10, 10, and 5). Tests buffer
             // refilling edge cases.
-            TestOptions {
-                num_row_groups: 3,
-                num_rows: 25,
-                record_batch_size: 5,
-                num_iterations: 50,
-            },
+            TestOptions::new(3, 25, 5),
             // Choose record_batch_size (25) so all batches fall
             // exactly on row group boundary (25). Tests buffer
             // refilling edge cases.
-            TestOptions {
-                num_row_groups: 4,
-                num_rows: 100,
-                record_batch_size: 25,
-                num_iterations: 50,
-            },
+            TestOptions::new(4, 100, 25),
+            // Set maximum page size so row groups have multiple pages
+            TestOptions::new(3, 256, 73).with_max_data_page_size(128),
+            // Set small dictionary page size to test dictionary fallback
+            TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
+            // Test optional but with no nulls
+            TestOptions::new(2, 256, 127).with_null_percent(0),
+            // Test optional with nulls
+            TestOptions::new(2, 256, 93).with_null_percent(25),
         ];
 
         all_options.into_iter().for_each(|opts| {
-            // Print out options to facilitate debugging failures on CI
-            println!("Running with Test Options: {:?}", opts);
-            single_column_reader_test::<T, A, C, G>(
-                opts,
-                rand_max,
-                message_type,
-                converter,
-            )
+            for writer_version in [WriterVersion::PARQUET_1_0, 
WriterVersion::PARQUET_2_0]
+            {
+                let opts = TestOptions {
+                    writer_version,
+                    ..opts
+                };
+
+                single_column_reader_test::<T, A, C, G>(
+                    opts,
+                    rand_max,
+                    converted_type,
+                    arrow_type.clone(),
+                    converter,
+                )
+            }
         });
     }
 
@@ -514,24 +605,86 @@ mod tests {
     fn single_column_reader_test<T, A, C, G>(
         opts: TestOptions,
         rand_max: i32,
-        message_type: &str,
+        converted_type: ConvertedType,
+        arrow_type: Option<ArrowDataType>,
         converter: &C,
     ) where
         T: DataType,
         G: RandGen<T>,
-        A: PartialEq + Array + 'static,
+        A: Array + 'static,
         C: Converter<Vec<Option<T::T>>, A> + 'static,
     {
+        // Print out options to facilitate debugging failures on CI
+        println!(
+            "Running single_column_reader_test 
ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
+            converted_type, arrow_type, opts
+        );
+
+        let (repetition, def_levels) = match opts.null_percent.as_ref() {
+            Some(null_percent) => {
+                let mut rng = thread_rng();
+
+                let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
+                    .map(|_| {
+                        std::iter::from_fn(|| {
+                            Some((rng.next_u32() as usize % 100 >= 
*null_percent) as i16)
+                        })
+                        .take(opts.num_rows)
+                        .collect()
+                    })
+                    .collect();
+                (Repetition::OPTIONAL, Some(def_levels))
+            }
+            None => (Repetition::REQUIRED, None),
+        };
+
         let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
-            .map(|_| G::gen_vec(rand_max, opts.num_rows))
+            .map(|idx| {
+                let null_count = match def_levels.as_ref() {
+                    Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
+                    None => 0,
+                };
+                G::gen_vec(rand_max, opts.num_rows - null_count)
+            })
             .collect();
 
         let path = get_temp_filename();
 
-        let schema = parse_message_type(message_type).map(Arc::new).unwrap();
+        let len = match T::get_physical_type() {
+            crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
+            crate::basic::Type::INT96 => 12,
+            _ => -1,
+        };
 
-        generate_single_column_file_with_data::<T>(&values, path.as_path(), 
schema)
-            .unwrap();
+        let mut fields = vec![Arc::new(
+            Type::primitive_type_builder("leaf", T::get_physical_type())
+                .with_repetition(repetition)
+                .with_converted_type(converted_type)
+                .with_length(len)
+                .build()
+                .unwrap(),
+        )];
+
+        let schema = Arc::new(
+            Type::group_type_builder("test_schema")
+                .with_fields(&mut fields)
+                .build()
+                .unwrap(),
+        );
+
+        let arrow_field = arrow_type
+            .clone()
+            .map(|t| arrow::datatypes::Field::new("leaf", t, false));
+
+        generate_single_column_file_with_data::<T>(
+            &values,
+            def_levels.as_ref(),
+            path.as_path(),
+            schema,
+            arrow_field,
+            &opts,
+        )
+        .unwrap();
 
         let parquet_reader =
             
SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap();
@@ -541,57 +694,88 @@ mod tests {
             .get_record_reader(opts.record_batch_size)
             .unwrap();
 
-        let expected_data: Vec<Option<T::T>> = values
-            .iter()
-            .flat_map(|v| v.iter())
-            .map(|b| Some(b.clone()))
-            .collect();
+        let expected_data: Vec<Option<T::T>> = match def_levels {
+            Some(levels) => {
+                let mut values_iter = values.iter().flatten();
+                levels
+                    .iter()
+                    .flatten()
+                    .map(|d| match d {
+                        1 => Some(values_iter.next().cloned().unwrap()),
+                        0 => None,
+                        _ => unreachable!(),
+                    })
+                    .collect()
+            }
+            None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
+        };
 
-        for i in 0..opts.num_iterations {
-            let start = i * opts.record_batch_size;
+        assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
 
-            let batch = record_reader.next();
-            if start < expected_data.len() {
-                let end = min(start + opts.record_batch_size, 
expected_data.len());
-                assert!(batch.is_some());
+        let mut total_read = 0;
+        loop {
+            let maybe_batch = record_reader.next();
+            if total_read < expected_data.len() {
+                let end = min(total_read + opts.record_batch_size, 
expected_data.len());
+                let batch = maybe_batch.unwrap().unwrap();
+                assert_eq!(end - total_read, batch.num_rows());
 
                 let mut data = vec![];
-                data.extend_from_slice(&expected_data[start..end]);
-
-                assert_eq!(
-                    &converter.convert(data).unwrap(),
-                    batch
-                        .unwrap()
-                        .unwrap()
-                        .column(0)
-                        .as_any()
-                        .downcast_ref::<A>()
-                        .unwrap()
-                );
+                data.extend_from_slice(&expected_data[total_read..end]);
+
+                let a = converter.convert(data).unwrap();
+                let mut b = Arc::clone(batch.column(0));
+
+                if let Some(arrow_type) = arrow_type.as_ref() {
+                    assert_eq!(b.data_type(), arrow_type);
+                    if let ArrowDataType::Dictionary(_, v) = arrow_type {
+                        assert_eq!(a.data_type(), v.as_ref());
+                        b = arrow::compute::cast(&b, v.as_ref()).unwrap()
+                    }
+                }
+                assert_eq!(a.data_type(), b.data_type());
+                assert_eq!(a.data(), b.data());
+
+                total_read = end;
             } else {
-                assert!(batch.is_none());
+                assert!(maybe_batch.is_none());
+                break;
             }
         }
     }
 
     fn generate_single_column_file_with_data<T: DataType>(
         values: &[Vec<T::T>],
+        def_levels: Option<&Vec<Vec<i16>>>,
         path: &Path,
         schema: TypePtr,
+        field: Option<arrow::datatypes::Field>,
+        opts: &TestOptions,
     ) -> Result<parquet_format::FileMetaData> {
         let file = File::create(path)?;
-        let writer_props = Arc::new(WriterProperties::builder().build());
+        let mut writer_props = WriterProperties::builder()
+            .set_data_pagesize_limit(opts.max_data_page_size)
+            .set_dictionary_pagesize_limit(opts.max_dict_page_size)
+            .set_dictionary_enabled(true)
+            .set_writer_version(opts.writer_version)
+            .build();
+
+        if let Some(field) = field {
+            let arrow_schema = arrow::datatypes::Schema::new(vec![field]);
+            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut 
writer_props);
+        }
 
-        let mut writer = SerializedFileWriter::new(file, schema, 
writer_props)?;
+        let mut writer = SerializedFileWriter::new(file, schema, 
Arc::new(writer_props))?;
 
-        for v in values {
+        for (idx, v) in values.iter().enumerate() {
+            let def_levels = def_levels.map(|d| d[idx].as_slice());
             let mut row_group_writer = writer.next_row_group()?;
             let mut column_writer = row_group_writer
                 .next_column()?
                 .expect("Column writer is none!");
 
             get_typed_column_writer_mut::<T>(&mut column_writer)
-                .write_batch(v, None, None)?;
+                .write_batch(v, def_levels, None)?;
 
             row_group_writer.close_column(column_writer)?;
             writer.close_row_group(row_group_writer)?
diff --git a/parquet/src/util/test_common/rand_gen.rs 
b/parquet/src/util/test_common/rand_gen.rs
index ea91b28..d9c2565 100644
--- a/parquet/src/util/test_common/rand_gen.rs
+++ b/parquet/src/util/test_common/rand_gen.rs
@@ -91,13 +91,8 @@ impl RandGen<ByteArrayType> for ByteArrayType {
 
 impl RandGen<FixedLenByteArrayType> for FixedLenByteArrayType {
     fn gen(len: i32) -> FixedLenByteArray {
-        let mut rng = thread_rng();
-        let value_len = if len < 0 {
-            rng.gen_range(0..128)
-        } else {
-            len as usize
-        };
-        let value = random_bytes(value_len);
+        assert!(len >= 0);
+        let value = random_bytes(len as usize);
         ByteArray::from(value).into()
     }
 }

Reply via email to