gene-bordegaray commented on code in PR #21931:
URL: https://github.com/apache/datafusion/pull/21931#discussion_r3242278159


##########
datafusion/common/src/config.rs:
##########
@@ -1309,11 +1309,15 @@ config_namespace! {
         /// very large IN lists that might not provide much benefit over hash 
table lookups.
         ///
         /// This uses the deduplicated row count once the build side has been 
evaluated.
+        /// In `Partitioned` hash-join mode the same threshold also gates the

Review Comment:
   does this also affect CollectLeft?



##########
datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs:
##########
@@ -650,47 +701,157 @@ impl SharedBuildAccumulator {
                     }
                 }
 
-                let filter_expr = if has_canceled_unknown {
-                    let mut when_then_branches = empty_partition_ids
-                        .into_iter()
-                        .map(|partition_id| {
-                            (
-                                lit(ScalarValue::UInt64(Some(partition_id as 
u64))),
-                                lit(false),
-                            )
-                        })
-                        .collect::<Vec<_>>();
-                    when_then_branches.extend(real_branches);
-
-                    if when_then_branches.is_empty() {
-                        lit(true)
-                    } else {
-                        Arc::new(CaseExpr::try_new(
-                            Some(modulo_expr),
-                            when_then_branches,
-                            Some(lit(true)),
-                        )?) as Arc<dyn PhysicalExpr>
-                    }
-                } else if real_branches.is_empty() {
-                    lit(false)
-                } else if real_branches.len() == 1
-                    && empty_partition_ids.len() + 1 == num_partitions
-                {
-                    Arc::clone(&real_branches[0].1)
-                } else {
-                    Arc::new(CaseExpr::try_new(
-                        Some(modulo_expr),
-                        real_branches,
-                        Some(lit(false)),
-                    )?) as Arc<dyn PhysicalExpr>
-                };
-
+                let filter_expr = self
+                    .build_partitioned_filter(&real_partitions, 
has_canceled_unknown)?;
                 self.dynamic_filter.update(filter_expr)?;
             }
         }
 
         Ok(())
     }
