This is an automated email from the ASF dual-hosted git repository.

jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new d67b0fbf52 Remove element's nullability of array_agg function (#11447)
d67b0fbf52 is described below

commit d67b0fbf52a2c428399811fabac3eec6cf15da41
Author: Jay Zhan <[email protected]>
AuthorDate: Wed Jul 17 13:34:07 2024 +0800

    Remove element's nullability of array_agg function (#11447)
    
    * rm null
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * fmt
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    * fix test
    
    Signed-off-by: jayzhan211 <[email protected]>
    
    ---------
    
    Signed-off-by: jayzhan211 <[email protected]>
---
 datafusion/core/tests/sql/aggregates.rs            |  2 +-
 .../physical-expr/src/aggregate/array_agg.rs       | 23 ++++----------
 .../src/aggregate/array_agg_distinct.rs            | 23 +++-----------
 .../src/aggregate/array_agg_ordered.rs             | 37 ++++++----------------
 datafusion/physical-expr/src/aggregate/build_in.rs | 12 ++-----
 datafusion/physical-plan/src/aggregates/mod.rs     |  1 -
 6 files changed, 23 insertions(+), 75 deletions(-)

diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs
index 86032dc9bc..1f4f9e77d5 100644
--- a/datafusion/core/tests/sql/aggregates.rs
+++ b/datafusion/core/tests/sql/aggregates.rs
@@ -36,7 +36,7 @@ async fn csv_query_array_agg_distinct() -> Result<()> {
         *actual[0].schema(),
         Schema::new(vec![Field::new_list(
             "ARRAY_AGG(DISTINCT aggregate_test_100.c2)",
-            Field::new("item", DataType::UInt32, false),
+            Field::new("item", DataType::UInt32, true),
             true
         ),])
     );
diff --git a/datafusion/physical-expr/src/aggregate/array_agg.rs b/datafusion/physical-expr/src/aggregate/array_agg.rs
index 38a9738029..0d5ed730e2 100644
--- a/datafusion/physical-expr/src/aggregate/array_agg.rs
+++ b/datafusion/physical-expr/src/aggregate/array_agg.rs
@@ -24,7 +24,7 @@ use arrow::array::ArrayRef;
 use arrow::datatypes::{DataType, Field};
 use arrow_array::Array;
 use datafusion_common::cast::as_list_array;
-use datafusion_common::utils::array_into_list_array;
+use datafusion_common::utils::array_into_list_array_nullable;
 use datafusion_common::Result;
 use datafusion_common::ScalarValue;
 use datafusion_expr::Accumulator;
@@ -40,8 +40,6 @@ pub struct ArrayAgg {
     input_data_type: DataType,
     /// The input expression
     expr: Arc<dyn PhysicalExpr>,
-    /// If the input expression can have NULLs
-    nullable: bool,
 }
 
 impl ArrayAgg {
@@ -50,13 +48,11 @@ impl ArrayAgg {
         expr: Arc<dyn PhysicalExpr>,
         name: impl Into<String>,
         data_type: DataType,
-        nullable: bool,
     ) -> Self {
         Self {
             name: name.into(),
             input_data_type: data_type,
             expr,
-            nullable,
         }
     }
 }
@@ -70,7 +66,7 @@ impl AggregateExpr for ArrayAgg {
         Ok(Field::new_list(
             &self.name,
             // This should be the same as return type of AggregateFunction::ArrayAgg
-            Field::new("item", self.input_data_type.clone(), self.nullable),
+            Field::new("item", self.input_data_type.clone(), true),
             true,
         ))
     }
@@ -78,14 +74,13 @@ impl AggregateExpr for ArrayAgg {
     fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
         Ok(Box::new(ArrayAggAccumulator::try_new(
             &self.input_data_type,
-            self.nullable,
         )?))
     }
 
     fn state_fields(&self) -> Result<Vec<Field>> {
         Ok(vec![Field::new_list(
             format_state_name(&self.name, "array_agg"),
-            Field::new("item", self.input_data_type.clone(), self.nullable),
+            Field::new("item", self.input_data_type.clone(), true),
             true,
         )])
     }
