This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new f8ff7fe Extends parquet fuzz tests to also test nulls, dictionaries and row groups with multiple pages (#1053) (#1110)
f8ff7fe is described below
commit f8ff7feed62280c149dc144b5baace1331a6e954
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Tue Jan 11 14:00:50 2022 +0000
Extends parquet fuzz tests to also test nulls, dictionaries and row groups with multiple pages (#1053) (#1110)
* Parquet fuzz tests (#1053)
* Test multiple WriterVersions
* Revert array_reader change
---
parquet/src/arrow/arrow_reader.rs | 402 ++++++++++++++++++++++---------
parquet/src/util/test_common/rand_gen.rs | 9 +-
2 files changed, 295 insertions(+), 116 deletions(-)
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index 761c5a6..e04287c 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -235,24 +235,26 @@ impl ParquetRecordBatchReader {
mod tests {
use crate::arrow::arrow_reader::{ArrowReader, ParquetFileArrowReader};
use crate::arrow::converter::{
- Converter, FixedSizeArrayConverter, FromConverter,
IntervalDayTimeArrayConverter,
- Utf8ArrayConverter,
+ BinaryArrayConverter, Converter, FixedSizeArrayConverter,
FromConverter,
+ IntervalDayTimeArrayConverter, LargeUtf8ArrayConverter,
Utf8ArrayConverter,
};
+ use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
+ use crate::basic::{ConvertedType, Repetition};
use crate::column::writer::get_typed_column_writer_mut;
use crate::data_type::{
BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray,
- FixedLenByteArrayType, Int32Type,
+ FixedLenByteArrayType, Int32Type, Int64Type,
};
use crate::errors::Result;
- use crate::file::properties::WriterProperties;
+ use crate::file::properties::{WriterProperties, WriterVersion};
use crate::file::reader::{FileReader, SerializedFileReader};
use crate::file::writer::{FileWriter, SerializedFileWriter};
- use crate::schema::parser::parse_message_type;
- use crate::schema::types::TypePtr;
+ use crate::schema::types::{Type, TypePtr};
use crate::util::test_common::{get_temp_filename, RandGen};
use arrow::array::*;
+ use arrow::datatypes::DataType as ArrowDataType;
use arrow::record_batch::RecordBatchReader;
- use rand::RngCore;
+ use rand::{thread_rng, RngCore};
use serde_json::json;
use serde_json::Value::{Array as JArray, Null as JNull, Object as JObject};
use std::cmp::min;
@@ -317,20 +319,25 @@ mod tests {
}
#[test]
- fn test_bool_single_column_reader_test() {
- let message_type = "
- message test_schema {
- REQUIRED BOOLEAN leaf;
- }
- ";
-
- let converter = FromConverter::new();
- run_single_column_reader_tests::<
- BoolType,
- BooleanArray,
- FromConverter<Vec<Option<bool>>, BooleanArray>,
- BoolType,
- >(2, message_type, &converter);
+ fn test_primitive_single_column_reader_test() {
+ run_single_column_reader_tests::<BoolType, BooleanArray, _, BoolType>(
+ 2,
+ ConvertedType::NONE,
+ None,
+ &FromConverter::new(),
+ );
+ run_single_column_reader_tests::<Int32Type, Int32Array, _, Int32Type>(
+ 2,
+ ConvertedType::NONE,
+ None,
+ &FromConverter::new(),
+ );
+ run_single_column_reader_tests::<Int64Type, Int64Array, _, Int64Type>(
+ 2,
+ ConvertedType::NONE,
+ None,
+ &FromConverter::new(),
+ );
}
struct RandFixedLenGen {}
@@ -345,36 +352,24 @@ mod tests {
#[test]
fn test_fixed_length_binary_column_reader() {
- let message_type = "
- message test_schema {
- REQUIRED FIXED_LEN_BYTE_ARRAY (20) leaf;
- }
- ";
-
let converter = FixedSizeArrayConverter::new(20);
run_single_column_reader_tests::<
FixedLenByteArrayType,
FixedSizeBinaryArray,
FixedSizeArrayConverter,
RandFixedLenGen,
- >(20, message_type, &converter);
+ >(20, ConvertedType::NONE, None, &converter);
}
#[test]
fn test_interval_day_time_column_reader() {
- let message_type = "
- message test_schema {
- REQUIRED FIXED_LEN_BYTE_ARRAY (12) leaf (INTERVAL);
- }
- ";
-
let converter = IntervalDayTimeArrayConverter {};
run_single_column_reader_tests::<
FixedLenByteArrayType,
IntervalDayTimeArray,
IntervalDayTimeArrayConverter,
RandFixedLenGen,
- >(12, message_type, &converter);
+ >(12, ConvertedType::INTERVAL, None, &converter);
}
struct RandUtf8Gen {}
@@ -387,11 +382,13 @@ mod tests {
#[test]
fn test_utf8_single_column_reader_test() {
- let message_type = "
- message test_schema {
- REQUIRED BINARY leaf (UTF8);
- }
- ";
+ let converter = BinaryArrayConverter {};
+ run_single_column_reader_tests::<
+ ByteArrayType,
+ BinaryArray,
+ BinaryArrayConverter,
+ RandUtf8Gen,
+ >(2, ConvertedType::NONE, None, &converter);
let converter = Utf8ArrayConverter {};
run_single_column_reader_tests::<
@@ -399,7 +396,47 @@ mod tests {
StringArray,
Utf8ArrayConverter,
RandUtf8Gen,
- >(2, message_type, &converter);
+ >(2, ConvertedType::UTF8, None, &converter);
+
+ run_single_column_reader_tests::<
+ ByteArrayType,
+ StringArray,
+ Utf8ArrayConverter,
+ RandUtf8Gen,
+ >(
+ 2,
+ ConvertedType::UTF8,
+ Some(ArrowDataType::Utf8),
+ &converter,
+ );
+
+ run_single_column_reader_tests::<
+ ByteArrayType,
+ StringArray,
+ Utf8ArrayConverter,
+ RandUtf8Gen,
+ >(
+ 2,
+ ConvertedType::UTF8,
+ Some(ArrowDataType::Dictionary(
+ Box::new(ArrowDataType::Int32),
+ Box::new(ArrowDataType::Utf8),
+ )),
+ &converter,
+ );
+
+ let converter = LargeUtf8ArrayConverter {};
+ run_single_column_reader_tests::<
+ ByteArrayType,
+ LargeStringArray,
+ LargeUtf8ArrayConverter,
+ RandUtf8Gen,
+ >(
+ 2,
+ ConvertedType::UTF8,
+ Some(ArrowDataType::LargeUtf8),
+ &converter,
+ );
}
#[test]
@@ -435,19 +472,72 @@ mod tests {
}
/// Parameters for single_column_reader_test
- #[derive(Debug)]
+ #[derive(Debug, Clone)]
struct TestOptions {
/// Number of row group to write to parquet (row group size =
/// num_row_groups / num_rows)
num_row_groups: usize,
- /// Total number of rows
+ /// Total number of rows per row group
num_rows: usize,
/// Size of batches to read back
record_batch_size: usize,
- /// Total number of batches to attempt to read.
- /// `record_batch_size` * `num_iterations` should be greater
- /// than `num_rows` to ensure the data can be read back completely
- num_iterations: usize,
+ /// Percentage of nulls in column or None if required
+ null_percent: Option<usize>,
+ /// Maximum size of page in bytes
+ max_data_page_size: usize,
+ /// Maximum size of dictionary page in bytes
+ max_dict_page_size: usize,
+ /// Writer version
+ writer_version: WriterVersion,
+ }
+
+ impl Default for TestOptions {
+ fn default() -> Self {
+ Self {
+ num_row_groups: 2,
+ num_rows: 100,
+ record_batch_size: 15,
+ null_percent: None,
+ max_data_page_size: 1024 * 1024,
+ max_dict_page_size: 1024 * 1024,
+ writer_version: WriterVersion::PARQUET_1_0,
+ }
+ }
+ }
+
+ impl TestOptions {
+ fn new(num_row_groups: usize, num_rows: usize, record_batch_size:
usize) -> Self {
+ Self {
+ num_row_groups,
+ num_rows,
+ record_batch_size,
+ null_percent: None,
+ max_data_page_size: 1024 * 1024,
+ max_dict_page_size: 1024 * 1024,
+ writer_version: WriterVersion::PARQUET_1_0,
+ }
+ }
+
+ fn with_null_percent(self, null_percent: usize) -> Self {
+ Self {
+ null_percent: Some(null_percent),
+ ..self
+ }
+ }
+
+ fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
+ Self {
+ max_data_page_size,
+ ..self
+ }
+ }
+
+ fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
+ Self {
+ max_dict_page_size,
+ ..self
+ }
+ }
}
/// Create a parquet file and then read it using
@@ -458,53 +548,54 @@ mod tests {
/// value generator
fn run_single_column_reader_tests<T, A, C, G>(
rand_max: i32,
- message_type: &str,
+ converted_type: ConvertedType,
+ arrow_type: Option<ArrowDataType>,
converter: &C,
) where
T: DataType,
G: RandGen<T>,
- A: PartialEq + Array + 'static,
+ A: Array + 'static,
C: Converter<Vec<Option<T::T>>, A> + 'static,
{
let all_options = vec![
// choose record_batch_batch (15) so batches cross row
// group boundaries (50 rows in 2 row groups) cases.
- TestOptions {
- num_row_groups: 2,
- num_rows: 100,
- record_batch_size: 15,
- num_iterations: 50,
- },
+ TestOptions::new(2, 100, 15),
// choose record_batch_batch (5) so batches sometime fall
// on row group boundaries and (25 rows in 3 row groups
// --> row groups of 10, 10, and 5). Tests buffer
// refilling edge cases.
- TestOptions {
- num_row_groups: 3,
- num_rows: 25,
- record_batch_size: 5,
- num_iterations: 50,
- },
+ TestOptions::new(3, 25, 5),
// Choose record_batch_size (25) so all batches fall
// exactly on row group boundary (25). Tests buffer
// refilling edge cases.
- TestOptions {
- num_row_groups: 4,
- num_rows: 100,
- record_batch_size: 25,
- num_iterations: 50,
- },
+ TestOptions::new(4, 100, 25),
+ // Set maximum page size so row groups have multiple pages
+ TestOptions::new(3, 256, 73).with_max_data_page_size(128),
+ // Set small dictionary page size to test dictionary fallback
+ TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
+ // Test optional but with no nulls
+ TestOptions::new(2, 256, 127).with_null_percent(0),
+ // Test optional with nulls
+ TestOptions::new(2, 256, 93).with_null_percent(25),
];
all_options.into_iter().for_each(|opts| {
- // Print out options to facilitate debugging failures on CI
- println!("Running with Test Options: {:?}", opts);
- single_column_reader_test::<T, A, C, G>(
- opts,
- rand_max,
- message_type,
- converter,
- )
+ for writer_version in [WriterVersion::PARQUET_1_0,
WriterVersion::PARQUET_2_0]
+ {
+ let opts = TestOptions {
+ writer_version,
+ ..opts
+ };
+
+ single_column_reader_test::<T, A, C, G>(
+ opts,
+ rand_max,
+ converted_type,
+ arrow_type.clone(),
+ converter,
+ )
+ }
});
}
@@ -514,24 +605,86 @@ mod tests {
fn single_column_reader_test<T, A, C, G>(
opts: TestOptions,
rand_max: i32,
- message_type: &str,
+ converted_type: ConvertedType,
+ arrow_type: Option<ArrowDataType>,
converter: &C,
) where
T: DataType,
G: RandGen<T>,
- A: PartialEq + Array + 'static,
+ A: Array + 'static,
C: Converter<Vec<Option<T::T>>, A> + 'static,
{
+ // Print out options to facilitate debugging failures on CI
+ println!(
+ "Running single_column_reader_test
ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
+ converted_type, arrow_type, opts
+ );
+
+ let (repetition, def_levels) = match opts.null_percent.as_ref() {
+ Some(null_percent) => {
+ let mut rng = thread_rng();
+
+ let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
+ .map(|_| {
+ std::iter::from_fn(|| {
+ Some((rng.next_u32() as usize % 100 >=
*null_percent) as i16)
+ })
+ .take(opts.num_rows)
+ .collect()
+ })
+ .collect();
+ (Repetition::OPTIONAL, Some(def_levels))
+ }
+ None => (Repetition::REQUIRED, None),
+ };
+
let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
- .map(|_| G::gen_vec(rand_max, opts.num_rows))
+ .map(|idx| {
+ let null_count = match def_levels.as_ref() {
+ Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
+ None => 0,
+ };
+ G::gen_vec(rand_max, opts.num_rows - null_count)
+ })
.collect();
let path = get_temp_filename();
- let schema = parse_message_type(message_type).map(Arc::new).unwrap();
+ let len = match T::get_physical_type() {
+ crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
+ crate::basic::Type::INT96 => 12,
+ _ => -1,
+ };
- generate_single_column_file_with_data::<T>(&values, path.as_path(),
schema)
- .unwrap();
+ let mut fields = vec![Arc::new(
+ Type::primitive_type_builder("leaf", T::get_physical_type())
+ .with_repetition(repetition)
+ .with_converted_type(converted_type)
+ .with_length(len)
+ .build()
+ .unwrap(),
+ )];
+
+ let schema = Arc::new(
+ Type::group_type_builder("test_schema")
+ .with_fields(&mut fields)
+ .build()
+ .unwrap(),
+ );
+
+ let arrow_field = arrow_type
+ .clone()
+ .map(|t| arrow::datatypes::Field::new("leaf", t, false));
+
+ generate_single_column_file_with_data::<T>(
+ &values,
+ def_levels.as_ref(),
+ path.as_path(),
+ schema,
+ arrow_field,
+ &opts,
+ )
+ .unwrap();
let parquet_reader =
SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap();
@@ -541,57 +694,88 @@ mod tests {
.get_record_reader(opts.record_batch_size)
.unwrap();
- let expected_data: Vec<Option<T::T>> = values
- .iter()
- .flat_map(|v| v.iter())
- .map(|b| Some(b.clone()))
- .collect();
+ let expected_data: Vec<Option<T::T>> = match def_levels {
+ Some(levels) => {
+ let mut values_iter = values.iter().flatten();
+ levels
+ .iter()
+ .flatten()
+ .map(|d| match d {
+ 1 => Some(values_iter.next().cloned().unwrap()),
+ 0 => None,
+ _ => unreachable!(),
+ })
+ .collect()
+ }
+ None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
+ };
- for i in 0..opts.num_iterations {
- let start = i * opts.record_batch_size;
+ assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
- let batch = record_reader.next();
- if start < expected_data.len() {
- let end = min(start + opts.record_batch_size,
expected_data.len());
- assert!(batch.is_some());
+ let mut total_read = 0;
+ loop {
+ let maybe_batch = record_reader.next();
+ if total_read < expected_data.len() {
+ let end = min(total_read + opts.record_batch_size,
expected_data.len());
+ let batch = maybe_batch.unwrap().unwrap();
+ assert_eq!(end - total_read, batch.num_rows());
let mut data = vec![];
- data.extend_from_slice(&expected_data[start..end]);
-
- assert_eq!(
- &converter.convert(data).unwrap(),
- batch
- .unwrap()
- .unwrap()
- .column(0)
- .as_any()
- .downcast_ref::<A>()
- .unwrap()
- );
+ data.extend_from_slice(&expected_data[total_read..end]);
+
+ let a = converter.convert(data).unwrap();
+ let mut b = Arc::clone(batch.column(0));
+
+ if let Some(arrow_type) = arrow_type.as_ref() {
+ assert_eq!(b.data_type(), arrow_type);
+ if let ArrowDataType::Dictionary(_, v) = arrow_type {
+ assert_eq!(a.data_type(), v.as_ref());
+ b = arrow::compute::cast(&b, v.as_ref()).unwrap()
+ }
+ }
+ assert_eq!(a.data_type(), b.data_type());
+ assert_eq!(a.data(), b.data());
+
+ total_read = end;
} else {
- assert!(batch.is_none());
+ assert!(maybe_batch.is_none());
+ break;
}
}
}
fn generate_single_column_file_with_data<T: DataType>(
values: &[Vec<T::T>],
+ def_levels: Option<&Vec<Vec<i16>>>,
path: &Path,
schema: TypePtr,
+ field: Option<arrow::datatypes::Field>,
+ opts: &TestOptions,
) -> Result<parquet_format::FileMetaData> {
let file = File::create(path)?;
- let writer_props = Arc::new(WriterProperties::builder().build());
+ let mut writer_props = WriterProperties::builder()
+ .set_data_pagesize_limit(opts.max_data_page_size)
+ .set_dictionary_pagesize_limit(opts.max_dict_page_size)
+ .set_dictionary_enabled(true)
+ .set_writer_version(opts.writer_version)
+ .build();
+
+ if let Some(field) = field {
+ let arrow_schema = arrow::datatypes::Schema::new(vec![field]);
+ add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut
writer_props);
+ }
- let mut writer = SerializedFileWriter::new(file, schema,
writer_props)?;
+ let mut writer = SerializedFileWriter::new(file, schema,
Arc::new(writer_props))?;
- for v in values {
+ for (idx, v) in values.iter().enumerate() {
+ let def_levels = def_levels.map(|d| d[idx].as_slice());
let mut row_group_writer = writer.next_row_group()?;
let mut column_writer = row_group_writer
.next_column()?
.expect("Column writer is none!");
get_typed_column_writer_mut::<T>(&mut column_writer)
- .write_batch(v, None, None)?;
+ .write_batch(v, def_levels, None)?;
row_group_writer.close_column(column_writer)?;
writer.close_row_group(row_group_writer)?
diff --git a/parquet/src/util/test_common/rand_gen.rs b/parquet/src/util/test_common/rand_gen.rs
index ea91b28..d9c2565 100644
--- a/parquet/src/util/test_common/rand_gen.rs
+++ b/parquet/src/util/test_common/rand_gen.rs
@@ -91,13 +91,8 @@ impl RandGen<ByteArrayType> for ByteArrayType {
impl RandGen<FixedLenByteArrayType> for FixedLenByteArrayType {
fn gen(len: i32) -> FixedLenByteArray {
- let mut rng = thread_rng();
- let value_len = if len < 0 {
- rng.gen_range(0..128)
- } else {
- len as usize
- };
- let value = random_bytes(value_len);
+ assert!(len >= 0);
+ let value = random_bytes(len as usize);
ByteArray::from(value).into()
}
}