This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-2.2
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.2 by this push:
     new e666847b119 [FLINK-38709][table][python] Fix ScalarFunctionSplitter to 
allow PythonFunction & AsyncFunction to work when taking the recursive field of 
a composite type as input
e666847b119 is described below

commit e666847b119b3fab693d636cc2078b23706cda7c
Author: Dian Fu <[email protected]>
AuthorDate: Thu Nov 20 09:45:19 2025 +0800

    [FLINK-38709][table][python] Fix ScalarFunctionSplitter to allow 
PythonFunction & AsyncFunction to work when taking the recursive field of 
a composite type as input
    
    This closes #27259.
---
 .../plan/rules/logical/RemoteCalcSplitRule.scala   |  15 +-
 .../plan/rules/logical/AsyncCalcSplitRuleTest.java |   9 +-
 .../rules/logical/AsyncCorrelateSplitRuleTest.java |   9 +-
 .../plan/rules/logical/AsyncCalcSplitRuleTest.xml  | 166 ++++++++++++---------
 .../rules/logical/AsyncCorrelateSplitRuleTest.xml  |  83 +++++++----
 5 files changed, 174 insertions(+), 108 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/RemoteCalcSplitRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/RemoteCalcSplitRule.scala
index ff60809cea6..26baa44f43d 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/RemoteCalcSplitRule.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/RemoteCalcSplitRule.scala
@@ -434,6 +434,8 @@ class ScalarFunctionSplitter(
 
   private var fieldsRexCall: Map[Int, Int] = Map[Int, Int]()
 
+  private val extractedRexNodeRefs: mutable.HashSet[RexNode] = 
mutable.HashSet[RexNode]()
+
   override def visitCall(call: RexCall): RexNode = {
     if (needConvert(call)) {
       getExtractedRexNode(call)
@@ -454,7 +456,9 @@ class ScalarFunctionSplitter(
           new RexInputRef(field.getIndex, field.getType)
         case _ =>
           val newFieldAccess =
-            rexBuilder.makeFieldAccess(expr.accept(this), 
fieldAccess.getField.getIndex)
+            rexBuilder.makeFieldAccess(
+              convertInputRefToLocalRefIfNecessary(expr.accept(this)),
+              fieldAccess.getField.getIndex)
           getExtractedRexNode(newFieldAccess)
       }
     } else {
@@ -468,9 +472,18 @@ class ScalarFunctionSplitter(
 
   override def visitNode(rexNode: RexNode): RexNode = rexNode
 
+  private def convertInputRefToLocalRefIfNecessary(node: RexNode): RexNode = {
+    node match {
+      case inputRef: RexInputRef if extractedRexNodeRefs.contains(node) =>
+        new RexLocalRef(inputRef.getIndex, node.getType)
+      case _ => node
+    }
+  }
+
   private def getExtractedRexNode(node: RexNode): RexNode = {
     val newNode = new RexInputRef(extractedFunctionOffset + 
extractedRexNodes.length, node.getType)
     extractedRexNodes.append(node)
+    extractedRexNodeRefs.add(newNode)
     newNode
   }
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.java
index 444c17fee7e..53ba394f5f0 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.java
@@ -62,7 +62,8 @@ public class AsyncCalcSplitRuleTest extends TableTestBase {
                         + "  a int,\n"
                         + "  b bigint,\n"
                         + "  c string,\n"
-                        + "  d ARRAY<INT NOT NULL>\n"
+                        + "  d ARRAY<INT NOT NULL>,\n"
+                        + "  e ROW<f ROW<h int, i double>, g string>"
                         + ") WITH (\n"
                         + "  'connector' = 'test-simple-table-source'\n"
                         + ") ;");
@@ -182,6 +183,12 @@ public class AsyncCalcSplitRuleTest extends TableTestBase {
         util.verifyRelPlan(sqlQuery);
     }
 
+    @Test
+    public void testCompositeFieldAsInput() {
+        String sqlQuery = "SELECT func1(e.f.h) from MyTable";
+        util.verifyRelPlan(sqlQuery);
+    }
+
     @Test
     public void testFieldOperand() {
         String sqlQuery = "SELECT func1(func5(a).f0) from MyTable";
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.java
index b98403a2846..862aedb421b 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.java
@@ -62,7 +62,8 @@ public class AsyncCorrelateSplitRuleTest extends 
TableTestBase {
                         + "  a int,\n"
                         + "  b bigint,\n"
                         + "  c string,\n"
-                        + "  d ARRAY<INT NOT NULL>\n"
+                        + "  d ARRAY<INT NOT NULL>,\n"
+                        + "  e ROW<f ROW<h int, i double>, g string>\n"
                         + ") WITH (\n"
                         + "  'connector' = 'test-simple-table-source'\n"
                         + ") ;");
@@ -110,6 +111,12 @@ public class AsyncCorrelateSplitRuleTest extends 
TableTestBase {
         util.verifyRelPlan(sqlQuery);
     }
 
+    @Test
+    public void testCorrelateWithCompositeFieldAsInput() {
+        String sqlQuery = "select * FROM MyTable, LATERAL 
TABLE(asyncTableFunc(e.f.h))";
+        util.verifyRelPlan(sqlQuery);
+    }
+
     /** Test function. */
     public static class AsyncFunc extends AsyncTableFunction<String> {
 
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.xml
index fa4f17124ed..726b02847c8 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.xml
@@ -34,7 +34,25 @@ AsyncCalc(select=[a, func3($f1) AS EXPR$1])
 +- GroupAggregate(groupBy=[a], select=[a, COUNT(*) AS $f1])
    +- Exchange(distribution=[hash[a]])
       +- Calc(select=[a])
-         +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCompositeFieldAsInput">
+    <Resource name="sql">
+      <![CDATA[SELECT func1(e.f.h) from MyTable]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(EXPR$0=[func1($4.f.h)])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+AsyncCalc(select=[func1(f1) AS EXPR$0])
++- Calc(select=[e.f AS f0, e.f.h AS f1])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d, e])
 ]]>
     </Resource>
   </TestCase>
@@ -52,7 +70,7 @@ LogicalProject(EXPR$0=[func5($0).f0])
       <![CDATA[
 Calc(select=[f0.f0 AS EXPR$0])
 +- AsyncCalc(select=[func5(a) AS f0])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d, e])
 ]]>
     </Resource>
   </TestCase>
@@ -71,7 +89,7 @@ LogicalProject(EXPR$0=[func1(func5($0).f0)])
 AsyncCalc(select=[func1(f0) AS EXPR$0])
 +- Calc(select=[f0.f0 AS f0])
    +- AsyncCalc(select=[func5(a) AS f0])
-      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d, e])
 ]]>
     </Resource>
   </TestCase>
