Copilot commented on code in PR #19411:
URL: https://github.com/apache/datafusion/pull/19411#discussion_r2668300012


##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -168,6 +249,34 @@ impl JoinLeftData {
 /// `<col1> != <col2>`) are known as "filter expressions" and are evaluated
 /// after the equijoin predicates.
 ///
+/// # ArrayMap Optimization
+///
+/// For joins with a single integer-based join key, `HashJoinExec` may use an 
[`ArrayMap`]
+/// (also known as a "perfect hash join") instead of a general-purpose hash 
map.
+/// This optimization is used when:
+/// 1. There is exactly one join key.
+/// 2. The join key can be any integer type convertible to u64 (excluding i128 
and u128).

Review Comment:
   The documentation refers to "excluding i128 and u128" types, but these types 
are not actually checked or handled anywhere in the code. The 
`is_supported_type` function in `array_map.rs` only supports up to 64-bit 
integers (Int8-Int64, UInt8-UInt64). Consider removing the mention of i128/u128 
from the documentation or clarifying that they're not supported due to design 
limitations, not just current implementation.
   ```suggestion
   /// 2. The join key is an integer type up to 64 bits wide that can be
   ///    losslessly converted to `u64` (128-bit integer types such as `i128`
   ///    and `u128` are not supported).
   ```



