martin-g commented on code in PR #21575:
URL: https://github.com/apache/datafusion/pull/21575#discussion_r3079916419


##########
datafusion/functions-aggregate/benches/count_distinct.rs:
##########
@@ -150,5 +174,101 @@ fn count_distinct_benchmark(c: &mut Criterion) {
     });
 }
 
-criterion_group!(benches, count_distinct_benchmark);
+/// Create group indices with uniform distribution
+fn create_uniform_groups(num_groups: usize) -> Vec<usize> {
+    let mut rng = StdRng::seed_from_u64(42);
+    (0..BATCH_SIZE)
+        .map(|_| rng.random_range(0..num_groups))
+        .collect()
+}
+
+/// Create group indices with skewed distribution (80% in 20% of groups)
+fn create_skewed_groups(num_groups: usize) -> Vec<usize> {
+    let mut rng = StdRng::seed_from_u64(42);
+    let hot_groups = (num_groups / 5).max(1);
+    (0..BATCH_SIZE)
+        .map(|_| {
+            if rng.random_range(0..100) < 80 {
+                rng.random_range(0..hot_groups)
+            } else {
+                rng.random_range(0..num_groups)
+            }
+        })
+        .collect()
+}
+
+fn count_distinct_groups_benchmark(c: &mut Criterion) {
+    let count_fn = Count::new();
+
+    // bench different scenarios
+    let scenarios = [
+        // (name, num_groups, distinct_pct, group_fn)
+        ("sparse_uniform", 10, 80, "uniform"),
+        ("moderate_uniform", 100, 80, "uniform"),
+        ("dense_uniform", 1000, 80, "uniform"),
+        ("sparse_skewed", 10, 80, "skewed"),
+        ("dense_skewed", 1000, 80, "skewed"),
+        ("sparse_high_cardinality", 10, 99, "uniform"),
+        ("dense_low_cardinality", 1000, 20, "uniform"),
+    ];
+
+    for (name, num_groups, distinct_pct, group_type) in scenarios {
+        let n_distinct = BATCH_SIZE * distinct_pct / 100;
+        let values = Arc::new(create_i64_array(n_distinct)) as ArrayRef;
+        let group_indices = if group_type == "uniform" {
+            create_uniform_groups(num_groups)
+        } else {
+            create_skewed_groups(num_groups)
+        };
+
+        let (_schema, args) = prepare_args(DataType::Int64);
+
+        if count_fn.groups_accumulator_supported(args.clone()) {
+            c.bench_function(&format!("count_distinct_groups {name}"), |b| {
+                b.iter(|| {
+                    let mut acc =
+                        
count_fn.create_groups_accumulator(args.clone()).unwrap();
+                    acc.update_batch(
+                        std::slice::from_ref(&values),
+                        &group_indices,
+                        None,
+                        num_groups,
+                    )
+                    .unwrap();
+                    acc.evaluate(EmitTo::All).unwrap()
+                })
+            });
+        } else {
+            c.bench_function(&format!("count_distinct_groups {name}"), |b| {
+                b.iter(|| {
+                    let mut accumulators: Vec<_> = (0..num_groups)
+                        .map(|_| prepare_accumulator(DataType::Int64))
+                        .collect();
+
+                    let arr = 
values.as_any().downcast_ref::<Int64Array>().unwrap();
+                    for (idx, group_idx) in group_indices.iter().enumerate() {
+                        if let Some(val) = arr.value(idx).into() {
+                            let single_val =
+                                Arc::new(Int64Array::from(vec![Some(val)])) as 
ArrayRef;
+                            accumulators[*group_idx]
+                                
.update_batch(std::slice::from_ref(&single_val))
+                                .unwrap();
+                        }

Review Comment:
   ```suggestion
                           let single_val = values.slice(idx, 1);
                           accumulators[*group_idx]
                               .update_batch(std::slice::from_ref(&single_val))
                               .unwrap();
   ```
   This is slightly simpler and avoids building a new single-value array for
   each row (`slice` is zero-copy).



##########
datafusion/functions-aggregate/benches/count_distinct.rs:
##########
@@ -150,5 +174,101 @@ fn count_distinct_benchmark(c: &mut Criterion) {
     });
 }
 
-criterion_group!(benches, count_distinct_benchmark);
+/// Create group indices with uniform distribution
+fn create_uniform_groups(num_groups: usize) -> Vec<usize> {
+    let mut rng = StdRng::seed_from_u64(42);
+    (0..BATCH_SIZE)
+        .map(|_| rng.random_range(0..num_groups))
+        .collect()
+}
+
+/// Create group indices with skewed distribution (80% in 20% of groups)
+fn create_skewed_groups(num_groups: usize) -> Vec<usize> {
+    let mut rng = StdRng::seed_from_u64(42);
+    let hot_groups = (num_groups / 5).max(1);
+    (0..BATCH_SIZE)
+        .map(|_| {
+            if rng.random_range(0..100) < 80 {
+                rng.random_range(0..hot_groups)
+            } else {
+                rng.random_range(0..num_groups)
+            }
+        })
+        .collect()
+}
+
+fn count_distinct_groups_benchmark(c: &mut Criterion) {
+    let count_fn = Count::new();
+
+    // bench different scenarios
+    let scenarios = [
+        // (name, num_groups, distinct_pct, group_fn)
+        ("sparse_uniform", 10, 80, "uniform"),
+        ("moderate_uniform", 100, 80, "uniform"),
+        ("dense_uniform", 1000, 80, "uniform"),
+        ("sparse_skewed", 10, 80, "skewed"),
+        ("dense_skewed", 1000, 80, "skewed"),
+        ("sparse_high_cardinality", 10, 99, "uniform"),
+        ("dense_low_cardinality", 1000, 20, "uniform"),
+    ];
+
+    for (name, num_groups, distinct_pct, group_type) in scenarios {
+        let n_distinct = BATCH_SIZE * distinct_pct / 100;
+        let values = Arc::new(create_i64_array(n_distinct)) as ArrayRef;
+        let group_indices = if group_type == "uniform" {
+            create_uniform_groups(num_groups)
+        } else {
+            create_skewed_groups(num_groups)
+        };
+
+        let (_schema, args) = prepare_args(DataType::Int64);
+
+        if count_fn.groups_accumulator_supported(args.clone()) {
+            c.bench_function(&format!("count_distinct_groups {name}"), |b| {
+                b.iter(|| {
+                    let mut acc =
+                        
count_fn.create_groups_accumulator(args.clone()).unwrap();
+                    acc.update_batch(
+                        std::slice::from_ref(&values),
+                        &group_indices,
+                        None,
+                        num_groups,
+                    )
+                    .unwrap();
+                    acc.evaluate(EmitTo::All).unwrap()
+                })
+            });
+        } else {
+            c.bench_function(&format!("count_distinct_groups {name}"), |b| {
+                b.iter(|| {
+                    let mut accumulators: Vec<_> = (0..num_groups)
+                        .map(|_| prepare_accumulator(DataType::Int64))
+                        .collect();
+
+                    let arr = 
values.as_any().downcast_ref::<Int64Array>().unwrap();
+                    for (idx, group_idx) in group_indices.iter().enumerate() {
+                        if let Some(val) = arr.value(idx).into() {
+                            let single_val =
+                                Arc::new(Int64Array::from(vec![Some(val)])) as 
ArrayRef;
+                            accumulators[*group_idx]
+                                
.update_batch(std::slice::from_ref(&single_val))
+                                .unwrap();
+                        }

Review Comment:
   Another way would be to collect the values for each group first and then
   build one array per group:
   ```rust
   let mut group_rows: Vec<Vec<i64>> = vec![Vec::new(); num_groups];
   for (idx, &group_idx) in group_indices.iter().enumerate() {
       if arr.is_valid(idx) {
           group_rows[group_idx].push(arr.value(idx));
       }
   }
   for (group_idx, rows) in group_rows.iter().enumerate() {
       if !rows.is_empty() {
           let batch = Arc::new(Int64Array::from(rows.clone())) as ArrayRef;
           accumulators[group_idx].update_batch(std::slice::from_ref(&batch)).unwrap();
       }
   }
   ```



##########
datafusion/functions-aggregate/benches/count_distinct.rs:
##########
@@ -150,5 +174,101 @@ fn count_distinct_benchmark(c: &mut Criterion) {
     });
 }
 
-criterion_group!(benches, count_distinct_benchmark);
+/// Create group indices with uniform distribution
+fn create_uniform_groups(num_groups: usize) -> Vec<usize> {
+    let mut rng = StdRng::seed_from_u64(42);
+    (0..BATCH_SIZE)
+        .map(|_| rng.random_range(0..num_groups))
+        .collect()
+}
+
+/// Create group indices with skewed distribution (80% in 20% of groups)
+fn create_skewed_groups(num_groups: usize) -> Vec<usize> {
+    let mut rng = StdRng::seed_from_u64(42);
+    let hot_groups = (num_groups / 5).max(1);
+    (0..BATCH_SIZE)
+        .map(|_| {
+            if rng.random_range(0..100) < 80 {
+                rng.random_range(0..hot_groups)
+            } else {
+                rng.random_range(0..num_groups)
+            }
+        })
+        .collect()
+}
+
+fn count_distinct_groups_benchmark(c: &mut Criterion) {
+    let count_fn = Count::new();
+
+    // bench different scenarios
+    let scenarios = [
+        // (name, num_groups, distinct_pct, group_fn)
+        ("sparse_uniform", 10, 80, "uniform"),
+        ("moderate_uniform", 100, 80, "uniform"),
+        ("dense_uniform", 1000, 80, "uniform"),
+        ("sparse_skewed", 10, 80, "skewed"),
+        ("dense_skewed", 1000, 80, "skewed"),
+        ("sparse_high_cardinality", 10, 99, "uniform"),
+        ("dense_low_cardinality", 1000, 20, "uniform"),
+    ];
+
+    for (name, num_groups, distinct_pct, group_type) in scenarios {
+        let n_distinct = BATCH_SIZE * distinct_pct / 100;
+        let values = Arc::new(create_i64_array(n_distinct)) as ArrayRef;
+        let group_indices = if group_type == "uniform" {
+            create_uniform_groups(num_groups)
+        } else {
+            create_skewed_groups(num_groups)
+        };
+
+        let (_schema, args) = prepare_args(DataType::Int64);
+
+        if count_fn.groups_accumulator_supported(args.clone()) {
+            c.bench_function(&format!("count_distinct_groups {name}"), |b| {
+                b.iter(|| {
+                    let mut acc =
+                        
count_fn.create_groups_accumulator(args.clone()).unwrap();
+                    acc.update_batch(
+                        std::slice::from_ref(&values),
+                        &group_indices,
+                        None,
+                        num_groups,
+                    )
+                    .unwrap();
+                    acc.evaluate(EmitTo::All).unwrap()
+                })
+            });
+        } else {
+            c.bench_function(&format!("count_distinct_groups {name}"), |b| {
+                b.iter(|| {
+                    let mut accumulators: Vec<_> = (0..num_groups)
+                        .map(|_| prepare_accumulator(DataType::Int64))
+                        .collect();
+
+                    let arr = 
values.as_any().downcast_ref::<Int64Array>().unwrap();
+                    for (idx, group_idx) in group_indices.iter().enumerate() {
+                        if let Some(val) = arr.value(idx).into() {

Review Comment:
   Why do you need an `Option` here?
   `arr.value(idx)` returns an `i64`, so calling `.into()` always returns `Some` and this branch is always taken.
   If you want to filter out nulls, check `arr.is_null(idx)` (or `arr.is_valid(idx)`) instead.



##########
datafusion/functions-aggregate/benches/count_distinct.rs:
##########
@@ -150,5 +174,101 @@ fn count_distinct_benchmark(c: &mut Criterion) {
     });
 }
 
-criterion_group!(benches, count_distinct_benchmark);
+/// Create group indices with uniform distribution
+fn create_uniform_groups(num_groups: usize) -> Vec<usize> {
+    let mut rng = StdRng::seed_from_u64(42);
+    (0..BATCH_SIZE)
+        .map(|_| rng.random_range(0..num_groups))
+        .collect()
+}
+
+/// Create group indices with skewed distribution (80% in 20% of groups)
+fn create_skewed_groups(num_groups: usize) -> Vec<usize> {
+    let mut rng = StdRng::seed_from_u64(42);
+    let hot_groups = (num_groups / 5).max(1);
+    (0..BATCH_SIZE)
+        .map(|_| {
+            if rng.random_range(0..100) < 80 {
+                rng.random_range(0..hot_groups)
+            } else {
+                rng.random_range(0..num_groups)
+            }
+        })
+        .collect()
+}
+
+fn count_distinct_groups_benchmark(c: &mut Criterion) {
+    let count_fn = Count::new();
+
+    // bench different scenarios
+    let scenarios = [
+        // (name, num_groups, distinct_pct, group_fn)
+        ("sparse_uniform", 10, 80, "uniform"),
+        ("moderate_uniform", 100, 80, "uniform"),
+        ("dense_uniform", 1000, 80, "uniform"),
+        ("sparse_skewed", 10, 80, "skewed"),
+        ("dense_skewed", 1000, 80, "skewed"),
+        ("sparse_high_cardinality", 10, 99, "uniform"),
+        ("dense_low_cardinality", 1000, 20, "uniform"),
+    ];
+
+    for (name, num_groups, distinct_pct, group_type) in scenarios {
+        let n_distinct = BATCH_SIZE * distinct_pct / 100;
+        let values = Arc::new(create_i64_array(n_distinct)) as ArrayRef;
+        let group_indices = if group_type == "uniform" {

Review Comment:
   nit: Introduce an enum for the group distribution instead of comparing strings:
   ```rust
   enum GroupDist {
       Uniform,
       Skewed,
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to