alamb commented on code in PR #7016:
URL: https://github.com/apache/arrow-datafusion/pull/7016#discussion_r1267897191


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -60,6 +60,151 @@ pub(crate) enum ExecutionState {
 
 use super::AggregateExec;
 
+/// An interning store for group keys

Review Comment:
   ```suggestion
   /// Stores group key values and their mapping to group_index
   ///
   /// This is a trait to allow special casing certain kinds of keys 
   /// like single column primitive arrays
   ```



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -60,6 +60,151 @@ pub(crate) enum ExecutionState {
 
 use super::AggregateExec;
 
+/// An interning store for group keys
+trait GroupValues: Send {
+    /// Calculates the `groups` for each input row of `cols`
+    fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> 
Result<()>;
+
+    /// Returns the number of bytes used by this [`GroupValues`]
+    fn size(&self) -> usize;
+
+    /// Returns true if this [`GroupValues`] is empty
+    fn is_empty(&self) -> bool;
+
+    /// The number of values stored in this [`GroupValues`]
+    fn len(&self) -> usize;
+
+    /// Flushes the unique [`GroupValues`]
+    fn flush(&mut self) -> Result<Vec<ArrayRef>>;
+}
+
+/// A [`GroupValues`] making use of [`Rows`]

Review Comment:
   ```suggestion
   /// A [`GroupValues`] which stores group values using the arrow_row format [`Rows`]
   ```



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -60,6 +60,151 @@ pub(crate) enum ExecutionState {
 
 use super::AggregateExec;
 
+/// An interning store for group keys
+trait GroupValues: Send {

Review Comment:
   Eventually I would love to see this in its own module 
`aggregates/group_values.rs`, but that can be a follow-on PR for sure.



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -60,6 +60,151 @@ pub(crate) enum ExecutionState {
 
 use super::AggregateExec;
 
+/// An interning store for group keys
+trait GroupValues: Send {
+    /// Calculates the `groups` for each input row of `cols`
+    fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> 
Result<()>;
+
+    /// Returns the number of bytes used by this [`GroupValues`]
+    fn size(&self) -> usize;
+
+    /// Returns true if this [`GroupValues`] is empty
+    fn is_empty(&self) -> bool;
+
+    /// The number of values stored in this [`GroupValues`]
+    fn len(&self) -> usize;
+
+    /// Flushes the unique [`GroupValues`]
+    fn flush(&mut self) -> Result<Vec<ArrayRef>>;
+}
+
+/// A [`GroupValues`] making use of [`Rows`]
+struct GroupValuesRows {
+    /// Converter for the group values
+    row_converter: RowConverter,
+
+    /// Logically maps group values to a group_index in
+    /// [`Self::group_values`] and in each accumulator
+    ///
+    /// Uses the raw API of hashbrown to avoid actually storing the
+    /// keys (group values) in the table
+    ///
+    /// keys: u64 hashes of the GroupValue
+    /// values: (hash, group_index)
+    map: RawTable<(u64, usize)>,

Review Comment:
   I am surprised to see the `map` in this structure -- I was expecting only 
the group values.
   
   Is your idea to store the group keys inside the map, as shown to be so 
effective by @yahoNanJing in 
https://github.com/apache/arrow-rs/pull/4524#issuecomment-1638970315 ?



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -75,29 +220,29 @@ use super::AggregateExec;
 ///
 /// ```text
 ///
-/// stores "group       stores group values,       internally stores aggregate
-///    indexes"          in arrow_row format         values, for all groups
+///     Assigns a consecutive group           internally stores aggregate 
values
+///     index for each unique set                     for all groups
+///         of group values
 ///
-/// ┌─────────────┐      ┌────────────┐    ┌──────────────┐       
┌──────────────┐
-/// │   ┌─────┐   │      │ ┌────────┐ │    │┌────────────┐│       
│┌────────────┐│
-/// │   │  5  │   │ ┌────┼▶│  "A"   │ │    ││accumulator ││       
││accumulator ││
-/// │   ├─────┤   │ │    │ ├────────┤ │    ││     0      ││       ││     N     
 ││
-/// │   │  9  │   │ │    │ │  "Z"   │ │    ││ ┌────────┐ ││       ││ 
┌────────┐ ││
-/// │   └─────┘   │ │    │ └────────┘ │    ││ │ state  │ ││       ││ │ state  
│ ││
-/// │     ...     │ │    │            │    ││ │┌─────┐ │ ││  ...  ││ │┌─────┐ 
│ ││
-/// │   ┌─────┐   │ │    │    ...     │    ││ │├─────┤ │ ││       ││ │├─────┤ 
│ ││
-/// │   │  1  │───┼─┘    │            │    ││ │└─────┘ │ ││       ││ │└─────┘ 
│ ││
-/// │   ├─────┤   │      │            │    ││ │        │ ││       ││ │        
│ ││
-/// │   │ 13  │───┼─┐    │ ┌────────┐ │    ││ │  ...   │ ││       ││ │  ...   
│ ││
-/// │   └─────┘   │ └────┼▶│  "Q"   │ │    ││ │        │ ││       ││ │        
│ ││
-/// └─────────────┘      │ └────────┘ │    ││ │┌─────┐ │ ││       ││ │┌─────┐ 
│ ││
-///                      │            │    ││ │└─────┘ │ ││       ││ │└─────┘ 
│ ││
-///                      └────────────┘    ││ └────────┘ ││       ││ 
└────────┘ ││
-///                                        │└────────────┘│       
│└────────────┘│
-///                                        └──────────────┘       
└──────────────┘
+///         ┌────────────┐              ┌──────────────┐       ┌──────────────┐

Review Comment:
   If we choose to go with this approach, I can make some pictures for 
`GroupValuesRows` as well



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -60,6 +60,151 @@ pub(crate) enum ExecutionState {
 
 use super::AggregateExec;
 
+/// An interning store for group keys
+trait GroupValues: Send {
+    /// Calculates the `groups` for each input row of `cols`
+    fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> 
Result<()>;
+
+    /// Returns the number of bytes used by this [`GroupValues`]
+    fn size(&self) -> usize;
+
+    /// Returns true if this [`GroupValues`] is empty
+    fn is_empty(&self) -> bool;
+
+    /// The number of values stored in this [`GroupValues`]
+    fn len(&self) -> usize;
+
+    /// Flushes the unique [`GroupValues`]
+    fn flush(&mut self) -> Result<Vec<ArrayRef>>;
+}
+
+/// A [`GroupValues`] making use of [`Rows`]
+struct GroupValuesRows {
+    /// Converter for the group values
+    row_converter: RowConverter,
+
+    /// Logically maps group values to a group_index in
+    /// [`Self::group_values`] and in each accumulator
+    ///
+    /// Uses the raw API of hashbrown to avoid actually storing the
+    /// keys (group values) in the table
+    ///
+    /// keys: u64 hashes of the GroupValue
+    /// values: (hash, group_index)
+    map: RawTable<(u64, usize)>,
+
+    /// The size of `map` in bytes
+    map_size: usize,
+
+    /// The actual group by values, stored in arrow [`Row`] format.
+    /// `group_values[i]` holds the group value for group_index `i`.
+    ///
+    /// The row format is used to compare group keys quickly and store
+    /// them efficiently in memory. Quick comparison is especially
+    /// important for multi-column group keys.
+    ///
+    /// [`Row`]: arrow::row::Row
+    group_values: Rows,
+
+    // buffer to be reused to store hashes
+    hashes_buffer: Vec<u64>,
+
+    /// Random state for creating hashes
+    random_state: RandomState,
+}
+
+impl GroupValuesRows {
+    fn try_new(schema: SchemaRef) -> Result<Self> {
+        let row_converter = RowConverter::new(
+            schema
+                .fields()
+                .iter()
+                .map(|f| SortField::new(f.data_type().clone()))
+                .collect(),
+        )?;
+
+        let map = RawTable::with_capacity(0);
+        let group_values = row_converter.empty_rows(0, 0);
+
+        Ok(Self {
+            row_converter,
+            map,
+            map_size: 0,
+            group_values,
+            hashes_buffer: Default::default(),
+            random_state: Default::default(),
+        })
+    }
+}
+
+impl GroupValues for GroupValuesRows {
+    fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> 
Result<()> {
+        // Convert the group keys into the row format
+        // Avoid reallocation when 
https://github.com/apache/arrow-rs/issues/4479 is available
+        let group_rows = self.row_converter.convert_columns(cols)?;
+        let n_rows = group_rows.num_rows();
+
+        // tracks to which group each of the input rows belongs
+        groups.clear();
+
+        // 1.1 Calculate the group keys for the group values
+        let batch_hashes = &mut self.hashes_buffer;
+        batch_hashes.clear();
+        batch_hashes.resize(n_rows, 0);
+        create_hashes(cols, &self.random_state, batch_hashes)?;
+
+        for (row, &hash) in batch_hashes.iter().enumerate() {

Review Comment:
   This change will likely conflict with 
https://github.com/apache/arrow-datafusion/pull/6932#pullrequestreview-1536741909
 but I think it should be straightforward to update



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -385,102 +492,25 @@ impl RecordBatchStream for GroupedHashAggregateStream {
 }
 
 impl GroupedHashAggregateStream {
-    /// Calculates the group indices for each input row of
-    /// `group_values`.
-    ///
-    /// At the return of this function,
-    /// `self.scratch_space.current_group_indices` has the same number
-    /// of entries as each array in `group_values` and holds the
-    /// correct group_index for that row.
-    ///
-    /// This is one of the core hot loops in the algorithm
-    fn update_group_state(
-        &mut self,
-        group_values: &[ArrayRef],
-        allocated: &mut usize,
-    ) -> Result<()> {
-        // Convert the group keys into the row format
-        // Avoid reallocation when 
https://github.com/apache/arrow-rs/issues/4479 is available
-        let group_rows = self.row_converter.convert_columns(group_values)?;
-        let n_rows = group_rows.num_rows();
-
-        // track memory used
-        let group_values_size_pre = self.group_values.size();
-        let scratch_size_pre = self.scratch_space.size();
-
-        // tracks to which group each of the input rows belongs
-        let group_indices = &mut self.scratch_space.current_group_indices;
-        group_indices.clear();
-
-        // 1.1 Calculate the group keys for the group values
-        let batch_hashes = &mut self.scratch_space.hashes_buffer;
-        batch_hashes.clear();
-        batch_hashes.resize(n_rows, 0);
-        create_hashes(group_values, &self.random_state, batch_hashes)?;
-
-        for (row, &hash) in batch_hashes.iter().enumerate() {
-            let entry = self.map.get_mut(hash, |(_hash, group_idx)| {
-                // verify that a group that we are inserting with hash is
-                // actually the same key value as the group in
-                // existing_idx  (aka group_values @ row)
-                group_rows.row(row) == self.group_values.row(*group_idx)
-            });
-
-            let group_idx = match entry {
-                // Existing group_index for this group value
-                Some((_hash, group_idx)) => *group_idx,
-                //  1.2 Need to create new entry for the group
-                None => {
-                    // Add new entry to aggr_state and save newly created index
-                    let group_idx = self.group_values.num_rows();
-                    self.group_values.push(group_rows.row(row));
-
-                    // for hasher function, use precomputed hash value
-                    self.map.insert_accounted(
-                        (hash, group_idx),
-                        |(hash, _group_index)| *hash,
-                        allocated,
-                    );
-                    group_idx
-                }
-            };
-            group_indices.push(group_idx);
-        }
-
-        // account for memory growth in scratch space
-        *allocated += self.scratch_space.size();

Review Comment:
   Related response: 
https://github.com/apache/arrow-datafusion/pull/6932#discussion_r1267842060



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to