Github user sunjincheng121 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5203#discussion_r158598552
--- Diff:
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
---
@@ -481,4 +484,84 @@ class SqlITCase extends StreamingWithStateTestBase {
assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
}
+ @Test
+ def testDeterministicUdfWithUnicodeParameter(): Unit = {
+ val data = new mutable.MutableList[(String, String, String)]
+ data.+=((null, null, null))
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val udf0 = new LiteralUDF("\"\\", deterministic = true)
+ val udf1 = new LiteralUDF("\u0001xyz", deterministic = true)
+ val udf2 = new LiteralUDF("\u0001\u0012", deterministic = true)
+
+ tEnv.registerFunction("udf0", udf0)
+ tEnv.registerFunction("udf1", udf1)
+ tEnv.registerFunction("udf2", udf2)
+
+ // user have to specify '\' with '\\' in SQL
+ val sqlQuery = "SELECT " +
+ "udf0('\"\\\\') as str1, " +
+ "udf1('\u0001xyz') as str2, " +
+ "udf2('\u0001\u0012') as str3 from T1"
+
+ val t1 = env.fromCollection(data).toTable(tEnv, 'str1, 'str2, 'str3)
+
+ tEnv.registerTable("T1", t1)
+
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = List("\"\\,\u0001xyz,\u0001\u0012")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testNonDeterministicUdfWithUnicodeParameter(): Unit = {
+ val data = new mutable.MutableList[(String, String, String)]
--- End diff --
Same suggest as above.
---