[ 
https://issues.apache.org/jira/browse/FLINK-39986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

raoraoxiong updated FLINK-39986:
--------------------------------
    Description: 
h2. Motivation

FLINK-39268 introduced CSE (Common Sub-Expression Elimination) for Java/Scala 
UDFs by leveraging Calcite's built-in RexProgram normalization. However, this 
optimization {*}does not apply to Python UDFs{*}.

The root cause is that PythonCalcSplitRule breaks the original RexProgram 
structure during the plan rewriting phase — it splits a single Calc into 
multiple Calcs to separate Python and Java function execution. After this 
split, the structural sharing maintained by RexProgram (via RexLocalRef 
deduplication) is lost, and identical Python UDF calls end up being computed 
independently.

Due to the cross-process nature of Python UDF execution (Java ↔ Python via 
gRPC), redundant calls are significantly more expensive than Java UDF 
duplicates:
 # Duplicated serialization/deserialization and cross-process communication 
overhead
 # Redundant Python UDF computation in the Python worker

For example:

{code:sql}
SELECT python_udf(a, b) + 1, python_udf(a, b) + 2
FROM source_table
WHERE python_udf(a, b) > 0
{code}

Currently, python_udf(a, b) is evaluated {*}3 times{*} (once in the WHERE 
clause and twice in the SELECT list). With CSE it should be evaluated only 
{*}once{*}.
----
h2. Proposed Approach
h3. Phase 1: Intra-operator Projection-level CSE

Deduplicate identical Python UDF calls within the same PythonCalc operator:
 * Introduce PythonCallDeduplicator to identify structurally identical calls in 
the projection
 * Send only unique calls to the Python worker, using refIndex to indicate 
result reuse
 * Add an expansion projection to map deduplicated results back to the original 
output schema
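
The three steps above can be sketched as follows. This is only an illustration under stated assumptions, not the proposed PythonCallDeduplicator itself: the names CallDedupSketch, Deduplicated, deduplicate and expand are hypothetical, and plain strings stand in for whatever structural key the real implementation would derive from a call.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of projection-level deduplication; not Flink code. */
final class CallDedupSketch {

    /** Unique calls plus, for each original call, the slot of the unique call it reuses. */
    static final class Deduplicated {
        final List<String> uniqueCalls;
        final int[] refIndex;

        Deduplicated(List<String> uniqueCalls, int[] refIndex) {
            this.uniqueCalls = uniqueCalls;
            this.refIndex = refIndex;
        }
    }

    /** Assumption: two calls are structurally identical iff their string keys are equal. */
    static Deduplicated deduplicate(List<String> calls) {
        // LinkedHashMap keeps first-seen order, so slot numbers are stable.
        Map<String, Integer> slotByKey = new LinkedHashMap<>();
        int[] refIndex = new int[calls.size()];
        for (int i = 0; i < calls.size(); i++) {
            String key = calls.get(i);
            Integer slot = slotByKey.get(key);
            if (slot == null) {
                slot = slotByKey.size();
                slotByKey.put(key, slot);
            }
            refIndex[i] = slot;
        }
        return new Deduplicated(new ArrayList<>(slotByKey.keySet()), refIndex);
    }

    /** Expansion projection: maps results of the unique calls back to the original positions. */
    static <T> List<T> expand(List<T> uniqueResults, int[] refIndex) {
        List<T> expanded = new ArrayList<>(refIndex.length);
        for (int slot : refIndex) {
            expanded.add(uniqueResults.get(slot));
        }
        return expanded;
    }
}
```

In this sketch only uniqueCalls would be sent to the Python worker, and expand restores the original output arity afterwards.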

h3. Phase 2: Cross Condition/Projection CSE

When the same Python UDF appears in both WHERE and SELECT:
 * Modify RemoteCalcSplitConditionRule.split() to process both condition and 
projection through the ScalarFunctionSplitter (condition first, then projection)
 * Add deduplication logic to ScalarFunctionSplitter.getExtractedRexNode(): 
before extracting a new node, check if a structurally equal node was already 
extracted; if so, reuse its reference
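
The check-before-extract idea can be sketched as below. This is a simplified stand-in, not the actual ScalarFunctionSplitter: the class DedupSplitterSketch and its methods are hypothetical, strings stand in for structurally comparable expression nodes, and the layout in which extracted results are appended after the input fields is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of a splitter that reuses already-extracted calls; not Flink code. */
final class DedupSplitterSketch {
    private final int inputFieldCount;
    private final List<String> extracted = new ArrayList<>();
    private final Map<String, Integer> refByKey = new HashMap<>();

    DedupSplitterSketch(int inputFieldCount) {
        this.inputFieldCount = inputFieldCount;
    }

    /**
     * Returns the field index that holds the result of the given call.
     * Assumption: extracted calls are appended after the input fields.
     */
    int extract(String callKey) {
        Integer existing = refByKey.get(callKey);
        if (existing != null) {
            // A structurally equal call was already extracted: reuse its reference.
            return existing;
        }
        int ref = inputFieldCount + extracted.size();
        extracted.add(callKey);
        refByKey.put(callKey, ref);
        return ref;
    }

    /** Calls that actually need to be evaluated, in extraction order. */
    List<String> extractedCalls() {
        return new ArrayList<>(extracted);
    }
}
```

Running the condition through the same splitter instance before the projection is what lets a call in WHERE and an identical call in SELECT resolve to one reference in this sketch.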

----
h2. Scope / Limitations
||Scenario||Supported||
|Same UDF repeated in projection|✅ Phase 1|
|Same UDF in WHERE and SELECT|✅ Phase 2|
|Nested UDF with shared sub-expression (e.g., udf2(udf1(x)) and udf1(x))|✅ Phase 1|
|Same UDF across different operators (e.g., Calc + LookupJoin)|❌ Out of scope|
|Python UDTF (Table Functions)|❌ Out of scope|


> Support Common Sub-Expression Elimination (CSE) for Python UDFs
> ---------------------------------------------------------------
>
>                 Key: FLINK-39986
>                 URL: https://issues.apache.org/jira/browse/FLINK-39986
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Python, Table SQL / Planner
>            Reporter: raoraoxiong
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)