This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 1bfbdc9 Revert "[FLINK-26986][python] Remove deprecated string
expressions in Python Table API"
1bfbdc9 is described below
commit 1bfbdc9c76de3b3fab4d70058257b7a307269570
Author: Dian Fu <[email protected]>
AuthorDate: Sat Apr 2 09:58:17 2022 +0800
Revert "[FLINK-26986][python] Remove deprecated string expressions in
Python Table API"
This reverts commit 02c5e4136c809eac7b5d723be0d043b639ddf477.
---
.../python/table/python_table_api_connectors.md | 2 +-
.../docs/dev/python/table/udfs/python_udfs.md | 12 +-
.../python/table/udfs/vectorized_python_udfs.md | 3 +-
docs/content.zh/docs/dev/table/catalogs.md | 2 +-
docs/content.zh/docs/dev/table/tableApi.md | 15 +-
.../python/table/python_table_api_connectors.md | 2 +-
.../docs/dev/python/table/udfs/python_udfs.md | 12 +-
.../python/table/udfs/vectorized_python_udfs.md | 3 +-
docs/content/docs/dev/table/catalogs.md | 2 +-
docs/content/docs/dev/table/tableApi.md | 15 +-
.../pyflink/examples/table/pandas/pandas_udaf.py | 2 +-
.../examples/table/windowing/over_window.py | 6 +-
.../examples/table/windowing/session_window.py | 2 +-
.../examples/table/windowing/sliding_window.py | 2 +-
.../examples/table/windowing/tumble_window.py | 2 +-
flink-python/pyflink/table/table.py | 214 +++++++++++++++------
flink-python/pyflink/table/window.py | 7 +
17 files changed, 204 insertions(+), 99 deletions(-)
diff --git
a/docs/content.zh/docs/dev/python/table/python_table_api_connectors.md
b/docs/content.zh/docs/dev/python/table/python_table_api_connectors.md
index 6be2c1e..dbf94fb 100644
--- a/docs/content.zh/docs/dev/python/table/python_table_api_connectors.md
+++ b/docs/content.zh/docs/dev/python/table/python_table_api_connectors.md
@@ -139,7 +139,7 @@ import numpy as np
# 创建一个 PyFlink 表
pdf = pd.DataFrame(np.random.rand(1000, 2))
-table = t_env.from_pandas(pdf, ["a", "b"]).filter(col('a') > 0.5)
+table = t_env.from_pandas(pdf, ["a", "b"]).filter("a > 0.5")
# 将 PyFlink 表转换成 Pandas DataFrame
pdf = table.to_pandas()
diff --git a/docs/content.zh/docs/dev/python/table/udfs/python_udfs.md
b/docs/content.zh/docs/dev/python/table/udfs/python_udfs.md
index f16c433..124d1bb 100644
--- a/docs/content.zh/docs/dev/python/table/udfs/python_udfs.md
+++ b/docs/content.zh/docs/dev/python/table/udfs/python_udfs.md
@@ -128,7 +128,7 @@ add = udf(functools.partial(partial_add, k=1),
result_type=DataTypes.BIGINT())
# 注册 Python 自定义函数
table_env.create_temporary_function("add", add)
# 在 Python Table API 中使用 Python 自定义函数
-my_table.select(call('add', my_table.a, my_table.b))
+my_table.select("add(a, b)")
# 也可以在 Python Table API 中直接使用 Python 自定义函数
my_table.select(add(my_table.a, my_table.b))
@@ -156,8 +156,8 @@ my_table = ... # type: Table, table schema: [a: String]
split = udtf(Split(), result_types=[DataTypes.STRING(), DataTypes.INT()])
# 在 Python Table API 中使用 Python 表值函数
-my_table.join_lateral(split(my_table.a).alias("word", "length"))
-my_table.left_outer_join_lateral(split(my_table.a).alias("word", "length"))
+my_table.join_lateral(split(my_table.a).alias("word, length"))
+my_table.left_outer_join_lateral(split(my_table.a).alias("word, length"))
# 在 SQL API 中使用 Python 表值函数
table_env.create_temporary_function("split", udtf(Split(),
result_types=[DataTypes.STRING(), DataTypes.INT()]))
@@ -194,8 +194,8 @@ my_table = ... # type: Table, table schema: [a: String]
table_env.create_java_temporary_function("split", "my.java.function.Split")
# 在 Python Table API 中使用表值函数。 "alias"指定表的字段名称。
-my_table.join_lateral(call('split', my_table.a).alias("word",
"length")).select(my_table.a, col('word'), col('length'))
-my_table.left_outer_join_lateral(call('split', my_table.a).alias("word",
"length")).select(my_table.a, col('word'), col('length'))
+my_table.join_lateral(call('split', my_table.a).alias("word,
length")).select(my_table.a, col('word'), col('length'))
+my_table.left_outer_join_lateral(call('split', my_table.a).alias("word,
length")).select(my_table.a, col('word'), col('length'))
# 注册 Python 函数。
@@ -337,7 +337,7 @@ tumble_window = Tumble.over(lit(1).hours) \
result = t.window(tumble_window) \
.group_by(col('w'), col('name')) \
- .select(col('w').start, col('w').end, weighted_avg(col('value'),
col('count'))) \
+ .select("w.start, w.end, weighted_avg(value, count)") \
.to_pandas()
print(result)
diff --git
a/docs/content.zh/docs/dev/python/table/udfs/vectorized_python_udfs.md
b/docs/content.zh/docs/dev/python/table/udfs/vectorized_python_udfs.md
index 10012d8..af6c7f6 100644
--- a/docs/content.zh/docs/dev/python/table/udfs/vectorized_python_udfs.md
+++ b/docs/content.zh/docs/dev/python/table/udfs/vectorized_python_udfs.md
@@ -101,7 +101,8 @@ tumble_window = Tumble.over(expr.lit(1).hours) \
my_table.window(tumble_window) \
.group_by("w") \
- .select(col('w').start, col('w').end, mean_udaf(col('b')))
+ .select("w.start, w.end, mean_udaf(b)")
+
# 在 Over Window Aggregation 中使用向量化聚合函数
table_env.create_temporary_function("mean_udaf", mean_udaf)
diff --git a/docs/content.zh/docs/dev/table/catalogs.md
b/docs/content.zh/docs/dev/table/catalogs.md
index 05722d8..0b86fbd 100644
--- a/docs/content.zh/docs/dev/table/catalogs.md
+++ b/docs/content.zh/docs/dev/table/catalogs.md
@@ -241,7 +241,7 @@ schema = Schema.new_builder() \
catalog_table = t_env.create_table("myhive.mydb.mytable",
TableDescriptor.for_connector("kafka")
.schema(schema)
- # …
+ // …
.build())
# tables should contain "mytable"
diff --git a/docs/content.zh/docs/dev/table/tableApi.md
b/docs/content.zh/docs/dev/table/tableApi.md
index d2b198c..65a9f9d 100644
--- a/docs/content.zh/docs/dev/table/tableApi.md
+++ b/docs/content.zh/docs/dev/table/tableApi.md
@@ -417,7 +417,7 @@ val orders: Table = tableEnv.from("Orders").as("x", "y",
"z", "t")
{{< tab "Python" >}}
```python
orders = t_env.from_path("Orders")
-result = orders.alias("x", "y", "z", "t")
+result = orders.alias("x, y, z, t")
```
{{< /tab >}}
{{< /tabs >}}
@@ -1054,7 +1054,7 @@ def split(x):
# join
orders = t_env.from_path("Orders")
-joined_table = orders.join_lateral(split(orders.c).alias("s", "t", "v"))
+joined_table = orders.join_lateral(split(orders.c).alias("s, t, v"))
result = joined_table.select(joined_table.a, joined_table.b, joined_table.s,
joined_table.t, joined_table.v)
```
{{< /tab >}}
@@ -1103,7 +1103,7 @@ def split(x):
# join
orders = t_env.from_path("Orders")
-joined_table = orders.left_outer_join_lateral(split(orders.c).alias("s", "t",
"v"))
+joined_table = orders.left_outer_join_lateral(split(orders.c).alias("s, t, v"))
result = joined_table.select(joined_table.a, joined_table.b, joined_table.s,
joined_table.t, joined_table.v)
```
{{< /tab >}}
@@ -2453,7 +2453,7 @@ agg = udaf(function,
# 使用 python 通用聚合函数进行聚合
result = t.group_by(t.a) \
.aggregate(agg.alias("c", "d")) \
- .select(col('a'), col('c'), col('d'))
+ .select("a, c, d")
# 使用 python 向量化聚合函数进行聚合
pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
@@ -2462,7 +2462,8 @@ pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
DataTypes.FIELD("b", DataTypes.INT())]),
func_type="pandas")
t.aggregate(pandas_udaf.alias("a", "b")) \
- .select(col('a'), col('b'))
+ .select("a, b")
+
```
{{< /tab >}}
@@ -2514,9 +2515,9 @@ tumble_window = Tumble.over(expr.lit(1).hours) \
.alias("w")
t.select(t.b, t.rowtime) \
.window(tumble_window) \
- .group_by(col("w")) \
+ .group_by("w") \
.aggregate(pandas_udaf.alias("d", "e")) \
- .select(col('w').rowtime, col('d'), col('e'))
+ .select("w.rowtime, d, e")
```
{{< /tab >}}
{{< /tabs >}}
diff --git a/docs/content/docs/dev/python/table/python_table_api_connectors.md
b/docs/content/docs/dev/python/table/python_table_api_connectors.md
index b3ee248..a7f31d3 100644
--- a/docs/content/docs/dev/python/table/python_table_api_connectors.md
+++ b/docs/content/docs/dev/python/table/python_table_api_connectors.md
@@ -143,7 +143,7 @@ import numpy as np
# Create a PyFlink Table
pdf = pd.DataFrame(np.random.rand(1000, 2))
-table = t_env.from_pandas(pdf, ["a", "b"]).filter(col('a') > 0.5)
+table = t_env.from_pandas(pdf, ["a", "b"]).filter("a > 0.5")
# Convert the PyFlink Table to a Pandas DataFrame
pdf = table.to_pandas()
diff --git a/docs/content/docs/dev/python/table/udfs/python_udfs.md
b/docs/content/docs/dev/python/table/udfs/python_udfs.md
index c33a51b..745124b 100644
--- a/docs/content/docs/dev/python/table/udfs/python_udfs.md
+++ b/docs/content/docs/dev/python/table/udfs/python_udfs.md
@@ -129,7 +129,7 @@ add = udf(functools.partial(partial_add, k=1),
result_type=DataTypes.BIGINT())
# register the Python function
table_env.create_temporary_function("add", add)
# use the function in Python Table API
-my_table.select(call('add', my_table.a, my_table.b))
+my_table.select("add(a, b)")
# You can also use the Python function in Python Table API directly
my_table.select(add(my_table.a, my_table.b))
@@ -158,8 +158,8 @@ my_table = ... # type: Table, table schema: [a: String]
split = udtf(Split(), result_types=[DataTypes.STRING(), DataTypes.INT()])
# use the Python Table Function in Python Table API
-my_table.join_lateral(split(my_table.a).alias("word", "length"))
-my_table.left_outer_join_lateral(split(my_table.a).alias("word", "length"))
+my_table.join_lateral(split(my_table.a).alias("word, length"))
+my_table.left_outer_join_lateral(split(my_table.a).alias("word, length"))
# use the Python Table function in SQL API
table_env.create_temporary_function("split", udtf(Split(),
result_types=[DataTypes.STRING(), DataTypes.INT()]))
@@ -196,8 +196,8 @@ my_table = ... # type: Table, table schema: [a: String]
table_env.create_java_temporary_function("split", "my.java.function.Split")
# Use the table function in the Python Table API. "alias" specifies the field
names of the table.
-my_table.join_lateral(call('split', my_table.a).alias("word",
"length")).select(my_table.a, col('word'), col('length'))
-my_table.left_outer_join_lateral(call('split', my_table.a).alias("word",
"length")).select(my_table.a, col('word'), col('length'))
+my_table.join_lateral(call('split', my_table.a).alias("word,
length")).select(my_table.a, col('word'), col('length'))
+my_table.left_outer_join_lateral(call('split', my_table.a).alias("word,
length")).select(my_table.a, col('word'), col('length'))
# Register the python function.
@@ -338,7 +338,7 @@ tumble_window = Tumble.over(lit(1).hours) \
result = t.window(tumble_window) \
.group_by(col('w'), col('name')) \
- .select(col('w').start, col('w').end, weighted_avg(col('value'),
col('count'))) \
+ .select("w.start, w.end, weighted_avg(value, count)") \
.to_pandas()
print(result)
diff --git a/docs/content/docs/dev/python/table/udfs/vectorized_python_udfs.md
b/docs/content/docs/dev/python/table/udfs/vectorized_python_udfs.md
index c3461b3..7b79eab 100644
--- a/docs/content/docs/dev/python/table/udfs/vectorized_python_udfs.md
+++ b/docs/content/docs/dev/python/table/udfs/vectorized_python_udfs.md
@@ -100,7 +100,8 @@ tumble_window = Tumble.over(expr.lit(1).hours) \
my_table.window(tumble_window) \
.group_by("w") \
- .select(col('w').start, col('w').end, mean_udaf(col('b')))
+ .select("w.start, w.end, mean_udaf(b)")
+
# use the vectorized Python aggregate function in Over Window Aggregation
table_env.create_temporary_function("mean_udaf", mean_udaf)
diff --git a/docs/content/docs/dev/table/catalogs.md
b/docs/content/docs/dev/table/catalogs.md
index f166ab5..d0b7ab6 100644
--- a/docs/content/docs/dev/table/catalogs.md
+++ b/docs/content/docs/dev/table/catalogs.md
@@ -245,7 +245,7 @@ schema = Schema.new_builder() \
catalog_table = t_env.create_table("myhive.mydb.mytable",
TableDescriptor.for_connector("kafka")
.schema(schema)
- # …
+ // …
.build())
# tables should contain "mytable"
diff --git a/docs/content/docs/dev/table/tableApi.md
b/docs/content/docs/dev/table/tableApi.md
index 91ab6cf..d4f132c 100644
--- a/docs/content/docs/dev/table/tableApi.md
+++ b/docs/content/docs/dev/table/tableApi.md
@@ -418,7 +418,7 @@ val orders: Table = tableEnv.from("Orders").as("x", "y",
"z", "t")
{{< tab "Python" >}}
```python
orders = t_env.from_path("Orders")
-result = orders.alias("x", "y", "z", "t")
+result = orders.alias("x, y, z, t")
```
{{< /tab >}}
{{< /tabs >}}
@@ -1053,7 +1053,7 @@ def split(x):
# join
orders = t_env.from_path("Orders")
-joined_table = orders.join_lateral(split(orders.c).alias("s", "t", "v"))
+joined_table = orders.join_lateral(split(orders.c).alias("s, t, v"))
result = joined_table.select(joined_table.a, joined_table.b, joined_table.s,
joined_table.t, joined_table.v)
```
{{< /tab >}}
@@ -1102,7 +1102,7 @@ def split(x):
# join
orders = t_env.from_path("Orders")
-joined_table = orders.left_outer_join_lateral(split(orders.c).alias("s", "t",
"v"))
+joined_table = orders.left_outer_join_lateral(split(orders.c).alias("s, t, v"))
result = joined_table.select(joined_table.a, joined_table.b, joined_table.s,
joined_table.t, joined_table.v)
```
{{< /tab >}}
@@ -2452,7 +2452,7 @@ agg = udaf(function,
# aggregate with a python general aggregate function
result = t.group_by(t.a) \
.aggregate(agg.alias("c", "d")) \
- select(col('a'), col('c'), col('d'))
+ .select("a, c, d")
# aggregate with a python vectorized aggregate function
pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
@@ -2461,7 +2461,8 @@ pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
DataTypes.FIELD("b", DataTypes.INT())]),
func_type="pandas")
t.aggregate(pandas_udaf.alias("a", "b")) \
- select(col('a'), col('b'))
+ .select("a, b")
+
```
{{< /tab >}}
@@ -2514,9 +2515,9 @@ tumble_window = Tumble.over(expr.lit(1).hours) \
.alias("w")
t.select(t.b, t.rowtime) \
.window(tumble_window) \
- .group_by(col("w")) \
+ .group_by("w") \
.aggregate(pandas_udaf.alias("d", "e")) \
- .select(col('w').rowtime, col('d'), col('e'))
+ .select("w.rowtime, d, e")
```
{{< /tab >}}
{{< /tabs >}}
diff --git a/flink-python/pyflink/examples/table/pandas/pandas_udaf.py
b/flink-python/pyflink/examples/table/pandas/pandas_udaf.py
index 4b884dd..e4e8b9e 100644
--- a/flink-python/pyflink/examples/table/pandas/pandas_udaf.py
+++ b/flink-python/pyflink/examples/table/pandas/pandas_udaf.py
@@ -55,7 +55,7 @@ def pandas_udaf():
.column("f2", DataTypes.FLOAT())
.watermark("ts", "ts - INTERVAL '3' SECOND")
.build()
- ).alias("ts", "name", "price")
+ ).alias("ts, name, price")
# define the sink
t_env.create_temporary_table(
diff --git a/flink-python/pyflink/examples/table/windowing/over_window.py
b/flink-python/pyflink/examples/table/windowing/over_window.py
index 5fd736b..982d6b8 100644
--- a/flink-python/pyflink/examples/table/windowing/over_window.py
+++ b/flink-python/pyflink/examples/table/windowing/over_window.py
@@ -54,7 +54,7 @@ def tumble_window_demo():
.column("f2", DataTypes.FLOAT())
.watermark("ts", "ts - INTERVAL '3' SECOND")
.build()
- ).alias("ts", "name", "price")
+ ).alias("ts, name, price")
# define the sink
t_env.create_temporary_table(
@@ -68,8 +68,8 @@ def tumble_window_demo():
# define the over window operation
table = table.over_window(
- Over.partition_by(col("name"))
- .order_by(col("ts"))
+ Over.partition_by("name")
+ .order_by("ts")
.preceding(row_interval(2))
.following(CURRENT_ROW)
.alias('w')) \
diff --git a/flink-python/pyflink/examples/table/windowing/session_window.py
b/flink-python/pyflink/examples/table/windowing/session_window.py
index 49e4680..5b40a7b 100644
--- a/flink-python/pyflink/examples/table/windowing/session_window.py
+++ b/flink-python/pyflink/examples/table/windowing/session_window.py
@@ -52,7 +52,7 @@ def session_window_demo():
.column("f2", DataTypes.FLOAT())
.watermark("ts", "ts - INTERVAL '3' SECOND")
.build()
- ).alias("ts", "name", "price")
+ ).alias("ts, name, price")
# define the sink
t_env.create_temporary_table(
diff --git a/flink-python/pyflink/examples/table/windowing/sliding_window.py
b/flink-python/pyflink/examples/table/windowing/sliding_window.py
index fc460c6..1b8bb15 100644
--- a/flink-python/pyflink/examples/table/windowing/sliding_window.py
+++ b/flink-python/pyflink/examples/table/windowing/sliding_window.py
@@ -54,7 +54,7 @@ def sliding_window_demo():
.column("f2", DataTypes.FLOAT())
.watermark("ts", "ts - INTERVAL '3' SECOND")
.build()
- ).alias("ts", "name", "price")
+ ).alias("ts, name, price")
# define the sink
t_env.create_temporary_table(
diff --git a/flink-python/pyflink/examples/table/windowing/tumble_window.py
b/flink-python/pyflink/examples/table/windowing/tumble_window.py
index c778747..dd3ba2e 100644
--- a/flink-python/pyflink/examples/table/windowing/tumble_window.py
+++ b/flink-python/pyflink/examples/table/windowing/tumble_window.py
@@ -54,7 +54,7 @@ def tumble_window_demo():
.column("f2", DataTypes.FLOAT())
.watermark("ts", "ts - INTERVAL '3' SECOND")
.build()
- ).alias("ts", "name", "price")
+ ).alias("ts, name, price")
# define the sink
t_env.create_temporary_table(
diff --git a/flink-python/pyflink/table/table.py
b/flink-python/pyflink/table/table.py
index ed46d8a..b7d8ae6 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -90,7 +90,7 @@ class Table(object):
% (name, ', '.join(self.get_schema().get_field_names())))
return col(name)
- def select(self, *fields: Expression) -> 'Table':
+ def select(self, *fields: Union[str, Expression]) -> 'Table':
"""
Performs a selection operation. Similar to a SQL SELECT statement. The
field expressions
can contain complex expressions.
@@ -102,9 +102,16 @@ class Table(object):
>>> tab.select(tab.key, expr.concat(tab.value, 'hello'))
>>> tab.select(expr.col('key'), expr.concat(expr.col('value'),
'hello'))
+ >>> tab.select("key, value + 'hello'")
+
:return: The result table.
"""
- return Table(self._j_table.select(to_expression_jarray(fields)),
self._t_env)
+ if all(isinstance(f, Expression) for f in fields):
+ return Table(self._j_table.select(to_expression_jarray(fields)),
self._t_env)
+ else:
+ assert len(fields) == 1
+ assert isinstance(fields[0], str)
+ return Table(self._j_table.select(fields[0]), self._t_env)
def alias(self, field: str, *fields: str) -> 'Table':
"""
@@ -115,6 +122,7 @@ class Table(object):
::
>>> tab.alias("a", "b", "c")
+ >>> tab.alias("a, b, c")
:param field: Field alias.
:param fields: Additional field aliases.
@@ -133,6 +141,7 @@ class Table(object):
::
>>> tab.filter(tab.name == 'Fred')
+ >>> tab.filter("name = 'Fred'")
:param predicate: Predicate expression string.
:return: The result table.
@@ -148,13 +157,14 @@ class Table(object):
::
>>> tab.where(tab.name == 'Fred')
+ >>> tab.where("name = 'Fred'")
:param predicate: Predicate expression string.
:return: The result table.
"""
return Table(self._j_table.where(_get_java_expression(predicate)),
self._t_env)
- def group_by(self, *fields: Expression) -> 'GroupedTable':
+ def group_by(self, *fields: Union[str, Expression]) -> 'GroupedTable':
"""
Groups the elements on some grouping keys. Use this before a selection
with aggregations
to perform the aggregation on a per-group basis. Similar to a SQL
GROUP BY statement.
@@ -163,11 +173,17 @@ class Table(object):
::
>>> tab.group_by(tab.key).select(tab.key, tab.value.avg)
+ >>> tab.group_by("key").select("key, value.avg")
:param fields: Group keys.
:return: The grouped table.
"""
- return
GroupedTable(self._j_table.groupBy(to_expression_jarray(fields)), self._t_env)
+ if all(isinstance(f, Expression) for f in fields):
+ return
GroupedTable(self._j_table.groupBy(to_expression_jarray(fields)), self._t_env)
+ else:
+ assert len(fields) == 1
+ assert isinstance(fields[0], str)
+ return GroupedTable(self._j_table.groupBy(fields[0]), self._t_env)
def distinct(self) -> 'Table':
"""
@@ -212,7 +228,7 @@ class Table(object):
def left_outer_join(self,
right: 'Table',
- join_predicate: Expression[bool] = None) -> 'Table':
+ join_predicate: Union[str, Expression[bool]] = None)
-> 'Table':
"""
Joins two :class:`~pyflink.table.Table`. Similar to a SQL left outer
join. The fields of
the two joined operations must not overlap, use
:func:`~pyflink.table.Table.alias` to
@@ -228,6 +244,7 @@ class Table(object):
>>> left.left_outer_join(right)
>>> left.left_outer_join(right, left.a == right.b)
+ >>> left.left_outer_join(right, "a = b")
:param right: Right table.
:param join_predicate: Optional, the join predicate expression string.
@@ -256,6 +273,7 @@ class Table(object):
::
>>> left.right_outer_join(right, left.a == right.b)
+ >>> left.right_outer_join(right, "a = b")
:param right: Right table.
:param join_predicate: The join predicate expression string.
@@ -281,6 +299,7 @@ class Table(object):
::
>>> left.full_outer_join(right, left.a == right.b)
+ >>> left.full_outer_join(right, "a = b")
:param right: Right table.
:param join_predicate: The join predicate expression string.
@@ -301,7 +320,8 @@ class Table(object):
::
>>> t_env.create_java_temporary_system_function("split",
- ... "java.table.function.class.name")
+ ... "java.table.function.class.name")
+ >>> tab.join_lateral("split(text, ' ') as (b)", "a = b")
>>> from pyflink.table import expressions as expr
>>> tab.join_lateral(expr.call('split', ' ').alias('b'),
expr.col('a') == expr.col('b'))
@@ -350,6 +370,7 @@ class Table(object):
>>> t_env.create_java_temporary_system_function("split",
... "java.table.function.class.name")
+ >>> tab.left_outer_join_lateral("split(text, ' ') as (b)")
>>> from pyflink.table import expressions as expr
>>> tab.left_outer_join_lateral(expr.call('split', ' ').alias('b'))
>>> # take all the columns as inputs
@@ -507,7 +528,7 @@ class Table(object):
"""
return Table(self._j_table.intersectAll(right._j_table), self._t_env)
- def order_by(self, *fields: Expression) -> 'Table':
+ def order_by(self, *fields: Union[str, Expression]) -> 'Table':
"""
Sorts the given :class:`~pyflink.table.Table`. Similar to SQL ORDER BY.
The resulting Table is sorted globally sorted across all parallel
partitions.
@@ -516,6 +537,7 @@ class Table(object):
::
>>> tab.order_by(tab.name.desc)
+ >>> tab.order_by("name.desc")
For unbounded tables, this operation requires a sorting on a time
attribute or a subsequent
fetch operation.
@@ -523,7 +545,12 @@ class Table(object):
:param fields: Order fields expression string.
:return: The result table.
"""
- return Table(self._j_table.orderBy(to_expression_jarray(fields)),
self._t_env)
+ if all(isinstance(f, Expression) for f in fields):
+ return Table(self._j_table.orderBy(to_expression_jarray(fields)),
self._t_env)
+ else:
+ assert len(fields) == 1
+ assert isinstance(fields[0], str)
+ return Table(self._j_table.orderBy(fields[0]), self._t_env)
def offset(self, offset: int) -> 'Table':
"""
@@ -538,6 +565,7 @@ class Table(object):
# skips the first 3 rows and returns all following rows.
>>> tab.order_by(tab.name.desc).offset(3)
+ >>> tab.order_by("name.desc").offset(3)
# skips the first 10 rows and returns the next 5 rows.
>>> tab.order_by(tab.name.desc).offset(10).fetch(5)
@@ -562,6 +590,7 @@ class Table(object):
::
>>> tab.order_by(tab.name.desc).fetch(3)
+ >>> tab.order_by("name.desc").fetch(3)
Skips the first 10 rows and returns the next 5 rows.
::
@@ -670,7 +699,7 @@ class Table(object):
[item._java_over_window for item in
over_windows])
return OverWindowedTable(self._j_table.window(window_array),
self._t_env)
- def add_columns(self, *fields: Expression) -> 'Table':
+ def add_columns(self, *fields: Union[str, Expression]) -> 'Table':
"""
Adds additional columns. Similar to a SQL SELECT statement. The field
expressions
can contain complex expressions, but can not contain aggregations. It
will throw an
@@ -681,13 +710,19 @@ class Table(object):
>>> from pyflink.table import expressions as expr
>>> tab.add_columns((tab.a + 1).alias('a1'), expr.concat(tab.b,
'sunny').alias('b1'))
+ >>> tab.add_columns("a + 1 as a1, concat(b, 'sunny') as b1")
:param fields: Column list string.
:return: The result table.
"""
- return Table(self._j_table.addColumns(to_expression_jarray(fields)),
self._t_env)
+ if all(isinstance(f, Expression) for f in fields):
+ return
Table(self._j_table.addColumns(to_expression_jarray(fields)), self._t_env)
+ else:
+ assert len(fields) == 1
+ assert isinstance(fields[0], str)
+ return Table(self._j_table.addColumns(fields[0]), self._t_env)
- def add_or_replace_columns(self, *fields: Expression) -> 'Table':
+ def add_or_replace_columns(self, *fields: Union[str, Expression]) ->
'Table':
"""
Adds additional columns. Similar to a SQL SELECT statement. The field
expressions
can contain complex expressions, but can not contain aggregations.
Existing fields will be
@@ -700,14 +735,20 @@ class Table(object):
>>> from pyflink.table import expressions as expr
>>> tab.add_or_replace_columns((tab.a + 1).alias('a1'),
... expr.concat(tab.b,
'sunny').alias('b1'))
+ >>> tab.add_or_replace_columns("a + 1 as a1, concat(b, 'sunny') as
b1")
:param fields: Column list string.
:return: The result table.
"""
- return
Table(self._j_table.addOrReplaceColumns(to_expression_jarray(fields)),
- self._t_env)
+ if all(isinstance(f, Expression) for f in fields):
+ return
Table(self._j_table.addOrReplaceColumns(to_expression_jarray(fields)),
+ self._t_env)
+ else:
+ assert len(fields) == 1
+ assert isinstance(fields[0], str)
+ return Table(self._j_table.addOrReplaceColumns(fields[0]),
self._t_env)
- def rename_columns(self, *fields: Expression) -> 'Table':
+ def rename_columns(self, *fields: Union[str, Expression]) -> 'Table':
"""
Renames existing columns. Similar to a field alias statement. The
field expressions
should be alias expressions, and only the existing fields can be
renamed.
@@ -716,14 +757,20 @@ class Table(object):
::
>>> tab.rename_columns(tab.a.alias('a1'), tab.b.alias('b1'))
+ >>> tab.rename_columns("a as a1, b as b1")
:param fields: Column list string.
:return: The result table.
"""
- return Table(self._j_table.renameColumns(to_expression_jarray(fields)),
- self._t_env)
+ if all(isinstance(f, Expression) for f in fields):
+ return
Table(self._j_table.renameColumns(to_expression_jarray(fields)),
+ self._t_env)
+ else:
+ assert len(fields) == 1
+ assert isinstance(fields[0], str)
+ return Table(self._j_table.renameColumns(fields[0]), self._t_env)
- def drop_columns(self, *fields: Expression) -> 'Table':
+ def drop_columns(self, *fields: Union[str, Expression]) -> 'Table':
"""
Drops existing columns. The field expressions should be field
reference expressions.
@@ -731,14 +778,20 @@ class Table(object):
::
>>> tab.drop_columns(tab.a, tab.b)
+ >>> tab.drop_columns("a, b")
:param fields: Column list string.
:return: The result table.
"""
- return Table(self._j_table.dropColumns(to_expression_jarray(fields)),
- self._t_env)
+ if all(isinstance(f, Expression) for f in fields):
+ return
Table(self._j_table.dropColumns(to_expression_jarray(fields)),
+ self._t_env)
+ else:
+ assert len(fields) == 1
+ assert isinstance(fields[0], str)
+ return Table(self._j_table.dropColumns(fields[0]), self._t_env)
- def map(self, func: Union[Expression, UserDefinedScalarFunctionWrapper])
-> 'Table':
+ def map(self, func: Union[str, Expression,
UserDefinedScalarFunctionWrapper]) -> 'Table':
"""
Performs a map operation with a user-defined scalar function.
@@ -758,13 +811,15 @@ class Table(object):
.. versionadded:: 1.13.0
"""
- if isinstance(func, Expression):
+ if isinstance(func, str):
+ return Table(self._j_table.map(func), self._t_env)
+ elif isinstance(func, Expression):
return Table(self._j_table.map(func._j_expr), self._t_env)
else:
func._set_takes_row_as_input()
return
Table(self._j_table.map(func(with_columns(col("*")))._j_expr), self._t_env)
- def flat_map(self, func: Union[Expression,
UserDefinedTableFunctionWrapper]) -> 'Table':
+ def flat_map(self, func: Union[str, Expression,
UserDefinedTableFunctionWrapper]) -> 'Table':
"""
Performs a flatMap operation with a user-defined table function.
@@ -788,13 +843,15 @@ class Table(object):
.. versionadded:: 1.13.0
"""
- if isinstance(func, Expression):
+ if isinstance(func, str):
+ return Table(self._j_table.flatMap(func), self._t_env)
+ elif isinstance(func, Expression):
return Table(self._j_table.flatMap(func._j_expr), self._t_env)
else:
func._set_takes_row_as_input()
return
Table(self._j_table.flatMap(func(with_columns(col("*")))._j_expr), self._t_env)
- def aggregate(self, func: Union[Expression,
UserDefinedAggregateFunctionWrapper]) \
+ def aggregate(self, func: Union[str, Expression,
UserDefinedAggregateFunctionWrapper]) \
-> 'AggregatedTable':
"""
Performs a global aggregate operation with an aggregate function. You
have to close the
@@ -808,7 +865,7 @@ class Table(object):
... [DataTypes.FIELD("a", DataTypes.FLOAT()),
... DataTypes.FIELD("b", DataTypes.INT())]),
... func_type="pandas")
- >>> tab.aggregate(agg(tab.a).alias("a", "b")).select(col('a'),
col('b'))
+ >>> tab.aggregate(agg(tab.a).alias("a", "b")).select("a, b")
>>> # take all the columns as inputs
>>> # pd is a Pandas.DataFrame
>>> agg_row = udaf(lambda pd: (pd.a.mean(), pd.a.max()),
@@ -816,14 +873,16 @@ class Table(object):
... [DataTypes.FIELD("a", DataTypes.FLOAT()),
... DataTypes.FIELD("b", DataTypes.INT())]),
... func_type="pandas")
- >>> tab.aggregate(agg.alias("a, b")).select(col('a'), col('b'))
+ >>> tab.aggregate(agg.alias("a, b")).select("a, b")
:param func: user-defined aggregate function.
:return: The result table.
.. versionadded:: 1.13.0
"""
- if isinstance(func, Expression):
+ if isinstance(func, str):
+ return AggregatedTable(self._j_table.aggregate(func), self._t_env)
+ elif isinstance(func, Expression):
return AggregatedTable(self._j_table.aggregate(func._j_expr),
self._t_env)
else:
func._set_takes_row_as_input()
@@ -834,7 +893,7 @@ class Table(object):
func = func(with_columns(col("*")))
return AggregatedTable(self._j_table.aggregate(func._j_expr),
self._t_env)
- def flat_aggregate(self, func: Union[Expression,
UserDefinedAggregateFunctionWrapper]) \
+ def flat_aggregate(self, func: Union[str, Expression,
UserDefinedAggregateFunctionWrapper]) \
-> 'FlatAggregateTable':
"""
Perform a global flat_aggregate without group_by. flat_aggregate takes
a
@@ -845,7 +904,7 @@ class Table(object):
::
>>> table_agg = udtaf(MyTableAggregateFunction())
- >>> tab.flat_aggregate(table_agg(tab.a).alias("a",
"b")).select(col('a'), col('b'))
+ >>> tab.flat_aggregate(table_agg(tab.a).alias("a",
"b")).select("a, b")
>>> # take all the columns as inputs
>>> class Top2(TableAggregateFunction):
... def emit_value(self, accumulator):
@@ -871,14 +930,16 @@ class Table(object):
... return DataTypes.ROW(
... [DataTypes.FIELD("a", DataTypes.BIGINT())])
>>> top2 = udtaf(Top2())
- >>> tab.flat_aggregate(top2.alias("a", "b")).select(col('a'),
col('b'))
+ >>> tab.flat_aggregate(top2.alias("a", "b")).select("a, b")
:param func: user-defined table aggregate function.
:return: The result table.
.. versionadded:: 1.13.0
"""
- if isinstance(func, Expression):
+ if isinstance(func, str):
+ return FlatAggregateTable(self._j_table.flatAggregate(func),
self._t_env)
+ elif isinstance(func, Expression):
return
FlatAggregateTable(self._j_table.flatAggregate(func._j_expr), self._t_env)
else:
func._set_takes_row_as_input()
@@ -1063,7 +1124,7 @@ class GroupedTable(object):
self._j_table = java_table
self._t_env = t_env
- def select(self, *fields: Expression) -> 'Table':
+ def select(self, *fields: Union[str, Expression]) -> 'Table':
"""
Performs a selection operation on a grouped table. Similar to an SQL
SELECT statement.
The field expressions can contain complex expressions and aggregations.
@@ -1072,13 +1133,20 @@ class GroupedTable(object):
::
>>> tab.group_by(tab.key).select(tab.key,
tab.value.avg.alias('average'))
+ >>> tab.group_by("key").select("key, value.avg as average")
+
:param fields: Expression string that contains group keys and
aggregate function calls.
:return: The result table.
"""
- return Table(self._j_table.select(to_expression_jarray(fields)),
self._t_env)
+ if all(isinstance(f, Expression) for f in fields):
+ return Table(self._j_table.select(to_expression_jarray(fields)),
self._t_env)
+ else:
+ assert len(fields) == 1
+ assert isinstance(fields[0], str)
+ return Table(self._j_table.select(fields[0]), self._t_env)
- def aggregate(self, func: Union[Expression,
UserDefinedAggregateFunctionWrapper]) \
+ def aggregate(self, func: Union[str, Expression,
UserDefinedAggregateFunctionWrapper]) \
-> 'AggregatedTable':
"""
Performs a aggregate operation with an aggregate function. You have to
close the
@@ -1092,8 +1160,7 @@ class GroupedTable(object):
... [DataTypes.FIELD("a", DataTypes.FLOAT()),
... DataTypes.FIELD("b", DataTypes.INT())]),
... func_type="pandas")
- >>> tab.group_by(tab.a).aggregate(agg(tab.b).alias("c",
"d")).select(
- ... col('a'), col('c'), col('d'))
+ >>> tab.group_by(tab.a).aggregate(agg(tab.b).alias("c",
"d")).select("a, c, d")
>>> # take all the columns as inputs
>>> # pd is a Pandas.DataFrame
>>> agg_row = udaf(lambda pd: (pd.a.mean(), pd.b.max()),
@@ -1101,14 +1168,16 @@ class GroupedTable(object):
... [DataTypes.FIELD("a", DataTypes.FLOAT()),
... DataTypes.FIELD("b", DataTypes.INT())]),
... func_type="pandas")
- >>> tab.group_by(tab.a).aggregate(agg.alias("a,
b")).select(col('a'), col('b'))
+ >>> tab.group_by(tab.a).aggregate(agg.alias("a, b")).select("a, b")
:param func: user-defined aggregate function.
:return: The result table.
.. versionadded:: 1.13.0
"""
- if isinstance(func, Expression):
+ if isinstance(func, str):
+ return AggregatedTable(self._j_table.aggregate(func), self._t_env)
+ elif isinstance(func, Expression):
return AggregatedTable(self._j_table.aggregate(func._j_expr),
self._t_env)
else:
func._set_takes_row_as_input()
@@ -1119,7 +1188,7 @@ class GroupedTable(object):
func = func(with_columns(col("*")))
return AggregatedTable(self._j_table.aggregate(func._j_expr),
self._t_env)
- def flat_aggregate(self, func: Union[Expression,
UserDefinedAggregateFunctionWrapper]) \
+ def flat_aggregate(self, func: Union[str, Expression,
UserDefinedAggregateFunctionWrapper]) \
-> 'FlatAggregateTable':
"""
Performs a flat_aggregate operation on a grouped table. flat_aggregate
takes a
@@ -1130,8 +1199,7 @@ class GroupedTable(object):
::
>>> table_agg = udtaf(MyTableAggregateFunction())
- >>>
tab.group_by(tab.c).flat_aggregate(table_agg(tab.a).alias("a")).select(
- ... col('c'), col('a'))
+ >>>
tab.group_by(tab.c).flat_aggregate(table_agg(tab.a).alias("a")).select("c, a")
>>> # take all the columns as inputs
>>> class Top2(TableAggregateFunction):
... def emit_value(self, accumulator):
@@ -1157,14 +1225,16 @@ class GroupedTable(object):
... return DataTypes.ROW(
... [DataTypes.FIELD("a", DataTypes.BIGINT())])
>>> top2 = udtaf(Top2())
- >>> tab.group_by(tab.c).flat_aggregate(top2.alias("a",
"b")).select(col('a'), col('b'))
+ >>> tab.group_by(tab.c).flat_aggregate(top2.alias("a",
"b")).select("a, b")
:param func: user-defined table aggregate function.
:return: The result table.
.. versionadded:: 1.13.0
"""
- if isinstance(func, Expression):
+ if isinstance(func, str):
+ return FlatAggregateTable(self._j_table.flatAggregate(func),
self._t_env)
+ elif isinstance(func, Expression):
return
FlatAggregateTable(self._j_table.flatAggregate(func._j_expr), self._t_env)
else:
func._set_takes_row_as_input()
@@ -1185,7 +1255,7 @@ class GroupWindowedTable(object):
self._j_table = java_group_windowed_table
self._t_env = t_env
- def group_by(self, *fields: Expression) -> 'WindowGroupedTable':
+ def group_by(self, *fields: Union[str, Expression]) ->
'WindowGroupedTable':
"""
Groups the elements by a mandatory window and one or more optional
grouping attributes.
The window is specified by referring to its alias.
@@ -1211,8 +1281,13 @@ class GroupWindowedTable(object):
:param fields: Group keys.
:return: A window grouped table.
"""
- return WindowGroupedTable(
- self._j_table.groupBy(to_expression_jarray(fields)), self._t_env)
+ if all(isinstance(f, Expression) for f in fields):
+ return WindowGroupedTable(
+ self._j_table.groupBy(to_expression_jarray(fields)),
self._t_env)
+ else:
+ assert len(fields) == 1
+ assert isinstance(fields[0], str)
+ return WindowGroupedTable(self._j_table.groupBy(fields[0]),
self._t_env)
class WindowGroupedTable(object):
@@ -1224,7 +1299,7 @@ class WindowGroupedTable(object):
self._j_table = java_window_grouped_table
self._t_env = t_env
- def select(self, *fields: Expression) -> 'Table':
+ def select(self, *fields: Union[str, Expression]) -> 'Table':
"""
Performs a selection operation on a window grouped table. Similar to
an SQL SELECT
statement.
@@ -1236,6 +1311,7 @@ class WindowGroupedTable(object):
>>> window_grouped_table.select(col('key'),
... col('window').start,
... col('value').avg.alias('valavg'))
+ >>> window_grouped_table.select("key, window.start, value.avg as
valavg")
:param fields: Expression string.
:return: The result table.
@@ -1247,7 +1323,7 @@ class WindowGroupedTable(object):
assert isinstance(fields[0], str)
return Table(self._j_table.select(fields[0]), self._t_env)
- def aggregate(self, func: Union[Expression,
UserDefinedAggregateFunctionWrapper]) \
+ def aggregate(self, func: Union[str, Expression,
UserDefinedAggregateFunctionWrapper]) \
-> 'AggregatedTable':
"""
Performs an aggregate operation on a window grouped table. You have to
close the
@@ -1264,7 +1340,7 @@ class WindowGroupedTable(object):
>>> window_grouped_table.group_by("w") \
... .aggregate(agg(window_grouped_table.b) \
... .alias("c", "d")) \
- ... .select(col('c'), col('d'))
+ ... .select("c, d")
>>> # take all the columns as inputs
>>> # pd is a Pandas.DataFrame
>>> agg_row = udaf(lambda pd: (pd.a.mean(), pd.b.max()),
@@ -1279,7 +1355,9 @@ class WindowGroupedTable(object):
.. versionadded:: 1.13.0
"""
- if isinstance(func, Expression):
+ if isinstance(func, str):
+ return AggregatedTable(self._j_table.aggregate(func), self._t_env)
+ elif isinstance(func, Expression):
return AggregatedTable(self._j_table.aggregate(func._j_expr),
self._t_env)
else:
func._set_takes_row_as_input()
@@ -1313,7 +1391,7 @@ class OverWindowedTable(object):
self._j_table = java_over_windowed_table
self._t_env = t_env
- def select(self, *fields: Expression) -> 'Table':
+ def select(self, *fields: Union[str, Expression]) -> 'Table':
"""
Performs a selection operation on an over windowed table. Similar to an
SQL SELECT
statement.
@@ -1325,11 +1403,17 @@ class OverWindowedTable(object):
>>> over_windowed_table.select(col('c'),
... col('b').count.over(col('ow')),
... col('e').sum.over(col('ow')))
+ >>> over_windowed_table.select("c, b.count over ow, e.sum over ow")
:param fields: Expression string.
:return: The result table.
"""
- return Table(self._j_table.select(to_expression_jarray(fields)),
self._t_env)
+ if all(isinstance(f, Expression) for f in fields):
+ return Table(self._j_table.select(to_expression_jarray(fields)),
self._t_env)
+ else:
+ assert len(fields) == 1
+ assert isinstance(fields[0], str)
+ return Table(self._j_table.select(fields[0]), self._t_env)
class AggregatedTable(object):
@@ -1341,7 +1425,7 @@ class AggregatedTable(object):
self._j_table = java_table
self._t_env = t_env
- def select(self, *fields: Expression) -> 'Table':
+ def select(self, *fields: Union[str, Expression]) -> 'Table':
"""
Performs a selection operation after an aggregate operation. The field
expressions
cannot contain table functions and aggregations.
@@ -1354,7 +1438,7 @@ class AggregatedTable(object):
... [DataTypes.FIELD("a", DataTypes.FLOAT()),
... DataTypes.FIELD("b", DataTypes.INT())]),
... func_type="pandas")
- >>> tab.aggregate(agg(tab.a).alias("a", "b")).select(col('a'),
col('b'))
+ >>> tab.aggregate(agg(tab.a).alias("a", "b")).select("a, b")
>>> # take all the columns as inputs
>>> # pd is a Pandas.DataFrame
>>> agg_row = udaf(lambda pd: (pd.a.mean(), pd.b.max()),
@@ -1362,12 +1446,17 @@ class AggregatedTable(object):
... [DataTypes.FIELD("a", DataTypes.FLOAT()),
... DataTypes.FIELD("b", DataTypes.INT())]),
... func_type="pandas")
- >>> tab.group_by(tab.a).aggregate(agg.alias("a,
b")).select(col('a'), col('b'))
+ >>> tab.group_by(tab.a).aggregate(agg.alias("a, b")).select("a, b")
:param fields: Expression string.
:return: The result table.
"""
- return Table(self._j_table.select(to_expression_jarray(fields)),
self._t_env)
+ if all(isinstance(f, Expression) for f in fields):
+ return Table(self._j_table.select(to_expression_jarray(fields)),
self._t_env)
+ else:
+ assert len(fields) == 1
+ assert isinstance(fields[0], str)
+ return Table(self._j_table.select(fields[0]), self._t_env)
class FlatAggregateTable(object):
@@ -1380,7 +1469,7 @@ class FlatAggregateTable(object):
self._j_table = java_table
self._t_env = t_env
- def select(self, *fields: Expression) -> 'Table':
+ def select(self, *fields: Union[str, Expression]) -> 'Table':
"""
Performs a selection operation on a FlatAggregateTable. Similar to a
SQL SELECT statement.
The field expressions can contain complex expressions.
@@ -1389,7 +1478,7 @@ class FlatAggregateTable(object):
::
>>> table_agg = udtaf(MyTableAggregateFunction())
- >>> tab.flat_aggregate(table_agg(tab.a).alias("a",
"b")).select(col('a'), col('b'))
+ >>> tab.flat_aggregate(table_agg(tab.a).alias("a",
"b")).select("a, b")
>>> # take all the columns as inputs
>>> class Top2(TableAggregateFunction):
... def emit_value(self, accumulator):
@@ -1415,9 +1504,14 @@ class FlatAggregateTable(object):
... return DataTypes.ROW(
... [DataTypes.FIELD("a", DataTypes.BIGINT())])
>>> top2 = udtaf(Top2())
- >>> tab.group_by(tab.c).flat_aggregate(top2.alias("a",
"b")).select(col('a'), col('b'))
+ >>> tab.group_by(tab.c).flat_aggregate(top2.alias("a",
"b")).select("a, b")
:param fields: Expression string.
:return: The result table.
"""
- return Table(self._j_table.select(to_expression_jarray(fields)),
self._t_env)
+ if all(isinstance(f, Expression) for f in fields):
+ return Table(self._j_table.select(to_expression_jarray(fields)),
self._t_env)
+ else:
+ assert len(fields) == 1
+ assert isinstance(fields[0], str)
+ return Table(self._j_table.select(fields[0]), self._t_env)
diff --git a/flink-python/pyflink/table/window.py
b/flink-python/pyflink/table/window.py
index 630455e..282660c 100644
--- a/flink-python/pyflink/table/window.py
+++ b/flink-python/pyflink/table/window.py
@@ -144,6 +144,9 @@ class Session(object):
>>> Session.with_gap(expr.lit(10).minutes)
... .on(expr.col("rowtime"))
... .alias("w")
+
+ >>> Session.with_gap("10.minutes").on("rowtime").alias("w")
+
"""
@classmethod
@@ -227,6 +230,8 @@ class Slide(object):
... .every(expr.lit(5).minutes)
... .on(expr.col("rowtime"))
... .alias("w")
+
+ >>>
Slide.over("10.minutes").every("5.minutes").on("rowtime").alias("w")
"""
@classmethod
@@ -333,6 +338,8 @@ class Over(object):
... .order_by(col("rowtime")) \\
... .preceding(expr.UNBOUNDED_RANGE) \\
... .alias("w")
+
+ >>>
Over.partition_by("a").order_by("rowtime").preceding("unbounded_range").alias("w")
"""
@classmethod