Rachelint commented on code in PR #12996: URL: https://github.com/apache/datafusion/pull/12996#discussion_r1823772169
########## datafusion/physical-plan/src/aggregates/group_values/column.rs: ########## @@ -75,55 +154,350 @@ pub struct GroupValuesColumn { random_state: RandomState, } -impl GroupValuesColumn { +impl VectorizedGroupValuesColumn { /// Create a new instance of GroupValuesColumn if supported for the specified schema pub fn try_new(schema: SchemaRef) -> Result<Self> { let map = RawTable::with_capacity(0); Ok(Self { schema, map, + group_index_lists: Vec::new(), map_size: 0, group_values: vec![], hashes_buffer: Default::default(), random_state: Default::default(), + scalarized_indices: Default::default(), + vectorized_equal_to_row_indices: Default::default(), + vectorized_equal_to_group_indices: Default::default(), + vectorized_equal_to_results: Default::default(), + vectorized_append_row_indices: Default::default(), }) } - /// Returns true if [`GroupValuesColumn`] supported for the specified schema - pub fn supported_schema(schema: &Schema) -> bool { - schema - .fields() - .iter() - .map(|f| f.data_type()) - .all(Self::supported_type) + /// Collect vectorized context by checking hash values of `cols` in `map` + /// + /// 1. If bucket not found + /// - Build and insert the `new inlined group index view` + /// and its hash value to `map` + /// - Add row index to `vectorized_append_row_indices` + /// - Set group index to row in `groups` + /// + /// 2. bucket found + /// - Add row index to `vectorized_equal_to_row_indices` + /// - Check if the `group index view` is `inlined` or `non_inlined`: + /// If it is inlined, add to `vectorized_equal_to_group_indices` directly. + /// Otherwise get all group indices from `group_index_lists`, and add them. 
+ /// + fn collect_vectorized_process_context( + &mut self, + batch_hashes: &[u64], + groups: &mut [usize], + ) { + self.vectorized_append_row_indices.clear(); + self.vectorized_equal_to_row_indices.clear(); + self.vectorized_equal_to_group_indices.clear(); + + let mut group_values_len = self.group_values[0].len(); + for (row, &target_hash) in batch_hashes.iter().enumerate() { + let entry = self + .map + .get(target_hash, |(exist_hash, _)| target_hash == *exist_hash); + + let Some((_, group_index_view)) = entry else { + // 1. Bucket not found case + // Build `new inlined group index view` + let current_group_idx = group_values_len; + let group_index_view = + GroupIndexView::new_inlined(current_group_idx as u64); + + // Insert the `group index view` and its hash into `map` + // for hasher function, use precomputed hash value + self.map.insert_accounted( + (target_hash, group_index_view), + |(hash, _)| *hash, + &mut self.map_size, + ); + + // Add row index to `vectorized_append_row_indices` + self.vectorized_append_row_indices.push(row); + + // Set group index to row in `groups` + groups[row] = current_group_idx; + + group_values_len += 1; + continue; + }; + + // 2. bucket found + // Check if the `group index view` is `inlined` or `non_inlined` + if group_index_view.is_non_inlined() { + // Non-inlined case, the value of view is offset in `group_index_lists`. + // We use it to get `group_index_list`, and add related `rows` and `group_indices` + // into `vectorized_equal_to_row_indices` and `vectorized_equal_to_group_indices`. 
+ let list_offset = group_index_view.value() as usize; + let group_index_list = &self.group_index_lists[list_offset]; + for &group_index in group_index_list { + self.vectorized_equal_to_row_indices.push(row); + self.vectorized_equal_to_group_indices.push(group_index); + } + } else { + let group_index = group_index_view.value() as usize; + self.vectorized_equal_to_row_indices.push(row); + self.vectorized_equal_to_group_indices.push(group_index); + } + } } - /// Returns true if the specified data type is supported by [`GroupValuesColumn`] - /// - /// In order to be supported, there must be a specialized implementation of - /// [`GroupColumn`] for the data type, instantiated in [`Self::intern`] - fn supported_type(data_type: &DataType) -> bool { - matches!( - *data_type, - DataType::Int8 - | DataType::Int16 - | DataType::Int32 - | DataType::Int64 - | DataType::UInt8 - | DataType::UInt16 - | DataType::UInt32 - | DataType::UInt64 - | DataType::Float32 - | DataType::Float64 - | DataType::Utf8 - | DataType::LargeUtf8 - | DataType::Binary - | DataType::LargeBinary - | DataType::Date32 - | DataType::Date64 - | DataType::Utf8View - | DataType::BinaryView - ) + /// Perform `vectorized_append` for `rows` in `vectorized_append_row_indices` + fn vectorized_append(&mut self, cols: &[ArrayRef]) { + if self.vectorized_append_row_indices.is_empty() { + return; + } + + let iter = self.group_values.iter_mut().zip(cols.iter()); + for (group_column, col) in iter { + group_column.vectorized_append(col, &self.vectorized_append_row_indices); + } + } + + /// Perform `vectorized_equal_to` + /// + /// 1. Perform `vectorized_equal_to` for `rows` in `vectorized_equal_to_row_indices` + /// and `group_indices` in `vectorized_equal_to_group_indices`. + /// + /// 2. Check `equal_to_results`: + /// + /// If found equal to `rows`, set the `group_indices` to `rows` in `groups`.
+ /// + /// If found not equal to `rows`, just add them to `scalarized_indices`, + /// and perform `scalarized_intern` for them after. + /// Usually, such `rows` having same hash but different values from existing rows + /// are very few. + fn vectorized_equal_to(&mut self, cols: &[ArrayRef], groups: &mut [usize]) { + assert_eq!( + self.vectorized_equal_to_group_indices.len(), + self.vectorized_equal_to_row_indices.len() + ); + + self.scalarized_indices.clear(); + + if self.vectorized_equal_to_group_indices.is_empty() { + return; + } + + // 1. Perform `vectorized_equal_to` for `rows` in `vectorized_equal_to_row_indices` + // and `group_indices` in `vectorized_equal_to_group_indices` + let mut equal_to_results = mem::take(&mut self.vectorized_equal_to_results); + equal_to_results.clear(); + equal_to_results.resize(self.vectorized_equal_to_group_indices.len(), true); + + for (col_idx, group_col) in self.group_values.iter().enumerate() { + group_col.vectorized_equal_to( + &self.vectorized_equal_to_group_indices, + &cols[col_idx], + &self.vectorized_equal_to_row_indices, + &mut equal_to_results, + ); + } + + // 2. Check `equal_to_results`, if found not equal to `rows`, just add them + // to `scalarized_indices`, and perform `scalarized_intern` for them after. + let mut current_row_equal_to_result = false; + for (idx, &row) in self.vectorized_equal_to_row_indices.iter().enumerate() { Review Comment: Good point! I plan to continue optimizing the `vectorized_equal_to` together with https://github.com/apache/datafusion/pull/12996#discussion_r1818601807 and https://github.com/apache/datafusion/pull/12996#discussion_r1823151542 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org