jayzhan211 commented on code in PR #12996: URL: https://github.com/apache/datafusion/pull/12996#discussion_r1824335722
########## datafusion/physical-plan/src/aggregates/group_values/column.rs: ########## @@ -196,6 +570,324 @@ impl GroupValues for GroupValuesColumn { let b = ByteViewGroupValueBuilder::<BinaryViewType>::new(); v.push(Box::new(b) as _) } + dt => { + return not_impl_err!( + "{dt} not supported in VectorizedGroupValuesColumn" + ) + } + } + } + self.group_values = v; + } + + // tracks to which group each of the input rows belongs + groups.clear(); + groups.resize(n_rows, usize::MAX); + + let mut batch_hashes = mem::take(&mut self.hashes_buffer); + batch_hashes.clear(); + batch_hashes.resize(n_rows, 0); + create_hashes(cols, &self.random_state, &mut batch_hashes)?; + + // General steps for one round `vectorized equal_to & append`: + // 1. Collect vectorized context by checking hash values of `cols` in `map`, + // mainly fill `vectorized_append_row_indices`, `vectorized_equal_to_row_indices` + // and `vectorized_equal_to_group_indices` + // + // 2. Perform `vectorized_append` for `vectorized_append_row_indices`. + // `vectorized_append` must be performed before `vectorized_equal_to`, + // because some `group indices` in `vectorized_equal_to_group_indices` + // may be actually placeholders, and still point to no actual values in + // `group_values` before performing append. + // + // 3. Perform `vectorized_equal_to` for `vectorized_equal_to_row_indices` + // and `vectorized_equal_to_group_indices`. If found some rows in input `cols` + // not equal to `exist rows` in `group_values`, place them in `scalarized_indices` + // and perform `scalarized_intern` for them similar as what in [`GroupValuesColumn`] + // after. + // + // 4. Perform `scalarized_intern` for rows mentioned above, when we process like this + // can see the comments of `scalarized_intern`. + // + + // 1. Collect vectorized context by checking hash values of `cols` in `map` + self.collect_vectorized_process_context(&batch_hashes, groups); + + // 2. 
Perform `vectorized_append` + self.vectorized_append(cols); + + // 3. Perform `vectorized_equal_to` + self.vectorized_equal_to(cols, groups); + + // 4. Perform `scalarized_intern` + self.scalarized_intern(cols, &batch_hashes, groups); + + self.hashes_buffer = batch_hashes; + + Ok(()) + } + + fn size(&self) -> usize { + let group_values_size: usize = self.group_values.iter().map(|v| v.size()).sum(); + group_values_size + self.map_size + self.hashes_buffer.allocated_size() + } + + fn is_empty(&self) -> bool { + self.len() == 0 + } + + fn len(&self) -> usize { + if self.group_values.is_empty() { + return 0; + } + + self.group_values[0].len() + } + + fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> { + let mut output = match emit_to { + EmitTo::All => { + let group_values = mem::take(&mut self.group_values); + debug_assert!(self.group_values.is_empty()); + + group_values + .into_iter() + .map(|v| v.build()) + .collect::<Vec<_>>() + } + EmitTo::First(n) => { + let output = self + .group_values + .iter_mut() + .map(|v| v.take_n(n)) + .collect::<Vec<_>>(); + let new_group_index_lists = + Vec::with_capacity(self.group_index_lists.len()); + let old_group_index_lists = + mem::replace(&mut self.group_index_lists, new_group_index_lists); + + // SAFETY: self.map outlives iterator and is not modified concurrently + unsafe { + for bucket in self.map.iter() { + // Check if it is `inlined` or `non-inlined` + if bucket.as_ref().1.is_non_inlined() { + // Non-inlined case + // We take `group_index_list` from `old_group_index_lists` + let list_offset = bucket.as_ref().1.value() as usize; + let old_group_index_list = + &old_group_index_lists[list_offset]; + + let mut new_group_index_list = Vec::new(); + for &group_index in old_group_index_list { + if let Some(remaining) = group_index.checked_sub(n) { + new_group_index_list.push(remaining); + } + } + + // The possible results: + // - `new_group_index_list` is empty, we should erase this bucket + // - only one value in 
`new_group_index_list`, switch the `view` to `inlined` + // - still multiple values in `new_group_index_list`, build and set the new `unlined view` + if new_group_index_list.is_empty() { + self.map.erase(bucket); + } else if new_group_index_list.len() == 1 { + let group_index = new_group_index_list.first().unwrap(); + bucket.as_mut().1 = + GroupIndexView::new_inlined(*group_index as u64); + } else { + let new_list_offset = self.group_index_lists.len(); + self.group_index_lists.push(new_group_index_list); Review Comment: I have another idea, like https://github.com/Rachelint/arrow-datafusion/pull/2/files, but the tests don't seem to cover the code here (I didn't see the list offset printed out), so we need to strengthen the tests to make sure the change makes sense. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org