Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3809#discussion_r114521448
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
    @@ -67,52 +68,89 @@ object UserDefinedFunctionUtils {
       }
     
       // 
----------------------------------------------------------------------------------------------
    -  // Utilities for eval methods
    +  // Utilities for user-defined methods
       // 
----------------------------------------------------------------------------------------------
     
       /**
    -    * Returns signatures matching the given signature of 
[[TypeInformation]].
    +    * Returns signatures of eval methods matching the given signature of 
[[TypeInformation]].
         * Elements of the signature can be null (act as a wildcard).
         */
    -  def getSignature(
    -      function: UserDefinedFunction,
    -      signature: Seq[TypeInformation[_]])
    +  def getEvalMethodSignature(
    +    function: UserDefinedFunction,
    +    signature: Seq[TypeInformation[_]])
         : Option[Array[Class[_]]] = {
    -    getEvalMethod(function, signature).map(_.getParameterTypes)
    +    getUserDefinedMethod(function, "eval", 
typeInfoToClass(signature)).map(_.getParameterTypes)
       }
     
       /**
    -    * Returns eval method matching the given signature of 
[[TypeInformation]].
    +    * Returns signatures of accumulate methods matching the given 
signature of [[TypeInformation]].
    +    * Elements of the signature can be null (act as a wildcard).
         */
    -  def getEvalMethod(
    -      function: UserDefinedFunction,
    +  def getAccumulateMethodSignature(
    +      function: AggregateFunction[_, _],
           signature: Seq[TypeInformation[_]])
    +  : Option[Array[Class[_]]] = {
    +    val accType = TypeExtractor.createTypeInfo(
    +      function, classOf[AggregateFunction[_, _]], function.getClass, 1)
    +    val input = (Array(accType) ++ signature).toSeq
    +    getUserDefinedMethod(
    +      function,
    +      "accumulate",
    +      typeInfoToClass(input)).map(_.getParameterTypes)
    +  }
    +
    +  def getParameterTypes(
    +      function: UserDefinedFunction,
    +      signature: Array[Class[_]]): Array[TypeInformation[_]] = {
    +    signature.map { c =>
    +      try {
    +        TypeExtractor.getForClass(c)
    +      } catch {
    +        case ite: InvalidTypesException =>
    +          throw new ValidationException(
    +            s"Parameter types of function 
'${function.getClass.getCanonicalName}' cannot be " +
    +              s"automatically determined. Please provide type information 
manually.")
    +      }
    +    }
    +  }
    +
    +  /**
    +    * Returns user defined method matching the given name and signature.
    +    *
    +    * @param function        function instance
    +    * @param methodName      method name
    +    * @param methodSignature an array of raw Java classes. We compare the 
raw Java classes not the
    +    *                        TypeInformation. TypeInformation does not 
matter during runtime (e.g.
    +    *                        within a MapFunction)
    +    */
    +  def getUserDefinedMethod(
    +      function: UserDefinedFunction,
    +      methodName: String,
    +      methodSignature: Array[Class[_]])
         : Option[Method] = {
    -    // We compare the raw Java classes not the TypeInformation.
    -    // TypeInformation does not matter during runtime (e.g. within a 
MapFunction).
    -    val actualSignature = typeInfoToClass(signature)
    -    val evalMethods = checkAndExtractEvalMethods(function)
     
    -    val filtered = evalMethods
    -      // go over all eval methods and filter out matching methods
    +    val methods = checkAndExtractMethods(function, methodName)
    +
    +    val filtered = methods
    +      // go over all the methods and filter out matching methods
           .filter {
             case cur if !cur.isVarArgs =>
               val signatures = cur.getParameterTypes
               // match parameters of signature to actual parameters
    -          actualSignature.length == signatures.length &&
    +          methodSignature.length == signatures.length &&
                 signatures.zipWithIndex.forall { case (clazz, i) =>
    --- End diff --
    
    can be simplified to `signatures.zip(methodSignature).forall { case (s, m) 
=> parameterTypeEquals(m, s)}`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to