[
https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15991546#comment-15991546
]
ASF GitHub Bot commented on FLINK-6334:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3791#discussion_r114188108
--- Diff:
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala
---
@@ -50,72 +50,85 @@ class UserDefinedTableFunctionTest extends
TableTestBase {
// Java environment
val javaEnv = mock(classOf[JavaExecutionEnv])
- val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
- val in2 = javaTableEnv.fromDataStream(jDs).as("a, b, c")
+ val jtEnv = TableEnvironment.getTableEnvironment(javaEnv)
+ val in2 = jtEnv.fromDataStream(jDs).as("a, b, c")
// test cross join
val func1 = new TableFunc1
- javaTableEnv.registerFunction("func1", func1)
+ jtEnv.registerFunction("func1", func1)
var scalaTable = in1.join(func1('c) as 's).select('c, 's)
- var javaTable = in2.join("func1(c).as(s)").select("c, s")
+ var javaTable = in2.join(jtEnv.tableApply("func1(c)")
+ .as("s")).select("c, s")
verifyTableEquals(scalaTable, javaTable)
// test left outer join
scalaTable = in1.leftOuterJoin(func1('c) as 's).select('c, 's)
- javaTable = in2.leftOuterJoin("as(func1(c), s)").select("c, s")
+ javaTable = in2.leftOuterJoin(
+ jtEnv.tableApply("func1(c)")
+ .as("s")
+ ).select("c, s")
verifyTableEquals(scalaTable, javaTable)
// test overloading
scalaTable = in1.join(func1('c, "$") as 's).select('c, 's)
- javaTable = in2.join("func1(c, '$') as (s)").select("c, s")
+ javaTable = in2.join(jtEnv.tableApply("func1(c, '$')")
+ .as("s")).select("c, s")
verifyTableEquals(scalaTable, javaTable)
// test custom result type
val func2 = new TableFunc2
- javaTableEnv.registerFunction("func2", func2)
+ jtEnv.registerFunction("func2", func2)
scalaTable = in1.join(func2('c) as ('name, 'len)).select('c, 'name,
'len)
- javaTable = in2.join("func2(c).as(name, len)").select("c, name, len")
+ javaTable = in2.join(
+ jtEnv.tableApply("func2(c)")
+ .as("name, len"))
+ .select("c, name, len")
verifyTableEquals(scalaTable, javaTable)
// test hierarchy generic type
val hierarchy = new HierarchyTableFunction
- javaTableEnv.registerFunction("hierarchy", hierarchy)
- scalaTable = in1.join(hierarchy('c) as ('name, 'adult, 'len))
+ jtEnv.registerFunction("hierarchy", hierarchy)
+ scalaTable = in1.join(hierarchy('c) as ('name, 'len, 'adult))
.select('c, 'name, 'len, 'adult)
- javaTable = in2.join("AS(hierarchy(c), name, adult, len)")
+ javaTable = in2.join(jtEnv.tableApply("hierarchy(c)")
+ .as("name, len, adult"))
.select("c, name, len, adult")
verifyTableEquals(scalaTable, javaTable)
// test pojo type
val pojo = new PojoTableFunc
- javaTableEnv.registerFunction("pojo", pojo)
+ jtEnv.registerFunction("pojo", pojo)
scalaTable = in1.join(pojo('c))
.select('c, 'name, 'age)
- javaTable = in2.join("pojo(c)")
+ javaTable = in2.join(jtEnv.tableApply("pojo(c)"))
.select("c, name, age")
verifyTableEquals(scalaTable, javaTable)
// test with filter
scalaTable = in1.join(func2('c) as ('name, 'len))
.select('c, 'name, 'len).filter('len > 2)
- javaTable = in2.join("func2(c) as (name, len)")
+ javaTable = in2.join(jtEnv.tableApply("func2(c)") as ("name, len"))
.select("c, name, len").filter("len > 2")
verifyTableEquals(scalaTable, javaTable)
// test with scalar function
scalaTable = in1.join(func1('c.substring(2)) as 's)
.select('a, 'c, 's)
- javaTable = in2.join("func1(substring(c, 2)) as (s)")
+ javaTable = in2.join(jtEnv.tableApply("func1(substring(c, 2))") as
("s"))
.select("a, c, s")
verifyTableEquals(scalaTable, javaTable)
// check scala object is forbidden
expectExceptionThrown(
tableEnv.registerFunction("func3", ObjectTableFunction), "Scala
object")
expectExceptionThrown(
- javaTableEnv.registerFunction("func3", ObjectTableFunction), "Scala
object")
+ jtEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
expectExceptionThrown(
- in1.join(ObjectTableFunction('a, 1)), "Scala object")
+ {
--- End diff --
Please avoid unnecessary refactorings. They make PRs harder to review and
might be reverted by the next person going over this code.
> Refactoring UDTF interface
> --------------------------
>
> Key: FLINK-6334
> URL: https://issues.apache.org/jira/browse/FLINK-6334
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Ruidong Li
> Assignee: Ruidong Li
>
> The current UDTF leverages the table.join(expression) interface, which is not
> a proper interface in terms of semantics. We would like to refactor this to
> let UDTFs use the table.join(table) interface. Very briefly, a UDTF's apply
> method will return a Table, so Join(UDTF('a, 'b, ...) as 'c) shall be viewed
> as join(Table).
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)