dianfu commented on a change in pull request #13905:
URL: https://github.com/apache/flink/pull/13905#discussion_r516615229
##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
result = [1, 2, 3]
return result
{% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only
supported in the group aggregate operation of the blink stream planner. For
batch execution or windowed aggregate, it is recommended to use the [Vectorized
Aggregate Functions]({% link
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md
%}#vectorized-aggregate-functions).
+
+The behavior of an aggregate function is centered around the concept of an
accumulator. The _accumulator_
+is an intermediate data structure that stores the aggregated values until a
final aggregation result
+is computed.
+
+For each set of rows that needs to be aggregated, the runtime will create an
empty accumulator by calling
Review comment:
```suggestion
For each set of rows that need to be aggregated, the runtime will create an empty accumulator by calling
```
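The lifecycle described in the quoted paragraph (create an empty accumulator, update it per row, then read the result) can be sketched in plain Python. This is a minimal stand-in for illustration only, not the actual PyFlink runtime; `MaxAgg` and `run_aggregation` are hypothetical names:

```python
class MaxAgg:
    """Minimal stand-in aggregate function mirroring the documented
    lifecycle; not PyFlink's AggregateFunction base class."""

    def create_accumulator(self):
        return [None]  # single-field accumulator holding the current max

    def accumulate(self, acc, value):
        if acc[0] is None or value > acc[0]:
            acc[0] = value

    def get_value(self, acc):
        return acc[0]


def run_aggregation(func, rows):
    # The runtime creates an empty accumulator per set of rows, calls
    # accumulate(...) for each input row, then asks for the result.
    acc = func.create_accumulator()
    for row in rows:
        func.accumulate(acc, row)
    return func.get_value(acc)


print(run_aggregation(MaxAgg(), [1.2, 3.5, 2.0, 4.4, 0.5]))  # 4.4
```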
##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
result = [1, 2, 3]
return result
{% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only
supported in the group aggregate operation of the blink stream planner. For
batch execution or windowed aggregate, it is recommended to use the [Vectorized
Aggregate Functions]({% link
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md
%}#vectorized-aggregate-functions).
Review comment:
```suggestion
**NOTE:** Currently the general user-defined aggregate function is only supported in the GroupBy aggregation of the blink planner in streaming mode. For batch mode or windowed aggregation, it's currently not supported and it is recommended to use the [Vectorized Aggregate Functions]({% link dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md %}#vectorized-aggregate-functions).
```
##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
result = [1, 2, 3]
return result
{% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only
supported in the group aggregate operation of the blink stream planner. For
batch execution or windowed aggregate, it is recommended to use the [Vectorized
Aggregate Functions]({% link
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md
%}#vectorized-aggregate-functions).
+
+The behavior of an aggregate function is centered around the concept of an
accumulator. The _accumulator_
+is an intermediate data structure that stores the aggregated values until a
final aggregation result
+is computed.
+
+For each set of rows that needs to be aggregated, the runtime will create an
empty accumulator by calling
+`create_accumulator()`. Subsequently, the `accumulate(...)` method of the
function is called for each input
+row to update the accumulator. Currently after each row has been processed,
the `get_value(...)` method of the
+function is called to compute and return the immediate result.
+
+The following example illustrates the aggregation process:
+
+<center>
+<img alt="UDAGG mechanism" src="{{ site.baseurl
}}/fig/udagg-mechanism-python.png" width="80%">
+</center>
+
+In the example, we assume a table that contains data about beverages. The
table consists of three columns (`id`, `name`,
+and `price`) and 5 rows. We would like to find the highest price of all
beverages in the table, i.e., perform
+a `max()` aggregation. We need to consider each of the 5 rows. The result is a
single numeric value.
+
+In order to define an aggregate function, one has to extend the base class
`AggregateFunction` in
+`pyflink.table` and implement the evaluation method named `accumulate(...)`.
+The result type and accumulator type of the aggregate function can be
specified by such 2 approach:
Review comment:
```suggestion
The result type and accumulator type of the aggregate function can be specified by one of the following two approaches:
```
##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
result = [1, 2, 3]
return result
{% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only
supported in the group aggregate operation of the blink stream planner. For
batch execution or windowed aggregate, it is recommended to use the [Vectorized
Aggregate Functions]({% link
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md
%}#vectorized-aggregate-functions).
+
+The behavior of an aggregate function is centered around the concept of an
accumulator. The _accumulator_
+is an intermediate data structure that stores the aggregated values until a
final aggregation result
+is computed.
+
+For each set of rows that needs to be aggregated, the runtime will create an
empty accumulator by calling
+`create_accumulator()`. Subsequently, the `accumulate(...)` method of the
function is called for each input
+row to update the accumulator. Currently after each row has been processed,
the `get_value(...)` method of the
+function is called to compute and return the immediate result.
+
+The following example illustrates the aggregation process:
+
+<center>
+<img alt="UDAGG mechanism" src="{{ site.baseurl
}}/fig/udagg-mechanism-python.png" width="80%">
+</center>
+
+In the example, we assume a table that contains data about beverages. The
table consists of three columns (`id`, `name`,
+and `price`) and 5 rows. We would like to find the highest price of all
beverages in the table, i.e., perform
+a `max()` aggregation. We need to consider each of the 5 rows. The result is a
single numeric value.
+
+In order to define an aggregate function, one has to extend the base class
`AggregateFunction` in
+`pyflink.table` and implement the evaluation method named `accumulate(...)`.
+The result type and accumulator type of the aggregate function can be
specified by such 2 approach:
+
+- implement the method named `get_result_type()` and `get_accumulator_type()`.
+- wrap the function instance with the decorator `udaf` in `pyflink.table.udf`
and specify the parameter `result_type` and `accumulator_type`.
+
+The following example shows how to define your own aggregate function and call
it in a query.
+
+{% highlight python %}
+from pyflink.common import Row
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import AggregateFunction, DataTypes, StreamTableEnvironment
+from pyflink.table.expressions import call
+from pyflink.table.udf import udaf
+
+
+class WeightedAvg(AggregateFunction):
+
+ def create_accumulator(self):
+ # Row(sum, count)
+ return Row(0, 0)
+
+ def get_value(self, accumulator):
+ if accumulator[1] == 0:
+ return None
+ else:
+ return accumulator[0] / accumulator[1]
+
+ def accumulate(self, accumulator, value, weight):
+ accumulator[0] += value * weight
+ accumulator[1] += weight
+
+ def retract(self, accumulator, value, weight):
+ accumulator[0] -= value * weight
+ accumulator[1] -= weight
+
+ def get_result_type(self):
+ return DataTypes.BIGINT()
+
+ def get_accumulator_type(self):
+ return DataTypes.ROW([
+ DataTypes.FIELD("f0", DataTypes.BIGINT()),
+ DataTypes.FIELD("f1", DataTypes.BIGINT())])
+
+
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+# the result type and accumulator type can also be specified in the udaf
decorator:
+# weighted_avg = udaf(WeightedAvg(), result_type=DataTypes.BIGINT(),
accumulator_type=...)
+weighted_avg = udaf(WeightedAvg())
+t = table_env.from_elements([(1, 2, "Lee"),
+ (3, 4, "Jay"),
+ (5, 6, "Jay"),
+ (7, 8, "Lee")]).alias("value", "count", "name")
+
+# call function "inline" without registration in Table API
+result = t.group_by(t.name).select(weighted_avg(t.value,
t.count).alias("avg")).to_pandas()
+print(result)
+
+# register function
+table_env.create_temporary_function("weighted_avg", WeightedAvg())
+
+# call registered function in Table API
+result = t.group_by(t.name).select(call("weighted_avg", t.value,
t.count).alias("avg")).to_pandas()
+print(result)
+
+# register table
+table_env.create_temporary_view("source", t)
+
+# call registered function in SQL
+result = table_env.sql_query(
+ "SELECT weighted_avg(`value`, `count`) AS avg FROM source GROUP BY
name").to_pandas()
+print(result)
+{% endhighlight %}
+
+The `accumulate(...)` method of our `WeightedAvg` class takes three inputs.
The first one is the accumulator
+and the other two are user-defined inputs. In order to calculate a weighted
average value, the accumulator
+needs to store the weighted sum and count of all the data that has been
accumulated. In our example, we
+use a `Row` object to be the accumulator. Accumulators are automatically
managed
+by Flink's checkpointing mechanism and are restored in case of a failure to
ensure exactly-once semantics.
+
+### Mandatory and Optional Methods
+
+**The following methods are mandatory for each `AggregateFunction`:**
+
+- `create_accumulator()`
+- `accumulate(...)`
+- `get_value(...)`
+
+**The following methods of `AggregateFunction` are required depending on the
use case:**
+
+- `retract(...)` is required when there are other operations that generate
retract messages before current UDAF call, e.g. group aggregate , outer join. \
Review comment:
```suggestion
- `retract(...)` is required when there are operations that could generate retraction messages before the current aggregation operation, e.g. group aggregate, outer join. \
```
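The retraction semantics this comment refers to can be illustrated outside Flink: `retract(...)` undoes the effect of a prior `accumulate(...)` call on the accumulator, mirroring the arithmetic of the `WeightedAvg` class in the quoted hunk (plain Python, no Flink runtime involved):

```python
# Accumulator layout mirrors WeightedAvg: [weighted sum, weight count].
def accumulate(acc, value, weight):
    acc[0] += value * weight
    acc[1] += weight

def retract(acc, value, weight):
    # Exactly reverses one accumulate(...) call for the same row.
    acc[0] -= value * weight
    acc[1] -= weight

acc = [0, 0]
accumulate(acc, 10, 2)   # an upstream operation emits a row
accumulate(acc, 4, 1)
retract(acc, 10, 2)      # the upstream operation retracts the first row
print(acc)               # [4, 1] -- as if only the second row was seen
```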
##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
result = [1, 2, 3]
return result
{% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only
supported in the group aggregate operation of the blink stream planner. For
batch execution or windowed aggregate, it is recommended to use the [Vectorized
Aggregate Functions]({% link
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md
%}#vectorized-aggregate-functions).
+
+The behavior of an aggregate function is centered around the concept of an
accumulator. The _accumulator_
+is an intermediate data structure that stores the aggregated values until a
final aggregation result
+is computed.
+
+For each set of rows that needs to be aggregated, the runtime will create an
empty accumulator by calling
+`create_accumulator()`. Subsequently, the `accumulate(...)` method of the
function is called for each input
+row to update the accumulator. Currently after each row has been processed,
the `get_value(...)` method of the
+function is called to compute and return the immediate result.
+
+The following example illustrates the aggregation process:
+
+<center>
+<img alt="UDAGG mechanism" src="{{ site.baseurl
}}/fig/udagg-mechanism-python.png" width="80%">
+</center>
+
+In the example, we assume a table that contains data about beverages. The
table consists of three columns (`id`, `name`,
+and `price`) and 5 rows. We would like to find the highest price of all
beverages in the table, i.e., perform
+a `max()` aggregation. We need to consider each of the 5 rows. The result is a
single numeric value.
+
+In order to define an aggregate function, one has to extend the base class
`AggregateFunction` in
+`pyflink.table` and implement the evaluation method named `accumulate(...)`.
+The result type and accumulator type of the aggregate function can be
specified by such 2 approach:
+
+- implement the method named `get_result_type()` and `get_accumulator_type()`.
+- wrap the function instance with the decorator `udaf` in `pyflink.table.udf`
and specify the parameter `result_type` and `accumulator_type`.
+
+The following example shows how to define your own aggregate function and call
it in a query.
+
+{% highlight python %}
+from pyflink.common import Row
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import AggregateFunction, DataTypes, StreamTableEnvironment
+from pyflink.table.expressions import call
+from pyflink.table.udf import udaf
+
+
+class WeightedAvg(AggregateFunction):
+
+ def create_accumulator(self):
+ # Row(sum, count)
+ return Row(0, 0)
+
+ def get_value(self, accumulator):
+ if accumulator[1] == 0:
+ return None
+ else:
+ return accumulator[0] / accumulator[1]
+
+ def accumulate(self, accumulator, value, weight):
+ accumulator[0] += value * weight
+ accumulator[1] += weight
+
+ def retract(self, accumulator, value, weight):
+ accumulator[0] -= value * weight
+ accumulator[1] -= weight
+
+ def get_result_type(self):
+ return DataTypes.BIGINT()
+
+ def get_accumulator_type(self):
+ return DataTypes.ROW([
+ DataTypes.FIELD("f0", DataTypes.BIGINT()),
+ DataTypes.FIELD("f1", DataTypes.BIGINT())])
+
+
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+# the result type and accumulator type can also be specified in the udaf
decorator:
+# weighted_avg = udaf(WeightedAvg(), result_type=DataTypes.BIGINT(),
accumulator_type=...)
+weighted_avg = udaf(WeightedAvg())
+t = table_env.from_elements([(1, 2, "Lee"),
+ (3, 4, "Jay"),
+ (5, 6, "Jay"),
+ (7, 8, "Lee")]).alias("value", "count", "name")
+
+# call function "inline" without registration in Table API
+result = t.group_by(t.name).select(weighted_avg(t.value,
t.count).alias("avg")).to_pandas()
+print(result)
+
+# register function
+table_env.create_temporary_function("weighted_avg", WeightedAvg())
+
+# call registered function in Table API
+result = t.group_by(t.name).select(call("weighted_avg", t.value,
t.count).alias("avg")).to_pandas()
+print(result)
+
+# register table
+table_env.create_temporary_view("source", t)
+
+# call registered function in SQL
+result = table_env.sql_query(
+ "SELECT weighted_avg(`value`, `count`) AS avg FROM source GROUP BY
name").to_pandas()
+print(result)
+{% endhighlight %}
+
+The `accumulate(...)` method of our `WeightedAvg` class takes three inputs.
The first one is the accumulator
+and the other two are user-defined inputs. In order to calculate a weighted
average value, the accumulator
+needs to store the weighted sum and count of all the data that has been
accumulated. In our example, we
+use a `Row` object to be the accumulator. Accumulators are automatically
managed
+by Flink's checkpointing mechanism and are restored in case of a failure to
ensure exactly-once semantics.
+
+### Mandatory and Optional Methods
+
+**The following methods are mandatory for each `AggregateFunction`:**
+
+- `create_accumulator()`
+- `accumulate(...)`
+- `get_value(...)`
+
+**The following methods of `AggregateFunction` are required depending on the
use case:**
+
+- `retract(...)` is required when there are other operations that generate
retract messages before current UDAF call, e.g. group aggregate , outer join. \
+This method is optional, but it is strongly recommended to be implemented to
ensure the UDAF can be used in any use case.
+- `get_result_type()` and `get_accumulator_type()` is required if the result
type and accumulator type would not be specified in the `udaf` decorator.
+
+### ListView and MapView
+
+If an accumulator needs to store large amounts of data,
`pyflink.table.ListView` and `pyflink.table.MapView`
+provide advanced features for leveraging Flink's state backends in unbounded
data scenarios.
+This feature can be enabled by declaring `DataTypes.LIST_VIEW(...)` and
`DataTypes.MAP_VIEW(...)` in the accumulator type, e.g.:
+
+{% highlight python %}
+from pyflink.table import ListView
+
+class ListViewConcatAggregateFunction(AggregateFunction):
+
+ def get_value(self, accumulator):
+ # the ListView is iterable
+ return accumulator[1].join(accumulator[0])
+
+ def create_accumulator(self):
+ return Row(ListView(), '')
+
+ def accumulate(self, accumulator, *args):
+ accumulator[1] = args[1]
+ # the ListView support add, clear and iterate operations.
+ accumulator[0].add(args[0])
+
+ def get_accumulator_type(self):
+ return DataTypes.ROW([
+ # declare the first column of the accumulator as a string ListView.
+ DataTypes.FIELD("f0", DataTypes.LIST_VIEW(DataTypes.STRING())),
+ DataTypes.FIELD("f1", DataTypes.BIGINT())])
+
+ def get_result_type(self):
+ return DataTypes.STRING()
+{% endhighlight %}
+
+Currently there are 2 limitations to use the:
+
+1. The accumulator must be a `Row`.
+2. The `ListView` and `MapView` must be the first level children of the `Row`
accumulator.
+
+Please see the docs of the corresponding classes for more information about
this advanced feature.
Review comment:
```suggestion
Please refer to the documentation of the corresponding classes for more information about this advanced feature.
```
What about adding a link to the class documentation?
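For readers of the quoted `ListViewConcatAggregateFunction`, the computation it performs can be traced in plain Python, with an ordinary list standing in for `ListView` (which supports add, clear, and iterate); this is an illustrative sketch, not PyFlink code:

```python
# Accumulator stand-in: [list of strings (ListView), delimiter string].
acc = [[], '']

def accumulate(acc, value, delimiter):
    # Mirrors the quoted accumulate(...): remember the delimiter and
    # append the value to the ListView.
    acc[1] = delimiter
    acc[0].append(value)   # ListView.add(...)

accumulate(acc, "a", "|")
accumulate(acc, "b", "|")
# Mirrors get_value(...): join the collected values with the delimiter.
print(acc[1].join(acc[0]))  # a|b
```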
##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
result = [1, 2, 3]
return result
{% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only
supported in the group aggregate operation of the blink stream planner. For
batch execution or windowed aggregate, it is recommended to use the [Vectorized
Aggregate Functions]({% link
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md
%}#vectorized-aggregate-functions).
+
+The behavior of an aggregate function is centered around the concept of an
accumulator. The _accumulator_
+is an intermediate data structure that stores the aggregated values until a
final aggregation result
+is computed.
+
+For each set of rows that needs to be aggregated, the runtime will create an
empty accumulator by calling
+`create_accumulator()`. Subsequently, the `accumulate(...)` method of the
function is called for each input
+row to update the accumulator. Currently after each row has been processed,
the `get_value(...)` method of the
+function is called to compute and return the immediate result.
+
+The following example illustrates the aggregation process:
+
+<center>
+<img alt="UDAGG mechanism" src="{{ site.baseurl
}}/fig/udagg-mechanism-python.png" width="80%">
+</center>
+
+In the example, we assume a table that contains data about beverages. The
table consists of three columns (`id`, `name`,
Review comment:
```suggestion
In the above example, we assume a table that contains data about beverages. The table consists of three columns (`id`, `name`,
```
##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
result = [1, 2, 3]
return result
{% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only
supported in the group aggregate operation of the blink stream planner. For
batch execution or windowed aggregate, it is recommended to use the [Vectorized
Aggregate Functions]({% link
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md
%}#vectorized-aggregate-functions).
+
+The behavior of an aggregate function is centered around the concept of an
accumulator. The _accumulator_
+is an intermediate data structure that stores the aggregated values until a
final aggregation result
+is computed.
+
+For each set of rows that needs to be aggregated, the runtime will create an
empty accumulator by calling
+`create_accumulator()`. Subsequently, the `accumulate(...)` method of the
function is called for each input
+row to update the accumulator. Currently after each row has been processed,
the `get_value(...)` method of the
+function is called to compute and return the immediate result.
+
+The following example illustrates the aggregation process:
+
+<center>
+<img alt="UDAGG mechanism" src="{{ site.baseurl
}}/fig/udagg-mechanism-python.png" width="80%">
+</center>
+
+In the example, we assume a table that contains data about beverages. The
table consists of three columns (`id`, `name`,
+and `price`) and 5 rows. We would like to find the highest price of all
beverages in the table, i.e., perform
+a `max()` aggregation. We need to consider each of the 5 rows. The result is a
single numeric value.
+
+In order to define an aggregate function, one has to extend the base class
`AggregateFunction` in
+`pyflink.table` and implement the evaluation method named `accumulate(...)`.
+The result type and accumulator type of the aggregate function can be
specified by such 2 approach:
+
+- implement the method named `get_result_type()` and `get_accumulator_type()`.
+- wrap the function instance with the decorator `udaf` in `pyflink.table.udf`
and specify the parameter `result_type` and `accumulator_type`.
Review comment:
```suggestion
- Wrap the function instance with the decorator `udaf` in `pyflink.table.udf` and specify the parameters `result_type` and `accumulator_type`.
```
##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
result = [1, 2, 3]
return result
{% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only
supported in the group aggregate operation of the blink stream planner. For
batch execution or windowed aggregate, it is recommended to use the [Vectorized
Aggregate Functions]({% link
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md
%}#vectorized-aggregate-functions).
+
+The behavior of an aggregate function is centered around the concept of an
accumulator. The _accumulator_
+is an intermediate data structure that stores the aggregated values until a
final aggregation result
+is computed.
+
+For each set of rows that needs to be aggregated, the runtime will create an
empty accumulator by calling
+`create_accumulator()`. Subsequently, the `accumulate(...)` method of the
function is called for each input
+row to update the accumulator. Currently after each row has been processed,
the `get_value(...)` method of the
+function is called to compute and return the immediate result.
Review comment:
```suggestion
aggregate function will be called to compute the aggregated result.
```
##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
result = [1, 2, 3]
return result
{% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only
supported in the group aggregate operation of the blink stream planner. For
batch execution or windowed aggregate, it is recommended to use the [Vectorized
Aggregate Functions]({% link
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md
%}#vectorized-aggregate-functions).
+
+The behavior of an aggregate function is centered around the concept of an
accumulator. The _accumulator_
+is an intermediate data structure that stores the aggregated values until a
final aggregation result
+is computed.
+
+For each set of rows that needs to be aggregated, the runtime will create an
empty accumulator by calling
+`create_accumulator()`. Subsequently, the `accumulate(...)` method of the
function is called for each input
+row to update the accumulator. Currently after each row has been processed,
the `get_value(...)` method of the
+function is called to compute and return the immediate result.
+
+The following example illustrates the aggregation process:
+
+<center>
+<img alt="UDAGG mechanism" src="{{ site.baseurl
}}/fig/udagg-mechanism-python.png" width="80%">
+</center>
+
+In the example, we assume a table that contains data about beverages. The
table consists of three columns (`id`, `name`,
+and `price`) and 5 rows. We would like to find the highest price of all
beverages in the table, i.e., perform
+a `max()` aggregation. We need to consider each of the 5 rows. The result is a
single numeric value.
+
+In order to define an aggregate function, one has to extend the base class
`AggregateFunction` in
+`pyflink.table` and implement the evaluation method named `accumulate(...)`.
+The result type and accumulator type of the aggregate function can be
specified by such 2 approach:
+
+- implement the method named `get_result_type()` and `get_accumulator_type()`.
+- wrap the function instance with the decorator `udaf` in `pyflink.table.udf`
and specify the parameter `result_type` and `accumulator_type`.
+
+The following example shows how to define your own aggregate function and call
it in a query.
+
+{% highlight python %}
+from pyflink.common import Row
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import AggregateFunction, DataTypes, StreamTableEnvironment
+from pyflink.table.expressions import call
+from pyflink.table.udf import udaf
+
+
+class WeightedAvg(AggregateFunction):
+
+ def create_accumulator(self):
+ # Row(sum, count)
+ return Row(0, 0)
+
+ def get_value(self, accumulator):
+ if accumulator[1] == 0:
+ return None
+ else:
+ return accumulator[0] / accumulator[1]
+
+ def accumulate(self, accumulator, value, weight):
+ accumulator[0] += value * weight
+ accumulator[1] += weight
+
+ def retract(self, accumulator, value, weight):
+ accumulator[0] -= value * weight
+ accumulator[1] -= weight
+
+ def get_result_type(self):
+ return DataTypes.BIGINT()
+
+ def get_accumulator_type(self):
+ return DataTypes.ROW([
+ DataTypes.FIELD("f0", DataTypes.BIGINT()),
+ DataTypes.FIELD("f1", DataTypes.BIGINT())])
+
+
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+# the result type and accumulator type can also be specified in the udaf
decorator:
+# weighted_avg = udaf(WeightedAvg(), result_type=DataTypes.BIGINT(),
accumulator_type=...)
+weighted_avg = udaf(WeightedAvg())
+t = table_env.from_elements([(1, 2, "Lee"),
+ (3, 4, "Jay"),
+ (5, 6, "Jay"),
+ (7, 8, "Lee")]).alias("value", "count", "name")
+
+# call function "inline" without registration in Table API
+result = t.group_by(t.name).select(weighted_avg(t.value,
t.count).alias("avg")).to_pandas()
+print(result)
+
+# register function
+table_env.create_temporary_function("weighted_avg", WeightedAvg())
+
+# call registered function in Table API
+result = t.group_by(t.name).select(call("weighted_avg", t.value,
t.count).alias("avg")).to_pandas()
+print(result)
+
+# register table
+table_env.create_temporary_view("source", t)
+
+# call registered function in SQL
+result = table_env.sql_query(
+ "SELECT weighted_avg(`value`, `count`) AS avg FROM source GROUP BY
name").to_pandas()
+print(result)
+{% endhighlight %}
+
+The `accumulate(...)` method of our `WeightedAvg` class takes three inputs.
The first one is the accumulator
+and the other two are user-defined inputs. In order to calculate a weighted
average value, the accumulator
+needs to store the weighted sum and count of all the data that has been
accumulated. In our example, we
+use a `Row` object to be the accumulator. Accumulators are automatically
managed
+by Flink's checkpointing mechanism and are restored in case of a failure to
ensure exactly-once semantics.
Review comment:
```suggestion
by Flink's checkpointing mechanism and are restored in case of failover to ensure exactly-once semantics.
```
##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
result = [1, 2, 3]
return result
{% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only
supported in the group aggregate operation of the blink stream planner. For
batch execution or windowed aggregate, it is recommended to use the [Vectorized
Aggregate Functions]({% link
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md
%}#vectorized-aggregate-functions).
+
+The behavior of an aggregate function is centered around the concept of an
accumulator. The _accumulator_
+is an intermediate data structure that stores the aggregated values until a
final aggregation result
+is computed.
+
+For each set of rows that needs to be aggregated, the runtime will create an
empty accumulator by calling
+`create_accumulator()`. Subsequently, the `accumulate(...)` method of the
function is called for each input
+row to update the accumulator. Currently after each row has been processed,
the `get_value(...)` method of the
+function is called to compute and return the immediate result.
+
+The following example illustrates the aggregation process:
+
+<center>
+<img alt="UDAGG mechanism" src="{{ site.baseurl
}}/fig/udagg-mechanism-python.png" width="80%">
+</center>
+
+In the example, we assume a table that contains data about beverages. The
table consists of three columns (`id`, `name`,
+and `price`) and 5 rows. We would like to find the highest price of all
beverages in the table, i.e., perform
+a `max()` aggregation. We need to consider each of the 5 rows. The result is a
single numeric value.
+
+In order to define an aggregate function, one has to extend the base class
`AggregateFunction` in
+`pyflink.table` and implement the evaluation method named `accumulate(...)`.
+The result type and accumulator type of the aggregate function can be
specified by such 2 approach:
+
+- implement the method named `get_result_type()` and `get_accumulator_type()`.
+- wrap the function instance with the decorator `udaf` in `pyflink.table.udf`
and specify the parameter `result_type` and `accumulator_type`.
+
+The following example shows how to define your own aggregate function and call
it in a query.
+
+{% highlight python %}
+from pyflink.common import Row
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import AggregateFunction, DataTypes, StreamTableEnvironment
+from pyflink.table.expressions import call
+from pyflink.table.udf import udaf
+
+
+class WeightedAvg(AggregateFunction):
+
+ def create_accumulator(self):
+ # Row(sum, count)
+ return Row(0, 0)
+
+ def get_value(self, accumulator):
+ if accumulator[1] == 0:
+ return None
+ else:
+ return accumulator[0] / accumulator[1]
+
+ def accumulate(self, accumulator, value, weight):
+ accumulator[0] += value * weight
+ accumulator[1] += weight
+
+ def retract(self, accumulator, value, weight):
+ accumulator[0] -= value * weight
+ accumulator[1] -= weight
+
+ def get_result_type(self):
+ return DataTypes.BIGINT()
+
+ def get_accumulator_type(self):
+ return DataTypes.ROW([
+ DataTypes.FIELD("f0", DataTypes.BIGINT()),
+ DataTypes.FIELD("f1", DataTypes.BIGINT())])
+
+
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+# the result type and accumulator type can also be specified in the udaf decorator:
+# weighted_avg = udaf(WeightedAvg(), result_type=DataTypes.BIGINT(), accumulator_type=...)
+weighted_avg = udaf(WeightedAvg())
+t = table_env.from_elements([(1, 2, "Lee"),
+ (3, 4, "Jay"),
+ (5, 6, "Jay"),
+ (7, 8, "Lee")]).alias("value", "count", "name")
+
+# call function "inline" without registration in Table API
+result = t.group_by(t.name).select(weighted_avg(t.value, t.count).alias("avg")).to_pandas()
+print(result)
+
+# register function
+table_env.create_temporary_function("weighted_avg", WeightedAvg())
+
+# call registered function in Table API
+result = t.group_by(t.name).select(call("weighted_avg", t.value, t.count).alias("avg")).to_pandas()
+print(result)
+
+# register table
+table_env.create_temporary_view("source", t)
+
+# call registered function in SQL
+result = table_env.sql_query(
+    "SELECT weighted_avg(`value`, `count`) AS avg FROM source GROUP BY name").to_pandas()
+print(result)
+{% endhighlight %}
+
+The `accumulate(...)` method of our `WeightedAvg` class takes three inputs. The first one is the accumulator
Review comment:
```suggestion
The `accumulate(...)` method of our `WeightedAvg` class takes three input
arguments. The first one is the accumulator
```
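As a side note for readers of this thread, the accumulate/get_value contract discussed above can be exercised without a Flink cluster. A minimal plain-Python sketch of the lifecycle (illustrative only; a list stands in for `pyflink.common.Row` and no pyflink types are used):

```python
# Plain-Python sketch of the accumulator lifecycle from the docs above:
# create an empty accumulator, feed it one row at a time, then read the result.
# A list stands in for pyflink.common.Row (illustrative only).

class WeightedAvg:
    def create_accumulator(self):
        return [0, 0]  # [weighted sum, sum of weights]

    def accumulate(self, accumulator, value, weight):
        accumulator[0] += value * weight
        accumulator[1] += weight

    def get_value(self, accumulator):
        if accumulator[1] == 0:
            return None
        return accumulator[0] / accumulator[1]

func = WeightedAvg()
acc = func.create_accumulator()
for value, weight in [(1, 2), (7, 8)]:  # the two "Lee" rows from the example
    func.accumulate(acc, value, weight)
print(func.get_value(acc))  # (1*2 + 7*8) / (2 + 8) = 5.8
```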
##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
result = [1, 2, 3]
return result
{% endhighlight %}
+
+The `accumulate(...)` method of our `WeightedAvg` class takes three inputs. The first one is the accumulator
+and the other two are user-defined inputs. In order to calculate a weighted average value, the accumulator
+needs to store the weighted sum and count of all the data that has been accumulated. In our example, we
Review comment:
```suggestion
needs to store the weighted sum and count of all the data that have already
been accumulated. In our example, we
```
##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
result = [1, 2, 3]
return result
{% endhighlight %}
+
+The `accumulate(...)` method of our `WeightedAvg` class takes three inputs. The first one is the accumulator
+and the other two are user-defined inputs. In order to calculate a weighted average value, the accumulator
+needs to store the weighted sum and count of all the data that has been accumulated. In our example, we
+use a `Row` object to be the accumulator. Accumulators are automatically managed
Review comment:
```suggestion
use a `Row` object as the accumulator. Accumulators will be managed
```
##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
result = [1, 2, 3]
return result
{% endhighlight %}
+
+For each set of rows that needs to be aggregated, the runtime will create an empty accumulator by calling
+`create_accumulator()`. Subsequently, the `accumulate(...)` method of the function is called for each input
Review comment:
```suggestion
`create_accumulator()`. Subsequently, the `accumulate(...)` method of the
aggregate function will be called for each input
```
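The per-row driver loop this suggestion describes can be sketched in plain Python (no Flink runtime; `Max` and `run_group_aggregate` are illustrative names, not pyflink API):

```python
# Sketch of the runtime behavior described in the hunk: an empty accumulator is
# created once per group, accumulate() runs for each input row, and get_value()
# yields the updated (immediate) result after every row.

class Max:
    def create_accumulator(self):
        return [None]  # current maximum, None until the first row

    def accumulate(self, accumulator, value):
        if accumulator[0] is None or value > accumulator[0]:
            accumulator[0] = value

    def get_value(self, accumulator):
        return accumulator[0]

def run_group_aggregate(func, rows):
    acc = func.create_accumulator()
    emitted = []
    for row in rows:
        func.accumulate(acc, *row)
        emitted.append(func.get_value(acc))  # updated result per row
    return emitted

print(run_group_aggregate(Max(), [(3,), (4,), (2,), (5,), (1,)]))  # [3, 4, 4, 5, 5]
```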
##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
result = [1, 2, 3]
return result
{% endhighlight %}
+
+In order to define an aggregate function, one has to extend the base class `AggregateFunction` in
+`pyflink.table` and implement the evaluation method named `accumulate(...)`.
+The result type and accumulator type of the aggregate function can be specified by such 2 approach:
+
+- implement the method named `get_result_type()` and `get_accumulator_type()`.
Review comment:
```suggestion
- Implement the method named `get_result_type()` and
`get_accumulator_type()`.
```
##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
result = [1, 2, 3]
return result
{% endhighlight %}
+
+In the example, we assume a table that contains data about beverages. The table consists of three columns (`id`, `name`,
+and `price`) and 5 rows. We would like to find the highest price of all beverages in the table, i.e., perform
+a `max()` aggregation. We need to consider each of the 5 rows. The result is a single numeric value.
Review comment:
```suggestion
a `max()` aggregation.
```
##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
result = [1, 2, 3]
return result
{% endhighlight %}
+
+The `accumulate(...)` method of our `WeightedAvg` class takes three inputs. The first one is the accumulator
+and the other two are user-defined inputs. In order to calculate a weighted average value, the accumulator
+needs to store the weighted sum and count of all the data that has been accumulated. In our example, we
+use a `Row` object to be the accumulator. Accumulators are automatically managed
+by Flink's checkpointing mechanism and are restored in case of a failure to ensure exactly-once semantics.
+
+### Mandatory and Optional Methods
+
+**The following methods are mandatory for each `AggregateFunction`:**
+
+- `create_accumulator()`
+- `accumulate(...)`
+- `get_value(...)`
+
+**The following methods of `AggregateFunction` are required depending on the use case:**
+
+- `retract(...)` is required when there are other operations that generate retract messages before current UDAF call, e.g. group aggregate , outer join. \
+This method is optional, but it is strongly recommended to be implemented to ensure the UDAF can be used in any use case.
+- `get_result_type()` and `get_accumulator_type()` is required if the result type and accumulator type would not be specified in the `udaf` decorator.
+
+### ListView and MapView
+
+If an accumulator needs to store large amounts of data, `pyflink.table.ListView` and `pyflink.table.MapView`
+provide advanced features for leveraging Flink's state backends in unbounded data scenarios.
Review comment:
```suggestion
could be used instead of list and map. These two data structures provide the
similar functionalities as list and map, however usually having better
performance by leveraging Flink's state backend to eliminate unnecessary state
access.
```
##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
result = [1, 2, 3]
return result
{% endhighlight %}
+
+### ListView and MapView
+
+If an accumulator needs to store large amounts of data, `pyflink.table.ListView` and `pyflink.table.MapView`
+provide advanced features for leveraging Flink's state backends in unbounded data scenarios.
+This feature can be enabled by declaring `DataTypes.LIST_VIEW(...)` and `DataTypes.MAP_VIEW(...)` in the accumulator type, e.g.:
+
+{% highlight python %}
+from pyflink.table import ListView
+
+class ListViewConcatAggregateFunction(AggregateFunction):
+
+ def get_value(self, accumulator):
+ # the ListView is iterable
+ return accumulator[1].join(accumulator[0])
+
+ def create_accumulator(self):
+ return Row(ListView(), '')
+
+ def accumulate(self, accumulator, *args):
+ accumulator[1] = args[1]
+ # the ListView support add, clear and iterate operations.
+ accumulator[0].add(args[0])
+
+ def get_accumulator_type(self):
+ return DataTypes.ROW([
+ # declare the first column of the accumulator as a string ListView.
+ DataTypes.FIELD("f0", DataTypes.LIST_VIEW(DataTypes.STRING())),
+ DataTypes.FIELD("f1", DataTypes.BIGINT())])
+
+ def get_result_type(self):
+ return DataTypes.STRING()
+{% endhighlight %}
+
+Currently there are 2 limitations to use the:
Review comment:
```suggestion
Currently there are 2 limitations to use the ListView and MapView:
```
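For reference, the concat logic of the quoted `ListViewConcatAggregateFunction` reduces to the following plain-Python sketch (an ordinary list stands in for `ListView`, so the Flink state backend is not involved; `ListConcat` is an illustrative name):

```python
# The quoted ListViewConcatAggregateFunction, modeled with a plain list in
# place of pyflink.table.ListView (no Flink state backend involved).

class ListConcat:
    def create_accumulator(self):
        return [[], '']  # [collected items, separator]

    def accumulate(self, accumulator, item, sep):
        accumulator[1] = sep
        accumulator[0].append(item)  # ListView.add(...) in the real UDAF

    def get_value(self, accumulator):
        # the real ListView is iterable, so join works the same way
        return accumulator[1].join(accumulator[0])

func = ListConcat()
acc = func.create_accumulator()
func.accumulate(acc, 'flink', ',')
func.accumulate(acc, 'pyflink', ',')
print(func.get_value(acc))  # flink,pyflink
```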
##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
result = [1, 2, 3]
return result
{% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only
supported in the group aggregate operation of the blink stream planner. For
batch execution or windowed aggregate, it is recommended to use the [Vectorized
Aggregate Functions]({% link
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md
%}#vectorized-aggregate-functions).
+
+The behavior of an aggregate function is centered around the concept of an
accumulator. The _accumulator_
+is an intermediate data structure that stores the aggregated values until a
final aggregation result
+is computed.
+
+For each set of rows that needs to be aggregated, the runtime will create an
empty accumulator by calling
+`create_accumulator()`. Subsequently, the `accumulate(...)` method of the
function is called for each input
+row to update the accumulator. Currently after each row has been processed,
the `get_value(...)` method of the
+function is called to compute and return the immediate result.
+
+The following example illustrates the aggregation process:
+
+<center>
+<img alt="UDAGG mechanism" src="{{ site.baseurl
}}/fig/udagg-mechanism-python.png" width="80%">
+</center>
+
+In the example, we assume a table that contains data about beverages. The
table consists of three columns (`id`, `name`,
+and `price`) and 5 rows. We would like to find the highest price of all
beverages in the table, i.e., perform
+a `max()` aggregation. We need to consider each of the 5 rows. The result is a
single numeric value.
+
+In order to define an aggregate function, one has to extend the base class `AggregateFunction` in
+`pyflink.table` and implement the evaluation method named `accumulate(...)`.
+The result type and accumulator type of the aggregate function can be specified in one of the following two ways:
+
+- implement the methods `get_result_type()` and `get_accumulator_type()`.
+- wrap the function instance with the `udaf` decorator in `pyflink.table.udf` and specify the parameters `result_type` and `accumulator_type`.
+
+The following example shows how to define your own aggregate function and call it in a query.
+
+{% highlight python %}
+from pyflink.common import Row
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import AggregateFunction, DataTypes, StreamTableEnvironment
+from pyflink.table.expressions import call
+from pyflink.table.udf import udaf
+
+
+class WeightedAvg(AggregateFunction):
+
+ def create_accumulator(self):
+ # Row(sum, count)
+ return Row(0, 0)
+
+ def get_value(self, accumulator):
+ if accumulator[1] == 0:
+ return None
+ else:
+ return accumulator[0] / accumulator[1]
+
+ def accumulate(self, accumulator, value, weight):
+ accumulator[0] += value * weight
+ accumulator[1] += weight
+
+ def retract(self, accumulator, value, weight):
+ accumulator[0] -= value * weight
+ accumulator[1] -= weight
+
+ def get_result_type(self):
+ return DataTypes.BIGINT()
+
+ def get_accumulator_type(self):
+ return DataTypes.ROW([
+ DataTypes.FIELD("f0", DataTypes.BIGINT()),
+ DataTypes.FIELD("f1", DataTypes.BIGINT())])
+
+
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+# the result type and accumulator type can also be specified in the udaf decorator:
+# weighted_avg = udaf(WeightedAvg(), result_type=DataTypes.BIGINT(), accumulator_type=...)
+weighted_avg = udaf(WeightedAvg())
+t = table_env.from_elements([(1, 2, "Lee"),
+ (3, 4, "Jay"),
+ (5, 6, "Jay"),
+ (7, 8, "Lee")]).alias("value", "count", "name")
+
+# call function "inline" without registration in Table API
+result = t.group_by(t.name).select(weighted_avg(t.value, t.count).alias("avg")).to_pandas()
+print(result)
+
+# register function
+table_env.create_temporary_function("weighted_avg", WeightedAvg())
+
+# call registered function in Table API
+result = t.group_by(t.name).select(call("weighted_avg", t.value, t.count).alias("avg")).to_pandas()
+print(result)
+
+# register table
+table_env.create_temporary_view("source", t)
+
+# call registered function in SQL
+result = table_env.sql_query(
+    "SELECT weighted_avg(`value`, `count`) AS avg FROM source GROUP BY name").to_pandas()
+print(result)
+{% endhighlight %}
+
+The `accumulate(...)` method of our `WeightedAvg` class takes three inputs. The first one is the accumulator
+and the other two are user-defined inputs. In order to calculate a weighted average value, the accumulator
+needs to store the weighted sum and count of all the data that has been accumulated. In our example, we
+use a `Row` object as the accumulator. Accumulators are automatically managed
+by Flink's checkpointing mechanism and are restored in case of a failure to ensure exactly-once semantics.
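To see the accumulator arithmetic in isolation, the same logic can be exercised as plain Python outside any Flink job. This is a standalone sketch: a plain list stands in for the `Row` accumulator, and the function calls below are made by hand rather than by the Flink runtime:

```python
# Exercising the WeightedAvg accumulator logic as plain Python,
# outside any Flink job; a list replaces the Row accumulator here.

def accumulate(acc, value, weight):
    acc[0] += value * weight   # weighted sum
    acc[1] += weight           # total weight

def retract(acc, value, weight):
    acc[0] -= value * weight
    acc[1] -= weight

def get_value(acc):
    return None if acc[1] == 0 else acc[0] / acc[1]


acc = [0, 0]              # [weighted sum, total weight]
accumulate(acc, 1, 2)     # value=1, weight=2
accumulate(acc, 3, 4)     # value=3, weight=4
print(get_value(acc))     # (1*2 + 3*4) / (2 + 4) = 14 / 6
retract(acc, 3, 4)        # an upstream retraction removes the second row
print(get_value(acc))     # back to 2 / 2 = 1.0
```

Note how `retract(...)` exactly undoes a previous `accumulate(...)` call, which is what allows the function to handle retract messages from upstream operators.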
+
+### Mandatory and Optional Methods
+
+**The following methods are mandatory for each `AggregateFunction`:**
+
+- `create_accumulator()`
+- `accumulate(...)`
+- `get_value(...)`
+
+**The following methods of `AggregateFunction` are required depending on the use case:**
+
+- `retract(...)` is required when there are operations that generate retract messages before the current UDAF call, e.g. group aggregate, outer join. \
+This method is optional, but it is strongly recommended to implement it so that the UDAF can be used in any use case.
+- `get_result_type()` and `get_accumulator_type()` are required if the result type and accumulator type are not specified in the `udaf` decorator.
+
+### ListView and MapView
+
+If an accumulator needs to store large amounts of data, `pyflink.table.ListView` and `pyflink.table.MapView`
+provide advanced features for leveraging Flink's state backends in unbounded data scenarios.
+This feature can be enabled by declaring `DataTypes.LIST_VIEW(...)` and `DataTypes.MAP_VIEW(...)` in the accumulator type, e.g.:
Review comment:
```suggestion
You can use them by declaring `DataTypes.LIST_VIEW(...)` and `DataTypes.MAP_VIEW(...)` in the accumulator type, e.g.:
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]