Github user sunjincheng121 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5203#discussion_r158582801
--- Diff:
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
---
@@ -352,4 +354,38 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
"{9=Comment#3}")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+
+ @Test
+ def testUDFWithUnicodeParameter(): Unit = {
+ val data = List(
+ ("a\u0001b", "c\"d", "e\\\"\u0004f"),
+ ("x\u0001y", "y\"z", "z\\\"\u0004z")
+ )
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+ val ds = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+ .select(SplitUDF('a, "\u0001", 0) as 'a,
+ SplitUDF('b, "\"", 1) as 'b,
+ SplitUDF('c, "\\\"\u0004", 0) as 'c
+ )
+ val results = ds.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+ val expected = mutable.MutableList(
+ "a,d,e", "x,z,z"
+ )
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+}
+
+object SplitUDF extends ScalarFunction {
+ def eval(x: String, sep: String, index: Int): String = {
+ val splits = StringUtils.splitByWholeSeparator(x, sep)
--- End diff --
Please add a test case for a UDF with `isDeterministic: Boolean = false`.
---