Repository: flink
Updated Branches:
  refs/heads/master e753db841 -> e6fddbc3e


[FLINK-7558] [table] Improve SQL ValidationException message.

This closes #4620


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e6fddbc3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e6fddbc3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e6fddbc3

Branch: refs/heads/master
Commit: e6fddbc3e5bdb42764191990ac29ba382bbe6016
Parents: e753db8
Author: 金竹 <jincheng.su...@alibaba-inc.com>
Authored: Wed Aug 30 16:55:09 2017 +0800
Committer: Jincheng Sun <jinch...@apache.org>
Committed: Thu Aug 31 09:03:52 2017 +0800

----------------------------------------------------------------------
 .../table/functions/utils/AggSqlFunction.scala  |  4 +-
 .../functions/utils/ScalarSqlFunction.scala     |  9 ++-
 .../UserDefinedFunctionValidationTest.scala     | 61 ++++++++++++++++++++
 3 files changed, 71 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e6fddbc3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
index 526ec47..bb71d63 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
@@ -95,7 +95,9 @@ object AggSqlFunction {
         val foundSignature = getAccumulateMethodSignature(aggregateFunction, operandTypeInfo)
           .getOrElse(
             throw new ValidationException(
-              s"Operand types of ${signatureToString(operandTypeInfo)} could not be inferred."))
+              s"Given parameters of function do not match any signature. \n" +
+                s"Actual: ${signatureToString(operandTypeInfo)} \n" +
+                s"Expected: ${signaturesToString(aggregateFunction, "accumulate")}"))
 
         val inferredTypes = getParameterTypes(aggregateFunction, foundSignature.drop(1))
           .map(typeFactory.createTypeFromTypeInfo(_, isNullable = true))

http://git-wip-us.apache.org/repos/asf/flink/blob/e6fddbc3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
index 0776f7a..784bca7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
@@ -45,7 +45,7 @@ class ScalarSqlFunction(
   extends SqlFunction(
     new SqlIdentifier(name, SqlParserPos.ZERO),
     createReturnTypeInference(name, scalarFunction, typeFactory),
-    createOperandTypeInference(scalarFunction, typeFactory),
+    createOperandTypeInference(name, scalarFunction, typeFactory),
     createOperandTypeChecker(name, scalarFunction),
     null,
     SqlFunctionCategory.USER_DEFINED_FUNCTION) {
@@ -91,6 +91,7 @@ object ScalarSqlFunction {
   }
 
   private[flink] def createOperandTypeInference(
+      name: String,
       scalarFunction: ScalarFunction,
       typeFactory: FlinkTypeFactory)
     : SqlOperandTypeInference = {
@@ -106,7 +107,11 @@ object ScalarSqlFunction {
         val operandTypeInfo = getOperandTypeInfo(callBinding)
 
         val foundSignature = getEvalMethodSignature(scalarFunction, operandTypeInfo)
-          .getOrElse(throw new ValidationException(s"Operand types of could not be inferred."))
+          .getOrElse(
+            throw new ValidationException(
+              s"Given parameters of function '$name' do not match any signature. \n" +
+                s"Actual: ${signatureToString(operandTypeInfo)} \n" +
+                s"Expected: ${signaturesToString(scalarFunction, "eval")}"))
 
         val inferredTypes = scalarFunction
           .getParameterTypes(foundSignature)

http://git-wip-us.apache.org/repos/asf/flink/blob/e6fddbc3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/UserDefinedFunctionValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/UserDefinedFunctionValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/UserDefinedFunctionValidationTest.scala
new file mode 100644
index 0000000..6939771
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/UserDefinedFunctionValidationTest.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.Func0
+import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class UserDefinedFunctionValidationTest extends TableTestBase {
+
+  @Test
+  def testScalarFunctionOperandTypeCheck(): Unit = {
+    thrown.expect(classOf[ValidationException])
+    thrown.expectMessage(
+      "Given parameters of function 'func' do not match any signature. \n" +
+        "Actual: (java.lang.String) \n" +
+        "Expected: (int)")
+    val util = streamTestUtil()
+    util.addTable[(Int, String)]("t", 'a, 'b)
+    util.tableEnv.registerFunction("func",Func0)
+    util.verifySql("select func(b) from t","n/a")
+  }
+
+  @Test
+  def testAggregateFunctionOperandTypeCheck(): Unit = {
+    thrown.expect(classOf[ValidationException])
+    thrown.expectMessage(
+      "Given parameters of function do not match any signature. \n" +
+        "Actual: (java.lang.String, java.lang.Integer) \n" +
+        "Expected: (org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions." +
+        "WeightedAvgAccum, int, int), (org.apache.flink.table.runtime.utils." +
+        "JavaUserDefinedAggFunctions.WeightedAvgAccum, long, int)")
+
+    val util = streamTestUtil()
+    val weightAvgFun = new WeightedAvg
+    util.addTable[(Int, String)]("t", 'a, 'b)
+    util.tableEnv.registerFunction("agg",weightAvgFun)
+    util.verifySql("select agg(b, a) from t","n/a")
+  }
+
+}
+

Reply via email to