This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b152cde4062 [FLINK-38709][table][python] Fix ScalarFunctionSplitter to
allow PythonFunction & AsyncFunction to work when taking the recursive field of
a composite type as input
b152cde4062 is described below
commit b152cde406235113194b1516e3a5d7957bf621c7
Author: Dian Fu <[email protected]>
AuthorDate: Thu Nov 20 09:45:19 2025 +0800
[FLINK-38709][table][python] Fix ScalarFunctionSplitter to allow
PythonFunction & AsyncFunction to work when taking the recursive field of
a composite type as input
This closes #27259.
---
.../plan/rules/logical/RemoteCalcSplitRule.scala | 15 +-
.../plan/rules/logical/AsyncCalcSplitRuleTest.java | 9 +-
.../rules/logical/AsyncCorrelateSplitRuleTest.java | 9 +-
.../plan/rules/logical/AsyncCalcSplitRuleTest.xml | 166 ++++++++++++---------
.../rules/logical/AsyncCorrelateSplitRuleTest.xml | 83 +++++++----
5 files changed, 174 insertions(+), 108 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/RemoteCalcSplitRule.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/RemoteCalcSplitRule.scala
index ff60809cea6..26baa44f43d 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/RemoteCalcSplitRule.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/RemoteCalcSplitRule.scala
@@ -434,6 +434,8 @@ class ScalarFunctionSplitter(
private var fieldsRexCall: Map[Int, Int] = Map[Int, Int]()
+ private val extractedRexNodeRefs: mutable.HashSet[RexNode] =
mutable.HashSet[RexNode]()
+
override def visitCall(call: RexCall): RexNode = {
if (needConvert(call)) {
getExtractedRexNode(call)
@@ -454,7 +456,9 @@ class ScalarFunctionSplitter(
new RexInputRef(field.getIndex, field.getType)
case _ =>
val newFieldAccess =
- rexBuilder.makeFieldAccess(expr.accept(this),
fieldAccess.getField.getIndex)
+ rexBuilder.makeFieldAccess(
+ convertInputRefToLocalRefIfNecessary(expr.accept(this)),
+ fieldAccess.getField.getIndex)
getExtractedRexNode(newFieldAccess)
}
} else {
@@ -468,9 +472,18 @@ class ScalarFunctionSplitter(
override def visitNode(rexNode: RexNode): RexNode = rexNode
+ private def convertInputRefToLocalRefIfNecessary(node: RexNode): RexNode = {
+ node match {
+ case inputRef: RexInputRef if extractedRexNodeRefs.contains(node) =>
+ new RexLocalRef(inputRef.getIndex, node.getType)
+ case _ => node
+ }
+ }
+
private def getExtractedRexNode(node: RexNode): RexNode = {
val newNode = new RexInputRef(extractedFunctionOffset +
extractedRexNodes.length, node.getType)
extractedRexNodes.append(node)
+ extractedRexNodeRefs.add(newNode)
newNode
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.java
index 444c17fee7e..53ba394f5f0 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.java
@@ -62,7 +62,8 @@ public class AsyncCalcSplitRuleTest extends TableTestBase {
+ " a int,\n"
+ " b bigint,\n"
+ " c string,\n"
- + " d ARRAY<INT NOT NULL>\n"
+ + " d ARRAY<INT NOT NULL>,\n"
+ + " e ROW<f ROW<h int, i double>, g string>"
+ ") WITH (\n"
+ " 'connector' = 'test-simple-table-source'\n"
+ ") ;");
@@ -182,6 +183,12 @@ public class AsyncCalcSplitRuleTest extends TableTestBase {
util.verifyRelPlan(sqlQuery);
}
+ @Test
+ public void testCompositeFieldAsInput() {
+ String sqlQuery = "SELECT func1(e.f.h) from MyTable";
+ util.verifyRelPlan(sqlQuery);
+ }
+
@Test
public void testFieldOperand() {
String sqlQuery = "SELECT func1(func5(a).f0) from MyTable";
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.java
index b98403a2846..862aedb421b 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.java
@@ -62,7 +62,8 @@ public class AsyncCorrelateSplitRuleTest extends
TableTestBase {
+ " a int,\n"
+ " b bigint,\n"
+ " c string,\n"
- + " d ARRAY<INT NOT NULL>\n"
+ + " d ARRAY<INT NOT NULL>,\n"
+ + " e ROW<f ROW<h int, i double>, g string>\n"
+ ") WITH (\n"
+ " 'connector' = 'test-simple-table-source'\n"
+ ") ;");
@@ -110,6 +111,12 @@ public class AsyncCorrelateSplitRuleTest extends
TableTestBase {
util.verifyRelPlan(sqlQuery);
}
+ @Test
+ public void testCorrelateWithCompositeFieldAsInput() {
+ String sqlQuery = "select * FROM MyTable, LATERAL
TABLE(asyncTableFunc(e.f.h))";
+ util.verifyRelPlan(sqlQuery);
+ }
+
/** Test function. */
public static class AsyncFunc extends AsyncTableFunction<String> {
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.xml
index fa4f17124ed..726b02847c8 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCalcSplitRuleTest.xml
@@ -34,7 +34,25 @@ AsyncCalc(select=[a, func3($f1) AS EXPR$1])
+- GroupAggregate(groupBy=[a], select=[a, COUNT(*) AS $f1])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a])
- +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCompositeFieldAsInput">
+ <Resource name="sql">
+ <![CDATA[SELECT func1(e.f.h) from MyTable]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(EXPR$0=[func1($4.f.h)])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+AsyncCalc(select=[func1(f1) AS EXPR$0])
++- Calc(select=[e.f AS f0, e.f.h AS f1])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d, e])
]]>
</Resource>
</TestCase>
@@ -52,7 +70,7 @@ LogicalProject(EXPR$0=[func5($0).f0])
<![CDATA[
Calc(select=[f0.f0 AS EXPR$0])
+- AsyncCalc(select=[func5(a) AS f0])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d, e])
]]>
</Resource>
</TestCase>
@@ -71,7 +89,7 @@ LogicalProject(EXPR$0=[func1(func5($0).f0)])
AsyncCalc(select=[func1(f0) AS EXPR$0])
+- Calc(select=[f0.f0 AS f0])
+- AsyncCalc(select=[func5(a) AS f0])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d, e])
]]>
</Resource>
</TestCase>
@@ -88,7 +106,7 @@ LogicalProject(a=[$0], EXPR$1=[func1($0)])
<Resource name="optimized rel plan">
<![CDATA[
AsyncCalc(select=[a, func1(a) AS EXPR$1])
-+- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d, e])
]]>
</Resource>
</TestCase>
@@ -107,7 +125,7 @@ LogicalProject(EXPR$0=[func1($0)], EXPR$1=[func2($0)],
EXPR$2=[func1($0)], EXPR$
Calc(select=[f0 AS EXPR$0, f00 AS EXPR$1, f0 AS EXPR$2, f00 AS EXPR$3])
+- AsyncCalc(select=[f0, func2(a) AS f00])
+- AsyncCalc(select=[a, func1(a) AS f0])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d, e])
]]>
</Resource>
</TestCase>
@@ -118,9 +136,9 @@ Calc(select=[f0 AS EXPR$0, f00 AS EXPR$1, f0 AS EXPR$2, f00
AS EXPR$3])
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0])
-+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], a2=[$5], b2=[$6], c2=[$7],
d2=[$8])
- +- LogicalJoin(condition=[AND(=($4, $9), REGEXP(func2($0), _UTF-16LE'string
(2|4)'))], joinType=[inner])
- :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], $f4=[func2($0)])
++- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], a2=[$6], b2=[$7],
c2=[$8], d2=[$9])
+ +- LogicalJoin(condition=[AND(=($5, $10), REGEXP(func2($0),
_UTF-16LE'string (2|4)'))], joinType=[inner])
+ :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
$f5=[func2($0)])
: +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+- LogicalProject(a2=[$0], b2=[$1], c2=[$2], d2=[$3], $f4=[func2($0)])
+- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
@@ -129,11 +147,11 @@ LogicalProject(a=[$0])
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a])
-+- Join(joinType=[InnerJoin], where=[=($f4, $f40)], select=[a, $f4, $f40],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
- :- Exchange(distribution=[hash[$f4]])
- : +- Calc(select=[a, f0 AS $f4], where=[REGEXP(f0, 'string (2|4)')])
++- Join(joinType=[InnerJoin], where=[=($f5, $f4)], select=[a, $f5, $f4],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[$f5]])
+ : +- Calc(select=[a, f0 AS $f5], where=[REGEXP(f0, 'string (2|4)')])
: +- AsyncCalc(select=[a, func2(a) AS f0])
- : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e])
+- Exchange(distribution=[hash[$f4]])
+- AsyncCalc(select=[func2(a2) AS $f4])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable2]], fields=[a2, b2, c2, d2])
@@ -147,7 +165,7 @@ Calc(select=[a])
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0])
-+- LogicalJoin(condition=[AND(=($0, $4), >(func6($0, $4), 10))],
joinType=[inner])
++- LogicalJoin(condition=[AND(=($0, $5), >(func6($0, $5), 10))],
joinType=[inner])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
]]>
@@ -159,7 +177,7 @@ Calc(select=[a], where=[>(f0, 10)])
+- Join(joinType=[InnerJoin], where=[=(a, a2)], select=[a, a2],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a])
- : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e])
+- Exchange(distribution=[hash[a2]])
+- Calc(select=[a2])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable2]], fields=[a2, b2, c2, d2])
@@ -174,7 +192,7 @@ Calc(select=[a], where=[>(f0, 10)])
<![CDATA[
LogicalProject(a=[$0])
+- LogicalFilter(condition=[REGEXP(func2($0), _UTF-16LE'val (2|3)')])
- +- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
]]>
@@ -186,7 +204,7 @@ Calc(select=[a])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a], where=[REGEXP(f0, 'val (2|3)')])
: +- AsyncCalc(select=[a, func2(a) AS f0])
- : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e])
+- Exchange(distribution=[hash[a2]])
+- Calc(select=[a2], where=[REGEXP(f0, 'val (2|3)')])
+- AsyncCalc(select=[a2, func2(a2) AS f0])
@@ -201,8 +219,8 @@ Calc(select=[a])
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0])
-+- LogicalFilter(condition=[>(func6($0, $4), 10)])
- +- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
++- LogicalFilter(condition=[>(func6($0, $5), 10)])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
]]>
@@ -214,7 +232,7 @@ Calc(select=[a], where=[>(f0, 10)])
+- Join(joinType=[InnerJoin], where=[=(a, a2)], select=[a, a2],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a])
- : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e])
+- Exchange(distribution=[hash[a2]])
+- Calc(select=[a2])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable2]], fields=[a2, b2, c2, d2])
@@ -228,7 +246,7 @@ Calc(select=[a], where=[>(f0, 10)])
<Resource name="ast">
<![CDATA[
LogicalProject(EXPR$0=[func1($0)])
-+- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
++- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
]]>
@@ -239,7 +257,7 @@ AsyncCalc(select=[func1(a) AS EXPR$0])
+- Join(joinType=[InnerJoin], where=[=(a, a2)], select=[a, a2],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a])
- : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e])
+- Exchange(distribution=[hash[a2]])
+- Calc(select=[a2])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable2]], fields=[a2, b2, c2, d2])
@@ -270,8 +288,8 @@ AsyncCalc(select=[func1(1) AS EXPR$0])
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0])
-+- LogicalFilter(condition=[AND(=($0, $4), >(func6($0, $4), 10))])
- +- LogicalJoin(condition=[=($0, $4)], joinType=[left])
++- LogicalFilter(condition=[AND(=($0, $5), >(func6($0, $5), 10))])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
]]>
@@ -283,7 +301,7 @@ Calc(select=[a], where=[>(f0, 10)])
+- Join(joinType=[InnerJoin], where=[=(a, a2)], select=[a, a2],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a])
- : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e])
+- Exchange(distribution=[hash[a2]])
+- Calc(select=[a2])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable2]], fields=[a2, b2, c2, d2])
@@ -296,10 +314,10 @@ Calc(select=[a], where=[>(f0, 10)])
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(a=[$0], a2=[$4])
-+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], a2=[$5], b2=[$6], c2=[$7],
d2=[$8])
- +- LogicalJoin(condition=[=($4, $9)], joinType=[left])
- :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], $f4=[func1($0)])
+LogicalProject(a=[$0], a2=[$5])
++- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], a2=[$6], b2=[$7],
c2=[$8], d2=[$9])
+ +- LogicalJoin(condition=[=($5, $10)], joinType=[left])
+ :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
$f5=[func1($0)])
: +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+- LogicalProject(a2=[$0], b2=[$1], c2=[$2], d2=[$3], $f4=[func1($0)])
+- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
@@ -308,10 +326,10 @@ LogicalProject(a=[$0], a2=[$4])
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, a2])
-+- Join(joinType=[LeftOuterJoin], where=[=($f4, $f40)], select=[a, $f4, a2,
$f40], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
- :- Exchange(distribution=[hash[$f4]])
- : +- AsyncCalc(select=[a, func1(a) AS $f4])
- : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d])
++- Join(joinType=[LeftOuterJoin], where=[=($f5, $f4)], select=[a, $f5, a2,
$f4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[$f5]])
+ : +- AsyncCalc(select=[a, func1(a) AS $f5])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e])
+- Exchange(distribution=[hash[$f4]])
+- AsyncCalc(select=[a2, func1(a2) AS $f4])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable2]], fields=[a2, b2, c2, d2])
@@ -324,9 +342,9 @@ Calc(select=[a, a2])
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(a=[$0], a2=[$4])
+LogicalProject(a=[$0], a2=[$5])
+- LogicalFilter(condition=[REGEXP(func2($0), _UTF-16LE'string (2|3)')])
- +- LogicalJoin(condition=[=($0, $4)], joinType=[left])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
]]>
@@ -337,7 +355,7 @@ Join(joinType=[LeftOuterJoin], where=[=(a, a2)], select=[a,
a2], leftInputSpec=[
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a], where=[REGEXP(f0, 'string (2|3)')])
: +- AsyncCalc(select=[a, func2(a) AS f0])
-: +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d])
+: +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e])
+- Exchange(distribution=[hash[a2]])
+- Calc(select=[a2])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable2]], fields=[a2, b2, c2, d2])
@@ -351,8 +369,8 @@ Join(joinType=[LeftOuterJoin], where=[=(a, a2)], select=[a,
a2], leftInputSpec=[
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0])
-+- LogicalFilter(condition=[>(func6($0, $4), 10)])
- +- LogicalJoin(condition=[=($0, $4)], joinType=[left])
++- LogicalFilter(condition=[>(func6($0, $5), 10)])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
]]>
@@ -364,7 +382,7 @@ Calc(select=[a], where=[>(f0, 10)])
+- Join(joinType=[LeftOuterJoin], where=[=(a, a2)], select=[a, a2],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a])
- : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e])
+- Exchange(distribution=[hash[a2]])
+- Calc(select=[a2])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable2]], fields=[a2, b2, c2, d2])
@@ -378,7 +396,7 @@ Calc(select=[a], where=[>(f0, 10)])
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0])
-+- LogicalJoin(condition=[AND(=($0, $4), >(func1($4), 10))], joinType=[left])
++- LogicalJoin(condition=[AND(=($0, $5), >(func1($5), 10))], joinType=[left])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
]]>
@@ -389,7 +407,7 @@ Calc(select=[a])
+- Join(joinType=[LeftOuterJoin], where=[=(a, a2)], select=[a, a2],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a])
- : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e])
+- Exchange(distribution=[hash[a2]])
+- Calc(select=[a2], where=[>(f0, 10)])
+- AsyncCalc(select=[a2, func1(a2) AS f0])
@@ -404,8 +422,8 @@ Calc(select=[a])
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0])
-+- LogicalFilter(condition=[>(func1($4), 10)])
- +- LogicalJoin(condition=[=($0, $4)], joinType=[left])
++- LogicalFilter(condition=[>(func1($5), 10)])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[left])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
]]>
@@ -417,7 +435,7 @@ Calc(select=[a], where=[>(f0, 10)])
+- Join(joinType=[LeftOuterJoin], where=[=(a, a2)], select=[a, a2],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a])
- : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e])
+- Exchange(distribution=[hash[a2]])
+- Calc(select=[a2])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable2]], fields=[a2, b2, c2, d2])
@@ -438,7 +456,7 @@ LogicalProject(EXPR$0=[_UTF-16LE'foo'], EXPR$1=[func1($0)])
<![CDATA[
Calc(select=['foo' AS EXPR$0, f0 AS EXPR$1])
+- AsyncCalc(select=[func1(a) AS f0])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d, e])
]]>
</Resource>
</TestCase>
@@ -457,7 +475,7 @@ LogicalProject(EXPR$0=[func1(func1(func1($0)))])
AsyncCalc(select=[func1(f0) AS EXPR$0])
+- AsyncCalc(select=[func1(f0) AS f0])
+- AsyncCalc(select=[func1(a) AS f0])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d, e])
]]>
</Resource>
</TestCase>
@@ -493,7 +511,7 @@ LogicalProject(EXPR$0=[CONCAT(func2($0), _UTF-16LE'foo')])
<![CDATA[
Calc(select=[CONCAT(f0, 'foo') AS EXPR$0])
+- AsyncCalc(select=[func2(a) AS f0])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d, e])
]]>
</Resource>
</TestCase>
@@ -513,7 +531,7 @@ LogicalProject(blah=[$0])
<![CDATA[
Calc(select=[f0 AS blah], where=[REGEXP(f0, 'string (2|3)')])
+- AsyncCalc(select=[func2(a) AS f0])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d, e])
]]>
</Resource>
</TestCase>
@@ -524,8 +542,8 @@ Calc(select=[f0 AS blah], where=[REGEXP(f0, 'string
(2|3)')])
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0])
-+- LogicalFilter(condition=[AND(=($0, $4), >(func6($0, $4), 10))])
- +- LogicalJoin(condition=[=($0, $4)], joinType=[right])
++- LogicalFilter(condition=[AND(=($0, $5), >(func6($0, $5), 10))])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[right])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
]]>
@@ -537,7 +555,7 @@ Calc(select=[a], where=[>(f0, 10)])
+- Join(joinType=[InnerJoin], where=[=(a, a2)], select=[a, a2],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a])
- : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e])
+- Exchange(distribution=[hash[a2]])
+- Calc(select=[a2])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable2]], fields=[a2, b2, c2, d2])
@@ -550,10 +568,10 @@ Calc(select=[a], where=[>(f0, 10)])
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(a=[$0], a2=[$4])
-+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], a2=[$5], b2=[$6], c2=[$7],
d2=[$8])
- +- LogicalJoin(condition=[=($4, $9)], joinType=[right])
- :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], $f4=[func1($0)])
+LogicalProject(a=[$0], a2=[$5])
++- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], a2=[$6], b2=[$7],
c2=[$8], d2=[$9])
+ +- LogicalJoin(condition=[=($5, $10)], joinType=[right])
+ :- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
$f5=[func1($0)])
: +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+- LogicalProject(a2=[$0], b2=[$1], c2=[$2], d2=[$3], $f4=[func1($0)])
+- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
@@ -562,10 +580,10 @@ LogicalProject(a=[$0], a2=[$4])
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, a2])
-+- Join(joinType=[RightOuterJoin], where=[=($f4, $f40)], select=[a, $f4, a2,
$f40], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
- :- Exchange(distribution=[hash[$f4]])
- : +- AsyncCalc(select=[a, func1(a) AS $f4])
- : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d])
++- Join(joinType=[RightOuterJoin], where=[=($f5, $f4)], select=[a, $f5, a2,
$f4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[$f5]])
+ : +- AsyncCalc(select=[a, func1(a) AS $f5])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e])
+- Exchange(distribution=[hash[$f4]])
+- AsyncCalc(select=[a2, func1(a2) AS $f4])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable2]], fields=[a2, b2, c2, d2])
@@ -579,8 +597,8 @@ Calc(select=[a, a2])
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0])
-+- LogicalFilter(condition=[>(func6($0, $4), 10)])
- +- LogicalJoin(condition=[=($0, $4)], joinType=[right])
++- LogicalFilter(condition=[>(func6($0, $5), 10)])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[right])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
]]>
@@ -592,7 +610,7 @@ Calc(select=[a], where=[>(f0, 10)])
+- Join(joinType=[RightOuterJoin], where=[=(a, a2)], select=[a, a2],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a])
- : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e])
+- Exchange(distribution=[hash[a2]])
+- Calc(select=[a2])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable2]], fields=[a2, b2, c2, d2])
@@ -606,7 +624,7 @@ Calc(select=[a], where=[>(f0, 10)])
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0])
-+- LogicalJoin(condition=[AND(=($0, $4), >(func1($0), 10))], joinType=[right])
++- LogicalJoin(condition=[AND(=($0, $5), >(func1($0), 10))], joinType=[right])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
]]>
@@ -618,7 +636,7 @@ Calc(select=[a])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a], where=[>(f0, 10)])
: +- AsyncCalc(select=[a, func1(a) AS f0])
- : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e])
+- Exchange(distribution=[hash[a2]])
+- Calc(select=[a2])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable2]], fields=[a2, b2, c2, d2])
@@ -633,7 +651,7 @@ Calc(select=[a])
<![CDATA[
LogicalProject(a=[$0])
+- LogicalFilter(condition=[>(func1($0), 10)])
- +- LogicalJoin(condition=[=($0, $4)], joinType=[right])
+ +- LogicalJoin(condition=[=($0, $5)], joinType=[right])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalTableScan(table=[[default_catalog, default_database,
MyTable2]])
]]>
@@ -645,7 +663,7 @@ Calc(select=[a], where=[>(f0, 10)])
+- Join(joinType=[RightOuterJoin], where=[=(a, a2)], select=[a, a2],
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a])
- : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e])
+- Exchange(distribution=[hash[a2]])
+- Calc(select=[a2])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable2]], fields=[a2, b2, c2, d2])
@@ -665,7 +683,7 @@ LogicalProject(EXPR$0=[func4($3)])
<Resource name="optimized rel plan">
<![CDATA[
AsyncCalc(select=[func4(d) AS EXPR$0])
-+- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d, e])
]]>
</Resource>
</TestCase>
@@ -682,7 +700,7 @@ LogicalProject(EXPR$0=[func1($0)])
<Resource name="optimized rel plan">
<![CDATA[
AsyncCalc(select=[func1(a) AS EXPR$0])
-+- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d, e])
]]>
</Resource>
</TestCase>
@@ -703,7 +721,7 @@ AsyncCalc(select=[f0 AS EXPR$0, func1(f1) AS EXPR$1, f2 AS
EXPR$2])
+- AsyncCalc(select=[f2, f1, func1(f0) AS f0])
+- Calc(select=[f0, f0 AS f1, f0 AS f2])
+- AsyncCalc(select=[func1(a) AS f0])
- +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e])
]]>
</Resource>
</TestCase>
@@ -721,7 +739,7 @@ LogicalProject(EXPR$0=[func1($0)], EXPR$1=[func1($0)])
<![CDATA[
Calc(select=[f0 AS EXPR$0, f0 AS EXPR$1])
+- AsyncCalc(select=[func1(a) AS f0])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d, e])
]]>
</Resource>
</TestCase>
@@ -740,7 +758,7 @@ LogicalProject(a=[$0])
<![CDATA[
Calc(select=[a], where=[REGEXP(f0, 'string (2|3)')])
+- AsyncCalc(select=[a, func2(a) AS f0])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d, e])
]]>
</Resource>
</TestCase>
@@ -759,7 +777,7 @@ LogicalProject(EXPR$0=[func2($0)])
<![CDATA[
Calc(select=[f0 AS EXPR$0], where=[REGEXP(f0, 'val (2|3)')])
+- AsyncCalc(select=[func2(a) AS f0])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d, e])
]]>
</Resource>
</TestCase>
@@ -779,7 +797,7 @@ LogicalProject(blah=[$0])
<![CDATA[
Calc(select=[a], where=[REGEXP(f0, 'string (2|3)')])
+- AsyncCalc(select=[a, func2(a) AS f0])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d, e])
]]>
</Resource>
</TestCase>
@@ -798,7 +816,7 @@ LogicalProject(a=[$0])
<![CDATA[
Calc(select=[a], where=[>=(f0, 12)])
+- AsyncCalc(select=[a, func1(a) AS f0])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d, e])
]]>
</Resource>
</TestCase>
@@ -822,7 +840,7 @@ AsyncCalc(select=[func1(a) AS EXPR$0])
+- Join(joinType=[LeftAntiJoin], where=[OR(IS NULL(a), IS NULL(a2), =(a,
a2))], select=[a], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[single])
: +- Calc(select=[a])
- : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e])
+- Exchange(distribution=[single])
+- Calc(select=[a2])
+- TableSourceScan(table=[[default_catalog, default_database,
MyTable2]], fields=[a2, b2, c2, d2])
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.xml
index dcbe743d2c2..2cdbfbaf162 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AsyncCorrelateSplitRuleTest.xml
@@ -22,7 +22,7 @@ limitations under the License.
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], EXPR$0=[$4])
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], EXPR$0=[$5])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalTableFunctionScan(invocation=[tableFunc(func1($cor0.a))],
rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
@@ -30,10 +30,10 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], EXPR$0=[$4])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[a, b, c, d, EXPR$0])
-+- Correlate(invocation=[tableFunc($4)], correlate=[table(tableFunc(f0))],
select=[a,b,c,d,f0,EXPR$0], rowType=[RecordType(INTEGER a, BIGINT b,
VARCHAR(2147483647) c, INTEGER ARRAY d, INTEGER f0, VARCHAR(2147483647)
EXPR$0)], joinType=[INNER])
- +- AsyncCalc(select=[a, b, c, d, func1(a) AS f0])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d])
+Calc(select=[a, b, c, d, e, EXPR$0])
++- Correlate(invocation=[tableFunc($5)], correlate=[table(tableFunc(f0))],
select=[a,b,c,d,e,f0,EXPR$0], rowType=[RecordType(INTEGER a, BIGINT b,
VARCHAR(2147483647) c, INTEGER ARRAY d,
RecordType:peek_no_expand(RecordType:peek_no_expand(INTEGER h, DOUBLE i) f,
VARCHAR(2147483647) g) e, INTEGER f0, VARCHAR(2147483647) EXPR$0)],
joinType=[INNER])
+ +- AsyncCalc(select=[a, b, c, d, e, func1(a) AS f0])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d, e])
]]>
</Resource>
</TestCase>
@@ -43,7 +43,7 @@ Calc(select=[a, b, c, d, EXPR$0])
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], EXPR$0=[$4])
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], EXPR$0=[$5])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalTableFunctionScan(invocation=[tableFunc(ABS(func1($cor0.a)))],
rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
@@ -51,10 +51,10 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], EXPR$0=[$4])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[a, b, c, d, EXPR$0])
-+- Correlate(invocation=[tableFunc(ABS($4))],
correlate=[table(tableFunc(ABS(f0)))], select=[a,b,c,d,f0,EXPR$0],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER ARRAY
d, INTEGER f0, VARCHAR(2147483647) EXPR$0)], joinType=[INNER])
- +- AsyncCalc(select=[a, b, c, d, func1(a) AS f0])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d])
+Calc(select=[a, b, c, d, e, EXPR$0])
++- Correlate(invocation=[tableFunc(ABS($5))],
correlate=[table(tableFunc(ABS(f0)))], select=[a,b,c,d,e,f0,EXPR$0],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER ARRAY
d, RecordType:peek_no_expand(RecordType:peek_no_expand(INTEGER h, DOUBLE i) f,
VARCHAR(2147483647) g) e, INTEGER f0, VARCHAR(2147483647) EXPR$0)],
joinType=[INNER])
+ +- AsyncCalc(select=[a, b, c, d, e, func1(a) AS f0])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d, e])
]]>
</Resource>
</TestCase>
@@ -64,7 +64,7 @@ Calc(select=[a, b, c, d, EXPR$0])
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], EXPR$0=[$4])
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], EXPR$0=[$5])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalTableFunctionScan(invocation=[tableFunc(func1(ABS($cor0.a)))],
rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
@@ -72,11 +72,11 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], EXPR$0=[$4])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[a, b, c, d, EXPR$0])
-+- Correlate(invocation=[tableFunc($4)], correlate=[table(tableFunc(f0))],
select=[a,b,c,d,f0,EXPR$0], rowType=[RecordType(INTEGER a, BIGINT b,
VARCHAR(2147483647) c, INTEGER ARRAY d, INTEGER f0, VARCHAR(2147483647)
EXPR$0)], joinType=[INNER])
- +- AsyncCalc(select=[a, b, c, d, func1(f0) AS f0])
- +- Calc(select=[a, b, c, d, ABS(a) AS f0])
- +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d])
+Calc(select=[a, b, c, d, e, EXPR$0])
++- Correlate(invocation=[tableFunc($5)], correlate=[table(tableFunc(f0))],
select=[a,b,c,d,e,f0,EXPR$0], rowType=[RecordType(INTEGER a, BIGINT b,
VARCHAR(2147483647) c, INTEGER ARRAY d,
RecordType:peek_no_expand(RecordType:peek_no_expand(INTEGER h, DOUBLE i) f,
VARCHAR(2147483647) g) e, INTEGER f0, VARCHAR(2147483647) EXPR$0)],
joinType=[INNER])
+ +- AsyncCalc(select=[a, b, c, d, e, func1(f0) AS f0])
+ +- Calc(select=[a, b, c, d, e, ABS(a) AS f0])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e])
]]>
</Resource>
</TestCase>
@@ -86,7 +86,7 @@ Calc(select=[a, b, c, d, EXPR$0])
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], EXPR$0=[$4])
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], EXPR$0=[$5])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+-
LogicalTableFunctionScan(invocation=[asyncTableFunc(CAST(CAST($cor0.a):INTEGER):INTEGER)],
rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
@@ -94,10 +94,31 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], EXPR$0=[$4])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[a, b, c, d, EXPR$0])
-+- AsyncCorrelate(invocation=[asyncTableFunc($4)],
correlate=[table(asyncTableFunc(f0))], select=[a,b,c,d,f0,EXPR$0],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER ARRAY
d, INTEGER f0, VARCHAR(2147483647) EXPR$0)], joinType=[INNER])
- +- Calc(select=[a, b, c, d, a AS f0])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d])
+Calc(select=[a, b, c, d, e, EXPR$0])
++- AsyncCorrelate(invocation=[asyncTableFunc($5)],
correlate=[table(asyncTableFunc(f0))], select=[a,b,c,d,e,f0,EXPR$0],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER ARRAY
d, RecordType:peek_no_expand(RecordType:peek_no_expand(INTEGER h, DOUBLE i) f,
VARCHAR(2147483647) g) e, INTEGER f0, VARCHAR(2147483647) EXPR$0)],
joinType=[INNER])
+ +- Calc(select=[a, b, c, d, e, a AS f0])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d, e])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testCorrelateWithCompositeFieldAsInput">
+ <Resource name="sql">
+ <![CDATA[select * FROM MyTable, LATERAL TABLE(asyncTableFunc(e.f.h))]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], EXPR$0=[$5])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{4}])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalTableFunctionScan(invocation=[asyncTableFunc($cor0.e.f.h)],
rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, b, c, d, e, EXPR$0])
++- AsyncCorrelate(invocation=[asyncTableFunc($6)],
correlate=[table(asyncTableFunc(f1))], select=[a,b,c,d,e,f0,f1,EXPR$0],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER ARRAY
d, RecordType:peek_no_expand(RecordType:peek_no_expand(INTEGER h, DOUBLE i) f,
VARCHAR(2147483647) g) e, RecordType:peek_no_expand(INTEGER h, DOUBLE i) f0,
INTEGER f1, VARCHAR(2147483647) EXPR$0)], joinType=[INNER])
+ +- Calc(select=[a, b, c, d, e, e.f AS f0, e.f.h AS f1])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d, e])
]]>
</Resource>
</TestCase>
@@ -107,7 +128,7 @@ Calc(select=[a, b, c, d, EXPR$0])
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], EXPR$0=[$4])
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], EXPR$0=[$5])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalTableFunctionScan(invocation=[asyncTableFunc(scalar($cor0.a))],
rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
@@ -115,10 +136,10 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3],
EXPR$0=[$4])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[a, b, c, d, EXPR$0])
-+- AsyncCorrelate(invocation=[asyncTableFunc($4)],
correlate=[table(asyncTableFunc(f0))], select=[a,b,c,d,f0,EXPR$0],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER ARRAY
d, INTEGER f0, VARCHAR(2147483647) EXPR$0)], joinType=[INNER])
- +- Calc(select=[a, b, c, d, scalar(a) AS f0])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d])
+Calc(select=[a, b, c, d, e, EXPR$0])
++- AsyncCorrelate(invocation=[asyncTableFunc($5)],
correlate=[table(asyncTableFunc(f0))], select=[a,b,c,d,e,f0,EXPR$0],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER ARRAY
d, RecordType:peek_no_expand(RecordType:peek_no_expand(INTEGER h, DOUBLE i) f,
VARCHAR(2147483647) g) e, INTEGER f0, VARCHAR(2147483647) EXPR$0)],
joinType=[INNER])
+ +- Calc(select=[a, b, c, d, e, scalar(a) AS f0])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d, e])
]]>
</Resource>
</TestCase>
@@ -128,7 +149,7 @@ Calc(select=[a, b, c, d, EXPR$0])
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], EXPR$0=[$4])
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], EXPR$0=[$5])
+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0}])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalTableFunctionScan(invocation=[asyncTableFunc(ABS($cor0.a))],
rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
@@ -136,10 +157,10 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3],
EXPR$0=[$4])
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-Calc(select=[a, b, c, d, EXPR$0])
-+- AsyncCorrelate(invocation=[asyncTableFunc($4)],
correlate=[table(asyncTableFunc(f0))], select=[a,b,c,d,f0,EXPR$0],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER ARRAY
d, INTEGER f0, VARCHAR(2147483647) EXPR$0)], joinType=[INNER])
- +- Calc(select=[a, b, c, d, ABS(a) AS f0])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d])
+Calc(select=[a, b, c, d, e, EXPR$0])
++- AsyncCorrelate(invocation=[asyncTableFunc($5)],
correlate=[table(asyncTableFunc(f0))], select=[a,b,c,d,e,f0,EXPR$0],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER ARRAY
d, RecordType:peek_no_expand(RecordType:peek_no_expand(INTEGER h, DOUBLE i) f,
VARCHAR(2147483647) g) e, INTEGER f0, VARCHAR(2147483647) EXPR$0)],
joinType=[INNER])
+ +- Calc(select=[a, b, c, d, e, ABS(a) AS f0])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, d, e])
]]>
</Resource>
</TestCase>