This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new c311cf5ef8 Implement `GroupColumn` support for `StringView` / 
`ByteView` (faster grouping performance) (#12809)
c311cf5ef8 is described below

commit c311cf5ef8205912a092fcae8748ae26201a1f9a
Author: kamille <[email protected]>
AuthorDate: Wed Oct 16 22:08:14 2024 +0800

    Implement `GroupColumn` support for `StringView` / `ByteView` (faster 
grouping performance) (#12809)
    
    * define `ByteGroupValueViewBuilder`.
    
    * impl append.
    
    * impl equal to.
    
    * fix compile.
    
    * fix comments.
    
    * impl take_n.
    
    * impl build.
    
    * impl rest functions in `GroupColumn`.
    
    * fix output when panic.
    
    * add e2e sql tests.
    
    * add unit tests.
    
    * switch to a really elegant style codes from alamb.
    
    * fix take_n.
    
    * improve comments.
    
    * fix compile.
    
    * fix clippy.
    
    * define more testcases in `test_byte_view_take_n`.
    
    * connect up.
    
    * fix doc.
    
    * Do not re-validate output is utf8
    
    * switch to unchecked when building array.
    
    * improve naming.
    
    * use let else to make the codes clearer.
    
    * fix typo.
    
    * improve unit test coverage for `ByteViewGroupValueBuilder`.
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 .../src/aggregates/group_values/column.rs          |  18 +-
 .../src/aggregates/group_values/group_column.rs    | 681 ++++++++++++++++++++-
 datafusion/sqllogictest/test_files/group_by.slt    |  62 ++
 3 files changed, 755 insertions(+), 6 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/column.rs 
b/datafusion/physical-plan/src/aggregates/group_values/column.rs
index 28f35b2bde..4ad75844f7 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/column.rs
@@ -16,14 +16,16 @@
 // under the License.
 
 use crate::aggregates::group_values::group_column::{
-    ByteGroupValueBuilder, GroupColumn, PrimitiveGroupValueBuilder,
+    ByteGroupValueBuilder, ByteViewGroupValueBuilder, GroupColumn,
+    PrimitiveGroupValueBuilder,
 };
 use crate::aggregates::group_values::GroupValues;
 use ahash::RandomState;
 use arrow::compute::cast;
 use arrow::datatypes::{
-    Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type, 
Int64Type,
-    Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
+    BinaryViewType, Date32Type, Date64Type, Float32Type, Float64Type, 
Int16Type,
+    Int32Type, Int64Type, Int8Type, StringViewType, UInt16Type, UInt32Type, 
UInt64Type,
+    UInt8Type,
 };
 use arrow::record_batch::RecordBatch;
 use arrow_array::{Array, ArrayRef};
@@ -119,6 +121,8 @@ impl GroupValuesColumn {
                 | DataType::LargeBinary
                 | DataType::Date32
                 | DataType::Date64
+                | DataType::Utf8View
+                | DataType::BinaryView
         )
     }
 }
@@ -184,6 +188,14 @@ impl GroupValues for GroupValuesColumn {
                         let b = 
ByteGroupValueBuilder::<i64>::new(OutputType::Binary);
                         v.push(Box::new(b) as _)
                     }
