This is an automated email from the ASF dual-hosted git repository.

goldmedal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new c22abb4ac3 Improve `AggregateFuzz` testing: generate random queries 
(#12847)
c22abb4ac3 is described below

commit c22abb4ac3f1af8bbdf176ef0198988fc7b0982c
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Oct 22 05:43:21 2024 -0400

    Improve `AggregateFuzz` testing: generate random queries (#12847)
    
    * Add random queries into aggregate fuzz tester
    
    * Address review comments
    
    * Update datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs
    
    Co-authored-by: Jax Liu <[email protected]>
    
    * Update datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs
    
    Co-authored-by: Jax Liu <[email protected]>
    
    ---------
    
    Co-authored-by: Jax Liu <[email protected]>
---
 datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs | 385 +++++++--------------
 .../aggregation_fuzzer/data_generator.rs           |  37 +-
 .../tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs  | 237 ++++++++++++-
 test-utils/src/string_gen.rs                       |   2 +-
 4 files changed, 370 insertions(+), 291 deletions(-)

diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs 
b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
index ff51282933..1035fa31da 100644
--- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
@@ -45,299 +45,150 @@ use rand::{Rng, SeedableRng};
 use tokio::task::JoinSet;
 
 use crate::fuzz_cases::aggregation_fuzzer::{
-    AggregationFuzzerBuilder, ColumnDescr, DatasetGeneratorConfig,
+    AggregationFuzzerBuilder, ColumnDescr, DatasetGeneratorConfig, 
QueryBuilder,
 };
 
 // ========================================================================
 //  The new aggregation fuzz tests based on [`AggregationFuzzer`]
 // ========================================================================
-
-// TODO: write more test case to cover more `group by`s and `aggregation 
function`s
-// TODO: maybe we can use macro to simply the case creating
-
-/// Fuzz test for `basic prim aggr(sum/sum distinct/max/min/count/avg)` + `no 
group by`
+//
+// Notes on tests:
+//
+// Since the supported types differ for each aggregation function, the tests
+// below are structured so they enumerate each different aggregate function.
+//
+// The test framework handles varying combinations of arguments (data types),
+// sortedness, and grouping parameters
+//
+// TODO: Test floating point values (where output needs to be compared with 
some
+// acceptable range due to floating point rounding)
+//
+// TODO: test other aggregate functions
+// - AVG (unstable given the wide range of inputs)
+//
+// TODO: specific test for ordering (ensure all group by columns are ordered)
+//  Currently the data is sorted by random columns, so there are almost no
+//  repeated runs. To improve coverage we should also sort by lower 
cardinality columns
 #[tokio::test(flavor = "multi_thread")]
-async fn test_basic_prim_aggr_no_group() {
-    let builder = AggregationFuzzerBuilder::default();
-
-    // Define data generator config
-    let columns = vec![ColumnDescr::new("a", DataType::Int32)];
-
-    let data_gen_config = DatasetGeneratorConfig {
-        columns,
-        rows_num_range: (512, 1024),
-        sort_keys_set: Vec::new(),
-    };
-
-    // Build fuzzer
-    let fuzzer = builder
-        .data_gen_config(data_gen_config)
-        .data_gen_rounds(16)
-        .add_sql("SELECT sum(a) FROM fuzz_table")
-        .add_sql("SELECT sum(distinct a) FROM fuzz_table")
-        .add_sql("SELECT max(a) FROM fuzz_table")
-        .add_sql("SELECT min(a) FROM fuzz_table")
-        .add_sql("SELECT count(a) FROM fuzz_table")
-        .add_sql("SELECT count(distinct a) FROM fuzz_table")
-        .add_sql("SELECT avg(a) FROM fuzz_table")
-        .table_name("fuzz_table")
-        .build();
-
-    fuzzer.run().await
+async fn test_min() {
+    let data_gen_config = baseline_config();
+
+    // Queries like SELECT min(a) FROM fuzz_table GROUP BY b
+    let query_builder = QueryBuilder::new()
+        .with_table_name("fuzz_table")
+        .with_aggregate_function("min")
+        // min works on all column types
+        .with_aggregate_arguments(data_gen_config.all_columns())
+        .with_group_by_columns(data_gen_config.all_columns());
+
+    AggregationFuzzerBuilder::from(data_gen_config)
+        .add_query_builder(query_builder)
+        .build()
+        .run()
+        .await;
 }
 
-/// Fuzz test for `basic prim aggr(sum/sum distinct/max/min/count/avg)` + 
`group by single int64`
 #[tokio::test(flavor = "multi_thread")]
-async fn test_basic_prim_aggr_group_by_single_int64() {
-    let builder = AggregationFuzzerBuilder::default();
-
-    // Define data generator config
-    let columns = vec![
-        ColumnDescr::new("a", DataType::Int32),
-        ColumnDescr::new("b", DataType::Int64),
-        ColumnDescr::new("c", DataType::Int64),
-    ];
-    let sort_keys_set = vec![
-        vec!["b".to_string()],
-        vec!["c".to_string(), "b".to_string()],
-    ];
-    let data_gen_config = DatasetGeneratorConfig {
-        columns,
-        rows_num_range: (512, 1024),
-        sort_keys_set,
-    };
-
-    // Build fuzzer
-    let fuzzer = builder
-        .data_gen_config(data_gen_config)
-        .data_gen_rounds(16)
-        .add_sql("SELECT b, sum(a) FROM fuzz_table GROUP BY b")
-        .add_sql("SELECT b, sum(distinct a) FROM fuzz_table GROUP BY b")
-        .add_sql("SELECT b, max(a) FROM fuzz_table GROUP BY b")
-        .add_sql("SELECT b, min(a) FROM fuzz_table GROUP BY b")
-        .add_sql("SELECT b, count(a) FROM fuzz_table GROUP BY b")
-        .add_sql("SELECT b, count(distinct a) FROM fuzz_table GROUP BY b")
-        .add_sql("SELECT b, avg(a) FROM fuzz_table GROUP BY b")
-        .table_name("fuzz_table")
-        .build();
-
-    fuzzer.run().await;
+async fn test_max() {
+    let data_gen_config = baseline_config();
+
+    // Queries like SELECT max(a) FROM fuzz_table GROUP BY b
+    let query_builder = QueryBuilder::new()
+        .with_table_name("fuzz_table")
+        .with_aggregate_function("max")
+        // max works on all column types
+        .with_aggregate_arguments(data_gen_config.all_columns())
+        .with_group_by_columns(data_gen_config.all_columns());
+
+    AggregationFuzzerBuilder::from(data_gen_config)
+        .add_query_builder(query_builder)
+        .build()
+        .run()
+        .await;
 }
 
-/// Fuzz test for `basic prim aggr(sum/sum distinct/max/min/count/avg)` + 
`group by single string`
 #[tokio::test(flavor = "multi_thread")]
-async fn test_basic_prim_aggr_group_by_single_string() {
-    let builder = AggregationFuzzerBuilder::default();
-
-    // Define data generator config
-    let columns = vec![
-        ColumnDescr::new("a", DataType::Int32),
-        ColumnDescr::new("b", DataType::Utf8),
-        ColumnDescr::new("c", DataType::Int64),
-    ];
-    let sort_keys_set = vec![
-        vec!["b".to_string()],
-        vec!["c".to_string(), "b".to_string()],
-    ];
-    let data_gen_config = DatasetGeneratorConfig {
-        columns,
-        rows_num_range: (512, 1024),
-        sort_keys_set,
-    };
-
-    // Build fuzzer
-    let fuzzer = builder
-        .data_gen_config(data_gen_config)
-        .data_gen_rounds(16)
-        .add_sql("SELECT b, sum(a) FROM fuzz_table GROUP BY b")
-        .add_sql("SELECT b, sum(distinct a) FROM fuzz_table GROUP BY b")
-        .add_sql("SELECT b, max(a) FROM fuzz_table GROUP BY b")
-        .add_sql("SELECT b, min(a) FROM fuzz_table GROUP BY b")
-        .add_sql("SELECT b, count(a) FROM fuzz_table GROUP BY b")
-        .add_sql("SELECT b, count(distinct a) FROM fuzz_table GROUP BY b")
-        .add_sql("SELECT b, avg(a) FROM fuzz_table GROUP BY b")
-        .table_name("fuzz_table")
-        .build();
-
-    fuzzer.run().await;
+async fn test_sum() {
+    let data_gen_config = baseline_config();
+
+    // Queries like SELECT sum(a), sum(distinct) FROM fuzz_table GROUP BY b
+    let query_builder = QueryBuilder::new()
+        .with_table_name("fuzz_table")
+        .with_aggregate_function("sum")
+        .with_distinct_aggregate_function("sum")
+        // sum only works on numeric columns
+        .with_aggregate_arguments(data_gen_config.numeric_columns())
+        .with_group_by_columns(data_gen_config.all_columns());
+
+    AggregationFuzzerBuilder::from(data_gen_config)
+        .add_query_builder(query_builder)
+        .build()
+        .run()
+        .await;
 }
 
-/// Fuzz test for `basic prim aggr(sum/sum distinct/max/min/count/avg)` + 
`group by string + int64`
 #[tokio::test(flavor = "multi_thread")]
-async fn test_basic_prim_aggr_group_by_mixed_string_int64() {
-    let builder = AggregationFuzzerBuilder::default();
-
-    // Define data generator config
-    let columns = vec![
-        ColumnDescr::new("a", DataType::Int32),
-        ColumnDescr::new("b", DataType::Utf8),
-        ColumnDescr::new("c", DataType::Int64),
-        ColumnDescr::new("d", DataType::Int32),
-    ];
-    let sort_keys_set = vec![
-        vec!["b".to_string(), "c".to_string()],
-        vec!["d".to_string(), "b".to_string(), "c".to_string()],
-    ];
-    let data_gen_config = DatasetGeneratorConfig {
-        columns,
-        rows_num_range: (512, 1024),
-        sort_keys_set,
-    };
-
-    // Build fuzzer
-    let fuzzer = builder
-        .data_gen_config(data_gen_config)
-        .data_gen_rounds(16)
-        .add_sql("SELECT b, c, sum(a) FROM fuzz_table GROUP BY b, c")
-        .add_sql("SELECT b, c, sum(distinct a) FROM fuzz_table GROUP BY b,c")
-        .add_sql("SELECT b, c, max(a) FROM fuzz_table GROUP BY b, c")
-        .add_sql("SELECT b, c, min(a) FROM fuzz_table GROUP BY b, c")
-        .add_sql("SELECT b, c, count(a) FROM fuzz_table GROUP BY b, c")
-        .add_sql("SELECT b, c, count(distinct a) FROM fuzz_table GROUP BY b, 
c")
-        .add_sql("SELECT b, c, avg(a) FROM fuzz_table GROUP BY b, c")
-        .table_name("fuzz_table")
-        .build();
-
-    fuzzer.run().await;
+async fn test_count() {
+    let data_gen_config = baseline_config();
+
+    // Queries like SELECT count(a), count(distinct) FROM fuzz_table GROUP BY b
+    let query_builder = QueryBuilder::new()
+        .with_table_name("fuzz_table")
+        .with_aggregate_function("count")
+        .with_distinct_aggregate_function("count")
+        // count work for all arguments
+        .with_aggregate_arguments(data_gen_config.all_columns())
+        .with_group_by_columns(data_gen_config.all_columns());
+
+    AggregationFuzzerBuilder::from(data_gen_config)
+        .add_query_builder(query_builder)
+        .build()
+        .run()
+        .await;
 }
 
-/// Fuzz test for `basic string aggr(count/count distinct/min/max)` + `no 
group by`
-#[tokio::test(flavor = "multi_thread")]
-async fn test_basic_string_aggr_no_group() {
-    let builder = AggregationFuzzerBuilder::default();
-
-    // Define data generator config
-    let columns = vec![ColumnDescr::new("a", DataType::Utf8)];
-
-    let data_gen_config = DatasetGeneratorConfig {
-        columns,
-        rows_num_range: (512, 1024),
-        sort_keys_set: Vec::new(),
-    };
-
-    // Build fuzzer
-    let fuzzer = builder
-        .data_gen_config(data_gen_config)
-        .data_gen_rounds(8)
-        .add_sql("SELECT max(a) FROM fuzz_table")
-        .add_sql("SELECT min(a) FROM fuzz_table")
-        .add_sql("SELECT count(a) FROM fuzz_table")
-        .add_sql("SELECT count(distinct a) FROM fuzz_table")
-        .table_name("fuzz_table")
-        .build();
-
-    fuzzer.run().await;
-}
-
-/// Fuzz test for `basic string aggr(count/count distinct/min/max)` + `group 
by single int64`
-#[tokio::test(flavor = "multi_thread")]
-async fn test_basic_string_aggr_group_by_single_int64() {
-    let builder = AggregationFuzzerBuilder::default();
-
-    // Define data generator config
-    let columns = vec![
-        ColumnDescr::new("a", DataType::Utf8),
-        ColumnDescr::new("b", DataType::Int64),
-        ColumnDescr::new("c", DataType::Int64),
-    ];
-    let sort_keys_set = vec![
-        vec!["b".to_string()],
-        vec!["c".to_string(), "b".to_string()],
-    ];
-    let data_gen_config = DatasetGeneratorConfig {
-        columns,
-        rows_num_range: (512, 1024),
-        sort_keys_set,
-    };
-
-    // Build fuzzer
-    let fuzzer = builder
-        .data_gen_config(data_gen_config)
-        .data_gen_rounds(8)
-        .add_sql("SELECT b, max(a) FROM fuzz_table GROUP BY b")
-        .add_sql("SELECT b, min(a) FROM fuzz_table GROUP BY b")
-        .add_sql("SELECT b, count(a) FROM fuzz_table GROUP BY b")
-        .add_sql("SELECT b, count(distinct a) FROM fuzz_table GROUP BY b")
-        .table_name("fuzz_table")
-        .build();
-
-    fuzzer.run().await;
-}
-
-/// Fuzz test for `basic string aggr(count/count distinct/min/max)` + `group 
by single string`
-#[tokio::test(flavor = "multi_thread")]
-async fn test_basic_string_aggr_group_by_single_string() {
-    let builder = AggregationFuzzerBuilder::default();
-
-    // Define data generator config
+/// Return a standard set of columns for testing data generation
+///
+/// Includes numeric and string types
+///
+/// Does not include:
+/// 1. Floating point numbers
+/// 1. structured types
+fn baseline_config() -> DatasetGeneratorConfig {
     let columns = vec![
-        ColumnDescr::new("a", DataType::Utf8),
-        ColumnDescr::new("b", DataType::Utf8),
-        ColumnDescr::new("c", DataType::Int64),
-    ];
-    let sort_keys_set = vec![
-        vec!["b".to_string()],
-        vec!["c".to_string(), "b".to_string()],
+        ColumnDescr::new("i8", DataType::Int8),
+        ColumnDescr::new("i16", DataType::Int16),
+        ColumnDescr::new("i32", DataType::Int32),
+        ColumnDescr::new("i64", DataType::Int64),
+        ColumnDescr::new("u8", DataType::UInt8),
+        ColumnDescr::new("u16", DataType::UInt16),
+        ColumnDescr::new("u32", DataType::UInt32),
+        ColumnDescr::new("u64", DataType::UInt64),
+        // TODO: date/time columns
+        // todo decimal columns
+        // begin string columns
+        ColumnDescr::new("utf8", DataType::Utf8),
+        ColumnDescr::new("largeutf8", DataType::LargeUtf8),
+        // TODO add support for utf8view in data generator
+        // ColumnDescr::new("utf8view", DataType::Utf8View),
+        // todo binary
     ];
-    let data_gen_config = DatasetGeneratorConfig {
-        columns,
-        rows_num_range: (512, 1024),
-        sort_keys_set,
-    };
-
-    // Build fuzzer
-    let fuzzer = builder
-        .data_gen_config(data_gen_config)
-        .data_gen_rounds(16)
-        .add_sql("SELECT b, max(a) FROM fuzz_table GROUP BY b")
-        .add_sql("SELECT b, min(a) FROM fuzz_table GROUP BY b")
-        .add_sql("SELECT b, count(a) FROM fuzz_table GROUP BY b")
-        .add_sql("SELECT b, count(distinct a) FROM fuzz_table GROUP BY b")
-        .table_name("fuzz_table")
-        .build();
-
-    fuzzer.run().await;
-}
 
-/// Fuzz test for `basic string aggr(count/count distinct/min/max)` + `group 
by string + int64`
-#[tokio::test(flavor = "multi_thread")]
-async fn test_basic_string_aggr_group_by_mixed_string_int64() {
-    let builder = AggregationFuzzerBuilder::default();
-
-    // Define data generator config
-    let columns = vec![
-        ColumnDescr::new("a", DataType::Utf8),
-        ColumnDescr::new("b", DataType::Utf8),
-        ColumnDescr::new("c", DataType::Int64),
-        ColumnDescr::new("d", DataType::Int32),
-    ];
-    let sort_keys_set = vec![
-        vec!["b".to_string(), "c".to_string()],
-        vec!["d".to_string(), "b".to_string(), "c".to_string()],
-    ];
-    let data_gen_config = DatasetGeneratorConfig {
+    DatasetGeneratorConfig {
         columns,
         rows_num_range: (512, 1024),
-        sort_keys_set,
-    };
-
-    // Build fuzzer
-    let fuzzer = builder
-        .data_gen_config(data_gen_config)
-        .data_gen_rounds(16)
-        .add_sql("SELECT b, c, max(a) FROM fuzz_table GROUP BY b, c")
-        .add_sql("SELECT b, c, min(a) FROM fuzz_table GROUP BY b, c")
-        .add_sql("SELECT b, c, count(a) FROM fuzz_table GROUP BY b, c")
-        .add_sql("SELECT b, c, count(distinct a) FROM fuzz_table GROUP BY b, 
c")
-        .table_name("fuzz_table")
-        .build();
-
-    fuzzer.run().await;
+        sort_keys_set: vec![
+            // low cardinality to try and get many repeated runs
+            vec![String::from("u8")],
+            vec![String::from("utf8"), String::from("u8")],
+        ],
+    }
 }
 
 // ========================================================================
 //  The old aggregation fuzz tests
 // ========================================================================
+
 /// Tracks if this stream is generating input or output
 /// Tests that streaming aggregate and batch (non streaming) aggregate produce
 /// same results
@@ -353,7 +204,7 @@ async fn streaming_aggregate_test() {
         vec!["d", "c", "a"],
         vec!["d", "c", "b", "a"],
     ];
-    let n = 300;
+    let n = 10;
     let distincts = vec![10, 20];
     for distinct in distincts {
         let mut join_set = JoinSet::new();
diff --git 
a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs 
b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs
index 9d45779295..44f96d5a1a 100644
--- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs
+++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/data_generator.rs
@@ -48,16 +48,41 @@ use test_utils::{
 ///
 #[derive(Debug, Clone)]
 pub struct DatasetGeneratorConfig {
-    // Descriptions of columns in datasets, it's `required`
+    /// Descriptions of columns in datasets, it's `required`
     pub columns: Vec<ColumnDescr>,
 
-    // Rows num range of the generated datasets, it's `required`
+    /// Rows num range of the generated datasets, it's `required`
     pub rows_num_range: (usize, usize),
 
-    // Sort keys used to generate the sorted data set, it's optional
+    /// Additional optional sort keys
+    ///
+    /// The generated datasets always include a non-sorted copy. For each
+    /// element in `sort_keys_set`, an additional datasets is created that
+    /// is sorted by these values as well.
     pub sort_keys_set: Vec<Vec<String>>,
 }
 
+impl DatasetGeneratorConfig {
+    /// return a list of all column names
+    pub fn all_columns(&self) -> Vec<&str> {
+        self.columns.iter().map(|d| d.name.as_str()).collect()
+    }
+
+    /// return a list of column names that are "numeric"
+    pub fn numeric_columns(&self) -> Vec<&str> {
+        self.columns
+            .iter()
+            .filter_map(|d| {
+                if d.column_type.is_numeric() {
+                    Some(d.name.as_str())
+                } else {
+                    None
+                }
+            })
+            .collect()
+    }
+}
+
 /// Dataset generator
 ///
 /// It will generate one random [`Dataset`]s when `generate` function is 
called.
@@ -96,7 +121,7 @@ impl DatasetGenerator {
     pub fn generate(&self) -> Result<Vec<Dataset>> {
         let mut datasets = Vec::with_capacity(self.sort_keys_set.len() + 1);
 
-        // Generate the base batch
+        // Generate the base batch (unsorted)
         let base_batch = self.batch_generator.generate()?;
         let batches = stagger_batch(base_batch.clone());
         let dataset = Dataset::new(batches, Vec::new());
@@ -362,7 +387,9 @@ impl RecordBatchGenerator {
             DataType::LargeUtf8 => {
                 generate_string_array!(self, num_rows, batch_gen_rng, 
array_gen_rng, i64)
             }
-            _ => unreachable!(),
+            _ => {
+                panic!("Unsupported data generator type: {data_type}")
+            }
         }
     }
 }
diff --git a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs 
b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs
index 6daebc8942..898d1081ff 100644
--- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs
+++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::HashSet;
 use std::sync::Arc;
 
 use arrow::util::pretty::pretty_format_batches;
@@ -61,7 +62,16 @@ impl AggregationFuzzerBuilder {
         }
     }
 
-    pub fn add_sql(mut self, sql: &str) -> Self {
+    /// Adds random SQL queries to the fuzzer along with the table name
+    pub fn add_query_builder(mut self, query_builder: QueryBuilder) -> Self {
+        const NUM_QUERIES: usize = 10;
+        for _ in 0..NUM_QUERIES {
+            self = self.add_sql(&query_builder.generate_query());
+        }
+        self.table_name(query_builder.table_name())
+    }
+
+    fn add_sql(mut self, sql: &str) -> Self {
         self.candidate_sqls.push(Arc::from(sql));
         self
     }
@@ -76,11 +86,6 @@ impl AggregationFuzzerBuilder {
         self
     }
 
-    pub fn data_gen_rounds(mut self, data_gen_rounds: usize) -> Self {
-        self.data_gen_rounds = data_gen_rounds;
-        self
-    }
-
     pub fn build(self) -> AggregationFuzzer {
         assert!(!self.candidate_sqls.is_empty());
         let candidate_sqls = self.candidate_sqls;
@@ -99,12 +104,18 @@ impl AggregationFuzzerBuilder {
     }
 }
 
-impl Default for AggregationFuzzerBuilder {
+impl std::default::Default for AggregationFuzzerBuilder {
     fn default() -> Self {
         Self::new()
     }
 }
 
+impl From<DatasetGeneratorConfig> for AggregationFuzzerBuilder {
+    fn from(value: DatasetGeneratorConfig) -> Self {
+        Self::default().data_gen_config(value)
+    }
+}
+
 /// AggregationFuzzer randomly generating multiple [`AggregationFuzzTestTask`],
 /// and running them to check the correctness of the optimizations
 /// (e.g. sorted, partial skipping, spilling...)
@@ -169,6 +180,10 @@ impl AggregationFuzzer {
                 })
                 .collect::<Vec<_>>();
 
+            for q in &query_groups {
+                println!(" Testing with query {}", q.sql);
+            }
+
             let tasks = self.generate_fuzz_tasks(query_groups).await;
             for task in tasks {
                 join_set.spawn(async move { task.run().await });
@@ -270,20 +285,27 @@ impl AggregationFuzzTestTask {
         check_equality_of_batches(task_result, expected_result).map_err(|e| {
             // If we found inconsistent result, we print the test details for 
reproducing at first
             let message = format!(
-                "{}\n\
-                     ### Inconsistent row:\n\
-                     - row_idx:{}\n\
-                     - task_row:{}\n\
-                     - expected_row:{}\n\
-                     ### Task total result:\n{}\n\
-                     ### Expected total result:\n{}\n\
-                     ",
-                self.context_error_report(),
+                "##### AggregationFuzzer error report #####\n\
+                 ### Sql:\n{}\n\
+                 ### Schema:\n{}\n\
+                 ### Session context params:\n{:?}\n\
+                 ### Inconsistent row:\n\
+                 - row_idx:{}\n\
+                 - task_row:{}\n\
+                 - expected_row:{}\n\
+                 ### Task total result:\n{}\n\
+                 ### Expected total result:\n{}\n\
+                 ### Input:\n{}\n\
+                 ",
+                self.sql,
+                self.dataset_ref.batches[0].schema_ref(),
+                self.ctx_with_params.params,
                 e.row_idx,
                 e.lhs_row,
                 e.rhs_row,
-                pretty_format_batches(task_result).unwrap(),
-                pretty_format_batches(expected_result).unwrap(),
+                format_batches_with_limit(task_result),
+                format_batches_with_limit(expected_result),
+                format_batches_with_limit(&self.dataset_ref.batches),
             );
             DataFusionError::Internal(message)
         })
@@ -305,3 +327,182 @@ impl AggregationFuzzTestTask {
         )
     }
 }
+
+/// Pretty prints the `RecordBatch`es, limited to the first 100 rows
+fn format_batches_with_limit(batches: &[RecordBatch]) -> impl 
std::fmt::Display {
+    const MAX_ROWS: usize = 100;
+    let mut row_count = 0;
+    let to_print = batches
+        .iter()
+        .filter_map(|b| {
+            if row_count >= MAX_ROWS {
+                None
+            } else if row_count + b.num_rows() > MAX_ROWS {
+                // output last rows before limit
+                let slice_len = MAX_ROWS - row_count;
+                let b = b.slice(0, slice_len);
+                row_count += slice_len;
+                Some(b)
+            } else {
+                row_count += b.num_rows();
+                Some(b.clone())
+            }
+        })
+        .collect::<Vec<_>>();
+
+    pretty_format_batches(&to_print).unwrap()
+}
+
+/// Random aggregate query builder
+///
+/// Creates queries like
+/// ```sql
+/// SELECT AGG(..) FROM table_name GROUP BY <group_by_columns>
+///```
+#[derive(Debug, Default)]
+pub struct QueryBuilder {
+    /// The name of the table to query
+    table_name: String,
+    /// Aggregate functions to be used in the query
+    /// (function_name, is_distinct)
+    aggregate_functions: Vec<(String, bool)>,
+    /// Columns to be used in group by
+    group_by_columns: Vec<String>,
+    /// Possible columns for arguments in the aggregate functions
+    ///
+    /// Assumes each
+    arguments: Vec<String>,
+}
+impl QueryBuilder {
+    pub fn new() -> Self {
+        std::default::Default::default()
+    }
+
+    /// return the table name if any
+    pub fn table_name(&self) -> &str {
+        &self.table_name
+    }
+
+    /// Set the table name for the query builder
+    pub fn with_table_name(mut self, table_name: impl Into<String>) -> Self {
+        self.table_name = table_name.into();
+        self
+    }
+
+    /// Add a new possible aggregate function to the query builder
+    pub fn with_aggregate_function(
+        mut self,
+        aggregate_function: impl Into<String>,
+    ) -> Self {
+        self.aggregate_functions
+            .push((aggregate_function.into(), false));
+        self
+    }
+
+    /// Add a new possible `DISTINCT` aggregate function to the query
+    ///
+    /// This is different than `with_aggregate_function` because only certain
+    /// aggregates support `DISTINCT`
+    pub fn with_distinct_aggregate_function(
+        mut self,
+        aggregate_function: impl Into<String>,
+    ) -> Self {
+        self.aggregate_functions
+            .push((aggregate_function.into(), true));
+        self
+    }
+
+    /// Add a column to be used in the group bys
+    pub fn with_group_by_columns<'a>(
+        mut self,
+        group_by: impl IntoIterator<Item = &'a str>,
+    ) -> Self {
+        let group_by = group_by.into_iter().map(String::from);
+        self.group_by_columns.extend(group_by);
+        self
+    }
+
+    /// Add a column to be used as an argument in the aggregate functions
+    pub fn with_aggregate_arguments<'a>(
+        mut self,
+        arguments: impl IntoIterator<Item = &'a str>,
+    ) -> Self {
+        let arguments = arguments.into_iter().map(String::from);
+        self.arguments.extend(arguments);
+        self
+    }
+
+    pub fn generate_query(&self) -> String {
+        let group_by = self.random_group_by();
+        let mut query = String::from("SELECT ");
+        query.push_str(&self.random_aggregate_functions().join(", "));
+        query.push_str(" FROM ");
+        query.push_str(&self.table_name);
+        if !group_by.is_empty() {
+            query.push_str(" GROUP BY ");
+            query.push_str(&group_by.join(", "));
+        }
+        query
+    }
+
+    /// Generate a some random aggregate function invocations (potentially 
repeating).
+    ///
+    /// Each aggregate function invocation is of the form
+    ///
+    /// ```sql
+    /// function_name(<DISTINCT> argument) as alias
+    /// ```
+    ///
+    /// where
+    /// * `function_names` are randomly selected from 
[`Self::aggregate_functions`]
+    /// * `<DISTINCT> argument` is randomly selected from [`Self::arguments`]
+    /// * `alias` is a unique alias `colN` for the column (to avoid duplicate 
column names)
+    fn random_aggregate_functions(&self) -> Vec<String> {
+        const MAX_NUM_FUNCTIONS: usize = 5;
+        let mut rng = thread_rng();
+        let num_aggregate_functions = rng.gen_range(1..MAX_NUM_FUNCTIONS);
+
+        let mut alias_gen = 1;
+
+        let mut aggregate_functions = vec![];
+        while aggregate_functions.len() < num_aggregate_functions {
+            let idx = rng.gen_range(0..self.aggregate_functions.len());
+            let (function_name, is_distinct) = &self.aggregate_functions[idx];
+            let argument = self.random_argument();
+            let alias = format!("col{}", alias_gen);
+            let distinct = if *is_distinct { "DISTINCT " } else { "" };
+            alias_gen += 1;
+            let function = format!("{function_name}({distinct}{argument}) as 
{alias}");
+            aggregate_functions.push(function);
+        }
+        aggregate_functions
+    }
+
+    /// Pick a random aggregate function argument
+    fn random_argument(&self) -> String {
+        let mut rng = thread_rng();
+        let idx = rng.gen_range(0..self.arguments.len());
+        self.arguments[idx].clone()
+    }
+
+    /// Pick a random number of fields to group by (non-repeating)
+    ///
+    /// Limited to 3 group by columns to ensure coverage for large groups. With
+    /// larger numbers of columns, each group has many fewer values.
+    fn random_group_by(&self) -> Vec<String> {
+        let mut rng = thread_rng();
+        const MAX_GROUPS: usize = 3;
+        let max_groups = self.group_by_columns.len().max(MAX_GROUPS);
+        let num_group_by = rng.gen_range(1..max_groups);
+
+        let mut already_used = HashSet::new();
+        let mut group_by = vec![];
+        while group_by.len() < num_group_by {
+            let idx = rng.gen_range(0..self.group_by_columns.len());
+            if already_used.insert(idx) {
+                group_by.push(self.group_by_columns[idx].clone());
+            }
+        }
+        group_by
+    }
+}
diff --git a/test-utils/src/string_gen.rs b/test-utils/src/string_gen.rs
index 725eb22b85..b598241db1 100644
--- a/test-utils/src/string_gen.rs
+++ b/test-utils/src/string_gen.rs
@@ -62,7 +62,7 @@ impl StringBatchGenerator {
         let mut cases = vec![];
         let mut rng = thread_rng();
         for null_pct in [0.0, 0.01, 0.1, 0.5] {
-            for _ in 0..100 {
+            for _ in 0..10 {
                 // max length of generated strings
                 let max_len = rng.gen_range(1..50);
                 let num_strings = rng.gen_range(1..100);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to