This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new fee90be613 Misc minor optimizations to query optimizer performance 
(#21128)
fee90be613 is described below

commit fee90be613446d2c61c3bd129a201c7faecef521
Author: Adam Gutglick <[email protected]>
AuthorDate: Tue Mar 31 19:34:44 2026 +0100

    Misc minor optimizations to query optimizer performance (#21128)
    
    ## Which issue does this PR close?
    
    - Closes #.
    
    ## Rationale for this change
    
    Inspired by @blaginin, trying to find more places that might drag down the
    optimizer's performance. On my laptop, this improves many of the SQL
    planner's benchmarks by a fairly consistent 2-5%.
    
    ## What changes are included in this PR?
    
    A slew of minor optimizations in the logical planner, trying to avoid
    wasted work or repeated allocations.
    
    ## Are these changes tested?
    
    Existing tests.
    
    ## Are there any user-facing changes?
    
    None
    
    ---------
    
    Signed-off-by: Adam Gutglick <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/optimizer/src/analyzer/type_coercion.rs |  6 +--
 .../optimizer/src/common_subexpr_eliminate.rs      |  6 +--
 .../optimizer/src/extract_leaf_expressions.rs      | 46 +++++++---------
 datafusion/optimizer/src/optimize_unions.rs        |  4 +-
 datafusion/optimizer/src/push_down_filter.rs       | 63 ++++++++++++----------
 datafusion/optimizer/src/push_down_limit.rs        |  2 +-
 .../src/simplify_expressions/expr_simplifier.rs    | 53 +++++++++---------
 datafusion/optimizer/src/utils.rs                  | 46 ++++++++++++++--
 8 files changed, 127 insertions(+), 99 deletions(-)

diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs 
b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 6f6898e219..bf91595e95 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -20,7 +20,7 @@
 use arrow::compute::can_cast_types;
 use datafusion_expr::binary::BinaryTypeCoercer;
 use itertools::{Itertools as _, izip};
-use std::sync::Arc;
+use std::sync::{Arc, LazyLock};
 
 use crate::analyzer::AnalyzerRule;
 use crate::utils::NamePreserver;
@@ -91,11 +91,11 @@ impl AnalyzerRule for TypeCoercion {
     }
 
     fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> 
Result<LogicalPlan> {
-        let empty_schema = DFSchema::empty();
+        static EMPTY_SCHEMA: LazyLock<DFSchema> = 
LazyLock::new(DFSchema::empty);
 
         // recurse
         let transformed_plan = plan
-            .transform_up_with_subqueries(|plan| 
analyze_internal(&empty_schema, plan))?
+            .transform_up_with_subqueries(|plan| 
analyze_internal(&EMPTY_SCHEMA, plan))?
             .data;
 
         // finish
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs 
b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index 88dba57d75..4213c23ccc 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -325,11 +325,7 @@ impl CommonSubexprEliminate {
                                 .map(|expr| Some(name_preserver.save(expr)))
                                 .collect::<Vec<_>>()
                         } else {
-                            new_aggr_expr
-                                .clone()
-                                .into_iter()
-                                .map(|_| None)
-                                .collect::<Vec<_>>()
+                            (0..new_aggr_expr.len()).map(|_| None).collect()
                         };
 
                         let mut agg_exprs = common_exprs
diff --git a/datafusion/optimizer/src/extract_leaf_expressions.rs 
b/datafusion/optimizer/src/extract_leaf_expressions.rs
index a57f71ea67..d3a84c0172 100644
--- a/datafusion/optimizer/src/extract_leaf_expressions.rs
+++ b/datafusion/optimizer/src/extract_leaf_expressions.rs
@@ -32,7 +32,7 @@ use datafusion_expr::{Expr, ExpressionPlacement, Projection};
 
 use crate::optimizer::ApplyOrder;
 use crate::push_down_filter::replace_cols_by_name;
-use crate::utils::has_all_column_refs;
+use crate::utils::{ColumnReference, has_all_column_refs, schema_columns};
 use crate::{OptimizerConfig, OptimizerRule};
 
 /// Prefix for aliases generated by the extraction optimizer passes.
@@ -213,10 +213,11 @@ fn extract_from_plan(
         .collect();
 
     // Build per-input column sets for routing expressions to the correct input
-    let input_column_sets: Vec<std::collections::HashSet<Column>> = 
input_schemas
-        .iter()
-        .map(|schema| schema_columns(schema.as_ref()))
-        .collect();
+    let input_column_sets: Vec<std::collections::HashSet<ColumnReference>> =
+        input_schemas
+            .iter()
+            .map(|schema| schema_columns(schema.as_ref()))
+            .collect();
 
     // Transform expressions via map_expressions with routing
     let transformed = plan.map_expressions(|expr| {
@@ -272,7 +273,7 @@ fn extract_from_plan(
 /// in both sides of a join).
 fn find_owning_input(
     expr: &Expr,
-    input_column_sets: &[std::collections::HashSet<Column>],
+    input_column_sets: &[std::collections::HashSet<ColumnReference>],
 ) -> Option<usize> {
     let mut found = None;
     for (idx, cols) in input_column_sets.iter().enumerate() {
@@ -292,7 +293,7 @@ fn find_owning_input(
 fn routing_extract(
     expr: Expr,
     extractors: &mut [LeafExpressionExtractor],
-    input_column_sets: &[std::collections::HashSet<Column>],
+    input_column_sets: &[std::collections::HashSet<ColumnReference>],
 ) -> Result<Transformed<Expr>> {
     expr.transform_down(|e| {
         // Skip expressions already aliased with extracted expression pattern
@@ -340,19 +341,6 @@ fn routing_extract(
     })
 }
 
-/// Returns all columns in the schema (both qualified and unqualified forms)
-fn schema_columns(schema: &DFSchema) -> std::collections::HashSet<Column> {
-    schema
-        .iter()
-        .flat_map(|(qualifier, field)| {
-            [
-                Column::new(qualifier.cloned(), field.name()),
-                Column::new_unqualified(field.name()),
-            ]
-        })
-        .collect()
-}
-
 /// Rewrites extraction pairs and column references from one qualifier
 /// space to another.
 ///
@@ -1072,7 +1060,7 @@ fn route_to_inputs(
     pairs: &[(Expr, String)],
     columns: &IndexSet<Column>,
     node: &LogicalPlan,
-    input_column_sets: &[std::collections::HashSet<Column>],
+    input_column_sets: &[std::collections::HashSet<ColumnReference>],
     input_schemas: &[Arc<DFSchema>],
 ) -> Result<Option<Vec<ExtractionTarget>>> {
     let num_inputs = input_schemas.len();
@@ -1173,7 +1161,7 @@ fn try_push_into_inputs(
     // Build per-input schemas and column sets for routing
     let input_schemas: Vec<Arc<DFSchema>> =
         inputs.iter().map(|i| Arc::clone(i.schema())).collect();
-    let input_column_sets: Vec<std::collections::HashSet<Column>> =
+    let input_column_sets: Vec<std::collections::HashSet<ColumnReference>> =
         input_schemas.iter().map(|s| schema_columns(s)).collect();
 
     // Route pairs and columns to the appropriate inputs
@@ -2436,16 +2424,18 @@ mod tests {
         // Simulate schema_columns output for two sides of a join where both
         // have a "user" column — each set contains the qualified and
         // unqualified form.
-        let left_cols: HashSet<Column> = [
-            Column::new(Some("test"), "user"),
-            Column::new_unqualified("user"),
+        let relation = "test".into();
+        let left_cols: HashSet<ColumnReference> = [
+            ColumnReference::new(Some(&relation), "user"),
+            ColumnReference::new_unqualified("user"),
         ]
         .into_iter()
         .collect();
 
-        let right_cols: HashSet<Column> = [
-            Column::new(Some("right"), "user"),
-            Column::new_unqualified("user"),
+        let relation = "right".into();
+        let right_cols: HashSet<ColumnReference> = [
+            ColumnReference::new(Some(&relation), "user"),
+            ColumnReference::new_unqualified("user"),
         ]
         .into_iter()
         .collect();
diff --git a/datafusion/optimizer/src/optimize_unions.rs 
b/datafusion/optimizer/src/optimize_unions.rs
index 900757b9a0..80f8ebeef1 100644
--- a/datafusion/optimizer/src/optimize_unions.rs
+++ b/datafusion/optimizer/src/optimize_unions.rs
@@ -64,11 +64,11 @@ impl OptimizerRule for OptimizeUnions {
                 let inputs = inputs
                     .into_iter()
                     .flat_map(extract_plans_from_union)
-                    .map(|plan| coerce_plan_expr_for_schema(plan, &schema))
+                    .map(|plan| Ok(Arc::new(coerce_plan_expr_for_schema(plan, 
&schema)?)))
                     .collect::<Result<Vec<_>>>()?;
 
                 Ok(Transformed::yes(LogicalPlan::Union(Union {
-                    inputs: inputs.into_iter().map(Arc::new).collect_vec(),
+                    inputs,
                     schema,
                 })))
             }
diff --git a/datafusion/optimizer/src/push_down_filter.rs 
b/datafusion/optimizer/src/push_down_filter.rs
index 36deb0f67d..a1a636cfef 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -45,7 +45,9 @@ use datafusion_expr::{
 
 use crate::optimizer::ApplyOrder;
 use crate::simplify_expressions::simplify_predicates;
-use crate::utils::{has_all_column_refs, is_restrict_null_predicate};
+use crate::utils::{
+    ColumnReference, has_all_column_refs, is_restrict_null_predicate, 
schema_columns,
+};
 use crate::{OptimizerConfig, OptimizerRule};
 use datafusion_expr::ExpressionPlacement;
 
@@ -190,11 +192,11 @@ struct ColumnChecker<'a> {
     /// schema of left join input
     left_schema: &'a DFSchema,
     /// columns in left_schema, computed on demand
-    left_columns: Option<HashSet<Column>>,
+    left_columns: Option<HashSet<ColumnReference<'a>>>,
     /// schema of right join input
     right_schema: &'a DFSchema,
     /// columns in left_schema, computed on demand
-    right_columns: Option<HashSet<Column>>,
+    right_columns: Option<HashSet<ColumnReference<'a>>>,
 }
 
 impl<'a> ColumnChecker<'a> {
@@ -224,20 +226,6 @@ impl<'a> ColumnChecker<'a> {
     }
 }
 
-/// Returns all columns in the schema
-fn schema_columns(schema: &DFSchema) -> HashSet<Column> {
-    schema
-        .iter()
-        .flat_map(|(qualifier, field)| {
-            [
-                Column::new(qualifier.cloned(), field.name()),
-                // we need to push down filter using unqualified column as well
-                Column::new_unqualified(field.name()),
-            ]
-        })
-        .collect::<HashSet<_>>()
-}
-
 /// Determine whether the predicate can evaluate as the join conditions
 fn can_evaluate_as_join_condition(predicate: &Expr) -> Result<bool> {
     let mut is_evaluate = true;
@@ -320,10 +308,8 @@ fn can_evaluate_as_join_condition(predicate: &Expr) -> 
Result<bool> {
 /// * do nothing.
 fn extract_or_clauses_for_join<'a>(
     filters: &'a [Expr],
-    schema: &'a DFSchema,
+    schema_cols: &'a HashSet<ColumnReference>,
 ) -> impl Iterator<Item = Expr> + 'a {
-    let schema_columns = schema_columns(schema);
-
     // new formed OR clauses and their column references
     filters.iter().filter_map(move |expr| {
         if let Expr::BinaryExpr(BinaryExpr {
@@ -332,8 +318,8 @@ fn extract_or_clauses_for_join<'a>(
             right,
         }) = expr
         {
-            let left_expr = extract_or_clause(left.as_ref(), &schema_columns);
-            let right_expr = extract_or_clause(right.as_ref(), 
&schema_columns);
+            let left_expr = extract_or_clause(left.as_ref(), schema_cols);
+            let right_expr = extract_or_clause(right.as_ref(), schema_cols);
 
             // If nothing can be extracted from any sub clauses, do nothing 
for this OR clause.
             if let (Some(left_expr), Some(right_expr)) = (left_expr, 
right_expr) {
@@ -355,7 +341,10 @@ fn extract_or_clauses_for_join<'a>(
 /// Otherwise, return None.
 ///
 /// For other clause, apply the rule above to extract clause.
-fn extract_or_clause(expr: &Expr, schema_columns: &HashSet<Column>) -> 
Option<Expr> {
+fn extract_or_clause(
+    expr: &Expr,
+    schema_columns: &HashSet<ColumnReference>,
+) -> Option<Expr> {
     let mut predicate = None;
 
     match expr {
@@ -421,6 +410,10 @@ fn push_down_all_join(
     // 3) should be kept as filter conditions
     let left_schema = join.left.schema();
     let right_schema = join.right.schema();
+
+    let left_schema_columns = schema_columns(left_schema.as_ref());
+    let right_schema_columns = schema_columns(right_schema.as_ref());
+
     let mut left_push = vec![];
     let mut right_push = vec![];
     let mut keep_predicates = vec![];
@@ -467,12 +460,24 @@ fn push_down_all_join(
     // Extract from OR clause, generate new predicates for both side of join 
if possible.
     // We only track the unpushable predicates above.
     if left_preserved {
-        left_push.extend(extract_or_clauses_for_join(&keep_predicates, 
left_schema));
-        left_push.extend(extract_or_clauses_for_join(&join_conditions, 
left_schema));
+        left_push.extend(extract_or_clauses_for_join(
+            &keep_predicates,
+            &left_schema_columns,
+        ));
+        left_push.extend(extract_or_clauses_for_join(
+            &join_conditions,
+            &left_schema_columns,
+        ));
     }
     if right_preserved {
-        right_push.extend(extract_or_clauses_for_join(&keep_predicates, 
right_schema));
-        right_push.extend(extract_or_clauses_for_join(&join_conditions, 
right_schema));
+        right_push.extend(extract_or_clauses_for_join(
+            &keep_predicates,
+            &right_schema_columns,
+        ));
+        right_push.extend(extract_or_clauses_for_join(
+            &join_conditions,
+            &right_schema_columns,
+        ));
     }
 
     // For predicates from join filter, we should check with if a join side is 
preserved
@@ -480,13 +485,13 @@ fn push_down_all_join(
     if on_left_preserved {
         left_push.extend(extract_or_clauses_for_join(
             &on_filter_join_conditions,
-            left_schema,
+            &left_schema_columns,
         ));
     }
     if on_right_preserved {
         right_push.extend(extract_or_clauses_for_join(
             &on_filter_join_conditions,
-            right_schema,
+            &right_schema_columns,
         ));
     }
 
diff --git a/datafusion/optimizer/src/push_down_limit.rs 
b/datafusion/optimizer/src/push_down_limit.rs
index 755e192e34..4a26cd5884 100644
--- a/datafusion/optimizer/src/push_down_limit.rs
+++ b/datafusion/optimizer/src/push_down_limit.rs
@@ -47,12 +47,12 @@ impl OptimizerRule for PushDownLimit {
         true
     }
 
+    #[expect(clippy::only_used_in_recursion)]
     fn rewrite(
         &self,
         plan: LogicalPlan,
         config: &dyn OptimizerConfig,
     ) -> Result<Transformed<LogicalPlan>> {
-        let _ = config.options();
         let LogicalPlan::Limit(mut limit) = plan else {
             return Ok(Transformed::no(plan));
         };
diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs 
b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
index 4778f75e3a..e4455b8c82 100644
--- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
+++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
@@ -26,6 +26,7 @@ use std::borrow::Cow;
 use std::collections::HashSet;
 use std::ops::Not;
 use std::sync::Arc;
+use std::sync::LazyLock;
 
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::nested_struct::has_one_of_more_common_fields;
@@ -498,8 +499,6 @@ struct ConstEvaluator {
     /// The `config_options` are passed from the session to allow scalar 
functions
     /// to access configuration like timezone.
     execution_props: ExecutionProps,
-    input_schema: DFSchema,
-    input_batch: RecordBatch,
 }
 
 /// The simplify result of ConstEvaluator
@@ -575,6 +574,18 @@ impl TreeNodeRewriter for ConstEvaluator {
     }
 }
 
+static DUMMY_SCHEMA: LazyLock<Arc<Schema>> =
+    LazyLock::new(|| Arc::new(Schema::new(vec![Field::new(".", DataType::Null, 
true)])));
+
+static DUMMY_DF_SCHEMA: LazyLock<DFSchema> =
+    LazyLock::new(|| DFSchema::try_from(Arc::clone(&*DUMMY_SCHEMA)).unwrap());
+
+static DUMMY_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
+    // Need a single "input" row to produce a single output row
+    let col = new_null_array(&DataType::Null, 1);
+    RecordBatch::try_new(DUMMY_SCHEMA.clone(), vec![col]).unwrap()
+});
+
 impl ConstEvaluator {
     /// Create a new `ConstantEvaluator`.
     ///
@@ -588,16 +599,6 @@ impl ConstEvaluator {
     pub fn try_new(config_options: Option<Arc<ConfigOptions>>) -> Result<Self> 
{
         // The dummy column name is unused and doesn't matter as only
         // expressions without column references can be evaluated
-        static DUMMY_COL_NAME: &str = ".";
-        let schema = Arc::new(Schema::new(vec![Field::new(
-            DUMMY_COL_NAME,
-            DataType::Null,
-            true,
-        )]));
-        let input_schema = DFSchema::try_from(Arc::clone(&schema))?;
-        // Need a single "input" row to produce a single output row
-        let col = new_null_array(&DataType::Null, 1);
-        let input_batch = RecordBatch::try_new(schema, vec![col])?;
 
         let mut execution_props = ExecutionProps::new();
         execution_props.config_options = config_options;
@@ -605,8 +606,6 @@ impl ConstEvaluator {
         Ok(Self {
             can_evaluate: vec![],
             execution_props,
-            input_schema,
-            input_batch,
         })
     }
 
@@ -702,16 +701,13 @@ impl ConstEvaluator {
             return ConstSimplifyResult::NotSimplified(s, m);
         }
 
-        let phys_expr = match create_physical_expr(
-            &expr,
-            &self.input_schema,
-            &self.execution_props,
-        ) {
-            Ok(e) => e,
-            Err(err) => return ConstSimplifyResult::SimplifyRuntimeError(err, 
expr),
-        };
+        let phys_expr =
+            match create_physical_expr(&expr, &DUMMY_DF_SCHEMA, 
&self.execution_props) {
+                Ok(e) => e,
+                Err(err) => return 
ConstSimplifyResult::SimplifyRuntimeError(err, expr),
+            };
         let metadata = phys_expr
-            .return_field(self.input_batch.schema_ref())
+            .return_field(DUMMY_BATCH.schema_ref())
             .ok()
             .and_then(|f| {
                 let m = f.metadata();
@@ -720,7 +716,7 @@ impl ConstEvaluator {
                     false => Some(FieldMetadata::from(m)),
                 }
             });
-        let col_val = match phys_expr.evaluate(&self.input_batch) {
+        let col_val = match phys_expr.evaluate(&DUMMY_BATCH) {
             Ok(v) => v,
             Err(err) => return ConstSimplifyResult::SimplifyRuntimeError(err, 
expr),
         };
@@ -1698,10 +1694,11 @@ impl TreeNodeRewriter for Simplifier<'_> {
                             {
                                 // Repeated occurrences of wildcard are 
redundant so remove them
                                 // exp LIKE '%%'  --> exp LIKE '%'
-                                let simplified_pattern = Regex::new("%%+")
-                                    .unwrap()
-                                    .replace_all(pattern_str, "%")
-                                    .to_string();
+
+                                static LIKE_REGEX: LazyLock<Regex> =
+                                    LazyLock::new(|| 
Regex::new("%%+").unwrap());
+                                let simplified_pattern =
+                                    LIKE_REGEX.replace_all(pattern_str, 
"%").to_string();
                                 Transformed::yes(Expr::Like(Like {
                                     pattern: Box::new(
                                         
string_scalar.to_expr(&simplified_pattern),
diff --git a/datafusion/optimizer/src/utils.rs 
b/datafusion/optimizer/src/utils.rs
index 7e038d2392..ad151d1ddb 100644
--- a/datafusion/optimizer/src/utils.rs
+++ b/datafusion/optimizer/src/utils.rs
@@ -22,6 +22,7 @@ use std::collections::{BTreeSet, HashMap, HashSet};
 use crate::analyzer::type_coercion::TypeCoercionRewriter;
 use arrow::array::{Array, RecordBatch, new_null_array};
 use arrow::datatypes::{DataType, Field, Schema};
+use datafusion_common::TableReference;
 use datafusion_common::cast::as_boolean_array;
 use datafusion_common::tree_node::{TransformedResult, TreeNode};
 use datafusion_common::{Column, DFSchema, Result, ScalarValue};
@@ -37,12 +38,17 @@ use std::sync::Arc;
 pub use datafusion_expr::expr_rewriter::NamePreserver;
 
 /// Returns true if `expr` contains all columns in `schema_cols`
-pub(crate) fn has_all_column_refs(expr: &Expr, schema_cols: &HashSet<Column>) 
-> bool {
+pub(crate) fn has_all_column_refs(
+    expr: &Expr,
+    schema_cols: &HashSet<ColumnReference>,
+) -> bool {
     let column_refs = expr.column_refs();
     // note can't use HashSet::intersect because of different types (owned vs 
References)
-    schema_cols
+    column_refs
         .iter()
-        .filter(|c| column_refs.contains(c))
+        .filter(|c| {
+            schema_cols.contains(&ColumnReference::new(c.relation.as_ref(), 
c.name()))
+        })
         .count()
         == column_refs.len()
 }
@@ -62,6 +68,40 @@ pub(crate) fn replace_qualified_name(
     replace_col(expr, &replace_map)
 }
 
+/// Column reference to avoid copying string around
+#[derive(PartialEq, Eq, Hash, Debug)]
+pub(crate) struct ColumnReference<'a> {
+    pub relation: Option<&'a TableReference>,
+    pub name: &'a str,
+}
+
+impl<'a> ColumnReference<'a> {
+    pub fn new(relation: Option<&'a TableReference>, name: &'a str) -> Self {
+        Self { relation, name }
+    }
+
+    pub fn new_unqualified(name: &'a str) -> Self {
+        Self {
+            relation: None,
+            name,
+        }
+    }
+}
+
+/// Returns references to all columns in the schema
+pub(crate) fn schema_columns<'a>(schema: &'a DFSchema) -> 
HashSet<ColumnReference<'a>> {
+    schema
+        .iter()
+        .flat_map(|(qualifier, field)| {
+            [
+                ColumnReference::new(qualifier, field.name()),
+                // we need to push down filter using unqualified column as well
+                ColumnReference::new_unqualified(field.name()),
+            ]
+        })
+        .collect::<HashSet<_>>()
+}
+
 /// Log the plan in debug/tracing mode after some part of the optimizer runs
 pub fn log_plan(description: &str, plan: &LogicalPlan) {
     debug!("{description}:\n{}\n", plan.display_indent());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to