alamb commented on code in PR #12809:
URL: https://github.com/apache/datafusion/pull/12809#discussion_r1798291818


##########
datafusion/physical-plan/src/aggregates/group_values/group_column.rs:
##########
@@ -579,4 +986,208 @@ mod tests {
         assert!(!builder.equal_to(4, &input_array, 4));
         assert!(builder.equal_to(5, &input_array, 5));
     }
+
+    #[test]
+    fn test_byte_view_append_val() {
+        let mut builder =
+            ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(60);
+        let builder_array = StringViewArray::from(vec![
+            Some("this string is quite long"), // in buffer 0
+            Some("foo"),
+            None,
+            Some("bar"),
+            Some("this string is also quite long"), // buffer 0
+            Some("this string is quite long"),      // buffer 1
+            Some("bar"),
+        ]);
+        let builder_array: ArrayRef = Arc::new(builder_array);
+        for row in 0..builder_array.len() {
+            builder.append_val(&builder_array, row);
+        }
+
+        let output = Box::new(builder).build();
+        // should be 2 output buffers to hold all the data
+        assert_eq!(output.as_string_view().data_buffers().len(), 2);
+        assert_eq!(&output, &builder_array)
+    }
+
+    #[test]
+    fn test_byte_view_equal_to() {
+        // Covers the following cases:
+        //   - exist null, input not null
+        //   - exist null, input null; values not equal
+        //   - exist null, input null; values equal
+        //   - exist not null, input null
+        //   - exist not null, input not null; values not equal
+        //   - exist not null, input not null; values equal
+
+        let mut builder = ByteViewGroupValueBuilder::<StringViewType>::new();
+        let builder_array = Arc::new(StringViewArray::from(vec![
+            None,
+            None,
+            None,
+            Some("foo"),
+            Some("bar"),
+            Some("this string is quite long"),
+            Some("baz"),
+        ])) as ArrayRef;
+        builder.append_val(&builder_array, 0);
+        builder.append_val(&builder_array, 1);
+        builder.append_val(&builder_array, 2);
+        builder.append_val(&builder_array, 3);
+        builder.append_val(&builder_array, 4);
+        builder.append_val(&builder_array, 5);
+        builder.append_val(&builder_array, 6);
+
+        // Define input array
+        let (views, buffer, _nulls) = StringViewArray::from(vec![
+            Some("foo"),
+            Some("bar"),                       // set to null
+            Some("this string is quite long"), // set to null
+            None,
+            None,
+            Some("foo"),
+            Some("baz"),
+        ])
+        .into_parts();
+
+        // explicitly build a boolean buffer where one of the null values
+        // also happens to match
+        let mut boolean_buffer_builder = BooleanBufferBuilder::new(7);
+        boolean_buffer_builder.append(true);
+        boolean_buffer_builder.append(false); // this sets Some("bar") above to null
+        boolean_buffer_builder.append(false); // this sets Some("this string is quite long") above to null
+        boolean_buffer_builder.append(false);
+        boolean_buffer_builder.append(false);
+        boolean_buffer_builder.append(true);
+        boolean_buffer_builder.append(true);
+        let nulls = NullBuffer::new(boolean_buffer_builder.finish());
+        let input_array =
+            Arc::new(StringViewArray::new(views, buffer, Some(nulls))) as ArrayRef;
+
+        // Check
+        assert!(!builder.equal_to(0, &input_array, 0));
+        assert!(builder.equal_to(1, &input_array, 1));
+        assert!(builder.equal_to(2, &input_array, 2));
+        assert!(!builder.equal_to(3, &input_array, 3));
+        assert!(!builder.equal_to(4, &input_array, 4));
+        assert!(!builder.equal_to(5, &input_array, 5));
+        assert!(builder.equal_to(6, &input_array, 6));
+    }
+
+    #[test]
+    fn test_byte_view_take_n() {
+        // ####### Define cases and init #######
+
+        // `take_n` is really complex, so we should consider and test the
+        // following situations:
+        //   1. Take nulls
+        //   2. Take all `inlined`s
+        //   3. Take non-inlined + partial last buffer in `completed`
+        //   4. Take non-inlined + whole last buffer in `completed`
+        //   5. Take non-inlined + partial last buffer in `in_progress`
+        //   6. Take non-inlined + whole last buffer in `in_progress`
+        //   7. Take all views at once
+
+        let mut builder =

Review Comment:
   😍 
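   (Worked through: with `with_max_block_size(60)`, the first two long values
   occupy 25 + 30 = 55 bytes of buffer 0; appending the next 25-byte value
   would need 80 bytes, which exceeds 60, so buffer 0 is flushed and that value
   starts buffer 1. That is why the test asserts two data buffers.)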



##########
datafusion/physical-plan/src/aggregates/group_values/group_column.rs:
##########
@@ -376,6 +385,399 @@ where
     }
 }
 
+/// An implementation of [`GroupColumn`] for binary view and utf8 view types.
+///
+/// Stores a collection of binary view or utf8 view group values in a buffer
+/// whose structure is similar to `GenericByteViewArray`, which gives us these
+/// benefits:
+///
+/// 1. Efficient comparison of incoming rows to existing rows
+/// 2. Efficient construction of the final output array
+/// 3. Efficient `take_n` compared to using `GenericByteViewBuilder`
+pub struct ByteViewGroupValueBuilder<B: ByteViewType> {
+    /// The views of string values
+    ///
+    /// If string len <= 12, the view's format will be:
+    ///   string(12B) | len(4B)
+    ///
+    /// If string len > 12, its format will be:
+    ///     offset(4B) | buffer_index(4B) | prefix(4B) | len(4B)
+    views: Vec<u128>,
+
+    /// The in-progress block
+    ///
+    /// New values are inserted into it until its remaining capacity
+    /// is not enough (see `max_block_size` for details).
+
+    /// The completed blocks
+    completed: Vec<Buffer>,
+
+    /// The max size of `in_progress`
+    ///
+    /// `in_progress` is flushed into `completed`, and a new `in_progress` is
+    /// created, when its remaining capacity (`max_block_size` -
+    /// `len(in_progress)`) is not enough to store the appended value.
+    ///
+    /// Currently it is fixed at 2MB.
+
+    /// Nulls
+    nulls: MaybeNullBufferBuilder,
+
+    /// phantom data so the type requires `<B>`
+    _phantom: PhantomData<B>,
+}
+
+impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
+    pub fn new() -> Self {
+        Self {
+            views: Vec::new(),
+            in_progress: Vec::new(),
+            completed: Vec::new(),
+            max_block_size: BYTE_VIEW_MAX_BLOCK_SIZE,
+            nulls: MaybeNullBufferBuilder::new(),
+            _phantom: PhantomData {},
+        }
+    }
+
+    /// Set the max block size
+    fn with_max_block_size(mut self, max_block_size: usize) -> Self {
+        self.max_block_size = max_block_size;
+        self
+    }
+
+    fn append_val_inner(&mut self, array: &ArrayRef, row: usize)
+    where
+        B: ByteViewType,
+    {
+        let arr = array.as_byte_view::<B>();
+
+        // Null row case, set and return
+        if arr.is_null(row) {
+            self.nulls.append(true);
+            self.views.push(0);
+            return;
+        }
+
+        // Not null row case
+        self.nulls.append(false);
+        let value: &[u8] = arr.value(row).as_ref();
+
+        let value_len = value.len();
+        let view = if value_len <= 12 {
+            make_view(value, 0, 0)
+        } else {
+            // First, ensure the in-progress block is big enough to hold the value
+            self.ensure_in_progress_big_enough(value_len);
+
+            // Append value
+            let buffer_index = self.completed.len();
+            let offset = self.in_progress.len();
+            self.in_progress.extend_from_slice(value);
+
+            make_view(value, buffer_index as u32, offset as u32)
+        };
+
+        // Append view
+        self.views.push(view);
+    }
+
+    fn ensure_in_progress_big_enough(&mut self, value_len: usize) {
+        debug_assert!(value_len > 12);
+        let require_cap = self.in_progress.len() + value_len;
+
+        // If the current block isn't big enough, flush it and create a new in-progress block
+        if require_cap > self.max_block_size {
+            let flushed_block = mem::replace(
+                &mut self.in_progress,
+                Vec::with_capacity(self.max_block_size),
+            );
+            let buffer = Buffer::from_vec(flushed_block);
+            self.completed.push(buffer);
+        }
+    }
+
+    fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool {
+        let array = array.as_byte_view::<B>();
+
+        // First, check whether the nulls are equal
+        let exist_null = self.nulls.is_null(lhs_row);
+        let input_null = array.is_null(rhs_row);
+        if let Some(result) = nulls_equal_to(exist_null, input_null) {
+            return result;
+        }
+
+        // Otherwise, we need to check their values
+        let exist_view = self.views[lhs_row];
+        let exist_view_len = exist_view as u32;
+
+        let input_view = array.views()[rhs_row];
+        let input_view_len = input_view as u32;
+
+        // The check logic
+        //   - Check len equality
+        //   - If inlined, check inlined value
+        //   - If non-inlined, check prefix and then check value in buffer
+        //     when needed
+        if exist_view_len != input_view_len {
+            return false;
+        }
+
+        if exist_view_len <= 12 {
+            let exist_inline = unsafe {
+                GenericByteViewArray::<B>::inline_value(
+                    &exist_view,
+                    exist_view_len as usize,
+                )
+            };
+            let input_inline = unsafe {
+                GenericByteViewArray::<B>::inline_value(
+                    &input_view,
+                    input_view_len as usize,
+                )
+            };
+            exist_inline == input_inline
+        } else {
+            let exist_prefix =
+                unsafe { GenericByteViewArray::<B>::inline_value(&exist_view, 4) };
+            let input_prefix =
+                unsafe { GenericByteViewArray::<B>::inline_value(&input_view, 4) };
+
+            if exist_prefix != input_prefix {
+                return false;
+            }
+
+            let exist_full = {
+                let byte_view = ByteView::from(exist_view);
+                self.value(
+                    byte_view.buffer_index as usize,
+                    byte_view.offset as usize,
+                    byte_view.length as usize,
+                )
+            };
+            let input_full: &[u8] = unsafe { array.value_unchecked(rhs_row).as_ref() };
+            exist_full == input_full
+        }
+    }
+
+    fn value(&self, buffer_index: usize, offset: usize, length: usize) -> &[u8] {
+        debug_assert!(buffer_index <= self.completed.len());
+
+        if buffer_index < self.completed.len() {
+            let block = &self.completed[buffer_index];
+            &block[offset..offset + length]
+        } else {
+            &self.in_progress[offset..offset + length]
+        }
+    }
+
+    fn build_inner(self) -> ArrayRef {
+        let Self {
+            views,
+            in_progress,
+            mut completed,
+            nulls,
+            ..
+        } = self;
+
+        // Build nulls
+        let null_buffer = nulls.build();
+
+        // Build values
+        // Flush `in_progress` first
+        if !in_progress.is_empty() {
+            let buffer = Buffer::from(in_progress);
+            completed.push(buffer);
+        }
+
+        let views = ScalarBuffer::from(views);
+
+        Arc::new(GenericByteViewArray::<B>::new(
+            views,
+            completed,
+            null_buffer,
+        ))
+    }
+
+    fn take_n_inner(&mut self, n: usize) -> ArrayRef {
+        debug_assert!(self.len() >= n);
+
+        // The `n == len` case: we need to take all
+        if self.len() == n {
+            let new_builder = Self::new().with_max_block_size(self.max_block_size);
+            let cur_builder = std::mem::replace(self, new_builder);
+            return cur_builder.build_inner();
+        }
+
+        // The `n < len` case
+        // Take n for nulls
+        let null_buffer = self.nulls.take_n(n);
+
+        // Take n for values:
+        //   - Take the first n `view`s from `views`
+        //
+        //   - Find the last non-inlined `view`; if all are inlined,
+        //     we can build the array and return happily, otherwise
+        //     we need to continue to process the related buffers
+        //
+        //   - Get the last related `buffer index` (let's name it
+        //     `buffer index n`) from the last non-inlined `view`
+        //
+        //   - Take buffers; the key is that we need to know if we need to take
+        //     the whole last related buffer. The logic is a bit complex, see
+        //     the details in `take_buffers_with_whole_last`,
+        //     `take_buffers_with_partial_last` and the other related steps
+        //     below
+        //
+        //   - Shift the `buffer index` of the remaining non-inlined `views`
+        //
+        let first_n_views = self.views.drain(0..n).collect::<Vec<_>>();
+
+        let last_non_inlined_view = first_n_views
+            .iter()
+            .rev()
+            .find(|view| ((**view) as u32) > 12);
+
+        if let Some(view) = last_non_inlined_view {

Review Comment:
   Stylistically, you could reduce the indenting in this function by using a `let else`, like
   
   ```rust
           let Some(view) = last_non_inlined_view else {
               let views = ScalarBuffer::from(first_n_views);
               return Arc::new(GenericByteViewArray::<B>::new(
                   views,
                   Vec::new(),
                   null_buffer,
               ));
           };
   ```
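   
   For readers unfamiliar with the layout documented on the `views` field
   above, here is a decode sketch of the u128 view (this assumes arrow's
   `ByteView` encoding, with fields ordered low byte to high byte; the
   function is illustrative, not part of the PR):
   
   ```rust
   fn decode_view(view: u128) {
       let len = view as u32; // bytes 0..4 hold the length in both layouts
       if len <= 12 {
           // bytes 4..16 hold the value inline
       } else {
           let prefix = (view >> 32) as u32; // bytes 4..8: first 4 value bytes
           let buffer_index = (view >> 64) as u32; // bytes 8..12
           let offset = (view >> 96) as u32; // bytes 12..16
           let _ = (prefix, buffer_index, offset);
       }
   }
   ```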



##########
datafusion/physical-plan/src/aggregates/group_values/group_column.rs:
##########
@@ -376,6 +385,399 @@ where
[snip]
+        // Take n for values:
+        //   - Take the first n `view`s from `views`
+        //
+        //   - Find the last non-inlined `view`; if all are inlined,
+        //     we can build the array and return happily, otherwise
+        //     we need to continue to process the related buffers
+        //
+        //   - Get the last related `buffer index` (let's name it
+        //     `buffer index n`) from the last non-inlined `view`
+        //
+        //   - Take buffers; the key is that we need to know if we need to take

Review Comment:
   Thank you for these comments. Very nice 💯 
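   
   As a side note on the step list above: "non-inlined" is detected from the
   low 32 bits of the view, which hold the value length in both layouts. A
   minimal sketch (the helper name is illustrative, not part of the PR):
   
   ```rust
   /// A view is non-inlined when its value is longer than 12 bytes; the
   /// length lives in the low 32 bits of the u128 view.
   fn is_non_inlined(view: u128) -> bool {
       (view as u32) > 12
   }
   ```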



##########
datafusion/physical-plan/src/aggregates/group_values/group_column.rs:
##########
@@ -376,6 +385,399 @@ where
[snip]
+        let first_n_views = self.views.drain(0..n).collect::<Vec<_>>();
+
+        let last_non_inlined_view = first_n_views
+            .iter()
+            .rev()
+            .find(|view| ((**view) as u32) > 12);
+
+        if let Some(view) = last_non_inlined_view {
+            let view = ByteView::from(*view);
+            let last_related_buffer_index = view.buffer_index as usize;
+
+            // Check whether we should take the whole `last_related_buffer_index` buffer
+            let take_whole_last_buffer = self.should_take_whole_buffer(
+                last_related_buffer_index,
+                (view.offset + view.length) as usize,
+            );
+
+            // Take related buffers
+            let buffers = if take_whole_last_buffer {
+                self.take_buffers_with_whole_last(last_related_buffer_index)
+            } else {
+                self.take_buffers_with_partial_last(
+                    last_related_buffer_index,
+                    (view.offset + view.length) as usize,
+                )
+            };
+
+            // Finally, shift the `buffer index`es
+            let shifts = if take_whole_last_buffer {
+                last_related_buffer_index + 1
+            } else {
+                last_related_buffer_index
+            };
+
+            self.views.iter_mut().for_each(|view| {
+                if (*view as u32) > 12 {
+                    let mut byte_view = ByteView::from(*view);
+                    byte_view.buffer_index -= shifts as u32;
+                    *view = byte_view.as_u128();
+                }
+            });
+
+            // Build array and return
+            let views = ScalarBuffer::from(first_n_views);
+            Arc::new(GenericByteViewArray::<B>::new(views, buffers, null_buffer))
+        } else {
+            let views = ScalarBuffer::from(first_n_views);
+            Arc::new(GenericByteViewArray::<B>::new(
+                views,
+                Vec::new(),
+                null_buffer,
+            ))
+        }
+    }
+
+    fn take_buffers_with_whole_last(
+        &mut self,
+        last_related_buffer_index: usize,
+    ) -> Vec<Buffer> {
+        if last_related_buffer_index == self.completed.len() {
+            self.flush_in_progress();
+        }
+        self.completed
+            .drain(0..last_related_buffer_index + 1)
+            .collect()
+    }
+
+    fn take_buffers_with_partial_last(
+        &mut self,
+        last_related_buffer_index: usize,
+        take_len: usize,

Review Comment:
   maybe we could call this `last_take_len` or something to note it is the number of bytes being taken from the last buffer



##########
datafusion/physical-plan/src/aggregates/group_values/group_column.rs:
##########
@@ -376,6 +385,399 @@ where
[snip]
+        if let Some(view) = last_non_inlined_view {
+            let view = ByteView::from(*view);
+            let last_related_buffer_index = view.buffer_index as usize;

Review Comment:
   I think a name like `last_remaining_buffer_index` might be clearer about what this quantity represents



##########
datafusion/physical-plan/src/aggregates/group_values/group_column.rs:
##########
@@ -376,6 +385,399 @@ where
[snip]
+
+            // Build array and return
+            let views = ScalarBuffer::from(first_n_views);
+            Arc::new(GenericByteViewArray::<B>::new(views, buffers, null_buffer))

Review Comment:
   as above, I think we should use `new_unchecked` here as all the data is valid by construction (maybe we could keep the check in debug builds)
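   
   A possible shape for that, as a sketch only (this assumes arrow's
   `GenericByteViewArray::new_unchecked`, which takes the same arguments as
   `new` but skips validation; it is not the PR's code):
   
   ```rust
   let views = ScalarBuffer::from(first_n_views);
   let array = if cfg!(debug_assertions) {
       // debug builds: keep the validation performed by `new`
       GenericByteViewArray::<B>::new(views, buffers, null_buffer)
   } else {
       // SAFETY: the views, buffers and nulls are valid by construction
       unsafe { GenericByteViewArray::<B>::new_unchecked(views, buffers, null_buffer) }
   };
   Arc::new(array)
   ```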



##########
datafusion/physical-plan/src/aggregates/group_values/group_column.rs:
##########
@@ -376,6 +385,399 @@ where
[snip]
+    fn take_n_inner(&mut self, n: usize) -> ArrayRef {
+        debug_assert!(self.len() >= n);
+
+        // The `n == len` case: we need to take all
+        if self.len() == n {
+            let new_builder = Self::new().with_max_block_size(self.max_block_size);

Review Comment:
   💯 
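   
   For anyone less familiar with this pattern: a self-contained sketch of the
   `mem::replace` take-all idiom (illustrative types, not the PR's):
   
   ```rust
   use std::mem;
   
   struct Builder {
       data: Vec<u8>,
   }
   
   impl Builder {
       fn new() -> Self {
           Builder { data: Vec::new() }
       }
   
       /// Consume everything accumulated so far without cloning: swap a
       /// fresh builder into `self` and take the old one out by value.
       fn take_all(&mut self) -> Vec<u8> {
           let old = mem::replace(self, Builder::new());
           old.data
       }
   }
   
   fn main() {
       let mut b = Builder::new();
       b.data.extend_from_slice(b"abc");
       assert_eq!(b.take_all(), b"abc".to_vec());
       assert!(b.data.is_empty());
   }
   ```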



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org
