Weijun-H commented on code in PR #8462:
URL: https://github.com/apache/arrow-datafusion/pull/8462#discussion_r1419354580
##########
datafusion/physical-expr/src/aggregate/approx_distinct.rs:
##########
@@ -304,5 +317,89 @@ where
Ok(())
}
- default_accumulator_impl!();
+ default_hllaccumulator_impl!();
+}
+
+impl Accumulator for BitmaptAccumulator {
+ //state() can be used by physical nodes to aggregate states together and
send them over the network/threads, to combine values.
+ fn state(&self) -> Result<Vec<ScalarValue>> {
+ let mut bytes = vec![];
+ self.bitmap.serialize_into(&mut bytes).unwrap();
+ Ok(vec![ScalarValue::Binary(Some(bytes))])
+ }
+ fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+ let value = &values[0];
+ if value.is_empty() {
+ return Ok(());
+ }
+ match value.data_type() {
+ DataType::Int8 => {
+ let array =
value.as_any().downcast_ref::<Int8Array>().unwrap();
+ for value in array.iter() {
+ match value {
+ Some(v) => self.bitmap.insert(v as u32),
+ None => false,
+ };
+ }
+ }
+ DataType::Int16 => {
+ let array =
value.as_any().downcast_ref::<Int16Array>().unwrap();
+ for value in array.iter() {
+ match value {
+ Some(v) => self.bitmap.insert(v as u32),
+ None => false,
+ };
+ }
+ }
+
+ DataType::UInt8 => {
+ let array =
value.as_any().downcast_ref::<UInt8Array>().unwrap();
+ for value in array.iter() {
+ match value {
+ Some(v) => self.bitmap.insert(v as u32),
+ None => false,
+ };
+ }
+ }
+ DataType::UInt16 => {
+ let array =
value.as_any().downcast_ref::<UInt16Array>().unwrap();
+ for value in array.iter() {
+ match value {
+ Some(v) => self.bitmap.insert(v as u32),
+ None => false,
+ };
+ }
+ }
+ e => {
+ return Err(DataFusionError::Internal(format!(
+ "Unsupported data type {:?} for bitmap distinct count",
+ e
+ )));
+ }
+ }
+ Ok(())
+ }
+
+ fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+ let binary_array =
states[0].as_any().downcast_ref::<BinaryArray>().unwrap();
+
+ for b in binary_array.iter() {
+ let v = b.ok_or_else(|| {
+ DataFusionError::Internal(
+ "Impossibly got empty binary array from states".into(),
+ )
+ })?;
+ let bitmap =
RoaringBitmap::deserialize_from(&v.to_vec()[..]).unwrap();
+ self.bitmap.bitor_assign(bitmap);
+ }
+ Ok(())
+ }
+
+ fn evaluate(&self) -> Result<ScalarValue> {
+ Ok(ScalarValue::from(self.bitmap.len()))
+ }
+
+ fn size(&self) -> usize {
+ self.bitmap.serialized_size()
Review Comment:
Not sure this is a proper way to measure the size of a roaring bitmap.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]