Dandandan commented on code in PR #5408:
URL: https://github.com/apache/arrow-datafusion/pull/5408#discussion_r1119247302
##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -116,66 +116,7 @@ struct DistinctCountAccumulator {
state_data_types: Vec<DataType>,
count_data_type: DataType,
}
-impl DistinctCountAccumulator {
- fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
- // If a row has a NULL, it is not included in the final count.
- if !values.iter().any(|v| v.is_null()) {
- self.values.insert(DistinctScalarValues(values.to_vec()));
- }
-
- Ok(())
- }
-
- fn merge(&mut self, states: &[ScalarValue]) -> Result<()> {
- if states.is_empty() {
- return Ok(());
- }
-
- let col_values = states
- .iter()
- .map(|state| match state {
- ScalarValue::List(Some(values), _) => Ok(values),
- _ => Err(DataFusionError::Internal(format!(
- "Unexpected accumulator state {state:?}"
- ))),
- })
- .collect::<Result<Vec<_>>>()?;
-
- (0..col_values[0].len()).try_for_each(|row_index| {
- let row_values = col_values
- .iter()
- .map(|col| col[row_index].clone())
- .collect::<Vec<_>>();
- self.update(&row_values)
- })
- }
-}
-
impl Accumulator for DistinctCountAccumulator {
- fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
- if values.is_empty() {
- return Ok(());
- }
- (0..values[0].len()).try_for_each(|index| {
- let v = values
- .iter()
- .map(|array| ScalarValue::try_from_array(array, index))
- .collect::<Result<Vec<_>>>()?;
- self.update(&v)
- })
- }
- fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
- if states.is_empty() {
- return Ok(());
- }
- (0..states[0].len()).try_for_each(|index| {
- let v = states
- .iter()
- .map(|array| ScalarValue::try_from_array(array, index))
- .collect::<Result<Vec<_>>>()?;
- self.merge(&v)
- })
- }
fn state(&self) -> Result<Vec<ScalarValue>> {
let mut cols_out = self
.state_data_types
Review Comment:
(and code to be updated accordingly)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]