alamb commented on code in PR #12996:
URL: https://github.com/apache/datafusion/pull/12996#discussion_r1826322118


##########
datafusion/physical-plan/src/aggregates/group_values/column.rs:
##########
@@ -146,7 +550,7 @@ macro_rules! instantiate_primitive {
     };
 }
 
-impl GroupValues for GroupValuesColumn {
+impl GroupValues for VectorizedGroupValuesColumn {

Review Comment:
   I wonder if there is any reason to rename this



##########
datafusion/physical-plan/src/aggregates/group_values/column.rs:
##########
@@ -196,6 +600,328 @@ impl GroupValues for GroupValuesColumn {
                         let b = 
ByteViewGroupValueBuilder::<BinaryViewType>::new();
                         v.push(Box::new(b) as _)
                     }
+                    dt => {
+                        return not_impl_err!(
+                            "{dt} not supported in VectorizedGroupValuesColumn"
+                        )
+                    }
+                }
+            }
+            self.group_values = v;
+        }
+
+        // tracks to which group each of the input rows belongs
+        groups.clear();
+        groups.resize(n_rows, usize::MAX);
+
+        let mut batch_hashes = mem::take(&mut self.hashes_buffer);

Review Comment:
   👍 



##########
datafusion/physical-plan/src/aggregates/group_values/mod.rs:
##########
@@ -143,8 +148,12 @@ pub fn new_group_values(schema: SchemaRef) -> 
Result<Box<dyn GroupValues>> {
         }
     }
 
-    if GroupValuesColumn::supported_schema(schema.as_ref()) {
-        Ok(Box::new(GroupValuesColumn::try_new(schema)?))
+    if column::supported_schema(schema.as_ref()) {

Review Comment:
   Can you explain here why GroupOrdering::None is required? Is it because the 
`VectorizedGroupValuesColumn` doesn't keep the groups in order?
   
   If that is the case, it seems like maybe `emit_n` would never be called 🤔 
   
   



##########
datafusion/physical-plan/src/aggregates/group_values/column.rs:
##########
@@ -35,29 +37,113 @@ use datafusion_common::{not_impl_err, DataFusionError, 
Result};
 use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
 use datafusion_expr::EmitTo;
 use datafusion_physical_expr::binary_map::OutputType;
+
 use hashbrown::raw::RawTable;
-use std::mem::size_of;
 
-/// A [`GroupValues`] that stores multiple columns of group values.
+const NON_INLINED_FLAG: u64 = 0x8000000000000000;
+const VALUE_MASK: u64 = 0x7FFFFFFFFFFFFFFF;
+
+/// The view of indices pointing to the actual values in `GroupValues`
 ///
+/// If only single `group index` represented by view,
+/// value of view is just the `group index`, and we call it a `inlined view`.
 ///
-pub struct GroupValuesColumn {
+/// If multiple `group indices` represented by view,
+/// value of view is the actually the index pointing to `group indices`,
+/// and we call it `non-inlined view`.
+///
+/// The view(a u64) format is like:
+///   +---------------------+---------------------------------------------+
+///   | inlined flag(1bit)  | group index / index to group indices(63bit) |
+///   +---------------------+---------------------------------------------+
+///
+/// `inlined flag`: 1 represents `non-inlined`, and 0 represents `inlined`
+///
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+struct GroupIndexView(u64);
+
+impl GroupIndexView {
+    #[inline]
+    pub fn is_non_inlined(&self) -> bool {
+        (self.0 & NON_INLINED_FLAG) > 0
+    }
+
+    #[inline]
+    pub fn new_inlined(group_index: u64) -> Self {
+        Self(group_index)
+    }
+
+    #[inline]
+    pub fn new_non_inlined(list_offset: u64) -> Self {
+        let non_inlined_value = list_offset | NON_INLINED_FLAG;
+        Self(non_inlined_value)
+    }
+
+    #[inline]
+    pub fn value(&self) -> u64 {
+        self.0 & VALUE_MASK
+    }
+}
+
+/// A [`GroupValues`] that stores multiple columns of group values,
+/// and supports vectorized operators for them
+///
+pub struct VectorizedGroupValuesColumn {
     /// The output schema
     schema: SchemaRef,
 
     /// Logically maps group values to a group_index in
     /// [`Self::group_values`] and in each accumulator
     ///
-    /// Uses the raw API of hashbrown to avoid actually storing the
-    /// keys (group values) in the table
+    /// It is a `hashtable` based on `hashbrown`.
     ///
-    /// keys: u64 hashes of the GroupValue
-    /// values: (hash, group_index)
-    map: RawTable<(u64, usize)>,
+    /// Key and value in the `hashtable`:
+    ///   - The `key` is `hash value(u64)` of the `group value`
+    ///   - The `value` is the `group values` with the same `hash value`
+    ///
+    /// We don't really store the actual `group values` in `hashtable`,
+    /// instead we store the `group indices` pointing to values in 
`GroupValues`.
+    /// And we use [`GroupIndexView`] to represent such `group indices` in 
table.
+    ///
+    ///
+    map: RawTable<(u64, GroupIndexView)>,
 
     /// The size of `map` in bytes
     map_size: usize,
 
+    /// The lists for group indices with the same hash value
+    ///
+    /// It is possible that hash value collision exists,
+    /// and we will chain the `group indices` with same hash value
+    ///
+    /// The chained indices is like:
+    ///   `latest group index -> older group index -> even older group index 
-> ...`
+    ///
+    group_index_lists: Vec<Vec<usize>>,
+
+    /// When emitting first n, we need to decrease/erase group indices in
+    /// `map` and `group_index_lists`.
+    ///
+    /// This buffer is used to temporarily store the remaining group indices in
+    /// a specific list in `group_index_lists`.
+    emit_group_index_list_buffer: Vec<usize>,
+
+    /// Similar as `current_indices`, but `remaining_indices`
+    /// is used to store the rows will be processed in next round.
+    scalarized_indices: Vec<usize>,
+
+    /// The `vectorized_equal_tod` row indices buffer

Review Comment:
   Maybe we can rename these to "buffer" or something to make it clear they are 
temp processing space to avoid re-allocations rather than 
   
   Something like 
   ```rust
       buffer_equal_to_row_indices: Vec<usize>,
   ```
   
   Or maybe we can even put all the scratch space into their own struct to make 
it clear
   
   ```rust
   struct ScratchSpace {
       vectorized_equal_to_row_indices: Vec<usize>,
   
       /// The `vectorized_equal_tod` group indices buffer
       vectorized_equal_to_group_indices: Vec<usize>,
   
       /// The `vectorized_equal_tod` result buffer
       vectorized_equal_to_results: Vec<bool>,
   
       /// The `vectorized append` row indices buffer
       vectorized_append_row_indices: Vec<usize>,
   }
   ```
   
   Or something
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to