This is an automated email from the ASF dual-hosted git repository.

jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 6786f1592f Fix the schema mismatch between logical and physical for 
aggregate function, add `AggregateUDFImpl::is_null` (#11989)
6786f1592f is described below

commit 6786f1592f6b923210673c0246ea121a714aec49
Author: Jay Zhan <[email protected]>
AuthorDate: Wed Aug 21 08:47:49 2024 +0800

    Fix the schema mismatch between logical and physical for aggregate 
function, add `AggregateUDFImpl::is_null` (#11989)
    
    * schema assertion and fix the mismatch from logical and physical
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * add more msg
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * cleanup
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * rm test1
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * nullable for scalar func
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * nullable
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * rm field
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * rm unsafe block and use internal error
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * rm func_name
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * rm nullable option
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * add test
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * add more msg
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * fix test
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * rm row number
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * Update datafusion/expr/src/udaf.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Update datafusion/expr/src/udaf.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * fix failed test from #12050
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * cleanup
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * add doc
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    ---------
    
    Signed-off-by: jayzhan211 <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 .../core/src/physical_optimizer/enforce_sorting.rs | 34 ++++++++--------
 .../core/src/physical_optimizer/sanity_checker.rs  |  4 +-
 datafusion/core/src/physical_planner.rs            | 11 ++++--
 datafusion/expr/src/expr_schema.rs                 | 26 +++++++++----
 datafusion/expr/src/logical_plan/plan.rs           | 12 +++---
 datafusion/expr/src/udaf.rs                        | 29 +++++++++++++-
 datafusion/expr/src/udf.rs                         |  8 ++++
 .../functions-aggregate-common/src/aggregate.rs    | 10 +++--
 datafusion/functions-aggregate/src/count.rs        | 10 ++++-
 datafusion/functions/src/core/arrow_cast.rs        |  8 +++-
 datafusion/functions/src/core/coalesce.rs          |  9 ++++-
 .../optimizer/src/analyzer/count_wildcard_rule.rs  |  2 +-
 datafusion/optimizer/src/analyzer/type_coercion.rs | 31 +++++++++++----
 .../src/aggregate.rs                               | 18 ++++++++-
 datafusion/physical-expr/src/scalar_function.rs    | 45 ++++++++++++++--------
 datafusion/physical-expr/src/window/aggregate.rs   | 11 +++---
 datafusion/physical-expr/src/window/built_in.rs    |  1 -
 .../physical-expr/src/window/sliding_aggregate.rs  |  5 ++-
 .../src/windows/bounded_window_agg_exec.rs         | 23 ++---------
 datafusion/physical-plan/src/windows/mod.rs        |  1 +
 datafusion/physical-plan/src/windows/utils.rs      | 35 +++++++++++++++++
 .../physical-plan/src/windows/window_agg_exec.rs   | 19 +--------
 datafusion/sql/src/select.rs                       |  8 ++--
 .../sqllogictest/test_files/count_star_rule.slt    |  2 +-
 datafusion/sqllogictest/test_files/insert.slt      |  6 +--
 .../sqllogictest/test_files/insert_to_external.slt |  4 +-
 datafusion/sqllogictest/test_files/union.slt       | 15 ++++++++
 datafusion/sqllogictest/test_files/window.slt      | 22 +++++------
 28 files changed, 274 insertions(+), 135 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs 
b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index bda6d598b6..14afe35466 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -845,17 +845,17 @@ mod tests {
 
         let physical_plan = bounded_window_exec("non_nullable_col", 
sort_exprs, filter);
 
-        let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field { 
name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+        let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field { 
name: \"count\", data_type: Int64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], 
mode=[Sorted]",
             "  FilterExec: NOT non_nullable_col@1",
             "    SortExec: expr=[non_nullable_col@1 ASC NULLS LAST], 
preserve_partitioning=[false]",
-            "      BoundedWindowAggExec: wdw=[count: Ok(Field { name: 
\"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+            "      BoundedWindowAggExec: wdw=[count: Ok(Field { name: 
\"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
             "        CoalesceBatchesExec: target_batch_size=128",
             "          SortExec: expr=[non_nullable_col@1 DESC], 
preserve_partitioning=[false]",
             "            MemoryExec: partitions=1, partition_sizes=[0]"];
 
-        let expected_optimized = ["WindowAggExec: wdw=[count: Ok(Field { name: 
\"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
CurrentRow, end_bound: Following(NULL), is_causal: false }]",
+        let expected_optimized = ["WindowAggExec: wdw=[count: Ok(Field { name: 
\"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
CurrentRow, end_bound: Following(NULL), is_causal: false }]",
             "  FilterExec: NOT non_nullable_col@1",
-            "    BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+            "    BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
             "      CoalesceBatchesExec: target_batch_size=128",
             "        SortExec: expr=[non_nullable_col@1 DESC], 
preserve_partitioning=[false]",
             "          MemoryExec: partitions=1, partition_sizes=[0]"];
@@ -1722,7 +1722,7 @@ mod tests {
         // corresponding SortExecs together. Also, the inputs of these 
`SortExec`s
         // are not necessarily the same to be able to remove them.
         let expected_input = [
-            "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+            "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
             "  SortPreservingMergeExec: [nullable_col@0 DESC NULLS LAST]",
             "    UnionExec",
             "      SortExec: expr=[nullable_col@0 DESC NULLS LAST], 
preserve_partitioning=[false]",
@@ -1730,7 +1730,7 @@ mod tests {
             "      SortExec: expr=[nullable_col@0 DESC NULLS LAST], 
preserve_partitioning=[false]",
             "        ParquetExec: file_groups={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 
ASC]"];
         let expected_optimized = [
-            "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: 
Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), 
frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: 
Following(NULL), is_causal: false }]",
+            "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: 
Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), 
frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound: 
Following(NULL), is_causal: false }]",
             "  SortPreservingMergeExec: [nullable_col@0 ASC]",
             "    UnionExec",
             "      ParquetExec: file_groups={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 
ASC, non_nullable_col@1 ASC]",
@@ -1760,14 +1760,14 @@ mod tests {
 
         // The `WindowAggExec` can get its required sorting from the leaf 
nodes directly.
         // The unnecessary SortExecs should be removed
-        let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field { 
name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+        let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field { 
name: \"count\", data_type: Int64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], 
mode=[Sorted]",
             "  SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 
ASC]",
             "    UnionExec",
             "      SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC], 
preserve_partitioning=[false]",
             "        ParquetExec: file_groups={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 
ASC]",
             "      SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC], 
preserve_partitioning=[false]",
             "        ParquetExec: file_groups={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 
ASC]"];
-        let expected_optimized = ["BoundedWindowAggExec: wdw=[count: Ok(Field 
{ name: \"count\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], 
mode=[Sorted]",
+        let expected_optimized = ["BoundedWindowAggExec: wdw=[count: Ok(Field 
{ name: \"count\", data_type: Int64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], 
mode=[Sorted]",
             "  SortPreservingMergeExec: [nullable_col@0 ASC]",
             "    UnionExec",
             "      ParquetExec: file_groups={1 group: [[x]]}, 
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0 
ASC]",
@@ -2060,15 +2060,15 @@ mod tests {
         let physical_plan =
             bounded_window_exec("non_nullable_col", sort_exprs1, window_agg2);
 
-        let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field { 
name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
-            "  BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
-            "    BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+        let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field { 
name: \"count\", data_type: Int64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], 
mode=[Sorted]",
+            "  BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+            "    BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
             "      SortExec: expr=[nullable_col@0 ASC], 
preserve_partitioning=[false]",
             "        MemoryExec: partitions=1, partition_sizes=[0]"];
 
-        let expected_optimized = ["BoundedWindowAggExec: wdw=[count: Ok(Field 
{ name: \"count\", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], 
mode=[Sorted]",
-            "  BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
-            "    BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+        let expected_optimized = ["BoundedWindowAggExec: wdw=[count: Ok(Field 
{ name: \"count\", data_type: Int64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], 
mode=[Sorted]",
+            "  BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+            "    BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
             "      SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC], 
preserve_partitioning=[false]",
             "        MemoryExec: partitions=1, partition_sizes=[0]"];
         assert_optimized!(expected_input, expected_optimized, physical_plan, 
true);
@@ -2134,7 +2134,7 @@ mod tests {
         let expected_input = vec![
             "SortExec: expr=[nullable_col@0 ASC], 
preserve_partitioning=[false]",
             "  RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-            "    BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+            "    BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
             "      MemoryExec: partitions=1, partition_sizes=[0]",
         ];
         assert_eq!(
@@ -2386,7 +2386,7 @@ mod tests {
         let physical_plan = bounded_window_exec("a", sort_exprs, spm);
 
         let expected_input = [
-            "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+            "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
             "  SortPreservingMergeExec: [a@0 ASC,b@1 ASC]",
             "    RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC,b@1 ASC",
             "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
@@ -2394,7 +2394,7 @@ mod tests {
             "          CsvExec: file_groups={1 group: [[x]]}, projection=[a, 
b, c, d, e], has_header=false",
         ];
         let expected_optimized = [
-            "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+            "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
             "  SortExec: expr=[a@0 ASC,b@1 ASC], 
preserve_partitioning=[false]",
             "    CoalescePartitionsExec",
             "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=10",
diff --git a/datafusion/core/src/physical_optimizer/sanity_checker.rs 
b/datafusion/core/src/physical_optimizer/sanity_checker.rs
index 6e37c3f40f..bd80d31224 100644
--- a/datafusion/core/src/physical_optimizer/sanity_checker.rs
+++ b/datafusion/core/src/physical_optimizer/sanity_checker.rs
@@ -437,7 +437,7 @@ mod tests {
         let sort = sort_exec(sort_exprs.clone(), source);
         let bw = bounded_window_exec("c9", sort_exprs, sort);
         assert_plan(bw.as_ref(), vec![
-            "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+            "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
             "  SortExec: expr=[c9@0 ASC NULLS LAST], 
preserve_partitioning=[false]",
             "    MemoryExec: partitions=1, partition_sizes=[0]"
         ]);
@@ -460,7 +460,7 @@ mod tests {
         )];
         let bw = bounded_window_exec("c9", sort_exprs, source);
         assert_plan(bw.as_ref(), vec![
-            "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+            "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
             "  MemoryExec: partitions=1, partition_sizes=[0]"
         ]);
         // Order requirement of the `BoundedWindowAggExec` is not satisfied. 
We expect to receive error during sanity check.
diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index 41ab4ccc98..6536f9a014 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -670,6 +670,12 @@ impl DefaultPhysicalPlanner {
                 let input_exec = children.one()?;
                 let physical_input_schema = input_exec.schema();
                 let logical_input_schema = input.as_ref().schema();
+                let physical_input_schema_from_logical: Arc<Schema> =
+                    logical_input_schema.as_ref().clone().into();
+
+                if physical_input_schema != physical_input_schema_from_logical 
{
+                    return internal_err!("Physical input schema should be the 
same as the one converted from logical input schema.");
+                }
 
                 let groups = self.create_grouping_physical_expr(
                     group_expr,
@@ -1548,7 +1554,7 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
     e: &Expr,
     name: Option<String>,
     logical_input_schema: &DFSchema,
-    _physical_input_schema: &Schema,
+    physical_input_schema: &Schema,
     execution_props: &ExecutionProps,
 ) -> Result<AggregateExprWithOptionalArgs> {
     match e {
@@ -1599,11 +1605,10 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
                 let ordering_reqs: Vec<PhysicalSortExpr> =
                     physical_sort_exprs.clone().unwrap_or(vec![]);
 
-                let schema: Schema = logical_input_schema.clone().into();
                 let agg_expr =
                     AggregateExprBuilder::new(func.to_owned(), 
physical_args.to_vec())
                         .order_by(ordering_reqs.to_vec())
-                        .schema(Arc::new(schema))
+                        .schema(Arc::new(physical_input_schema.to_owned()))
                         .alias(name)
                         .with_ignore_nulls(ignore_nulls)
                         .with_distinct(*distinct)
diff --git a/datafusion/expr/src/expr_schema.rs 
b/datafusion/expr/src/expr_schema.rs
index f6489fef14..10ec10e612 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -335,18 +335,28 @@ impl ExprSchemable for Expr {
                 }
             }
             Expr::Cast(Cast { expr, .. }) => expr.nullable(input_schema),
+            Expr::ScalarFunction(ScalarFunction { func, args }) => {
+                Ok(func.is_nullable(args, input_schema))
+            }
             Expr::AggregateFunction(AggregateFunction { func, .. }) => {
-                // TODO: UDF should be able to customize nullability
-                if func.name() == "count" {
-                    Ok(false)
-                } else {
-                    Ok(true)
-                }
+                Ok(func.is_nullable())
             }
+            Expr::WindowFunction(WindowFunction { fun, .. }) => match fun {
+                WindowFunctionDefinition::BuiltInWindowFunction(func) => {
+                    if func.name() == "RANK"
+                        || func.name() == "NTILE"
+                        || func.name() == "CUME_DIST"
+                    {
+                        Ok(false)
+                    } else {
+                        Ok(true)
+                    }
+                }
+                WindowFunctionDefinition::AggregateUDF(func) => 
Ok(func.is_nullable()),
+                WindowFunctionDefinition::WindowUDF(udwf) => 
Ok(udwf.nullable()),
+            },
             Expr::ScalarVariable(_, _)
             | Expr::TryCast { .. }
-            | Expr::ScalarFunction(..)
-            | Expr::WindowFunction { .. }
             | Expr::Unnest(_)
             | Expr::Placeholder(_) => Ok(true),
             Expr::IsNull(_)
diff --git a/datafusion/expr/src/logical_plan/plan.rs 
b/datafusion/expr/src/logical_plan/plan.rs
index f9b3035167..f93b7c0fed 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -2015,10 +2015,9 @@ impl Projection {
 /// produced by the projection operation. If the schema computation is 
successful,
 /// the `Result` will contain the schema; otherwise, it will contain an error.
 pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> 
Result<Arc<DFSchema>> {
-    let mut schema = DFSchema::new_with_metadata(
-        exprlist_to_fields(exprs, input)?,
-        input.schema().metadata().clone(),
-    )?;
+    let metadata = input.schema().metadata().clone();
+    let mut schema =
+        DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, 
metadata)?;
     schema = 
schema.with_functional_dependencies(calc_func_dependencies_for_project(
         exprs, input,
     )?)?;
@@ -2655,7 +2654,10 @@ impl Aggregate {
 
         qualified_fields.extend(exprlist_to_fields(aggr_expr.as_slice(), 
&input)?);
 
-        let schema = DFSchema::new_with_metadata(qualified_fields, 
HashMap::new())?;
+        let schema = DFSchema::new_with_metadata(
+            qualified_fields,
+            input.schema().metadata().clone(),
+        )?;
 
         Self::try_new_with_schema(input, group_expr, aggr_expr, 
Arc::new(schema))
     }
diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs
index d136aeaf09..cb278c7679 100644
--- a/datafusion/expr/src/udaf.rs
+++ b/datafusion/expr/src/udaf.rs
@@ -25,7 +25,7 @@ use std::vec;
 
 use arrow::datatypes::{DataType, Field};
 
-use datafusion_common::{exec_err, not_impl_err, Result};
+use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue};
 
 use crate::expr::AggregateFunction;
 use crate::function::{
@@ -163,6 +163,10 @@ impl AggregateUDF {
         self.inner.name()
     }
 
+    pub fn is_nullable(&self) -> bool {
+        self.inner.is_nullable()
+    }
+
     /// Returns the aliases for this function.
     pub fn aliases(&self) -> &[String] {
         self.inner.aliases()
@@ -257,6 +261,11 @@ impl AggregateUDF {
     pub fn is_descending(&self) -> Option<bool> {
         self.inner.is_descending()
     }
+
+    /// See [`AggregateUDFImpl::default_value`] for more details.
+    pub fn default_value(&self, data_type: &DataType) -> Result<ScalarValue> {
+        self.inner.default_value(data_type)
+    }
 }
 
 impl<F> From<F> for AggregateUDF
@@ -342,6 +351,16 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
     /// the arguments
     fn return_type(&self, arg_types: &[DataType]) -> Result<DataType>;
 
+    /// Whether the aggregate function is nullable.
+    ///
+    /// Nullable means that the function could return `null` for any 
inputs.
+    /// For example, aggregate functions like `COUNT` always return a non null 
value
+    /// but others like `MIN` will return `NULL` if there is nullable input.
+    /// Note that if the function is declared as *not* nullable, make sure the 
[`AggregateUDFImpl::default_value`] is `non-null`
+    fn is_nullable(&self) -> bool {
+        true
+    }
+
     /// Return a new [`Accumulator`] that aggregates values for a specific
     /// group during query execution.
     ///
@@ -552,6 +571,14 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
     fn is_descending(&self) -> Option<bool> {
         None
     }
+
+    /// Returns default value of the function given the input is all `null`.
+    ///
+    /// Most aggregate functions return Null if the input is Null,
+    /// while `count` returns 0 if the input is Null
+    fn default_value(&self, data_type: &DataType) -> Result<ScalarValue> {
+        ScalarValue::try_from(data_type)
+    }
 }
 
 pub enum ReversedUDAF {
diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs
index f5434726e2..a4584038e4 100644
--- a/datafusion/expr/src/udf.rs
+++ b/datafusion/expr/src/udf.rs
@@ -205,6 +205,10 @@ impl ScalarUDF {
         self.inner.invoke(args)
     }
 
+    pub fn is_nullable(&self, args: &[Expr], schema: &dyn ExprSchema) -> bool {
+        self.inner.is_nullable(args, schema)
+    }
+
     /// Invoke the function without `args` but number of rows, returning the 
appropriate result.
     ///
     /// See [`ScalarUDFImpl::invoke_no_args`] for more details.
@@ -416,6 +420,10 @@ pub trait ScalarUDFImpl: Debug + Send + Sync {
         self.return_type(arg_types)
     }
 
+    fn is_nullable(&self, _args: &[Expr], _schema: &dyn ExprSchema) -> bool {
+        true
+    }
+
     /// Invoke the function on `args`, returning the appropriate result
     ///
     /// The function will be invoked passed with the slice of [`ColumnarValue`]
diff --git a/datafusion/functions-aggregate-common/src/aggregate.rs 
b/datafusion/functions-aggregate-common/src/aggregate.rs
index 016e54e688..698d1350cb 100644
--- a/datafusion/functions-aggregate-common/src/aggregate.rs
+++ b/datafusion/functions-aggregate-common/src/aggregate.rs
@@ -19,9 +19,8 @@
 //! (built-in and custom) need to satisfy.
 
 use crate::order::AggregateOrderSensitivity;
-use arrow::datatypes::Field;
-use datafusion_common::exec_err;
-use datafusion_common::{not_impl_err, Result};
+use arrow::datatypes::{DataType, Field};
+use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue};
 use datafusion_expr_common::accumulator::Accumulator;
 use datafusion_expr_common::groups_accumulator::GroupsAccumulator;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
@@ -171,6 +170,11 @@ pub trait AggregateExpr: Send + Sync + Debug + 
PartialEq<dyn Any> {
     fn get_minmax_desc(&self) -> Option<(Field, bool)> {
         None
     }
+
+    /// Returns default value of the function given the input is Null
+    /// Most aggregate functions return Null if the input is Null,
+    /// while `count` returns 0 if the input is Null
+    fn default_value(&self, data_type: &DataType) -> Result<ScalarValue>;
 }
 
 /// Stores the physical expressions used inside the `AggregateExpr`.
diff --git a/datafusion/functions-aggregate/src/count.rs 
b/datafusion/functions-aggregate/src/count.rs
index 04b1921c7b..417e28e72a 100644
--- a/datafusion/functions-aggregate/src/count.rs
+++ b/datafusion/functions-aggregate/src/count.rs
@@ -121,6 +121,10 @@ impl AggregateUDFImpl for Count {
         Ok(DataType::Int64)
     }
 
+    fn is_nullable(&self) -> bool {
+        false
+    }
+
     fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
         if args.is_distinct {
             Ok(vec![Field::new_list(
@@ -133,7 +137,7 @@ impl AggregateUDFImpl for Count {
             Ok(vec![Field::new(
                 format_state_name(args.name, "count"),
                 DataType::Int64,
-                true,
+                false,
             )])
         }
     }
@@ -283,6 +287,10 @@ impl AggregateUDFImpl for Count {
     fn reverse_expr(&self) -> ReversedUDAF {
         ReversedUDAF::Identical
     }
+
+    fn default_value(&self, _data_type: &DataType) -> Result<ScalarValue> {
+        Ok(ScalarValue::Int64(Some(0)))
+    }
 }
 
 #[derive(Debug)]
diff --git a/datafusion/functions/src/core/arrow_cast.rs 
b/datafusion/functions/src/core/arrow_cast.rs
index c4db3e7704..a1b74228a5 100644
--- a/datafusion/functions/src/core/arrow_cast.rs
+++ b/datafusion/functions/src/core/arrow_cast.rs
@@ -26,7 +26,9 @@ use datafusion_common::{
 };
 
 use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
-use datafusion_expr::{ColumnarValue, Expr, ScalarUDFImpl, Signature, 
Volatility};
+use datafusion_expr::{
+    ColumnarValue, Expr, ExprSchemable, ScalarUDFImpl, Signature, Volatility,
+};
 
 /// Implements casting to arbitrary arrow types (rather than SQL types)
 ///
@@ -87,6 +89,10 @@ impl ScalarUDFImpl for ArrowCastFunc {
         internal_err!("arrow_cast should return type from exprs")
     }
 
+    fn is_nullable(&self, args: &[Expr], schema: &dyn ExprSchema) -> bool {
+        args.iter().any(|e| e.nullable(schema).ok().unwrap_or(true))
+    }
+
     fn return_type_from_exprs(
         &self,
         args: &[Expr],
diff --git a/datafusion/functions/src/core/coalesce.rs 
b/datafusion/functions/src/core/coalesce.rs
index 15a3ddd9d6..19db58c181 100644
--- a/datafusion/functions/src/core/coalesce.rs
+++ b/datafusion/functions/src/core/coalesce.rs
@@ -22,9 +22,9 @@ use arrow::compute::kernels::zip::zip;
 use arrow::compute::{and, is_not_null, is_null};
 use arrow::datatypes::DataType;
 
-use datafusion_common::{exec_err, Result};
+use datafusion_common::{exec_err, ExprSchema, Result};
 use datafusion_expr::type_coercion::binary::type_union_resolution;
-use datafusion_expr::ColumnarValue;
+use datafusion_expr::{ColumnarValue, Expr, ExprSchemable};
 use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
 
 #[derive(Debug)]
@@ -63,6 +63,11 @@ impl ScalarUDFImpl for CoalesceFunc {
         Ok(arg_types[0].clone())
     }
 
+    // If all the elements in coalesce are non-null, the result is non-null
+    fn is_nullable(&self, args: &[Expr], schema: &dyn ExprSchema) -> bool {
+        args.iter().any(|e| e.nullable(schema).ok().unwrap_or(true))
+    }
+
     /// coalesce evaluates to the first value which is not NULL
     fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
         // do not accept 0 arguments.
diff --git a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs 
b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
index 593dab2bc9..e114efb999 100644
--- a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
+++ b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
@@ -240,7 +240,7 @@ mod tests {
             .build()?;
 
         let expected = "Projection: count(Int64(1)) AS count(*) 
[count(*):Int64]\
-        \n  WindowAggr: windowExpr=[[count(Int64(1)) ORDER BY [test.a DESC 
NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING AS count(*) ORDER BY 
[test.a DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING]] 
[a:UInt32, b:UInt32, c:UInt32, count(*) ORDER BY [test.a DESC NULLS FIRST] 
RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING:Int64;N]\
+        \n  WindowAggr: windowExpr=[[count(Int64(1)) ORDER BY [test.a DESC 
NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING AS count(*) ORDER BY 
[test.a DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING]] 
[a:UInt32, b:UInt32, c:UInt32, count(*) ORDER BY [test.a DESC NULLS FIRST] 
RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING:Int64]\
         \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
         assert_plan_eq(plan, expected)
     }
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs 
b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 7251a95d77..68ab2e1300 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -17,7 +17,6 @@
 
 //! Optimizer rule for type validation and coercion
 
-use std::collections::HashMap;
 use std::sync::Arc;
 
 use itertools::izip;
@@ -822,9 +821,18 @@ fn coerce_union_schema(inputs: &[Arc<LogicalPlan>]) -> 
Result<DFSchema> {
         .iter()
         .map(|f| f.is_nullable())
         .collect::<Vec<_>>();
+    let mut union_field_meta = base_schema
+        .fields()
+        .iter()
+        .map(|f| f.metadata().clone())
+        .collect::<Vec<_>>();
+
+    let mut metadata = base_schema.metadata().clone();
 
     for (i, plan) in inputs.iter().enumerate().skip(1) {
         let plan_schema = plan.schema();
+        metadata.extend(plan_schema.metadata().clone());
+
         if plan_schema.fields().len() != base_schema.fields().len() {
             return plan_err!(
                 "Union schemas have different number of fields: \
@@ -834,11 +842,13 @@ fn coerce_union_schema(inputs: &[Arc<LogicalPlan>]) -> 
Result<DFSchema> {
                 plan_schema.fields().len()
             );
         }
+
         // coerce data type and nullablity for each field
-        for (union_datatype, union_nullable, plan_field) in izip!(
+        for (union_datatype, union_nullable, union_field_map, plan_field) in 
izip!(
             union_datatypes.iter_mut(),
             union_nullabilities.iter_mut(),
-            plan_schema.fields()
+            union_field_meta.iter_mut(),
+            plan_schema.fields().iter()
         ) {
             let coerced_type =
                 comparison_coercion(union_datatype, 
plan_field.data_type()).ok_or_else(
@@ -852,21 +862,26 @@ fn coerce_union_schema(inputs: &[Arc<LogicalPlan>]) -> 
Result<DFSchema> {
                         )
                     },
                 )?;
+
             *union_datatype = coerced_type;
             *union_nullable = *union_nullable || plan_field.is_nullable();
+            union_field_map.extend(plan_field.metadata().clone());
         }
     }
     let union_qualified_fields = izip!(
         base_schema.iter(),
         union_datatypes.into_iter(),
-        union_nullabilities
+        union_nullabilities,
+        union_field_meta.into_iter()
     )
-    .map(|((qualifier, field), datatype, nullable)| {
-        let field = Arc::new(Field::new(field.name().clone(), datatype, 
nullable));
-        (qualifier.cloned(), field)
+    .map(|((qualifier, field), datatype, nullable, metadata)| {
+        let mut field = Field::new(field.name().clone(), datatype, nullable);
+        field.set_metadata(metadata);
+        (qualifier.cloned(), field.into())
     })
     .collect::<Vec<_>>();
-    DFSchema::new_with_metadata(union_qualified_fields, HashMap::new())
+
+    DFSchema::new_with_metadata(union_qualified_fields, metadata)
 }
 
 /// See `<https://github.com/apache/datafusion/pull/2108>`
diff --git a/datafusion/physical-expr-functions-aggregate/src/aggregate.rs 
b/datafusion/physical-expr-functions-aggregate/src/aggregate.rs
index 8185f0fdd5..aa1d1999a3 100644
--- a/datafusion/physical-expr-functions-aggregate/src/aggregate.rs
+++ b/datafusion/physical-expr-functions-aggregate/src/aggregate.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion_common::ScalarValue;
 use datafusion_common::{internal_err, not_impl_err, Result};
 use datafusion_expr::expr::create_function_physical_name;
 use datafusion_expr::AggregateUDF;
@@ -109,6 +110,7 @@ impl AggregateExprBuilder {
         )?;
 
         let data_type = fun.return_type(&input_exprs_types)?;
+        let is_nullable = fun.is_nullable();
         let name = match alias {
             // TODO: Ideally, we should build the name from physical 
expressions
             None => create_function_physical_name(fun.name(), is_distinct, 
&[], None)?,
@@ -127,6 +129,7 @@ impl AggregateExprBuilder {
             is_distinct,
             input_types: input_exprs_types,
             is_reversed,
+            is_nullable,
         }))
     }
 
@@ -194,6 +197,7 @@ pub struct AggregateFunctionExpr {
     is_distinct: bool,
     is_reversed: bool,
     input_types: Vec<DataType>,
+    is_nullable: bool,
 }
 
 impl AggregateFunctionExpr {
@@ -216,6 +220,10 @@ impl AggregateFunctionExpr {
     pub fn is_reversed(&self) -> bool {
         self.is_reversed
     }
+
+    pub fn is_nullable(&self) -> bool {
+        self.is_nullable
+    }
 }
 
 impl AggregateExpr for AggregateFunctionExpr {
@@ -241,7 +249,11 @@ impl AggregateExpr for AggregateFunctionExpr {
     }
 
     fn field(&self) -> Result<Field> {
-        Ok(Field::new(&self.name, self.data_type.clone(), true))
+        Ok(Field::new(
+            &self.name,
+            self.data_type.clone(),
+            self.is_nullable,
+        ))
     }
 
     fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
@@ -435,6 +447,10 @@ impl AggregateExpr for AggregateFunctionExpr {
             .is_descending()
             .and_then(|flag| self.field().ok().map(|f| (f, flag)))
     }
+
+    fn default_value(&self, data_type: &DataType) -> Result<ScalarValue> {
+        self.fun.default_value(data_type)
+    }
 }
 
 impl PartialEq<dyn Any> for AggregateFunctionExpr {
diff --git a/datafusion/physical-expr/src/scalar_function.rs 
b/datafusion/physical-expr/src/scalar_function.rs
index 83272fc9b2..130c335d1c 100644
--- a/datafusion/physical-expr/src/scalar_function.rs
+++ b/datafusion/physical-expr/src/scalar_function.rs
@@ -51,6 +51,7 @@ pub struct ScalarFunctionExpr {
     name: String,
     args: Vec<Arc<dyn PhysicalExpr>>,
     return_type: DataType,
+    nullable: bool,
 }
 
 impl Debug for ScalarFunctionExpr {
@@ -77,6 +78,7 @@ impl ScalarFunctionExpr {
             name: name.to_owned(),
             args,
             return_type,
+            nullable: true,
         }
     }
 
@@ -99,6 +101,15 @@ impl ScalarFunctionExpr {
     pub fn return_type(&self) -> &DataType {
         &self.return_type
     }
+
+    pub fn with_nullable(mut self, nullable: bool) -> Self {
+        self.nullable = nullable;
+        self
+    }
+
+    pub fn nullable(&self) -> bool {
+        self.nullable
+    }
 }
 
 impl fmt::Display for ScalarFunctionExpr {
@@ -118,7 +129,7 @@ impl PhysicalExpr for ScalarFunctionExpr {
     }
 
     fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
-        Ok(true)
+        Ok(self.nullable)
     }
 
     fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
@@ -151,12 +162,15 @@ impl PhysicalExpr for ScalarFunctionExpr {
         self: Arc<Self>,
         children: Vec<Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn PhysicalExpr>> {
-        Ok(Arc::new(ScalarFunctionExpr::new(
-            &self.name,
-            Arc::clone(&self.fun),
-            children,
-            self.return_type().clone(),
-        )))
+        Ok(Arc::new(
+            ScalarFunctionExpr::new(
+                &self.name,
+                Arc::clone(&self.fun),
+                children,
+                self.return_type().clone(),
+            )
+            .with_nullable(self.nullable),
+        ))
     }
 
     fn evaluate_bounds(&self, children: &[&Interval]) -> Result<Interval> {
@@ -209,8 +223,6 @@ impl PartialEq<dyn Any> for ScalarFunctionExpr {
 }
 
 /// Create a physical expression for the UDF.
-///
-/// Arguments:
 pub fn create_physical_expr(
     fun: &ScalarUDF,
     input_phy_exprs: &[Arc<dyn PhysicalExpr>],
@@ -230,10 +242,13 @@ pub fn create_physical_expr(
     let return_type =
         fun.return_type_from_exprs(args, input_dfschema, &input_expr_types)?;
 
-    Ok(Arc::new(ScalarFunctionExpr::new(
-        fun.name(),
-        Arc::new(fun.clone()),
-        input_phy_exprs.to_vec(),
-        return_type,
-    )))
+    Ok(Arc::new(
+        ScalarFunctionExpr::new(
+            fun.name(),
+            Arc::new(fun.clone()),
+            input_phy_exprs.to_vec(),
+            return_type,
+        )
+        .with_nullable(fun.is_nullable(args, input_dfschema)),
+    ))
 }
diff --git a/datafusion/physical-expr/src/window/aggregate.rs 
b/datafusion/physical-expr/src/window/aggregate.rs
index 5892f7f3f3..52015f4252 100644
--- a/datafusion/physical-expr/src/window/aggregate.rs
+++ b/datafusion/physical-expr/src/window/aggregate.rs
@@ -176,9 +176,9 @@ impl AggregateWindowExpr for PlainAggregateWindowExpr {
         value_slice: &[ArrayRef],
         accumulator: &mut Box<dyn Accumulator>,
     ) -> Result<ScalarValue> {
-        let value = if cur_range.start == cur_range.end {
-            // We produce None if the window is empty.
-            ScalarValue::try_from(self.aggregate.field()?.data_type())?
+        if cur_range.start == cur_range.end {
+            self.aggregate
+                .default_value(self.aggregate.field()?.data_type())
         } else {
             // Accumulate any new rows that have entered the window:
             let update_bound = cur_range.end - last_range.end;
@@ -193,8 +193,7 @@ impl AggregateWindowExpr for PlainAggregateWindowExpr {
                     .collect();
                 accumulator.update_batch(&update)?
             }
-            accumulator.evaluate()?
-        };
-        Ok(value)
+            accumulator.evaluate()
+        }
     }
 }
diff --git a/datafusion/physical-expr/src/window/built_in.rs 
b/datafusion/physical-expr/src/window/built_in.rs
index 04d359903e..8ff277db37 100644
--- a/datafusion/physical-expr/src/window/built_in.rs
+++ b/datafusion/physical-expr/src/window/built_in.rs
@@ -26,7 +26,6 @@ use crate::expressions::PhysicalSortExpr;
 use crate::window::window_expr::{get_orderby_values, WindowFn};
 use crate::window::{PartitionBatches, PartitionWindowAggStates, WindowState};
 use crate::{reverse_order_bys, EquivalenceProperties, PhysicalExpr};
-
 use arrow::array::{new_empty_array, ArrayRef};
 use arrow::compute::SortOptions;
 use arrow::datatypes::Field;
diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs 
b/datafusion/physical-expr/src/window/sliding_aggregate.rs
index 50e9632b21..afa799e869 100644
--- a/datafusion/physical-expr/src/window/sliding_aggregate.rs
+++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs
@@ -183,8 +183,8 @@ impl AggregateWindowExpr for SlidingAggregateWindowExpr {
         accumulator: &mut Box<dyn Accumulator>,
     ) -> Result<ScalarValue> {
         if cur_range.start == cur_range.end {
-            // We produce None if the window is empty.
-            ScalarValue::try_from(self.aggregate.field()?.data_type())
+            self.aggregate
+                .default_value(self.aggregate.field()?.data_type())
         } else {
             // Accumulate any new rows that have entered the window:
             let update_bound = cur_range.end - last_range.end;
@@ -195,6 +195,7 @@ impl AggregateWindowExpr for SlidingAggregateWindowExpr {
                     .collect();
                 accumulator.update_batch(&update)?
             }
+
             // Remove rows that have now left the window:
             let retract_bound = cur_range.start - last_range.start;
             if retract_bound > 0 {
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs 
b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index 29ead35895..efb5dea1ec 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -27,6 +27,7 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
+use super::utils::create_schema;
 use crate::expressions::PhysicalSortExpr;
 use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use crate::windows::{
@@ -38,11 +39,11 @@ use crate::{
     ExecutionPlanProperties, InputOrderMode, PlanProperties, RecordBatchStream,
     SendableRecordBatchStream, Statistics, WindowExpr,
 };
-
+use ahash::RandomState;
 use arrow::{
     array::{Array, ArrayRef, RecordBatchOptions, UInt32Builder},
     compute::{concat, concat_batches, sort_to_indices},
-    datatypes::{Schema, SchemaBuilder, SchemaRef},
+    datatypes::SchemaRef,
     record_batch::RecordBatch,
 };
 use datafusion_common::hash_utils::create_hashes;
@@ -59,8 +60,6 @@ use datafusion_physical_expr::window::{
     PartitionBatches, PartitionKey, PartitionWindowAggStates, WindowState,
 };
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
-
-use ahash::RandomState;
 use futures::stream::Stream;
 use futures::{ready, StreamExt};
 use hashbrown::raw::RawTable;
@@ -852,20 +851,6 @@ impl SortedSearch {
     }
 }
 
-fn create_schema(
-    input_schema: &Schema,
-    window_expr: &[Arc<dyn WindowExpr>],
-) -> Result<Schema> {
-    let capacity = input_schema.fields().len() + window_expr.len();
-    let mut builder = SchemaBuilder::with_capacity(capacity);
-    builder.extend(input_schema.fields.iter().cloned());
-    // append results to the schema
-    for expr in window_expr {
-        builder.push(expr.field()?);
-    }
-    Ok(builder.finish())
-}
-
 /// Stream for the bounded window aggregation plan.
 pub struct BoundedWindowAggStream {
     schema: SchemaRef,
@@ -1736,7 +1721,7 @@ mod tests {
 
         let expected_plan = vec![
             "ProjectionExec: expr=[sn@0 as sn, hash@1 as hash, count([Column { 
name: \"sn\", index: 0 }]) PARTITION BY: [[Column { name: \"hash\", index: 1 
}]], ORDER BY: [[PhysicalSortExpr { expr: Column { name: \"sn\", index: 0 }, 
options: SortOptions { descending: false, nulls_first: true } }]]@2 as col_2]",
-            "  BoundedWindowAggExec: wdw=[count([Column { name: \"sn\", index: 
0 }]) PARTITION BY: [[Column { name: \"hash\", index: 1 }]], ORDER BY: 
[[PhysicalSortExpr { expr: Column { name: \"sn\", index: 0 }, options: 
SortOptions { descending: false, nulls_first: true } }]]: Ok(Field { name: 
\"count([Column { name: \\\"sn\\\", index: 0 }]) PARTITION BY: [[Column { name: 
\\\"hash\\\", index: 1 }]], ORDER BY: [[PhysicalSortExpr { expr: Column { name: 
\\\"sn\\\", index: 0 }, options: Sor [...]
+            "  BoundedWindowAggExec: wdw=[count([Column { name: \"sn\", index: 
0 }]) PARTITION BY: [[Column { name: \"hash\", index: 1 }]], ORDER BY: 
[[PhysicalSortExpr { expr: Column { name: \"sn\", index: 0 }, options: 
SortOptions { descending: false, nulls_first: true } }]]: Ok(Field { name: 
\"count([Column { name: \\\"sn\\\", index: 0 }]) PARTITION BY: [[Column { name: 
\\\"hash\\\", index: 1 }]], ORDER BY: [[PhysicalSortExpr { expr: Column { name: 
\\\"sn\\\", index: 0 }, options: Sor [...]
             "    StreamingTableExec: partition_sizes=1, projection=[sn, hash], 
infinite_source=true, output_ordering=[sn@0 ASC NULLS LAST]",
         ];
 
diff --git a/datafusion/physical-plan/src/windows/mod.rs 
b/datafusion/physical-plan/src/windows/mod.rs
index 154beb79f7..f938f4410a 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -46,6 +46,7 @@ use 
datafusion_physical_expr_functions_aggregate::aggregate::AggregateExprBuilde
 use itertools::Itertools;
 
 mod bounded_window_agg_exec;
+mod utils;
 mod window_agg_exec;
 
 pub use bounded_window_agg_exec::BoundedWindowAggExec;
diff --git a/datafusion/physical-plan/src/windows/utils.rs 
b/datafusion/physical-plan/src/windows/utils.rs
new file mode 100644
index 0000000000..3cf92daae0
--- /dev/null
+++ b/datafusion/physical-plan/src/windows/utils.rs
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_schema::{Schema, SchemaBuilder};
+use datafusion_common::Result;
+use datafusion_physical_expr::window::WindowExpr;
+use std::sync::Arc;
+
+pub(crate) fn create_schema(
+    input_schema: &Schema,
+    window_expr: &[Arc<dyn WindowExpr>],
+) -> Result<Schema> {
+    let capacity = input_schema.fields().len() + window_expr.len();
+    let mut builder = SchemaBuilder::with_capacity(capacity);
+    builder.extend(input_schema.fields().iter().cloned());
+    // append results to the schema
+    for expr in window_expr {
+        builder.push(expr.field()?);
+    }
+    Ok(builder.finish())
+}
diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs 
b/datafusion/physical-plan/src/windows/window_agg_exec.rs
index 1d5c6061a0..d2f7090fca 100644
--- a/datafusion/physical-plan/src/windows/window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs
@@ -22,6 +22,7 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
+use super::utils::create_schema;
 use crate::expressions::PhysicalSortExpr;
 use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use crate::windows::{
@@ -33,10 +34,9 @@ use crate::{
     ExecutionPlan, ExecutionPlanProperties, PhysicalExpr, PlanProperties,
     RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr,
 };
-
 use arrow::array::ArrayRef;
 use arrow::compute::{concat, concat_batches};
-use arrow::datatypes::{Schema, SchemaBuilder, SchemaRef};
+use arrow::datatypes::SchemaRef;
 use arrow::error::ArrowError;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::stats::Precision;
@@ -44,7 +44,6 @@ use datafusion_common::utils::{evaluate_partition_ranges, 
transpose};
 use datafusion_common::{internal_err, Result};
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::PhysicalSortRequirement;
-
 use futures::{ready, Stream, StreamExt};
 
 /// Window execution plan
@@ -265,20 +264,6 @@ impl ExecutionPlan for WindowAggExec {
     }
 }
 
-fn create_schema(
-    input_schema: &Schema,
-    window_expr: &[Arc<dyn WindowExpr>],
-) -> Result<Schema> {
-    let capacity = input_schema.fields().len() + window_expr.len();
-    let mut builder = SchemaBuilder::with_capacity(capacity);
-    builder.extend(input_schema.fields().iter().cloned());
-    // append results to the schema
-    for expr in window_expr {
-        builder.push(expr.field()?);
-    }
-    Ok(builder.finish())
-}
-
 /// Compute the window aggregate columns
 fn compute_window_aggregates(
     window_expr: &[Arc<dyn WindowExpr>],
diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index f42dec4014..4e0ce33f13 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -92,6 +92,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
         // having and group by clause may reference aliases defined in select 
projection
         let projected_plan = self.project(base_plan.clone(), 
select_exprs.clone())?;
+
         // Place the fields of the base plan at the front so that when there 
are references
         // with the same name, the fields of the base plan will be searched 
first.
         // See https://github.com/apache/datafusion/issues/9162
@@ -288,9 +289,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             plan
         };
 
-        let plan = self.order_by(plan, order_by_rex)?;
-
-        Ok(plan)
+        self.order_by(plan, order_by_rex)
     }
 
     /// Try converting Expr(Unnest(Expr)) to Projection/Unnest/Projection
@@ -519,8 +518,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                     planner_context.set_outer_from_schema(left_schema);
                 }
                 planner_context.set_outer_from_schema(old_outer_from_schema);
-
-                Ok(left.build()?)
+                left.build()
             }
         }
     }
diff --git a/datafusion/sqllogictest/test_files/count_star_rule.slt 
b/datafusion/sqllogictest/test_files/count_star_rule.slt
index b552e60537..3625da68b3 100644
--- a/datafusion/sqllogictest/test_files/count_star_rule.slt
+++ b/datafusion/sqllogictest/test_files/count_star_rule.slt
@@ -85,7 +85,7 @@ logical_plan
 03)----TableScan: t1 projection=[a]
 physical_plan
 01)ProjectionExec: expr=[a@0 as a, count() PARTITION BY [t1.a] ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as count_a]
-02)--WindowAggExec: wdw=[count() PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "count() PARTITION BY 
[t1.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: 
Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), 
frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), 
end_bound: Following(UInt64(NULL)), is_causal: false }]
+02)--WindowAggExec: wdw=[count() PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "count() PARTITION BY 
[t1.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: 
Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), 
frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), 
end_bound: Following(UInt64(NULL)), is_causal: false }]
 03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false]
 04)------MemoryExec: partitions=1, partition_sizes=[1]
 
diff --git a/datafusion/sqllogictest/test_files/insert.slt 
b/datafusion/sqllogictest/test_files/insert.slt
index 9115cb5325..230ea4d98f 100644
--- a/datafusion/sqllogictest/test_files/insert.slt
+++ b/datafusion/sqllogictest/test_files/insert.slt
@@ -68,7 +68,7 @@ physical_plan
 02)--ProjectionExec: expr=[sum(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, count(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2]
 03)----SortPreservingMergeExec: [c1@2 ASC NULLS LAST]
 04)------ProjectionExec: expr=[sum(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as sum(aggregate_test_100.c4) PARTITION 
BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as count(*)  [...]
-05)--------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_ [...]
+05)--------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_ [...]
 06)----------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST], 
preserve_partitioning=[true]
 07)------------CoalesceBatchesExec: target_batch_size=8192
 08)--------------RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8
@@ -128,7 +128,7 @@ physical_plan
 01)DataSinkExec: sink=MemoryTable (partitions=1)
 02)--CoalescePartitionsExec
 03)----ProjectionExec: expr=[sum(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, count(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2]
-04)------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bo [...]
+04)------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bo [...]
 05)--------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST], 
preserve_partitioning=[true]
 06)----------CoalesceBatchesExec: target_batch_size=8192
 07)------------RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8
@@ -179,7 +179,7 @@ physical_plan
 02)--ProjectionExec: expr=[a1@0 as a1, a2@1 as a2]
 03)----SortPreservingMergeExec: [c1@2 ASC NULLS LAST]
 04)------ProjectionExec: expr=[sum(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as a1, count(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as a2, c1@0 as c1]
-05)--------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_ [...]
+05)--------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_ [...]
 06)----------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST], 
preserve_partitioning=[true]
 07)------------CoalesceBatchesExec: target_batch_size=8192
 08)--------------RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8
diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt 
b/datafusion/sqllogictest/test_files/insert_to_external.slt
index 8f6bafd92e..c40f62c3ba 100644
--- a/datafusion/sqllogictest/test_files/insert_to_external.slt
+++ b/datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -357,7 +357,7 @@ physical_plan
 02)--ProjectionExec: expr=[sum(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, count(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2]
 03)----SortPreservingMergeExec: [c1@2 ASC NULLS LAST]
 04)------ProjectionExec: expr=[sum(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as sum(aggregate_test_100.c4) PARTITION 
BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as count(*)  [...]
-05)--------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_ [...]
+05)--------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_ [...]
 06)----------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST], 
