comphead commented on code in PR #5408: URL: https://github.com/apache/arrow-datafusion/pull/5408#discussion_r1131310429
########## datafusion/physical-expr/src/aggregate/count_distinct.rs: ########## @@ -159,118 +112,80 @@ impl DistinctCountAccumulator { .values .iter() .next() - .map(|vals| { - (ScalarValue::size_of_vec(&vals.0) - std::mem::size_of_val(&vals.0)) - * self.values.capacity() - }) + .map(|vals| ScalarValue::size(vals) - std::mem::size_of_val(&vals)) .unwrap_or(0) } - - // calculates the size as accurate as possible, call to this method is expensive - fn full_size(&self) -> usize { - std::mem::size_of_val(self) - + (std::mem::size_of::<DistinctScalarValues>() * self.values.capacity()) - + self - .values - .iter() - .map(|vals| { - ScalarValue::size_of_vec(&vals.0) - std::mem::size_of_val(&vals.0) - }) - .sum::<usize>() - + (std::mem::size_of::<DataType>() * self.state_data_types.capacity()) - + self - .state_data_types - .iter() - .map(|dt| dt.size() - std::mem::size_of_val(dt)) - .sum::<usize>() - + self.count_data_type.size() - - std::mem::size_of_val(&self.count_data_type) - } } impl Accumulator for DistinctCountAccumulator { + fn state(&self) -> Result<Vec<ScalarValue>> { + let mut cols_out = + ScalarValue::new_list(Some(Vec::new()), self.state_data_type.clone()); + self.values + .iter() + .enumerate() + .for_each(|(_, distinct_values)| { + if let ScalarValue::List(Some(ref mut v), _) = cols_out { + v.push(distinct_values.clone()); + } + }); + Ok(vec![cols_out]) + } fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { if values.is_empty() { return Ok(()); } - (0..values[0].len()).try_for_each(|index| { - let v = values - .iter() - .map(|array| ScalarValue::try_from_array(array, index)) - .collect::<Result<Vec<_>>>()?; - self.update(&v) + let arr = &values[0]; + (0..arr.len()).try_for_each(|index| { + if !arr.is_null(index) { + let scalar = ScalarValue::try_from_array(arr, index)?; + self.values.insert(scalar); + } + Ok(()) }) } fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { if states.is_empty() { return Ok(()); } - 
(0..states[0].len()).try_for_each(|index| { - let v = states - .iter() - .map(|array| ScalarValue::try_from_array(array, index)) - .collect::<Result<Vec<_>>>()?; - self.merge(&v) + let arr = &states[0]; + (0..arr.len()).try_for_each(|index| { + let scalar = ScalarValue::try_from_array(arr, index)?; + + if let ScalarValue::List(Some(scalar), _) = scalar { + scalar.iter().for_each(|scalar| { + if !ScalarValue::is_null(scalar) { + self.values.insert(scalar.clone()); + } + }); + } else { + return Err(DataFusionError::Internal( + "Unexpected accumulator state".into(), + )); + } + Ok(()) }) } - fn state(&self) -> Result<Vec<ScalarValue>> { - let mut cols_out = self - .state_data_types - .iter() - .map(|state_data_type| { - ScalarValue::new_list(Some(Vec::new()), state_data_type.clone()) - }) - .collect::<Vec<_>>(); - - let mut cols_vec = cols_out - .iter_mut() - .map(|c| match c { - ScalarValue::List(Some(ref mut v), _) => Ok(v), - t => Err(DataFusionError::Internal(format!( - "cols_out should only consist of ScalarValue::List. {t:?} is found" - ))), - }) - .collect::<Result<Vec<_>>>()?; - - self.values.iter().for_each(|distinct_values| { - distinct_values.0.iter().enumerate().for_each( - |(col_index, distinct_value)| { - cols_vec[col_index].push(distinct_value.clone()); - }, - ) - }); - - Ok(cols_out.into_iter().collect()) - } fn evaluate(&self) -> Result<ScalarValue> { - match &self.count_data_type { - DataType::Int64 => Ok(ScalarValue::Int64(Some(self.values.len() as i64))), - t => Err(DataFusionError::Internal(format!( - "Invalid data type {t:?} for count distinct aggregation" - ))), - } + Ok(ScalarValue::Int64(Some(self.values.len() as i64))) } fn size(&self) -> usize { - if self.count_data_type.is_primitive() { Review Comment: Filed #5534 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org