Repository: flink Updated Branches: refs/heads/master e753db841 -> e6fddbc3e
[FLINK-7558][table]Improve SQL ValidationException message. This closes #4620 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e6fddbc3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e6fddbc3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e6fddbc3 Branch: refs/heads/master Commit: e6fddbc3e5bdb42764191990ac29ba382bbe6016 Parents: e753db8 Author: 金竹 <jincheng.su...@alibaba-inc.com> Authored: Wed Aug 30 16:55:09 2017 +0800 Committer: Jincheng Sun <jinch...@apache.org> Committed: Thu Aug 31 09:03:52 2017 +0800 ---------------------------------------------------------------------- .../table/functions/utils/AggSqlFunction.scala | 4 +- .../functions/utils/ScalarSqlFunction.scala | 9 ++- .../UserDefinedFunctionValidationTest.scala | 61 ++++++++++++++++++++ 3 files changed, 71 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e6fddbc3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala index 526ec47..bb71d63 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala @@ -95,7 +95,9 @@ object AggSqlFunction { val foundSignature = getAccumulateMethodSignature(aggregateFunction, operandTypeInfo) .getOrElse( throw new ValidationException( - s"Operand types of ${signatureToString(operandTypeInfo)} could not be inferred.")) + s"Given parameters of function do not match any signature. 
\n" + + s"Actual: ${signatureToString(operandTypeInfo)} \n" + + s"Expected: ${signaturesToString(aggregateFunction, "accumulate")}")) val inferredTypes = getParameterTypes(aggregateFunction, foundSignature.drop(1)) .map(typeFactory.createTypeFromTypeInfo(_, isNullable = true)) http://git-wip-us.apache.org/repos/asf/flink/blob/e6fddbc3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala index 0776f7a..784bca7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala @@ -45,7 +45,7 @@ class ScalarSqlFunction( extends SqlFunction( new SqlIdentifier(name, SqlParserPos.ZERO), createReturnTypeInference(name, scalarFunction, typeFactory), - createOperandTypeInference(scalarFunction, typeFactory), + createOperandTypeInference(name, scalarFunction, typeFactory), createOperandTypeChecker(name, scalarFunction), null, SqlFunctionCategory.USER_DEFINED_FUNCTION) { @@ -91,6 +91,7 @@ object ScalarSqlFunction { } private[flink] def createOperandTypeInference( + name: String, scalarFunction: ScalarFunction, typeFactory: FlinkTypeFactory) : SqlOperandTypeInference = { @@ -106,7 +107,11 @@ object ScalarSqlFunction { val operandTypeInfo = getOperandTypeInfo(callBinding) val foundSignature = getEvalMethodSignature(scalarFunction, operandTypeInfo) - .getOrElse(throw new ValidationException(s"Operand types of could not be inferred.")) + .getOrElse( + throw new ValidationException( + s"Given parameters of function '$name' do not match any signature. 
\n" + + s"Actual: ${signatureToString(operandTypeInfo)} \n" + + s"Expected: ${signaturesToString(scalarFunction, "eval")}")) val inferredTypes = scalarFunction .getParameterTypes(foundSignature) http://git-wip-us.apache.org/repos/asf/flink/blob/e6fddbc3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/UserDefinedFunctionValidationTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/UserDefinedFunctionValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/UserDefinedFunctionValidationTest.scala new file mode 100644 index 0000000..6939771 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/UserDefinedFunctionValidationTest.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.api.validation + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.expressions.utils.Func0 +import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg +import org.apache.flink.table.utils.TableTestBase +import org.junit.Test + +class UserDefinedFunctionValidationTest extends TableTestBase { + + @Test + def testScalarFunctionOperandTypeCheck(): Unit = { + thrown.expect(classOf[ValidationException]) + thrown.expectMessage( + "Given parameters of function 'func' do not match any signature. \n" + + "Actual: (java.lang.String) \n" + + "Expected: (int)") + val util = streamTestUtil() + util.addTable[(Int, String)]("t", 'a, 'b) + util.tableEnv.registerFunction("func",Func0) + util.verifySql("select func(b) from t","n/a") + } + + @Test + def testAggregateFunctionOperandTypeCheck(): Unit = { + thrown.expect(classOf[ValidationException]) + thrown.expectMessage( + "Given parameters of function do not match any signature. \n" + + "Actual: (java.lang.String, java.lang.Integer) \n" + + "Expected: (org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions." + + "WeightedAvgAccum, int, int), (org.apache.flink.table.runtime.utils." + + "JavaUserDefinedAggFunctions.WeightedAvgAccum, long, int)") + + val util = streamTestUtil() + val weightAvgFun = new WeightedAvg + util.addTable[(Int, String)]("t", 'a, 'b) + util.tableEnv.registerFunction("agg",weightAvgFun) + util.verifySql("select agg(b, a) from t","n/a") + } + +} +