This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 23ae97a  [hotfix][python][docs] Fix flat_aggregate example in Python Doc
23ae97a is described below

commit 23ae97a6f472eb0bbf70ca61ccf072093310ec37
Author: huangxingbo <[email protected]>
AuthorDate: Wed Apr 28 18:35:53 2021 +0800

    [hotfix][python][docs] Fix flat_aggregate example in Python Doc
---
 .../table/operations/row_based_operations.md       |  52 +++++++++------------
 .../docs/dev/python/table/udfs/python_udfs.md      |  35 +++++++-------
 .../table/operations/row_based_operations.md       |  52 +++++++++------------
 .../docs/dev/python/table/udfs/python_udfs.md      |  35 +++++++-------
 docs/static/fig/udtagg-mechanism-python.png        | Bin 0 -> 150918 bytes
 5 files changed, 78 insertions(+), 96 deletions(-)

diff --git a/docs/content.zh/docs/dev/python/table/operations/row_based_operations.md b/docs/content.zh/docs/dev/python/table/operations/row_based_operations.md
index 56e69d8..0d0e23c 100644
--- a/docs/content.zh/docs/dev/python/table/operations/row_based_operations.md
+++ b/docs/content.zh/docs/dev/python/table/operations/row_based_operations.md
@@ -229,9 +229,8 @@ Similar to `aggregate`, you have to close the `flat_aggregate` with a select sta
 
 ```python
 from pyflink.common import Row
-from pyflink.table.expressions import col
-from pyflink.table.udf import TableAggregateFunction, udtaf
-from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
+from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
+from pyflink.table.udf import udtaf, TableAggregateFunction
 
 class Top2(TableAggregateFunction):
 
@@ -242,21 +241,13 @@ class Top2(TableAggregateFunction):
     def create_accumulator(self):
         return [None, None]
 
-    def accumulate(self, accumulator, *args):
-        if args[0][0] is not None:
-            if accumulator[0] is None or args[0][0] > accumulator[0]:
+    def accumulate(self, accumulator, row):
+        if row[0] is not None:
+            if accumulator[0] is None or row[0] > accumulator[0]:
                 accumulator[1] = accumulator[0]
-                accumulator[0] = args[0][0]
-            elif accumulator[1] is None or args[0][0] > accumulator[1]:
-                accumulator[1] = args[0][0]
-
-    def retract(self, accumulator, *args):
-        accumulator[0] = accumulator[0] - 1
-
-    def merge(self, accumulator, accumulators):
-        for other_acc in accumulators:
-            self.accumulate(accumulator, other_acc[0])
-            self.accumulate(accumulator, other_acc[1])
+                accumulator[0] = row[0]
+            elif accumulator[1] is None or row[0] > accumulator[1]:
+                accumulator[1] = row[0]
 
     def get_accumulator_type(self):
         return DataTypes.ARRAY(DataTypes.BIGINT())
@@ -265,25 +256,26 @@ class Top2(TableAggregateFunction):
         return DataTypes.ROW(
             [DataTypes.FIELD("a", DataTypes.BIGINT())])
 
-mytop = udtaf(Top2())
 
-env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
+env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
 table_env = TableEnvironment.create(env_settings)
+# the result type and accumulator type can also be specified in the udtaf decorator:
+# top2 = udtaf(Top2(), result_type=DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())]), accumulator_type=DataTypes.ARRAY(DataTypes.BIGINT()))
+top2 = udtaf(Top2())
 t = table_env.from_elements([(1, 'Hi', 'Hello'),
                               (3, 'Hi', 'hi'),
                               (5, 'Hi2', 'hi'),
                               (7, 'Hi', 'Hello'),
                               (2, 'Hi', 'Hello')], ['a', 'b', 'c'])
-result = t.select(col('a'), col('c')) \
-    .group_by(col('c')) \
-    .flat_aggregate(mytop) \
-    .select(col('b')) \
-    .flat_aggregate(mytop.alias("b")) \
-    .select(col('b'))
 
-result.to_pandas()
-# the result is
-#    b
-# 0  7
-# 1  5
+# call function "inline" without registration in Table API
+result = t.group_by(t.b).flat_aggregate(top2).select('*').to_pandas()
+
+# the result is:
+#      b    a
+# 0  Hi2  5.0
+# 1  Hi2  NaN
+# 2   Hi  7.0
+# 3   Hi  3.0
+
 ```
diff --git a/docs/content.zh/docs/dev/python/table/udfs/python_udfs.md b/docs/content.zh/docs/dev/python/table/udfs/python_udfs.md
index c4d5aa1..cbcdc3e 100644
--- a/docs/content.zh/docs/dev/python/table/udfs/python_udfs.md
+++ b/docs/content.zh/docs/dev/python/table/udfs/python_udfs.md
@@ -431,7 +431,7 @@ the function is called to compute and return the final result.
 
 The following example illustrates the aggregation process:
 