@@ -88,7 +106,7 @@ LogicalProject(a=[$0], EXPR$1=[func1($0)])
     <Resource name="optimized rel plan">
       <![CDATA[
 AsyncCalc(select=[a, func1(a) AS EXPR$1])
-+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d, e])
 ]]>
     </Resource>
   </TestCase>
@@ -107,7 +125,7 @@ LogicalProject(EXPR$0=[func1($0)], EXPR$1=[func2($0)], 
EXPR$2=[func1($0)], EXPR$
 Calc(select=[f0 AS EXPR$0, f00 AS EXPR$1, f0 AS EXPR$2, f00 AS EXPR$3])
 +- AsyncCalc(select=[f0, func2(a) AS f00])
    +- AsyncCalc(select=[a, func1(a) AS f0])
-      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d, e])
 ]]>
     </Resource>
   </TestCase>
@@ -118,9 +136,9 @@ Calc(select=[f0 AS EXPR$0, f00 AS EXPR$1, f0 AS EXPR$2, f00 
AS EXPR$3])
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a=[$0])
-+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], a2=[$5], b2=[$6], c2=[$7], 
d2=[$8])
-   +- LogicalJoin(condition=[AND(=($4, $9), REGEXP(func2($0), _UTF-16LE'string 
(2|4)'))], joinType=[inner])
-      :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], $f4=[func2($0)])
++- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], a2=[$6], b2=[$7], 
c2=[$8], d2=[$9])
+   +- LogicalJoin(condition=[AND(=($5, $10), REGEXP(func2($0), 
_UTF-16LE'string (2|4)'))], joinType=[inner])
+      :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
$f5=[func2($0)])
       :  +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
       +- LogicalProject(a2=[$0], b2=[$1], c2=[$2], d2=[$3], $f4=[func2($0)])
          +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
@@ -129,11 +147,11 @@ LogicalProject(a=[$0])
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[a])
-+- Join(joinType=[InnerJoin], where=[=($f4, $f40)], select=[a, $f4, $f40], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
-   :- Exchange(distribution=[hash[$f4]])
-   :  +- Calc(select=[a, f0 AS $f4], where=[REGEXP(f0, 'string (2|4)')])
++- Join(joinType=[InnerJoin], where=[=($f5, $f4)], select=[a, $f5, $f4], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[$f5]])
+   :  +- Calc(select=[a, f0 AS $f5], where=[REGEXP(f0, 'string (2|4)')])
    :     +- AsyncCalc(select=[a, func2(a) AS f0])
-   :        +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d])
+   :        +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e])
    +- Exchange(distribution=[hash[$f4]])
       +- AsyncCalc(select=[func2(a2) AS $f4])
          +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2]], fields=[a2, b2, c2, d2])
@@ -147,7 +165,7 @@ Calc(select=[a])
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a=[$0])
-+- LogicalJoin(condition=[AND(=($0, $4), >(func6($0, $4), 10))], 
joinType=[inner])
++- LogicalJoin(condition=[AND(=($0, $5), >(func6($0, $5), 10))], 
joinType=[inner])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
 ]]>
