This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ac8bec9e572 [FLINK-15973][python] Optimize the execution plan where the same Python UDF is referred in multiple places (#27203)
ac8bec9e572 is described below

commit ac8bec9e572c36e4031e84bc4237fa32ab3be761
Author: Dian Fu <[email protected]>
AuthorDate: Fri Nov 7 19:08:15 2025 +0800

    [FLINK-15973][python] Optimize the execution plan where the same Python UDF is referred in multiple places (#27203)
---
 .../plan/rules/logical/RemoteCalcSplitRule.scala   | 17 +++++++--
 .../plan/rules/logical/AsyncCalcSplitRuleTest.xml  | 41 ++++++++++------------
 .../plan/rules/logical/PythonCalcSplitRuleTest.xml | 19 ++++++++++
 .../rules/logical/PythonCalcSplitRuleTest.scala    |  6 ++++
 4 files changed, 58 insertions(+), 25 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/RemoteCalcSplitRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/RemoteCalcSplitRule.scala
index 505428a0f74..ff60809cea6 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/RemoteCalcSplitRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/RemoteCalcSplitRule.scala
@@ -72,9 +72,22 @@ abstract class RemoteCalcSplitRuleBase[T](
       callFinder)
 
     val splitComponents = split(program, splitter)
+
+    val topCalcProjects = splitComponents.topCalcProjects.map {
+      node: RexNode =>
+        {
+          val idx = extractedRexNodes.indexOf(node)
+          if (idx >= 0) {
+            new RexInputRef(extractedFunctionOffset + idx, node.getType)
+          } else {
+            node
+          }
+        }
+    }
+
     val accessedFields =
       extractRefInputFields(
-        splitComponents.topCalcProjects,
+        topCalcProjects,
         splitComponents.topCalcCondition,
         extractedFunctionOffset)
 
@@ -107,7 +120,7 @@ abstract class RemoteCalcSplitRuleBase[T](
       bottomCalc,
       RexProgram.create(
         bottomCalc.getRowType,
-        splitComponents.topCalcProjects.map(_.accept(inputRewriter)),
+        topCalcProjects.map(_.accept(inputRewriter)),
         splitComponents.topCalcCondition.map(_.accept(inputRewriter)).orNull,
         calc.getRowType,
         rexBuilder
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.xml
index 01fd5538c04..fa4f17124ed 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.xml
@@ -104,11 +104,10 @@ LogicalProject(EXPR$0=[func1($0)], EXPR$1=[func2($0)], 
EXPR$2=[func1($0)], EXPR$
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-AsyncCalc(select=[f0 AS EXPR$0, f00 AS EXPR$1, f01 AS EXPR$2, func2(a) AS 
EXPR$3])
-+- AsyncCalc(select=[f0, f00, a, func1(a) AS f01])
-   +- AsyncCalc(select=[f0, a, func2(a) AS f00])
-      +- AsyncCalc(select=[a, func1(a) AS f0])
-         +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d])
+Calc(select=[f0 AS EXPR$0, f00 AS EXPR$1, f0 AS EXPR$2, f00 AS EXPR$3])
++- AsyncCalc(select=[f0, func2(a) AS f00])
+   +- AsyncCalc(select=[a, func1(a) AS f0])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d])
 ]]>
     </Resource>
   </TestCase>
@@ -132,10 +131,9 @@ LogicalProject(a=[$0])
 Calc(select=[a])
 +- Join(joinType=[InnerJoin], where=[=($f4, $f40)], select=[a, $f4, $f40], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
    :- Exchange(distribution=[hash[$f4]])
-   :  +- AsyncCalc(select=[a, func2(a) AS $f4])
-   :     +- Calc(select=[a], where=[REGEXP(f0, 'string (2|4)')])
-   :        +- AsyncCalc(select=[a, func2(a) AS f0])
-   :           +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d])
+   :  +- Calc(select=[a, f0 AS $f4], where=[REGEXP(f0, 'string (2|4)')])
+   :     +- AsyncCalc(select=[a, func2(a) AS f0])
+   :        +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d])
    +- Exchange(distribution=[hash[$f4]])
       +- AsyncCalc(select=[func2(a2) AS $f4])
          +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2]], fields=[a2, b2, c2, d2])
