Rachelint commented on code in PR #12809:
URL: https://github.com/apache/datafusion/pull/12809#discussion_r1798325968


##########
datafusion/physical-plan/src/aggregates/group_values/group_column.rs:
##########
@@ -376,6 +385,399 @@ where
     }
 }
 
+/// An implementation of [`GroupColumn`] for binary view and utf8 view types.
+///
+/// Stores a collection of binary view or utf8 view group values in a buffer
+/// whose structure is similar to `GenericByteViewArray`, which gives the
+/// following benefits:
+///
+/// 1. Efficient comparison of incoming rows to existing rows
+/// 2. Efficient construction of the final output array
+/// 3. More efficient `take_n` compared to using `GenericByteViewBuilder`
+pub struct ByteViewGroupValueBuilder<B: ByteViewType> {
+    /// The views of string values
+    ///
+    /// If string len <= 12, the view's format will be:
+    ///   string(12B) | len(4B)
+    ///
+    /// If string len > 12, its format will be:
+    ///     offset(4B) | buffer_index(4B) | prefix(4B) | len(4B)
+    views: Vec<u128>,
+
+    /// The in-progress block
+    ///
+    /// New values are appended to it until its remaining capacity is
+    /// insufficient (see `max_block_size` for details).
+    in_progress: Vec<u8>,
+
+    /// The completed blocks
+    completed: Vec<Buffer>,
+
+    /// The max size of `in_progress`
+    ///
+    /// When the remaining capacity of `in_progress`
+    /// (`max_block_size` - `len(in_progress)`) is not enough to store the
+    /// appended value, `in_progress` is flushed into `completed` and a new
+    /// `in_progress` is created.
+    ///
+    /// Currently it is fixed at 2MB.
+    max_block_size: usize,
+
+    /// Nulls
+    nulls: MaybeNullBufferBuilder,
+
+    /// phantom data so the type requires `<B>`
+    _phantom: PhantomData<B>,
+}
+
+impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
+    pub fn new() -> Self {
+        Self {
+            views: Vec::new(),
+            in_progress: Vec::new(),
+            completed: Vec::new(),
+            max_block_size: BYTE_VIEW_MAX_BLOCK_SIZE,
+            nulls: MaybeNullBufferBuilder::new(),
+            _phantom: PhantomData {},
+        }
+    }
+
+    /// Set the max block size
+    fn with_max_block_size(mut self, max_block_size: usize) -> Self {
+        self.max_block_size = max_block_size;
+        self
+    }
+
+    fn append_val_inner(&mut self, array: &ArrayRef, row: usize)
+    where
+        B: ByteViewType,
+    {
+        let arr = array.as_byte_view::<B>();
+
+        // Null row case, set and return
+        if arr.is_null(row) {
+            self.nulls.append(true);
+            self.views.push(0);
+            return;
+        }
+
+        // Not null row case
+        self.nulls.append(false);
+        let value: &[u8] = arr.value(row).as_ref();
+
+        let value_len = value.len();
+        let view = if value_len <= 12 {
+            make_view(value, 0, 0)
+        } else {
+            // First, make sure the in-progress block is big enough to hold the value
+            self.ensure_in_progress_big_enough(value_len);
+
+            // Append value
+            let buffer_index = self.completed.len();
+            let offset = self.in_progress.len();
+            self.in_progress.extend_from_slice(value);
+
+            make_view(value, buffer_index as u32, offset as u32)
+        };
+
+        // Append view
+        self.views.push(view);
+    }
+
+    fn ensure_in_progress_big_enough(&mut self, value_len: usize) {
+        debug_assert!(value_len > 12);
+        let require_cap = self.in_progress.len() + value_len;
+
+        // If the current block isn't big enough, flush it and create a new
+        // in-progress block
+        if require_cap > self.max_block_size {
+            let flushed_block = mem::replace(
+                &mut self.in_progress,
+                Vec::with_capacity(self.max_block_size),
+            );
+            let buffer = Buffer::from_vec(flushed_block);
+            self.completed.push(buffer);
+        }
+    }
+
+    fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) 
-> bool {
+        let array = array.as_byte_view::<B>();
+
+        // First, check whether the null states are equal
+        let exist_null = self.nulls.is_null(lhs_row);
+        let input_null = array.is_null(rhs_row);
+        if let Some(result) = nulls_equal_to(exist_null, input_null) {
+            return result;
+        }
+
+        // Otherwise, we need to check their values
+        let exist_view = self.views[lhs_row];
+        let exist_view_len = exist_view as u32;
+
+        let input_view = array.views()[rhs_row];
+        let input_view_len = input_view as u32;
+
+        // The check logic
+        //   - Check len equality
+        //   - If inlined, check inlined value
+        //   - If non-inlined, check prefix and then check value in buffer
+        //     when needed
+        if exist_view_len != input_view_len {
+            return false;
+        }
+
+        if exist_view_len <= 12 {
+            let exist_inline = unsafe {
+                GenericByteViewArray::<B>::inline_value(
+                    &exist_view,
+                    exist_view_len as usize,
+                )
+            };
+            let input_inline = unsafe {
+                GenericByteViewArray::<B>::inline_value(
+                    &input_view,
+                    input_view_len as usize,
+                )
+            };
+            exist_inline == input_inline
+        } else {
+            let exist_prefix =
+                unsafe { GenericByteViewArray::<B>::inline_value(&exist_view, 
4) };
+            let input_prefix =
+                unsafe { GenericByteViewArray::<B>::inline_value(&input_view, 
4) };
+
+            if exist_prefix != input_prefix {
+                return false;
+            }
+
+            let exist_full = {
+                let byte_view = ByteView::from(exist_view);
+                self.value(
+                    byte_view.buffer_index as usize,
+                    byte_view.offset as usize,
+                    byte_view.length as usize,
+                )
+            };
+            let input_full: &[u8] = unsafe { 
array.value_unchecked(rhs_row).as_ref() };
+            exist_full == input_full
+        }
+    }
+
+    fn value(&self, buffer_index: usize, offset: usize, length: usize) -> 
&[u8] {
+        debug_assert!(buffer_index <= self.completed.len());
+
+        if buffer_index < self.completed.len() {
+            let block = &self.completed[buffer_index];
+            &block[offset..offset + length]
+        } else {
+            &self.in_progress[offset..offset + length]
+        }
+    }
+
+    fn build_inner(self) -> ArrayRef {
+        let Self {
+            views,
+            in_progress,
+            mut completed,
+            nulls,
+            ..
+        } = self;
+
+        // Build nulls
+        let null_buffer = nulls.build();
+
+        // Build values
+        // Flush `in_progress` first
+        if !in_progress.is_empty() {
+            let buffer = Buffer::from(in_progress);
+            completed.push(buffer);
+        }
+
+        let views = ScalarBuffer::from(views);
+
+        Arc::new(GenericByteViewArray::<B>::new(

Review Comment:
   Nice catch, I am checking it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to