##########
datafusion/physical-plan/src/joins/array_map.rs:
##########
@@ -0,0 +1,540 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_schema::DataType;
+use num_traits::AsPrimitive;
+use std::mem::size_of;
+
+use crate::joins::MapOffset;
+use crate::joins::chain::traverse_chain;
+use arrow::array::{Array, ArrayRef, AsArray, BooleanArray};
+use arrow::buffer::BooleanBuffer;
+use arrow::datatypes::ArrowNumericType;
+use datafusion_common::{Result, ScalarValue, internal_err};
+
+/// A macro to downcast only supported integer types (up to 64-bit) and invoke 
a generic function.
+///
+/// Usage: `downcast_supported_integer!(data_type => (Method, arg1, arg2, 
...))`
+///
+/// The `Method` must be an associated method of [`ArrayMap`] that is generic 
over
+/// `<T: ArrowNumericType>` and allow `T::Native: AsPrimitive<u64>`.
+macro_rules! downcast_supported_integer {
+    ($DATA_TYPE:expr => ($METHOD:ident $(, $ARGS:expr)*)) => {
+        match $DATA_TYPE {
+            arrow::datatypes::DataType::Int8 => 
ArrayMap::$METHOD::<arrow::datatypes::Int8Type>($($ARGS),*),
+            arrow::datatypes::DataType::Int16 => 
ArrayMap::$METHOD::<arrow::datatypes::Int16Type>($($ARGS),*),
+            arrow::datatypes::DataType::Int32 => 
ArrayMap::$METHOD::<arrow::datatypes::Int32Type>($($ARGS),*),
+            arrow::datatypes::DataType::Int64 => 
ArrayMap::$METHOD::<arrow::datatypes::Int64Type>($($ARGS),*),
+            arrow::datatypes::DataType::UInt8 => 
ArrayMap::$METHOD::<arrow::datatypes::UInt8Type>($($ARGS),*),
+            arrow::datatypes::DataType::UInt16 => 
ArrayMap::$METHOD::<arrow::datatypes::UInt16Type>($($ARGS),*),
+            arrow::datatypes::DataType::UInt32 => 
ArrayMap::$METHOD::<arrow::datatypes::UInt32Type>($($ARGS),*),
+            arrow::datatypes::DataType::UInt64 => 
ArrayMap::$METHOD::<arrow::datatypes::UInt64Type>($($ARGS),*),
+            _ => {
+                return internal_err!(
+                    "Unsupported type for ArrayMap: {:?}",
+                    $DATA_TYPE
+                );
+            }
+        }
+    };
+}
+
+/// A dense map for single-column integer join keys within a limited range.
+///
+/// Maps join keys to build-side indices using direct array indexing:
+/// `data[val - min_val_in_build_side] -> val_idx_in_build_side + 1`.
+///
+/// NULL values are ignored on both the build side and the probe side.
+///
+/// # Handling Negative Numbers with `wrapping_sub`
+///
+/// This implementation supports signed integer ranges (e.g., `[-5, 5]`) 
efficiently by
+/// treating them as `u64` (Two's Complement) and relying on the bitwise 
properties of
+/// wrapping arithmetic (`wrapping_sub`).
+///
+/// In Two's Complement representation, `a_signed - b_signed` produces the 
same bit pattern
+/// as `a_unsigned.wrapping_sub(b_unsigned)` (modulo 2^N). This allows us to 
perform
+/// range calculations and zero-based index mapping uniformly for both signed 
and unsigned
+/// types without branching.
+///
+/// ## Examples
+///
+/// Consider an `Int64` range `[-5, 5]`.
+/// * `min_val (-5)` casts to `u64`: `...11111011` (`u64::MAX - 4`)
+/// * `max_val (5)` casts to `u64`: `...00000101` (`5`)
+///
+/// **1. Range Calculation**
+///
+/// ```text
+/// In modular arithmetic, this is equivalent to:
+///   (5 - (2^64 - 5)) mod 2^64
+/// = (5 - 2^64 + 5) mod 2^64
+/// = (10 - 2^64) mod 2^64
+/// = 10
+///
+/// ```
+/// The resulting `range` (10) correctly represents the size of the interval 
`[-5, 5]`.
+///
+/// **2. Index Lookup (in `get_matched_indices`)**
+///
+/// For a probe value of `0` (which is stored as `0u64`):
+/// ```text
+/// In modular arithmetic, this is equivalent to:
+///   (0 - (2^64 - 5)) mod 2^64
+/// = (-2^64 + 5) mod 2^64
+/// = 5
+/// ```
+/// This correctly maps `-5` to index `0`, `0` to index `5`, etc.
+#[derive(Debug)]
+pub struct ArrayMap {
+    // data[probSideVal-offset] -> valIdxInBuildSide + 1; 0 for absent
+    data: Vec<u32>,
+    // min val in buildSide
+    offset: u64,
+    // next[buildSideIdx] -> next matching valIdxInBuildSide + 1; 0 for end of 
chain.
+    // If next is empty, it means there are no duplicate keys (no conflicts).
+    // It uses the same chain-based conflict resolution as [`JoinHashMapType`].
+    next: Vec<u32>,
+    num_of_distinct_key: usize,
+}
+
+impl ArrayMap {
+    pub fn is_supported_type(data_type: &DataType) -> bool {
+        matches!(
+            data_type,
+            DataType::Int8
+                | DataType::Int16
+                | DataType::Int32
+                | DataType::Int64
+                | DataType::UInt8
+                | DataType::UInt16
+                | DataType::UInt32
+                | DataType::UInt64
+        )
+    }
+
+    pub(crate) fn key_to_u64(v: &ScalarValue) -> Option<u64> {
+        match v {
+            ScalarValue::Int8(Some(v)) => Some(*v as u64),
+            ScalarValue::Int16(Some(v)) => Some(*v as u64),
+            ScalarValue::Int32(Some(v)) => Some(*v as u64),
+            ScalarValue::Int64(Some(v)) => Some(*v as u64),
+            ScalarValue::UInt8(Some(v)) => Some(*v as u64),
+            ScalarValue::UInt16(Some(v)) => Some(*v as u64),
+            ScalarValue::UInt32(Some(v)) => Some(*v as u64),
+            ScalarValue::UInt64(Some(v)) => Some(*v),
+            _ => None,
+        }
+    }
+
+    /// Estimates the maximum memory usage for an `ArrayMap` with the given 
parameters.
+    ///
+    pub fn estimate_memory_size(min_val: u64, max_val: u64, num_rows: usize) 
-> usize {
+        let range = Self::calculate_range(min_val, max_val);
+        let size = (range + 1) as usize;
+        size * size_of::<u32>() + num_rows * size_of::<u32>()
+    }
+
+    pub fn calculate_range(min_val: u64, max_val: u64) -> u64 {
+        max_val.wrapping_sub(min_val)
+    }
+
+    /// Creates a new [`ArrayMap`] from the given array of join keys.
+    ///
+    /// Note: This function processes only the non-null values in the input 
`array`,
+    /// ignoring any rows where the key is `NULL`.
+    ///
+    pub(crate) fn try_new(array: &ArrayRef, min_val: u64, max_val: u64) -> 
Result<Self> {
+        let range = max_val.wrapping_sub(min_val);
+        let size = (range + 1) as usize;

Review Comment:
   The expression `(range + 1) as usize` on line 163 can overflow: when `range`
   equals `u64::MAX`, `range + 1` panics in debug builds and wraps to 0 in
   release builds, and on targets where `usize` is narrower than 64 bits the
   `as` cast silently truncates. While the threshold checks in
   `try_create_array_map` should prevent extremely large ranges, there's no
   explicit check here. Consider adding a bounds check or documenting the
   assumption that range is validated before calling this function.



##########
datafusion/physical-plan/src/joins/array_map.rs:
##########
@@ -0,0 +1,540 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_schema::DataType;
+use num_traits::AsPrimitive;
+use std::mem::size_of;
+
+use crate::joins::MapOffset;
+use crate::joins::chain::traverse_chain;
+use arrow::array::{Array, ArrayRef, AsArray, BooleanArray};
+use arrow::buffer::BooleanBuffer;
+use arrow::datatypes::ArrowNumericType;
+use datafusion_common::{Result, ScalarValue, internal_err};
+
+/// A macro to downcast only supported integer types (up to 64-bit) and invoke 
a generic function.
+///
+/// Usage: `downcast_supported_integer!(data_type => (Method, arg1, arg2, 
...))`
+///
+/// The `Method` must be an associated method of [`ArrayMap`] that is generic 
over
+/// `<T: ArrowNumericType>` and allow `T::Native: AsPrimitive<u64>`.
+macro_rules! downcast_supported_integer {
+    ($DATA_TYPE:expr => ($METHOD:ident $(, $ARGS:expr)*)) => {
+        match $DATA_TYPE {
+            arrow::datatypes::DataType::Int8 => 
ArrayMap::$METHOD::<arrow::datatypes::Int8Type>($($ARGS),*),
+            arrow::datatypes::DataType::Int16 => 
ArrayMap::$METHOD::<arrow::datatypes::Int16Type>($($ARGS),*),
+            arrow::datatypes::DataType::Int32 => 
ArrayMap::$METHOD::<arrow::datatypes::Int32Type>($($ARGS),*),
+            arrow::datatypes::DataType::Int64 => 
ArrayMap::$METHOD::<arrow::datatypes::Int64Type>($($ARGS),*),
+            arrow::datatypes::DataType::UInt8 => 
ArrayMap::$METHOD::<arrow::datatypes::UInt8Type>($($ARGS),*),
+            arrow::datatypes::DataType::UInt16 => 
ArrayMap::$METHOD::<arrow::datatypes::UInt16Type>($($ARGS),*),
+            arrow::datatypes::DataType::UInt32 => 
ArrayMap::$METHOD::<arrow::datatypes::UInt32Type>($($ARGS),*),
+            arrow::datatypes::DataType::UInt64 => 
ArrayMap::$METHOD::<arrow::datatypes::UInt64Type>($($ARGS),*),
+            _ => {
+                return internal_err!(
+                    "Unsupported type for ArrayMap: {:?}",
+                    $DATA_TYPE
+                );
+            }
+        }
+    };
+}
+
+/// A dense map for single-column integer join keys within a limited range.
+///
+/// Maps join keys to build-side indices using direct array indexing:
+/// `data[val - min_val_in_build_side] -> val_idx_in_build_side + 1`.
+///
+/// NULL values are ignored on both the build side and the probe side.
+///
+/// # Handling Negative Numbers with `wrapping_sub`
+///
+/// This implementation supports signed integer ranges (e.g., `[-5, 5]`) 
efficiently by
+/// treating them as `u64` (Two's Complement) and relying on the bitwise 
properties of
+/// wrapping arithmetic (`wrapping_sub`).
+///
+/// In Two's Complement representation, `a_signed - b_signed` produces the 
same bit pattern
+/// as `a_unsigned.wrapping_sub(b_unsigned)` (modulo 2^N). This allows us to 
perform
+/// range calculations and zero-based index mapping uniformly for both signed 
and unsigned
+/// types without branching.
+///
+/// ## Examples
+///
+/// Consider an `Int64` range `[-5, 5]`.
+/// * `min_val (-5)` casts to `u64`: `...11111011` (`u64::MAX - 4`)
+/// * `max_val (5)` casts to `u64`: `...00000101` (`5`)
+///
+/// **1. Range Calculation**
+///
+/// ```text
+/// In modular arithmetic, this is equivalent to:
+///   (5 - (2^64 - 5)) mod 2^64
+/// = (5 - 2^64 + 5) mod 2^64
+/// = (10 - 2^64) mod 2^64
+/// = 10
+///
+/// ```
+/// The resulting `range` (10) correctly represents the size of the interval 
`[-5, 5]`.
+///
+/// **2. Index Lookup (in `get_matched_indices`)**
+///
+/// For a probe value of `0` (which is stored as `0u64`):
+/// ```text
+/// In modular arithmetic, this is equivalent to:
+///   (0 - (2^64 - 5)) mod 2^64
+/// = (-2^64 + 5) mod 2^64
+/// = 5
+/// ```
+/// This correctly maps `-5` to index `0`, `0` to index `5`, etc.
+#[derive(Debug)]
+pub struct ArrayMap {
+    // data[probSideVal-offset] -> valIdxInBuildSide + 1; 0 for absent
+    data: Vec<u32>,
+    // min val in buildSide
+    offset: u64,
+    // next[buildSideIdx] -> next matching valIdxInBuildSide + 1; 0 for end of 
chain.
+    // If next is empty, it means there are no duplicate keys (no conflicts).
+    // It uses the same chain-based conflict resolution as [`JoinHashMapType`].
+    next: Vec<u32>,
+    num_of_distinct_key: usize,
+}
+
+impl ArrayMap {
+    pub fn is_supported_type(data_type: &DataType) -> bool {
+        matches!(
+            data_type,
+            DataType::Int8
+                | DataType::Int16
+                | DataType::Int32
+                | DataType::Int64
+                | DataType::UInt8
+                | DataType::UInt16
+                | DataType::UInt32
+                | DataType::UInt64
+        )
+    }
+
+    pub(crate) fn key_to_u64(v: &ScalarValue) -> Option<u64> {
+        match v {
+            ScalarValue::Int8(Some(v)) => Some(*v as u64),
+            ScalarValue::Int16(Some(v)) => Some(*v as u64),
+            ScalarValue::Int32(Some(v)) => Some(*v as u64),
+            ScalarValue::Int64(Some(v)) => Some(*v as u64),
+            ScalarValue::UInt8(Some(v)) => Some(*v as u64),
+            ScalarValue::UInt16(Some(v)) => Some(*v as u64),
+            ScalarValue::UInt32(Some(v)) => Some(*v as u64),
+            ScalarValue::UInt64(Some(v)) => Some(*v),
+            _ => None,
+        }
+    }
+
+    /// Estimates the maximum memory usage for an `ArrayMap` with the given 
parameters.
+    ///
+    pub fn estimate_memory_size(min_val: u64, max_val: u64, num_rows: usize) 
-> usize {
+        let range = Self::calculate_range(min_val, max_val);
+        let size = (range + 1) as usize;

Review Comment:
   Similar to `try_new`, the expression `(range + 1) as usize` on line 148 could
   overflow if `range` is close to `u64::MAX`. Since this function is used to
   estimate memory before allocation, an overflow could make the estimate wrap
   to a very small value, so far less memory would be accounted for than is
   actually needed. Consider adding validation or using checked arithmetic
   (e.g. `checked_add` together with `usize::try_from`).



##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -1601,10 +1778,37 @@ mod tests {
 
     #[template]
     #[rstest]
-    fn batch_sizes(#[values(8192, 10, 5, 2, 1)] batch_size: usize) {}
+    fn hash_join_exec_configs(
+        #[values(8192, 10, 5, 2, 1)] batch_size: usize,
+        #[values(true, false)] use_perfect_hash_join_as_possible: bool,
+    ) {
+    }
 
-    fn prepare_task_ctx(batch_size: usize) -> Arc<TaskContext> {
-        let session_config = 
SessionConfig::default().with_batch_size(batch_size);
+    fn prepare_task_ctx(
+        batch_size: usize,
+        use_perfect_hash_join_as_possible: bool,
+    ) -> Arc<TaskContext> {
+        let mut session_config = 
SessionConfig::default().with_batch_size(batch_size);
+
+        if use_perfect_hash_join_as_possible {
+            session_config
+                .options_mut()
+                .execution
+                .perfect_hash_join_small_build_threshold = 819200;
+            session_config
+                .options_mut()
+                .execution
+                .perfect_hash_join_min_key_density = 0.0;
+        } else {
+            session_config
+                .options_mut()
+                .execution
+                .perfect_hash_join_small_build_threshold = 0;
+            session_config
+                .options_mut()
+                .execution
+                .perfect_hash_join_min_key_density = 1.0 / 0.0;

Review Comment:
   The division by zero `1.0 / 0.0` is used to create positive infinity for 
disabling perfect hash join. While this works in Rust (floating-point division 
by zero produces infinity, not a panic), it's unclear and unconventional. 
Consider using `f64::INFINITY` for better readability and clearer intent.
   ```suggestion
                   .perfect_hash_join_min_key_density = f64::INFINITY;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to