This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 73251c5  [FLINK-22445][python][docs] Add documentation for row-based 
operations
73251c5 is described below

commit 73251c504c67dd98a137a6aea81e53c837e841e3
Author: huangxingbo <[email protected]>
AuthorDate: Mon Apr 26 10:47:07 2021 +0800

    [FLINK-22445][python][docs] Add documentation for row-based operations
    
    This closes #15757.
---
 .../docs/dev/python/table/operations/_index.md}    |   8 +-
 .../python/table/{ => operations}/operations.md    |   2 +-
 .../table/operations/row_based_operations.md       | 289 +++++++++++++++++++++
 .../docs/dev/python/table/python_types.md          |   4 +-
 docs/content.zh/docs/dev/table/tableApi.md         |   9 +-
 .../table/{operations.md => operations/_index.md}  |   8 +-
 .../python/table/{ => operations}/operations.md    |   2 +-
 .../table/operations/row_based_operations.md       | 289 +++++++++++++++++++++
 docs/content/docs/dev/python/table/python_types.md |   2 +-
 docs/content/docs/dev/table/tableApi.md            |   9 +-
 10 files changed, 593 insertions(+), 29 deletions(-)

diff --git a/docs/content/docs/dev/python/table/operations.md 
b/docs/content.zh/docs/dev/python/table/operations/_index.md
similarity index 81%
copy from docs/content/docs/dev/python/table/operations.md
copy to docs/content.zh/docs/dev/python/table/operations/_index.md
index 05b8de6..ac1e49b 100644
--- a/docs/content/docs/dev/python/table/operations.md
+++ b/docs/content.zh/docs/dev/python/table/operations/_index.md
@@ -1,9 +1,7 @@
 ---
-title: "Operations"
+title: Operations
+bookCollapseSection: true
 weight: 30
-type: docs
-aliases:
-- /dev/python/table-api-users-guide/operations.html
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -23,5 +21,3 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 -->
-
-<meta http-equiv="refresh" content="0; url={{< ref "docs/dev/table/tableapi" 
>}} "/>
diff --git a/docs/content.zh/docs/dev/python/table/operations.md 
b/docs/content.zh/docs/dev/python/table/operations/operations.md
similarity index 97%
rename from docs/content.zh/docs/dev/python/table/operations.md
rename to docs/content.zh/docs/dev/python/table/operations/operations.md
index 1928c25..80e156e 100644
--- a/docs/content.zh/docs/dev/python/table/operations.md
+++ b/docs/content.zh/docs/dev/python/table/operations/operations.md
@@ -1,5 +1,5 @@
 ---
-title: "Operations"
+title: "Overview"
 weight: 30
 type: docs
 aliases:
