waynexia commented on code in PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#discussion_r1133226701
##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -192,17 +230,150 @@ impl Accumulator for DistinctCountAccumulator {
}
}
}
+/// Special case accumulator for counting distinct values in a dict
+struct CountDistinctDictAccumulator<K>
+where
+ K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+ /// `K` is required when casting to dict array
+ _dt: core::marker::PhantomData<K>,
+ /// laziliy initialized state that holds a boolean for each index.
+ /// the bool at each index indicates whether the value for that index has
been seen yet.
+ state: Option<Vec<bool>>,
+}
+
+impl<K> std::fmt::Debug for CountDistinctDictAccumulator<K>
+where
+ K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("CountDistinctDictAccumulator")
+ .field("state", &self.state)
+ .finish()
+ }
+}
+impl<K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync>
+ CountDistinctDictAccumulator<K>
+{
+ fn new() -> Self {
+ Self {
+ _dt: core::marker::PhantomData,
+ state: None,
+ }
+ }
+}
+impl<K> Accumulator for CountDistinctDictAccumulator<K>
+where
+ K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+ fn state(&self) -> Result<Vec<ScalarValue>> {
Review Comment:
```suggestion
#[cfg(test)]
fn state(&self) -> Result<Vec<ScalarValue>> {
```
It seems that this is only utilized in tests. Would it be better to add the
cfg annotation?
##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -192,17 +230,150 @@ impl Accumulator for DistinctCountAccumulator {
}
}
}
+/// Special case accumulator for counting distinct values in a dict
+struct CountDistinctDictAccumulator<K>
+where
+ K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+ /// `K` is required when casting to dict array
+ _dt: core::marker::PhantomData<K>,
+ /// laziliy initialized state that holds a boolean for each index.
+ /// the bool at each index indicates whether the value for that index has
been seen yet.
+ state: Option<Vec<bool>>,
+}
+
+impl<K> std::fmt::Debug for CountDistinctDictAccumulator<K>
+where
+ K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("CountDistinctDictAccumulator")
+ .field("state", &self.state)
+ .finish()
+ }
+}
+impl<K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync>
+ CountDistinctDictAccumulator<K>
+{
+ fn new() -> Self {
+ Self {
+ _dt: core::marker::PhantomData,
+ state: None,
+ }
+ }
+}
+impl<K> Accumulator for CountDistinctDictAccumulator<K>
+where
+ K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+ fn state(&self) -> Result<Vec<ScalarValue>> {
+ if let Some(state) = &self.state {
+ let bools = state
+ .iter()
+ .map(|b| ScalarValue::Boolean(Some(*b)))
+ .collect();
+ Ok(vec![ScalarValue::List(
+ Some(bools),
+ Box::new(Field::new("item", DataType::Boolean, false)),
+ )])
+ } else {
+ // empty state
+ Ok(vec![])
+ }
+ }
+
+ fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+ if values.is_empty() {
+ return Ok(());
+ }
+ let arr = as_dictionary_array::<K>(&values[0])?;
+ let nvalues = arr.values().len();
+ if let Some(state) = &self.state {
+ if state.len() != nvalues {
+ return Err(DataFusionError::Internal(
+ "Accumulator update_batch got invalid value".to_string(),
+ ));
+ }
+ } else {
+ // init state
+ self.state = Some((0..nvalues).map(|_| false).collect());
+ }
+ for idx in arr.keys_iter().flatten() {
+ self.state.as_mut().unwrap()[idx] = true;
+ }
Review Comment:
Are we guaranteed that the value set is consistent among every dictionary
array? If not, I'm afraid we still need some hash in this accumulator.
##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -577,4 +746,106 @@ mod tests {
assert_eq!(result, ScalarValue::Int64(Some(2)));
Ok(())
}
+
+ #[test]
+ fn count_distinct_dict_update() -> Result<()> {
+ let values = StringArray::from_iter_values(["a", "b", "c"]);
+ // value "b" is never used
+ let keys =
+ Int8Array::from_iter(vec![Some(0), Some(0), Some(0), Some(0),
None, Some(2)]);
+ let arrays =
+ vec![
+ Arc::new(DictionaryArray::<Int8Type>::try_new(&keys,
&values).unwrap())
+ as ArrayRef,
+ ];
+ let agg = DistinctCount::new(
+ arrays[0].data_type().clone(),
+ Arc::new(NoOp::new()),
+ String::from("__col_name__"),
+ );
+ let mut accum = agg.create_accumulator()?;
+ accum.update_batch(&arrays)?;
+ // should evaluate to 2 since "b" never seen
+ assert_eq!(accum.evaluate()?, ScalarValue::Int64(Some(2)));
+ // now update with a new batch that does use "b"
+ let values = StringArray::from_iter_values(["a", "b", "c"]);
+ let keys = Int8Array::from_iter(vec![Some(1), Some(1), None]);
+ let arrays =
+ vec![
+ Arc::new(DictionaryArray::<Int8Type>::try_new(&keys,
&values).unwrap())
+ as ArrayRef,
+ ];
+ accum.update_batch(&arrays)?;
+ assert_eq!(accum.evaluate()?, ScalarValue::Int64(Some(3)));
+ Ok(())
+ }
+
+ #[test]
+ fn count_distinct_dict_merge() -> Result<()> {
+ let values = StringArray::from_iter_values(["a", "b", "c"]);
+ let keys = Int8Array::from_iter(vec![Some(0), Some(0), None]);
+ let arrays =
+ vec![
+ Arc::new(DictionaryArray::<Int8Type>::try_new(&keys,
&values).unwrap())
+ as ArrayRef,
+ ];
+ let agg = DistinctCount::new(
+ arrays[0].data_type().clone(),
+ Arc::new(NoOp::new()),
+ String::from("__col_name__"),
+ );
+ // create accum with 1 value seen
+ let mut accum = agg.create_accumulator()?;
+ accum.update_batch(&arrays)?;
+ assert_eq!(accum.evaluate()?, ScalarValue::Int64(Some(1)));
+ // create accum with state that has seen "a" and "b" but not "c"
+ let values = StringArray::from_iter_values(["a", "b", "c"]);
+ let keys = Int8Array::from_iter(vec![Some(0), Some(1), None]);
Review Comment:
What about changing the second dictionary to the following (reordering the value set)?
- value: ["c", "b", "a"]
- keys: [2, 1, None]
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]