mingmwang commented on code in PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#discussion_r1241222042
##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -761,4 +784,67 @@ impl GroupedHashAggregateStream {
}
Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?))
}
+
+ fn update_one_accumulator_with_native_value<T1>(
+ &mut self,
+ groups_with_rows: &[usize],
+ agg_input_array1: &T1,
+ acc_idx1: usize,
+ filter_bool_array: &[Option<&BooleanArray>],
+ ) -> Result<()>
+ where
+ T1: ArrowArrayReader,
+ {
+ let accumulator1 = &self.row_accumulators[acc_idx1];
+ let filter_array1 = &filter_bool_array[acc_idx1];
+ for group_idx in groups_with_rows {
+ let group_state = &mut self.aggr_state.group_states[*group_idx];
+ let mut state_accessor =
+ RowAccessor::new_from_layout(self.row_aggr_layout.clone());
+ state_accessor.point_to(0, group_state.aggregation_buffer.as_mut_slice());
+ for idx in &group_state.indices {
+ let value = col_to_value(agg_input_array1, filter_array1, *idx as usize);
+ accumulator1.update_value::<T1::Item>(value, &mut state_accessor);
+ }
+ // clear the group indices in this group
+ group_state.indices.clear();
+ }
+
+ Ok(())
+ }
+
+ fn update_two_accumulator2_with_native_value<T1, T2>(
Review Comment:
> After thinking about this a bit -- I wonder if we can somehow get rid of the need to typecast during the aggregate at all -- it seems to me the code would be much simpler if the aggregator didn't have to cast its input, but instead the inputs were cast as needed.

I'm afraid the typecast cannot be avoided. The Arrow `Array` trait itself does not provide any method to read a typed value at a given index, so an `&dyn Array` must first be downcast to a concrete struct (`PrimitiveArray<T: ArrowPrimitiveType>` or `BooleanArray`).
I think this is due to the restrictions on trait objects (object safety): if we add generic methods (such as one that reads the value at a given index and returns a type-dependent value) or a GAT to a trait, that trait can no longer be used as a trait object.
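To make the object-safety point concrete, here is a minimal std-only sketch. `DynArray`, `Int64Values`, `BoolValues`, `ArrayRef` and `read_i64` are hypothetical stand-ins for Arrow's `Array`, `PrimitiveArray<Int64Type>`, `BooleanArray` and `ArrayRef`; only the `as_any()`-then-downcast pattern is meant to mirror what Arrow does.

```rust
use std::any::Any;
use std::sync::Arc;

/// Hypothetical stand-in for Arrow's object-safe `Array` trait.
/// It only exposes type-erased operations, so `dyn DynArray` is allowed.
trait DynArray {
    fn len(&self) -> usize;
    /// Escape hatch used to downcast to a concrete array type.
    fn as_any(&self) -> &dyn Any;
    // A generic method such as `fn value<T>(&self, i: usize) -> T` would make
    // the trait unusable as `dyn DynArray`, and an associated `type Item` would
    // force every `dyn DynArray` to name its `Item`, so arrays of different
    // types could no longer share one `ArrayRef` type.
}

/// Hypothetical concrete arrays, analogous to `PrimitiveArray<Int64Type>` and `BooleanArray`.
struct Int64Values(Vec<i64>);
struct BoolValues(Vec<bool>);

impl DynArray for Int64Values {
    fn len(&self) -> usize {
        self.0.len()
    }
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl DynArray for BoolValues {
    fn len(&self) -> usize {
        self.0.len()
    }
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Hypothetical alias mirroring Arrow's `Arc<dyn Array>`.
type ArrayRef = Arc<dyn DynArray>;

/// Reading a typed value requires a downcast first; it yields `None` for the wrong type.
fn read_i64(array: &ArrayRef, index: usize) -> Option<i64> {
    array
        .as_any()
        .downcast_ref::<Int64Values>()
        .map(|a| a.0[index])
}
```

Usage: `read_i64` on an `Int64Values` array returns `Some(value)`, while the same call on a `BoolValues` array returns `None` because the downcast fails.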
This is also why I added a new trait in this PR. The trait and its implementors bridge the type system between Arrow's `ArrowPrimitiveType` and the row accumulator's internal state types.
```rust
use arrow::array::{Array, BooleanArray, PrimitiveArray};
use arrow::datatypes::ArrowPrimitiveType;
// `RowAccumulatorNativeType` comes from the row accumulator code (import path not shown here).

pub trait ArrowArrayReader: Array {
    type Item: RowAccumulatorNativeType;
    /// Returns the element at `index`
    /// # Panics
    /// Panics if `index` is outside the bounds of the array
    fn value_at(&self, index: usize) -> Self::Item;
    /// Returns the element at `index` without bounds checking
    /// # Safety
    /// Caller is responsible for ensuring that `index` is within the bounds of the array
    unsafe fn value_at_unchecked(&self, index: usize) -> Self::Item;
}

impl<'a> ArrowArrayReader for &'a BooleanArray {
    type Item = bool;
    #[inline]
    fn value_at(&self, index: usize) -> Self::Item {
        BooleanArray::value(self, index)
    }
    #[inline]
    unsafe fn value_at_unchecked(&self, index: usize) -> Self::Item {
        BooleanArray::value_unchecked(self, index)
    }
}

impl<'a, T: ArrowPrimitiveType> ArrowArrayReader for &'a PrimitiveArray<T>
where
    <T as ArrowPrimitiveType>::Native: RowAccumulatorNativeType,
{
    type Item = T::Native;
    #[inline]
    fn value_at(&self, index: usize) -> Self::Item {
        PrimitiveArray::value(self, index)
    }
    #[inline]
    unsafe fn value_at_unchecked(&self, index: usize) -> Self::Item {
        PrimitiveArray::value_unchecked(self, index)
    }
}
```
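A std-only sketch of how a reader trait like this can be consumed, under hypothetical names: `NativeValue` stands in for `RowAccumulatorNativeType`, `ValueReader` for `ArrowArrayReader`, and a toy sum replaces a real row accumulator. The erased input is downcast once, then a generic loop (similar in shape to `update_one_accumulator_with_native_value`) is monomorphized per native type and reads values with no further casts.

```rust
use std::any::Any;

/// Hypothetical stand-in for `RowAccumulatorNativeType`.
trait NativeValue: Copy {
    fn to_f64(self) -> f64;
}

impl NativeValue for i64 {
    fn to_f64(self) -> f64 {
        self as f64
    }
}

impl NativeValue for bool {
    fn to_f64(self) -> f64 {
        if self { 1.0 } else { 0.0 }
    }
}

/// Hypothetical analogue of `ArrowArrayReader`: because of the associated
/// `Item` type it is used through generics, never as a trait object.
trait ValueReader {
    type Item: NativeValue;
    fn value_at(&self, index: usize) -> Self::Item;
}

// Implemented for references, like the impls for `&PrimitiveArray<T>` and `&BooleanArray`.
impl<'a> ValueReader for &'a [i64] {
    type Item = i64;
    fn value_at(&self, index: usize) -> i64 {
        self[index]
    }
}

impl<'a> ValueReader for &'a [bool] {
    type Item = bool;
    fn value_at(&self, index: usize) -> bool {
        self[index]
    }
}

/// Generic per-row loop: monomorphized for each reader type, so no casts inside.
fn sum_rows<R: ValueReader>(reader: &R, rows: &[usize]) -> f64 {
    rows.iter().map(|&i| reader.value_at(i).to_f64()).sum()
}

/// Single type-dispatch point: downcast the erased input once, then call the generic loop.
fn sum_dyn(input: &dyn Any, rows: &[usize]) -> Option<f64> {
    if let Some(v) = input.downcast_ref::<Vec<i64>>() {
        Some(sum_rows(&v.as_slice(), rows))
    } else if let Some(v) = input.downcast_ref::<Vec<bool>>() {
        Some(sum_rows(&v.as_slice(), rows))
    } else {
        None
    }
}
```

In this sketch the downcast is still there, but it happens once per input array rather than once per row.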