diff --git 
a/docs/content.zh/docs/dev/python/table/operations/row_based_operations.md 
b/docs/content.zh/docs/dev/python/table/operations/row_based_operations.md
new file mode 100644
index 0000000..56e69d8
--- /dev/null
+++ b/docs/content.zh/docs/dev/python/table/operations/row_based_operations.md
@@ -0,0 +1,289 @@
+---
+title: "Row-based Operations"
+weight: 31
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Row-based Operations
+
+This page describes how to use row-based operations in PyFlink Table API.
+
+## Map
+
+Performs a `map` operation with a python [general scalar function]({{< ref 
"docs/dev/python/table/udfs/python_udfs" >}}#scalar-functions) or [vectorized 
scalar function]({{< ref "docs/dev/python/table/udfs/vectorized_python_udfs" 
>}}#vectorized-scalar-functions).
+The output will be flattened if the output type is a composite type.
+
+```python
+from pyflink.common import Row
+from pyflink.table import EnvironmentSettings, TableEnvironment
+from pyflink.table.expressions import col
+from pyflink.table.types import DataTypes
+
+env_settings = 
EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
+table_env = TableEnvironment.create(env_settings)
+
+table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
+
+@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
+                               DataTypes.FIELD("data", DataTypes.STRING())]))
+def func1(id: int, data: str) -> Row:
+    return Row(id, data * 2)
+
+# the input columns are specified as the inputs
+table.map(func1(col('id'), col('data'))).to_pandas()
+# result is 
+#    _c0         _c1
+#  0    1        HiHi
+#  1    2  HelloHello
+```
+
+It also supports taking a Row object (containing all the columns of the input 
table) as input.
+
+```python
+@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
+                               DataTypes.FIELD("data", DataTypes.STRING())]))
+def func2(data: Row) -> Row:
+    return Row(data[0], data[1] * 2)
+
+# specify the function without the input columns
+table.map(func2).alias('id', 'data').to_pandas()
+# result is 
+#      id        data
+#  0    1        HiHi
+#  1    2  HelloHello
+```
+
+<span class="label label-info">Note</span> The input columns should not be 
specified when using func2 in the map operation.
+
+It also supports using a [vectorized scalar function]({{< ref 
"docs/dev/python/table/udfs/vectorized_python_udfs" 
>}}#vectorized-scalar-functions) in the `map` operation.
+It should be noted that the input type and output type should be 
pandas.DataFrame instead of Row in this case.
+
+```python
+import pandas as pd
+@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
+                               DataTypes.FIELD("data", DataTypes.STRING())]),
+                               func_type='pandas')
+def func3(data: pd.DataFrame) -> pd.DataFrame:
+    res = pd.concat([data.id, data.data * 2], axis=1)
+    return res
+
+table.map(func3).alias('id', 'data').to_pandas()
+# result is 
+#      id        data
+#  0    1        HiHi
+#  1    2  HelloHello
+```
+
+
+## FlatMap
+
+Performs a `flat_map` operation with a python [table function]({{< ref 
"docs/dev/python/table/udfs/python_udfs" >}}#table-functions).
+
+```python
+from pyflink.common import Row
+from pyflink.table.udf import udtf
+from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
+
+env_settings = 
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
+table_env = TableEnvironment.create(env_settings)
+
+table = table_env.from_elements([(1, 'Hi,Flink'), (2, 'Hello')], ['id', 
'data'])
+
+@udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
+def split(x: Row) -> Row:
+    for s in x[1].split(","):
+        yield x[0], s
+
+# use split in `flat_map`
+table.flat_map(split).to_pandas()
+# result is
+#    f0       f1
+# 0   1       Hi
+# 1   1    Flink
+# 2   2    Hello
+```
+
+The python [table function]({{< ref "docs/dev/python/table/udfs/python_udfs" 
>}}#table-functions) could also be used in `join_lateral` and 
`left_outer_join_lateral`.
+
+```python
+# use table function in `join_lateral` or `left_outer_join_lateral`
+table.join_lateral(split.alias('a', 'b')).to_pandas()
+# result is 
+#    id      data  a      b
+# 0   1  Hi,Flink  1     Hi
+# 1   1  Hi,Flink  1  Flink
+# 2   2     Hello  2  Hello
+```
+
+## Aggregate
+
+Performs an `aggregate` operation with a python [general aggregate 
function]({{< ref "docs/dev/python/table/udfs/python_udfs" 
>}}#aggregate-functions) or [vectorized aggregate function]({{< ref 
"docs/dev/python/table/udfs/vectorized_python_udfs" 
>}}#vectorized-aggregate-functions).
+
+```python
+from pyflink.common import Row
+from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
+from pyflink.table.expressions import col
+from pyflink.table.udf import AggregateFunction, udaf
+
+class CountAndSumAggregateFunction(AggregateFunction):
+
+    def get_value(self, accumulator):
+        return Row(accumulator[0], accumulator[1])
+
+    def create_accumulator(self):
+        return Row(0, 0)
+
+    def accumulate(self, accumulator, *args):
+        accumulator[0] += 1
+        accumulator[1] += args[0][1]
+
+    def retract(self, accumulator, *args):
+        accumulator[0] -= 1
+        accumulator[1] -= args[0][1]
+
+    def merge(self, accumulator, accumulators):
+        for other_acc in accumulators:
+            accumulator[0] += other_acc[0]
+            accumulator[1] += other_acc[1]
+
+    def get_accumulator_type(self):
+        return DataTypes.ROW(
+            [DataTypes.FIELD("a", DataTypes.BIGINT()),
+             DataTypes.FIELD("b", DataTypes.BIGINT())])
+
+    def get_result_type(self):
+        return DataTypes.ROW(
+            [DataTypes.FIELD("a", DataTypes.BIGINT()),
+             DataTypes.FIELD("b", DataTypes.BIGINT())])
+
+function = CountAndSumAggregateFunction()
+agg = udaf(function,
+           result_type=function.get_result_type(),
+           accumulator_type=function.get_accumulator_type(),
+           name=str(function.__class__.__name__))
+
+# aggregate with a python general aggregate function
+
+env_settings = 
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
+table_env = TableEnvironment.create(env_settings)
+t = table_env.from_elements([(1, 2), (2, 1), (1, 3)], ['a', 'b'])
+
+result = t.group_by(col('a')) \
+    .aggregate(agg.alias("c", "d")) \
+    .select(col('a'), col('c'), col('d'))
+result.to_pandas()
+
+# the result is
+#    a  c  d
+# 0  1  2  5
+# 1  2  1  1
+
+# aggregate with a python vectorized aggregate function
+env_settings = 
EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
+table_env = TableEnvironment.create(env_settings)
+
+t = table_env.from_elements([(1, 2), (2, 1), (1, 3)], ['a', 'b'])
+
+pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
+                   result_type=DataTypes.ROW(
+                       [DataTypes.FIELD("a", DataTypes.FLOAT()),
+                        DataTypes.FIELD("b", DataTypes.INT())]),
+                   func_type="pandas")
+t.aggregate(pandas_udaf.alias("a", "b")) \
+    .select(col('a'), col('b')).to_pandas()
+
+# the result is
+#      a  b
+# 0  2.0  3
+```
+
+<span class="label label-info">Note</span> Similar to the `map` operation, if you 
specify the aggregate function without the input columns in the `aggregate` 
operation, it will take a Row or pandas.DataFrame as input, which contains all the 
columns of the input table including the grouping keys.
+<span class="label label-info">Note</span> You have to close the `aggregate` 
with a select statement, and the select statement should not contain aggregate 
functions.
+Besides, the output of aggregate will be flattened if it is a composite type.
+
+## FlatAggregate
+
+Performs a `flat_aggregate` operation with a python general [Table Aggregate 
Function]({{< ref "docs/dev/python/table/udfs/python_udfs" 
>}}#table-aggregate-functions).
+
+Similar to `GroupBy Aggregation`, `FlatAggregate` groups the inputs on the 
grouping keys.
+Different from `AggregateFunction`, `TableAggregateFunction` could return 0, 
1, or more records for a grouping key.
+Similar to `aggregate`, you have to close the `flat_aggregate` with a select 
statement and the select statement should not contain aggregate functions.
+
+```python
+from pyflink.common import Row
+from pyflink.table.expressions import col
+from pyflink.table.udf import TableAggregateFunction, udtaf
+from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
+
+class Top2(TableAggregateFunction):
+
+    def emit_value(self, accumulator):
+        yield Row(accumulator[0])
+        yield Row(accumulator[1])
+
+    def create_accumulator(self):
+        return [None, None]
+
+    def accumulate(self, accumulator, *args):
+        if args[0][0] is not None:
+            if accumulator[0] is None or args[0][0] > accumulator[0]:
+                accumulator[1] = accumulator[0]
+                accumulator[0] = args[0][0]
+            elif accumulator[1] is None or args[0][0] > accumulator[1]:
+                accumulator[1] = args[0][0]
+
+    def retract(self, accumulator, *args):
+        accumulator[0] = accumulator[0] - 1
+
+    def merge(self, accumulator, accumulators):
+        for other_acc in accumulators:
+            self.accumulate(accumulator, other_acc[0])
+            self.accumulate(accumulator, other_acc[1])
+
+    def get_accumulator_type(self):
+        return DataTypes.ARRAY(DataTypes.BIGINT())
+
+    def get_result_type(self):
+        return DataTypes.ROW(
+            [DataTypes.FIELD("a", DataTypes.BIGINT())])
+
+mytop = udtaf(Top2())
+
+env_settings = 
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
+table_env = TableEnvironment.create(env_settings)
+t = table_env.from_elements([(1, 'Hi', 'Hello'),
+                              (3, 'Hi', 'hi'),
+                              (5, 'Hi2', 'hi'),
+                              (7, 'Hi', 'Hello'),
+                              (2, 'Hi', 'Hello')], ['a', 'b', 'c'])
+result = t.select(col('a'), col('c')) \
+    .group_by(col('c')) \
+    .flat_aggregate(mytop) \
+    .select(col('b')) \
+    .flat_aggregate(mytop.alias("b")) \
+    .select(col('b'))
+
+result.to_pandas()
+# the result is
+#    b
+# 0  7
+# 1  5
+```
diff --git a/docs/content.zh/docs/dev/python/table/python_types.md 
b/docs/content.zh/docs/dev/python/table/python_types.md
index c65c646..b61989f 100644
--- a/docs/content.zh/docs/dev/python/table/python_types.md
+++ b/docs/content.zh/docs/dev/python/table/python_types.md
@@ -1,6 +1,6 @@
 ---
 title: "数据类型"
-weight: 31
+weight: 32
 type: docs
 aliases:
   - /zh/dev/python/table-api-users-guide/python_types.html
@@ -69,4 +69,4 @@ Python Table API的用户可以在Python Table API中,或者定义Python用户
 | `ARRAY` | `list` | `numpy.ndarray` |
 | `MULTISET` | `list` | `Not Supported Yet` |
 | `MAP` | `dict` | `Not Supported Yet` |
-| `ROW` | `Row` | `dict` |
\ No newline at end of file
+| `ROW` | `Row` | `dict` |
diff --git a/docs/content.zh/docs/dev/table/tableApi.md 
b/docs/content.zh/docs/dev/table/tableApi.md
index 9725c15..8030076 100644
--- a/docs/content.zh/docs/dev/table/tableApi.md
+++ b/docs/content.zh/docs/dev/table/tableApi.md
@@ -2211,8 +2211,8 @@ def map_function(a: Row) -> Row:
 # map operation with a python general scalar function
 func = udf(map_function, result_type=DataTypes.ROW(
                                      [DataTypes.FIELD("a", DataTypes.BIGINT()),
-                                      DataTypes.FIELD("b", 
DataTypes.BIGINT()))]))
-table = input.map(map_function).alias('a', 'b')
+                                      DataTypes.FIELD("b", 
DataTypes.BIGINT())]))
+table = input.map(func).alias('a', 'b')
 
 # map operation with a python vectorized scalar function
 pandas_func = udf(lambda x: x * 2, result_type=DataTypes.ROW(
@@ -2414,11 +2414,9 @@ from pyflink.table.udf import AggregateFunction, udaf
 class CountAndSumAggregateFunction(AggregateFunction):
 
     def get_value(self, accumulator):
-        from pyflink.common import Row
         return Row(accumulator[0], accumulator[1])
 
     def create_accumulator(self):
-        from pyflink.common import Row
         return Row(0, 0)
 
     def accumulate(self, accumulator, *args):
@@ -2461,8 +2459,7 @@ pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
                        [DataTypes.FIELD("a", DataTypes.FLOAT()),
                         DataTypes.FIELD("b", DataTypes.INT())]),
                    func_type="pandas")
