alamb commented on a change in pull request #1579:
URL: https://github.com/apache/arrow-datafusion/pull/1579#discussion_r787169764



##########
File path: datafusion/src/physical_plan/expressions/distinct_expressions.rs
##########
@@ -207,17 +208,155 @@ impl Accumulator for DistinctCountAccumulator {
     }
 }
 
+/// Expression for a ARRAY_AGG(DISTINCT) aggregation.
+#[derive(Debug)]
+pub struct DistinctArrayAgg {
+    /// Column name
+    name: String,
+    /// The DataType for the input expression
+    input_data_type: DataType,
+    /// The input expression
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl DistinctArrayAgg {
+    /// Create a new DistinctArrayAgg aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        input_data_type: DataType,
+    ) -> Self {
+        let name = name.into();
+        Self {
+            name,
+            expr,
+            input_data_type,
+        }
+    }
+}
+
+impl AggregateExpr for DistinctArrayAgg {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(
+            &self.name,
+            DataType::List(Box::new(Field::new(
+                "item",
+                self.input_data_type.clone(),
+                true,
+            ))),
+            false,
+        ))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(DistinctArrayAggAccumulator::try_new(
+            &self.input_data_type,
+        )?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            &format_state_name(&self.name, "distinct_array_agg"),
+            DataType::List(Box::new(Field::new(
+                "item",
+                self.input_data_type.clone(),
+                true,
+            ))),
+            false,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+#[derive(Debug)]
+struct DistinctArrayAggAccumulator {
+    values: HashSet<ScalarValue>,
+    datatype: DataType,
+}
+
+impl DistinctArrayAggAccumulator {
+    pub fn try_new(datatype: &DataType) -> Result<Self> {
+        Ok(Self {
+            values: HashSet::new(),
+            datatype: datatype.clone(),
+        })
+    }
+}
+
+impl Accumulator for DistinctArrayAggAccumulator {
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![ScalarValue::List(
+            Some(Box::new(self.values.clone().into_iter().collect())),
+            Box::new(self.datatype.clone()),
+        )])
+    }
+
+    fn update(&mut self, _values: &[ScalarValue]) -> Result<()> {
+        unimplemented!("update_batch is implemented instead");
+    }
+
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        assert_eq!(values.len(), 1, "batch input should only include 1 column!");
+
+        let arr = &values[0];
+        for i in 0..arr.len() {
+            self.values.insert(ScalarValue::try_from_array(arr, i)?);
+        }
+        Ok(())
+    }
+
+    fn merge(&mut self, _states: &[ScalarValue]) -> Result<()> {

Review comment:
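       Likewise, I think this (`merge`) will need to be removed now that https://github.com/apache/arrow-datafusion/pull/1582 is merged.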

##########
File path: datafusion/src/physical_plan/expressions/distinct_expressions.rs
##########
@@ -207,17 +208,155 @@ impl Accumulator for DistinctCountAccumulator {
     }
 }
 
+/// Expression for a ARRAY_AGG(DISTINCT) aggregation.
+#[derive(Debug)]
+pub struct DistinctArrayAgg {
+    /// Column name
+    name: String,
+    /// The DataType for the input expression
+    input_data_type: DataType,
+    /// The input expression
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl DistinctArrayAgg {
+    /// Create a new DistinctArrayAgg aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        input_data_type: DataType,
+    ) -> Self {
+        let name = name.into();
+        Self {
+            name,
+            expr,
+            input_data_type,
+        }
+    }
+}
+
+impl AggregateExpr for DistinctArrayAgg {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(
+            &self.name,
+            DataType::List(Box::new(Field::new(
+                "item",
+                self.input_data_type.clone(),
+                true,
+            ))),
+            false,
+        ))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(DistinctArrayAggAccumulator::try_new(
+            &self.input_data_type,
+        )?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            &format_state_name(&self.name, "distinct_array_agg"),
+            DataType::List(Box::new(Field::new(
+                "item",
+                self.input_data_type.clone(),
+                true,
+            ))),
+            false,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+#[derive(Debug)]
+struct DistinctArrayAggAccumulator {
+    values: HashSet<ScalarValue>,
+    datatype: DataType,
+}
+
+impl DistinctArrayAggAccumulator {
+    pub fn try_new(datatype: &DataType) -> Result<Self> {
+        Ok(Self {
+            values: HashSet::new(),
+            datatype: datatype.clone(),
+        })
+    }
+}
+
+impl Accumulator for DistinctArrayAggAccumulator {
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![ScalarValue::List(
+            Some(Box::new(self.values.clone().into_iter().collect())),
+            Box::new(self.datatype.clone()),
+        )])
+    }
+
+    fn update(&mut self, _values: &[ScalarValue]) -> Result<()> {

Review comment:
       I think this (`update`) will need to be removed now that https://github.com/apache/arrow-datafusion/pull/1582 is merged.

##########
File path: datafusion/src/physical_plan/aggregates.rs
##########
@@ -454,42 +452,40 @@ mod tests {
                     &input_phy_exprs[0..1],
                     &input_schema,
                     "c1",
-                );
+                )?;
                 match fun {
                     AggregateFunction::Count => {
-                        let result_agg_phy_exprs_distinct = result_distinct?;
-                        assert!(result_agg_phy_exprs_distinct
-                            .as_any()
-                            .is::<DistinctCount>());
-                        assert_eq!("c1", result_agg_phy_exprs_distinct.name());
+                        assert!(result_distinct.as_any().is::<DistinctCount>());
+                        assert_eq!("c1", result_distinct.name());
                         assert_eq!(
                             Field::new("c1", DataType::UInt64, true),
-                            result_agg_phy_exprs_distinct.field().unwrap()
+                            result_distinct.field().unwrap()

Review comment:
       well that is certainly much nicer to read 👍 

##########
File path: datafusion/tests/sql/aggregates.rs
##########
@@ -410,3 +411,58 @@ async fn csv_query_array_agg_one() -> Result<()> {
     assert_batches_eq!(expected, &actual);
     Ok(())
 }
+
+#[tokio::test]
+async fn csv_query_array_agg_distinct() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    register_aggregate_csv(&mut ctx).await?;
+    let sql = "SELECT array_agg(distinct c2) FROM aggregate_test_100";
+    let actual = execute_to_batches(&mut ctx, sql).await;
+
+    // The results for this query should be something like the following:
+    //    +------------------------------------------+
+    //    | ARRAYAGG(DISTINCT aggregate_test_100.c2) |
+    //    +------------------------------------------+
+    //    | [4, 2, 3, 5, 1]                          |
+    //    +------------------------------------------+
+    // Since ARRAY_AGG(DISTINCT) ordering is nondeterministic, check the schema and contents.
+    assert_eq!(
+        *actual[0].schema(),
+        Schema::new(vec![Field::new(
+            "ARRAYAGG(DISTINCT aggregate_test_100.c2)",
+            DataType::List(Box::new(Field::new("item", DataType::UInt32, true))),
+            false
+        ),])
+    );
+
+    // We should have 1 row containing a list
+    let column = actual[0].column(0);
+    assert_eq!(column.len(), 1);
+
+    let list = ScalarValue::try_from_array(column, 0)?;
+    let mut contents = vec![];
+    match list {
+        // Since ScalarValue does not implement ORD, we must extract the underlying values

Review comment:
       👍 

##########
File path: datafusion/src/physical_plan/expressions/distinct_expressions.rs
##########
@@ -705,4 +844,151 @@ mod tests {
 
         Ok(())
     }
+
+    // Ordering is unpredictable when using ARRAY_AGG(DISTINCT). Thus we cannot test by simply
+    // checking for equality of output, and it is difficult to sort since ORD is not implemented
+    // for ScalarValue. Thus we check for equality via the following:
+    //   1. `expected` and `actual` have the same number of elements.
+    //   2. `expected` contains no duplicates.
+    //   3. `expected` and `actual` contain the same unique elements.
+    fn check_distinct_array_agg(
+        input: ArrayRef,
+        expected: ScalarValue,
+        datatype: DataType,
+    ) -> Result<()> {
+        let schema = Schema::new(vec![Field::new("a", datatype.clone(), false)]);
+        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![input])?;
+
+        let agg = Arc::new(DistinctArrayAgg::new(
+            col("a", &schema)?,
+            "bla".to_string(),
+            datatype,
+        ));
+        let actual = aggregate(&batch, agg)?;
+
+        match (expected, actual) {
+            (ScalarValue::List(Some(e), _), ScalarValue::List(Some(a), _)) => {
+                // Check that the inputs are the same length.
+                assert_eq!(e.len(), a.len());
+
+                let h1: HashSet<ScalarValue> = HashSet::from_iter(e.clone().into_iter());
+                let h2: HashSet<ScalarValue> = HashSet::from_iter(a.into_iter());
+
+                // Check that e's elements are unique.
+                assert_eq!(h1.len(), e.len());
+
+                // Check that a contains the same unique elements as e.
+                assert_eq!(h1, h2);

Review comment:
       ScalarValue does implement `PartialOrd` -- the reason it doesn't implement `Ord` is that it doesn't really make sense to compare different variant types -- e.g. `ScalarValue::List` and `ScalarValue::Boolean`.
   
   However, in your test cases you are always comparing the same enum variant, so you can do something like the following (which will panic if there is a problem comparing the `ScalarValue`s in the test):
   
   
   ```suggestion
                   // workaround lack of Ord of ScalarValue
                   let cmp = |a: &ScalarValue, b: &ScalarValue| {
                       a.partial_cmp(b).expect("Can compare ScalarValues")
                   };
                   e.as_mut().sort_by(cmp);
                   a.as_mut().sort_by(cmp);
   
                   // Check that the inputs are the same
                   assert_eq!(e, a);
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to