alamb commented on code in PR #9190:
URL: https://github.com/apache/arrow-datafusion/pull/9190#discussion_r1485193590


##########
datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs:
##########
@@ -50,18 +61,18 @@ async fn aggregate_test() {
     let n = 300;
     let distincts = vec![10, 20];
     for distinct in distincts {
-        let mut handles = Vec::new();
+        let mut join_set = JoinSet::new();

Review Comment:
   Using `JoinSet` for consistency -- it automatically cancels outstanding tasks on panic.
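
   For reference, a minimal sketch of the pattern (not the PR's exact test code), assuming the standard `tokio::task::JoinSet` API: tasks are spawned onto the set and awaited with `join_next`; unwrapping the `JoinError` propagates any task panic, and dropping the `JoinSet` (for example when that unwrap panics) aborts the still-outstanding tasks.

```rust
use tokio::task::JoinSet;

#[tokio::test(flavor = "multi_thread")]
async fn joinset_pattern() {
    let mut join_set = JoinSet::new();
    for case in 0..4 {
        // hypothetical per-case body standing in for the real fuzz run
        join_set.spawn(async move { assert!(case < 4) });
    }
    while let Some(result) = join_set.join_next().await {
        // propagate panics from the spawned tasks
        result.unwrap();
    }
}
```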



##########
datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs:
##########
@@ -22,21 +22,32 @@ use arrow::compute::{concat_batches, SortOptions};
 use arrow::datatypes::DataType;
 use arrow::record_batch::RecordBatch;
 use arrow::util::pretty::pretty_format_batches;
-use datafusion::physical_plan::aggregates::{
-    AggregateExec, AggregateMode, PhysicalGroupBy,
-};
+use arrow_array::cast::AsArray;
+use arrow_array::types::Int64Type;
+use arrow_array::Array;
+use hashbrown::HashMap;
 use rand::rngs::StdRng;
 use rand::{Rng, SeedableRng};
+use tokio::task::JoinSet;
 
+use datafusion::common::Result;
+use datafusion::datasource::MemTable;
+use datafusion::physical_plan::aggregates::{
+    AggregateExec, AggregateMode, PhysicalGroupBy,
+};
 use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::{collect, displayable, ExecutionPlan};
-use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion::prelude::{DataFrame, SessionConfig, SessionContext};
+use datafusion_common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion};
 use datafusion_physical_expr::expressions::{col, Sum};
 use datafusion_physical_expr::{AggregateExpr, PhysicalSortExpr};
-use test_utils::add_empty_batches;
+use datafusion_physical_plan::InputOrderMode;
+use test_utils::{add_empty_batches, StringBatchGenerator};
 
-#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
-async fn aggregate_test() {
+/// Tests that streaming aggregate and batch (non streaming) aggregate produce
+/// same results
+#[tokio::test(flavor = "multi_thread")]

Review Comment:
   There is no reason I know of to limit this to 8 threads. Using the multi-threaded flavor without an explicit `worker_threads` lets the runtime use more cores if available.
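
   A small illustration of the difference (function names here are illustrative); Tokio's default multi-threaded runtime sizes its worker pool to the available CPUs:

```rust
// Previously: the test runtime was capped at 8 worker threads.
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn aggregate_test_capped() {}

// Now: the multi-threaded runtime defaults to one worker per available core.
#[tokio::test(flavor = "multi_thread")]
async fn aggregate_test_uncapped() {}
```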



##########
datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs:
##########
@@ -234,3 +245,158 @@ pub(crate) fn make_staggered_batches<const STREAM: bool>(
     }
     add_empty_batches(batches, &mut rng)
 }
+
+/// Test group by with string/large string columns
+#[tokio::test(flavor = "multi_thread")]
+async fn group_by_strings() {
+    let mut join_set = JoinSet::new();
+    for large in [true, false] {
+        for sorted in [true, false] {
+            for generator in StringBatchGenerator::interesting_cases() {
+                join_set.spawn(group_by_string_test(generator, sorted, large));
+            }
+        }
+    }
+    while let Some(join_handle) = join_set.join_next().await {
+        // propagate errors
+        join_handle.unwrap();
+    }
+}
+
+/// Run GROUP BY <x> using SQL and ensure the results are correct
+///
+/// If sorted is true, the input batches will be sorted by the group by column
+/// to test the streaming group by case
+///
+/// if large is true, the input batches will be LargeStringArray
+async fn group_by_string_test(

Review Comment:
   Here is the new test.
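
   As a rough sketch of the strategy the doc comment describes (this is not the code added in the PR; `run_query`, `assert_same_results`, and the group column name `a` are hypothetical): run the same GROUP BY query over the same rows provided once as a single `MemTable` partition and once split into many (optionally sorted) batches, then compare the normalized output.

```rust
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use datafusion::common::Result;
use datafusion::datasource::MemTable;
use datafusion::prelude::SessionContext;

/// Run `sql` against a single table `t` backed by `batches`.
async fn run_query(batches: Vec<RecordBatch>, sql: &str) -> Result<Vec<RecordBatch>> {
    let schema = batches[0].schema();
    let ctx = SessionContext::new();
    let table = MemTable::try_new(schema, vec![batches])?;
    ctx.register_table("t", Arc::new(table))?;
    ctx.sql(sql).await?.collect().await
}

/// Hypothetical harness: `single` and `split` hold the same rows, but the
/// second is chopped into many small batches (and, for the streaming case,
/// sorted on the group column).
async fn assert_same_results(single: Vec<RecordBatch>, split: Vec<RecordBatch>) -> Result<()> {
    let sql = "SELECT a, COUNT(*) FROM t GROUP BY a ORDER BY a";
    let expected = pretty_format_batches(&run_query(single, sql).await?)?.to_string();
    let actual = pretty_format_batches(&run_query(split, sql).await?)?.to_string();
    assert_eq!(expected, actual);
    Ok(())
}
```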



##########
datafusion/core/tests/fuzz_cases/distinct_count_string_fuzz.rs:
##########
@@ -19,43 +19,22 @@
 
 use std::sync::Arc;
 
-use arrow::array::ArrayRef;
 use arrow::record_batch::RecordBatch;
-use arrow_array::{Array, GenericStringArray, OffsetSizeTrait, UInt32Array};
+use arrow_array::{Array, OffsetSizeTrait};
 
 use arrow_array::cast::AsArray;
 use datafusion::datasource::MemTable;
-use rand::rngs::StdRng;
-use rand::{thread_rng, Rng, SeedableRng};
 use std::collections::HashSet;
 use tokio::task::JoinSet;
 
 use datafusion::prelude::{SessionConfig, SessionContext};
-use test_utils::stagger_batch;
+use test_utils::StringBatchGenerator;
 
 #[tokio::test(flavor = "multi_thread")]
 async fn distinct_count_string_test() {
-    // max length of generated strings
     let mut join_set = JoinSet::new();
-    let mut rng = thread_rng();
-    for null_pct in [0.0, 0.01, 0.1, 0.5] {
-        for _ in 0..100 {
-            let max_len = rng.gen_range(1..50);
-            let num_strings = rng.gen_range(1..100);
-            let num_distinct_strings = if num_strings > 1 {
-                rng.gen_range(1..num_strings)
-            } else {
-                num_strings
-            };
-            let generator = BatchGenerator {
-                max_len,
-                num_strings,
-                num_distinct_strings,
-                null_pct,
-                rng: StdRng::from_seed(rng.gen()),
-            };
-            join_set.spawn(async move { run_distinct_count_test(generator).await });
-        }
+    for generator in StringBatchGenerator::interesting_cases() {

Review Comment:
   Refactored `StringBatchGenerator` into a common location (`test_utils`) so it can be shared between the fuzz tests.
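
   For context, a plausible shape for the shared helper, reconstructed from the inline generation code removed in this hunk; the actual implementation in `test_utils` may differ, and only the case enumeration is shown (the batch-building methods are omitted).

```rust
use rand::rngs::StdRng;
use rand::{thread_rng, Rng, SeedableRng};

pub struct StringBatchGenerator {
    pub max_len: usize,
    pub num_strings: usize,
    pub num_distinct_strings: usize,
    pub null_pct: f64,
    pub rng: StdRng,
}

impl StringBatchGenerator {
    /// Return generators covering "interesting" combinations of string
    /// length, cardinality, and null fraction.
    pub fn interesting_cases() -> Vec<Self> {
        let mut cases = vec![];
        let mut rng = thread_rng();
        for null_pct in [0.0, 0.01, 0.1, 0.5] {
            for _ in 0..100 {
                let max_len = rng.gen_range(1..50);
                let num_strings = rng.gen_range(1..100);
                let num_distinct_strings = if num_strings > 1 {
                    rng.gen_range(1..num_strings)
                } else {
                    num_strings
                };
                cases.push(Self {
                    max_len,
                    num_strings,
                    num_distinct_strings,
                    null_pct,
                    rng: StdRng::from_seed(rng.gen()),
                });
            }
        }
        cases
    }
}
```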


