This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f83e603  [FLINK-22954][table-planner] Rewrite Join on constant 
TableFunctionScan to Correlate
f83e603 is described below

commit f83e60387a17bd927004891fe6dcc35dfddf0488
Author: 龙三 <[email protected]>
AuthorDate: Fri Jun 18 15:28:41 2021 +0800

    [FLINK-22954][table-planner] Rewrite Join on constant TableFunctionScan to 
Correlate
    
    This closes #16192
---
 .../JoinTableFunctionScanToCorrelateRule.java      |  64 +++++++++++++
 .../planner/plan/rules/FlinkStreamRuleSets.scala   |   4 +-
 ...reamPhysicalConstantTableFunctionScanRule.scala |   2 +-
 .../utils/JavaUserDefinedTableFunctions.java       |   5 +-
 .../planner/plan/stream/table/CorrelateTest.xml    | 106 +++++++++++++++++++--
 .../planner/plan/stream/table/CorrelateTest.scala  |  58 ++++++++++-
 .../runtime/stream/sql/CorrelateITCase.scala       |  64 ++++++++++++-
 7 files changed, 285 insertions(+), 18 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinTableFunctionScanToCorrelateRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinTableFunctionScanToCorrelateRule.java
new file mode 100644
index 0000000..0ce2e22
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinTableFunctionScanToCorrelateRule.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
+
+/** Rule that rewrites Join on TableFunctionScan to Correlate. */
+public class JoinTableFunctionScanToCorrelateRule extends 
RelRule<RelRule.Config> {
+
+    private static final Config RULE_CONFIG =
+            Config.EMPTY
+                    .withOperandSupplier(
+                            b0 ->
+                                    b0.operand(LogicalJoin.class)
+                                            .inputs(
+                                                    b1 -> 
b1.operand(RelNode.class).anyInputs(),
+                                                    b2 ->
+                                                            b2.operand(
+                                                                            
LogicalTableFunctionScan
+                                                                               
     .class)
+                                                                    
.noInputs()))
+                    .withDescription("JoinTableFunctionScanToCorrelateRule");
+
+    public static final JoinTableFunctionScanToCorrelateRule INSTANCE =
+            new JoinTableFunctionScanToCorrelateRule();
+
+    private JoinTableFunctionScanToCorrelateRule() {
+        super(RULE_CONFIG);
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        LogicalJoin join = call.rel(0);
+        RelNode leftInput = call.rel(1);
+        LogicalTableFunctionScan logicalTableFunctionScan = call.rel(2);
+        RelNode correlate =
+                call.builder()
+                        .push(leftInput)
+                        .push(logicalTableFunctionScan)
+                        .correlate(join.getJoinType(), 
join.getCluster().createCorrel())
+                        .build();
+        call.transformTo(correlate);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index 0039c33..ae7b0e3 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -137,7 +137,9 @@ object FlinkStreamRuleSets {
         // optimize limit 0
         FlinkLimit0RemoveRule.INSTANCE,
         // unnest rule
-        LogicalUnnestRule.INSTANCE
+        LogicalUnnestRule.INSTANCE,
+        // rewrite constant table function scan to correlate
+        JoinTableFunctionScanToCorrelateRule.INSTANCE
       )
     ).asJava)
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.scala
index 3ebf076..042635d 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.scala
@@ -51,7 +51,7 @@ class StreamPhysicalConstantTableFunctionScanRule
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val scan: FlinkLogicalTableFunctionScan = call.rel(0)
-    RexUtil.isConstant(scan.getCall) && scan.getInputs.isEmpty
+    !RexUtil.containsInputRef(scan.getCall) && scan.getInputs.isEmpty
   }
 
   override def onMatch(call: RelOptRuleCall): Unit = {
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
index f7f0d38..95f6e51 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
@@ -118,10 +118,7 @@ public class JavaUserDefinedTableFunctions {
 
         public void eval(String str) {
             String[] values = str.split("#");
-            int endIndex = random.nextInt(values.length);
-            for (int i = 0; i < endIndex; ++i) {
-                collect(values[i]);
-            }
+            collect(values[random.nextInt(values.length)]);
         }
 
         @Override
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml
index 91964cc..4ab3fda 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml
@@ -164,6 +164,46 @@ Calc(select=[c, name, len])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testFlatMap">
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(f0=[AS($3, _UTF-16LE'f0')], f1=[AS($4, _UTF-16LE'f1')])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(f1, f2, f3)]]])
+   +- LogicalProject(f0=[$0], f1_0=[$1])
+      +- 
LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc2$e9d8fd0c1a1f8cddfbbd46c86e136247($2)],
 rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0, INTEGER f1)])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[f0, f10 AS f1])