+
+    /// CollectLeft has a single shared build side, so we always have one
+    /// `Map`. We prefer the InList expression when it's available (the build
+    /// side fit under the InList caps) because it's directly representable in
+    /// parquet stats / bloom-filter pruning at the scan; otherwise fall back
+    /// to a single `hash_lookup` against the map.
+    fn collect_left_membership(
+        &self,
+        pushdown: &PushdownStrategy,
+    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
+        if let Some(arr) = &pushdown.inlist {
+            return Ok(Some(create_inlist_predicate(
+                &self.on_right,
+                Arc::clone(arr),
+                self.probe_schema.as_ref(),
+            )?));
+        }
+        Ok(pushdown.map.as_ref().map(|map| {
+            create_hash_lookup_predicate(&self.on_right, Arc::clone(map), 
&HASH_JOIN_SEED)
+        }))
+    }
+
+    /// Build the dynamic filter for `PartitionMode::Partitioned`. Emits
+    /// `global_minmax AND ([merged_in_list AND] multi_hash_lookup)` —
+    /// independent of how the build side was repartitioned.
+    ///
+    /// * `global_minmax` — envelope of every partition's per-column min/max.
+    ///   Cheap short-circuit and the only piece visible to scan-level
+    ///   `pruning_predicate` extraction.
+    /// * `merged_in_list` — concatenated, deduplicated build keys when every
+    ///   reported partition contributed an `InList` array and the
+    ///   cross-partition union fits under
+    ///   `optimizer.hash_join_inlist_pushdown_max_distinct_values`. A small
+    ///   `IN (SET)` participates in parquet stats / bloom-filter pruning,
+    ///   which `multi_hash_lookup` does not. When present it fully replaces
+    ///   the lookup.
+    /// * `multi_hash_lookup` — hashes the join keys once and ORs
+    ///   `contain_hashes()` across every partition's hash table.
+    ///
+    /// `has_canceled_unknown` partitions short-circuit to `lit(true)`: we
+    /// don't have their maps, so we cannot include them in the lookup, and
+    /// the query is being torn down anyway.
+    fn build_partitioned_filter(
+        &self,
+        real_partitions: &[&PartitionData],
+        has_canceled_unknown: bool,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        if has_canceled_unknown {
+            return Ok(lit(true));
+        }
+        if real_partitions.is_empty() {
+            return Ok(lit(false));
+        }
+
+        let bounds_refs: Vec<&PartitionBounds> =
+            real_partitions.iter().map(|p| &p.bounds).collect();
+        let global_bounds_expr = compute_global_bounds(&bounds_refs)
+            .as_ref()
+            .and_then(|b| create_bounds_predicate(&self.on_right, b));
+
+        // The merged InList covers the union of every partition's
+        // build-side keys, so when it fires it stands alone — there is no
+        // need to also AND a `multi_hash_lookup` (which would just probe
+        // the same data via a different structure).
+        let membership_expr =
+            if let Some(merged) = 
self.try_build_merged_inlist(real_partitions)? {
+                Some(merged)
+            } else {
+                let maps: Vec<Arc<Map>> = real_partitions
+                    .iter()
+                    .filter_map(|p| p.pushdown.map.clone())
+                    .collect();

Review Comment:
   This can become bounds only if none are present



##########
datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs:
##########
@@ -202,6 +195,64 @@ fn combine_membership_and_bounds(
     }
 }
 
+/// Compute the global (envelope) min/max bounds across a set of partition 
bounds.
+///
+/// For each column index, returns the smallest min seen and the largest max 
seen.
+/// Columns where any partition is missing bounds, or where bounds are not 
totally
+/// ordered (e.g. mixed-type comparisons), are dropped from the global 
envelope.
+fn compute_global_bounds(per_partition: &[&PartitionBounds]) -> 
Option<PartitionBounds> {
+    let mut iter = per_partition.iter();
+    let first = iter.next()?;
+    let mut acc: Vec<Option<ColumnBounds>> = first
+        .column_bounds
+        .iter()
+        .map(|cb| Some(cb.clone()))
+        .collect();
+
+    for partition in iter {
+        if partition.column_bounds.len() != acc.len() {
+            return None;
+        }
+        for (slot, cb) in acc.iter_mut().zip(partition.column_bounds.iter()) {
+            let Some(existing) = slot.as_mut() else {
+                continue;
+            };
+            match cb.min.partial_cmp(&existing.min) {
+                Some(std::cmp::Ordering::Less) => existing.min = 
cb.min.clone(),
+                Some(_) => {}
+                None => {
+                    *slot = None;
+                    continue;
+                }
+            }
+            match cb.max.partial_cmp(&existing.max) {
+                Some(std::cmp::Ordering::Greater) => existing.max = 
cb.max.clone(),
+                Some(_) => {}
+                None => *slot = None,
+            }
+        }
+    }
+
+    let merged: Vec<ColumnBounds> = acc.into_iter().flatten().collect();

Review Comment:
   still think this flatten can cause a problem. would prefer a regression test 
with this as well 👍 



##########
datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs:
##########
@@ -343,6 +354,197 @@ impl PhysicalExpr for HashTableLookupExpr {
     }
 }
 
+/// Physical expression that probes the same join keys against multiple 
[`Map`]s
+/// and returns `true` for any row whose join keys match at least one map.
+///
+/// Equivalent to `OR`-ing several [`HashTableLookupExpr`]s, but
+/// `create_hashes` runs exactly once for the whole batch and every
+/// [`Map::HashMap`] probe shares the same hash buffer. All `HashMap`
+/// entries must therefore have been built with the same `RandomState`;
+/// [`Map::ArrayMap`] entries are queried via `contain_keys` and do not
+/// consume hashes.
+pub struct MultiMapLookupExpr {
+    /// Join-key expressions evaluated against each input batch.
+    on_columns: Vec<PhysicalExprRef>,
+    /// Hashing seed shared by every entry in `maps`.
+    random_state: SeededRandomState,
+    /// Build-side maps to OR over, one per partition.
+    maps: Vec<Arc<Map>>,
+    /// Display name used in `EXPLAIN` output (e.g. `"multi_hash_lookup"`).
+    description: String,
+}
+
+impl MultiMapLookupExpr {
+    pub fn new(
+        on_columns: Vec<PhysicalExprRef>,
+        random_state: SeededRandomState,
+        maps: Vec<Arc<Map>>,
+        description: String,
+    ) -> Self {
+        Self {
+            on_columns,
+            random_state,
+            maps,
+            description,
+        }
+    }
+}
+
+impl std::fmt::Debug for MultiMapLookupExpr {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let cols = self
+            .on_columns
+            .iter()
+            .map(|e| e.to_string())
+            .collect::<Vec<_>>()
+            .join(", ");
+        write!(
+            f,
+            "{}({cols}, [{}], maps={})",
+            self.description,
+            self.random_state.seed(),
+            self.maps.len()
+        )
+    }
+}
+
+impl Hash for MultiMapLookupExpr {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.on_columns.dyn_hash(state);
+        self.description.hash(state);
+        self.random_state.seed().hash(state);
+        // See HashTableLookupExpr — pointer identity is what we use for Maps.
+        for map in &self.maps {
+            Arc::as_ptr(map).hash(state);
+        }
+    }
+}
+
+impl PartialEq for MultiMapLookupExpr {
+    fn eq(&self, other: &Self) -> bool {
+        self.on_columns == other.on_columns
+            && self.description == other.description
+            && self.random_state.seed() == other.random_state.seed()
+            && self.maps.len() == other.maps.len()
+            && self
+                .maps
+                .iter()
+                .zip(other.maps.iter())
+                .all(|(a, b)| Arc::ptr_eq(a, b))
+    }
+}
+
+impl Eq for MultiMapLookupExpr {}
+
+impl Display for MultiMapLookupExpr {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.description)
+    }
+}
+
+impl PhysicalExpr for MultiMapLookupExpr {
+    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
+        self.on_columns.iter().collect()
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        Ok(Arc::new(MultiMapLookupExpr::new(
+            children,
+            self.random_state.clone(),
+            self.maps.clone(),
+            self.description.clone(),
+        )))
+    }
+
+    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
+        Ok(DataType::Boolean)
+    }
+
+    fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
+        Ok(false)
+    }
+
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
+        let num_rows = batch.num_rows();
+        let join_keys = evaluate_columns(&self.on_columns, batch)?;
+
+        if self.maps.is_empty() || num_rows == 0 {
+            // Empty `maps` would not be constructed by the dynamic-filter
+            // builder — guard anyway: an empty OR is `false` for every row.
+            let buffer = BooleanBufferBuilder::new(num_rows);
+            let mut buffer = buffer;

Review Comment:
   just "let mut buffer = ..."



##########
datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs:
##########
@@ -650,47 +701,157 @@ impl SharedBuildAccumulator {
                     }
                 }
 