@@ -159,7 +177,7 @@ Calc(select=[a], where=[>(f0, 10)])
    +- Join(joinType=[InnerJoin], where=[=(a, a2)], select=[a, a2], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
       :- Exchange(distribution=[hash[a]])
       :  +- Calc(select=[a])
-      :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d])
+      :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e])
       +- Exchange(distribution=[hash[a2]])
          +- Calc(select=[a2])
             +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2]], fields=[a2, b2, c2, d2])
@@ -174,7 +192,7 @@ Calc(select=[a], where=[>(f0, 10)])
       <![CDATA[
 LogicalProject(a=[$0])
 +- LogicalFilter(condition=[REGEXP(func2($0), _UTF-16LE'val (2|3)')])
-   +- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
 ]]>
@@ -186,7 +204,7 @@ Calc(select=[a])
    :- Exchange(distribution=[hash[a]])
    :  +- Calc(select=[a], where=[REGEXP(f0, 'val (2|3)')])
    :     +- AsyncCalc(select=[a, func2(a) AS f0])
-   :        +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d])
+   :        +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e])
    +- Exchange(distribution=[hash[a2]])
       +- Calc(select=[a2], where=[REGEXP(f0, 'val (2|3)')])
          +- AsyncCalc(select=[a2, func2(a2) AS f0])
@@ -201,8 +219,8 @@ Calc(select=[a])
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a=[$0])
-+- LogicalFilter(condition=[>(func6($0, $4), 10)])
-   +- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
++- LogicalFilter(condition=[>(func6($0, $5), 10)])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
 ]]>
@@ -214,7 +232,7 @@ Calc(select=[a], where=[>(f0, 10)])
    +- Join(joinType=[InnerJoin], where=[=(a, a2)], select=[a, a2], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
       :- Exchange(distribution=[hash[a]])
       :  +- Calc(select=[a])
-      :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d])
+      :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e])
       +- Exchange(distribution=[hash[a2]])
          +- Calc(select=[a2])
             +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2]], fields=[a2, b2, c2, d2])
@@ -228,7 +246,7 @@ Calc(select=[a], where=[>(f0, 10)])
     <Resource name="ast">
       <![CDATA[
 LogicalProject(EXPR$0=[func1($0)])
-+- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
 ]]>
@@ -239,7 +257,7 @@ AsyncCalc(select=[func1(a) AS EXPR$0])
 +- Join(joinType=[InnerJoin], where=[=(a, a2)], select=[a, a2], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
    :- Exchange(distribution=[hash[a]])
    :  +- Calc(select=[a])
-   :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d])
+   :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e])
    +- Exchange(distribution=[hash[a2]])
       +- Calc(select=[a2])
          +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2]], fields=[a2, b2, c2, d2])
@@ -270,8 +288,8 @@ AsyncCalc(select=[func1(1) AS EXPR$0])
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a=[$0])
-+- LogicalFilter(condition=[AND(=($0, $4), >(func6($0, $4), 10))])
-   +- LogicalJoin(condition=[=($0, $4)], joinType=[left])
++- LogicalFilter(condition=[AND(=($0, $5), >(func6($0, $5), 10))])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
 ]]>
@@ -283,7 +301,7 @@ Calc(select=[a], where=[>(f0, 10)])
    +- Join(joinType=[InnerJoin], where=[=(a, a2)], select=[a, a2], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
       :- Exchange(distribution=[hash[a]])
       :  +- Calc(select=[a])
-      :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d])
+      :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e])
       +- Exchange(distribution=[hash[a2]])
          +- Calc(select=[a2])
             +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2]], fields=[a2, b2, c2, d2])
@@ -296,10 +314,10 @@ Calc(select=[a], where=[>(f0, 10)])
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(a=[$0], a2=[$4])
-+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], a2=[$5], b2=[$6], c2=[$7], 
d2=[$8])
-   +- LogicalJoin(condition=[=($4, $9)], joinType=[left])
-      :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], $f4=[func1($0)])
+LogicalProject(a=[$0], a2=[$5])
++- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], a2=[$6], b2=[$7], 
c2=[$8], d2=[$9])
+   +- LogicalJoin(condition=[=($5, $10)], joinType=[left])
+      :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
$f5=[func1($0)])
       :  +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
       +- LogicalProject(a2=[$0], b2=[$1], c2=[$2], d2=[$3], $f4=[func1($0)])
          +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
