Repository: flink Updated Branches: refs/heads/master d74869f8a -> 91f00ec91
[FLINK-8312][table] Fix ScalarFunction varargs length exceed 254 This closes #5206 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/91f00ec9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/91f00ec9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/91f00ec9 Branch: refs/heads/master Commit: 91f00ec91b55b28a59114e88127af2f85f279eb5 Parents: d74869f Author: Xpray <leonxp...@gmail.com> Authored: Tue Dec 26 17:40:14 2017 +0800 Committer: sunjincheng121 <sunjincheng...@gmail.com> Committed: Fri Dec 29 17:49:32 2017 +0800 ---------------------------------------------------------------------- .../functions/utils/ScalarSqlFunction.scala | 27 ++++++++++++-------- .../table/runtime/stream/sql/SqlITCase.scala | 27 ++++++++++++++++++++ 2 files changed, 44 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/91f00ec9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala index 27e093d..cbe2ac7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala @@ -140,7 +140,7 @@ object ScalarSqlFunction { scalarFunction: ScalarFunction) : SqlOperandTypeChecker = { - val signatures = getMethodSignatures(scalarFunction, "eval") + val methods = checkAndExtractMethods(scalarFunction, "eval") /** * Operand type checker based on [[ScalarFunction]] given information. @@ -151,17 +151,24 @@ object ScalarSqlFunction { } override def getOperandCountRange: SqlOperandCountRange = { - var min = 255 + var min = 254 // according to JVM spec 4.3.3 var max = -1 - signatures.foreach( sig => { - var len = sig.length - if (len > 0 && sig(sig.length - 1).isArray) { - max = 254 // according to JVM spec 4.3.3 - len = sig.length - 1 + var isVarargs = false + methods.foreach( + m => { + var len = m.getParameterTypes.length + if (len > 0 && m.isVarArgs && m.getParameterTypes()(len - 1).isArray) { + isVarargs = true + len = len - 1 + } + max = Math.max(len, max) + min = Math.min(len, min) } - max = Math.max(len, max) - min = Math.min(len, min) - }) + ) + if (isVarargs) { + // if eval method is varargs, set max to -1 to skip length check in Calcite + max = -1 + } SqlOperandCountRanges.between(min, max) } http://git-wip-us.apache.org/repos/asf/flink/blob/91f00ec9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala index 18b45a3..76d0126 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala @@ -26,10 +26,12 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.{TableEnvironment, Types} import org.apache.flink.table.api.scala._ import org.apache.flink.table.expressions.utils.SplitUDF +import org.apache.flink.table.expressions.utils.Func15 import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase} import org.apache.flink.types.Row import org.apache.flink.table.utils.MemoryTableSinkUtil + import org.junit.Assert._ import org.junit._ @@ -516,4 +518,29 @@ class SqlITCase extends StreamingWithStateTestBase { val expected = List("a,a,d,d,e,e", "x,x,z,z,z,z") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testUDFWithLongVarargs(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + + tEnv.registerFunction("func15", Func15) + + val parameters = "c," + (0 until 255).map(_ => "a").mkString(",") + val sqlQuery = s"SELECT func15($parameters) FROM T1" + + val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c) + tEnv.registerTable("T1", t1) + + val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] + result.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = List( + "Hi255", + "Hello255", + "Hello world255") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } }