This is an automated email from the ASF dual-hosted git repository.
jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 6786f1592f Fix the schema mismatch between logical and physical for
aggregate function, add `AggregateUDFImpl::is_null` (#11989)
6786f1592f is described below
commit 6786f1592f6b923210673c0246ea121a714aec49
Author: Jay Zhan <[email protected]>
AuthorDate: Wed Aug 21 08:47:49 2024 +0800
Fix the schema mismatch between logical and physical for aggregate
function, add `AggregateUDFImpl::is_null` (#11989)
* schema assertion and fix the mismatch from logical and physical
Signed-off-by: jayzhan211 <[email protected]>
* add more msg
Signed-off-by: jayzhan211 <[email protected]>
* cleanup
Signed-off-by: jayzhan211 <[email protected]>
* rm test1
Signed-off-by: jayzhan211 <[email protected]>
* nullable for scalar func
Signed-off-by: jayzhan211 <[email protected]>
* nullable
Signed-off-by: jayzhan211 <[email protected]>
* rm field
Signed-off-by: jayzhan211 <[email protected]>
* rm unsafe block and use internal error
Signed-off-by: jayzhan211 <[email protected]>
* rm func_name
Signed-off-by: jayzhan211 <[email protected]>
* rm nullable option
Signed-off-by: jayzhan211 <[email protected]>
* add test
Signed-off-by: jayzhan211 <[email protected]>
* add more msg
Signed-off-by: jayzhan211 <[email protected]>
* fix test
Signed-off-by: jayzhan211 <[email protected]>
* rm row number
Signed-off-by: jayzhan211 <[email protected]>
* Update datafusion/expr/src/udaf.rs
Co-authored-by: Andrew Lamb <[email protected]>
* Update datafusion/expr/src/udaf.rs
Co-authored-by: Andrew Lamb <[email protected]>
* fix failed test from #12050
Signed-off-by: jayzhan211 <[email protected]>
* cleanup
Signed-off-by: jayzhan211 <[email protected]>
* add doc
Signed-off-by: jayzhan211 <[email protected]>
---------
Signed-off-by: jayzhan211 <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
---
.../core/src/physical_optimizer/enforce_sorting.rs | 34 ++++++++--------
.../core/src/physical_optimizer/sanity_checker.rs | 4 +-
datafusion/core/src/physical_planner.rs | 11 ++++--
datafusion/expr/src/expr_schema.rs | 26 +++++++++----
datafusion/expr/src/logical_plan/plan.rs | 12 +++---
datafusion/expr/src/udaf.rs | 29 +++++++++++++-
datafusion/expr/src/udf.rs | 8 ++++
.../functions-aggregate-common/src/aggregate.rs | 10 +++--
datafusion/functions-aggregate/src/count.rs | 10 ++++-
datafusion/functions/src/core/arrow_cast.rs | 8 +++-
datafusion/functions/src/core/coalesce.rs | 9 ++++-
.../optimizer/src/analyzer/count_wildcard_rule.rs | 2 +-
datafusion/optimizer/src/analyzer/type_coercion.rs | 31 +++++++++++----
.../src/aggregate.rs | 18 ++++++++-
datafusion/physical-expr/src/scalar_function.rs | 45 ++++++++++++++--------
datafusion/physical-expr/src/window/aggregate.rs | 11 +++---
datafusion/physical-expr/src/window/built_in.rs | 1 -
.../physical-expr/src/window/sliding_aggregate.rs | 5 ++-
.../src/windows/bounded_window_agg_exec.rs | 23 ++---------
datafusion/physical-plan/src/windows/mod.rs | 1 +
datafusion/physical-plan/src/windows/utils.rs | 35 +++++++++++++++++
.../physical-plan/src/windows/window_agg_exec.rs | 19 +--------
datafusion/sql/src/select.rs | 8 ++--
.../sqllogictest/test_files/count_star_rule.slt | 2 +-
datafusion/sqllogictest/test_files/insert.slt | 6 +--
.../sqllogictest/test_files/insert_to_external.slt | 4 +-
datafusion/sqllogictest/test_files/union.slt | 15 ++++++++
datafusion/sqllogictest/test_files/window.slt | 22 +++++------
28 files changed, 274 insertions(+), 135 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index bda6d598b6..14afe35466 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -845,17 +845,17 @@ mod tests {
let physical_plan = bounded_window_exec("non_nullable_col",
sort_exprs, filter);
- let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field {
name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+ let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field {
name: \"count\", data_type: Int64, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }],
mode=[Sorted]",
" FilterExec: NOT non_nullable_col@1",
" SortExec: expr=[non_nullable_col@1 ASC NULLS LAST],
preserve_partitioning=[false]",
- " BoundedWindowAggExec: wdw=[count: Ok(Field { name:
\"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+ " BoundedWindowAggExec: wdw=[count: Ok(Field { name:
\"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" CoalesceBatchesExec: target_batch_size=128",
" SortExec: expr=[non_nullable_col@1 DESC],
preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]"];
- let expected_optimized = ["WindowAggExec: wdw=[count: Ok(Field { name:
\"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound:
CurrentRow, end_bound: Following(NULL), is_causal: false }]",
+ let expected_optimized = ["WindowAggExec: wdw=[count: Ok(Field { name:
\"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound:
CurrentRow, end_bound: Following(NULL), is_causal: false }]",
" FilterExec: NOT non_nullable_col@1",
- " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL),
end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+ " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" CoalesceBatchesExec: target_batch_size=128",
" SortExec: expr=[non_nullable_col@1 DESC],
preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]"];
@@ -1722,7 +1722,7 @@ mod tests {
// corresponding SortExecs together. Also, the inputs of these
`SortExec`s
// are not necessarily the same to be able to remove them.
let expected_input = [
- "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL),
end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+ "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" SortPreservingMergeExec: [nullable_col@0 DESC NULLS LAST]",
" UnionExec",
" SortExec: expr=[nullable_col@0 DESC NULLS LAST],
preserve_partitioning=[false]",
@@ -1730,7 +1730,7 @@ mod tests {
" SortExec: expr=[nullable_col@0 DESC NULLS LAST],
preserve_partitioning=[false]",
" ParquetExec: file_groups={1 group: [[x]]},
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0
ASC]"];
let expected_optimized = [
- "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type:
Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }),
frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound:
Following(NULL), is_causal: false }]",
+ "WindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type:
Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }),
frame: WindowFrame { units: Range, start_bound: CurrentRow, end_bound:
Following(NULL), is_causal: false }]",
" SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" ParquetExec: file_groups={1 group: [[x]]},
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0
ASC, non_nullable_col@1 ASC]",
@@ -1760,14 +1760,14 @@ mod tests {
// The `WindowAggExec` can get its required sorting from the leaf
nodes directly.
// The unnecessary SortExecs should be removed
- let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field {
name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+ let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field {
name: \"count\", data_type: Int64, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }],
mode=[Sorted]",
" SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1
ASC]",
" UnionExec",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC],
preserve_partitioning=[false]",
" ParquetExec: file_groups={1 group: [[x]]},
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0
ASC]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC],
preserve_partitioning=[false]",
" ParquetExec: file_groups={1 group: [[x]]},
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0
ASC]"];
- let expected_optimized = ["BoundedWindowAggExec: wdw=[count: Ok(Field
{ name: \"count\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }],
mode=[Sorted]",
+ let expected_optimized = ["BoundedWindowAggExec: wdw=[count: Ok(Field
{ name: \"count\", data_type: Int64, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }],
mode=[Sorted]",
" SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" ParquetExec: file_groups={1 group: [[x]]},
projection=[nullable_col, non_nullable_col], output_ordering=[nullable_col@0
ASC]",
@@ -2060,15 +2060,15 @@ mod tests {
let physical_plan =
bounded_window_exec("non_nullable_col", sort_exprs1, window_agg2);
- let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field {
name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
- " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL),
end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
- " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL),
end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+ let expected_input = ["BoundedWindowAggExec: wdw=[count: Ok(Field {
name: \"count\", data_type: Int64, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }],
mode=[Sorted]",
+ " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+ " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" SortExec: expr=[nullable_col@0 ASC],
preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]"];
- let expected_optimized = ["BoundedWindowAggExec: wdw=[count: Ok(Field
{ name: \"count\", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }],
mode=[Sorted]",
- " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL),
end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
- " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL),
end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+ let expected_optimized = ["BoundedWindowAggExec: wdw=[count: Ok(Field
{ name: \"count\", data_type: Int64, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }],
mode=[Sorted]",
+ " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+ " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC],
preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]"];
assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
@@ -2134,7 +2134,7 @@ mod tests {
let expected_input = vec![
"SortExec: expr=[nullable_col@0 ASC],
preserve_partitioning=[false]",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL),
end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+ " BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" MemoryExec: partitions=1, partition_sizes=[0]",
];
assert_eq!(
@@ -2386,7 +2386,7 @@ mod tests {
let physical_plan = bounded_window_exec("a", sort_exprs, spm);
let expected_input = [
- "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL),
end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+ "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" SortPreservingMergeExec: [a@0 ASC,b@1 ASC]",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC,b@1 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
@@ -2394,7 +2394,7 @@ mod tests {
" CsvExec: file_groups={1 group: [[x]]}, projection=[a,
b, c, d, e], has_header=false",
];
let expected_optimized = [
- "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL),
end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+ "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" SortExec: expr=[a@0 ASC,b@1 ASC],
preserve_partitioning=[false]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=10",
diff --git a/datafusion/core/src/physical_optimizer/sanity_checker.rs
b/datafusion/core/src/physical_optimizer/sanity_checker.rs
index 6e37c3f40f..bd80d31224 100644
--- a/datafusion/core/src/physical_optimizer/sanity_checker.rs
+++ b/datafusion/core/src/physical_optimizer/sanity_checker.rs
@@ -437,7 +437,7 @@ mod tests {
let sort = sort_exec(sort_exprs.clone(), source);
let bw = bounded_window_exec("c9", sort_exprs, sort);
assert_plan(bw.as_ref(), vec![
- "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL),
end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+ "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" SortExec: expr=[c9@0 ASC NULLS LAST],
preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]"
]);
@@ -460,7 +460,7 @@ mod tests {
)];
let bw = bounded_window_exec("c9", sort_exprs, source);
assert_plan(bw.as_ref(), vec![
- "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL),
end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
+ "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]",
" MemoryExec: partitions=1, partition_sizes=[0]"
]);
// Order requirement of the `BoundedWindowAggExec` is not satisfied.
We expect to receive error during sanity check.
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index 41ab4ccc98..6536f9a014 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -670,6 +670,12 @@ impl DefaultPhysicalPlanner {
let input_exec = children.one()?;
let physical_input_schema = input_exec.schema();
let logical_input_schema = input.as_ref().schema();
+ let physical_input_schema_from_logical: Arc<Schema> =
+ logical_input_schema.as_ref().clone().into();
+
+ if physical_input_schema != physical_input_schema_from_logical
{
+ return internal_err!("Physical input schema should be the
same as the one converted from logical input schema.");
+ }
let groups = self.create_grouping_physical_expr(
group_expr,
@@ -1548,7 +1554,7 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
e: &Expr,
name: Option<String>,
logical_input_schema: &DFSchema,
- _physical_input_schema: &Schema,
+ physical_input_schema: &Schema,
execution_props: &ExecutionProps,
) -> Result<AggregateExprWithOptionalArgs> {
match e {
@@ -1599,11 +1605,10 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
let ordering_reqs: Vec<PhysicalSortExpr> =
physical_sort_exprs.clone().unwrap_or(vec![]);
- let schema: Schema = logical_input_schema.clone().into();
let agg_expr =
AggregateExprBuilder::new(func.to_owned(),
physical_args.to_vec())
.order_by(ordering_reqs.to_vec())
- .schema(Arc::new(schema))
+ .schema(Arc::new(physical_input_schema.to_owned()))
.alias(name)
.with_ignore_nulls(ignore_nulls)
.with_distinct(*distinct)
diff --git a/datafusion/expr/src/expr_schema.rs
b/datafusion/expr/src/expr_schema.rs
index f6489fef14..10ec10e612 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -335,18 +335,28 @@ impl ExprSchemable for Expr {
}
}
Expr::Cast(Cast { expr, .. }) => expr.nullable(input_schema),
+ Expr::ScalarFunction(ScalarFunction { func, args }) => {
+ Ok(func.is_nullable(args, input_schema))
+ }
Expr::AggregateFunction(AggregateFunction { func, .. }) => {
- // TODO: UDF should be able to customize nullability
- if func.name() == "count" {
- Ok(false)
- } else {
- Ok(true)
- }
+ Ok(func.is_nullable())
}
+ Expr::WindowFunction(WindowFunction { fun, .. }) => match fun {
+ WindowFunctionDefinition::BuiltInWindowFunction(func) => {
+ if func.name() == "RANK"
+ || func.name() == "NTILE"
+ || func.name() == "CUME_DIST"
+ {
+ Ok(false)
+ } else {
+ Ok(true)
+ }
+ }
+ WindowFunctionDefinition::AggregateUDF(func) =>
Ok(func.is_nullable()),
+ WindowFunctionDefinition::WindowUDF(udwf) =>
Ok(udwf.nullable()),
+ },
Expr::ScalarVariable(_, _)
| Expr::TryCast { .. }
- | Expr::ScalarFunction(..)
- | Expr::WindowFunction { .. }
| Expr::Unnest(_)
| Expr::Placeholder(_) => Ok(true),
Expr::IsNull(_)
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index f9b3035167..f93b7c0fed 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -2015,10 +2015,9 @@ impl Projection {
/// produced by the projection operation. If the schema computation is
successful,
/// the `Result` will contain the schema; otherwise, it will contain an error.
pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) ->
Result<Arc<DFSchema>> {
- let mut schema = DFSchema::new_with_metadata(
- exprlist_to_fields(exprs, input)?,
- input.schema().metadata().clone(),
- )?;
+ let metadata = input.schema().metadata().clone();
+ let mut schema =
+ DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?,
metadata)?;
schema =
schema.with_functional_dependencies(calc_func_dependencies_for_project(
exprs, input,
)?)?;
@@ -2655,7 +2654,10 @@ impl Aggregate {
qualified_fields.extend(exprlist_to_fields(aggr_expr.as_slice(),
&input)?);
- let schema = DFSchema::new_with_metadata(qualified_fields,
HashMap::new())?;
+ let schema = DFSchema::new_with_metadata(
+ qualified_fields,
+ input.schema().metadata().clone(),
+ )?;
Self::try_new_with_schema(input, group_expr, aggr_expr,
Arc::new(schema))
}
diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs
index d136aeaf09..cb278c7679 100644
--- a/datafusion/expr/src/udaf.rs
+++ b/datafusion/expr/src/udaf.rs
@@ -25,7 +25,7 @@ use std::vec;
use arrow::datatypes::{DataType, Field};
-use datafusion_common::{exec_err, not_impl_err, Result};
+use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue};
use crate::expr::AggregateFunction;
use crate::function::{
@@ -163,6 +163,10 @@ impl AggregateUDF {
self.inner.name()
}
+ pub fn is_nullable(&self) -> bool {
+ self.inner.is_nullable()
+ }
+
/// Returns the aliases for this function.
pub fn aliases(&self) -> &[String] {
self.inner.aliases()
@@ -257,6 +261,11 @@ impl AggregateUDF {
pub fn is_descending(&self) -> Option<bool> {
self.inner.is_descending()
}
+
+ /// See [`AggregateUDFImpl::default_value`] for more details.
+ pub fn default_value(&self, data_type: &DataType) -> Result<ScalarValue> {
+ self.inner.default_value(data_type)
+ }
}
impl<F> From<F> for AggregateUDF
@@ -342,6 +351,16 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
/// the arguments
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType>;
+ /// Whether the aggregate function is nullable.
+ ///
+ /// Nullable means that that the function could return `null` for any
inputs.
+ /// For example, aggregate functions like `COUNT` always return a non null
value
+ /// but others like `MIN` will return `NULL` if there is nullable input.
+ /// Note that if the function is declared as *not* nullable, make sure the
[`AggregateUDFImpl::default_value`] is `non-null`
+ fn is_nullable(&self) -> bool {
+ true
+ }
+
/// Return a new [`Accumulator`] that aggregates values for a specific
/// group during query execution.
///
@@ -552,6 +571,14 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
fn is_descending(&self) -> Option<bool> {
None
}
+
+ /// Returns default value of the function given the input is all `null`.
+ ///
+ /// Most of the aggregate function return Null if input is Null,
+ /// while `count` returns 0 if input is Null
+ fn default_value(&self, data_type: &DataType) -> Result<ScalarValue> {
+ ScalarValue::try_from(data_type)
+ }
}
pub enum ReversedUDAF {
diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs
index f5434726e2..a4584038e4 100644
--- a/datafusion/expr/src/udf.rs
+++ b/datafusion/expr/src/udf.rs
@@ -205,6 +205,10 @@ impl ScalarUDF {
self.inner.invoke(args)
}
+ pub fn is_nullable(&self, args: &[Expr], schema: &dyn ExprSchema) -> bool {
+ self.inner.is_nullable(args, schema)
+ }
+
/// Invoke the function without `args` but number of rows, returning the
appropriate result.
///
/// See [`ScalarUDFImpl::invoke_no_args`] for more details.
@@ -416,6 +420,10 @@ pub trait ScalarUDFImpl: Debug + Send + Sync {
self.return_type(arg_types)
}
+ fn is_nullable(&self, _args: &[Expr], _schema: &dyn ExprSchema) -> bool {
+ true
+ }
+
/// Invoke the function on `args`, returning the appropriate result
///
/// The function will be invoked passed with the slice of [`ColumnarValue`]
diff --git a/datafusion/functions-aggregate-common/src/aggregate.rs
b/datafusion/functions-aggregate-common/src/aggregate.rs
index 016e54e688..698d1350cb 100644
--- a/datafusion/functions-aggregate-common/src/aggregate.rs
+++ b/datafusion/functions-aggregate-common/src/aggregate.rs
@@ -19,9 +19,8 @@
//! (built-in and custom) need to satisfy.
use crate::order::AggregateOrderSensitivity;
-use arrow::datatypes::Field;
-use datafusion_common::exec_err;
-use datafusion_common::{not_impl_err, Result};
+use arrow::datatypes::{DataType, Field};
+use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue};
use datafusion_expr_common::accumulator::Accumulator;
use datafusion_expr_common::groups_accumulator::GroupsAccumulator;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
@@ -171,6 +170,11 @@ pub trait AggregateExpr: Send + Sync + Debug +
PartialEq<dyn Any> {
fn get_minmax_desc(&self) -> Option<(Field, bool)> {
None
}
+
+ /// Returns default value of the function given the input is Null
+ /// Most of the aggregate function return Null if input is Null,
+ /// while `count` returns 0 if input is Null
+ fn default_value(&self, data_type: &DataType) -> Result<ScalarValue>;
}
/// Stores the physical expressions used inside the `AggregateExpr`.
diff --git a/datafusion/functions-aggregate/src/count.rs
b/datafusion/functions-aggregate/src/count.rs
index 04b1921c7b..417e28e72a 100644
--- a/datafusion/functions-aggregate/src/count.rs
+++ b/datafusion/functions-aggregate/src/count.rs
@@ -121,6 +121,10 @@ impl AggregateUDFImpl for Count {
Ok(DataType::Int64)
}
+ fn is_nullable(&self) -> bool {
+ false
+ }
+
fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
if args.is_distinct {
Ok(vec![Field::new_list(
@@ -133,7 +137,7 @@ impl AggregateUDFImpl for Count {
Ok(vec![Field::new(
format_state_name(args.name, "count"),
DataType::Int64,
- true,
+ false,
)])
}
}
@@ -283,6 +287,10 @@ impl AggregateUDFImpl for Count {
fn reverse_expr(&self) -> ReversedUDAF {
ReversedUDAF::Identical
}
+
+ fn default_value(&self, _data_type: &DataType) -> Result<ScalarValue> {
+ Ok(ScalarValue::Int64(Some(0)))
+ }
}
#[derive(Debug)]
diff --git a/datafusion/functions/src/core/arrow_cast.rs
b/datafusion/functions/src/core/arrow_cast.rs
index c4db3e7704..a1b74228a5 100644
--- a/datafusion/functions/src/core/arrow_cast.rs
+++ b/datafusion/functions/src/core/arrow_cast.rs
@@ -26,7 +26,9 @@ use datafusion_common::{
};
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
-use datafusion_expr::{ColumnarValue, Expr, ScalarUDFImpl, Signature,
Volatility};
+use datafusion_expr::{
+ ColumnarValue, Expr, ExprSchemable, ScalarUDFImpl, Signature, Volatility,
+};
/// Implements casting to arbitrary arrow types (rather than SQL types)
///
@@ -87,6 +89,10 @@ impl ScalarUDFImpl for ArrowCastFunc {
internal_err!("arrow_cast should return type from exprs")
}
+ fn is_nullable(&self, args: &[Expr], schema: &dyn ExprSchema) -> bool {
+ args.iter().any(|e| e.nullable(schema).ok().unwrap_or(true))
+ }
+
fn return_type_from_exprs(
&self,
args: &[Expr],
diff --git a/datafusion/functions/src/core/coalesce.rs
b/datafusion/functions/src/core/coalesce.rs
index 15a3ddd9d6..19db58c181 100644
--- a/datafusion/functions/src/core/coalesce.rs
+++ b/datafusion/functions/src/core/coalesce.rs
@@ -22,9 +22,9 @@ use arrow::compute::kernels::zip::zip;
use arrow::compute::{and, is_not_null, is_null};
use arrow::datatypes::DataType;
-use datafusion_common::{exec_err, Result};
+use datafusion_common::{exec_err, ExprSchema, Result};
use datafusion_expr::type_coercion::binary::type_union_resolution;
-use datafusion_expr::ColumnarValue;
+use datafusion_expr::{ColumnarValue, Expr, ExprSchemable};
use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
#[derive(Debug)]
@@ -63,6 +63,11 @@ impl ScalarUDFImpl for CoalesceFunc {
Ok(arg_types[0].clone())
}
+ // If all the element in coalesce is non-null, the result is non-null
+ fn is_nullable(&self, args: &[Expr], schema: &dyn ExprSchema) -> bool {
+ args.iter().any(|e| e.nullable(schema).ok().unwrap_or(true))
+ }
+
/// coalesce evaluates to the first value which is not NULL
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
// do not accept 0 arguments.
diff --git a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
index 593dab2bc9..e114efb999 100644
--- a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
+++ b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs
@@ -240,7 +240,7 @@ mod tests {
.build()?;
let expected = "Projection: count(Int64(1)) AS count(*)
[count(*):Int64]\
- \n WindowAggr: windowExpr=[[count(Int64(1)) ORDER BY [test.a DESC
NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING AS count(*) ORDER BY
[test.a DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING]]
[a:UInt32, b:UInt32, c:UInt32, count(*) ORDER BY [test.a DESC NULLS FIRST]
RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING:Int64;N]\
+ \n WindowAggr: windowExpr=[[count(Int64(1)) ORDER BY [test.a DESC
NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING AS count(*) ORDER BY
[test.a DESC NULLS FIRST] RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING]]
[a:UInt32, b:UInt32, c:UInt32, count(*) ORDER BY [test.a DESC NULLS FIRST]
RANGE BETWEEN 6 PRECEDING AND 2 FOLLOWING:Int64]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
assert_plan_eq(plan, expected)
}
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs
b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 7251a95d77..68ab2e1300 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -17,7 +17,6 @@
//! Optimizer rule for type validation and coercion
-use std::collections::HashMap;
use std::sync::Arc;
use itertools::izip;
@@ -822,9 +821,18 @@ fn coerce_union_schema(inputs: &[Arc<LogicalPlan>]) ->
Result<DFSchema> {
.iter()
.map(|f| f.is_nullable())
.collect::<Vec<_>>();
+ let mut union_field_meta = base_schema
+ .fields()
+ .iter()
+ .map(|f| f.metadata().clone())
+ .collect::<Vec<_>>();
+
+ let mut metadata = base_schema.metadata().clone();
for (i, plan) in inputs.iter().enumerate().skip(1) {
let plan_schema = plan.schema();
+ metadata.extend(plan_schema.metadata().clone());
+
if plan_schema.fields().len() != base_schema.fields().len() {
return plan_err!(
"Union schemas have different number of fields: \
@@ -834,11 +842,13 @@ fn coerce_union_schema(inputs: &[Arc<LogicalPlan>]) ->
Result<DFSchema> {
plan_schema.fields().len()
);
}
+
// coerce data type and nullablity for each field
- for (union_datatype, union_nullable, plan_field) in izip!(
+ for (union_datatype, union_nullable, union_field_map, plan_field) in
izip!(
union_datatypes.iter_mut(),
union_nullabilities.iter_mut(),
- plan_schema.fields()
+ union_field_meta.iter_mut(),
+ plan_schema.fields().iter()
) {
let coerced_type =
comparison_coercion(union_datatype,
plan_field.data_type()).ok_or_else(
@@ -852,21 +862,26 @@ fn coerce_union_schema(inputs: &[Arc<LogicalPlan>]) ->
Result<DFSchema> {
)
},
)?;
+
*union_datatype = coerced_type;
*union_nullable = *union_nullable || plan_field.is_nullable();
+ union_field_map.extend(plan_field.metadata().clone());
}
}
let union_qualified_fields = izip!(
base_schema.iter(),
union_datatypes.into_iter(),
- union_nullabilities
+ union_nullabilities,
+ union_field_meta.into_iter()
)
- .map(|((qualifier, field), datatype, nullable)| {
- let field = Arc::new(Field::new(field.name().clone(), datatype,
nullable));
- (qualifier.cloned(), field)
+ .map(|((qualifier, field), datatype, nullable, metadata)| {
+ let mut field = Field::new(field.name().clone(), datatype, nullable);
+ field.set_metadata(metadata);
+ (qualifier.cloned(), field.into())
})
.collect::<Vec<_>>();
- DFSchema::new_with_metadata(union_qualified_fields, HashMap::new())
+
+ DFSchema::new_with_metadata(union_qualified_fields, metadata)
}
/// See `<https://github.com/apache/datafusion/pull/2108>`
diff --git a/datafusion/physical-expr-functions-aggregate/src/aggregate.rs
b/datafusion/physical-expr-functions-aggregate/src/aggregate.rs
index 8185f0fdd5..aa1d1999a3 100644
--- a/datafusion/physical-expr-functions-aggregate/src/aggregate.rs
+++ b/datafusion/physical-expr-functions-aggregate/src/aggregate.rs
@@ -16,6 +16,7 @@
// under the License.
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion_common::ScalarValue;
use datafusion_common::{internal_err, not_impl_err, Result};
use datafusion_expr::expr::create_function_physical_name;
use datafusion_expr::AggregateUDF;
@@ -109,6 +110,7 @@ impl AggregateExprBuilder {
)?;
let data_type = fun.return_type(&input_exprs_types)?;
+ let is_nullable = fun.is_nullable();
let name = match alias {
// TODO: Ideally, we should build the name from physical
expressions
None => create_function_physical_name(fun.name(), is_distinct,
&[], None)?,
@@ -127,6 +129,7 @@ impl AggregateExprBuilder {
is_distinct,
input_types: input_exprs_types,
is_reversed,
+ is_nullable,
}))
}
@@ -194,6 +197,7 @@ pub struct AggregateFunctionExpr {
is_distinct: bool,
is_reversed: bool,
input_types: Vec<DataType>,
+ is_nullable: bool,
}
impl AggregateFunctionExpr {
@@ -216,6 +220,10 @@ impl AggregateFunctionExpr {
pub fn is_reversed(&self) -> bool {
self.is_reversed
}
+
+ pub fn is_nullable(&self) -> bool {
+ self.is_nullable
+ }
}
impl AggregateExpr for AggregateFunctionExpr {
@@ -241,7 +249,11 @@ impl AggregateExpr for AggregateFunctionExpr {
}
fn field(&self) -> Result<Field> {
- Ok(Field::new(&self.name, self.data_type.clone(), true))
+ Ok(Field::new(
+ &self.name,
+ self.data_type.clone(),
+ self.is_nullable,
+ ))
}
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
@@ -435,6 +447,10 @@ impl AggregateExpr for AggregateFunctionExpr {
.is_descending()
.and_then(|flag| self.field().ok().map(|f| (f, flag)))
}
+
+ fn default_value(&self, data_type: &DataType) -> Result<ScalarValue> {
+ self.fun.default_value(data_type)
+ }
}
impl PartialEq<dyn Any> for AggregateFunctionExpr {
diff --git a/datafusion/physical-expr/src/scalar_function.rs
b/datafusion/physical-expr/src/scalar_function.rs
index 83272fc9b2..130c335d1c 100644
--- a/datafusion/physical-expr/src/scalar_function.rs
+++ b/datafusion/physical-expr/src/scalar_function.rs
@@ -51,6 +51,7 @@ pub struct ScalarFunctionExpr {
name: String,
args: Vec<Arc<dyn PhysicalExpr>>,
return_type: DataType,
+ nullable: bool,
}
impl Debug for ScalarFunctionExpr {
@@ -77,6 +78,7 @@ impl ScalarFunctionExpr {
name: name.to_owned(),
args,
return_type,
+ nullable: true,
}
}
@@ -99,6 +101,15 @@ impl ScalarFunctionExpr {
pub fn return_type(&self) -> &DataType {
&self.return_type
}
+
+ pub fn with_nullable(mut self, nullable: bool) -> Self {
+ self.nullable = nullable;
+ self
+ }
+
+ pub fn nullable(&self) -> bool {
+ self.nullable
+ }
}
impl fmt::Display for ScalarFunctionExpr {
@@ -118,7 +129,7 @@ impl PhysicalExpr for ScalarFunctionExpr {
}
fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
- Ok(true)
+ Ok(self.nullable)
}
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
@@ -151,12 +162,15 @@ impl PhysicalExpr for ScalarFunctionExpr {
self: Arc<Self>,
children: Vec<Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn PhysicalExpr>> {
- Ok(Arc::new(ScalarFunctionExpr::new(
- &self.name,
- Arc::clone(&self.fun),
- children,
- self.return_type().clone(),
- )))
+ Ok(Arc::new(
+ ScalarFunctionExpr::new(
+ &self.name,
+ Arc::clone(&self.fun),
+ children,
+ self.return_type().clone(),
+ )
+ .with_nullable(self.nullable),
+ ))
}
fn evaluate_bounds(&self, children: &[&Interval]) -> Result<Interval> {
@@ -209,8 +223,6 @@ impl PartialEq<dyn Any> for ScalarFunctionExpr {
}
/// Create a physical expression for the UDF.
-///
-/// Arguments:
pub fn create_physical_expr(
fun: &ScalarUDF,
input_phy_exprs: &[Arc<dyn PhysicalExpr>],
@@ -230,10 +242,13 @@ pub fn create_physical_expr(
let return_type =
fun.return_type_from_exprs(args, input_dfschema, &input_expr_types)?;
- Ok(Arc::new(ScalarFunctionExpr::new(
- fun.name(),
- Arc::new(fun.clone()),
- input_phy_exprs.to_vec(),
- return_type,
- )))
+ Ok(Arc::new(
+ ScalarFunctionExpr::new(
+ fun.name(),
+ Arc::new(fun.clone()),
+ input_phy_exprs.to_vec(),
+ return_type,
+ )
+ .with_nullable(fun.is_nullable(args, input_dfschema)),
+ ))
}
diff --git a/datafusion/physical-expr/src/window/aggregate.rs
b/datafusion/physical-expr/src/window/aggregate.rs
index 5892f7f3f3..52015f4252 100644
--- a/datafusion/physical-expr/src/window/aggregate.rs
+++ b/datafusion/physical-expr/src/window/aggregate.rs
@@ -176,9 +176,9 @@ impl AggregateWindowExpr for PlainAggregateWindowExpr {
value_slice: &[ArrayRef],
accumulator: &mut Box<dyn Accumulator>,
) -> Result<ScalarValue> {
- let value = if cur_range.start == cur_range.end {
- // We produce None if the window is empty.
- ScalarValue::try_from(self.aggregate.field()?.data_type())?
+ if cur_range.start == cur_range.end {
+ self.aggregate
+ .default_value(self.aggregate.field()?.data_type())
} else {
// Accumulate any new rows that have entered the window:
let update_bound = cur_range.end - last_range.end;
@@ -193,8 +193,7 @@ impl AggregateWindowExpr for PlainAggregateWindowExpr {
.collect();
accumulator.update_batch(&update)?
}
- accumulator.evaluate()?
- };
- Ok(value)
+ accumulator.evaluate()
+ }
}
}
diff --git a/datafusion/physical-expr/src/window/built_in.rs
b/datafusion/physical-expr/src/window/built_in.rs
index 04d359903e..8ff277db37 100644
--- a/datafusion/physical-expr/src/window/built_in.rs
+++ b/datafusion/physical-expr/src/window/built_in.rs
@@ -26,7 +26,6 @@ use crate::expressions::PhysicalSortExpr;
use crate::window::window_expr::{get_orderby_values, WindowFn};
use crate::window::{PartitionBatches, PartitionWindowAggStates, WindowState};
use crate::{reverse_order_bys, EquivalenceProperties, PhysicalExpr};
-
use arrow::array::{new_empty_array, ArrayRef};
use arrow::compute::SortOptions;
use arrow::datatypes::Field;
diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs
b/datafusion/physical-expr/src/window/sliding_aggregate.rs
index 50e9632b21..afa799e869 100644
--- a/datafusion/physical-expr/src/window/sliding_aggregate.rs
+++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs
@@ -183,8 +183,8 @@ impl AggregateWindowExpr for SlidingAggregateWindowExpr {
accumulator: &mut Box<dyn Accumulator>,
) -> Result<ScalarValue> {
if cur_range.start == cur_range.end {
- // We produce None if the window is empty.
- ScalarValue::try_from(self.aggregate.field()?.data_type())
+ self.aggregate
+ .default_value(self.aggregate.field()?.data_type())
} else {
// Accumulate any new rows that have entered the window:
let update_bound = cur_range.end - last_range.end;
@@ -195,6 +195,7 @@ impl AggregateWindowExpr for SlidingAggregateWindowExpr {
.collect();
accumulator.update_batch(&update)?
}
+
// Remove rows that have now left the window:
let retract_bound = cur_range.start - last_range.start;
if retract_bound > 0 {
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index 29ead35895..efb5dea1ec 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -27,6 +27,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
+use super::utils::create_schema;
use crate::expressions::PhysicalSortExpr;
use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use crate::windows::{
@@ -38,11 +39,11 @@ use crate::{
ExecutionPlanProperties, InputOrderMode, PlanProperties, RecordBatchStream,
SendableRecordBatchStream, Statistics, WindowExpr,
};
-
+use ahash::RandomState;
use arrow::{
array::{Array, ArrayRef, RecordBatchOptions, UInt32Builder},
compute::{concat, concat_batches, sort_to_indices},
- datatypes::{Schema, SchemaBuilder, SchemaRef},
+ datatypes::SchemaRef,
record_batch::RecordBatch,
};
use datafusion_common::hash_utils::create_hashes;
@@ -59,8 +60,6 @@ use datafusion_physical_expr::window::{
PartitionBatches, PartitionKey, PartitionWindowAggStates, WindowState,
};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
-
-use ahash::RandomState;
use futures::stream::Stream;
use futures::{ready, StreamExt};
use hashbrown::raw::RawTable;
@@ -852,20 +851,6 @@ impl SortedSearch {
}
}
-fn create_schema(
- input_schema: &Schema,
- window_expr: &[Arc<dyn WindowExpr>],
-) -> Result<Schema> {
- let capacity = input_schema.fields().len() + window_expr.len();
- let mut builder = SchemaBuilder::with_capacity(capacity);
- builder.extend(input_schema.fields.iter().cloned());
- // append results to the schema
- for expr in window_expr {
- builder.push(expr.field()?);
- }
- Ok(builder.finish())
-}
-
/// Stream for the bounded window aggregation plan.
pub struct BoundedWindowAggStream {
schema: SchemaRef,
@@ -1736,7 +1721,7 @@ mod tests {
let expected_plan = vec![
"ProjectionExec: expr=[sn@0 as sn, hash@1 as hash, count([Column {
name: \"sn\", index: 0 }]) PARTITION BY: [[Column { name: \"hash\", index: 1
}]], ORDER BY: [[PhysicalSortExpr { expr: Column { name: \"sn\", index: 0 },
options: SortOptions { descending: false, nulls_first: true } }]]@2 as col_2]",
- " BoundedWindowAggExec: wdw=[count([Column { name: \"sn\", index:
0 }]) PARTITION BY: [[Column { name: \"hash\", index: 1 }]], ORDER BY:
[[PhysicalSortExpr { expr: Column { name: \"sn\", index: 0 }, options:
SortOptions { descending: false, nulls_first: true } }]]: Ok(Field { name:
\"count([Column { name: \\\"sn\\\", index: 0 }]) PARTITION BY: [[Column { name:
\\\"hash\\\", index: 1 }]], ORDER BY: [[PhysicalSortExpr { expr: Column { name:
\\\"sn\\\", index: 0 }, options: Sor [...]
+ " BoundedWindowAggExec: wdw=[count([Column { name: \"sn\", index:
0 }]) PARTITION BY: [[Column { name: \"hash\", index: 1 }]], ORDER BY:
[[PhysicalSortExpr { expr: Column { name: \"sn\", index: 0 }, options:
SortOptions { descending: false, nulls_first: true } }]]: Ok(Field { name:
\"count([Column { name: \\\"sn\\\", index: 0 }]) PARTITION BY: [[Column { name:
\\\"hash\\\", index: 1 }]], ORDER BY: [[PhysicalSortExpr { expr: Column { name:
\\\"sn\\\", index: 0 }, options: Sor [...]
" StreamingTableExec: partition_sizes=1, projection=[sn, hash],
infinite_source=true, output_ordering=[sn@0 ASC NULLS LAST]",
];
diff --git a/datafusion/physical-plan/src/windows/mod.rs
b/datafusion/physical-plan/src/windows/mod.rs
index 154beb79f7..f938f4410a 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -46,6 +46,7 @@ use
datafusion_physical_expr_functions_aggregate::aggregate::AggregateExprBuilde
use itertools::Itertools;
mod bounded_window_agg_exec;
+mod utils;
mod window_agg_exec;
pub use bounded_window_agg_exec::BoundedWindowAggExec;
diff --git a/datafusion/physical-plan/src/windows/utils.rs
b/datafusion/physical-plan/src/windows/utils.rs
new file mode 100644
index 0000000000..3cf92daae0
--- /dev/null
+++ b/datafusion/physical-plan/src/windows/utils.rs
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_schema::{Schema, SchemaBuilder};
+use datafusion_common::Result;
+use datafusion_physical_expr::window::WindowExpr;
+use std::sync::Arc;
+
+pub(crate) fn create_schema(
+ input_schema: &Schema,
+ window_expr: &[Arc<dyn WindowExpr>],
+) -> Result<Schema> {
+ let capacity = input_schema.fields().len() + window_expr.len();
+ let mut builder = SchemaBuilder::with_capacity(capacity);
+ builder.extend(input_schema.fields().iter().cloned());
+ // append results to the schema
+ for expr in window_expr {
+ builder.push(expr.field()?);
+ }
+ Ok(builder.finish())
+}
diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs
b/datafusion/physical-plan/src/windows/window_agg_exec.rs
index 1d5c6061a0..d2f7090fca 100644
--- a/datafusion/physical-plan/src/windows/window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs
@@ -22,6 +22,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
+use super::utils::create_schema;
use crate::expressions::PhysicalSortExpr;
use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use crate::windows::{
@@ -33,10 +34,9 @@ use crate::{
ExecutionPlan, ExecutionPlanProperties, PhysicalExpr, PlanProperties,
RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr,
};
-
use arrow::array::ArrayRef;
use arrow::compute::{concat, concat_batches};
-use arrow::datatypes::{Schema, SchemaBuilder, SchemaRef};
+use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use datafusion_common::stats::Precision;
@@ -44,7 +44,6 @@ use datafusion_common::utils::{evaluate_partition_ranges,
transpose};
use datafusion_common::{internal_err, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalSortRequirement;
-
use futures::{ready, Stream, StreamExt};
/// Window execution plan
@@ -265,20 +264,6 @@ impl ExecutionPlan for WindowAggExec {
}
}
-fn create_schema(
- input_schema: &Schema,
- window_expr: &[Arc<dyn WindowExpr>],
-) -> Result<Schema> {
- let capacity = input_schema.fields().len() + window_expr.len();
- let mut builder = SchemaBuilder::with_capacity(capacity);
- builder.extend(input_schema.fields().iter().cloned());
- // append results to the schema
- for expr in window_expr {
- builder.push(expr.field()?);
- }
- Ok(builder.finish())
-}
-
/// Compute the window aggregate columns
fn compute_window_aggregates(
window_expr: &[Arc<dyn WindowExpr>],
diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index f42dec4014..4e0ce33f13 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -92,6 +92,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// having and group by clause may reference aliases defined in select
projection
let projected_plan = self.project(base_plan.clone(),
select_exprs.clone())?;
+
// Place the fields of the base plan at the front so that when there
are references
// with the same name, the fields of the base plan will be searched
first.
// See https://github.com/apache/datafusion/issues/9162
@@ -288,9 +289,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
plan
};
- let plan = self.order_by(plan, order_by_rex)?;
-
- Ok(plan)
+ self.order_by(plan, order_by_rex)
}
/// Try converting Expr(Unnest(Expr)) to Projection/Unnest/Projection
@@ -519,8 +518,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
planner_context.set_outer_from_schema(left_schema);
}
planner_context.set_outer_from_schema(old_outer_from_schema);
-
- Ok(left.build()?)
+ left.build()
}
}
}
diff --git a/datafusion/sqllogictest/test_files/count_star_rule.slt
b/datafusion/sqllogictest/test_files/count_star_rule.slt
index b552e60537..3625da68b3 100644
--- a/datafusion/sqllogictest/test_files/count_star_rule.slt
+++ b/datafusion/sqllogictest/test_files/count_star_rule.slt
@@ -85,7 +85,7 @@ logical_plan
03)----TableScan: t1 projection=[a]
physical_plan
01)ProjectionExec: expr=[a@0 as a, count() PARTITION BY [t1.a] ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as count_a]
-02)--WindowAggExec: wdw=[count() PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "count() PARTITION BY
[t1.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type:
Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }),
frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)),
end_bound: Following(UInt64(NULL)), is_causal: false }]
+02)--WindowAggExec: wdw=[count() PARTITION BY [t1.a] ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "count() PARTITION BY
[t1.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type:
Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }),
frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)),
end_bound: Following(UInt64(NULL)), is_causal: false }]
03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false]
04)------MemoryExec: partitions=1, partition_sizes=[1]
diff --git a/datafusion/sqllogictest/test_files/insert.slt
b/datafusion/sqllogictest/test_files/insert.slt
index 9115cb5325..230ea4d98f 100644
--- a/datafusion/sqllogictest/test_files/insert.slt
+++ b/datafusion/sqllogictest/test_files/insert.slt
@@ -68,7 +68,7 @@ physical_plan
02)--ProjectionExec: expr=[sum(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, count(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2]
03)----SortPreservingMergeExec: [c1@2 ASC NULLS LAST]
04)------ProjectionExec: expr=[sum(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as sum(aggregate_test_100.c4) PARTITION
BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as count(*) [...]
-05)--------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name:
"sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Rows, start_ [...]
+05)--------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name:
"sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Rows, start_ [...]
06)----------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST],
preserve_partitioning=[true]
07)------------CoalesceBatchesExec: target_batch_size=8192
08)--------------RepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8
@@ -128,7 +128,7 @@ physical_plan
01)DataSinkExec: sink=MemoryTable (partitions=1)
02)--CoalescePartitionsExec
03)----ProjectionExec: expr=[sum(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, count(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2]
-04)------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name:
"sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bo [...]
+04)------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name:
"sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bo [...]
05)--------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST],
preserve_partitioning=[true]
06)----------CoalesceBatchesExec: target_batch_size=8192
07)------------RepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8
@@ -179,7 +179,7 @@ physical_plan
02)--ProjectionExec: expr=[a1@0 as a1, a2@1 as a2]
03)----SortPreservingMergeExec: [c1@2 ASC NULLS LAST]
04)------ProjectionExec: expr=[sum(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as a1, count(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as a2, c1@0 as c1]
-05)--------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name:
"sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Rows, start_ [...]
+05)--------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name:
"sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Rows, start_ [...]
06)----------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST],
preserve_partitioning=[true]
07)------------CoalesceBatchesExec: target_batch_size=8192
08)--------------RepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8
diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt
b/datafusion/sqllogictest/test_files/insert_to_external.slt
index 8f6bafd92e..c40f62c3ba 100644
--- a/datafusion/sqllogictest/test_files/insert_to_external.slt
+++ b/datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -357,7 +357,7 @@ physical_plan
02)--ProjectionExec: expr=[sum(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, count(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2]
03)----SortPreservingMergeExec: [c1@2 ASC NULLS LAST]
04)------ProjectionExec: expr=[sum(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as sum(aggregate_test_100.c4) PARTITION
BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as count(*) [...]
-05)--------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name:
"sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Rows, start_ [...]
+05)--------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name:
"sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Rows, start_ [...]
06)----------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST],
preserve_partitioning=[true]
07)------------CoalesceBatchesExec: target_batch_size=8192
08)--------------RepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8
@@ -418,7 +418,7 @@ physical_plan
01)DataSinkExec: sink=ParquetSink(file_groups=[])
02)--CoalescePartitionsExec
03)----ProjectionExec: expr=[sum(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, count(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2]
-04)------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name:
"sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bo [...]
+04)------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name:
"sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bo [...]
05)--------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST],
preserve_partitioning=[true]
06)----------CoalesceBatchesExec: target_batch_size=8192
07)------------RepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8
diff --git a/datafusion/sqllogictest/test_files/union.slt
b/datafusion/sqllogictest/test_files/union.slt
index e1fd5eb726..d2c013373d 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -150,6 +150,21 @@ GROUP BY c1
2 2
3 3
+# This test goes through the schema check in the aggregate plan; if count's
nullability does not match, this test fails
+query II rowsort
+SELECT c1, SUM(c2) FROM (
+ SELECT 1 as c1, 1::int as c2
+ UNION
+ SELECT 2 as c1, 2::int as c2
+ UNION
+ SELECT 3 as c1, count(1) as c2
+) as a
+GROUP BY c1
+----
+1 1
+2 2
+3 1
+
# union_all_with_count
statement ok
CREATE table t as SELECT 1 as a
diff --git a/datafusion/sqllogictest/test_files/window.slt
b/datafusion/sqllogictest/test_files/window.slt
index af882c3a40..78055f8c1c 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -1311,7 +1311,7 @@ logical_plan
05)--------TableScan: aggregate_test_100 projection=[c1, c2, c4]
physical_plan
01)ProjectionExec: expr=[sum(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@2 as
sum(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1,
aggregate_test_100.c2] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING, count(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS
BETWEEN 1 [...]
-02)--BoundedWindowAggExec: wdw=[count(*) PARTITION BY [aggregate_test_100.c1]
ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1
FOLLOWING: Ok(Field { name: "count(*) PARTITION BY [aggregate_test_100.c1]
ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound:
Preceding(UInt64(1)), end_bound: Fo [...]
+02)--BoundedWindowAggExec: wdw=[count(*) PARTITION BY [aggregate_test_100.c1]
ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1
FOLLOWING: Ok(Field { name: "count(*) PARTITION BY [aggregate_test_100.c1]
ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1
FOLLOWING", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound:
Preceding(UInt64(1)), end_bound: F [...]
03)----SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST],
preserve_partitioning=[true]
04)------CoalesceBatchesExec: target_batch_size=4096
05)--------RepartitionExec: partitioning=Hash([c1@0], 2), input_partitions=2
@@ -2558,10 +2558,10 @@ physical_plan
01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, sum3@2 as sum3,
min1@3 as min1, min2@4 as min2, min3@5 as min3, max1@6 as max1, max2@7 as max2,
max3@8 as max3, cnt1@9 as cnt1, cnt2@10 as cnt2, sumr1@11 as sumr1, sumr2@12 as
sumr2, sumr3@13 as sumr3, minr1@14 as minr1, minr2@15 as minr2, minr3@16 as
minr3, maxr1@17 as maxr1, maxr2@18 as maxr2, maxr3@19 as maxr3, cntr1@20 as
cntr1, cntr2@21 as cntr2, sum4@22 as sum4, cnt3@23 as cnt3]
02)--SortExec: TopK(fetch=5), expr=[inc_col@24 DESC],
preserve_partitioning=[false]
03)----ProjectionExec: expr=[sum(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1
FOLLOWING@13 as sum1, sum(annotated_data_finite.desc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 5 PRECEDING AND 1
FOLLOWING@14 as sum2, sum(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 10
FOLLOWING@15 as sum3, min(annotated_data_finite.inc_col) ORDER [...]
-04)------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.desc_col) ROWS
BETWEEN 8 PRECEDING AND 1 FOLLOWING: Ok(Field { name:
"sum(annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(8)),
end_bound: Following(UInt64(1)), is_causal: false }, count(*) ROWS BETWEEN 8
PRECEDING AND 1 FOLLOWING: Ok(Field { name: "cou [...]
+04)------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.desc_col) ROWS
BETWEEN 8 PRECEDING AND 1 FOLLOWING: Ok(Field { name:
"sum(annotated_data_finite.desc_col) ROWS BETWEEN 8 PRECEDING AND 1 FOLLOWING",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(8)),
end_bound: Following(UInt64(1)), is_causal: false }, count(*) ROWS BETWEEN 8
PRECEDING AND 1 FOLLOWING: Ok(Field { name: "cou [...]
05)--------ProjectionExec: expr=[__common_expr_1@0 as __common_expr_1,
inc_col@3 as inc_col, sum(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4
FOLLOWING@5 as sum(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4
FOLLOWING, sum(annotated_data_finite.desc_col) ORDER BY
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 8
FOLLOWING@6 [...]
-06)----------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING
AND 1 FOLLOWING: Ok(Field { name: "sum(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(Int32(10)), end_bound: Follo [...]
-07)------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING
AND 4 FOLLOWING: Ok(Field { name: "sum(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(Int32(4)), end_bound: Fo [...]
+06)----------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING
AND 1 FOLLOWING: Ok(Field { name: "sum(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(Int32(10)), end_bound: Follo [...]
+07)------------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.inc_col)
ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING
AND 4 FOLLOWING: Ok(Field { name: "sum(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 4
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(Int32(4)), end_bound: Fo [...]
08)--------------ProjectionExec: expr=[CAST(desc_col@2 AS Int64) as
__common_expr_1, CAST(inc_col@1 AS Int64) as __common_expr_2, ts@0 as ts,
inc_col@1 as inc_col, desc_col@2 as desc_col]
09)----------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts,
inc_col, desc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
@@ -2716,8 +2716,8 @@ physical_plan
01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, min1@2 as min1,
min2@3 as min2, max1@4 as max1, max2@5 as max2, count1@6 as count1, count2@7 as
count2, avg1@8 as avg1, avg2@9 as avg2]
02)--SortExec: TopK(fetch=5), expr=[inc_col@10 ASC NULLS LAST],
preserve_partitioning=[false]
03)----ProjectionExec: expr=[sum(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
5 FOLLOWING@9 as sum1, sum(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING@4 as sum2, min(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
5 FOLLOWING@10 as min1, min(annotated_data_ [...]
-04)------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.inc_col) ORDER
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND 5 FOLLOWING: Ok(Field { name: "sum(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
5 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(Int32(NULL)), end_ [...]
-05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.inc_col) ORDER
BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING: Ok(Field { name: "sum(annotated_data_finite.inc_col) ORDER
BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int32(NULL)) [...]
+04)------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.inc_col) ORDER
BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING
AND 5 FOLLOWING: Ok(Field { name: "sum(annotated_data_finite.inc_col) ORDER BY
[annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
5 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(Int32(NULL)), end_ [...]
+05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.inc_col) ORDER
BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING: Ok(Field { name: "sum(annotated_data_finite.inc_col) ORDER
BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range,
start_bound: Preceding(Int32(NULL)) [...]
06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as
__common_expr_1, CAST(inc_col@1 AS Float64) as __common_expr_2, ts@0 as ts,
inc_col@1 as inc_col]
07)------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts,
inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true
@@ -2813,8 +2813,8 @@ physical_plan
01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1,
count2@3 as count2]
02)--ProjectionExec: expr=[sum(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING@5 as sum1, sum(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING@3 as sum2, count(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING@6 as count1, count(ann [...]
03)----GlobalLimitExec: skip=0, fetch=5
-04)------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) ORDER
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) ORDER
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(NULL)) [...]
-05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64( [...]
+04)------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) ORDER
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) ORDER
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(NULL)) [...]
+05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64( [...]
06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as
__common_expr_1, ts@0 as ts, inc_col@1 as inc_col]
07)------------StreamingTableExec: partition_sizes=1, projection=[ts,
inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
@@ -2859,8 +2859,8 @@ physical_plan
01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1,
count2@3 as count2]
02)--ProjectionExec: expr=[sum(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING@5 as sum1, sum(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING@3 as sum2, count(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING@6 as count1, count(ann [...]
03)----GlobalLimitExec: skip=0, fetch=5
-04)------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) ORDER
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) ORDER
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(NULL)) [...]
-05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64( [...]
+04)------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) ORDER
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) ORDER
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(NULL)) [...]
+05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64( [...]
06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as
__common_expr_1, ts@0 as ts, inc_col@1 as inc_col]
07)------------StreamingTableExec: partition_sizes=1, projection=[ts,
inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
@@ -4094,7 +4094,7 @@ logical_plan
04)------TableScan: a projection=[a]
physical_plan
01)ProjectionExec: expr=[count(*) PARTITION BY [a.a] ORDER BY [a.a ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as count(*) PARTITION
BY [a.a] ORDER BY [a.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW]
-02)--BoundedWindowAggExec: wdw=[count(*) PARTITION BY [a.a] ORDER BY [a.a ASC
NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name:
"count(*) PARTITION BY [a.a] ORDER BY [a.a ASC NULLS LAST] RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow,
is_causal: false }], mode=[Sorted]
+02)--BoundedWindowAggExec: wdw=[count(*) PARTITION BY [a.a] ORDER BY [a.a ASC
NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name:
"count(*) PARTITION BY [a.a] ORDER BY [a.a ASC NULLS LAST] RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: false,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow,
is_causal: false }], mode=[Sorted]
03)----CoalesceBatchesExec: target_batch_size=4096
04)------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]