sjwiesman commented on a change in pull request #12660:
URL: https://github.com/apache/flink/pull/12660#discussion_r440277236
##########
File path: docs/dev/table/functions/udfs.md
##########
@@ -1416,15 +1862,6 @@ tab
</div>
-{% top %}
-
-Best Practices for Implementing UDFs
-------------------------------------
-
-The Table API and SQL code generation internally tries to work with primitive
values as much as possible. A user-defined function can introduce much overhead
through object creation, casting, and (un)boxing. Therefore, it is highly
recommended to declare parameters and result types as primitive types instead
of their boxed classes. `Types.DATE` and `Types.TIME` can also be represented
as `int`. `Types.TIMESTAMP` can be represented as `long`.
-
-We recommended that user-defined functions should be written by Java instead
of Scala as Scala types pose a challenge for Flink's type extractor.
-
Review comment:
It's below the diff, but can you change the code example in "Integrating
UDFs with the Runtime" to use `TableEnvironment` instead of
`BatchTableEnvironment`?
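
The example itself is below this hunk, so purely as an illustrative sketch of the kind of change I mean (the settings and variable names are assumptions, not the actual snippet):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// legacy, planner-specific environment (what the docs currently show):
// BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

// unified entry point suggested here:
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
```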
##########
File path: docs/dev/table/functions/udfs.md
##########
@@ -22,322 +22,768 @@ specific language governing permissions and limitations
under the License.
-->
-User-defined functions are an important feature, because they significantly
extend the expressiveness of queries.
+User-defined functions (UDFs) are extension points to call frequently used
logic or custom logic that cannot be expressed otherwise in queries.
+
+User-defined functions can be implemented in a JVM language (such as Java or
Scala) or Python. An implementer can use arbitrary third party libraries within
a UDF. This page will focus on JVM-based languages.
* This will be replaced by the TOC
{:toc}
-Register User-Defined Functions
--------------------------------
-In most cases, a user-defined function must be registered before it can be
used in an query. It is not necessary to register functions for the Scala Table
API.
-
-Functions are registered at the `TableEnvironment` by calling a
`registerFunction()` method. When a user-defined function is registered, it is
inserted into the function catalog of the `TableEnvironment` such that the
Table API or SQL parser can recognize and properly translate it.
+Overview
+--------
-Please find detailed examples of how to register and how to call each type of
user-defined function
-(`ScalarFunction`, `TableFunction`, and `AggregateFunction`) in the following
sub-sessions.
+Currently, Flink distinguishes between the following kinds of functions:
+- *Scalar functions* map scalar values to a new scalar value.
+- *Table functions* map scalar values to new rows.
+- *Aggregate functions* map scalar values of multiple rows to a new scalar
value.
+- *Table aggregate functions* map scalar values of multiple rows to new rows.
+- *Async table functions* are special functions for table sources that perform
a lookup.
-{% top %}
+<span class="label label-danger">Attention</span> Scalar and table functions
have been updated to the new type system based on [data types](../types.html).
Aggregating functions still use the old type system based on `TypeInformation`.
-Scalar Functions
-----------------
+The following example shows how to create a simple scalar function and how to
call the function in both Table API and SQL.
-If a required scalar function is not contained in the built-in functions, it
is possible to define custom, user-defined scalar functions for both the Table
API and SQL. A user-defined scalar functions maps zero, one, or multiple scalar
values to a new scalar value.
+For SQL queries, a function must always be registered under a name. For Table
API, a function can be registered or directly used _inline_.
<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-In order to define a scalar function, one has to extend the base class
`ScalarFunction` in `org.apache.flink.table.functions` and implement (one or
more) evaluation methods. The behavior of a scalar function is determined by
the evaluation method. An evaluation method must be declared publicly and named
`eval`. The parameter types and return type of the evaluation method also
determine the parameter and return types of the scalar function. Evaluation
methods can also be overloaded by implementing multiple methods named `eval`.
Evaluation methods can also support variable arguments, such as `eval(String...
strs)`.
-
-The following example shows how to define your own hash code function,
register it in the TableEnvironment, and call it in a query. Note that you can
configure your scalar function via a constructor before it is registered:
+<div data-lang="Java" markdown="1">
{% highlight java %}
-public class HashCode extends ScalarFunction {
- private int factor = 12;
-
- public HashCode(int factor) {
- this.factor = factor;
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.ScalarFunction;
+import static org.apache.flink.table.api.Expressions.*;
+
+// define function logic
+public static class SubstringFunction extends ScalarFunction {
+ public String eval(String s, Integer begin, Integer end) {
+ return s.substring(begin, end);
}
-
- public int eval(String s) {
- return s.hashCode() * factor;
+}
+
+TableEnvironment env = TableEnvironment.create(...);
+
+// call function "inline" without registration in Table API
+env.from("MyTable").select(call(SubstringFunction.class, $("myField"), 5, 12));
+
+// register function
+env.createTemporarySystemFunction("SubstringFunction",
SubstringFunction.class);
+
+// call registered function in Table API
+env.from("MyTable").select(call("SubstringFunction", $("myField"), 5, 12));
+
+// call registered function in SQL
+env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable");
+
+{% endhighlight %}
+</div>
+
+<div data-lang="Scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.table.api._
+import org.apache.flink.table.functions.ScalarFunction
+
+// define function logic
+class SubstringFunction extends ScalarFunction {
+ def eval(s: String, begin: Integer, end: Integer): String = {
+ s.substring(begin, end)
}
}
-BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
+val env = TableEnvironment.create(...)
-// register the function
-tableEnv.registerFunction("hashCode", new HashCode(10));
+// call function "inline" without registration in Table API
+env.from("MyTable").select(call(classOf[SubstringFunction], $"myField", 5, 12))
-// use the function in Java Table API
-myTable.select("string, string.hashCode(), hashCode(string)");
+// register function
+env.createTemporarySystemFunction("SubstringFunction",
classOf[SubstringFunction])
+
+// call registered function in Table API
+env.from("MyTable").select(call("SubstringFunction", $"myField", 5, 12))
+
+// call registered function in SQL
+env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable")
-// use the function in SQL API
-tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable");
{% endhighlight %}
+</div>
-By default the result type of an evaluation method is determined by Flink's
type extraction facilities. This is sufficient for basic types or simple POJOs
but might be wrong for more complex, custom, or composite types. In these cases
`TypeInformation` of the result type can be manually defined by overriding
`ScalarFunction#getResultType()`.
+</div>
+
+For interactive sessions, it is also possible to parameterize functions before
using or
+registering them. In this case, function _instances_ instead of function
_classes_ can be
+used as temporary functions.
-The following example shows an advanced example which takes the internal
timestamp representation and also returns the internal timestamp representation
as a long value. By overriding `ScalarFunction#getResultType()` we define that
the returned long value should be interpreted as a `Types.TIMESTAMP` by the
code generation.
+It requires that the parameters are serializable for shipping
+function instances to the cluster.
+
+<div class="codetabs" markdown="1">
+<div data-lang="Java" markdown="1">
{% highlight java %}
-public static class TimestampModifier extends ScalarFunction {
- public long eval(long t) {
- return t % 1000;
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.ScalarFunction;
+import static org.apache.flink.table.api.Expressions.*;
+
+// define parameterizable function logic
+public static class SubstringFunction extends ScalarFunction {
+
+ private boolean endInclusive;
+
+ public SubstringFunction(boolean endInclusive) {
+ this.endInclusive = endInclusive;
}
- public TypeInformation<?> getResultType(Class<?>[] signature) {
- return Types.SQL_TIMESTAMP;
+ public String eval(String s, Integer begin, Integer end) {
+ return s.substring(begin, endInclusive ? end + 1 : end);
}
}
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-In order to define a scalar function, one has to extend the base class
`ScalarFunction` in `org.apache.flink.table.functions` and implement (one or
more) evaluation methods. The behavior of a scalar function is determined by
the evaluation method. An evaluation method must be declared publicly and named
`eval`. The parameter types and return type of the evaluation method also
determine the parameter and return types of the scalar function. Evaluation
methods can also be overloaded by implementing multiple methods named `eval`.
Evaluation methods can also support variable arguments, such as `@varargs def
eval(str: String*)`.
+TableEnvironment env = TableEnvironment.create(...);
-The following example shows how to define your own hash code function,
register it in the TableEnvironment, and call it in a query. Note that you can
configure your scalar function via a constructor before it is registered:
+// call function "inline" without registration in Table API
+env.from("MyTable").select(call(new SubstringFunction(true), $("myField"), 5,
12));
+
+// register function
+env.createTemporarySystemFunction("SubstringFunction", new
SubstringFunction(true));
+
+{% endhighlight %}
+</div>
+<div data-lang="Scala" markdown="1">
{% highlight scala %}
-// must be defined in static/object context
-class HashCode(factor: Int) extends ScalarFunction {
- def eval(s: String): Int = {
- s.hashCode() * factor
+import org.apache.flink.table.api._
+import org.apache.flink.table.functions.ScalarFunction
+
+// define parameterizable function logic
+class SubstringFunction(val endInclusive: Boolean) extends ScalarFunction {
+ def eval(s: String, begin: Integer, end: Integer): String = {
+ s.substring(begin, if (endInclusive) end + 1 else end)
}
}
-val tableEnv = BatchTableEnvironment.create(env)
+val env = TableEnvironment.create(...)
-// use the function in Scala Table API
-val hashCode = new HashCode(10)
-myTable.select('string, hashCode('string))
+// call function "inline" without registration in Table API
+env.from("MyTable").select(call(new SubstringFunction(true), $"myField", 5,
12))
+
+// register function
+env.createTemporarySystemFunction("SubstringFunction", new
SubstringFunction(true))
-// register and use the function in SQL
-tableEnv.registerFunction("hashCode", new HashCode(10))
-tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable")
{% endhighlight %}
+</div>
+
+</div>
+
+{% top %}
+
+Implementation Guide
+--------------------
+
+<span class="label label-danger">Attention</span> This section only applies to
scalar and table functions for now, until aggregate functions have been updated
to the new type system.
-By default the result type of an evaluation method is determined by Flink's
type extraction facilities. This is sufficient for basic types or simple POJOs
but might be wrong for more complex, custom, or composite types. In these cases
`TypeInformation` of the result type can be manually defined by overriding
`ScalarFunction#getResultType()`.
+Independent of the kind of function, all user-defined functions follow some
basic implementation principles.
-The following example shows an advanced example which takes the internal
timestamp representation and also returns the internal timestamp representation
as a long value. By overriding `ScalarFunction#getResultType()` we define that
the returned long value should be interpreted as a `Types.TIMESTAMP` by the
code generation.
+### Function Class
+An implementation class must extend from one of the available base classes
(e.g. `org.apache.flink.table.functions.ScalarFunction`).
+
+The class must be declared `public`, not `abstract`, and should be globally
accessible. Thus, non-static inner or anonymous classes are not allowed.
+
+For storing a user-defined function in a persistent catalog, the class must
have a default constructor and must be instantiable during runtime.
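+
+As a rough sketch (a hypothetical helper, not one of the examples below), a class that satisfies these requirements could look like this:
+
+{% highlight java %}
+import org.apache.flink.table.functions.ScalarFunction;
+
+// public, non-abstract, top-level (globally accessible), with a default constructor,
+// so the function could also be stored in a persistent catalog
+public class ToUpperCaseFunction extends ScalarFunction {
+ public String eval(String s) {
+   return s == null ? null : s.toUpperCase();
+ }
+}
+{% endhighlight %}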
+
+### Evaluation Methods
+
+The base class provides a set of methods that can be overridden such as
`open()`, `close()`, or `isDeterministic()`.
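+
+A hedged sketch of how these optional methods might be overridden (the cached map and the normalization logic are purely illustrative assumptions):
+
+{% highlight java %}
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.ScalarFunction;
+
+// illustrative sketch: lifecycle methods next to a regular evaluation method
+public static class NormalizeFunction extends ScalarFunction {
+
+ private transient Map<String, String> cache; // hypothetical per-task state
+
+ @Override
+ public void open(FunctionContext context) throws Exception {
+   cache = new HashMap<>(); // acquire resources once before the first eval() call
+ }
+
+ public String eval(String s) {
+   return cache.computeIfAbsent(s, k -> k.trim().toLowerCase());
+ }
+
+ @Override
+ public void close() throws Exception {
+   cache.clear(); // release resources when the task shuts down
+ }
+
+ @Override
+ public boolean isDeterministic() {
+   return true; // the result only depends on the arguments
+ }
+}
+{% endhighlight %}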
+
+However, in addition to those declared methods, the main runtime logic that is
applied to every incoming record must be implemented through specialized
_evaluation methods_.
+
+Depending on the function kind, evaluation methods such as `eval()`,
`accumulate()`, or `retract()` are called by code-generated operators during
runtime.
+
+The methods must be declared `public` and take a well-defined set of arguments.
+
+Regular JVM method calling semantics apply. Therefore, it is possible to:
+- implement overloaded methods such as `eval(Integer)` and
`eval(LocalDateTime)`,
+- use var-args such as `eval(Integer...)`,
+- use object inheritance such as `eval(Object)` that takes both
`LocalDateTime` and `Integer`,
+- and combinations of the above such as `eval(Object...)` that takes all kinds
of arguments.
+
+The following snippet shows an example of an overloaded function:
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="Java" markdown="1">
+{% highlight java %}
+import org.apache.flink.table.functions.ScalarFunction;
+
+// function with overloaded evaluation methods
+public static class SumFunction extends ScalarFunction {
+
+ public Integer eval(Integer a, Integer b) {
+ return a + b;
+ }
+
+ public Integer eval(String a, String b) {
+ return Integer.valueOf(a) + Integer.valueOf(b);
+ }
+
+ public Integer eval(Double... d) {
+ double result = 0;
+ for (double value : d)
+ result += value;
+ return (int) result;
+ }
+}
+
+{% endhighlight %}
+</div>
+
+<div data-lang="Scala" markdown="1">
{% highlight scala %}
-object TimestampModifier extends ScalarFunction {
- def eval(t: Long): Long = {
- t % 1000
+import org.apache.flink.table.functions.ScalarFunction
+import scala.annotation.varargs
+
+// function with overloaded evaluation methods
+class SumFunction extends ScalarFunction {
+
+ def eval(a: Integer, b: Integer): Integer = {
+ a + b
}
- override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
{
- Types.TIMESTAMP
+ def eval(a: String, b: String): Integer = {
+ Integer.valueOf(a) + Integer.valueOf(b)
+ }
+
+ @varargs // generate var-args like Java
+ def eval(d: Double*): Integer = {
+ d.sum.toInt
}
}
+
{% endhighlight %}
</div>
-<div data-lang="python" markdown="1">
-In order to define a Python scalar function, one can extend the base class
`ScalarFunction` in `pyflink.table.udf` and implement an evaluation method. The
behavior of a Python scalar function is determined by the evaluation method
which is named `eval`.
+</div>
-The following example shows how to define your own Python hash code function,
register it in the TableEnvironment, and call it in a query. Note that you can
configure your scalar function via a constructor before it is registered:
+### Type Inference
-{% highlight python %}
-class HashCode(ScalarFunction):
- def __init__(self):
- self.factor = 12
+The table ecosystem (similar to the SQL standard) is a strongly typed API.
Therefore, both function parameters and return types must be mapped to a [data
type](../types.html).
+
+From a logical perspective, the planner needs information about expected
types, precision, and scale. From a JVM perspective, the planner needs
information about how internal data structures are represented as JVM objects
when calling a user-defined function.
+
+The logic for validating input arguments and deriving data types for both the
parameters and the result of a function is summarized under the term _type
inference_.
+
+Flink's user-defined functions implement an automatic type inference
extraction that derives data types from the function's class and its evaluation
methods via reflection. If this implicit reflective extraction approach is not
successful, the extraction process can be supported by annotating affected
parameters, classes, or methods with `@DataTypeHint` and `@FunctionHint`. More
examples on how to annotate functions are shown below.
+
+If more advanced type inference logic is required, an implementer can
explicitly override the `getTypeInference()` method in every user-defined
function. However, the annotation approach is recommended because it keeps
custom type inference logic close to the affected locations and falls back to
the default behavior for the remaining implementation.
+
+#### Automatic Type Inference
+
+The automatic type inference inspects the function's class and evaluation
methods to derive data types for the arguments and result of a function.
`@DataTypeHint` and `@FunctionHint` annotations support the automatic
extraction.
+
+For a full list of classes that can be implicitly mapped to a data type, see
the [data type section](../types.html#data-type-annotations).
+
+**`@DataTypeHint`**
+
+In many scenarios, it is required to support the automatic extraction _inline_
for parameters and return types of a function.
+
+The following example shows how to use data type hints. More information can
be found in the documentation of the annotation class.
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="Java" markdown="1">
+{% highlight java %}
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.InputGroup;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.types.Row;
+
+// function with overloaded evaluation methods
+public static class OverloadedFunction extends ScalarFunction {
- def eval(self, s):
- return hash(s) * self.factor
+ // no hint required
+ public Long eval(long a, long b) {
+ return a + b;
+ }
-table_env = BatchTableEnvironment.create(env)
+ // define the precision and scale of a decimal
+ public @DataTypeHint("DECIMAL(12, 3)") BigDecimal eval(double a, double b) {
+ return BigDecimal.valueOf(a + b);
+ }
-# register the Python function
-table_env.register_function("hash_code", udf(HashCode(), DataTypes.BIGINT(),
DataTypes.BIGINT()))
+ // define a nested data type
+ @DataTypeHint("ROW<s STRING, t TIMESTAMP(3) WITH LOCAL TIME ZONE>")
+ public Row eval(int i) {
+ return Row.of(String.valueOf(i), Instant.ofEpochSecond(i));
+ }
-# use the function in Python Table API
-my_table.select("string, bigint, string.hash_code(), hash_code(string)")
+ // allow wildcard input and custom serialized output
+ @DataTypeHint(value = "RAW", bridgedTo = ByteBuffer.class)
+ public ByteBuffer eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
+ return MyUtils.serializeToByteBuffer(o);
+ }
+}
-# use the function in SQL API
-table_env.sql_query("SELECT string, bigint, hash_code(bigint) FROM MyTable")
{% endhighlight %}
+</div>
+
+<div data-lang="Scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.table.annotation.DataTypeHint
+import org.apache.flink.table.annotation.InputGroup
+import org.apache.flink.table.functions.ScalarFunction
+import org.apache.flink.types.Row
+import scala.annotation.varargs
+
+// function with overloaded evaluation methods
+class OverloadedFunction extends ScalarFunction {
+
+ // no hint required
+ def eval(a: Long, b: Long): Long = {
+ a + b
+ }
+
+ // define the precision and scale of a decimal
+ @DataTypeHint("DECIMAL(12, 3)")
+ def eval(a: Double, b: Double): java.math.BigDecimal = {
+ java.math.BigDecimal.valueOf(a + b)
+ }
+
+ // define a nested data type
+ @DataTypeHint("ROW<s STRING, t TIMESTAMP(3) WITH LOCAL TIME ZONE>")
+ def eval(i: Int): Row = {
+ Row.of(java.lang.String.valueOf(i), java.time.Instant.ofEpochSecond(i))
+ }
-There are many ways to define a Python scalar function besides extending the
base class `ScalarFunction`.
-Please refer to the [Python Scalar Function]({{ site.baseurl
}}/dev/table/python/python_udfs.html#scalar-functions) documentation for more
details.
+ // allow wildcard input and custom serialized output
+ @DataTypeHint(value = "RAW", bridgedTo = classOf[java.nio.ByteBuffer])
+ def eval(@DataTypeHint(inputGroup = InputGroup.ANY) o: AnyRef):
java.nio.ByteBuffer = {
+ MyUtils.serializeToByteBuffer(o)
+ }
+}
+
+{% endhighlight %}
</div>
+
</div>
-{% top %}
+**`@FunctionHint`**
-Table Functions
----------------
+In some scenarios, it is desirable that one evaluation method handles multiple
different data types at the same time. Furthermore, in some scenarios,
overloaded evaluation methods have a common result type that should be declared
only once.
-Similar to a user-defined scalar function, a user-defined table function takes
zero, one, or multiple scalar values as input parameters. However in contrast
to a scalar function, it can return an arbitrary number of rows as output
instead of a single value. The returned rows may consist of one or more
columns.
+The `@FunctionHint` annotation can provide a mapping from argument data types
to a result data type. It enables annotating entire function classes or
evaluation methods for input, accumulator, and result data types. One or more
annotations can be declared on top of a class or individually for each
evaluation method for overloading function signatures. All hint parameters are
optional. If a parameter is not defined, the default reflection-based
extraction is used. Hint parameters defined on top of a function class are
inherited by all evaluation methods.
+
+The following example shows how to use function hints. More information can be
found in the documentation of the annotation class.
<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-In order to define a table function one has to extend the base class
`TableFunction` in `org.apache.flink.table.functions` and implement (one or
more) evaluation methods. The behavior of a table function is determined by its
evaluation methods. An evaluation method must be declared `public` and named
`eval`. The `TableFunction` can be overloaded by implementing multiple methods
named `eval`. The parameter types of the evaluation methods determine all valid
parameters of the table function. Evaluation methods can also support variable
arguments, such as `eval(String... strs)`. The type of the returned table is
determined by the generic type of `TableFunction`. Evaluation methods emit
output rows using the protected `collect(T)` method.
-In the Table API, a table function is used with `.joinLateral` or
`.leftOuterJoinLateral`. The `joinLateral` operator (cross) joins each row from
the outer table (table on the left of the operator) with all rows produced by
the table-valued function (which is on the right side of the operator). The
`leftOuterJoinLateral` operator joins each row from the outer table (table on
the left of the operator) with all rows produced by the table-valued function
(which is on the right side of the operator) and preserves outer rows for which
the table function returns an empty table. In SQL use `LATERAL
TABLE(<TableFunction>)` with CROSS JOIN and LEFT JOIN with an ON TRUE join
condition (see examples below).
+<div data-lang="Java" markdown="1">
+{% highlight java %}
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+// function with overloaded evaluation methods
+// but globally defined output type
+@FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
+public static class OverloadedFunction extends TableFunction<Row> {
+
+ public void eval(int a, int b) {
+ collect(Row.of("Sum", a + b));
+ }
-The following example shows how to define table-valued function, register it
in the TableEnvironment, and call it in a query. Note that you can configure
your table function via a constructor before it is registered:
+ // overloading of arguments is still possible
+ public void eval() {
+ collect(Row.of("Empty args", -1));
+ }
+}
-{% highlight java %}
-// The generic type "Tuple2<String, Integer>" determines the schema of the
returned table as (String, Integer).
-public class Split extends TableFunction<Tuple2<String, Integer>> {
- private String separator = " ";
-
- public Split(String separator) {
- this.separator = separator;
+// decouples the type inference from evaluation methods,
+// the type inference is entirely determined by the function hints
+@FunctionHint(
+ input = [@DataTypeHint("INT"), @DataTypeHint("INT")],
+ output = @DataTypeHint("INT")
+)
+@FunctionHint(
+ input = [@DataTypeHint("LONG"), @DataTypeHint("LONG")],
+ output = @DataTypeHint("LONG")
+)
+@FunctionHint(
+ input = [],
+ output = @DataTypeHint("BOOLEAN")
+)
+public static class OverloadedFunction extends TableFunction<Object> {
+
+ // an implementer just needs to make sure that a method exists
+ // that can be called by the JVM
+ public void eval(Object... o) {
+ if (o.length == 0) {
+ collect(false);
}
-
- public void eval(String str) {
- for (String s : str.split(separator)) {
- // use collect(...) to emit a row
- collect(new Tuple2<String, Integer>(s, s.length()));
- }
+ collect(o[0]);
+ }
+}
+
+{% endhighlight %}
+</div>
+
+<div data-lang="Scala" markdown="1">
+{% highlight scala %}
+
+import org.apache.flink.table.annotation.DataTypeHint
+import org.apache.flink.table.annotation.FunctionHint
+import org.apache.flink.table.functions.TableFunction
+import org.apache.flink.types.Row
+
+// function with overloaded evaluation methods
+// but globally defined output type
+@FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
+class OverloadedFunction extends TableFunction[Row] {
+
+ def eval(a: Int, b: Int): Unit = {
+ collect(Row.of("Sum", Int.box(a + b)))
+ }
+
+ // overloading of arguments is still possible
+ def eval(): Unit = {
+ collect(Row.of("Empty args", Int.box(-1)))
+ }
+}
+
+// decouples the type inference from evaluation methods,
+// the type inference is entirely determined by the function hints
+@FunctionHint(
+ input = Array(@DataTypeHint("INT"), @DataTypeHint("INT")),
+ output = @DataTypeHint("INT")
+)
+@FunctionHint(
+ input = Array(@DataTypeHint("LONG"), @DataTypeHint("LONG")),
+ output = @DataTypeHint("LONG")
+)
+@FunctionHint(
+ input = Array(),
+ output = @DataTypeHint("BOOLEAN")
+)
+class OverloadedFunction extends TableFunction[AnyRef] {
+
+ // an implementer just needs to make sure that a method exists
+ // that can be called by the JVM
+ @varargs
+ def eval(o: AnyRef*): Unit = {
+ if (o.length == 0) {
+ collect(Boolean.box(false))
}
+ collect(o(0))
+ }
}
-BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
-Table myTable = ... // table schema: [a: String]
-
-// Register the function.
-tableEnv.registerFunction("split", new Split("#"));
-
-// Use the table function in the Java Table API. "as" specifies the field
names of the table.
-myTable.joinLateral("split(a) as (word, length)")
- .select("a, word, length");
-myTable.leftOuterJoinLateral("split(a) as (word, length)")
- .select("a, word, length");
-
-// Use the table function in SQL with LATERAL and TABLE keywords.
-// CROSS JOIN a table function (equivalent to "join" in Table API).
-tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL
TABLE(split(a)) as T(word, length)");
-// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
-tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL
TABLE(split(a)) as T(word, length) ON TRUE");
{% endhighlight %}
</div>
-Please note that POJO types do not have a deterministic field order.
Therefore, you cannot rename the fields of POJO returned by a table function
using `AS`.
+</div>
+
+#### Custom Type Inference
-By default the result type of a `TableFunction` is determined by Flink’s
automatic type extraction facilities. This works well for basic types and
simple POJOs but might be wrong for more complex, custom, or composite types.
In such a case, the type of the result can be manually specified by overriding
`TableFunction#getResultType()` which returns its `TypeInformation`.
+For most scenarios, `@DataTypeHint` and `@FunctionHint` should be sufficient
to model user-defined functions. However, by overriding the automatic type
inference defined in `getTypeInference()`, implementers can create arbitrary
functions that behave like built-in system functions.
-The following example shows an example of a `TableFunction` that returns a
`Row` type which requires explicit type information. We define that the
returned table type should be `RowTypeInfo(String, Integer)` by overriding
`TableFunction#getResultType()`.
+The following example implemented in Java illustrates the potential of a
custom type inference logic. It uses a string literal argument to determine the
result type of a function. The function takes two string arguments: the first
argument represents the string to be parsed, the second argument represents the
target type.
+
+<div class="codetabs" markdown="1">
+<div data-lang="Java" markdown="1">
{% highlight java %}
-public class CustomTypeSplit extends TableFunction<Row> {
- public void eval(String str) {
- for (String s : str.split(" ")) {
- Row row = new Row(2);
- row.setField(0, s);
- row.setField(1, s.length());
- collect(row);
- }
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.types.Row;
+
+public static class LiteralFunction extends ScalarFunction {
+ public Object eval(String s, String type) {
+ switch (type) {
+ case "INT":
+ return Integer.valueOf(s);
+ case "DOUBLE":
+ return Double.valueOf(s);
+ case "STRING":
+ default:
+ return s;
}
+ }
- @Override
- public TypeInformation<Row> getResultType() {
- return Types.ROW(Types.STRING(), Types.INT());
- }
+ // the automatic, reflection-based type inference is disabled and
+ // replaced by the following logic
+ @Override
+ public TypeInference getTypeInference(DataTypeFactory typeFactory) {
+ return TypeInference.newBuilder()
+ // specify typed arguments
+ // parameters will be casted implicitly to those types if necessary
+ .typedArguments(DataTypes.STRING(), DataTypes.STRING())
+ // specify a strategy for the result data type of the function
+ .outputTypeStrategy(callContext -> {
+ if (!callContext.isArgumentLiteral(1) ||
callContext.isArgumentNull(1)) {
+ throw callContext.newValidationError("Literal expected for second
argument.");
+ }
+ // return a data type based on a literal
+ final String literal = callContext.getArgumentValue(1,
String.class).orElse("STRING");
+ switch (literal) {
+ case "INT":
+ return Optional.of(DataTypes.INT().notNull());
+ case "DOUBLE":
+ return Optional.of(DataTypes.DOUBLE().notNull());
+ case "STRING":
+ default:
+ return Optional.of(DataTypes.STRING());
+ }
+ })
+ .build();
+ }
}
+
{% endhighlight %}
+</div>
-<div data-lang="scala" markdown="1">
-In order to define a table function one has to extend the base class
`TableFunction` in `org.apache.flink.table.functions` and implement (one or
more) evaluation methods. The behavior of a table function is determined by its
evaluation methods. An evaluation method must be declared `public` and named
`eval`. The `TableFunction` can be overloaded by implementing multiple methods
named `eval`. The parameter types of the evaluation methods determine all valid
parameters of the table function. Evaluation methods can also support variable
arguments, such as `eval(String... strs)`. The type of the returned table is
determined by the generic type of `TableFunction`. Evaluation methods emit
output rows using the protected `collect(T)` method.
+</div>
-In the Table API, a table function is used with `.joinLateral` or
`.leftOuterJoinLateral`. The `joinLateral` operator (cross) joins each row from
the outer table (table on the left of the operator) with all rows produced by
the table-valued function (which is on the right side of the operator). The
`leftOuterJoinLateral` operator joins each row from the outer table (table on
the left of the operator) with all rows produced by the table-valued function
(which is on the right side of the operator) and preserves outer rows for which
the table function returns an empty table. In SQL use `LATERAL
TABLE(<TableFunction>)` with CROSS JOIN and LEFT JOIN with an ON TRUE join
condition (see examples below).
+{% top %}
-The following example shows how to define table-valued function, register it
in the TableEnvironment, and call it in a query. Note that you can configure
your table function via a constructor before it is registered:
+Scalar Functions
+----------------
-{% highlight scala %}
-// The generic type "(String, Int)" determines the schema of the returned
table as (String, Integer).
-class Split(separator: String) extends TableFunction[(String, Int)] {
- def eval(str: String): Unit = {
- // use collect(...) to emit a row.
- str.split(separator).foreach(x => collect((x, x.length)))
+A user-defined scalar function maps zero, one, or multiple scalar values to a
new scalar value. Any data type listed in the [data types
section](../types.html) can be used as a parameter or return type of an
evaluation method.
+
+In order to define a scalar function, one has to extend the base class
`ScalarFunction` in `org.apache.flink.table.functions` and implement one or
more evaluation methods named `eval(...)`.
+
+The following example shows how to define your own hash code function and call
it in a query. See the [Implementation Guide](#implementation-guide) for more
details.
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="Java" markdown="1">
+{% highlight java %}
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.InputGroup;
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.ScalarFunction;
+import static org.apache.flink.table.api.Expressions.*;
+
+public static class HashFunction extends ScalarFunction {
+
+ // take any data type and return INT
+ public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
+ return o.hashCode();
}
}
-val tableEnv = BatchTableEnvironment.create(env)
-val myTable = ... // table schema: [a: String]
-
-// Use the table function in the Scala Table API (Note: No registration
required in Scala Table API).
-val split = new Split("#")
-// "as" specifies the field names of the generated table.
-myTable.joinLateral(split('a) as ('word, 'length)).select('a, 'word, 'length)
-myTable.leftOuterJoinLateral(split('a) as ('word, 'length)).select('a, 'word,
'length)
-
-// Register the table function to use it in SQL queries.
-tableEnv.registerFunction("split", new Split("#"))
-
-// Use the table function in SQL with LATERAL and TABLE keywords.
-// CROSS JOIN a table function (equivalent to "join" in Table API)
-tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL
TABLE(split(a)) as T(word, length)")
-// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API)
-tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL
TABLE(split(a)) as T(word, length) ON TRUE")
-{% endhighlight %}
-**IMPORTANT:** Do not implement TableFunction as a Scala object. Scala object
is a singleton and will cause concurrency issues.
+TableEnvironment env = TableEnvironment.create(...);
-Please note that POJO types do not have a deterministic field order.
Therefore, you cannot rename the fields of POJO returned by a table function
using `AS`.
+// call function "inline" without registration in Table API
+env.from("MyTable").select(call(HashFunction.class, $("myField")));
-By default the result type of a `TableFunction` is determined by Flink’s
automatic type extraction facilities. This works well for basic types and
simple POJOs but might be wrong for more complex, custom, or composite types.
In such a case, the type of the result can be manually specified by overriding
`TableFunction#getResultType()` which returns its `TypeInformation`.
+// register function
+env.createTemporarySystemFunction("HashFunction", HashFunction.class);
+
+// call registered function in Table API
+env.from("MyTable").select(call("HashFunction", $("myField")));
-The following example shows an example of a `TableFunction` that returns a
`Row` type which requires explicit type information. We define that the
returned table type should be `RowTypeInfo(String, Integer)` by overriding
`TableFunction#getResultType()`.
+// call registered function in SQL
+env.sqlQuery("SELECT HashFunction(myField) FROM MyTable");
+
+{% endhighlight %}
+</div>
+<div data-lang="Scala" markdown="1">
{% highlight scala %}
-class CustomTypeSplit extends TableFunction[Row] {
- def eval(str: String): Unit = {
- str.split(" ").foreach({ s =>
- val row = new Row(2)
- row.setField(0, s)
- row.setField(1, s.length)
- collect(row)
- })
- }
+import org.apache.flink.table.annotation.DataTypeHint
+import org.apache.flink.table.annotation.InputGroup
+import org.apache.flink.table.api._
+import org.apache.flink.table.functions.ScalarFunction
+
+class HashFunction extends ScalarFunction {
- override def getResultType: TypeInformation[Row] = {
- Types.ROW(Types.STRING, Types.INT)
+ // take any data type and return INT
+ def eval(@DataTypeHint(inputGroup = InputGroup.ANY) o: AnyRef): Int = {
+ o.hashCode()
}
}
+
+val env = TableEnvironment.create(...)
+
+// call function "inline" without registration in Table API
+env.from("MyTable").select(call(classOf[HashFunction], $"myField"))
+
+// register function
+env.createTemporarySystemFunction("HashFunction", classOf[HashFunction])
+
+// call registered function in Table API
+env.from("MyTable").select(call("HashFunction", $"myField"))
+
+// call registered function in SQL
+env.sqlQuery("SELECT HashFunction(myField) FROM MyTable")
+
{% endhighlight %}
+</div>
</div>
-<div data-lang="python" markdown="1">
-In order to define a Python table function, one can extend the base class
`TableFunction` in `pyflink.table.udtf` and Implement an evaluation method. The
behavior of a Python table function is determined by the evaluation method
which is named eval.
+If you intend to implement or call functions in Python, please refer to the
[Python Scalar Functions](../python/python_udfs.html#scalar-functions)
documentation for more details.
-In the Python Table API, a Python table function is used with `.join_lateral`
or `.left_outer_join_lateral`. The `join_lateral` operator (cross) joins each
row from the outer table (table on the left of the operator) with all rows
produced by the table-valued function (which is on the right side of the
operator). The `left_outer_join_lateral` operator joins each row from the outer
table (table on the left of the operator) with all rows produced by the
table-valued function (which is on the right side of the operator) and
preserves outer rows for which the table function returns an empty table. In
SQL use `LATERAL TABLE(<TableFunction>)` with CROSS JOIN and LEFT JOIN with an
ON TRUE join condition (see examples below).
+{% top %}
-The following example shows how to define a Python table function, registered
it in the TableEnvironment, and call it in a query. Note that you can configure
your table function via a constructor before it is registered:
+Table Functions
+---------------
-{% highlight python %}
-class Split(TableFunction):
- def eval(self, string):
- for s in string.split(" "):
- yield s, len(s)
+Similar to a user-defined scalar function, a user-defined table function takes
zero, one, or multiple scalar values as input arguments. However, in contrast
to a scalar function, it can return an arbitrary number of rows (or structured
types) as output instead of a single value. The returned record may consist of
one or more fields. If an output record consists of only one field, the
structured record can be omitted and a scalar value can be emitted. It will be
wrapped into an implicit row by the runtime.
+
+In order to define a table function, one has to extend the base class
`TableFunction` in `org.apache.flink.table.functions` and implement one or more
evaluation methods named `eval(...)`. Similar to other functions, input and
output data types are automatically extracted using reflection. This includes
the generic argument `T` of the class for determining an output data type. In
contrast to scalar functions, the evaluation method itself must not have a
return type, instead, table functions provide a `collect(T)` method that can be
called within every evaluation method for emitting zero, one, or more records.
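+
+For example, a sketch of a single-column table function (a hypothetical duplicator, not one of the examples below) could emit plain strings that the runtime wraps into one-field rows:
+
+{% highlight java %}
+import org.apache.flink.table.functions.TableFunction;
+
+// illustrative sketch: the generic type String is extracted as the single output column;
+// each collected value is implicitly wrapped into a row by the runtime
+public static class DuplicatorFunction extends TableFunction<String> {
+ public void eval(String s) {
+   collect(s);
+   collect(s);
+ }
+}
+{% endhighlight %}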
-env = StreamExecutionEnvironment.get_execution_environment()
-table_env = StreamTableEnvironment.create(env)
-my_table = ... # type: Table, table schema: [a: String]
+In the Table API, a table function is used with `.joinLateral(...)` or
`.leftOuterJoinLateral(...)`. The `joinLateral` operator (cross) joins each row
from the outer table (table on the left of the operator) with all rows produced
by the table-valued function (which is on the right side of the operator). The
`leftOuterJoinLateral` operator joins each row from the outer table (table on
the left of the operator) with all rows produced by the table-valued function
(which is on the right side of the operator) and preserves outer rows for which
the table function returns an empty table.
-# register the Python Table Function
-table_env.register_function("split", udtf(Split(), DataTypes.STRING(),
[DataTypes.STRING(), DataTypes.INT()]))
+In SQL, use `LATERAL TABLE(<TableFunction>)` with `JOIN` or `LEFT JOIN` with
an `ON TRUE` join condition.
-# use the Python Table Function in Python Table API
-my_table.join_lateral("split(a) as (word, length)")
-my_table.left_outer_join_lateral("split(a) as (word, length)")
+The following example shows how to define your own split function and call it
in a query. See the [Implementation Guide](#implementation-guide) for more
details.
-# use the Python Table function in SQL API
-table_env.sql_query("SELECT a, word, length FROM MyTable, LATERAL
TABLE(split(a)) as T(word, length)")
-table_env.sql_query("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL
TABLE(split(a)) as T(word, length) ON TRUE")
+<div class="codetabs" markdown="1">
+
+<div data-lang="Java" markdown="1">
+{% highlight java %}
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+import static org.apache.flink.table.api.Expressions.*;
+
+@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
+public static class SplitFunction extends TableFunction<Row> {
+
+ public void eval(String str) {
+ for (String s : str.split(" ")) {
+ // use collect(...) to emit a row
+ collect(Row.of(s, s.length()));
+ }
+ }
+}
+
+TableEnvironment env = TableEnvironment.create(...);
+
+// call function "inline" without registration in Table API
+env
+ .from("MyTable")
+ .joinLateral(call(SplitFunction.class, $("myField")))
+ .select($("myField"), $("word"), $("length"));
+env
+ .from("MyTable")
+ .leftOuterJoinLateral(call(SplitFunction.class, $("myField")))
+ .select($("myField"), $("word"), $("length"));
+
+// rename fields of the function in Table API
+env
+ .from("MyTable")
+ .leftOuterJoinLateral(call(SplitFunction.class, $("myField")).as("newWord",
"newLength"))
+ .select($("myField"), $("newWord"), $("newLength"));
+
+// register function
+env.createTemporarySystemFunction("SplitFunction", SplitFunction.class);
+
+// call registered function in Table API
+env
+ .from("MyTable")
+ .joinLateral(call("SplitFunction", $("myField")))
+ .select($("myField"), $("word"), $("length"));
+env
+ .from("MyTable")
+ .leftOuterJoinLateral(call("SplitFunction", $("myField")))
+ .select($("myField"), $("word"), $("length"));
+
+// call registered function in SQL
+env.sqlQuery(
+ "SELECT myField, word, length " +
+ "FROM MyTable, LATERAL TABLE(SplitFunction(myField))");
+env.sqlQuery(
+ "SELECT myField, word, length " +
+ "FROM MyTable " +
+ "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE");
+
+// rename fields of the function in SQL
+env.sqlQuery(
+ "SELECT myField, newWord, newLength " +
+ "FROM MyTable " +
+ "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON
TRUE");
{% endhighlight %}
+</div>
-There are many ways to define a Python table function besides extending the
base class `TableFunction`.
-Please refer to the [Python Table Function]({{ site.baseurl
}}/dev/table/python/python_udfs.html#table-functions) documentation for more
details.
-
+<div data-lang="Scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.table.annotation.DataTypeHint
+import org.apache.flink.table.annotation.FunctionHint
+import org.apache.flink.table.api._
+import org.apache.flink.table.functions.TableFunction
+import org.apache.flink.types.Row
+
+@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
+class SplitFunction extends TableFunction[Row] {
+
+ def eval(str: String): Unit = {
+ // use collect(...) to emit a row
+ str.split(" ").foreach(s => collect(Row.of(s, s.length)))
+ }
+}
+
+val env = TableEnvironment.create(...)
+
+// call function "inline" without registration in Table API
+env
+ .from("MyTable")
+ .joinLateral(call(classOf[SplitFunction], $"myField"))
+ .select($"myField", $"word", $"length")
+env
+ .from("MyTable")
+ .leftOuterJoinLateral(call(classOf[SplitFunction], $"myField"))
+ .select($"myField", $"word", $"length")
+
+// rename fields of the function in Table API
+env
+ .from("MyTable")
+ .leftOuterJoinLateral(call(classOf[SplitFunction], $"myField").as("newWord",
"newLength"))
+ .select($"myField", $"newWord", $"newLength")
+
+// register function
+env.createTemporarySystemFunction("SplitFunction", classOf[SplitFunction])
+
+// call registered function in Table API
+env
+ .from("MyTable")
+ .joinLateral(call("SplitFunction", $"myField"))
+ .select($"myField", $"word", $"length")
+env
+ .from("MyTable")
+ .leftOuterJoinLateral(call("SplitFunction", $"myField"))
+ .select($"myField", $"word", $"length")
+
+// call registered function in SQL
+env.sqlQuery(
+ "SELECT myField, word, length " +
+ "FROM MyTable, LATERAL TABLE(SplitFunction(myField))");
+env.sqlQuery(
+ "SELECT myField, word, length " +
+ "FROM MyTable " +
+ "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE")
+
+// rename fields of the function in SQL
+env.sqlQuery(
+ "SELECT myField, newWord, newLength " +
+ "FROM MyTable " +
+ "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON
TRUE")
+
+{% endhighlight %}
</div>
+
</div>
-{% top %}
+If you intend to implement functions in Scala, do not implement a table
function as a Scala `object`. Scala `object`s are singletons and will cause
concurrency issues.
+If you intend to implement or call functions in Python, please refer to the
[Python Table Functions](../python/python_udfs.html#table-functions)
documentation for more details.
Review comment:
Out of scope for this change, but I don't like how the python docs are
separated from everything else.
##########
File path: docs/dev/table/functions/udfs.md
##########
@@ -22,322 +22,768 @@ specific language governing permissions and limitations
under the License.
-->
-User-defined functions are an important feature, because they significantly
extend the expressiveness of queries.
+User-defined functions (UDFs) are extension points to call frequently used
logic or custom logic that cannot be expressed otherwise in queries.
+
+User-defined functions can be implemented in a JVM language (such as Java or
Scala) or Python. An implementer can use arbitrary third party libraries within
a UDF. This page will focus on JVM-based languages.
* This will be replaced by the TOC
{:toc}
-Register User-Defined Functions
--------------------------------
-In most cases, a user-defined function must be registered before it can be
used in an query. It is not necessary to register functions for the Scala Table
API.
-
-Functions are registered at the `TableEnvironment` by calling a
`registerFunction()` method. When a user-defined function is registered, it is
inserted into the function catalog of the `TableEnvironment` such that the
Table API or SQL parser can recognize and properly translate it.
+Overview
+--------
-Please find detailed examples of how to register and how to call each type of
user-defined function
-(`ScalarFunction`, `TableFunction`, and `AggregateFunction`) in the following
sub-sessions.
+Currently, Flink distinguishes between the following kinds of functions:
+- *Scalar functions* map scalar values to a new scalar value.
+- *Table functions* map scalar values to new rows.
+- *Aggregate functions* map scalar values of multiple rows to a new scalar
value.
+- *Table aggregate functions* map scalar values of multiple rows to new rows.
+- *Async table functions* are special functions for table sources that perform
a lookup.
-{% top %}
+<span class="label label-danger">Attention</span> Scalar and table functions
have been updated to the new type system based on [data types](../types.html).
Aggregating functions still use the old type system based on `TypeInformation`.
-Scalar Functions
-----------------
+The following example shows how to create a simple scalar function and how to
call the function in both Table API and SQL.
-If a required scalar function is not contained in the built-in functions, it
is possible to define custom, user-defined scalar functions for both the Table
API and SQL. A user-defined scalar functions maps zero, one, or multiple scalar
values to a new scalar value.
+For SQL queries, a function must always be registered under a name. For Table
API, a function can be registered or directly used _inline_.
<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-In order to define a scalar function, one has to extend the base class
`ScalarFunction` in `org.apache.flink.table.functions` and implement (one or
more) evaluation methods. The behavior of a scalar function is determined by
the evaluation method. An evaluation method must be declared publicly and named
`eval`. The parameter types and return type of the evaluation method also
determine the parameter and return types of the scalar function. Evaluation
methods can also be overloaded by implementing multiple methods named `eval`.
Evaluation methods can also support variable arguments, such as `eval(String...
strs)`.
-
-The following example shows how to define your own hash code function,
register it in the TableEnvironment, and call it in a query. Note that you can
configure your scalar function via a constructor before it is registered:
+<div data-lang="Java" markdown="1">
{% highlight java %}
-public class HashCode extends ScalarFunction {
- private int factor = 12;
-
- public HashCode(int factor) {
- this.factor = factor;
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.ScalarFunction;
+import static org.apache.flink.table.api.Expressions.*;
+
+// define function logic
+public static class SubstringFunction extends ScalarFunction {
+ public String eval(String s, Integer begin, Integer end) {
+ return s.substring(begin, end);
}
-
- public int eval(String s) {
- return s.hashCode() * factor;
+}
+
+TableEnvironment env = TableEnvironment.create(...);
+
+// call function "inline" without registration in Table API
+env.from("MyTable").select(call(SubstringFunction.class, $("myField"), 5, 12));
+
+// register function
+env.createTemporarySystemFunction("SubstringFunction",
SubstringFunction.class);
+
+// call registered function in Table API
+env.from("MyTable").select(call("SubstringFunction", $("myField"), 5, 12));
+
+// call registered function in SQL
+env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable");
+
+{% endhighlight %}
+</div>
+
+<div data-lang="Scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.table.api._
+import org.apache.flink.table.functions.ScalarFunction
+
+// define function logic
+class SubstringFunction extends ScalarFunction {
+ def eval(s: String, begin: Integer, end: Integer): String = {
+ s.substring(begin, end)
}
}
-BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
+val env = TableEnvironment.create(...)
-// register the function
-tableEnv.registerFunction("hashCode", new HashCode(10));
+// call function "inline" without registration in Table API
+env.from("MyTable").select(call(classOf[SubstringFunction], $"myField", 5, 12))
-// use the function in Java Table API
-myTable.select("string, string.hashCode(), hashCode(string)");
+// register function
+env.createTemporarySystemFunction("SubstringFunction",
classOf[SubstringFunction])
+
+// call registered function in Table API
+env.from("MyTable").select(call("SubstringFunction", $"myField", 5, 12))
+
+// call registered function in SQL
+env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable")
-// use the function in SQL API
-tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable");
{% endhighlight %}
+</div>
-By default the result type of an evaluation method is determined by Flink's
type extraction facilities. This is sufficient for basic types or simple POJOs
but might be wrong for more complex, custom, or composite types. In these cases
`TypeInformation` of the result type can be manually defined by overriding
`ScalarFunction#getResultType()`.
+</div>
+
+For interactive sessions, it is also possible to parameterize functions before
using or
+registering them. In this case, function _instances_ instead of function
_classes_ can be
+used as temporary functions.
-The following example shows an advanced example which takes the internal
timestamp representation and also returns the internal timestamp representation
as a long value. By overriding `ScalarFunction#getResultType()` we define that
the returned long value should be interpreted as a `Types.TIMESTAMP` by the
code generation.
+It requires that the parameters are serializable for shipping
+function instances to the cluster.
+
+<div class="codetabs" markdown="1">
+<div data-lang="Java" markdown="1">
{% highlight java %}
-public static class TimestampModifier extends ScalarFunction {
- public long eval(long t) {
- return t % 1000;
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.ScalarFunction;
+import static org.apache.flink.table.api.Expressions.*;
+
+// define parameterizable function logic
+public static class SubstringFunction extends ScalarFunction {
+
+ private boolean endInclusive;
+
+ public SubstringFunction(boolean endInclusive) {
+ this.endInclusive = endInclusive;
}
- public TypeInformation<?> getResultType(Class<?>[] signature) {
- return Types.SQL_TIMESTAMP;
+ public String eval(String s, Integer begin, Integer end) {
+ return s.substring(a, endInclusive ? end + 1 : end);
}
}
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-In order to define a scalar function, one has to extend the base class
`ScalarFunction` in `org.apache.flink.table.functions` and implement (one or
more) evaluation methods. The behavior of a scalar function is determined by
the evaluation method. An evaluation method must be declared publicly and named
`eval`. The parameter types and return type of the evaluation method also
determine the parameter and return types of the scalar function. Evaluation
methods can also be overloaded by implementing multiple methods named `eval`.
Evaluation methods can also support variable arguments, such as `@varargs def
eval(str: String*)`.
+TableEnvironment env = TableEnvironment.create(...);
-The following example shows how to define your own hash code function,
register it in the TableEnvironment, and call it in a query. Note that you can
configure your scalar function via a constructor before it is registered:
+// call function "inline" without registration in Table API
+env.from("MyTable").select(call(new SubstringFunction(true), $("myField"), 5,
12));
+
+// register function
+env.createTemporarySystemFunction("SubstringFunction", new
SubstringFunction(true));
+
+{% endhighlight %}
+</div>
+<div data-lang="Scala" markdown="1">
{% highlight scala %}
-// must be defined in static/object context
-class HashCode(factor: Int) extends ScalarFunction {
- def eval(s: String): Int = {
- s.hashCode() * factor
+import org.apache.flink.table.api._
+import org.apache.flink.table.functions.ScalarFunction
+
+// define parameterizable function logic
+class SubstringFunction(val endInclusive: Boolean) extends ScalarFunction {
+ def eval(s: String, begin: Integer, end: Integer): String = {
+ s.substring(begin, if (endInclusive) end + 1 else end)
}
}
-val tableEnv = BatchTableEnvironment.create(env)
+val env = TableEnvironment.create(...)
-// use the function in Scala Table API
-val hashCode = new HashCode(10)
-myTable.select('string, hashCode('string))
+// call function "inline" without registration in Table API
+env.from("MyTable").select(call(new SubstringFunction(true), $"myField", 5,
12))
+
+// register function
+env.createTemporarySystemFunction("SubstringFunction", new
SubstringFunction(true))
-// register and use the function in SQL
-tableEnv.registerFunction("hashCode", new HashCode(10))
-tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable")
{% endhighlight %}
+</div>
+
+</div>
+
+{% top %}
+
+Implementation Guide
+--------------------
+
+<span class="label label-danger">Attention</span> This section only applies to
scalar and table functions for now, until aggregate functions have been updated
to the new type system.
-By default the result type of an evaluation method is determined by Flink's
type extraction facilities. This is sufficient for basic types or simple POJOs
but might be wrong for more complex, custom, or composite types. In these cases
`TypeInformation` of the result type can be manually defined by overriding
`ScalarFunction#getResultType()`.
+Independent of the kind of function, all user-defined functions follow some
basic implementation principles.
-The following example shows an advanced example which takes the internal
timestamp representation and also returns the internal timestamp representation
as a long value. By overriding `ScalarFunction#getResultType()` we define that
the returned long value should be interpreted as a `Types.TIMESTAMP` by the
code generation.
+### Function Class
+An implementation class must extend from one of the available base classes
(e.g. `org.apache.flink.table.functions.ScalarFunction`).
+
+The class must be declared `public`, not `abstract`, and should be globally
accessible. Thus, non-static inner or anonymous classes are not allowed.
+
+For storing a user-defined function in a persistent catalog, the class must
have a default constructor and must be instantiable during runtime.
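For illustration, a minimal sketch of a class that satisfies these constraints (the class name and logic are made up for this example):

{% highlight java %}
import org.apache.flink.table.functions.ScalarFunction;

// public, non-abstract, top-level, with an implicit default constructor,
// so it is globally accessible and can be stored in a persistent catalog
public class TrimFunction extends ScalarFunction {
  public String eval(String s) {
    return s == null ? null : s.trim();
  }
}

// not allowed: anonymous classes such as `new ScalarFunction() { ... }`
// or non-static inner classes that need an instance of an enclosing class
{% endhighlight %}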
+
+### Evaluation Methods
+
+The base class provides a set of methods that can be overridden such as
`open()`, `close()`, or `isDeterministic()`.
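As a rough, hypothetical sketch of how these methods are typically used (the function name and the cached map are made up; `open(FunctionContext)` and `close()` may throw checked exceptions):

{% highlight java %}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

// sets up per-instance state in open() and releases it in close()
public static class NormalizeFunction extends ScalarFunction {

  private transient Map<String, String> cache;

  @Override
  public void open(FunctionContext context) throws Exception {
    // called once before the first evaluation
    cache = new HashMap<>();
  }

  public String eval(String s) {
    return cache.computeIfAbsent(s, v -> v.trim().toLowerCase());
  }

  @Override
  public void close() throws Exception {
    // called once when the function is no longer needed
    cache = null;
  }

  @Override
  public boolean isDeterministic() {
    // the result only depends on the arguments
    return true;
  }
}
{% endhighlight %}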
+
+However, in addition to those declared methods, the main runtime logic that is
applied to every incoming record must be implemented through specialized
_evaluation methods_.
+
+Depending on the function kind, evaluation methods such as `eval()`,
`accumulate()`, or `retract()` are called by code-generated operators during
runtime.
+
+The methods must be declared `public` and take a well-defined set of arguments.
+
+Regular JVM method calling semantics apply. Therefore, it is possible to:
+- implement overloaded methods such as `eval(Integer)` and
`eval(LocalDateTime)`,
+- use var-args such as `eval(Integer...)`,
+- use object inheritance such as `eval(Object)` that takes both
`LocalDateTime` and `Integer`,
+- and combinations of the above such as `eval(Object...)` that takes all kinds
of arguments.
+
+The following snippets show an example of an overloaded function:
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="Java" markdown="1">
+{% highlight java %}
+import org.apache.flink.table.functions.ScalarFunction;
+
+// function with overloaded evaluation methods
+public static class SumFunction extends ScalarFunction {
+
+ public Integer eval(Integer a, Integer b) {
+ return a + b;
+ }
+
+ public Integer eval(String a, String b) {
+ return Integer.valueOf(a) + Integer.valueOf(b);
+ }
+
+ public Integer eval(Double... d) {
+ double result = 0;
+ for (double value : d)
+ result += value;
+ return (int) result;
+ }
+}
+
+{% endhighlight %}
+</div>
+
+<div data-lang="Scala" markdown="1">
{% highlight scala %}
-object TimestampModifier extends ScalarFunction {
- def eval(t: Long): Long = {
- t % 1000
+import org.apache.flink.table.functions.ScalarFunction
+import scala.annotation.varargs
+
+// function with overloaded evaluation methods
+class SumFunction extends ScalarFunction {
+
+ def eval(a: Integer, b: Integer): Integer = {
+ a + b
}
- override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
{
- Types.TIMESTAMP
+ def eval(a: String, b: String): Integer = {
+ Integer.valueOf(a) + Integer.valueOf(b)
+ }
+
+ @varargs // generate var-args like Java
+ def eval(d: Double*): Integer = {
+ d.sum.toInt
}
}
+
{% endhighlight %}
</div>
-<div data-lang="python" markdown="1">
-In order to define a Python scalar function, one can extend the base class
`ScalarFunction` in `pyflink.table.udf` and implement an evaluation method. The
behavior of a Python scalar function is determined by the evaluation method
which is named `eval`.
+</div>
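The object-inheritance case from the list above is not covered by `SumFunction`. A minimal sketch could look as follows; the function name is made up, and note that a plain `Object` parameter typically needs the `ANY` input group hint (described in the Type Inference section below) so that the reflective type extraction accepts arbitrary arguments:

{% highlight java %}
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;

// a single eval(Object) accepts e.g. INT and TIMESTAMP arguments alike
public static class AnyToStringFunction extends ScalarFunction {

  public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
    return String.valueOf(o);
  }
}
{% endhighlight %}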
-The following example shows how to define your own Python hash code function,
register it in the TableEnvironment, and call it in a query. Note that you can
configure your scalar function via a constructor before it is registered:
+### Type Inference
-{% highlight python %}
-class HashCode(ScalarFunction):
- def __init__(self):
- self.factor = 12
+The table ecosystem (similar to the SQL standard) is a strongly typed API.
Therefore, both function parameters and return types must be mapped to a [data
type](../types.html).
+
+From a logical perspective, the planner needs information about expected
types, precision, and scale. From a JVM perspective, the planner needs
information about how internal data structures are represented as JVM objects
when calling a user-defined function.
+
+The logic for validating input arguments and deriving data types for both the
parameters and the result of a function is summarized under the term _type
inference_.
+
+Flink's user-defined functions implement an automatic type inference
extraction that derives data types from the function's class and its evaluation
methods via reflection. If this implicit reflective extraction approach is not
successful, the extraction process can be supported by annotating affected
parameters, classes, or methods with `@DataTypeHint` and `@FunctionHint`. More
examples on how to annotate functions are shown below.
+
+If more advanced type inference logic is required, an implementer can
explicitly override the `getTypeInference()` method in every user-defined
function. However, the annotation approach is recommended because it keeps
custom type inference logic close to the affected locations and falls back to
the default behavior for the remaining implementation.
+
+#### Automatic Type Inference
+
+The automatic type inference inspects the function's class and evaluation
methods to derive data types for the arguments and result of a function.
`@DataTypeHint` and `@FunctionHint` annotations support the automatic
extraction.
+
+For a full list of classes that can be implicitly mapped to a data type, see
the [data type section](../types.html#data-type-annotations).
+
+**`@DataTypeHint`**
+
+In many scenarios, it is required to support the automatic extraction _inline_
for parameters and return types of a function.
+
+The following example shows how to use data type hints. More information can
be found in the documentation of the annotation class.
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="Java" markdown="1">
+{% highlight java %}
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.InputGroup;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.types.Row;
+
+// function with overloaded evaluation methods
+public static class OverloadedFunction extends ScalarFunction {
- def eval(self, s):
- return hash(s) * self.factor
+ // no hint required
+ public Long eval(long a, long b) {
+ return a + b;
+ }
-table_env = BatchTableEnvironment.create(env)
+ // define the precision and scale of a decimal
+ public @DataTypeHint("DECIMAL(12, 3)") BigDecimal eval(double a, double b) {
+ return BigDecimal.valueOf(a + b);
+ }
-# register the Python function
-table_env.register_function("hash_code", udf(HashCode(), DataTypes.BIGINT(),
DataTypes.BIGINT()))
+ // define a nested data type
+ @DataTypeHint("ROW<s STRING, t TIMESTAMP(3) WITH LOCAL TIME ZONE>")
+ public Row eval(int i) {
+ return Row.of(String.valueOf(i), Instant.ofEpochSecond(i));
+ }
-# use the function in Python Table API
-my_table.select("string, bigint, string.hash_code(), hash_code(string)")
+ // allow wildcard input and custom serialized output
+ @DataTypeHint(value = "RAW", bridgedTo = ByteBuffer.class)
+ public ByteBuffer eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
+ return MyUtils.serializeToByteBuffer(o);
+ }
+}
-# use the function in SQL API
-table_env.sql_query("SELECT string, bigint, hash_code(bigint) FROM MyTable")
{% endhighlight %}
+</div>
+
+<div data-lang="Scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.table.annotation.DataTypeHint
+import org.apache.flink.table.annotation.InputGroup
+import org.apache.flink.table.functions.ScalarFunction
+import org.apache.flink.types.Row
+import scala.annotation.varargs
+
+// function with overloaded evaluation methods
+class OverloadedFunction extends ScalarFunction {
+
+ // no hint required
+ def eval(a: Long, b: Long): Long = {
+ a + b
+ }
+
+ // define the precision and scale of a decimal
+ @DataTypeHint("DECIMAL(12, 3)")
+ def eval(a: Double, b: Double): java.math.BigDecimal = {
+ java.math.BigDecimal.valueOf(a + b)
+ }
+
+ // define a nested data type
+ @DataTypeHint("ROW<s STRING, t TIMESTAMP(3) WITH LOCAL TIME ZONE>")
+ def eval(i: Int): Row = {
+ Row.of(java.lang.String.valueOf(i), java.time.Instant.ofEpochSecond(i))
+ }
-There are many ways to define a Python scalar function besides extending the
base class `ScalarFunction`.
-Please refer to the [Python Scalar Function]({{ site.baseurl
}}/dev/table/python/python_udfs.html#scalar-functions) documentation for more
details.
+ // allow wildcard input and custom serialized output
+ @DataTypeHint(value = "RAW", bridgedTo = classOf[java.nio.ByteBuffer])
+ def eval(@DataTypeHint(inputGroup = InputGroup.ANY) o: AnyRef):
java.nio.ByteBuffer = {
+ MyUtils.serializeToByteBuffer(o)
+ }
+}
+
+{% endhighlight %}
</div>
+
</div>
-{% top %}
+**`@FunctionHint`**
-Table Functions
----------------
+In some scenarios, it is desirable that one evaluation method handles multiple
different data types at the same time. Furthermore, in some scenarios,
overloaded evaluation methods have a common result type that should be declared
only once.
-Similar to a user-defined scalar function, a user-defined table function takes
zero, one, or multiple scalar values as input parameters. However in contrast
to a scalar function, it can return an arbitrary number of rows as output
instead of a single value. The returned rows may consist of one or more
columns.
+The `@FunctionHint` annotation can provide a mapping from argument data types
to a result data type. It enables annotating entire function classes or
evaluation methods for input, accumulator, and result data types. One or more
annotations can be declared on top of a class or individually for each
evaluation method for overloading function signatures. All hint parameters are
optional. If a parameter is not defined, the default reflection-based
extraction is used. Hint parameters defined on top of a function class are
inherited by all evaluation methods.
+
+The following example shows how to use function hints. More information can be
found in the documentation of the annotation class.
<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-In order to define a table function one has to extend the base class
`TableFunction` in `org.apache.flink.table.functions` and implement (one or
more) evaluation methods. The behavior of a table function is determined by its
evaluation methods. An evaluation method must be declared `public` and named
`eval`. The `TableFunction` can be overloaded by implementing multiple methods
named `eval`. The parameter types of the evaluation methods determine all valid
parameters of the table function. Evaluation methods can also support variable
arguments, such as `eval(String... strs)`. The type of the returned table is
determined by the generic type of `TableFunction`. Evaluation methods emit
output rows using the protected `collect(T)` method.
-In the Table API, a table function is used with `.joinLateral` or
`.leftOuterJoinLateral`. The `joinLateral` operator (cross) joins each row from
the outer table (table on the left of the operator) with all rows produced by
the table-valued function (which is on the right side of the operator). The
`leftOuterJoinLateral` operator joins each row from the outer table (table on
the left of the operator) with all rows produced by the table-valued function
(which is on the right side of the operator) and preserves outer rows for which
the table function returns an empty table. In SQL use `LATERAL
TABLE(<TableFunction>)` with CROSS JOIN and LEFT JOIN with an ON TRUE join
condition (see examples below).
+<div data-lang="Java" markdown="1">
+{% highlight java %}
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+// function with overloaded evaluation methods
+// but globally defined output type
+@FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
+public static class OverloadedFunction extends TableFunction<Row> {
+
+ public void eval(int a, int b) {
+ collect(Row.of("Sum", a + b));
+ }
-The following example shows how to define table-valued function, register it
in the TableEnvironment, and call it in a query. Note that you can configure
your table function via a constructor before it is registered:
+ // overloading of arguments is still possible
+ public void eval() {
+ collect(Row.of("Empty args", -1));
+ }
+}
-{% highlight java %}
-// The generic type "Tuple2<String, Integer>" determines the schema of the
returned table as (String, Integer).
-public class Split extends TableFunction<Tuple2<String, Integer>> {
- private String separator = " ";
-
- public Split(String separator) {
- this.separator = separator;
+// decouples the type inference from evaluation methods,
+// the type inference is entirely determined by the function hints
+@FunctionHint(
+ input = [@DataTypeHint("INT"), @DataTypeHint("INT")],
+ output = @DataTypeHint("INT")
+)
+@FunctionHint(
+ input = [@DataTypeHint("LONG"), @DataTypeHint("LONG")],
+ output = @DataTypeHint("LONG")
+)
+@FunctionHint(
+ input = [],
+ output = @DataTypeHint("BOOLEAN")
+)
+public static class OverloadedFunction extends TableFunction<Object> {
+
+ // an implementer just needs to make sure that a method exists
+ // that can be called by the JVM
+ public void eval(Object... o) {
+ if (o.length == 0) {
+ collect(false);
+ return;
}
-
- public void eval(String str) {
- for (String s : str.split(separator)) {
- // use collect(...) to emit a row
- collect(new Tuple2<String, Integer>(s, s.length()));
- }
+ collect(o[0]);
+ }
+}
+
+{% endhighlight %}
+</div>
+
+<div data-lang="Scala" markdown="1">
+{% highlight scala %}
+
+import org.apache.flink.table.annotation.DataTypeHint
+import org.apache.flink.table.annotation.FunctionHint
+import org.apache.flink.table.functions.TableFunction
+import org.apache.flink.types.Row
+import scala.annotation.varargs
+
+// function with overloaded evaluation methods
+// but globally defined output type
+@FunctionHint(output = new DataTypeHint("ROW<s STRING, i INT>"))
+class OverloadedFunction extends TableFunction[Row] {
+
+ def eval(a: Int, b: Int): Unit = {
+ collect(Row.of("Sum", Int.box(a + b)))
+ }
+
+ // overloading of arguments is still possible
+ def eval(): Unit = {
+ collect(Row.of("Empty args", Int.box(-1)))
+ }
+}
+
+// decouples the type inference from evaluation methods,
+// the type inference is entirely determined by the function hints
+@FunctionHint(
+ input = Array(new DataTypeHint("INT"), new DataTypeHint("INT")),
+ output = new DataTypeHint("INT")
+)
+@FunctionHint(
+ input = Array(new DataTypeHint("BIGINT"), new DataTypeHint("BIGINT")),
+ output = new DataTypeHint("BIGINT")
+)
+@FunctionHint(
+ input = Array(),
+ output = new DataTypeHint("BOOLEAN")
+)
+class OverloadedFunction extends TableFunction[AnyRef] {
+
+ // an implementer just needs to make sure that a method exists
+ // that can be called by the JVM
+ @varargs
+ def eval(o: AnyRef*): Unit = {
+ if (o.length == 0) {
+ collect(Boolean.box(false))
+ return
}
+ collect(o(0))
+ }
}
-BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
-Table myTable = ... // table schema: [a: String]
-
-// Register the function.
-tableEnv.registerFunction("split", new Split("#"));
-
-// Use the table function in the Java Table API. "as" specifies the field
names of the table.
-myTable.joinLateral("split(a) as (word, length)")
- .select("a, word, length");
-myTable.leftOuterJoinLateral("split(a) as (word, length)")
- .select("a, word, length");
-
-// Use the table function in SQL with LATERAL and TABLE keywords.
-// CROSS JOIN a table function (equivalent to "join" in Table API).
-tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL
TABLE(split(a)) as T(word, length)");
-// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
-tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL
TABLE(split(a)) as T(word, length) ON TRUE");
{% endhighlight %}
</div>
-Please note that POJO types do not have a deterministic field order.
Therefore, you cannot rename the fields of POJO returned by a table function
using `AS`.
+</div>
+
+#### Custom Type Inference
-By default the result type of a `TableFunction` is determined by Flink’s
automatic type extraction facilities. This works well for basic types and
simple POJOs but might be wrong for more complex, custom, or composite types.
In such a case, the type of the result can be manually specified by overriding
`TableFunction#getResultType()` which returns its `TypeInformation`.
+For most scenarios, `@DataTypeHint` and `@FunctionHint` should be sufficient
to model user-defined functions. However, by overriding the automatic type
inference defined in `getTypeInference()`, implementers can create arbitrary
functions that behave like built-in system functions.
-The following example shows an example of a `TableFunction` that returns a
`Row` type which requires explicit type information. We define that the
returned table type should be `RowTypeInfo(String, Integer)` by overriding
`TableFunction#getResultType()`.
+The following example implemented in Java illustrates the potential of a
custom type inference logic. It uses a string literal argument to determine the
result type of a function. The function takes two string arguments: the first
argument represents the string to be parsed, the second argument represents the
target type.
+
+<div class="codetabs" markdown="1">
+<div data-lang="Java" markdown="1">
{% highlight java %}
-public class CustomTypeSplit extends TableFunction<Row> {
- public void eval(String str) {
- for (String s : str.split(" ")) {
- Row row = new Row(2);
- row.setField(0, s);
- row.setField(1, s.length());
- collect(row);
- }
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.types.Row;
+
+public static class LiteralFunction extends ScalarFunction {
+ public Object eval(String s, String type) {
+ switch (type) {
+ case "INT":
+ return Integer.valueOf(s);
+ case "DOUBLE":
+ return Double.valueOf(s);
+ case "STRING":
+ default:
+ return s;
}
+ }
- @Override
- public TypeInformation<Row> getResultType() {
- return Types.ROW(Types.STRING(), Types.INT());
- }
+ // the automatic, reflection-based type inference is disabled and
+ // replaced by the following logic
+ @Override
+ public TypeInference getTypeInference(DataTypeFactory typeFactory) {
+ return TypeInference.newBuilder()
+ // specify typed arguments
+ // parameters will be casted implicitly to those types if necessary
+ .typedArguments(DataTypes.STRING(), DataTypes.STRING())
+ // specify a strategy for the result data type of the function
+ .outputTypeStrategy(callContext -> {
+ if (!callContext.isArgumentLiteral(1) ||
callContext.isArgumentNull(1)) {
+ throw callContext.newValidationError("Literal expected for second
argument.");
+ }
+ // return a data type based on a literal
+ final String literal = callContext.getArgumentValue(1,
String.class).orElse("STRING");
+ switch (literal) {
+ case "INT":
+ return Optional.of(DataTypes.INT().notNull());
+ case "DOUBLE":
+ return Optional.of(DataTypes.DOUBLE().notNull());
+ case "STRING":
+ default:
+ return Optional.of(DataTypes.STRING());
+ }
+ })
+ .build();
+ }
}
+
{% endhighlight %}
+</div>
-<div data-lang="scala" markdown="1">
-In order to define a table function one has to extend the base class
`TableFunction` in `org.apache.flink.table.functions` and implement (one or
more) evaluation methods. The behavior of a table function is determined by its
evaluation methods. An evaluation method must be declared `public` and named
`eval`. The `TableFunction` can be overloaded by implementing multiple methods
named `eval`. The parameter types of the evaluation methods determine all valid
parameters of the table function. Evaluation methods can also support variable
arguments, such as `eval(String... strs)`. The type of the returned table is
determined by the generic type of `TableFunction`. Evaluation methods emit
output rows using the protected `collect(T)` method.
+</div>
-In the Table API, a table function is used with `.joinLateral` or
`.leftOuterJoinLateral`. The `joinLateral` operator (cross) joins each row from
the outer table (table on the left of the operator) with all rows produced by
the table-valued function (which is on the right side of the operator). The
`leftOuterJoinLateral` operator joins each row from the outer table (table on
the left of the operator) with all rows produced by the table-valued function
(which is on the right side of the operator) and preserves outer rows for which
the table function returns an empty table. In SQL use `LATERAL
TABLE(<TableFunction>)` with CROSS JOIN and LEFT JOIN with an ON TRUE join
condition (see examples below).
+{% top %}
-The following example shows how to define table-valued function, register it
in the TableEnvironment, and call it in a query. Note that you can configure
your table function via a constructor before it is registered:
+Scalar Functions
+----------------
-{% highlight scala %}
-// The generic type "(String, Int)" determines the schema of the returned
table as (String, Integer).
-class Split(separator: String) extends TableFunction[(String, Int)] {
- def eval(str: String): Unit = {
- // use collect(...) to emit a row.
- str.split(separator).foreach(x => collect((x, x.length)))
+A user-defined scalar function maps zero, one, or multiple scalar values to a
new scalar value. Any data type listed in the [data types
section](../types.html) can be used as a parameter or return type of an
evaluation method.
+
+In order to define a scalar function, one has to extend the base class
`ScalarFunction` in `org.apache.flink.table.functions` and implement one or
more evaluation methods named `eval(...)`.
+
+The following example shows how to define your own hash code function and call
it in a query. See the [Implementation Guide](#implementation-guide) for more
details.
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="Java" markdown="1">
+{% highlight java %}
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.InputGroup;
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.ScalarFunction;
+import static org.apache.flink.table.api.Expressions.*;
+
+public static class HashFunction extends ScalarFunction {
+
+ // take any data type and return INT
+ public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
+ return o.hashCode();
}
}
-val tableEnv = BatchTableEnvironment.create(env)
-val myTable = ... // table schema: [a: String]
-
-// Use the table function in the Scala Table API (Note: No registration
required in Scala Table API).
-val split = new Split("#")
-// "as" specifies the field names of the generated table.
-myTable.joinLateral(split('a) as ('word, 'length)).select('a, 'word, 'length)
-myTable.leftOuterJoinLateral(split('a) as ('word, 'length)).select('a, 'word,
'length)
-
-// Register the table function to use it in SQL queries.
-tableEnv.registerFunction("split", new Split("#"))
-
-// Use the table function in SQL with LATERAL and TABLE keywords.
-// CROSS JOIN a table function (equivalent to "join" in Table API)
-tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL
TABLE(split(a)) as T(word, length)")
-// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API)
-tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL
TABLE(split(a)) as T(word, length) ON TRUE")
-{% endhighlight %}
-**IMPORTANT:** Do not implement TableFunction as a Scala object. Scala object
is a singleton and will cause concurrency issues.
+TableEnvironment env = TableEnvironment.create(...);
-Please note that POJO types do not have a deterministic field order.
Therefore, you cannot rename the fields of POJO returned by a table function
using `AS`.
+// call function "inline" without registration in Table API
+env.from("MyTable").select(call(HashFunction.class, $("myField")));
-By default the result type of a `TableFunction` is determined by Flink’s
automatic type extraction facilities. This works well for basic types and
simple POJOs but might be wrong for more complex, custom, or composite types.
In such a case, the type of the result can be manually specified by overriding
`TableFunction#getResultType()` which returns its `TypeInformation`.
+// register function
+env.createTemporarySystemFunction("HashFunction", HashFunction.class);
+
+// call registered function in Table API
+env.from("MyTable").select(call("HashFunction", $("myField")));
-The following example shows an example of a `TableFunction` that returns a
`Row` type which requires explicit type information. We define that the
returned table type should be `RowTypeInfo(String, Integer)` by overriding
`TableFunction#getResultType()`.
+// call registered function in SQL
+env.sqlQuery("SELECT HashFunction(myField) FROM MyTable");
+
+{% endhighlight %}
+</div>
+<div data-lang="Scala" markdown="1">
{% highlight scala %}
-class CustomTypeSplit extends TableFunction[Row] {
- def eval(str: String): Unit = {
- str.split(" ").foreach({ s =>
- val row = new Row(2)
- row.setField(0, s)
- row.setField(1, s.length)
- collect(row)
- })
- }
+import org.apache.flink.table.annotation.DataTypeHint
+import org.apache.flink.table.annotation.InputGroup
+import org.apache.flink.table.api._
+import org.apache.flink.table.functions.ScalarFunction
+
+class HashFunction extends ScalarFunction {
- override def getResultType: TypeInformation[Row] = {
- Types.ROW(Types.STRING, Types.INT)
+ // take any data type and return INT
+ def eval(@DataTypeHint(inputGroup = InputGroup.ANY) o: AnyRef): Int = {
+ o.hashCode()
}
}
+
+val env = TableEnvironment.create(...)
+
+// call function "inline" without registration in Table API
+env.from("MyTable").select(call(classOf[HashFunction], $"myField"))
+
+// register function
+env.createTemporarySystemFunction("HashFunction", classOf[HashFunction])
+
+// call registered function in Table API
+env.from("MyTable").select(call("HashFunction", $"myField"))
+
+// call registered function in SQL
+env.sqlQuery("SELECT HashFunction(myField) FROM MyTable")
+
{% endhighlight %}
+</div>
</div>
-<div data-lang="python" markdown="1">
-In order to define a Python table function, one can extend the base class
`TableFunction` in `pyflink.table.udtf` and Implement an evaluation method. The
behavior of a Python table function is determined by the evaluation method
which is named eval.
+If you intend to implement or call functions in Python, please refer to the
[Python Scalar Functions](../python/python_udfs.html#scalar-functions)
documentation for more details.
Review comment:
```suggestion
If you intend to implement or call functions in Python, please refer to the
[Python Scalar Functions]({% link dev/table/python/python_udfs.md
%}#scalar-functions) documentation for more details.
```
##########
File path: docs/dev/table/functions/udfs.md
##########
@@ -22,322 +22,768 @@ specific language governing permissions and limitations
under the License.
-->
-User-defined functions are an important feature, because they significantly
extend the expressiveness of queries.
+User-defined functions (UDFs) are extension points to call frequently used
logic or custom logic that cannot be expressed otherwise in queries.
+
+User-defined functions can be implemented in a JVM language (such as Java or
Scala) or Python. An implementer can use arbitrary third party libraries within
a UDF. This page will focus on JVM-based languages.
* This will be replaced by the TOC
{:toc}
-Register User-Defined Functions
--------------------------------
-In most cases, a user-defined function must be registered before it can be
used in an query. It is not necessary to register functions for the Scala Table
API.
-
-Functions are registered at the `TableEnvironment` by calling a
`registerFunction()` method. When a user-defined function is registered, it is
inserted into the function catalog of the `TableEnvironment` such that the
Table API or SQL parser can recognize and properly translate it.
+Overview
+--------
-Please find detailed examples of how to register and how to call each type of
user-defined function
-(`ScalarFunction`, `TableFunction`, and `AggregateFunction`) in the following
sub-sessions.
+Currently, Flink distinguishes between the following kinds of functions:
+- *Scalar functions* map scalar values to a new scalar value.
+- *Table functions* map scalar values to new rows.
+- *Aggregate functions* map scalar values of multiple rows to a new scalar
value.
+- *Table aggregate functions* map scalar values of multiple rows to new rows.
+- *Async table functions* are special functions for table sources that perform
a lookup.
-{% top %}
+<span class="label label-danger">Attention</span> Scalar and table functions
have been updated to the new type system based on [data types](../types.html).
Aggregating functions still use the old type system based on `TypeInformation`.
-Scalar Functions
-----------------
+The following example shows how to create a simple scalar function and how to
call the function in both Table API and SQL.
-If a required scalar function is not contained in the built-in functions, it
is possible to define custom, user-defined scalar functions for both the Table
API and SQL. A user-defined scalar functions maps zero, one, or multiple scalar
values to a new scalar value.
+For SQL queries, a function must always be registered under a name. For Table
API, a function can be registered or directly used _inline_.
<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-In order to define a scalar function, one has to extend the base class
`ScalarFunction` in `org.apache.flink.table.functions` and implement (one or
more) evaluation methods. The behavior of a scalar function is determined by
the evaluation method. An evaluation method must be declared publicly and named
`eval`. The parameter types and return type of the evaluation method also
determine the parameter and return types of the scalar function. Evaluation
methods can also be overloaded by implementing multiple methods named `eval`.
Evaluation methods can also support variable arguments, such as `eval(String...
strs)`.
-
-The following example shows how to define your own hash code function,
register it in the TableEnvironment, and call it in a query. Note that you can
configure your scalar function via a constructor before it is registered:
+<div data-lang="Java" markdown="1">
{% highlight java %}
-public class HashCode extends ScalarFunction {
- private int factor = 12;
-
- public HashCode(int factor) {
- this.factor = factor;
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.ScalarFunction;
+import static org.apache.flink.table.api.Expressions.*;
+
+// define function logic
+public static class SubstringFunction extends ScalarFunction {
+ public String eval(String s, Integer begin, Integer end) {
+ return s.substring(begin, end);
}
-
- public int eval(String s) {
- return s.hashCode() * factor;
+}
+
+TableEnvironment env = TableEnvironment.create(...);
+
+// call function "inline" without registration in Table API
+env.from("MyTable").select(call(SubstringFunction.class, $("myField"), 5, 12));
+
+// register function
+env.createTemporarySystemFunction("SubstringFunction",
SubstringFunction.class);
+
+// call registered function in Table API
+env.from("MyTable").select(call("SubstringFunction", $("myField"), 5, 12));
+
+// call registered function in SQL
+env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable");
+
+{% endhighlight %}
+</div>
+
+<div data-lang="Scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.table.api._
+import org.apache.flink.table.functions.ScalarFunction
+
+// define function logic
+class SubstringFunction extends ScalarFunction {
+ def eval(s: String, begin: Integer, end: Integer): String = {
+ s.substring(begin, end)
}
}
-BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
+val env = TableEnvironment.create(...)
-// register the function
-tableEnv.registerFunction("hashCode", new HashCode(10));
+// call function "inline" without registration in Table API
+env.from("MyTable").select(call(classOf[SubstringFunction], $"myField", 5, 12))
-// use the function in Java Table API
-myTable.select("string, string.hashCode(), hashCode(string)");
+// register function
+env.createTemporarySystemFunction("SubstringFunction",
classOf[SubstringFunction])
+
+// call registered function in Table API
+env.from("MyTable").select(call("SubstringFunction", $"myField", 5, 12))
+
+// call registered function in SQL
+env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable")
-// use the function in SQL API
-tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable");
{% endhighlight %}
+</div>
-By default the result type of an evaluation method is determined by Flink's
type extraction facilities. This is sufficient for basic types or simple POJOs
but might be wrong for more complex, custom, or composite types. In these cases
`TypeInformation` of the result type can be manually defined by overriding
`ScalarFunction#getResultType()`.
+</div>
+
+For interactive sessions, it is also possible to parameterize functions before
using or
+registering them. In this case, function _instances_ instead of function
_classes_ can be
+used as temporary functions.
-The following example shows an advanced example which takes the internal
timestamp representation and also returns the internal timestamp representation
as a long value. By overriding `ScalarFunction#getResultType()` we define that
the returned long value should be interpreted as a `Types.TIMESTAMP` by the
code generation.
+It requires that the parameters are serializable for shipping
+function instances to the cluster.
+
+<div class="codetabs" markdown="1">
+<div data-lang="Java" markdown="1">
{% highlight java %}
-public static class TimestampModifier extends ScalarFunction {
- public long eval(long t) {
- return t % 1000;
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.ScalarFunction;
+import static org.apache.flink.table.api.Expressions.*;
+
+// define parameterizable function logic
+public static class SubstringFunction extends ScalarFunction {
+
+ private boolean endInclusive;
+
+ public SubstringFunction(boolean endInclusive) {
+ this.endInclusive = endInclusive;
}
- public TypeInformation<?> getResultType(Class<?>[] signature) {
- return Types.SQL_TIMESTAMP;
+ public String eval(String s, Integer begin, Integer end) {
+ return s.substring(a, endInclusive ? end + 1 : end);
}
}
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-In order to define a scalar function, one has to extend the base class
`ScalarFunction` in `org.apache.flink.table.functions` and implement (one or
more) evaluation methods. The behavior of a scalar function is determined by
the evaluation method. An evaluation method must be declared publicly and named
`eval`. The parameter types and return type of the evaluation method also
determine the parameter and return types of the scalar function. Evaluation
methods can also be overloaded by implementing multiple methods named `eval`.
Evaluation methods can also support variable arguments, such as `@varargs def
eval(str: String*)`.
+TableEnvironment env = TableEnvironment.create(...);
-The following example shows how to define your own hash code function,
register it in the TableEnvironment, and call it in a query. Note that you can
configure your scalar function via a constructor before it is registered:
+// call function "inline" without registration in Table API
+env.from("MyTable").select(call(new SubstringFunction(true), $("myField"), 5,
12));
+
+// register function
+env.createTemporarySystemFunction("SubstringFunction", new
SubstringFunction(true));
+
+{% endhighlight %}
+</div>
+<div data-lang="Scala" markdown="1">
{% highlight scala %}
-// must be defined in static/object context
-class HashCode(factor: Int) extends ScalarFunction {
- def eval(s: String): Int = {
- s.hashCode() * factor
+import org.apache.flink.table.api._
+import org.apache.flink.table.functions.ScalarFunction
+
+// define parameterizable function logic
+class SubstringFunction(val endInclusive) extends ScalarFunction {
+ def eval(s: String, begin: Integer, end: Integer): String = {
+ s.substring(endInclusive ? end + 1 : end)
}
}
-val tableEnv = BatchTableEnvironment.create(env)
+val env = TableEnvironment.create(...)
-// use the function in Scala Table API
-val hashCode = new HashCode(10)
-myTable.select('string, hashCode('string))
+// call function "inline" without registration in Table API
+env.from("MyTable").select(call(new SubstringFunction(true), $"myField", 5,
12))
+
+// register function
+env.createTemporarySystemFunction("SubstringFunction", new
SubstringFunction(true))
-// register and use the function in SQL
-tableEnv.registerFunction("hashCode", new HashCode(10))
-tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable")
{% endhighlight %}
+</div>
+
+</div>
+
+{% top %}
+
+Implementation Guide
+--------------------
+
+<span class="label label-danger">Attention</span> This section only applies to
scalar and table functions for now; until aggregate functions have been updated
to the new type system.
-By default the result type of an evaluation method is determined by Flink's
type extraction facilities. This is sufficient for basic types or simple POJOs
but might be wrong for more complex, custom, or composite types. In these cases
`TypeInformation` of the result type can be manually defined by overriding
`ScalarFunction#getResultType()`.
+Independent of the kind of function, all user-defined functions follow some
basic implementation principles.
-The following example shows an advanced example which takes the internal
timestamp representation and also returns the internal timestamp representation
as a long value. By overriding `ScalarFunction#getResultType()` we define that
the returned long value should be interpreted as a `Types.TIMESTAMP` by the
code generation.
+### Function Class
+An implementation class must extend from one of the available base classes
(e.g. `org.apache.flink.table.functions.ScalarFunction`).
+
+The class must be declared `public`, not `abstract`, and should be globally
accessible. Thus, non-static inner or anonymous classes are not allowed.
+
+For storing a user-defined function in a persistent catalog, the class must
have a default constructor and must be instantiable during runtime.
+
+### Evaluation Methods
+
+The base class provides a set of methods that can be overridden such as
`open()`, `close()`, or `isDeterministic()`.
+
+However, in addition to those declared methods, the main runtime logic that is
applied to every incoming record must be implemented through specialized
_evaluation methods_.
+
+Depending on the function kind, evaluation methods such as `eval()`,
`accumulate()`, or `retract()` are called by code-generated operators during
runtime.
+
+The methods must be declared `public` and take a well-defined set of arguments.
+
+Regular JVM method calling semantics apply. Therefore, it is possible to:
+- implement overloaded methods such as `eval(Integer)` and
`eval(LocalDateTime)`,
+- use var-args such as `eval(Integer...)`,
+- use object inheritance such as `eval(Object)` that takes both
`LocalDateTime` and `Integer`,
+- and combinations of the above such as `eval(Object...)` that takes all kinds
of arguments.
+
+The following snippets shows an example of an overloaded function:
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="Java" markdown="1">
+{% highlight java %}
+import org.apache.flink.table.functions.ScalarFunction;
+
+// function with overloaded evaluation methods
+public static class SumFunction extends ScalarFunction {
+
+ public Integer eval(Integer a, Integer b) {
+ return a + b;
+ }
+
+ public Integer eval(String a, String b) {
+ return Integer.valueOf(a) + Integer.valueOf();
+ }
+
+ public Integer eval(Double... d) {
+ double result = 0;
+ for (double value : d)
+ result += value;
+ return (int) result;
+ }
+}
+
+{% endhighlight %}
+</div>
+
+<div data-lang="Scala" markdown="1">
{% highlight scala %}
-object TimestampModifier extends ScalarFunction {
- def eval(t: Long): Long = {
- t % 1000
+import org.apache.flink.table.functions.ScalarFunction
+import scala.annotation.varargs
+
+// function with overloaded evaluation methods
+class SumFunction extends ScalarFunction {
+
+ def eval(a: Integer, b: Integer): Integer = {
+ a + b
}
- override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
{
- Types.TIMESTAMP
+ def eval(a: String, b: String): Integer = {
+ Integer.valueOf(a) + Integer.valueOf(b)
+ }
+
+ @varargs // generate var-args like Java
+ def eval(d: Double*): Integer = {
+ d.sum.toInt
}
}
+
{% endhighlight %}
</div>
-<div data-lang="python" markdown="1">
-In order to define a Python scalar function, one can extend the base class
`ScalarFunction` in `pyflink.table.udf` and implement an evaluation method. The
behavior of a Python scalar function is determined by the evaluation method
which is named `eval`.
+</div>
-The following example shows how to define your own Python hash code function,
register it in the TableEnvironment, and call it in a query. Note that you can
configure your scalar function via a constructor before it is registered:
+### Type Inference
-{% highlight python %}
-class HashCode(ScalarFunction):
- def __init__(self):
- self.factor = 12
+The table ecosystem (similar to the SQL standard) is a strongly typed API.
Therefore, both function parameters and return types must be mapped to a [data
type](../types.html).
+
+From a logical perspective, the planner needs information about expected
types, precision, and scale. From a JVM perspective, the planner needs
information about how internal data structures are represented as JVM objects
when calling a user-defined function.
+
+The logic for validating input arguments and deriving data types for both the
parameters and the result of a function is summarized under the term _type
inference_.
+
+Flink's user-defined functions implement an automatic type inference
extraction that derives data types from the function's class and its evaluation
methods via reflection. If this implicit reflective extraction approach is not
successful, the extraction process can be supported by annotating affected
parameters, classes, or methods with `@DataTypeHint` and `@FunctionHint`. More
examples on how to annotate functions are shown below.
+
+If more advanced type inference logic is required, an implementer can
explicitly override the `getTypeInference()` method in every user-defined
function. However, the annotation approach is recommended because it keeps
custom type inference logic close to the affected locations and falls back to
the default behavior for the remaining implementation.
+
+#### Automatic Type Inference
+
+The automatic type inference inspects the function's class and evaluation
methods to derive data types for the arguments and result of a function.
`@DataTypeHint` and `@FunctionHint` annotations support the automatic
extraction.
+
+For a full list of classes that can be implicitly mapped to a data type, see
the [data type section](../types.html#data-type-annotations).
+
+**`@DataTypeHint`**
+
+In many scenarios, it is required to support the automatic extraction _inline_
for paramaters and return types of a function
+
+The following example shows how to use data type hints. More information can
be found in the documentation of the annotation class.
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="Java" markdown="1">
+{% highlight java %}
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.InputGroup;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.types.Row;
+
+// function with overloaded evaluation methods
+public static class OverloadedFunction extends ScalarFunction {
- def eval(self, s):
- return hash(s) * self.factor
+ // no hint required
+ public Long eval(long a, long b) {
+ return a + b;
+ }
-table_env = BatchTableEnvironment.create(env)
+ // define the precision and scale of a decimal
+ public @DataTypeHint("DECIMAL(12, 3)") BigDecimal eval(double a, double b) {
+ return BigDecimal.valueOf(a + b);
+ }
-# register the Python function
-table_env.register_function("hash_code", udf(HashCode(), DataTypes.BIGINT(),
DataTypes.BIGINT()))
+ // define a nested data type
+ @DataTypeHint("ROW<s STRING, t TIMESTAMP(3) WITH LOCAL TIME ZONE>")
+ public Row eval(int i) {
+ return Row.of(String.valueOf(i), Instant.ofEpochSecond(i));
+ }
-# use the function in Python Table API
-my_table.select("string, bigint, string.hash_code(), hash_code(string)")
+ // allow wildcard input and customly serialized output
+ @DataTypeHint(value = "RAW", bridgedTo = ByteBuffer.class)
+ public ByteBuffer eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
+ return MyUtils.serializeToByteBuffer(o);
+ }
+}
-# use the function in SQL API
-table_env.sql_query("SELECT string, bigint, hash_code(bigint) FROM MyTable")
{% endhighlight %}
+</div>
+
+<div data-lang="Scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.table.annotation.DataTypeHint
+import org.apache.flink.table.annotation.InputGroup
+import org.apache.flink.table.functions.ScalarFunction
+import org.apache.flink.types.Row
+import scala.annotation.varargs
+
+// function with overloaded evaluation methods
+class OverloadedFunction extends ScalarFunction {
+
+ // no hint required
+ def eval(a: Long, b: Long): Long = {
+ a + b
+ }
+
+ // define the precision and scale of a decimal
+ @DataTypeHint("DECIMAL(12, 3)")
+ def eval(double a, double b): BigDecimal = {
+ java.lang.BigDecimal.valueOf(a + b)
+ }
+
+ // define a nested data type
+ @DataTypeHint("ROW<s STRING, t TIMESTAMP(3) WITH LOCAL TIME ZONE>")
+ def eval(Int i): Row = {
+ Row.of(java.lang.String.valueOf(i), java.time.Instant.ofEpochSecond(i))
+ }
-There are many ways to define a Python scalar function besides extending the
base class `ScalarFunction`.
-Please refer to the [Python Scalar Function]({{ site.baseurl
}}/dev/table/python/python_udfs.html#scalar-functions) documentation for more
details.
+ // allow wildcard input and customly serialized output
+ @DataTypeHint(value = "RAW", bridgedTo = classOf[java.nio.ByteBuffer])
+ def eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o):
java.nio.ByteBuffer = {
+ MyUtils.serializeToByteBuffer(o)
+ }
+}
+
+{% endhighlight %}
</div>
+
</div>
-{% top %}
+**`@FunctionHint`**
-Table Functions
----------------
+In some scenarios, it is desirable that one evaluation method handles multiple
different data types at the same time. Furthermore, in some scenarios,
overloaded evaluation methods have a common result type that should be declared
only once.
-Similar to a user-defined scalar function, a user-defined table function takes
zero, one, or multiple scalar values as input parameters. However in contrast
to a scalar function, it can return an arbitrary number of rows as output
instead of a single value. The returned rows may consist of one or more
columns.
+The `@FunctionHint` annotation can provide a mapping from argument data types
to a result data type. It enables annotating entire function classes or
evaluation methods for input, accumulator, and result data types. One or more
annotations can be declared on top of a class or individually for each
evaluation method for overloading function signatures. All hint parameters are
optional. If a parameter is not defined, the default reflection-based
extraction is used. Hint parameters defined on top of a function class are
inherited by all evaluation methods.
+
+The following example shows how to use function hints. More information can be
found in the documentation of the annotation class.
<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-In order to define a table function one has to extend the base class
`TableFunction` in `org.apache.flink.table.functions` and implement (one or
more) evaluation methods. The behavior of a table function is determined by its
evaluation methods. An evaluation method must be declared `public` and named
`eval`. The `TableFunction` can be overloaded by implementing multiple methods
named `eval`. The parameter types of the evaluation methods determine all valid
parameters of the table function. Evaluation methods can also support variable
arguments, such as `eval(String... strs)`. The type of the returned table is
determined by the generic type of `TableFunction`. Evaluation methods emit
output rows using the protected `collect(T)` method.
-In the Table API, a table function is used with `.joinLateral` or
`.leftOuterJoinLateral`. The `joinLateral` operator (cross) joins each row from
the outer table (table on the left of the operator) with all rows produced by
the table-valued function (which is on the right side of the operator). The
`leftOuterJoinLateral` operator joins each row from the outer table (table on
the left of the operator) with all rows produced by the table-valued function
(which is on the right side of the operator) and preserves outer rows for which
the table function returns an empty table. In SQL use `LATERAL
TABLE(<TableFunction>)` with CROSS JOIN and LEFT JOIN with an ON TRUE join
condition (see examples below).
+<div data-lang="Java" markdown="1">
+{% highlight java %}
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+// function with overloaded evaluation methods
+// but globally defined output type
+@FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
+public static class OverloadedFunction extends TableFunction<Row> {
+
+ public void eval(int a, int b) {
+ collect(Row.of("Sum", a + b));
+ }
-The following example shows how to define table-valued function, register it
in the TableEnvironment, and call it in a query. Note that you can configure
your table function via a constructor before it is registered:
+ // overloading of arguments is still possible
+ public void eval() {
+ collect(Row.of("Empty args", -1));
+ }
+}
-{% highlight java %}
-// The generic type "Tuple2<String, Integer>" determines the schema of the
returned table as (String, Integer).
-public class Split extends TableFunction<Tuple2<String, Integer>> {
- private String separator = " ";
-
- public Split(String separator) {
- this.separator = separator;
+// decouples the type inference from evaluation methods,
+// the type inference is entirely determined by the function hints
+@FunctionHint(
+ input = [@DataTypeHint("INT"), @DataTypeHint("INT")],
+ output = @DataTypeHint("INT")
+)
+@FunctionHint(
+ input = [@DataTypeHint("LONG"), @DataTypeHint("LONG")],
+ output = @DataTypeHint("LONG")
+)
+@FunctionHint(
+ input = [],
+ output = @DataTypeHint("BOOLEAN")
+)
+public static class OverloadedFunction extends TableFunction<Object> {
+
+ // an implementer just needs to make sure that a method exists
+ // that can be called by the JVM
+ public void eval(Object... o) {
+ if (o.length == 0) {
+ collect(false);
}
-
- public void eval(String str) {
- for (String s : str.split(separator)) {
- // use collect(...) to emit a row
- collect(new Tuple2<String, Integer>(s, s.length()));
- }
+ collect(o[0]);
+ }
+}
+
+{% endhighlight %}
+</div>
+
+<div data-lang="Scala" markdown="1">
+{% highlight scala %}
+
+import org.apache.flink.table.annotation.DataTypeHint
+import org.apache.flink.table.annotation.FunctionHint
+import org.apache.flink.table.functions.TableFunction
+import org.apache.flink.types.Row
+
+// function with overloaded evaluation methods
+// but globally defined output type
+@FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
+class OverloadedFunction extends TableFunction[Row] {
+
+ def eval(a: Int, b: Int): Unit = {
+ collect(Row.of("Sum", Int.box(a + b)))
+ }
+
+ // overloading of arguments is still possible
+ def eval(): Unit = {
+ collect(Row.of("Empty args", Int.box(-1)))
+ }
+}
+
+// decouples the type inference from evaluation methods,
+// the type inference is entirely determined by the function hints
+@FunctionHint(
+ input = Array(@DataTypeHint("INT"), @DataTypeHint("INT")),
+ output = @DataTypeHint("INT")
+)
+@FunctionHint(
+ input = Array(@DataTypeHint("LONG"), @DataTypeHint("LONG")),
+ output = @DataTypeHint("LONG")
+)
+@FunctionHint(
+ input = Array(),
+ output = @DataTypeHint("BOOLEAN")
+)
+class OverloadedFunction extends TableFunction[AnyRef] {
+
+ // an implementer just needs to make sure that a method exists
+ // that can be called by the JVM
+ @varargs
+ def eval(o: AnyRef*) = {
+ if (o.length == 0) {
+ collect(Boolean.box(false))
}
+ collect(o(0))
+ }
}
-BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
-Table myTable = ... // table schema: [a: String]
-
-// Register the function.
-tableEnv.registerFunction("split", new Split("#"));
-
-// Use the table function in the Java Table API. "as" specifies the field
names of the table.
-myTable.joinLateral("split(a) as (word, length)")
- .select("a, word, length");
-myTable.leftOuterJoinLateral("split(a) as (word, length)")
- .select("a, word, length");
-
-// Use the table function in SQL with LATERAL and TABLE keywords.
-// CROSS JOIN a table function (equivalent to "join" in Table API).
-tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL
TABLE(split(a)) as T(word, length)");
-// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
-tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL
TABLE(split(a)) as T(word, length) ON TRUE");
{% endhighlight %}
</div>
-Please note that POJO types do not have a deterministic field order.
Therefore, you cannot rename the fields of POJO returned by a table function
using `AS`.
+</div>
+
+#### Custom Type Inference
-By default the result type of a `TableFunction` is determined by Flink’s
automatic type extraction facilities. This works well for basic types and
simple POJOs but might be wrong for more complex, custom, or composite types.
In such a case, the type of the result can be manually specified by overriding
`TableFunction#getResultType()` which returns its `TypeInformation`.
+For most scenarios, `@DataTypeHint` and `@FunctionHint` should be sufficient
to model user-defined functions. However, by overriding the automatic type
inference defined in `getTypeInference()`, implementers can create arbitrary
functions that behave like built-in system functions.
-The following example shows an example of a `TableFunction` that returns a
`Row` type which requires explicit type information. We define that the
returned table type should be `RowTypeInfo(String, Integer)` by overriding
`TableFunction#getResultType()`.
+The following example, implemented in Java, illustrates the potential of custom
type inference logic. It uses a string literal argument to determine the result
type of a function. The function takes two string arguments: the first argument
represents the string to be parsed, and the second argument represents the
target type.
+
+<div class="codetabs" markdown="1">
+<div data-lang="Java" markdown="1">
{% highlight java %}
-public class CustomTypeSplit extends TableFunction<Row> {
- public void eval(String str) {
- for (String s : str.split(" ")) {
- Row row = new Row(2);
- row.setField(0, s);
- row.setField(1, s.length());
- collect(row);
- }
+import java.util.Optional;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.types.Row;
+
+public static class LiteralFunction extends ScalarFunction {
+ public Object eval(String s, String type) {
+ switch (type) {
+ case "INT":
+ return Integer.valueOf(s);
+ case "DOUBLE":
+ return Double.valueOf(s);
+ case "STRING":
+ default:
+ return s;
}
+ }
- @Override
- public TypeInformation<Row> getResultType() {
- return Types.ROW(Types.STRING(), Types.INT());
- }
+ // the automatic, reflection-based type inference is disabled and
+ // replaced by the following logic
+ @Override
+ public TypeInference getTypeInference(DataTypeFactory typeFactory) {
+ return TypeInference.newBuilder()
+ // specify typed arguments
+      // parameters will be cast implicitly to those types if necessary
+ .typedArguments(DataTypes.STRING(), DataTypes.STRING())
+ // specify a strategy for the result data type of the function
+ .outputTypeStrategy(callContext -> {
+ if (!callContext.isArgumentLiteral(1) ||
callContext.isArgumentNull(1)) {
+ throw callContext.newValidationError("Literal expected for second
argument.");
+ }
+ // return a data type based on a literal
+ final String literal = callContext.getArgumentValue(1,
String.class).orElse("STRING");
+ switch (literal) {
+ case "INT":
+ return Optional.of(DataTypes.INT().notNull());
+ case "DOUBLE":
+ return Optional.of(DataTypes.DOUBLE().notNull());
+ case "STRING":
+ default:
+ return Optional.of(DataTypes.STRING());
+ }
+ })
+ .build();
+ }
}
+
{% endhighlight %}
+</div>
-<div data-lang="scala" markdown="1">
-In order to define a table function one has to extend the base class
`TableFunction` in `org.apache.flink.table.functions` and implement (one or
more) evaluation methods. The behavior of a table function is determined by its
evaluation methods. An evaluation method must be declared `public` and named
`eval`. The `TableFunction` can be overloaded by implementing multiple methods
named `eval`. The parameter types of the evaluation methods determine all valid
parameters of the table function. Evaluation methods can also support variable
arguments, such as `eval(String... strs)`. The type of the returned table is
determined by the generic type of `TableFunction`. Evaluation methods emit
output rows using the protected `collect(T)` method.
+</div>
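+
+For illustration only (an editorial sketch, not part of the example above, shown in Scala
+and assuming a `TableEnvironment` named `env` as in the earlier snippets), the string
+literal passed as the second argument then drives the result type of each call:
+
+{% highlight scala %}
+// register the function defined above
+env.createTemporarySystemFunction("LiteralFunction", classOf[LiteralFunction])
+
+// the result type is INT NOT NULL because of the 'INT' literal
+env.sqlQuery("SELECT LiteralFunction(myField, 'INT') FROM MyTable")
+
+// the result type is STRING (the default branch of the output type strategy)
+env.sqlQuery("SELECT LiteralFunction(myField, 'STRING') FROM MyTable")
+{% endhighlight %}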
-In the Table API, a table function is used with `.joinLateral` or
`.leftOuterJoinLateral`. The `joinLateral` operator (cross) joins each row from
the outer table (table on the left of the operator) with all rows produced by
the table-valued function (which is on the right side of the operator). The
`leftOuterJoinLateral` operator joins each row from the outer table (table on
the left of the operator) with all rows produced by the table-valued function
(which is on the right side of the operator) and preserves outer rows for which
the table function returns an empty table. In SQL use `LATERAL
TABLE(<TableFunction>)` with CROSS JOIN and LEFT JOIN with an ON TRUE join
condition (see examples below).
+{% top %}
-The following example shows how to define table-valued function, register it
in the TableEnvironment, and call it in a query. Note that you can configure
your table function via a constructor before it is registered:
+Scalar Functions
+----------------
-{% highlight scala %}
-// The generic type "(String, Int)" determines the schema of the returned
table as (String, Integer).
-class Split(separator: String) extends TableFunction[(String, Int)] {
- def eval(str: String): Unit = {
- // use collect(...) to emit a row.
- str.split(separator).foreach(x => collect((x, x.length)))
+A user-defined scalar function maps zero, one, or multiple scalar values to a
new scalar value. Any data type listed in the [data types
section](../types.html) can be used as a parameter or return type of an
evaluation method.
+
+In order to define a scalar function, one has to extend the base class
`ScalarFunction` in `org.apache.flink.table.functions` and implement one or
more evaluation methods named `eval(...)`.
+
+The following example shows how to define your own hash code function and call
it in a query. See the [Implementation Guide](#implementation-guide) for more
details.
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="Java" markdown="1">
+{% highlight java %}
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.InputGroup;
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.ScalarFunction;
+import static org.apache.flink.table.api.Expressions.*;
+
+public static class HashFunction extends ScalarFunction {
+
+ // take any data type and return INT
+ public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
+ return o.hashCode();
}
}
-val tableEnv = BatchTableEnvironment.create(env)
-val myTable = ... // table schema: [a: String]
-
-// Use the table function in the Scala Table API (Note: No registration
required in Scala Table API).
-val split = new Split("#")
-// "as" specifies the field names of the generated table.
-myTable.joinLateral(split('a) as ('word, 'length)).select('a, 'word, 'length)
-myTable.leftOuterJoinLateral(split('a) as ('word, 'length)).select('a, 'word,
'length)
-
-// Register the table function to use it in SQL queries.
-tableEnv.registerFunction("split", new Split("#"))
-
-// Use the table function in SQL with LATERAL and TABLE keywords.
-// CROSS JOIN a table function (equivalent to "join" in Table API)
-tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL
TABLE(split(a)) as T(word, length)")
-// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API)
-tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL
TABLE(split(a)) as T(word, length) ON TRUE")
-{% endhighlight %}
-**IMPORTANT:** Do not implement TableFunction as a Scala object. Scala object
is a singleton and will cause concurrency issues.
+TableEnvironment env = TableEnvironment.create(...);
-Please note that POJO types do not have a deterministic field order.
Therefore, you cannot rename the fields of POJO returned by a table function
using `AS`.
+// call function "inline" without registration in Table API
+env.from("MyTable").select(call(HashFunction.class, $("myField")));
-By default the result type of a `TableFunction` is determined by Flink’s
automatic type extraction facilities. This works well for basic types and
simple POJOs but might be wrong for more complex, custom, or composite types.
In such a case, the type of the result can be manually specified by overriding
`TableFunction#getResultType()` which returns its `TypeInformation`.
+// register function
+env.createTemporarySystemFunction("HashFunction", HashFunction.class);
+
+// call registered function in Table API
+env.from("MyTable").select(call("HashFunction", $("myField")));
-The following example shows an example of a `TableFunction` that returns a
`Row` type which requires explicit type information. We define that the
returned table type should be `RowTypeInfo(String, Integer)` by overriding
`TableFunction#getResultType()`.
+// call registered function in SQL
+env.sqlQuery("SELECT HashFunction(myField) FROM MyTable");
+
+{% endhighlight %}
+</div>
+<div data-lang="Scala" markdown="1">
{% highlight scala %}
-class CustomTypeSplit extends TableFunction[Row] {
- def eval(str: String): Unit = {
- str.split(" ").foreach({ s =>
- val row = new Row(2)
- row.setField(0, s)
- row.setField(1, s.length)
- collect(row)
- })
- }
+import org.apache.flink.table.annotation.DataTypeHint
+import org.apache.flink.table.annotation.InputGroup
+import org.apache.flink.table.api._
+import org.apache.flink.table.functions.ScalarFunction
+
+class HashFunction extends ScalarFunction {
- override def getResultType: TypeInformation[Row] = {
- Types.ROW(Types.STRING, Types.INT)
+ // take any data type and return INT
+  def eval(@DataTypeHint(inputGroup = InputGroup.ANY) o: AnyRef): Int = {
+    o.hashCode()
}
}
+
+val env = TableEnvironment.create(...)
+
+// call function "inline" without registration in Table API
+env.from("MyTable").select(call(classOf[HashFunction], $"myField"))
+
+// register function
+env.createTemporarySystemFunction("HashFunction", classOf[HashFunction])
+
+// call registered function in Table API
+env.from("MyTable").select(call("HashFunction", $"myField"))
+
+// call registered function in SQL
+env.sqlQuery("SELECT HashFunction(myField) FROM MyTable")
+
{% endhighlight %}
+</div>
</div>
-<div data-lang="python" markdown="1">
-In order to define a Python table function, one can extend the base class
`TableFunction` in `pyflink.table.udtf` and Implement an evaluation method. The
behavior of a Python table function is determined by the evaluation method
which is named eval.
+If you intend to implement or call functions in Python, please refer to the
[Python Scalar Functions](../python/python_udfs.html#scalar-functions)
documentation for more details.
-In the Python Table API, a Python table function is used with `.join_lateral`
or `.left_outer_join_lateral`. The `join_lateral` operator (cross) joins each
row from the outer table (table on the left of the operator) with all rows
produced by the table-valued function (which is on the right side of the
operator). The `left_outer_join_lateral` operator joins each row from the outer
table (table on the left of the operator) with all rows produced by the
table-valued function (which is on the right side of the operator) and
preserves outer rows for which the table function returns an empty table. In
SQL use `LATERAL TABLE(<TableFunction>)` with CROSS JOIN and LEFT JOIN with an
ON TRUE join condition (see examples below).
+{% top %}
-The following example shows how to define a Python table function, registered
it in the TableEnvironment, and call it in a query. Note that you can configure
your table function via a constructor before it is registered:
+Table Functions
+---------------
-{% highlight python %}
-class Split(TableFunction):
- def eval(self, string):
- for s in string.split(" "):
- yield s, len(s)
+Similar to a user-defined scalar function, a user-defined table function takes
zero, one, or multiple scalar values as input arguments. However, in contrast
to a scalar function, it can return an arbitrary number of rows (or structured
types) as output instead of a single value. The returned record may consist of
one or more fields. If an output record consists of only one field, the
structured record can be omitted and a scalar value can be emitted. It will be
wrapped into an implicit row by the runtime.
+
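+As a small editorial sketch of this single-field case (the class name `DuplicatorFunction`
+is made up for illustration and does not appear in the original example), a table function
+with the generic type `String` can emit plain scalar values directly:
+
+{% highlight scala %}
+import org.apache.flink.table.functions.TableFunction
+
+// emits every input value twice; each collected String is implicitly
+// wrapped into a one-field row by the runtime
+class DuplicatorFunction extends TableFunction[String] {
+
+  def eval(s: String): Unit = {
+    collect(s)
+    collect(s)
+  }
+}
+{% endhighlight %}
+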
+In order to define a table function, one has to extend the base class
`TableFunction` in `org.apache.flink.table.functions` and implement one or more
evaluation methods named `eval(...)`. Similar to other functions, input and
output data types are automatically extracted using reflection. This includes
the generic argument `T` of the class for determining an output data type. In
contrast to scalar functions, the evaluation method itself must not have a
return type; instead, table functions provide a `collect(T)` method that can be
called within every evaluation method for emitting zero, one, or more records.
-env = StreamExecutionEnvironment.get_execution_environment()
-table_env = StreamTableEnvironment.create(env)
-my_table = ... # type: Table, table schema: [a: String]
+In the Table API, a table function is used with `.joinLateral(...)` or
`.leftOuterJoinLateral(...)`. The `joinLateral` operator (cross) joins each row
from the outer table (table on the left of the operator) with all rows produced
by the table-valued function (which is on the right side of the operator). The
`leftOuterJoinLateral` operator joins each row from the outer table (table on
the left of the operator) with all rows produced by the table-valued function
(which is on the right side of the operator) and preserves outer rows for which
the table function returns an empty table.
-# register the Python Table Function
-table_env.register_function("split", udtf(Split(), DataTypes.STRING(),
[DataTypes.STRING(), DataTypes.INT()]))
+In SQL, use `LATERAL TABLE(<TableFunction>)` with `JOIN` or `LEFT JOIN` with
an `ON TRUE` join condition.
-# use the Python Table Function in Python Table API
-my_table.join_lateral("split(a) as (word, length)")
-my_table.left_outer_join_lateral("split(a) as (word, length)")
+The following example shows how to define your own split function and call it
in a query. See the [Implementation Guide](#implementation-guide) for more
details.
-# use the Python Table function in SQL API
-table_env.sql_query("SELECT a, word, length FROM MyTable, LATERAL
TABLE(split(a)) as T(word, length)")
-table_env.sql_query("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL
TABLE(split(a)) as T(word, length) ON TRUE")
+<div class="codetabs" markdown="1">
+
+<div data-lang="Java" markdown="1">
+{% highlight java %}
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+import static org.apache.flink.table.api.Expressions.*;
+
+@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
+public static class SplitFunction extends TableFunction<Row> {
+
+ public void eval(String str) {
+ for (String s : str.split(" ")) {
+ // use collect(...) to emit a row
+ collect(Row.of(s, s.length()));
+ }
+ }
+}
+
+TableEnvironment env = TableEnvironment.create(...);
+
+// call function "inline" without registration in Table API
+env
+ .from("MyTable")
+ .joinLateral(call(SplitFunction.class, $("myField")))
+ .select($("myField"), $("word"), $("length"));
+env
+ .from("MyTable")
+ .leftOuterJoinLateral(call(SplitFunction.class, $("myField")))
+ .select($("myField"), $("word"), $("length"));
+
+// rename fields of the function in Table API
+env
+ .from("MyTable")
+ .leftOuterJoinLateral(call(SplitFunction.class, $("myField")).as("newWord",
"newLength"))
+ .select($("myField"), $("newWord"), $("newLength"));
+
+// register function
+env.createTemporarySystemFunction("SplitFunction", SplitFunction.class);
+
+// call registered function in Table API
+env
+ .from("MyTable")
+ .joinLateral(call("SplitFunction", $("myField")))
+ .select($("myField"), $("word"), $("length"));
+env
+ .from("MyTable")
+ .leftOuterJoinLateral(call("SplitFunction", $("myField")))
+ .select($("myField"), $("word"), $("length"));
+
+// call registered function in SQL
+env.sqlQuery(
+ "SELECT myField, word, length " +
+ "FROM MyTable, LATERAL TABLE(SplitFunction(myField))");
+env.sqlQuery(
+ "SELECT myField, word, length " +
+ "FROM MyTable " +
+ "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE");
+
+// rename fields of the function in SQL
+env.sqlQuery(
+ "SELECT myField, newWord, newLength " +
+ "FROM MyTable " +
+ "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON
TRUE");
{% endhighlight %}
+</div>
-There are many ways to define a Python table function besides extending the
base class `TableFunction`.
-Please refer to the [Python Table Function]({{ site.baseurl
}}/dev/table/python/python_udfs.html#table-functions) documentation for more
details.
-
+<div data-lang="Scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.table.annotation.DataTypeHint
+import org.apache.flink.table.annotation.FunctionHint
+import org.apache.flink.table.api._
+import org.apache.flink.table.functions.TableFunction
+import org.apache.flink.types.Row
+
+@FunctionHint(output = new DataTypeHint("ROW<word STRING, length INT>"))
+class SplitFunction extends TableFunction[Row] {
+
+ def eval(str: String): Unit = {
+ // use collect(...) to emit a row
+ str.split(" ").foreach(s => collect(Row.of(s, s.length)))
+ }
+}
+
+val env = TableEnvironment.create(...)
+
+// call function "inline" without registration in Table API
+env
+ .from("MyTable")
+  .joinLateral(call(classOf[SplitFunction], $"myField"))
+ .select($"myField", $"word", $"length")
+env
+ .from("MyTable")
+ .leftOuterJoinLateral(call(classOf[SplitFunction], $"myField"))
+ .select($"myField", $"word", $"length")
+
+// rename fields of the function in Table API
+env
+ .from("MyTable")
+ .leftOuterJoinLateral(call(classOf[SplitFunction], $"myField").as("newWord",
"newLength"))
+ .select($"myField", $"newWord", $"newLength")
+
+// register function
+env.createTemporarySystemFunction("SplitFunction", classOf[SplitFunction])
+
+// call registered function in Table API
+env
+ .from("MyTable")
+ .joinLateral(call("SplitFunction", $"myField"))
+ .select($"myField", $"word", $"length")
+env
+ .from("MyTable")
+ .leftOuterJoinLateral(call("SplitFunction", $"myField"))
+ .select($"myField", $"word", $"length")
+
+// call registered function in SQL
+env.sqlQuery(
+ "SELECT myField, word, length " +
+ "FROM MyTable, LATERAL TABLE(SplitFunction(myField))");
+env.sqlQuery(
+ "SELECT myField, word, length " +
+ "FROM MyTable " +
+ "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE")
+
+// rename fields of the function in SQL
+env.sqlQuery(
+ "SELECT myField, newWord, newLength " +
+ "FROM MyTable " +
+ "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON
TRUE")
+
+{% endhighlight %}
</div>
+
</div>
-{% top %}
+If you intend to implement functions in Scala, do not implement a table
function as a Scala `object`. Scala `object`s are singletons and will cause
concurrency issues.
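+
+A minimal sketch of the recommended pattern (an editorial illustration; the class name
+`SafeSplitFunction` is made up):
+
+{% highlight scala %}
+import org.apache.flink.table.functions.TableFunction
+
+// recommended: a regular class, so that every task can work on its own instance
+class SafeSplitFunction extends TableFunction[String] {
+  def eval(str: String): Unit = str.split(" ").foreach(collect)
+}
+
+// discouraged: `object SafeSplitFunction extends TableFunction[String] { ... }`
+// would be a JVM-wide singleton shared by all tasks
+{% endhighlight %}
+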
+If you intend to implement or call functions in Python, please refer to the
[Python Table Functions](../python/python_udfs.html#table-functions)
documentation for more details.
Review comment:
```suggestion
If you intend to implement or call functions in Python, please refer to the
[Python Table Functions]({% link dev/table/python/python_udfs.md
%}#table-functions) documentation for more details.
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]