@@ -308,10 +326,10 @@ LogicalProject(a=[$0], a2=[$4])
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[a, a2])
-+- Join(joinType=[LeftOuterJoin], where=[=($f4, $f40)], select=[a, $f4, a2, 
$f40], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
-   :- Exchange(distribution=[hash[$f4]])
-   :  +- AsyncCalc(select=[a, func1(a) AS $f4])
-   :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d])
++- Join(joinType=[LeftOuterJoin], where=[=($f5, $f4)], select=[a, $f5, a2, 
$f4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[$f5]])
+   :  +- AsyncCalc(select=[a, func1(a) AS $f5])
+   :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e])
    +- Exchange(distribution=[hash[$f4]])
       +- AsyncCalc(select=[a2, func1(a2) AS $f4])
          +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2]], fields=[a2, b2, c2, d2])
@@ -324,9 +342,9 @@ Calc(select=[a, a2])
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(a=[$0], a2=[$4])
+LogicalProject(a=[$0], a2=[$5])
 +- LogicalFilter(condition=[REGEXP(func2($0), _UTF-16LE'string (2|3)')])
-   +- LogicalJoin(condition=[=($0, $4)], joinType=[left])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
 ]]>
@@ -337,7 +355,7 @@ Join(joinType=[LeftOuterJoin], where=[=(a, a2)], select=[a, 
a2], leftInputSpec=[
 :- Exchange(distribution=[hash[a]])
 :  +- Calc(select=[a], where=[REGEXP(f0, 'string (2|3)')])
 :     +- AsyncCalc(select=[a, func2(a) AS f0])
-:        +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d])
+:        +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e])
 +- Exchange(distribution=[hash[a2]])
    +- Calc(select=[a2])
       +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2]], fields=[a2, b2, c2, d2])
@@ -351,8 +369,8 @@ Join(joinType=[LeftOuterJoin], where=[=(a, a2)], select=[a, 
a2], leftInputSpec=[
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a=[$0])
-+- LogicalFilter(condition=[>(func6($0, $4), 10)])
-   +- LogicalJoin(condition=[=($0, $4)], joinType=[left])
++- LogicalFilter(condition=[>(func6($0, $5), 10)])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
 ]]>
@@ -364,7 +382,7 @@ Calc(select=[a], where=[>(f0, 10)])
    +- Join(joinType=[LeftOuterJoin], where=[=(a, a2)], select=[a, a2], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
       :- Exchange(distribution=[hash[a]])
       :  +- Calc(select=[a])
-      :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d])
+      :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e])
       +- Exchange(distribution=[hash[a2]])
          +- Calc(select=[a2])
             +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2]], fields=[a2, b2, c2, d2])
@@ -378,7 +396,7 @@ Calc(select=[a], where=[>(f0, 10)])
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a=[$0])
-+- LogicalJoin(condition=[AND(=($0, $4), >(func1($4), 10))], joinType=[left])
++- LogicalJoin(condition=[AND(=($0, $5), >(func1($5), 10))], joinType=[left])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
 ]]>
@@ -389,7 +407,7 @@ Calc(select=[a])
 +- Join(joinType=[LeftOuterJoin], where=[=(a, a2)], select=[a, a2], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
    :- Exchange(distribution=[hash[a]])
    :  +- Calc(select=[a])
-   :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d])
+   :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e])
    +- Exchange(distribution=[hash[a2]])
       +- Calc(select=[a2], where=[>(f0, 10)])
          +- AsyncCalc(select=[a2, func1(a2) AS f0])
@@ -404,8 +422,8 @@ Calc(select=[a])
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a=[$0])
-+- LogicalFilter(condition=[>(func1($4), 10)])
-   +- LogicalJoin(condition=[=($0, $4)], joinType=[left])
++- LogicalFilter(condition=[>(func1($5), 10)])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
 ]]>
@@ -417,7 +435,7 @@ Calc(select=[a], where=[>(f0, 10)])
    +- Join(joinType=[LeftOuterJoin], where=[=(a, a2)], select=[a, a2], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
       :- Exchange(distribution=[hash[a]])
       :  +- Calc(select=[a])
-      :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d])
+      :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e])
       +- Exchange(distribution=[hash[a2]])
          +- Calc(select=[a2])
             +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2]], fields=[a2, b2, c2, d2])
@@ -438,7 +456,7 @@ LogicalProject(EXPR$0=[_UTF-16LE'foo'], EXPR$1=[func1($0)])
       <![CDATA[
 Calc(select=['foo' AS EXPR$0, f0 AS EXPR$1])
 +- AsyncCalc(select=[func1(a) AS f0])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d, e])
 ]]>
     </Resource>
   </TestCase>
@@ -457,7 +475,7 @@ LogicalProject(EXPR$0=[func1(func1(func1($0)))])
 AsyncCalc(select=[func1(f0) AS EXPR$0])
 +- AsyncCalc(select=[func1(f0) AS f0])
    +- AsyncCalc(select=[func1(a) AS f0])
-      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d, e])
 ]]>
     </Resource>
   </TestCase>
