hequn8128 commented on a change in pull request #9886:
[FLINK-14027][python][doc] Add documentation for Python user-defined functions.
URL: https://github.com/apache/flink/pull/9886#discussion_r335262675
##########
File path: docs/dev/table/udfs.md
##########
@@ -112,50 +152,63 @@ public class HashCode extends ScalarFunction {
}
'''
+class PyHashCode(ScalarFunction):
+  def __init__(self):
+    self.factor = 12
+
+  def eval(self, s):
+    return hash(s) * self.factor
+
table_env = BatchTableEnvironment.create(env)
-# register the java function
+# register the Java function
table_env.register_java_function("hashCode", "my.java.function.HashCode")
+# register the Python function
+table_env.register_function("py_hash_code", udf(PyHashCode(), DataTypes.BIGINT(), DataTypes.BIGINT()))
+
# use the function in Python Table API
-my_table.select("string, string.hashCode(), hashCode(string)")
+my_table.select("string, bigint, string.hashCode(), hashCode(string), bigint.py_hash_code(), py_hash_code(bigint)")
# use the function in SQL API
-table_env.sql_query("SELECT string, hashCode(string) FROM MyTable")
+table_env.sql_query("SELECT string, bigint, hashCode(string), py_hash_code(bigint) FROM MyTable")
{% endhighlight %}
-</div>
-</div>
-By default the result type of an evaluation method is determined by Flink's type extraction facilities. This is sufficient for basic types or simple POJOs but might be wrong for more complex, custom, or composite types. In these cases `TypeInformation` of the result type can be manually defined by overriding `ScalarFunction#getResultType()`.
+There are many ways to define a Python scalar function besides extending the base class `ScalarFunction`. The following example shows the different ways to define a Python scalar function which takes two columns of bigint as input parameters and returns the sum of them as the result.
-The following example shows an advanced example which takes the internal timestamp representation and also returns the internal timestamp representation as a long value. By overriding `ScalarFunction#getResultType()` we define that the returned long value should be interpreted as a `Types.TIMESTAMP` by the code generation.
+{% highlight python %}
+# option 1: extending the base class `ScalarFunction`
+class Add(ScalarFunction):
+  def eval(self, i, j):
+    return i + j
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-public static class TimestampModifier extends ScalarFunction {
-  public long eval(long t) {
-    return t % 1000;
-  }
+add = udf(Add(), [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
-  public TypeInformation<?> getResultType(Class<?>[] signature) {
-    return Types.SQL_TIMESTAMP;
-  }
-}
-{% endhighlight %}
-</div>
+# option 2: Python function
+@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
+def add(i, j):
+  return i + j
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-object TimestampModifier extends ScalarFunction {
-  def eval(t: Long): Long = {
-    t % 1000
-  }
+# option 3: lambda function
+add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
-  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
-    Types.TIMESTAMP
-  }
-}
+# option 4: callable function
+class CallableAdd(object):
+  def __call__(self, i, j):
+    return i + j
+
+add = udf(CallableAdd(), [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
+
+# option 5: partial function
+def partial_add(i, j):
Review comment:
Define it as `def partial_add(i, j, k):` so that we can pass two bigint values and
one constant to it, just as the example below shows.
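
A minimal sketch of the three-argument version suggested here, assuming the constant is bound with `functools.partial`. The binding `k=1` and the name `add_with_constant` are illustrative and not taken from the PR. The `udf(...)` wrapping is left as a comment because it needs PyFlink, and its arguments are an assumption modelled on options 1 to 4 in the diff above.

```python
import functools


def partial_add(i, j, k):
    # i and j would come from the two bigint columns; k is the constant.
    return i + j + k


# Bind the constant up front so the remaining callable takes two arguments.
# The value k=1 is an arbitrary choice for illustration.
add_with_constant = functools.partial(partial_add, k=1)

# Assumed PyFlink wrapping, mirroring options 1-4 in the diff (not run here):
# add = udf(add_with_constant, [DataTypes.BIGINT(), DataTypes.BIGINT()],
#           DataTypes.BIGINT())

print(add_with_constant(2, 3))  # prints 6
```

With two parameters only, `functools.partial` would have to bind one of the column arguments, so the resulting function would no longer match the two-bigint signature used by the other options.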