alamb commented on code in PR #11989:
URL: https://github.com/apache/datafusion/pull/11989#discussion_r1722029936


##########
datafusion/core/src/physical_optimizer/enforce_sorting.rs:
##########
@@ -836,17 +836,17 @@ mod tests {
 
         let physical_plan = bounded_window_exec("non_nullable_col", 
sort_exprs, filter);
 
-        let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field { 
name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+        let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field { 
name: \"count\", data_type: Int64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], 
mode=[Sorted]",
             "  FilterExec: NOT non_nullable_col@1",
             "    SortExec: expr=[non_nullable_col@1 ASC NULLS LAST], 
preserve_partitioning=[false]",
-            "      BoundedWindowAggExec: wdw=[count: Ok(Field { name: 
\"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+            "      BoundedWindowAggExec: wdw=[count: Ok(Field { name: 
\"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",

Review Comment:
   `count()` is always non-nullable, so this change makes sense to me.



##########
datafusion/expr/src/udaf.rs:
##########
@@ -552,6 +566,13 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
     fn is_descending(&self) -> Option<bool> {
         None
     }
+
+    /// Returns default value of the function given the input is Null

Review Comment:
   ```suggestion
       /// Returns the default value of the function given the input is all `null`.
       ///
   ```



##########
datafusion/expr/src/expr_schema.rs:
##########
@@ -320,18 +320,28 @@ impl ExprSchemable for Expr {
                 }
             }
             Expr::Cast(Cast { expr, .. }) => expr.nullable(input_schema),
+            Expr::ScalarFunction(ScalarFunction { func, args }) => {
+                Ok(func.is_nullable(args, input_schema))
+            }
             Expr::AggregateFunction(AggregateFunction { func, .. }) => {
-                // TODO: UDF should be able to customize nullability
-                if func.name() == "count" {
-                    Ok(false)
-                } else {
-                    Ok(true)
-                }
+                Ok(func.is_nullable())
             }
+            Expr::WindowFunction(WindowFunction { fun, .. }) => match fun {
+                WindowFunctionDefinition::BuiltInWindowFunction(func) => {
+                    if func.name() == "RANK"

Review Comment:
   Checking the name is probably fine, given we are in the process of removing
the enum anyway.
   
   This could also check the variants of `func` directly here.



##########
datafusion/core/src/physical_planner.rs:
##########
@@ -670,6 +670,12 @@ impl DefaultPhysicalPlanner {
                 let input_exec = children.one()?;
                 let physical_input_schema = input_exec.schema();
                 let logical_input_schema = input.as_ref().schema();
+                let physical_input_schema_from_logical: Arc<Schema> =
+                    logical_input_schema.as_ref().clone().into();
+
+                if physical_input_schema != physical_input_schema_from_logical 
{

Review Comment:
   🎉 



##########
datafusion/expr/src/expr_schema.rs:
##########
@@ -320,18 +320,28 @@ impl ExprSchemable for Expr {
                 }
             }
             Expr::Cast(Cast { expr, .. }) => expr.nullable(input_schema),
+            Expr::ScalarFunction(ScalarFunction { func, args }) => {
+                Ok(func.is_nullable(args, input_schema))
+            }
             Expr::AggregateFunction(AggregateFunction { func, .. }) => {
-                // TODO: UDF should be able to customize nullability
-                if func.name() == "count" {
-                    Ok(false)
-                } else {
-                    Ok(true)
-                }
+                Ok(func.is_nullable())
             }
+            Expr::WindowFunction(WindowFunction { fun, .. }) => match fun {
+                WindowFunctionDefinition::BuiltInWindowFunction(func) => {
+                    if func.name() == "RANK"
+                        || func.name() == "NTILE"
+                        || func.name() == "CUME_DIST"
+                    {
+                        Ok(false)
+                    } else {
+                        Ok(true)
+                    }
+                }
+                WindowFunctionDefinition::AggregateUDF(func) => 
Ok(func.is_nullable()),
+                WindowFunctionDefinition::WindowUDF(udwf) => 
Ok(udwf.nullable()),

Review Comment:
   this is great



##########
datafusion/functions-aggregate/src/count.rs:
##########
@@ -121,6 +121,10 @@ impl AggregateUDFImpl for Count {
         Ok(DataType::Int64)
     }
 
+    fn is_nullable(&self) -> bool {

Review Comment:
   👍 



##########
datafusion/expr/src/udaf.rs:
##########
@@ -342,6 +351,11 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
     /// the arguments
     fn return_type(&self, arg_types: &[DataType]) -> Result<DataType>;
 
+    /// Whether the aggregate function is nullable

Review Comment:
   ```suggestion
       /// Whether the aggregate function is nullable.
       ///
       /// Nullable means that the function could return `null` for some
       /// inputs. For example, aggregate functions like `COUNT` always return
       /// a non-null value, but others like `MIN` will return `NULL` if there
       /// is no non-null input.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to