This is an automated email from the ASF dual-hosted git repository.
xudong963 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 2c83b0225a Support hashing List columns (#7616)
2c83b0225a is described below
commit 2c83b0225af826d6ee5f35e2b97edfc577868ee6
Author: Jon Mease <[email protected]>
AuthorDate: Fri Sep 22 07:56:34 2023 -0400
Support hashing List columns (#7616)
* Hash ListArray
* Implement hash join for list arrays
* add sqllogic test for grouping by list column
* reset parquet-testing
* reset testing
* clippy
---
datafusion/expr/src/utils.rs | 2 +
datafusion/physical-expr/src/hash_utils.rs | 63 ++++++++++++++++++++++++++
datafusion/sqllogictest/test_files/groupby.slt | 7 +++
3 files changed, 72 insertions(+)
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index e94d5f4b3f..54a1ce348b 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -917,6 +917,8 @@ pub fn can_hash(data_type: &DataType) -> bool {
{
DataType::is_dictionary_key_type(key_type)
}
+ DataType::List(_) => true,
+ DataType::LargeList(_) => true,
_ => false,
}
}
diff --git a/datafusion/physical-expr/src/hash_utils.rs b/datafusion/physical-expr/src/hash_utils.rs
index 227caaf4aa..379e0eba52 100644
--- a/datafusion/physical-expr/src/hash_utils.rs
+++ b/datafusion/physical-expr/src/hash_utils.rs
@@ -207,6 +207,39 @@ fn hash_dictionary<K: ArrowDictionaryKeyType>(
Ok(())
}
+fn hash_list_array<OffsetSize>(
+ array: &GenericListArray<OffsetSize>,
+ random_state: &RandomState,
+ hashes_buffer: &mut [u64],
+) -> Result<()>
+where
+ OffsetSize: OffsetSizeTrait,
+{
+ let values = array.values().clone();
+ let offsets = array.value_offsets();
+ let nulls = array.nulls();
+ let mut values_hashes = vec![0u64; values.len()];
+ create_hashes(&[values], random_state, &mut values_hashes)?;
+ if let Some(nulls) = nulls {
+ for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
+ if nulls.is_valid(i) {
+ let hash = &mut hashes_buffer[i];
+ for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] {
+ *hash = combine_hashes(*hash, *values_hash);
+ }
+ }
+ }
+ } else {
+ for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
+ let hash = &mut hashes_buffer[i];
+ for values_hash in &values_hashes[start.as_usize()..stop.as_usize()] {
+ *hash = combine_hashes(*hash, *values_hash);
+ }
+ }
+ }
+ Ok(())
+}
+
/// Test version of `create_hashes` that produces the same value for
/// all hashes (to test collisions)
///
@@ -294,6 +327,14 @@ pub fn create_hashes<'a>(
array => hash_dictionary(array, random_state, hashes_buffer, rehash)?,
_ => unreachable!()
}
+ DataType::List(_) => {
+ let array = as_list_array(array);
+ hash_list_array(array, random_state, hashes_buffer)?;
+ }
+ DataType::LargeList(_) => {
+ let array = as_large_list_array(array);
+ hash_list_array(array, random_state, hashes_buffer)?;
+ }
_ => {
// This is internal because we should have caught this before.
return internal_err!(
@@ -452,6 +493,28 @@ mod tests {
assert_ne!(dict_hashes[0], dict_hashes[2]);
}
+ #[test]
+ // Tests actual values of hashes, which are different if forcing collisions
+ #[cfg(not(feature = "force_hash_collisions"))]
+ fn create_hashes_for_list_arrays() {
+ let data = vec![
+ Some(vec![Some(0), Some(1), Some(2)]),
+ None,
+ Some(vec![Some(3), None, Some(5)]),
+ Some(vec![Some(3), None, Some(5)]),
+ None,
+ Some(vec![Some(0), Some(1), Some(2)]),
+ ];
+ let list_array =
+ Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(data)) as ArrayRef;
+ let random_state = RandomState::with_seeds(0, 0, 0, 0);
+ let mut hashes = vec![0; list_array.len()];
+ create_hashes(&[list_array], &random_state, &mut hashes).unwrap();
+ assert_eq!(hashes[0], hashes[5]);
+ assert_eq!(hashes[1], hashes[4]);
+ assert_eq!(hashes[2], hashes[3]);
+ }
+
#[test]
// Tests actual values of hashes, which are different if forcing collisions
#[cfg(not(feature = "force_hash_collisions"))]
diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt
index c93617f352..7092c5933f 100644
--- a/datafusion/sqllogictest/test_files/groupby.slt
+++ b/datafusion/sqllogictest/test_files/groupby.slt
@@ -3250,6 +3250,13 @@ ORDER BY l.sn
----
0 30 30
+# Should support grouping by list column
+query ?I
+SELECT column1, COUNT(*) as column2 FROM (VALUES (['a', 'b'], 1), (['c', 'd', 'e'], 2), (['a', 'b'], 3)) as values0 GROUP BY column1 ORDER BY column2;
+----
+[c, d, e] 1
+[a, b] 2
+
# primary key should be aware from which columns it is associated
statement error DataFusion error: Error during planning: Projection references non-aggregate values: Expression r.sn could not be resolved from available columns: l.sn, SUM\(l.amount\)