atahanyorganci commented on code in PR #16023: URL: https://github.com/apache/datafusion/pull/16023#discussion_r2151697844
########## datafusion/optimizer/src/eliminate_self_join/unique_keyed.rs: ########## @@ -0,0 +1,323 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`EliminateUniqueKeyedSelfJoin`] eliminates self joins on unique constraint columns + +use crate::{ApplyOrder, OptimizerConfig, OptimizerRule}; +use datafusion_common::{ + tree_node::{Transformed, TreeNode, TreeNodeRecursion}, + DFSchema, Result, TableReference, +}; +use datafusion_expr::{ + Expr, Join, JoinType, LogicalPlan, LogicalPlanBuilder, Projection, SubqueryAlias, + TableScan, +}; +use indexmap::IndexSet; + +use super::{ + is_table_scan_same, merge_table_scans, unique_indexes, OptimizationResult, + RenamedAlias, +}; + +#[derive(Default, Debug)] +pub struct EliminateUniqueKeyedSelfJoin; + +impl EliminateUniqueKeyedSelfJoin { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } +} + +/// Optimize self-join query by combining LHS and RHS of the join. Current implementation is +/// very conservative. It only merges nodes if one of them is `TableScan`. It should be possible +/// to merge projections and filters together as well. 
+/// +/// TLDR; of current implementation is +/// - If LHS and RHS is `TableScan`, then merge table scans, +/// - If LHS is `TableScan` and RHS isn't `TableScan`, then find `TableScan` on RHS and merge them +/// - If LHS isn't `TableScan` and RHS is `TableScan` recursively call `optimize` with children swapped +/// - If LHS and RHS is `SubqueryAlias`, recursively call `optimize` with their input +fn optimize(left: &LogicalPlan, right: &LogicalPlan) -> Option<OptimizationResult> { + match (left, right) { + (LogicalPlan::TableScan(left_scan), LogicalPlan::TableScan(right_scan)) => { + let table_scan = merge_table_scans(left_scan, right_scan); + let plan = LogicalPlan::TableScan(table_scan) + .recompute_schema() + .unwrap(); + Some(OptimizationResult { + plan, + renamed_alias: None, + }) + } + ( + LogicalPlan::SubqueryAlias(SubqueryAlias { + input: left_input, + alias: left_alias, + .. + }), + LogicalPlan::SubqueryAlias(SubqueryAlias { + input: right_input, + alias: right_alias, + .. + }), + ) => { + let OptimizationResult { + plan, + renamed_alias, + } = optimize(left_input, right_input)?; + assert!(renamed_alias.is_none(), "Assert `renamed_alias` is `None` because nested `SubqueryAlias` shouldn't be possible"); + + let plan = LogicalPlanBuilder::new(plan) + .alias(left_alias.clone()) + .unwrap() + .build() + .unwrap(); + let plan = plan.recompute_schema().unwrap(); + Some(OptimizationResult { + plan, + renamed_alias: Some(RenamedAlias { + from: right_alias.clone(), + to: left_alias.clone(), + }), + }) + } + (LogicalPlan::TableScan(left_scan), _) => { + let transformed = right + .clone() + .transform_up(|plan| match &plan { + LogicalPlan::TableScan(right_scan) => { + let merged = merge_table_scans(left_scan, right_scan); + Ok(Transformed::yes(LogicalPlan::TableScan(merged))) + } + _ => Ok(Transformed::no(plan)), + }) + .unwrap(); + assert!( + transformed.transformed, + "Called `transform_up` and no merged `TableScan`" + ); + if transformed.transformed { + 
Some(OptimizationResult { + plan: transformed.data, + renamed_alias: None, + }) + } else { + None + } + } + (_, LogicalPlan::TableScan(_)) => optimize(right, left), + _ => None, + } +} + +fn try_resolve_join_on_columns_to_indexes( + join: &Join, + schema: &DFSchema, + source: &TableReference, + left_alias: Option<&TableReference>, + right_alias: Option<&TableReference>, +) -> Option<IndexSet<usize>> { + let length = join.on.len(); + let mut on_idx = IndexSet::with_capacity(length); + + for on in &join.on { + let (left_col, right_col) = match on { + (Expr::Column(left_col), Expr::Column(right_col)) => (left_col, right_col), + _ => return None, + }; + let source_ref = Some(source); + + // If LHS column's alias isn't LHS's alias or table name then bail + let left_ref = left_col.relation.as_ref(); + if left_ref != left_alias && left_ref != source_ref { + return None; + } + // If RHS column's alias isn't RHS's alias or table name then bail + let right_ref = right_col.relation.as_ref(); + if right_ref != right_alias && right_ref != source_ref { + return None; + } + + // It's safe to resolve column's without their qualifiers as we know source `TableSource` are the same. + let left_idx = schema.index_of_column_by_name(None, left_col.name())?; + let right_idx = schema.index_of_column_by_name(None, right_col.name())?; + + // If LHS and RHS are different then optimization is impossible + if left_idx != right_idx { + return None; + } + on_idx.insert(left_idx); + } + Some(on_idx) +} + +#[derive(Debug)] +struct Resolution { + /// Source `TableScan` + table_scan: TableScan, + /// Column indexes into `TableScan` that form a unique index + alias: Option<TableReference>, +} + +fn try_resolve_to_table_scan_alias(branch: &LogicalPlan) -> Option<Resolution> { Review Comment: Great question! To give a little context, this is my first contribution to the project, so while implementing this issue I wanted to keep it as simple as possible.
I think the problem with implementing `EliminateUniqueKeyedSelfJoin` is that the addressable problem space is actually huge. One of the ways I narrowed it down and simplified it is by requiring one of the branches (LHS or RHS) to be a `TableScan` and the sources to be the same. Regarding the example case you provided, as you said, my conservative approach would simply ignore it due to the source-equality requirement. Another case that would be ignored is when the RHS is itself another join with multiple leaf `TableScan`s; my optimization rule should also ignore that. I think you raise a valid point about optimization rules in general and about correctly identifying possible candidates. `UNION ALL ...` and other subqueries can be used to create ad hoc tables. There could be other easy-to-implement optimization rules like `MergeIdenticalScans`. I would suggest that, while adding such optimization rules, we also add some APIs to `LogicalPlan` (or another struct) to be able to ask about qualified equality of the input/output schemas of nodes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org