[ https://issues.apache.org/jira/browse/FLINK-10958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698441#comment-16698441 ]

ASF GitHub Bot commented on FLINK-10958:
----------------------------------------

asfgit closed pull request #7152: [FLINK-10958] [table] Add overload support for user defined function
URL: https://github.com/apache/flink/pull/7152
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index c9a27036797..4faa6ed05ce 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -151,17 +151,17 @@ object UserDefinedFunctionUtils {
           // match parameters of signature to actual parameters
           methodSignature.length == signatures.length &&
             signatures.zipWithIndex.forall { case (clazz, i) =>
-              parameterTypeEquals(methodSignature(i), clazz)
+              parameterTypeApplicable(methodSignature(i), clazz)
           }
         case cur if cur.isVarArgs =>
           val signatures = cur.getParameterTypes
           methodSignature.zipWithIndex.forall {
             // non-varargs
             case (clazz, i) if i < signatures.length - 1  =>
-              parameterTypeEquals(clazz, signatures(i))
+              parameterTypeApplicable(clazz, signatures(i))
             // varargs
             case (clazz, i) if i >= signatures.length - 1 =>
-              parameterTypeEquals(clazz, signatures.last.getComponentType)
+              parameterTypeApplicable(clazz, signatures.last.getComponentType)
          } || (methodSignature.isEmpty && signatures.length == 1) // empty varargs
     }
 
@@ -171,14 +171,45 @@ object UserDefinedFunctionUtils {
       fixedMethodsCount > 0 && !cur.isVarArgs ||
       fixedMethodsCount == 0 && cur.isVarArgs
     }
+    val maximallySpecific = if (found.length > 1) {
+      implicit val methodOrdering = new scala.Ordering[Method] {
+        override def compare(x: Method, y: Method): Int = {
+          def specificThan(left: Method, right: Method) = {
+            // left parameter type is more specific than right parameter type
+            left.getParameterTypes.zip(right.getParameterTypes).forall {
+              case (leftParameterType, rightParameterType) =>
+                parameterTypeApplicable(leftParameterType, rightParameterType)
+            } &&
+            // non-equal
+            left.getParameterTypes.zip(right.getParameterTypes).exists {
+              case (leftParameterType, rightParameterType) =>
+                !parameterTypeEquals(leftParameterType, rightParameterType)
+            }
+          }
+
+          if (specificThan(x, y)) {
+            1
+          } else if (specificThan(y, x)) {
+            -1
+          } else {
+            0
+          }
+        }
+      }
+
+      val max = found.max
+      found.filter(methodOrdering.compare(max, _) == 0)
+    } else {
+      found
+    }
 
     // check if there is a Scala varargs annotation
-    if (found.isEmpty &&
+    if (maximallySpecific.isEmpty &&
       methods.exists { method =>
         val signatures = method.getParameterTypes
         signatures.zipWithIndex.forall {
           case (clazz, i) if i < signatures.length - 1 =>
-            parameterTypeEquals(methodSignature(i), clazz)
+            parameterTypeApplicable(methodSignature(i), clazz)
           case (clazz, i) if i == signatures.length - 1 =>
             clazz.getName.equals("scala.collection.Seq")
         }
@@ -186,11 +217,11 @@ object UserDefinedFunctionUtils {
       throw new ValidationException(
        s"Scala-style variable arguments in '$methodName' methods are not supported. Please " +
           s"add a @scala.annotation.varargs annotation.")
-    } else if (found.length > 1) {
+    } else if (maximallySpecific.length > 1) {
       throw new ValidationException(
         s"Found multiple '$methodName' methods which match the signature.")
     }
-    found.headOption
+    maximallySpecific.headOption
   }
 
   /**
@@ -719,10 +750,14 @@ object UserDefinedFunctionUtils {
    * Compares parameter candidate classes with expected classes. If true, the parameters match.
     * Candidate can be null (acts as a wildcard).
     */
+  private def parameterTypeApplicable(candidate: Class[_], expected: Class[_]): Boolean =
+    parameterTypeEquals(candidate, expected) ||
+      ((expected != null && expected.isAssignableFrom(candidate)) ||
+        expected.isPrimitive && Primitives.wrap(expected).isAssignableFrom(candidate))
+
  private def parameterTypeEquals(candidate: Class[_], expected: Class[_]): Boolean =
   candidate == null ||
     candidate == expected ||
-    expected == classOf[Object] ||
     expected.isPrimitive && Primitives.wrap(expected) == candidate ||
     // time types
    candidate == classOf[Date] && (expected == classOf[Int] || expected == classOf[JInt]) ||
@@ -730,8 +765,7 @@ object UserDefinedFunctionUtils {
    candidate == classOf[Timestamp] && (expected == classOf[Long] || expected == classOf[JLong]) ||
     // arrays
     (candidate.isArray && expected.isArray &&
-      (candidate.getComponentType == expected.getComponentType ||
-        expected.getComponentType == classOf[Object]))
+      (candidate.getComponentType == expected.getComponentType))
 
   /**
     * Creates a [[LogicalTableFunctionCall]] by parsing a String expression.
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
index 912bb047cba..29de7e0f9d1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
@@ -310,6 +310,26 @@ class Func20 extends ScalarFunction {
   }
 }
 
+object Func21 extends ScalarFunction {
+  def eval(p: People): String = {
+    p.name
+  }
+
+  def eval(p: Student): String = {
+    "student#" + p.name
+  }
+}
+
+object Func22 extends ScalarFunction {
+  def eval(a: Array[People]): String = {
+    a.head.name
+  }
+
+  def eval(a: Array[Student]): String = {
+    "student#" + a.head.name
+  }
+}
+
 class SplitUDF(deterministic: Boolean) extends ScalarFunction {
   def eval(x: String, sep: String, index: Int): String = {
     val splits = StringUtils.splitByWholeSeparator(x, sep)
@@ -321,3 +341,9 @@ class SplitUDF(deterministic: Boolean) extends ScalarFunction {
   }
   override def isDeterministic: Boolean = deterministic
 }
+
+class People(val name: String)
+
+class Student(name: String) extends People(name)
+
+class GraduatedStudent(name: String) extends Student(name)
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
index 9d29018fdb3..e433742fb67 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
@@ -23,7 +23,7 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.expressions.Literal
-import org.apache.flink.table.expressions.utils.{Func13, RichFunc1, RichFunc2, SplitUDF}
+import org.apache.flink.table.expressions.utils._
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, UserDefinedFunctionTestUtils}
 import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.types.Row
@@ -350,4 +350,62 @@ class CalcITCase extends AbstractTestBase {
       "{9=Comment#3}")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testOverload(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+
+    val testData = new mutable.MutableList[GraduatedStudent]
+    testData.+=(new GraduatedStudent("Jack#22"))
+    testData.+=(new GraduatedStudent("John#19"))
+    testData.+=(new GraduatedStudent("Anna#44"))
+    testData.+=(new GraduatedStudent("nosharp"))
+
+    val t = env.fromCollection(testData).toTable(tEnv).as('a)
+
+    val result = t.select(Func21('a))
+
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "student#Jack#22",
+      "student#John#19",
+      "student#Anna#44",
+      "student#nosharp"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testOverloadWithArray(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+
+    val testData = new mutable.MutableList[Array[GraduatedStudent]]
+    testData.+=(Array(new GraduatedStudent("Jack#22")))
+    testData.+=(Array(new GraduatedStudent("John#19")))
+    testData.+=(Array(new GraduatedStudent("Anna#44")))
+    testData.+=(Array(new GraduatedStudent("nosharp")))
+
+    val t = env.fromCollection(testData).toTable(tEnv).as('a)
+
+    val result = t.select(Func22('a))
+
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "student#Jack#22",
+      "student#John#19",
+      "student#Anna#44",
+      "student#nosharp"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
 }
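
For readers of this diff: the change matches candidate argument types by applicability (subtype assignability, plus boxing of primitive parameters via Guava's Primitives.wrap) instead of strict equality, and when several overloads remain applicable it keeps only the maximally specific one, in the spirit of JLS §15.12. Below is a minimal, self-contained sketch of that resolution rule. It is illustrative only, not the Flink code: the object and method names are invented here, and the primitive-wrapping branch is omitted for brevity.

{code:scala}
import java.lang.reflect.Method

object OverloadResolutionSketch {
  class People(val name: String)
  class Student(name: String) extends People(name)
  class GraduatedStudent(name: String) extends Student(name)

  object Funcs {
    def eval(p: People): String = p.name
    def eval(p: Student): String = "student#" + p.name
  }

  // `left` is at least as specific as `right` if each of its parameter
  // types is assignable to the corresponding parameter type of `right`.
  def atLeastAsSpecific(left: Method, right: Method): Boolean =
    left.getParameterTypes.zip(right.getParameterTypes).forall {
      case (l, r) => r.isAssignableFrom(l)
    }

  // Keep the overloads applicable to the argument types, then pick the one
  // that is at least as specific as every other applicable overload.
  def resolve(methods: Seq[Method], args: Seq[Class[_]]): Option[Method] = {
    val applicable = methods.filter { m =>
      m.getParameterTypes.length == args.length &&
        m.getParameterTypes.zip(args).forall { case (p, a) => p.isAssignableFrom(a) }
    }
    applicable.find(m => applicable.forall(o => atLeastAsSpecific(m, o)))
  }

  def main(argv: Array[String]): Unit = {
    val evals = Funcs.getClass.getMethods.filter(_.getName == "eval").toSeq
    // Both overloads are applicable to a GraduatedStudent; eval(Student) wins.
    val chosen = resolve(evals, Seq(classOf[GraduatedStudent]))
    println(chosen.map(_.getParameterTypes.head.getSimpleName)) // Some(Student)
  }
}
{code}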


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Add overload support for user defined function 
> -----------------------------------------------
>
>                 Key: FLINK-10958
>                 URL: https://issues.apache.org/jira/browse/FLINK-10958
>             Project: Flink
>          Issue Type: Task
>          Components: Table API & SQL
>            Reporter: Dian Fu
>            Assignee: Dian Fu
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, overloading is not supported in user-defined functions. Given the
> following UDF:
> {code:java}
> class Func21 extends ScalarFunction {
>   def eval(p: People): String = {
>     p.name
>   }
>   def eval(p: Student): String = {
>     "student#" + p.name
>   }
> }
> class People(val name: String)
> class Student(name: String) extends People(name)
> class GraduatedStudent(name: String) extends Student(name)
> {code}
> Queries such as the following fail to compile with the error message "Found
> multiple 'eval' methods which match the signature."
>  
> {code:java}
> val udf = new Func21
> val table = ...
> table.select(udf(new GraduatedStudent("test")))
> {code}
> That's because overloading is not currently supported in user-defined functions.
> I think it makes sense to support overloading following the Java Language
> Specification, section
> [15.12|https://docs.oracle.com/javase/specs/jls/se7/html/jls-15.html#jls-15.12].
>  
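> Under JLS §15.12 resolution, both eval methods are applicable to a
> GraduatedStudent argument, and eval(Student) is the maximally specific one,
> so the query above would be expected to resolve as follows (illustrative):
> {code:java}
> val udf = new Func21
> table.select(udf(new GraduatedStudent("test"))) // picks eval(Student) => "student#test"
> {code}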



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
