jayzhan211 commented on code in PR #12996:
URL: https://github.com/apache/datafusion/pull/12996#discussion_r1819944790


##########
datafusion/physical-plan/src/aggregates/group_values/column.rs:
##########
@@ -75,55 +176,317 @@ pub struct GroupValuesColumn {
     random_state: RandomState,
 }
 
-impl GroupValuesColumn {
+impl VectorizedGroupValuesColumn {
     /// Create a new instance of GroupValuesColumn if supported for the 
specified schema
     pub fn try_new(schema: SchemaRef) -> Result<Self> {
         let map = RawTable::with_capacity(0);
         Ok(Self {
             schema,
             map,
+            group_index_lists: Vec::new(),
+            index_lists_updates: Vec::new(),
             map_size: 0,
             group_values: vec![],
             hashes_buffer: Default::default(),
             random_state: Default::default(),
+            scalarized_indices: Default::default(),
+            vectorized_equal_to_row_indices: Default::default(),
+            vectorized_equal_to_group_indices: Default::default(),
+            vectorized_equal_to_results: Default::default(),
+            vectorized_append_row_indices: Default::default(),
         })
     }
 
-    /// Returns true if [`GroupValuesColumn`] supported for the specified 
schema
-    pub fn supported_schema(schema: &Schema) -> bool {
-        schema
-            .fields()
-            .iter()
-            .map(|f| f.data_type())
-            .all(Self::supported_type)
+    /// Collect vectorized context by checking hash values of `cols` in `map`
+    ///
+    /// 1. If bucket not found
+    ///   - Build and insert the `new inlined group index view`
+    ///     and its hash value to `map`
+    ///   - Add row index to `vectorized_append_row_indices`
+    ///   - Set group index to row in `groups`
+    ///
+    /// 2. bucket found
+    ///   - Add row index to `vectorized_equal_to_row_indices`
+    ///   - Check if the `group index view` is `inlined` or `non_inlined`:
+    ///     If it is inlined, add to `vectorized_equal_to_group_indices` 
directly.
+    ///     Otherwise get all group indices from `group_index_lists`, and add 
them.
+    ///
+    fn collect_vectorized_process_context(
+        &mut self,
+        batch_hashes: &[u64],
+        groups: &mut Vec<usize>,
+    ) {
+        self.vectorized_append_row_indices.clear();
+        self.vectorized_equal_to_row_indices.clear();
+        self.vectorized_equal_to_group_indices.clear();
+
+        let mut group_values_len = self.group_values[0].len();
+        for (row, &target_hash) in batch_hashes.iter().enumerate() {
+            let entry = self.map.get(target_hash, |(exist_hash, _)| {
+                // Somewhat surprisingly, this closure can be called even if 
the
+                // hash doesn't match, so check the hash first with an integer
+                // comparison first avoid the more expensive comparison with
+                // group value. https://github.com/apache/datafusion/pull/11718
+                target_hash == *exist_hash
+            });
+
+            let Some((_, group_index_view)) = entry else {
+                // 1. Bucket not found case
+                // Build `new inlined group index view`
+                let current_group_idx = group_values_len;
+                let group_index_view =
+                    GroupIndexView::new_inlined(current_group_idx as u64);
+
+                // Insert the `group index view` and its hash into `map`
+                // for hasher function, use precomputed hash value
+                self.map.insert_accounted(
+                    (target_hash, group_index_view),
+                    |(hash, _)| *hash,
+                    &mut self.map_size,
+                );
+
+                // Add row index to `vectorized_append_row_indices`
+                self.vectorized_append_row_indices.push(row);
+
+                // Set group index to row in `groups`
+                groups[row] = current_group_idx;
+
+                group_values_len += 1;
+                continue;
+            };
+
+            // 2. bucket found
+            // Check if the `group index view` is `inlined` or `non_inlined`
+            if group_index_view.is_non_inlined() {
+                // Non-inlined case, the value of view is offset in 
`group_index_lists`.
+                // We use it to get `group_index_list`, and add related `rows` 
and `group_indices`
+                // into `vectorized_equal_to_row_indices` and 
`vectorized_equal_to_group_indices`.
+                let list_offset = group_index_view.value() as usize;
+                let group_index_list = &self.group_index_lists[list_offset];
+                for &group_index in group_index_list {
+                    self.vectorized_equal_to_row_indices.push(row);
+                    self.vectorized_equal_to_group_indices.push(group_index);
+                }
+            } else {
+                let group_index = group_index_view.value() as usize;
+                self.vectorized_equal_to_row_indices.push(row);
+                self.vectorized_equal_to_group_indices.push(group_index);
+            }
+        }
+    }
+
+    /// Perform `vectorized_append`` for `rows` in 
`vectorized_append_row_indices`
+    fn vectorized_append(&mut self, cols: &[ArrayRef]) {
+        if self.vectorized_append_row_indices.is_empty() {
+            return;
+        }
+
+        let iter = self.group_values.iter_mut().zip(cols.iter());
+        for (group_column, col) in iter {
+            group_column.vectorized_append(col, 
&self.vectorized_append_row_indices);
+        }
     }
 
-    /// Returns true if the specified data type is supported by 
[`GroupValuesColumn`]
+    /// Perform `vectorized_equal_to`
+    ///
+    /// 1. Perform `vectorized_equal_to` for `rows` in 
`vectorized_equal_to_group_indices`
+    ///    and `group_indices` in `vectorized_equal_to_group_indices`.
+    ///
+    /// 2. Check `equal_to_results`:
     ///
-    /// In order to be supported, there must be a specialized implementation of
-    /// [`GroupColumn`] for the data type, instantiated in [`Self::intern`]
-    fn supported_type(data_type: &DataType) -> bool {
-        matches!(
-            *data_type,
-            DataType::Int8
-                | DataType::Int16
-                | DataType::Int32
-                | DataType::Int64
-                | DataType::UInt8
-                | DataType::UInt16
-                | DataType::UInt32
-                | DataType::UInt64
-                | DataType::Float32
-                | DataType::Float64
-                | DataType::Utf8
-                | DataType::LargeUtf8
-                | DataType::Binary
-                | DataType::LargeBinary
-                | DataType::Date32
-                | DataType::Date64
-                | DataType::Utf8View
-                | DataType::BinaryView
-        )
+    ///    If found equal to `rows`, set the `group_indices` to `rows` in 
`groups`.
+    ///
+    ///    If found not equal to `row`s, just add them to `scalarized_indices`,
+    ///    and perform `scalarized_intern` for them after.
+    ///    Usually, such `rows` having same hash but different value with 
`exists rows`
+    ///    are very few.
+    fn vectorized_equal_to(&mut self, cols: &[ArrayRef], groups: &mut 
Vec<usize>) {
+        assert_eq!(
+            self.vectorized_equal_to_group_indices.len(),
+            self.vectorized_equal_to_row_indices.len()
+        );
+
+        if self.vectorized_equal_to_group_indices.is_empty() {
+            return;
+        }
+
+        // 1. Perform `vectorized_equal_to` for `rows` in 
`vectorized_equal_to_group_indices`
+        //    and `group_indices` in `vectorized_equal_to_group_indices`
+        let mut equal_to_results = mem::take(&mut 
self.vectorized_equal_to_results);
+        equal_to_results.clear();
+        equal_to_results.resize(self.vectorized_equal_to_group_indices.len(), 
true);
+
+        for (col_idx, group_col) in self.group_values.iter().enumerate() {
+            group_col.vectorized_equal_to(
+                &self.vectorized_equal_to_group_indices,
+                &cols[col_idx],
+                &self.vectorized_equal_to_row_indices,
+                &mut equal_to_results,
+            );
+        }
+
+        // 2. Check `equal_to_results`, if found not equal to `row`s, just add 
them
+        //    to `scalarized_indices`, and perform `scalarized_intern` for 
them after.
+        let mut current_row_equal_to_result = false;
+        for (idx, &row) in 
self.vectorized_equal_to_row_indices.iter().enumerate() {
+            let equal_to_result = equal_to_results[idx];
+
+            // Equal to case, set the `group_indices` to `rows` in `groups`
+            if equal_to_result {
+                groups[row] = self.vectorized_equal_to_group_indices[idx];
+            }
+            current_row_equal_to_result |= equal_to_result;
+
+            // Look forward next one row to check if have checked all results
+            // of current row
+            let next_row = self
+                .vectorized_equal_to_row_indices
+                .get(idx + 1)
+                .unwrap_or(&usize::MAX);
+
+            // Have checked all results of current row, check the total result
+            if row != *next_row {
+                // Not equal to case, add `row` to `scalarized_indices`
+                if !current_row_equal_to_result {
+                    self.scalarized_indices.push(row);
+                }
+
+                // Init the total result for checking next row
+                current_row_equal_to_result = false;
+            }
+        }
+
+        self.vectorized_equal_to_results = equal_to_results;
+    }
+
+    fn scalarized_equal_to(
+        &self,
+        group_index_view: &GroupIndexView,
+        cols: &[ArrayRef],
+        row: usize,
+        groups: &mut Vec<usize>,
+    ) -> bool {
+        // Check if this row exists in `group_values`
+        fn check_row_equal(
+            array_row: &dyn GroupColumn,
+            lhs_row: usize,
+            array: &ArrayRef,
+            rhs_row: usize,
+        ) -> bool {
+            array_row.equal_to(lhs_row, array, rhs_row)
+        }
+
+        if group_index_view.is_non_inlined() {
+            let list_offset = group_index_view.value() as usize;
+            let group_index_list = &self.group_index_lists[list_offset];
+
+            for &group_idx in group_index_list {
+                let mut check_result = true;
+                for (i, group_val) in self.group_values.iter().enumerate() {
+                    if !check_row_equal(group_val.as_ref(), group_idx, 
&cols[i], row) {
+                        check_result = false;
+                        break;
+                    }
+                }
+
+                if check_result {
+                    groups[row] = group_idx;
+                    return true;
+                }
+            }
+
+            // All groups unmatched, return false result
+            false
+        } else {
+            let group_idx = group_index_view.value() as usize;
+            for (i, group_val) in self.group_values.iter().enumerate() {
+                if !check_row_equal(group_val.as_ref(), group_idx, &cols[i], 
row) {
+                    return false;
+                }
+            }
+
+            groups[row] = group_idx;
+            true
+        }
+    }
+
+    fn scalarized_intern(
+        &mut self,
+        cols: &[ArrayRef],
+        batch_hashes: &[u64],
+        groups: &mut Vec<usize>,
+    ) {
+        if self.scalarized_indices.is_empty() {
+            return;
+        }
+
+        let mut map = mem::take(&mut self.map);
+
+        for &row in &self.scalarized_indices {
+            let target_hash = batch_hashes[row];
+            let entry = map.get_mut(target_hash, |(exist_hash, 
group_index_view)| {
+                // Somewhat surprisingly, this closure can be called even if 
the
+                // hash doesn't match, so check the hash first with an integer
+                // comparison first avoid the more expensive comparison with
+                // group value. https://github.com/apache/datafusion/pull/11718
+                target_hash == *exist_hash
+            });
+
+            // Only `rows` having the same hash value with `exist rows` but 
different value
+            // will be process in `scalarized_intern`.
+            // So related `buckets` in `map` is ensured to be `Some`.
+            let Some((_, group_index_view)) = entry else {
+                unreachable!()
+            };
+
+            // Perform scalarized equal to
+            if self.scalarized_equal_to(&group_index_view, cols, row, groups) {
+                // Found the row actually exists in group values,
+                // don't need to create new group for it.
+                continue;
+            }
+
+            // Insert the `row` to `group_values` before checking `next row`
+            let group_idx = self.group_values[0].len();
+            let mut checklen = 0;
+            for (i, group_value) in self.group_values.iter_mut().enumerate() {
+                group_value.append_val(&cols[i], row);
+                let len = group_value.len();
+                if i == 0 {
+                    checklen = len;
+                } else {
+                    debug_assert_eq!(checklen, len);
+                }
+            }
+
+            // Check if the `view` is `inlined` or `non-inlined`
+            if group_index_view.is_non_inlined() {
+                // Non-inlined case, get `group_index_list` from 
`group_index_lists`,
+                // then add the new `group` with the same hash values into it.
+                let list_offset = group_index_view.value() as usize;
+                let group_index_list = &mut 
self.group_index_lists[list_offset];
+                group_index_list.push(group_idx);
+            } else {
+                // Inlined case
+                let list_offset = self.group_index_lists.len();
+
+                // Create new `group_index_list` including
+                // `exist group index` + `new group index`.
+                // Add new `group_index_list` into ``group_index_lists`.
+                let exist_group_index = group_index_view.value() as usize;
+                let new_group_index_list = vec![exist_group_index, group_idx];

Review Comment:
   Do we need `exist_group_index`? I think it is the one that has the same
hash value but a different value from the other rows, so we don't need to
check its value once again.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to