This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 2c83b0225a Support hashing List columns (#7616)
2c83b0225a is described below

commit 2c83b0225af826d6ee5f35e2b97edfc577868ee6
Author: Jon Mease <[email protected]>
AuthorDate: Fri Sep 22 07:56:34 2023 -0400

    Support hashing List columns (#7616)
    
    * Hash ListArray
    
    * Implement hash join for list arrays
    
    * add sqllogic test for grouping by list column
    
    * reset parquet-testing
    
    * reset testing
    
    * clippy
---
 datafusion/expr/src/utils.rs                   |  2 +
 datafusion/physical-expr/src/hash_utils.rs     | 63 ++++++++++++++++++++++++++
 datafusion/sqllogictest/test_files/groupby.slt |  7 +++
 3 files changed, 72 insertions(+)

diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index e94d5f4b3f..54a1ce348b 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -917,6 +917,8 @@ pub fn can_hash(data_type: &DataType) -> bool {
         {
             DataType::is_dictionary_key_type(key_type)
         }
+        DataType::List(_) => true,
+        DataType::LargeList(_) => true,
         _ => false,
     }
 }
diff --git a/datafusion/physical-expr/src/hash_utils.rs b/datafusion/physical-expr/src/hash_utils.rs
index 227caaf4aa..379e0eba52 100644
--- a/datafusion/physical-expr/src/hash_utils.rs
+++ b/datafusion/physical-expr/src/hash_utils.rs
@@ -207,6 +207,39 @@ fn hash_dictionary<K: ArrowDictionaryKeyType>(
     Ok(())
 }
 
+fn hash_list_array<OffsetSize>(
+    array: &GenericListArray<OffsetSize>,
+    random_state: &RandomState,
+    hashes_buffer: &mut [u64],
+) -> Result<()>
+where
+    OffsetSize: OffsetSizeTrait,
+{
+    let values = array.values().clone();
+    let offsets = array.value_offsets();
+    let nulls = array.nulls();
+    let mut values_hashes = vec![0u64; values.len()];
+    create_hashes(&[values], random_state, &mut values_hashes)?;
+    if let Some(nulls) = nulls {
+        for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
+            if nulls.is_valid(i) {
+                let hash = &mut hashes_buffer[i];
+                for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] {
+                    *hash = combine_hashes(*hash, *values_hash);
+                }
+            }
+        }
+    } else {
+        for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
+            let hash = &mut hashes_buffer[i];
+            for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] {
+                *hash = combine_hashes(*hash, *values_hash);
+            }
+        }
+    }
+    Ok(())
+}
+
 /// Test version of `create_hashes` that produces the same value for
 /// all hashes (to test collisions)
 ///
@@ -294,6 +327,14 @@ pub fn create_hashes<'a>(
                 array => hash_dictionary(array, random_state, hashes_buffer, rehash)?,
                 _ => unreachable!()
             }
+            DataType::List(_) => {
+                let array = as_list_array(array);
+                hash_list_array(array, random_state, hashes_buffer)?;
+            }
+            DataType::LargeList(_) => {
+                let array = as_large_list_array(array);
+                hash_list_array(array, random_state, hashes_buffer)?;
+            }
             _ => {
                 // This is internal because we should have caught this before.
                 return internal_err!(
@@ -452,6 +493,28 @@ mod tests {
         assert_ne!(dict_hashes[0], dict_hashes[2]);
     }
 
+    #[test]
+    // Tests actual values of hashes, which are different if forcing collisions
+    #[cfg(not(feature = "force_hash_collisions"))]
+    fn create_hashes_for_list_arrays() {
+        let data = vec![
+            Some(vec![Some(0), Some(1), Some(2)]),
+            None,
+            Some(vec![Some(3), None, Some(5)]),
+            Some(vec![Some(3), None, Some(5)]),
+            None,
+            Some(vec![Some(0), Some(1), Some(2)]),
+        ];
+        let list_array =
+            Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(data)) as ArrayRef;
+        let random_state = RandomState::with_seeds(0, 0, 0, 0);
+        let mut hashes = vec![0; list_array.len()];
+        create_hashes(&[list_array], &random_state, &mut hashes).unwrap();
+        assert_eq!(hashes[0], hashes[5]);
+        assert_eq!(hashes[1], hashes[4]);
+        assert_eq!(hashes[2], hashes[3]);
+    }
+
     #[test]
     // Tests actual values of hashes, which are different if forcing collisions
     #[cfg(not(feature = "force_hash_collisions"))]
diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt
index c93617f352..7092c5933f 100644
--- a/datafusion/sqllogictest/test_files/groupby.slt
+++ b/datafusion/sqllogictest/test_files/groupby.slt
@@ -3250,6 +3250,13 @@ ORDER BY l.sn
 ----
 0 30 30
 
+# Should support grouping by list column
+query ?I
+SELECT column1, COUNT(*) as column2 FROM (VALUES (['a', 'b'], 1), (['c', 'd', 'e'], 2), (['a', 'b'], 3)) as values0 GROUP BY column1 ORDER BY column2;
+----
+[c, d, e] 1
+[a, b] 2
+
 
 # primary key should be aware from which columns it is associated
statement error DataFusion error: Error during planning: Projection references non-aggregate values: Expression r.sn could not be resolved from available columns: l.sn, SUM\(l.amount\)

Reply via email to