alamb commented on code in PR #8827:
URL: https://github.com/apache/arrow-datafusion/pull/8827#discussion_r1492408901
##########
datafusion/physical-expr/src/binary_map.rs:
##########
@@ -0,0 +1,986 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`ArrowBytesMap`] and [`ArrowBytesSet`] for storing maps/sets of values from
+//! StringArray / LargeStringArray / BinaryArray / LargeBinaryArray.
+
+use ahash::RandomState;
+use arrow_array::cast::AsArray;
+use arrow_array::types::{ByteArrayType, GenericBinaryType, GenericStringType};
+use arrow_array::{
+    Array, ArrayRef, GenericBinaryArray, GenericStringArray, OffsetSizeTrait,
+};
+use arrow_buffer::{
+    BooleanBufferBuilder, BufferBuilder, NullBuffer, OffsetBuffer, ScalarBuffer,
+};
+use arrow_schema::DataType;
+use datafusion_common::hash_utils::create_hashes;
+use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
+use std::any::type_name;
+use std::fmt::Debug;
+use std::mem;
+use std::ops::Range;
+use std::sync::Arc;
+
+/// Should the output be a String or Binary?
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum OutputType {
+    /// `StringArray` or `LargeStringArray`
+    Utf8,
+    /// `BinaryArray` or `LargeBinaryArray`
+    Binary,
+}
+
+/// HashSet optimized for storing string or binary values that can produce
+/// the final set as a GenericStringArray with minimal copies.
+#[derive(Debug)]
+pub struct ArrowBytesSet<O: OffsetSizeTrait>(ArrowBytesMap<O, ()>);
+
+impl<O: OffsetSizeTrait> ArrowBytesSet<O> {
+    pub fn new(output_type: OutputType) -> Self {
+        Self(ArrowBytesMap::new(output_type))
+    }
+
+    /// Return the contents of this set and replace it with a new empty
+    /// set with the same output type
+    pub(super) fn take(&mut self) -> Self {
+        Self(self.0.take())
+    }
+
+    /// Inserts each value from `values` into the set
+    pub fn insert(&mut self, values: &ArrayRef) {
+        fn make_payload_fn(_value: Option<&[u8]>) {}
+        fn observe_payload_fn(_payload: ()) {}
+        self.0
+            .insert_if_new(values, make_payload_fn, observe_payload_fn);
+    }
+
+    /// Converts this set into a `StringArray`/`LargeStringArray` or
+    /// `BinaryArray`/`LargeBinaryArray` containing each distinct value that
+    /// was interned. This is done without copying the values.
+    pub fn into_state(self) -> ArrayRef {
+        self.0.into_state()
+    }
+
+    /// Returns the total number of distinct values (including nulls) seen so far
+    pub fn len(&self) -> usize {
+        self.0.len()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.0.is_empty()
+    }
+
+    /// Returns the total number of distinct values (not including nulls) seen so far
+    pub fn non_null_len(&self) -> usize {
+        self.0.non_null_len()
+    }
+
+    /// Return the total size, in bytes, of memory used to store the data in
+    /// this set, not including `self`
+    pub fn size(&self) -> usize {
+        self.0.size()
+    }
+}
+
+/// Optimized map for storing Arrow "bytes" types (`String`, `LargeString`,
+/// `Binary`, and `LargeBinary`) values that can produce the set of keys on
+/// output as `GenericBinaryArray` without copies.
+///
+/// Equivalent to `HashMap<String, V>` but with better performance for arrow
+/// data.
+///
+/// # Generic Arguments
+///
+/// * `O`: OffsetSize (String/LargeString)
+/// * `V`: payload type
+///
+/// # Description
+///
+/// This is a specialized HashMap with the following properties:
+///
+/// 1. Optimized for storing and emitting Arrow byte types (e.g.
+/// `StringArray` / `BinaryArray`) very efficiently by minimizing copying of
+/// the string values themselves, both when inserting and when emitting the
+/// final array.
+///
+/// 2. Retains the insertion order of entries in the final array. The values are
+/// in the same order as they were inserted.
+///
+/// Note this structure can be used as a `HashSet` by specifying the value type
+/// as `()`, as is done by [`ArrowBytesSet`].
+///
+/// This map is used by the special `COUNT DISTINCT` aggregate function to
+/// store the distinct values, and by the `GROUP BY` operator to store
+/// group values when they are a single string array.
+///
+/// # Example
+///
+/// The following diagram shows how the map would store the four strings
+/// "Foo", NULL, "Bar", "TheQuickBrownFox":
+///
+/// * `hashtable` stores entries for each distinct string that has been
+/// inserted. The entries contain the payload as well as information about the
+/// value (either an offset or the actual bytes, see `Entry` docs for more
+/// details)
+///
+/// * `offsets` stores offsets into `buffer` for each distinct string value,
+/// following the same convention as the offsets in a `StringArray` or
+/// `LargeStringArray`.
+///
+/// * `buffer` stores the actual byte data
+///
+/// * `null`: stores the index and payload of the null value, in this case the
+/// second value (index 1)
+///
+/// ```text
+/// ┌───────────────────────────────────┐  ┌─────┐    ┌────┐
+/// │                ...                │  │  0  │    │FooB│
+/// │ ┌──────────────────────────────┐  │  │  0  │    │arTh│
+/// │ │      <Entry for "Bar">       │  │  │  3  │    │eQui│
+/// │ │            len: 3            │  │  │  3  │    │ckBr│
+/// │ │   offset_or_inline: "Bar"    │  │  │  6  │    │ownF│
+/// │ │         payload:...          │  │  │     │    │ox  │
+/// │ └──────────────────────────────┘  │  │     │    │    │
+/// │                ...                │  └─────┘    └────┘
+/// │ ┌──────────────────────────────┐  │
+/// │ │<Entry for "TheQuickBrownFox">│  │  offsets    buffer
+/// │ │           len: 16            │  │
+/// │ │     offset_or_inline: 6      │  │  ┌───────────────┐
+/// │ │         payload: ...         │  │  │    Some(1)    │
+/// │ └──────────────────────────────┘  │  │ payload: ...  │
+/// │                ...                │  └───────────────┘
+/// └───────────────────────────────────┘
+///                                             null
+///               HashTable
+/// ```
+///
+/// # Entry Format
+///
+/// Entries stored in an [`ArrowBytesMap`] represent a value that is either
+/// stored inline or in the buffer
+///
+/// This helps the case where there are many short (less than 8 bytes) strings
+/// that are the same (e.g. "MA", "CA", "NY", "TX", etc)
+///
+/// ```text
+///                                                               ┌──────────────────┐
+///                                                ─ ─ ─ ─ ─ ─ ─ ▶│...               │
+///                                               │               │TheQuickBrownFox  │
+///                                                               │...               │
+///                                               │               │                  │
+///                                                               └──────────────────┘
+///                                               │                 buffer of u8
+///
+///                                               │
+///                       ┌────────────────┬───────────────┬───────────────┐
+///  Storing              │                │ starting byte │  length, in   │
+///  "TheQuickBrownFox"   │   hash value   │   offset in   │  bytes (not   │
+///  (long string)        │                │    buffer     │  characters)  │
+///                       └────────────────┴───────────────┴───────────────┘
+///                             8 bytes          8 bytes        4 or 8
+///
+///
+///                       ┌───────────────┬─┬─┬─┬─┬─┬─┬─┬─┬───────────────┐
+///  Storing "foobar"     │               │ │ │ │ │ │ │ │ │  length, in   │
+///  (short string)       │  hash value   │?│?│f│o│o│b│a│r│  bytes (not   │
+///                       │               │ │ │ │ │ │ │ │ │  characters)  │
+///                       └───────────────┴─┴─┴─┴─┴─┴─┴─┴─┴───────────────┘
+///                             8 bytes          8 bytes        4 or 8
+/// ```
+pub struct ArrowBytesMap<O, V>
+where
+    O: OffsetSizeTrait,
+    V: Debug + PartialEq + Eq + Clone + Copy + Default,
+{
+    /// Should the output be String or Binary?
+    output_type: OutputType,
+    /// Underlying hash set for each distinct value
+    map: hashbrown::raw::RawTable<Entry<O, V>>,
+    /// Total size of the map in bytes
+    map_size: usize,
+    /// In progress arrow `Buffer` containing all values
+    buffer: BufferBuilder<u8>,
+    /// Offsets into `buffer` for each distinct value. These offsets are used
+    /// directly to create the final `GenericBinaryArray`. The `i`th string is
+    /// stored in the range `offsets[i]..offsets[i+1]` in `buffer`. Null values
+    /// are stored as a zero length string.
+    offsets: Vec<O>,
+    /// random state used to generate hashes
+    random_state: RandomState,
+    /// buffer that stores hash values (reused across batches to save allocations)
+    hashes_buffer: Vec<u64>,
+    /// `(payload, null_index)` for the 'null' value, if any
+    /// NOTE null_index is the logical index in the final array, not the index
+    /// in the buffer
+    null: Option<(V, usize)>,
+}
+
+/// The size, in number of entries, of the initial hash table
+const INITIAL_MAP_CAPACITY: usize = 128;
+/// The initial size, in bytes, of the string data
+const INITIAL_BUFFER_CAPACITY: usize = 8 * 1024;
+impl<O: OffsetSizeTrait, V> ArrowBytesMap<O, V>
+where
+    V: Debug + PartialEq + Eq + Clone + Copy + Default,
+{
+    pub fn new(output_type: OutputType) -> Self {
+        Self {
+            output_type,
+            map: hashbrown::raw::RawTable::with_capacity(INITIAL_MAP_CAPACITY),
+            map_size: 0,
+            buffer: BufferBuilder::new(INITIAL_BUFFER_CAPACITY),
+            offsets: vec![O::default()], // first offset is always 0
+            random_state: RandomState::new(),
+            hashes_buffer: vec![],
+            null: None,
+        }
+    }
+
+    /// Return the contents of this map and replace it with a new empty map with
+    /// the same output type
+    pub fn take(&mut self) -> Self {
+        let mut new_self = Self::new(self.output_type);
+        std::mem::swap(self, &mut new_self);
+        new_self
+    }
+
+    /// Inserts each value from `values` into the map, invoking `make_payload_fn`
+    /// for each value if *not* already present, deferring the allocation of the
+    /// payload until it is needed.
+    ///
+    /// Note that this is different than a normal map that would replace the
+    /// existing entry
+    ///
+    /// # Arguments:
+    ///
+    /// `values`: array whose values are inserted
+    ///
+    /// `make_payload_fn`: invoked for each value that is not already present
+    /// to create the payload, in order of the values in `values`
+    ///
+    /// `observe_payload_fn`: invoked once, for each value in `values`, that was
+    /// already present in the map, with corresponding payload value.
+    ///
+    /// # Returns
+    ///
+    /// The payload value for the entry, either the existing value or
+    /// the newly inserted value
+    ///
+    /// # Safety:
+    ///
+    /// Note that `make_payload_fn` and `observe_payload_fn` are only invoked
+    /// with valid values from `values`, not for the `NULL` value.
+    pub fn insert_if_new<MP, OP>(
+        &mut self,
+        values: &ArrayRef,
+        make_payload_fn: MP,
+        observe_payload_fn: OP,
+    ) where
+        MP: FnMut(Option<&[u8]>) -> V,
+        OP: FnMut(V),
+    {
+        // Sanity check the input array type
+        match self.output_type {
+            OutputType::Binary => {
+                assert!(matches!(
+                    values.data_type(),
+                    DataType::Binary | DataType::LargeBinary
+                ));
+                self.insert_if_new_inner::<MP, OP, GenericBinaryType<O>>(
+                    values,
+                    make_payload_fn,
+                    observe_payload_fn,
+                )
+            }
+            OutputType::Utf8 => {
+                assert!(matches!(
+                    values.data_type(),
+                    DataType::Utf8 | DataType::LargeUtf8
+                ));
+                self.insert_if_new_inner::<MP, OP, GenericStringType<O>>(
+                    values,
+                    make_payload_fn,
+                    observe_payload_fn,
+                )
+            }
+        };
+    }
+
+    /// Generic version of [`Self::insert_if_new`] that handles `ByteArrayType`
+    /// (both String and Binary)
+    ///
+    /// Note this is the only function that is generic on [`ByteArrayType`], which
+    /// avoids having to template the entire structure, making the code
+    /// simpler to understand and reducing code bloat due to duplication.
+    ///
+    /// See comments on `insert_if_new` for more details
+    fn insert_if_new_inner<MP, OP, B>(
+        &mut self,
+        values: &ArrayRef,
+        mut make_payload_fn: MP,
+        mut observe_payload_fn: OP,
+    ) where
+        MP: FnMut(Option<&[u8]>) -> V,
+        OP: FnMut(V),
+        B: ByteArrayType,
+    {
+        // step 1: compute hashes
+        let batch_hashes = &mut self.hashes_buffer;
+        batch_hashes.clear();
+        batch_hashes.resize(values.len(), 0);
+        create_hashes(&[values.clone()], &self.random_state, batch_hashes)
+            // hash is supported for all types and create_hashes only
+            // returns errors for unsupported types
+            .unwrap();
+
+        // step 2: insert each value into the set, if not already present
+        let values = values.as_bytes::<B>();
+
+        // Ensure lengths are equivalent
+        assert_eq!(values.len(), batch_hashes.len());
+
+        for (value, &hash) in values.iter().zip(batch_hashes.iter()) {
+            // handle null value
+            let Some(value) = value else {
+                let payload = if let Some(&(payload, _offset)) = self.null.as_ref() {
+                    payload
+                } else {
+                    let payload = make_payload_fn(None);
+                    let null_index = self.offsets.len() - 1;
+                    // nulls need a zero length in the offset buffer
+                    let offset = self.buffer.len();
+                    self.offsets.push(O::usize_as(offset));
+                    self.null = Some((payload, null_index));
+                    payload
+                };
+                observe_payload_fn(payload);
+                continue;
+            };
+
+            // get the value as bytes
+            let value: &[u8] = value.as_ref();
+            let value_len = O::usize_as(value.len());
+
+            // value is "small"
+            let payload = if value.len() <= SHORT_VALUE_LEN {
+                let inline = value.iter().fold(0usize, |acc, &x| acc << 8 | x as usize);
+
+                // is the value already present in the set?
+                let entry = self.map.get_mut(hash, |header| {
+                    // compare value if hashes match
+                    if header.len != value_len {
+                        return false;
+                    }
+                    // value is stored inline so no need to consult buffer
+                    // (this is the "small string optimization")
+                    inline == header.offset_or_inline
+                });
+
+                if let Some(entry) = entry {
+                    entry.payload
+                }
+                // if no existing entry, make a new one
+                else {
+                    // Put the small value into buffer and offsets so it appears
+                    // in the output array, but store the actual bytes inline for
+                    // comparison
+                    self.buffer.append_slice(value);
+                    self.offsets.push(O::usize_as(self.buffer.len()));
+                    let payload = make_payload_fn(Some(value));
+                    let new_header = Entry {
+                        hash,
+                        len: value_len,
+                        offset_or_inline: inline,
+                        payload,
+                    };
+                    self.map.insert_accounted(
+                        new_header,
+                        |header| header.hash,
+                        &mut self.map_size,
+                    );
+                    payload
+                }
+            }
+            // value is not "small"
+            else {
+                // Check if the value is already present in the set
+                let entry = self.map.get_mut(hash, |header| {
+                    // compare value if hashes match
+                    if header.len != value_len {
+                        return false;
+                    }
+                    // Need to compare the bytes in the buffer
+                    // SAFETY: buffer is only appended to, and we correctly inserted values and offsets
+                    let existing_value =
+                        unsafe { self.buffer.as_slice().get_unchecked(header.range()) };
+                    value == existing_value
+                });
+
+                if let Some(entry) = entry {
+                    entry.payload
+                }
+                // if no existing entry, make a new one
+                else {
+                    // Put the value into buffer and offsets so it appears
+                    // in the output array, and store that offset so the
+                    // bytes can be compared if needed
+                    let offset = self.buffer.len(); // offset of start of data
+                    self.buffer.append_slice(value);
+                    self.offsets.push(O::usize_as(self.buffer.len()));
+
+                    let payload = make_payload_fn(Some(value));
+                    let new_header = Entry {
+                        hash,
+                        len: value_len,
+                        offset_or_inline: offset,
+                        payload,
+                    };
+                    self.map.insert_accounted(
+                        new_header,
+                        |header| header.hash,
+                        &mut self.map_size,
+                    );
+                    payload
+                }
+            };
+            observe_payload_fn(payload);
+        }
+        // Check for overflow in offsets (if more data was sent than can be represented)
+        if O::from_usize(self.buffer.len()).is_none() {
+            panic!(
+                "Put {} bytes in buffer, more than can be represented by a {}",
+                self.buffer.len(),
+                type_name::<O>()
+            );
+        }
+    }
+
+    /// Converts this set into a `StringArray`, `LargeStringArray`,
+    /// `BinaryArray`, or `LargeBinaryArray` containing each distinct value
+    /// that was inserted. This is done without copying the values.
+    ///
+    /// The values are guaranteed to be returned in the same order in which
+    /// they were first seen.
+    pub fn into_state(self) -> ArrayRef {
+        let Self {
+            output_type,
+            map: _,
+            map_size: _,
+            offsets,
+            mut buffer,
+            random_state: _,
+            hashes_buffer: _,
+            null,
+        } = self;
+
+        // Only make a `NullBuffer` if there was a null value
+        let nulls = null.map(|(_payload, null_index)| {
+            let num_values = offsets.len() - 1;
+            single_null_buffer(num_values, null_index)
+        });
+        // SAFETY: the offsets were constructed correctly in `insert_if_new` --
+        // monotonically increasing, overflows were checked.
+        let offsets = unsafe { OffsetBuffer::new_unchecked(ScalarBuffer::from(offsets)) };

Review Comment:
   Validating the offsets actually took significant time, which took me a while to find and track down (without this unsafe call, ClickBench Q28 slowed down by 10%).
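To make the cost discussed in the review comment concrete, the sketch below contrasts the validated and unchecked ways of building an `OffsetBuffer` from the accumulated offsets. This is illustrative only (not code from this PR), using the public `arrow_buffer` API with a concrete `i32` offset type; `OffsetBuffer::new` makes an O(n) pass over the offsets on every emission to check they are non-negative and monotonically increasing, which is exactly the work the `new_unchecked` call above skips.

```rust
use arrow_buffer::{OffsetBuffer, ScalarBuffer};

/// Validated construction: re-checks that offsets are non-negative and
/// monotonically increasing each time an array is emitted.
fn offsets_checked(offsets: Vec<i32>) -> OffsetBuffer<i32> {
    OffsetBuffer::new(ScalarBuffer::from(offsets))
}

/// # Safety
/// Caller must guarantee the offsets are already valid, as `insert_if_new`
/// does by construction (it only pushes `buffer.len()`, which never shrinks,
/// and checks for offset overflow at the end of each batch).
unsafe fn offsets_unchecked(offsets: Vec<i32>) -> OffsetBuffer<i32> {
    // Skips the validation pass entirely
    unsafe { OffsetBuffer::new_unchecked(ScalarBuffer::from(offsets)) }
}
```

Because the map only ever appends monotonically increasing offsets and checks for overflow in `insert_if_new_inner`, the invariant holds by construction, which is what the `SAFETY` comment in `into_state` relies on.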
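The short-value path above packs at most 8 bytes into a `usize` so candidate matches can be confirmed or rejected without touching `buffer`. The standalone sketch below (with a hypothetical helper name, not from the PR) shows the same fold and why the entry must also store and compare `len`; the exact `SHORT_VALUE_LEN` constant is defined outside the quoted portion of the hunk.

```rust
/// Pack up to 8 bytes into a usize using the same fold as the hunk above.
/// For two values of equal length, byte equality is equivalent to equality
/// of the packed form, which lets the short-string path skip the buffer.
fn pack_inline(value: &[u8]) -> usize {
    assert!(value.len() <= std::mem::size_of::<usize>());
    value.iter().fold(0usize, |acc, &x| acc << 8 | x as usize)
}

fn example() {
    assert_eq!(pack_inline(b"MA"), pack_inline(b"MA"));
    assert_ne!(pack_inline(b"MA"), pack_inline(b"CA"));
    // Length must be compared separately: "A" and "\0A" pack to the same
    // value, which is why the entry stores `len` and checks it first.
    assert_eq!(pack_inline(b"A"), pack_inline(b"\0A"));
}
```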
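For readers skimming the API in the hunk, here is a minimal usage sketch of `ArrowBytesMap` as a `GROUP BY`-style interner, based only on the signatures shown above (`new`, `insert_if_new`, `len`, `into_state`). The `assign_group_ids` helper and the `u32` payload are made up for illustration, and the example assumes the module's `ArrowBytesMap` and `OutputType` are in scope.

```rust
use std::sync::Arc;
use arrow_array::{Array, ArrayRef, StringArray};

/// Hypothetical driver: intern a batch of strings, assigning each distinct
/// value (including the null) a dense `u32` id as its payload.
fn assign_group_ids(map: &mut ArrowBytesMap<i32, u32>, values: &ArrayRef) -> Vec<u32> {
    let mut next_id = map.len() as u32;
    let mut ids = Vec::with_capacity(values.len());
    map.insert_if_new(
        values,
        // invoked only for values not already in the map
        |_bytes: Option<&[u8]>| {
            let id = next_id;
            next_id += 1;
            id
        },
        // invoked with the payload of each input row's distinct value
        |id| ids.push(id),
    );
    ids
}

fn example() {
    let mut map = ArrowBytesMap::<i32, u32>::new(OutputType::Utf8);
    let batch: ArrayRef =
        Arc::new(StringArray::from(vec![Some("MA"), None, Some("CA"), Some("MA")]));
    let ids = assign_group_ids(&mut map, &batch);
    // The repeated "MA" reuses its payload rather than creating a new entry
    assert_eq!(ids, vec![0, 1, 2, 0]);
    // Emitting the distinct values preserves first-seen order
    let distinct: ArrayRef = map.into_state();
    assert_eq!(distinct.len(), 3);
}
```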
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
