[ https://issues.apache.org/jira/browse/FLINK-4469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15713897#comment-15713897 ]

ASF GitHub Bot commented on FLINK-4469:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2653#discussion_r90582259
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala ---
    @@ -0,0 +1,121 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.functions
    +
    +import java.util
    +
    +import org.apache.flink.api.common.functions.InvalidTypesException
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.TypeExtractor
    +import org.apache.flink.api.table.ValidationException
    +
    +/**
    +  * Base class for a user-defined table function (UDTF). A user-defined table function works on
    +  * zero, one, or multiple scalar values as input and returns multiple rows as output.
    +  *
    +  * The behavior of a [[TableFunction]] can be defined by implementing a custom evaluation
    +  * method. An evaluation method must be declared publicly and named "eval". Evaluation methods
    +  * can also be overloaded by implementing multiple methods named "eval".
    +  *
    +  * User-defined functions must have a default constructor and must be instantiable during runtime.
    +  *
    +  * By default the result type of an evaluation method is determined by Flink's type extraction
    +  * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more
    +  * complex, custom, or composite types. In these cases [[TypeInformation]] of the result type
    +  * can be manually defined by overriding [[getResultType()]].
    +  *
    +  * Internally, the Table/SQL API code generation works with primitive values as much as possible.
    +  * If a user-defined table function should not introduce much overhead during runtime, it is
    +  * recommended to declare parameters and result types as primitive types instead of their boxed
    +  * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
    +  *
    +  * Example:
    +  *
    +  * {{{
    +  *
    +  *   public class Split extends TableFunction<String> {
    +  *
    +  *     // implement an "eval" method with as many parameters as you want
    +  *     public void eval(String str) {
    +  *       for (String s : str.split(" ")) {
    +  *         collect(s);   // use collect(...) to emit an output row
    +  *       }
    +  *     }
    +  *
    +  *     // the eval method can be overloaded here ...
    +  *   }
    +  *
    +  *   val tEnv: TableEnvironment = ...
    +  *   val table: Table = ...    // schema: [a: String]
    +  *
    +  *   // for Scala users
    +  *   val split = new Split()
    +  *   table.crossApply(split('a) as ('s)).select('a, 's)
    +  *
    +  *   // for Java users
    +  *   tEnv.registerFunction("split", new Split())   // register table function first
    +  *   table.crossApply("split(a) as (s)").select("a, s")
    +  *
    +  *   // for SQL users
    +  *   tEnv.registerFunction("split", new Split())   // register table function first
    +  *   tEnv.sql("SELECT a, s FROM MyTable, LATERAL TABLE(split(a)) as T(s)")
    +  *
    +  * }}}
    +  *
    +  * @tparam T The type of the output row
    +  */
    +abstract class TableFunction[T] {
    +
    +  private val rows: util.ArrayList[T] = new util.ArrayList[T]()
    +
    +  /**
    +    * Emit an output row.
    +    *
    +    * @param row the output row
    +    */
    +  protected def collect(row: T): Unit = {
    +    // cache rows for now; in the future they might be processed immediately instead
    +    rows.add(row)
    +  }
    +
    +  /**
    +    * Internal use. Get an iterator of the buffered rows.
    +    */
    +  def getRowsIterator: util.Iterator[T] = rows.iterator()
    +
    +  /**
    +    * Internal use. Clear buffered rows.
    +    */
    +  def clear(): Unit = rows.clear()
    +
    +  // ----------------------------------------------------------------------------------------------
    +
    +  /**
    +    * Returns the result type of the evaluation method with a given signature.
    +    *
    +    * This method needs to be overridden in case Flink's type extraction facilities are not
    +    * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
    +    * method. Flink's type extraction facilities can handle basic types or
    +    * simple POJOs but might be wrong for more complex, custom, or composite types.
    +    *
    +    * @return [[TypeInformation]] of result type or null if Flink should determine the type
    +    */
    +  def getResultType: TypeInformation[T] = null
    --- End diff --
    
    I removed `getParameterTypes()` from `TableFunction` intentionally, because it is never used in the current implementation, and it is a little complex and needs sufficient tests to support it. I would like to support it in a follow-up JIRA, and I created [FLINK-5225](https://issues.apache.org/jira/browse/FLINK-5225) just now.
    
    Supporting it requires implementing a custom Calcite `TableFunction` and overriding `getParameters()`. But currently, `FlinkTableFunctionImpl` extends `ReflectiveFunctionBase`, which determines the parameter types from the eval method.
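For readers skimming the diff above, the buffering that `collect(...)` performs can be sketched without any Flink dependencies. The class names below (`BufferingFunction`, `Split`, `Demo`) are hypothetical stand-ins for illustration, not Flink API:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Flink-free sketch of the row-buffering pattern in TableFunction:
// collect(...) appends to an internal list, the caller drains the list
// via an iterator, then clears it before the next input row.
abstract class BufferingFunction<T> {
    private final List<T> rows = new ArrayList<>();

    // emit one output row by appending it to the buffer
    protected void collect(T row) { rows.add(row); }

    Iterator<T> getRowsIterator() { return rows.iterator(); }

    void clear() { rows.clear(); }
}

class Split extends BufferingFunction<String> {
    public void eval(String str) {
        for (String s : str.split(" ")) {
            collect(s);   // one output row per token
        }
    }
}

public class Demo {
    public static void main(String[] args) {
        Split split = new Split();
        split.eval("hello world");
        StringBuilder out = new StringBuilder();
        for (Iterator<String> it = split.getRowsIterator(); it.hasNext(); ) {
            out.append(it.next()).append(";");
        }
        System.out.println(out);   // hello;world;
        split.clear();             // reset before the next input row
    }
}
```

Buffering this way lets the caller drain all rows emitted for one input row via `getRowsIterator` and then `clear()` before the next `eval` call.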


> Add support for user defined table function in Table API & SQL
> --------------------------------------------------------------
>
>                 Key: FLINK-4469
>                 URL: https://issues.apache.org/jira/browse/FLINK-4469
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>
> Normal user-defined functions, such as concat(), take in a single input row 
> and output a single output row. In contrast, table-generating functions 
> transform a single input row to multiple output rows. It is very useful in 
> some cases, such as look up in HBase by rowkey and return one or more rows.
> Adding a user-defined table function should:
> 1. inherit from the UDTF class with a specific generic type T
> 2. define one or more eval methods.
> NOTE: 
> 1. the eval method must be public and non-static.
> 2. the generic type T is the row type returned by the table function. Because of 
> Java type erasure, we can’t extract T from the Iterable.
> 3. use {{collect(T)}} to emit a table row
> 4. the eval method can be overloaded. Flink will choose the best-matching eval 
> method to call according to parameter types and count.
> {code}
> public class Word {
>   public String word;
>   public Integer length;
>   public Word(String word, Integer length) {
>     this.word = word;
>     this.length = length;
>   }
> }
> public class SplitStringUDTF extends UDTF<Word> {
>     public void eval(String str) {
>         if (str != null) {
>             for (String s : str.split(",")) {
>                 collect(new Word(s, s.length()));
>             }
>         }
>     }
> }
> // in SQL
> tableEnv.registerFunction("split", new SplitStringUDTF())
> tableEnv.sql("SELECT a, b, t.* FROM MyTable, LATERAL TABLE(split(c)) AS t(w,l)")
> // in Java Table API
> tableEnv.registerFunction("split", new SplitStringUDTF())
> // rename split table columns to “w” and “l”
> table.crossApply("split(c) as (w, l)")        
>      .select("a, b, w, l")
> // without renaming, we will use the original field names in the POJO/case/...
> table.crossApply("split(c)")
>      .select("a, b, word, length")
> // in Scala Table API
> val split = new SplitStringUDTF()
> table.crossApply(split('c) as ('w, 'l))
>      .select('a, 'b, 'w, 'l)
> // outerApply for outer join to a UDTF
> table.outerApply(split('c))
>      .select('a, 'b, 'word, 'length)
> {code}
> See [1] for more information about UDTF design.
> [1] https://docs.google.com/document/d/15iVc1781dxYWm3loVQlESYvMAxEzbbuVFPZWBYuY1Ek/edit#
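The overload-resolution behavior described in NOTE point 4 of the quoted description builds on ordinary Java method overloading. A minimal, Flink-free sketch (all class and method names here are illustrative, not Flink API):

```java
// Sketch of how overloaded eval methods are selected by parameter
// types and count; MyUdtf is a hypothetical stand-in for a UDTF class.
public class OverloadDemo {
    static class MyUdtf {
        public String eval(String s)        { return "eval(String): " + s; }
        public String eval(Integer n)       { return "eval(Integer): " + n; }
        public String eval(String s, int n) { return "eval(String,int)"; }
    }

    public static void main(String[] args) {
        MyUdtf f = new MyUdtf();
        // The best-matching overload is chosen per call site,
        // by parameter types and count.
        System.out.println(f.eval("abc"));      // eval(String): abc
        System.out.println(f.eval(42));         // eval(Integer): 42
        System.out.println(f.eval("abc", 2));   // eval(String,int)
    }
}
```

In the actual Table API the lookup happens reflectively at planning time rather than at compile time, but the matching idea is the same: parameter count first, then type compatibility.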



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
