This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.2 by this push:
     new 3eace3d6b823 [SPARK-56032][SQL][FOLLOWUP] Add conf to gate subexpression elimination in FilterExec codegen
3eace3d6b823 is described below

commit 3eace3d6b823f536f90c265055a4fe17494bd875
Author: Wenchen Fan <[email protected]>
AuthorDate: Wed Jun 3 01:27:29 2026 +0800

    [SPARK-56032][SQL][FOLLOWUP] Add conf to gate subexpression elimination in FilterExec codegen
    
    ### What changes were proposed in this pull request?
    
    Followup to https://github.com/apache/spark/pull/56209.
    
    This adds an internal conf `spark.sql.subexpressionElimination.filterExec.enabled` (default `true`) that gates subexpression elimination (CSE) in `FilterExec` whole-stage codegen specifically. When set to `false`, `FilterExec` falls back to the existing predicate codegen path (`generatePredicateCode`) that loads input columns lazily and short-circuits; subexpression elimination elsewhere (e.g. `ProjectExec`) is unaffected. The conf is checked alongside the existing `spark.sql.subexpre [...]
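    As a rough illustration, the gating condition can be modeled as a pure function. This is a hypothetical, Spark-free sketch: `FilterCseGate`, `Inputs` and `useCsePath` are illustrative names, not Spark APIs, and the four booleans are assumed to stand for the two confs plus the `otherPreds.nonEmpty` and common-subexpression checks that `FilterExec` performs in the diff below.

```scala
// Hypothetical, Spark-free model of the decision FilterExec codegen makes.
// None of these names exist in Spark; they only mirror the boolean checks.
object FilterCseGate {
  final case class Inputs(
      globalCseEnabled: Boolean,     // models spark.sql.subexpressionElimination.enabled
      filterExecCseEnabled: Boolean, // models spark.sql.subexpressionElimination.filterExec.enabled
      hasOtherPreds: Boolean,        // models otherPreds.nonEmpty
      hasCommonSubexprs: Boolean)    // models getCommonSubexpressions.nonEmpty

  /** true: eager-prologue CSE path; false: lazy, short-circuiting generatePredicateCode. */
  def useCsePath(in: Inputs): Boolean =
    in.globalCseEnabled && in.filterExecCseEnabled &&
      in.hasOtherPreds && in.hasCommonSubexprs
}
```

    Turning off only the new conf, e.g. `Inputs(true, false, true, true)`, yields `false` (the lazy path) while the global flag stays on for other operators.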
    
    ### Why are the changes needed?
    
    CSE in `FilterExec` codegen (SPARK-56032) can introduce a performance regression for some filters. To materialize eliminated subexpressions into shared variables, the CSE path emits an eager prologue (`inputVarsEvalCode`) that evaluates every input column referenced by the predicates at the top of the per-row loop. The non-CSE path instead loads each column lazily, right before the predicate that needs it, so a cheap, highly selective leading predicate can short-circuit and skip decod [...]
    
    The eager prologue defeats that short-circuiting. When a filter has expensive-to-decode columns behind a cheaper, selective predicate -- e.g. high-precision decimals, which allocate a `BigInteger`/`BigDecimal` per decode -- eagerly decoding those columns for every row, including rows the cheap predicate would reject, is pure waste. This showed up as a measurable regression on TPC-DS q28.
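    A toy model makes the cost difference concrete. The sketch below is hypothetical and Spark-free: it assumes a row with a cheap integer key and a decimal stored as a string, treats `BigDecimal` parsing as a stand-in for the per-decode allocation, and counts decodes under an eager prologue versus lazy, short-circuiting evaluation of `cheap && expensive`.

```scala
// Hypothetical model (not Spark code): count decodes of an "expensive" column
// under an eager prologue vs. lazy, short-circuiting predicate evaluation.
object DecodeCostModel {
  final case class Row(cheapKey: Int, rawDecimal: String)

  // Mutable counter so each strategy can report how many decodes it did.
  final class Counter { var decodes = 0 }

  private def decode(row: Row, c: Counter): BigDecimal = {
    c.decodes += 1 // stands in for allocating a BigInteger/BigDecimal per decode
    BigDecimal(row.rawDecimal)
  }

  /** Eager prologue: decode the referenced column first, then test predicates. */
  def eager(rows: Seq[Row], c: Counter): Seq[Row] =
    rows.filter { r =>
      val d = decode(r, c) // runs for every row, even ones the cheap test rejects
      r.cheapKey == 0 && d > BigDecimal(100)
    }

  /** Lazy path: decode only after the cheap predicate passes (&& short-circuits). */
  def lazyPath(rows: Seq[Row], c: Counter): Seq[Row] =
    rows.filter(r => r.cheapKey == 0 && decode(r, c) > BigDecimal(100))
}
```

    With 1,000 rows built as `Row(i % 100, s"${i * 3}.25")`, only 10 rows pass the cheap predicate: the eager strategy decodes all 1,000 decimals, the lazy one decodes just those 10, and both return the same 9 rows.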
    
    https://github.com/apache/spark/pull/56209 already removes the prologue when there is no common subexpression at all. But even when a common subexpression exists, the eager prologue can still regress if the savings from eliminating it don't outweigh the lost short-circuiting. The global `spark.sql.subexpressionElimination.enabled` flag is too coarse to address this -- turning it off also disables CSE for projections and other operators. This conf provides a targeted kill-switch to fal [...]
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. The conf is internal and defaults to `true`, preserving the current behavior.
    
    ### How was this patch tested?
    
    New unit test in `WholeStageCodegenSuite`: with a genuine common subexpression in the filter predicates, flipping `spark.sql.subexpressionElimination.filterExec.enabled` off (while leaving global subexpression elimination on) makes `FilterExec` fall back to the lazy non-CSE path -- the shared subexpression is re-evaluated per use, matching the code generated when CSE is globally disabled. Existing SPARK-56032 FilterExec CSE tests continue to exercise the default-on path.
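    The test compares static `addExact` counts in the generated code. A runtime analogue can be sketched in plain Scala, assuming interval values reduced to `Long` day counts (so the `a IS NOT NULL` conjunct is dropped) and using `Math.addExact` for `a + b`. `CseCountModel` and its methods are hypothetical names, not part of Spark.

```scala
// Hypothetical runtime analogue of the test (not Spark code): count how often
// the shared subexpression `a + b` is evaluated with and without CSE.
object CseCountModel {
  final class Counter { var adds = 0 }

  private def add(a: Long, b: Long, c: Counter): Long = {
    c.adds += 1
    Math.addExact(a, b) // overflow-checked addition, as counted by the test
  }

  /** CSE path: evaluate `a + b` once per row and reuse the result. */
  def withCse(rows: Seq[(Long, Long)], c: Counter): Seq[(Long, Long)] =
    rows.filter { case (a, b) =>
      val s = add(a, b, c)
      s > 3 && s < 15 && s != 10
    }

  /** Non-CSE path: re-evaluate `a + b` at each use; && still short-circuits. */
  def perUse(rows: Seq[(Long, Long)], c: Counter): Seq[(Long, Long)] =
    rows.filter { case (a, b) =>
      add(a, b, c) > 3 && add(a, b, c) < 15 && add(a, b, c) != 10
    }
}
```

    On the test's three rows (1+5, 5+6 and 2+3 days) every row passes all three comparisons, so the CSE strategy performs 3 additions and the per-use strategy performs 9.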
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude (Claude Code)
    
    Closes #56271 from cloud-fan/SPARK-56032-filterExec-cse-conf.
    
    Authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 6d61d4699848a1846497843a4455e361aa79101b)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 16 ++++++++
 .../sql/execution/basicPhysicalOperators.scala     |  6 ++-
 .../sql/execution/WholeStageCodegenSuite.scala     | 47 ++++++++++++++++++++++
 3 files changed, 68 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 46456a912ede..b69e0428630c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1331,6 +1331,19 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val SUBEXPRESSION_ELIMINATION_FILTER_EXEC_ENABLED =
+    buildConf("spark.sql.subexpressionElimination.filterExec.enabled")
+      .internal()
+      .doc("When true (and subexpression elimination is enabled), FilterExec whole-stage " +
+        "codegen eliminates common subexpressions shared across its predicates. When false, " +
+        "FilterExec falls back to the predicate codegen that loads input columns lazily and " +
+        "short-circuits, avoiding eager materialization of all predicate-referenced columns on " +
+        "every row. Only affects FilterExec; subexpression elimination elsewhere is unaffected.")
+      .version("4.2.0")
+      .withBindingPolicy(ConfigBindingPolicy.SESSION)
+      .booleanConf
+      .createWithDefault(true)
+
   val CASE_SENSITIVE = buildConf(SqlApiConfHelper.CASE_SENSITIVE_KEY)
     .internal()
     .doc("Whether the query analyzer should be case sensitive or not. " +
@@ -7996,6 +8009,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
   def subexpressionEliminationSkipForShotcutExpr: Boolean =
     getConf(SUBEXPRESSION_ELIMINATION_SKIP_FOR_SHORTCUT_EXPR)
 
+  def subexpressionEliminationFilterExecEnabled: Boolean =
+    getConf(SUBEXPRESSION_ELIMINATION_FILTER_EXEC_ENABLED)
+
   def autoBroadcastJoinThreshold: Long = getConf(AUTO_BROADCASTJOIN_THRESHOLD)
 
   def limitInitialNumPartitions: Int = getConf(LIMIT_INITIAL_NUM_PARTITIONS)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 8d183f915e8a..9ed9c312a4be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -315,8 +315,12 @@ case class FilterExec(condition: Expression, child: SparkPlan)
     // predicate that needs them. With no common subexpression the prologue is pure overhead
     // (e.g. decoding a decimal column for rows a cheaper earlier predicate would reject), so we
     // fall back to `generatePredicateCode`.
+    //
+    // `subexpressionElimination.filterExec.enabled` additionally gates this path so it can be
+    // turned off independently of subexpression elimination elsewhere.
     val (prologueCode, predicateCode) =
-      if (conf.subexpressionEliminationEnabled && otherPreds.nonEmpty &&
+      if (conf.subexpressionEliminationEnabled && conf.subexpressionEliminationFilterExecEnabled &&
+          otherPreds.nonEmpty &&
           otherPredsEquivalentExpressions.getCommonSubexpressions.nonEmpty) {
         // Pre-evaluate input variables before CSE analysis: CSE clears
         // ctx.currentVars[i].code as a side effect; without this pre-evaluation, Janino
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 8f0ec0ffd6f1..a83d5c99bb5d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -1224,4 +1224,51 @@ class WholeStageCodegenSuite extends SharedSparkSession
       "With no common subexpression, CSE-enabled FilterExec codegen should be identical to " +
         "CSE-disabled codegen (i.e. fall back to the lazy, short-circuiting non-CSE path)")
   }
+
+  test("SPARK-56032: subexpressionElimination.filterExec.enabled gates FilterExec CSE " +
+    "independently of subexpression elimination") {
+    // The conf disables CSE specifically for FilterExec while leaving subexpression elimination
+    // enabled elsewhere. With a genuine common subexpression in the predicates, turning the conf
+    // off should make FilterExec fall back to the lazy non-CSE path (re-evaluating the shared
+    // subexpression per use), matching the code generated when CSE is globally disabled.
+    val schema = StructType(Seq(
+      StructField("a", DayTimeIntervalType(), nullable = true),
+      StructField("b", DayTimeIntervalType(), nullable = true)))
+    val data = spark.sparkContext.parallelize(Seq(
+      Row(Duration.ofDays(1), Duration.ofDays(5)),
+      Row(Duration.ofDays(5), Duration.ofDays(6)),
+      Row(Duration.ofDays(2), Duration.ofDays(3))))
+    val expected = data.collect().toSeq
+
+    // `a + b` appears three times in the predicate, so it is a CSE candidate. We count `addExact`
+    // occurrences in the generated code: the CSE path evaluates it once, the lazy path per use.
+    def filterCode(filterExecCseEnabled: Boolean): String = {
+      withSQLConf(
+        // Subexpression elimination stays globally on; only the FilterExec gate flips.
+        SQLConf.SUBEXPRESSION_ELIMINATION_ENABLED.key -> "true",
+        SQLConf.SUBEXPRESSION_ELIMINATION_FILTER_EXEC_ENABLED.key ->
+          filterExecCseEnabled.toString,
+        SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+        val df = spark.createDataFrame(data, schema)
+        val filtered = df.where(
+          "a IS NOT NULL AND (a + b) > INTERVAL '3' DAY " +
+            "AND (a + b) < INTERVAL '15' DAY AND (a + b) != INTERVAL '10' DAY")
+        val plan = filtered.queryExecution.executedPlan
+        assert(plan.exists(_.isInstanceOf[WholeStageCodegenExec]),
+          "Filter should be in whole-stage codegen")
+        checkAnswer(filtered, expected)
+        codegenString(plan)
+      }
+    }
+
+    val addExactPattern = "addExact".r
+    val enabledCount = addExactPattern.findAllIn(filterCode(filterExecCseEnabled = true)).length
+    val disabledCount = addExactPattern.findAllIn(filterCode(filterExecCseEnabled = false)).length
+    // With the gate on, CSE collapses the repeated `a + b` evaluations; with the gate off,
+    // FilterExec falls back to the lazy path that re-evaluates per use.
+    assert(enabledCount < disabledCount,
+      s"subexpressionElimination.filterExec.enabled should reduce repeated evaluation: " +
+        s"addExact appears $enabledCount times when enabled vs $disabledCount times when disabled")
+  }
 }

