LiaCastaneda commented on code in PR #18393:
URL: https://github.com/apache/datafusion/pull/18393#discussion_r2546056222


##########
datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs:
##########
@@ -333,81 +402,154 @@ impl SharedBuildAccumulator {
                 // CollectLeft: Simple conjunction of bounds and membership check
                 AccumulatedBuildData::CollectLeft { data } => {
                     if let Some(partition_data) = data {
+                        // Create membership predicate (InList for small build sides, hash lookup otherwise)
+                        let membership_expr = create_membership_predicate(
+                            &self.on_right,
+                            partition_data.pushdown.clone(),
+                            &HASH_JOIN_SEED,
+                            self.probe_schema.as_ref(),
+                        )?;
+
                         // Create bounds check expression (if bounds available)
-                        let Some(filter_expr) = create_bounds_predicate(
+                        let bounds_expr = create_bounds_predicate(
                             &self.on_right,
                             &partition_data.bounds,
-                        ) else {
-                            // No bounds available, nothing to update
-                            return Ok(());
+                        );
+
+                        // Combine membership and bounds expressions
+                        let filter_expr = match (membership_expr, bounds_expr) {

Review Comment:
   I probably missed the explanation somewhere in previous threads, but is there a special benefit of pushing both bounds and IN LIST filters into the consumer?
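   For my own understanding, here is a rough sketch of what I believe the combined pushdown looks like for a build side with keys {1, 2, 3} (the schema, key values, and the `example` wrapper are hypothetical, just to show the shape):

   ```rust
   use std::sync::Arc;

   use arrow_schema::{DataType, Field, Schema};
   use datafusion_common::{Result, ScalarValue};
   use datafusion_expr::Operator;
   use datafusion_physical_expr::expressions::{col, in_list, lit, BinaryExpr};
   use datafusion_physical_expr::PhysicalExpr;

   // Hypothetical build side with join keys {1, 2, 3}: the pushed filter is
   //   (k >= 1 AND k <= 3) AND k IN (1, 2, 3)
   fn example() -> Result<Arc<dyn PhysicalExpr>> {
       let schema = Schema::new(vec![Field::new("k", DataType::Int32, true)]);
       let k = col("k", &schema)?;
       // Bounds half: answerable from statistics without reading data
       let ge = Arc::new(BinaryExpr::new(
           Arc::clone(&k),
           Operator::GtEq,
           lit(ScalarValue::Int32(Some(1))),
       ));
       let le = Arc::new(BinaryExpr::new(
           Arc::clone(&k),
           Operator::LtEq,
           lit(ScalarValue::Int32(Some(3))),
       ));
       let bounds: Arc<dyn PhysicalExpr> =
           Arc::new(BinaryExpr::new(ge, Operator::And, le));
       // Membership half: filters the surviving rows exactly
       let membership = in_list(
           k,
           vec![
               lit(ScalarValue::Int32(Some(1))),
               lit(ScalarValue::Int32(Some(2))),
               lit(ScalarValue::Int32(Some(3))),
           ],
           &false,
           &schema,
       )?;
       Ok(Arc::new(BinaryExpr::new(bounds, Operator::And, membership)))
   }
   ```

   If that's right, the bounds half can be answered from file/row-group statistics before reading any data, while the IN list filters the surviving rows exactly, which would explain pushing both.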



##########
datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs:
##########
@@ -333,81 +402,154 @@ impl SharedBuildAccumulator {
                 // CollectLeft: Simple conjunction of bounds and membership check
                 AccumulatedBuildData::CollectLeft { data } => {
                     if let Some(partition_data) = data {
+                        // Create membership predicate (InList for small build sides, hash lookup otherwise)
+                        let membership_expr = create_membership_predicate(
+                            &self.on_right,
+                            partition_data.pushdown.clone(),
+                            &HASH_JOIN_SEED,
+                            self.probe_schema.as_ref(),
+                        )?;
+
                         // Create bounds check expression (if bounds available)
-                        let Some(filter_expr) = create_bounds_predicate(
+                        let bounds_expr = create_bounds_predicate(
                             &self.on_right,
                             &partition_data.bounds,
-                        ) else {
-                            // No bounds available, nothing to update
-                            return Ok(());
+                        );
+
+                        // Combine membership and bounds expressions
+                        let filter_expr = match (membership_expr, bounds_expr) {
+                            (Some(membership), Some(bounds)) => {
+                                // Both available: combine with AND
+                                Arc::new(BinaryExpr::new(
+                                    bounds,
+                                    Operator::And,
+                                    membership,
+                                ))
+                                    as Arc<dyn PhysicalExpr>
+                            }
+                            (Some(membership), None) => membership,
+                            (None, Some(bounds)) => bounds,

Review Comment:
   Are `(None, Some(bounds))` and `(Some(membership), None)` actually reachable? If we have no data, we shouldn't have any bounds either, right?
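   If they are reachable, the four arms could also collapse into a small helper along these lines (`and_opt` is just a name I made up, a sketch rather than a concrete ask):

   ```rust
   use std::sync::Arc;

   use datafusion_expr::Operator;
   use datafusion_physical_expr::expressions::BinaryExpr;
   use datafusion_physical_expr::PhysicalExpr;

   /// Hypothetical helper: AND together whichever of two optional predicates exist.
   fn and_opt(
       a: Option<Arc<dyn PhysicalExpr>>,
       b: Option<Arc<dyn PhysicalExpr>>,
   ) -> Option<Arc<dyn PhysicalExpr>> {
       match (a, b) {
           (Some(a), Some(b)) => {
               Some(Arc::new(BinaryExpr::new(a, Operator::And, b)) as Arc<dyn PhysicalExpr>)
           }
           // Whichever side is present (or neither) passes through unchanged
           (only, None) | (None, only) => only,
       }
   }
   ```

   Then `(None, None)` would be the only case needing special handling at the call site.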



##########
datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs:
##########
@@ -333,81 +402,154 @@ impl SharedBuildAccumulator {
                 // CollectLeft: Simple conjunction of bounds and membership check
                 AccumulatedBuildData::CollectLeft { data } => {
                     if let Some(partition_data) = data {
+                        // Create membership predicate (InList for small build sides, hash lookup otherwise)
+                        let membership_expr = create_membership_predicate(
+                            &self.on_right,
+                            partition_data.pushdown.clone(),
+                            &HASH_JOIN_SEED,
+                            self.probe_schema.as_ref(),
+                        )?;
+
                         // Create bounds check expression (if bounds available)
-                        let Some(filter_expr) = create_bounds_predicate(
+                        let bounds_expr = create_bounds_predicate(
                             &self.on_right,
                             &partition_data.bounds,
-                        ) else {
-                            // No bounds available, nothing to update
-                            return Ok(());
+                        );
+
+                        // Combine membership and bounds expressions
+                        let filter_expr = match (membership_expr, bounds_expr) {
+                            (Some(membership), Some(bounds)) => {
+                                // Both available: combine with AND
+                                Arc::new(BinaryExpr::new(
+                                    bounds,
+                                    Operator::And,
+                                    membership,
+                                ))
+                                    as Arc<dyn PhysicalExpr>
+                            }
+                            (Some(membership), None) => membership,
+                            (None, Some(bounds)) => bounds,
+                            (None, None) => {
+                                // No filter available, nothing to update
+                                return Ok(());
+                            }
                         };
 
                         self.dynamic_filter.update(filter_expr)?;
                     }
                 }
                 // Partitioned: CASE expression routing to per-partition filters
                 AccumulatedBuildData::Partitioned { partitions } => {
-                    // Collect all partition data, skipping empty partitions
+                    // Collect all partition data (should all be Some at this point)
                     let partition_data: Vec<_> =
                         partitions.iter().filter_map(|p| p.as_ref()).collect();
 
-                    if partition_data.is_empty() {
-                        // All partitions are empty: no rows can match, skip the probe side entirely
-                        self.dynamic_filter.update(lit(false))?;
-                        return Ok(());
-                    }
+                    if !partition_data.is_empty() {
+                        // Build a CASE expression that combines range checks AND membership checks
+                        // CASE (hash_repartition(join_keys) % num_partitions)
+                        //   WHEN 0 THEN (col >= min_0 AND col <= max_0 AND ...) AND membership_check_0
+                        //   WHEN 1 THEN (col >= min_1 AND col <= max_1 AND ...) AND membership_check_1
+                        //   ...
+                        //   ELSE false
+                        // END
+
+                        let num_partitions = partition_data.len();
+
+                        // Create base expression: hash_repartition(join_keys) % num_partitions
+                        let routing_hash_expr = Arc::new(HashExpr::new(
+                            self.on_right.clone(),
+                            self.repartition_random_state.clone(),
+                            "hash_repartition".to_string(),
+                        ))
+                            as Arc<dyn PhysicalExpr>;
+
+                        let modulo_expr = Arc::new(BinaryExpr::new(
+                            routing_hash_expr,
+                            Operator::Modulo,
+                            lit(ScalarValue::UInt64(Some(num_partitions as u64))),
+                        ))
+                            as Arc<dyn PhysicalExpr>;
+
+                        // Create WHEN branches for each partition
+                        let when_then_branches: Vec<(
+                            Arc<dyn PhysicalExpr>,
+                            Arc<dyn PhysicalExpr>,
+                        )> = partitions
+                            .iter()
+                            .enumerate()
+                            .filter_map(|(partition_id, partition_opt)| {
+                                partition_opt.as_ref().and_then(|partition| {
+                                    // Skip empty partitions - they would always return false anyway
+                                    match &partition.pushdown {
+                                        PushdownStrategy::Empty => None,
+                                        _ => Some((partition_id, partition)),
+                                    }
+                                })
+                            })
+                            .map(|(partition_id, partition)| -> Result<_> {
+                                // WHEN partition_id
+                                let when_expr =
+                                    lit(ScalarValue::UInt64(Some(partition_id as u64)));
+
+                                // THEN: Combine bounds check AND membership predicate
+
+                                // 1. Create membership predicate (InList for small build sides, hash lookup otherwise)
+                                let membership_expr = create_membership_predicate(
+                                    &self.on_right,
+                                    partition.pushdown.clone(),
+                                    &HASH_JOIN_SEED,
+                                    self.probe_schema.as_ref(),
+                                )?;
+
+                                // 2. Create bounds check expression for this partition (if bounds available)
+                                let bounds_expr = create_bounds_predicate(
+                                    &self.on_right,
+                                    &partition.bounds,
+                                );
+
+                                // 3. Combine membership and bounds expressions
+                                let then_expr = match (membership_expr, bounds_expr) {
+                                    (Some(membership), Some(bounds)) => {
+                                        // Both available: combine with AND
+                                        Arc::new(BinaryExpr::new(
+                                            bounds,
+                                            Operator::And,
+                                            membership,
+                                        ))
+                                            as Arc<dyn PhysicalExpr>
+                                    }
+                                    (Some(membership), None) => membership,
+                                    (None, Some(bounds)) => bounds,
+                                    (None, None) => {
+                                        // No filter for this partition - should not happen due to filter_map above
+                                        // but handle defensively by returning a "true" literal
+                                        lit(true)
+                                    }
+                                };
+
+                                Ok((when_expr, then_expr))
+                            })
+                            .collect::<Result<Vec<_>>>()?;
+
+                        // Optimize for single partition: skip CASE expression entirely
+                        let filter_expr = if when_then_branches.is_empty() {
+                            // All partitions are empty: no rows can match
+                            lit(false)
+                        } else if when_then_branches.len() == 1 {
+                            // Single partition: just use the condition directly
+                            // since hash % 1 == 0 always, the WHEN 0 branch will always match
+                            Arc::clone(&when_then_branches[0].1)
+                        } else {

Review Comment:
   This makes sense: we avoid calling `create_hashes` for every single row on the probe side if it's going to end up landing on the same branch anyway. Can we add a comment about this to the tests that changed in datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs?
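   For anyone reading the tests later, the property behind the shortcut as a trivial standalone sketch (not from the PR):

   ```rust
   // With a single partition the routing expression is a constant:
   // hash % 1 == 0 for every hash value, so the WHEN 0 branch always
   // matches and the CASE (plus the per-row hashing it requires) can be
   // skipped entirely.
   fn main() {
       for hash in [0u64, 7, 42, u64::MAX] {
           assert_eq!(hash % 1, 0);
       }
   }
   ```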



##########
datafusion/physical-plan/src/joins/hash_join/inlist_builder.rs:
##########
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilities for building InList expressions from hash join build side data
+
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, StructArray};
+use arrow::datatypes::{Field, FieldRef, Fields};
+use arrow::downcast_dictionary_array;
+use arrow_schema::DataType;
+use datafusion_common::Result;
+
+pub(super) fn build_struct_fields(data_types: &[DataType]) -> Result<Fields> {
+    data_types
+        .iter()
+        .enumerate()
+        .map(|(i, dt)| Ok(Field::new(format!("c{i}"), dt.clone(), true)))
+        .collect()
+}
+
+/// Flattens dictionary-encoded arrays to their underlying value arrays.
+/// Non-dictionary arrays are returned as-is.
+fn flatten_dictionary_array(array: &ArrayRef) -> ArrayRef {
+    downcast_dictionary_array! {
+        array => {
+            // Recursively flatten in case of nested dictionaries
+            flatten_dictionary_array(array.values())
+        }
+        _ => Arc::clone(array)
+    }
+}
+
+/// Builds InList values from join key column arrays.
+///
+/// If `join_key_arrays` is:
+/// 1. A single array, let's say Int32, this will produce a flat
+///    InList expression where the lookup is expected to be scalar Int32 values,
+///    that is: this will produce `IN LIST (1, 2, 3)` expected to be used as `2 IN LIST (1, 2, 3)`.
+/// 2. An Int32 array and a Utf8 array, this will produce a Struct InList expression
+///    where the lookup is expected to be Struct values with two fields (Int32, Utf8),
+///    that is: this will produce `IN LIST ((1, "a"), (2, "b"))` expected to be used as `(2, "b") IN LIST ((1, "a"), (2, "b"))`.
+///    The field names of the struct are auto-generated as "c0", "c1", ... and should match the struct expression used in the join keys.
+///
+/// Note that this will not deduplicate values, that will happen later when building an InList expression from this array.
+///
+/// Returns `None` if the estimated size exceeds `max_size_bytes`.
+/// Performs deduplication to ensure unique values only.

Review Comment:
   🤔 I think it doesn't dedup here
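   For reference, my reading of the multi-column case the doc describes, as a standalone arrow sketch (values hypothetical); note the rows pass through unchanged, so nothing dedups at this stage either:

   ```rust
   use std::sync::Arc;

   use arrow::array::{Array, ArrayRef, Int32Array, StringArray, StructArray};
   use arrow::datatypes::{DataType, Field, Fields};

   fn main() {
       // Two join-key columns become one StructArray with auto-named fields,
       // matching the "c0", "c1", ... convention from the doc comment.
       let keys: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
       let names: ArrayRef = Arc::new(StringArray::from(vec!["a", "b"]));
       let fields = Fields::from(vec![
           Field::new("c0", DataType::Int32, true),
           Field::new("c1", DataType::Utf8, true),
       ]);
       let values = StructArray::new(fields, vec![keys, names], None);
       // `values` now holds {(1, "a"), (2, "b")}, i.e. the list side of
       // `(k, name) IN LIST ((1, "a"), (2, "b"))`; rows are kept as-is,
       // nothing here removes duplicates.
       assert_eq!(values.len(), 2);
   }
   ```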



##########
datafusion/common/src/config.rs:
##########
@@ -1019,6 +1019,22 @@ config_namespace! {
         /// will be collected into a single partition
         pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128
 
+        /// Maximum size in bytes for the build side of a hash join to be pushed down as an InList expression for dynamic filtering.
+        /// Build sides larger than this will use hash table lookups instead.
+        /// Set to 0 to always use hash table lookups.
+        ///
+        /// InList pushdown can be more efficient for small build sides because it can result in better
+        /// statistics pruning as well as use any bloom filters present on the scan side.
+        /// InList expressions are also more transparent and easier to serialize over the network in distributed uses of DataFusion.
+        /// On the other hand InList pushdown requires making a copy of the data and thus adds some overhead to the build side and uses more memory.
+        ///
+        /// This setting is per-partition, so we may end up using `hash_join_inlist_pushdown_max_size` * `target_partitions` memory.
+        ///
+        /// The default is 128kB per partition.
+        /// This should allow point lookup joins (e.g. joining on a unique primary key) to use InList pushdown in most cases
+        /// but avoids excessive memory usage or overhead for larger joins.
+        pub hash_join_inlist_pushdown_max_size: usize, default = 128 * 1024

Review Comment:
   Maybe a future improvement could be to also expose an option to limit the number of distinct values that can be inside an IN LIST. For instance, we could end up with a very large list like `x IN (1, 2, 3, ..., 1000000)` that fits in 128KB but is still inefficient because we'd be duplicating values and performance might decrease.
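   Something like this hypothetical guard on top of the existing byte cap (the `hash_join_inlist_pushdown_max_values` option name and the function are invented, just to sketch the idea):

   ```rust
   /// Hypothetical selection logic: fall back to hash-table lookups when the
   /// build side exceeds either the byte budget or a (new) distinct-value cap
   /// such as a `hash_join_inlist_pushdown_max_values` option.
   fn use_inlist_pushdown(
       num_distinct_keys: usize,
       estimated_bytes: usize,
       max_bytes: usize,
       max_values: usize,
   ) -> bool {
       max_bytes > 0 && estimated_bytes <= max_bytes && num_distinct_keys <= max_values
   }

   fn main() {
       // Fits in the 128 KiB budget but has too many values to make a useful IN list.
       assert!(!use_inlist_pushdown(50_000, 100 * 1024, 128 * 1024, 10_000));
   }
   ```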



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

