alamb commented on code in PR #11989:
URL: https://github.com/apache/datafusion/pull/11989#discussion_r1722029936
##########
datafusion/core/src/physical_optimizer/enforce_sorting.rs:
##########
@@ -836,17 +836,17 @@ mod tests {
let physical_plan = bounded_window_exec("non_nullable_col",
sort_exprs, filter);
- let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field {
name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+ let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field {
name: \"count\", data_type: Int64, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }],
mode=[Sorted]",
" FilterExec: NOT non_nullable_col@1",
" SortExec: expr=[non_nullable_col@1 ASC NULLS LAST],
preserve_partitioning=[false]",
- " BoundedWindowAggExec: wdw=[count: Ok(Field { name:
\"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+ " BoundedWindowAggExec: wdw=[count: Ok(Field { name:
\"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
Review Comment:
`count()` is always non-nullable, so this change makes sense to me.
##########
datafusion/expr/src/udaf.rs:
##########
@@ -552,6 +566,13 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
fn is_descending(&self) -> Option<bool> {
None
}
+
+ /// Returns default value of the function given the input is Null
Review Comment:
```suggestion
/// Returns the default value of the function when all of its inputs are `null`.
///
```
##########
datafusion/expr/src/expr_schema.rs:
##########
@@ -320,18 +320,28 @@ impl ExprSchemable for Expr {
}
}
Expr::Cast(Cast { expr, .. }) => expr.nullable(input_schema),
+ Expr::ScalarFunction(ScalarFunction { func, args }) => {
+ Ok(func.is_nullable(args, input_schema))
+ }
Expr::AggregateFunction(AggregateFunction { func, .. }) => {
- // TODO: UDF should be able to customize nullability
- if func.name() == "count" {
- Ok(false)
- } else {
- Ok(true)
- }
+ Ok(func.is_nullable())
}
+ Expr::WindowFunction(WindowFunction { fun, .. }) => match fun {
+ WindowFunctionDefinition::BuiltInWindowFunction(func) => {
+ if func.name() == "RANK"
Review Comment:
Checking the name is probably fine, given that we are in the process of removing
the enum anyway.
This could also match on the variants of `func` here instead.
##########
datafusion/core/src/physical_planner.rs:
##########
@@ -670,6 +670,12 @@ impl DefaultPhysicalPlanner {
let input_exec = children.one()?;
let physical_input_schema = input_exec.schema();
let logical_input_schema = input.as_ref().schema();
+ let physical_input_schema_from_logical: Arc<Schema> =
+ logical_input_schema.as_ref().clone().into();
+
+ if physical_input_schema != physical_input_schema_from_logical
{
Review Comment:
🎉
##########
datafusion/expr/src/expr_schema.rs:
##########
@@ -320,18 +320,28 @@ impl ExprSchemable for Expr {
}
}
Expr::Cast(Cast { expr, .. }) => expr.nullable(input_schema),
+ Expr::ScalarFunction(ScalarFunction { func, args }) => {
+ Ok(func.is_nullable(args, input_schema))
+ }
Expr::AggregateFunction(AggregateFunction { func, .. }) => {
- // TODO: UDF should be able to customize nullability
- if func.name() == "count" {
- Ok(false)
- } else {
- Ok(true)
- }
+ Ok(func.is_nullable())
}
+ Expr::WindowFunction(WindowFunction { fun, .. }) => match fun {
+ WindowFunctionDefinition::BuiltInWindowFunction(func) => {
+ if func.name() == "RANK"
+ || func.name() == "NTILE"
+ || func.name() == "CUME_DIST"
+ {
+ Ok(false)
+ } else {
+ Ok(true)
+ }
+ }
+ WindowFunctionDefinition::AggregateUDF(func) =>
Ok(func.is_nullable()),
+ WindowFunctionDefinition::WindowUDF(udwf) =>
Ok(udwf.nullable()),
Review Comment:
this is great
##########
datafusion/functions-aggregate/src/count.rs:
##########
@@ -121,6 +121,10 @@ impl AggregateUDFImpl for Count {
Ok(DataType::Int64)
}
+ fn is_nullable(&self) -> bool {
Review Comment:
👍
##########
datafusion/expr/src/udaf.rs:
##########
@@ -342,6 +351,11 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
/// the arguments
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType>;
+ /// Whether the aggregate function is nullable
Review Comment:
```suggestion
/// Whether the aggregate function is nullable.
///
/// Nullable means that the function could return `null` for some
/// inputs.
/// For example, aggregate functions like `COUNT` always return a non-null
/// value, but others like `MIN` will return `NULL` if there is no non-null
/// input.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]