Copilot commented on code in PR #20142:
URL: https://github.com/apache/datafusion/pull/20142#discussion_r2761786757
##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -1349,15 +1432,11 @@ impl ExecutionPlan for HashJoinExec {
let mut result =
FilterPushdownPropagation::if_any(child_pushdown_result.clone());
assert_eq!(child_pushdown_result.self_filters.len(), 2); // Should
always be 2, we have 2 children
let right_child_self_filters = &child_pushdown_result.self_filters[1];
// We only push down filters to the right child
- // We expect 0 or 1 self filters
- if let Some(filter) = right_child_self_filters.first() {
- // Note that we don't check PushedDownPredicate::discriminant
because even if nothing said
- // "yes, I can fully evaluate this filter" things might still use
it for statistics -> it's worth updating
- let predicate = Arc::clone(&filter.predicate);
- if let Ok(dynamic_filter) =
- Arc::downcast::<DynamicFilterPhysicalExpr>(predicate)
- {
- // We successfully pushed down our self filter - we need to
make a new node with the dynamic filter
+
+ // Check if our self-filter was pushed (pending_partition_filters was
set in gather_filters_for_pushdown)
+ if let Some(partition_filters) =
self.pending_partition_filters.lock().take() {
+ if right_child_self_filters.first().is_some() {
+ // Self-filter was pushed — create a new node with the
partition filters
let new_node = Arc::new(HashJoinExec {
Review Comment:
`right_child_self_filters.first().is_some()` doesn't indicate the
self-filter was accepted by the right child; it will be `Some` whenever
HashJoin added a self-filter, even if the child marked it unsupported. This can
cause the join to treat dynamic filter pushdown as successful and keep
`partition_filters` even when they are not referenced by the probe side. Use
the `PushedDownPredicate.discriminant` (e.g. require `PushedDown::Yes`) and/or
match the specific predicate you pushed before creating
`HashJoinExecDynamicFilter`.
##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -1078,20 +1162,13 @@ impl ExecutionPlan for HashJoinExec {
// Only enable dynamic filter pushdown if:
// - The session config enables dynamic filter pushdown
- // - A dynamic filter exists
- // - At least one consumer is holding a reference to it, this avoids
expensive filter
- // computation when disabled or when no consumer will use it.
+ // - A dynamic filter exists (it was pushed down successfully)
let enable_dynamic_filter_pushdown = context
.session_config()
.options()
.optimizer
.enable_join_dynamic_filter_pushdown
- && self
- .dynamic_filter
- .as_ref()
- .map(|df| df.filter.is_used())
- .unwrap_or(false);
-
+ && self.dynamic_filter.is_some();
Review Comment:
`enable_dynamic_filter_pushdown` now checks only
`self.dynamic_filter.is_some()`. However `dynamic_filter` can be present even
when the probe side doesn't actually hold a reference to the filters (e.g.
pushdown unsupported), in which case the build-side will still compute/update
dynamic filters unnecessarily. Consider restoring the previous gating by
checking whether any `partition_filters` are actually used (e.g.
`df.partition_filters.iter().any(|f| f.is_used())`), or alternatively only
setting `dynamic_filter` when the self-filter pushdown is reported as supported.
##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -598,15 +607,83 @@ impl HashJoinExec {
null_aware,
cache,
dynamic_filter: None,
+ pending_partition_filters: Mutex::new(None),
})
}
- fn create_dynamic_filter(on: &JoinOn) -> Arc<DynamicFilterPhysicalExpr> {
- // Extract the right-side keys (probe side keys) from the `on` clauses
- // Dynamic filter will be created from build side values (left side)
and applied to probe side (right side)
+ /// Creates per-partition DynamicFilters and the expression to push down
to the scan.
+ ///
+ /// Returns `(expression_to_push_down, partition_filters)`:
+ /// - CollectLeft or single partition: returns a single DynamicFilter as
both
+ /// - Partitioned N>1: creates N DynamicFilters, builds a CASE expression,
+ /// returns (case_expr, vec_of_filters)
+ fn create_partition_filters(
+ on: &JoinOn,
+ mode: PartitionMode,
+ num_build_partitions: usize,
+ repartition_random_state: SeededRandomState,
+ ) -> (Arc<dyn PhysicalExpr>, Vec<Arc<DynamicFilterPhysicalExpr>>) {
let right_keys: Vec<_> = on.iter().map(|(_, r)|
Arc::clone(r)).collect();
- // Initialize with a placeholder expression (true) that will be
updated when the hash table is built
- Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true)))
+
+ if mode == PartitionMode::CollectLeft || num_build_partitions == 1 {
+ // Single filter: used directly as the pushed-down expression
+ let filter = Arc::new(DynamicFilterPhysicalExpr::new(right_keys,
lit(true)));
+ let push_expr = Arc::clone(&filter) as Arc<dyn PhysicalExpr>;
+ (push_expr, vec![filter])
+ } else {
+ // Create N per-partition DynamicFilterPhysicalExpr instances,
+ // each starting with lit(true) as a pass-through.
+ let partition_filters: Vec<Arc<DynamicFilterPhysicalExpr>> = (0
+ ..num_build_partitions)
+ .map(|_| {
+ Arc::new(DynamicFilterPhysicalExpr::new(
+ right_keys.clone(),
+ lit(true),
+ ))
+ })
+ .collect();
+
+ // Build the static CASE expression:
+ // CASE (hash_repartition(join_keys) % N)
+ // WHEN 0 THEN partition_filters[0]
+ // WHEN 1 THEN partition_filters[1]
+ // ...
+ // ELSE false
+ // END
+ let routing_hash_expr = Arc::new(HashExpr::new(
+ right_keys,
+ repartition_random_state,
+ "hash_repartition".to_string(),
+ )) as Arc<dyn PhysicalExpr>;
+
+ let modulo_expr = Arc::new(BinaryExpr::new(
+ routing_hash_expr,
+ Operator::Modulo,
+ lit(ScalarValue::UInt64(Some(num_build_partitions as u64))),
+ )) as Arc<dyn PhysicalExpr>;
+
+ let when_then_branches: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn
PhysicalExpr>)> =
+ partition_filters
+ .iter()
+ .enumerate()
+ .map(|(i, pf)| {
+ let when_expr = lit(ScalarValue::UInt64(Some(i as
u64)));
+ let then_expr = Arc::clone(pf) as Arc<dyn
PhysicalExpr>;
+ (when_expr, then_expr)
+ })
+ .collect();
+
+ let case_expr = Arc::new(
+ CaseExpr::try_new(
+ Some(modulo_expr),
+ when_then_branches,
+ Some(lit(false)),
+ )
+ .expect("Failed to create CASE expression for per-partition
filters"),
+ ) as Arc<dyn PhysicalExpr>;
Review Comment:
`create_partition_filters` uses `expect(...)` on `CaseExpr::try_new(...)`,
which will panic in release builds if expression construction fails (e.g. type
mismatch, unexpected edge case). Prefer making `create_partition_filters`
fallible and propagating the error (`Result<...>`) or converting it into a
`DataFusionError` via `internal_err!/plan_err!` so the query fails gracefully
instead of aborting.
```suggestion
let case_expr: Arc<dyn PhysicalExpr> =
match CaseExpr::try_new(
Some(modulo_expr),
when_then_branches,
Some(lit(false)),
) {
Ok(expr) => Arc::new(expr),
                        // In case of an unexpected failure constructing the
CASE expression, fall back to a pass-through `true` filter (no
                        // filtering) rather than panicking; a `false` fallback
would incorrectly filter out every probe-side row.
                        Err(_) => lit(true),
};
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]