++- 
Correlate(invocation=[org$apache$flink$table$planner$utils$TableFunc2$e9d8fd0c1a1f8cddfbbd46c86e136247($2)],
 
correlate=[table(org$apache$flink$table$planner$utils$TableFunc2$e9d8fd0c1a1f8cddfbbd46c86e136247(f3))],
 select=[f1,f2,f3,f0,f10], rowType=[RecordType(INTEGER f1, BIGINT f2, 
VARCHAR(2147483647) f3, VARCHAR(2147483647) f0, INTEGER f10)], joinType=[INNER])
+   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(f1, f2, f3)]]], fields=[f1, f2, f3])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInnerJoinConstantNonDeterministicFunction">
+    <Resource name="sql">
+      <![CDATA[
+SELECT *
+FROM MyTable, LATERAL TABLE(str_split('Jack,John')) AS T
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], EXPR$0=[$3])
++- LogicalJoin(condition=[true], joinType=[inner])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableFunctionScan(invocation=[str_split(_UTF-16LE'Jack,John')], 
rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Correlate(invocation=[str_split(_UTF-16LE'Jack,John')], 
correlate=[table(str_split(_UTF-16LE'Jack,John'))], select=[a,b,c,EXPR$0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) EXPR$0)], joinType=[INNER])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testHierarchyType">
     <Resource name="ast">
       <![CDATA[
@@ -179,21 +219,69 @@ 
Correlate(invocation=[org$apache$flink$table$planner$utils$HierarchyTableFunctio
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testFlatMap">
+  <TestCase name="testInnerJoinConstantFunction">
+    <Resource name="sql">
+      <![CDATA[
+SELECT *
+FROM MyTable, LATERAL TABLE(str_split('Jack,John', ',')) AS T
+]]>
+    </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(f0=[AS($3, _UTF-16LE'f0')], f1=[AS($4, _UTF-16LE'f1')])
-+- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{}])
-   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(f1, f2, f3)]]])
-   +- LogicalProject(f0=[$0], f1_0=[$1])
-      +- 
LogicalTableFunctionScan(invocation=[org$apache$flink$table$planner$utils$TableFunc2$e9d8fd0c1a1f8cddfbbd46c86e136247($2)],
 rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0, INTEGER f1)])
+LogicalProject(a=[$0], b=[$1], c=[$2], EXPR$0=[$3])
++- LogicalJoin(condition=[true], joinType=[inner])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableFunctionScan(invocation=[str_split(_UTF-16LE'Jack,John', 
_UTF-16LE',')], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[f0, f10 AS f1])
-+- 
Correlate(invocation=[org$apache$flink$table$planner$utils$TableFunc2$e9d8fd0c1a1f8cddfbbd46c86e136247($2)],
 
correlate=[table(org$apache$flink$table$planner$utils$TableFunc2$e9d8fd0c1a1f8cddfbbd46c86e136247(f3))],
 select=[f1,f2,f3,f0,f10], rowType=[RecordType(INTEGER f1, BIGINT f2, 
VARCHAR(2147483647) f3, VARCHAR(2147483647) f0, INTEGER f10)], joinType=[INNER])
-   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(f1, f2, f3)]]], fields=[f1, f2, f3])
+Correlate(invocation=[str_split(_UTF-16LE'Jack,John', _UTF-16LE',')], 
correlate=[table(str_split(_UTF-16LE'Jack,John',_UTF-16LE','))], 
select=[a,b,c,EXPR$0], rowType=[RecordType(INTEGER a, BIGINT b, 
VARCHAR(2147483647) c, VARCHAR(2147483647) EXPR$0)], joinType=[INNER])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftJoinConstantNonDeterministicFunction">
+    <Resource name="sql">
+      <![CDATA[
+SELECT *
+FROM MyTable Left JOIN LATERAL TABLE(str_split('Jack,John')) AS T ON TRUE
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], EXPR$0=[$3])
++- LogicalJoin(condition=[true], joinType=[left])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableFunctionScan(invocation=[str_split(_UTF-16LE'Jack,John')], 
rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Correlate(invocation=[str_split(_UTF-16LE'Jack,John')], 
correlate=[table(str_split(_UTF-16LE'Jack,John'))], select=[a,b,c,EXPR$0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) EXPR$0)], joinType=[LEFT])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftJoinConstantFunction">
+    <Resource name="sql">
+      <![CDATA[
+SELECT *
+FROM MyTable Left JOIN LATERAL TABLE(str_split('Jack,John', ',')) AS T ON TRUE
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], EXPR$0=[$3])
++- LogicalJoin(condition=[true], joinType=[left])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableFunctionScan(invocation=[str_split(_UTF-16LE'Jack,John', 
_UTF-16LE',')], rowType=[RecordType(VARCHAR(2147483647) EXPR$0)])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Correlate(invocation=[str_split(_UTF-16LE'Jack,John', _UTF-16LE',')], 
correlate=[table(str_split(_UTF-16LE'Jack,John',_UTF-16LE','))], 
select=[a,b,c,EXPR$0], rowType=[RecordType(INTEGER a, BIGINT b, 
VARCHAR(2147483647) c, VARCHAR(2147483647) EXPR$0)], joinType=[LEFT])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala
index 984ba42..339e7ca 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala
@@ -21,7 +21,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
 import org.apache.flink.table.planner.expressions.utils.Func13
 import org.apache.flink.table.planner.plan.optimize.program.FlinkStreamProgram