-                let filter_expr = if has_canceled_unknown {
-                    let mut when_then_branches = empty_partition_ids
-                        .into_iter()
-                        .map(|partition_id| {
-                            (
-                                lit(ScalarValue::UInt64(Some(partition_id as 
u64))),
-                                lit(false),
-                            )
-                        })
-                        .collect::<Vec<_>>();
-                    when_then_branches.extend(real_branches);
-
-                    if when_then_branches.is_empty() {
-                        lit(true)
-                    } else {
-                        Arc::new(CaseExpr::try_new(
-                            Some(modulo_expr),
-                            when_then_branches,
-                            Some(lit(true)),
-                        )?) as Arc<dyn PhysicalExpr>
-                    }
-                } else if real_branches.is_empty() {
-                    lit(false)
-                } else if real_branches.len() == 1
-                    && empty_partition_ids.len() + 1 == num_partitions
-                {
-                    Arc::clone(&real_branches[0].1)
-                } else {
-                    Arc::new(CaseExpr::try_new(
-                        Some(modulo_expr),
-                        real_branches,
-                        Some(lit(false)),
-                    )?) as Arc<dyn PhysicalExpr>
-                };
-
+                let filter_expr = self
+                    .build_partitioned_filter(&real_partitions, 
has_canceled_unknown)?;
                 self.dynamic_filter.update(filter_expr)?;
             }
         }
 
         Ok(())
     }
+
+    /// CollectLeft has a single shared build side, so we always have one
+    /// `Map`. We prefer the InList expression when it's available (the build
+    /// side fit under the InList caps) because it's directly representable in
+    /// parquet stats / bloom-filter pruning at the scan; otherwise fall back
+    /// to a single `hash_lookup` against the map.
+    fn collect_left_membership(
+        &self,
+        pushdown: &PushdownStrategy,
+    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
+        if let Some(arr) = &pushdown.inlist {
+            return Ok(Some(create_inlist_predicate(
+                &self.on_right,
+                Arc::clone(arr),
+                self.probe_schema.as_ref(),
+            )?));
+        }
+        Ok(pushdown.map.as_ref().map(|map| {
+            create_hash_lookup_predicate(&self.on_right, Arc::clone(map), 
&HASH_JOIN_SEED)
+        }))
+    }
+
+    /// Build the dynamic filter for `PartitionMode::Partitioned`. Emits

Review Comment:
   I think more accurate is:
   ```text
   global_minmax AND merged_in_list
   
   or
   
   global_minmax AND multi_hash_lookup
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to