This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 9fb5ff9935 Fix join on arrays of unhashable types and allow hash join
on all types supported at run-time (#13388)
9fb5ff9935 is described below
commit 9fb5ff99350cea8d360a0519ad9abb8046770973
Author: Piotr Findeisen <[email protected]>
AuthorDate: Tue Nov 19 13:01:58 2024 +0100
Fix join on arrays of unhashable types and allow hash join on all types
supported at run-time (#13388)
* Remove unused code paths from create_hashes
The `downcast_primitive_array!` macro handles all primitive types,
including Decimal128 and Decimal256, and only then delegates to the
fallback arms, so the explicit Decimal128/Decimal256 arms were never reached.
* Fix join on arrays of unhashable types and allow hash join on all types
supported at run-time #13388
Update can_hash to match the data types that create_hashes currently supports.
* Rename table_with_many_types in tests
* Test join on binary is hash join
---
datafusion/common/src/hash_utils.rs | 10 +---
datafusion/expr/src/utils.rs | 59 +++++++++++++++-------
datafusion/sqllogictest/src/test_context.rs | 9 +++-
.../test_files/information_schema_columns.slt | 16 +++---
datafusion/sqllogictest/test_files/joins.slt | 21 ++++++++
5 files changed, 79 insertions(+), 36 deletions(-)
diff --git a/datafusion/common/src/hash_utils.rs
b/datafusion/common/src/hash_utils.rs
index 8bd646626e..e18d70844d 100644
--- a/datafusion/common/src/hash_utils.rs
+++ b/datafusion/common/src/hash_utils.rs
@@ -32,7 +32,7 @@ use arrow_buffer::IntervalMonthDayNano;
use crate::cast::{
as_binary_view_array, as_boolean_array, as_fixed_size_list_array,
as_generic_binary_array, as_large_list_array, as_list_array, as_map_array,
- as_primitive_array, as_string_array, as_string_view_array, as_struct_array,
+ as_string_array, as_string_view_array, as_struct_array,
};
use crate::error::Result;
#[cfg(not(feature = "force_hash_collisions"))]
@@ -392,14 +392,6 @@ pub fn create_hashes<'a>(
let array: &FixedSizeBinaryArray =
array.as_any().downcast_ref().unwrap();
hash_array(array, random_state, hashes_buffer, rehash)
}
- DataType::Decimal128(_, _) => {
- let array = as_primitive_array::<Decimal128Type>(array)?;
- hash_array_primitive(array, random_state, hashes_buffer,
rehash)
- }
- DataType::Decimal256(_, _) => {
- let array = as_primitive_array::<Decimal256Type>(array)?;
- hash_array_primitive(array, random_state, hashes_buffer,
rehash)
- }
DataType::Dictionary(_, _) => downcast_dictionary_array! {
array => hash_dictionary(array, random_state, hashes_buffer,
rehash)?,
_ => unreachable!()
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index c22ee244fe..6f7c5d3792 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -29,7 +29,7 @@ use crate::{
};
use datafusion_expr_common::signature::{Signature, TypeSignature};
-use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
+use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
};
@@ -958,7 +958,7 @@ pub(crate) fn find_column_indexes_referenced_by_expr(
/// Can this data type be used in hash join equal conditions??
/// Data types here come from function 'equal_rows', if more data types are
supported
-/// in equal_rows(hash join), add those data types here to generate join
logical plan.
+/// in create_hashes, add those data types here to generate join logical plan.
pub fn can_hash(data_type: &DataType) -> bool {
match data_type {
DataType::Null => true,
@@ -971,31 +971,38 @@ pub fn can_hash(data_type: &DataType) -> bool {
DataType::UInt16 => true,
DataType::UInt32 => true,
DataType::UInt64 => true,
+ DataType::Float16 => true,
DataType::Float32 => true,
DataType::Float64 => true,
- DataType::Timestamp(time_unit, _) => match time_unit {
- TimeUnit::Second => true,
- TimeUnit::Millisecond => true,
- TimeUnit::Microsecond => true,
- TimeUnit::Nanosecond => true,
- },
+ DataType::Decimal128(_, _) => true,
+ DataType::Decimal256(_, _) => true,
+ DataType::Timestamp(_, _) => true,
DataType::Utf8 => true,
DataType::LargeUtf8 => true,
DataType::Utf8View => true,
- DataType::Decimal128(_, _) => true,
+ DataType::Binary => true,
+ DataType::LargeBinary => true,
+ DataType::BinaryView => true,
DataType::Date32 => true,
DataType::Date64 => true,
+ DataType::Time32(_) => true,
+ DataType::Time64(_) => true,
+ DataType::Duration(_) => true,
+ DataType::Interval(_) => true,
DataType::FixedSizeBinary(_) => true,
- DataType::Dictionary(key_type, value_type)
- if *value_type.as_ref() == DataType::Utf8 =>
- {
- DataType::is_dictionary_key_type(key_type)
+ DataType::Dictionary(key_type, value_type) => {
+ DataType::is_dictionary_key_type(key_type) && can_hash(value_type)
}
- DataType::List(_) => true,
- DataType::LargeList(_) => true,
- DataType::FixedSizeList(_, _) => true,
+ DataType::List(value_type) => can_hash(value_type.data_type()),
+ DataType::LargeList(value_type) => can_hash(value_type.data_type()),
+ DataType::FixedSizeList(value_type, _) =>
can_hash(value_type.data_type()),
+ DataType::Map(map_struct, true | false) =>
can_hash(map_struct.data_type()),
DataType::Struct(fields) => fields.iter().all(|f|
can_hash(f.data_type())),
- _ => false,
+
+ DataType::ListView(_)
+ | DataType::LargeListView(_)
+ | DataType::Union(_, _)
+ | DataType::RunEndEncoded(_, _) => false,
}
}
@@ -1403,6 +1410,7 @@ mod tests {
test::function_stub::max_udaf, test::function_stub::min_udaf,
test::function_stub::sum_udaf, Cast, ExprFunctionExt,
WindowFunctionDefinition,
};
+ use arrow::datatypes::{UnionFields, UnionMode};
#[test]
fn test_group_window_expr_by_sort_keys_empty_case() -> Result<()> {
@@ -1805,4 +1813,21 @@ mod tests {
assert!(accum.contains(&Column::from_name("a")));
Ok(())
}
+
+ #[test]
+ fn test_can_hash() {
+ let union_fields: UnionFields = [
+ (0, Arc::new(Field::new("A", DataType::Int32, true))),
+ (1, Arc::new(Field::new("B", DataType::Float64, true))),
+ ]
+ .into_iter()
+ .collect();
+
+ let union_type = DataType::Union(union_fields, UnionMode::Sparse);
+ assert!(!can_hash(&union_type));
+
+ let list_union_type =
+ DataType::List(Arc::new(Field::new("my_union", union_type, true)));
+ assert!(!can_hash(&list_union_type));
+ }
}
diff --git a/datafusion/sqllogictest/src/test_context.rs
b/datafusion/sqllogictest/src/test_context.rs
index 477f225443..2466303c32 100644
--- a/datafusion/sqllogictest/src/test_context.rs
+++ b/datafusion/sqllogictest/src/test_context.rs
@@ -106,6 +106,8 @@ impl TestContext {
let example_udf = create_example_udf();
test_ctx.ctx.register_udf(example_udf);
register_partition_table(&mut test_ctx).await;
+ info!("Registering table with many types");
+ register_table_with_many_types(test_ctx.session_ctx()).await;
}
"metadata.slt" => {
info!("Registering metadata table tables");
@@ -251,8 +253,11 @@ pub async fn register_table_with_many_types(ctx:
&SessionContext) {
.unwrap();
ctx.register_catalog("my_catalog", Arc::new(catalog));
- ctx.register_table("my_catalog.my_schema.t2", table_with_many_types())
- .unwrap();
+ ctx.register_table(
+ "my_catalog.my_schema.table_with_many_types",
+ table_with_many_types(),
+ )
+ .unwrap();
}
pub async fn register_table_with_map(ctx: &SessionContext) {
diff --git a/datafusion/sqllogictest/test_files/information_schema_columns.slt
b/datafusion/sqllogictest/test_files/information_schema_columns.slt
index 7cf845c16d..d348a764fa 100644
--- a/datafusion/sqllogictest/test_files/information_schema_columns.slt
+++ b/datafusion/sqllogictest/test_files/information_schema_columns.slt
@@ -37,17 +37,17 @@ query TTTTITTTIIIIIIT rowsort
SELECT * from information_schema.columns;
----
my_catalog my_schema t1 i 0 NULL YES Int32 NULL NULL 32 2 NULL NULL NULL
-my_catalog my_schema t2 binary_col 4 NULL NO Binary NULL 2147483647 NULL NULL
NULL NULL NULL
-my_catalog my_schema t2 float64_col 1 NULL YES Float64 NULL NULL 24 2 NULL
NULL NULL
-my_catalog my_schema t2 int32_col 0 NULL NO Int32 NULL NULL 32 2 NULL NULL NULL
-my_catalog my_schema t2 large_binary_col 5 NULL NO LargeBinary NULL
9223372036854775807 NULL NULL NULL NULL NULL
-my_catalog my_schema t2 large_utf8_col 3 NULL NO LargeUtf8 NULL
9223372036854775807 NULL NULL NULL NULL NULL
-my_catalog my_schema t2 timestamp_nanos 6 NULL NO Timestamp(Nanosecond, None)
NULL NULL NULL NULL NULL NULL NULL
-my_catalog my_schema t2 utf8_col 2 NULL YES Utf8 NULL 2147483647 NULL NULL
NULL NULL NULL
+my_catalog my_schema table_with_many_types binary_col 4 NULL NO Binary NULL
2147483647 NULL NULL NULL NULL NULL
+my_catalog my_schema table_with_many_types float64_col 1 NULL YES Float64 NULL
NULL 24 2 NULL NULL NULL
+my_catalog my_schema table_with_many_types int32_col 0 NULL NO Int32 NULL NULL
32 2 NULL NULL NULL
+my_catalog my_schema table_with_many_types large_binary_col 5 NULL NO
LargeBinary NULL 9223372036854775807 NULL NULL NULL NULL NULL
+my_catalog my_schema table_with_many_types large_utf8_col 3 NULL NO LargeUtf8
NULL 9223372036854775807 NULL NULL NULL NULL NULL
+my_catalog my_schema table_with_many_types timestamp_nanos 6 NULL NO
Timestamp(Nanosecond, None) NULL NULL NULL NULL NULL NULL NULL
+my_catalog my_schema table_with_many_types utf8_col 2 NULL YES Utf8 NULL
2147483647 NULL NULL NULL NULL NULL
# Cleanup
statement ok
drop table t1
statement ok
-drop table t2
+drop table table_with_many_types
diff --git a/datafusion/sqllogictest/test_files/joins.slt
b/datafusion/sqllogictest/test_files/joins.slt
index d45dbc7ee1..e636e93007 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -4292,3 +4292,24 @@ query T
select * from table1 as t1 natural join table1_stringview as t2;
----
foo
+
+query TT
+EXPLAIN SELECT count(*)
+FROM my_catalog.my_schema.table_with_many_types AS l
+JOIN my_catalog.my_schema.table_with_many_types AS r ON l.binary_col =
r.binary_col
+----
+logical_plan
+01)Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
+02)--Projection:
+03)----Inner Join: l.binary_col = r.binary_col
+04)------SubqueryAlias: l
+05)--------TableScan: my_catalog.my_schema.table_with_many_types
projection=[binary_col]
+06)------SubqueryAlias: r
+07)--------TableScan: my_catalog.my_schema.table_with_many_types
projection=[binary_col]
+physical_plan
+01)AggregateExec: mode=Single, gby=[], aggr=[count(*)]
+02)--ProjectionExec: expr=[]
+03)----CoalesceBatchesExec: target_batch_size=3
+04)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(binary_col@0,
binary_col@0)]
+05)--------MemoryExec: partitions=1, partition_sizes=[1]
+06)--------MemoryExec: partitions=1, partition_sizes=[1]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]