-import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.JavaTableFuncTuple12
+import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.{JavaTableFuncTuple12,
 NonDeterministicTableFunc, StringSplit}
 import org.apache.flink.table.planner.utils._
 
 import org.apache.calcite.rel.rules.CoreRules
@@ -201,4 +201,60 @@ class CorrelateTest extends TableTestBase {
 
     util.verifyExecPlan(sql)
   }
+
+  @Test
+  def testInnerJoinConstantFunction(): Unit = {
+    val util = streamTestUtil()
+    util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addTemporarySystemFunction("str_split", new StringSplit())
+    val sql =
+      """
+        |SELECT *
+        |FROM MyTable, LATERAL TABLE(str_split('Jack,John', ',')) AS T
+        |""".stripMargin
+
+    util.verifyExecPlan(sql)
+  }
+
+  @Test
+  def testLeftJoinConstantFunction(): Unit = {
+    val util = streamTestUtil()
+    util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addTemporarySystemFunction("str_split", new StringSplit())
+    val sql =
+      """
+        |SELECT *
+        |FROM MyTable Left JOIN LATERAL TABLE(str_split('Jack,John', ',')) AS 
T ON TRUE
+        |""".stripMargin
+
+    util.verifyExecPlan(sql)
+  }
+
+  @Test
+  def testInnerJoinConstantNonDeterministicFunction(): Unit = {
+    val util = streamTestUtil()
+    util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addTemporarySystemFunction("str_split", new 
NonDeterministicTableFunc())
+    val sql =
+      """
+        |SELECT *
+        |FROM MyTable, LATERAL TABLE(str_split('Jack,John')) AS T
+        |""".stripMargin
+
+    util.verifyExecPlan(sql)
+  }
+
+  @Test
+  def testLeftJoinConstantNonDeterministicFunction(): Unit = {
+    val util = streamTestUtil()
+    util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addTemporarySystemFunction("str_split", new 
NonDeterministicTableFunc())
+    val sql =
+      """
+        |SELECT *
+        |FROM MyTable Left JOIN LATERAL TABLE(str_split('Jack,John')) AS T ON 
TRUE
+        |""".stripMargin
+
+    util.verifyExecPlan(sql)
+  }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala
index 9d73e5d..028678f 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CorrelateITCase.scala
@@ -22,12 +22,12 @@ import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.api.internal.TableEnvironmentInternal
 import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.UdfWithOpen
-import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.StringSplit
+import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.{NonDeterministicTableFunc,
 StringSplit}
 import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, 
TestSinkUtil, TestingAppendSink, TestingAppendTableSink}
 import org.apache.flink.table.planner.utils.{RF, TableFunc7}
 import org.apache.flink.types.Row
 
-import org.junit.Assert.assertEquals
+import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.{Before, Test}
 
 import java.lang.{Boolean => JBoolean}
@@ -110,6 +110,66 @@ class CorrelateITCase extends StreamingTestBase {
   }
 
   @Test
+  def testConstantTableFunc3(): Unit = {
+    val data = List(
+      (1, 2, "abc-bcd"),
+      (1, 2, "hhh"),
+      (1, 2, "xxx"))
+
+    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+    tEnv.registerTable("T1", t1)
+
+    tEnv.registerFunction("str_split", new StringSplit())
+    val query = "SELECT * FROM T1, LATERAL TABLE(str_split('Jack,John', ',')) 
as T0(d)"
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(query).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "1,2,abc-bcd,Jack",
+      "1,2,abc-bcd,John",
+      "1,2,hhh,Jack",
+      "1,2,hhh,John",
+      "1,2,xxx,Jack",
+      "1,2,xxx,John")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testConstantNonDeterministicTableFunc(): Unit = {
+    tEnv.registerFunction("str_split", new NonDeterministicTableFunc())
+    val query = "SELECT * FROM LATERAL TABLE(str_split('Jack#John')) as T0(d)"
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(query).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val res = sink.getAppendResults;
+    assertEquals(1, res.size)
+    assertTrue(res(0).equals("Jack") || res(0).equals("John"))
+  }
+
+  @Test
+  def testConstantNonDeterministicTableFunc2(): Unit = {
+    val data = List(
+      (1, 2, "abc-bcd"),
+      (1, 2, "hhh"),
+      (1, 2, "xxx"))
+
+    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+    tEnv.registerTable("T1", t1)
+
+    tEnv.registerFunction("str_split", new NonDeterministicTableFunc())
+    val query = "SELECT * FROM T1, LATERAL TABLE(str_split('Jack#John')) as 
T0(d)"
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(query).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+
+    val res = sink.getAppendResults;
+    assertEquals(3, res.size)
+  }
+
+  @Test
   def testUdfIsOpenedAfterUdtf(): Unit = {
     val data = List(
       (1, 2, "abc-bcd"),

Reply via email to