jorgecarleitao commented on a change in pull request #7687:
URL: https://github.com/apache/arrow/pull/7687#discussion_r461825412



##########
File path: rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
##########
@@ -327,120 +278,49 @@ impl RecordBatchReader for GroupedHashAggregateIterator {
                 })
                 .collect::<ArrowResult<Vec<_>>>()?;
 
-            // create vector large enough to hold the grouping key
+            // create vector to hold the grouping key
             let mut key = Vec::with_capacity(group_values.len());
             for _ in 0..group_values.len() {
-                key.push(GroupByScalar::UInt32(0));
+                key.push(KeyScalar::UInt32(0));
             }
 
             // iterate over each row in the batch and create the accumulators 
for each grouping key
-            let mut accumulators: Vec<Rc<AccumulatorSet>> =
-                Vec::with_capacity(batch.num_rows());
-
             for row in 0..batch.num_rows() {
-                // create grouping key for this row
-                create_key(&group_values, row, &mut key)
-                    .map_err(ExecutionError::into_arrow_external_error)?;
-
-                if let Some(accumulator_set) = map.get(&key) {
-                    accumulators.push(accumulator_set.clone());
-                } else {
-                    let accumulator_set: AccumulatorSet = self
-                        .aggr_expr
-                        .iter()
-                        .map(|expr| expr.create_accumulator())
-                        .collect();
-
-                    let accumulator_set = Rc::new(accumulator_set);
-
-                    map.insert(key.clone(), accumulator_set.clone());
-                    accumulators.push(accumulator_set);
+                // create and assign the grouping key of this row
+                for i in 0..group_values.len() {
+                    key[i] = create_key(&group_values[i], row)
+                        .map_err(ExecutionError::into_arrow_external_error)?;
                 }
-            }
 
-            // iterate over each non-grouping column in the batch and update 
the accumulator
-            // for each row
-            for col in 0..aggr_input_values.len() {
-                let array = &aggr_input_values[col];
-
-                match array.data_type() {
-                    DataType::Int8 => update_accumulators!(
-                        array,
-                        Int8Array,
-                        ScalarValue::Int8,
-                        col,
-                        accumulators
-                    ),
-                    DataType::Int16 => update_accumulators!(
-                        array,
-                        Int16Array,
-                        ScalarValue::Int16,
-                        col,
-                        accumulators
-                    ),
-                    DataType::Int32 => update_accumulators!(
-                        array,
-                        Int32Array,
-                        ScalarValue::Int32,
-                        col,
-                        accumulators
-                    ),
-                    DataType::Int64 => update_accumulators!(
-                        array,
-                        Int64Array,
-                        ScalarValue::Int64,
-                        col,
-                        accumulators
-                    ),
-                    DataType::UInt8 => update_accumulators!(
-                        array,
-                        UInt8Array,
-                        ScalarValue::UInt8,
-                        col,
-                        accumulators
-                    ),
-                    DataType::UInt16 => update_accumulators!(
-                        array,
-                        UInt16Array,
-                        ScalarValue::UInt16,
-                        col,
-                        accumulators
-                    ),
-                    DataType::UInt32 => update_accumulators!(
-                        array,
-                        UInt32Array,
-                        ScalarValue::UInt32,
-                        col,
-                        accumulators
-                    ),
-                    DataType::UInt64 => update_accumulators!(
-                        array,
-                        UInt64Array,
-                        ScalarValue::UInt64,
-                        col,
-                        accumulators
-                    ),
-                    DataType::Float32 => update_accumulators!(
-                        array,
-                        Float32Array,
-                        ScalarValue::Float32,
-                        col,
-                        accumulators
-                    ),
-                    DataType::Float64 => update_accumulators!(
-                        array,
-                        Float64Array,
-                        ScalarValue::Float64,
-                        col,
-                        accumulators
-                    ),
-                    other => {
-                        return Err(ExecutionError::ExecutionError(format!(
-                            "Unsupported data type {:?} for result of 
aggregate expression",
-                            other
-                        )).into_arrow_external_error());
+                // for each new key on the map, add an accumulatorSet to the 
map
+                match map.get(&key) {
+                    None => {
+                        let accumulator_set: AccumulatorSet = self
+                            .aggr_expr
+                            .iter()
+                            .map(|expr| expr.create_accumulator())
+                            .collect();
+                        map.insert(key.clone(), Rc::new(accumulator_set));
                     }
+                    _ => (),
                 };
+
+                // iterate over each non-grouping column in the batch and 
update the accumulator
+                // for each row
+                for col in 0..aggr_input_values.len() {
+                    let value = get_scalar_value(&aggr_input_values[col], row)
+                        .map_err(ExecutionError::into_arrow_external_error)?;
+
+                    match map.get(&key) {

Review comment:
       I fixed both issues for now.

##########
File path: rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
##########
@@ -327,120 +278,47 @@ impl RecordBatchReader for GroupedHashAggregateIterator {
                 })
                 .collect::<ArrowResult<Vec<_>>>()?;
 
-            // create vector large enough to hold the grouping key
+            // create vector to hold the grouping key
             let mut key = Vec::with_capacity(group_values.len());
             for _ in 0..group_values.len() {
-                key.push(GroupByScalar::UInt32(0));
+                key.push(KeyScalar::UInt32(0));
             }
 
             // iterate over each row in the batch and create the accumulators 
for each grouping key
-            let mut accumulators: Vec<Rc<AccumulatorSet>> =

Review comment:
       Mind blowing. Still much to learn.
   
   I struggle to run the benchmarks on my computer. Is there any design reason 
not to run the benchmarks as part of the CI pipeline? Are they too unstable 
against changes in hardware?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to