Github user sunjincheng121 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5203#discussion_r158598561
--- Diff:
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
---
@@ -352,4 +354,64 @@ class CalcITCase extends
StreamingMultipleProgramsTestBase {
"{9=Comment#3}")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+
+ @Test
+ def testDeterministicUDFWithUnicodeParameter(): Unit = {
+ val data = List(
+ ("a\u0001b", "c\"d", "e\\\"\u0004f"),
+ ("x\u0001y", "y\"z", "z\\\"\u0004z")
+ )
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+ val splitUDF = new SplitUDF(deterministic = true)
+ val ds = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+ .select(splitUDF('a, "\u0001", 0) as 'a,
+ splitUDF('b, "\"", 1) as 'b,
+ splitUDF('c, "\\\"\u0004", 0) as 'c
+ )
+ val results = ds.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+ val expected = mutable.MutableList(
+ "a,d,e", "x,z,z"
+ )
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testNonDeterministicUDFWithUnicodeParameter(): Unit = {
+ val data = List(
--- End diff --
Same suggestion as above.
---