adriangb commented on code in PR #21931:
URL: https://github.com/apache/datafusion/pull/21931#discussion_r3222027073
##########
datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs:
##########
@@ -649,47 +700,157 @@ impl SharedBuildAccumulator {
}
}
- let filter_expr = if has_canceled_unknown {
- let mut when_then_branches = empty_partition_ids
- .into_iter()
- .map(|partition_id| {
- (
- lit(ScalarValue::UInt64(Some(partition_id as
u64))),
- lit(false),
- )
- })
- .collect::<Vec<_>>();
- when_then_branches.extend(real_branches);
-
- if when_then_branches.is_empty() {
- lit(true)
- } else {
- Arc::new(CaseExpr::try_new(
- Some(modulo_expr),
- when_then_branches,
- Some(lit(true)),
- )?) as Arc<dyn PhysicalExpr>
- }
- } else if real_branches.is_empty() {
- lit(false)
- } else if real_branches.len() == 1
- && empty_partition_ids.len() + 1 == num_partitions
- {
- Arc::clone(&real_branches[0].1)
- } else {
- Arc::new(CaseExpr::try_new(
- Some(modulo_expr),
- real_branches,
- Some(lit(false)),
- )?) as Arc<dyn PhysicalExpr>
- };
-
+ let filter_expr = self
+ .build_partitioned_filter(&real_partitions,
has_canceled_unknown)?;
self.dynamic_filter.update(filter_expr)?;
}
}
Ok(())
}
+
+ /// CollectLeft has a single shared build side, so we always have one
+ /// `Map`. We prefer the InList expression when it's available (the build
+ /// side fit under the InList caps) because it's directly representable in
+ /// parquet stats / bloom-filter pruning at the scan; otherwise fall back
+ /// to a single `hash_lookup` against the map.
+ fn collect_left_membership(
+ &self,
+ pushdown: &PushdownStrategy,
+ ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
+ if let Some(arr) = &pushdown.inlist {
+ return Ok(Some(create_inlist_predicate(
+ &self.on_right,
+ Arc::clone(arr),
+ self.probe_schema.as_ref(),
+ )?));
+ }
+ Ok(pushdown.map.as_ref().map(|map| {
+ create_hash_lookup_predicate(&self.on_right, Arc::clone(map),
&HASH_JOIN_SEED)
+ }))
+ }
+
+ /// Build the dynamic filter for `PartitionMode::Partitioned`. Emits
+ /// `global_minmax AND ([merged_in_list AND] multi_hash_lookup)` —
+ /// independent of how the build side was repartitioned.
+ ///
+ /// * `global_minmax` — envelope of every partition's per-column min/max.
+ /// Cheap short-circuit and the only piece visible to scan-level
+ /// `pruning_predicate` extraction.
+ /// * `merged_in_list` — concatenated, deduplicated build keys when every
+ /// reported partition contributed an `InList` array and the
+ /// cross-partition union fits under
+ /// `optimizer.hash_join_inlist_pushdown_max_distinct_values`. A small
+ /// `IN (SET)` participates in parquet stats / bloom-filter pruning,
+ /// which `multi_hash_lookup` does not. When present it fully replaces
+ /// the lookup.
+ /// * `multi_hash_lookup` — hashes the join keys once and ORs
+ /// `contain_hashes()` across every partition's hash table.
+ ///
+ /// `has_canceled_unknown` partitions short-circuit to `lit(true)`: we
+ /// don't have their maps, so we cannot include them in the lookup, and
+ /// the query is being torn down anyway.
+ fn build_partitioned_filter(
+ &self,
+ real_partitions: &[&PartitionData],
+ has_canceled_unknown: bool,
+ ) -> Result<Arc<dyn PhysicalExpr>> {
+ if has_canceled_unknown {
+ return Ok(lit(true));
+ }
+ if real_partitions.is_empty() {
+ return Ok(lit(false));
+ }
+
+ let bounds_refs: Vec<&PartitionBounds> =
+ real_partitions.iter().map(|p| &p.bounds).collect();
+ let global_bounds_expr = compute_global_bounds(&bounds_refs)
+ .as_ref()
+ .and_then(|b| create_bounds_predicate(&self.on_right, b));
+
+ // The merged InList covers the union of every partition's
+ // build-side keys, so when it fires it stands alone — there is no
+ // need to also AND a `multi_hash_lookup` (which would just probe
+ // the same data via a different structure).
+ let membership_expr =
+ if let Some(merged) =
self.try_build_merged_inlist(real_partitions)? {
+ Some(merged)
+ } else {
+ let maps: Vec<Arc<Map>> = real_partitions
+ .iter()
+ .filter_map(|p| p.pushdown.map.clone())
+ .collect();
+ if maps.is_empty() {
+ // Defensive: every reported (non-empty) partition is
+ // supposed to carry a Map. Falling through to None means
+ // we degrade to bounds-only filtering.
+ None
+ } else {
+ Some(Arc::new(MultiMapLookupExpr::new(
+ self.on_right.clone(),
+ HASH_JOIN_SEED.clone(),
+ maps,
+ "multi_hash_lookup".to_string(),
+ )) as Arc<dyn PhysicalExpr>)
+ }
+ };
+
+ Ok(
+ combine_membership_and_bounds(membership_expr, global_bounds_expr)
+ .unwrap_or_else(|| lit(true)),
+ )
+ }
+
+ /// If every reported partition contributed an InList array, concatenate
+ /// them, deduplicate by scalar value, and gate on the
+ /// `inlist_max_distinct_values` cap. Returns the merged
+ /// `(struct(...))? IN (SET) ([…])` predicate built over the
+ /// deduplicated keys when the cap is satisfied; `None` otherwise.
+ ///
+ /// Per-partition arrays carry duplicates — each partition ships its raw
+ /// build-side join keys, dedup happens here. The dedup walk early-aborts
+ /// the moment we cross the cap, so the cost stays bounded by
+ /// `O(rows-until-cap+1-distinct-found)` rather than total input size.
+ fn try_build_merged_inlist(
+ &self,
+ real_partitions: &[&PartitionData],
+ ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
+ let cap = self.inlist_max_distinct_values;
+ let mut arrays: Vec<ArrayRef> =
Vec::with_capacity(real_partitions.len());
+ for p in real_partitions {
+ let Some(arr) = &p.pushdown.inlist else {
+ return Ok(None);
+ };
+ arrays.push(Arc::clone(arr));
+ }
+ let Some(merged) = merge_inlist_arrays(&arrays) else {
Review Comment:
It's a bit tricky. Each partition is capped by size (bytes) because that's
cheap to calculate. So the size of the non-deduplicated merged InList is
already capped (by bytes), and we only pay the deduplication cost once.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]