@@ -493,7 +511,7 @@ LogicalProject(EXPR$0=[CONCAT(func2($0), _UTF-16LE'foo')])
       <![CDATA[
 Calc(select=[CONCAT(f0, 'foo') AS EXPR$0])
 +- AsyncCalc(select=[func2(a) AS f0])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d, e])
 ]]>
     </Resource>
   </TestCase>
@@ -513,7 +531,7 @@ LogicalProject(blah=[$0])
       <![CDATA[
 Calc(select=[f0 AS blah], where=[REGEXP(f0, 'string (2|3)')])
 +- AsyncCalc(select=[func2(a) AS f0])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d, e])
 ]]>
     </Resource>
   </TestCase>
@@ -524,8 +542,8 @@ Calc(select=[f0 AS blah], where=[REGEXP(f0, 'string 
(2|3)')])
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a=[$0])
-+- LogicalFilter(condition=[AND(=($0, $4), >(func6($0, $4), 10))])
-   +- LogicalJoin(condition=[=($0, $4)], joinType=[right])
++- LogicalFilter(condition=[AND(=($0, $5), >(func6($0, $5), 10))])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[right])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
 ]]>
@@ -537,7 +555,7 @@ Calc(select=[a], where=[>(f0, 10)])
    +- Join(joinType=[InnerJoin], where=[=(a, a2)], select=[a, a2], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
       :- Exchange(distribution=[hash[a]])
       :  +- Calc(select=[a])
-      :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d])
+      :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e])
       +- Exchange(distribution=[hash[a2]])
          +- Calc(select=[a2])
             +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2]], fields=[a2, b2, c2, d2])
@@ -550,10 +568,10 @@ Calc(select=[a], where=[>(f0, 10)])
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(a=[$0], a2=[$4])
-+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], a2=[$5], b2=[$6], c2=[$7], 
d2=[$8])
-   +- LogicalJoin(condition=[=($4, $9)], joinType=[right])
-      :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], $f4=[func1($0)])
+LogicalProject(a=[$0], a2=[$5])
++- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], a2=[$6], b2=[$7], 
c2=[$8], d2=[$9])
+   +- LogicalJoin(condition=[=($5, $10)], joinType=[right])
+      :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], 
$f5=[func1($0)])
       :  +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
       +- LogicalProject(a2=[$0], b2=[$1], c2=[$2], d2=[$3], $f4=[func1($0)])
          +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
@@ -562,10 +580,10 @@ LogicalProject(a=[$0], a2=[$4])
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[a, a2])
-+- Join(joinType=[RightOuterJoin], where=[=($f4, $f40)], select=[a, $f4, a2, 
$f40], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
-   :- Exchange(distribution=[hash[$f4]])
-   :  +- AsyncCalc(select=[a, func1(a) AS $f4])
-   :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d])
++- Join(joinType=[RightOuterJoin], where=[=($f5, $f4)], select=[a, $f5, a2, 
$f4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[$f5]])
+   :  +- AsyncCalc(select=[a, func1(a) AS $f5])
+   :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e])
    +- Exchange(distribution=[hash[$f4]])
       +- AsyncCalc(select=[a2, func1(a2) AS $f4])
          +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2]], fields=[a2, b2, c2, d2])
@@ -579,8 +597,8 @@ Calc(select=[a, a2])
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a=[$0])
-+- LogicalFilter(condition=[>(func6($0, $4), 10)])
-   +- LogicalJoin(condition=[=($0, $4)], joinType=[right])
++- LogicalFilter(condition=[>(func6($0, $5), 10)])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[right])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
 ]]>
@@ -592,7 +610,7 @@ Calc(select=[a], where=[>(f0, 10)])
    +- Join(joinType=[RightOuterJoin], where=[=(a, a2)], select=[a, a2], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
       :- Exchange(distribution=[hash[a]])
       :  +- Calc(select=[a])
-      :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d])
+      :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e])
       +- Exchange(distribution=[hash[a2]])
          +- Calc(select=[a2])
             +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2]], fields=[a2, b2, c2, d2])
@@ -606,7 +624,7 @@ Calc(select=[a], where=[>(f0, 10)])
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a=[$0])
-+- LogicalJoin(condition=[AND(=($0, $4), >(func1($0), 10))], joinType=[right])
++- LogicalJoin(condition=[AND(=($0, $5), >(func1($0), 10))], joinType=[right])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
 ]]>
@@ -618,7 +636,7 @@ Calc(select=[a])
    :- Exchange(distribution=[hash[a]])
    :  +- Calc(select=[a], where=[>(f0, 10)])
    :     +- AsyncCalc(select=[a, func1(a) AS f0])
-   :        +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d])
+   :        +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e])
    +- Exchange(distribution=[hash[a2]])
       +- Calc(select=[a2])
          +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2]], fields=[a2, b2, c2, d2])
