This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new bf71577  refactor array_agg to not to have `update` and `merge` (#1681)
bf71577 is described below

commit bf715773e66734af87a02845242a1bc48ded8d0f
Author: Jiayu Liu <[email protected]>
AuthorDate: Thu Jan 27 09:53:33 2022 +0800

    refactor array_agg to not to have `update` and `merge` (#1681)
---
 .../src/physical_plan/expressions/array_agg.rs     | 67 ++++++++--------------
 1 file changed, 25 insertions(+), 42 deletions(-)

diff --git a/datafusion/src/physical_plan/expressions/array_agg.rs b/datafusion/src/physical_plan/expressions/array_agg.rs
index c237cc0..5a86edd 100644
--- a/datafusion/src/physical_plan/expressions/array_agg.rs
+++ b/datafusion/src/physical_plan/expressions/array_agg.rs
@@ -18,7 +18,7 @@
 //! Defines physical expressions that can evaluated at runtime during query execution
 
 use super::format_state_name;
-use crate::error::Result;
+use crate::error::{DataFusionError, Result};
 use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
 use crate::scalar::ScalarValue;
 use arrow::array::ArrayRef;
@@ -95,7 +95,7 @@ impl AggregateExpr for ArrayAgg {
 
 #[derive(Debug)]
 pub(crate) struct ArrayAggAccumulator {
-    array: Vec<ScalarValue>,
+    values: Vec<ScalarValue>,
     datatype: DataType,
 }
 
@@ -103,32 +103,10 @@ impl ArrayAggAccumulator {
     /// new array_agg accumulator based on given item data type
     pub fn try_new(datatype: &DataType) -> Result<Self> {
         Ok(Self {
-            array: vec![],
+            values: vec![],
             datatype: datatype.clone(),
         })
     }
-
-    fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
-        let value = &values[0];
-        self.array.push(value.clone());
-
-        Ok(())
-    }
-
-    fn merge(&mut self, states: &[ScalarValue]) -> Result<()> {
-        if states.is_empty() {
-            return Ok(());
-        };
-
-        assert!(states.len() == 1, "states length should be 1!");
-        match &states[0] {
-            ScalarValue::List(Some(array), _) => {
-                self.array.extend((&**array).clone());
-            }
-            _ => unreachable!(),
-        }
-        Ok(())
-    }
 }
 
 impl Accumulator for ArrayAggAccumulator {
@@ -136,36 +114,41 @@ impl Accumulator for ArrayAggAccumulator {
         if values.is_empty() {
             return Ok(());
         };
-        (0..values[0].len()).try_for_each(|index| {
-            let v = values
-                .iter()
-                .map(|array| ScalarValue::try_from_array(array, index))
-                .collect::<Result<Vec<_>>>()?;
-            self.update(&v)
+        assert!(values.len() == 1, "array_agg can only take 1 param!");
+        let arr = &values[0];
+        (0..arr.len()).try_for_each(|index| {
+            let scalar = ScalarValue::try_from_array(arr, index)?;
+            self.values.push(scalar);
+            Ok(())
         })
     }
+
     fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
         if states.is_empty() {
             return Ok(());
         };
-        (0..states[0].len()).try_for_each(|index| {
-            let v = states
-                .iter()
-                .map(|array| ScalarValue::try_from_array(array, index))
-                .collect::<Result<Vec<_>>>()?;
-            self.merge(&v)
+        assert!(states.len() == 1, "array_agg states must be singleton!");
+        let arr = &states[0];
+        (0..arr.len()).try_for_each(|index| {
+            let scalar = ScalarValue::try_from_array(arr, index)?;
+            if let ScalarValue::List(Some(values), _) = scalar {
+                self.values.extend(*values);
+                Ok(())
+            } else {
+                Err(DataFusionError::Internal(
+                    "array_agg state must be list!".into(),
+                ))
+            }
         })
     }
+
     fn state(&self) -> Result<Vec<ScalarValue>> {
-        Ok(vec![ScalarValue::List(
-            Some(Box::new(self.array.clone())),
-            Box::new(self.datatype.clone()),
-        )])
+        Ok(vec![self.evaluate()?])
     }
 
     fn evaluate(&self) -> Result<ScalarValue> {
         Ok(ScalarValue::List(
-            Some(Box::new(self.array.clone())),
+            Some(Box::new(self.values.clone())),
             Box::new(self.datatype.clone()),
         ))
     }

Reply via email to