This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 90c70e183440c6278ea31f7315cf718f339f6f59 Author: Timo Walther <[email protected]> AuthorDate: Thu Mar 11 12:22:26 2021 +0100 [FLINK-21725][table] Add tests for large tuples --- .../utils/JavaUserDefinedTableFunctions.java | 37 +++++++++++++++ .../planner/plan/stream/table/CorrelateTest.xml | 52 +++++++++++++++------- .../planner/plan/stream/table/CorrelateTest.scala | 16 +++++++ 3 files changed, 90 insertions(+), 15 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java index 3384257..f7f0d38 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java @@ -20,6 +20,7 @@ package org.apache.flink.table.planner.runtime.utils; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple12; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.functions.TableFunction; @@ -128,4 +129,40 @@ public class JavaUserDefinedTableFunctions { return false; } } + + /** Function with large tuple. */ + public static class JavaTableFuncTuple12 + extends TableFunction< + Tuple12< + String, + String, + String, + String, + String, + String, + Integer, + Integer, + Integer, + Integer, + Integer, + Integer>> { + private static final long serialVersionUID = -8258882510989374448L; + + public void eval(String str) { + collect( + Tuple12.of( + str + "_a", + str + "_b", + str + "_c", + str + "_d", + str + "_e", + str + "_f", + str.length(), + str.length() + 1, + str.length() + 2, + str.length() + 3, + str.length() + 4, + str.length() + 5)); + } + } } diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml index 1fc96bf..e5698c5 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml @@ -16,6 +16,43 @@ See the License for the specific language governing permissions and limitations under the License. --> <Root> + <TestCase name="testCorrelatePythonTableFunction"> + <Resource name="planBefore"> + <![CDATA[ +LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) +:- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) ++- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$MockPythonTableFunction$2db9292ec254f97ef03c249bd4e602fd($0, $1)], rowType=[RecordType(INTEGER x, INTEGER y)], elementType=[class [Ljava.lang.Object;]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +PythonCorrelate(invocation=[org$apache$flink$table$planner$utils$MockPythonTableFunction$2db9292ec254f97ef03c249bd4e602fd($0, $1)], correlate=[table(MockPythonTableFunction(a,b))], select=[a,b,c,x,y], rowType=[RecordType(INTEGER a, INTEGER b, VARCHAR(2147483647) c, INTEGER x, INTEGER y)], joinType=[INNER]) ++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +]]> + </Resource> + </TestCase> + <TestCase name="testCorrelateTuple12"> + <Resource name="sql"> + <![CDATA[ +SELECT * +FROM MyTable, LATERAL TABLE(func1(c)) AS T +]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], f0=[$3], f1=[$4], f2=[$5], f3=[$6], f4=[$7], f5=[$8], f6=[$9], f7=[$10], f8=[$11], f9=[$12], f10=[$13], f11=[$14]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}]) + :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) + +- LogicalTableFunctionScan(invocation=[func1($cor0.c)], rowType=[*org.apache.flink.api.java.tuple.Tuple12*]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Correlate(invocation=[func1($cor0.c)], correlate=[table(func1($cor0.c))], select=[a,b,c,f0,f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f11], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0, VARCHAR(2147483647) f1, VARCHAR(2147483647) f2, VARCHAR(2147483647) f3, VARCHAR(2147483647) f4, VARCHAR(2147483647) f5, INTEGER f6, INTEGER f7, INTEGER f8, INTEGER f9, INTEGER f10, INTEGER f11)], joinType=[INNER]) ++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +]]> + </Resource> + </TestCase> <TestCase name="testCorrelateWithMultiFilter"> <Resource name="planBefore"> <![CDATA[ @@ -206,19 +243,4 @@ Correlate(invocation=[org$apache$flink$table$planner$utils$PojoTableFunc$eb4ab6b ]]> </Resource> </TestCase> - <TestCase name="testCorrelatePythonTableFunction"> - <Resource name="planBefore"> - <![CDATA[ -LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) -:- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) -+- LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$MockPythonTableFunction$2db9292ec254f97ef03c249bd4e602fd($0, $1)], rowType=[RecordType(INTEGER x, INTEGER y)], elementType=[class [Ljava.lang.Object;]) -]]> - </Resource> - <Resource name="planAfter"> - <![CDATA[ -PythonCorrelate(invocation=[org$apache$flink$table$planner$utils$MockPythonTableFunction$2db9292ec254f97ef03c249bd4e602fd($0, $1)], correlate=[table(MockPythonTableFunction(a,b))], select=[a,b,c,x,y], rowType=[RecordType(INTEGER a, INTEGER b, VARCHAR(2147483647) c, INTEGER x, INTEGER y)], joinType=[INNER]) -+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) -]]> - </Resource> - </TestCase> </Root> diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala index 6875be7..fa18dc9 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala @@ -21,6 +21,7 @@ import org.apache.flink.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.planner.expressions.utils.Func13 import org.apache.flink.table.planner.plan.optimize.program.FlinkStreamProgram +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.JavaTableFuncTuple12 import org.apache.flink.table.planner.utils._ import org.apache.calcite.rel.rules.{CalcMergeRule, FilterCalcMergeRule, ProjectCalcMergeRule} @@ -188,4 +189,19 @@ class CorrelateTest extends TableTestBase { util.verifyPlan(result) } + + @Test + def testCorrelateTuple12(): Unit = { + val util = streamTestUtil() + util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c) + val function = new JavaTableFuncTuple12 + util.addTemporarySystemFunction("func1", function) + val sql = + """ + |SELECT * + |FROM MyTable, LATERAL TABLE(func1(c)) AS T + |""".stripMargin + + util.verifyPlan(sql) + } }
