Rachelint commented on code in PR #12809:
URL: https://github.com/apache/datafusion/pull/12809#discussion_r1801809708


##########
datafusion/physical-plan/src/aggregates/group_values/group_column.rs:
##########
@@ -376,6 +385,425 @@ where
     }
 }
 
+/// An implementation of [`GroupColumn`] for binary view and utf8 view types.
+///
+/// Stores a collection of binary view or utf8 view group values in a buffer
+/// whose structure is similar to `GenericByteViewArray`, and we can get benefits:
+///
+/// 1. Efficient comparison of incoming rows to existing rows
+/// 2. Efficient construction of the final output array
+/// 3. Efficient to perform `take_n` comparing to use `GenericByteViewBuilder`
+pub struct ByteViewGroupValueBuilder<B: ByteViewType> {
+    /// The views of string values
+    ///
+    /// If string len <= 12, the view's format will be:
+    ///   string(12B) | len(4B)
+    ///
+    /// If string len > 12, its format will be:
+    ///     offset(4B) | buffer_index(4B) | prefix(4B) | len(4B)
+    views: Vec<u128>,
+
+    /// The progressing block
+    ///
+    /// New values will be inserted into it until its capacity
+    /// is not enough(detail can see `max_block_size`).
+    in_progress: Vec<u8>,
+
+    /// The completed blocks
+    completed: Vec<Buffer>,
+
+    /// The max size of `in_progress`
+    ///
+    /// `in_progress` will be flushed into `completed`, and create new `in_progress`
+    /// when found its remaining capacity(`max_block_size` - `len(in_progress)`),
+    /// is no enough to store the appended value.
+    ///
+    /// Currently it is fixed at 2MB.
+    max_block_size: usize,
+
+    /// Nulls
+    nulls: MaybeNullBufferBuilder,
+
+    /// phantom data so the type requires `<B>`
+    _phantom: PhantomData<B>,
+}
+
+impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
+    pub fn new() -> Self {
+        Self {
+            views: Vec::new(),
+            in_progress: Vec::new(),
+            completed: Vec::new(),
+            max_block_size: BYTE_VIEW_MAX_BLOCK_SIZE,
+            nulls: MaybeNullBufferBuilder::new(),
+            _phantom: PhantomData {},
+        }
+    }
+
+    /// Set the max block size
+    fn with_max_block_size(mut self, max_block_size: usize) -> Self {
+        self.max_block_size = max_block_size;
+        self
+    }
+
+    fn append_val_inner(&mut self, array: &ArrayRef, row: usize)
+    where
+        B: ByteViewType,
+    {
+        let arr = array.as_byte_view::<B>();
+
+        // Null row case, set and return
+        if arr.is_null(row) {
+            self.nulls.append(true);
+            self.views.push(0);
+            return;
+        }
+
+        // Not null row case
+        self.nulls.append(false);
+        let value: &[u8] = arr.value(row).as_ref();
+
+        let value_len = value.len();
+        let view = if value_len <= 12 {
+            make_view(value, 0, 0)
+        } else {
+            // Ensure big enough block to hold the value firstly
+            self.ensure_in_progress_big_enough(value_len);
+
+            // Append value
+            let buffer_index = self.completed.len();
+            let offset = self.in_progress.len();
+            self.in_progress.extend_from_slice(value);
+
+            make_view(value, buffer_index as u32, offset as u32)
+        };
+
+        // Append view
+        self.views.push(view);
+    }
+
+    fn ensure_in_progress_big_enough(&mut self, value_len: usize) {
+        debug_assert!(value_len > 12);
+        let require_cap = self.in_progress.len() + value_len;
+
+        // If current block isn't big enough, flush it and create a new in progress block
+        if require_cap > self.max_block_size {
+            let flushed_block = mem::replace(
+                &mut self.in_progress,
+                Vec::with_capacity(self.max_block_size),
+            );
+            let buffer = Buffer::from_vec(flushed_block);
+            self.completed.push(buffer);
+        }
+    }
+
+    fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool {

Review Comment:
   It seems we can't use it currently, because it only accepts two
   `GenericByteViewArray`s as input:
   ```rust
       pub unsafe fn compare_unchecked(
           left: &GenericByteViewArray<T>,
           left_idx: usize,
           right: &GenericByteViewArray<T>,
           right_idx: usize,
       ) -> std::cmp::Ordering {
   ```
   
   But in the `equal_to_inner` function, one of the inputs is a
   `ByteViewGroupValueBuilder`, and only the other is a `GenericByteViewArray`.
   
   Actually, I implemented `equal_to_inner` by copying and modifying the code
   from `compare_unchecked`.
   
   🤔 Maybe we can make `compare_unchecked` able to accept more than just
   `GenericByteViewArray` (perhaps by defining a new trait for the inputs),
   and reuse it rather than copying the code in the future?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to