Rachelint commented on code in PR #12996:
URL: https://github.com/apache/datafusion/pull/12996#discussion_r1822523760


##########
datafusion/physical-plan/src/aggregates/group_values/column.rs:
##########
@@ -75,55 +176,317 @@ pub struct GroupValuesColumn {
     random_state: RandomState,
 }
 
-impl GroupValuesColumn {
+impl VectorizedGroupValuesColumn {
     /// Create a new instance of GroupValuesColumn if supported for the 
specified schema
     pub fn try_new(schema: SchemaRef) -> Result<Self> {
         let map = RawTable::with_capacity(0);
         Ok(Self {
             schema,
             map,
+            group_index_lists: Vec::new(),
+            index_lists_updates: Vec::new(),
             map_size: 0,
             group_values: vec![],
             hashes_buffer: Default::default(),
             random_state: Default::default(),
+            scalarized_indices: Default::default(),
+            vectorized_equal_to_row_indices: Default::default(),
+            vectorized_equal_to_group_indices: Default::default(),
+            vectorized_equal_to_results: Default::default(),
+            vectorized_append_row_indices: Default::default(),
         })
     }
 
-    /// Returns true if [`GroupValuesColumn`] supported for the specified 
schema
-    pub fn supported_schema(schema: &Schema) -> bool {
-        schema
-            .fields()
-            .iter()
-            .map(|f| f.data_type())
-            .all(Self::supported_type)
+    /// Collect vectorized context by checking hash values of `cols` in `map`
+    ///
+    /// 1. If bucket not found
+    ///   - Build and insert the `new inlined group index view`
+    ///     and its hash value to `map`
+    ///   - Add row index to `vectorized_append_row_indices`
+    ///   - Set group index to row in `groups`
+    ///
+    /// 2. bucket found
+    ///   - Add row index to `vectorized_equal_to_row_indices`
+    ///   - Check if the `group index view` is `inlined` or `non_inlined`:
+    ///     If it is inlined, add to `vectorized_equal_to_group_indices` 
directly.
+    ///     Otherwise get all group indices from `group_index_lists`, and add 
them.
+    ///
+    fn collect_vectorized_process_context(
+        &mut self,
+        batch_hashes: &[u64],
+        groups: &mut Vec<usize>,
+    ) {
+        self.vectorized_append_row_indices.clear();
+        self.vectorized_equal_to_row_indices.clear();
+        self.vectorized_equal_to_group_indices.clear();
+
+        let mut group_values_len = self.group_values[0].len();
+        for (row, &target_hash) in batch_hashes.iter().enumerate() {
+            let entry = self.map.get(target_hash, |(exist_hash, _)| {
+                // Somewhat surprisingly, this closure can be called even if 
the
+                // hash doesn't match, so check the hash first with an integer
+                // comparison first avoid the more expensive comparison with
+                // group value. https://github.com/apache/datafusion/pull/11718
+                target_hash == *exist_hash
+            });
+
+            let Some((_, group_index_view)) = entry else {
+                // 1. Bucket not found case
+                // Build `new inlined group index view`
+                let current_group_idx = group_values_len;
+                let group_index_view =
+                    GroupIndexView::new_inlined(current_group_idx as u64);
+
+                // Insert the `group index view` and its hash into `map`
+                // for hasher function, use precomputed hash value
+                self.map.insert_accounted(
+                    (target_hash, group_index_view),
+                    |(hash, _)| *hash,
+                    &mut self.map_size,
+                );
+
+                // Add row index to `vectorized_append_row_indices`
+                self.vectorized_append_row_indices.push(row);
+
+                // Set group index to row in `groups`
+                groups[row] = current_group_idx;
+
+                group_values_len += 1;
+                continue;
+            };
+
+            // 2. bucket found
+            // Check if the `group index view` is `inlined` or `non_inlined`
+            if group_index_view.is_non_inlined() {
+                // Non-inlined case, the value of view is offset in 
`group_index_lists`.
+                // We use it to get `group_index_list`, and add related `rows` 
and `group_indices`
+                // into `vectorized_equal_to_row_indices` and 
`vectorized_equal_to_group_indices`.
+                let list_offset = group_index_view.value() as usize;
+                let group_index_list = &self.group_index_lists[list_offset];
+                for &group_index in group_index_list {

Review Comment:
   I think we may need to perform `vectorized_append` before 
`vectorized_equal_to`.
   
   The process is similar to what `duckdb` does: 
https://github.com/duckdb/duckdb/blob/4ba2e66277a7576f58318c1aac112faa67c47b11/src/execution/aggregate_hashtable.cpp#L379-L394
   
   When a new group is detected (`get` on the hashtable returns `None`), a 
`future group index` that does not yet point to an actual group is inserted 
into the hashtable.
   
   If later rows in the same input arrays have the same hash value, that 
`future group index` will be picked up and added to 
`vectorized_equal_to_group_indices`.
   
   So we need to perform `vectorized_append` first to fill in such groups; 
after that, the `future group index` actually points to a real group in 
`group_values`.
   
   Then `vectorized_equal_to` can be performed successfully, rather than 
panicking when it encounters a group index that points to no group.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to