This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch impl-array-agg
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit a5a55d94d4955433b9b1ba47866f291c38dfddbf
Author: Jiayu Liu <[email protected]>
AuthorDate: Thu Oct 14 15:50:43 2021 +0800

    add array agg
---
 datafusion/src/logical_plan/expr.rs                |  4 ++++
 datafusion/src/physical_plan/aggregates.rs         | 24 ++++++++++++--------
 .../src/physical_plan/expressions/array_agg.rs     | 26 ++++++++++++++++++++++
 datafusion/src/physical_plan/expressions/mod.rs    |  2 ++
 4 files changed, 47 insertions(+), 9 deletions(-)

diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs
index d50d533..8927a97 100644
--- a/datafusion/src/logical_plan/expr.rs
+++ b/datafusion/src/logical_plan/expr.rs
@@ -405,6 +405,10 @@ impl Expr {
                     .iter()
                     .map(|e| e.get_type(schema))
                     .collect::<Result<Vec<_>>>()?;
+                let arg_fields = args
+                    .iter()
+                    .map(|e| e.to_field(schema))
+                    .collect::<Result<Vec<_>>>()?;
                 aggregates::return_type(fun, &data_types)
             }
             Expr::AggregateUDF { fun, args, .. } => {
diff --git a/datafusion/src/physical_plan/aggregates.rs b/datafusion/src/physical_plan/aggregates.rs
index eb3f6ca..a7d97cb 100644
--- a/datafusion/src/physical_plan/aggregates.rs
+++ b/datafusion/src/physical_plan/aggregates.rs
@@ -59,8 +59,10 @@ pub enum AggregateFunction {
     Max,
     /// avg
     Avg,
-    /// Approximate aggregate function
+    /// approximate distinct count
     ApproxDistinct,
+    /// `array_agg`
+    ArrayAgg,
 }
 
 impl fmt::Display for AggregateFunction {
@@ -79,6 +81,7 @@ impl FromStr for AggregateFunction {
             "count" => AggregateFunction::Count,
             "avg" => AggregateFunction::Avg,
             "sum" => AggregateFunction::Sum,
+            "array_agg" => AggregateFunction::ArrayAgg,
             "approx_distinct" => AggregateFunction::ApproxDistinct,
             _ => {
                 return Err(DataFusionError::Plan(format!(
@@ -105,6 +108,7 @@ pub fn return_type(fun: &AggregateFunction, arg_types: &[DataType]) -> Result<Da
         AggregateFunction::Max | AggregateFunction::Min => 
Ok(arg_types[0].clone()),
         AggregateFunction::Sum => sum_return_type(&arg_types[0]),
         AggregateFunction::Avg => avg_return_type(&arg_types[0]),
+        AggregateFunction::ArrayAgg => unimplemented!(),
     }
 }
 
@@ -138,14 +142,12 @@ pub fn create_aggregate_expr(
         (AggregateFunction::Count, false) => {
             Arc::new(expressions::Count::new(arg, name, return_type))
         }
-        (AggregateFunction::Count, true) => {
-            Arc::new(distinct_expressions::DistinctCount::new(
-                arg_types,
-                args.to_vec(),
-                name,
-                return_type,
-            ))
-        }
+        (AggregateFunction::Count, true) => 
Arc::new(expressions::DistinctCount::new(
+            arg_types,
+            args.to_vec(),
+            name,
+            return_type,
+        )),
         (AggregateFunction::Sum, false) => {
             Arc::new(expressions::Sum::new(arg, name, return_type))
         }
@@ -154,6 +156,9 @@ pub fn create_aggregate_expr(
                 "SUM(DISTINCT) aggregations are not available".to_string(),
             ));
         }
+        (AggregateFunction::ArrayAgg, _) => Arc::new(
+            expressions::ArrayAgg::new(arg, name, arg_types)
+        ),
         (AggregateFunction::ApproxDistinct, _) => Arc::new(
             expressions::ApproxDistinct::new(arg, name, arg_types[0].clone()),
         ),
@@ -205,6 +210,7 @@ pub fn signature(fun: &AggregateFunction) -> Signature {
         AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
             Signature::any(1, Volatility::Immutable)
         }
+        AggregateFunction::ArrayAgg => 
Signature::variadic_equal(Volatility::Immutable),
         AggregateFunction::Min | AggregateFunction::Max => {
             let valid = STRINGS
                 .iter()
diff --git a/datafusion/src/physical_plan/expressions/array_agg.rs b/datafusion/src/physical_plan/expressions/array_agg.rs
new file mode 100644
index 0000000..4fe005b
--- /dev/null
+++ b/datafusion/src/physical_plan/expressions/array_agg.rs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can be evaluated at runtime during query execution
+
+/// `ARRAY_AGG` aggregate expression (physical counterpart of `AggregateFunction::ArrayAgg`)
+#[derive(Debug)]
+pub struct ArrayAgg {
+    name: String,
+    input_data_type: DataType, // NOTE(review): caller passes all `arg_types`; confirm
+    expr: Arc<dyn PhysicalExpr>,
+}
diff --git a/datafusion/src/physical_plan/expressions/mod.rs b/datafusion/src/physical_plan/expressions/mod.rs
index 4ca0036..e765b67 100644
--- a/datafusion/src/physical_plan/expressions/mod.rs
+++ b/datafusion/src/physical_plan/expressions/mod.rs
@@ -26,6 +26,7 @@ use arrow::compute::kernels::sort::{SortColumn, SortOptions};
 use arrow::record_batch::RecordBatch;
 
 mod approx_distinct;
+mod array_agg;
 mod average;
 #[macro_use]
 mod binary;
@@ -57,6 +58,7 @@ pub mod helpers {
 }
 
 pub use approx_distinct::ApproxDistinct;
+pub use array_agg::ArrayAgg;
 pub use average::{avg_return_type, Avg, AvgAccumulator};
 pub use binary::{binary, binary_operator_data_type, BinaryExpr};
 pub use case::{case, CaseExpr};

Reply via email to