-<img alt="UDTAGG mechanism" src="/fig/udtagg-mechanism.png" width="80%">
+<img alt="UDTAGG mechanism" src="/fig/udtagg-mechanism-python.png" width="80%">
 
 In the example, we assume a table that contains data about beverages. The table consists of three columns (`id`, `name`,
 and `price`) and 5 rows. We would like to find the 2 highest prices of all beverages in the table, i.e.,
@@ -462,21 +462,13 @@ class Top2(TableAggregateFunction):
     def create_accumulator(self):
         return [None, None]
 
-    def accumulate(self, accumulator, *args):
-        if args[0][0] is not None:
-            if accumulator[0] is None or args[0][0] > accumulator[0]:
+    def accumulate(self, accumulator, row):
+        if row[0] is not None:
+            if accumulator[0] is None or row[0] > accumulator[0]:
                 accumulator[1] = accumulator[0]
-                accumulator[0] = args[0][0]
-            elif accumulator[1] is None or args[0][0] > accumulator[1]:
-                accumulator[1] = args[0][0]
-
-    def retract(self, accumulator, *args):
-        accumulator[0] = accumulator[0] - 1
-
-    def merge(self, accumulator, accumulators):
-        for other_acc in accumulators:
-            self.accumulate(accumulator, other_acc[0])
-            self.accumulate(accumulator, other_acc[1])
+                accumulator[0] = row[0]
+            elif accumulator[1] is None or row[0] > accumulator[1]:
+                accumulator[1] = row[0]
 
     def get_accumulator_type(self):
         return DataTypes.ARRAY(DataTypes.BIGINT())
@@ -486,7 +478,7 @@ class Top2(TableAggregateFunction):
             [DataTypes.FIELD("a", DataTypes.BIGINT())])
 
 
-env_settings = EnvironmentSettings.new_instance().use_blink_planner().is_streaming_mode().build()
+env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
 table_env = TableEnvironment.create(env_settings)
 # the result type and accumulator type can also be specified in the udtaf decorator:
 # top2 = udtaf(Top2(), result_type=DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())]), accumulator_type=DataTypes.ARRAY(DataTypes.BIGINT()))
@@ -498,8 +490,15 @@ t = table_env.from_elements([(1, 'Hi', 'Hello'),
                               (2, 'Hi', 'Hello')], ['a', 'b', 'c'])
 
 # call function "inline" without registration in Table API
-result = t.group_by(t.name).flat_aggregate(top2).to_pandas()
-print(result)
+result = t.group_by(t.b).flat_aggregate(top2).select('*').to_pandas()
+
+# the result is:
+#      b    a
+# 0  Hi2  5.0
+# 1  Hi2  NaN
+# 2   Hi  7.0
+# 3   Hi  3.0
+
 ```
 
 The `accumulate(...)` method of our `Top2` class takes two inputs. The first one is the accumulator
diff --git a/docs/content/docs/dev/python/table/operations/row_based_operations.md b/docs/content/docs/dev/python/table/operations/row_based_operations.md
index 56e69d8..0d0e23c 100644
--- a/docs/content/docs/dev/python/table/operations/row_based_operations.md
+++ b/docs/content/docs/dev/python/table/operations/row_based_operations.md
@@ -229,9 +229,8 @@ Similar to `aggregate`, you have to close the `flat_aggregate` with a select sta
 
 ```python
 from pyflink.common import Row
-from pyflink.table.expressions import col
-from pyflink.table.udf import TableAggregateFunction, udtaf
-from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
+from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
+from pyflink.table.udf import udtaf, TableAggregateFunction
 
 class Top2(TableAggregateFunction):
 
@@ -242,21 +241,13 @@ class Top2(TableAggregateFunction):
     def create_accumulator(self):
         return [None, None]
 
-    def accumulate(self, accumulator, *args):
-        if args[0][0] is not None:
-            if accumulator[0] is None or args[0][0] > accumulator[0]:
+    def accumulate(self, accumulator, row):
+        if row[0] is not None:
+            if accumulator[0] is None or row[0] > accumulator[0]:
                 accumulator[1] = accumulator[0]
-                accumulator[0] = args[0][0]
-            elif accumulator[1] is None or args[0][0] > accumulator[1]:
-                accumulator[1] = args[0][0]
-
-    def retract(self, accumulator, *args):
-        accumulator[0] = accumulator[0] - 1
-
-    def merge(self, accumulator, accumulators):
-        for other_acc in accumulators:
-            self.accumulate(accumulator, other_acc[0])
-            self.accumulate(accumulator, other_acc[1])
+                accumulator[0] = row[0]
+            elif accumulator[1] is None or row[0] > accumulator[1]:
+                accumulator[1] = row[0]
 
     def get_accumulator_type(self):
         return DataTypes.ARRAY(DataTypes.BIGINT())
@@ -265,25 +256,26 @@ class Top2(TableAggregateFunction):
         return DataTypes.ROW(
             [DataTypes.FIELD("a", DataTypes.BIGINT())])
 