@@ -513,10 +511,9 @@ LogicalProject(blah=[$0])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-AsyncCalc(select=[func2(a) AS blah])
-+- Calc(select=[a], where=[REGEXP(f0, 'string (2|3)')])
-   +- AsyncCalc(select=[a, func2(a) AS f0])
-      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d])
+Calc(select=[f0 AS blah], where=[REGEXP(f0, 'string (2|3)')])
++- AsyncCalc(select=[func2(a) AS f0])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d])
 ]]>
     </Resource>
   </TestCase>
@@ -704,10 +701,9 @@ LogicalProject(EXPR$0=[func1(func1($0))], 
EXPR$1=[func1(func1(func1($0)))], EXPR
 AsyncCalc(select=[f0 AS EXPR$0, func1(f1) AS EXPR$1, f2 AS EXPR$2])
 +- AsyncCalc(select=[f2, f0, func1(f1) AS f1])
    +- AsyncCalc(select=[f2, f1, func1(f0) AS f0])
-      +- AsyncCalc(select=[f0, f00 AS f1, func1(a) AS f2])
-         +- AsyncCalc(select=[f0, a, func1(a) AS f00])
-            +- AsyncCalc(select=[a, func1(a) AS f0])
-               +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d])
+      +- Calc(select=[f0, f0 AS f1, f0 AS f2])
+         +- AsyncCalc(select=[func1(a) AS f0])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d])
 ]]>
     </Resource>
   </TestCase>
@@ -723,8 +719,8 @@ LogicalProject(EXPR$0=[func1($0)], EXPR$1=[func1($0)])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-AsyncCalc(select=[f0 AS EXPR$0, func1(a) AS EXPR$1])
-+- AsyncCalc(select=[a, func1(a) AS f0])
+Calc(select=[f0 AS EXPR$0, f0 AS EXPR$1])
++- AsyncCalc(select=[func1(a) AS f0])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d])
 ]]>
     </Resource>
@@ -761,10 +757,9 @@ LogicalProject(EXPR$0=[func2($0)])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-AsyncCalc(select=[func2(a) AS EXPR$0])
-+- Calc(select=[a], where=[REGEXP(f0, 'val (2|3)')])
-   +- AsyncCalc(select=[a, func2(a) AS f0])
-      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d])
+Calc(select=[f0 AS EXPR$0], where=[REGEXP(f0, 'val (2|3)')])
++- AsyncCalc(select=[func2(a) AS f0])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRuleTest.xml
index a26065536d2..14a1873bf51 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRuleTest.xml
@@ -520,6 +520,25 @@ LogicalProject(a=[$0], EXPR$1=[pyFunc1($0, $2)], b=[$1])
 FlinkLogicalCalc(select=[a, f0 AS EXPR$1, b])
 +- FlinkLogicalCalc(select=[a, b, pyFunc1(a, c) AS f0])
    +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSamePythonFunctionUsedInBothSelectAndWhere">
+    <Resource name="sql">
+      <![CDATA[SELECT a, pyFunc1(a, c) FROM MyTable where pyFunc1(a, c) > 0]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], EXPR$1=[pyFunc1($0, $2)])
++- LogicalFilter(condition=[>(pyFunc1($0, $2), 0)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+FlinkLogicalCalc(select=[a, f0 AS EXPR$1], where=[>(f0, 0)])
++- FlinkLogicalCalc(select=[a, pyFunc1(a, c) AS f0])
+   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRuleTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRuleTest.scala
index 45aae6c812b..c4f989d2443 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRuleTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PythonCalcSplitRuleTest.scala
@@ -228,4 +228,10 @@ class PythonCalcSplitRuleTest extends TableTestBase {
     val sqlQuery = "SELECT a + 1 FROM MyTable where RowJavaFunc(pyFunc5(a).f0).f0 is NULL and b > 0"
     util.verifyRelPlan(sqlQuery)
   }
+
+  @Test
+  def testSamePythonFunctionUsedInBothSelectAndWhere(): Unit = {
+    val sqlQuery = "SELECT a, pyFunc1(a, c) FROM MyTable where pyFunc1(a, c) > 0"
+    util.verifyRelPlan(sqlQuery)
+  }
 }

Reply via email to