This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 107713faa6 fix: make skip_partial_aggregation_probe_ratio_threshold 
match the docs (#22752)
107713faa6 is described below

commit 107713faa61f54841221333238c71c963516e883
Author: Huaijin <[email protected]>
AuthorDate: Sat Jun 6 18:45:05 2026 +0800

    fix: make skip_partial_aggregation_probe_ratio_threshold match the docs 
(#22752)
    
    ## Which issue does this PR close?
    
    - Closes #.
    
    ## Rationale for this change
    
    The config `skip_partial_aggregation_probe_ratio_threshold` was
    documented as triggering skip when the ratio is **greater than** the
    threshold, but the code used `>=`. This meant setting the threshold to
    `1.0` (to disable the feature) still skipped rows when cardinality was
    exactly 100%.
    
    ## What changes are included in this PR?
    
    - Changed `>=` to `>` in the ratio comparison to match the docs.
    - Return `None` for `SkipAggregationProbe` when `probe_ratio_threshold
    >= 1.0`, effectively disabling the feature since the ratio can never
    exceed `1.0`.
    
    ## Are these changes tested?
    
    Yes. Added `test_skip_aggregation_disabled_at_threshold_one` which sets
    threshold to `1.0` with 100% cardinality input and asserts that no rows
    are skipped.
    
    ## Are there any user-facing changes?
    
    Yes. Setting `skip_partial_aggregation_probe_ratio_threshold = 1.0` now
    reliably disables skip aggregation, matching the documented behavior.
---
 datafusion/physical-plan/src/aggregates/mod.rs     | 84 ++++++++++++++++++++++
 .../physical-plan/src/aggregates/row_hash.rs       | 55 ++++++++++----
 2 files changed, 125 insertions(+), 14 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/mod.rs 
b/datafusion/physical-plan/src/aggregates/mod.rs
index c8b825d576..5a2080990e 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -3791,6 +3791,90 @@ mod tests {
         Ok(())
     }
 
+    /// When `skip_partial_aggregation_probe_ratio_threshold` is set to 1.0,
+    /// the feature must be effectively disabled: even with 100% cardinality
+    /// (every row is a unique group), no rows should be skipped.
+    #[tokio::test]
+    async fn test_skip_aggregation_disabled_at_threshold_one() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("key", DataType::Int32, true),
+            Field::new("val", DataType::Int32, true),
+        ]));
+
+        let group_by =
+            PhysicalGroupBy::new_single(vec![(col("key", &schema)?, 
"key".to_string())]);
+
+        let aggr_expr = vec![
+            AggregateExprBuilder::new(count_udaf(), vec![col("val", &schema)?])
+                .schema(Arc::clone(&schema))
+                .alias(String::from("COUNT(val)"))
+                .build()
+                .map(Arc::new)?,
+        ];
+
+        // Two batches are required: batch 1 triggers the probe threshold so 
the
+        // skip decision is evaluated; batch 2 is what would be skipped on main
+        // (where >= caused threshold=1.0 to still skip at 100% cardinality).
+        // All rows have unique keys => ratio = 1.0 (100% cardinality).
+        let input_data = vec![
+            // Batch 1: fires the probe check (ratio = 5/5 = 1.0)
+            RecordBatch::try_new(
+                Arc::clone(&schema),
+                vec![
+                    Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
+                    Arc::new(Int32Array::from(vec![0, 0, 0, 0, 0])),
+                ],
+            )
+            .unwrap(),
+            // Batch 2: would be skipped if threshold=1.0 did not disable the 
feature
+            RecordBatch::try_new(
+                Arc::clone(&schema),
+                vec![
+                    Arc::new(Int32Array::from(vec![6, 7, 8, 9, 10])),
+                    Arc::new(Int32Array::from(vec![0, 0, 0, 0, 0])),
+                ],
+            )
+            .unwrap(),
+        ];
+
+        let input =
+            TestMemoryExec::try_new_exec(&[input_data], Arc::clone(&schema), 
None)?;
+        let aggregate_exec = Arc::new(AggregateExec::try_new(
+            AggregateMode::Partial,
+            group_by,
+            aggr_expr,
+            vec![None],
+            Arc::clone(&input) as Arc<dyn ExecutionPlan>,
+            schema,
+        )?);
+
+        let session_config = SessionConfig::default()
+            .set(
+                
"datafusion.execution.skip_partial_aggregation_probe_rows_threshold",
+                &ScalarValue::Int64(Some(1)),
+            )
+            .set(
+                
"datafusion.execution.skip_partial_aggregation_probe_ratio_threshold",
+                &ScalarValue::Float64(Some(1.0)),
+            );
+
+        let ctx = TaskContext::default().with_session_config(session_config);
+        collect(aggregate_exec.execute(0, Arc::new(ctx))?).await?;
+
+        let metrics = aggregate_exec.metrics().unwrap();
+        let skipped_rows = metrics
+            .sum_by_name("skipped_aggregation_rows")
+            .map(|m| m.as_usize())
+            .unwrap_or(0);
+
+        assert_eq!(
+            skipped_rows, 0,
+            "threshold=1.0 should disable skip aggregation, but {skipped_rows} 
rows were skipped"
+        );
+
+        Ok(())
+    }
+
     #[test]
     fn group_exprs_nullable() -> Result<()> {
         let input_schema = Arc::new(Schema::new(vec![
diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs 
b/datafusion/physical-plan/src/aggregates/row_hash.rs
index 1164fb37b3..c3f73976c7 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -195,7 +195,7 @@ impl SkipAggregationProbe {
         self.num_groups = num_groups;
         if self.input_rows >= self.probe_rows_threshold {
             self.should_skip = self.num_groups as f64 / self.input_rows as f64
-                >= self.probe_ratio_threshold;
+                > self.probe_ratio_threshold;
             // Set is_locked to true only if we have decided to skip, 
otherwise we can try to skip
             // during processing the next record_batch.
             self.is_locked = self.should_skip;
@@ -644,14 +644,20 @@ impl GroupedHashAggregateStream {
                 options.skip_partial_aggregation_probe_rows_threshold;
             let probe_ratio_threshold =
                 options.skip_partial_aggregation_probe_ratio_threshold;
-            let skipped_aggregation_rows = MetricBuilder::new(&agg.metrics)
-                .with_category(MetricCategory::Rows)
-                .counter("skipped_aggregation_rows", partition);
-            Some(SkipAggregationProbe::new(
-                probe_rows_threshold,
-                probe_ratio_threshold,
-                skipped_aggregation_rows,
-            ))
+            // A threshold >= 1.0 means the ratio (num_groups / input_rows) can
+            // never exceed it, so the feature is effectively disabled.
+            if probe_ratio_threshold >= 1.0 {
+                None
+            } else {
+                let skipped_aggregation_rows = MetricBuilder::new(&agg.metrics)
+                    .with_category(MetricCategory::Rows)
+                    .counter("skipped_aggregation_rows", partition);
+                Some(SkipAggregationProbe::new(
+                    probe_rows_threshold,
+                    probe_ratio_threshold,
+                    skipped_aggregation_rows,
+                ))
+            }
         } else {
             None
         };
@@ -1630,11 +1636,11 @@ mod tests {
             ],
         )?;
 
-        // Batch 2: 350 rows with 350 unique NEW groups (starting from group 
10)
-        // After batch 2, total: 450 rows, 360 groups
-        // Ratio: 360/450 = 0.8 (80%) >= 0.8 -> SHOULD decide to skip
-        let batch2_rows = 350;
-        let batch2_groups = 350;
+        // Batch 2: 360 rows with 360 unique NEW groups (starting from group 
10)
+        // After batch 2, total: 460 rows, 370 groups
+        // Ratio: 370/460 ≈ 0.804 (80.4%) > 0.8 -> SHOULD decide to skip
+        let batch2_rows = 360;
+        let batch2_groups = 360;
         let group_ids_batch2: Vec<i32> = (batch1_groups..(batch1_groups + 
batch2_groups))
             .map(|x| x as i32)
             .collect();
@@ -1817,4 +1823,25 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_skip_aggregation_probe_equality_does_not_skip() {
+        // When num_groups / input_rows == probe_ratio_threshold, the `>` 
boundary
+        // means we must NOT skip — equality is not sufficient to trigger skip.
+        let threshold_ratio = 0.5_f64;
+        let threshold_rows = 10_usize;
+        let mut probe = SkipAggregationProbe::new(
+            threshold_rows,
+            threshold_ratio,
+            metrics::Count::new(),
+        );
+
+        // 10 rows, 5 groups → ratio = 5/10 = 0.5 exactly equals threshold
+        probe.update_state(10, 5);
+
+        assert!(
+            !probe.should_skip(),
+            "ratio == threshold should not trigger skip (boundary is 
exclusive)"
+        );
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to