Rachelint commented on code in PR #12996:
URL: https://github.com/apache/datafusion/pull/12996#discussion_r1823813010


##########
datafusion/physical-plan/src/aggregates/group_values/column.rs:
##########
@@ -196,6 +570,324 @@ impl GroupValues for GroupValuesColumn {
                         let b = 
ByteViewGroupValueBuilder::<BinaryViewType>::new();
                         v.push(Box::new(b) as _)
                     }
+                    dt => {
+                        return not_impl_err!(
+                            "{dt} not supported in VectorizedGroupValuesColumn"
+                        )
+                    }
+                }
+            }
+            self.group_values = v;
+        }
+
+        // tracks to which group each of the input rows belongs
+        groups.clear();
+        groups.resize(n_rows, usize::MAX);
+
+        let mut batch_hashes = mem::take(&mut self.hashes_buffer);
+        batch_hashes.clear();
+        batch_hashes.resize(n_rows, 0);
+        create_hashes(cols, &self.random_state, &mut batch_hashes)?;
+
+        // General steps for one round `vectorized equal_to & append`:
+        //   1. Collect vectorized context by checking hash values of `cols` 
in `map`,
+        //      mainly fill `vectorized_append_row_indices`, 
`vectorized_equal_to_row_indices`
+        //      and `vectorized_equal_to_group_indices`
+        //
+        //   2. Perform `vectorized_append` for 
`vectorized_append_row_indices`.
+        //     `vectorized_append` must be performed before 
`vectorized_equal_to`,
+        //      because some `group indices` in 
`vectorized_equal_to_group_indices`
+        //      may be actually placeholders, and still point to no actual 
values in
+        //      `group_values` before performing append.
+        //
+        //   3. Perform `vectorized_equal_to` for 
`vectorized_equal_to_row_indices`
+        //      and `vectorized_equal_to_group_indices`. If found some rows in 
input `cols`
+        //      not equal to `exist rows` in `group_values`, place them in 
`scalarized_indices`
+        //      and perform `scalarized_intern` for them similar as what in 
[`GroupValuesColumn`]
+        //      after.
+        //
+        //   4. Perform `scalarized_intern` for rows mentioned above, when we 
process like this
+        //      can see the comments of `scalarized_intern`.
+        //
+
+        // 1. Collect vectorized context by checking hash values of `cols` in 
`map`
+        self.collect_vectorized_process_context(&batch_hashes, groups);
+
+        // 2. Perform `vectorized_append`
+        self.vectorized_append(cols);
+
+        // 3. Perform `vectorized_equal_to`
+        self.vectorized_equal_to(cols, groups);
+
+        // 4. Perform `scalarized_intern`
+        self.scalarized_intern(cols, &batch_hashes, groups);
+
+        self.hashes_buffer = batch_hashes;
+
+        Ok(())
+    }
+
+    fn size(&self) -> usize {
+        let group_values_size: usize = self.group_values.iter().map(|v| 
v.size()).sum();
+        group_values_size + self.map_size + self.hashes_buffer.allocated_size()
+    }
+
+    fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    fn len(&self) -> usize {
+        if self.group_values.is_empty() {
+            return 0;
+        }
+
+        self.group_values[0].len()
+    }
+
+    fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
+        let mut output = match emit_to {
+            EmitTo::All => {
+                let group_values = mem::take(&mut self.group_values);
+                debug_assert!(self.group_values.is_empty());
+
+                group_values
+                    .into_iter()
+                    .map(|v| v.build())
+                    .collect::<Vec<_>>()
+            }
+            EmitTo::First(n) => {
+                let output = self
+                    .group_values
+                    .iter_mut()
+                    .map(|v| v.take_n(n))
+                    .collect::<Vec<_>>();
+                let new_group_index_lists =
+                    Vec::with_capacity(self.group_index_lists.len());
+                let old_group_index_lists =
+                    mem::replace(&mut self.group_index_lists, 
new_group_index_lists);
+
+                // SAFETY: self.map outlives iterator and is not modified 
concurrently
+                unsafe {
+                    for bucket in self.map.iter() {
+                        // Check if it is `inlined` or `non-inlined`
+                        if bucket.as_ref().1.is_non_inlined() {
+                            // Non-inlined case
+                            // We take `group_index_list` from 
`old_group_index_lists`
+                            let list_offset = bucket.as_ref().1.value() as 
usize;
+                            let old_group_index_list =
+                                &old_group_index_lists[list_offset];
+
+                            let mut new_group_index_list = Vec::new();
+                            for &group_index in old_group_index_list {
+                                if let Some(remaining) = 
group_index.checked_sub(n) {
+                                    new_group_index_list.push(remaining);
+                                }
+                            }
+
+                            // The possible results:
+                            //   - `new_group_index_list` is empty, we should 
erase this bucket
+                            //   - only one value in `new_group_index_list`, 
switch the `view` to `inlined`
+                            //   - still multiple values in 
`new_group_index_list`, build and set the new `unlined view`
+                            if new_group_index_list.is_empty() {
+                                self.map.erase(bucket);
+                            } else if new_group_index_list.len() == 1 {
+                                let group_index = 
new_group_index_list.first().unwrap();
+                                bucket.as_mut().1 =
+                                    GroupIndexView::new_inlined(*group_index 
as u64);
+                            } else {
+                                let new_list_offset = 
self.group_index_lists.len();
+                                
self.group_index_lists.push(new_group_index_list);

Review Comment:
   🤔 Currently, when taking the first `n` groups, we need to create a new
   `group_index_lists` and push into it every `group_index_list` from the old
   `group_index_lists` that is still not empty.
   
   The alternative is to not create a new `group_index_lists` and just
   reuse the old `group_index_lists`.
   But I am afraid it would eventually end up looking something like the following:
   ```text
   [],
   [1, 2],
   [],
   ...
   [],
   [],
   [3, 4],
   ```
   
   
   I think we can actually reuse each `group_index_list` taken from the old
   `group_index_lists`, rather than dropping it:
   - We can define something like `taken_group_index_list_buffer`
   - And we will place the kept group indices into it
   - Finally, we copy the kept group indices from
     `taken_group_index_list_buffer` to the reused `group_index_list`,
     and push it to the current `group_index_lists`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to