-t.select(t.b) \
-    .aggregate(pandas_udaf.alias("a", "b")) \
+t.aggregate(pandas_udaf.alias("a", "b")) \
     .select("a, b")
 
 ```
diff --git a/docs/content/docs/dev/python/table/operations.md 
b/docs/content/docs/dev/python/table/operations/_index.md
similarity index 81%
copy from docs/content/docs/dev/python/table/operations.md
copy to docs/content/docs/dev/python/table/operations/_index.md
index 05b8de6..ac1e49b 100644
--- a/docs/content/docs/dev/python/table/operations.md
+++ b/docs/content/docs/dev/python/table/operations/_index.md
@@ -1,9 +1,7 @@
 ---
-title: "Operations"
+title: Operations
+bookCollapseSection: true
 weight: 30
-type: docs
-aliases:
-- /dev/python/table-api-users-guide/operations.html
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -23,5 +21,3 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 -->
-
-<meta http-equiv="refresh" content="0; url={{< ref "docs/dev/table/tableapi" 
>}} "/>
diff --git a/docs/content/docs/dev/python/table/operations.md 
b/docs/content/docs/dev/python/table/operations/operations.md
similarity index 97%
rename from docs/content/docs/dev/python/table/operations.md
rename to docs/content/docs/dev/python/table/operations/operations.md
index 05b8de6..fe5aabb 100644
--- a/docs/content/docs/dev/python/table/operations.md
+++ b/docs/content/docs/dev/python/table/operations/operations.md
@@ -1,5 +1,5 @@
 ---
-title: "Operations"
+title: "Overview"
 weight: 30
 type: docs
 aliases:
diff --git 
a/docs/content/docs/dev/python/table/operations/row_based_operations.md 
b/docs/content/docs/dev/python/table/operations/row_based_operations.md
new file mode 100644
index 0000000..56e69d8
--- /dev/null
+++ b/docs/content/docs/dev/python/table/operations/row_based_operations.md
@@ -0,0 +1,289 @@
+---
+title: "Row-based Operations"
+weight: 31
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Row-based Operations
+
+This page describes how to use row-based operations in PyFlink Table API.
+
+## Map
+
+Performs a `map` operation with a python [general scalar function]({{< ref 
"docs/dev/python/table/udfs/python_udfs" >}}#scalar-functions) or [vectorized 
scalar function]({{< ref "docs/dev/python/table/udfs/vectorized_python_udfs" 
>}}#vectorized-scalar-functions).
+The output will be flattened if the output type is a composite type.
+
+```python
+from pyflink.common import Row
+from pyflink.table import EnvironmentSettings, TableEnvironment
+from pyflink.table.expressions import col
+from pyflink.table.types import DataTypes
+
+env_settings = 
EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
+table_env = TableEnvironment.create(env_settings)
+
+table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
+
+@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
+                               DataTypes.FIELD("data", DataTypes.STRING())]))
+def func1(id: int, data: str) -> Row:
+    return Row(id, data * 2)
+
+# the input columns are specified as the inputs
+table.map(func1(col('id'), col('data'))).to_pandas()
+# result is 
+#    _c0         _c1
+#  0    1        HiHi
+#  1    2  HelloHello
+```
+
+It also supports taking a Row object (containing all the columns of the input 
table) as input.
+
+```python
+@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
+                               DataTypes.FIELD("data", DataTypes.STRING())]))
+def func2(data: Row) -> Row:
+    return Row(data[0], data[1] * 2)
+
+# specify the function without the input columns
+table.map(func2).alias('id', 'data').to_pandas()
+# result is 
+#      id        data
+#  0    1        HiHi
+#  1    2  HelloHello
+```
+
+<span class="label label-info">Note</span> The input columns should not be 
specified when using func2 in the map operation.
+
+It also supports using a [vectorized scalar function]({{< ref 
"docs/dev/python/table/udfs/vectorized_python_udfs" 
>}}#vectorized-scalar-functions) in the `map` operation.
+It should be noted that the input type and output type should be 
pandas.DataFrame instead of Row in this case.
+
+```python
+import pandas as pd
+@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
+                               DataTypes.FIELD("data", DataTypes.STRING())]),
+                               func_type='pandas')
+def func3(data: pd.DataFrame) -> pd.DataFrame:
+    res = pd.concat([data.id, data.data * 2], axis=1)
+    return res
+
+table.map(func3).alias('id', 'data').to_pandas()
+# result is 
+#      id        data
+#  0    1        HiHi
+#  1    2  HelloHello
+```
+
+
+## FlatMap
+
+Performs a `flat_map` operation with a python [table function]({{< ref 
"docs/dev/python/table/udfs/python_udfs" >}}#table-functions).
+
+```python
+from pyflink.common import Row
+from pyflink.table.udf import udtf
+from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
+
+env_settings = 
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
+table_env = TableEnvironment.create(env_settings)
+
+table = table_env.from_elements([(1, 'Hi,Flink'), (2, 'Hello')], ['id', 
'data'])
+
+@udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
+def split(x: Row) -> Row:
+    for s in x[1].split(","):
+        yield x[0], s
+
+# use split in `flat_map`
+table.flat_map(split).to_pandas()
+# result is
+#    f0       f1
+# 0   1       Hi
+# 1   1    Flink
+# 2   2    Hello
+```
+
+The python [table function]({{< ref "docs/dev/python/table/udfs/python_udfs" 
>}}#table-functions) could also be used in `join_lateral` and 
`left_outer_join_lateral`.
+
+```python
+# use table function in `join_lateral` or `left_outer_join_lateral`
+table.join_lateral(split.alias('a', 'b')).to_pandas()
+# result is 
+#    id      data  a      b
+# 0   1  Hi,Flink  1     Hi
+# 1   1  Hi,Flink  1  Flink
+# 2   2     Hello  2  Hello
+```
+
+## Aggregate
+
+Performs an `aggregate` operation with a python [general aggregate 
function]({{< ref "docs/dev/python/table/udfs/python_udfs" 
>}}#aggregate-functions) or [vectorized aggregate function]({{< ref 
"docs/dev/python/table/udfs/vectorized_python_udfs" 
>}}#vectorized-aggregate-functions).
+
+```python
+from pyflink.common import Row
+from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
+from pyflink.table.expressions import col
+from pyflink.table.udf import AggregateFunction, udaf
+
+class CountAndSumAggregateFunction(AggregateFunction):
+
+    def get_value(self, accumulator):
+        return Row(accumulator[0], accumulator[1])
+
+    def create_accumulator(self):
+        return Row(0, 0)
+
+    def accumulate(self, accumulator, *args):
+        accumulator[0] += 1
+        accumulator[1] += args[0][1]
+
+    def retract(self, accumulator, *args):
+        accumulator[0] -= 1
+        accumulator[1] -= args[0][1]
+
+    def merge(self, accumulator, accumulators):
+        for other_acc in accumulators:
+            accumulator[0] += other_acc[0]
+            accumulator[1] += other_acc[1]
+
+    def get_accumulator_type(self):
+        return DataTypes.ROW(
+            [DataTypes.FIELD("a", DataTypes.BIGINT()),
+             DataTypes.FIELD("b", DataTypes.BIGINT())])
+
+    def get_result_type(self):
+        return DataTypes.ROW(
+            [DataTypes.FIELD("a", DataTypes.BIGINT()),
+             DataTypes.FIELD("b", DataTypes.BIGINT())])
+
+function = CountAndSumAggregateFunction()
+agg = udaf(function,
+           result_type=function.get_result_type(),
+           accumulator_type=function.get_accumulator_type(),
+           name=str(function.__class__.__name__))
+
+# aggregate with a python general aggregate function
+
+env_settings = 
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
+table_env = TableEnvironment.create(env_settings)
+t = table_env.from_elements([(1, 2), (2, 1), (1, 3)], ['a', 'b'])
+
+result = t.group_by(col('a')) \
+    .aggregate(agg.alias("c", "d")) \
+    .select(col('a'), col('c'), col('d'))
+result.to_pandas()
+
+# the result is
+#    a  c  d
+# 0  1  2  5
+# 1  2  1  1
+
+# aggregate with a python vectorized aggregate function
+env_settings = 
EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
+table_env = TableEnvironment.create(env_settings)
+
+t = table_env.from_elements([(1, 2), (2, 1), (1, 3)], ['a', 'b'])
+
+pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
+                   result_type=DataTypes.ROW(
+                       [DataTypes.FIELD("a", DataTypes.FLOAT()),
+                        DataTypes.FIELD("b", DataTypes.INT())]),
+                   func_type="pandas")
+t.aggregate(pandas_udaf.alias("a", "b")) \
+    .select(col('a'), col('b')).to_pandas()
+
+# the result is
+#      a  b
+# 0  2.0  3
+```
+
+<span class="label label-info">Note</span> Similar to the `map` operation, if you 
specify the aggregate function without the input columns in the `aggregate` 
operation, it will take a Row or pandas.DataFrame as input, which contains all the 
columns of the input table including the grouping keys.
+<span class="label label-info">Note</span> You have to close the `aggregate` 
with a select statement, and the select statement should not contain aggregate 
functions.
+Besides, the output of aggregate will be flattened if it is a composite type.
+
+## FlatAggregate
+
+Performs a `flat_aggregate` operation with a python general [Table Aggregate 
Function]({{< ref "docs/dev/python/table/udfs/python_udfs" 
>}}#table-aggregate-functions).
+
+Similar to `GroupBy Aggregation`, `FlatAggregate` groups the inputs on the 
grouping keys.
+Different from `AggregateFunction`, `TableAggregateFunction` could return 0, 
1, or more records for a grouping key.
+Similar to `aggregate`, you have to close the `flat_aggregate` with a select 
statement and the select statement should not contain aggregate functions.
+
+```python
+from pyflink.common import Row
+from pyflink.table.expressions import col
+from pyflink.table.udf import TableAggregateFunction, udtaf
+from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
+
+class Top2(TableAggregateFunction):
+
+    def emit_value(self, accumulator):
+        yield Row(accumulator[0])
+        yield Row(accumulator[1])
+
+    def create_accumulator(self):
+        return [None, None]
+
+    def accumulate(self, accumulator, *args):
+        if args[0][0] is not None:
+            if accumulator[0] is None or args[0][0] > accumulator[0]:
+                accumulator[1] = accumulator[0]
+                accumulator[0] = args[0][0]
+            elif accumulator[1] is None or args[0][0] > accumulator[1]:
+                accumulator[1] = args[0][0]
+
+    def retract(self, accumulator, *args):
+        accumulator[0] = accumulator[0] - 1
+
+    def merge(self, accumulator, accumulators):
+        for other_acc in accumulators:
+            self.accumulate(accumulator, other_acc[0])
+            self.accumulate(accumulator, other_acc[1])
+
+    def get_accumulator_type(self):
+        return DataTypes.ARRAY(DataTypes.BIGINT())
+
+    def get_result_type(self):
+        return DataTypes.ROW(
+            [DataTypes.FIELD("a", DataTypes.BIGINT())])
+
+mytop = udtaf(Top2())
+
+env_settings = 
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
+table_env = TableEnvironment.create(env_settings)
+t = table_env.from_elements([(1, 'Hi', 'Hello'),
+                              (3, 'Hi', 'hi'),
+                              (5, 'Hi2', 'hi'),
+                              (7, 'Hi', 'Hello'),
+                              (2, 'Hi', 'Hello')], ['a', 'b', 'c'])
+result = t.select(col('a'), col('c')) \
+    .group_by(col('c')) \
+    .flat_aggregate(mytop) \
+    .select(col('b')) \
+    .flat_aggregate(mytop.alias("b")) \
+    .select(col('b'))
+
+result.to_pandas()
+# the result is
+#    b
+# 0  7
+# 1  5
+```
diff --git a/docs/content/docs/dev/python/table/python_types.md 
b/docs/content/docs/dev/python/table/python_types.md
index ff5e101..f1ec4fa 100644
--- a/docs/content/docs/dev/python/table/python_types.md
+++ b/docs/content/docs/dev/python/table/python_types.md
@@ -1,6 +1,6 @@
 ---
 title: "Data Types"
-weight: 31
+weight: 32
 type: docs
 aliases:
   - /dev/python/table-api-users-guide/python_types.html
diff --git a/docs/content/docs/dev/table/tableApi.md 
b/docs/content/docs/dev/table/tableApi.md
index 6f25389..4ca942b 100644
--- a/docs/content/docs/dev/table/tableApi.md
+++ b/docs/content/docs/dev/table/tableApi.md
@@ -2211,8 +2211,8 @@ def map_function(a: Row) -> Row:
 # map operation with a python general scalar function
 func = udf(map_function, result_type=DataTypes.ROW(
                                      [DataTypes.FIELD("a", DataTypes.BIGINT()),
-                                      DataTypes.FIELD("b", 
DataTypes.BIGINT()))]))
-table = input.map(map_function).alias('a', 'b')
+                                      DataTypes.FIELD("b", 
DataTypes.BIGINT())]))
+table = input.map(func).alias('a', 'b')
 
 # map operation with a python vectorized scalar function
 pandas_func = udf(lambda x: x * 2, result_type=DataTypes.ROW(
@@ -2414,11 +2414,9 @@ from pyflink.table.udf import AggregateFunction, udaf
 class CountAndSumAggregateFunction(AggregateFunction):
 
     def get_value(self, accumulator):
-        from pyflink.common import Row
         return Row(accumulator[0], accumulator[1])
 
     def create_accumulator(self):
-        from pyflink.common import Row
         return Row(0, 0)
 
     def accumulate(self, accumulator, *args):
@@ -2461,8 +2459,7 @@ pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
                        [DataTypes.FIELD("a", DataTypes.FLOAT()),
                         DataTypes.FIELD("b", DataTypes.INT())]),
                    func_type="pandas")
-t.select(t.b) \
-    .aggregate(pandas_udaf.alias("a", "b")) \
+t.aggregate(pandas_udaf.alias("a", "b")) \
     .select("a, b")
 
 ```

Reply via email to