rluvaton commented on code in PR #19520:
URL: https://github.com/apache/datafusion/pull/19520#discussion_r2649657688
##########
datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs:
##########
@@ -491,74 +528,168 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING>
{
batch_hashes: &[u64],
groups: &mut [usize],
) {
+ // ➜ ~ sysctl hw.l1dcachesize
+ // L1 cache size: 65536
+ // L2 cache size: 4194304
+
+ let should_prefetch = if self.agg_prefetch_locality == 1 {
+ self.agg_prefetch_elements > 0 && self.map_size >= l1_cache_size()
+ } else if self.agg_prefetch_locality == 2 {
+ self.agg_prefetch_elements > 0 && self.map_size >= l2_cache_size()
+ } else if self.agg_prefetch_locality == 3 {
+ self.agg_prefetch_elements > 0 && self.map_size >= l3_cache_size()
+ } else {
+ self.agg_prefetch_elements > 0
+ };
+
+ if !should_prefetch {
+
self.collect_vectorized_process_context_with_prefetch::<0>(batch_hashes,
groups);
+ return;
+ }
+
+ match self.agg_prefetch_elements {
+ 0 =>
self.collect_vectorized_process_context_with_prefetch::<0>(batch_hashes,
groups),
+ 1 =>
self.collect_vectorized_process_context_with_prefetch::<1>(batch_hashes,
groups),
+ 2 =>
self.collect_vectorized_process_context_with_prefetch::<2>(batch_hashes,
groups),
+ 3 =>
self.collect_vectorized_process_context_with_prefetch::<3>(batch_hashes,
groups),
+ 4 =>
self.collect_vectorized_process_context_with_prefetch::<4>(batch_hashes,
groups),
+ 5 =>
self.collect_vectorized_process_context_with_prefetch::<5>(batch_hashes,
groups),
+ 6 =>
self.collect_vectorized_process_context_with_prefetch::<6>(batch_hashes,
groups),
+ 7 =>
self.collect_vectorized_process_context_with_prefetch::<7>(batch_hashes,
groups),
+ 8 =>
self.collect_vectorized_process_context_with_prefetch::<8>(batch_hashes,
groups),
+ _ =>
self.collect_vectorized_process_context_with_prefetch::<8>(batch_hashes,
groups),
+ }
+ }
+
+ fn collect_vectorized_process_context_with_prefetch<const PREFETCH: usize>(
+ &mut self,
+ batch_hashes: &[u64],
+ groups: &mut [usize],
+ ) {
+ if self.agg_prefetch_read {
+
self.collect_vectorized_process_context_with_prefetch_and_read::<true,
PREFETCH>(batch_hashes, groups)
+ } else {
+
self.collect_vectorized_process_context_with_prefetch_and_read::<false,
PREFETCH>(batch_hashes, groups)
+ }
+ }
+
+ fn collect_vectorized_process_context_with_prefetch_and_read<const READ:
bool, const PREFETCH: usize>(
+ &mut self,
+ batch_hashes: &[u64],
+ groups: &mut [usize],
+ ) {
+ match self.agg_prefetch_locality {
+ 0 =>
self.collect_vectorized_process_context_with_prefetch_and_read_and_locality::<READ,
PREFETCH, 0>(batch_hashes, groups),
+ 1 =>
self.collect_vectorized_process_context_with_prefetch_and_read_and_locality::<READ,
PREFETCH, 1>(batch_hashes, groups),
+ 2 =>
self.collect_vectorized_process_context_with_prefetch_and_read_and_locality::<READ,
PREFETCH, 2>(batch_hashes, groups),
+ 3 =>
self.collect_vectorized_process_context_with_prefetch_and_read_and_locality::<READ,
PREFETCH, 3>(batch_hashes, groups),
+ _ =>
self.collect_vectorized_process_context_with_prefetch_and_read_and_locality::<READ,
PREFETCH, 3>(batch_hashes, groups),
+ }
+ }
+ fn
collect_vectorized_process_context_with_prefetch_and_read_and_locality<const
READ: bool, const PREFETCH: usize, const LOCALITY: i32>(
+ &mut self,
+ batch_hashes: &[u64],
+ groups: &mut [usize],
+ ) {
self.vectorized_operation_buffers.append_row_indices.clear();
self.vectorized_operation_buffers
- .equal_to_row_indices
- .clear();
+ .equal_to_row_indices
+ .clear();
self.vectorized_operation_buffers
- .equal_to_group_indices
- .clear();
+ .equal_to_group_indices
+ .clear();
+
+ if PREFETCH > 0 {
+ self.map.reserve(batch_hashes.len(), |(hash, _)| *hash);
+ let mut group_values_len = self.group_values[0].len();
+ if batch_hashes.is_empty() {
+ return;
+ }
- let mut group_values_len = self.group_values[0].len();
- for (row, &target_hash) in batch_hashes.iter().enumerate() {
- let entry = self
- .map
- .find(target_hash, |(exist_hash, _)| target_hash ==
*exist_hash);
+ let mut batch_hashes_iter =
batch_hashes[0..batch_hashes.len().saturating_sub(PREFETCH)].iter().enumerate();
- let Some((_, group_index_view)) = entry else {
- // 1. Bucket not found case
- // Build `new inlined group index view`
- let current_group_idx = group_values_len;
- let group_index_view =
- GroupIndexView::new_inlined(current_group_idx as u64);
-
- // Insert the `group index view` and its hash into `map`
- // for hasher function, use precomputed hash value
- self.map.insert_accounted(
- (target_hash, group_index_view),
- |(hash, _)| *hash,
- &mut self.map_size,
- );
-
- // Add row index to `vectorized_append_row_indices`
- self.vectorized_operation_buffers
- .append_row_indices
- .push(row);
-
- // Set group index to row in `groups`
- groups[row] = current_group_idx;
-
- group_values_len += 1;
- continue;
- };
+ for (row, &target_hash) in batch_hashes_iter {
+ // prefetch next item
+ for i in 1..=PREFETCH {
+ if READ {
+ self.map.prefetch_read::<LOCALITY>(batch_hashes[row +
i]);
+ } else {
+ self.map.prefetch_write::<LOCALITY>(batch_hashes[row +
i]);
+ }
+ }
+ self.insert_entry(groups, &mut group_values_len, row,
target_hash);
+ }
+ for index in
batch_hashes.len().saturating_sub(PREFETCH)..batch_hashes.len() {
+ self.insert_entry(groups, &mut group_values_len, index,
batch_hashes[index]);
+ }
+ } else {
+ let mut group_values_len = self.group_values[0].len();
- // 2. bucket found
- // Check if the `group index view` is `inlined` or `non_inlined`
- if group_index_view.is_non_inlined() {
- // Non-inlined case, the value of view is offset in
`group_index_lists`.
- // We use it to get `group_index_list`, and add related `rows`
and `group_indices`
- // into `vectorized_equal_to_row_indices` and
`vectorized_equal_to_group_indices`.
- let list_offset = group_index_view.value() as usize;
- let group_index_list = &self.group_index_lists[list_offset];
-
- self.vectorized_operation_buffers
- .equal_to_group_indices
- .extend_from_slice(group_index_list);
- self.vectorized_operation_buffers
- .equal_to_row_indices
- .extend(std::iter::repeat_n(row, group_index_list.len()));
- } else {
- let group_index = group_index_view.value() as usize;
- self.vectorized_operation_buffers
- .equal_to_row_indices
- .push(row);
- self.vectorized_operation_buffers
- .equal_to_group_indices
- .push(group_index);
+ for (row, &target_hash) in batch_hashes.iter().enumerate() {
+ self.insert_entry(groups, &mut group_values_len, row,
target_hash);
}
}
}
+ #[inline(always)]
Review Comment:
Inline always, to make sure that extracting this code into a function does
not affect the previous case
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]