@@ -633,7 +651,7 @@ Calc(select=[a])
       <![CDATA[
 LogicalProject(a=[$0])
 +- LogicalFilter(condition=[>(func1($0), 10)])
-   +- LogicalJoin(condition=[=($0, $4)], joinType=[right])
+   +- LogicalJoin(condition=[=($0, $5)], joinType=[right])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2]])
 ]]>
@@ -645,7 +663,7 @@ Calc(select=[a], where=[>(f0, 10)])
    +- Join(joinType=[RightOuterJoin], where=[=(a, a2)], select=[a, a2], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
       :- Exchange(distribution=[hash[a]])
       :  +- Calc(select=[a])
-      :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d])
+      :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e])
       +- Exchange(distribution=[hash[a2]])
          +- Calc(select=[a2])
             +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2]], fields=[a2, b2, c2, d2])
@@ -665,7 +683,7 @@ LogicalProject(EXPR$0=[func4($3)])
     <Resource name="optimized rel plan">
       <![CDATA[
 AsyncCalc(select=[func4(d) AS EXPR$0])
-+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d, e])
 ]]>
     </Resource>
   </TestCase>
@@ -682,7 +700,7 @@ LogicalProject(EXPR$0=[func1($0)])
     <Resource name="optimized rel plan">
       <![CDATA[
 AsyncCalc(select=[func1(a) AS EXPR$0])
-+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d, e])
 ]]>
     </Resource>
   </TestCase>
@@ -703,7 +721,7 @@ AsyncCalc(select=[f0 AS EXPR$0, func1(f1) AS EXPR$1, f2 AS 
EXPR$2])
    +- AsyncCalc(select=[f2, f1, func1(f0) AS f0])
       +- Calc(select=[f0, f0 AS f1, f0 AS f2])
          +- AsyncCalc(select=[func1(a) AS f0])
-            +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e])
 ]]>
     </Resource>
   </TestCase>
@@ -721,7 +739,7 @@ LogicalProject(EXPR$0=[func1($0)], EXPR$1=[func1($0)])
       <![CDATA[
 Calc(select=[f0 AS EXPR$0, f0 AS EXPR$1])
 +- AsyncCalc(select=[func1(a) AS f0])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d, e])
 ]]>
     </Resource>
   </TestCase>
@@ -740,7 +758,7 @@ LogicalProject(a=[$0])
       <![CDATA[
 Calc(select=[a], where=[REGEXP(f0, 'string (2|3)')])
 +- AsyncCalc(select=[a, func2(a) AS f0])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d, e])
 ]]>
     </Resource>
   </TestCase>
@@ -759,7 +777,7 @@ LogicalProject(EXPR$0=[func2($0)])
       <![CDATA[
 Calc(select=[f0 AS EXPR$0], where=[REGEXP(f0, 'val (2|3)')])
 +- AsyncCalc(select=[func2(a) AS f0])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d, e])
 ]]>
     </Resource>
   </TestCase>
@@ -779,7 +797,7 @@ LogicalProject(blah=[$0])
       <![CDATA[
 Calc(select=[a], where=[REGEXP(f0, 'string (2|3)')])
 +- AsyncCalc(select=[a, func2(a) AS f0])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d, e])
 ]]>
     </Resource>
   </TestCase>
@@ -798,7 +816,7 @@ LogicalProject(a=[$0])
       <![CDATA[
 Calc(select=[a], where=[>=(f0, 12)])
 +- AsyncCalc(select=[a, func1(a) AS f0])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d, e])
 ]]>
     </Resource>
   </TestCase>
@@ -822,7 +840,7 @@ AsyncCalc(select=[func1(a) AS EXPR$0])
 +- Join(joinType=[LeftAntiJoin], where=[OR(IS NULL(a), IS NULL(a2), =(a, 
a2))], select=[a], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
    :- Exchange(distribution=[single])
    :  +- Calc(select=[a])
-   :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d])
+   :     +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e])
    +- Exchange(distribution=[single])
       +- Calc(select=[a2])
          +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2]], fields=[a2, b2, c2, d2])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.xml
index dcbe743d2c2..2cdbfbaf162 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.xml
@@ -22,7 +22,7 @@ limitations under the License.
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], EXPR$0=[$4])
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], EXPR$0=[$5])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
    +- LogicalTableFunctionScan(invocation=[tableFunc(func1($cor0.a))], 
rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
@@ -30,10 +30,10 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], EXPR$0=[$4])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[a, b, c, d, EXPR$0])
-+- Correlate(invocation=[tableFunc($4)], correlate=[table(tableFunc(f0))], 
select=[a,b,c,d,f0,EXPR$0], rowType=[RecordType(INTEGER a, BIGINT b, 
VARCHAR(2147483647) c, INTEGER ARRAY d, INTEGER f0, VARCHAR(2147483647) 
EXPR$0)], joinType=[INNER])
-   +- AsyncCalc(select=[a, b, c, d, func1(a) AS f0])
-      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d])
+Calc(select=[a, b, c, d, e, EXPR$0])
++- Correlate(invocation=[tableFunc($5)], correlate=[table(tableFunc(f0))], 
select=[a,b,c,d,e,f0,EXPR$0], rowType=[RecordType(INTEGER a, BIGINT b, 
VARCHAR(2147483647) c, INTEGER ARRAY d, 
RecordType:peek_no_expand(RecordType:peek_no_expand(INTEGER h, DOUBLE i) f, 
VARCHAR(2147483647) g) e, INTEGER f0, VARCHAR(2147483647) EXPR$0)], 
joinType=[INNER])
+   +- AsyncCalc(select=[a, b, c, d, e, func1(a) AS f0])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d, e])
 ]]>
     </Resource>
   </TestCase>
