alamb commented on code in PR #12996:
URL: https://github.com/apache/datafusion/pull/12996#discussion_r1829848355


##########
datafusion/physical-plan/src/aggregates/group_values/column.rs:
##########
@@ -75,55 +148,653 @@ pub struct GroupValuesColumn {
     random_state: RandomState,
 }
 
-impl GroupValuesColumn {
+/// Buffers to store intermediate results in `vectorized_append`
+/// and `vectorized_equal_to`, for reducing memory allocation
+#[derive(Default)]
+struct VectorizedOperationBuffers {
+    /// The `vectorized append` row indices buffer
+    append_row_indices: Vec<usize>,
+
+    /// The `vectorized_equal_to` row indices buffer
+    equal_to_row_indices: Vec<usize>,
+
+    /// The `vectorized_equal_to` group indices buffer
+    equal_to_group_indices: Vec<usize>,
+
+    /// The `vectorized_equal_to` result buffer
+    equal_to_results: Vec<bool>,
+
+    /// The buffer for storing row indices found not equal to
+    /// exist groups in `group_values` in `vectorized_equal_to`.
+    /// We will perform `scalarized_intern` for such rows.
+    remaining_row_indices: Vec<usize>,
+}
+
+impl VectorizedOperationBuffers {
+    fn clear(&mut self) {
+        self.append_row_indices.clear();
+        self.equal_to_row_indices.clear();
+        self.equal_to_group_indices.clear();
+        self.equal_to_results.clear();
+        self.remaining_row_indices.clear();
+    }
+}
+
+impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
+    // ========================================================================
+    // Initialization functions
+    // ========================================================================
+
     /// Create a new instance of GroupValuesColumn if supported for the 
specified schema
     pub fn try_new(schema: SchemaRef) -> Result<Self> {
         let map = RawTable::with_capacity(0);

Review Comment:
   This `with_capacity` can probably be improved (as a follow-on PR) to avoid 
some smaller allocations



##########
datafusion/physical-plan/src/aggregates/group_values/group_column.rs:
##########
@@ -56,14 +59,40 @@ pub trait GroupColumn: Send + Sync {
     ///
     /// Note that this comparison returns true if both elements are NULL
     fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> 
bool;
+
     /// Appends the row at `row` in `array` to this builder
     fn append_val(&mut self, array: &ArrayRef, row: usize);
+
+    /// The vectorized version equal to
+    ///
+    /// When found nth row stored in this builder at `lhs_row`
+    /// is equal to the row in `array` at `rhs_row`,
+    /// it will record the `true` result at the corresponding
+    /// position in `equal_to_results`.
+    ///
+    /// And if found nth result in `equal_to_results` is already

Review Comment:
   this is quite clever to pass in the existing "is equal to results"
   
   



##########
datafusion/physical-plan/src/aggregates/group_values/column.rs:
##########
@@ -75,55 +148,653 @@ pub struct GroupValuesColumn {
     random_state: RandomState,
 }
 
-impl GroupValuesColumn {
+/// Buffers to store intermediate results in `vectorized_append`
+/// and `vectorized_equal_to`, for reducing memory allocation
+#[derive(Default)]
+struct VectorizedOperationBuffers {
+    /// The `vectorized append` row indices buffer
+    append_row_indices: Vec<usize>,
+
+    /// The `vectorized_equal_to` row indices buffer
+    equal_to_row_indices: Vec<usize>,
+
+    /// The `vectorized_equal_to` group indices buffer
+    equal_to_group_indices: Vec<usize>,
+
+    /// The `vectorized_equal_to` result buffer
+    equal_to_results: Vec<bool>,
+
+    /// The buffer for storing row indices found not equal to
+    /// exist groups in `group_values` in `vectorized_equal_to`.
+    /// We will perform `scalarized_intern` for such rows.
+    remaining_row_indices: Vec<usize>,
+}
+
+impl VectorizedOperationBuffers {
+    fn clear(&mut self) {
+        self.append_row_indices.clear();
+        self.equal_to_row_indices.clear();
+        self.equal_to_group_indices.clear();
+        self.equal_to_results.clear();
+        self.remaining_row_indices.clear();
+    }
+}
+
+impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
+    // ========================================================================
+    // Initialization functions
+    // ========================================================================
+
     /// Create a new instance of GroupValuesColumn if supported for the 
specified schema
     pub fn try_new(schema: SchemaRef) -> Result<Self> {
         let map = RawTable::with_capacity(0);
         Ok(Self {
             schema,
             map,
+            group_index_lists: Vec::new(),
+            emit_group_index_list_buffer: Vec::new(),
+            vectorized_operation_buffers: 
VectorizedOperationBuffers::default(),
             map_size: 0,
             group_values: vec![],
             hashes_buffer: Default::default(),
             random_state: Default::default(),
         })
     }
 
-    /// Returns true if [`GroupValuesColumn`] supported for the specified 
schema
-    pub fn supported_schema(schema: &Schema) -> bool {
-        schema
-            .fields()
+    // ========================================================================
+    // Scalarized intern
+    // ========================================================================
+
+    /// Scalarized intern
+    ///
+    /// This is used only for `streaming aggregation`, because `streaming 
aggregation`
+    /// depends on the order between `input rows` and their corresponding 
`group indices`.
+    ///
+    /// For example, assuming `input rows` in `cols` with 4 new rows
+    /// (not equal to `exist rows` in `group_values`, and need to create
+    /// new groups for them):
+    ///
+    /// ```text
+    ///   row1 (hash collision with the exist rows)
+    ///   row2
+    ///   row3 (hash collision with the exist rows)
+    ///   row4
+    /// ```
+    ///
+    /// # In `scalarized_intern`, their `group indices` will be
+    ///
+    /// ```text
+    ///   row1 --> 0
+    ///   row2 --> 1
+    ///   row3 --> 2
+    ///   row4 --> 3
+    /// ```
+    ///
+    /// `Group indices` order agrees with their input order, and the 
`streaming aggregation`
+    /// depends on this.
+    ///
+    /// # However In `vectorized_intern`, their `group indices` will be
+    ///
+    /// ```text
+    ///   row1 --> 2
+    ///   row2 --> 0
+    ///   row3 --> 3
+    ///   row4 --> 1
+    /// ```
+    ///
+    /// `Group indices` order are against with their input order, and this 
will lead to error
+    /// in `streaming aggregation`.
+    ///
+    fn scalarized_intern(

Review Comment:
   this is basically the same as `GroupValuesColumn::intern` was previously, 
which makes sense to me



##########
datafusion/physical-plan/src/aggregates/group_values/column.rs:
##########
@@ -75,55 +148,653 @@ pub struct GroupValuesColumn {
     random_state: RandomState,
 }
 
-impl GroupValuesColumn {
+/// Buffers to store intermediate results in `vectorized_append`

Review Comment:
   👍 



##########
datafusion/physical-plan/src/aggregates/group_values/group_column.rs:
##########
@@ -56,14 +59,40 @@ pub trait GroupColumn: Send + Sync {
     ///
     /// Note that this comparison returns true if both elements are NULL
     fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> 
bool;
+
     /// Appends the row at `row` in `array` to this builder
     fn append_val(&mut self, array: &ArrayRef, row: usize);

Review Comment:
   Maybe as a follow-on we can consider removing `append_val` and `equal_to` 
and simply change all codepaths to use the vectorized version



##########
datafusion/physical-plan/src/aggregates/group_values/group_column.rs:
##########
@@ -287,6 +469,63 @@ where
         };
     }
 
+    fn vectorized_equal_to(

Review Comment:
   What I have been dreaming about with @XiangpengHao is maybe something like 
adding `take` / `filter` to arrow array builders
   
   I took this opportunity to write up the idea (finally) for your amusement: 
   - https://github.com/apache/arrow-rs/issues/6692
   



##########
datafusion/physical-plan/src/aggregates/group_values/group_column.rs:
##########
@@ -128,6 +157,89 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool> 
GroupColumn
         }
     }
 
+    fn vectorized_equal_to(
+        &self,
+        lhs_rows: &[usize],
+        array: &ArrayRef,
+        rhs_rows: &[usize],
+        equal_to_results: &mut [bool],
+    ) {
+        let array = array.as_primitive::<T>();
+
+        let iter = izip!(
+            lhs_rows.iter(),
+            rhs_rows.iter(),
+            equal_to_results.iter_mut(),
+        );
+
+        for (&lhs_row, &rhs_row, equal_to_result) in iter {
+            // Has found not equal to in previous column, don't need to check
+            if !*equal_to_result {
+                continue;
+            }
+
+            // Perf: skip null check (by short circuit) if input is not 
nullable
+            if NULLABLE {
+                let exist_null = self.nulls.is_null(lhs_row);
+                let input_null = array.is_null(rhs_row);
+                if let Some(result) = nulls_equal_to(exist_null, input_null) {
+                    *equal_to_result = result;
+                    continue;
+                }
+                // Otherwise, we need to check their values
+            }
+
+            *equal_to_result = self.group_values[lhs_row] == 
array.value(rhs_row);
+        }
+    }
+
+    fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize]) {
+        let arr = array.as_primitive::<T>();
+
+        let null_count = array.null_count();
+        let num_rows = array.len();
+        let all_null_or_non_null = if null_count == 0 {
+            Some(true)
+        } else if null_count == num_rows {
+            Some(false)
+        } else {
+            None
+        };
+
+        match (NULLABLE, all_null_or_non_null) {
+            (true, None) => {
+                for &row in rows {
+                    if array.is_null(row) {
+                        self.nulls.append(true);
+                        self.group_values.push(T::default_value());
+                    } else {
+                        self.nulls.append(false);
+                        self.group_values.push(arr.value(row));
+                    }
+                }
+            }
+
+            (true, Some(true)) => {
+                self.nulls.append_n(rows.len(), false);
+                for &row in rows {
+                    self.group_values.push(arr.value(row));
+                }
+            }
+
+            (true, Some(false)) => {
+                self.nulls.append_n(rows.len(), true);
+                self.group_values
+                    .extend(iter::repeat(T::default_value()).take(rows.len()));
+            }
+
+            (false, _) => {
+                for &row in rows {
+                    self.group_values.push(arr.value(row));

Review Comment:
   
![so-beautiful](https://github.com/user-attachments/assets/fd1e8844-8aa0-4965-846a-4323d67ab1e7)
   
   that if possible the inner loop just looks like this (memcopy!)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to