Repository: flink Updated Branches: refs/heads/master f6c9b32c1 -> ff552b440
[FLINK-5827] [table] Exception when do filter after join a udtf which returns a POJO type This closes #3357. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff552b44 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff552b44 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff552b44 Branch: refs/heads/master Commit: ff552b440e3d493b41083b6b63534cfcd83961d9 Parents: f6c9b32 Author: kaibozhou <[email protected]> Authored: Fri Feb 24 11:32:34 2017 +0800 Committer: twalthr <[email protected]> Committed: Thu Mar 2 16:27:27 2017 +0100 ---------------------------------------------------------------------- .../table/plan/nodes/CommonCorrelate.scala | 2 +- .../DataSetUserDefinedFunctionITCase.scala | 3 +- .../DataSetUserDefinedFunctionITCase.scala | 206 ------------------ .../DataStreamUserDefinedFunctionITCase.scala | 211 +++++++++++++++++++ 4 files changed, 214 insertions(+), 208 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ff552b44/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala index 61b7ffb..6c4066b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala @@ -188,7 +188,7 @@ trait CommonCorrelate { |getCollector().collect(${crossResultExpr.resultTerm}); |""".stripMargin } else { - val filterGenerator = new CodeGenerator(config, false, udtfTypeInfo) + val filterGenerator = new CodeGenerator(config, false, udtfTypeInfo, None, pojoFieldMapping) filterGenerator.input1Term = filterGenerator.input2Term val filterCondition = filterGenerator.generateExpression(condition.get) s""" http://git-wip-us.apache.org/repos/asf/flink/blob/ff552b44/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala index d268594..3d20803 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala @@ -140,11 +140,12 @@ class DataSetUserDefinedFunctionITCase( val pojo = new PojoTableFunc() val result = in .join(pojo('c)) + .where(('age > 20)) .select('c, 'name, 'age) .toDataSet[Row] val results = result.collect() - val expected = "Jack#22,Jack,22\n" + "John#19,John,19\n" + "Anna#44,Anna,44\n" + val expected = "Jack#22,Jack,22\n" + "Anna#44,Anna,44\n" TestBaseUtils.compareResultAsText(results.asJava, expected) } http://git-wip-us.apache.org/repos/asf/flink/blob/ff552b44/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataSetUserDefinedFunctionITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataSetUserDefinedFunctionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataSetUserDefinedFunctionITCase.scala deleted file mode 100644 index 21b87e9..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataSetUserDefinedFunctionITCase.scala +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.runtime.datastream - -import org.apache.flink.api.scala._ -import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase -import org.apache.flink.table.api.TableEnvironment -import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData} -import org.apache.flink.table.expressions.utils.{Func13, RichFunc2} -import org.apache.flink.table.utils.{RichTableFunc1, TableFunc0, TableFunc3, UserDefinedFunctionTestUtils} -import org.apache.flink.types.Row -import org.junit.Assert._ -import org.junit.Test - -import scala.collection.mutable - -class DataSetUserDefinedFunctionITCase extends StreamingMultipleProgramsTestBase { - - @Test - def testCrossJoin(): Unit = { - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.clear - - val t = testData(env).toTable(tEnv).as('a, 'b, 'c) - val func0 = new TableFunc0 - - val result = t - .join(func0('c) as('d, 'e)) - .select('c, 'd, 'e) - .toDataStream[Row] - - result.addSink(new StreamITCase.StringSink) - env.execute() - - val expected = mutable.MutableList("Jack#22,Jack,22", "John#19,John,19", "Anna#44,Anna,44") - assertEquals(expected.sorted, StreamITCase.testResults.sorted) - } - - @Test - def testLeftOuterJoin(): Unit = { - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.clear - - val t = testData(env).toTable(tEnv).as('a, 'b, 'c) - val func0 = new TableFunc0 - - val result = t - .leftOuterJoin(func0('c) as('d, 'e)) - .select('c, 'd, 'e) - .toDataStream[Row] - - result.addSink(new StreamITCase.StringSink) - env.execute() - - val expected = mutable.MutableList( - "nosharp,null,null", "Jack#22,Jack,22", - "John#19,John,19", "Anna#44,Anna,44") - assertEquals(expected.sorted, StreamITCase.testResults.sorted) - } - - @Test - def testUserDefinedTableFunctionWithParameter(): Unit = { - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - val tableFunc1 = new RichTableFunc1 - tEnv.registerFunction("RichTableFunc1", tableFunc1) - UserDefinedFunctionTestUtils.setJobParameters(env, Map("word_separator" -> " ")) - StreamITCase.testResults = mutable.MutableList() - - val result = StreamTestData.getSmall3TupleDataStream(env) - .toTable(tEnv, 'a, 'b, 'c) - .join(tableFunc1('c) as 's) - .select('a, 's) - - val results = result.toDataStream[Row] - results.addSink(new StreamITCase.StringSink) - env.execute() - - val expected = mutable.MutableList("3,Hello", "3,world") - assertEquals(expected.sorted, StreamITCase.testResults.sorted) - } - - @Test - def testUserDefinedTableFunctionWithUserDefinedScalarFunction(): Unit = { - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - val tableFunc1 = new RichTableFunc1 - val richFunc2 = new RichFunc2 - tEnv.registerFunction("RichTableFunc1", tableFunc1) - tEnv.registerFunction("RichFunc2", richFunc2) - UserDefinedFunctionTestUtils.setJobParameters( - env, - Map("word_separator" -> "#", "string.value" -> "test")) - StreamITCase.testResults = mutable.MutableList() - - val result = StreamTestData.getSmall3TupleDataStream(env) - .toTable(tEnv, 'a, 'b, 'c) - .join(tableFunc1(richFunc2('c)) as 's) - .select('a, 's) - - val results = result.toDataStream[Row] - results.addSink(new StreamITCase.StringSink) - env.execute() - - val expected = mutable.MutableList( - "1,Hi", - "1,test", - "2,Hello", - "2,test", - "3,Hello world", - "3,test") - assertEquals(expected.sorted, StreamITCase.testResults.sorted) - } - - @Test - def testTableFunctionConstructorWithParams(): Unit = { - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.clear - - val t = testData(env).toTable(tEnv).as('a, 'b, 'c) - val config = Map("key1" -> "value1", "key2" -> "value2") - val func30 = new TableFunc3(null) - val func31 = new TableFunc3("OneConf_") - val func32 = new TableFunc3("TwoConf_", config) - - val result = t - .join(func30('c) as('d, 'e)) - .select('c, 'd, 'e) - .join(func31('c) as ('f, 'g)) - .select('c, 'd, 'e, 'f, 'g) - .join(func32('c) as ('h, 'i)) - .select('c, 'd, 'f, 'h, 'e, 'g, 'i) - .toDataStream[Row] - - result.addSink(new StreamITCase.StringSink) - env.execute() - - val expected = mutable.MutableList( - "Anna#44,Anna,OneConf_Anna,TwoConf__key=key1_value=value1_Anna,44,44,44", - "Anna#44,Anna,OneConf_Anna,TwoConf__key=key2_value=value2_Anna,44,44,44", - "Jack#22,Jack,OneConf_Jack,TwoConf__key=key1_value=value1_Jack,22,22,22", - "Jack#22,Jack,OneConf_Jack,TwoConf__key=key2_value=value2_Jack,22,22,22", - "John#19,John,OneConf_John,TwoConf__key=key1_value=value1_John,19,19,19", - "John#19,John,OneConf_John,TwoConf__key=key2_value=value2_John,19,19,19" - ) - assertEquals(expected.sorted, StreamITCase.testResults.sorted) - } - - @Test - def testScalarFunctionConstructorWithParams(): Unit = { - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.clear - - val t = testData(env).toTable(tEnv).as('a, 'b, 'c) - val func0 = new Func13("default") - val func1 = new Func13("Sunny") - val func2 = new Func13("kevin2") - - val result = t.select(func0('c), func1('c),func2('c)) - - result.addSink(new StreamITCase.StringSink) - env.execute() - - val expected = mutable.MutableList( - "default-Anna#44,Sunny-Anna#44,kevin2-Anna#44", - "default-Jack#22,Sunny-Jack#22,kevin2-Jack#22", - "default-John#19,Sunny-John#19,kevin2-John#19", - "default-nosharp,Sunny-nosharp,kevin2-nosharp" - ) - assertEquals(expected.sorted, StreamITCase.testResults.sorted) - } - - private def testData( - env: StreamExecutionEnvironment) - : DataStream[(Int, Long, String)] = { - - val data = new mutable.MutableList[(Int, Long, String)] - data.+=((1, 1L, "Jack#22")) - data.+=((2, 2L, "John#19")) - data.+=((3, 2L, "Anna#44")) - data.+=((4, 3L, "nosharp")) - env.fromCollection(data) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/ff552b44/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala new file mode 100644 index 0000000..e7ce457 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamUserDefinedFunctionITCase.scala @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.datastream + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData} +import org.apache.flink.table.expressions.utils.{Func13, RichFunc2} +import org.apache.flink.table.utils.{RichTableFunc1, TableFunc0, TableFunc3, UserDefinedFunctionTestUtils} +import org.apache.flink.table.utils.PojoTableFunc +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.mutable + +class DataStreamUserDefinedFunctionITCase extends StreamingMultipleProgramsTestBase { + + @Test + def testCrossJoin(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + + val t = testData(env).toTable(tEnv).as('a, 'b, 'c) + val func0 = new TableFunc0 + val pojoFunc0 = new PojoTableFunc() + + val result = t + .join(func0('c) as('d, 'e)) + .select('c, 'd, 'e) + .join(pojoFunc0('c)) + .where(('age > 20)) + .select('c, 'name, 'age) + .toDataStream[Row] + + result.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList("Jack#22,Jack,22", "Anna#44,Anna,44") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testLeftOuterJoin(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + + val t = testData(env).toTable(tEnv).as('a, 'b, 'c) + val func0 = new TableFunc0 + + val result = t + .leftOuterJoin(func0('c) as('d, 'e)) + .select('c, 'd, 'e) + .toDataStream[Row] + + result.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList( + "nosharp,null,null", "Jack#22,Jack,22", + "John#19,John,19", "Anna#44,Anna,44") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testUserDefinedTableFunctionWithParameter(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + val tableFunc1 = new RichTableFunc1 + tEnv.registerFunction("RichTableFunc1", tableFunc1) + UserDefinedFunctionTestUtils.setJobParameters(env, Map("word_separator" -> " ")) + StreamITCase.testResults = mutable.MutableList() + + val result = StreamTestData.getSmall3TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c) + .join(tableFunc1('c) as 's) + .select('a, 's) + + val results = result.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList("3,Hello", "3,world") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testUserDefinedTableFunctionWithUserDefinedScalarFunction(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + val tableFunc1 = new RichTableFunc1 + val richFunc2 = new RichFunc2 + tEnv.registerFunction("RichTableFunc1", tableFunc1) + tEnv.registerFunction("RichFunc2", richFunc2) + UserDefinedFunctionTestUtils.setJobParameters( + env, + Map("word_separator" -> "#", "string.value" -> "test")) + StreamITCase.testResults = mutable.MutableList() + + val result = StreamTestData.getSmall3TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c) + .join(tableFunc1(richFunc2('c)) as 's) + .select('a, 's) + + val results = result.toDataStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList( + "1,Hi", + "1,test", + "2,Hello", + "2,test", + "3,Hello world", + "3,test") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testTableFunctionConstructorWithParams(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + + val t = testData(env).toTable(tEnv).as('a, 'b, 'c) + val config = Map("key1" -> "value1", "key2" -> "value2") + val func30 = new TableFunc3(null) + val func31 = new TableFunc3("OneConf_") + val func32 = new TableFunc3("TwoConf_", config) + + val result = t + .join(func30('c) as('d, 'e)) + .select('c, 'd, 'e) + .join(func31('c) as ('f, 'g)) + .select('c, 'd, 'e, 'f, 'g) + .join(func32('c) as ('h, 'i)) + .select('c, 'd, 'f, 'h, 'e, 'g, 'i) + .toDataStream[Row] + + result.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList( + "Anna#44,Anna,OneConf_Anna,TwoConf__key=key1_value=value1_Anna,44,44,44", + "Anna#44,Anna,OneConf_Anna,TwoConf__key=key2_value=value2_Anna,44,44,44", + "Jack#22,Jack,OneConf_Jack,TwoConf__key=key1_value=value1_Jack,22,22,22", + "Jack#22,Jack,OneConf_Jack,TwoConf__key=key2_value=value2_Jack,22,22,22", + "John#19,John,OneConf_John,TwoConf__key=key1_value=value1_John,19,19,19", + "John#19,John,OneConf_John,TwoConf__key=key2_value=value2_John,19,19,19" + ) + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testScalarFunctionConstructorWithParams(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + + val t = testData(env).toTable(tEnv).as('a, 'b, 'c) + val func0 = new Func13("default") + val func1 = new Func13("Sunny") + val func2 = new Func13("kevin2") + + val result = t.select(func0('c), func1('c),func2('c)) + + result.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = mutable.MutableList( + "default-Anna#44,Sunny-Anna#44,kevin2-Anna#44", + "default-Jack#22,Sunny-Jack#22,kevin2-Jack#22", + "default-John#19,Sunny-John#19,kevin2-John#19", + "default-nosharp,Sunny-nosharp,kevin2-nosharp" + ) + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + private def testData( + env: StreamExecutionEnvironment) + : DataStream[(Int, Long, String)] = { + + val data = new mutable.MutableList[(Int, Long, String)] + data.+=((1, 1L, "Jack#22")) + data.+=((2, 2L, "John#19")) + data.+=((3, 2L, "Anna#44")) + data.+=((4, 3L, "nosharp")) + env.fromCollection(data) + } + +}
