This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new c311cf5ef8 Implement `GroupColumn` support for `StringView` /
`ByteView` (faster grouping performance) (#12809)
c311cf5ef8 is described below
commit c311cf5ef8205912a092fcae8748ae26201a1f9a
Author: kamille <[email protected]>
AuthorDate: Wed Oct 16 22:08:14 2024 +0800
Implement `GroupColumn` support for `StringView` / `ByteView` (faster
grouping performance) (#12809)
* define `ByteGroupValueViewBuilder`.
* impl append.
* impl equal to.
* fix compile.
* fix comments.
* impl take_n.
* impl build.
* impl rest functions in `GroupColumn`.
* fix output when panic.
* add e2e sql tests.
* add unit tests.
* switch to a really elegant style codes from alamb.
* fix take_n.
* improve comments.
* fix compile.
* fix clippy.
* define more testcases in `test_byte_view_take_n`.
* connect up.
* fix doc.
* Do not re-validate output is utf8
* switch to unchecked when building array.
* improve naming.
* use let else to make the codes clearer.
* fix typo.
* improve unit test coverage for `ByteViewGroupValueBuilder`.
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
.../src/aggregates/group_values/column.rs | 18 +-
.../src/aggregates/group_values/group_column.rs | 681 ++++++++++++++++++++-
datafusion/sqllogictest/test_files/group_by.slt | 62 ++
3 files changed, 755 insertions(+), 6 deletions(-)
diff --git a/datafusion/physical-plan/src/aggregates/group_values/column.rs
b/datafusion/physical-plan/src/aggregates/group_values/column.rs
index 28f35b2bde..4ad75844f7 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/column.rs
@@ -16,14 +16,16 @@
// under the License.
use crate::aggregates::group_values::group_column::{
- ByteGroupValueBuilder, GroupColumn, PrimitiveGroupValueBuilder,
+ ByteGroupValueBuilder, ByteViewGroupValueBuilder, GroupColumn,
+ PrimitiveGroupValueBuilder,
};
use crate::aggregates::group_values::GroupValues;
use ahash::RandomState;
use arrow::compute::cast;
use arrow::datatypes::{
- Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type,
Int64Type,
- Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
+ BinaryViewType, Date32Type, Date64Type, Float32Type, Float64Type,
Int16Type,
+ Int32Type, Int64Type, Int8Type, StringViewType, UInt16Type, UInt32Type,
UInt64Type,
+ UInt8Type,
};
use arrow::record_batch::RecordBatch;
use arrow_array::{Array, ArrayRef};
@@ -119,6 +121,8 @@ impl GroupValuesColumn {
| DataType::LargeBinary
| DataType::Date32
| DataType::Date64
+ | DataType::Utf8View
+ | DataType::BinaryView
)
}
}
@@ -184,6 +188,14 @@ impl GroupValues for GroupValuesColumn {
let b =
ByteGroupValueBuilder::<i64>::new(OutputType::Binary);
v.push(Box::new(b) as _)
}
+ &DataType::Utf8View => {
+ let b =
ByteViewGroupValueBuilder::<StringViewType>::new();
+ v.push(Box::new(b) as _)
+ }
+ &DataType::BinaryView => {
+ let b =
ByteViewGroupValueBuilder::<BinaryViewType>::new();
+ v.push(Box::new(b) as _)
+ }
dt => {
return not_impl_err!("{dt} not supported in
GroupValuesColumn")
}
diff --git
a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
index 5d00f300e9..4153495860 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/group_column.rs
@@ -15,7 +15,9 @@
// specific language governing permissions and limitations
// under the License.
+use arrow::array::make_view;
use arrow::array::BufferBuilder;
+use arrow::array::ByteView;
use arrow::array::GenericBinaryArray;
use arrow::array::GenericStringArray;
use arrow::array::OffsetSizeTrait;
@@ -24,16 +26,23 @@ use arrow::array::{Array, ArrayRef, ArrowPrimitiveType,
AsArray};
use arrow::buffer::OffsetBuffer;
use arrow::buffer::ScalarBuffer;
use arrow::datatypes::ByteArrayType;
+use arrow::datatypes::ByteViewType;
use arrow::datatypes::DataType;
use arrow::datatypes::GenericBinaryType;
+use arrow_array::GenericByteViewArray;
+use arrow_buffer::Buffer;
use datafusion_common::utils::proxy::VecAllocExt;
use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
use arrow_array::types::GenericStringType;
use datafusion_physical_expr_common::binary_map::{OutputType,
INITIAL_BUFFER_CAPACITY};
+use std::marker::PhantomData;
+use std::mem;
use std::sync::Arc;
use std::vec;
+const BYTE_VIEW_MAX_BLOCK_SIZE: usize = 2 * 1024 * 1024;
+
/// Trait for storing a single column of group values in [`GroupValuesColumn`]
///
/// Implementations of this trait store an in-progress collection of group
values
@@ -376,6 +385,425 @@ where
}
}
+/// An implementation of [`GroupColumn`] for binary view and utf8 view types.
+///
+/// Stores a collection of binary view or utf8 view group values in a buffer
+/// whose structure is similar to `GenericByteViewArray`, and we can get
benefits:
+///
+/// 1. Efficient comparison of incoming rows to existing rows
+/// 2. Efficient construction of the final output array
+/// 3. Efficient to perform `take_n` comparing to use `GenericByteViewBuilder`
+pub struct ByteViewGroupValueBuilder<B: ByteViewType> {
+ /// The views of string values
+ ///
+ /// If string len <= 12, the view's format will be:
+ /// string(12B) | len(4B)
+ ///
+ /// If string len > 12, its format will be:
+ /// offset(4B) | buffer_index(4B) | prefix(4B) | len(4B)
+ views: Vec<u128>,
+
+ /// The progressing block
+ ///
+ /// New values will be inserted into it until its capacity
+ /// is not enough(detail can see `max_block_size`).
+ in_progress: Vec<u8>,
+
+ /// The completed blocks
+ completed: Vec<Buffer>,
+
+ /// The max size of `in_progress`
+ ///
+ /// `in_progress` will be flushed into `completed`, and create new
`in_progress`
+ /// when found its remaining capacity(`max_block_size` -
`len(in_progress)`),
+ /// is no enough to store the appended value.
+ ///
+ /// Currently it is fixed at 2MB.
+ max_block_size: usize,
+
+ /// Nulls
+ nulls: MaybeNullBufferBuilder,
+
+ /// phantom data so the type requires `<B>`
+ _phantom: PhantomData<B>,
+}
+
+impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
+ pub fn new() -> Self {
+ Self {
+ views: Vec::new(),
+ in_progress: Vec::new(),
+ completed: Vec::new(),
+ max_block_size: BYTE_VIEW_MAX_BLOCK_SIZE,
+ nulls: MaybeNullBufferBuilder::new(),
+ _phantom: PhantomData {},
+ }
+ }
+
+ /// Set the max block size
+ fn with_max_block_size(mut self, max_block_size: usize) -> Self {
+ self.max_block_size = max_block_size;
+ self
+ }
+
+ fn append_val_inner(&mut self, array: &ArrayRef, row: usize)
+ where
+ B: ByteViewType,
+ {
+ let arr = array.as_byte_view::<B>();
+
+ // Null row case, set and return
+ if arr.is_null(row) {
+ self.nulls.append(true);
+ self.views.push(0);
+ return;
+ }
+
+ // Not null row case
+ self.nulls.append(false);
+ let value: &[u8] = arr.value(row).as_ref();
+
+ let value_len = value.len();
+ let view = if value_len <= 12 {
+ make_view(value, 0, 0)
+ } else {
+ // Ensure big enough block to hold the value firstly
+ self.ensure_in_progress_big_enough(value_len);
+
+ // Append value
+ let buffer_index = self.completed.len();
+ let offset = self.in_progress.len();
+ self.in_progress.extend_from_slice(value);
+
+ make_view(value, buffer_index as u32, offset as u32)
+ };
+
+ // Append view
+ self.views.push(view);
+ }
+
+ fn ensure_in_progress_big_enough(&mut self, value_len: usize) {
+ debug_assert!(value_len > 12);
+ let require_cap = self.in_progress.len() + value_len;
+
+ // If current block isn't big enough, flush it and create a new in
progress block
+ if require_cap > self.max_block_size {
+ let flushed_block = mem::replace(
+ &mut self.in_progress,
+ Vec::with_capacity(self.max_block_size),
+ );
+ let buffer = Buffer::from_vec(flushed_block);
+ self.completed.push(buffer);
+ }
+ }
+
+ fn equal_to_inner(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize)
-> bool {
+ let array = array.as_byte_view::<B>();
+
+ // Check if nulls equal firstly
+ let exist_null = self.nulls.is_null(lhs_row);
+ let input_null = array.is_null(rhs_row);
+ if let Some(result) = nulls_equal_to(exist_null, input_null) {
+ return result;
+ }
+
+ // Otherwise, we need to check their values
+ let exist_view = self.views[lhs_row];
+ let exist_view_len = exist_view as u32;
+
+ let input_view = array.views()[rhs_row];
+ let input_view_len = input_view as u32;
+
+ // The check logic
+ // - Check len equality
+ // - If inlined, check inlined value
+ // - If non-inlined, check prefix and then check value in buffer
+ // when needed
+ if exist_view_len != input_view_len {
+ return false;
+ }
+
+ if exist_view_len <= 12 {
+ let exist_inline = unsafe {
+ GenericByteViewArray::<B>::inline_value(
+ &exist_view,
+ exist_view_len as usize,
+ )
+ };
+ let input_inline = unsafe {
+ GenericByteViewArray::<B>::inline_value(
+ &input_view,
+ input_view_len as usize,
+ )
+ };
+ exist_inline == input_inline
+ } else {
+ let exist_prefix =
+ unsafe { GenericByteViewArray::<B>::inline_value(&exist_view,
4) };
+ let input_prefix =
+ unsafe { GenericByteViewArray::<B>::inline_value(&input_view,
4) };
+
+ if exist_prefix != input_prefix {
+ return false;
+ }
+
+ let exist_full = {
+ let byte_view = ByteView::from(exist_view);
+ self.value(
+ byte_view.buffer_index as usize,
+ byte_view.offset as usize,
+ byte_view.length as usize,
+ )
+ };
+ let input_full: &[u8] = unsafe {
array.value_unchecked(rhs_row).as_ref() };
+ exist_full == input_full
+ }
+ }
+
+ fn value(&self, buffer_index: usize, offset: usize, length: usize) ->
&[u8] {
+ debug_assert!(buffer_index <= self.completed.len());
+
+ if buffer_index < self.completed.len() {
+ let block = &self.completed[buffer_index];
+ &block[offset..offset + length]
+ } else {
+ &self.in_progress[offset..offset + length]
+ }
+ }
+
+ fn build_inner(self) -> ArrayRef {
+ let Self {
+ views,
+ in_progress,
+ mut completed,
+ nulls,
+ ..
+ } = self;
+
+ // Build nulls
+ let null_buffer = nulls.build();
+
+ // Build values
+ // Flush `in_process` firstly
+ if !in_progress.is_empty() {
+ let buffer = Buffer::from(in_progress);
+ completed.push(buffer);
+ }
+
+ let views = ScalarBuffer::from(views);
+
+ // Safety:
+ // * all views were correctly made
+ // * (if utf8): Input was valid Utf8 so buffer contents are
+ // valid utf8 as well
+ unsafe {
+ Arc::new(GenericByteViewArray::<B>::new_unchecked(
+ views,
+ completed,
+ null_buffer,
+ ))
+ }
+ }
+
+ fn take_n_inner(&mut self, n: usize) -> ArrayRef {
+ debug_assert!(self.len() >= n);
+
+ // The `n == len` case, we need to take all
+ if self.len() == n {
+ let new_builder =
Self::new().with_max_block_size(self.max_block_size);
+ let cur_builder = std::mem::replace(self, new_builder);
+ return cur_builder.build_inner();
+ }
+
+ // The `n < len` case
+ // Take n for nulls
+ let null_buffer = self.nulls.take_n(n);
+
+ // Take n for values:
+ // - Take first n `view`s from `views`
+ //
+ // - Find the last non-inlined `view`, if all inlined,
+ // we can build array and return happily, otherwise we
+ // we need to continue to process related buffers
+ //
+ // - Get the last related `buffer index`(let's name it `buffer index
n`)
+ // from last non-inlined `view`
+ //
+ // - Take buffers, the key is that we need to know if we need to take
+ // the whole last related buffer. The logic is a bit complex, you
can
+ // detail in `take_buffers_with_whole_last`,
`take_buffers_with_partial_last`
+ // and other related steps in following
+ //
+ // - Shift the `buffer index` of remaining non-inlined `views`
+ //
+ let first_n_views = self.views.drain(0..n).collect::<Vec<_>>();
+
+ let last_non_inlined_view = first_n_views
+ .iter()
+ .rev()
+ .find(|view| ((**view) as u32) > 12);
+
+ // All taken views inlined
+ let Some(view) = last_non_inlined_view else {
+ let views = ScalarBuffer::from(first_n_views);
+
+ // Safety:
+ // * all views were correctly made
+ // * (if utf8): Input was valid Utf8 so buffer contents are
+ // valid utf8 as well
+ unsafe {
+ return Arc::new(GenericByteViewArray::<B>::new_unchecked(
+ views,
+ Vec::new(),
+ null_buffer,
+ ));
+ }
+ };
+
+ // Unfortunately, some taken views non-inlined
+ let view = ByteView::from(*view);
+ let last_remaining_buffer_index = view.buffer_index as usize;
+
+ // Check should we take the whole `last_remaining_buffer_index` buffer
+ let take_whole_last_buffer = self.should_take_whole_buffer(
+ last_remaining_buffer_index,
+ (view.offset + view.length) as usize,
+ );
+
+ // Take related buffers
+ let buffers = if take_whole_last_buffer {
+ self.take_buffers_with_whole_last(last_remaining_buffer_index)
+ } else {
+ self.take_buffers_with_partial_last(
+ last_remaining_buffer_index,
+ (view.offset + view.length) as usize,
+ )
+ };
+
+ // Shift `buffer index`s finally
+ let shifts = if take_whole_last_buffer {
+ last_remaining_buffer_index + 1
+ } else {
+ last_remaining_buffer_index
+ };
+
+ self.views.iter_mut().for_each(|view| {
+ if (*view as u32) > 12 {
+ let mut byte_view = ByteView::from(*view);
+ byte_view.buffer_index -= shifts as u32;
+ *view = byte_view.as_u128();
+ }
+ });
+
+ // Build array and return
+ let views = ScalarBuffer::from(first_n_views);
+
+ // Safety:
+ // * all views were correctly made
+ // * (if utf8): Input was valid Utf8 so buffer contents are
+ // valid utf8 as well
+ unsafe {
+ Arc::new(GenericByteViewArray::<B>::new_unchecked(
+ views,
+ buffers,
+ null_buffer,
+ ))
+ }
+ }
+
+ fn take_buffers_with_whole_last(
+ &mut self,
+ last_remaining_buffer_index: usize,
+ ) -> Vec<Buffer> {
+ if last_remaining_buffer_index == self.completed.len() {
+ self.flush_in_progress();
+ }
+ self.completed
+ .drain(0..last_remaining_buffer_index + 1)
+ .collect()
+ }
+
+ fn take_buffers_with_partial_last(
+ &mut self,
+ last_remaining_buffer_index: usize,
+ last_take_len: usize,
+ ) -> Vec<Buffer> {
+ let mut take_buffers = Vec::with_capacity(last_remaining_buffer_index
+ 1);
+
+ // Take `0 ~ last_remaining_buffer_index - 1` buffers
+ if !self.completed.is_empty() || last_remaining_buffer_index == 0 {
+
take_buffers.extend(self.completed.drain(0..last_remaining_buffer_index));
+ }
+
+ // Process the `last_remaining_buffer_index` buffers
+ let last_buffer = if last_remaining_buffer_index <
self.completed.len() {
+ // If it is in `completed`, simply clone
+ self.completed[last_remaining_buffer_index].clone()
+ } else {
+ // If it is `in_progress`, copied `0 ~ offset` part
+ let taken_last_buffer =
self.in_progress[0..last_take_len].to_vec();
+ Buffer::from_vec(taken_last_buffer)
+ };
+ take_buffers.push(last_buffer);
+
+ take_buffers
+ }
+
+ #[inline]
+ fn should_take_whole_buffer(&self, buffer_index: usize, take_len: usize)
-> bool {
+ if buffer_index < self.completed.len() {
+ take_len == self.completed[buffer_index].len()
+ } else {
+ take_len == self.in_progress.len()
+ }
+ }
+
+ fn flush_in_progress(&mut self) {
+ let flushed_block = mem::replace(
+ &mut self.in_progress,
+ Vec::with_capacity(self.max_block_size),
+ );
+ let buffer = Buffer::from_vec(flushed_block);
+ self.completed.push(buffer);
+ }
+}
+
+impl<B: ByteViewType> GroupColumn for ByteViewGroupValueBuilder<B> {
+ fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) ->
bool {
+ self.equal_to_inner(lhs_row, array, rhs_row)
+ }
+
+ fn append_val(&mut self, array: &ArrayRef, row: usize) {
+ self.append_val_inner(array, row)
+ }
+
+ fn len(&self) -> usize {
+ self.views.len()
+ }
+
+ fn size(&self) -> usize {
+ let buffers_size = self
+ .completed
+ .iter()
+ .map(|buf| buf.capacity() * std::mem::size_of::<u8>())
+ .sum::<usize>();
+
+ self.nulls.allocated_size()
+ + self.views.capacity() * std::mem::size_of::<u128>()
+ + self.in_progress.capacity() * std::mem::size_of::<u8>()
+ + buffers_size
+ + std::mem::size_of::<Self>()
+ }
+
+ fn build(self: Box<Self>) -> ArrayRef {
+ Self::build_inner(*self)
+ }
+
+ fn take_n(&mut self, n: usize) -> ArrayRef {
+ self.take_n_inner(n)
+ }
+}
+
/// Determines if the nullability of the existing and new input array can be
used
/// to short-circuit the comparison of the two values.
///
@@ -394,12 +822,17 @@ fn nulls_equal_to(lhs_null: bool, rhs_null: bool) ->
Option<bool> {
mod tests {
use std::sync::Arc;
- use arrow::datatypes::Int64Type;
- use arrow_array::{ArrayRef, Int64Array, StringArray};
+ use arrow::{
+ array::AsArray,
+ datatypes::{Int64Type, StringViewType},
+ };
+ use arrow_array::{ArrayRef, Int64Array, StringArray, StringViewArray};
use arrow_buffer::{BooleanBufferBuilder, NullBuffer};
use datafusion_physical_expr::binary_map::OutputType;
- use
crate::aggregates::group_values::group_column::PrimitiveGroupValueBuilder;
+ use crate::aggregates::group_values::group_column::{
+ ByteViewGroupValueBuilder, PrimitiveGroupValueBuilder,
+ };
use super::{ByteGroupValueBuilder, GroupColumn};
@@ -579,4 +1012,246 @@ mod tests {
assert!(!builder.equal_to(4, &input_array, 4));
assert!(builder.equal_to(5, &input_array, 5));
}
+
+ #[test]
+ fn test_byte_view_append_val() {
+ let mut builder =
+
ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(60);
+ let builder_array = StringViewArray::from(vec![
+ Some("this string is quite long"), // in buffer 0
+ Some("foo"),
+ None,
+ Some("bar"),
+ Some("this string is also quite long"), // buffer 0
+ Some("this string is quite long"), // buffer 1
+ Some("bar"),
+ ]);
+ let builder_array: ArrayRef = Arc::new(builder_array);
+ for row in 0..builder_array.len() {
+ builder.append_val(&builder_array, row);
+ }
+
+ let output = Box::new(builder).build();
+ // should be 2 output buffers to hold all the data
+ assert_eq!(output.as_string_view().data_buffers().len(), 2,);
+ assert_eq!(&output, &builder_array)
+ }
+
+ #[test]
+ fn test_byte_view_equal_to() {
+ // Will cover such cases:
+ // - exist null, input not null
+ // - exist null, input null; values not equal
+ // - exist null, input null; values equal
+ // - exist not null, input null
+ // - exist not null, input not null; value lens not equal
+ // - exist not null, input not null; value not equal(inlined case)
+ // - exist not null, input not null; value equal(inlined case)
+ //
+ // - exist not null, input not null; value not equal
+ // (non-inlined case + prefix not equal)
+ //
+ // - exist not null, input not null; value not equal
+ // (non-inlined case + value in `completed`)
+ //
+ // - exist not null, input not null; value equal
+ // (non-inlined case + value in `completed`)
+ //
+ // - exist not null, input not null; value not equal
+ // (non-inlined case + value in `in_progress`)
+ //
+ // - exist not null, input not null; value equal
+ // (non-inlined case + value in `in_progress`)
+
+ // Set the block size to 40 for ensuring some unlined values are in
`in_progress`,
+ // and some are in `completed`, so both two branches in `value`
function can be covered.
+ let mut builder =
+
ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(60);
+ let builder_array = Arc::new(StringViewArray::from(vec![
+ None,
+ None,
+ None,
+ Some("foo"),
+ Some("bazz"),
+ Some("foo"),
+ Some("bar"),
+ Some("I am a long string for test eq in completed"),
+ Some("I am a long string for test eq in progress"),
+ ])) as ArrayRef;
+ builder.append_val(&builder_array, 0);
+ builder.append_val(&builder_array, 1);
+ builder.append_val(&builder_array, 2);
+ builder.append_val(&builder_array, 3);
+ builder.append_val(&builder_array, 4);
+ builder.append_val(&builder_array, 5);
+ builder.append_val(&builder_array, 6);
+ builder.append_val(&builder_array, 7);
+ builder.append_val(&builder_array, 8);
+
+ // Define input array
+ let (views, buffer, _nulls) = StringViewArray::from(vec![
+ Some("foo"),
+ Some("bar"), // set to null
+ None,
+ None,
+ Some("baz"),
+ Some("oof"),
+ Some("bar"),
+ Some("i am a long string for test eq in completed"),
+ Some("I am a long string for test eq in COMPLETED"),
+ Some("I am a long string for test eq in completed"),
+ Some("I am a long string for test eq in PROGRESS"),
+ Some("I am a long string for test eq in progress"),
+ ])
+ .into_parts();
+
+ // explicitly build a boolean buffer where one of the null values also
happens to match
+ let mut boolean_buffer_builder = BooleanBufferBuilder::new(9);
+ boolean_buffer_builder.append(true);
+ boolean_buffer_builder.append(false); // this sets Some("bar") to null
above
+ boolean_buffer_builder.append(false);
+ boolean_buffer_builder.append(false);
+ boolean_buffer_builder.append(true);
+ boolean_buffer_builder.append(true);
+ boolean_buffer_builder.append(true);
+ boolean_buffer_builder.append(true);
+ boolean_buffer_builder.append(true);
+ boolean_buffer_builder.append(true);
+ boolean_buffer_builder.append(true);
+ boolean_buffer_builder.append(true);
+ let nulls = NullBuffer::new(boolean_buffer_builder.finish());
+ let input_array =
+ Arc::new(StringViewArray::new(views, buffer, Some(nulls))) as
ArrayRef;
+
+ // Check
+ assert!(!builder.equal_to(0, &input_array, 0));
+ assert!(builder.equal_to(1, &input_array, 1));
+ assert!(builder.equal_to(2, &input_array, 2));
+ assert!(!builder.equal_to(3, &input_array, 3));
+ assert!(!builder.equal_to(4, &input_array, 4));
+ assert!(!builder.equal_to(5, &input_array, 5));
+ assert!(builder.equal_to(6, &input_array, 6));
+ assert!(!builder.equal_to(7, &input_array, 7));
+ assert!(!builder.equal_to(7, &input_array, 8));
+ assert!(builder.equal_to(7, &input_array, 9));
+ assert!(!builder.equal_to(8, &input_array, 10));
+ assert!(builder.equal_to(8, &input_array, 11));
+ }
+
+ #[test]
+ fn test_byte_view_take_n() {
+ // ####### Define cases and init #######
+
+ // `take_n` is really complex, we should consider and test following
situations:
+ // 1. Take nulls
+ // 2. Take all `inlined`s
+ // 3. Take non-inlined + partial last buffer in `completed`
+ // 4. Take non-inlined + whole last buffer in `completed`
+ // 5. Take non-inlined + partial last `in_progress`
+ // 6. Take non-inlined + whole last buffer in `in_progress`
+ // 7. Take all views at once
+
+ let mut builder =
+
ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(60);
+ let input_array = StringViewArray::from(vec![
+ // Test situation 1
+ None,
+ None,
+ // Test situation 2 (also test take null together)
+ None,
+ Some("foo"),
+ Some("bar"),
+ // Test situation 3 (also test take null + inlined)
+ None,
+ Some("foo"),
+ Some("this string is quite long"),
+ Some("this string is also quite long"),
+ // Test situation 4 (also test take null + inlined)
+ None,
+ Some("bar"),
+ Some("this string is quite long"),
+ // Test situation 5 (also test take null + inlined)
+ None,
+ Some("foo"),
+ Some("another string that is is quite long"),
+ Some("this string not so long"),
+ // Test situation 6 (also test take null + inlined + insert again
after taking)
+ None,
+ Some("bar"),
+ Some("this string is quite long"),
+ // Insert 4 and just take 3 to ensure it will go the path of
situation 6
+ None,
+ // Finally, we create a new builder, insert the whole array and
then
+ // take whole at once for testing situation 7
+ ]);
+
+ let input_array: ArrayRef = Arc::new(input_array);
+ let first_ones_to_append = 16; // For testing situation 1~5
+ let second_ones_to_append = 4; // For testing situation 6
+ let final_ones_to_append = input_array.len(); // For testing situation
7
+
+ // ####### Test situation 1~5 #######
+ for row in 0..first_ones_to_append {
+ builder.append_val(&input_array, row);
+ }
+
+ assert_eq!(builder.completed.len(), 2);
+ assert_eq!(builder.in_progress.len(), 59);
+
+ // Situation 1
+ let taken_array = builder.take_n(2);
+ assert_eq!(&taken_array, &input_array.slice(0, 2));
+
+ // Situation 2
+ let taken_array = builder.take_n(3);
+ assert_eq!(&taken_array, &input_array.slice(2, 3));
+
+ // Situation 3
+ let taken_array = builder.take_n(3);
+ assert_eq!(&taken_array, &input_array.slice(5, 3));
+
+ let taken_array = builder.take_n(1);
+ assert_eq!(&taken_array, &input_array.slice(8, 1));
+
+ // Situation 4
+ let taken_array = builder.take_n(3);
+ assert_eq!(&taken_array, &input_array.slice(9, 3));
+
+ // Situation 5
+ let taken_array = builder.take_n(3);
+ assert_eq!(&taken_array, &input_array.slice(12, 3));
+
+ let taken_array = builder.take_n(1);
+ assert_eq!(&taken_array, &input_array.slice(15, 1));
+
+ // ####### Test situation 6 #######
+ assert!(builder.completed.is_empty());
+ assert!(builder.in_progress.is_empty());
+ assert!(builder.views.is_empty());
+
+ for row in first_ones_to_append..first_ones_to_append +
second_ones_to_append {
+ builder.append_val(&input_array, row);
+ }
+
+ assert!(builder.completed.is_empty());
+ assert_eq!(builder.in_progress.len(), 25);
+
+ let taken_array = builder.take_n(3);
+ assert_eq!(&taken_array, &input_array.slice(16, 3));
+
+ // ####### Test situation 7 #######
+ // Create a new builder
+ let mut builder =
+
ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(60);
+
+ for row in 0..final_ones_to_append {
+ builder.append_val(&input_array, row);
+ }
+
+ assert_eq!(builder.completed.len(), 3);
+ assert_eq!(builder.in_progress.len(), 25);
+
+ let taken_array = builder.take_n(final_ones_to_append);
+ assert_eq!(&taken_array, &input_array);
+ }
}
diff --git a/datafusion/sqllogictest/test_files/group_by.slt
b/datafusion/sqllogictest/test_files/group_by.slt
index a80a0891e9..8202b806a7 100644
--- a/datafusion/sqllogictest/test_files/group_by.slt
+++ b/datafusion/sqllogictest/test_files/group_by.slt
@@ -5208,3 +5208,65 @@ NULL a 2
statement ok
drop table t;
+
+# test multi group by int + utf8view
+statement ok
+create table source as values
+-- use some strings that are larger than 12 characters as that goes through a
different path
+(1, 'a'),
+(1, 'a'),
+(2, 'thisstringislongerthan12'),
+(2, 'thisstring'),
+(3, 'abc'),
+(3, 'cba'),
+(2, 'thisstring'),
+(null, null),
+(null, 'a'),
+(null, null),
+(null, 'a'),
+(2, 'thisstringisalsolongerthan12'),
+(2, 'thisstringislongerthan12'),
+(1, 'null')
+;
+
+statement ok
+create view t as select column1 as a, arrow_cast(column2, 'Utf8View') as b
from source;
+
+query ITI
+select a, b, count(*) from t group by a, b order by a, b;
+----
+1 a 2
+1 null 1
+2 thisstring 2
+2 thisstringisalsolongerthan12 1
+2 thisstringislongerthan12 2
+3 abc 1
+3 cba 1
+NULL a 2
+NULL NULL 2
+
+statement ok
+drop view t
+
+# test with binary view
+statement ok
+create view t as select column1 as a, arrow_cast(column2, 'BinaryView') as b
from source;
+
+query I?I
+select a, b, count(*) from t group by a, b order by a, b;
+----
+1 61 2
+1 6e756c6c 1
+2 74686973737472696e67 2
+2 74686973737472696e676973616c736f6c6f6e6765727468616e3132 1
+2 74686973737472696e6769736c6f6e6765727468616e3132 2
+3 616263 1
+3 636261 1
+NULL 61 2
+NULL NULL 2
+
+statement ok
+drop view t
+
+statement ok
+drop table source;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]