[
https://issues.apache.org/jira/browse/FLINK-10958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16698441#comment-16698441
]
ASF GitHub Bot commented on FLINK-10958:
----------------------------------------
asfgit closed pull request #7152: [FLINK-10958] [table] Add overload support
for user defined function
URL: https://github.com/apache/flink/pull/7152
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index c9a27036797..4faa6ed05ce 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -151,17 +151,17 @@ object UserDefinedFunctionUtils {
// match parameters of signature to actual parameters
methodSignature.length == signatures.length &&
signatures.zipWithIndex.forall { case (clazz, i) =>
- parameterTypeEquals(methodSignature(i), clazz)
+ parameterTypeApplicable(methodSignature(i), clazz)
}
case cur if cur.isVarArgs =>
val signatures = cur.getParameterTypes
methodSignature.zipWithIndex.forall {
// non-varargs
case (clazz, i) if i < signatures.length - 1 =>
- parameterTypeEquals(clazz, signatures(i))
+ parameterTypeApplicable(clazz, signatures(i))
// varargs
case (clazz, i) if i >= signatures.length - 1 =>
- parameterTypeEquals(clazz, signatures.last.getComponentType)
+ parameterTypeApplicable(clazz, signatures.last.getComponentType)
} || (methodSignature.isEmpty && signatures.length == 1) // empty
varargs
}
@@ -171,14 +171,45 @@ object UserDefinedFunctionUtils {
fixedMethodsCount > 0 && !cur.isVarArgs ||
fixedMethodsCount == 0 && cur.isVarArgs
}
+ val maximallySpecific = if (found.length > 1) {
+ implicit val methodOrdering = new scala.Ordering[Method] {
+ override def compare(x: Method, y: Method): Int = {
+ def specificThan(left: Method, right: Method) = {
+ // left parameter type is more specific than right parameter type
+ left.getParameterTypes.zip(right.getParameterTypes).forall {
+ case (leftParameterType, rightParameterType) =>
+ parameterTypeApplicable(leftParameterType, rightParameterType)
+ } &&
+ // non-equal
+ left.getParameterTypes.zip(right.getParameterTypes).exists {
+ case (leftParameterType, rightParameterType) =>
+ !parameterTypeEquals(leftParameterType, rightParameterType)
+ }
+ }
+
+ if (specificThan(x, y)) {
+ 1
+ } else if (specificThan(y, x)) {
+ -1
+ } else {
+ 0
+ }
+ }
+ }
+
+ val max = found.max
+ found.filter(methodOrdering.compare(max, _) == 0)
+ } else {
+ found
+ }
// check if there is a Scala varargs annotation
- if (found.isEmpty &&
+ if (maximallySpecific.isEmpty &&
methods.exists { method =>
val signatures = method.getParameterTypes
signatures.zipWithIndex.forall {
case (clazz, i) if i < signatures.length - 1 =>
- parameterTypeEquals(methodSignature(i), clazz)
+ parameterTypeApplicable(methodSignature(i), clazz)
case (clazz, i) if i == signatures.length - 1 =>
clazz.getName.equals("scala.collection.Seq")
}
@@ -186,11 +217,11 @@ object UserDefinedFunctionUtils {
throw new ValidationException(
s"Scala-style variable arguments in '$methodName' methods are not
supported. Please " +
s"add a @scala.annotation.varargs annotation.")
- } else if (found.length > 1) {
+ } else if (maximallySpecific.length > 1) {
throw new ValidationException(
s"Found multiple '$methodName' methods which match the signature.")
}
- found.headOption
+ maximallySpecific.headOption
}
/**
@@ -719,10 +750,14 @@ object UserDefinedFunctionUtils {
* Compares parameter candidate classes with expected classes. If true, the
parameters match.
* Candidate can be null (acts as a wildcard).
*/
+ private def parameterTypeApplicable(candidate: Class[_], expected:
Class[_]): Boolean =
+ parameterTypeEquals(candidate, expected) ||
+ ((expected != null && expected.isAssignableFrom(candidate)) ||
+ expected.isPrimitive &&
Primitives.wrap(expected).isAssignableFrom(candidate))
+
private def parameterTypeEquals(candidate: Class[_], expected: Class[_]):
Boolean =
candidate == null ||
candidate == expected ||
- expected == classOf[Object] ||
expected.isPrimitive && Primitives.wrap(expected) == candidate ||
// time types
candidate == classOf[Date] && (expected == classOf[Int] || expected ==
classOf[JInt]) ||
@@ -730,8 +765,7 @@ object UserDefinedFunctionUtils {
candidate == classOf[Timestamp] && (expected == classOf[Long] || expected
== classOf[JLong]) ||
// arrays
(candidate.isArray && expected.isArray &&
- (candidate.getComponentType == expected.getComponentType ||
- expected.getComponentType == classOf[Object]))
+ (candidate.getComponentType == expected.getComponentType))
/**
* Creates a [[LogicalTableFunctionCall]] by parsing a String expression.
diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
index 912bb047cba..29de7e0f9d1 100644
---
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
+++
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
@@ -310,6 +310,26 @@ class Func20 extends ScalarFunction {
}
}
+object Func21 extends ScalarFunction {
+ def eval(p: People): String = {
+ p.name
+ }
+
+ def eval(p: Student): String = {
+ "student#" + p.name
+ }
+}
+
+object Func22 extends ScalarFunction {
+ def eval(a: Array[People]): String = {
+ a.head.name
+ }
+
+ def eval(a: Array[Student]): String = {
+ "student#" + a.head.name
+ }
+}
+
class SplitUDF(deterministic: Boolean) extends ScalarFunction {
def eval(x: String, sep: String, index: Int): String = {
val splits = StringUtils.splitByWholeSeparator(x, sep)
@@ -321,3 +341,9 @@ class SplitUDF(deterministic: Boolean) extends
ScalarFunction {
}
override def isDeterministic: Boolean = deterministic
}
+
+class People(val name: String)
+
+class Student(name: String) extends People(name)
+
+class GraduatedStudent(name: String) extends Student(name)
diff --git
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
index 9d29018fdb3..e433742fb67 100644
---
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
+++
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
@@ -23,7 +23,7 @@ import
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.expressions.Literal
-import org.apache.flink.table.expressions.utils.{Func13, RichFunc1, RichFunc2,
SplitUDF}
+import org.apache.flink.table.expressions.utils._
import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData,
UserDefinedFunctionTestUtils}
import org.apache.flink.test.util.AbstractTestBase
import org.apache.flink.types.Row
@@ -350,4 +350,62 @@ class CalcITCase extends AbstractTestBase {
"{9=Comment#3}")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+
+ @Test
+ def testOverload(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ StreamITCase.testResults = mutable.MutableList()
+
+ val testData = new mutable.MutableList[GraduatedStudent]
+ testData.+=(new GraduatedStudent("Jack#22"))
+ testData.+=(new GraduatedStudent("John#19"))
+ testData.+=(new GraduatedStudent("Anna#44"))
+ testData.+=(new GraduatedStudent("nosharp"))
+
+ val t = env.fromCollection(testData).toTable(tEnv).as('a)
+
+ val result = t.select(Func21('a))
+
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "student#Jack#22",
+ "student#John#19",
+ "student#Anna#44",
+ "student#nosharp"
+ )
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testOverloadWithArray(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ StreamITCase.testResults = mutable.MutableList()
+
+ val testData = new mutable.MutableList[Array[GraduatedStudent]]
+ testData.+=(Array(new GraduatedStudent("Jack#22")))
+ testData.+=(Array(new GraduatedStudent("John#19")))
+ testData.+=(Array(new GraduatedStudent("Anna#44")))
+ testData.+=(Array(new GraduatedStudent("nosharp")))
+
+ val t = env.fromCollection(testData).toTable(tEnv).as('a)
+
+ val result = t.select(Func22('a))
+
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "student#Jack#22",
+ "student#John#19",
+ "student#Anna#44",
+ "student#nosharp"
+ )
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Add overload support for user defined function
> -----------------------------------------------
>
> Key: FLINK-10958
> URL: https://issues.apache.org/jira/browse/FLINK-10958
> Project: Flink
> Issue Type: Task
> Components: Table API & SQL
> Reporter: Dian Fu
> Assignee: Dian Fu
> Priority: Major
> Labels: pull-request-available
>
> Currently, overloading is not supported in user-defined functions. Given the
> following UDF:
> {code:java}
> class Func21 extends ScalarFunction {
> def eval(p: People): String = {
> p.name
> }
> def eval(p: Student): String = {
> "student#" + p.name
> }
> }
> class People(val name: String)
> class Student(name: String) extends People(name)
> class GraduatedStudent(name: String) extends Student(name)
> {code}
> Queries such as the following will fail to compile with the error message "Found
> multiple 'eval' methods which match the signature."
>
> {code:java}
> val udf = new Func21
> val table = ...
> table.select(udf(new GraduatedStudent("test"))) {code}
> That's because overloading is not currently supported in user-defined functions.
> I think it will make sense to support overloading following the Java Language
> Specification, section
> [15.12|https://docs.oracle.com/javase/specs/jls/se7/html/jls-15.html#jls-15.12].
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)