alamb commented on code in PR #6734:
URL: https://github.com/apache/arrow-datafusion/pull/6734#discussion_r1238526016
##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -81,6 +81,8 @@ pub enum AggregateMode {
/// Group By expression modes
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GroupByOrderMode {
+ /// None of the expressions in the GROUP BY clause have an ordering.
+ Linear,
Review Comment:
I wonder if `None` would be a clearer name here
##########
datafusion/core/tests/sqllogictests/test_files/groupby.slt:
##########
@@ -2580,12 +2580,280 @@ FRA 200 50 250
# Run order-sensitive aggregators in multiple partitions
statement ok
-set datafusion.execution.target_partitions = 2;
+set datafusion.execution.target_partitions = 8;
+
+# order-sensitive FIRST_VALUE and LAST_VALUE aggregators should work in
+# multi-partitions also.
+query TT
+EXPLAIN SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
+ LAST_VALUE(amount ORDER BY ts ASC) AS fv2
+ FROM sales_global
+ GROUP BY country
+ ORDER BY country
+----
+logical_plan
+Sort: sales_global.country ASC NULLS LAST
+--Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY
[sales_global.ts ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER
BY [sales_global.ts ASC NULLS LAST] AS fv2
+----Aggregate: groupBy=[[sales_global.country]],
aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS
LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS
LAST]]]
+------TableScan: sales_global projection=[country, ts, amount]
+physical_plan
+SortPreservingMergeExec: [country@0 ASC NULLS LAST]
+--SortExec: expr=[country@0 ASC NULLS LAST]
+----ProjectionExec: expr=[country@0 as country,
FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as
fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS
LAST]@2 as fv2]
+------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country],
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+--------SortExec: expr=[ts@1 ASC NULLS LAST]
Review Comment:
This second (re)sort is needed because `RepartitionExec: partitioning=Hash`
does not preserve the ordering right?
##########
datafusion/core/tests/sqllogictests/test_files/groupby.slt:
##########
@@ -2076,18 +2076,18 @@ Projection: annotated_data_infinite2.a,
annotated_data_infinite2.b, FIRST_VALUE(
----TableScan: annotated_data_infinite2 projection=[a, b, c]
physical_plan
ProjectionExec: expr=[a@0 as a, b@1 as b,
FIRST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a
DESC NULLS FIRST]@2 as first_c]
---AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b],
aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
+--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b],
aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=FullyOrdered
----CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b,
c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS
LAST, c@2 ASC NULLS LAST], has_header=true
query III
SELECT a, b, FIRST_VALUE(c ORDER BY a DESC) as first_c
FROM annotated_data_infinite2
GROUP BY a, b
----
-0 0 0
-0 1 25
-1 2 50
-1 3 75
+0 0 24
Review Comment:
Why did this output change? I also don't understand why the plan changed
from `FIRST_VALUE` to `LAST_VALUE` when the query did not change
##########
datafusion/physical-expr/src/aggregate/array_agg_ordered.rs:
##########
@@ -0,0 +1,605 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query
execution
+
+use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
+use crate::expressions::format_state_name;
+use crate::{AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr};
+use arrow::array::ArrayRef;
+use arrow::datatypes::{DataType, Field};
+use arrow_array::{Array, ListArray};
+use arrow_schema::{Fields, SortOptions};
+use datafusion_common::utils::{compare_rows, get_row_at_idx};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+use itertools::izip;
+use std::any::Any;
+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// Expression for a ARRAY_AGG(ORDER BY) aggregation.
Review Comment:
It might also help to add some comments here that this special operator is
required to handle multi-phase / multi-partion aggregation where the aggregate
is computed using multiple intermediate aggregates that need to be combined
together
##########
datafusion/physical-expr/src/aggregate/first_last.rs:
##########
@@ -117,42 +153,57 @@ impl PartialEq<dyn Any> for FirstValue {
#[derive(Debug)]
struct FirstValueAccumulator {
first: ScalarValue,
- // At the beginning, `is_set` is `false`, this means `first` is not seen
yet.
- // Once we see (`is_set=true`) first value, we do not update `first`.
+ // At the beginning, `is_set` is false, which means `first` is not seen
yet.
Review Comment:
An alternate formulation would be to store `first: Option<Scalar>` which
would allow the compiler to ensure you checked for `first`'s initialization in
all cases
##########
datafusion/physical-expr/src/aggregate/mod.rs:
##########
@@ -85,6 +86,13 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn
Any> {
/// Single-column aggregations such as `sum` return a single value, others
(e.g. `cov`) return many.
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
+ /// Order by requirements for the aggregate function
+ /// By default it is `None` (there is no requirement)
+ /// Order-sensitive aggregators should implement this
Review Comment:
```suggestion
/// Order-sensitive aggregators, such as `FIRST_VALUE(x ORDER BY y)`
should implement this
/// so that DataFusion will ensure the input is sorted correctly
```
##########
datafusion/physical-expr/src/aggregate/array_agg_ordered.rs:
##########
@@ -0,0 +1,605 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query
execution
+
+use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
+use crate::expressions::format_state_name;
+use crate::{AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr};
+use arrow::array::ArrayRef;
+use arrow::datatypes::{DataType, Field};
+use arrow_array::{Array, ListArray};
+use arrow_schema::{Fields, SortOptions};
+use datafusion_common::utils::{compare_rows, get_row_at_idx};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+use itertools::izip;
+use std::any::Any;
+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// Expression for a ARRAY_AGG(ORDER BY) aggregation.
+#[derive(Debug)]
+pub struct OrderSensitiveArrayAgg {
+ name: String,
+ input_data_type: DataType,
+ order_by_data_types: Vec<DataType>,
+ expr: Arc<dyn PhysicalExpr>,
+ ordering_req: LexOrdering,
+}
+
+impl OrderSensitiveArrayAgg {
+ /// Create a new `OrderSensitiveArrayAgg` aggregate function
+ pub fn new(
+ expr: Arc<dyn PhysicalExpr>,
+ name: impl Into<String>,
+ input_data_type: DataType,
+ order_by_data_types: Vec<DataType>,
+ ordering_req: LexOrdering,
+ ) -> Self {
+ Self {
+ name: name.into(),
+ expr,
+ input_data_type,
+ order_by_data_types,
+ ordering_req,
+ }
+ }
+}
+
+impl AggregateExpr for OrderSensitiveArrayAgg {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn field(&self) -> Result<Field> {
+ Ok(Field::new_list(
+ &self.name,
+ Field::new("item", self.input_data_type.clone(), true),
+ false,
+ ))
+ }
+
+ fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+ Ok(Box::new(OrderSensitiveArrayAggAccumulator::try_new(
+ &self.input_data_type,
+ &self.order_by_data_types,
+ self.ordering_req.clone(),
+ )?))
+ }
+
+ fn state_fields(&self) -> Result<Vec<Field>> {
+ let mut fields = vec![Field::new_list(
+ format_state_name(&self.name, "array_agg"),
+ Field::new("item", self.input_data_type.clone(), true),
+ false,
+ )];
+ let orderings = ordering_fields(&self.ordering_req,
&self.order_by_data_types);
+ fields.push(Field::new_list(
+ format_state_name(&self.name, "array_agg_orderings"),
+ Field::new(
+ "item",
+ DataType::Struct(Fields::from(orderings.clone())),
+ true,
+ ),
+ false,
+ ));
+ fields.extend(orderings);
+ Ok(fields)
+ }
+
+ fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+ vec![self.expr.clone()]
+ }
+
+ fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
+ if self.ordering_req.is_empty() {
+ None
+ } else {
+ Some(&self.ordering_req)
+ }
+ }
+
+ fn name(&self) -> &str {
+ &self.name
+ }
+}
+
+impl PartialEq<dyn Any> for OrderSensitiveArrayAgg {
+ fn eq(&self, other: &dyn Any) -> bool {
+ down_cast_any_ref(other)
+ .downcast_ref::<Self>()
+ .map(|x| {
+ self.name == x.name
+ && self.input_data_type == x.input_data_type
+ && self.order_by_data_types == x.order_by_data_types
+ && self.expr.eq(&x.expr)
+ })
+ .unwrap_or(false)
+ }
+}
+
+#[derive(Debug)]
+pub(crate) struct OrderSensitiveArrayAggAccumulator {
+ values: Vec<ScalarValue>,
+ ordering_values: Vec<Vec<ScalarValue>>,
+ datatypes: Vec<DataType>,
+ ordering_req: LexOrdering,
+}
+
+impl OrderSensitiveArrayAggAccumulator {
+ /// Create a new order-sensitive ARRAY_AGG accumulator based on the given
+ /// item data type.
+ pub fn try_new(
+ datatype: &DataType,
+ ordering_dtypes: &[DataType],
+ ordering_req: LexOrdering,
+ ) -> Result<Self> {
+ let mut datatypes = vec![datatype.clone()];
+ datatypes.extend(ordering_dtypes.iter().cloned());
+ Ok(Self {
+ values: vec![],
+ ordering_values: vec![],
+ datatypes,
+ ordering_req,
+ })
+ }
+}
+
+impl Accumulator for OrderSensitiveArrayAggAccumulator {
+ fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+ if values.is_empty() {
+ return Ok(());
+ }
+ let n_row = values[0].len();
+ for index in 0..n_row {
+ let row = get_row_at_idx(values, index)?;
+ self.values.push(row[0].clone());
+ self.ordering_values.push(row[1..].to_vec());
+ }
+ Ok(())
+ }
+
+ fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+ if states.is_empty() {
+ return Ok(());
+ }
+ // First entry in the state is the aggregation result.
+ let array_agg_values = &states[0];
+ // 2nd entry stores values received for ordering requirement columns,
for each aggregation value inside ARRAY_AGG list.
+ // For each `ScalarValue` inside ARRAY_AGG list, we will receive a
`Vec<ScalarValue>` that stores
+ // values received from its ordering requirement expression. (This
information is necessary for during merging).
+ let agg_orderings = &states[1];
+ if agg_orderings.as_any().is::<ListArray>() {
+ // Stores ARRAY_AGG results coming from each partition
+ let mut partition_values = vec![];
+ // Stores ordering requirement expression results coming from each
partition
+ let mut partition_ordering_values = vec![];
+ for index in 0..agg_orderings.len() {
+ let ordering = ScalarValue::try_from_array(agg_orderings,
index)?;
+ // Ordering requirement expression values for each entry in
the ARRAY_AGG list
+ let other_ordering_values =
+ self.convert_array_agg_to_orderings(ordering)?;
+ // ARRAY_AGG result. (It is a `ScalarValue::List` under the
hood, it stores `Vec<ScalarValue>`)
+ let array_agg_res =
ScalarValue::try_from_array(array_agg_values, index)?;
+ if let ScalarValue::List(Some(other_values), _) =
array_agg_res {
+ partition_values.push(other_values);
+ partition_ordering_values.push(other_ordering_values);
+ } else {
+ return Err(DataFusionError::Internal(
+ "ARRAY_AGG state must be list!".into(),
+ ));
+ }
+ }
+ let sort_options = self
+ .ordering_req
+ .iter()
+ .map(|sort_expr| sort_expr.options)
+ .collect::<Vec<_>>();
+ self.values = merge_ordered_arrays(
+ &partition_values,
+ &partition_ordering_values,
+ &sort_options,
+ )?;
+ } else {
+ return Err(DataFusionError::Execution(
+ "Expects to receive a list array".to_string(),
+ ));
+ }
+ Ok(())
+ }
+
+ fn state(&self) -> Result<Vec<ScalarValue>> {
+ let mut result = vec![self.evaluate()?];
+ result.push(self.evaluate_orderings()?);
+ let last_ordering = if let Some(ordering) =
self.ordering_values.last() {
+ ordering.clone()
+ } else {
+ // In case ordering is empty, construct ordering as NULL:
+ self.datatypes
+ .iter()
+ .skip(1)
+ .map(ScalarValue::try_from)
+ .collect::<Result<Vec<_>>>()?
+ };
+ result.extend(last_ordering);
+ Ok(result)
+ }
+
+ fn evaluate(&self) -> Result<ScalarValue> {
+ Ok(ScalarValue::new_list(
+ Some(self.values.clone()),
+ self.datatypes[0].clone(),
+ ))
+ }
+
+ fn size(&self) -> usize {
+ let mut total = std::mem::size_of_val(self)
+ + ScalarValue::size_of_vec(&self.values)
+ - std::mem::size_of_val(&self.values);
+
+ // Add size of the `self.ordering_values`
+ total +=
+ std::mem::size_of::<Vec<ScalarValue>>() *
self.ordering_values.capacity();
+ for row in &self.ordering_values {
+ total += ScalarValue::size_of_vec(row) -
std::mem::size_of_val(row);
+ }
+
+ // Add size of the `self.datatypes`
+ total += std::mem::size_of::<DataType>() * self.datatypes.capacity();
+ for dtype in &self.datatypes {
+ total += dtype.size() - std::mem::size_of_val(dtype);
+ }
+
+ // Add size of the `self.ordering_req`
+ total += std::mem::size_of::<PhysicalSortExpr>() *
self.ordering_req.capacity();
+ // TODO: Calculate size of each `PhysicalSortExpr` more accurately.
Review Comment:
I think it is ok to leave this as a TODO and be somewhat inaccurate here
because they are all Arc's and therefore there will only be one actual
`PhysicalSortExpr` rather than one per group, so their contribution to the
overall memory use should be small.
##########
datafusion/core/tests/sqllogictests/test_files/groupby.slt:
##########
@@ -2580,12 +2580,280 @@ FRA 200 50 250
# Run order-sensitive aggregators in multiple partitions
statement ok
-set datafusion.execution.target_partitions = 2;
+set datafusion.execution.target_partitions = 8;
+
+# order-sensitive FIRST_VALUE and LAST_VALUE aggregators should work in
+# multi-partitions also.
+query TT
+EXPLAIN SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
+ LAST_VALUE(amount ORDER BY ts ASC) AS fv2
+ FROM sales_global
+ GROUP BY country
+ ORDER BY country
+----
+logical_plan
+Sort: sales_global.country ASC NULLS LAST
+--Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY
[sales_global.ts ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER
BY [sales_global.ts ASC NULLS LAST] AS fv2
+----Aggregate: groupBy=[[sales_global.country]],
aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS
LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS
LAST]]]
+------TableScan: sales_global projection=[country, ts, amount]
+physical_plan
+SortPreservingMergeExec: [country@0 ASC NULLS LAST]
+--SortExec: expr=[country@0 ASC NULLS LAST]
+----ProjectionExec: expr=[country@0 as country,
FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as
fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS
LAST]@2 as fv2]
+------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country],
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+--------SortExec: expr=[ts@1 ASC NULLS LAST]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------RepartitionExec: partitioning=Hash([Column { name: "country",
index: 0 }], 8), input_partitions=8
+--------------AggregateExec: mode=Partial, gby=[country@0 as country],
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)],
ordering_mode=Linear
+----------------SortExec: expr=[ts@1 ASC NULLS LAST]
+------------------RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1
+--------------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query TRR
+SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
+ LAST_VALUE(amount ORDER BY ts ASC) AS fv2
+ FROM sales_global
+ GROUP BY country
+ ORDER BY country
+----
+FRA 50 200
+GRC 30 80
+TUR 75 100
+
+# Conversion in between FIRST_VALUE and LAST_VALUE to resolve
+# contradictory requirements should work in multi partitions.
+query TT
+EXPLAIN SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
+ LAST_VALUE(amount ORDER BY ts DESC) AS fv2
+ FROM sales_global
+ GROUP BY country
+ ORDER BY country
+----
+logical_plan
+Sort: sales_global.country ASC NULLS LAST
+--Projection: sales_global.country, FIRST_VALUE(sales_global.amount) ORDER BY
[sales_global.ts ASC NULLS LAST] AS fv1, LAST_VALUE(sales_global.amount) ORDER
BY [sales_global.ts DESC NULLS FIRST] AS fv2
+----Aggregate: groupBy=[[sales_global.country]],
aggr=[[FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS
LAST], LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS
FIRST]]]
+------TableScan: sales_global projection=[country, ts, amount]
+physical_plan
+SortPreservingMergeExec: [country@0 ASC NULLS LAST]
+--SortExec: expr=[country@0 ASC NULLS LAST]
+----ProjectionExec: expr=[country@0 as country,
FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as
fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS
FIRST]@2 as fv2]
+------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country],
aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
+--------SortExec: expr=[ts@1 ASC NULLS LAST]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------RepartitionExec: partitioning=Hash([Column { name: "country",
index: 0 }], 8), input_partitions=8
+--------------AggregateExec: mode=Partial, gby=[country@0 as country],
aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)],
ordering_mode=Linear
+----------------SortExec: expr=[ts@1 ASC NULLS LAST]
+------------------RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1
+--------------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query TRR
+SELECT country, FIRST_VALUE(amount ORDER BY ts ASC) AS fv1,
+ LAST_VALUE(amount ORDER BY ts DESC) AS fv2
+ FROM sales_global
+ GROUP BY country
+ ORDER BY country
+----
+FRA 50 50
+GRC 30 30
+TUR 75 75
+
+# order-sensitive FIRST_VALUE and LAST_VALUE aggregators should work in
+# multi-partitions also.
Review Comment:
```suggestion
# multi-partitions without group by also.
```
##########
datafusion/physical-expr/src/aggregate/array_agg_ordered.rs:
##########
@@ -0,0 +1,605 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query
execution
+
+use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
+use crate::expressions::format_state_name;
+use crate::{AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr};
+use arrow::array::ArrayRef;
+use arrow::datatypes::{DataType, Field};
+use arrow_array::{Array, ListArray};
+use arrow_schema::{Fields, SortOptions};
+use datafusion_common::utils::{compare_rows, get_row_at_idx};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+use itertools::izip;
+use std::any::Any;
+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// Expression for a ARRAY_AGG(ORDER BY) aggregation.
+#[derive(Debug)]
+pub struct OrderSensitiveArrayAgg {
+ name: String,
+ input_data_type: DataType,
+ order_by_data_types: Vec<DataType>,
+ expr: Arc<dyn PhysicalExpr>,
+ ordering_req: LexOrdering,
+}
+
+impl OrderSensitiveArrayAgg {
+ /// Create a new `OrderSensitiveArrayAgg` aggregate function
+ pub fn new(
+ expr: Arc<dyn PhysicalExpr>,
+ name: impl Into<String>,
+ input_data_type: DataType,
+ order_by_data_types: Vec<DataType>,
+ ordering_req: LexOrdering,
+ ) -> Self {
+ Self {
+ name: name.into(),
+ expr,
+ input_data_type,
+ order_by_data_types,
+ ordering_req,
+ }
+ }
+}
+
+impl AggregateExpr for OrderSensitiveArrayAgg {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn field(&self) -> Result<Field> {
+ Ok(Field::new_list(
+ &self.name,
+ Field::new("item", self.input_data_type.clone(), true),
+ false,
+ ))
+ }
+
+ fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+ Ok(Box::new(OrderSensitiveArrayAggAccumulator::try_new(
+ &self.input_data_type,
+ &self.order_by_data_types,
+ self.ordering_req.clone(),
+ )?))
+ }
+
+ fn state_fields(&self) -> Result<Vec<Field>> {
+ let mut fields = vec![Field::new_list(
+ format_state_name(&self.name, "array_agg"),
+ Field::new("item", self.input_data_type.clone(), true),
+ false,
+ )];
+ let orderings = ordering_fields(&self.ordering_req,
&self.order_by_data_types);
+ fields.push(Field::new_list(
+ format_state_name(&self.name, "array_agg_orderings"),
+ Field::new(
+ "item",
+ DataType::Struct(Fields::from(orderings.clone())),
+ true,
+ ),
+ false,
+ ));
+ fields.extend(orderings);
+ Ok(fields)
+ }
+
+ fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+ vec![self.expr.clone()]
+ }
+
+ fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
+ if self.ordering_req.is_empty() {
+ None
+ } else {
+ Some(&self.ordering_req)
+ }
+ }
+
+ fn name(&self) -> &str {
+ &self.name
+ }
+}
+
+impl PartialEq<dyn Any> for OrderSensitiveArrayAgg {
+ fn eq(&self, other: &dyn Any) -> bool {
+ down_cast_any_ref(other)
+ .downcast_ref::<Self>()
+ .map(|x| {
+ self.name == x.name
+ && self.input_data_type == x.input_data_type
+ && self.order_by_data_types == x.order_by_data_types
+ && self.expr.eq(&x.expr)
+ })
+ .unwrap_or(false)
+ }
+}
+
+#[derive(Debug)]
+pub(crate) struct OrderSensitiveArrayAggAccumulator {
+ values: Vec<ScalarValue>,
Review Comment:
I think it would help to document what values and ordering values are
storing here, perhaps referring to the very nice documentation on
`merge_ordered_arrays` would be sufficient.
##########
datafusion/physical-expr/src/aggregate/array_agg_ordered.rs:
##########
@@ -0,0 +1,605 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query
execution
Review Comment:
This comment appears to be incorrect as this file contains an implemenation
of ARRAY_AGG that works with ordering.
##########
datafusion/physical-expr/src/aggregate/array_agg_ordered.rs:
##########
@@ -0,0 +1,605 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query
execution
+
+use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
+use crate::expressions::format_state_name;
+use crate::{AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr};
+use arrow::array::ArrayRef;
+use arrow::datatypes::{DataType, Field};
+use arrow_array::{Array, ListArray};
+use arrow_schema::{Fields, SortOptions};
+use datafusion_common::utils::{compare_rows, get_row_at_idx};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::Accumulator;
+use itertools::izip;
+use std::any::Any;
+use std::cmp::Ordering;
+use std::collections::BinaryHeap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// Expression for a ARRAY_AGG(ORDER BY) aggregation.
+#[derive(Debug)]
+pub struct OrderSensitiveArrayAgg {
+ name: String,
+ input_data_type: DataType,
+ order_by_data_types: Vec<DataType>,
+ expr: Arc<dyn PhysicalExpr>,
+ ordering_req: LexOrdering,
+}
+
+impl OrderSensitiveArrayAgg {
+ /// Create a new `OrderSensitiveArrayAgg` aggregate function
+ pub fn new(
+ expr: Arc<dyn PhysicalExpr>,
+ name: impl Into<String>,
+ input_data_type: DataType,
+ order_by_data_types: Vec<DataType>,
+ ordering_req: LexOrdering,
+ ) -> Self {
+ Self {
+ name: name.into(),
+ expr,
+ input_data_type,
+ order_by_data_types,
+ ordering_req,
+ }
+ }
+}
+
+impl AggregateExpr for OrderSensitiveArrayAgg {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn field(&self) -> Result<Field> {
+ Ok(Field::new_list(
+ &self.name,
+ Field::new("item", self.input_data_type.clone(), true),
+ false,
+ ))
+ }
+
+ fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+ Ok(Box::new(OrderSensitiveArrayAggAccumulator::try_new(
+ &self.input_data_type,
+ &self.order_by_data_types,
+ self.ordering_req.clone(),
+ )?))
+ }
+
+ fn state_fields(&self) -> Result<Vec<Field>> {
+ let mut fields = vec![Field::new_list(
+ format_state_name(&self.name, "array_agg"),
+ Field::new("item", self.input_data_type.clone(), true),
+ false,
+ )];
+ let orderings = ordering_fields(&self.ordering_req,
&self.order_by_data_types);
+ fields.push(Field::new_list(
+ format_state_name(&self.name, "array_agg_orderings"),
+ Field::new(
+ "item",
+ DataType::Struct(Fields::from(orderings.clone())),
+ true,
+ ),
+ false,
+ ));
+ fields.extend(orderings);
+ Ok(fields)
+ }
+
+ fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+ vec![self.expr.clone()]
+ }
+
+ fn order_bys(&self) -> Option<&[PhysicalSortExpr]> {
+ if self.ordering_req.is_empty() {
+ None
+ } else {
+ Some(&self.ordering_req)
+ }
+ }
+
+ fn name(&self) -> &str {
+ &self.name
+ }
+}
+
+impl PartialEq<dyn Any> for OrderSensitiveArrayAgg {
+ fn eq(&self, other: &dyn Any) -> bool {
+ down_cast_any_ref(other)
+ .downcast_ref::<Self>()
+ .map(|x| {
+ self.name == x.name
+ && self.input_data_type == x.input_data_type
+ && self.order_by_data_types == x.order_by_data_types
+ && self.expr.eq(&x.expr)
+ })
+ .unwrap_or(false)
+ }
+}
+
+#[derive(Debug)]
+pub(crate) struct OrderSensitiveArrayAggAccumulator {
+ values: Vec<ScalarValue>,
+ ordering_values: Vec<Vec<ScalarValue>>,
+ datatypes: Vec<DataType>,
+ ordering_req: LexOrdering,
+}
+
+impl OrderSensitiveArrayAggAccumulator {
+ /// Create a new order-sensitive ARRAY_AGG accumulator based on the given
+ /// item data type.
+ pub fn try_new(
+ datatype: &DataType,
+ ordering_dtypes: &[DataType],
+ ordering_req: LexOrdering,
+ ) -> Result<Self> {
+ let mut datatypes = vec![datatype.clone()];
+ datatypes.extend(ordering_dtypes.iter().cloned());
+ Ok(Self {
+ values: vec![],
+ ordering_values: vec![],
+ datatypes,
+ ordering_req,
+ })
+ }
+}
+
+impl Accumulator for OrderSensitiveArrayAggAccumulator {
+ fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+ if values.is_empty() {
+ return Ok(());
+ }
+ let n_row = values[0].len();
+ for index in 0..n_row {
+ let row = get_row_at_idx(values, index)?;
+ self.values.push(row[0].clone());
+ self.ordering_values.push(row[1..].to_vec());
+ }
+ Ok(())
+ }
+
+ fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+ if states.is_empty() {
+ return Ok(());
+ }
+ // First entry in the state is the aggregation result.
+ let array_agg_values = &states[0];
+ // 2nd entry stores values received for ordering requirement columns,
for each aggregation value inside ARRAY_AGG list.
+ // For each `ScalarValue` inside ARRAY_AGG list, we will receive a
`Vec<ScalarValue>` that stores
+ // values received from its ordering requirement expression. (This
information is necessary for during merging).
+ let agg_orderings = &states[1];
+ if agg_orderings.as_any().is::<ListArray>() {
+ // Stores ARRAY_AGG results coming from each partition
+ let mut partition_values = vec![];
+ // Stores ordering requirement expression results coming from each
partition
+ let mut partition_ordering_values = vec![];
+ for index in 0..agg_orderings.len() {
+ let ordering = ScalarValue::try_from_array(agg_orderings,
index)?;
+ // Ordering requirement expression values for each entry in
the ARRAY_AGG list
+ let other_ordering_values =
+ self.convert_array_agg_to_orderings(ordering)?;
+ // ARRAY_AGG result. (It is a `ScalarValue::List` under the
hood, it stores `Vec<ScalarValue>`)
+ let array_agg_res =
ScalarValue::try_from_array(array_agg_values, index)?;
+ if let ScalarValue::List(Some(other_values), _) =
array_agg_res {
+ partition_values.push(other_values);
+ partition_ordering_values.push(other_ordering_values);
+ } else {
+ return Err(DataFusionError::Internal(
+ "ARRAY_AGG state must be list!".into(),
+ ));
+ }
+ }
+ let sort_options = self
+ .ordering_req
+ .iter()
+ .map(|sort_expr| sort_expr.options)
+ .collect::<Vec<_>>();
+ self.values = merge_ordered_arrays(
+ &partition_values,
+ &partition_ordering_values,
+ &sort_options,
+ )?;
+ } else {
+ return Err(DataFusionError::Execution(
+ "Expects to receive a list array".to_string(),
+ ));
+ }
+ Ok(())
+ }
+
+ fn state(&self) -> Result<Vec<ScalarValue>> {
+ let mut result = vec![self.evaluate()?];
+ result.push(self.evaluate_orderings()?);
+ let last_ordering = if let Some(ordering) =
self.ordering_values.last() {
+ ordering.clone()
+ } else {
+ // In case ordering is empty, construct ordering as NULL:
+ self.datatypes
+ .iter()
+ .skip(1)
+ .map(ScalarValue::try_from)
+ .collect::<Result<Vec<_>>>()?
+ };
+ result.extend(last_ordering);
+ Ok(result)
+ }
+
+ fn evaluate(&self) -> Result<ScalarValue> {
+ Ok(ScalarValue::new_list(
+ Some(self.values.clone()),
+ self.datatypes[0].clone(),
+ ))
+ }
+
+ fn size(&self) -> usize {
+ let mut total = std::mem::size_of_val(self)
+ + ScalarValue::size_of_vec(&self.values)
+ - std::mem::size_of_val(&self.values);
+
+ // Add size of the `self.ordering_values`
+ total +=
+ std::mem::size_of::<Vec<ScalarValue>>() *
self.ordering_values.capacity();
+ for row in &self.ordering_values {
+ total += ScalarValue::size_of_vec(row) -
std::mem::size_of_val(row);
+ }
+
+ // Add size of the `self.datatypes`
+ total += std::mem::size_of::<DataType>() * self.datatypes.capacity();
+ for dtype in &self.datatypes {
+ total += dtype.size() - std::mem::size_of_val(dtype);
+ }
+
+ // Add size of the `self.ordering_req`
+ total += std::mem::size_of::<PhysicalSortExpr>() *
self.ordering_req.capacity();
+ // TODO: Calculate size of each `PhysicalSortExpr` more accurately.
+ total
+ }
+}
+
+impl OrderSensitiveArrayAggAccumulator {
+ fn convert_array_agg_to_orderings(
+ &self,
+ in_data: ScalarValue,
+ ) -> Result<Vec<Vec<ScalarValue>>> {
+ if let ScalarValue::List(Some(list_vals), _field_ref) = in_data {
+ list_vals.into_iter().map(|struct_vals| {
+ if let ScalarValue::Struct(Some(orderings), _fields) =
struct_vals {
+ Ok(orderings)
+ } else {
+ Err(DataFusionError::Execution(format!(
+ "Expects to receive ScalarValue::Struct(Some(..), _)
but got:{:?}",
+ struct_vals.get_datatype()
+ )))
+ }
+ }).collect::<Result<Vec<_>>>()
+ } else {
+ Err(DataFusionError::Execution(format!(
+ "Expects to receive ScalarValue::List(Some(..), _) but
got:{:?}",
+ in_data.get_datatype()
+ )))
+ }
+ }
+
+ fn evaluate_orderings(&self) -> Result<ScalarValue> {
+ let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]);
+ let struct_field = Fields::from(fields.clone());
+ let orderings = self
+ .ordering_values
+ .iter()
+ .map(|ordering| {
+ ScalarValue::Struct(Some(ordering.clone()),
struct_field.clone())
+ })
+ .collect();
+ let struct_type = DataType::Struct(Fields::from(fields));
+ Ok(ScalarValue::new_list(Some(orderings), struct_type))
+ }
+}
+
+/// This is a wrapper struct to be able to correctly merge ARRAY_AGG
+/// data from multiple partitions using `BinaryHeap`.
+/// When used inside `BinaryHeap` this struct returns smallest `CustomElement`,
+/// where smallest is determined by `ordering` values (`Vec<ScalarValue>`)
+/// according to `sort_options`
+#[derive(Debug, PartialEq, Eq)]
+struct CustomElement<'a> {
+ // Stores from which partition entry is received
+ branch_idx: usize,
+ // values to be merged
+ value: ScalarValue,
+ // according to `ordering` values, comparisons will be done.
+ ordering: Vec<ScalarValue>,
+ // `sort_options` defines, desired ordering by the user
+ sort_options: &'a [SortOptions],
+}
+
+impl<'a> CustomElement<'a> {
+ fn new(
+ branch_idx: usize,
+ value: ScalarValue,
+ ordering: Vec<ScalarValue>,
+ sort_options: &'a [SortOptions],
+ ) -> Self {
+ Self {
+ branch_idx,
+ value,
+ ordering,
+ sort_options,
+ }
+ }
+
+ fn ordering(
+ &self,
+ current: &[ScalarValue],
+ target: &[ScalarValue],
+ ) -> Result<Ordering> {
+ // Calculate ordering according to `sort_options`
+ compare_rows(current, target, self.sort_options)
+ }
+}
+
+// Overwrite ordering implementation such that
+// - `self.ordering` values are used for comparison,
+// - When used inside `BinaryHeap` it is a min-heap.
+impl<'a> Ord for CustomElement<'a> {
+ fn cmp(&self, other: &Self) -> Ordering {
+ // Compares according to custom ordering
+ self.ordering(&self.ordering, &other.ordering)
+ // Convert max heap to min heap
+ .map(|ordering| ordering.reverse())
+ // This function return error, when `self.ordering` and
`other.ordering`
+ // have different types (such as one is `ScalarValue::Int64`,
other is `ScalarValue::Float32`)
+ // Here this case won't happen, because data from each partition
will have same type
+ .unwrap()
+ }
+}
+
+impl<'a> PartialOrd for CustomElement<'a> {
+ fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+ Some(self.cmp(other))
+ }
+}
+
+/// This functions merges `values` array (`&[Vec<ScalarValue>]`) into single
array `Vec<ScalarValue>`
+/// Merging done according to ordering values stored inside `ordering_values`
(`&[Vec<Vec<ScalarValue>>]`)
+/// Inner `Vec<ScalarValue>` in the `ordering_values` can be thought as
ordering information for the
+/// each `ScalarValue` in the `values` array.
+/// Desired ordering specified by `sort_options` argument (Should have same
size with inner `Vec<ScalarValue>`
+/// of the `ordering_values` array).
+///
+/// As an example
+/// values can be \[
+/// \[1, 2, 3, 4, 5\],
+/// \[1, 2, 3, 4\],
+/// \[1, 2, 3, 4, 5, 6\],
+/// \]
+/// In this case we will be merging three arrays (doesn't have to be same size)
+/// and produce a merged array with size 15 (sum of 5+4+6)
+/// Merging will be done according to ordering at `ordering_values` vector.
+/// As an example `ordering_values` can be [
+/// \[(1, a), (2, b), (3, b), (4, a), (5, b) \],
+/// \[(1, a), (2, b), (3, b), (4, a) \],
+/// \[(1, b), (2, c), (3, d), (4, e), (5, a), (6, b) \],
+/// ]
+/// For each ScalarValue in the `values` we have a corresponding
`Vec<ScalarValue>` (like timestamp of it)
+/// for the example above `sort_options` will have size two, that defines
ordering requirement of the merge.
+/// Inner `Vec<ScalarValue>`s of the `ordering_values` will be compared
according `sort_options` (Their sizes should match)
+fn merge_ordered_arrays(
+ // We will merge values into single `Vec<ScalarValue>`.
+ values: &[Vec<ScalarValue>],
+ // `values` will be merged according to `ordering_values`.
+ // Inner `Vec<ScalarValue>` can be thought as ordering information for the
+ // each `ScalarValue` in the values`.
+ ordering_values: &[Vec<Vec<ScalarValue>>],
+ // Defines according to which ordering comparisons should be done.
+ sort_options: &[SortOptions],
+) -> Result<Vec<ScalarValue>> {
+ // Keep track the most recent data of each branch, in binary heap data
structure.
+ let mut heap: BinaryHeap<CustomElement> = BinaryHeap::new();
+
+ if !(values.len() == ordering_values.len()
+ && values
+ .iter()
+ .zip(ordering_values.iter())
+ .all(|(vals, ordering_vals)| vals.len() == ordering_vals.len()))
+ {
+ return Err(DataFusionError::Execution(
+ "Expects values arguments and/or ordering_values arguments to have
same size"
+ .to_string(),
+ ));
+ }
+ let n_branch = values.len();
+ // For each branch we keep track of indices of next will be merged entry
Review Comment:
I do wonder if there is some kernel we could add for this operation in
arrow-rs (this code is quite similar in spirit to what is in
SortPreservingMerge 🤔 )
##########
datafusion/physical-expr/src/aggregate/first_last.rs:
##########
@@ -117,42 +153,57 @@ impl PartialEq<dyn Any> for FirstValue {
#[derive(Debug)]
struct FirstValueAccumulator {
first: ScalarValue,
- // At the beginning, `is_set` is `false`, this means `first` is not seen
yet.
- // Once we see (`is_set=true`) first value, we do not update `first`.
+ // At the beginning, `is_set` is false, which means `first` is not seen
yet.
+ // Once we see the first value, we set the `is_set` flag and do not update
`first` anymore.
is_set: bool,
+ orderings: Vec<ScalarValue>,
Review Comment:
I think it would help to explain what this field is storing (namely, the
values of the `ORDER BY` columns, if present), so different intermediate values
can be correctly merged
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]