Rich-T-kid commented on code in PR #21589:
URL: https://github.com/apache/datafusion/pull/21589#discussion_r3096431597


##########
datafusion/physical-plan/src/aggregates/group_values/single_group_by/dictionary.rs:
##########
@@ -0,0 +1,1488 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::aggregates::group_values::GroupValues;
+use crate::hash_utils::RandomState;
+use arrow::array::{
+    Array, ArrayRef, BinaryArray, BinaryBuilder, BinaryViewArray, 
BinaryViewBuilder,
+    DictionaryArray, LargeBinaryArray, LargeBinaryBuilder, LargeStringArray,
+    LargeStringBuilder, PrimitiveArray, PrimitiveBuilder, StringArray, 
StringBuilder,
+    StringViewArray, StringViewBuilder, UInt64Array,
+};
+use arrow::datatypes::{ArrowDictionaryKeyType, ArrowNativeType, DataType};
+use datafusion_common::Result;
+use datafusion_common::hash_utils::create_hashes;
+use datafusion_expr::EmitTo;
+use std::collections::HashMap;
+use std::marker::PhantomData;
+use std::sync::Arc;
+
pub struct GroupValuesDictionary<K: ArrowDictionaryKeyType + Send> {
    /*
    Design notes:

    Every &[ArrayRef] passed to intern() is a dictionary array, but intern()
    is called across record batches, so we cannot rely on a trivial approach
    that stores a single batch's dictionary mapping as-is: different batches
    may carry different dictionaries for the same column.

    Approach:
    1A. Keep a hashmap that lives across intern() calls:
        - downcast cols: &[ArrayRef] to a generic DictionaryArray and check
          whether we have already stored each of its distinct values
        - if we have, reuse the group id recorded the first time that value
          was seen
        - if we have not, the next free group id (the current group count)
          becomes the group id for that value
    1B. Producing the dictionary-encoded output emit() expects:
        - NOTE: emit() returns one value per group, not one value per row.
          The group values are the distinct values in the order they were
          first seen - not the fully expanded per-row key array.
        - seen_elements stores unique values the first time they are seen,
          which preserves first-seen order for emit()
        - the array type emit() returns is determined by the dictionary's
          value type

    Possible optimizations (ignored for now):
    2A. Instead of one hashmap probe per key, hash all dictionary values up
        front and, while iterating the keys array, refer back to those
        precomputed hashes; the values array is assumed to be smaller than
        the keys array.
    */
    // Distinct group values in first-seen order, stored as raw bytes; this
    // is what emit() materializes from. A Box<dyn Builder> was rejected
    // because builders cannot be partially drained as EmitTo::First(n)
    // requires.
    seen_elements: Vec<Vec<u8>>,
    // Value type of the dictionary; drives which array type emit() builds.
    value_dt: DataType,
    _phantom: PhantomData<K>,
    // Values seen so far: value hash -> entries of (group id assigned on
    // first sight, raw bytes). The Vec resolves hash collisions - entries
    // sharing a hash are disambiguated by comparing their raw bytes.
    unique_dict_value_mapping: HashMap<u64, Vec<(usize, Vec<u8>)>>,
    // Fixed seed ensures consistent hashing across GroupValuesDictionary
    // instances; this is critical for correct behavior in multi-partition
    // aggregation where partial-phase emits are re-interned by the final
    // phase.
    random_state: RandomState,
    // Cached group id for nulls, since all nulls map to the same group.
    null_group_id: Option<usize>,
}
+
+impl<K: ArrowDictionaryKeyType + Send> GroupValuesDictionary<K> {
+    pub fn new(data_type: &DataType) -> Self {
+        Self {
+            seen_elements: Vec::new(),
+            unique_dict_value_mapping: HashMap::new(),
+            value_dt: data_type.clone(),
+            _phantom: PhantomData,
+            random_state: RandomState::with_seed(0),
+            null_group_id: None,
+        }
+    }
+    fn compute_value_hashes(&mut self, values: &ArrayRef) -> Result<Vec<u64>> {
+        let mut hashes = vec![0u64; values.len()];
+        create_hashes([Arc::clone(values)], &self.random_state, &mut hashes)?;
+        Ok(hashes)
+    }
+    /*fn keys_to_usize(keys: &PrimitiveArray<K>) -> Vec<Option<usize>> {
+        keys.iter()
+            .map(|k| k.map(|v| v.to_usize().unwrap()))
+            .collect()
+    }*/
+
    /// Returns the raw byte representation of `values[index]`.
    ///
    /// Supports the string and binary value types this implementation
    /// handles. The caller is responsible for passing a valid, non-null
    /// `index`.
    ///
    /// # Panics
    /// Panics if the array cannot be downcast to the concrete type implied
    /// by its `DataType`, or (`unimplemented!`) for unsupported value types.
    fn get_raw_bytes(values: &ArrayRef, index: usize) -> &[u8] {
        match values.data_type() {
            DataType::Utf8 => values
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("Expected StringArray")
                .value(index)
                .as_bytes(),
            DataType::LargeUtf8 => values
                .as_any()
                .downcast_ref::<LargeStringArray>()
                .expect("Expected LargeStringArray")
                .value(index)
                .as_bytes(),
            DataType::Utf8View => values
                .as_any()
                .downcast_ref::<StringViewArray>()
                .expect("Expected StringViewArray")
                .value(index)
                .as_bytes(),
            DataType::Binary => values
                .as_any()
                .downcast_ref::<BinaryArray>()
                .expect("Expected BinaryArray")
                .value(index),
            DataType::LargeBinary => values
                .as_any()
                .downcast_ref::<LargeBinaryArray>()
                .expect("Expected LargeBinaryArray")
                .value(index),
            DataType::BinaryView => values
                .as_any()
                .downcast_ref::<BinaryViewArray>()
                .expect("Expected BinaryViewArray")
                .value(index),
            other => unimplemented!("get_raw_bytes not implemented for {other:?}"),
        }
    }
+
    /// Returns the byte pattern used in `seen_elements` to represent the
    /// null group for value type `dt`.
    fn sentinel_repr(dt: &DataType) -> Vec<u8> {
        match dt {
            // 0xFF bytes cannot appear in valid UTF8, so there is no risk
            // of collision with real string values.
            DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => {
                vec![0xFF, 0xFF, 0xFF, 0xFF]
            }
            // TODO: binary types need a better sentinel.
            // NOTE(review): unlike UTF8, a real binary value CAN
            // legitimately be [0xFF, 0xFF, 0xFF, 0xFF], and such a value
            // would be conflated with null when emit() rebuilds the array.
            DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
                vec![0xFF, 0xFF, 0xFF, 0xFF]
            }
            // Primitives are not supported yet. The intended scheme (per
            // the original notes) is a byte sequence whose length differs
            // from the native type's - e.g. a real i8 is always exactly
            // 1 byte, so a 2-byte sentinel can never match a real value.
            other => unimplemented!("sentinel_repr not implemented for {other:?}"),
        }
    }
