Rachelint commented on code in PR #12996: URL: https://github.com/apache/datafusion/pull/12996#discussion_r1823813010
########## datafusion/physical-plan/src/aggregates/group_values/column.rs: ##########

```diff
@@ -196,6 +570,324 @@ impl GroupValues for GroupValuesColumn {
                     let b = ByteViewGroupValueBuilder::<BinaryViewType>::new();
                     v.push(Box::new(b) as _)
                 }
+                dt => {
+                    return not_impl_err!(
+                        "{dt} not supported in VectorizedGroupValuesColumn"
+                    )
+                }
+            }
+        }
+        self.group_values = v;
+    }
+
+    // tracks to which group each of the input rows belongs
+    groups.clear();
+    groups.resize(n_rows, usize::MAX);
+
+    let mut batch_hashes = mem::take(&mut self.hashes_buffer);
+    batch_hashes.clear();
+    batch_hashes.resize(n_rows, 0);
+    create_hashes(cols, &self.random_state, &mut batch_hashes)?;
+
+    // General steps for one round of `vectorized equal_to & append`:
+    // 1. Collect the vectorized context by checking hash values of `cols` in `map`,
+    //    mainly filling `vectorized_append_row_indices`, `vectorized_equal_to_row_indices`
+    //    and `vectorized_equal_to_group_indices`.
+    //
+    // 2. Perform `vectorized_append` for `vectorized_append_row_indices`.
+    //    `vectorized_append` must be performed before `vectorized_equal_to`,
+    //    because some `group indices` in `vectorized_equal_to_group_indices`
+    //    may actually be placeholders that point to no actual values in
+    //    `group_values` until the append is performed.
+    //
+    // 3. Perform `vectorized_equal_to` for `vectorized_equal_to_row_indices`
+    //    and `vectorized_equal_to_group_indices`. If some rows in the input `cols`
+    //    are found not equal to the existing rows in `group_values`, place them in
+    //    `scalarized_indices` and afterwards perform `scalarized_intern` for them,
+    //    similar to what is done in [`GroupValuesColumn`].
+    //
+    // 4. Perform `scalarized_intern` for the rows mentioned above; for how they
+    //    are processed, see the comments of `scalarized_intern`.
+
+    // 1. Collect the vectorized context by checking hash values of `cols` in `map`
+    self.collect_vectorized_process_context(&batch_hashes, groups);
+
+    // 2. Perform `vectorized_append`
+    self.vectorized_append(cols);
+
+    // 3. Perform `vectorized_equal_to`
+    self.vectorized_equal_to(cols, groups);
+
+    // 4. Perform `scalarized_intern`
+    self.scalarized_intern(cols, &batch_hashes, groups);
+
+    self.hashes_buffer = batch_hashes;
+
+    Ok(())
+}
+
+fn size(&self) -> usize {
+    let group_values_size: usize = self.group_values.iter().map(|v| v.size()).sum();
+    group_values_size + self.map_size + self.hashes_buffer.allocated_size()
+}
+
+fn is_empty(&self) -> bool {
+    self.len() == 0
+}
+
+fn len(&self) -> usize {
+    if self.group_values.is_empty() {
+        return 0;
+    }
+
+    self.group_values[0].len()
+}
+
+fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
+    let mut output = match emit_to {
+        EmitTo::All => {
+            let group_values = mem::take(&mut self.group_values);
+            debug_assert!(self.group_values.is_empty());
+
+            group_values
+                .into_iter()
+                .map(|v| v.build())
+                .collect::<Vec<_>>()
+        }
+        EmitTo::First(n) => {
+            let output = self
+                .group_values
+                .iter_mut()
+                .map(|v| v.take_n(n))
+                .collect::<Vec<_>>();
+            let new_group_index_lists =
+                Vec::with_capacity(self.group_index_lists.len());
+            let old_group_index_lists =
+                mem::replace(&mut self.group_index_lists, new_group_index_lists);
+
+            // SAFETY: self.map outlives the iterator and is not modified concurrently
+            unsafe {
+                for bucket in self.map.iter() {
+                    // Check whether the view is `inlined` or `non-inlined`
+                    if bucket.as_ref().1.is_non_inlined() {
+                        // Non-inlined case: take the `group_index_list`
+                        // from `old_group_index_lists`
+                        let list_offset = bucket.as_ref().1.value() as usize;
+                        let old_group_index_list =
+                            &old_group_index_lists[list_offset];
+
+                        let mut new_group_index_list = Vec::new();
+                        for &group_index in old_group_index_list {
+                            if let Some(remaining) = group_index.checked_sub(n) {
+                                new_group_index_list.push(remaining);
+                            }
+                        }
+
+                        // The possible results:
+                        // - `new_group_index_list` is empty: erase this bucket
+                        // - only one value in `new_group_index_list`: switch the `view` to `inlined`
+                        // - still multiple values in `new_group_index_list`: build and set the new `non-inlined view`
+                        if new_group_index_list.is_empty() {
+                            self.map.erase(bucket);
+                        } else if new_group_index_list.len() == 1 {
+                            let group_index = new_group_index_list.first().unwrap();
+                            bucket.as_mut().1 =
+                                GroupIndexView::new_inlined(*group_index as u64);
+                        } else {
+                            let new_list_offset = self.group_index_lists.len();
+                            self.group_index_lists.push(new_group_index_list);
```
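The `EmitTo::First(n)` branch above relies on a `GroupIndexView` that is either `inlined` (the bucket maps to exactly one group index) or `non-inlined` (the bucket stores an offset into `group_index_lists`, used when several groups sit behind one bucket). Below is a minimal sketch of how such a view could pack both cases into a `u64`, assuming the top bit serves as the non-inlined flag; only `new_inlined`, `is_non_inlined` and `value` appear in the hunk, while `new_non_inlined` and the exact bit layout are illustrative and may differ from the PR's real `GroupIndexView`:

```rust
/// Illustrative sketch only: packs either a single group index ("inlined")
/// or an offset into `group_index_lists` ("non-inlined") into one u64.
/// The top bit is assumed to be the non-inlined flag; the PR's real
/// `GroupIndexView` may use a different layout.
#[derive(Clone, Copy)]
struct GroupIndexViewSketch(u64);

const NON_INLINED_FLAG: u64 = 1 << 63;

impl GroupIndexViewSketch {
    /// Bucket maps to exactly one group index.
    fn new_inlined(group_index: u64) -> Self {
        debug_assert!((group_index & NON_INLINED_FLAG) == 0);
        Self(group_index)
    }

    /// Bucket maps to several group indices stored in `group_index_lists`
    /// at `list_offset` (hypothetical constructor, not shown in the hunk).
    fn new_non_inlined(list_offset: u64) -> Self {
        Self(list_offset | NON_INLINED_FLAG)
    }

    fn is_non_inlined(&self) -> bool {
        (self.0 & NON_INLINED_FLAG) != 0
    }

    /// The group index (inlined) or the list offset (non-inlined).
    fn value(&self) -> u64 {
        self.0 & !NON_INLINED_FLAG
    }
}

fn main() {
    let inlined = GroupIndexViewSketch::new_inlined(7);
    assert!(!inlined.is_non_inlined());
    assert_eq!(inlined.value(), 7);

    let non_inlined = GroupIndexViewSketch::new_non_inlined(3);
    assert!(non_inlined.is_non_inlined());
    assert_eq!(non_inlined.value(), 3);
}
```

With an encoding along these lines, the `EmitTo::First(n)` branch can erase a bucket, downgrade it to an inlined view, or re-point it at a rebuilt list, which is what the hunk does.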
Review Comment:
🤔 Currently, when `take_n` happens, we create a new `group_index_lists` and push every `group_index_list` from the old `group_index_lists` that is still non-empty into it.

The alternative is to not create a new `group_index_lists` and just reuse the old `group_index_lists`. But I am afraid it would eventually end up looking something like this:

```text
[],
[1, 2],
[],
...
[],
[],
[3, 4],
```

I think we can actually reuse the `group_index_list` taken from the old `group_index_lists`, rather than dropping it (a sketch of this idea follows below):
- We can define something like a `taken_group_index_list_buffer`
- We place the `kept group indices` into it
- Finally we copy the `kept group indices` from `taken_group_index_list_buffer` back into the reused `group_index_list`, and push it to the current `group_index_lists`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org