rkrishn7 commented on code in PR #17632:
URL: https://github.com/apache/datafusion/pull/17632#discussion_r2360119099


##########
datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs:
##########
@@ -167,24 +183,112 @@ impl SharedBoundsAccumulator {
         };
         Self {
             inner: Mutex::new(SharedBoundsState {
-                bounds: Vec::with_capacity(expected_calls),
+                bounds: Vec::with_capacity(total_partitions),
+                completed_partitions: HashSet::new(),
+                filter_optimized: false,
             }),
-            barrier: Barrier::new(expected_calls),
+            total_partitions,
             dynamic_filter,
             on_right,
+            config_options: Arc::new(ConfigOptions::default()),
         }
     }
 
-    /// Create a filter expression from individual partition bounds using OR 
logic.
-    ///
-    /// This creates a filter where each partition's bounds form a conjunction 
(AND)
-    /// of column range predicates, and all partitions are combined with OR.
-    ///
-    /// For example, with 2 partitions and 2 columns:
-    /// ((col0 >= p0_min0 AND col0 <= p0_max0 AND col1 >= p0_min1 AND col1 <= 
p0_max1)
-    ///  OR
-    ///  (col0 >= p1_min0 AND col0 <= p1_max0 AND col1 >= p1_min1 AND col1 <= 
p1_max1))
-    pub(crate) fn create_filter_from_partition_bounds(
+    /// Create hash expression for the join keys: hash(col1, col2, ...)
+    fn create_hash_expression(&self) -> Result<Arc<dyn PhysicalExpr>> {
+        // Use the same random state as RepartitionExec for consistent 
partitioning
+        // This ensures hash(row) % num_partitions produces the same partition 
assignment
+        // as the original repartitioning operation
+        let hash_udf = Arc::new(ScalarUDF::from(Hash::new_with_random_state(
+            REPARTITION_RANDOM_STATE,
+        )));
+
+        // Create the hash expression using ScalarFunctionExpr
+        Ok(Arc::new(ScalarFunctionExpr::new(
+            "hash",
+            hash_udf,
+            self.on_right.clone(),
+            Field::new("hash_result", DataType::UInt64, false).into(),
+            Arc::clone(&self.config_options),
+        )))
+    }
+
+    /// Create a bounds predicate for a single partition: (col >= min AND col 
<= max) for all columns
+    fn create_partition_bounds_predicate(
+        &self,
+        partition_bounds: &PartitionBounds,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        let mut column_predicates = Vec::with_capacity(partition_bounds.len());
+
+        for (col_idx, right_expr) in self.on_right.iter().enumerate() {
+            if let Some(column_bounds) = 
partition_bounds.get_column_bounds(col_idx) {
+                // Create predicate: col >= min AND col <= max
+                let min_expr = Arc::new(BinaryExpr::new(
+                    Arc::clone(right_expr),
+                    Operator::GtEq,
+                    lit(column_bounds.min.clone()),
+                )) as Arc<dyn PhysicalExpr>;
+                let max_expr = Arc::new(BinaryExpr::new(
+                    Arc::clone(right_expr),
+                    Operator::LtEq,
+                    lit(column_bounds.max.clone()),
+                )) as Arc<dyn PhysicalExpr>;
+                let range_expr =
+                    Arc::new(BinaryExpr::new(min_expr, Operator::And, 
max_expr))
+                        as Arc<dyn PhysicalExpr>;
+                column_predicates.push(range_expr);
+            }
+        }
+
+        // Combine all column predicates for this partition with AND
+        if column_predicates.is_empty() {
+            Ok(lit(true))
+        } else {
+            let predicate = column_predicates
+                .into_iter()
+                .reduce(|acc, pred| {
+                    Arc::new(BinaryExpr::new(acc, Operator::And, pred))
+                        as Arc<dyn PhysicalExpr>
+                })
+                .unwrap();
+            Ok(predicate)
+        }
+    }
+
+    /// Create progressive filter: (hash(...) % n != partition_id OR 
bounds_predicate)
+    fn create_progressive_partition_filter(
+        &self,
+        partition_bounds: &PartitionBounds,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        let hash_expr = self.create_hash_expression()?;
+
+        // hash(...) % total_partitions
+        let modulo_expr = Arc::new(BinaryExpr::new(
+            hash_expr,
+            Operator::Modulo,
+            lit(ScalarValue::UInt64(Some(self.total_partitions as u64))),
+        )) as Arc<dyn PhysicalExpr>;
+
+        // hash(...) % total_partitions != partition_id
+        let hash_check = Arc::new(BinaryExpr::new(
+            modulo_expr,
+            Operator::NotEq,
+            lit(ScalarValue::UInt64(Some(partition_bounds.partition as u64))),
+        )) as Arc<dyn PhysicalExpr>;
+
+        // Create bounds predicate for this partition
+        let bounds_predicate =
+            self.create_partition_bounds_predicate(partition_bounds)?;
+
+        // Combine: (hash_check OR bounds_predicate)
+        Ok(
+            Arc::new(BinaryExpr::new(hash_check, Operator::Or, 
bounds_predicate))
+                as Arc<dyn PhysicalExpr>,
+        )
+    }
+
+    /// Create final optimized filter from all partition bounds using OR logic
+    pub(crate) fn create_optimized_filter_from_partition_bounds(

Review Comment:
   I can see it being worthwhile to skip this step. For example, it ties in 
nicely with https://github.com/apache/datafusion/pull/17529, where we can just 
push the hash comparison physical expr as a partition-dependent predicate.



##########
datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs:
##########
@@ -25,14 +25,24 @@ use crate::joins::PartitionMode;
 use crate::ExecutionPlan;
 use crate::ExecutionPlanProperties;
 
+use arrow::datatypes::{DataType, Field};
+use datafusion_common::config::ConfigOptions;
 use datafusion_common::{Result, ScalarValue};
 use datafusion_expr::Operator;
+use datafusion_expr::ScalarUDF;
+use datafusion_functions::hash::Hash;
 use datafusion_physical_expr::expressions::{lit, BinaryExpr, 
DynamicFilterPhysicalExpr};
-use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
+use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef, 
ScalarFunctionExpr};
 
+use ahash::RandomState;
 use itertools::Itertools;
 use parking_lot::Mutex;
-use tokio::sync::Barrier;
+use std::collections::HashSet;
+
+/// RandomState used by RepartitionExec for consistent hash partitioning
+/// This must match the seeds used in RepartitionExec to ensure our hash-based
+/// filter expressions compute the same partition assignments as the actual 
partitioning
+const REPARTITION_RANDOM_STATE: RandomState = RandomState::with_seeds(0, 0, 0, 
0);

Review Comment:
   Can we make this available in the repartition physical plan module and 
reuse it from there?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to