This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 4ac1428025 Convert `nth_value` to UDAF (#11287)
4ac1428025 is described below
commit 4ac14280250fd7e1fad8f749702a7e7d025c472d
Author: jcsherin <[email protected]>
AuthorDate: Mon Jul 8 23:07:37 2024 +0530
Convert `nth_value` to UDAF (#11287)
* Copies `NthValueAccumulator` to `functions-aggregate`
* Partial implementation of `AggregateUDFImpl`
Pending methods are:
- `accumulator`
- `state_fields`
- `reverse_expr`
* Implements `accumulator` method
* Retains existing comments verbatim
* Removes unnecessary path prefix
* Implements `reverse_expr` method
* Adds `nullable` field to `NthValue`
* Revert to existing name
* Implements `state_fields` method
* Removes `nth_value` from `physical-expr`
* Adds default
* Exports `nth_value`
* Fixes build error in physical plan roundtrip test
* Minor: formatting
* Parses `N` from input expression
* Fixes build error by using `nth_value_udaf`
* Fixes `reverse_expr` by passing correct `N`
* Update plan with lowercase UDF name
* Updates error message for incorrect no. of arguments
This error message is manually formatted to remain consistent with
existing error statements. It is not formatted by running:
```
cargo test -p datafusion-sqllogictest --test sqllogictests errors --
--complete
```
* Fixes nullable "item" in `state_fields`
* Minor: fix formatting after resolving conflicts
* Updates multiple existing plans with lowercase name
* Implements `retract_batch` for window aggregations
* Fixes: regex mismatch for error message in CI
* Revert "Updates multiple existing plans with lowercase name"
This reverts commit 1913efda49e585816286b54b371d4166ac894d1f.
* Revert "Implements `retract_batch` for window aggregations"
This reverts commit 4bb204f6ec8028c4e3313db5af3fabfcdaf7fea8.
* Fixes: use builtin window function instead of udaf
* Revert "Updates error message for incorrect no. of arguments"
This reverts commit fa61ce62dcae6eae6f8e9c9900ebf8cff5023bc0.
* Refactor: renames field and method
* Removes hack for nullability
* Minor: refactors `reverse_expr`
* Minor: removes unnecessary path prefix
* Minor: cleanup arguments for creating aggregate expr
* Refactor: extracts `merge_ordered_arrays` to `physical-expr-common`
* Minor: adds todo for configuring nullability
* Retrigger CI
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion-cli/Cargo.lock | 1 +
datafusion/expr/src/aggregate_function.rs | 7 -
datafusion/expr/src/type_coercion/aggregates.rs | 1 -
datafusion/functions-aggregate/src/lib.rs | 3 +
.../src}/nth_value.rs | 206 ++++++++++-----------
datafusion/functions-array/Cargo.toml | 1 +
datafusion/functions-array/src/planner.rs | 5 +-
.../src/aggregate/merge_arrays.rs | 195 +++++++++++++++++++
.../physical-expr-common/src/aggregate/mod.rs | 1 +
.../src/aggregate/array_agg_ordered.rs | 181 +-----------------
datafusion/physical-expr/src/aggregate/build_in.rs | 24 +--
datafusion/physical-expr/src/aggregate/mod.rs | 1 -
datafusion/physical-expr/src/expressions/mod.rs | 1 -
datafusion/proto/proto/datafusion.proto | 2 +-
datafusion/proto/src/generated/pbjson.rs | 3 -
datafusion/proto/src/generated/prost.rs | 7 +-
datafusion/proto/src/logical_plan/from_proto.rs | 1 -
datafusion/proto/src/logical_plan/to_proto.rs | 4 -
datafusion/proto/src/physical_plan/to_proto.rs | 6 +-
.../proto/tests/cases/roundtrip_physical_plan.rs | 21 ++-
datafusion/sql/src/expr/function.rs | 6 +-
.../test_files/agg_func_substitute.slt | 30 +--
22 files changed, 350 insertions(+), 357 deletions(-)
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 4fce2ec500..500e731a5b 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1319,6 +1319,7 @@ dependencies = [
"datafusion-execution",
"datafusion-expr",
"datafusion-functions",
+ "datafusion-functions-aggregate",
"itertools",
"log",
"paste",
diff --git a/datafusion/expr/src/aggregate_function.rs
b/datafusion/expr/src/aggregate_function.rs
index 760952d948..23e98714df 100644
--- a/datafusion/expr/src/aggregate_function.rs
+++ b/datafusion/expr/src/aggregate_function.rs
@@ -39,8 +39,6 @@ pub enum AggregateFunction {
Max,
/// Aggregation into an array
ArrayAgg,
- /// N'th value in a group according to some ordering
- NthValue,
}
impl AggregateFunction {
@@ -50,7 +48,6 @@ impl AggregateFunction {
Min => "MIN",
Max => "MAX",
ArrayAgg => "ARRAY_AGG",
- NthValue => "NTH_VALUE",
}
}
}
@@ -69,7 +66,6 @@ impl FromStr for AggregateFunction {
"max" => AggregateFunction::Max,
"min" => AggregateFunction::Min,
"array_agg" => AggregateFunction::ArrayAgg,
- "nth_value" => AggregateFunction::NthValue,
_ => {
return plan_err!("There is no built-in function named {name}");
}
@@ -114,7 +110,6 @@ impl AggregateFunction {
coerced_data_types[0].clone(),
input_expr_nullable[0],
)))),
- AggregateFunction::NthValue => Ok(coerced_data_types[0].clone()),
}
}
@@ -124,7 +119,6 @@ impl AggregateFunction {
match self {
AggregateFunction::Max | AggregateFunction::Min => Ok(true),
AggregateFunction::ArrayAgg => Ok(false),
- AggregateFunction::NthValue => Ok(true),
}
}
}
@@ -147,7 +141,6 @@ impl AggregateFunction {
.collect::<Vec<_>>();
Signature::uniform(1, valid, Volatility::Immutable)
}
- AggregateFunction::NthValue => Signature::any(2,
Volatility::Immutable),
}
}
}
diff --git a/datafusion/expr/src/type_coercion/aggregates.rs
b/datafusion/expr/src/type_coercion/aggregates.rs
index 0f7464b96b..fbec6e2f80 100644
--- a/datafusion/expr/src/type_coercion/aggregates.rs
+++ b/datafusion/expr/src/type_coercion/aggregates.rs
@@ -101,7 +101,6 @@ pub fn coerce_types(
// unpack the dictionary to get the value
get_min_max_result_type(input_types)
}
- AggregateFunction::NthValue => Ok(input_types.to_vec()),
}
}
diff --git a/datafusion/functions-aggregate/src/lib.rs
b/datafusion/functions-aggregate/src/lib.rs
index fc485a284a..6ae2dfb369 100644
--- a/datafusion/functions-aggregate/src/lib.rs
+++ b/datafusion/functions-aggregate/src/lib.rs
@@ -74,6 +74,7 @@ pub mod average;
pub mod bit_and_or_xor;
pub mod bool_and_or;
pub mod grouping;
+pub mod nth_value;
pub mod string_agg;
use crate::approx_percentile_cont::approx_percentile_cont_udaf;
@@ -105,6 +106,7 @@ pub mod expr_fn {
pub use super::first_last::last_value;
pub use super::grouping::grouping;
pub use super::median::median;
+ pub use super::nth_value::nth_value;
pub use super::regr::regr_avgx;
pub use super::regr::regr_avgy;
pub use super::regr::regr_count;
@@ -157,6 +159,7 @@ pub fn all_default_aggregate_functions() ->
Vec<Arc<AggregateUDF>> {
bool_and_or::bool_or_udaf(),
average::avg_udaf(),
grouping::grouping_udaf(),
+ nth_value::nth_value_udaf(),
]
}
diff --git a/datafusion/physical-expr/src/aggregate/nth_value.rs
b/datafusion/functions-aggregate/src/nth_value.rs
similarity index 77%
rename from datafusion/physical-expr/src/aggregate/nth_value.rs
rename to datafusion/functions-aggregate/src/nth_value.rs
index b75ecd1066..6719c673c5 100644
--- a/datafusion/physical-expr/src/aggregate/nth_value.rs
+++ b/datafusion/functions-aggregate/src/nth_value.rs
@@ -22,149 +22,149 @@ use std::any::Any;
use std::collections::VecDeque;
use std::sync::Arc;
-use crate::aggregate::array_agg_ordered::merge_ordered_arrays;
-use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
-use crate::expressions::{format_state_name, Literal};
-use crate::{
- reverse_order_bys, AggregateExpr, LexOrdering, PhysicalExpr,
PhysicalSortExpr,
-};
-
-use arrow_array::cast::AsArray;
-use arrow_array::{new_empty_array, ArrayRef, StructArray};
+use arrow::array::{new_empty_array, ArrayRef, AsArray, StructArray};
use arrow_schema::{DataType, Field, Fields};
+
use datafusion_common::utils::{array_into_list_array_nullable, get_row_at_idx};
-use datafusion_common::{exec_err, internal_err, Result, ScalarValue};
-use datafusion_expr::utils::AggregateOrderSensitivity;
-use datafusion_expr::Accumulator;
+use datafusion_common::{exec_err, internal_err, not_impl_err, Result,
ScalarValue};
+use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
+use datafusion_expr::utils::format_state_name;
+use datafusion_expr::{
+ Accumulator, AggregateUDF, AggregateUDFImpl, Expr, ReversedUDAF, Signature,
+ Volatility,
+};
+use
datafusion_physical_expr_common::aggregate::merge_arrays::merge_ordered_arrays;
+use datafusion_physical_expr_common::aggregate::utils::ordering_fields;
+use datafusion_physical_expr_common::sort_expr::{
+ limited_convert_logical_sort_exprs_to_physical, LexOrdering,
PhysicalSortExpr,
+};
+
+make_udaf_expr_and_func!(
+ NthValueAgg,
+ nth_value,
+ "Returns the nth value in a group of values.",
+ nth_value_udaf
+);
/// Expression for a `NTH_VALUE(... ORDER BY ..., ...)` aggregation. In a multi
/// partition setting, partial aggregations are computed for every partition,
/// and then their results are merged.
#[derive(Debug)]
pub struct NthValueAgg {
- /// Column name
- name: String,
- /// The `DataType` for the input expression
- input_data_type: DataType,
- /// The input expression
- expr: Arc<dyn PhysicalExpr>,
- /// The `N` value.
- n: i64,
- /// If the input expression can have `NULL`s
- nullable: bool,
- /// Ordering data types
- order_by_data_types: Vec<DataType>,
- /// Ordering requirement
- ordering_req: LexOrdering,
+ signature: Signature,
+ /// Determines whether `N` is relative to the beginning or the end
+ /// of the aggregation. When set to `true`, then `N` is from the end.
+ reversed: bool,
}
impl NthValueAgg {
/// Create a new `NthValueAgg` aggregate function
- pub fn new(
- expr: Arc<dyn PhysicalExpr>,
- n: i64,
- name: impl Into<String>,
- input_data_type: DataType,
- nullable: bool,
- order_by_data_types: Vec<DataType>,
- ordering_req: LexOrdering,
- ) -> Self {
+ pub fn new() -> Self {
Self {
- name: name.into(),
- input_data_type,
- expr,
- n,
- nullable,
- order_by_data_types,
- ordering_req,
+ signature: Signature::any(2, Volatility::Immutable),
+ reversed: false,
}
}
+
+ pub fn with_reversed(mut self, reversed: bool) -> Self {
+ self.reversed = reversed;
+ self
+ }
}
-impl AggregateExpr for NthValueAgg {
+impl Default for NthValueAgg {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl AggregateUDFImpl for NthValueAgg {
fn as_any(&self) -> &dyn Any {
self
}
- fn field(&self) -> Result<Field> {
- Ok(Field::new(&self.name, self.input_data_type.clone(), true))
+ fn name(&self) -> &str {
+ "nth_value"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
}
- fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- Ok(Box::new(NthValueAccumulator::try_new(
- self.n,
- &self.input_data_type,
- &self.order_by_data_types,
- self.ordering_req.clone(),
- )?))
+ fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+ Ok(arg_types[0].clone())
}
- fn state_fields(&self) -> Result<Vec<Field>> {
+ fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn
Accumulator>> {
+ let n = match acc_args.input_exprs[1] {
+ Expr::Literal(ScalarValue::Int64(Some(value))) => {
+ if self.reversed {
+ Ok(-value)
+ } else {
+ Ok(value)
+ }
+ }
+ _ => not_impl_err!(
+ "{} not supported for n: {}",
+ self.name(),
+ &acc_args.input_exprs[1]
+ ),
+ }?;
+
+ let ordering_req = limited_convert_logical_sort_exprs_to_physical(
+ acc_args.sort_exprs,
+ acc_args.schema,
+ )?;
+
+ let ordering_dtypes = ordering_req
+ .iter()
+ .map(|e| e.expr.data_type(acc_args.schema))
+ .collect::<Result<Vec<_>>>()?;
+
+ NthValueAccumulator::try_new(
+ n,
+ acc_args.input_type,
+ &ordering_dtypes,
+ ordering_req,
+ )
+ .map(|acc| Box::new(acc) as _)
+ }
+
+ fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
let mut fields = vec![Field::new_list(
- format_state_name(&self.name, "nth_value"),
- Field::new("item", self.input_data_type.clone(), true),
- self.nullable, // This should be the same as field()
+ format_state_name(self.name(), "nth_value"),
+ // TODO: The nullability of the list element should be
configurable.
+ // The hard-coded `true` should be changed once the field for
+ // nullability is added to `StateFieldArgs` struct.
+ // See: https://github.com/apache/datafusion/pull/11063
+ Field::new("item", args.input_type.clone(), true),
+ false,
)];
- if !self.ordering_req.is_empty() {
- let orderings =
- ordering_fields(&self.ordering_req, &self.order_by_data_types);
+ let orderings = args.ordering_fields.to_vec();
+ if !orderings.is_empty() {
fields.push(Field::new_list(
- format_state_name(&self.name, "nth_value_orderings"),
+ format_state_name(self.name(), "nth_value_orderings"),
Field::new("item", DataType::Struct(Fields::from(orderings)),
true),
- self.nullable,
+ false,
));
}
Ok(fields)
}
- fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
- let n = Arc::new(Literal::new(ScalarValue::Int64(Some(self.n)))) as _;
- vec![Arc::clone(&self.expr), n]
+ fn aliases(&self) -> &[String] {
+ &[]
}
- fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
- (!self.ordering_req.is_empty()).then_some(&self.ordering_req)
- }
-
- fn order_sensitivity(&self) -> AggregateOrderSensitivity {
- AggregateOrderSensitivity::HardRequirement
- }
-
- fn name(&self) -> &str {
- &self.name
- }
-
- fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
- Some(Arc::new(Self {
- name: self.name.to_string(),
- input_data_type: self.input_data_type.clone(),
- expr: Arc::clone(&self.expr),
- // index should be from the opposite side
- n: -self.n,
- nullable: self.nullable,
- order_by_data_types: self.order_by_data_types.clone(),
- // reverse requirement
- ordering_req: reverse_order_bys(&self.ordering_req),
- }) as _)
- }
-}
-
-impl PartialEq<dyn Any> for NthValueAgg {
- fn eq(&self, other: &dyn Any) -> bool {
- down_cast_any_ref(other)
- .downcast_ref::<Self>()
- .map(|x| {
- self.name == x.name
- && self.input_data_type == x.input_data_type
- && self.order_by_data_types == x.order_by_data_types
- && self.expr.eq(&x.expr)
- })
- .unwrap_or(false)
+ fn reverse_expr(&self) -> ReversedUDAF {
+ ReversedUDAF::Reversed(Arc::from(AggregateUDF::from(
+ Self::new().with_reversed(!self.reversed),
+ )))
}
}
#[derive(Debug)]
-pub(crate) struct NthValueAccumulator {
+pub struct NthValueAccumulator {
+ /// The `N` value.
n: i64,
/// Stores entries in the `NTH_VALUE` result.
values: VecDeque<ScalarValue>,
diff --git a/datafusion/functions-array/Cargo.toml
b/datafusion/functions-array/Cargo.toml
index eb1ef9e03f..73c5b9114a 100644
--- a/datafusion/functions-array/Cargo.toml
+++ b/datafusion/functions-array/Cargo.toml
@@ -49,6 +49,7 @@ datafusion-common = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-functions = { workspace = true }
+datafusion-functions-aggregate = { workspace = true }
itertools = { version = "0.12", features = ["use_std"] }
log = { workspace = true }
paste = "1.0.14"
diff --git a/datafusion/functions-array/src/planner.rs
b/datafusion/functions-array/src/planner.rs
index cfb3e5ed07..01853fb569 100644
--- a/datafusion/functions-array/src/planner.rs
+++ b/datafusion/functions-array/src/planner.rs
@@ -23,6 +23,7 @@ use datafusion_expr::{
sqlparser, AggregateFunction, Expr, ExprSchemable, GetFieldAccess,
};
use datafusion_functions::expr_fn::get_field;
+use datafusion_functions_aggregate::nth_value::nth_value_udaf;
use crate::{
array_has::array_has_all,
@@ -119,8 +120,8 @@ impl UserDefinedSQLPlanner for FieldAccessPlanner {
// Special case for array_agg(expr)[index] to
NTH_VALUE(expr, index)
Expr::AggregateFunction(agg_func) if
is_array_agg(&agg_func) => {
Ok(PlannerResult::Planned(Expr::AggregateFunction(
- datafusion_expr::expr::AggregateFunction::new(
- AggregateFunction::NthValue,
+ datafusion_expr::expr::AggregateFunction::new_udf(
+ nth_value_udaf(),
agg_func
.args
.into_iter()
diff --git a/datafusion/physical-expr-common/src/aggregate/merge_arrays.rs
b/datafusion/physical-expr-common/src/aggregate/merge_arrays.rs
new file mode 100644
index 0000000000..544bdc1828
--- /dev/null
+++ b/datafusion/physical-expr-common/src/aggregate/merge_arrays.rs
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::compute::SortOptions;
+use datafusion_common::utils::compare_rows;
+use datafusion_common::{exec_err, ScalarValue};
+use std::cmp::Ordering;
+use std::collections::{BinaryHeap, VecDeque};
+
+/// This is a wrapper struct to be able to correctly merge `ARRAY_AGG` data
from
+/// multiple partitions using `BinaryHeap`. When used inside `BinaryHeap`, this
+/// struct returns smallest `CustomElement`, where smallest is determined by
+/// `ordering` values (`Vec<ScalarValue>`) according to `sort_options`.
+#[derive(Debug, PartialEq, Eq)]
+struct CustomElement<'a> {
+ /// Stores the partition this entry came from
+ branch_idx: usize,
+ /// Values to merge
+ value: ScalarValue,
+ // Comparison "key"
+ ordering: Vec<ScalarValue>,
+ /// Options defining the ordering semantics
+ sort_options: &'a [SortOptions],
+}
+
+impl<'a> CustomElement<'a> {
+ fn new(
+ branch_idx: usize,
+ value: ScalarValue,
+ ordering: Vec<ScalarValue>,
+ sort_options: &'a [SortOptions],
+ ) -> Self {
+ Self {
+ branch_idx,
+ value,
+ ordering,
+ sort_options,
+ }
+ }
+
+ fn ordering(
+ &self,
+ current: &[ScalarValue],
+ target: &[ScalarValue],
+ ) -> datafusion_common::Result<Ordering> {
+ // Calculate ordering according to `sort_options`
+ compare_rows(current, target, self.sort_options)
+ }
+}
+
+// Overwrite ordering implementation such that
+// - `self.ordering` values are used for comparison,
+// - When used inside `BinaryHeap` it is a min-heap.
+impl<'a> Ord for CustomElement<'a> {
+ fn cmp(&self, other: &Self) -> Ordering {
+ // Compares according to custom ordering
+ self.ordering(&self.ordering, &other.ordering)
+ // Convert max heap to min heap
+ .map(|ordering| ordering.reverse())
+ // This function return error, when `self.ordering` and
`other.ordering`
+ // have different types (such as one is `ScalarValue::Int64`,
other is `ScalarValue::Float32`)
+ // Here this case won't happen, because data from each partition
will have same type
+ .unwrap()
+ }
+}
+
+impl<'a> PartialOrd for CustomElement<'a> {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ Some(self.cmp(other))
+ }
+}
+
+/// This functions merges `values` array (`&[Vec<ScalarValue>]`) into single
array `Vec<ScalarValue>`
+/// Merging done according to ordering values stored inside `ordering_values`
(`&[Vec<Vec<ScalarValue>>]`)
+/// Inner `Vec<ScalarValue>` in the `ordering_values` can be thought as
ordering information for the
+/// each `ScalarValue` in the `values` array.
+/// Desired ordering specified by `sort_options` argument (Should have same
size with inner `Vec<ScalarValue>`
+/// of the `ordering_values` array).
+///
+/// As an example
+/// values can be \[
+/// \[1, 2, 3, 4, 5\],
+/// \[1, 2, 3, 4\],
+/// \[1, 2, 3, 4, 5, 6\],
+/// \]
+/// In this case we will be merging three arrays (doesn't have to be same size)
+/// and produce a merged array with size 15 (sum of 5+4+6)
+/// Merging will be done according to ordering at `ordering_values` vector.
+/// As an example `ordering_values` can be [
+/// \[(1, a), (2, b), (3, b), (4, a), (5, b) \],
+/// \[(1, a), (2, b), (3, b), (4, a) \],
+/// \[(1, b), (2, c), (3, d), (4, e), (5, a), (6, b) \],
+/// ]
+/// For each ScalarValue in the `values` we have a corresponding
`Vec<ScalarValue>` (like timestamp of it)
+/// for the example above `sort_options` will have size two, that defines
ordering requirement of the merge.
+/// Inner `Vec<ScalarValue>`s of the `ordering_values` will be compared
according `sort_options` (Their sizes should match)
+pub fn merge_ordered_arrays(
+ // We will merge values into single `Vec<ScalarValue>`.
+ values: &mut [VecDeque<ScalarValue>],
+ // `values` will be merged according to `ordering_values`.
+ // Inner `Vec<ScalarValue>` can be thought as ordering information for the
+ // each `ScalarValue` in the values`.
+ ordering_values: &mut [VecDeque<Vec<ScalarValue>>],
+ // Defines according to which ordering comparisons should be done.
+ sort_options: &[SortOptions],
+) -> datafusion_common::Result<(Vec<ScalarValue>, Vec<Vec<ScalarValue>>)> {
+ // Keep track the most recent data of each branch, in binary heap data
structure.
+ let mut heap = BinaryHeap::<CustomElement>::new();
+
+ if values.len() != ordering_values.len()
+ || values
+ .iter()
+ .zip(ordering_values.iter())
+ .any(|(vals, ordering_vals)| vals.len() != ordering_vals.len())
+ {
+ return exec_err!(
+ "Expects values arguments and/or ordering_values arguments to have
same size"
+ );
+ }
+ let n_branch = values.len();
+ let mut merged_values = vec![];
+ let mut merged_orderings = vec![];
+ // Continue iterating the loop until consuming data of all branches.
+ loop {
+ let minimum = if let Some(minimum) = heap.pop() {
+ minimum
+ } else {
+ // Heap is empty, fill it with the next entries from each branch.
+ for branch_idx in 0..n_branch {
+ if let Some(orderings) =
ordering_values[branch_idx].pop_front() {
+ // Their size should be same, we can safely .unwrap here.
+ let value = values[branch_idx].pop_front().unwrap();
+ // Push the next element to the heap:
+ heap.push(CustomElement::new(
+ branch_idx,
+ value,
+ orderings,
+ sort_options,
+ ));
+ }
+ // If None, we consumed this branch, skip it.
+ }
+
+ // Now we have filled the heap, get the largest entry (this will be
+ // the next element in merge).
+ if let Some(minimum) = heap.pop() {
+ minimum
+ } else {
+ // Heap is empty, this means that all indices are same with
+ // `end_indices`. We have consumed all of the branches, merge
+ // is completed, exit from the loop:
+ break;
+ }
+ };
+ let CustomElement {
+ branch_idx,
+ value,
+ ordering,
+ ..
+ } = minimum;
+ // Add minimum value in the heap to the result
+ merged_values.push(value);
+ merged_orderings.push(ordering);
+
+ // If there is an available entry, push next entry in the most
+ // recently consumed branch to the heap.
+ if let Some(orderings) = ordering_values[branch_idx].pop_front() {
+ // Their size should be same, we can safely .unwrap here.
+ let value = values[branch_idx].pop_front().unwrap();
+ // Push the next element to the heap:
+ heap.push(CustomElement::new(
+ branch_idx,
+ value,
+ orderings,
+ sort_options,
+ ));
+ }
+ }
+
+ Ok((merged_values, merged_orderings))
+}
diff --git a/datafusion/physical-expr-common/src/aggregate/mod.rs
b/datafusion/physical-expr-common/src/aggregate/mod.rs
index cd309b7f7d..35666f199a 100644
--- a/datafusion/physical-expr-common/src/aggregate/mod.rs
+++ b/datafusion/physical-expr-common/src/aggregate/mod.rs
@@ -17,6 +17,7 @@
pub mod count_distinct;
pub mod groups_accumulator;
+pub mod merge_arrays;
pub mod stats;
pub mod tdigest;
pub mod utils;
diff --git a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
index 3b122fe9f8..a64d97637c 100644
--- a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
+++ b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
@@ -19,8 +19,7 @@
//! that can evaluated at runtime during query execution
use std::any::Any;
-use std::cmp::Ordering;
-use std::collections::{BinaryHeap, VecDeque};
+use std::collections::VecDeque;
use std::fmt::Debug;
use std::sync::Arc;
@@ -33,11 +32,12 @@ use crate::{
use arrow::datatypes::{DataType, Field};
use arrow_array::cast::AsArray;
use arrow_array::{new_empty_array, Array, ArrayRef, StructArray};
-use arrow_schema::{Fields, SortOptions};
-use datafusion_common::utils::{array_into_list_array, compare_rows,
get_row_at_idx};
+use arrow_schema::Fields;
+use datafusion_common::utils::{array_into_list_array, get_row_at_idx};
use datafusion_common::{exec_err, Result, ScalarValue};
use datafusion_expr::utils::AggregateOrderSensitivity;
use datafusion_expr::Accumulator;
+use
datafusion_physical_expr_common::aggregate::merge_arrays::merge_ordered_arrays;
/// Expression for a `ARRAY_AGG(... ORDER BY ..., ...)` aggregation. In a multi
/// partition setting, partial aggregations are computed for every partition,
@@ -384,179 +384,6 @@ impl OrderSensitiveArrayAggAccumulator {
}
}
-/// This is a wrapper struct to be able to correctly merge `ARRAY_AGG` data
from
-/// multiple partitions using `BinaryHeap`. When used inside `BinaryHeap`, this
-/// struct returns smallest `CustomElement`, where smallest is determined by
-/// `ordering` values (`Vec<ScalarValue>`) according to `sort_options`.
-#[derive(Debug, PartialEq, Eq)]
-struct CustomElement<'a> {
- /// Stores the partition this entry came from
- branch_idx: usize,
- /// Values to merge
- value: ScalarValue,
- // Comparison "key"
- ordering: Vec<ScalarValue>,
- /// Options defining the ordering semantics
- sort_options: &'a [SortOptions],
-}
-
-impl<'a> CustomElement<'a> {
- fn new(
- branch_idx: usize,
- value: ScalarValue,
- ordering: Vec<ScalarValue>,
- sort_options: &'a [SortOptions],
- ) -> Self {
- Self {
- branch_idx,
- value,
- ordering,
- sort_options,
- }
- }
-
- fn ordering(
- &self,
- current: &[ScalarValue],
- target: &[ScalarValue],
- ) -> Result<Ordering> {
- // Calculate ordering according to `sort_options`
- compare_rows(current, target, self.sort_options)
- }
-}
-
-// Overwrite ordering implementation such that
-// - `self.ordering` values are used for comparison,
-// - When used inside `BinaryHeap` it is a min-heap.
-impl<'a> Ord for CustomElement<'a> {
- fn cmp(&self, other: &Self) -> Ordering {
- // Compares according to custom ordering
- self.ordering(&self.ordering, &other.ordering)
- // Convert max heap to min heap
- .map(|ordering| ordering.reverse())
- // This function return error, when `self.ordering` and
`other.ordering`
- // have different types (such as one is `ScalarValue::Int64`,
other is `ScalarValue::Float32`)
- // Here this case won't happen, because data from each partition
will have same type
- .unwrap()
- }
-}
-
-impl<'a> PartialOrd for CustomElement<'a> {
- fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
- Some(self.cmp(other))
- }
-}
-
-/// This functions merges `values` array (`&[Vec<ScalarValue>]`) into single
array `Vec<ScalarValue>`
-/// Merging done according to ordering values stored inside `ordering_values`
(`&[Vec<Vec<ScalarValue>>]`)
-/// Inner `Vec<ScalarValue>` in the `ordering_values` can be thought as
ordering information for the
-/// each `ScalarValue` in the `values` array.
-/// Desired ordering specified by `sort_options` argument (Should have same
size with inner `Vec<ScalarValue>`
-/// of the `ordering_values` array).
-///
-/// As an example
-/// values can be \[
-/// \[1, 2, 3, 4, 5\],
-/// \[1, 2, 3, 4\],
-/// \[1, 2, 3, 4, 5, 6\],
-/// \]
-/// In this case we will be merging three arrays (doesn't have to be same size)
-/// and produce a merged array with size 15 (sum of 5+4+6)
-/// Merging will be done according to ordering at `ordering_values` vector.
-/// As an example `ordering_values` can be [
-/// \[(1, a), (2, b), (3, b), (4, a), (5, b) \],
-/// \[(1, a), (2, b), (3, b), (4, a) \],
-/// \[(1, b), (2, c), (3, d), (4, e), (5, a), (6, b) \],
-/// ]
-/// For each ScalarValue in the `values` we have a corresponding
`Vec<ScalarValue>` (like timestamp of it)
-/// for the example above `sort_options` will have size two, that defines
ordering requirement of the merge.
-/// Inner `Vec<ScalarValue>`s of the `ordering_values` will be compared
according `sort_options` (Their sizes should match)
-pub(crate) fn merge_ordered_arrays(
- // We will merge values into single `Vec<ScalarValue>`.
- values: &mut [VecDeque<ScalarValue>],
- // `values` will be merged according to `ordering_values`.
- // Inner `Vec<ScalarValue>` can be thought as ordering information for the
- // each `ScalarValue` in the values`.
- ordering_values: &mut [VecDeque<Vec<ScalarValue>>],
- // Defines according to which ordering comparisons should be done.
- sort_options: &[SortOptions],
-) -> Result<(Vec<ScalarValue>, Vec<Vec<ScalarValue>>)> {
- // Keep track the most recent data of each branch, in binary heap data
structure.
- let mut heap = BinaryHeap::<CustomElement>::new();
-
- if values.len() != ordering_values.len()
- || values
- .iter()
- .zip(ordering_values.iter())
- .any(|(vals, ordering_vals)| vals.len() != ordering_vals.len())
- {
- return exec_err!(
- "Expects values arguments and/or ordering_values arguments to have
same size"
- );
- }
- let n_branch = values.len();
- let mut merged_values = vec![];
- let mut merged_orderings = vec![];
- // Continue iterating the loop until consuming data of all branches.
- loop {
- let minimum = if let Some(minimum) = heap.pop() {
- minimum
- } else {
- // Heap is empty, fill it with the next entries from each branch.
- for branch_idx in 0..n_branch {
- if let Some(orderings) =
ordering_values[branch_idx].pop_front() {
- // Their size should be same, we can safely .unwrap here.
- let value = values[branch_idx].pop_front().unwrap();
- // Push the next element to the heap:
- heap.push(CustomElement::new(
- branch_idx,
- value,
- orderings,
- sort_options,
- ));
- }
- // If None, we consumed this branch, skip it.
- }
-
- // Now we have filled the heap, get the largest entry (this will be
- // the next element in merge).
- if let Some(minimum) = heap.pop() {
- minimum
- } else {
- // Heap is empty, this means that all indices are same with
- // `end_indices`. We have consumed all of the branches, merge
- // is completed, exit from the loop:
- break;
- }
- };
- let CustomElement {
- branch_idx,
- value,
- ordering,
- ..
- } = minimum;
- // Add minimum value in the heap to the result
- merged_values.push(value);
- merged_orderings.push(ordering);
-
- // If there is an available entry, push next entry in the most
- // recently consumed branch to the heap.
- if let Some(orderings) = ordering_values[branch_idx].pop_front() {
- // Their size should be same, we can safely .unwrap here.
- let value = values[branch_idx].pop_front().unwrap();
- // Push the next element to the heap:
- heap.push(CustomElement::new(
- branch_idx,
- value,
- orderings,
- sort_options,
- ));
- }
- }
-
- Ok((merged_values, merged_orderings))
-}
-
#[cfg(test)]
mod tests {
use std::collections::VecDeque;
diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs
b/datafusion/physical-expr/src/aggregate/build_in.rs
index 1eadf7247f..d4cd3d51d1 100644
--- a/datafusion/physical-expr/src/aggregate/build_in.rs
+++ b/datafusion/physical-expr/src/aggregate/build_in.rs
@@ -30,10 +30,10 @@ use std::sync::Arc;
use arrow::datatypes::Schema;
-use datafusion_common::{exec_err, not_impl_err, Result};
+use datafusion_common::{not_impl_err, Result};
use datafusion_expr::AggregateFunction;
-use crate::expressions::{self, Literal};
+use crate::expressions::{self};
use crate::{AggregateExpr, PhysicalExpr, PhysicalSortExpr};
/// Create a physical aggregation expression.
@@ -102,26 +102,6 @@ pub fn create_aggregate_expr(
name,
data_type,
)),
- (AggregateFunction::NthValue, _) => {
- let expr = &input_phy_exprs[0];
- let Some(n) = input_phy_exprs[1]
- .as_any()
- .downcast_ref::<Literal>()
- .map(|literal| literal.value())
- else {
- return exec_err!("Second argument of NTH_VALUE needs to be a literal");
- };
- let nullable = expr.nullable(input_schema)?;
- Arc::new(expressions::NthValueAgg::new(
- Arc::clone(expr),
- n.clone().try_into()?,
- name,
- input_phy_types[0].clone(),
- nullable,
- ordering_types,
- ordering_req.to_vec(),
- ))
- }
})
}
diff --git a/datafusion/physical-expr/src/aggregate/mod.rs
b/datafusion/physical-expr/src/aggregate/mod.rs
index f0de7446f6..b9d803900f 100644
--- a/datafusion/physical-expr/src/aggregate/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/mod.rs
@@ -20,7 +20,6 @@ pub use
datafusion_physical_expr_common::aggregate::AggregateExpr;
pub(crate) mod array_agg;
pub(crate) mod array_agg_distinct;
pub(crate) mod array_agg_ordered;
-pub(crate) mod nth_value;
#[macro_use]
pub(crate) mod min_max;
pub(crate) mod groups_accumulator;
diff --git a/datafusion/physical-expr/src/expressions/mod.rs
b/datafusion/physical-expr/src/expressions/mod.rs
index 1f2c955ad0..7d8f12091f 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -39,7 +39,6 @@ pub use
crate::aggregate::array_agg_distinct::DistinctArrayAgg;
pub use crate::aggregate::array_agg_ordered::OrderSensitiveArrayAgg;
pub use crate::aggregate::build_in::create_aggregate_expr;
pub use crate::aggregate::min_max::{Max, MaxAccumulator, Min, MinAccumulator};
-pub use crate::aggregate::nth_value::NthValueAgg;
pub use crate::aggregate::stats::StatsType;
pub use crate::window::cume_dist::{cume_dist, CumeDist};
pub use crate::window::lead_lag::{lag, lead, WindowShift};
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index ce6c0c53c3..345765b08b 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -500,7 +500,7 @@ enum AggregateFunction {
// REGR_SYY = 33;
// REGR_SXY = 34;
// STRING_AGG = 35;
- NTH_VALUE_AGG = 36;
+ // NTH_VALUE_AGG = 36;
}
message AggregateExprNode {
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index 347654e52b..905f0d9849 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -535,7 +535,6 @@ impl serde::Serialize for AggregateFunction {
Self::Min => "MIN",
Self::Max => "MAX",
Self::ArrayAgg => "ARRAY_AGG",
- Self::NthValueAgg => "NTH_VALUE_AGG",
};
serializer.serialize_str(variant)
}
@@ -550,7 +549,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction {
"MIN",
"MAX",
"ARRAY_AGG",
- "NTH_VALUE_AGG",
];
struct GeneratedVisitor;
@@ -594,7 +592,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction {
"MIN" => Ok(AggregateFunction::Min),
"MAX" => Ok(AggregateFunction::Max),
"ARRAY_AGG" => Ok(AggregateFunction::ArrayAgg),
- "NTH_VALUE_AGG" => Ok(AggregateFunction::NthValueAgg),
_ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
}
}
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index c74f172482..b16d26ee6e 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1924,7 +1924,7 @@ pub enum AggregateFunction {
/// AVG = 3;
/// COUNT = 4;
/// APPROX_DISTINCT = 5;
- ArrayAgg = 6,
+ ///
/// VARIANCE = 7;
/// VARIANCE_POP = 8;
/// COVARIANCE = 9;
@@ -1952,7 +1952,8 @@ pub enum AggregateFunction {
/// REGR_SYY = 33;
/// REGR_SXY = 34;
/// STRING_AGG = 35;
- NthValueAgg = 36,
+ /// NTH_VALUE_AGG = 36;
+ ArrayAgg = 6,
}
impl AggregateFunction {
/// String value of the enum field names used in the ProtoBuf definition.
@@ -1964,7 +1965,6 @@ impl AggregateFunction {
AggregateFunction::Min => "MIN",
AggregateFunction::Max => "MAX",
AggregateFunction::ArrayAgg => "ARRAY_AGG",
- AggregateFunction::NthValueAgg => "NTH_VALUE_AGG",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
@@ -1973,7 +1973,6 @@ impl AggregateFunction {
"MIN" => Some(Self::Min),
"MAX" => Some(Self::Max),
"ARRAY_AGG" => Some(Self::ArrayAgg),
- "NTH_VALUE_AGG" => Some(Self::NthValueAgg),
_ => None,
}
}
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs
b/datafusion/proto/src/logical_plan/from_proto.rs
index f4fb692804..a58af8afdd 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -145,7 +145,6 @@ impl From<protobuf::AggregateFunction> for
AggregateFunction {
protobuf::AggregateFunction::Min => Self::Min,
protobuf::AggregateFunction::Max => Self::Max,
protobuf::AggregateFunction::ArrayAgg => Self::ArrayAgg,
- protobuf::AggregateFunction::NthValueAgg => Self::NthValue,
}
}
}
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs
b/datafusion/proto/src/logical_plan/to_proto.rs
index 7570040a1d..d8f8ea002b 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -117,7 +117,6 @@ impl From<&AggregateFunction> for
protobuf::AggregateFunction {
AggregateFunction::Min => Self::Min,
AggregateFunction::Max => Self::Max,
AggregateFunction::ArrayAgg => Self::ArrayAgg,
- AggregateFunction::NthValue => Self::NthValueAgg,
}
}
}
@@ -377,9 +376,6 @@ pub fn serialize_expr(
AggregateFunction::ArrayAgg => protobuf::AggregateFunction::ArrayAgg,
AggregateFunction::Min => protobuf::AggregateFunction::Min,
AggregateFunction::Max => protobuf::AggregateFunction::Max,
- AggregateFunction::NthValue => {
- protobuf::AggregateFunction::NthValueAgg
- }
};
let aggregate_expr = protobuf::AggregateExprNode {
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs
b/datafusion/proto/src/physical_plan/to_proto.rs
index 23cdc666e7..5e982ad2af 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -25,8 +25,8 @@ use datafusion::physical_expr::{PhysicalSortExpr,
ScalarFunctionExpr};
use datafusion::physical_plan::expressions::{
ArrayAgg, BinaryExpr, CaseExpr, CastExpr, Column, CumeDist,
DistinctArrayAgg,
InListExpr, IsNotNullExpr, IsNullExpr, Literal, Max, Min, NegativeExpr,
NotExpr,
- NthValue, NthValueAgg, Ntile, OrderSensitiveArrayAgg, Rank, RankType, RowNumber,
- TryCastExpr, WindowShift,
+ NthValue, Ntile, OrderSensitiveArrayAgg, Rank, RankType, RowNumber, TryCastExpr,
+ WindowShift,
};
use datafusion::physical_plan::udaf::AggregateFunctionExpr;
use datafusion::physical_plan::windows::{BuiltInWindowExpr,
PlainAggregateWindowExpr};
@@ -255,8 +255,6 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) ->
Result<AggrFn> {
protobuf::AggregateFunction::Min
} else if aggr_expr.downcast_ref::<Max>().is_some() {
protobuf::AggregateFunction::Max
- } else if aggr_expr.downcast_ref::<NthValueAgg>().is_some() {
- protobuf::AggregateFunction::NthValueAgg
} else {
return not_impl_err!("Aggregate function not supported: {expr:?}");
};
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 106247b2d4..d8d85ace1a 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -38,7 +38,7 @@ use datafusion::datasource::physical_plan::{
};
use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::{create_udf, JoinType, Operator, Volatility};
-use datafusion::physical_expr::expressions::{Max, NthValueAgg};
+use datafusion::physical_expr::expressions::Max;
use datafusion::physical_expr::window::SlidingAggregateWindowExpr;
use datafusion::physical_expr::{PhysicalSortRequirement, ScalarFunctionExpr};
use datafusion::physical_plan::aggregates::{
@@ -81,6 +81,7 @@ use datafusion_expr::{
ScalarUDFImpl, Signature, SimpleAggregateUDF, WindowFrame,
WindowFrameBound,
};
use datafusion_functions_aggregate::average::avg_udaf;
+use datafusion_functions_aggregate::nth_value::nth_value_udaf;
use datafusion_functions_aggregate::string_agg::StringAgg;
use datafusion_proto::physical_plan::{
AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
@@ -362,15 +363,17 @@ fn rountrip_aggregate() -> Result<()> {
false,
)?],
// NTH_VALUE
- vec![Arc::new(NthValueAgg::new(
- col("b", &schema)?,
- 1,
- "NTH_VALUE(b, 1)".to_string(),
- DataType::Int64,
+ vec![udaf::create_aggregate_expr(
+ &nth_value_udaf(),
+ &[col("b", &schema)?, lit(1u64)],
+ &[],
+ &[],
+ &[],
+ &schema,
+ "NTH_VALUE(b, 1)",
false,
- Vec::new(),
- Vec::new(),
- ))],
+ false,
+ )?],
// STRING_AGG
vec![udaf::create_aggregate_expr(
&AggregateUDF::new_from_impl(StringAgg::new()),
diff --git a/datafusion/sql/src/expr/function.rs
b/datafusion/sql/src/expr/function.rs
index ea460cb3ef..d9ddf57eb1 100644
--- a/datafusion/sql/src/expr/function.rs
+++ b/datafusion/sql/src/expr/function.rs
@@ -415,9 +415,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
) -> Result<WindowFunctionDefinition> {
// check udaf first
let udaf = self.context_provider.get_aggregate_meta(name);
- // Skip first value and last value, since we expect window builtin first/last value not udaf version
+ // Use the builtin window function instead of the user-defined aggregate function
if udaf.as_ref().is_some_and(|udaf| {
- udaf.name() != "first_value" && udaf.name() != "last_value"
+ udaf.name() != "first_value"
+ && udaf.name() != "last_value"
+ && udaf.name() != "nth_value"
}) {
Ok(WindowFunctionDefinition::AggregateUDF(udaf.unwrap()))
} else {
diff --git a/datafusion/sqllogictest/test_files/agg_func_substitute.slt
b/datafusion/sqllogictest/test_files/agg_func_substitute.slt
index 342d45e7fb..9a0a1d5874 100644
--- a/datafusion/sqllogictest/test_files/agg_func_substitute.slt
+++ b/datafusion/sqllogictest/test_files/agg_func_substitute.slt
@@ -39,16 +39,16 @@ EXPLAIN SELECT a, ARRAY_AGG(c ORDER BY c)[1] as result
GROUP BY a;
----
logical_plan
-01)Projection: multiple_ordered_table.a,
NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c
ASC NULLS LAST] AS result
-02)--Aggregate: groupBy=[[multiple_ordered_table.a]],
aggr=[[NTH_VALUE(multiple_ordered_table.c, Int64(1)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]]]
+01)Projection: multiple_ordered_table.a,
nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c
ASC NULLS LAST] AS result
+02)--Aggregate: groupBy=[[multiple_ordered_table.a]],
aggr=[[nth_value(multiple_ordered_table.c, Int64(1)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]]]
03)----TableScan: multiple_ordered_table projection=[a, c]
physical_plan
-01)ProjectionExec: expr=[a@0 as a,
NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c
ASC NULLS LAST]@1 as result]
-02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a],
aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
+01)ProjectionExec: expr=[a@0 as a,
nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c
ASC NULLS LAST]@1 as result]
+02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a],
aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
-06)----------AggregateExec: mode=Partial, gby=[a@0 as a],
aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
+06)----------AggregateExec: mode=Partial, gby=[a@0 as a],
aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
07)------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
08)--------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c],
output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true
@@ -59,16 +59,16 @@ EXPLAIN SELECT a, NTH_VALUE(c, 1 ORDER BY c) as result
GROUP BY a;
----
logical_plan
-01)Projection: multiple_ordered_table.a,
NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c
ASC NULLS LAST] AS result
-02)--Aggregate: groupBy=[[multiple_ordered_table.a]],
aggr=[[NTH_VALUE(multiple_ordered_table.c, Int64(1)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]]]
+01)Projection: multiple_ordered_table.a,
nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c
ASC NULLS LAST] AS result
+02)--Aggregate: groupBy=[[multiple_ordered_table.a]],
aggr=[[nth_value(multiple_ordered_table.c, Int64(1)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]]]
03)----TableScan: multiple_ordered_table projection=[a, c]
physical_plan
-01)ProjectionExec: expr=[a@0 as a,
NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c
ASC NULLS LAST]@1 as result]
-02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a],
aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
+01)ProjectionExec: expr=[a@0 as a,
nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c
ASC NULLS LAST]@1 as result]
+02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a],
aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
-06)----------AggregateExec: mode=Partial, gby=[a@0 as a],
aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
+06)----------AggregateExec: mode=Partial, gby=[a@0 as a],
aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
07)------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
08)--------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c],
output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true
@@ -78,16 +78,16 @@ EXPLAIN SELECT a, ARRAY_AGG(c ORDER BY c)[1 + 100] as result
GROUP BY a;
----
logical_plan
-01)Projection: multiple_ordered_table.a,
NTH_VALUE(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST] AS result
-02)--Aggregate: groupBy=[[multiple_ordered_table.a]],
aggr=[[NTH_VALUE(multiple_ordered_table.c, Int64(101)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST] AS
NTH_VALUE(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]]]
+01)Projection: multiple_ordered_table.a,
nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST] AS result
+02)--Aggregate: groupBy=[[multiple_ordered_table.a]],
aggr=[[nth_value(multiple_ordered_table.c, Int64(101)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST] AS
nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]]]
03)----TableScan: multiple_ordered_table projection=[a, c]
physical_plan
-01)ProjectionExec: expr=[a@0 as a, NTH_VALUE(multiple_ordered_table.c,Int64(1)
+ Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
-02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a],
aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
+01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1)
+ Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
+02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a],
aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
-06)----------AggregateExec: mode=Partial, gby=[a@0 as a],
aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
+06)----------AggregateExec: mode=Partial, gby=[a@0 as a],
aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY
[multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
07)------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
08)--------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c],
output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]