@@ -116,16 +111,14 @@ impl PartialEq<dyn Any> for ArrayAgg {
 pub(crate) struct ArrayAggAccumulator {
     values: Vec<ArrayRef>,
     datatype: DataType,
-    nullable: bool,
 }
 
 impl ArrayAggAccumulator {
     /// new array_agg accumulator based on given item data type
-    pub fn try_new(datatype: &DataType, nullable: bool) -> Result<Self> {
+    pub fn try_new(datatype: &DataType) -> Result<Self> {
         Ok(Self {
             values: vec![],
             datatype: datatype.clone(),
-            nullable,
         })
     }
 }
@@ -169,15 +162,11 @@ impl Accumulator for ArrayAggAccumulator {
             self.values.iter().map(|a| a.as_ref()).collect();
 
         if element_arrays.is_empty() {
-            return Ok(ScalarValue::new_null_list(
-                self.datatype.clone(),
-                self.nullable,
-                1,
-            ));
+            return Ok(ScalarValue::new_null_list(self.datatype.clone(), true, 1));
         }
 
         let concated_array = arrow::compute::concat(&element_arrays)?;
-        let list_array = array_into_list_array(concated_array, self.nullable);
+        let list_array = array_into_list_array_nullable(concated_array);
 
         Ok(ScalarValue::List(Arc::new(list_array)))
     }
diff --git a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
index 368d11d742..eca6e4ce4f 100644
--- a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
@@ -42,8 +42,6 @@ pub struct DistinctArrayAgg {
     input_data_type: DataType,
     /// The input expression
     expr: Arc<dyn PhysicalExpr>,
-    /// If the input expression can have NULLs
-    nullable: bool,
 }
 
 impl DistinctArrayAgg {
@@ -52,14 +50,12 @@ impl DistinctArrayAgg {
         expr: Arc<dyn PhysicalExpr>,
         name: impl Into<String>,
         input_data_type: DataType,
-        nullable: bool,
     ) -> Self {
         let name = name.into();
         Self {
             name,
             input_data_type,
             expr,
-            nullable,
         }
     }
 }
@@ -74,7 +70,7 @@ impl AggregateExpr for DistinctArrayAgg {
         Ok(Field::new_list(
             &self.name,
             // This should be the same as return type of AggregateFunction::ArrayAgg
-            Field::new("item", self.input_data_type.clone(), self.nullable),
+            Field::new("item", self.input_data_type.clone(), true),
             true,
         ))
     }
@@ -82,14 +78,13 @@ impl AggregateExpr for DistinctArrayAgg {
     fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
         Ok(Box::new(DistinctArrayAggAccumulator::try_new(
             &self.input_data_type,
-            self.nullable,
         )?))
     }
 
     fn state_fields(&self) -> Result<Vec<Field>> {
         Ok(vec![Field::new_list(
             format_state_name(&self.name, "distinct_array_agg"),
-            Field::new("item", self.input_data_type.clone(), self.nullable),
+            Field::new("item", self.input_data_type.clone(), true),
             true,
         )])
     }
@@ -120,15 +115,13 @@ impl PartialEq<dyn Any> for DistinctArrayAgg {
 struct DistinctArrayAggAccumulator {
     values: HashSet<ScalarValue>,
     datatype: DataType,
-    nullable: bool,
 }
 
 impl DistinctArrayAggAccumulator {
-    pub fn try_new(datatype: &DataType, nullable: bool) -> Result<Self> {
+    pub fn try_new(datatype: &DataType) -> Result<Self> {
         Ok(Self {
             values: HashSet::new(),
             datatype: datatype.clone(),
-            nullable,
         })
     }
 }
@@ -166,13 +159,9 @@ impl Accumulator for DistinctArrayAggAccumulator {
     fn evaluate(&mut self) -> Result<ScalarValue> {
         let values: Vec<ScalarValue> = self.values.iter().cloned().collect();
         if values.is_empty() {
-            return Ok(ScalarValue::new_null_list(
-                self.datatype.clone(),
-                self.nullable,
-                1,
-            ));
+            return Ok(ScalarValue::new_null_list(self.datatype.clone(), true, 1));
         }
-        let arr = ScalarValue::new_list(&values, &self.datatype, self.nullable);
+        let arr = ScalarValue::new_list(&values, &self.datatype, true);
         Ok(ScalarValue::List(arr))
     }
 
@@ -255,7 +244,6 @@ mod tests {
             col("a", &schema)?,
             "bla".to_string(),
             datatype,
-            true,
         ));
         let actual = aggregate(&batch, agg)?;
         compare_list_contents(expected, actual)
@@ -272,7 +260,6 @@ mod tests {
             col("a", &schema)?,
             "bla".to_string(),
             datatype,
-            true,
         ));
 
         let mut accum1 = agg.create_accumulator()?;
diff --git a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
index d44811192f..992c06f5bf 100644
--- a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
+++ b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
@@ -33,7 +33,7 @@ use arrow::datatypes::{DataType, Field};
 use arrow_array::cast::AsArray;
 use arrow_array::{new_empty_array, Array, ArrayRef, StructArray};
 use arrow_schema::Fields;
-use datafusion_common::utils::{array_into_list_array, get_row_at_idx};
+use datafusion_common::utils::{array_into_list_array_nullable, get_row_at_idx};
 use datafusion_common::{exec_err, Result, ScalarValue};
 use datafusion_expr::utils::AggregateOrderSensitivity;
 use datafusion_expr::Accumulator;
@@ -50,8 +50,6 @@ pub struct OrderSensitiveArrayAgg {
     input_data_type: DataType,
     /// The input expression
     expr: Arc<dyn PhysicalExpr>,
-    /// If the input expression can have `NULL`s
-    nullable: bool,
     /// Ordering data types
     order_by_data_types: Vec<DataType>,
     /// Ordering requirement
@@ -66,7 +64,6 @@ impl OrderSensitiveArrayAgg {
         expr: Arc<dyn PhysicalExpr>,
         name: impl Into<String>,
         input_data_type: DataType,
-        nullable: bool,
         order_by_data_types: Vec<DataType>,
         ordering_req: LexOrdering,
     ) -> Self {
@@ -74,7 +71,6 @@ impl OrderSensitiveArrayAgg {
             name: name.into(),
             input_data_type,
             expr,
-            nullable,
             order_by_data_types,
             ordering_req,
             reverse: false,
@@ -90,8 +86,8 @@ impl AggregateExpr for OrderSensitiveArrayAgg {
     fn field(&self) -> Result<Field> {
         Ok(Field::new_list(
             &self.name,
-            // This should be the same as return type of AggregateFunction::ArrayAgg
-            Field::new("item", self.input_data_type.clone(), self.nullable),
+            // This should be the same as return type of AggregateFunction::OrderSensitiveArrayAgg
+            Field::new("item", self.input_data_type.clone(), true),
             true,
         ))
     }
@@ -102,7 +98,6 @@ impl AggregateExpr for OrderSensitiveArrayAgg {
             &self.order_by_data_types,
             self.ordering_req.clone(),
             self.reverse,
-            self.nullable,
         )
         .map(|acc| Box::new(acc) as _)
     }
@@ -110,17 +105,13 @@ impl AggregateExpr for OrderSensitiveArrayAgg {
     fn state_fields(&self) -> Result<Vec<Field>> {
         let mut fields = vec![Field::new_list(
             format_state_name(&self.name, "array_agg"),
-            Field::new("item", self.input_data_type.clone(), self.nullable),
+            Field::new("item", self.input_data_type.clone(), true),
             true, // This should be the same as field()
         )];
         let orderings = ordering_fields(&self.ordering_req, &self.order_by_data_types);
         fields.push(Field::new_list(
             format_state_name(&self.name, "array_agg_orderings"),
-            Field::new(
-                "item",
-                DataType::Struct(Fields::from(orderings)),
-                self.nullable,
-            ),
+            Field::new("item", DataType::Struct(Fields::from(orderings)), true),
             false,
         ));
         Ok(fields)
@@ -147,7 +138,6 @@ impl AggregateExpr for OrderSensitiveArrayAgg {
             name: self.name.to_string(),
             input_data_type: self.input_data_type.clone(),
             expr: Arc::clone(&self.expr),
-            nullable: self.nullable,
             order_by_data_types: self.order_by_data_types.clone(),
             // Reverse requirement:
             ordering_req: reverse_order_bys(&self.ordering_req),
@@ -186,8 +176,6 @@ pub(crate) struct OrderSensitiveArrayAggAccumulator {
     ordering_req: LexOrdering,
     /// Whether the aggregation is running in reverse.
     reverse: bool,
-    /// Whether the input expr is nullable
-    nullable: bool,
 }
 
 impl OrderSensitiveArrayAggAccumulator {
@@ -198,7 +186,6 @@ impl OrderSensitiveArrayAggAccumulator {
         ordering_dtypes: &[DataType],
         ordering_req: LexOrdering,
         reverse: bool,
-        nullable: bool,
     ) -> Result<Self> {
         let mut datatypes = vec![datatype.clone()];
         datatypes.extend(ordering_dtypes.iter().cloned());
@@ -208,7 +195,6 @@ impl OrderSensitiveArrayAggAccumulator {
             datatypes,
             ordering_req,
             reverse,
-            nullable,
         })
     }
 }
@@ -312,7 +298,7 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
         if self.values.is_empty() {
             return Ok(ScalarValue::new_null_list(
                 self.datatypes[0].clone(),
-                self.nullable,
+                true,
                 1,
             ));
         }
@@ -322,14 +308,10 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
             ScalarValue::new_list_from_iter(
                 values.into_iter().rev(),
                 &self.datatypes[0],
-                self.nullable,
+                true,
             )
         } else {
-            ScalarValue::new_list_from_iter(
-                values.into_iter(),
-                &self.datatypes[0],
-                self.nullable,
-            )
+            ScalarValue::new_list_from_iter(values.into_iter(), &self.datatypes[0], true)
         };
         Ok(ScalarValue::List(array))
     }
@@ -385,9 +367,8 @@ impl OrderSensitiveArrayAggAccumulator {
             column_wise_ordering_values,
             None,
         )?;
-        Ok(ScalarValue::List(Arc::new(array_into_list_array(
+        Ok(ScalarValue::List(Arc::new(array_into_list_array_nullable(
             Arc::new(ordering_array),
-            self.nullable,
         ))))
     }
 }
diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs b/datafusion/physical-expr/src/aggregate/build_in.rs
index 68c9b4859f..ef21b3d0f7 100644
--- a/datafusion/physical-expr/src/aggregate/build_in.rs
+++ b/datafusion/physical-expr/src/aggregate/build_in.rs
@@ -62,16 +62,14 @@ pub fn create_aggregate_expr(
     Ok(match (fun, distinct) {
         (AggregateFunction::ArrayAgg, false) => {
             let expr = Arc::clone(&input_phy_exprs[0]);
-            let nullable = expr.nullable(input_schema)?;
 
             if ordering_req.is_empty() {
-                Arc::new(expressions::ArrayAgg::new(expr, name, data_type, nullable))
+                Arc::new(expressions::ArrayAgg::new(expr, name, data_type))
             } else {
                 Arc::new(expressions::OrderSensitiveArrayAgg::new(
                     expr,
                     name,
                     data_type,
-                    nullable,
                     ordering_types,
                     ordering_req.to_vec(),
                 ))
@@ -84,13 +82,7 @@ pub fn create_aggregate_expr(
                 );
             }
             let expr = Arc::clone(&input_phy_exprs[0]);
-            let is_expr_nullable = expr.nullable(input_schema)?;
-            Arc::new(expressions::DistinctArrayAgg::new(
-                expr,
-                name,
-                data_type,
-                is_expr_nullable,
-            ))
+            Arc::new(expressions::DistinctArrayAgg::new(expr, name, data_type))
         }
         (AggregateFunction::Min, _) => Arc::new(expressions::Min::new(
             Arc::clone(&input_phy_exprs[0]),
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index 8bf808af3b..5f780f1ff8 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -2231,7 +2231,6 @@ mod tests {
                     Arc::clone(col_a),
                     "array_agg",
                     DataType::Int32,
-                    false,
                     vec![],
                     order_by_expr.unwrap_or_default(),
                 )) as _


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to