martin-g commented on code in PR #18712:
URL: https://github.com/apache/datafusion/pull/18712#discussion_r2539711382
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1239,3 +1270,123 @@ impl GroupedHashAggregateStream {
Ok(states_batch)
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::test::TestMemoryExec;
+ use arrow::array::{Int32Array, Int64Array};
+ use arrow::datatypes::{DataType, Field, Schema};
+ use datafusion_execution::runtime_env::RuntimeEnvBuilder;
+ use datafusion_execution::TaskContext;
+ use datafusion_functions_aggregate::count::count_udaf;
+ use datafusion_physical_expr::aggregate::AggregateExprBuilder;
+ use datafusion_physical_expr::expressions::col;
+ use std::sync::Arc;
+
+ #[tokio::test]
+ async fn test_double_emission_race_condition_bug() -> Result<()> {
+ // Fix for https://github.com/apache/datafusion/issues/18701
+ // This test specifically proves that we have fixed double emission race condition
+ // where emit_early_if_necessary() and switch_to_skip_aggregation()
+ // both emit in the same loop iteration, causing data loss
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("group_col", DataType::Int32, false),
+ Field::new("value_col", DataType::Int64, false),
+ ]));
+
+ // Create data that will trigger BOTH conditions in the same iteration:
+ // 1. More groups than batch_size (triggers early emission when memory pressure hits)
+ // 2. High cardinality ratio (triggers skip aggregation)
+ let batch_size = 1024; // We'll set this in session config
+ let num_groups = batch_size + 100; // Slightly more than batch_size (1124 groups)
+
+ // Create exactly 1 row per group = 100% cardinality ratio
+ let group_ids: Vec<i32> = (0..num_groups as i32).collect();
+ let values: Vec<i64> = vec![1; num_groups];
+
+ let batch = RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![
+ Arc::new(Int32Array::from(group_ids)),
+ Arc::new(Int64Array::from(values)),
+ ],
+ )?;
+
+ let input_partitions = vec![vec![batch]];
+
+ // Create constrained memory to trigger early emission but not completely fail
+ let runtime = RuntimeEnvBuilder::default()
+ .with_memory_limit(1024, 1.0) // 100KB - enough to start but will trigger pressure
Review Comment:
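1024 bytes != 100KB: `with_memory_limit(1024, 1.0)` sets a limit of 1024 bytes (1 KiB), so the inline comment and the value passed do not agree.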
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]