This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 04af0c61c8 perf: Implement boolean group values (#17726)
04af0c61c8 is described below
commit 04af0c61c8e761b724b49da4c87f18cf27e2b4f8
Author: Eshed Schacham <[email protected]>
AuthorDate: Mon Oct 6 13:21:23 2025 +0200
perf: Implement boolean group values (#17726)
* chore: fix typos in group_values.
* perf: add support for boolean columns in single and multi group by values.
* remove extra finish
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
.../src/aggregates/group_values/mod.rs | 17 +-
.../multi_group_by/{primitive.rs => boolean.rs} | 219 ++++++++++-----------
.../group_values/multi_group_by/bytes.rs | 2 +-
.../aggregates/group_values/multi_group_by/mod.rs | 15 +-
.../group_values/multi_group_by/primitive.rs | 2 +-
.../group_values/single_group_by/boolean.rs | 154 +++++++++++++++
.../group_values/single_group_by/bytes.rs | 6 +-
.../aggregates/group_values/single_group_by/mod.rs | 1 +
8 files changed, 285 insertions(+), 131 deletions(-)
diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs
b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
index f2f489b722..316fbe11ae 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
@@ -40,8 +40,8 @@ pub(crate) use single_group_by::primitive::HashValue;
use crate::aggregates::{
group_values::single_group_by::{
- bytes::GroupValuesByes, bytes_view::GroupValuesBytesView,
- primitive::GroupValuesPrimitive,
+ boolean::GroupValuesBoolean, bytes::GroupValuesBytes,
+ bytes_view::GroupValuesBytesView, primitive::GroupValuesPrimitive,
},
order::GroupOrdering,
};
@@ -119,7 +119,7 @@ pub trait GroupValues: Send {
/// - If group by single column, and type of this column has
/// the specific [`GroupValues`] implementation, such implementation
/// will be chosen.
-///
+///
/// - If group by multiple columns, and all column types have the specific
/// `GroupColumn` implementations, `GroupValuesColumn` will be chosen.
///
@@ -174,23 +174,26 @@ pub fn new_group_values(
downcast_helper!(Decimal128Type, d);
}
DataType::Utf8 => {
- return
Ok(Box::new(GroupValuesByes::<i32>::new(OutputType::Utf8)));
+ return
Ok(Box::new(GroupValuesBytes::<i32>::new(OutputType::Utf8)));
}
DataType::LargeUtf8 => {
- return
Ok(Box::new(GroupValuesByes::<i64>::new(OutputType::Utf8)));
+ return
Ok(Box::new(GroupValuesBytes::<i64>::new(OutputType::Utf8)));
}
DataType::Utf8View => {
return
Ok(Box::new(GroupValuesBytesView::new(OutputType::Utf8View)));
}
DataType::Binary => {
- return
Ok(Box::new(GroupValuesByes::<i32>::new(OutputType::Binary)));
+ return
Ok(Box::new(GroupValuesBytes::<i32>::new(OutputType::Binary)));
}
DataType::LargeBinary => {
- return
Ok(Box::new(GroupValuesByes::<i64>::new(OutputType::Binary)));
+ return
Ok(Box::new(GroupValuesBytes::<i64>::new(OutputType::Binary)));
}
DataType::BinaryView => {
return
Ok(Box::new(GroupValuesBytesView::new(OutputType::BinaryView)));
}
+ DataType::Boolean => {
+ return Ok(Box::new(GroupValuesBoolean::new()));
+ }
_ => {}
}
}
diff --git
a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs
b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/boolean.rs
similarity index 64%
copy from
datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs
copy to
datafusion/physical-plan/src/aggregates/group_values/multi_group_by/boolean.rs
index afec25fd3d..91a9e21aeb 100644
---
a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs
+++
b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/boolean.rs
@@ -15,76 +15,61 @@
// specific language governing permissions and limitations
// under the License.
+use std::sync::Arc;
+
use crate::aggregates::group_values::multi_group_by::{nulls_equal_to,
GroupColumn};
use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
-use arrow::array::ArrowNativeTypeOp;
-use arrow::array::{cast::AsArray, Array, ArrayRef, ArrowPrimitiveType,
PrimitiveArray};
-use arrow::buffer::ScalarBuffer;
-use arrow::datatypes::DataType;
+use arrow::array::{Array as _, ArrayRef, AsArray, BooleanArray,
BooleanBufferBuilder};
use datafusion_common::Result;
-use datafusion_execution::memory_pool::proxy::VecAllocExt;
use itertools::izip;
-use std::iter;
-use std::sync::Arc;
-/// An implementation of [`GroupColumn`] for primitive values
+/// An implementation of [`GroupColumn`] for booleans
///
/// Optimized to skip null buffer construction if the input is known to be non
nullable
///
/// # Template parameters
///
-/// `T`: the native Rust type that stores the data
/// `NULLABLE`: if the data can contain any nulls
#[derive(Debug)]
-pub struct PrimitiveGroupValueBuilder<T: ArrowPrimitiveType, const NULLABLE:
bool> {
- data_type: DataType,
- group_values: Vec<T::Native>,
+pub struct BooleanGroupValueBuilder<const NULLABLE: bool> {
+ buffer: BooleanBufferBuilder,
nulls: MaybeNullBufferBuilder,
}
-impl<T, const NULLABLE: bool> PrimitiveGroupValueBuilder<T, NULLABLE>
-where
- T: ArrowPrimitiveType,
-{
- /// Create a new `PrimitiveGroupValueBuilder`
- pub fn new(data_type: DataType) -> Self {
+impl<const NULLABLE: bool> BooleanGroupValueBuilder<NULLABLE> {
+ /// Create a new `BooleanGroupValueBuilder`
+ pub fn new() -> Self {
Self {
- data_type,
- group_values: vec![],
+ buffer: BooleanBufferBuilder::new(0),
nulls: MaybeNullBufferBuilder::new(),
}
}
}
-impl<T: ArrowPrimitiveType, const NULLABLE: bool> GroupColumn
- for PrimitiveGroupValueBuilder<T, NULLABLE>
-{
+impl<const NULLABLE: bool> GroupColumn for BooleanGroupValueBuilder<NULLABLE> {
fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) ->
bool {
- // Perf: skip null check (by short circuit) if input is not nullable
if NULLABLE {
let exist_null = self.nulls.is_null(lhs_row);
let input_null = array.is_null(rhs_row);
if let Some(result) = nulls_equal_to(exist_null, input_null) {
return result;
}
- // Otherwise, we need to check their values
}
- self.group_values[lhs_row] == array.as_primitive::<T>().value(rhs_row)
+ self.buffer.get_bit(lhs_row) == array.as_boolean().value(rhs_row)
}
fn append_val(&mut self, array: &ArrayRef, row: usize) -> Result<()> {
- // Perf: skip null check if input can't have nulls
if NULLABLE {
if array.is_null(row) {
self.nulls.append(true);
- self.group_values.push(T::default_value());
+ self.buffer.append(bool::default());
} else {
self.nulls.append(false);
- self.group_values.push(array.as_primitive::<T>().value(row));
+ self.buffer.append(array.as_boolean().value(row));
}
} else {
- self.group_values.push(array.as_primitive::<T>().value(row));
+ self.buffer.append(array.as_boolean().value(row));
}
Ok(())
@@ -97,7 +82,7 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool> GroupColumn
rhs_rows: &[usize],
equal_to_results: &mut [bool],
) {
- let array = array.as_primitive::<T>();
+ let array = array.as_boolean();
let iter = izip!(
lhs_rows.iter(),
@@ -111,7 +96,6 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool> GroupColumn
continue;
}
- // Perf: skip null check (by short circuit) if input is not
nullable
if NULLABLE {
let exist_null = self.nulls.is_null(lhs_row);
let input_null = array.is_null(rhs_row);
@@ -119,15 +103,14 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool>
GroupColumn
*equal_to_result = result;
continue;
}
- // Otherwise, we need to check their values
}
- *equal_to_result =
self.group_values[lhs_row].is_eq(array.value(rhs_row));
+ *equal_to_result = self.buffer.get_bit(lhs_row) ==
array.value(rhs_row);
}
}
fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize]) ->
Result<()> {
- let arr = array.as_primitive::<T>();
+ let arr = array.as_boolean();
let null_count = array.null_count();
let num_rows = array.len();
@@ -144,10 +127,10 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool>
GroupColumn
for &row in rows {
if array.is_null(row) {
self.nulls.append(true);
- self.group_values.push(T::default_value());
+ self.buffer.append(bool::default());
} else {
self.nulls.append(false);
- self.group_values.push(arr.value(row));
+ self.buffer.append(arr.value(row));
}
}
}
@@ -155,19 +138,18 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool>
GroupColumn
(true, Some(true)) => {
self.nulls.append_n(rows.len(), false);
for &row in rows {
- self.group_values.push(arr.value(row));
+ self.buffer.append(arr.value(row));
}
}
(true, Some(false)) => {
self.nulls.append_n(rows.len(), true);
- self.group_values
- .extend(iter::repeat_n(T::default_value(), rows.len()));
+ self.buffer.append_n(rows.len(), bool::default());
}
(false, _) => {
for &row in rows {
- self.group_values.push(arr.value(row));
+ self.buffer.append(arr.value(row));
}
}
}
@@ -176,55 +158,49 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool>
GroupColumn
}
fn len(&self) -> usize {
- self.group_values.len()
+ self.buffer.len()
}
fn size(&self) -> usize {
- self.group_values.allocated_size() + self.nulls.allocated_size()
+ self.buffer.capacity() / 8 + self.nulls.allocated_size()
}
fn build(self: Box<Self>) -> ArrayRef {
- let Self {
- data_type,
- group_values,
- nulls,
- } = *self;
+ let Self { mut buffer, nulls } = *self;
let nulls = nulls.build();
if !NULLABLE {
assert!(nulls.is_none(), "unexpected nulls in non nullable input");
}
- let arr = PrimitiveArray::<T>::new(ScalarBuffer::from(group_values),
nulls);
- // Set timezone information for timestamp
- Arc::new(arr.with_data_type(data_type))
+ let arr = BooleanArray::new(buffer.finish(), nulls);
+
+ Arc::new(arr)
}
fn take_n(&mut self, n: usize) -> ArrayRef {
- let first_n = self.group_values.drain(0..n).collect::<Vec<_>>();
-
let first_n_nulls = if NULLABLE { self.nulls.take_n(n) } else { None };
- Arc::new(
- PrimitiveArray::<T>::new(ScalarBuffer::from(first_n),
first_n_nulls)
- .with_data_type(self.data_type.clone()),
- )
+ let mut new_builder = BooleanBufferBuilder::new(self.buffer.len());
+ new_builder.append_packed_range(n..self.buffer.len(),
self.buffer.as_slice());
+ std::mem::swap(&mut new_builder, &mut self.buffer);
+
+ // take only first n values from the original builder
+ new_builder.truncate(n);
+
+ Arc::new(BooleanArray::new(new_builder.finish(), first_n_nulls))
}
}
#[cfg(test)]
mod tests {
- use std::sync::Arc;
+ use arrow::array::NullBufferBuilder;
- use
crate::aggregates::group_values::multi_group_by::primitive::PrimitiveGroupValueBuilder;
- use arrow::array::{ArrayRef, Int64Array, NullBufferBuilder};
- use arrow::datatypes::{DataType, Int64Type};
-
- use super::GroupColumn;
+ use super::*;
#[test]
- fn test_nullable_primitive_equal_to() {
- let append = |builder: &mut PrimitiveGroupValueBuilder<Int64Type,
true>,
+ fn test_nullable_boolean_equal_to() {
+ let append = |builder: &mut BooleanGroupValueBuilder<true>,
builder_array: &ArrayRef,
append_rows: &[usize]| {
for &index in append_rows {
@@ -232,7 +208,7 @@ mod tests {
}
};
- let equal_to = |builder: &PrimitiveGroupValueBuilder<Int64Type, true>,
+ let equal_to = |builder: &BooleanGroupValueBuilder<true>,
lhs_rows: &[usize],
input_array: &ArrayRef,
rhs_rows: &[usize],
@@ -243,12 +219,12 @@ mod tests {
}
};
- test_nullable_primitive_equal_to_internal(append, equal_to);
+ test_nullable_boolean_equal_to_internal(append, equal_to);
}
#[test]
fn test_nullable_primitive_vectorized_equal_to() {
- let append = |builder: &mut PrimitiveGroupValueBuilder<Int64Type,
true>,
+ let append = |builder: &mut BooleanGroupValueBuilder<true>,
builder_array: &ArrayRef,
append_rows: &[usize]| {
builder
@@ -256,7 +232,7 @@ mod tests {
.unwrap();
};
- let equal_to = |builder: &PrimitiveGroupValueBuilder<Int64Type, true>,
+ let equal_to = |builder: &BooleanGroupValueBuilder<true>,
lhs_rows: &[usize],
input_array: &ArrayRef,
rhs_rows: &[usize],
@@ -269,14 +245,14 @@ mod tests {
);
};
- test_nullable_primitive_equal_to_internal(append, equal_to);
+ test_nullable_boolean_equal_to_internal(append, equal_to);
}
- fn test_nullable_primitive_equal_to_internal<A, E>(mut append: A, mut
equal_to: E)
+ fn test_nullable_boolean_equal_to_internal<A, E>(mut append: A, mut
equal_to: E)
where
- A: FnMut(&mut PrimitiveGroupValueBuilder<Int64Type, true>, &ArrayRef,
&[usize]),
+ A: FnMut(&mut BooleanGroupValueBuilder<true>, &ArrayRef, &[usize]),
E: FnMut(
- &PrimitiveGroupValueBuilder<Int64Type, true>,
+ &BooleanGroupValueBuilder<true>,
&[usize],
&ArrayRef,
&[usize],
@@ -292,32 +268,37 @@ mod tests {
// - exist not null, input not null; values equal
// Define PrimitiveGroupValueBuilder
- let mut builder =
- PrimitiveGroupValueBuilder::<Int64Type,
true>::new(DataType::Int64);
- let builder_array = Arc::new(Int64Array::from(vec![
+ let mut builder = BooleanGroupValueBuilder::<true>::new();
+ let builder_array = Arc::new(BooleanArray::from(vec![
None,
None,
None,
- Some(1),
- Some(2),
- Some(3),
+ Some(true),
+ Some(false),
+ Some(true),
])) as ArrayRef;
append(&mut builder, &builder_array, &[0, 1, 2, 3, 4, 5]);
// Define input array
- let (_nulls, values, _) =
- Int64Array::from(vec![Some(1), Some(2), None, None, Some(1),
Some(3)])
- .into_parts();
+ let (values, _nulls) = BooleanArray::from(vec![
+ Some(true),
+ Some(false),
+ None,
+ None,
+ Some(true),
+ Some(true),
+ ])
+ .into_parts();
// explicitly build a null buffer where one of the null values also
happens to match
let mut nulls = NullBufferBuilder::new(6);
nulls.append_non_null();
- nulls.append_null(); // this sets Some(2) to null above
+ nulls.append_null(); // this sets Some(false) to null above
nulls.append_null();
nulls.append_null();
nulls.append_non_null();
nulls.append_non_null();
- let input_array = Arc::new(Int64Array::new(values, nulls.finish())) as
ArrayRef;
+ let input_array = Arc::new(BooleanArray::new(values, nulls.finish()))
as ArrayRef;
// Check
let mut equal_to_results = vec![true; builder.len()];
@@ -339,7 +320,7 @@ mod tests {
#[test]
fn test_not_nullable_primitive_equal_to() {
- let append = |builder: &mut PrimitiveGroupValueBuilder<Int64Type,
false>,
+ let append = |builder: &mut BooleanGroupValueBuilder<false>,
builder_array: &ArrayRef,
append_rows: &[usize]| {
for &index in append_rows {
@@ -347,7 +328,7 @@ mod tests {
}
};
- let equal_to = |builder: &PrimitiveGroupValueBuilder<Int64Type, false>,
+ let equal_to = |builder: &BooleanGroupValueBuilder<false>,
lhs_rows: &[usize],
input_array: &ArrayRef,
rhs_rows: &[usize],
@@ -358,12 +339,12 @@ mod tests {
}
};
- test_not_nullable_primitive_equal_to_internal(append, equal_to);
+ test_not_nullable_boolean_equal_to_internal(append, equal_to);
}
#[test]
fn test_not_nullable_primitive_vectorized_equal_to() {
- let append = |builder: &mut PrimitiveGroupValueBuilder<Int64Type,
false>,
+ let append = |builder: &mut BooleanGroupValueBuilder<false>,
builder_array: &ArrayRef,
append_rows: &[usize]| {
builder
@@ -371,7 +352,7 @@ mod tests {
.unwrap();
};
- let equal_to = |builder: &PrimitiveGroupValueBuilder<Int64Type, false>,
+ let equal_to = |builder: &BooleanGroupValueBuilder<false>,
lhs_rows: &[usize],
input_array: &ArrayRef,
rhs_rows: &[usize],
@@ -384,14 +365,14 @@ mod tests {
);
};
- test_not_nullable_primitive_equal_to_internal(append, equal_to);
+ test_not_nullable_boolean_equal_to_internal(append, equal_to);
}
- fn test_not_nullable_primitive_equal_to_internal<A, E>(mut append: A, mut
equal_to: E)
+ fn test_not_nullable_boolean_equal_to_internal<A, E>(mut append: A, mut
equal_to: E)
where
- A: FnMut(&mut PrimitiveGroupValueBuilder<Int64Type, false>, &ArrayRef,
&[usize]),
+ A: FnMut(&mut BooleanGroupValueBuilder<false>, &ArrayRef, &[usize]),
E: FnMut(
- &PrimitiveGroupValueBuilder<Int64Type, false>,
+ &BooleanGroupValueBuilder<false>,
&[usize],
&ArrayRef,
&[usize],
@@ -403,45 +384,49 @@ mod tests {
// - values not equal
// Define PrimitiveGroupValueBuilder
- let mut builder =
- PrimitiveGroupValueBuilder::<Int64Type,
false>::new(DataType::Int64);
- let builder_array =
- Arc::new(Int64Array::from(vec![Some(0), Some(1)])) as ArrayRef;
- append(&mut builder, &builder_array, &[0, 1]);
+ let mut builder = BooleanGroupValueBuilder::<false>::new();
+ let builder_array = Arc::new(BooleanArray::from(vec![
+ Some(false),
+ Some(true),
+ Some(false),
+ Some(true),
+ ])) as ArrayRef;
+ append(&mut builder, &builder_array, &[0, 1, 2, 3]);
// Define input array
- let input_array = Arc::new(Int64Array::from(vec![Some(0), Some(2)]))
as ArrayRef;
+ let input_array = Arc::new(BooleanArray::from(vec![
+ Some(false),
+ Some(false),
+ Some(true),
+ Some(true),
+ ])) as ArrayRef;
// Check
let mut equal_to_results = vec![true; builder.len()];
equal_to(
&builder,
- &[0, 1],
+ &[0, 1, 2, 3],
&input_array,
- &[0, 1],
+ &[0, 1, 2, 3],
&mut equal_to_results,
);
assert!(equal_to_results[0]);
assert!(!equal_to_results[1]);
+ assert!(!equal_to_results[2]);
+ assert!(equal_to_results[3]);
}
#[test]
- fn test_nullable_primitive_vectorized_operation_special_case() {
+ fn test_nullable_boolean_vectorized_operation_special_case() {
// Test the special `all nulls` or `not nulls` input array case
// for vectorized append and equal to
- let mut builder =
- PrimitiveGroupValueBuilder::<Int64Type,
true>::new(DataType::Int64);
+ let mut builder = BooleanGroupValueBuilder::<true>::new();
// All nulls input array
- let all_nulls_input_array = Arc::new(Int64Array::from(vec![
- Option::<i64>::None,
- None,
- None,
- None,
- None,
- ])) as _;
+ let all_nulls_input_array =
+ Arc::new(BooleanArray::from(vec![None, None, None, None, None]))
as _;
builder
.vectorized_append(&all_nulls_input_array, &[0, 1, 2, 3, 4])
.unwrap();
@@ -461,12 +446,12 @@ mod tests {
assert!(equal_to_results[4]);
// All not nulls input array
- let all_not_nulls_input_array = Arc::new(Int64Array::from(vec![
- Some(1),
- Some(2),
- Some(3),
- Some(4),
- Some(5),
+ let all_not_nulls_input_array = Arc::new(BooleanArray::from(vec![
+ Some(false),
+ Some(true),
+ Some(false),
+ Some(true),
+ Some(true),
])) as _;
builder
.vectorized_append(&all_not_nulls_input_array, &[0, 1, 2, 3, 4])
diff --git
a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs
b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs
index be1f68ea45..87528dc001 100644
---
a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs
+++
b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs
@@ -633,7 +633,7 @@ mod tests {
// - exist not null, input not null; values not equal
// - exist not null, input not null; values equal
- // Define PrimitiveGroupValueBuilder
+ // Define ByteGroupValueBuilder
let mut builder = ByteGroupValueBuilder::<i32>::new(OutputType::Utf8);
let builder_array = Arc::new(StringArray::from(vec![
None,
diff --git
a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
index 5d96ac6dcc..ccdcbb13bf 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
@@ -17,6 +17,7 @@
//! `GroupValues` implementations for multi group by cases
+mod boolean;
mod bytes;
pub mod bytes_view;
mod primitive;
@@ -24,8 +25,8 @@ mod primitive;
use std::mem::{self, size_of};
use crate::aggregates::group_values::multi_group_by::{
- bytes::ByteGroupValueBuilder, bytes_view::ByteViewGroupValueBuilder,
- primitive::PrimitiveGroupValueBuilder,
+ boolean::BooleanGroupValueBuilder, bytes::ByteGroupValueBuilder,
+ bytes_view::ByteViewGroupValueBuilder,
primitive::PrimitiveGroupValueBuilder,
};
use crate::aggregates::group_values::GroupValues;
use ahash::RandomState;
@@ -1047,6 +1048,15 @@ impl<const STREAMING: bool> GroupValues for
GroupValuesColumn<STREAMING> {
let b =
ByteViewGroupValueBuilder::<BinaryViewType>::new();
v.push(Box::new(b) as _)
}
+ &DataType::Boolean => {
+ if nullable {
+ let b = BooleanGroupValueBuilder::<true>::new();
+ v.push(Box::new(b) as _)
+ } else {
+ let b = BooleanGroupValueBuilder::<false>::new();
+ v.push(Box::new(b) as _)
+ }
+ }
dt => {
return not_impl_err!("{dt} not supported in
GroupValuesColumn")
}
@@ -1236,6 +1246,7 @@ fn supported_type(data_type: &DataType) -> bool {
| DataType::Timestamp(_, _)
| DataType::Utf8View
| DataType::BinaryView
+ | DataType::Boolean
)
}
diff --git
a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs
b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs
index afec25fd3d..f560121cd7 100644
---
a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs
+++
b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs
@@ -305,7 +305,7 @@ mod tests {
append(&mut builder, &builder_array, &[0, 1, 2, 3, 4, 5]);
// Define input array
- let (_nulls, values, _) =
+ let (_, values, _nulls) =
Int64Array::from(vec![Some(1), Some(2), None, None, Some(1),
Some(3)])
.into_parts();
diff --git
a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/boolean.rs
b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/boolean.rs
new file mode 100644
index 0000000000..44b763a91f
--- /dev/null
+++
b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/boolean.rs
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::aggregates::group_values::GroupValues;
+
+use arrow::array::{
+ ArrayRef, AsArray as _, BooleanArray, BooleanBufferBuilder,
NullBufferBuilder,
+ RecordBatch,
+};
+use datafusion_common::Result;
+use datafusion_expr::EmitTo;
+use std::{mem::size_of, sync::Arc};
+
+#[derive(Debug)]
+pub struct GroupValuesBoolean { // group-index bookkeeping for a single boolean group-by column
+ false_group: Option<usize>, // group index assigned to `false`; None while no such group is tracked
+ true_group: Option<usize>, // group index assigned to `true`; None while no such group is tracked
+ null_group: Option<usize>, // group index assigned to null; None while no such group is tracked
+}
+
+impl GroupValuesBoolean {
+ pub fn new() -> Self { // creates an instance with no groups assigned yet
+ Self {
+ false_group: None,
+ true_group: None,
+ null_group: None,
+ }
+ }
+}
+
+impl GroupValues for GroupValuesBoolean {
+ fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) ->
Result<()> {
+ let array = cols[0].as_boolean(); // only the first column is read
+ groups.clear(); // `groups` is rebuilt from scratch on every call
+
+ for value in array.iter() { // pushes one group index per input row
+ let index = match value {
+ Some(false) => {
+ if let Some(index) = self.false_group {
+ index
+ } else {
+ let index = self.len(); // first `false` seen: new index = number of groups currently assigned
+ self.false_group = Some(index);
+ index
+ }
+ }
+ Some(true) => {
+ if let Some(index) = self.true_group {
+ index
+ } else {
+ let index = self.len(); // first `true` seen: new index = number of groups currently assigned
+ self.true_group = Some(index);
+ index
+ }
+ }
+ None => {
+ if let Some(index) = self.null_group {
+ index
+ } else {
+ let index = self.len(); // first null seen: new index = number of groups currently assigned
+ self.null_group = Some(index);
+ index
+ }
+ }
+ };
+
+ groups.push(index);
+ }
+
+ Ok(())
+ }
+
+ fn size(&self) -> usize {
+ size_of::<Self>() // the struct holds only three `Option<usize>` fields
+ }
+
+ fn is_empty(&self) -> bool {
+ self.len() == 0
+ }
+
+ fn len(&self) -> usize { // number of groups currently assigned (0 to 3)
+ self.false_group.is_some() as usize
+ + self.true_group.is_some() as usize
+ + self.null_group.is_some() as usize
+ }
+
+ fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
+ let len = self.len();
+ let mut builder = BooleanBufferBuilder::new(len);
+ let emit_count = match emit_to { // how many leading group indices are emitted
+ EmitTo::All => len,
+ EmitTo::First(n) => n, // NOTE(review): `n` is used as-is; presumably callers keep n <= len - confirm
+ };
+ builder.append_n(emit_count, false); // every emitted slot starts out as `false`
+ if let Some(idx) = self.true_group.as_mut() {
+ if *idx < emit_count { // the `true` group lies inside the emitted range
+ builder.set_bit(*idx, true);
+ self.true_group = None;
+ } else {
+ *idx -= emit_count; // group is kept: shift its index down by the number emitted
+ }
+ }
+
+ if let Some(idx) = self.false_group.as_mut() {
+ if *idx < emit_count {
+ // already false, no need to set
+ self.false_group = None;
+ } else {
+ *idx -= emit_count; // group is kept: shift its index down by the number emitted
+ }
+ }
+
+ let values = builder.finish();
+
+ let nulls = if let Some(idx) = self.null_group.as_mut() {
+ if *idx < emit_count { // the null group lies inside the emitted range
+ let mut buffer = NullBufferBuilder::new(len);
+ buffer.append_n_non_nulls(*idx); // slots before the null group are valid
+ buffer.append_null(); // the null group's own slot
+ buffer.append_n_non_nulls(emit_count - *idx - 1); // remaining emitted slots are valid
+
+ self.null_group = None;
+ Some(buffer.finish().unwrap()) // presumably always Some here, since a null was appended above - confirm against NullBufferBuilder docs
+ } else {
+ *idx -= emit_count; // group is kept: shift its index down by the number emitted
+ None
+ }
+ } else {
+ None
+ };
+
+ Ok(vec![Arc::new(BooleanArray::new(values, nulls)) as _]) // a single output array holding the emitted group keys
+ }
+
+ fn clear_shrink(&mut self, _batch: &RecordBatch) { // forgets every group; `_batch` is unused
+ self.false_group = None;
+ self.true_group = None;
+ self.null_group = None;
+ }
+}
diff --git
a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs
b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs
index 21078ceb8a..b901aee313 100644
---
a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs
+++
b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs
@@ -28,14 +28,14 @@ use
datafusion_physical_expr_common::binary_map::{ArrowBytesMap, OutputType};
///
/// This specialization is significantly faster than using the more general
/// purpose `Row`s format
-pub struct GroupValuesByes<O: OffsetSizeTrait> {
+pub struct GroupValuesBytes<O: OffsetSizeTrait> {
/// Map string/binary values to group index
map: ArrowBytesMap<O, usize>,
/// The total number of groups so far (used to assign group_index)
num_groups: usize,
}
-impl<O: OffsetSizeTrait> GroupValuesByes<O> {
+impl<O: OffsetSizeTrait> GroupValuesBytes<O> {
pub fn new(output_type: OutputType) -> Self {
Self {
map: ArrowBytesMap::new(output_type),
@@ -44,7 +44,7 @@ impl<O: OffsetSizeTrait> GroupValuesByes<O> {
}
}
-impl<O: OffsetSizeTrait> GroupValues for GroupValuesByes<O> {
+impl<O: OffsetSizeTrait> GroupValues for GroupValuesBytes<O> {
fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) ->
Result<()> {
assert_eq!(cols.len(), 1);
diff --git
a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/mod.rs
b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/mod.rs
index 417618ba66..89c6b624e8 100644
---
a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/mod.rs
+++
b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/mod.rs
@@ -17,6 +17,7 @@
//! `GroupValues` implementations for single group by cases
+pub(crate) mod boolean;
pub(crate) mod bytes;
pub(crate) mod bytes_view;
pub(crate) mod primitive;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]