This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ac8bec9e572 [FLINK-15973][python] Optimize the execution plan where
the same Python UDF is referred to in multiple places (#27203)
ac8bec9e572 is described below
commit ac8bec9e572c36e4031e84bc4237fa32ab3be761
Author: Dian Fu <[email protected]>
AuthorDate: Fri Nov 7 19:08:15 2025 +0800
[FLINK-15973][python] Optimize the execution plan where the same Python UDF
is referred to in multiple places (#27203)
---
.../plan/rules/logical/RemoteCalcSplitRule.scala | 17 +++++++--
.../plan/rules/logical/AsyncCalcSplitRuleTest.xml | 41 ++++++++++------------
.../plan/rules/logical/PythonCalcSplitRuleTest.xml | 19 ++++++++++
.../rules/logical/PythonCalcSplitRuleTest.scala | 6 ++++
4 files changed, 58 insertions(+), 25 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/RemoteCalcSplitRule.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/RemoteCalcSplitRule.scala
index 505428a0f74..ff60809cea6 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/RemoteCalcSplitRule.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/RemoteCalcSplitRule.scala
@@ -72,9 +72,22 @@ abstract class RemoteCalcSplitRuleBase[T](
callFinder)
val splitComponents = split(program, splitter)
+
+ val topCalcProjects = splitComponents.topCalcProjects.map {
+ node: RexNode =>
+ {
+ val idx = extractedRexNodes.indexOf(node)
+ if (idx >= 0) {
+ new RexInputRef(extractedFunctionOffset + idx, node.getType)
+ } else {
+ node
+ }
+ }
+ }
+
val accessedFields =
extractRefInputFields(
- splitComponents.topCalcProjects,
+ topCalcProjects,
splitComponents.topCalcCondition,
extractedFunctionOffset)
@@ -107,7 +120,7 @@ abstract class RemoteCalcSplitRuleBase[T](
bottomCalc,
RexProgram.create(
bottomCalc.getRowType,
- splitComponents.topCalcProjects.map(_.accept(inputRewriter)),
+ topCalcProjects.map(_.accept(inputRewriter)),
splitComponents.topCalcCondition.map(_.accept(inputRewriter)).orNull,
calc.getRowType,
rexBuilder
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.xml
index 01fd5538c04..fa4f17124ed 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.xml
@@ -104,11 +104,10 @@ LogicalProject(EXPR$0=[func1($0)], EXPR$1=[func2($0)],
EXPR$2=[func1($0)], EXPR$
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-AsyncCalc(select=[f0 AS EXPR$0, f00 AS EXPR$1, f01 AS EXPR$2, func2(a) AS
EXPR$3])
-+- AsyncCalc(select=[f0, f00, a, func1(a) AS f01])
- +- AsyncCalc(select=[f0, a, func2(a) AS f00])
- +- AsyncCalc(select=[a, func1(a) AS f0])
- +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d])
+Calc(select=[f0 AS EXPR$0, f00 AS EXPR$1, f0 AS EXPR$2, f00 AS EXPR$3])
++- AsyncCalc(select=[f0, func2(a) AS f00])
+ +- AsyncCalc(select=[a, func1(a) AS f0])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d])
]]>
</Resource>
</TestCase>
@@ -132,10 +131,9 @@ LogicalProject(a=[$0])
Calc(select=[a])
+- Join(joinType=[InnerJoin], where=[=($f4, $f40)], select=[a, $f4, $f40],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[$f4]])
- : +- AsyncCalc(select=[a, func2(a) AS $f4])
- : +- Calc(select=[a], where=[REGEXP(f0, 'string (2|4)')])
- : +- AsyncCalc(select=[a, func2(a) AS f0])
- : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d])
+ : +- Calc(select=[a, f0 AS $f4], where=[REGEXP(f0, 'string (2|4)')])
+ : +- AsyncCalc(select=[a, func2(a) AS f0])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d])
+- Exchange(distribution=[hash[$f4]])
+- AsyncCalc(select=[func2(a2) AS $f4])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable2]], fields=[a2, b2, c2, d2])
@@ -513,10 +511,9 @@ LogicalProject(blah=[$0])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-AsyncCalc(select=[func2(a) AS blah])
-+- Calc(select=[a], where=[REGEXP(f0, 'string (2|3)')])
- +- AsyncCalc(select=[a, func2(a) AS f0])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d])
+Calc(select=[f0 AS blah], where=[REGEXP(f0, 'string (2|3)')])
++- AsyncCalc(select=[func2(a) AS f0])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d])
]]>
</Resource>
</TestCase>
@@ -704,10 +701,9 @@ LogicalProject(EXPR$0=[func1(func1($0))],
EXPR$1=[func1(func1(func1($0)))], EXPR
AsyncCalc(select=[f0 AS EXPR$0, func1(f1) AS EXPR$1, f2 AS EXPR$2])
+- AsyncCalc(select=[f2, f0, func1(f1) AS f1])
+- AsyncCalc(select=[f2, f1, func1(f0) AS f0])
- +- AsyncCalc(select=[f0, f00 AS f1, func1(a) AS f2])
- +- AsyncCalc(select=[f0, a, func1(a) AS f00])
- +- AsyncCalc(select=[a, func1(a) AS f0])
- +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d])
+ +- Calc(select=[f0, f0 AS f1, f0 AS f2])
+ +- AsyncCalc(select=[func1(a) AS f0])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d])
]]>
</Resource>
</TestCase>
@@ -723,8 +719,8 @@ LogicalProject(EXPR$0=[func1($0)], EXPR$1=[func1($0)])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-AsyncCalc(select=[f0 AS EXPR$0, func1(a) AS EXPR$1])
-+- AsyncCalc(select=[a, func1(a) AS f0])
+Calc(select=[f0 AS EXPR$0, f0 AS EXPR$1])
++- AsyncCalc(select=[func1(a) AS f0])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d])
]]>
</Resource>
@@ -761,10 +757,9 @@ LogicalProject(EXPR$0=[func2($0)])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-AsyncCalc(select=[func2(a) AS EXPR$0])
-+- Calc(select=[a], where=[REGEXP(f0, 'val (2|3)')])
- +- AsyncCalc(select=[a, func2(a) AS f0])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d])
+Calc(select=[f0 AS EXPR$0], where=[REGEXP(f0, 'val (2|3)')])
++- AsyncCalc(select=[func2(a) AS f0])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRuleTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRuleTest.xml
index a26065536d2..14a1873bf51 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRuleTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRuleTest.xml
@@ -520,6 +520,25 @@ LogicalProject(a=[$0], EXPR$1=[pyFunc1($0, $2)], b=[$1])
FlinkLogicalCalc(select=[a, f0 AS EXPR$1, b])
+- FlinkLogicalCalc(select=[a, b, pyFunc1(a, c) AS f0])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSamePythonFunctionUsedInBothSelectAndWhere">
+ <Resource name="sql">
+ <![CDATA[SELECT a, pyFunc1(a, c) FROM MyTable where pyFunc1(a, c) > 0]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], EXPR$1=[pyFunc1($0, $2)])
++- LogicalFilter(condition=[>(pyFunc1($0, $2), 0)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+FlinkLogicalCalc(select=[a, f0 AS EXPR$1], where=[>(f0, 0)])
++- FlinkLogicalCalc(select=[a, pyFunc1(a, c) AS f0])
+ +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRuleTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRuleTest.scala
index 45aae6c812b..c4f989d2443 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRuleTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRuleTest.scala
@@ -228,4 +228,10 @@ class PythonCalcSplitRuleTest extends TableTestBase {
val sqlQuery = "SELECT a + 1 FROM MyTable where
RowJavaFunc(pyFunc5(a).f0).f0 is NULL and b > 0"
util.verifyRelPlan(sqlQuery)
}
+
+ @Test
+ def testSamePythonFunctionUsedInBothSelectAndWhere(): Unit = {
+ val sqlQuery = "SELECT a, pyFunc1(a, c) FROM MyTable where pyFunc1(a, c) >
0"
+ util.verifyRelPlan(sqlQuery)
+ }
}