-mytop = udtaf(Top2())
 
-env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
+env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
 table_env = TableEnvironment.create(env_settings)
+# the result type and accumulator type can also be specified in the udtaf decorator:
+# top2 = udtaf(Top2(), result_type=DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())]), accumulator_type=DataTypes.ARRAY(DataTypes.BIGINT()))
+top2 = udtaf(Top2())
 t = table_env.from_elements([(1, 'Hi', 'Hello'),
                               (3, 'Hi', 'hi'),
                               (5, 'Hi2', 'hi'),
                               (7, 'Hi', 'Hello'),
                               (2, 'Hi', 'Hello')], ['a', 'b', 'c'])
-result = t.select(col('a'), col('c')) \
-    .group_by(col('c')) \
-    .flat_aggregate(mytop) \
-    .select(col('b')) \
-    .flat_aggregate(mytop.alias("b")) \
-    .select(col('b'))
 
-result.to_pandas()
-# the result is
-#    b
-# 0  7
-# 1  5
+# call function "inline" without registration in Table API
+result = t.group_by(t.b).flat_aggregate(top2).select('*').to_pandas()
+
+# the result is:
+#      b    a
+# 0  Hi2  5.0
+# 1  Hi2  NaN
+# 2   Hi  7.0
+# 3   Hi  3.0
+
 ```
diff --git a/docs/content/docs/dev/python/table/udfs/python_udfs.md b/docs/content/docs/dev/python/table/udfs/python_udfs.md
index 55d87d5..f8ab4a1 100644
--- a/docs/content/docs/dev/python/table/udfs/python_udfs.md
+++ b/docs/content/docs/dev/python/table/udfs/python_udfs.md
@@ -430,7 +430,7 @@ the function is called to compute and return the final result.
 
 The following example illustrates the aggregation process:
 
-<img alt="UDTAGG mechanism" src="/fig/udtagg-mechanism.png" width="80%">
+<img alt="UDTAGG mechanism" src="/fig/udtagg-mechanism-python.png" width="80%">
 
 In the example, we assume a table that contains data about beverages. The table consists of three columns (`id`, `name`,
 and `price`) and 5 rows. We would like to find the 2 highest prices of all beverages in the table, i.e.,
@@ -461,21 +461,13 @@ class Top2(TableAggregateFunction):
     def create_accumulator(self):
         return [None, None]
 
-    def accumulate(self, accumulator, *args):
-        if args[0][0] is not None:
-            if accumulator[0] is None or args[0][0] > accumulator[0]:
+    def accumulate(self, accumulator, row):
+        if row[0] is not None:
+            if accumulator[0] is None or row[0] > accumulator[0]:
                 accumulator[1] = accumulator[0]
-                accumulator[0] = args[0][0]
-            elif accumulator[1] is None or args[0][0] > accumulator[1]:
-                accumulator[1] = args[0][0]
-
-    def retract(self, accumulator, *args):
-        accumulator[0] = accumulator[0] - 1
-
-    def merge(self, accumulator, accumulators):
-        for other_acc in accumulators:
-            self.accumulate(accumulator, other_acc[0])
-            self.accumulate(accumulator, other_acc[1])
+                accumulator[0] = row[0]
+            elif accumulator[1] is None or row[0] > accumulator[1]:
+                accumulator[1] = row[0]
 
     def get_accumulator_type(self):
         return DataTypes.ARRAY(DataTypes.BIGINT())
@@ -485,7 +477,7 @@ class Top2(TableAggregateFunction):
             [DataTypes.FIELD("a", DataTypes.BIGINT())])
 
 
-env_settings = EnvironmentSettings.new_instance().use_blink_planner().is_streaming_mode().build()
+env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
 table_env = TableEnvironment.create(env_settings)
 # the result type and accumulator type can also be specified in the udtaf decorator:
 # top2 = udtaf(Top2(), result_type=DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())]), accumulator_type=DataTypes.ARRAY(DataTypes.BIGINT()))
@@ -497,8 +489,15 @@ t = table_env.from_elements([(1, 'Hi', 'Hello'),
                               (2, 'Hi', 'Hello')], ['a', 'b', 'c'])
 
 # call function "inline" without registration in Table API
-result = t.group_by(t.name).flat_aggregate(top2).to_pandas()
-print(result)
+result = t.group_by(t.b).flat_aggregate(top2).select('*').to_pandas()
+
+# the result is:
+#      b    a
+# 0  Hi2  5.0
+# 1  Hi2  NaN
+# 2   Hi  7.0
+# 3   Hi  3.0
+
 ```
 
 The `accumulate(...)` method of our `Top2` class takes two inputs. The first one is the accumulator
diff --git a/docs/static/fig/udtagg-mechanism-python.png b/docs/static/fig/udtagg-mechanism-python.png
new file mode 100644
index 0000000..ceee3d9
Binary files /dev/null and b/docs/static/fig/udtagg-mechanism-python.png differ

Reply via email to