Liu created FLINK-39268:
---------------------------

             Summary: Support Common Subexpression Elimination (CSE) in Calc 
Code Generation
                 Key: FLINK-39268
                 URL: https://issues.apache.org/jira/browse/FLINK-39268
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
            Reporter: Liu


h2. Motivation

Common Subexpression Elimination (CSE) is a classic compiler optimization that 
avoids redundant evaluation of identical subexpressions by computing each 
unique expression only once and reusing the result.

In Flink SQL, when a query references the same expression multiple times across 
projections and/or filter conditions, the Calc operator currently evaluates it 
redundantly. For example:
{code:sql}
SELECT
  expensive_udf(a, b) + 1 AS col1,
  expensive_udf(a, b) + 2 AS col2
FROM source_table
WHERE expensive_udf(a, b) > 0
{code}
Here {{expensive_udf(a, b)}} is evaluated up to three times per row (once in 
the filter and once in each projection), even though it is deterministic and 
its inputs are identical. With CSE, it would be evaluated only once.
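
As a rough illustration of the difference, the following hand-written Java 
mirrors what the Calc logic does for one row, with and without CSE. It is not 
actual generated code: {{expensiveUdf}}, {{evalNaive}} and {{evalWithCse}} are 
hypothetical names, and the function body is an arbitrary deterministic 
stand-in.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

final class CseBeforeAfter {
    // Counts invocations so the two evaluation strategies can be compared.
    static final AtomicInteger CALLS = new AtomicInteger();

    // Hypothetical stand-in for expensive_udf(a, b); deterministic by construction.
    static long expensiveUdf(long a, long b) {
        CALLS.incrementAndGet();
        return a * 31 + b;
    }

    // Without CSE: the filter and each projection call the function separately.
    static long[] evalNaive(long a, long b) {
        if (!(expensiveUdf(a, b) > 0)) {
            return null; // row is filtered out
        }
        return new long[] {expensiveUdf(a, b) + 1, expensiveUdf(a, b) + 2};
    }

    // With CSE: evaluate once, then reuse the local result everywhere.
    static long[] evalWithCse(long a, long b) {
        long shared = expensiveUdf(a, b);
        if (!(shared > 0)) {
            return null; // row is filtered out
        }
        return new long[] {shared + 1, shared + 2};
    }

    public static void main(String[] args) {
        CALLS.set(0);
        long[] naive = evalNaive(1, 2);
        int naiveCalls = CALLS.get();

        CALLS.set(0);
        long[] cse = evalWithCse(1, 2);
        int cseCalls = CALLS.get();

        System.out.println("naive calls = " + naiveCalls + ", CSE calls = " + cseCalls);
        System.out.println("same output: " + Arrays.equals(naive, cse));
    }
}
```

Both variants produce the same row; only the number of function calls differs.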

This optimization is widely adopted in other query engines (e.g., Spark's 
{{spark.sql.subexpressionElimination.enabled}}).

h2. Proposed Approach

Perform CSE at the codegen level within {{CalcCodeGenerator}}, controlled 
by a new configuration option (disabled by default):
{code:java}
table.exec.sql.codegen.subexpression-elimination.enabled = false
{code}
The implementation introduces a {{ReusableSubExprs}} class that builds a *scope 
tree* from the expression tree to identify safely reusable subexpressions, 
respecting SQL's short-circuit evaluation semantics:
 - *AND/OR*: The leftmost operand is pulled up to the parent scope (always 
evaluated); subsequent operands form child scopes (may be short-circuited).
 - *CASE/IF/COALESCE*: The condition is pulled up; result branches form 
independent child scopes.
 - *Other operators*: All operands are pulled up to the parent scope.
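
The scope rules above can be sketched in a few lines of Java. This is a 
simplified model, not the proposed {{ReusableSubExprs}} class: {{Expr}}, 
{{Scope}} and {{collect}} are hypothetical names, expressions are identified by 
their printed form, and for CASE/IF/COALESCE only operand 0 is treated as 
always evaluated (an assumption made to keep the example short).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ScopeTreeSketch {

    /** Hypothetical expression node: an operator name plus operands (leaves have none). */
    static final class Expr {
        final String op;
        final List<Expr> operands;

        Expr(String op, Expr... operands) {
            this.op = op;
            this.operands = Arrays.asList(operands);
        }

        @Override
        public String toString() {
            if (operands.isEmpty()) {
                return op;
            }
            StringBuilder sb = new StringBuilder(op).append('(');
            for (int i = 0; i < operands.size(); i++) {
                if (i > 0) {
                    sb.append(", ");
                }
                sb.append(operands.get(i));
            }
            return sb.append(')').toString();
        }
    }

    /** A scope holds the calls that are evaluated whenever the scope itself is entered. */
    static final class Scope {
        final Scope parent;
        final List<String> exprs = new ArrayList<>();
        final List<Scope> children = new ArrayList<>();

        Scope(Scope parent) {
            this.parent = parent;
            if (parent != null) {
                parent.children.add(this);
            }
        }
    }

    /** Registers the call {@code e} in {@code scope} and distributes its operands. */
    static void collect(Expr e, Scope scope) {
        if (e.operands.isEmpty()) {
            return; // input references and literals are not tracked
        }
        scope.exprs.add(e.toString());
        boolean guarded = Arrays.asList("AND", "OR", "CASE", "IF", "COALESCE").contains(e.op);
        for (int i = 0; i < e.operands.size(); i++) {
            Expr operand = e.operands.get(i);
            if (operand.operands.isEmpty()) {
                continue;
            }
            // Operand 0 of a guarded operator, and every operand of any other
            // operator, is always evaluated and therefore stays in this scope.
            boolean alwaysEvaluated = !guarded || i == 0;
            collect(operand, alwaysEvaluated ? scope : new Scope(scope));
        }
    }

    /** Builds the scope tree for: udf(a, b) > 0 AND udf(a, b) < 10. */
    static Scope demo() {
        Expr udf = new Expr("udf", new Expr("a"), new Expr("b"));
        Expr cond = new Expr("AND",
                new Expr(">", udf, new Expr("0")),
                new Expr("<", udf, new Expr("10")));
        Scope root = new Scope(null);
        collect(cond, root);
        return root;
    }

    public static void main(String[] args) {
        Scope root = demo();
        System.out.println("root:  " + root.exprs);
        System.out.println("child: " + root.children.get(0).exprs);
    }
}
```

In this example the first {{udf(a, b)}} lands in the root scope and the second 
in a child scope, so the second occurrence has an equivalent expression in an 
ancestor scope.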

An expression is reusable if: (1) it is deterministic and non-dynamic, and (2) 
an equivalent expression (matched by {{RexNode.toString()}} + 
{{RelDataType.getFullTypeString()}}) already exists in the same or an 
ancestor scope.
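
A minimal sketch of this reusability check follows, assuming each scope keeps a 
set of keys for the expressions it has seen. The names {{Scope}}, {{key}} and 
{{checkAndRecord}} are hypothetical, and the digest and type strings are passed 
in as plain strings rather than taken from Calcite objects.

```java
import java.util.HashSet;
import java.util.Set;

final class ReuseLookupSketch {

    static final class Scope {
        final Scope parent;
        final Set<String> seen = new HashSet<>();

        Scope(Scope parent) {
            this.parent = parent;
        }
    }

    // Hypothetical key: the expression digest combined with its full type string.
    static String key(String digest, String fullType) {
        return digest + " :: " + fullType;
    }

    // Walks from the given scope up to the root looking for an equivalent expression.
    static boolean existsInScopeOrAncestor(Scope scope, String key) {
        for (Scope s = scope; s != null; s = s.parent) {
            if (s.seen.contains(key)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns true if this occurrence can reuse an earlier result; otherwise
     * records the occurrence in the current scope and returns false.
     */
    static boolean checkAndRecord(
            Scope scope, String digest, String fullType, boolean deterministicAndNonDynamic) {
        if (!deterministicAndNonDynamic) {
            return false; // condition (1) fails, never reuse
        }
        String k = key(digest, fullType);
        if (existsInScopeOrAncestor(scope, k)) {
            return true; // condition (2) holds
        }
        scope.seen.add(k);
        return false;
    }

    public static void main(String[] args) {
        Scope root = new Scope(null);
        Scope left = new Scope(root);
        Scope right = new Scope(root);

        // Seen only in a sibling scope: not reusable, the sibling may never run.
        System.out.println(checkAndRecord(left, "udf($0, $1)", "BIGINT NOT NULL", true));
        System.out.println(checkAndRecord(right, "udf($0, $1)", "BIGINT NOT NULL", true));

        // Seen in an ancestor scope: reusable from the child.
        System.out.println(checkAndRecord(root, "f($0)", "INTEGER", true));
        System.out.println(checkAndRecord(left, "f($0)", "INTEGER", true));
    }
}
```

Restricting the lookup to the same or an ancestor scope is what keeps a result 
computed in one short-circuited branch from being referenced in a sibling 
branch where it may never have been computed.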

During codegen, {{ExprCodeGenerator.visitCall()}} checks the reuse map before 
generating code. If a match exists, it returns a {{GeneratedExpression}} 
referencing the existing result variable (with empty code). Otherwise, if the 
expression is marked reusable, it registers the generated result for subsequent 
references.
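
The reuse-map lookup described above could look roughly like the following. 
This is a standalone sketch, not Flink's {{ExprCodeGenerator}}: the class 
{{ReuseMapCodegenSketch}}, the simplified {{GeneratedExpr}} holder, the 
{{visitCall}} signature and the {{result$N}} naming are all assumptions made 
for illustration, and every result is typed as {{long}} for brevity.

```java
import java.util.HashMap;
import java.util.Map;

final class ReuseMapCodegenSketch {

    /** Simplified stand-in for a generated expression: code to emit plus its result variable. */
    static final class GeneratedExpr {
        final String code;
        final String resultTerm;

        GeneratedExpr(String code, String resultTerm) {
            this.code = code;
            this.resultTerm = resultTerm;
        }
    }

    // Maps an expression key to the variable that already holds its result.
    private final Map<String, String> reuseMap = new HashMap<>();
    private int counter = 0;

    GeneratedExpr visitCall(String key, String javaExpr, boolean markedReusable) {
        String existing = reuseMap.get(key);
        if (existing != null) {
            // Reuse: emit no code, only reference the existing result variable.
            return new GeneratedExpr("", existing);
        }
        String term = "result$" + (counter++);
        String code = "long " + term + " = " + javaExpr + ";\n";
        if (markedReusable) {
            reuseMap.put(key, term); // make the result visible to later references
        }
        return new GeneratedExpr(code, term);
    }

    public static void main(String[] args) {
        ReuseMapCodegenSketch gen = new ReuseMapCodegenSketch();
        GeneratedExpr first = gen.visitCall("udf($0, $1)", "udf(a, b)", true);
        GeneratedExpr second = gen.visitCall("udf($0, $1)", "udf(a, b)", true);
        System.out.print(first.code);
        System.out.println("second reuses " + second.resultTerm
                + ", emits code: " + !second.code.isEmpty());
    }
}
```

Returning empty code is only safe when the first occurrence's code has already 
been emitted in an enclosing block, which is the property the scope analysis is 
meant to guarantee.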

h3. Design Decisions
 * *Scope*: This initial implementation targets the Calc operator only 
(both batch and streaming). Extension to other operators (e.g., Join, 
Aggregate) can be addressed in follow-up work.
 * *UDF support*: CSE applies uniformly to built-in functions and 
user-defined functions. Correctness relies on the 
{{FunctionDefinition.isDeterministic()}} contract, the same contract already 
used by Flink's existing constant folding optimization. UDFs with side effects 
must override {{isDeterministic()}} to return {{false}}.
 * *Compatibility*: The feature is off by default and introduces no API 
changes, so it is fully backward-compatible.



