This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new fee90be613 Misc minor optimizations to query optimizer performance
(#21128)
fee90be613 is described below
commit fee90be613446d2c61c3bd129a201c7faecef521
Author: Adam Gutglick <[email protected]>
AuthorDate: Tue Mar 31 19:34:44 2026 +0100
Misc minor optimizations to query optimizer performance (#21128)
## Which issue does this PR close?
- Closes #.
## Rationale for this change
Inspired by @blaginin, trying to find more places that might drag down the
optimizer's performance. On my laptop, this improves many of the SQL
planner's benchmarks by a fairly consistent 2-5%.
## What changes are included in this PR?
A slew of minor optimizations in the logical planner, trying to avoid
wasted work and repeated allocations.
## Are these changes tested?
Existing tests.
## Are there any user-facing changes?
None
---------
Signed-off-by: Adam Gutglick <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/optimizer/src/analyzer/type_coercion.rs | 6 +--
.../optimizer/src/common_subexpr_eliminate.rs | 6 +--
.../optimizer/src/extract_leaf_expressions.rs | 46 +++++++---------
datafusion/optimizer/src/optimize_unions.rs | 4 +-
datafusion/optimizer/src/push_down_filter.rs | 63 ++++++++++++----------
datafusion/optimizer/src/push_down_limit.rs | 2 +-
.../src/simplify_expressions/expr_simplifier.rs | 53 +++++++++---------
datafusion/optimizer/src/utils.rs | 46 ++++++++++++++--
8 files changed, 127 insertions(+), 99 deletions(-)
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs
b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 6f6898e219..bf91595e95 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -20,7 +20,7 @@
use arrow::compute::can_cast_types;
use datafusion_expr::binary::BinaryTypeCoercer;
use itertools::{Itertools as _, izip};
-use std::sync::Arc;
+use std::sync::{Arc, LazyLock};
use crate::analyzer::AnalyzerRule;
use crate::utils::NamePreserver;
@@ -91,11 +91,11 @@ impl AnalyzerRule for TypeCoercion {
}
fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) ->
Result<LogicalPlan> {
- let empty_schema = DFSchema::empty();
+ static EMPTY_SCHEMA: LazyLock<DFSchema> =
LazyLock::new(DFSchema::empty);
// recurse
let transformed_plan = plan
- .transform_up_with_subqueries(|plan|
analyze_internal(&empty_schema, plan))?
+ .transform_up_with_subqueries(|plan|
analyze_internal(&EMPTY_SCHEMA, plan))?
.data;
// finish
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs
b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index 88dba57d75..4213c23ccc 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -325,11 +325,7 @@ impl CommonSubexprEliminate {
.map(|expr| Some(name_preserver.save(expr)))
.collect::<Vec<_>>()
} else {
- new_aggr_expr
- .clone()
- .into_iter()
- .map(|_| None)
- .collect::<Vec<_>>()
+ (0..new_aggr_expr.len()).map(|_| None).collect()
};
let mut agg_exprs = common_exprs
diff --git a/datafusion/optimizer/src/extract_leaf_expressions.rs
b/datafusion/optimizer/src/extract_leaf_expressions.rs
index a57f71ea67..d3a84c0172 100644
--- a/datafusion/optimizer/src/extract_leaf_expressions.rs
+++ b/datafusion/optimizer/src/extract_leaf_expressions.rs
@@ -32,7 +32,7 @@ use datafusion_expr::{Expr, ExpressionPlacement, Projection};
use crate::optimizer::ApplyOrder;
use crate::push_down_filter::replace_cols_by_name;
-use crate::utils::has_all_column_refs;
+use crate::utils::{ColumnReference, has_all_column_refs, schema_columns};
use crate::{OptimizerConfig, OptimizerRule};
/// Prefix for aliases generated by the extraction optimizer passes.
@@ -213,10 +213,11 @@ fn extract_from_plan(
.collect();
// Build per-input column sets for routing expressions to the correct input
- let input_column_sets: Vec<std::collections::HashSet<Column>> =
input_schemas
- .iter()
- .map(|schema| schema_columns(schema.as_ref()))
- .collect();
+ let input_column_sets: Vec<std::collections::HashSet<ColumnReference>> =
+ input_schemas
+ .iter()
+ .map(|schema| schema_columns(schema.as_ref()))
+ .collect();
// Transform expressions via map_expressions with routing
let transformed = plan.map_expressions(|expr| {
@@ -272,7 +273,7 @@ fn extract_from_plan(
/// in both sides of a join).
fn find_owning_input(
expr: &Expr,
- input_column_sets: &[std::collections::HashSet<Column>],
+ input_column_sets: &[std::collections::HashSet<ColumnReference>],
) -> Option<usize> {
let mut found = None;
for (idx, cols) in input_column_sets.iter().enumerate() {
@@ -292,7 +293,7 @@ fn find_owning_input(
fn routing_extract(
expr: Expr,
extractors: &mut [LeafExpressionExtractor],
- input_column_sets: &[std::collections::HashSet<Column>],
+ input_column_sets: &[std::collections::HashSet<ColumnReference>],
) -> Result<Transformed<Expr>> {
expr.transform_down(|e| {
// Skip expressions already aliased with extracted expression pattern
@@ -340,19 +341,6 @@ fn routing_extract(
})
}
-/// Returns all columns in the schema (both qualified and unqualified forms)
-fn schema_columns(schema: &DFSchema) -> std::collections::HashSet<Column> {
- schema
- .iter()
- .flat_map(|(qualifier, field)| {
- [
- Column::new(qualifier.cloned(), field.name()),
- Column::new_unqualified(field.name()),
- ]
- })
- .collect()
-}
-
/// Rewrites extraction pairs and column references from one qualifier
/// space to another.
///
@@ -1072,7 +1060,7 @@ fn route_to_inputs(
pairs: &[(Expr, String)],
columns: &IndexSet<Column>,
node: &LogicalPlan,
- input_column_sets: &[std::collections::HashSet<Column>],
+ input_column_sets: &[std::collections::HashSet<ColumnReference>],
input_schemas: &[Arc<DFSchema>],
) -> Result<Option<Vec<ExtractionTarget>>> {
let num_inputs = input_schemas.len();
@@ -1173,7 +1161,7 @@ fn try_push_into_inputs(
// Build per-input schemas and column sets for routing
let input_schemas: Vec<Arc<DFSchema>> =
inputs.iter().map(|i| Arc::clone(i.schema())).collect();
- let input_column_sets: Vec<std::collections::HashSet<Column>> =
+ let input_column_sets: Vec<std::collections::HashSet<ColumnReference>> =
input_schemas.iter().map(|s| schema_columns(s)).collect();
// Route pairs and columns to the appropriate inputs
@@ -2436,16 +2424,18 @@ mod tests {
// Simulate schema_columns output for two sides of a join where both
// have a "user" column — each set contains the qualified and
// unqualified form.
- let left_cols: HashSet<Column> = [
- Column::new(Some("test"), "user"),
- Column::new_unqualified("user"),
+ let relation = "test".into();
+ let left_cols: HashSet<ColumnReference> = [
+ ColumnReference::new(Some(&relation), "user"),
+ ColumnReference::new_unqualified("user"),
]
.into_iter()
.collect();
- let right_cols: HashSet<Column> = [
- Column::new(Some("right"), "user"),
- Column::new_unqualified("user"),
+ let relation = "right".into();
+ let right_cols: HashSet<ColumnReference> = [
+ ColumnReference::new(Some(&relation), "user"),
+ ColumnReference::new_unqualified("user"),
]
.into_iter()
.collect();
diff --git a/datafusion/optimizer/src/optimize_unions.rs
b/datafusion/optimizer/src/optimize_unions.rs
index 900757b9a0..80f8ebeef1 100644
--- a/datafusion/optimizer/src/optimize_unions.rs
+++ b/datafusion/optimizer/src/optimize_unions.rs
@@ -64,11 +64,11 @@ impl OptimizerRule for OptimizeUnions {
let inputs = inputs
.into_iter()
.flat_map(extract_plans_from_union)
- .map(|plan| coerce_plan_expr_for_schema(plan, &schema))
+ .map(|plan| Ok(Arc::new(coerce_plan_expr_for_schema(plan,
&schema)?)))
.collect::<Result<Vec<_>>>()?;
Ok(Transformed::yes(LogicalPlan::Union(Union {
- inputs: inputs.into_iter().map(Arc::new).collect_vec(),
+ inputs,
schema,
})))
}
diff --git a/datafusion/optimizer/src/push_down_filter.rs
b/datafusion/optimizer/src/push_down_filter.rs
index 36deb0f67d..a1a636cfef 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -45,7 +45,9 @@ use datafusion_expr::{
use crate::optimizer::ApplyOrder;
use crate::simplify_expressions::simplify_predicates;
-use crate::utils::{has_all_column_refs, is_restrict_null_predicate};
+use crate::utils::{
+ ColumnReference, has_all_column_refs, is_restrict_null_predicate,
schema_columns,
+};
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_expr::ExpressionPlacement;
@@ -190,11 +192,11 @@ struct ColumnChecker<'a> {
/// schema of left join input
left_schema: &'a DFSchema,
/// columns in left_schema, computed on demand
- left_columns: Option<HashSet<Column>>,
+ left_columns: Option<HashSet<ColumnReference<'a>>>,
/// schema of right join input
right_schema: &'a DFSchema,
/// columns in left_schema, computed on demand
- right_columns: Option<HashSet<Column>>,
+ right_columns: Option<HashSet<ColumnReference<'a>>>,
}
impl<'a> ColumnChecker<'a> {
@@ -224,20 +226,6 @@ impl<'a> ColumnChecker<'a> {
}
}
-/// Returns all columns in the schema
-fn schema_columns(schema: &DFSchema) -> HashSet<Column> {
- schema
- .iter()
- .flat_map(|(qualifier, field)| {
- [
- Column::new(qualifier.cloned(), field.name()),
- // we need to push down filter using unqualified column as well
- Column::new_unqualified(field.name()),
- ]
- })
- .collect::<HashSet<_>>()
-}
-
/// Determine whether the predicate can evaluate as the join conditions
fn can_evaluate_as_join_condition(predicate: &Expr) -> Result<bool> {
let mut is_evaluate = true;
@@ -320,10 +308,8 @@ fn can_evaluate_as_join_condition(predicate: &Expr) ->
Result<bool> {
/// * do nothing.
fn extract_or_clauses_for_join<'a>(
filters: &'a [Expr],
- schema: &'a DFSchema,
+ schema_cols: &'a HashSet<ColumnReference>,
) -> impl Iterator<Item = Expr> + 'a {
- let schema_columns = schema_columns(schema);
-
// new formed OR clauses and their column references
filters.iter().filter_map(move |expr| {
if let Expr::BinaryExpr(BinaryExpr {
@@ -332,8 +318,8 @@ fn extract_or_clauses_for_join<'a>(
right,
}) = expr
{
- let left_expr = extract_or_clause(left.as_ref(), &schema_columns);
- let right_expr = extract_or_clause(right.as_ref(),
&schema_columns);
+ let left_expr = extract_or_clause(left.as_ref(), schema_cols);
+ let right_expr = extract_or_clause(right.as_ref(), schema_cols);
// If nothing can be extracted from any sub clauses, do nothing
for this OR clause.
if let (Some(left_expr), Some(right_expr)) = (left_expr,
right_expr) {
@@ -355,7 +341,10 @@ fn extract_or_clauses_for_join<'a>(
/// Otherwise, return None.
///
/// For other clause, apply the rule above to extract clause.
-fn extract_or_clause(expr: &Expr, schema_columns: &HashSet<Column>) ->
Option<Expr> {
+fn extract_or_clause(
+ expr: &Expr,
+ schema_columns: &HashSet<ColumnReference>,
+) -> Option<Expr> {
let mut predicate = None;
match expr {
@@ -421,6 +410,10 @@ fn push_down_all_join(
// 3) should be kept as filter conditions
let left_schema = join.left.schema();
let right_schema = join.right.schema();
+
+ let left_schema_columns = schema_columns(left_schema.as_ref());
+ let right_schema_columns = schema_columns(right_schema.as_ref());
+
let mut left_push = vec![];
let mut right_push = vec![];
let mut keep_predicates = vec![];
@@ -467,12 +460,24 @@ fn push_down_all_join(
// Extract from OR clause, generate new predicates for both side of join
if possible.
// We only track the unpushable predicates above.
if left_preserved {
- left_push.extend(extract_or_clauses_for_join(&keep_predicates,
left_schema));
- left_push.extend(extract_or_clauses_for_join(&join_conditions,
left_schema));
+ left_push.extend(extract_or_clauses_for_join(
+ &keep_predicates,
+ &left_schema_columns,
+ ));
+ left_push.extend(extract_or_clauses_for_join(
+ &join_conditions,
+ &left_schema_columns,
+ ));
}
if right_preserved {
- right_push.extend(extract_or_clauses_for_join(&keep_predicates,
right_schema));
- right_push.extend(extract_or_clauses_for_join(&join_conditions,
right_schema));
+ right_push.extend(extract_or_clauses_for_join(
+ &keep_predicates,
+ &right_schema_columns,
+ ));
+ right_push.extend(extract_or_clauses_for_join(
+ &join_conditions,
+ &right_schema_columns,
+ ));
}
// For predicates from join filter, we should check with if a join side is
preserved
@@ -480,13 +485,13 @@ fn push_down_all_join(
if on_left_preserved {
left_push.extend(extract_or_clauses_for_join(
&on_filter_join_conditions,
- left_schema,
+ &left_schema_columns,
));
}
if on_right_preserved {
right_push.extend(extract_or_clauses_for_join(
&on_filter_join_conditions,
- right_schema,
+ &right_schema_columns,
));
}
diff --git a/datafusion/optimizer/src/push_down_limit.rs
b/datafusion/optimizer/src/push_down_limit.rs
index 755e192e34..4a26cd5884 100644
--- a/datafusion/optimizer/src/push_down_limit.rs
+++ b/datafusion/optimizer/src/push_down_limit.rs
@@ -47,12 +47,12 @@ impl OptimizerRule for PushDownLimit {
true
}
+ #[expect(clippy::only_used_in_recursion)]
fn rewrite(
&self,
plan: LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
- let _ = config.options();
let LogicalPlan::Limit(mut limit) = plan else {
return Ok(Transformed::no(plan));
};
diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
index 4778f75e3a..e4455b8c82 100644
--- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
+++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
@@ -26,6 +26,7 @@ use std::borrow::Cow;
use std::collections::HashSet;
use std::ops::Not;
use std::sync::Arc;
+use std::sync::LazyLock;
use datafusion_common::config::ConfigOptions;
use datafusion_common::nested_struct::has_one_of_more_common_fields;
@@ -498,8 +499,6 @@ struct ConstEvaluator {
/// The `config_options` are passed from the session to allow scalar
functions
/// to access configuration like timezone.
execution_props: ExecutionProps,
- input_schema: DFSchema,
- input_batch: RecordBatch,
}
/// The simplify result of ConstEvaluator
@@ -575,6 +574,18 @@ impl TreeNodeRewriter for ConstEvaluator {
}
}
+static DUMMY_SCHEMA: LazyLock<Arc<Schema>> =
+ LazyLock::new(|| Arc::new(Schema::new(vec![Field::new(".", DataType::Null,
true)])));
+
+static DUMMY_DF_SCHEMA: LazyLock<DFSchema> =
+ LazyLock::new(|| DFSchema::try_from(Arc::clone(&*DUMMY_SCHEMA)).unwrap());
+
+static DUMMY_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
+ // Need a single "input" row to produce a single output row
+ let col = new_null_array(&DataType::Null, 1);
+ RecordBatch::try_new(DUMMY_SCHEMA.clone(), vec![col]).unwrap()
+});
+
impl ConstEvaluator {
/// Create a new `ConstantEvaluator`.
///
@@ -588,16 +599,6 @@ impl ConstEvaluator {
pub fn try_new(config_options: Option<Arc<ConfigOptions>>) -> Result<Self>
{
// The dummy column name is unused and doesn't matter as only
// expressions without column references can be evaluated
- static DUMMY_COL_NAME: &str = ".";
- let schema = Arc::new(Schema::new(vec![Field::new(
- DUMMY_COL_NAME,
- DataType::Null,
- true,
- )]));
- let input_schema = DFSchema::try_from(Arc::clone(&schema))?;
- // Need a single "input" row to produce a single output row
- let col = new_null_array(&DataType::Null, 1);
- let input_batch = RecordBatch::try_new(schema, vec![col])?;
let mut execution_props = ExecutionProps::new();
execution_props.config_options = config_options;
@@ -605,8 +606,6 @@ impl ConstEvaluator {
Ok(Self {
can_evaluate: vec![],
execution_props,
- input_schema,
- input_batch,
})
}
@@ -702,16 +701,13 @@ impl ConstEvaluator {
return ConstSimplifyResult::NotSimplified(s, m);
}
- let phys_expr = match create_physical_expr(
- &expr,
- &self.input_schema,
- &self.execution_props,
- ) {
- Ok(e) => e,
- Err(err) => return ConstSimplifyResult::SimplifyRuntimeError(err,
expr),
- };
+ let phys_expr =
+ match create_physical_expr(&expr, &DUMMY_DF_SCHEMA,
&self.execution_props) {
+ Ok(e) => e,
+ Err(err) => return
ConstSimplifyResult::SimplifyRuntimeError(err, expr),
+ };
let metadata = phys_expr
- .return_field(self.input_batch.schema_ref())
+ .return_field(DUMMY_BATCH.schema_ref())
.ok()
.and_then(|f| {
let m = f.metadata();
@@ -720,7 +716,7 @@ impl ConstEvaluator {
false => Some(FieldMetadata::from(m)),
}
});
- let col_val = match phys_expr.evaluate(&self.input_batch) {
+ let col_val = match phys_expr.evaluate(&DUMMY_BATCH) {
Ok(v) => v,
Err(err) => return ConstSimplifyResult::SimplifyRuntimeError(err,
expr),
};
@@ -1698,10 +1694,11 @@ impl TreeNodeRewriter for Simplifier<'_> {
{
// Repeated occurrences of wildcard are
redundant so remove them
// exp LIKE '%%' --> exp LIKE '%'
- let simplified_pattern = Regex::new("%%+")
- .unwrap()
- .replace_all(pattern_str, "%")
- .to_string();
+
+ static LIKE_REGEX: LazyLock<Regex> =
+ LazyLock::new(||
Regex::new("%%+").unwrap());
+ let simplified_pattern =
+ LIKE_REGEX.replace_all(pattern_str,
"%").to_string();
Transformed::yes(Expr::Like(Like {
pattern: Box::new(
string_scalar.to_expr(&simplified_pattern),
diff --git a/datafusion/optimizer/src/utils.rs
b/datafusion/optimizer/src/utils.rs
index 7e038d2392..ad151d1ddb 100644
--- a/datafusion/optimizer/src/utils.rs
+++ b/datafusion/optimizer/src/utils.rs
@@ -22,6 +22,7 @@ use std::collections::{BTreeSet, HashMap, HashSet};
use crate::analyzer::type_coercion::TypeCoercionRewriter;
use arrow::array::{Array, RecordBatch, new_null_array};
use arrow::datatypes::{DataType, Field, Schema};
+use datafusion_common::TableReference;
use datafusion_common::cast::as_boolean_array;
use datafusion_common::tree_node::{TransformedResult, TreeNode};
use datafusion_common::{Column, DFSchema, Result, ScalarValue};
@@ -37,12 +38,17 @@ use std::sync::Arc;
pub use datafusion_expr::expr_rewriter::NamePreserver;
/// Returns true if `expr` contains all columns in `schema_cols`
-pub(crate) fn has_all_column_refs(expr: &Expr, schema_cols: &HashSet<Column>)
-> bool {
+pub(crate) fn has_all_column_refs(
+ expr: &Expr,
+ schema_cols: &HashSet<ColumnReference>,
+) -> bool {
let column_refs = expr.column_refs();
// note can't use HashSet::intersect because of different types (owned vs
References)
- schema_cols
+ column_refs
.iter()
- .filter(|c| column_refs.contains(c))
+ .filter(|c| {
+ schema_cols.contains(&ColumnReference::new(c.relation.as_ref(),
c.name()))
+ })
.count()
== column_refs.len()
}
@@ -62,6 +68,40 @@ pub(crate) fn replace_qualified_name(
replace_col(expr, &replace_map)
}
+/// Column reference to avoid copying string around
+#[derive(PartialEq, Eq, Hash, Debug)]
+pub(crate) struct ColumnReference<'a> {
+ pub relation: Option<&'a TableReference>,
+ pub name: &'a str,
+}
+
+impl<'a> ColumnReference<'a> {
+ pub fn new(relation: Option<&'a TableReference>, name: &'a str) -> Self {
+ Self { relation, name }
+ }
+
+ pub fn new_unqualified(name: &'a str) -> Self {
+ Self {
+ relation: None,
+ name,
+ }
+ }
+}
+
+/// Returns references to all columns in the schema
+pub(crate) fn schema_columns<'a>(schema: &'a DFSchema) ->
HashSet<ColumnReference<'a>> {
+ schema
+ .iter()
+ .flat_map(|(qualifier, field)| {
+ [
+ ColumnReference::new(qualifier, field.name()),
+ // we need to push down filter using unqualified column as well
+ ColumnReference::new_unqualified(field.name()),
+ ]
+ })
+ .collect::<HashSet<_>>()
+}
+
/// Log the plan in debug/tracing mode after some part of the optimizer runs
pub fn log_plan(description: &str, plan: &LogicalPlan) {
debug!("{description}:\n{}\n", plan.display_indent());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]