Dandandan commented on a change in pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#discussion_r683978762



##########
File path: datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -363,55 +348,74 @@ fn group_aggregate_batch(
 
     let mut group_by_values = group_by_values.into_boxed_slice();
 
-    let mut key = Vec::with_capacity(group_values.len());
-
     // 1.1 construct the key from the group values
     // 1.2 construct the mapping key if it does not exist
     // 1.3 add the row's index to `indices`
 
     // Make sure we can create the accumulators or otherwise return an error
     create_accumulators(aggr_expr).map_err(DataFusionError::into_arrow_external_error)?;
 
-    // Keys received in this batch
-    let mut batch_keys = vec![];
+    // track which entries in `accumulators` have rows in this batch to aggregate
+    let mut groups_with_rows = vec![];
 
-    for row in 0..batch.num_rows() {
-        // 1.1
-        create_key(&group_values, row, &mut key)
-            .map_err(DataFusionError::into_arrow_external_error)?;
+    // 1.1 Calculate the group keys for the group values
+    let mut batch_hashes = vec![0; batch.num_rows()];
+    create_hashes(&group_values, random_state, &mut batch_hashes)?;
 
-        accumulators
-            .raw_entry_mut()
-            .from_key(&key)
-            // 1.3
-            .and_modify(|_, (_, _, v)| {
-                if v.is_empty() {
-                    batch_keys.push(key.clone())
+    for (row, hash) in batch_hashes.into_iter().enumerate() {
+        let Accumulators { map, group_states } = &mut accumulators;
+
+        let entry = map.get_mut(hash, |(_hash, group_idx)| {
+            // verify that a group that we are inserting with hash is
+            // actually the same key value as the group in
+            // existing_idx  (aka group_values @ row)
+            let group_state = &group_states[*group_idx];
+            group_values
+                .iter()
+                .zip(group_state.group_by_values.iter())
+                .all(|(array, scalar)| scalar.eq_array(array, row))
+        });
+
+        match entry {
+            // Existing entry for this group value
+            Some((_hash, group_idx)) => {
+                let group_state = &mut group_states[*group_idx];
+                // 1.3
+                if group_state.indices.is_empty() {
+                    groups_with_rows.push(*group_idx);
                 };
-                v.push(row as u32)
-            })
-            // 1.2
-            .or_insert_with(|| {
+                group_state.indices.push(row as u32); // remember this row
+            }
+            //  1.2 Need to create new entry
+            None => {
                // We can safely unwrap here as we checked we can create an accumulator before
                let accumulator_set = create_accumulators(aggr_expr).unwrap();
-                batch_keys.push(key.clone());
-                // Note it would be nice to make this a real error (rather than panic)
-                // but it is better than silently ignoring the issue and getting wrong results
-                create_group_by_values(&group_values, row, &mut group_by_values)
-                    .expect("can not create group by value");
-                (
-                    key.clone(),
-                    (group_by_values.clone(), accumulator_set, vec![row as u32]),
-                )
-            });
+
+                // Copy group values from arrays into ScalarValues
+                create_group_by_values(&group_values, row, &mut group_by_values)?;

Review comment:
       I think this made sense for the previous implementation with the `key`, as the key only had to be cloned when it actually had to be inserted. But here we are doing the `clone` afterwards each time anyway.
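
       To make the trade-off concrete, here is a minimal, self-contained sketch (not the DataFusion code itself: `GroupState`, `group_rows`, and the simplified `i64` keys are made up for illustration, and `std::collections::HashMap` stands in for the raw hash table) of the hash-then-verify grouping pattern, where the key is only cloned when a genuinely new group is inserted:

       ```rust
       use std::collections::HashMap;

       /// Per-group state, loosely modelled on the PR (names and types are illustrative).
       struct GroupState {
           key: Vec<i64>,     // the group-by values for this group (simplified to i64)
           indices: Vec<u32>, // row indices of the current batch that belong to this group
       }

       /// Group `keys` (one key per row) using precomputed `hashes`, verifying key
       /// equality on hash collisions before reusing an existing group.
       fn group_rows(keys: &[Vec<i64>], hashes: &[u64]) -> Vec<GroupState> {
           let mut groups: Vec<GroupState> = Vec::new();
           // hash -> indices into `groups` of all groups that share this hash
           let mut map: HashMap<u64, Vec<usize>> = HashMap::new();

           for (row, (key, &hash)) in keys.iter().zip(hashes).enumerate() {
               let candidates = map.entry(hash).or_default();
               // Equal hashes are not enough: compare the actual key values too.
               let existing = candidates.iter().copied().find(|&idx| groups[idx].key == *key);
               match existing {
                   // Existing group: just remember this row's index.
                   Some(idx) => groups[idx].indices.push(row as u32),
                   // New group: only here do we clone/materialize the key.
                   None => {
                       candidates.push(groups.len());
                       groups.push(GroupState {
                           key: key.clone(),
                           indices: vec![row as u32],
                       });
                   }
               }
           }
           groups
       }

       fn main() {
           let keys = vec![vec![1], vec![2], vec![1], vec![2], vec![3]];
           // A real implementation would hash the key columns (cf. `create_hashes`);
           // here we fake it with the first value.
           let hashes: Vec<u64> = keys.iter().map(|k| k[0] as u64).collect();
           for g in group_rows(&keys, &hashes) {
               println!("key={:?} rows={:?}", g.key, g.indices);
           }
       }
       ```

       In the PR itself the table stores `(hash, group_idx)` pairs and key equality is checked against the arrays with `eq_array`; the sketch only illustrates the clone-on-new-group structure that the old `key`-based raw-entry code had.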



