This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f83e603 [FLINK-22954][table-planner] Rewrite Join on constant
TableFunctionScan to Correlate
f83e603 is described below
commit f83e60387a17bd927004891fe6dcc35dfddf0488
Author: 龙三 <[email protected]>
AuthorDate: Fri Jun 18 15:28:41 2021 +0800
[FLINK-22954][table-planner] Rewrite Join on constant TableFunctionScan to
Correlate
This closes #16192
---
.../JoinTableFunctionScanToCorrelateRule.java | 64 +++++++++++++
.../planner/plan/rules/FlinkStreamRuleSets.scala | 4 +-
...reamPhysicalConstantTableFunctionScanRule.scala | 2 +-
.../utils/JavaUserDefinedTableFunctions.java | 5 +-
.../planner/plan/stream/table/CorrelateTest.xml | 106 +++++++++++++++++++--
.../planner/plan/stream/table/CorrelateTest.scala | 58 ++++++++++-
.../runtime/stream/sql/CorrelateITCase.scala | 64 ++++++++++++-
7 files changed, 285 insertions(+), 18 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinTableFunctionScanToCorrelateRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinTableFunctionScanToCorrelateRule.java
new file mode 100644
index 0000000..0ce2e22
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinTableFunctionScanToCorrelateRule.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
+
+/**
+ * Rule that rewrites a {@link LogicalJoin} whose right input is an input-less {@link
+ * LogicalTableFunctionScan} into a correlate built from the same two inputs.
+ *
+ * <p>The correlate keeps the join type of the matched join. A correlate carries no predicate of
+ * its own, so a join condition that is not always true is re-applied as a filter on top of the
+ * correlate for INNER joins. For any other join type such a condition cannot be expressed as a
+ * filter above the correlate, so the rule does not match and the join is left untouched.
+ */
+public class JoinTableFunctionScanToCorrelateRule extends RelRule<RelRule.Config> {
+
+    // Operand tree: LogicalJoin(any RelNode, LogicalTableFunctionScan without inputs).
+    private static final Config RULE_CONFIG =
+            Config.EMPTY
+                    .withOperandSupplier(
+                            b0 ->
+                                    b0.operand(LogicalJoin.class)
+                                            .inputs(
+                                                    b1 -> b1.operand(RelNode.class).anyInputs(),
+                                                    b2 ->
+                                                            b2.operand(
+                                                                            LogicalTableFunctionScan
+                                                                                    .class)
+                                                                    .noInputs()))
+                    .withDescription("JoinTableFunctionScanToCorrelateRule");
+
+    public static final JoinTableFunctionScanToCorrelateRule INSTANCE =
+            new JoinTableFunctionScanToCorrelateRule();
+
+    private JoinTableFunctionScanToCorrelateRule() {
+        super(RULE_CONFIG);
+    }
+
+    /**
+     * Accepts the join only when its condition can be preserved by the rewrite: either the
+     * condition is always true (nothing to preserve) or the join is an INNER join (the condition
+     * can be re-applied as a filter above the correlate).
+     */
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        LogicalJoin join = call.rel(0);
+        return join.getCondition().isAlwaysTrue()
+                || join.getJoinType() == org.apache.calcite.rel.core.JoinRelType.INNER;
+    }
+
+    /**
+     * Replaces the matched join by a correlate over the join's left input and the table function
+     * scan, followed by a filter holding the join condition when that condition is not trivial.
+     */
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        LogicalJoin join = call.rel(0);
+        RelNode leftInput = call.rel(1);
+        LogicalTableFunctionScan logicalTableFunctionScan = call.rel(2);
+        RelNode correlate =
+                call.builder()
+                        .push(leftInput)
+                        .push(logicalTableFunctionScan)
+                        .correlate(join.getJoinType(), join.getCluster().createCorrel())
+                        .build();
+        if (!join.getCondition().isAlwaysTrue()) {
+            // Only reachable for INNER joins (see matches). The correlate outputs the left
+            // fields followed by the function's fields, which is the layout the join condition
+            // refers to, so the condition can be applied unchanged.
+            correlate = call.builder().push(correlate).filter(join.getCondition()).build();
+        }
+        call.transformTo(correlate);
+    }
+}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index 0039c33..ae7b0e3 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -137,7 +137,9 @@ object FlinkStreamRuleSets {
// optimize limit 0
FlinkLimit0RemoveRule.INSTANCE,
// unnest rule
- LogicalUnnestRule.INSTANCE
+ LogicalUnnestRule.INSTANCE,
+ // rewrite constant table function scan to correlate
+ JoinTableFunctionScanToCorrelateRule.INSTANCE
)
).asJava)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.scala
index 3ebf076..042635d 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.scala
@@ -51,7 +51,7 @@ class StreamPhysicalConstantTableFunctionScanRule
override def matches(call: RelOptRuleCall): Boolean = {
val scan: FlinkLogicalTableFunctionScan = call.rel(0)
- RexUtil.isConstant(scan.getCall) && scan.getInputs.isEmpty
+ !RexUtil.containsInputRef(scan.getCall) && scan.getInputs.isEmpty
}
override def onMatch(call: RelOptRuleCall): Unit = {
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
index f7f0d38..95f6e51 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
@@ -118,10 +118,7 @@ public class JavaUserDefinedTableFunctions {
public void eval(String str) {
String[] values = str.split("#");
- int endIndex = random.nextInt(values.length);
- for (int i = 0; i < endIndex; ++i) {
- collect(values[i]);
- }
+ collect(values[random.nextInt(values.length)]);
}
@Override
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml
index 91964cc..4ab3fda 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml
@@ -164,6 +164,46 @@ Calc(select=[c, name, len])
]]>
</Resource>
</TestCase>
+ <TestCase name="testFlatMap">
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(f0=[AS($3, _UTF-16LE'f0')], f1=[AS($4, _UTF-16LE'f1')])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{}])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(f1, f2, f3)]]])
+ +- LogicalProject(f0=[$0], f1_0=[$1])
+ +-
LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc2$e9d8fd0c1a1f8cddfbbd46c86e136247($2)],
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0, INTEGER f1)])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[f0, f10 AS f1])
++-
Correlate(invocation=[org$apache$flink$table$planner$utils$TableFunc2$e9d8fd0c1a1f8cddfbbd46c86e136247($2)],
correlate=[table(org$apache$flink$table$planner$utils$TableFunc2$e9d8fd0c1a1f8cddfbbd46c86e136247(f3))],
select=[f1,f2,f3,f0,f10], rowType=[RecordType(INTEGER f1, BIGINT f2,
VARCHAR(2147483647) f3, VARCHAR(2147483647) f0, INTEGER f10)], joinType=[INNER])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(f1, f2, f3)]]], fields=[f1, f2, f3])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testInnerJoinConstantNonDeterministicFunction">
+ <Resource name="sql">
+ <![CDATA[
+SELECT *
+FROM MyTable, LATERAL TABLE(str_split('Jack,John')) AS T
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], EXPR$0=[$3])
++- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableFunctionScan(invocation=[str_split(_UTF-16LE'Jack,John')],
rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Correlate(invocation=[str_split(_UTF-16LE'Jack,John')],
correlate=[table(str_split(_UTF-16LE'Jack,John'))], select=[a,b,c,EXPR$0],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c,
VARCHAR(2147483647) EXPR$0)], joinType=[INNER])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testHierarchyType">
<Resource name="ast">
<![CDATA[
@@ -179,21 +219,69 @@
Correlate(invocation=[org$apache$flink$table$planner$utils$HierarchyTableFunctio
]]>
</Resource>
</TestCase>
- <TestCase name="testFlatMap">
+ <TestCase name="testInnerJoinConstantFunction">
+ <Resource name="sql">
+ <![CDATA[
+SELECT *
+FROM MyTable, LATERAL TABLE(str_split('Jack,John', ',')) AS T
+]]>
+ </Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(f0=[AS($3, _UTF-16LE'f0')], f1=[AS($4, _UTF-16LE'f1')])
-+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{}])
- :- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(f1, f2, f3)]]])
- +- LogicalProject(f0=[$0], f1_0=[$1])
- +-
LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc2$e9d8fd0c1a1f8cddfbbd46c86e136247($2)],
rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0, INTEGER f1)])
+LogicalProject(a=[$0], b=[$1], c=[$2], EXPR$0=[$3])
++- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableFunctionScan(invocation=[str_split(_UTF-16LE'Jack,John',
_UTF-16LE',')], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[f0, f10 AS f1])
-+-
Correlate(invocation=[org$apache$flink$table$planner$utils$TableFunc2$e9d8fd0c1a1f8cddfbbd46c86e136247($2)],
correlate=[table(org$apache$flink$table$planner$utils$TableFunc2$e9d8fd0c1a1f8cddfbbd46c86e136247(f3))],
select=[f1,f2,f3,f0,f10], rowType=[RecordType(INTEGER f1, BIGINT f2,
VARCHAR(2147483647) f3, VARCHAR(2147483647) f0, INTEGER f10)], joinType=[INNER])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(f1, f2, f3)]]], fields=[f1, f2, f3])
+Correlate(invocation=[str_split(_UTF-16LE'Jack,John', _UTF-16LE',')],
correlate=[table(str_split(_UTF-16LE'Jack,John',_UTF-16LE','))],
select=[a,b,c,EXPR$0], rowType=[RecordType(INTEGER a, BIGINT b,
VARCHAR(2147483647) c, VARCHAR(2147483647) EXPR$0)], joinType=[INNER])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftJoinConstantNonDeterministicFunction">
+ <Resource name="sql">
+ <![CDATA[
+SELECT *
+FROM MyTable Left JOIN LATERAL TABLE(str_split('Jack,John')) AS T ON TRUE
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], EXPR$0=[$3])
++- LogicalJoin(condition=[true], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableFunctionScan(invocation=[str_split(_UTF-16LE'Jack,John')],
rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Correlate(invocation=[str_split(_UTF-16LE'Jack,John')],
correlate=[table(str_split(_UTF-16LE'Jack,John'))], select=[a,b,c,EXPR$0],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c,
VARCHAR(2147483647) EXPR$0)], joinType=[LEFT])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftJoinConstantFunction">
+ <Resource name="sql">
+ <![CDATA[
+SELECT *
+FROM MyTable Left JOIN LATERAL TABLE(str_split('Jack,John', ',')) AS T ON TRUE
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], EXPR$0=[$3])
++- LogicalJoin(condition=[true], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]])
+ +- LogicalTableFunctionScan(invocation=[str_split(_UTF-16LE'Jack,John',
_UTF-16LE',')], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Correlate(invocation=[str_split(_UTF-16LE'Jack,John', _UTF-16LE',')],
correlate=[table(str_split(_UTF-16LE'Jack,John',_UTF-16LE','))],
select=[a,b,c,EXPR$0], rowType=[RecordType(INTEGER a, BIGINT b,
VARCHAR(2147483647) c, VARCHAR(2147483647) EXPR$0)], joinType=[LEFT])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala
index 984ba42..339e7ca 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala
@@ -21,7 +21,7 @@ import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.planner.expressions.utils.Func13
import org.apache.flink.table.planner.plan.optimize.program.FlinkStreamProgram
-import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.JavaTableFuncTuple12
+import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.{JavaTableFuncTuple12,
NonDeterministicTableFunc, StringSplit}
import org.apache.flink.table.planner.utils._
import org.apache.calcite.rel.rules.CoreRules
@@ -201,4 +201,60 @@ class CorrelateTest extends TableTestBase {
util.verifyExecPlan(sql)
}
+
+  /** Verifies the exec plan of an inner lateral join on `str_split` called with literals only. */
+  @Test
+  def testInnerJoinConstantFunction(): Unit = {
+    val planTester = streamTestUtil()
+    planTester.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    planTester.addTemporarySystemFunction("str_split", new StringSplit())
+
+    planTester.verifyExecPlan(
+      """
+        |SELECT *
+        |FROM MyTable, LATERAL TABLE(str_split('Jack,John', ',')) AS T
+        |""".stripMargin)
+  }
+
+  /** Verifies the exec plan of a left lateral join on `str_split` called with literals only. */
+  @Test
+  def testLeftJoinConstantFunction(): Unit = {
+    val planTester = streamTestUtil()
+    planTester.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    planTester.addTemporarySystemFunction("str_split", new StringSplit())
+
+    planTester.verifyExecPlan(
+      """
+        |SELECT *
+        |FROM MyTable Left JOIN LATERAL TABLE(str_split('Jack,John', ',')) AS T ON TRUE
+        |""".stripMargin)
+  }
+
+  /**
+   * Verifies the exec plan of an inner lateral join on a literal-only call when `str_split` is
+   * backed by [[NonDeterministicTableFunc]].
+   */
+  @Test
+  def testInnerJoinConstantNonDeterministicFunction(): Unit = {
+    val planTester = streamTestUtil()
+    planTester.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    planTester.addTemporarySystemFunction("str_split", new NonDeterministicTableFunc())
+
+    planTester.verifyExecPlan(
+      """
+        |SELECT *
+        |FROM MyTable, LATERAL TABLE(str_split('Jack,John')) AS T
+        |""".stripMargin)
+  }
+
+  /**
+   * Verifies the exec plan of a left lateral join on a literal-only call when `str_split` is
+   * backed by [[NonDeterministicTableFunc]].
+   */
+  @Test
+  def testLeftJoinConstantNonDeterministicFunction(): Unit = {
+    val planTester = streamTestUtil()
+    planTester.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    planTester.addTemporarySystemFunction("str_split", new NonDeterministicTableFunc())
+
+    planTester.verifyExecPlan(
+      """
+        |SELECT *
+        |FROM MyTable Left JOIN LATERAL TABLE(str_split('Jack,John')) AS T ON TRUE
+        |""".stripMargin)
+  }
}
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala
index 9d73e5d..028678f 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala
@@ -22,12 +22,12 @@ import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api.internal.TableEnvironmentInternal
import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.UdfWithOpen
-import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.StringSplit
+import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.{NonDeterministicTableFunc,
StringSplit}
import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase,
TestSinkUtil, TestingAppendSink, TestingAppendTableSink}
import org.apache.flink.table.planner.utils.{RF, TableFunc7}
import org.apache.flink.types.Row
-import org.junit.Assert.assertEquals
+import org.junit.Assert.{assertEquals, assertTrue}
import org.junit.{Before, Test}
import java.lang.{Boolean => JBoolean}
@@ -110,6 +110,66 @@ class CorrelateITCase extends StreamingTestBase {
}
@Test
+  def testConstantTableFunc3(): Unit = {
+    // Each of the three source rows must be paired with both tokens of the literal-only call.
+    val rows = List((1, 2, "abc-bcd"), (1, 2, "hhh"), (1, 2, "xxx"))
+    tEnv.registerTable("T1", env.fromCollection(rows).toTable(tEnv, 'a, 'b, 'c))
+    tEnv.registerFunction("str_split", new StringSplit())
+
+    val appendSink = new TestingAppendSink
+    tEnv
+      .sqlQuery("SELECT * FROM T1, LATERAL TABLE(str_split('Jack,John', ',')) as T0(d)")
+      .toAppendStream[Row]
+      .addSink(appendSink)
+    env.execute()
+
+    // Cross product of the source rows' third column with the two split tokens.
+    val expected = for {
+      text <- List("abc-bcd", "hhh", "xxx")
+      token <- List("Jack", "John")
+    } yield s"1,2,$text,$token"
+    assertEquals(expected.sorted, appendSink.getAppendResults.sorted)
+  }
+
+  /**
+   * Runs a literal-only call of [[NonDeterministicTableFunc]] without any joined table and expects
+   * exactly one row holding either of the two '#'-separated tokens.
+   */
+  @Test
+  def testConstantNonDeterministicTableFunc(): Unit = {
+    tEnv.registerFunction("str_split", new NonDeterministicTableFunc())
+
+    val appendSink = new TestingAppendSink
+    tEnv
+      .sqlQuery("SELECT * FROM LATERAL TABLE(str_split('Jack#John')) as T0(d)")
+      .toAppendStream[Row]
+      .addSink(appendSink)
+    env.execute()
+
+    val emitted = appendSink.getAppendResults
+    assertEquals(1, emitted.size)
+    val onlyRow = emitted.head
+    assertTrue(onlyRow == "Jack" || onlyRow == "John")
+  }
+
+  /**
+   * Joins three rows with a literal-only call of [[NonDeterministicTableFunc]] and checks that
+   * every source row shows up once, extended by one of the two '#'-separated tokens.
+   */
+  @Test
+  def testConstantNonDeterministicTableFunc2(): Unit = {
+    val data = List(
+      (1, 2, "abc-bcd"),
+      (1, 2, "hhh"),
+      (1, 2, "xxx"))
+
+    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+    tEnv.registerTable("T1", t1)
+
+    tEnv.registerFunction("str_split", new NonDeterministicTableFunc())
+    val query = "SELECT * FROM T1, LATERAL TABLE(str_split('Jack#John')) as T0(d)"
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(query).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val res = sink.getAppendResults
+    assertEquals(3, res.size)
+
+    // The appended token is picked at random, so the exact rows cannot be pinned. Check the
+    // token against the allowed values and the left columns against the source rows instead
+    // of relying on the row count alone.
+    val allowedSuffixes = List(",Jack", ",John")
+    res.foreach { row =>
+      assertTrue(s"unexpected row: $row", allowedSuffixes.exists(suffix => row.endsWith(suffix)))
+    }
+    // Both suffixes are five characters long, so dropping them leaves the source columns.
+    assertEquals(
+      List("1,2,abc-bcd", "1,2,hhh", "1,2,xxx"),
+      res.map(_.dropRight(5)).sorted)
+  }
+
+ @Test
def testUdfIsOpenedAfterUdtf(): Unit = {
val data = List(
(1, 2, "abc-bcd"),