[ 
https://issues.apache.org/jira/browse/FLINK-39268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-39268:
------------------------------------
    Description: 
h2. Motivation

Common Subexpression Elimination (CSE) is a classic compiler optimization that 
avoids redundant evaluation of identical subexpressions by computing each 
unique expression only once and reusing the result.

In Flink SQL, when a query references the same expression multiple times across 
projections and/or filter conditions, the Calc operator currently evaluates it 
redundantly. For example:
{code:sql}
SELECT
  expensive_udf(a, b) + 1 AS col1,
  expensive_udf(a, b) + 2 AS col2
FROM source_table
WHERE expensive_udf(a, b) > 0
{code}
Here {{expensive_udf(a, b)}} is evaluated 3 times per row despite being 
deterministic with identical inputs. With CSE, it would be evaluated only once.

This optimization is widely adopted in other query engines (e.g., Spark's 
{{{}spark.sql.subexpressionElimination.enabled{}}}). 
h2. Proposed Approach

Perform CSE at the codegen level within {{{}CalcCodeGenerator{}}},:
The implementation introduces a {{ReusableSubExprs}} class that builds a *scope 
tree* from the expression tree to identify safely reusable subexpressions, 
respecting SQL's short-circuit evaluation semantics:
 - {*}AND/OR{*}: The leftmost operand is pulled up to the parent scope (always 
evaluated); subsequent operands form child scopes (may be short-circuited).
 - {*}CASE/IF/COALESCE{*}: The condition is pulled up; result branches form 
independent child scopes.
 - {*}Other operators{*}: All operands are pulled up to the parent scope.

An expression is reusable if: (1) it is deterministic and non-dynamic, and (2) 
an equivalent expression (matched by {{RexNode.toString()}} + 
{{{}RelDataType.getFullTypeString(){}}}) already exists in the same or an 
ancestor scope.

During codegen, {{ExprCodeGenerator.visitCall()}} checks the reuse map before 
generating code. If a match exists, it returns a {{GeneratedExpression}} 
referencing the existing result variable (with empty code). Otherwise, if the 
expression is marked reusable, it registers the generated result for subsequent 
references.
h3. Design Decisions
 * {*}Scope{*}: This initial implementation targets the Calc operator only 
(both batch and streaming). Extension to other operators (e.g., Join, 
Aggregate) can be addressed in follow-up work.
 * {*}UDF support{*}: CSE applies uniformly to built-in functions and 
user-defined functions. The correctness relies on the 
{{FunctionDefinition.isDeterministic()}} contract — the same contract already 
used by Flink's existing constant folding optimization. UDFs with side effects 
must override {{isDeterministic()}} to return false.


  was:
h2. Motivation

Common Subexpression Elimination (CSE) is a classic compiler optimization that 
avoids redundant evaluation of identical subexpressions by computing each 
unique expression only once and reusing the result.

In Flink SQL, when a query references the same expression multiple times across 
projections and/or filter conditions, the Calc operator currently evaluates it 
redundantly. For example:
{code:sql}
SELECT
  expensive_udf(a, b) + 1 AS col1,
  expensive_udf(a, b) + 2 AS col2
FROM source_table
WHERE expensive_udf(a, b) > 0
{code}
Here {{expensive_udf(a, b)}} is evaluated 3 times per row despite being 
deterministic with identical inputs. With CSE, it would be evaluated only once.

This optimization is widely adopted in other query engines (e.g., Spark's 
{{{}spark.sql.subexpressionElimination.enabled{}}}). 
h2. Proposed Approach

Perform CSE at the codegen level within {{{}CalcCodeGenerator{}}},:
{code:java}
table.exec.sql.codegen.subexpression-elimination.enabled = false
{code}
The implementation introduces a {{ReusableSubExprs}} class that builds a *scope 
tree* from the expression tree to identify safely reusable subexpressions, 
respecting SQL's short-circuit evaluation semantics:
 - {*}AND/OR{*}: The leftmost operand is pulled up to the parent scope (always 
evaluated); subsequent operands form child scopes (may be short-circuited).
 - {*}CASE/IF/COALESCE{*}: The condition is pulled up; result branches form 
independent child scopes.
 - {*}Other operators{*}: All operands are pulled up to the parent scope.

An expression is reusable if: (1) it is deterministic and non-dynamic, and (2) 
an equivalent expression (matched by {{RexNode.toString()}} + 
{{{}RelDataType.getFullTypeString(){}}}) already exists in the same or an 
ancestor scope.

During codegen, {{ExprCodeGenerator.visitCall()}} checks the reuse map before 
generating code. If a match exists, it returns a {{GeneratedExpression}} 
referencing the existing result variable (with empty code). Otherwise, if the 
expression is marked reusable, it registers the generated result for subsequent 
references.
h3. Design Decisions
 * {*}Scope{*}: This initial implementation targets the Calc operator only 
(both batch and streaming). Extension to other operators (e.g., Join, 
Aggregate) can be addressed in follow-up work.
 * {*}UDF support{*}: CSE applies uniformly to built-in functions and 
user-defined functions. The correctness relies on the 
{{FunctionDefinition.isDeterministic()}} contract — the same contract already 
used by Flink's existing constant folding optimization. UDFs with side effects 
must override {{isDeterministic()}} to return false.



> Support Common Subexpression Elimination (CSE) in Calc Code Generation
> ----------------------------------------------------------------------
>
>                 Key: FLINK-39268
>                 URL: https://issues.apache.org/jira/browse/FLINK-39268
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: Liu
>            Assignee: Sergey Nuyanzin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.4.0
>
>
> h2. Motivation
> Common Subexpression Elimination (CSE) is a classic compiler optimization 
> that avoids redundant evaluation of identical subexpressions by computing 
> each unique expression only once and reusing the result.
> In Flink SQL, when a query references the same expression multiple times 
> across projections and/or filter conditions, the Calc operator currently 
> evaluates it redundantly. For example:
> {code:sql}
> SELECT
>   expensive_udf(a, b) + 1 AS col1,
>   expensive_udf(a, b) + 2 AS col2
> FROM source_table
> WHERE expensive_udf(a, b) > 0
> {code}
> Here {{expensive_udf(a, b)}} is evaluated 3 times per row despite being 
> deterministic with identical inputs. With CSE, it would be evaluated only 
> once.
> This optimization is widely adopted in other query engines (e.g., Spark's 
> {{{}spark.sql.subexpressionElimination.enabled{}}}). 
> h2. Proposed Approach
> Perform CSE at the codegen level within {{{}CalcCodeGenerator{}}},:
> The implementation introduces a {{ReusableSubExprs}} class that builds a 
> *scope tree* from the expression tree to identify safely reusable 
> subexpressions, respecting SQL's short-circuit evaluation semantics:
>  - {*}AND/OR{*}: The leftmost operand is pulled up to the parent scope 
> (always evaluated); subsequent operands form child scopes (may be 
> short-circuited).
>  - {*}CASE/IF/COALESCE{*}: The condition is pulled up; result branches form 
> independent child scopes.
>  - {*}Other operators{*}: All operands are pulled up to the parent scope.
> An expression is reusable if: (1) it is deterministic and non-dynamic, and 
> (2) an equivalent expression (matched by {{RexNode.toString()}} + 
> {{{}RelDataType.getFullTypeString(){}}}) already exists in the same or an 
> ancestor scope.
> During codegen, {{ExprCodeGenerator.visitCall()}} checks the reuse map before 
> generating code. If a match exists, it returns a {{GeneratedExpression}} 
> referencing the existing result variable (with empty code). Otherwise, if the 
> expression is marked reusable, it registers the generated result for 
> subsequent references.
> h3. Design Decisions
>  * {*}Scope{*}: This initial implementation targets the Calc operator only 
> (both batch and streaming). Extension to other operators (e.g., Join, 
> Aggregate) can be addressed in follow-up work.
>  * {*}UDF support{*}: CSE applies uniformly to built-in functions and 
> user-defined functions. The correctness relies on the 
> {{FunctionDefinition.isDeterministic()}} contract — the same contract already 
> used by Flink's existing constant folding optimization. UDFs with side 
> effects must override {{isDeterministic()}} to return false.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to