adriangb commented on code in PR #20192:
URL: https://github.com/apache/datafusion/pull/20192#discussion_r2793314155


##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -1390,30 +1390,74 @@ impl ExecutionPlan for HashJoinExec {
         parent_filters: Vec<Arc<dyn PhysicalExpr>>,
         config: &ConfigOptions,
     ) -> Result<FilterDescription> {
-        // Other types of joins can support *some* filters, but restrictions 
are complex and error prone.
-        // For now we don't support them.
-        // See the logical optimizer rules for more details: 
datafusion/optimizer/src/push_down_filter.rs
-        // See https://github.com/apache/datafusion/issues/16973 for tracking.
-        if self.join_type != JoinType::Inner {
-            return Ok(FilterDescription::all_unsupported(
-                &parent_filters,
-                &self.children(),
-            ));
+        // This is the physical-plan equivalent of `push_down_all_join` in
+        // `datafusion/optimizer/src/push_down_filter.rs`. That function uses 
`lr_is_preserved`
+        // to decide which parent predicates can be pushed past a logical join 
to its children,
+        // then checks column references to route each predicate to the 
correct side.
+        //
+        // We apply the same two-level logic here:
+        // 1. `lr_is_preserved` gates whether a side is eligible at all.
+        // 2. For each filter, we check that all column references belong to 
the
+        //    target child (using `column_indices` to map output column 
positions
+        //    to join sides). This is critical for correctness: name-based 
matching
+        //    alone (as done by `ChildFilterDescription::from_child`) can 
incorrectly
+        //    push filters when different join sides have columns with the 
same name
+        //    (e.g. nested mark joins both producing "mark" columns).

Review Comment:
   This looks sweet!



##########
datafusion/physical-plan/src/filter_pushdown.rs:
##########
@@ -310,53 +309,109 @@ pub struct ChildFilterDescription {
     pub(crate) self_filters: Vec<Arc<dyn PhysicalExpr>>,
 }
 
-/// A utility for checking whether a filter expression can be pushed down
-/// to a child node based on column availability.
+/// Validates and remaps filter column references to a target schema in one 
step.
 ///
-/// This checker validates that all columns referenced in a filter expression
-/// exist in the target schema. If any column in the filter is not present
-/// in the schema, the filter cannot be pushed down to that child.
-pub(crate) struct FilterColumnChecker<'a> {
-    column_names: HashSet<&'a str>,
+/// When pushing filters from a parent to a child node, we need to:
+/// 1. Verify that all columns referenced by the filter exist in the target
+/// 2. Remap column indices to match the target schema
+///
+/// For join nodes, an additional constraint is needed: only columns belonging
+/// to a specific side of the join should be considered valid. This is
+/// controlled by an optional set of allowed column indices (in the parent
+/// schema). When provided, a filter is only eligible if every column
+/// reference's index appears in the allowed set.
+pub(crate) struct FilterRemapper {
+    /// The target schema to remap column indices into.
+    child_schema: SchemaRef,
+    /// If set, only columns at these indices (in the *parent* schema) are
+    /// considered valid. When `None`, any column whose name exists in
+    /// `child_schema` is valid.
+    allowed_indices: Option<HashSet<usize>>,
 }
 
-impl<'a> FilterColumnChecker<'a> {
-    /// Creates a new [`FilterColumnChecker`] from the given schema.
+impl FilterRemapper {
+    /// Create a remapper that accepts any column name present in the target 
schema.
+    pub(crate) fn new(child_schema: SchemaRef) -> Self {
+        Self {
+            child_schema,
+            allowed_indices: None,
+        }
+    }
+
+    /// Create a remapper that only accepts columns at the given indices.
+    /// This is used by join nodes to restrict pushdown to one side of the
+    /// join when both sides have same-named columns.
+    fn with_allowed_indices(
+        child_schema: SchemaRef,
+        allowed_indices: HashSet<usize>,
+    ) -> Self {
+        Self {
+            child_schema,
+            allowed_indices: Some(allowed_indices),
+        }
+    }
+
+    /// Try to remap a filter's column references to the target schema.
     ///
-    /// Extracts all column names from the schema's fields to build
-    /// a lookup set for efficient column existence checks.
-    pub(crate) fn new(input_schema: &'a Schema) -> Self {
-        let column_names: HashSet<&str> = input_schema
+    /// Returns `Some(remapped_filter)` if all columns pass validation,
+    /// or `None` if any column is not valid for this target.
+    pub(crate) fn try_remap(
+        &self,
+        filter: &Arc<dyn PhysicalExpr>,
+    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
+        if self.all_columns_in_schema(filter)
+            && self
+                .allowed_indices
+                .as_ref()
+                .map(|allowed| Self::all_columns_in_set(filter, allowed))
+                .unwrap_or(true)
+        {
+            let remapped = reassign_expr_columns(Arc::clone(filter), 
&self.child_schema)?;
+            Ok(Some(remapped))
+        } else {
+            Ok(None)
+        }
+    }
+
+    fn all_columns_in_schema(&self, filter: &Arc<dyn PhysicalExpr>) -> bool {
+        let names: HashSet<&str> = self
+            .child_schema
             .fields()
             .iter()
             .map(|f| f.name().as_str())
             .collect();
-        Self { column_names }
+        let mut ok = true;

Review Comment:
   Maybe a better variable name than `ok`?



##########
datafusion/physical-plan/src/filter_pushdown.rs:
##########
@@ -310,53 +309,109 @@ pub struct ChildFilterDescription {
     pub(crate) self_filters: Vec<Arc<dyn PhysicalExpr>>,
 }
 
-/// A utility for checking whether a filter expression can be pushed down
-/// to a child node based on column availability.
+/// Validates and remaps filter column references to a target schema in one 
step.
 ///
-/// This checker validates that all columns referenced in a filter expression
-/// exist in the target schema. If any column in the filter is not present
-/// in the schema, the filter cannot be pushed down to that child.
-pub(crate) struct FilterColumnChecker<'a> {
-    column_names: HashSet<&'a str>,
+/// When pushing filters from a parent to a child node, we need to:
+/// 1. Verify that all columns referenced by the filter exist in the target
+/// 2. Remap column indices to match the target schema
+///
+/// For join nodes, an additional constraint is needed: only columns belonging
+/// to a specific side of the join should be considered valid. This is
+/// controlled by an optional set of allowed column indices (in the parent
+/// schema). When provided, a filter is only eligible if every column
+/// reference's index appears in the allowed set.
+pub(crate) struct FilterRemapper {
+    /// The target schema to remap column indices into.
+    child_schema: SchemaRef,
+    /// If set, only columns at these indices (in the *parent* schema) are
+    /// considered valid. When `None`, any column whose name exists in
+    /// `child_schema` is valid.
+    allowed_indices: Option<HashSet<usize>>,
 }
 
-impl<'a> FilterColumnChecker<'a> {
-    /// Creates a new [`FilterColumnChecker`] from the given schema.
+impl FilterRemapper {
+    /// Create a remapper that accepts any column name present in the target 
schema.
+    pub(crate) fn new(child_schema: SchemaRef) -> Self {
+        Self {
+            child_schema,
+            allowed_indices: None,
+        }
+    }
+
+    /// Create a remapper that only accepts columns at the given indices.
+    /// This is used by join nodes to restrict pushdown to one side of the
+    /// join when both sides have same-named columns.
+    fn with_allowed_indices(
+        child_schema: SchemaRef,
+        allowed_indices: HashSet<usize>,
+    ) -> Self {
+        Self {
+            child_schema,
+            allowed_indices: Some(allowed_indices),
+        }
+    }

Review Comment:
   Wonder if we can merge these code paths by making the non-explicit allowed 
indices version assume that only indices `0...child_schema.fields().len()` are 
allowed?



##########
datafusion/physical-plan/src/filter_pushdown.rs:
##########
@@ -310,53 +309,109 @@ pub struct ChildFilterDescription {
     pub(crate) self_filters: Vec<Arc<dyn PhysicalExpr>>,
 }
 
-/// A utility for checking whether a filter expression can be pushed down
-/// to a child node based on column availability.
+/// Validates and remaps filter column references to a target schema in one 
step.
 ///
-/// This checker validates that all columns referenced in a filter expression
-/// exist in the target schema. If any column in the filter is not present
-/// in the schema, the filter cannot be pushed down to that child.
-pub(crate) struct FilterColumnChecker<'a> {
-    column_names: HashSet<&'a str>,
+/// When pushing filters from a parent to a child node, we need to:
+/// 1. Verify that all columns referenced by the filter exist in the target
+/// 2. Remap column indices to match the target schema
+///
+/// For join nodes, an additional constraint is needed: only columns belonging
+/// to a specific side of the join should be considered valid. This is
+/// controlled by an optional set of allowed column indices (in the parent
+/// schema). When provided, a filter is only eligible if every column
+/// reference's index appears in the allowed set.
+pub(crate) struct FilterRemapper {
+    /// The target schema to remap column indices into.
+    child_schema: SchemaRef,
+    /// If set, only columns at these indices (in the *parent* schema) are
+    /// considered valid. When `None`, any column whose name exists in
+    /// `child_schema` is valid.
+    allowed_indices: Option<HashSet<usize>>,
 }
 
-impl<'a> FilterColumnChecker<'a> {
-    /// Creates a new [`FilterColumnChecker`] from the given schema.
+impl FilterRemapper {
+    /// Create a remapper that accepts any column name present in the target 
schema.
+    pub(crate) fn new(child_schema: SchemaRef) -> Self {
+        Self {
+            child_schema,
+            allowed_indices: None,
+        }
+    }
+
+    /// Create a remapper that only accepts columns at the given indices.
+    /// This is used by join nodes to restrict pushdown to one side of the
+    /// join when both sides have same-named columns.
+    fn with_allowed_indices(
+        child_schema: SchemaRef,
+        allowed_indices: HashSet<usize>,
+    ) -> Self {
+        Self {
+            child_schema,
+            allowed_indices: Some(allowed_indices),
+        }
+    }
+
+    /// Try to remap a filter's column references to the target schema.
     ///
-    /// Extracts all column names from the schema's fields to build
-    /// a lookup set for efficient column existence checks.
-    pub(crate) fn new(input_schema: &'a Schema) -> Self {
-        let column_names: HashSet<&str> = input_schema
+    /// Returns `Some(remapped_filter)` if all columns pass validation,
+    /// or `None` if any column is not valid for this target.
+    pub(crate) fn try_remap(
+        &self,
+        filter: &Arc<dyn PhysicalExpr>,
+    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
+        if self.all_columns_in_schema(filter)
+            && self
+                .allowed_indices
+                .as_ref()
+                .map(|allowed| Self::all_columns_in_set(filter, allowed))
+                .unwrap_or(true)
+        {
+            let remapped = reassign_expr_columns(Arc::clone(filter), 
&self.child_schema)?;
+            Ok(Some(remapped))
+        } else {
+            Ok(None)
+        }
+    }
+
+    fn all_columns_in_schema(&self, filter: &Arc<dyn PhysicalExpr>) -> bool {
+        let names: HashSet<&str> = self
+            .child_schema
             .fields()
             .iter()
             .map(|f| f.name().as_str())
             .collect();
-        Self { column_names }
+        let mut ok = true;
+        filter
+            .apply(|e| {

Review Comment:
   I wonder if we can fold the remap + check into a single tree traversal. Just 
would be a bit more efficient. Like do a `transform_up` w/ remapping and if we 
hit a reference that is not in the schema, set a flag and do 
`TreeNodeRecursion::Stop`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to