gene-bordegaray commented on code in PR #20246:
URL: https://github.com/apache/datafusion/pull/20246#discussion_r2788677005
##########
datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs:
##########
@@ -402,181 +455,133 @@ impl SharedBuildAccumulator {
// CollectLeft: Simple conjunction of bounds and membership
check
AccumulatedBuildData::CollectLeft { data } => {
if let Some(partition_data) = data {
- // Create membership predicate (InList for small build
sides, hash lookup otherwise)
- let membership_expr = create_membership_predicate(
- &self.on_right,
- partition_data.pushdown.clone(),
- &HASH_JOIN_SEED,
- self.probe_schema.as_ref(),
- )?;
-
- // Create bounds check expression (if bounds available)
- let bounds_expr = create_bounds_predicate(
- &self.on_right,
- &partition_data.bounds,
- );
-
- // Combine membership and bounds expressions for
multi-layer optimization:
- // - Bounds (min/max): Enable statistics-based pruning
(Parquet row group/file skipping)
- // - Membership (InList/hash lookup): Enables:
- // * Precise filtering (exact value matching)
- // * Bloom filter utilization (if present in Parquet
files)
- // * Better pruning for data types where min/max
isn't effective (e.g., UUIDs)
- // Together, they provide complementary benefits and
maximize data skipping.
// Only update the filter if we have something to push
down
- if let Some(filter_expr) = match (membership_expr,
bounds_expr) {
- (Some(membership), Some(bounds)) => {
- // Both available: combine with AND
- Some(Arc::new(BinaryExpr::new(
- bounds,
- Operator::And,
- membership,
- ))
- as Arc<dyn PhysicalExpr>)
- }
- (Some(membership), None) => {
- // Membership available but no bounds
- // This is reachable when we have data but
bounds aren't available
- // (e.g., unsupported data types or no columns
with bounds)
- Some(membership)
- }
- (None, Some(bounds)) => {
- // Bounds available but no membership.
- // This should be unreachable in practice: we
can always push down a reference
- // to the hash table.
- // But it seems safer to handle it defensively.
- Some(bounds)
- }
- (None, None) => {
- // No filter available (e.g., empty build side)
- // Don't update the filter, but continue to
mark complete
- None
- }
- } {
+ if let Some(filter_expr) = build_partition_filter_expr(
+ &self.on_right,
+ partition_data,
+ &self.probe_schema,
+ )? {
self.dynamic_filter.update(filter_expr)?;
}
}
}
// Partitioned: CASE expression routing to per-partition
filters
AccumulatedBuildData::Partitioned { partitions } => {
- // Collect all partition data (should all be Some at this
point)
- let partition_data: Vec<_> =
- partitions.iter().filter_map(|p| p.as_ref()).collect();
-
- if !partition_data.is_empty() {
- // Build a CASE expression that combines range checks
AND membership checks
- // CASE (hash_repartition(join_keys) % num_partitions)
- // WHEN 0 THEN (col >= min_0 AND col <= max_0 AND
...) AND membership_check_0
- // WHEN 1 THEN (col >= min_1 AND col <= max_1 AND
...) AND membership_check_1
+ if self.use_partition_index {
+ // Partition-index routing: both sides preserve file
partitioning (no
+ // RepartitionExec(Hash)), so build-partition i
corresponds to
+ // probe-partition i. Store per-partition filters
directly:
+ // partitioned_exprs[0] = (col >= min_0 AND col <=
max_0 AND ...) AND membership_check_0
+ // partitioned_exprs[1] = (col >= min_1 AND col <=
max_1 AND ...) AND membership_check_1
// ...
- // ELSE false
- // END
-
- let num_partitions = partition_data.len();
-
- // Create base expression: hash_repartition(join_keys)
% num_partitions
- let routing_hash_expr = Arc::new(HashExpr::new(
- self.on_right.clone(),
- self.repartition_random_state.clone(),
- "hash_repartition".to_string(),
- ))
- as Arc<dyn PhysicalExpr>;
-
- let modulo_expr = Arc::new(BinaryExpr::new(
- routing_hash_expr,
- Operator::Modulo,
- lit(ScalarValue::UInt64(Some(num_partitions as
u64))),
- ))
- as Arc<dyn PhysicalExpr>;
-
- // Create WHEN branches for each partition
- let when_then_branches: Vec<(
- Arc<dyn PhysicalExpr>,
- Arc<dyn PhysicalExpr>,
- )> = partitions
- .iter()
- .enumerate()
- .filter_map(|(partition_id, partition_opt)| {
- partition_opt.as_ref().and_then(|partition| {
- // Skip empty partitions - they would
always return false anyway
- match &partition.pushdown {
- PushdownStrategy::Empty => None,
- _ => Some((partition_id, partition)),
+ // partitioned_exprs[N] = None (empty partition —
no data)
+ let per_partition: Vec<Option<Arc<dyn PhysicalExpr>>> =
+ partitions
+ .iter()
+ .map(|p| match p {
+ Some(partition)
+ if !matches!(
+ partition.pushdown,
+ PushdownStrategy::Empty
+ ) =>
+ {
+ build_partition_filter_expr(
+ &self.on_right,
+ partition,
+ &self.probe_schema,
+ )
}
+ _ => Ok(None),
})
- })
- .map(|(partition_id, partition)| -> Result<_> {
- // WHEN partition_id
- let when_expr =
- lit(ScalarValue::UInt64(Some(partition_id
as u64)));
-
- // THEN: Combine bounds check AND membership
predicate
-
- // 1. Create membership predicate (InList for
small build sides, hash lookup otherwise)
- let membership_expr =
create_membership_predicate(
- &self.on_right,
- partition.pushdown.clone(),
- &HASH_JOIN_SEED,
- self.probe_schema.as_ref(),
- )?;
-
- // 2. Create bounds check expression for this
partition (if bounds available)
- let bounds_expr = create_bounds_predicate(
- &self.on_right,
- &partition.bounds,
- );
-
- // 3. Combine membership and bounds expressions
- let then_expr = match (membership_expr,
bounds_expr) {
- (Some(membership), Some(bounds)) => {
- // Both available: combine with AND
- Arc::new(BinaryExpr::new(
- bounds,
- Operator::And,
- membership,
- ))
- as Arc<dyn PhysicalExpr>
- }
- (Some(membership), None) => {
- // Membership available but no bounds
(e.g., unsupported data types)
- membership
- }
- (None, Some(bounds)) => {
- // Bounds available but no membership.
- // This should be unreachable in
practice: we can always push down a reference
- // to the hash table.
- // But it seems safer to handle it
defensively.
- bounds
- }
- (None, None) => {
- // No filter for this partition -
should not happen due to filter_map above
- // but handle defensively by returning
a "true" literal
- lit(true)
- }
- };
-
- Ok((when_expr, then_expr))
- })
- .collect::<Result<Vec<_>>>()?;
-
- // Optimize for single partition: skip CASE expression
entirely
- let filter_expr = if when_then_branches.is_empty() {
- // All partitions are empty: no rows can match
- lit(false)
- } else if when_then_branches.len() == 1 {
- // Single partition: just use the condition
directly
- // since hash % 1 == 0 always, the WHEN 0 branch
will always match
- Arc::clone(&when_then_branches[0].1)
- } else {
- // Multiple partitions: create CASE expression
- Arc::new(CaseExpr::try_new(
- Some(modulo_expr),
- when_then_branches,
- Some(lit(false)), // ELSE false
- )?) as Arc<dyn PhysicalExpr>
- };
-
- self.dynamic_filter.update(filter_expr)?;
+ .collect::<Result<_>>()?;
+
+ self.dynamic_filter.update_partitioned(per_partition)?;
Review Comment:
Yes, this is available.
The probe side is aware of its partition index and just does a lookup. In
`FileScanConfig::open()` the partition index is passed to the opener, which
uses it to address its corresponding build-side partition.
Something to note, though, is that this solution assumes that build-side and
probe-side partitions are aligned (i.e. the filter built from partition 1 on
the build side is applied to partition 1 on the probe side). This can lead to
misalignment, in which case using a CASE on the value we are partitioned on may
be better. However, that introduces an O(N) comparison for each row, and,
compared to hash routing, we would be comparing strings, which is more
expensive.
@NGA-TRAN
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]