dianfu commented on a change in pull request #13905:
URL: https://github.com/apache/flink/pull/13905#discussion_r516615229



##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
       result = [1, 2, 3]
       return result
 {% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple 
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only 
supported in the group aggregate operation of the blink stream planner. For 
batch execution or windowed aggregate, it is recommended to use the [Vectorized 
Aggregate Functions]({% link 
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md 
%}#vectorized-aggregate-functions).
+
+The behavior of an aggregate function is centered around the concept of an 
accumulator. The _accumulator_
+is an intermediate data structure that stores the aggregated values until a 
final aggregation result
+is computed.
+
+For each set of rows that needs to be aggregated, the runtime will create an 
empty accumulator by calling

Review comment:
       ```suggestion
   For each set of rows that need to be aggregated, the runtime will create an 
empty accumulator by calling
   ```

##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
       result = [1, 2, 3]
       return result
 {% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple 
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only 
supported in the group aggregate operation of the blink stream planner. For 
batch execution or windowed aggregate, it is recommended to use the [Vectorized 
Aggregate Functions]({% link 
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md 
%}#vectorized-aggregate-functions).

Review comment:
       ```suggestion
   **NOTE:** Currently the general user-defined aggregate function is only 
supported in the GroupBy aggregation of the blink planner in streaming mode. 
Batch mode and windowed aggregation are not supported yet; for those cases, it 
is recommended to use the [Vectorized Aggregate Functions]({% link 
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md 
%}#vectorized-aggregate-functions).
   ```

##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
       result = [1, 2, 3]
       return result
 {% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple 
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only 
supported in the group aggregate operation of the blink stream planner. For 
batch execution or windowed aggregate, it is recommended to use the [Vectorized 
Aggregate Functions]({% link 
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md 
%}#vectorized-aggregate-functions).
+
+The behavior of an aggregate function is centered around the concept of an 
accumulator. The _accumulator_
+is an intermediate data structure that stores the aggregated values until a 
final aggregation result
+is computed.
+
+For each set of rows that needs to be aggregated, the runtime will create an 
empty accumulator by calling
+`create_accumulator()`. Subsequently, the `accumulate(...)` method of the 
function is called for each input
+row to update the accumulator. Currently after each row has been processed, 
the `get_value(...)` method of the
+function is called to compute and return the immediate result. 
+
+The following example illustrates the aggregation process:
+
+<center>
+<img alt="UDAGG mechanism" src="{{ site.baseurl 
}}/fig/udagg-mechanism-python.png" width="80%">
+</center>
+
+In the example, we assume a table that contains data about beverages. The 
table consists of three columns (`id`, `name`,
+and `price`) and 5 rows. We would like to find the highest price of all 
beverages in the table, i.e., perform
+a `max()` aggregation. We need to consider each of the 5 rows. The result is a 
single numeric value.
+
+In order to define an aggregate function, one has to extend the base class 
`AggregateFunction` in
+`pyflink.table` and implement the evaluation method named `accumulate(...)`. 
+The result type and accumulator type of the aggregate function can be 
specified by such 2 approach:

Review comment:
       ```suggestion
   The result type and accumulator type of the aggregate function can be 
specified by one of the following two approaches:
   ```

##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
       result = [1, 2, 3]
       return result
 {% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple 
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only 
supported in the group aggregate operation of the blink stream planner. For 
batch execution or windowed aggregate, it is recommended to use the [Vectorized 
Aggregate Functions]({% link 
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md 
%}#vectorized-aggregate-functions).
+
+The behavior of an aggregate function is centered around the concept of an 
accumulator. The _accumulator_
+is an intermediate data structure that stores the aggregated values until a 
final aggregation result
+is computed.
+
+For each set of rows that needs to be aggregated, the runtime will create an 
empty accumulator by calling
+`create_accumulator()`. Subsequently, the `accumulate(...)` method of the 
function is called for each input
+row to update the accumulator. Currently after each row has been processed, 
the `get_value(...)` method of the
+function is called to compute and return the immediate result. 
+
+The following example illustrates the aggregation process:
+
+<center>
+<img alt="UDAGG mechanism" src="{{ site.baseurl 
}}/fig/udagg-mechanism-python.png" width="80%">
+</center>
+
+In the example, we assume a table that contains data about beverages. The 
table consists of three columns (`id`, `name`,
+and `price`) and 5 rows. We would like to find the highest price of all 
beverages in the table, i.e., perform
+a `max()` aggregation. We need to consider each of the 5 rows. The result is a 
single numeric value.
+
+In order to define an aggregate function, one has to extend the base class 
`AggregateFunction` in
+`pyflink.table` and implement the evaluation method named `accumulate(...)`. 
+The result type and accumulator type of the aggregate function can be 
specified by such 2 approach:
+
+- implement the method named `get_result_type()` and `get_accumulator_type()`.
+- wrap the function instance with the decorator `udaf` in `pyflink.table.udf` 
and specify the parameter `result_type` and `accumulator_type`.
+
+The following example shows how to define your own aggregate function and call 
it in a query.
+
+{% highlight python %}
+from pyflink.common import Row
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import AggregateFunction, DataTypes, StreamTableEnvironment
+from pyflink.table.expressions import call
+from pyflink.table.udf import udaf
+
+
+class WeightedAvg(AggregateFunction):
+
+    def create_accumulator(self):
+        # Row(sum, count)
+        return Row(0, 0)
+
+    def get_value(self, accumulator):
+        if accumulator[1] == 0:
+            return None
+        else:
+            return accumulator[0] / accumulator[1]
+
+    def accumulate(self, accumulator, value, weight):
+        accumulator[0] += value * weight
+        accumulator[1] += weight
+    
+    def retract(self, accumulator, value, weight):
+        accumulator[0] -= value * weight
+        accumulator[1] -= weight
+        
+    def get_result_type(self):
+        return DataTypes.BIGINT()
+        
+    def get_accumulator_type(self):
+        return DataTypes.ROW([
+            DataTypes.FIELD("f0", DataTypes.BIGINT()), 
+            DataTypes.FIELD("f1", DataTypes.BIGINT())])
+
+
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+# the result type and accumulator type can also be specified in the udaf 
decorator:
+# weighted_avg = udaf(WeightedAvg(), result_type=DataTypes.BIGINT(), 
accumulator_type=...)
+weighted_avg = udaf(WeightedAvg())
+t = table_env.from_elements([(1, 2, "Lee"),
+                             (3, 4, "Jay"),
+                             (5, 6, "Jay"),
+                             (7, 8, "Lee")]).alias("value", "count", "name")
+
+# call function "inline" without registration in Table API
+result = t.group_by(t.name).select(weighted_avg(t.value, 
t.count).alias("avg")).to_pandas()
+print(result)
+
+# register function
+table_env.create_temporary_function("weighted_avg", WeightedAvg())
+
+# call registered function in Table API
+result = t.group_by(t.name).select(call("weighted_avg", t.value, 
t.count).alias("avg")).to_pandas()
+print(result)
+
+# register table
+table_env.create_temporary_view("source", t)
+
+# call registered function in SQL
+result = table_env.sql_query(
+    "SELECT weighted_avg(`value`, `count`) AS avg FROM source GROUP BY 
name").to_pandas()
+print(result)
+{% endhighlight %}
+
+The `accumulate(...)` method of our `WeightedAvg` class takes three inputs. 
The first one is the accumulator
+and the other two are user-defined inputs. In order to calculate a weighted 
average value, the accumulator
+needs to store the weighted sum and count of all the data that has been 
accumulated. In our example, we
+use a `Row` object to be the accumulator. Accumulators are automatically 
managed
+by Flink's checkpointing mechanism and are restored in case of a failure to 
ensure exactly-once semantics.
+
+### Mandatory and Optional Methods
+
+**The following methods are mandatory for each `AggregateFunction`:**
+
+- `create_accumulator()`
+- `accumulate(...)` 
+- `get_value(...)`
+
+**The following methods of `AggregateFunction` are required depending on the 
use case:**
+
+- `retract(...)` is required when there are other operations that generate 
retract messages before current UDAF call, e.g. group aggregate , outer join. \

Review comment:
       ```suggestion
   - `retract(...)` is required when there are operations that could generate 
retraction messages before the current aggregation operation, e.g. group 
aggregate, outer join. \
   ```
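
To illustrate why `retract(...)` matters here, a minimal plain-Python sketch follows. It does not use PyFlink: `WeightedAvgSketch` and its list-based accumulator are hypothetical stand-ins for the class in the diff, and it assumes a retraction simply undoes an earlier `accumulate(...)` call made with the same arguments.

```python
class WeightedAvgSketch:
    """Hypothetical plain-Python stand-in; not the PyFlink AggregateFunction."""

    def create_accumulator(self):
        # [weighted sum, total weight]
        return [0, 0]

    def accumulate(self, accumulator, value, weight):
        accumulator[0] += value * weight
        accumulator[1] += weight

    def retract(self, accumulator, value, weight):
        # Undo a previous accumulate() call with the same arguments.
        accumulator[0] -= value * weight
        accumulator[1] -= weight

    def get_value(self, accumulator):
        if accumulator[1] == 0:
            return None
        return accumulator[0] / accumulator[1]


func = WeightedAvgSketch()
acc = func.create_accumulator()
func.accumulate(acc, 1, 2)
func.accumulate(acc, 7, 8)
before = func.get_value(acc)   # (1*2 + 7*8) / (2 + 8)

# An upstream operation retracts the row (7, 8).
func.retract(acc, 7, 8)
after = func.get_value(acc)    # (1*2) / 2

# Retracting the remaining row leaves an empty accumulator.
func.retract(acc, 1, 2)
empty = func.get_value(acc)
print(before, after, empty)
```

Without `retract(...)`, the accumulator would have no way to forget a row that an upstream operation has withdrawn.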

##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
       result = [1, 2, 3]
       return result
 {% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple 
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only 
supported in the group aggregate operation of the blink stream planner. For 
batch execution or windowed aggregate, it is recommended to use the [Vectorized 
Aggregate Functions]({% link 
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md 
%}#vectorized-aggregate-functions).
+
+The behavior of an aggregate function is centered around the concept of an 
accumulator. The _accumulator_
+is an intermediate data structure that stores the aggregated values until a 
final aggregation result
+is computed.
+
+For each set of rows that needs to be aggregated, the runtime will create an 
empty accumulator by calling
+`create_accumulator()`. Subsequently, the `accumulate(...)` method of the 
function is called for each input
+row to update the accumulator. Currently after each row has been processed, 
the `get_value(...)` method of the
+function is called to compute and return the immediate result. 
+
+The following example illustrates the aggregation process:
+
+<center>
+<img alt="UDAGG mechanism" src="{{ site.baseurl 
}}/fig/udagg-mechanism-python.png" width="80%">
+</center>
+
+In the example, we assume a table that contains data about beverages. The 
table consists of three columns (`id`, `name`,
+and `price`) and 5 rows. We would like to find the highest price of all 
beverages in the table, i.e., perform
+a `max()` aggregation. We need to consider each of the 5 rows. The result is a 
single numeric value.
+
+In order to define an aggregate function, one has to extend the base class 
`AggregateFunction` in
+`pyflink.table` and implement the evaluation method named `accumulate(...)`. 
+The result type and accumulator type of the aggregate function can be 
specified by such 2 approach:
+
+- implement the method named `get_result_type()` and `get_accumulator_type()`.
+- wrap the function instance with the decorator `udaf` in `pyflink.table.udf` 
and specify the parameter `result_type` and `accumulator_type`.
+
+The following example shows how to define your own aggregate function and call 
it in a query.
+
+{% highlight python %}
+from pyflink.common import Row
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import AggregateFunction, DataTypes, StreamTableEnvironment
+from pyflink.table.expressions import call
+from pyflink.table.udf import udaf
+
+
+class WeightedAvg(AggregateFunction):
+
+    def create_accumulator(self):
+        # Row(sum, count)
+        return Row(0, 0)
+
+    def get_value(self, accumulator):
+        if accumulator[1] == 0:
+            return None
+        else:
+            return accumulator[0] / accumulator[1]
+
+    def accumulate(self, accumulator, value, weight):
+        accumulator[0] += value * weight
+        accumulator[1] += weight
+    
+    def retract(self, accumulator, value, weight):
+        accumulator[0] -= value * weight
+        accumulator[1] -= weight
+        
+    def get_result_type(self):
+        return DataTypes.BIGINT()
+        
+    def get_accumulator_type(self):
+        return DataTypes.ROW([
+            DataTypes.FIELD("f0", DataTypes.BIGINT()), 
+            DataTypes.FIELD("f1", DataTypes.BIGINT())])
+
+
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+# the result type and accumulator type can also be specified in the udaf 
decorator:
+# weighted_avg = udaf(WeightedAvg(), result_type=DataTypes.BIGINT(), 
accumulator_type=...)
+weighted_avg = udaf(WeightedAvg())
+t = table_env.from_elements([(1, 2, "Lee"),
+                             (3, 4, "Jay"),
+                             (5, 6, "Jay"),
+                             (7, 8, "Lee")]).alias("value", "count", "name")
+
+# call function "inline" without registration in Table API
+result = t.group_by(t.name).select(weighted_avg(t.value, 
t.count).alias("avg")).to_pandas()
+print(result)
+
+# register function
+table_env.create_temporary_function("weighted_avg", WeightedAvg())
+
+# call registered function in Table API
+result = t.group_by(t.name).select(call("weighted_avg", t.value, 
t.count).alias("avg")).to_pandas()
+print(result)
+
+# register table
+table_env.create_temporary_view("source", t)
+
+# call registered function in SQL
+result = table_env.sql_query(
+    "SELECT weighted_avg(`value`, `count`) AS avg FROM source GROUP BY 
name").to_pandas()
+print(result)
+{% endhighlight %}
+
+The `accumulate(...)` method of our `WeightedAvg` class takes three inputs. 
The first one is the accumulator
+and the other two are user-defined inputs. In order to calculate a weighted 
average value, the accumulator
+needs to store the weighted sum and count of all the data that has been 
accumulated. In our example, we
+use a `Row` object to be the accumulator. Accumulators are automatically 
managed
+by Flink's checkpointing mechanism and are restored in case of a failure to 
ensure exactly-once semantics.
+
+### Mandatory and Optional Methods
+
+**The following methods are mandatory for each `AggregateFunction`:**
+
+- `create_accumulator()`
+- `accumulate(...)` 
+- `get_value(...)`
+
+**The following methods of `AggregateFunction` are required depending on the 
use case:**
+
+- `retract(...)` is required when there are other operations that generate 
retract messages before current UDAF call, e.g. group aggregate , outer join. \
+This method is optional, but it is strongly recommended to be implemented to 
ensure the UDAF can be used in any use case.
+- `get_result_type()` and `get_accumulator_type()` is required if the result 
type and accumulator type would not be specified in the `udaf` decorator.
+
+### ListView and MapView
+
+If an accumulator needs to store large amounts of data, 
`pyflink.table.ListView` and `pyflink.table.MapView` 
+provide advanced features for leveraging Flink's state backends in unbounded 
data scenarios. 
+This feature can be enabled by declaring `DataTypes.LIST_VIEW(...)` and 
`DataTypes.MAP_VIEW(...)` in the accumulator type, e.g.:
+
+{% highlight python %}
+from pyflink.table import ListView
+
+class ListViewConcatAggregateFunction(AggregateFunction):
+
+    def get_value(self, accumulator):
+        # the ListView is iterable
+        return accumulator[1].join(accumulator[0])
+
+    def create_accumulator(self):
+        return Row(ListView(), '')
+
+    def accumulate(self, accumulator, *args):
+        accumulator[1] = args[1]
+        # the ListView support add, clear and iterate operations.
+        accumulator[0].add(args[0])
+
+    def get_accumulator_type(self):
+        return DataTypes.ROW([
+            # declare the first column of the accumulator as a string ListView.
+            DataTypes.FIELD("f0", DataTypes.LIST_VIEW(DataTypes.STRING())),
+            DataTypes.FIELD("f1", DataTypes.BIGINT())])
+
+    def get_result_type(self):
+        return DataTypes.STRING()
+{% endhighlight %}
+
+Currently there are 2 limitations to use the:
+
+1. The accumulator must be a `Row`.
+2. The `ListView` and `MapView` must be the first level children of the `Row` 
accumulator.
+
+Please see the docs of the corresponding classes for more information about 
this advanced feature.

Review comment:
       ```suggestion
   Please refer to the documentation of the corresponding classes for more 
information about this advanced feature.
   ```
   
   What about adding a link to the class documentation?

##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
       result = [1, 2, 3]
       return result
 {% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple 
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only 
supported in the group aggregate operation of the blink stream planner. For 
batch execution or windowed aggregate, it is recommended to use the [Vectorized 
Aggregate Functions]({% link 
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md 
%}#vectorized-aggregate-functions).
+
+The behavior of an aggregate function is centered around the concept of an 
accumulator. The _accumulator_
+is an intermediate data structure that stores the aggregated values until a 
final aggregation result
+is computed.
+
+For each set of rows that needs to be aggregated, the runtime will create an 
empty accumulator by calling
+`create_accumulator()`. Subsequently, the `accumulate(...)` method of the 
function is called for each input
+row to update the accumulator. Currently after each row has been processed, 
the `get_value(...)` method of the
+function is called to compute and return the immediate result. 
+
+The following example illustrates the aggregation process:
+
+<center>
+<img alt="UDAGG mechanism" src="{{ site.baseurl 
}}/fig/udagg-mechanism-python.png" width="80%">
+</center>
+
+In the example, we assume a table that contains data about beverages. The 
table consists of three columns (`id`, `name`,

Review comment:
       ```suggestion
   In the above example, we assume a table that contains data about beverages. 
The table consists of three columns (`id`, `name`,
   ```

##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
       result = [1, 2, 3]
       return result
 {% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple 
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only 
supported in the group aggregate operation of the blink stream planner. For 
batch execution or windowed aggregate, it is recommended to use the [Vectorized 
Aggregate Functions]({% link 
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md 
%}#vectorized-aggregate-functions).
+
+The behavior of an aggregate function is centered around the concept of an 
accumulator. The _accumulator_
+is an intermediate data structure that stores the aggregated values until a 
final aggregation result
+is computed.
+
+For each set of rows that needs to be aggregated, the runtime will create an 
empty accumulator by calling
+`create_accumulator()`. Subsequently, the `accumulate(...)` method of the 
function is called for each input
+row to update the accumulator. Currently after each row has been processed, 
the `get_value(...)` method of the
+function is called to compute and return the immediate result. 
+
+The following example illustrates the aggregation process:
+
+<center>
+<img alt="UDAGG mechanism" src="{{ site.baseurl 
}}/fig/udagg-mechanism-python.png" width="80%">
+</center>
+
+In the example, we assume a table that contains data about beverages. The 
table consists of three columns (`id`, `name`,
+and `price`) and 5 rows. We would like to find the highest price of all 
beverages in the table, i.e., perform
+a `max()` aggregation. We need to consider each of the 5 rows. The result is a 
single numeric value.
+
+In order to define an aggregate function, one has to extend the base class 
`AggregateFunction` in
+`pyflink.table` and implement the evaluation method named `accumulate(...)`. 
+The result type and accumulator type of the aggregate function can be 
specified by such 2 approach:
+
+- implement the method named `get_result_type()` and `get_accumulator_type()`.
+- wrap the function instance with the decorator `udaf` in `pyflink.table.udf` 
and specify the parameter `result_type` and `accumulator_type`.

Review comment:
       ```suggestion
   - Wrap the function instance with the decorator `udaf` in 
`pyflink.table.udf` and specify the parameters `result_type` and 
`accumulator_type`.
   ```

##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
       result = [1, 2, 3]
       return result
 {% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple 
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only 
supported in the group aggregate operation of the blink stream planner. For 
batch execution or windowed aggregate, it is recommended to use the [Vectorized 
Aggregate Functions]({% link 
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md 
%}#vectorized-aggregate-functions).
+
+The behavior of an aggregate function is centered around the concept of an 
accumulator. The _accumulator_
+is an intermediate data structure that stores the aggregated values until a 
final aggregation result
+is computed.
+
+For each set of rows that needs to be aggregated, the runtime will create an 
empty accumulator by calling
+`create_accumulator()`. Subsequently, the `accumulate(...)` method of the 
function is called for each input
+row to update the accumulator. Currently after each row has been processed, 
the `get_value(...)` method of the
+function is called to compute and return the immediate result. 

Review comment:
       ```suggestion
   aggregate function will be called to compute the aggregated result. 
   ```
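
For reference, the call sequence this paragraph describes can be sketched in plain Python. This is only an illustration under stated assumptions: `MaxPrice` and `run_group` are hypothetical helpers that do not use PyFlink, and the prices are made up. It mimics the runtime creating one accumulator per group, calling `accumulate(...)` for each row, and calling `get_value(...)` after each row.

```python
class MaxPrice:
    """Hypothetical plain-Python stand-in for a max() aggregate function."""

    def create_accumulator(self):
        # A one-element list so accumulate() can update it in place.
        return [None]

    def accumulate(self, accumulator, price):
        if accumulator[0] is None or price > accumulator[0]:
            accumulator[0] = price

    def get_value(self, accumulator):
        return accumulator[0]


def run_group(func, rows):
    """Mimic the described call sequence for a single group of rows."""
    accumulator = func.create_accumulator()
    emitted = []
    for row in rows:
        func.accumulate(accumulator, row)
        # The aggregated result is computed after each processed row.
        emitted.append(func.get_value(accumulator))
    return emitted


prices = [3.0, 1.5, 4.0, 2.0, 4.5]  # made-up beverage prices
results = run_group(MaxPrice(), prices)
print(results)
```

The last emitted value is the maximum over all five rows; the earlier values are the intermediate results a streaming query would emit along the way.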

##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
       result = [1, 2, 3]
       return result
 {% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple 
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only 
supported in the group aggregate operation of the blink stream planner. For 
batch execution or windowed aggregate, it is recommended to use the [Vectorized 
Aggregate Functions]({% link 
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md 
%}#vectorized-aggregate-functions).
+
+The behavior of an aggregate function is centered around the concept of an 
accumulator. The _accumulator_
+is an intermediate data structure that stores the aggregated values until a 
final aggregation result
+is computed.
+
+For each set of rows that needs to be aggregated, the runtime will create an 
empty accumulator by calling
+`create_accumulator()`. Subsequently, the `accumulate(...)` method of the 
function is called for each input
+row to update the accumulator. Currently after each row has been processed, 
the `get_value(...)` method of the
+function is called to compute and return the immediate result. 
+
+The following example illustrates the aggregation process:
+
+<center>
+<img alt="UDAGG mechanism" src="{{ site.baseurl 
}}/fig/udagg-mechanism-python.png" width="80%">
+</center>
+
+In the example, we assume a table that contains data about beverages. The 
table consists of three columns (`id`, `name`,
+and `price`) and 5 rows. We would like to find the highest price of all 
beverages in the table, i.e., perform
+a `max()` aggregation. We need to consider each of the 5 rows. The result is a 
single numeric value.
+
+In order to define an aggregate function, one has to extend the base class 
`AggregateFunction` in
+`pyflink.table` and implement the evaluation method named `accumulate(...)`. 
+The result type and accumulator type of the aggregate function can be 
specified by such 2 approach:
+
+- implement the method named `get_result_type()` and `get_accumulator_type()`.
+- wrap the function instance with the decorator `udaf` in `pyflink.table.udf` 
and specify the parameter `result_type` and `accumulator_type`.
+
+The following example shows how to define your own aggregate function and call 
it in a query.
+
+{% highlight python %}
+from pyflink.common import Row
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import AggregateFunction, DataTypes, StreamTableEnvironment
+from pyflink.table.expressions import call
+from pyflink.table.udf import udaf
+
+
+class WeightedAvg(AggregateFunction):
+
+    def create_accumulator(self):
+        # Row(sum, count)
+        return Row(0, 0)
+
+    def get_value(self, accumulator):
+        if accumulator[1] == 0:
+            return None
+        else:
+            return accumulator[0] / accumulator[1]
+
+    def accumulate(self, accumulator, value, weight):
+        accumulator[0] += value * weight
+        accumulator[1] += weight
+    
+    def retract(self, accumulator, value, weight):
+        accumulator[0] -= value * weight
+        accumulator[1] -= weight
+        
+    def get_result_type(self):
+        return DataTypes.BIGINT()
+        
+    def get_accumulator_type(self):
+        return DataTypes.ROW([
+            DataTypes.FIELD("f0", DataTypes.BIGINT()), 
+            DataTypes.FIELD("f1", DataTypes.BIGINT())])
+
+
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+# the result type and accumulator type can also be specified in the udaf 
decorator:
+# weighted_avg = udaf(WeightedAvg(), result_type=DataTypes.BIGINT(), 
accumulator_type=...)
+weighted_avg = udaf(WeightedAvg())
+t = table_env.from_elements([(1, 2, "Lee"),
+                             (3, 4, "Jay"),
+                             (5, 6, "Jay"),
+                             (7, 8, "Lee")]).alias("value", "count", "name")
+
+# call function "inline" without registration in Table API
+result = t.group_by(t.name).select(weighted_avg(t.value, 
t.count).alias("avg")).to_pandas()
+print(result)
+
+# register function
+table_env.create_temporary_function("weighted_avg", WeightedAvg())
+
+# call registered function in Table API
+result = t.group_by(t.name).select(call("weighted_avg", t.value, 
t.count).alias("avg")).to_pandas()
+print(result)
+
+# register table
+table_env.create_temporary_view("source", t)
+
+# call registered function in SQL
+result = table_env.sql_query(
+    "SELECT weighted_avg(`value`, `count`) AS avg FROM source GROUP BY 
name").to_pandas()
+print(result)
+{% endhighlight %}
+
+The `accumulate(...)` method of our `WeightedAvg` class takes three inputs. 
The first one is the accumulator
+and the other two are user-defined inputs. In order to calculate a weighted 
average value, the accumulator
+needs to store the weighted sum and count of all the data that has been 
accumulated. In our example, we
+use a `Row` object to be the accumulator. Accumulators are automatically 
managed
+by Flink's checkpointing mechanism and are restored in case of a failure to 
ensure exactly-once semantics.

Review comment:
       ```suggestion
   by Flink's checkpointing mechanism and are restored in case of failover to 
ensure exactly-once semantics.
   ```

##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
       result = [1, 2, 3]
       return result
 {% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple 
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only 
supported in the group aggregate operation of the blink stream planner. For 
batch execution or windowed aggregate, it is recommended to use the [Vectorized 
Aggregate Functions]({% link 
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md 
%}#vectorized-aggregate-functions).
+
+The behavior of an aggregate function is centered around the concept of an 
accumulator. The _accumulator_
+is an intermediate data structure that stores the aggregated values until a 
final aggregation result
+is computed.
+
+For each set of rows that needs to be aggregated, the runtime will create an 
empty accumulator by calling
+`create_accumulator()`. Subsequently, the `accumulate(...)` method of the 
function is called for each input
+row to update the accumulator. Currently after each row has been processed, 
the `get_value(...)` method of the
+function is called to compute and return the immediate result. 
+
+The following example illustrates the aggregation process:
+
+<center>
+<img alt="UDAGG mechanism" src="{{ site.baseurl 
}}/fig/udagg-mechanism-python.png" width="80%">
+</center>
+
+In the example, we assume a table that contains data about beverages. The 
table consists of three columns (`id`, `name`,
+and `price`) and 5 rows. We would like to find the highest price of all 
beverages in the table, i.e., perform
+a `max()` aggregation. We need to consider each of the 5 rows. The result is a 
single numeric value.
+
+In order to define an aggregate function, one has to extend the base class 
`AggregateFunction` in
+`pyflink.table` and implement the evaluation method named `accumulate(...)`. 
+The result type and accumulator type of the aggregate function can be 
specified by such 2 approach:
+
+- implement the method named `get_result_type()` and `get_accumulator_type()`.
+- wrap the function instance with the decorator `udaf` in `pyflink.table.udf` 
and specify the parameter `result_type` and `accumulator_type`.
+
+The following example shows how to define your own aggregate function and call 
it in a query.
+
+{% highlight python %}
+from pyflink.common import Row
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import AggregateFunction, DataTypes, StreamTableEnvironment
+from pyflink.table.expressions import call
+from pyflink.table.udf import udaf
+
+
+class WeightedAvg(AggregateFunction):
+
+    def create_accumulator(self):
+        # Row(sum, count)
+        return Row(0, 0)
+
+    def get_value(self, accumulator):
+        if accumulator[1] == 0:
+            return None
+        else:
+            return accumulator[0] / accumulator[1]
+
+    def accumulate(self, accumulator, value, weight):
+        accumulator[0] += value * weight
+        accumulator[1] += weight
+    
+    def retract(self, accumulator, value, weight):
+        accumulator[0] -= value * weight
+        accumulator[1] -= weight
+        
+    def get_result_type(self):
+        return DataTypes.BIGINT()
+        
+    def get_accumulator_type(self):
+        return DataTypes.ROW([
+            DataTypes.FIELD("f0", DataTypes.BIGINT()), 
+            DataTypes.FIELD("f1", DataTypes.BIGINT())])
+
+
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+# the result type and accumulator type can also be specified in the udaf 
decorator:
+# weighted_avg = udaf(WeightedAvg(), result_type=DataTypes.BIGINT(), 
accumulator_type=...)
+weighted_avg = udaf(WeightedAvg())
+t = table_env.from_elements([(1, 2, "Lee"),
+                             (3, 4, "Jay"),
+                             (5, 6, "Jay"),
+                             (7, 8, "Lee")]).alias("value", "count", "name")
+
+# call function "inline" without registration in Table API
+result = t.group_by(t.name).select(weighted_avg(t.value, 
t.count).alias("avg")).to_pandas()
+print(result)
+
+# register function
+table_env.create_temporary_function("weighted_avg", WeightedAvg())
+
+# call registered function in Table API
+result = t.group_by(t.name).select(call("weighted_avg", t.value, 
t.count).alias("avg")).to_pandas()
+print(result)
+
+# register table
+table_env.create_temporary_view("source", t)
+
+# call registered function in SQL
+result = table_env.sql_query(
+    "SELECT weighted_avg(`value`, `count`) AS avg FROM source GROUP BY 
name").to_pandas()
+print(result)
+{% endhighlight %}
+
+The `accumulate(...)` method of our `WeightedAvg` class takes three inputs. 
The first one is the accumulator

Review comment:
       ```suggestion
   The `accumulate(...)` method of our `WeightedAvg` class takes three input 
arguments. The first one is the accumulator
   ```
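
For reference while reviewing this example, the accumulator lifecycle described in the hunk (`create_accumulator()`, then `accumulate(...)` per row, then `get_value(...)` after each row) can be traced without a Flink runtime. Below is a minimal sketch in plain Python, not the PyFlink implementation: a list stands in for `Row`, and `WeightedAvgSketch` and `run_group_aggregate` are hypothetical names introduced only for illustration.

```python
class WeightedAvgSketch:
    """Plain-Python stand-in for the quoted WeightedAvg; no PyFlink required."""

    def create_accumulator(self):
        # [weighted_sum, weight_total]; a list stands in for pyflink.common.Row
        return [0, 0]

    def accumulate(self, accumulator, value, weight):
        accumulator[0] += value * weight
        accumulator[1] += weight

    def retract(self, accumulator, value, weight):
        # Undo a previously accumulated row.
        accumulator[0] -= value * weight
        accumulator[1] -= weight

    def get_value(self, accumulator):
        if accumulator[1] == 0:
            return None
        return accumulator[0] / accumulator[1]


def run_group_aggregate(func, rows):
    """Hypothetical driver: one accumulator per group key, updated row by row."""
    accumulators = {}
    results = {}
    for value, weight, name in rows:
        if name not in accumulators:
            accumulators[name] = func.create_accumulator()
        func.accumulate(accumulators[name], value, weight)
        # The intermediate result is recomputed after every input row.
        results[name] = func.get_value(accumulators[name])
    return results, accumulators


rows = [(1, 2, "Lee"), (3, 4, "Jay"), (5, 6, "Jay"), (7, 8, "Lee")]
func = WeightedAvgSketch()
results, accumulators = run_group_aggregate(func, rows)
print(results)

# Retracting the last "Lee" row restores the state after the first row only.
func.retract(accumulators["Lee"], 7, 8)
lee_after_retract = func.get_value(accumulators["Lee"])
print(lee_after_retract)
```

In this sketch `get_value` returns a Python float because `/` is true division in Python 3, so it may be worth checking that against the `DataTypes.BIGINT()` result type declared in the quoted example.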

##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
       result = [1, 2, 3]
       return result
 {% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple 
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only 
supported in the group aggregate operation of the blink stream planner. For 
batch execution or windowed aggregate, it is recommended to use the [Vectorized 
Aggregate Functions]({% link 
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md 
%}#vectorized-aggregate-functions).
+
+The behavior of an aggregate function is centered around the concept of an 
accumulator. The _accumulator_
+is an intermediate data structure that stores the aggregated values until a 
final aggregation result
+is computed.
+
+For each set of rows that needs to be aggregated, the runtime will create an 
empty accumulator by calling
+`create_accumulator()`. Subsequently, the `accumulate(...)` method of the 
function is called for each input
+row to update the accumulator. Currently after each row has been processed, 
the `get_value(...)` method of the
+function is called to compute and return the immediate result. 
+
+The following example illustrates the aggregation process:
+
+<center>
+<img alt="UDAGG mechanism" src="{{ site.baseurl 
}}/fig/udagg-mechanism-python.png" width="80%">
+</center>
+
+In the example, we assume a table that contains data about beverages. The 
table consists of three columns (`id`, `name`,
+and `price`) and 5 rows. We would like to find the highest price of all 
beverages in the table, i.e., perform
+a `max()` aggregation. We need to consider each of the 5 rows. The result is a 
single numeric value.
+
+In order to define an aggregate function, one has to extend the base class 
`AggregateFunction` in
+`pyflink.table` and implement the evaluation method named `accumulate(...)`. 
+The result type and accumulator type of the aggregate function can be 
specified by such 2 approach:
+
+- implement the method named `get_result_type()` and `get_accumulator_type()`.
+- wrap the function instance with the decorator `udaf` in `pyflink.table.udf` 
and specify the parameter `result_type` and `accumulator_type`.
+
+The following example shows how to define your own aggregate function and call 
it in a query.
+
+{% highlight python %}
+from pyflink.common import Row
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import AggregateFunction, DataTypes, StreamTableEnvironment
+from pyflink.table.expressions import call
+from pyflink.table.udf import udaf
+
+
+class WeightedAvg(AggregateFunction):
+
+    def create_accumulator(self):
+        # Row(sum, count)
+        return Row(0, 0)
+
+    def get_value(self, accumulator):
+        if accumulator[1] == 0:
+            return None
+        else:
+            return accumulator[0] / accumulator[1]
+
+    def accumulate(self, accumulator, value, weight):
+        accumulator[0] += value * weight
+        accumulator[1] += weight
+    
+    def retract(self, accumulator, value, weight):
+        accumulator[0] -= value * weight
+        accumulator[1] -= weight
+        
+    def get_result_type(self):
+        return DataTypes.BIGINT()
+        
+    def get_accumulator_type(self):
+        return DataTypes.ROW([
+            DataTypes.FIELD("f0", DataTypes.BIGINT()), 
+            DataTypes.FIELD("f1", DataTypes.BIGINT())])
+
+
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+# the result type and accumulator type can also be specified in the udaf 
decorator:
+# weighted_avg = udaf(WeightedAvg(), result_type=DataTypes.BIGINT(), 
accumulator_type=...)
+weighted_avg = udaf(WeightedAvg())
+t = table_env.from_elements([(1, 2, "Lee"),
+                             (3, 4, "Jay"),
+                             (5, 6, "Jay"),
+                             (7, 8, "Lee")]).alias("value", "count", "name")
+
+# call function "inline" without registration in Table API
+result = t.group_by(t.name).select(weighted_avg(t.value, 
t.count).alias("avg")).to_pandas()
+print(result)
+
+# register function
+table_env.create_temporary_function("weighted_avg", WeightedAvg())
+
+# call registered function in Table API
+result = t.group_by(t.name).select(call("weighted_avg", t.value, 
t.count).alias("avg")).to_pandas()
+print(result)
+
+# register table
+table_env.create_temporary_view("source", t)
+
+# call registered function in SQL
+result = table_env.sql_query(
+    "SELECT weighted_avg(`value`, `count`) AS avg FROM source GROUP BY 
name").to_pandas()
+print(result)
+{% endhighlight %}
+
+The `accumulate(...)` method of our `WeightedAvg` class takes three inputs. 
The first one is the accumulator
+and the other two are user-defined inputs. In order to calculate a weighted 
average value, the accumulator
+needs to store the weighted sum and count of all the data that has been 
accumulated. In our example, we

Review comment:
       ```suggestion
   needs to store the weighted sum and count of all the data that have already 
been accumulated. In our example, we
   ```

##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
       result = [1, 2, 3]
       return result
 {% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple 
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only 
supported in the group aggregate operation of the blink stream planner. For 
batch execution or windowed aggregate, it is recommended to use the [Vectorized 
Aggregate Functions]({% link 
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md 
%}#vectorized-aggregate-functions).
+
+The behavior of an aggregate function is centered around the concept of an 
accumulator. The _accumulator_
+is an intermediate data structure that stores the aggregated values until a 
final aggregation result
+is computed.
+
+For each set of rows that needs to be aggregated, the runtime will create an 
empty accumulator by calling
+`create_accumulator()`. Subsequently, the `accumulate(...)` method of the 
function is called for each input
+row to update the accumulator. Currently after each row has been processed, 
the `get_value(...)` method of the
+function is called to compute and return the immediate result. 
+
+The following example illustrates the aggregation process:
+
+<center>
+<img alt="UDAGG mechanism" src="{{ site.baseurl 
}}/fig/udagg-mechanism-python.png" width="80%">
+</center>
+
+In the example, we assume a table that contains data about beverages. The 
table consists of three columns (`id`, `name`,
+and `price`) and 5 rows. We would like to find the highest price of all 
beverages in the table, i.e., perform
+a `max()` aggregation. We need to consider each of the 5 rows. The result is a 
single numeric value.
+
+In order to define an aggregate function, one has to extend the base class 
`AggregateFunction` in
+`pyflink.table` and implement the evaluation method named `accumulate(...)`. 
+The result type and accumulator type of the aggregate function can be 
specified by such 2 approach:
+
+- implement the method named `get_result_type()` and `get_accumulator_type()`.
+- wrap the function instance with the decorator `udaf` in `pyflink.table.udf` 
and specify the parameter `result_type` and `accumulator_type`.
+
+The following example shows how to define your own aggregate function and call 
it in a query.
+
+{% highlight python %}
+from pyflink.common import Row
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import AggregateFunction, DataTypes, StreamTableEnvironment
+from pyflink.table.expressions import call
+from pyflink.table.udf import udaf
+
+
+class WeightedAvg(AggregateFunction):
+
+    def create_accumulator(self):
+        # Row(sum, count)
+        return Row(0, 0)
+
+    def get_value(self, accumulator):
+        if accumulator[1] == 0:
+            return None
+        else:
+            return accumulator[0] / accumulator[1]
+
+    def accumulate(self, accumulator, value, weight):
+        accumulator[0] += value * weight
+        accumulator[1] += weight
+    
+    def retract(self, accumulator, value, weight):
+        accumulator[0] -= value * weight
+        accumulator[1] -= weight
+        
+    def get_result_type(self):
+        return DataTypes.BIGINT()
+        
+    def get_accumulator_type(self):
+        return DataTypes.ROW([
+            DataTypes.FIELD("f0", DataTypes.BIGINT()), 
+            DataTypes.FIELD("f1", DataTypes.BIGINT())])
+
+
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+# the result type and accumulator type can also be specified in the udaf 
decorator:
+# weighted_avg = udaf(WeightedAvg(), result_type=DataTypes.BIGINT(), 
accumulator_type=...)
+weighted_avg = udaf(WeightedAvg())
+t = table_env.from_elements([(1, 2, "Lee"),
+                             (3, 4, "Jay"),
+                             (5, 6, "Jay"),
+                             (7, 8, "Lee")]).alias("value", "count", "name")
+
+# call function "inline" without registration in Table API
+result = t.group_by(t.name).select(weighted_avg(t.value, 
t.count).alias("avg")).to_pandas()
+print(result)
+
+# register function
+table_env.create_temporary_function("weighted_avg", WeightedAvg())
+
+# call registered function in Table API
+result = t.group_by(t.name).select(call("weighted_avg", t.value, 
t.count).alias("avg")).to_pandas()
+print(result)
+
+# register table
+table_env.create_temporary_view("source", t)
+
+# call registered function in SQL
+result = table_env.sql_query(
+    "SELECT weighted_avg(`value`, `count`) AS avg FROM source GROUP BY 
name").to_pandas()
+print(result)
+{% endhighlight %}
+
+The `accumulate(...)` method of our `WeightedAvg` class takes three inputs. 
The first one is the accumulator
+and the other two are user-defined inputs. In order to calculate a weighted 
average value, the accumulator
+needs to store the weighted sum and count of all the data that has been 
accumulated. In our example, we
+use a `Row` object to be the accumulator. Accumulators are automatically 
managed

Review comment:
       ```suggestion
   use a `Row` object as the accumulator. Accumulators will be managed
   ```

##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
       result = [1, 2, 3]
       return result
 {% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple 
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only 
supported in the group aggregate operation of the blink stream planner. For 
batch execution or windowed aggregate, it is recommended to use the [Vectorized 
Aggregate Functions]({% link 
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md 
%}#vectorized-aggregate-functions).
+
+The behavior of an aggregate function is centered around the concept of an 
accumulator. The _accumulator_
+is an intermediate data structure that stores the aggregated values until a 
final aggregation result
+is computed.
+
+For each set of rows that needs to be aggregated, the runtime will create an 
empty accumulator by calling
+`create_accumulator()`. Subsequently, the `accumulate(...)` method of the 
function is called for each input

Review comment:
       ```suggestion
   `create_accumulator()`. Subsequently, the `accumulate(...)` method of the 
aggregate function will be called for each input
   ```

##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
       result = [1, 2, 3]
       return result
 {% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple 
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only 
supported in the group aggregate operation of the blink stream planner. For 
batch execution or windowed aggregate, it is recommended to use the [Vectorized 
Aggregate Functions]({% link 
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md 
%}#vectorized-aggregate-functions).
+
+The behavior of an aggregate function is centered around the concept of an 
accumulator. The _accumulator_
+is an intermediate data structure that stores the aggregated values until a 
final aggregation result
+is computed.
+
+For each set of rows that needs to be aggregated, the runtime will create an 
empty accumulator by calling
+`create_accumulator()`. Subsequently, the `accumulate(...)` method of the 
function is called for each input
+row to update the accumulator. Currently after each row has been processed, 
the `get_value(...)` method of the
+function is called to compute and return the immediate result. 
+
+The following example illustrates the aggregation process:
+
+<center>
+<img alt="UDAGG mechanism" src="{{ site.baseurl 
}}/fig/udagg-mechanism-python.png" width="80%">
+</center>
+
+In the example, we assume a table that contains data about beverages. The 
table consists of three columns (`id`, `name`,
+and `price`) and 5 rows. We would like to find the highest price of all 
beverages in the table, i.e., perform
+a `max()` aggregation. We need to consider each of the 5 rows. The result is a 
single numeric value.
+
+In order to define an aggregate function, one has to extend the base class 
`AggregateFunction` in
+`pyflink.table` and implement the evaluation method named `accumulate(...)`. 
+The result type and accumulator type of the aggregate function can be 
specified by such 2 approach:
+
+- implement the method named `get_result_type()` and `get_accumulator_type()`.

Review comment:
       ```suggestion
   - Implement the methods named `get_result_type()` and 
`get_accumulator_type()`.
   ```

##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
       result = [1, 2, 3]
       return result
 {% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple 
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only 
supported in the group aggregate operation of the blink stream planner. For 
batch execution or windowed aggregate, it is recommended to use the [Vectorized 
Aggregate Functions]({% link 
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md 
%}#vectorized-aggregate-functions).
+
+The behavior of an aggregate function is centered around the concept of an 
accumulator. The _accumulator_
+is an intermediate data structure that stores the aggregated values until a 
final aggregation result
+is computed.
+
+For each set of rows that needs to be aggregated, the runtime will create an 
empty accumulator by calling
+`create_accumulator()`. Subsequently, the `accumulate(...)` method of the 
function is called for each input
+row to update the accumulator. Currently after each row has been processed, 
the `get_value(...)` method of the
+function is called to compute and return the immediate result. 
+
+The following example illustrates the aggregation process:
+
+<center>
+<img alt="UDAGG mechanism" src="{{ site.baseurl 
}}/fig/udagg-mechanism-python.png" width="80%">
+</center>
+
+In the example, we assume a table that contains data about beverages. The 
table consists of three columns (`id`, `name`,
+and `price`) and 5 rows. We would like to find the highest price of all 
beverages in the table, i.e., perform
+a `max()` aggregation. We need to consider each of the 5 rows. The result is a 
single numeric value.

Review comment:
       ```suggestion
   a `max()` aggregation.
   ```

##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
       result = [1, 2, 3]
       return result
 {% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple 
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only 
supported in the group aggregate operation of the blink stream planner. For 
batch execution or windowed aggregate, it is recommended to use the [Vectorized 
Aggregate Functions]({% link 
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md 
%}#vectorized-aggregate-functions).
+
+The behavior of an aggregate function is centered around the concept of an 
accumulator. The _accumulator_
+is an intermediate data structure that stores the aggregated values until a 
final aggregation result
+is computed.
+
+For each set of rows that needs to be aggregated, the runtime will create an 
empty accumulator by calling
+`create_accumulator()`. Subsequently, the `accumulate(...)` method of the 
function is called for each input
+row to update the accumulator. Currently after each row has been processed, 
the `get_value(...)` method of the
+function is called to compute and return the immediate result. 
+
+The following example illustrates the aggregation process:
+
+<center>
+<img alt="UDAGG mechanism" src="{{ site.baseurl 
}}/fig/udagg-mechanism-python.png" width="80%">
+</center>
+
+In the example, we assume a table that contains data about beverages. The 
table consists of three columns (`id`, `name`,
+and `price`) and 5 rows. We would like to find the highest price of all 
beverages in the table, i.e., perform
+a `max()` aggregation. We need to consider each of the 5 rows. The result is a 
single numeric value.
+
+In order to define an aggregate function, one has to extend the base class 
`AggregateFunction` in
+`pyflink.table` and implement the evaluation method named `accumulate(...)`. 
+The result type and accumulator type of the aggregate function can be 
specified by such 2 approach:
+
+- implement the method named `get_result_type()` and `get_accumulator_type()`.
+- wrap the function instance with the decorator `udaf` in `pyflink.table.udf` 
and specify the parameter `result_type` and `accumulator_type`.
+
+The following example shows how to define your own aggregate function and call 
it in a query.
+
+{% highlight python %}
+from pyflink.common import Row
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import AggregateFunction, DataTypes, StreamTableEnvironment
+from pyflink.table.expressions import call
+from pyflink.table.udf import udaf
+
+
+class WeightedAvg(AggregateFunction):
+
+    def create_accumulator(self):
+        # Row(sum, count)
+        return Row(0, 0)
+
+    def get_value(self, accumulator):
+        if accumulator[1] == 0:
+            return None
+        else:
+            return accumulator[0] / accumulator[1]
+
+    def accumulate(self, accumulator, value, weight):
+        accumulator[0] += value * weight
+        accumulator[1] += weight
+    
+    def retract(self, accumulator, value, weight):
+        accumulator[0] -= value * weight
+        accumulator[1] -= weight
+        
+    def get_result_type(self):
+        return DataTypes.BIGINT()
+        
+    def get_accumulator_type(self):
+        return DataTypes.ROW([
+            DataTypes.FIELD("f0", DataTypes.BIGINT()), 
+            DataTypes.FIELD("f1", DataTypes.BIGINT())])
+
+
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+# the result type and accumulator type can also be specified in the udaf 
decorator:
+# weighted_avg = udaf(WeightedAvg(), result_type=DataTypes.BIGINT(), 
accumulator_type=...)
+weighted_avg = udaf(WeightedAvg())
+t = table_env.from_elements([(1, 2, "Lee"),
+                             (3, 4, "Jay"),
+                             (5, 6, "Jay"),
+                             (7, 8, "Lee")]).alias("value", "count", "name")
+
+# call function "inline" without registration in Table API
+result = t.group_by(t.name).select(weighted_avg(t.value, 
t.count).alias("avg")).to_pandas()
+print(result)
+
+# register function
+table_env.create_temporary_function("weighted_avg", WeightedAvg())
+
+# call registered function in Table API
+result = t.group_by(t.name).select(call("weighted_avg", t.value, 
t.count).alias("avg")).to_pandas()
+print(result)
+
+# register table
+table_env.create_temporary_view("source", t)
+
+# call registered function in SQL
+result = table_env.sql_query(
+    "SELECT weighted_avg(`value`, `count`) AS avg FROM source GROUP BY 
name").to_pandas()
+print(result)
+{% endhighlight %}
+
+The `accumulate(...)` method of our `WeightedAvg` class takes three inputs. 
The first one is the accumulator
+and the other two are user-defined inputs. In order to calculate a weighted 
average value, the accumulator
+needs to store the weighted sum and count of all the data that has been 
accumulated. In our example, we
+use a `Row` object to be the accumulator. Accumulators are automatically 
managed
+by Flink's checkpointing mechanism and are restored in case of a failure to 
ensure exactly-once semantics.
+
+### Mandatory and Optional Methods
+
+**The following methods are mandatory for each `AggregateFunction`:**
+
+- `create_accumulator()`
+- `accumulate(...)` 
+- `get_value(...)`
+
+**The following methods of `AggregateFunction` are required depending on the 
use case:**
+
+- `retract(...)` is required when there are other operations that generate 
retract messages before current UDAF call, e.g. group aggregate , outer join. \
+This method is optional, but it is strongly recommended to be implemented to 
ensure the UDAF can be used in any use case.
+- `get_result_type()` and `get_accumulator_type()` is required if the result 
type and accumulator type would not be specified in the `udaf` decorator.
+
+### ListView and MapView
+
+If an accumulator needs to store large amounts of data, 
`pyflink.table.ListView` and `pyflink.table.MapView` 
+provide advanced features for leveraging Flink's state backends in unbounded 
data scenarios. 

Review comment:
       ```suggestion
   could be used instead of list and map. These two data structures provide 
functionality similar to list and map, but usually perform better because they 
leverage Flink's state backend to eliminate unnecessary state access.
   ```

##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
       result = [1, 2, 3]
       return result
 {% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple 
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only 
supported in the group aggregate operation of the blink stream planner. For 
batch execution or windowed aggregate, it is recommended to use the [Vectorized 
Aggregate Functions]({% link 
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md 
%}#vectorized-aggregate-functions).
+
+The behavior of an aggregate function is centered around the concept of an 
accumulator. The _accumulator_
+is an intermediate data structure that stores the aggregated values until a 
final aggregation result
+is computed.
+
+For each set of rows that needs to be aggregated, the runtime will create an 
empty accumulator by calling
+`create_accumulator()`. Subsequently, the `accumulate(...)` method of the 
function is called for each input
+row to update the accumulator. Currently after each row has been processed, 
the `get_value(...)` method of the
+function is called to compute and return the immediate result. 
+
+The following example illustrates the aggregation process:
+
+<center>
+<img alt="UDAGG mechanism" src="{{ site.baseurl 
}}/fig/udagg-mechanism-python.png" width="80%">
+</center>
+
+In the example, we assume a table that contains data about beverages. The 
table consists of three columns (`id`, `name`,
+and `price`) and 5 rows. We would like to find the highest price of all 
beverages in the table, i.e., perform
+a `max()` aggregation. We need to consider each of the 5 rows. The result is a 
single numeric value.
+
+In order to define an aggregate function, one has to extend the base class 
`AggregateFunction` in
+`pyflink.table` and implement the evaluation method named `accumulate(...)`. 
+The result type and accumulator type of the aggregate function can be 
specified by such 2 approach:
+
+- implement the method named `get_result_type()` and `get_accumulator_type()`.
+- wrap the function instance with the decorator `udaf` in `pyflink.table.udf` 
and specify the parameter `result_type` and `accumulator_type`.
+
+The following example shows how to define your own aggregate function and call 
it in a query.
+
+{% highlight python %}
+from pyflink.common import Row
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import AggregateFunction, DataTypes, StreamTableEnvironment
+from pyflink.table.expressions import call
+from pyflink.table.udf import udaf
+
+
+class WeightedAvg(AggregateFunction):
+
+    def create_accumulator(self):
+        # Row(sum, count)
+        return Row(0, 0)
+
+    def get_value(self, accumulator):
+        if accumulator[1] == 0:
+            return None
+        else:
+            return accumulator[0] / accumulator[1]
+
+    def accumulate(self, accumulator, value, weight):
+        accumulator[0] += value * weight
+        accumulator[1] += weight
+    
+    def retract(self, accumulator, value, weight):
+        accumulator[0] -= value * weight
+        accumulator[1] -= weight
+        
+    def get_result_type(self):
+        return DataTypes.BIGINT()
+        
+    def get_accumulator_type(self):
+        return DataTypes.ROW([
+            DataTypes.FIELD("f0", DataTypes.BIGINT()), 
+            DataTypes.FIELD("f1", DataTypes.BIGINT())])
+
+
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+# the result type and accumulator type can also be specified in the udaf 
decorator:
+# weighted_avg = udaf(WeightedAvg(), result_type=DataTypes.BIGINT(), 
accumulator_type=...)
+weighted_avg = udaf(WeightedAvg())
+t = table_env.from_elements([(1, 2, "Lee"),
+                             (3, 4, "Jay"),
+                             (5, 6, "Jay"),
+                             (7, 8, "Lee")]).alias("value", "count", "name")
+
+# call function "inline" without registration in Table API
+result = t.group_by(t.name).select(weighted_avg(t.value, 
t.count).alias("avg")).to_pandas()
+print(result)
+
+# register function
+table_env.create_temporary_function("weighted_avg", WeightedAvg())
+
+# call registered function in Table API
+result = t.group_by(t.name).select(call("weighted_avg", t.value, 
t.count).alias("avg")).to_pandas()
+print(result)
+
+# register table
+table_env.create_temporary_view("source", t)
+
+# call registered function in SQL
+result = table_env.sql_query(
+    "SELECT weighted_avg(`value`, `count`) AS avg FROM source GROUP BY 
name").to_pandas()
+print(result)
+{% endhighlight %}
+
+The `accumulate(...)` method of our `WeightedAvg` class takes three inputs. 
The first one is the accumulator
+and the other two are user-defined inputs. In order to calculate a weighted 
average value, the accumulator
+needs to store the weighted sum and count of all the data that has been 
accumulated. In our example, we
+use a `Row` object to be the accumulator. Accumulators are automatically 
managed
+by Flink's checkpointing mechanism and are restored in case of a failure to 
ensure exactly-once semantics.
+
+### Mandatory and Optional Methods
+
+**The following methods are mandatory for each `AggregateFunction`:**
+
+- `create_accumulator()`
+- `accumulate(...)` 
+- `get_value(...)`
+
+**The following methods of `AggregateFunction` are required depending on the 
use case:**
+
+- `retract(...)` is required when there are other operations that generate 
retract messages before current UDAF call, e.g. group aggregate , outer join. \
+This method is optional, but it is strongly recommended to be implemented to 
ensure the UDAF can be used in any use case.
+- `get_result_type()` and `get_accumulator_type()` is required if the result 
type and accumulator type would not be specified in the `udaf` decorator.
+
+### ListView and MapView
+
+If an accumulator needs to store large amounts of data, 
`pyflink.table.ListView` and `pyflink.table.MapView` 
+provide advanced features for leveraging Flink's state backends in unbounded 
data scenarios. 
+This feature can be enabled by declaring `DataTypes.LIST_VIEW(...)` and 
`DataTypes.MAP_VIEW(...)` in the accumulator type, e.g.:
+
+{% highlight python %}
+from pyflink.common import Row
+from pyflink.table import AggregateFunction, DataTypes, ListView
+
+class ListViewConcatAggregateFunction(AggregateFunction):
+
+    def get_value(self, accumulator):
+        # the ListView is iterable
+        return accumulator[1].join(accumulator[0])
+
+    def create_accumulator(self):
+        return Row(ListView(), '')
+
+    def accumulate(self, accumulator, *args):
+        accumulator[1] = args[1]
+        # the ListView supports add, clear and iterate operations.
+        accumulator[0].add(args[0])
+
+    def get_accumulator_type(self):
+        return DataTypes.ROW([
+            # declare the first column of the accumulator as a string ListView.
+            DataTypes.FIELD("f0", DataTypes.LIST_VIEW(DataTypes.STRING())),
+            DataTypes.FIELD("f1", DataTypes.STRING())])
+
+    def get_result_type(self):
+        return DataTypes.STRING()
+{% endhighlight %}
+
+Currently there are 2 limitations to use the:

Review comment:
       ```suggestion
   Currently there are 2 limitations to use the ListView and MapView:
   ```
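
A side note on the `ListViewConcatAggregateFunction` example in this hunk: the second accumulator field holds the delimiter passed as `args[1]`, and `get_value` relies only on the view being iterable. The plain-Python sketch below mirrors that behaviour without PyFlink. `ListViewStub` and `ConcatSketch` are hypothetical stand-ins written for this comment, not PyFlink classes, and the stub models only the add, clear and iterate operations mentioned in the hunk.

```python
class ListViewStub:
    """Hypothetical in-memory stand-in for pyflink.table.ListView."""

    def __init__(self):
        self._items = []

    def add(self, value):
        self._items.append(value)

    def clear(self):
        self._items.clear()

    def __iter__(self):
        return iter(self._items)


class ConcatSketch:
    """Mirrors the method layout of the example; the accumulator is [view, delimiter]."""

    def create_accumulator(self):
        return [ListViewStub(), '']

    def accumulate(self, accumulator, *args):
        # args[0] is the value to collect, args[1] is the delimiter string
        accumulator[1] = args[1]
        accumulator[0].add(args[0])

    def get_value(self, accumulator):
        # str.join accepts any iterable of strings, so iterating the view suffices
        return accumulator[1].join(accumulator[0])


func = ConcatSketch()
acc = func.create_accumulator()
for word in ["a", "b", "c"]:
    func.accumulate(acc, word, ",")
concat_result = func.get_value(acc)
print(concat_result)
```

In the real function the view is backed by Flink state rather than a Python list, which is the point of declaring it in the accumulator type.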

##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -227,3 +227,164 @@ def iterable_func(x):
       result = [1, 2, 3]
       return result
 {% endhighlight %}
+
+## Aggregation Functions
+
+A user-defined aggregate function (_UDAGG_) maps scalar values of multiple 
rows to a new scalar value.
+
+**NOTE:** Currently the general user-defined aggregate function is only 
supported in the group aggregate operation of the blink stream planner. For 
batch execution or windowed aggregate, it is recommended to use the [Vectorized 
Aggregate Functions]({% link 
dev/python/table-api-users-guide/udfs/vectorized_python_udfs.md 
%}#vectorized-aggregate-functions).
+
+The behavior of an aggregate function is centered around the concept of an 
accumulator. The _accumulator_
+is an intermediate data structure that stores the aggregated values until a 
final aggregation result
+is computed.
+
+For each set of rows that needs to be aggregated, the runtime will create an 
empty accumulator by calling
+`create_accumulator()`. Subsequently, the `accumulate(...)` method of the 
function is called for each input
+row to update the accumulator. Currently after each row has been processed, 
the `get_value(...)` method of the
+function is called to compute and return the immediate result. 
+
+The following example illustrates the aggregation process:
+
+<center>
+<img alt="UDAGG mechanism" src="{{ site.baseurl 
}}/fig/udagg-mechanism-python.png" width="80%">
+</center>
+
+In the example, we assume a table that contains data about beverages. The 
table consists of three columns (`id`, `name`,
+and `price`) and 5 rows. We would like to find the highest price of all 
beverages in the table, i.e., perform
+a `max()` aggregation. We need to consider each of the 5 rows. The result is a 
single numeric value.
+
+In order to define an aggregate function, one has to extend the base class 
`AggregateFunction` in
+`pyflink.table` and implement the evaluation method named `accumulate(...)`. 
+The result type and accumulator type of the aggregate function can be 
specified in one of two ways:
+
+- implement the methods named `get_result_type()` and `get_accumulator_type()`.
+- wrap the function instance with the decorator `udaf` in `pyflink.table.udf` 
and specify the parameters `result_type` and `accumulator_type`.
+
+The following example shows how to define your own aggregate function and call 
it in a query.
+
+{% highlight python %}
+from pyflink.common import Row
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import AggregateFunction, DataTypes, StreamTableEnvironment
+from pyflink.table.expressions import call
+from pyflink.table.udf import udaf
+
+
+class WeightedAvg(AggregateFunction):
+
+    def create_accumulator(self):
+        # Row(sum, count)
+        return Row(0, 0)
+
+    def get_value(self, accumulator):
+        if accumulator[1] == 0:
+            return None
+        else:
+            return accumulator[0] / accumulator[1]
+
+    def accumulate(self, accumulator, value, weight):
+        accumulator[0] += value * weight
+        accumulator[1] += weight
+    
+    def retract(self, accumulator, value, weight):
+        accumulator[0] -= value * weight
+        accumulator[1] -= weight
+        
+    def get_result_type(self):
+        return DataTypes.BIGINT()
+        
+    def get_accumulator_type(self):
+        return DataTypes.ROW([
+            DataTypes.FIELD("f0", DataTypes.BIGINT()), 
+            DataTypes.FIELD("f1", DataTypes.BIGINT())])
+
+
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+# the result type and accumulator type can also be specified in the udaf 
decorator:
+# weighted_avg = udaf(WeightedAvg(), result_type=DataTypes.BIGINT(), 
accumulator_type=...)
+weighted_avg = udaf(WeightedAvg())
+t = table_env.from_elements([(1, 2, "Lee"),
+                             (3, 4, "Jay"),
+                             (5, 6, "Jay"),
+                             (7, 8, "Lee")]).alias("value", "count", "name")
+
+# call function "inline" without registration in Table API
+result = t.group_by(t.name).select(weighted_avg(t.value, 
t.count).alias("avg")).to_pandas()
+print(result)
+
+# register function
+table_env.create_temporary_function("weighted_avg", WeightedAvg())
+
+# call registered function in Table API
+result = t.group_by(t.name).select(call("weighted_avg", t.value, 
t.count).alias("avg")).to_pandas()
+print(result)
+
+# register table
+table_env.create_temporary_view("source", t)
+
+# call registered function in SQL
+result = table_env.sql_query(
+    "SELECT weighted_avg(`value`, `count`) AS avg FROM source GROUP BY 
name").to_pandas()
+print(result)
+{% endhighlight %}
+
+The `accumulate(...)` method of our `WeightedAvg` class takes three inputs. 
The first one is the accumulator
+and the other two are user-defined inputs. In order to calculate a weighted 
average value, the accumulator
+needs to store the weighted sum and count of all the data that has been 
accumulated. In our example, we
+use a `Row` object to be the accumulator. Accumulators are automatically 
managed
+by Flink's checkpointing mechanism and are restored in case of a failure to 
ensure exactly-once semantics.
+
+### Mandatory and Optional Methods
+
+**The following methods are mandatory for each `AggregateFunction`:**
+
+- `create_accumulator()`
+- `accumulate(...)` 
+- `get_value(...)`
+
+**The following methods of `AggregateFunction` are required depending on the 
use case:**
+
+- `retract(...)` is required when an upstream operation, e.g. a group 
aggregate or an outer join, generates retraction messages before the current UDAF call. \
+This method is otherwise optional, but implementing it is strongly recommended 
so that the UDAF can be used in any use case.
+- `get_result_type()` and `get_accumulator_type()` are required if the result 
type and accumulator type are not specified in the `udaf` decorator.
+
+### ListView and MapView
+
+If an accumulator needs to store large amounts of data, 
`pyflink.table.ListView` and `pyflink.table.MapView` 
+provide advanced features for leveraging Flink's state backends in unbounded 
data scenarios. 
+This feature can be enabled by declaring `DataTypes.LIST_VIEW(...)` and 
`DataTypes.MAP_VIEW(...)` in the accumulator type, e.g.:

Review comment:
       ```suggestion
   You can use them by declaring `DataTypes.LIST_VIEW(...)` and 
`DataTypes.MAP_VIEW(...)` in the accumulator type, e.g.:
   ```
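
For readers of this thread, the accumulator lifecycle described in the hunk (one accumulator per group, `accumulate(...)` per input row, `get_value(...)` after each row, and `retract(...)` for retraction messages) can be sketched in plain Python. This is only an illustration under stated assumptions: `WeightedAvgSketch` copies the arithmetic of the `WeightedAvg` example, while `run_group_aggregate` is a hypothetical driver written for this comment and is not how the Flink runtime is implemented.

```python
class WeightedAvgSketch:
    """Same arithmetic as the WeightedAvg example, with a list as the accumulator."""

    def create_accumulator(self):
        return [0, 0]  # [weighted sum, total weight]

    def accumulate(self, acc, value, weight):
        acc[0] += value * weight
        acc[1] += weight

    def retract(self, acc, value, weight):
        acc[0] -= value * weight
        acc[1] -= weight

    def get_value(self, acc):
        if acc[1] == 0:
            return None
        return acc[0] / acc[1]


def run_group_aggregate(func, rows):
    """Hypothetical driver: rows are (key, value, weight, is_retraction) tuples."""
    accumulators = {}
    emitted = []
    for key, value, weight, is_retraction in rows:
        if key not in accumulators:
            # an empty accumulator is created the first time a group is seen
            accumulators[key] = func.create_accumulator()
        acc = accumulators[key]
        if is_retraction:
            func.retract(acc, value, weight)
        else:
            func.accumulate(acc, value, weight)
        # an updated result is produced after every processed row
        emitted.append((key, func.get_value(acc)))
    return emitted


rows = [
    ("Lee", 1, 2, False),
    ("Jay", 3, 4, False),
    ("Jay", 5, 6, False),
    ("Lee", 7, 8, False),
    ("Jay", 3, 4, True),  # an upstream operator withdraws an earlier row
]
emitted = run_group_aggregate(WeightedAvgSketch(), rows)
for key, value in emitted:
    print(key, value)
```

The last row shows why `retract(...)` matters: without it, the withdrawn `("Jay", 3, 4)` input could not be removed from the running result.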




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

