This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 90c70e183440c6278ea31f7315cf718f339f6f59
Author: Timo Walther <[email protected]>
AuthorDate: Thu Mar 11 12:22:26 2021 +0100

    [FLINK-21725][table] Add tests for large tuples
---
 .../utils/JavaUserDefinedTableFunctions.java       | 37 +++++++++++++++
 .../planner/plan/stream/table/CorrelateTest.xml    | 52 +++++++++++++++-------
 .../planner/plan/stream/table/CorrelateTest.scala  | 16 +++++++
 3 files changed, 90 insertions(+), 15 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
index 3384257..f7f0d38 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.runtime.utils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple12;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.functions.TableFunction;
 
@@ -128,4 +129,40 @@ public class JavaUserDefinedTableFunctions {
             return false;
         }
     }
+
+    /** Function with large tuple. */
+    public static class JavaTableFuncTuple12
+            extends TableFunction<
+                    Tuple12<
+                            String,
+                            String,
+                            String,
+                            String,
+                            String,
+                            String,
+                            Integer,
+                            Integer,
+                            Integer,
+                            Integer,
+                            Integer,
+                            Integer>> {
+        private static final long serialVersionUID = -8258882510989374448L;
+
+        public void eval(String str) {
+            collect(
+                    Tuple12.of(
+                            str + "_a",
+                            str + "_b",
+                            str + "_c",
+                            str + "_d",
+                            str + "_e",
+                            str + "_f",
+                            str.length(),
+                            str.length() + 1,
+                            str.length() + 2,
+                            str.length() + 3,
+                            str.length() + 4,
+                            str.length() + 5));
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml
index 1fc96bf..e5698c5 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml
@@ -16,6 +16,43 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 -->
 <Root>
+  <TestCase name="testCorrelatePythonTableFunction">
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
+:- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
++- 
LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$MockPythonTableFunction$2db9292ec254f97ef03c249bd4e602fd($0,
 $1)], rowType=[RecordType(INTEGER x, INTEGER y)], elementType=[class 
[Ljava.lang.Object;])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+PythonCorrelate(invocation=[org$apache$flink$table$planner$utils$MockPythonTableFunction$2db9292ec254f97ef03c249bd4e602fd($0,
 $1)], correlate=[table(MockPythonTableFunction(a,b))], select=[a,b,c,x,y], 
rowType=[RecordType(INTEGER a, INTEGER b, VARCHAR(2147483647) c, INTEGER x, 
INTEGER y)], joinType=[INNER])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCorrelateTuple12">
+    <Resource name="sql">
+      <![CDATA[
+SELECT *
+FROM MyTable, LATERAL TABLE(func1(c)) AS T
+]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], f0=[$3], f1=[$4], f2=[$5], f3=[$6], 
f4=[$7], f5=[$8], f6=[$9], f7=[$10], f8=[$11], f9=[$12], f10=[$13], f11=[$14])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{2}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableFunctionScan(invocation=[func1($cor0.c)], 
rowType=[*org.apache.flink.api.java.tuple.Tuple12*])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Correlate(invocation=[func1($cor0.c)], correlate=[table(func1($cor0.c))], 
select=[a,b,c,f0,f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f11], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0, VARCHAR(2147483647) f1, VARCHAR(2147483647) f2, 
VARCHAR(2147483647) f3, VARCHAR(2147483647) f4, VARCHAR(2147483647) f5, INTEGER 
f6, INTEGER f7, INTEGER f8, INTEGER f9, INTEGER f10, INTEGER f11)], 
joinType=[INNER])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testCorrelateWithMultiFilter">
     <Resource name="planBefore">
       <![CDATA[
@@ -206,19 +243,4 @@ 
Correlate(invocation=[org$apache$flink$table$planner$utils$PojoTableFunc$eb4ab6b
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testCorrelatePythonTableFunction">
-    <Resource name="planBefore">
-         <![CDATA[
-LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
-:- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
-+- 
LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$MockPythonTableFunction$2db9292ec254f97ef03c249bd4e602fd($0,
 $1)], rowType=[RecordType(INTEGER x, INTEGER y)], elementType=[class 
[Ljava.lang.Object;])
-]]>
-         </Resource>
-    <Resource name="planAfter">
-         <![CDATA[
-PythonCorrelate(invocation=[org$apache$flink$table$planner$utils$MockPythonTableFunction$2db9292ec254f97ef03c249bd4e602fd($0,
 $1)], correlate=[table(MockPythonTableFunction(a,b))], select=[a,b,c,x,y], 
rowType=[RecordType(INTEGER a, INTEGER b, VARCHAR(2147483647) c, INTEGER x, 
INTEGER y)], joinType=[INNER])
-+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-]]>
-       </Resource>
-  </TestCase>
 </Root>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala
index 6875be7..fa18dc9 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala
@@ -21,6 +21,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
 import org.apache.flink.table.planner.expressions.utils.Func13
 import org.apache.flink.table.planner.plan.optimize.program.FlinkStreamProgram
+import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.JavaTableFuncTuple12
 import org.apache.flink.table.planner.utils._
 
 import org.apache.calcite.rel.rules.{CalcMergeRule, FilterCalcMergeRule, 
ProjectCalcMergeRule}
@@ -188,4 +189,19 @@ class CorrelateTest extends TableTestBase {
 
     util.verifyPlan(result)
   }
+
+  @Test
+  def testCorrelateTuple12(): Unit = {
+    val util = streamTestUtil()
+    util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val function = new JavaTableFuncTuple12
+    util.addTemporarySystemFunction("func1", function)
+    val sql =
+      """
+        |SELECT *
+        |FROM MyTable, LATERAL TABLE(func1(c)) AS T
+        |""".stripMargin
+
+    util.verifyPlan(sql)
+  }
 }

Reply via email to