This is an automated email from the ASF dual-hosted git repository.
goldmedal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new c22abb4ac3 Improve `AggregateFuzz` testing: generate random queries
(#12847)
c22abb4ac3 is described below
commit c22abb4ac3f1af8bbdf176ef0198988fc7b0982c
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Oct 22 05:43:21 2024 -0400
Improve `AggregateFuzz` testing: generate random queries (#12847)
* Add random queries into aggregate fuzz tester
* Address review comments
* Update datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs
Co-authored-by: Jax Liu <[email protected]>
* Update datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs
Co-authored-by: Jax Liu <[email protected]>
---------
Co-authored-by: Jax Liu <[email protected]>
---
datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs | 385 +++++++--------------
.../aggregation_fuzzer/data_generator.rs | 37 +-
.../tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs | 237 ++++++++++++-
test-utils/src/string_gen.rs | 2 +-
4 files changed, 370 insertions(+), 291 deletions(-)
diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
index ff51282933..1035fa31da 100644
--- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
@@ -45,299 +45,150 @@ use rand::{Rng, SeedableRng};
use tokio::task::JoinSet;
use crate::fuzz_cases::aggregation_fuzzer::{
- AggregationFuzzerBuilder, ColumnDescr, DatasetGeneratorConfig,
+ AggregationFuzzerBuilder, ColumnDescr, DatasetGeneratorConfig,
QueryBuilder,
};
// ========================================================================
// The new aggregation fuzz tests based on [`AggregationFuzzer`]
// ========================================================================
-
-// TODO: write more test case to cover more `group by`s and `aggregation
function`s
-// TODO: maybe we can use macro to simply the case creating
-
-/// Fuzz test for `basic prim aggr(sum/sum distinct/max/min/count/avg)` + `no
group by`
+//
+// Notes on tests:
+//
+// Since the supported types differ for each aggregation function, the tests
+// below are structured so they enumerate each different aggregate function.
+//
+// The test framework handles varying combinations of arguments (data types),
+// sortedness, and grouping parameters
+//
+// TODO: Test floating point values (where output needs to be compared with
some
+// acceptable range due to floating point rounding)
+//
+// TODO: test other aggregate functions
+// - AVG (unstable given the wide range of inputs)
+//
+// TODO: specific test for ordering (ensure all group by columns are ordered)
+// Currently the data is sorted by random columns, so there are almost no
+// repeated runs. To improve coverage we should also sort by lower
cardinality columns
#[tokio::test(flavor = "multi_thread")]
-async fn test_basic_prim_aggr_no_group() {
- let builder = AggregationFuzzerBuilder::default();
-
- // Define data generator config
- let columns = vec![ColumnDescr::new("a", DataType::Int32)];
-
- let data_gen_config = DatasetGeneratorConfig {
- columns,
- rows_num_range: (512, 1024),
- sort_keys_set: Vec::new(),
- };
-
- // Build fuzzer
- let fuzzer = builder
- .data_gen_config(data_gen_config)
- .data_gen_rounds(16)
- .add_sql("SELECT sum(a) FROM fuzz_table")
- .add_sql("SELECT sum(distinct a) FROM fuzz_table")
- .add_sql("SELECT max(a) FROM fuzz_table")
- .add_sql("SELECT min(a) FROM fuzz_table")
- .add_sql("SELECT count(a) FROM fuzz_table")
- .add_sql("SELECT count(distinct a) FROM fuzz_table")
- .add_sql("SELECT avg(a) FROM fuzz_table")
- .table_name("fuzz_table")
- .build();
-
- fuzzer.run().await
+async fn test_min() {
+ let data_gen_config = baseline_config();
+
+ // Queries like SELECT min(a) FROM fuzz_table GROUP BY b
+ let query_builder = QueryBuilder::new()
+ .with_table_name("fuzz_table")
+ .with_aggregate_function("min")
+ // min works on all column types
+ .with_aggregate_arguments(data_gen_config.all_columns())
+ .with_group_by_columns(data_gen_config.all_columns());
+
+ AggregationFuzzerBuilder::from(data_gen_config)
+ .add_query_builder(query_builder)
+ .build()
+ .run()
+ .await;
}
-/// Fuzz test for `basic prim aggr(sum/sum distinct/max/min/count/avg)` +
`group by single int64`
#[tokio::test(flavor = "multi_thread")]
-async fn test_basic_prim_aggr_group_by_single_int64() {
- let builder = AggregationFuzzerBuilder::default();
-
- // Define data generator config
- let columns = vec![
- ColumnDescr::new("a", DataType::Int32),
- ColumnDescr::new("b", DataType::Int64),
- ColumnDescr::new("c", DataType::Int64),
- ];
- let sort_keys_set = vec![
- vec!["b".to_string()],
- vec!["c".to_string(), "b".to_string()],
- ];
- let data_gen_config = DatasetGeneratorConfig {
- columns,
- rows_num_range: (512, 1024),
- sort_keys_set,
- };
-
- // Build fuzzer
- let fuzzer = builder
- .data_gen_config(data_gen_config)
- .data_gen_rounds(16)
- .add_sql("SELECT b, sum(a) FROM fuzz_table GROUP BY b")
- .add_sql("SELECT b, sum(distinct a) FROM fuzz_table GROUP BY b")
- .add_sql("SELECT b, max(a) FROM fuzz_table GROUP BY b")
- .add_sql("SELECT b, min(a) FROM fuzz_table GROUP BY b")
- .add_sql("SELECT b, count(a) FROM fuzz_table GROUP BY b")
- .add_sql("SELECT b, count(distinct a) FROM fuzz_table GROUP BY b")
- .add_sql("SELECT b, avg(a) FROM fuzz_table GROUP BY b")
- .table_name("fuzz_table")
- .build();
-
- fuzzer.run().await;
+async fn test_max() {
+ let data_gen_config = baseline_config();
+
+ // Queries like SELECT max(a) FROM fuzz_table GROUP BY b
+ let query_builder = QueryBuilder::new()
+ .with_table_name("fuzz_table")
+ .with_aggregate_function("max")
+ // max works on all column types
+ .with_aggregate_arguments(data_gen_config.all_columns())
+ .with_group_by_columns(data_gen_config.all_columns());
+
+ AggregationFuzzerBuilder::from(data_gen_config)
+ .add_query_builder(query_builder)
+ .build()
+ .run()
+ .await;
}
-/// Fuzz test for `basic prim aggr(sum/sum distinct/max/min/count/avg)` +
`group by single string`
#[tokio::test(flavor = "multi_thread")]
-async fn test_basic_prim_aggr_group_by_single_string() {
- let builder = AggregationFuzzerBuilder::default();
-
- // Define data generator config
- let columns = vec![
- ColumnDescr::new("a", DataType::Int32),
- ColumnDescr::new("b", DataType::Utf8),
- ColumnDescr::new("c", DataType::Int64),
- ];
- let sort_keys_set = vec![
- vec!["b".to_string()],
- vec!["c".to_string(), "b".to_string()],
- ];
- let data_gen_config = DatasetGeneratorConfig {
- columns,
- rows_num_range: (512, 1024),
- sort_keys_set,
- };
-
- // Build fuzzer
- let fuzzer = builder
- .data_gen_config(data_gen_config)
- .data_gen_rounds(16)
- .add_sql("SELECT b, sum(a) FROM fuzz_table GROUP BY b")
- .add_sql("SELECT b, sum(distinct a) FROM fuzz_table GROUP BY b")
- .add_sql("SELECT b, max(a) FROM fuzz_table GROUP BY b")
- .add_sql("SELECT b, min(a) FROM fuzz_table GROUP BY b")
- .add_sql("SELECT b, count(a) FROM fuzz_table GROUP BY b")
- .add_sql("SELECT b, count(distinct a) FROM fuzz_table GROUP BY b")
- .add_sql("SELECT b, avg(a) FROM fuzz_table GROUP BY b")
- .table_name("fuzz_table")
- .build();
-
- fuzzer.run().await;
+async fn test_sum() {
+ let data_gen_config = baseline_config();
+
+ // Queries like SELECT sum(a), sum(DISTINCT a) FROM fuzz_table GROUP BY b
+ let query_builder = QueryBuilder::new()
+ .with_table_name("fuzz_table")
+ .with_aggregate_function("sum")
+ .with_distinct_aggregate_function("sum")
+ // sum only works on numeric columns
+ .with_aggregate_arguments(data_gen_config.numeric_columns())
+ .with_group_by_columns(data_gen_config.all_columns());
+
+ AggregationFuzzerBuilder::from(data_gen_config)
+ .add_query_builder(query_builder)
+ .build()
+ .run()
+ .await;
}
-/// Fuzz test for `basic prim aggr(sum/sum distinct/max/min/count/avg)` +
`group by string + int64`
#[tokio::test(flavor = "multi_thread")]
-async fn test_basic_prim_aggr_group_by_mixed_string_int64() {
- let builder = AggregationFuzzerBuilder::default();
-
- // Define data generator config
- let columns = vec![
- ColumnDescr::new("a", DataType::Int32),
- ColumnDescr::new("b", DataType::Utf8),
- ColumnDescr::new("c", DataType::Int64),
- ColumnDescr::new("d", DataType::Int32),
- ];
- let sort_keys_set = vec![
- vec!["b".to_string(), "c".to_string()],
- vec!["d".to_string(), "b".to_string(), "c".to_string()],
- ];
- let data_gen_config = DatasetGeneratorConfig {
- columns,
- rows_num_range: (512, 1024),
- sort_keys_set,
- };
-
- // Build fuzzer
- let fuzzer = builder
- .data_gen_config(data_gen_config)
- .data_gen_rounds(16)
- .add_sql("SELECT b, c, sum(a) FROM fuzz_table GROUP BY b, c")
- .add_sql("SELECT b, c, sum(distinct a) FROM fuzz_table GROUP BY b,c")
- .add_sql("SELECT b, c, max(a) FROM fuzz_table GROUP BY b, c")
- .add_sql("SELECT b, c, min(a) FROM fuzz_table GROUP BY b, c")
- .add_sql("SELECT b, c, count(a) FROM fuzz_table GROUP BY b, c")
- .add_sql("SELECT b, c, count(distinct a) FROM fuzz_table GROUP BY b,
c")
- .add_sql("SELECT b, c, avg(a) FROM fuzz_table GROUP BY b, c")
- .table_name("fuzz_table")
- .build();
-
- fuzzer.run().await;
+async fn test_count() {
+ let data_gen_config = baseline_config();
+
+ // Queries like SELECT count(a), count(DISTINCT a) FROM fuzz_table GROUP BY b
+ let query_builder = QueryBuilder::new()
+ .with_table_name("fuzz_table")
+ .with_aggregate_function("count")
+ .with_distinct_aggregate_function("count")
+ // count works on all column types
+ .with_aggregate_arguments(data_gen_config.all_columns())
+ .with_group_by_columns(data_gen_config.all_columns());
+
+ AggregationFuzzerBuilder::from(data_gen_config)
+ .add_query_builder(query_builder)
+ .build()
+ .run()
+ .await;
}
-/// Fuzz test for `basic string aggr(count/count distinct/min/max)` + `no
group by`
-#[tokio::test(flavor = "multi_thread")]
-async fn test_basic_string_aggr_no_group() {
- let builder = AggregationFuzzerBuilder::default();
-
- // Define data generator config
- let columns = vec![ColumnDescr::new("a", DataType::Utf8)];
-
- let data_gen_config = DatasetGeneratorConfig {
- columns,
- rows_num_range: (512, 1024),
- sort_keys_set: Vec::new(),
- };
-
- // Build fuzzer
- let fuzzer = builder
- .data_gen_config(data_gen_config)
- .data_gen_rounds(8)
- .add_sql("SELECT max(a) FROM fuzz_table")
- .add_sql("SELECT min(a) FROM fuzz_table")
- .add_sql("SELECT count(a) FROM fuzz_table")
- .add_sql("SELECT count(distinct a) FROM fuzz_table")
- .table_name("fuzz_table")
- .build();
-
- fuzzer.run().await;
-}
-
-/// Fuzz test for `basic string aggr(count/count distinct/min/max)` + `group
by single int64`
-#[tokio::test(flavor = "multi_thread")]
-async fn test_basic_string_aggr_group_by_single_int64() {
- let builder = AggregationFuzzerBuilder::default();
-
- // Define data generator config
- let columns = vec![
- ColumnDescr::new("a", DataType::Utf8),
- ColumnDescr::new("b", DataType::Int64),
- ColumnDescr::new("c", DataType::Int64),
- ];
- let sort_keys_set = vec![
- vec!["b".to_string()],
- vec!["c".to_string(), "b".to_string()],
- ];
- let data_gen_config = DatasetGeneratorConfig {
- columns,
- rows_num_range: (512, 1024),
- sort_keys_set,
- };
-
- // Build fuzzer
- let fuzzer = builder
- .data_gen_config(data_gen_config)
- .data_gen_rounds(8)
- .add_sql("SELECT b, max(a) FROM fuzz_table GROUP BY b")
- .add_sql("SELECT b, min(a) FROM fuzz_table GROUP BY b")
- .add_sql("SELECT b, count(a) FROM fuzz_table GROUP BY b")
- .add_sql("SELECT b, count(distinct a) FROM fuzz_table GROUP BY b")
- .table_name("fuzz_table")
- .build();
-
- fuzzer.run().await;
-}
-
-/// Fuzz test for `basic string aggr(count/count distinct/min/max)` + `group
by single string`
-#[tokio::test(flavor = "multi_thread")]
-async fn test_basic_string_aggr_group_by_single_string() {
- let builder = AggregationFuzzerBuilder::default();
-
- // Define data generator config
+/// Return the baseline [`DatasetGeneratorConfig`] shared by the aggregate
+/// fuzz tests: a standard set of columns, datasets of 512..1024 rows, and
+/// two sort key sets (see `sort_keys_set` below)
+///
+/// Includes numeric (signed and unsigned integer) and string types
+///
+/// Does not include:
+/// 1. Floating point numbers
+/// 2. structured types
+fn baseline_config() -> DatasetGeneratorConfig {
let columns = vec![
- ColumnDescr::new("a", DataType::Utf8),
- ColumnDescr::new("b", DataType::Utf8),
- ColumnDescr::new("c", DataType::Int64),
- ];
- let sort_keys_set = vec![
- vec!["b".to_string()],
- vec!["c".to_string(), "b".to_string()],
+ ColumnDescr::new("i8", DataType::Int8),
+ ColumnDescr::new("i16", DataType::Int16),
+ ColumnDescr::new("i32", DataType::Int32),
+ ColumnDescr::new("i64", DataType::Int64),
+ ColumnDescr::new("u8", DataType::UInt8),
+ ColumnDescr::new("u16", DataType::UInt16),
+ ColumnDescr::new("u32", DataType::UInt32),
+ ColumnDescr::new("u64", DataType::UInt64),
+ // TODO: date/time columns
+ // TODO: decimal columns
+ // begin string columns
+ ColumnDescr::new("utf8", DataType::Utf8),
+ ColumnDescr::new("largeutf8", DataType::LargeUtf8),
+ // TODO: add support for utf8view in data generator
+ // ColumnDescr::new("utf8view", DataType::Utf8View),
+ // TODO: binary columns
];
- let data_gen_config = DatasetGeneratorConfig {
- columns,
- rows_num_range: (512, 1024),
- sort_keys_set,
- };
-
- // Build fuzzer
- let fuzzer = builder
- .data_gen_config(data_gen_config)
- .data_gen_rounds(16)
- .add_sql("SELECT b, max(a) FROM fuzz_table GROUP BY b")
- .add_sql("SELECT b, min(a) FROM fuzz_table GROUP BY b")
- .add_sql("SELECT b, count(a) FROM fuzz_table GROUP BY b")
- .add_sql("SELECT b, count(distinct a) FROM fuzz_table GROUP BY b")
- .table_name("fuzz_table")
- .build();
-
- fuzzer.run().await;
-}
-/// Fuzz test for `basic string aggr(count/count distinct/min/max)` + `group
by string + int64`
-#[tokio::test(flavor = "multi_thread")]
-async fn test_basic_string_aggr_group_by_mixed_string_int64() {
- let builder = AggregationFuzzerBuilder::default();
-
- // Define data generator config
- let columns = vec![
- ColumnDescr::new("a", DataType::Utf8),
- ColumnDescr::new("b", DataType::Utf8),
- ColumnDescr::new("c", DataType::Int64),
- ColumnDescr::new("d", DataType::Int32),
- ];
- let sort_keys_set = vec![
- vec!["b".to_string(), "c".to_string()],
- vec!["d".to_string(), "b".to_string(), "c".to_string()],
- ];
- let data_gen_config = DatasetGeneratorConfig {
+ DatasetGeneratorConfig {
columns,
rows_num_range: (512, 1024),
- sort_keys_set,
- };
-
- // Build fuzzer
- let fuzzer = builder
- .data_gen_config(data_gen_config)
- .data_gen_rounds(16)
- .add_sql("SELECT b, c, max(a) FROM fuzz_table GROUP BY b, c")
- .add_sql("SELECT b, c, min(a) FROM fuzz_table GROUP BY b, c")
- .add_sql("SELECT b, c, count(a) FROM fuzz_table GROUP BY b, c")
- .add_sql("SELECT b, c, count(distinct a) FROM fuzz_table GROUP BY b,
c")
- .table_name("fuzz_table")
- .build();
-
- fuzzer.run().await;
+ sort_keys_set: vec![
+ // low cardinality to try and get many repeated runs
+ vec![String::from("u8")],
+ vec![String::from("utf8"), String::from("u8")],
+ ],
+ }
}
// ========================================================================
// The old aggregation fuzz tests
// ========================================================================
+
/// Tracks if this stream is generating input or output
/// Tests that streaming aggregate and batch (non streaming) aggregate produce
/// same results
@@ -353,7 +204,7 @@ async fn streaming_aggregate_test() {
vec!["d", "c", "a"],
vec!["d", "c", "b", "a"],
];
- let n = 300;
+ let n = 10;
let distincts = vec![10, 20];
for distinct in distincts {
let mut join_set = JoinSet::new();
diff --git
a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs
b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs
index 9d45779295..44f96d5a1a 100644
--- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs
+++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs
@@ -48,16 +48,41 @@ use test_utils::{
///
#[derive(Debug, Clone)]
pub struct DatasetGeneratorConfig {
- // Descriptions of columns in datasets, it's `required`
+ /// Descriptions of the columns in the generated datasets (required)
pub columns: Vec<ColumnDescr>,
- // Rows num range of the generated datasets, it's `required`
+ /// Range of the number of rows in each generated dataset, presumably
+ /// `(min, max)` (required). NOTE(review): confirm whether `max` is inclusive
pub rows_num_range: (usize, usize),
- // Sort keys used to generate the sorted data set, it's optional
+ /// Additional optional sort keys
+ ///
+ /// The generated datasets always include a non-sorted copy. For each
+ /// element in `sort_keys_set`, an additional dataset is created that
+ /// is sorted by these values as well.
pub sort_keys_set: Vec<Vec<String>>,
}
+impl DatasetGeneratorConfig {
+ /// Return the names of all columns, in the order they appear in `columns`
+ pub fn all_columns(&self) -> Vec<&str> {
+ self.columns.iter().map(|d| d.name.as_str()).collect()
+ }
+
+ /// Return the names of the columns whose `column_type.is_numeric()` is
+ /// true, in the order they appear in `columns`
+ ///
+ /// Used to pick arguments for aggregates (e.g. `sum`) that only accept
+ /// numeric input
+ pub fn numeric_columns(&self) -> Vec<&str> {
+ self.columns
+ .iter()
+ .filter_map(|d| {
+ if d.column_type.is_numeric() {
+ Some(d.name.as_str())
+ } else {
+ None
+ }
+ })
+ .collect()
+ }
+}
+
/// Dataset generator
///
/// It will generate one random [`Dataset`]s when `generate` function is
called.
@@ -96,7 +121,7 @@ impl DatasetGenerator {
pub fn generate(&self) -> Result<Vec<Dataset>> {
let mut datasets = Vec::with_capacity(self.sort_keys_set.len() + 1);
- // Generate the base batch
+ // Generate the base batch (unsorted)
let base_batch = self.batch_generator.generate()?;
let batches = stagger_batch(base_batch.clone());
let dataset = Dataset::new(batches, Vec::new());
@@ -362,7 +387,9 @@ impl RecordBatchGenerator {
DataType::LargeUtf8 => {
generate_string_array!(self, num_rows, batch_gen_rng,
array_gen_rng, i64)
}
- _ => unreachable!(),
+ _ => {
+ panic!("Unsupported data generator type: {data_type}")
+ }
}
}
}
diff --git a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs
b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs
index 6daebc8942..898d1081ff 100644
--- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs
+++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use std::collections::HashSet;
use std::sync::Arc;
use arrow::util::pretty::pretty_format_batches;
@@ -61,7 +62,16 @@ impl AggregationFuzzerBuilder {
}
}
- pub fn add_sql(mut self, sql: &str) -> Self {
+ /// Adds `NUM_QUERIES` (10) random SQL queries produced by
+ /// `query_builder.generate_query()` to the candidate queries, and sets the
+ /// fuzzer's table name to `query_builder.table_name()`
+ pub fn add_query_builder(mut self, query_builder: QueryBuilder) -> Self {
+ const NUM_QUERIES: usize = 10;
+ for _ in 0..NUM_QUERIES {
+ self = self.add_sql(&query_builder.generate_query());
+ }
+ self.table_name(query_builder.table_name())
+ }
+
+ /// Adds a single candidate SQL query
+ fn add_sql(mut self, sql: &str) -> Self {
self.candidate_sqls.push(Arc::from(sql));
self
}
@@ -76,11 +86,6 @@ impl AggregationFuzzerBuilder {
self
}
- pub fn data_gen_rounds(mut self, data_gen_rounds: usize) -> Self {
- self.data_gen_rounds = data_gen_rounds;
- self
- }
-
pub fn build(self) -> AggregationFuzzer {
assert!(!self.candidate_sqls.is_empty());
let candidate_sqls = self.candidate_sqls;
@@ -99,12 +104,18 @@ impl AggregationFuzzerBuilder {
}
}
-impl Default for AggregationFuzzerBuilder {
+impl std::default::Default for AggregationFuzzerBuilder {
fn default() -> Self {
Self::new()
}
}
+impl From<DatasetGeneratorConfig> for AggregationFuzzerBuilder {
+ fn from(value: DatasetGeneratorConfig) -> Self {
+ Self::default().data_gen_config(value)
+ }
+}
+
/// AggregationFuzzer randomly generating multiple [`AggregationFuzzTestTask`],
/// and running them to check the correctness of the optimizations
/// (e.g. sorted, partial skipping, spilling...)
@@ -169,6 +180,10 @@ impl AggregationFuzzer {
})
.collect::<Vec<_>>();
+ for q in &query_groups {
+ println!(" Testing with query {}", q.sql);
+ }
+
let tasks = self.generate_fuzz_tasks(query_groups).await;
for task in tasks {
join_set.spawn(async move { task.run().await });
@@ -270,20 +285,27 @@ impl AggregationFuzzTestTask {
check_equality_of_batches(task_result, expected_result).map_err(|e| {
// If we found inconsistent result, we print the test details for
reproducing at first
let message = format!(
- "{}\n\
- ### Inconsistent row:\n\
- - row_idx:{}\n\
- - task_row:{}\n\
- - expected_row:{}\n\
- ### Task total result:\n{}\n\
- ### Expected total result:\n{}\n\
- ",
- self.context_error_report(),
+ "##### AggregationFuzzer error report #####\n\
+ ### Sql:\n{}\n\
+ ### Schema:\n{}\n\
+ ### Session context params:\n{:?}\n\
+ ### Inconsistent row:\n\
+ - row_idx:{}\n\
+ - task_row:{}\n\
+ - expected_row:{}\n\
+ ### Task total result:\n{}\n\
+ ### Expected total result:\n{}\n\
+ ### Input:\n{}\n\
+ ",
+ self.sql,
+ self.dataset_ref.batches[0].schema_ref(),
+ self.ctx_with_params.params,
e.row_idx,
e.lhs_row,
e.rhs_row,
- pretty_format_batches(task_result).unwrap(),
- pretty_format_batches(expected_result).unwrap(),
+ format_batches_with_limit(task_result),
+ format_batches_with_limit(expected_result),
+ format_batches_with_limit(&self.dataset_ref.batches),
);
DataFusionError::Internal(message)
})
@@ -305,3 +327,182 @@ impl AggregationFuzzTestTask {
)
}
}
+
+/// Pretty prints the `RecordBatch`es, limited to the first 100 rows in total
+/// (across all batches)
+///
+/// Used to keep the fuzzer error report readable. Panics if pretty
+/// formatting fails.
+fn format_batches_with_limit(batches: &[RecordBatch]) -> impl
std::fmt::Display {
+ const MAX_ROWS: usize = 100;
+ let mut row_count = 0;
+ let to_print = batches
+ .iter()
+ .filter_map(|b| {
+ if row_count >= MAX_ROWS {
+ None
+ } else if row_count + b.num_rows() > MAX_ROWS {
+ // this batch crosses the limit: keep only its leading rows that
+ // still fit within MAX_ROWS
+ let slice_len = MAX_ROWS - row_count;
+ let b = b.slice(0, slice_len);
+ row_count += slice_len;
+ Some(b)
+ } else {
+ row_count += b.num_rows();
+ Some(b.clone())
+ }
+ })
+ .collect::<Vec<_>>();
+
+ pretty_format_batches(&to_print).unwrap()
+}
+
+/// Random aggregate query builder
+///
+/// Creates queries like
+/// ```sql
+/// SELECT AGG(..) FROM table_name GROUP BY <group_by_columns>
+/// ```
+///
+/// The `GROUP BY` clause is omitted when no group by columns are configured.
+#[derive(Debug, Default)]
+pub struct QueryBuilder {
+    /// The name of the table to query
+    table_name: String,
+    /// Aggregate functions to be used in the query
+    /// (function_name, is_distinct)
+    aggregate_functions: Vec<(String, bool)>,
+    /// Columns to be used in group by
+    group_by_columns: Vec<String>,
+    /// Possible columns for arguments in the aggregate functions
+    ///
+    /// Assumes each column is a valid argument for every function in
+    /// `aggregate_functions`: arguments are picked independently of the
+    /// function they are passed to.
+    arguments: Vec<String>,
+}
+impl QueryBuilder {
+    /// Create an empty builder (no table name, functions, or columns)
+    pub fn new() -> Self {
+        std::default::Default::default()
+    }
+
+    /// return the table name if any
+    pub fn table_name(&self) -> &str {
+        &self.table_name
+    }
+
+    /// Set the table name for the query builder
+    pub fn with_table_name(mut self, table_name: impl Into<String>) -> Self {
+        self.table_name = table_name.into();
+        self
+    }
+
+    /// Add a new possible aggregate function to the query builder
+    pub fn with_aggregate_function(
+        mut self,
+        aggregate_function: impl Into<String>,
+    ) -> Self {
+        self.aggregate_functions
+            .push((aggregate_function.into(), false));
+        self
+    }
+
+    /// Add a new possible `DISTINCT` aggregate function to the query
+    ///
+    /// This is different than `with_aggregate_function` because only certain
+    /// aggregates support `DISTINCT`
+    pub fn with_distinct_aggregate_function(
+        mut self,
+        aggregate_function: impl Into<String>,
+    ) -> Self {
+        self.aggregate_functions
+            .push((aggregate_function.into(), true));
+        self
+    }
+
+    /// Add columns that may be used in the group bys
+    pub fn with_group_by_columns<'a>(
+        mut self,
+        group_by: impl IntoIterator<Item = &'a str>,
+    ) -> Self {
+        let group_by = group_by.into_iter().map(String::from);
+        self.group_by_columns.extend(group_by);
+        self
+    }
+
+    /// Add columns that may be used as arguments in the aggregate functions
+    pub fn with_aggregate_arguments<'a>(
+        mut self,
+        arguments: impl IntoIterator<Item = &'a str>,
+    ) -> Self {
+        let arguments = arguments.into_iter().map(String::from);
+        self.arguments.extend(arguments);
+        self
+    }
+
+    /// Generate one random aggregate query over `table_name`
+    ///
+    /// # Panics
+    /// If no aggregate functions or no aggregate arguments were configured
+    pub fn generate_query(&self) -> String {
+        let group_by = self.random_group_by();
+        let mut query = String::from("SELECT ");
+        query.push_str(&self.random_aggregate_functions().join(", "));
+        query.push_str(" FROM ");
+        query.push_str(&self.table_name);
+        if !group_by.is_empty() {
+            query.push_str(" GROUP BY ");
+            query.push_str(&group_by.join(", "));
+        }
+        query
+    }
+
+    /// Generate between 1 and 5 random aggregate function invocations
+    /// (potentially repeating).
+    ///
+    /// Each aggregate function invocation is of the form
+    ///
+    /// ```sql
+    /// function_name(<DISTINCT> argument) as alias
+    /// ```
+    ///
+    /// where
+    /// * `function_names` are randomly selected from [`Self::aggregate_functions`]
+    /// * `<DISTINCT> argument` is randomly selected from [`Self::arguments`]
+    /// * `alias` is a unique alias `colN` for the column (to avoid duplicate column names)
+    fn random_aggregate_functions(&self) -> Vec<String> {
+        const MAX_NUM_FUNCTIONS: usize = 5;
+        assert!(
+            !self.aggregate_functions.is_empty(),
+            "QueryBuilder requires at least one aggregate function"
+        );
+        let mut rng = thread_rng();
+        // inclusive range so that MAX_NUM_FUNCTIONS invocations are reachable
+        let num_aggregate_functions = rng.gen_range(1..=MAX_NUM_FUNCTIONS);
+
+        (1..=num_aggregate_functions)
+            .map(|alias_idx| {
+                let idx = rng.gen_range(0..self.aggregate_functions.len());
+                let (function_name, is_distinct) = &self.aggregate_functions[idx];
+                let argument = self.random_argument();
+                let distinct = if *is_distinct { "DISTINCT " } else { "" };
+                format!("{function_name}({distinct}{argument}) as col{alias_idx}")
+            })
+            .collect()
+    }
+
+    /// Pick a random aggregate function argument
+    fn random_argument(&self) -> String {
+        assert!(
+            !self.arguments.is_empty(),
+            "QueryBuilder requires at least one aggregate argument"
+        );
+        let mut rng = thread_rng();
+        let idx = rng.gen_range(0..self.arguments.len());
+        self.arguments[idx].clone()
+    }
+
+    /// Pick a random number of fields to group by (non-repeating)
+    ///
+    /// Limited to at most 3 group by columns to ensure coverage for large
+    /// groups. With larger numbers of columns, each group has many fewer values.
+    ///
+    /// Returns an empty list (no `GROUP BY`) when no group by columns are
+    /// configured.
+    fn random_group_by(&self) -> Vec<String> {
+        const MAX_GROUPS: usize = 3;
+        // Cap at the number of available columns: asking for more distinct
+        // columns than exist would make the loop below spin forever.
+        let max_groups = self.group_by_columns.len().min(MAX_GROUPS);
+        if max_groups == 0 {
+            return vec![];
+        }
+        let mut rng = thread_rng();
+        let num_group_by = rng.gen_range(1..=max_groups);
+
+        let mut already_used = HashSet::new();
+        let mut group_by = vec![];
+        while group_by.len() < num_group_by {
+            let idx = rng.gen_range(0..self.group_by_columns.len());
+            if already_used.insert(idx) {
+                group_by.push(self.group_by_columns[idx].clone());
+            }
+        }
+        group_by
+    }
+}
diff --git a/test-utils/src/string_gen.rs b/test-utils/src/string_gen.rs
index 725eb22b85..b598241db1 100644
--- a/test-utils/src/string_gen.rs
+++ b/test-utils/src/string_gen.rs
@@ -62,7 +62,7 @@ impl StringBatchGenerator {
let mut cases = vec![];
let mut rng = thread_rng();
for null_pct in [0.0, 0.01, 0.1, 0.5] {
- for _ in 0..100 {
+ for _ in 0..10 {
// max length of generated strings
let max_len = rng.gen_range(1..50);
let num_strings = rng.gen_range(1..100);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]