This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 73251c5 [FLINK-22445][python][docs] Add documentation for row-based
operations
73251c5 is described below
commit 73251c504c67dd98a137a6aea81e53c837e841e3
Author: huangxingbo <[email protected]>
AuthorDate: Mon Apr 26 10:47:07 2021 +0800
[FLINK-22445][python][docs] Add documentation for row-based operations
This closes #15757.
---
.../docs/dev/python/table/operations/_index.md} | 8 +-
.../python/table/{ => operations}/operations.md | 2 +-
.../table/operations/row_based_operations.md | 289 +++++++++++++++++++++
.../docs/dev/python/table/python_types.md | 4 +-
docs/content.zh/docs/dev/table/tableApi.md | 9 +-
.../table/{operations.md => operations/_index.md} | 8 +-
.../python/table/{ => operations}/operations.md | 2 +-
.../table/operations/row_based_operations.md | 289 +++++++++++++++++++++
docs/content/docs/dev/python/table/python_types.md | 2 +-
docs/content/docs/dev/table/tableApi.md | 9 +-
10 files changed, 593 insertions(+), 29 deletions(-)
diff --git a/docs/content/docs/dev/python/table/operations.md
b/docs/content.zh/docs/dev/python/table/operations/_index.md
similarity index 81%
copy from docs/content/docs/dev/python/table/operations.md
copy to docs/content.zh/docs/dev/python/table/operations/_index.md
index 05b8de6..ac1e49b 100644
--- a/docs/content/docs/dev/python/table/operations.md
+++ b/docs/content.zh/docs/dev/python/table/operations/_index.md
@@ -1,9 +1,7 @@
---
-title: "Operations"
+title: Operations
+bookCollapseSection: true
weight: 30
-type: docs
-aliases:
-- /dev/python/table-api-users-guide/operations.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
@@ -23,5 +21,3 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
-
-<meta http-equiv="refresh" content="0; url={{< ref "docs/dev/table/tableapi"
>}} "/>
diff --git a/docs/content.zh/docs/dev/python/table/operations.md
b/docs/content.zh/docs/dev/python/table/operations/operations.md
similarity index 97%
rename from docs/content.zh/docs/dev/python/table/operations.md
rename to docs/content.zh/docs/dev/python/table/operations/operations.md
index 1928c25..80e156e 100644
--- a/docs/content.zh/docs/dev/python/table/operations.md
+++ b/docs/content.zh/docs/dev/python/table/operations/operations.md
@@ -1,5 +1,5 @@
---
-title: "Operations"
+title: "Overview"
weight: 30
type: docs
aliases:
diff --git
a/docs/content.zh/docs/dev/python/table/operations/row_based_operations.md
b/docs/content.zh/docs/dev/python/table/operations/row_based_operations.md
new file mode 100644
index 0000000..56e69d8
--- /dev/null
+++ b/docs/content.zh/docs/dev/python/table/operations/row_based_operations.md
@@ -0,0 +1,289 @@
+---
+title: "Row-based Operations"
+weight: 31
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Row-based Operations
+
+This page describes how to use row-based operations in the PyFlink Table API.
+
+## Map
+
+Performs a `map` operation with a Python [general scalar function]({{< ref "docs/dev/python/table/udfs/python_udfs" >}}#scalar-functions) or [vectorized scalar function]({{< ref "docs/dev/python/table/udfs/vectorized_python_udfs" >}}#vectorized-scalar-functions).
+The output will be flattened if the output type is a composite type.
+
+```python
+from pyflink.common import Row
+from pyflink.table import EnvironmentSettings, TableEnvironment
+from pyflink.table.expressions import col
+from pyflink.table.types import DataTypes
+
+env_settings =
EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
+table_env = TableEnvironment.create(env_settings)
+
+table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
+
+@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
+ DataTypes.FIELD("data", DataTypes.STRING())]))
+def func1(id: int, data: str) -> Row:
+ return Row(id, data * 2)
+
+# the input columns are specified as the inputs
+table.map(func1(col('id'), col('data'))).to_pandas()
+# result is
+# _c0 _c1
+# 0 1 HiHi
+# 1 2 HelloHello
+```
+
+It also supports taking a `Row` object (containing all the columns of the input table) as input.
+
+```python
+@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
+ DataTypes.FIELD("data", DataTypes.STRING())]))
+def func2(data: Row) -> Row:
+ return Row(data[0], data[1] * 2)
+
+# specify the function without the input columns
+table.map(func2).alias('id', 'data').to_pandas()
+# result is
+# id data
+# 0 1 HiHi
+# 1 2 HelloHello
+```
+
+<span class="label label-info">Note</span> The input columns should not be specified when using `func2` in the `map` operation.
+
+It also supports using a [vectorized scalar function]({{< ref "docs/dev/python/table/udfs/vectorized_python_udfs" >}}#vectorized-scalar-functions) in the `map` operation.
+Note that in this case the input and output types should be `pandas.DataFrame` instead of `Row`.
+
+```python
+import pandas as pd
+@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
+ DataTypes.FIELD("data", DataTypes.STRING())]),
+ func_type='pandas')
+def func3(data: pd.DataFrame) -> pd.DataFrame:
+ res = pd.concat([data.id, data.data * 2], axis=1)
+ return res
+
+table.map(func3).alias('id', 'data').to_pandas()
+# result is
+# id data
+# 0 1 HiHi
+# 1 2 HelloHello
+```
+
+
+## FlatMap
+
+Performs a `flat_map` operation with a Python [table function]({{< ref "docs/dev/python/table/udfs/python_udfs" >}}#table-functions).
+
+```python
+from pyflink.common import Row
+from pyflink.table.udf import udtf
+from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
+
+env_settings =
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
+table_env = TableEnvironment.create(env_settings)
+
+table = table_env.from_elements([(1, 'Hi,Flink'), (2, 'Hello')], ['id',
'data'])
+
+@udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
+def split(x: Row) -> Row:
+ for s in x[1].split(","):
+ yield x[0], s
+
+# use split in `flat_map`
+table.flat_map(split).to_pandas()
+# result is
+# f0 f1
+# 0 1 Hi
+# 1 1 Flink
+# 2 2 Hello
+```
+
+The Python [table function]({{< ref "docs/dev/python/table/udfs/python_udfs" >}}#table-functions) can also be used in `join_lateral` and `left_outer_join_lateral`.
+
+```python
+# use table function in `join_lateral` or `left_outer_join_lateral`
+table.join_lateral(split.alias('a', 'b')).to_pandas()
+# result is
+# id data a b
+# 0 1 Hi,Flink 1 Hi
+# 1 1 Hi,Flink 1 Flink
+# 2 2 Hello 2 Hello
+```
+
+## Aggregate
+
+Performs an `aggregate` operation with a Python [general aggregate function]({{< ref "docs/dev/python/table/udfs/python_udfs" >}}#aggregate-functions) or [vectorized aggregate function]({{< ref "docs/dev/python/table/udfs/vectorized_python_udfs" >}}#vectorized-aggregate-functions).
+
+```python
+from pyflink.common import Row
+from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
+from pyflink.table.expressions import col
+from pyflink.table.udf import AggregateFunction, udaf
+
+class CountAndSumAggregateFunction(AggregateFunction):
+
+ def get_value(self, accumulator):
+ return Row(accumulator[0], accumulator[1])
+
+ def create_accumulator(self):
+ return Row(0, 0)
+
+ def accumulate(self, accumulator, *args):
+ accumulator[0] += 1
+ accumulator[1] += args[0][1]
+
+ def retract(self, accumulator, *args):
+ accumulator[0] -= 1
+ accumulator[1] -= args[0][1]
+
+ def merge(self, accumulator, accumulators):
+ for other_acc in accumulators:
+ accumulator[0] += other_acc[0]
+ accumulator[1] += other_acc[1]
+
+ def get_accumulator_type(self):
+ return DataTypes.ROW(
+ [DataTypes.FIELD("a", DataTypes.BIGINT()),
+ DataTypes.FIELD("b", DataTypes.BIGINT())])
+
+ def get_result_type(self):
+ return DataTypes.ROW(
+ [DataTypes.FIELD("a", DataTypes.BIGINT()),
+ DataTypes.FIELD("b", DataTypes.BIGINT())])
+
+function = CountAndSumAggregateFunction()
+agg = udaf(function,
+ result_type=function.get_result_type(),
+ accumulator_type=function.get_accumulator_type(),
+ name=str(function.__class__.__name__))
+
+# aggregate with a python general aggregate function
+
+env_settings =
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
+table_env = TableEnvironment.create(env_settings)
+t = table_env.from_elements([(1, 2), (2, 1), (1, 3)], ['a', 'b'])
+
+result = t.group_by(col('a')) \
+ .aggregate(agg.alias("c", "d")) \
+ .select(col('a'), col('c'), col('d'))
+result.to_pandas()
+
+# the result is
+# a c d
+# 0 1 2 5
+# 1 2 1 1
+
+# aggregate with a python vectorized aggregate function
+env_settings =
EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
+table_env = TableEnvironment.create(env_settings)
+
+t = table_env.from_elements([(1, 2), (2, 1), (1, 3)], ['a', 'b'])
+
+pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
+ result_type=DataTypes.ROW(
+ [DataTypes.FIELD("a", DataTypes.FLOAT()),
+ DataTypes.FIELD("b", DataTypes.INT())]),
+ func_type="pandas")
+t.aggregate(pandas_udaf.alias("a", "b")) \
+ .select(col('a'), col('b')).to_pandas()
+
+# the result is
+# a b
+# 0 2.0 3
+```
+
+<span class="label label-info">Note</span> Similar to the `map` operation, if you specify the aggregate function without the input columns in the `aggregate` operation, it will take a `Row` or `pandas.DataFrame` as input, which contains all the columns of the input table, including the grouping keys.
+<span class="label label-info">Note</span> You have to close the `aggregate` operation with a select statement, and that select statement should not contain aggregate functions.
+Besides, the output of `aggregate` will be flattened if it is a composite type.
+
+## FlatAggregate
+
+Performs a `flat_aggregate` operation with a Python general [Table Aggregate Function]({{< ref "docs/dev/python/table/udfs/python_udfs" >}}#table-aggregate-functions).
+
+Similar to `GroupBy Aggregation`, `FlatAggregate` groups the inputs on the grouping keys.
+Unlike `AggregateFunction`, `TableAggregateFunction` can return 0, 1, or more records for a grouping key.
+Similar to `aggregate`, you have to close the `flat_aggregate` with a select statement, and that select statement should not contain aggregate functions.
+
+```python
+from pyflink.common import Row
+from pyflink.table.expressions import col
+from pyflink.table.udf import TableAggregateFunction, udtaf
+from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
+
+class Top2(TableAggregateFunction):
+
+ def emit_value(self, accumulator):
+ yield Row(accumulator[0])
+ yield Row(accumulator[1])
+
+ def create_accumulator(self):
+ return [None, None]
+
+ def accumulate(self, accumulator, *args):
+ if args[0][0] is not None:
+ if accumulator[0] is None or args[0][0] > accumulator[0]:
+ accumulator[1] = accumulator[0]
+ accumulator[0] = args[0][0]
+ elif accumulator[1] is None or args[0][0] > accumulator[1]:
+ accumulator[1] = args[0][0]
+
+ def retract(self, accumulator, *args):
+ accumulator[0] = accumulator[0] - 1
+
+ def merge(self, accumulator, accumulators):
+ for other_acc in accumulators:
+ self.accumulate(accumulator, other_acc[0])
+ self.accumulate(accumulator, other_acc[1])
+
+ def get_accumulator_type(self):
+ return DataTypes.ARRAY(DataTypes.BIGINT())
+
+ def get_result_type(self):
+ return DataTypes.ROW(
+ [DataTypes.FIELD("a", DataTypes.BIGINT())])
+
+mytop = udtaf(Top2())
+
+env_settings =
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
+table_env = TableEnvironment.create(env_settings)
+t = table_env.from_elements([(1, 'Hi', 'Hello'),
+ (3, 'Hi', 'hi'),
+ (5, 'Hi2', 'hi'),
+ (7, 'Hi', 'Hello'),
+ (2, 'Hi', 'Hello')], ['a', 'b', 'c'])
+result = t.select(col('a'), col('c')) \
+ .group_by(col('c')) \
+ .flat_aggregate(mytop) \
+ .select(col('b')) \
+ .flat_aggregate(mytop.alias("b")) \
+ .select(col('b'))
+
+result.to_pandas()
+# the result is
+# b
+# 0 7
+# 1 5
+```
diff --git a/docs/content.zh/docs/dev/python/table/python_types.md
b/docs/content.zh/docs/dev/python/table/python_types.md
index c65c646..b61989f 100644
--- a/docs/content.zh/docs/dev/python/table/python_types.md
+++ b/docs/content.zh/docs/dev/python/table/python_types.md
@@ -1,6 +1,6 @@
---
title: "数据类型"
-weight: 31
+weight: 32
type: docs
aliases:
- /zh/dev/python/table-api-users-guide/python_types.html
@@ -69,4 +69,4 @@ Python Table API的用户可以在Python Table API中,或者定义Python用户
| `ARRAY` | `list` | `numpy.ndarray` |
| `MULTISET` | `list` | `Not Supported Yet` |
| `MAP` | `dict` | `Not Supported Yet` |
-| `ROW` | `Row` | `dict` |
\ No newline at end of file
+| `ROW` | `Row` | `dict` |
diff --git a/docs/content.zh/docs/dev/table/tableApi.md
b/docs/content.zh/docs/dev/table/tableApi.md
index 9725c15..8030076 100644
--- a/docs/content.zh/docs/dev/table/tableApi.md
+++ b/docs/content.zh/docs/dev/table/tableApi.md
@@ -2211,8 +2211,8 @@ def map_function(a: Row) -> Row:
# map operation with a python general scalar function
func = udf(map_function, result_type=DataTypes.ROW(
[DataTypes.FIELD("a", DataTypes.BIGINT()),
- DataTypes.FIELD("b",
DataTypes.BIGINT()))]))
-table = input.map(map_function).alias('a', 'b')
+ DataTypes.FIELD("b",
DataTypes.BIGINT())]))
+table = input.map(func).alias('a', 'b')
# map operation with a python vectorized scalar function
pandas_func = udf(lambda x: x * 2, result_type=DataTypes.ROW(
@@ -2414,11 +2414,9 @@ from pyflink.table.udf import AggregateFunction, udaf
class CountAndSumAggregateFunction(AggregateFunction):
def get_value(self, accumulator):
- from pyflink.common import Row
return Row(accumulator[0], accumulator[1])
def create_accumulator(self):
- from pyflink.common import Row
return Row(0, 0)
def accumulate(self, accumulator, *args):
@@ -2461,8 +2459,7 @@ pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
[DataTypes.FIELD("a", DataTypes.FLOAT()),
DataTypes.FIELD("b", DataTypes.INT())]),
func_type="pandas")
-t.select(t.b) \
- .aggregate(pandas_udaf.alias("a", "b")) \
+t.aggregate(pandas_udaf.alias("a", "b")) \
.select("a, b")
```
diff --git a/docs/content/docs/dev/python/table/operations.md
b/docs/content/docs/dev/python/table/operations/_index.md
similarity index 81%
copy from docs/content/docs/dev/python/table/operations.md
copy to docs/content/docs/dev/python/table/operations/_index.md
index 05b8de6..ac1e49b 100644
--- a/docs/content/docs/dev/python/table/operations.md
+++ b/docs/content/docs/dev/python/table/operations/_index.md
@@ -1,9 +1,7 @@
---
-title: "Operations"
+title: Operations
+bookCollapseSection: true
weight: 30
-type: docs
-aliases:
-- /dev/python/table-api-users-guide/operations.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
@@ -23,5 +21,3 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
-
-<meta http-equiv="refresh" content="0; url={{< ref "docs/dev/table/tableapi"
>}} "/>
diff --git a/docs/content/docs/dev/python/table/operations.md
b/docs/content/docs/dev/python/table/operations/operations.md
similarity index 97%
rename from docs/content/docs/dev/python/table/operations.md
rename to docs/content/docs/dev/python/table/operations/operations.md
index 05b8de6..fe5aabb 100644
--- a/docs/content/docs/dev/python/table/operations.md
+++ b/docs/content/docs/dev/python/table/operations/operations.md
@@ -1,5 +1,5 @@
---
-title: "Operations"
+title: "Overview"
weight: 30
type: docs
aliases:
diff --git
a/docs/content/docs/dev/python/table/operations/row_based_operations.md
b/docs/content/docs/dev/python/table/operations/row_based_operations.md
new file mode 100644
index 0000000..56e69d8
--- /dev/null
+++ b/docs/content/docs/dev/python/table/operations/row_based_operations.md
@@ -0,0 +1,289 @@
+---
+title: "Row-based Operations"
+weight: 31
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Row-based Operations
+
+This page describes how to use row-based operations in the PyFlink Table API.
+
+## Map
+
+Performs a `map` operation with a Python [general scalar function]({{< ref "docs/dev/python/table/udfs/python_udfs" >}}#scalar-functions) or [vectorized scalar function]({{< ref "docs/dev/python/table/udfs/vectorized_python_udfs" >}}#vectorized-scalar-functions).
+The output will be flattened if the output type is a composite type.
+
+```python
+from pyflink.common import Row
+from pyflink.table import EnvironmentSettings, TableEnvironment
+from pyflink.table.expressions import col
+from pyflink.table.types import DataTypes
+
+env_settings =
EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
+table_env = TableEnvironment.create(env_settings)
+
+table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
+
+@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
+ DataTypes.FIELD("data", DataTypes.STRING())]))
+def func1(id: int, data: str) -> Row:
+ return Row(id, data * 2)
+
+# the input columns are specified as the inputs
+table.map(func1(col('id'), col('data'))).to_pandas()
+# result is
+# _c0 _c1
+# 0 1 HiHi
+# 1 2 HelloHello
+```
+
+It also supports taking a `Row` object (containing all the columns of the input table) as input.
+
+```python
+@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
+ DataTypes.FIELD("data", DataTypes.STRING())]))
+def func2(data: Row) -> Row:
+ return Row(data[0], data[1] * 2)
+
+# specify the function without the input columns
+table.map(func2).alias('id', 'data').to_pandas()
+# result is
+# id data
+# 0 1 HiHi
+# 1 2 HelloHello
+```
+
+<span class="label label-info">Note</span> The input columns should not be specified when using `func2` in the `map` operation.
+
+It also supports using a [vectorized scalar function]({{< ref "docs/dev/python/table/udfs/vectorized_python_udfs" >}}#vectorized-scalar-functions) in the `map` operation.
+Note that in this case the input and output types should be `pandas.DataFrame` instead of `Row`.
+
+```python
+import pandas as pd
+@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
+ DataTypes.FIELD("data", DataTypes.STRING())]),
+ func_type='pandas')
+def func3(data: pd.DataFrame) -> pd.DataFrame:
+ res = pd.concat([data.id, data.data * 2], axis=1)
+ return res
+
+table.map(func3).alias('id', 'data').to_pandas()
+# result is
+# id data
+# 0 1 HiHi
+# 1 2 HelloHello
+```
+
+
+## FlatMap
+
+Performs a `flat_map` operation with a Python [table function]({{< ref "docs/dev/python/table/udfs/python_udfs" >}}#table-functions).
+
+```python
+from pyflink.common import Row
+from pyflink.table.udf import udtf
+from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
+
+env_settings =
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
+table_env = TableEnvironment.create(env_settings)
+
+table = table_env.from_elements([(1, 'Hi,Flink'), (2, 'Hello')], ['id',
'data'])
+
+@udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
+def split(x: Row) -> Row:
+ for s in x[1].split(","):
+ yield x[0], s
+
+# use split in `flat_map`
+table.flat_map(split).to_pandas()
+# result is
+# f0 f1
+# 0 1 Hi
+# 1 1 Flink
+# 2 2 Hello
+```
+
+The Python [table function]({{< ref "docs/dev/python/table/udfs/python_udfs" >}}#table-functions) can also be used in `join_lateral` and `left_outer_join_lateral`.
+
+```python
+# use table function in `join_lateral` or `left_outer_join_lateral`
+table.join_lateral(split.alias('a', 'b')).to_pandas()
+# result is
+# id data a b
+# 0 1 Hi,Flink 1 Hi
+# 1 1 Hi,Flink 1 Flink
+# 2 2 Hello 2 Hello
+```
+
+## Aggregate
+
+Performs an `aggregate` operation with a Python [general aggregate function]({{< ref "docs/dev/python/table/udfs/python_udfs" >}}#aggregate-functions) or [vectorized aggregate function]({{< ref "docs/dev/python/table/udfs/vectorized_python_udfs" >}}#vectorized-aggregate-functions).
+
+```python
+from pyflink.common import Row
+from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
+from pyflink.table.expressions import col
+from pyflink.table.udf import AggregateFunction, udaf
+
+class CountAndSumAggregateFunction(AggregateFunction):
+
+ def get_value(self, accumulator):
+ return Row(accumulator[0], accumulator[1])
+
+ def create_accumulator(self):
+ return Row(0, 0)
+
+ def accumulate(self, accumulator, *args):
+ accumulator[0] += 1
+ accumulator[1] += args[0][1]
+
+ def retract(self, accumulator, *args):
+ accumulator[0] -= 1
+ accumulator[1] -= args[0][1]
+
+ def merge(self, accumulator, accumulators):
+ for other_acc in accumulators:
+ accumulator[0] += other_acc[0]
+ accumulator[1] += other_acc[1]
+
+ def get_accumulator_type(self):
+ return DataTypes.ROW(
+ [DataTypes.FIELD("a", DataTypes.BIGINT()),
+ DataTypes.FIELD("b", DataTypes.BIGINT())])
+
+ def get_result_type(self):
+ return DataTypes.ROW(
+ [DataTypes.FIELD("a", DataTypes.BIGINT()),
+ DataTypes.FIELD("b", DataTypes.BIGINT())])
+
+function = CountAndSumAggregateFunction()
+agg = udaf(function,
+ result_type=function.get_result_type(),
+ accumulator_type=function.get_accumulator_type(),
+ name=str(function.__class__.__name__))
+
+# aggregate with a python general aggregate function
+
+env_settings =
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
+table_env = TableEnvironment.create(env_settings)
+t = table_env.from_elements([(1, 2), (2, 1), (1, 3)], ['a', 'b'])
+
+result = t.group_by(col('a')) \
+ .aggregate(agg.alias("c", "d")) \
+ .select(col('a'), col('c'), col('d'))
+result.to_pandas()
+
+# the result is
+# a c d
+# 0 1 2 5
+# 1 2 1 1
+
+# aggregate with a python vectorized aggregate function
+env_settings =
EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
+table_env = TableEnvironment.create(env_settings)
+
+t = table_env.from_elements([(1, 2), (2, 1), (1, 3)], ['a', 'b'])
+
+pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
+ result_type=DataTypes.ROW(
+ [DataTypes.FIELD("a", DataTypes.FLOAT()),
+ DataTypes.FIELD("b", DataTypes.INT())]),
+ func_type="pandas")
+t.aggregate(pandas_udaf.alias("a", "b")) \
+ .select(col('a'), col('b')).to_pandas()
+
+# the result is
+# a b
+# 0 2.0 3
+```
+
+<span class="label label-info">Note</span> Similar to the `map` operation, if you specify the aggregate function without the input columns in the `aggregate` operation, it will take a `Row` or `pandas.DataFrame` as input, which contains all the columns of the input table, including the grouping keys.
+<span class="label label-info">Note</span> You have to close the `aggregate` operation with a select statement, and that select statement should not contain aggregate functions.
+Besides, the output of `aggregate` will be flattened if it is a composite type.
+
+## FlatAggregate
+
+Performs a `flat_aggregate` operation with a Python general [Table Aggregate Function]({{< ref "docs/dev/python/table/udfs/python_udfs" >}}#table-aggregate-functions).
+
+Similar to `GroupBy Aggregation`, `FlatAggregate` groups the inputs on the grouping keys.
+Unlike `AggregateFunction`, `TableAggregateFunction` can return 0, 1, or more records for a grouping key.
+Similar to `aggregate`, you have to close the `flat_aggregate` with a select statement, and that select statement should not contain aggregate functions.
+
+```python
+from pyflink.common import Row
+from pyflink.table.expressions import col
+from pyflink.table.udf import TableAggregateFunction, udtaf
+from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
+
+class Top2(TableAggregateFunction):
+
+ def emit_value(self, accumulator):
+ yield Row(accumulator[0])
+ yield Row(accumulator[1])
+
+ def create_accumulator(self):
+ return [None, None]
+
+ def accumulate(self, accumulator, *args):
+ if args[0][0] is not None:
+ if accumulator[0] is None or args[0][0] > accumulator[0]:
+ accumulator[1] = accumulator[0]
+ accumulator[0] = args[0][0]
+ elif accumulator[1] is None or args[0][0] > accumulator[1]:
+ accumulator[1] = args[0][0]
+
+ def retract(self, accumulator, *args):
+ accumulator[0] = accumulator[0] - 1
+
+ def merge(self, accumulator, accumulators):
+ for other_acc in accumulators:
+ self.accumulate(accumulator, other_acc[0])
+ self.accumulate(accumulator, other_acc[1])
+
+ def get_accumulator_type(self):
+ return DataTypes.ARRAY(DataTypes.BIGINT())
+
+ def get_result_type(self):
+ return DataTypes.ROW(
+ [DataTypes.FIELD("a", DataTypes.BIGINT())])
+
+mytop = udtaf(Top2())
+
+env_settings =
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
+table_env = TableEnvironment.create(env_settings)
+t = table_env.from_elements([(1, 'Hi', 'Hello'),
+ (3, 'Hi', 'hi'),
+ (5, 'Hi2', 'hi'),
+ (7, 'Hi', 'Hello'),
+ (2, 'Hi', 'Hello')], ['a', 'b', 'c'])
+result = t.select(col('a'), col('c')) \
+ .group_by(col('c')) \
+ .flat_aggregate(mytop) \
+ .select(col('b')) \
+ .flat_aggregate(mytop.alias("b")) \
+ .select(col('b'))
+
+result.to_pandas()
+# the result is
+# b
+# 0 7
+# 1 5
+```
diff --git a/docs/content/docs/dev/python/table/python_types.md
b/docs/content/docs/dev/python/table/python_types.md
index ff5e101..f1ec4fa 100644
--- a/docs/content/docs/dev/python/table/python_types.md
+++ b/docs/content/docs/dev/python/table/python_types.md
@@ -1,6 +1,6 @@
---
title: "Data Types"
-weight: 31
+weight: 32
type: docs
aliases:
- /dev/python/table-api-users-guide/python_types.html
diff --git a/docs/content/docs/dev/table/tableApi.md
b/docs/content/docs/dev/table/tableApi.md
index 6f25389..4ca942b 100644
--- a/docs/content/docs/dev/table/tableApi.md
+++ b/docs/content/docs/dev/table/tableApi.md
@@ -2211,8 +2211,8 @@ def map_function(a: Row) -> Row:
# map operation with a python general scalar function
func = udf(map_function, result_type=DataTypes.ROW(
[DataTypes.FIELD("a", DataTypes.BIGINT()),
- DataTypes.FIELD("b",
DataTypes.BIGINT()))]))
-table = input.map(map_function).alias('a', 'b')
+ DataTypes.FIELD("b",
DataTypes.BIGINT())]))
+table = input.map(func).alias('a', 'b')
# map operation with a python vectorized scalar function
pandas_func = udf(lambda x: x * 2, result_type=DataTypes.ROW(
@@ -2414,11 +2414,9 @@ from pyflink.table.udf import AggregateFunction, udaf
class CountAndSumAggregateFunction(AggregateFunction):
def get_value(self, accumulator):
- from pyflink.common import Row
return Row(accumulator[0], accumulator[1])
def create_accumulator(self):
- from pyflink.common import Row
return Row(0, 0)
def accumulate(self, accumulator, *args):
@@ -2461,8 +2459,7 @@ pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
[DataTypes.FIELD("a", DataTypes.FLOAT()),
DataTypes.FIELD("b", DataTypes.INT())]),
func_type="pandas")
-t.select(t.b) \
- .aggregate(pandas_udaf.alias("a", "b")) \
+t.aggregate(pandas_udaf.alias("a", "b")) \
.select("a, b")
```