Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5203#discussion_r158582750 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala --- @@ -481,4 +484,48 @@ class SqlITCase extends StreamingWithStateTestBase { assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted) } + @Test + def testUnicodeParameter(): Unit = { + val data = new mutable.MutableList[(String, String, String)] + data.+=((null, null, null)) //"" + + val env = StreamExecutionEnvironment.getExecutionEnvironment + + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + + val udf0 = new LiteralUDF("\"\\") + val udf1 = new LiteralUDF("\u0001xyz") + val udf2 = new LiteralUDF("\u0001\u0012") + + tEnv.registerFunction("udf0", udf0) + tEnv.registerFunction("udf1", udf1) + tEnv.registerFunction("udf2", udf2) + + // user have to specify '\' with '\\' in SQL + val sqlQuery = "SELECT " + + "udf0('\"\\\\') as str1, " + + "udf1('\u0001xyz') as str2, " + + "udf2('\u0001\u0012') as str3 from T1" + + val t1 = env.fromCollection(data).toTable(tEnv, 'str1, 'str2, 'str3) + + tEnv.registerTable("T1", t1) + + val result = tEnv.sql(sqlQuery).toAppendStream[Row] + result.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = List("\"\\,\u0001xyz,\u0001\u0012") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + +} + +class LiteralUDF(constant: String) extends ScalarFunction { + def eval(x: String): String = { + assert(x == constant) + x + } + override def isDeterministic: Boolean = false --- End diff -- I think we should add the case of isDeterministic=true. I think Expression Reduce we do the different processing logic.
---