This is an automated email from the ASF dual-hosted git repository. jiayuliu pushed a commit to branch impl-array-agg in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
commit a5a55d94d4955433b9b1ba47866f291c38dfddbf Author: Jiayu Liu <[email protected]> AuthorDate: Thu Oct 14 15:50:43 2021 +0800 add array agg --- datafusion/src/logical_plan/expr.rs | 4 ++++ datafusion/src/physical_plan/aggregates.rs | 24 ++++++++++++-------- .../src/physical_plan/expressions/array_agg.rs | 26 ++++++++++++++++++++++ datafusion/src/physical_plan/expressions/mod.rs | 2 ++ 4 files changed, 47 insertions(+), 9 deletions(-) diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs index d50d533..8927a97 100644 --- a/datafusion/src/logical_plan/expr.rs +++ b/datafusion/src/logical_plan/expr.rs @@ -405,6 +405,10 @@ impl Expr { .iter() .map(|e| e.get_type(schema)) .collect::<Result<Vec<_>>>()?; + let arg_fields = args + .iter() + .map(|e| e.to_field(schema)) + .collect::<Result<Vec<_>>>()?; aggregates::return_type(fun, &data_types) } Expr::AggregateUDF { fun, args, .. } => { diff --git a/datafusion/src/physical_plan/aggregates.rs b/datafusion/src/physical_plan/aggregates.rs index eb3f6ca..a7d97cb 100644 --- a/datafusion/src/physical_plan/aggregates.rs +++ b/datafusion/src/physical_plan/aggregates.rs @@ -59,8 +59,10 @@ pub enum AggregateFunction { Max, /// avg Avg, - /// Approximate aggregate function + /// approximate distinct count ApproxDistinct, + /// `array_agg` + ArrayAgg, } impl fmt::Display for AggregateFunction { @@ -79,6 +81,7 @@ impl FromStr for AggregateFunction { "count" => AggregateFunction::Count, "avg" => AggregateFunction::Avg, "sum" => AggregateFunction::Sum, + "array_agg" => AggregateFunction::ArrayAgg, "approx_distinct" => AggregateFunction::ApproxDistinct, _ => { return Err(DataFusionError::Plan(format!( @@ -105,6 +108,7 @@ pub fn return_type(fun: &AggregateFunction, arg_types: &[DataType]) -> Result<Da AggregateFunction::Max | AggregateFunction::Min => Ok(arg_types[0].clone()), AggregateFunction::Sum => sum_return_type(&arg_types[0]), AggregateFunction::Avg => avg_return_type(&arg_types[0]), + AggregateFunction::ArrayAgg => unimplemented!(), } } @@ -138,14 +142,12 @@ pub fn create_aggregate_expr( (AggregateFunction::Count, false) => { Arc::new(expressions::Count::new(arg, name, return_type)) } - (AggregateFunction::Count, true) => { - Arc::new(distinct_expressions::DistinctCount::new( - arg_types, - args.to_vec(), - name, - return_type, - )) - } + (AggregateFunction::Count, true) => Arc::new(expressions::DistinctCount::new( + arg_types, + args.to_vec(), + name, + return_type, + )), (AggregateFunction::Sum, false) => { Arc::new(expressions::Sum::new(arg, name, return_type)) } @@ -154,6 +156,9 @@ pub fn create_aggregate_expr( "SUM(DISTINCT) aggregations are not available".to_string(), )); } + (AggregateFunction::ArrayAgg, _) => Arc::new( + expressions::ArrayAgg::new(arg, name, arg_types) + ), (AggregateFunction::ApproxDistinct, _) => Arc::new( expressions::ApproxDistinct::new(arg, name, arg_types[0].clone()), ), @@ -205,6 +210,7 @@ pub fn signature(fun: &AggregateFunction) -> Signature { AggregateFunction::Count | AggregateFunction::ApproxDistinct => { Signature::any(1, Volatility::Immutable) } + AggregateFunction::ArrayAgg => Signature::variadic_equal(Volatility::Immutable), AggregateFunction::Min | AggregateFunction::Max => { let valid = STRINGS .iter() diff --git a/datafusion/src/physical_plan/expressions/array_agg.rs b/datafusion/src/physical_plan/expressions/array_agg.rs new file mode 100644 index 0000000..4fe005b --- /dev/null +++ b/datafusion/src/physical_plan/expressions/array_agg.rs @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Defines physical expressions that can evaluated at runtime during query execution + +/// APPROX_DISTINCT aggregate expression +#[derive(Debug)] +pub struct ArrayAgg { + name: String, + input_data_type: DataType, + expr: Arc<dyn PhysicalExpr>, +} diff --git a/datafusion/src/physical_plan/expressions/mod.rs b/datafusion/src/physical_plan/expressions/mod.rs index 4ca0036..e765b67 100644 --- a/datafusion/src/physical_plan/expressions/mod.rs +++ b/datafusion/src/physical_plan/expressions/mod.rs @@ -26,6 +26,7 @@ use arrow::compute::kernels::sort::{SortColumn, SortOptions}; use arrow::record_batch::RecordBatch; mod approx_distinct; +mod array_agg; mod average; #[macro_use] mod binary; @@ -57,6 +58,7 @@ pub mod helpers { } pub use approx_distinct::ApproxDistinct; +pub use array_agg::ArrayAgg; pub use average::{avg_return_type, Avg, AvgAccumulator}; pub use binary::{binary, binary_operator_data_type, BinaryExpr}; pub use case::{case, CaseExpr};
