WeiZhong94 commented on a change in pull request #13348:
URL: https://github.com/apache/flink/pull/13348#discussion_r485301884
##########
File path: docs/dev/python/table_api_tutorial.md
##########
@@ -124,10 +124,12 @@ The table `mySink` has two columns, word and count, and
writes data to the file
You can now create a job which reads input from table `mySource`, performs
some transformations, and writes the results to table `mySink`.
{% highlight python %}
-t_env.from_path('mySource') \
- .group_by('word') \
- .select('word, count(1)') \
- .insert_into('mySink')
+from pyflink.table.expressions import lit
+
+tab = t_env.from_path('mySource')
+tab.group_by(tab.word) \
+ .select(tab.word, lit(1).count) \
Review comment:
The indent should be 4 spaces.
##########
File path: docs/dev/python/table_api_tutorial.md
##########
@@ -167,10 +170,10 @@ t_env.connect(FileSystem().path('/tmp/output')) \
.field('count', DataTypes.BIGINT())) \
.create_temporary_table('mySink')
-t_env.from_path('mySource') \
- .group_by('word') \
- .select('word, count(1)') \
- .insert_into('mySink')
+tab = t_env.from_path('mySource')
+tab.group_by(tab.word) \
+ .select(tab.word, lit(1).count) \
Review comment:
ditto
##########
File path: docs/dev/python/table_api_tutorial.zh.md
##########
@@ -171,10 +174,10 @@ t_env.connect(FileSystem().path('/tmp/output')) \
.field('count', DataTypes.BIGINT())) \
.create_temporary_table('mySink')
-t_env.from_path('mySource') \
- .group_by('word') \
- .select('word, count(1)') \
- .insert_into('mySink')
+tab = t_env.from_path('mySource')
+tab.group_by(tab.word) \
+ .select(tab.word, lit(1).count) \
Review comment:
ditto
##########
File path: docs/dev/table/tableApi.md
##########
@@ -2305,7 +2326,7 @@ The following example shows how to define a window
aggregation with additional g
# define window with alias w, group the table by attribute a and window w,
# then aggregate
table = input.window([w: GroupWindow].alias("w")) \
- .group_by("w, a").select("b.sum")
+ .group_by(col('w'), col('a')).select(input.b.sum)
Review comment:
col('a') -> input.a ?
##########
File path: docs/dev/python/table-api-users-guide/conversion_of_pandas.md
##########
@@ -74,7 +74,7 @@ import numpy as np
# Create a PyFlink Table
pdf = pd.DataFrame(np.random.rand(1000, 2))
-table = t_env.from_pandas(pdf, ["a", "b"]).filter("a > 0.5")
+table = t_env.from_pandas(pdf, ["a", "b"]).filter(col('a' > 0.5))
Review comment:
col('a' > 0.5) -> col('a') > 0.5 ?
##########
File path: docs/dev/table/tableApi.md
##########
@@ -1131,22 +1141,23 @@ result =
orders.over_window(Over.partition_by("a").order_by("rowtime")
<td>
<p>Similar to a SQL DISTINCT aggregation clause such as COUNT(DISTINCT
a). Distinct aggregation declares that an aggregation function (built-in or
user-defined) is only applied on distinct input values. Distinct can be applied
to <b>GroupBy Aggregation</b>, <b>GroupBy Window Aggregation</b> and <b>Over
Window Aggregation</b>.</p>
{% highlight python %}
+from pyflink.table.expressions import col, lit, UNBOUNDED_RANGE
+
orders = t_env.from_path("Orders")
# Distinct aggregation on group by
-group_by_distinct_result = orders.group_by("a") \
- .select("a, b.sum.distinct as d")
+group_by_distinct_result = orders.group_by(orders.a) \
+ .select(orders.a,
orders.b.sum.distinct.alias('d'))
# Distinct aggregation on time window group by
group_by_window_distinct_result = orders.window(
- Tumble.over("5.minutes").on("rowtime").alias("w")).group_by("a, w") \
- .select("a, b.sum.distinct as d")
+
Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")).group_by(col('a'),
col('w')) \
Review comment:
col('a') -> orders.a ?
##########
File path: docs/dev/python/table_api_tutorial.zh.md
##########
@@ -128,10 +128,12 @@ t_env.sql_update(my_sink_ddl)
接下来,我们介绍如何创建一个作业:该作业读取表`mySource`中的数据,进行一些变换,然后将结果写入表`mySink`。
{% highlight python %}
-t_env.scan('mySource') \
- .group_by('word') \
- .select('word, count(1)') \
- .insert_into('mySink')
+from pyflink.table.expressions import lit
+
+tab = t_env.from_path('mySource')
+tab.group_by(tab.word) \
+ .select(tab.word, lit(1).count) \
Review comment:
ditto
##########
File path: docs/dev/table/tableApi.md
##########
@@ -202,13 +202,15 @@ val result: Table = orders
{% highlight python %}
# specify table program
+from pyflink.table.expressions import col, lit
+
orders = t_env.from_path("Orders") # schema (a, b, c, rowtime)
-result = orders.filter("a.isNotNull && b.isNotNull && c.isNotNull") \
- .select("a.lowerCase() as a, b, rowtime") \
-
.window(Tumble.over("1.hour").on("rowtime").alias("hourlyWindow")) \
- .group_by("hourlyWindow, a") \
- .select("a, hourlyWindow.end as hour, b.avg as
avgBillingAmount")
+result = orders.filter(orders.a.is_not_null & orders.b.is_not_null &
orders.c.is_not_null) \
+ .select(orders.a.lower_case.alias('a'), orders.b,
orders.rowtime) \
+
.window(Tumble.over(lit(1).hour.on(orders.rowtime).alias("hourly_window"))) \
Review comment:
Tumble.over(lit(1).hour.on(orders.rowtime).alias("hourly_window")) ->
Tumble.over(lit(1).hour).on(orders.rowtime).alias("hourly_window")
##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.zh.md
##########
@@ -201,11 +209,11 @@ my_table = ... # type: Table, table schema: [a: String]
table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
'80m')
# 注册java自定义函数。
-table_env.register_java_function("split", "my.java.function.Split")
+table_env.create_java_temporary_function("split", "my.java.function.Split")
# 在Python Table API中使用表值函数。 "as"指定表的字段名称。
-my_table.join_lateral("split(a) as (word, length)").select("a, word, length")
-my_table.left_outer_join_lateral("split(a) as (word, length)").select("a,
word, length")
+my_table.join_lateral(call('split', my_table.a).alias("word,
length")).select("a, word, length")
Review comment:
select("a, word, length") -> select(my_table.a, col('word'),
col('length')) ?
##########
File path: docs/dev/table/tableApi.md
##########
@@ -1512,12 +1525,14 @@ result = left.join(right).where("a = d").select("a, b,
e")
<td>
<p>Similar to SQL LEFT/RIGHT/FULL OUTER JOIN clauses. Joins two
tables. Both tables must have distinct field names and at least one equality
join predicate must be defined.</p>
{% highlight python %}
-left = t_env.from_path("Source1").select("a, b, c")
-right = t_env.from_path("Source2").select("d, e, f")
+from pyflink.table.expressions import col
+
+left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'))
+right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'))
-left_outer_result = left.left_outer_join(right, "a = d").select("a, b, e")
-right_outer_result = left.right_outer_join(right, "a = d").select("a, b, e")
-full_outer_result = left.full_outer_join(right, "a = d").select("a, b, e")
+left_outer_result = left.left_outer_join(right, left.a ==
right.d).select(col('a'), col('b'), col('e'))
Review comment:
select(col('a'), col('b'), col('e')) -> select(left.a, left.b, right.e) ?
##########
File path: docs/dev/table/tableApi.md
##########
@@ -1961,15 +1982,15 @@ result = left.minus_all(right)
<td>
<p>Similar to a SQL IN clause. In returns true if an expression exists
in a given table sub-query. The sub-query table must consist of one column.
This column must have the same data type as the expression.</p>
{% highlight python %}
-left = t_env.from_path("Source1").select("a, b, c")
-right = t_env.from_path("Source2").select("a")
+left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'))
+right = t_env.from_path("Source2").select(col('a'))
# using implicit registration
-result = left.select("a, b, c").where("a.in(%s)" % right)
+result = left.select(col('a'), col('b'), col('c')).where(col('a').in_(right))
Review comment:
result = left.select(left.a, left.b, left.c).where(left.a.in_(right)) ?
##########
File path: docs/dev/table/tableApi.md
##########
@@ -2336,8 +2357,8 @@ val table = input
# define window with alias w, group the table by attribute a and window w,
# then aggregate and add window start, end, and rowtime timestamps
table = input.window([w: GroupWindow].alias("w")) \
- .group_by("w, a") \
- .select("a, w.start, w.end, w.rowtime, b.count")
+ .group_by(col('w'), col('a')) \
Review comment:
ditto
##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.md
##########
@@ -206,11 +214,11 @@ my_table = ... # type: Table, table schema: [a: String]
table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
'80m')
# Register the java function.
-table_env.register_java_function("split", "my.java.function.Split")
+table_env.create_java_temporary_function("split", "my.java.function.Split")
# Use the table function in the Python Table API. "as" specifies the field
names of the table.
Review comment:
"as" -> "alias" ?
##########
File path: docs/dev/python/table-api-users-guide/udfs/python_udfs.zh.md
##########
@@ -201,11 +209,11 @@ my_table = ... # type: Table, table schema: [a: String]
table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
'80m')
# 注册java自定义函数。
-table_env.register_java_function("split", "my.java.function.Split")
+table_env.create_java_temporary_function("split", "my.java.function.Split")
# 在Python Table API中使用表值函数。 "as"指定表的字段名称。
-my_table.join_lateral("split(a) as (word, length)").select("a, word, length")
-my_table.left_outer_join_lateral("split(a) as (word, length)").select("a,
word, length")
+my_table.join_lateral(call('split', my_table.a).alias("word,
length")).select("a, word, length")
+my_table.left_outer_join_lateral(call('split', my_table.a).alias("word,
length")).select("a, word, length")
Review comment:
ditto
##########
File path: docs/dev/table/tableApi.md
##########
@@ -1095,11 +1103,12 @@ result = orders.group_by("a").select("a, b.sum as d")
<p>Groups and aggregates a table on a <a href="#group-windows">group
window</a> and possibly one or more grouping keys.</p>
{% highlight python %}
from pyflink.table.window import Tumble
+from pyflink.table.expressions import lit, col
orders = t_env.from_path("Orders")
-result = orders.window(Tumble.over("5.minutes").on("rowtime").alias("w")) \
- .group_by("a, w") \
- .select("a, w.start, w.end, b.sum as d")
+result =
orders.window(Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")) \
+ .group_by(col('a'), col('w')) \
Review comment:
col('a') -> orders.a ?
##########
File path: docs/dev/table/tableApi.md
##########
@@ -1494,9 +1505,11 @@ val result = orders
<td>
<p>Similar to a SQL JOIN clause. Joins two tables. Both tables must
have distinct field names and at least one equality join predicate must be
defined through join operator or using a where or filter operator.</p>
{% highlight python %}
-left = t_env.from_path("Source1").select("a, b, c")
-right = t_env.from_path("Source2").select("d, e, f")
-result = left.join(right).where("a = d").select("a, b, e")
+from pyflink.table.expressions import col
+
+left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'))
+right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'))
+result = left.join(right).where(left.a == right.d).select(col('a'), col('b'),
col('e'))
Review comment:
select(col('a'), col('b'), col('e')) -> select(left.a, left.b, right.e) ?
##########
File path: docs/dev/table/tableApi.md
##########
@@ -1961,15 +1982,15 @@ result = left.minus_all(right)
<td>
<p>Similar to a SQL IN clause. In returns true if an expression exists
in a given table sub-query. The sub-query table must consist of one column.
This column must have the same data type as the expression.</p>
{% highlight python %}
-left = t_env.from_path("Source1").select("a, b, c")
-right = t_env.from_path("Source2").select("a")
+left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'))
+right = t_env.from_path("Source2").select(col('a'))
# using implicit registration
-result = left.select("a, b, c").where("a.in(%s)" % right)
+result = left.select(col('a'), col('b'), col('c')).where(col('a').in_(right))
# using explicit registration
Review comment:
The implicit/explicit registration could be removed as Java has removed
them. We only need to show the expression usage.
##########
File path: docs/dev/table/tableApi.md
##########
@@ -537,11 +539,13 @@ root
<p>Similar to a SQL SELECT statement. Performs a select operation.</p>
{% highlight python %}
orders = t_env.from_path("Orders")
-result = orders.select("a, c as d")
+result = orders.select(orders.a, orders.c.alias('d'))
{% endhighlight %}
<p>You can use star (<code>*</code>) to act as a wild card, selecting
all of the columns in the table.</p>
{% highlight python %}
-result = orders.select("*")
+from pyflink.table.expressions import lit
+
+result = orders.select(lit("*"))
Review comment:
lit("*") -> col("*") ?
##########
File path: docs/dev/python/table-api-users-guide/conversion_of_pandas.zh.md
##########
@@ -71,7 +71,7 @@ import numpy as np
# 创建PyFlink Table
pdf = pd.DataFrame(np.random.rand(1000, 2))
-table = t_env.from_pandas(pdf, ["a", "b"]).filter("a > 0.5")
+table = t_env.from_pandas(pdf, ["a", "b"]).filter(col('a' > 0.5))
Review comment:
ditto
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]