@@ -43,7 +43,7 @@ Calc(select=[a, b, c, d, EXPR$0])
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], EXPR$0=[$4])
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], EXPR$0=[$5])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
    +- LogicalTableFunctionScan(invocation=[tableFunc(ABS(func1($cor0.a)))], 
rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
@@ -51,10 +51,10 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], EXPR$0=[$4])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[a, b, c, d, EXPR$0])
-+- Correlate(invocation=[tableFunc(ABS($4))], 
correlate=[table(tableFunc(ABS(f0)))], select=[a,b,c,d,f0,EXPR$0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER ARRAY 
d, INTEGER f0, VARCHAR(2147483647) EXPR$0)], joinType=[INNER])
-   +- AsyncCalc(select=[a, b, c, d, func1(a) AS f0])
-      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d])
+Calc(select=[a, b, c, d, e, EXPR$0])
++- Correlate(invocation=[tableFunc(ABS($5))], 
correlate=[table(tableFunc(ABS(f0)))], select=[a,b,c,d,e,f0,EXPR$0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER ARRAY 
d, RecordType:peek_no_expand(RecordType:peek_no_expand(INTEGER h, DOUBLE i) f, 
VARCHAR(2147483647) g) e, INTEGER f0, VARCHAR(2147483647) EXPR$0)], 
joinType=[INNER])
+   +- AsyncCalc(select=[a, b, c, d, e, func1(a) AS f0])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d, e])
 ]]>
     </Resource>
   </TestCase>
@@ -64,7 +64,7 @@ Calc(select=[a, b, c, d, EXPR$0])
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], EXPR$0=[$4])
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], EXPR$0=[$5])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
    +- LogicalTableFunctionScan(invocation=[tableFunc(func1(ABS($cor0.a)))], 
rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
@@ -72,11 +72,11 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], EXPR$0=[$4])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[a, b, c, d, EXPR$0])
-+- Correlate(invocation=[tableFunc($4)], correlate=[table(tableFunc(f0))], 
select=[a,b,c,d,f0,EXPR$0], rowType=[RecordType(INTEGER a, BIGINT b, 
VARCHAR(2147483647) c, INTEGER ARRAY d, INTEGER f0, VARCHAR(2147483647) 
EXPR$0)], joinType=[INNER])
-   +- AsyncCalc(select=[a, b, c, d, func1(f0) AS f0])
-      +- Calc(select=[a, b, c, d, ABS(a) AS f0])
-         +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d])
+Calc(select=[a, b, c, d, e, EXPR$0])
++- Correlate(invocation=[tableFunc($5)], correlate=[table(tableFunc(f0))], 
select=[a,b,c,d,e,f0,EXPR$0], rowType=[RecordType(INTEGER a, BIGINT b, 
VARCHAR(2147483647) c, INTEGER ARRAY d, 
RecordType:peek_no_expand(RecordType:peek_no_expand(INTEGER h, DOUBLE i) f, 
VARCHAR(2147483647) g) e, INTEGER f0, VARCHAR(2147483647) EXPR$0)], 
joinType=[INNER])
+   +- AsyncCalc(select=[a, b, c, d, e, func1(f0) AS f0])
+      +- Calc(select=[a, b, c, d, e, ABS(a) AS f0])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, d, e])
 ]]>
     </Resource>
   </TestCase>
@@ -86,7 +86,7 @@ Calc(select=[a, b, c, d, EXPR$0])
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], EXPR$0=[$4])
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], EXPR$0=[$5])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
    +- 
