This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 992836e3809 [python][docs] Update documentation to remove deprecated
API examples
992836e3809 is described below
commit 992836e380998fb1b210d54394f318c5f6060a93
Author: Dian Fu <[email protected]>
AuthorDate: Thu Apr 28 11:08:49 2022 +0800
[python][docs] Update documentation to remove deprecated API examples
---
docs/content.zh/docs/dev/python/debugging.md | 6 +-
.../docs/dev/python/table/conversion_of_pandas.md | 6 +-
.../docs/dev/python/table/intro_to_table_api.md | 17 ++-
docs/content.zh/docs/dev/python/table/metrics.md | 19 ++-
.../table/operations/row_based_operations.md | 11 +-
.../python/table/python_table_api_connectors.md | 6 +-
.../docs/dev/python/table/udfs/python_udfs.md | 36 +++--
.../python/table/udfs/vectorized_python_udfs.md | 25 +++-
docs/content.zh/docs/dev/table/catalogs.md | 2 +-
docs/content.zh/docs/dev/table/common.md | 8 +-
docs/content.zh/docs/dev/table/tableApi.md | 146 ++++++++++----------
docs/content/docs/dev/python/debugging.md | 6 +-
.../docs/dev/python/table/conversion_of_pandas.md | 6 +-
.../docs/dev/python/table/intro_to_table_api.md | 20 +--
docs/content/docs/dev/python/table/metrics.md | 22 ++-
.../table/operations/row_based_operations.md | 13 +-
.../python/table/python_table_api_connectors.md | 6 +-
.../docs/dev/python/table/udfs/python_udfs.md | 36 +++--
.../python/table/udfs/vectorized_python_udfs.md | 25 ++--
docs/content/docs/dev/table/catalogs.md | 2 +-
docs/content/docs/dev/table/common.md | 8 +-
docs/content/docs/dev/table/data_stream_api.md | 2 +-
docs/content/docs/dev/table/tableApi.md | 138 ++++++++++---------
.../table/mixing_use_of_datastream_and_table.py | 3 +-
.../pyflink/examples/table/pandas/pandas_udaf.py | 6 +-
.../pyflink/examples/table/process_json_data.py | 3 +-
.../examples/table/process_json_data_with_udf.py | 3 +-
.../examples/table/windowing/over_window.py | 8 +-
.../examples/table/windowing/session_window.py | 6 +-
.../examples/table/windowing/sliding_window.py | 6 +-
.../examples/table/windowing/tumble_window.py | 6 +-
flink-python/pyflink/table/expression.py | 2 +-
flink-python/pyflink/table/schema.py | 6 +-
flink-python/pyflink/table/table.py | 151 +++++++++------------
flink-python/pyflink/table/table_config.py | 3 +-
flink-python/pyflink/table/table_environment.py | 2 +-
flink-python/pyflink/table/window.py | 33 ++---
37 files changed, 426 insertions(+), 378 deletions(-)
diff --git a/docs/content.zh/docs/dev/python/debugging.md
b/docs/content.zh/docs/dev/python/debugging.md
index 0cefc596b59..8dd058897ad 100644
--- a/docs/content.zh/docs/dev/python/debugging.md
+++ b/docs/content.zh/docs/dev/python/debugging.md
@@ -58,10 +58,14 @@ print(table.get_schema())
在作业运行的过程中,日志信息会打印在 `TaskManager` 的日志文件中。
```python
+from pyflink.table import DataTypes
+from pyflink.table.udf import udf
+
+import logging
+
@udf(result_type=DataTypes.BIGINT())
def add(i, j):
# 使用 logging 模块
- import logging
logging.info("debug")
# 使用 print 函数
print('debug')
diff --git a/docs/content.zh/docs/dev/python/table/conversion_of_pandas.md
b/docs/content.zh/docs/dev/python/table/conversion_of_pandas.md
index 67544095938..1d16020efa1 100644
--- a/docs/content.zh/docs/dev/python/table/conversion_of_pandas.md
+++ b/docs/content.zh/docs/dev/python/table/conversion_of_pandas.md
@@ -36,6 +36,8 @@ PyFlink 支持将 Pandas DataFrame 转换成 PyFlink Table。在内部实现上
以下示例显示如何从 Pandas DataFrame 创建 PyFlink Table:
```python
+from pyflink.table import DataTypes
+
import pandas as pd
import numpy as np
@@ -54,7 +56,7 @@ table = t_env.from_pandas(pdf, [DataTypes.DOUBLE(),
DataTypes.DOUBLE()])
# 由Pandas DataFrame创建列名和列类型的PyFlink表
table = t_env.from_pandas(pdf,
DataTypes.ROW([DataTypes.FIELD("f0",
DataTypes.DOUBLE()),
- DataTypes.FIELD("f1",
DataTypes.DOUBLE())])
+ DataTypes.FIELD("f1",
DataTypes.DOUBLE())]))
```
## 将 PyFlink Table 转换为 Pandas DataFrame
@@ -68,6 +70,8 @@ table = t_env.from_pandas(pdf,
以下示例显示了如何将 PyFlink Table 转换为 Pandas DataFrame:
```python
+from pyflink.table.expressions import col
+
import pandas as pd
import numpy as np
diff --git a/docs/content.zh/docs/dev/python/table/intro_to_table_api.md
b/docs/content.zh/docs/dev/python/table/intro_to_table_api.md
index fb4008e4cc5..e9fc93aca1b 100644
--- a/docs/content.zh/docs/dev/python/table/intro_to_table_api.md
+++ b/docs/content.zh/docs/dev/python/table/intro_to_table_api.md
@@ -319,6 +319,7 @@ new_table.execute().print()
```python
from pyflink.table import EnvironmentSettings, TableEnvironment
+from pyflink.table.expressions import col
# 通过 batch table environment 来执行查询
env_settings = EnvironmentSettings.in_batch_mode()
@@ -329,11 +330,11 @@ orders = table_env.from_elements([('Jack', 'FRANCE', 10),
('Rose', 'ENGLAND', 30
# 计算所有来自法国客户的收入
revenue = orders \
- .select(orders.name, orders.country, orders.revenue) \
- .where(orders.country == 'FRANCE') \
- .group_by(orders.name) \
- .select(orders.name, orders.revenue.sum.alias('rev_sum'))
-
+ .select(col("name"), col("country"), col("revenue")) \
+ .where(col("country") == 'FRANCE') \
+ .group_by(col("name")) \
+ .select(col("name"), col("revenue").sum.alias('rev_sum'))
+
revenue.execute().print()
```
@@ -686,6 +687,7 @@ Table API 提供了一种机制来查看 `Table` 的逻辑查询计划和优化
```python
# 使用流模式 TableEnvironment
from pyflink.table import EnvironmentSettings, TableEnvironment
+from pyflink.table.expressions import col
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
@@ -693,7 +695,7 @@ table_env = TableEnvironment.create(env_settings)
table1 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table2 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table = table1 \
- .where(table1.data.like('H%')) \
+ .where(col("data").like('H%')) \
.union_all(table2)
print(table.explain())
```
@@ -739,6 +741,7 @@ Stage 136 : Data Source
```python
# 使用流模式 TableEnvironment
from pyflink.table import EnvironmentSettings, TableEnvironment
+from pyflink.table.expressions import col
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(environment_settings=env_settings)
@@ -764,7 +767,7 @@ table_env.execute_sql("""
statement_set = table_env.create_statement_set()
-statement_set.add_insert("print_sink_table",
table1.where(table1.data.like('H%')))
+statement_set.add_insert("print_sink_table",
table1.where(col("data").like('H%')))
statement_set.add_insert("black_hole_sink_table", table2)
print(statement_set.explain())
diff --git a/docs/content.zh/docs/dev/python/table/metrics.md
b/docs/content.zh/docs/dev/python/table/metrics.md
index e00af42c5d0..fb0c7840c00 100644
--- a/docs/content.zh/docs/dev/python/table/metrics.md
+++ b/docs/content.zh/docs/dev/python/table/metrics.md
@@ -127,7 +127,6 @@ class MyUDF(ScalarFunction):
self.meter = None
def open(self, function_context):
- super().open(function_context)
# 120秒内统计的平均每秒事件数,默认是60秒
self.meter = function_context.get_metric_group().meter("my_meter",
time_span_in_seconds=120)
@@ -151,14 +150,14 @@ class MyUDF(ScalarFunction):
{{< tab "Python" >}}
```python
-function_context
- .get_metric_group()
- .add_group("my_metrics")
+function_context \
+ .get_metric_group() \
+ .add_group("my_metrics") \
.counter("my_counter")
-function_context
- .get_metric_group()
- .add_group("my_metrics_key", "my_metrics_value")
+function_context \
+ .get_metric_group() \
+ .add_group("my_metrics_key", "my_metrics_value") \
.counter("my_counter")
```
@@ -182,9 +181,9 @@ function_context
{{< tabs "6d0715c0-6c39-489a-b3f3-e9bf7d50c268" >}}
{{< tab "Python" >}}
```python
-function_context
- .get_metric_group()
- .add_group("my_metrics_key", "my_metrics_value")
+function_context \
+ .get_metric_group() \
+ .add_group("my_metrics_key", "my_metrics_value") \
.counter("my_counter")
```
{{< /tab >}}
diff --git
a/docs/content.zh/docs/dev/python/table/operations/row_based_operations.md
b/docs/content.zh/docs/dev/python/table/operations/row_based_operations.md
index 59f3a61de5b..59e9fa03a24 100644
--- a/docs/content.zh/docs/dev/python/table/operations/row_based_operations.md
+++ b/docs/content.zh/docs/dev/python/table/operations/row_based_operations.md
@@ -44,7 +44,7 @@ table_env = TableEnvironment.create(env_settings)
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
- DataTypes.FIELD("data", DataTypes.STRING())]))
+ DataTypes.FIELD("data", DataTypes.STRING())]))
def func1(id: int, data: str) -> Row:
return Row(id, data * 2)
@@ -63,7 +63,7 @@ It also supports to take a Row object (containing all the
columns of the input t
```python
@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
- DataTypes.FIELD("data", DataTypes.STRING())]))
+ DataTypes.FIELD("data", DataTypes.STRING())]))
def func2(data: Row) -> Row:
return Row(data.id, data.data * 2)
@@ -86,7 +86,7 @@ It should be noted that the input type and output type should
be pandas.DataFram
```python
import pandas as pd
@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
- DataTypes.FIELD("data", DataTypes.STRING())]),
+ DataTypes.FIELD("data", DataTypes.STRING())]),
func_type='pandas')
def func3(data: pd.DataFrame) -> pd.DataFrame:
res = pd.concat([data.id, data.data * 2], axis=1)
@@ -288,10 +288,11 @@ t = table_env.from_elements([(1, 'Hi', 'Hello'),
(3, 'Hi', 'hi'),
(5, 'Hi2', 'hi'),
(7, 'Hi', 'Hello'),
- (2, 'Hi', 'Hello')], ['a', 'b', 'c'])
+ (2, 'Hi', 'Hello')],
+ ['a', 'b', 'c'])
# call function "inline" without registration in Table API
-result = t.group_by(t.b).flat_aggregate(top2).select(col('*')).to_pandas()
+result = t.group_by(col('b')).flat_aggregate(top2).select(col('*')).to_pandas()
# the result is:
#+----+--------------------------------+----------------------+
diff --git
a/docs/content.zh/docs/dev/python/table/python_table_api_connectors.md
b/docs/content.zh/docs/dev/python/table/python_table_api_connectors.md
index dbf94fbd1ee..d718d4d45fe 100644
--- a/docs/content.zh/docs/dev/python/table/python_table_api_connectors.md
+++ b/docs/content.zh/docs/dev/python/table/python_table_api_connectors.md
@@ -134,12 +134,14 @@ if __name__ == '__main__':
PyFlink 表支持与 Pandas DataFrame 之间互相转换。
```python
+from pyflink.table.expressions import col
+
import pandas as pd
import numpy as np
# 创建一个 PyFlink 表
pdf = pd.DataFrame(np.random.rand(1000, 2))
-table = t_env.from_pandas(pdf, ["a", "b"]).filter("a > 0.5")
+table = t_env.from_pandas(pdf, ["a", "b"]).filter(col('a') > 0.5)
# 将 PyFlink 表转换成 Pandas DataFrame
pdf = table.to_pandas()
@@ -150,6 +152,8 @@ pdf = table.to_pandas()
`from_elements()` 用于从一个元素集合中创建一张表。元素类型必须是可支持的原子类型或者复杂类型。
```python
+from pyflink.table import DataTypes
+
table_env.from_elements([(1, 'Hi'), (2, 'Hello')])
# 使用第二个参数指定自定义字段名
diff --git a/docs/content.zh/docs/dev/python/table/udfs/python_udfs.md
b/docs/content.zh/docs/dev/python/table/udfs/python_udfs.md
index 7b55f45ca35..7194eb00323 100644
--- a/docs/content.zh/docs/dev/python/table/udfs/python_udfs.md
+++ b/docs/content.zh/docs/dev/python/table/udfs/python_udfs.md
@@ -41,7 +41,9 @@ Python 标量函数的行为由名为 `eval` 的方法定义,`eval` 方法支
以下示例显示了如何定义自己的 Python 哈希函数、如何在 TableEnvironment 中注册它以及如何在作业中使用它。
```python
-from pyflink.table.expressions import call
+from pyflink.table.expressions import call, col
+from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
+from pyflink.table.udf import ScalarFunction, udf
class HashCode(ScalarFunction):
def __init__(self):
@@ -56,7 +58,7 @@ table_env = TableEnvironment.create(settings)
hash_code = udf(HashCode(), result_type=DataTypes.BIGINT())
# 在 Python Table API 中使用 Python 自定义函数
-my_table.select(my_table.string, my_table.bigint, hash_code(my_table.bigint),
call(hash_code, my_table.bigint))
+my_table.select(col("string"), col("bigint"), hash_code(col("bigint")),
call(hash_code, col("bigint")))
# 在 SQL API 中使用 Python 自定义函数
table_env.create_temporary_function("hash_code", udf(HashCode(),
result_type=DataTypes.BIGINT()))
@@ -78,7 +80,8 @@ public class HashCode extends ScalarFunction {
}
}
'''
-from pyflink.table.expressions import call
+from pyflink.table.expressions import call, col
+from pyflink.table import TableEnvironment, EnvironmentSettings
settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(settings)
@@ -87,7 +90,7 @@ table_env = TableEnvironment.create(settings)
table_env.create_java_temporary_function("hash_code",
"my.java.function.HashCode")
# 在 Python Table API 中使用 Java 函数
-my_table.select(call('hash_code', my_table.string))
+my_table.select(call('hash_code', col("string")))
# 在 SQL API 中使用 Java 函数
table_env.sql_query("SELECT string, bigint, hash_code(string) FROM MyTable")
@@ -128,10 +131,10 @@ add = udf(functools.partial(partial_add, k=1),
result_type=DataTypes.BIGINT())
# 注册 Python 自定义函数
table_env.create_temporary_function("add", add)
# 在 Python Table API 中使用 Python 自定义函数
-my_table.select("add(a, b)")
+my_table.select(call('add', col('a'), col('b')))
# 也可以在 Python Table API 中直接使用 Python 自定义函数
-my_table.select(add(my_table.a, my_table.b))
+my_table.select(add(col('a'), col('b')))
```
<a name="table-functions"></a>
@@ -143,6 +146,10 @@ my_table.select(add(my_table.a, my_table.b))
以下示例说明了如何定义自己的 Python 自定义表值函数,将其注册到 TableEnvironment 中,并在作业中使用它。
```python
+from pyflink.table.expressions import col
+from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
+from pyflink.table.udf import TableFunction, udtf
+
class Split(TableFunction):
def eval(self, string):
for s in string.split(" "):
@@ -156,8 +163,8 @@ my_table = ... # type: Table, table schema: [a: String]
split = udtf(Split(), result_types=[DataTypes.STRING(), DataTypes.INT()])
# 在 Python Table API 中使用 Python 表值函数
-my_table.join_lateral(split(my_table.a).alias("word, length"))
-my_table.left_outer_join_lateral(split(my_table.a).alias("word, length"))
+my_table.join_lateral(split(col("a")).alias("word", "length"))
+my_table.left_outer_join_lateral(split(col("a")).alias("word", "length"))
# 在 SQL API 中使用 Python 表值函数
table_env.create_temporary_function("split", udtf(Split(),
result_types=[DataTypes.STRING(), DataTypes.INT()]))
@@ -183,7 +190,8 @@ public class Split extends TableFunction<Tuple2<String,
Integer>> {
}
}
'''
-from pyflink.table.expressions import call
+from pyflink.table.expressions import call, col
+from pyflink.table import TableEnvironment, EnvironmentSettings
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
@@ -193,8 +201,8 @@ my_table = ... # type: Table, table schema: [a: String]
table_env.create_java_temporary_function("split", "my.java.function.Split")
# 在 Python Table API 中使用表值函数。 "alias"指定表的字段名称。
-my_table.join_lateral(call('split', my_table.a).alias("word,
length")).select(my_table.a, col('word'), col('length'))
-my_table.left_outer_join_lateral(call('split', my_table.a).alias("word,
length")).select(my_table.a, col('word'), col('length'))
+my_table.join_lateral(call('split', col('a')).alias("word",
"length")).select(col('a'), col('word'), col('length'))
+my_table.left_outer_join_lateral(call('split', col('a')).alias("word",
"length")).select(col('a'), col('word'), col('length'))
# 注册 Python 函数。
@@ -311,14 +319,14 @@ t = table_env.from_elements([(1, 2, "Lee"),
(7, 8, "Lee")]).alias("value", "count", "name")
# call function "inline" without registration in Table API
-result = t.group_by(t.name).select(weighted_avg(t.value,
t.count).alias("avg")).execute()
+result = t.group_by(col("name")).select(weighted_avg(col("value"),
col("count")).alias("avg")).execute()
result.print()
# register function
table_env.create_temporary_function("weighted_avg", WeightedAvg())
# call registered function in Table API
-result = t.group_by(t.name).select(call("weighted_avg", t.value,
t.count).alias("avg")).execute()
+result = t.group_by(col("name")).select(call("weighted_avg", col("value"),
col("count")).alias("avg")).execute()
result.print()
# register table
@@ -487,7 +495,7 @@ t = table_env.from_elements([(1, 'Hi', 'Hello'),
['a', 'b', 'c'])
# call function "inline" without registration in Table API
-t.group_by(t.b).flat_aggregate(top2).select(col('*')).execute().print()
+t.group_by(col('b')).flat_aggregate(top2).select(col('*')).execute().print()
# the result is:
+----+--------------------------------+----------------------+
diff --git
a/docs/content.zh/docs/dev/python/table/udfs/vectorized_python_udfs.md
b/docs/content.zh/docs/dev/python/table/udfs/vectorized_python_udfs.md
index af6c7f60c15..5454af1057d 100644
--- a/docs/content.zh/docs/dev/python/table/udfs/vectorized_python_udfs.md
+++ b/docs/content.zh/docs/dev/python/table/udfs/vectorized_python_udfs.md
@@ -49,6 +49,10 @@ under the License.
以下示例显示了如何定义自己的向量化 Python 标量函数,该函数计算两列的总和,并在查询中使用它:
```python
+from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
+from pyflink.table.expressions import col
+from pyflink.table.udf import udf
+
@udf(result_type=DataTypes.BIGINT(), func_type="pandas")
def add(i, j):
return i + j
@@ -57,7 +61,7 @@ settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(settings)
# use the vectorized Python scalar function in Python Table API
-my_table.select(add(my_table.bigint, my_table.bigint))
+my_table.select(add(col("bigint"), col("bigint")))
# 在SQL API中使用Python向量化标量函数
table_env.create_temporary_function("add", add)
@@ -81,6 +85,11 @@ table_env.sql_query("SELECT add(bigint, bigint) FROM
MyTable")
and `Over Window Aggregation` 使用它:
```python
+from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
+from pyflink.table.expressions import col, lit
+from pyflink.table.udf import udaf
+from pyflink.table.window import Tumble
+
@udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
def mean_udaf(v):
return v.mean()
@@ -91,18 +100,17 @@ table_env = TableEnvironment.create(settings)
my_table = ... # type: Table, table schema: [a: String, b: BigInt, c: BigInt]
# 在 GroupBy Aggregation 中使用向量化聚合函数
-my_table.group_by(my_table.a).select(my_table.a, mean_udaf(add(my_table.b)))
+my_table.group_by(col('a')).select(col('a'), mean_udaf(col('b')))
# 在 GroupBy Window Aggregation 中使用向量化聚合函数
-tumble_window = Tumble.over(expr.lit(1).hours) \
- .on(expr.col("rowtime")) \
+tumble_window = Tumble.over(lit(1).hours) \
+ .on(col("rowtime")) \
.alias("w")
my_table.window(tumble_window) \
- .group_by("w") \
- .select("w.start, w.end, mean_udaf(b)")
-
+ .group_by(col("w")) \
+ .select(col('w').start, col('w').end, mean_udaf(col('b')))
# 在 Over Window Aggregation 中使用向量化聚合函数
table_env.create_temporary_function("mean_udaf", mean_udaf)
@@ -118,6 +126,9 @@ table_env.sql_query("""
以下示例显示了多种定义向量化 Python 聚合函数的方式。该函数需要两个类型为 bigint 的参数作为输入参数,并返回它们的最大值的和作为结果。
```python
+from pyflink.table import DataTypes
+from pyflink.table.udf import AggregateFunction, udaf
+
# 方式一:扩展基类 `AggregateFunction`
class MaxAdd(AggregateFunction):
diff --git a/docs/content.zh/docs/dev/table/catalogs.md
b/docs/content.zh/docs/dev/table/catalogs.md
index 0b86fbd6769..05722d87f65 100644
--- a/docs/content.zh/docs/dev/table/catalogs.md
+++ b/docs/content.zh/docs/dev/table/catalogs.md
@@ -241,7 +241,7 @@ schema = Schema.new_builder() \
catalog_table = t_env.create_table("myhive.mydb.mytable",
TableDescriptor.for_connector("kafka")
.schema(schema)
- // …
+ # …
.build())
# tables should contain "mytable"
diff --git a/docs/content.zh/docs/dev/table/common.md
b/docs/content.zh/docs/dev/table/common.md
index 02f06056a7f..83d0ddacdcc 100644
--- a/docs/content.zh/docs/dev/table/common.md
+++ b/docs/content.zh/docs/dev/table/common.md
@@ -475,9 +475,9 @@ table_env = # see "Create a TableEnvironment" section
orders = table_env.from_path("Orders")
# compute revenue for all customers from France
revenue = orders \
- .filter(orders.cCountry == 'FRANCE') \
- .group_by(orders.cID, orders.cName) \
- .select(orders.cID, orders.cName, orders.revenue.sum.alias('revSum'))
+ .filter(col('cCountry') == 'FRANCE') \
+ .group_by(col('cID'), col('cName')) \
+ .select(col('cID'), col('cName'), col('revenue').sum.alias('revSum'))
# emit or convert Table
# execute query
@@ -856,7 +856,7 @@ t_env = StreamTableEnvironment.create(env)
table1 = t_env.from_elements([(1, "hello")], ["count", "word"])
table2 = t_env.from_elements([(1, "hello")], ["count", "word"])
table = table1 \
- .where(table1.word.like('F%')) \
+ .where(col('word').like('F%')) \
.union_all(table2)
print(table.explain())
diff --git a/docs/content.zh/docs/dev/table/tableApi.md
b/docs/content.zh/docs/dev/table/tableApi.md
index 65a9f9d87a2..e5961db83d5 100644
--- a/docs/content.zh/docs/dev/table/tableApi.md
+++ b/docs/content.zh/docs/dev/table/tableApi.md
@@ -113,6 +113,7 @@ val result = orders
```python
from pyflink.table import *
+from pyflink.table.expressions import col
# 环境配置
t_env = TableEnvironment.create(
@@ -151,8 +152,7 @@ t_env.execute_sql(sink_ddl)
# 指定表程序
orders = t_env.from_path("Orders") # schema (a, b, c, rowtime)
-orders.group_by("a").select(orders.a,
orders.b.count.alias('cnt')).execute_insert("result").wait()
-
+orders.group_by(col("a")).select(col("a"),
col("b").count.alias('cnt')).execute_insert("result").wait()
```
{{< /tab >}}
@@ -207,14 +207,15 @@ val result: Table = orders
```python
# 指定表程序
from pyflink.table.expressions import col, lit
+from pyflink.table.window import Tumble
orders = t_env.from_path("Orders") # schema (a, b, c, rowtime)
-result = orders.filter(orders.a.is_not_null & orders.b.is_not_null &
orders.c.is_not_null) \
- .select(orders.a.lower_case.alias('a'), orders.b,
orders.rowtime) \
-
.window(Tumble.over(lit(1).hour).on(orders.rowtime).alias("hourly_window")) \
+result = orders.filter(col("a").is_not_null & col("b").is_not_null &
col("c").is_not_null) \
+ .select(col("a").lower_case.alias('a'), col("b"),
col("rowtime")) \
+
.window(Tumble.over(lit(1).hour).on(col("rowtime")).alias("hourly_window")) \
.group_by(col('hourly_window'), col('a')) \
- .select(col('a'), col('hourly_window').end.alias('hour'),
b.avg.alias('avg_billing_amount'))
+ .select(col('a'), col('hourly_window').end.alias('hour'),
col("b").avg.alias('avg_billing_amount'))
```
{{< /tab >}}
@@ -369,7 +370,7 @@ Table result = orders.select($"a", $"c" as "d")
{{< tab "Python" >}}
```python
orders = t_env.from_path("Orders")
-result = orders.select(orders.a, orders.c.alias('d'))
+result = orders.select(col("a"), col("c").alias('d'))
```
{{< /tab >}}
{{< /tabs >}}
@@ -417,7 +418,7 @@ val orders: Table = tableEnv.from("Orders").as("x", "y",
"z", "t")
{{< tab "Python" >}}
```python
orders = t_env.from_path("Orders")
-result = orders.alias("x, y, z, t")
+result = orders.alias("x", "y", "z", "t")
```
{{< /tab >}}
{{< /tabs >}}
@@ -446,7 +447,7 @@ val result = orders.filter($"a" % 2 === 0)
{{< tab "Python" >}}
```python
orders = t_env.from_path("Orders")
-result = orders.where(orders.a == 'red')
+result = orders.where(col("a") == 'red')
```
{{< /tab >}}
{{< /tabs >}}
@@ -469,7 +470,7 @@ val result = orders.filter($"a" % 2 === 0)
{{< tab "Python" >}}
```python
orders = t_env.from_path("Orders")
-result = orders.filter(orders.a == 'red')
+result = orders.filter(col("a") == 'red')
```
{{< /tab >}}
{{< /tabs >}}
@@ -504,7 +505,7 @@ val result = orders.addColumns(concat($"c", "Sunny"))
from pyflink.table.expressions import concat
orders = t_env.from_path("Orders")
-result = orders.add_columns(concat(orders.c, 'sunny'))
+result = orders.add_columns(concat(col("c"), 'sunny'))
```
{{< /tab >}}
{{< /tabs >}}
@@ -535,7 +536,7 @@ val result = orders.addOrReplaceColumns(concat($"c",
"Sunny") as "desc")
from pyflink.table.expressions import concat
orders = t_env.from_path("Orders")
-result = orders.add_or_replace_columns(concat(orders.c, 'sunny').alias('desc'))
+result = orders.add_or_replace_columns(concat(col("c"), 'sunny').alias('desc'))
```
{{< /tab >}}
{{< /tabs >}}
@@ -560,7 +561,7 @@ val result = orders.dropColumns($"b", $"c")
{{< tab "Python" >}}
```python
orders = t_env.from_path("Orders")
-result = orders.drop_columns(orders.b, orders.c)
+result = orders.drop_columns(col("b"), col("c"))
```
{{< /tab >}}
{{< /tabs >}}
@@ -588,7 +589,7 @@ val result = orders.renameColumns($"b" as "b2", $"c" as
"c2")
{{< tab "Python" >}}
```python
orders = t_env.from_path("Orders")
-result = orders.rename_columns(orders.b.alias('b2'), orders.c.alias('c2'))
+result = orders.rename_columns(col("b").alias('b2'), col("c").alias('c2'))
```
{{< /tab >}}
{{< /tabs >}}
@@ -621,7 +622,7 @@ val result = orders.groupBy($"a").select($"a",
$"b".sum().as("d"))
{{< tab "Python" >}}
```python
orders = t_env.from_path("Orders")
-result = orders.group_by(orders.a).select(orders.a, orders.b.sum.alias('d'))
+result = orders.group_by(col("a")).select(col("a"), col("b").sum.alias('d'))
```
{{< /tab >}}
{{< /tabs >}}
@@ -666,9 +667,9 @@ from pyflink.table.window import Tumble
from pyflink.table.expressions import lit, col
orders = t_env.from_path("Orders")
-result =
orders.window(Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")) \
- .group_by(orders.a, col('w')) \
- .select(orders.a, col('w').start, col('w').end,
orders.b.sum.alias('d'))
+result =
orders.window(Tumble.over(lit(5).minutes).on(col('rowtime')).alias("w")) \
+ .group_by(col('a'), col('w')) \
+ .select(col('a'), col('w').start, col('w').end,
col('b').sum.alias('d'))
```
{{< /tab >}}
{{< /tabs >}}
@@ -721,10 +722,10 @@ from pyflink.table.window import Over
from pyflink.table.expressions import col, UNBOUNDED_RANGE, CURRENT_RANGE
orders = t_env.from_path("Orders")
-result =
orders.over_window(Over.partition_by(orders.a).order_by(orders.rowtime)
+result =
orders.over_window(Over.partition_by(col("a")).order_by(col("rowtime"))
.preceding(UNBOUNDED_RANGE).following(CURRENT_RANGE)
.alias("w")) \
- .select(orders.a, orders.b.avg.over(col('w')),
orders.b.max.over(col('w')), orders.b.min.over(col('w')))
+ .select(col("a"), col("b").avg.over(col('w')),
col("b").max.over(col('w')), col("b").min.over(col('w')))
```
{{< /tab >}}
{{< /tabs >}}
@@ -795,22 +796,23 @@ val result = orders
{{< tab "Python" >}}
```python
from pyflink.table.expressions import col, lit, UNBOUNDED_RANGE
+from pyflink.table.window import Over, Tumble
orders = t_env.from_path("Orders")
# 按属性分组后的的互异(互不相同、去重)聚合
-group_by_distinct_result = orders.group_by(orders.a) \
- .select(orders.a,
orders.b.sum.distinct.alias('d'))
+group_by_distinct_result = orders.group_by(col("a")) \
+ .select(col("a"),
col("b").sum.distinct.alias('d'))
# 按属性、时间窗口分组后的互异(互不相同、去重)聚合
-group_by_window_distinct_result = orders.window(
-
Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")).group_by(orders.a,
col('w')) \
- .select(orders.a, orders.b.sum.distinct.alias('d'))
+group_by_window_distinct_result =
orders.window(Tumble.over(lit(5).minutes).on(col("rowtime")).alias("w")) \
+ .group_by(col("a"), col('w')) \
+ .select(col("a"), col("b").sum.distinct.alias('d'))
# over window 上的互异(互不相同、去重)聚合
result = orders.over_window(Over
- .partition_by(orders.a)
- .order_by(orders.rowtime)
- .preceding(UNBOUNDED_RANGE)
- .alias("w")) \
- .select(orders.a, orders.b.avg.distinct.over(col('w')),
orders.b.max.over(col('w')), orders.b.min.over(col('w')))
+ .partition_by(col("a"))
+ .order_by(col("rowtime"))
+ .preceding(UNBOUNDED_RANGE)
+ .alias("w")) \
+ .select(col("a"), col("b").avg.distinct.over(col('w')),
col("b").max.over(col('w')), col("b").min.over(col('w')))
```
{{< /tab >}}
{{< /tabs >}}
@@ -911,7 +913,7 @@ from pyflink.table.expressions import col
left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'))
right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'))
-result = left.join(right).where(left.a == right.d).select(left.a, left.b,
right.e)
+result = left.join(right).where(col('a') == col('d')).select(col('a'),
col('b'), col('e'))
```
{{< /tab >}}
{{< /tabs >}}
@@ -957,9 +959,9 @@ from pyflink.table.expressions import col
left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'))
right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'))
-left_outer_result = left.left_outer_join(right, left.a ==
right.d).select(left.a, left.b, right.e)
-right_outer_result = left.right_outer_join(right, left.a ==
right.d).select(left.a, left.b, right.e)
-full_outer_result = left.full_outer_join(right, left.a ==
right.d).select(left.a, left.b, right.e)
+left_outer_result = left.left_outer_join(right, col('a') ==
col('d')).select(col('a'), col('b'), col('e'))
+right_outer_result = left.right_outer_join(right, col('a') ==
col('d')).select(col('a'), col('b'), col('e'))
+full_outer_result = left.full_outer_join(right, col('a') ==
col('d')).select(col('a'), col('b'), col('e'))
```
{{< /tab >}}
{{< /tabs >}}
@@ -1006,9 +1008,9 @@ from pyflink.table.expressions import col
left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'),
col('rowtime1'))
right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'),
col('rowtime2'))
-
-joined_table = left.join(right).where((left.a == right.d) & (left.rowtime1 >=
right.rowtime2 - lit(1).second) & (left.rowtime1 <= right.rowtime2 +
lit(2).seconds))
-result = joined_table.select(joined_table.a, joined_table.b, joined_table.e,
joined_table.rowtime1)
+
+joined_table = left.join(right).where((col('a') == col('d')) &
(col('rowtime1') >= col('rowtime2') - lit(1).second) & (col('rowtime1') <=
col('rowtime2') + lit(2).seconds))
+result = joined_table.select(col('a'), col('b'), col('e'), col('rowtime1'))
```
{{< /tab >}}
{{< /tabs >}}
@@ -1054,8 +1056,8 @@ def split(x):
# join
orders = t_env.from_path("Orders")
-joined_table = orders.join_lateral(split(orders.c).alias("s, t, v"))
-result = joined_table.select(joined_table.a, joined_table.b, joined_table.s,
joined_table.t, joined_table.v)
+joined_table = orders.join_lateral(split(col('c')).alias("s", "t", "v"))
+result = joined_table.select(col('a'), col('b'), col('s'), col('t'), col('v'))
```
{{< /tab >}}
{{< /tabs >}}
@@ -1103,8 +1105,8 @@ def split(x):
# join
orders = t_env.from_path("Orders")
-joined_table = orders.left_outer_join_lateral(split(orders.c).alias("s, t, v"))
-result = joined_table.select(joined_table.a, joined_table.b, joined_table.s,
joined_table.t, joined_table.v)
+joined_table = orders.left_outer_join_lateral(split(col('c')).alias("s", "t",
"v"))
+result = joined_table.select(col('a'), col('b'), col('s'), col('t'), col('v'))
```
{{< /tab >}}
{{< /tabs >}}
@@ -1370,7 +1372,7 @@ val result = left.select($"a", $"b",
$"c").where($"a".in(right))
left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'))
right = t_env.from_path("Source2").select(col('a'))
-result = left.select(left.a, left.b, left.c).where(left.a.in_(right))
+result = left.select(col('a'), col('b'), col('c')).where(col('a').in_(right))
```
{{< /tab >}}
{{< /tabs >}}
@@ -1390,17 +1392,17 @@ result = left.select(left.a, left.b,
left.c).where(left.a.in_(right))
{{< tabs "orderby" >}}
{{< tab "Java" >}}
```java
-Table result = in.orderBy($("a").asc());
+Table result = tab.orderBy($("a").asc());
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
-val result = in.orderBy($"a".asc)
+val result = tab.orderBy($"a".asc)
```
{{< /tab >}}
{{< tab "Python" >}}
```python
-result = in.order_by(in.a.asc)
+result = tab.order_by(col('a').asc)
```
{{< /tab >}}
{{< /tabs >}}
@@ -1440,13 +1442,13 @@ val result3: Table =
in.orderBy($"a".asc).offset(10).fetch(5)
{{< tab "Python" >}}
```python
# 从已排序的结果集中返回前5条记录
-result1 = table.order_by(table.a.asc).fetch(5)
+result1 = table.order_by(col('a').asc).fetch(5)
# 从已排序的结果集中返回跳过3条记录之后的所有记录
-result2 = table.order_by(table.a.asc).offset(3)
+result2 = table.order_by(col('a').asc).offset(3)
# 从已排序的结果集中返回跳过10条记录之后的前5条记录
-result3 = table.order_by(table.a.asc).offset(10).fetch(5)
+result3 = table.order_by(col('a').asc).offset(10).fetch(5)
```
{{< /tab >}}
{{< /tabs >}}
@@ -1518,7 +1520,7 @@ val table = input
```python
# 定义窗口并指定别名为 w,以窗口 w 对表进行分组,然后再聚合
table = input.window([w: GroupWindow].alias("w")) \
- .group_by(col('w')).select(input.b.sum)
+ .group_by(col('w')).select(col('b').sum)
```
{{< /tab >}}
{{< /tabs >}}
@@ -1554,7 +1556,7 @@ val table = input
# 定义窗口并指定别名为 w,以属性 a 和窗口 w 对表进行分组,
# 然后再聚合
table = input.window([w: GroupWindow].alias("w")) \
- .group_by(col('w'), input.a).select(input.b.sum)
+ .group_by(col('w'), col('a')).select(col('b').sum)
```
{{< /tab >}}
{{< /tabs >}}
@@ -1583,8 +1585,8 @@ val table = input
# 定义窗口并指定别名为 w,以属性 a 和窗口 w 对表进行分组,
# 然后再聚合并添加窗口开始、结束和 rowtime 时间戳
table = input.window([w: GroupWindow].alias("w")) \
- .group_by(col('w'), input.a) \
- .select(input.a, col('w').start, col('w').end, col('w').rowtime,
input.b.count)
+ .group_by(col('w'), col('a')) \
+ .select(col('a'), col('w').start, col('w').end, col('w').rowtime,
col('b').count)
```
{{< /tab >}}
{{< /tabs >}}
@@ -1989,8 +1991,8 @@ val table = input
{{< tab "Python" >}}
```python
# define over window with alias w and aggregate over the over window w
-table = input.over_window([w: OverWindow].alias("w")) \
- .select(input.a, input.b.sum.over(col('w')), input.c.min.over(col('w')))
+table = input.over_window([w: OverWindow].alias("w")) \
+ .select(col('a'), col('b').sum.over(col('w')), col('c').min.over(col('w')))
```
{{< /tab >}}
{{< /tabs >}}
@@ -2217,9 +2219,9 @@ func = udf(map_function, result_type=DataTypes.ROW(
table = input.map(func).alias('a', 'b')
# 使用 python 向量化标量函数进行 map 操作
-pandas_func = udf(lambda x: x * 2, result_type=DataTypes.ROW(
- [DataTypes.FIELD("a",
DataTypes.BIGINT()),
- DataTypes.FIELD("b",
DataTypes.BIGINT()))]),
+pandas_func = udf(lambda x: x * 2,
+ result_type=DataTypes.ROW([DataTypes.FIELD("a",
DataTypes.BIGINT()),
+ DataTypes.FIELD("b",
DataTypes.BIGINT())]),
func_type='pandas')
table = input.map(pandas_func).alias('a', 'b')
@@ -2451,9 +2453,9 @@ agg = udaf(function,
name=str(function.__class__.__name__))
# 使用 python 通用聚合函数进行聚合
-result = t.group_by(t.a) \
+result = t.group_by(col('a')) \
.aggregate(agg.alias("c", "d")) \
- .select("a, c, d")
+ .select(col('a'), col('c'), col('d'))
# 使用 python 向量化聚合函数进行聚合
pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
@@ -2462,8 +2464,7 @@ pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
DataTypes.FIELD("b", DataTypes.INT())]),
func_type="pandas")
t.aggregate(pandas_udaf.alias("a", "b")) \
- .select("a, b")
-
+ .select(col('a'), col('b'))
```
{{< /tab >}}
@@ -2504,20 +2505,22 @@ val table = input
```python
from pyflink.table import DataTypes
from pyflink.table.udf import AggregateFunction, udaf
+from pyflink.table.expressions import col, lit
+from pyflink.table.window import Tumble
pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
result_type=DataTypes.ROW(
[DataTypes.FIELD("a", DataTypes.FLOAT()),
DataTypes.FIELD("b", DataTypes.INT())]),
func_type="pandas")
-tumble_window = Tumble.over(expr.lit(1).hours) \
- .on(expr.col("rowtime")) \
+tumble_window = Tumble.over(lit(1).hours) \
+ .on(col("rowtime")) \
.alias("w")
-t.select(t.b, t.rowtime) \
+t.select(col('b'), col('rowtime')) \
.window(tumble_window) \
- .group_by("w") \
+ .group_by(col("w")) \
.aggregate(pandas_udaf.alias("d", "e")) \
- .select("w.rowtime, d, e")
+ .select(col('w').rowtime, col('d'), col('e'))
```
{{< /tab >}}
{{< /tabs >}}
@@ -2667,6 +2670,7 @@ val result = orders
from pyflink.common import Row
from pyflink.table.udf import TableAggregateFunction, udtaf
from pyflink.table import DataTypes
+from pyflink.table.expressions import col
class Top2(TableAggregateFunction):
@@ -2702,13 +2706,13 @@ t = t_env.from_elements([(1, 'Hi', 'Hello'),
(3, 'Hi', 'hi'),
(5, 'Hi2', 'hi'),
(7, 'Hi', 'Hello'),
- (2, 'Hi', 'Hello')], ['a', 'b', 'c'])
-result = t.select(t.a, t.c) \
- .group_by(t.c) \
+ (2, 'Hi', 'Hello')],
+ ['a', 'b', 'c'])
+result = t.select(col('a'), col('c')) \
+ .group_by(col('c')) \
.flat_aggregate(mytop) \
- .select(t.a) \
+ .select(col('a')) \
.flat_aggregate(mytop.alias("b"))
-
```
{{< /tab >}}
{{< /tabs >}}
diff --git a/docs/content/docs/dev/python/debugging.md
b/docs/content/docs/dev/python/debugging.md
index a1723e2a55e..d0597032513 100644
--- a/docs/content/docs/dev/python/debugging.md
+++ b/docs/content/docs/dev/python/debugging.md
@@ -60,10 +60,14 @@ You can log contextual and debug information via `print` or
standard Python logg
The logging messages will be printed in the log files of the `TaskManagers`
during job execution.
```python
+from pyflink.table import DataTypes
+from pyflink.table.udf import udf
+
+import logging
+
@udf(result_type=DataTypes.BIGINT())
def add(i, j):
# use logging modules
- import logging
logging.info("debug")
# use print function
print('debug')
diff --git a/docs/content/docs/dev/python/table/conversion_of_pandas.md
b/docs/content/docs/dev/python/table/conversion_of_pandas.md
index 36a43e1c91b..46e90bdd331 100644
--- a/docs/content/docs/dev/python/table/conversion_of_pandas.md
+++ b/docs/content/docs/dev/python/table/conversion_of_pandas.md
@@ -39,6 +39,8 @@ provide exactly-once guarantees.
The following example shows how to create a PyFlink Table from a Pandas
DataFrame:
```python
+from pyflink.table import DataTypes
+
import pandas as pd
import numpy as np
@@ -57,7 +59,7 @@ table = t_env.from_pandas(pdf, [DataTypes.DOUBLE(),
DataTypes.DOUBLE()])
# Create a PyFlink Table from a Pandas DataFrame with the specified row type
table = t_env.from_pandas(pdf,
DataTypes.ROW([DataTypes.FIELD("f0",
DataTypes.DOUBLE()),
- DataTypes.FIELD("f1",
DataTypes.DOUBLE())])
+ DataTypes.FIELD("f1",
DataTypes.DOUBLE())]))
```
## Convert PyFlink Table to Pandas DataFrame
@@ -72,6 +74,8 @@ You can limit the number of rows collected to client side via
{{< pythondoc file
The following example shows how to convert a PyFlink Table to a Pandas
DataFrame:
```python
+from pyflink.table.expressions import col
+
import pandas as pd
import numpy as np
diff --git a/docs/content/docs/dev/python/table/intro_to_table_api.md
b/docs/content/docs/dev/python/table/intro_to_table_api.md
index 938e0379955..13bb85736c0 100644
--- a/docs/content/docs/dev/python/table/intro_to_table_api.md
+++ b/docs/content/docs/dev/python/table/intro_to_table_api.md
@@ -36,6 +36,7 @@ All Table API and SQL programs, both batch and streaming,
follow the same patter
```python
from pyflink.table import EnvironmentSettings, TableEnvironment
+from pyflink.table.expressions import col
# 1. create a TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
@@ -70,7 +71,7 @@ source_table = table_env.from_path("datagen")
# or create a Table from a SQL query:
# source_table = table_env.sql_query("SELECT * FROM datagen")
-result_table = source_table.select(source_table.id + 1, source_table.data)
+result_table = source_table.select(col("id") + 1, col("data"))
# 5. emit query result to sink table
# emit a Table API result Table to a sink table:
@@ -319,6 +320,7 @@ The following example shows a simple Table API aggregation
query:
```python
from pyflink.table import EnvironmentSettings, TableEnvironment
+from pyflink.table.expressions import col
# using batch table environment to execute the queries
env_settings = EnvironmentSettings.in_batch_mode()
@@ -329,11 +331,11 @@ orders = table_env.from_elements([('Jack', 'FRANCE', 10),
('Rose', 'ENGLAND', 30
# compute revenue for all customers from France
revenue = orders \
- .select(orders.name, orders.country, orders.revenue) \
- .where(orders.country == 'FRANCE') \
- .group_by(orders.name) \
- .select(orders.name, orders.revenue.sum.alias('rev_sum'))
-
+ .select(col("name"), col("country"), col("revenue")) \
+ .where(col("country") == 'FRANCE') \
+ .group_by(col("name")) \
+ .select(col("name"), col("revenue").sum.alias('rev_sum'))
+
revenue.execute().print()
```
@@ -687,6 +689,7 @@ The following code shows how to use the `Table.explain()`
method:
```python
# using a stream TableEnvironment
from pyflink.table import EnvironmentSettings, TableEnvironment
+from pyflink.table.expressions import col
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
@@ -694,7 +697,7 @@ table_env = TableEnvironment.create(env_settings)
table1 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table2 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table = table1 \
- .where(table1.data.like('H%')) \
+ .where(col("data").like('H%')) \
.union_all(table2)
print(table.explain())
```
@@ -740,6 +743,7 @@ The following code shows how to use the
`StatementSet.explain()` method:
```python
# using a stream TableEnvironment
from pyflink.table import EnvironmentSettings, TableEnvironment
+from pyflink.table.expressions import col
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
@@ -765,7 +769,7 @@ table_env.execute_sql("""
statement_set = table_env.create_statement_set()
-statement_set.add_insert("print_sink_table",
table1.where(table1.data.like('H%')))
+statement_set.add_insert("print_sink_table",
table1.where(col("data").like('H%')))
statement_set.add_insert("black_hole_sink_table", table2)
print(statement_set.explain())
diff --git a/docs/content/docs/dev/python/table/metrics.md
b/docs/content/docs/dev/python/table/metrics.md
index 56441980de8..2767c654ecf 100644
--- a/docs/content/docs/dev/python/table/metrics.md
+++ b/docs/content/docs/dev/python/table/metrics.md
@@ -60,7 +60,6 @@ class MyUDF(ScalarFunction):
def eval(self, i):
self.counter.inc(i)
return i
-
```
{{< /tab >}}
{{< /tabs >}}
@@ -137,7 +136,6 @@ class MyUDF(ScalarFunction):
self.meter = None
def open(self, function_context):
- super().open(function_context)
# an average rate of events per second over 120s, default is 60s.
self.meter = function_context.get_metric_group().meter("my_meter",
time_span_in_seconds=120)
@@ -162,17 +160,15 @@ group's sub-groups. In this case, the value group will be
returned, and a user v
{{< tabs "a3040b2d-bf2d-4ce4-be2f-2896f48334c8" >}}
{{< tab "Python" >}}
```python
-
-function_context
- .get_metric_group()
- .add_group("my_metrics")
+function_context \
+ .get_metric_group() \
+ .add_group("my_metrics") \
.counter("my_counter")
-function_context
- .get_metric_group()
- .add_group("my_metrics_key", "my_metrics_value")
+function_context \
+ .get_metric_group() \
+ .add_group("my_metrics_key", "my_metrics_value") \
.counter("my_counter")
-
```
{{< /tab >}}
{{< /tabs >}}
@@ -195,9 +191,9 @@ specifying the value parameter.
{{< tabs "d27cbda0-da5f-4a77-a02e-2e54e3156e31" >}}
{{< tab "Python" >}}
```python
-function_context
- .get_metric_group()
- .add_group("my_metrics_key", "my_metrics_value")
+function_context \
+ .get_metric_group() \
+ .add_group("my_metrics_key", "my_metrics_value") \
.counter("my_counter")
```
{{< /tab >}}
diff --git
a/docs/content/docs/dev/python/table/operations/row_based_operations.md
b/docs/content/docs/dev/python/table/operations/row_based_operations.md
index 3e313b274bc..640d8a05ddd 100644
--- a/docs/content/docs/dev/python/table/operations/row_based_operations.md
+++ b/docs/content/docs/dev/python/table/operations/row_based_operations.md
@@ -44,7 +44,7 @@ table_env = TableEnvironment.create(env_settings)
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
- DataTypes.FIELD("data", DataTypes.STRING())]))
+ DataTypes.FIELD("data", DataTypes.STRING())]))
def func1(id: int, data: str) -> Row:
return Row(id, data * 2)
@@ -63,7 +63,7 @@ It also supports to take a Row object (containing all the
columns of the input t
```python
@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
- DataTypes.FIELD("data", DataTypes.STRING())]))
+ DataTypes.FIELD("data", DataTypes.STRING())]))
def func2(data: Row) -> Row:
return Row(data.id, data.data * 2)
@@ -86,7 +86,7 @@ It should be noted that the input type and output type should
be pandas.DataFram
```python
import pandas as pd
@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
- DataTypes.FIELD("data", DataTypes.STRING())]),
+ DataTypes.FIELD("data", DataTypes.STRING())]),
func_type='pandas')
def func3(data: pd.DataFrame) -> pd.DataFrame:
res = pd.concat([data.id, data.data * 2], axis=1)
@@ -226,7 +226,7 @@ pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
DataTypes.FIELD("b", DataTypes.INT())]),
func_type="pandas")
t.aggregate(pandas_udaf.alias("a", "b")) \
- .select(col('a'), col('b')).execute().print()
+ .select(col('a'), col('b')).execute().print()
# the result is
#+--------------------------------+-------------+
@@ -288,10 +288,11 @@ t = table_env.from_elements([(1, 'Hi', 'Hello'),
(3, 'Hi', 'hi'),
(5, 'Hi2', 'hi'),
(7, 'Hi', 'Hello'),
- (2, 'Hi', 'Hello')], ['a', 'b', 'c'])
+ (2, 'Hi', 'Hello')],
+ ['a', 'b', 'c'])
# call function "inline" without registration in Table API
-result =
t.group_by(t.b).flat_aggregate(top2).select(col('*')).execute().print()
+result =
t.group_by(col('b')).flat_aggregate(top2).select(col('*')).execute().print()
# the result is:
#+----+--------------------------------+----------------------+
diff --git a/docs/content/docs/dev/python/table/python_table_api_connectors.md
b/docs/content/docs/dev/python/table/python_table_api_connectors.md
index a7f31d30cfb..2d85d5c4024 100644
--- a/docs/content/docs/dev/python/table/python_table_api_connectors.md
+++ b/docs/content/docs/dev/python/table/python_table_api_connectors.md
@@ -138,12 +138,14 @@ The predefined data sinks support writing to Pandas
DataFrame.
PyFlink Tables support conversion to and from Pandas DataFrame.
```python
+from pyflink.table.expressions import col
+
import pandas as pd
import numpy as np
# Create a PyFlink Table
pdf = pd.DataFrame(np.random.rand(1000, 2))
-table = t_env.from_pandas(pdf, ["a", "b"]).filter("a > 0.5")
+table = t_env.from_pandas(pdf, ["a", "b"]).filter(col('a') > 0.5)
# Convert the PyFlink Table to a Pandas DataFrame
pdf = table.to_pandas()
@@ -155,6 +157,8 @@ pdf = table.to_pandas()
be acceptable atomic types or acceptable composite types.
```python
+from pyflink.table import DataTypes
+
table_env.from_elements([(1, 'Hi'), (2, 'Hello')])
# use the second parameter to specify custom field names
diff --git a/docs/content/docs/dev/python/table/udfs/python_udfs.md
b/docs/content/docs/dev/python/table/udfs/python_udfs.md
index 1cbf1a6ee49..d4301a26576 100644
--- a/docs/content/docs/dev/python/table/udfs/python_udfs.md
+++ b/docs/content/docs/dev/python/table/udfs/python_udfs.md
@@ -41,7 +41,9 @@ The following example shows how to define your own Python
hash code function, re
Note that you can configure your scalar function via a constructor before it
is registered:
```python
-from pyflink.table.expressions import call
+from pyflink.table.expressions import call, col
+from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
+from pyflink.table.udf import ScalarFunction, udf
class HashCode(ScalarFunction):
def __init__(self):
@@ -56,7 +58,7 @@ table_env = TableEnvironment.create(settings)
hash_code = udf(HashCode(), result_type=DataTypes.BIGINT())
# use the Python function in Python Table API
-my_table.select(my_table.string, my_table.bigint, hash_code(my_table.bigint),
call(hash_code, my_table.bigint))
+my_table.select(col("string"), col("bigint"), hash_code(col("bigint")),
call(hash_code, col("bigint")))
# use the Python function in SQL API
table_env.create_temporary_function("hash_code", udf(HashCode(),
result_type=DataTypes.BIGINT()))
@@ -78,7 +80,8 @@ public class HashCode extends ScalarFunction {
}
}
'''
-from pyflink.table.expressions import call
+from pyflink.table.expressions import call, col
+from pyflink.table import TableEnvironment, EnvironmentSettings
settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(settings)
@@ -87,7 +90,7 @@ table_env = TableEnvironment.create(settings)
table_env.create_java_temporary_function("hash_code",
"my.java.function.HashCode")
# use the Java function in Python Table API
-my_table.select(call('hash_code', my_table.string))
+my_table.select(call('hash_code', col("string")))
# use the Java function in SQL API
table_env.sql_query("SELECT string, bigint, hash_code(string) FROM MyTable")
@@ -129,10 +132,10 @@ add = udf(functools.partial(partial_add, k=1),
result_type=DataTypes.BIGINT())
# register the Python function
table_env.create_temporary_function("add", add)
# use the function in Python Table API
-my_table.select("add(a, b)")
+my_table.select(call('add', col('a'), col('b')))
# You can also use the Python function in Python Table API directly
-my_table.select(add(my_table.a, my_table.b))
+my_table.select(add(col('a'), col('b')))
```
## Table Functions
@@ -145,6 +148,10 @@ The following example shows how to define your own Python
multi emit function, r
TableEnvironment, and call it in a query.
```python
+from pyflink.table.expressions import col
+from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
+from pyflink.table.udf import TableFunction, udtf
+
class Split(TableFunction):
def eval(self, string):
for s in string.split(" "):
@@ -158,8 +165,8 @@ my_table = ... # type: Table, table schema: [a: String]
split = udtf(Split(), result_types=[DataTypes.STRING(), DataTypes.INT()])
# use the Python Table Function in Python Table API
-my_table.join_lateral(split(my_table.a).alias("word, length"))
-my_table.left_outer_join_lateral(split(my_table.a).alias("word, length"))
+my_table.join_lateral(split(col("a")).alias("word", "length"))
+my_table.left_outer_join_lateral(split(col("a")).alias("word", "length"))
# use the Python Table function in SQL API
table_env.create_temporary_function("split", udtf(Split(),
result_types=[DataTypes.STRING(), DataTypes.INT()]))
@@ -186,7 +193,8 @@ public class Split extends TableFunction<Tuple2<String,
Integer>> {
}
}
'''
-from pyflink.table.expressions import call
+from pyflink.table.expressions import call, col
+from pyflink.table import TableEnvironment, EnvironmentSettings
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
@@ -196,8 +204,8 @@ my_table = ... # type: Table, table schema: [a: String]
table_env.create_java_temporary_function("split", "my.java.function.Split")
# Use the table function in the Python Table API. "alias" specifies the field
names of the table.
-my_table.join_lateral(call('split', my_table.a).alias("word,
length")).select(my_table.a, col('word'), col('length'))
-my_table.left_outer_join_lateral(call('split', my_table.a).alias("word,
length")).select(my_table.a, col('word'), col('length'))
+my_table.join_lateral(call('split', col('a')).alias("word",
"length")).select(col('a'), col('word'), col('length'))
+my_table.left_outer_join_lateral(call('split', col('a')).alias("word",
"length")).select(col('a'), col('word'), col('length'))
# Register the python function.
@@ -313,14 +321,14 @@ t = table_env.from_elements([(1, 2, "Lee"),
(7, 8, "Lee")]).alias("value", "count", "name")
# call function "inline" without registration in Table API
-result = t.group_by(t.name).select(weighted_avg(t.value,
t.count).alias("avg")).execute()
+result = t.group_by(col("name")).select(weighted_avg(col("value"),
col("count")).alias("avg")).execute()
result.print()
# register function
table_env.create_temporary_function("weighted_avg", WeightedAvg())
# call registered function in Table API
-result = t.group_by(t.name).select(call("weighted_avg", t.value,
t.count).alias("avg")).execute()
+result = t.group_by(col("name")).select(call("weighted_avg", col("value"),
col("count")).alias("avg")).execute()
result.print()
# register table
@@ -489,7 +497,7 @@ t = table_env.from_elements([(1, 'Hi', 'Hello'),
['a', 'b', 'c'])
# call function "inline" without registration in Table API
-t.group_by(t.b).flat_aggregate(top2).select(col('*')).execute().print()
+t.group_by(col('b')).flat_aggregate(top2).select(col('*')).execute().print()
# the result is:
# b a
diff --git a/docs/content/docs/dev/python/table/udfs/vectorized_python_udfs.md
b/docs/content/docs/dev/python/table/udfs/vectorized_python_udfs.md
index 7b79eabab8e..e31d79b54ed 100644
--- a/docs/content/docs/dev/python/table/udfs/vectorized_python_udfs.md
+++ b/docs/content/docs/dev/python/table/udfs/vectorized_python_udfs.md
@@ -49,6 +49,10 @@ The following example shows how to define your own
vectorized Python scalar func
and use it in a query:
```python
+from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
+from pyflink.table.expressions import col
+from pyflink.table.udf import udf
+
@udf(result_type=DataTypes.BIGINT(), func_type="pandas")
def add(i, j):
return i + j
@@ -57,7 +61,7 @@ settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(settings)
# use the vectorized Python scalar function in Python Table API
-my_table.select(add(my_table.bigint, my_table.bigint))
+my_table.select(add(col("bigint"), col("bigint")))
# use the vectorized Python scalar function in SQL API
table_env.create_temporary_function("add", add)
@@ -80,6 +84,11 @@ The following example shows how to define your own
vectorized Python aggregate f
and use it in `GroupBy Aggregation`, `GroupBy Window Aggregation` and `Over
Window Aggregation`:
```python
+from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
+from pyflink.table.expressions import col, lit
+from pyflink.table.udf import udaf
+from pyflink.table.window import Tumble
+
@udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
def mean_udaf(v):
return v.mean()
@@ -90,18 +99,17 @@ table_env = TableEnvironment.create(settings)
my_table = ... # type: Table, table schema: [a: String, b: BigInt, c: BigInt]
# use the vectorized Python aggregate function in GroupBy Aggregation
-my_table.group_by(my_table.a).select(my_table.a, mean_udaf(add(my_table.b)))
+my_table.group_by(col('a')).select(col('a'), mean_udaf(col('b')))
# use the vectorized Python aggregate function in GroupBy Window Aggregation
-tumble_window = Tumble.over(expr.lit(1).hours) \
- .on(expr.col("rowtime")) \
+tumble_window = Tumble.over(lit(1).hours) \
+ .on(col("rowtime")) \
.alias("w")
my_table.window(tumble_window) \
- .group_by("w") \
- .select("w.start, w.end, mean_udaf(b)")
-
+ .group_by(col("w")) \
+ .select(col('w').start, col('w').end, mean_udaf(col('b')))
# use the vectorized Python aggregate function in Over Window Aggregation
table_env.create_temporary_function("mean_udaf", mean_udaf)
@@ -111,7 +119,6 @@ table_env.sql_query("""
over (PARTITION BY a ORDER BY rowtime
ROWS BETWEEN UNBOUNDED preceding AND UNBOUNDED FOLLOWING)
FROM MyTable""")
-
```
There are many ways to define a vectorized Python aggregate functions.
@@ -119,6 +126,8 @@ The following examples show the different ways to define a
vectorized Python agg
which takes two columns of bigint as the inputs and returns the sum of the
maximum of them as the result.
```python
+from pyflink.table import DataTypes
+from pyflink.table.udf import AggregateFunction, udaf
# option 1: extending the base class `AggregateFunction`
class MaxAdd(AggregateFunction):
diff --git a/docs/content/docs/dev/table/catalogs.md
b/docs/content/docs/dev/table/catalogs.md
index d0b7ab6be64..f166ab5f61b 100644
--- a/docs/content/docs/dev/table/catalogs.md
+++ b/docs/content/docs/dev/table/catalogs.md
@@ -245,7 +245,7 @@ schema = Schema.new_builder() \
catalog_table = t_env.create_table("myhive.mydb.mytable",
TableDescriptor.for_connector("kafka")
.schema(schema)
- // …
+ # …
.build())
# tables should contain "mytable"
diff --git a/docs/content/docs/dev/table/common.md
b/docs/content/docs/dev/table/common.md
index 5fa3ead0509..3aad29f7e01 100644
--- a/docs/content/docs/dev/table/common.md
+++ b/docs/content/docs/dev/table/common.md
@@ -480,9 +480,9 @@ table_env = # see "Create a TableEnvironment" section
orders = table_env.from_path("Orders")
# compute revenue for all customers from France
revenue = orders \
- .filter(orders.cCountry == 'FRANCE') \
- .group_by(orders.cID, orders.cName) \
- .select(orders.cID, orders.cName, orders.revenue.sum.alias('revSum'))
+ .filter(col('cCountry') == 'FRANCE') \
+ .group_by(col('cID'), col('cName')) \
+ .select(col('cID'), col('cName'), col('revenue').sum.alias('revSum'))
# emit or convert Table
# execute query
@@ -849,7 +849,7 @@ t_env = StreamTableEnvironment.create(env)
table1 = t_env.from_elements([(1, "hello")], ["count", "word"])
table2 = t_env.from_elements([(1, "hello")], ["count", "word"])
table = table1 \
- .where(table1.word.like('F%')) \
+ .where(col('word').like('F%')) \
.union_all(table2)
print(table.explain())
diff --git a/docs/content/docs/dev/table/data_stream_api.md
b/docs/content/docs/dev/table/data_stream_api.md
index 28950e2e942..6dd85151de2 100644
--- a/docs/content/docs/dev/table/data_stream_api.md
+++ b/docs/content/docs/dev/table/data_stream_api.md
@@ -2883,7 +2883,7 @@ t_env = ...
table = t_env.from_elements([("john", 35), ("sarah", 32)],
DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()),
- DataTypes.FIELD("age", DataTypes.INT())]))
+ DataTypes.FIELD("age", DataTypes.INT())]))
# Convert the Table into an append DataStream of Row by specifying the type
information
ds_row = t_env.to_append_stream(table, Types.ROW([Types.STRING(),
Types.INT()]))
diff --git a/docs/content/docs/dev/table/tableApi.md
b/docs/content/docs/dev/table/tableApi.md
index d4f132c5afb..08e3223b9e9 100644
--- a/docs/content/docs/dev/table/tableApi.md
+++ b/docs/content/docs/dev/table/tableApi.md
@@ -114,6 +114,7 @@ The following example shows how a Python Table API program
is constructed and ho
```python
from pyflink.table import *
+from pyflink.table.expressions import col
# environment configuration
t_env = TableEnvironment.create(
@@ -152,8 +153,7 @@ t_env.execute_sql(sink_ddl)
# specify table program
orders = t_env.from_path("Orders") # schema (a, b, c, rowtime)
-orders.group_by("a").select(orders.a,
orders.b.count.alias('cnt')).execute_insert("result").wait()
-
+orders.group_by(col("a")).select(col("a"),
col("b").count.alias('cnt')).execute_insert("result").wait()
```
{{< /tab >}}
@@ -208,14 +208,15 @@ val result: Table = orders
```python
# specify table program
from pyflink.table.expressions import col, lit
+from pyflink.table.window import Tumble
orders = t_env.from_path("Orders") # schema (a, b, c, rowtime)
-result = orders.filter(orders.a.is_not_null & orders.b.is_not_null &
orders.c.is_not_null) \
- .select(orders.a.lower_case.alias('a'), orders.b,
orders.rowtime) \
-
.window(Tumble.over(lit(1).hour).on(orders.rowtime).alias("hourly_window")) \
+result = orders.filter(col("a").is_not_null & col("b").is_not_null &
col("c").is_not_null) \
+ .select(col("a").lower_case.alias('a'), col("b"),
col("rowtime")) \
+
.window(Tumble.over(lit(1).hour).on(col("rowtime")).alias("hourly_window")) \
.group_by(col('hourly_window'), col('a')) \
- .select(col('a'), col('hourly_window').end.alias('hour'),
b.avg.alias('avg_billing_amount'))
+ .select(col('a'), col('hourly_window').end.alias('hour'),
col("b").avg.alias('avg_billing_amount'))
```
{{< /tab >}}
@@ -370,7 +371,7 @@ Table result = orders.select($"a", $"c" as "d")
{{< tab "Python" >}}
```python
orders = t_env.from_path("Orders")
-result = orders.select(orders.a, orders.c.alias('d'))
+result = orders.select(col("a"), col("c").alias('d'))
```
{{< /tab >}}
{{< /tabs >}}
@@ -418,7 +419,7 @@ val orders: Table = tableEnv.from("Orders").as("x", "y",
"z", "t")
{{< tab "Python" >}}
```python
orders = t_env.from_path("Orders")
-result = orders.alias("x, y, z, t")
+result = orders.alias("x", "y", "z", "t")
```
{{< /tab >}}
{{< /tabs >}}
@@ -446,7 +447,7 @@ val result = orders.filter($"a" % 2 === 0)
{{< tab "Python" >}}
```python
orders = t_env.from_path("Orders")
-result = orders.where(orders.a == 'red')
+result = orders.where(col("a") == 'red')
```
{{< /tab >}}
{{< /tabs >}}
@@ -469,7 +470,7 @@ val result = orders.filter($"a" % 2 === 0)
{{< tab "Python" >}}
```python
orders = t_env.from_path("Orders")
-result = orders.filter(orders.a == 'red')
+result = orders.filter(col("a") == 'red')
```
{{< /tab >}}
{{< /tabs >}}
@@ -502,7 +503,7 @@ val result = orders.addColumns(concat($"c", "Sunny"))
from pyflink.table.expressions import concat
orders = t_env.from_path("Orders")
-result = orders.add_columns(concat(orders.c, 'sunny'))
+result = orders.add_columns(concat(col("c"), 'sunny'))
```
{{< /tab >}}
{{< /tabs >}}
@@ -533,7 +534,7 @@ val result = orders.addOrReplaceColumns(concat($"c",
"Sunny") as "desc")
from pyflink.table.expressions import concat
orders = t_env.from_path("Orders")
-result = orders.add_or_replace_columns(concat(orders.c, 'sunny').alias('desc'))
+result = orders.add_or_replace_columns(concat(col("c"), 'sunny').alias('desc'))
```
{{< /tab >}}
{{< /tabs >}}
@@ -558,7 +559,7 @@ val result = orders.dropColumns($"b", $"c")
{{< tab "Python" >}}
```python
orders = t_env.from_path("Orders")
-result = orders.drop_columns(orders.b, orders.c)
+result = orders.drop_columns(col("b"), col("c"))
```
{{< /tab >}}
{{< /tabs >}}
@@ -586,7 +587,7 @@ val result = orders.renameColumns($"b" as "b2", $"c" as
"c2")
{{< tab "Python" >}}
```python
orders = t_env.from_path("Orders")
-result = orders.rename_columns(orders.b.alias('b2'), orders.c.alias('c2'))
+result = orders.rename_columns(col("b").alias('b2'), col("c").alias('c2'))
```
{{< /tab >}}
{{< /tabs >}}
@@ -619,7 +620,7 @@ val result = orders.groupBy($"a").select($"a",
$"b".sum().as("d"))
{{< tab "Python" >}}
```python
orders = t_env.from_path("Orders")
-result = orders.group_by(orders.a).select(orders.a, orders.b.sum.alias('d'))
+result = orders.group_by(col("a")).select(col("a"), col("b").sum.alias('d'))
```
{{< /tab >}}
{{< /tabs >}}
@@ -664,9 +665,9 @@ from pyflink.table.window import Tumble
from pyflink.table.expressions import lit, col
orders = t_env.from_path("Orders")
-result =
orders.window(Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")) \
- .group_by(orders.a, col('w')) \
- .select(orders.a, col('w').start, col('w').end,
orders.b.sum.alias('d'))
+result =
orders.window(Tumble.over(lit(5).minutes).on(col('rowtime')).alias("w")) \
+ .group_by(col('a'), col('w')) \
+ .select(col('a'), col('w').start, col('w').end,
col('b').sum.alias('d'))
```
{{< /tab >}}
{{< /tabs >}}
@@ -720,10 +721,10 @@ from pyflink.table.window import Over
from pyflink.table.expressions import col, UNBOUNDED_RANGE, CURRENT_RANGE
orders = t_env.from_path("Orders")
-result =
orders.over_window(Over.partition_by(orders.a).order_by(orders.rowtime)
+result =
orders.over_window(Over.partition_by(col("a")).order_by(col("rowtime"))
.preceding(UNBOUNDED_RANGE).following(CURRENT_RANGE)
.alias("w")) \
- .select(orders.a, orders.b.avg.over(col('w')),
orders.b.max.over(col('w')), orders.b.min.over(col('w')))
+ .select(col("a"), col("b").avg.over(col('w')),
col("b").max.over(col('w')), col("b").min.over(col('w')))
```
{{< /tab >}}
{{< /tabs >}}
@@ -794,22 +795,23 @@ val result = orders
{{< tab "Python" >}}
```python
from pyflink.table.expressions import col, lit, UNBOUNDED_RANGE
+from pyflink.table.window import Over, Tumble
orders = t_env.from_path("Orders")
# Distinct aggregation on group by
-group_by_distinct_result = orders.group_by(orders.a) \
- .select(orders.a,
orders.b.sum.distinct.alias('d'))
+group_by_distinct_result = orders.group_by(col("a")) \
+ .select(col("a"),
col("b").sum.distinct.alias('d'))
# Distinct aggregation on time window group by
-group_by_window_distinct_result = orders.window(
-
Tumble.over(lit(5).minutes).on(orders.rowtime).alias("w")).group_by(orders.a,
col('w')) \
- .select(orders.a, orders.b.sum.distinct.alias('d'))
+group_by_window_distinct_result =
orders.window(Tumble.over(lit(5).minutes).on(col("rowtime")).alias("w")) \
+ .group_by(col("a"), col('w')) \
+ .select(col("a"), col("b").sum.distinct.alias('d'))
# Distinct aggregation on over window
result = orders.over_window(Over
- .partition_by(orders.a)
- .order_by(orders.rowtime)
+ .partition_by(col("a"))
+ .order_by(col("rowtime"))
.preceding(UNBOUNDED_RANGE)
.alias("w")) \
- .select(orders.a, orders.b.avg.distinct.over(col('w')),
orders.b.max.over(col('w')), orders.b.min.over(col('w')))
+ .select(col("a"), col("b").avg.distinct.over(col('w')),
col("b").max.over(col('w')), col("b").min.over(col('w')))
```
{{< /tab >}}
{{< /tabs >}}
@@ -910,7 +912,7 @@ from pyflink.table.expressions import col
left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'))
right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'))
-result = left.join(right).where(left.a == right.d).select(left.a, left.b,
right.e)
+result = left.join(right).where(col('a') == col('d')).select(col('a'),
col('b'), col('e'))
```
{{< /tab >}}
{{< /tabs >}}
@@ -956,9 +958,9 @@ from pyflink.table.expressions import col
left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'))
right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'))
-left_outer_result = left.left_outer_join(right, left.a ==
right.d).select(left.a, left.b, right.e)
-right_outer_result = left.right_outer_join(right, left.a ==
right.d).select(left.a, left.b, right.e)
-full_outer_result = left.full_outer_join(right, left.a ==
right.d).select(left.a, left.b, right.e)
+left_outer_result = left.left_outer_join(right, col('a') ==
col('d')).select(col('a'), col('b'), col('e'))
+right_outer_result = left.right_outer_join(right, col('a') ==
col('d')).select(col('a'), col('b'), col('e'))
+full_outer_result = left.full_outer_join(right, col('a') ==
col('d')).select(col('a'), col('b'), col('e'))
```
{{< /tab >}}
{{< /tabs >}}
@@ -1006,8 +1008,8 @@ from pyflink.table.expressions import col
left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'),
col('rowtime1'))
right = t_env.from_path("Source2").select(col('d'), col('e'), col('f'),
col('rowtime2'))
-joined_table = left.join(right).where((left.a == right.d) & (left.rowtime1 >=
right.rowtime2 - lit(1).second) & (left.rowtime1 <= right.rowtime2 +
lit(2).seconds))
-result = joined_table.select(joined_table.a, joined_table.b, joined_table.e,
joined_table.rowtime1)
+joined_table = left.join(right).where((col('a') == col('d')) &
(col('rowtime1') >= col('rowtime2') - lit(1).second) & (col('rowtime1') <=
col('rowtime2') + lit(2).seconds))
+result = joined_table.select(col('a'), col('b'), col('e'), col('rowtime1'))
```
{{< /tab >}}
{{< /tabs >}}
@@ -1053,8 +1055,8 @@ def split(x):
# join
orders = t_env.from_path("Orders")
-joined_table = orders.join_lateral(split(orders.c).alias("s, t, v"))
-result = joined_table.select(joined_table.a, joined_table.b, joined_table.s,
joined_table.t, joined_table.v)
+joined_table = orders.join_lateral(split(col('c')).alias("s", "t", "v"))
+result = joined_table.select(col('a'), col('b'), col('s'), col('t'), col('v'))
```
{{< /tab >}}
{{< /tabs >}}
@@ -1102,8 +1104,8 @@ def split(x):
# join
orders = t_env.from_path("Orders")
-joined_table = orders.left_outer_join_lateral(split(orders.c).alias("s, t, v"))
-result = joined_table.select(joined_table.a, joined_table.b, joined_table.s,
joined_table.t, joined_table.v)
+joined_table = orders.left_outer_join_lateral(split(col('c')).alias("s", "t",
"v"))
+result = joined_table.select(col('a'), col('b'), col('s'), col('t'), col('v'))
```
{{< /tab >}}
{{< /tabs >}}
@@ -1369,7 +1371,7 @@ val result = left.select($"a", $"b",
$"c").where($"a".in(right))
left = t_env.from_path("Source1").select(col('a'), col('b'), col('c'))
right = t_env.from_path("Source2").select(col('a'))
-result = left.select(left.a, left.b, left.c).where(left.a.in_(right))
+result = left.select(col('a'), col('b'), col('c')).where(col('a').in_(right))
```
{{< /tab >}}
{{< /tabs >}}
@@ -1389,17 +1391,17 @@ Similar to a SQL `ORDER BY` clause. Returns records
globally sorted across all p
{{< tabs "orderby" >}}
{{< tab "Java" >}}
```java
-Table result = in.orderBy($("a").asc());
+Table result = tab.orderBy($("a").asc());
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
-val result = in.orderBy($"a".asc)
+val result = tab.orderBy($"a".asc)
```
{{< /tab >}}
{{< tab "Python" >}}
```python
-result = in.order_by(in.a.asc)
+result = tab.order_by(col('a').asc)
```
{{< /tab >}}
{{< /tabs >}}
@@ -1439,13 +1441,13 @@ val result3: Table =
in.orderBy($"a".asc).offset(10).fetch(5)
{{< tab "Python" >}}
```python
# returns the first 5 records from the sorted result
-result1 = table.order_by(table.a.asc).fetch(5)
+result1 = table.order_by(col('a').asc).fetch(5)
# skips the first 3 records and returns all following records from the sorted
result
-result2 = table.order_by(table.a.asc).offset(3)
+result2 = table.order_by(col('a').asc).offset(3)
# skips the first 10 records and returns the next 5 records from the sorted
result
-result3 = table.order_by(table.a.asc).offset(10).fetch(5)
+result3 = table.order_by(col('a').asc).offset(10).fetch(5)
```
{{< /tab >}}
{{< /tabs >}}
@@ -1517,7 +1519,7 @@ The following example shows how to define a window
aggregation on a table.
```python
# define window with alias w, group the table by window w, then aggregate
table = input.window([w: GroupWindow].alias("w")) \
- .group_by(col('w')).select(input.b.sum)
+ .group_by(col('w')).select(col('b').sum)
```
{{< /tab >}}
{{< /tabs >}}
@@ -1553,7 +1555,7 @@ The following example shows how to define a window
aggregation with additional g
# define window with alias w, group the table by attribute a and window w,
# then aggregate
table = input.window([w: GroupWindow].alias("w")) \
- .group_by(col('w'), input.a).select(input.b.sum)
+ .group_by(col('w'), col('a')).select(col('b').sum)
```
{{< /tab >}}
{{< /tabs >}}
@@ -1582,8 +1584,8 @@ val table = input
# define window with alias w, group the table by attribute a and window w,
# then aggregate and add window start, end, and rowtime timestamps
table = input.window([w: GroupWindow].alias("w")) \
- .group_by(col('w'), input.a) \
- .select(input.a, col('w').start, col('w').end, col('w').rowtime,
input.b.count)
+ .group_by(col('w'), col('a')) \
+ .select(col('a'), col('w').start, col('w').end, col('w').rowtime,
col('b').count)
```
{{< /tab >}}
{{< /tabs >}}
@@ -1989,7 +1991,7 @@ val table = input
```python
# define over window with alias w and aggregate over the over window w
table = input.over_window([w: OverWindow].alias("w")) \
- .select(input.a, input.b.sum.over(col('w')), input.c.min.over(col('w')))
+ .select(col('a'), col('b').sum.over(col('w')), col('c').min.over(col('w')))
```
{{< /tab >}}
{{< /tabs >}}
@@ -2216,9 +2218,9 @@ func = udf(map_function, result_type=DataTypes.ROW(
table = input.map(func).alias('a', 'b')
# map operation with a python vectorized scalar function
-pandas_func = udf(lambda x: x * 2, result_type=DataTypes.ROW(
- [DataTypes.FIELD("a",
DataTypes.BIGINT()),
- DataTypes.FIELD("b",
DataTypes.BIGINT()))]),
+pandas_func = udf(lambda x: x * 2,
+ result_type=DataTypes.ROW([DataTypes.FIELD("a",
DataTypes.BIGINT()),
+ DataTypes.FIELD("b",
DataTypes.BIGINT())]),
func_type='pandas')
table = input.map(pandas_func).alias('a', 'b')
@@ -2450,9 +2452,9 @@ agg = udaf(function,
name=str(function.__class__.__name__))
# aggregate with a python general aggregate function
-result = t.group_by(t.a) \
+result = t.group_by(col('a')) \
.aggregate(agg.alias("c", "d")) \
- .select("a, c, d")
+ .select(col('a'), col('c'), col('d'))
# aggregate with a python vectorized aggregate function
pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
@@ -2461,8 +2463,7 @@ pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
DataTypes.FIELD("b", DataTypes.INT())]),
func_type="pandas")
t.aggregate(pandas_udaf.alias("a", "b")) \
- .select("a, b")
-
+ .select(col('a'), col('b'))
```
{{< /tab >}}
@@ -2504,20 +2505,22 @@ val table = input
```python
from pyflink.table import DataTypes
from pyflink.table.udf import AggregateFunction, udaf
+from pyflink.table.expressions import col, lit
+from pyflink.table.window import Tumble
pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
result_type=DataTypes.ROW(
[DataTypes.FIELD("a", DataTypes.FLOAT()),
DataTypes.FIELD("b", DataTypes.INT())]),
func_type="pandas")
-tumble_window = Tumble.over(expr.lit(1).hours) \
- .on(expr.col("rowtime")) \
+tumble_window = Tumble.over(lit(1).hours) \
+ .on(col("rowtime")) \
.alias("w")
-t.select(t.b, t.rowtime) \
+t.select(col('b'), col('rowtime')) \
.window(tumble_window) \
- .group_by("w") \
+ .group_by(col("w")) \
.aggregate(pandas_udaf.alias("d", "e")) \
- .select("w.rowtime, d, e")
+ .select(col('w').rowtime, col('d'), col('e'))
```
{{< /tab >}}
{{< /tabs >}}
@@ -2667,6 +2670,7 @@ Similar to a **GroupBy Aggregation**. Groups the rows on
the grouping keys with
from pyflink.common import Row
from pyflink.table.udf import TableAggregateFunction, udtaf
from pyflink.table import DataTypes
+from pyflink.table.expressions import col
class Top2(TableAggregateFunction):
@@ -2702,13 +2706,13 @@ t = t_env.from_elements([(1, 'Hi', 'Hello'),
(3, 'Hi', 'hi'),
(5, 'Hi2', 'hi'),
(7, 'Hi', 'Hello'),
- (2, 'Hi', 'Hello')], ['a', 'b', 'c'])
-result = t.select(t.a, t.c) \
- .group_by(t.c) \
+ (2, 'Hi', 'Hello')],
+ ['a', 'b', 'c'])
+result = t.select(col('a'), col('c')) \
+ .group_by(col('c')) \
.flat_aggregate(mytop) \
- .select(t.a) \
+ .select(col('a')) \
.flat_aggregate(mytop.alias("b"))
-
```
{{< /tab >}}
{{< /tabs >}}
diff --git
a/flink-python/pyflink/examples/table/mixing_use_of_datastream_and_table.py
b/flink-python/pyflink/examples/table/mixing_use_of_datastream_and_table.py
index dc518f4e3d2..3dc9b7d82dd 100644
--- a/flink-python/pyflink/examples/table/mixing_use_of_datastream_and_table.py
+++ b/flink-python/pyflink/examples/table/mixing_use_of_datastream_and_table.py
@@ -21,6 +21,7 @@ import sys
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import (DataTypes, TableDescriptor, Schema,
StreamTableEnvironment)
+from pyflink.table.expressions import col
from pyflink.table.udf import udf
@@ -55,7 +56,7 @@ def mixing_use_of_datastream_and_table():
# perform table api operations
table = t_env.from_path("source")
- table = table.select(table.id, length(table.data))
+ table = table.select(col('id'), length(col('data')))
# convert table to datastream and perform datastream api operations
ds = t_env.to_data_stream(table)
diff --git a/flink-python/pyflink/examples/table/pandas/pandas_udaf.py
b/flink-python/pyflink/examples/table/pandas/pandas_udaf.py
index e4e8b9ea522..72febe590e7 100644
--- a/flink-python/pyflink/examples/table/pandas/pandas_udaf.py
+++ b/flink-python/pyflink/examples/table/pandas/pandas_udaf.py
@@ -55,7 +55,7 @@ def pandas_udaf():
.column("f2", DataTypes.FLOAT())
.watermark("ts", "ts - INTERVAL '3' SECOND")
.build()
- ).alias("ts, name, price")
+ ).alias("ts", "name", "price")
# define the sink
t_env.create_temporary_table(
@@ -75,8 +75,8 @@ def pandas_udaf():
# define the tumble window operation
table = table.window(Tumble.over(lit(5).seconds).on(col("ts")).alias("w"))
\
- .group_by(table.name, col('w')) \
- .select(table.name, mean_udaf(table.price), col("w").start,
col("w").end)
+ .group_by(col('name'), col('w')) \
+ .select(col('name'), mean_udaf(col('price')), col("w").start,
col("w").end)
# submit for execution
table.execute_insert('sink') \
diff --git a/flink-python/pyflink/examples/table/process_json_data.py
b/flink-python/pyflink/examples/table/process_json_data.py
index eeb6f7c88cd..b410d11c58e 100644
--- a/flink-python/pyflink/examples/table/process_json_data.py
+++ b/flink-python/pyflink/examples/table/process_json_data.py
@@ -20,6 +20,7 @@ import sys
from pyflink.table import (EnvironmentSettings, TableEnvironment, DataTypes,
TableDescriptor,
Schema)
+from pyflink.table.expressions import col
def process_json_data():
@@ -45,7 +46,7 @@ def process_json_data():
.build())
.build())
- table = table.select(table.id, table.data.json_value('$.addr.country',
DataTypes.STRING()))
+ table = table.select(col('id'), col('data').json_value('$.addr.country',
DataTypes.STRING()))
# execute
table.execute_insert('sink') \
diff --git a/flink-python/pyflink/examples/table/process_json_data_with_udf.py
b/flink-python/pyflink/examples/table/process_json_data_with_udf.py
index 31969e2f675..934e73a68d8 100644
--- a/flink-python/pyflink/examples/table/process_json_data_with_udf.py
+++ b/flink-python/pyflink/examples/table/process_json_data_with_udf.py
@@ -21,6 +21,7 @@ import sys
from pyflink.table import (EnvironmentSettings, TableEnvironment, DataTypes,
TableDescriptor,
Schema)
+from pyflink.table.expressions import col
from pyflink.table.udf import udf
@@ -54,7 +55,7 @@ def process_json_data_with_udf():
json_data['tel'] += 1
return json.dumps(json_data)
- table = table.select(table.id, update_tel(table.data))
+ table = table.select(col('id'), update_tel(col('data')))
# execute
table.execute_insert('sink') \
diff --git a/flink-python/pyflink/examples/table/windowing/over_window.py
b/flink-python/pyflink/examples/table/windowing/over_window.py
index 982d6b860d4..d958e75ddc1 100644
--- a/flink-python/pyflink/examples/table/windowing/over_window.py
+++ b/flink-python/pyflink/examples/table/windowing/over_window.py
@@ -54,7 +54,7 @@ def tumble_window_demo():
.column("f2", DataTypes.FLOAT())
.watermark("ts", "ts - INTERVAL '3' SECOND")
.build()
- ).alias("ts, name, price")
+ ).alias("ts", "name", "price")
# define the sink
t_env.create_temporary_table(
@@ -68,12 +68,12 @@ def tumble_window_demo():
# define the over window operation
table = table.over_window(
- Over.partition_by("name")
- .order_by("ts")
+ Over.partition_by(col("name"))
+ .order_by(col("ts"))
.preceding(row_interval(2))
.following(CURRENT_ROW)
.alias('w')) \
- .select(table.name, table.price.max.over(col('w')))
+ .select(col('name'), col('price').max.over(col('w')))
# submit for execution
table.execute_insert('sink') \
diff --git a/flink-python/pyflink/examples/table/windowing/session_window.py
b/flink-python/pyflink/examples/table/windowing/session_window.py
index 5b40a7b9af8..e7506b0f8b3 100644
--- a/flink-python/pyflink/examples/table/windowing/session_window.py
+++ b/flink-python/pyflink/examples/table/windowing/session_window.py
@@ -52,7 +52,7 @@ def session_window_demo():
.column("f2", DataTypes.FLOAT())
.watermark("ts", "ts - INTERVAL '3' SECOND")
.build()
- ).alias("ts, name, price")
+ ).alias("ts", "name", "price")
# define the sink
t_env.create_temporary_table(
@@ -68,8 +68,8 @@ def session_window_demo():
# define the session window operation
table =
table.window(Session.with_gap(lit(5).seconds).on(col("ts")).alias("w")) \
- .group_by(table.name, col('w')) \
- .select(table.name, table.price.sum, col("w").start,
col("w").end)
+ .group_by(col('name'), col('w')) \
+ .select(col('name'), col('price').sum, col("w").start,
col("w").end)
# submit for execution
table.execute_insert('sink') \
diff --git a/flink-python/pyflink/examples/table/windowing/sliding_window.py
b/flink-python/pyflink/examples/table/windowing/sliding_window.py
index 1b8bb150fd2..10ba068b1a5 100644
--- a/flink-python/pyflink/examples/table/windowing/sliding_window.py
+++ b/flink-python/pyflink/examples/table/windowing/sliding_window.py
@@ -54,7 +54,7 @@ def sliding_window_demo():
.column("f2", DataTypes.FLOAT())
.watermark("ts", "ts - INTERVAL '3' SECOND")
.build()
- ).alias("ts, name, price")
+ ).alias("ts", "name", "price")
# define the sink
t_env.create_temporary_table(
@@ -70,8 +70,8 @@ def sliding_window_demo():
# define the sliding window operation
table =
table.window(Slide.over(lit(5).seconds).every(lit(2).seconds).on(col("ts")).alias("w"))\
- .group_by(table.name, col('w')) \
- .select(table.name, table.price.sum, col("w").start,
col("w").end)
+ .group_by(col('name'), col('w')) \
+ .select(col('name'), col('price').sum, col("w").start,
col("w").end)
# submit for execution
table.execute_insert('sink') \
diff --git a/flink-python/pyflink/examples/table/windowing/tumble_window.py
b/flink-python/pyflink/examples/table/windowing/tumble_window.py
index dd3ba2ea408..ff5bb99516e 100644
--- a/flink-python/pyflink/examples/table/windowing/tumble_window.py
+++ b/flink-python/pyflink/examples/table/windowing/tumble_window.py
@@ -54,7 +54,7 @@ def tumble_window_demo():
.column("f2", DataTypes.FLOAT())
.watermark("ts", "ts - INTERVAL '3' SECOND")
.build()
- ).alias("ts, name, price")
+ ).alias("ts", "name", "price")
# define the sink
t_env.create_temporary_table(
@@ -70,8 +70,8 @@ def tumble_window_demo():
# define the tumble window operation
table = table.window(Tumble.over(lit(5).seconds).on(col("ts")).alias("w"))
\
- .group_by(table.name, col('w')) \
- .select(table.name, table.price.sum, col("w").start,
col("w").end)
+ .group_by(col('name'), col('w')) \
+ .select(col('name'), col('price').sum, col("w").start,
col("w").end)
# submit for execution
table.execute_insert('sink') \
diff --git a/flink-python/pyflink/table/expression.py
b/flink-python/pyflink/table/expression.py
index 89e11e08f11..6cf95e441ea 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -911,7 +911,7 @@ class Expression(Generic[T]):
::
>>> tab.where(col("a").in_(1, 2, 3))
- >>> table_a.where(col("x").in_(table_b.select("y")))
+ >>> table_a.where(col("x").in_(table_b.select(col("y"))))
"""
from pyflink.table import Table
if isinstance(first_element_or_table, Table):
diff --git a/flink-python/pyflink/table/schema.py
b/flink-python/pyflink/table/schema.py
index 5903557783e..d8856bf1669 100644
--- a/flink-python/pyflink/table/schema.py
+++ b/flink-python/pyflink/table/schema.py
@@ -139,9 +139,9 @@ class Schema(object):
Example:
::
- >>> Schema.new_builder().
- ... column_by_expression("ts", "orig_ts - INTERVAL '60'
MINUTE").
- ... column_by_metadata("orig_ts", DataTypes.TIMESTAMP(3),
"timestamp")
+ >>> Schema.new_builder() \\
+ ... .column_by_expression("ts", "orig_ts - INTERVAL '60'
MINUTE") \\
+ ... .column_by_metadata("orig_ts", DataTypes.TIMESTAMP(3),
"timestamp")
:param column_name: Column name
:param expr: Computation of the column
diff --git a/flink-python/pyflink/table/table.py
b/flink-python/pyflink/table/table.py
index 200cafa2342..26b57c805e6 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -84,7 +84,7 @@ class Table(object):
Example:
::
- >>> from pyflink.table import TableEnvironment
+ >>> from pyflink.table import EnvironmentSettings, TableEnvironment
>>> from pyflink.table.expressions import *
>>> env_settings = EnvironmentSettings.in_streaming_mode()
>>> t_env = TableEnvironment.create(env_settings)
@@ -122,11 +122,9 @@ class Table(object):
Example:
::
- >>> from pyflink.table import expressions as expr
- >>> tab.select(tab.key, expr.concat(tab.value, 'hello'))
- >>> tab.select(expr.col('key'), expr.concat(expr.col('value'),
'hello'))
-
- >>> tab.select("key, value + 'hello'")
+ >>> from pyflink.table.expressions import col, concat
+ >>> tab.select(tab.key, concat(tab.value, 'hello'))
+ >>> tab.select(col('key'), concat(col('value'), 'hello'))
:return: The result table.
"""
@@ -146,7 +144,6 @@ class Table(object):
::
>>> tab.alias("a", "b", "c")
- >>> tab.alias("a, b, c")
:param field: Field alias.
:param fields: Additional field aliases.
@@ -164,8 +161,7 @@ class Table(object):
Example:
::
- >>> tab.filter(tab.name == 'Fred')
- >>> tab.filter("name = 'Fred'")
+ >>> tab.filter(col('name') == 'Fred')
:param predicate: Predicate expression string.
:return: The result table.
@@ -180,8 +176,7 @@ class Table(object):
Example:
::
- >>> tab.where(tab.name == 'Fred')
- >>> tab.where("name = 'Fred'")
+ >>> tab.where(col('name') == 'Fred')
:param predicate: Predicate expression string.
:return: The result table.
@@ -196,8 +191,7 @@ class Table(object):
Example:
::
- >>> tab.group_by(tab.key).select(tab.key, tab.value.avg)
- >>> tab.group_by("key").select("key, value.avg")
+ >>> tab.group_by(col('key')).select(col('key'), col('value').avg)
:param fields: Group keys.
:return: The grouped table.
@@ -216,7 +210,7 @@ class Table(object):
Example:
::
- >>> tab.select(tab.key, tab.value).distinct()
+ >>> tab.select(col('key'), col('value')).distinct()
:return: The result table.
"""
@@ -236,9 +230,8 @@ class Table(object):
Example:
::
- >>> left.join(right).where((left.a == right.b) && (left.c > 3))
- >>> left.join(right).where("a = b && c > 3")
- >>> left.join(right, left.a == right.b)
+            >>> left.join(right).where((col('a') == col('b')) & (col('c') >
3))
+ >>> left.join(right, col('a') == col('b'))
:param right: Right table.
:param join_predicate: Optional, the join predicate expression string.
@@ -267,8 +260,7 @@ class Table(object):
::
>>> left.left_outer_join(right)
- >>> left.left_outer_join(right, left.a == right.b)
- >>> left.left_outer_join(right, "a = b")
+ >>> left.left_outer_join(right, col('a') == col('b'))
:param right: Right table.
:param join_predicate: Optional, the join predicate expression string.
@@ -296,8 +288,7 @@ class Table(object):
Example:
::
- >>> left.right_outer_join(right, left.a == right.b)
- >>> left.right_outer_join(right, "a = b")
+ >>> left.right_outer_join(right, col('a') == col('b'))
:param right: Right table.
:param join_predicate: The join predicate expression string.
@@ -322,8 +313,7 @@ class Table(object):
Example:
::
- >>> left.full_outer_join(right, left.a == right.b)
- >>> left.full_outer_join(right, "a = b")
+ >>> left.full_outer_join(right, col('a') == col('b'))
:param right: Right table.
:param join_predicate: The join predicate expression string.
@@ -343,12 +333,10 @@ class Table(object):
Example:
::
+ >>> from pyflink.table.expressions import *
>>> t_env.create_java_temporary_system_function("split",
- ... "java.table.function.class.name")
- >>> tab.join_lateral("split(text, ' ') as (b)", "a = b")
-
- >>> from pyflink.table import expressions as expr
- >>> tab.join_lateral(expr.call('split', ' ').alias('b'),
expr.col('a') == expr.col('b'))
+ ... "java.table.function.class.name")
+ >>> tab.join_lateral(call('split', ' ').alias('b'), col('a') ==
col('b'))
>>> # take all the columns as inputs
>>> @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
... def split_row(row: Row):
@@ -394,9 +382,9 @@ class Table(object):
>>> t_env.create_java_temporary_system_function("split",
... "java.table.function.class.name")
- >>> tab.left_outer_join_lateral("split(text, ' ') as (b)")
- >>> from pyflink.table import expressions as expr
- >>> tab.left_outer_join_lateral(expr.call('split', ' ').alias('b'))
+ >>> from pyflink.table.expressions import *
+ >>> tab.left_outer_join_lateral(call('split', ' ').alias('b'))
+
>>> # take all the columns as inputs
>>> @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
... def split_row(row: Row):
@@ -560,8 +548,7 @@ class Table(object):
Example:
::
- >>> tab.order_by(tab.name.desc)
- >>> tab.order_by("name.desc")
+ >>> tab.order_by(col('name').desc)
For unbounded tables, this operation requires a sorting on a time
attribute or a subsequent
fetch operation.
@@ -588,10 +575,9 @@ class Table(object):
::
# skips the first 3 rows and returns all following rows.
- >>> tab.order_by(tab.name.desc).offset(3)
- >>> tab.order_by("name.desc").offset(3)
+ >>> tab.order_by(col('name').desc).offset(3)
# skips the first 10 rows and returns the next 5 rows.
- >>> tab.order_by(tab.name.desc).offset(10).fetch(5)
+ >>> tab.order_by(col('name').desc).offset(10).fetch(5)
For unbounded tables, this operation requires a subsequent fetch
operation.
@@ -613,13 +599,12 @@ class Table(object):
Returns the first 3 records.
::
- >>> tab.order_by(tab.name.desc).fetch(3)
- >>> tab.order_by("name.desc").fetch(3)
+ >>> tab.order_by(col('name').desc).fetch(3)
Skips the first 10 rows and returns the next 5 rows.
::
- >>> tab.order_by(tab.name.desc).offset(10).fetch(5)
+ >>> tab.order_by(col('name').desc).offset(10).fetch(5)
:param fetch: The number of records to return. Fetch must be >= 0.
:return: The result table.
@@ -675,10 +660,10 @@ class Table(object):
Example:
::
- >>> from pyflink.table import expressions as expr
- >>>
tab.window(Tumble.over(expr.lit(10).minutes).on(tab.rowtime).alias('w')) \\
+ >>> from pyflink.table.expressions import col, lit
+ >>>
tab.window(Tumble.over(lit(10).minutes).on(col('rowtime')).alias('w')) \\
... .group_by(col('w')) \\
- ... .select(tab.a.sum.alias('a'),
+ ... .select(col('a').sum.alias('a'),
... col('w').start.alias('b'),
... col('w').end.alias('c'),
... col('w').rowtime.alias('d'))
@@ -700,10 +685,10 @@ class Table(object):
Example:
::
- >>> from pyflink.table import expressions as expr
- >>> tab.over_window(Over.partition_by(tab.c).order_by(tab.rowtime)
\\
+ >>> from pyflink.table.expressions import col, lit
+ >>>
tab.over_window(Over.partition_by(col('c')).order_by(col('rowtime')) \\
... .preceding(lit(10).seconds).alias("ow")) \\
- ... .select(tab.c, tab.b.count.over(col('ow'),
tab.e.sum.over(col('ow'))))
+            ...     .select(col('c'), col('b').count.over(col('ow')),
col('e').sum.over(col('ow')))
.. note::
@@ -732,9 +717,8 @@ class Table(object):
Example:
::
- >>> from pyflink.table import expressions as expr
- >>> tab.add_columns((tab.a + 1).alias('a1'), expr.concat(tab.b,
'sunny').alias('b1'))
- >>> tab.add_columns("a + 1 as a1, concat(b, 'sunny') as b1")
+ >>> from pyflink.table.expressions import col, concat
+ >>> tab.add_columns((col('a') + 1).alias('a1'), concat(col('b'),
'sunny').alias('b1'))
:param fields: Column list string.
:return: The result table.
@@ -756,10 +740,9 @@ class Table(object):
Example:
::
- >>> from pyflink.table import expressions as expr
- >>> tab.add_or_replace_columns((tab.a + 1).alias('a1'),
- ... expr.concat(tab.b,
'sunny').alias('b1'))
- >>> tab.add_or_replace_columns("a + 1 as a1, concat(b, 'sunny') as
b1")
+ >>> from pyflink.table.expressions import col, concat
+ >>> tab.add_or_replace_columns((col('a') + 1).alias('a1'),
+ ... concat(col('b'),
'sunny').alias('b1'))
:param fields: Column list string.
:return: The result table.
@@ -780,8 +763,7 @@ class Table(object):
Example:
::
- >>> tab.rename_columns(tab.a.alias('a1'), tab.b.alias('b1'))
- >>> tab.rename_columns("a as a1, b as b1")
+ >>> tab.rename_columns(col('a').alias('a1'), col('b').alias('b1'))
:param fields: Column list string.
:return: The result table.
@@ -801,8 +783,7 @@ class Table(object):
Example:
::
- >>> tab.drop_columns(tab.a, tab.b)
- >>> tab.drop_columns("a, b")
+ >>> tab.drop_columns(col('a'), col('b'))
:param fields: Column list string.
:return: The result table.
@@ -824,7 +805,7 @@ class Table(object):
>>> add = udf(lambda x: Row(x + 1, x * x),
result_type=DataTypes.Row(
... [DataTypes.FIELD("a", DataTypes.INT()), DataTypes.FIELD("b",
DataTypes.INT())]))
- >>> tab.map(add(tab.a)).alias("a, b")
+ >>> tab.map(add(col('a'))).alias("a", "b")
>>> # take all the columns as inputs
>>> identity = udf(lambda row: row, result_type=DataTypes.Row(
... [DataTypes.FIELD("a", DataTypes.INT()), DataTypes.FIELD("b",
DataTypes.INT())]))
@@ -854,7 +835,7 @@ class Table(object):
... def split(x, string):
... for s in string.split(","):
... yield x, s
- >>> tab.flat_map(split(tab.a, table.b))
+ >>> tab.flat_map(split(col('a'), col('b')))
>>> # take all the columns as inputs
>>> @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
... def split_row(row: Row):
@@ -889,7 +870,7 @@ class Table(object):
... [DataTypes.FIELD("a", DataTypes.FLOAT()),
... DataTypes.FIELD("b", DataTypes.INT())]),
... func_type="pandas")
- >>> tab.aggregate(agg(tab.a).alias("a", "b")).select("a, b")
+ >>> tab.aggregate(agg(col('a')).alias("a", "b")).select(col('a'),
col('b'))
>>> # take all the columns as inputs
>>> # pd is a Pandas.DataFrame
>>> agg_row = udaf(lambda pd: (pd.a.mean(), pd.a.max()),
@@ -897,7 +878,7 @@ class Table(object):
... [DataTypes.FIELD("a", DataTypes.FLOAT()),
... DataTypes.FIELD("b", DataTypes.INT())]),
... func_type="pandas")
- >>> tab.aggregate(agg.alias("a, b")).select("a, b")
+ >>> tab.aggregate(agg.alias("a", "b")).select(col("a"), col("b"))
:param func: user-defined aggregate function.
:return: The result table.
@@ -928,7 +909,7 @@ class Table(object):
::
>>> table_agg = udtaf(MyTableAggregateFunction())
- >>> tab.flat_aggregate(table_agg(tab.a).alias("a",
"b")).select("a, b")
+ >>> tab.flat_aggregate(table_agg(col('a')).alias("a",
"b")).select(col('a'), col('b'))
>>> # take all the columns as inputs
>>> class Top2(TableAggregateFunction):
... def emit_value(self, accumulator):
@@ -954,7 +935,7 @@ class Table(object):
... return DataTypes.ROW(
... [DataTypes.FIELD("a", DataTypes.BIGINT())])
>>> top2 = udtaf(Top2())
- >>> tab.flat_aggregate(top2.alias("a", "b")).select("a, b")
+ >>> tab.flat_aggregate(top2.alias("a", "b")).select(col("a"),
col("b"))
:param func: user-defined table aggregate function.
:return: The result table.
@@ -985,7 +966,7 @@ class Table(object):
>>> pdf = pd.DataFrame(np.random.rand(1000, 2))
>>> table = table_env.from_pandas(pdf, ["a", "b"])
- >>> table.filter(table.a > 0.5).to_pandas()
+ >>> table.filter(col('a') > 0.5).to_pandas()
:return: the result pandas DataFrame.
@@ -1156,9 +1137,7 @@ class GroupedTable(object):
Example:
::
- >>> tab.group_by(tab.key).select(tab.key,
tab.value.avg.alias('average'))
- >>> tab.group_by("key").select("key, value.avg as average")
-
+ >>> tab.group_by(col('key')).select(col('key'),
col('value').avg.alias('average'))
:param fields: Expression string that contains group keys and
aggregate function calls.
:return: The result table.
@@ -1184,7 +1163,8 @@ class GroupedTable(object):
... [DataTypes.FIELD("a", DataTypes.FLOAT()),
... DataTypes.FIELD("b", DataTypes.INT())]),
... func_type="pandas")
- >>> tab.group_by(tab.a).aggregate(agg(tab.b).alias("c",
"d")).select("a, c, d")
+ >>> tab.group_by(col('a')).aggregate(agg(col('b')).alias("c",
"d")).select(
+ ... col('a'), col('c'), col('d'))
>>> # take all the columns as inputs
>>> # pd is a Pandas.DataFrame
>>> agg_row = udaf(lambda pd: (pd.a.mean(), pd.b.max()),
@@ -1192,7 +1172,7 @@ class GroupedTable(object):
... [DataTypes.FIELD("a", DataTypes.FLOAT()),
... DataTypes.FIELD("b", DataTypes.INT())]),
... func_type="pandas")
- >>> tab.group_by(tab.a).aggregate(agg.alias("a, b")).select("a, b")
+ >>> tab.group_by(col('a')).aggregate(agg.alias("a",
"b")).select(col('a'), col('b'))
:param func: user-defined aggregate function.
:return: The result table.
@@ -1223,7 +1203,8 @@ class GroupedTable(object):
::
>>> table_agg = udtaf(MyTableAggregateFunction())
- >>>
tab.group_by(tab.c).flat_aggregate(table_agg(tab.a).alias("a")).select("c, a")
+ >>>
tab.group_by(col('c')).flat_aggregate(table_agg(col('a')).alias("a")).select(
+ ... col('c'), col('a'))
>>> # take all the columns as inputs
>>> class Top2(TableAggregateFunction):
... def emit_value(self, accumulator):
@@ -1249,7 +1230,9 @@ class GroupedTable(object):
... return DataTypes.ROW(
... [DataTypes.FIELD("a", DataTypes.BIGINT())])
>>> top2 = udtaf(Top2())
- >>> tab.group_by(tab.c).flat_aggregate(top2.alias("a",
"b")).select("a, b")
+ >>> tab.group_by(col('c')) \\
+ ... .flat_aggregate(top2.alias("a", "b")) \\
+ ... .select(col('a'), col('b'))
:param func: user-defined table aggregate function.
:return: The result table.
@@ -1294,10 +1277,10 @@ class GroupWindowedTable(object):
Example:
::
- >>> from pyflink.table import expressions as expr
- >>>
tab.window(Tumble.over(expr.lit(10).minutes).on(tab.rowtime).alias('w')) \\
+ >>> from pyflink.table.expressions import col, lit
+ >>>
tab.window(Tumble.over(lit(10).minutes).on(col('rowtime')).alias('w')) \\
... .group_by(col('w')) \\
- ... .select(tab.a.sum.alias('a'),
+ ... .select(col('a').sum.alias('a'),
... col('w').start.alias('b'),
... col('w').end.alias('c'),
... col('w').rowtime.alias('d'))
@@ -1335,7 +1318,6 @@ class WindowGroupedTable(object):
>>> window_grouped_table.select(col('key'),
... col('window').start,
... col('value').avg.alias('valavg'))
- >>> window_grouped_table.select("key, window.start, value.avg as
valavg")
:param fields: Expression string.
:return: The result table.
@@ -1361,10 +1343,10 @@ class WindowGroupedTable(object):
... [DataTypes.FIELD("a", DataTypes.FLOAT()),
... DataTypes.FIELD("b", DataTypes.INT())]),
... func_type="pandas")
- >>> window_grouped_table.group_by("w") \
- ... .aggregate(agg(window_grouped_table.b) \
- ... .alias("c", "d")) \
- ... .select("c, d")
+            >>> window_grouped_table.group_by(col("w")) \\
+            ...     .aggregate(agg(col('b')).alias("c", "d")) \\
+            ...     .select(col('c'), col('d'))
>>> # take all the columns as inputs
>>> # pd is a Pandas.DataFrame
>>> agg_row = udaf(lambda pd: (pd.a.mean(), pd.b.max()),
@@ -1372,7 +1354,7 @@ class WindowGroupedTable(object):
... [DataTypes.FIELD("a", DataTypes.FLOAT()),
... DataTypes.FIELD("b", DataTypes.INT())]),
... func_type="pandas")
- >>> window_grouped_table.group_by("w, a").aggregate(agg_row)
+ >>> window_grouped_table.group_by(col("w"),
col("a")).aggregate(agg_row)
:param func: user-defined aggregate function.
:return: The result table.
@@ -1427,7 +1409,6 @@ class OverWindowedTable(object):
>>> over_windowed_table.select(col('c'),
... col('b').count.over(col('ow')),
... col('e').sum.over(col('ow')))
- >>> over_windowed_table.select("c, b.count over ow, e.sum over ow")
:param fields: Expression string.
:return: The result table.
@@ -1462,7 +1443,7 @@ class AggregatedTable(object):
... [DataTypes.FIELD("a", DataTypes.FLOAT()),
... DataTypes.FIELD("b", DataTypes.INT())]),
... func_type="pandas")
- >>> tab.aggregate(agg(tab.a).alias("a", "b")).select("a, b")
+ >>> tab.aggregate(agg(col('a')).alias("a", "b")).select(col('a'),
col('b'))
>>> # take all the columns as inputs
>>> # pd is a Pandas.DataFrame
>>> agg_row = udaf(lambda pd: (pd.a.mean(), pd.b.max()),
@@ -1470,7 +1451,7 @@ class AggregatedTable(object):
... [DataTypes.FIELD("a", DataTypes.FLOAT()),
... DataTypes.FIELD("b", DataTypes.INT())]),
... func_type="pandas")
- >>> tab.group_by(tab.a).aggregate(agg.alias("a, b")).select("a, b")
+ >>> tab.group_by(col('a')).aggregate(agg_row.alias("a",
"b")).select(col('a'), col('b'))
:param fields: Expression string.
:return: The result table.
@@ -1502,7 +1483,7 @@ class FlatAggregateTable(object):
::
>>> table_agg = udtaf(MyTableAggregateFunction())
- >>> tab.flat_aggregate(table_agg(tab.a).alias("a",
"b")).select("a, b")
+ >>> tab.flat_aggregate(table_agg(col('a')).alias("a",
"b")).select(col('a'), col('b'))
>>> # take all the columns as inputs
>>> class Top2(TableAggregateFunction):
... def emit_value(self, accumulator):
@@ -1528,7 +1509,9 @@ class FlatAggregateTable(object):
... return DataTypes.ROW(
... [DataTypes.FIELD("a", DataTypes.BIGINT())])
>>> top2 = udtaf(Top2())
- >>> tab.group_by(tab.c).flat_aggregate(top2.alias("a",
"b")).select("a, b")
+ >>> tab.group_by(col('c')) \\
+ ... .flat_aggregate(top2.alias("a", "b")) \\
+ ... .select(col('a'), col('b'))
:param fields: Expression string.
:return: The result table.
diff --git a/flink-python/pyflink/table/table_config.py
b/flink-python/pyflink/table/table_config.py
index 5fa9896a566..630e0f2f927 100644
--- a/flink-python/pyflink/table/table_config.py
+++ b/flink-python/pyflink/table/table_config.py
@@ -236,8 +236,7 @@ class TableConfig(object):
Example:
::
- >>> table_config = TableConfig() \\
- ... .set_idle_state_retention(datetime.timedelta(days=1))
+ >>>
table_config.set_idle_state_retention(datetime.timedelta(days=1))
.. note::
diff --git a/flink-python/pyflink/table/table_environment.py
b/flink-python/pyflink/table/table_environment.py
index 2e38ad9006a..2d13f2ea544 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -1940,7 +1940,7 @@ class StreamTableEnvironment(TableEnvironment):
... .column("id", DataTypes.BIGINT())
... .column("payload", DataTypes.ROW(
... [DataTypes.FIELD("name",
DataTypes.STRING()),
- ... DataTypes.FIELD("age",
DataTypes.INT())]))
+ ... DataTypes.FIELD("age",
DataTypes.INT())]))
... .build())
Note that the type system of the table ecosystem is richer than the
one of the DataStream
diff --git a/flink-python/pyflink/table/window.py
b/flink-python/pyflink/table/window.py
index 282660c6481..9dcd54998c7 100644
--- a/flink-python/pyflink/table/window.py
+++ b/flink-python/pyflink/table/window.py
@@ -61,12 +61,10 @@ class Tumble(object):
Example:
::
- >>> from pyflink.table import expressions as expr
- >>> Tumble.over(expr.lit(10).minutes)
- ... .on(expr.col("rowtime"))
+ >>> from pyflink.table.expressions import col, lit
+ >>> Tumble.over(lit(10).minutes) \\
+ ... .on(col("rowtime")) \\
... .alias("w")
-
- >>> Tumble.over("10.minutes").on("rowtime").alias("w")
"""
@classmethod
@@ -140,13 +138,10 @@ class Session(object):
Example:
::
- >>> from pyflink.table import expressions as expr
- >>> Session.with_gap(expr.lit(10).minutes)
- ... .on(expr.col("rowtime"))
+ >>> from pyflink.table.expressions import col, lit
+ >>> Session.with_gap(lit(10).minutes) \\
+ ... .on(col("rowtime")) \\
... .alias("w")
-
- >>> Session.with_gap("10.minutes").on("rowtime").alias("w")
-
"""
@classmethod
@@ -225,13 +220,11 @@ class Slide(object):
Example:
::
- >>> from pyflink.table import expressions as expr
- >>> Slide.over(expr.lit(10).minutes)
- ... .every(expr.lit(5).minutes)
- ... .on(expr.col("rowtime"))
+ >>> from pyflink.table.expressions import col, lit
+ >>> Slide.over(lit(10).minutes) \\
+ ... .every(lit(5).minutes) \\
+ ... .on(col("rowtime")) \\
... .alias("w")
-
- >>>
Slide.over("10.minutes").every("5.minutes").on("rowtime").alias("w")
"""
@classmethod
@@ -333,13 +326,11 @@ class Over(object):
Example:
::
- >>> from pyflink.table import expressions as expr
+ >>> from pyflink.table.expressions import col, UNBOUNDED_RANGE
>>> Over.partition_by(col("a")) \\
... .order_by(col("rowtime")) \\
- ... .preceding(expr.UNBOUNDED_RANGE) \\
+ ... .preceding(UNBOUNDED_RANGE) \\
... .alias("w")
-
- >>>
Over.partition_by("a").order_by("rowtime").preceding("unbounded_range").alias("w")
"""
@classmethod