Copilot commented on code in PR #19411:
URL: https://github.com/apache/datafusion/pull/19411#discussion_r2668300012
##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -168,6 +249,34 @@ impl JoinLeftData {
/// `<col1> != <col2>`) are known as "filter expressions" and are evaluated
/// after the equijoin predicates.
///
+/// # ArrayMap Optimization
+///
+/// For joins with a single integer-based join key, `HashJoinExec` may use an
[`ArrayMap`]
+/// (also known as a "perfect hash join") instead of a general-purpose hash
map.
+/// This optimization is used when:
+/// 1. There is exactly one join key.
+/// 2. The join key can be any integer type convertible to u64 (excluding i128
and u128).
Review Comment:
The documentation refers to "excluding i128 and u128" types, but these types
are not actually checked or handled anywhere in the code. The
`is_supported_type` function in `array_map.rs` only supports up to 64-bit
integers (Int8-Int64, UInt8-UInt64). Consider removing the mention of i128/u128
from the documentation or clarifying that they're not supported due to design
limitations, not just current implementation.
```suggestion
/// 2. The join key is an integer type up to 64 bits wide that can be losslessly converted
/// to `u64` (128-bit integer types such as `i128` and `u128` are not supported).
```
##########
datafusion/physical-plan/src/joins/array_map.rs:
##########
@@ -0,0 +1,540 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_schema::DataType;
+use num_traits::AsPrimitive;
+use std::mem::size_of;
+
+use crate::joins::MapOffset;
+use crate::joins::chain::traverse_chain;
+use arrow::array::{Array, ArrayRef, AsArray, BooleanArray};
+use arrow::buffer::BooleanBuffer;
+use arrow::datatypes::ArrowNumericType;
+use datafusion_common::{Result, ScalarValue, internal_err};
+
+/// A macro to downcast only supported integer types (up to 64-bit) and invoke
a generic function.
+///
+/// Usage: `downcast_supported_integer!(data_type => (Method, arg1, arg2,
...))`
+///
+/// The `Method` must be an associated method of [`ArrayMap`] that is generic
over
+/// `<T: ArrowNumericType>` and allow `T::Native: AsPrimitive<u64>`.
+macro_rules! downcast_supported_integer {
+ ($DATA_TYPE:expr => ($METHOD:ident $(, $ARGS:expr)*)) => {
+ match $DATA_TYPE {
+ arrow::datatypes::DataType::Int8 =>
ArrayMap::$METHOD::<arrow::datatypes::Int8Type>($($ARGS),*),
+ arrow::datatypes::DataType::Int16 =>
ArrayMap::$METHOD::<arrow::datatypes::Int16Type>($($ARGS),*),
+ arrow::datatypes::DataType::Int32 =>
ArrayMap::$METHOD::<arrow::datatypes::Int32Type>($($ARGS),*),
+ arrow::datatypes::DataType::Int64 =>
ArrayMap::$METHOD::<arrow::datatypes::Int64Type>($($ARGS),*),
+ arrow::datatypes::DataType::UInt8 =>
ArrayMap::$METHOD::<arrow::datatypes::UInt8Type>($($ARGS),*),
+ arrow::datatypes::DataType::UInt16 =>
ArrayMap::$METHOD::<arrow::datatypes::UInt16Type>($($ARGS),*),
+ arrow::datatypes::DataType::UInt32 =>
ArrayMap::$METHOD::<arrow::datatypes::UInt32Type>($($ARGS),*),
+ arrow::datatypes::DataType::UInt64 =>
ArrayMap::$METHOD::<arrow::datatypes::UInt64Type>($($ARGS),*),
+ _ => {
+ return internal_err!(
+ "Unsupported type for ArrayMap: {:?}",
+ $DATA_TYPE
+ );
+ }
+ }
+ };
+}
+
+/// A dense map for single-column integer join keys within a limited range.
+///
+/// Maps join keys to build-side indices using direct array indexing:
+/// `data[val - min_val_in_build_side] -> val_idx_in_build_side + 1`.
+///
+/// NULL values are ignored on both the build side and the probe side.
+///
+/// # Handling Negative Numbers with `wrapping_sub`
+///
+/// This implementation supports signed integer ranges (e.g., `[-5, 5]`)
efficiently by
+/// treating them as `u64` (Two's Complement) and relying on the bitwise
properties of
+/// wrapping arithmetic (`wrapping_sub`).
+///
+/// In Two's Complement representation, `a_signed - b_signed` produces the
same bit pattern
+/// as `a_unsigned.wrapping_sub(b_unsigned)` (modulo 2^N). This allows us to
perform
+/// range calculations and zero-based index mapping uniformly for both signed
and unsigned
+/// types without branching.
+///
+/// ## Examples
+///
+/// Consider an `Int64` range `[-5, 5]`.
+/// * `min_val (-5)` casts to `u64`: `...11111011` (`u64::MAX - 4`)
+/// * `max_val (5)` casts to `u64`: `...00000101` (`5`)
+///
+/// **1. Range Calculation**
+///
+/// ```text
+/// In modular arithmetic, this is equivalent to:
+/// (5 - (2^64 - 5)) mod 2^64
+/// = (5 - 2^64 + 5) mod 2^64
+/// = (10 - 2^64) mod 2^64
+/// = 10
+///
+/// ```
+/// The resulting `range` (10) correctly represents the size of the interval
`[-5, 5]`.
+///
+/// **2. Index Lookup (in `get_matched_indices`)**
+///
+/// For a probe value of `0` (which is stored as `0u64`):
+/// ```text
+/// In modular arithmetic, this is equivalent to:
+/// (0 - (2^64 - 5)) mod 2^64
+/// = (-2^64 + 5) mod 2^64
+/// = 5
+/// ```
+/// This correctly maps `-5` to index `0`, `0` to index `5`, etc.
+#[derive(Debug)]
+pub struct ArrayMap {
+ // data[probSideVal-offset] -> valIdxInBuildSide + 1; 0 for absent
+ data: Vec<u32>,
+ // min val in buildSide
+ offset: u64,
+ // next[buildSideIdx] -> next matching valIdxInBuildSide + 1; 0 for end of
chain.
+ // If next is empty, it means there are no duplicate keys (no conflicts).
+ // It uses the same chain-based conflict resolution as [`JoinHashMapType`].
+ next: Vec<u32>,
+ num_of_distinct_key: usize,
+}
+
+impl ArrayMap {
+ pub fn is_supported_type(data_type: &DataType) -> bool {
+ matches!(
+ data_type,
+ DataType::Int8
+ | DataType::Int16
+ | DataType::Int32
+ | DataType::Int64
+ | DataType::UInt8
+ | DataType::UInt16
+ | DataType::UInt32
+ | DataType::UInt64
+ )
+ }
+
+ pub(crate) fn key_to_u64(v: &ScalarValue) -> Option<u64> {
+ match v {
+ ScalarValue::Int8(Some(v)) => Some(*v as u64),
+ ScalarValue::Int16(Some(v)) => Some(*v as u64),
+ ScalarValue::Int32(Some(v)) => Some(*v as u64),
+ ScalarValue::Int64(Some(v)) => Some(*v as u64),
+ ScalarValue::UInt8(Some(v)) => Some(*v as u64),
+ ScalarValue::UInt16(Some(v)) => Some(*v as u64),
+ ScalarValue::UInt32(Some(v)) => Some(*v as u64),
+ ScalarValue::UInt64(Some(v)) => Some(*v),
+ _ => None,
+ }
+ }
+
+ /// Estimates the maximum memory usage for an `ArrayMap` with the given
parameters.
+ ///
+ pub fn estimate_memory_size(min_val: u64, max_val: u64, num_rows: usize)
-> usize {
+ let range = Self::calculate_range(min_val, max_val);
+ let size = (range + 1) as usize;
+ size * size_of::<u32>() + num_rows * size_of::<u32>()
+ }
+
+ pub fn calculate_range(min_val: u64, max_val: u64) -> u64 {
+ max_val.wrapping_sub(min_val)
+ }
+
+ /// Creates a new [`ArrayMap`] from the given array of join keys.
+ ///
+ /// Note: This function processes only the non-null values in the input
`array`,
+ /// ignoring any rows where the key is `NULL`.
+ ///
+ pub(crate) fn try_new(array: &ArrayRef, min_val: u64, max_val: u64) ->
Result<Self> {
+ let range = max_val.wrapping_sub(min_val);
+ let size = (range + 1) as usize;
Review Comment:
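The expression `(range + 1) as usize` on line 163 is unsafe for extreme
ranges: `range + 1` overflows when `range == u64::MAX` (panicking in debug
builds and wrapping to 0 in release builds), and the `as usize` cast silently
truncates on 32-bit targets. While the threshold checks in
`try_create_array_map` should prevent extremely large ranges, there's no
explicit check here. Consider adding a bounds check (e.g. `checked_add`
followed by `usize::try_from`) or documenting the assumption that the range is
validated before calling this function.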
##########
datafusion/physical-plan/src/joins/array_map.rs:
##########
@@ -0,0 +1,540 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_schema::DataType;
+use num_traits::AsPrimitive;
+use std::mem::size_of;
+
+use crate::joins::MapOffset;
+use crate::joins::chain::traverse_chain;
+use arrow::array::{Array, ArrayRef, AsArray, BooleanArray};
+use arrow::buffer::BooleanBuffer;
+use arrow::datatypes::ArrowNumericType;
+use datafusion_common::{Result, ScalarValue, internal_err};
+
+/// A macro to downcast only supported integer types (up to 64-bit) and invoke
a generic function.
+///
+/// Usage: `downcast_supported_integer!(data_type => (Method, arg1, arg2,
...))`
+///
+/// The `Method` must be an associated method of [`ArrayMap`] that is generic
over
+/// `<T: ArrowNumericType>` and allow `T::Native: AsPrimitive<u64>`.
+macro_rules! downcast_supported_integer {
+ ($DATA_TYPE:expr => ($METHOD:ident $(, $ARGS:expr)*)) => {
+ match $DATA_TYPE {
+ arrow::datatypes::DataType::Int8 =>
ArrayMap::$METHOD::<arrow::datatypes::Int8Type>($($ARGS),*),
+ arrow::datatypes::DataType::Int16 =>
ArrayMap::$METHOD::<arrow::datatypes::Int16Type>($($ARGS),*),
+ arrow::datatypes::DataType::Int32 =>
ArrayMap::$METHOD::<arrow::datatypes::Int32Type>($($ARGS),*),
+ arrow::datatypes::DataType::Int64 =>
ArrayMap::$METHOD::<arrow::datatypes::Int64Type>($($ARGS),*),
+ arrow::datatypes::DataType::UInt8 =>
ArrayMap::$METHOD::<arrow::datatypes::UInt8Type>($($ARGS),*),
+ arrow::datatypes::DataType::UInt16 =>
ArrayMap::$METHOD::<arrow::datatypes::UInt16Type>($($ARGS),*),
+ arrow::datatypes::DataType::UInt32 =>
ArrayMap::$METHOD::<arrow::datatypes::UInt32Type>($($ARGS),*),
+ arrow::datatypes::DataType::UInt64 =>
ArrayMap::$METHOD::<arrow::datatypes::UInt64Type>($($ARGS),*),
+ _ => {
+ return internal_err!(
+ "Unsupported type for ArrayMap: {:?}",
+ $DATA_TYPE
+ );
+ }
+ }
+ };
+}
+
+/// A dense map for single-column integer join keys within a limited range.
+///
+/// Maps join keys to build-side indices using direct array indexing:
+/// `data[val - min_val_in_build_side] -> val_idx_in_build_side + 1`.
+///
+/// NULL values are ignored on both the build side and the probe side.
+///
+/// # Handling Negative Numbers with `wrapping_sub`
+///
+/// This implementation supports signed integer ranges (e.g., `[-5, 5]`)
efficiently by
+/// treating them as `u64` (Two's Complement) and relying on the bitwise
properties of
+/// wrapping arithmetic (`wrapping_sub`).
+///
+/// In Two's Complement representation, `a_signed - b_signed` produces the
same bit pattern
+/// as `a_unsigned.wrapping_sub(b_unsigned)` (modulo 2^N). This allows us to
perform
+/// range calculations and zero-based index mapping uniformly for both signed
and unsigned
+/// types without branching.
+///
+/// ## Examples
+///
+/// Consider an `Int64` range `[-5, 5]`.
+/// * `min_val (-5)` casts to `u64`: `...11111011` (`u64::MAX - 4`)
+/// * `max_val (5)` casts to `u64`: `...00000101` (`5`)
+///
+/// **1. Range Calculation**
+///
+/// ```text
+/// In modular arithmetic, this is equivalent to:
+/// (5 - (2^64 - 5)) mod 2^64
+/// = (5 - 2^64 + 5) mod 2^64
+/// = (10 - 2^64) mod 2^64
+/// = 10
+///
+/// ```
+/// The resulting `range` (10) correctly represents the size of the interval
`[-5, 5]`.
+///
+/// **2. Index Lookup (in `get_matched_indices`)**
+///
+/// For a probe value of `0` (which is stored as `0u64`):
+/// ```text
+/// In modular arithmetic, this is equivalent to:
+/// (0 - (2^64 - 5)) mod 2^64
+/// = (-2^64 + 5) mod 2^64
+/// = 5
+/// ```
+/// This correctly maps `-5` to index `0`, `0` to index `5`, etc.
+#[derive(Debug)]
+pub struct ArrayMap {
+ // data[probSideVal-offset] -> valIdxInBuildSide + 1; 0 for absent
+ data: Vec<u32>,
+ // min val in buildSide
+ offset: u64,
+ // next[buildSideIdx] -> next matching valIdxInBuildSide + 1; 0 for end of
chain.
+ // If next is empty, it means there are no duplicate keys (no conflicts).
+ // It uses the same chain-based conflict resolution as [`JoinHashMapType`].
+ next: Vec<u32>,
+ num_of_distinct_key: usize,
+}
+
+impl ArrayMap {
+ pub fn is_supported_type(data_type: &DataType) -> bool {
+ matches!(
+ data_type,
+ DataType::Int8
+ | DataType::Int16
+ | DataType::Int32
+ | DataType::Int64
+ | DataType::UInt8
+ | DataType::UInt16
+ | DataType::UInt32
+ | DataType::UInt64
+ )
+ }
+
+ pub(crate) fn key_to_u64(v: &ScalarValue) -> Option<u64> {
+ match v {
+ ScalarValue::Int8(Some(v)) => Some(*v as u64),
+ ScalarValue::Int16(Some(v)) => Some(*v as u64),
+ ScalarValue::Int32(Some(v)) => Some(*v as u64),
+ ScalarValue::Int64(Some(v)) => Some(*v as u64),
+ ScalarValue::UInt8(Some(v)) => Some(*v as u64),
+ ScalarValue::UInt16(Some(v)) => Some(*v as u64),
+ ScalarValue::UInt32(Some(v)) => Some(*v as u64),
+ ScalarValue::UInt64(Some(v)) => Some(*v),
+ _ => None,
+ }
+ }
+
+ /// Estimates the maximum memory usage for an `ArrayMap` with the given
parameters.
+ ///
+ pub fn estimate_memory_size(min_val: u64, max_val: u64, num_rows: usize)
-> usize {
+ let range = Self::calculate_range(min_val, max_val);
+ let size = (range + 1) as usize;
Review Comment:
Similar to `try_new`, the expression `(range + 1) as usize` on line 148 can
overflow: `range + 1` wraps to 0 when `range == u64::MAX` (or panics in debug
builds), and the `as usize` cast truncates on 32-bit targets. Since this
function is used to estimate memory before allocation, a wrapped result would
badly underestimate the memory actually required. Consider adding validation
or using checked arithmetic.
##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -1601,10 +1778,37 @@ mod tests {
#[template]
#[rstest]
- fn batch_sizes(#[values(8192, 10, 5, 2, 1)] batch_size: usize) {}
+ fn hash_join_exec_configs(
+ #[values(8192, 10, 5, 2, 1)] batch_size: usize,
+ #[values(true, false)] use_perfect_hash_join_as_possible: bool,
+ ) {
+ }
- fn prepare_task_ctx(batch_size: usize) -> Arc<TaskContext> {
- let session_config =
SessionConfig::default().with_batch_size(batch_size);
+ fn prepare_task_ctx(
+ batch_size: usize,
+ use_perfect_hash_join_as_possible: bool,
+ ) -> Arc<TaskContext> {
+ let mut session_config =
SessionConfig::default().with_batch_size(batch_size);
+
+ if use_perfect_hash_join_as_possible {
+ session_config
+ .options_mut()
+ .execution
+ .perfect_hash_join_small_build_threshold = 819200;
+ session_config
+ .options_mut()
+ .execution
+ .perfect_hash_join_min_key_density = 0.0;
+ } else {
+ session_config
+ .options_mut()
+ .execution
+ .perfect_hash_join_small_build_threshold = 0;
+ session_config
+ .options_mut()
+ .execution
+ .perfect_hash_join_min_key_density = 1.0 / 0.0;
Review Comment:
The division by zero `1.0 / 0.0` is used to create positive infinity for
disabling perfect hash join. While this works in Rust (floating-point division
by zero produces infinity, not a panic), it's unclear and unconventional.
Consider using `f64::INFINITY` for better readability and clearer intent.
```suggestion
.perfect_hash_join_min_key_density = f64::INFINITY;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]