dharanad commented on code in PR #11448:
URL: https://github.com/apache/datafusion/pull/11448#discussion_r1683225628


##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query 
execution
+
+use arrow::array::{Array, ArrayRef, AsArray};
+use arrow::datatypes::DataType;
+use arrow_schema::Field;
+
+use datafusion_common::cast::as_list_array;
+use datafusion_common::utils::array_into_list_array_nullable;
+use datafusion_common::Result;
+use datafusion_common::ScalarValue;
+use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
+use datafusion_expr::utils::format_state_name;
+use datafusion_expr::AggregateUDFImpl;
+use datafusion_expr::{Accumulator, Signature, Volatility};
+use std::collections::HashSet;
+use std::sync::Arc;
+
+make_udaf_expr_and_func!(
+    ArrayAgg,
+    array_agg,
+    expression,
+    "input values, including nulls, concatenated into an array",
+    array_agg_udaf
+);
+
+#[derive(Debug)]
+/// ARRAY_AGG aggregate expression
+pub struct ArrayAgg {
+    signature: Signature,
+    alias: Vec<String>,
+}
+
+impl Default for ArrayAgg {
+    fn default() -> Self {
+        Self {
+            signature: Signature::any(1, Volatility::Immutable),
+            alias: vec!["array_agg".to_string()],
+        }
+    }
+}
+
+impl AggregateUDFImpl for ArrayAgg {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    // TODO: change name to lowercase
+    fn name(&self) -> &str {
+        "ARRAY_AGG"

Review Comment:
   Is there a specific reason for having this in uppercase?



##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query 
execution
+
+use arrow::array::{Array, ArrayRef, AsArray};
+use arrow::datatypes::DataType;
+use arrow_schema::Field;
+
+use datafusion_common::cast::as_list_array;
+use datafusion_common::utils::array_into_list_array_nullable;
+use datafusion_common::Result;
+use datafusion_common::ScalarValue;
+use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
+use datafusion_expr::utils::format_state_name;
+use datafusion_expr::AggregateUDFImpl;
+use datafusion_expr::{Accumulator, Signature, Volatility};
+use std::collections::HashSet;
+use std::sync::Arc;
+
+make_udaf_expr_and_func!(
+    ArrayAgg,
+    array_agg,
+    expression,
+    "input values, including nulls, concatenated into an array",
+    array_agg_udaf
+);
+
+#[derive(Debug)]
+/// ARRAY_AGG aggregate expression
+pub struct ArrayAgg {
+    signature: Signature,
+    alias: Vec<String>,
+}
+
+impl Default for ArrayAgg {
+    fn default() -> Self {
+        Self {
+            signature: Signature::any(1, Volatility::Immutable),
+            alias: vec!["array_agg".to_string()],
+        }
+    }
+}
+
+impl AggregateUDFImpl for ArrayAgg {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    // TODO: change name to lowercase
+    fn name(&self) -> &str {
+        "ARRAY_AGG"
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.alias
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        Ok(DataType::List(Arc::new(Field::new(
+            "item",
+            arg_types[0].clone(),
+            true,
+        ))))
+    }
+
+    fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
+        if args.is_distinct {
+            return Ok(vec![Field::new_list(
+                format_state_name(args.name, "distinct_array_agg"),
+                Field::new("item", args.input_type.clone(), true),
+                true,
+            )]);
+        }
+
+        Ok(vec![Field::new_list(
+            format_state_name(args.name, "array_agg"),
+            Field::new("item", args.input_type.clone(), true),
+            true,
+        )])
+    }
+
+    fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn 
Accumulator>> {
+        if acc_args.is_distinct {
+            return Ok(Box::new(DistinctArrayAggAccumulator::try_new(
+                acc_args.input_type,
+            )?));
+        }
+
+        Ok(Box::new(ArrayAggAccumulator::try_new(acc_args.input_type)?))
+    }
+}
+
+#[derive(Debug)]
+pub struct ArrayAggAccumulator {
+    values: Vec<ArrayRef>,
+    datatype: DataType,
+}
+
+impl ArrayAggAccumulator {
+    /// new array_agg accumulator based on given item data type
+    pub fn try_new(datatype: &DataType) -> Result<Self> {
+        Ok(Self {
+            values: vec![],
+            datatype: datatype.clone(),
+        })
+    }
+}
+
+impl Accumulator for ArrayAggAccumulator {
+    // Append value like Int64Array(1,2,3)
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+        assert!(values.len() == 1, "array_agg can only take 1 param!");

Review Comment:
   This will `panic!` if the assertion fails. Should we return an `Err` here instead?



##########
datafusion/proto/src/physical_plan/to_proto.rs:
##########
@@ -260,14 +259,9 @@ struct AggrFn {
 
 fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) -> Result<AggrFn> {
     let aggr_expr = expr.as_any();
-    let mut distinct = false;
 
-    let inner = if aggr_expr.downcast_ref::<ArrayAgg>().is_some() {
-        protobuf::AggregateFunction::ArrayAgg
-    } else if aggr_expr.downcast_ref::<DistinctArrayAgg>().is_some() {
-        distinct = true;
-        protobuf::AggregateFunction::ArrayAgg
-    } else if aggr_expr.downcast_ref::<OrderSensitiveArrayAgg>().is_some() {
+    // TODO: remove

Review Comment:
   Nit: I think it can be removed.



##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query 
execution
+
+use arrow::array::{Array, ArrayRef, AsArray};
+use arrow::datatypes::DataType;
+use arrow_schema::Field;
+
+use datafusion_common::cast::as_list_array;
+use datafusion_common::utils::array_into_list_array_nullable;
+use datafusion_common::Result;
+use datafusion_common::ScalarValue;
+use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
+use datafusion_expr::utils::format_state_name;
+use datafusion_expr::AggregateUDFImpl;
+use datafusion_expr::{Accumulator, Signature, Volatility};
+use std::collections::HashSet;
+use std::sync::Arc;
+
+make_udaf_expr_and_func!(
+    ArrayAgg,
+    array_agg,
+    expression,
+    "input values, including nulls, concatenated into an array",
+    array_agg_udaf
+);
+
+#[derive(Debug)]
+/// ARRAY_AGG aggregate expression
+pub struct ArrayAgg {
+    signature: Signature,
+    alias: Vec<String>,
+}
+
+impl Default for ArrayAgg {
+    fn default() -> Self {
+        Self {
+            signature: Signature::any(1, Volatility::Immutable),
+            alias: vec!["array_agg".to_string()],
+        }
+    }
+}
+
+impl AggregateUDFImpl for ArrayAgg {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    // TODO: change name to lowercase
+    fn name(&self) -> &str {
+        "ARRAY_AGG"
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.alias
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        Ok(DataType::List(Arc::new(Field::new(
+            "item",
+            arg_types[0].clone(),
+            true,
+        ))))
+    }
+
+    fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
+        if args.is_distinct {
+            return Ok(vec![Field::new_list(
+                format_state_name(args.name, "distinct_array_agg"),
+                Field::new("item", args.input_type.clone(), true),
+                true,
+            )]);
+        }
+
+        Ok(vec![Field::new_list(
+            format_state_name(args.name, "array_agg"),
+            Field::new("item", args.input_type.clone(), true),
+            true,
+        )])
+    }
+
+    fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn 
Accumulator>> {
+        if acc_args.is_distinct {
+            return Ok(Box::new(DistinctArrayAggAccumulator::try_new(
+                acc_args.input_type,
+            )?));
+        }
+
+        Ok(Box::new(ArrayAggAccumulator::try_new(acc_args.input_type)?))
+    }
+}
+
+#[derive(Debug)]
+pub struct ArrayAggAccumulator {
+    values: Vec<ArrayRef>,
+    datatype: DataType,
+}
+
+impl ArrayAggAccumulator {
+    /// new array_agg accumulator based on given item data type
+    pub fn try_new(datatype: &DataType) -> Result<Self> {
+        Ok(Self {
+            values: vec![],
+            datatype: datatype.clone(),
+        })
+    }
+}
+
+impl Accumulator for ArrayAggAccumulator {
+    // Append value like Int64Array(1,2,3)
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+        assert!(values.len() == 1, "array_agg can only take 1 param!");
+
+        let val = Arc::clone(&values[0]);
+        if val.len() > 0 {
+            self.values.push(val);
+        }
+        Ok(())
+    }
+
+    // Append value like ListArray(Int64Array(1,2,3), Int64Array(4,5,6))

Review Comment:
   Nit: If this comment is meant to document the function's behavior, we could move it inside the function.



##########
datafusion/proto/tests/cases/roundtrip_logical_plan.rs:
##########
@@ -702,6 +702,7 @@ async fn roundtrip_expr_api() -> Result<()> {
         string_agg(col("a").cast_to(&DataType::Utf8, &schema)?, lit("|")),
         bool_and(lit(true)),
         bool_or(lit(true)),
+        array_agg(lit(1)),

Review Comment:
   What is builder mode? Do you mean a builder for `array_agg`?



##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query 
execution
+
+use arrow::array::{Array, ArrayRef, AsArray};
+use arrow::datatypes::DataType;
+use arrow_schema::Field;
+
+use datafusion_common::cast::as_list_array;
+use datafusion_common::utils::array_into_list_array_nullable;
+use datafusion_common::Result;
+use datafusion_common::ScalarValue;
+use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
+use datafusion_expr::utils::format_state_name;
+use datafusion_expr::AggregateUDFImpl;
+use datafusion_expr::{Accumulator, Signature, Volatility};
+use std::collections::HashSet;
+use std::sync::Arc;
+
+make_udaf_expr_and_func!(
+    ArrayAgg,
+    array_agg,
+    expression,
+    "input values, including nulls, concatenated into an array",
+    array_agg_udaf
+);
+
+#[derive(Debug)]
+/// ARRAY_AGG aggregate expression
+pub struct ArrayAgg {
+    signature: Signature,
+    alias: Vec<String>,
+}
+
+impl Default for ArrayAgg {
+    fn default() -> Self {
+        Self {
+            signature: Signature::any(1, Volatility::Immutable),
+            alias: vec!["array_agg".to_string()],
+        }
+    }
+}
+
+impl AggregateUDFImpl for ArrayAgg {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    // TODO: change name to lowercase
+    fn name(&self) -> &str {
+        "ARRAY_AGG"
+    }
+
+    fn aliases(&self) -> &[String] {
+        &self.alias
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+        Ok(DataType::List(Arc::new(Field::new(
+            "item",
+            arg_types[0].clone(),
+            true,
+        ))))
+    }
+
+    fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
+        if args.is_distinct {
+            return Ok(vec![Field::new_list(
+                format_state_name(args.name, "distinct_array_agg"),
+                Field::new("item", args.input_type.clone(), true),
+                true,
+            )]);
+        }
+
+        Ok(vec![Field::new_list(
+            format_state_name(args.name, "array_agg"),
+            Field::new("item", args.input_type.clone(), true),
+            true,
+        )])
+    }
+
+    fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn 
Accumulator>> {
+        if acc_args.is_distinct {
+            return Ok(Box::new(DistinctArrayAggAccumulator::try_new(
+                acc_args.input_type,
+            )?));
+        }
+
+        Ok(Box::new(ArrayAggAccumulator::try_new(acc_args.input_type)?))
+    }
+}
+
+#[derive(Debug)]
+pub struct ArrayAggAccumulator {
+    values: Vec<ArrayRef>,
+    datatype: DataType,
+}
+
+impl ArrayAggAccumulator {
+    /// new array_agg accumulator based on given item data type
+    pub fn try_new(datatype: &DataType) -> Result<Self> {
+        Ok(Self {
+            values: vec![],
+            datatype: datatype.clone(),
+        })
+    }
+}
+
+impl Accumulator for ArrayAggAccumulator {
+    // Append value like Int64Array(1,2,3)
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+        assert!(values.len() == 1, "array_agg can only take 1 param!");
+
+        let val = Arc::clone(&values[0]);
+        if val.len() > 0 {
+            self.values.push(val);
+        }
+        Ok(())
+    }
+
+    // Append value like ListArray(Int64Array(1,2,3), Int64Array(4,5,6))
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        if states.is_empty() {
+            return Ok(());
+        }
+        assert!(states.len() == 1, "array_agg states must be singleton!");

Review Comment:
   Same as above. Can we replace all the `assert!` calls with returning an `Err`?



##########
datafusion/expr/src/expr_fn.rs:
##########
@@ -171,6 +171,7 @@ pub fn max(expr: Expr) -> Expr {
     ))
 }
 
+// TODO: remove

Review Comment:
   Nit: I think we can remove this



##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query 
execution
+
+use arrow::array::{Array, ArrayRef, AsArray};
+use arrow::datatypes::DataType;
+use arrow_schema::Field;
+
+use datafusion_common::cast::as_list_array;
+use datafusion_common::utils::array_into_list_array_nullable;
+use datafusion_common::Result;
+use datafusion_common::ScalarValue;
+use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
+use datafusion_expr::utils::format_state_name;
+use datafusion_expr::AggregateUDFImpl;
+use datafusion_expr::{Accumulator, Signature, Volatility};
+use std::collections::HashSet;
+use std::sync::Arc;
+
+make_udaf_expr_and_func!(
+    ArrayAgg,
+    array_agg,
+    expression,
+    "input values, including nulls, concatenated into an array",
+    array_agg_udaf
+);
+
+#[derive(Debug)]
+/// ARRAY_AGG aggregate expression
+pub struct ArrayAgg {
+    signature: Signature,
+    alias: Vec<String>,
+}
+
+impl Default for ArrayAgg {
+    fn default() -> Self {
+        Self {
+            signature: Signature::any(1, Volatility::Immutable),
+            alias: vec!["array_agg".to_string()],

Review Comment:
   Why are we using the lowercase variant as an alias? Shouldn't this be the 
function name?



##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query 
execution
+
+use arrow::array::{Array, ArrayRef, AsArray};
+use arrow::datatypes::DataType;
+use arrow_schema::Field;
+
+use datafusion_common::cast::as_list_array;
+use datafusion_common::utils::array_into_list_array_nullable;
+use datafusion_common::Result;
+use datafusion_common::ScalarValue;
+use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
+use datafusion_expr::utils::format_state_name;
+use datafusion_expr::AggregateUDFImpl;
+use datafusion_expr::{Accumulator, Signature, Volatility};
+use std::collections::HashSet;
+use std::sync::Arc;
+
+make_udaf_expr_and_func!(
+    ArrayAgg,
+    array_agg,
+    expression,
+    "input values, including nulls, concatenated into an array",

Review Comment:
   I think this description is not clear. Can we improve it for clarity, like 
the descriptions of the other functions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to