adriangb commented on code in PR #20192:
URL: https://github.com/apache/datafusion/pull/20192#discussion_r2793314155
##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -1390,30 +1390,74 @@ impl ExecutionPlan for HashJoinExec {
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
config: &ConfigOptions,
) -> Result<FilterDescription> {
- // Other types of joins can support *some* filters, but restrictions
are complex and error prone.
- // For now we don't support them.
- // See the logical optimizer rules for more details:
datafusion/optimizer/src/push_down_filter.rs
- // See https://github.com/apache/datafusion/issues/16973 for tracking.
- if self.join_type != JoinType::Inner {
- return Ok(FilterDescription::all_unsupported(
- &parent_filters,
- &self.children(),
- ));
+ // This is the physical-plan equivalent of `push_down_all_join` in
+ // `datafusion/optimizer/src/push_down_filter.rs`. That function uses
`lr_is_preserved`
+ // to decide which parent predicates can be pushed past a logical join
to its children,
+ // then checks column references to route each predicate to the
correct side.
+ //
+ // We apply the same two-level logic here:
+ // 1. `lr_is_preserved` gates whether a side is eligible at all.
+ // 2. For each filter, we check that all column references belong to
the
+ // target child (using `column_indices` to map output column
positions
+ // to join sides). This is critical for correctness: name-based
matching
+ // alone (as done by `ChildFilterDescription::from_child`) can
incorrectly
+ // push filters when different join sides have columns with the
same name
+ // (e.g. nested mark joins both producing "mark" columns).
Review Comment:
This looks sweet!
##########
datafusion/physical-plan/src/filter_pushdown.rs:
##########
@@ -310,53 +309,109 @@ pub struct ChildFilterDescription {
pub(crate) self_filters: Vec<Arc<dyn PhysicalExpr>>,
}
-/// A utility for checking whether a filter expression can be pushed down
-/// to a child node based on column availability.
+/// Validates and remaps filter column references to a target schema in one
step.
///
-/// This checker validates that all columns referenced in a filter expression
-/// exist in the target schema. If any column in the filter is not present
-/// in the schema, the filter cannot be pushed down to that child.
-pub(crate) struct FilterColumnChecker<'a> {
- column_names: HashSet<&'a str>,
+/// When pushing filters from a parent to a child node, we need to:
+/// 1. Verify that all columns referenced by the filter exist in the target
+/// 2. Remap column indices to match the target schema
+///
+/// For join nodes, an additional constraint is needed: only columns belonging
+/// to a specific side of the join should be considered valid. This is
+/// controlled by an optional set of allowed column indices (in the parent
+/// schema). When provided, a filter is only eligible if every column
+/// reference's index appears in the allowed set.
+pub(crate) struct FilterRemapper {
+ /// The target schema to remap column indices into.
+ child_schema: SchemaRef,
+ /// If set, only columns at these indices (in the *parent* schema) are
+ /// considered valid. When `None`, any column whose name exists in
+ /// `child_schema` is valid.
+ allowed_indices: Option<HashSet<usize>>,
}
-impl<'a> FilterColumnChecker<'a> {
- /// Creates a new [`FilterColumnChecker`] from the given schema.
+impl FilterRemapper {
+ /// Create a remapper that accepts any column name present in the target
schema.
+ pub(crate) fn new(child_schema: SchemaRef) -> Self {
+ Self {
+ child_schema,
+ allowed_indices: None,
+ }
+ }
+
+ /// Create a remapper that only accepts columns at the given indices.
+ /// This is used by join nodes to restrict pushdown to one side of the
+ /// join when both sides have same-named columns.
+ fn with_allowed_indices(
+ child_schema: SchemaRef,
+ allowed_indices: HashSet<usize>,
+ ) -> Self {
+ Self {
+ child_schema,
+ allowed_indices: Some(allowed_indices),
+ }
+ }
+
+ /// Try to remap a filter's column references to the target schema.
///
- /// Extracts all column names from the schema's fields to build
- /// a lookup set for efficient column existence checks.
- pub(crate) fn new(input_schema: &'a Schema) -> Self {
- let column_names: HashSet<&str> = input_schema
+ /// Returns `Some(remapped_filter)` if all columns pass validation,
+ /// or `None` if any column is not valid for this target.
+ pub(crate) fn try_remap(
+ &self,
+ filter: &Arc<dyn PhysicalExpr>,
+ ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
+ if self.all_columns_in_schema(filter)
+ && self
+ .allowed_indices
+ .as_ref()
+ .map(|allowed| Self::all_columns_in_set(filter, allowed))
+ .unwrap_or(true)
+ {
+ let remapped = reassign_expr_columns(Arc::clone(filter),
&self.child_schema)?;
+ Ok(Some(remapped))
+ } else {
+ Ok(None)
+ }
+ }
+
+ fn all_columns_in_schema(&self, filter: &Arc<dyn PhysicalExpr>) -> bool {
+ let names: HashSet<&str> = self
+ .child_schema
.fields()
.iter()
.map(|f| f.name().as_str())
.collect();
- Self { column_names }
+ let mut ok = true;
Review Comment:
Maybe a better variable name than `ok`?
##########
datafusion/physical-plan/src/filter_pushdown.rs:
##########
@@ -310,53 +309,109 @@ pub struct ChildFilterDescription {
pub(crate) self_filters: Vec<Arc<dyn PhysicalExpr>>,
}
-/// A utility for checking whether a filter expression can be pushed down
-/// to a child node based on column availability.
+/// Validates and remaps filter column references to a target schema in one
step.
///
-/// This checker validates that all columns referenced in a filter expression
-/// exist in the target schema. If any column in the filter is not present
-/// in the schema, the filter cannot be pushed down to that child.
-pub(crate) struct FilterColumnChecker<'a> {
- column_names: HashSet<&'a str>,
+/// When pushing filters from a parent to a child node, we need to:
+/// 1. Verify that all columns referenced by the filter exist in the target
+/// 2. Remap column indices to match the target schema
+///
+/// For join nodes, an additional constraint is needed: only columns belonging
+/// to a specific side of the join should be considered valid. This is
+/// controlled by an optional set of allowed column indices (in the parent
+/// schema). When provided, a filter is only eligible if every column
+/// reference's index appears in the allowed set.
+pub(crate) struct FilterRemapper {
+ /// The target schema to remap column indices into.
+ child_schema: SchemaRef,
+ /// If set, only columns at these indices (in the *parent* schema) are
+ /// considered valid. When `None`, any column whose name exists in
+ /// `child_schema` is valid.
+ allowed_indices: Option<HashSet<usize>>,
}
-impl<'a> FilterColumnChecker<'a> {
- /// Creates a new [`FilterColumnChecker`] from the given schema.
+impl FilterRemapper {
+ /// Create a remapper that accepts any column name present in the target
schema.
+ pub(crate) fn new(child_schema: SchemaRef) -> Self {
+ Self {
+ child_schema,
+ allowed_indices: None,
+ }
+ }
+
+ /// Create a remapper that only accepts columns at the given indices.
+ /// This is used by join nodes to restrict pushdown to one side of the
+ /// join when both sides have same-named columns.
+ fn with_allowed_indices(
+ child_schema: SchemaRef,
+ allowed_indices: HashSet<usize>,
+ ) -> Self {
+ Self {
+ child_schema,
+ allowed_indices: Some(allowed_indices),
+ }
+ }
Review Comment:
Wonder if we can merge these code paths by making the non-explicit allowed
indices version assume that only indices `0...child_schema.fields().len()` are
allowed?
##########
datafusion/physical-plan/src/filter_pushdown.rs:
##########
@@ -310,53 +309,109 @@ pub struct ChildFilterDescription {
pub(crate) self_filters: Vec<Arc<dyn PhysicalExpr>>,
}
-/// A utility for checking whether a filter expression can be pushed down
-/// to a child node based on column availability.
+/// Validates and remaps filter column references to a target schema in one
step.
///
-/// This checker validates that all columns referenced in a filter expression
-/// exist in the target schema. If any column in the filter is not present
-/// in the schema, the filter cannot be pushed down to that child.
-pub(crate) struct FilterColumnChecker<'a> {
- column_names: HashSet<&'a str>,
+/// When pushing filters from a parent to a child node, we need to:
+/// 1. Verify that all columns referenced by the filter exist in the target
+/// 2. Remap column indices to match the target schema
+///
+/// For join nodes, an additional constraint is needed: only columns belonging
+/// to a specific side of the join should be considered valid. This is
+/// controlled by an optional set of allowed column indices (in the parent
+/// schema). When provided, a filter is only eligible if every column
+/// reference's index appears in the allowed set.
+pub(crate) struct FilterRemapper {
+ /// The target schema to remap column indices into.
+ child_schema: SchemaRef,
+ /// If set, only columns at these indices (in the *parent* schema) are
+ /// considered valid. When `None`, any column whose name exists in
+ /// `child_schema` is valid.
+ allowed_indices: Option<HashSet<usize>>,
}
-impl<'a> FilterColumnChecker<'a> {
- /// Creates a new [`FilterColumnChecker`] from the given schema.
+impl FilterRemapper {
+ /// Create a remapper that accepts any column name present in the target
schema.
+ pub(crate) fn new(child_schema: SchemaRef) -> Self {
+ Self {
+ child_schema,
+ allowed_indices: None,
+ }
+ }
+
+ /// Create a remapper that only accepts columns at the given indices.
+ /// This is used by join nodes to restrict pushdown to one side of the
+ /// join when both sides have same-named columns.
+ fn with_allowed_indices(
+ child_schema: SchemaRef,
+ allowed_indices: HashSet<usize>,
+ ) -> Self {
+ Self {
+ child_schema,
+ allowed_indices: Some(allowed_indices),
+ }
+ }
+
+ /// Try to remap a filter's column references to the target schema.
///
- /// Extracts all column names from the schema's fields to build
- /// a lookup set for efficient column existence checks.
- pub(crate) fn new(input_schema: &'a Schema) -> Self {
- let column_names: HashSet<&str> = input_schema
+ /// Returns `Some(remapped_filter)` if all columns pass validation,
+ /// or `None` if any column is not valid for this target.
+ pub(crate) fn try_remap(
+ &self,
+ filter: &Arc<dyn PhysicalExpr>,
+ ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
+ if self.all_columns_in_schema(filter)
+ && self
+ .allowed_indices
+ .as_ref()
+ .map(|allowed| Self::all_columns_in_set(filter, allowed))
+ .unwrap_or(true)
+ {
+ let remapped = reassign_expr_columns(Arc::clone(filter),
&self.child_schema)?;
+ Ok(Some(remapped))
+ } else {
+ Ok(None)
+ }
+ }
+
+ fn all_columns_in_schema(&self, filter: &Arc<dyn PhysicalExpr>) -> bool {
+ let names: HashSet<&str> = self
+ .child_schema
.fields()
.iter()
.map(|f| f.name().as_str())
.collect();
- Self { column_names }
+ let mut ok = true;
+ filter
+ .apply(|e| {
Review Comment:
I wonder if we can fold the remap + check into a single tree traversal. It
would just be a bit more efficient. Like do a `transform_up` w/ remapping, and
if we hit a reference that is not in the schema, set a flag and do
`TreeNodeRecursion::Stop`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]