This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch branch-52
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/branch-52 by this push:
new d317d00b88 [branch-52] fix: `HashJoin` panic with String dictionary
keys (don't flatten keys) (#20505) (#20708)
d317d00b88 is described below
commit d317d00b886bbf11cb489e4c4bdc2280b3ca9e07
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Mar 4 19:01:19 2026 -0500
[branch-52] fix: `HashJoin` panic with String dictionary keys (don't
flatten keys) (#20505) (#20708)
- Part of https://github.com/apache/datafusion/issues/20681
- Closes https://github.com/apache/datafusion/issues/20696 on branch-52
This PR:
- Backports https://github.com/apache/datafusion/pull/20505 to the
[branch-52] line
---
.../src/joins/hash_join/inlist_builder.rs | 39 +++-----
.../test_files/parquet_filter_pushdown.slt | 100 +++++++++++++++++++++
2 files changed, 113 insertions(+), 26 deletions(-)
diff --git a/datafusion/physical-plan/src/joins/hash_join/inlist_builder.rs
b/datafusion/physical-plan/src/joins/hash_join/inlist_builder.rs
index 9bf59d9e33..0ca338265e 100644
--- a/datafusion/physical-plan/src/joins/hash_join/inlist_builder.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/inlist_builder.rs
@@ -20,7 +20,6 @@
use std::sync::Arc;
use arrow::array::{ArrayRef, StructArray};
-use arrow::compute::cast;
use arrow::datatypes::{Field, FieldRef, Fields};
use arrow_schema::DataType;
use datafusion_common::Result;
@@ -33,19 +32,6 @@ pub(super) fn build_struct_fields(data_types: &[DataType])
-> Result<Fields> {
.collect()
}
-/// Casts dictionary-encoded arrays to their underlying value type, preserving
row count.
-/// Non-dictionary arrays are returned as-is.
-fn flatten_dictionary_array(array: &ArrayRef) -> Result<ArrayRef> {
- match array.data_type() {
- DataType::Dictionary(_, value_type) => {
- let casted = cast(array, value_type)?;
- // Recursively flatten in case of nested dictionaries
- flatten_dictionary_array(&casted)
- }
- _ => Ok(Arc::clone(array)),
- }
-}
-
/// Builds InList values from join key column arrays.
///
/// If `join_key_arrays` is:
@@ -65,20 +51,14 @@ fn flatten_dictionary_array(array: &ArrayRef) ->
Result<ArrayRef> {
pub(super) fn build_struct_inlist_values(
join_key_arrays: &[ArrayRef],
) -> Result<Option<ArrayRef>> {
- // Flatten any dictionary-encoded arrays
- let flattened_arrays: Vec<ArrayRef> = join_key_arrays
- .iter()
- .map(flatten_dictionary_array)
- .collect::<Result<Vec<_>>>()?;
-
// Build the source array/struct
- let source_array: ArrayRef = if flattened_arrays.len() == 1 {
+ let source_array: ArrayRef = if join_key_arrays.len() == 1 {
// Single column: use directly
- Arc::clone(&flattened_arrays[0])
+ Arc::clone(&join_key_arrays[0])
} else {
// Multi-column: build StructArray once from all columns
let fields = build_struct_fields(
- &flattened_arrays
+ &join_key_arrays
.iter()
.map(|arr| arr.data_type().clone())
.collect::<Vec<_>>(),
@@ -88,7 +68,7 @@ pub(super) fn build_struct_inlist_values(
let arrays_with_fields: Vec<(FieldRef, ArrayRef)> = fields
.iter()
.cloned()
- .zip(flattened_arrays.iter().cloned())
+ .zip(join_key_arrays.iter().cloned())
.collect();
Arc::new(StructArray::from(arrays_with_fields))
@@ -152,7 +132,14 @@ mod tests {
assert_eq!(
*result.data_type(),
DataType::Struct(
- build_struct_fields(&[DataType::Utf8,
DataType::Int32]).unwrap()
+ build_struct_fields(&[
+ DataType::Dictionary(
+ Box::new(DataType::Int8),
+ Box::new(DataType::Utf8)
+ ),
+ DataType::Int32
+ ])
+ .unwrap()
)
);
}
@@ -168,6 +155,6 @@ mod tests {
.unwrap();
assert_eq!(result.len(), 3);
- assert_eq!(*result.data_type(), DataType::Utf8);
+ assert_eq!(result.data_type(), dict_array.data_type());
}
}
diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
index 8bb79d5769..5e643273ba 100644
--- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
@@ -563,3 +563,103 @@ ORDER BY start_timestamp, trace_id
LIMIT 1;
----
2024-10-01T00:00:00
+
+
+statement ok
+set datafusion.execution.parquet.pushdown_filters = false;
+
+# Regression test for https://github.com/apache/datafusion/issues/20696
+# Multi-column INNER JOIN with dictionary fails
+# when parquet pushdown filters are enabled.
+
+statement ok
+COPY (
+ SELECT
+ to_timestamp_nanos(time_ns) AS time,
+ arrow_cast(state, 'Dictionary(Int32, Utf8)') AS state,
+ arrow_cast(city, 'Dictionary(Int32, Utf8)') AS city,
+ temp
+ FROM (
+ VALUES
+ (200, 'CA', 'LA', 90.0),
+ (250, 'MA', 'Boston', 72.4),
+ (100, 'MA', 'Boston', 70.4),
+ (350, 'CA', 'LA', 90.0)
+ ) AS t(time_ns, state, city, temp)
+)
+TO 'test_files/scratch/parquet_filter_pushdown/issue_20696/h2o/data.parquet';
+
+statement ok
+COPY (
+ SELECT
+ to_timestamp_nanos(time_ns) AS time,
+ arrow_cast(state, 'Dictionary(Int32, Utf8)') AS state,
+ arrow_cast(city, 'Dictionary(Int32, Utf8)') AS city,
+ temp,
+ reading
+ FROM (
+ VALUES
+ (250, 'MA', 'Boston', 53.4, 51.0),
+ (100, 'MA', 'Boston', 50.4, 50.0)
+ ) AS t(time_ns, state, city, temp, reading)
+)
+TO 'test_files/scratch/parquet_filter_pushdown/issue_20696/o2/data.parquet';
+
+statement ok
+CREATE EXTERNAL TABLE h2o_parquet_20696 STORED AS PARQUET
+LOCATION 'test_files/scratch/parquet_filter_pushdown/issue_20696/h2o/';
+
+statement ok
+CREATE EXTERNAL TABLE o2_parquet_20696 STORED AS PARQUET
+LOCATION 'test_files/scratch/parquet_filter_pushdown/issue_20696/o2/';
+
+# Query should work both with and without filters
+statement ok
+set datafusion.execution.parquet.pushdown_filters = false;
+
+query RRR
+SELECT
+ h2o_parquet_20696.temp AS h2o_temp,
+ o2_parquet_20696.temp AS o2_temp,
+ o2_parquet_20696.reading
+FROM h2o_parquet_20696
+INNER JOIN o2_parquet_20696
+ ON h2o_parquet_20696.time = o2_parquet_20696.time
+ AND h2o_parquet_20696.state = o2_parquet_20696.state
+ AND h2o_parquet_20696.city = o2_parquet_20696.city
+WHERE h2o_parquet_20696.time >= '1970-01-01T00:00:00.000000050Z'
+ AND h2o_parquet_20696.time <= '1970-01-01T00:00:00.000000300Z';
+----
+72.4 53.4 51
+70.4 50.4 50
+
+
+statement ok
+set datafusion.execution.parquet.pushdown_filters = true;
+
+query RRR
+SELECT
+ h2o_parquet_20696.temp AS h2o_temp,
+ o2_parquet_20696.temp AS o2_temp,
+ o2_parquet_20696.reading
+FROM h2o_parquet_20696
+INNER JOIN o2_parquet_20696
+ ON h2o_parquet_20696.time = o2_parquet_20696.time
+ AND h2o_parquet_20696.state = o2_parquet_20696.state
+ AND h2o_parquet_20696.city = o2_parquet_20696.city
+WHERE h2o_parquet_20696.time >= '1970-01-01T00:00:00.000000050Z'
+ AND h2o_parquet_20696.time <= '1970-01-01T00:00:00.000000300Z';
+----
+72.4 53.4 51
+70.4 50.4 50
+
+# Cleanup
+statement ok
+DROP TABLE h2o_parquet_20696;
+
+statement ok
+DROP TABLE o2_parquet_20696;
+
+# Cleanup settings
+statement ok
+set datafusion.execution.parquet.pushdown_filters = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]