This is an automated email from the ASF dual-hosted git repository.
jiayuliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new bf71577 refactor array_agg to not to have `update` and `merge` (#1681)
bf71577 is described below
commit bf715773e66734af87a02845242a1bc48ded8d0f
Author: Jiayu Liu <[email protected]>
AuthorDate: Thu Jan 27 09:53:33 2022 +0800
refactor array_agg to not to have `update` and `merge` (#1681)
---
.../src/physical_plan/expressions/array_agg.rs | 67 ++++++++--------------
1 file changed, 25 insertions(+), 42 deletions(-)
diff --git a/datafusion/src/physical_plan/expressions/array_agg.rs b/datafusion/src/physical_plan/expressions/array_agg.rs
index c237cc0..5a86edd 100644
--- a/datafusion/src/physical_plan/expressions/array_agg.rs
+++ b/datafusion/src/physical_plan/expressions/array_agg.rs
@@ -18,7 +18,7 @@
//! Defines physical expressions that can evaluated at runtime during query execution
use super::format_state_name;
-use crate::error::Result;
+use crate::error::{DataFusionError, Result};
use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
use crate::scalar::ScalarValue;
use arrow::array::ArrayRef;
@@ -95,7 +95,7 @@ impl AggregateExpr for ArrayAgg {
#[derive(Debug)]
pub(crate) struct ArrayAggAccumulator {
- array: Vec<ScalarValue>,
+ values: Vec<ScalarValue>,
datatype: DataType,
}
@@ -103,32 +103,10 @@ impl ArrayAggAccumulator {
/// new array_agg accumulator based on given item data type
pub fn try_new(datatype: &DataType) -> Result<Self> {
Ok(Self {
- array: vec![],
+ values: vec![],
datatype: datatype.clone(),
})
}
-
- fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
- let value = &values[0];
- self.array.push(value.clone());
-
- Ok(())
- }
-
- fn merge(&mut self, states: &[ScalarValue]) -> Result<()> {
- if states.is_empty() {
- return Ok(());
- };
-
- assert!(states.len() == 1, "states length should be 1!");
- match &states[0] {
- ScalarValue::List(Some(array), _) => {
- self.array.extend((&**array).clone());
- }
- _ => unreachable!(),
- }
- Ok(())
- }
}
impl Accumulator for ArrayAggAccumulator {
@@ -136,36 +114,41 @@ impl Accumulator for ArrayAggAccumulator {
if values.is_empty() {
return Ok(());
};
- (0..values[0].len()).try_for_each(|index| {
- let v = values
- .iter()
- .map(|array| ScalarValue::try_from_array(array, index))
- .collect::<Result<Vec<_>>>()?;
- self.update(&v)
+ assert!(values.len() == 1, "array_agg can only take 1 param!");
+ let arr = &values[0];
+ (0..arr.len()).try_for_each(|index| {
+ let scalar = ScalarValue::try_from_array(arr, index)?;
+ self.values.push(scalar);
+ Ok(())
})
}
+
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
if states.is_empty() {
return Ok(());
};
- (0..states[0].len()).try_for_each(|index| {
- let v = states
- .iter()
- .map(|array| ScalarValue::try_from_array(array, index))
- .collect::<Result<Vec<_>>>()?;
- self.merge(&v)
+ assert!(states.len() == 1, "array_agg states must be singleton!");
+ let arr = &states[0];
+ (0..arr.len()).try_for_each(|index| {
+ let scalar = ScalarValue::try_from_array(arr, index)?;
+ if let ScalarValue::List(Some(values), _) = scalar {
+ self.values.extend(*values);
+ Ok(())
+ } else {
+ Err(DataFusionError::Internal(
+ "array_agg state must be list!".into(),
+ ))
+ }
})
}
+
fn state(&self) -> Result<Vec<ScalarValue>> {
- Ok(vec![ScalarValue::List(
- Some(Box::new(self.array.clone())),
- Box::new(self.datatype.clone()),
- )])
+ Ok(vec![self.evaluate()?])
}
fn evaluate(&self) -> Result<ScalarValue> {
Ok(ScalarValue::List(
- Some(Box::new(self.array.clone())),
+ Some(Box::new(self.values.clone())),
Box::new(self.datatype.clone()),
))
}