This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 04af0c61c8 perf: Implement boolean group values  (#17726)
04af0c61c8 is described below

commit 04af0c61c8e761b724b49da4c87f18cf27e2b4f8
Author: Eshed Schacham <[email protected]>
AuthorDate: Mon Oct 6 13:21:23 2025 +0200

    perf: Implement boolean group values  (#17726)
    
    * chore: fix typos in group_values.
    
    * perf: add support for boolean columns in single and multi group by values.
    
    * remove extra finish
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 .../src/aggregates/group_values/mod.rs             |  17 +-
 .../multi_group_by/{primitive.rs => boolean.rs}    | 219 ++++++++++-----------
 .../group_values/multi_group_by/bytes.rs           |   2 +-
 .../aggregates/group_values/multi_group_by/mod.rs  |  15 +-
 .../group_values/multi_group_by/primitive.rs       |   2 +-
 .../group_values/single_group_by/boolean.rs        | 154 +++++++++++++++
 .../group_values/single_group_by/bytes.rs          |   6 +-
 .../aggregates/group_values/single_group_by/mod.rs |   1 +
 8 files changed, 285 insertions(+), 131 deletions(-)

diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
index f2f489b722..316fbe11ae 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
@@ -40,8 +40,8 @@ pub(crate) use single_group_by::primitive::HashValue;
 
 use crate::aggregates::{
     group_values::single_group_by::{
-        bytes::GroupValuesByes, bytes_view::GroupValuesBytesView,
-        primitive::GroupValuesPrimitive,
+        boolean::GroupValuesBoolean, bytes::GroupValuesBytes,
+        bytes_view::GroupValuesBytesView, primitive::GroupValuesPrimitive,
     },
     order::GroupOrdering,
 };
@@ -119,7 +119,7 @@ pub trait GroupValues: Send {
 ///   - If group by single column, and type of this column has
 ///     the specific [`GroupValues`] implementation, such implementation
 ///     will be chosen.
-///   
+///
 ///   - If group by multiple columns, and all column types have the specific
 ///     `GroupColumn` implementations, `GroupValuesColumn` will be chosen.
 ///
@@ -174,23 +174,26 @@ pub fn new_group_values(
                 downcast_helper!(Decimal128Type, d);
             }
             DataType::Utf8 => {
-                return Ok(Box::new(GroupValuesByes::<i32>::new(OutputType::Utf8)));
+                return Ok(Box::new(GroupValuesBytes::<i32>::new(OutputType::Utf8)));
             }
             DataType::LargeUtf8 => {
-                return Ok(Box::new(GroupValuesByes::<i64>::new(OutputType::Utf8)));
+                return Ok(Box::new(GroupValuesBytes::<i64>::new(OutputType::Utf8)));
             }
             DataType::Utf8View => {
                 return Ok(Box::new(GroupValuesBytesView::new(OutputType::Utf8View)));
             }
             DataType::Binary => {
-                return Ok(Box::new(GroupValuesByes::<i32>::new(OutputType::Binary)));
+                return Ok(Box::new(GroupValuesBytes::<i32>::new(OutputType::Binary)));
             }
             DataType::LargeBinary => {
-                return Ok(Box::new(GroupValuesByes::<i64>::new(OutputType::Binary)));
+                return Ok(Box::new(GroupValuesBytes::<i64>::new(OutputType::Binary)));
             }
             DataType::BinaryView => {
                 return Ok(Box::new(GroupValuesBytesView::new(OutputType::BinaryView)));
             }
+            DataType::Boolean => {
+                return Ok(Box::new(GroupValuesBoolean::new()));
+            }
             _ => {}
         }
     }
diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/boolean.rs
similarity index 64%
copy from datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs
copy to datafusion/physical-plan/src/aggregates/group_values/multi_group_by/boolean.rs
index afec25fd3d..91a9e21aeb 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/boolean.rs
@@ -15,76 +15,61 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::sync::Arc;
+
 use crate::aggregates::group_values::multi_group_by::{nulls_equal_to, GroupColumn};
 use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
-use arrow::array::ArrowNativeTypeOp;
-use arrow::array::{cast::AsArray, Array, ArrayRef, ArrowPrimitiveType, PrimitiveArray};
-use arrow::buffer::ScalarBuffer;
-use arrow::datatypes::DataType;
+use arrow::array::{Array as _, ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder};
 use datafusion_common::Result;
-use datafusion_execution::memory_pool::proxy::VecAllocExt;
 use itertools::izip;
-use std::iter;
-use std::sync::Arc;
 
-/// An implementation of [`GroupColumn`] for primitive values
+/// An implementation of [`GroupColumn`] for booleans
 ///
 /// Optimized to skip null buffer construction if the input is known to be non nullable
 ///
 /// # Template parameters
 ///
-/// `T`: the native Rust type that stores the data
 /// `NULLABLE`: if the data can contain any nulls
 #[derive(Debug)]
-pub struct PrimitiveGroupValueBuilder<T: ArrowPrimitiveType, const NULLABLE: bool> {
-    data_type: DataType,
-    group_values: Vec<T::Native>,
+pub struct BooleanGroupValueBuilder<const NULLABLE: bool> {
+    buffer: BooleanBufferBuilder,
     nulls: MaybeNullBufferBuilder,
 }
 
-impl<T, const NULLABLE: bool> PrimitiveGroupValueBuilder<T, NULLABLE>
-where
-    T: ArrowPrimitiveType,
-{
-    /// Create a new `PrimitiveGroupValueBuilder`
-    pub fn new(data_type: DataType) -> Self {
+impl<const NULLABLE: bool> BooleanGroupValueBuilder<NULLABLE> {
+    /// Create a new `BooleanGroupValueBuilder`
+    pub fn new() -> Self {
         Self {
-            data_type,
-            group_values: vec![],
+            buffer: BooleanBufferBuilder::new(0),
             nulls: MaybeNullBufferBuilder::new(),
         }
     }
 }
 
-impl<T: ArrowPrimitiveType, const NULLABLE: bool> GroupColumn
-    for PrimitiveGroupValueBuilder<T, NULLABLE>
-{
+impl<const NULLABLE: bool> GroupColumn for BooleanGroupValueBuilder<NULLABLE> {
     fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool {
-        // Perf: skip null check (by short circuit) if input is not nullable
         if NULLABLE {
             let exist_null = self.nulls.is_null(lhs_row);
             let input_null = array.is_null(rhs_row);
             if let Some(result) = nulls_equal_to(exist_null, input_null) {
                 return result;
             }
-            // Otherwise, we need to check their values
         }
 
-        self.group_values[lhs_row] == array.as_primitive::<T>().value(rhs_row)
+        self.buffer.get_bit(lhs_row) == array.as_boolean().value(rhs_row)
     }
 
     fn append_val(&mut self, array: &ArrayRef, row: usize) -> Result<()> {
-        // Perf: skip null check if input can't have nulls
         if NULLABLE {
             if array.is_null(row) {
                 self.nulls.append(true);
-                self.group_values.push(T::default_value());
+                self.buffer.append(bool::default());
             } else {
                 self.nulls.append(false);
-                self.group_values.push(array.as_primitive::<T>().value(row));
+                self.buffer.append(array.as_boolean().value(row));
             }
         } else {
-            self.group_values.push(array.as_primitive::<T>().value(row));
+            self.buffer.append(array.as_boolean().value(row));
         }
 
         Ok(())
@@ -97,7 +82,7 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool> GroupColumn
         rhs_rows: &[usize],
         equal_to_results: &mut [bool],
     ) {
-        let array = array.as_primitive::<T>();
+        let array = array.as_boolean();
 
         let iter = izip!(
             lhs_rows.iter(),
@@ -111,7 +96,6 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool> GroupColumn
                 continue;
             }
 
-            // Perf: skip null check (by short circuit) if input is not nullable
             if NULLABLE {
                 let exist_null = self.nulls.is_null(lhs_row);
                 let input_null = array.is_null(rhs_row);
@@ -119,15 +103,14 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool> GroupColumn
                     *equal_to_result = result;
                     continue;
                 }
-                // Otherwise, we need to check their values
             }
 
-            *equal_to_result = self.group_values[lhs_row].is_eq(array.value(rhs_row));
+            *equal_to_result = self.buffer.get_bit(lhs_row) == array.value(rhs_row);
         }
     }
 
     fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize]) -> Result<()> {
-        let arr = array.as_primitive::<T>();
+        let arr = array.as_boolean();
 
         let null_count = array.null_count();
         let num_rows = array.len();
@@ -144,10 +127,10 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool> GroupColumn
                 for &row in rows {
                     if array.is_null(row) {
                         self.nulls.append(true);
-                        self.group_values.push(T::default_value());
+                        self.buffer.append(bool::default());
                     } else {
                         self.nulls.append(false);
-                        self.group_values.push(arr.value(row));
+                        self.buffer.append(arr.value(row));
                     }
                 }
             }
@@ -155,19 +138,18 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool> GroupColumn
             (true, Some(true)) => {
                 self.nulls.append_n(rows.len(), false);
                 for &row in rows {
-                    self.group_values.push(arr.value(row));
+                    self.buffer.append(arr.value(row));
                 }
             }
 
             (true, Some(false)) => {
                 self.nulls.append_n(rows.len(), true);
-                self.group_values
-                    .extend(iter::repeat_n(T::default_value(), rows.len()));
+                self.buffer.append_n(rows.len(), bool::default());
             }
 
             (false, _) => {
                 for &row in rows {
-                    self.group_values.push(arr.value(row));
+                    self.buffer.append(arr.value(row));
                 }
             }
         }
@@ -176,55 +158,49 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool> GroupColumn
     }
 
     fn len(&self) -> usize {
-        self.group_values.len()
+        self.buffer.len()
     }
 
     fn size(&self) -> usize {
-        self.group_values.allocated_size() + self.nulls.allocated_size()
+        self.buffer.capacity() / 8 + self.nulls.allocated_size()
     }
 
     fn build(self: Box<Self>) -> ArrayRef {
-        let Self {
-            data_type,
-            group_values,
-            nulls,
-        } = *self;
+        let Self { mut buffer, nulls } = *self;
 
         let nulls = nulls.build();
         if !NULLABLE {
             assert!(nulls.is_none(), "unexpected nulls in non nullable input");
         }
 
-        let arr = PrimitiveArray::<T>::new(ScalarBuffer::from(group_values), nulls);
-        // Set timezone information for timestamp
-        Arc::new(arr.with_data_type(data_type))
+        let arr = BooleanArray::new(buffer.finish(), nulls);
+
+        Arc::new(arr)
     }
 
     fn take_n(&mut self, n: usize) -> ArrayRef {
-        let first_n = self.group_values.drain(0..n).collect::<Vec<_>>();
-
         let first_n_nulls = if NULLABLE { self.nulls.take_n(n) } else { None };
 
-        Arc::new(
-            PrimitiveArray::<T>::new(ScalarBuffer::from(first_n), first_n_nulls)
-                .with_data_type(self.data_type.clone()),
-        )
+        let mut new_builder = BooleanBufferBuilder::new(self.buffer.len());
+        new_builder.append_packed_range(n..self.buffer.len(), self.buffer.as_slice());
+        std::mem::swap(&mut new_builder, &mut self.buffer);
+
+        // take only first n values from the original builder
+        new_builder.truncate(n);
+
+        Arc::new(BooleanArray::new(new_builder.finish(), first_n_nulls))
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;
+    use arrow::array::NullBufferBuilder;
 
-    use crate::aggregates::group_values::multi_group_by::primitive::PrimitiveGroupValueBuilder;
-    use arrow::array::{ArrayRef, Int64Array, NullBufferBuilder};
-    use arrow::datatypes::{DataType, Int64Type};
-
-    use super::GroupColumn;
+    use super::*;
 
     #[test]
-    fn test_nullable_primitive_equal_to() {
-        let append = |builder: &mut PrimitiveGroupValueBuilder<Int64Type, true>,
+    fn test_nullable_boolean_equal_to() {
+        let append = |builder: &mut BooleanGroupValueBuilder<true>,
                       builder_array: &ArrayRef,
                       append_rows: &[usize]| {
             for &index in append_rows {
@@ -232,7 +208,7 @@ mod tests {
             }
         };
 
-        let equal_to = |builder: &PrimitiveGroupValueBuilder<Int64Type, true>,
+        let equal_to = |builder: &BooleanGroupValueBuilder<true>,
                         lhs_rows: &[usize],
                         input_array: &ArrayRef,
                         rhs_rows: &[usize],
@@ -243,12 +219,12 @@ mod tests {
             }
         };
 
-        test_nullable_primitive_equal_to_internal(append, equal_to);
+        test_nullable_boolean_equal_to_internal(append, equal_to);
     }
 
     #[test]
     fn test_nullable_primitive_vectorized_equal_to() {
-        let append = |builder: &mut PrimitiveGroupValueBuilder<Int64Type, true>,
+        let append = |builder: &mut BooleanGroupValueBuilder<true>,
                       builder_array: &ArrayRef,
                       append_rows: &[usize]| {
             builder
@@ -256,7 +232,7 @@ mod tests {
                 .unwrap();
         };
 
-        let equal_to = |builder: &PrimitiveGroupValueBuilder<Int64Type, true>,
+        let equal_to = |builder: &BooleanGroupValueBuilder<true>,
                         lhs_rows: &[usize],
                         input_array: &ArrayRef,
                         rhs_rows: &[usize],
@@ -269,14 +245,14 @@ mod tests {
             );
         };
 
-        test_nullable_primitive_equal_to_internal(append, equal_to);
+        test_nullable_boolean_equal_to_internal(append, equal_to);
     }
 
-    fn test_nullable_primitive_equal_to_internal<A, E>(mut append: A, mut equal_to: E)
+    fn test_nullable_boolean_equal_to_internal<A, E>(mut append: A, mut equal_to: E)
     where
-        A: FnMut(&mut PrimitiveGroupValueBuilder<Int64Type, true>, &ArrayRef, &[usize]),
+        A: FnMut(&mut BooleanGroupValueBuilder<true>, &ArrayRef, &[usize]),
         E: FnMut(
-            &PrimitiveGroupValueBuilder<Int64Type, true>,
+            &BooleanGroupValueBuilder<true>,
             &[usize],
             &ArrayRef,
             &[usize],
@@ -292,32 +268,37 @@ mod tests {
         //   - exist not null, input not null; values equal
 
         // Define PrimitiveGroupValueBuilder
-        let mut builder =
-            PrimitiveGroupValueBuilder::<Int64Type, true>::new(DataType::Int64);
-        let builder_array = Arc::new(Int64Array::from(vec![
+        let mut builder = BooleanGroupValueBuilder::<true>::new();
+        let builder_array = Arc::new(BooleanArray::from(vec![
             None,
             None,
             None,
-            Some(1),
-            Some(2),
-            Some(3),
+            Some(true),
+            Some(false),
+            Some(true),
         ])) as ArrayRef;
         append(&mut builder, &builder_array, &[0, 1, 2, 3, 4, 5]);
 
         // Define input array
-        let (_nulls, values, _) =
-            Int64Array::from(vec![Some(1), Some(2), None, None, Some(1), Some(3)])
-                .into_parts();
+        let (values, _nulls) = BooleanArray::from(vec![
+            Some(true),
+            Some(false),
+            None,
+            None,
+            Some(true),
+            Some(true),
+        ])
+        .into_parts();
 
         // explicitly build a null buffer where one of the null values also happens to match
         let mut nulls = NullBufferBuilder::new(6);
         nulls.append_non_null();
-        nulls.append_null(); // this sets Some(2) to null above
+        nulls.append_null(); // this sets Some(false) to null above
         nulls.append_null();
         nulls.append_null();
         nulls.append_non_null();
         nulls.append_non_null();
-        let input_array = Arc::new(Int64Array::new(values, nulls.finish())) as ArrayRef;
+        let input_array = Arc::new(BooleanArray::new(values, nulls.finish())) as ArrayRef;
 
         // Check
         let mut equal_to_results = vec![true; builder.len()];
@@ -339,7 +320,7 @@ mod tests {
 
     #[test]
     fn test_not_nullable_primitive_equal_to() {
-        let append = |builder: &mut PrimitiveGroupValueBuilder<Int64Type, false>,
+        let append = |builder: &mut BooleanGroupValueBuilder<false>,
                       builder_array: &ArrayRef,
                       append_rows: &[usize]| {
             for &index in append_rows {
@@ -347,7 +328,7 @@ mod tests {
             }
         };
 
-        let equal_to = |builder: &PrimitiveGroupValueBuilder<Int64Type, false>,
+        let equal_to = |builder: &BooleanGroupValueBuilder<false>,
                         lhs_rows: &[usize],
                         input_array: &ArrayRef,
                         rhs_rows: &[usize],
@@ -358,12 +339,12 @@ mod tests {
             }
         };
 
-        test_not_nullable_primitive_equal_to_internal(append, equal_to);
+        test_not_nullable_boolean_equal_to_internal(append, equal_to);
     }
 
     #[test]
     fn test_not_nullable_primitive_vectorized_equal_to() {
-        let append = |builder: &mut PrimitiveGroupValueBuilder<Int64Type, false>,
+        let append = |builder: &mut BooleanGroupValueBuilder<false>,
                       builder_array: &ArrayRef,
                       append_rows: &[usize]| {
             builder
@@ -371,7 +352,7 @@ mod tests {
                 .unwrap();
         };
 
-        let equal_to = |builder: &PrimitiveGroupValueBuilder<Int64Type, false>,
+        let equal_to = |builder: &BooleanGroupValueBuilder<false>,
                         lhs_rows: &[usize],
                         input_array: &ArrayRef,
                         rhs_rows: &[usize],
@@ -384,14 +365,14 @@ mod tests {
             );
         };
 
-        test_not_nullable_primitive_equal_to_internal(append, equal_to);
+        test_not_nullable_boolean_equal_to_internal(append, equal_to);
     }
 
-    fn test_not_nullable_primitive_equal_to_internal<A, E>(mut append: A, mut equal_to: E)
+    fn test_not_nullable_boolean_equal_to_internal<A, E>(mut append: A, mut equal_to: E)
     where
-        A: FnMut(&mut PrimitiveGroupValueBuilder<Int64Type, false>, &ArrayRef, &[usize]),
+        A: FnMut(&mut BooleanGroupValueBuilder<false>, &ArrayRef, &[usize]),
         E: FnMut(
-            &PrimitiveGroupValueBuilder<Int64Type, false>,
+            &BooleanGroupValueBuilder<false>,
             &[usize],
             &ArrayRef,
             &[usize],
@@ -403,45 +384,49 @@ mod tests {
         //   - values not equal
 
         // Define PrimitiveGroupValueBuilder
-        let mut builder =
-            PrimitiveGroupValueBuilder::<Int64Type, false>::new(DataType::Int64);
-        let builder_array =
-            Arc::new(Int64Array::from(vec![Some(0), Some(1)])) as ArrayRef;
-        append(&mut builder, &builder_array, &[0, 1]);
+        let mut builder = BooleanGroupValueBuilder::<false>::new();
+        let builder_array = Arc::new(BooleanArray::from(vec![
+            Some(false),
+            Some(true),
+            Some(false),
+            Some(true),
+        ])) as ArrayRef;
+        append(&mut builder, &builder_array, &[0, 1, 2, 3]);
 
         // Define input array
-        let input_array = Arc::new(Int64Array::from(vec![Some(0), Some(2)])) as ArrayRef;
+        let input_array = Arc::new(BooleanArray::from(vec![
+            Some(false),
+            Some(false),
+            Some(true),
+            Some(true),
+        ])) as ArrayRef;
 
         // Check
         let mut equal_to_results = vec![true; builder.len()];
         equal_to(
             &builder,
-            &[0, 1],
+            &[0, 1, 2, 3],
             &input_array,
-            &[0, 1],
+            &[0, 1, 2, 3],
             &mut equal_to_results,
         );
 
         assert!(equal_to_results[0]);
         assert!(!equal_to_results[1]);
+        assert!(!equal_to_results[2]);
+        assert!(equal_to_results[3]);
     }
 
     #[test]
-    fn test_nullable_primitive_vectorized_operation_special_case() {
+    fn test_nullable_boolean_vectorized_operation_special_case() {
         // Test the special `all nulls` or `not nulls` input array case
         // for vectorized append and equal to
 
-        let mut builder =
-            PrimitiveGroupValueBuilder::<Int64Type, true>::new(DataType::Int64);
+        let mut builder = BooleanGroupValueBuilder::<true>::new();
 
         // All nulls input array
-        let all_nulls_input_array = Arc::new(Int64Array::from(vec![
-            Option::<i64>::None,
-            None,
-            None,
-            None,
-            None,
-        ])) as _;
+        let all_nulls_input_array =
+            Arc::new(BooleanArray::from(vec![None, None, None, None, None])) as _;
         builder
             .vectorized_append(&all_nulls_input_array, &[0, 1, 2, 3, 4])
             .unwrap();
@@ -461,12 +446,12 @@ mod tests {
         assert!(equal_to_results[4]);
 
         // All not nulls input array
-        let all_not_nulls_input_array = Arc::new(Int64Array::from(vec![
-            Some(1),
-            Some(2),
-            Some(3),
-            Some(4),
-            Some(5),
+        let all_not_nulls_input_array = Arc::new(BooleanArray::from(vec![
+            Some(false),
+            Some(true),
+            Some(false),
+            Some(true),
+            Some(true),
         ])) as _;
         builder
             .vectorized_append(&all_not_nulls_input_array, &[0, 1, 2, 3, 4])
diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs
index be1f68ea45..87528dc001 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes.rs
@@ -633,7 +633,7 @@ mod tests {
         //   - exist not null, input not null; values not equal
         //   - exist not null, input not null; values equal
 
-        // Define PrimitiveGroupValueBuilder
+        // Define ByteGroupValueBuilder
         let mut builder = ByteGroupValueBuilder::<i32>::new(OutputType::Utf8);
         let builder_array = Arc::new(StringArray::from(vec![
             None,
a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs 
b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
index 5d96ac6dcc..ccdcbb13bf 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
@@ -17,6 +17,7 @@
 
 //! `GroupValues` implementations for multi group by cases
 
+mod boolean;
 mod bytes;
 pub mod bytes_view;
 mod primitive;
@@ -24,8 +25,8 @@ mod primitive;
 use std::mem::{self, size_of};
 
 use crate::aggregates::group_values::multi_group_by::{
-    bytes::ByteGroupValueBuilder, bytes_view::ByteViewGroupValueBuilder,
-    primitive::PrimitiveGroupValueBuilder,
+    boolean::BooleanGroupValueBuilder, bytes::ByteGroupValueBuilder,
+    bytes_view::ByteViewGroupValueBuilder, 
primitive::PrimitiveGroupValueBuilder,
 };
 use crate::aggregates::group_values::GroupValues;
 use ahash::RandomState;
@@ -1047,6 +1048,15 @@ impl<const STREAMING: bool> GroupValues for 
GroupValuesColumn<STREAMING> {
                         let b = 
ByteViewGroupValueBuilder::<BinaryViewType>::new();
                         v.push(Box::new(b) as _)
                     }
+                    &DataType::Boolean => {
+                        if nullable {
+                            let b = BooleanGroupValueBuilder::<true>::new();
+                            v.push(Box::new(b) as _)
+                        } else {
+                            let b = BooleanGroupValueBuilder::<false>::new();
+                            v.push(Box::new(b) as _)
+                        }
+                    }
                     dt => {
                         return not_impl_err!("{dt} not supported in 
GroupValuesColumn")
                     }
@@ -1236,6 +1246,7 @@ fn supported_type(data_type: &DataType) -> bool {
             | DataType::Timestamp(_, _)
             | DataType::Utf8View
             | DataType::BinaryView
+            | DataType::Boolean
     )
 }
 
diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs
index afec25fd3d..f560121cd7 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/primitive.rs
@@ -305,7 +305,7 @@ mod tests {
         append(&mut builder, &builder_array, &[0, 1, 2, 3, 4, 5]);
 
         // Define input array
-        let (_nulls, values, _) =
+        let (_, values, _nulls) =
             Int64Array::from(vec![Some(1), Some(2), None, None, Some(1), Some(3)])
                 .into_parts();
 
diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/boolean.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/boolean.rs
new file mode 100644
index 0000000000..44b763a91f
--- /dev/null
+++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/boolean.rs
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::aggregates::group_values::GroupValues;
+
+use arrow::array::{
+    ArrayRef, AsArray as _, BooleanArray, BooleanBufferBuilder, NullBufferBuilder,
+    RecordBatch,
+};
+use datafusion_common::Result;
+use datafusion_expr::EmitTo;
+use std::{mem::size_of, sync::Arc};
+
+#[derive(Debug)]
+pub struct GroupValuesBoolean {
+    false_group: Option<usize>,
+    true_group: Option<usize>,
+    null_group: Option<usize>,
+}
+
+impl GroupValuesBoolean {
+    pub fn new() -> Self {
+        Self {
+            false_group: None,
+            true_group: None,
+            null_group: None,
+        }
+    }
+}
+
+impl GroupValues for GroupValuesBoolean {
+    fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
+        let array = cols[0].as_boolean();
+        groups.clear();
+
+        for value in array.iter() {
+            let index = match value {
+                Some(false) => {
+                    if let Some(index) = self.false_group {
+                        index
+                    } else {
+                        let index = self.len();
+                        self.false_group = Some(index);
+                        index
+                    }
+                }
+                Some(true) => {
+                    if let Some(index) = self.true_group {
+                        index
+                    } else {
+                        let index = self.len();
+                        self.true_group = Some(index);
+                        index
+                    }
+                }
+                None => {
+                    if let Some(index) = self.null_group {
+                        index
+                    } else {
+                        let index = self.len();
+                        self.null_group = Some(index);
+                        index
+                    }
+                }
+            };
+
+            groups.push(index);
+        }
+
+        Ok(())
+    }
+
+    fn size(&self) -> usize {
+        size_of::<Self>()
+    }
+
+    fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    fn len(&self) -> usize {
+        self.false_group.is_some() as usize
+            + self.true_group.is_some() as usize
+            + self.null_group.is_some() as usize
+    }
+
+    fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
+        let len = self.len();
+        let mut builder = BooleanBufferBuilder::new(len);
+        let emit_count = match emit_to {
+            EmitTo::All => len,
+            EmitTo::First(n) => n,
+        };
+        builder.append_n(emit_count, false);
+        if let Some(idx) = self.true_group.as_mut() {
+            if *idx < emit_count {
+                builder.set_bit(*idx, true);
+                self.true_group = None;
+            } else {
+                *idx -= emit_count;
+            }
+        }
+
+        if let Some(idx) = self.false_group.as_mut() {
+            if *idx < emit_count {
+                // already false, no need to set
+                self.false_group = None;
+            } else {
+                *idx -= emit_count;
+            }
+        }
+
+        let values = builder.finish();
+
+        let nulls = if let Some(idx) = self.null_group.as_mut() {
+            if *idx < emit_count {
+                let mut buffer = NullBufferBuilder::new(len);
+                buffer.append_n_non_nulls(*idx);
+                buffer.append_null();
+                buffer.append_n_non_nulls(emit_count - *idx - 1);
+
+                self.null_group = None;
+                Some(buffer.finish().unwrap())
+            } else {
+                *idx -= emit_count;
+                None
+            }
+        } else {
+            None
+        };
+
+        Ok(vec![Arc::new(BooleanArray::new(values, nulls)) as _])
+    }
+
+    fn clear_shrink(&mut self, _batch: &RecordBatch) {
+        self.false_group = None;
+        self.true_group = None;
+        self.null_group = None;
+    }
+}
diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs
index 21078ceb8a..b901aee313 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs
@@ -28,14 +28,14 @@ use datafusion_physical_expr_common::binary_map::{ArrowBytesMap, OutputType};
 ///
 /// This specialization is significantly faster than using the more general
 /// purpose `Row`s format
-pub struct GroupValuesByes<O: OffsetSizeTrait> {
+pub struct GroupValuesBytes<O: OffsetSizeTrait> {
     /// Map string/binary values to group index
     map: ArrowBytesMap<O, usize>,
     /// The total number of groups so far (used to assign group_index)
     num_groups: usize,
 }
 
-impl<O: OffsetSizeTrait> GroupValuesByes<O> {
+impl<O: OffsetSizeTrait> GroupValuesBytes<O> {
     pub fn new(output_type: OutputType) -> Self {
         Self {
             map: ArrowBytesMap::new(output_type),
@@ -44,7 +44,7 @@ impl<O: OffsetSizeTrait> GroupValuesByes<O> {
     }
 }
 
-impl<O: OffsetSizeTrait> GroupValues for GroupValuesByes<O> {
+impl<O: OffsetSizeTrait> GroupValues for GroupValuesBytes<O> {
     fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
         assert_eq!(cols.len(), 1);
 
diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/mod.rs
index 417618ba66..89c6b624e8 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/mod.rs
@@ -17,6 +17,7 @@
 
 //! `GroupValues` implementations for single group by cases
 
+pub(crate) mod boolean;
 pub(crate) mod bytes;
 pub(crate) mod bytes_view;
 pub(crate) mod primitive;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to