LogicalTableFunctionScan(invocation=[asyncTableFunc(CAST(CAST($cor0.a):INTEGER):INTEGER)],
 rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
@@ -94,10 +94,31 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], EXPR$0=[$4])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[a, b, c, d, EXPR$0])
-+- AsyncCorrelate(invocation=[asyncTableFunc($4)], 
correlate=[table(asyncTableFunc(f0))], select=[a,b,c,d,f0,EXPR$0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER ARRAY 
d, INTEGER f0, VARCHAR(2147483647) EXPR$0)], joinType=[INNER])
-   +- Calc(select=[a, b, c, d, a AS f0])
-      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d])
+Calc(select=[a, b, c, d, e, EXPR$0])
++- AsyncCorrelate(invocation=[asyncTableFunc($5)], 
correlate=[table(asyncTableFunc(f0))], select=[a,b,c,d,e,f0,EXPR$0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER ARRAY 
d, RecordType:peek_no_expand(RecordType:peek_no_expand(INTEGER h, DOUBLE i) f, 
VARCHAR(2147483647) g) e, INTEGER f0, VARCHAR(2147483647) EXPR$0)], 
joinType=[INNER])
+   +- Calc(select=[a, b, c, d, e, a AS f0])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d, e])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCorrelateWithCompositeFieldAsInput">
+    <Resource name="sql">
+      <![CDATA[select * FROM MyTable, LATERAL TABLE(asyncTableFunc(e.f.h))]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], EXPR$0=[$5])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{4}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+   +- LogicalTableFunctionScan(invocation=[asyncTableFunc($cor0.e.f.h)], 
rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, d, e, EXPR$0])
++- AsyncCorrelate(invocation=[asyncTableFunc($6)], 
correlate=[table(asyncTableFunc(f1))], select=[a,b,c,d,e,f0,f1,EXPR$0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER ARRAY 
d, RecordType:peek_no_expand(RecordType:peek_no_expand(INTEGER h, DOUBLE i) f, 
VARCHAR(2147483647) g) e, RecordType:peek_no_expand(INTEGER h, DOUBLE i) f0, 
INTEGER f1, VARCHAR(2147483647) EXPR$0)], joinType=[INNER])
+   +- Calc(select=[a, b, c, d, e, e.f AS f0, e.f.h AS f1])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d, e])
 ]]>
     </Resource>
   </TestCase>
@@ -107,7 +128,7 @@ Calc(select=[a, b, c, d, EXPR$0])
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], EXPR$0=[$4])
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], EXPR$0=[$5])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
    +- LogicalTableFunctionScan(invocation=[asyncTableFunc(scalar($cor0.a))], 
rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
@@ -115,10 +136,10 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], 
EXPR$0=[$4])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[a, b, c, d, EXPR$0])
-+- AsyncCorrelate(invocation=[asyncTableFunc($4)], 
correlate=[table(asyncTableFunc(f0))], select=[a,b,c,d,f0,EXPR$0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER ARRAY 
d, INTEGER f0, VARCHAR(2147483647) EXPR$0)], joinType=[INNER])
-   +- Calc(select=[a, b, c, d, scalar(a) AS f0])
-      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d])
+Calc(select=[a, b, c, d, e, EXPR$0])
++- AsyncCorrelate(invocation=[asyncTableFunc($5)], 
correlate=[table(asyncTableFunc(f0))], select=[a,b,c,d,e,f0,EXPR$0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER ARRAY 
d, RecordType:peek_no_expand(RecordType:peek_no_expand(INTEGER h, DOUBLE i) f, 
VARCHAR(2147483647) g) e, INTEGER f0, VARCHAR(2147483647) EXPR$0)], 
joinType=[INNER])
+   +- Calc(select=[a, b, c, d, e, scalar(a) AS f0])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d, e])
 ]]>
     </Resource>
   </TestCase>
@@ -128,7 +149,7 @@ Calc(select=[a, b, c, d, EXPR$0])
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], EXPR$0=[$4])
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], EXPR$0=[$5])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
    +- LogicalTableFunctionScan(invocation=[asyncTableFunc(ABS($cor0.a))], 
rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
@@ -136,10 +157,10 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], 
EXPR$0=[$4])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Calc(select=[a, b, c, d, EXPR$0])
-+- AsyncCorrelate(invocation=[asyncTableFunc($4)], 
correlate=[table(asyncTableFunc(f0))], select=[a,b,c,d,f0,EXPR$0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER ARRAY 
d, INTEGER f0, VARCHAR(2147483647) EXPR$0)], joinType=[INNER])
-   +- Calc(select=[a, b, c, d, ABS(a) AS f0])
-      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d])
+Calc(select=[a, b, c, d, e, EXPR$0])
++- AsyncCorrelate(invocation=[asyncTableFunc($5)], 
correlate=[table(asyncTableFunc(f0))], select=[a,b,c,d,e,f0,EXPR$0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER ARRAY 
d, RecordType:peek_no_expand(RecordType:peek_no_expand(INTEGER h, DOUBLE i) f, 
VARCHAR(2147483647) g) e, INTEGER f0, VARCHAR(2147483647) EXPR$0)], 
joinType=[INNER])
+   +- Calc(select=[a, b, c, d, e, ABS(a) AS f0])
+      +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, d, e])
 ]]>
     </Resource>
   </TestCase>


Reply via email to