preserve_partitioning=[true]
 07)------------CoalesceBatchesExec: target_batch_size=8192
 08)--------------RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8
@@ -418,7 +418,7 @@ physical_plan
 01)DataSinkExec: sink=ParquetSink(file_groups=[])
 02)--CoalescePartitionsExec
 03)----ProjectionExec: expr=[sum(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, count(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2]
-04)------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bo [...]
+04)------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bo [...]
 05)--------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST], 
preserve_partitioning=[true]
 06)----------CoalesceBatchesExec: target_batch_size=8192
 07)------------RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8
diff --git a/datafusion/sqllogictest/test_files/union.slt 
b/datafusion/sqllogictest/test_files/union.slt
index e1fd5eb726..d2c013373d 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -150,6 +150,21 @@ GROUP BY c1
 2 2
 3 3
 
+# This test goes through the schema check in the aggregate plan; if count's 
nullability does not match, this test fails
+query II rowsort
+SELECT c1, SUM(c2) FROM (
+    SELECT 1 as c1, 1::int as c2
+    UNION
+    SELECT 2 as c1, 2::int as c2
+    UNION
+    SELECT 3 as c1, count(1) as c2
+) as a
+GROUP BY c1
+----
+1 1
+2 2
+3 1
+
 # union_all_with_count
 statement ok
 CREATE table t as SELECT 1 as a
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index af882c3a40..78055f8c1c 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -1311,7 +1311,7 @@ logical_plan
 05)--------TableScan: aggregate_test_100 projection=[c1, c2, c4]
 physical_plan
 01)ProjectionExec: expr=[sum(aggregate_test_100.c4) PARTITION BY 
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@2 as 
sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1, 
aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS 
BETWEEN 1  [...]
-02)--BoundedWindowAggExec: wdw=[count(*) PARTITION BY [aggregate_test_100.c1] 
ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING: Ok(Field { name: "count(*) PARTITION BY [aggregate_test_100.c1] 
ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(1)), end_bound: Fo [...]
+02)--BoundedWindowAggExec: wdw=[count(*) PARTITION BY [aggregate_test_100.c1] 
ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING: Ok(Field { name: "count(*) PARTITION BY [aggregate_test_100.c1] 
ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(1)), end_bound: F [...]
 03)----SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST], 
preserve_partitioning=[true]
 04)------CoalesceBatchesExec: target_batch_size=4096
 05)--------RepartitionExec: partitioning=Hash([c1@0], 2), input_partitions=2
