alamb commented on code in PR #9188:
URL: https://github.com/apache/arrow-datafusion/pull/9188#discussion_r1485585211


##########
datafusion/physical-expr/src/string_map.rs:
##########
@@ -0,0 +1,1028 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`ArrowStringMap`] and [`ArrowStringSet`] for storing maps/sets of values 
from
+//! StringArray / LargeStringArray
+
+use ahash::RandomState;
+use arrow_array::cast::AsArray;
+use arrow_array::{Array, ArrayRef, GenericStringArray, OffsetSizeTrait};
+use arrow_buffer::{
+    BooleanBufferBuilder, BufferBuilder, NullBuffer, OffsetBuffer, 
ScalarBuffer,
+};
+use datafusion_common::hash_utils::create_hashes;
+use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
+use std::fmt::Debug;
+use std::mem;
+use std::ops::Range;
+use std::sync::Arc;
+
+/// HashSet optimized for storing `String` and `LargeString` values
+/// and producing the final set as a GenericStringArray with minimal copies.
+///
+/// This is a thin wrapper around [`ArrowStringMap`] whose payload type is
+/// `()`, so only the distinct keys (and at most one null) are stored.
+#[derive(Debug, Default)]
+pub struct ArrowStringSet<O: OffsetSizeTrait>(ArrowStringMap<O, ()>);
+
+impl<O: OffsetSizeTrait> ArrowStringSet<O> {
+    /// Creates an empty set.
+    pub fn new() -> Self {
+        Self(ArrowStringMap::new())
+    }
+
+    /// Inserts each string (and null, if any) from `values` into the set.
+    ///
+    /// `values` is read by the underlying map via `as_string::<O>()`, so it is
+    /// expected to be a string array whose offset type is `O`.
+    pub fn insert(&mut self, values: &ArrayRef) {
+        // A set carries no payload, so both callbacks are no-ops that
+        // produce / consume `()`.
+        fn make_payload_fn(_value: Option<&[u8]>) {}
+        fn observe_payload_fn(_payload: ()) {}
+        self.0
+            .insert_if_new(values, make_payload_fn, observe_payload_fn);
+    }
+
+    /// Converts this set into a `StringArray` or `LargeStringArray` 
containing each
+    /// distinct string value. This is done without copying the values.
+    pub fn into_state(self) -> ArrayRef {
+        self.0.into_state()
+    }
+
+    /// Returns the total number of distinct strings (including nulls) seen so 
far
+    pub fn len(&self) -> usize {
+        self.0.len()
+    }
+
+    /// Returns `true` if the set is empty (delegates to the underlying
+    /// [`ArrowStringMap`]'s `is_empty`).
+    pub fn is_empty(&self) -> bool {
+        self.0.is_empty()
+    }
+
+    /// Returns the total number of distinct strings (not including nulls) 
seen so far
+    pub fn non_null_len(&self) -> usize {
+        self.0.non_null_len()
+    }
+
+    /// Return the total size, in bytes, of memory used to store the data in
+    /// this set, not including `self`
+    pub fn size(&self) -> usize {
+        self.0.size()
+    }
+}
+
+/// Map optimized for storing `String` and `LargeString` values that can 
produce
+/// that set of keys on output as a GenericStringArray.
+///
+/// # Generic Arguments
+///
+/// O: OffsetSize (String/LargeString)
+/// V: payload type
+///
+/// # Description
+///
+/// This is a specialized HashMap that is
+///
+/// 1. optimized for storing and emitting Arrow StringArrays efficiently by
+/// minimizing copying of the string values themselves both when inserting and
+/// when emitting the final array.
+///
+/// 2. order preserving: the entries in the final array are in the same order
+/// as they were inserted.
+///
+/// Note it can be used as a HashSet by specifying the value type as `()`.
+///
+/// This is used by the special `COUNT DISTINCT` string aggregate function to
+/// store the distinct values and by the `GROUP BY` operator to store the
+/// distinct values for each group when they are single strings. It is similar 
to
+/// `HashMap<String, V>` (or `HashSet<String>` when `V` is `()`), but with
+/// better performance for arrow data.
+///
+/// # Example
+///
+/// The following diagram shows how the map would store the four strings
+/// "Foo", NULL, "Bar", "TheQuickBrownFox", inserted in that order:
+///
+/// * `hashtable` stores entries for each distinct string that has been
+/// inserted. The entries contain the payload as well as information about the
+/// string value (either an offset or the actual bytes, see [`Entry`] for more
+/// details)
+///
+/// * `offsets` stores offsets into `buffer` for each distinct string value,
+/// following the same convention as the offsets in a `StringArray` or
+/// `LargeStringArray`. A null occupies a zero-length range (here `3..3`).
+///
+/// * `buffer` stores the actual byte data
+///
+/// * `null`: stores the index and payload of the null value, in this case the
+/// second value (index 1)
+///
+/// ```text
+/// ┌───────────────────────────────────┐    ┌─────┐    ┌────┐
+/// │                ...                │    │  0  │    │FooB│
+/// │ ┌──────────────────────────────┐  │    │  3  │    │arTh│
+/// │ │      <Entry for "Bar">       │  │    │  3  │    │eQui│
+/// │ │            len: 3            │  │    │  6  │    │ckBr│
+/// │ │   offset_or_inline: "Bar"    │  │    │ 22  │    │ownF│
+/// │ │         payload:...          │  │    │     │    │ox  │
+/// │ └──────────────────────────────┘  │    │     │    │    │
+/// │                ...                │    └─────┘    └────┘
+/// │ ┌──────────────────────────────┐  │
+/// │ │<Entry for "TheQuickBrownFox">│  │    offsets    buffer
+/// │ │           len: 16            │  │
+/// │ │     offset_or_inline: 6      │  │    ┌───────────────┐
+/// │ │         payload: ...         │  │    │    Some(1)    │
+/// │ └──────────────────────────────┘  │    │ payload: ...  │
+/// │                ...                │    └───────────────┘
+/// └───────────────────────────────────┘
+///                                              null
+///               HashTable
+/// ```
+pub struct ArrowStringMap<O, V>
+where
+    O: OffsetSizeTrait,
+    V: Debug + PartialEq + Eq + Clone + Copy + Default,
+{
+    /// Underlying hash set for each distinct string
+    map: hashbrown::raw::RawTable<Entry<O, V>>,
+    /// Total size of the map in bytes
+    map_size: usize,
+    /// In progress arrow `Buffer` containing all string values
+    buffer: BufferBuilder<u8>,
+    /// Offsets into `buffer` for each distinct string value. These offsets are
+    /// used directly to create the final `GenericStringArray`. The `i`th 
string
+    /// is stored in the range `offsets[i]..offsets[i+1]` in `buffer`. Null
+    /// values are stored as a zero length string.
+    offsets: Vec<O>,
+    /// random state used to generate hashes
+    random_state: RandomState,
+    /// buffer that stores hash values (reused across batches to save 
allocations)
+    hashes_buffer: Vec<u64>,
+    /// `(payload, null_index)` for the 'null' value, if any
+    /// NOTE null_index is the logical index in the final array, not the index
+    /// in the buffer
+    null: Option<(V, usize)>,
+}
+
+impl<O: OffsetSizeTrait, V> Default for ArrowStringMap<O, V>
+where
+    V: Debug + PartialEq + Eq + Clone + Copy + Default,
+{
+    /// Creates an empty map; same as [`ArrowStringMap::new`].
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+/// The size, in number of entries, of the initial hash table
+/// (passed to `RawTable::with_capacity` in `ArrowStringMap::new`)
+const INITIAL_MAP_CAPACITY: usize = 128;
+/// The initial size, in bytes, of the string data buffer
+/// (passed to `BufferBuilder::new` in `ArrowStringMap::new`)
+const INITIAL_BUFFER_CAPACITY: usize = 8 * 1024;
+impl<O: OffsetSizeTrait, V> ArrowStringMap<O, V>
+where
+    V: Debug + PartialEq + Eq + Clone + Copy + Default,
+{
+    /// Creates an empty map.
+    ///
+    /// The hash table is pre-sized for [`INITIAL_MAP_CAPACITY`] entries, the
+    /// string buffer for [`INITIAL_BUFFER_CAPACITY`] bytes, and a new
+    /// [`RandomState`] is created for hashing.
+    pub fn new() -> Self {
+        Self {
+            map: hashbrown::raw::RawTable::with_capacity(INITIAL_MAP_CAPACITY),
+            map_size: 0,
+            buffer: BufferBuilder::new(INITIAL_BUFFER_CAPACITY),
+            // `offsets` always holds one more element than the number of
+            // stored values, starting from this leading 0
+            offsets: vec![O::default()], // first offset is always 0
+            random_state: RandomState::new(),
+            hashes_buffer: vec![],
+            null: None,
+        }
+    }
+
+    /// Inserts each string from `values` into the map, invoking `make_payload_fn`
+    /// for each value if *not* already present, deferring the creation of 
the
+    /// payload until it is needed.
+    ///
+    /// Note that this is different than a normal map `insert`, which would
+    /// replace the existing entry: payloads of existing entries are never
+    /// overwritten.
+    ///
+    /// # Arguments:
+    ///
+    /// `values`: array whose values are inserted. It is hashed with
+    /// `create_hashes` and then read via `as_string::<O>()`, so it is expected
+    /// to be a string array whose offset type is `O`.
+    ///
+    /// `make_payload_fn`: invoked once for each distinct value not already present 
to
+    /// create its payload. It receives `Some(bytes)` for a string and `None`
+    /// for the first null ever inserted.
+    ///
+    /// `observe_payload_fn`: invoked once, in order, for each element of
+    /// `values` (including nulls and repeated values), with the payload of the
+    /// corresponding entry: either the existing payload or the newly created
+    /// one.
+    ///
+    /// This function itself returns nothing; payloads are only reported
+    /// through `observe_payload_fn`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if a string length or the total length of the stored string
+    /// data cannot be represented by `O` (the `O::from_usize(..).unwrap()`
+    /// calls below). NOTE(review): presumably also panics inside `as_string`
+    /// if `values` is not a string array of offset type `O` — confirm.
+    ///
+    /// # Note
+    ///
+    /// `make_payload_fn` and `observe_payload_fn` are only invoked with values
+    /// from `values` (or payloads created from them). This is a safe function;
+    /// there is no `unsafe` contract for callers to uphold.
+    pub fn insert_if_new<MP, OP>(
+        &mut self,
+        values: &ArrayRef,
+        mut make_payload_fn: MP,
+        mut observe_payload_fn: OP,
+    ) where
+        MP: FnMut(Option<&[u8]>) -> V,
+        OP: FnMut(V),
+    {
+        // step 1: compute hashes for the strings (reusing `hashes_buffer` to
+        // avoid a fresh allocation per batch)
+        let batch_hashes = &mut self.hashes_buffer;
+        batch_hashes.clear();
+        batch_hashes.resize(values.len(), 0);
+        create_hashes(&[values.clone()], &self.random_state, batch_hashes)
+            // hash is supported for all string types and create_hashes only
+            // returns errors for unsupported types
+            .unwrap();
+
+        // step 2: insert each string into the set, if not already present
+        let values = values.as_string::<O>();
+
+        // Ensure lengths are equivalent
+        assert_eq!(values.len(), batch_hashes.len());
+
+        for (value, &hash) in values.iter().zip(batch_hashes.iter()) {
+            // handle null value: all nulls share the single entry in `self.null`
+            let Some(value) = value else {
+                // NOTE: the second tuple element is the null's logical index,
+                // not a buffer offset
+                let payload = if let Some(&(payload, _offset)) = 
self.null.as_ref() {
+                    payload
+                } else {
+                    let payload = make_payload_fn(None);
+                    // `offsets` has one more element than stored values, so
+                    // this is the logical index the null will occupy
+                    let null_index = self.offsets.len() - 1;
+                    // nulls need a zero length in the offset buffer: repeat
+                    // the current end offset
+                    let offset = self.buffer.len();
+                    self.offsets.push(O::from_usize(offset).unwrap());
+                    self.null = Some((payload, null_index));
+                    payload
+                };
+                observe_payload_fn(payload);
+                continue;
+            };
+
+            // from here on only use bytes (not str/chars) for value
+            let value = value.as_bytes();
+            let value_len = O::from_usize(value.len()).unwrap();
+
+            // value is a "small" string (at most `SHORT_STRING_LEN` bytes), so
+            // its bytes can be stored inline in the entry
+            let payload = if value.len() <= SHORT_STRING_LEN {
+                // Pack the bytes big-endian into a `usize`. Different strings
+                // can pack to the same number (e.g. "a" and "\0a"), which is
+                // why `len` is compared as well. NOTE(review): assumes
+                // `SHORT_STRING_LEN <= size_of::<usize>()` so no bytes are
+                // shifted out — confirm where the constant is defined.
+                let inline = value.iter().fold(0usize, |acc, &x| acc << 8 | x 
as usize);
+
+                // is the value already present in the map?
+                let entry = self.map.get_mut(hash, |header| {
+                    // compare value if hashes match
+                    if header.len != value_len {
+                        return false;
+                    }
+                    // value is stored inline so no need to consult buffer
+                    // (this is the "small string optimization")
+                    inline == header.offset_or_inline
+                });
+
+                if let Some(entry) = entry {
+                    entry.payload
+                }
+                // if no existing entry, make a new one
+                else {
+                    // Copy the value into `buffer` and `offsets` so it 
appears
+                    // in the output array, but store the actual bytes inline for
+                    // comparison
+                    self.buffer.append_slice(value);
+                    
self.offsets.push(O::from_usize(self.buffer.len()).unwrap());
+                    let payload = make_payload_fn(Some(value));
+                    let new_header = Entry {
+                        hash,
+                        len: value_len,
+                        offset_or_inline: inline,
+                        payload,
+                    };
+                    self.map.insert_accounted(
+                        new_header,
+                        |header| header.hash,
+                        &mut self.map_size,
+                    );
+                    payload
+                }
+            }
+            // value is not a "small" string
+            else {
+                // Check if the value is already present in the set
+                let entry = self.map.get_mut(hash, |header| {
+                    // compare value if hashes match
+                    if header.len != value_len {
+                        return false;
+                    }
+                    // Need to compare the bytes in the buffer
+                    // SAFETY: buffer is only appended to, and we correctly 
inserted values and offsets
+                    // Only entries with the same length, i.e. other large
+                    // strings, reach this point, so `offset_or_inline` is a real
+                    // buffer offset rather than inlined bytes. (Assumes
+                    // `header.range()` is `offset..offset + len` — confirm in `Entry`.)
+                    let existing_value =
+                        unsafe { 
self.buffer.as_slice().get_unchecked(header.range()) };
+                    value == existing_value
+                });
+
+                if let Some(entry) = entry {
+                    entry.payload
+                }
+                // if no existing entry, make a new one
+                else {
+                    // Copy the value into `buffer` and `offsets` so it
+                    // appears in the output array, and store its start offset
+                    // in the entry so the bytes can be compared if needed
+                    let offset = self.buffer.len(); // offset of start of data
+                    self.buffer.append_slice(value);
+                    
self.offsets.push(O::from_usize(self.buffer.len()).unwrap());
+
+                    let payload = make_payload_fn(Some(value));
+                    let new_header = Entry {
+                        hash,
+                        len: value_len,
+                        offset_or_inline: offset,
+                        payload,
+                    };
+                    self.map.insert_accounted(
+                        new_header,
+                        |header| header.hash,
+                        &mut self.map_size,
+                    );
+                    payload
+                }
+            };
+            observe_payload_fn(payload);
+        }
+    }
+
+    /// Removes the first n distinct values inserted into this set, in the 
order

Review Comment:
   It does have significant test coverage, however



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to