This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch branch-50
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/branch-50 by this push:
     new 4840c8aa8f fix: Partial AggregateMode will generate duplicate field names which will fail DFSchema construct (#17706) (#17717)
4840c8aa8f is described below

commit 4840c8aa8f685f14bc09dd4d605d235f2ddac302
Author: Qi Zhu <[email protected]>
AuthorDate: Mon Sep 22 21:29:02 2025 +0800

    fix: Partial AggregateMode will generate duplicate field names which will fail DFSchema construct (#17706) (#17717)
    
    * fix: Partial AggregateMode will generate duplicate field names which will fail DFSchema construct
    
    * Update datafusion/common/src/dfschema.rs
    
    
    
    * fmt
    
    ---------
    
    
    (cherry picked from commit 52690c64b64e56d1aba5b2daee7be701b343576d)
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/common/src/dfschema.rs      |   6 +-
 datafusion/core/tests/dataframe/mod.rs | 115 ++++++++++++++++++++++++++++++++-
 2 files changed, 117 insertions(+), 4 deletions(-)

diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs
index d3dda28882..f9e3b2cee4 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -913,7 +913,11 @@ impl TryFrom<SchemaRef> for DFSchema {
             field_qualifiers: vec![None; field_count],
             functional_dependencies: FunctionalDependencies::empty(),
         };
-        dfschema.check_names()?;
+        // Without checking names, because schema here may have duplicate field names.
+        // For example, Partial AggregateMode will generate duplicate field names from
+        // state_fields.
+        // See <https://github.com/apache/datafusion/issues/17715>
+        // dfschema.check_names()?;
         Ok(dfschema)
     }
 }
diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs
index a563459f42..a8c244a347 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -32,6 +32,7 @@ use arrow::datatypes::{
 };
 use arrow::error::ArrowError;
 use arrow::util::pretty::pretty_format_batches;
+use arrow_schema::{SortOptions, TimeUnit};
 use datafusion::{assert_batches_eq, dataframe};
 use datafusion_functions_aggregate::count::{count_all, count_all_window};
 use datafusion_functions_aggregate::expr_fn::{
@@ -64,8 +65,8 @@ use datafusion::test_util::{
 use datafusion_catalog::TableProvider;
 use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
 use datafusion_common::{
-    assert_contains, Constraint, Constraints, DataFusionError, ParamValues, ScalarValue,
-    TableReference, UnnestOptions,
+    assert_contains, Constraint, Constraints, DFSchema, DataFusionError, ParamValues,
+    ScalarValue, TableReference, UnnestOptions,
 };
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_datasource::file_format::format_as_file_type;
@@ -79,10 +80,16 @@ use datafusion_expr::{
     LogicalPlanBuilder, ScalarFunctionImplementation, SortExpr, WindowFrame,
     WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition,
 };
+use datafusion_physical_expr::aggregate::AggregateExprBuilder;
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::Partitioning;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
-use datafusion_physical_plan::{displayable, ExecutionPlanProperties};
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
+use datafusion_physical_plan::aggregates::{
+    AggregateExec, AggregateMode, PhysicalGroupBy,
+};
+use datafusion_physical_plan::empty::EmptyExec;
+use datafusion_physical_plan::{displayable, ExecutionPlan, ExecutionPlanProperties};
 
 // Get string representation of the plan
 async fn physical_plan_to_string(df: &DataFrame) -> String {
@@ -6322,3 +6329,105 @@ async fn test_copy_to_preserves_order() -> Result<()> {
     );
     Ok(())
 }
+
+#[tokio::test]
+async fn test_duplicate_state_fields_for_dfschema_construct() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    // Simple schema with just the fields we need
+    let file_schema = Arc::new(Schema::new(vec![
+        Field::new(
+            "timestamp",
+            DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
+            true,
+        ),
+        Field::new("ticker", DataType::Utf8, true),
+        Field::new("value", DataType::Float64, true),
+        Field::new("date", DataType::Utf8, false),
+    ]));
+
+    let df_schema = DFSchema::try_from(file_schema.clone())?;
+
+    let timestamp = col("timestamp");
+    let value = col("value");
+    let ticker = col("ticker");
+    let date = col("date");
+
+    let mock_exec = Arc::new(EmptyExec::new(file_schema.clone()));
+
+    // Build first_value aggregate
+    let first_value = Arc::new(
+        AggregateExprBuilder::new(
+            datafusion_functions_aggregate::first_last::first_value_udaf(),
+            vec![ctx.create_physical_expr(value.clone(), &df_schema)?],
+        )
+        .alias("first_value(value)")
+        .order_by(vec![PhysicalSortExpr::new(
+            ctx.create_physical_expr(timestamp.clone(), &df_schema)?,
+            SortOptions::new(false, false),
+        )])
+        .schema(file_schema.clone())
+        .build()
+        .expect("Failed to build first_value"),
+    );
+
+    // Build last_value aggregate
+    let last_value = Arc::new(
+        AggregateExprBuilder::new(
+            datafusion_functions_aggregate::first_last::last_value_udaf(),
+            vec![ctx.create_physical_expr(value.clone(), &df_schema)?],
+        )
+        .alias("last_value(value)")
+        .order_by(vec![PhysicalSortExpr::new(
+            ctx.create_physical_expr(timestamp.clone(), &df_schema)?,
+            SortOptions::new(false, false),
+        )])
+        .schema(file_schema.clone())
+        .build()
+        .expect("Failed to build last_value"),
+    );
+
+    let partial_agg = AggregateExec::try_new(
+        AggregateMode::Partial,
+        PhysicalGroupBy::new_single(vec![
+            (
+                ctx.create_physical_expr(date.clone(), &df_schema)?,
+                "date".to_string(),
+            ),
+            (
+                ctx.create_physical_expr(ticker.clone(), &df_schema)?,
+                "ticker".to_string(),
+            ),
+        ]),
+        vec![first_value, last_value],
+        vec![None, None],
+        mock_exec,
+        file_schema,
+    )
+    .expect("Failed to build partial agg");
+
+    // Assert that the schema field names match the expected names
+    let expected_field_names = vec![
+        "date",
+        "ticker",
+        "first_value(value)[first_value]",
+        "timestamp@0",
+        "is_set",
+        "last_value(value)[last_value]",
+        "timestamp@0",
+        "is_set",
+    ];
+
+    let binding = partial_agg.schema();
+    let actual_field_names: Vec<_> = binding.fields().iter().map(|f| f.name()).collect();
+    assert_eq!(actual_field_names, expected_field_names);
+
+    // Ensure that DFSchema::try_from does not fail
+    let partial_agg_exec_schema = DFSchema::try_from(partial_agg.schema());
+    assert!(
+        partial_agg_exec_schema.is_ok(),
+        "Expected get AggregateExec schema to succeed with duplicate state fields"
+    );
+
+    Ok(())
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to