rluvaton commented on code in PR #19520:
URL: https://github.com/apache/datafusion/pull/19520#discussion_r2649658142


##########
datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs:
##########
@@ -491,74 +528,168 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> 
{
         batch_hashes: &[u64],
         groups: &mut [usize],
     ) {
+        // ➜  ~ sysctl hw.l1dcachesize
+        // L1 cache size: 65536
+        // L2 cache size: 4194304
+
+        let should_prefetch = if self.agg_prefetch_locality == 1 {
+            self.agg_prefetch_elements > 0 && self.map_size >= l1_cache_size()
+        } else if self.agg_prefetch_locality == 2 {
+            self.agg_prefetch_elements > 0 && self.map_size >= l2_cache_size()
+        }  else if self.agg_prefetch_locality == 3 {
+            self.agg_prefetch_elements > 0 && self.map_size >= l3_cache_size()
+        } else {
+            self.agg_prefetch_elements > 0
+        };
+
+        if !should_prefetch {
+            
self.collect_vectorized_process_context_with_prefetch::<0>(batch_hashes, 
groups);
+            return;
+        }
+
+        match self.agg_prefetch_elements {
+            0 => 
self.collect_vectorized_process_context_with_prefetch::<0>(batch_hashes, 
groups),
+            1 => 
self.collect_vectorized_process_context_with_prefetch::<1>(batch_hashes, 
groups),
+            2 => 
self.collect_vectorized_process_context_with_prefetch::<2>(batch_hashes, 
groups),
+            3 => 
self.collect_vectorized_process_context_with_prefetch::<3>(batch_hashes, 
groups),
+            4 => 
self.collect_vectorized_process_context_with_prefetch::<4>(batch_hashes, 
groups),
+            5 => 
self.collect_vectorized_process_context_with_prefetch::<5>(batch_hashes, 
groups),
+            6 => 
self.collect_vectorized_process_context_with_prefetch::<6>(batch_hashes, 
groups),
+            7 => 
self.collect_vectorized_process_context_with_prefetch::<7>(batch_hashes, 
groups),
+            8 => 
self.collect_vectorized_process_context_with_prefetch::<8>(batch_hashes, 
groups),
+            _ => 
self.collect_vectorized_process_context_with_prefetch::<8>(batch_hashes, 
groups),
+        }
+    }
+
+    fn collect_vectorized_process_context_with_prefetch<const PREFETCH: usize>(
+        &mut self,
+        batch_hashes: &[u64],
+        groups: &mut [usize],
+    ) {
+        if self.agg_prefetch_read {
+            
self.collect_vectorized_process_context_with_prefetch_and_read::<true, 
PREFETCH>(batch_hashes, groups)
+        } else {
+            
self.collect_vectorized_process_context_with_prefetch_and_read::<false, 
PREFETCH>(batch_hashes, groups)
+        }
+    }
+
+    fn collect_vectorized_process_context_with_prefetch_and_read<const READ: 
bool, const PREFETCH: usize>(
+        &mut self,
+        batch_hashes: &[u64],
+        groups: &mut [usize],
+    ) {
+        match self.agg_prefetch_locality {
+            0 => 
self.collect_vectorized_process_context_with_prefetch_and_read_and_locality::<READ,
 PREFETCH, 0>(batch_hashes, groups),
+            1 => 
self.collect_vectorized_process_context_with_prefetch_and_read_and_locality::<READ,
 PREFETCH, 1>(batch_hashes, groups),
+            2 => 
self.collect_vectorized_process_context_with_prefetch_and_read_and_locality::<READ,
 PREFETCH, 2>(batch_hashes, groups),
+            3 => 
self.collect_vectorized_process_context_with_prefetch_and_read_and_locality::<READ,
 PREFETCH, 3>(batch_hashes, groups),
+            _ => 
self.collect_vectorized_process_context_with_prefetch_and_read_and_locality::<READ,
 PREFETCH, 3>(batch_hashes, groups),
+        }
+    }
+        fn 
collect_vectorized_process_context_with_prefetch_and_read_and_locality<const 
READ: bool, const PREFETCH: usize, const LOCALITY: i32>(
+            &mut self,
+            batch_hashes: &[u64],
+            groups: &mut [usize],
+        ) {
         self.vectorized_operation_buffers.append_row_indices.clear();
         self.vectorized_operation_buffers
-            .equal_to_row_indices
-            .clear();
+          .equal_to_row_indices
+          .clear();
         self.vectorized_operation_buffers
-            .equal_to_group_indices
-            .clear();
+          .equal_to_group_indices
+          .clear();
+
+        if PREFETCH > 0 {
+            self.map.reserve(batch_hashes.len(), |(hash, _)| *hash);
+            let mut group_values_len = self.group_values[0].len();
+            if batch_hashes.is_empty() {
+                return;
+            }
 
-        let mut group_values_len = self.group_values[0].len();
-        for (row, &target_hash) in batch_hashes.iter().enumerate() {
-            let entry = self
-                .map
-                .find(target_hash, |(exist_hash, _)| target_hash == 
*exist_hash);
+            let mut batch_hashes_iter = 
batch_hashes[0..batch_hashes.len().saturating_sub(PREFETCH)].iter().enumerate();
 
-            let Some((_, group_index_view)) = entry else {
-                // 1. Bucket not found case
-                // Build `new inlined group index view`
-                let current_group_idx = group_values_len;
-                let group_index_view =
-                    GroupIndexView::new_inlined(current_group_idx as u64);
-
-                // Insert the `group index view` and its hash into `map`
-                // for hasher function, use precomputed hash value
-                self.map.insert_accounted(
-                    (target_hash, group_index_view),
-                    |(hash, _)| *hash,
-                    &mut self.map_size,
-                );
-
-                // Add row index to `vectorized_append_row_indices`
-                self.vectorized_operation_buffers
-                    .append_row_indices
-                    .push(row);
-
-                // Set group index to row in `groups`
-                groups[row] = current_group_idx;
-
-                group_values_len += 1;
-                continue;
-            };
+            for (row, &target_hash) in batch_hashes_iter {
+                // prefetch next item
+                for i in 1..=PREFETCH {
+                    if READ {
+                        self.map.prefetch_read::<LOCALITY>(batch_hashes[row + 
i]);
+                    } else {
+                        self.map.prefetch_write::<LOCALITY>(batch_hashes[row + 
i]);
+                    }
+                }

Review Comment:
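   This prefetches the same item multiple times: every iteration re-issues
   prefetches for `row + 1..=row + PREFETCH`, most of which were already
   requested by earlier iterations. Instead, I should issue the initial
   prefetches once before the loop, and then prefetch only `row + PREFETCH`
   inside the loop.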



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to