+                    &DataType::Utf8View => {
+                        let b = 
ByteViewGroupValueBuilder::<StringViewType>::new();
+                        v.push(Box::new(b) as _)
+                    }
+                    &DataType::BinaryView => {
+                        let b = 
ByteViewGroupValueBuilder::<BinaryViewType>::new();
+                        v.push(Box::new(b) as _)
+                    }
                     dt => {
                         return not_impl_err!("{dt} not supported in 
GroupValuesColumn")
                     }
diff --git 
a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs 
b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
index 5d00f300e9..4153495860 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
@@ -15,7 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use arrow::array::make_view;
 use arrow::array::BufferBuilder;
+use arrow::array::ByteView;
 use arrow::array::GenericBinaryArray;
 use arrow::array::GenericStringArray;
 use arrow::array::OffsetSizeTrait;
@@ -24,16 +26,23 @@ use arrow::array::{Array, ArrayRef, ArrowPrimitiveType, 
AsArray};
 use arrow::buffer::OffsetBuffer;
 use arrow::buffer::ScalarBuffer;
 use arrow::datatypes::ByteArrayType;
+use arrow::datatypes::ByteViewType;
 use arrow::datatypes::DataType;
 use arrow::datatypes::GenericBinaryType;
+use arrow_array::GenericByteViewArray;
+use arrow_buffer::Buffer;
 use datafusion_common::utils::proxy::VecAllocExt;
 
 use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
 use arrow_array::types::GenericStringType;
 use datafusion_physical_expr_common::binary_map::{OutputType, 
INITIAL_BUFFER_CAPACITY};
+use std::marker::PhantomData;
+use std::mem;
 use std::sync::Arc;
 use std::vec;
 
+const BYTE_VIEW_MAX_BLOCK_SIZE: usize = 2 * 1024 * 1024;
+
 /// Trait for storing a single column of group values in [`GroupValuesColumn`]
 ///
 /// Implementations of this trait store an in-progress collection of group 
values
@@ -376,6 +385,425 @@ where
     }
 }
 