+
+    #[inline]
+    fn get_null_group_id(&mut self) -> usize {
+        if let Some(group_id) = self.null_group_id {
+            group_id
+        } else {
+            if let Some(entries) = self
+                .unique_dict_value_mapping
+                .get(&((usize::MAX - 1) as u64))
+            {
+                entries[0].0
+            } else {
+                // first time we've seen a null
+                let new_group_id = self.seen_elements.len();
+                let raw_bytes = Self::sentinel_repr(&self.value_dt);
+                self.seen_elements.push(raw_bytes.clone());
+                self.unique_dict_value_mapping
+                    .insert((usize::MAX - 1) as u64, vec![(new_group_id, 
raw_bytes)]);
+                self.null_group_id = Some(new_group_id); // cache it
+                new_group_id
+            }
+        }
+    }
+    fn transform_into_array(&self, raw: &[Vec<u8>]) -> Result<ArrayRef> {
+        let sentinel = Self::sentinel_repr(&self.value_dt);
+        match &self.value_dt {
+            DataType::Utf8 => {
+                let mut builder = StringBuilder::new();
+                for raw_bytes in raw {
+                    if raw_bytes == &sentinel {
+                        builder.append_null();
+                    } else {
+                        let s = std::str::from_utf8(raw_bytes).map_err(|e| {
+                            
datafusion_common::DataFusionError::Internal(format!(
+                                "Invalid utf8 in seen_elements: {e}"
+                            ))
+                        })?;
+                        builder.append_value(s);
+                    }
+                }
+                Ok(Arc::new(builder.finish()) as ArrayRef)
+            }
+            DataType::LargeUtf8 => {
+                let mut builder = LargeStringBuilder::new();
+                for raw_bytes in raw {
+                    if raw_bytes == &sentinel {
+                        builder.append_null();
+                    } else {
+                        let s = std::str::from_utf8(raw_bytes).map_err(|e| {
+                            
datafusion_common::DataFusionError::Internal(format!(
+                                "Invalid utf8 in seen_elements: {e}"
+                            ))
+                        })?;
+                        builder.append_value(s);
+                    }
+                }
+                Ok(Arc::new(builder.finish()) as ArrayRef)
+            }
+            DataType::Utf8View => {
+                let mut builder = StringViewBuilder::new();
+                for raw_bytes in raw {
+                    if raw_bytes == &sentinel {
+                        builder.append_null();
+                    } else {
+                        let s = std::str::from_utf8(raw_bytes).map_err(|e| {
+                            
datafusion_common::DataFusionError::Internal(format!(
+                                "Invalid utf8 in seen_elements: {e}"
+                            ))
+                        })?;
+                        builder.append_value(s);
+                    }
+                }
+                Ok(Arc::new(builder.finish()) as ArrayRef)
+            }
+            DataType::Binary => {
+                let mut builder = BinaryBuilder::new();
+                for raw_bytes in raw {
+                    if raw_bytes == &sentinel {
+                        builder.append_null();
+                    } else {
+                        builder.append_value(raw_bytes);
+                    }
+                }
+                Ok(Arc::new(builder.finish()) as ArrayRef)
+            }
+            DataType::LargeBinary => {
+                let mut builder = LargeBinaryBuilder::new();
+                for raw_bytes in raw {
+                    if raw_bytes == &sentinel {
+                        builder.append_null();
+                    } else {
+                        builder.append_value(raw_bytes);
+                    }
+                }
+                Ok(Arc::new(builder.finish()) as ArrayRef)
+            }
+            DataType::BinaryView => {
+                let mut builder = BinaryViewBuilder::new();
+                for raw_bytes in raw {
+                    if raw_bytes == &sentinel {
+                        builder.append_null();
+                    } else {
+                        builder.append_value(raw_bytes);
+                    }
+                }
+                Ok(Arc::new(builder.finish()) as ArrayRef)
+            }
+            other => 
Err(datafusion_common::DataFusionError::NotImplemented(format!(
+                "transform_into_array not implemented for {other:?}"
+            ))),
+        }
+    }
+    fn normalize_dict_array(
+        values: &ArrayRef,
+        key_array: &PrimitiveArray<K>,
+    ) -> (ArrayRef, Vec<Option<usize>>) {
+        // maps old value index -> new canonical index
+        let mut old_to_new: Vec<Option<usize>> = vec![None; values.len()];
+        let mut canonical_indices: Vec<usize> = Vec::new();
+
+        for (i, slot) in old_to_new.iter_mut().enumerate() {
+            if values.is_null(i) {
+                continue;
+            }
+            let raw = Self::get_raw_bytes(values, i);
+            let canonical = canonical_indices
+                .iter()
+                .position(|&j| Self::get_raw_bytes(values, j) == raw);
+            if let Some(idx) = canonical {
+                *slot = Some(idx);
+            } else {
+                *slot = Some(canonical_indices.len());
+                canonical_indices.push(i);
+            }
+        }
+        // build new deduplicated values array using take
+        let indices = UInt64Array::from(
+            canonical_indices
+                .iter()
+                .map(|&i| i as u64)
+                .collect::<Vec<_>>(),
+        );
+        let new_values = arrow::compute::take(values.as_ref(), &indices, 
None).unwrap();
+
+        // remap keys
+        let new_keys: Vec<Option<usize>> = (0..key_array.len())
+            .map(|i| {
+                if key_array.is_null(i) {
+                    None
+                } else {
+                    let old_key = key_array.value(i).to_usize().unwrap();
+                    old_to_new[old_key]
+                }
+            })
+            .collect();
+
+        (new_values, new_keys)
+    }
+}
+
+impl<K: ArrowDictionaryKeyType + Send> GroupValues for 
GroupValuesDictionary<K> {
+    // not really sure how to return the size of strings and binary values so 
this is a best effort approach
+    fn size(&self) -> usize {
+        size_of::<Self>()
+            + self
+                .seen_elements
+                .iter()
+                .map(|b| b.capacity())
+                .sum::<usize>()
+            + self.unique_dict_value_mapping.capacity()
+                * size_of::<(u64, Vec<(usize, Vec<u8>)>)>()
+    }
    /// Number of distinct groups seen so far (one per unique value, plus
    /// one for null once a null has been interned).
    fn len(&self) -> usize {
        self.seen_elements.len()
    }
    /// Returns true if no groups have been created yet.
    fn is_empty(&self) -> bool {
        self.seen_elements.is_empty()
    }
+    fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> 
Result<()> {
+        if cols.len() != 1 {
+            return Err(datafusion_common::DataFusionError::Internal(
+                "GroupValuesDictionary only supports single column group 
by".to_string(),
+            ));
+        }
+        let array = Arc::clone(&cols[0]);
+        groups.clear(); // zero out buffer
+        let dict_array = array
+            .as_any()
+            .downcast_ref::<DictionaryArray<K>>()
+            .ok_or_else(|| {
+                datafusion_common::DataFusionError::Internal(format!(
+                    "GroupValuesDictionary expected DictionaryArray but got 
{:?}",
+                    array.data_type()
+                ))
+            })?;
+
+        // pre-allocate space for seen_elements using occupancy
+        // occupancy count gives us the number of truly distinct non-null 
values in this batch
+        let occupied = dict_array.occupancy().count_set_bits();
+        self.seen_elements.reserve(occupied);
+
+        let values = dict_array.values();
+        let key_array = dict_array.keys();
+        if key_array.is_empty() {
+            return Ok(()); // nothing to intern, just return early
+        }
+        let (values, keys_as_usize) = Self::normalize_dict_array(values, 
key_array);
+        let values = &values;
+        // compute hashes for all values in the values array upfront
+        // value_hashes[i] corresponds to values[i]
+        let value_hashes = self.compute_value_hashes(values)?;
+
+        // convert key array to Vec<usize> for cheap indexed access
+        // avoids repeated .value(i).to_usize() calls in the hot loop
+        //let keys_as_usize = Self::keys_to_usize(key_array);
+
+        // Pass 1: iterate values array (d iterations) - build a mapping of 
value hash -> group id for all unique values in the dictionary
+        // this allows us to do a single hashmap lookup per key in the hot 
loop instead of
+        let mut key_to_group: Vec<Option<usize>> = vec![None; values.len()];
+        for value_idx in 0..values.len() {
+            if values.is_null(value_idx) {
+                // this will be handled in phase 2
+                continue;
+            }
+            let hash = value_hashes[value_idx];
+            if let Some(entries) = self.unique_dict_value_mapping.get(&hash) {
+                let raw = Self::get_raw_bytes(values, value_idx);
+                if let Some((group_id, _)) = entries
+                    .iter()
+                    .find(|(_, stored_bytes)| raw == stored_bytes.as_slice())

Review Comment:
   Yes, there's a chance of hash collisions where `hash_array[i] ==
hash_array[j]` but `original_array[i] != original_array[j]`. So when a hash
already exists in the hash table, we can't just return that group id; we first
need to verify that the actual value matches. We iterate through the vector of
values that map to this hash, comparing each one to find the entry whose actual
value matches, and return that group id. If no match is found, it's a new group.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to