This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new bb98dfed08 Change first/last implementation to prevent redundant
comparisons when data is already sorted (#8678)
bb98dfed08 is described below
commit bb98dfed08d8c2b94ab668a064b206d8b84b51b0
Author: Mustafa Akur <[email protected]>
AuthorDate: Sat Dec 30 03:48:36 2023 +0300
Change first/last implementation to prevent redundant comparisons when data
is already sorted (#8678)
* Change first/last implementation to prevent redundant computations
* Remove redundant checks
* Review
---------
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
.../physical-expr/src/aggregate/first_last.rs | 259 +++++++++++++--------
datafusion/physical-plan/src/aggregates/mod.rs | 77 +++++-
datafusion/sqllogictest/test_files/groupby.slt | 14 +-
3 files changed, 234 insertions(+), 116 deletions(-)
diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs
b/datafusion/physical-expr/src/aggregate/first_last.rs
index c7032e601c..4afa8d0dd5 100644
--- a/datafusion/physical-expr/src/aggregate/first_last.rs
+++ b/datafusion/physical-expr/src/aggregate/first_last.rs
@@ -36,13 +36,14 @@ use datafusion_common::{
use datafusion_expr::Accumulator;
/// FIRST_VALUE aggregate expression
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct FirstValue {
name: String,
input_data_type: DataType,
order_by_data_types: Vec<DataType>,
expr: Arc<dyn PhysicalExpr>,
ordering_req: LexOrdering,
+ requirement_satisfied: bool,
}
impl FirstValue {
@@ -54,12 +55,14 @@ impl FirstValue {
ordering_req: LexOrdering,
order_by_data_types: Vec<DataType>,
) -> Self {
+ let requirement_satisfied = ordering_req.is_empty();
Self {
name: name.into(),
input_data_type,
order_by_data_types,
expr,
ordering_req,
+ requirement_satisfied,
}
}
@@ -87,6 +90,33 @@ impl FirstValue {
pub fn ordering_req(&self) -> &LexOrdering {
&self.ordering_req
}
+
+ pub fn with_requirement_satisfied(mut self, requirement_satisfied: bool)
-> Self {
+ self.requirement_satisfied = requirement_satisfied;
+ self
+ }
+
+ pub fn convert_to_last(self) -> LastValue {
+ let name = if self.name.starts_with("FIRST") {
+ format!("LAST{}", &self.name[5..])
+ } else {
+ format!("LAST_VALUE({})", self.expr)
+ };
+ let FirstValue {
+ expr,
+ input_data_type,
+ ordering_req,
+ order_by_data_types,
+ ..
+ } = self;
+ LastValue::new(
+ expr,
+ name,
+ input_data_type,
+ reverse_order_bys(&ordering_req),
+ order_by_data_types,
+ )
+ }
}
impl AggregateExpr for FirstValue {
@@ -100,11 +130,14 @@ impl AggregateExpr for FirstValue {
}
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- Ok(Box::new(FirstValueAccumulator::try_new(
+ FirstValueAccumulator::try_new(
&self.input_data_type,
&self.order_by_data_types,
self.ordering_req.clone(),
- )?))
+ )
+ .map(|acc| {
+
Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _
+ })
}
fn state_fields(&self) -> Result<Vec<Field>> {
@@ -130,11 +163,7 @@ impl AggregateExpr for FirstValue {
}
fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
- if self.ordering_req.is_empty() {
- None
- } else {
- Some(&self.ordering_req)
- }
+ (!self.ordering_req.is_empty()).then_some(&self.ordering_req)
}
fn name(&self) -> &str {
@@ -142,26 +171,18 @@ impl AggregateExpr for FirstValue {
}
fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
- let name = if self.name.starts_with("FIRST") {
- format!("LAST{}", &self.name[5..])
- } else {
- format!("LAST_VALUE({})", self.expr)
- };
- Some(Arc::new(LastValue::new(
- self.expr.clone(),
- name,
- self.input_data_type.clone(),
- reverse_order_bys(&self.ordering_req),
- self.order_by_data_types.clone(),
- )))
+ Some(Arc::new(self.clone().convert_to_last()))
}
fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- Ok(Box::new(FirstValueAccumulator::try_new(
+ FirstValueAccumulator::try_new(
&self.input_data_type,
&self.order_by_data_types,
self.ordering_req.clone(),
- )?))
+ )
+ .map(|acc| {
+
Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _
+ })
}
}
@@ -190,6 +211,8 @@ struct FirstValueAccumulator {
orderings: Vec<ScalarValue>,
// Stores the applicable ordering requirement.
ordering_req: LexOrdering,
+ // Stores whether incoming data already satisfies the ordering requirement.
+ requirement_satisfied: bool,
}
impl FirstValueAccumulator {
@@ -203,42 +226,29 @@ impl FirstValueAccumulator {
.iter()
.map(ScalarValue::try_from)
.collect::<Result<Vec<_>>>()?;
- ScalarValue::try_from(data_type).map(|value| Self {
- first: value,
+ let requirement_satisfied = ordering_req.is_empty();
+ ScalarValue::try_from(data_type).map(|first| Self {
+ first,
is_set: false,
orderings,
ordering_req,
+ requirement_satisfied,
})
}
// Updates state with the values in the given row.
- fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> {
- let [value, orderings @ ..] = row else {
- return internal_err!("Empty row in FIRST_VALUE");
- };
- // Update when there is no entry in the state, or we have an "earlier"
- // entry according to sort requirements.
- if !self.is_set
- || compare_rows(
- &self.orderings,
- orderings,
- &get_sort_options(&self.ordering_req),
- )?
- .is_gt()
- {
- self.first = value.clone();
- self.orderings = orderings.to_vec();
- self.is_set = true;
- }
- Ok(())
+ fn update_with_new_row(&mut self, row: &[ScalarValue]) {
+ self.first = row[0].clone();
+ self.orderings = row[1..].to_vec();
+ self.is_set = true;
}
fn get_first_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> {
let [value, ordering_values @ ..] = values else {
return internal_err!("Empty row in FIRST_VALUE");
};
- if self.ordering_req.is_empty() {
- // Get first entry according to receive order (0th index)
+ if self.requirement_satisfied {
+ // Get first entry according to the pre-existing ordering (0th
index):
return Ok((!value.is_empty()).then_some(0));
}
let sort_columns = ordering_values
@@ -252,6 +262,11 @@ impl FirstValueAccumulator {
let indices = lexsort_to_indices(&sort_columns, Some(1))?;
Ok((!indices.is_empty()).then_some(indices.value(0) as _))
}
+
+ fn with_requirement_satisfied(mut self, requirement_satisfied: bool) ->
Self {
+ self.requirement_satisfied = requirement_satisfied;
+ self
+ }
}
impl Accumulator for FirstValueAccumulator {
@@ -263,9 +278,25 @@ impl Accumulator for FirstValueAccumulator {
}
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
- if let Some(first_idx) = self.get_first_idx(values)? {
- let row = get_row_at_idx(values, first_idx)?;
- self.update_with_new_row(&row)?;
+ if !self.is_set {
+ if let Some(first_idx) = self.get_first_idx(values)? {
+ let row = get_row_at_idx(values, first_idx)?;
+ self.update_with_new_row(&row);
+ }
+ } else if !self.requirement_satisfied {
+ if let Some(first_idx) = self.get_first_idx(values)? {
+ let row = get_row_at_idx(values, first_idx)?;
+ let orderings = &row[1..];
+ if compare_rows(
+ &self.orderings,
+ orderings,
+ &get_sort_options(&self.ordering_req),
+ )?
+ .is_gt()
+ {
+ self.update_with_new_row(&row);
+ }
+ }
}
Ok(())
}
@@ -294,12 +325,12 @@ impl Accumulator for FirstValueAccumulator {
let sort_options = get_sort_options(&self.ordering_req);
// Either there is no existing value, or there is an earlier
version in new data.
if !self.is_set
- || compare_rows(first_ordering, &self.orderings,
&sort_options)?.is_lt()
+ || compare_rows(&self.orderings, first_ordering,
&sort_options)?.is_gt()
{
// Update with first value in the state. Note that we should
exclude the
// is_set flag from the state. Otherwise, we will end up with
a state
// containing two is_set flags.
- self.update_with_new_row(&first_row[0..is_set_idx])?;
+ self.update_with_new_row(&first_row[0..is_set_idx]);
}
}
Ok(())
@@ -318,13 +349,14 @@ impl Accumulator for FirstValueAccumulator {
}
/// LAST_VALUE aggregate expression
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct LastValue {
name: String,
input_data_type: DataType,
order_by_data_types: Vec<DataType>,
expr: Arc<dyn PhysicalExpr>,
ordering_req: LexOrdering,
+ requirement_satisfied: bool,
}
impl LastValue {
@@ -336,12 +368,14 @@ impl LastValue {
ordering_req: LexOrdering,
order_by_data_types: Vec<DataType>,
) -> Self {
+ let requirement_satisfied = ordering_req.is_empty();
Self {
name: name.into(),
input_data_type,
order_by_data_types,
expr,
ordering_req,
+ requirement_satisfied,
}
}
@@ -369,6 +403,33 @@ impl LastValue {
pub fn ordering_req(&self) -> &LexOrdering {
&self.ordering_req
}
+
+ pub fn with_requirement_satisfied(mut self, requirement_satisfied: bool)
-> Self {
+ self.requirement_satisfied = requirement_satisfied;
+ self
+ }
+
+ pub fn convert_to_first(self) -> FirstValue {
+ let name = if self.name.starts_with("LAST") {
+ format!("FIRST{}", &self.name[4..])
+ } else {
+ format!("FIRST_VALUE({})", self.expr)
+ };
+ let LastValue {
+ expr,
+ input_data_type,
+ ordering_req,
+ order_by_data_types,
+ ..
+ } = self;
+ FirstValue::new(
+ expr,
+ name,
+ input_data_type,
+ reverse_order_bys(&ordering_req),
+ order_by_data_types,
+ )
+ }
}
impl AggregateExpr for LastValue {
@@ -382,11 +443,14 @@ impl AggregateExpr for LastValue {
}
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- Ok(Box::new(LastValueAccumulator::try_new(
+ LastValueAccumulator::try_new(
&self.input_data_type,
&self.order_by_data_types,
self.ordering_req.clone(),
- )?))
+ )
+ .map(|acc| {
+
Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _
+ })
}
fn state_fields(&self) -> Result<Vec<Field>> {
@@ -412,11 +476,7 @@ impl AggregateExpr for LastValue {
}
fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
- if self.ordering_req.is_empty() {
- None
- } else {
- Some(&self.ordering_req)
- }
+ (!self.ordering_req.is_empty()).then_some(&self.ordering_req)
}
fn name(&self) -> &str {
@@ -424,26 +484,18 @@ impl AggregateExpr for LastValue {
}
fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
- let name = if self.name.starts_with("LAST") {
- format!("FIRST{}", &self.name[4..])
- } else {
- format!("FIRST_VALUE({})", self.expr)
- };
- Some(Arc::new(FirstValue::new(
- self.expr.clone(),
- name,
- self.input_data_type.clone(),
- reverse_order_bys(&self.ordering_req),
- self.order_by_data_types.clone(),
- )))
+ Some(Arc::new(self.clone().convert_to_first()))
}
fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- Ok(Box::new(LastValueAccumulator::try_new(
+ LastValueAccumulator::try_new(
&self.input_data_type,
&self.order_by_data_types,
self.ordering_req.clone(),
- )?))
+ )
+ .map(|acc| {
+
Box::new(acc.with_requirement_satisfied(self.requirement_satisfied)) as _
+ })
}
}
@@ -471,6 +523,8 @@ struct LastValueAccumulator {
orderings: Vec<ScalarValue>,
// Stores the applicable ordering requirement.
ordering_req: LexOrdering,
+ // Stores whether incoming data already satisfies the ordering requirement.
+ requirement_satisfied: bool,
}
impl LastValueAccumulator {
@@ -484,42 +538,28 @@ impl LastValueAccumulator {
.iter()
.map(ScalarValue::try_from)
.collect::<Result<Vec<_>>>()?;
- Ok(Self {
- last: ScalarValue::try_from(data_type)?,
+ let requirement_satisfied = ordering_req.is_empty();
+ ScalarValue::try_from(data_type).map(|last| Self {
+ last,
is_set: false,
orderings,
ordering_req,
+ requirement_satisfied,
})
}
// Updates state with the values in the given row.
- fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> {
- let [value, orderings @ ..] = row else {
- return internal_err!("Empty row in LAST_VALUE");
- };
- // Update when there is no entry in the state, or we have a "later"
- // entry (either according to sort requirements or the order of
execution).
- if !self.is_set
- || self.orderings.is_empty()
- || compare_rows(
- &self.orderings,
- orderings,
- &get_sort_options(&self.ordering_req),
- )?
- .is_lt()
- {
- self.last = value.clone();
- self.orderings = orderings.to_vec();
- self.is_set = true;
- }
- Ok(())
+ fn update_with_new_row(&mut self, row: &[ScalarValue]) {
+ self.last = row[0].clone();
+ self.orderings = row[1..].to_vec();
+ self.is_set = true;
}
fn get_last_idx(&self, values: &[ArrayRef]) -> Result<Option<usize>> {
let [value, ordering_values @ ..] = values else {
return internal_err!("Empty row in LAST_VALUE");
};
- if self.ordering_req.is_empty() {
+ if self.requirement_satisfied {
// Get last entry according to the order of data:
return Ok((!value.is_empty()).then_some(value.len() - 1));
}
@@ -538,6 +578,11 @@ impl LastValueAccumulator {
let indices = lexsort_to_indices(&sort_columns, Some(1))?;
Ok((!indices.is_empty()).then_some(indices.value(0) as _))
}
+
+ fn with_requirement_satisfied(mut self, requirement_satisfied: bool) ->
Self {
+ self.requirement_satisfied = requirement_satisfied;
+ self
+ }
}
impl Accumulator for LastValueAccumulator {
@@ -549,10 +594,26 @@ impl Accumulator for LastValueAccumulator {
}
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
- if let Some(last_idx) = self.get_last_idx(values)? {
+ if !self.is_set || self.requirement_satisfied {
+ if let Some(last_idx) = self.get_last_idx(values)? {
+ let row = get_row_at_idx(values, last_idx)?;
+ self.update_with_new_row(&row);
+ }
+ } else if let Some(last_idx) = self.get_last_idx(values)? {
let row = get_row_at_idx(values, last_idx)?;
- self.update_with_new_row(&row)?;
+ let orderings = &row[1..];
+ // Update when there is a more recent entry
+ if compare_rows(
+ &self.orderings,
+ orderings,
+ &get_sort_options(&self.ordering_req),
+ )?
+ .is_lt()
+ {
+ self.update_with_new_row(&row);
+ }
}
+
Ok(())
}
@@ -583,12 +644,12 @@ impl Accumulator for LastValueAccumulator {
// Either there is no existing value, or there is a newer (latest)
// version in the new data:
if !self.is_set
- || compare_rows(last_ordering, &self.orderings,
&sort_options)?.is_gt()
+ || compare_rows(&self.orderings, last_ordering,
&sort_options)?.is_lt()
{
// Update with last value in the state. Note that we should
exclude the
// is_set flag from the state. Otherwise, we will end up with
a state
// containing two is_set flags.
- self.update_with_new_row(&last_row[0..is_set_idx])?;
+ self.update_with_new_row(&last_row[0..is_set_idx]);
}
}
Ok(())
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs
b/datafusion/physical-plan/src/aggregates/mod.rs
index f5bb4fe59b..a38044de02 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -44,9 +44,9 @@ use datafusion_expr::Accumulator;
use datafusion_physical_expr::{
aggregate::is_order_sensitive,
equivalence::{collapse_lex_req, ProjectionMapping},
- expressions::{Column, Max, Min, UnKnownColumn},
- physical_exprs_contains, AggregateExpr, EquivalenceProperties, LexOrdering,
- LexRequirement, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
+ expressions::{Column, FirstValue, LastValue, Max, Min, UnKnownColumn},
+ physical_exprs_contains, reverse_order_bys, AggregateExpr,
EquivalenceProperties,
+ LexOrdering, LexRequirement, PhysicalExpr, PhysicalSortExpr,
PhysicalSortRequirement,
};
use itertools::Itertools;
@@ -324,7 +324,7 @@ impl AggregateExec {
fn try_new_with_schema(
mode: AggregateMode,
group_by: PhysicalGroupBy,
- aggr_expr: Vec<Arc<dyn AggregateExpr>>,
+ mut aggr_expr: Vec<Arc<dyn AggregateExpr>>,
filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
input: Arc<dyn ExecutionPlan>,
input_schema: SchemaRef,
@@ -347,7 +347,8 @@ impl AggregateExec {
.collect::<Vec<_>>();
let req = get_aggregate_exprs_requirement(
- &aggr_expr,
+ &new_requirement,
+ &mut aggr_expr,
&group_by,
&input_eq_properties,
&mode,
@@ -896,6 +897,11 @@ fn finer_ordering(
eq_properties.get_finer_ordering(existing_req, &aggr_req)
}
+/// Concatenates the given slices.
+fn concat_slices<T: Clone>(lhs: &[T], rhs: &[T]) -> Vec<T> {
+ [lhs, rhs].concat()
+}
+
/// Get the common requirement that satisfies all the aggregate expressions.
///
/// # Parameters
@@ -914,14 +920,64 @@ fn finer_ordering(
/// A `LexRequirement` instance, which is the requirement that satisfies all
the
/// aggregate requirements. Returns an error in case of conflicting
requirements.
fn get_aggregate_exprs_requirement(
- aggr_exprs: &[Arc<dyn AggregateExpr>],
+ prefix_requirement: &[PhysicalSortRequirement],
+ aggr_exprs: &mut [Arc<dyn AggregateExpr>],
group_by: &PhysicalGroupBy,
eq_properties: &EquivalenceProperties,
agg_mode: &AggregateMode,
) -> Result<LexRequirement> {
let mut requirement = vec![];
- for aggr_expr in aggr_exprs.iter() {
- if let Some(finer_ordering) =
+ for aggr_expr in aggr_exprs.iter_mut() {
+ let aggr_req = aggr_expr.order_bys().unwrap_or(&[]);
+ let reverse_aggr_req = reverse_order_bys(aggr_req);
+ let aggr_req = PhysicalSortRequirement::from_sort_exprs(aggr_req);
+ let reverse_aggr_req =
+ PhysicalSortRequirement::from_sort_exprs(&reverse_aggr_req);
+ if let Some(first_value) =
aggr_expr.as_any().downcast_ref::<FirstValue>() {
+ let mut first_value = first_value.clone();
+ if eq_properties.ordering_satisfy_requirement(&concat_slices(
+ prefix_requirement,
+ &aggr_req,
+ )) {
+ first_value = first_value.with_requirement_satisfied(true);
+ *aggr_expr = Arc::new(first_value) as _;
+ } else if
eq_properties.ordering_satisfy_requirement(&concat_slices(
+ prefix_requirement,
+ &reverse_aggr_req,
+ )) {
+ // Converting to LAST_VALUE enables more efficient execution
+ // given the existing ordering:
+ let mut last_value = first_value.convert_to_last();
+ last_value = last_value.with_requirement_satisfied(true);
+ *aggr_expr = Arc::new(last_value) as _;
+ } else {
+ // Requirement is not satisfied with existing ordering.
+ first_value = first_value.with_requirement_satisfied(false);
+ *aggr_expr = Arc::new(first_value) as _;
+ }
+ } else if let Some(last_value) =
aggr_expr.as_any().downcast_ref::<LastValue>() {
+ let mut last_value = last_value.clone();
+ if eq_properties.ordering_satisfy_requirement(&concat_slices(
+ prefix_requirement,
+ &aggr_req,
+ )) {
+ last_value = last_value.with_requirement_satisfied(true);
+ *aggr_expr = Arc::new(last_value) as _;
+ } else if
eq_properties.ordering_satisfy_requirement(&concat_slices(
+ prefix_requirement,
+ &reverse_aggr_req,
+ )) {
+ // Converting to FIRST_VALUE enables more efficient execution
+ // given the existing ordering:
+ let mut first_value = last_value.convert_to_first();
+ first_value = first_value.with_requirement_satisfied(true);
+ *aggr_expr = Arc::new(first_value) as _;
+ } else {
+ // Requirement is not satisfied with existing ordering.
+ last_value = last_value.with_requirement_satisfied(false);
+ *aggr_expr = Arc::new(last_value) as _;
+ }
+ } else if let Some(finer_ordering) =
finer_ordering(&requirement, aggr_expr, group_by, eq_properties,
agg_mode)
{
requirement = finer_ordering;
@@ -2071,7 +2127,7 @@ mod tests {
options: options1,
},
];
- let aggr_exprs = order_by_exprs
+ let mut aggr_exprs = order_by_exprs
.into_iter()
.map(|order_by_expr| {
Arc::new(OrderSensitiveArrayAgg::new(
@@ -2086,7 +2142,8 @@ mod tests {
.collect::<Vec<_>>();
let group_by = PhysicalGroupBy::new_single(vec![]);
let res = get_aggregate_exprs_requirement(
- &aggr_exprs,
+ &[],
+ &mut aggr_exprs,
&group_by,
&eq_properties,
&AggregateMode::Partial,
diff --git a/datafusion/sqllogictest/test_files/groupby.slt
b/datafusion/sqllogictest/test_files/groupby.slt
index bbf21e135f..b09ff79e88 100644
--- a/datafusion/sqllogictest/test_files/groupby.slt
+++ b/datafusion/sqllogictest/test_files/groupby.slt
@@ -2508,7 +2508,7 @@ Projection: sales_global.country,
ARRAY_AGG(sales_global.amount) ORDER BY [sales
----TableScan: sales_global projection=[country, amount]
physical_plan
ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount)
ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts,
FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS
LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount
DESC NULLS FIRST]@3 as fv2]
---AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount),
LAST_VALUE(sales_global.amount)]
+--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount),
LAST_VALUE(sales_global.amount)]
----SortExec: expr=[amount@1 DESC]
------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2539,7 +2539,7 @@ Projection: sales_global.country,
ARRAY_AGG(sales_global.amount) ORDER BY [sales
----TableScan: sales_global projection=[country, amount]
physical_plan
ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount)
ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts,
FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS
LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount
DESC NULLS FIRST]@3 as fv2]
---AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount),
LAST_VALUE(sales_global.amount)]
+--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount),
FIRST_VALUE(sales_global.amount)]
----SortExec: expr=[amount@1 ASC NULLS LAST]
------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2571,7 +2571,7 @@ Projection: sales_global.country,
FIRST_VALUE(sales_global.amount) ORDER BY [sal
----TableScan: sales_global projection=[country, amount]
physical_plan
ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount)
ORDER BY [sales_global.amount ASC NULLS LAST]@1 as fv1,
LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS
FIRST]@2 as fv2, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount
ASC NULLS LAST]@3 as amounts]
---AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount),
ARRAY_AGG(sales_global.amount)]
+--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount),
ARRAY_AGG(sales_global.amount)]
----SortExec: expr=[amount@1 ASC NULLS LAST]
------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2636,7 +2636,7 @@ Projection: sales_global.country,
FIRST_VALUE(sales_global.amount) ORDER BY [sal
------TableScan: sales_global projection=[country, ts, amount]
physical_plan
ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount)
ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1,
LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2
as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3
as sum1]
---AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount),
SUM(sales_global.amount)]
+--AggregateExec: mode=Single, gby=[country@0 as country],
aggr=[LAST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount),
SUM(sales_global.amount)]
----MemoryExec: partitions=1, partition_sizes=[1]
query TRRR rowsort
@@ -2988,7 +2988,7 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST]
------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country],
aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount),
LAST_VALUE(sales_global.amount)]
--------CoalesceBatchesExec: target_batch_size=4
----------RepartitionExec: partitioning=Hash([country@0], 8),
input_partitions=8
-------------AggregateExec: mode=Partial, gby=[country@0 as country],
aggr=[ARRAY_AGG(sales_global.amount), FIRST_VALUE(sales_global.amount),
LAST_VALUE(sales_global.amount)]
+------------AggregateExec: mode=Partial, gby=[country@0 as country],
aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount),
LAST_VALUE(sales_global.amount)]
--------------SortExec: expr=[amount@1 DESC]
----------------RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1
------------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -3631,10 +3631,10 @@ Projection: FIRST_VALUE(multiple_ordered_table.a) ORDER
BY [multiple_ordered_tab
----TableScan: multiple_ordered_table projection=[a, c, d]
physical_plan
ProjectionExec: expr=[FIRST_VALUE(multiple_ordered_table.a) ORDER BY
[multiple_ordered_table.a ASC NULLS LAST]@1 as first_a,
LAST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC
NULLS FIRST]@2 as last_c]
---AggregateExec: mode=FinalPartitioned, gby=[d@0 as d],
aggr=[FIRST_VALUE(multiple_ordered_table.a),
LAST_VALUE(multiple_ordered_table.c)]
+--AggregateExec: mode=FinalPartitioned, gby=[d@0 as d],
aggr=[FIRST_VALUE(multiple_ordered_table.a),
FIRST_VALUE(multiple_ordered_table.c)]
----CoalesceBatchesExec: target_batch_size=2
------RepartitionExec: partitioning=Hash([d@0], 8), input_partitions=8
---------AggregateExec: mode=Partial, gby=[d@2 as d],
aggr=[FIRST_VALUE(multiple_ordered_table.a),
LAST_VALUE(multiple_ordered_table.c)]
+--------AggregateExec: mode=Partial, gby=[d@2 as d],
aggr=[FIRST_VALUE(multiple_ordered_table.a),
FIRST_VALUE(multiple_ordered_table.c)]
----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c,
d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]],
has_header=true