@@ -2558,10 +2558,10 @@ physical_plan
 01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, sum3@2 as sum3, 
min1@3 as min1, min2@4 as min2, min3@5 as min3, max1@6 as max1, max2@7 as max2, 
max3@8 as max3, cnt1@9 as cnt1, cnt2@10 as cnt2, sumr1@11 as sumr1, sumr2@12 as 
sumr2, sumr3@13 as sumr3, minr1@14 as minr1, minr2@15 as minr2, minr3@16 as 
minr3, maxr1@17 as maxr1, maxr2@18 as maxr2, maxr3@19 as maxr3, cntr1@20 as 
cntr1, cntr2@21 as cntr2, sum4@22 as sum4, cnt3@23 as cnt3]
 02)--SortExec: TopK(fetch=5), expr=[inc_col@24 DESC], 
preserve_partitioning=[false]
 03)----ProjectionExec: expr=[sum(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING@13 as sum1, sum(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1 
FOLLOWING@14 as sum2, sum(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10 
FOLLOWING@15 as sum3, min(annotated_data_finite.inc_col) ORDER  [...]
-04)------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.desc_col) ROWS 
BETWEEN 8 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"sum(annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(8)), 
end_bound: Following(UInt64(1)), is_causal: false }, count(*) ROWS BETWEEN 8 
PRECEDING AND 1 FOLLOWING: Ok(Field { name: "cou [...]
+04)------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.desc_col) ROWS 
BETWEEN 8 PRECEDING AND 1 FOLLOWING: Ok(Field { name: 
"sum(annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(8)), 
end_bound: Following(UInt64(1)), is_causal: false }, count(*) ROWS BETWEEN 8 
PRECEDING AND 1 FOLLOWING: Ok(Field { name: "cou [...]
 05)--------ProjectionExec: expr=[__common_expr_1@0 as __common_expr_1, 
inc_col@3 as inc_col, sum(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING@5 as sum(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING, sum(annotated_data_finite.desc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8 
FOLLOWING@6  [...]
-06)----------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "sum(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(10)), end_bound: Follo [...]
-07)------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING 
AND 4 FOLLOWING: Ok(Field { name: "sum(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(4)), end_bound: Fo [...]
+06)----------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "sum(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(10)), end_bound: Follo [...]
+07)------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.inc_col) 
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING 
AND 4 FOLLOWING: Ok(Field { name: "sum(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4 
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(4)), end_bound: Fo [...]
 08)--------------ProjectionExec: expr=[CAST(desc_col@2 AS Int64) as 
__common_expr_1, CAST(inc_col@1 AS Int64) as __common_expr_2, ts@0 as ts, 
inc_col@1 as inc_col, desc_col@2 as desc_col]
 09)----------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, 
inc_col, desc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
 
@@ -2716,8 +2716,8 @@ physical_plan
 01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, min1@2 as min1, 
min2@3 as min2, max1@4 as max1, max2@5 as max2, count1@6 as count1, count2@7 as 
count2, avg1@8 as avg1, avg2@9 as avg2]
 02)--SortExec: TopK(fetch=5), expr=[inc_col@10 ASC NULLS LAST], 
preserve_partitioning=[false]
 03)----ProjectionExec: expr=[sum(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING@9 as sum1, sum(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING@4 as sum2, min(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING@10 as min1, min(annotated_data_ [...]
-04)------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND 5 FOLLOWING: Ok(Field { name: "sum(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(NULL)), end_ [...]
-05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING: Ok(Field { name: "sum(annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int32(NULL)) [...]
+04)------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING 
AND 5 FOLLOWING: Ok(Field { name: "sum(annotated_data_finite.inc_col) ORDER BY 
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
5 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(Int32(NULL)), end_ [...]
+05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING: Ok(Field { name: "sum(annotated_data_finite.inc_col) ORDER 
BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, 
start_bound: Preceding(Int32(NULL)) [...]
 06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as 
__common_expr_1, CAST(inc_col@1 AS Float64) as __common_expr_2, ts@0 as ts, 
inc_col@1 as inc_col]
 07)------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, 
inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
 
@@ -2813,8 +2813,8 @@ physical_plan
 01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, 
count2@3 as count2]
 02)--ProjectionExec: expr=[sum(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@5 as sum1, sum(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING@3 as sum2, count(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@6 as count1, count(ann [...]
 03)----GlobalLimitExec: skip=0, fetch=5
-04)------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)) [...]
-05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64( [...]
+04)------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)) [...]
+05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64( [...]
 06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as 
__common_expr_1, ts@0 as ts, inc_col@1 as inc_col]
 07)------------StreamingTableExec: partition_sizes=1, projection=[ts, 
inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
 
@@ -2859,8 +2859,8 @@ physical_plan
 01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, 
count2@3 as count2]
 02)--ProjectionExec: expr=[sum(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@5 as sum1, sum(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING@3 as sum2, count(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@6 as count1, count(ann [...]
 03)----GlobalLimitExec: skip=0, fetch=5
-04)------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)) [...]
-05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64( [...]
+04)------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)) [...]
+05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64( [...]
 06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as 
__common_expr_1, ts@0 as ts, inc_col@1 as inc_col]
 07)------------StreamingTableExec: partition_sizes=1, projection=[ts, 
inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
 
@@ -4094,7 +4094,7 @@ logical_plan
 04)------TableScan: a projection=[a]
 physical_plan
 01)ProjectionExec: expr=[count(*) PARTITION BY [a.a] ORDER BY [a.a ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as count(*) PARTITION 
BY [a.a] ORDER BY [a.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW]
-02)--BoundedWindowAggExec: wdw=[count(*) PARTITION BY [a.a] ORDER BY [a.a ASC 
NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"count(*) PARTITION BY [a.a] ORDER BY [a.a ASC NULLS LAST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow, 
is_causal: false }], mode=[Sorted]
+02)--BoundedWindowAggExec: wdw=[count(*) PARTITION BY [a.a] ORDER BY [a.a ASC 
NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"count(*) PARTITION BY [a.a] ORDER BY [a.a ASC NULLS LAST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: false, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow, 
is_causal: false }], mode=[Sorted]
 03)----CoalesceBatchesExec: target_batch_size=4096
 04)------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
 05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to