This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 8233e64e71 Implement specialized group values for single Utf8/LargeUtf8/Binary/LargeBinary column (#8827)
8233e64e71 is described below

commit 8233e64e711d3b51899bfd4790d5680f6db870a8
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Feb 20 02:10:40 2024 -0500

    Implement specialized group values for single Utf8/LargeUtf8/Binary/LargeBinary column (#8827)
    
    * Implement GroupValuesBinary special case for handling single column string grouping
    
    compiles. add test for null
    
    * Avoid overflow checking
    
    * avoid offset validation
    
    * Update datafusion/physical-plan/src/aggregates/group_values/bytes.rs
---
 .../src/aggregate/count_distinct/bytes.rs          |  90 ++
 .../src/aggregate/count_distinct/mod.rs            |  19 +-
 .../src/aggregate/count_distinct/strings.rs        | 494 -----------
 datafusion/physical-expr/src/binary_map.rs         | 986 +++++++++++++++++++++
 datafusion/physical-expr/src/lib.rs                |   1 +
 .../src/aggregates/group_values/bytes.rs           | 128 +++
 .../src/aggregates/group_values/mod.rs             |  19 +-
 datafusion/sqllogictest/test_files/aggregate.slt   |  61 ++
 datafusion/sqllogictest/test_files/binary.slt      |   9 +
 9 files changed, 1307 insertions(+), 500 deletions(-)
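
The core of the change is the new ArrowBytesMap / ArrowBytesSet in
binary_map.rs, which generalizes the old Utf8-only SSOStringHashSet to all
four Arrow "bytes" types so that both COUNT DISTINCT and single-column
GROUP BY can share it. A minimal sketch of the new set API (an editorial
illustration, not part of the commit; it assumes binary_map is exported as
a public module, per the one-line lib.rs change above):

    use std::sync::Arc;
    use arrow_array::{ArrayRef, StringArray};
    use datafusion_physical_expr::binary_map::{ArrowBytesSet, OutputType};

    fn main() {
        // intern a batch of string values; duplicates are stored once
        let mut set = ArrowBytesSet::<i32>::new(OutputType::Utf8);
        let batch: ArrayRef = Arc::new(StringArray::from(vec![
            Some("MA"), Some("CA"), None, Some("MA"),
        ]));
        set.insert(&batch);
        assert_eq!(set.non_null_len(), 2); // "MA" and "CA"
        assert_eq!(set.len(), 3);          // the null is tracked separately
        // emit the distinct values, in first-seen order, without copying
        let distinct: ArrayRef = set.into_state();
        assert_eq!(distinct.len(), 3);
    }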

diff --git a/datafusion/physical-expr/src/aggregate/count_distinct/bytes.rs b/datafusion/physical-expr/src/aggregate/count_distinct/bytes.rs
new file mode 100644
index 0000000000..2ed9b002c8
--- /dev/null
+++ b/datafusion/physical-expr/src/aggregate/count_distinct/bytes.rs
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`BytesDistinctCountAccumulator`] for Utf8/LargeUtf8/Binary/LargeBinary values
+
+use crate::binary_map::{ArrowBytesSet, OutputType};
+use arrow_array::{ArrayRef, OffsetSizeTrait};
+use datafusion_common::cast::as_list_array;
+use datafusion_common::utils::array_into_list_array;
+use datafusion_common::ScalarValue;
+use datafusion_expr::Accumulator;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// Specialized implementation of
+/// `COUNT DISTINCT` for [`StringArray`], [`LargeStringArray`],
+/// [`BinaryArray`] and [`LargeBinaryArray`].
+///
+/// [`StringArray`]: arrow::array::StringArray
+/// [`LargeStringArray`]: arrow::array::LargeStringArray
+/// [`BinaryArray`]: arrow::array::BinaryArray
+/// [`LargeBinaryArray`]: arrow::array::LargeBinaryArray
+#[derive(Debug)]
+pub(super) struct BytesDistinctCountAccumulator<O: OffsetSizeTrait>(ArrowBytesSet<O>);
+
+impl<O: OffsetSizeTrait> BytesDistinctCountAccumulator<O> {
+    pub(super) fn new(output_type: OutputType) -> Self {
+        Self(ArrowBytesSet::new(output_type))
+    }
+}
+
+impl<O: OffsetSizeTrait> Accumulator for BytesDistinctCountAccumulator<O> {
+    fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
+        let set = self.0.take();
+        let arr = set.into_state();
+        let list = Arc::new(array_into_list_array(arr));
+        Ok(vec![ScalarValue::List(list)])
+    }
+
+    fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+
+        self.0.insert(&values[0]);
+
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
+        if states.is_empty() {
+            return Ok(());
+        }
+        assert_eq!(
+            states.len(),
+            1,
+            "count_distinct states must be single array"
+        );
+
+        let arr = as_list_array(&states[0])?;
+        arr.iter().try_for_each(|maybe_list| {
+            if let Some(list) = maybe_list {
+                self.0.insert(&list);
+            };
+            Ok(())
+        })
+    }
+
+    fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
+        Ok(ScalarValue::Int64(Some(self.0.non_null_len() as i64)))
+    }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self) + self.0.size()
+    }
+}
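
The accumulator above follows DataFusion's two-phase aggregation protocol:
update_batch folds one partition's values into the set, state() serializes
the distinct values as a single ListArray, merge_batch folds other
partitions' states back in, and evaluate() produces the final count. A rough
sketch of that lifecycle (editorial illustration, assuming the types above
and ignoring the pub(super) visibility of the accumulator):

    use std::sync::Arc;
    use arrow_array::ArrayRef;
    use datafusion_common::{Result, ScalarValue};
    use datafusion_expr::Accumulator;
    use datafusion_physical_expr::binary_map::OutputType;

    fn two_phase(input: ArrayRef) -> Result<ScalarValue> {
        // phase 1: a partial accumulator sees one partition's values
        let mut partial = BytesDistinctCountAccumulator::<i32>::new(OutputType::Utf8);
        partial.update_batch(&[input])?;

        // state() is a one-element vec holding the distinct values as a list
        let state_array: ArrayRef = match partial.state()?.into_iter().next() {
            Some(ScalarValue::List(list)) => list,
            _ => unreachable!("state is always a single list"),
        };

        // phase 2: the final accumulator merges partial states and evaluates
        let mut total = BytesDistinctCountAccumulator::<i32>::new(OutputType::Utf8);
        total.merge_batch(&[state_array])?;
        total.evaluate() // ScalarValue::Int64 with the distinct count
    }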
diff --git a/datafusion/physical-expr/src/aggregate/count_distinct/mod.rs b/datafusion/physical-expr/src/aggregate/count_distinct/mod.rs
index 8baea511c7..52afd82d03 100644
--- a/datafusion/physical-expr/src/aggregate/count_distinct/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/count_distinct/mod.rs
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+mod bytes;
 mod native;
-mod strings;
 
 use std::any::Any;
 use std::collections::HashSet;
@@ -37,11 +37,12 @@ use arrow_array::types::{
 use datafusion_common::{Result, ScalarValue};
 use datafusion_expr::Accumulator;
 
+use crate::aggregate::count_distinct::bytes::BytesDistinctCountAccumulator;
 use crate::aggregate::count_distinct::native::{
     FloatDistinctCountAccumulator, PrimitiveDistinctCountAccumulator,
 };
-use crate::aggregate::count_distinct::strings::StringDistinctCountAccumulator;
 use crate::aggregate::utils::down_cast_any_ref;
+use crate::binary_map::OutputType;
 use crate::expressions::format_state_name;
 use crate::{AggregateExpr, PhysicalExpr};
 
@@ -144,8 +145,16 @@ impl AggregateExpr for DistinctCount {
             Float32 => Box::new(FloatDistinctCountAccumulator::<Float32Type>::new()),
             Float64 => Box::new(FloatDistinctCountAccumulator::<Float64Type>::new()),
 
-            Utf8 => Box::new(StringDistinctCountAccumulator::<i32>::new()),
-            LargeUtf8 => Box::new(StringDistinctCountAccumulator::<i64>::new()),
+            Utf8 => Box::new(BytesDistinctCountAccumulator::<i32>::new(OutputType::Utf8)),
+            LargeUtf8 => {
+                Box::new(BytesDistinctCountAccumulator::<i64>::new(OutputType::Utf8))
+            }
+            Binary => Box::new(BytesDistinctCountAccumulator::<i32>::new(
+                OutputType::Binary,
+            )),
+            LargeBinary => Box::new(BytesDistinctCountAccumulator::<i64>::new(
+                OutputType::Binary,
+            )),
 
             _ => Box::new(DistinctCountAccumulator {
                 values: HashSet::default(),
@@ -175,7 +184,7 @@ impl PartialEq<dyn Any> for DistinctCount {
 /// General purpose distinct accumulator that works for any DataType by using
 /// [`ScalarValue`]. Some types have specialized accumulators that are (much)
 /// more efficient such as [`PrimitiveDistinctCountAccumulator`] and
-/// [`StringDistinctCountAccumulator`]
+/// [`BytesDistinctCountAccumulator`]
 #[derive(Debug)]
 struct DistinctCountAccumulator {
     values: HashSet<ScalarValue, RandomState>,
diff --git a/datafusion/physical-expr/src/aggregate/count_distinct/strings.rs b/datafusion/physical-expr/src/aggregate/count_distinct/strings.rs
deleted file mode 100644
index 02d30c3506..0000000000
--- a/datafusion/physical-expr/src/aggregate/count_distinct/strings.rs
+++ /dev/null
@@ -1,494 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Specialized implementation of `COUNT DISTINCT` for [`StringArray`]
-//! and [`LargeStringArray`]
-//!
-//! [`StringArray`]: arrow::array::StringArray
-//! [`LargeStringArray`]: arrow::array::LargeStringArray
-
-use ahash::RandomState;
-use arrow_array::cast::AsArray;
-use arrow_array::{Array, ArrayRef, GenericStringArray, OffsetSizeTrait};
-use arrow_buffer::{BufferBuilder, OffsetBuffer, ScalarBuffer};
-use datafusion_common::cast::as_list_array;
-use datafusion_common::hash_utils::create_hashes;
-use datafusion_common::utils::array_into_list_array;
-use datafusion_common::ScalarValue;
-use datafusion_execution::memory_pool::proxy::RawTableAllocExt;
-use datafusion_expr::Accumulator;
-use std::fmt::Debug;
-use std::mem;
-use std::ops::Range;
-use std::sync::Arc;
-
-#[derive(Debug)]
-pub(super) struct StringDistinctCountAccumulator<O: OffsetSizeTrait>(SSOStringHashSet<O>);
-impl<O: OffsetSizeTrait> StringDistinctCountAccumulator<O> {
-    pub(super) fn new() -> Self {
-        Self(SSOStringHashSet::<O>::new())
-    }
-}
-
-impl<O: OffsetSizeTrait> Accumulator for StringDistinctCountAccumulator<O> {
-    fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
-        // take the state out of the string set and replace with default
-        let set = std::mem::take(&mut self.0);
-        let arr = set.into_state();
-        let list = Arc::new(array_into_list_array(arr));
-        Ok(vec![ScalarValue::List(list)])
-    }
-
-    fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
-        if values.is_empty() {
-            return Ok(());
-        }
-
-        self.0.insert(&values[0]);
-
-        Ok(())
-    }
-
-    fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
-        if states.is_empty() {
-            return Ok(());
-        }
-        assert_eq!(
-            states.len(),
-            1,
-            "count_distinct states must be single array"
-        );
-
-        let arr = as_list_array(&states[0])?;
-        arr.iter().try_for_each(|maybe_list| {
-            if let Some(list) = maybe_list {
-                self.0.insert(&list);
-            };
-            Ok(())
-        })
-    }
-
-    fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
-        Ok(ScalarValue::Int64(Some(self.0.len() as i64)))
-    }
-
-    fn size(&self) -> usize {
-        // Size of accumulator
-        // + SSOStringHashSet size
-        std::mem::size_of_val(self) + self.0.size()
-    }
-}
-
-/// Maximum size of a string that can be inlined in the hash table
-const SHORT_STRING_LEN: usize = mem::size_of::<usize>();
-
-/// Entry that is stored in a `SSOStringHashSet` that represents a string
-/// that is either stored inline or in the buffer
-///
-/// This helps the case where there are many short (less than 8 bytes) strings
-/// that are the same (e.g. "MA", "CA", "NY", "TX", etc)
-///
-/// ```text
-///                                                                ┌──────────────────┐
-///                                                                │...               │
-///                                                                │TheQuickBrownFox  │
-///                                                  ─ ─ ─ ─ ─ ─ ─▶│...               │
-///                                                 │              │                  │
-///                                                                └──────────────────┘
-///                                                 │               buffer of u8
-///
-///                                                 │
-///                        ┌────────────────┬───────────────┬───────────────┐
-///  Storing               │                │ starting byte │  length, in   │
-///  "TheQuickBrownFox"    │   hash value   │   offset in   │  bytes (not   │
-///  (long string)         │                │    buffer     │  characters)  │
-///                        └────────────────┴───────────────┴───────────────┘
-///                              8 bytes          8 bytes       4 or 8
-///
-///
-///                         ┌───────────────┬─┬─┬─┬─┬─┬─┬─┬─┬───────────────┐
-/// Storing "foobar"        │               │ │ │ │ │ │ │ │ │  length, in   │
-/// (short string)          │  hash value   │?│?│f│o│o│b│a│r│  bytes (not   │
-///                         │               │ │ │ │ │ │ │ │ │  characters)  │
-///                         └───────────────┴─┴─┴─┴─┴─┴─┴─┴─┴───────────────┘
-///                              8 bytes         8 bytes        4 or 8
-/// ```
-#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
-struct SSOStringHeader {
-    /// hash of the string value (stored to avoid recomputing it in hash table
-    /// check)
-    hash: u64,
-    /// if len <= SHORT_STRING_LEN: the string data is stored inline
-    /// if len > SHORT_STRING_LEN: the offset of where the data starts
-    offset_or_inline: usize,
-    /// length of the string, in bytes
-    len: usize,
-}
-
-impl SSOStringHeader {
-    /// returns self.offset..self.offset + self.len
-    fn range(&self) -> Range<usize> {
-        self.offset_or_inline..self.offset_or_inline + self.len
-    }
-}
-
-/// HashSet optimized for storing `String` and `LargeString` values
-/// and producing the final set as a GenericStringArray with minimal copies.
-///
-/// Equivalent to `HashSet<String>` but with better performance for arrow data.
-struct SSOStringHashSet<O> {
-    /// Underlying hash set for each distinct string
-    map: hashbrown::raw::RawTable<SSOStringHeader>,
-    /// Total size of the map in bytes
-    map_size: usize,
-    /// In progress arrow `Buffer` containing all string values
-    buffer: BufferBuilder<u8>,
-    /// Offsets into `buffer` for each distinct string value. These offsets
-    /// are used directly to create the final `GenericStringArray`
-    offsets: Vec<O>,
-    /// random state used to generate hashes
-    random_state: RandomState,
-    /// buffer that stores hash values (reused across batches to save allocations)
-    hashes_buffer: Vec<u64>,
-}
-
-impl<O: OffsetSizeTrait> Default for SSOStringHashSet<O> {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl<O: OffsetSizeTrait> SSOStringHashSet<O> {
-    fn new() -> Self {
-        Self {
-            map: hashbrown::raw::RawTable::new(),
-            map_size: 0,
-            buffer: BufferBuilder::new(0),
-            offsets: vec![O::default()], // first offset is always 0
-            random_state: RandomState::new(),
-            hashes_buffer: vec![],
-        }
-    }
-
-    fn insert(&mut self, values: &ArrayRef) {
-        // step 1: compute hashes for the strings
-        let batch_hashes = &mut self.hashes_buffer;
-        batch_hashes.clear();
-        batch_hashes.resize(values.len(), 0);
-        create_hashes(&[values.clone()], &self.random_state, batch_hashes)
-            // hash is supported for all string types and create_hashes only
-            // returns errors for unsupported types
-            .unwrap();
-
-        // step 2: insert each string into the set, if not already present
-        let values = values.as_string::<O>();
-
-        // Ensure lengths are equivalent (to guard unsafe values calls below)
-        assert_eq!(values.len(), batch_hashes.len());
-
-        for (value, &hash) in values.iter().zip(batch_hashes.iter()) {
-            // count distinct ignores nulls
-            let Some(value) = value else {
-                continue;
-            };
-
-            // from here on only use bytes (not str/chars) for value
-            let value = value.as_bytes();
-
-            // value is a "small" string
-            if value.len() <= SHORT_STRING_LEN {
-                let inline = value.iter().fold(0usize, |acc, &x| acc << 8 | x as usize);
-
-                // is the value already present in the set?
-                let entry = self.map.get_mut(hash, |header| {
-                    // compare value if hashes match
-                    if header.len != value.len() {
-                        return false;
-                    }
-                    // value is stored inline so no need to consult buffer
-                    // (this is the "small string optimization")
-                    inline == header.offset_or_inline
-                });
-
-                // if no existing entry, make a new one
-                if entry.is_none() {
-                    // Put the small values into buffer and offsets so it appears
-                    // in the output array, but store the actual bytes inline for
-                    // comparison
-                    self.buffer.append_slice(value);
-                    self.offsets.push(O::from_usize(self.buffer.len()).unwrap());
-                    let new_header = SSOStringHeader {
-                        hash,
-                        len: value.len(),
-                        offset_or_inline: inline,
-                    };
-                    self.map.insert_accounted(
-                        new_header,
-                        |header| header.hash,
-                        &mut self.map_size,
-                    );
-                }
-            }
-            // value is not a "small" string
-            else {
-                // Check if the value is already present in the set
-                let entry = self.map.get_mut(hash, |header| {
-                    // compare value if hashes match
-                    if header.len != value.len() {
-                        return false;
-                    }
-                    // Need to compare the bytes in the buffer
-                    // SAFETY: buffer is only appended to, and we correctly
-                    // inserted values and offsets
-                    let existing_value =
-                        unsafe { self.buffer.as_slice().get_unchecked(header.range()) };
-                    value == existing_value
-                });
-
-                // if no existing entry, make a new one
-                if entry.is_none() {
-                    // Put the value into buffer and offsets so it appears
-                    // in the output array, and store that offset
-                    // so the bytes can be compared if needed
-                    let offset = self.buffer.len(); // offset of start of data
-                    self.buffer.append_slice(value);
-                    self.offsets.push(O::from_usize(self.buffer.len()).unwrap());
-
-                    let new_header = SSOStringHeader {
-                        hash,
-                        len: value.len(),
-                        offset_or_inline: offset,
-                    };
-                    self.map.insert_accounted(
-                        new_header,
-                        |header| header.hash,
-                        &mut self.map_size,
-                    );
-                }
-            }
-        }
-    }
-
-    /// Converts this set into a `StringArray` or `LargeStringArray` with each
-    /// distinct string value without any copies
-    fn into_state(self) -> ArrayRef {
-        let Self {
-            map: _,
-            map_size: _,
-            offsets,
-            mut buffer,
-            random_state: _,
-            hashes_buffer: _,
-        } = self;
-
-        let offsets: ScalarBuffer<O> = offsets.into();
-        let values = buffer.finish();
-        let nulls = None; // count distinct ignores nulls so intermediate state never has nulls
-
-        // SAFETY: all the values that went in were valid utf8 so are all the
-        // values that come out
-        let array = unsafe {
-            GenericStringArray::new_unchecked(OffsetBuffer::new(offsets), values, nulls)
-        };
-        Arc::new(array)
-    }
-
-    fn len(&self) -> usize {
-        self.map.len()
-    }
-
-    /// Return the total size, in bytes, of memory used to store the data in
-    /// this set, not including `self`
-    fn size(&self) -> usize {
-        self.map_size
-            + self.buffer.capacity() * std::mem::size_of::<u8>()
-            + self.offsets.capacity() * std::mem::size_of::<O>()
-            + self.hashes_buffer.capacity() * std::mem::size_of::<u64>()
-    }
-}
-
-impl<O: OffsetSizeTrait> Debug for SSOStringHashSet<O> {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("SSOStringHashSet")
-            .field("map", &"<map>")
-            .field("map_size", &self.map_size)
-            .field("buffer", &self.buffer)
-            .field("random_state", &self.random_state)
-            .field("hashes_buffer", &self.hashes_buffer)
-            .finish()
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use arrow::array::ArrayRef;
-    use arrow_array::StringArray;
-    #[test]
-    fn string_set_empty() {
-        for values in [StringArray::new_null(0), StringArray::new_null(11)] {
-            let mut set = SSOStringHashSet::<i32>::new();
-            let array: ArrayRef = Arc::new(values);
-            set.insert(&array);
-            assert_set(set, &[]);
-        }
-    }
-
-    #[test]
-    fn string_set_basic_i32() {
-        test_string_set_basic::<i32>();
-    }
-    #[test]
-    fn string_set_basic_i64() {
-        test_string_set_basic::<i64>();
-    }
-    fn test_string_set_basic<O: OffsetSizeTrait>() {
-        // basic test for mixed small and large string values
-        let values = GenericStringArray::<O>::from(vec![
-            Some("a"),
-            Some("b"),
-            Some("CXCCCCCCCC"), // 10 bytes
-            Some(""),
-            Some("cbcxx"), // 5 bytes
-            None,
-            Some("AAAAAAAA"),  // 8 bytes
-            Some("BBBBBQBBB"), // 9 bytes
-            Some("a"),
-            Some("cbcxx"),
-            Some("b"),
-            Some("cbcxx"),
-            Some(""),
-            None,
-            Some("BBBBBQBBB"),
-            Some("BBBBBQBBB"),
-            Some("AAAAAAAA"),
-            Some("CXCCCCCCCC"),
-        ]);
-
-        let mut set = SSOStringHashSet::<O>::new();
-        let array: ArrayRef = Arc::new(values);
-        set.insert(&array);
-        assert_set(
-            set,
-            &[
-                Some(""),
-                Some("AAAAAAAA"),
-                Some("BBBBBQBBB"),
-                Some("CXCCCCCCCC"),
-                Some("a"),
-                Some("b"),
-                Some("cbcxx"),
-            ],
-        );
-    }
-
-    #[test]
-    fn string_set_non_utf8_32() {
-        test_string_set_non_utf8::<i32>();
-    }
-    #[test]
-    fn string_set_non_utf8_64() {
-        test_string_set_non_utf8::<i64>();
-    }
-    fn test_string_set_non_utf8<O: OffsetSizeTrait>() {
-        // basic test for mixed small and large string values
-        let values = GenericStringArray::<O>::from(vec![
-            Some("a"),
-            Some("✨🔥"),
-            Some("🔥"),
-            Some("✨✨✨"),
-            Some("foobarbaz"),
-            Some("🔥"),
-            Some("✨🔥"),
-        ]);
-
-        let mut set = SSOStringHashSet::<O>::new();
-        let array: ArrayRef = Arc::new(values);
-        set.insert(&array);
-        assert_set(
-            set,
-            &[
-                Some("a"),
-                Some("foobarbaz"),
-                Some("✨✨✨"),
-                Some("✨🔥"),
-                Some("🔥"),
-            ],
-        );
-    }
-
-    // asserts that the set contains the expected strings
-    fn assert_set<O: OffsetSizeTrait>(
-        set: SSOStringHashSet<O>,
-        expected: &[Option<&str>],
-    ) {
-        let strings = set.into_state();
-        let strings = strings.as_string::<O>();
-        let mut state = strings.into_iter().collect::<Vec<_>>();
-        state.sort();
-        assert_eq!(state, expected);
-    }
-
-    // inserting duplicate strings into the set does not increase reported memory
-    #[test]
-    fn test_string_set_memory_usage() {
-        let strings1 = GenericStringArray::<i32>::from(vec![
-            Some("a"),
-            Some("b"),
-            Some("CXCCCCCCCC"), // 10 bytes
-            Some("AAAAAAAA"),   // 8 bytes
-            Some("BBBBBQBBB"),  // 9 bytes
-        ]);
-        let total_strings1_len = strings1
-            .iter()
-            .map(|s| s.map(|s| s.len()).unwrap_or(0))
-            .sum::<usize>();
-        let values1: ArrayRef = Arc::new(GenericStringArray::<i32>::from(strings1));
-
-        // Much larger strings in strings2
-        let strings2 = GenericStringArray::<i32>::from(vec![
-            "FOO".repeat(1000),
-            "BAR".repeat(2000),
-            "BAZ".repeat(3000),
-        ]);
-        let total_strings2_len = strings2
-            .iter()
-            .map(|s| s.map(|s| s.len()).unwrap_or(0))
-            .sum::<usize>();
-        let values2: ArrayRef = Arc::new(GenericStringArray::<i32>::from(strings2));
-
-        let mut set = SSOStringHashSet::<i32>::new();
-        let size_empty = set.size();
-
-        set.insert(&values1);
-        let size_after_values1 = set.size();
-        assert!(size_empty < size_after_values1);
-        assert!(
-            size_after_values1 > total_strings1_len,
-            "expect {size_after_values1} to be more than {total_strings1_len}"
-        );
-        assert!(size_after_values1 < total_strings1_len + total_strings2_len);
-
-        // inserting the same strings should not affect the size
-        set.insert(&values1);
-        assert_eq!(set.size(), size_after_values1);
-
-        // inserting the large strings should increase the reported size
-        set.insert(&values2);
-        let size_after_values2 = set.size();
-        assert!(size_after_values2 > size_after_values1);
-        assert!(size_after_values2 > total_strings1_len + total_strings2_len);
-    }
-}
diff --git a/datafusion/physical-expr/src/binary_map.rs b/datafusion/physical-expr/src/binary_map.rs
new file mode 100644
index 0000000000..b661f0a741
--- /dev/null
+++ b/datafusion/physical-expr/src/binary_map.rs
@@ -0,0 +1,986 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`ArrowBytesMap`] and [`ArrowBytesSet`] for storing maps/sets of values from
+//! StringArray / LargeStringArray / BinaryArray / LargeBinaryArray.
+
+use ahash::RandomState;
+use arrow_array::cast::AsArray;
+use arrow_array::types::{ByteArrayType, GenericBinaryType, GenericStringType};
+use arrow_array::{
+    Array, ArrayRef, GenericBinaryArray, GenericStringArray, OffsetSizeTrait,
+};
+use arrow_buffer::{
+    BooleanBufferBuilder, BufferBuilder, NullBuffer, OffsetBuffer, ScalarBuffer,
+};
+use arrow_schema::DataType;
+use datafusion_common::hash_utils::create_hashes;
+use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
+use std::any::type_name;
+use std::fmt::Debug;
+use std::mem;
+use std::ops::Range;
+use std::sync::Arc;
+
+/// Should the output be a String or Binary?
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum OutputType {
+    /// `StringArray` or `LargeStringArray`
+    Utf8,
+    /// `BinaryArray` or `LargeBinaryArray`
+    Binary,
+}
+
+/// HashSet optimized for storing string or binary values that can produce
+/// the final set as a GenericStringArray with minimal copies.
+#[derive(Debug)]
+pub struct ArrowBytesSet<O: OffsetSizeTrait>(ArrowBytesMap<O, ()>);
+
+impl<O: OffsetSizeTrait> ArrowBytesSet<O> {
+    pub fn new(output_type: OutputType) -> Self {
+        Self(ArrowBytesMap::new(output_type))
+    }
+
+    /// Return the contents of this set and replace it with a new empty
+    /// set with the same output type
+    pub(super) fn take(&mut self) -> Self {
+        Self(self.0.take())
+    }
+
+    /// Inserts each value from `values` into the set
+    pub fn insert(&mut self, values: &ArrayRef) {
+        fn make_payload_fn(_value: Option<&[u8]>) {}
+        fn observe_payload_fn(_payload: ()) {}
+        self.0
+            .insert_if_new(values, make_payload_fn, observe_payload_fn);
+    }
+
+    /// Converts this set into a `StringArray`/`LargeStringArray` or
+    /// `BinaryArray`/`LargeBinaryArray` containing each distinct value that
+    /// was interned. This is done without copying the values.
+    pub fn into_state(self) -> ArrayRef {
+        self.0.into_state()
+    }
+
+    /// Returns the total number of distinct values (including nulls) seen so far
+    pub fn len(&self) -> usize {
+        self.0.len()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.0.is_empty()
+    }
+
+    /// returns the total number of distinct values (not including nulls) seen so far
+    pub fn non_null_len(&self) -> usize {
+        self.0.non_null_len()
+    }
+
+    /// Return the total size, in bytes, of memory used to store the data in
+    /// this set, not including `self`
+    pub fn size(&self) -> usize {
+        self.0.size()
+    }
+}
+
+/// Optimized map for storing Arrow "bytes" types (`String`, `LargeString`,
+/// `Binary`, and `LargeBinary`) values that can produce the set of keys on
+/// output as `GenericBinaryArray` without copies.
+///
+/// Equivalent to `HashMap<String, V>` but with better performance for arrow
+/// data.
+///
+/// # Generic Arguments
+///
+/// * `O`: OffsetSize (String/LargeString)
+/// * `V`: payload type
+///
+/// # Description
+///
+/// This is a specialized HashMap with the following properties:
+///
+/// 1. Optimized for storing and emitting Arrow byte types  (e.g.
+/// `StringArray` / `BinaryArray`) very efficiently by minimizing copying of
+/// the string values themselves, both when inserting and when emitting the
+/// final array.
+///
+///
+/// 2. Retains the insertion order of entries in the final array. The values
+/// are in the same order as they were inserted.
+///
+/// Note this structure can be used as a `HashSet` by specifying the value type
+/// as `()`, as is done by [`ArrowBytesSet`].
+///
+/// This map is used by the special `COUNT DISTINCT` aggregate function to
+/// store the distinct values, and by the `GROUP BY` operator to store
+/// group values when they are a single string array.
+///
+/// # Example
+///
+/// The following diagram shows how the map would store the four strings
+/// "Foo", NULL, "Bar", "TheQuickBrownFox":
+///
+/// * `hashtable` stores entries for each distinct string that has been
+/// inserted. The entries contain the payload as well as information about the
+/// value (either an offset or the actual bytes, see `Entry` docs for more
+/// details)
+///
+/// * `offsets` stores offsets into `buffer` for each distinct string value,
+/// following the same convention as the offsets in a `StringArray` or
+/// `LargeStringArray`.
+///
+/// * `buffer` stores the actual byte data
+///
+/// * `null`: stores the index and payload of the null value, in this case the
+/// second value (index 1)
+///
+/// ```text
+/// ┌───────────────────────────────────┐    ┌─────┐    ┌────┐
+/// │                ...                │    │  0  │    │FooB│
+/// │ ┌──────────────────────────────┐  │    │  0  │    │arTh│
+/// │ │      <Entry for "Bar">       │  │    │  3  │    │eQui│
+/// │ │            len: 3            │  │    │  3  │    │ckBr│
+/// │ │   offset_or_inline: "Bar"    │  │    │  6  │    │ownF│
+/// │ │         payload:...          │  │    │     │    │ox  │
+/// │ └──────────────────────────────┘  │    │     │    │    │
+/// │                ...                │    └─────┘    └────┘
+/// │ ┌──────────────────────────────┐  │
+/// │ │<Entry for "TheQuickBrownFox">│  │    offsets    buffer
+/// │ │           len: 16            │  │
+/// │ │     offset_or_inline: 6      │  │    ┌───────────────┐
+/// │ │         payload: ...         │  │    │    Some(1)    │
+/// │ └──────────────────────────────┘  │    │ payload: ...  │
+/// │                ...                │    └───────────────┘
+/// └───────────────────────────────────┘
+///                                              null
+///               HashTable
+/// ```
+///
+/// # Entry Format
+///
+/// Entries stored in an [`ArrowBytesMap`] represent a value that is either
+/// stored inline or in the buffer
+///
+/// This helps the case where there are many short (less than 8 bytes) strings
+/// that are the same (e.g. "MA", "CA", "NY", "TX", etc)
+///
+/// ```text
+///                                                                ┌──────────────────┐
+///                                                  ─ ─ ─ ─ ─ ─ ─▶│...               │
+///                                                 │              │TheQuickBrownFox  │
+///                                                                │...               │
+///                                                 │              │                  │
+///                                                                └──────────────────┘
+///                                                 │               buffer of u8
+///
+///                                                 │
+///                        ┌────────────────┬───────────────┬───────────────┐
+///  Storing               │                │ starting byte │  length, in   │
+///  "TheQuickBrownFox"    │   hash value   │   offset in   │  bytes (not   │
+///  (long string)         │                │    buffer     │  characters)  │
+///                        └────────────────┴───────────────┴───────────────┘
+///                              8 bytes          8 bytes       4 or 8
+///
+///
+///                         ┌───────────────┬─┬─┬─┬─┬─┬─┬─┬─┬───────────────┐
+/// Storing "foobar"        │               │ │ │ │ │ │ │ │ │  length, in   │
+/// (short string)          │  hash value   │?│?│f│o│o│b│a│r│  bytes (not   │
+///                         │               │ │ │ │ │ │ │ │ │  characters)  │
+///                         └───────────────┴─┴─┴─┴─┴─┴─┴─┴─┴───────────────┘
+///                              8 bytes         8 bytes        4 or 8
+/// ```
+pub struct ArrowBytesMap<O, V>
+where
+    O: OffsetSizeTrait,
+    V: Debug + PartialEq + Eq + Clone + Copy + Default,
+{
+    /// Should the output be String or Binary?
+    output_type: OutputType,
+    /// Underlying hash set for each distinct value
+    map: hashbrown::raw::RawTable<Entry<O, V>>,
+    /// Total size of the map in bytes
+    map_size: usize,
+    /// In progress arrow `Buffer` containing all values
+    buffer: BufferBuilder<u8>,
+    /// Offsets into `buffer` for each distinct value. These offsets are used
+    /// directly to create the final `GenericBinaryArray`. The `i`th string is
+    /// stored in the range `offsets[i]..offsets[i+1]` in `buffer`. Null values
+    /// are stored as a zero length string.
+    offsets: Vec<O>,
+    /// random state used to generate hashes
+    random_state: RandomState,
+    /// buffer that stores hash values (reused across batches to save allocations)
+    hashes_buffer: Vec<u64>,
+    /// `(payload, null_index)` for the 'null' value, if any
+    /// NOTE null_index is the logical index in the final array, not the index
+    /// in the buffer
+    null: Option<(V, usize)>,
+}
+
+/// The size, in number of entries, of the initial hash table
+const INITIAL_MAP_CAPACITY: usize = 128;
+/// The initial size, in bytes, of the string data
+const INITIAL_BUFFER_CAPACITY: usize = 8 * 1024;
+impl<O: OffsetSizeTrait, V> ArrowBytesMap<O, V>
+where
+    V: Debug + PartialEq + Eq + Clone + Copy + Default,
+{
+    pub fn new(output_type: OutputType) -> Self {
+        Self {
+            output_type,
+            map: hashbrown::raw::RawTable::with_capacity(INITIAL_MAP_CAPACITY),
+            map_size: 0,
+            buffer: BufferBuilder::new(INITIAL_BUFFER_CAPACITY),
+            offsets: vec![O::default()], // first offset is always 0
+            random_state: RandomState::new(),
+            hashes_buffer: vec![],
+            null: None,
+        }
+    }
+
+    /// Return the contents of this map and replace it with a new empty map with
+    /// the same output type
+    pub fn take(&mut self) -> Self {
+        let mut new_self = Self::new(self.output_type);
+        std::mem::swap(self, &mut new_self);
+        new_self
+    }
+
+    /// Inserts each value from `values` into the map, invoking `make_payload_fn`
+    /// for each value if *not* already present, deferring the allocation of the
+    /// payload until it is needed.
+    ///
+    /// Note that this is different than a normal map that would replace the
+    /// existing entry
+    ///
+    /// # Arguments:
+    ///
+    /// `values`: array whose values are inserted
+    ///
+    /// `make_payload_fn`:  invoked for each value that is not already present
+    /// to create the payload, in order of the values in `values`
+    ///
+    /// `observe_payload_fn`: invoked once for each value in `values`, with the
+    /// corresponding payload value (whether existing or newly created).
+    ///
+    /// # Returns
+    ///
+    /// The payload value for the entry, either the existing value or
+    /// the newly inserted value
+    ///
+    /// # Note:
+    ///
+    /// `make_payload_fn` is invoked with `None` for the `NULL` value (at most
+    /// once) and with `Some(bytes)` for each new non-null value in `values`.
+    pub fn insert_if_new<MP, OP>(
+        &mut self,
+        values: &ArrayRef,
+        make_payload_fn: MP,
+        observe_payload_fn: OP,
+    ) where
+        MP: FnMut(Option<&[u8]>) -> V,
+        OP: FnMut(V),
+    {
+        // Sanity check the array type
+        match self.output_type {
+            OutputType::Binary => {
+                assert!(matches!(
+                    values.data_type(),
+                    DataType::Binary | DataType::LargeBinary
+                ));
+                self.insert_if_new_inner::<MP, OP, GenericBinaryType<O>>(
+                    values,
+                    make_payload_fn,
+                    observe_payload_fn,
+                )
+            }
+            OutputType::Utf8 => {
+                assert!(matches!(
+                    values.data_type(),
+                    DataType::Utf8 | DataType::LargeUtf8
+                ));
+                self.insert_if_new_inner::<MP, OP, GenericStringType<O>>(
+                    values,
+                    make_payload_fn,
+                    observe_payload_fn,
+                )
+            }
+        };
+    }
+
+    /// Generic version of [`Self::insert_if_new`] that handles `ByteArrayType`
+    /// (both String and Binary)
+    ///
+    /// Note this is the only function that is generic on [`ByteArrayType`], which
+    /// avoids having to template the entire structure, making the code
+    /// simpler to understand and reducing code bloat due to duplication.
+    ///
+    /// See comments on `insert_if_new` for more details
+    fn insert_if_new_inner<MP, OP, B>(
+        &mut self,
+        values: &ArrayRef,
+        mut make_payload_fn: MP,
+        mut observe_payload_fn: OP,
+    ) where
+        MP: FnMut(Option<&[u8]>) -> V,
+        OP: FnMut(V),
+        B: ByteArrayType,
+    {
+        // step 1: compute hashes
+        let batch_hashes = &mut self.hashes_buffer;
+        batch_hashes.clear();
+        batch_hashes.resize(values.len(), 0);
+        create_hashes(&[values.clone()], &self.random_state, batch_hashes)
+            // hash is supported for all types and create_hashes only
+            // returns errors for unsupported types
+            .unwrap();
+
+        // step 2: insert each value into the set, if not already present
+        let values = values.as_bytes::<B>();
+
+        // Ensure lengths are equivalent
+        assert_eq!(values.len(), batch_hashes.len());
+
+        for (value, &hash) in values.iter().zip(batch_hashes.iter()) {
+            // handle null value
+            let Some(value) = value else {
+                let payload = if let Some(&(payload, _offset)) = self.null.as_ref() {
+                    payload
+                } else {
+                    let payload = make_payload_fn(None);
+                    let null_index = self.offsets.len() - 1;
+                    // nulls need a zero length in the offset buffer
+                    let offset = self.buffer.len();
+                    self.offsets.push(O::usize_as(offset));
+                    self.null = Some((payload, null_index));
+                    payload
+                };
+                observe_payload_fn(payload);
+                continue;
+            };
+
+            // get the value as bytes
+            let value: &[u8] = value.as_ref();
+            let value_len = O::usize_as(value.len());
+
+            // value is "small"
+            let payload = if value.len() <= SHORT_VALUE_LEN {
+                let inline = value.iter().fold(0usize, |acc, &x| acc << 8 | x as usize);
+
+                // is the value already present in the set?
+                let entry = self.map.get_mut(hash, |header| {
+                    // compare value if hashes match
+                    if header.len != value_len {
+                        return false;
+                    }
+                    // value is stored inline so no need to consult buffer
+                    // (this is the "small string optimization")
+                    inline == header.offset_or_inline
+                });
+
+                if let Some(entry) = entry {
+                    entry.payload
+                }
+                // if no existing entry, make a new one
+                else {
+                    // Put the small values into buffer and offsets so it appears
+                    // in the output array, but store the actual bytes inline for
+                    // comparison
+                    self.buffer.append_slice(value);
+                    self.offsets.push(O::usize_as(self.buffer.len()));
+                    let payload = make_payload_fn(Some(value));
+                    let new_header = Entry {
+                        hash,
+                        len: value_len,
+                        offset_or_inline: inline,
+                        payload,
+                    };
+                    self.map.insert_accounted(
+                        new_header,
+                        |header| header.hash,
+                        &mut self.map_size,
+                    );
+                    payload
+                }
+            }
+            // value is not "small"
+            else {
+                // Check if the value is already present in the set
+                let entry = self.map.get_mut(hash, |header| {
+                    // compare value if hashes match
+                    if header.len != value_len {
+                        return false;
+                    }
+                    // Need to compare the bytes in the buffer
+                    // SAFETY: buffer is only appended to, and we correctly
+                    // inserted values and offsets
+                    let existing_value =
+                        unsafe { self.buffer.as_slice().get_unchecked(header.range()) };
+                    value == existing_value
+                });
+
+                if let Some(entry) = entry {
+                    entry.payload
+                }
+                // if no existing entry, make a new one
+                else {
+                    // Put the value into buffer and offsets so it appears
+                    // in the output array, and store that offset
+                    // so the bytes can be compared if needed
+                    let offset = self.buffer.len(); // offset of start of data
+                    self.buffer.append_slice(value);
+                    self.offsets.push(O::usize_as(self.buffer.len()));
+
+                    let payload = make_payload_fn(Some(value));
+                    let new_header = Entry {
+                        hash,
+                        len: value_len,
+                        offset_or_inline: offset,
+                        payload,
+                    };
+                    self.map.insert_accounted(
+                        new_header,
+                        |header| header.hash,
+                        &mut self.map_size,
+                    );
+                    payload
+                }
+            };
+            observe_payload_fn(payload);
+        }
+        // Check for overflow in offsets (if more data was sent than can be represented)
+        if O::from_usize(self.buffer.len()).is_none() {
+            panic!(
+                "Put {} bytes in buffer, more than can be represented by a {}",
+                self.buffer.len(),
+                type_name::<O>()
+            );
+        }
+    }
+
+    /// Converts this set into a `StringArray`, `LargeStringArray`,
+    /// `BinaryArray`, or `LargeBinaryArray` containing each distinct value
+    /// that was inserted. This is done without copying the values.
+    ///
+    /// The values are guaranteed to be returned in the same order in which
+    /// they were first seen.
+    pub fn into_state(self) -> ArrayRef {
+        let Self {
+            output_type,
+            map: _,
+            map_size: _,
+            offsets,
+            mut buffer,
+            random_state: _,
+            hashes_buffer: _,
+            null,
+        } = self;
+
+        // Only make a `NullBuffer` if there was a null value
+        let nulls = null.map(|(_payload, null_index)| {
+            let num_values = offsets.len() - 1;
+            single_null_buffer(num_values, null_index)
+        });
+        // SAFETY: the offsets were constructed correctly in `insert_if_new` --
+        // monotonically increasing, overflows were checked.
+        let offsets = unsafe { OffsetBuffer::new_unchecked(ScalarBuffer::from(offsets)) };
+        let values = buffer.finish();
+
+        match output_type {
+            OutputType::Binary => {
+                // SAFETY: the offsets were constructed correctly
+                Arc::new(unsafe {
+                    GenericBinaryArray::new_unchecked(offsets, values, nulls)
+                })
+            }
+            OutputType::Utf8 => {
+                // SAFETY:
+                // 1. the offsets were constructed safely
+                //
+                // 2. we asserted the input arrays were all the correct type and
+                // thus all the values that went in were valid (e.g. utf8),
+                // so are all the values that come out
+                Arc::new(unsafe {
+                    GenericStringArray::new_unchecked(offsets, values, nulls)
+                })
+            }
+        }
+    }
+
+    /// Total number of entries (including null, if present)
+    pub fn len(&self) -> usize {
+        self.non_null_len() + self.null.map(|_| 1).unwrap_or(0)
+    }
+
+    /// Is the set empty?
+    pub fn is_empty(&self) -> bool {
+        self.map.is_empty() && self.null.is_none()
+    }
+
+    /// Number of non null entries
+    pub fn non_null_len(&self) -> usize {
+        self.map.len()
+    }
+
+    /// Return the total size, in bytes, of memory used to store the data in
+    /// this set, not including `self`
+    pub fn size(&self) -> usize {
+        self.map_size
+            + self.buffer.capacity() * std::mem::size_of::<u8>()
+            + self.offsets.allocated_size()
+            + self.hashes_buffer.allocated_size()
+    }
+}
+
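
The payload callbacks are what let GROUP BY reuse this map: the first time a
distinct value is seen it can be assigned a dense group index, and every
input row then observes the index for its value. A sketch of that pattern
against the API above (an editorial illustration, not code from the commit):

    use arrow_array::ArrayRef;

    /// Assign each distinct value a dense id; return one id per input row.
    fn dense_ids(values: &ArrayRef) -> Vec<usize> {
        let mut map = ArrowBytesMap::<i32, usize>::new(OutputType::Utf8);
        let mut next_id = 0;
        let mut ids = Vec::with_capacity(values.len());
        map.insert_if_new(
            values,
            // invoked once per new distinct value (None for the null)
            |_bytes| {
                let id = next_id;
                next_id += 1;
                id
            },
            // invoked once per input row, with that row's payload
            |id| ids.push(id),
        );
        ids
    }

    // dense_ids over ["a", "b", "a"] would yield [0, 1, 0].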
+/// Returns a `NullBuffer` with a single null value at the given index
+fn single_null_buffer(num_values: usize, null_index: usize) -> NullBuffer {
+    let mut bool_builder = BooleanBufferBuilder::new(num_values);
+    bool_builder.append_n(num_values, true);
+    bool_builder.set_bit(null_index, false);
+    NullBuffer::from(bool_builder.finish())
+}
+
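
As an aside, single_null_buffer builds the validity mask for the emitted
array: every slot is valid except the one logical index at which the null
was interned. A quick illustration (editorial sketch, assuming access to
the private helper):

    // 4 values, with the null interned at logical index 1
    let nulls = single_null_buffer(4, 1);
    assert_eq!(nulls.null_count(), 1);
    assert!(nulls.is_null(1));
    assert!(nulls.is_valid(0));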
+impl<O: OffsetSizeTrait, V> Debug for ArrowBytesMap<O, V>
+where
+    V: Debug + PartialEq + Eq + Clone + Copy + Default,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ArrowBytesMap")
+            .field("map", &"<map>")
+            .field("map_size", &self.map_size)
+            .field("buffer", &self.buffer)
+            .field("random_state", &self.random_state)
+            .field("hashes_buffer", &self.hashes_buffer)
+            .finish()
+    }
+}
+
+/// Maximum size of a value that can be inlined in the hash table
+const SHORT_VALUE_LEN: usize = mem::size_of::<usize>();
+
+/// Entry in the hash table -- see [`ArrowBytesMap`] for more details
+#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
+struct Entry<O, V>
+where
+    O: OffsetSizeTrait,
+    V: Debug + PartialEq + Eq + Clone + Copy + Default,
+{
+    /// hash of the value (stored to avoid recomputing it in hash table check)
+    hash: u64,
+    /// if len <= [`SHORT_VALUE_LEN`]: the data is stored inline
+    /// if len > [`SHORT_VALUE_LEN`]: the offset of where the data starts
+    offset_or_inline: usize,
+    /// length of the value, in bytes (use O here so we use only i32 for
+    /// strings, rather than a 64 bit usize)
+    len: O,
+    /// value stored by the entry
+    payload: V,
+}
+
+impl<O, V> Entry<O, V>
+where
+    O: OffsetSizeTrait,
+    V: Debug + PartialEq + Eq + Clone + Copy + Default,
+{
+    /// returns self.offset..self.offset + self.len
+    #[inline(always)]
+    fn range(&self) -> Range<usize> {
+        self.offset_or_inline..self.offset_or_inline + self.len.as_usize()
+    }
+}
+
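
For values of at most SHORT_VALUE_LEN (8) bytes, the bytes themselves are
packed into offset_or_inline, so short keys are compared without touching
the buffer at all. A worked example of the fold used in insert_if_new_inner
(editorial sketch):

    fn main() {
        // bytes are shifted in most-significant-first order, so "fo" packs to 0x666F
        let value: &[u8] = b"fo";
        let inline = value.iter().fold(0usize, |acc, &x| acc << 8 | x as usize);
        assert_eq!(inline, 0x666F); // 'f' = 0x66, 'o' = 0x6F
    }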
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::ArrayRef;
+    use arrow_array::{BinaryArray, LargeBinaryArray, StringArray};
+    use hashbrown::HashMap;
+
+    #[test]
+    fn string_set_empty() {
+        let mut set = ArrowBytesSet::<i32>::new(OutputType::Utf8);
+        let array: ArrayRef = Arc::new(StringArray::new_null(0));
+        set.insert(&array);
+        assert_eq!(set.len(), 0);
+        assert_eq!(set.non_null_len(), 0);
+        assert_set(set, &[]);
+    }
+
+    #[test]
+    fn string_set_one_null() {
+        let mut set = ArrowBytesSet::<i32>::new(OutputType::Utf8);
+        let array: ArrayRef = Arc::new(StringArray::new_null(1));
+        set.insert(&array);
+        assert_eq!(set.len(), 1);
+        assert_eq!(set.non_null_len(), 0);
+        assert_set(set, &[None]);
+    }
+
+    #[test]
+    fn string_set_many_null() {
+        let mut set = ArrowBytesSet::<i32>::new(OutputType::Utf8);
+        let array: ArrayRef = Arc::new(StringArray::new_null(11));
+        set.insert(&array);
+        assert_eq!(set.len(), 1);
+        assert_eq!(set.non_null_len(), 0);
+        assert_set(set, &[None]);
+    }
+
+    #[test]
+    fn string_set_basic_i32() {
+        test_string_set_basic::<i32>();
+    }
+
+    #[test]
+    fn string_set_basic_i64() {
+        test_string_set_basic::<i64>();
+    }
+
+    fn test_string_set_basic<O: OffsetSizeTrait>() {
+        // basic test for mixed small and large string values
+        let values = GenericStringArray::<O>::from(vec![
+            Some("a"),
+            Some("b"),
+            Some("CXCCCCCCCC"), // 10 bytes
+            Some(""),
+            Some("cbcxx"), // 5 bytes
+            None,
+            Some("AAAAAAAA"),  // 8 bytes
+            Some("BBBBBQBBB"), // 9 bytes
+            Some("a"),
+            Some("cbcxx"),
+            Some("b"),
+            Some("cbcxx"),
+            Some(""),
+            None,
+            Some("BBBBBQBBB"),
+            Some("BBBBBQBBB"),
+            Some("AAAAAAAA"),
+            Some("CXCCCCCCCC"),
+        ]);
+
+        let mut set = ArrowBytesSet::<O>::new(OutputType::Utf8);
+        let array: ArrayRef = Arc::new(values);
+        set.insert(&array);
+        // values must appear in the order they were inserted
+        assert_set(
+            set,
+            &[
+                Some("a"),
+                Some("b"),
+                Some("CXCCCCCCCC"),
+                Some(""),
+                Some("cbcxx"),
+                None,
+                Some("AAAAAAAA"),
+                Some("BBBBBQBBB"),
+            ],
+        );
+    }
+
+    #[test]
+    fn string_set_non_utf8_32() {
+        test_string_set_non_utf8::<i32>();
+    }
+
+    #[test]
+    fn string_set_non_utf8_64() {
+        test_string_set_non_utf8::<i64>();
+    }
+
+    fn test_string_set_non_utf8<O: OffsetSizeTrait>() {
+        // basic test for mixed small and large string values
+        let values = GenericStringArray::<O>::from(vec![
+            Some("a"),
+            Some("✨🔥"),
+            Some("🔥"),
+            Some("✨✨✨"),
+            Some("foobarbaz"),
+            Some("🔥"),
+            Some("✨🔥"),
+        ]);
+
+        let mut set = ArrowBytesSet::<O>::new(OutputType::Utf8);
+        let array: ArrayRef = Arc::new(values);
+        set.insert(&array);
+        // strings must appear in the order they were inserted
+        assert_set(
+            set,
+            &[
+                Some("a"),
+                Some("✨🔥"),
+                Some("🔥"),
+                Some("✨✨✨"),
+                Some("foobarbaz"),
+            ],
+        );
+    }
+
+    // asserts that the set contains the expected strings, in the same order
+    fn assert_set<O: OffsetSizeTrait>(set: ArrowBytesSet<O>, expected: &[Option<&str>]) {
+        let strings = set.into_state();
+        let strings = strings.as_string::<O>();
+        let state = strings.into_iter().collect::<Vec<_>>();
+        assert_eq!(state, expected);
+    }
+
+    // Test use of binary output type
+    #[test]
+    fn test_binary_set() {
+        let values: ArrayRef = Arc::new(BinaryArray::from_opt_vec(vec![
+            Some(b"a"),
+            Some(b"CXCCCCCCCC"),
+            None,
+            Some(b"CXCCCCCCCC"),
+        ]));
+
+        let expected: ArrayRef = Arc::new(BinaryArray::from_opt_vec(vec![
+            Some(b"a"),
+            Some(b"CXCCCCCCCC"),
+            None,
+        ]));
+
+        let mut set = ArrowBytesSet::<i32>::new(OutputType::Binary);
+        set.insert(&values);
+        assert_eq!(&set.into_state(), &expected);
+    }
+
+    // Test use of large binary output type
+    #[test]
+    fn test_large_binary_set() {
+        let values: ArrayRef = Arc::new(LargeBinaryArray::from_opt_vec(vec![
+            Some(b"a"),
+            Some(b"CXCCCCCCCC"),
+            None,
+            Some(b"CXCCCCCCCC"),
+        ]));
+
+        let expected: ArrayRef = Arc::new(LargeBinaryArray::from_opt_vec(vec![
+            Some(b"a"),
+            Some(b"CXCCCCCCCC"),
+            None,
+        ]));
+
+        let mut set = ArrowBytesSet::<i64>::new(OutputType::Binary);
+        set.insert(&values);
+        assert_eq!(&set.into_state(), &expected);
+    }
+
+    #[test]
+    #[should_panic(
+        expected = "matches!(values.data_type(), DataType::Utf8 | 
DataType::LargeUtf8)"
+    )]
+    fn test_mismatched_types() {
+        // inserting binary into a set that expects strings should panic
+        let values: ArrayRef = Arc::new(LargeBinaryArray::from_opt_vec(vec![Some(b"a")]));
+
+        let mut set = ArrowBytesSet::<i64>::new(OutputType::Utf8);
+        set.insert(&values);
+    }
+
+    #[test]
+    #[should_panic]
+    fn test_mismatched_sizes() {
+        // inserting large strings into a set that expects small should panic
+        let values: ArrayRef = Arc::new(LargeBinaryArray::from_opt_vec(vec![Some(b"a")]));
+
+        let mut set = ArrowBytesSet::<i32>::new(OutputType::Binary);
+        set.insert(&values);
+    }
+
+    // put more than 2GB in a string set and expect it to panic
+    #[test]
+    #[should_panic(
+        expected = "Put 2147483648 bytes in buffer, more than can be 
represented by a i32"
+    )]
+    fn test_string_overflow() {
+        let mut set = ArrowBytesSet::<i32>::new(OutputType::Utf8);
+        for value in ["a", "b", "c"] {
+            // 1GB strings, so 3rd is over 2GB and should panic
+            let arr: ArrayRef =
+                Arc::new(StringArray::from_iter_values([value.repeat(1 << 30)]));
+            set.insert(&arr);
+        }
+    }
+
+    // reported memory usage grows with new strings but not with duplicates
+    #[test]
+    fn test_string_set_memory_usage() {
+        let strings1 = GenericStringArray::<i32>::from(vec![
+            Some("a"),
+            Some("b"),
+            Some("CXCCCCCCCC"), // 10 bytes
+            Some("AAAAAAAA"),   // 8 bytes
+            Some("BBBBBQBBB"),  // 9 bytes
+        ]);
+        let total_strings1_len = strings1
+            .iter()
+            .map(|s| s.map(|s| s.len()).unwrap_or(0))
+            .sum::<usize>();
+        let values1: ArrayRef = Arc::new(GenericStringArray::<i32>::from(strings1));
+
+        // Much larger strings in strings2
+        let strings2 = GenericStringArray::<i32>::from(vec![
+            "FOO".repeat(1000),
+            "BAR".repeat(2000),
+            "BAZ".repeat(3000),
+        ]);
+        let total_strings2_len = strings2
+            .iter()
+            .map(|s| s.map(|s| s.len()).unwrap_or(0))
+            .sum::<usize>();
+        let values2: ArrayRef = Arc::new(GenericStringArray::<i32>::from(strings2));
+
+        let mut set = ArrowBytesSet::<i32>::new(OutputType::Utf8);
+        let size_empty = set.size();
+
+        set.insert(&values1);
+        let size_after_values1 = set.size();
+        assert!(size_empty < size_after_values1);
+        assert!(
+            size_after_values1 > total_strings1_len,
+            "expect {size_after_values1} to be more than {total_strings1_len}"
+        );
+        assert!(size_after_values1 < total_strings1_len + total_strings2_len);
+
+        // inserting the same strings should not affect the size
+        set.insert(&values1);
+        assert_eq!(set.size(), size_after_values1);
+
+        // inserting the large strings should increase the reported size
+        set.insert(&values2);
+        let size_after_values2 = set.size();
+        assert!(size_after_values2 > size_after_values1);
+        assert!(size_after_values2 > total_strings1_len + total_strings2_len);
+    }
+
+    #[test]
+    fn test_map() {
+        let input = vec![
+            // Note mix of short/long strings
+            Some("A"),
+            Some("bcdefghijklmnop"),
+            Some("X"),
+            Some("Y"),
+            None,
+            Some("qrstuvqxyzhjwya"),
+            Some("✨🔥"),
+            Some("🔥"),
+            Some("🔥🔥🔥🔥🔥🔥"),
+        ];
+
+        let mut test_map = TestMap::new();
+        test_map.insert(&input);
+        test_map.insert(&input); // put it in twice
+        let expected_output: ArrayRef = Arc::new(StringArray::from(input));
+        assert_eq!(&test_map.into_array(), &expected_output);
+    }
+
+    #[derive(Debug, PartialEq, Eq, Default, Clone, Copy)]
+    struct TestPayload {
+        // index of the string; used to check payloads against the input
+        index: usize, // each new string gets the next sequential index
+    }
+
+    /// Wraps an [`ArrowBytesMap`], validating its invariants
+    struct TestMap {
+        map: ArrowBytesMap<i32, TestPayload>,
+        // stores distinct strings seen, in order
+        strings: Vec<Option<String>>,
+        // map strings to index in strings
+        indexes: HashMap<Option<String>, usize>,
+    }
+
+    impl Debug for TestMap {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            f.debug_struct("TestMap")
+                .field("map", &"...")
+                .field("strings", &self.strings)
+                .field("indexes", &self.indexes)
+                .finish()
+        }
+    }
+
+    impl TestMap {
+        /// Creates an empty TestMap; payloads are created and validated
+        /// as strings are inserted
+        fn new() -> Self {
+            Self {
+                map: ArrowBytesMap::new(OutputType::Utf8),
+                strings: vec![],
+                indexes: HashMap::new(),
+            }
+        }
+
+        /// Inserts strings into the map
+        fn insert(&mut self, strings: &[Option<&str>]) {
+            let string_array = StringArray::from(strings.to_vec());
+            let arr: ArrayRef = Arc::new(string_array);
+
+            let mut next_index = self.indexes.len();
+            let mut actual_new_strings = vec![];
+            let mut actual_seen_indexes = vec![];
+            // update self with new values, keeping track of newly added values
+            for str in strings {
+                let str = str.map(|s| s.to_string());
+                let index = self.indexes.get(&str).cloned().unwrap_or_else(|| {
+                    actual_new_strings.push(str.clone());
+                    let index = self.strings.len();
+                    self.strings.push(str.clone());
+                    self.indexes.insert(str, index);
+                    index
+                });
+                actual_seen_indexes.push(index);
+            }
+
+            // insert the values into the map, recording what we did
+            let mut seen_new_strings = vec![];
+            let mut seen_indexes = vec![];
+            self.map.insert_if_new(
+                &arr,
+                |s| {
+                    let value = s
+                        .map(|s| String::from_utf8(s.to_vec()).expect("Non utf8 string"));
+                    let index = next_index;
+                    next_index += 1;
+                    seen_new_strings.push(value);
+                    TestPayload { index }
+                },
+                |payload| {
+                    seen_indexes.push(payload.index);
+                },
+            );
+
+            assert_eq!(actual_seen_indexes, seen_indexes);
+            assert_eq!(actual_new_strings, seen_new_strings);
+        }
+
+        /// Calls `self.map.into_state()`, validating that the strings are in
+        /// the same order as they were inserted
+        fn into_array(self) -> ArrayRef {
+            let Self {
+                map,
+                strings,
+                indexes: _,
+            } = self;
+
+            let arr = map.into_state();
+            let expected: ArrayRef = Arc::new(StringArray::from(strings));
+            assert_eq!(&arr, &expected);
+            arr
+        }
+    }
+}
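
(For reference, not part of the patch: a minimal sketch of the
ArrowBytesMap::insert_if_new API that TestMap above exercises. The usize
payload and the callback shapes are assumptions taken from that test code.)

    use std::sync::Arc;
    use arrow_array::{ArrayRef, StringArray};
    use datafusion_physical_expr::binary_map::{ArrowBytesMap, OutputType};

    fn assign_sequential_ids() {
        // map each distinct string to a usize payload
        let mut map: ArrowBytesMap<i32, usize> = ArrowBytesMap::new(OutputType::Utf8);
        let values: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "a"]));
        let mut next_id = 0;
        map.insert_if_new(
            &values,
            // invoked once per distinct value to build its payload
            |_bytes| {
                let id = next_id;
                next_id += 1;
                id
            },
            // invoked for every input row with that row's payload
            |id| assert!(id < 2), // rows observe ids [0, 1, 0]
        );
    }
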
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index 95c1f3591d..41d36d8bcb 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -18,6 +18,7 @@
 pub mod aggregate;
 pub mod analysis;
 pub mod array_expressions;
+pub mod binary_map;
 pub mod conditional_expressions;
 #[cfg(feature = "crypto_expressions")]
 pub mod crypto_expressions;
diff --git a/datafusion/physical-plan/src/aggregates/group_values/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/bytes.rs
new file mode 100644
index 0000000000..4a4c5e4b05
--- /dev/null
+++ b/datafusion/physical-plan/src/aggregates/group_values/bytes.rs
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::aggregates::group_values::GroupValues;
+use arrow_array::{Array, ArrayRef, OffsetSizeTrait, RecordBatch};
+use datafusion_expr::EmitTo;
+use datafusion_physical_expr::binary_map::{ArrowBytesMap, OutputType};
+
+/// A [`GroupValues`] storing a single column of Utf8/LargeUtf8/Binary/LargeBinary values
+///
+/// This specialization is significantly faster than using the more
+/// general-purpose `Row` format
+pub struct GroupValuesBytes<O: OffsetSizeTrait> {
+    /// Map string/binary values to group index
+    map: ArrowBytesMap<O, usize>,
+    /// The total number of groups so far (used to assign group_index)
+    num_groups: usize,
+}
+
+impl<O: OffsetSizeTrait> GroupValuesBytes<O> {
+    pub fn new(output_type: OutputType) -> Self {
+        Self {
+            map: ArrowBytesMap::new(output_type),
+            num_groups: 0,
+        }
+    }
+}
+
+impl<O: OffsetSizeTrait> GroupValues for GroupValuesBytes<O> {
+    fn intern(
+        &mut self,
+        cols: &[ArrayRef],
+        groups: &mut Vec<usize>,
+    ) -> datafusion_common::Result<()> {
+        assert_eq!(cols.len(), 1);
+
+        // look up / add entries in the table
+        let arr = &cols[0];
+
+        groups.clear();
+        self.map.insert_if_new(
+            arr,
+            // called for each new group
+            |_value| {
+                // assign new group index on each insert
+                let group_idx = self.num_groups;
+                self.num_groups += 1;
+                group_idx
+            },
+            // called for each group
+            |group_idx| {
+                groups.push(group_idx);
+            },
+        );
+
+        // ensure we assigned a group for each input row
+        assert_eq!(groups.len(), arr.len());
+        Ok(())
+    }
+
+    fn size(&self) -> usize {
+        self.map.size() + std::mem::size_of::<Self>()
+    }
+
+    fn is_empty(&self) -> bool {
+        self.num_groups == 0
+    }
+
+    fn len(&self) -> usize {
+        self.num_groups
+    }
+
+    fn emit(&mut self, emit_to: EmitTo) -> datafusion_common::Result<Vec<ArrayRef>> {
+        // Reset the map to default, and convert it into a single array
+        let map_contents = self.map.take().into_state();
+
+        let group_values = match emit_to {
+            EmitTo::All => {
+                self.num_groups -= map_contents.len();
+                map_contents
+            }
+            EmitTo::First(n) if n == self.len() => {
+                self.num_groups -= map_contents.len();
+                map_contents
+            }
+            EmitTo::First(n) => {
+                // If we took only the first n and inserted the rest back into
+                // the map, we could potentially avoid this reallocation, at
+                // the expense of much more complex code.
+                // See https://github.com/apache/arrow-datafusion/issues/9195
+                let emit_group_values = map_contents.slice(0, n);
+                let remaining_group_values =
+                    map_contents.slice(n, map_contents.len() - n);
+
+                self.num_groups = 0;
+                let mut group_indexes = vec![];
+                self.intern(&[remaining_group_values], &mut group_indexes)?;
+
+                // Verify that the group indexes were assigned in the correct order
+                assert_eq!(0, group_indexes[0]);
+
+                emit_group_values
+            }
+        };
+
+        Ok(vec![group_values])
+    }
+
+    fn clear_shrink(&mut self, _batch: &RecordBatch) {
+        // In theory we could avoid this reallocation and clear the contents
+        // of the map, but for now we simply reset the map from scratch
+        self.map.take();
+    }
+}
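
(A hedged illustration, not part of the patch: the intern() contract above
assigns each distinct value a stable group index in first-seen order. The
sketch is written as if placed in this module's tests and assumes only the
signatures visible in the diff.)

    use std::sync::Arc;
    use arrow_array::{ArrayRef, StringArray};
    use datafusion_physical_expr::binary_map::OutputType;
    use crate::aggregates::group_values::GroupValues;

    fn demo_intern() -> datafusion_common::Result<()> {
        let mut group_values = GroupValuesBytes::<i32>::new(OutputType::Utf8);
        let col: ArrayRef = Arc::new(StringArray::from(vec!["x", "y", "x"]));
        let mut groups = vec![];
        group_values.intern(&[col], &mut groups)?;
        // "x" -> group 0, "y" -> group 1, the second "x" reuses group 0
        assert_eq!(groups, vec![0, 1, 0]);
        Ok(())
    }
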
diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
index ef9aac3d3e..b5bc923b46 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
@@ -17,7 +17,7 @@
 
 use arrow::record_batch::RecordBatch;
 use arrow_array::{downcast_primitive, ArrayRef};
-use arrow_schema::SchemaRef;
+use arrow_schema::{DataType, SchemaRef};
 use datafusion_common::Result;
 
 pub(crate) mod primitive;
@@ -27,6 +27,10 @@ use primitive::GroupValuesPrimitive;
 mod row;
 use row::GroupValuesRows;
 
+mod bytes;
+use bytes::GroupValuesBytes;
+use datafusion_physical_expr::binary_map::OutputType;
+
 /// An interning store for group keys
 pub trait GroupValues: Send {
     /// Calculates the `groups` for each input row of `cols`
@@ -62,6 +66,19 @@ pub fn new_group_values(schema: SchemaRef) -> Result<Box<dyn GroupValues>> {
             d => (downcast_helper, d),
             _ => {}
         }
+
+        if let DataType::Utf8 = d {
+            return Ok(Box::new(GroupValuesBytes::<i32>::new(OutputType::Utf8)));
+        }
+        if let DataType::LargeUtf8 = d {
+            return Ok(Box::new(GroupValuesBytes::<i64>::new(OutputType::Utf8)));
+        }
+        if let DataType::Binary = d {
+            return Ok(Box::new(GroupValuesBytes::<i32>::new(OutputType::Binary)));
+        }
+        if let DataType::LargeBinary = d {
+            return Ok(Box::new(GroupValuesBytes::<i64>::new(OutputType::Binary)));
+        }
     }
 
     Ok(Box::new(GroupValuesRows::try_new(schema)?))
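
(Sketch, not part of the patch: with the dispatch above, a single Utf8,
LargeUtf8, Binary, or LargeBinary grouping column now selects the specialized
byte-based implementation instead of GroupValuesRows. Assumes only the
new_group_values signature shown in the hunk header.)

    use std::sync::Arc;
    use arrow_schema::{DataType, Field, Schema};

    fn pick_specialized() -> datafusion_common::Result<()> {
        // one nullable Utf8 grouping column -> GroupValuesBytes::<i32>
        let schema = Arc::new(Schema::new(vec![Field::new("k", DataType::Utf8, true)]));
        let group_values = new_group_values(schema)?;
        assert!(group_values.is_empty()); // no groups interned yet
        Ok(())
    }
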
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt
index f50134e635..fdd70a80ac 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -3121,10 +3121,71 @@ select count(distinct column1), count(distinct 
column2), count(distinct column3)
 1 1 2 2
 1 1 3 3
 
+statement ok
+drop table distinct_count_long_string_table;
+
+
+# test with binary strings as well
+statement ok
+create table distinct_count_binary_table as
+SELECT column1,
+  arrow_cast(column2, 'Binary') as column2,
+  arrow_cast(column3, 'Binary') as column3,
+  arrow_cast(column4, 'Binary') as column4
+FROM distinct_count_string_table;
+
+# run through update_batch
+query IIII
+select count(distinct column1), count(distinct column2), count(distinct column3), count(distinct column4) from distinct_count_binary_table;
+----
+3 3 6 6
+
+# run through merge_batch
+query IIII rowsort
+select count(distinct column1), count(distinct column2), count(distinct column3), count(distinct column4) from distinct_count_binary_table group by column1;
+----
+1 1 1 1
+1 1 2 2
+1 1 3 3
+
+statement ok
+drop table distinct_count_binary_table;
+
+
+# test with large binary strings as well
+statement ok
+create table distinct_count_large_binary_table as
+SELECT column1,
+  arrow_cast(column2, 'LargeBinary') as column2,
+  arrow_cast(column3, 'LargeBinary') as column3,
+  arrow_cast(column4, 'LargeBinary') as column4
+FROM distinct_count_string_table;
+
+# run through update_batch
+query IIII
+select count(distinct column1), count(distinct column2), count(distinct column3), count(distinct column4) from distinct_count_large_binary_table;
+----
+3 3 6 6
+
+# run through merge_batch
+query IIII rowsort
+select count(distinct column1), count(distinct column2), count(distinct column3), count(distinct column4) from distinct_count_large_binary_table group by column1;
+----
+1 1 1 1
+1 1 2 2
+1 1 3 3
+
+statement ok
+drop table distinct_count_large_binary_table;
+
+
+
+## Cleanup from distinct count tests
 statement ok
 drop table distinct_count_string_table;
 
 
+
 # rule `aggregate_statistics` should not optimize MIN/MAX to wrong values on empty relation
 
 statement ok
diff --git a/datafusion/sqllogictest/test_files/binary.slt b/datafusion/sqllogictest/test_files/binary.slt
index 0568ada3ad..621cd3e528 100644
--- a/datafusion/sqllogictest/test_files/binary.slt
+++ b/datafusion/sqllogictest/test_files/binary.slt
@@ -216,6 +216,15 @@ SELECT largebinary FROM t ORDER BY largebinary;
 466f6f426172
 NULL
 
+# group by
+query I? rowsort
+SELECT COUNT(*), largebinary FROM t GROUP BY largebinary;
+----
+1 426172
+1 466f6f
+1 466f6f426172
+1 NULL
+
 # LIKE
 query ?
 SELECT binary FROM t where binary LIKE '%F%';
