[ https://issues.apache.org/jira/browse/FLINK-5571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869049#comment-15869049 ]

ASF GitHub Bot commented on FLINK-5571:
---------------------------------------

Github user godfreyhe commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3176#discussion_r101437939
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -122,6 +123,18 @@ class CodeGenerator(
       // we use a LinkedHashSet to keep the insertion order
       private val reusableInitStatements = mutable.LinkedHashSet[String]()
     
    +  // generate RichFunction(e.g. RichFlatMapFunction) if true
    +  // generate Function(e.g. FlatMapFunction) if false
    +  private var generatedRichFunction = false
    --- End diff --
    
    OK


> add open and close methods for UserDefinedFunction in TableAPI & SQL
> --------------------------------------------------------------------
>
>                 Key: FLINK-5571
>                 URL: https://issues.apache.org/jira/browse/FLINK-5571
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: godfrey he
>            Assignee: godfrey he
>
> Currently, a User Defined Function (UDF) in the Table API & SQL works on zero, 
> one, or multiple values in a custom evaluation method. Many UDFs need more 
> complex features, e.g. reporting metrics, reading parameters from the job 
> configuration, or reading extra data from a distributed cache file. Adding 
> open and close methods to the UserDefinedFunction class can solve this 
> problem. The code could look like:
> {code}
> trait UserDefinedFunction {
>   def open(context: UDFContext): Unit = {}
>   def close(): Unit = {}
> }
> {code}
> UDFContext contains the information about metric reporters, job parameters, 
> the distributed cache, etc. The code could look like:
> {code}
> class UDFContext(context: RuntimeContext) {
>   def getMetricGroup: MetricGroup = ???
>   def getDistributedCacheFile(name: String): File = ???
>   def getJobParameter(key: String, default: String): String = ???
> }
> {code}
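
To illustrate how a UDF might use the proposed lifecycle, here is a minimal sketch. It does not depend on Flink: UserDefinedFunction and UDFContext are reduced stand-ins for the proposal above, and MapUDFContext, PrefixFunction, and the "prefix" job parameter are hypothetical names introduced only for this example.

```scala
// Reduced stand-ins for the proposed API, so the sketch runs without Flink.
trait UDFContext {
  def getJobParameter(key: String, default: String): String
}

trait UserDefinedFunction {
  def open(context: UDFContext): Unit = {}
  def close(): Unit = {}
}

// Hypothetical context backed by a plain Map (illustration only).
class MapUDFContext(params: Map[String, String]) extends UDFContext {
  override def getJobParameter(key: String, default: String): String =
    params.getOrElse(key, default)
}

// Hypothetical scalar UDF that reads its configuration once, in open(),
// instead of on every call to eval().
class PrefixFunction extends UserDefinedFunction {
  private var prefix: String = ""
  private var opened: Boolean = false

  override def open(context: UDFContext): Unit = {
    prefix = context.getJobParameter("prefix", "default-")
    opened = true
  }

  def eval(value: String): String = {
    require(opened, "open() must be called before eval()")
    prefix + value
  }

  override def close(): Unit = {
    opened = false
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    val fn = new PrefixFunction
    fn.open(new MapUDFContext(Map("prefix" -> "user-")))
    println(fn.eval("42"))
    fn.close()
  }
}
```

In this sketch the runtime would be responsible for calling open() before the first eval() and close() after the last one, which is the role the generated rich function plays in the pull request.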



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
