hequn8128 commented on a change in pull request #11531: 
[FLINK-16748][python][doc] Add Python UDTF doc
URL: https://github.com/apache/flink/pull/11531#discussion_r407956730
 
 

 ##########
 File path: docs/dev/table/python/python_udfs.md
 ##########
 @@ -125,3 +125,105 @@ table_env.register_function("add", add)
 # use the function in Python Table API
 my_table.select("add(a, b)")
 {% endhighlight %}
+
+## Table Functions
+Similar to a Python user-defined scalar function, a user-defined table 
function takes zero, one, or 
+multiple scalar values as input parameters. However in contrast to a scalar 
function, it can return 
+an arbitrary number of rows as output instead of a single value. The return 
type of a Python UDTF 
+could be of types Iterable, Iterator or generator.
+
+<span class="label label-info">Note</span> Currently, Python UDTF is supported 
in old planner both under streaming and batch mode while is only supported 
under streaming mode in Blink planner.
+
+The following example shows how to define your own Python multi emit function, 
register it in the 
+TableEnvironment, and call it in a query.
+
+{% highlight python %}
+class Split(TableFunction):
+    def eval(self, string):
+        for s in string.split(" "):
+            yield s, len(s)
+
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+my_table = ...  # type: Table, table schema: [a: String]
+
+# register the Python Table Function
+table_env.register_function("split", udtf(Split(), DataTypes.STRING(), 
[DataTypes.STRING(), DataTypes.INT()]))
+
+# use the Python Table Function in Python Table API
+my_table.join_lateral("split(a) as (word, length)")
+my_table.left_outer_join_lateral("split(a) as (word, length)")
+
+# use the Python Table function in SQL API
+table_env.sql_query("SELECT a, word, length FROM MyTable, LATERAL 
TABLE(split(a)) as T(word, length)")
+table_env.sql_query("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL 
TABLE(split(a)) as T(word, length) ON TRUE")
+
+{% endhighlight %}
+
+
+It also supports to use Java/Scala table functions in Python Table API 
programs.
+{% highlight python %}
+'''
+Java code:
+
+// The generic type "Tuple2<String, Integer>" determines the schema of the 
returned table as (String, Integer).
+// The java class must have a public no-argument constructor and can be 
founded in current java classloader.
+public class Split extends TableFunction<Tuple2<String, Integer>> {
+    private String separator = " ";
+    
+    public void eval(String str) {
+        for (String s : str.split(separator)) {
+            // use collect(...) to emit a row
+            collect(new Tuple2<String, Integer>(s, s.length()));
+        }
+    }
+}
+'''
+
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+my_table = ...  # type: Table, table schema: [a: String]
+
+# Register the java function.
+table_env.register_java_function("split", "my.java.function.Split")
+
+# Use the table function in the Python Table API. "as" specifies the field 
names of the table.
+my_table.join_lateral("split(a) as (word, length)").select("a, word, length")
+my_table.left_outer_join_lateral("split(a) as (word, length)").select("a, 
word, length")
+
+# Register the python function.
+
+# Use the table function in SQL with LATERAL and TABLE keywords.
+# CROSS JOIN a table function (equivalent to "join" in Table API).
+table_env.sql_query("SELECT a, word, length FROM MyTable, LATERAL 
TABLE(split(a)) as T(word, length)")
+# LEFT JOIN a table function (equivalent to "left_outer_join" in Table API).
+table_env.sql_query("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL 
TABLE(split(a)) as T(word, length) ON TRUE")
+{% endhighlight %}
+
+Like Python scalar functions, you can use the above five ways to define Python 
TableFunctions.
+
+<span class="label label-info">Note</span> The only difference is that the 
return type of Python Table Functions needs to be an iterable, iterator or 
generator.
+
+{% highlight python %}
+# option 1: return iterable
+@udtf(input_types=DataTypes.BIGINT(), result_types=DataTypes.BIGINT())
+def iterable_func(x):
+      result = [1, 2, 3]
+      return result
+
+# option 2: return iterator
+@udtf(input_types=DataTypes.BIGINT(), result_types=DataTypes.BIGINT())
+def iterator_func(x):
+      return range(5)
+
+# option 3: generator function
+@udtf(input_types=DataTypes.BIGINT(), result_types=DataTypes.BIGINT())
 
 Review comment:
   Move this as option1, it can avoid creating temporary buffers, thus should 
be preferred. What do you think?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to