alamb commented on code in PR #6657:
URL: https://github.com/apache/arrow-datafusion/pull/6657#discussion_r1241126875
##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -761,4 +785,67 @@ impl GroupedHashAggregateStream {
}
Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?))
}
+
+ fn update_one_accumulator_with_native_value<T1>(
Review Comment:
This code is basically a copy of what is in
`datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs`,
right?
##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -496,37 +497,57 @@ impl GroupedHashAggregateStream {
})
.collect::<Result<Vec<_>>>()?;
- for group_idx in groups_with_rows {
- let group_state = &mut self.aggr_state.group_states[*group_idx];
- let mut state_accessor =
- RowAccessor::new_from_layout(self.row_aggr_layout.clone());
- state_accessor.point_to(0,
group_state.aggregation_buffer.as_mut_slice());
- for idx in &group_state.indices {
+ let mut single_value_acc_idx = vec![];
Review Comment:
FYI @ozankabak and @mustafasrepo -- here is an example where having two
grouping operators (hash and bounded) requires twice the code. I'm not sure
whether we can find a way to reduce the duplication.
##########
datafusion/physical-expr/src/aggregate/row_agg_macros.rs:
##########
@@ -0,0 +1,525 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[macro_export]
+macro_rules! matches_all_supported_data_types {
+ ($expression:expr) => {
+ matches!(
+ $expression,
+ DataType::Boolean
+ | DataType::UInt8
+ | DataType::UInt16
+ | DataType::UInt32
+ | DataType::UInt64
+ | DataType::Int8
+ | DataType::Int16
+ | DataType::Int32
+ | DataType::Int64
+ | DataType::Float32
+ | DataType::Float64
+ | DataType::Decimal128(_, _)
+ )
+ };
+}
+pub use matches_all_supported_data_types;
+
+#[macro_export]
+macro_rules! dispatch_all_supported_data_types {
+ ($macro:ident $(, $x:ident)*) => {
+ $macro! {
+ [$($x),*],
+ { Boolean, BooleanArray },
+ { Int8, Int8Array },
+ { Int16, Int16Array },
+ { Int32, Int32Array },
+ { Int64, Int64Array },
+ { UInt8, UInt8Array },
+ { UInt16, UInt16Array },
+ { UInt32, UInt32Array },
+ { UInt64, UInt64Array },
+ { Float32, Float32Array },
+ { Float64, Float64Array },
+ { Decimal128, Decimal128Array }
+ }
+ };
+}
+
+pub use dispatch_all_supported_data_types;
+
+// TODO generate the matching type pairs
+#[macro_export]
Review Comment:
I really worry about this table -- it seems like it will be very hard to
maintain (e.g. adding a new data type or a new accumulator type will be very hard).
##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -761,4 +784,67 @@ impl GroupedHashAggregateStream {
}
Ok(Some(RecordBatch::try_new(self.schema.clone(), output)?))
}
+
+ fn update_one_accumulator_with_native_value<T1>(
+ &mut self,
+ groups_with_rows: &[usize],
+ agg_input_array1: &T1,
+ acc_idx1: usize,
+ filter_bool_array: &[Option<&BooleanArray>],
+ ) -> Result<()>
+ where
+ T1: ArrowArrayReader,
+ {
+ let accumulator1 = &self.row_accumulators[acc_idx1];
+ let filter_array1 = &filter_bool_array[acc_idx1];
+ for group_idx in groups_with_rows {
+ let group_state = &mut self.aggr_state.group_states[*group_idx];
+ let mut state_accessor =
+ RowAccessor::new_from_layout(self.row_aggr_layout.clone());
+ state_accessor.point_to(0,
group_state.aggregation_buffer.as_mut_slice());
+ for idx in &group_state.indices {
+ let value = col_to_value(agg_input_array1, filter_array1, *idx
as usize);
+ accumulator1.update_value::<T1::Item>(value, &mut
state_accessor);
+ }
+ // clear the group indices in this group
+ group_state.indices.clear();
+ }
+
+ Ok(())
+ }
+
+ fn update_two_accumulator2_with_native_value<T1, T2>(
Review Comment:
After thinking about this a bit, I wonder if we can somehow avoid the need
to typecast during aggregation at all. It seems to me the code would be much
simpler if the aggregator didn't have to cast its input, and instead the
inputs were cast as needed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]