+/// An implementation of [`GroupColumn`] for binary view and utf8 view types.
+///
+/// Stores a collection of binary view or utf8 view group values in a buffer
+/// whose structure is similar to `GenericByteViewArray`, which has the
+/// following benefits:
+///
+/// 1. Efficient comparison of incoming rows to existing rows
+/// 2. Efficient construction of the final output array
+/// 3. More efficient `take_n` than using `GenericByteViewBuilder`
+pub struct ByteViewGroupValueBuilder<B: ByteViewType> {
+    /// The views of string values
+    ///
+    /// If string len <= 12, the view's format will be:
+    ///   string(12B) | len(4B)
+    ///
+    /// If string len > 12, its format will be:
+    ///     offset(4B) | buffer_index(4B) | prefix(4B) | len(4B)
+    views: Vec<u128>,
+
+    /// The in-progress block
+    ///
+    /// New values are appended to it until its remaining capacity
+    /// is not enough (see `max_block_size` for details).
+    in_progress: Vec<u8>,
+
+    /// The completed blocks
+    completed: Vec<Buffer>,
+
+    /// The max size of `in_progress`
+    ///
+    /// When the remaining capacity of `in_progress`
+    /// (`max_block_size` - `len(in_progress)`) is not enough to store the
+    /// appended value, `in_progress` is flushed into `completed` and a new
+    /// `in_progress` block is created.
+    ///
+    /// Currently it is fixed at 2MB.
+    max_block_size: usize,
+
+    /// Nulls
+    nulls: MaybeNullBufferBuilder,
+
+    /// phantom data so the type requires `<B>`
+    _phantom: PhantomData<B>,
+}
+
+impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
+    pub fn new() -> Self {
+        Self {
+            views: Vec::new(),
+            in_progress: Vec::new(),
+            completed: Vec::new(),
+            max_block_size: BYTE_VIEW_MAX_BLOCK_SIZE,
+            nulls: MaybeNullBufferBuilder::new(),
+            _phantom: PhantomData {},
+        }
+    }
+
+    /// Set the max block size
+    fn with_max_block_size(mut self, max_block_size: usize) -> Self {
+        self.max_block_size = max_block_size;
+        self
+    }
+
+    fn append_val_inner(&mut self, array: &ArrayRef, row: usize)
+    where
+        B: ByteViewType,
+    {
+        let arr = array.as_byte_view::<B>();
+
+        // Null row case, set and return
+        if arr.is_null(row) {
+            self.nulls.append(true);
+            self.views.push(0);
+            return;
+        }
+
+        // Not null row case
+        self.nulls.append(false);
+        let value: &[u8] = arr.value(row).as_ref();
+
+        let value_len = value.len();
+        let view = if value_len <= 12 {
+            make_view(value, 0, 0)
+        } else {
+            // First, ensure the block is big enough to hold the value
+            self.ensure_in_progress_big_enough(value_len);
+
+            // Append value
+            let buffer_index = self.completed.len();
+            let offset = self.in_progress.len();
+            self.in_progress.extend_from_slice(value);
+
+            make_view(value, buffer_index as u32, offset as u32)
+        };
+
+        // Append view
+        self.views.push(view);
+    }
+
+    fn ensure_in_progress_big_enough(&mut self, value_len: usize) {
+        debug_assert!(value_len > 12);
+        let require_cap = self.in_progress.len() + value_len;
+
+        // If current block isn't big enough, flush it and create a new in 
progress block
+        if require_cap > self.max_block_size {
+            let flushed_block = mem::replace(
+                &mut self.in_progress,
+                Vec::with_capacity(self.max_block_size),
+            );
+            let buffer = Buffer::from_vec(flushed_block);
+            self.completed.push(buffer);
+        }
+    }
+
+    fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) 
-> bool {
+        let array = array.as_byte_view::<B>();
+
+        // Check if nulls equal firstly
+        let exist_null = self.nulls.is_null(lhs_row);
+        let input_null = array.is_null(rhs_row);
+        if let Some(result) = nulls_equal_to(exist_null, input_null) {
+            return result;
+        }
+
+        // Otherwise, we need to check their values
+        let exist_view = self.views[lhs_row];
+        let exist_view_len = exist_view as u32;
+
+        let input_view = array.views()[rhs_row];
+        let input_view_len = input_view as u32;
+
+        // The check logic
+        //   - Check len equality
+        //   - If inlined, check inlined value
+        //   - If non-inlined, check prefix and then check value in buffer
+        //     when needed
+        if exist_view_len != input_view_len {
+            return false;
+        }
+
+        if exist_view_len <= 12 {
+            let exist_inline = unsafe {
+                GenericByteViewArray::<B>::inline_value(
+                    &exist_view,
+                    exist_view_len as usize,
+                )
+            };
+            let input_inline = unsafe {
+                GenericByteViewArray::<B>::inline_value(
+                    &input_view,
+                    input_view_len as usize,
+                )
+            };
+            exist_inline == input_inline
+        } else {
+            let exist_prefix =
+                unsafe { GenericByteViewArray::<B>::inline_value(&exist_view, 
4) };
+            let input_prefix =
+                unsafe { GenericByteViewArray::<B>::inline_value(&input_view, 
4) };
+
+            if exist_prefix != input_prefix {
+                return false;
+            }
+
+            let exist_full = {
+                let byte_view = ByteView::from(exist_view);
+                self.value(
+                    byte_view.buffer_index as usize,
+                    byte_view.offset as usize,
+                    byte_view.length as usize,
+                )
+            };
+            let input_full: &[u8] = unsafe { 
array.value_unchecked(rhs_row).as_ref() };
+            exist_full == input_full
+        }
+    }
+
+    fn value(&self, buffer_index: usize, offset: usize, length: usize) -> 
&[u8] {
+        debug_assert!(buffer_index <= self.completed.len());
+
+        if buffer_index < self.completed.len() {
+            let block = &self.completed[buffer_index];
+            &block[offset..offset + length]
+        } else {
+            &self.in_progress[offset..offset + length]
+        }
+    }
+
+    fn build_inner(self) -> ArrayRef {
+        let Self {
+            views,
+            in_progress,
+            mut completed,
+            nulls,
+            ..
+        } = self;
+
+        // Build nulls
+        let null_buffer = nulls.build();
+
+        // Build values
+        // Flush `in_progress` first
+        if !in_progress.is_empty() {
+            let buffer = Buffer::from(in_progress);
+            completed.push(buffer);
+        }
+
+        let views = ScalarBuffer::from(views);
+
+        // Safety:
+        // * all views were correctly made
+        // * (if utf8): Input was valid Utf8 so buffer contents are
+        // valid utf8 as well
+        unsafe {
+            Arc::new(GenericByteViewArray::<B>::new_unchecked(
+                views,
+                completed,
+                null_buffer,
+            ))
+        }
+    }
+
+    fn take_n_inner(&mut self, n: usize) -> ArrayRef {
+        debug_assert!(self.len() >= n);
+
+        // The `n == len` case, we need to take all
+        if self.len() == n {
+            let new_builder = 
Self::new().with_max_block_size(self.max_block_size);
+            let cur_builder = std::mem::replace(self, new_builder);
+            return cur_builder.build_inner();
+        }
+
+        // The `n < len` case
+        // Take n for nulls
+        let null_buffer = self.nulls.take_n(n);
+
+        // Take n for values:
+        //   - Take first n `view`s from `views`
+        //
+        //   - Find the last non-inlined `view`. If all are inlined,
+        //     we can build the array and return immediately; otherwise
+        //     we need to continue and process the related buffers
+        //
+        //   - Get the last related `buffer index`(let's name it `buffer index 
n`)
+        //     from last non-inlined `view`
+        //
+        //   - Take buffers. The key is knowing whether we need to take the
+        //     whole last related buffer. The logic is a bit complex; see
+        //     `take_buffers_with_whole_last`, `take_buffers_with_partial_last`
+        //     and the related steps below for details
+        //
+        //   - Shift the `buffer index` of remaining non-inlined `views`
+        //
+        let first_n_views = self.views.drain(0..n).collect::<Vec<_>>();
+
+        let last_non_inlined_view = first_n_views
+            .iter()
+            .rev()
+            .find(|view| ((**view) as u32) > 12);
+
+        // All taken views inlined
+        let Some(view) = last_non_inlined_view else {
+            let views = ScalarBuffer::from(first_n_views);
+
+            // Safety:
+            // * all views were correctly made
+            // * (if utf8): Input was valid Utf8 so buffer contents are
+            // valid utf8 as well
+            unsafe {
+                return Arc::new(GenericByteViewArray::<B>::new_unchecked(
+                    views,
+                    Vec::new(),
+                    null_buffer,
+                ));
+            }
+        };
+
+        // Unfortunately, some of the taken views are non-inlined
+        let view = ByteView::from(*view);
+        let last_remaining_buffer_index = view.buffer_index as usize;
+
+        // Check whether we should take the whole `last_remaining_buffer_index` buffer
+        let take_whole_last_buffer = self.should_take_whole_buffer(
+            last_remaining_buffer_index,
+            (view.offset + view.length) as usize,
+        );
+
+        // Take related buffers
+        let buffers = if take_whole_last_buffer {
+            self.take_buffers_with_whole_last(last_remaining_buffer_index)
+        } else {
+            self.take_buffers_with_partial_last(
+                last_remaining_buffer_index,
+                (view.offset + view.length) as usize,
+            )
+        };
+
+        // Shift `buffer index`s finally
+        let shifts = if take_whole_last_buffer {
+            last_remaining_buffer_index + 1
+        } else {
+            last_remaining_buffer_index
+        };
+
+        self.views.iter_mut().for_each(|view| {
+            if (*view as u32) > 12 {
+                let mut byte_view = ByteView::from(*view);
+                byte_view.buffer_index -= shifts as u32;
+                *view = byte_view.as_u128();
+            }
+        });
+
+        // Build array and return
+        let views = ScalarBuffer::from(first_n_views);
+
+        // Safety:
+        // * all views were correctly made
+        // * (if utf8): Input was valid Utf8 so buffer contents are
+        // valid utf8 as well
+        unsafe {
+            Arc::new(GenericByteViewArray::<B>::new_unchecked(
+                views,
+                buffers,
+                null_buffer,
+            ))
+        }
+    }
+
+    fn take_buffers_with_whole_last(
+        &mut self,
+        last_remaining_buffer_index: usize,
+    ) -> Vec<Buffer> {
+        if last_remaining_buffer_index == self.completed.len() {
+            self.flush_in_progress();
+        }
+        self.completed
+            .drain(0..last_remaining_buffer_index + 1)
+            .collect()
+    }
+
+    /// Take the buffers `0..=last_remaining_buffer_index`, where only the first
+    /// `last_take_len` bytes of the last buffer are referenced by the taken views.
+    ///
+    /// Buffers `0..last_remaining_buffer_index` are moved out of `completed`.
+    /// The last buffer is not removed from the builder: if it is in `completed`
+    /// it is cloned into the result, and if it is `in_progress` its first
+    /// `last_take_len` bytes are copied into a new buffer.
+    fn take_buffers_with_partial_last(
+        &mut self,
+        last_remaining_buffer_index: usize,
+        last_take_len: usize,
+    ) -> Vec<Buffer> {
+        // Index `completed.len()` denotes `in_progress`, larger values are invalid
+        debug_assert!(last_remaining_buffer_index <= self.completed.len());
+
+        let mut take_buffers = Vec::with_capacity(last_remaining_buffer_index + 1);
+
+        // Decide where the last buffer lives *before* draining: draining shifts
+        // the indices (and shrinks the length) of `completed`, so checking
+        // afterwards picks the wrong buffer when `last_remaining_buffer_index > 0`.
+        let last_is_completed = last_remaining_buffer_index < self.completed.len();
+
+        // Take `0 ~ last_remaining_buffer_index - 1` buffers
+        take_buffers.extend(self.completed.drain(0..last_remaining_buffer_index));
+
+        // Process the `last_remaining_buffer_index` buffer
+        let last_buffer = if last_is_completed {
+            // It is in `completed` and, after the drain above, sits at index 0
+            self.completed[0].clone()
+        } else {
+            // It is `in_progress`, copy the `0..last_take_len` part
+            let taken_last_buffer = self.in_progress[0..last_take_len].to_vec();
+            Buffer::from_vec(taken_last_buffer)
+        };
+        take_buffers.push(last_buffer);
+
+        take_buffers
+    }
+
+    #[inline]
+    fn should_take_whole_buffer(&self, buffer_index: usize, take_len: usize) 
-> bool {
+        if buffer_index < self.completed.len() {
+            take_len == self.completed[buffer_index].len()
+        } else {
+            take_len == self.in_progress.len()
+        }
+    }
+
+    fn flush_in_progress(&mut self) {
+        let flushed_block = mem::replace(
+            &mut self.in_progress,
+            Vec::with_capacity(self.max_block_size),
+        );
+        let buffer = Buffer::from_vec(flushed_block);
+        self.completed.push(buffer);
+    }
+}
+
+impl<B: ByteViewType> GroupColumn for ByteViewGroupValueBuilder<B> {
+    fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> 
bool {
+        self.equal_to_inner(lhs_row, array, rhs_row)
+    }
+
+    fn append_val(&mut self, array: &ArrayRef, row: usize) {
+        self.append_val_inner(array, row)
+    }
+
+    fn len(&self) -> usize {
+        self.views.len()
+    }
+
+    fn size(&self) -> usize {
+        let buffers_size = self
+            .completed
+            .iter()
+            .map(|buf| buf.capacity() * std::mem::size_of::<u8>())
+            .sum::<usize>();
+
+        self.nulls.allocated_size()
+            + self.views.capacity() * std::mem::size_of::<u128>()
+            + self.in_progress.capacity() * std::mem::size_of::<u8>()
+            + buffers_size
+            + std::mem::size_of::<Self>()
+    }
+
+    fn build(self: Box<Self>) -> ArrayRef {
+        Self::build_inner(*self)
+    }
+
+    fn take_n(&mut self, n: usize) -> ArrayRef {
+        self.take_n_inner(n)
+    }
+}
+
 /// Determines if the nullability of the existing and new input array can be 
used
 /// to short-circuit the comparison of the two values.
 ///
@@ -394,12 +822,17 @@ fn nulls_equal_to(lhs_null: bool, rhs_null: bool) -> 
Option<bool> {
 mod tests {
     use std::sync::Arc;
 
-    use arrow::datatypes::Int64Type;
-    use arrow_array::{ArrayRef, Int64Array, StringArray};
+    use arrow::{
+        array::AsArray,
+        datatypes::{Int64Type, StringViewType},
+    };
+    use arrow_array::{ArrayRef, Int64Array, StringArray, StringViewArray};
     use arrow_buffer::{BooleanBufferBuilder, NullBuffer};
     use datafusion_physical_expr::binary_map::OutputType;
 
-    use 
crate::aggregates::group_values::group_column::PrimitiveGroupValueBuilder;
+    use crate::aggregates::group_values::group_column::{
+        ByteViewGroupValueBuilder, PrimitiveGroupValueBuilder,
+    };
 
     use super::{ByteGroupValueBuilder, GroupColumn};
 
@@ -579,4 +1012,246 @@ mod tests {
         assert!(!builder.equal_to(4, &input_array, 4));
         assert!(builder.equal_to(5, &input_array, 5));
     }
+
+    #[test]
+    fn test_byte_view_append_val() {
+        let mut builder =
+            
ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(60);
+        let builder_array = StringViewArray::from(vec![
+            Some("this string is quite long"), // in buffer 0
+            Some("foo"),
+            None,
+            Some("bar"),
+            Some("this string is also quite long"), // buffer 0
+            Some("this string is quite long"),      // buffer 1
+            Some("bar"),
+        ]);
+        let builder_array: ArrayRef = Arc::new(builder_array);
+        for row in 0..builder_array.len() {
+            builder.append_val(&builder_array, row);
+        }
+
+        let output = Box::new(builder).build();
+        // should be 2 output buffers to hold all the data
+        assert_eq!(output.as_string_view().data_buffers().len(), 2,);
+        assert_eq!(&output, &builder_array)
+    }
+
+    #[test]
+    fn test_byte_view_equal_to() {
+        // Will cover such cases:
+        //   - exist null, input not null
+        //   - exist null, input null; values not equal
+        //   - exist null, input null; values equal
+        //   - exist not null, input null
+        //   - exist not null, input not null; value lens not equal
+        //   - exist not null, input not null; value not equal(inlined case)
+        //   - exist not null, input not null; value equal(inlined case)
+        //
+        //   - exist not null, input not null; value not equal
+        //     (non-inlined case + prefix not equal)
+        //
+        //   - exist not null, input not null; value not equal
+        //     (non-inlined case + value in `completed`)
+        //
+        //   - exist not null, input not null; value equal
+        //     (non-inlined case + value in `completed`)
+        //
+        //   - exist not null, input not null; value not equal
+        //     (non-inlined case + value in `in_progress`)
+        //
+        //   - exist not null, input not null; value equal
+        //     (non-inlined case + value in `in_progress`)
+
+        // Set the block size to 60 to ensure some non-inlined values are in `in_progress`,
+        // and some are in `completed`, so both branches of the `value` function are covered.
+        let mut builder =
+            
ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(60);
+        let builder_array = Arc::new(StringViewArray::from(vec![
+            None,
+            None,
+            None,
+            Some("foo"),
+            Some("bazz"),
+            Some("foo"),
+            Some("bar"),
+            Some("I am a long string for test eq in completed"),
+            Some("I am a long string for test eq in progress"),
+        ])) as ArrayRef;
+        builder.append_val(&builder_array, 0);
+        builder.append_val(&builder_array, 1);
+        builder.append_val(&builder_array, 2);
+        builder.append_val(&builder_array, 3);
+        builder.append_val(&builder_array, 4);
+        builder.append_val(&builder_array, 5);
+        builder.append_val(&builder_array, 6);
+        builder.append_val(&builder_array, 7);
+        builder.append_val(&builder_array, 8);
+
+        // Define input array
+        let (views, buffer, _nulls) = StringViewArray::from(vec![
+            Some("foo"),
+            Some("bar"), // set to null
+            None,
+            None,
+            Some("baz"),
+            Some("oof"),
+            Some("bar"),
+            Some("i am a long string for test eq in completed"),
+            Some("I am a long string for test eq in COMPLETED"),
+            Some("I am a long string for test eq in completed"),
+            Some("I am a long string for test eq in PROGRESS"),
+            Some("I am a long string for test eq in progress"),
+        ])
+        .into_parts();
+
+        // explicitly build a boolean buffer where one of the null values also 
happens to match
+        let mut boolean_buffer_builder = BooleanBufferBuilder::new(9);
+        boolean_buffer_builder.append(true);
+        boolean_buffer_builder.append(false); // this sets Some("bar") to null 
above
+        boolean_buffer_builder.append(false);
+        boolean_buffer_builder.append(false);
+        boolean_buffer_builder.append(true);
+        boolean_buffer_builder.append(true);
+        boolean_buffer_builder.append(true);
+        boolean_buffer_builder.append(true);
+        boolean_buffer_builder.append(true);
+        boolean_buffer_builder.append(true);
+        boolean_buffer_builder.append(true);
+        boolean_buffer_builder.append(true);
+        let nulls = NullBuffer::new(boolean_buffer_builder.finish());
+        let input_array =
+            Arc::new(StringViewArray::new(views, buffer, Some(nulls))) as 
ArrayRef;
+
+        // Check
+        assert!(!builder.equal_to(0, &input_array, 0));
+        assert!(builder.equal_to(1, &input_array, 1));
+        assert!(builder.equal_to(2, &input_array, 2));
+        assert!(!builder.equal_to(3, &input_array, 3));
+        assert!(!builder.equal_to(4, &input_array, 4));
+        assert!(!builder.equal_to(5, &input_array, 5));
+        assert!(builder.equal_to(6, &input_array, 6));
+        assert!(!builder.equal_to(7, &input_array, 7));
+        assert!(!builder.equal_to(7, &input_array, 8));
+        assert!(builder.equal_to(7, &input_array, 9));
+        assert!(!builder.equal_to(8, &input_array, 10));
+        assert!(builder.equal_to(8, &input_array, 11));
+    }
+
+    #[test]
+    fn test_byte_view_take_n() {
+        // ####### Define cases and init #######
+
+        // `take_n` is really complex, we should consider and test following 
situations:
+        //   1. Take nulls
+        //   2. Take all `inlined`s
+        //   3. Take non-inlined + partial last buffer in `completed`
+        //   4. Take non-inlined + whole last buffer in `completed`
+        //   5. Take non-inlined + partial last `in_progress`
+        //   6. Take non-inlined + whole last buffer in `in_progress`
+        //   7. Take all views at once
+
+        let mut builder =
+            
ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(60);
+        let input_array = StringViewArray::from(vec![
+            //  Test situation 1
+            None,
+            None,
+            // Test situation 2 (also test take null together)
+            None,
+            Some("foo"),
+            Some("bar"),
+            // Test situation 3 (also test take null + inlined)
+            None,
+            Some("foo"),
+            Some("this string is quite long"),
+            Some("this string is also quite long"),
+            // Test situation 4 (also test take null + inlined)
+            None,
+            Some("bar"),
+            Some("this string is quite long"),
+            // Test situation 5 (also test take null + inlined)
+            None,
+            Some("foo"),
+            Some("another string that is is quite long"),
+            Some("this string not so long"),
+            // Test situation 6 (also test take null + inlined + insert again 
after taking)
+            None,
+            Some("bar"),
+            Some("this string is quite long"),
+            // Insert 4 and take only 3 to ensure it goes down the path of situation 6
+            None,
+            // Finally, we create a new builder,  insert the whole array and 
then
+            // take whole at once for testing situation 7
+        ]);
+
+        let input_array: ArrayRef = Arc::new(input_array);
+        let first_ones_to_append = 16; // For testing situation 1~5
+        let second_ones_to_append = 4; // For testing situation 6
+        let final_ones_to_append = input_array.len(); // For testing situation 
7
+
+        // ####### Test situation 1~5 #######
+        for row in 0..first_ones_to_append {
+            builder.append_val(&input_array, row);
+        }
+
+        assert_eq!(builder.completed.len(), 2);
+        assert_eq!(builder.in_progress.len(), 59);
+
+        // Situation 1
+        let taken_array = builder.take_n(2);
+        assert_eq!(&taken_array, &input_array.slice(0, 2));
+
+        // Situation 2
+        let taken_array = builder.take_n(3);
+        assert_eq!(&taken_array, &input_array.slice(2, 3));
+
+        // Situation 3
+        let taken_array = builder.take_n(3);
+        assert_eq!(&taken_array, &input_array.slice(5, 3));
+
+        let taken_array = builder.take_n(1);
+        assert_eq!(&taken_array, &input_array.slice(8, 1));
+
+        // Situation 4
+        let taken_array = builder.take_n(3);
+        assert_eq!(&taken_array, &input_array.slice(9, 3));
+
+        // Situation 5
+        let taken_array = builder.take_n(3);
+        assert_eq!(&taken_array, &input_array.slice(12, 3));
+
+        let taken_array = builder.take_n(1);
+        assert_eq!(&taken_array, &input_array.slice(15, 1));
+
+        // ####### Test situation 6 #######
+        assert!(builder.completed.is_empty());
+        assert!(builder.in_progress.is_empty());
+        assert!(builder.views.is_empty());
+
+        for row in first_ones_to_append..first_ones_to_append + 
second_ones_to_append {
+            builder.append_val(&input_array, row);
+        }
+
+        assert!(builder.completed.is_empty());
+        assert_eq!(builder.in_progress.len(), 25);
+
+        let taken_array = builder.take_n(3);
+        assert_eq!(&taken_array, &input_array.slice(16, 3));
+
+        // ####### Test situation 7 #######
+        // Create a new builder
+        let mut builder =
+            
ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(60);
+
+        for row in 0..final_ones_to_append {
+            builder.append_val(&input_array, row);
+        }
+
+        assert_eq!(builder.completed.len(), 3);
+        assert_eq!(builder.in_progress.len(), 25);
+
+        let taken_array = builder.take_n(final_ones_to_append);
+        assert_eq!(&taken_array, &input_array);
+    }
 }
diff --git a/datafusion/sqllogictest/test_files/group_by.slt 
b/datafusion/sqllogictest/test_files/group_by.slt
index a80a0891e9..8202b806a7 100644
--- a/datafusion/sqllogictest/test_files/group_by.slt
+++ b/datafusion/sqllogictest/test_files/group_by.slt
@@ -5208,3 +5208,65 @@ NULL a 2
 
 statement ok
 drop table t;
+
+# test multi group by int + utf8view
+statement ok
+create table source as values
+-- use some strings that are larger than 12 characters as that goes through a 
different path
+(1, 'a'),
+(1, 'a'),
+(2, 'thisstringislongerthan12'),
+(2, 'thisstring'),
+(3, 'abc'),
+(3, 'cba'),
+(2, 'thisstring'),
+(null, null),
+(null, 'a'),
+(null, null),
+(null, 'a'),
+(2, 'thisstringisalsolongerthan12'),
+(2, 'thisstringislongerthan12'),
+(1, 'null')
+;
+
+statement ok
+create view t as select column1 as a, arrow_cast(column2, 'Utf8View') as b 
from source;
+
+query ITI
+select a, b, count(*) from t group by a, b order by a, b;
+----
+1 a 2
+1 null 1
+2 thisstring 2
+2 thisstringisalsolongerthan12 1
+2 thisstringislongerthan12 2
+3 abc 1
+3 cba 1
+NULL a 2
+NULL NULL 2
+
+statement ok
+drop view t
+
+# test with binary view
+statement ok
+create view t as select column1 as a, arrow_cast(column2, 'BinaryView') as b 
from source;
+
+query I?I
+select a, b, count(*) from t group by a, b order by a, b;
+----
+1 61 2
+1 6e756c6c 1
+2 74686973737472696e67 2
+2 74686973737472696e676973616c736f6c6f6e6765727468616e3132 1
+2 74686973737472696e6769736c6f6e6765727468616e3132 2
+3 616263 1
+3 636261 1
+NULL 61 2
+NULL NULL 2
+
+statement ok
+drop view t
+
+statement ok
+drop table source;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to