This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 0ac434d0e5 Add case-heavy LEFT JOIN benchmark and debug timing/logging
for PushDownFilter hot paths (#20664)
0ac434d0e5 is described below
commit 0ac434d0e5207b1e2d6fcc9c04b5a4de3b13bec8
Author: kosiew <[email protected]>
AuthorDate: Sat Mar 7 12:10:15 2026 +0800
Add case-heavy LEFT JOIN benchmark and debug timing/logging for
PushDownFilter hot paths (#20664)
## Which issue does this PR close?
* Part of #20002.
## Rationale for this change
The `PushDownFilter` optimizer rule shows a severe planner-time
performance pathology in the `sql_planner_extended` benchmark, where
profiling indicates it dominates total planning CPU time and repeatedly
recomputes expression types.
This PR adds a deterministic, CASE-heavy LEFT JOIN benchmark to reliably
reproduce the worst-case behavior and introduces lightweight debug-only
timing + counters inside `push_down_filter` to make it easier to
pinpoint expensive sub-sections (e.g. predicate simplification and join
predicate inference) during profiling.
## What changes are included in this PR?
* **Benchmark: add a deterministic CASE-heavy LEFT JOIN workload**
* Adds `build_case_heavy_left_join_query` and helpers to construct a
CASE-nested predicate chain over a `LEFT JOIN`.
* Adds a new benchmark `logical_plan_optimize_hotspot_case_heavy_left_join`
to stress planning/optimization time.
* Adds an A/B benchmark group
`push_down_filter_hotspot_case_heavy_left_join_ab` (plus a non-CASE control
group, `push_down_filter_control_non_case_left_join_ab`, using `coalesce`
nesting) that sweeps predicate counts and CASE depth, comparing:
* default optimizer with `push_down_filter` enabled
* optimizer with `push_down_filter` removed
* **Optimizer instrumentation (debug-only)**
* Adds a small `with_debug_timing` helper gated by `log_enabled!(Debug)`
to record microsecond timings for specific sections.
* Instruments and logs:
* time spent in `infer_join_predicates`
* time spent in `simplify_predicates`
* counts of parent predicates, `on_filters`, inferred join predicates
* before/after predicate counts for simplification
## Are these changes tested?
* No new unit/integration tests were added because this PR is focused on
**benchmarking and debug-only instrumentation** rather than changing
optimizer semantics.
* Coverage is provided by:
* compiling/running the `sql_planner_extended` benchmark
* validating both benchmark variants (with/without `push_down_filter`)
produce optimized plans without errors
* enabling `RUST_LOG=debug` to confirm timing sections and counters emit
as expected
## Are there any user-facing changes?
* No user-facing behavior changes.
* The optimizer logic is unchanged; only **debug logging** is added
(emits only when `RUST_LOG` enables Debug for the relevant modules).
* Benchmark suite additions only affect developers running benches.
## LLM-generated code disclosure
This PR includes LLM-generated code and comments. All LLM-generated
content has been manually reviewed and tested.
---
datafusion/core/benches/sql_planner_extended.rs | 239 +++++++++++++++++++++++-
datafusion/optimizer/src/push_down_filter.rs | 43 ++++-
2 files changed, 276 insertions(+), 6 deletions(-)
diff --git a/datafusion/core/benches/sql_planner_extended.rs
b/datafusion/core/benches/sql_planner_extended.rs
index adaf3e5911..d4955313c7 100644
--- a/datafusion/core/benches/sql_planner_extended.rs
+++ b/datafusion/core/benches/sql_planner_extended.rs
@@ -18,7 +18,7 @@
use arrow::array::{ArrayRef, RecordBatch};
use arrow_schema::DataType;
use arrow_schema::TimeUnit::Nanosecond;
-use criterion::{Criterion, criterion_group, criterion_main};
+use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
use datafusion::prelude::{DataFrame, SessionContext};
use datafusion_catalog::MemTable;
use datafusion_common::ScalarValue;
@@ -27,6 +27,7 @@ use datafusion_expr::{cast, col, lit, not, try_cast, when};
use datafusion_functions::expr_fn::{
btrim, length, regexp_like, regexp_replace, to_timestamp, upper,
};
+use std::fmt::Write;
use std::hint::black_box;
use std::ops::Rem;
use std::sync::Arc;
@@ -212,14 +213,127 @@ fn build_test_data_frame(ctx: &SessionContext, rt:
&Runtime) -> DataFrame {
})
}
-fn criterion_benchmark(c: &mut Criterion) {
+/// Build a CASE-heavy dataframe over a non-inner join to stress
+/// planner-time filter pushdown and nullability/type inference.
+fn build_case_heavy_left_join_df(ctx: &SessionContext, rt: &Runtime) ->
DataFrame {
+ register_string_table(ctx, 100, 1000);
+ let query = build_case_heavy_left_join_query(30, 1);
+ rt.block_on(async { ctx.sql(&query).await.unwrap() })
+}
+
+fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize)
-> String {
+ let mut query = String::from(
+ "SELECT l.c0, r.c0 AS rc0 FROM t l LEFT JOIN t r ON l.c0 = r.c0 WHERE
",
+ );
+
+ if predicate_count == 0 {
+ query.push_str("TRUE");
+ return query;
+ }
+
+ // Keep this deterministic so comparisons between profiles are stable.
+ for i in 0..predicate_count {
+ if i > 0 {
+ query.push_str(" AND ");
+ }
+
+ let mut expr = format!("length(l.c{})", i % 20);
+ for depth in 0..case_depth {
+ let left_col = (i + depth + 1) % 20;
+ let right_col = (i + depth + 2) % 20;
+ expr = format!(
+ "CASE WHEN l.c{left_col} IS NOT NULL THEN {expr} ELSE
length(r.c{right_col}) END"
+ );
+ }
+
+ let _ = write!(&mut query, "{expr} > 2");
+ }
+
+ query
+}
+
+fn build_case_heavy_left_join_df_with_push_down_filter(
+ rt: &Runtime,
+ predicate_count: usize,
+ case_depth: usize,
+ push_down_filter_enabled: bool,
+) -> DataFrame {
+ let ctx = SessionContext::new();
+ register_string_table(&ctx, 100, 1000);
+ if !push_down_filter_enabled {
+ let removed = ctx.remove_optimizer_rule("push_down_filter");
+ assert!(
+ removed,
+ "push_down_filter rule should be present in the default optimizer"
+ );
+ }
+
+ let query = build_case_heavy_left_join_query(predicate_count, case_depth);
+ rt.block_on(async { ctx.sql(&query).await.unwrap() })
+}
+
+fn build_non_case_left_join_query(
+ predicate_count: usize,
+ nesting_depth: usize,
+) -> String {
+ let mut query = String::from(
+ "SELECT l.c0, r.c0 AS rc0 FROM t l LEFT JOIN t r ON l.c0 = r.c0 WHERE
",
+ );
+
+ if predicate_count == 0 {
+ query.push_str("TRUE");
+ return query;
+ }
+
+ // Keep this deterministic so comparisons between profiles are stable.
+ for i in 0..predicate_count {
+ if i > 0 {
+ query.push_str(" AND ");
+ }
+
+ let left_col = i % 20;
+ let mut expr = format!("l.c{left_col}");
+ for depth in 0..nesting_depth {
+ let right_col = (i + depth + 1) % 20;
+ expr = format!("coalesce({expr}, r.c{right_col})");
+ }
+
+ let _ = write!(&mut query, "length({expr}) > 2");
+ }
+
+ query
+}
+
+fn build_non_case_left_join_df_with_push_down_filter(
+ rt: &Runtime,
+ predicate_count: usize,
+ nesting_depth: usize,
+ push_down_filter_enabled: bool,
+) -> DataFrame {
let ctx = SessionContext::new();
+ register_string_table(&ctx, 100, 1000);
+ if !push_down_filter_enabled {
+ let removed = ctx.remove_optimizer_rule("push_down_filter");
+ assert!(
+ removed,
+ "push_down_filter rule should be present in the default optimizer"
+ );
+ }
+
+ let query = build_non_case_left_join_query(predicate_count, nesting_depth);
+ rt.block_on(async { ctx.sql(&query).await.unwrap() })
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+ let baseline_ctx = SessionContext::new();
+ let case_heavy_ctx = SessionContext::new();
let rt = Runtime::new().unwrap();
// validate logical plan optimize performance
// https://github.com/apache/datafusion/issues/17261
- let df = build_test_data_frame(&ctx, &rt);
+ let df = build_test_data_frame(&baseline_ctx, &rt);
+ let case_heavy_left_join_df =
build_case_heavy_left_join_df(&case_heavy_ctx, &rt);
c.bench_function("logical_plan_optimize", |b| {
b.iter(|| {
@@ -227,6 +341,125 @@ fn criterion_benchmark(c: &mut Criterion) {
black_box(rt.block_on(async {
df_clone.into_optimized_plan().unwrap() }));
})
});
+
+ c.bench_function("logical_plan_optimize_hotspot_case_heavy_left_join", |b|
{
+ b.iter(|| {
+ let df_clone = case_heavy_left_join_df.clone();
+ black_box(rt.block_on(async {
df_clone.into_optimized_plan().unwrap() }));
+ })
+ });
+
+ let predicate_sweep = [10, 20, 30, 40, 60];
+ let case_depth_sweep = [1, 2, 3];
+
+ let mut hotspot_group =
+ c.benchmark_group("push_down_filter_hotspot_case_heavy_left_join_ab");
+ for case_depth in case_depth_sweep {
+ for predicate_count in predicate_sweep {
+ let with_push_down_filter =
+ build_case_heavy_left_join_df_with_push_down_filter(
+ &rt,
+ predicate_count,
+ case_depth,
+ true,
+ );
+ let without_push_down_filter =
+ build_case_heavy_left_join_df_with_push_down_filter(
+ &rt,
+ predicate_count,
+ case_depth,
+ false,
+ );
+
+ let input_label =
+
format!("predicates={predicate_count},case_depth={case_depth}");
+ // A/B interpretation:
+ // - with_push_down_filter: default optimizer path (rule enabled)
+ // - without_push_down_filter: control path with the rule removed
+ // Compare both IDs at the same sweep point to isolate rule impact.
+ hotspot_group.bench_with_input(
+ BenchmarkId::new("with_push_down_filter", &input_label),
+ &with_push_down_filter,
+ |b, df| {
+ b.iter(|| {
+ let df_clone = df.clone();
+ black_box(
+ rt.block_on(async {
+ df_clone.into_optimized_plan().unwrap()
+ }),
+ );
+ })
+ },
+ );
+ hotspot_group.bench_with_input(
+ BenchmarkId::new("without_push_down_filter", &input_label),
+ &without_push_down_filter,
+ |b, df| {
+ b.iter(|| {
+ let df_clone = df.clone();
+ black_box(
+ rt.block_on(async {
+ df_clone.into_optimized_plan().unwrap()
+ }),
+ );
+ })
+ },
+ );
+ }
+ }
+ hotspot_group.finish();
+
+ let mut control_group =
+ c.benchmark_group("push_down_filter_control_non_case_left_join_ab");
+ for nesting_depth in case_depth_sweep {
+ for predicate_count in predicate_sweep {
+ let with_push_down_filter =
build_non_case_left_join_df_with_push_down_filter(
+ &rt,
+ predicate_count,
+ nesting_depth,
+ true,
+ );
+ let without_push_down_filter =
+ build_non_case_left_join_df_with_push_down_filter(
+ &rt,
+ predicate_count,
+ nesting_depth,
+ false,
+ );
+
+ let input_label =
+
format!("predicates={predicate_count},nesting_depth={nesting_depth}");
+ control_group.bench_with_input(
+ BenchmarkId::new("with_push_down_filter", &input_label),
+ &with_push_down_filter,
+ |b, df| {
+ b.iter(|| {
+ let df_clone = df.clone();
+ black_box(
+ rt.block_on(async {
+ df_clone.into_optimized_plan().unwrap()
+ }),
+ );
+ })
+ },
+ );
+ control_group.bench_with_input(
+ BenchmarkId::new("without_push_down_filter", &input_label),
+ &without_push_down_filter,
+ |b, df| {
+ b.iter(|| {
+ let df_clone = df.clone();
+ black_box(
+ rt.block_on(async {
+ df_clone.into_optimized_plan().unwrap()
+ }),
+ );
+ })
+ },
+ );
+ }
+ }
+ control_group.finish();
}
criterion_group!(benches, criterion_benchmark);
diff --git a/datafusion/optimizer/src/push_down_filter.rs
b/datafusion/optimizer/src/push_down_filter.rs
index f1664f267b..03a7a0b864 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -23,7 +23,9 @@ use std::sync::Arc;
use arrow::datatypes::DataType;
use indexmap::IndexSet;
use itertools::Itertools;
+use log::{Level, debug, log_enabled};
+use datafusion_common::instant::Instant;
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
};
@@ -525,8 +527,19 @@ fn push_down_join(
.map_or_else(Vec::new, |filter|
split_conjunction_owned(filter.clone()));
// Are there any new join predicates that can be inferred from the filter
expressions?
- let inferred_join_predicates =
- infer_join_predicates(&join, &predicates, &on_filters)?;
+ let inferred_join_predicates = with_debug_timing("infer_join_predicates",
|| {
+ infer_join_predicates(&join, &predicates, &on_filters)
+ })?;
+
+ if log_enabled!(Level::Debug) {
+ debug!(
+ "push_down_filter: join_type={:?}, parent_predicates={},
on_filters={}, inferred_join_predicates={}",
+ join.join_type,
+ predicates.len(),
+ on_filters.len(),
+ inferred_join_predicates.len()
+ );
+ }
if on_filters.is_empty()
&& predicates.is_empty()
@@ -765,7 +778,15 @@ impl OptimizerRule for PushDownFilter {
let predicate = split_conjunction_owned(filter.predicate.clone());
let old_predicate_len = predicate.len();
- let new_predicates = simplify_predicates(predicate)?;
+ let new_predicates =
+ with_debug_timing("simplify_predicates", ||
simplify_predicates(predicate))?;
+ if log_enabled!(Level::Debug) {
+ debug!(
+ "push_down_filter: simplify_predicates old_count={},
new_count={}",
+ old_predicate_len,
+ new_predicates.len()
+ );
+ }
if old_predicate_len != new_predicates.len() {
let Some(new_predicate) = conjunction(new_predicates) else {
// new_predicates is empty - remove the filter entirely
@@ -1377,6 +1398,22 @@ impl PushDownFilter {
}
}
+fn with_debug_timing<T, F>(label: &'static str, f: F) -> Result<T>
+where
+ F: FnOnce() -> Result<T>,
+{
+ if !log_enabled!(Level::Debug) {
+ return f();
+ }
+ let start = Instant::now();
+ let result = f();
+ debug!(
+ "push_down_filter_timing: section={label}, elapsed_us={}",
+ start.elapsed().as_micros()
+ );
+ result
+}
+
/// replaces columns by its name on the projection.
pub fn replace_cols_by_name(
e: Expr,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]